Skip to main content

fabryk_core/
service.rs

//! Service lifecycle state management.
//!
//! Provides [`ServiceState`] and [`ServiceHandle`] for tracking the lifecycle
//! of background services (index builders, search backends, etc.).
//!
//! # Usage
//!
//! ```rust
//! use fabryk_core::service::{ServiceHandle, ServiceState};
//!
//! let handle = ServiceHandle::new("my-service");
//! assert_eq!(handle.state(), ServiceState::Stopped);
//!
//! handle.set_state(ServiceState::Starting);
//! handle.set_state(ServiceState::Ready);
//! assert!(handle.state().is_ready());
//! ```
18
19use std::fmt;
20use std::sync::Arc;
21use std::time::{Duration, Instant};
22use tokio::sync::watch;
23
24// ============================================================================
25// ServiceState
26// ============================================================================
27
/// State of a service in its lifecycle.
///
/// `Degraded` and `Failed` carry a human-readable reason string. Equality is
/// total (`Eq`), so states can be compared with `==` and used wherever an
/// `Eq` bound is required.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ServiceState {
    /// Service has not been started.
    Stopped,
    /// Service is initializing (e.g., building index).
    Starting,
    /// Service is operational and accepting requests.
    Ready,
    /// Service is partially operational; the payload is the reason.
    Degraded(String),
    /// Service is shutting down.
    Stopping,
    /// Service failed to start or encountered a fatal error; the payload is
    /// the reason.
    Failed(String),
}
44
45impl ServiceState {
46    /// Returns `true` if the service is fully ready.
47    pub fn is_ready(&self) -> bool {
48        matches!(self, Self::Ready)
49    }
50
51    /// Returns `true` if the service can handle requests (Ready or Degraded).
52    pub fn is_available(&self) -> bool {
53        matches!(self, Self::Ready | Self::Degraded(_))
54    }
55
56    /// Returns `true` if the service is in a terminal state (Stopped or Failed).
57    pub fn is_terminal(&self) -> bool {
58        matches!(self, Self::Stopped | Self::Failed(_))
59    }
60}
61
62impl fmt::Display for ServiceState {
63    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
64        match self {
65            Self::Stopped => write!(f, "stopped"),
66            Self::Starting => write!(f, "starting"),
67            Self::Ready => write!(f, "ready"),
68            Self::Degraded(reason) => write!(f, "degraded: {reason}"),
69            Self::Stopping => write!(f, "stopping"),
70            Self::Failed(reason) => write!(f, "failed: {reason}"),
71        }
72    }
73}
74
75// ============================================================================
76// ServiceHandle
77// ============================================================================
78
/// Thread-safe handle for observing and updating service state.
///
/// Cheap to clone (Arc internals): every clone points at the same shared
/// inner value, so a state set through one clone is visible through all of
/// them. State changes are broadcast to all subscribers via a watch channel.
#[derive(Clone)]
pub struct ServiceHandle {
    /// Shared service name, state channel and creation timestamp.
    inner: Arc<ServiceHandleInner>,
}
87
/// Data shared by every clone of a `ServiceHandle`.
struct ServiceHandleInner {
    /// Service name; appears in log lines and in `wait_ready` error messages.
    name: String,
    /// Sending half of the watch channel holding the current `ServiceState`.
    tx: watch::Sender<ServiceState>,
    /// Instant captured when the handle was constructed; basis for `elapsed()`.
    started_at: Instant,
}
93
94impl ServiceHandle {
95    /// Create a new service handle with the given name.
96    ///
97    /// Initial state is [`ServiceState::Stopped`].
98    pub fn new(name: impl Into<String>) -> Self {
99        let (tx, _rx) = watch::channel(ServiceState::Stopped);
100        Self {
101            inner: Arc::new(ServiceHandleInner {
102                name: name.into(),
103                tx,
104                started_at: Instant::now(),
105            }),
106        }
107    }
108
109    /// Get the service name.
110    pub fn name(&self) -> &str {
111        &self.inner.name
112    }
113
114    /// Get the current service state.
115    pub fn state(&self) -> ServiceState {
116        self.inner.tx.borrow().clone()
117    }
118
119    /// Update the service state.
120    ///
121    /// All subscribers are notified of the change.
122    pub fn set_state(&self, state: ServiceState) {
123        log::info!("Service '{}' → {state}", self.inner.name);
124        self.inner.tx.send_replace(state);
125    }
126
127    /// Subscribe to state changes.
128    pub fn subscribe(&self) -> watch::Receiver<ServiceState> {
129        self.inner.tx.subscribe()
130    }
131
132    /// Wait until the service reaches Ready, Failed, or timeout.
133    pub async fn wait_ready(&self, timeout: Duration) -> Result<(), String> {
134        let mut rx = self.subscribe();
135        let deadline = tokio::time::sleep(timeout);
136        tokio::pin!(deadline);
137
138        // Check current state first
139        {
140            let state = rx.borrow_and_update().clone();
141            match state {
142                ServiceState::Ready => return Ok(()),
143                ServiceState::Failed(reason) => {
144                    return Err(format!("Service '{}' failed: {reason}", self.inner.name));
145                }
146                _ => {}
147            }
148        }
149
150        loop {
151            tokio::select! {
152                _ = &mut deadline => {
153                    return Err(format!(
154                        "Service '{}' not ready after {timeout:?} (state: {})",
155                        self.inner.name, self.state()
156                    ));
157                }
158                result = rx.changed() => {
159                    if result.is_err() {
160                        return Err(format!("Service '{}' channel closed", self.inner.name));
161                    }
162                    let state = rx.borrow().clone();
163                    match state {
164                        ServiceState::Ready => return Ok(()),
165                        ServiceState::Failed(reason) => {
166                            return Err(format!(
167                                "Service '{}' failed: {reason}",
168                                self.inner.name
169                            ));
170                        }
171                        _ => continue,
172                    }
173                }
174            }
175        }
176    }
177
178    /// Elapsed time since the handle was created.
179    pub fn elapsed(&self) -> Duration {
180        self.inner.started_at.elapsed()
181    }
182}
183
184impl fmt::Debug for ServiceHandle {
185    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
186        f.debug_struct("ServiceHandle")
187            .field("name", &self.inner.name)
188            .field("state", &self.state())
189            .finish()
190    }
191}
192
193// ============================================================================
194// Tests
195// ============================================================================
196
#[cfg(test)]
mod tests {
    use super::*;

