1use std::fmt;
20use std::sync::Arc;
21use std::time::{Duration, Instant};
22use tokio::sync::watch;
23
24pub async fn wait_all_ready(
39 services: &[ServiceHandle],
40 timeout: Duration,
41) -> Result<(), Vec<String>> {
42 let futures: Vec<_> = services.iter().map(|svc| svc.wait_ready(timeout)).collect();
43
44 let results = futures::future::join_all(futures).await;
45
46 let errors: Vec<String> = results.into_iter().filter_map(|r| r.err()).collect();
47
48 if errors.is_empty() {
49 Ok(())
50 } else {
51 Err(errors)
52 }
53}
54
/// Retry policy used by [`spawn_with_retry`].
///
/// After each failed attempt the wait before the next one doubles, never
/// exceeding `max_delay`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RetryConfig {
    /// Maximum number of initialization attempts, counting the first one.
    pub max_attempts: u32,
    /// Wait after the first failed attempt; doubled after each further failure.
    pub initial_delay: Duration,
    /// Upper bound applied to the delay as it is doubled.
    pub max_delay: Duration,
}

impl Default for RetryConfig {
    /// Three attempts, waiting 1 s after the first failure, capped at 30 s.
    fn default() -> Self {
        Self {
            max_attempts: 3,
            initial_delay: Duration::from_secs(1),
            max_delay: Duration::from_secs(30),
        }
    }
}
75
76pub fn spawn_with_retry<F, Fut>(
103 handle: ServiceHandle,
104 config: RetryConfig,
105 init_fn: F,
106) -> tokio::task::JoinHandle<()>
107where
108 F: Fn() -> Fut + Send + 'static,
109 Fut: std::future::Future<Output = Result<(), String>> + Send,
110{
111 tokio::spawn(async move {
112 let mut delay = config.initial_delay;
113
114 for attempt in 1..=config.max_attempts {
115 handle.set_state(ServiceState::Starting);
116
117 match init_fn().await {
118 Ok(()) => {
119 handle.set_state(ServiceState::Ready);
120 return;
121 }
122 Err(e) => {
123 if attempt == config.max_attempts {
124 log::error!(
125 "Service '{}' failed after {} attempts: {e}",
126 handle.name(),
127 config.max_attempts
128 );
129 handle.set_state(ServiceState::Failed(e));
130 return;
131 }
132 log::warn!(
133 "Service '{}' attempt {}/{} failed: {e} — retrying in {delay:?}",
134 handle.name(),
135 attempt,
136 config.max_attempts
137 );
138 tokio::time::sleep(delay).await;
139 delay = (delay * 2).min(config.max_delay);
140 }
141 }
142 }
143 })
144}
145
/// Lifecycle state of a managed service.
///
/// `Degraded` and `Failed` carry a human-readable reason that is included in
/// the `Display` output.
#[derive(Clone, Debug, PartialEq)]
pub enum ServiceState {
    /// Not running.
    Stopped,
    /// Initialization in progress.
    Starting,
    /// Fully operational.
    Ready,
    /// Serving, but impaired for the given reason.
    Degraded(String),
    /// Shutdown in progress.
    Stopping,
    /// Gave up for the given reason.
    Failed(String),
}

impl ServiceState {
    /// `true` only for [`ServiceState::Ready`]; `Degraded` does not count.
    pub fn is_ready(&self) -> bool {
        *self == Self::Ready
    }

    /// `true` when the service can take work: `Ready` or `Degraded`.
    pub fn is_available(&self) -> bool {
        self.is_ready() || matches!(self, Self::Degraded(_))
    }

    /// `true` for the states with no work in progress: `Stopped` or `Failed`.
    pub fn is_terminal(&self) -> bool {
        matches!(self, Self::Failed(_)) || *self == Self::Stopped
    }
}

