1use crate::error::{AllSourceError, Result};
2use crate::domain::entities::Event;
3use crate::metrics::MetricsRegistry;
4use chrono::{DateTime, Duration, Utc};
5use parking_lot::RwLock;
6use serde::{Deserialize, Serialize};
7use serde_json::Value as JsonValue;
8use std::collections::{HashMap, VecDeque};
9use std::sync::Arc;
10use uuid::Uuid;
11
/// Windowing strategy applied by a `PipelineOperator::Window` operator.
///
/// Serialized in lowercase (`"tumbling"`, `"sliding"`, `"session"`).
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum WindowType {
    /// Buffered events are retained for `WindowConfig::size_seconds`.
    Tumbling,
    /// Buffered events are retained for `WindowConfig::slide_seconds`,
    /// falling back to `size_seconds` when no slide is configured.
    Sliding,
    /// Buffered events are retained for `WindowConfig::session_timeout_seconds`,
    /// falling back to 300 seconds when no timeout is configured.
    Session,
}
23
/// Parameters of a window operator.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WindowConfig {
    /// Which windowing strategy to apply.
    pub window_type: WindowType,

    /// Window size in seconds. Used as the retention horizon for tumbling
    /// windows and as the fallback for sliding windows without a slide.
    pub size_seconds: i64,

    /// Retention horizon in seconds for sliding windows; `size_seconds` is
    /// used when this is `None`.
    pub slide_seconds: Option<i64>,

    /// Retention horizon in seconds for session windows; 300 is used when
    /// this is `None`.
    pub session_timeout_seconds: Option<i64>,
}
39
/// A single processing step of a pipeline.
///
/// Serialized as an internally tagged object whose `type` key holds the
/// lowercase variant name (for example `{"type": "filter", ...}`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "lowercase")]
pub enum PipelineOperator {
    /// Passes the value through only when `field` compares successfully
    /// against `value`.
    Filter {
        /// Dot-separated path of the field to inspect.
        field: String,
        /// Value the field is compared against.
        value: JsonValue,
        /// Comparison name: `eq`, `ne`, `gt`, `lt` or `contains`.
        op: String,
    },

    /// Rewrites `field` in place.
    Map {
        /// Dot-separated path of the field to transform.
        field: String,
        /// Transform name: `uppercase`, `lowercase`, `trim`,
        /// `multiply:<number>` or `add:<number>`.
        transform: String,
    },

    /// Folds `field` into a running aggregate kept in pipeline state.
    Reduce {
        /// Dot-separated path of the field to aggregate.
        field: String,
        /// Aggregate name: `count`, `sum`, `avg`, `min` or `max`.
        function: String,
        /// Optional path of a string field used to keep one aggregate per
        /// distinct value; a single `"default"` group is used otherwise.
        group_by: Option<String>,
    },

    /// Buffers events in a time window and applies `aggregation` to the
    /// buffered events.
    Window {
        /// Window type and sizing.
        config: WindowConfig,
        /// Operator evaluated over the events currently in the window.
        aggregation: Box<PipelineOperator>,
    },

    /// Adds the listed fields to the value.
    Enrich {
        /// Name of the enrichment source. The enrichment step in this file
        /// does not read it.
        source: String,
        /// Dot-separated paths of the fields to populate.
        fields: Vec<String>,
    },

    /// Tags the value with a route chosen from the string value of `field`.
    Branch {
        /// Dot-separated path of the field whose string value selects a route.
        field: String,
        /// Map from field value to route name; the chosen route is written
        /// to the `_route` key of the output object.
        branches: HashMap<String, String>,
    },
}
96
/// Declarative description of a pipeline.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PipelineConfig {
    /// Unique identifier; also the key under which the manager stores the pipeline.
    pub id: Uuid,

    /// Human-readable name, used in log messages and metric labels.
    pub name: String,

    /// Optional free-form description.
    pub description: Option<String>,

    /// Event types this pipeline accepts; an empty list accepts every type.
    pub source_event_types: Vec<String>,

    /// Operators applied in order to each accepted event's payload.
    pub operators: Vec<PipelineOperator>,

    /// When `false`, the pipeline ignores every event.
    pub enabled: bool,

    /// Output name. Not read by the code in this file — presumably consumed
    /// by whatever routes pipeline results; TODO confirm.
    pub output: String,
}
121
/// Running counters describing a pipeline's activity.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PipelineStats {
    /// Identifier of the pipeline these counters belong to.
    pub pipeline_id: Uuid,
    /// Events that ran through the operator chain without an operator error
    /// (events dropped by a filter are included).
    pub events_processed: u64,
    /// Events dropped because an operator produced no value.
    pub events_filtered: u64,
    /// Events for which an operator returned an error.
    pub events_failed: u64,
    /// Time at which `events_processed` was last incremented, if ever.
    pub last_processed: Option<DateTime<Utc>>,
}
131
/// Shared mutable state used by stateful operators: a keyed JSON store and
/// keyed buffers of timestamped events.
pub struct StatefulOperator {
    /// Key/value store for operator state such as running aggregates.
    state: Arc<RwLock<HashMap<String, JsonValue>>>,

