Skip to main content

fabryk_core/
service.rs

1//! Service lifecycle state management.
2//!
3//! Provides [`ServiceState`] and [`ServiceHandle`] for tracking the lifecycle
4//! of background services (index builders, search backends, etc.).
5//!
6//! # Usage
7//!
8//! ```rust
9//! use fabryk_core::service::{ServiceHandle, ServiceState};
10//!
11//! let handle = ServiceHandle::new("my-service");
12//! assert_eq!(handle.state(), ServiceState::Stopped);
13//!
14//! handle.set_state(ServiceState::Starting);
15//! handle.set_state(ServiceState::Ready);
16//! assert!(handle.state().is_ready());
17//! ```
18
19use std::fmt;
20use std::sync::Arc;
21use std::time::{Duration, Instant};
22use tokio::sync::watch;
23
24// ============================================================================
25// Free functions
26// ============================================================================
27
28/// Wait for **all** services to reach `Ready` (or fail), **in parallel**.
29///
30/// Each service gets its own concurrent `wait_ready` future sharing the
31/// same `timeout`. Returns `Ok(())` if every service becomes `Ready`
32/// within the deadline, or `Err(errors)` collecting every failure/timeout
33/// message.
34///
35/// This is more efficient than calling `wait_ready` sequentially when
36/// multiple services initialise concurrently — the wall-clock wait time
37/// equals the **slowest** service rather than the **sum** of all services.
38pub async fn wait_all_ready(
39    services: &[ServiceHandle],
40    timeout: Duration,
41) -> Result<(), Vec<String>> {
42    let futures: Vec<_> = services.iter().map(|svc| svc.wait_ready(timeout)).collect();
43
44    let results = futures::future::join_all(futures).await;
45
46    let errors: Vec<String> = results.into_iter().filter_map(|r| r.err()).collect();
47
48    if errors.is_empty() {
49        Ok(())
50    } else {
51        Err(errors)
52    }
53}
54
/// Configuration for [`spawn_with_retry`].
///
/// Retries use exponential backoff: the wait starts at `initial_delay` and
/// doubles after every failed attempt, with the growth capped at `max_delay`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RetryConfig {
    /// Maximum number of attempts (including the initial one).
    pub max_attempts: u32,
    /// Delay before the first retry. Doubles after each further failed
    /// attempt (exponential backoff).
    pub initial_delay: Duration,
    /// Maximum delay between retries (caps the exponential growth).
    pub max_delay: Duration,
}
65
66impl Default for RetryConfig {
67    fn default() -> Self {
68        Self {
69            max_attempts: 3,
70            initial_delay: Duration::from_secs(1),
71            max_delay: Duration::from_secs(30),
72        }
73    }
74}
75
76/// Spawn a background task that runs `init_fn` with retry on failure.
77///
78/// On each attempt the handle transitions to `Starting`. If `init_fn`
79/// succeeds the handle moves to `Ready`. If all attempts are exhausted
80/// the handle moves to `Failed` with the last error message.
81///
82/// Returns a `JoinHandle` for the background task.
83///
84/// # Example
85///
86/// ```rust,ignore
87/// use fabryk_core::service::{ServiceHandle, spawn_with_retry, RetryConfig};
88///
89/// let svc = ServiceHandle::new("redis");
90/// let cell = Arc::new(tokio::sync::OnceCell::new());
91/// let cell_bg = cell.clone();
92///
93/// spawn_with_retry(svc.clone(), RetryConfig::default(), move || {
94///     let cell = cell_bg.clone();
95///     async move {
96///         let client = RedisClient::new(&url).await?;
97///         cell.set(Arc::new(client)).ok();
98///         Ok(())
99///     }
100/// });
101/// ```
102pub fn spawn_with_retry<F, Fut>(
103    handle: ServiceHandle,
104    config: RetryConfig,
105    init_fn: F,
106) -> tokio::task::JoinHandle<()>
107where
108    F: Fn() -> Fut + Send + 'static,
109    Fut: std::future::Future<Output = Result<(), String>> + Send,
110{
111    tokio::spawn(async move {
112        let mut delay = config.initial_delay;
113
114        for attempt in 1..=config.max_attempts {
115            handle.set_state(ServiceState::Starting);
116
117            match init_fn().await {
118                Ok(()) => {
119                    handle.set_state(ServiceState::Ready);
120                    return;
121                }
122                Err(e) => {
123                    if attempt == config.max_attempts {
124                        log::error!(
125                            "Service '{}' failed after {} attempts: {e}",
126                            handle.name(),
127                            config.max_attempts
128                        );
129                        handle.set_state(ServiceState::Failed(e));
130                        return;
131                    }
132                    log::warn!(
133                        "Service '{}' attempt {}/{} failed: {e} — retrying in {delay:?}",
134                        handle.name(),
135                        attempt,
136                        config.max_attempts
137                    );
138                    tokio::time::sleep(delay).await;
139                    delay = (delay * 2).min(config.max_delay);
140                }
141            }
142        }
143    })
144}
145
146// ============================================================================
147// ServiceState
148// ============================================================================
149
/// State of a service in its lifecycle.
///
/// `Degraded` and `Failed` carry a human-readable reason, which is included
/// in the [`Display`](fmt::Display) output.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum ServiceState {
    /// Service has not been started.
    Stopped,
    /// Service is initializing (e.g., building index).
    Starting,
    /// Service is operational and accepting requests.
    Ready,
    /// Service is partially operational.
    Degraded(String),
    /// Service is shutting down.
    Stopping,
    /// Service failed to start or encountered a fatal error.
    Failed(String),
}

