Skip to main content

agent_orchestrator/
runtime.rs

1use std::path::PathBuf;
2use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU64, Ordering};
3use std::time::Instant;
4
/// Raw `u8` encoding of [`DaemonLifecycleState::Serving`].
const STATE_SERVING: u8 = 0;
/// Raw `u8` encoding of [`DaemonLifecycleState::Draining`].
const STATE_DRAINING: u8 = 1;
/// Raw `u8` encoding of [`DaemonLifecycleState::Stopped`].
const STATE_STOPPED: u8 = 2;

/// Lifecycle phase reported in the daemon runtime snapshot.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DaemonLifecycleState {
    /// Work is being accepted and executed.
    Serving,
    /// The daemon is draining and should not accept new work.
    Draining,
    /// The daemon has stopped completely.
    Stopped,
}

impl DaemonLifecycleState {
    /// Stable, machine-readable name of this lifecycle phase.
    pub fn as_str(self) -> &'static str {
        match self {
            Self::Stopped => "stopped",
            Self::Draining => "draining",
            Self::Serving => "serving",
        }
    }

    /// Encodes the phase as the raw byte defined by the `STATE_*` constants.
    fn as_u8(self) -> u8 {
        match self {
            Self::Stopped => STATE_STOPPED,
            Self::Draining => STATE_DRAINING,
            Self::Serving => STATE_SERVING,
        }
    }

    /// Decodes a raw byte back into a lifecycle phase.
    ///
    /// Every byte other than the draining and stopped encodings, including
    /// values that match no `STATE_*` constant, decodes to [`Self::Serving`].
    fn from_u8(value: u8) -> Self {
        if value == STATE_STOPPED {
            Self::Stopped
        } else if value == STATE_DRAINING {
            Self::Draining
        } else {
            Self::Serving
        }
    }
}
46
47/// Runtime counters exported by the daemon control plane.
48#[derive(Debug, Clone)]
49pub struct DaemonRuntimeSnapshot {
50    /// Seconds elapsed since the runtime state was created.
51    pub uptime_secs: u64,
52    /// Current lifecycle state of the daemon.
53    pub lifecycle_state: DaemonLifecycleState,
54    /// Whether shutdown has been requested.
55    pub shutdown_requested: bool,
56    /// Configured worker pool size.
57    pub configured_workers: u64,
58    /// Workers currently alive.
59    pub live_workers: u64,
60    /// Workers currently idle.
61    pub idle_workers: u64,
62    /// Workers currently running a task.
63    pub active_workers: u64,
64    /// Number of tasks currently executing.
65    pub running_tasks: u64,
66    /// Total number of worker restarts due to panics.
67    pub total_worker_restarts: u64,
68    /// Persistent daemon incarnation counter (incremented on each startup).
69    pub incarnation: u64,
70    /// Whether the daemon is in maintenance mode (blocks new task creation).
71    pub maintenance_mode: bool,
72}
73
/// Shared atomic runtime counters for the daemon process.
///
/// Every field uses interior mutability (atomics or a mutex), so the state can
/// be updated through a shared reference. `Debug` is derived so the state can
/// be logged or embedded in other `Debug` types.
#[derive(Debug)]
pub struct DaemonRuntimeState {
    /// Creation time of this state; the snapshot's uptime is measured from it.
    started_at: Instant,
    /// Current lifecycle state, stored as its raw `u8` encoding.
    lifecycle_state: AtomicU8,
    /// Set once shutdown has been requested.
    shutdown_requested: AtomicBool,
    /// Configured worker pool size.
    configured_workers: AtomicU64,
    /// Workers currently alive.
    live_workers: AtomicU64,
    /// Workers currently idle.
    idle_workers: AtomicU64,
    /// Workers currently running a task.
    active_workers: AtomicU64,
    /// Number of tasks currently executing.
    running_tasks: AtomicU64,
    /// Total number of recorded worker restarts.
    total_worker_restarts: AtomicU64,
    /// Daemon incarnation counter published in snapshots.
    incarnation: AtomicU64,
    /// Whether maintenance mode is enabled.
    maintenance_mode: AtomicBool,
    /// Binary path for a deferred restart (when other tasks are still running).
    deferred_restart_binary: std::sync::Mutex<Option<PathBuf>>,
}
90
91impl Default for DaemonRuntimeState {
92    fn default() -> Self {
93        Self::new()
94    }
95}
96
97impl DaemonRuntimeState {
98    /// Creates a runtime state initialized in the `serving` lifecycle state.
99    pub fn new() -> Self {
100        Self {
101            started_at: Instant::now(),
102            lifecycle_state: AtomicU8::new(STATE_SERVING),
103            shutdown_requested: AtomicBool::new(false),
104            configured_workers: AtomicU64::new(0),
105            live_workers: AtomicU64::new(0),
106            idle_workers: AtomicU64::new(0),
107            active_workers: AtomicU64::new(0),
108            running_tasks: AtomicU64::new(0),
109            total_worker_restarts: AtomicU64::new(0),
110            incarnation: AtomicU64::new(0),
111            maintenance_mode: AtomicBool::new(false),
112            deferred_restart_binary: std::sync::Mutex::new(None),
113        }
114    }
115
116    /// Produces a point-in-time snapshot of daemon counters.
117    pub fn snapshot(&self) -> DaemonRuntimeSnapshot {
118        DaemonRuntimeSnapshot {
119            uptime_secs: self.started_at.elapsed().as_secs(),
120            lifecycle_state: DaemonLifecycleState::from_u8(
121                self.lifecycle_state.load(Ordering::SeqCst),
122            ),
123            shutdown_requested: self.shutdown_requested.load(Ordering::SeqCst),
124            configured_workers: self.configured_workers.load(Ordering::SeqCst),
125            live_workers: self.live_workers.load(Ordering::SeqCst),
126            idle_workers: self.idle_workers.load(Ordering::SeqCst),
127            active_workers: self.active_workers.load(Ordering::SeqCst),
128            running_tasks: self.running_tasks.load(Ordering::SeqCst),
129            total_worker_restarts: self.total_worker_restarts.load(Ordering::SeqCst),
130            incarnation: self.incarnation.load(Ordering::SeqCst),
131            maintenance_mode: self.maintenance_mode.load(Ordering::SeqCst),
132        }
133    }
134
135    /// Updates the configured worker count published by the runtime snapshot.
136    pub fn set_configured_workers(&self, count: usize) {
137        self.configured_workers
138            .store(count as u64, Ordering::SeqCst);
139    }
140
141    /// Requests shutdown and transitions the runtime into `draining`.
142    pub fn request_shutdown(&self) -> bool {
143        let first = !self.shutdown_requested.swap(true, Ordering::SeqCst);
144        self.lifecycle_state
145            .store(DaemonLifecycleState::Draining.as_u8(), Ordering::SeqCst);
146        first
147    }
148
149    /// Stores a binary path for deferred restart (when other tasks are running).
150    pub fn set_deferred_restart(&self, binary_path: PathBuf) {
151        let mut guard = self.deferred_restart_binary.lock().unwrap();
152        *guard = Some(binary_path);
153    }
154
155    /// Atomically takes the deferred restart binary path, if any.
156    pub fn take_deferred_restart(&self) -> Option<PathBuf> {
157        let mut guard = self.deferred_restart_binary.lock().unwrap();
158        guard.take()
159    }
160
161    /// Marks the daemon lifecycle as fully stopped.
162    pub fn mark_stopped(&self) {
163        self.lifecycle_state
164            .store(DaemonLifecycleState::Stopped.as_u8(), Ordering::SeqCst);
165    }
166
167    /// Increments live and idle worker counters for a newly started worker.
168    pub fn worker_started(&self) {
169        self.live_workers.fetch_add(1, Ordering::SeqCst);
170        self.idle_workers.fetch_add(1, Ordering::SeqCst);
171    }
172
173    /// Decrements worker counters when a worker exits.
174    pub fn worker_stopped(&self, was_busy: bool) {
175        self.live_workers.fetch_sub(1, Ordering::SeqCst);
176        if was_busy {
177            self.active_workers.fetch_sub(1, Ordering::SeqCst);
178        } else {
179            self.idle_workers.fetch_sub(1, Ordering::SeqCst);
180        }
181    }
182
183    /// Moves one worker from idle to active state.
184    pub fn worker_became_busy(&self) {
185        self.idle_workers.fetch_sub(1, Ordering::SeqCst);
186        self.active_workers.fetch_add(1, Ordering::SeqCst);
187    }
188
189    /// Moves one worker from active to idle state.
190    pub fn worker_became_idle(&self) {
191        self.active_workers.fetch_sub(1, Ordering::SeqCst);
192        self.idle_workers.fetch_add(1, Ordering::SeqCst);
193    }
194
195    /// Increments the count of running tasks.
196    pub fn running_task_started(&self) {
197        self.running_tasks.fetch_add(1, Ordering::SeqCst);
198    }
199
200    /// Decrements the count of running tasks.
201    pub fn running_task_finished(&self) {
202        self.running_tasks.fetch_sub(1, Ordering::SeqCst);
203    }
204
205    /// Records a worker restart after a panic recovery.
206    pub fn record_worker_restart(&self) {
207        self.total_worker_restarts.fetch_add(1, Ordering::SeqCst);
208    }
209
210    /// Sets the persistent incarnation counter loaded from the database.
211    pub fn set_incarnation(&self, value: u64) {
212        self.incarnation.store(value, Ordering::SeqCst);
213    }
214
215    /// Enables or disables maintenance mode.
216    pub fn set_maintenance_mode(&self, enabled: bool) {
217        self.maintenance_mode.store(enabled, Ordering::SeqCst);
218    }
219}
220
#[cfg(test)]
mod tests {
    use super::*;

