Skip to main content

agent_orchestrator/
runtime.rs

1use std::path::PathBuf;
2use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU64, Ordering};
3use std::time::Instant;
4
/// Byte stored in the lifecycle atomic while the daemon is serving.
const STATE_SERVING: u8 = 0;
/// Byte stored in the lifecycle atomic while the daemon is draining.
const STATE_DRAINING: u8 = 1;
/// Byte stored in the lifecycle atomic once the daemon has stopped.
const STATE_STOPPED: u8 = 2;
8
/// Coarse lifecycle phase reported in the daemon runtime snapshot.
///
/// The runtime moves to `Draining` when shutdown is requested and to
/// `Stopped` once it is marked stopped.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum DaemonLifecycleState {
    /// Work is being accepted and executed.
    Serving,
    /// Shutdown is in progress; new work should be refused.
    Draining,
    /// The daemon has completely shut down.
    Stopped,
}
19
20impl DaemonLifecycleState {
21    /// Returns the stable machine-readable lifecycle label.
22    pub fn as_str(self) -> &'static str {
23        match self {
24            Self::Serving => "serving",
25            Self::Draining => "draining",
26            Self::Stopped => "stopped",
27        }
28    }
29
30    fn as_u8(self) -> u8 {
31        match self {
32            Self::Serving => STATE_SERVING,
33            Self::Draining => STATE_DRAINING,
34            Self::Stopped => STATE_STOPPED,
35        }
36    }
37
38    fn from_u8(value: u8) -> Self {
39        match value {
40            STATE_DRAINING => Self::Draining,
41            STATE_STOPPED => Self::Stopped,
42            _ => Self::Serving,
43        }
44    }
45}
46
47/// Runtime counters exported by the daemon control plane.
48#[derive(Debug, Clone)]
49pub struct DaemonRuntimeSnapshot {
50    /// Seconds elapsed since the runtime state was created.
51    pub uptime_secs: u64,
52    /// Current lifecycle state of the daemon.
53    pub lifecycle_state: DaemonLifecycleState,
54    /// Whether shutdown has been requested.
55    pub shutdown_requested: bool,
56    /// Configured worker pool size.
57    pub configured_workers: u64,
58    /// Workers currently alive.
59    pub live_workers: u64,
60    /// Workers currently idle.
61    pub idle_workers: u64,
62    /// Workers currently running a task.
63    pub active_workers: u64,
64    /// Number of tasks currently executing.
65    pub running_tasks: u64,
66    /// Total number of worker restarts due to panics.
67    pub total_worker_restarts: u64,
68    /// Persistent daemon incarnation counter (incremented on each startup).
69    pub incarnation: u64,
70    /// Whether the daemon is in maintenance mode (blocks new task creation).
71    pub maintenance_mode: bool,
72}
73
/// Process-wide runtime counters for the daemon, shared through atomics.
///
/// Every counter is an independent atomic; the only lock guards the
/// pending deferred-restart binary path.
pub struct DaemonRuntimeState {
    /// Creation instant, used to derive uptime.
    started_at: Instant,
    /// Encoded lifecycle phase (one of the `STATE_*` bytes).
    lifecycle_state: AtomicU8,
    /// Set once shutdown has been requested.
    shutdown_requested: AtomicBool,
    /// Configured worker pool size.
    configured_workers: AtomicU64,
    /// Workers currently alive.
    live_workers: AtomicU64,
    /// Workers currently idle.
    idle_workers: AtomicU64,
    /// Workers currently running a task.
    active_workers: AtomicU64,
    /// Tasks currently executing.
    running_tasks: AtomicU64,
    /// Cumulative worker restarts after panics.
    total_worker_restarts: AtomicU64,
    /// Persistent incarnation counter.
    incarnation: AtomicU64,
    /// Whether maintenance mode is enabled.
    maintenance_mode: AtomicBool,
    /// Binary path for a deferred restart (when other tasks are still running).
    deferred_restart_binary: std::sync::Mutex<Option<PathBuf>>,
}
90
91impl Default for DaemonRuntimeState {
92    fn default() -> Self {
93        Self::new()
94    }
95}
96
97impl DaemonRuntimeState {
98    /// Creates a runtime state initialized in the `serving` lifecycle state.
99    pub fn new() -> Self {
100        Self {
101            started_at: Instant::now(),
102            lifecycle_state: AtomicU8::new(STATE_SERVING),
103            shutdown_requested: AtomicBool::new(false),
104            configured_workers: AtomicU64::new(0),
105            live_workers: AtomicU64::new(0),
106            idle_workers: AtomicU64::new(0),
107            active_workers: AtomicU64::new(0),
108            running_tasks: AtomicU64::new(0),
109            total_worker_restarts: AtomicU64::new(0),
110            incarnation: AtomicU64::new(0),
111            maintenance_mode: AtomicBool::new(false),
112            deferred_restart_binary: std::sync::Mutex::new(None),
113        }
114    }
115
116    /// Produces a point-in-time snapshot of daemon counters.
117    pub fn snapshot(&self) -> DaemonRuntimeSnapshot {
118        DaemonRuntimeSnapshot {
119            uptime_secs: self.started_at.elapsed().as_secs(),
120            lifecycle_state: DaemonLifecycleState::from_u8(
121                self.lifecycle_state.load(Ordering::SeqCst),
122            ),
123            shutdown_requested: self.shutdown_requested.load(Ordering::SeqCst),
124            configured_workers: self.configured_workers.load(Ordering::SeqCst),
125            live_workers: self.live_workers.load(Ordering::SeqCst),
126            idle_workers: self.idle_workers.load(Ordering::SeqCst),
127            active_workers: self.active_workers.load(Ordering::SeqCst),
128            running_tasks: self.running_tasks.load(Ordering::SeqCst),
129            total_worker_restarts: self.total_worker_restarts.load(Ordering::SeqCst),
130            incarnation: self.incarnation.load(Ordering::SeqCst),
131            maintenance_mode: self.maintenance_mode.load(Ordering::SeqCst),
132        }
133    }
134
135    /// Updates the configured worker count published by the runtime snapshot.
136    pub fn set_configured_workers(&self, count: usize) {
137        self.configured_workers
138            .store(count as u64, Ordering::SeqCst);
139    }
140
141    /// Requests shutdown and transitions the runtime into `draining`.
142    pub fn request_shutdown(&self) -> bool {
143        let first = !self.shutdown_requested.swap(true, Ordering::SeqCst);
144        self.lifecycle_state
145            .store(DaemonLifecycleState::Draining.as_u8(), Ordering::SeqCst);
146        first
147    }
148
149    /// Stores a binary path for deferred restart (when other tasks are running).
150    pub fn set_deferred_restart(&self, binary_path: PathBuf) {
151        let mut guard = self
152            .deferred_restart_binary
153            .lock()
154            .unwrap_or_else(|e| e.into_inner());
155        *guard = Some(binary_path);
156    }
157
158    /// Atomically takes the deferred restart binary path, if any.
159    pub fn take_deferred_restart(&self) -> Option<PathBuf> {
160        let mut guard = self
161            .deferred_restart_binary
162            .lock()
163            .unwrap_or_else(|e| e.into_inner());
164        guard.take()
165    }
166
167    /// Marks the daemon lifecycle as fully stopped.
168    pub fn mark_stopped(&self) {
169        self.lifecycle_state
170            .store(DaemonLifecycleState::Stopped.as_u8(), Ordering::SeqCst);
171    }
172
173    /// Increments live and idle worker counters for a newly started worker.
174    pub fn worker_started(&self) {
175        self.live_workers.fetch_add(1, Ordering::SeqCst);
176        self.idle_workers.fetch_add(1, Ordering::SeqCst);
177    }
178
179    /// Decrements worker counters when a worker exits.
180    pub fn worker_stopped(&self, was_busy: bool) {
181        self.live_workers.fetch_sub(1, Ordering::SeqCst);
182        if was_busy {
183            self.active_workers.fetch_sub(1, Ordering::SeqCst);
184        } else {
185            self.idle_workers.fetch_sub(1, Ordering::SeqCst);
186        }
187    }
188
189    /// Moves one worker from idle to active state.
190    pub fn worker_became_busy(&self) {
191        self.idle_workers.fetch_sub(1, Ordering::SeqCst);
192        self.active_workers.fetch_add(1, Ordering::SeqCst);
193    }
194
195    /// Moves one worker from active to idle state.
196    pub fn worker_became_idle(&self) {
197        self.active_workers.fetch_sub(1, Ordering::SeqCst);
198        self.idle_workers.fetch_add(1, Ordering::SeqCst);
199    }
200
201    /// Increments the count of running tasks.
202    pub fn running_task_started(&self) {
203        self.running_tasks.fetch_add(1, Ordering::SeqCst);
204    }
205
206    /// Decrements the count of running tasks.
207    pub fn running_task_finished(&self) {
208        self.running_tasks.fetch_sub(1, Ordering::SeqCst);
209    }
210
211    /// Records a worker restart after a panic recovery.
212    pub fn record_worker_restart(&self) {
213        self.total_worker_restarts.fetch_add(1, Ordering::SeqCst);
214    }
215
216    /// Sets the persistent incarnation counter loaded from the database.
217    pub fn set_incarnation(&self, value: u64) {
218        self.incarnation.store(value, Ordering::SeqCst);
219    }
220
221    /// Enables or disables maintenance mode.
222    pub fn set_maintenance_mode(&self, enabled: bool) {
223        self.maintenance_mode.store(enabled, Ordering::SeqCst);
224    }
225}
226
#[cfg(test)]
mod tests {
    use super::*;

