Skip to main content

modkit/
lifecycle.rs

1use async_trait::async_trait;
2use parking_lot::Mutex;
3use std::sync::{
4    Arc,
5    atomic::{AtomicBool, AtomicU8, Ordering},
6};
7use std::time::Duration;
8use tokio::sync::{Notify, oneshot};
9use tokio::task::JoinHandle;
10use tokio_util::sync::CancellationToken;
11
12// ----- Results & aliases -----------------------------------------------------
13
14/// Public result for lifecycle-level operations.
15type LcResult<T = ()> = std::result::Result<T, LifecycleError>;
16
17/// Result returned by user/background tasks.
18type TaskResult<T = ()> = anyhow::Result<T>;
19
20/// Type alias for ready function pointer to reduce complexity.
21type ReadyFn<T> = fn(
22    Arc<T>,
23    CancellationToken,
24    ReadySignal,
25)
26    -> std::pin::Pin<Box<dyn std::future::Future<Output = TaskResult<()>> + Send>>;
27
28// ----- Status model ----------------------------------------------------------
29
/// Terminal/transition states for a background job.
///
/// The explicit discriminants are the compact encoding kept in the lifecycle's `AtomicU8`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum Status {
    /// Nothing is running: the initial state, and the state after the task completes or
    /// `stop()` returns.
    Stopped = 0,
    /// A start was accepted; ready-aware starts stay here until readiness is signalled.
    Starting = 1,
    /// The task is running.
    Running = 2,
    /// `stop()` is in progress.
    Stopping = 3,
}

impl Status {
    /// Encode as the `u8` stored in the atomic status cell.
    #[inline]
    #[must_use]
    pub const fn as_u8(self) -> u8 {
        self as u8
    }

