datum/stream/
completion.rs1use super::*;
9
10#[derive(Clone)]
11pub struct Cancellable {
12 pub(super) cancelled: Arc<AtomicBool>,
13 _keep_alive: Option<Arc<dyn Send + Sync>>,
14}
15
16impl Cancellable {
17 pub(super) fn new_with_keep_alive(keep_alive: Option<Arc<dyn Send + Sync>>) -> Self {
18 Self {
19 cancelled: Arc::new(AtomicBool::new(false)),
20 _keep_alive: keep_alive,
21 }
22 }
23
24 pub fn cancel(&self) -> bool {
25 !self.cancelled.swap(true, Ordering::SeqCst)
26 }
27
28 #[must_use]
29 pub fn is_cancelled(&self) -> bool {
30 self.cancelled.load(Ordering::SeqCst)
31 }
32}
33
34impl fmt::Debug for Cancellable {
35 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
36 f.debug_struct("Cancellable").finish_non_exhaustive()
37 }
38}
39
40#[must_use = "dropping the StreamCompletion cancels the running stream; call .wait()/.try_wait() or keep it alive"]
41pub struct StreamCompletion<T> {
42 state: StreamCompletionState<T>,
43 cancel_on_drop: Option<StreamCancellation>,
44}
45
46enum StreamCompletionState<T> {
47 Ready(Option<StreamResult<T>>),
48 Receiver(oneshot::Receiver<StreamResult<T>>),
49}
50
/// Cancellation token for a stream, shared between clones.
///
/// Both fields are behind `Arc`s, so a clone refers to the same flag and the
/// same worker slot as the value it was cloned from.
#[derive(Clone)]
pub(crate) struct StreamCancellation {
    /// Set to `true` when the stream has been cancelled.
    cancelled: Arc<AtomicBool>,
    /// Handle of the currently registered worker thread, if any; that thread
    /// is unparked when the token is cancelled.
    worker: Arc<Mutex<Option<thread::Thread>>>,
}
56
57pub(super) struct RegisteredStreamWorker {
58 cancellation: StreamCancellation,
59}
60
61impl StreamCancellation {
62 pub(super) fn new() -> Self {
63 Self {
64 cancelled: Arc::new(AtomicBool::new(false)),
65 worker: Arc::new(Mutex::new(None)),
66 }
67 }
68
69 pub(crate) fn for_external_completion() -> Self {
70 Self::new()
71 }
72
73 pub(crate) fn cancelled(&self) -> Arc<AtomicBool> {
74 Arc::clone(&self.cancelled)
75 }
76
77 pub(super) fn register_current_worker(&self) -> RegisteredStreamWorker {
78 *self.worker.lock().expect("stream worker slot poisoned") = Some(thread::current());
79 RegisteredStreamWorker {
80 cancellation: self.clone(),
81 }
82 }
83
84 fn cancel(&self) {
85 self.cancelled.store(true, Ordering::SeqCst);
86 let worker = self
87 .worker
88 .lock()
89 .expect("stream worker slot poisoned")
90 .clone();
91 if let Some(worker) = worker {
92 worker.unpark();
93 }
94 }
95}
96
97impl Drop for RegisteredStreamWorker {
98 fn drop(&mut self) {
99 *self
100 .cancellation
101 .worker
102 .lock()
103 .expect("stream worker slot poisoned") = None;
104 }
105}
106
107impl<T> StreamCompletion<T> {
108 pub(crate) fn from_receiver(
109 receiver: oneshot::Receiver<StreamResult<T>>,
110 cancel_on_drop: Option<StreamCancellation>,
111 ) -> Self {
112 Self {
113 state: StreamCompletionState::Receiver(receiver),
114 cancel_on_drop,
115 }
116 }
117
118 pub(crate) fn ready(result: StreamResult<T>) -> Self {
119 Self {
120 state: StreamCompletionState::Ready(Some(result)),
121 cancel_on_drop: None,
122 }
123 }
124
125 pub fn wait(self) -> StreamResult<T> {
126 block_on(self)
127 }
128
129 #[must_use]
130 pub fn try_wait(&mut self) -> Option<StreamResult<T>> {
131 match &mut self.state {
132 StreamCompletionState::Ready(result) => result.take(),
133 StreamCompletionState::Receiver(receiver) => match receiver.try_recv() {
134 Ok(Some(result)) => Some(result),
135 Ok(None) => None,
136 Err(_) => Some(Err(StreamError::AbruptTermination)),
137 },
138 }
139 }
140}
141
142impl<T> Future for StreamCompletion<T> {
143 type Output = StreamResult<T>;
144
145 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
146 match &mut self.state {
147 StreamCompletionState::Ready(result) => {
148 Poll::Ready(result.take().unwrap_or(Err(StreamError::AbruptTermination)))
149 }
150 StreamCompletionState::Receiver(receiver) => match Pin::new(receiver).poll(cx) {
151 Poll::Ready(Ok(result)) => Poll::Ready(result),
152 Poll::Ready(Err(_)) => Poll::Ready(Err(StreamError::AbruptTermination)),
153 Poll::Pending => Poll::Pending,
154 },
155 }
156 }
157}
158
159impl<T> Unpin for StreamCompletion<T> {}
160
161impl<T> Drop for StreamCompletion<T> {
162 fn drop(&mut self) {
163 if let Some(cancellation) = &self.cancel_on_drop {
164 cancellation.cancel();
165 }
166 }
167}
168
169impl<T> fmt::Debug for StreamCompletion<T> {
170 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
171 f.debug_struct("StreamCompletion").finish_non_exhaustive()
172 }
173}