1use crate::{
2 domain::entities::Event,
3 error::{AllSourceError, Result},
4 infrastructure::observability::metrics::MetricsRegistry,
5};
6use chrono::{DateTime, Duration, Utc};
7use dashmap::DashMap;
8use parking_lot::RwLock;
9use serde::{Deserialize, Serialize};
10use serde_json::Value as JsonValue;
11use std::{
12 collections::{HashMap, VecDeque},
13 sync::Arc,
14};
15use uuid::Uuid;
16
/// Strategy for grouping events into time windows.
///
/// Serialized in lowercase (e.g. `"tumbling"`).
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum WindowType {
    /// Fixed-size, non-overlapping windows of `size_seconds`.
    Tumbling,
    /// Windows that advance by `slide_seconds` (falls back to `size_seconds`).
    Sliding,
    /// Windows held open until `session_timeout_seconds` of inactivity.
    Session,
}
28
/// Parameters controlling how a `Window` operator buckets events in time.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WindowConfig {
    /// Windowing strategy (tumbling, sliding, or session).
    pub window_type: WindowType,

    /// Window length in seconds; also the eviction horizon for tumbling windows.
    pub size_seconds: i64,

    /// Slide interval for sliding windows; `None` falls back to `size_seconds`.
    pub slide_seconds: Option<i64>,

    /// Inactivity timeout for session windows; `None` falls back to 300 seconds.
    pub session_timeout_seconds: Option<i64>,
}
44
/// A single transformation step in a pipeline.
///
/// Serialized with a lowercase `"type"` tag, e.g. `{"type": "filter", ...}`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "lowercase")]
pub enum PipelineOperator {
    /// Drops the event unless `field` compares to `value` under `op`
    /// (one of `"eq"`, `"ne"`, `"gt"`, `"lt"`, `"contains"`).
    Filter {
        field: String,
        value: JsonValue,
        op: String,
    },

    /// Rewrites `field` in place using `transform`
    /// (`"uppercase"`, `"lowercase"`, `"trim"`, `"multiply:<n>"`, `"add:<n>"`).
    Map {
        field: String,
        transform: String,
    },

    /// Folds a running aggregate (`"count"`, `"sum"`, `"avg"`, `"min"`, `"max"`)
    /// over `field`, optionally partitioned by the string value of `group_by`.
    Reduce {
        field: String,
        function: String,
        group_by: Option<String>,
    },

    /// Buffers events into a time window and applies `aggregation` to the
    /// events currently inside it.
    Window {
        config: WindowConfig,
        aggregation: Box<PipelineOperator>,
    },

    /// Inserts placeholder values for `fields` — the lookup against `source`
    /// is not implemented yet (see `Pipeline::apply_enrich`).
    Enrich {
        source: String,
        fields: Vec<String>,
    },

    /// Tags the payload with a `_route` drawn from `branches`, keyed by the
    /// string value of `field`.
    Branch {
        field: String,
        branches: HashMap<String, String>,
    },
}
101
/// Declarative definition of a pipeline: which events it accepts and which
/// operators it runs, in order.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PipelineConfig {
    /// Stable identifier; also used as the key in `PipelineManager`.
    pub id: Uuid,

    /// Human-readable name used in logs and metric labels.
    pub name: String,

    /// Optional free-form description.
    pub description: Option<String>,

    /// Event types this pipeline accepts; an empty list accepts all types.
    pub source_event_types: Vec<String>,

    /// Operator chain applied in order to each accepted event.
    pub operators: Vec<PipelineOperator>,

    /// Disabled pipelines ignore every event (`process` returns `None`).
    pub enabled: bool,

    /// Name of the output destination for processed events.
    /// NOTE(review): not consumed anywhere in this file — presumably used by
    /// the caller to route results; confirm.
    pub output: String,
}
126
/// Processing counters for a single pipeline.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PipelineStats {
    /// Identifier of the pipeline these counters belong to.
    pub pipeline_id: Uuid,
    /// Events that entered the operator chain (includes filtered events).
    pub events_processed: u64,
    /// Events dropped by a filter-style operator returning no output.
    pub events_filtered: u64,
    /// Events on which an operator returned an error.
    pub events_failed: u64,
    /// Timestamp of the most recent event to complete (or be filtered by) the chain.
    pub last_processed: Option<DateTime<Utc>>,
}
136
/// Shared mutable state backing stateful operators (reduce aggregates and
/// time windows). Cheaply cloneable handles via the inner `Arc`s.
pub struct StatefulOperator {
    // Key/value store for reduce aggregates (keys like "reduce_{fn}_{group}").
    state: Arc<RwLock<HashMap<String, JsonValue>>>,

