use std::{cell::Cell, future::Future};
use crate::{
error::ErrorCode,
runtime::executor::{BoxedCancelToken, Executor},
sync::SimpleCancelToken,
};
use mssf_com::FabricCommon::{
IFabricAsyncOperationCallback, IFabricAsyncOperationContext, IFabricAsyncOperationContext_Impl,
};
use windows_core::{AsImpl, implement};
/// Async operation context that carries the outcome of a Rust future back to
/// a caller using the `IFabricAsyncOperationContext` begin/end pattern.
///
/// The `implement` attribute makes this type the implementation behind an
/// `IFabricAsyncOperationContext` interface pointer; the interface methods
/// are provided by the `IFabricAsyncOperationContext_Impl` implementation.
#[implement(IFabricAsyncOperationContext)]
pub struct BridgeContext<T>
where
T: 'static,
{
/// Result slot: `None` until `set_content` stores the outcome, and `None`
/// again after `consume_content` has taken it.
content: Cell<Option<crate::Result<T>>>,
/// Completion flag. Stored with `Release` after `content` is written and
/// loaded with `Acquire` before `content` is read.
is_completed: std::sync::atomic::AtomicBool,
/// Value reported by `CompletedSynchronously`; `new` initializes it to
/// `false` and nothing in this block of code changes it afterwards.
is_completed_synchronously: bool,
/// Callback handed in at construction; `spawn` invokes it once the result
/// has been stored.
callback: IFabricAsyncOperationCallback,
/// Cancellation token triggered by `Cancel`; `make` hands a clone of it
/// to the creator of the context.
token: BoxedCancelToken,
}
impl<T> BridgeContext<T>
where
    T: Send,
{
    /// Builds a context in the "not completed" state with an empty result slot.
    fn new(callback: IFabricAsyncOperationCallback, token: BoxedCancelToken) -> Self {
        let is_completed = std::sync::atomic::AtomicBool::new(false);
        Self {
            content: Cell::new(None),
            is_completed,
            is_completed_synchronously: false,
            callback,
            token,
        }
    }

    /// Creates a context together with the cancellation token it observes.
    ///
    /// The returned token is a clone of the one stored in the context, so a
    /// `Cancel` call on the context is visible through it.
    ///
    /// # Panics
    ///
    /// Panics if `callback` is null (`Ref::unwrap`).
    pub fn make(
        callback: windows_core::Ref<IFabricAsyncOperationCallback>,
    ) -> (Self, BoxedCancelToken) {
        let cancel = SimpleCancelToken::new_boxed();
        let owned_callback = callback.unwrap().clone();
        (Self::new(owned_callback, cancel.clone()), cancel)
    }

    /// Converts `self` into an `IFabricAsyncOperationContext`, schedules
    /// `future` on `rt`, and returns the context without waiting for it.
    ///
    /// A driver task spawned on `rt` runs `future` in its own task, stores the
    /// output in the context, and then invokes the stored callback through
    /// `spawn_blocking`. If the inner task never delivers a value, the stored
    /// result is `ErrorCode::E_UNEXPECTED`.
    pub fn spawn<F>(
        self,
        rt: &impl Executor,
        future: F,
    ) -> crate::WinResult<IFabricAsyncOperationContext>
    where
        F: Future<Output = T> + Send + 'static,
    {
        // One handle moves into the driver task, the other goes to the caller.
        let ctx: IFabricAsyncOperationContext = self.into();
        let returned = ctx.clone();
        let executor = rt.clone();

        let task = async move {
            // The user future runs in a separate task; its output comes back
            // over a oneshot channel so that a task which ends without sending
            // surfaces here as a receive error.
            let (tx, rx) = crate::sync::channel::oneshot::channel();
            executor.spawn(async move {
                let res = future.await;
                // The receiver may already be gone; nothing to do in that case.
                let _ = tx.send(res);
            });

            let outcome = rx
                .await
                .inspect_err(|_e| {
                    #[cfg(feature = "tracing")]
                    tracing::error!("BridgeContext: background task failed: {_e}");
                })
                .map_err(|_| ErrorCode::E_UNEXPECTED.into());

            {
                // SAFETY: `ctx` was produced from a `BridgeContext<T>` by
                // `self.into()` above, so the implementation type matches.
                let inner: &BridgeContext<T> = unsafe { ctx.as_impl() };
                inner.set_content(outcome);
            }

            // Fetch the callback before `ctx` is moved into the closure below.
            let cb = unsafe { ctx.Callback().unwrap() };
            // The callback is run via `spawn_blocking` rather than inline in
            // this async task.
            executor.spawn_blocking(move || {
                unsafe { cb.Invoke(&ctx) };
            })
        };

        // Keep the caller's span attached to the driver task when tracing is on.
        #[cfg(feature = "tracing")]
        let task = {
            use tracing::Instrument;
            task.in_current_span()
        };

        rt.spawn(task);
        Ok(returned)
    }

    /// Extracts the stored result from `context`.
    ///
    /// Returns `E_ABORT` when the operation has not completed and the token is
    /// cancelled, and `FABRIC_E_OPERATION_NOT_COMPLETE` when it has not
    /// completed and is not cancelled.
    ///
    /// # Panics
    ///
    /// Panics if `context` is null, or if the result was already taken.
    ///
    /// NOTE(review): `context` is reinterpreted as a `BridgeContext<T>` without
    /// a check here; callers presumably must pass a context created by
    /// `spawn` with the same `T` — confirm.
    pub fn result(context: windows_core::Ref<IFabricAsyncOperationContext>) -> crate::Result<T> {
        let raw = context.unwrap();
        let inner: &BridgeContext<T> = unsafe { raw.as_impl() };
        inner.consume_content()
    }

    /// Stores the operation result and then marks the context completed.
    ///
    /// # Panics
    ///
    /// Panics if a result was already stored.
    fn set_content(&self, content: crate::Result<T>) {
        let prev = self.content.replace(Some(content));
        assert!(prev.is_none());
        // Publish only after the slot is filled.
        self.set_complete();
    }

    /// Takes the stored result, or reports why none is available yet.
    fn consume_content(&self) -> crate::Result<T> {
        if !self.check_complete() {
            return if self.token.is_cancelled() {
                Err(ErrorCode::E_ABORT.into())
            } else {
                Err(ErrorCode::FABRIC_E_OPERATION_NOT_COMPLETE.into())
            };
        }
        self.content.take().expect("content is consumed twice.")
    }

    /// Sets the completion flag with `Release` ordering.
    fn set_complete(&self) {
        use std::sync::atomic::Ordering;
        self.is_completed.store(true, Ordering::Release);
    }

    /// Reads the completion flag with `Acquire` ordering.
    fn check_complete(&self) -> bool {
        use std::sync::atomic::Ordering;
        self.is_completed.load(Ordering::Acquire)
    }
}
impl<T> IFabricAsyncOperationContext_Impl for BridgeContext_Impl<T> {
    /// Reports whether the completion flag has been set.
    ///
    /// This is a `Relaxed` load; `consume_content` re-checks the flag with
    /// `Acquire` before it reads the stored result.
    fn IsCompleted(&self) -> bool {
        use std::sync::atomic::Ordering::Relaxed;
        self.is_completed.load(Relaxed)
    }

    /// Returns the stored `is_completed_synchronously` flag, which `new`
    /// initializes to `false`.
    fn CompletedSynchronously(&self) -> bool {
        self.is_completed_synchronously
    }

    /// Returns a clone of the callback supplied at construction.
    fn Callback(&self) -> crate::WinResult<IFabricAsyncOperationCallback> {
        Ok(self.callback.clone())
    }

    /// Triggers the cancellation token held by this context; always succeeds.
    fn Cancel(&self) -> crate::WinResult<()> {
        self.token.cancel();
        Ok(())
    }
}