Skip to main content

mssf_core/sync/
channel.rs

1// ------------------------------------------------------------
2// Copyright (c) Microsoft Corporation.  All rights reserved.
3// Licensed under the MIT License (MIT). See License.txt in the repo root for license information.
4// ------------------------------------------------------------
5
6use std::{
7    future::Future,
8    pin::Pin,
9    task::{Context, Poll},
10};
11
12use mssf_com::FabricCommon::IFabricAsyncOperationContext;
13
14use crate::{
15    ErrorCode,
16    runtime::executor::{BoxedCancelToken, EventFuture},
17};
18
19pub use futures_channel::oneshot::{self, Receiver, Sender};
20
21// Token that wraps oneshot receiver.
22// SF guarantees that the sender callback will be invoked, but the receiver
23// may be dropped before that (e.g. tokio::select! cancellation), in which
24// case the send is silently ignored.
25pub struct FabricReceiver<T> {
26    rx: Receiver<T>,
27    token: Option<BoxedCancelToken>,
28    // event from the token, this is needed to poll the cancellation in the receiver future.
29    cancel_event: Option<Pin<Box<dyn EventFuture + 'static>>>,
30    // saved ctx from SF Begin COM api for cancelling.
31    ctx: Option<IFabricAsyncOperationContext>,
32}
33
34impl<T> FabricReceiver<T> {
35    fn new(rx: Receiver<T>, token: Option<BoxedCancelToken>) -> FabricReceiver<T> {
36        FabricReceiver {
37            rx,
38            cancel_event: token.as_ref().map(|t| t.wait()),
39            token,
40            ctx: None,
41        }
42    }
43
44    // This does not handle cancel. It is commented out because it is not used.
45    // pub(crate) fn blocking_recv(self) -> crate::Result<T> {
46    //     if let Some(t) = self.token {
47    //         if t.is_cancelled() {
48    //             return Err(ErrorCode::E_ABORT.into());
49    //         }
50    //     }
51    //     // sender must send stuff so that there is not error.
52    //     Ok(self.rx.blocking_recv().unwrap())
53    // }
54
55    // Set the SF ctx to hook up cancellation.
56    pub(crate) fn set_ctx(&mut self, ctx: IFabricAsyncOperationContext) {
57        let prev = self.ctx.replace(ctx);
58        assert!(prev.is_none());
59    }
60
61    // Cancels the inner SF operation if exists, and reset the ctx.
62    fn cancel_inner_ctx(&mut self) -> crate::WinResult<()> {
63        if let Some(ctx) = &self.ctx {
64            if let Err(e) = unsafe { ctx.Cancel() } {
65                // fail to cancel inner operation.
66                return Err(e);
67            } else {
68                // clear the sf ctx to avoid cancel twice.
69                self.ctx.take();
70            }
71        } else {
72            // The inner ctx can be empty after we already cancelled the inner ctx.
73            // This can happen because we cancel during polling, and polling can
74            // happen many times.
75        }
76        Ok(())
77    }
78
79    // Cancel token no longer needed.
80    fn clear_cancel_fields(&mut self) {
81        self.token.take();
82        self.cancel_event.take();
83    }
84}
85
86// Returns error if cancelled.
87// If there is an inner SF ctx, cancellation signal will
88// trigger cancellation of the ctx.
89impl<T> Future for FabricReceiver<T> {
90    // The error code should be OperationCanceled, unless cancellation
91    // of SF ctx returns other errors.
92    // (TODO: observe other error code from SF, maybe some code should be ignored).
93    type Output = crate::WinResult<T>;
94    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
95        // Poll the receiver first, if ready then return the output,
96        // else poll the cancellation token, if cancelled propagate the cancel to SF ctx,
97        // and return pending. SF task should continue finish execute in the background,
98        // and finish with error code OperationCancelled
99        // and send the error code from FabricSender.
100        //
101        // There can be the case that cancellation wakes the waker, but receiver
102        // then got the result. The next poll will return received output rather
103        // than cancelled error.
104
105        let this = self.get_mut();
106
107        // Try to receive the value from the sender
108        let inner = <Receiver<T> as Future>::poll(Pin::new(&mut this.rx), cx);
109        match (inner, this.token.as_ref()) {
110            (Poll::Ready(Ok(data)), _) => {
111                // Result received successfully; clear the token so Drop
112                // does not cancel the operation.
113                this.clear_cancel_fields();
114                Poll::Ready(Ok(data))
115            }
116            (Poll::Ready(Err(_)), Some(t)) => {
117                if t.is_cancelled() {
118                    // clear the token since we only propagate the signal once.
119                    this.clear_cancel_fields();
120                    // cancel the SF ctx and clear it.
121                    if let Err(e) = this.cancel_inner_ctx() {
122                        Poll::Ready(Err(e))
123                    } else {
124                        Poll::Ready(Err(ErrorCode::E_ABORT.into()))
125                    }
126                } else {
127                    panic!("sender dropped without sending")
128                }
129            }
130            (Poll::Ready(Err(_)), None) => {
131                panic!("sender dropped without sending")
132            }
133            (Poll::Pending, Some(_)) => {
134                // If the action is canceled we can safely stop and return canceled error.
135                // this is cancel safe so we can poll it once and discard
136                let event = this
137                    .cancel_event
138                    .as_mut()
139                    .expect("cancel event should be set");
140                let inner = std::pin::pin!(event).poll(cx);
141                match inner {
142                    Poll::Ready(_) => {
143                        // clear the token since we only propagate the signal once.
144                        this.clear_cancel_fields();
145                        // operation cancelled. propagate to inner sf ctx.
146                        if let Err(e) = this.cancel_inner_ctx() {
147                            Poll::Ready(Err(e))
148                        } else {
149                            // The cancellation is propagated to sf task,
150                            // the receiver from now on should wait for the
151                            // final result from the sf task. (as we have cleared the token)
152                            // Most likely the task finishes with OperationCancelled error code.
153                            Poll::Pending
154                        }
155                    }
156                    Poll::Pending => Poll::Pending,
157                }
158            }
159            (Poll::Pending, None) => Poll::Pending,
160        }
161    }
162}
163
164// If nobody is waiting for the result, the inner SF operation should be cancelled.
165// We intentionally do not cancel the user-passed token here,
166// as it is user-owned and cancelling it could have unintended side effects.
167impl<T> Drop for FabricReceiver<T> {
168    fn drop(&mut self) {
169        // Note: when the token is already cancelled but the receiver was never polled,
170        // the cancellation signal has not been propagated to the inner SF ctx,
171        // because propagation only happens during poll.
172        // In this case we skip cancel_inner_ctx; the SF operation will be left
173        // in the background and eventually finish on its own.
174        if let Some(t) = &self.token
175            && !t.is_cancelled()
176            && let Err(_e) = self.cancel_inner_ctx()
177        {
178            #[cfg(feature = "tracing")]
179            tracing::debug!("FabricReceiver::drop: cancel_inner_ctx failed: {_e}");
180        }
181    }
182}
183
184pub struct FabricSender<T> {
185    tx: Sender<T>,
186}
187
188impl<T> FabricSender<T> {
189    fn new(tx: Sender<T>) -> FabricSender<T> {
190        FabricSender { tx }
191    }
192
193    pub fn send(self, data: T) {
194        // Ignore send error: receiver may have been dropped (e.g. tokio::select! cancellation).
195        if self.tx.send(data).is_err() {
196            #[cfg(feature = "tracing")]
197            tracing::debug!("FabricSender::send: receiver already dropped, ignoring send error");
198        }
199    }
200}
201
202/// Creates a fabric oneshot channel.
203/// Operation can be cancelled by cancelling the token.
204pub fn oneshot_channel<T>(token: Option<BoxedCancelToken>) -> (FabricSender<T>, FabricReceiver<T>) {
205    let (tx, rx) = oneshot::channel::<T>();
206    (FabricSender::new(tx), FabricReceiver::new(rx, token))
207}
208
#[cfg(test)]
mod test {