    // Per-window FIFO of (event timestamp, event), keyed by window id.
    windows: Arc<RwLock<HashMap<String, VecDeque<(DateTime<Utc>, Event)>>>>,
}
145
146impl StatefulOperator {
147 pub fn new() -> Self {
148 Self {
149 state: Arc::new(RwLock::new(HashMap::new())),
150 windows: Arc::new(RwLock::new(HashMap::new())),
151 }
152 }
153
154 pub fn set_state(&self, key: String, value: JsonValue) {
156 self.state.write().insert(key, value);
157 }
158
159 pub fn get_state(&self, key: &str) -> Option<JsonValue> {
161 self.state.read().get(key).cloned()
162 }
163
164 pub fn add_to_window(&self, window_key: &str, event: Event, timestamp: DateTime<Utc>) {
166 let mut windows = self.windows.write();
167 windows
168 .entry(window_key.to_string())
169 .or_default()
170 .push_back((timestamp, event));
171 }
172
173 pub fn get_window(&self, window_key: &str) -> Vec<Event> {
175 self.windows
176 .read()
177 .get(window_key)
178 .map(|w| w.iter().map(|(_, e)| e.clone()).collect())
179 .unwrap_or_default()
180 }
181
182 pub fn evict_window(&self, window_key: &str, cutoff: DateTime<Utc>) {
184 if let Some(window) = self.windows.write().get_mut(window_key) {
185 window.retain(|(ts, _)| *ts > cutoff);
186 }
187 }
188
189 pub fn clear(&self) {
191 self.state.write().clear();
192 self.windows.write().clear();
193 }
194}
195
impl Default for StatefulOperator {
    /// Equivalent to [`StatefulOperator::new`].
    fn default() -> Self {
        Self::new()
    }
}
201
/// A configured operator chain plus its mutable runtime state and counters.
pub struct Pipeline {
    // Immutable definition this pipeline was built from.
    config: PipelineConfig,
    // Reduce aggregates and window buffers shared by the operators.
    state: StatefulOperator,
    // Counters snapshot-able via `stats()`; written during `process`.
    stats: Arc<RwLock<PipelineStats>>,
}
208
209impl Pipeline {
210 pub fn new(config: PipelineConfig) -> Self {
211 let stats = PipelineStats {
212 pipeline_id: config.id,
213 events_processed: 0,
214 events_filtered: 0,
215 events_failed: 0,
216 last_processed: None,
217 };
218
219 Self {
220 config,
221 state: StatefulOperator::new(),
222 stats: Arc::new(RwLock::new(stats)),
223 }
224 }
225
226 pub fn process(&self, event: &Event) -> Result<Option<JsonValue>> {
228 if !self.config.source_event_types.is_empty()
230 && !self
231 .config
232 .source_event_types
233 .iter()
234 .any(|t| t == event.event_type_str())
235 {
236 return Ok(None);
237 }
238
239 if !self.config.enabled {
240 return Ok(None);
241 }
242
243 let mut current_value = event.payload.clone();
244 let mut filtered = false;
245
246 for operator in &self.config.operators {
248 match self.apply_operator(operator, ¤t_value, event) {
249 Ok(Some(result)) => {
250 current_value = result;
251 }
252 Ok(None) => {
253 filtered = true;
255 self.stats.write().events_filtered += 1;
256 break;
257 }
258 Err(e) => {
259 self.stats.write().events_failed += 1;
260 tracing::error!("Pipeline {} operator failed: {}", self.config.name, e);
261 return Err(e);
262 }
263 }
264 }
265
266 let mut stats = self.stats.write();
268 stats.events_processed += 1;
269 stats.last_processed = Some(Utc::now());
270
271 if filtered {
272 Ok(None)
273 } else {
274 Ok(Some(current_value))
275 }
276 }
277
278 fn apply_operator(
280 &self,
281 operator: &PipelineOperator,
282 value: &JsonValue,
283 event: &Event,
284 ) -> Result<Option<JsonValue>> {
285 match operator {
286 PipelineOperator::Filter {
287 field,
288 value: expected,
289 op,
290 } => self.apply_filter(field, expected, op, value),
291
292 PipelineOperator::Map { field, transform } => self.apply_map(field, transform, value),
293
294 PipelineOperator::Reduce {
295 field,
296 function,
297 group_by,
298 } => self.apply_reduce(field, function, group_by.as_deref(), value, event),
299
300 PipelineOperator::Window {
301 config,
302 aggregation,
303 } => self.apply_window(config, aggregation, event),
304
305 PipelineOperator::Enrich { source, fields } => self.apply_enrich(source, fields, value),
306
307 PipelineOperator::Branch { field, branches } => {
308 self.apply_branch(field, branches, value)
309 }
310 }
311 }
312
313 fn apply_filter(
315 &self,
316 field: &str,
317 expected: &JsonValue,
318 op: &str,
319 value: &JsonValue,
320 ) -> Result<Option<JsonValue>> {
321 let field_value = self.get_field(value, field);
322
323 let matches = match op {
324 "eq" => field_value == Some(expected),
325 "ne" => field_value != Some(expected),
326 "gt" => {
327 if let (Some(JsonValue::Number(a)), JsonValue::Number(b)) =
328 (field_value.as_ref(), expected)
329 {
330 a.as_f64().unwrap_or(0.0) > b.as_f64().unwrap_or(0.0)
331 } else {
332 false
333 }
334 }
335 "lt" => {
336 if let (Some(JsonValue::Number(a)), JsonValue::Number(b)) =
337 (field_value.as_ref(), expected)
338 {
339 a.as_f64().unwrap_or(0.0) < b.as_f64().unwrap_or(0.0)
340 } else {
341 false
342 }
343 }
344 "contains" => {
345 if let (Some(JsonValue::String(a)), JsonValue::String(b)) =
346 (field_value.as_ref(), expected)
347 {
348 a.contains(b)
349 } else {
350 false
351 }
352 }
353 _ => {
354 return Err(AllSourceError::ValidationError(format!(
355 "Unknown filter operator: {}",
356 op
357 )));
358 }
359 };
360
361 if matches {
362 Ok(Some(value.clone()))
363 } else {
364 Ok(None) }
366 }
367
368 fn apply_map(
370 &self,
371 field: &str,
372 transform: &str,
373 value: &JsonValue,
374 ) -> Result<Option<JsonValue>> {
375 let mut result = value.clone();
376
377 let field_value = self.get_field(value, field);
379
380 let transformed = match transform {
381 "uppercase" => field_value
382 .and_then(|v| v.as_str())
383 .map(|s| JsonValue::String(s.to_uppercase())),
384 "lowercase" => field_value
385 .and_then(|v| v.as_str())
386 .map(|s| JsonValue::String(s.to_lowercase())),
387 "trim" => field_value
388 .and_then(|v| v.as_str())
389 .map(|s| JsonValue::String(s.trim().to_string())),
390 _ => {
391 if let Some(stripped) = transform.strip_prefix("multiply:") {
393 if let Ok(multiplier) = stripped.parse::<f64>() {
394 field_value.and_then(|v| v.as_f64()).map(|n| {
395 JsonValue::Number(serde_json::Number::from_f64(n * multiplier).unwrap())
396 })
397 } else {
398 None
399 }
400 } else if let Some(stripped) = transform.strip_prefix("add:") {
401 if let Ok(addend) = stripped.parse::<f64>() {
402 field_value.and_then(|v| v.as_f64()).map(|n| {
403 JsonValue::Number(serde_json::Number::from_f64(n + addend).unwrap())
404 })
405 } else {
406 None
407 }
408 } else {
409 None
410 }
411 }
412 };
413
414 if let Some(new_value) = transformed {
415 self.set_field(&mut result, field, new_value);
416 }
417
418 Ok(Some(result))
419 }
420
421 fn apply_reduce(
423 &self,
424 field: &str,
425 function: &str,
426 group_by: Option<&str>,
427 value: &JsonValue,
428 event: &Event,
429 ) -> Result<Option<JsonValue>> {
430 let group_key = if let Some(group_field) = group_by {
432 self.get_field(value, group_field)
433 .and_then(|v| v.as_str())
434 .unwrap_or("default")
435 .to_string()
436 } else {
437 "default".to_string()
438 };
439
440 let state_key = format!("reduce_{}_{}", function, group_key);
441
442 let current = self.state.get_state(&state_key);
444
445 let field_value = self.get_field(value, field);
447
448 let new_value = match function {
449 "count" => {
450 let count = current.and_then(|v| v.as_u64()).unwrap_or(0) + 1;
451 JsonValue::Number(count.into())
452 }
453 "sum" => {
454 let current_sum = current.and_then(|v| v.as_f64()).unwrap_or(0.0);
455 let value_to_add = field_value.and_then(|v| v.as_f64()).unwrap_or(0.0);
456 JsonValue::Number(serde_json::Number::from_f64(current_sum + value_to_add).unwrap())
457 }
458 "avg" => {
459 let sum_key = format!("{}_sum", state_key);
461 let count_key = format!("{}_count", state_key);
462
463 let current_sum = self
464 .state
465 .get_state(&sum_key)
466 .and_then(|v| v.as_f64())
467 .unwrap_or(0.0);
468 let current_count = self
469 .state
470 .get_state(&count_key)
471 .and_then(|v| v.as_u64())
472 .unwrap_or(0);
473
474 let value_to_add = field_value.and_then(|v| v.as_f64()).unwrap_or(0.0);
475
476 let new_sum = current_sum + value_to_add;
477 let new_count = current_count + 1;
478
479 self.state.set_state(
480 sum_key,
481 JsonValue::Number(serde_json::Number::from_f64(new_sum).unwrap()),
482 );
483 self.state
484 .set_state(count_key, JsonValue::Number(new_count.into()));
485
486 let avg = new_sum / new_count as f64;
487 JsonValue::Number(serde_json::Number::from_f64(avg).unwrap())
488 }
489 "min" => {
490 let current_min = current.and_then(|v| v.as_f64());
491 let new_val = field_value.and_then(|v| v.as_f64());
492
493 match (current_min, new_val) {
494 (Some(curr), Some(new)) => {
495 JsonValue::Number(serde_json::Number::from_f64(curr.min(new)).unwrap())
496 }
497 (None, Some(new)) => {
498 JsonValue::Number(serde_json::Number::from_f64(new).unwrap())
499 }
500 (Some(curr), None) => {
501 JsonValue::Number(serde_json::Number::from_f64(curr).unwrap())
502 }
503 (None, None) => JsonValue::Null,
504 }
505 }
506 "max" => {
507 let current_max = current.and_then(|v| v.as_f64());
508 let new_val = field_value.and_then(|v| v.as_f64());
509
510 match (current_max, new_val) {
511 (Some(curr), Some(new)) => {
512 JsonValue::Number(serde_json::Number::from_f64(curr.max(new)).unwrap())
513 }
514 (None, Some(new)) => {
515 JsonValue::Number(serde_json::Number::from_f64(new).unwrap())
516 }
517 (Some(curr), None) => {
518 JsonValue::Number(serde_json::Number::from_f64(curr).unwrap())
519 }
520 (None, None) => JsonValue::Null,
521 }
522 }
523 _ => {
524 return Err(AllSourceError::ValidationError(format!(
525 "Unknown reduce function: {}",
526 function
527 )));
528 }
529 };
530
531 self.state.set_state(state_key.clone(), new_value.clone());
533
534 let result = serde_json::json!({
536 "group": group_key,
537 "function": function,
538 "value": new_value
539 });
540
541 Ok(Some(result))
542 }
543
544 fn apply_window(
546 &self,
547 config: &WindowConfig,
548 aggregation: &PipelineOperator,
549 event: &Event,
550 ) -> Result<Option<JsonValue>> {
551 let window_key = format!("window_{}", self.config.id);
552 let now = Utc::now();
553
554 self.state
556 .add_to_window(&window_key, event.clone(), event.timestamp);
557
558 let cutoff = match config.window_type {
560 WindowType::Tumbling => now - Duration::seconds(config.size_seconds),
561 WindowType::Sliding => {
562 let slide = config.slide_seconds.unwrap_or(config.size_seconds);
563 now - Duration::seconds(slide)
564 }
565 WindowType::Session => {
566 let timeout = config.session_timeout_seconds.unwrap_or(300);
567 now - Duration::seconds(timeout)
568 }
569 };
570
571 self.state.evict_window(&window_key, cutoff);
572
573 let window_events = self.state.get_window(&window_key);
575
576 let mut aggregate_value = JsonValue::Null;
578 for window_event in &window_events {
579 if let Ok(Some(result)) =
580 self.apply_operator(aggregation, &window_event.payload, window_event)
581 {
582 aggregate_value = result;
583 }
584 }
585
586 Ok(Some(serde_json::json!({
587 "window_type": config.window_type,
588 "window_size_seconds": config.size_seconds,
589 "events_in_window": window_events.len(),
590 "aggregation": aggregate_value
591 })))
592 }
593
594 fn apply_enrich(
596 &self,
597 _source: &str,
598 fields: &[String],
599 value: &JsonValue,
600 ) -> Result<Option<JsonValue>> {
601 let mut result = value.clone();
604
605 for field in fields {
606 let enriched_value = JsonValue::String(format!("enriched_{}", field));
607 self.set_field(&mut result, field, enriched_value);
608 }
609
610 Ok(Some(result))
611 }
612
613 fn apply_branch(
615 &self,
616 field: &str,
617 branches: &HashMap<String, String>,
618 value: &JsonValue,
619 ) -> Result<Option<JsonValue>> {
620 let field_value = self.get_field(value, field);
621
622 if let Some(JsonValue::String(val)) = field_value
623 && let Some(route) = branches.get(val)
624 {
625 let mut result = value.clone();
626 if let JsonValue::Object(ref mut obj) = result {
627 obj.insert("_route".to_string(), JsonValue::String(route.clone()));
628 }
629 return Ok(Some(result));
630 }
631
632 Ok(Some(value.clone()))
633 }
634
635 fn get_field<'a>(&self, value: &'a JsonValue, field: &str) -> Option<&'a JsonValue> {
637 let parts: Vec<&str> = field.split('.').collect();
638 let mut current = value;
639
640 for part in parts {
641 current = current.get(part)?;
642 }
643
644 Some(current)
645 }
646
647 fn set_field(&self, value: &mut JsonValue, field: &str, new_value: JsonValue) {
649 let parts: Vec<&str> = field.split('.').collect();
650
651 if parts.len() == 1 {
652 if let JsonValue::Object(obj) = value {
653 obj.insert(field.to_string(), new_value);
654 }
655 return;
656 }
657
658 let mut current = value;
660 for part in &parts[..parts.len() - 1] {
661 if let JsonValue::Object(obj) = current {
662 current = obj
663 .entry(part.to_string())
664 .or_insert(JsonValue::Object(Default::default()));
665 }
666 }
667
668 if let JsonValue::Object(obj) = current {
670 obj.insert(parts.last().unwrap().to_string(), new_value);
671 }
672 }
673
674 pub fn stats(&self) -> PipelineStats {
676 self.stats.read().clone()
677 }
678
679 pub fn config(&self) -> &PipelineConfig {
681 &self.config
682 }
683
684 pub fn reset(&self) {
686 self.state.clear();
687 let mut stats = self.stats.write();
688 stats.events_processed = 0;
689 stats.events_filtered = 0;
690 stats.events_failed = 0;
691 stats.last_processed = None;
692 }
693}
694
/// Concurrent registry of pipelines that fans each event out to all of them.
pub struct PipelineManager {
    // Pipelines keyed by their config id; DashMap allows lock-free reads.
    pipelines: Arc<DashMap<Uuid, Arc<Pipeline>>>,
    // Shared metrics sink for counts, errors, and processing duration.
    metrics: Arc<MetricsRegistry>,
}
701
702impl PipelineManager {
703 pub fn new() -> Self {
704 Self::with_metrics(MetricsRegistry::new())
705 }
706
707 pub fn with_metrics(metrics: Arc<MetricsRegistry>) -> Self {
708 Self {
709 pipelines: Arc::new(DashMap::new()),
710 metrics,
711 }
712 }
713
714 pub fn register(&self, config: PipelineConfig) -> Uuid {
716 let id = config.id;
717 let name = config.name.clone();
718 let pipeline = Arc::new(Pipeline::new(config));
719 self.pipelines.insert(id, pipeline);
720
721 let count = self.pipelines.len();
722 self.metrics.pipelines_registered_total.set(count as i64);
723
724 tracing::info!("📊 Registered pipeline: {} ({})", name, id);
725 id
726 }
727
728 pub fn get(&self, id: Uuid) -> Option<Arc<Pipeline>> {
730 self.pipelines.get(&id).map(|entry| entry.value().clone())
731 }
732
733 pub fn process_event(&self, event: &Event) -> Vec<(Uuid, JsonValue)> {
735 let timer = self.metrics.pipeline_duration_seconds.start_timer();
736
737 let mut results = Vec::new();
738
739 for entry in self.pipelines.iter() {
740 let id = entry.key();
741 let pipeline = entry.value();
742 let pipeline_name = &pipeline.config().name;
743 let pipeline_id = id.to_string();
744
745 match pipeline.process(event) {
746 Ok(Some(result)) => {
747 self.metrics
748 .pipeline_events_processed
749 .with_label_values(&[&pipeline_id, pipeline_name])
750 .inc();
751 results.push((*id, result));
752 }
753 Ok(None) => {
754 }
756 Err(e) => {
757 self.metrics
758 .pipeline_errors_total
759 .with_label_values(&[pipeline_name])
760 .inc();
761 tracing::error!(
762 "Pipeline '{}' ({}) failed to process event: {}",
763 pipeline_name,
764 id,
765 e
766 );
767 }
768 }
769 }
770
771 timer.observe_duration();
772 results
773 }
774
775 pub fn list(&self) -> Vec<PipelineConfig> {
777 self.pipelines
778 .iter()
779 .map(|entry| entry.value().config().clone())
780 .collect()
781 }
782
783 pub fn remove(&self, id: Uuid) -> bool {
785 let removed = self.pipelines.remove(&id).is_some();
786
787 if removed {
788 let count = self.pipelines.len();
789 self.metrics.pipelines_registered_total.set(count as i64);
790 }
791
792 removed
793 }
794
795 pub fn all_stats(&self) -> Vec<PipelineStats> {
797 self.pipelines
798 .iter()
799 .map(|entry| entry.value().stats())
800 .collect()
801 }
802}
803
impl Default for PipelineManager {
    /// Equivalent to [`PipelineManager::new`].
    fn default() -> Self {
        Self::new()
    }
}
809
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Builds a minimal "test"-typed event carrying `payload`.
    fn test_event(payload: JsonValue) -> Event {
        Event::from_strings(
            "test".to_string(),
            "entity1".to_string(),
            "default".to_string(),
            payload,
            None,
        )
        .unwrap()
    }

    /// Builds an enabled single-operator pipeline listening on "test" events.
    fn single_op_pipeline(name: &str, operator: PipelineOperator) -> Pipeline {
        Pipeline::new(PipelineConfig {
            id: Uuid::new_v4(),
            name: name.to_string(),
            description: None,
            source_event_types: vec!["test".to_string()],
            operators: vec![operator],
            enabled: true,
            output: "test_output".to_string(),
        })
    }

    #[test]
    fn test_filter_operator() {
        let pipeline = single_op_pipeline(
            "test_filter",
            PipelineOperator::Filter {
                field: "status".to_string(),
                value: json!("active"),
                op: "eq".to_string(),
            },
        );

        // A matching event passes through the filter.
        let result = pipeline
            .process(&test_event(json!({"status": "active"})))
            .unwrap();
        assert!(result.is_some());
    }

    #[test]
    fn test_map_operator() {
        let pipeline = single_op_pipeline(
            "test_map",
            PipelineOperator::Map {
                field: "name".to_string(),
                transform: "uppercase".to_string(),
            },
        );

        let result = pipeline
            .process(&test_event(json!({"name": "hello"})))
            .unwrap()
            .unwrap();
        assert_eq!(result["name"], "HELLO");
    }

    #[test]
    fn test_reduce_count() {
        let pipeline = single_op_pipeline(
            "test_reduce",
            PipelineOperator::Reduce {
                field: "value".to_string(),
                function: "count".to_string(),
                group_by: None,
            },
        );

        // Five events accumulate in the reduce state...
        for i in 0..5 {
            pipeline.process(&test_event(json!({"value": i}))).unwrap();
        }

        // ...so the sixth reports a count of 6.
        let result = pipeline
            .process(&test_event(json!({"value": 5})))
            .unwrap()
            .unwrap();

        assert_eq!(result["value"], 6);
    }
}