    /// Walks one runtime through serving, draining and stopped, checking the
    /// snapshot after each phase.
    #[test]
    fn snapshot_reflects_state_transitions() {
        let runtime = DaemonRuntimeState::new();
        runtime.set_configured_workers(2);
        runtime.worker_started();
        runtime.worker_started();
        runtime.worker_became_busy();
        runtime.running_task_started();

        let serving = runtime.snapshot();
        assert_eq!(serving.lifecycle_state, DaemonLifecycleState::Serving);
        assert_eq!(serving.configured_workers, 2);
        assert_eq!(serving.live_workers, 2);
        assert_eq!(serving.idle_workers, 1);
        assert_eq!(serving.active_workers, 1);
        assert_eq!(serving.running_tasks, 1);
        assert!(!serving.shutdown_requested);
        assert_eq!(serving.total_worker_restarts, 0);
        assert_eq!(serving.incarnation, 0);
        assert!(!serving.maintenance_mode);

        runtime.record_worker_restart();
        runtime.record_worker_restart();
        assert_eq!(runtime.snapshot().total_worker_restarts, 2);

        assert!(runtime.request_shutdown());
        let draining = runtime.snapshot();
        assert_eq!(draining.lifecycle_state, DaemonLifecycleState::Draining);
        assert!(draining.shutdown_requested);

        runtime.running_task_finished();
        runtime.worker_became_idle();
        runtime.worker_stopped(false);
        runtime.worker_stopped(false);
        runtime.mark_stopped();
        let stopped = runtime.snapshot();
        assert_eq!(stopped.lifecycle_state, DaemonLifecycleState::Stopped);
        assert_eq!(stopped.live_workers, 0);
        assert_eq!(stopped.idle_workers, 0);
        assert_eq!(stopped.active_workers, 0);
        assert_eq!(stopped.running_tasks, 0);
    }

