use crate::domain::entities::Event;
use crate::error::{AllSourceError, Result};
use crate::metrics::MetricsRegistry;
use chrono::{DateTime, Duration, Utc};
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use serde_json::Value as JsonValue;
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use uuid::Uuid;

/// The kind of time window used by the `Window` operator.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum WindowType {
    /// Fixed-size, non-overlapping windows.
    Tumbling,
    /// Fixed-size windows that advance by a configurable slide interval.
    Sliding,
    /// Windows that close after a period of inactivity.
    Session,
}

/// Configuration for a windowed aggregation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WindowConfig {
    pub window_type: WindowType,

    /// Window size in seconds.
    pub size_seconds: i64,

    /// Slide interval in seconds (sliding windows only).
    pub slide_seconds: Option<i64>,

    /// Inactivity timeout in seconds (session windows only).
    pub session_timeout_seconds: Option<i64>,
}

/// A single transformation step in a pipeline.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "lowercase")]
pub enum PipelineOperator {
    /// Drop events whose `field` does not satisfy `op` against `value`.
    Filter {
        field: String,
        value: JsonValue,
        op: String,
    },

    /// Transform a single field in place.
    Map {
        field: String,
        transform: String,
    },

    /// Maintain a running aggregate over a field, optionally grouped.
    Reduce {
        field: String,
        function: String,
        group_by: Option<String>,
    },

    /// Apply an aggregation over a time window of buffered events.
    Window {
        config: WindowConfig,
        aggregation: Box<PipelineOperator>,
    },

    /// Add fields derived from an external source.
    Enrich {
        source: String,
        fields: Vec<String>,
    },

    /// Route events to named branches based on a field value.
    Branch {
        field: String,
        branches: HashMap<String, String>,
    },
}

/// Definition of an event-processing pipeline.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PipelineConfig {
    pub id: Uuid,

    pub name: String,

    pub description: Option<String>,

    /// Event types this pipeline consumes; an empty list matches all types.
    pub source_event_types: Vec<String>,

    /// Operators applied in order to each matching event.
    pub operators: Vec<PipelineOperator>,

    pub enabled: bool,

    /// Where processed events are sent.
    pub output: String,
}

/// Runtime counters for a single pipeline.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PipelineStats {
    pub pipeline_id: Uuid,
    pub events_processed: u64,
    pub events_filtered: u64,
    pub events_failed: u64,
    pub last_processed: Option<DateTime<Utc>>,
}

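/// Shared state for stateful operators: arbitrary keyed values plus
/// per-key time windows of buffered events.
///
/// A minimal usage sketch (illustrative only; `event` is assumed to be
/// built the same way as in the tests at the bottom of this file):
///
/// ```ignore
/// let state = StatefulOperator::new();
/// state.set_state("counter".to_string(), serde_json::json!(1));
/// assert_eq!(state.get_state("counter"), Some(serde_json::json!(1)));
///
/// state.add_to_window("w1", event, chrono::Utc::now());
/// assert_eq!(state.get_window("w1").len(), 1);
/// ```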
pub struct StatefulOperator {
    /// Arbitrary keyed state shared across operator invocations.
    state: Arc<RwLock<HashMap<String, JsonValue>>>,

    /// Time-ordered event buffers keyed by window name.
    windows: Arc<RwLock<HashMap<String, VecDeque<(DateTime<Utc>, Event)>>>>,
}

impl StatefulOperator {
    pub fn new() -> Self {
        Self {
            state: Arc::new(RwLock::new(HashMap::new())),
            windows: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    /// Store a value under `key`, replacing any previous value.
    pub fn set_state(&self, key: String, value: JsonValue) {
        self.state.write().insert(key, value);
    }

    /// Return a clone of the value stored under `key`, if any.
    pub fn get_state(&self, key: &str) -> Option<JsonValue> {
        self.state.read().get(key).cloned()
    }

    /// Append an event to the named window.
    pub fn add_to_window(&self, window_key: &str, event: Event, timestamp: DateTime<Utc>) {
        let mut windows = self.windows.write();
        windows
            .entry(window_key.to_string())
            .or_insert_with(VecDeque::new)
            .push_back((timestamp, event));
    }

    /// Return clones of all events currently in the named window.
    pub fn get_window(&self, window_key: &str) -> Vec<Event> {
        self.windows
            .read()
            .get(window_key)
            .map(|w| w.iter().map(|(_, e)| e.clone()).collect())
            .unwrap_or_default()
    }

    /// Drop events with timestamps at or before `cutoff` from the named window.
    pub fn evict_window(&self, window_key: &str, cutoff: DateTime<Utc>) {
        if let Some(window) = self.windows.write().get_mut(window_key) {
            window.retain(|(ts, _)| *ts > cutoff);
        }
    }

    /// Clear all keyed state and all windows.
    pub fn clear(&self) {
        self.state.write().clear();
        self.windows.write().clear();
    }
}

impl Default for StatefulOperator {
    fn default() -> Self {
        Self::new()
    }
}

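/// A single configured pipeline: runs its operators in order over each
/// matching event and tracks processing statistics.
///
/// A minimal usage sketch (illustrative only; `config` and `event` are
/// built the same way as in the tests at the bottom of this file):
///
/// ```ignore
/// let pipeline = Pipeline::new(config);
/// if let Some(output) = pipeline.process(&event)? {
///     println!("pipeline output: {output}");
/// }
/// println!("processed so far: {}", pipeline.stats().events_processed);
/// ```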
pub struct Pipeline {
    config: PipelineConfig,
    state: StatefulOperator,
    stats: Arc<RwLock<PipelineStats>>,
}

impl Pipeline {
    pub fn new(config: PipelineConfig) -> Self {
        let stats = PipelineStats {
            pipeline_id: config.id,
            events_processed: 0,
            events_filtered: 0,
            events_failed: 0,
            last_processed: None,
        };

        Self {
            config,
            state: StatefulOperator::new(),
            stats: Arc::new(RwLock::new(stats)),
        }
    }

    /// Run an event through the pipeline.
    ///
    /// Returns `Ok(None)` if the pipeline is disabled, the event type does
    /// not match, or an operator filtered the event out.
    pub fn process(&self, event: &Event) -> Result<Option<JsonValue>> {
        if !self.config.source_event_types.is_empty()
            && !self
                .config
                .source_event_types
                .iter()
                .any(|t| t == event.event_type_str())
        {
            return Ok(None);
        }

        if !self.config.enabled {
            return Ok(None);
        }

        let mut current_value = event.payload.clone();
        let mut filtered = false;

        for operator in &self.config.operators {
            match self.apply_operator(operator, &current_value, event) {
                Ok(Some(result)) => {
                    current_value = result;
                }
                Ok(None) => {
                    filtered = true;
                    self.stats.write().events_filtered += 1;
                    break;
                }
                Err(e) => {
                    self.stats.write().events_failed += 1;
                    tracing::error!("Pipeline {} operator failed: {}", self.config.name, e);
                    return Err(e);
                }
            }
        }

        let mut stats = self.stats.write();
        stats.events_processed += 1;
        stats.last_processed = Some(Utc::now());

        if filtered {
            Ok(None)
        } else {
            Ok(Some(current_value))
        }
    }

    fn apply_operator(
        &self,
        operator: &PipelineOperator,
        value: &JsonValue,
        event: &Event,
    ) -> Result<Option<JsonValue>> {
        match operator {
            PipelineOperator::Filter {
                field,
                value: expected,
                op,
            } => self.apply_filter(field, expected, op, value),

            PipelineOperator::Map { field, transform } => self.apply_map(field, transform, value),

            PipelineOperator::Reduce {
                field,
                function,
                group_by,
            } => self.apply_reduce(field, function, group_by.as_deref(), value, event),

            PipelineOperator::Window {
                config,
                aggregation,
            } => self.apply_window(config, aggregation, event),

            PipelineOperator::Enrich { source, fields } => self.apply_enrich(source, fields, value),

            PipelineOperator::Branch { field, branches } => {
                self.apply_branch(field, branches, value)
            }
        }
    }

    fn apply_filter(
        &self,
        field: &str,
        expected: &JsonValue,
        op: &str,
        value: &JsonValue,
    ) -> Result<Option<JsonValue>> {
        let field_value = self.get_field(value, field);

        let matches = match op {
            "eq" => field_value == Some(expected),
            "ne" => field_value != Some(expected),
            "gt" => {
                if let (Some(JsonValue::Number(a)), JsonValue::Number(b)) =
                    (field_value.as_ref(), expected)
                {
                    a.as_f64().unwrap_or(0.0) > b.as_f64().unwrap_or(0.0)
                } else {
                    false
                }
            }
            "lt" => {
                if let (Some(JsonValue::Number(a)), JsonValue::Number(b)) =
                    (field_value.as_ref(), expected)
                {
                    a.as_f64().unwrap_or(0.0) < b.as_f64().unwrap_or(0.0)
                } else {
                    false
                }
            }
            "contains" => {
                if let (Some(JsonValue::String(a)), JsonValue::String(b)) =
                    (field_value.as_ref(), expected)
                {
                    a.contains(b)
                } else {
                    false
                }
            }
            _ => {
                return Err(AllSourceError::ValidationError(format!(
                    "Unknown filter operator: {}",
                    op
                )));
            }
        };

        if matches {
            Ok(Some(value.clone()))
        } else {
            Ok(None)
        }
    }

    fn apply_map(
        &self,
        field: &str,
        transform: &str,
        value: &JsonValue,
    ) -> Result<Option<JsonValue>> {
        let mut result = value.clone();

        let field_value = self.get_field(value, field);

        let transformed = match transform {
            "uppercase" => field_value
                .and_then(|v| v.as_str())
                .map(|s| JsonValue::String(s.to_uppercase())),
            "lowercase" => field_value
                .and_then(|v| v.as_str())
                .map(|s| JsonValue::String(s.to_lowercase())),
            "trim" => field_value
                .and_then(|v| v.as_str())
                .map(|s| JsonValue::String(s.trim().to_string())),
            _ => {
                // `from_f64` returns `None` for NaN or infinite results, in
                // which case the field is left unchanged instead of panicking.
                if let Some(stripped) = transform.strip_prefix("multiply:") {
                    if let Ok(multiplier) = stripped.parse::<f64>() {
                        field_value.and_then(|v| v.as_f64()).and_then(|n| {
                            serde_json::Number::from_f64(n * multiplier).map(JsonValue::Number)
                        })
                    } else {
                        None
                    }
                } else if let Some(stripped) = transform.strip_prefix("add:") {
                    if let Ok(addend) = stripped.parse::<f64>() {
                        field_value.and_then(|v| v.as_f64()).and_then(|n| {
                            serde_json::Number::from_f64(n + addend).map(JsonValue::Number)
                        })
                    } else {
                        None
                    }
                } else {
                    None
                }
            }
        };

        if let Some(new_value) = transformed {
            self.set_field(&mut result, field, new_value);
        }

        Ok(Some(result))
    }

    fn apply_reduce(
        &self,
        field: &str,
        function: &str,
        group_by: Option<&str>,
        value: &JsonValue,
        _event: &Event,
    ) -> Result<Option<JsonValue>> {
        let group_key = if let Some(group_field) = group_by {
            self.get_field(value, group_field)
                .and_then(|v| v.as_str())
                .unwrap_or("default")
                .to_string()
        } else {
            "default".to_string()
        };

        let state_key = format!("reduce_{}_{}", function, group_key);

        let current = self.state.get_state(&state_key);

        let field_value = self.get_field(value, field);

        let new_value = match function {
            "count" => {
                let count = current.and_then(|v| v.as_u64()).unwrap_or(0) + 1;
                JsonValue::Number(count.into())
            }
            "sum" => {
                let current_sum = current.and_then(|v| v.as_f64()).unwrap_or(0.0);
                let value_to_add = field_value.and_then(|v| v.as_f64()).unwrap_or(0.0);
                JsonValue::Number(serde_json::Number::from_f64(current_sum + value_to_add).unwrap())
            }
            "avg" => {
                let sum_key = format!("{}_sum", state_key);
                let count_key = format!("{}_count", state_key);

                let current_sum = self
                    .state
                    .get_state(&sum_key)
                    .and_then(|v| v.as_f64())
                    .unwrap_or(0.0);
                let current_count = self
                    .state
                    .get_state(&count_key)
                    .and_then(|v| v.as_u64())
                    .unwrap_or(0);

                let value_to_add = field_value.and_then(|v| v.as_f64()).unwrap_or(0.0);

                let new_sum = current_sum + value_to_add;
                let new_count = current_count + 1;

                self.state.set_state(
                    sum_key,
                    JsonValue::Number(serde_json::Number::from_f64(new_sum).unwrap()),
                );
                self.state
                    .set_state(count_key, JsonValue::Number(new_count.into()));

                let avg = new_sum / new_count as f64;
                JsonValue::Number(serde_json::Number::from_f64(avg).unwrap())
            }
            "min" => {
                let current_min = current.and_then(|v| v.as_f64());
                let new_val = field_value.and_then(|v| v.as_f64());

                match (current_min, new_val) {
                    (Some(curr), Some(new)) => {
                        JsonValue::Number(serde_json::Number::from_f64(curr.min(new)).unwrap())
                    }
                    (None, Some(new)) => {
                        JsonValue::Number(serde_json::Number::from_f64(new).unwrap())
                    }
                    (Some(curr), None) => {
                        JsonValue::Number(serde_json::Number::from_f64(curr).unwrap())
                    }
                    (None, None) => JsonValue::Null,
                }
            }
            "max" => {
                let current_max = current.and_then(|v| v.as_f64());
                let new_val = field_value.and_then(|v| v.as_f64());

                match (current_max, new_val) {
                    (Some(curr), Some(new)) => {
                        JsonValue::Number(serde_json::Number::from_f64(curr.max(new)).unwrap())
                    }
                    (None, Some(new)) => {
                        JsonValue::Number(serde_json::Number::from_f64(new).unwrap())
                    }
                    (Some(curr), None) => {
                        JsonValue::Number(serde_json::Number::from_f64(curr).unwrap())
                    }
                    (None, None) => JsonValue::Null,
                }
            }
            _ => {
                return Err(AllSourceError::ValidationError(format!(
                    "Unknown reduce function: {}",
                    function
                )));
            }
        };

        self.state.set_state(state_key.clone(), new_value.clone());

        let result = serde_json::json!({
            "group": group_key,
            "function": function,
            "value": new_value
        });

        Ok(Some(result))
    }

    fn apply_window(
        &self,
        config: &WindowConfig,
        aggregation: &PipelineOperator,
        event: &Event,
    ) -> Result<Option<JsonValue>> {
        let window_key = format!("window_{}", self.config.id);
        let now = Utc::now();

        self.state
            .add_to_window(&window_key, event.clone(), event.timestamp);

        // Determine how far back the window reaches, then evict anything older.
        let cutoff = match config.window_type {
            WindowType::Tumbling => now - Duration::seconds(config.size_seconds),
            WindowType::Sliding => {
                let slide = config.slide_seconds.unwrap_or(config.size_seconds);
                now - Duration::seconds(slide)
            }
            WindowType::Session => {
                let timeout = config.session_timeout_seconds.unwrap_or(300);
                now - Duration::seconds(timeout)
            }
        };

        self.state.evict_window(&window_key, cutoff);

        let window_events = self.state.get_window(&window_key);

        // Re-apply the aggregation operator over the buffered events and keep
        // the last successful result.
        let mut aggregate_value = JsonValue::Null;
        for window_event in &window_events {
            if let Ok(Some(result)) =
                self.apply_operator(aggregation, &window_event.payload, window_event)
            {
                aggregate_value = result;
            }
        }

        Ok(Some(serde_json::json!({
            "window_type": config.window_type,
            "window_size_seconds": config.size_seconds,
            "events_in_window": window_events.len(),
            "aggregation": aggregate_value
        })))
    }

    fn apply_enrich(
        &self,
        _source: &str,
        fields: &[String],
        value: &JsonValue,
    ) -> Result<Option<JsonValue>> {
        // Placeholder enrichment: a real implementation would look the
        // requested fields up in the external `source`.
        let mut result = value.clone();

        for field in fields {
            let enriched_value = JsonValue::String(format!("enriched_{}", field));
            self.set_field(&mut result, field, enriched_value);
        }

        Ok(Some(result))
    }

    fn apply_branch(
        &self,
        field: &str,
        branches: &HashMap<String, String>,
        value: &JsonValue,
    ) -> Result<Option<JsonValue>> {
        let field_value = self.get_field(value, field);

        if let Some(JsonValue::String(val)) = field_value {
            if let Some(route) = branches.get(val) {
                let mut result = value.clone();
                if let JsonValue::Object(ref mut obj) = result {
                    obj.insert("_route".to_string(), JsonValue::String(route.clone()));
                }
                return Ok(Some(result));
            }
        }

        Ok(Some(value.clone()))
    }

    /// Resolve a dot-separated path (e.g. `"user.name"`) within a JSON value.
    fn get_field<'a>(&self, value: &'a JsonValue, field: &str) -> Option<&'a JsonValue> {
        let parts: Vec<&str> = field.split('.').collect();
        let mut current = value;

        for part in parts {
            current = current.get(part)?;
        }

        Some(current)
    }

    /// Set a dot-separated path within a JSON object, creating intermediate
    /// objects as needed.
    fn set_field(&self, value: &mut JsonValue, field: &str, new_value: JsonValue) {
        let parts: Vec<&str> = field.split('.').collect();

        if parts.len() == 1 {
            if let JsonValue::Object(ref mut obj) = value {
                obj.insert(field.to_string(), new_value);
            }
            return;
        }

        let mut current = value;
        for part in &parts[..parts.len() - 1] {
            if let JsonValue::Object(ref mut obj) = current {
                current = obj
                    .entry(part.to_string())
                    .or_insert(JsonValue::Object(Default::default()));
            }
        }

        if let JsonValue::Object(ref mut obj) = current {
            obj.insert(parts.last().unwrap().to_string(), new_value);
        }
    }

    /// Return a snapshot of this pipeline's statistics.
    pub fn stats(&self) -> PipelineStats {
        self.stats.read().clone()
    }

    pub fn config(&self) -> &PipelineConfig {
        &self.config
    }

    /// Clear all operator state and reset statistics to zero.
    pub fn reset(&self) {
        self.state.clear();
        let mut stats = self.stats.write();
        stats.events_processed = 0;
        stats.events_filtered = 0;
        stats.events_failed = 0;
        stats.last_processed = None;
    }
}

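/// Owns all registered pipelines and fans each incoming event out to them,
/// recording metrics along the way.
///
/// A minimal usage sketch (illustrative only; `config` and `event` are
/// built the same way as in the tests at the bottom of this file):
///
/// ```ignore
/// let manager = PipelineManager::new();
/// let id = manager.register(config);
/// for (pipeline_id, output) in manager.process_event(&event) {
///     tracing::info!("pipeline {} produced {}", pipeline_id, output);
/// }
/// assert!(manager.remove(id));
/// ```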
pub struct PipelineManager {
    pipelines: Arc<RwLock<HashMap<Uuid, Arc<Pipeline>>>>,
    metrics: Arc<MetricsRegistry>,
}

impl PipelineManager {
    pub fn new() -> Self {
        Self::with_metrics(MetricsRegistry::new())
    }

    pub fn with_metrics(metrics: Arc<MetricsRegistry>) -> Self {
        Self {
            pipelines: Arc::new(RwLock::new(HashMap::new())),
            metrics,
        }
    }

    /// Register a pipeline and return its id.
    pub fn register(&self, config: PipelineConfig) -> Uuid {
        let id = config.id;
        let name = config.name.clone();
        let pipeline = Arc::new(Pipeline::new(config));
        self.pipelines.write().insert(id, pipeline);

        let count = self.pipelines.read().len();
        self.metrics.pipelines_registered_total.set(count as i64);

        tracing::info!("📊 Registered pipeline: {} ({})", name, id);
        id
    }

    pub fn get(&self, id: Uuid) -> Option<Arc<Pipeline>> {
        self.pipelines.read().get(&id).cloned()
    }

    /// Run an event through every registered pipeline and collect the
    /// outputs of those that produced one.
    pub fn process_event(&self, event: &Event) -> Vec<(Uuid, JsonValue)> {
        let timer = self.metrics.pipeline_duration_seconds.start_timer();

        let pipelines = self.pipelines.read();
        let mut results = Vec::new();

        for (id, pipeline) in pipelines.iter() {
            let pipeline_name = &pipeline.config().name;
            let pipeline_id = id.to_string();

            match pipeline.process(event) {
                Ok(Some(result)) => {
                    self.metrics
                        .pipeline_events_processed
                        .with_label_values(&[&pipeline_id, pipeline_name])
                        .inc();
                    results.push((*id, result));
                }
                Ok(None) => {
                    // Event was filtered out or did not match; nothing to emit.
                }
                Err(e) => {
                    self.metrics
                        .pipeline_errors_total
                        .with_label_values(&[pipeline_name])
                        .inc();
                    tracing::error!(
                        "Pipeline '{}' ({}) failed to process event: {}",
                        pipeline_name,
                        id,
                        e
                    );
                }
            }
        }

        timer.observe_duration();
        results
    }

    /// Return the configuration of every registered pipeline.
    pub fn list(&self) -> Vec<PipelineConfig> {
        self.pipelines
            .read()
            .values()
            .map(|p| p.config().clone())
            .collect()
    }

    /// Remove a pipeline; returns `true` if it existed.
    pub fn remove(&self, id: Uuid) -> bool {
        let removed = self.pipelines.write().remove(&id).is_some();

        if removed {
            let count = self.pipelines.read().len();
            self.metrics.pipelines_registered_total.set(count as i64);
        }

        removed
    }

    /// Collect statistics from every registered pipeline.
    pub fn all_stats(&self) -> Vec<PipelineStats> {
        self.pipelines.read().values().map(|p| p.stats()).collect()
    }
}

impl Default for PipelineManager {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    #[test]
    fn test_filter_operator() {
        let config = PipelineConfig {
            id: Uuid::new_v4(),
            name: "test_filter".to_string(),
            description: None,
            source_event_types: vec!["test".to_string()],
            operators: vec![PipelineOperator::Filter {
                field: "status".to_string(),
                value: json!("active"),
                op: "eq".to_string(),
            }],
            enabled: true,
            output: "test_output".to_string(),
        };

        let pipeline = Pipeline::new(config);
        let event = Event::from_strings(
            "test".to_string(),
            "entity1".to_string(),
            "default".to_string(),
            json!({"status": "active"}),
            None,
        )
        .unwrap();

        let result = pipeline.process(&event).unwrap();
        assert!(result.is_some());
    }
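
    // Added example (not in the original suite): an event that fails the
    // filter should yield `None` and be counted in `events_filtered`. It
    // reuses the same `Event::from_strings` constructor as the test above.
    #[test]
    fn test_filter_operator_rejects_non_matching() {
        let config = PipelineConfig {
            id: Uuid::new_v4(),
            name: "test_filter_reject".to_string(),
            description: None,
            source_event_types: vec!["test".to_string()],
            operators: vec![PipelineOperator::Filter {
                field: "status".to_string(),
                value: json!("active"),
                op: "eq".to_string(),
            }],
            enabled: true,
            output: "test_output".to_string(),
        };

        let pipeline = Pipeline::new(config);
        let event = Event::from_strings(
            "test".to_string(),
            "entity1".to_string(),
            "default".to_string(),
            json!({"status": "inactive"}),
            None,
        )
        .unwrap();

        let result = pipeline.process(&event).unwrap();
        assert!(result.is_none());
        assert_eq!(pipeline.stats().events_filtered, 1);
    }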

    #[test]
    fn test_map_operator() {
        let config = PipelineConfig {
            id: Uuid::new_v4(),
            name: "test_map".to_string(),
            description: None,
            source_event_types: vec!["test".to_string()],
            operators: vec![PipelineOperator::Map {
                field: "name".to_string(),
                transform: "uppercase".to_string(),
            }],
            enabled: true,
            output: "test_output".to_string(),
        };

        let pipeline = Pipeline::new(config);
        let event = Event::from_strings(
            "test".to_string(),
            "entity1".to_string(),
            "default".to_string(),
            json!({"name": "hello"}),
            None,
        )
        .unwrap();

        let result = pipeline.process(&event).unwrap().unwrap();
        assert_eq!(result["name"], "HELLO");
    }
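
    // Added example (not in the original suite): `Map` resolves dot-separated
    // paths through `get_field`/`set_field`, so a nested field can be
    // transformed in place. Uses the same `Event::from_strings` constructor
    // as the test above.
    #[test]
    fn test_map_operator_nested_field() {
        let config = PipelineConfig {
            id: Uuid::new_v4(),
            name: "test_map_nested".to_string(),
            description: None,
            source_event_types: vec!["test".to_string()],
            operators: vec![PipelineOperator::Map {
                field: "user.name".to_string(),
                transform: "uppercase".to_string(),
            }],
            enabled: true,
            output: "test_output".to_string(),
        };

        let pipeline = Pipeline::new(config);
        let event = Event::from_strings(
            "test".to_string(),
            "entity1".to_string(),
            "default".to_string(),
            json!({"user": {"name": "hello"}}),
            None,
        )
        .unwrap();

        let result = pipeline.process(&event).unwrap().unwrap();
        assert_eq!(result["user"]["name"], "HELLO");
    }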

    #[test]
    fn test_reduce_count() {
        let config = PipelineConfig {
            id: Uuid::new_v4(),
            name: "test_reduce".to_string(),
            description: None,
            source_event_types: vec!["test".to_string()],
            operators: vec![PipelineOperator::Reduce {
                field: "value".to_string(),
                function: "count".to_string(),
                group_by: None,
            }],
            enabled: true,
            output: "test_output".to_string(),
        };

        let pipeline = Pipeline::new(config);

        for i in 0..5 {
            let event = Event::from_strings(
                "test".to_string(),
                "entity1".to_string(),
                "default".to_string(),
                json!({"value": i}),
                None,
            )
            .unwrap();
            pipeline.process(&event).unwrap();
        }

        let result = pipeline
            .process(
                &Event::from_strings(
                    "test".to_string(),
                    "entity1".to_string(),
                    "default".to_string(),
                    json!({"value": 5}),
                    None,
                )
                .unwrap(),
            )
            .unwrap()
            .unwrap();

        assert_eq!(result["value"], 6);
    }
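
    // Added example (not in the original suite): `Branch` annotates matching
    // events with a `_route` field taken from the branch table.
    #[test]
    fn test_branch_operator_adds_route() {
        let mut branches = HashMap::new();
        branches.insert("premium".to_string(), "priority_queue".to_string());

        let config = PipelineConfig {
            id: Uuid::new_v4(),
            name: "test_branch".to_string(),
            description: None,
            source_event_types: vec!["test".to_string()],
            operators: vec![PipelineOperator::Branch {
                field: "tier".to_string(),
                branches,
            }],
            enabled: true,
            output: "test_output".to_string(),
        };

        let pipeline = Pipeline::new(config);
        let event = Event::from_strings(
            "test".to_string(),
            "entity1".to_string(),
            "default".to_string(),
            json!({"tier": "premium"}),
            None,
        )
        .unwrap();

        let result = pipeline.process(&event).unwrap().unwrap();
        assert_eq!(result["_route"], "priority_queue");
    }

    // Added example (not in the original suite): a tumbling window wrapping a
    // `count` reduce reports how many events are currently buffered. This
    // assumes `Event::from_strings` stamps events with the current time, as
    // the eviction logic in `apply_window` expects.
    #[test]
    fn test_window_operator_reports_buffered_events() {
        let config = PipelineConfig {
            id: Uuid::new_v4(),
            name: "test_window".to_string(),
            description: None,
            source_event_types: vec!["test".to_string()],
            operators: vec![PipelineOperator::Window {
                config: WindowConfig {
                    window_type: WindowType::Tumbling,
                    size_seconds: 60,
                    slide_seconds: None,
                    session_timeout_seconds: None,
                },
                aggregation: Box::new(PipelineOperator::Reduce {
                    field: "value".to_string(),
                    function: "count".to_string(),
                    group_by: None,
                }),
            }],
            enabled: true,
            output: "test_output".to_string(),
        };

        let pipeline = Pipeline::new(config);

        let mut last = JsonValue::Null;
        for i in 0..2 {
            let event = Event::from_strings(
                "test".to_string(),
                "entity1".to_string(),
                "default".to_string(),
                json!({"value": i}),
                None,
            )
            .unwrap();
            last = pipeline.process(&event).unwrap().unwrap();
        }

        assert_eq!(last["window_type"], "tumbling");
        assert_eq!(last["events_in_window"], 2);
    }

    // Added example (not in the original suite): the serde attributes on
    // `PipelineOperator` imply an internally tagged, lowercase wire format.
    #[test]
    fn test_operator_json_shape() {
        let op: PipelineOperator = serde_json::from_value(json!({
            "type": "filter",
            "field": "status",
            "value": "active",
            "op": "eq"
        }))
        .unwrap();
        assert!(matches!(op, PipelineOperator::Filter { .. }));
    }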
}