allsource_core/
pipeline.rs

1use crate::domain::entities::Event;
2use crate::error::{AllSourceError, Result};
3use crate::metrics::MetricsRegistry;
4use chrono::{DateTime, Duration, Utc};
5use parking_lot::RwLock;
6use serde::{Deserialize, Serialize};
7use serde_json::Value as JsonValue;
8use std::collections::{HashMap, VecDeque};
9use std::sync::Arc;
10use uuid::Uuid;
11
12/// Window type for time-based aggregations
13#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
14#[serde(rename_all = "lowercase")]
15pub enum WindowType {
16    /// Tumbling window (non-overlapping)
17    Tumbling,
18    /// Sliding window (overlapping)
19    Sliding,
20    /// Session window (activity-based)
21    Session,
22}
23
24/// Window configuration
25#[derive(Debug, Clone, Serialize, Deserialize)]
26pub struct WindowConfig {
27    /// Type of window
28    pub window_type: WindowType,
29
30    /// Window size in seconds
31    pub size_seconds: i64,
32
33    /// Slide interval in seconds (for sliding windows)
34    pub slide_seconds: Option<i64>,
35
36    /// Session timeout in seconds (for session windows)
37    pub session_timeout_seconds: Option<i64>,
38}
39
40/// Pipeline operator types
41#[derive(Debug, Clone, Serialize, Deserialize)]
42#[serde(tag = "type", rename_all = "lowercase")]
43pub enum PipelineOperator {
44    /// Filter events based on a condition
45    Filter {
46        /// Field path to check (e.g., "payload.status")
47        field: String,
48        /// Expected value
49        value: JsonValue,
50        /// Operator: eq, ne, gt, lt, contains
51        op: String,
52    },
53
54    /// Transform event payload
55    Map {
56        /// Field to transform
57        field: String,
58        /// Transformation expression (simple for now)
59        transform: String,
60    },
61
62    /// Aggregate events
63    Reduce {
64        /// Field to aggregate
65        field: String,
66        /// Aggregation function: sum, count, avg, min, max
67        function: String,
68        /// Group by field (optional)
69        group_by: Option<String>,
70    },
71
72    /// Window-based aggregation
73    Window {
74        /// Window configuration
75        config: WindowConfig,
76        /// Aggregation to apply within window
77        aggregation: Box<PipelineOperator>,
78    },
79
80    /// Enrich event with external data
81    Enrich {
82        /// Source to enrich from
83        source: String,
84        /// Fields to add
85        fields: Vec<String>,
86    },
87
88    /// Split stream based on condition
89    Branch {
90        /// Condition field
91        field: String,
92        /// Branch mapping
93        branches: HashMap<String, String>,
94    },
95}
96
97/// Pipeline configuration
98#[derive(Debug, Clone, Serialize, Deserialize)]
99pub struct PipelineConfig {
100    /// Pipeline ID
101    pub id: Uuid,
102
103    /// Pipeline name
104    pub name: String,
105
106    /// Description
107    pub description: Option<String>,
108
109    /// Source event types to process
110    pub source_event_types: Vec<String>,
111
112    /// Pipeline operators in order
113    pub operators: Vec<PipelineOperator>,
114
115    /// Whether pipeline is enabled
116    pub enabled: bool,
117
118    /// Output destination (projection name or topic)
119    pub output: String,
120}
121
122/// Pipeline execution statistics
123#[derive(Debug, Clone, Serialize, Deserialize)]
124pub struct PipelineStats {
125    pub pipeline_id: Uuid,
126    pub events_processed: u64,
127    pub events_filtered: u64,
128    pub events_failed: u64,
129    pub last_processed: Option<DateTime<Utc>>,
130}
131
132/// Stateful operator for maintaining state across events
133pub struct StatefulOperator {
134    /// Operator state storage
135    state: Arc<RwLock<HashMap<String, JsonValue>>>,
136
137    /// Window buffers for time-based operations
138    windows: Arc<RwLock<HashMap<String, VecDeque<(DateTime<Utc>, Event)>>>>,
139}
140
141impl StatefulOperator {
142    pub fn new() -> Self {
143        Self {
144            state: Arc::new(RwLock::new(HashMap::new())),
145            windows: Arc::new(RwLock::new(HashMap::new())),
146        }
147    }
148
149    /// Store state value
150    pub fn set_state(&self, key: String, value: JsonValue) {
151        self.state.write().insert(key, value);
152    }
153
154    /// Get state value
155    pub fn get_state(&self, key: &str) -> Option<JsonValue> {
156        self.state.read().get(key).cloned()
157    }
158
159    /// Add event to window
160    pub fn add_to_window(&self, window_key: &str, event: Event, timestamp: DateTime<Utc>) {
161        let mut windows = self.windows.write();
162        windows
163            .entry(window_key.to_string())
164            .or_insert_with(VecDeque::new)
165            .push_back((timestamp, event));
166    }
167
168    /// Get events in window
169    pub fn get_window(&self, window_key: &str) -> Vec<Event> {
170        self.windows
171            .read()
172            .get(window_key)
173            .map(|w| w.iter().map(|(_, e)| e.clone()).collect())
174            .unwrap_or_default()
175    }
176
177    /// Evict expired events from window
178    pub fn evict_window(&self, window_key: &str, cutoff: DateTime<Utc>) {
179        if let Some(window) = self.windows.write().get_mut(window_key) {
180            window.retain(|(ts, _)| *ts > cutoff);
181        }
182    }
183
184    /// Clear all state
185    pub fn clear(&self) {
186        self.state.write().clear();
187        self.windows.write().clear();
188    }
189}
190
191impl Default for StatefulOperator {
192    fn default() -> Self {
193        Self::new()
194    }
195}
196
197/// Pipeline execution engine
198pub struct Pipeline {
199    config: PipelineConfig,
200    state: StatefulOperator,
201    stats: Arc<RwLock<PipelineStats>>,
202}
203
204impl Pipeline {
205    pub fn new(config: PipelineConfig) -> Self {
206        let stats = PipelineStats {
207            pipeline_id: config.id,
208            events_processed: 0,
209            events_filtered: 0,
210            events_failed: 0,
211            last_processed: None,
212        };
213
214        Self {
215            config,
216            state: StatefulOperator::new(),
217            stats: Arc::new(RwLock::new(stats)),
218        }
219    }
220
221    /// Process an event through the pipeline
222    pub fn process(&self, event: &Event) -> Result<Option<JsonValue>> {
223        // Check if event type matches source filter
224        if !self.config.source_event_types.is_empty()
225            && !self
226                .config
227                .source_event_types
228                .iter()
229                .any(|t| t == event.event_type_str())
230        {
231            return Ok(None);
232        }
233
234        if !self.config.enabled {
235            return Ok(None);
236        }
237
238        let mut current_value = event.payload.clone();
239        let mut filtered = false;
240
241        // Apply operators in sequence
242        for operator in &self.config.operators {
243            match self.apply_operator(operator, &current_value, event) {
244                Ok(Some(result)) => {
245                    current_value = result;
246                }
247                Ok(None) => {
248                    // Event was filtered out
249                    filtered = true;
250                    self.stats.write().events_filtered += 1;
251                    break;
252                }
253                Err(e) => {
254                    self.stats.write().events_failed += 1;
255                    tracing::error!("Pipeline {} operator failed: {}", self.config.name, e);
256                    return Err(e);
257                }
258            }
259        }
260
261        // Update stats
262        let mut stats = self.stats.write();
263        stats.events_processed += 1;
264        stats.last_processed = Some(Utc::now());
265
266        if filtered {
267            Ok(None)
268        } else {
269            Ok(Some(current_value))
270        }
271    }
272
273    /// Apply a single operator
274    fn apply_operator(
275        &self,
276        operator: &PipelineOperator,
277        value: &JsonValue,
278        event: &Event,
279    ) -> Result<Option<JsonValue>> {
280        match operator {
281            PipelineOperator::Filter {
282                field,
283                value: expected,
284                op,
285            } => self.apply_filter(field, expected, op, value),
286
287            PipelineOperator::Map { field, transform } => self.apply_map(field, transform, value),
288
289            PipelineOperator::Reduce {
290                field,
291                function,
292                group_by,
293            } => self.apply_reduce(field, function, group_by.as_deref(), value, event),
294
295            PipelineOperator::Window {
296                config,
297                aggregation,
298            } => self.apply_window(config, aggregation, event),
299
300            PipelineOperator::Enrich { source, fields } => self.apply_enrich(source, fields, value),
301
302            PipelineOperator::Branch { field, branches } => {
303                self.apply_branch(field, branches, value)
304            }
305        }
306    }
307
308    /// Apply filter operator
309    fn apply_filter(
310        &self,
311        field: &str,
312        expected: &JsonValue,
313        op: &str,
314        value: &JsonValue,
315    ) -> Result<Option<JsonValue>> {
316        let field_value = self.get_field(value, field);
317
318        let matches = match op {
319            "eq" => field_value == Some(expected),
320            "ne" => field_value != Some(expected),
321            "gt" => {
322                if let (Some(JsonValue::Number(a)), JsonValue::Number(b)) =
323                    (field_value.as_ref(), expected)
324                {
325                    a.as_f64().unwrap_or(0.0) > b.as_f64().unwrap_or(0.0)
326                } else {
327                    false
328                }
329            }
330            "lt" => {
331                if let (Some(JsonValue::Number(a)), JsonValue::Number(b)) =
332                    (field_value.as_ref(), expected)
333                {
334                    a.as_f64().unwrap_or(0.0) < b.as_f64().unwrap_or(0.0)
335                } else {
336                    false
337                }
338            }
339            "contains" => {
340                if let (Some(JsonValue::String(a)), JsonValue::String(b)) =
341                    (field_value.as_ref(), expected)
342                {
343                    a.contains(b)
344                } else {
345                    false
346                }
347            }
348            _ => {
349                return Err(AllSourceError::ValidationError(format!(
350                    "Unknown filter operator: {}",
351                    op
352                )));
353            }
354        };
355
356        if matches {
357            Ok(Some(value.clone()))
358        } else {
359            Ok(None) // Filtered out
360        }
361    }
362
363    /// Apply map transformation
364    fn apply_map(
365        &self,
366        field: &str,
367        transform: &str,
368        value: &JsonValue,
369    ) -> Result<Option<JsonValue>> {
370        let mut result = value.clone();
371
372        // Simple transformations
373        let field_value = self.get_field(value, field);
374
375        let transformed = match transform {
376            "uppercase" => field_value
377                .and_then(|v| v.as_str())
378                .map(|s| JsonValue::String(s.to_uppercase())),
379            "lowercase" => field_value
380                .and_then(|v| v.as_str())
381                .map(|s| JsonValue::String(s.to_lowercase())),
382            "trim" => field_value
383                .and_then(|v| v.as_str())
384                .map(|s| JsonValue::String(s.trim().to_string())),
385            _ => {
386                // Try to parse as number operation
387                if let Some(stripped) = transform.strip_prefix("multiply:") {
388                    if let Ok(multiplier) = stripped.parse::<f64>() {
389                        field_value.and_then(|v| v.as_f64()).map(|n| {
390                            JsonValue::Number(serde_json::Number::from_f64(n * multiplier).unwrap())
391                        })
392                    } else {
393                        None
394                    }
395                } else if let Some(stripped) = transform.strip_prefix("add:") {
396                    if let Ok(addend) = stripped.parse::<f64>() {
397                        field_value.and_then(|v| v.as_f64()).map(|n| {
398                            JsonValue::Number(serde_json::Number::from_f64(n + addend).unwrap())
399                        })
400                    } else {
401                        None
402                    }
403                } else {
404                    None
405                }
406            }
407        };
408
409        if let Some(new_value) = transformed {
410            self.set_field(&mut result, field, new_value);
411        }
412
413        Ok(Some(result))
414    }
415
416    /// Apply reduce aggregation
417    fn apply_reduce(
418        &self,
419        field: &str,
420        function: &str,
421        group_by: Option<&str>,
422        value: &JsonValue,
423        event: &Event,
424    ) -> Result<Option<JsonValue>> {
425        // Get group key
426        let group_key = if let Some(group_field) = group_by {
427            self.get_field(value, group_field)
428                .and_then(|v| v.as_str())
429                .unwrap_or("default")
430                .to_string()
431        } else {
432            "default".to_string()
433        };
434
435        let state_key = format!("reduce_{}_{}", function, group_key);
436
437        // Get current aggregate value
438        let current = self.state.get_state(&state_key);
439
440        // Get field value to aggregate
441        let field_value = self.get_field(value, field);
442
443        let new_value = match function {
444            "count" => {
445                let count = current.and_then(|v| v.as_u64()).unwrap_or(0) + 1;
446                JsonValue::Number(count.into())
447            }
448            "sum" => {
449                let current_sum = current.and_then(|v| v.as_f64()).unwrap_or(0.0);
450                let value_to_add = field_value.and_then(|v| v.as_f64()).unwrap_or(0.0);
451                JsonValue::Number(serde_json::Number::from_f64(current_sum + value_to_add).unwrap())
452            }
453            "avg" => {
454                // Store sum and count separately
455                let sum_key = format!("{}_sum", state_key);
456                let count_key = format!("{}_count", state_key);
457
458                let current_sum = self
459                    .state
460                    .get_state(&sum_key)
461                    .and_then(|v| v.as_f64())
462                    .unwrap_or(0.0);
463                let current_count = self
464                    .state
465                    .get_state(&count_key)
466                    .and_then(|v| v.as_u64())
467                    .unwrap_or(0);
468
469                let value_to_add = field_value.and_then(|v| v.as_f64()).unwrap_or(0.0);
470
471                let new_sum = current_sum + value_to_add;
472                let new_count = current_count + 1;
473
474                self.state.set_state(
475                    sum_key,
476                    JsonValue::Number(serde_json::Number::from_f64(new_sum).unwrap()),
477                );
478                self.state
479                    .set_state(count_key, JsonValue::Number(new_count.into()));
480
481                let avg = new_sum / new_count as f64;
482                JsonValue::Number(serde_json::Number::from_f64(avg).unwrap())
483            }
484            "min" => {
485                let current_min = current.and_then(|v| v.as_f64());
486                let new_val = field_value.and_then(|v| v.as_f64());
487
488                match (current_min, new_val) {
489                    (Some(curr), Some(new)) => {
490                        JsonValue::Number(serde_json::Number::from_f64(curr.min(new)).unwrap())
491                    }
492                    (None, Some(new)) => {
493                        JsonValue::Number(serde_json::Number::from_f64(new).unwrap())
494                    }
495                    (Some(curr), None) => {
496                        JsonValue::Number(serde_json::Number::from_f64(curr).unwrap())
497                    }
498                    (None, None) => JsonValue::Null,
499                }
500            }
501            "max" => {
502                let current_max = current.and_then(|v| v.as_f64());
503                let new_val = field_value.and_then(|v| v.as_f64());
504
505                match (current_max, new_val) {
506                    (Some(curr), Some(new)) => {
507                        JsonValue::Number(serde_json::Number::from_f64(curr.max(new)).unwrap())
508                    }
509                    (None, Some(new)) => {
510                        JsonValue::Number(serde_json::Number::from_f64(new).unwrap())
511                    }
512                    (Some(curr), None) => {
513                        JsonValue::Number(serde_json::Number::from_f64(curr).unwrap())
514                    }
515                    (None, None) => JsonValue::Null,
516                }
517            }
518            _ => {
519                return Err(AllSourceError::ValidationError(format!(
520                    "Unknown reduce function: {}",
521                    function
522                )));
523            }
524        };
525
526        // Update state
527        self.state.set_state(state_key.clone(), new_value.clone());
528
529        // Return aggregated result
530        let result = serde_json::json!({
531            "group": group_key,
532            "function": function,
533            "value": new_value
534        });
535
536        Ok(Some(result))
537    }
538
539    /// Apply window aggregation
540    fn apply_window(
541        &self,
542        config: &WindowConfig,
543        aggregation: &PipelineOperator,
544        event: &Event,
545    ) -> Result<Option<JsonValue>> {
546        let window_key = format!("window_{}", self.config.id);
547        let now = Utc::now();
548
549        // Add event to window
550        self.state
551            .add_to_window(&window_key, event.clone(), event.timestamp);
552
553        // Evict expired events based on window type
554        let cutoff = match config.window_type {
555            WindowType::Tumbling => now - Duration::seconds(config.size_seconds),
556            WindowType::Sliding => {
557                let slide = config.slide_seconds.unwrap_or(config.size_seconds);
558                now - Duration::seconds(slide)
559            }
560            WindowType::Session => {
561                let timeout = config.session_timeout_seconds.unwrap_or(300);
562                now - Duration::seconds(timeout)
563            }
564        };
565
566        self.state.evict_window(&window_key, cutoff);
567
568        // Get events in current window
569        let window_events = self.state.get_window(&window_key);
570
571        // Apply aggregation to window
572        let mut aggregate_value = JsonValue::Null;
573        for window_event in &window_events {
574            if let Ok(Some(result)) =
575                self.apply_operator(aggregation, &window_event.payload, window_event)
576            {
577                aggregate_value = result;
578            }
579        }
580
581        Ok(Some(serde_json::json!({
582            "window_type": config.window_type,
583            "window_size_seconds": config.size_seconds,
584            "events_in_window": window_events.len(),
585            "aggregation": aggregate_value
586        })))
587    }
588
589    /// Apply enrichment
590    fn apply_enrich(
591        &self,
592        _source: &str,
593        fields: &[String],
594        value: &JsonValue,
595    ) -> Result<Option<JsonValue>> {
596        // Placeholder for enrichment logic
597        // In production, this would fetch data from external sources
598        let mut result = value.clone();
599
600        for field in fields {
601            let enriched_value = JsonValue::String(format!("enriched_{}", field));
602            self.set_field(&mut result, field, enriched_value);
603        }
604
605        Ok(Some(result))
606    }
607
608    /// Apply branch routing
609    fn apply_branch(
610        &self,
611        field: &str,
612        branches: &HashMap<String, String>,
613        value: &JsonValue,
614    ) -> Result<Option<JsonValue>> {
615        let field_value = self.get_field(value, field);
616
617        if let Some(JsonValue::String(val)) = field_value {
618            if let Some(route) = branches.get(val) {
619                let mut result = value.clone();
620                if let JsonValue::Object(ref mut obj) = result {
621                    obj.insert("_route".to_string(), JsonValue::String(route.clone()));
622                }
623                return Ok(Some(result));
624            }
625        }
626
627        Ok(Some(value.clone()))
628    }
629
630    /// Helper: Get nested field from JSON
631    fn get_field<'a>(&self, value: &'a JsonValue, field: &str) -> Option<&'a JsonValue> {
632        let parts: Vec<&str> = field.split('.').collect();
633        let mut current = value;
634
635        for part in parts {
636            current = current.get(part)?;
637        }
638
639        Some(current)
640    }
641
642    /// Helper: Set nested field in JSON
643    fn set_field(&self, value: &mut JsonValue, field: &str, new_value: JsonValue) {
644        let parts: Vec<&str> = field.split('.').collect();
645
646        if parts.len() == 1 {
647            if let JsonValue::Object(ref mut obj) = value {
648                obj.insert(field.to_string(), new_value);
649            }
650            return;
651        }
652
653        // Navigate to parent
654        let mut current = value;
655        for part in &parts[..parts.len() - 1] {
656            if let JsonValue::Object(ref mut obj) = current {
657                current = obj
658                    .entry(part.to_string())
659                    .or_insert(JsonValue::Object(Default::default()));
660            }
661        }
662
663        // Set final value
664        if let JsonValue::Object(ref mut obj) = current {
665            obj.insert(parts.last().unwrap().to_string(), new_value);
666        }
667    }
668
669    /// Get pipeline statistics
670    pub fn stats(&self) -> PipelineStats {
671        self.stats.read().clone()
672    }
673
674    /// Get pipeline configuration
675    pub fn config(&self) -> &PipelineConfig {
676        &self.config
677    }
678
679    /// Reset pipeline state
680    pub fn reset(&self) {
681        self.state.clear();
682        let mut stats = self.stats.write();
683        stats.events_processed = 0;
684        stats.events_filtered = 0;
685        stats.events_failed = 0;
686        stats.last_processed = None;
687    }
688}
689
690/// Manages multiple pipelines
691pub struct PipelineManager {
692    pipelines: Arc<RwLock<HashMap<Uuid, Arc<Pipeline>>>>,
693    metrics: Arc<MetricsRegistry>,
694}
695
696impl PipelineManager {
697    pub fn new() -> Self {
698        Self::with_metrics(MetricsRegistry::new())
699    }
700
701    pub fn with_metrics(metrics: Arc<MetricsRegistry>) -> Self {
702        Self {
703            pipelines: Arc::new(RwLock::new(HashMap::new())),
704            metrics,
705        }
706    }
707
708    /// Register a new pipeline
709    pub fn register(&self, config: PipelineConfig) -> Uuid {
710        let id = config.id;
711        let name = config.name.clone();
712        let pipeline = Arc::new(Pipeline::new(config));
713        self.pipelines.write().insert(id, pipeline);
714
715        let count = self.pipelines.read().len();
716        self.metrics.pipelines_registered_total.set(count as i64);
717
718        tracing::info!("📊 Registered pipeline: {} ({})", name, id);
719        id
720    }
721
722    /// Get a pipeline by ID
723    pub fn get(&self, id: Uuid) -> Option<Arc<Pipeline>> {
724        self.pipelines.read().get(&id).cloned()
725    }
726
727    /// Process event through all matching pipelines
728    pub fn process_event(&self, event: &Event) -> Vec<(Uuid, JsonValue)> {
729        let timer = self.metrics.pipeline_duration_seconds.start_timer();
730
731        let pipelines = self.pipelines.read();
732        let mut results = Vec::new();
733
734        for (id, pipeline) in pipelines.iter() {
735            let pipeline_name = &pipeline.config().name;
736            let pipeline_id = id.to_string();
737
738            match pipeline.process(event) {
739                Ok(Some(result)) => {
740                    self.metrics
741                        .pipeline_events_processed
742                        .with_label_values(&[&pipeline_id, pipeline_name])
743                        .inc();
744                    results.push((*id, result));
745                }
746                Ok(None) => {
747                    // Event filtered out or didn't match - not an error
748                }
749                Err(e) => {
750                    self.metrics
751                        .pipeline_errors_total
752                        .with_label_values(&[pipeline_name])
753                        .inc();
754                    tracing::error!(
755                        "Pipeline '{}' ({}) failed to process event: {}",
756                        pipeline_name,
757                        id,
758                        e
759                    );
760                }
761            }
762        }
763
764        timer.observe_duration();
765        results
766    }
767
768    /// List all pipelines
769    pub fn list(&self) -> Vec<PipelineConfig> {
770        self.pipelines
771            .read()
772            .values()
773            .map(|p| p.config().clone())
774            .collect()
775    }
776
777    /// Remove a pipeline
778    pub fn remove(&self, id: Uuid) -> bool {
779        let removed = self.pipelines.write().remove(&id).is_some();
780
781        if removed {
782            let count = self.pipelines.read().len();
783            self.metrics.pipelines_registered_total.set(count as i64);
784        }
785
786        removed
787    }
788
789    /// Get statistics for all pipelines
790    pub fn all_stats(&self) -> Vec<PipelineStats> {
791        self.pipelines.read().values().map(|p| p.stats()).collect()
792    }
793}
794
795impl Default for PipelineManager {
796    fn default() -> Self {
797        Self::new()
798    }
799}
800
801#[cfg(test)]
802mod tests {
803    use super::*;
804    use serde_json::json;
805
806    #[test]
807    fn test_filter_operator() {
808        let config = PipelineConfig {
809            id: Uuid::new_v4(),
810            name: "test_filter".to_string(),
811            description: None,
812            source_event_types: vec!["test".to_string()],
813            operators: vec![PipelineOperator::Filter {
814                field: "status".to_string(),
815                value: json!("active"),
816                op: "eq".to_string(),
817            }],
818            enabled: true,
819            output: "test_output".to_string(),
820        };
821
822        let pipeline = Pipeline::new(config);
823        let event = Event::from_strings(
824            "test".to_string(),
825            "entity1".to_string(),
826            "default".to_string(),
827            json!({"status": "active"}),
828            None,
829        )
830        .unwrap();
831
832        let result = pipeline.process(&event).unwrap();
833        assert!(result.is_some());
834    }
835
836    #[test]
837    fn test_map_operator() {
838        let config = PipelineConfig {
839            id: Uuid::new_v4(),
840            name: "test_map".to_string(),
841            description: None,
842            source_event_types: vec!["test".to_string()],
843            operators: vec![PipelineOperator::Map {
844                field: "name".to_string(),
845                transform: "uppercase".to_string(),
846            }],
847            enabled: true,
848            output: "test_output".to_string(),
849        };
850
851        let pipeline = Pipeline::new(config);
852        let event = Event::from_strings(
853            "test".to_string(),
854            "entity1".to_string(),
855            "default".to_string(),
856            json!({"name": "hello"}),
857            None,
858        )
859        .unwrap();
860
861        let result = pipeline.process(&event).unwrap().unwrap();
862        assert_eq!(result["name"], "HELLO");
863    }
864
865    #[test]
866    fn test_reduce_count() {
867        let config = PipelineConfig {
868            id: Uuid::new_v4(),
869            name: "test_reduce".to_string(),
870            description: None,
871            source_event_types: vec!["test".to_string()],
872            operators: vec![PipelineOperator::Reduce {
873                field: "value".to_string(),
874                function: "count".to_string(),
875                group_by: None,
876            }],
877            enabled: true,
878            output: "test_output".to_string(),
879        };
880
881        let pipeline = Pipeline::new(config);
882
883        for i in 0..5 {
884            let event = Event::from_strings(
885                "test".to_string(),
886                "entity1".to_string(),
887                "default".to_string(),
888                json!({"value": i}),
889                None,
890            )
891            .unwrap();
892            pipeline.process(&event).unwrap();
893        }
894
895        let result = pipeline
896            .process(
897                &Event::from_strings(
898                    "test".to_string(),
899                    "entity1".to_string(),
900                    "default".to_string(),
901                    json!({"value": 5}),
902                    None,
903                )
904                .unwrap(),
905            )
906            .unwrap()
907            .unwrap();
908
909        assert_eq!(result["value"], 6);
910    }
911}