1use crate::domain::entities::Event;
2use crate::error::{AllSourceError, Result};
3use crate::infrastructure::observability::metrics::MetricsRegistry;
4use chrono::{DateTime, Duration, Utc};
5use dashmap::DashMap;
6use parking_lot::RwLock;
7use serde::{Deserialize, Serialize};
8use serde_json::Value as JsonValue;
9use std::collections::{HashMap, VecDeque};
10use std::sync::Arc;
11use uuid::Uuid;
12
13#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
15#[serde(rename_all = "lowercase")]
16pub enum WindowType {
17 Tumbling,
19 Sliding,
21 Session,
23}
24
25#[derive(Debug, Clone, Serialize, Deserialize)]
27pub struct WindowConfig {
28 pub window_type: WindowType,
30
31 pub size_seconds: i64,
33
34 pub slide_seconds: Option<i64>,
36
37 pub session_timeout_seconds: Option<i64>,
39}
40
41#[derive(Debug, Clone, Serialize, Deserialize)]
43#[serde(tag = "type", rename_all = "lowercase")]
44pub enum PipelineOperator {
45 Filter {
47 field: String,
49 value: JsonValue,
51 op: String,
53 },
54
55 Map {
57 field: String,
59 transform: String,
61 },
62
63 Reduce {
65 field: String,
67 function: String,
69 group_by: Option<String>,
71 },
72
73 Window {
75 config: WindowConfig,
77 aggregation: Box<PipelineOperator>,
79 },
80
81 Enrich {
83 source: String,
85 fields: Vec<String>,
87 },
88
89 Branch {
91 field: String,
93 branches: HashMap<String, String>,
95 },
96}
97
98#[derive(Debug, Clone, Serialize, Deserialize)]
100pub struct PipelineConfig {
101 pub id: Uuid,
103
104 pub name: String,
106
107 pub description: Option<String>,
109
110 pub source_event_types: Vec<String>,
112
113 pub operators: Vec<PipelineOperator>,
115
116 pub enabled: bool,
118
119 pub output: String,
121}
122
123#[derive(Debug, Clone, Serialize, Deserialize)]
125pub struct PipelineStats {
126 pub pipeline_id: Uuid,
127 pub events_processed: u64,
128 pub events_filtered: u64,
129 pub events_failed: u64,
130 pub last_processed: Option<DateTime<Utc>>,
131}
132
133pub struct StatefulOperator {
135 state: Arc<RwLock<HashMap<String, JsonValue>>>,
137
138 windows: Arc<RwLock<HashMap<String, VecDeque<(DateTime<Utc>, Event)>>>>,
140}
141
142impl StatefulOperator {
143 pub fn new() -> Self {
144 Self {
145 state: Arc::new(RwLock::new(HashMap::new())),
146 windows: Arc::new(RwLock::new(HashMap::new())),
147 }
148 }
149
150 pub fn set_state(&self, key: String, value: JsonValue) {
152 self.state.write().insert(key, value);
153 }
154
155 pub fn get_state(&self, key: &str) -> Option<JsonValue> {
157 self.state.read().get(key).cloned()
158 }
159
160 pub fn add_to_window(&self, window_key: &str, event: Event, timestamp: DateTime<Utc>) {
162 let mut windows = self.windows.write();
163 windows
164 .entry(window_key.to_string())
165 .or_default()
166 .push_back((timestamp, event));
167 }
168
169 pub fn get_window(&self, window_key: &str) -> Vec<Event> {
171 self.windows
172 .read()
173 .get(window_key)
174 .map(|w| w.iter().map(|(_, e)| e.clone()).collect())
175 .unwrap_or_default()
176 }
177
178 pub fn evict_window(&self, window_key: &str, cutoff: DateTime<Utc>) {
180 if let Some(window) = self.windows.write().get_mut(window_key) {
181 window.retain(|(ts, _)| *ts > cutoff);
182 }
183 }
184
185 pub fn clear(&self) {
187 self.state.write().clear();
188 self.windows.write().clear();
189 }
190}
191
192impl Default for StatefulOperator {
193 fn default() -> Self {
194 Self::new()
195 }
196}
197
198pub struct Pipeline {
200 config: PipelineConfig,
201 state: StatefulOperator,
202 stats: Arc<RwLock<PipelineStats>>,
203}
204
205impl Pipeline {
206 pub fn new(config: PipelineConfig) -> Self {
207 let stats = PipelineStats {
208 pipeline_id: config.id,
209 events_processed: 0,
210 events_filtered: 0,
211 events_failed: 0,
212 last_processed: None,
213 };
214
215 Self {
216 config,
217 state: StatefulOperator::new(),
218 stats: Arc::new(RwLock::new(stats)),
219 }
220 }
221
222 pub fn process(&self, event: &Event) -> Result<Option<JsonValue>> {
224 if !self.config.source_event_types.is_empty()
226 && !self
227 .config
228 .source_event_types
229 .iter()
230 .any(|t| t == event.event_type_str())
231 {
232 return Ok(None);
233 }
234
235 if !self.config.enabled {
236 return Ok(None);
237 }
238
239 let mut current_value = event.payload.clone();
240 let mut filtered = false;
241
242 for operator in &self.config.operators {
244 match self.apply_operator(operator, ¤t_value, event) {
245 Ok(Some(result)) => {
246 current_value = result;
247 }
248 Ok(None) => {
249 filtered = true;
251 self.stats.write().events_filtered += 1;
252 break;
253 }
254 Err(e) => {
255 self.stats.write().events_failed += 1;
256 tracing::error!("Pipeline {} operator failed: {}", self.config.name, e);
257 return Err(e);
258 }
259 }
260 }
261
262 let mut stats = self.stats.write();
264 stats.events_processed += 1;
265 stats.last_processed = Some(Utc::now());
266
267 if filtered {
268 Ok(None)
269 } else {
270 Ok(Some(current_value))
271 }
272 }
273
274 fn apply_operator(
276 &self,
277 operator: &PipelineOperator,
278 value: &JsonValue,
279 event: &Event,
280 ) -> Result<Option<JsonValue>> {
281 match operator {
282 PipelineOperator::Filter {
283 field,
284 value: expected,
285 op,
286 } => self.apply_filter(field, expected, op, value),
287
288 PipelineOperator::Map { field, transform } => self.apply_map(field, transform, value),
289
290 PipelineOperator::Reduce {
291 field,
292 function,
293 group_by,
294 } => self.apply_reduce(field, function, group_by.as_deref(), value, event),
295
296 PipelineOperator::Window {
297 config,
298 aggregation,
299 } => self.apply_window(config, aggregation, event),
300
301 PipelineOperator::Enrich { source, fields } => self.apply_enrich(source, fields, value),
302
303 PipelineOperator::Branch { field, branches } => {
304 self.apply_branch(field, branches, value)
305 }
306 }
307 }
308
309 fn apply_filter(
311 &self,
312 field: &str,
313 expected: &JsonValue,
314 op: &str,
315 value: &JsonValue,
316 ) -> Result<Option<JsonValue>> {
317 let field_value = self.get_field(value, field);
318
319 let matches = match op {
320 "eq" => field_value == Some(expected),
321 "ne" => field_value != Some(expected),
322 "gt" => {
323 if let (Some(JsonValue::Number(a)), JsonValue::Number(b)) =
324 (field_value.as_ref(), expected)
325 {
326 a.as_f64().unwrap_or(0.0) > b.as_f64().unwrap_or(0.0)
327 } else {
328 false
329 }
330 }
331 "lt" => {
332 if let (Some(JsonValue::Number(a)), JsonValue::Number(b)) =
333 (field_value.as_ref(), expected)
334 {
335 a.as_f64().unwrap_or(0.0) < b.as_f64().unwrap_or(0.0)
336 } else {
337 false
338 }
339 }
340 "contains" => {
341 if let (Some(JsonValue::String(a)), JsonValue::String(b)) =
342 (field_value.as_ref(), expected)
343 {
344 a.contains(b)
345 } else {
346 false
347 }
348 }
349 _ => {
350 return Err(AllSourceError::ValidationError(format!(
351 "Unknown filter operator: {}",
352 op
353 )));
354 }
355 };
356
357 if matches {
358 Ok(Some(value.clone()))
359 } else {
360 Ok(None) }
362 }
363
364 fn apply_map(
366 &self,
367 field: &str,
368 transform: &str,
369 value: &JsonValue,
370 ) -> Result<Option<JsonValue>> {
371 let mut result = value.clone();
372
373 let field_value = self.get_field(value, field);
375
376 let transformed = match transform {
377 "uppercase" => field_value
378 .and_then(|v| v.as_str())
379 .map(|s| JsonValue::String(s.to_uppercase())),
380 "lowercase" => field_value
381 .and_then(|v| v.as_str())
382 .map(|s| JsonValue::String(s.to_lowercase())),
383 "trim" => field_value
384 .and_then(|v| v.as_str())
385 .map(|s| JsonValue::String(s.trim().to_string())),
386 _ => {
387 if let Some(stripped) = transform.strip_prefix("multiply:") {
389 if let Ok(multiplier) = stripped.parse::<f64>() {
390 field_value.and_then(|v| v.as_f64()).map(|n| {
391 JsonValue::Number(serde_json::Number::from_f64(n * multiplier).unwrap())
392 })
393 } else {
394 None
395 }
396 } else if let Some(stripped) = transform.strip_prefix("add:") {
397 if let Ok(addend) = stripped.parse::<f64>() {
398 field_value.and_then(|v| v.as_f64()).map(|n| {
399 JsonValue::Number(serde_json::Number::from_f64(n + addend).unwrap())
400 })
401 } else {
402 None
403 }
404 } else {
405 None
406 }
407 }
408 };
409
410 if let Some(new_value) = transformed {
411 self.set_field(&mut result, field, new_value);
412 }
413
414 Ok(Some(result))
415 }
416
417 fn apply_reduce(
419 &self,
420 field: &str,
421 function: &str,
422 group_by: Option<&str>,
423 value: &JsonValue,
424 event: &Event,
425 ) -> Result<Option<JsonValue>> {
426 let group_key = if let Some(group_field) = group_by {
428 self.get_field(value, group_field)
429 .and_then(|v| v.as_str())
430 .unwrap_or("default")
431 .to_string()
432 } else {
433 "default".to_string()
434 };
435
436 let state_key = format!("reduce_{}_{}", function, group_key);
437
438 let current = self.state.get_state(&state_key);
440
441 let field_value = self.get_field(value, field);
443
444 let new_value = match function {
445 "count" => {
446 let count = current.and_then(|v| v.as_u64()).unwrap_or(0) + 1;
447 JsonValue::Number(count.into())
448 }
449 "sum" => {
450 let current_sum = current.and_then(|v| v.as_f64()).unwrap_or(0.0);
451 let value_to_add = field_value.and_then(|v| v.as_f64()).unwrap_or(0.0);
452 JsonValue::Number(serde_json::Number::from_f64(current_sum + value_to_add).unwrap())
453 }
454 "avg" => {
455 let sum_key = format!("{}_sum", state_key);
457 let count_key = format!("{}_count", state_key);
458
459 let current_sum = self
460 .state
461 .get_state(&sum_key)
462 .and_then(|v| v.as_f64())
463 .unwrap_or(0.0);
464 let current_count = self
465 .state
466 .get_state(&count_key)
467 .and_then(|v| v.as_u64())
468 .unwrap_or(0);
469
470 let value_to_add = field_value.and_then(|v| v.as_f64()).unwrap_or(0.0);
471
472 let new_sum = current_sum + value_to_add;
473 let new_count = current_count + 1;
474
475 self.state.set_state(
476 sum_key,
477 JsonValue::Number(serde_json::Number::from_f64(new_sum).unwrap()),
478 );
479 self.state
480 .set_state(count_key, JsonValue::Number(new_count.into()));
481
482 let avg = new_sum / new_count as f64;
483 JsonValue::Number(serde_json::Number::from_f64(avg).unwrap())
484 }
485 "min" => {
486 let current_min = current.and_then(|v| v.as_f64());
487 let new_val = field_value.and_then(|v| v.as_f64());
488
489 match (current_min, new_val) {
490 (Some(curr), Some(new)) => {
491 JsonValue::Number(serde_json::Number::from_f64(curr.min(new)).unwrap())
492 }
493 (None, Some(new)) => {
494 JsonValue::Number(serde_json::Number::from_f64(new).unwrap())
495 }
496 (Some(curr), None) => {
497 JsonValue::Number(serde_json::Number::from_f64(curr).unwrap())
498 }
499 (None, None) => JsonValue::Null,
500 }
501 }
502 "max" => {
503 let current_max = current.and_then(|v| v.as_f64());
504 let new_val = field_value.and_then(|v| v.as_f64());
505
506 match (current_max, new_val) {
507 (Some(curr), Some(new)) => {
508 JsonValue::Number(serde_json::Number::from_f64(curr.max(new)).unwrap())
509 }
510 (None, Some(new)) => {
511 JsonValue::Number(serde_json::Number::from_f64(new).unwrap())
512 }
513 (Some(curr), None) => {
514 JsonValue::Number(serde_json::Number::from_f64(curr).unwrap())
515 }
516 (None, None) => JsonValue::Null,
517 }
518 }
519 _ => {
520 return Err(AllSourceError::ValidationError(format!(
521 "Unknown reduce function: {}",
522 function
523 )));
524 }
525 };
526
527 self.state.set_state(state_key.clone(), new_value.clone());
529
530 let result = serde_json::json!({
532 "group": group_key,
533 "function": function,
534 "value": new_value
535 });
536
537 Ok(Some(result))
538 }
539
540 fn apply_window(
542 &self,
543 config: &WindowConfig,
544 aggregation: &PipelineOperator,
545 event: &Event,
546 ) -> Result<Option<JsonValue>> {
547 let window_key = format!("window_{}", self.config.id);
548 let now = Utc::now();
549
550 self.state
552 .add_to_window(&window_key, event.clone(), event.timestamp);
553
554 let cutoff = match config.window_type {
556 WindowType::Tumbling => now - Duration::seconds(config.size_seconds),
557 WindowType::Sliding => {
558 let slide = config.slide_seconds.unwrap_or(config.size_seconds);
559 now - Duration::seconds(slide)
560 }
561 WindowType::Session => {
562 let timeout = config.session_timeout_seconds.unwrap_or(300);
563 now - Duration::seconds(timeout)
564 }
565 };
566
567 self.state.evict_window(&window_key, cutoff);
568
569 let window_events = self.state.get_window(&window_key);
571
572 let mut aggregate_value = JsonValue::Null;
574 for window_event in &window_events {
575 if let Ok(Some(result)) =
576 self.apply_operator(aggregation, &window_event.payload, window_event)
577 {
578 aggregate_value = result;
579 }
580 }
581
582 Ok(Some(serde_json::json!({
583 "window_type": config.window_type,
584 "window_size_seconds": config.size_seconds,
585 "events_in_window": window_events.len(),
586 "aggregation": aggregate_value
587 })))
588 }
589
590 fn apply_enrich(
592 &self,
593 _source: &str,
594 fields: &[String],
595 value: &JsonValue,
596 ) -> Result<Option<JsonValue>> {
597 let mut result = value.clone();
600
601 for field in fields {
602 let enriched_value = JsonValue::String(format!("enriched_{}", field));
603 self.set_field(&mut result, field, enriched_value);
604 }
605
606 Ok(Some(result))
607 }
608
609 fn apply_branch(
611 &self,
612 field: &str,
613 branches: &HashMap<String, String>,
614 value: &JsonValue,
615 ) -> Result<Option<JsonValue>> {
616 let field_value = self.get_field(value, field);
617
618 if let Some(JsonValue::String(val)) = field_value {
619 if let Some(route) = branches.get(val) {
620 let mut result = value.clone();
621 if let JsonValue::Object(ref mut obj) = result {
622 obj.insert("_route".to_string(), JsonValue::String(route.clone()));
623 }
624 return Ok(Some(result));
625 }
626 }
627
628 Ok(Some(value.clone()))
629 }
630
631 fn get_field<'a>(&self, value: &'a JsonValue, field: &str) -> Option<&'a JsonValue> {
633 let parts: Vec<&str> = field.split('.').collect();
634 let mut current = value;
635
636 for part in parts {
637 current = current.get(part)?;
638 }
639
640 Some(current)
641 }
642
643 fn set_field(&self, value: &mut JsonValue, field: &str, new_value: JsonValue) {
645 let parts: Vec<&str> = field.split('.').collect();
646
647 if parts.len() == 1 {
648 if let JsonValue::Object(ref mut obj) = value {
649 obj.insert(field.to_string(), new_value);
650 }
651 return;
652 }
653
654 let mut current = value;
656 for part in &parts[..parts.len() - 1] {
657 if let JsonValue::Object(ref mut obj) = current {
658 current = obj
659 .entry(part.to_string())
660 .or_insert(JsonValue::Object(Default::default()));
661 }
662 }
663
664 if let JsonValue::Object(ref mut obj) = current {
666 obj.insert(parts.last().unwrap().to_string(), new_value);
667 }
668 }
669
670 pub fn stats(&self) -> PipelineStats {
672 self.stats.read().clone()
673 }
674
675 pub fn config(&self) -> &PipelineConfig {
677 &self.config
678 }
679
680 pub fn reset(&self) {
682 self.state.clear();
683 let mut stats = self.stats.write();
684 stats.events_processed = 0;
685 stats.events_filtered = 0;
686 stats.events_failed = 0;
687 stats.last_processed = None;
688 }
689}
690
691pub struct PipelineManager {
693 pipelines: Arc<DashMap<Uuid, Arc<Pipeline>>>,
695 metrics: Arc<MetricsRegistry>,
696}
697
698impl PipelineManager {
699 pub fn new() -> Self {
700 Self::with_metrics(MetricsRegistry::new())
701 }
702
703 pub fn with_metrics(metrics: Arc<MetricsRegistry>) -> Self {
704 Self {
705 pipelines: Arc::new(DashMap::new()),
706 metrics,
707 }
708 }
709
710 pub fn register(&self, config: PipelineConfig) -> Uuid {
712 let id = config.id;
713 let name = config.name.clone();
714 let pipeline = Arc::new(Pipeline::new(config));
715 self.pipelines.insert(id, pipeline);
716
717 let count = self.pipelines.len();
718 self.metrics.pipelines_registered_total.set(count as i64);
719
720 tracing::info!("📊 Registered pipeline: {} ({})", name, id);
721 id
722 }
723
724 pub fn get(&self, id: Uuid) -> Option<Arc<Pipeline>> {
726 self.pipelines.get(&id).map(|entry| entry.value().clone())
727 }
728
729 pub fn process_event(&self, event: &Event) -> Vec<(Uuid, JsonValue)> {
731 let timer = self.metrics.pipeline_duration_seconds.start_timer();
732
733 let mut results = Vec::new();
734
735 for entry in self.pipelines.iter() {
736 let id = entry.key();
737 let pipeline = entry.value();
738 let pipeline_name = &pipeline.config().name;
739 let pipeline_id = id.to_string();
740
741 match pipeline.process(event) {
742 Ok(Some(result)) => {
743 self.metrics
744 .pipeline_events_processed
745 .with_label_values(&[&pipeline_id, pipeline_name])
746 .inc();
747 results.push((*id, result));
748 }
749 Ok(None) => {
750 }
752 Err(e) => {
753 self.metrics
754 .pipeline_errors_total
755 .with_label_values(&[pipeline_name])
756 .inc();
757 tracing::error!(
758 "Pipeline '{}' ({}) failed to process event: {}",
759 pipeline_name,
760 id,
761 e
762 );
763 }
764 }
765 }
766
767 timer.observe_duration();
768 results
769 }
770
771 pub fn list(&self) -> Vec<PipelineConfig> {
773 self.pipelines
774 .iter()
775 .map(|entry| entry.value().config().clone())
776 .collect()
777 }
778
779 pub fn remove(&self, id: Uuid) -> bool {
781 let removed = self.pipelines.remove(&id).is_some();
782
783 if removed {
784 let count = self.pipelines.len();
785 self.metrics.pipelines_registered_total.set(count as i64);
786 }
787
788 removed
789 }
790
791 pub fn all_stats(&self) -> Vec<PipelineStats> {
793 self.pipelines.iter().map(|entry| entry.value().stats()).collect()
794 }
795}
796
797impl Default for PipelineManager {
798 fn default() -> Self {
799 Self::new()
800 }
801}
802
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Builds an enabled pipeline that accepts `"test"` events and runs the
    /// single given operator.
    fn single_operator_pipeline(name: &str, operator: PipelineOperator) -> Pipeline {
        Pipeline::new(PipelineConfig {
            id: Uuid::new_v4(),
            name: name.to_string(),
            description: None,
            source_event_types: vec!["test".to_string()],
            operators: vec![operator],
            enabled: true,
            output: "test_output".to_string(),
        })
    }

    /// Builds a `"test"` event carrying `payload`.
    fn test_event(payload: JsonValue) -> Event {
        Event::from_strings(
            "test".to_string(),
            "entity1".to_string(),
            "default".to_string(),
            payload,
            None,
        )
        .unwrap()
    }

    #[test]
    fn test_filter_operator() {
        let pipeline = single_operator_pipeline(
            "test_filter",
            PipelineOperator::Filter {
                field: "status".to_string(),
                value: json!("active"),
                op: "eq".to_string(),
            },
        );

        let result = pipeline
            .process(&test_event(json!({"status": "active"})))
            .unwrap();
        assert!(result.is_some());
    }

    #[test]
    fn test_map_operator() {
        let pipeline = single_operator_pipeline(
            "test_map",
            PipelineOperator::Map {
                field: "name".to_string(),
                transform: "uppercase".to_string(),
            },
        );

        let result = pipeline
            .process(&test_event(json!({"name": "hello"})))
            .unwrap()
            .unwrap();
        assert_eq!(result["name"], "HELLO");
    }

    #[test]
    fn test_reduce_count() {
        let pipeline = single_operator_pipeline(
            "test_reduce",
            PipelineOperator::Reduce {
                field: "value".to_string(),
                function: "count".to_string(),
                group_by: None,
            },
        );

        // Six events in total; the count reported for the last one is 6.
        let mut last_output = None;
        for i in 0..=5 {
            last_output = pipeline
                .process(&test_event(json!({"value": i})))
                .unwrap();
        }

        let result = last_output.unwrap();
        assert_eq!(result["value"], 6);
    }
}