// mechutil 0.8.0
//
// Utility structures and functions for mechatronics applications.
// Documentation
//
// Copyright (C) 2024 Automated Design Corp.. All Rights Reserved.
// Created Date: 2024-10-03 08:36:27
// -----
// Last Modified: 2024-10-19 16:03:25
// -----
//
//

//! An asynchronous, bi-directional channel meant for communicating via an internal bus.
//! We use tokio mpsc (multi-producer, single consumer) channels because tokio typically
//! runs through all our projects.
//!
//! The AsyncBusChannel uses generics to allow any type of message.
//!

use tokio::sync::mpsc::error::{SendError, TryRecvError, TrySendError};
use tokio::sync::mpsc::{Receiver, Sender, channel as create_channel};

/// One endpoint of an asynchronous, bi-directional channel.
///
/// The endpoint sends messages of type `S` and receives messages of type `R`, so any
/// message payload types can be used. Endpoints are created in connected pairs by
/// [`async_pipeline`].
///
/// # Examples
///
/// ```ignore
/// let (mut l, mut r) = async_pipeline::<String, String>(32);
///
/// l.send("Hello from chan1".to_string()).await.unwrap();
/// r.send("Hello from chan2".to_string()).await.unwrap();
///
/// assert_eq!(r.recv().await.unwrap(), "Hello from chan1");
/// assert_eq!(l.recv().await.unwrap(), "Hello from chan2");
/// ```
#[derive(Debug)]
pub struct AsyncBusChannel<S, R> {
    /// Sending half used for outgoing messages of type `S`.
    sender: Sender<S>,
    /// Receiving half used for incoming messages of type `R`.
    receiver: Receiver<R>,
}

impl<S, R> AsyncBusChannel<S, R> {
    /// Sends a message through the channel, awaiting the underlying tokio sender.
    ///
    /// # Arguments
    ///
    /// * `s` - The message to send.
    ///
    /// # Returns
    ///
    /// * `Result<(), SendError<S>>` - Returns `Ok(())` if the message was sent successfully,
    ///   or a `SendError` carrying the unsent message if it wasn't.
    ///
    /// # Examples
    ///
    /// ```ignore
    /// channel.send("This is a message.".to_string()).await.unwrap();
    /// ```
    pub async fn send(&self, s: S) -> Result<(), SendError<S>> {
        self.sender.send(s).await
    }

    /// Clones the multi-producer sender so that it can be shared with another client.
    ///
    /// Messages sent through the clone arrive at the same receiver as messages sent
    /// through this endpoint. Note that the receiver will not inherently know the
    /// source of received messages.
    ///
    /// # Returns
    ///
    /// * `Sender<S>` - A new handle to this endpoint's sender.
    pub fn clone_sender(&self) -> Sender<S> {
        self.sender.clone()
    }

    /// Receives the next message from the channel, awaiting until one is available.
    ///
    /// # Returns
    ///
    /// * `Option<R>` - Returns `Some(message)` if a message was received, or `None` if the
    ///   channel is closed.
    ///
    /// # Examples
    ///
    /// ```ignore
    /// while let Some(msg) = channel.recv().await {
    ///     println!("Received: {}", msg);
    /// }
    /// ```
    pub async fn recv(&mut self) -> Option<R> {
        self.receiver.recv().await
    }

    /// Attempts to send a message through the channel without blocking.
    ///
    /// # Arguments
    ///
    /// * `s` - The message to send.
    ///
    /// # Returns
    ///
    /// * `Result<(), TrySendError<S>>` - Returns `Ok(())` if the message was sent, or a
    ///   `TrySendError` if the channel is full or closed.
    ///
    /// # Examples
    ///
    /// ```ignore
    /// match channel.try_send("Hello".to_string()) {
    ///    Ok(_) => println!("Message sent"),
    ///    Err(e) => println!("Error: {:?}", e),
    /// }
    /// ```
    pub fn try_send(&self, s: S) -> Result<(), TrySendError<S>> {
        self.sender.try_send(s)
    }

    /// Attempts to receive a message from the channel without blocking.
    ///
    /// # Returns
    ///
    /// * `Result<R, TryRecvError>` - Returns `Ok(message)` if a message was received, or a
    ///   `TryRecvError` if the channel is empty or closed.
    ///
    /// # Examples
    ///
    /// ```ignore
    /// match channel.try_recv() {
    ///     Ok(msg) => println!("Received: {}", msg),
    ///     Err(e) => println!("Error: {:?}", e),
    /// }
    /// ```
    pub fn try_recv(&mut self) -> Result<R, TryRecvError> {
        self.receiver.try_recv()
    }
}

/// Builds a complete asynchronous, bi-directional pipeline.
///
/// A pipeline is a pair of connected [`AsyncBusChannel`] endpoints backed by two tokio
/// mpsc channels, one per direction, so that two separate instances can talk to each
/// other. Whatever the first endpoint sends is received by the second, and vice versa.
///
/// # Arguments
///
/// * `message_buffer_size` - The number of messages that can be buffered in each
///   direction. A minimum of 32 is recommended.
///
/// # Returns
///
/// * `(AsyncBusChannel<T, U>, AsyncBusChannel<U, T>)` - The two endpoints. The first sends
///   `T` and receives `U`; the second sends `U` and receives `T`.
///
/// # Panics
///
/// tokio's `mpsc::channel` panics when given a capacity of 0, so `message_buffer_size`
/// must be at least 1.
///
/// # Examples
///
/// ```
/// use mechutil::async_channel::{AsyncBusChannel, async_pipeline};
///
/// #[tokio::main]
/// async fn main() {
///     let (mut chan1, mut chan2) = async_pipeline::<String, String>(32);
///
///     chan1.send("Message from channel 1".to_string()).await.unwrap();
///     chan2.send("Message from channel 2".to_string()).await.unwrap();
///
///     let msg1 = chan2.recv().await.unwrap();
///     let msg2 = chan1.recv().await.unwrap();
///
///     assert_eq!(msg1, "Message from channel 1");
///     assert_eq!(msg2, "Message from channel 2");
/// }
/// ```
pub fn async_pipeline<T, U>(
    message_buffer_size: usize,
) -> (AsyncBusChannel<T, U>, AsyncBusChannel<U, T>) {
    // One channel per direction, both with the same capacity.
    let (forward_tx, forward_rx) = create_channel::<T>(message_buffer_size);
    let (backward_tx, backward_rx) = create_channel::<U>(message_buffer_size);

    // Cross-wire the halves: each endpoint sends on one channel and listens on the other.
    let first = AsyncBusChannel {
        sender: forward_tx,
        receiver: backward_rx,
    };
    let second = AsyncBusChannel {
        sender: backward_tx,
        receiver: forward_rx,
    };

    (first, second)
}