impl fmt::Display for ServiceState {
    /// Lower-case state name, followed by `": <reason>"` for the variants that
    /// carry one.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            Self::Stopped => "stopped",
            Self::Starting => "starting",
            Self::Ready => "ready",
            Self::Stopping => "stopping",
            Self::Degraded(reason) => return write!(f, "degraded: {reason}"),
            Self::Failed(reason) => return write!(f, "failed: {reason}"),
        };
        f.write_str(label)
    }
}
196
/// One entry in a [`ServiceHandle`]'s state history.
#[derive(Clone, Debug)]
pub struct Transition {
    /// The state the handle was set to.
    pub state: ServiceState,
    /// Time since the handle was created at the moment this state was recorded.
    pub elapsed: Duration,
}
209
/// Cloneable handle to one service's lifecycle state.
///
/// All clones share a single `Arc`'d inner value, so they see the same current
/// state, the same subscribers and the same transition history.
#[derive(Clone)]
pub struct ServiceHandle {
    inner: Arc<ServiceHandleInner>,
}

/// Shared state behind every clone of a [`ServiceHandle`].
struct ServiceHandleInner {
    /// Service name, used in log and error messages.
    name: String,
    /// Holds and broadcasts the current state; `state()` reads from here.
    tx: watch::Sender<ServiceState>,
    /// Reference instant for `elapsed()` and for each `Transition::elapsed`.
    started_at: Instant,
    /// History of states in recording order, beginning with the initial `Stopped`.
    transitions: std::sync::Mutex<Vec<Transition>>,
}
226
227impl ServiceHandle {
228 pub fn new(name: impl Into<String>) -> Self {
232 let (tx, _rx) = watch::channel(ServiceState::Stopped);
233 Self {
234 inner: Arc::new(ServiceHandleInner {
235 name: name.into(),
236 tx,
237 started_at: Instant::now(),
238 transitions: std::sync::Mutex::new(vec![Transition {
239 state: ServiceState::Stopped,
240 elapsed: Duration::ZERO,
241 }]),
242 }),
243 }
244 }
245
246 pub fn name(&self) -> &str {
248 &self.inner.name
249 }
250
251 pub fn state(&self) -> ServiceState {
253 self.inner.tx.borrow().clone()
254 }
255
256 pub fn set_state(&self, state: ServiceState) {
261 log::info!("Service '{}' → {state}", self.inner.name);
262 if let Ok(mut log) = self.inner.transitions.lock() {
263 log.push(Transition {
264 state: state.clone(),
265 elapsed: self.inner.started_at.elapsed(),
266 });
267 }
268 self.inner.tx.send_replace(state);
269 }
270
271 pub fn transitions(&self) -> Vec<Transition> {
277 self.inner
278 .transitions
279 .lock()
280 .map(|log| log.clone())
281 .unwrap_or_default()
282 }
283
284 pub fn subscribe(&self) -> watch::Receiver<ServiceState> {
286 self.inner.tx.subscribe()
287 }
288
289 pub async fn wait_ready(&self, timeout: Duration) -> Result<(), String> {
291 let mut rx = self.subscribe();
292 let deadline = tokio::time::sleep(timeout);
293 tokio::pin!(deadline);
294
295 {
297 let state = rx.borrow_and_update().clone();
298 match state {
299 ServiceState::Ready => return Ok(()),
300 ServiceState::Failed(reason) => {
301 return Err(format!("Service '{}' failed: {reason}", self.inner.name));
302 }
303 _ => {}
304 }
305 }
306
307 loop {
308 tokio::select! {
309 _ = &mut deadline => {
310 return Err(format!(
311 "Service '{}' not ready after {timeout:?} (state: {})",
312 self.inner.name, self.state()
313 ));
314 }
315 result = rx.changed() => {
316 if result.is_err() {
317 return Err(format!("Service '{}' channel closed", self.inner.name));
318 }
319 let state = rx.borrow().clone();
320 match state {
321 ServiceState::Ready => return Ok(()),
322 ServiceState::Failed(reason) => {
323 return Err(format!(
324 "Service '{}' failed: {reason}",
325 self.inner.name
326 ));
327 }
328 _ => continue,
329 }
330 }
331 }
332 }
333 }
334
335 pub fn elapsed(&self) -> Duration {
337 self.inner.started_at.elapsed()
338 }
339}
340
341impl fmt::Debug for ServiceHandle {
342 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
343 f.debug_struct("ServiceHandle")
344 .field("name", &self.inner.name)
345 .field("state", &self.state())
346 .finish()
347 }
348}
349
// Unit tests for ServiceState, ServiceHandle, wait_all_ready, spawn_with_retry,
// RetryConfig and the transition history. Async tests run on Tokio's test runtime
// and use short real-time sleeps to sequence state changes.
#[cfg(test)]
mod tests {
    use super::*;

