1use crate::{
2 domain::entities::Event,
3 error::{AllSourceError, Result},
4 infrastructure::observability::metrics::MetricsRegistry,
5};
6use chrono::{DateTime, Duration, Utc};
7use dashmap::DashMap;
8use parking_lot::RwLock;
9use serde::{Deserialize, Serialize};
10use serde_json::Value as JsonValue;
11use std::{
12 collections::{HashMap, VecDeque},
13 sync::Arc,
14};
15use uuid::Uuid;
16
/// Kind of time window used by a [`PipelineOperator::Window`] operator.
///
/// Serialized in lowercase: `"tumbling"`, `"sliding"`, `"session"`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum WindowType {
    /// Tumbling window sized by [`WindowConfig::size_seconds`].
    Tumbling,
    /// Sliding window configured by [`WindowConfig::size_seconds`] and the
    /// optional [`WindowConfig::slide_seconds`].
    Sliding,
    /// Session window bounded by [`WindowConfig::session_timeout_seconds`]
    /// (300 seconds when unset).
    Session,
}
28
/// Configuration of a window operator.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WindowConfig {
    /// Which windowing strategy to apply.
    pub window_type: WindowType,

    /// Window length in seconds.
    pub size_seconds: i64,

    /// Optional slide step, in seconds, for [`WindowType::Sliding`] windows.
    pub slide_seconds: Option<i64>,

    /// Inactivity timeout, in seconds, for [`WindowType::Session`] windows;
    /// the pipeline falls back to 300 seconds when this is `None`.
    pub session_timeout_seconds: Option<i64>,
}
44
/// One step of a pipeline's operator chain.
///
/// Serialized as an internally tagged object whose `"type"` key is the
/// lowercase variant name, e.g. `{"type": "filter", "field": ..., ...}`.
/// Field names are dot-separated paths into the JSON payload
/// (e.g. `"user.address.city"`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "lowercase")]
pub enum PipelineOperator {
    /// Keeps the event only if the field satisfies the comparison.
    Filter {
        /// Path of the payload field to test.
        field: String,
        /// Value the field is compared against.
        value: JsonValue,
        /// Comparison: `"eq"`, `"ne"`, `"gt"`, `"lt"` (numbers) or
        /// `"contains"` (strings). Other values are rejected with an error.
        op: String,
    },

    /// Rewrites a single field of the payload.
    Map {
        /// Path of the payload field to transform.
        field: String,
        /// Transform name: `"uppercase"`, `"lowercase"`, `"trim"`,
        /// `"multiply:<number>"` or `"add:<number>"`.
        transform: String,
    },

    /// Folds a field across events into keyed running state.
    Reduce {
        /// Path of the payload field fed into the reduction.
        field: String,
        /// Reduction: `"count"`, `"sum"`, `"avg"`, `"min"` or `"max"`.
        /// Other values are rejected with an error.
        function: String,
        /// Optional path of a field whose value selects the group; events
        /// without it fall into the `"default"` group.
        group_by: Option<String>,
    },

    /// Buffers events in a time window and applies `aggregation` to them.
    Window {
        /// Window shape and timing.
        config: WindowConfig,
        /// Operator applied to each buffered event to produce the aggregate.
        aggregation: Box<PipelineOperator>,
    },

    /// Adds fields to the payload.
    Enrich {
        /// Name of the enrichment source (not consulted by the current
        /// implementation).
        source: String,
        /// Paths of the fields to set; they currently receive placeholder
        /// values of the form `"enriched_<field>"`.
        fields: Vec<String>,
    },

