Skip to main content

datum/stream/
completion.rs

1use super::*;
2
3#[derive(Clone)]
4pub struct Cancellable {
5    pub(super) cancelled: Arc<AtomicBool>,
6    _keep_alive: Option<Arc<dyn Send + Sync>>,
7}
8
9impl Cancellable {
10    pub(super) fn new_with_keep_alive(keep_alive: Option<Arc<dyn Send + Sync>>) -> Self {
11        Self {
12            cancelled: Arc::new(AtomicBool::new(false)),
13            _keep_alive: keep_alive,
14        }
15    }
16
17    pub fn cancel(&self) -> bool {
18        !self.cancelled.swap(true, Ordering::SeqCst)
19    }
20
21    #[must_use]
22    pub fn is_cancelled(&self) -> bool {
23        self.cancelled.load(Ordering::SeqCst)
24    }
25}
26
27impl fmt::Debug for Cancellable {
28    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
29        f.debug_struct("Cancellable").finish_non_exhaustive()
30    }
31}
32
33#[must_use = "dropping the StreamCompletion cancels the running stream; call .wait()/.try_wait() or keep it alive"]
34pub struct StreamCompletion<T> {
35    state: StreamCompletionState<T>,
36    cancel_on_drop: Option<StreamCancellation>,
37}
38
39enum StreamCompletionState<T> {
40    Ready(Option<StreamResult<T>>),
41    Receiver(oneshot::Receiver<StreamResult<T>>),
42}
43
44#[derive(Clone)]
45pub(crate) struct StreamCancellation {
46    cancelled: Arc<AtomicBool>,
47    worker: Arc<Mutex<Option<thread::Thread>>>,
48}
49
50pub(super) struct RegisteredStreamWorker {
51    cancellation: StreamCancellation,
52}
53
54impl StreamCancellation {
55    pub(super) fn new() -> Self {
56        Self {
57            cancelled: Arc::new(AtomicBool::new(false)),
58            worker: Arc::new(Mutex::new(None)),
59        }
60    }
61
62    pub(super) fn cancelled(&self) -> Arc<AtomicBool> {
63        Arc::clone(&self.cancelled)
64    }
65
66    pub(super) fn register_current_worker(&self) -> RegisteredStreamWorker {
67        *self.worker.lock().expect("stream worker slot poisoned") = Some(thread::current());
68        RegisteredStreamWorker {
69            cancellation: self.clone(),
70        }
71    }
72
73    fn cancel(&self) {
74        self.cancelled.store(true, Ordering::SeqCst);
75        let worker = self
76            .worker
77            .lock()
78            .expect("stream worker slot poisoned")
79            .clone();
80        if let Some(worker) = worker {
81            worker.unpark();
82        }
83    }
84}
85
86impl Drop for RegisteredStreamWorker {
87    fn drop(&mut self) {
88        *self
89            .cancellation
90            .worker
91            .lock()
92            .expect("stream worker slot poisoned") = None;
93    }
94}
95
96impl<T> StreamCompletion<T> {
97    pub(crate) fn from_receiver(
98        receiver: oneshot::Receiver<StreamResult<T>>,
99        cancel_on_drop: Option<StreamCancellation>,
100    ) -> Self {
101        Self {
102            state: StreamCompletionState::Receiver(receiver),
103            cancel_on_drop,
104        }
105    }
106
107    pub(crate) fn ready(result: StreamResult<T>) -> Self {
108        Self {
109            state: StreamCompletionState::Ready(Some(result)),
110            cancel_on_drop: None,
111        }
112    }
113
114    pub fn wait(self) -> StreamResult<T> {
115        block_on(self)
116    }
117
118    #[must_use]
119    pub fn try_wait(&mut self) -> Option<StreamResult<T>> {
120        match &mut self.state {
121            StreamCompletionState::Ready(result) => result.take(),
122            StreamCompletionState::Receiver(receiver) => match receiver.try_recv() {
123                Ok(Some(result)) => Some(result),
124                Ok(None) => None,
125                Err(_) => Some(Err(StreamError::AbruptTermination)),
126            },
127        }
128    }
129}
130
131impl<T> Future for StreamCompletion<T> {
132    type Output = StreamResult<T>;
133
134    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
135        match &mut self.state {
136            StreamCompletionState::Ready(result) => {
137                Poll::Ready(result.take().unwrap_or(Err(StreamError::AbruptTermination)))
138            }
139            StreamCompletionState::Receiver(receiver) => match Pin::new(receiver).poll(cx) {
140                Poll::Ready(Ok(result)) => Poll::Ready(result),
141                Poll::Ready(Err(_)) => Poll::Ready(Err(StreamError::AbruptTermination)),
142                Poll::Pending => Poll::Pending,
143            },
144        }
145    }
146}
147
148impl<T> Unpin for StreamCompletion<T> {}
149
150impl<T> Drop for StreamCompletion<T> {
151    fn drop(&mut self) {
152        if let Some(cancellation) = &self.cancel_on_drop {
153            cancellation.cancel();
154        }
155    }
156}
157
158impl<T> fmt::Debug for StreamCompletion<T> {
159    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
160        f.debug_struct("StreamCompletion").finish_non_exhaustive()
161    }
162}