mssf_core/sync/channel.rs
1// ------------------------------------------------------------
2// Copyright (c) Microsoft Corporation. All rights reserved.
3// Licensed under the MIT License (MIT). See License.txt in the repo root for license information.
4// ------------------------------------------------------------
5
6use std::{
7 future::Future,
8 pin::Pin,
9 task::{Context, Poll},
10};
11
12use mssf_com::FabricCommon::IFabricAsyncOperationContext;
13
14use crate::{
15 ErrorCode,
16 runtime::executor::{BoxedCancelToken, EventFuture},
17};
18
19pub use futures_channel::oneshot::{self, Receiver, Sender};
20
21// Token that wraps oneshot receiver.
22// SF guarantees that the sender callback will be invoked, but the receiver
23// may be dropped before that (e.g. tokio::select! cancellation), in which
24// case the send is silently ignored.
25pub struct FabricReceiver<T> {
26 rx: Receiver<T>,
27 token: Option<BoxedCancelToken>,
28 // event from the token, this is needed to poll the cancellation in the receiver future.
29 cancel_event: Option<Pin<Box<dyn EventFuture + 'static>>>,
30 // saved ctx from SF Begin COM api for cancelling.
31 ctx: Option<IFabricAsyncOperationContext>,
32}
33
34impl<T> FabricReceiver<T> {
35 fn new(rx: Receiver<T>, token: Option<BoxedCancelToken>) -> FabricReceiver<T> {
36 FabricReceiver {
37 rx,
38 cancel_event: token.as_ref().map(|t| t.wait()),
39 token,
40 ctx: None,
41 }
42 }
43
44 // This does not handle cancel. It is commented out because it is not used.
45 // pub(crate) fn blocking_recv(self) -> crate::Result<T> {
46 // if let Some(t) = self.token {
47 // if t.is_cancelled() {
48 // return Err(ErrorCode::E_ABORT.into());
49 // }
50 // }
51 // // sender must send stuff so that there is not error.
52 // Ok(self.rx.blocking_recv().unwrap())
53 // }
54
55 // Set the SF ctx to hook up cancellation.
56 pub(crate) fn set_ctx(&mut self, ctx: IFabricAsyncOperationContext) {
57 let prev = self.ctx.replace(ctx);
58 assert!(prev.is_none());
59 }
60
61 // Cancels the inner SF operation if exists, and reset the ctx.
62 fn cancel_inner_ctx(&mut self) -> crate::WinResult<()> {
63 if let Some(ctx) = &self.ctx {
64 if let Err(e) = unsafe { ctx.Cancel() } {
65 // fail to cancel inner operation.
66 return Err(e);
67 } else {
68 // clear the sf ctx to avoid cancel twice.
69 self.ctx.take();
70 }
71 } else {
72 // The inner ctx can be empty after we already cancelled the inner ctx.
73 // This can happen because we cancel during polling, and polling can
74 // happen many times.
75 }
76 Ok(())
77 }
78
79 // Cancel token no longer needed.
80 fn clear_cancel_fields(&mut self) {
81 self.token.take();
82 self.cancel_event.take();
83 }
84}
85
86// Returns error if cancelled.
87// If there is an inner SF ctx, cancellation signal will
88// trigger cancellation of the ctx.
89impl<T> Future for FabricReceiver<T> {
90 // The error code should be OperationCanceled, unless cancellation
91 // of SF ctx returns other errors.
92 // (TODO: observe other error code from SF, maybe some code should be ignored).
93 type Output = crate::WinResult<T>;
94 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
95 // Poll the receiver first, if ready then return the output,
96 // else poll the cancellation token, if cancelled propagate the cancel to SF ctx,
97 // and return pending. SF task should continue finish execute in the background,
98 // and finish with error code OperationCancelled
99 // and send the error code from FabricSender.
100 //
101 // There can be the case that cancellation wakes the waker, but receiver
102 // then got the result. The next poll will return received output rather
103 // than cancelled error.
104
105 let this = self.get_mut();
106
107 // Try to receive the value from the sender
108 let inner = <Receiver<T> as Future>::poll(Pin::new(&mut this.rx), cx);
109 match (inner, this.token.as_ref()) {
110 (Poll::Ready(Ok(data)), _) => {
111 // Result received successfully; clear the token so Drop
112 // does not cancel the operation.
113 this.clear_cancel_fields();
114 Poll::Ready(Ok(data))
115 }
116 (Poll::Ready(Err(_)), Some(t)) => {
117 if t.is_cancelled() {
118 // clear the token since we only propagate the signal once.
119 this.clear_cancel_fields();
120 // cancel the SF ctx and clear it.
121 if let Err(e) = this.cancel_inner_ctx() {
122 Poll::Ready(Err(e))
123 } else {
124 Poll::Ready(Err(ErrorCode::E_ABORT.into()))
125 }
126 } else {
127 panic!("sender dropped without sending")
128 }
129 }
130 (Poll::Ready(Err(_)), None) => {
131 panic!("sender dropped without sending")
132 }
133 (Poll::Pending, Some(_)) => {
134 // If the action is canceled we can safely stop and return canceled error.
135 // this is cancel safe so we can poll it once and discard
136 let event = this
137 .cancel_event
138 .as_mut()
139 .expect("cancel event should be set");
140 let inner = std::pin::pin!(event).poll(cx);
141 match inner {
142 Poll::Ready(_) => {
143 // clear the token since we only propagate the signal once.
144 this.clear_cancel_fields();
145 // operation cancelled. propagate to inner sf ctx.
146 if let Err(e) = this.cancel_inner_ctx() {
147 Poll::Ready(Err(e))
148 } else {
149 // The cancellation is propagated to sf task,
150 // the receiver from now on should wait for the
151 // final result from the sf task. (as we have cleared the token)
152 // Most likely the task finishes with OperationCancelled error code.
153 Poll::Pending
154 }
155 }
156 Poll::Pending => Poll::Pending,
157 }
158 }
159 (Poll::Pending, None) => Poll::Pending,
160 }
161 }
162}
163
164// If nobody is waiting for the result, the inner SF operation should be cancelled.
165// We intentionally do not cancel the user-passed token here,
166// as it is user-owned and cancelling it could have unintended side effects.
167impl<T> Drop for FabricReceiver<T> {
168 fn drop(&mut self) {
169 // Note: when the token is already cancelled but the receiver was never polled,
170 // the cancellation signal has not been propagated to the inner SF ctx,
171 // because propagation only happens during poll.
172 // In this case we skip cancel_inner_ctx; the SF operation will be left
173 // in the background and eventually finish on its own.
174 if let Some(t) = &self.token
175 && !t.is_cancelled()
176 && let Err(_e) = self.cancel_inner_ctx()
177 {
178 #[cfg(feature = "tracing")]
179 tracing::debug!("FabricReceiver::drop: cancel_inner_ctx failed: {_e}");
180 }
181 }
182}
183
184pub struct FabricSender<T> {
185 tx: Sender<T>,
186}
187
188impl<T> FabricSender<T> {
189 fn new(tx: Sender<T>) -> FabricSender<T> {
190 FabricSender { tx }
191 }
192
193 pub fn send(self, data: T) {
194 // Ignore send error: receiver may have been dropped (e.g. tokio::select! cancellation).
195 if self.tx.send(data).is_err() {
196 #[cfg(feature = "tracing")]
197 tracing::debug!("FabricSender::send: receiver already dropped, ignoring send error");
198 }
199 }
200}
201
202/// Creates a fabric oneshot channel.
203/// Operation can be cancelled by cancelling the token.
204pub fn oneshot_channel<T>(token: Option<BoxedCancelToken>) -> (FabricSender<T>, FabricReceiver<T>) {
205 let (tx, rx) = oneshot::channel::<T>();
206 (FabricSender::new(tx), FabricReceiver::new(rx, token))
207}
208
#[cfg(test)]
mod test {

