use std::collections::{HashMap, HashSet};
use std::fmt;
use std::sync::Arc;
use std::time::Duration;
use serde_json::Value;
/// Selects how the outputs recorded by a `FanInTracker` are combined into a
/// single `Value` when `FanInTracker::merge` is called.
///
/// `Collect` is the default variant.
#[derive(Clone, Default)]
pub enum MergeStrategy {
/// Produce a JSON array holding every recorded output, ordered by the
/// first time each source reported.
#[default]
Collect,
/// Shallow-merge all recorded JSON objects into one object. Outputs that
/// are not objects are skipped, and when two sources share a key the
/// source that reported later wins.
MergeMap,
/// Produce the output of the source that reported first, or `Value::Null`
/// when nothing has been recorded.
First,
/// Hand the recorded outputs (in first-arrival order) to a user-supplied
/// function and use its return value. The function is stored in an `Arc`
/// so the strategy stays cheaply cloneable.
Custom(Arc<dyn Fn(Vec<Value>) -> Value + Send + Sync>),
}
impl fmt::Debug for MergeStrategy {
    /// Writes the bare variant name. The closure held by `Custom` cannot be
    /// inspected, so it is rendered as the fixed placeholder `Custom(<fn>)`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            Self::Collect => "Collect",
            Self::MergeMap => "MergeMap",
            Self::First => "First",
            Self::Custom(_) => "Custom(<fn>)",
        };
        f.write_str(label)
    }
}
/// Settings bundle pairing a merge strategy with an optional fan-in timeout.
///
/// The derived `Default` yields `MergeStrategy::Collect` and no timeout.
#[derive(Debug, Clone, Default)]
pub struct DeferredNodeConfig {
/// Strategy used to combine the outputs gathered from upstream sources.
pub merge_strategy: MergeStrategy,
/// Optional time limit for the fan-in; `None` means no limit is configured.
/// NOTE(review): nothing in this file reads this field — presumably the
/// caller enforces it while waiting for pending sources; confirm.
pub fan_in_timeout: Option<Duration>,
}
/// Tracks which of a fixed set of named sources have reported an output and
/// keeps those outputs so they can later be merged.
pub struct FanInTracker {
// Names of the sources that must all report before `is_ready` returns true.
expected: HashSet<String>,
// Most recent output recorded for each source name.
received: HashMap<String, Value>,
// Source names in the order they FIRST reported; re-recording a source does
// not move it. This vector defines the ordering used by `merge`.
insertion_order: Vec<String>,
}
impl FanInTracker {
pub fn new(expected_sources: Vec<&str>) -> Self {
Self {
expected: expected_sources.iter().map(|s| (*s).to_string()).collect(),
received: HashMap::new(),
insertion_order: Vec::new(),
}
}
pub fn is_ready(&self) -> bool {
self.expected.iter().all(|s| self.received.contains_key(s))
}
pub fn record(&mut self, source_node: &str, output: Value) {
let key = source_node.to_string();
if !self.received.contains_key(&key) {
self.insertion_order.push(key.clone());
}
self.received.insert(key, output);
}
pub fn merge(&self, strategy: &MergeStrategy) -> Value {
match strategy {
MergeStrategy::Collect => {
let outputs: Vec<Value> = self
.insertion_order
.iter()
.filter_map(|key| self.received.get(key).cloned())
.collect();
Value::Array(outputs)
}
MergeStrategy::MergeMap => {
let mut merged = serde_json::Map::new();
for key in &self.insertion_order {
if let Some(Value::Object(map)) = self.received.get(key) {
for (k, v) in map {
merged.insert(k.clone(), v.clone());
}
}
}
Value::Object(merged)
}
MergeStrategy::First => self
.insertion_order
.first()
.and_then(|key| self.received.get(key).cloned())
.unwrap_or(Value::Null),
MergeStrategy::Custom(f) => {
let outputs: Vec<Value> = self
.insertion_order
.iter()
.filter_map(|key| self.received.get(key).cloned())
.collect();
f(outputs)
}
}
}
pub fn received_count(&self) -> usize {
self.received.len()
}
pub fn expected_count(&self) -> usize {
self.expected.len()
}
pub fn pending_sources(&self) -> Vec<&str> {
self.expected
.iter()
.filter(|s| !self.received.contains_key(*s))
.map(|s| s.as_str())
.collect()
}
pub fn completed_sources(&self) -> Vec<&str> {
self.insertion_order.iter().map(|s| s.as_str()).collect()
}
}
impl fmt::Debug for FanInTracker {
    /// Prints the expected set, the source names received so far (in arrival
    /// order) and the current readiness flag. Recorded output values are
    /// deliberately left out of the representation.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let ready = self.is_ready();
        let mut repr = f.debug_struct("FanInTracker");
        repr.field("expected", &self.expected);
        repr.field("received_keys", &self.insertion_order);
        repr.field("is_ready", &ready);
        repr.finish()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Builds a tracker expecting `expected` and records `outputs` in order.
    fn tracker_with(expected: &[&str], outputs: Vec<(&str, Value)>) -> FanInTracker {
        let mut tracker = FanInTracker::new(expected.to_vec());
        for (source, output) in outputs {
            tracker.record(source, output);
        }
        tracker
    }

