melodium_engine/transmission/
input.rs

1use async_std::channel::{bounded, Receiver, Sender};
2use async_std::sync::Mutex as AsyncMutex;
3use async_trait::async_trait;
4use melodium_common::descriptor::Flow;
5use melodium_common::executive::{
6    Input as ExecutiveInput, RecvResult, TransmissionError, TransmissionValue, Value,
7};
8
9#[derive(Debug)]
10pub struct Input {
11    receiver: Receiver<TransmissionValue>,
12    sender: Sender<TransmissionValue>,
13    buffer: AsyncMutex<Option<TransmissionValue>>,
14    flow: Flow,
15}
16
17impl Input {
18    pub fn new(flow: Flow) -> Self {
19        let (sender, receiver) = bounded(1);
20        Self {
21            receiver,
22            sender,
23            buffer: AsyncMutex::new(None),
24            flow,
25        }
26    }
27
28    pub fn sender(&self) -> &Sender<TransmissionValue> {
29        &self.sender
30    }
31
32    pub fn flow(&self) -> &Flow {
33        &self.flow
34    }
35}
36
37#[async_trait]
38impl ExecutiveInput for Input {
39    fn close(&self) {
40        self.receiver.close();
41    }
42
43    async fn recv_many(&self) -> RecvResult<TransmissionValue> {
44        let mut lock = self.buffer.lock().await;
45        if let Some(data) = lock.take() {
46            Ok(data)
47        } else {
48            match self.receiver.recv().await {
49                Ok(data) => Ok(data),
50                Err(_) => Err(TransmissionError::EverythingClosed),
51            }
52        }
53    }
54
55    async fn recv_one(&self) -> RecvResult<Value> {
56        let mut lock = self.buffer.lock().await;
57        let value = if let Some(data) = lock.as_mut() {
58            data.pop_front().ok_or(TransmissionError::NoData)
59        } else {
60            match self.receiver.recv().await {
61                Ok(mut data) => {
62                    let value = data.pop_front().ok_or(TransmissionError::NoData);
63                    *lock = Some(data);
64                    value
65                }
66                Err(_) => Err(TransmissionError::EverythingClosed),
67            }
68        };
69
70        if lock.as_ref().map(|buf| buf.len()).unwrap_or(0) == 0 {
71            *lock = None;
72        }
73
74        value
75    }
76}
77
78impl Clone for Input {
79    fn clone(&self) -> Self {
80        Self {
81            receiver: self.receiver.clone(),
82            sender: self.sender.clone(),
83            buffer: AsyncMutex::new(None),
84            flow: self.flow.clone(),
85        }
86    }
87}