datum/stream/
completion.rs1use super::*;
2
3#[derive(Clone)]
4pub struct Cancellable {
5 pub(super) cancelled: Arc<AtomicBool>,
6 _keep_alive: Option<Arc<dyn Send + Sync>>,
7}
8
9impl Cancellable {
10 pub(super) fn new_with_keep_alive(keep_alive: Option<Arc<dyn Send + Sync>>) -> Self {
11 Self {
12 cancelled: Arc::new(AtomicBool::new(false)),
13 _keep_alive: keep_alive,
14 }
15 }
16
17 pub fn cancel(&self) -> bool {
18 !self.cancelled.swap(true, Ordering::SeqCst)
19 }
20
21 #[must_use]
22 pub fn is_cancelled(&self) -> bool {
23 self.cancelled.load(Ordering::SeqCst)
24 }
25}
26
27impl fmt::Debug for Cancellable {
28 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
29 f.debug_struct("Cancellable").finish_non_exhaustive()
30 }
31}
32
33#[must_use = "dropping the StreamCompletion cancels the running stream; call .wait()/.try_wait() or keep it alive"]
34pub struct StreamCompletion<T> {
35 state: StreamCompletionState<T>,
36 cancel_on_drop: Option<StreamCancellation>,
37}
38
39enum StreamCompletionState<T> {
40 Ready(Option<StreamResult<T>>),
41 Receiver(oneshot::Receiver<StreamResult<T>>),
42}
43
44#[derive(Clone)]
45pub(crate) struct StreamCancellation {
46 cancelled: Arc<AtomicBool>,
47 worker: Arc<Mutex<Option<thread::Thread>>>,
48}
49
50pub(super) struct RegisteredStreamWorker {
51 cancellation: StreamCancellation,
52}
53
54impl StreamCancellation {
55 pub(super) fn new() -> Self {
56 Self {
57 cancelled: Arc::new(AtomicBool::new(false)),
58 worker: Arc::new(Mutex::new(None)),
59 }
60 }
61
62 pub(super) fn cancelled(&self) -> Arc<AtomicBool> {
63 Arc::clone(&self.cancelled)
64 }
65
66 pub(super) fn register_current_worker(&self) -> RegisteredStreamWorker {
67 *self.worker.lock().expect("stream worker slot poisoned") = Some(thread::current());
68 RegisteredStreamWorker {
69 cancellation: self.clone(),
70 }
71 }
72
73 fn cancel(&self) {
74 self.cancelled.store(true, Ordering::SeqCst);
75 let worker = self
76 .worker
77 .lock()
78 .expect("stream worker slot poisoned")
79 .clone();
80 if let Some(worker) = worker {
81 worker.unpark();
82 }
83 }
84}
85
86impl Drop for RegisteredStreamWorker {
87 fn drop(&mut self) {
88 *self
89 .cancellation
90 .worker
91 .lock()
92 .expect("stream worker slot poisoned") = None;
93 }
94}
95
96impl<T> StreamCompletion<T> {
97 pub(crate) fn from_receiver(
98 receiver: oneshot::Receiver<StreamResult<T>>,
99 cancel_on_drop: Option<StreamCancellation>,
100 ) -> Self {
101 Self {
102 state: StreamCompletionState::Receiver(receiver),
103 cancel_on_drop,
104 }
105 }
106
107 pub(crate) fn ready(result: StreamResult<T>) -> Self {
108 Self {
109 state: StreamCompletionState::Ready(Some(result)),
110 cancel_on_drop: None,
111 }
112 }
113
114 pub fn wait(self) -> StreamResult<T> {
115 block_on(self)
116 }
117
118 #[must_use]
119 pub fn try_wait(&mut self) -> Option<StreamResult<T>> {
120 match &mut self.state {
121 StreamCompletionState::Ready(result) => result.take(),
122 StreamCompletionState::Receiver(receiver) => match receiver.try_recv() {
123 Ok(Some(result)) => Some(result),
124 Ok(None) => None,
125 Err(_) => Some(Err(StreamError::AbruptTermination)),
126 },
127 }
128 }
129}
130
131impl<T> Future for StreamCompletion<T> {
132 type Output = StreamResult<T>;
133
134 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
135 match &mut self.state {
136 StreamCompletionState::Ready(result) => {
137 Poll::Ready(result.take().unwrap_or(Err(StreamError::AbruptTermination)))
138 }
139 StreamCompletionState::Receiver(receiver) => match Pin::new(receiver).poll(cx) {
140 Poll::Ready(Ok(result)) => Poll::Ready(result),
141 Poll::Ready(Err(_)) => Poll::Ready(Err(StreamError::AbruptTermination)),
142 Poll::Pending => Poll::Pending,
143 },
144 }
145 }
146}
147
148impl<T> Unpin for StreamCompletion<T> {}
149
150impl<T> Drop for StreamCompletion<T> {
151 fn drop(&mut self) {
152 if let Some(cancellation) = &self.cancel_on_drop {
153 cancellation.cancel();
154 }
155 }
156}
157
158impl<T> fmt::Debug for StreamCompletion<T> {
159 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
160 f.debug_struct("StreamCompletion").finish_non_exhaustive()
161 }
162}