tiny_actor/actor/
shutdown.rs1use crate::*;
2use futures::{Future, FutureExt, Stream, StreamExt};
3use std::{
4 pin::Pin,
5 task::{Context, Poll},
6 time::Duration,
7};
8use tokio::time::Sleep;
9
10pub struct ShutdownFut<'a, E: Send + 'static, C: DynChannel + ?Sized> {
11 child: &'a mut Child<E, C>,
12 sleep: Option<Pin<Box<Sleep>>>,
13}
14
15impl<'a, E: Send + 'static, C: DynChannel + ?Sized> ShutdownFut<'a, E, C> {
16 pub(crate) fn new(child: &'a mut Child<E, C>, timeout: Duration) -> Self {
17 child.halt();
18
19 ShutdownFut {
20 child,
21 sleep: Some(Box::pin(tokio::time::sleep(timeout))),
22 }
23 }
24}
25
26impl<'a, E: Send + 'static, C: DynChannel + ?Sized> Future for ShutdownFut<'a, E, C> {
27 type Output = Result<E, ExitError>;
28
29 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
30 match self.child.poll_unpin(cx) {
31 Poll::Ready(res) => Poll::Ready(res),
32 Poll::Pending => {
33 if let Some(sleep) = &mut self.sleep {
34 if sleep.poll_unpin(cx).is_ready() {
35 self.sleep = None;
36 self.child.abort();
37 }
38 };
39 Poll::Pending
40 }
41 }
42 }
43}
44
45pub struct ShutdownStream<'a, E: Send + 'static, C: DynChannel + ?Sized> {
49 pool: &'a mut ChildPool<E, C>,
50 sleep: Option<Pin<Box<Sleep>>>,
51}
52
53impl<'a, E: Send + 'static, C: DynChannel + ?Sized> ShutdownStream<'a, E, C> {
54 pub(crate) fn new(pool: &'a mut ChildPool<E, C>, timeout: Duration) -> Self {
55 pool.halt();
56
57 ShutdownStream {
58 pool,
59 sleep: Some(Box::pin(tokio::time::sleep(timeout))),
60 }
61 }
62}
63
64impl<'a, E: Send + 'static, C: DynChannel + ?Sized> Stream for ShutdownStream<'a, E, C> {
65 type Item = Result<E, ExitError>;
66
67 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
68 if let Some(sleep) = &mut self.sleep {
69 if sleep.poll_unpin(cx).is_ready() {
70 self.sleep = None;
71 self.pool.abort();
72 }
73 };
74
75 self.pool.poll_next_unpin(cx)
76 }
77}
78
#[cfg(test)]
mod test {
    use std::{future::pending, time::Duration};

    use futures::StreamExt;

    use crate::*;

    /// A well-behaved actor exits within the timeout, so shutdown yields `Ok`.
    #[tokio::test]
    async fn shutdown_success() {
        let (mut child, _addr) = spawn(Config::default(), basic_actor!());

        let outcome = child.shutdown(Duration::from_millis(5)).await;

        assert!(outcome.is_ok());
    }

    /// An actor that never returns is aborted once the timeout has elapsed.
    #[tokio::test]
    async fn shutdown_failure() {
        let (mut child, _addr) = spawn(Config::default(), |_inbox: Inbox<()>| async {
            pending::<()>().await;
        });

        let outcome = child.shutdown(Duration::from_millis(5)).await;

        assert!(matches!(outcome, Err(ExitError::Abort)));
    }

    /// Every process of a well-behaved pool exits with `Ok`.
    #[tokio::test]
    async fn shutdown_pool_success() {
        let (mut pool, _addr) = spawn_many(0..3, Config::default(), pooled_basic_actor!());

        let outcomes: Vec<_> = pool.shutdown(Duration::from_millis(5)).collect().await;

        assert_eq!(outcomes.len(), 3);
        assert!(outcomes.iter().all(|outcome| outcome.is_ok()));
    }

    /// Every process of a pool that never returns is aborted.
    #[tokio::test]
    async fn shutdown_pool_failure() {
        let (mut pool, _addr) =
            spawn_many(0..3, Config::default(), |_, _inbox: Inbox<()>| async {
                pending::<()>().await;
            });

        let outcomes: Vec<_> = pool.shutdown(Duration::from_millis(5)).collect().await;

        assert_eq!(outcomes.len(), 3);
        assert!(outcomes
            .iter()
            .all(|outcome| matches!(outcome, Err(ExitError::Abort))));
    }

    /// A pool mixing two well-behaved and two never-returning processes
    /// reports two successes and two failures.
    #[tokio::test]
    async fn shutdown_pool_mixed() {
        let (mut pool, _addr) = spawn_one(Config::default(), |_inbox: Inbox<()>| async move {
            pending::<()>().await;
            unreachable!()
        });
        pool.spawn(basic_actor!()).unwrap();
        pool.spawn(|_inbox: Inbox<()>| async move {
            pending::<()>().await;
            unreachable!()
        })
        .unwrap();
        pool.spawn(basic_actor!()).unwrap();

        let outcomes: Vec<_> = pool.shutdown(Duration::from_millis(5)).collect().await;

        let (exited, aborted): (Vec<_>, Vec<_>) =
            outcomes.iter().partition(|outcome| outcome.is_ok());

        assert_eq!(exited.len(), 2);
        assert_eq!(aborted.len(), 2);
    }
}