Skip to main content

agent_orchestrator/
runtime.rs

1use std::sync::atomic::{AtomicBool, AtomicU64, AtomicU8, Ordering};
2use std::time::Instant;
3
/// Encoded `DaemonLifecycleState::Serving`, as stored in the runtime's `AtomicU8` cell.
const STATE_SERVING: u8 = 0;
/// Encoded `DaemonLifecycleState::Draining`, as stored in the runtime's `AtomicU8` cell.
const STATE_DRAINING: u8 = 1;
/// Encoded `DaemonLifecycleState::Stopped`, as stored in the runtime's `AtomicU8` cell.
const STATE_STOPPED: u8 = 2;
7
/// Coarse lifecycle phase reported in the daemon runtime snapshot.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DaemonLifecycleState {
    /// Work is being accepted and executed.
    Serving,
    /// Shutdown is in progress; new work should not be accepted.
    Draining,
    /// The daemon has completely stopped.
    Stopped,
}
18
19impl DaemonLifecycleState {
20    /// Returns the stable machine-readable lifecycle label.
21    pub fn as_str(self) -> &'static str {
22        match self {
23            Self::Serving => "serving",
24            Self::Draining => "draining",
25            Self::Stopped => "stopped",
26        }
27    }
28
29    fn as_u8(self) -> u8 {
30        match self {
31            Self::Serving => STATE_SERVING,
32            Self::Draining => STATE_DRAINING,
33            Self::Stopped => STATE_STOPPED,
34        }
35    }
36
37    fn from_u8(value: u8) -> Self {
38        match value {
39            STATE_DRAINING => Self::Draining,
40            STATE_STOPPED => Self::Stopped,
41            _ => Self::Serving,
42        }
43    }
44}
45
46/// Runtime counters exported by the daemon control plane.
47#[derive(Debug, Clone)]
48pub struct DaemonRuntimeSnapshot {
49    /// Seconds elapsed since the runtime state was created.
50    pub uptime_secs: u64,
51    /// Current lifecycle state of the daemon.
52    pub lifecycle_state: DaemonLifecycleState,
53    /// Whether shutdown has been requested.
54    pub shutdown_requested: bool,
55    /// Configured worker pool size.
56    pub configured_workers: u64,
57    /// Workers currently alive.
58    pub live_workers: u64,
59    /// Workers currently idle.
60    pub idle_workers: u64,
61    /// Workers currently running a task.
62    pub active_workers: u64,
63    /// Number of tasks currently executing.
64    pub running_tasks: u64,
65    /// Total number of worker restarts due to panics.
66    pub total_worker_restarts: u64,
67    /// Persistent daemon incarnation counter (incremented on each startup).
68    pub incarnation: u64,
69    /// Whether the daemon is in maintenance mode (blocks new task creation).
70    pub maintenance_mode: bool,
71}
72
/// Shared atomic runtime counters for the daemon process.
///
/// Every counter is an independent atomic, so the type can be updated through
/// `&self`. `Debug` prints the current value of each atomic.
#[derive(Debug)]
pub struct DaemonRuntimeState {
    // Captured once at construction; used to compute uptime.
    started_at: Instant,
    // Encoded `DaemonLifecycleState` (see `as_u8` / `from_u8`).
    lifecycle_state: AtomicU8,
    shutdown_requested: AtomicBool,
    configured_workers: AtomicU64,
    live_workers: AtomicU64,
    idle_workers: AtomicU64,
    active_workers: AtomicU64,
    running_tasks: AtomicU64,
    total_worker_restarts: AtomicU64,
    incarnation: AtomicU64,
    maintenance_mode: AtomicBool,
}
87
88impl Default for DaemonRuntimeState {
89    fn default() -> Self {
90        Self::new()
91    }
92}
93
94impl DaemonRuntimeState {
95    /// Creates a runtime state initialized in the `serving` lifecycle state.
96    pub fn new() -> Self {
97        Self {
98            started_at: Instant::now(),
99            lifecycle_state: AtomicU8::new(STATE_SERVING),
100            shutdown_requested: AtomicBool::new(false),
101            configured_workers: AtomicU64::new(0),
102            live_workers: AtomicU64::new(0),
103            idle_workers: AtomicU64::new(0),
104            active_workers: AtomicU64::new(0),
105            running_tasks: AtomicU64::new(0),
106            total_worker_restarts: AtomicU64::new(0),
107            incarnation: AtomicU64::new(0),
108            maintenance_mode: AtomicBool::new(false),
109        }
110    }
111
112    /// Produces a point-in-time snapshot of daemon counters.
113    pub fn snapshot(&self) -> DaemonRuntimeSnapshot {
114        DaemonRuntimeSnapshot {
115            uptime_secs: self.started_at.elapsed().as_secs(),
116            lifecycle_state: DaemonLifecycleState::from_u8(
117                self.lifecycle_state.load(Ordering::SeqCst),
118            ),
119            shutdown_requested: self.shutdown_requested.load(Ordering::SeqCst),
120            configured_workers: self.configured_workers.load(Ordering::SeqCst),
121            live_workers: self.live_workers.load(Ordering::SeqCst),
122            idle_workers: self.idle_workers.load(Ordering::SeqCst),
123            active_workers: self.active_workers.load(Ordering::SeqCst),
124            running_tasks: self.running_tasks.load(Ordering::SeqCst),
125            total_worker_restarts: self.total_worker_restarts.load(Ordering::SeqCst),
126            incarnation: self.incarnation.load(Ordering::SeqCst),
127            maintenance_mode: self.maintenance_mode.load(Ordering::SeqCst),
128        }
129    }
130
131    /// Updates the configured worker count published by the runtime snapshot.
132    pub fn set_configured_workers(&self, count: usize) {
133        self.configured_workers
134            .store(count as u64, Ordering::SeqCst);
135    }
136
137    /// Requests shutdown and transitions the runtime into `draining`.
138    pub fn request_shutdown(&self) -> bool {
139        let first = !self.shutdown_requested.swap(true, Ordering::SeqCst);
140        self.lifecycle_state
141            .store(DaemonLifecycleState::Draining.as_u8(), Ordering::SeqCst);
142        first
143    }
144
145    /// Marks the daemon lifecycle as fully stopped.
146    pub fn mark_stopped(&self) {
147        self.lifecycle_state
148            .store(DaemonLifecycleState::Stopped.as_u8(), Ordering::SeqCst);
149    }
150
151    /// Increments live and idle worker counters for a newly started worker.
152    pub fn worker_started(&self) {
153        self.live_workers.fetch_add(1, Ordering::SeqCst);
154        self.idle_workers.fetch_add(1, Ordering::SeqCst);
155    }
156
157    /// Decrements worker counters when a worker exits.
158    pub fn worker_stopped(&self, was_busy: bool) {
159        self.live_workers.fetch_sub(1, Ordering::SeqCst);
160        if was_busy {
161            self.active_workers.fetch_sub(1, Ordering::SeqCst);
162        } else {
163            self.idle_workers.fetch_sub(1, Ordering::SeqCst);
164        }
165    }
166
167    /// Moves one worker from idle to active state.
168    pub fn worker_became_busy(&self) {
169        self.idle_workers.fetch_sub(1, Ordering::SeqCst);
170        self.active_workers.fetch_add(1, Ordering::SeqCst);
171    }
172
173    /// Moves one worker from active to idle state.
174    pub fn worker_became_idle(&self) {
175        self.active_workers.fetch_sub(1, Ordering::SeqCst);
176        self.idle_workers.fetch_add(1, Ordering::SeqCst);
177    }
178
179    /// Increments the count of running tasks.
180    pub fn running_task_started(&self) {
181        self.running_tasks.fetch_add(1, Ordering::SeqCst);
182    }
183
184    /// Decrements the count of running tasks.
185    pub fn running_task_finished(&self) {
186        self.running_tasks.fetch_sub(1, Ordering::SeqCst);
187    }
188
189    /// Records a worker restart after a panic recovery.
190    pub fn record_worker_restart(&self) {
191        self.total_worker_restarts.fetch_add(1, Ordering::SeqCst);
192    }
193
194    /// Sets the persistent incarnation counter loaded from the database.
195    pub fn set_incarnation(&self, value: u64) {
196        self.incarnation.store(value, Ordering::SeqCst);
197    }
198
199    /// Enables or disables maintenance mode.
200    pub fn set_maintenance_mode(&self, enabled: bool) {
201        self.maintenance_mode.store(enabled, Ordering::SeqCst);
202    }
203}
204
#[cfg(test)]
mod tests {
    use super::*;

