Skip to main content

std_mel/flow/
concentrate.rs

1use async_std::{
2    channel::{bounded, Receiver, Sender, WeakSender},
3    sync::Mutex,
4};
5use melodium_core::{
6    common::{descriptor::DataType, executive::TrackId},
7    *,
8};
9use melodium_macro::{check, mel_model, mel_treatment};
10use std::collections::{hash_map::Entry as HashMapEntry, HashMap};
11
12#[derive(Debug)]
13enum SenderState {
14    Strong(Sender<Value>),
15    Weak(WeakSender<Value>),
16}
17
18#[derive(Debug)]
19struct TrackEntry {
20    pub track_sender: Mutex<SenderState>,
21    pub track_receiver: Mutex<Option<Receiver<Value>>>,
22}
23
24/// Collect and re-distribute typed data streams across tracks within a single engine run.
25///
26/// `concentrate` / `concentrateBlock` push values in per-track, per-type channels;
27/// `concentrated` drains those channels back out as a stream.
28/// Useful when multiple upstream tracks need to fan-in into a single downstream consumer
29/// that processes all values after all producers have finished.
30#[mel_model]
31#[derive(Debug)]
32pub struct Concentrator {
33    _model: std::sync::Weak<ConcentratorModel>,
34    tracks: Mutex<HashMap<TrackId, Vec<(DataType, TrackEntry)>>>,
35}
36
37impl Concentrator {
38    pub fn new(model: std::sync::Weak<ConcentratorModel>) -> Self {
39        Self {
40            _model: model,
41            tracks: Mutex::new(HashMap::new()),
42        }
43    }
44
45    fn invoke_source(&self, _source: &str, _params: HashMap<String, Value>) {}
46
47    pub async fn track_sender(
48        &self,
49        track_id: TrackId,
50        data_type: DataType,
51    ) -> Option<Sender<Value>> {
52        match self.tracks.lock().await.entry(track_id) {
53            HashMapEntry::Occupied(mut occupied_entry) => {
54                let entries = occupied_entry.get_mut();
55
56                if let Some((_, entry)) = entries.iter_mut().find(|(dt, _)| dt == &data_type) {
57                    let mut weak_sender = None;
58                    let sender = match &entry.track_sender.get_mut() {
59                        SenderState::Strong(sender) => {
60                            weak_sender = Some(sender.downgrade());
61                            Some(sender.clone())
62                        }
63                        SenderState::Weak(weak_sender) => weak_sender.upgrade(),
64                    };
65
66                    if let Some(weak_sender) = weak_sender {
67                        *entry.track_sender.get_mut() = SenderState::Weak(weak_sender);
68                    }
69
70                    sender
71                } else {
72                    let couple = bounded(500);
73                    let track_entry = TrackEntry {
74                        track_sender: Mutex::new(SenderState::Weak(couple.0.downgrade())),
75                        track_receiver: Mutex::new(Some(couple.1)),
76                    };
77
78                    entries.push((data_type, track_entry));
79
80                    Some(couple.0)
81                }
82            }
83            HashMapEntry::Vacant(vacant_entry) => {
84                let couple = bounded(500);
85                let track_entry = TrackEntry {
86                    track_sender: Mutex::new(SenderState::Weak(couple.0.downgrade())),
87                    track_receiver: Mutex::new(Some(couple.1)),
88                };
89
90                let entries = vec![(data_type, track_entry)];
91                vacant_entry.insert(entries);
92
93                Some(couple.0)
94            }
95        }
96    }
97
98    pub async fn track_receiver(
99        &self,
100        track_id: TrackId,
101        data_type: DataType,
102    ) -> Option<Receiver<Value>> {
103        match self.tracks.lock().await.entry(track_id) {
104            HashMapEntry::Occupied(mut occupied_entry) => {
105                let entries = occupied_entry.get_mut();
106
107                if let Some((_, entry)) = entries.iter_mut().find(|(dt, _)| dt == &data_type) {
108                    entry.track_receiver.get_mut().take()
109                } else {
110                    let couple = bounded(500);
111                    let track_entry = TrackEntry {
112                        track_sender: Mutex::new(SenderState::Strong(couple.0)),
113                        track_receiver: Mutex::new(None),
114                    };
115
116                    entries.push((data_type, track_entry));
117
118                    Some(couple.1)
119                }
120            }
121            HashMapEntry::Vacant(vacant_entry) => {
122                let couple = bounded(500);
123                let track_entry = TrackEntry {
124                    track_sender: Mutex::new(SenderState::Strong(couple.0)),
125                    track_receiver: Mutex::new(None),
126                };
127
128                let entries = vec![(data_type, track_entry)];
129                vacant_entry.insert(entries);
130
131                Some(couple.1)
132            }
133        }
134    }
135}
136
137/// Receive a `Stream<T>` and forward each value into the `Concentrator` channel for type `T` on the current track.
138///
139/// Pair with `concentrated` to retrieve the accumulated values after all producers are done.
140#[mel_treatment(
141    model concentrator Concentrator
142    generic T ()
143    input data Stream<T>
144)]
145pub async fn concentrate() {
146    let model = ConcentratorModel::into(concentrator);
147    let concentrator = model.inner();
148
149    let data_type = T;
150
151    if let Some(sender) = concentrator.track_sender(track_id, data_type).await {
152        while let Ok(value) = data.recv_one().await {
153            check!(sender.send(value).await)
154        }
155    }
156}
157
158/// Receive a single `Block<T>` value and forward it into the `Concentrator` channel for type `T` on the current track.
159///
160/// Block variant of `concentrate`; use when the producer emits exactly one value rather than a stream.
161#[mel_treatment(
162    model concentrator Concentrator
163    generic T ()
164    input data Block<T>
165)]
166pub async fn concentrateBlock() {
167    let model = ConcentratorModel::into(concentrator);
168    let concentrator = model.inner();
169
170    let data_type = T;
171
172    if let Some(sender) = concentrator.track_sender(track_id, data_type).await {
173        if let Ok(value) = data.recv_one().await {
174            let _ = sender.send(value).await;
175        }
176    }
177}
178
179/// Wait for `trigger` then drain the `Concentrator` channel for type `T` on the current track, re-emitting all accumulated values through `data`.
180///
181/// `trigger` must carry a value of the same type `T` as the accumulated data; the value itself is ignored — only its arrival matters.
182#[mel_treatment(
183    model concentrator Concentrator
184    generic T ()
185    input trigger Block<T>
186    output data Stream<T>
187)]
188pub async fn concentrated() {
189    let model = ConcentratorModel::into(concentrator);
190    let concentrator = model.inner();
191
192    let data_type = T;
193
194    if let Ok(_) = trigger.recv_one().await {
195        if let Some(receiver) = concentrator.track_receiver(track_id, data_type).await {
196            while let Ok(value) = receiver.recv().await {
197                check!(data.send_one(value).await)
198            }
199        }
200    }
201}