    /// Decode a value produced by [`Status::as_u8`].
    ///
    /// Any unrecognised value decodes to [`Status::Stopped`].
    #[inline]
    #[must_use]
    pub const fn from_u8(x: u8) -> Self {
        match x {
            1 => Self::Starting,
            2 => Self::Running,
            3 => Self::Stopping,
            // 0 and every out-of-range value.
            _ => Self::Stopped,
        }
    }
}
62
/// Why a background task ended, as reported by [`Lifecycle::stop`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StopReason {
    /// Nothing was running, or the task ended without `stop()` having requested cancellation.
    Finished,
    /// `stop()` requested cancellation and the task then ended within the timeout.
    Cancelled,
    /// The task did not end within the `stop()` timeout (it is aborted if still running).
    Timeout,
}
70
71// ----- Ready signal ----------------------------------------------------------
72
73/// Ready signal used by `start_with_ready*` to flip Starting -> Running.
74pub struct ReadySignal(oneshot::Sender<()>);
75
76impl ReadySignal {
77    #[inline]
78    pub fn notify(self) {
79        if self.0.send(()).is_err() {
80            tracing::debug!("ReadySignal::notify: receiver already dropped");
81        }
82    }
83    /// Construct a `ReadySignal` from a oneshot sender (used by macro-generated shims).
84    #[inline]
85    #[must_use]
86    pub fn from_sender(sender: tokio::sync::oneshot::Sender<()>) -> Self {
87        ReadySignal(sender)
88    }
89}
90
91// ----- Runnable --------------------------------------------------------------
92
93/// Trait for modules that can run a long-running task.
94/// Note: take `self` by `Arc` to make the spawned future `'static` and `Send`.
95#[async_trait]
96pub trait Runnable: Send + Sync + 'static {
97    /// Long-running loop. Must return when `cancel` is cancelled.
98    async fn run(self: Arc<Self>, cancel: CancellationToken) -> TaskResult<()>;
99}
100
101// ----- Errors ----------------------------------------------------------------
102
103/// Library-level error for lifecycle operations.
104#[derive(Debug, thiserror::Error)]
105pub enum LifecycleError {
106    #[error("already started")]
107    AlreadyStarted,
108}
109
110// ----- Lifecycle -------------------------------------------------------------
111
112/// Lifecycle controller for managing background tasks.
113///
114/// Concurrency notes:
115/// - State is tracked with atomics and `Notify`.
116/// - `handle` / `cancel` are protected by `Mutex`, and their locking scope is kept minimal.
117/// - All public start methods are thin wrappers around `start_core`.
118pub struct Lifecycle {
119    name: &'static str,
120    status: Arc<AtomicU8>,
121    handle: Mutex<Option<JoinHandle<()>>>,
122    cancel: Mutex<Option<CancellationToken>>,
123    /// `true` once the background task has fully finished.
124    finished: Arc<AtomicBool>,
125    /// Set to `true` when `stop()` requested cancellation.
126    was_cancelled: Arc<AtomicBool>,
127    /// Notifies all waiters when the task finishes.
128    finished_notify: Arc<Notify>,
129}
130
131impl Lifecycle {
132    #[must_use]
133    pub fn new_named(name: &'static str) -> Self {
134        Self {
135            name,
136            status: Arc::new(AtomicU8::new(Status::Stopped.as_u8())),
137            handle: Mutex::new(None),
138            cancel: Mutex::new(None),
139            finished: Arc::new(AtomicBool::new(false)),
140            was_cancelled: Arc::new(AtomicBool::new(false)),
141            finished_notify: Arc::new(Notify::new()),
142        }
143    }
144
145    #[must_use]
146    pub fn new() -> Self {
147        Self::new_named("lifecycle")
148    }
149
150    #[inline]
151    pub fn name(&self) -> &'static str {
152        self.name
153    }
154
155    // --- small helpers for atomics (keeps Ordering unified and code concise) ---
156
157    #[inline]
158    fn load_status(&self) -> Status {
159        Status::from_u8(self.status.load(Ordering::Acquire))
160    }
161
162    #[inline]
163    fn store_status(&self, s: Status) {
164        self.status.store(s.as_u8(), Ordering::Release);
165    }
166
167    // --- public start APIs delegate to start_core --------------------------------
168
169    /// Spawn the job using `make(cancel)`.
170    ///
171    /// The future is constructed inside the task to avoid leaving the lifecycle in `Starting`
172    /// if `make` panics.
173    ///
174    /// # Errors
175    /// Returns `LcError` if the lifecycle is not in a startable state.
176    #[tracing::instrument(skip(self, make), level = "debug")]
177    pub fn start<F, Fut>(&self, make: F) -> LcResult
178    where
179        F: FnOnce(CancellationToken) -> Fut + Send + 'static,
180        Fut: std::future::Future<Output = TaskResult<()>> + Send + 'static,
181    {
182        self.start_core(CancellationToken::new(), move |tok, _| make(tok), false)
183    }
184
185    /// Spawn the job using a provided `CancellationToken` and `make(cancel)`.
186    ///
187    /// # Errors
188    /// Returns `LcError` if the lifecycle is not in a startable state.
189    #[tracing::instrument(skip(self, make, token), level = "debug")]
190    pub fn start_with_token<F, Fut>(&self, token: CancellationToken, make: F) -> LcResult
191    where
192        F: FnOnce(CancellationToken) -> Fut + Send + 'static,
193        Fut: std::future::Future<Output = TaskResult<()>> + Send + 'static,
194    {
195        self.start_core(token, move |tok, _| make(tok), false)
196    }
197
198    /// Spawn the job using `make(cancel, ready)`. Status becomes `Running` only after `ready.notify()`.
199    ///
200    /// # Errors
201    /// Returns `LcError` if the lifecycle is not in a startable state.
202    #[tracing::instrument(skip(self, make), level = "debug")]
203    pub fn start_with_ready<F, Fut>(&self, make: F) -> LcResult
204    where
205        F: FnOnce(CancellationToken, ReadySignal) -> Fut + Send + 'static,
206        Fut: std::future::Future<Output = TaskResult<()>> + Send + 'static,
207    {
208        self.start_core(
209            CancellationToken::new(),
210            move |tok, rdy| async move {
211                let Some(rdy) = rdy else {
212                    return Err(anyhow::anyhow!("ReadySignal must be present"));
213                };
214                make(tok, rdy).await
215            },
216            true,
217        )
218    }
219
220    /// Ready-aware start variant that uses a provided `CancellationToken`.
221    ///
222    /// # Errors
223    /// Returns `LcError` if the lifecycle is not in a startable state.
224    #[tracing::instrument(skip(self, make, token), level = "debug")]
225    pub fn start_with_ready_and_token<F, Fut>(&self, token: CancellationToken, make: F) -> LcResult
226    where
227        F: FnOnce(CancellationToken, ReadySignal) -> Fut + Send + 'static,
228        Fut: std::future::Future<Output = TaskResult<()>> + Send + 'static,
229    {
230        self.start_core(
231            token,
232            move |tok, rdy| async move {
233                let Some(rdy) = rdy else {
234                    return Err(anyhow::anyhow!("ReadySignal must be present"));
235                };
236                make(tok, rdy).await
237            },
238            true,
239        )
240    }
241
242    /// Unified start core
243    ///
244    /// `ready_mode = true`   => we expect a `ReadySignal` to flip `Starting -> Running` (upon notify).
245    /// `ready_mode = false`  => we flip to `Running` immediately after spawn.
246    fn start_core<F, Fut>(&self, token: CancellationToken, make: F, ready_mode: bool) -> LcResult
247    where
248        F: Send + 'static + FnOnce(CancellationToken, Option<ReadySignal>) -> Fut,
249        Fut: std::future::Future<Output = TaskResult<()>> + Send + 'static,
250    {
251        // Stopped -> Starting (via CAS)
252        let cas_ok = self
253            .status
254            .compare_exchange(
255                Status::Stopped.as_u8(),
256                Status::Starting.as_u8(),
257                Ordering::AcqRel,
258                Ordering::Acquire,
259            )
260            .is_ok();
261        if !cas_ok {
262            return Err(LifecycleError::AlreadyStarted);
263        }
264
265        self.finished.store(false, Ordering::Release);
266        self.was_cancelled.store(false, Ordering::Release);
267
268        // store cancellation token (bounded lock scope)
269        {
270            let mut c = self.cancel.lock();
271            *c = Some(token.clone());
272        }
273
274        // In ready mode, we wait for `ready.notify()` to flip to Running.
275        // Otherwise, we mark Running immediately.
276        let (ready_tx, ready_rx) = oneshot::channel::<()>();
277        if ready_mode {
278            let status_on_ready = self.status.clone();
279            tokio::spawn(async move {
280                if ready_rx.await.is_ok() {
281                    _ = status_on_ready.compare_exchange(
282                        Status::Starting.as_u8(),
283                        Status::Running.as_u8(),
284                        Ordering::AcqRel,
285                        Ordering::Acquire,
286                    );
287                    tracing::debug!("lifecycle status -> running (ready)");
288                } else {
289                    // Sender dropped: task didn't signal readiness; we will remain in Starting
290                    // until finish. This is usually a bug or early-drop scenario.
291                    tracing::debug!("ready signal dropped; staying in Starting until finish");
292                }
293            });
294        } else {
295            self.store_status(Status::Running);
296            tracing::debug!("lifecycle status -> running");
297        }
298
299        let finished_flag = self.finished.clone();
300        let finished_notify = self.finished_notify.clone();
301        let status_on_finish = self.status.clone();
302
303        // Spawn the actual task with descriptive logging
304        let module_name = self.name;
305        let task_id = format!("{module_name}-{self:p}");
306        let handle = tokio::spawn(async move {
307            tracing::debug!(task_id = %task_id, module = %module_name, "lifecycle task starting");
308            let res = make(token, ready_mode.then(|| ReadySignal(ready_tx))).await;
309            if let Err(e) = res {
310                tracing::error!(error=%e, task_id=%task_id, module = %module_name, "lifecycle task error");
311            }
312            finished_flag.store(true, Ordering::Release);
313            finished_notify.notify_waiters();
314            status_on_finish.store(Status::Stopped.as_u8(), Ordering::Release);
315            tracing::debug!(task_id=%task_id, module = %module_name, "lifecycle task finished");
316        });
317
318        // store handle (bounded lock scope)
319        {
320            let mut h = self.handle.lock();
321            *h = Some(handle);
322        }
323
324        Ok(())
325    }
326
327    /// Request graceful shutdown and wait up to `timeout`.
328    ///
329    /// # Errors
330    /// Returns `LcError` if the stop operation fails.
331    #[tracing::instrument(skip(self, timeout), level = "debug")]
332    pub async fn stop(&self, timeout: Duration) -> LcResult<StopReason> {
333        let module_name = self.name;
334        let task_id = format!("{module_name}-{self:p}");
335        let st = self.load_status();
336        if !matches!(st, Status::Starting | Status::Running | Status::Stopping) {
337            // Not running => already finished.
338            return Ok(StopReason::Finished);
339        }
340
341        self.store_status(Status::Stopping);
342
343        // Request cancellation only once (idempotent if multiple callers race here).
344        if let Some(tok) = { self.cancel.lock().take() } {
345            self.was_cancelled.store(true, Ordering::Release);
346            tok.cancel();
347        }
348
349        // Waiter that works for all callers, even after the task already finished.
350        let finished_flag = self.finished.clone();
351        let notify = self.finished_notify.clone();
352        let finished_wait = async move {
353            if finished_flag.load(Ordering::Acquire) {
354                return;
355            }
356            notify.notified().await;
357        };
358
359        let reason = tokio::select! {
360            () = finished_wait => {
361                if self.was_cancelled.load(Ordering::Acquire) {
362                    StopReason::Cancelled
363                } else {
364                    StopReason::Finished
365                }
366            }
367            () = tokio::time::sleep(timeout) => StopReason::Timeout,
368        };
369
370        // Join and ensure we notify waiters even if the task was aborted/panicked.
371        let handle_opt = { self.handle.lock().take() };
372        if let Some(handle) = handle_opt {
373            if matches!(reason, StopReason::Timeout) && !handle.is_finished() {
374                tracing::warn!("lifecycle stop timed out; aborting task");
375                handle.abort();
376            }
377
378            match handle.await {
379                Ok(()) => {
380                    tracing::debug!(task_id = %task_id, module = %module_name, "lifecycle task completed successfully");
381                }
382                Err(e) if e.is_cancelled() => {
383                    tracing::debug!(task_id = %task_id, module = %module_name, "lifecycle task was cancelled/aborted");
384                }
385                Err(e) if e.is_panic() => {
386                    // Extract panic information if possible
387                    match e.try_into_panic() {
388                        Ok(panic_payload) => {
389                            let panic_msg = panic_payload
390                                .downcast_ref::<&str>()
391                                .copied()
392                                .map(str::to_owned)
393                                .or_else(|| panic_payload.downcast_ref::<String>().cloned())
394                                .unwrap_or_else(|| "unknown panic".to_owned());
395
396                            tracing::error!(
397                                task_id = %task_id,
398                                module = %module_name,
399                                panic_message = %panic_msg,
400                                "lifecycle task panicked - this indicates a serious bug"
401                            );
402                        }
403                        _ => {
404                            tracing::error!(
405                                task_id = %task_id,
406                                module = %module_name,
407                                "lifecycle task panicked (could not extract panic message)"
408                            );
409                        }
410                    }
411                }
412                Err(e) => {
413                    tracing::warn!(task_id = %task_id, module = %module_name, error = %e, "lifecycle task join error");
414                }
415            }
416
417            self.finished.store(true, Ordering::Release);
418            self.finished_notify.notify_waiters();
419        }
420
421        self.store_status(Status::Stopped);
422        tracing::info!(?reason, "lifecycle stopped");
423        Ok(reason)
424    }
425
426    /// Current status.
427    #[inline]
428    #[must_use]
429    pub fn status(&self) -> Status {
430        self.load_status()
431    }
432
433    /// Whether it is in `Starting` or `Running`.
434    #[inline]
435    pub fn is_running(&self) -> bool {
436        matches!(self.status(), Status::Starting | Status::Running)
437    }
438
439    /// Best-effort "try start" that swallows the error and returns bool.
440    #[inline]
441    #[must_use]
442    pub fn try_start<F, Fut>(&self, make: F) -> bool
443    where
444        F: FnOnce(CancellationToken) -> Fut + Send + 'static,
445        Fut: std::future::Future<Output = TaskResult<()>> + Send + 'static,
446    {
447        self.start(make).is_ok()
448    }
449
450    /// Wait until the task is fully stopped.
451    pub async fn wait_stopped(&self) {
452        if self.finished.load(Ordering::Acquire) {
453            return;
454        }
455        self.finished_notify.notified().await;
456    }
457}
458
459impl Default for Lifecycle {
460    fn default() -> Self {
461        Self::new()
462    }
463}
464
465impl Drop for Lifecycle {
466    /// Best-effort cleanup to avoid orphaned background tasks if caller forgets to call `stop()`.
467    fn drop(&mut self) {
468        if let Some(tok) = self.cancel.get_mut().take() {
469            tok.cancel();
470        }
471        if let Some(handle) = self.handle.get_mut().take() {
472            handle.abort();
473        }
474    }
475}
476
477// ----- WithLifecycle wrapper -------------------------------------------------
478
479/// Wrapper that implements `StatefulModule` for any `T: Runnable`.
480#[must_use]
481pub struct WithLifecycle<T: Runnable> {
482    inner: Arc<T>,
483    lc: Arc<Lifecycle>,
484    pub(crate) stop_timeout: Duration,
485    // lifecycle start mode configuration
486    await_ready: bool,
487    has_ready_handler: bool,
488    run_ready_fn: Option<ReadyFn<T>>,
489}
490
491impl<T: Runnable> WithLifecycle<T> {
492    pub fn new(inner: T) -> Self {
493        Self {
494            inner: Arc::new(inner),
495            lc: Arc::new(Lifecycle::new_named(std::any::type_name::<T>())),
496            stop_timeout: Duration::from_secs(30),
497            await_ready: false,
498            has_ready_handler: false,
499            run_ready_fn: None,
500        }
501    }
502
503    pub fn from_arc(inner: Arc<T>) -> Self {
504        Self {
505            inner,
506            lc: Arc::new(Lifecycle::new_named(std::any::type_name::<T>())),
507            stop_timeout: Duration::from_secs(30),
508            await_ready: false,
509            has_ready_handler: false,
510            run_ready_fn: None,
511        }
512    }
513
514    pub fn new_with_name(inner: T, name: &'static str) -> Self {
515        Self {
516            inner: Arc::new(inner),
517            lc: Arc::new(Lifecycle::new_named(name)),
518            stop_timeout: Duration::from_secs(30),
519            await_ready: false,
520            has_ready_handler: false,
521            run_ready_fn: None,
522        }
523    }
524
525    pub fn from_arc_with_name(inner: Arc<T>, name: &'static str) -> Self {
526        Self {
527            inner,
528            lc: Arc::new(Lifecycle::new_named(name)),
529            stop_timeout: Duration::from_secs(30),
530            await_ready: false,
531            has_ready_handler: false,
532            run_ready_fn: None,
533        }
534    }
535
536    /// Set a custom stop timeout for graceful lifecycle shutdown.
537    ///
538    /// This is how long `Lifecycle::stop()` will wait for the task to finish
539    /// before aborting it.
540    ///
541    /// # Relationship with `HostRuntime::shutdown_deadline`
542    ///
543    /// When running under `HostRuntime`, this `stop_timeout` races against the
544    /// runtime's `shutdown_deadline` (both default to 30s). To ensure deterministic behavior:
545    ///
546    /// - `stop_timeout` should be **less than** `shutdown_deadline`
547    /// - This allows the lifecycle's internal timeout to trigger first for graceful cleanup
548    /// - The runtime's `deadline_token` then acts as a hard backstop
549    ///
550    /// Example: `stop_timeout = 25s`, `shutdown_deadline = 30s`
551    pub fn with_stop_timeout(mut self, d: Duration) -> Self {
552        self.stop_timeout = d;
553        self
554    }
555
556    #[inline]
557    #[must_use]
558    pub fn status(&self) -> Status {
559        self.lc.status()
560    }
561
562    #[inline]
563    #[must_use]
564    pub fn inner(&self) -> &T {
565        self.inner.as_ref()
566    }
567
568    /// Sometimes callers need to hold an `Arc` to the inner runnable.
569    #[inline]
570    #[must_use]
571    pub fn inner_arc(&self) -> Arc<T> {
572        self.inner.clone()
573    }
574
575    /// Configure readiness behavior produced by proc-macros (`#[modkit::module(..., lifecycle(...))]`).
576    pub fn with_ready_mode(
577        mut self,
578        await_ready: bool,
579        has_ready_handler: bool,
580        run_ready_fn: Option<ReadyFn<T>>,
581    ) -> Self {
582        self.await_ready = await_ready;
583        self.has_ready_handler = has_ready_handler;
584        self.run_ready_fn = run_ready_fn;
585        self
586    }
587}
588
589impl<T: Runnable + Default> Default for WithLifecycle<T> {
590    fn default() -> Self {
591        Self::new(T::default())
592    }
593}
594
595#[async_trait]
596impl<T: Runnable> crate::contracts::RunnableCapability for WithLifecycle<T> {
597    #[tracing::instrument(skip(self, external_cancel), level = "debug")]
598    async fn start(&self, external_cancel: CancellationToken) -> TaskResult<()> {
599        let inner = self.inner.clone();
600        let composed = external_cancel.child_token();
601
602        if !self.await_ready {
603            self.lc
604                .start_with_token(composed, move |cancel| inner.run(cancel))
605                .map_err(anyhow::Error::from)
606        } else if self.has_ready_handler {
607            let f = self.run_ready_fn.ok_or_else(|| {
608                anyhow::anyhow!("run_ready_fn must be set when has_ready_handler")
609            })?;
610            self.lc
611                .start_with_ready_and_token(composed, move |cancel, ready| f(inner, cancel, ready))
612                .map_err(anyhow::Error::from)
613        } else {
614            self.lc
615                .start_with_ready_and_token(composed, move |cancel, ready| async move {
616                    // Auto-notify readiness and continue with normal run()
617                    ready.notify();
618                    inner.run(cancel).await
619                })
620                .map_err(anyhow::Error::from)
621        }
622    }
623
624    /// Stop the lifecycle-managed task.
625    ///
626    /// Implements the two-phase shutdown contract:
627    /// 1. Attempts graceful stop using `self.stop_timeout` (default 30s)
628    /// 2. If `deadline_token` is cancelled before graceful stop completes,
629    ///    immediately aborts with zero timeout
630    ///
631    /// The `deadline_token` is a fresh token from the runtime (not already cancelled),
632    /// allowing real graceful shutdown to occur.
633    #[tracing::instrument(skip(self, deadline_token), level = "debug")]
634    async fn stop(&self, deadline_token: CancellationToken) -> TaskResult<()> {
635        tokio::select! {
636            res = self.lc.stop(self.stop_timeout) => {
637                _ = res.map_err(anyhow::Error::from)?;
638                Ok(())
639            }
640            () = deadline_token.cancelled() => {
641                // Hard-stop deadline reached, abort immediately
642                tracing::debug!("Hard-stop deadline reached, aborting lifecycle");
643                _ = self.lc.stop(Duration::from_millis(0)).await?;
644                Ok(())
645            }
646        }
647    }
648}
649
650impl<T: Runnable> Drop for WithLifecycle<T> {
651    /// Best-effort, but only if we're the last owner of `lc` to avoid aborting someone else's task.
652    fn drop(&mut self) {
653        if Arc::strong_count(&self.lc) == 1 {
654            if let Some(tok) = self.lc.cancel.lock().as_ref() {
655                tok.cancel();
656            }
657            if let Some(handle) = self.lc.handle.lock().as_ref() {
658                handle.abort();
659            }
660        }
661    }
662}
663
664// ----- Tests -----------------------------------------------------------------
665
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU32, Ordering as AOrd};
    use tokio::time::{Duration, sleep};

    /// Runnable that bumps a counter on a 10 ms tick until cancelled.
    struct TestRunnable {
        counter: AtomicU32,
    }

    impl TestRunnable {
        fn new() -> Self {
            Self {
                counter: AtomicU32::new(0),
            }
        }

        fn count(&self) -> u32 {
            self.counter.load(AOrd::Relaxed)
        }
    }

    #[async_trait::async_trait]
    impl Runnable for TestRunnable {
        async fn run(self: Arc<Self>, cancel: CancellationToken) -> TaskResult<()> {
            let mut ticker = tokio::time::interval(Duration::from_millis(10));
            loop {
                tokio::select! {
                    _ = ticker.tick() => {
                        self.counter.fetch_add(1, AOrd::Relaxed);
                    }
                    () = cancel.cancelled() => break,
                }
            }
            Ok(())
        }
    }

    /// Task body that idles until its token is cancelled.
    async fn idle_until_cancelled(cancel: CancellationToken) -> TaskResult<()> {
        cancel.cancelled().await;
        Ok(())
    }

    #[tokio::test]
    async fn lifecycle_basic() {
        let lc = Arc::new(Lifecycle::new());
        assert_eq!(lc.status(), Status::Stopped);

        assert!(lc.start(idle_until_cancelled).is_ok());

        assert!(lc.stop(Duration::from_millis(100)).await.is_ok());
        assert_eq!(lc.status(), Status::Stopped);
    }

    #[tokio::test]
    async fn with_lifecycle_wrapper_basics() {
        let wrapper = WithLifecycle::new(TestRunnable::new());

        assert_eq!(wrapper.status(), Status::Stopped);
        assert_eq!(wrapper.inner().count(), 0);

        let wrapper = wrapper.with_stop_timeout(Duration::from_secs(60));
        assert_eq!(wrapper.stop_timeout.as_secs(), 60);
    }

    #[tokio::test]
    async fn start_sets_running_immediately() {
        let lc = Lifecycle::new();
        lc.start(idle_until_cancelled).unwrap();

        assert!(matches!(lc.status(), Status::Running | Status::Starting));

        lc.stop(Duration::from_millis(50)).await.unwrap();
        assert_eq!(lc.status(), Status::Stopped);
    }

    #[tokio::test]
    async fn start_with_ready_transitions_and_stop() {
        let lc = Lifecycle::new();

        // External gate controlling when the task reports readiness.
        let (gate_tx, gate_rx) = oneshot::channel::<()>();
        lc.start_with_ready(move |cancel, ready| async move {
            _ = gate_rx.await;
            ready.notify();
            cancel.cancelled().await;
            Ok(())
        })
        .unwrap();

        assert_eq!(lc.status(), Status::Starting);

        _ = gate_tx.send(());
        sleep(Duration::from_millis(10)).await;
        assert_eq!(lc.status(), Status::Running);

        let reason = lc.stop(Duration::from_millis(100)).await.unwrap();
        assert!(matches!(reason, StopReason::Cancelled | StopReason::Finished));
        assert_eq!(lc.status(), Status::Stopped);
    }

    #[tokio::test]
    async fn stop_while_starting_before_ready() {
        let lc = Lifecycle::new();

        // The ready signal is never used, so the lifecycle stays in `Starting`.
        lc.start_with_ready(move |cancel, _ready| async move {
            cancel.cancelled().await;
            Ok(())
        })
        .unwrap();

        assert_eq!(lc.status(), Status::Starting);

        let reason = lc.stop(Duration::from_millis(100)).await.unwrap();
        assert!(matches!(reason, StopReason::Cancelled | StopReason::Finished));
        assert_eq!(lc.status(), Status::Stopped);
    }

    #[tokio::test]
    async fn timeout_path_aborts_and_notifies() {
        let lc = Lifecycle::new();

        // Ignores cancellation entirely, forcing `stop()` down the timeout/abort path.
        lc.start(|_cancel| async move {
            std::future::pending::<()>().await;
            Ok::<_, anyhow::Error>(())
        })
        .unwrap();

        let reason = lc.stop(Duration::from_millis(30)).await.unwrap();
        assert_eq!(reason, StopReason::Timeout);
        assert_eq!(lc.status(), Status::Stopped);
    }

    #[tokio::test]
    async fn try_start_and_second_start_fails() {
        let lc = Lifecycle::new();

        assert!(lc.try_start(idle_until_cancelled));

        let err = lc.start(|_c| async { Ok(()) }).unwrap_err();
        assert!(matches!(err, LifecycleError::AlreadyStarted));

        lc.stop(Duration::from_millis(80)).await.unwrap();
        assert_eq!(lc.status(), Status::Stopped);
    }

    #[tokio::test]
    async fn stop_is_idempotent_and_safe_concurrent() {
        let lc = Arc::new(Lifecycle::new());
        lc.start(idle_until_cancelled).unwrap();

        let (first, second) = (Arc::clone(&lc), Arc::clone(&lc));
        let (r1, r2) = tokio::join!(
            async move { first.stop(Duration::from_millis(80)).await },
            async move { second.stop(Duration::from_millis(80)).await },
        );

        for reason in [r1.unwrap(), r2.unwrap()] {
            assert!(matches!(
                reason,
                StopReason::Finished | StopReason::Cancelled | StopReason::Timeout
            ));
        }
        assert_eq!(lc.status(), Status::Stopped);
    }

    #[tokio::test]
    async fn stateful_wrapper_start_stop_roundtrip() {
        use crate::contracts::RunnableCapability;

        let wrapper = WithLifecycle::new(TestRunnable::new());
        assert_eq!(wrapper.status(), Status::Stopped);

        wrapper.start(CancellationToken::new()).await.unwrap();
        assert!(wrapper.lc.is_running());

        wrapper.stop(CancellationToken::new()).await.unwrap();
        assert_eq!(wrapper.status(), Status::Stopped);
    }

    #[tokio::test]
    async fn with_lifecycle_double_start_fails() {
        use crate::contracts::RunnableCapability;

        let wrapper = WithLifecycle::new(TestRunnable::new());
        let external = CancellationToken::new();

        wrapper.start(external.clone()).await.unwrap();
        assert!(wrapper.start(external).await.is_err());

        wrapper.stop(CancellationToken::new()).await.unwrap();
    }

    #[tokio::test]
    async fn with_lifecycle_concurrent_stop_calls() {
        use crate::contracts::RunnableCapability;

        let wrapper = Arc::new(WithLifecycle::new(TestRunnable::new()));
        wrapper.start(CancellationToken::new()).await.unwrap();

        let (first, second) = (Arc::clone(&wrapper), Arc::clone(&wrapper));
        let (r1, r2) = tokio::join!(
            async move { first.stop(CancellationToken::new()).await },
            async move { second.stop(CancellationToken::new()).await },
        );

        assert!(r1.is_ok());
        assert!(r2.is_ok());
        assert_eq!(wrapper.status(), Status::Stopped);
    }

    #[tokio::test]
    async fn lifecycle_handles_panics_properly() {
        let lc = Lifecycle::new();

        // Start a task that will panic
        lc.start(|_cancel| async {
            panic!("test panic message");
        })
        .unwrap();

        // Give the task a moment to start and panic
        tokio::time::sleep(Duration::from_millis(50)).await;

        // Stop must neither hang nor fail; the exact reason depends on timing.
        let reason = lc.stop(Duration::from_secs(1)).await.unwrap();
        assert!(matches!(
            reason,
            StopReason::Finished | StopReason::Cancelled | StopReason::Timeout
        ));
        assert_eq!(lc.status(), Status::Stopped);
    }

    #[tokio::test]
    async fn lifecycle_task_naming_and_logging() {
        let lc = Lifecycle::new();

        lc.start(idle_until_cancelled).unwrap();
        assert!(lc.is_running());

        let reason = lc.stop(Duration::from_millis(100)).await.unwrap();
        assert!(matches!(reason, StopReason::Cancelled | StopReason::Finished));
        assert_eq!(lc.status(), Status::Stopped);
    }

    #[tokio::test]
    async fn lifecycle_join_handles_all_tasks() {
        let lc = Arc::new(Lifecycle::new());

        // The task sleeps before it starts listening for cancellation, so stop() must wait.
        lc.start(|cancel| async move {
            tokio::time::sleep(Duration::from_millis(10)).await;
            cancel.cancelled().await;
            Ok(())
        })
        .unwrap();

        let started_at = std::time::Instant::now();
        let reason = lc.stop(Duration::from_millis(200)).await.unwrap();
        let waited = started_at.elapsed();

        assert!(waited >= Duration::from_millis(10));
        assert!(matches!(reason, StopReason::Cancelled | StopReason::Finished));
        assert_eq!(lc.status(), Status::Stopped);
    }
}