Skip to main content

melodium_engine/transmission/
input.rs

1use crate::debug::{DataContent, Event, EventKind, TransmissionDebug};
2use async_std::channel::{bounded, Receiver, Sender};
3use async_std::sync::Mutex as AsyncMutex;
4use async_trait::async_trait;
5use melodium_common::descriptor::Flow;
6use melodium_common::executive::{
7    Input as ExecutiveInput, RecvResult, TrackId, TransmissionError, TransmissionValue, Value,
8};
9
10#[derive(Debug)]
11pub struct Input {
12    receiver: Receiver<TransmissionValue>,
13    sender: Sender<TransmissionValue>,
14    buffer: AsyncMutex<Option<TransmissionValue>>,
15    flow: Flow,
16    track_id: TrackId,
17    debug: TransmissionDebug,
18}
19
20impl Input {
21    pub fn new(flow: Flow, track_id: TrackId, debug: TransmissionDebug) -> Self {
22        let (sender, receiver) = bounded(1);
23        Self {
24            receiver,
25            sender,
26            buffer: AsyncMutex::new(None),
27            flow,
28            track_id,
29            debug,
30        }
31    }
32
33    pub fn sender(&self) -> &Sender<TransmissionValue> {
34        &self.sender
35    }
36
37    pub fn flow(&self) -> &Flow {
38        &self.flow
39    }
40
41    pub fn track_id(&self) -> &TrackId {
42        &self.track_id
43    }
44
45    pub fn transmission_debug(&self) -> &TransmissionDebug {
46        &self.debug
47    }
48}
49
50#[async_trait]
51impl ExecutiveInput for Input {
52    fn close(&self) {
53        self.receiver.close();
54        match &self.debug {
55            TransmissionDebug::None => {}
56            TransmissionDebug::Basic(world, transmission_details)
57            | TransmissionDebug::Detailed(world, transmission_details) => {
58                world.send_debug(Event::new(EventKind::InputClosed {
59                    input: transmission_details.clone(),
60                    track_id: self.track_id.clone(),
61                }))
62            }
63        }
64    }
65
66    async fn recv_many(&self) -> RecvResult<TransmissionValue> {
67        let mut lock = self.buffer.lock().await;
68        let data = if let Some(data) = lock.take() {
69            data
70        } else {
71            match self.receiver.recv().await {
72                Ok(data) => data,
73                Err(_) => return Err(TransmissionError::EverythingClosed),
74            }
75        };
76
77        match &self.debug {
78            TransmissionDebug::None => {}
79            TransmissionDebug::Basic(world, transmission_details)
80            | TransmissionDebug::Detailed(world, transmission_details) => {
81                world
82                    .send_debug_async(Event::new(EventKind::DataReceived {
83                        input: transmission_details.clone(),
84                        track_id: self.track_id.clone(),
85                        data: DataContent::Count { count: data.len() },
86                    }))
87                    .await
88            }
89        }
90
91        Ok(data)
92    }
93
94    async fn recv_one(&self) -> RecvResult<Value> {
95        let mut lock = self.buffer.lock().await;
96        let value = if let Some(data) = lock.as_mut() {
97            data.pop_front().ok_or(TransmissionError::NoData)
98        } else {
99            match self.receiver.recv().await {
100                Ok(mut data) => {
101                    let value = data.pop_front().ok_or(TransmissionError::NoData);
102                    *lock = Some(data);
103                    value
104                }
105                Err(_) => Err(TransmissionError::EverythingClosed),
106            }
107        };
108
109        if lock.as_ref().map(|buf| buf.len()).unwrap_or(0) == 0 {
110            *lock = None;
111        }
112
113        match &self.debug {
114            TransmissionDebug::None => {}
115            TransmissionDebug::Basic(world, transmission_details)
116            | TransmissionDebug::Detailed(world, transmission_details) => {
117                world
118                    .send_debug_async(Event::new(EventKind::DataReceived {
119                        input: transmission_details.clone(),
120                        track_id: self.track_id.clone(),
121                        data: DataContent::Count { count: 1 },
122                    }))
123                    .await
124            }
125        }
126
127        value
128    }
129}
130
131impl Clone for Input {
132    fn clone(&self) -> Self {
133        Self {
134            receiver: self.receiver.clone(),
135            sender: self.sender.clone(),
136            buffer: AsyncMutex::new(None),
137            flow: self.flow.clone(),
138            track_id: self.track_id,
139            debug: self.debug.clone(),
140        }
141    }
142}