mssf_core/sync/bridge_context.rs
1// ------------------------------------------------------------
2// Copyright (c) Microsoft Corporation. All rights reserved.
3// Licensed under the MIT License (MIT). See License.txt in the repo root for license information.
4// ------------------------------------------------------------
5
6use std::{cell::Cell, future::Future};
7
8use crate::{
9 error::ErrorCode,
10 runtime::executor::{BoxedCancelToken, Executor},
11 sync::SimpleCancelToken,
12};
13use mssf_com::FabricCommon::{
14 IFabricAsyncOperationCallback, IFabricAsyncOperationContext, IFabricAsyncOperationContext_Impl,
15};
16use windows_core::{AsImpl, implement};
17
18/// Async operation context for bridging rust code into SF COM api that supports cancellation.
19#[implement(IFabricAsyncOperationContext)]
20pub struct BridgeContext<T>
21where
22 T: 'static,
23{
24 /// The task result. Initially it is None.
25 /// If the task panics, the error is propagated here.
26 content: Cell<Option<crate::Result<T>>>,
27 /// Indicates the async operation has completed or not.
28 /// This is a memory barrier for making the content available
29 /// from writer thread to the reader thread. It is needed because
30 /// in SF COM API, the caller can call Begin operation, poll on this
31 /// status until complete, and End operation without barriers.
32 is_completed: std::sync::atomic::AtomicBool,
33 /// mssf never completes async operations synchronously.
34 /// This is always false.
35 is_completed_synchronously: bool,
36 callback: IFabricAsyncOperationCallback,
37 token: BoxedCancelToken,
38}
39
40impl<T> BridgeContext<T>
41where
42 T: Send,
43{
44 fn new(callback: IFabricAsyncOperationCallback, token: BoxedCancelToken) -> Self {
45 Self {
46 content: Cell::new(None),
47 is_completed: std::sync::atomic::AtomicBool::new(false),
48 is_completed_synchronously: false,
49 callback,
50 token,
51 }
52 }
53
54 /// Creates the context from callback, and returns a cancellation token that
55 /// can be used in rust code, and the cancellation token is hooked into self,
56 /// where Cancel() api cancels the operation.
57 pub fn make(
58 callback: windows_core::Ref<IFabricAsyncOperationCallback>,
59 ) -> (Self, BoxedCancelToken) {
60 let token = SimpleCancelToken::new_boxed();
61 let ctx = Self::new(callback.unwrap().clone(), token.clone());
62 (ctx, token)
63 }
64
65 /// Spawns the future on rt.
66 /// Returns a context that can be returned to SF runtime.
67 /// This is intended to be used in SF Begin COM api, where
68 /// rust code is spawned in background and the context is returned
69 /// to caller.
70 /// If the future panics, an error is set in the resulting content,
71 /// caller will still get callback and receive an error in the End api.
72 /// This api is in some sense unsafe, because the developer needs to ensure
73 /// the following:
74 /// * return type of the future needs to match SF COM api end return type.
75 pub fn spawn<F>(
76 self,
77 rt: &impl Executor,
78 future: F,
79 ) -> crate::WinResult<IFabricAsyncOperationContext>
80 where
81 F: Future<Output = T> + Send + 'static,
82 {
83 let self_cp: IFabricAsyncOperationContext = self.into();
84 let self_cp2 = self_cp.clone();
85 let rt_cp = rt.clone();
86 let task = async move {
87 // Run user code in a task and wait on its status.
88 // If user code panics we propagate the error back to SF.
89 let (tx, rx) = crate::sync::channel::oneshot::channel();
90 rt_cp.spawn(async move {
91 let res = future.await;
92 let _ = tx.send(res);
93 });
94 // The sender should never drop so if it fails the user code must panicked.
95 let task_res = rx
96 .await
97 .inspect_err(|_e| {
98 #[cfg(feature = "tracing")]
99 tracing::error!("BridgeContext: background task failed: {_e}");
100 })
101 .map_err(|_| ErrorCode::E_UNEXPECTED.into());
102
103 // TODO: maybe it is good to report health to SF here the same way that sf dotnet app works.
104
105 // We trust the code in mssf here to not panic, or we have bigger problem (memory corruption etc.).
106 let self_impl: &BridgeContext<T> = unsafe { self_cp.as_impl() };
107 self_impl.set_content(task_res);
108 let cb = unsafe { self_cp.Callback().unwrap() };
109
110 // We move the callback invocation off of the tokio I/O thread as they take locks
111 // and may block.
112 rt_cp.spawn_blocking(move || {
113 unsafe { cb.Invoke(&self_cp) };
114 })
115 };
116 /// Propagate the span so that the executor has the right trace.
117 /// The trace would likely have BeginXXX as the function where spawn()
118 /// is called.
119 #[cfg(feature = "tracing")]
120 use tracing::Instrument;
121 #[cfg(feature = "tracing")]
122 let task = task.in_current_span();
123 rt.spawn(task);
124 Ok(self_cp2)
125 }
126
127 /// Get the result from the context from the SF End COM api.
128 /// This api is in some sense unsafe, because the developer needs to ensure
129 /// the following:
130 /// * context impl type is `BridgeContext3`, and the T matches the SF end api
131 /// return type.
132 ///
133 /// Note that if T is of Result<ICOM> type, the current function return type is
134 /// Result<Result<ICOM>>, so unwrap is needed.
135 pub fn result(context: windows_core::Ref<IFabricAsyncOperationContext>) -> crate::Result<T> {
136 let self_impl: &BridgeContext<T> = unsafe { context.unwrap().as_impl() };
137 self_impl.consume_content()
138 }
139
140 /// Set the content for the ctx.
141 /// Marks the ctx as completed.
142 fn set_content(&self, content: crate::Result<T>) {
143 let prev = self.content.replace(Some(content));
144 assert!(prev.is_none());
145 self.set_complete();
146 }
147
148 /// Consumes the content set by set_content().
149 /// can only be called once after set content.
150 fn consume_content(&self) -> crate::Result<T> {
151 match self.check_complete() {
152 true => self.content.take().expect("content is consumed twice."),
153 false => {
154 if self.token.is_cancelled() {
155 Err(ErrorCode::E_ABORT.into())
156 } else {
157 Err(ErrorCode::FABRIC_E_OPERATION_NOT_COMPLETE.into())
158 }
159 }
160 }
161 }
162
163 /// Set the ctx as completed. Requires the ctx content to be set. Makes
164 /// the content available for access from other threads using barrier.
165 fn set_complete(&self) {
166 self.is_completed
167 .store(true, std::sync::atomic::Ordering::Release);
168 }
169
170 /// Checks ctx is completed.
171 /// Makes sure content sets by other threads is visible from this thread.
172 fn check_complete(&self) -> bool {
173 self.is_completed.load(std::sync::atomic::Ordering::Acquire)
174 }
175}
176
177impl<T> IFabricAsyncOperationContext_Impl for BridgeContext_Impl<T> {
178 fn IsCompleted(&self) -> bool {
179 self.is_completed.load(std::sync::atomic::Ordering::Relaxed)
180 }
181
182 // This always returns false because we defer all tasks in the background executuor.
183 fn CompletedSynchronously(&self) -> bool {
184 self.is_completed_synchronously
185 }
186
187 fn Callback(&self) -> crate::WinResult<IFabricAsyncOperationCallback> {
188 let cp = self.callback.clone();
189 Ok(cp)
190 }
191
192 fn Cancel(&self) -> crate::WinResult<()> {
193 self.token.cancel();
194 Ok(())
195 }
196}