tiny_actor/actor/
child.rs

1use crate::*;
2use futures::{Future, FutureExt};
3use std::{fmt::Debug, mem::ManuallyDrop, sync::Arc, time::Duration};
4use tokio::task::JoinHandle;
5
6/// A child is the non clone-able reference to an actor with a single process.
7///
8/// Children can be of two forms:
9/// * `Child<E, Channel<M>>`: This is the default form, it can be transformed into a `Child<E>` using
10/// [Child::into_dyn].
11/// * `Child<E>`: This form is a dynamic child, it can be transformed back into a `Child<E, Channel<M>>`
12/// using [Child::downcast::<M>].
13///
14/// A child can be transformed into a [ChildPool] using [Child::into_pool()].
15///
16/// A child can be awaited which returns the parameter `E` once the actor exits.
17#[derive(Debug)]
18pub struct Child<E, C = dyn AnyChannel>
19where
20    E: Send + 'static,
21    C: DynChannel + ?Sized,
22{
23    pub(super) handle: Option<JoinHandle<E>>,
24    pub(super) channel: Arc<C>,
25    pub(super) link: Link,
26    pub(super) is_aborted: bool,
27}
28
29impl<E, C> Child<E, C>
30where
31    E: Send + 'static,
32    C: DynChannel + ?Sized,
33{
34    pub(crate) fn new(channel: Arc<C>, join_handle: JoinHandle<E>, link: Link) -> Self {
35        Self {
36            handle: Some(join_handle),
37            link,
38            channel,
39            is_aborted: false,
40        }
41    }
42
43    fn into_parts(self) -> (Arc<C>, JoinHandle<E>, Link, bool) {
44        let no_drop = ManuallyDrop::new(self);
45        unsafe {
46            let mut handle = std::ptr::read(&no_drop.handle);
47            let channel = std::ptr::read(&no_drop.channel);
48            let link = std::ptr::read(&no_drop.link);
49            let is_aborted = std::ptr::read(&no_drop.is_aborted);
50            (channel, handle.take().unwrap(), link, is_aborted)
51        }
52    }
53
54    /// Get the underlying [JoinHandle].
55    ///
56    /// This will not run the drop, and therefore the actor will not be halted/aborted.
57    pub fn into_joinhandle(self) -> JoinHandle<E> {
58        self.into_parts().1
59    }
60
61    /// Abort the actor.
62    ///
63    /// Returns `true` if this is the first abort.
64    pub fn abort(&mut self) -> bool {
65        self.channel.close();
66        let was_aborted = self.is_aborted;
67        self.is_aborted = true;
68        self.handle.as_ref().unwrap().abort();
69        !was_aborted
70    }
71
72    /// Whether the task is finished.
73    pub fn is_finished(&self) -> bool {
74        self.handle.as_ref().unwrap().is_finished()
75    }
76
77    /// Convert the [Child] into a [ChildPool].
78    pub fn into_pool(self) -> ChildPool<E, C> {
79        let (channel, handle, link, is_aborted) = self.into_parts();
80        ChildPool {
81            channel,
82            handles: Some(vec![handle]),
83            link,
84            is_aborted,
85        }
86    }
87
88    /// Downcast the `Child<E>` to a `Child<E, Channel<M>>`.
89    pub fn downcast<M: Send + 'static>(self) -> Result<Child<E, Channel<M>>, Self>
90    where
91        C: AnyChannel,
92    {
93        let (channel, handle, link, is_aborted) = self.into_parts();
94        match channel.clone().into_any().downcast::<Channel<M>>() {
95            Ok(channel) => Ok(Child {
96                handle: Some(handle),
97                channel,
98                link,
99                is_aborted,
100            }),
101            Err(_) => Err(Child {
102                handle: Some(handle),
103                channel,
104                link,
105                is_aborted,
106            }),
107        }
108    }
109
110    /// Halts the actor, and then waits for it to exit. This always returns with the
111    /// result of the task, and closes the channel.
112    ///
113    /// If the timeout expires before the actor has exited, the actor will be aborted.
114    pub fn shutdown(&mut self, timeout: Duration) -> ShutdownFut<'_, E, C> {
115        ShutdownFut::new(self, timeout)
116    }
117
118    gen::child_methods!();
119    gen::dyn_channel_methods!();
120}
121
122impl<'a, E: Send + 'static, C: DynChannel + ?Sized> Unpin for ShutdownFut<'a, E, C> {}
123
124impl<E, M> Child<E, Channel<M>>
125where
126    E: Send + 'static,
127    M: Send + 'static,
128{
129    /// Convert the `Child<T, Channel<M>>` into a `Child<T>`
130    pub fn into_dyn(self) -> Child<E> {
131        let parts = self.into_parts();
132        Child {
133            handle: Some(parts.1),
134            channel: parts.0,
135            link: parts.2,
136            is_aborted: parts.3,
137        }
138    }
139
140    gen::send_methods!();
141}
142
143#[cfg(feature = "internals")]
144impl<E, C> Child<E, C>
145where
146    E: Send + 'static,
147    C: DynChannel + ?Sized,
148{
149    pub fn transform_channel<C2: DynChannel + ?Sized>(
150        self,
151        func: fn(Arc<C>) -> Arc<C2>,
152    ) -> Child<E, C2> {
153        let (channel, handle, link, is_aborted) = self.into_parts();
154        Child {
155            channel: func(channel),
156            handle: Some(handle),
157            link,
158            is_aborted,
159        }
160    }
161
162    pub fn channel_ref(&self) -> &C {
163        &self.channel
164    }
165}
166
167impl<E: Send + 'static, C: DynChannel + ?Sized> Drop for Child<E, C> {
168    fn drop(&mut self) {
169        if let Link::Attached(abort_timer) = self.link {
170            if !self.is_aborted && !self.is_finished() {
171                if abort_timer.is_zero() {
172                    self.abort();
173                } else {
174                    self.halt();
175                    let handle = self.handle.take().unwrap();
176                    tokio::task::spawn(async move {
177                        tokio::time::sleep(abort_timer).await;
178                        handle.abort();
179                    });
180                }
181            }
182        }
183    }
184}
185
186impl<E: Send + 'static, C: DynChannel + ?Sized> Unpin for Child<E, C> {}
187
188impl<E: Send + 'static, C: DynChannel + ?Sized> Future for Child<E, C> {
189    type Output = Result<E, ExitError>;
190
191    fn poll(
192        mut self: std::pin::Pin<&mut Self>,
193        cx: &mut std::task::Context<'_>,
194    ) -> std::task::Poll<Self::Output> {
195        self.handle
196            .as_mut()
197            .unwrap()
198            .poll_unpin(cx)
199            .map_err(|e| e.into())
200    }
201}
202
203#[cfg(test)]
204mod test {
205    use crate::*;
206    use std::{future::pending, time::Duration};
207    use tokio::sync::oneshot;
208
209    #[tokio::test]
210    async fn dropping() {
211        let (tx, rx) = oneshot::channel();
212        let (child, _addr) = spawn(Config::default(), |mut inbox: Inbox<()>| async move {
213            if let Err(RecvError::Halted) = inbox.recv().await {
214                tx.send(true).unwrap();
215            } else {
216                tx.send(false).unwrap()
217            }
218        });
219        drop(child);
220        assert!(rx.await.unwrap());
221    }
222
223    #[tokio::test]
224    async fn dropping_aborts() {
225        let (tx, rx) = oneshot::channel();
226        let (child, _addr) = spawn(
227            Config::attached(Duration::from_millis(1)),
228            |mut inbox: Inbox<()>| async move {
229                if let Err(RecvError::Halted) = inbox.recv().await {
230                    tx.send(true).unwrap();
231                    pending::<()>().await;
232                } else {
233                    tx.send(false).unwrap()
234                }
235            },
236        );
237        drop(child);
238        assert!(rx.await.unwrap());
239    }
240
241    #[tokio::test]
242    async fn dropping_detached() {
243        let (tx, rx) = oneshot::channel();
244        let (child, addr) = spawn(Config::detached(), |mut inbox: Inbox<()>| async move {
245            if let Err(RecvError::Halted) = inbox.recv().await {
246                tx.send(true).unwrap();
247            } else {
248                tx.send(false).unwrap()
249            }
250        });
251        drop(child);
252        tokio::time::sleep(Duration::from_millis(1)).await;
253        addr.try_send(()).unwrap();
254        assert!(!rx.await.unwrap());
255    }
256
257    #[tokio::test]
258    async fn downcast() {
259        let (child, _addr) = spawn(Config::default(), basic_actor!());
260        assert!(matches!(child.into_dyn().downcast::<()>(), Ok(_)));
261    }
262
263    #[tokio::test]
264    async fn abort() {
265        let (mut child, _addr) = spawn(Config::default(), basic_actor!());
266        assert!(!child.is_aborted());
267        child.abort();
268        assert!(child.is_aborted());
269        assert!(matches!(child.await, Err(ExitError::Abort)));
270    }
271
272    #[tokio::test]
273    async fn is_finished() {
274        let (mut child, _addr) = spawn(Config::default(), basic_actor!());
275        child.abort();
276        let _ = (&mut child).await;
277        assert!(child.is_finished());
278    }
279
280    #[tokio::test]
281    async fn into_childpool() {
282        let (child, _addr) = spawn(Config::default(), basic_actor!());
283        let pool = child.into_pool();
284        assert_eq!(pool.task_count(), 1);
285        assert_eq!(pool.process_count(), 1);
286        assert_eq!(pool.is_aborted(), false);
287
288        let (mut child, _addr) = spawn(Config::default(), basic_actor!());
289        child.abort();
290        let pool = child.into_pool();
291        assert_eq!(pool.is_aborted(), true);
292    }
293}