    /// Walks the runtime through serving -> draining -> stopped and checks the
    /// published counters at each step.
    #[test]
    fn snapshot_reflects_state_transitions() {
        let runtime = DaemonRuntimeState::new();
        runtime.set_configured_workers(2);
        runtime.worker_started();
        runtime.worker_started();
        runtime.worker_became_busy();
        runtime.running_task_started();

        let serving = runtime.snapshot();
        assert_eq!(serving.lifecycle_state, DaemonLifecycleState::Serving);
        assert_eq!(serving.configured_workers, 2);
        assert_eq!(serving.live_workers, 2);
        assert_eq!(serving.idle_workers, 1);
        assert_eq!(serving.active_workers, 1);
        assert_eq!(serving.running_tasks, 1);
        assert!(!serving.shutdown_requested);
        assert_eq!(serving.total_worker_restarts, 0);
        assert_eq!(serving.incarnation, 0);
        assert!(!serving.maintenance_mode);

        runtime.record_worker_restart();
        runtime.record_worker_restart();
        assert_eq!(runtime.snapshot().total_worker_restarts, 2);

        assert!(runtime.request_shutdown());
        let draining = runtime.snapshot();
        assert_eq!(draining.lifecycle_state, DaemonLifecycleState::Draining);
        assert!(draining.shutdown_requested);

        runtime.running_task_finished();
        runtime.worker_became_idle();
        runtime.worker_stopped(false);
        runtime.worker_stopped(false);
        runtime.mark_stopped();
        let stopped = runtime.snapshot();
        assert_eq!(stopped.lifecycle_state, DaemonLifecycleState::Stopped);
        assert_eq!(stopped.live_workers, 0);
        assert_eq!(stopped.idle_workers, 0);
        assert_eq!(stopped.active_workers, 0);
        assert_eq!(stopped.running_tasks, 0);
    }

