use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};
use crate::channel_rt::{PeerRt, RecverRt, SenderRt, SwapResult};
use crate::event_node::EventNode;
use crate::reactor::{EventId, Reactor};
use crate::runtime::Runtime;
const MODTRACE: bool = true;
pub fn channel<'runtime, T, ReactorT: Reactor>(
rt: &'runtime Runtime<ReactorT>,
) -> (Sender<'runtime, T, ReactorT>, Recver<'runtime, T, ReactorT>) {
let channel_id = rt.channels().create();
let sender_rt = rt.channels().sender_rt(channel_id);
let recver_rt = rt.channels().recver_rt(channel_id);
(Sender::new(rt, sender_rt), Recver::new(rt, recver_rt))
}
#[derive(Debug)] pub struct RecvError;
pub struct Sender<'runtime, T, ReactorT: Reactor> {
rt: &'runtime Runtime<ReactorT>,
sender_rt: SenderRt<'runtime>,
temp: PhantomData<T>,
}
impl<'runtime, T, ReactorT: Reactor> Sender<'runtime, T, ReactorT> {
fn new(rt: &'runtime Runtime<ReactorT>, sender_rt: SenderRt<'runtime>) -> Self {
sender_rt.inc_ref();
Self {
rt,
sender_rt,
temp: PhantomData,
}
}
pub async fn send(&mut self, value: T) -> Result<(), T> {
SenderFuture::new(self.rt, self.sender_rt, value).await
}
}
impl<'runtime, T, ReactorT: Reactor> Clone for Sender<'runtime, T, ReactorT> {
fn clone(&self) -> Self {
Self::new(self.rt, self.sender_rt)
}
}
impl<'runtime, T, ReactorT: Reactor> Drop for Sender<'runtime, T, ReactorT> {
fn drop(&mut self) {
self.sender_rt.close();
}
}
pub struct Recver<'runtime, T, ReactorT: Reactor> {
rt: &'runtime Runtime<ReactorT>,
recver_rt: RecverRt<'runtime>,
temp: PhantomData<T>,
}
impl<'runtime, T, ReactorT: Reactor> Recver<'runtime, T, ReactorT> {
fn new(rt: &'runtime Runtime<ReactorT>, recver_rt: RecverRt<'runtime>) -> Self {
Self {
rt,
recver_rt,
temp: PhantomData,
}
}
pub async fn next(&mut self) -> Result<T, RecvError> {
NextFuture::new(self.rt, self.recver_rt).await
}
}
impl<'runtime, T, ReactorT: Reactor> Drop for Recver<'runtime, T, ReactorT> {
fn drop(&mut self) {
self.recver_rt.close();
}
}
#[derive(Debug)]
enum PeerFutureState {
Created,
Exchanging,
Closed,
}
struct SenderFuture<'runtime, T, ReactorT: Reactor> {
rt: &'runtime Runtime<ReactorT>,
event_node: EventNode,
sender_rt: SenderRt<'runtime>,
data: Option<T>,
state: PeerFutureState,
}
impl<'runtime, T, ReactorT: Reactor> SenderFuture<'runtime, T, ReactorT> {
fn new(rt: &'runtime Runtime<ReactorT>, sender_rt: SenderRt<'runtime>, value: T) -> Self {
Self {
rt,
event_node: EventNode::new(),
sender_rt,
data: Some(value),
state: PeerFutureState::Created,
}
}
fn set_state(&mut self, new_state: PeerFutureState) {
modtrace!(
self.rt.tracer(),
"channel_sender_future: {:?} state {:?} -> {:?}",
self.sender_rt.channel_id,
self.state,
new_state
);
self.state = new_state;
}
fn set_state_closed(&mut self, exchange_result: SwapResult) {
let new_state = PeerFutureState::Closed;
modtrace!(
self.rt.tracer(),
"channel_sender_future: {:?} state {:?} -> {:?}, exchange result: {:?}",
self.sender_rt.channel_id,
self.state,
new_state,
exchange_result
);
self.state = new_state;
}
fn transmit(&mut self, event_id: EventId) -> Poll<Result<(), T>> {
self.set_state(PeerFutureState::Exchanging);
self.sender_rt
.pin(event_id, (&mut self.data) as *mut Option<T> as *mut ());
Poll::Pending
}
fn close(&mut self) -> Poll<Result<(), T>> {
if !self.event_node.is_awoken_for(self.rt) {
return Poll::Pending; }
return match unsafe { self.sender_rt.swap::<T>() } {
SwapResult::Done =>
{
self.set_state_closed(SwapResult::Done);
Poll::Ready(Ok(()))
}
SwapResult::Disconnected =>
{
self.set_state_closed(SwapResult::Disconnected);
Poll::Ready(Err(self.data.take().unwrap()))
}
SwapResult::TryLater =>
{
modtrace!(
self.rt.tracer(),
"channel_next_future: {:?} state {:?} exchange result: {:?}",
self.sender_rt.channel_id,
self.state,
SwapResult::TryLater
);
Poll::Pending
}
};
}
}
impl<'runtime, T, ReactorT: Reactor> Future for SenderFuture<'runtime, T, ReactorT> {
type Output = Result<(), T>;
fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
modtrace!(
self.rt.tracer(),
"channel_sender_future: in the poll() {:?}",
self.sender_rt.channel_id
);
let this = unsafe { self.get_unchecked_mut() };
return match this.state {
PeerFutureState::Created => {
let event_id = unsafe { this.event_node.on_pin(ctx) };
this.transmit(event_id) }
PeerFutureState::Exchanging => this.close(),
PeerFutureState::Closed => {
panic!(
"aiur/channel_sender_future: {:?} was polled after completion.",
this.sender_rt.channel_id
);
}
};
}
}
impl<'runtime, T, ReactorT: Reactor> Drop for SenderFuture<'runtime, T, ReactorT> {
fn drop(&mut self) {
if matches!(self.state, PeerFutureState::Exchanging) {
modtrace!(
self.rt.tracer(),
"channel_sender_future: in the drop() {:?} - cancelling",
self.sender_rt.channel_id
);
self.sender_rt.unpin(self.event_node.get_event_id());
let _ = self.event_node.on_cancel(); } else {
modtrace!(
self.rt.tracer(),
"channel_sender_future: in the drop() {:?} no op",
self.sender_rt.channel_id
);
}
}
}
pub struct NextFuture<'runtime, T, ReactorT: Reactor> {
rt: &'runtime Runtime<ReactorT>,
event_node: EventNode,
recver_rt: RecverRt<'runtime>,
state: PeerFutureState,
data: Option<T>,
}
impl<'runtime, T, ReactorT: Reactor> NextFuture<'runtime, T, ReactorT> {
fn new(rt: &'runtime Runtime<ReactorT>, recver_rt: RecverRt<'runtime>) -> Self {
Self {
rt,
event_node: EventNode::new(),
recver_rt,
state: PeerFutureState::Created,
data: None,
}
}
fn set_state(&mut self, new_state: PeerFutureState) {
modtrace!(
self.rt.tracer(),
"channel_next_future: {:?} state {:?} -> {:?}",
self.recver_rt.channel_id,
self.state,
new_state
);
self.state = new_state;
}
fn set_state_closed(&mut self, exchange_result: SwapResult) {
let new_state = PeerFutureState::Closed;
modtrace!(
self.rt.tracer(),
"channel_next_future: {:?} state {:?} -> {:?}, exchange result: {:?}",
self.recver_rt.channel_id,
self.state,
new_state,
exchange_result
);
self.state = new_state;
}
fn transmit(&mut self, event_id: EventId) -> Poll<Result<T, RecvError>> {
self.set_state(PeerFutureState::Exchanging);
self.recver_rt
.pin(event_id, (&mut self.data) as *mut Option<T> as *mut ());
Poll::Pending
}
fn close(&mut self) -> Poll<Result<T, RecvError>> {
if !self.event_node.is_awoken_for(self.rt) {
return Poll::Pending; }
return match unsafe { self.recver_rt.swap::<T>() } {
SwapResult::Done =>
{
self.set_state_closed(SwapResult::Done);
Poll::Ready(Ok(self.data.take().unwrap()))
}
SwapResult::Disconnected =>
{
self.set_state_closed(SwapResult::Disconnected);
Poll::Ready(Err(RecvError))
}
SwapResult::TryLater =>
{
modtrace!(
self.rt.tracer(),
"channel_next_future: {:?} state {:?} exchange result: {:?}",
self.recver_rt.channel_id,
self.state,
SwapResult::TryLater
);
Poll::Pending
}
};
}
}
impl<'runtime, T, ReactorT: Reactor> Drop for NextFuture<'runtime, T, ReactorT> {
fn drop(&mut self) {
if matches!(self.state, PeerFutureState::Exchanging) {
modtrace!(
self.rt.tracer(),
"channel_next_future: in the drop() {:?} cancelling",
self.recver_rt.channel_id
);
self.recver_rt.unpin(self.event_node.get_event_id());
let _ = self.event_node.on_cancel(); } else {
modtrace!(
self.rt.tracer(),
"channel_next_future: in the drop() {:?} no op",
self.recver_rt.channel_id
);
}
}
}
impl<'runtime, T, ReactorT: Reactor> Future for NextFuture<'runtime, T, ReactorT> {
type Output = Result<T, RecvError>;
fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
modtrace!(
self.rt.tracer(),
"channel_next_future: in the poll() {:?}",
self.recver_rt.channel_id
);
let this = unsafe { self.get_unchecked_mut() };
return match this.state {
PeerFutureState::Created => {
let event_id = unsafe { this.event_node.on_pin(ctx) };
this.transmit(event_id) }
PeerFutureState::Exchanging => this.close(),
PeerFutureState::Closed => {
panic!(
"aiur/channel_next_future: {:?} was polled after completion.",
this.recver_rt.channel_id
)
}
};
}
}