    /// Tags the payload with a route chosen by a field's string value.
    Branch {
        /// Path of the payload field whose string value is looked up.
        field: String,
        /// Map from field value to route name; a match is written to the
        /// payload's `"_route"` key.
        branches: HashMap<String, String>,
    },
}
101
/// Declarative definition of a pipeline.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PipelineConfig {
    /// Unique pipeline identifier (also the key used by [`PipelineManager`]).
    pub id: Uuid,

    /// Human-readable name, used in logs and metric labels.
    pub name: String,

    /// Optional free-form description.
    pub description: Option<String>,

    /// Event types this pipeline consumes; an empty list accepts every type.
    pub source_event_types: Vec<String>,

    /// Operators applied in order to each accepted event's payload.
    pub operators: Vec<PipelineOperator>,

    /// Disabled pipelines ignore every event.
    pub enabled: bool,

    /// Output destination name; not interpreted by the code in this module.
    pub output: String,
}
126
/// Processing counters for a single pipeline.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PipelineStats {
    /// Pipeline these counters belong to.
    pub pipeline_id: Uuid,
    /// Events that ran through the operator chain, including filtered ones.
    pub events_processed: u64,
    /// Events dropped by an operator (subset of `events_processed`).
    pub events_filtered: u64,
    /// Events for which an operator returned an error.
    pub events_failed: u64,
    /// Time the most recent event finished processing, if any.
    pub last_processed: Option<DateTime<Utc>>,
}
136
/// Mutable state shared by a pipeline's operators.
///
/// Holds a keyed JSON value store (used by reduce operators) and per-key
/// buffers of timestamped events (used by window operators). Both maps sit
/// behind `parking_lot::RwLock`, so every method takes `&self`.
pub struct StatefulOperator {
    /// Keyed accumulator values.
    state: Arc<RwLock<HashMap<String, JsonValue>>>,

