mssf_core/sync/
channel.rs

1// ------------------------------------------------------------
2// Copyright (c) Microsoft Corporation.  All rights reserved.
3// Licensed under the MIT License (MIT). See License.txt in the repo root for license information.
4// ------------------------------------------------------------
5
6use std::{
7    future::Future,
8    pin::Pin,
9    task::{Context, Poll},
10};
11
12use mssf_com::FabricCommon::IFabricAsyncOperationContext;
13
14use crate::{
15    ErrorCode,
16    runtime::executor::{BoxedCancelToken, EventFuture},
17};
18
19pub use futures_channel::oneshot::{self, Receiver, Sender};
20
// Token that wraps a oneshot receiver.
// The receive future does not report a "sender dropped" error. This is
// designed for the use case where SF guarantees that the sender will be called.
pub struct FabricReceiver<T> {
    // Underlying oneshot receiver that delivers the value.
    rx: Receiver<T>,
    // Optional cancellation token supplied when the channel was created.
    token: Option<BoxedCancelToken>,
    // Event future obtained from the token; polled to observe cancellation.
    cancel_event: Option<Pin<Box<dyn EventFuture + 'static>>>,
    // Saved ctx from the SF Begin COM API, used for cancelling the operation.
    ctx: Option<IFabricAsyncOperationContext>,
}
32
33impl<T> FabricReceiver<T> {
34    fn new(rx: Receiver<T>, token: Option<BoxedCancelToken>) -> FabricReceiver<T> {
35        FabricReceiver {
36            rx,
37            cancel_event: token.as_ref().map(|t| t.wait()),
38            token,
39            ctx: None,
40        }
41    }
42
43    // This does not handle cancel. It is commented out because it is not used.
44    // pub(crate) fn blocking_recv(self) -> crate::Result<T> {
45    //     if let Some(t) = self.token {
46    //         if t.is_cancelled() {
47    //             return Err(ErrorCode::E_ABORT.into());
48    //         }
49    //     }
50    //     // sender must send stuff so that there is not error.
51    //     Ok(self.rx.blocking_recv().unwrap())
52    // }
53
54    // Set the SF ctx to hook up cancellation.
55    pub(crate) fn set_ctx(&mut self, ctx: IFabricAsyncOperationContext) {
56        let prev = self.ctx.replace(ctx);
57        assert!(prev.is_none());
58    }
59
60    // Cancels the inner SF operation if exists, and reset the ctx.
61    fn cancel_inner_ctx(&mut self) -> crate::WinResult<()> {
62        if let Some(ctx) = &self.ctx {
63            if let Err(e) = unsafe { ctx.Cancel() } {
64                // fail to cancel inner operation.
65                return Err(e);
66            } else {
67                // clear the sf ctx to avoid cancel twice.
68                self.ctx.take();
69            }
70        } else {
71            // The inner ctx can be empty after we already cancelled the inner ctx.
72            // This can happen because we cancel during polling, and polling can
73            // happen many times.
74        }
75        Ok(())
76    }
77}
78
79// Returns error if cancelled.
80// If there is an inner SF ctx, cancellation signal will
81// trigger cancellation of the ctx.
82impl<T> Future for FabricReceiver<T> {
83    // The error code should be OperationCanceled, unless cancellation
84    // of SF ctx returns other errors.
85    // (TODO: observe other error code from SF, maybe some code should be ignored).
86    type Output = crate::WinResult<T>;
87    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
88        // Poll the receiver first, if ready then return the output,
89        // else poll the cancellation token, if cancelled propergate the cancel to SF ctx,
90        // and return pending. SF task should continue finish execute in the background,
91        // and finish with error code OperationCancelled
92        // and send the error code from FabricSender.
93        //
94        // There can be the case that cancellation wakes the waker, but receiver
95        // then got the result. The next poll will return received output rather
96        // than cancelled error.
97
98        let this = self.get_mut();
99
100        // Try to receive the value from the sender
101        let inner = <Receiver<T> as Future>::poll(Pin::new(&mut this.rx), cx);
102        match (inner, this.token.as_ref()) {
103            (Poll::Ready(Ok(data)), _) => Poll::Ready(Ok(data)),
104            (Poll::Ready(Err(_)), Some(t)) => {
105                if t.is_cancelled() {
106                    // clear the token since we only propergate the signal once.
107                    this.token.take();
108                    this.cancel_event.take();
109                    // cancel the SF ctx and clear it.
110                    if let Err(e) = this.cancel_inner_ctx() {
111                        Poll::Ready(Err(e))
112                    } else {
113                        Poll::Ready(Err(ErrorCode::E_ABORT.into()))
114                    }
115                } else {
116                    panic!("sender dropped without sending")
117                }
118            }
119            (Poll::Ready(Err(_)), None) => {
120                panic!("sender dropped without sending")
121            }
122            (Poll::Pending, Some(_)) => {
123                // If the action is canceled we can safely stop and return canceled error.
124                // this is cancel safe so we can poll it once and discard
125                let event = this
126                    .cancel_event
127                    .as_mut()
128                    .expect("cancel event should be set");
129                let inner = std::pin::pin!(event).poll(cx);
130                match inner {
131                    Poll::Ready(_) => {
132                        // clear the token since we only propergate the signal once.
133                        this.cancel_event.take();
134                        this.cancel_event.take();
135                        // operation cancelled. Propergate to inner sf ctx.
136                        if let Err(e) = this.cancel_inner_ctx() {
137                            Poll::Ready(Err(e))
138                        } else {
139                            // The cancellation is propergated to sf task,
140                            // the receiver from now on should wait for the
141                            // final result from the sf task. (as we have cleared the token)
142                            // Most likely the task finishes with OperationCancelled error code.
143                            Poll::Pending
144                        }
145                    }
146                    Poll::Pending => Poll::Pending,
147                }
148            }
149            (Poll::Pending, None) => Poll::Pending,
150        }
151    }
152}
153
// Sending half of the fabric oneshot channel; wraps a oneshot sender.
pub struct FabricSender<T> {
    // Underlying oneshot sender used to deliver the value.
    tx: Sender<T>,
    // Optional cancellation token; `send` checks it when delivery fails.
    token: Option<BoxedCancelToken>,
}
158
159impl<T> FabricSender<T> {
160    fn new(tx: Sender<T>, token: Option<BoxedCancelToken>) -> FabricSender<T> {
161        FabricSender { tx, token }
162    }
163
164    pub fn send(self, data: T) {
165        let e = self.tx.send(data);
166        if e.is_err() {
167            // In SF use case receiver should not be dropped by user.
168            // If it acctually dropped by user, it is ok to ignore because user
169            // does not want to want the value any more. But too bad SF has done
170            // the work to get the value.
171
172            // receiver should never be dropped if operation is not cancelled.
173            if let Some(t) = self.token {
174                debug_assert!(
175                    t.is_cancelled(),
176                    "task should be cancelled when receiver dropped."
177                );
178            }
179        }
180    }
181}
182
183/// Creates a fabric oneshot channel.
184/// Operation can be cancelled by cancelling the token.
185pub fn oneshot_channel<T>(token: Option<BoxedCancelToken>) -> (FabricSender<T>, FabricReceiver<T>) {
186    let (tx, rx) = oneshot::channel::<T>();
187    (
188        FabricSender::new(tx, token.clone()),
189        FabricReceiver::new(rx, token),
190    )
191}
192
#[cfg(test)]
mod test {

