1use crate::collab::MessageBus;
2use crate::config::ActiveConfig;
3use crate::config_load::ConfigSelfHealReport;
4use crate::events::{EventSink, TracingEventSink};
5use crate::metrics::{AgentHealthState, AgentMetrics, AgentRuntimeState};
6use crate::runtime::DaemonRuntimeState;
7use arc_swap::ArcSwap;
8use serde_json::Value;
9use std::collections::HashMap;
10use std::path::PathBuf;
11use std::sync::atomic::AtomicBool;
12use std::sync::{Arc, OnceLock};
13use tokio::process::Child;
14use tokio::sync::{Mutex, Notify};
15
/// Number of permits the process-wide task semaphore is created with
/// (see [`task_semaphore`]).
pub const MAX_CONCURRENT_TASKS: usize = 10;

/// Backing storage for [`task_semaphore`]; filled in on the first call.
static TASK_SEMAPHORE: OnceLock<Arc<tokio::sync::Semaphore>> = OnceLock::new();
20
21pub fn task_semaphore() -> &'static Arc<tokio::sync::Semaphore> {
23 TASK_SEMAPHORE.get_or_init(|| Arc::new(tokio::sync::Semaphore::new(MAX_CONCURRENT_TASKS)))
24}
25
/// Cloneable handle to the shared [`InnerState`].
///
/// Cloning copies only the `Arc`, so every clone refers to the same state.
#[derive(Clone)]
pub struct ManagedState {
    /// Shared reference to the underlying state.
    pub inner: Arc<InnerState>,
}
32
/// Active configuration plus its status fields, captured as one value.
///
/// Instances are stored in `InnerState::config_runtime` and are replaced as a
/// whole rather than mutated in place.
#[derive(Clone)]
pub struct ConfigRuntimeSnapshot {
    /// The active configuration, behind an `Arc` so cloning a snapshot does
    /// not copy the configuration itself.
    pub active_config: Arc<ActiveConfig>,
    /// Error text attached to the active configuration, if any.
    /// NOTE(review): the tests in this file show `read_active_config` failing
    /// when this is set — confirm the exact contract in `config_load`.
    pub active_config_error: Option<String>,
    /// Self-heal report attached to the active configuration, if any.
    pub active_config_notice: Option<ConfigSelfHealReport>,
}
43
44impl ConfigRuntimeSnapshot {
45 pub fn new(
47 active_config: ActiveConfig,
48 active_config_error: Option<String>,
49 active_config_notice: Option<ConfigSelfHealReport>,
50 ) -> Self {
51 Self {
52 active_config: Arc::new(active_config),
53 active_config_error,
54 active_config_notice,
55 }
56 }
57}
58
/// Shared state for the daemon process.
///
/// Bundles filesystem paths, database handles, the swappable configuration
/// snapshot, per-agent maps, and the event / trigger plumbing in one struct.
///
/// NOTE(review): most field descriptions below are inferred from field names
/// and types; the code that populates and consumes them is not visible here —
/// confirm before relying on them.
pub struct InnerState {
    /// Data directory path.
    pub data_dir: PathBuf,
    /// Database file path.
    pub db_path: PathBuf,
    /// Unsafe-mode switch; its effect is defined by whatever reads this flag.
    pub unsafe_mode: bool,
    /// Shared async database handle.
    pub async_database: Arc<crate::async_database::AsyncDatabase>,
    /// Logs directory path.
    pub logs_dir: PathBuf,
    /// Current configuration snapshot. Read with `config_runtime_snapshot`
    /// and replaced wholesale with `set_config_runtime_snapshot` or
    /// `update_config_runtime`.
    pub config_runtime: ArcSwap<ConfigRuntimeSnapshot>,
    /// Running-task handles keyed by string (presumably the task id — confirm).
    pub running: Mutex<HashMap<String, RunningTask>>,
    /// Health state per agent, keyed by string (the tests use `"echo"`).
    pub agent_health: tokio::sync::RwLock<HashMap<String, AgentHealthState>>,
    /// Metrics per agent, keyed the same way as `agent_health` in the tests.
    pub agent_metrics: tokio::sync::RwLock<HashMap<String, AgentMetrics>>,
    /// Runtime/lifecycle state per agent (presumably keyed like the maps above).
    pub agent_lifecycle: tokio::sync::RwLock<HashMap<String, AgentRuntimeState>>,
    /// Shared message bus.
    pub message_bus: Arc<MessageBus>,
    /// Current event sink, guarded by a std `RwLock`. `clone_event_sink` and
    /// `replace_event_sink` are the accessors and both recover from poisoning.
    pub event_sink: std::sync::RwLock<Arc<dyn EventSink>>,
    /// Shared database write coordinator.
    pub db_writer: Arc<crate::db_write::DbWriteCoordinator>,
    /// Shared async session store.
    pub session_store: Arc<crate::session_store::AsyncSessionStore>,
    /// Shared async SQLite task repository.
    pub task_repo: Arc<crate::task_repository::AsyncSqliteTaskRepository>,
    /// Store manager.
    pub store_manager: crate::store::StoreManager,
    /// Daemon runtime state.
    pub daemon_runtime: DaemonRuntimeState,
    /// Notification handle (presumably used to wake workers — confirm).
    pub worker_notify: Arc<Notify>,
    /// Sender side of the broadcast channel carrying trigger event payloads.
    pub trigger_event_tx:
        tokio::sync::broadcast::Sender<crate::trigger_engine::TriggerEventPayload>,
    /// Optional trigger engine handle; `None` when no handle is stored.
    pub trigger_engine_handle: std::sync::Mutex<Option<crate::trigger_engine::TriggerEngineHandle>>,
    /// Optional sender carrying unit messages (presumably a reload request to
    /// the filesystem watcher — confirm).
    pub fs_watcher_reload_tx: std::sync::Mutex<Option<tokio::sync::mpsc::Sender<()>>>,
    /// Task enqueuer behind the `TaskEnqueuer` trait object.
    pub task_enqueuer: Arc<dyn crate::scheduler_port::TaskEnqueuer>,
}
115
116impl InnerState {
117 pub fn emit_event(
122 &self,
123 task_id: &str,
124 task_item_id: Option<&str>,
125 event_type: &str,
126 payload: Value,
127 ) {
128 let sink = clone_event_sink(self);
129 sink.emit(task_id, task_item_id, event_type, payload);
130
131 if matches!(event_type, "task_completed" | "task_failed") {
133 crate::trigger_engine::broadcast_task_event(
134 self,
135 crate::trigger_engine::TriggerEventPayload {
136 event_type: event_type.to_string(),
137 task_id: task_id.to_string(),
138 payload: None,
139 project: None,
140 },
141 );
142 }
143 }
144}
145
/// Handle for a running task: a shared stop flag plus a slot for a child
/// process.
///
/// Cloning copies both `Arc`s, so a clone shares the flag and the child slot
/// with the original.
#[derive(Clone)]
pub struct RunningTask {
    /// Shared boolean flag, `false` on creation.
    /// NOTE(review): presumably set to `true` to request a stop — confirm
    /// against the code that reads it.
    pub stop_flag: Arc<AtomicBool>,
    /// Slot for the task's child process; `None` when no child is attached.
    pub child: Arc<Mutex<Option<Child>>>,
}
154
155impl Default for RunningTask {
156 fn default() -> Self {
157 Self::new()
158 }
159}
160
161impl RunningTask {
162 pub fn new() -> Self {
164 Self {
165 stop_flag: Arc::new(AtomicBool::new(false)),
166 child: Arc::new(Mutex::new(None)),
167 }
168 }
169
170 pub fn fork(&self) -> Self {
174 Self {
175 stop_flag: Arc::clone(&self.stop_flag),
176 child: Arc::new(Mutex::new(None)),
177 }
178 }
179}
180
181pub fn config_runtime_snapshot(state: &InnerState) -> Arc<ConfigRuntimeSnapshot> {
183 state.config_runtime.load_full()
184}
185
186pub fn set_config_runtime_snapshot(state: &InnerState, snapshot: ConfigRuntimeSnapshot) {
188 state.config_runtime.store(Arc::new(snapshot));
189}
190
191pub fn update_config_runtime<R>(
193 state: &InnerState,
194 f: impl FnOnce(&ConfigRuntimeSnapshot) -> (ConfigRuntimeSnapshot, R),
195) -> R {
196 let current = state.config_runtime.load_full();
197 let (next, result) = f(current.as_ref());
198 state.config_runtime.store(Arc::new(next));
199 result
200}
201
202pub fn replace_active_config(state: &InnerState, active_config: ActiveConfig) {
204 update_config_runtime(state, |current| {
205 (
206 ConfigRuntimeSnapshot {
207 active_config: Arc::new(active_config),
208 active_config_error: current.active_config_error.clone(),
209 active_config_notice: current.active_config_notice.clone(),
210 },
211 (),
212 )
213 });
214}
215
216pub fn replace_active_config_status(
218 state: &InnerState,
219 active_config_error: Option<String>,
220 active_config_notice: Option<ConfigSelfHealReport>,
221) {
222 update_config_runtime(state, |current| {
223 (
224 ConfigRuntimeSnapshot {
225 active_config: Arc::clone(¤t.active_config),
226 active_config_error,
227 active_config_notice,
228 },
229 (),
230 )
231 });
232}
233
234pub fn clear_active_config_status(state: &InnerState) {
236 replace_active_config_status(state, None, None);
237}
238
239pub fn reset_active_config_to_default(state: &InnerState) {
241 set_config_runtime_snapshot(
242 state,
243 ConfigRuntimeSnapshot::new(
244 ActiveConfig {
245 config: Default::default(),
246 workspaces: Default::default(),
247 projects: Default::default(),
248 },
249 None,
250 None,
251 ),
252 );
253}
254
255pub fn clone_event_sink(state: &InnerState) -> Arc<dyn EventSink> {
257 match state.event_sink.read() {
258 Ok(guard) => guard.clone(),
259 Err(err) => {
260 drop(err.into_inner());
261 let mut guard = state
262 .event_sink
263 .write()
264 .unwrap_or_else(|poisoned| poisoned.into_inner());
265 *guard = Arc::new(TracingEventSink::new());
266 state.event_sink.clear_poison();
267 guard.clone()
268 }
269 }
270}
271
272pub fn replace_event_sink(state: &InnerState, sink: Arc<dyn EventSink>) {
274 match state.event_sink.write() {
275 Ok(mut guard) => *guard = sink,
276 Err(err) => {
277 let mut guard = err.into_inner();
278 *guard = sink;
279 state.event_sink.clear_poison();
280 }
281 }
282}
283
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config_load::{read_active_config, read_loaded_config};
    use crate::test_utils::TestState;
    use std::sync::atomic::Ordering;

    /// A new `RunningTask` has its stop flag cleared and no child attached.
    #[test]
    fn running_task_starts_with_defaults() {
        let task = RunningTask::new();

        let stop_requested = task.stop_flag.load(Ordering::SeqCst);
        assert!(!stop_requested);

        let child_slot = task.child.try_lock().expect("lock child");
        assert!(child_slot.is_none());
    }

    /// Entries written to the agent maps can be read back.
    #[tokio::test]
    async fn state_accessors_round_trip_agent_maps() {
        let mut fixture = TestState::new();
        let state = fixture.build();

        let health = AgentHealthState {
            consecutive_errors: 1,
            diseased_until: None,
            total_lifetime_errors: 1,
            capability_health: HashMap::new(),
        };
        state
            .agent_health
            .write()
            .await
            .insert("echo".to_string(), health);

        let metrics = AgentMetrics::default();
        state
            .agent_metrics
            .write()
            .await
            .insert("echo".to_string(), metrics);

        assert!(state.agent_health.read().await.contains_key("echo"));
        assert!(state.agent_metrics.read().await.contains_key("echo"));
    }

    /// Emitting an event that is not a task completion/failure must not panic.
    #[test]
    fn emit_event_is_safe_with_noop_sink() {
        let mut fixture = TestState::new();
        let state = fixture.build();

        let payload = serde_json::json!({"ok": true});
        state.emit_event("task-1", Some("item-1"), "heartbeat", payload);
    }

    /// `update_config_runtime` stores the snapshot returned by the closure.
    #[test]
    fn update_config_runtime_replaces_snapshot_without_exposing_guards() {
        let mut fixture = TestState::new();
        let state = fixture.build();
        let project_id = crate::config::DEFAULT_PROJECT_ID;

        // Pick any workflow of the default project as the one to duplicate.
        let original = read_loaded_config(&state)
            .expect("read loaded config")
            .projects
            .get(project_id)
            .and_then(|p| p.workflows.keys().next())
            .cloned()
            .unwrap_or_default();
        let updated_key = format!("{}-updated", original);

        update_config_runtime(&state, |current| {
            let mut next = current.clone();
            let workflow_clone = next
                .active_config
                .projects
                .get(project_id)
                .and_then(|p| p.workflows.get(&original))
                .cloned()
                .expect("default workflow should exist");

            let config = Arc::make_mut(&mut next.active_config);
            let project = config
                .projects
                .entry(project_id.to_string())
                .or_insert_with(|| crate::config::ResolvedProject {
                    workspaces: HashMap::new(),
                    agents: HashMap::new(),
                    workflows: HashMap::new(),
                    step_templates: HashMap::new(),
                    env_stores: HashMap::new(),
                    secret_stores: HashMap::new(),
                    execution_profiles: HashMap::new(),
                });
            project.workflows.insert(updated_key.clone(), workflow_clone);

            (next, ())
        });

        let updated_exists = read_loaded_config(&state)
            .expect("re-read active config")
            .projects
            .get(project_id)
            .is_some_and(|p| p.workflows.contains_key(&updated_key));
        assert!(updated_exists);
    }

    /// A snapshot carrying an error is rejected by `read_active_config`.
    #[test]
    fn read_active_config_rejects_non_runnable_snapshot() {
        let mut fixture = TestState::new();
        let state = fixture.build();

        let reason = "active config is not runnable".to_string();
        replace_active_config_status(&state, Some(reason), None);

        let error = read_active_config(&state).expect_err("non-runnable config should fail");
        let rendered = error.to_string();
        assert!(rendered.contains("not runnable"));
    }

    /// Clearing the agent maps leaves them empty.
    #[tokio::test]
    async fn agent_health_and_metrics_reset_explicitly() {
        let mut fixture = TestState::new();
        let state = fixture.build();

        let health = AgentHealthState::default();
        state
            .agent_health
            .write()
            .await
            .insert("echo".to_string(), health);

        let metrics = AgentMetrics::default();
        state
            .agent_metrics
            .write()
            .await
            .insert("echo".to_string(), metrics);

        state.agent_health.write().await.clear();
        state.agent_metrics.write().await.clear();

        assert!(state.agent_health.read().await.is_empty());
        assert!(state.agent_metrics.read().await.is_empty());
    }

    /// After a thread panics while holding the sink lock, `emit_event` still
    /// works and leaves the lock unpoisoned.
    #[test]
    fn poisoned_event_sink_recovers_with_tracing_sink() {
        let mut fixture = TestState::new();
        let state = fixture.build();

        // Poison the lock by panicking while the write guard is held.
        let shared = state.clone();
        let poisoner = std::thread::spawn(move || {
            let _guard = shared.event_sink.write().expect("lock event_sink");
            panic!("poison event_sink");
        });
        assert!(poisoner.join().is_err());

        let payload = serde_json::json!({"ok": true});
        state.emit_event("task-1", Some("item-1"), "heartbeat", payload);

        assert!(state.event_sink.read().is_ok());
    }
}
442}