1use async_trait::async_trait;
2use parking_lot::Mutex;
3use std::sync::{
4 Arc,
5 atomic::{AtomicBool, AtomicU8, Ordering},
6};
7use std::time::Duration;
8use tokio::sync::{Notify, oneshot};
9use tokio::task::JoinHandle;
10use tokio_util::sync::CancellationToken;
11
/// Result of a lifecycle control operation; the error type is always [`LifecycleError`].
type LcResult<T = ()> = std::result::Result<T, LifecycleError>;

/// Result produced by a user-supplied task body (an `anyhow::Result`).
type TaskResult<T = ()> = anyhow::Result<T>;

/// Plain function pointer for a "ready-aware" run entry point.
///
/// It receives the shared runnable, the cancellation token and the [`ReadySignal`],
/// and returns a boxed, `Send` future resolving to the task result.
type ReadyFn<T> = fn(
    Arc<T>,
    CancellationToken,
    ReadySignal,
)
    -> std::pin::Pin<Box<dyn std::future::Future<Output = TaskResult<()>> + Send>>;
27
/// Coarse state of a lifecycle-managed task.
///
/// The value is stored in an `AtomicU8`, so every variant has a stable `u8`
/// encoding: `Stopped = 0`, `Starting = 1`, `Running = 2`, `Stopping = 3`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum Status {
    /// No task is active (initial state, and the state after a task ends).
    Stopped,
    /// A start was accepted but the task has not been marked running yet.
    Starting,
    /// The task is considered up and running.
    Running,
    /// A stop was requested and is in progress.
    Stopping,
}

impl Status {
    /// Returns the `u8` encoding of this status.
    #[inline]
    #[must_use]
    pub const fn as_u8(self) -> u8 {
        // `#[repr(u8)]` with implicit discriminants numbers the variants 0..=3 in
        // declaration order, which is exactly the encoding documented above.
        self as u8
    }