    use crate::{
        ErrorCode,
        sync::{SimpleCancelToken, oneshot_channel},
    };

    /// Exercises the cancellation scenarios of the channel that is used
    /// to send data in the proxy layer.
    #[tokio::test]
    async fn test_channel() {
        // Plain send without any cancellation: the value arrives.
        {
            let (sender, receiver) =
                oneshot_channel::<bool>(Some(SimpleCancelToken::new_boxed()));
            sender.send(true);
            assert!(receiver.await.unwrap());
        }
        // Token cancelled after the send: the result is still received.
        {
            let cancel = SimpleCancelToken::new_boxed();
            let (sender, receiver) = oneshot_channel::<bool>(Some(cancel.clone()));
            sender.send(true);
            cancel.cancel();
            assert!(receiver.await.unwrap());
        }
        // Token cancelled before the send: the result is still received.
        {
            let cancel = SimpleCancelToken::new_boxed();
            let (sender, receiver) = oneshot_channel::<bool>(Some(cancel.clone()));
            cancel.cancel();
            sender.send(true);
            assert!(receiver.await.unwrap());
        }
        // Token cancelled and receiver dropped: sending is a no-op.
        {
            let cancel = SimpleCancelToken::new_boxed();
            let (sender, receiver) = oneshot_channel::<bool>(Some(cancel.clone()));
            cancel.cancel();
            drop(receiver);
            sender.send(true);
        }
        // Token cancelled and sender dropped: the receiver gets an error.
        {
            let cancel = SimpleCancelToken::new_boxed();
            let (sender, receiver) = oneshot_channel::<bool>(Some(cancel.clone()));
            cancel.cancel();
            drop(sender);
            assert_eq!(receiver.await.unwrap_err(), ErrorCode::E_ABORT.into());
        }
    }
}