    /// Walks the runtime through serving -> draining -> stopped and checks
    /// the published counters at each phase.
    #[test]
    fn snapshot_reflects_state_transitions() {
        let runtime = DaemonRuntimeState::new();
        runtime.set_configured_workers(2);
        runtime.worker_started();
        runtime.worker_started();
        runtime.worker_became_busy();
        runtime.running_task_started();

        let serving = runtime.snapshot();
        assert_eq!(serving.lifecycle_state, DaemonLifecycleState::Serving);
        assert_eq!(serving.configured_workers, 2);
        assert_eq!(serving.live_workers, 2);
        assert_eq!(serving.idle_workers, 1);
        assert_eq!(serving.active_workers, 1);
        assert_eq!(serving.running_tasks, 1);
        assert!(!serving.shutdown_requested);
        assert_eq!(serving.total_worker_restarts, 0);
        assert_eq!(serving.incarnation, 0);
        assert!(!serving.maintenance_mode);

        runtime.record_worker_restart();
        runtime.record_worker_restart();
        assert_eq!(runtime.snapshot().total_worker_restarts, 2);

        assert!(runtime.request_shutdown());
        let draining = runtime.snapshot();
        assert_eq!(draining.lifecycle_state, DaemonLifecycleState::Draining);
        assert!(draining.shutdown_requested);

        runtime.running_task_finished();
        runtime.worker_became_idle();
        runtime.worker_stopped(false);
        runtime.worker_stopped(false);
        runtime.mark_stopped();
        let stopped = runtime.snapshot();
        assert_eq!(stopped.lifecycle_state, DaemonLifecycleState::Stopped);
        assert_eq!(stopped.live_workers, 0);
        assert_eq!(stopped.idle_workers, 0);
        assert_eq!(stopped.active_workers, 0);
        assert_eq!(stopped.running_tasks, 0);
    }

