1use crate::*;
2use futures::{Future, FutureExt};
3use std::{fmt::Debug, mem::ManuallyDrop, sync::Arc, time::Duration};
4use tokio::task::JoinHandle;
5
6#[derive(Debug)]
18pub struct Child<E, C = dyn AnyChannel>
19where
20 E: Send + 'static,
21 C: DynChannel + ?Sized,
22{
23 pub(super) handle: Option<JoinHandle<E>>,
24 pub(super) channel: Arc<C>,
25 pub(super) link: Link,
26 pub(super) is_aborted: bool,
27}
28
29impl<E, C> Child<E, C>
30where
31 E: Send + 'static,
32 C: DynChannel + ?Sized,
33{
34 pub(crate) fn new(channel: Arc<C>, join_handle: JoinHandle<E>, link: Link) -> Self {
35 Self {
36 handle: Some(join_handle),
37 link,
38 channel,
39 is_aborted: false,
40 }
41 }
42
43 fn into_parts(self) -> (Arc<C>, JoinHandle<E>, Link, bool) {
44 let no_drop = ManuallyDrop::new(self);
45 unsafe {
46 let mut handle = std::ptr::read(&no_drop.handle);
47 let channel = std::ptr::read(&no_drop.channel);
48 let link = std::ptr::read(&no_drop.link);
49 let is_aborted = std::ptr::read(&no_drop.is_aborted);
50 (channel, handle.take().unwrap(), link, is_aborted)
51 }
52 }
53
54 pub fn into_joinhandle(self) -> JoinHandle<E> {
58 self.into_parts().1
59 }
60
61 pub fn abort(&mut self) -> bool {
65 self.channel.close();
66 let was_aborted = self.is_aborted;
67 self.is_aborted = true;
68 self.handle.as_ref().unwrap().abort();
69 !was_aborted
70 }
71
72 pub fn is_finished(&self) -> bool {
74 self.handle.as_ref().unwrap().is_finished()
75 }
76
77 pub fn into_pool(self) -> ChildPool<E, C> {
79 let (channel, handle, link, is_aborted) = self.into_parts();
80 ChildPool {
81 channel,
82 handles: Some(vec![handle]),
83 link,
84 is_aborted,
85 }
86 }
87
88 pub fn downcast<M: Send + 'static>(self) -> Result<Child<E, Channel<M>>, Self>
90 where
91 C: AnyChannel,
92 {
93 let (channel, handle, link, is_aborted) = self.into_parts();
94 match channel.clone().into_any().downcast::<Channel<M>>() {
95 Ok(channel) => Ok(Child {
96 handle: Some(handle),
97 channel,
98 link,
99 is_aborted,
100 }),
101 Err(_) => Err(Child {
102 handle: Some(handle),
103 channel,
104 link,
105 is_aborted,
106 }),
107 }
108 }
109
110 pub fn shutdown(&mut self, timeout: Duration) -> ShutdownFut<'_, E, C> {
115 ShutdownFut::new(self, timeout)
116 }
117
118 gen::child_methods!();
119 gen::dyn_channel_methods!();
120}
121
122impl<'a, E: Send + 'static, C: DynChannel + ?Sized> Unpin for ShutdownFut<'a, E, C> {}
123
124impl<E, M> Child<E, Channel<M>>
125where
126 E: Send + 'static,
127 M: Send + 'static,
128{
129 pub fn into_dyn(self) -> Child<E> {
131 let parts = self.into_parts();
132 Child {
133 handle: Some(parts.1),
134 channel: parts.0,
135 link: parts.2,
136 is_aborted: parts.3,
137 }
138 }
139
140 gen::send_methods!();
141}
142
143#[cfg(feature = "internals")]
144impl<E, C> Child<E, C>
145where
146 E: Send + 'static,
147 C: DynChannel + ?Sized,
148{
149 pub fn transform_channel<C2: DynChannel + ?Sized>(
150 self,
151 func: fn(Arc<C>) -> Arc<C2>,
152 ) -> Child<E, C2> {
153 let (channel, handle, link, is_aborted) = self.into_parts();
154 Child {
155 channel: func(channel),
156 handle: Some(handle),
157 link,
158 is_aborted,
159 }
160 }
161
162 pub fn channel_ref(&self) -> &C {
163 &self.channel
164 }
165}
166
167impl<E: Send + 'static, C: DynChannel + ?Sized> Drop for Child<E, C> {
168 fn drop(&mut self) {
169 if let Link::Attached(abort_timer) = self.link {
170 if !self.is_aborted && !self.is_finished() {
171 if abort_timer.is_zero() {
172 self.abort();
173 } else {
174 self.halt();
175 let handle = self.handle.take().unwrap();
176 tokio::task::spawn(async move {
177 tokio::time::sleep(abort_timer).await;
178 handle.abort();
179 });
180 }
181 }
182 }
183 }
184}
185
186impl<E: Send + 'static, C: DynChannel + ?Sized> Unpin for Child<E, C> {}
187
188impl<E: Send + 'static, C: DynChannel + ?Sized> Future for Child<E, C> {
189 type Output = Result<E, ExitError>;
190
191 fn poll(
192 mut self: std::pin::Pin<&mut Self>,
193 cx: &mut std::task::Context<'_>,
194 ) -> std::task::Poll<Self::Output> {
195 self.handle
196 .as_mut()
197 .unwrap()
198 .poll_unpin(cx)
199 .map_err(|e| e.into())
200 }
201}
202
203#[cfg(test)]
204mod test {
205 use crate::*;
206 use std::{future::pending, time::Duration};
207 use tokio::sync::oneshot;
208
209 #[tokio::test]
210 async fn dropping() {
211 let (tx, rx) = oneshot::channel();
212 let (child, _addr) = spawn(Config::default(), |mut inbox: Inbox<()>| async move {
213 if let Err(RecvError::Halted) = inbox.recv().await {
214 tx.send(true).unwrap();
215 } else {
216 tx.send(false).unwrap()
217 }
218 });
219 drop(child);
220 assert!(rx.await.unwrap());
221 }
222
223 #[tokio::test]
224 async fn dropping_aborts() {
225 let (tx, rx) = oneshot::channel();
226 let (child, _addr) = spawn(
227 Config::attached(Duration::from_millis(1)),
228 |mut inbox: Inbox<()>| async move {
229 if let Err(RecvError::Halted) = inbox.recv().await {
230 tx.send(true).unwrap();
231 pending::<()>().await;
232 } else {
233 tx.send(false).unwrap()
234 }
235 },
236 );
237 drop(child);
238 assert!(rx.await.unwrap());
239 }
240
241 #[tokio::test]
242 async fn dropping_detached() {
243 let (tx, rx) = oneshot::channel();
244 let (child, addr) = spawn(Config::detached(), |mut inbox: Inbox<()>| async move {
245 if let Err(RecvError::Halted) = inbox.recv().await {
246 tx.send(true).unwrap();
247 } else {
248 tx.send(false).unwrap()
249 }
250 });
251 drop(child);
252 tokio::time::sleep(Duration::from_millis(1)).await;
253 addr.try_send(()).unwrap();
254 assert!(!rx.await.unwrap());
255 }
256
257 #[tokio::test]
258 async fn downcast() {
259 let (child, _addr) = spawn(Config::default(), basic_actor!());
260 assert!(matches!(child.into_dyn().downcast::<()>(), Ok(_)));
261 }
262
263 #[tokio::test]
264 async fn abort() {
265 let (mut child, _addr) = spawn(Config::default(), basic_actor!());
266 assert!(!child.is_aborted());
267 child.abort();
268 assert!(child.is_aborted());
269 assert!(matches!(child.await, Err(ExitError::Abort)));
270 }
271
272 #[tokio::test]
273 async fn is_finished() {
274 let (mut child, _addr) = spawn(Config::default(), basic_actor!());
275 child.abort();
276 let _ = (&mut child).await;
277 assert!(child.is_finished());
278 }
279
280 #[tokio::test]
281 async fn into_childpool() {
282 let (child, _addr) = spawn(Config::default(), basic_actor!());
283 let pool = child.into_pool();
284 assert_eq!(pool.task_count(), 1);
285 assert_eq!(pool.process_count(), 1);
286 assert_eq!(pool.is_aborted(), false);
287
288 let (mut child, _addr) = spawn(Config::default(), basic_actor!());
289 child.abort();
290 let pool = child.into_pool();
291 assert_eq!(pool.is_aborted(), true);
292 }
293}