mssf_core/sync/channel.rs
1// ------------------------------------------------------------
2// Copyright (c) Microsoft Corporation. All rights reserved.
3// Licensed under the MIT License (MIT). See License.txt in the repo root for license information.
4// ------------------------------------------------------------
5
6use std::{
7 future::Future,
8 pin::Pin,
9 task::{Context, Poll},
10};
11
12use mssf_com::FabricCommon::IFabricAsyncOperationContext;
13
14use crate::{
15 ErrorCode,
16 runtime::executor::{BoxedCancelToken, EventFuture},
17};
18
19pub use futures_channel::oneshot::{self, Receiver, Sender};
20
21// Token that wraps oneshot receiver.
22// The future recieve does not have error. This is designed for the use
23// case where SF guarantees that sender will be called.
24pub struct FabricReceiver<T> {
25 rx: Receiver<T>,
26 token: Option<BoxedCancelToken>,
27 // event for cancelling
28 cancel_event: Option<Pin<Box<dyn EventFuture + 'static>>>,
29 // saved ctx from SF Begin COM api for cancalling.
30 ctx: Option<IFabricAsyncOperationContext>,
31}
32
33impl<T> FabricReceiver<T> {
34 fn new(rx: Receiver<T>, token: Option<BoxedCancelToken>) -> FabricReceiver<T> {
35 FabricReceiver {
36 rx,
37 cancel_event: token.as_ref().map(|t| t.wait()),
38 token,
39 ctx: None,
40 }
41 }
42
43 // This does not handle cancel. It is commented out because it is not used.
44 // pub(crate) fn blocking_recv(self) -> crate::Result<T> {
45 // if let Some(t) = self.token {
46 // if t.is_cancelled() {
47 // return Err(ErrorCode::E_ABORT.into());
48 // }
49 // }
50 // // sender must send stuff so that there is not error.
51 // Ok(self.rx.blocking_recv().unwrap())
52 // }
53
54 // Set the SF ctx to hook up cancellation.
55 pub(crate) fn set_ctx(&mut self, ctx: IFabricAsyncOperationContext) {
56 let prev = self.ctx.replace(ctx);
57 assert!(prev.is_none());
58 }
59
60 // Cancels the inner SF operation if exists, and reset the ctx.
61 fn cancel_inner_ctx(&mut self) -> crate::WinResult<()> {
62 if let Some(ctx) = &self.ctx {
63 if let Err(e) = unsafe { ctx.Cancel() } {
64 // fail to cancel inner operation.
65 return Err(e);
66 } else {
67 // clear the sf ctx to avoid cancel twice.
68 self.ctx.take();
69 }
70 } else {
71 // The inner ctx can be empty after we already cancelled the inner ctx.
72 // This can happen because we cancel during polling, and polling can
73 // happen many times.
74 }
75 Ok(())
76 }
77}
78
79// Returns error if cancelled.
80// If there is an inner SF ctx, cancellation signal will
81// trigger cancellation of the ctx.
82impl<T> Future for FabricReceiver<T> {
83 // The error code should be OperationCanceled, unless cancellation
84 // of SF ctx returns other errors.
85 // (TODO: observe other error code from SF, maybe some code should be ignored).
86 type Output = crate::WinResult<T>;
87 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
88 // Poll the receiver first, if ready then return the output,
89 // else poll the cancellation token, if cancelled propergate the cancel to SF ctx,
90 // and return pending. SF task should continue finish execute in the background,
91 // and finish with error code OperationCancelled
92 // and send the error code from FabricSender.
93 //
94 // There can be the case that cancellation wakes the waker, but receiver
95 // then got the result. The next poll will return received output rather
96 // than cancelled error.
97
98 let this = self.get_mut();
99
100 // Try to receive the value from the sender
101 let inner = <Receiver<T> as Future>::poll(Pin::new(&mut this.rx), cx);
102 match (inner, this.token.as_ref()) {
103 (Poll::Ready(Ok(data)), _) => Poll::Ready(Ok(data)),
104 (Poll::Ready(Err(_)), Some(t)) => {
105 if t.is_cancelled() {
106 // clear the token since we only propergate the signal once.
107 this.token.take();
108 this.cancel_event.take();
109 // cancel the SF ctx and clear it.
110 if let Err(e) = this.cancel_inner_ctx() {
111 Poll::Ready(Err(e))
112 } else {
113 Poll::Ready(Err(ErrorCode::E_ABORT.into()))
114 }
115 } else {
116 panic!("sender dropped without sending")
117 }
118 }
119 (Poll::Ready(Err(_)), None) => {
120 panic!("sender dropped without sending")
121 }
122 (Poll::Pending, Some(_)) => {
123 // If the action is canceled we can safely stop and return canceled error.
124 // this is cancel safe so we can poll it once and discard
125 let event = this
126 .cancel_event
127 .as_mut()
128 .expect("cancel event should be set");
129 let inner = std::pin::pin!(event).poll(cx);
130 match inner {
131 Poll::Ready(_) => {
132 // clear the token since we only propergate the signal once.
133 this.cancel_event.take();
134 this.cancel_event.take();
135 // operation cancelled. Propergate to inner sf ctx.
136 if let Err(e) = this.cancel_inner_ctx() {
137 Poll::Ready(Err(e))
138 } else {
139 // The cancellation is propergated to sf task,
140 // the receiver from now on should wait for the
141 // final result from the sf task. (as we have cleared the token)
142 // Most likely the task finishes with OperationCancelled error code.
143 Poll::Pending
144 }
145 }
146 Poll::Pending => Poll::Pending,
147 }
148 }
149 (Poll::Pending, None) => Poll::Pending,
150 }
151 }
152}
153
154pub struct FabricSender<T> {
155 tx: Sender<T>,
156 token: Option<BoxedCancelToken>,
157}
158
159impl<T> FabricSender<T> {
160 fn new(tx: Sender<T>, token: Option<BoxedCancelToken>) -> FabricSender<T> {
161 FabricSender { tx, token }
162 }
163
164 pub fn send(self, data: T) {
165 let e = self.tx.send(data);
166 if e.is_err() {
167 // In SF use case receiver should not be dropped by user.
168 // If it acctually dropped by user, it is ok to ignore because user
169 // does not want to want the value any more. But too bad SF has done
170 // the work to get the value.
171
172 // receiver should never be dropped if operation is not cancelled.
173 if let Some(t) = self.token {
174 debug_assert!(
175 t.is_cancelled(),
176 "task should be cancelled when receiver dropped."
177 );
178 }
179 }
180 }
181}
182
183/// Creates a fabric oneshot channel.
184/// Operation can be cancelled by cancelling the token.
185pub fn oneshot_channel<T>(token: Option<BoxedCancelToken>) -> (FabricSender<T>, FabricReceiver<T>) {
186 let (tx, rx) = oneshot::channel::<T>();
187 (
188 FabricSender::new(tx, token.clone()),
189 FabricReceiver::new(rx, token),
190 )
191}
192
#[cfg(test)]
mod test {

