dagrs 0.8.0

Dagrs follows the concept of Flow-based Programming and is suitable for the execution of multiple tasks with graph-like dependencies. Dagrs has the characteristics of high performance and asynchronous execution. It provides users with a convenient programming interface.
Documentation
use std::{collections::HashMap, marker::PhantomData, sync::Arc};

use futures::future::join_all;
use tokio::sync::{Mutex, broadcast, mpsc};

use crate::node::NodeId;

use super::information_packet::Content;

/// # Output Channels
/// A hash-table mapping `NodeId` to `OutChannel`. In **Dagrs**, each `Node` stores output
/// channels in this map, enabling `Node` to send information packets to other `Node`s.
#[derive(Default)]
pub struct OutChannels(pub(crate) HashMap<NodeId, Arc<Mutex<OutChannel>>>);

impl OutChannels {
    /// Perform an asynchronous send on the outgoing channel for `NodeId`.
    pub async fn send_to(&self, id: &NodeId, content: Content) -> Result<(), SendErr> {
        match self.get(id) {
            Some(channel) => channel.lock().await.send(content).await,
            None => Err(SendErr::NoSuchChannel),
        }
    }

    /// Broadcasts the `content` to all the [`OutChannel`]s asynchronously.
    pub async fn broadcast(&self, content: Content) -> Vec<Result<(), SendErr>> {
        let futures = self
            .0
            .values()
            .map(|c| async { c.lock().await.send(content.clone()).await });

        join_all(futures).await
    }

    /// Close the channel by the given `NodeId`, and remove the channel in this map.
    pub async fn close(&mut self, id: &NodeId) {
        if self.get(id).is_some() {
            self.0.remove(id);
        }
    }

    pub(crate) async fn close_all(&mut self) {
        self.0.clear();
    }

    fn get(&self, id: &NodeId) -> Option<Arc<Mutex<OutChannel>>> {
        self.0.get(id).cloned()
    }

    pub(crate) fn insert(&mut self, node_id: NodeId, channel: Arc<Mutex<OutChannel>>) {
        self.0.insert(node_id, channel);
    }

    /// Returns a list of all available receiver node IDs.
    pub fn get_receiver_ids(&self) -> Vec<NodeId> {
        self.0.keys().copied().collect()
    }
}

/// # Output Channel
/// Wrapper of senderrs of `tokio::sync::mpsc` and `tokio::sync::broadcast`. **Dagrs** will
/// decide the inner type of channel when building the graph.
/// Learn more about [Tokio Channels](https://tokio.rs/tokio/tutorial/channels).
pub enum OutChannel {
    /// Sender of a `tokio::sync::mpsc` channel.
    Mpsc(mpsc::Sender<Content>),
    /// Sender of a `tokio::sync::broadcast` channel.
    Bcst(broadcast::Sender<Content>),
}

impl OutChannel {
    /// Perform an asynchronous send on this channel.
    async fn send(&self, value: Content) -> Result<(), SendErr> {
        match self {
            OutChannel::Mpsc(sender) => match sender.send(value).await {
                Ok(_) => Ok(()),
                Err(e) => Err(SendErr::ClosedChannel(e.0)),
            },
            OutChannel::Bcst(sender) => match sender.send(value) {
                Ok(_) => Ok(()),
                Err(e) => Err(SendErr::ClosedChannel(e.0)),
            },
        }
    }
}

/// # Output Channel Error Types
/// - NoSuchChannel: try to get a channel with an invalid `NodeId`.
/// - ClosedChannel: the channel is closed alredy.
///
/// In cases of getting errs of type `MpscError` and `BcstError`, the sender
/// will find there are no active receivers left, so try to send messages is
/// meaningless for now.
#[derive(Debug)]
pub enum SendErr {
    NoSuchChannel,
    ClosedChannel(Content),
}

/// # Typed Output Channels
/// A hash-table mapping [`NodeId`] to [`OutChannel`]. This provides type-safe channel communication
/// between nodes.
#[derive(Default)]
pub struct TypedOutChannels<T: Send + Sync + 'static>(
    pub(crate) HashMap<NodeId, Arc<Mutex<OutChannel>>>,
    // maker for type T
    pub(crate) PhantomData<T>,
);

impl<T: Send + Sync + 'static> TypedOutChannels<T> {
    /// Perform an asynchronous send on the outcoming channel from `NodeId`.
    pub async fn send_to(&self, id: &NodeId, content: T) -> Result<(), SendErr> {
        match self.get(id) {
            Some(channel) => channel.lock().await.send(Content::new(content)).await,
            None => Err(SendErr::NoSuchChannel),
        }
    }

    /// Broadcasts the `content` to all the [`TypedOutChannels`] asynchronously.
    pub async fn broadcast(&self, content: T) -> Vec<Result<(), SendErr>> {
        let content = Content::new(content);
        let futures = self
            .0
            .values()
            .map(|c| async { c.lock().await.send(content.clone()).await });

        join_all(futures).await
    }

    /// Close the channel by the given `NodeId`, and remove the channel in this map.
    pub async fn close(&mut self, id: &NodeId) {
        if self.get(id).is_some() {
            self.0.remove(id);
        }
    }

    fn get(&self, id: &NodeId) -> Option<Arc<Mutex<OutChannel>>> {
        self.0.get(id).cloned()
    }

    /// Returns a list of all available receiver node IDs.
    pub fn get_receiver_ids(&self) -> Vec<NodeId> {
        self.0.keys().copied().collect()
    }
}