1use crate::collab::MessageBus;
2use crate::config::ActiveConfig;
3use crate::config_load::ConfigSelfHealReport;
4use crate::events::{EventSink, TracingEventSink};
5use crate::metrics::{AgentHealthState, AgentMetrics, AgentRuntimeState};
6use crate::runtime::DaemonRuntimeState;
7use arc_swap::ArcSwap;
8use serde_json::Value;
9use std::collections::HashMap;
10use std::path::PathBuf;
11use std::sync::atomic::AtomicBool;
12use std::sync::{Arc, OnceLock};
13use tokio::process::Child;
14use tokio::sync::{Mutex, Notify};
15
16pub const MAX_CONCURRENT_TASKS: usize = 10;
18
19static TASK_SEMAPHORE: OnceLock<Arc<tokio::sync::Semaphore>> = OnceLock::new();
20
21pub fn task_semaphore() -> &'static Arc<tokio::sync::Semaphore> {
23 TASK_SEMAPHORE.get_or_init(|| Arc::new(tokio::sync::Semaphore::new(MAX_CONCURRENT_TASKS)))
24}
25
26#[derive(Clone)]
28pub struct ManagedState {
29 pub inner: Arc<InnerState>,
31}
32
33#[derive(Clone)]
35pub struct ConfigRuntimeSnapshot {
36 pub active_config: Arc<ActiveConfig>,
38 pub active_config_error: Option<String>,
40 pub active_config_notice: Option<ConfigSelfHealReport>,
42}
43
44impl ConfigRuntimeSnapshot {
45 pub fn new(
47 active_config: ActiveConfig,
48 active_config_error: Option<String>,
49 active_config_notice: Option<ConfigSelfHealReport>,
50 ) -> Self {
51 Self {
52 active_config: Arc::new(active_config),
53 active_config_error,
54 active_config_notice,
55 }
56 }
57}
58
59pub struct InnerState {
61 pub data_dir: PathBuf,
63 pub db_path: PathBuf,
65 pub unsafe_mode: bool,
67 pub async_database: Arc<crate::async_database::AsyncDatabase>,
69 pub logs_dir: PathBuf,
71 pub config_runtime: ArcSwap<ConfigRuntimeSnapshot>,
73 pub running: Mutex<HashMap<String, RunningTask>>,
75 pub agent_health: tokio::sync::RwLock<HashMap<String, AgentHealthState>>,
77 pub agent_metrics: tokio::sync::RwLock<HashMap<String, AgentMetrics>>,
79 pub agent_lifecycle: tokio::sync::RwLock<HashMap<String, AgentRuntimeState>>,
81 pub message_bus: Arc<MessageBus>,
83 pub event_sink: std::sync::RwLock<Arc<dyn EventSink>>,
88 pub db_writer: Arc<crate::db_write::DbWriteCoordinator>,
90 pub session_store: Arc<crate::session_store::AsyncSessionStore>,
92 pub task_repo: Arc<crate::task_repository::AsyncSqliteTaskRepository>,
94 pub store_manager: crate::store::StoreManager,
96 pub daemon_runtime: DaemonRuntimeState,
98 pub worker_notify: Arc<Notify>,
100 pub trigger_event_tx:
102 tokio::sync::broadcast::Sender<crate::trigger_engine::TriggerEventPayload>,
103 pub trigger_engine_handle: std::sync::Mutex<Option<crate::trigger_engine::TriggerEngineHandle>>,
105}
106
107impl InnerState {
108 pub fn emit_event(
113 &self,
114 task_id: &str,
115 task_item_id: Option<&str>,
116 event_type: &str,
117 payload: Value,
118 ) {
119 let sink = clone_event_sink(self);
120 sink.emit(task_id, task_item_id, event_type, payload);
121
122 if matches!(event_type, "task_completed" | "task_failed") {
124 crate::trigger_engine::broadcast_task_event(
125 self,
126 crate::trigger_engine::TriggerEventPayload {
127 event_type: event_type.to_string(),
128 task_id: task_id.to_string(),
129 },
130 );
131 }
132 }
133}
134
135#[derive(Clone)]
137pub struct RunningTask {
138 pub stop_flag: Arc<AtomicBool>,
140 pub child: Arc<Mutex<Option<Child>>>,
142}
143
144impl Default for RunningTask {
145 fn default() -> Self {
146 Self::new()
147 }
148}
149
150impl RunningTask {
151 pub fn new() -> Self {
153 Self {
154 stop_flag: Arc::new(AtomicBool::new(false)),
155 child: Arc::new(Mutex::new(None)),
156 }
157 }
158
159 pub fn fork(&self) -> Self {
163 Self {
164 stop_flag: Arc::clone(&self.stop_flag),
165 child: Arc::new(Mutex::new(None)),
166 }
167 }
168}
169
170pub fn config_runtime_snapshot(state: &InnerState) -> Arc<ConfigRuntimeSnapshot> {
172 state.config_runtime.load_full()
173}
174
175pub fn set_config_runtime_snapshot(state: &InnerState, snapshot: ConfigRuntimeSnapshot) {
177 state.config_runtime.store(Arc::new(snapshot));
178}
179
180pub fn update_config_runtime<R>(
182 state: &InnerState,
183 f: impl FnOnce(&ConfigRuntimeSnapshot) -> (ConfigRuntimeSnapshot, R),
184) -> R {
185 let current = state.config_runtime.load_full();
186 let (next, result) = f(current.as_ref());
187 state.config_runtime.store(Arc::new(next));
188 result
189}
190
191pub fn replace_active_config(state: &InnerState, active_config: ActiveConfig) {
193 update_config_runtime(state, |current| {
194 (
195 ConfigRuntimeSnapshot {
196 active_config: Arc::new(active_config),
197 active_config_error: current.active_config_error.clone(),
198 active_config_notice: current.active_config_notice.clone(),
199 },
200 (),
201 )
202 });
203}
204
205pub fn replace_active_config_status(
207 state: &InnerState,
208 active_config_error: Option<String>,
209 active_config_notice: Option<ConfigSelfHealReport>,
210) {
211 update_config_runtime(state, |current| {
212 (
213 ConfigRuntimeSnapshot {
214 active_config: Arc::clone(¤t.active_config),
215 active_config_error,
216 active_config_notice,
217 },
218 (),
219 )
220 });
221}
222
223pub fn clear_active_config_status(state: &InnerState) {
225 replace_active_config_status(state, None, None);
226}
227
228pub fn reset_active_config_to_default(state: &InnerState) {
230 set_config_runtime_snapshot(
231 state,
232 ConfigRuntimeSnapshot::new(
233 ActiveConfig {
234 config: Default::default(),
235 workspaces: Default::default(),
236 projects: Default::default(),
237 },
238 None,
239 None,
240 ),
241 );
242}
243
244pub fn clone_event_sink(state: &InnerState) -> Arc<dyn EventSink> {
246 match state.event_sink.read() {
247 Ok(guard) => guard.clone(),
248 Err(err) => {
249 drop(err.into_inner());
250 let mut guard = state
251 .event_sink
252 .write()
253 .unwrap_or_else(|poisoned| poisoned.into_inner());
254 *guard = Arc::new(TracingEventSink::new());
255 state.event_sink.clear_poison();
256 guard.clone()
257 }
258 }
259}
260
261pub fn replace_event_sink(state: &InnerState, sink: Arc<dyn EventSink>) {
263 match state.event_sink.write() {
264 Ok(mut guard) => *guard = sink,
265 Err(err) => {
266 let mut guard = err.into_inner();
267 *guard = sink;
268 state.event_sink.clear_poison();
269 }
270 }
271}
272
// Unit tests for the state helpers in this module. They rely on the
// `TestState` fixture from `crate::test_utils` to build a full state.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config_load::{read_active_config, read_loaded_config};
    use crate::test_utils::TestState;
    use std::sync::atomic::Ordering;

    // A fresh `RunningTask` has its stop flag cleared and no child attached.
    #[test]
    fn running_task_starts_with_defaults() {
        let runtime = RunningTask::new();
        assert!(!runtime.stop_flag.load(Ordering::SeqCst));
        assert!(runtime.child.try_lock().expect("lock child").is_none());
    }

    // Entries written into the agent health/metrics maps can be read back.
    #[tokio::test]
    async fn state_accessors_round_trip_agent_maps() {
        let mut fixture = TestState::new();
        let state = fixture.build();

        state.agent_health.write().await.insert(
            "echo".to_string(),
            AgentHealthState {
                consecutive_errors: 1,
                diseased_until: None,
                total_lifetime_errors: 1,
                capability_health: HashMap::new(),
            },
        );
        state
            .agent_metrics
            .write()
            .await
            .insert("echo".to_string(), AgentMetrics::default());

        assert!(state.agent_health.read().await.contains_key("echo"));
        assert!(state.agent_metrics.read().await.contains_key("echo"));
    }

    // Emitting a non-terminal event ("heartbeat") through the fixture's sink
    // must not panic; it does not reach the trigger-engine branch.
    #[test]
    fn emit_event_is_safe_with_noop_sink() {
        let mut fixture = TestState::new();
        let state = fixture.build();
        state.emit_event(
            "task-1",
            Some("item-1"),
            "heartbeat",
            serde_json::json!({"ok": true}),
        );
    }

    // Copies the first workflow of the default project under
    // "<name>-updated" via `update_config_runtime`, then checks that
    // `read_loaded_config` observes the new entry.
    #[test]
    fn update_config_runtime_replaces_snapshot_without_exposing_guards() {
        let mut fixture = TestState::new();
        let state = fixture.build();

        let original = read_loaded_config(&state)
            .expect("read loaded config")
            .projects
            .get(crate::config::DEFAULT_PROJECT_ID)
            .and_then(|p| p.workflows.keys().next())
            .cloned()
            .unwrap_or_default();
        update_config_runtime(&state, |current| {
            let mut next = current.clone();
            let workflow_clone = next
                .active_config
                .projects
                .get(crate::config::DEFAULT_PROJECT_ID)
                .and_then(|p| p.workflows.get(&original))
                .cloned()
                .expect("default workflow should exist");
            // `make_mut` clones the config out of the shared Arc before
            // mutating, so the previously published snapshot is untouched.
            Arc::make_mut(&mut next.active_config)
                .projects
                .entry(crate::config::DEFAULT_PROJECT_ID.to_string())
                .or_insert_with(|| crate::config::ResolvedProject {
                    workspaces: HashMap::new(),
                    agents: HashMap::new(),
                    workflows: HashMap::new(),
                    step_templates: HashMap::new(),
                    env_stores: HashMap::new(),
                    execution_profiles: HashMap::new(),
                })
                .workflows
                .insert(format!("{}-updated", original), workflow_clone);
            (next, ())
        });

        let updated_exists = read_loaded_config(&state)
            .expect("re-read active config")
            .projects
            .get(crate::config::DEFAULT_PROJECT_ID)
            .map(|p| p.workflows.contains_key(&format!("{}-updated", original)))
            .unwrap_or(false);
        assert!(updated_exists);
    }

    // Setting an error on the snapshot makes `read_active_config` fail with
    // an error whose text includes "not runnable".
    #[test]
    fn read_active_config_rejects_non_runnable_snapshot() {
        let mut fixture = TestState::new();
        let state = fixture.build();

        replace_active_config_status(
            &state,
            Some("active config is not runnable".to_string()),
            None,
        );

        let error = read_active_config(&state).expect_err("non-runnable config should fail");
        assert!(error.to_string().contains("not runnable"));
    }

    // Clearing the agent health/metrics maps leaves them empty.
    #[tokio::test]
    async fn agent_health_and_metrics_reset_explicitly() {
        let mut fixture = TestState::new();
        let state = fixture.build();

        state
            .agent_health
            .write()
            .await
            .insert("echo".to_string(), AgentHealthState::default());
        state
            .agent_metrics
            .write()
            .await
            .insert("echo".to_string(), AgentMetrics::default());

        state.agent_health.write().await.clear();
        state.agent_metrics.write().await.clear();

        assert!(state.agent_health.read().await.is_empty());
        assert!(state.agent_metrics.read().await.is_empty());
    }

    // Poisons the `event_sink` lock by panicking on another thread while
    // holding its write guard. It then checks that `emit_event` still works
    // (`clone_event_sink` recovers the lock) and that the lock is readable
    // without error afterwards.
    #[test]
    fn poisoned_event_sink_recovers_with_tracing_sink() {
        let mut fixture = TestState::new();
        let state = fixture.build();

        let result = std::thread::spawn({
            let state = state.clone();
            move || {
                let _guard = state.event_sink.write().expect("lock event_sink");
                panic!("poison event_sink");
            }
        });
        // The spawned thread panicked, which is what poisons the lock.
        assert!(result.join().is_err());

        state.emit_event(
            "task-1",
            Some("item-1"),
            "heartbeat",
            serde_json::json!({"ok": true}),
        );

        assert!(state.event_sink.read().is_ok());
    }
}