use dataflow_rs::engine::utils::set_nested_value;
use datavalue::OwnedDataValue;
use serde_json::Value;
/// Copies the top-level entries of `metadata` into the message context.
///
/// Each `(key, value)` pair of the JSON object is written through
/// `set_nested_value` at the path `metadata.<key>`, after converting the
/// value with `OwnedDataValue::from`. When `metadata` is not a JSON object
/// the message is left untouched.
///
/// NOTE(review): keys are interpolated verbatim into a dotted path, so a key
/// that itself contains `.` is presumably treated as a nested path by
/// `set_nested_value` — confirm against that helper.
pub fn merge_metadata(message: &mut dataflow_rs::Message, metadata: &Value) {
    // Anything other than an object has no entries to merge.
    let Some(entries) = metadata.as_object() else {
        return;
    };

    for (key, value) in entries {
        let path = format!("metadata.{key}");
        set_nested_value(&mut message.context, &path, OwnedDataValue::from(value));
    }
}
/// Writes a random rollout bucket into the message context.
///
/// A random `u32` is reduced modulo 100, giving a value in `0..=99`, and
/// stored as an `i64` at the path `data._rollout_bucket`. Any value already
/// present at that path is passed over to `set_nested_value` to replace.
pub fn inject_rollout_bucket(message: &mut dataflow_rs::Message) {
    let draw: u32 = rand::random();
    // u32 -> i64 is lossless, so the conversion cannot change the value.
    let bucket = i64::from(draw % 100);
    set_nested_value(
        &mut message.context,
        "data._rollout_bucket",
        OwnedDataValue::from_i64(bucket),
    );
}
/// Clears the rollout bucket from the message context.
///
/// The path `data._rollout_bucket` is overwritten with `OwnedDataValue::Null`
/// via `set_nested_value`.
///
/// NOTE(review): this writes a null rather than calling a dedicated removal
/// API, so the key may still exist afterwards with a null value — confirm
/// whether downstream consumers distinguish "absent" from "null".
pub fn remove_rollout_bucket(message: &mut dataflow_rs::Message) {
    let cleared = OwnedDataValue::Null;
    set_nested_value(&mut message.context, "data._rollout_bucket", cleared);
}
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Builds a message from a JSON value via `Message::from_value`.
    ///
    /// The tests below read the supplied value back through `msg.data()`.
    fn make_message(data: Value) -> dataflow_rs::Message {
        dataflow_rs::Message::from_value(&data)
    }

    /// Every top-level metadata entry must be readable from `msg.metadata()`
    /// with its original type after merging.
    #[test]
    fn test_merge_metadata() {
        let mut msg = make_message(json!({}));
        let metadata = json!({"source": "test", "version": 2});

        merge_metadata(&mut msg, &metadata);

        assert_eq!(
            msg.metadata().get("source").and_then(|v| v.as_str()),
            Some("test")
        );
        assert_eq!(
            msg.metadata().get("version").and_then(|v| v.as_i64()),
            Some(2)
        );
    }

    /// The injected bucket must exist, be an integer, and fall within 0..100.
    #[test]
    fn test_inject_rollout_bucket_in_range() {
        let mut msg = make_message(json!({}));

        inject_rollout_bucket(&mut msg);

        let bucket = msg
            .data()
            .get("_rollout_bucket")
            .and_then(|v| v.as_i64())
            // State the violated expectation so a failure is self-explanatory.
            .expect("`_rollout_bucket` should be present in data as an integer after injection");
        assert!(
            (0..100).contains(&bucket),
            "bucket should be 0–99, got {bucket}"
        );
    }

    /// After removal the bucket must be either missing or null; both outcomes
    /// are accepted because the implementation overwrites the value with null.
    #[test]
    fn test_remove_rollout_bucket() {
        let mut msg = make_message(json!({"_rollout_bucket": 42}));

        remove_rollout_bucket(&mut msg);

        let is_absent = match msg.data().get("_rollout_bucket") {
            None => true,
            Some(v) => v.is_null(),
        };
        assert!(is_absent, "bucket should be removed or null");
    }
}