1use async_trait::async_trait;
2use parking_lot::Mutex;
3use std::sync::{
4 Arc,
5 atomic::{AtomicBool, AtomicU8, Ordering},
6};
7use std::time::Duration;
8use tokio::sync::{Notify, oneshot};
9use tokio::task::JoinHandle;
10use tokio_util::sync::CancellationToken;
11
/// Result alias for lifecycle control operations; the error type is [`LifecycleError`].
type LcResult<T = ()> = std::result::Result<T, LifecycleError>;
16
/// Result alias for the futures driven by a lifecycle; errors are carried as `anyhow::Error`.
type TaskResult<T = ()> = anyhow::Result<T>;
19
/// Plain function pointer used by [`WithLifecycle`] to run a `T` in ready-aware mode.
///
/// It receives the shared runnable, the cancellation token for the task and the
/// [`ReadySignal`] to fire, and returns a boxed `Send` future resolving to the task result.
type ReadyFn<T> = fn(
    Arc<T>,
    CancellationToken,
    ReadySignal,
)
    -> std::pin::Pin<Box<dyn std::future::Future<Output = TaskResult<()>> + Send>>;
27
/// Coarse state of a lifecycle-managed task.
///
/// The numeric value of each variant is what gets stored in the atomic status cell,
/// so the discriminants are spelled out explicitly.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum Status {
    /// No task is active.
    Stopped = 0,
    /// A task has been launched but has not been marked as running yet.
    Starting = 1,
    /// The task is running.
    Running = 2,
    /// A stop has been requested and is in progress.
    Stopping = 3,
}

impl Status {
    /// Returns the `u8` encoding of this status (its `repr(u8)` discriminant).
    #[inline]
    #[must_use]
    pub const fn as_u8(self) -> u8 {
        self as u8
    }