    /// The exported labels are part of the machine-readable contract.
    #[test]
    fn lifecycle_labels_are_stable() {
        assert_eq!(DaemonLifecycleState::Serving.as_str(), "serving");
        assert_eq!(DaemonLifecycleState::Draining.as_str(), "draining");
        assert_eq!(DaemonLifecycleState::Stopped.as_str(), "stopped");
    }

    /// Every state survives the encode/decode round trip used by the atomic cell.
    #[test]
    fn lifecycle_codes_round_trip() {
        for state in [
            DaemonLifecycleState::Serving,
            DaemonLifecycleState::Draining,
            DaemonLifecycleState::Stopped,
        ] {
            assert_eq!(DaemonLifecycleState::from_u8(state.as_u8()), state);
        }
    }

    /// Only the first shutdown request reports that it set the flag.
    #[test]
    fn only_first_shutdown_request_reports_true() {
        let runtime = DaemonRuntimeState::new();
        assert!(runtime.request_shutdown());
        assert!(!runtime.request_shutdown());
        assert!(runtime.snapshot().shutdown_requested);
    }

    /// Incarnation and maintenance-mode setters are reflected in snapshots.
    #[test]
    fn incarnation_and_maintenance_mode_are_published() {
        let runtime = DaemonRuntimeState::new();
        runtime.set_incarnation(7);
        runtime.set_maintenance_mode(true);
        let snapshot = runtime.snapshot();
        assert_eq!(snapshot.incarnation, 7);
        assert!(snapshot.maintenance_mode);

        runtime.set_maintenance_mode(false);
        assert!(!runtime.snapshot().maintenance_mode);
    }
}