polyhorn-core 0.4.0

Core types used in Polyhorn.
Documentation
use futures::channel::mpsc::{self, TrySendError};
use futures::{Future, SinkExt, StreamExt};

use crate::{Key, Link, UseAsync, UseReference};

#[macro_export]
macro_rules! use_channel {
    ($manager:expr, $task:expr) => {
        $crate::UseChannel::use_channel($manager, $crate::Key::from($crate::use_id!()), $task)
    };
}

pub struct Sender<T>(mpsc::Sender<T>)
where
    T: Send + 'static;

impl<T> Sender<T>
where
    T: Send + 'static,
{
    pub fn send(&mut self, message: T) -> futures::sink::Send<mpsc::Sender<T>, T> {
        self.0.send(message)
    }

    pub fn try_send(&mut self, message: T) -> Result<(), TrySendError<T>> {
        self.0.try_send(message)
    }
}

impl<T> Clone for Sender<T>
where
    T: Send + 'static,
{
    fn clone(&self) -> Self {
        Sender(self.0.clone())
    }
}

pub struct Receiver<T>(mpsc::Receiver<T>)
where
    T: Send + 'static;

impl<T> Receiver<T>
where
    T: Send + 'static,
{
    pub fn next(&mut self) -> impl Future<Output = Option<T>> + '_ {
        self.0.next()
    }
}

pub trait UseChannel {
    fn use_channel<T, C, F>(&mut self, key: Key, closure: C) -> Sender<T>
    where
        T: Send + 'static,
        C: FnOnce(Receiver<T>) -> F + 'static,
        F: Future<Output = ()>;
}

impl<M> UseChannel for M
where
    M: UseAsync + UseReference + Link,
{
    fn use_channel<T, C, F>(&mut self, key: Key, closure: C) -> Sender<T>
    where
        T: Send + 'static,
        C: FnOnce(Receiver<T>) -> F + 'static,
        F: Future<Output = ()>,
    {
        let mut rx = None;

        let tx = self.use_reference(key.clone(), || {
            let (new_tx, new_rx) = mpsc::channel::<T>(1024);
            rx = Some(new_rx);
            new_tx
        });

        let tx = tx.apply(self, |tx| tx.to_owned());

        self.use_async(key, async move {
            if let Some(rx) = rx.take() {
                closure(Receiver(rx)).await;
            }
        });

        Sender(tx)
    }
}