    /// Only the first shutdown request reports `true`; repeats keep draining.
    #[test]
    fn request_shutdown_reports_only_first_call() {
        let runtime = DaemonRuntimeState::new();
        assert!(runtime.request_shutdown());
        assert!(!runtime.request_shutdown());
        let snapshot = runtime.snapshot();
        assert!(snapshot.shutdown_requested);
        assert_eq!(snapshot.lifecycle_state, DaemonLifecycleState::Draining);
    }

    /// A busy worker that exits is removed from the active count.
    #[test]
    fn busy_worker_stop_decrements_active() {
        let runtime = DaemonRuntimeState::new();
        runtime.worker_started();
        runtime.worker_became_busy();
        runtime.worker_stopped(true);
        let snapshot = runtime.snapshot();
        assert_eq!(snapshot.live_workers, 0);
        assert_eq!(snapshot.idle_workers, 0);
        assert_eq!(snapshot.active_workers, 0);
    }

    /// The deferred restart path is overwritten by later sets and taken once.
    #[test]
    fn deferred_restart_is_taken_once() {
        let runtime = DaemonRuntimeState::new();
        assert_eq!(runtime.take_deferred_restart(), None);
        runtime.set_deferred_restart(PathBuf::from("/tmp/old-binary"));
        runtime.set_deferred_restart(PathBuf::from("/tmp/new-binary"));
        assert_eq!(
            runtime.take_deferred_restart(),
            Some(PathBuf::from("/tmp/new-binary"))
        );
        assert_eq!(runtime.take_deferred_restart(), None);
    }

    /// Incarnation and maintenance mode setters are visible in snapshots.
    #[test]
    fn incarnation_and_maintenance_mode_are_published() {
        let runtime = DaemonRuntimeState::default();
        runtime.set_incarnation(7);
        runtime.set_maintenance_mode(true);
        let snapshot = runtime.snapshot();
        assert_eq!(snapshot.incarnation, 7);
        assert!(snapshot.maintenance_mode);

        runtime.set_maintenance_mode(false);
        assert!(!runtime.snapshot().maintenance_mode);
    }

    /// Labels are stable and the byte encoding round-trips for every state.
    #[test]
    fn lifecycle_labels_and_encoding_round_trip() {
        assert_eq!(DaemonLifecycleState::Serving.as_str(), "serving");
        assert_eq!(DaemonLifecycleState::Draining.as_str(), "draining");
        assert_eq!(DaemonLifecycleState::Stopped.as_str(), "stopped");

        for state in [
            DaemonLifecycleState::Serving,
            DaemonLifecycleState::Draining,
            DaemonLifecycleState::Stopped,
        ] {
            assert_eq!(DaemonLifecycleState::from_u8(state.as_u8()), state);
        }
    }
}