std_mel/flow/
concentrate.rs1use async_std::{
2 channel::{bounded, Receiver, Sender, WeakSender},
3 sync::Mutex,
4};
5use melodium_core::{
6 common::{descriptor::DataType, executive::TrackId},
7 *,
8};
9use melodium_macro::{check, mel_model, mel_treatment};
10use std::collections::{hash_map::Entry as HashMapEntry, HashMap};
11
12#[derive(Debug)]
13enum SenderState {
14 Strong(Sender<Value>),
15 Weak(WeakSender<Value>),
16}
17
18#[derive(Debug)]
19struct TrackEntry {
20 pub track_sender: Mutex<SenderState>,
21 pub track_receiver: Mutex<Option<Receiver<Value>>>,
22}
23
24#[mel_model]
25#[derive(Debug)]
26pub struct Concentrator {
27 _model: std::sync::Weak<ConcentratorModel>,
28 tracks: Mutex<HashMap<TrackId, Vec<(DataType, TrackEntry)>>>,
29}
30
31impl Concentrator {
32 pub fn new(model: std::sync::Weak<ConcentratorModel>) -> Self {
33 Self {
34 _model: model,
35 tracks: Mutex::new(HashMap::new()),
36 }
37 }
38
39 fn invoke_source(&self, _source: &str, _params: HashMap<String, Value>) {}
40
41 pub async fn track_sender(
42 &self,
43 track_id: TrackId,
44 data_type: DataType,
45 ) -> Option<Sender<Value>> {
46 match self.tracks.lock().await.entry(track_id) {
47 HashMapEntry::Occupied(mut occupied_entry) => {
48 let entries = occupied_entry.get_mut();
49
50 if let Some((_, entry)) = entries.iter_mut().find(|(dt, _)| dt == &data_type) {
51 let mut weak_sender = None;
52 let sender = match &entry.track_sender.get_mut() {
53 SenderState::Strong(sender) => {
54 weak_sender = Some(sender.downgrade());
55 Some(sender.clone())
56 }
57 SenderState::Weak(weak_sender) => weak_sender.upgrade(),
58 };
59
60 if let Some(weak_sender) = weak_sender {
61 *entry.track_sender.get_mut() = SenderState::Weak(weak_sender);
62 }
63
64 sender
65 } else {
66 let couple = bounded(500);
67 let track_entry = TrackEntry {
68 track_sender: Mutex::new(SenderState::Weak(couple.0.downgrade())),
69 track_receiver: Mutex::new(Some(couple.1)),
70 };
71
72 entries.push((data_type, track_entry));
73
74 Some(couple.0)
75 }
76 }
77 HashMapEntry::Vacant(vacant_entry) => {
78 let couple = bounded(500);
79 let track_entry = TrackEntry {
80 track_sender: Mutex::new(SenderState::Weak(couple.0.downgrade())),
81 track_receiver: Mutex::new(Some(couple.1)),
82 };
83
84 let entries = vec![(data_type, track_entry)];
85 vacant_entry.insert(entries);
86
87 Some(couple.0)
88 }
89 }
90 }
91
92 pub async fn track_receiver(
93 &self,
94 track_id: TrackId,
95 data_type: DataType,
96 ) -> Option<Receiver<Value>> {
97 match self.tracks.lock().await.entry(track_id) {
98 HashMapEntry::Occupied(mut occupied_entry) => {
99 let entries = occupied_entry.get_mut();
100
101 if let Some((_, entry)) = entries.iter_mut().find(|(dt, _)| dt == &data_type) {
102 entry.track_receiver.get_mut().take()
103 } else {
104 let couple = bounded(500);
105 let track_entry = TrackEntry {
106 track_sender: Mutex::new(SenderState::Strong(couple.0)),
107 track_receiver: Mutex::new(None),
108 };
109
110 entries.push((data_type, track_entry));
111
112 Some(couple.1)
113 }
114 }
115 HashMapEntry::Vacant(vacant_entry) => {
116 let couple = bounded(500);
117 let track_entry = TrackEntry {
118 track_sender: Mutex::new(SenderState::Strong(couple.0)),
119 track_receiver: Mutex::new(None),
120 };
121
122 let entries = vec![(data_type, track_entry)];
123 vacant_entry.insert(entries);
124
125 Some(couple.1)
126 }
127 }
128 }
129}
130
131#[mel_treatment(
132 model concentrator Concentrator
133 generic T ()
134 input data Stream<T>
135)]
136pub async fn concentrate() {
137 let model = ConcentratorModel::into(concentrator);
138 let concentrator = model.inner();
139
140 let data_type = T;
141
142 if let Some(sender) = concentrator.track_sender(track_id, data_type).await {
143 while let Ok(value) = data.recv_one().await {
144 check!(sender.send(value).await)
145 }
146 }
147}
148
149#[mel_treatment(
150 model concentrator Concentrator
151 generic T ()
152 input data Block<T>
153)]
154pub async fn concentrateBlock() {
155 let model = ConcentratorModel::into(concentrator);
156 let concentrator = model.inner();
157
158 let data_type = T;
159
160 if let Some(sender) = concentrator.track_sender(track_id, data_type).await {
161 if let Ok(value) = data.recv_one().await {
162 let _ = sender.send(value).await;
163 }
164 }
165}
166
167#[mel_treatment(
168 model concentrator Concentrator
169 generic T ()
170 input trigger Block<T>
171 output data Stream<T>
172)]
173pub async fn concentrated() {
174 let model = ConcentratorModel::into(concentrator);
175 let concentrator = model.inner();
176
177 let data_type = T;
178
179 if let Ok(_) = trigger.recv_one().await {
180 if let Some(receiver) = concentrator.track_receiver(track_id, data_type).await {
181 while let Ok(value) = receiver.recv().await {
182 check!(data.send_one(value).await)
183 }
184 }
185 }
186}