Skip to main content

agent_orchestrator/
state.rs

1use crate::collab::MessageBus;
2use crate::config::ActiveConfig;
3use crate::config_load::ConfigSelfHealReport;
4use crate::events::{EventSink, TracingEventSink};
5use crate::metrics::{AgentHealthState, AgentMetrics, AgentRuntimeState};
6use crate::runtime::DaemonRuntimeState;
7use arc_swap::ArcSwap;
8use serde_json::Value;
9use std::collections::HashMap;
10use std::path::PathBuf;
11use std::sync::atomic::AtomicBool;
12use std::sync::{Arc, OnceLock};
13use tokio::process::Child;
14use tokio::sync::{Mutex, Notify};
15
16/// Maximum number of tasks that may run concurrently in-process.
17pub const MAX_CONCURRENT_TASKS: usize = 10;
18
19static TASK_SEMAPHORE: OnceLock<Arc<tokio::sync::Semaphore>> = OnceLock::new();
20
21/// Returns the global task-execution semaphore.
22pub fn task_semaphore() -> &'static Arc<tokio::sync::Semaphore> {
23    TASK_SEMAPHORE.get_or_init(|| Arc::new(tokio::sync::Semaphore::new(MAX_CONCURRENT_TASKS)))
24}
25
26/// Owned wrapper returned by bootstrap helpers.
27#[derive(Clone)]
28pub struct ManagedState {
29    /// Shared inner runtime state.
30    pub inner: Arc<InnerState>,
31}
32
33/// Snapshot of the currently active configuration and its status.
34#[derive(Clone)]
35pub struct ConfigRuntimeSnapshot {
36    /// Active runtime configuration.
37    pub active_config: Arc<ActiveConfig>,
38    /// Optional validation or load error for the active config.
39    pub active_config_error: Option<String>,
40    /// Optional self-heal notice associated with the active config.
41    pub active_config_notice: Option<ConfigSelfHealReport>,
42}
43
44impl ConfigRuntimeSnapshot {
45    /// Creates a runtime snapshot from configuration and status values.
46    pub fn new(
47        active_config: ActiveConfig,
48        active_config_error: Option<String>,
49        active_config_notice: Option<ConfigSelfHealReport>,
50    ) -> Self {
51        Self {
52            active_config: Arc::new(active_config),
53            active_config_error,
54            active_config_notice,
55        }
56    }
57}
58
59/// Shared daemon state referenced by services and scheduler code.
60pub struct InnerState {
61    /// Runtime data directory (`~/.orchestratord` by default).
62    pub data_dir: PathBuf,
63    /// SQLite database path.
64    pub db_path: PathBuf,
65    /// Whether unsafe mode is enabled.
66    pub unsafe_mode: bool,
67    /// Async database handle.
68    pub async_database: Arc<crate::async_database::AsyncDatabase>,
69    /// Directory containing task and command logs.
70    pub logs_dir: PathBuf,
71    /// Atomically swappable configuration snapshot.
72    pub config_runtime: ArcSwap<ConfigRuntimeSnapshot>,
73    /// Currently running tasks keyed by task ID.
74    pub running: Mutex<HashMap<String, RunningTask>>,
75    /// Runtime agent-health map.
76    pub agent_health: tokio::sync::RwLock<HashMap<String, AgentHealthState>>,
77    /// Runtime agent metrics map.
78    pub agent_metrics: tokio::sync::RwLock<HashMap<String, AgentMetrics>>,
79    /// Runtime agent lifecycle map.
80    pub agent_lifecycle: tokio::sync::RwLock<HashMap<String, AgentRuntimeState>>,
81    /// Collaboration message bus.
82    pub message_bus: Arc<MessageBus>,
83    // FR-016 sync exception: event emission must remain callable from sync and async
84    // paths without making the EventSink interface async. This lock is an
85    // observability boundary, not async main-path shared business state.
86    /// Event sink used by synchronous and asynchronous execution paths.
87    pub event_sink: std::sync::RwLock<Arc<dyn EventSink>>,
88    /// Serialized database write coordinator.
89    pub db_writer: Arc<crate::db_write::DbWriteCoordinator>,
90    /// Interactive session store.
91    pub session_store: Arc<crate::session_store::AsyncSessionStore>,
92    /// Async task repository wrapper.
93    pub task_repo: Arc<crate::task_repository::AsyncSqliteTaskRepository>,
94    /// Workflow store manager.
95    pub store_manager: crate::store::StoreManager,
96    /// Runtime daemon lifecycle state.
97    pub daemon_runtime: DaemonRuntimeState,
98    /// In-process wakeup channel for idle workers.
99    pub worker_notify: Arc<Notify>,
100    /// Broadcast channel for trigger-relevant task events (task_completed / task_failed).
101    pub trigger_event_tx:
102        tokio::sync::broadcast::Sender<crate::trigger_engine::TriggerEventPayload>,
103    /// Handle for notifying the trigger engine of config changes.
104    pub trigger_engine_handle: std::sync::Mutex<Option<crate::trigger_engine::TriggerEngineHandle>>,
105}
106
107impl InnerState {
108    /// Emits an event through the currently configured event sink.
109    ///
110    /// When the event is `task_completed` or `task_failed`, it is also broadcast
111    /// on `trigger_event_tx` so the trigger engine can evaluate event triggers.
112    pub fn emit_event(
113        &self,
114        task_id: &str,
115        task_item_id: Option<&str>,
116        event_type: &str,
117        payload: Value,
118    ) {
119        let sink = clone_event_sink(self);
120        sink.emit(task_id, task_item_id, event_type, payload);
121
122        // Broadcast to trigger engine for event-driven triggers.
123        if matches!(event_type, "task_completed" | "task_failed") {
124            crate::trigger_engine::broadcast_task_event(
125                self,
126                crate::trigger_engine::TriggerEventPayload {
127                    event_type: event_type.to_string(),
128                    task_id: task_id.to_string(),
129                    payload: None,
130                },
131            );
132        }
133    }
134}
135
136/// Mutable runtime handle for a task process.
137#[derive(Clone)]
138pub struct RunningTask {
139    /// Shared stop flag observed by all forked execution branches.
140    pub stop_flag: Arc<AtomicBool>,
141    /// Handle to the currently running child process, if any.
142    pub child: Arc<Mutex<Option<Child>>>,
143}
144
145impl Default for RunningTask {
146    fn default() -> Self {
147        Self::new()
148    }
149}
150
151impl RunningTask {
152    /// Creates an empty running-task handle.
153    pub fn new() -> Self {
154        Self {
155            stop_flag: Arc::new(AtomicBool::new(false)),
156            child: Arc::new(Mutex::new(None)),
157        }
158    }
159
160    /// Create a sibling that shares `stop_flag` but has its own `child` slot.
161    /// Used for parallel item execution — stopping the task sets the shared flag,
162    /// which all forked items observe.
163    pub fn fork(&self) -> Self {
164        Self {
165            stop_flag: Arc::clone(&self.stop_flag),
166            child: Arc::new(Mutex::new(None)),
167        }
168    }
169}
170
171/// Loads the current configuration snapshot.
172pub fn config_runtime_snapshot(state: &InnerState) -> Arc<ConfigRuntimeSnapshot> {
173    state.config_runtime.load_full()
174}
175
176/// Replaces the current configuration snapshot atomically.
177pub fn set_config_runtime_snapshot(state: &InnerState, snapshot: ConfigRuntimeSnapshot) {
178    state.config_runtime.store(Arc::new(snapshot));
179}
180
181/// Updates the configuration snapshot using a read-modify-write closure.
182pub fn update_config_runtime<R>(
183    state: &InnerState,
184    f: impl FnOnce(&ConfigRuntimeSnapshot) -> (ConfigRuntimeSnapshot, R),
185) -> R {
186    let current = state.config_runtime.load_full();
187    let (next, result) = f(current.as_ref());
188    state.config_runtime.store(Arc::new(next));
189    result
190}
191
192/// Replaces only the active config while preserving status fields.
193pub fn replace_active_config(state: &InnerState, active_config: ActiveConfig) {
194    update_config_runtime(state, |current| {
195        (
196            ConfigRuntimeSnapshot {
197                active_config: Arc::new(active_config),
198                active_config_error: current.active_config_error.clone(),
199                active_config_notice: current.active_config_notice.clone(),
200            },
201            (),
202        )
203    });
204}
205
206/// Replaces the active config status fields while preserving the config itself.
207pub fn replace_active_config_status(
208    state: &InnerState,
209    active_config_error: Option<String>,
210    active_config_notice: Option<ConfigSelfHealReport>,
211) {
212    update_config_runtime(state, |current| {
213        (
214            ConfigRuntimeSnapshot {
215                active_config: Arc::clone(&current.active_config),
216                active_config_error,
217                active_config_notice,
218            },
219            (),
220        )
221    });
222}
223
224/// Clears active-config error and notice state.
225pub fn clear_active_config_status(state: &InnerState) {
226    replace_active_config_status(state, None, None);
227}
228
229/// Resets the active config to an empty default snapshot.
230pub fn reset_active_config_to_default(state: &InnerState) {
231    set_config_runtime_snapshot(
232        state,
233        ConfigRuntimeSnapshot::new(
234            ActiveConfig {
235                config: Default::default(),
236                workspaces: Default::default(),
237                projects: Default::default(),
238            },
239            None,
240            None,
241        ),
242    );
243}
244
245/// Clones the current event sink, recovering from poisoning if needed.
246pub fn clone_event_sink(state: &InnerState) -> Arc<dyn EventSink> {
247    match state.event_sink.read() {
248        Ok(guard) => guard.clone(),
249        Err(err) => {
250            drop(err.into_inner());
251            let mut guard = state
252                .event_sink
253                .write()
254                .unwrap_or_else(|poisoned| poisoned.into_inner());
255            *guard = Arc::new(TracingEventSink::new());
256            state.event_sink.clear_poison();
257            guard.clone()
258        }
259    }
260}
261
262/// Replaces the current event sink, recovering from poisoning if needed.
263pub fn replace_event_sink(state: &InnerState, sink: Arc<dyn EventSink>) {
264    match state.event_sink.write() {
265        Ok(mut guard) => *guard = sink,
266        Err(err) => {
267            let mut guard = err.into_inner();
268            *guard = sink;
269            state.event_sink.clear_poison();
270        }
271    }
272}
273
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config_load::{read_active_config, read_loaded_config};
    use crate::test_utils::TestState;
    use std::sync::atomic::Ordering;

    /// Agent name used as the map key in the agent-map tests.
    const AGENT: &str = "echo";

    /// Emits the heartbeat event shared by the event-sink tests.
    fn emit_heartbeat(state: &InnerState) {
        state.emit_event(
            "task-1",
            Some("item-1"),
            "heartbeat",
            serde_json::json!({"ok": true}),
        );
    }

    #[test]
    fn running_task_starts_with_defaults() {
        let task = RunningTask::new();
        let stop_requested = task.stop_flag.load(Ordering::SeqCst);
        assert!(!stop_requested);
        assert!(task.child.try_lock().expect("lock child").is_none());
    }

    #[tokio::test]
    async fn state_accessors_round_trip_agent_maps() {
        let mut fixture = TestState::new();
        let state = fixture.build();

        let health = AgentHealthState {
            consecutive_errors: 1,
            diseased_until: None,
            total_lifetime_errors: 1,
            capability_health: HashMap::new(),
        };
        state
            .agent_health
            .write()
            .await
            .insert(AGENT.to_string(), health);
        state
            .agent_metrics
            .write()
            .await
            .insert(AGENT.to_string(), AgentMetrics::default());

        assert!(state.agent_health.read().await.contains_key(AGENT));
        assert!(state.agent_metrics.read().await.contains_key(AGENT));
    }

    #[test]
    fn emit_event_is_safe_with_noop_sink() {
        let mut fixture = TestState::new();
        let state = fixture.build();
        emit_heartbeat(&state);
    }

    #[test]
    fn update_config_runtime_replaces_snapshot_without_exposing_guards() {
        let mut fixture = TestState::new();
        let state = fixture.build();

        // Name of the first workflow in the default project (empty if there is none).
        let original = read_loaded_config(&state)
            .expect("read loaded config")
            .projects
            .get(crate::config::DEFAULT_PROJECT_ID)
            .and_then(|project| project.workflows.keys().next())
            .cloned()
            .unwrap_or_default();
        let updated_name = format!("{}-updated", original);

        update_config_runtime(&state, |current| {
            let workflow_copy = current
                .active_config
                .projects
                .get(crate::config::DEFAULT_PROJECT_ID)
                .and_then(|project| project.workflows.get(&original))
                .cloned()
                .expect("default workflow should exist");

            let mut next = current.clone();
            let project = Arc::make_mut(&mut next.active_config)
                .projects
                .entry(crate::config::DEFAULT_PROJECT_ID.to_string())
                .or_insert_with(|| crate::config::ResolvedProject {
                    workspaces: HashMap::new(),
                    agents: HashMap::new(),
                    workflows: HashMap::new(),
                    step_templates: HashMap::new(),
                    env_stores: HashMap::new(),
                    execution_profiles: HashMap::new(),
                });
            project.workflows.insert(updated_name.clone(), workflow_copy);
            (next, ())
        });

        let updated_exists = read_loaded_config(&state)
            .expect("re-read active config")
            .projects
            .get(crate::config::DEFAULT_PROJECT_ID)
            .map(|project| project.workflows.contains_key(&updated_name))
            .unwrap_or(false);
        assert!(updated_exists);
    }

    #[test]
    fn read_active_config_rejects_non_runnable_snapshot() {
        let mut fixture = TestState::new();
        let state = fixture.build();

        let recorded_error = "active config is not runnable".to_string();
        replace_active_config_status(&state, Some(recorded_error), None);

        let error = read_active_config(&state).expect_err("non-runnable config should fail");
        assert!(error.to_string().contains("not runnable"));
    }

    #[tokio::test]
    async fn agent_health_and_metrics_reset_explicitly() {
        let mut fixture = TestState::new();
        let state = fixture.build();

        state
            .agent_health
            .write()
            .await
            .insert(AGENT.to_string(), AgentHealthState::default());
        state
            .agent_metrics
            .write()
            .await
            .insert(AGENT.to_string(), AgentMetrics::default());

        state.agent_health.write().await.clear();
        state.agent_metrics.write().await.clear();

        assert!(state.agent_health.read().await.is_empty());
        assert!(state.agent_metrics.read().await.is_empty());
    }

    #[test]
    fn poisoned_event_sink_recovers_with_tracing_sink() {
        let mut fixture = TestState::new();
        let state = fixture.build();

        // Panic while holding the write guard so the lock becomes poisoned.
        let shared = state.clone();
        let poisoner = std::thread::spawn(move || {
            let _guard = shared.event_sink.write().expect("lock event_sink");
            panic!("poison event_sink");
        });
        assert!(poisoner.join().is_err());

        // Emitting goes through `clone_event_sink`, which performs the recovery.
        emit_heartbeat(&state);

        assert!(state.event_sink.read().is_ok());
    }
}
431}