    // ---- ServiceState ----

    #[test]
    fn test_service_state_display() {
        assert_eq!(ServiceState::Stopped.to_string(), "stopped");
        assert_eq!(ServiceState::Starting.to_string(), "starting");
        assert_eq!(ServiceState::Ready.to_string(), "ready");
        assert_eq!(
            ServiceState::Degraded("low memory".to_string()).to_string(),
            "degraded: low memory"
        );
        assert_eq!(ServiceState::Stopping.to_string(), "stopping");
        assert_eq!(
            ServiceState::Failed("crash".to_string()).to_string(),
            "failed: crash"
        );
    }

    #[test]
    fn test_service_state_predicates() {
        // is_ready: only Ready; Degraded does not count.
        assert!(ServiceState::Ready.is_ready());
        assert!(!ServiceState::Starting.is_ready());
        assert!(!ServiceState::Degraded("x".into()).is_ready());

        // is_available: Ready or Degraded.
        assert!(ServiceState::Ready.is_available());
        assert!(ServiceState::Degraded("x".into()).is_available());
        assert!(!ServiceState::Starting.is_available());
        assert!(!ServiceState::Stopped.is_available());

        // is_terminal: Stopped or Failed.
        assert!(ServiceState::Stopped.is_terminal());
        assert!(ServiceState::Failed("x".into()).is_terminal());
        assert!(!ServiceState::Ready.is_terminal());
        assert!(!ServiceState::Starting.is_terminal());
    }

    // ---- ServiceHandle: basic state handling ----

    #[test]
    fn test_service_handle_initial_state() {
        let handle = ServiceHandle::new("test");
        assert_eq!(handle.name(), "test");
        assert_eq!(handle.state(), ServiceState::Stopped);
    }

    #[test]
    fn test_service_handle_state_transitions() {
        let handle = ServiceHandle::new("test");

        handle.set_state(ServiceState::Starting);
        assert_eq!(handle.state(), ServiceState::Starting);

        handle.set_state(ServiceState::Ready);
        assert_eq!(handle.state(), ServiceState::Ready);

        handle.set_state(ServiceState::Stopping);
        assert_eq!(handle.state(), ServiceState::Stopping);

        handle.set_state(ServiceState::Stopped);
        assert_eq!(handle.state(), ServiceState::Stopped);
    }

    #[test]
    fn test_service_handle_clone_shares_state() {
        let handle1 = ServiceHandle::new("shared");
        let handle2 = handle1.clone();

        // Changes made through either clone are visible through the other.
        handle1.set_state(ServiceState::Ready);
        assert_eq!(handle2.state(), ServiceState::Ready);

        handle2.set_state(ServiceState::Stopping);
        assert_eq!(handle1.state(), ServiceState::Stopping);
    }

    #[test]
    fn test_service_handle_subscribe() {
        let handle = ServiceHandle::new("test");
        let mut rx = handle.subscribe();

        // A fresh receiver sees the current value.
        assert_eq!(*rx.borrow(), ServiceState::Stopped);

        // Later set_state calls are delivered to existing receivers.
        handle.set_state(ServiceState::Starting);
        assert_eq!(*rx.borrow_and_update(), ServiceState::Starting);
    }

    // ---- ServiceHandle::wait_ready ----

