// modkit/lifecycle.rs

1use async_trait::async_trait;
2use parking_lot::Mutex;
3use std::sync::{
4    Arc,
5    atomic::{AtomicBool, AtomicU8, Ordering},
6};
7use std::time::Duration;
8use tokio::sync::{Notify, oneshot};
9use tokio::task::JoinHandle;
10use tokio_util::sync::CancellationToken;
11
12// ----- Results & aliases -----------------------------------------------------
13
14/// Public result for lifecycle-level operations.
15type LcResult<T = ()> = std::result::Result<T, LifecycleError>;
16
17/// Result returned by user/background tasks.
18type TaskResult<T = ()> = anyhow::Result<T>;
19
20/// Type alias for ready function pointer to reduce complexity.
21type ReadyFn<T> = fn(
22    Arc<T>,
23    CancellationToken,
24    ReadySignal,
25)
26    -> std::pin::Pin<Box<dyn std::future::Future<Output = TaskResult<()>> + Send>>;
27
// ----- Status model ----------------------------------------------------------

/// Lifecycle state of a background job, stored in an `AtomicU8` via
/// [`Status::as_u8`] / [`Status::from_u8`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum Status {
    /// No task is active (initial state, and the state after a task ends or is stopped).
    Stopped,
    /// A task has been spawned but has not (yet) reported readiness.
    Starting,
    /// The task is running (immediately after spawn, or after its ready signal fired).
    Running,
    /// `stop()` has been requested and is waiting for the task to end.
    Stopping,
}
39
40impl Status {
41    #[inline]
42    #[must_use]
43    pub const fn as_u8(self) -> u8 {
44        match self {
45            Status::Stopped => 0,
46            Status::Starting => 1,
47            Status::Running => 2,
48            Status::Stopping => 3,
49        }
50    }
51    #[inline]
52    #[must_use]
53    pub const fn from_u8(x: u8) -> Self {
54        match x {
55            1 => Status::Starting,
56            2 => Status::Running,
57            3 => Status::Stopping,
58            _ => Status::Stopped,
59        }
60    }
61}
62
/// Why `Lifecycle::stop` considers the task stopped.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StopReason {
    /// The task had already ended, or it ended without `stop()` having cancelled it.
    Finished,
    /// `stop()` cancelled the task's token and the task ended before the deadline.
    Cancelled,
    /// The deadline elapsed first; the task was aborted if it was still running.
    Timeout,
}
70
71// ----- Ready signal ----------------------------------------------------------
72
73/// Ready signal used by `start_with_ready*` to flip Starting -> Running.
74pub struct ReadySignal(oneshot::Sender<()>);
75
76impl ReadySignal {
77    #[inline]
78    pub fn notify(self) {
79        _ = self.0.send(());
80    }
81    /// Construct a `ReadySignal` from a oneshot sender (used by macro-generated shims).
82    #[inline]
83    #[must_use]
84    pub fn from_sender(sender: tokio::sync::oneshot::Sender<()>) -> Self {
85        ReadySignal(sender)
86    }
87}
88
89// ----- Runnable --------------------------------------------------------------
90
91/// Trait for modules that can run a long-running task.
92/// Note: take `self` by `Arc` to make the spawned future `'static` and `Send`.
93#[async_trait]
94pub trait Runnable: Send + Sync + 'static {
95    /// Long-running loop. Must return when `cancel` is cancelled.
96    async fn run(self: Arc<Self>, cancel: CancellationToken) -> TaskResult<()>;
97}
98
99// ----- Errors ----------------------------------------------------------------
100
101/// Library-level error for lifecycle operations.
102#[derive(Debug, thiserror::Error)]
103pub enum LifecycleError {
104    #[error("already started")]
105    AlreadyStarted,
106}
107
108// ----- Lifecycle -------------------------------------------------------------
109
110/// Lifecycle controller for managing background tasks.
111///
112/// Concurrency notes:
113/// - State is tracked with atomics and `Notify`.
114/// - `handle` / `cancel` are protected by `Mutex`, and their locking scope is kept minimal.
115/// - All public start methods are thin wrappers around `start_core`.
116pub struct Lifecycle {
117    name: &'static str,
118    status: Arc<AtomicU8>,
119    handle: Mutex<Option<JoinHandle<()>>>,
120    cancel: Mutex<Option<CancellationToken>>,
121    /// `true` once the background task has fully finished.
122    finished: Arc<AtomicBool>,
123    /// Set to `true` when `stop()` requested cancellation.
124    was_cancelled: Arc<AtomicBool>,
125    /// Notifies all waiters when the task finishes.
126    finished_notify: Arc<Notify>,
127}
128
129impl Lifecycle {
130    #[must_use]
131    pub fn new_named(name: &'static str) -> Self {
132        Self {
133            name,
134            status: Arc::new(AtomicU8::new(Status::Stopped.as_u8())),
135            handle: Mutex::new(None),
136            cancel: Mutex::new(None),
137            finished: Arc::new(AtomicBool::new(false)),
138            was_cancelled: Arc::new(AtomicBool::new(false)),
139            finished_notify: Arc::new(Notify::new()),
140        }
141    }
142
143    #[must_use]
144    pub fn new() -> Self {
145        Self::new_named("lifecycle")
146    }
147
148    #[inline]
149    pub fn name(&self) -> &'static str {
150        self.name
151    }
152
153    // --- small helpers for atomics (keeps Ordering unified and code concise) ---
154
155    #[inline]
156    fn load_status(&self) -> Status {
157        Status::from_u8(self.status.load(Ordering::Acquire))
158    }
159
160    #[inline]
161    fn store_status(&self, s: Status) {
162        self.status.store(s.as_u8(), Ordering::Release);
163    }
164
165    // --- public start APIs delegate to start_core --------------------------------
166
167    /// Spawn the job using `make(cancel)`.
168    ///
169    /// The future is constructed inside the task to avoid leaving the lifecycle in `Starting`
170    /// if `make` panics.
171    ///
172    /// # Errors
173    /// Returns `LcError` if the lifecycle is not in a startable state.
174    #[tracing::instrument(skip(self, make), level = "debug")]
175    pub fn start<F, Fut>(&self, make: F) -> LcResult
176    where
177        F: FnOnce(CancellationToken) -> Fut + Send + 'static,
178        Fut: std::future::Future<Output = TaskResult<()>> + Send + 'static,
179    {
180        self.start_core(CancellationToken::new(), move |tok, _| make(tok), false)
181    }
182
183    /// Spawn the job using a provided `CancellationToken` and `make(cancel)`.
184    ///
185    /// # Errors
186    /// Returns `LcError` if the lifecycle is not in a startable state.
187    #[tracing::instrument(skip(self, make, token), level = "debug")]
188    pub fn start_with_token<F, Fut>(&self, token: CancellationToken, make: F) -> LcResult
189    where
190        F: FnOnce(CancellationToken) -> Fut + Send + 'static,
191        Fut: std::future::Future<Output = TaskResult<()>> + Send + 'static,
192    {
193        self.start_core(token, move |tok, _| make(tok), false)
194    }
195
196    /// Spawn the job using `make(cancel, ready)`. Status becomes `Running` only after `ready.notify()`.
197    ///
198    /// # Errors
199    /// Returns `LcError` if the lifecycle is not in a startable state.
200    #[tracing::instrument(skip(self, make), level = "debug")]
201    pub fn start_with_ready<F, Fut>(&self, make: F) -> LcResult
202    where
203        F: FnOnce(CancellationToken, ReadySignal) -> Fut + Send + 'static,
204        Fut: std::future::Future<Output = TaskResult<()>> + Send + 'static,
205    {
206        self.start_core(
207            CancellationToken::new(),
208            move |tok, rdy| async move {
209                let Some(rdy) = rdy else {
210                    return Err(anyhow::anyhow!("ReadySignal must be present"));
211                };
212                make(tok, rdy).await
213            },
214            true,
215        )
216    }
217
218    /// Ready-aware start variant that uses a provided `CancellationToken`.
219    ///
220    /// # Errors
221    /// Returns `LcError` if the lifecycle is not in a startable state.
222    #[tracing::instrument(skip(self, make, token), level = "debug")]
223    pub fn start_with_ready_and_token<F, Fut>(&self, token: CancellationToken, make: F) -> LcResult
224    where
225        F: FnOnce(CancellationToken, ReadySignal) -> Fut + Send + 'static,
226        Fut: std::future::Future<Output = TaskResult<()>> + Send + 'static,
227    {
228        self.start_core(
229            token,
230            move |tok, rdy| async move {
231                let Some(rdy) = rdy else {
232                    return Err(anyhow::anyhow!("ReadySignal must be present"));
233                };
234                make(tok, rdy).await
235            },
236            true,
237        )
238    }
239
240    /// Unified start core
241    ///
242    /// `ready_mode = true`   => we expect a `ReadySignal` to flip `Starting -> Running` (upon notify).
243    /// `ready_mode = false`  => we flip to `Running` immediately after spawn.
244    fn start_core<F, Fut>(&self, token: CancellationToken, make: F, ready_mode: bool) -> LcResult
245    where
246        F: Send + 'static + FnOnce(CancellationToken, Option<ReadySignal>) -> Fut,
247        Fut: std::future::Future<Output = TaskResult<()>> + Send + 'static,
248    {
249        // Stopped -> Starting (via CAS)
250        let cas_ok = self
251            .status
252            .compare_exchange(
253                Status::Stopped.as_u8(),
254                Status::Starting.as_u8(),
255                Ordering::AcqRel,
256                Ordering::Acquire,
257            )
258            .is_ok();
259        if !cas_ok {
260            return Err(LifecycleError::AlreadyStarted);
261        }
262
263        self.finished.store(false, Ordering::Release);
264        self.was_cancelled.store(false, Ordering::Release);
265
266        // store cancellation token (bounded lock scope)
267        {
268            let mut c = self.cancel.lock();
269            *c = Some(token.clone());
270        }
271
272        // In ready mode, we wait for `ready.notify()` to flip to Running.
273        // Otherwise, we mark Running immediately.
274        let (ready_tx, ready_rx) = oneshot::channel::<()>();
275        if ready_mode {
276            let status_on_ready = self.status.clone();
277            tokio::spawn(async move {
278                if ready_rx.await.is_ok() {
279                    _ = status_on_ready.compare_exchange(
280                        Status::Starting.as_u8(),
281                        Status::Running.as_u8(),
282                        Ordering::AcqRel,
283                        Ordering::Acquire,
284                    );
285                    tracing::debug!("lifecycle status -> running (ready)");
286                } else {
287                    // Sender dropped: task didn't signal readiness; we will remain in Starting
288                    // until finish. This is usually a bug or early-drop scenario.
289                    tracing::debug!("ready signal dropped; staying in Starting until finish");
290                }
291            });
292        } else {
293            self.store_status(Status::Running);
294            tracing::debug!("lifecycle status -> running");
295        }
296
297        let finished_flag = self.finished.clone();
298        let finished_notify = self.finished_notify.clone();
299        let status_on_finish = self.status.clone();
300
301        // Spawn the actual task with descriptive logging
302        let module_name = self.name;
303        let task_id = format!("{module_name}-{self:p}");
304        let handle = tokio::spawn(async move {
305            tracing::debug!(task_id = %task_id, module = %module_name, "lifecycle task starting");
306            let res = make(token, ready_mode.then(|| ReadySignal(ready_tx))).await;
307            if let Err(e) = res {
308                tracing::error!(error=%e, task_id=%task_id, module = %module_name, "lifecycle task error");
309            }
310            finished_flag.store(true, Ordering::Release);
311            finished_notify.notify_waiters();
312            status_on_finish.store(Status::Stopped.as_u8(), Ordering::Release);
313            tracing::debug!(task_id=%task_id, module = %module_name, "lifecycle task finished");
314        });
315
316        // store handle (bounded lock scope)
317        {
318            let mut h = self.handle.lock();
319            *h = Some(handle);
320        }
321
322        Ok(())
323    }
324
325    /// Request graceful shutdown and wait up to `timeout`.
326    ///
327    /// # Errors
328    /// Returns `LcError` if the stop operation fails.
329    #[tracing::instrument(skip(self, timeout), level = "debug")]
330    pub async fn stop(&self, timeout: Duration) -> LcResult<StopReason> {
331        let module_name = self.name;
332        let task_id = format!("{module_name}-{self:p}");
333        let st = self.load_status();
334        if !matches!(st, Status::Starting | Status::Running | Status::Stopping) {
335            // Not running => already finished.
336            return Ok(StopReason::Finished);
337        }
338
339        self.store_status(Status::Stopping);
340
341        // Request cancellation only once (idempotent if multiple callers race here).
342        if let Some(tok) = { self.cancel.lock().take() } {
343            self.was_cancelled.store(true, Ordering::Release);
344            tok.cancel();
345        }
346
347        // Waiter that works for all callers, even after the task already finished.
348        let finished_flag = self.finished.clone();
349        let notify = self.finished_notify.clone();
350        let finished_wait = async move {
351            if finished_flag.load(Ordering::Acquire) {
352                return;
353            }
354            notify.notified().await;
355        };
356
357        let reason = tokio::select! {
358            () = finished_wait => {
359                if self.was_cancelled.load(Ordering::Acquire) {
360                    StopReason::Cancelled
361                } else {
362                    StopReason::Finished
363                }
364            }
365            () = tokio::time::sleep(timeout) => StopReason::Timeout,
366        };
367
368        // Join and ensure we notify waiters even if the task was aborted/panicked.
369        let handle_opt = { self.handle.lock().take() };
370        if let Some(handle) = handle_opt {
371            if matches!(reason, StopReason::Timeout) && !handle.is_finished() {
372                tracing::warn!("lifecycle stop timed out; aborting task");
373                handle.abort();
374            }
375
376            match handle.await {
377                Ok(()) => {
378                    tracing::debug!(task_id = %task_id, module = %module_name, "lifecycle task completed successfully");
379                }
380                Err(e) if e.is_cancelled() => {
381                    tracing::debug!(task_id = %task_id, module = %module_name, "lifecycle task was cancelled/aborted");
382                }
383                Err(e) if e.is_panic() => {
384                    // Extract panic information if possible
385                    match e.try_into_panic() {
386                        Ok(panic_payload) => {
387                            let panic_msg = panic_payload
388                                .downcast_ref::<&str>()
389                                .copied()
390                                .map(str::to_owned)
391                                .or_else(|| panic_payload.downcast_ref::<String>().cloned())
392                                .unwrap_or_else(|| "unknown panic".to_owned());
393
394                            tracing::error!(
395                                task_id = %task_id,
396                                module = %module_name,
397                                panic_message = %panic_msg,
398                                "lifecycle task panicked - this indicates a serious bug"
399                            );
400                        }
401                        _ => {
402                            tracing::error!(
403                                task_id = %task_id,
404                                module = %module_name,
405                                "lifecycle task panicked (could not extract panic message)"
406                            );
407                        }
408                    }
409                }
410                Err(e) => {
411                    tracing::warn!(task_id = %task_id, module = %module_name, error = %e, "lifecycle task join error");
412                }
413            }
414
415            self.finished.store(true, Ordering::Release);
416            self.finished_notify.notify_waiters();
417        }
418
419        self.store_status(Status::Stopped);
420        tracing::info!(?reason, "lifecycle stopped");
421        Ok(reason)
422    }
423
424    /// Current status.
425    #[inline]
426    #[must_use]
427    pub fn status(&self) -> Status {
428        self.load_status()
429    }
430
431    /// Whether it is in `Starting` or `Running`.
432    #[inline]
433    pub fn is_running(&self) -> bool {
434        matches!(self.status(), Status::Starting | Status::Running)
435    }
436
437    /// Best-effort "try start" that swallows the error and returns bool.
438    #[inline]
439    #[must_use]
440    pub fn try_start<F, Fut>(&self, make: F) -> bool
441    where
442        F: FnOnce(CancellationToken) -> Fut + Send + 'static,
443        Fut: std::future::Future<Output = TaskResult<()>> + Send + 'static,
444    {
445        self.start(make).is_ok()
446    }
447
448    /// Wait until the task is fully stopped.
449    pub async fn wait_stopped(&self) {
450        if self.finished.load(Ordering::Acquire) {
451            return;
452        }
453        self.finished_notify.notified().await;
454    }
455}
456
457impl Default for Lifecycle {
458    fn default() -> Self {
459        Self::new()
460    }
461}
462
463impl Drop for Lifecycle {
464    /// Best-effort cleanup to avoid orphaned background tasks if caller forgets to call `stop()`.
465    fn drop(&mut self) {
466        if let Some(tok) = self.cancel.get_mut().take() {
467            tok.cancel();
468        }
469        if let Some(handle) = self.handle.get_mut().take() {
470            handle.abort();
471        }
472    }
473}
474
475// ----- WithLifecycle wrapper -------------------------------------------------
476
477/// Wrapper that implements `StatefulModule` for any `T: Runnable`.
478#[must_use]
479pub struct WithLifecycle<T: Runnable> {
480    inner: Arc<T>,
481    lc: Arc<Lifecycle>,
482    pub(crate) stop_timeout: Duration,
483    // lifecycle start mode configuration
484    await_ready: bool,
485    has_ready_handler: bool,
486    run_ready_fn: Option<ReadyFn<T>>,
487}
488
489impl<T: Runnable> WithLifecycle<T> {
490    pub fn new(inner: T) -> Self {
491        Self {
492            inner: Arc::new(inner),
493            lc: Arc::new(Lifecycle::new_named(std::any::type_name::<T>())),
494            stop_timeout: Duration::from_secs(30),
495            await_ready: false,
496            has_ready_handler: false,
497            run_ready_fn: None,
498        }
499    }
500
501    pub fn from_arc(inner: Arc<T>) -> Self {
502        Self {
503            inner,
504            lc: Arc::new(Lifecycle::new_named(std::any::type_name::<T>())),
505            stop_timeout: Duration::from_secs(30),
506            await_ready: false,
507            has_ready_handler: false,
508            run_ready_fn: None,
509        }
510    }
511
512    pub fn new_with_name(inner: T, name: &'static str) -> Self {
513        Self {
514            inner: Arc::new(inner),
515            lc: Arc::new(Lifecycle::new_named(name)),
516            stop_timeout: Duration::from_secs(30),
517            await_ready: false,
518            has_ready_handler: false,
519            run_ready_fn: None,
520        }
521    }
522
523    pub fn from_arc_with_name(inner: Arc<T>, name: &'static str) -> Self {
524        Self {
525            inner,
526            lc: Arc::new(Lifecycle::new_named(name)),
527            stop_timeout: Duration::from_secs(30),
528            await_ready: false,
529            has_ready_handler: false,
530            run_ready_fn: None,
531        }
532    }
533
534    pub fn with_stop_timeout(mut self, d: Duration) -> Self {
535        self.stop_timeout = d;
536        self
537    }
538
539    #[inline]
540    #[must_use]
541    pub fn status(&self) -> Status {
542        self.lc.status()
543    }
544
545    #[inline]
546    #[must_use]
547    pub fn inner(&self) -> &T {
548        self.inner.as_ref()
549    }
550
551    /// Sometimes callers need to hold an `Arc` to the inner runnable.
552    #[inline]
553    #[must_use]
554    pub fn inner_arc(&self) -> Arc<T> {
555        self.inner.clone()
556    }
557
558    /// Configure readiness behavior produced by proc-macros (`#[modkit::module(..., lifecycle(...))]`).
559    pub fn with_ready_mode(
560        mut self,
561        await_ready: bool,
562        has_ready_handler: bool,
563        run_ready_fn: Option<ReadyFn<T>>,
564    ) -> Self {
565        self.await_ready = await_ready;
566        self.has_ready_handler = has_ready_handler;
567        self.run_ready_fn = run_ready_fn;
568        self
569    }
570}
571
572impl<T: Runnable + Default> Default for WithLifecycle<T> {
573    fn default() -> Self {
574        Self::new(T::default())
575    }
576}
577
578#[async_trait]
579impl<T: Runnable> crate::contracts::RunnableCapability for WithLifecycle<T> {
580    #[tracing::instrument(skip(self, external_cancel), level = "debug")]
581    async fn start(&self, external_cancel: CancellationToken) -> TaskResult<()> {
582        let inner = self.inner.clone();
583        let composed = external_cancel.child_token();
584
585        if !self.await_ready {
586            self.lc
587                .start_with_token(composed, move |cancel| inner.run(cancel))
588                .map_err(anyhow::Error::from)
589        } else if self.has_ready_handler {
590            let f = self.run_ready_fn.ok_or_else(|| {
591                anyhow::anyhow!("run_ready_fn must be set when has_ready_handler")
592            })?;
593            self.lc
594                .start_with_ready_and_token(composed, move |cancel, ready| f(inner, cancel, ready))
595                .map_err(anyhow::Error::from)
596        } else {
597            self.lc
598                .start_with_ready_and_token(composed, move |cancel, ready| async move {
599                    // Auto-notify readiness and continue with normal run()
600                    ready.notify();
601                    inner.run(cancel).await
602                })
603                .map_err(anyhow::Error::from)
604        }
605    }
606
607    #[tracing::instrument(skip(self, external_cancel), level = "debug")]
608    async fn stop(&self, external_cancel: CancellationToken) -> TaskResult<()> {
609        tokio::select! {
610            res = self.lc.stop(self.stop_timeout) => {
611                _ = res.map_err(anyhow::Error::from)?;
612                Ok(())
613            }
614            () = external_cancel.cancelled() => {
615                _ = self.lc.stop(Duration::from_millis(0)).await?;
616                Ok(())
617            }
618        }
619    }
620}
621
622impl<T: Runnable> Drop for WithLifecycle<T> {
623    /// Best-effort, but only if we're the last owner of `lc` to avoid aborting someone else's task.
624    fn drop(&mut self) {
625        if Arc::strong_count(&self.lc) == 1 {
626            if let Some(tok) = self.lc.cancel.lock().as_ref() {
627                tok.cancel();
628            }
629            if let Some(handle) = self.lc.handle.lock().as_ref() {
630                handle.abort();
631            }
632        }
633    }
634}
635
// ----- Tests -----------------------------------------------------------------

