// agent_orchestrator/state.rs

1use crate::config::ActiveConfig;
2use crate::config_load::ConfigSelfHealReport;
3use crate::events::{EventSink, TracingEventSink};
4use crate::metrics::{AgentHealthState, AgentMetrics, AgentRuntimeState};
5use crate::runtime::DaemonRuntimeState;
6use arc_swap::ArcSwap;
7use serde_json::Value;
8use std::collections::HashMap;
9use std::path::PathBuf;
10use std::sync::atomic::AtomicBool;
11use std::sync::{Arc, OnceLock};
12use tokio::process::Child;
13use tokio::sync::{Mutex, Notify};
14
15/// Maximum number of tasks that may run concurrently in-process.
16pub const MAX_CONCURRENT_TASKS: usize = 10;
17
18static TASK_SEMAPHORE: OnceLock<Arc<tokio::sync::Semaphore>> = OnceLock::new();
19
20/// Returns the global task-execution semaphore.
21pub fn task_semaphore() -> &'static Arc<tokio::sync::Semaphore> {
22    TASK_SEMAPHORE.get_or_init(|| Arc::new(tokio::sync::Semaphore::new(MAX_CONCURRENT_TASKS)))
23}
24
25/// Owned wrapper returned by bootstrap helpers.
26#[derive(Clone)]
27pub struct ManagedState {
28    /// Shared inner runtime state.
29    pub inner: Arc<InnerState>,
30}
31
32/// Snapshot of the currently active configuration and its status.
33#[derive(Clone)]
34pub struct ConfigRuntimeSnapshot {
35    /// Active runtime configuration.
36    pub active_config: Arc<ActiveConfig>,
37    /// Optional validation or load error for the active config.
38    pub active_config_error: Option<String>,
39    /// Optional self-heal notice associated with the active config.
40    pub active_config_notice: Option<ConfigSelfHealReport>,
41}
42
43impl ConfigRuntimeSnapshot {
44    /// Creates a runtime snapshot from configuration and status values.
45    pub fn new(
46        active_config: ActiveConfig,
47        active_config_error: Option<String>,
48        active_config_notice: Option<ConfigSelfHealReport>,
49    ) -> Self {
50        Self {
51            active_config: Arc::new(active_config),
52            active_config_error,
53            active_config_notice,
54        }
55    }
56}
57
58/// Shared daemon state referenced by services and scheduler code.
59pub struct InnerState {
60    /// Runtime data directory (`~/.orchestratord` by default).
61    pub data_dir: PathBuf,
62    /// SQLite database path.
63    pub db_path: PathBuf,
64    /// Whether unsafe mode is enabled.
65    pub unsafe_mode: bool,
66    /// Async database handle.
67    pub async_database: Arc<crate::async_database::AsyncDatabase>,
68    /// Directory containing task and command logs.
69    pub logs_dir: PathBuf,
70    /// Atomically swappable configuration snapshot.
71    pub config_runtime: ArcSwap<ConfigRuntimeSnapshot>,
72    /// Currently running tasks keyed by task ID.
73    pub running: Mutex<HashMap<String, RunningTask>>,
74    /// Runtime agent-health map.
75    pub agent_health: tokio::sync::RwLock<HashMap<String, AgentHealthState>>,
76    /// Runtime agent metrics map.
77    pub agent_metrics: tokio::sync::RwLock<HashMap<String, AgentMetrics>>,
78    /// Runtime agent lifecycle map.
79    pub agent_lifecycle: tokio::sync::RwLock<HashMap<String, AgentRuntimeState>>,
80    // FR-016 sync exception: event emission must remain callable from sync and async
81    // paths without making the EventSink interface async. This lock is an
82    // observability boundary, not async main-path shared business state.
83    /// Event sink used by synchronous and asynchronous execution paths.
84    pub event_sink: std::sync::RwLock<Arc<dyn EventSink>>,
85    /// Serialized database write coordinator.
86    pub db_writer: Arc<crate::db_write::DbWriteCoordinator>,
87    /// Interactive session store.
88    pub session_store: Arc<crate::session_store::AsyncSessionStore>,
89    /// Async task repository wrapper.
90    pub task_repo: Arc<crate::task_repository::AsyncSqliteTaskRepository>,
91    /// Workflow store manager.
92    pub store_manager: crate::store::StoreManager,
93    /// Plugin security policy (loaded from `{data_dir}/plugin-policy.yaml`).
94    pub plugin_policy: orchestrator_config::plugin_policy::PluginPolicy,
95    /// Runtime daemon lifecycle state.
96    pub daemon_runtime: DaemonRuntimeState,
97    /// In-process wakeup channel for idle workers.
98    pub worker_notify: Arc<Notify>,
99    /// Broadcast channel for trigger-relevant task events (task_completed / task_failed).
100    pub trigger_event_tx:
101        tokio::sync::broadcast::Sender<crate::trigger_engine::TriggerEventPayload>,
102    /// Handle for notifying the trigger engine of config changes.
103    pub trigger_engine_handle: std::sync::Mutex<Option<crate::trigger_engine::TriggerEngineHandle>>,
104    /// Handle for notifying the filesystem watcher of config changes.
105    /// Set by the daemon; `None` in CLI-only contexts.
106    pub fs_watcher_reload_tx: std::sync::Mutex<Option<tokio::sync::mpsc::Sender<()>>>,
107    /// Scheduler port for cross-crate task enqueue dispatch.
108    ///
109    /// Wired with the real scheduler implementation by the daemon; defaults to
110    /// [`NoopTaskEnqueuer`](crate::scheduler_port::NoopTaskEnqueuer) in tests
111    /// and CLI-only contexts.
112    pub task_enqueuer: Arc<dyn crate::scheduler_port::TaskEnqueuer>,
113}
114
115impl InnerState {
116    /// Emits an event through the currently configured event sink.
117    ///
118    /// When the event is `task_completed` or `task_failed`, it is also broadcast
119    /// on `trigger_event_tx` so the trigger engine can evaluate event triggers.
120    pub fn emit_event(
121        &self,
122        task_id: &str,
123        task_item_id: Option<&str>,
124        event_type: &str,
125        payload: Value,
126    ) {
127        let sink = clone_event_sink(self);
128        sink.emit(task_id, task_item_id, event_type, payload);
129
130        // Broadcast to trigger engine for event-driven triggers.
131        if matches!(event_type, "task_completed" | "task_failed") {
132            crate::trigger_engine::broadcast_task_event(
133                self,
134                crate::trigger_engine::TriggerEventPayload {
135                    event_type: event_type.to_string(),
136                    task_id: task_id.to_string(),
137                    payload: None,
138                    project: None,
139                },
140            );
141        }
142    }
143}
144
145/// Mutable runtime handle for a task process.
146#[derive(Clone)]
147pub struct RunningTask {
148    /// Shared stop flag observed by all forked execution branches.
149    pub stop_flag: Arc<AtomicBool>,
150    /// Handle to the currently running child process, if any.
151    pub child: Arc<Mutex<Option<Child>>>,
152}
153
154impl Default for RunningTask {
155    fn default() -> Self {
156        Self::new()
157    }
158}
159
160impl RunningTask {
161    /// Creates an empty running-task handle.
162    pub fn new() -> Self {
163        Self {
164            stop_flag: Arc::new(AtomicBool::new(false)),
165            child: Arc::new(Mutex::new(None)),
166        }
167    }
168
169    /// Create a sibling that shares `stop_flag` but has its own `child` slot.
170    /// Used for parallel item execution — stopping the task sets the shared flag,
171    /// which all forked items observe.
172    pub fn fork(&self) -> Self {
173        Self {
174            stop_flag: Arc::clone(&self.stop_flag),
175            child: Arc::new(Mutex::new(None)),
176        }
177    }
178}
179
180/// Loads the current configuration snapshot.
181pub fn config_runtime_snapshot(state: &InnerState) -> Arc<ConfigRuntimeSnapshot> {
182    state.config_runtime.load_full()
183}
184
185/// Replaces the current configuration snapshot atomically.
186pub fn set_config_runtime_snapshot(state: &InnerState, snapshot: ConfigRuntimeSnapshot) {
187    state.config_runtime.store(Arc::new(snapshot));
188}
189
190/// Updates the configuration snapshot using a read-modify-write closure.
191pub fn update_config_runtime<R>(
192    state: &InnerState,
193    f: impl FnOnce(&ConfigRuntimeSnapshot) -> (ConfigRuntimeSnapshot, R),
194) -> R {
195    let current = state.config_runtime.load_full();
196    let (next, result) = f(current.as_ref());
197    state.config_runtime.store(Arc::new(next));
198    result
199}
200
201/// Replaces only the active config while preserving status fields.
202pub fn replace_active_config(state: &InnerState, active_config: ActiveConfig) {
203    update_config_runtime(state, |current| {
204        (
205            ConfigRuntimeSnapshot {
206                active_config: Arc::new(active_config),
207                active_config_error: current.active_config_error.clone(),
208                active_config_notice: current.active_config_notice.clone(),
209            },
210            (),
211        )
212    });
213}
214
215/// Replaces the active config status fields while preserving the config itself.
216pub fn replace_active_config_status(
217    state: &InnerState,
218    active_config_error: Option<String>,
219    active_config_notice: Option<ConfigSelfHealReport>,
220) {
221    update_config_runtime(state, |current| {
222        (
223            ConfigRuntimeSnapshot {
224                active_config: Arc::clone(&current.active_config),
225                active_config_error,
226                active_config_notice,
227            },
228            (),
229        )
230    });
231}
232
233/// Clears active-config error and notice state.
234pub fn clear_active_config_status(state: &InnerState) {
235    replace_active_config_status(state, None, None);
236}
237
238/// Resets the active config to an empty default snapshot.
239pub fn reset_active_config_to_default(state: &InnerState) {
240    set_config_runtime_snapshot(
241        state,
242        ConfigRuntimeSnapshot::new(
243            ActiveConfig {
244                config: Default::default(),
245                workspaces: Default::default(),
246                projects: Default::default(),
247            },
248            None,
249            None,
250        ),
251    );
252}
253
254/// Clones the current event sink, recovering from poisoning if needed.
255pub fn clone_event_sink(state: &InnerState) -> Arc<dyn EventSink> {
256    match state.event_sink.read() {
257        Ok(guard) => guard.clone(),
258        Err(err) => {
259            drop(err.into_inner());
260            let mut guard = state
261                .event_sink
262                .write()
263                .unwrap_or_else(|poisoned| poisoned.into_inner());
264            *guard = Arc::new(TracingEventSink::new());
265            state.event_sink.clear_poison();
266            guard.clone()
267        }
268    }
269}
270
271/// Replaces the current event sink, recovering from poisoning if needed.
272pub fn replace_event_sink(state: &InnerState, sink: Arc<dyn EventSink>) {
273    match state.event_sink.write() {
274        Ok(mut guard) => *guard = sink,
275        Err(err) => {
276            let mut guard = err.into_inner();
277            *guard = sink;
278            state.event_sink.clear_poison();
279        }
280    }
281}
282
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config_load::{read_active_config, read_loaded_config};
    use crate::test_utils::TestState;
    use std::sync::atomic::Ordering;

    /// A new handle has its stop flag unset and holds no child process.
    #[test]
    fn running_task_starts_with_defaults() {
        let handle = RunningTask::new();

        let stop_requested = handle.stop_flag.load(Ordering::SeqCst);
        assert!(!stop_requested);

        let child_slot = handle.child.try_lock().expect("lock child");
        assert!(child_slot.is_none());
    }

    /// Entries written to the agent health and metrics maps can be read back.
    #[tokio::test]
    async fn state_accessors_round_trip_agent_maps() {
        let mut fixture = TestState::new();
        let state = fixture.build();

        let health_entry = AgentHealthState {
            consecutive_errors: 1,
            diseased_until: None,
            total_lifetime_errors: 1,
            capability_health: HashMap::new(),
        };
        state
            .agent_health
            .write()
            .await
            .insert("echo".to_string(), health_entry);

        let metrics_entry = AgentMetrics::default();
        state
            .agent_metrics
            .write()
            .await
            .insert("echo".to_string(), metrics_entry);

        let health_present = state.agent_health.read().await.contains_key("echo");
        assert!(health_present);
        let metrics_present = state.agent_metrics.read().await.contains_key("echo");
        assert!(metrics_present);
    }

    /// Emitting an event on fixture-built state completes without panicking.
    #[test]
    fn emit_event_is_safe_with_noop_sink() {
        let mut fixture = TestState::new();
        let state = fixture.build();

        let payload = serde_json::json!({"ok": true});
        state.emit_event("task-1", Some("item-1"), "heartbeat", payload);
    }

    /// A snapshot produced inside `update_config_runtime` is what later reads see.
    #[test]
    fn update_config_runtime_replaces_snapshot_without_exposing_guards() {
        let mut fixture = TestState::new();
        let state = fixture.build();

        // Name of the first workflow in the default project (empty if none).
        let original = read_loaded_config(&state)
            .expect("read loaded config")
            .projects
            .get(crate::config::DEFAULT_PROJECT_ID)
            .and_then(|project| project.workflows.keys().next())
            .cloned()
            .unwrap_or_default();
        let updated_key = format!("{}-updated", original);

        update_config_runtime(&state, |current| {
            let mut next = current.clone();

            // Copy the existing workflow so it can be re-inserted under the new key.
            let copied_workflow = next
                .active_config
                .projects
                .get(crate::config::DEFAULT_PROJECT_ID)
                .and_then(|project| project.workflows.get(&original))
                .cloned()
                .expect("default workflow should exist");

            let config = Arc::make_mut(&mut next.active_config);
            let project = config
                .projects
                .entry(crate::config::DEFAULT_PROJECT_ID.to_string())
                .or_insert_with(|| crate::config::ResolvedProject {
                    workspaces: HashMap::new(),
                    agents: HashMap::new(),
                    workflows: HashMap::new(),
                    step_templates: HashMap::new(),
                    env_stores: HashMap::new(),
                    secret_stores: HashMap::new(),
                    execution_profiles: HashMap::new(),
                });
            project.workflows.insert(updated_key.clone(), copied_workflow);

            (next, ())
        });

        let updated_exists = read_loaded_config(&state)
            .expect("re-read active config")
            .projects
            .get(crate::config::DEFAULT_PROJECT_ID)
            .map(|project| project.workflows.contains_key(&updated_key))
            .unwrap_or(false);
        assert!(updated_exists);
    }

    /// With an error recorded on the snapshot, `read_active_config` fails.
    #[test]
    fn read_active_config_rejects_non_runnable_snapshot() {
        let mut fixture = TestState::new();
        let state = fixture.build();

        let recorded_error = Some("active config is not runnable".to_string());
        replace_active_config_status(&state, recorded_error, None);

        let error = read_active_config(&state).expect_err("non-runnable config should fail");
        let message = error.to_string();
        assert!(message.contains("not runnable"));
    }

    /// Clearing the agent maps leaves both of them empty.
    #[tokio::test]
    async fn agent_health_and_metrics_reset_explicitly() {
        let mut fixture = TestState::new();
        let state = fixture.build();

        let health_entry = AgentHealthState::default();
        state
            .agent_health
            .write()
            .await
            .insert("echo".to_string(), health_entry);
        let metrics_entry = AgentMetrics::default();
        state
            .agent_metrics
            .write()
            .await
            .insert("echo".to_string(), metrics_entry);

        state.agent_health.write().await.clear();
        state.agent_metrics.write().await.clear();

        let health_empty = state.agent_health.read().await.is_empty();
        assert!(health_empty);
        let metrics_empty = state.agent_metrics.read().await.is_empty();
        assert!(metrics_empty);
    }

    /// After a writer panics while holding the sink lock, emitting an event
    /// still works and the lock is no longer poisoned afterwards.
    #[test]
    fn poisoned_event_sink_recovers_with_tracing_sink() {
        let mut fixture = TestState::new();
        let state = fixture.build();

        // Poison the lock by panicking on another thread while holding the write guard.
        let poisoner = {
            let state = state.clone();
            std::thread::spawn(move || {
                let _guard = state.event_sink.write().expect("lock event_sink");
                panic!("poison event_sink");
            })
        };
        assert!(poisoner.join().is_err());

        let payload = serde_json::json!({"ok": true});
        state.emit_event("task-1", Some("item-1"), "heartbeat", payload);

        assert!(state.event_sink.read().is_ok());
    }
}