    #[tokio::test]
    async fn test_service_handle_wait_ready_success() {
        let handle = ServiceHandle::new("test");
        let h = handle.clone();

        // Change state from another task after short delays, so wait_ready must
        // observe updates rather than the initial value.
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(10)).await;
            h.set_state(ServiceState::Starting);
            tokio::time::sleep(Duration::from_millis(10)).await;
            h.set_state(ServiceState::Ready);
        });

        let result = handle.wait_ready(Duration::from_secs(1)).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_service_handle_wait_ready_timeout() {
        let handle = ServiceHandle::new("slow");
        // Never leaves Starting, so the deadline has to fire.
        handle.set_state(ServiceState::Starting);

        let result = handle.wait_ready(Duration::from_millis(50)).await;
        assert!(result.is_err());
        assert!(result.unwrap_err().contains("not ready after"));
    }

    #[tokio::test]
    async fn test_service_handle_wait_ready_failed() {
        let handle = ServiceHandle::new("broken");
        let h = handle.clone();

        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(10)).await;
            h.set_state(ServiceState::Failed("out of memory".to_string()));
        });

        // Failure ends the wait early, with the reason in the error message.
        let result = handle.wait_ready(Duration::from_secs(1)).await;
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(err.contains("failed"));
        assert!(err.contains("out of memory"));
    }

    #[tokio::test]
    async fn test_service_handle_wait_ready_already_ready() {
        // State is set before waiting: exercises the initial-value check.
        let handle = ServiceHandle::new("instant");
        handle.set_state(ServiceState::Ready);

        let result = handle.wait_ready(Duration::from_millis(50)).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_service_handle_wait_ready_already_failed() {
        let handle = ServiceHandle::new("instant-fail");
        handle.set_state(ServiceState::Failed("boom".to_string()));

        let result = handle.wait_ready(Duration::from_millis(50)).await;
        assert!(result.is_err());
        assert!(result.unwrap_err().contains("boom"));
    }

    // ---- ServiceHandle: misc ----

    #[test]
    fn test_service_handle_elapsed() {
        let handle = ServiceHandle::new("test");
        std::thread::sleep(Duration::from_millis(10));
        assert!(handle.elapsed() >= Duration::from_millis(10));
    }

    #[test]
    fn test_service_handle_debug() {
        let handle = ServiceHandle::new("debug-test");
        let debug = format!("{:?}", handle);
        assert!(debug.contains("debug-test"));
        assert!(debug.contains("ServiceHandle"));
    }

    // Compile-time check: only instantiates when T is Send + Sync.
    fn _assert_send_sync<T: Send + Sync>() {}

    #[test]
    fn test_service_handle_send_sync() {
        _assert_send_sync::<ServiceHandle>();
        _assert_send_sync::<ServiceState>();
    }

    // ---- wait_all_ready ----

    #[tokio::test]
    async fn test_wait_all_ready_all_already_ready() {
        let a = ServiceHandle::new("a");
        let b = ServiceHandle::new("b");
        a.set_state(ServiceState::Ready);
        b.set_state(ServiceState::Ready);

        let result = wait_all_ready(&[a, b], Duration::from_millis(50)).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_wait_all_ready_parallel_startup() {
        let a = ServiceHandle::new("a");
        let b = ServiceHandle::new("b");

        // The two services become ready at different times from separate tasks.
        let ha = a.clone();
        let hb = b.clone();
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(10)).await;
            ha.set_state(ServiceState::Ready);
        });
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(20)).await;
            hb.set_state(ServiceState::Ready);
        });

        let result = wait_all_ready(&[a, b], Duration::from_secs(1)).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_wait_all_ready_one_fails() {
        let a = ServiceHandle::new("ok");
        let b = ServiceHandle::new("broken");
        a.set_state(ServiceState::Ready);
        b.set_state(ServiceState::Failed("boom".into()));

        // Only the failing service contributes an error.
        let result = wait_all_ready(&[a, b], Duration::from_millis(50)).await;
        assert!(result.is_err());
        let errors = result.unwrap_err();
        assert_eq!(errors.len(), 1);
        assert!(errors[0].contains("boom"));
    }

    #[tokio::test]
    async fn test_wait_all_ready_one_timeout() {
        let a = ServiceHandle::new("ok");
        let b = ServiceHandle::new("slow");
        a.set_state(ServiceState::Ready);
        b.set_state(ServiceState::Starting);

        let result = wait_all_ready(&[a, b], Duration::from_millis(50)).await;
        assert!(result.is_err());
        let errors = result.unwrap_err();
        assert_eq!(errors.len(), 1);
        assert!(errors[0].contains("not ready after"));
    }

    #[tokio::test]
    async fn test_wait_all_ready_empty_services() {
        let result = wait_all_ready(&[], Duration::from_millis(50)).await;
        assert!(result.is_ok());
    }

    // ---- spawn_with_retry (millisecond delays keep these fast) ----

    #[tokio::test]
    async fn test_spawn_with_retry_succeeds_first_attempt() {
        let handle = ServiceHandle::new("ok");
        let config = RetryConfig {
            max_attempts: 3,
            initial_delay: Duration::from_millis(1),
            max_delay: Duration::from_millis(10),
        };

        let jh = spawn_with_retry(handle.clone(), config, || async { Ok(()) });
        jh.await.unwrap();

        assert_eq!(handle.state(), ServiceState::Ready);
    }

    #[tokio::test]
    async fn test_spawn_with_retry_succeeds_after_failures() {
        let handle = ServiceHandle::new("flaky");
        // Counts how many times init_fn was invoked.
        let counter = Arc::new(std::sync::atomic::AtomicU32::new(0));
        let counter_clone = counter.clone();

        let config = RetryConfig {
            max_attempts: 3,
            initial_delay: Duration::from_millis(1),
            max_delay: Duration::from_millis(10),
        };

        // Fails on attempts 1 and 2, succeeds on attempt 3.
        let jh = spawn_with_retry(handle.clone(), config, move || {
            let c = counter_clone.clone();
            async move {
                let attempt = c.fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1;
                if attempt < 3 {
                    Err(format!("attempt {attempt} failed"))
                } else {
                    Ok(())
                }
            }
        });
        jh.await.unwrap();

        assert_eq!(handle.state(), ServiceState::Ready);
        assert_eq!(counter.load(std::sync::atomic::Ordering::Relaxed), 3);
    }

    #[tokio::test]
    async fn test_spawn_with_retry_exhausts_attempts() {
        let handle = ServiceHandle::new("broken");
        let config = RetryConfig {
            max_attempts: 2,
            initial_delay: Duration::from_millis(1),
            max_delay: Duration::from_millis(10),
        };

        let jh = spawn_with_retry(handle.clone(), config, || async {
            Err("still broken".to_string())
        });
        jh.await.unwrap();

        // The last attempt's error becomes the Failed reason.
        assert!(
            matches!(handle.state(), ServiceState::Failed(msg) if msg.contains("still broken"))
        );
    }

    // ---- RetryConfig ----

    #[test]
    fn test_retry_config_default() {
        let config = RetryConfig::default();
        assert_eq!(config.max_attempts, 3);
        assert_eq!(config.initial_delay, Duration::from_secs(1));
        assert_eq!(config.max_delay, Duration::from_secs(30));
    }

    // ---- Transition history ----

    #[test]
    fn test_transitions_initial_state_recorded() {
        let handle = ServiceHandle::new("test");
        let transitions = handle.transitions();
        assert_eq!(transitions.len(), 1);
        assert_eq!(transitions[0].state, ServiceState::Stopped);
        assert_eq!(transitions[0].elapsed, Duration::ZERO);
    }

    #[test]
    fn test_transitions_records_all_changes() {
        let handle = ServiceHandle::new("test");
        handle.set_state(ServiceState::Starting);
        handle.set_state(ServiceState::Ready);

        // Initial Stopped entry plus one per set_state call, in call order.
        let transitions = handle.transitions();
        assert_eq!(transitions.len(), 3);
        assert_eq!(transitions[0].state, ServiceState::Stopped);
        assert_eq!(transitions[1].state, ServiceState::Starting);
        assert_eq!(transitions[2].state, ServiceState::Ready);
    }

    #[test]
    fn test_transitions_timestamps_monotonic() {
        let handle = ServiceHandle::new("test");
        std::thread::sleep(Duration::from_millis(5));
        handle.set_state(ServiceState::Starting);
        std::thread::sleep(Duration::from_millis(5));
        handle.set_state(ServiceState::Ready);

        let transitions = handle.transitions();
        for window in transitions.windows(2) {
            assert!(
                window[1].elapsed >= window[0].elapsed,
                "timestamps should be monotonically increasing"
            );
        }
        // Two 5 ms sleeps precede the final transition.
        assert!(transitions[2].elapsed >= Duration::from_millis(10));
    }

    #[test]
    fn test_transitions_cloned_handle_shares_log() {
        let h1 = ServiceHandle::new("shared");
        let h2 = h1.clone();

        h1.set_state(ServiceState::Starting);
        h2.set_state(ServiceState::Ready);

        // Initial Stopped + one entry from each clone, visible through both.
        assert_eq!(h1.transitions().len(), 3);
        assert_eq!(h2.transitions().len(), 3);
    }
}