// melodium-engine 0.10.0
//
// Mélodium core engine and executor implementation
// Documentation
use crate::debug::{DataContent, Event, EventKind, TransmissionDebug};
use async_std::channel::{bounded, Receiver, Sender};
use async_std::sync::Mutex as AsyncMutex;
use async_trait::async_trait;
use melodium_common::descriptor::Flow;
use melodium_common::executive::{
    Input as ExecutiveInput, RecvResult, TrackId, TransmissionError, TransmissionValue, Value,
};

/// Receiving endpoint of a transmission.
///
/// Keeps both ends of a channel carrying [`TransmissionValue`] batches,
/// together with a buffer holding the part of a batch that `recv_one` has
/// not handed out yet.
#[derive(Debug)]
pub struct Input {
    /// Channel end the batches are read from.
    receiver: Receiver<TransmissionValue>,
    /// Channel end exposed through `sender()`.
    sender: Sender<TransmissionValue>,
    /// Unconsumed remainder of the last batch taken from the channel;
    /// `None` when nothing is pending.
    buffer: AsyncMutex<Option<TransmissionValue>>,
    /// Flow descriptor supplied at construction.
    flow: Flow,
    /// Track identifier supplied at construction; attached to debug events.
    track_id: TrackId,
    /// Debug configuration; `TransmissionDebug::None` disables debug events.
    debug: TransmissionDebug,
}

impl Input {
    pub fn new(flow: Flow, track_id: TrackId, debug: TransmissionDebug) -> Self {
        let (sender, receiver) = bounded(1);
        Self {
            receiver,
            sender,
            buffer: AsyncMutex::new(None),
            flow,
            track_id,
            debug,
        }
    }

    pub fn sender(&self) -> &Sender<TransmissionValue> {
        &self.sender
    }

    pub fn flow(&self) -> &Flow {
        &self.flow
    }

    pub fn track_id(&self) -> &TrackId {
        &self.track_id
    }

    pub fn transmission_debug(&self) -> &TransmissionDebug {
        &self.debug
    }
}

#[async_trait]
impl ExecutiveInput for Input {
    /// Closes the receiving end of the channel.
    ///
    /// When debugging is enabled (`Basic` or `Detailed`), an `InputClosed`
    /// event carrying the transmission details and track id is sent.
    fn close(&self) {
        self.receiver.close();
        match &self.debug {
            TransmissionDebug::None => {}
            TransmissionDebug::Basic(world, transmission_details)
            | TransmissionDebug::Detailed(world, transmission_details) => {
                world.send_debug(Event::new(EventKind::InputClosed {
                    input: transmission_details.clone(),
                    track_id: self.track_id.clone(),
                }))
            }
        }
    }

    /// Receives a whole batch of values.
    ///
    /// Values left in the buffer by a previous `recv_one` are returned first;
    /// otherwise the next batch is awaited from the channel.
    ///
    /// # Errors
    ///
    /// Returns `TransmissionError::EverythingClosed` when the channel yields
    /// no more data. No debug event is emitted in that case.
    async fn recv_many(&self) -> RecvResult<TransmissionValue> {
        let mut lock = self.buffer.lock().await;
        let data = if let Some(data) = lock.take() {
            data
        } else {
            match self.receiver.recv().await {
                Ok(data) => data,
                Err(_) => return Err(TransmissionError::EverythingClosed),
            }
        };

        match &self.debug {
            TransmissionDebug::None => {}
            TransmissionDebug::Basic(world, transmission_details)
            | TransmissionDebug::Detailed(world, transmission_details) => {
                world
                    .send_debug_async(Event::new(EventKind::DataReceived {
                        input: transmission_details.clone(),
                        track_id: self.track_id.clone(),
                        data: DataContent::Count { count: data.len() },
                    }))
                    .await
            }
        }

        Ok(data)
    }

    /// Receives a single value.
    ///
    /// The value is popped from the front of the buffered batch if there is
    /// one; otherwise a new batch is awaited from the channel, its first
    /// value is returned and the remainder is buffered for later calls.
    ///
    /// # Errors
    ///
    /// * `TransmissionError::NoData` when the batch at hand has no value.
    /// * `TransmissionError::EverythingClosed` when the channel yields no
    ///   more data.
    ///
    /// A `DataReceived` debug event is emitted only when a value was
    /// actually obtained.
    async fn recv_one(&self) -> RecvResult<Value> {
        let mut lock = self.buffer.lock().await;
        let value = if let Some(data) = lock.as_mut() {
            data.pop_front().ok_or(TransmissionError::NoData)
        } else {
            match self.receiver.recv().await {
                Ok(mut data) => {
                    let value = data.pop_front().ok_or(TransmissionError::NoData);
                    *lock = Some(data);
                    value
                }
                Err(_) => Err(TransmissionError::EverythingClosed),
            }
        };

        // Drop an exhausted batch so the next call goes back to the channel.
        if lock.as_ref().map(|buf| buf.len()).unwrap_or(0) == 0 {
            *lock = None;
        }

        // Report a reception only on success: on error nothing was received,
        // and `recv_many` does not emit an event on failure either.
        if value.is_ok() {
            match &self.debug {
                TransmissionDebug::None => {}
                TransmissionDebug::Basic(world, transmission_details)
                | TransmissionDebug::Detailed(world, transmission_details) => {
                    world
                        .send_debug_async(Event::new(EventKind::DataReceived {
                            input: transmission_details.clone(),
                            track_id: self.track_id.clone(),
                            data: DataContent::Count { count: 1 },
                        }))
                        .await
                }
            }
        }

        value
    }
}

impl Clone for Input {
    fn clone(&self) -> Self {
        Self {
            receiver: self.receiver.clone(),
            sender: self.sender.clone(),
            buffer: AsyncMutex::new(None),
            flow: self.flow.clone(),
            track_id: self.track_id,
            debug: self.debug.clone(),
        }
    }
}