Skip to main content

std_mel/data/map/
block.rs

1use super::*;
2use melodium_macro::mel_treatment;
3use std::collections::HashMap;
4use std::sync::Arc;
5
6/// Create maps with one entry
7///
8/// When `value` is received, generates a mono-entry map with it.
9#[mel_treatment(
10    generic T ()
11    input value Block<T>
12    output map Block<Map>
13)]
14pub async fn entry(key: string) {
15    if let Ok(value) = value.recv_one().await {
16        let mut new_map = HashMap::new();
17        new_map.insert(key.clone(), value);
18        let new_map = Map { map: new_map };
19        let _ = map.send_one(Value::Data(Arc::new(new_map))).await;
20    }
21}
22
23/// Get a map entry
24///
25/// Takes in `map` the `key` entry.
26#[mel_treatment(
27    generic T ()
28    input map Block<Map>
29    output value Block<Option<T>>
30)]
31pub async fn get(key: string) {
32    if let Ok(map) = map.recv_one().await.map(|val| {
33        GetData::<Arc<dyn Data>>::try_data(val)
34            .unwrap()
35            .downcast_arc::<Map>()
36            .unwrap()
37    }) {
38        let _ = value.send_one(map.map.get(&key).cloned().into()).await;
39    }
40}
41
42/// Insert entry in map
43///
44/// Insert `value` in `base` map, then emit to `map`.
45#[mel_treatment(
46    generic T ()
47    input base Block<Map>
48    input value Block<T>
49    output map Block<Map>
50)]
51pub async fn insert(key: string) {
52    if let (Ok(base), Ok(value)) = (
53        base.recv_one().await.map(|val| {
54            GetData::<Arc<dyn Data>>::try_data(val)
55                .unwrap()
56                .downcast_arc::<Map>()
57                .unwrap()
58        }),
59        value.recv_one().await,
60    ) {
61        let mut new_map = Arc::unwrap_or_clone(base);
62        new_map.map.insert(key.clone(), value);
63        let _ = map.send_one(Value::Data(Arc::new(new_map))).await;
64    }
65}
66
67/// Merge two maps
68///
69/// Merge map `entries` in `base`.
70/// `entries` erase existing entries in `base` if they already exists.
71/// `entries` can be omitted (closed input) and `merge` will still be emitted if `base` is received.
72#[mel_treatment(
73    input base Block<Map>
74    input entries Block<Map>
75    output merged Block<Map>
76)]
77pub async fn merge() {
78    if let Ok(base) = base.recv_one().await.map(|val| {
79        GetData::<Arc<dyn Data>>::try_data(val)
80            .unwrap()
81            .downcast_arc::<Map>()
82            .unwrap()
83    }) {
84        if let Ok(entries) = entries.recv_one().await.map(|val| {
85            GetData::<Arc<dyn Data>>::try_data(val)
86                .unwrap()
87                .downcast_arc::<Map>()
88                .unwrap()
89        }) {
90            let mut new_map = Arc::unwrap_or_clone(base);
91            for (key, value) in &entries.map {
92                new_map.map.insert(key.clone(), value.clone());
93            }
94
95            let _ = merged.send_one(Value::Data(Arc::new(new_map))).await;
96        } else {
97            let _ = merged.send_one(Value::Data(base)).await;
98        }
99    }
100}