    use crate::{
        ErrorCode,
        sync::{SimpleCancelToken, oneshot_channel},
    };

    /// Exercises the send and cancellation orderings of the channel that the
    /// proxy layer uses to deliver data.
    #[tokio::test]
    async fn test_channel() {
        // Plain successful send.
        {
            let (tx, rx) = oneshot_channel::<bool>(Some(SimpleCancelToken::new_boxed()));
            tx.send(true);
            assert!(rx.await.unwrap());
        }

        // Token cancelled after the send: the value is still received.
        {
            let token = SimpleCancelToken::new_boxed();
            let (tx, rx) = oneshot_channel::<bool>(Some(token.clone()));
            tx.send(true);
            token.cancel();
            assert!(rx.await.unwrap());
        }

        // Token cancelled before the send: the value is still received.
        {
            let token = SimpleCancelToken::new_boxed();
            let (tx, rx) = oneshot_channel::<bool>(Some(token.clone()));
            token.cancel();
            tx.send(true);
            assert!(rx.await.unwrap());
        }

        // Token cancelled and receiver dropped: the send is a no-op.
        {
            let token = SimpleCancelToken::new_boxed();
            let (tx, rx) = oneshot_channel::<bool>(Some(token.clone()));
            token.cancel();
            drop(rx);
            tx.send(true);
        }

        // Token cancelled and sender dropped: the receiver gets an error.
        {
            let token = SimpleCancelToken::new_boxed();
            let (tx, rx) = oneshot_channel::<bool>(Some(token.clone()));
            token.cancel();
            drop(tx);
            assert_eq!(rx.await.unwrap_err(), ErrorCode::E_ABORT.into());
        }
    }
}