Skip to main content

modkit/
lifecycle.rs

1use async_trait::async_trait;
2use parking_lot::Mutex;
3use std::sync::{
4    Arc,
5    atomic::{AtomicBool, AtomicU8, Ordering},
6};
7use std::time::Duration;
8use tokio::sync::{Notify, oneshot};
9use tokio::task::JoinHandle;
10use tokio_util::sync::CancellationToken;
11
12// ----- Results & aliases -----------------------------------------------------
13
14/// Public result for lifecycle-level operations.
15type LcResult<T = ()> = std::result::Result<T, LifecycleError>;
16
17/// Result returned by user/background tasks.
18type TaskResult<T = ()> = anyhow::Result<T>;
19
20/// Type alias for ready function pointer to reduce complexity.
21type ReadyFn<T> = fn(
22    Arc<T>,
23    CancellationToken,
24    ReadySignal,
25)
26    -> std::pin::Pin<Box<dyn std::future::Future<Output = TaskResult<()>> + Send>>;
27
28// ----- Status model ----------------------------------------------------------
29
/// Lifecycle state of a background job.
///
/// The discriminants are the compact `u8` encoding used by [`Status::as_u8`] /
/// [`Status::from_u8`] so the state can live in an atomic.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum Status {
    /// No task is active.
    Stopped = 0,
    /// A task has been spawned but is not yet considered running.
    Starting = 1,
    /// The task is running.
    Running = 2,
    /// A stop has been requested and is in progress.
    Stopping = 3,
}

impl Status {
    /// Encode this status as its `u8` discriminant.
    #[inline]
    #[must_use]
    pub const fn as_u8(self) -> u8 {
        self as u8
    }