    #[test]
    fn test_tracker_new_empty_not_ready() {
        let fresh = tracker_with(&["a", "b", "c"], vec![]);

        assert!(!fresh.is_ready());
        assert_eq!(fresh.expected_count(), 3);
        assert_eq!(fresh.received_count(), 0);
    }

    #[test]
    fn test_tracker_ready_when_all_received() {
        let mut fan_in = FanInTracker::new(vec!["a", "b"]);

        // One of two sources is not enough.
        fan_in.record("a", json!(1));
        assert!(!fan_in.is_ready());

        fan_in.record("b", json!(2));
        assert!(fan_in.is_ready());
    }

    #[test]
    fn test_merge_collect() {
        let fan_in = tracker_with(
            &["x", "y", "z"],
            vec![
                ("x", json!("first")),
                ("y", json!("second")),
                ("z", json!("third")),
            ],
        );

        let merged = fan_in.merge(&MergeStrategy::Collect);
        assert_eq!(merged, json!(["first", "second", "third"]));
    }

    #[test]
    fn test_merge_map_combines_objects() {
        let fan_in = tracker_with(
            &["a", "b"],
            vec![
                ("a", json!({"key1": "val1", "shared": "from_a"})),
                ("b", json!({"key2": "val2", "shared": "from_b"})),
            ],
        );

        // The later source wins for the key both objects define.
        let merged = fan_in.merge(&MergeStrategy::MergeMap);
        assert_eq!(
            merged,
            json!({"key1": "val1", "key2": "val2", "shared": "from_b"})
        );
    }

    #[test]
    fn test_merge_map_skips_non_objects() {
        let fan_in = tracker_with(
            &["a", "b"],
            vec![("a", json!(42)), ("b", json!({"key": "value"}))],
        );

        let merged = fan_in.merge(&MergeStrategy::MergeMap);
        assert_eq!(merged, json!({"key": "value"}));
    }

    #[test]
    fn test_merge_first() {
        // Arrival order, not declaration order, decides which output is first.
        let fan_in = tracker_with(
            &["a", "b", "c"],
            vec![
                ("b", json!("first_to_arrive")),
                ("a", json!("second_to_arrive")),
                ("c", json!("third_to_arrive")),
            ],
        );

        let merged = fan_in.merge(&MergeStrategy::First);
        assert_eq!(merged, json!("first_to_arrive"));
    }

    #[test]
    fn test_merge_first_empty() {
        let fan_in = tracker_with(&["a"], vec![]);

        let merged = fan_in.merge(&MergeStrategy::First);
        assert_eq!(merged, Value::Null);
    }

    #[test]
    fn test_merge_custom() {
        let fan_in = tracker_with(&["a", "b"], vec![("a", json!(10)), ("b", json!(20))]);

        let summing = MergeStrategy::Custom(Arc::new(|outputs| {
            let total: i64 = outputs.iter().filter_map(|v| v.as_i64()).sum();
            json!(total)
        }));

        let merged = fan_in.merge(&summing);
        assert_eq!(merged, json!(30));
    }

    #[test]
    fn test_record_overwrites_previous() {
        let fan_in = tracker_with(
            &["a"],
            vec![("a", json!("first")), ("a", json!("second"))],
        );

        assert!(fan_in.is_ready());
        assert_eq!(fan_in.received_count(), 1);

        let merged = fan_in.merge(&MergeStrategy::First);
        assert_eq!(merged, json!("second"));
    }

    #[test]
    fn test_pending_and_completed_sources() {
        let fan_in = tracker_with(&["a", "b", "c"], vec![("b", json!(1))]);

        // Pending order is unspecified, so normalise it before comparing.
        let mut still_waiting = fan_in.pending_sources();
        still_waiting.sort_unstable();

        assert_eq!(still_waiting, vec!["a", "c"]);
        assert_eq!(fan_in.completed_sources(), vec!["b"]);
    }

    #[test]
    fn test_default_config() {
        let DeferredNodeConfig {
            merge_strategy,
            fan_in_timeout,
        } = DeferredNodeConfig::default();

        assert!(matches!(merge_strategy, MergeStrategy::Collect));
        assert!(fan_in_timeout.is_none());
    }

    #[test]
    fn test_merge_strategy_debug() {
        let cases = [
            (MergeStrategy::Collect, "Collect"),
            (MergeStrategy::MergeMap, "MergeMap"),
            (MergeStrategy::First, "First"),
            (MergeStrategy::Custom(Arc::new(Value::Array)), "Custom(<fn>)"),
        ];

        for (strategy, rendered) in cases.iter() {
            assert_eq!(format!("{:?}", strategy), *rendered);
        }
    }
}