1use crate::config::ActiveConfig;
2use crate::config_load::ConfigSelfHealReport;
3use crate::events::{EventSink, TracingEventSink};
4use crate::metrics::{AgentHealthState, AgentMetrics, AgentRuntimeState};
5use crate::runtime::DaemonRuntimeState;
6use arc_swap::ArcSwap;
7use serde_json::Value;
8use std::collections::HashMap;
9use std::path::PathBuf;
10use std::sync::atomic::AtomicBool;
11use std::sync::{Arc, OnceLock};
12use tokio::process::Child;
13use tokio::sync::{Mutex, Notify};
14
15pub const MAX_CONCURRENT_TASKS: usize = 10;
17
18static TASK_SEMAPHORE: OnceLock<Arc<tokio::sync::Semaphore>> = OnceLock::new();
19
20pub fn task_semaphore() -> &'static Arc<tokio::sync::Semaphore> {
22 TASK_SEMAPHORE.get_or_init(|| Arc::new(tokio::sync::Semaphore::new(MAX_CONCURRENT_TASKS)))
23}
24
25#[derive(Clone)]
27pub struct ManagedState {
28 pub inner: Arc<InnerState>,
30}
31
32#[derive(Clone)]
34pub struct ConfigRuntimeSnapshot {
35 pub active_config: Arc<ActiveConfig>,
37 pub active_config_error: Option<String>,
39 pub active_config_notice: Option<ConfigSelfHealReport>,
41}
42
43impl ConfigRuntimeSnapshot {
44 pub fn new(
46 active_config: ActiveConfig,
47 active_config_error: Option<String>,
48 active_config_notice: Option<ConfigSelfHealReport>,
49 ) -> Self {
50 Self {
51 active_config: Arc::new(active_config),
52 active_config_error,
53 active_config_notice,
54 }
55 }
56}
57
58pub struct InnerState {
60 pub data_dir: PathBuf,
62 pub db_path: PathBuf,
64 pub unsafe_mode: bool,
66 pub async_database: Arc<crate::async_database::AsyncDatabase>,
68 pub logs_dir: PathBuf,
70 pub config_runtime: ArcSwap<ConfigRuntimeSnapshot>,
72 pub running: Mutex<HashMap<String, RunningTask>>,
74 pub agent_health: tokio::sync::RwLock<HashMap<String, AgentHealthState>>,
76 pub agent_metrics: tokio::sync::RwLock<HashMap<String, AgentMetrics>>,
78 pub agent_lifecycle: tokio::sync::RwLock<HashMap<String, AgentRuntimeState>>,
80 pub event_sink: std::sync::RwLock<Arc<dyn EventSink>>,
85 pub db_writer: Arc<crate::db_write::DbWriteCoordinator>,
87 pub session_store: Arc<crate::session_store::AsyncSessionStore>,
89 pub task_repo: Arc<crate::task_repository::AsyncSqliteTaskRepository>,
91 pub store_manager: crate::store::StoreManager,
93 pub plugin_policy: orchestrator_config::plugin_policy::PluginPolicy,
95 pub daemon_runtime: DaemonRuntimeState,
97 pub worker_notify: Arc<Notify>,
99 pub trigger_event_tx:
101 tokio::sync::broadcast::Sender<crate::trigger_engine::TriggerEventPayload>,
102 pub trigger_engine_handle: std::sync::Mutex<Option<crate::trigger_engine::TriggerEngineHandle>>,
104 pub fs_watcher_reload_tx: std::sync::Mutex<Option<tokio::sync::mpsc::Sender<()>>>,
107 pub task_enqueuer: Arc<dyn crate::scheduler_port::TaskEnqueuer>,
113}
114
115impl InnerState {
116 pub fn emit_event(
121 &self,
122 task_id: &str,
123 task_item_id: Option<&str>,
124 event_type: &str,
125 payload: Value,
126 ) {
127 let sink = clone_event_sink(self);
128 sink.emit(task_id, task_item_id, event_type, payload);
129
130 if matches!(event_type, "task_completed" | "task_failed") {
132 crate::trigger_engine::broadcast_task_event(
133 self,
134 crate::trigger_engine::TriggerEventPayload {
135 event_type: event_type.to_string(),
136 task_id: task_id.to_string(),
137 payload: None,
138 project: None,
139 },
140 );
141 }
142 }
143}
144
145#[derive(Clone)]
147pub struct RunningTask {
148 pub stop_flag: Arc<AtomicBool>,
150 pub child: Arc<Mutex<Option<Child>>>,
152}
153
154impl Default for RunningTask {
155 fn default() -> Self {
156 Self::new()
157 }
158}
159
160impl RunningTask {
161 pub fn new() -> Self {
163 Self {
164 stop_flag: Arc::new(AtomicBool::new(false)),
165 child: Arc::new(Mutex::new(None)),
166 }
167 }
168
169 pub fn fork(&self) -> Self {
173 Self {
174 stop_flag: Arc::clone(&self.stop_flag),
175 child: Arc::new(Mutex::new(None)),
176 }
177 }
178}
179
180pub fn config_runtime_snapshot(state: &InnerState) -> Arc<ConfigRuntimeSnapshot> {
182 state.config_runtime.load_full()
183}
184
185pub fn set_config_runtime_snapshot(state: &InnerState, snapshot: ConfigRuntimeSnapshot) {
187 state.config_runtime.store(Arc::new(snapshot));
188}
189
190pub fn update_config_runtime<R>(
192 state: &InnerState,
193 f: impl FnOnce(&ConfigRuntimeSnapshot) -> (ConfigRuntimeSnapshot, R),
194) -> R {
195 let current = state.config_runtime.load_full();
196 let (next, result) = f(current.as_ref());
197 state.config_runtime.store(Arc::new(next));
198 result
199}
200
201pub fn replace_active_config(state: &InnerState, active_config: ActiveConfig) {
203 update_config_runtime(state, |current| {
204 (
205 ConfigRuntimeSnapshot {
206 active_config: Arc::new(active_config),
207 active_config_error: current.active_config_error.clone(),
208 active_config_notice: current.active_config_notice.clone(),
209 },
210 (),
211 )
212 });
213}
214
215pub fn replace_active_config_status(
217 state: &InnerState,
218 active_config_error: Option<String>,
219 active_config_notice: Option<ConfigSelfHealReport>,
220) {
221 update_config_runtime(state, |current| {
222 (
223 ConfigRuntimeSnapshot {
224 active_config: Arc::clone(¤t.active_config),
225 active_config_error,
226 active_config_notice,
227 },
228 (),
229 )
230 });
231}
232
233pub fn clear_active_config_status(state: &InnerState) {
235 replace_active_config_status(state, None, None);
236}
237
238pub fn reset_active_config_to_default(state: &InnerState) {
240 set_config_runtime_snapshot(
241 state,
242 ConfigRuntimeSnapshot::new(
243 ActiveConfig {
244 config: Default::default(),
245 workspaces: Default::default(),
246 projects: Default::default(),
247 },
248 None,
249 None,
250 ),
251 );
252}
253
254pub fn clone_event_sink(state: &InnerState) -> Arc<dyn EventSink> {
256 match state.event_sink.read() {
257 Ok(guard) => guard.clone(),
258 Err(err) => {
259 drop(err.into_inner());
260 let mut guard = state
261 .event_sink
262 .write()
263 .unwrap_or_else(|poisoned| poisoned.into_inner());
264 *guard = Arc::new(TracingEventSink::new());
265 state.event_sink.clear_poison();
266 guard.clone()
267 }
268 }
269}
270
271pub fn replace_event_sink(state: &InnerState, sink: Arc<dyn EventSink>) {
273 match state.event_sink.write() {
274 Ok(mut guard) => *guard = sink,
275 Err(err) => {
276 let mut guard = err.into_inner();
277 *guard = sink;
278 state.event_sink.clear_poison();
279 }
280 }
281}
282
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config_load::{read_active_config, read_loaded_config};
    use crate::test_utils::TestState;
    use std::sync::atomic::Ordering;

    /// A new `RunningTask` has its stop flag cleared and no child attached.
    #[test]
    fn running_task_starts_with_defaults() {
        let task = RunningTask::new();

        let stop_requested = task.stop_flag.load(Ordering::SeqCst);
        assert!(!stop_requested);

        let child_slot = task.child.try_lock().expect("lock child");
        assert!(child_slot.is_none());
    }

    /// Entries written to the agent maps are visible through a later read.
    #[tokio::test]
    async fn state_accessors_round_trip_agent_maps() {
        let mut fixture = TestState::new();
        let state = fixture.build();

        let seeded_health = AgentHealthState {
            consecutive_errors: 1,
            diseased_until: None,
            total_lifetime_errors: 1,
            capability_health: HashMap::new(),
        };
        state
            .agent_health
            .write()
            .await
            .insert("echo".to_string(), seeded_health);
        state
            .agent_metrics
            .write()
            .await
            .insert("echo".to_string(), AgentMetrics::default());

        let health_present = state.agent_health.read().await.contains_key("echo");
        let metrics_present = state.agent_metrics.read().await.contains_key("echo");
        assert!(health_present);
        assert!(metrics_present);
    }

    /// Emitting an event on the fixture's state must not panic.
    #[test]
    fn emit_event_is_safe_with_noop_sink() {
        let mut fixture = TestState::new();
        let state = fixture.build();

        let payload = serde_json::json!({"ok": true});
        state.emit_event("task-1", Some("item-1"), "heartbeat", payload);
    }

    /// `update_config_runtime` stores the snapshot returned by the closure.
    #[test]
    fn update_config_runtime_replaces_snapshot_without_exposing_guards() {
        let mut fixture = TestState::new();
        let state = fixture.build();

        // First workflow key of the default project; empty when there is none.
        let original = read_loaded_config(&state)
            .expect("read loaded config")
            .projects
            .get(crate::config::DEFAULT_PROJECT_ID)
            .and_then(|project| project.workflows.keys().next())
            .cloned()
            .unwrap_or_default();
        let updated_key = format!("{}-updated", original);

        update_config_runtime(&state, |snapshot| {
            let mut next = snapshot.clone();
            let workflow_copy = next
                .active_config
                .projects
                .get(crate::config::DEFAULT_PROJECT_ID)
                .and_then(|project| project.workflows.get(&original))
                .cloned()
                .expect("default workflow should exist");
            let default_project = Arc::make_mut(&mut next.active_config)
                .projects
                .entry(crate::config::DEFAULT_PROJECT_ID.to_string())
                .or_insert_with(|| crate::config::ResolvedProject {
                    workspaces: HashMap::new(),
                    agents: HashMap::new(),
                    workflows: HashMap::new(),
                    step_templates: HashMap::new(),
                    env_stores: HashMap::new(),
                    secret_stores: HashMap::new(),
                    execution_profiles: HashMap::new(),
                });
            default_project
                .workflows
                .insert(updated_key.clone(), workflow_copy);
            (next, ())
        });

        let updated_exists = read_loaded_config(&state)
            .expect("re-read active config")
            .projects
            .get(crate::config::DEFAULT_PROJECT_ID)
            .map(|project| project.workflows.contains_key(&updated_key))
            .unwrap_or(false);
        assert!(updated_exists);
    }

    /// A snapshot carrying an error message is rejected by `read_active_config`.
    #[test]
    fn read_active_config_rejects_non_runnable_snapshot() {
        let mut fixture = TestState::new();
        let state = fixture.build();

        let message = "active config is not runnable".to_string();
        replace_active_config_status(&state, Some(message), None);

        let error = read_active_config(&state).expect_err("non-runnable config should fail");
        let rendered = error.to_string();
        assert!(rendered.contains("not runnable"));
    }

    /// Clearing the agent maps leaves them empty.
    #[tokio::test]
    async fn agent_health_and_metrics_reset_explicitly() {
        let mut fixture = TestState::new();
        let state = fixture.build();

        state
            .agent_health
            .write()
            .await
            .insert("echo".to_string(), AgentHealthState::default());
        state
            .agent_metrics
            .write()
            .await
            .insert("echo".to_string(), AgentMetrics::default());

        state.agent_health.write().await.clear();
        state.agent_metrics.write().await.clear();

        let health_empty = state.agent_health.read().await.is_empty();
        let metrics_empty = state.agent_metrics.read().await.is_empty();
        assert!(health_empty);
        assert!(metrics_empty);
    }

    /// After a writer panics while holding the sink lock, emitting an event
    /// still works and leaves the lock un-poisoned.
    #[test]
    fn poisoned_event_sink_recovers_with_tracing_sink() {
        let mut fixture = TestState::new();
        let state = fixture.build();

        // Poison the lock by panicking on another thread while holding it.
        let poisoner = std::thread::spawn({
            let state = state.clone();
            move || {
                let _guard = state.event_sink.write().expect("lock event_sink");
                panic!("poison event_sink");
            }
        });
        assert!(poisoner.join().is_err());

        let payload = serde_json::json!({"ok": true});
        state.emit_event("task-1", Some("item-1"), "heartbeat", payload);

        assert!(state.event_sink.read().is_ok());
    }
}