std_mel/data/string_map/
block.rs

1use super::*;
2use melodium_macro::mel_treatment;
3use std::collections::HashMap;
4use std::sync::Arc;
5
6/// Create maps with one entry
7///
8/// When `value` is received, generates a mono-entry map with it.
9#[mel_treatment(
10    input value Block<string>
11    output map Block<StringMap>
12)]
13pub async fn entry(key: string) {
14    if let Ok(value) = value
15        .recv_one()
16        .await
17        .map(|val| GetData::<String>::try_data(val).unwrap())
18    {
19        let mut new_map = HashMap::new();
20        new_map.insert(key.clone(), value);
21        let new_map = StringMap { map: new_map };
22        let _ = map.send_one(Value::Data(Arc::new(new_map))).await;
23    }
24}
25
26/// Get a map entry
27///
28/// Takes in `map` the `key` entry.
29#[mel_treatment(
30    input map Block<StringMap>
31    output value Block<Option<string>>
32)]
33pub async fn get(key: string) {
34    if let Ok(map) = map.recv_one().await.map(|val| {
35        GetData::<Arc<dyn Data>>::try_data(val)
36            .unwrap()
37            .downcast_arc::<StringMap>()
38            .unwrap()
39    }) {
40        let _ = value.send_one(map.map.get(&key).cloned().into()).await;
41    }
42}
43
44/// Insert entry in map
45///
46/// Insert `value` in `base` map, then emit to `map`.
47#[mel_treatment(
48    input base Block<StringMap>
49    input value Block<string>
50    output map Block<StringMap>
51)]
52pub async fn insert(key: string) {
53    if let (Ok(base), Ok(value)) = (
54        base.recv_one().await.map(|val| {
55            GetData::<Arc<dyn Data>>::try_data(val)
56                .unwrap()
57                .downcast_arc::<StringMap>()
58                .unwrap()
59        }),
60        value
61            .recv_one()
62            .await
63            .map(|val| GetData::<String>::try_data(val).unwrap()),
64    ) {
65        let mut new_map = Arc::unwrap_or_clone(base);
66        new_map.map.insert(key.clone(), value);
67        let _ = map.send_one(Value::Data(Arc::new(new_map))).await;
68    }
69}
70
71/// Merge two maps
72///
73/// Merge map `entries` in `base`.
74/// `entries` erase existing entries in `base` if they already exists.
75/// `entries` can be omitted (closed input) and `merge` will still be emitted if `base` is received.
76#[mel_treatment(
77    input base Block<StringMap>
78    input entries Block<StringMap>
79    output merged Block<StringMap>
80)]
81pub async fn merge() {
82    if let Ok(base) = base.recv_one().await.map(|val| {
83        GetData::<Arc<dyn Data>>::try_data(val)
84            .unwrap()
85            .downcast_arc::<StringMap>()
86            .unwrap()
87    }) {
88        if let Ok(entries) = entries.recv_one().await.map(|val| {
89            GetData::<Arc<dyn Data>>::try_data(val)
90                .unwrap()
91                .downcast_arc::<StringMap>()
92                .unwrap()
93        }) {
94            let mut new_map = Arc::unwrap_or_clone(base);
95            for (key, value) in &entries.map {
96                new_map.map.insert(key.clone(), value.clone());
97            }
98
99            let _ = merged.send_one(Value::Data(Arc::new(new_map))).await;
100        } else {
101            let _ = merged.send_one(Value::Data(base)).await;
102        }
103    }
104}