Skip to main content

fabryk_core/
service.rs

1//! Service lifecycle state management.
2//!
3//! Provides [`ServiceState`] and [`ServiceHandle`] for tracking the lifecycle
4//! of background services (index builders, search backends, etc.).
5//!
6//! # Usage
7//!
8//! ```rust
9//! use fabryk_core::service::{ServiceHandle, ServiceState};
10//!
11//! let handle = ServiceHandle::new("my-service");
12//! assert_eq!(handle.state(), ServiceState::Stopped);
13//!
14//! handle.set_state(ServiceState::Starting);
15//! handle.set_state(ServiceState::Ready);
16//! assert!(handle.state().is_ready());
17//! ```
18
19use std::fmt;
20use std::sync::Arc;
21use std::time::{Duration, Instant};
22use tokio::sync::watch;
23
24// ============================================================================
25// Free functions
26// ============================================================================
27
28/// Wait for **all** services to reach `Ready` (or fail), **in parallel**.
29///
30/// Each service gets its own concurrent `wait_ready` future sharing the
31/// same `timeout`. Returns `Ok(())` if every service becomes `Ready`
32/// within the deadline, or `Err(errors)` collecting every failure/timeout
33/// message.
34///
35/// This is more efficient than calling `wait_ready` sequentially when
36/// multiple services initialise concurrently — the wall-clock wait time
37/// equals the **slowest** service rather than the **sum** of all services.
38pub async fn wait_all_ready(
39    services: &[ServiceHandle],
40    timeout: Duration,
41) -> Result<(), Vec<String>> {
42    let futures: Vec<_> = services.iter().map(|svc| svc.wait_ready(timeout)).collect();
43
44    let results = futures::future::join_all(futures).await;
45
46    let errors: Vec<String> = results.into_iter().filter_map(|r| r.err()).collect();
47
48    if errors.is_empty() {
49        Ok(())
50    } else {
51        Err(errors)
52    }
53}
54
/// Retry policy consumed by [`spawn_with_retry`].
#[derive(Debug, Clone)]
pub struct RetryConfig {
    /// Upper bound on the number of attempts; the very first try counts.
    pub max_attempts: u32,
    /// Pause before the first retry; doubled after each further failure.
    pub initial_delay: Duration,
    /// Cap applied to the doubled delay so the backoff stops growing.
    pub max_delay: Duration,
}

impl Default for RetryConfig {
    /// Three attempts, a one-second first pause, and a thirty-second cap.
    fn default() -> Self {
        let initial_delay = Duration::from_secs(1);
        let max_delay = Duration::from_secs(30);

        RetryConfig {
            max_attempts: 3,
            initial_delay,
            max_delay,
        }
    }
}
75
76/// Spawn a background task that runs `init_fn` with retry on failure.
77///
78/// On each attempt the handle transitions to `Starting`. If `init_fn`
79/// succeeds the handle moves to `Ready`. If all attempts are exhausted
80/// the handle moves to `Failed` with the last error message.
81///
82/// Returns a `JoinHandle` for the background task.
83///
84/// # Example
85///
86/// ```rust,ignore
87/// use fabryk_core::service::{ServiceHandle, spawn_with_retry, RetryConfig};
88///
89/// let svc = ServiceHandle::new("redis");
90/// let cell = Arc::new(tokio::sync::OnceCell::new());
91/// let cell_bg = cell.clone();
92///
93/// spawn_with_retry(svc.clone(), RetryConfig::default(), move || {
94///     let cell = cell_bg.clone();
95///     async move {
96///         let client = RedisClient::new(&url).await?;
97///         cell.set(Arc::new(client)).ok();
98///         Ok(())
99///     }
100/// });
101/// ```
102pub fn spawn_with_retry<F, Fut>(
103    handle: ServiceHandle,
104    config: RetryConfig,
105    init_fn: F,
106) -> tokio::task::JoinHandle<()>
107where
108    F: Fn() -> Fut + Send + 'static,
109    Fut: std::future::Future<Output = Result<(), String>> + Send,
110{
111    tokio::spawn(async move {
112        let mut delay = config.initial_delay;
113
114        for attempt in 1..=config.max_attempts {
115            handle.set_state(ServiceState::Starting);
116
117            match init_fn().await {
118                Ok(()) => {
119                    handle.set_state(ServiceState::Ready);
120                    return;
121                }
122                Err(e) => {
123                    if attempt == config.max_attempts {
124                        log::error!(
125                            "Service '{}' failed after {} attempts: {e}",
126                            handle.name(),
127                            config.max_attempts
128                        );
129                        handle.set_state(ServiceState::Failed(e));
130                        return;
131                    }
132                    log::warn!(
133                        "Service '{}' attempt {}/{} failed: {e} — retrying in {delay:?}",
134                        handle.name(),
135                        attempt,
136                        config.max_attempts
137                    );
138                    tokio::time::sleep(delay).await;
139                    delay = (delay * 2).min(config.max_delay);
140                }
141            }
142        }
143    })
144}
145
146// ============================================================================
147// ServiceState
148// ============================================================================
149
/// State of a service in its lifecycle.
///
/// Equality is total (`Eq`): two states are equal when they are the same
/// variant and, for `Degraded` / `Failed`, carry the same reason string.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ServiceState {
    /// Service has not been started.
    Stopped,
    /// Service is initializing (e.g., building index).
    Starting,
    /// Service is operational and accepting requests.
    Ready,
    /// Service is partially operational; the string explains why.
    Degraded(String),
    /// Service is shutting down.
    Stopping,
    /// Service failed to start or encountered a fatal error; the string
    /// carries the reason.
    Failed(String),
}

