mssf_core/sync/
bridge_context.rs

1// ------------------------------------------------------------
2// Copyright (c) Microsoft Corporation.  All rights reserved.
3// Licensed under the MIT License (MIT). See License.txt in the repo root for license information.
4// ------------------------------------------------------------
5
6use std::{cell::Cell, future::Future};
7
8use crate::{
9    error::ErrorCode,
10    runtime::executor::{BoxedCancelToken, Executor},
11    sync::SimpleCancelToken,
12};
13use mssf_com::FabricCommon::{
14    IFabricAsyncOperationCallback, IFabricAsyncOperationContext, IFabricAsyncOperationContext_Impl,
15};
16use windows_core::{AsImpl, implement};
17
18/// Async operation context for bridging rust code into SF COM api that supports cancellation.
19#[implement(IFabricAsyncOperationContext)]
20pub struct BridgeContext<T>
21where
22    T: 'static,
23{
24    /// The task result. Initially it is None.
25    /// If the task panics, the error is propagated here.
26    content: Cell<Option<crate::Result<T>>>,
27    /// Indicates the async operation has completed or not.
28    /// This is a memory barrier for making the content available
29    /// from writer thread to the reader thread. It is needed because
30    /// in SF COM API, the caller can call Begin operation, poll on this
31    /// status until complete, and End operation without barriers.
32    is_completed: std::sync::atomic::AtomicBool,
33    /// mssf never completes async operations synchronously.
34    /// This is always false.
35    is_completed_synchronously: bool,
36    callback: IFabricAsyncOperationCallback,
37    token: BoxedCancelToken,
38}
39
40impl<T> BridgeContext<T>
41where
42    T: Send,
43{
44    fn new(callback: IFabricAsyncOperationCallback, token: BoxedCancelToken) -> Self {
45        Self {
46            content: Cell::new(None),
47            is_completed: std::sync::atomic::AtomicBool::new(false),
48            is_completed_synchronously: false,
49            callback,
50            token,
51        }
52    }
53
54    /// Creates the context from callback, and returns a cancellation token that
55    /// can be used in rust code, and the cancellation token is hooked into self,
56    /// where Cancel() api cancels the operation.
57    pub fn make(
58        callback: windows_core::Ref<IFabricAsyncOperationCallback>,
59    ) -> (Self, BoxedCancelToken) {
60        let token = SimpleCancelToken::new_boxed();
61        let ctx = Self::new(callback.unwrap().clone(), token.clone());
62        (ctx, token)
63    }
64
65    /// Spawns the future on rt.
66    /// Returns a context that can be returned to SF runtime.
67    /// This is intended to be used in SF Begin COM api, where
68    /// rust code is spawned in background and the context is returned
69    /// to caller.
70    /// If the future panics, an error is set in the resulting content,
71    /// caller will still get callback and receive an error in the End api.
72    /// This api is in some sense unsafe, because the developer needs to ensure
73    /// the following:
74    /// * return type of the future needs to match SF COM api end return type.
75    pub fn spawn<F>(
76        self,
77        rt: &impl Executor,
78        future: F,
79    ) -> crate::WinResult<IFabricAsyncOperationContext>
80    where
81        F: Future<Output = T> + Send + 'static,
82    {
83        let self_cp: IFabricAsyncOperationContext = self.into();
84        let self_cp2 = self_cp.clone();
85        let rt_cp = rt.clone();
86        let task = async move {
87            // Run user code in a task and wait on its status.
88            // If user code panics we propagate the error back to SF.
89            let (tx, rx) = crate::sync::channel::oneshot::channel();
90            rt_cp.spawn(async move {
91                let res = future.await;
92                let _ = tx.send(res);
93            });
94            // The sender should never drop so if it fails the user code must panicked.
95            let task_res = rx
96                .await
97                .inspect_err(|_e| {
98                    #[cfg(feature = "tracing")]
99                    tracing::error!("BridgeContext: background task failed: {_e}");
100                })
101                .map_err(|_| ErrorCode::E_UNEXPECTED.into());
102
103            // TODO: maybe it is good to report health to SF here the same way that sf dotnet app works.
104
105            // We trust the code in mssf here to not panic, or we have bigger problem (memory corruption etc.).
106            let self_impl: &BridgeContext<T> = unsafe { self_cp.as_impl() };
107            self_impl.set_content(task_res);
108            let cb = unsafe { self_cp.Callback().unwrap() };
109            unsafe { cb.Invoke(&self_cp) };
110        };
111        /// Propagate the span so that the executor has the right trace.
112        /// The trace would likely have BeginXXX as the function where spawn()
113        /// is called.
114        #[cfg(feature = "tracing")]
115        use tracing::Instrument;
116        #[cfg(feature = "tracing")]
117        let task = task.in_current_span();
118        rt.spawn(task);
119        Ok(self_cp2)
120    }
121
122    /// Get the result from the context from the SF End COM api.
123    /// This api is in some sense unsafe, because the developer needs to ensure
124    /// the following:
125    /// * context impl type is `BridgeContext3`, and the T matches the SF end api
126    ///   return type.
127    ///
128    /// Note that if T is of Result<ICOM> type, the current function return type is
129    /// Result<Result<ICOM>>, so unwrap is needed.
130    pub fn result(context: windows_core::Ref<IFabricAsyncOperationContext>) -> crate::Result<T> {
131        let self_impl: &BridgeContext<T> = unsafe { context.unwrap().as_impl() };
132        self_impl.consume_content()
133    }
134
135    /// Set the content for the ctx.
136    /// Marks the ctx as completed.
137    fn set_content(&self, content: crate::Result<T>) {
138        let prev = self.content.replace(Some(content));
139        assert!(prev.is_none());
140        self.set_complete();
141    }
142
143    /// Consumes the content set by set_content().
144    /// can only be called once after set content.
145    fn consume_content(&self) -> crate::Result<T> {
146        match self.check_complete() {
147            true => self.content.take().expect("content is consumed twice."),
148            false => {
149                if self.token.is_cancelled() {
150                    Err(ErrorCode::E_ABORT.into())
151                } else {
152                    Err(ErrorCode::FABRIC_E_OPERATION_NOT_COMPLETE.into())
153                }
154            }
155        }
156    }
157
158    /// Set the ctx as completed. Requires the ctx content to be set. Makes
159    /// the content available for access from other threads using barrier.
160    fn set_complete(&self) {
161        self.is_completed
162            .store(true, std::sync::atomic::Ordering::Release);
163    }
164
165    /// Checks ctx is completed.
166    /// Makes sure content sets by other threads is visible from this thread.
167    fn check_complete(&self) -> bool {
168        self.is_completed.load(std::sync::atomic::Ordering::Acquire)
169    }
170}
171
172impl<T> IFabricAsyncOperationContext_Impl for BridgeContext_Impl<T> {
173    fn IsCompleted(&self) -> bool {
174        self.is_completed.load(std::sync::atomic::Ordering::Relaxed)
175    }
176
177    // This always returns false because we defer all tasks in the background executuor.
178    fn CompletedSynchronously(&self) -> bool {
179        self.is_completed_synchronously
180    }
181
182    fn Callback(&self) -> crate::WinResult<IFabricAsyncOperationCallback> {
183        let cp = self.callback.clone();
184        Ok(cp)
185    }
186
187    fn Cancel(&self) -> crate::WinResult<()> {
188        self.token.cancel();
189        Ok(())
190    }
191}