1use std::fmt;
20use std::sync::Arc;
21use std::time::{Duration, Instant};
22use tokio::sync::watch;
23
24#[derive(Clone, Debug, PartialEq)]
30pub enum ServiceState {
31 Stopped,
33 Starting,
35 Ready,
37 Degraded(String),
39 Stopping,
41 Failed(String),
43}
44
45impl ServiceState {
46 pub fn is_ready(&self) -> bool {
48 matches!(self, Self::Ready)
49 }
50
51 pub fn is_available(&self) -> bool {
53 matches!(self, Self::Ready | Self::Degraded(_))
54 }
55
56 pub fn is_terminal(&self) -> bool {
58 matches!(self, Self::Stopped | Self::Failed(_))
59 }
60}
61
62impl fmt::Display for ServiceState {
63 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
64 match self {
65 Self::Stopped => write!(f, "stopped"),
66 Self::Starting => write!(f, "starting"),
67 Self::Ready => write!(f, "ready"),
68 Self::Degraded(reason) => write!(f, "degraded: {reason}"),
69 Self::Stopping => write!(f, "stopping"),
70 Self::Failed(reason) => write!(f, "failed: {reason}"),
71 }
72 }
73}
74
75#[derive(Clone)]
84pub struct ServiceHandle {
85 inner: Arc<ServiceHandleInner>,
86}
87
88struct ServiceHandleInner {
89 name: String,
90 tx: watch::Sender<ServiceState>,
91 started_at: Instant,
92}
93
94impl ServiceHandle {
95 pub fn new(name: impl Into<String>) -> Self {
99 let (tx, _rx) = watch::channel(ServiceState::Stopped);
100 Self {
101 inner: Arc::new(ServiceHandleInner {
102 name: name.into(),
103 tx,
104 started_at: Instant::now(),
105 }),
106 }
107 }
108
109 pub fn name(&self) -> &str {
111 &self.inner.name
112 }
113
114 pub fn state(&self) -> ServiceState {
116 self.inner.tx.borrow().clone()
117 }
118
119 pub fn set_state(&self, state: ServiceState) {
123 log::info!("Service '{}' → {state}", self.inner.name);
124 self.inner.tx.send_replace(state);
125 }
126
127 pub fn subscribe(&self) -> watch::Receiver<ServiceState> {
129 self.inner.tx.subscribe()
130 }
131
132 pub async fn wait_ready(&self, timeout: Duration) -> Result<(), String> {
134 let mut rx = self.subscribe();
135 let deadline = tokio::time::sleep(timeout);
136 tokio::pin!(deadline);
137
138 {
140 let state = rx.borrow_and_update().clone();
141 match state {
142 ServiceState::Ready => return Ok(()),
143 ServiceState::Failed(reason) => {
144 return Err(format!("Service '{}' failed: {reason}", self.inner.name));
145 }
146 _ => {}
147 }
148 }
149
150 loop {
151 tokio::select! {
152 _ = &mut deadline => {
153 return Err(format!(
154 "Service '{}' not ready after {timeout:?} (state: {})",
155 self.inner.name, self.state()
156 ));
157 }
158 result = rx.changed() => {
159 if result.is_err() {
160 return Err(format!("Service '{}' channel closed", self.inner.name));
161 }
162 let state = rx.borrow().clone();
163 match state {
164 ServiceState::Ready => return Ok(()),
165 ServiceState::Failed(reason) => {
166 return Err(format!(
167 "Service '{}' failed: {reason}",
168 self.inner.name
169 ));
170 }
171 _ => continue,
172 }
173 }
174 }
175 }
176 }
177
178 pub fn elapsed(&self) -> Duration {
180 self.inner.started_at.elapsed()
181 }
182}
183
184impl fmt::Debug for ServiceHandle {
185 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
186 f.debug_struct("ServiceHandle")
187 .field("name", &self.inner.name)
188 .field("state", &self.state())
189 .finish()
190 }
191}
192
193#[cfg(test)]
198mod tests {
199 use super::*;
200
201 #[test]
202 fn test_service_state_display() {
203 assert_eq!(ServiceState::Stopped.to_string(), "stopped");
204 assert_eq!(ServiceState::Starting.to_string(), "starting");
205 assert_eq!(ServiceState::Ready.to_string(), "ready");
206 assert_eq!(
207 ServiceState::Degraded("low memory".to_string()).to_string(),
208 "degraded: low memory"
209 );
210 assert_eq!(ServiceState::Stopping.to_string(), "stopping");
211 assert_eq!(
212 ServiceState::Failed("crash".to_string()).to_string(),
213 "failed: crash"
214 );
215 }
216
217 #[test]
218 fn test_service_state_predicates() {
219 assert!(ServiceState::Ready.is_ready());
220 assert!(!ServiceState::Starting.is_ready());
221 assert!(!ServiceState::Degraded("x".into()).is_ready());
222
223 assert!(ServiceState::Ready.is_available());
224 assert!(ServiceState::Degraded("x".into()).is_available());
225 assert!(!ServiceState::Starting.is_available());
226 assert!(!ServiceState::Stopped.is_available());
227
228 assert!(ServiceState::Stopped.is_terminal());
229 assert!(ServiceState::Failed("x".into()).is_terminal());
230 assert!(!ServiceState::Ready.is_terminal());
231 assert!(!ServiceState::Starting.is_terminal());
232 }
233
234 #[test]
235 fn test_service_handle_initial_state() {
236 let handle = ServiceHandle::new("test");
237 assert_eq!(handle.name(), "test");
238 assert_eq!(handle.state(), ServiceState::Stopped);
239 }
240
241 #[test]
242 fn test_service_handle_state_transitions() {
243 let handle = ServiceHandle::new("test");
244
245 handle.set_state(ServiceState::Starting);
246 assert_eq!(handle.state(), ServiceState::Starting);
247
248 handle.set_state(ServiceState::Ready);
249 assert_eq!(handle.state(), ServiceState::Ready);
250
251 handle.set_state(ServiceState::Stopping);
252 assert_eq!(handle.state(), ServiceState::Stopping);
253
254 handle.set_state(ServiceState::Stopped);
255 assert_eq!(handle.state(), ServiceState::Stopped);
256 }
257
258 #[test]
259 fn test_service_handle_clone_shares_state() {
260 let handle1 = ServiceHandle::new("shared");
261 let handle2 = handle1.clone();
262
263 handle1.set_state(ServiceState::Ready);
264 assert_eq!(handle2.state(), ServiceState::Ready);
265
266 handle2.set_state(ServiceState::Stopping);
267 assert_eq!(handle1.state(), ServiceState::Stopping);
268 }
269
270 #[test]
271 fn test_service_handle_subscribe() {
272 let handle = ServiceHandle::new("test");
273 let mut rx = handle.subscribe();
274
275 assert_eq!(*rx.borrow(), ServiceState::Stopped);
277
278 handle.set_state(ServiceState::Starting);
279 assert_eq!(*rx.borrow_and_update(), ServiceState::Starting);
281 }
282
283 #[tokio::test]
284 async fn test_service_handle_wait_ready_success() {
285 let handle = ServiceHandle::new("test");
286 let h = handle.clone();
287
288 tokio::spawn(async move {
289 tokio::time::sleep(Duration::from_millis(10)).await;
290 h.set_state(ServiceState::Starting);
291 tokio::time::sleep(Duration::from_millis(10)).await;
292 h.set_state(ServiceState::Ready);
293 });
294
295 let result = handle.wait_ready(Duration::from_secs(1)).await;
296 assert!(result.is_ok());
297 }
298
299 #[tokio::test]
300 async fn test_service_handle_wait_ready_timeout() {
301 let handle = ServiceHandle::new("slow");
302 handle.set_state(ServiceState::Starting);
303
304 let result = handle.wait_ready(Duration::from_millis(50)).await;
305 assert!(result.is_err());
306 assert!(result.unwrap_err().contains("not ready after"));
307 }
308
309 #[tokio::test]
310 async fn test_service_handle_wait_ready_failed() {
311 let handle = ServiceHandle::new("broken");
312 let h = handle.clone();
313
314 tokio::spawn(async move {
315 tokio::time::sleep(Duration::from_millis(10)).await;
316 h.set_state(ServiceState::Failed("out of memory".to_string()));
317 });
318
319 let result = handle.wait_ready(Duration::from_secs(1)).await;
320 assert!(result.is_err());
321 let err = result.unwrap_err();
322 assert!(err.contains("failed"));
323 assert!(err.contains("out of memory"));
324 }
325
326 #[tokio::test]
327 async fn test_service_handle_wait_ready_already_ready() {
328 let handle = ServiceHandle::new("instant");
329 handle.set_state(ServiceState::Ready);
330
331 let result = handle.wait_ready(Duration::from_millis(50)).await;
332 assert!(result.is_ok());
333 }
334
335 #[tokio::test]
336 async fn test_service_handle_wait_ready_already_failed() {
337 let handle = ServiceHandle::new("instant-fail");
338 handle.set_state(ServiceState::Failed("boom".to_string()));
339
340 let result = handle.wait_ready(Duration::from_millis(50)).await;
341 assert!(result.is_err());
342 assert!(result.unwrap_err().contains("boom"));
343 }
344
345 #[test]
346 fn test_service_handle_elapsed() {
347 let handle = ServiceHandle::new("test");
348 std::thread::sleep(Duration::from_millis(10));
349 assert!(handle.elapsed() >= Duration::from_millis(10));
350 }
351
352 #[test]
353 fn test_service_handle_debug() {
354 let handle = ServiceHandle::new("debug-test");
355 let debug = format!("{:?}", handle);
356 assert!(debug.contains("debug-test"));
357 assert!(debug.contains("ServiceHandle"));
358 }
359
360 fn _assert_send_sync<T: Send + Sync>() {}
362 #[test]
363 fn test_service_handle_send_sync() {
364 _assert_send_sync::<ServiceHandle>();
365 _assert_send_sync::<ServiceState>();
366 }
367}