use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use mssf_com::FabricCommon::IFabricAsyncOperationContext;
use crate::{
ErrorCode,
runtime::executor::{BoxedCancelToken, EventFuture},
};
pub use futures_channel::oneshot::{self, Receiver, Sender};
pub struct FabricReceiver<T> {
rx: Receiver<T>,
token: Option<BoxedCancelToken>,
cancel_event: Option<Pin<Box<dyn EventFuture + 'static>>>,
ctx: Option<IFabricAsyncOperationContext>,
}
impl<T> FabricReceiver<T> {
fn new(rx: Receiver<T>, token: Option<BoxedCancelToken>) -> FabricReceiver<T> {
FabricReceiver {
rx,
cancel_event: token.as_ref().map(|t| t.wait()),
token,
ctx: None,
}
}
pub(crate) fn set_ctx(&mut self, ctx: IFabricAsyncOperationContext) {
let prev = self.ctx.replace(ctx);
assert!(prev.is_none());
}
fn cancel_inner_ctx(&mut self) -> crate::WinResult<()> {
if let Some(ctx) = &self.ctx {
if let Err(e) = unsafe { ctx.Cancel() } {
return Err(e);
} else {
self.ctx.take();
}
} else {
}
Ok(())
}
fn clear_cancel_fields(&mut self) {
self.token.take();
self.cancel_event.take();
}
}
impl<T> Future for FabricReceiver<T> {
type Output = crate::WinResult<T>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
let inner = <Receiver<T> as Future>::poll(Pin::new(&mut this.rx), cx);
match (inner, this.token.as_ref()) {
(Poll::Ready(Ok(data)), _) => {
this.clear_cancel_fields();
Poll::Ready(Ok(data))
}
(Poll::Ready(Err(_)), Some(t)) => {
if t.is_cancelled() {
this.clear_cancel_fields();
if let Err(e) = this.cancel_inner_ctx() {
Poll::Ready(Err(e))
} else {
Poll::Ready(Err(ErrorCode::E_ABORT.into()))
}
} else {
panic!("sender dropped without sending")
}
}
(Poll::Ready(Err(_)), None) => {
panic!("sender dropped without sending")
}
(Poll::Pending, Some(_)) => {
let event = this
.cancel_event
.as_mut()
.expect("cancel event should be set");
let inner = std::pin::pin!(event).poll(cx);
match inner {
Poll::Ready(_) => {
this.clear_cancel_fields();
if let Err(e) = this.cancel_inner_ctx() {
Poll::Ready(Err(e))
} else {
Poll::Pending
}
}
Poll::Pending => Poll::Pending,
}
}
(Poll::Pending, None) => Poll::Pending,
}
}
}
impl<T> Drop for FabricReceiver<T> {
fn drop(&mut self) {
if let Some(t) = &self.token
&& !t.is_cancelled()
&& let Err(_e) = self.cancel_inner_ctx()
{
#[cfg(feature = "tracing")]
tracing::debug!("FabricReceiver::drop: cancel_inner_ctx failed: {_e}");
}
}
}
pub struct FabricSender<T> {
tx: Sender<T>,
}
impl<T> FabricSender<T> {
fn new(tx: Sender<T>) -> FabricSender<T> {
FabricSender { tx }
}
pub fn send(self, data: T) {
if self.tx.send(data).is_err() {
#[cfg(feature = "tracing")]
tracing::debug!("FabricSender::send: receiver already dropped, ignoring send error");
}
}
}
pub fn oneshot_channel<T>(token: Option<BoxedCancelToken>) -> (FabricSender<T>, FabricReceiver<T>) {
let (tx, rx) = oneshot::channel::<T>();
(FabricSender::new(tx), FabricReceiver::new(rx, token))
}
#[cfg(test)]
mod test {
use crate::{
ErrorCode,
sync::{SimpleCancelToken, oneshot_channel},
};
#[tokio::test]
async fn test_channel() {
{
let (tx, rx) = oneshot_channel::<bool>(Some(SimpleCancelToken::new_boxed()));
tx.send(true);
assert!(rx.await.unwrap());
}
{
let token = SimpleCancelToken::new_boxed();
let (tx, rx) = oneshot_channel::<bool>(Some(token.clone()));
tx.send(true);
token.cancel();
assert!(rx.await.unwrap());
}
{
let token = SimpleCancelToken::new_boxed();
let (tx, rx) = oneshot_channel::<bool>(Some(token.clone()));
token.cancel();
tx.send(true);
assert!(rx.await.unwrap(),);
}
{
let token = SimpleCancelToken::new_boxed();
let (tx, rx) = oneshot_channel::<bool>(Some(token.clone()));
token.cancel();
std::mem::drop(rx);
tx.send(true);
}
{
let token = SimpleCancelToken::new_boxed();
let (tx, rx) = oneshot_channel::<bool>(Some(token.clone()));
token.cancel();
std::mem::drop(tx);
assert_eq!(rx.await.unwrap_err(), ErrorCode::E_ABORT.into());
}
}
}