Skip to main content

std_mel/data/map/
mod.rs

1use melodium_core::{executive::*, *};
2use melodium_macro::{check, mel_data, mel_function, mel_treatment};
3use std::collections::HashMap;
4use std::sync::Arc;
5
6pub mod block;
7
8/// A heterogeneous key→value map where keys are strings and values can be any Mélodium type.
9///
10/// Used to pass structured data between treatments and to build JSON-like payloads.
11/// Supports `entry`, `get`, `insert`, and `merge` operations.
12/// Later entries with the same key overwrite earlier ones.
13#[mel_data(
14    traits (PartialEquality Serialize Display)
15)]
16#[derive(Clone, Debug, PartialEq, Serialize)]
17pub struct Map {
18    pub map: HashMap<String, Value>,
19}
20
21impl Map {
22    pub fn new() -> Self {
23        Self {
24            map: HashMap::new(),
25        }
26    }
27
28    pub fn new_with(map: HashMap<String, Value>) -> Self {
29        Self { map }
30    }
31}
32
33impl Display for Map {
34    fn display(&self, f: &mut core::fmt::Formatter<'_>) -> Result<(), core::fmt::Error> {
35        write!(f, "{:#?}", self)
36    }
37}
38
39/// Build a `Map` by merging a list of single-entry maps.
40///
41/// Each element in `entries` should be produced by `|entry(key, value)`.
42/// Later entries with the same key overwrite earlier ones.
43#[mel_function]
44pub fn map(entries: Vec<Map>) -> Map {
45    let mut map = HashMap::new();
46    for submap in entries {
47        map.extend(submap.map);
48    }
49    Map { map }
50}
51
52/// Build a single-entry `Map` mapping `key` to `value`.
53///
54/// Typically used as an argument to `|map([...])` to construct multi-entry maps.
55#[mel_function(
56    generic T ()
57)]
58pub fn entry(key: string, value: T) -> Map {
59    let mut map = HashMap::new();
60    map.insert(key, value);
61    Map { map }
62}
63
64/// For every `value` received on the stream, produce a single-entry `Map` with `key` → `value` and emit it on `map`.
65#[mel_treatment(
66    generic T ()
67    input value Stream<T>
68    output map Stream<Map>
69)]
70pub async fn entry(key: string) {
71    while let Ok(value) = value.recv_one().await {
72        let mut new_map = HashMap::new();
73        new_map.insert(key.clone(), value);
74        let new_map = Map { map: new_map };
75        check!(map.send_one(Value::Data(Arc::new(new_map))).await)
76    }
77}
78
79/// Look up `key` in `map` and return its value as `Option<T>`, or `none` if the key is absent or the stored value is of a different type.
80#[mel_function(
81    generic T ()
82)]
83pub fn get(map: Map, key: string) -> Option<T> {
84    generics
85        .get("T")
86        .map(|dt| map.map.get(&key).cloned().filter(|v| &v.datatype() == dt))
87        .flatten()
88}
89
90/// For every `map` received on the stream, look up `key` and emit the result as `Option<T>` on `value`.
91///
92/// Emits `none` if the key is absent or the stored value does not match type `T`.
93#[mel_treatment(
94    generic T ()
95    input map Stream<Map>
96    output value Stream<Option<T>>
97)]
98pub async fn get(key: string) {
99    while let Ok(map) = map.recv_one().await.map(|val| {
100        GetData::<Arc<dyn Data>>::try_data(val)
101            .unwrap()
102            .downcast_arc::<Map>()
103            .unwrap()
104    }) {
105        check!(value.send_one(map.map.get(&key).cloned().into()).await)
106    }
107}
108
109/// Return a copy of `map` with `key` set to `value`, overwriting any existing entry for that key.
110#[mel_function(
111    generic T ()
112)]
113pub fn insert(mut map: Map, key: string, value: T) -> Map {
114    map.map.insert(key, value);
115    map
116}
117
118/// For every (`base`, `value`) pair received from the two streams, insert `key` → `value` into a copy of `base` and emit it on `map`.
119#[mel_treatment(
120    generic T ()
121    input base Stream<Map>
122    input value Stream<T>
123    output map Stream<Map>
124)]
125pub async fn insert(key: string) {
126    while let (Ok(base), Ok(value)) = (
127        base.recv_one().await.map(|val| {
128            GetData::<Arc<dyn Data>>::try_data(val)
129                .unwrap()
130                .downcast_arc::<Map>()
131                .unwrap()
132        }),
133        value.recv_one().await,
134    ) {
135        let mut new_map = Arc::unwrap_or_clone(base);
136        new_map.map.insert(key.clone(), value);
137        check!(map.send_one(Value::Data(Arc::new(new_map))).await)
138    }
139}