    use crate::{
        ErrorCode,
        sync::{SimpleCancelToken, oneshot_channel},
    };

    /// Walks through the send/cancel orderings of the channel that is used
    /// to send data in the proxy layer.
    #[tokio::test]
    async fn test_channel() {
        // Plain send with no cancellation: the value is received.
        {
            let (tx, rx) = oneshot_channel::<bool>(Some(SimpleCancelToken::new_boxed()));
            tx.send(true);
            assert!(rx.await.unwrap());
        }
        // Cancelled after the send: the value is still received.
        {
            let token = SimpleCancelToken::new_boxed();
            let (tx, rx) = oneshot_channel::<bool>(Some(token.clone()));
            tx.send(true);
            token.cancel();
            assert!(rx.await.unwrap());
        }
        // Cancelled before the send: the value is still received.
        {
            let token = SimpleCancelToken::new_boxed();
            let (tx, rx) = oneshot_channel::<bool>(Some(token.clone()));
            token.cancel();
            tx.send(true);
            assert!(rx.await.unwrap());
        }
        // Cancelled and the receiver dropped: sending is a no-op.
        {
            let token = SimpleCancelToken::new_boxed();
            let (tx, rx) = oneshot_channel::<bool>(Some(token.clone()));
            token.cancel();
            drop(rx);
            tx.send(true);
        }
        // Cancelled and the sender dropped: the receiver gets an error.
        {
            let token = SimpleCancelToken::new_boxed();
            let (tx, rx) = oneshot_channel::<bool>(Some(token.clone()));
            token.cancel();
            drop(tx);
            assert_eq!(rx.await.unwrap_err(), ErrorCode::E_ABORT.into());
        }
    }
}