Skip to main content

std_mel/data/map/
block.rs

1use super::*;
2use melodium_macro::mel_treatment;
3use std::collections::HashMap;
4use std::sync::Arc;
5
6/// When `value` is received, produce a single-entry `Map` with `key` → `value` and emit it on `map`.
7#[mel_treatment(
8    generic T ()
9    input value Block<T>
10    output map Block<Map>
11)]
12pub async fn entry(key: string) {
13    if let Ok(value) = value.recv_one().await {
14        let mut new_map = HashMap::new();
15        new_map.insert(key.clone(), value);
16        let new_map = Map { map: new_map };
17        let _ = map.send_one(Value::Data(Arc::new(new_map))).await;
18    }
19}
20
21/// Receive one `Map` block and emit the value stored under `key` as `Option<T>` on `value`.
22///
23/// Emits `none` if the key is absent or the stored value does not match type `T`.
24#[mel_treatment(
25    generic T ()
26    input map Block<Map>
27    output value Block<Option<T>>
28)]
29pub async fn get(key: string) {
30    if let Ok(map) = map.recv_one().await.map(|val| {
31        GetData::<Arc<dyn Data>>::try_data(val)
32            .unwrap()
33            .downcast_arc::<Map>()
34            .unwrap()
35    }) {
36        let _ = value.send_one(map.map.get(&key).cloned().into()).await;
37    }
38}
39
40/// Receive one `base` map and one `value` block, insert `key` → `value` into a copy of `base`, and emit the updated map on `map`.
41#[mel_treatment(
42    generic T ()
43    input base Block<Map>
44    input value Block<T>
45    output map Block<Map>
46)]
47pub async fn insert(key: string) {
48    if let (Ok(base), Ok(value)) = (
49        base.recv_one().await.map(|val| {
50            GetData::<Arc<dyn Data>>::try_data(val)
51                .unwrap()
52                .downcast_arc::<Map>()
53                .unwrap()
54        }),
55        value.recv_one().await,
56    ) {
57        let mut new_map = Arc::unwrap_or_clone(base);
58        new_map.map.insert(key.clone(), value);
59        let _ = map.send_one(Value::Data(Arc::new(new_map))).await;
60    }
61}
62
63/// Merge two maps
64///
65/// Merge map `entries` in `base`.
66/// `entries` erase existing entries in `base` if they already exists.
67/// `entries` can be omitted (closed input) and `merge` will still be emitted if `base` is received.
68#[mel_treatment(
69    input base Block<Map>
70    input entries Block<Map>
71    output merged Block<Map>
72)]
73pub async fn merge() {
74    if let Ok(base) = base.recv_one().await.map(|val| {
75        GetData::<Arc<dyn Data>>::try_data(val)
76            .unwrap()
77            .downcast_arc::<Map>()
78            .unwrap()
79    }) {
80        if let Ok(entries) = entries.recv_one().await.map(|val| {
81            GetData::<Arc<dyn Data>>::try_data(val)
82                .unwrap()
83                .downcast_arc::<Map>()
84                .unwrap()
85        }) {
86            let mut new_map = Arc::unwrap_or_clone(base);
87            for (key, value) in &entries.map {
88                new_map.map.insert(key.clone(), value.clone());
89            }
90
91            let _ = merged.send_one(Value::Data(Arc::new(new_map))).await;
92        } else {
93            let _ = merged.send_one(Value::Data(base)).await;
94        }
95    }
96}