1use std::fmt;
20use std::sync::Arc;
21use std::time::{Duration, Instant};
22use tokio::sync::watch;
23
24pub async fn wait_all_ready(
39 services: &[ServiceHandle],
40 timeout: Duration,
41) -> Result<(), Vec<String>> {
42 let futures: Vec<_> = services.iter().map(|svc| svc.wait_ready(timeout)).collect();
43
44 let results = futures::future::join_all(futures).await;
45
46 let errors: Vec<String> = results.into_iter().filter_map(|r| r.err()).collect();
47
48 if errors.is_empty() {
49 Ok(())
50 } else {
51 Err(errors)
52 }
53}
54
/// Retry policy used when starting a service.
#[derive(Debug, Clone)]
pub struct RetryConfig {
    /// Total number of start attempts, the first one included.
    pub max_attempts: u32,
    /// Pause taken after the first failed attempt.
    pub initial_delay: Duration,
    /// Ceiling applied when the pause is doubled between attempts.
    pub max_delay: Duration,
}

impl Default for RetryConfig {
    /// Three attempts, starting with a one second pause and capping the
    /// doubled pause at thirty seconds.
    fn default() -> Self {
        let initial_delay = Duration::from_secs(1);
        let max_delay = Duration::from_secs(30);

        RetryConfig {
            max_attempts: 3,
            initial_delay,
            max_delay,
        }
    }
}
75
76pub fn spawn_with_retry<F, Fut>(
103 handle: ServiceHandle,
104 config: RetryConfig,
105 init_fn: F,
106) -> tokio::task::JoinHandle<()>
107where
108 F: Fn() -> Fut + Send + 'static,
109 Fut: std::future::Future<Output = Result<(), String>> + Send,
110{
111 tokio::spawn(async move {
112 let mut delay = config.initial_delay;
113
114 for attempt in 1..=config.max_attempts {
115 handle.set_state(ServiceState::Starting);
116
117 match init_fn().await {
118 Ok(()) => {
119 handle.set_state(ServiceState::Ready);
120 return;
121 }
122 Err(e) => {
123 if attempt == config.max_attempts {
124 log::error!(
125 "Service '{}' failed after {} attempts: {e}",
126 handle.name(),
127 config.max_attempts
128 );
129 handle.set_state(ServiceState::Failed(e));
130 return;
131 }
132 log::warn!(
133 "Service '{}' attempt {}/{} failed: {e} — retrying in {delay:?}",
134 handle.name(),
135 attempt,
136 config.max_attempts
137 );
138 tokio::time::sleep(delay).await;
139 delay = (delay * 2).min(config.max_delay);
140 }
141 }
142 }
143 })
144}
145
/// Lifecycle state of a service.
#[derive(Debug, Clone, PartialEq)]
pub enum ServiceState {
    /// Not running; counted as a terminal state.
    Stopped,
    /// Start-up is in progress.
    Starting,
    /// Fully operational.
    Ready,
    /// Available but impaired; the payload is the reason.
    Degraded(String),
    /// Shutdown is in progress.
    Stopping,
    /// Gave up with an error; the payload is the reason. Terminal.
    Failed(String),
}

impl ServiceState {
    /// Returns `true` only for [`ServiceState::Ready`].
    pub fn is_ready(&self) -> bool {
        match self {
            Self::Ready => true,
            Self::Stopped
            | Self::Starting
            | Self::Degraded(_)
            | Self::Stopping
            | Self::Failed(_) => false,
        }
    }

    /// Returns `true` when the service is `Ready` or `Degraded`.
    pub fn is_available(&self) -> bool {
        match self {
            Self::Ready | Self::Degraded(_) => true,
            Self::Stopped | Self::Starting | Self::Stopping | Self::Failed(_) => false,
        }
    }

    /// Returns `true` when the service is `Stopped` or `Failed`.
    pub fn is_terminal(&self) -> bool {
        match self {
            Self::Stopped | Self::Failed(_) => true,
            Self::Starting | Self::Ready | Self::Degraded(_) | Self::Stopping => false,
        }
    }
}