    /// Per-key buffers of `(timestamp, event)` pairs in insertion order.
    windows: Arc<RwLock<HashMap<String, VecDeque<(DateTime<Utc>, Event)>>>>,
}
140
141impl StatefulOperator {
142 pub fn new() -> Self {
143 Self {
144 state: Arc::new(RwLock::new(HashMap::new())),
145 windows: Arc::new(RwLock::new(HashMap::new())),
146 }
147 }
148
149 pub fn set_state(&self, key: String, value: JsonValue) {
151 self.state.write().insert(key, value);
152 }
153
154 pub fn get_state(&self, key: &str) -> Option<JsonValue> {
156 self.state.read().get(key).cloned()
157 }
158
159 pub fn add_to_window(&self, window_key: &str, event: Event, timestamp: DateTime<Utc>) {
161 let mut windows = self.windows.write();
162 windows
163 .entry(window_key.to_string())
164 .or_insert_with(VecDeque::new)
165 .push_back((timestamp, event));
166 }
167
168 pub fn get_window(&self, window_key: &str) -> Vec<Event> {
170 self.windows
171 .read()
172 .get(window_key)
173 .map(|w| w.iter().map(|(_, e)| e.clone()).collect())
174 .unwrap_or_default()
175 }
176
177 pub fn evict_window(&self, window_key: &str, cutoff: DateTime<Utc>) {
179 if let Some(window) = self.windows.write().get_mut(window_key) {
180 window.retain(|(ts, _)| *ts > cutoff);
181 }
182 }
183
184 pub fn clear(&self) {
186 self.state.write().clear();
187 self.windows.write().clear();
188 }
189}
190
191impl Default for StatefulOperator {
192 fn default() -> Self {
193 Self::new()
194 }
195}
196
/// A configured pipeline together with its operator state and statistics.
pub struct Pipeline {
    /// Immutable description of the pipeline.
    config: PipelineConfig,
    /// State shared by the pipeline's stateful operators (aggregates, windows).
    state: StatefulOperator,
    /// Counters updated as events are processed.
    stats: Arc<RwLock<PipelineStats>>,
}
203
204impl Pipeline {
205 pub fn new(config: PipelineConfig) -> Self {
206 let stats = PipelineStats {
207 pipeline_id: config.id,
208 events_processed: 0,
209 events_filtered: 0,
210 events_failed: 0,
211 last_processed: None,
212 };
213
214 Self {
215 config,
216 state: StatefulOperator::new(),
217 stats: Arc::new(RwLock::new(stats)),
218 }
219 }
220
221 pub fn process(&self, event: &Event) -> Result<Option<JsonValue>> {
223 if !self.config.source_event_types.is_empty()
225 && !self.config.source_event_types.iter().any(|t| t == event.event_type_str())
226 {
227 return Ok(None);
228 }
229
230 if !self.config.enabled {
231 return Ok(None);
232 }
233
234 let mut current_value = event.payload.clone();
235 let mut filtered = false;
236
237 for operator in &self.config.operators {
239 match self.apply_operator(operator, ¤t_value, event) {
240 Ok(Some(result)) => {
241 current_value = result;
242 }
243 Ok(None) => {
244 filtered = true;
246 self.stats.write().events_filtered += 1;
247 break;
248 }
249 Err(e) => {
250 self.stats.write().events_failed += 1;
251 tracing::error!(
252 "Pipeline {} operator failed: {}",
253 self.config.name,
254 e
255 );
256 return Err(e);
257 }
258 }
259 }
260
261 let mut stats = self.stats.write();
263 stats.events_processed += 1;
264 stats.last_processed = Some(Utc::now());
265
266 if filtered {
267 Ok(None)
268 } else {
269 Ok(Some(current_value))
270 }
271 }
272
273 fn apply_operator(
275 &self,
276 operator: &PipelineOperator,
277 value: &JsonValue,
278 event: &Event,
279 ) -> Result<Option<JsonValue>> {
280 match operator {
281 PipelineOperator::Filter { field, value: expected, op } => {
282 self.apply_filter(field, expected, op, value)
283 }
284
285 PipelineOperator::Map { field, transform } => {
286 self.apply_map(field, transform, value)
287 }
288
289 PipelineOperator::Reduce { field, function, group_by } => {
290 self.apply_reduce(field, function, group_by.as_deref(), value, event)
291 }
292
293 PipelineOperator::Window { config, aggregation } => {
294 self.apply_window(config, aggregation, event)
295 }
296
297 PipelineOperator::Enrich { source, fields } => {
298 self.apply_enrich(source, fields, value)
299 }
300
301 PipelineOperator::Branch { field, branches } => {
302 self.apply_branch(field, branches, value)
303 }
304 }
305 }
306
307 fn apply_filter(
309 &self,
310 field: &str,
311 expected: &JsonValue,
312 op: &str,
313 value: &JsonValue,
314 ) -> Result<Option<JsonValue>> {
315 let field_value = self.get_field(value, field);
316
317 let matches = match op {
318 "eq" => field_value == Some(expected),
319 "ne" => field_value != Some(expected),
320 "gt" => {
321 if let (Some(JsonValue::Number(a)), JsonValue::Number(b)) = (field_value.as_ref(), expected) {
322 a.as_f64().unwrap_or(0.0) > b.as_f64().unwrap_or(0.0)
323 } else {
324 false
325 }
326 }
327 "lt" => {
328 if let (Some(JsonValue::Number(a)), JsonValue::Number(b)) = (field_value.as_ref(), expected) {
329 a.as_f64().unwrap_or(0.0) < b.as_f64().unwrap_or(0.0)
330 } else {
331 false
332 }
333 }
334 "contains" => {
335 if let (Some(JsonValue::String(a)), JsonValue::String(b)) = (field_value.as_ref(), expected) {
336 a.contains(b)
337 } else {
338 false
339 }
340 }
341 _ => {
342 return Err(AllSourceError::ValidationError(format!(
343 "Unknown filter operator: {}",
344 op
345 )));
346 }
347 };
348
349 if matches {
350 Ok(Some(value.clone()))
351 } else {
352 Ok(None) }
354 }
355
356 fn apply_map(
358 &self,
359 field: &str,
360 transform: &str,
361 value: &JsonValue,
362 ) -> Result<Option<JsonValue>> {
363 let mut result = value.clone();
364
365 let field_value = self.get_field(value, field);
367
368 let transformed = match transform {
369 "uppercase" => {
370 field_value
371 .and_then(|v| v.as_str())
372 .map(|s| JsonValue::String(s.to_uppercase()))
373 }
374 "lowercase" => {
375 field_value
376 .and_then(|v| v.as_str())
377 .map(|s| JsonValue::String(s.to_lowercase()))
378 }
379 "trim" => {
380 field_value
381 .and_then(|v| v.as_str())
382 .map(|s| JsonValue::String(s.trim().to_string()))
383 }
384 _ => {
385 if let Some(stripped) = transform.strip_prefix("multiply:") {
387 if let Ok(multiplier) = stripped.parse::<f64>() {
388 field_value
389 .and_then(|v| v.as_f64())
390 .map(|n| JsonValue::Number(
391 serde_json::Number::from_f64(n * multiplier).unwrap()
392 ))
393 } else {
394 None
395 }
396 } else if let Some(stripped) = transform.strip_prefix("add:") {
397 if let Ok(addend) = stripped.parse::<f64>() {
398 field_value
399 .and_then(|v| v.as_f64())
400 .map(|n| JsonValue::Number(
401 serde_json::Number::from_f64(n + addend).unwrap()
402 ))
403 } else {
404 None
405 }
406 } else {
407 None
408 }
409 }
410 };
411
412 if let Some(new_value) = transformed {
413 self.set_field(&mut result, field, new_value);
414 }
415
416 Ok(Some(result))
417 }
418
419 fn apply_reduce(
421 &self,
422 field: &str,
423 function: &str,
424 group_by: Option<&str>,
425 value: &JsonValue,
426 event: &Event,
427 ) -> Result<Option<JsonValue>> {
428 let group_key = if let Some(group_field) = group_by {
430 self.get_field(value, group_field)
431 .and_then(|v| v.as_str())
432 .unwrap_or("default")
433 .to_string()
434 } else {
435 "default".to_string()
436 };
437
438 let state_key = format!("reduce_{}_{}", function, group_key);
439
440 let current = self.state.get_state(&state_key);
442
443 let field_value = self.get_field(value, field);
445
446 let new_value = match function {
447 "count" => {
448 let count = current
449 .and_then(|v| v.as_u64())
450 .unwrap_or(0) + 1;
451 JsonValue::Number(count.into())
452 }
453 "sum" => {
454 let current_sum = current
455 .and_then(|v| v.as_f64())
456 .unwrap_or(0.0);
457 let value_to_add = field_value
458 .and_then(|v| v.as_f64())
459 .unwrap_or(0.0);
460 JsonValue::Number(
461 serde_json::Number::from_f64(current_sum + value_to_add).unwrap()
462 )
463 }
464 "avg" => {
465 let sum_key = format!("{}_sum", state_key);
467 let count_key = format!("{}_count", state_key);
468
469 let current_sum = self.state.get_state(&sum_key)
470 .and_then(|v| v.as_f64())
471 .unwrap_or(0.0);
472 let current_count = self.state.get_state(&count_key)
473 .and_then(|v| v.as_u64())
474 .unwrap_or(0);
475
476 let value_to_add = field_value
477 .and_then(|v| v.as_f64())
478 .unwrap_or(0.0);
479
480 let new_sum = current_sum + value_to_add;
481 let new_count = current_count + 1;
482
483 self.state.set_state(sum_key, JsonValue::Number(
484 serde_json::Number::from_f64(new_sum).unwrap()
485 ));
486 self.state.set_state(count_key, JsonValue::Number(new_count.into()));
487
488 let avg = new_sum / new_count as f64;
489 JsonValue::Number(serde_json::Number::from_f64(avg).unwrap())
490 }
491 "min" => {
492 let current_min = current.and_then(|v| v.as_f64());
493 let new_val = field_value.and_then(|v| v.as_f64());
494
495 match (current_min, new_val) {
496 (Some(curr), Some(new)) => JsonValue::Number(
497 serde_json::Number::from_f64(curr.min(new)).unwrap()
498 ),
499 (None, Some(new)) => JsonValue::Number(
500 serde_json::Number::from_f64(new).unwrap()
501 ),
502 (Some(curr), None) => JsonValue::Number(
503 serde_json::Number::from_f64(curr).unwrap()
504 ),
505 (None, None) => JsonValue::Null,
506 }
507 }
508 "max" => {
509 let current_max = current.and_then(|v| v.as_f64());
510 let new_val = field_value.and_then(|v| v.as_f64());
511
512 match (current_max, new_val) {
513 (Some(curr), Some(new)) => JsonValue::Number(
514 serde_json::Number::from_f64(curr.max(new)).unwrap()
515 ),
516 (None, Some(new)) => JsonValue::Number(
517 serde_json::Number::from_f64(new).unwrap()
518 ),
519 (Some(curr), None) => JsonValue::Number(
520 serde_json::Number::from_f64(curr).unwrap()
521 ),
522 (None, None) => JsonValue::Null,
523 }
524 }
525 _ => {
526 return Err(AllSourceError::ValidationError(format!(
527 "Unknown reduce function: {}",
528 function
529 )));
530 }
531 };
532
533 self.state.set_state(state_key.clone(), new_value.clone());
535
536 let result = serde_json::json!({
538 "group": group_key,
539 "function": function,
540 "value": new_value
541 });
542
543 Ok(Some(result))
544 }
545
546 fn apply_window(
548 &self,
549 config: &WindowConfig,
550 aggregation: &PipelineOperator,
551 event: &Event,
552 ) -> Result<Option<JsonValue>> {
553 let window_key = format!("window_{}", self.config.id);
554 let now = Utc::now();
555
556 self.state.add_to_window(&window_key, event.clone(), event.timestamp);
558
559 let cutoff = match config.window_type {
561 WindowType::Tumbling => now - Duration::seconds(config.size_seconds),
562 WindowType::Sliding => {
563 let slide = config.slide_seconds.unwrap_or(config.size_seconds);
564 now - Duration::seconds(slide)
565 }
566 WindowType::Session => {
567 let timeout = config.session_timeout_seconds.unwrap_or(300);
568 now - Duration::seconds(timeout)
569 }
570 };
571
572 self.state.evict_window(&window_key, cutoff);
573
574 let window_events = self.state.get_window(&window_key);
576
577 let mut aggregate_value = JsonValue::Null;
579 for window_event in &window_events {
580 if let Ok(Some(result)) = self.apply_operator(aggregation, &window_event.payload, window_event) {
581 aggregate_value = result;
582 }
583 }
584
585 Ok(Some(serde_json::json!({
586 "window_type": config.window_type,
587 "window_size_seconds": config.size_seconds,
588 "events_in_window": window_events.len(),
589 "aggregation": aggregate_value
590 })))
591 }
592
593 fn apply_enrich(
595 &self,
596 _source: &str,
597 fields: &[String],
598 value: &JsonValue,
599 ) -> Result<Option<JsonValue>> {
600 let mut result = value.clone();
603
604 for field in fields {
605 let enriched_value = JsonValue::String(format!("enriched_{}", field));
606 self.set_field(&mut result, field, enriched_value);
607 }
608
609 Ok(Some(result))
610 }
611
612 fn apply_branch(
614 &self,
615 field: &str,
616 branches: &HashMap<String, String>,
617 value: &JsonValue,
618 ) -> Result<Option<JsonValue>> {
619 let field_value = self.get_field(value, field);
620
621 if let Some(JsonValue::String(val)) = field_value {
622 if let Some(route) = branches.get(val) {
623 let mut result = value.clone();
624 if let JsonValue::Object(ref mut obj) = result {
625 obj.insert("_route".to_string(), JsonValue::String(route.clone()));
626 }
627 return Ok(Some(result));
628 }
629 }
630
631 Ok(Some(value.clone()))
632 }
633
634 fn get_field<'a>(&self, value: &'a JsonValue, field: &str) -> Option<&'a JsonValue> {
636 let parts: Vec<&str> = field.split('.').collect();
637 let mut current = value;
638
639 for part in parts {
640 current = current.get(part)?;
641 }
642
643 Some(current)
644 }
645
646 fn set_field(&self, value: &mut JsonValue, field: &str, new_value: JsonValue) {
648 let parts: Vec<&str> = field.split('.').collect();
649
650 if parts.len() == 1 {
651 if let JsonValue::Object(ref mut obj) = value {
652 obj.insert(field.to_string(), new_value);
653 }
654 return;
655 }
656
657 let mut current = value;
659 for part in &parts[..parts.len() - 1] {
660 if let JsonValue::Object(ref mut obj) = current {
661 current = obj.entry(part.to_string()).or_insert(JsonValue::Object(Default::default()));
662 }
663 }
664
665 if let JsonValue::Object(ref mut obj) = current {
667 obj.insert(parts.last().unwrap().to_string(), new_value);
668 }
669 }
670
671 pub fn stats(&self) -> PipelineStats {
673 self.stats.read().clone()
674 }
675
676 pub fn config(&self) -> &PipelineConfig {
678 &self.config
679 }
680
681 pub fn reset(&self) {
683 self.state.clear();
684 let mut stats = self.stats.write();
685 stats.events_processed = 0;
686 stats.events_filtered = 0;
687 stats.events_failed = 0;
688 stats.last_processed = None;
689 }
690}
691
/// Registry of pipelines that fans each event out to every registered pipeline.
pub struct PipelineManager {
    /// Registered pipelines keyed by their configuration id.
    pipelines: Arc<RwLock<HashMap<Uuid, Arc<Pipeline>>>>,
    /// Metrics updated on registration, removal and event processing.
    metrics: Arc<MetricsRegistry>,
}
697
698impl PipelineManager {
699 pub fn new() -> Self {
700 Self::with_metrics(MetricsRegistry::new())
701 }
702
703 pub fn with_metrics(metrics: Arc<MetricsRegistry>) -> Self {
704 Self {
705 pipelines: Arc::new(RwLock::new(HashMap::new())),
706 metrics,
707 }
708 }
709
710 pub fn register(&self, config: PipelineConfig) -> Uuid {
712 let id = config.id;
713 let name = config.name.clone();
714 let pipeline = Arc::new(Pipeline::new(config));
715 self.pipelines.write().insert(id, pipeline);
716
717 let count = self.pipelines.read().len();
718 self.metrics.pipelines_registered_total.set(count as i64);
719
720 tracing::info!("📊 Registered pipeline: {} ({})", name, id);
721 id
722 }
723
724 pub fn get(&self, id: Uuid) -> Option<Arc<Pipeline>> {
726 self.pipelines.read().get(&id).cloned()
727 }
728
729 pub fn process_event(&self, event: &Event) -> Vec<(Uuid, JsonValue)> {
731 let timer = self.metrics.pipeline_duration_seconds.start_timer();
732
733 let pipelines = self.pipelines.read();
734 let mut results = Vec::new();
735
736 for (id, pipeline) in pipelines.iter() {
737 let pipeline_name = &pipeline.config().name;
738 let pipeline_id = id.to_string();
739
740 match pipeline.process(event) {
741 Ok(Some(result)) => {
742 self.metrics.pipeline_events_processed
743 .with_label_values(&[&pipeline_id, pipeline_name])
744 .inc();
745 results.push((*id, result));
746 }
747 Ok(None) => {
748 }
750 Err(e) => {
751 self.metrics.pipeline_errors_total
752 .with_label_values(&[pipeline_name])
753 .inc();
754 tracing::error!(
755 "Pipeline '{}' ({}) failed to process event: {}",
756 pipeline_name,
757 id,
758 e
759 );
760 }
761 }
762 }
763
764 timer.observe_duration();
765 results
766 }
767
768 pub fn list(&self) -> Vec<PipelineConfig> {
770 self.pipelines
771 .read()
772 .values()
773 .map(|p| p.config().clone())
774 .collect()
775 }
776
777 pub fn remove(&self, id: Uuid) -> bool {
779 let removed = self.pipelines.write().remove(&id).is_some();
780
781 if removed {
782 let count = self.pipelines.read().len();
783 self.metrics.pipelines_registered_total.set(count as i64);
784 }
785
786 removed
787 }
788
789 pub fn all_stats(&self) -> Vec<PipelineStats> {
791 self.pipelines
792 .read()
793 .values()
794 .map(|p| p.stats())
795 .collect()
796 }
797}
798
799impl Default for PipelineManager {
800 fn default() -> Self {
801 Self::new()
802 }
803}
804
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Builds an enabled pipeline for `"test"` events with a single operator.
    fn single_operator_pipeline(name: &str, operator: PipelineOperator) -> Pipeline {
        Pipeline::new(PipelineConfig {
            id: Uuid::new_v4(),
            name: name.to_string(),
            description: None,
            source_event_types: vec!["test".to_string()],
            operators: vec![operator],
            enabled: true,
            output: "test_output".to_string(),
        })
    }

    /// Builds a `"test"` event carrying `payload`.
    fn test_event(payload: JsonValue) -> Event {
        Event::from_strings(
            "test".to_string(),
            "entity1".to_string(),
            "default".to_string(),
            payload,
            None,
        )
        .unwrap()
    }

    #[test]
    fn test_filter_operator() {
        let pipeline = single_operator_pipeline(
            "test_filter",
            PipelineOperator::Filter {
                field: "status".to_string(),
                value: json!("active"),
                op: "eq".to_string(),
            },
        );

        let outcome = pipeline
            .process(&test_event(json!({"status": "active"})))
            .unwrap();

        assert!(outcome.is_some());
    }

    #[test]
    fn test_map_operator() {
        let pipeline = single_operator_pipeline(
            "test_map",
            PipelineOperator::Map {
                field: "name".to_string(),
                transform: "uppercase".to_string(),
            },
        );

        let mapped = pipeline
            .process(&test_event(json!({"name": "hello"})))
            .unwrap()
            .unwrap();

        assert_eq!(mapped["name"], "HELLO");
    }

    #[test]
    fn test_reduce_count() {
        let pipeline = single_operator_pipeline(
            "test_reduce",
            PipelineOperator::Reduce {
                field: "value".to_string(),
                function: "count".to_string(),
                group_by: None,
            },
        );

        // Six events in total; only the result of the last one is inspected.
        let last = (0..6)
            .map(|i| pipeline.process(&test_event(json!({"value": i}))).unwrap())
            .last()
            .unwrap()
            .unwrap();

        assert_eq!(last["value"], 6);
    }
}