std_mel/flow/
concentrate.rs1use async_std::{
2 channel::{bounded, Receiver, Sender, WeakSender},
3 sync::Mutex,
4};
5use melodium_core::{
6 common::{descriptor::DataType, executive::TrackId},
7 *,
8};
9use melodium_macro::{check, mel_model, mel_treatment};
10use std::collections::{hash_map::Entry as HashMapEntry, HashMap};
11
12#[derive(Debug)]
13enum SenderState {
14 Strong(Sender<Value>),
15 Weak(WeakSender<Value>),
16}
17
18#[derive(Debug)]
19struct TrackEntry {
20 pub track_sender: Mutex<SenderState>,
21 pub track_receiver: Mutex<Option<Receiver<Value>>>,
22}
23
24#[mel_model]
31#[derive(Debug)]
32pub struct Concentrator {
33 _model: std::sync::Weak<ConcentratorModel>,
34 tracks: Mutex<HashMap<TrackId, Vec<(DataType, TrackEntry)>>>,
35}
36
37impl Concentrator {
38 pub fn new(model: std::sync::Weak<ConcentratorModel>) -> Self {
39 Self {
40 _model: model,
41 tracks: Mutex::new(HashMap::new()),
42 }
43 }
44
45 fn invoke_source(&self, _source: &str, _params: HashMap<String, Value>) {}
46
47 pub async fn track_sender(
48 &self,
49 track_id: TrackId,
50 data_type: DataType,
51 ) -> Option<Sender<Value>> {
52 match self.tracks.lock().await.entry(track_id) {
53 HashMapEntry::Occupied(mut occupied_entry) => {
54 let entries = occupied_entry.get_mut();
55
56 if let Some((_, entry)) = entries.iter_mut().find(|(dt, _)| dt == &data_type) {
57 let mut weak_sender = None;
58 let sender = match &entry.track_sender.get_mut() {
59 SenderState::Strong(sender) => {
60 weak_sender = Some(sender.downgrade());
61 Some(sender.clone())
62 }
63 SenderState::Weak(weak_sender) => weak_sender.upgrade(),
64 };
65
66 if let Some(weak_sender) = weak_sender {
67 *entry.track_sender.get_mut() = SenderState::Weak(weak_sender);
68 }
69
70 sender
71 } else {
72 let couple = bounded(500);
73 let track_entry = TrackEntry {
74 track_sender: Mutex::new(SenderState::Weak(couple.0.downgrade())),
75 track_receiver: Mutex::new(Some(couple.1)),
76 };
77
78 entries.push((data_type, track_entry));
79
80 Some(couple.0)
81 }
82 }
83 HashMapEntry::Vacant(vacant_entry) => {
84 let couple = bounded(500);
85 let track_entry = TrackEntry {
86 track_sender: Mutex::new(SenderState::Weak(couple.0.downgrade())),
87 track_receiver: Mutex::new(Some(couple.1)),
88 };
89
90 let entries = vec![(data_type, track_entry)];
91 vacant_entry.insert(entries);
92
93 Some(couple.0)
94 }
95 }
96 }
97
98 pub async fn track_receiver(
99 &self,
100 track_id: TrackId,
101 data_type: DataType,
102 ) -> Option<Receiver<Value>> {
103 match self.tracks.lock().await.entry(track_id) {
104 HashMapEntry::Occupied(mut occupied_entry) => {
105 let entries = occupied_entry.get_mut();
106
107 if let Some((_, entry)) = entries.iter_mut().find(|(dt, _)| dt == &data_type) {
108 entry.track_receiver.get_mut().take()
109 } else {
110 let couple = bounded(500);
111 let track_entry = TrackEntry {
112 track_sender: Mutex::new(SenderState::Strong(couple.0)),
113 track_receiver: Mutex::new(None),
114 };
115
116 entries.push((data_type, track_entry));
117
118 Some(couple.1)
119 }
120 }
121 HashMapEntry::Vacant(vacant_entry) => {
122 let couple = bounded(500);
123 let track_entry = TrackEntry {
124 track_sender: Mutex::new(SenderState::Strong(couple.0)),
125 track_receiver: Mutex::new(None),
126 };
127
128 let entries = vec![(data_type, track_entry)];
129 vacant_entry.insert(entries);
130
131 Some(couple.1)
132 }
133 }
134 }
135}
136
137#[mel_treatment(
141 model concentrator Concentrator
142 generic T ()
143 input data Stream<T>
144)]
145pub async fn concentrate() {
146 let model = ConcentratorModel::into(concentrator);
147 let concentrator = model.inner();
148
149 let data_type = T;
150
151 if let Some(sender) = concentrator.track_sender(track_id, data_type).await {
152 while let Ok(value) = data.recv_one().await {
153 check!(sender.send(value).await)
154 }
155 }
156}
157
158#[mel_treatment(
162 model concentrator Concentrator
163 generic T ()
164 input data Block<T>
165)]
166pub async fn concentrateBlock() {
167 let model = ConcentratorModel::into(concentrator);
168 let concentrator = model.inner();
169
170 let data_type = T;
171
172 if let Some(sender) = concentrator.track_sender(track_id, data_type).await {
173 if let Ok(value) = data.recv_one().await {
174 let _ = sender.send(value).await;
175 }
176 }
177}
178
179#[mel_treatment(
183 model concentrator Concentrator
184 generic T ()
185 input trigger Block<T>
186 output data Stream<T>
187)]
188pub async fn concentrated() {
189 let model = ConcentratorModel::into(concentrator);
190 let concentrator = model.inner();
191
192 let data_type = T;
193
194 if let Ok(_) = trigger.recv_one().await {
195 if let Some(receiver) = concentrator.track_receiver(track_id, data_type).await {
196 while let Ok(value) = receiver.recv().await {
197 check!(data.send_one(value).await)
198 }
199 }
200 }
201}