tokio_tasker/
join_stream.rs

1use std::fmt;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::task::{Context, Poll};
5
6use futures_util::Stream;
7use tokio::task::JoinError;
8
9use crate::tasker::Shared;
10
11/// Stream for the [`join_stream()`][crate::Tasker::join_stream()] method.
12pub struct JoinStream {
13    shared: Pin<Arc<Shared>>,
14}
15
16impl JoinStream {
17    pub(crate) fn new(shared: Pin<Arc<Shared>>) -> Self {
18        Self { shared }
19    }
20}
21
22impl Stream for JoinStream {
23    type Item = Result<(), JoinError>;
24
25    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
26        let mut handles = self.shared.handles.lock();
27        match handles.poll_next(cx) {
28            Poll::Ready(Some(res)) => Poll::Ready(Some(res)),
29            Poll::Ready(None) if self.shared.all_finished() => Poll::Ready(None),
30            _ => {
31                // Either all the handles have returned already or the stream is pending,
32                // but either way there are still outstanding Tasker clones,
33                // and more join handles could be added,
34                // so we need to save a waker to be notified when a handle
35                // is added or a Tasker clone is marked finished.
36                // Meanwhile, return Pending.
37                handles.set_waker(cx);
38                Poll::Pending
39            }
40        }
41    }
42}
43
44impl fmt::Debug for JoinStream {
45    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
46        f.debug_struct("JoinStream")
47            .field("tasker", &self.shared.ptr())
48            .finish()
49    }
50}