impl ServiceState {
    /// Returns `true` if the service is fully ready.
    #[must_use]
    pub fn is_ready(&self) -> bool {
        matches!(self, Self::Ready)
    }

    /// Returns `true` if the service can handle requests (Ready or Degraded).
    #[must_use]
    pub fn is_available(&self) -> bool {
        matches!(self, Self::Ready | Self::Degraded(_))
    }

    /// Returns `true` if the service is in a terminal state (Stopped or Failed).
    #[must_use]
    pub fn is_terminal(&self) -> bool {
        matches!(self, Self::Stopped | Self::Failed(_))
    }
}

impl fmt::Display for ServiceState {
    /// Lower-case state name, followed by `": <reason>"` for the variants
    /// that carry one.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Stopped => write!(f, "stopped"),
            Self::Starting => write!(f, "starting"),
            Self::Ready => write!(f, "ready"),
            Self::Degraded(reason) => write!(f, "degraded: {reason}"),
            Self::Stopping => write!(f, "stopping"),
            Self::Failed(reason) => write!(f, "failed: {reason}"),
        }
    }
}
196
197// ============================================================================
198// ServiceHandle
199// ============================================================================
200
201/// A recorded state transition with timestamp.
202#[derive(Clone, Debug)]
203pub struct Transition {
204    /// The state that was entered.
205    pub state: ServiceState,
206    /// When the transition occurred (monotonic, relative to handle creation).
207    pub elapsed: Duration,
208}
209
210/// Thread-safe handle for observing and updating service state.
211///
212/// Cheap to clone (Arc internals). State changes are broadcast
213/// to all subscribers via a watch channel. Every transition is
214/// recorded in an audit trail accessible via [`transitions()`](Self::transitions).
215#[derive(Clone)]
216pub struct ServiceHandle {
217    inner: Arc<ServiceHandleInner>,
218}
219
220struct ServiceHandleInner {
221    name: String,
222    tx: watch::Sender<ServiceState>,
223    started_at: Instant,
224    transitions: std::sync::Mutex<Vec<Transition>>,
225}
226
227impl ServiceHandle {
228    /// Create a new service handle with the given name.
229    ///
230    /// Initial state is [`ServiceState::Stopped`].
231    pub fn new(name: impl Into<String>) -> Self {
232        let (tx, _rx) = watch::channel(ServiceState::Stopped);
233        Self {
234            inner: Arc::new(ServiceHandleInner {
235                name: name.into(),
236                tx,
237                started_at: Instant::now(),
238                transitions: std::sync::Mutex::new(vec![Transition {
239                    state: ServiceState::Stopped,
240                    elapsed: Duration::ZERO,
241                }]),
242            }),
243        }
244    }
245
246    /// Get the service name.
247    pub fn name(&self) -> &str {
248        &self.inner.name
249    }
250
251    /// Get the current service state.
252    pub fn state(&self) -> ServiceState {
253        self.inner.tx.borrow().clone()
254    }
255
256    /// Update the service state.
257    ///
258    /// All subscribers are notified of the change. The transition is
259    /// recorded in the audit trail with a monotonic timestamp.
260    pub fn set_state(&self, state: ServiceState) {
261        log::info!("Service '{}' → {state}", self.inner.name);
262        if let Ok(mut log) = self.inner.transitions.lock() {
263            log.push(Transition {
264                state: state.clone(),
265                elapsed: self.inner.started_at.elapsed(),
266            });
267        }
268        self.inner.tx.send_replace(state);
269    }
270
271    /// Get the full transition audit trail.
272    ///
273    /// Returns a snapshot of all recorded state transitions, each with a
274    /// monotonic timestamp relative to handle creation. Useful for
275    /// diagnostics and debugging startup timing.
276    pub fn transitions(&self) -> Vec<Transition> {
277        self.inner
278            .transitions
279            .lock()
280            .map(|log| log.clone())
281            .unwrap_or_default()
282    }
283
284    /// Subscribe to state changes.
285    pub fn subscribe(&self) -> watch::Receiver<ServiceState> {
286        self.inner.tx.subscribe()
287    }
288
289    /// Wait until the service reaches Ready, Failed, or timeout.
290    pub async fn wait_ready(&self, timeout: Duration) -> Result<(), String> {
291        let mut rx = self.subscribe();
292        let deadline = tokio::time::sleep(timeout);
293        tokio::pin!(deadline);
294
295        // Check current state first
296        {
297            let state = rx.borrow_and_update().clone();
298            match state {
299                ServiceState::Ready => return Ok(()),
300                ServiceState::Failed(reason) => {
301                    return Err(format!("Service '{}' failed: {reason}", self.inner.name));
302                }
303                _ => {}
304            }
305        }
306
307        loop {
308            tokio::select! {
309                _ = &mut deadline => {
310                    return Err(format!(
311                        "Service '{}' not ready after {timeout:?} (state: {})",
312                        self.inner.name, self.state()
313                    ));
314                }
315                result = rx.changed() => {
316                    if result.is_err() {
317                        return Err(format!("Service '{}' channel closed", self.inner.name));
318                    }
319                    let state = rx.borrow().clone();
320                    match state {
321                        ServiceState::Ready => return Ok(()),
322                        ServiceState::Failed(reason) => {
323                            return Err(format!(
324                                "Service '{}' failed: {reason}",
325                                self.inner.name
326                            ));
327                        }
328                        _ => continue,
329                    }
330                }
331            }
332        }
333    }
334
335    /// Elapsed time since the handle was created.
336    pub fn elapsed(&self) -> Duration {
337        self.inner.started_at.elapsed()
338    }
339}
340
341impl fmt::Debug for ServiceHandle {
342    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
343        f.debug_struct("ServiceHandle")
344            .field("name", &self.inner.name)
345            .field("state", &self.state())
346            .finish()
347    }
348}
349
350// ============================================================================
351// Tests
352// ============================================================================
353
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_service_state_display() {
        assert_eq!(ServiceState::Stopped.to_string(), "stopped");
        assert_eq!(ServiceState::Starting.to_string(), "starting");
        assert_eq!(ServiceState::Ready.to_string(), "ready");
        assert_eq!(
            ServiceState::Degraded("low memory".to_string()).to_string(),
            "degraded: low memory"
        );
        assert_eq!(ServiceState::Stopping.to_string(), "stopping");
        assert_eq!(
            ServiceState::Failed("crash".to_string()).to_string(),
            "failed: crash"
        );
    }

    #[test]
    fn test_service_state_predicates() {
        assert!(ServiceState::Ready.is_ready());
        assert!(!ServiceState::Starting.is_ready());
        assert!(!ServiceState::Degraded("x".into()).is_ready());

        assert!(ServiceState::Ready.is_available());
        assert!(ServiceState::Degraded("x".into()).is_available());
        assert!(!ServiceState::Starting.is_available());
        assert!(!ServiceState::Stopped.is_available());

        assert!(ServiceState::Stopped.is_terminal());
        assert!(ServiceState::Failed("x".into()).is_terminal());
        assert!(!ServiceState::Ready.is_terminal());
        assert!(!ServiceState::Starting.is_terminal());
        // Shutting down and partial operation are not terminal.
        assert!(!ServiceState::Stopping.is_terminal());
        assert!(!ServiceState::Degraded("x".into()).is_terminal());
    }

    #[test]
    fn test_service_handle_initial_state() {
        let handle = ServiceHandle::new("test");
        assert_eq!(handle.name(), "test");
        assert_eq!(handle.state(), ServiceState::Stopped);
    }

    #[test]
    fn test_service_handle_state_transitions() {
        let handle = ServiceHandle::new("test");

        handle.set_state(ServiceState::Starting);
        assert_eq!(handle.state(), ServiceState::Starting);

        handle.set_state(ServiceState::Ready);
        assert_eq!(handle.state(), ServiceState::Ready);

        handle.set_state(ServiceState::Stopping);
        assert_eq!(handle.state(), ServiceState::Stopping);

        handle.set_state(ServiceState::Stopped);
        assert_eq!(handle.state(), ServiceState::Stopped);
    }

    #[test]
    fn test_service_handle_clone_shares_state() {
        let handle1 = ServiceHandle::new("shared");
        let handle2 = handle1.clone();

        handle1.set_state(ServiceState::Ready);
        assert_eq!(handle2.state(), ServiceState::Ready);

        handle2.set_state(ServiceState::Stopping);
        assert_eq!(handle1.state(), ServiceState::Stopping);
    }

    #[test]
    fn test_service_handle_subscribe() {
        let handle = ServiceHandle::new("test");
        let mut rx = handle.subscribe();

        // Initial value
        assert_eq!(*rx.borrow(), ServiceState::Stopped);

        handle.set_state(ServiceState::Starting);
        // Note: watch::Receiver sees latest value on next borrow
        assert_eq!(*rx.borrow_and_update(), ServiceState::Starting);
    }

    #[tokio::test]
    async fn test_service_handle_wait_ready_success() {
        let handle = ServiceHandle::new("test");
        let h = handle.clone();

        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(10)).await;
            h.set_state(ServiceState::Starting);
            tokio::time::sleep(Duration::from_millis(10)).await;
            h.set_state(ServiceState::Ready);
        });

        let result = handle.wait_ready(Duration::from_secs(1)).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_service_handle_wait_ready_timeout() {
        let handle = ServiceHandle::new("slow");
        handle.set_state(ServiceState::Starting);

        let result = handle.wait_ready(Duration::from_millis(50)).await;
        assert!(result.is_err());
        assert!(result.unwrap_err().contains("not ready after"));
    }

    #[tokio::test]
    async fn test_service_handle_wait_ready_failed() {
        let handle = ServiceHandle::new("broken");
        let h = handle.clone();

        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(10)).await;
            h.set_state(ServiceState::Failed("out of memory".to_string()));
        });

        let result = handle.wait_ready(Duration::from_secs(1)).await;
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(err.contains("failed"));
        assert!(err.contains("out of memory"));
    }

    #[tokio::test]
    async fn test_service_handle_wait_ready_already_ready() {
        let handle = ServiceHandle::new("instant");
        handle.set_state(ServiceState::Ready);

        let result = handle.wait_ready(Duration::from_millis(50)).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_service_handle_wait_ready_already_failed() {
        let handle = ServiceHandle::new("instant-fail");
        handle.set_state(ServiceState::Failed("boom".to_string()));

        let result = handle.wait_ready(Duration::from_millis(50)).await;
        assert!(result.is_err());
        assert!(result.unwrap_err().contains("boom"));
    }

    #[test]
    fn test_service_handle_elapsed() {
        let handle = ServiceHandle::new("test");
        std::thread::sleep(Duration::from_millis(10));
        assert!(handle.elapsed() >= Duration::from_millis(10));
    }

    #[test]
    fn test_service_handle_debug() {
        let handle = ServiceHandle::new("debug-test");
        let debug = format!("{:?}", handle);
        assert!(debug.contains("debug-test"));
        assert!(debug.contains("ServiceHandle"));
    }

    // Compile-time check: ServiceHandle must be Send + Sync
    fn _assert_send_sync<T: Send + Sync>() {}
    #[test]
    fn test_service_handle_send_sync() {
        _assert_send_sync::<ServiceHandle>();
        _assert_send_sync::<ServiceState>();
    }

    // ── wait_all_ready tests ────────────────────────────────────────

    #[tokio::test]
    async fn test_wait_all_ready_all_already_ready() {
        let a = ServiceHandle::new("a");
        let b = ServiceHandle::new("b");
        a.set_state(ServiceState::Ready);
        b.set_state(ServiceState::Ready);

        let result = wait_all_ready(&[a, b], Duration::from_millis(50)).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_wait_all_ready_parallel_startup() {
        let a = ServiceHandle::new("a");
        let b = ServiceHandle::new("b");

        let ha = a.clone();
        let hb = b.clone();
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(10)).await;
            ha.set_state(ServiceState::Ready);
        });
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(20)).await;
            hb.set_state(ServiceState::Ready);
        });

        let result = wait_all_ready(&[a, b], Duration::from_secs(1)).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_wait_all_ready_one_fails() {
        let a = ServiceHandle::new("ok");
        let b = ServiceHandle::new("broken");
        a.set_state(ServiceState::Ready);
        b.set_state(ServiceState::Failed("boom".into()));

        let result = wait_all_ready(&[a, b], Duration::from_millis(50)).await;
        assert!(result.is_err());
        let errors = result.unwrap_err();
        assert_eq!(errors.len(), 1);
        assert!(errors[0].contains("boom"));
    }

    #[tokio::test]
    async fn test_wait_all_ready_collects_every_error() {
        let a = ServiceHandle::new("first");
        let b = ServiceHandle::new("second");
        a.set_state(ServiceState::Failed("disk full".into()));
        b.set_state(ServiceState::Failed("no network".into()));

        let errors = wait_all_ready(&[a, b], Duration::from_millis(50))
            .await
            .unwrap_err();
        // Every failure is reported, in the same order as the input slice.
        assert_eq!(errors.len(), 2);
        assert!(errors[0].contains("disk full"));
        assert!(errors[1].contains("no network"));
    }

    #[tokio::test]
    async fn test_wait_all_ready_one_timeout() {
        let a = ServiceHandle::new("ok");
        let b = ServiceHandle::new("slow");
        a.set_state(ServiceState::Ready);
        b.set_state(ServiceState::Starting);

        let result = wait_all_ready(&[a, b], Duration::from_millis(50)).await;
        assert!(result.is_err());
        let errors = result.unwrap_err();
        assert_eq!(errors.len(), 1);
        assert!(errors[0].contains("not ready after"));
    }

    #[tokio::test]
    async fn test_wait_all_ready_empty_services() {
        let result = wait_all_ready(&[], Duration::from_millis(50)).await;
        assert!(result.is_ok());
    }

    // ── spawn_with_retry tests ──────────────────────────────────────

    #[tokio::test]
    async fn test_spawn_with_retry_succeeds_first_attempt() {
        let handle = ServiceHandle::new("ok");
        let config = RetryConfig {
            max_attempts: 3,
            initial_delay: Duration::from_millis(1),
            max_delay: Duration::from_millis(10),
        };

        let jh = spawn_with_retry(handle.clone(), config, || async { Ok(()) });
        jh.await.unwrap();

        assert_eq!(handle.state(), ServiceState::Ready);
    }

    #[tokio::test]
    async fn test_spawn_with_retry_succeeds_after_failures() {
        let handle = ServiceHandle::new("flaky");
        let counter = Arc::new(std::sync::atomic::AtomicU32::new(0));
        let counter_clone = counter.clone();

        let config = RetryConfig {
            max_attempts: 3,
            initial_delay: Duration::from_millis(1),
            max_delay: Duration::from_millis(10),
        };

        let jh = spawn_with_retry(handle.clone(), config, move || {
            let c = counter_clone.clone();
            async move {
                let attempt = c.fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1;
                if attempt < 3 {
                    Err(format!("attempt {attempt} failed"))
                } else {
                    Ok(())
                }
            }
        });
        jh.await.unwrap();

        assert_eq!(handle.state(), ServiceState::Ready);
        assert_eq!(counter.load(std::sync::atomic::Ordering::Relaxed), 3);
    }

    #[tokio::test]
    async fn test_spawn_with_retry_exhausts_attempts() {
        let handle = ServiceHandle::new("broken");
        let config = RetryConfig {
            max_attempts: 2,
            initial_delay: Duration::from_millis(1),
            max_delay: Duration::from_millis(10),
        };

        let jh = spawn_with_retry(handle.clone(), config, || async {
            Err("still broken".to_string())
        });
        jh.await.unwrap();

        assert!(
            matches!(handle.state(), ServiceState::Failed(msg) if msg.contains("still broken"))
        );
    }

    #[test]
    fn test_retry_config_default() {
        let config = RetryConfig::default();
        assert_eq!(config.max_attempts, 3);
        assert_eq!(config.initial_delay, Duration::from_secs(1));
        assert_eq!(config.max_delay, Duration::from_secs(30));
    }

    // ── transition audit trail tests ────────────────────────────────

    #[test]
    fn test_transitions_initial_state_recorded() {
        let handle = ServiceHandle::new("test");
        let transitions = handle.transitions();
        assert_eq!(transitions.len(), 1);
        assert_eq!(transitions[0].state, ServiceState::Stopped);
        assert_eq!(transitions[0].elapsed, Duration::ZERO);
    }

    #[test]
    fn test_transitions_records_all_changes() {
        let handle = ServiceHandle::new("test");
        handle.set_state(ServiceState::Starting);
        handle.set_state(ServiceState::Ready);

        let transitions = handle.transitions();
        assert_eq!(transitions.len(), 3);
        assert_eq!(transitions[0].state, ServiceState::Stopped);
        assert_eq!(transitions[1].state, ServiceState::Starting);
        assert_eq!(transitions[2].state, ServiceState::Ready);
    }

    #[test]
    fn test_transitions_repeated_state_is_recorded() {
        let handle = ServiceHandle::new("test");
        handle.set_state(ServiceState::Starting);
        handle.set_state(ServiceState::Starting);

        // set_state does not deduplicate: every call produces an entry.
        let transitions = handle.transitions();
        assert_eq!(transitions.len(), 3);
        assert_eq!(transitions[1].state, ServiceState::Starting);
        assert_eq!(transitions[2].state, ServiceState::Starting);
    }

    #[test]
    fn test_transitions_last_entry_matches_current_state() {
        let handle = ServiceHandle::new("test");
        handle.set_state(ServiceState::Starting);
        handle.set_state(ServiceState::Degraded("slow disk".into()));

        let last = handle
            .transitions()
            .pop()
            .expect("the trail always holds the initial entry");
        assert_eq!(last.state, handle.state());
    }

    #[test]
    fn test_transitions_timestamps_monotonic() {
        let handle = ServiceHandle::new("test");
        std::thread::sleep(Duration::from_millis(5));
        handle.set_state(ServiceState::Starting);
        std::thread::sleep(Duration::from_millis(5));
        handle.set_state(ServiceState::Ready);

        let transitions = handle.transitions();
        for window in transitions.windows(2) {
            assert!(
                window[1].elapsed >= window[0].elapsed,
                "timestamps should be monotonically increasing"
            );
        }
        // The last transition should have a non-zero elapsed
        assert!(transitions[2].elapsed >= Duration::from_millis(10));
    }

    #[test]
    fn test_transitions_cloned_handle_shares_log() {
        let h1 = ServiceHandle::new("shared");
        let h2 = h1.clone();

        h1.set_state(ServiceState::Starting);
        h2.set_state(ServiceState::Ready);

        // Both should see all 3 transitions
        assert_eq!(h1.transitions().len(), 3);
        assert_eq!(h2.transitions().len(), 3);
    }
}