Skip to main content

datum/stream/
completion.rs

1//! `StreamCompletion<T>` — the handle returned immediately at materialization —
2//! and `Cancellable`.
3//!
4//! `StreamCompletion` is `#[must_use]`: dropping it cancels the running stream
5//! (its `Drop` impl). `.wait()` blocks until the result is ready (driven by the
6//! `BlockingPoller`); `.try_wait()` polls without blocking.
7
8use super::*;
9
10#[derive(Clone)]
11pub struct Cancellable {
12    pub(super) cancelled: Arc<AtomicBool>,
13    _keep_alive: Option<Arc<dyn Send + Sync>>,
14}
15
16impl Cancellable {
17    pub(super) fn new_with_keep_alive(keep_alive: Option<Arc<dyn Send + Sync>>) -> Self {
18        Self {
19            cancelled: Arc::new(AtomicBool::new(false)),
20            _keep_alive: keep_alive,
21        }
22    }
23
24    pub fn cancel(&self) -> bool {
25        !self.cancelled.swap(true, Ordering::SeqCst)
26    }
27
28    #[must_use]
29    pub fn is_cancelled(&self) -> bool {
30        self.cancelled.load(Ordering::SeqCst)
31    }
32}
33
34impl fmt::Debug for Cancellable {
35    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
36        f.debug_struct("Cancellable").finish_non_exhaustive()
37    }
38}
39
40#[must_use = "dropping the StreamCompletion cancels the running stream; call .wait()/.try_wait() or keep it alive"]
41pub struct StreamCompletion<T> {
42    state: StreamCompletionState<T>,
43    cancel_on_drop: Option<StreamCancellation>,
44}
45
46enum StreamCompletionState<T> {
47    Ready(Option<StreamResult<T>>),
48    Receiver(oneshot::Receiver<StreamResult<T>>),
49}
50
/// Cancellation state for a stream: a flag plus a slot for a worker thread handle.
///
/// Both fields sit behind `Arc`s, so clones refer to the same flag and the
/// same worker slot.
#[derive(Clone)]
pub(crate) struct StreamCancellation {
    /// Set to `true` when the stream is cancelled.
    cancelled: Arc<AtomicBool>,
    /// Handle of the currently registered worker thread, if any; it is
    /// unparked when the stream is cancelled.
    worker: Arc<Mutex<Option<thread::Thread>>>,
}
56
57pub(super) struct RegisteredStreamWorker {
58    cancellation: StreamCancellation,
59}
60
61impl StreamCancellation {
62    pub(super) fn new() -> Self {
63        Self {
64            cancelled: Arc::new(AtomicBool::new(false)),
65            worker: Arc::new(Mutex::new(None)),
66        }
67    }
68
69    pub(crate) fn for_external_completion() -> Self {
70        Self::new()
71    }
72
73    pub(crate) fn cancelled(&self) -> Arc<AtomicBool> {
74        Arc::clone(&self.cancelled)
75    }
76
77    pub(super) fn register_current_worker(&self) -> RegisteredStreamWorker {
78        *self.worker.lock().expect("stream worker slot poisoned") = Some(thread::current());
79        RegisteredStreamWorker {
80            cancellation: self.clone(),
81        }
82    }
83
84    fn cancel(&self) {
85        self.cancelled.store(true, Ordering::SeqCst);
86        let worker = self
87            .worker
88            .lock()
89            .expect("stream worker slot poisoned")
90            .clone();
91        if let Some(worker) = worker {
92            worker.unpark();
93        }
94    }
95}
96
97impl Drop for RegisteredStreamWorker {
98    fn drop(&mut self) {
99        *self
100            .cancellation
101            .worker
102            .lock()
103            .expect("stream worker slot poisoned") = None;
104    }
105}
106
107impl<T> StreamCompletion<T> {
108    pub(crate) fn from_receiver(
109        receiver: oneshot::Receiver<StreamResult<T>>,
110        cancel_on_drop: Option<StreamCancellation>,
111    ) -> Self {
112        Self {
113            state: StreamCompletionState::Receiver(receiver),
114            cancel_on_drop,
115        }
116    }
117
118    pub(crate) fn ready(result: StreamResult<T>) -> Self {
119        Self {
120            state: StreamCompletionState::Ready(Some(result)),
121            cancel_on_drop: None,
122        }
123    }
124
125    pub fn wait(self) -> StreamResult<T> {
126        block_on(self)
127    }
128
129    #[must_use]
130    pub fn try_wait(&mut self) -> Option<StreamResult<T>> {
131        match &mut self.state {
132            StreamCompletionState::Ready(result) => result.take(),
133            StreamCompletionState::Receiver(receiver) => match receiver.try_recv() {
134                Ok(Some(result)) => Some(result),
135                Ok(None) => None,
136                Err(_) => Some(Err(StreamError::AbruptTermination)),
137            },
138        }
139    }
140}
141
142impl<T> Future for StreamCompletion<T> {
143    type Output = StreamResult<T>;
144
145    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
146        match &mut self.state {
147            StreamCompletionState::Ready(result) => {
148                Poll::Ready(result.take().unwrap_or(Err(StreamError::AbruptTermination)))
149            }
150            StreamCompletionState::Receiver(receiver) => match Pin::new(receiver).poll(cx) {
151                Poll::Ready(Ok(result)) => Poll::Ready(result),
152                Poll::Ready(Err(_)) => Poll::Ready(Err(StreamError::AbruptTermination)),
153                Poll::Pending => Poll::Pending,
154            },
155        }
156    }
157}
158
159impl<T> Unpin for StreamCompletion<T> {}
160
161impl<T> Drop for StreamCompletion<T> {
162    fn drop(&mut self) {
163        if let Some(cancellation) = &self.cancel_on_drop {
164            cancellation.cancel();
165        }
166    }
167}
168
169impl<T> fmt::Debug for StreamCompletion<T> {
170    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
171        f.debug_struct("StreamCompletion").finish_non_exhaustive()
172    }
173}