1use async_trait::async_trait;
2use parking_lot::Mutex;
3use std::sync::{
4 Arc,
5 atomic::{AtomicBool, AtomicU8, Ordering},
6};
7use std::time::Duration;
8use tokio::sync::{Notify, oneshot};
9use tokio::task::JoinHandle;
10use tokio_util::sync::CancellationToken;
11
/// Result of a lifecycle control operation. The error type is [`LifecycleError`];
/// the success type defaults to `()`.
type LcResult<T = ()> = std::result::Result<T, LifecycleError>;

/// Result produced by a managed task body. Alias for `anyhow::Result`, with the
/// success type defaulting to `()`.
type TaskResult<T = ()> = anyhow::Result<T>;

/// Plain function pointer used as a "run with readiness" entry point.
///
/// It receives the shared runnable, the cancellation token for the run and the
/// [`ReadySignal`] for that run, and returns a boxed, pinned, `Send` future that
/// resolves to `TaskResult<()>`.
type ReadyFn<T> = fn(
    Arc<T>,
    CancellationToken,
    ReadySignal,
)
    -> std::pin::Pin<Box<dyn std::future::Future<Output = TaskResult<()>> + Send>>;
27
/// Lifecycle state, stored as a single byte so it can live in an `AtomicU8`.
///
/// The discriminants are part of the encoding used by [`Status::as_u8`] and
/// [`Status::from_u8`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum Status {
    Stopped = 0,
    Starting = 1,
    Running = 2,
    Stopping = 3,
}

impl Status {
    /// Encodes the status as its `u8` discriminant (`Stopped` = 0 … `Stopping` = 3).
    #[inline]
    #[must_use]
    pub const fn as_u8(self) -> u8 {
        self as u8
    }

