Skip to main content

claude_pool/
supervisor.rs

//! Background supervisor loop for slot health monitoring.
//!
//! The supervisor periodically checks slot health and automatically restarts
//! errored slots that haven't yet reached their restart limit. Enable it via
//! [`crate::PoolConfig::supervisor_enabled`] and configure the interval with
//! [`crate::PoolConfig::supervisor_interval_secs`].
7
8use tokio::sync::watch;
9use tokio::task::JoinHandle;
10
11use crate::pool::Pool;
12use crate::store::PoolStore;
13use crate::types::{SlotRecord, SlotState};
14
15/// Handle to a running supervisor loop.
16///
17/// Returned by [`Pool::start_supervisor`]. Dropping the handle does **not**
18/// stop the loop — call [`SupervisorHandle::stop`]
19/// explicitly.
20pub struct SupervisorHandle {
21    stop_tx: watch::Sender<bool>,
22    handle: JoinHandle<()>,
23}
24
25impl SupervisorHandle {
26    /// Signal the supervisor to stop and wait for it to finish.
27    pub async fn stop(self) {
28        let _ = self.stop_tx.send(true);
29        let _ = self.handle.await;
30    }
31}
32
33/// Run one health-check pass over all slots.
34///
35/// Errored slots below the restart limit are reset to idle.
36/// Returns the number of slots restarted.
37pub async fn check_and_restart_slots<S: PoolStore + 'static>(pool: &Pool<S>) -> usize {
38    let max_restarts = pool.config().max_restarts;
39    let slots = match pool.store().list_slots().await {
40        Ok(s) => s,
41        Err(e) => {
42            tracing::warn!(error = %e, "supervisor: failed to list slots");
43            return 0;
44        }
45    };
46
47    let mut restarted = 0;
48    for slot in slots {
49        if slot.state == SlotState::Errored && slot.restart_count < max_restarts {
50            if let Err(e) = restart_slot(pool, &slot).await {
51                tracing::warn!(
52                    slot_id = %slot.id.0,
53                    error = %e,
54                    "supervisor: failed to restart slot"
55                );
56            } else {
57                restarted += 1;
58                tracing::info!(
59                    slot_id = %slot.id.0,
60                    restart_count = slot.restart_count + 1,
61                    "supervisor: restarted errored slot"
62                );
63            }
64        }
65    }
66    restarted
67}
68
69/// Reset an errored slot back to idle and increment its restart counter.
70async fn restart_slot<S: PoolStore + 'static>(
71    pool: &Pool<S>,
72    slot: &SlotRecord,
73) -> crate::Result<()> {
74    let mut updated = slot.clone();
75    updated.state = SlotState::Idle;
76    updated.current_task = None;
77    updated.session_id = None;
78    updated.restart_count += 1;
79    pool.store().put_slot(updated).await?;
80    Ok(())
81}
82
83/// Spawn the background supervisor loop.
84///
85/// The loop runs every `interval_secs` seconds until signalled to stop.
86pub(crate) fn spawn_supervisor<S: PoolStore + 'static>(
87    pool: Pool<S>,
88    interval_secs: u64,
89) -> SupervisorHandle {
90    let (stop_tx, mut stop_rx) = watch::channel(false);
91    let interval = std::time::Duration::from_secs(interval_secs);
92
93    let handle = tokio::spawn(async move {
94        loop {
95            tokio::select! {
96                _ = tokio::time::sleep(interval) => {
97                    check_and_restart_slots(&pool).await;
98                }
99                _ = stop_rx.changed() => {
100                    break;
101                }
102            }
103        }
104    });
105
106    SupervisorHandle { stop_tx, handle }
107}
108
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{PoolConfig, SlotId};
    use claude_wrapper::Claude;

    /// Build a `Claude` client that points at a stand-in binary path.
    fn mock_claude() -> Claude {
        Claude::builder().binary("/usr/bin/false").build().unwrap()
    }

    /// Fetch a slot by id, panicking if the lookup fails or the slot is absent.
    async fn fetch_slot<S: PoolStore + 'static>(pool: &Pool<S>, id: &'static str) -> SlotRecord {
        pool.store()
            .get_slot(&SlotId(id.into()))
            .await
            .unwrap()
            .unwrap()
    }

    /// Load a slot, apply `mutate` to it, and write it back to the store.
    async fn update_slot<S, F>(pool: &Pool<S>, id: &'static str, mutate: F)
    where
        S: PoolStore + 'static,
        F: FnOnce(&mut SlotRecord),
    {
        let mut slot = fetch_slot(pool, id).await;
        mutate(&mut slot);
        pool.store().put_slot(slot).await.unwrap();
    }

    #[tokio::test]
    async fn check_restarts_errored_slots() {
        let pool = Pool::builder(mock_claude()).slots(2).build().await.unwrap();

        // Mark slot-0 as errored.
        update_slot(&pool, "slot-0", |slot| slot.state = SlotState::Errored).await;

        let restarted = check_and_restart_slots(&pool).await;
        assert_eq!(restarted, 1);

        // Slot should now be idle with restart_count = 1.
        let slot = fetch_slot(&pool, "slot-0").await;
        assert_eq!(slot.state, SlotState::Idle);
        assert_eq!(slot.restart_count, 1);
    }

    #[tokio::test]
    async fn check_skips_slots_at_restart_limit() {
        let config = PoolConfig {
            max_restarts: 2,
            ..Default::default()
        };
        let pool = Pool::builder(mock_claude())
            .slots(1)
            .config(config)
            .build()
            .await
            .unwrap();

        // Mark slot-0 as errored with restart_count at the limit.
        update_slot(&pool, "slot-0", |slot| {
            slot.state = SlotState::Errored;
            slot.restart_count = 2;
        })
        .await;

        let restarted = check_and_restart_slots(&pool).await;
        assert_eq!(restarted, 0);

        // Slot should still be errored and its counter must not have moved.
        let slot = fetch_slot(&pool, "slot-0").await;
        assert_eq!(slot.state, SlotState::Errored);
        assert_eq!(slot.restart_count, 2);
    }

    #[tokio::test]
    async fn check_ignores_idle_and_busy_slots() {
        let pool = Pool::builder(mock_claude()).slots(2).build().await.unwrap();

        // Mark slot-1 as busy; slot-0 keeps whatever state the builder gave it.
        update_slot(&pool, "slot-1", |slot| slot.state = SlotState::Busy).await;
        let untouched_before = fetch_slot(&pool, "slot-0").await;
        let busy_before = fetch_slot(&pool, "slot-1").await;

        let restarted = check_and_restart_slots(&pool).await;
        assert_eq!(restarted, 0);

        // Neither slot may have been modified by the pass.
        let untouched_after = fetch_slot(&pool, "slot-0").await;
        assert_eq!(untouched_after.state, untouched_before.state);
        assert_eq!(untouched_after.restart_count, untouched_before.restart_count);

        let busy_after = fetch_slot(&pool, "slot-1").await;
        assert_eq!(busy_after.state, SlotState::Busy);
        assert_eq!(busy_after.restart_count, busy_before.restart_count);
    }

    #[tokio::test]
    async fn start_supervisor_returns_none_when_disabled() {
        let pool = Pool::builder(mock_claude()).slots(1).build().await.unwrap();
        assert!(pool.start_supervisor().is_none());
    }

    #[tokio::test]
    async fn start_supervisor_returns_handle_when_enabled() {
        let config = PoolConfig {
            supervisor_enabled: true,
            supervisor_interval_secs: 1,
            ..Default::default()
        };
        let pool = Pool::builder(mock_claude())
            .slots(1)
            .config(config)
            .build()
            .await
            .unwrap();

        let handle = pool.start_supervisor().expect("should return handle");
        handle.stop().await;
    }
}
225}