Skip to main content

allsource_core/application/services/
pipeline.rs

1use crate::domain::entities::Event;
2use crate::error::{AllSourceError, Result};
3use crate::infrastructure::observability::metrics::MetricsRegistry;
4use chrono::{DateTime, Duration, Utc};
5use dashmap::DashMap;
6use parking_lot::RwLock;
7use serde::{Deserialize, Serialize};
8use serde_json::Value as JsonValue;
9use std::collections::{HashMap, VecDeque};
10use std::sync::Arc;
11use uuid::Uuid;
12
13/// Window type for time-based aggregations
14#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
15#[serde(rename_all = "lowercase")]
16pub enum WindowType {
17    /// Tumbling window (non-overlapping)
18    Tumbling,
19    /// Sliding window (overlapping)
20    Sliding,
21    /// Session window (activity-based)
22    Session,
23}
24
25/// Window configuration
26#[derive(Debug, Clone, Serialize, Deserialize)]
27pub struct WindowConfig {
28    /// Type of window
29    pub window_type: WindowType,
30
31    /// Window size in seconds
32    pub size_seconds: i64,
33
34    /// Slide interval in seconds (for sliding windows)
35    pub slide_seconds: Option<i64>,
36
37    /// Session timeout in seconds (for session windows)
38    pub session_timeout_seconds: Option<i64>,
39}
40
41/// Pipeline operator types
42#[derive(Debug, Clone, Serialize, Deserialize)]
43#[serde(tag = "type", rename_all = "lowercase")]
44pub enum PipelineOperator {
45    /// Filter events based on a condition
46    Filter {
47        /// Field path to check (e.g., "payload.status")
48        field: String,
49        /// Expected value
50        value: JsonValue,
51        /// Operator: eq, ne, gt, lt, contains
52        op: String,
53    },
54
55    /// Transform event payload
56    Map {
57        /// Field to transform
58        field: String,
59        /// Transformation expression (simple for now)
60        transform: String,
61    },
62
63    /// Aggregate events
64    Reduce {
65        /// Field to aggregate
66        field: String,
67        /// Aggregation function: sum, count, avg, min, max
68        function: String,
69        /// Group by field (optional)
70        group_by: Option<String>,
71    },
72
73    /// Window-based aggregation
74    Window {
75        /// Window configuration
76        config: WindowConfig,
77        /// Aggregation to apply within window
78        aggregation: Box<PipelineOperator>,
79    },
80
81    /// Enrich event with external data
82    Enrich {
83        /// Source to enrich from
84        source: String,
85        /// Fields to add
86        fields: Vec<String>,
87    },
88
89    /// Split stream based on condition
90    Branch {
91        /// Condition field
92        field: String,
93        /// Branch mapping
94        branches: HashMap<String, String>,
95    },
96}
97
98/// Pipeline configuration
99#[derive(Debug, Clone, Serialize, Deserialize)]
100pub struct PipelineConfig {
101    /// Pipeline ID
102    pub id: Uuid,
103
104    /// Pipeline name
105    pub name: String,
106
107    /// Description
108    pub description: Option<String>,
109
110    /// Source event types to process
111    pub source_event_types: Vec<String>,
112
113    /// Pipeline operators in order
114    pub operators: Vec<PipelineOperator>,
115
116    /// Whether pipeline is enabled
117    pub enabled: bool,
118
119    /// Output destination (projection name or topic)
120    pub output: String,
121}
122
123/// Pipeline execution statistics
124#[derive(Debug, Clone, Serialize, Deserialize)]
125pub struct PipelineStats {
126    pub pipeline_id: Uuid,
127    pub events_processed: u64,
128    pub events_filtered: u64,
129    pub events_failed: u64,
130    pub last_processed: Option<DateTime<Utc>>,
131}
132
133/// Stateful operator for maintaining state across events
134pub struct StatefulOperator {
135    /// Operator state storage
136    state: Arc<RwLock<HashMap<String, JsonValue>>>,
137
138    /// Window buffers for time-based operations
139    windows: Arc<RwLock<HashMap<String, VecDeque<(DateTime<Utc>, Event)>>>>,
140}
141
142impl StatefulOperator {
143    pub fn new() -> Self {
144        Self {
145            state: Arc::new(RwLock::new(HashMap::new())),
146            windows: Arc::new(RwLock::new(HashMap::new())),
147        }
148    }
149
150    /// Store state value
151    pub fn set_state(&self, key: String, value: JsonValue) {
152        self.state.write().insert(key, value);
153    }
154
155    /// Get state value
156    pub fn get_state(&self, key: &str) -> Option<JsonValue> {
157        self.state.read().get(key).cloned()
158    }
159
160    /// Add event to window
161    pub fn add_to_window(&self, window_key: &str, event: Event, timestamp: DateTime<Utc>) {
162        let mut windows = self.windows.write();
163        windows
164            .entry(window_key.to_string())
165            .or_default()
166            .push_back((timestamp, event));
167    }
168
169    /// Get events in window
170    pub fn get_window(&self, window_key: &str) -> Vec<Event> {
171        self.windows
172            .read()
173            .get(window_key)
174            .map(|w| w.iter().map(|(_, e)| e.clone()).collect())
175            .unwrap_or_default()
176    }
177
178    /// Evict expired events from window
179    pub fn evict_window(&self, window_key: &str, cutoff: DateTime<Utc>) {
180        if let Some(window) = self.windows.write().get_mut(window_key) {
181            window.retain(|(ts, _)| *ts > cutoff);
182        }
183    }
184
185    /// Clear all state
186    pub fn clear(&self) {
187        self.state.write().clear();
188        self.windows.write().clear();
189    }
190}
191
192impl Default for StatefulOperator {
193    fn default() -> Self {
194        Self::new()
195    }
196}
197
198/// Pipeline execution engine
199pub struct Pipeline {
200    config: PipelineConfig,
201    state: StatefulOperator,
202    stats: Arc<RwLock<PipelineStats>>,
203}
204
205impl Pipeline {
206    pub fn new(config: PipelineConfig) -> Self {
207        let stats = PipelineStats {
208            pipeline_id: config.id,
209            events_processed: 0,
210            events_filtered: 0,
211            events_failed: 0,
212            last_processed: None,
213        };
214
215        Self {
216            config,
217            state: StatefulOperator::new(),
218            stats: Arc::new(RwLock::new(stats)),
219        }
220    }
221
222    /// Process an event through the pipeline
223    pub fn process(&self, event: &Event) -> Result<Option<JsonValue>> {
224        // Check if event type matches source filter
225        if !self.config.source_event_types.is_empty()
226            && !self
227                .config
228                .source_event_types
229                .iter()
230                .any(|t| t == event.event_type_str())
231        {
232            return Ok(None);
233        }
234
235        if !self.config.enabled {
236            return Ok(None);
237        }
238
239        let mut current_value = event.payload.clone();
240        let mut filtered = false;
241
242        // Apply operators in sequence
243        for operator in &self.config.operators {
244            match self.apply_operator(operator, &current_value, event) {
245                Ok(Some(result)) => {
246                    current_value = result;
247                }
248                Ok(None) => {
249                    // Event was filtered out
250                    filtered = true;
251                    self.stats.write().events_filtered += 1;
252                    break;
253                }
254                Err(e) => {
255                    self.stats.write().events_failed += 1;
256                    tracing::error!("Pipeline {} operator failed: {}", self.config.name, e);
257                    return Err(e);
258                }
259            }
260        }
261
262        // Update stats
263        let mut stats = self.stats.write();
264        stats.events_processed += 1;
265        stats.last_processed = Some(Utc::now());
266
267        if filtered {
268            Ok(None)
269        } else {
270            Ok(Some(current_value))
271        }
272    }
273
274    /// Apply a single operator
275    fn apply_operator(
276        &self,
277        operator: &PipelineOperator,
278        value: &JsonValue,
279        event: &Event,
280    ) -> Result<Option<JsonValue>> {
281        match operator {
282            PipelineOperator::Filter {
283                field,
284                value: expected,
285                op,
286            } => self.apply_filter(field, expected, op, value),
287
288            PipelineOperator::Map { field, transform } => self.apply_map(field, transform, value),
289
290            PipelineOperator::Reduce {
291                field,
292                function,
293                group_by,
294            } => self.apply_reduce(field, function, group_by.as_deref(), value, event),
295
296            PipelineOperator::Window {
297                config,
298                aggregation,
299            } => self.apply_window(config, aggregation, event),
300
301            PipelineOperator::Enrich { source, fields } => self.apply_enrich(source, fields, value),
302
303            PipelineOperator::Branch { field, branches } => {
304                self.apply_branch(field, branches, value)
305            }
306        }
307    }
308
309    /// Apply filter operator
310    fn apply_filter(
311        &self,
312        field: &str,
313        expected: &JsonValue,
314        op: &str,
315        value: &JsonValue,
316    ) -> Result<Option<JsonValue>> {
317        let field_value = self.get_field(value, field);
318
319        let matches = match op {
320            "eq" => field_value == Some(expected),
321            "ne" => field_value != Some(expected),
322            "gt" => {
323                if let (Some(JsonValue::Number(a)), JsonValue::Number(b)) =
324                    (field_value.as_ref(), expected)
325                {
326                    a.as_f64().unwrap_or(0.0) > b.as_f64().unwrap_or(0.0)
327                } else {
328                    false
329                }
330            }
331            "lt" => {
332                if let (Some(JsonValue::Number(a)), JsonValue::Number(b)) =
333                    (field_value.as_ref(), expected)
334                {
335                    a.as_f64().unwrap_or(0.0) < b.as_f64().unwrap_or(0.0)
336                } else {
337                    false
338                }
339            }
340            "contains" => {
341                if let (Some(JsonValue::String(a)), JsonValue::String(b)) =
342                    (field_value.as_ref(), expected)
343                {
344                    a.contains(b)
345                } else {
346                    false
347                }
348            }
349            _ => {
350                return Err(AllSourceError::ValidationError(format!(
351                    "Unknown filter operator: {}",
352                    op
353                )));
354            }
355        };
356
357        if matches {
358            Ok(Some(value.clone()))
359        } else {
360            Ok(None) // Filtered out
361        }
362    }
363
364    /// Apply map transformation
365    fn apply_map(
366        &self,
367        field: &str,
368        transform: &str,
369        value: &JsonValue,
370    ) -> Result<Option<JsonValue>> {
371        let mut result = value.clone();
372
373        // Simple transformations
374        let field_value = self.get_field(value, field);
375
376        let transformed = match transform {
377            "uppercase" => field_value
378                .and_then(|v| v.as_str())
379                .map(|s| JsonValue::String(s.to_uppercase())),
380            "lowercase" => field_value
381                .and_then(|v| v.as_str())
382                .map(|s| JsonValue::String(s.to_lowercase())),
383            "trim" => field_value
384                .and_then(|v| v.as_str())
385                .map(|s| JsonValue::String(s.trim().to_string())),
386            _ => {
387                // Try to parse as number operation
388                if let Some(stripped) = transform.strip_prefix("multiply:") {
389                    if let Ok(multiplier) = stripped.parse::<f64>() {
390                        field_value.and_then(|v| v.as_f64()).map(|n| {
391                            JsonValue::Number(serde_json::Number::from_f64(n * multiplier).unwrap())
392                        })
393                    } else {
394                        None
395                    }
396                } else if let Some(stripped) = transform.strip_prefix("add:") {
397                    if let Ok(addend) = stripped.parse::<f64>() {
398                        field_value.and_then(|v| v.as_f64()).map(|n| {
399                            JsonValue::Number(serde_json::Number::from_f64(n + addend).unwrap())
400                        })
401                    } else {
402                        None
403                    }
404                } else {
405                    None
406                }
407            }
408        };
409
410        if let Some(new_value) = transformed {
411            self.set_field(&mut result, field, new_value);
412        }
413
414        Ok(Some(result))
415    }
416
417    /// Apply reduce aggregation
418    fn apply_reduce(
419        &self,
420        field: &str,
421        function: &str,
422        group_by: Option<&str>,
423        value: &JsonValue,
424        event: &Event,
425    ) -> Result<Option<JsonValue>> {
426        // Get group key
427        let group_key = if let Some(group_field) = group_by {
428            self.get_field(value, group_field)
429                .and_then(|v| v.as_str())
430                .unwrap_or("default")
431                .to_string()
432        } else {
433            "default".to_string()
434        };
435
436        let state_key = format!("reduce_{}_{}", function, group_key);
437
438        // Get current aggregate value
439        let current = self.state.get_state(&state_key);
440
441        // Get field value to aggregate
442        let field_value = self.get_field(value, field);
443
444        let new_value = match function {
445            "count" => {
446                let count = current.and_then(|v| v.as_u64()).unwrap_or(0) + 1;
447                JsonValue::Number(count.into())
448            }
449            "sum" => {
450                let current_sum = current.and_then(|v| v.as_f64()).unwrap_or(0.0);
451                let value_to_add = field_value.and_then(|v| v.as_f64()).unwrap_or(0.0);
452                JsonValue::Number(serde_json::Number::from_f64(current_sum + value_to_add).unwrap())
453            }
454            "avg" => {
455                // Store sum and count separately
456                let sum_key = format!("{}_sum", state_key);
457                let count_key = format!("{}_count", state_key);
458
459                let current_sum = self
460                    .state
461                    .get_state(&sum_key)
462                    .and_then(|v| v.as_f64())
463                    .unwrap_or(0.0);
464                let current_count = self
465                    .state
466                    .get_state(&count_key)
467                    .and_then(|v| v.as_u64())
468                    .unwrap_or(0);
469
470                let value_to_add = field_value.and_then(|v| v.as_f64()).unwrap_or(0.0);
471
472                let new_sum = current_sum + value_to_add;
473                let new_count = current_count + 1;
474
475                self.state.set_state(
476                    sum_key,
477                    JsonValue::Number(serde_json::Number::from_f64(new_sum).unwrap()),
478                );
479                self.state
480                    .set_state(count_key, JsonValue::Number(new_count.into()));
481
482                let avg = new_sum / new_count as f64;
483                JsonValue::Number(serde_json::Number::from_f64(avg).unwrap())
484            }
485            "min" => {
486                let current_min = current.and_then(|v| v.as_f64());
487                let new_val = field_value.and_then(|v| v.as_f64());
488
489                match (current_min, new_val) {
490                    (Some(curr), Some(new)) => {
491                        JsonValue::Number(serde_json::Number::from_f64(curr.min(new)).unwrap())
492                    }
493                    (None, Some(new)) => {
494                        JsonValue::Number(serde_json::Number::from_f64(new).unwrap())
495                    }
496                    (Some(curr), None) => {
497                        JsonValue::Number(serde_json::Number::from_f64(curr).unwrap())
498                    }
499                    (None, None) => JsonValue::Null,
500                }
501            }
502            "max" => {
503                let current_max = current.and_then(|v| v.as_f64());
504                let new_val = field_value.and_then(|v| v.as_f64());
505
506                match (current_max, new_val) {
507                    (Some(curr), Some(new)) => {
508                        JsonValue::Number(serde_json::Number::from_f64(curr.max(new)).unwrap())
509                    }
510                    (None, Some(new)) => {
511                        JsonValue::Number(serde_json::Number::from_f64(new).unwrap())
512                    }
513                    (Some(curr), None) => {
514                        JsonValue::Number(serde_json::Number::from_f64(curr).unwrap())
515                    }
516                    (None, None) => JsonValue::Null,
517                }
518            }
519            _ => {
520                return Err(AllSourceError::ValidationError(format!(
521                    "Unknown reduce function: {}",
522                    function
523                )));
524            }
525        };
526
527        // Update state
528        self.state.set_state(state_key.clone(), new_value.clone());
529
530        // Return aggregated result
531        let result = serde_json::json!({
532            "group": group_key,
533            "function": function,
534            "value": new_value
535        });
536
537        Ok(Some(result))
538    }
539
540    /// Apply window aggregation
541    fn apply_window(
542        &self,
543        config: &WindowConfig,
544        aggregation: &PipelineOperator,
545        event: &Event,
546    ) -> Result<Option<JsonValue>> {
547        let window_key = format!("window_{}", self.config.id);
548        let now = Utc::now();
549
550        // Add event to window
551        self.state
552            .add_to_window(&window_key, event.clone(), event.timestamp);
553
554        // Evict expired events based on window type
555        let cutoff = match config.window_type {
556            WindowType::Tumbling => now - Duration::seconds(config.size_seconds),
557            WindowType::Sliding => {
558                let slide = config.slide_seconds.unwrap_or(config.size_seconds);
559                now - Duration::seconds(slide)
560            }
561            WindowType::Session => {
562                let timeout = config.session_timeout_seconds.unwrap_or(300);
563                now - Duration::seconds(timeout)
564            }
565        };
566
567        self.state.evict_window(&window_key, cutoff);
568
569        // Get events in current window
570        let window_events = self.state.get_window(&window_key);
571
572        // Apply aggregation to window
573        let mut aggregate_value = JsonValue::Null;
574        for window_event in &window_events {
575            if let Ok(Some(result)) =
576                self.apply_operator(aggregation, &window_event.payload, window_event)
577            {
578                aggregate_value = result;
579            }
580        }
581
582        Ok(Some(serde_json::json!({
583            "window_type": config.window_type,
584            "window_size_seconds": config.size_seconds,
585            "events_in_window": window_events.len(),
586            "aggregation": aggregate_value
587        })))
588    }
589
590    /// Apply enrichment
591    fn apply_enrich(
592        &self,
593        _source: &str,
594        fields: &[String],
595        value: &JsonValue,
596    ) -> Result<Option<JsonValue>> {
597        // Placeholder for enrichment logic
598        // In production, this would fetch data from external sources
599        let mut result = value.clone();
600
601        for field in fields {
602            let enriched_value = JsonValue::String(format!("enriched_{}", field));
603            self.set_field(&mut result, field, enriched_value);
604        }
605
606        Ok(Some(result))
607    }
608
609    /// Apply branch routing
610    fn apply_branch(
611        &self,
612        field: &str,
613        branches: &HashMap<String, String>,
614        value: &JsonValue,
615    ) -> Result<Option<JsonValue>> {
616        let field_value = self.get_field(value, field);
617
618        if let Some(JsonValue::String(val)) = field_value {
619            if let Some(route) = branches.get(val) {
620                let mut result = value.clone();
621                if let JsonValue::Object(ref mut obj) = result {
622                    obj.insert("_route".to_string(), JsonValue::String(route.clone()));
623                }
624                return Ok(Some(result));
625            }
626        }
627
628        Ok(Some(value.clone()))
629    }
630
631    /// Helper: Get nested field from JSON
632    fn get_field<'a>(&self, value: &'a JsonValue, field: &str) -> Option<&'a JsonValue> {
633        let parts: Vec<&str> = field.split('.').collect();
634        let mut current = value;
635
636        for part in parts {
637            current = current.get(part)?;
638        }
639
640        Some(current)
641    }
642
643    /// Helper: Set nested field in JSON
644    fn set_field(&self, value: &mut JsonValue, field: &str, new_value: JsonValue) {
645        let parts: Vec<&str> = field.split('.').collect();
646
647        if parts.len() == 1 {
648            if let JsonValue::Object(ref mut obj) = value {
649                obj.insert(field.to_string(), new_value);
650            }
651            return;
652        }
653
654        // Navigate to parent
655        let mut current = value;
656        for part in &parts[..parts.len() - 1] {
657            if let JsonValue::Object(ref mut obj) = current {
658                current = obj
659                    .entry(part.to_string())
660                    .or_insert(JsonValue::Object(Default::default()));
661            }
662        }
663
664        // Set final value
665        if let JsonValue::Object(ref mut obj) = current {
666            obj.insert(parts.last().unwrap().to_string(), new_value);
667        }
668    }
669
670    /// Get pipeline statistics
671    pub fn stats(&self) -> PipelineStats {
672        self.stats.read().clone()
673    }
674
675    /// Get pipeline configuration
676    pub fn config(&self) -> &PipelineConfig {
677        &self.config
678    }
679
680    /// Reset pipeline state
681    pub fn reset(&self) {
682        self.state.clear();
683        let mut stats = self.stats.write();
684        stats.events_processed = 0;
685        stats.events_filtered = 0;
686        stats.events_failed = 0;
687        stats.last_processed = None;
688    }
689}
690
691/// Manages multiple pipelines
692pub struct PipelineManager {
693    // Using DashMap for lock-free concurrent access
694    pipelines: Arc<DashMap<Uuid, Arc<Pipeline>>>,
695    metrics: Arc<MetricsRegistry>,
696}
697
698impl PipelineManager {
699    pub fn new() -> Self {
700        Self::with_metrics(MetricsRegistry::new())
701    }
702
703    pub fn with_metrics(metrics: Arc<MetricsRegistry>) -> Self {
704        Self {
705            pipelines: Arc::new(DashMap::new()),
706            metrics,
707        }
708    }
709
710    /// Register a new pipeline
711    pub fn register(&self, config: PipelineConfig) -> Uuid {
712        let id = config.id;
713        let name = config.name.clone();
714        let pipeline = Arc::new(Pipeline::new(config));
715        self.pipelines.insert(id, pipeline);
716
717        let count = self.pipelines.len();
718        self.metrics.pipelines_registered_total.set(count as i64);
719
720        tracing::info!("📊 Registered pipeline: {} ({})", name, id);
721        id
722    }
723
724    /// Get a pipeline by ID
725    pub fn get(&self, id: Uuid) -> Option<Arc<Pipeline>> {
726        self.pipelines.get(&id).map(|entry| entry.value().clone())
727    }
728
729    /// Process event through all matching pipelines
730    pub fn process_event(&self, event: &Event) -> Vec<(Uuid, JsonValue)> {
731        let timer = self.metrics.pipeline_duration_seconds.start_timer();
732
733        let mut results = Vec::new();
734
735        for entry in self.pipelines.iter() {
736            let id = entry.key();
737            let pipeline = entry.value();
738            let pipeline_name = &pipeline.config().name;
739            let pipeline_id = id.to_string();
740
741            match pipeline.process(event) {
742                Ok(Some(result)) => {
743                    self.metrics
744                        .pipeline_events_processed
745                        .with_label_values(&[&pipeline_id, pipeline_name])
746                        .inc();
747                    results.push((*id, result));
748                }
749                Ok(None) => {
750                    // Event filtered out or didn't match - not an error
751                }
752                Err(e) => {
753                    self.metrics
754                        .pipeline_errors_total
755                        .with_label_values(&[pipeline_name])
756                        .inc();
757                    tracing::error!(
758                        "Pipeline '{}' ({}) failed to process event: {}",
759                        pipeline_name,
760                        id,
761                        e
762                    );
763                }
764            }
765        }
766
767        timer.observe_duration();
768        results
769    }
770
771    /// List all pipelines
772    pub fn list(&self) -> Vec<PipelineConfig> {
773        self.pipelines
774            .iter()
775            .map(|entry| entry.value().config().clone())
776            .collect()
777    }
778
779    /// Remove a pipeline
780    pub fn remove(&self, id: Uuid) -> bool {
781        let removed = self.pipelines.remove(&id).is_some();
782
783        if removed {
784            let count = self.pipelines.len();
785            self.metrics.pipelines_registered_total.set(count as i64);
786        }
787
788        removed
789    }
790
791    /// Get statistics for all pipelines
792    pub fn all_stats(&self) -> Vec<PipelineStats> {
793        self.pipelines.iter().map(|entry| entry.value().stats()).collect()
794    }
795}
796
797impl Default for PipelineManager {
798    fn default() -> Self {
799        Self::new()
800    }
801}
802
803#[cfg(test)]
804mod tests {
805    use super::*;
806    use serde_json::json;
807
808    #[test]
809    fn test_filter_operator() {
810        let config = PipelineConfig {
811            id: Uuid::new_v4(),
812            name: "test_filter".to_string(),
813            description: None,
814            source_event_types: vec!["test".to_string()],
815            operators: vec![PipelineOperator::Filter {
816                field: "status".to_string(),
817                value: json!("active"),
818                op: "eq".to_string(),
819            }],
820            enabled: true,
821            output: "test_output".to_string(),
822        };
823
824        let pipeline = Pipeline::new(config);
825        let event = Event::from_strings(
826            "test".to_string(),
827            "entity1".to_string(),
828            "default".to_string(),
829            json!({"status": "active"}),
830            None,
831        )
832        .unwrap();
833
834        let result = pipeline.process(&event).unwrap();
835        assert!(result.is_some());
836    }
837
838    #[test]
839    fn test_map_operator() {
840        let config = PipelineConfig {
841            id: Uuid::new_v4(),
842            name: "test_map".to_string(),
843            description: None,
844            source_event_types: vec!["test".to_string()],
845            operators: vec![PipelineOperator::Map {
846                field: "name".to_string(),
847                transform: "uppercase".to_string(),
848            }],
849            enabled: true,
850            output: "test_output".to_string(),
851        };
852
853        let pipeline = Pipeline::new(config);
854        let event = Event::from_strings(
855            "test".to_string(),
856            "entity1".to_string(),
857            "default".to_string(),
858            json!({"name": "hello"}),
859            None,
860        )
861        .unwrap();
862
863        let result = pipeline.process(&event).unwrap().unwrap();
864        assert_eq!(result["name"], "HELLO");
865    }
866
867    #[test]
868    fn test_reduce_count() {
869        let config = PipelineConfig {
870            id: Uuid::new_v4(),
871            name: "test_reduce".to_string(),
872            description: None,
873            source_event_types: vec!["test".to_string()],
874            operators: vec![PipelineOperator::Reduce {
875                field: "value".to_string(),
876                function: "count".to_string(),
877                group_by: None,
878            }],
879            enabled: true,
880            output: "test_output".to_string(),
881        };
882
883        let pipeline = Pipeline::new(config);
884
885        for i in 0..5 {
886            let event = Event::from_strings(
887                "test".to_string(),
888                "entity1".to_string(),
889                "default".to_string(),
890                json!({"value": i}),
891                None,
892            )
893            .unwrap();
894            pipeline.process(&event).unwrap();
895        }
896
897        let result = pipeline
898            .process(
899                &Event::from_strings(
900                    "test".to_string(),
901                    "entity1".to_string(),
902                    "default".to_string(),
903                    json!({"value": 5}),
904                    None,
905                )
906                .unwrap(),
907            )
908            .unwrap()
909            .unwrap();
910
911        assert_eq!(result["value"], 6);
912    }
913}