// mssf_core/sync/bridge_context.rs
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License (MIT). See License.txt in the repo root for license information.
// ------------------------------------------------------------
5
6use std::{cell::Cell, future::Future};
7
8use crate::{
9 error::ErrorCode,
10 runtime::executor::{BoxedCancelToken, Executor},
11 sync::SimpleCancelToken,
12};
13use mssf_com::FabricCommon::{
14 IFabricAsyncOperationCallback, IFabricAsyncOperationContext, IFabricAsyncOperationContext_Impl,
15};
16use windows_core::{AsImpl, implement};
17
18/// Async operation context for bridging rust code into SF COM api that supports cancellation.
19#[implement(IFabricAsyncOperationContext)]
20pub struct BridgeContext<T>
21where
22 T: 'static,
23{
24 /// The task result. Initially it is None.
25 /// If the task panics, the error is propagated here.
26 content: Cell<Option<crate::Result<T>>>,
27 /// Indicates the async operation has completed or not.
28 /// This is a memory barrier for making the content available
29 /// from writer thread to the reader thread. It is needed because
30 /// in SF COM API, the caller can call Begin operation, poll on this
31 /// status until complete, and End operation without barriers.
32 is_completed: std::sync::atomic::AtomicBool,
33 /// mssf never completes async operations synchronously.
34 /// This is always false.
35 is_completed_synchronously: bool,
36 callback: IFabricAsyncOperationCallback,
37 token: BoxedCancelToken,
38}
39
40impl<T> BridgeContext<T>
41where
42 T: Send,
43{
44 fn new(callback: IFabricAsyncOperationCallback, token: BoxedCancelToken) -> Self {
45 Self {
46 content: Cell::new(None),
47 is_completed: std::sync::atomic::AtomicBool::new(false),
48 is_completed_synchronously: false,
49 callback,
50 token,
51 }
52 }
53
54 /// Creates the context from callback, and returns a cancellation token that
55 /// can be used in rust code, and the cancellation token is hooked into self,
56 /// where Cancel() api cancels the operation.
57 pub fn make(
58 callback: windows_core::Ref<IFabricAsyncOperationCallback>,
59 ) -> (Self, BoxedCancelToken) {
60 let token = SimpleCancelToken::new_boxed();
61 let ctx = Self::new(callback.unwrap().clone(), token.clone());
62 (ctx, token)
63 }
64
65 /// Spawns the future on rt.
66 /// Returns a context that can be returned to SF runtime.
67 /// This is intended to be used in SF Begin COM api, where
68 /// rust code is spawned in background and the context is returned
69 /// to caller.
70 /// If the future panics, an error is set in the resulting content,
71 /// caller will still get callback and receive an error in the End api.
72 /// This api is in some sense unsafe, because the developer needs to ensure
73 /// the following:
74 /// * return type of the future needs to match SF COM api end return type.
75 pub fn spawn<F>(
76 self,
77 rt: &impl Executor,
78 future: F,
79 ) -> crate::WinResult<IFabricAsyncOperationContext>
80 where
81 F: Future<Output = T> + Send + 'static,
82 {
83 let self_cp: IFabricAsyncOperationContext = self.into();
84 let self_cp2 = self_cp.clone();
85 let rt_cp = rt.clone();
86 let task = async move {
87 // Run user code in a task and wait on its status.
88 // If user code panics we propagate the error back to SF.
89 let (tx, rx) = crate::sync::channel::oneshot::channel();
90 rt_cp.spawn(async move {
91 let res = future.await;
92 let _ = tx.send(res);
93 });
94 // The sender should never drop so if it fails the user code must panicked.
95 let task_res = rx
96 .await
97 .inspect_err(|_e| {
98 #[cfg(feature = "tracing")]
99 tracing::error!("BridgeContext: background task failed: {_e}");
100 })
101 .map_err(|_| ErrorCode::E_UNEXPECTED.into());
102
103 // TODO: maybe it is good to report health to SF here the same way that sf dotnet app works.
104
105 // We trust the code in mssf here to not panic, or we have bigger problem (memory corruption etc.).
106 let self_impl: &BridgeContext<T> = unsafe { self_cp.as_impl() };
107 self_impl.set_content(task_res);
108 let cb = unsafe { self_cp.Callback().unwrap() };
109 unsafe { cb.Invoke(&self_cp) };
110 };
111 /// Propagate the span so that the executor has the right trace.
112 /// The trace would likely have BeginXXX as the function where spawn()
113 /// is called.
114 #[cfg(feature = "tracing")]
115 use tracing::Instrument;
116 #[cfg(feature = "tracing")]
117 let task = task.in_current_span();
118 rt.spawn(task);
119 Ok(self_cp2)
120 }
121
122 /// Get the result from the context from the SF End COM api.
123 /// This api is in some sense unsafe, because the developer needs to ensure
124 /// the following:
125 /// * context impl type is `BridgeContext3`, and the T matches the SF end api
126 /// return type.
127 ///
128 /// Note that if T is of Result<ICOM> type, the current function return type is
129 /// Result<Result<ICOM>>, so unwrap is needed.
130 pub fn result(context: windows_core::Ref<IFabricAsyncOperationContext>) -> crate::Result<T> {
131 let self_impl: &BridgeContext<T> = unsafe { context.unwrap().as_impl() };
132 self_impl.consume_content()
133 }
134
135 /// Set the content for the ctx.
136 /// Marks the ctx as completed.
137 fn set_content(&self, content: crate::Result<T>) {
138 let prev = self.content.replace(Some(content));
139 assert!(prev.is_none());
140 self.set_complete();
141 }
142
143 /// Consumes the content set by set_content().
144 /// can only be called once after set content.
145 fn consume_content(&self) -> crate::Result<T> {
146 match self.check_complete() {
147 true => self.content.take().expect("content is consumed twice."),
148 false => {
149 if self.token.is_cancelled() {
150 Err(ErrorCode::E_ABORT.into())
151 } else {
152 Err(ErrorCode::FABRIC_E_OPERATION_NOT_COMPLETE.into())
153 }
154 }
155 }
156 }
157
158 /// Set the ctx as completed. Requires the ctx content to be set. Makes
159 /// the content available for access from other threads using barrier.
160 fn set_complete(&self) {
161 self.is_completed
162 .store(true, std::sync::atomic::Ordering::Release);
163 }
164
165 /// Checks ctx is completed.
166 /// Makes sure content sets by other threads is visible from this thread.
167 fn check_complete(&self) -> bool {
168 self.is_completed.load(std::sync::atomic::Ordering::Acquire)
169 }
170}
171
172impl<T> IFabricAsyncOperationContext_Impl for BridgeContext_Impl<T> {
173 fn IsCompleted(&self) -> bool {
174 self.is_completed.load(std::sync::atomic::Ordering::Relaxed)
175 }
176
177 // This always returns false because we defer all tasks in the background executuor.
178 fn CompletedSynchronously(&self) -> bool {
179 self.is_completed_synchronously
180 }
181
182 fn Callback(&self) -> crate::WinResult<IFabricAsyncOperationCallback> {
183 let cp = self.callback.clone();
184 Ok(cp)
185 }
186
187 fn Cancel(&self) -> crate::WinResult<()> {
188 self.token.cancel();
189 Ok(())
190 }
191}