    /// Event buffers keyed by window name; each entry carries the timestamp
    /// that eviction compares against.
    windows: Arc<RwLock<HashMap<String, VecDeque<(DateTime<Utc>, Event)>>>>,
}
145
146impl StatefulOperator {
147 pub fn new() -> Self {
148 Self {
149 state: Arc::new(RwLock::new(HashMap::new())),
150 windows: Arc::new(RwLock::new(HashMap::new())),
151 }
152 }
153
154 pub fn set_state(&self, key: String, value: JsonValue) {
156 self.state.write().insert(key, value);
157 }
158
159 pub fn get_state(&self, key: &str) -> Option<JsonValue> {
161 self.state.read().get(key).cloned()
162 }
163
164 pub fn add_to_window(&self, window_key: &str, event: Event, timestamp: DateTime<Utc>) {
166 let mut windows = self.windows.write();
167 windows
168 .entry(window_key.to_string())
169 .or_default()
170 .push_back((timestamp, event));
171 }
172
173 pub fn get_window(&self, window_key: &str) -> Vec<Event> {
175 self.windows
176 .read()
177 .get(window_key)
178 .map(|w| w.iter().map(|(_, e)| e.clone()).collect())
179 .unwrap_or_default()
180 }
181
182 pub fn evict_window(&self, window_key: &str, cutoff: DateTime<Utc>) {
184 if let Some(window) = self.windows.write().get_mut(window_key) {
185 window.retain(|(ts, _)| *ts > cutoff);
186 }
187 }
188
189 pub fn clear(&self) {
191 self.state.write().clear();
192 self.windows.write().clear();
193 }
194}
195
196impl Default for StatefulOperator {
197 fn default() -> Self {
198 Self::new()
199 }
200}
201
/// A single configured stream-processing pipeline.
///
/// Owns its configuration, the operator state used by reduce and window
/// operators, and processing statistics kept behind a lock so that
/// [`Pipeline::process`] can run through `&self`.
pub struct Pipeline {
    /// Immutable definition supplied at construction.
    config: PipelineConfig,
    /// Reduce accumulators and window buffers.
    state: StatefulOperator,
    /// Counters updated by `process` and cleared by `reset`.
    stats: Arc<RwLock<PipelineStats>>,
}
208
209impl Pipeline {
210 pub fn new(config: PipelineConfig) -> Self {
211 let stats = PipelineStats {
212 pipeline_id: config.id,
213 events_processed: 0,
214 events_filtered: 0,
215 events_failed: 0,
216 last_processed: None,
217 };
218
219 Self {
220 config,
221 state: StatefulOperator::new(),
222 stats: Arc::new(RwLock::new(stats)),
223 }
224 }
225
226 pub fn process(&self, event: &Event) -> Result<Option<JsonValue>> {
228 if !self.config.source_event_types.is_empty()
230 && !self
231 .config
232 .source_event_types
233 .iter()
234 .any(|t| t == event.event_type_str())
235 {
236 return Ok(None);
237 }
238
239 if !self.config.enabled {
240 return Ok(None);
241 }
242
243 let mut current_value = event.payload.clone();
244 let mut filtered = false;
245
246 for operator in &self.config.operators {
248 match self.apply_operator(operator, ¤t_value, event) {
249 Ok(Some(result)) => {
250 current_value = result;
251 }
252 Ok(None) => {
253 filtered = true;
255 self.stats.write().events_filtered += 1;
256 break;
257 }
258 Err(e) => {
259 self.stats.write().events_failed += 1;
260 tracing::error!("Pipeline {} operator failed: {}", self.config.name, e);
261 return Err(e);
262 }
263 }
264 }
265
266 let mut stats = self.stats.write();
268 stats.events_processed += 1;
269 stats.last_processed = Some(Utc::now());
270
271 if filtered {
272 Ok(None)
273 } else {
274 Ok(Some(current_value))
275 }
276 }
277
278 fn apply_operator(
280 &self,
281 operator: &PipelineOperator,
282 value: &JsonValue,
283 event: &Event,
284 ) -> Result<Option<JsonValue>> {
285 match operator {
286 PipelineOperator::Filter {
287 field,
288 value: expected,
289 op,
290 } => self.apply_filter(field, expected, op, value),
291
292 PipelineOperator::Map { field, transform } => self.apply_map(field, transform, value),
293
294 PipelineOperator::Reduce {
295 field,
296 function,
297 group_by,
298 } => self.apply_reduce(field, function, group_by.as_deref(), value, event),
299
300 PipelineOperator::Window {
301 config,
302 aggregation,
303 } => self.apply_window(config, aggregation, event),
304
305 PipelineOperator::Enrich { source, fields } => self.apply_enrich(source, fields, value),
306
307 PipelineOperator::Branch { field, branches } => {
308 self.apply_branch(field, branches, value)
309 }
310 }
311 }
312
313 fn apply_filter(
315 &self,
316 field: &str,
317 expected: &JsonValue,
318 op: &str,
319 value: &JsonValue,
320 ) -> Result<Option<JsonValue>> {
321 let field_value = self.get_field(value, field);
322
323 let matches = match op {
324 "eq" => field_value == Some(expected),
325 "ne" => field_value != Some(expected),
326 "gt" => {
327 if let (Some(JsonValue::Number(a)), JsonValue::Number(b)) =
328 (field_value.as_ref(), expected)
329 {
330 a.as_f64().unwrap_or(0.0) > b.as_f64().unwrap_or(0.0)
331 } else {
332 false
333 }
334 }
335 "lt" => {
336 if let (Some(JsonValue::Number(a)), JsonValue::Number(b)) =
337 (field_value.as_ref(), expected)
338 {
339 a.as_f64().unwrap_or(0.0) < b.as_f64().unwrap_or(0.0)
340 } else {
341 false
342 }
343 }
344 "contains" => {
345 if let (Some(JsonValue::String(a)), JsonValue::String(b)) =
346 (field_value.as_ref(), expected)
347 {
348 a.contains(b)
349 } else {
350 false
351 }
352 }
353 _ => {
354 return Err(AllSourceError::ValidationError(format!(
355 "Unknown filter operator: {op}"
356 )));
357 }
358 };
359
360 if matches {
361 Ok(Some(value.clone()))
362 } else {
363 Ok(None) }
365 }
366
367 fn apply_map(
369 &self,
370 field: &str,
371 transform: &str,
372 value: &JsonValue,
373 ) -> Result<Option<JsonValue>> {
374 let mut result = value.clone();
375
376 let field_value = self.get_field(value, field);
378
379 let transformed = match transform {
380 "uppercase" => field_value
381 .and_then(|v| v.as_str())
382 .map(|s| JsonValue::String(s.to_uppercase())),
383 "lowercase" => field_value
384 .and_then(|v| v.as_str())
385 .map(|s| JsonValue::String(s.to_lowercase())),
386 "trim" => field_value
387 .and_then(|v| v.as_str())
388 .map(|s| JsonValue::String(s.trim().to_string())),
389 _ => {
390 if let Some(stripped) = transform.strip_prefix("multiply:") {
392 if let Ok(multiplier) = stripped.parse::<f64>() {
393 field_value.and_then(serde_json::Value::as_f64).map(|n| {
394 JsonValue::Number(serde_json::Number::from_f64(n * multiplier).unwrap())
395 })
396 } else {
397 None
398 }
399 } else if let Some(stripped) = transform.strip_prefix("add:") {
400 if let Ok(addend) = stripped.parse::<f64>() {
401 field_value.and_then(serde_json::Value::as_f64).map(|n| {
402 JsonValue::Number(serde_json::Number::from_f64(n + addend).unwrap())
403 })
404 } else {
405 None
406 }
407 } else {
408 None
409 }
410 }
411 };
412
413 if let Some(new_value) = transformed {
414 self.set_field(&mut result, field, new_value);
415 }
416
417 Ok(Some(result))
418 }
419
420 fn apply_reduce(
422 &self,
423 field: &str,
424 function: &str,
425 group_by: Option<&str>,
426 value: &JsonValue,
427 event: &Event,
428 ) -> Result<Option<JsonValue>> {
429 let group_key = if let Some(group_field) = group_by {
431 self.get_field(value, group_field)
432 .and_then(|v| v.as_str())
433 .unwrap_or("default")
434 .to_string()
435 } else {
436 "default".to_string()
437 };
438
439 let state_key = format!("reduce_{function}_{group_key}");
440
441 let current = self.state.get_state(&state_key);
443
444 let field_value = self.get_field(value, field);
446
447 let new_value = match function {
448 "count" => {
449 let count = current.and_then(|v| v.as_u64()).unwrap_or(0) + 1;
450 JsonValue::Number(count.into())
451 }
452 "sum" => {
453 let current_sum = current.and_then(|v| v.as_f64()).unwrap_or(0.0);
454 let value_to_add = field_value
455 .and_then(serde_json::Value::as_f64)
456 .unwrap_or(0.0);
457 JsonValue::Number(serde_json::Number::from_f64(current_sum + value_to_add).unwrap())
458 }
459 "avg" => {
460 let sum_key = format!("{state_key}_sum");
462 let count_key = format!("{state_key}_count");
463
464 let current_sum = self
465 .state
466 .get_state(&sum_key)
467 .and_then(|v| v.as_f64())
468 .unwrap_or(0.0);
469 let current_count = self
470 .state
471 .get_state(&count_key)
472 .and_then(|v| v.as_u64())
473 .unwrap_or(0);
474
475 let value_to_add = field_value
476 .and_then(serde_json::Value::as_f64)
477 .unwrap_or(0.0);
478
479 let new_sum = current_sum + value_to_add;
480 let new_count = current_count + 1;
481
482 self.state.set_state(
483 sum_key,
484 JsonValue::Number(serde_json::Number::from_f64(new_sum).unwrap()),
485 );
486 self.state
487 .set_state(count_key, JsonValue::Number(new_count.into()));
488
489 let avg = new_sum / new_count as f64;
490 JsonValue::Number(serde_json::Number::from_f64(avg).unwrap())
491 }
492 "min" => {
493 let current_min = current.and_then(|v| v.as_f64());
494 let new_val = field_value.and_then(serde_json::Value::as_f64);
495
496 match (current_min, new_val) {
497 (Some(curr), Some(new)) => {
498 JsonValue::Number(serde_json::Number::from_f64(curr.min(new)).unwrap())
499 }
500 (None, Some(new)) => {
501 JsonValue::Number(serde_json::Number::from_f64(new).unwrap())
502 }
503 (Some(curr), None) => {
504 JsonValue::Number(serde_json::Number::from_f64(curr).unwrap())
505 }
506 (None, None) => JsonValue::Null,
507 }
508 }
509 "max" => {
510 let current_max = current.and_then(|v| v.as_f64());
511 let new_val = field_value.and_then(serde_json::Value::as_f64);
512
513 match (current_max, new_val) {
514 (Some(curr), Some(new)) => {
515 JsonValue::Number(serde_json::Number::from_f64(curr.max(new)).unwrap())
516 }
517 (None, Some(new)) => {
518 JsonValue::Number(serde_json::Number::from_f64(new).unwrap())
519 }
520 (Some(curr), None) => {
521 JsonValue::Number(serde_json::Number::from_f64(curr).unwrap())
522 }
523 (None, None) => JsonValue::Null,
524 }
525 }
526 _ => {
527 return Err(AllSourceError::ValidationError(format!(
528 "Unknown reduce function: {function}"
529 )));
530 }
531 };
532
533 self.state.set_state(state_key.clone(), new_value.clone());
535
536 let result = serde_json::json!({
538 "group": group_key,
539 "function": function,
540 "value": new_value
541 });
542
543 Ok(Some(result))
544 }
545
546 fn apply_window(
548 &self,
549 config: &WindowConfig,
550 aggregation: &PipelineOperator,
551 event: &Event,
552 ) -> Result<Option<JsonValue>> {
553 let window_key = format!("window_{}", self.config.id);
554 let now = Utc::now();
555
556 self.state
558 .add_to_window(&window_key, event.clone(), event.timestamp);
559
560 let cutoff = match config.window_type {
562 WindowType::Tumbling => now - Duration::seconds(config.size_seconds),
563 WindowType::Sliding => {
564 let slide = config.slide_seconds.unwrap_or(config.size_seconds);
565 now - Duration::seconds(slide)
566 }
567 WindowType::Session => {
568 let timeout = config.session_timeout_seconds.unwrap_or(300);
569 now - Duration::seconds(timeout)
570 }
571 };
572
573 self.state.evict_window(&window_key, cutoff);
574
575 let window_events = self.state.get_window(&window_key);
577
578 let mut aggregate_value = JsonValue::Null;
580 for window_event in &window_events {
581 if let Ok(Some(result)) =
582 self.apply_operator(aggregation, &window_event.payload, window_event)
583 {
584 aggregate_value = result;
585 }
586 }
587
588 Ok(Some(serde_json::json!({
589 "window_type": config.window_type,
590 "window_size_seconds": config.size_seconds,
591 "events_in_window": window_events.len(),
592 "aggregation": aggregate_value
593 })))
594 }
595
596 fn apply_enrich(
598 &self,
599 _source: &str,
600 fields: &[String],
601 value: &JsonValue,
602 ) -> Result<Option<JsonValue>> {
603 let mut result = value.clone();
606
607 for field in fields {
608 let enriched_value = JsonValue::String(format!("enriched_{field}"));
609 self.set_field(&mut result, field, enriched_value);
610 }
611
612 Ok(Some(result))
613 }
614
615 fn apply_branch(
617 &self,
618 field: &str,
619 branches: &HashMap<String, String>,
620 value: &JsonValue,
621 ) -> Result<Option<JsonValue>> {
622 let field_value = self.get_field(value, field);
623
624 if let Some(JsonValue::String(val)) = field_value
625 && let Some(route) = branches.get(val)
626 {
627 let mut result = value.clone();
628 if let JsonValue::Object(ref mut obj) = result {
629 obj.insert("_route".to_string(), JsonValue::String(route.clone()));
630 }
631 return Ok(Some(result));
632 }
633
634 Ok(Some(value.clone()))
635 }
636
637 fn get_field<'a>(&self, value: &'a JsonValue, field: &str) -> Option<&'a JsonValue> {
639 let parts: Vec<&str> = field.split('.').collect();
640 let mut current = value;
641
642 for part in parts {
643 current = current.get(part)?;
644 }
645
646 Some(current)
647 }
648
649 fn set_field(&self, value: &mut JsonValue, field: &str, new_value: JsonValue) {
651 let parts: Vec<&str> = field.split('.').collect();
652
653 if parts.len() == 1 {
654 if let JsonValue::Object(obj) = value {
655 obj.insert(field.to_string(), new_value);
656 }
657 return;
658 }
659
660 let mut current = value;
662 for part in &parts[..parts.len() - 1] {
663 if let JsonValue::Object(obj) = current {
664 current = obj
665 .entry((*part).to_string())
666 .or_insert(JsonValue::Object(Default::default()));
667 }
668 }
669
670 if let JsonValue::Object(obj) = current {
672 obj.insert((*parts.last().unwrap()).to_string(), new_value);
673 }
674 }
675
676 pub fn stats(&self) -> PipelineStats {
678 self.stats.read().clone()
679 }
680
681 pub fn config(&self) -> &PipelineConfig {
683 &self.config
684 }
685
686 pub fn reset(&self) {
688 self.state.clear();
689 let mut stats = self.stats.write();
690 stats.events_processed = 0;
691 stats.events_filtered = 0;
692 stats.events_failed = 0;
693 stats.last_processed = None;
694 }
695}
696
/// Registry of pipelines keyed by id, together with the metrics registry it
/// reports to.
pub struct PipelineManager {
    /// Registered pipelines. `DashMap` allows registration, removal and
    /// iteration through `&self`.
    pipelines: Arc<DashMap<Uuid, Arc<Pipeline>>>,
    /// Metrics updated on registration, removal and event processing.
    metrics: Arc<MetricsRegistry>,
}
703
704impl PipelineManager {
705 pub fn new() -> Self {
706 Self::with_metrics(MetricsRegistry::new())
707 }
708
709 pub fn with_metrics(metrics: Arc<MetricsRegistry>) -> Self {
710 Self {
711 pipelines: Arc::new(DashMap::new()),
712 metrics,
713 }
714 }
715
716 pub fn register(&self, config: PipelineConfig) -> Uuid {
718 let id = config.id;
719 let name = config.name.clone();
720 let pipeline = Arc::new(Pipeline::new(config));
721 self.pipelines.insert(id, pipeline);
722
723 let count = self.pipelines.len();
724 self.metrics.pipelines_registered_total.set(count as i64);
725
726 tracing::info!("📊 Registered pipeline: {} ({})", name, id);
727 id
728 }
729
730 pub fn get(&self, id: Uuid) -> Option<Arc<Pipeline>> {
732 self.pipelines.get(&id).map(|entry| entry.value().clone())
733 }
734
735 pub fn process_event(&self, event: &Event) -> Vec<(Uuid, JsonValue)> {
737 let timer = self.metrics.pipeline_duration_seconds.start_timer();
738
739 let mut results = Vec::new();
740
741 for entry in self.pipelines.iter() {
742 let id = entry.key();
743 let pipeline = entry.value();
744 let pipeline_name = &pipeline.config().name;
745 let pipeline_id = id.to_string();
746
747 match pipeline.process(event) {
748 Ok(Some(result)) => {
749 self.metrics
750 .pipeline_events_processed
751 .with_label_values(&[&pipeline_id, pipeline_name])
752 .inc();
753 results.push((*id, result));
754 }
755 Ok(None) => {
756 }
758 Err(e) => {
759 self.metrics
760 .pipeline_errors_total
761 .with_label_values(&[pipeline_name])
762 .inc();
763 tracing::error!(
764 "Pipeline '{}' ({}) failed to process event: {}",
765 pipeline_name,
766 id,
767 e
768 );
769 }
770 }
771 }
772
773 timer.observe_duration();
774 results
775 }
776
777 pub fn list(&self) -> Vec<PipelineConfig> {
779 self.pipelines
780 .iter()
781 .map(|entry| entry.value().config().clone())
782 .collect()
783 }
784
785 pub fn remove(&self, id: Uuid) -> bool {
787 let removed = self.pipelines.remove(&id).is_some();
788
789 if removed {
790 let count = self.pipelines.len();
791 self.metrics.pipelines_registered_total.set(count as i64);
792 }
793
794 removed
795 }
796
797 pub fn all_stats(&self) -> Vec<PipelineStats> {
799 self.pipelines
800 .iter()
801 .map(|entry| entry.value().stats())
802 .collect()
803 }
804}
805
806impl Default for PipelineManager {
807 fn default() -> Self {
808 Self::new()
809 }
810}
811
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Builds an enabled pipeline that only accepts `"test"` events and runs
    /// `operators` in order.
    fn pipeline_with(name: &str, operators: Vec<PipelineOperator>) -> Pipeline {
        Pipeline::new(PipelineConfig {
            id: Uuid::new_v4(),
            name: name.to_string(),
            description: None,
            source_event_types: vec!["test".to_string()],
            operators,
            enabled: true,
            output: "test_output".to_string(),
        })
    }

    /// Builds an event of `event_type` carrying `payload`; the remaining
    /// constructor arguments mirror the values the original tests used.
    fn make_event(event_type: &str, payload: JsonValue) -> Event {
        Event::from_strings(
            event_type.to_string(),
            "entity1".to_string(),
            "default".to_string(),
            payload,
            None,
        )
        .unwrap()
    }

    /// A filter keeping events whose `status` equals `"active"`, using `op`.
    fn status_filter(op: &str) -> PipelineOperator {
        PipelineOperator::Filter {
            field: "status".to_string(),
            value: json!("active"),
            op: op.to_string(),
        }
    }

    #[test]
    fn test_filter_operator() {
        let pipeline = pipeline_with("test_filter", vec![status_filter("eq")]);

        let result = pipeline
            .process(&make_event("test", json!({"status": "active"})))
            .unwrap();
        assert!(result.is_some());
    }

    #[test]
    fn test_filter_drops_non_matching_event() {
        let pipeline = pipeline_with("test_filter_drop", vec![status_filter("eq")]);

        let result = pipeline
            .process(&make_event("test", json!({"status": "inactive"})))
            .unwrap();
        assert!(result.is_none());

        let stats = pipeline.stats();
        assert_eq!(stats.events_filtered, 1);
        assert_eq!(stats.events_processed, 1);
        assert_eq!(stats.events_failed, 0);
    }

    #[test]
    fn test_unlisted_event_type_is_skipped() {
        let pipeline = pipeline_with("test_skip", vec![status_filter("eq")]);

        let result = pipeline
            .process(&make_event("other", json!({"status": "active"})))
            .unwrap();
        assert!(result.is_none());
        assert_eq!(pipeline.stats().events_processed, 0);
    }

    #[test]
    fn test_unknown_filter_operator_is_an_error() {
        let pipeline = pipeline_with("test_bad_filter", vec![status_filter("between")]);

        assert!(pipeline
            .process(&make_event("test", json!({"status": "active"})))
            .is_err());
        let stats = pipeline.stats();
        assert_eq!(stats.events_failed, 1);
        assert_eq!(stats.events_processed, 0);
    }

    #[test]
    fn test_map_operator() {
        let pipeline = pipeline_with(
            "test_map",
            vec![PipelineOperator::Map {
                field: "name".to_string(),
                transform: "uppercase".to_string(),
            }],
        );

        let result = pipeline
            .process(&make_event("test", json!({"name": "hello"})))
            .unwrap()
            .unwrap();
        assert_eq!(result["name"], "HELLO");
    }

    #[test]
    fn test_reduce_count() {
        let pipeline = pipeline_with(
            "test_reduce",
            vec![PipelineOperator::Reduce {
                field: "value".to_string(),
                function: "count".to_string(),
                group_by: None,
            }],
        );

        for i in 0..5 {
            pipeline
                .process(&make_event("test", json!({"value": i})))
                .unwrap();
        }

        let result = pipeline
            .process(&make_event("test", json!({"value": 5})))
            .unwrap()
            .unwrap();

        assert_eq!(result["value"], 6);
    }

    #[test]
    fn test_reduce_sum_grouped_by_string_field() {
        let pipeline = pipeline_with(
            "test_reduce_sum",
            vec![PipelineOperator::Reduce {
                field: "amount".to_string(),
                function: "sum".to_string(),
                group_by: Some("user".to_string()),
            }],
        );

        for (user, amount) in [("a", 1.5), ("b", 10.0), ("a", 2.5)] {
            pipeline
                .process(&make_event("test", json!({"user": user, "amount": amount})))
                .unwrap();
        }

        let result = pipeline
            .process(&make_event("test", json!({"user": "a", "amount": 1.0})))
            .unwrap()
            .unwrap();

        assert_eq!(result["group"], "a");
        assert_eq!(result["function"], "sum");
        assert_eq!(result["value"], 5.0);
    }
}