1use crate::collab::MessageBus;
2use crate::config::ActiveConfig;
3use crate::config_load::ConfigSelfHealReport;
4use crate::events::{EventSink, TracingEventSink};
5use crate::metrics::{AgentHealthState, AgentMetrics, AgentRuntimeState};
6use crate::runtime::DaemonRuntimeState;
7use arc_swap::ArcSwap;
8use serde_json::Value;
9use std::collections::HashMap;
10use std::path::PathBuf;
11use std::sync::atomic::AtomicBool;
12use std::sync::{Arc, OnceLock};
13use tokio::process::Child;
14use tokio::sync::{Mutex, Notify};
15
/// Number of permits the process-wide task semaphore is created with
/// (see [`task_semaphore`]).
pub const MAX_CONCURRENT_TASKS: usize = 10;
18
19static TASK_SEMAPHORE: OnceLock<Arc<tokio::sync::Semaphore>> = OnceLock::new();
20
21pub fn task_semaphore() -> &'static Arc<tokio::sync::Semaphore> {
23 TASK_SEMAPHORE.get_or_init(|| Arc::new(tokio::sync::Semaphore::new(MAX_CONCURRENT_TASKS)))
24}
25
26#[derive(Clone)]
28pub struct ManagedState {
29 pub inner: Arc<InnerState>,
31}
32
33#[derive(Clone)]
35pub struct ConfigRuntimeSnapshot {
36 pub active_config: Arc<ActiveConfig>,
38 pub active_config_error: Option<String>,
40 pub active_config_notice: Option<ConfigSelfHealReport>,
42}
43
44impl ConfigRuntimeSnapshot {
45 pub fn new(
47 active_config: ActiveConfig,
48 active_config_error: Option<String>,
49 active_config_notice: Option<ConfigSelfHealReport>,
50 ) -> Self {
51 Self {
52 active_config: Arc::new(active_config),
53 active_config_error,
54 active_config_notice,
55 }
56 }
57}
58
59pub struct InnerState {
61 pub data_dir: PathBuf,
63 pub db_path: PathBuf,
65 pub unsafe_mode: bool,
67 pub async_database: Arc<crate::async_database::AsyncDatabase>,
69 pub logs_dir: PathBuf,
71 pub config_runtime: ArcSwap<ConfigRuntimeSnapshot>,
73 pub running: Mutex<HashMap<String, RunningTask>>,
75 pub agent_health: tokio::sync::RwLock<HashMap<String, AgentHealthState>>,
77 pub agent_metrics: tokio::sync::RwLock<HashMap<String, AgentMetrics>>,
79 pub agent_lifecycle: tokio::sync::RwLock<HashMap<String, AgentRuntimeState>>,
81 pub message_bus: Arc<MessageBus>,
83 pub event_sink: std::sync::RwLock<Arc<dyn EventSink>>,
88 pub db_writer: Arc<crate::db_write::DbWriteCoordinator>,
90 pub session_store: Arc<crate::session_store::AsyncSessionStore>,
92 pub task_repo: Arc<crate::task_repository::AsyncSqliteTaskRepository>,
94 pub store_manager: crate::store::StoreManager,
96 pub daemon_runtime: DaemonRuntimeState,
98 pub worker_notify: Arc<Notify>,
100 pub trigger_event_tx:
102 tokio::sync::broadcast::Sender<crate::trigger_engine::TriggerEventPayload>,
103 pub trigger_engine_handle: std::sync::Mutex<Option<crate::trigger_engine::TriggerEngineHandle>>,
105}
106
107impl InnerState {
108 pub fn emit_event(
113 &self,
114 task_id: &str,
115 task_item_id: Option<&str>,
116 event_type: &str,
117 payload: Value,
118 ) {
119 let sink = clone_event_sink(self);
120 sink.emit(task_id, task_item_id, event_type, payload);
121
122 if matches!(event_type, "task_completed" | "task_failed") {
124 crate::trigger_engine::broadcast_task_event(
125 self,
126 crate::trigger_engine::TriggerEventPayload {
127 event_type: event_type.to_string(),
128 task_id: task_id.to_string(),
129 payload: None,
130 },
131 );
132 }
133 }
134}
135
136#[derive(Clone)]
138pub struct RunningTask {
139 pub stop_flag: Arc<AtomicBool>,
141 pub child: Arc<Mutex<Option<Child>>>,
143}
144
145impl Default for RunningTask {
146 fn default() -> Self {
147 Self::new()
148 }
149}
150
151impl RunningTask {
152 pub fn new() -> Self {
154 Self {
155 stop_flag: Arc::new(AtomicBool::new(false)),
156 child: Arc::new(Mutex::new(None)),
157 }
158 }
159
160 pub fn fork(&self) -> Self {
164 Self {
165 stop_flag: Arc::clone(&self.stop_flag),
166 child: Arc::new(Mutex::new(None)),
167 }
168 }
169}
170
171pub fn config_runtime_snapshot(state: &InnerState) -> Arc<ConfigRuntimeSnapshot> {
173 state.config_runtime.load_full()
174}
175
176pub fn set_config_runtime_snapshot(state: &InnerState, snapshot: ConfigRuntimeSnapshot) {
178 state.config_runtime.store(Arc::new(snapshot));
179}
180
181pub fn update_config_runtime<R>(
183 state: &InnerState,
184 f: impl FnOnce(&ConfigRuntimeSnapshot) -> (ConfigRuntimeSnapshot, R),
185) -> R {
186 let current = state.config_runtime.load_full();
187 let (next, result) = f(current.as_ref());
188 state.config_runtime.store(Arc::new(next));
189 result
190}
191
192pub fn replace_active_config(state: &InnerState, active_config: ActiveConfig) {
194 update_config_runtime(state, |current| {
195 (
196 ConfigRuntimeSnapshot {
197 active_config: Arc::new(active_config),
198 active_config_error: current.active_config_error.clone(),
199 active_config_notice: current.active_config_notice.clone(),
200 },
201 (),
202 )
203 });
204}
205
206pub fn replace_active_config_status(
208 state: &InnerState,
209 active_config_error: Option<String>,
210 active_config_notice: Option<ConfigSelfHealReport>,
211) {
212 update_config_runtime(state, |current| {
213 (
214 ConfigRuntimeSnapshot {
215 active_config: Arc::clone(¤t.active_config),
216 active_config_error,
217 active_config_notice,
218 },
219 (),
220 )
221 });
222}
223
224pub fn clear_active_config_status(state: &InnerState) {
226 replace_active_config_status(state, None, None);
227}
228
229pub fn reset_active_config_to_default(state: &InnerState) {
231 set_config_runtime_snapshot(
232 state,
233 ConfigRuntimeSnapshot::new(
234 ActiveConfig {
235 config: Default::default(),
236 workspaces: Default::default(),
237 projects: Default::default(),
238 },
239 None,
240 None,
241 ),
242 );
243}
244
245pub fn clone_event_sink(state: &InnerState) -> Arc<dyn EventSink> {
247 match state.event_sink.read() {
248 Ok(guard) => guard.clone(),
249 Err(err) => {
250 drop(err.into_inner());
251 let mut guard = state
252 .event_sink
253 .write()
254 .unwrap_or_else(|poisoned| poisoned.into_inner());
255 *guard = Arc::new(TracingEventSink::new());
256 state.event_sink.clear_poison();
257 guard.clone()
258 }
259 }
260}
261
262pub fn replace_event_sink(state: &InnerState, sink: Arc<dyn EventSink>) {
264 match state.event_sink.write() {
265 Ok(mut guard) => *guard = sink,
266 Err(err) => {
267 let mut guard = err.into_inner();
268 *guard = sink;
269 state.event_sink.clear_poison();
270 }
271 }
272}
273
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config_load::{read_active_config, read_loaded_config};
    use crate::test_utils::TestState;
    use std::sync::atomic::Ordering;

    /// A new `RunningTask` has its stop flag cleared and holds no child.
    #[test]
    fn running_task_starts_with_defaults() {
        let task = RunningTask::new();

        let stop_requested = task.stop_flag.load(Ordering::SeqCst);
        assert!(!stop_requested);

        let child_slot = task.child.try_lock().expect("lock child");
        assert!(child_slot.is_none());
    }

    /// Entries written to the agent maps can be read back.
    #[tokio::test]
    async fn state_accessors_round_trip_agent_maps() {
        let mut fixture = TestState::new();
        let state = fixture.build();

        let health = AgentHealthState {
            consecutive_errors: 1,
            diseased_until: None,
            total_lifetime_errors: 1,
            capability_health: HashMap::new(),
        };
        state
            .agent_health
            .write()
            .await
            .insert("echo".to_string(), health);
        state
            .agent_metrics
            .write()
            .await
            .insert("echo".to_string(), AgentMetrics::default());

        let health_present = state.agent_health.read().await.contains_key("echo");
        assert!(health_present);
        let metrics_present = state.agent_metrics.read().await.contains_key("echo");
        assert!(metrics_present);
    }

    /// Emitting a non-terminal event through the fixture's sink must not
    /// panic.
    #[test]
    fn emit_event_is_safe_with_noop_sink() {
        let mut fixture = TestState::new();
        let state = fixture.build();

        let payload = serde_json::json!({"ok": true});
        state.emit_event("task-1", Some("item-1"), "heartbeat", payload);
    }

    /// `update_config_runtime` stores the snapshot returned by the closure.
    #[test]
    fn update_config_runtime_replaces_snapshot_without_exposing_guards() {
        let mut fixture = TestState::new();
        let state = fixture.build();

        // Name of the first workflow in the default project (empty if none).
        let original = read_loaded_config(&state)
            .expect("read loaded config")
            .projects
            .get(crate::config::DEFAULT_PROJECT_ID)
            .and_then(|project| project.workflows.keys().next())
            .cloned()
            .unwrap_or_default();
        let updated_key = format!("{}-updated", original);

        update_config_runtime(&state, |current| {
            let mut next = current.clone();
            let workflow_clone = next
                .active_config
                .projects
                .get(crate::config::DEFAULT_PROJECT_ID)
                .and_then(|project| project.workflows.get(&original))
                .cloned()
                .expect("default workflow should exist");

            let config = Arc::make_mut(&mut next.active_config);
            let project = config
                .projects
                .entry(crate::config::DEFAULT_PROJECT_ID.to_string())
                .or_insert_with(|| crate::config::ResolvedProject {
                    workspaces: HashMap::new(),
                    agents: HashMap::new(),
                    workflows: HashMap::new(),
                    step_templates: HashMap::new(),
                    env_stores: HashMap::new(),
                    execution_profiles: HashMap::new(),
                });
            project.workflows.insert(updated_key.clone(), workflow_clone);

            (next, ())
        });

        let updated_exists = read_loaded_config(&state)
            .expect("re-read active config")
            .projects
            .get(crate::config::DEFAULT_PROJECT_ID)
            .is_some_and(|project| project.workflows.contains_key(&updated_key));
        assert!(updated_exists);
    }

    /// A snapshot carrying an error makes `read_active_config` fail.
    #[test]
    fn read_active_config_rejects_non_runnable_snapshot() {
        let mut fixture = TestState::new();
        let state = fixture.build();

        let status_error = Some("active config is not runnable".to_string());
        replace_active_config_status(&state, status_error, None);

        let message = read_active_config(&state)
            .expect_err("non-runnable config should fail")
            .to_string();
        assert!(message.contains("not runnable"));
    }

    /// Clearing the agent maps leaves them empty.
    #[tokio::test]
    async fn agent_health_and_metrics_reset_explicitly() {
        let mut fixture = TestState::new();
        let state = fixture.build();

        state
            .agent_health
            .write()
            .await
            .insert("echo".to_string(), AgentHealthState::default());
        state
            .agent_metrics
            .write()
            .await
            .insert("echo".to_string(), AgentMetrics::default());

        state.agent_health.write().await.clear();
        state.agent_metrics.write().await.clear();

        let health_empty = state.agent_health.read().await.is_empty();
        assert!(health_empty);
        let metrics_empty = state.agent_metrics.read().await.is_empty();
        assert!(metrics_empty);
    }

    /// After a writer panics while holding the sink lock, emitting an event
    /// recovers the lock so later reads succeed.
    #[test]
    fn poisoned_event_sink_recovers_with_tracing_sink() {
        let mut fixture = TestState::new();
        let state = fixture.build();

        // Poison the lock by panicking on another thread with the write
        // guard held.
        let poisoner = {
            let shared = state.clone();
            std::thread::spawn(move || {
                let _guard = shared.event_sink.write().expect("lock event_sink");
                panic!("poison event_sink");
            })
        };
        assert!(poisoner.join().is_err());

        let payload = serde_json::json!({"ok": true});
        state.emit_event("task-1", Some("item-1"), "heartbeat", payload);

        assert!(state.event_sink.read().is_ok());
    }
}