Skip to main content

agent_orchestrator/
state.rs

1use crate::collab::MessageBus;
2use crate::config::ActiveConfig;
3use crate::config_load::ConfigSelfHealReport;
4use crate::events::{EventSink, TracingEventSink};
5use crate::metrics::{AgentHealthState, AgentMetrics, AgentRuntimeState};
6use crate::runtime::DaemonRuntimeState;
7use arc_swap::ArcSwap;
8use serde_json::Value;
9use std::collections::HashMap;
10use std::path::PathBuf;
11use std::sync::atomic::AtomicBool;
12use std::sync::{Arc, OnceLock};
13use tokio::process::Child;
14use tokio::sync::{Mutex, Notify};
15
16/// Maximum number of tasks that may run concurrently in-process.
17pub const MAX_CONCURRENT_TASKS: usize = 10;
18
19static TASK_SEMAPHORE: OnceLock<Arc<tokio::sync::Semaphore>> = OnceLock::new();
20
21/// Returns the global task-execution semaphore.
22pub fn task_semaphore() -> &'static Arc<tokio::sync::Semaphore> {
23    TASK_SEMAPHORE.get_or_init(|| Arc::new(tokio::sync::Semaphore::new(MAX_CONCURRENT_TASKS)))
24}
25
26/// Owned wrapper returned by bootstrap helpers.
27#[derive(Clone)]
28pub struct ManagedState {
29    /// Shared inner runtime state.
30    pub inner: Arc<InnerState>,
31}
32
33/// Snapshot of the currently active configuration and its status.
34#[derive(Clone)]
35pub struct ConfigRuntimeSnapshot {
36    /// Active runtime configuration.
37    pub active_config: Arc<ActiveConfig>,
38    /// Optional validation or load error for the active config.
39    pub active_config_error: Option<String>,
40    /// Optional self-heal notice associated with the active config.
41    pub active_config_notice: Option<ConfigSelfHealReport>,
42}
43
44impl ConfigRuntimeSnapshot {
45    /// Creates a runtime snapshot from configuration and status values.
46    pub fn new(
47        active_config: ActiveConfig,
48        active_config_error: Option<String>,
49        active_config_notice: Option<ConfigSelfHealReport>,
50    ) -> Self {
51        Self {
52            active_config: Arc::new(active_config),
53            active_config_error,
54            active_config_notice,
55        }
56    }
57}
58
59/// Shared daemon state referenced by services and scheduler code.
60pub struct InnerState {
61    /// Runtime data directory (`~/.orchestratord` by default).
62    pub data_dir: PathBuf,
63    /// SQLite database path.
64    pub db_path: PathBuf,
65    /// Whether unsafe mode is enabled.
66    pub unsafe_mode: bool,
67    /// Async database handle.
68    pub async_database: Arc<crate::async_database::AsyncDatabase>,
69    /// Directory containing task and command logs.
70    pub logs_dir: PathBuf,
71    /// Atomically swappable configuration snapshot.
72    pub config_runtime: ArcSwap<ConfigRuntimeSnapshot>,
73    /// Currently running tasks keyed by task ID.
74    pub running: Mutex<HashMap<String, RunningTask>>,
75    /// Runtime agent-health map.
76    pub agent_health: tokio::sync::RwLock<HashMap<String, AgentHealthState>>,
77    /// Runtime agent metrics map.
78    pub agent_metrics: tokio::sync::RwLock<HashMap<String, AgentMetrics>>,
79    /// Runtime agent lifecycle map.
80    pub agent_lifecycle: tokio::sync::RwLock<HashMap<String, AgentRuntimeState>>,
81    /// Collaboration message bus.
82    pub message_bus: Arc<MessageBus>,
83    // FR-016 sync exception: event emission must remain callable from sync and async
84    // paths without making the EventSink interface async. This lock is an
85    // observability boundary, not async main-path shared business state.
86    /// Event sink used by synchronous and asynchronous execution paths.
87    pub event_sink: std::sync::RwLock<Arc<dyn EventSink>>,
88    /// Serialized database write coordinator.
89    pub db_writer: Arc<crate::db_write::DbWriteCoordinator>,
90    /// Interactive session store.
91    pub session_store: Arc<crate::session_store::AsyncSessionStore>,
92    /// Async task repository wrapper.
93    pub task_repo: Arc<crate::task_repository::AsyncSqliteTaskRepository>,
94    /// Workflow store manager.
95    pub store_manager: crate::store::StoreManager,
96    /// Runtime daemon lifecycle state.
97    pub daemon_runtime: DaemonRuntimeState,
98    /// In-process wakeup channel for idle workers.
99    pub worker_notify: Arc<Notify>,
100    /// Broadcast channel for trigger-relevant task events (task_completed / task_failed).
101    pub trigger_event_tx:
102        tokio::sync::broadcast::Sender<crate::trigger_engine::TriggerEventPayload>,
103    /// Handle for notifying the trigger engine of config changes.
104    pub trigger_engine_handle: std::sync::Mutex<Option<crate::trigger_engine::TriggerEngineHandle>>,
105}
106
107impl InnerState {
108    /// Emits an event through the currently configured event sink.
109    ///
110    /// When the event is `task_completed` or `task_failed`, it is also broadcast
111    /// on `trigger_event_tx` so the trigger engine can evaluate event triggers.
112    pub fn emit_event(
113        &self,
114        task_id: &str,
115        task_item_id: Option<&str>,
116        event_type: &str,
117        payload: Value,
118    ) {
119        let sink = clone_event_sink(self);
120        sink.emit(task_id, task_item_id, event_type, payload);
121
122        // Broadcast to trigger engine for event-driven triggers.
123        if matches!(event_type, "task_completed" | "task_failed") {
124            crate::trigger_engine::broadcast_task_event(
125                self,
126                crate::trigger_engine::TriggerEventPayload {
127                    event_type: event_type.to_string(),
128                    task_id: task_id.to_string(),
129                },
130            );
131        }
132    }
133}
134
135/// Mutable runtime handle for a task process.
136#[derive(Clone)]
137pub struct RunningTask {
138    /// Shared stop flag observed by all forked execution branches.
139    pub stop_flag: Arc<AtomicBool>,
140    /// Handle to the currently running child process, if any.
141    pub child: Arc<Mutex<Option<Child>>>,
142}
143
144impl Default for RunningTask {
145    fn default() -> Self {
146        Self::new()
147    }
148}
149
150impl RunningTask {
151    /// Creates an empty running-task handle.
152    pub fn new() -> Self {
153        Self {
154            stop_flag: Arc::new(AtomicBool::new(false)),
155            child: Arc::new(Mutex::new(None)),
156        }
157    }
158
159    /// Create a sibling that shares `stop_flag` but has its own `child` slot.
160    /// Used for parallel item execution — stopping the task sets the shared flag,
161    /// which all forked items observe.
162    pub fn fork(&self) -> Self {
163        Self {
164            stop_flag: Arc::clone(&self.stop_flag),
165            child: Arc::new(Mutex::new(None)),
166        }
167    }
168}
169
170/// Loads the current configuration snapshot.
171pub fn config_runtime_snapshot(state: &InnerState) -> Arc<ConfigRuntimeSnapshot> {
172    state.config_runtime.load_full()
173}
174
175/// Replaces the current configuration snapshot atomically.
176pub fn set_config_runtime_snapshot(state: &InnerState, snapshot: ConfigRuntimeSnapshot) {
177    state.config_runtime.store(Arc::new(snapshot));
178}
179
180/// Updates the configuration snapshot using a read-modify-write closure.
181pub fn update_config_runtime<R>(
182    state: &InnerState,
183    f: impl FnOnce(&ConfigRuntimeSnapshot) -> (ConfigRuntimeSnapshot, R),
184) -> R {
185    let current = state.config_runtime.load_full();
186    let (next, result) = f(current.as_ref());
187    state.config_runtime.store(Arc::new(next));
188    result
189}
190
191/// Replaces only the active config while preserving status fields.
192pub fn replace_active_config(state: &InnerState, active_config: ActiveConfig) {
193    update_config_runtime(state, |current| {
194        (
195            ConfigRuntimeSnapshot {
196                active_config: Arc::new(active_config),
197                active_config_error: current.active_config_error.clone(),
198                active_config_notice: current.active_config_notice.clone(),
199            },
200            (),
201        )
202    });
203}
204
205/// Replaces the active config status fields while preserving the config itself.
206pub fn replace_active_config_status(
207    state: &InnerState,
208    active_config_error: Option<String>,
209    active_config_notice: Option<ConfigSelfHealReport>,
210) {
211    update_config_runtime(state, |current| {
212        (
213            ConfigRuntimeSnapshot {
214                active_config: Arc::clone(&current.active_config),
215                active_config_error,
216                active_config_notice,
217            },
218            (),
219        )
220    });
221}
222
223/// Clears active-config error and notice state.
224pub fn clear_active_config_status(state: &InnerState) {
225    replace_active_config_status(state, None, None);
226}
227
228/// Resets the active config to an empty default snapshot.
229pub fn reset_active_config_to_default(state: &InnerState) {
230    set_config_runtime_snapshot(
231        state,
232        ConfigRuntimeSnapshot::new(
233            ActiveConfig {
234                config: Default::default(),
235                workspaces: Default::default(),
236                projects: Default::default(),
237            },
238            None,
239            None,
240        ),
241    );
242}
243
244/// Clones the current event sink, recovering from poisoning if needed.
245pub fn clone_event_sink(state: &InnerState) -> Arc<dyn EventSink> {
246    match state.event_sink.read() {
247        Ok(guard) => guard.clone(),
248        Err(err) => {
249            drop(err.into_inner());
250            let mut guard = state
251                .event_sink
252                .write()
253                .unwrap_or_else(|poisoned| poisoned.into_inner());
254            *guard = Arc::new(TracingEventSink::new());
255            state.event_sink.clear_poison();
256            guard.clone()
257        }
258    }
259}
260
261/// Replaces the current event sink, recovering from poisoning if needed.
262pub fn replace_event_sink(state: &InnerState, sink: Arc<dyn EventSink>) {
263    match state.event_sink.write() {
264        Ok(mut guard) => *guard = sink,
265        Err(err) => {
266            let mut guard = err.into_inner();
267            *guard = sink;
268            state.event_sink.clear_poison();
269        }
270    }
271}
272
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{ResolvedProject, DEFAULT_PROJECT_ID};
    use crate::config_load::{read_active_config, read_loaded_config};
    use crate::test_utils::TestState;
    use std::sync::atomic::Ordering;

    #[test]
    fn running_task_starts_with_defaults() {
        let handle = RunningTask::new();

        let stop_requested = handle.stop_flag.load(Ordering::SeqCst);
        assert!(!stop_requested);

        let child_slot = handle.child.try_lock().expect("lock child");
        assert!(child_slot.is_none());
    }

    #[tokio::test]
    async fn state_accessors_round_trip_agent_maps() {
        let mut fixture = TestState::new();
        let state = fixture.build();

        let health = AgentHealthState {
            consecutive_errors: 1,
            diseased_until: None,
            total_lifetime_errors: 1,
            capability_health: HashMap::new(),
        };
        state
            .agent_health
            .write()
            .await
            .insert("echo".to_string(), health);
        state
            .agent_metrics
            .write()
            .await
            .insert("echo".to_string(), AgentMetrics::default());

        assert!(state.agent_health.read().await.contains_key("echo"));
        assert!(state.agent_metrics.read().await.contains_key("echo"));
    }

    #[test]
    fn emit_event_is_safe_with_noop_sink() {
        let mut fixture = TestState::new();
        let state = fixture.build();

        let payload = serde_json::json!({"ok": true});
        state.emit_event("task-1", Some("item-1"), "heartbeat", payload);
    }

    #[test]
    fn update_config_runtime_replaces_snapshot_without_exposing_guards() {
        let mut fixture = TestState::new();
        let state = fixture.build();

        let original = read_loaded_config(&state)
            .expect("read loaded config")
            .projects
            .get(DEFAULT_PROJECT_ID)
            .and_then(|project| project.workflows.keys().next())
            .cloned()
            .unwrap_or_default();
        let updated_key = format!("{}-updated", original);

        update_config_runtime(&state, |current| {
            let mut next = current.clone();
            let workflow = next
                .active_config
                .projects
                .get(DEFAULT_PROJECT_ID)
                .and_then(|project| project.workflows.get(&original))
                .cloned()
                .expect("default workflow should exist");
            let project = Arc::make_mut(&mut next.active_config)
                .projects
                .entry(DEFAULT_PROJECT_ID.to_string())
                .or_insert_with(|| ResolvedProject {
                    workspaces: HashMap::new(),
                    agents: HashMap::new(),
                    workflows: HashMap::new(),
                    step_templates: HashMap::new(),
                    env_stores: HashMap::new(),
                    execution_profiles: HashMap::new(),
                });
            project.workflows.insert(updated_key.clone(), workflow);
            (next, ())
        });

        let updated_exists = read_loaded_config(&state)
            .expect("re-read active config")
            .projects
            .get(DEFAULT_PROJECT_ID)
            .is_some_and(|project| project.workflows.contains_key(&updated_key));
        assert!(updated_exists);
    }

    #[test]
    fn read_active_config_rejects_non_runnable_snapshot() {
        let mut fixture = TestState::new();
        let state = fixture.build();

        let status_error = Some("active config is not runnable".to_string());
        replace_active_config_status(&state, status_error, None);

        let error = read_active_config(&state).expect_err("non-runnable config should fail");
        assert!(error.to_string().contains("not runnable"));
    }

    #[tokio::test]
    async fn agent_health_and_metrics_reset_explicitly() {
        let mut fixture = TestState::new();
        let state = fixture.build();

        state
            .agent_health
            .write()
            .await
            .insert("echo".to_string(), AgentHealthState::default());
        state
            .agent_metrics
            .write()
            .await
            .insert("echo".to_string(), AgentMetrics::default());

        state.agent_health.write().await.clear();
        state.agent_metrics.write().await.clear();

        assert!(state.agent_health.read().await.is_empty());
        assert!(state.agent_metrics.read().await.is_empty());
    }

    #[test]
    fn poisoned_event_sink_recovers_with_tracing_sink() {
        let mut fixture = TestState::new();
        let state = fixture.build();

        // Poison the lock by panicking while holding its write guard.
        let poisoner = {
            let state = state.clone();
            std::thread::spawn(move || {
                let _guard = state.event_sink.write().expect("lock event_sink");
                panic!("poison event_sink");
            })
        };
        assert!(poisoner.join().is_err());

        let payload = serde_json::json!({"ok": true});
        state.emit_event("task-1", Some("item-1"), "heartbeat", payload);

        assert!(state.event_sink.read().is_ok());
    }
}