    /// Each variant renders as its lowercase label, with the reason appended
    /// for `Degraded` and `Failed`.
    #[test]
    fn test_service_state_display() {
        let cases = [
            (ServiceState::Stopped, "stopped"),
            (ServiceState::Starting, "starting"),
            (ServiceState::Ready, "ready"),
            (
                ServiceState::Degraded("low memory".to_string()),
                "degraded: low memory",
            ),
            (ServiceState::Stopping, "stopping"),
            (ServiceState::Failed("crash".to_string()), "failed: crash"),
        ];

        for (state, rendered) in cases {
            assert_eq!(state.to_string(), rendered);
        }
    }

    /// Spot-checks `is_ready`, `is_available` and `is_terminal`.
    #[test]
    fn test_service_state_predicates() {
        let degraded = || ServiceState::Degraded("x".into());
        let failed = || ServiceState::Failed("x".into());

        // is_ready
        assert!(ServiceState::Ready.is_ready());
        for state in [ServiceState::Starting, degraded()] {
            assert!(!state.is_ready());
        }

        // is_available
        for state in [ServiceState::Ready, degraded()] {
            assert!(state.is_available());
        }
        for state in [ServiceState::Starting, ServiceState::Stopped] {
            assert!(!state.is_available());
        }

        // is_terminal
        for state in [ServiceState::Stopped, failed()] {
            assert!(state.is_terminal());
        }
        for state in [ServiceState::Ready, ServiceState::Starting] {
            assert!(!state.is_terminal());
        }
    }

    /// A fresh handle keeps its name and starts out stopped.
    #[test]
    fn test_service_handle_initial_state() {
        let fresh = ServiceHandle::new("test");
        let (name, state) = (fresh.name(), fresh.state());

        assert_eq!(name, "test");
        assert_eq!(state, ServiceState::Stopped);
    }

    /// `state()` always reflects the most recent `set_state()` call.
    #[test]
    fn test_service_handle_state_transitions() {
        let handle = ServiceHandle::new("test");
        let lifecycle = [
            ServiceState::Starting,
            ServiceState::Ready,
            ServiceState::Stopping,
            ServiceState::Stopped,
        ];

        for next in lifecycle {
            handle.set_state(next.clone());
            assert_eq!(handle.state(), next);
        }
    }

