tiny_actor/actor/
shutdown.rs

1use crate::*;
2use futures::{Future, FutureExt, Stream, StreamExt};
3use std::{
4    pin::Pin,
5    task::{Context, Poll},
6    time::Duration,
7};
8use tokio::time::Sleep;
9
10pub struct ShutdownFut<'a, E: Send + 'static, C: DynChannel + ?Sized> {
11    child: &'a mut Child<E, C>,
12    sleep: Option<Pin<Box<Sleep>>>,
13}
14
15impl<'a, E: Send + 'static, C: DynChannel + ?Sized> ShutdownFut<'a, E, C> {
16    pub(crate) fn new(child: &'a mut Child<E, C>, timeout: Duration) -> Self {
17        child.halt();
18
19        ShutdownFut {
20            child,
21            sleep: Some(Box::pin(tokio::time::sleep(timeout))),
22        }
23    }
24}
25
26impl<'a, E: Send + 'static, C: DynChannel + ?Sized> Future for ShutdownFut<'a, E, C> {
27    type Output = Result<E, ExitError>;
28
29    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
30        match self.child.poll_unpin(cx) {
31            Poll::Ready(res) => Poll::Ready(res),
32            Poll::Pending => {
33                if let Some(sleep) = &mut self.sleep {
34                    if sleep.poll_unpin(cx).is_ready() {
35                        self.sleep = None;
36                        self.child.abort();
37                    }
38                };
39                Poll::Pending
40            }
41        }
42    }
43}
44
45/// Stream returned when shutting down a [ChildPool].
46///
47/// This stream can be collected into a vec with [StreamExt::collect]:
48pub struct ShutdownStream<'a, E: Send + 'static, C: DynChannel + ?Sized> {
49    pool: &'a mut ChildPool<E, C>,
50    sleep: Option<Pin<Box<Sleep>>>,
51}
52
53impl<'a, E: Send + 'static, C: DynChannel + ?Sized> ShutdownStream<'a, E, C> {
54    pub(crate) fn new(pool: &'a mut ChildPool<E, C>, timeout: Duration) -> Self {
55        pool.halt();
56
57        ShutdownStream {
58            pool,
59            sleep: Some(Box::pin(tokio::time::sleep(timeout))),
60        }
61    }
62}
63
64impl<'a, E: Send + 'static, C: DynChannel + ?Sized> Stream for ShutdownStream<'a, E, C> {
65    type Item = Result<E, ExitError>;
66
67    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
68        if let Some(sleep) = &mut self.sleep {
69            if sleep.poll_unpin(cx).is_ready() {
70                self.sleep = None;
71                self.pool.abort();
72            }
73        };
74
75        self.pool.poll_next_unpin(cx)
76    }
77}
78
#[cfg(test)]
mod test {
    use crate::*;
    use futures::StreamExt;
    use std::{future::pending, time::Duration};

    /// Timeout handed to every `shutdown` call in these tests.
    const GRACE: Duration = Duration::from_millis(5);

    /// Shutting down a `basic_actor!()` child yields `Ok`.
    #[tokio::test]
    async fn shutdown_success() {
        let (mut child, _addr) = spawn(Config::default(), basic_actor!());

        let exit = child.shutdown(GRACE).await;

        assert!(exit.is_ok());
    }

    /// A child that awaits `pending()` forever yields `Err(ExitError::Abort)`.
    #[tokio::test]
    async fn shutdown_failure() {
        let (mut child, _addr) = spawn(Config::default(), |_inbox: Inbox<()>| async {
            pending::<()>().await;
        });

        let exit = child.shutdown(GRACE).await;

        assert!(matches!(exit, Err(ExitError::Abort)));
    }

    /// A pool of three `pooled_basic_actor!()` actors yields three `Ok` items.
    #[tokio::test]
    async fn shutdown_pool_success() {
        let (mut child, _addr) = spawn_many(0..3, Config::default(), pooled_basic_actor!());

        let results: Vec<_> = child.shutdown(GRACE).collect().await;

        assert_eq!(results.len(), 3);
        assert!(results.iter().all(|res| res.is_ok()));
    }

    /// A pool of three forever-pending actors yields three `Err(ExitError::Abort)` items.
    #[tokio::test]
    async fn shutdown_pool_failure() {
        let (mut child, _addr) =
            spawn_many(0..3, Config::default(), |_, _inbox: Inbox<()>| async {
                pending::<()>().await;
            });

        let results: Vec<_> = child.shutdown(GRACE).collect().await;

        assert_eq!(results.len(), 3);
        assert!(results
            .iter()
            .all(|res| matches!(res, Err(ExitError::Abort))));
    }

    /// A pool mixing two forever-pending actors with two `basic_actor!()`
    /// actors yields two errors and two successes.
    #[tokio::test]
    async fn shutdown_pool_mixed() {
        let (mut child, _addr) = spawn_one(Config::default(), |_inbox: Inbox<()>| async move {
            pending::<()>().await;
            unreachable!()
        });
        child.spawn(basic_actor!()).unwrap();
        child
            .spawn(|_inbox: Inbox<()>| async move {
                pending::<()>().await;
                unreachable!()
            })
            .unwrap();
        child.spawn(basic_actor!()).unwrap();

        let results: Vec<_> = child.shutdown(GRACE).collect().await;

        // Every item is either `Ok` or `Err`, so the error count is the remainder.
        let ok_count = results.iter().filter(|res| res.is_ok()).count();
        let err_count = results.len() - ok_count;

        assert_eq!(ok_count, 2);
        assert_eq!(err_count, 2);
    }
}