    /// Decodes a `u8` produced by [`Status::as_u8`].
    ///
    /// Any value that does not correspond to a variant decodes to [`Status::Stopped`].
    #[inline]
    #[must_use]
    pub const fn from_u8(x: u8) -> Self {
        const STARTING: u8 = Status::Starting as u8;
        const RUNNING: u8 = Status::Running as u8;
        const STOPPING: u8 = Status::Stopping as u8;

        match x {
            STARTING => Self::Starting,
            RUNNING => Self::Running,
            STOPPING => Self::Stopping,
            _ => Self::Stopped,
        }
    }
}
62
/// Outcome reported by [`Lifecycle::stop`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StopReason {
    /// The task was already stopped, or it completed without `stop` having cancelled its token.
    Finished,
    /// `stop` cancelled the task's token and the task completed within the timeout.
    Cancelled,
    /// The task did not complete within the timeout given to `stop`.
    Timeout,
}
70
71pub struct ReadySignal(oneshot::Sender<()>);
75
76impl ReadySignal {
77 #[inline]
78 pub fn notify(self) {
79 _ = self.0.send(());
80 }
81 #[inline]
83 #[must_use]
84 pub fn from_sender(sender: tokio::sync::oneshot::Sender<()>) -> Self {
85 ReadySignal(sender)
86 }
87}
88
/// A long-running unit of work that can be driven by [`WithLifecycle`].
#[async_trait]
pub trait Runnable: Send + Sync + 'static {
    /// Runs the work until it completes or fails.
    ///
    /// `cancel` is the token the lifecycle cancels when a stop is requested.
    // NOTE(review): implementations are presumably expected to return promptly once
    // `cancel` fires; `Lifecycle::stop` aborts the task if they do not within its timeout.
    async fn run(self: Arc<Self>, cancel: CancellationToken) -> TaskResult<()>;
}
98
/// Errors returned by lifecycle control operations.
#[derive(Debug, thiserror::Error)]
pub enum LifecycleError {
    /// A start was requested while the lifecycle was not in the `Stopped` state.
    #[error("already started")]
    AlreadyStarted,
}
107
/// Controller for a single spawned background task: start, cooperative stop with
/// timeout, and status tracking.
pub struct Lifecycle {
    /// Name attached to log records and used as a prefix of the per-task id.
    name: &'static str,
    /// Current [`Status`], stored as its `u8` encoding; shared with the spawned task.
    status: Arc<AtomicU8>,
    /// Join handle of the spawned task; set on start, taken by `stop`, aborted on drop.
    handle: Mutex<Option<JoinHandle<()>>>,
    /// Cancellation token of the current run; set on start, taken and cancelled by `stop`.
    cancel: Mutex<Option<CancellationToken>>,
    /// Set to `true` when the task has completed; reset to `false` on each start.
    finished: Arc<AtomicBool>,
    /// Set to `true` when `stop` cancelled the token; reset to `false` on each start.
    was_cancelled: Arc<AtomicBool>,
    /// Wakes waiters (`stop`, `wait_stopped`) when the task completes.
    finished_notify: Arc<Notify>,
}
128
129impl Lifecycle {
130 #[must_use]
131 pub fn new_named(name: &'static str) -> Self {
132 Self {
133 name,
134 status: Arc::new(AtomicU8::new(Status::Stopped.as_u8())),
135 handle: Mutex::new(None),
136 cancel: Mutex::new(None),
137 finished: Arc::new(AtomicBool::new(false)),
138 was_cancelled: Arc::new(AtomicBool::new(false)),
139 finished_notify: Arc::new(Notify::new()),
140 }
141 }
142
143 #[must_use]
144 pub fn new() -> Self {
145 Self::new_named("lifecycle")
146 }
147
148 #[inline]
149 pub fn name(&self) -> &'static str {
150 self.name
151 }
152
153 #[inline]
156 fn load_status(&self) -> Status {
157 Status::from_u8(self.status.load(Ordering::Acquire))
158 }
159
160 #[inline]
161 fn store_status(&self, s: Status) {
162 self.status.store(s.as_u8(), Ordering::Release);
163 }
164
165 #[tracing::instrument(skip(self, make), level = "debug")]
175 pub fn start<F, Fut>(&self, make: F) -> LcResult
176 where
177 F: FnOnce(CancellationToken) -> Fut + Send + 'static,
178 Fut: std::future::Future<Output = TaskResult<()>> + Send + 'static,
179 {
180 self.start_core(CancellationToken::new(), move |tok, _| make(tok), false)
181 }
182
183 #[tracing::instrument(skip(self, make, token), level = "debug")]
188 pub fn start_with_token<F, Fut>(&self, token: CancellationToken, make: F) -> LcResult
189 where
190 F: FnOnce(CancellationToken) -> Fut + Send + 'static,
191 Fut: std::future::Future<Output = TaskResult<()>> + Send + 'static,
192 {
193 self.start_core(token, move |tok, _| make(tok), false)
194 }
195
196 #[tracing::instrument(skip(self, make), level = "debug")]
201 pub fn start_with_ready<F, Fut>(&self, make: F) -> LcResult
202 where
203 F: FnOnce(CancellationToken, ReadySignal) -> Fut + Send + 'static,
204 Fut: std::future::Future<Output = TaskResult<()>> + Send + 'static,
205 {
206 self.start_core(
207 CancellationToken::new(),
208 move |tok, rdy| async move {
209 let Some(rdy) = rdy else {
210 return Err(anyhow::anyhow!("ReadySignal must be present"));
211 };
212 make(tok, rdy).await
213 },
214 true,
215 )
216 }
217
218 #[tracing::instrument(skip(self, make, token), level = "debug")]
223 pub fn start_with_ready_and_token<F, Fut>(&self, token: CancellationToken, make: F) -> LcResult
224 where
225 F: FnOnce(CancellationToken, ReadySignal) -> Fut + Send + 'static,
226 Fut: std::future::Future<Output = TaskResult<()>> + Send + 'static,
227 {
228 self.start_core(
229 token,
230 move |tok, rdy| async move {
231 let Some(rdy) = rdy else {
232 return Err(anyhow::anyhow!("ReadySignal must be present"));
233 };
234 make(tok, rdy).await
235 },
236 true,
237 )
238 }
239
240 fn start_core<F, Fut>(&self, token: CancellationToken, make: F, ready_mode: bool) -> LcResult
245 where
246 F: Send + 'static + FnOnce(CancellationToken, Option<ReadySignal>) -> Fut,
247 Fut: std::future::Future<Output = TaskResult<()>> + Send + 'static,
248 {
249 let cas_ok = self
251 .status
252 .compare_exchange(
253 Status::Stopped.as_u8(),
254 Status::Starting.as_u8(),
255 Ordering::AcqRel,
256 Ordering::Acquire,
257 )
258 .is_ok();
259 if !cas_ok {
260 return Err(LifecycleError::AlreadyStarted);
261 }
262
263 self.finished.store(false, Ordering::Release);
264 self.was_cancelled.store(false, Ordering::Release);
265
266 {
268 let mut c = self.cancel.lock();
269 *c = Some(token.clone());
270 }
271
272 let (ready_tx, ready_rx) = oneshot::channel::<()>();
275 if ready_mode {
276 let status_on_ready = self.status.clone();
277 tokio::spawn(async move {
278 if ready_rx.await.is_ok() {
279 _ = status_on_ready.compare_exchange(
280 Status::Starting.as_u8(),
281 Status::Running.as_u8(),
282 Ordering::AcqRel,
283 Ordering::Acquire,
284 );
285 tracing::debug!("lifecycle status -> running (ready)");
286 } else {
287 tracing::debug!("ready signal dropped; staying in Starting until finish");
290 }
291 });
292 } else {
293 self.store_status(Status::Running);
294 tracing::debug!("lifecycle status -> running");
295 }
296
297 let finished_flag = self.finished.clone();
298 let finished_notify = self.finished_notify.clone();
299 let status_on_finish = self.status.clone();
300
301 let module_name = self.name;
303 let task_id = format!("{module_name}-{self:p}");
304 let handle = tokio::spawn(async move {
305 tracing::debug!(task_id = %task_id, module = %module_name, "lifecycle task starting");
306 let res = make(token, ready_mode.then(|| ReadySignal(ready_tx))).await;
307 if let Err(e) = res {
308 tracing::error!(error=%e, task_id=%task_id, module = %module_name, "lifecycle task error");
309 }
310 finished_flag.store(true, Ordering::Release);
311 finished_notify.notify_waiters();
312 status_on_finish.store(Status::Stopped.as_u8(), Ordering::Release);
313 tracing::debug!(task_id=%task_id, module = %module_name, "lifecycle task finished");
314 });
315
316 {
318 let mut h = self.handle.lock();
319 *h = Some(handle);
320 }
321
322 Ok(())
323 }
324
325 #[tracing::instrument(skip(self, timeout), level = "debug")]
330 pub async fn stop(&self, timeout: Duration) -> LcResult<StopReason> {
331 let module_name = self.name;
332 let task_id = format!("{module_name}-{self:p}");
333 let st = self.load_status();
334 if !matches!(st, Status::Starting | Status::Running | Status::Stopping) {
335 return Ok(StopReason::Finished);
337 }
338
339 self.store_status(Status::Stopping);
340
341 if let Some(tok) = { self.cancel.lock().take() } {
343 self.was_cancelled.store(true, Ordering::Release);
344 tok.cancel();
345 }
346
347 let finished_flag = self.finished.clone();
349 let notify = self.finished_notify.clone();
350 let finished_wait = async move {
351 if finished_flag.load(Ordering::Acquire) {
352 return;
353 }
354 notify.notified().await;
355 };
356
357 let reason = tokio::select! {
358 () = finished_wait => {
359 if self.was_cancelled.load(Ordering::Acquire) {
360 StopReason::Cancelled
361 } else {
362 StopReason::Finished
363 }
364 }
365 () = tokio::time::sleep(timeout) => StopReason::Timeout,
366 };
367
368 let handle_opt = { self.handle.lock().take() };
370 if let Some(handle) = handle_opt {
371 if matches!(reason, StopReason::Timeout) && !handle.is_finished() {
372 tracing::warn!("lifecycle stop timed out; aborting task");
373 handle.abort();
374 }
375
376 match handle.await {
377 Ok(()) => {
378 tracing::debug!(task_id = %task_id, module = %module_name, "lifecycle task completed successfully");
379 }
380 Err(e) if e.is_cancelled() => {
381 tracing::debug!(task_id = %task_id, module = %module_name, "lifecycle task was cancelled/aborted");
382 }
383 Err(e) if e.is_panic() => {
384 match e.try_into_panic() {
386 Ok(panic_payload) => {
387 let panic_msg = panic_payload
388 .downcast_ref::<&str>()
389 .copied()
390 .map(str::to_owned)
391 .or_else(|| panic_payload.downcast_ref::<String>().cloned())
392 .unwrap_or_else(|| "unknown panic".to_owned());
393
394 tracing::error!(
395 task_id = %task_id,
396 module = %module_name,
397 panic_message = %panic_msg,
398 "lifecycle task panicked - this indicates a serious bug"
399 );
400 }
401 _ => {
402 tracing::error!(
403 task_id = %task_id,
404 module = %module_name,
405 "lifecycle task panicked (could not extract panic message)"
406 );
407 }
408 }
409 }
410 Err(e) => {
411 tracing::warn!(task_id = %task_id, module = %module_name, error = %e, "lifecycle task join error");
412 }
413 }
414
415 self.finished.store(true, Ordering::Release);
416 self.finished_notify.notify_waiters();
417 }
418
419 self.store_status(Status::Stopped);
420 tracing::info!(?reason, "lifecycle stopped");
421 Ok(reason)
422 }
423
424 #[inline]
426 #[must_use]
427 pub fn status(&self) -> Status {
428 self.load_status()
429 }
430
431 #[inline]
433 pub fn is_running(&self) -> bool {
434 matches!(self.status(), Status::Starting | Status::Running)
435 }
436
437 #[inline]
439 #[must_use]
440 pub fn try_start<F, Fut>(&self, make: F) -> bool
441 where
442 F: FnOnce(CancellationToken) -> Fut + Send + 'static,
443 Fut: std::future::Future<Output = TaskResult<()>> + Send + 'static,
444 {
445 self.start(make).is_ok()
446 }
447
448 pub async fn wait_stopped(&self) {
450 if self.finished.load(Ordering::Acquire) {
451 return;
452 }
453 self.finished_notify.notified().await;
454 }
455}
456
457impl Default for Lifecycle {
458 fn default() -> Self {
459 Self::new()
460 }
461}
462
463impl Drop for Lifecycle {
464 fn drop(&mut self) {
466 if let Some(tok) = self.cancel.get_mut().take() {
467 tok.cancel();
468 }
469 if let Some(handle) = self.handle.get_mut().take() {
470 handle.abort();
471 }
472 }
473}
474
/// Pairs a [`Runnable`] with a [`Lifecycle`] so it can be started and stopped
/// through `RunnableCapability`.
#[must_use]
pub struct WithLifecycle<T: Runnable> {
    /// The wrapped runnable, shared with the spawned task.
    inner: Arc<T>,
    /// Lifecycle controller driving the runnable's task.
    lc: Arc<Lifecycle>,
    /// Timeout passed to `Lifecycle::stop` when stopping via `RunnableCapability::stop`.
    pub(crate) stop_timeout: Duration,
    /// When `true`, the task is started in ready-aware mode.
    await_ready: bool,
    /// When `true` (and `await_ready` is set), `run_ready_fn` is used to run the task.
    has_ready_handler: bool,
    /// Ready-aware entry point; must be set when `has_ready_handler` is `true`.
    run_ready_fn: Option<ReadyFn<T>>,
}
488
489impl<T: Runnable> WithLifecycle<T> {
490 pub fn new(inner: T) -> Self {
491 Self {
492 inner: Arc::new(inner),
493 lc: Arc::new(Lifecycle::new_named(std::any::type_name::<T>())),
494 stop_timeout: Duration::from_secs(30),
495 await_ready: false,
496 has_ready_handler: false,
497 run_ready_fn: None,
498 }
499 }
500
501 pub fn from_arc(inner: Arc<T>) -> Self {
502 Self {
503 inner,
504 lc: Arc::new(Lifecycle::new_named(std::any::type_name::<T>())),
505 stop_timeout: Duration::from_secs(30),
506 await_ready: false,
507 has_ready_handler: false,
508 run_ready_fn: None,
509 }
510 }
511
512 pub fn new_with_name(inner: T, name: &'static str) -> Self {
513 Self {
514 inner: Arc::new(inner),
515 lc: Arc::new(Lifecycle::new_named(name)),
516 stop_timeout: Duration::from_secs(30),
517 await_ready: false,
518 has_ready_handler: false,
519 run_ready_fn: None,
520 }
521 }
522
523 pub fn from_arc_with_name(inner: Arc<T>, name: &'static str) -> Self {
524 Self {
525 inner,
526 lc: Arc::new(Lifecycle::new_named(name)),
527 stop_timeout: Duration::from_secs(30),
528 await_ready: false,
529 has_ready_handler: false,
530 run_ready_fn: None,
531 }
532 }
533
534 pub fn with_stop_timeout(mut self, d: Duration) -> Self {
535 self.stop_timeout = d;
536 self
537 }
538
539 #[inline]
540 #[must_use]
541 pub fn status(&self) -> Status {
542 self.lc.status()
543 }
544
545 #[inline]
546 #[must_use]
547 pub fn inner(&self) -> &T {
548 self.inner.as_ref()
549 }
550
551 #[inline]
553 #[must_use]
554 pub fn inner_arc(&self) -> Arc<T> {
555 self.inner.clone()
556 }
557
558 pub fn with_ready_mode(
560 mut self,
561 await_ready: bool,
562 has_ready_handler: bool,
563 run_ready_fn: Option<ReadyFn<T>>,
564 ) -> Self {
565 self.await_ready = await_ready;
566 self.has_ready_handler = has_ready_handler;
567 self.run_ready_fn = run_ready_fn;
568 self
569 }
570}
571
572impl<T: Runnable + Default> Default for WithLifecycle<T> {
573 fn default() -> Self {
574 Self::new(T::default())
575 }
576}
577
578#[async_trait]
579impl<T: Runnable> crate::contracts::RunnableCapability for WithLifecycle<T> {
580 #[tracing::instrument(skip(self, external_cancel), level = "debug")]
581 async fn start(&self, external_cancel: CancellationToken) -> TaskResult<()> {
582 let inner = self.inner.clone();
583 let composed = external_cancel.child_token();
584
585 if !self.await_ready {
586 self.lc
587 .start_with_token(composed, move |cancel| inner.run(cancel))
588 .map_err(anyhow::Error::from)
589 } else if self.has_ready_handler {
590 let f = self.run_ready_fn.ok_or_else(|| {
591 anyhow::anyhow!("run_ready_fn must be set when has_ready_handler")
592 })?;
593 self.lc
594 .start_with_ready_and_token(composed, move |cancel, ready| f(inner, cancel, ready))
595 .map_err(anyhow::Error::from)
596 } else {
597 self.lc
598 .start_with_ready_and_token(composed, move |cancel, ready| async move {
599 ready.notify();
601 inner.run(cancel).await
602 })
603 .map_err(anyhow::Error::from)
604 }
605 }
606
607 #[tracing::instrument(skip(self, external_cancel), level = "debug")]
608 async fn stop(&self, external_cancel: CancellationToken) -> TaskResult<()> {
609 tokio::select! {
610 res = self.lc.stop(self.stop_timeout) => {
611 _ = res.map_err(anyhow::Error::from)?;
612 Ok(())
613 }
614 () = external_cancel.cancelled() => {
615 _ = self.lc.stop(Duration::from_millis(0)).await?;
616 Ok(())
617 }
618 }
619 }
620}
621
622impl<T: Runnable> Drop for WithLifecycle<T> {
623 fn drop(&mut self) {
625 if Arc::strong_count(&self.lc) == 1 {
626 if let Some(tok) = self.lc.cancel.lock().as_ref() {
627 tok.cancel();
628 }
629 if let Some(handle) = self.lc.handle.lock().as_ref() {
630 handle.abort();
631 }
632 }
633 }
634}
635
// Unit tests for `Lifecycle` and `WithLifecycle`; excluded from coverage on nightly.
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU32, Ordering as AOrd};
    use tokio::time::{Duration, sleep};

    /// Runnable that increments a counter on every 10 ms tick until cancelled.
    struct TestRunnable {
        counter: AtomicU32,
    }

    impl TestRunnable {
        fn new() -> Self {
            Self {
                counter: AtomicU32::new(0),
            }
        }
        /// Number of ticks observed so far.
        fn count(&self) -> u32 {
            self.counter.load(AOrd::Relaxed)
        }
    }

    #[async_trait::async_trait]
    impl Runnable for TestRunnable {
        async fn run(self: Arc<Self>, cancel: CancellationToken) -> TaskResult<()> {
            let mut interval = tokio::time::interval(Duration::from_millis(10));
            loop {
                tokio::select! {
                    _ = interval.tick() => { self.counter.fetch_add(1, AOrd::Relaxed); }
                    () = cancel.cancelled() => break,
                }
            }
            Ok(())
        }
    }

    /// A started lifecycle can be stopped and ends up `Stopped`.
    #[tokio::test]
    async fn lifecycle_basic() {
        let lc = Arc::new(Lifecycle::new());
        assert_eq!(lc.status(), Status::Stopped);

        let result = lc.start(|cancel| async move {
            cancel.cancelled().await;
            Ok(())
        });
        assert!(result.is_ok());

        let stop_result = lc.stop(Duration::from_millis(100)).await;
        assert!(stop_result.is_ok());
        assert_eq!(lc.status(), Status::Stopped);
    }

    /// A fresh wrapper is stopped, exposes its inner value, and honours `with_stop_timeout`.
    #[tokio::test]
    async fn with_lifecycle_wrapper_basics() {
        let runnable = TestRunnable::new();
        let wrapper = WithLifecycle::new(runnable);

        assert_eq!(wrapper.status(), Status::Stopped);
        assert_eq!(wrapper.inner().count(), 0);

        let wrapper = wrapper.with_stop_timeout(Duration::from_secs(60));
        assert_eq!(wrapper.stop_timeout.as_secs(), 60);
    }

    /// After a plain `start` the lifecycle reports an active status without any ready handshake.
    #[tokio::test]
    async fn start_sets_running_immediately() {
        let lc = Lifecycle::new();
        lc.start(|cancel| async move {
            cancel.cancelled().await;
            Ok(())
        })
        .unwrap();

        let s = lc.status();
        assert!(matches!(s, Status::Running | Status::Starting));

        let _ = lc.stop(Duration::from_millis(50)).await.unwrap();
        assert_eq!(lc.status(), Status::Stopped);
    }

    /// In ready mode the status stays `Starting` until the task fires its ready signal.
    #[tokio::test]
    async fn start_with_ready_transitions_and_stop() {
        let lc = Lifecycle::new();

        // Test-side channel used to control when the task reports readiness.
        let (ready_tx, ready_rx) = oneshot::channel::<()>();
        lc.start_with_ready(move |cancel, ready| async move {
            _ = ready_rx.await;
            ready.notify();
            cancel.cancelled().await;
            Ok(())
        })
        .unwrap();

        assert_eq!(lc.status(), Status::Starting);

        _ = ready_tx.send(());
        // Give the task and the ready watcher a chance to run.
        sleep(Duration::from_millis(10)).await;
        assert_eq!(lc.status(), Status::Running);

        let reason = lc.stop(Duration::from_millis(100)).await.unwrap();
        assert!(matches!(
            reason,
            StopReason::Cancelled | StopReason::Finished
        ));
        assert_eq!(lc.status(), Status::Stopped);
    }

    /// Stopping a ready-mode task that never signalled readiness still ends in `Stopped`.
    #[tokio::test]
    async fn stop_while_starting_before_ready() {
        let lc = Lifecycle::new();

        lc.start_with_ready(move |cancel, _ready| async move {
            cancel.cancelled().await;
            Ok(())
        })
        .unwrap();

        assert_eq!(lc.status(), Status::Starting);

        let reason = lc.stop(Duration::from_millis(100)).await.unwrap();
        assert!(matches!(
            reason,
            StopReason::Cancelled | StopReason::Finished
        ));
        assert_eq!(lc.status(), Status::Stopped);
    }

    /// A task that ignores cancellation makes `stop` report `Timeout` and end in `Stopped`.
    #[tokio::test]
    async fn timeout_path_aborts_and_notifies() {
        let lc = Lifecycle::new();

        lc.start(|_cancel| async move {
            // Never observes the token, so only an abort can end this task.
            loop {
                sleep(Duration::from_secs(1000)).await;
            }
            #[allow(unreachable_code)]
            Ok::<_, anyhow::Error>(())
        })
        .unwrap();

        let reason = lc.stop(Duration::from_millis(30)).await.unwrap();
        assert_eq!(reason, StopReason::Timeout);
        assert_eq!(lc.status(), Status::Stopped);
    }

    /// `try_start` succeeds once; a second start while active fails with `AlreadyStarted`.
    #[tokio::test]
    async fn try_start_and_second_start_fails() {
        let lc = Lifecycle::new();

        assert!(lc.try_start(|cancel| async move {
            cancel.cancelled().await;
            Ok(())
        }));

        let err = lc.start(|_c| async { Ok(()) }).unwrap_err();
        match err {
            LifecycleError::AlreadyStarted => {}
        }

        let _ = lc.stop(Duration::from_millis(80)).await.unwrap();
        assert_eq!(lc.status(), Status::Stopped);
    }

    /// Two concurrent `stop` calls both succeed and leave the lifecycle `Stopped`.
    #[tokio::test]
    async fn stop_is_idempotent_and_safe_concurrent() {
        let lc = Arc::new(Lifecycle::new());

        lc.start(|cancel| async move {
            cancel.cancelled().await;
            Ok(())
        })
        .unwrap();

        let a = lc.clone();
        let b = lc.clone();
        let (r1, r2) = tokio::join!(
            async move { a.stop(Duration::from_millis(80)).await },
            async move { b.stop(Duration::from_millis(80)).await },
        );

        let r1 = r1.unwrap();
        let r2 = r2.unwrap();
        // Any reason is acceptable here; only success and the final status matter.
        assert!(matches!(
            r1,
            StopReason::Finished | StopReason::Cancelled | StopReason::Timeout
        ));
        assert!(matches!(
            r2,
            StopReason::Finished | StopReason::Cancelled | StopReason::Timeout
        ));
        assert_eq!(lc.status(), Status::Stopped);
    }

    /// Start/stop through the `RunnableCapability` trait round-trips back to `Stopped`.
    #[tokio::test]
    async fn stateful_wrapper_start_stop_roundtrip() {
        use crate::contracts::RunnableCapability;

        let wrapper = WithLifecycle::new(TestRunnable::new());
        assert_eq!(wrapper.status(), Status::Stopped);

        wrapper.start(CancellationToken::new()).await.unwrap();
        assert!(wrapper.lc.is_running());

        wrapper.stop(CancellationToken::new()).await.unwrap();
        assert_eq!(wrapper.status(), Status::Stopped);
    }

    /// Starting an already started wrapper returns an error.
    #[tokio::test]
    async fn with_lifecycle_double_start_fails() {
        use crate::contracts::RunnableCapability;

        let wrapper = WithLifecycle::new(TestRunnable::new());
        let cancel = CancellationToken::new();
        wrapper.start(cancel.clone()).await.unwrap();
        let err = wrapper.start(cancel).await;
        assert!(err.is_err());
        wrapper.stop(CancellationToken::new()).await.unwrap();
    }

    /// Two concurrent trait-level `stop` calls both succeed.
    #[tokio::test]
    async fn with_lifecycle_concurrent_stop_calls() {
        use crate::contracts::RunnableCapability;
        let wrapper = Arc::new(WithLifecycle::new(TestRunnable::new()));
        wrapper.start(CancellationToken::new()).await.unwrap();
        let a = wrapper.clone();
        let b = wrapper.clone();
        let (r1, r2) = tokio::join!(
            async move { a.stop(CancellationToken::new()).await },
            async move { b.stop(CancellationToken::new()).await },
        );
        assert!(r1.is_ok());
        assert!(r2.is_ok());
        assert_eq!(wrapper.status(), Status::Stopped);
    }

    /// A panicking task does not break `stop`; the lifecycle still ends in `Stopped`.
    #[tokio::test]
    async fn lifecycle_handles_panics_properly() {
        let lc = Lifecycle::new();

        lc.start(|_cancel| async {
            panic!("test panic message");
        })
        .unwrap();

        // Let the spawned task run (and panic) before stopping.
        tokio::time::sleep(Duration::from_millis(50)).await;

        let reason = lc.stop(Duration::from_millis(1000)).await.unwrap();

        // The reported reason is not pinned down; only a successful stop is required.
        assert!(matches!(
            reason,
            StopReason::Finished | StopReason::Cancelled | StopReason::Timeout
        ));
        assert_eq!(lc.status(), Status::Stopped);
    }

    /// Smoke test of the start/stop path that emits the task-id log records.
    #[tokio::test]
    async fn lifecycle_task_naming_and_logging() {
        let lc = Lifecycle::new();

        lc.start(|cancel| async move {
            cancel.cancelled().await;
            Ok(())
        })
        .unwrap();

        assert!(lc.is_running());

        let reason = lc.stop(Duration::from_millis(100)).await.unwrap();
        assert!(matches!(
            reason,
            StopReason::Cancelled | StopReason::Finished
        ));
        assert_eq!(lc.status(), Status::Stopped);
    }

    /// `stop` waits for a task that needs some time before it observes cancellation.
    #[tokio::test]
    async fn lifecycle_join_handles_all_tasks() {
        let lc = Arc::new(Lifecycle::new());

        lc.start(|cancel| async move {
            // The task only looks at the token after this initial delay.
            tokio::time::sleep(Duration::from_millis(10)).await;
            cancel.cancelled().await;
            Ok(())
        })
        .unwrap();

        let start = std::time::Instant::now();
        let reason = lc.stop(Duration::from_millis(200)).await.unwrap();
        let elapsed = start.elapsed();

        // `stop` must not have returned before the task's initial delay elapsed.
        assert!(elapsed >= Duration::from_millis(10));
        assert!(matches!(
            reason,
            StopReason::Cancelled | StopReason::Finished
        ));
        assert_eq!(lc.status(), Status::Stopped);
    }
}