    /// Clones observe and mutate the same underlying state.
    #[test]
    fn test_service_handle_clone_shares_state() {
        let original = ServiceHandle::new("shared");
        let duplicate = original.clone();

        original.set_state(ServiceState::Ready);
        assert_eq!(duplicate.state(), ServiceState::Ready);

        duplicate.set_state(ServiceState::Stopping);
        assert_eq!(original.state(), ServiceState::Stopping);
    }

    /// A subscriber sees the initial value and then the updated one.
    #[test]
    fn test_service_handle_subscribe() {
        let handle = ServiceHandle::new("test");
        let mut receiver = handle.subscribe();

        // Before any update the receiver holds the initial state.
        assert_eq!(*receiver.borrow(), ServiceState::Stopped);

        handle.set_state(ServiceState::Starting);
        // The receiver yields the latest value on its next borrow.
        let seen = receiver.borrow_and_update().clone();
        assert_eq!(seen, ServiceState::Starting);
    }

    /// `wait_ready` resolves to `Ok` once a background task reaches `Ready`.
    #[tokio::test]
    async fn test_service_handle_wait_ready_success() {
        let handle = ServiceHandle::new("test");
        let starter = handle.clone();
        let step = Duration::from_millis(10);

        tokio::spawn(async move {
            for next in [ServiceState::Starting, ServiceState::Ready] {
                tokio::time::sleep(step).await;
                starter.set_state(next);
            }
        });

        let outcome = handle.wait_ready(Duration::from_secs(1)).await;
        assert!(outcome.is_ok());
    }

    /// A service stuck in `Starting` makes `wait_ready` time out.
    #[tokio::test]
    async fn test_service_handle_wait_ready_timeout() {
        let handle = ServiceHandle::new("slow");
        handle.set_state(ServiceState::Starting);

        let outcome = handle.wait_ready(Duration::from_millis(50)).await;
        // `unwrap_err` panics on `Ok`, so it doubles as the is-error check.
        let message = outcome.unwrap_err();
        assert!(message.contains("not ready after"));
    }

    /// A failure reported while waiting is surfaced with its reason.
    #[tokio::test]
    async fn test_service_handle_wait_ready_failed() {
        let handle = ServiceHandle::new("broken");
        let breaker = handle.clone();

        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(10)).await;
            breaker.set_state(ServiceState::Failed("out of memory".to_string()));
        });

        let outcome = handle.wait_ready(Duration::from_secs(1)).await;
        // `unwrap_err` panics on `Ok`, so it doubles as the is-error check.
        let message = outcome.unwrap_err();
        for fragment in ["failed", "out of memory"] {
            assert!(message.contains(fragment));
        }
    }

    /// `wait_ready` returns immediately when the service is already ready.
    #[tokio::test]
    async fn test_service_handle_wait_ready_already_ready() {
        let handle = ServiceHandle::new("instant");
        handle.set_state(ServiceState::Ready);

        let outcome = handle.wait_ready(Duration::from_millis(50)).await;
        assert!(outcome.is_ok());
    }

    /// `wait_ready` returns the failure reason when the service already failed.
    #[tokio::test]
    async fn test_service_handle_wait_ready_already_failed() {
        let handle = ServiceHandle::new("instant-fail");
        handle.set_state(ServiceState::Failed("boom".to_string()));

        let outcome = handle.wait_ready(Duration::from_millis(50)).await;
        // `unwrap_err` panics on `Ok`, so it doubles as the is-error check.
        let message = outcome.unwrap_err();
        assert!(message.contains("boom"));
    }

    /// `elapsed()` grows at least as fast as wall-clock sleeping.
    #[test]
    fn test_service_handle_elapsed() {
        let pause = Duration::from_millis(10);
        let handle = ServiceHandle::new("test");

        std::thread::sleep(pause);
        assert!(handle.elapsed() >= pause);
    }

    /// The `Debug` output mentions both the type name and the service name.
    #[test]
    fn test_service_handle_debug() {
        let handle = ServiceHandle::new("debug-test");
        let rendered = format!("{handle:?}");

        for fragment in ["debug-test", "ServiceHandle"] {
            assert!(rendered.contains(fragment));
        }
    }

    // Compile-time check: the public types must be `Send + Sync`.
    fn _assert_send_sync<T>()
    where
        T: Send + Sync,
    {
    }

    #[test]
    fn test_service_handle_send_sync() {
        _assert_send_sync::<ServiceHandle>();
        _assert_send_sync::<ServiceState>();
    }
}