melodium_engine/transmission/
input.rs1use async_std::channel::{bounded, Receiver, Sender};
2use async_std::sync::Mutex as AsyncMutex;
3use async_trait::async_trait;
4use melodium_common::descriptor::Flow;
5use melodium_common::executive::{
6 Input as ExecutiveInput, RecvResult, TransmissionError, TransmissionValue, Value,
7};
8
9#[derive(Debug)]
10pub struct Input {
11 receiver: Receiver<TransmissionValue>,
12 sender: Sender<TransmissionValue>,
13 buffer: AsyncMutex<Option<TransmissionValue>>,
14 flow: Flow,
15}
16
17impl Input {
18 pub fn new(flow: Flow) -> Self {
19 let (sender, receiver) = bounded(1);
20 Self {
21 receiver,
22 sender,
23 buffer: AsyncMutex::new(None),
24 flow,
25 }
26 }
27
28 pub fn sender(&self) -> &Sender<TransmissionValue> {
29 &self.sender
30 }
31
32 pub fn flow(&self) -> &Flow {
33 &self.flow
34 }
35}
36
37#[async_trait]
38impl ExecutiveInput for Input {
39 fn close(&self) {
40 self.receiver.close();
41 }
42
43 async fn recv_many(&self) -> RecvResult<TransmissionValue> {
44 let mut lock = self.buffer.lock().await;
45 if let Some(data) = lock.take() {
46 Ok(data)
47 } else {
48 match self.receiver.recv().await {
49 Ok(data) => Ok(data),
50 Err(_) => Err(TransmissionError::EverythingClosed),
51 }
52 }
53 }
54
55 async fn recv_one(&self) -> RecvResult<Value> {
56 let mut lock = self.buffer.lock().await;
57 let value = if let Some(data) = lock.as_mut() {
58 data.pop_front().ok_or(TransmissionError::NoData)
59 } else {
60 match self.receiver.recv().await {
61 Ok(mut data) => {
62 let value = data.pop_front().ok_or(TransmissionError::NoData);
63 *lock = Some(data);
64 value
65 }
66 Err(_) => Err(TransmissionError::EverythingClosed),
67 }
68 };
69
70 if lock.as_ref().map(|buf| buf.len()).unwrap_or(0) == 0 {
71 *lock = None;
72 }
73
74 value
75 }
76}
77
78impl Clone for Input {
79 fn clone(&self) -> Self {
80 Self {
81 receiver: self.receiver.clone(),
82 sender: self.sender.clone(),
83 buffer: AsyncMutex::new(None),
84 flow: self.flow.clone(),
85 }
86 }
87}