futuresdr 0.0.27

An Experimental Async SDR Runtime for Heterogeneous Architectures.
Documentation
//! Message/Event/RPC-based Ports
use futures::channel::mpsc::Sender;
use futures::prelude::*;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;

use crate::anyhow::Result;
use crate::runtime::BlockMessage;
use crate::runtime::BlockMeta;
use crate::runtime::Pmt;

pub struct MessageInput<T: ?Sized> {
    name: String,
    #[allow(clippy::type_complexity)]
    handler: Arc<
        dyn for<'a> Fn(
                &'a mut T,
                &'a mut MessageIo<T>,
                &'a mut BlockMeta,
                Pmt,
            ) -> Pin<Box<dyn Future<Output = Result<Pmt>> + Send + 'a>>
            + Send
            + Sync,
    >,
}

impl<T: Send + ?Sized> MessageInput<T> {
    #[allow(clippy::type_complexity)]
    pub fn new(
        name: &str,
        handler: Arc<
            dyn for<'a> Fn(
                    &'a mut T,
                    &'a mut MessageIo<T>,
                    &'a mut BlockMeta,
                    Pmt,
                )
                    -> Pin<Box<dyn Future<Output = Result<Pmt>> + Send + 'a>>
                + Send
                + Sync,
        >,
    ) -> MessageInput<T> {
        MessageInput {
            name: name.to_string(),
            handler,
        }
    }

    #[allow(clippy::type_complexity)]
    pub fn get_handler(
        &self,
    ) -> Arc<
        dyn for<'a> Fn(
                &'a mut T,
                &'a mut MessageIo<T>,
                &'a mut BlockMeta,
                Pmt,
            ) -> Pin<Box<dyn Future<Output = Result<Pmt>> + Send + 'a>>
            + Send
            + Sync,
    > {
        self.handler.clone()
    }

    pub fn name(&self) -> &str {
        &self.name
    }
}

#[derive(Debug)]
pub struct MessageOutput {
    name: String,
    handlers: Vec<(usize, Sender<BlockMessage>)>,
}

impl MessageOutput {
    pub fn new(name: &str) -> MessageOutput {
        MessageOutput {
            name: name.to_string(),
            handlers: Vec::new(),
        }
    }

    pub fn name(&self) -> &str {
        &self.name
    }

    pub fn connect(&mut self, port: usize, sender: Sender<BlockMessage>) {
        self.handlers.push((port, sender));
    }

    pub async fn notify_finished(&mut self) {
        for (_, sender) in self.handlers.iter_mut() {
            let _ = sender.send(BlockMessage::Terminate).await;
        }
    }

    pub async fn post(&mut self, p: Pmt) {
        for (port_id, sender) in self.handlers.iter_mut() {
            sender
                .send(BlockMessage::Call {
                    port_id: *port_id,
                    data: p.clone(),
                })
                .await
                .unwrap();
        }
    }
}

pub struct MessageIo<T: ?Sized> {
    inputs: Vec<MessageInput<T>>,
    outputs: Vec<MessageOutput>,
}

impl<T: Send + ?Sized> MessageIo<T> {
    fn new(inputs: Vec<MessageInput<T>>, outputs: Vec<MessageOutput>) -> Self {
        MessageIo { inputs, outputs }
    }

    pub fn input_name_to_id(&self, name: &str) -> Option<usize> {
        self.inputs
            .iter()
            .enumerate()
            .find(|item| item.1.name() == name)
            .map(|(i, _)| i)
    }

    pub fn input(&self, id: usize) -> &MessageInput<T> {
        &self.inputs[id]
    }

    pub fn input_names(&self) -> Vec<String> {
        self.inputs.iter().map(|x| x.name().to_string()).collect()
    }

    pub fn outputs(&self) -> &Vec<MessageOutput> {
        &self.outputs
    }

    pub fn outputs_mut(&mut self) -> &mut Vec<MessageOutput> {
        &mut self.outputs
    }

    pub fn output(&self, id: usize) -> &MessageOutput {
        &self.outputs[id]
    }

    pub fn output_mut(&mut self, id: usize) -> &mut MessageOutput {
        &mut self.outputs[id]
    }

    pub fn output_name_to_id(&self, name: &str) -> Option<usize> {
        self.outputs
            .iter()
            .enumerate()
            .find(|item| item.1.name() == name)
            .map(|(i, _)| i)
    }

    pub async fn post(&mut self, id: usize, p: Pmt) {
        self.output_mut(id).post(p).await;
    }
}

pub struct MessageIoBuilder<T> {
    inputs: Vec<MessageInput<T>>,
    outputs: Vec<MessageOutput>,
}

impl<T: Send> MessageIoBuilder<T> {
    pub fn new() -> MessageIoBuilder<T> {
        MessageIoBuilder {
            inputs: Vec::new(),
            outputs: Vec::new(),
        }
    }

    #[must_use]
    pub fn add_input(
        mut self,
        name: &str,
        c: impl for<'a> Fn(
                &'a mut T,
                &'a mut MessageIo<T>,
                &'a mut BlockMeta,
                Pmt,
            ) -> Pin<Box<dyn Future<Output = Result<Pmt>> + Send + 'a>>
            + Send
            + Sync
            + 'static,
    ) -> MessageIoBuilder<T> {
        self.inputs.push(MessageInput::new(name, Arc::new(c)));
        self
    }

    #[must_use]
    pub fn add_output(mut self, name: &str) -> MessageIoBuilder<T> {
        self.outputs.push(MessageOutput::new(name));
        self
    }

    pub fn build(self) -> MessageIo<T> {
        MessageIo::new(self.inputs, self.outputs)
    }
}

impl<T: Send> Default for MessageIoBuilder<T> {
    fn default() -> Self {
        Self::new()
    }
}