melodium_engine/transmission/
input.rs1use crate::debug::{DataContent, Event, EventKind, TransmissionDebug};
2use async_std::channel::{bounded, Receiver, Sender};
3use async_std::sync::Mutex as AsyncMutex;
4use async_trait::async_trait;
5use melodium_common::descriptor::Flow;
6use melodium_common::executive::{
7 Input as ExecutiveInput, RecvResult, TrackId, TransmissionError, TransmissionValue, Value,
8};
9
10#[derive(Debug)]
11pub struct Input {
12 receiver: Receiver<TransmissionValue>,
13 sender: Sender<TransmissionValue>,
14 buffer: AsyncMutex<Option<TransmissionValue>>,
15 flow: Flow,
16 track_id: TrackId,
17 debug: TransmissionDebug,
18}
19
20impl Input {
21 pub fn new(flow: Flow, track_id: TrackId, debug: TransmissionDebug) -> Self {
22 let (sender, receiver) = bounded(1);
23 Self {
24 receiver,
25 sender,
26 buffer: AsyncMutex::new(None),
27 flow,
28 track_id,
29 debug,
30 }
31 }
32
33 pub fn sender(&self) -> &Sender<TransmissionValue> {
34 &self.sender
35 }
36
37 pub fn flow(&self) -> &Flow {
38 &self.flow
39 }
40
41 pub fn track_id(&self) -> &TrackId {
42 &self.track_id
43 }
44
45 pub fn transmission_debug(&self) -> &TransmissionDebug {
46 &self.debug
47 }
48}
49
50#[async_trait]
51impl ExecutiveInput for Input {
52 fn close(&self) {
53 self.receiver.close();
54 match &self.debug {
55 TransmissionDebug::None => {}
56 TransmissionDebug::Basic(world, transmission_details)
57 | TransmissionDebug::Detailed(world, transmission_details) => {
58 world.send_debug(Event::new(EventKind::InputClosed {
59 input: transmission_details.clone(),
60 track_id: self.track_id.clone(),
61 }))
62 }
63 }
64 }
65
66 async fn recv_many(&self) -> RecvResult<TransmissionValue> {
67 let mut lock = self.buffer.lock().await;
68 let data = if let Some(data) = lock.take() {
69 data
70 } else {
71 match self.receiver.recv().await {
72 Ok(data) => data,
73 Err(_) => return Err(TransmissionError::EverythingClosed),
74 }
75 };
76
77 match &self.debug {
78 TransmissionDebug::None => {}
79 TransmissionDebug::Basic(world, transmission_details)
80 | TransmissionDebug::Detailed(world, transmission_details) => {
81 world
82 .send_debug_async(Event::new(EventKind::DataReceived {
83 input: transmission_details.clone(),
84 track_id: self.track_id.clone(),
85 data: DataContent::Count { count: data.len() },
86 }))
87 .await
88 }
89 }
90
91 Ok(data)
92 }
93
94 async fn recv_one(&self) -> RecvResult<Value> {
95 let mut lock = self.buffer.lock().await;
96 let value = if let Some(data) = lock.as_mut() {
97 data.pop_front().ok_or(TransmissionError::NoData)
98 } else {
99 match self.receiver.recv().await {
100 Ok(mut data) => {
101 let value = data.pop_front().ok_or(TransmissionError::NoData);
102 *lock = Some(data);
103 value
104 }
105 Err(_) => Err(TransmissionError::EverythingClosed),
106 }
107 };
108
109 if lock.as_ref().map(|buf| buf.len()).unwrap_or(0) == 0 {
110 *lock = None;
111 }
112
113 match &self.debug {
114 TransmissionDebug::None => {}
115 TransmissionDebug::Basic(world, transmission_details)
116 | TransmissionDebug::Detailed(world, transmission_details) => {
117 world
118 .send_debug_async(Event::new(EventKind::DataReceived {
119 input: transmission_details.clone(),
120 track_id: self.track_id.clone(),
121 data: DataContent::Count { count: 1 },
122 }))
123 .await
124 }
125 }
126
127 value
128 }
129}
130
131impl Clone for Input {
132 fn clone(&self) -> Self {
133 Self {
134 receiver: self.receiver.clone(),
135 sender: self.sender.clone(),
136 buffer: AsyncMutex::new(None),
137 flow: self.flow.clone(),
138 track_id: self.track_id,
139 debug: self.debug.clone(),
140 }
141 }
142}