    use crate::{
        ErrorCode,
        sync::{SimpleCancelToken, oneshot_channel},
    };

    /// Exercises the cancellation cases of the channel used to send data in
    /// the proxy layer.
    #[tokio::test]
    async fn test_channel() {
        // Plain send: the value is received.
        {
            let (tx, rx) = oneshot_channel::<bool>(Some(SimpleCancelToken::new_boxed()));
            tx.send(true);
            assert!(rx.await.unwrap());
        }
        // Token cancelled after the send: the result is still received.
        {
            let token = SimpleCancelToken::new_boxed();
            let (tx, rx) = oneshot_channel::<bool>(Some(token.clone()));
            tx.send(true);
            token.cancel();
            assert!(rx.await.unwrap());
        }
        // Token cancelled before the send: the result is still received.
        {
            let token = SimpleCancelToken::new_boxed();
            let (tx, rx) = oneshot_channel::<bool>(Some(token.clone()));
            token.cancel();
            tx.send(true);
            assert!(rx.await.unwrap());
        }
        // Token cancelled and receiver dropped: the send is a no-op.
        {
            let token = SimpleCancelToken::new_boxed();
            let (tx, rx) = oneshot_channel::<bool>(Some(token.clone()));
            token.cancel();
            drop(rx);
            tx.send(true);
        }
        // Token cancelled and sender dropped: the receiver gets an error.
        {
            let token = SimpleCancelToken::new_boxed();
            let (tx, rx) = oneshot_channel::<bool>(Some(token.clone()));
            token.cancel();
            drop(tx);
            assert_eq!(rx.await.unwrap_err(), ErrorCode::E_ABORT.into());
        }
    }
}