#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU32, Ordering as AOrd};
    use tokio::time::{Duration, sleep};

    /// Runnable that bumps a counter every 10 ms until it is cancelled.
    struct TestRunnable {
        counter: AtomicU32,
    }

    impl TestRunnable {
        fn new() -> Self {
            Self {
                counter: AtomicU32::new(0),
            }
        }

        fn count(&self) -> u32 {
            self.counter.load(AOrd::Relaxed)
        }
    }

    #[async_trait::async_trait]
    impl Runnable for TestRunnable {
        async fn run(self: Arc<Self>, cancel: CancellationToken) -> TaskResult<()> {
            let mut ticker = tokio::time::interval(Duration::from_millis(10));
            loop {
                tokio::select! {
                    _ = ticker.tick() => { self.counter.fetch_add(1, AOrd::Relaxed); }
                    () = cancel.cancelled() => return Ok(()),
                }
            }
        }
    }

    /// Job body shared by many tests: idle until the token is cancelled, then succeed.
    async fn idle_until_cancelled(cancel: CancellationToken) -> TaskResult<()> {
        cancel.cancelled().await;
        Ok(())
    }

    #[tokio::test]
    async fn lifecycle_basic() {
        let lc = Arc::new(Lifecycle::new());
        assert_eq!(lc.status(), Status::Stopped);

        assert!(lc.start(idle_until_cancelled).is_ok());

        assert!(lc.stop(Duration::from_millis(100)).await.is_ok());
        assert_eq!(lc.status(), Status::Stopped);
    }

    #[tokio::test]
    async fn with_lifecycle_wrapper_basics() {
        let wrapper = WithLifecycle::new(TestRunnable::new());

        assert_eq!(wrapper.status(), Status::Stopped);
        assert_eq!(wrapper.inner().count(), 0);

        let wrapper = wrapper.with_stop_timeout(Duration::from_secs(60));
        assert_eq!(wrapper.stop_timeout.as_secs(), 60);
    }

    #[tokio::test]
    async fn start_sets_running_immediately() {
        let lc = Lifecycle::new();
        lc.start(idle_until_cancelled).unwrap();

        assert!(matches!(lc.status(), Status::Running | Status::Starting));

        lc.stop(Duration::from_millis(50)).await.unwrap();
        assert_eq!(lc.status(), Status::Stopped);
    }

    #[tokio::test]
    async fn start_with_ready_transitions_and_stop() {
        let lc = Lifecycle::new();

        // The job only reports readiness once the test opens this gate.
        let (gate_tx, gate_rx) = oneshot::channel::<()>();
        lc.start_with_ready(move |cancel, ready| async move {
            let _ = gate_rx.await;
            ready.notify();
            cancel.cancelled().await;
            Ok(())
        })
        .unwrap();

        assert_eq!(lc.status(), Status::Starting);

        let _ = gate_tx.send(());
        sleep(Duration::from_millis(10)).await;
        assert_eq!(lc.status(), Status::Running);

        let outcome = lc.stop(Duration::from_millis(100)).await.unwrap();
        assert!(matches!(
            outcome,
            StopReason::Cancelled | StopReason::Finished
        ));
        assert_eq!(lc.status(), Status::Stopped);
    }

    #[tokio::test]
    async fn stop_while_starting_before_ready() {
        let lc = Lifecycle::new();

        // Never signals readiness, so the lifecycle stays in `Starting`.
        lc.start_with_ready(move |cancel, _ready| async move {
            cancel.cancelled().await;
            Ok(())
        })
        .unwrap();

        assert_eq!(lc.status(), Status::Starting);

        let outcome = lc.stop(Duration::from_millis(100)).await.unwrap();
        assert!(matches!(
            outcome,
            StopReason::Cancelled | StopReason::Finished
        ));
        assert_eq!(lc.status(), Status::Stopped);
    }

    #[tokio::test]
    async fn timeout_path_aborts_and_notifies() {
        let lc = Lifecycle::new();

        // Ignores its token and never completes, so only the timeout/abort path can end it.
        lc.start(|_cancel| std::future::pending::<TaskResult<()>>())
            .unwrap();

        let outcome = lc.stop(Duration::from_millis(30)).await.unwrap();
        assert_eq!(outcome, StopReason::Timeout);
        assert_eq!(lc.status(), Status::Stopped);
    }

    #[tokio::test]
    async fn try_start_and_second_start_fails() {
        let lc = Lifecycle::new();

        assert!(lc.try_start(idle_until_cancelled));

        let err = lc.start(|_c| async { Ok(()) }).unwrap_err();
        assert!(matches!(err, LifecycleError::AlreadyStarted));

        lc.stop(Duration::from_millis(80)).await.unwrap();
        assert_eq!(lc.status(), Status::Stopped);
    }

    #[tokio::test]
    async fn stop_is_idempotent_and_safe_concurrent() {
        let lc = Arc::new(Lifecycle::new());
        lc.start(idle_until_cancelled).unwrap();

        let (first, second) = (Arc::clone(&lc), Arc::clone(&lc));
        let (r1, r2) = tokio::join!(
            async move { first.stop(Duration::from_millis(80)).await },
            async move { second.stop(Duration::from_millis(80)).await },
        );

        for outcome in [r1.unwrap(), r2.unwrap()] {
            assert!(matches!(
                outcome,
                StopReason::Finished | StopReason::Cancelled | StopReason::Timeout
            ));
        }
        assert_eq!(lc.status(), Status::Stopped);
    }

    #[tokio::test]
    async fn stateful_wrapper_start_stop_roundtrip() {
        use crate::contracts::RunnableCapability;

        let wrapper = WithLifecycle::new(TestRunnable::new());
        assert_eq!(wrapper.status(), Status::Stopped);

        wrapper.start(CancellationToken::new()).await.unwrap();
        assert!(wrapper.lc.is_running());

        wrapper.stop(CancellationToken::new()).await.unwrap();
        assert_eq!(wrapper.status(), Status::Stopped);
    }

    #[tokio::test]
    async fn with_lifecycle_double_start_fails() {
        use crate::contracts::RunnableCapability;

        let wrapper = WithLifecycle::new(TestRunnable::new());
        let external = CancellationToken::new();

        wrapper.start(external.clone()).await.unwrap();
        assert!(wrapper.start(external).await.is_err());

        wrapper.stop(CancellationToken::new()).await.unwrap();
    }

    #[tokio::test]
    async fn with_lifecycle_concurrent_stop_calls() {
        use crate::contracts::RunnableCapability;

        let wrapper = Arc::new(WithLifecycle::new(TestRunnable::new()));
        wrapper.start(CancellationToken::new()).await.unwrap();

        let (first, second) = (Arc::clone(&wrapper), Arc::clone(&wrapper));
        let (r1, r2) = tokio::join!(
            async move { first.stop(CancellationToken::new()).await },
            async move { second.stop(CancellationToken::new()).await },
        );

        assert!(r1.is_ok());
        assert!(r2.is_ok());
        assert_eq!(wrapper.status(), Status::Stopped);
    }

    #[tokio::test]
    async fn lifecycle_handles_panics_properly() {
        let lc = Lifecycle::new();

        // A job that panics as soon as it is polled.
        lc.start(|_cancel| async {
            panic!("test panic message");
        })
        .unwrap();

        // Give the task a moment to start and panic.
        tokio::time::sleep(Duration::from_millis(50)).await;

        // `stop` must neither hang nor fail; the exact reason depends on timing.
        let outcome = lc.stop(Duration::from_millis(1000)).await.unwrap();
        assert!(matches!(
            outcome,
            StopReason::Finished | StopReason::Cancelled | StopReason::Timeout
        ));
        assert_eq!(lc.status(), Status::Stopped);
    }

    #[tokio::test]
    async fn lifecycle_task_naming_and_logging() {
        let lc = Lifecycle::new();
        lc.start(idle_until_cancelled).unwrap();

        assert!(lc.is_running());

        let outcome = lc.stop(Duration::from_millis(100)).await.unwrap();
        assert!(matches!(
            outcome,
            StopReason::Cancelled | StopReason::Finished
        ));
        assert_eq!(lc.status(), Status::Stopped);
    }

    #[tokio::test]
    async fn lifecycle_join_handles_all_tasks() {
        let lc = Arc::new(Lifecycle::new());

        // Only one task at a time is supported; this one sleeps before honouring cancellation.
        lc.start(|cancel| async move {
            tokio::time::sleep(Duration::from_millis(10)).await;
            cancel.cancelled().await;
            Ok(())
        })
        .unwrap();

        // `stop` has to wait for the task to get through its initial sleep.
        let began = std::time::Instant::now();
        let outcome = lc.stop(Duration::from_millis(200)).await.unwrap();
        let waited = began.elapsed();

        assert!(waited >= Duration::from_millis(10));
        assert!(matches!(
            outcome,
            StopReason::Cancelled | StopReason::Finished
        ));
        assert_eq!(lc.status(), Status::Stopped);
    }
}