    /// Decodes a byte produced by [`Status::as_u8`].
    ///
    /// Any value outside `1..=3` (including `0`) decodes to [`Status::Stopped`].
    #[inline]
    #[must_use]
    pub const fn from_u8(x: u8) -> Self {
        match x {
            1 => Self::Starting,
            2 => Self::Running,
            3 => Self::Stopping,
            _ => Self::Stopped,
        }
    }
}
62
/// Outcome reported by [`Lifecycle::stop`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StopReason {
    /// Nothing had to be cancelled: either the lifecycle was already stopped, or
    /// the task finished without the cancelled flag having been set.
    Finished,
    /// The task finished within the timeout after its token had been cancelled.
    Cancelled,
    /// The timeout elapsed before the task was observed to finish.
    Timeout,
}
70
71pub struct ReadySignal(oneshot::Sender<()>);
75
76impl ReadySignal {
77 #[inline]
78 pub fn notify(self) {
79 if self.0.send(()).is_err() {
80 tracing::debug!("ReadySignal::notify: receiver already dropped");
81 }
82 }
83 #[inline]
85 #[must_use]
86 pub fn from_sender(sender: tokio::sync::oneshot::Sender<()>) -> Self {
87 ReadySignal(sender)
88 }
89}
90
/// A unit of work that can be driven by [`WithLifecycle`].
///
/// `run` is handed a shared reference to the implementor and a
/// [`CancellationToken`]. The lifecycle cancels that token when a stop is
/// requested and aborts the task if it has not finished within the stop
/// timeout, so implementations should return promptly once `cancel` fires.
#[async_trait]
pub trait Runnable: Send + Sync + 'static {
    /// Runs the work until completion or cancellation.
    async fn run(self: Arc<Self>, cancel: CancellationToken) -> TaskResult<()>;
}
100
/// Errors returned by [`Lifecycle`] control operations.
#[derive(Debug, thiserror::Error)]
pub enum LifecycleError {
    /// A start was requested while the lifecycle was not in the `Stopped` state.
    #[error("already started")]
    AlreadyStarted,
}
109
/// Start/stop controller for a single background tokio task.
pub struct Lifecycle {
    // Label used in log fields and in the generated task id.
    name: &'static str,
    // Current `Status`, encoded with `Status::as_u8`; shared with the spawned tasks.
    status: Arc<AtomicU8>,
    // Join handle of the most recently spawned task, if it has not been taken yet.
    handle: Mutex<Option<JoinHandle<()>>>,
    // Cancellation token of the current run; taken (and cancelled) by `stop`.
    cancel: Mutex<Option<CancellationToken>>,
    // Set once the current run's task has finished; reset at every start.
    finished: Arc<AtomicBool>,
    // Set when `stop` cancelled the token of the current run; reset at every start.
    was_cancelled: Arc<AtomicBool>,
    // Wakes everyone waiting for the `finished` flag to flip.
    finished_notify: Arc<Notify>,
}
130
131impl Lifecycle {
132 #[must_use]
133 pub fn new_named(name: &'static str) -> Self {
134 Self {
135 name,
136 status: Arc::new(AtomicU8::new(Status::Stopped.as_u8())),
137 handle: Mutex::new(None),
138 cancel: Mutex::new(None),
139 finished: Arc::new(AtomicBool::new(false)),
140 was_cancelled: Arc::new(AtomicBool::new(false)),
141 finished_notify: Arc::new(Notify::new()),
142 }
143 }
144
145 #[must_use]
146 pub fn new() -> Self {
147 Self::new_named("lifecycle")
148 }
149
150 #[inline]
151 pub fn name(&self) -> &'static str {
152 self.name
153 }
154
155 #[inline]
158 fn load_status(&self) -> Status {
159 Status::from_u8(self.status.load(Ordering::Acquire))
160 }
161
162 #[inline]
163 fn store_status(&self, s: Status) {
164 self.status.store(s.as_u8(), Ordering::Release);
165 }
166
167 #[tracing::instrument(skip(self, make), level = "debug")]
177 pub fn start<F, Fut>(&self, make: F) -> LcResult
178 where
179 F: FnOnce(CancellationToken) -> Fut + Send + 'static,
180 Fut: std::future::Future<Output = TaskResult<()>> + Send + 'static,
181 {
182 self.start_core(CancellationToken::new(), move |tok, _| make(tok), false)
183 }
184
185 #[tracing::instrument(skip(self, make, token), level = "debug")]
190 pub fn start_with_token<F, Fut>(&self, token: CancellationToken, make: F) -> LcResult
191 where
192 F: FnOnce(CancellationToken) -> Fut + Send + 'static,
193 Fut: std::future::Future<Output = TaskResult<()>> + Send + 'static,
194 {
195 self.start_core(token, move |tok, _| make(tok), false)
196 }
197
198 #[tracing::instrument(skip(self, make), level = "debug")]
203 pub fn start_with_ready<F, Fut>(&self, make: F) -> LcResult
204 where
205 F: FnOnce(CancellationToken, ReadySignal) -> Fut + Send + 'static,
206 Fut: std::future::Future<Output = TaskResult<()>> + Send + 'static,
207 {
208 self.start_core(
209 CancellationToken::new(),
210 move |tok, rdy| async move {
211 let Some(rdy) = rdy else {
212 return Err(anyhow::anyhow!("ReadySignal must be present"));
213 };
214 make(tok, rdy).await
215 },
216 true,
217 )
218 }
219
220 #[tracing::instrument(skip(self, make, token), level = "debug")]
225 pub fn start_with_ready_and_token<F, Fut>(&self, token: CancellationToken, make: F) -> LcResult
226 where
227 F: FnOnce(CancellationToken, ReadySignal) -> Fut + Send + 'static,
228 Fut: std::future::Future<Output = TaskResult<()>> + Send + 'static,
229 {
230 self.start_core(
231 token,
232 move |tok, rdy| async move {
233 let Some(rdy) = rdy else {
234 return Err(anyhow::anyhow!("ReadySignal must be present"));
235 };
236 make(tok, rdy).await
237 },
238 true,
239 )
240 }
241
242 fn start_core<F, Fut>(&self, token: CancellationToken, make: F, ready_mode: bool) -> LcResult
247 where
248 F: Send + 'static + FnOnce(CancellationToken, Option<ReadySignal>) -> Fut,
249 Fut: std::future::Future<Output = TaskResult<()>> + Send + 'static,
250 {
251 let cas_ok = self
253 .status
254 .compare_exchange(
255 Status::Stopped.as_u8(),
256 Status::Starting.as_u8(),
257 Ordering::AcqRel,
258 Ordering::Acquire,
259 )
260 .is_ok();
261 if !cas_ok {
262 return Err(LifecycleError::AlreadyStarted);
263 }
264
265 self.finished.store(false, Ordering::Release);
266 self.was_cancelled.store(false, Ordering::Release);
267
268 {
270 let mut c = self.cancel.lock();
271 *c = Some(token.clone());
272 }
273
274 let (ready_tx, ready_rx) = oneshot::channel::<()>();
277 if ready_mode {
278 let status_on_ready = self.status.clone();
279 tokio::spawn(async move {
280 if ready_rx.await.is_ok() {
281 _ = status_on_ready.compare_exchange(
282 Status::Starting.as_u8(),
283 Status::Running.as_u8(),
284 Ordering::AcqRel,
285 Ordering::Acquire,
286 );
287 tracing::debug!("lifecycle status -> running (ready)");
288 } else {
289 tracing::debug!("ready signal dropped; staying in Starting until finish");
292 }
293 });
294 } else {
295 self.store_status(Status::Running);
296 tracing::debug!("lifecycle status -> running");
297 }
298
299 let finished_flag = self.finished.clone();
300 let finished_notify = self.finished_notify.clone();
301 let status_on_finish = self.status.clone();
302
303 let module_name = self.name;
305 let task_id = format!("{module_name}-{self:p}");
306 let handle = tokio::spawn(async move {
307 tracing::debug!(task_id = %task_id, module = %module_name, "lifecycle task starting");
308 let res = make(token, ready_mode.then(|| ReadySignal(ready_tx))).await;
309 if let Err(e) = res {
310 tracing::error!(error=%e, task_id=%task_id, module = %module_name, "lifecycle task error");
311 }
312 finished_flag.store(true, Ordering::Release);
313 finished_notify.notify_waiters();
314 status_on_finish.store(Status::Stopped.as_u8(), Ordering::Release);
315 tracing::debug!(task_id=%task_id, module = %module_name, "lifecycle task finished");
316 });
317
318 {
320 let mut h = self.handle.lock();
321 *h = Some(handle);
322 }
323
324 Ok(())
325 }
326
327 #[tracing::instrument(skip(self, timeout), level = "debug")]
332 pub async fn stop(&self, timeout: Duration) -> LcResult<StopReason> {
333 let module_name = self.name;
334 let task_id = format!("{module_name}-{self:p}");
335 let st = self.load_status();
336 if !matches!(st, Status::Starting | Status::Running | Status::Stopping) {
337 return Ok(StopReason::Finished);
339 }
340
341 self.store_status(Status::Stopping);
342
343 if let Some(tok) = { self.cancel.lock().take() } {
345 self.was_cancelled.store(true, Ordering::Release);
346 tok.cancel();
347 }
348
349 let finished_flag = self.finished.clone();
351 let notify = self.finished_notify.clone();
352 let finished_wait = async move {
353 if finished_flag.load(Ordering::Acquire) {
354 return;
355 }
356 notify.notified().await;
357 };
358
359 let reason = tokio::select! {
360 () = finished_wait => {
361 if self.was_cancelled.load(Ordering::Acquire) {
362 StopReason::Cancelled
363 } else {
364 StopReason::Finished
365 }
366 }
367 () = tokio::time::sleep(timeout) => StopReason::Timeout,
368 };
369
370 let handle_opt = { self.handle.lock().take() };
372 if let Some(handle) = handle_opt {
373 if matches!(reason, StopReason::Timeout) && !handle.is_finished() {
374 tracing::warn!("lifecycle stop timed out; aborting task");
375 handle.abort();
376 }
377
378 match handle.await {
379 Ok(()) => {
380 tracing::debug!(task_id = %task_id, module = %module_name, "lifecycle task completed successfully");
381 }
382 Err(e) if e.is_cancelled() => {
383 tracing::debug!(task_id = %task_id, module = %module_name, "lifecycle task was cancelled/aborted");
384 }
385 Err(e) if e.is_panic() => {
386 match e.try_into_panic() {
388 Ok(panic_payload) => {
389 let panic_msg = panic_payload
390 .downcast_ref::<&str>()
391 .copied()
392 .map(str::to_owned)
393 .or_else(|| panic_payload.downcast_ref::<String>().cloned())
394 .unwrap_or_else(|| "unknown panic".to_owned());
395
396 tracing::error!(
397 task_id = %task_id,
398 module = %module_name,
399 panic_message = %panic_msg,
400 "lifecycle task panicked - this indicates a serious bug"
401 );
402 }
403 _ => {
404 tracing::error!(
405 task_id = %task_id,
406 module = %module_name,
407 "lifecycle task panicked (could not extract panic message)"
408 );
409 }
410 }
411 }
412 Err(e) => {
413 tracing::warn!(task_id = %task_id, module = %module_name, error = %e, "lifecycle task join error");
414 }
415 }
416
417 self.finished.store(true, Ordering::Release);
418 self.finished_notify.notify_waiters();
419 }
420
421 self.store_status(Status::Stopped);
422 tracing::info!(?reason, "lifecycle stopped");
423 Ok(reason)
424 }
425
426 #[inline]
428 #[must_use]
429 pub fn status(&self) -> Status {
430 self.load_status()
431 }
432
433 #[inline]
435 pub fn is_running(&self) -> bool {
436 matches!(self.status(), Status::Starting | Status::Running)
437 }
438
439 #[inline]
441 #[must_use]
442 pub fn try_start<F, Fut>(&self, make: F) -> bool
443 where
444 F: FnOnce(CancellationToken) -> Fut + Send + 'static,
445 Fut: std::future::Future<Output = TaskResult<()>> + Send + 'static,
446 {
447 self.start(make).is_ok()
448 }
449
450 pub async fn wait_stopped(&self) {
452 if self.finished.load(Ordering::Acquire) {
453 return;
454 }
455 self.finished_notify.notified().await;
456 }
457}
458
/// The default lifecycle is the one produced by [`Lifecycle::new`].
impl Default for Lifecycle {
    fn default() -> Self {
        Self::new()
    }
}
464
465impl Drop for Lifecycle {
466 fn drop(&mut self) {
468 if let Some(tok) = self.cancel.get_mut().take() {
469 tok.cancel();
470 }
471 if let Some(handle) = self.handle.get_mut().take() {
472 handle.abort();
473 }
474 }
475}
476
/// Pairs a [`Runnable`] with its own [`Lifecycle`] so it can be started and
/// stopped as a unit.
#[must_use]
pub struct WithLifecycle<T: Runnable> {
    // The wrapped runnable; shared with the spawned task.
    inner: Arc<T>,
    // Lifecycle that owns the spawned task.
    lc: Arc<Lifecycle>,
    // Graceful-stop budget passed to `Lifecycle::stop`.
    pub(crate) stop_timeout: Duration,
    // When false, the runnable is started without a ready signal.
    await_ready: bool,
    // When true (and `await_ready` is true), `run_ready_fn` drives the run.
    has_ready_handler: bool,
    // Entry point used in ready-handler mode; must be `Some` in that mode.
    run_ready_fn: Option<ReadyFn<T>>,
}
490
491impl<T: Runnable> WithLifecycle<T> {
492 pub fn new(inner: T) -> Self {
493 Self {
494 inner: Arc::new(inner),
495 lc: Arc::new(Lifecycle::new_named(std::any::type_name::<T>())),
496 stop_timeout: Duration::from_secs(30),
497 await_ready: false,
498 has_ready_handler: false,
499 run_ready_fn: None,
500 }
501 }
502
503 pub fn from_arc(inner: Arc<T>) -> Self {
504 Self {
505 inner,
506 lc: Arc::new(Lifecycle::new_named(std::any::type_name::<T>())),
507 stop_timeout: Duration::from_secs(30),
508 await_ready: false,
509 has_ready_handler: false,
510 run_ready_fn: None,
511 }
512 }
513
514 pub fn new_with_name(inner: T, name: &'static str) -> Self {
515 Self {
516 inner: Arc::new(inner),
517 lc: Arc::new(Lifecycle::new_named(name)),
518 stop_timeout: Duration::from_secs(30),
519 await_ready: false,
520 has_ready_handler: false,
521 run_ready_fn: None,
522 }
523 }
524
525 pub fn from_arc_with_name(inner: Arc<T>, name: &'static str) -> Self {
526 Self {
527 inner,
528 lc: Arc::new(Lifecycle::new_named(name)),
529 stop_timeout: Duration::from_secs(30),
530 await_ready: false,
531 has_ready_handler: false,
532 run_ready_fn: None,
533 }
534 }
535
536 pub fn with_stop_timeout(mut self, d: Duration) -> Self {
552 self.stop_timeout = d;
553 self
554 }
555
556 #[inline]
557 #[must_use]
558 pub fn status(&self) -> Status {
559 self.lc.status()
560 }
561
562 #[inline]
563 #[must_use]
564 pub fn inner(&self) -> &T {
565 self.inner.as_ref()
566 }
567
568 #[inline]
570 #[must_use]
571 pub fn inner_arc(&self) -> Arc<T> {
572 self.inner.clone()
573 }
574
575 pub fn with_ready_mode(
577 mut self,
578 await_ready: bool,
579 has_ready_handler: bool,
580 run_ready_fn: Option<ReadyFn<T>>,
581 ) -> Self {
582 self.await_ready = await_ready;
583 self.has_ready_handler = has_ready_handler;
584 self.run_ready_fn = run_ready_fn;
585 self
586 }
587}
588
/// Wraps `T::default()` using [`WithLifecycle::new`].
impl<T: Runnable + Default> Default for WithLifecycle<T> {
    fn default() -> Self {
        Self::new(T::default())
    }
}
594
595#[async_trait]
596impl<T: Runnable> crate::contracts::RunnableCapability for WithLifecycle<T> {
597 #[tracing::instrument(skip(self, external_cancel), level = "debug")]
598 async fn start(&self, external_cancel: CancellationToken) -> TaskResult<()> {
599 let inner = self.inner.clone();
600 let composed = external_cancel.child_token();
601
602 if !self.await_ready {
603 self.lc
604 .start_with_token(composed, move |cancel| inner.run(cancel))
605 .map_err(anyhow::Error::from)
606 } else if self.has_ready_handler {
607 let f = self.run_ready_fn.ok_or_else(|| {
608 anyhow::anyhow!("run_ready_fn must be set when has_ready_handler")
609 })?;
610 self.lc
611 .start_with_ready_and_token(composed, move |cancel, ready| f(inner, cancel, ready))
612 .map_err(anyhow::Error::from)
613 } else {
614 self.lc
615 .start_with_ready_and_token(composed, move |cancel, ready| async move {
616 ready.notify();
618 inner.run(cancel).await
619 })
620 .map_err(anyhow::Error::from)
621 }
622 }
623
624 #[tracing::instrument(skip(self, deadline_token), level = "debug")]
634 async fn stop(&self, deadline_token: CancellationToken) -> TaskResult<()> {
635 tokio::select! {
636 res = self.lc.stop(self.stop_timeout) => {
637 _ = res.map_err(anyhow::Error::from)?;
638 Ok(())
639 }
640 () = deadline_token.cancelled() => {
641 tracing::debug!("Hard-stop deadline reached, aborting lifecycle");
643 _ = self.lc.stop(Duration::from_millis(0)).await?;
644 Ok(())
645 }
646 }
647 }
648}
649
650impl<T: Runnable> Drop for WithLifecycle<T> {
651 fn drop(&mut self) {
653 if Arc::strong_count(&self.lc) == 1 {
654 if let Some(tok) = self.lc.cancel.lock().as_ref() {
655 tok.cancel();
656 }
657 if let Some(handle) = self.lc.handle.lock().as_ref() {
658 handle.abort();
659 }
660 }
661 }
662}
663
// Unit tests for `Lifecycle` and `WithLifecycle`; excluded from coverage on
// nightly coverage builds.
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU32, Ordering as AOrd};
    use tokio::time::{Duration, sleep};

    /// Runnable that bumps a counter every 10 ms until it is cancelled.
    struct TestRunnable {
        counter: AtomicU32,
    }

    impl TestRunnable {
        fn new() -> Self {
            Self {
                counter: AtomicU32::new(0),
            }
        }
        /// Number of ticks observed so far.
        fn count(&self) -> u32 {
            self.counter.load(AOrd::Relaxed)
        }
    }

    #[async_trait::async_trait]
    impl Runnable for TestRunnable {
        async fn run(self: Arc<Self>, cancel: CancellationToken) -> TaskResult<()> {
            let mut interval = tokio::time::interval(Duration::from_millis(10));
            loop {
                tokio::select! {
                    _ = interval.tick() => { self.counter.fetch_add(1, AOrd::Relaxed); }
                    () = cancel.cancelled() => break,
                }
            }
            Ok(())
        }
    }

    /// Start followed by stop brings the lifecycle back to `Stopped`.
    #[tokio::test]
    async fn lifecycle_basic() {
        let lc = Arc::new(Lifecycle::new());
        assert_eq!(lc.status(), Status::Stopped);

        let result = lc.start(|cancel| async move {
            cancel.cancelled().await;
            Ok(())
        });
        assert!(result.is_ok());

        let stop_result = lc.stop(Duration::from_millis(100)).await;
        assert!(stop_result.is_ok());
        assert_eq!(lc.status(), Status::Stopped);
    }

    /// A fresh wrapper is stopped, exposes its inner value, and honours
    /// `with_stop_timeout`.
    #[tokio::test]
    async fn with_lifecycle_wrapper_basics() {
        let runnable = TestRunnable::new();
        let wrapper = WithLifecycle::new(runnable);

        assert_eq!(wrapper.status(), Status::Stopped);
        assert_eq!(wrapper.inner().count(), 0);

        let wrapper = wrapper.with_stop_timeout(Duration::from_mins(1));
        assert_eq!(wrapper.stop_timeout.as_secs(), 60);
    }

    /// After a plain `start` the lifecycle reports an active status.
    #[tokio::test]
    async fn start_sets_running_immediately() {
        let lc = Lifecycle::new();
        lc.start(|cancel| async move {
            cancel.cancelled().await;
            Ok(())
        })
        .unwrap();

        let s = lc.status();
        assert!(matches!(s, Status::Running | Status::Starting));

        let _ = lc.stop(Duration::from_millis(50)).await.unwrap();
        assert_eq!(lc.status(), Status::Stopped);
    }

    /// In ready mode the status stays `Starting` until the task signals
    /// readiness, then becomes `Running`; stop returns it to `Stopped`.
    #[tokio::test]
    async fn start_with_ready_transitions_and_stop() {
        let lc = Lifecycle::new();

        // Gate that lets the test decide when the task signals readiness.
        let (ready_tx, ready_rx) = oneshot::channel::<()>();
        lc.start_with_ready(move |cancel, ready| async move {
            _ = ready_rx.await;
            ready.notify();
            cancel.cancelled().await;
            Ok(())
        })
        .unwrap();

        assert_eq!(lc.status(), Status::Starting);

        _ = ready_tx.send(());
        // Give the task and the ready watcher a chance to run.
        sleep(Duration::from_millis(10)).await;
        assert_eq!(lc.status(), Status::Running);

        let reason = lc.stop(Duration::from_millis(100)).await.unwrap();
        assert!(matches!(
            reason,
            StopReason::Cancelled | StopReason::Finished
        ));
        assert_eq!(lc.status(), Status::Stopped);
    }

    /// Stopping a task that never signalled readiness still ends in `Stopped`.
    #[tokio::test]
    async fn stop_while_starting_before_ready() {
        let lc = Lifecycle::new();

        lc.start_with_ready(move |cancel, _ready| async move {
            cancel.cancelled().await;
            Ok(())
        })
        .unwrap();

        assert_eq!(lc.status(), Status::Starting);

        let reason = lc.stop(Duration::from_millis(100)).await.unwrap();
        assert!(matches!(
            reason,
            StopReason::Cancelled | StopReason::Finished
        ));
        assert_eq!(lc.status(), Status::Stopped);
    }

    /// A task that ignores cancellation is reported as `Timeout` and the
    /// lifecycle still ends in `Stopped`.
    #[tokio::test]
    async fn timeout_path_aborts_and_notifies() {
        let lc = Lifecycle::new();

        lc.start(|_cancel| async move {
            loop {
                sleep(Duration::from_secs(1000)).await;
            }
            #[allow(unreachable_code)]
            Ok::<_, anyhow::Error>(())
        })
        .unwrap();

        let reason = lc.stop(Duration::from_millis(30)).await.unwrap();
        assert_eq!(reason, StopReason::Timeout);
        assert_eq!(lc.status(), Status::Stopped);
    }

    /// `try_start` succeeds once; a second start while active fails with
    /// `AlreadyStarted`.
    #[tokio::test]
    async fn try_start_and_second_start_fails() {
        let lc = Lifecycle::new();

        assert!(lc.try_start(|cancel| async move {
            cancel.cancelled().await;
            Ok(())
        }));

        let err = lc.start(|_c| async { Ok(()) }).unwrap_err();
        match err {
            LifecycleError::AlreadyStarted => {}
        }

        let _ = lc.stop(Duration::from_millis(80)).await.unwrap();
        assert_eq!(lc.status(), Status::Stopped);
    }

    /// Two concurrent `stop` calls both succeed and leave the lifecycle `Stopped`.
    #[tokio::test]
    async fn stop_is_idempotent_and_safe_concurrent() {
        let lc = Arc::new(Lifecycle::new());

        lc.start(|cancel| async move {
            cancel.cancelled().await;
            Ok(())
        })
        .unwrap();

        let a = lc.clone();
        let b = lc.clone();
        let (r1, r2) = tokio::join!(
            async move { a.stop(Duration::from_millis(80)).await },
            async move { b.stop(Duration::from_millis(80)).await },
        );

        let r1 = r1.unwrap();
        let r2 = r2.unwrap();
        assert!(matches!(
            r1,
            StopReason::Finished | StopReason::Cancelled | StopReason::Timeout
        ));
        assert!(matches!(
            r2,
            StopReason::Finished | StopReason::Cancelled | StopReason::Timeout
        ));
        assert_eq!(lc.status(), Status::Stopped);
    }

    /// Start/stop through the `RunnableCapability` interface round-trips the status.
    #[tokio::test]
    async fn stateful_wrapper_start_stop_roundtrip() {
        use crate::contracts::RunnableCapability;

        let wrapper = WithLifecycle::new(TestRunnable::new());
        assert_eq!(wrapper.status(), Status::Stopped);

        wrapper.start(CancellationToken::new()).await.unwrap();
        assert!(wrapper.lc.is_running());

        wrapper.stop(CancellationToken::new()).await.unwrap();
        assert_eq!(wrapper.status(), Status::Stopped);
    }

    /// Starting an already started wrapper returns an error.
    #[tokio::test]
    async fn with_lifecycle_double_start_fails() {
        use crate::contracts::RunnableCapability;

        let wrapper = WithLifecycle::new(TestRunnable::new());
        let cancel = CancellationToken::new();
        wrapper.start(cancel.clone()).await.unwrap();
        let err = wrapper.start(cancel).await;
        assert!(err.is_err());
        wrapper.stop(CancellationToken::new()).await.unwrap();
    }

    /// Two concurrent wrapper-level stops both succeed.
    #[tokio::test]
    async fn with_lifecycle_concurrent_stop_calls() {
        use crate::contracts::RunnableCapability;
        let wrapper = Arc::new(WithLifecycle::new(TestRunnable::new()));
        wrapper.start(CancellationToken::new()).await.unwrap();
        let a = wrapper.clone();
        let b = wrapper.clone();
        let (r1, r2) = tokio::join!(
            async move { a.stop(CancellationToken::new()).await },
            async move { b.stop(CancellationToken::new()).await },
        );
        assert!(r1.is_ok());
        assert!(r2.is_ok());
        assert_eq!(wrapper.status(), Status::Stopped);
    }

    /// A panicking task does not break `stop`: any stop reason is accepted and
    /// the lifecycle ends in `Stopped`.
    #[tokio::test]
    async fn lifecycle_handles_panics_properly() {
        let lc = Lifecycle::new();

        lc.start(|_cancel| async {
            panic!("test panic message");
        })
        .unwrap();

        // Let the task run (and panic) before stopping.
        tokio::time::sleep(Duration::from_millis(50)).await;

        let reason = lc.stop(Duration::from_secs(1)).await.unwrap();

        assert!(matches!(
            reason,
            StopReason::Finished | StopReason::Cancelled | StopReason::Timeout
        ));
        assert_eq!(lc.status(), Status::Stopped);
    }

    /// Smoke test of the start/stop path that emits the task-id log lines; it
    /// asserts only on status and stop reason, not on log output.
    #[tokio::test]
    async fn lifecycle_task_naming_and_logging() {
        let lc = Lifecycle::new();

        lc.start(|cancel| async move {
            cancel.cancelled().await;
            Ok(())
        })
        .unwrap();

        assert!(lc.is_running());

        let reason = lc.stop(Duration::from_millis(100)).await.unwrap();
        assert!(matches!(
            reason,
            StopReason::Cancelled | StopReason::Finished
        ));
        assert_eq!(lc.status(), Status::Stopped);
    }

    /// `stop` waits for the task itself: the task sleeps 10 ms before it looks
    /// at the token, so stopping must take at least that long.
    #[tokio::test]
    async fn lifecycle_join_handles_all_tasks() {
        let lc = Arc::new(Lifecycle::new());

        lc.start(|cancel| async move {
            tokio::time::sleep(Duration::from_millis(10)).await;
            cancel.cancelled().await;
            Ok(())
        })
        .unwrap();

        let start = std::time::Instant::now();
        let reason = lc.stop(Duration::from_millis(200)).await.unwrap();
        let elapsed = start.elapsed();

        assert!(elapsed >= Duration::from_millis(10));
        assert!(matches!(
            reason,
            StopReason::Cancelled | StopReason::Finished
        ));
        assert_eq!(lc.status(), Status::Stopped);
    }
}