Skip to main content

datum/stream/
completion.rs

1use super::*;
2
3#[derive(Clone)]
4pub struct Cancellable {
5    pub(super) cancelled: Arc<AtomicBool>,
6    _keep_alive: Option<Arc<dyn Send + Sync>>,
7}
8
9impl Cancellable {
10    pub(super) fn new_with_keep_alive(keep_alive: Option<Arc<dyn Send + Sync>>) -> Self {
11        Self {
12            cancelled: Arc::new(AtomicBool::new(false)),
13            _keep_alive: keep_alive,
14        }
15    }
16
17    pub fn cancel(&self) -> bool {
18        !self.cancelled.swap(true, Ordering::SeqCst)
19    }
20
21    #[must_use]
22    pub fn is_cancelled(&self) -> bool {
23        self.cancelled.load(Ordering::SeqCst)
24    }
25}
26
27impl fmt::Debug for Cancellable {
28    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
29        f.debug_struct("Cancellable").finish_non_exhaustive()
30    }
31}
32
33#[must_use = "dropping the StreamCompletion cancels the running stream; call .wait()/.try_wait() or keep it alive"]
34pub struct StreamCompletion<T> {
35    state: StreamCompletionState<T>,
36    cancel_on_drop: Option<StreamCancellation>,
37}
38
39enum StreamCompletionState<T> {
40    Ready(Option<StreamResult<T>>),
41    Receiver(oneshot::Receiver<StreamResult<T>>),
42}
43
44#[derive(Clone)]
45pub(crate) struct StreamCancellation {
46    cancelled: Arc<AtomicBool>,
47    worker: Arc<Mutex<Option<thread::Thread>>>,
48}
49
50pub(super) struct RegisteredStreamWorker {
51    cancellation: StreamCancellation,
52}
53
54impl StreamCancellation {
55    pub(super) fn new() -> Self {
56        Self {
57            cancelled: Arc::new(AtomicBool::new(false)),
58            worker: Arc::new(Mutex::new(None)),
59        }
60    }
61
62    pub(crate) fn for_external_completion() -> Self {
63        Self::new()
64    }
65
66    pub(crate) fn cancelled(&self) -> Arc<AtomicBool> {
67        Arc::clone(&self.cancelled)
68    }
69
70    pub(super) fn register_current_worker(&self) -> RegisteredStreamWorker {
71        *self.worker.lock().expect("stream worker slot poisoned") = Some(thread::current());
72        RegisteredStreamWorker {
73            cancellation: self.clone(),
74        }
75    }
76
77    fn cancel(&self) {
78        self.cancelled.store(true, Ordering::SeqCst);
79        let worker = self
80            .worker
81            .lock()
82            .expect("stream worker slot poisoned")
83            .clone();
84        if let Some(worker) = worker {
85            worker.unpark();
86        }
87    }
88}
89
90impl Drop for RegisteredStreamWorker {
91    fn drop(&mut self) {
92        *self
93            .cancellation
94            .worker
95            .lock()
96            .expect("stream worker slot poisoned") = None;
97    }
98}
99
100impl<T> StreamCompletion<T> {
101    pub(crate) fn from_receiver(
102        receiver: oneshot::Receiver<StreamResult<T>>,
103        cancel_on_drop: Option<StreamCancellation>,
104    ) -> Self {
105        Self {
106            state: StreamCompletionState::Receiver(receiver),
107            cancel_on_drop,
108        }
109    }
110
111    pub(crate) fn ready(result: StreamResult<T>) -> Self {
112        Self {
113            state: StreamCompletionState::Ready(Some(result)),
114            cancel_on_drop: None,
115        }
116    }
117
118    pub fn wait(self) -> StreamResult<T> {
119        block_on(self)
120    }
121
122    #[must_use]
123    pub fn try_wait(&mut self) -> Option<StreamResult<T>> {
124        match &mut self.state {
125            StreamCompletionState::Ready(result) => result.take(),
126            StreamCompletionState::Receiver(receiver) => match receiver.try_recv() {
127                Ok(Some(result)) => Some(result),
128                Ok(None) => None,
129                Err(_) => Some(Err(StreamError::AbruptTermination)),
130            },
131        }
132    }
133}
134
135impl<T> Future for StreamCompletion<T> {
136    type Output = StreamResult<T>;
137
138    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
139        match &mut self.state {
140            StreamCompletionState::Ready(result) => {
141                Poll::Ready(result.take().unwrap_or(Err(StreamError::AbruptTermination)))
142            }
143            StreamCompletionState::Receiver(receiver) => match Pin::new(receiver).poll(cx) {
144                Poll::Ready(Ok(result)) => Poll::Ready(result),
145                Poll::Ready(Err(_)) => Poll::Ready(Err(StreamError::AbruptTermination)),
146                Poll::Pending => Poll::Pending,
147            },
148        }
149    }
150}
151
152impl<T> Unpin for StreamCompletion<T> {}
153
154impl<T> Drop for StreamCompletion<T> {
155    fn drop(&mut self) {
156        if let Some(cancellation) = &self.cancel_on_drop {
157            cancellation.cancel();
158        }
159    }
160}
161
162impl<T> fmt::Debug for StreamCompletion<T> {
163    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
164        f.debug_struct("StreamCompletion").finish_non_exhaustive()
165    }
166}