claude_pool/
supervisor.rs1use tokio::sync::watch;
9use tokio::task::JoinHandle;
10
11use crate::pool::Pool;
12use crate::store::PoolStore;
13use crate::types::{SlotRecord, SlotState};
14
15pub struct SupervisorHandle {
21 stop_tx: watch::Sender<bool>,
22 handle: JoinHandle<()>,
23}
24
25impl SupervisorHandle {
26 pub async fn stop(self) {
28 let _ = self.stop_tx.send(true);
29 let _ = self.handle.await;
30 }
31}
32
33pub async fn check_and_restart_slots<S: PoolStore + 'static>(pool: &Pool<S>) -> usize {
38 let max_restarts = pool.config().max_restarts;
39 let slots = match pool.store().list_slots().await {
40 Ok(s) => s,
41 Err(e) => {
42 tracing::warn!(error = %e, "supervisor: failed to list slots");
43 return 0;
44 }
45 };
46
47 let mut restarted = 0;
48 for slot in slots {
49 if slot.state == SlotState::Errored && slot.restart_count < max_restarts {
50 if let Err(e) = restart_slot(pool, &slot).await {
51 tracing::warn!(
52 slot_id = %slot.id.0,
53 error = %e,
54 "supervisor: failed to restart slot"
55 );
56 } else {
57 restarted += 1;
58 tracing::info!(
59 slot_id = %slot.id.0,
60 restart_count = slot.restart_count + 1,
61 "supervisor: restarted errored slot"
62 );
63 }
64 }
65 }
66 restarted
67}
68
69async fn restart_slot<S: PoolStore + 'static>(
71 pool: &Pool<S>,
72 slot: &SlotRecord,
73) -> crate::Result<()> {
74 let mut updated = slot.clone();
75 updated.state = SlotState::Idle;
76 updated.current_task = None;
77 updated.session_id = None;
78 updated.restart_count += 1;
79 pool.store().put_slot(updated).await?;
80 Ok(())
81}
82
83pub(crate) fn spawn_supervisor<S: PoolStore + 'static>(
87 pool: Pool<S>,
88 interval_secs: u64,
89) -> SupervisorHandle {
90 let (stop_tx, mut stop_rx) = watch::channel(false);
91 let interval = std::time::Duration::from_secs(interval_secs);
92
93 let handle = tokio::spawn(async move {
94 loop {
95 tokio::select! {
96 _ = tokio::time::sleep(interval) => {
97 check_and_restart_slots(&pool).await;
98 }
99 _ = stop_rx.changed() => {
100 break;
101 }
102 }
103 }
104 });
105
106 SupervisorHandle { stop_tx, handle }
107}
108
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{PoolConfig, SlotId};
    use claude_wrapper::Claude;

    fn mock_claude() -> Claude {
        Claude::builder().binary("/usr/bin/false").build().unwrap()
    }

    /// Fetches slot `id` from the pool's store, panicking if it is missing.
    async fn load_slot<S: PoolStore + 'static>(pool: &Pool<S>, id: &str) -> SlotRecord {
        pool.store()
            .get_slot(&SlotId(id.into()))
            .await
            .unwrap()
            .expect("slot should exist")
    }

    /// Loads slot `id`, applies `edit` to it, and writes it back to the store.
    async fn edit_slot<S, F>(pool: &Pool<S>, id: &str, edit: F)
    where
        S: PoolStore + 'static,
        F: FnOnce(&mut SlotRecord),
    {
        let mut slot = load_slot(pool, id).await;
        edit(&mut slot);
        pool.store().put_slot(slot).await.unwrap();
    }

    #[tokio::test]
    async fn check_restarts_errored_slots() {
        let pool = Pool::builder(mock_claude()).slots(2).build().await.unwrap();
        edit_slot(&pool, "slot-0", |slot| slot.state = SlotState::Errored).await;

        let restarted = check_and_restart_slots(&pool).await;
        assert_eq!(restarted, 1);

        let slot = load_slot(&pool, "slot-0").await;
        assert_eq!(slot.state, SlotState::Idle);
        assert_eq!(slot.restart_count, 1);
        assert!(slot.current_task.is_none());
        assert!(slot.session_id.is_none());

        // The healthy slot must not have been touched.
        let other = load_slot(&pool, "slot-1").await;
        assert_eq!(other.restart_count, 0);
    }

    #[tokio::test]
    async fn check_skips_slots_at_restart_limit() {
        let config = PoolConfig {
            max_restarts: 2,
            ..Default::default()
        };
        let pool = Pool::builder(mock_claude())
            .slots(1)
            .config(config)
            .build()
            .await
            .unwrap();
        edit_slot(&pool, "slot-0", |slot| {
            slot.state = SlotState::Errored;
            slot.restart_count = 2;
        })
        .await;

        let restarted = check_and_restart_slots(&pool).await;
        assert_eq!(restarted, 0);

        let slot = load_slot(&pool, "slot-0").await;
        assert_eq!(slot.state, SlotState::Errored);
        assert_eq!(slot.restart_count, 2);
    }

    #[tokio::test]
    async fn check_stops_restarting_once_limit_reached() {
        let config = PoolConfig {
            max_restarts: 1,
            ..Default::default()
        };
        let pool = Pool::builder(mock_claude())
            .slots(1)
            .config(config)
            .build()
            .await
            .unwrap();

        edit_slot(&pool, "slot-0", |slot| slot.state = SlotState::Errored).await;
        assert_eq!(check_and_restart_slots(&pool).await, 1);

        // Second failure: the single allowed restart has been used up.
        edit_slot(&pool, "slot-0", |slot| slot.state = SlotState::Errored).await;
        assert_eq!(check_and_restart_slots(&pool).await, 0);

        let slot = load_slot(&pool, "slot-0").await;
        assert_eq!(slot.state, SlotState::Errored);
        assert_eq!(slot.restart_count, 1);
    }

    #[tokio::test]
    async fn check_ignores_idle_and_busy_slots() {
        let pool = Pool::builder(mock_claude()).slots(2).build().await.unwrap();
        edit_slot(&pool, "slot-1", |slot| slot.state = SlotState::Busy).await;

        let restarted = check_and_restart_slots(&pool).await;
        assert_eq!(restarted, 0);

        // Neither slot may have been rewritten by the supervisor.
        let idle = load_slot(&pool, "slot-0").await;
        assert_eq!(idle.state, SlotState::Idle);
        assert_eq!(idle.restart_count, 0);

        let busy = load_slot(&pool, "slot-1").await;
        assert_eq!(busy.state, SlotState::Busy);
        assert_eq!(busy.restart_count, 0);
    }

    #[tokio::test]
    async fn start_supervisor_returns_none_when_disabled() {
        let pool = Pool::builder(mock_claude()).slots(1).build().await.unwrap();
        assert!(pool.start_supervisor().is_none());
    }

    #[tokio::test]
    async fn start_supervisor_returns_handle_when_enabled() {
        let config = PoolConfig {
            supervisor_enabled: true,
            supervisor_interval_secs: 1,
            ..Default::default()
        };
        let pool = Pool::builder(mock_claude())
            .slots(1)
            .config(config)
            .build()
            .await
            .unwrap();

        let handle = pool.start_supervisor().expect("should return handle");
        handle.stop().await;
    }
}