    /// Only the first shutdown request reports `true`; repeats keep the
    /// runtime draining with the flag set.
    #[test]
    fn request_shutdown_reports_only_the_first_request() {
        let runtime = DaemonRuntimeState::new();
        assert!(runtime.request_shutdown());
        assert!(!runtime.request_shutdown());

        let snap = runtime.snapshot();
        assert!(snap.shutdown_requested);
        assert_eq!(snap.lifecycle_state, DaemonLifecycleState::Draining);
    }

    /// The deferred restart path is replaced by later writes and handed out
    /// at most once.
    #[test]
    fn deferred_restart_is_taken_at_most_once() {
        let runtime = DaemonRuntimeState::new();
        assert_eq!(runtime.take_deferred_restart(), None);

        runtime.set_deferred_restart(PathBuf::from("daemon-old"));
        runtime.set_deferred_restart(PathBuf::from("daemon-new"));
        assert_eq!(
            runtime.take_deferred_restart(),
            Some(PathBuf::from("daemon-new"))
        );
        assert_eq!(runtime.take_deferred_restart(), None);
    }

    /// Incarnation and maintenance mode show up in the snapshot and can be
    /// changed back.
    #[test]
    fn incarnation_and_maintenance_mode_are_published() {
        let runtime = DaemonRuntimeState::new();
        runtime.set_incarnation(42);
        runtime.set_maintenance_mode(true);

        let snap = runtime.snapshot();
        assert_eq!(snap.incarnation, 42);
        assert!(snap.maintenance_mode);

        runtime.set_maintenance_mode(false);
        assert!(!runtime.snapshot().maintenance_mode);
    }

    /// A worker that exits while busy is removed from the active counter,
    /// not the idle one.
    #[test]
    fn busy_worker_exit_releases_active_slot() {
        let runtime = DaemonRuntimeState::new();
        runtime.worker_started();
        runtime.worker_became_busy();
        runtime.worker_stopped(true);

        let snap = runtime.snapshot();
        assert_eq!(snap.live_workers, 0);
        assert_eq!(snap.idle_workers, 0);
        assert_eq!(snap.active_workers, 0);
    }

    /// `Default` yields the same initial state as `new`.
    #[test]
    fn default_starts_serving_with_zeroed_counters() {
        let snap = DaemonRuntimeState::default().snapshot();
        assert_eq!(snap.lifecycle_state, DaemonLifecycleState::Serving);
        assert!(!snap.shutdown_requested);
        assert_eq!(snap.configured_workers, 0);
        assert_eq!(snap.live_workers, 0);
        assert_eq!(snap.running_tasks, 0);
        assert!(!snap.maintenance_mode);
    }

    /// The machine-readable lifecycle labels are part of the exported
    /// contract.
    #[test]
    fn lifecycle_labels_are_stable() {
        assert_eq!(DaemonLifecycleState::Serving.as_str(), "serving");
        assert_eq!(DaemonLifecycleState::Draining.as_str(), "draining");
        assert_eq!(DaemonLifecycleState::Stopped.as_str(), "stopped");
    }
}