allsource_core/
pipeline.rs

1use crate::error::{AllSourceError, Result};
2use crate::domain::entities::Event;
3use crate::metrics::MetricsRegistry;
4use chrono::{DateTime, Duration, Utc};
5use parking_lot::RwLock;
6use serde::{Deserialize, Serialize};
7use serde_json::Value as JsonValue;
8use std::collections::{HashMap, VecDeque};
9use std::sync::Arc;
10use uuid::Uuid;
11
12/// Window type for time-based aggregations
13#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
14#[serde(rename_all = "lowercase")]
15pub enum WindowType {
16    /// Tumbling window (non-overlapping)
17    Tumbling,
18    /// Sliding window (overlapping)
19    Sliding,
20    /// Session window (activity-based)
21    Session,
22}
23
24/// Window configuration
25#[derive(Debug, Clone, Serialize, Deserialize)]
26pub struct WindowConfig {
27    /// Type of window
28    pub window_type: WindowType,
29
30    /// Window size in seconds
31    pub size_seconds: i64,
32
33    /// Slide interval in seconds (for sliding windows)
34    pub slide_seconds: Option<i64>,
35
36    /// Session timeout in seconds (for session windows)
37    pub session_timeout_seconds: Option<i64>,
38}
39
40/// Pipeline operator types
41#[derive(Debug, Clone, Serialize, Deserialize)]
42#[serde(tag = "type", rename_all = "lowercase")]
43pub enum PipelineOperator {
44    /// Filter events based on a condition
45    Filter {
46        /// Field path to check (e.g., "payload.status")
47        field: String,
48        /// Expected value
49        value: JsonValue,
50        /// Operator: eq, ne, gt, lt, contains
51        op: String,
52    },
53
54    /// Transform event payload
55    Map {
56        /// Field to transform
57        field: String,
58        /// Transformation expression (simple for now)
59        transform: String,
60    },
61
62    /// Aggregate events
63    Reduce {
64        /// Field to aggregate
65        field: String,
66        /// Aggregation function: sum, count, avg, min, max
67        function: String,
68        /// Group by field (optional)
69        group_by: Option<String>,
70    },
71
72    /// Window-based aggregation
73    Window {
74        /// Window configuration
75        config: WindowConfig,
76        /// Aggregation to apply within window
77        aggregation: Box<PipelineOperator>,
78    },
79
80    /// Enrich event with external data
81    Enrich {
82        /// Source to enrich from
83        source: String,
84        /// Fields to add
85        fields: Vec<String>,
86    },
87
88    /// Split stream based on condition
89    Branch {
90        /// Condition field
91        field: String,
92        /// Branch mapping
93        branches: HashMap<String, String>,
94    },
95}
96
97/// Pipeline configuration
98#[derive(Debug, Clone, Serialize, Deserialize)]
99pub struct PipelineConfig {
100    /// Pipeline ID
101    pub id: Uuid,
102
103    /// Pipeline name
104    pub name: String,
105
106    /// Description
107    pub description: Option<String>,
108
109    /// Source event types to process
110    pub source_event_types: Vec<String>,
111
112    /// Pipeline operators in order
113    pub operators: Vec<PipelineOperator>,
114
115    /// Whether pipeline is enabled
116    pub enabled: bool,
117
118    /// Output destination (projection name or topic)
119    pub output: String,
120}
121
122/// Pipeline execution statistics
123#[derive(Debug, Clone, Serialize, Deserialize)]
124pub struct PipelineStats {
125    pub pipeline_id: Uuid,
126    pub events_processed: u64,
127    pub events_filtered: u64,
128    pub events_failed: u64,
129    pub last_processed: Option<DateTime<Utc>>,
130}
131
132/// Stateful operator for maintaining state across events
133pub struct StatefulOperator {
134    /// Operator state storage
135    state: Arc<RwLock<HashMap<String, JsonValue>>>,
136
137    /// Window buffers for time-based operations
138    windows: Arc<RwLock<HashMap<String, VecDeque<(DateTime<Utc>, Event)>>>>,
139}
140
141impl StatefulOperator {
142    pub fn new() -> Self {
143        Self {
144            state: Arc::new(RwLock::new(HashMap::new())),
145            windows: Arc::new(RwLock::new(HashMap::new())),
146        }
147    }
148
149    /// Store state value
150    pub fn set_state(&self, key: String, value: JsonValue) {
151        self.state.write().insert(key, value);
152    }
153
154    /// Get state value
155    pub fn get_state(&self, key: &str) -> Option<JsonValue> {
156        self.state.read().get(key).cloned()
157    }
158
159    /// Add event to window
160    pub fn add_to_window(&self, window_key: &str, event: Event, timestamp: DateTime<Utc>) {
161        let mut windows = self.windows.write();
162        windows
163            .entry(window_key.to_string())
164            .or_insert_with(VecDeque::new)
165            .push_back((timestamp, event));
166    }
167
168    /// Get events in window
169    pub fn get_window(&self, window_key: &str) -> Vec<Event> {
170        self.windows
171            .read()
172            .get(window_key)
173            .map(|w| w.iter().map(|(_, e)| e.clone()).collect())
174            .unwrap_or_default()
175    }
176
177    /// Evict expired events from window
178    pub fn evict_window(&self, window_key: &str, cutoff: DateTime<Utc>) {
179        if let Some(window) = self.windows.write().get_mut(window_key) {
180            window.retain(|(ts, _)| *ts > cutoff);
181        }
182    }
183
184    /// Clear all state
185    pub fn clear(&self) {
186        self.state.write().clear();
187        self.windows.write().clear();
188    }
189}
190
191impl Default for StatefulOperator {
192    fn default() -> Self {
193        Self::new()
194    }
195}
196
197/// Pipeline execution engine
198pub struct Pipeline {
199    config: PipelineConfig,
200    state: StatefulOperator,
201    stats: Arc<RwLock<PipelineStats>>,
202}
203
204impl Pipeline {
205    pub fn new(config: PipelineConfig) -> Self {
206        let stats = PipelineStats {
207            pipeline_id: config.id,
208            events_processed: 0,
209            events_filtered: 0,
210            events_failed: 0,
211            last_processed: None,
212        };
213
214        Self {
215            config,
216            state: StatefulOperator::new(),
217            stats: Arc::new(RwLock::new(stats)),
218        }
219    }
220
221    /// Process an event through the pipeline
222    pub fn process(&self, event: &Event) -> Result<Option<JsonValue>> {
223        // Check if event type matches source filter
224        if !self.config.source_event_types.is_empty()
225            && !self.config.source_event_types.iter().any(|t| t == event.event_type_str())
226        {
227            return Ok(None);
228        }
229
230        if !self.config.enabled {
231            return Ok(None);
232        }
233
234        let mut current_value = event.payload.clone();
235        let mut filtered = false;
236
237        // Apply operators in sequence
238        for operator in &self.config.operators {
239            match self.apply_operator(operator, &current_value, event) {
240                Ok(Some(result)) => {
241                    current_value = result;
242                }
243                Ok(None) => {
244                    // Event was filtered out
245                    filtered = true;
246                    self.stats.write().events_filtered += 1;
247                    break;
248                }
249                Err(e) => {
250                    self.stats.write().events_failed += 1;
251                    tracing::error!(
252                        "Pipeline {} operator failed: {}",
253                        self.config.name,
254                        e
255                    );
256                    return Err(e);
257                }
258            }
259        }
260
261        // Update stats
262        let mut stats = self.stats.write();
263        stats.events_processed += 1;
264        stats.last_processed = Some(Utc::now());
265
266        if filtered {
267            Ok(None)
268        } else {
269            Ok(Some(current_value))
270        }
271    }
272
273    /// Apply a single operator
274    fn apply_operator(
275        &self,
276        operator: &PipelineOperator,
277        value: &JsonValue,
278        event: &Event,
279    ) -> Result<Option<JsonValue>> {
280        match operator {
281            PipelineOperator::Filter { field, value: expected, op } => {
282                self.apply_filter(field, expected, op, value)
283            }
284
285            PipelineOperator::Map { field, transform } => {
286                self.apply_map(field, transform, value)
287            }
288
289            PipelineOperator::Reduce { field, function, group_by } => {
290                self.apply_reduce(field, function, group_by.as_deref(), value, event)
291            }
292
293            PipelineOperator::Window { config, aggregation } => {
294                self.apply_window(config, aggregation, event)
295            }
296
297            PipelineOperator::Enrich { source, fields } => {
298                self.apply_enrich(source, fields, value)
299            }
300
301            PipelineOperator::Branch { field, branches } => {
302                self.apply_branch(field, branches, value)
303            }
304        }
305    }
306
307    /// Apply filter operator
308    fn apply_filter(
309        &self,
310        field: &str,
311        expected: &JsonValue,
312        op: &str,
313        value: &JsonValue,
314    ) -> Result<Option<JsonValue>> {
315        let field_value = self.get_field(value, field);
316
317        let matches = match op {
318            "eq" => field_value == Some(expected),
319            "ne" => field_value != Some(expected),
320            "gt" => {
321                if let (Some(JsonValue::Number(a)), JsonValue::Number(b)) = (field_value.as_ref(), expected) {
322                    a.as_f64().unwrap_or(0.0) > b.as_f64().unwrap_or(0.0)
323                } else {
324                    false
325                }
326            }
327            "lt" => {
328                if let (Some(JsonValue::Number(a)), JsonValue::Number(b)) = (field_value.as_ref(), expected) {
329                    a.as_f64().unwrap_or(0.0) < b.as_f64().unwrap_or(0.0)
330                } else {
331                    false
332                }
333            }
334            "contains" => {
335                if let (Some(JsonValue::String(a)), JsonValue::String(b)) = (field_value.as_ref(), expected) {
336                    a.contains(b)
337                } else {
338                    false
339                }
340            }
341            _ => {
342                return Err(AllSourceError::ValidationError(format!(
343                    "Unknown filter operator: {}",
344                    op
345                )));
346            }
347        };
348
349        if matches {
350            Ok(Some(value.clone()))
351        } else {
352            Ok(None) // Filtered out
353        }
354    }
355
356    /// Apply map transformation
357    fn apply_map(
358        &self,
359        field: &str,
360        transform: &str,
361        value: &JsonValue,
362    ) -> Result<Option<JsonValue>> {
363        let mut result = value.clone();
364
365        // Simple transformations
366        let field_value = self.get_field(value, field);
367
368        let transformed = match transform {
369            "uppercase" => {
370                field_value
371                    .and_then(|v| v.as_str())
372                    .map(|s| JsonValue::String(s.to_uppercase()))
373            }
374            "lowercase" => {
375                field_value
376                    .and_then(|v| v.as_str())
377                    .map(|s| JsonValue::String(s.to_lowercase()))
378            }
379            "trim" => {
380                field_value
381                    .and_then(|v| v.as_str())
382                    .map(|s| JsonValue::String(s.trim().to_string()))
383            }
384            _ => {
385                // Try to parse as number operation
386                if let Some(stripped) = transform.strip_prefix("multiply:") {
387                    if let Ok(multiplier) = stripped.parse::<f64>() {
388                        field_value
389                            .and_then(|v| v.as_f64())
390                            .map(|n| JsonValue::Number(
391                                serde_json::Number::from_f64(n * multiplier).unwrap()
392                            ))
393                    } else {
394                        None
395                    }
396                } else if let Some(stripped) = transform.strip_prefix("add:") {
397                    if let Ok(addend) = stripped.parse::<f64>() {
398                        field_value
399                            .and_then(|v| v.as_f64())
400                            .map(|n| JsonValue::Number(
401                                serde_json::Number::from_f64(n + addend).unwrap()
402                            ))
403                    } else {
404                        None
405                    }
406                } else {
407                    None
408                }
409            }
410        };
411
412        if let Some(new_value) = transformed {
413            self.set_field(&mut result, field, new_value);
414        }
415
416        Ok(Some(result))
417    }
418
419    /// Apply reduce aggregation
420    fn apply_reduce(
421        &self,
422        field: &str,
423        function: &str,
424        group_by: Option<&str>,
425        value: &JsonValue,
426        event: &Event,
427    ) -> Result<Option<JsonValue>> {
428        // Get group key
429        let group_key = if let Some(group_field) = group_by {
430            self.get_field(value, group_field)
431                .and_then(|v| v.as_str())
432                .unwrap_or("default")
433                .to_string()
434        } else {
435            "default".to_string()
436        };
437
438        let state_key = format!("reduce_{}_{}", function, group_key);
439
440        // Get current aggregate value
441        let current = self.state.get_state(&state_key);
442
443        // Get field value to aggregate
444        let field_value = self.get_field(value, field);
445
446        let new_value = match function {
447            "count" => {
448                let count = current
449                    .and_then(|v| v.as_u64())
450                    .unwrap_or(0) + 1;
451                JsonValue::Number(count.into())
452            }
453            "sum" => {
454                let current_sum = current
455                    .and_then(|v| v.as_f64())
456                    .unwrap_or(0.0);
457                let value_to_add = field_value
458                    .and_then(|v| v.as_f64())
459                    .unwrap_or(0.0);
460                JsonValue::Number(
461                    serde_json::Number::from_f64(current_sum + value_to_add).unwrap()
462                )
463            }
464            "avg" => {
465                // Store sum and count separately
466                let sum_key = format!("{}_sum", state_key);
467                let count_key = format!("{}_count", state_key);
468
469                let current_sum = self.state.get_state(&sum_key)
470                    .and_then(|v| v.as_f64())
471                    .unwrap_or(0.0);
472                let current_count = self.state.get_state(&count_key)
473                    .and_then(|v| v.as_u64())
474                    .unwrap_or(0);
475
476                let value_to_add = field_value
477                    .and_then(|v| v.as_f64())
478                    .unwrap_or(0.0);
479
480                let new_sum = current_sum + value_to_add;
481                let new_count = current_count + 1;
482
483                self.state.set_state(sum_key, JsonValue::Number(
484                    serde_json::Number::from_f64(new_sum).unwrap()
485                ));
486                self.state.set_state(count_key, JsonValue::Number(new_count.into()));
487
488                let avg = new_sum / new_count as f64;
489                JsonValue::Number(serde_json::Number::from_f64(avg).unwrap())
490            }
491            "min" => {
492                let current_min = current.and_then(|v| v.as_f64());
493                let new_val = field_value.and_then(|v| v.as_f64());
494
495                match (current_min, new_val) {
496                    (Some(curr), Some(new)) => JsonValue::Number(
497                        serde_json::Number::from_f64(curr.min(new)).unwrap()
498                    ),
499                    (None, Some(new)) => JsonValue::Number(
500                        serde_json::Number::from_f64(new).unwrap()
501                    ),
502                    (Some(curr), None) => JsonValue::Number(
503                        serde_json::Number::from_f64(curr).unwrap()
504                    ),
505                    (None, None) => JsonValue::Null,
506                }
507            }
508            "max" => {
509                let current_max = current.and_then(|v| v.as_f64());
510                let new_val = field_value.and_then(|v| v.as_f64());
511
512                match (current_max, new_val) {
513                    (Some(curr), Some(new)) => JsonValue::Number(
514                        serde_json::Number::from_f64(curr.max(new)).unwrap()
515                    ),
516                    (None, Some(new)) => JsonValue::Number(
517                        serde_json::Number::from_f64(new).unwrap()
518                    ),
519                    (Some(curr), None) => JsonValue::Number(
520                        serde_json::Number::from_f64(curr).unwrap()
521                    ),
522                    (None, None) => JsonValue::Null,
523                }
524            }
525            _ => {
526                return Err(AllSourceError::ValidationError(format!(
527                    "Unknown reduce function: {}",
528                    function
529                )));
530            }
531        };
532
533        // Update state
534        self.state.set_state(state_key.clone(), new_value.clone());
535
536        // Return aggregated result
537        let result = serde_json::json!({
538            "group": group_key,
539            "function": function,
540            "value": new_value
541        });
542
543        Ok(Some(result))
544    }
545
546    /// Apply window aggregation
547    fn apply_window(
548        &self,
549        config: &WindowConfig,
550        aggregation: &PipelineOperator,
551        event: &Event,
552    ) -> Result<Option<JsonValue>> {
553        let window_key = format!("window_{}", self.config.id);
554        let now = Utc::now();
555
556        // Add event to window
557        self.state.add_to_window(&window_key, event.clone(), event.timestamp);
558
559        // Evict expired events based on window type
560        let cutoff = match config.window_type {
561            WindowType::Tumbling => now - Duration::seconds(config.size_seconds),
562            WindowType::Sliding => {
563                let slide = config.slide_seconds.unwrap_or(config.size_seconds);
564                now - Duration::seconds(slide)
565            }
566            WindowType::Session => {
567                let timeout = config.session_timeout_seconds.unwrap_or(300);
568                now - Duration::seconds(timeout)
569            }
570        };
571
572        self.state.evict_window(&window_key, cutoff);
573
574        // Get events in current window
575        let window_events = self.state.get_window(&window_key);
576
577        // Apply aggregation to window
578        let mut aggregate_value = JsonValue::Null;
579        for window_event in &window_events {
580            if let Ok(Some(result)) = self.apply_operator(aggregation, &window_event.payload, window_event) {
581                aggregate_value = result;
582            }
583        }
584
585        Ok(Some(serde_json::json!({
586            "window_type": config.window_type,
587            "window_size_seconds": config.size_seconds,
588            "events_in_window": window_events.len(),
589            "aggregation": aggregate_value
590        })))
591    }
592
593    /// Apply enrichment
594    fn apply_enrich(
595        &self,
596        _source: &str,
597        fields: &[String],
598        value: &JsonValue,
599    ) -> Result<Option<JsonValue>> {
600        // Placeholder for enrichment logic
601        // In production, this would fetch data from external sources
602        let mut result = value.clone();
603
604        for field in fields {
605            let enriched_value = JsonValue::String(format!("enriched_{}", field));
606            self.set_field(&mut result, field, enriched_value);
607        }
608
609        Ok(Some(result))
610    }
611
612    /// Apply branch routing
613    fn apply_branch(
614        &self,
615        field: &str,
616        branches: &HashMap<String, String>,
617        value: &JsonValue,
618    ) -> Result<Option<JsonValue>> {
619        let field_value = self.get_field(value, field);
620
621        if let Some(JsonValue::String(val)) = field_value {
622            if let Some(route) = branches.get(val) {
623                let mut result = value.clone();
624                if let JsonValue::Object(ref mut obj) = result {
625                    obj.insert("_route".to_string(), JsonValue::String(route.clone()));
626                }
627                return Ok(Some(result));
628            }
629        }
630
631        Ok(Some(value.clone()))
632    }
633
634    /// Helper: Get nested field from JSON
635    fn get_field<'a>(&self, value: &'a JsonValue, field: &str) -> Option<&'a JsonValue> {
636        let parts: Vec<&str> = field.split('.').collect();
637        let mut current = value;
638
639        for part in parts {
640            current = current.get(part)?;
641        }
642
643        Some(current)
644    }
645
646    /// Helper: Set nested field in JSON
647    fn set_field(&self, value: &mut JsonValue, field: &str, new_value: JsonValue) {
648        let parts: Vec<&str> = field.split('.').collect();
649
650        if parts.len() == 1 {
651            if let JsonValue::Object(ref mut obj) = value {
652                obj.insert(field.to_string(), new_value);
653            }
654            return;
655        }
656
657        // Navigate to parent
658        let mut current = value;
659        for part in &parts[..parts.len() - 1] {
660            if let JsonValue::Object(ref mut obj) = current {
661                current = obj.entry(part.to_string()).or_insert(JsonValue::Object(Default::default()));
662            }
663        }
664
665        // Set final value
666        if let JsonValue::Object(ref mut obj) = current {
667            obj.insert(parts.last().unwrap().to_string(), new_value);
668        }
669    }
670
671    /// Get pipeline statistics
672    pub fn stats(&self) -> PipelineStats {
673        self.stats.read().clone()
674    }
675
676    /// Get pipeline configuration
677    pub fn config(&self) -> &PipelineConfig {
678        &self.config
679    }
680
681    /// Reset pipeline state
682    pub fn reset(&self) {
683        self.state.clear();
684        let mut stats = self.stats.write();
685        stats.events_processed = 0;
686        stats.events_filtered = 0;
687        stats.events_failed = 0;
688        stats.last_processed = None;
689    }
690}
691
692/// Manages multiple pipelines
693pub struct PipelineManager {
694    pipelines: Arc<RwLock<HashMap<Uuid, Arc<Pipeline>>>>,
695    metrics: Arc<MetricsRegistry>,
696}
697
698impl PipelineManager {
699    pub fn new() -> Self {
700        Self::with_metrics(MetricsRegistry::new())
701    }
702
703    pub fn with_metrics(metrics: Arc<MetricsRegistry>) -> Self {
704        Self {
705            pipelines: Arc::new(RwLock::new(HashMap::new())),
706            metrics,
707        }
708    }
709
710    /// Register a new pipeline
711    pub fn register(&self, config: PipelineConfig) -> Uuid {
712        let id = config.id;
713        let name = config.name.clone();
714        let pipeline = Arc::new(Pipeline::new(config));
715        self.pipelines.write().insert(id, pipeline);
716
717        let count = self.pipelines.read().len();
718        self.metrics.pipelines_registered_total.set(count as i64);
719
720        tracing::info!("📊 Registered pipeline: {} ({})", name, id);
721        id
722    }
723
724    /// Get a pipeline by ID
725    pub fn get(&self, id: Uuid) -> Option<Arc<Pipeline>> {
726        self.pipelines.read().get(&id).cloned()
727    }
728
729    /// Process event through all matching pipelines
730    pub fn process_event(&self, event: &Event) -> Vec<(Uuid, JsonValue)> {
731        let timer = self.metrics.pipeline_duration_seconds.start_timer();
732
733        let pipelines = self.pipelines.read();
734        let mut results = Vec::new();
735
736        for (id, pipeline) in pipelines.iter() {
737            let pipeline_name = &pipeline.config().name;
738            let pipeline_id = id.to_string();
739
740            match pipeline.process(event) {
741                Ok(Some(result)) => {
742                    self.metrics.pipeline_events_processed
743                        .with_label_values(&[&pipeline_id, pipeline_name])
744                        .inc();
745                    results.push((*id, result));
746                }
747                Ok(None) => {
748                    // Event filtered out or didn't match - not an error
749                }
750                Err(e) => {
751                    self.metrics.pipeline_errors_total
752                        .with_label_values(&[pipeline_name])
753                        .inc();
754                    tracing::error!(
755                        "Pipeline '{}' ({}) failed to process event: {}",
756                        pipeline_name,
757                        id,
758                        e
759                    );
760                }
761            }
762        }
763
764        timer.observe_duration();
765        results
766    }
767
768    /// List all pipelines
769    pub fn list(&self) -> Vec<PipelineConfig> {
770        self.pipelines
771            .read()
772            .values()
773            .map(|p| p.config().clone())
774            .collect()
775    }
776
777    /// Remove a pipeline
778    pub fn remove(&self, id: Uuid) -> bool {
779        let removed = self.pipelines.write().remove(&id).is_some();
780
781        if removed {
782            let count = self.pipelines.read().len();
783            self.metrics.pipelines_registered_total.set(count as i64);
784        }
785
786        removed
787    }
788
789    /// Get statistics for all pipelines
790    pub fn all_stats(&self) -> Vec<PipelineStats> {
791        self.pipelines
792            .read()
793            .values()
794            .map(|p| p.stats())
795            .collect()
796    }
797}
798
799impl Default for PipelineManager {
800    fn default() -> Self {
801        Self::new()
802    }
803}
804
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Builds an enabled config that consumes `test` events through `operators`.
    fn config_with(name: &str, operators: Vec<PipelineOperator>) -> PipelineConfig {
        PipelineConfig {
            id: Uuid::new_v4(),
            name: name.to_string(),
            description: None,
            source_event_types: vec!["test".to_string()],
            operators,
            enabled: true,
            output: "test_output".to_string(),
        }
    }

    /// Builds a `test` event for `entity1` in the `default` tenant carrying `payload`.
    fn test_event(payload: JsonValue) -> Event {
        Event::from_strings(
            "test".to_string(),
            "entity1".to_string(),
            "default".to_string(),
            payload,
            None,
        )
        .unwrap()
    }

    /// Filter keeping only events whose `status` equals `"active"`.
    fn active_filter() -> PipelineOperator {
        PipelineOperator::Filter {
            field: "status".to_string(),
            value: json!("active"),
            op: "eq".to_string(),
        }
    }

    #[test]
    fn test_filter_operator() {
        let pipeline = Pipeline::new(config_with("test_filter", vec![active_filter()]));

        let result = pipeline.process(&test_event(json!({"status": "active"}))).unwrap();
        assert!(result.is_some());
    }

    #[test]
    fn test_filter_rejects_non_matching_event() {
        let pipeline = Pipeline::new(config_with("test_filter_reject", vec![active_filter()]));

        let result = pipeline.process(&test_event(json!({"status": "inactive"}))).unwrap();
        assert!(result.is_none());

        let stats = pipeline.stats();
        assert_eq!(stats.events_filtered, 1);
        assert_eq!(stats.events_processed, 1);
        assert_eq!(stats.events_failed, 0);
    }

    #[test]
    fn test_unknown_filter_operator_is_an_error() {
        let pipeline = Pipeline::new(config_with(
            "test_filter_unknown_op",
            vec![PipelineOperator::Filter {
                field: "status".to_string(),
                value: json!("active"),
                op: "between".to_string(),
            }],
        ));

        assert!(pipeline.process(&test_event(json!({"status": "active"}))).is_err());

        let stats = pipeline.stats();
        assert_eq!(stats.events_failed, 1);
        assert_eq!(stats.events_processed, 0);
    }

    #[test]
    fn test_source_event_type_mismatch_is_skipped() {
        let mut config = config_with("test_source_mismatch", vec![]);
        config.source_event_types = vec!["orders".to_string()];
        let pipeline = Pipeline::new(config);

        assert!(pipeline.process(&test_event(json!({}))).unwrap().is_none());
        assert_eq!(pipeline.stats().events_processed, 0);
    }

    #[test]
    fn test_disabled_pipeline_is_skipped() {
        let mut config = config_with("test_disabled", vec![]);
        config.enabled = false;
        let pipeline = Pipeline::new(config);

        assert!(pipeline.process(&test_event(json!({"a": 1}))).unwrap().is_none());
        assert_eq!(pipeline.stats().events_processed, 0);
    }

    #[test]
    fn test_map_operator() {
        let pipeline = Pipeline::new(config_with(
            "test_map",
            vec![PipelineOperator::Map {
                field: "name".to_string(),
                transform: "uppercase".to_string(),
            }],
        ));

        let result = pipeline
            .process(&test_event(json!({"name": "hello"})))
            .unwrap()
            .unwrap();
        assert_eq!(result["name"], "HELLO");
    }

    #[test]
    fn test_map_nested_field_arithmetic() {
        let pipeline = Pipeline::new(config_with(
            "test_map_nested",
            vec![PipelineOperator::Map {
                field: "order.total".to_string(),
                transform: "multiply:2".to_string(),
            }],
        ));

        let result = pipeline
            .process(&test_event(json!({"order": {"total": 21}})))
            .unwrap()
            .unwrap();
        assert_eq!(result["order"]["total"], 42.0);
    }

    #[test]
    fn test_reduce_count() {
        let pipeline = Pipeline::new(config_with(
            "test_reduce",
            vec![PipelineOperator::Reduce {
                field: "value".to_string(),
                function: "count".to_string(),
                group_by: None,
            }],
        ));

        for i in 0..5 {
            pipeline.process(&test_event(json!({"value": i}))).unwrap();
        }

        let result = pipeline
            .process(&test_event(json!({"value": 5})))
            .unwrap()
            .unwrap();
        assert_eq!(result["value"], 6);
    }

    #[test]
    fn test_reduce_sum_grouped_by_string_field() {
        let pipeline = Pipeline::new(config_with(
            "test_reduce_grouped",
            vec![PipelineOperator::Reduce {
                field: "amount".to_string(),
                function: "sum".to_string(),
                group_by: Some("region".to_string()),
            }],
        ));

        pipeline
            .process(&test_event(json!({"region": "eu", "amount": 10})))
            .unwrap();
        pipeline
            .process(&test_event(json!({"region": "us", "amount": 5})))
            .unwrap();
        let result = pipeline
            .process(&test_event(json!({"region": "eu", "amount": 2.5})))
            .unwrap()
            .unwrap();

        assert_eq!(result["group"], "eu");
        assert_eq!(result["function"], "sum");
        assert_eq!(result["value"], 12.5);
    }

    #[test]
    fn test_branch_adds_route_for_known_value() {
        let mut branches = HashMap::new();
        branches.insert("a".to_string(), "route_a".to_string());
        let pipeline = Pipeline::new(config_with(
            "test_branch",
            vec![PipelineOperator::Branch {
                field: "kind".to_string(),
                branches,
            }],
        ));

        let routed = pipeline
            .process(&test_event(json!({"kind": "a"})))
            .unwrap()
            .unwrap();
        assert_eq!(routed["_route"], "route_a");

        let unrouted = pipeline
            .process(&test_event(json!({"kind": "b"})))
            .unwrap()
            .unwrap();
        assert!(unrouted.get("_route").is_none());
    }
}