impl fmt::Display for ServiceState {
    /// Writes the lowercase state name, followed by `: <reason>` for the
    /// variants that carry one.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Degraded(reason) => write!(f, "degraded: {reason}"),
            Self::Failed(reason) => write!(f, "failed: {reason}"),
            Self::Stopped => f.write_str("stopped"),
            Self::Starting => f.write_str("starting"),
            Self::Ready => f.write_str("ready"),
            Self::Stopping => f.write_str("stopping"),
        }
    }
}
196
197#[derive(Clone, Debug)]
203pub struct Transition {
204 pub state: ServiceState,
206 pub elapsed: Duration,
208}
209
210#[derive(Clone)]
216pub struct ServiceHandle {
217 inner: Arc<ServiceHandleInner>,
218}
219
220struct ServiceHandleInner {
221 name: String,
222 tx: watch::Sender<ServiceState>,
223 started_at: Instant,
224 transitions: std::sync::Mutex<Vec<Transition>>,
225}
226
227impl ServiceHandle {
228 pub fn new(name: impl Into<String>) -> Self {
232 let (tx, _rx) = watch::channel(ServiceState::Stopped);
233 Self {
234 inner: Arc::new(ServiceHandleInner {
235 name: name.into(),
236 tx,
237 started_at: Instant::now(),
238 transitions: std::sync::Mutex::new(vec![Transition {
239 state: ServiceState::Stopped,
240 elapsed: Duration::ZERO,
241 }]),
242 }),
243 }
244 }
245
246 pub fn name(&self) -> &str {
248 &self.inner.name
249 }
250
251 pub fn state(&self) -> ServiceState {
253 self.inner.tx.borrow().clone()
254 }
255
256 pub fn set_state(&self, state: ServiceState) {
261 match &state {
262 ServiceState::Failed(_) => {
263 log::error!("Service '{}' → {state}", self.inner.name);
264 }
265 ServiceState::Degraded(_) => {
266 log::warn!("Service '{}' → {state}", self.inner.name);
267 }
268 _ => {
269 log::info!("Service '{}' → {state}", self.inner.name);
270 }
271 }
272 if let Ok(mut log) = self.inner.transitions.lock() {
273 log.push(Transition {
274 state: state.clone(),
275 elapsed: self.inner.started_at.elapsed(),
276 });
277 }
278 self.inner.tx.send_replace(state);
279 }
280
281 pub fn transitions(&self) -> Vec<Transition> {
287 self.inner
288 .transitions
289 .lock()
290 .map(|log| log.clone())
291 .unwrap_or_default()
292 }
293
294 pub fn subscribe(&self) -> watch::Receiver<ServiceState> {
296 self.inner.tx.subscribe()
297 }
298
299 pub async fn wait_ready(&self, timeout: Duration) -> Result<(), String> {
301 let mut rx = self.subscribe();
302 let deadline = tokio::time::sleep(timeout);
303 tokio::pin!(deadline);
304
305 {
307 let state = rx.borrow_and_update().clone();
308 match state {
309 ServiceState::Ready => return Ok(()),
310 ServiceState::Failed(reason) => {
311 return Err(format!("Service '{}' failed: {reason}", self.inner.name));
312 }
313 _ => {}
314 }
315 }
316
317 loop {
318 tokio::select! {
319 _ = &mut deadline => {
320 return Err(format!(
321 "Service '{}' not ready after {timeout:?} (state: {})",
322 self.inner.name, self.state()
323 ));
324 }
325 result = rx.changed() => {
326 if result.is_err() {
327 return Err(format!("Service '{}' channel closed", self.inner.name));
328 }
329 let state = rx.borrow().clone();
330 match state {
331 ServiceState::Ready => return Ok(()),
332 ServiceState::Failed(reason) => {
333 return Err(format!(
334 "Service '{}' failed: {reason}",
335 self.inner.name
336 ));
337 }
338 _ => continue,
339 }
340 }
341 }
342 }
343 }
344
345 pub fn elapsed(&self) -> Duration {
347 self.inner.started_at.elapsed()
348 }
349}
350
351impl fmt::Debug for ServiceHandle {
352 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
353 f.debug_struct("ServiceHandle")
354 .field("name", &self.inner.name)
355 .field("state", &self.state())
356 .finish()
357 }
358}
359
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU32, Ordering};

    /// Retry settings with millisecond pauses so retry tests finish quickly.
    fn quick_retry(max_attempts: u32) -> RetryConfig {
        RetryConfig {
            max_attempts,
            initial_delay: Duration::from_millis(1),
            max_delay: Duration::from_millis(10),
        }
    }

    /// Compiles only when `T` can be shared across threads.
    fn _assert_send_sync<T: Send + Sync>() {}

    // ---- ServiceState -------------------------------------------------

    #[test]
    fn test_service_state_display() {
        let cases = [
            (ServiceState::Stopped, "stopped"),
            (ServiceState::Starting, "starting"),
            (ServiceState::Ready, "ready"),
            (
                ServiceState::Degraded("low memory".to_string()),
                "degraded: low memory",
            ),
            (ServiceState::Stopping, "stopping"),
            (ServiceState::Failed("crash".to_string()), "failed: crash"),
        ];

        for (state, expected) in cases {
            assert_eq!(state.to_string(), expected);
        }
    }

    #[test]
    fn test_service_state_predicates() {
        let degraded = ServiceState::Degraded("x".into());
        let failed = ServiceState::Failed("x".into());

        assert!(ServiceState::Ready.is_ready());
        assert!(!ServiceState::Starting.is_ready());
        assert!(!degraded.is_ready());

        assert!(ServiceState::Ready.is_available());
        assert!(degraded.is_available());
        assert!(!ServiceState::Starting.is_available());
        assert!(!ServiceState::Stopped.is_available());

        assert!(ServiceState::Stopped.is_terminal());
        assert!(failed.is_terminal());
        assert!(!ServiceState::Ready.is_terminal());
        assert!(!ServiceState::Starting.is_terminal());
    }

    // ---- ServiceHandle basics -----------------------------------------

    #[test]
    fn test_service_handle_initial_state() {
        let svc = ServiceHandle::new("test");
        assert_eq!(svc.name(), "test");
        assert_eq!(svc.state(), ServiceState::Stopped);
    }

    #[test]
    fn test_service_handle_state_transitions() {
        let svc = ServiceHandle::new("test");
        let sequence = [
            ServiceState::Starting,
            ServiceState::Ready,
            ServiceState::Stopping,
            ServiceState::Stopped,
        ];

        for next in sequence {
            svc.set_state(next.clone());
            assert_eq!(svc.state(), next);
        }
    }

    #[test]
    fn test_service_handle_clone_shares_state() {
        let first = ServiceHandle::new("shared");
        let second = first.clone();

        first.set_state(ServiceState::Ready);
        assert_eq!(second.state(), ServiceState::Ready);

        second.set_state(ServiceState::Stopping);
        assert_eq!(first.state(), ServiceState::Stopping);
    }

    #[test]
    fn test_service_handle_subscribe() {
        let svc = ServiceHandle::new("test");
        let mut updates = svc.subscribe();

        assert_eq!(*updates.borrow(), ServiceState::Stopped);

        svc.set_state(ServiceState::Starting);
        assert_eq!(*updates.borrow_and_update(), ServiceState::Starting);
    }

    // ---- wait_ready ---------------------------------------------------

    #[tokio::test]
    async fn test_service_handle_wait_ready_success() {
        let svc = ServiceHandle::new("test");
        let driver = svc.clone();

        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(10)).await;
            driver.set_state(ServiceState::Starting);
            tokio::time::sleep(Duration::from_millis(10)).await;
            driver.set_state(ServiceState::Ready);
        });

        let outcome = svc.wait_ready(Duration::from_secs(1)).await;
        assert_eq!(outcome, Ok(()));
    }

    #[tokio::test]
    async fn test_service_handle_wait_ready_timeout() {
        let svc = ServiceHandle::new("slow");
        svc.set_state(ServiceState::Starting);

        let message = svc
            .wait_ready(Duration::from_millis(50))
            .await
            .unwrap_err();
        assert!(message.contains("not ready after"));
    }

    #[tokio::test]
    async fn test_service_handle_wait_ready_failed() {
        let svc = ServiceHandle::new("broken");
        let driver = svc.clone();

        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(10)).await;
            driver.set_state(ServiceState::Failed("out of memory".to_string()));
        });

        let message = svc.wait_ready(Duration::from_secs(1)).await.unwrap_err();
        assert!(message.contains("failed"));
        assert!(message.contains("out of memory"));
    }

    #[tokio::test]
    async fn test_service_handle_wait_ready_already_ready() {
        let svc = ServiceHandle::new("instant");
        svc.set_state(ServiceState::Ready);

        let outcome = svc.wait_ready(Duration::from_millis(50)).await;
        assert_eq!(outcome, Ok(()));
    }

    #[tokio::test]
    async fn test_service_handle_wait_ready_already_failed() {
        let svc = ServiceHandle::new("instant-fail");
        svc.set_state(ServiceState::Failed("boom".to_string()));

        let message = svc
            .wait_ready(Duration::from_millis(50))
            .await
            .unwrap_err();
        assert!(message.contains("boom"));
    }

    // ---- misc handle behaviour ----------------------------------------

    #[test]
    fn test_service_handle_elapsed() {
        let svc = ServiceHandle::new("test");
        let pause = Duration::from_millis(10);
        std::thread::sleep(pause);
        assert!(svc.elapsed() >= pause);
    }

    #[test]
    fn test_service_handle_debug() {
        let svc = ServiceHandle::new("debug-test");
        let rendered = format!("{:?}", svc);
        assert!(rendered.contains("debug-test"));
        assert!(rendered.contains("ServiceHandle"));
    }

    #[test]
    fn test_service_handle_send_sync() {
        _assert_send_sync::<ServiceHandle>();
        _assert_send_sync::<ServiceState>();
    }

    // ---- wait_all_ready -----------------------------------------------

    #[tokio::test]
    async fn test_wait_all_ready_all_already_ready() {
        let first = ServiceHandle::new("a");
        let second = ServiceHandle::new("b");
        first.set_state(ServiceState::Ready);
        second.set_state(ServiceState::Ready);

        let outcome = wait_all_ready(&[first, second], Duration::from_millis(50)).await;
        assert_eq!(outcome, Ok(()));
    }

    #[tokio::test]
    async fn test_wait_all_ready_parallel_startup() {
        let first = ServiceHandle::new("a");
        let second = ServiceHandle::new("b");

        let first_driver = first.clone();
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(10)).await;
            first_driver.set_state(ServiceState::Ready);
        });

        let second_driver = second.clone();
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(20)).await;
            second_driver.set_state(ServiceState::Ready);
        });

        let outcome = wait_all_ready(&[first, second], Duration::from_secs(1)).await;
        assert_eq!(outcome, Ok(()));
    }

    #[tokio::test]
    async fn test_wait_all_ready_one_fails() {
        let healthy = ServiceHandle::new("ok");
        let broken = ServiceHandle::new("broken");
        healthy.set_state(ServiceState::Ready);
        broken.set_state(ServiceState::Failed("boom".into()));

        let failures = wait_all_ready(&[healthy, broken], Duration::from_millis(50))
            .await
            .unwrap_err();
        assert_eq!(failures.len(), 1);
        assert!(failures[0].contains("boom"));
    }

    #[tokio::test]
    async fn test_wait_all_ready_one_timeout() {
        let healthy = ServiceHandle::new("ok");
        let slow = ServiceHandle::new("slow");
        healthy.set_state(ServiceState::Ready);
        slow.set_state(ServiceState::Starting);

        let failures = wait_all_ready(&[healthy, slow], Duration::from_millis(50))
            .await
            .unwrap_err();
        assert_eq!(failures.len(), 1);
        assert!(failures[0].contains("not ready after"));
    }

    #[tokio::test]
    async fn test_wait_all_ready_empty_services() {
        let outcome = wait_all_ready(&[], Duration::from_millis(50)).await;
        assert_eq!(outcome, Ok(()));
    }

    // ---- spawn_with_retry ---------------------------------------------

    #[tokio::test]
    async fn test_spawn_with_retry_succeeds_first_attempt() {
        let svc = ServiceHandle::new("ok");

        let task = spawn_with_retry(svc.clone(), quick_retry(3), || async { Ok(()) });
        task.await.unwrap();

        assert_eq!(svc.state(), ServiceState::Ready);
    }

    #[tokio::test]
    async fn test_spawn_with_retry_succeeds_after_failures() {
        let svc = ServiceHandle::new("flaky");
        let calls = Arc::new(AtomicU32::new(0));
        let calls_for_init = Arc::clone(&calls);

        let task = spawn_with_retry(svc.clone(), quick_retry(3), move || {
            let calls = Arc::clone(&calls_for_init);
            async move {
                let attempt = calls.fetch_add(1, Ordering::Relaxed) + 1;
                if attempt >= 3 {
                    Ok(())
                } else {
                    Err(format!("attempt {attempt} failed"))
                }
            }
        });
        task.await.unwrap();

        assert_eq!(svc.state(), ServiceState::Ready);
        assert_eq!(calls.load(Ordering::Relaxed), 3);
    }

    #[tokio::test]
    async fn test_spawn_with_retry_exhausts_attempts() {
        let svc = ServiceHandle::new("broken");

        let task = spawn_with_retry(svc.clone(), quick_retry(2), || async {
            Err("still broken".to_string())
        });
        task.await.unwrap();

        assert!(
            matches!(svc.state(), ServiceState::Failed(msg) if msg.contains("still broken"))
        );
    }

    #[test]
    fn test_retry_config_default() {
        let defaults = RetryConfig::default();
        assert_eq!(defaults.max_attempts, 3);
        assert_eq!(defaults.initial_delay, Duration::from_secs(1));
        assert_eq!(defaults.max_delay, Duration::from_secs(30));
    }

    // ---- transition history -------------------------------------------

    #[test]
    fn test_transitions_initial_state_recorded() {
        let svc = ServiceHandle::new("test");
        let history = svc.transitions();

        assert_eq!(history.len(), 1);
        assert_eq!(history[0].state, ServiceState::Stopped);
        assert_eq!(history[0].elapsed, Duration::ZERO);
    }

    #[test]
    fn test_transitions_records_all_changes() {
        let svc = ServiceHandle::new("test");
        svc.set_state(ServiceState::Starting);
        svc.set_state(ServiceState::Ready);

        let history = svc.transitions();
        assert_eq!(history.len(), 3);

        let expected = [
            ServiceState::Stopped,
            ServiceState::Starting,
            ServiceState::Ready,
        ];
        for (entry, state) in history.iter().zip(expected.iter()) {
            assert_eq!(&entry.state, state);
        }
    }

    #[test]
    fn test_transitions_timestamps_monotonic() {
        let svc = ServiceHandle::new("test");
        let pause = Duration::from_millis(5);
        std::thread::sleep(pause);
        svc.set_state(ServiceState::Starting);
        std::thread::sleep(pause);
        svc.set_state(ServiceState::Ready);

        let history = svc.transitions();
        for (earlier, later) in history.iter().zip(history.iter().skip(1)) {
            assert!(
                later.elapsed >= earlier.elapsed,
                "timestamps should be monotonically increasing"
            );
        }
        assert!(history[2].elapsed >= Duration::from_millis(10));
    }

    #[test]
    fn test_transitions_cloned_handle_shares_log() {
        let first = ServiceHandle::new("shared");
        let second = first.clone();

        first.set_state(ServiceState::Starting);
        second.set_state(ServiceState::Ready);

        assert_eq!(first.transitions().len(), 3);
        assert_eq!(second.transitions().len(), 3);
    }
}