impl ServiceState {
    /// Returns `true` only for [`ServiceState::Ready`].
    pub fn is_ready(&self) -> bool {
        matches!(self, Self::Ready)
    }

    /// Returns `true` if the service can handle requests (`Ready` or `Degraded`).
    pub fn is_available(&self) -> bool {
        matches!(self, Self::Ready | Self::Degraded(_))
    }

    /// Returns `true` if the service is in a terminal state (`Stopped` or `Failed`).
    pub fn is_terminal(&self) -> bool {
        matches!(self, Self::Stopped | Self::Failed(_))
    }
}

impl fmt::Display for ServiceState {
    /// Writes the lowercase state name, followed by `: <reason>` for the
    /// variants that carry one.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Stopped => f.write_str("stopped"),
            Self::Starting => f.write_str("starting"),
            Self::Ready => f.write_str("ready"),
            Self::Degraded(reason) => write!(f, "degraded: {reason}"),
            Self::Stopping => f.write_str("stopping"),
            Self::Failed(reason) => write!(f, "failed: {reason}"),
        }
    }
}
196
197// ============================================================================
198// ServiceHandle
199// ============================================================================
200
201/// A recorded state transition with timestamp.
202#[derive(Clone, Debug)]
203pub struct Transition {
204    /// The state that was entered.
205    pub state: ServiceState,
206    /// When the transition occurred (monotonic, relative to handle creation).
207    pub elapsed: Duration,
208}
209
210/// Thread-safe handle for observing and updating service state.
211///
212/// Cheap to clone (Arc internals). State changes are broadcast
213/// to all subscribers via a watch channel. Every transition is
214/// recorded in an audit trail accessible via [`transitions()`](Self::transitions).
215#[derive(Clone)]
216pub struct ServiceHandle {
217    inner: Arc<ServiceHandleInner>,
218}
219
220struct ServiceHandleInner {
221    name: String,
222    tx: watch::Sender<ServiceState>,
223    started_at: Instant,
224    transitions: std::sync::Mutex<Vec<Transition>>,
225}
226
227impl ServiceHandle {
228    /// Create a new service handle with the given name.
229    ///
230    /// Initial state is [`ServiceState::Stopped`].
231    pub fn new(name: impl Into<String>) -> Self {
232        let (tx, _rx) = watch::channel(ServiceState::Stopped);
233        Self {
234            inner: Arc::new(ServiceHandleInner {
235                name: name.into(),
236                tx,
237                started_at: Instant::now(),
238                transitions: std::sync::Mutex::new(vec![Transition {
239                    state: ServiceState::Stopped,
240                    elapsed: Duration::ZERO,
241                }]),
242            }),
243        }
244    }
245
246    /// Get the service name.
247    pub fn name(&self) -> &str {
248        &self.inner.name
249    }
250
251    /// Get the current service state.
252    pub fn state(&self) -> ServiceState {
253        self.inner.tx.borrow().clone()
254    }
255
256    /// Update the service state.
257    ///
258    /// All subscribers are notified of the change. The transition is
259    /// recorded in the audit trail with a monotonic timestamp.
260    pub fn set_state(&self, state: ServiceState) {
261        match &state {
262            ServiceState::Failed(_) => {
263                log::error!("Service '{}' → {state}", self.inner.name);
264            }
265            ServiceState::Degraded(_) => {
266                log::warn!("Service '{}' → {state}", self.inner.name);
267            }
268            _ => {
269                log::info!("Service '{}' → {state}", self.inner.name);
270            }
271        }
272        if let Ok(mut log) = self.inner.transitions.lock() {
273            log.push(Transition {
274                state: state.clone(),
275                elapsed: self.inner.started_at.elapsed(),
276            });
277        }
278        self.inner.tx.send_replace(state);
279    }
280
281    /// Get the full transition audit trail.
282    ///
283    /// Returns a snapshot of all recorded state transitions, each with a
284    /// monotonic timestamp relative to handle creation. Useful for
285    /// diagnostics and debugging startup timing.
286    pub fn transitions(&self) -> Vec<Transition> {
287        self.inner
288            .transitions
289            .lock()
290            .map(|log| log.clone())
291            .unwrap_or_default()
292    }
293
294    /// Subscribe to state changes.
295    pub fn subscribe(&self) -> watch::Receiver<ServiceState> {
296        self.inner.tx.subscribe()
297    }
298
299    /// Wait until the service reaches Ready, Failed, or timeout.
300    pub async fn wait_ready(&self, timeout: Duration) -> Result<(), String> {
301        let mut rx = self.subscribe();
302        let deadline = tokio::time::sleep(timeout);
303        tokio::pin!(deadline);
304
305        // Check current state first
306        {
307            let state = rx.borrow_and_update().clone();
308            match state {
309                ServiceState::Ready => return Ok(()),
310                ServiceState::Failed(reason) => {
311                    return Err(format!("Service '{}' failed: {reason}", self.inner.name));
312                }
313                _ => {}
314            }
315        }
316
317        loop {
318            tokio::select! {
319                _ = &mut deadline => {
320                    return Err(format!(
321                        "Service '{}' not ready after {timeout:?} (state: {})",
322                        self.inner.name, self.state()
323                    ));
324                }
325                result = rx.changed() => {
326                    if result.is_err() {
327                        return Err(format!("Service '{}' channel closed", self.inner.name));
328                    }
329                    let state = rx.borrow().clone();
330                    match state {
331                        ServiceState::Ready => return Ok(()),
332                        ServiceState::Failed(reason) => {
333                            return Err(format!(
334                                "Service '{}' failed: {reason}",
335                                self.inner.name
336                            ));
337                        }
338                        _ => continue,
339                    }
340                }
341            }
342        }
343    }
344
345    /// Elapsed time since the handle was created.
346    pub fn elapsed(&self) -> Duration {
347        self.inner.started_at.elapsed()
348    }
349}
350
351impl fmt::Debug for ServiceHandle {
352    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
353        f.debug_struct("ServiceHandle")
354            .field("name", &self.inner.name)
355            .field("state", &self.state())
356            .finish()
357    }
358}
359
360// ============================================================================
361// Tests
362// ============================================================================
363
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU32, Ordering};

    /// Retry policy with millisecond-scale delays so retry tests finish fast.
    fn quick_retry(max_attempts: u32) -> RetryConfig {
        RetryConfig {
            max_attempts,
            initial_delay: Duration::from_millis(1),
            max_delay: Duration::from_millis(10),
        }
    }

    // ── ServiceState ────────────────────────────────────────────────

    #[test]
    fn test_service_state_display() {
        let cases = [
            (ServiceState::Stopped, "stopped"),
            (ServiceState::Starting, "starting"),
            (ServiceState::Ready, "ready"),
            (
                ServiceState::Degraded("low memory".to_string()),
                "degraded: low memory",
            ),
            (ServiceState::Stopping, "stopping"),
            (ServiceState::Failed("crash".to_string()), "failed: crash"),
        ];

        for (state, expected) in cases {
            assert_eq!(state.to_string(), expected);
        }
    }

    #[test]
    fn test_service_state_predicates() {
        let degraded = ServiceState::Degraded("x".into());
        let failed = ServiceState::Failed("x".into());

        // is_ready
        assert!(ServiceState::Ready.is_ready());
        assert!(!ServiceState::Starting.is_ready());
        assert!(!degraded.is_ready());

        // is_available
        assert!(ServiceState::Ready.is_available());
        assert!(degraded.is_available());
        assert!(!ServiceState::Starting.is_available());
        assert!(!ServiceState::Stopped.is_available());

        // is_terminal
        assert!(ServiceState::Stopped.is_terminal());
        assert!(failed.is_terminal());
        assert!(!ServiceState::Ready.is_terminal());
        assert!(!ServiceState::Starting.is_terminal());
    }

    // ── ServiceHandle basics ────────────────────────────────────────

    #[test]
    fn test_service_handle_initial_state() {
        let svc = ServiceHandle::new("test");
        assert_eq!(svc.name(), "test");
        assert_eq!(svc.state(), ServiceState::Stopped);
    }

    #[test]
    fn test_service_handle_state_transitions() {
        let svc = ServiceHandle::new("test");

        let sequence = [
            ServiceState::Starting,
            ServiceState::Ready,
            ServiceState::Stopping,
            ServiceState::Stopped,
        ];
        for next in sequence {
            svc.set_state(next.clone());
            assert_eq!(svc.state(), next);
        }
    }

    #[test]
    fn test_service_handle_clone_shares_state() {
        let first = ServiceHandle::new("shared");
        let second = first.clone();

        first.set_state(ServiceState::Ready);
        assert_eq!(second.state(), ServiceState::Ready);

        second.set_state(ServiceState::Stopping);
        assert_eq!(first.state(), ServiceState::Stopping);
    }

    #[test]
    fn test_service_handle_subscribe() {
        let svc = ServiceHandle::new("test");
        let mut receiver = svc.subscribe();

        // A fresh subscriber starts out seeing the initial value.
        assert_eq!(*receiver.borrow(), ServiceState::Stopped);

        svc.set_state(ServiceState::Starting);
        // The receiver observes the latest value on its next borrow.
        assert_eq!(*receiver.borrow_and_update(), ServiceState::Starting);
    }

    // ── wait_ready ──────────────────────────────────────────────────

    #[tokio::test]
    async fn test_service_handle_wait_ready_success() {
        let svc = ServiceHandle::new("test");
        let background = svc.clone();

        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(10)).await;
            background.set_state(ServiceState::Starting);
            tokio::time::sleep(Duration::from_millis(10)).await;
            background.set_state(ServiceState::Ready);
        });

        let outcome = svc.wait_ready(Duration::from_secs(1)).await;
        assert!(outcome.is_ok());
    }

    #[tokio::test]
    async fn test_service_handle_wait_ready_timeout() {
        let svc = ServiceHandle::new("slow");
        svc.set_state(ServiceState::Starting);

        let outcome = svc.wait_ready(Duration::from_millis(50)).await;
        assert!(outcome.is_err());
        assert!(outcome.unwrap_err().contains("not ready after"));
    }

    #[tokio::test]
    async fn test_service_handle_wait_ready_failed() {
        let svc = ServiceHandle::new("broken");
        let background = svc.clone();

        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(10)).await;
            background.set_state(ServiceState::Failed("out of memory".to_string()));
        });

        let outcome = svc.wait_ready(Duration::from_secs(1)).await;
        assert!(outcome.is_err());
        let message = outcome.unwrap_err();
        assert!(message.contains("failed"));
        assert!(message.contains("out of memory"));
    }

    #[tokio::test]
    async fn test_service_handle_wait_ready_already_ready() {
        let svc = ServiceHandle::new("instant");
        svc.set_state(ServiceState::Ready);

        let outcome = svc.wait_ready(Duration::from_millis(50)).await;
        assert!(outcome.is_ok());
    }

    #[tokio::test]
    async fn test_service_handle_wait_ready_already_failed() {
        let svc = ServiceHandle::new("instant-fail");
        svc.set_state(ServiceState::Failed("boom".to_string()));

        let outcome = svc.wait_ready(Duration::from_millis(50)).await;
        assert!(outcome.is_err());
        assert!(outcome.unwrap_err().contains("boom"));
    }

    // ── misc handle behaviour ───────────────────────────────────────

    #[test]
    fn test_service_handle_elapsed() {
        let svc = ServiceHandle::new("test");
        let pause = Duration::from_millis(10);
        std::thread::sleep(pause);
        assert!(svc.elapsed() >= pause);
    }

    #[test]
    fn test_service_handle_debug() {
        let svc = ServiceHandle::new("debug-test");
        let rendered = format!("{svc:?}");
        assert!(rendered.contains("debug-test"));
        assert!(rendered.contains("ServiceHandle"));
    }

    // Compile-time check: instantiating this only type-checks for
    // `Send + Sync` types.
    fn _assert_send_sync<T: Send + Sync>() {}

    #[test]
    fn test_service_handle_send_sync() {
        _assert_send_sync::<ServiceHandle>();
        _assert_send_sync::<ServiceState>();
    }

    // ── wait_all_ready tests ────────────────────────────────────────

    #[tokio::test]
    async fn test_wait_all_ready_all_already_ready() {
        let first = ServiceHandle::new("a");
        let second = ServiceHandle::new("b");
        first.set_state(ServiceState::Ready);
        second.set_state(ServiceState::Ready);

        let outcome = wait_all_ready(&[first, second], Duration::from_millis(50)).await;
        assert!(outcome.is_ok());
    }

    #[tokio::test]
    async fn test_wait_all_ready_parallel_startup() {
        let first = ServiceHandle::new("a");
        let second = ServiceHandle::new("b");

        let first_bg = first.clone();
        let second_bg = second.clone();
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(10)).await;
            first_bg.set_state(ServiceState::Ready);
        });
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(20)).await;
            second_bg.set_state(ServiceState::Ready);
        });

        let outcome = wait_all_ready(&[first, second], Duration::from_secs(1)).await;
        assert!(outcome.is_ok());
    }

    #[tokio::test]
    async fn test_wait_all_ready_one_fails() {
        let healthy = ServiceHandle::new("ok");
        let faulty = ServiceHandle::new("broken");
        healthy.set_state(ServiceState::Ready);
        faulty.set_state(ServiceState::Failed("boom".into()));

        let outcome = wait_all_ready(&[healthy, faulty], Duration::from_millis(50)).await;
        assert!(outcome.is_err());
        let messages = outcome.unwrap_err();
        assert_eq!(messages.len(), 1);
        assert!(messages[0].contains("boom"));
    }

    #[tokio::test]
    async fn test_wait_all_ready_one_timeout() {
        let healthy = ServiceHandle::new("ok");
        let sluggish = ServiceHandle::new("slow");
        healthy.set_state(ServiceState::Ready);
        sluggish.set_state(ServiceState::Starting);

        let outcome = wait_all_ready(&[healthy, sluggish], Duration::from_millis(50)).await;
        assert!(outcome.is_err());
        let messages = outcome.unwrap_err();
        assert_eq!(messages.len(), 1);
        assert!(messages[0].contains("not ready after"));
    }

    #[tokio::test]
    async fn test_wait_all_ready_empty_services() {
        let outcome = wait_all_ready(&[], Duration::from_millis(50)).await;
        assert!(outcome.is_ok());
    }

    // ── spawn_with_retry tests ──────────────────────────────────────

    #[tokio::test]
    async fn test_spawn_with_retry_succeeds_first_attempt() {
        let svc = ServiceHandle::new("ok");

        let task = spawn_with_retry(svc.clone(), quick_retry(3), || async { Ok(()) });
        task.await.unwrap();

        assert_eq!(svc.state(), ServiceState::Ready);
    }

    #[tokio::test]
    async fn test_spawn_with_retry_succeeds_after_failures() {
        let svc = ServiceHandle::new("flaky");
        let calls = Arc::new(AtomicU32::new(0));
        let calls_bg = calls.clone();

        let task = spawn_with_retry(svc.clone(), quick_retry(3), move || {
            let calls = calls_bg.clone();
            async move {
                // Fail on the first two calls, succeed on the third.
                let attempt = calls.fetch_add(1, Ordering::Relaxed) + 1;
                if attempt < 3 {
                    Err(format!("attempt {attempt} failed"))
                } else {
                    Ok(())
                }
            }
        });
        task.await.unwrap();

        assert_eq!(svc.state(), ServiceState::Ready);
        assert_eq!(calls.load(Ordering::Relaxed), 3);
    }

    #[tokio::test]
    async fn test_spawn_with_retry_exhausts_attempts() {
        let svc = ServiceHandle::new("broken");

        let task = spawn_with_retry(svc.clone(), quick_retry(2), || async {
            Err("still broken".to_string())
        });
        task.await.unwrap();

        assert!(
            matches!(svc.state(), ServiceState::Failed(msg) if msg.contains("still broken"))
        );
    }

    #[test]
    fn test_retry_config_default() {
        let defaults = RetryConfig::default();
        assert_eq!(defaults.max_attempts, 3);
        assert_eq!(defaults.initial_delay, Duration::from_secs(1));
        assert_eq!(defaults.max_delay, Duration::from_secs(30));
    }

    // ── transition audit trail tests ────────────────────────────────

    #[test]
    fn test_transitions_initial_state_recorded() {
        let svc = ServiceHandle::new("test");
        let trail = svc.transitions();
        assert_eq!(trail.len(), 1);
        assert_eq!(trail[0].state, ServiceState::Stopped);
        assert_eq!(trail[0].elapsed, Duration::ZERO);
    }

    #[test]
    fn test_transitions_records_all_changes() {
        let svc = ServiceHandle::new("test");
        svc.set_state(ServiceState::Starting);
        svc.set_state(ServiceState::Ready);

        let trail = svc.transitions();
        assert_eq!(trail.len(), 3);
        assert_eq!(trail[0].state, ServiceState::Stopped);
        assert_eq!(trail[1].state, ServiceState::Starting);
        assert_eq!(trail[2].state, ServiceState::Ready);
    }

    #[test]
    fn test_transitions_timestamps_monotonic() {
        let svc = ServiceHandle::new("test");
        std::thread::sleep(Duration::from_millis(5));
        svc.set_state(ServiceState::Starting);
        std::thread::sleep(Duration::from_millis(5));
        svc.set_state(ServiceState::Ready);

        let trail = svc.transitions();
        for pair in trail.windows(2) {
            assert!(
                pair[1].elapsed >= pair[0].elapsed,
                "timestamps should be monotonically increasing"
            );
        }
        // Two 5 ms sleeps precede the last entry, so it is at least 10 ms in.
        assert!(trail[2].elapsed >= Duration::from_millis(10));
    }

    #[test]
    fn test_transitions_cloned_handle_shares_log() {
        let first = ServiceHandle::new("shared");
        let second = first.clone();

        first.set_state(ServiceState::Starting);
        second.set_state(ServiceState::Ready);

        // Initial `Stopped` plus the two changes, visible through either clone.
        assert_eq!(first.transitions().len(), 3);
        assert_eq!(second.transitions().len(), 3);
    }
}