std_mel/flow/
concentrate.rs

1use async_std::{
2    channel::{bounded, Receiver, Sender, WeakSender},
3    sync::Mutex,
4};
5use melodium_core::{
6    common::{descriptor::DataType, executive::TrackId},
7    *,
8};
9use melodium_macro::{check, mel_model, mel_treatment};
10use std::collections::{hash_map::Entry as HashMapEntry, HashMap};
11
12#[derive(Debug)]
13enum SenderState {
14    Strong(Sender<Value>),
15    Weak(WeakSender<Value>),
16}
17
18#[derive(Debug)]
19struct TrackEntry {
20    pub track_sender: Mutex<SenderState>,
21    pub track_receiver: Mutex<Option<Receiver<Value>>>,
22}
23
24#[mel_model]
25#[derive(Debug)]
26pub struct Concentrator {
27    _model: std::sync::Weak<ConcentratorModel>,
28    tracks: Mutex<HashMap<TrackId, Vec<(DataType, TrackEntry)>>>,
29}
30
31impl Concentrator {
32    pub fn new(model: std::sync::Weak<ConcentratorModel>) -> Self {
33        Self {
34            _model: model,
35            tracks: Mutex::new(HashMap::new()),
36        }
37    }
38
39    fn invoke_source(&self, _source: &str, _params: HashMap<String, Value>) {}
40
41    pub async fn track_sender(
42        &self,
43        track_id: TrackId,
44        data_type: DataType,
45    ) -> Option<Sender<Value>> {
46        match self.tracks.lock().await.entry(track_id) {
47            HashMapEntry::Occupied(mut occupied_entry) => {
48                let entries = occupied_entry.get_mut();
49
50                if let Some((_, entry)) = entries.iter_mut().find(|(dt, _)| dt == &data_type) {
51                    let mut weak_sender = None;
52                    let sender = match &entry.track_sender.get_mut() {
53                        SenderState::Strong(sender) => {
54                            weak_sender = Some(sender.downgrade());
55                            Some(sender.clone())
56                        }
57                        SenderState::Weak(weak_sender) => weak_sender.upgrade(),
58                    };
59
60                    if let Some(weak_sender) = weak_sender {
61                        *entry.track_sender.get_mut() = SenderState::Weak(weak_sender);
62                    }
63
64                    sender
65                } else {
66                    let couple = bounded(500);
67                    let track_entry = TrackEntry {
68                        track_sender: Mutex::new(SenderState::Weak(couple.0.downgrade())),
69                        track_receiver: Mutex::new(Some(couple.1)),
70                    };
71
72                    entries.push((data_type, track_entry));
73
74                    Some(couple.0)
75                }
76            }
77            HashMapEntry::Vacant(vacant_entry) => {
78                let couple = bounded(500);
79                let track_entry = TrackEntry {
80                    track_sender: Mutex::new(SenderState::Weak(couple.0.downgrade())),
81                    track_receiver: Mutex::new(Some(couple.1)),
82                };
83
84                let entries = vec![(data_type, track_entry)];
85                vacant_entry.insert(entries);
86
87                Some(couple.0)
88            }
89        }
90    }
91
92    pub async fn track_receiver(
93        &self,
94        track_id: TrackId,
95        data_type: DataType,
96    ) -> Option<Receiver<Value>> {
97        match self.tracks.lock().await.entry(track_id) {
98            HashMapEntry::Occupied(mut occupied_entry) => {
99                let entries = occupied_entry.get_mut();
100
101                if let Some((_, entry)) = entries.iter_mut().find(|(dt, _)| dt == &data_type) {
102                    entry.track_receiver.get_mut().take()
103                } else {
104                    let couple = bounded(500);
105                    let track_entry = TrackEntry {
106                        track_sender: Mutex::new(SenderState::Strong(couple.0)),
107                        track_receiver: Mutex::new(None),
108                    };
109
110                    entries.push((data_type, track_entry));
111
112                    Some(couple.1)
113                }
114            }
115            HashMapEntry::Vacant(vacant_entry) => {
116                let couple = bounded(500);
117                let track_entry = TrackEntry {
118                    track_sender: Mutex::new(SenderState::Strong(couple.0)),
119                    track_receiver: Mutex::new(None),
120                };
121
122                let entries = vec![(data_type, track_entry)];
123                vacant_entry.insert(entries);
124
125                Some(couple.1)
126            }
127        }
128    }
129}
130
131#[mel_treatment(
132    model concentrator Concentrator
133    generic T ()
134    input data Stream<T>
135)]
136pub async fn concentrate() {
137    let model = ConcentratorModel::into(concentrator);
138    let concentrator = model.inner();
139
140    let data_type = T;
141
142    if let Some(sender) = concentrator.track_sender(track_id, data_type).await {
143        while let Ok(value) = data.recv_one().await {
144            check!(sender.send(value).await)
145        }
146    }
147}
148
149#[mel_treatment(
150    model concentrator Concentrator
151    generic T ()
152    input data Block<T>
153)]
154pub async fn concentrateBlock() {
155    let model = ConcentratorModel::into(concentrator);
156    let concentrator = model.inner();
157
158    let data_type = T;
159
160    if let Some(sender) = concentrator.track_sender(track_id, data_type).await {
161        if let Ok(value) = data.recv_one().await {
162            let _ = sender.send(value).await;
163        }
164    }
165}
166
167#[mel_treatment(
168    model concentrator Concentrator
169    generic T ()
170    input trigger Block<T>
171    output data Stream<T>
172)]
173pub async fn concentrated() {
174    let model = ConcentratorModel::into(concentrator);
175    let concentrator = model.inner();
176
177    let data_type = T;
178
179    if let Ok(_) = trigger.recv_one().await {
180        if let Some(receiver) = concentrator.track_receiver(track_id, data_type).await {
181            while let Ok(value) = receiver.recv().await {
182                check!(data.send_one(value).await)
183            }
184        }
185    }
186}