tokio_tasker/
join_stream.rs1use std::fmt;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::task::{Context, Poll};
5
6use futures_util::Stream;
7use tokio::task::JoinError;
8
9use crate::tasker::Shared;
10
11pub struct JoinStream {
13 shared: Pin<Arc<Shared>>,
14}
15
16impl JoinStream {
17 pub(crate) fn new(shared: Pin<Arc<Shared>>) -> Self {
18 Self { shared }
19 }
20}
21
22impl Stream for JoinStream {
23 type Item = Result<(), JoinError>;
24
25 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
26 let mut handles = self.shared.handles.lock();
27 match handles.poll_next(cx) {
28 Poll::Ready(Some(res)) => Poll::Ready(Some(res)),
29 Poll::Ready(None) if self.shared.all_finished() => Poll::Ready(None),
30 _ => {
31 handles.set_waker(cx);
38 Poll::Pending
39 }
40 }
41 }
42}
43
44impl fmt::Debug for JoinStream {
45 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
46 f.debug_struct("JoinStream")
47 .field("tasker", &self.shared.ptr())
48 .finish()
49 }
50}