    /// Decodes a raw `u8` back into a status.
    ///
    /// Any value that is not the encoding of `Starting`, `Running` or `Stopping`
    /// (including out-of-range values) decodes to `Stopped`.
    #[inline]
    #[must_use]
    pub const fn from_u8(x: u8) -> Self {
        if x == Self::Starting as u8 {
            Self::Starting
        } else if x == Self::Running as u8 {
            Self::Running
        } else if x == Self::Stopping as u8 {
            Self::Stopping
        } else {
            Self::Stopped
        }
    }
}
62
/// Outcome reported by `Lifecycle::stop`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StopReason {
    /// The lifecycle was already stopped, or the task ended without this
    /// lifecycle having cancelled its token.
    Finished,
    /// The task ended within the timeout after its token was cancelled by `stop`.
    Cancelled,
    /// The timeout elapsed before the task reported completion.
    Timeout,
}
70
71pub struct ReadySignal(oneshot::Sender<()>);
75
76impl ReadySignal {
77 #[inline]
78 pub fn notify(self) {
79 if self.0.send(()).is_err() {
80 tracing::debug!("ReadySignal::notify: receiver already dropped");
81 }
82 }
83 #[inline]
85 #[must_use]
86 pub fn from_sender(sender: tokio::sync::oneshot::Sender<()>) -> Self {
87 ReadySignal(sender)
88 }
89}
90
/// A component whose work is a single long-running async task.
///
/// Implementors must be `Send + Sync + 'static` because the task is spawned onto
/// the tokio runtime while the value is shared through an `Arc`.
#[async_trait]
pub trait Runnable: Send + Sync + 'static {
    /// Runs the component's task.
    ///
    /// `cancel` is the token the lifecycle cancels when a stop is requested.
    /// The returned `Err` is logged by the lifecycle task wrapper.
    async fn run(self: Arc<Self>, cancel: CancellationToken) -> TaskResult<()>;
}
100
/// Errors returned by lifecycle control operations.
#[derive(Debug, thiserror::Error)]
pub enum LifecycleError {
    /// A start was requested while the status was not `Stopped`.
    #[error("already started")]
    AlreadyStarted,
}
109
/// Start/stop controller for one background tokio task.
pub struct Lifecycle {
    /// Label used in log fields and in the generated task id.
    name: &'static str,
    /// Current [`Status`], stored as its `u8` encoding; shared with the spawned tasks.
    status: Arc<AtomicU8>,
    /// Join handle of the most recently spawned task; taken by `stop`.
    handle: Mutex<Option<JoinHandle<()>>>,
    /// Cancellation token of the current run; taken (and cancelled) by `stop`.
    cancel: Mutex<Option<CancellationToken>>,
    /// Set to `true` once the task has ended; reset to `false` on every start.
    finished: Arc<AtomicBool>,
    /// Set to `true` when `stop` cancelled the token during the current run.
    was_cancelled: Arc<AtomicBool>,
    /// Wakes tasks waiting for the `finished` flag to become `true`.
    finished_notify: Arc<Notify>,
}
130
131impl Lifecycle {
132 #[must_use]
133 pub fn new_named(name: &'static str) -> Self {
134 Self {
135 name,
136 status: Arc::new(AtomicU8::new(Status::Stopped.as_u8())),
137 handle: Mutex::new(None),
138 cancel: Mutex::new(None),
139 finished: Arc::new(AtomicBool::new(false)),
140 was_cancelled: Arc::new(AtomicBool::new(false)),
141 finished_notify: Arc::new(Notify::new()),
142 }
143 }
144
145 #[must_use]
146 pub fn new() -> Self {
147 Self::new_named("lifecycle")
148 }
149
150 #[inline]
151 pub fn name(&self) -> &'static str {
152 self.name
153 }
154
155 #[inline]
158 fn load_status(&self) -> Status {
159 Status::from_u8(self.status.load(Ordering::Acquire))
160 }
161
162 #[inline]
163 fn store_status(&self, s: Status) {
164 self.status.store(s.as_u8(), Ordering::Release);
165 }
166
167 #[tracing::instrument(skip(self, make), level = "debug")]
177 pub fn start<F, Fut>(&self, make: F) -> LcResult
178 where
179 F: FnOnce(CancellationToken) -> Fut + Send + 'static,
180 Fut: std::future::Future<Output = TaskResult<()>> + Send + 'static,
181 {
182 self.start_core(CancellationToken::new(), move |tok, _| make(tok), false)
183 }
184
185 #[tracing::instrument(skip(self, make, token), level = "debug")]
190 pub fn start_with_token<F, Fut>(&self, token: CancellationToken, make: F) -> LcResult
191 where
192 F: FnOnce(CancellationToken) -> Fut + Send + 'static,
193 Fut: std::future::Future<Output = TaskResult<()>> + Send + 'static,
194 {
195 self.start_core(token, move |tok, _| make(tok), false)
196 }
197
198 #[tracing::instrument(skip(self, make), level = "debug")]
203 pub fn start_with_ready<F, Fut>(&self, make: F) -> LcResult
204 where
205 F: FnOnce(CancellationToken, ReadySignal) -> Fut + Send + 'static,
206 Fut: std::future::Future<Output = TaskResult<()>> + Send + 'static,
207 {
208 self.start_core(
209 CancellationToken::new(),
210 move |tok, rdy| async move {
211 let Some(rdy) = rdy else {
212 return Err(anyhow::anyhow!("ReadySignal must be present"));
213 };
214 make(tok, rdy).await
215 },
216 true,
217 )
218 }
219
220 #[tracing::instrument(skip(self, make, token), level = "debug")]
225 pub fn start_with_ready_and_token<F, Fut>(&self, token: CancellationToken, make: F) -> LcResult
226 where
227 F: FnOnce(CancellationToken, ReadySignal) -> Fut + Send + 'static,
228 Fut: std::future::Future<Output = TaskResult<()>> + Send + 'static,
229 {
230 self.start_core(
231 token,
232 move |tok, rdy| async move {
233 let Some(rdy) = rdy else {
234 return Err(anyhow::anyhow!("ReadySignal must be present"));
235 };
236 make(tok, rdy).await
237 },
238 true,
239 )
240 }
241
242 fn start_core<F, Fut>(&self, token: CancellationToken, make: F, ready_mode: bool) -> LcResult
247 where
248 F: Send + 'static + FnOnce(CancellationToken, Option<ReadySignal>) -> Fut,
249 Fut: std::future::Future<Output = TaskResult<()>> + Send + 'static,
250 {
251 let cas_ok = self
253 .status
254 .compare_exchange(
255 Status::Stopped.as_u8(),
256 Status::Starting.as_u8(),
257 Ordering::AcqRel,
258 Ordering::Acquire,
259 )
260 .is_ok();
261 if !cas_ok {
262 return Err(LifecycleError::AlreadyStarted);
263 }
264
265 self.finished.store(false, Ordering::Release);
266 self.was_cancelled.store(false, Ordering::Release);
267
268 {
270 let mut c = self.cancel.lock();
271 *c = Some(token.clone());
272 }
273
274 let (ready_tx, ready_rx) = oneshot::channel::<()>();
277 if ready_mode {
278 let status_on_ready = self.status.clone();
279 tokio::spawn(async move {
280 if ready_rx.await.is_ok() {
281 _ = status_on_ready.compare_exchange(
282 Status::Starting.as_u8(),
283 Status::Running.as_u8(),
284 Ordering::AcqRel,
285 Ordering::Acquire,
286 );
287 tracing::debug!("lifecycle status -> running (ready)");
288 } else {
289 tracing::debug!("ready signal dropped; staying in Starting until finish");
292 }
293 });
294 } else {
295 self.store_status(Status::Running);
296 tracing::debug!("lifecycle status -> running");
297 }
298
299 let finished_flag = self.finished.clone();
300 let finished_notify = self.finished_notify.clone();
301 let status_on_finish = self.status.clone();
302
303 let module_name = self.name;
305 let task_id = format!("{module_name}-{self:p}");
306 let handle = tokio::spawn(async move {
307 tracing::debug!(task_id = %task_id, module = %module_name, "lifecycle task starting");
308 let res = make(token, ready_mode.then(|| ReadySignal(ready_tx))).await;
309 if let Err(e) = res {
310 tracing::error!(error=%e, task_id=%task_id, module = %module_name, "lifecycle task error");
311 }
312 finished_flag.store(true, Ordering::Release);
313 finished_notify.notify_waiters();
314 status_on_finish.store(Status::Stopped.as_u8(), Ordering::Release);
315 tracing::debug!(task_id=%task_id, module = %module_name, "lifecycle task finished");
316 });
317
318 {
320 let mut h = self.handle.lock();
321 *h = Some(handle);
322 }
323
324 Ok(())
325 }
326
327 #[tracing::instrument(skip(self, timeout), level = "debug")]
332 pub async fn stop(&self, timeout: Duration) -> LcResult<StopReason> {
333 let module_name = self.name;
334 let task_id = format!("{module_name}-{self:p}");
335 let st = self.load_status();
336 if !matches!(st, Status::Starting | Status::Running | Status::Stopping) {
337 return Ok(StopReason::Finished);
339 }
340
341 self.store_status(Status::Stopping);
342
343 if let Some(tok) = { self.cancel.lock().take() } {
345 self.was_cancelled.store(true, Ordering::Release);
346 tok.cancel();
347 }
348
349 let finished_flag = self.finished.clone();
351 let notify = self.finished_notify.clone();
352 let finished_wait = async move {
353 if finished_flag.load(Ordering::Acquire) {
354 return;
355 }
356 notify.notified().await;
357 };
358
359 let reason = tokio::select! {
360 () = finished_wait => {
361 if self.was_cancelled.load(Ordering::Acquire) {
362 StopReason::Cancelled
363 } else {
364 StopReason::Finished
365 }
366 }
367 () = tokio::time::sleep(timeout) => StopReason::Timeout,
368 };
369
370 let handle_opt = { self.handle.lock().take() };
372 if let Some(handle) = handle_opt {
373 if matches!(reason, StopReason::Timeout) && !handle.is_finished() {
374 tracing::warn!("lifecycle stop timed out; aborting task");
375 handle.abort();
376 }
377
378 match handle.await {
379 Ok(()) => {
380 tracing::debug!(task_id = %task_id, module = %module_name, "lifecycle task completed successfully");
381 }
382 Err(e) if e.is_cancelled() => {
383 tracing::debug!(task_id = %task_id, module = %module_name, "lifecycle task was cancelled/aborted");
384 }
385 Err(e) if e.is_panic() => {
386 match e.try_into_panic() {
388 Ok(panic_payload) => {
389 let panic_msg = panic_payload
390 .downcast_ref::<&str>()
391 .copied()
392 .map(str::to_owned)
393 .or_else(|| panic_payload.downcast_ref::<String>().cloned())
394 .unwrap_or_else(|| "unknown panic".to_owned());
395
396 tracing::error!(
397 task_id = %task_id,
398 module = %module_name,
399 panic_message = %panic_msg,
400 "lifecycle task panicked - this indicates a serious bug"
401 );
402 }
403 _ => {
404 tracing::error!(
405 task_id = %task_id,
406 module = %module_name,
407 "lifecycle task panicked (could not extract panic message)"
408 );
409 }
410 }
411 }
412 Err(e) => {
413 tracing::warn!(task_id = %task_id, module = %module_name, error = %e, "lifecycle task join error");
414 }
415 }
416
417 self.finished.store(true, Ordering::Release);
418 self.finished_notify.notify_waiters();
419 }
420
421 self.store_status(Status::Stopped);
422 tracing::info!(?reason, "lifecycle stopped");
423 Ok(reason)
424 }
425
426 #[inline]
428 #[must_use]
429 pub fn status(&self) -> Status {
430 self.load_status()
431 }
432
433 #[inline]
435 pub fn is_running(&self) -> bool {
436 matches!(self.status(), Status::Starting | Status::Running)
437 }
438
439 #[inline]
441 #[must_use]
442 pub fn try_start<F, Fut>(&self, make: F) -> bool
443 where
444 F: FnOnce(CancellationToken) -> Fut + Send + 'static,
445 Fut: std::future::Future<Output = TaskResult<()>> + Send + 'static,
446 {
447 self.start(make).is_ok()
448 }
449
450 pub async fn wait_stopped(&self) {
452 if self.finished.load(Ordering::Acquire) {
453 return;
454 }
455 self.finished_notify.notified().await;
456 }
457}
458
459impl Default for Lifecycle {
460 fn default() -> Self {
461 Self::new()
462 }
463}
464
465impl Drop for Lifecycle {
466 fn drop(&mut self) {
468 if let Some(tok) = self.cancel.get_mut().take() {
469 tok.cancel();
470 }
471 if let Some(handle) = self.handle.get_mut().take() {
472 handle.abort();
473 }
474 }
475}
476
/// Wraps a [`Runnable`] together with the [`Lifecycle`] that starts and stops it.
#[must_use]
pub struct WithLifecycle<T: Runnable> {
    /// The wrapped runnable, shared with the spawned task.
    inner: Arc<T>,
    /// Lifecycle controlling the runnable's task.
    lc: Arc<Lifecycle>,
    /// Timeout handed to `Lifecycle::stop` on a regular stop (30 seconds by default).
    pub(crate) stop_timeout: Duration,
    /// When `true`, the task is started in ready mode (status stays `Starting`
    /// until the ready signal is sent); when `false`, it is `Running` immediately.
    await_ready: bool,
    /// When `true` (and `await_ready` is set), `run_ready_fn` is used to run the task.
    has_ready_handler: bool,
    /// Ready-aware entry point; must be `Some` when `has_ready_handler` is used.
    run_ready_fn: Option<ReadyFn<T>>,
}
490
491impl<T: Runnable> WithLifecycle<T> {
492 pub fn new(inner: T) -> Self {
493 Self {
494 inner: Arc::new(inner),
495 lc: Arc::new(Lifecycle::new_named(std::any::type_name::<T>())),
496 stop_timeout: Duration::from_secs(30),
497 await_ready: false,
498 has_ready_handler: false,
499 run_ready_fn: None,
500 }
501 }
502
503 pub fn from_arc(inner: Arc<T>) -> Self {
504 Self {
505 inner,
506 lc: Arc::new(Lifecycle::new_named(std::any::type_name::<T>())),
507 stop_timeout: Duration::from_secs(30),
508 await_ready: false,
509 has_ready_handler: false,
510 run_ready_fn: None,
511 }
512 }
513
514 pub fn new_with_name(inner: T, name: &'static str) -> Self {
515 Self {
516 inner: Arc::new(inner),
517 lc: Arc::new(Lifecycle::new_named(name)),
518 stop_timeout: Duration::from_secs(30),
519 await_ready: false,
520 has_ready_handler: false,
521 run_ready_fn: None,
522 }
523 }
524
525 pub fn from_arc_with_name(inner: Arc<T>, name: &'static str) -> Self {
526 Self {
527 inner,
528 lc: Arc::new(Lifecycle::new_named(name)),
529 stop_timeout: Duration::from_secs(30),
530 await_ready: false,
531 has_ready_handler: false,
532 run_ready_fn: None,
533 }
534 }
535
536 pub fn with_stop_timeout(mut self, d: Duration) -> Self {
537 self.stop_timeout = d;
538 self
539 }
540
541 #[inline]
542 #[must_use]
543 pub fn status(&self) -> Status {
544 self.lc.status()
545 }
546
547 #[inline]
548 #[must_use]
549 pub fn inner(&self) -> &T {
550 self.inner.as_ref()
551 }
552
553 #[inline]
555 #[must_use]
556 pub fn inner_arc(&self) -> Arc<T> {
557 self.inner.clone()
558 }
559
560 pub fn with_ready_mode(
562 mut self,
563 await_ready: bool,
564 has_ready_handler: bool,
565 run_ready_fn: Option<ReadyFn<T>>,
566 ) -> Self {
567 self.await_ready = await_ready;
568 self.has_ready_handler = has_ready_handler;
569 self.run_ready_fn = run_ready_fn;
570 self
571 }
572}
573
574impl<T: Runnable + Default> Default for WithLifecycle<T> {
575 fn default() -> Self {
576 Self::new(T::default())
577 }
578}
579
580#[async_trait]
581impl<T: Runnable> crate::contracts::RunnableCapability for WithLifecycle<T> {
582 #[tracing::instrument(skip(self, external_cancel), level = "debug")]
583 async fn start(&self, external_cancel: CancellationToken) -> TaskResult<()> {
584 let inner = self.inner.clone();
585 let composed = external_cancel.child_token();
586
587 if !self.await_ready {
588 self.lc
589 .start_with_token(composed, move |cancel| inner.run(cancel))
590 .map_err(anyhow::Error::from)
591 } else if self.has_ready_handler {
592 let f = self.run_ready_fn.ok_or_else(|| {
593 anyhow::anyhow!("run_ready_fn must be set when has_ready_handler")
594 })?;
595 self.lc
596 .start_with_ready_and_token(composed, move |cancel, ready| f(inner, cancel, ready))
597 .map_err(anyhow::Error::from)
598 } else {
599 self.lc
600 .start_with_ready_and_token(composed, move |cancel, ready| async move {
601 ready.notify();
603 inner.run(cancel).await
604 })
605 .map_err(anyhow::Error::from)
606 }
607 }
608
609 #[tracing::instrument(skip(self, external_cancel), level = "debug")]
610 async fn stop(&self, external_cancel: CancellationToken) -> TaskResult<()> {
611 tokio::select! {
612 res = self.lc.stop(self.stop_timeout) => {
613 _ = res.map_err(anyhow::Error::from)?;
614 Ok(())
615 }
616 () = external_cancel.cancelled() => {
617 _ = self.lc.stop(Duration::from_millis(0)).await?;
618 Ok(())
619 }
620 }
621 }
622}
623
624impl<T: Runnable> Drop for WithLifecycle<T> {
625 fn drop(&mut self) {
627 if Arc::strong_count(&self.lc) == 1 {
628 if let Some(tok) = self.lc.cancel.lock().as_ref() {
629 tok.cancel();
630 }
631 if let Some(handle) = self.lc.handle.lock().as_ref() {
632 handle.abort();
633 }
634 }
635 }
636}
637
// Unit tests for `Lifecycle` and the `WithLifecycle` wrapper.
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU32, Ordering as AOrd};
    use tokio::time::{Duration, sleep};

    /// Runnable that increments a counter on every 10 ms tick until cancelled.
    struct TestRunnable {
        counter: AtomicU32,
    }

    impl TestRunnable {
        fn new() -> Self {
            Self {
                counter: AtomicU32::new(0),
            }
        }
        /// Number of ticks observed so far.
        fn count(&self) -> u32 {
            self.counter.load(AOrd::Relaxed)
        }
    }

    #[async_trait::async_trait]
    impl Runnable for TestRunnable {
        async fn run(self: Arc<Self>, cancel: CancellationToken) -> TaskResult<()> {
            let mut interval = tokio::time::interval(Duration::from_millis(10));
            loop {
                tokio::select! {
                    _ = interval.tick() => { self.counter.fetch_add(1, AOrd::Relaxed); }
                    () = cancel.cancelled() => break,
                }
            }
            Ok(())
        }
    }

    /// A fresh lifecycle is `Stopped`, can be started, and is `Stopped` again after `stop`.
    #[tokio::test]
    async fn lifecycle_basic() {
        let lc = Arc::new(Lifecycle::new());
        assert_eq!(lc.status(), Status::Stopped);

        let result = lc.start(|cancel| async move {
            cancel.cancelled().await;
            Ok(())
        });
        assert!(result.is_ok());

        let stop_result = lc.stop(Duration::from_millis(100)).await;
        assert!(stop_result.is_ok());
        assert_eq!(lc.status(), Status::Stopped);
    }

    /// The wrapper exposes status and inner value, and the stop timeout can be overridden.
    #[tokio::test]
    async fn with_lifecycle_wrapper_basics() {
        let runnable = TestRunnable::new();
        let wrapper = WithLifecycle::new(runnable);

        assert_eq!(wrapper.status(), Status::Stopped);
        assert_eq!(wrapper.inner().count(), 0);

        let wrapper = wrapper.with_stop_timeout(Duration::from_secs(60));
        assert_eq!(wrapper.stop_timeout.as_secs(), 60);
    }

    /// A plain `start` leaves the lifecycle in an active state right after the call.
    #[tokio::test]
    async fn start_sets_running_immediately() {
        let lc = Lifecycle::new();
        lc.start(|cancel| async move {
            cancel.cancelled().await;
            Ok(())
        })
        .unwrap();

        let s = lc.status();
        assert!(matches!(s, Status::Running | Status::Starting));

        let _ = lc.stop(Duration::from_millis(50)).await.unwrap();
        assert_eq!(lc.status(), Status::Stopped);
    }

    /// In ready mode the status is `Starting` until the task sends the ready
    /// signal, then `Running`, and `Stopped` after `stop`.
    #[tokio::test]
    async fn start_with_ready_transitions_and_stop() {
        let lc = Lifecycle::new();

        // The task waits on this channel before signalling readiness, so the
        // test controls when the transition happens.
        let (ready_tx, ready_rx) = oneshot::channel::<()>();
        lc.start_with_ready(move |cancel, ready| async move {
            _ = ready_rx.await;
            ready.notify();
            cancel.cancelled().await;
            Ok(())
        })
        .unwrap();

        assert_eq!(lc.status(), Status::Starting);

        _ = ready_tx.send(());
        // Give the task and the ready watcher a chance to run.
        sleep(Duration::from_millis(10)).await;
        assert_eq!(lc.status(), Status::Running);

        let reason = lc.stop(Duration::from_millis(100)).await.unwrap();
        assert!(matches!(
            reason,
            StopReason::Cancelled | StopReason::Finished
        ));
        assert_eq!(lc.status(), Status::Stopped);
    }

    /// Stopping a ready-mode task that never signalled readiness still ends in `Stopped`.
    #[tokio::test]
    async fn stop_while_starting_before_ready() {
        let lc = Lifecycle::new();

        lc.start_with_ready(move |cancel, _ready| async move {
            cancel.cancelled().await;
            Ok(())
        })
        .unwrap();

        assert_eq!(lc.status(), Status::Starting);

        let reason = lc.stop(Duration::from_millis(100)).await.unwrap();
        assert!(matches!(
            reason,
            StopReason::Cancelled | StopReason::Finished
        ));
        assert_eq!(lc.status(), Status::Stopped);
    }

    /// A task that ignores cancellation makes `stop` report `Timeout` and end in `Stopped`.
    #[tokio::test]
    async fn timeout_path_aborts_and_notifies() {
        let lc = Lifecycle::new();

        lc.start(|_cancel| async move {
            loop {
                sleep(Duration::from_secs(1000)).await;
            }
            #[allow(unreachable_code)]
            Ok::<_, anyhow::Error>(())
        })
        .unwrap();

        let reason = lc.stop(Duration::from_millis(30)).await.unwrap();
        assert_eq!(reason, StopReason::Timeout);
        assert_eq!(lc.status(), Status::Stopped);
    }

    /// `try_start` succeeds once; a second start while active fails with `AlreadyStarted`.
    #[tokio::test]
    async fn try_start_and_second_start_fails() {
        let lc = Lifecycle::new();

        assert!(lc.try_start(|cancel| async move {
            cancel.cancelled().await;
            Ok(())
        }));

        let err = lc.start(|_c| async { Ok(()) }).unwrap_err();
        match err {
            LifecycleError::AlreadyStarted => {}
        }

        let _ = lc.stop(Duration::from_millis(80)).await.unwrap();
        assert_eq!(lc.status(), Status::Stopped);
    }

    /// Two concurrent `stop` calls both succeed and the lifecycle ends in `Stopped`.
    #[tokio::test]
    async fn stop_is_idempotent_and_safe_concurrent() {
        let lc = Arc::new(Lifecycle::new());

        lc.start(|cancel| async move {
            cancel.cancelled().await;
            Ok(())
        })
        .unwrap();

        let a = lc.clone();
        let b = lc.clone();
        let (r1, r2) = tokio::join!(
            async move { a.stop(Duration::from_millis(80)).await },
            async move { b.stop(Duration::from_millis(80)).await },
        );

        // Any stop reason is acceptable here; only success and the final state matter.
        let r1 = r1.unwrap();
        let r2 = r2.unwrap();
        assert!(matches!(
            r1,
            StopReason::Finished | StopReason::Cancelled | StopReason::Timeout
        ));
        assert!(matches!(
            r2,
            StopReason::Finished | StopReason::Cancelled | StopReason::Timeout
        ));
        assert_eq!(lc.status(), Status::Stopped);
    }

    /// Start/stop through the `RunnableCapability` interface of the wrapper.
    #[tokio::test]
    async fn stateful_wrapper_start_stop_roundtrip() {
        use crate::contracts::RunnableCapability;

        let wrapper = WithLifecycle::new(TestRunnable::new());
        assert_eq!(wrapper.status(), Status::Stopped);

        wrapper.start(CancellationToken::new()).await.unwrap();
        assert!(wrapper.lc.is_running());

        wrapper.stop(CancellationToken::new()).await.unwrap();
        assert_eq!(wrapper.status(), Status::Stopped);
    }

    /// Starting the wrapper twice without stopping in between is an error.
    #[tokio::test]
    async fn with_lifecycle_double_start_fails() {
        use crate::contracts::RunnableCapability;

        let wrapper = WithLifecycle::new(TestRunnable::new());
        let cancel = CancellationToken::new();
        wrapper.start(cancel.clone()).await.unwrap();
        let err = wrapper.start(cancel).await;
        assert!(err.is_err());
        wrapper.stop(CancellationToken::new()).await.unwrap();
    }

    /// Concurrent stops through the wrapper both succeed.
    #[tokio::test]
    async fn with_lifecycle_concurrent_stop_calls() {
        use crate::contracts::RunnableCapability;
        let wrapper = Arc::new(WithLifecycle::new(TestRunnable::new()));
        wrapper.start(CancellationToken::new()).await.unwrap();
        let a = wrapper.clone();
        let b = wrapper.clone();
        let (r1, r2) = tokio::join!(
            async move { a.stop(CancellationToken::new()).await },
            async move { b.stop(CancellationToken::new()).await },
        );
        assert!(r1.is_ok());
        assert!(r2.is_ok());
        assert_eq!(wrapper.status(), Status::Stopped);
    }

    /// A panicking task must not break `stop`: it still returns and ends in `Stopped`.
    #[tokio::test]
    async fn lifecycle_handles_panics_properly() {
        let lc = Lifecycle::new();

        lc.start(|_cancel| async {
            panic!("test panic message");
        })
        .unwrap();

        // Let the task run (and panic) before stopping.
        tokio::time::sleep(Duration::from_millis(50)).await;

        let reason = lc.stop(Duration::from_millis(1000)).await.unwrap();

        // Any stop reason is accepted; the test only checks that stop completes.
        assert!(matches!(
            reason,
            StopReason::Finished | StopReason::Cancelled | StopReason::Timeout
        ));
        assert_eq!(lc.status(), Status::Stopped);
    }

    /// Smoke test for the start/stop path that produces the task-id log fields.
    #[tokio::test]
    async fn lifecycle_task_naming_and_logging() {
        let lc = Lifecycle::new();

        lc.start(|cancel| async move {
            cancel.cancelled().await;
            Ok(())
        })
        .unwrap();

        assert!(lc.is_running());

        let reason = lc.stop(Duration::from_millis(100)).await.unwrap();
        assert!(matches!(
            reason,
            StopReason::Cancelled | StopReason::Finished
        ));
        assert_eq!(lc.status(), Status::Stopped);
    }

    /// `stop` waits for the task to actually end: the task sleeps 10 ms before
    /// it starts listening for cancellation, so `stop` cannot return earlier.
    #[tokio::test]
    async fn lifecycle_join_handles_all_tasks() {
        let lc = Arc::new(Lifecycle::new());

        lc.start(|cancel| async move {
            tokio::time::sleep(Duration::from_millis(10)).await;
            cancel.cancelled().await;
            Ok(())
        })
        .unwrap();

        let start = std::time::Instant::now();
        let reason = lc.stop(Duration::from_millis(200)).await.unwrap();
        let elapsed = start.elapsed();

        assert!(elapsed >= Duration::from_millis(10));
        assert!(matches!(
            reason,
            StopReason::Cancelled | StopReason::Finished
        ));
        assert_eq!(lc.status(), Status::Stopped);
    }
}