datum/stream/
completion.rs1use super::*;
2
3#[derive(Clone)]
4pub struct Cancellable {
5 pub(super) cancelled: Arc<AtomicBool>,
6 _keep_alive: Option<Arc<dyn Send + Sync>>,
7}
8
9impl Cancellable {
10 pub(super) fn new_with_keep_alive(keep_alive: Option<Arc<dyn Send + Sync>>) -> Self {
11 Self {
12 cancelled: Arc::new(AtomicBool::new(false)),
13 _keep_alive: keep_alive,
14 }
15 }
16
17 pub fn cancel(&self) -> bool {
18 !self.cancelled.swap(true, Ordering::SeqCst)
19 }
20
21 #[must_use]
22 pub fn is_cancelled(&self) -> bool {
23 self.cancelled.load(Ordering::SeqCst)
24 }
25}
26
27impl fmt::Debug for Cancellable {
28 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
29 f.debug_struct("Cancellable").finish_non_exhaustive()
30 }
31}
32
33#[must_use = "dropping the StreamCompletion cancels the running stream; call .wait()/.try_wait() or keep it alive"]
34pub struct StreamCompletion<T> {
35 state: StreamCompletionState<T>,
36 cancel_on_drop: Option<StreamCancellation>,
37}
38
39enum StreamCompletionState<T> {
40 Ready(Option<StreamResult<T>>),
41 Receiver(oneshot::Receiver<StreamResult<T>>),
42}
43
/// Cancellation signal for a stream that can also wake a registered worker thread.
///
/// Both fields sit behind an `Arc`, so clones share the same flag and the same
/// worker slot.
#[derive(Clone)]
pub(crate) struct StreamCancellation {
    /// Set to `true` when the stream is cancelled.
    cancelled: Arc<AtomicBool>,
    /// Handle of the thread to unpark on cancellation; `None` while no worker
    /// is registered.
    worker: Arc<Mutex<Option<thread::Thread>>>,
}
49
50pub(super) struct RegisteredStreamWorker {
51 cancellation: StreamCancellation,
52}
53
54impl StreamCancellation {
55 pub(super) fn new() -> Self {
56 Self {
57 cancelled: Arc::new(AtomicBool::new(false)),
58 worker: Arc::new(Mutex::new(None)),
59 }
60 }
61
62 pub(crate) fn for_external_completion() -> Self {
63 Self::new()
64 }
65
66 pub(crate) fn cancelled(&self) -> Arc<AtomicBool> {
67 Arc::clone(&self.cancelled)
68 }
69
70 pub(super) fn register_current_worker(&self) -> RegisteredStreamWorker {
71 *self.worker.lock().expect("stream worker slot poisoned") = Some(thread::current());
72 RegisteredStreamWorker {
73 cancellation: self.clone(),
74 }
75 }
76
77 fn cancel(&self) {
78 self.cancelled.store(true, Ordering::SeqCst);
79 let worker = self
80 .worker
81 .lock()
82 .expect("stream worker slot poisoned")
83 .clone();
84 if let Some(worker) = worker {
85 worker.unpark();
86 }
87 }
88}
89
90impl Drop for RegisteredStreamWorker {
91 fn drop(&mut self) {
92 *self
93 .cancellation
94 .worker
95 .lock()
96 .expect("stream worker slot poisoned") = None;
97 }
98}
99
100impl<T> StreamCompletion<T> {
101 pub(crate) fn from_receiver(
102 receiver: oneshot::Receiver<StreamResult<T>>,
103 cancel_on_drop: Option<StreamCancellation>,
104 ) -> Self {
105 Self {
106 state: StreamCompletionState::Receiver(receiver),
107 cancel_on_drop,
108 }
109 }
110
111 pub(crate) fn ready(result: StreamResult<T>) -> Self {
112 Self {
113 state: StreamCompletionState::Ready(Some(result)),
114 cancel_on_drop: None,
115 }
116 }
117
118 pub fn wait(self) -> StreamResult<T> {
119 block_on(self)
120 }
121
122 #[must_use]
123 pub fn try_wait(&mut self) -> Option<StreamResult<T>> {
124 match &mut self.state {
125 StreamCompletionState::Ready(result) => result.take(),
126 StreamCompletionState::Receiver(receiver) => match receiver.try_recv() {
127 Ok(Some(result)) => Some(result),
128 Ok(None) => None,
129 Err(_) => Some(Err(StreamError::AbruptTermination)),
130 },
131 }
132 }
133}
134
135impl<T> Future for StreamCompletion<T> {
136 type Output = StreamResult<T>;
137
138 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
139 match &mut self.state {
140 StreamCompletionState::Ready(result) => {
141 Poll::Ready(result.take().unwrap_or(Err(StreamError::AbruptTermination)))
142 }
143 StreamCompletionState::Receiver(receiver) => match Pin::new(receiver).poll(cx) {
144 Poll::Ready(Ok(result)) => Poll::Ready(result),
145 Poll::Ready(Err(_)) => Poll::Ready(Err(StreamError::AbruptTermination)),
146 Poll::Pending => Poll::Pending,
147 },
148 }
149 }
150}
151
152impl<T> Unpin for StreamCompletion<T> {}
153
154impl<T> Drop for StreamCompletion<T> {
155 fn drop(&mut self) {
156 if let Some(cancellation) = &self.cancel_on_drop {
157 cancellation.cancel();
158 }
159 }
160}
161
162impl<T> fmt::Debug for StreamCompletion<T> {
163 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
164 f.debug_struct("StreamCompletion").finish_non_exhaustive()
165 }
166}