use super::{
exchange::{Recv, Send},
Session,
};
use futures::{Future, FutureExt, Stream};
use std::{
marker,
pin::Pin,
task::{Context, Poll},
};
#[must_use]
pub struct Dequeue<T, S: Session = ()> {
deq: Recv<Queue<T, S>>,
}
#[must_use]
pub struct Enqueue<T, S: Session = ()> {
enq: Send<Queue<T, S::Dual>>,
}
pub enum Queue<T, S: Session = ()> {
Item(T, Dequeue<T, S>),
Closed(S),
}
impl<T, S: Session> Session for Dequeue<T, S>
where
T: marker::Send + 'static,
{
type Dual = Enqueue<T, S::Dual>;
fn fork_sync(f: impl FnOnce(Self::Dual)) -> Self {
Self {
deq: Recv::fork_sync(|send| f(Enqueue { enq: send })),
}
}
fn link(self, dual: Self::Dual) {
self.deq.link(dual.enq)
}
}
impl<T, S: Session> Session for Enqueue<T, S>
where
T: marker::Send + 'static,
{
type Dual = Dequeue<T, S::Dual>;
fn fork_sync(f: impl FnOnce(Self::Dual)) -> Self {
Self {
enq: Send::fork_sync(|recv| f(Dequeue { deq: recv })),
}
}
fn link(self, dual: Self::Dual) {
self.enq.link(dual.deq)
}
}
impl<T, S: Session> Dequeue<T, S>
where
T: marker::Send + 'static,
{
#[must_use]
pub async fn pop(self) -> Queue<T, S> {
self.deq.recv1().await
}
#[must_use]
pub async fn fold<A, F>(mut self, init: A, mut f: impl FnMut(A, T) -> F) -> (A, S)
where
F: Future<Output = A>,
{
let mut accum = init;
loop {
match self.pop().await {
Queue::Item(item, rest) => {
accum = f(accum, item).await;
self = rest;
}
Queue::Closed(session) => return (accum, session),
}
}
}
#[must_use]
pub async fn for_each<F>(self, mut f: impl FnMut(T) -> F) -> S
where
F: Future<Output = ()>,
{
self.fold((), |(), item| f(item)).await.1
}
#[must_use]
pub fn into_stream(self) -> DequeueStream<T, S> {
DequeueStream {
future: Box::pin(self.pop()),
}
}
}
impl<T> Dequeue<T, ()>
where
T: marker::Send + 'static,
{
pub async fn fold1<A, F>(self, init: A, f: impl FnMut(A, T) -> F) -> A
where
F: Future<Output = A>,
{
self.fold(init, f).await.0
}
pub async fn for_each1<F>(self, f: impl FnMut(T) -> F)
where
F: Future<Output = ()>,
{
self.for_each(f).await
}
#[must_use]
pub fn into_stream1(self) -> DequeueStream1<T> {
DequeueStream1 {
future: Box::pin(self.pop()),
}
}
}
impl<T, S: Session> Enqueue<T, S>
where
T: marker::Send + 'static,
{
#[must_use]
pub fn close(self) -> S {
S::fork_sync(|dual| self.enq.send1(Queue::Closed(dual)))
}
pub fn push(self, item: T) -> Self {
Self::fork_sync(|dual| self.enq.send1(Queue::Item(item, dual)))
}
}
impl<T> Enqueue<T, ()>
where
T: marker::Send + 'static,
{
pub fn close1(self) {
self.close()
}
}
pub struct DequeueStream<T, S: Session> {
future: Pin<Box<dyn Future<Output = Queue<T, S>> + marker::Send + 'static>>,
}
pub enum Next<T, S: Session> {
Item(T),
Closed(S),
}
impl<T, S: Session> Stream for DequeueStream<T, S>
where
T: marker::Send + 'static,
{
type Item = Next<T, S>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.future.poll_unpin(cx) {
Poll::Ready(Queue::Item(value, next)) => {
self.future = Box::pin(next.pop());
Poll::Ready(Some(Next::Item(value)))
}
Poll::Ready(Queue::Closed(session)) => Poll::Ready(Some(Next::Closed(session))),
Poll::Pending => Poll::Pending,
}
}
}
pub struct DequeueStream1<T> {
future: Pin<Box<dyn Future<Output = Queue<T, ()>> + marker::Send + 'static>>,
}
impl<T> Stream for DequeueStream1<T>
where
T: marker::Send + 'static,
{
type Item = T;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.future.poll_unpin(cx) {
Poll::Ready(Queue::Item(value, next)) => {
self.future = Box::pin(next.pop());
Poll::Ready(Some(value))
}
Poll::Ready(Queue::Closed(())) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
}
}