Skip to main content

agent_orchestrator/
state.rs

1use crate::collab::MessageBus;
2use crate::config::ActiveConfig;
3use crate::config_load::ConfigSelfHealReport;
4use crate::events::{EventSink, TracingEventSink};
5use crate::metrics::{AgentHealthState, AgentMetrics, AgentRuntimeState};
6use crate::runtime::DaemonRuntimeState;
7use arc_swap::ArcSwap;
8use serde_json::Value;
9use std::collections::HashMap;
10use std::path::PathBuf;
11use std::sync::atomic::AtomicBool;
12use std::sync::{Arc, OnceLock};
13use tokio::process::Child;
14use tokio::sync::{Mutex, Notify};
15
16/// Maximum number of tasks that may run concurrently in-process.
17pub const MAX_CONCURRENT_TASKS: usize = 10;
18
19static TASK_SEMAPHORE: OnceLock<Arc<tokio::sync::Semaphore>> = OnceLock::new();
20
21/// Returns the global task-execution semaphore.
22pub fn task_semaphore() -> &'static Arc<tokio::sync::Semaphore> {
23    TASK_SEMAPHORE.get_or_init(|| Arc::new(tokio::sync::Semaphore::new(MAX_CONCURRENT_TASKS)))
24}
25
26/// Owned wrapper returned by bootstrap helpers.
27#[derive(Clone)]
28pub struct ManagedState {
29    /// Shared inner runtime state.
30    pub inner: Arc<InnerState>,
31}
32
33/// Snapshot of the currently active configuration and its status.
34#[derive(Clone)]
35pub struct ConfigRuntimeSnapshot {
36    /// Active runtime configuration.
37    pub active_config: Arc<ActiveConfig>,
38    /// Optional validation or load error for the active config.
39    pub active_config_error: Option<String>,
40    /// Optional self-heal notice associated with the active config.
41    pub active_config_notice: Option<ConfigSelfHealReport>,
42}
43
44impl ConfigRuntimeSnapshot {
45    /// Creates a runtime snapshot from configuration and status values.
46    pub fn new(
47        active_config: ActiveConfig,
48        active_config_error: Option<String>,
49        active_config_notice: Option<ConfigSelfHealReport>,
50    ) -> Self {
51        Self {
52            active_config: Arc::new(active_config),
53            active_config_error,
54            active_config_notice,
55        }
56    }
57}
58
59/// Shared daemon state referenced by services and scheduler code.
60pub struct InnerState {
61    /// Runtime data directory (`~/.orchestratord` by default).
62    pub data_dir: PathBuf,
63    /// SQLite database path.
64    pub db_path: PathBuf,
65    /// Whether unsafe mode is enabled.
66    pub unsafe_mode: bool,
67    /// Async database handle.
68    pub async_database: Arc<crate::async_database::AsyncDatabase>,
69    /// Directory containing task and command logs.
70    pub logs_dir: PathBuf,
71    /// Atomically swappable configuration snapshot.
72    pub config_runtime: ArcSwap<ConfigRuntimeSnapshot>,
73    /// Currently running tasks keyed by task ID.
74    pub running: Mutex<HashMap<String, RunningTask>>,
75    /// Runtime agent-health map.
76    pub agent_health: tokio::sync::RwLock<HashMap<String, AgentHealthState>>,
77    /// Runtime agent metrics map.
78    pub agent_metrics: tokio::sync::RwLock<HashMap<String, AgentMetrics>>,
79    /// Runtime agent lifecycle map.
80    pub agent_lifecycle: tokio::sync::RwLock<HashMap<String, AgentRuntimeState>>,
81    /// Collaboration message bus.
82    pub message_bus: Arc<MessageBus>,
83    // FR-016 sync exception: event emission must remain callable from sync and async
84    // paths without making the EventSink interface async. This lock is an
85    // observability boundary, not async main-path shared business state.
86    /// Event sink used by synchronous and asynchronous execution paths.
87    pub event_sink: std::sync::RwLock<Arc<dyn EventSink>>,
88    /// Serialized database write coordinator.
89    pub db_writer: Arc<crate::db_write::DbWriteCoordinator>,
90    /// Interactive session store.
91    pub session_store: Arc<crate::session_store::AsyncSessionStore>,
92    /// Async task repository wrapper.
93    pub task_repo: Arc<crate::task_repository::AsyncSqliteTaskRepository>,
94    /// Workflow store manager.
95    pub store_manager: crate::store::StoreManager,
96    /// Runtime daemon lifecycle state.
97    pub daemon_runtime: DaemonRuntimeState,
98    /// In-process wakeup channel for idle workers.
99    pub worker_notify: Arc<Notify>,
100    /// Broadcast channel for trigger-relevant task events (task_completed / task_failed).
101    pub trigger_event_tx:
102        tokio::sync::broadcast::Sender<crate::trigger_engine::TriggerEventPayload>,
103    /// Handle for notifying the trigger engine of config changes.
104    pub trigger_engine_handle: std::sync::Mutex<Option<crate::trigger_engine::TriggerEngineHandle>>,
105    /// Handle for notifying the filesystem watcher of config changes.
106    /// Set by the daemon; `None` in CLI-only contexts.
107    pub fs_watcher_reload_tx: std::sync::Mutex<Option<tokio::sync::mpsc::Sender<()>>>,
108    /// Scheduler port for cross-crate task enqueue dispatch.
109    ///
110    /// Wired with the real scheduler implementation by the daemon; defaults to
111    /// [`NoopTaskEnqueuer`](crate::scheduler_port::NoopTaskEnqueuer) in tests
112    /// and CLI-only contexts.
113    pub task_enqueuer: Arc<dyn crate::scheduler_port::TaskEnqueuer>,
114}
115
116impl InnerState {
117    /// Emits an event through the currently configured event sink.
118    ///
119    /// When the event is `task_completed` or `task_failed`, it is also broadcast
120    /// on `trigger_event_tx` so the trigger engine can evaluate event triggers.
121    pub fn emit_event(
122        &self,
123        task_id: &str,
124        task_item_id: Option<&str>,
125        event_type: &str,
126        payload: Value,
127    ) {
128        let sink = clone_event_sink(self);
129        sink.emit(task_id, task_item_id, event_type, payload);
130
131        // Broadcast to trigger engine for event-driven triggers.
132        if matches!(event_type, "task_completed" | "task_failed") {
133            crate::trigger_engine::broadcast_task_event(
134                self,
135                crate::trigger_engine::TriggerEventPayload {
136                    event_type: event_type.to_string(),
137                    task_id: task_id.to_string(),
138                    payload: None,
139                    project: None,
140                },
141            );
142        }
143    }
144}
145
146/// Mutable runtime handle for a task process.
147#[derive(Clone)]
148pub struct RunningTask {
149    /// Shared stop flag observed by all forked execution branches.
150    pub stop_flag: Arc<AtomicBool>,
151    /// Handle to the currently running child process, if any.
152    pub child: Arc<Mutex<Option<Child>>>,
153}
154
155impl Default for RunningTask {
156    fn default() -> Self {
157        Self::new()
158    }
159}
160
161impl RunningTask {
162    /// Creates an empty running-task handle.
163    pub fn new() -> Self {
164        Self {
165            stop_flag: Arc::new(AtomicBool::new(false)),
166            child: Arc::new(Mutex::new(None)),
167        }
168    }
169
170    /// Create a sibling that shares `stop_flag` but has its own `child` slot.
171    /// Used for parallel item execution — stopping the task sets the shared flag,
172    /// which all forked items observe.
173    pub fn fork(&self) -> Self {
174        Self {
175            stop_flag: Arc::clone(&self.stop_flag),
176            child: Arc::new(Mutex::new(None)),
177        }
178    }
179}
180
181/// Loads the current configuration snapshot.
182pub fn config_runtime_snapshot(state: &InnerState) -> Arc<ConfigRuntimeSnapshot> {
183    state.config_runtime.load_full()
184}
185
186/// Replaces the current configuration snapshot atomically.
187pub fn set_config_runtime_snapshot(state: &InnerState, snapshot: ConfigRuntimeSnapshot) {
188    state.config_runtime.store(Arc::new(snapshot));
189}
190
191/// Updates the configuration snapshot using a read-modify-write closure.
192pub fn update_config_runtime<R>(
193    state: &InnerState,
194    f: impl FnOnce(&ConfigRuntimeSnapshot) -> (ConfigRuntimeSnapshot, R),
195) -> R {
196    let current = state.config_runtime.load_full();
197    let (next, result) = f(current.as_ref());
198    state.config_runtime.store(Arc::new(next));
199    result
200}
201
202/// Replaces only the active config while preserving status fields.
203pub fn replace_active_config(state: &InnerState, active_config: ActiveConfig) {
204    update_config_runtime(state, |current| {
205        (
206            ConfigRuntimeSnapshot {
207                active_config: Arc::new(active_config),
208                active_config_error: current.active_config_error.clone(),
209                active_config_notice: current.active_config_notice.clone(),
210            },
211            (),
212        )
213    });
214}
215
216/// Replaces the active config status fields while preserving the config itself.
217pub fn replace_active_config_status(
218    state: &InnerState,
219    active_config_error: Option<String>,
220    active_config_notice: Option<ConfigSelfHealReport>,
221) {
222    update_config_runtime(state, |current| {
223        (
224            ConfigRuntimeSnapshot {
225                active_config: Arc::clone(&current.active_config),
226                active_config_error,
227                active_config_notice,
228            },
229            (),
230        )
231    });
232}
233
234/// Clears active-config error and notice state.
235pub fn clear_active_config_status(state: &InnerState) {
236    replace_active_config_status(state, None, None);
237}
238
239/// Resets the active config to an empty default snapshot.
240pub fn reset_active_config_to_default(state: &InnerState) {
241    set_config_runtime_snapshot(
242        state,
243        ConfigRuntimeSnapshot::new(
244            ActiveConfig {
245                config: Default::default(),
246                workspaces: Default::default(),
247                projects: Default::default(),
248            },
249            None,
250            None,
251        ),
252    );
253}
254
255/// Clones the current event sink, recovering from poisoning if needed.
256pub fn clone_event_sink(state: &InnerState) -> Arc<dyn EventSink> {
257    match state.event_sink.read() {
258        Ok(guard) => guard.clone(),
259        Err(err) => {
260            drop(err.into_inner());
261            let mut guard = state
262                .event_sink
263                .write()
264                .unwrap_or_else(|poisoned| poisoned.into_inner());
265            *guard = Arc::new(TracingEventSink::new());
266            state.event_sink.clear_poison();
267            guard.clone()
268        }
269    }
270}
271
272/// Replaces the current event sink, recovering from poisoning if needed.
273pub fn replace_event_sink(state: &InnerState, sink: Arc<dyn EventSink>) {
274    match state.event_sink.write() {
275        Ok(mut guard) => *guard = sink,
276        Err(err) => {
277            let mut guard = err.into_inner();
278            *guard = sink;
279            state.event_sink.clear_poison();
280        }
281    }
282}
283
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config_load::{read_active_config, read_loaded_config};
    use crate::test_utils::TestState;
    use std::sync::atomic::Ordering;

    #[test]
    fn running_task_starts_with_defaults() {
        let handle = RunningTask::new();
        let stopped = handle.stop_flag.load(Ordering::SeqCst);
        assert!(!stopped);
        let slot = handle.child.try_lock().expect("lock child");
        assert!(slot.is_none());
    }

    #[tokio::test]
    async fn state_accessors_round_trip_agent_maps() {
        let mut fixture = TestState::new();
        let state = fixture.build();

        let health = AgentHealthState {
            consecutive_errors: 1,
            diseased_until: None,
            total_lifetime_errors: 1,
            capability_health: HashMap::new(),
        };
        state
            .agent_health
            .write()
            .await
            .insert("echo".to_string(), health);
        state
            .agent_metrics
            .write()
            .await
            .insert("echo".to_string(), AgentMetrics::default());

        let has_health = state.agent_health.read().await.contains_key("echo");
        let has_metrics = state.agent_metrics.read().await.contains_key("echo");
        assert!(has_health);
        assert!(has_metrics);
    }

    #[test]
    fn emit_event_is_safe_with_noop_sink() {
        let mut fixture = TestState::new();
        let state = fixture.build();
        let payload = serde_json::json!({"ok": true});
        state.emit_event("task-1", Some("item-1"), "heartbeat", payload);
    }

    #[test]
    fn update_config_runtime_replaces_snapshot_without_exposing_guards() {
        let mut fixture = TestState::new();
        let state = fixture.build();
        let project_id = crate::config::DEFAULT_PROJECT_ID;

        let base_name = read_loaded_config(&state)
            .expect("read loaded config")
            .projects
            .get(project_id)
            .and_then(|project| project.workflows.keys().next())
            .cloned()
            .unwrap_or_default();
        let updated_name = format!("{}-updated", base_name);

        update_config_runtime(&state, |current| {
            let mut next = current.clone();
            let copied_workflow = next
                .active_config
                .projects
                .get(project_id)
                .and_then(|project| project.workflows.get(&base_name))
                .cloned()
                .expect("default workflow should exist");
            let project = Arc::make_mut(&mut next.active_config)
                .projects
                .entry(project_id.to_string())
                .or_insert_with(|| crate::config::ResolvedProject {
                    workspaces: HashMap::new(),
                    agents: HashMap::new(),
                    workflows: HashMap::new(),
                    step_templates: HashMap::new(),
                    env_stores: HashMap::new(),
                    secret_stores: HashMap::new(),
                    execution_profiles: HashMap::new(),
                });
            project
                .workflows
                .insert(updated_name.clone(), copied_workflow);
            (next, ())
        });

        let updated_exists = read_loaded_config(&state)
            .expect("re-read active config")
            .projects
            .get(project_id)
            .is_some_and(|project| project.workflows.contains_key(&updated_name));
        assert!(updated_exists);
    }

    #[test]
    fn read_active_config_rejects_non_runnable_snapshot() {
        let mut fixture = TestState::new();
        let state = fixture.build();

        let status_error = Some("active config is not runnable".to_string());
        replace_active_config_status(&state, status_error, None);

        let message = read_active_config(&state)
            .expect_err("non-runnable config should fail")
            .to_string();
        assert!(message.contains("not runnable"), "unexpected error: {message}");
    }

    #[tokio::test]
    async fn agent_health_and_metrics_reset_explicitly() {
        let mut fixture = TestState::new();
        let state = fixture.build();

        {
            let mut health_map = state.agent_health.write().await;
            health_map.insert("echo".to_string(), AgentHealthState::default());
            health_map.clear();
        }
        {
            let mut metrics_map = state.agent_metrics.write().await;
            metrics_map.insert("echo".to_string(), AgentMetrics::default());
            metrics_map.clear();
        }

        let health_empty = state.agent_health.read().await.is_empty();
        let metrics_empty = state.agent_metrics.read().await.is_empty();
        assert!(health_empty);
        assert!(metrics_empty);
    }

    #[test]
    fn poisoned_event_sink_recovers_with_tracing_sink() {
        let mut fixture = TestState::new();
        let state = fixture.build();

        let shared = state.clone();
        let poisoner = std::thread::spawn(move || {
            let _guard = shared.event_sink.write().expect("lock event_sink");
            panic!("poison event_sink");
        });
        let join_outcome = poisoner.join();
        assert!(join_outcome.is_err());

        let payload = serde_json::json!({"ok": true});
        state.emit_event("task-1", Some("item-1"), "heartbeat", payload);

        let lock_healthy = state.event_sink.read().is_ok();
        assert!(lock_healthy);
    }
}