use crate::{
domain::entities::Event,
error::{AllSourceError, Result},
infrastructure::observability::metrics::MetricsRegistry,
};
use chrono::{DateTime, Duration, Utc};
use dashmap::DashMap;
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use serde_json::Value as JsonValue;
use std::{
collections::{HashMap, VecDeque},
sync::Arc,
};
use uuid::Uuid;
/// Strategy used to bound event membership in windowed aggregations.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum WindowType {
/// Events older than `size_seconds` are evicted (see `apply_window`).
Tumbling,
/// Events older than `slide_seconds` (defaulting to `size_seconds`) are evicted.
Sliding,
/// Events older than `session_timeout_seconds` (default 300) are evicted.
Session,
}
/// Configuration for a `PipelineOperator::Window` stage.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WindowConfig {
// Which eviction strategy applies.
pub window_type: WindowType,
// Window length in seconds (used by all window types for reporting).
pub size_seconds: i64,
// Slide interval for `Sliding` windows; falls back to `size_seconds` when unset.
pub slide_seconds: Option<i64>,
// Inactivity timeout for `Session` windows; falls back to 300s when unset.
pub session_timeout_seconds: Option<i64>,
}
/// A single stage in a pipeline's operator chain.
///
/// Serialized with an external `type` tag, e.g. `{"type": "filter", ...}`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "lowercase")]
pub enum PipelineOperator {
/// Drops the event unless `field` compares to `value` under `op`
/// (`eq`, `ne`, `gt`, `lt`, `contains`).
Filter {
field: String,
value: JsonValue,
op: String,
},
/// Rewrites `field` in place using a named transform
/// (`uppercase`, `lowercase`, `trim`, `multiply:<n>`, `add:<n>`).
Map {
field: String,
transform: String,
},
/// Stateful aggregation (`count`, `sum`, `avg`, `min`, `max`) over `field`,
/// optionally partitioned by the value of `group_by`.
Reduce {
field: String,
function: String,
group_by: Option<String>,
},
/// Buffers events per `config` and applies `aggregation` to the buffered set.
Window {
config: WindowConfig,
aggregation: Box<PipelineOperator>,
},
/// Adds placeholder values for `fields`; `source` is currently unused
/// (see `apply_enrich`).
Enrich {
source: String,
fields: Vec<String>,
},
/// Annotates the payload with a `_route` chosen from `branches` keyed by
/// the string value of `field`.
Branch {
field: String,
branches: HashMap<String, String>,
},
}
/// Static definition of one pipeline: which events it consumes and the
/// ordered operator chain it applies.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PipelineConfig {
pub id: Uuid,
pub name: String,
pub description: Option<String>,
// Event types this pipeline subscribes to; empty means "all types".
pub source_event_types: Vec<String>,
// Applied in order; the first operator returning `None` stops the chain.
pub operators: Vec<PipelineOperator>,
// Disabled pipelines return `None` without touching the payload.
pub enabled: bool,
// Logical output destination name; not interpreted in this module.
pub output: String,
}
/// Runtime counters for a single pipeline.
///
/// NOTE(review): `events_processed` is incremented even for events that an
/// operator filtered out (see `Pipeline::process`), so it is a "seen by the
/// operator chain" counter rather than an output counter.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PipelineStats {
pub pipeline_id: Uuid,
pub events_processed: u64,
pub events_filtered: u64,
pub events_failed: u64,
pub last_processed: Option<DateTime<Utc>>,
}
/// Shared mutable state backing stateful operators (reduce accumulators and
/// window buffers). Thread-safe via `parking_lot::RwLock`.
pub struct StatefulOperator {
// Arbitrary key/value accumulator state (e.g. "reduce_sum_default" -> 42).
state: Arc<RwLock<HashMap<String, JsonValue>>>,
// Per-window buffers of (event timestamp, event), appended in arrival order.
windows: Arc<RwLock<HashMap<String, VecDeque<(DateTime<Utc>, Event)>>>>,
}
impl StatefulOperator {
    /// Creates an operator with empty key/value state and no window buffers.
    pub fn new() -> Self {
        Self {
            state: Arc::new(RwLock::new(HashMap::new())),
            windows: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    /// Stores `value` under `key`, replacing any previous entry.
    pub fn set_state(&self, key: String, value: JsonValue) {
        let mut state = self.state.write();
        state.insert(key, value);
    }

    /// Returns a clone of the value stored under `key`, if present.
    pub fn get_state(&self, key: &str) -> Option<JsonValue> {
        let state = self.state.read();
        state.get(key).cloned()
    }

    /// Appends `(timestamp, event)` to the buffer for `window_key`,
    /// creating the buffer on first use.
    pub fn add_to_window(&self, window_key: &str, event: Event, timestamp: DateTime<Utc>) {
        self.windows
            .write()
            .entry(window_key.to_string())
            .or_default()
            .push_back((timestamp, event));
    }

    /// Returns clones of every event currently buffered for `window_key`
    /// (empty when the window does not exist).
    pub fn get_window(&self, window_key: &str) -> Vec<Event> {
        match self.windows.read().get(window_key) {
            Some(buffer) => buffer.iter().map(|(_, event)| event.clone()).collect(),
            None => Vec::new(),
        }
    }

    /// Drops buffered entries whose timestamp is at or before `cutoff`.
    pub fn evict_window(&self, window_key: &str, cutoff: DateTime<Utc>) {
        let mut windows = self.windows.write();
        if let Some(buffer) = windows.get_mut(window_key) {
            buffer.retain(|(ts, _)| *ts > cutoff);
        }
    }

    /// Discards all accumulator state and all window buffers.
    pub fn clear(&self) {
        self.state.write().clear();
        self.windows.write().clear();
    }
}
// `Default` simply delegates to `new` so the type works with
// `..Default::default()` and derive chains.
impl Default for StatefulOperator {
fn default() -> Self {
Self::new()
}
}
/// A configured pipeline plus its runtime state and counters.
pub struct Pipeline {
// Immutable definition (operators, subscriptions, enabled flag).
config: PipelineConfig,
// Backing state for Reduce and Window operators.
state: StatefulOperator,
// Shared counters; `Arc` so stats snapshots stay cheap to hand out.
stats: Arc<RwLock<PipelineStats>>,
}
impl Pipeline {
/// Builds a pipeline from its configuration with zeroed counters and
/// fresh operator state.
pub fn new(config: PipelineConfig) -> Self {
    let initial_stats = PipelineStats {
        pipeline_id: config.id,
        events_processed: 0,
        events_filtered: 0,
        events_failed: 0,
        last_processed: None,
    };
    Self {
        state: StatefulOperator::new(),
        stats: Arc::new(RwLock::new(initial_stats)),
        config,
    }
}
/// Runs `event` through this pipeline's operator chain.
///
/// Returns `Ok(None)` when the pipeline is disabled, the event type is not
/// subscribed, or an operator filtered the event out; otherwise returns the
/// transformed payload. Operator errors are counted in stats and propagated.
pub fn process(&self, event: &Event) -> Result<Option<JsonValue>> {
    // An empty subscription list means "accept all event types".
    if !self.config.source_event_types.is_empty()
        && !self
            .config
            .source_event_types
            .iter()
            .any(|t| t == event.event_type_str())
    {
        return Ok(None);
    }
    if !self.config.enabled {
        return Ok(None);
    }
    let mut current_value = event.payload.clone();
    let mut filtered = false;
    for operator in &self.config.operators {
        // BUGFIX: was `¤t_value` (HTML-entity corruption of `&current_value`),
        // which does not compile.
        match self.apply_operator(operator, &current_value, event) {
            Ok(Some(result)) => {
                current_value = result;
            }
            Ok(None) => {
                // Operator dropped the event: stop the chain here.
                filtered = true;
                self.stats.write().events_filtered += 1;
                break;
            }
            Err(e) => {
                self.stats.write().events_failed += 1;
                tracing::error!("Pipeline {} operator failed: {}", self.config.name, e);
                return Err(e);
            }
        }
    }
    // Filtered events still count toward `events_processed` (original
    // accounting preserved): it tracks events that entered the chain.
    let mut stats = self.stats.write();
    stats.events_processed += 1;
    stats.last_processed = Some(Utc::now());
    if filtered {
        Ok(None)
    } else {
        Ok(Some(current_value))
    }
}
/// Dispatches one operator to its concrete implementation.
///
/// `value` is the payload as transformed by earlier operators in the chain;
/// `event` is the original event (needed by stateful/window operators).
fn apply_operator(
&self,
operator: &PipelineOperator,
value: &JsonValue,
event: &Event,
) -> Result<Option<JsonValue>> {
match operator {
PipelineOperator::Filter {
field,
value: expected,
op,
} => self.apply_filter(field, expected, op, value),
PipelineOperator::Map { field, transform } => self.apply_map(field, transform, value),
PipelineOperator::Reduce {
field,
function,
group_by,
} => self.apply_reduce(field, function, group_by.as_deref(), value, event),
PipelineOperator::Window {
config,
aggregation,
} => self.apply_window(config, aggregation, event),
PipelineOperator::Enrich { source, fields } => self.apply_enrich(source, fields, value),
PipelineOperator::Branch { field, branches } => {
self.apply_branch(field, branches, value)
}
}
}
/// Evaluates a comparison on `field` of the payload.
///
/// Supported operators: `eq`, `ne`, `gt`, `lt` (numeric only) and
/// `contains` (string only). Returns the payload unchanged when the
/// predicate holds, `None` when it does not, and an error for an unknown
/// operator name. Missing fields fail every predicate except `ne`.
fn apply_filter(
    &self,
    field: &str,
    expected: &JsonValue,
    op: &str,
    value: &JsonValue,
) -> Result<Option<JsonValue>> {
    let actual = self.get_field(value, field);

    // Numeric comparisons require both sides to be JSON numbers;
    // anything else fails the predicate (matching the original behavior).
    let numeric = |pred: fn(f64, f64) -> bool| match (actual, expected) {
        (Some(JsonValue::Number(a)), JsonValue::Number(b)) => {
            pred(a.as_f64().unwrap_or(0.0), b.as_f64().unwrap_or(0.0))
        }
        _ => false,
    };

    let keep = match op {
        "eq" => actual == Some(expected),
        "ne" => actual != Some(expected),
        "gt" => numeric(|a, b| a > b),
        "lt" => numeric(|a, b| a < b),
        "contains" => matches!(
            (actual, expected),
            (Some(JsonValue::String(a)), JsonValue::String(b)) if a.contains(b)
        ),
        _ => {
            return Err(AllSourceError::ValidationError(format!(
                "Unknown filter operator: {op}"
            )));
        }
    };

    Ok(keep.then(|| value.clone()))
}
/// Applies a named transform to `field` and returns the (possibly)
/// modified payload.
///
/// Built-in transforms: `uppercase`, `lowercase`, `trim`,
/// `multiply:<f64>`, `add:<f64>`. Unknown transforms, missing fields, and
/// values of the wrong type leave the payload untouched (this operator
/// never filters).
fn apply_map(
    &self,
    field: &str,
    transform: &str,
    value: &JsonValue,
) -> Result<Option<JsonValue>> {
    let mut result = value.clone();
    let field_value = self.get_field(value, field);
    let transformed = match transform {
        "uppercase" => field_value
            .and_then(|v| v.as_str())
            .map(|s| JsonValue::String(s.to_uppercase())),
        "lowercase" => field_value
            .and_then(|v| v.as_str())
            .map(|s| JsonValue::String(s.to_lowercase())),
        "trim" => field_value
            .and_then(|v| v.as_str())
            .map(|s| JsonValue::String(s.trim().to_string())),
        _ => {
            if let Some(stripped) = transform.strip_prefix("multiply:") {
                stripped.parse::<f64>().ok().and_then(|multiplier| {
                    field_value
                        .and_then(serde_json::Value::as_f64)
                        // BUGFIX: `Number::from_f64` returns None for NaN/±inf
                        // (e.g. overflow); the previous `.unwrap()` panicked.
                        // Now a non-finite result leaves the field unchanged.
                        .and_then(|n| serde_json::Number::from_f64(n * multiplier))
                        .map(JsonValue::Number)
                })
            } else if let Some(stripped) = transform.strip_prefix("add:") {
                stripped.parse::<f64>().ok().and_then(|addend| {
                    field_value
                        .and_then(serde_json::Value::as_f64)
                        // Same non-finite guard as `multiply:` above.
                        .and_then(|n| serde_json::Number::from_f64(n + addend))
                        .map(JsonValue::Number)
                })
            } else {
                // Unknown transform name: deliberate no-op (original behavior).
                None
            }
        }
    };
    if let Some(new_value) = transformed {
        self.set_field(&mut result, field, new_value);
    }
    Ok(Some(result))
}
fn apply_reduce(
&self,
field: &str,
function: &str,
group_by: Option<&str>,
value: &JsonValue,
event: &Event,
) -> Result<Option<JsonValue>> {
let group_key = if let Some(group_field) = group_by {
self.get_field(value, group_field)
.and_then(|v| v.as_str())
.unwrap_or("default")
.to_string()
} else {
"default".to_string()
};
let state_key = format!("reduce_{function}_{group_key}");
let current = self.state.get_state(&state_key);
let field_value = self.get_field(value, field);
let new_value = match function {
"count" => {
let count = current.and_then(|v| v.as_u64()).unwrap_or(0) + 1;
JsonValue::Number(count.into())
}
"sum" => {
let current_sum = current.and_then(|v| v.as_f64()).unwrap_or(0.0);
let value_to_add = field_value
.and_then(serde_json::Value::as_f64)
.unwrap_or(0.0);
JsonValue::Number(serde_json::Number::from_f64(current_sum + value_to_add).unwrap())
}
"avg" => {
let sum_key = format!("{state_key}_sum");
let count_key = format!("{state_key}_count");
let current_sum = self
.state
.get_state(&sum_key)
.and_then(|v| v.as_f64())
.unwrap_or(0.0);
let current_count = self
.state
.get_state(&count_key)
.and_then(|v| v.as_u64())
.unwrap_or(0);
let value_to_add = field_value
.and_then(serde_json::Value::as_f64)
.unwrap_or(0.0);
let new_sum = current_sum + value_to_add;
let new_count = current_count + 1;
self.state.set_state(
sum_key,
JsonValue::Number(serde_json::Number::from_f64(new_sum).unwrap()),
);
self.state
.set_state(count_key, JsonValue::Number(new_count.into()));
let avg = new_sum / new_count as f64;
JsonValue::Number(serde_json::Number::from_f64(avg).unwrap())
}
"min" => {
let current_min = current.and_then(|v| v.as_f64());
let new_val = field_value.and_then(serde_json::Value::as_f64);
match (current_min, new_val) {
(Some(curr), Some(new)) => {
JsonValue::Number(serde_json::Number::from_f64(curr.min(new)).unwrap())
}
(None, Some(new)) => {
JsonValue::Number(serde_json::Number::from_f64(new).unwrap())
}
(Some(curr), None) => {
JsonValue::Number(serde_json::Number::from_f64(curr).unwrap())
}
(None, None) => JsonValue::Null,
}
}
"max" => {
let current_max = current.and_then(|v| v.as_f64());
let new_val = field_value.and_then(serde_json::Value::as_f64);
match (current_max, new_val) {
(Some(curr), Some(new)) => {
JsonValue::Number(serde_json::Number::from_f64(curr.max(new)).unwrap())
}
(None, Some(new)) => {
JsonValue::Number(serde_json::Number::from_f64(new).unwrap())
}
(Some(curr), None) => {
JsonValue::Number(serde_json::Number::from_f64(curr).unwrap())
}
(None, None) => JsonValue::Null,
}
}
_ => {
return Err(AllSourceError::ValidationError(format!(
"Unknown reduce function: {function}"
)));
}
};
self.state.set_state(state_key.clone(), new_value.clone());
let result = serde_json::json!({
"group": group_key,
"function": function,
"value": new_value
});
Ok(Some(result))
}
/// Buffers the event into this pipeline's (single, shared) window, evicts
/// entries older than the window bound, applies `aggregation` to each
/// buffered event, and reports a window summary object.
fn apply_window(
&self,
config: &WindowConfig,
aggregation: &PipelineOperator,
event: &Event,
) -> Result<Option<JsonValue>> {
// One buffer per pipeline, so nested/multiple Window operators in the
// same pipeline would share it — NOTE(review): confirm that is intended.
let window_key = format!("window_{}", self.config.id);
let now = Utc::now();
// Events are bucketed by their own timestamp, but eviction below is
// measured against wall-clock `now` (late events may be evicted at once).
self.state
.add_to_window(&window_key, event.clone(), event.timestamp);
let cutoff = match config.window_type {
WindowType::Tumbling => now - Duration::seconds(config.size_seconds),
WindowType::Sliding => {
// NOTE(review): slide defaults to the window size, making an
// unconfigured sliding window behave like a tumbling one — confirm.
let slide = config.slide_seconds.unwrap_or(config.size_seconds);
now - Duration::seconds(slide)
}
WindowType::Session => {
// 300s default inactivity timeout.
let timeout = config.session_timeout_seconds.unwrap_or(300);
now - Duration::seconds(timeout)
}
};
self.state.evict_window(&window_key, cutoff);
let window_events = self.state.get_window(&window_key);
// The aggregation runs once per buffered event; for stateful
// aggregations (Reduce) the last result reflects the full window, but a
// stateless aggregation would only report the final event's result.
let mut aggregate_value = JsonValue::Null;
for window_event in &window_events {
if let Ok(Some(result)) =
self.apply_operator(aggregation, &window_event.payload, window_event)
{
aggregate_value = result;
}
}
Ok(Some(serde_json::json!({
"window_type": config.window_type,
"window_size_seconds": config.size_seconds,
"events_in_window": window_events.len(),
"aggregation": aggregate_value
})))
}
/// Placeholder enrichment: stamps each requested field with a synthetic
/// "enriched_<field>" marker string. The `_source` lookup is not wired up
/// yet, so no external data is consulted.
fn apply_enrich(
    &self,
    _source: &str,
    fields: &[String],
    value: &JsonValue,
) -> Result<Option<JsonValue>> {
    let mut enriched = value.clone();
    for field in fields.iter() {
        self.set_field(
            &mut enriched,
            field,
            JsonValue::String(format!("enriched_{field}")),
        );
    }
    Ok(Some(enriched))
}
/// Routes the payload by writing a `_route` key chosen from `branches`
/// based on the string value of `field`. Payloads whose field is missing,
/// non-string, or has no matching branch pass through unchanged.
fn apply_branch(
    &self,
    field: &str,
    branches: &HashMap<String, String>,
    value: &JsonValue,
) -> Result<Option<JsonValue>> {
    let route = match self.get_field(value, field) {
        Some(JsonValue::String(val)) => branches.get(val),
        _ => None,
    };
    match route {
        Some(route) => {
            let mut annotated = value.clone();
            // `_route` can only be attached to object payloads.
            if let JsonValue::Object(obj) = &mut annotated {
                obj.insert("_route".to_string(), JsonValue::String(route.clone()));
            }
            Ok(Some(annotated))
        }
        None => Ok(Some(value.clone())),
    }
}
/// Resolves a dot-separated path (e.g. "user.address.city") inside a JSON
/// value, returning `None` as soon as any segment is missing.
fn get_field<'a>(&self, value: &'a JsonValue, field: &str) -> Option<&'a JsonValue> {
    field
        .split('.')
        .try_fold(value, |current, segment| current.get(segment))
}
/// Writes `new_value` at a dot-separated path, creating intermediate
/// objects as needed. When an existing segment holds a non-object value
/// the traversal silently stops descending (mirrors the original
/// behavior); writes into a non-object root are dropped.
fn set_field(&self, value: &mut JsonValue, field: &str, new_value: JsonValue) {
    let parts: Vec<&str> = field.split('.').collect();
    // Fast path: a single segment writes directly into the root object.
    if let [only] = parts.as_slice() {
        if let JsonValue::Object(obj) = value {
            obj.insert((*only).to_string(), new_value);
        }
        return;
    }
    // `split('.')` always yields at least one part, so this cannot fail.
    let (last, intermediate) = parts.split_last().expect("non-empty split");
    let mut cursor = value;
    for segment in intermediate {
        if let JsonValue::Object(obj) = cursor {
            cursor = obj
                .entry((*segment).to_string())
                .or_insert_with(|| JsonValue::Object(Default::default()));
        }
    }
    if let JsonValue::Object(obj) = cursor {
        obj.insert((*last).to_string(), new_value);
    }
}
/// Snapshot of the pipeline's runtime counters.
pub fn stats(&self) -> PipelineStats {
self.stats.read().clone()
}
/// Borrow of the pipeline's static configuration.
pub fn config(&self) -> &PipelineConfig {
&self.config
}
/// Clears all operator state (reduce accumulators, window buffers) and
/// zeroes the counters.
pub fn reset(&self) {
self.state.clear();
let mut stats = self.stats.write();
stats.events_processed = 0;
stats.events_filtered = 0;
stats.events_failed = 0;
stats.last_processed = None;
}
}
/// Registry that fans incoming events out to all registered pipelines.
pub struct PipelineManager {
// Concurrent id -> pipeline map; iterated on every processed event.
pipelines: Arc<DashMap<Uuid, Arc<Pipeline>>>,
// Counters/histograms for registrations, throughput, errors, latency.
metrics: Arc<MetricsRegistry>,
}
impl PipelineManager {
/// Creates a manager with a fresh metrics registry.
// NOTE(review): `with_metrics` takes `Arc<MetricsRegistry>`, so
// `MetricsRegistry::new()` presumably returns an `Arc` — confirm.
pub fn new() -> Self {
Self::with_metrics(MetricsRegistry::new())
}
/// Creates an empty manager that reports into the given metrics registry.
pub fn with_metrics(metrics: Arc<MetricsRegistry>) -> Self {
Self {
pipelines: Arc::new(DashMap::new()),
metrics,
}
}
/// Registers a pipeline under its configured id (replacing any existing
/// pipeline with the same id), updates the registration gauge, and
/// returns the id.
pub fn register(&self, config: PipelineConfig) -> Uuid {
    let id = config.id;
    let name = config.name.clone();
    self.pipelines.insert(id, Arc::new(Pipeline::new(config)));
    self.metrics
        .pipelines_registered_total
        .set(self.pipelines.len() as i64);
    tracing::info!("📊 Registered pipeline: {} ({})", name, id);
    id
}
/// Looks up a pipeline by id, returning a cheap `Arc` clone when present.
pub fn get(&self, id: Uuid) -> Option<Arc<Pipeline>> {
    self.pipelines
        .get(&id)
        .map(|entry| Arc::clone(entry.value()))
}
/// Feeds `event` to every registered pipeline and collects the outputs of
/// those that produced one, as `(pipeline_id, result)` pairs.
///
/// Per-pipeline failures are logged and counted but never stop the
/// fan-out; total wall time is recorded in the duration histogram.
pub fn process_event(&self, event: &Event) -> Vec<(Uuid, JsonValue)> {
    let timer = self.metrics.pipeline_duration_seconds.start_timer();
    let mut outputs = Vec::new();
    for entry in self.pipelines.iter() {
        let (id, pipeline) = (entry.key(), entry.value());
        let name = &pipeline.config().name;
        let id_label = id.to_string();
        match pipeline.process(event) {
            Ok(Some(result)) => {
                self.metrics
                    .pipeline_events_processed
                    .with_label_values(&[&id_label, name])
                    .inc();
                outputs.push((*id, result));
            }
            // Filtered out / disabled / not subscribed: nothing to record.
            Ok(None) => {}
            Err(e) => {
                self.metrics
                    .pipeline_errors_total
                    .with_label_values(&[name])
                    .inc();
                tracing::error!(
                    "Pipeline '{}' ({}) failed to process event: {}",
                    name,
                    id,
                    e
                );
            }
        }
    }
    timer.observe_duration();
    outputs
}
/// Snapshot of every registered pipeline's configuration.
pub fn list(&self) -> Vec<PipelineConfig> {
self.pipelines
.iter()
.map(|entry| entry.value().config().clone())
.collect()
}
/// Removes a pipeline by id; returns whether anything was removed.
/// Keeps the registration gauge in sync on success.
pub fn remove(&self, id: Uuid) -> bool {
let removed = self.pipelines.remove(&id).is_some();
if removed {
let count = self.pipelines.len();
self.metrics.pipelines_registered_total.set(count as i64);
}
removed
}
/// Snapshot of every registered pipeline's runtime counters.
pub fn all_stats(&self) -> Vec<PipelineStats> {
self.pipelines
.iter()
.map(|entry| entry.value().stats())
.collect()
}
}
// `Default` delegates to `new` (fresh metrics registry, no pipelines).
impl Default for PipelineManager {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
// An "eq" filter on a matching payload should pass the event through.
#[test]
fn test_filter_operator() {
let config = PipelineConfig {
id: Uuid::new_v4(),
name: "test_filter".to_string(),
description: None,
source_event_types: vec!["test".to_string()],
operators: vec![PipelineOperator::Filter {
field: "status".to_string(),
value: json!("active"),
op: "eq".to_string(),
}],
enabled: true,
output: "test_output".to_string(),
};
let pipeline = Pipeline::new(config);
let event = Event::from_strings(
"test".to_string(),
"entity1".to_string(),
"default".to_string(),
json!({"status": "active"}),
None,
)
.unwrap();
let result = pipeline.process(&event).unwrap();
assert!(result.is_some());
}
// An "uppercase" map should rewrite the field in the returned payload.
#[test]
fn test_map_operator() {
let config = PipelineConfig {
id: Uuid::new_v4(),
name: "test_map".to_string(),
description: None,
source_event_types: vec!["test".to_string()],
operators: vec![PipelineOperator::Map {
field: "name".to_string(),
transform: "uppercase".to_string(),
}],
enabled: true,
output: "test_output".to_string(),
};
let pipeline = Pipeline::new(config);
let event = Event::from_strings(
"test".to_string(),
"entity1".to_string(),
"default".to_string(),
json!({"name": "hello"}),
None,
)
.unwrap();
let result = pipeline.process(&event).unwrap().unwrap();
assert_eq!(result["name"], "HELLO");
}
// A "count" reduce accumulates across events: 5 warm-up events plus the
// final one yields a count of 6.
#[test]
fn test_reduce_count() {
let config = PipelineConfig {
id: Uuid::new_v4(),
name: "test_reduce".to_string(),
description: None,
source_event_types: vec!["test".to_string()],
operators: vec![PipelineOperator::Reduce {
field: "value".to_string(),
function: "count".to_string(),
group_by: None,
}],
enabled: true,
output: "test_output".to_string(),
};
let pipeline = Pipeline::new(config);
for i in 0..5 {
let event = Event::from_strings(
"test".to_string(),
"entity1".to_string(),
"default".to_string(),
json!({"value": i}),
None,
)
.unwrap();
pipeline.process(&event).unwrap();
}
let result = pipeline
.process(
&Event::from_strings(
"test".to_string(),
"entity1".to_string(),
"default".to_string(),
json!({"value": 5}),
None,
)
.unwrap(),
)
.unwrap()
.unwrap();
assert_eq!(result["value"], 6);
}
}