    /// Decode a value produced by [`Status::as_u8`].
    ///
    /// Any value that is not a known discriminant decodes to [`Status::Stopped`].
    #[inline]
    #[must_use]
    pub const fn from_u8(x: u8) -> Self {
        if x == Self::Starting.as_u8() {
            Self::Starting
        } else if x == Self::Running.as_u8() {
            Self::Running
        } else if x == Self::Stopping.as_u8() {
            Self::Stopping
        } else {
            Self::Stopped
        }
    }
}
62
/// Why a background task ended, as reported by `Lifecycle::stop`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum StopReason {
    /// The task ended on its own (or nothing was running).
    Finished,
    /// The task exited after `stop()` requested cancellation.
    Cancelled,
    /// The task did not exit before the stop timeout elapsed.
    Timeout,
}
70
71// ----- Ready signal ----------------------------------------------------------
72
73/// Ready signal used by `start_with_ready*` to flip Starting -> Running.
74pub struct ReadySignal(oneshot::Sender<()>);
75
76impl ReadySignal {
77    #[inline]
78    pub fn notify(self) {
79        if self.0.send(()).is_err() {
80            tracing::debug!("ReadySignal::notify: receiver already dropped");
81        }
82    }
83    /// Construct a `ReadySignal` from a oneshot sender (used by macro-generated shims).
84    #[inline]
85    #[must_use]
86    pub fn from_sender(sender: tokio::sync::oneshot::Sender<()>) -> Self {
87        ReadySignal(sender)
88    }
89}
90
91// ----- Runnable --------------------------------------------------------------
92
/// Trait for modules that can run a long-running task.
/// Note: take `self` by `Arc` to make the spawned future `'static` and `Send`.
///
/// When driven by `WithLifecycle`, `run` is spawned on a background task and `cancel`
/// is the token that the lifecycle's `stop()` cancels. A `run` that ignores `cancel`
/// will not exit gracefully; `stop()` then aborts it once its timeout elapses.
#[async_trait]
pub trait Runnable: Send + Sync + 'static {
    /// Long-running loop. Must return when `cancel` is cancelled.
    ///
    /// An `Err` return is logged by the lifecycle task; it does not propagate to `stop()`.
    async fn run(self: Arc<Self>, cancel: CancellationToken) -> TaskResult<()>;
}
100
101// ----- Errors ----------------------------------------------------------------
102
/// Library-level error for lifecycle operations.
#[derive(Debug, thiserror::Error)]
pub enum LifecycleError {
    /// Returned by the `start*` methods when the lifecycle is not `Stopped`
    /// (a task is still starting, running, or stopping).
    #[error("already started")]
    AlreadyStarted,
}
109
110// ----- Lifecycle -------------------------------------------------------------
111
/// Lifecycle controller for managing background tasks.
///
/// At most one task is tracked at a time: a start is rejected unless the status is `Stopped`.
///
/// Concurrency notes:
/// - State is tracked with atomics and `Notify`.
/// - `handle` / `cancel` are protected by `Mutex`, and their locking scope is kept minimal.
/// - All public start methods are thin wrappers around `start_core`.
pub struct Lifecycle {
    /// Human-readable name; used to build task ids and as the `module` log field.
    name: &'static str,
    /// Current [`Status`] encoded via [`Status::as_u8`]; shared with spawned tasks.
    status: Arc<AtomicU8>,
    /// Join handle of the spawned task; taken by `stop()` (to join/abort) and by `Drop`.
    handle: Mutex<Option<JoinHandle<()>>>,
    /// Cancellation token of the current task; taken and cancelled by `stop()` / `Drop`.
    cancel: Mutex<Option<CancellationToken>>,
    /// `true` once the background task has fully finished.
    finished: Arc<AtomicBool>,
    /// Set to `true` when `stop()` requested cancellation.
    was_cancelled: Arc<AtomicBool>,
    /// Notifies all waiters when the task finishes.
    finished_notify: Arc<Notify>,
}
130
131impl Lifecycle {
132    #[must_use]
133    pub fn new_named(name: &'static str) -> Self {
134        Self {
135            name,
136            status: Arc::new(AtomicU8::new(Status::Stopped.as_u8())),
137            handle: Mutex::new(None),
138            cancel: Mutex::new(None),
139            finished: Arc::new(AtomicBool::new(false)),
140            was_cancelled: Arc::new(AtomicBool::new(false)),
141            finished_notify: Arc::new(Notify::new()),
142        }
143    }
144
145    #[must_use]
146    pub fn new() -> Self {
147        Self::new_named("lifecycle")
148    }
149
150    #[inline]
151    pub fn name(&self) -> &'static str {
152        self.name
153    }
154
155    // --- small helpers for atomics (keeps Ordering unified and code concise) ---
156
157    #[inline]
158    fn load_status(&self) -> Status {
159        Status::from_u8(self.status.load(Ordering::Acquire))
160    }
161
162    #[inline]
163    fn store_status(&self, s: Status) {
164        self.status.store(s.as_u8(), Ordering::Release);
165    }
166
167    // --- public start APIs delegate to start_core --------------------------------
168
169    /// Spawn the job using `make(cancel)`.
170    ///
171    /// The future is constructed inside the task to avoid leaving the lifecycle in `Starting`
172    /// if `make` panics.
173    ///
174    /// # Errors
175    /// Returns `LcError` if the lifecycle is not in a startable state.
176    #[tracing::instrument(skip(self, make), level = "debug")]
177    pub fn start<F, Fut>(&self, make: F) -> LcResult
178    where
179        F: FnOnce(CancellationToken) -> Fut + Send + 'static,
180        Fut: std::future::Future<Output = TaskResult<()>> + Send + 'static,
181    {
182        self.start_core(CancellationToken::new(), move |tok, _| make(tok), false)
183    }
184
185    /// Spawn the job using a provided `CancellationToken` and `make(cancel)`.
186    ///
187    /// # Errors
188    /// Returns `LcError` if the lifecycle is not in a startable state.
189    #[tracing::instrument(skip(self, make, token), level = "debug")]
190    pub fn start_with_token<F, Fut>(&self, token: CancellationToken, make: F) -> LcResult
191    where
192        F: FnOnce(CancellationToken) -> Fut + Send + 'static,
193        Fut: std::future::Future<Output = TaskResult<()>> + Send + 'static,
194    {
195        self.start_core(token, move |tok, _| make(tok), false)
196    }
197
198    /// Spawn the job using `make(cancel, ready)`. Status becomes `Running` only after `ready.notify()`.
199    ///
200    /// # Errors
201    /// Returns `LcError` if the lifecycle is not in a startable state.
202    #[tracing::instrument(skip(self, make), level = "debug")]
203    pub fn start_with_ready<F, Fut>(&self, make: F) -> LcResult
204    where
205        F: FnOnce(CancellationToken, ReadySignal) -> Fut + Send + 'static,
206        Fut: std::future::Future<Output = TaskResult<()>> + Send + 'static,
207    {
208        self.start_core(
209            CancellationToken::new(),
210            move |tok, rdy| async move {
211                let Some(rdy) = rdy else {
212                    return Err(anyhow::anyhow!("ReadySignal must be present"));
213                };
214                make(tok, rdy).await
215            },
216            true,
217        )
218    }
219
220    /// Ready-aware start variant that uses a provided `CancellationToken`.
221    ///
222    /// # Errors
223    /// Returns `LcError` if the lifecycle is not in a startable state.
224    #[tracing::instrument(skip(self, make, token), level = "debug")]
225    pub fn start_with_ready_and_token<F, Fut>(&self, token: CancellationToken, make: F) -> LcResult
226    where
227        F: FnOnce(CancellationToken, ReadySignal) -> Fut + Send + 'static,
228        Fut: std::future::Future<Output = TaskResult<()>> + Send + 'static,
229    {
230        self.start_core(
231            token,
232            move |tok, rdy| async move {
233                let Some(rdy) = rdy else {
234                    return Err(anyhow::anyhow!("ReadySignal must be present"));
235                };
236                make(tok, rdy).await
237            },
238            true,
239        )
240    }
241
242    /// Unified start core
243    ///
244    /// `ready_mode = true`   => we expect a `ReadySignal` to flip `Starting -> Running` (upon notify).
245    /// `ready_mode = false`  => we flip to `Running` immediately after spawn.
246    fn start_core<F, Fut>(&self, token: CancellationToken, make: F, ready_mode: bool) -> LcResult
247    where
248        F: Send + 'static + FnOnce(CancellationToken, Option<ReadySignal>) -> Fut,
249        Fut: std::future::Future<Output = TaskResult<()>> + Send + 'static,
250    {
251        // Stopped -> Starting (via CAS)
252        let cas_ok = self
253            .status
254            .compare_exchange(
255                Status::Stopped.as_u8(),
256                Status::Starting.as_u8(),
257                Ordering::AcqRel,
258                Ordering::Acquire,
259            )
260            .is_ok();
261        if !cas_ok {
262            return Err(LifecycleError::AlreadyStarted);
263        }
264
265        self.finished.store(false, Ordering::Release);
266        self.was_cancelled.store(false, Ordering::Release);
267
268        // store cancellation token (bounded lock scope)
269        {
270            let mut c = self.cancel.lock();
271            *c = Some(token.clone());
272        }
273
274        // In ready mode, we wait for `ready.notify()` to flip to Running.
275        // Otherwise, we mark Running immediately.
276        let (ready_tx, ready_rx) = oneshot::channel::<()>();
277        if ready_mode {
278            let status_on_ready = self.status.clone();
279            tokio::spawn(async move {
280                if ready_rx.await.is_ok() {
281                    _ = status_on_ready.compare_exchange(
282                        Status::Starting.as_u8(),
283                        Status::Running.as_u8(),
284                        Ordering::AcqRel,
285                        Ordering::Acquire,
286                    );
287                    tracing::debug!("lifecycle status -> running (ready)");
288                } else {
289                    // Sender dropped: task didn't signal readiness; we will remain in Starting
290                    // until finish. This is usually a bug or early-drop scenario.
291                    tracing::debug!("ready signal dropped; staying in Starting until finish");
292                }
293            });
294        } else {
295            self.store_status(Status::Running);
296            tracing::debug!("lifecycle status -> running");
297        }
298
299        let finished_flag = self.finished.clone();
300        let finished_notify = self.finished_notify.clone();
301        let status_on_finish = self.status.clone();
302
303        // Spawn the actual task with descriptive logging
304        let module_name = self.name;
305        let task_id = format!("{module_name}-{self:p}");
306        let handle = tokio::spawn(async move {
307            tracing::debug!(task_id = %task_id, module = %module_name, "lifecycle task starting");
308            let res = make(token, ready_mode.then(|| ReadySignal(ready_tx))).await;
309            if let Err(e) = res {
310                tracing::error!(error=%e, task_id=%task_id, module = %module_name, "lifecycle task error");
311            }
312            finished_flag.store(true, Ordering::Release);
313            finished_notify.notify_waiters();
314            status_on_finish.store(Status::Stopped.as_u8(), Ordering::Release);
315            tracing::debug!(task_id=%task_id, module = %module_name, "lifecycle task finished");
316        });
317
318        // store handle (bounded lock scope)
319        {
320            let mut h = self.handle.lock();
321            *h = Some(handle);
322        }
323
324        Ok(())
325    }
326
327    /// Request graceful shutdown and wait up to `timeout`.
328    ///
329    /// # Errors
330    /// Returns `LcError` if the stop operation fails.
331    #[tracing::instrument(skip(self, timeout), level = "debug")]
332    pub async fn stop(&self, timeout: Duration) -> LcResult<StopReason> {
333        let module_name = self.name;
334        let task_id = format!("{module_name}-{self:p}");
335        let st = self.load_status();
336        if !matches!(st, Status::Starting | Status::Running | Status::Stopping) {
337            // Not running => already finished.
338            return Ok(StopReason::Finished);
339        }
340
341        self.store_status(Status::Stopping);
342
343        // Request cancellation only once (idempotent if multiple callers race here).
344        if let Some(tok) = { self.cancel.lock().take() } {
345            self.was_cancelled.store(true, Ordering::Release);
346            tok.cancel();
347        }
348
349        // Waiter that works for all callers, even after the task already finished.
350        let finished_flag = self.finished.clone();
351        let notify = self.finished_notify.clone();
352        let finished_wait = async move {
353            if finished_flag.load(Ordering::Acquire) {
354                return;
355            }
356            notify.notified().await;
357        };
358
359        let reason = tokio::select! {
360            () = finished_wait => {
361                if self.was_cancelled.load(Ordering::Acquire) {
362                    StopReason::Cancelled
363                } else {
364                    StopReason::Finished
365                }
366            }
367            () = tokio::time::sleep(timeout) => StopReason::Timeout,
368        };
369
370        // Join and ensure we notify waiters even if the task was aborted/panicked.
371        let handle_opt = { self.handle.lock().take() };
372        if let Some(handle) = handle_opt {
373            if matches!(reason, StopReason::Timeout) && !handle.is_finished() {
374                tracing::warn!("lifecycle stop timed out; aborting task");
375                handle.abort();
376            }
377
378            match handle.await {
379                Ok(()) => {
380                    tracing::debug!(task_id = %task_id, module = %module_name, "lifecycle task completed successfully");
381                }
382                Err(e) if e.is_cancelled() => {
383                    tracing::debug!(task_id = %task_id, module = %module_name, "lifecycle task was cancelled/aborted");
384                }
385                Err(e) if e.is_panic() => {
386                    // Extract panic information if possible
387                    match e.try_into_panic() {
388                        Ok(panic_payload) => {
389                            let panic_msg = panic_payload
390                                .downcast_ref::<&str>()
391                                .copied()
392                                .map(str::to_owned)
393                                .or_else(|| panic_payload.downcast_ref::<String>().cloned())
394                                .unwrap_or_else(|| "unknown panic".to_owned());
395
396                            tracing::error!(
397                                task_id = %task_id,
398                                module = %module_name,
399                                panic_message = %panic_msg,
400                                "lifecycle task panicked - this indicates a serious bug"
401                            );
402                        }
403                        _ => {
404                            tracing::error!(
405                                task_id = %task_id,
406                                module = %module_name,
407                                "lifecycle task panicked (could not extract panic message)"
408                            );
409                        }
410                    }
411                }
412                Err(e) => {
413                    tracing::warn!(task_id = %task_id, module = %module_name, error = %e, "lifecycle task join error");
414                }
415            }
416
417            self.finished.store(true, Ordering::Release);
418            self.finished_notify.notify_waiters();
419        }
420
421        self.store_status(Status::Stopped);
422        tracing::info!(?reason, "lifecycle stopped");
423        Ok(reason)
424    }
425
426    /// Current status.
427    #[inline]
428    #[must_use]
429    pub fn status(&self) -> Status {
430        self.load_status()
431    }
432
433    /// Whether it is in `Starting` or `Running`.
434    #[inline]
435    pub fn is_running(&self) -> bool {
436        matches!(self.status(), Status::Starting | Status::Running)
437    }
438
439    /// Best-effort "try start" that swallows the error and returns bool.
440    #[inline]
441    #[must_use]
442    pub fn try_start<F, Fut>(&self, make: F) -> bool
443    where
444        F: FnOnce(CancellationToken) -> Fut + Send + 'static,
445        Fut: std::future::Future<Output = TaskResult<()>> + Send + 'static,
446    {
447        self.start(make).is_ok()
448    }
449
450    /// Wait until the task is fully stopped.
451    pub async fn wait_stopped(&self) {
452        if self.finished.load(Ordering::Acquire) {
453            return;
454        }
455        self.finished_notify.notified().await;
456    }
457}
458
459impl Default for Lifecycle {
460    fn default() -> Self {
461        Self::new()
462    }
463}
464
465impl Drop for Lifecycle {
466    /// Best-effort cleanup to avoid orphaned background tasks if caller forgets to call `stop()`.
467    fn drop(&mut self) {
468        if let Some(tok) = self.cancel.get_mut().take() {
469            tok.cancel();
470        }
471        if let Some(handle) = self.handle.get_mut().take() {
472            handle.abort();
473        }
474    }
475}
476
477// ----- WithLifecycle wrapper -------------------------------------------------
478
/// Wrapper that implements `RunnableCapability` for any `T: Runnable`.
///
/// It owns the runnable (behind an `Arc`) together with a dedicated [`Lifecycle`]
/// that spawns, tracks and stops it.
#[must_use]
pub struct WithLifecycle<T: Runnable> {
    inner: Arc<T>,
    lc: Arc<Lifecycle>,
    /// Grace period passed to `Lifecycle::stop`; a task still running afterwards is aborted.
    pub(crate) stop_timeout: Duration,
    // lifecycle start mode configuration
    /// `false`: start with plain `run` and mark `Running` immediately.
    /// `true`: `Running` only after a `ReadySignal` is notified.
    await_ready: bool,
    /// With `await_ready`, use `run_ready_fn` (which signals readiness itself) instead of
    /// auto-notifying right before `run`.
    has_ready_handler: bool,
    run_ready_fn: Option<ReadyFn<T>>,
}
490
491impl<T: Runnable> WithLifecycle<T> {
492    pub fn new(inner: T) -> Self {
493        Self {
494            inner: Arc::new(inner),
495            lc: Arc::new(Lifecycle::new_named(std::any::type_name::<T>())),
496            stop_timeout: Duration::from_secs(30),
497            await_ready: false,
498            has_ready_handler: false,
499            run_ready_fn: None,
500        }
501    }
502
503    pub fn from_arc(inner: Arc<T>) -> Self {
504        Self {
505            inner,
506            lc: Arc::new(Lifecycle::new_named(std::any::type_name::<T>())),
507            stop_timeout: Duration::from_secs(30),
508            await_ready: false,
509            has_ready_handler: false,
510            run_ready_fn: None,
511        }
512    }
513
514    pub fn new_with_name(inner: T, name: &'static str) -> Self {
515        Self {
516            inner: Arc::new(inner),
517            lc: Arc::new(Lifecycle::new_named(name)),
518            stop_timeout: Duration::from_secs(30),
519            await_ready: false,
520            has_ready_handler: false,
521            run_ready_fn: None,
522        }
523    }
524
525    pub fn from_arc_with_name(inner: Arc<T>, name: &'static str) -> Self {
526        Self {
527            inner,
528            lc: Arc::new(Lifecycle::new_named(name)),
529            stop_timeout: Duration::from_secs(30),
530            await_ready: false,
531            has_ready_handler: false,
532            run_ready_fn: None,
533        }
534    }
535
536    pub fn with_stop_timeout(mut self, d: Duration) -> Self {
537        self.stop_timeout = d;
538        self
539    }
540
541    #[inline]
542    #[must_use]
543    pub fn status(&self) -> Status {
544        self.lc.status()
545    }
546
547    #[inline]
548    #[must_use]
549    pub fn inner(&self) -> &T {
550        self.inner.as_ref()
551    }
552
553    /// Sometimes callers need to hold an `Arc` to the inner runnable.
554    #[inline]
555    #[must_use]
556    pub fn inner_arc(&self) -> Arc<T> {
557        self.inner.clone()
558    }
559
560    /// Configure readiness behavior produced by proc-macros (`#[modkit::module(..., lifecycle(...))]`).
561    pub fn with_ready_mode(
562        mut self,
563        await_ready: bool,
564        has_ready_handler: bool,
565        run_ready_fn: Option<ReadyFn<T>>,
566    ) -> Self {
567        self.await_ready = await_ready;
568        self.has_ready_handler = has_ready_handler;
569        self.run_ready_fn = run_ready_fn;
570        self
571    }
572}
573
574impl<T: Runnable + Default> Default for WithLifecycle<T> {
575    fn default() -> Self {
576        Self::new(T::default())
577    }
578}
579
580#[async_trait]
581impl<T: Runnable> crate::contracts::RunnableCapability for WithLifecycle<T> {
582    #[tracing::instrument(skip(self, external_cancel), level = "debug")]
583    async fn start(&self, external_cancel: CancellationToken) -> TaskResult<()> {
584        let inner = self.inner.clone();
585        let composed = external_cancel.child_token();
586
587        if !self.await_ready {
588            self.lc
589                .start_with_token(composed, move |cancel| inner.run(cancel))
590                .map_err(anyhow::Error::from)
591        } else if self.has_ready_handler {
592            let f = self.run_ready_fn.ok_or_else(|| {
593                anyhow::anyhow!("run_ready_fn must be set when has_ready_handler")
594            })?;
595            self.lc
596                .start_with_ready_and_token(composed, move |cancel, ready| f(inner, cancel, ready))
597                .map_err(anyhow::Error::from)
598        } else {
599            self.lc
600                .start_with_ready_and_token(composed, move |cancel, ready| async move {
601                    // Auto-notify readiness and continue with normal run()
602                    ready.notify();
603                    inner.run(cancel).await
604                })
605                .map_err(anyhow::Error::from)
606        }
607    }
608
609    #[tracing::instrument(skip(self, external_cancel), level = "debug")]
610    async fn stop(&self, external_cancel: CancellationToken) -> TaskResult<()> {
611        tokio::select! {
612            res = self.lc.stop(self.stop_timeout) => {
613                _ = res.map_err(anyhow::Error::from)?;
614                Ok(())
615            }
616            () = external_cancel.cancelled() => {
617                _ = self.lc.stop(Duration::from_millis(0)).await?;
618                Ok(())
619            }
620        }
621    }
622}
623
624impl<T: Runnable> Drop for WithLifecycle<T> {
625    /// Best-effort, but only if we're the last owner of `lc` to avoid aborting someone else's task.
626    fn drop(&mut self) {
627        if Arc::strong_count(&self.lc) == 1 {
628            if let Some(tok) = self.lc.cancel.lock().as_ref() {
629                tok.cancel();
630            }
631            if let Some(handle) = self.lc.handle.lock().as_ref() {
632                handle.abort();
633            }
634        }
635    }
636}
637
638// ----- Tests -----------------------------------------------------------------
639
// Unit tests for `Lifecycle` and `WithLifecycle`: start/stop transitions, ready-mode
// gating, timeout/abort handling, double-start rejection, concurrent stops and panics.
// Several tests rely on short sleeps to order events, so their timing is deliberate.
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU32, Ordering as AOrd};
    use tokio::time::{Duration, sleep};

    /// Minimal `Runnable` that increments `counter` on every 10 ms tick until cancelled.
    struct TestRunnable {
        counter: AtomicU32,
    }

    impl TestRunnable {
        fn new() -> Self {
            Self {
                counter: AtomicU32::new(0),
            }
        }
        fn count(&self) -> u32 {
            self.counter.load(AOrd::Relaxed)
        }
    }

    #[async_trait::async_trait]
    impl Runnable for TestRunnable {
        async fn run(self: Arc<Self>, cancel: CancellationToken) -> TaskResult<()> {
            let mut interval = tokio::time::interval(Duration::from_millis(10));
            loop {
                tokio::select! {
                    _ = interval.tick() => { self.counter.fetch_add(1, AOrd::Relaxed); }
                    () = cancel.cancelled() => break,
                }
            }
            Ok(())
        }
    }

    /// Start a cancel-aware task, stop it, and expect the status to return to `Stopped`.
    #[tokio::test]
    async fn lifecycle_basic() {
        let lc = Arc::new(Lifecycle::new());
        assert_eq!(lc.status(), Status::Stopped);

        let result = lc.start(|cancel| async move {
            cancel.cancelled().await;
            Ok(())
        });
        assert!(result.is_ok());

        let stop_result = lc.stop(Duration::from_millis(100)).await;
        assert!(stop_result.is_ok());
        assert_eq!(lc.status(), Status::Stopped);
    }

    /// A fresh wrapper is `Stopped`, has not ticked, and honours `with_stop_timeout`.
    #[tokio::test]
    async fn with_lifecycle_wrapper_basics() {
        let runnable = TestRunnable::new();
        let wrapper = WithLifecycle::new(runnable);

        assert_eq!(wrapper.status(), Status::Stopped);
        assert_eq!(wrapper.inner().count(), 0);

        let wrapper = wrapper.with_stop_timeout(Duration::from_secs(60));
        assert_eq!(wrapper.stop_timeout.as_secs(), 60);
    }

    /// Non-ready start leaves the lifecycle active right after `start` returns.
    #[tokio::test]
    async fn start_sets_running_immediately() {
        let lc = Lifecycle::new();
        lc.start(|cancel| async move {
            cancel.cancelled().await;
            Ok(())
        })
        .unwrap();

        let s = lc.status();
        assert!(matches!(s, Status::Running | Status::Starting));

        let _ = lc.stop(Duration::from_millis(50)).await.unwrap();
        assert_eq!(lc.status(), Status::Stopped);
    }

    /// Ready mode stays `Starting` until the job calls `ready.notify()`, then `Running`.
    #[tokio::test]
    async fn start_with_ready_transitions_and_stop() {
        let lc = Lifecycle::new();

        // External gate so the test controls when the job signals readiness.
        let (ready_tx, ready_rx) = oneshot::channel::<()>();
        lc.start_with_ready(move |cancel, ready| async move {
            _ = ready_rx.await;
            ready.notify();
            cancel.cancelled().await;
            Ok(())
        })
        .unwrap();

        assert_eq!(lc.status(), Status::Starting);

        _ = ready_tx.send(());
        // Give the job and the ready-watcher task time to flip the status.
        sleep(Duration::from_millis(10)).await;
        assert_eq!(lc.status(), Status::Running);

        let reason = lc.stop(Duration::from_millis(100)).await.unwrap();
        assert!(matches!(
            reason,
            StopReason::Cancelled | StopReason::Finished
        ));
        assert_eq!(lc.status(), Status::Stopped);
    }

    /// Stopping a ready-mode job that never signalled readiness still ends in `Stopped`.
    #[tokio::test]
    async fn stop_while_starting_before_ready() {
        let lc = Lifecycle::new();

        lc.start_with_ready(move |cancel, _ready| async move {
            cancel.cancelled().await;
            Ok(())
        })
        .unwrap();

        assert_eq!(lc.status(), Status::Starting);

        let reason = lc.stop(Duration::from_millis(100)).await.unwrap();
        assert!(matches!(
            reason,
            StopReason::Cancelled | StopReason::Finished
        ));
        assert_eq!(lc.status(), Status::Stopped);
    }

    /// A job that ignores cancellation is aborted after the timeout and reported as `Timeout`.
    #[tokio::test]
    async fn timeout_path_aborts_and_notifies() {
        let lc = Lifecycle::new();

        lc.start(|_cancel| async move {
            loop {
                sleep(Duration::from_secs(1000)).await;
            }
            #[allow(unreachable_code)]
            Ok::<_, anyhow::Error>(())
        })
        .unwrap();

        let reason = lc.stop(Duration::from_millis(30)).await.unwrap();
        assert_eq!(reason, StopReason::Timeout);
        assert_eq!(lc.status(), Status::Stopped);
    }

    /// `try_start` succeeds once; a second `start` while active yields `AlreadyStarted`.
    #[tokio::test]
    async fn try_start_and_second_start_fails() {
        let lc = Lifecycle::new();

        assert!(lc.try_start(|cancel| async move {
            cancel.cancelled().await;
            Ok(())
        }));

        let err = lc.start(|_c| async { Ok(()) }).unwrap_err();
        match err {
            LifecycleError::AlreadyStarted => {}
        }

        let _ = lc.stop(Duration::from_millis(80)).await.unwrap();
        assert_eq!(lc.status(), Status::Stopped);
    }

    /// Two concurrent `stop` calls both complete and leave the lifecycle `Stopped`.
    #[tokio::test]
    async fn stop_is_idempotent_and_safe_concurrent() {
        let lc = Arc::new(Lifecycle::new());

        lc.start(|cancel| async move {
            cancel.cancelled().await;
            Ok(())
        })
        .unwrap();

        let a = lc.clone();
        let b = lc.clone();
        let (r1, r2) = tokio::join!(
            async move { a.stop(Duration::from_millis(80)).await },
            async move { b.stop(Duration::from_millis(80)).await },
        );

        let r1 = r1.unwrap();
        let r2 = r2.unwrap();
        assert!(matches!(
            r1,
            StopReason::Finished | StopReason::Cancelled | StopReason::Timeout
        ));
        assert!(matches!(
            r2,
            StopReason::Finished | StopReason::Cancelled | StopReason::Timeout
        ));
        assert_eq!(lc.status(), Status::Stopped);
    }

    /// `RunnableCapability::start`/`stop` round-trip through the wrapper.
    #[tokio::test]
    async fn stateful_wrapper_start_stop_roundtrip() {
        use crate::contracts::RunnableCapability;

        let wrapper = WithLifecycle::new(TestRunnable::new());
        assert_eq!(wrapper.status(), Status::Stopped);

        wrapper.start(CancellationToken::new()).await.unwrap();
        assert!(wrapper.lc.is_running());

        wrapper.stop(CancellationToken::new()).await.unwrap();
        assert_eq!(wrapper.status(), Status::Stopped);
    }

    /// Starting the wrapper twice without stopping is an error.
    #[tokio::test]
    async fn with_lifecycle_double_start_fails() {
        use crate::contracts::RunnableCapability;

        let wrapper = WithLifecycle::new(TestRunnable::new());
        let cancel = CancellationToken::new();
        wrapper.start(cancel.clone()).await.unwrap();
        let err = wrapper.start(cancel).await;
        assert!(err.is_err());
        wrapper.stop(CancellationToken::new()).await.unwrap();
    }

    /// Concurrent wrapper stops both succeed.
    #[tokio::test]
    async fn with_lifecycle_concurrent_stop_calls() {
        use crate::contracts::RunnableCapability;
        let wrapper = Arc::new(WithLifecycle::new(TestRunnable::new()));
        wrapper.start(CancellationToken::new()).await.unwrap();
        let a = wrapper.clone();
        let b = wrapper.clone();
        let (r1, r2) = tokio::join!(
            async move { a.stop(CancellationToken::new()).await },
            async move { b.stop(CancellationToken::new()).await },
        );
        assert!(r1.is_ok());
        assert!(r2.is_ok());
        assert_eq!(wrapper.status(), Status::Stopped);
    }

    /// A panicking job must not make `stop` hang or fail.
    #[tokio::test]
    async fn lifecycle_handles_panics_properly() {
        let lc = Lifecycle::new();

        // Start a task that will panic
        lc.start(|_cancel| async {
            panic!("test panic message");
        })
        .unwrap();

        // Give the task a moment to start and panic
        tokio::time::sleep(Duration::from_millis(50)).await;

        // Stop should handle the panic gracefully
        let reason = lc.stop(Duration::from_millis(1000)).await.unwrap();

        // The task panicked, but stop should complete successfully
        // The exact reason depends on timing, but it should not hang or fail
        assert!(matches!(
            reason,
            StopReason::Finished | StopReason::Cancelled | StopReason::Timeout
        ));
        assert_eq!(lc.status(), Status::Stopped);
    }

    /// Smoke test for the task-id/logging path: start, observe running, stop cleanly.
    #[tokio::test]
    async fn lifecycle_task_naming_and_logging() {
        let lc = Lifecycle::new();

        // Start a simple task
        lc.start(|cancel| async move {
            cancel.cancelled().await;
            Ok(())
        })
        .unwrap();

        // Verify task is running
        assert!(lc.is_running());

        // Stop and verify proper cleanup
        let reason = lc.stop(Duration::from_millis(100)).await.unwrap();
        assert!(matches!(
            reason,
            StopReason::Cancelled | StopReason::Finished
        ));
        assert_eq!(lc.status(), Status::Stopped);
    }

    /// `stop` waits for the task to actually finish rather than returning early.
    #[tokio::test]
    async fn lifecycle_join_handles_all_tasks() {
        let lc = Arc::new(Lifecycle::new());

        // Start a single task that sleeps 10 ms before it starts honouring cancellation
        lc.start(|cancel| async move {
            tokio::time::sleep(Duration::from_millis(10)).await;
            cancel.cancelled().await;
            Ok(())
        })
        .unwrap();

        // Stop should wait for the task to complete
        let start = std::time::Instant::now();
        let reason = lc.stop(Duration::from_millis(200)).await.unwrap();
        let elapsed = start.elapsed();

        // Should have waited at least the task's 10 ms non-cancellable sleep
        assert!(elapsed >= Duration::from_millis(10));
        assert!(matches!(
            reason,
            StopReason::Cancelled | StopReason::Finished
        ));
        assert_eq!(lc.status(), Status::Stopped);
    }
}