Skip to main content

allsource_core/application/services/
pipeline.rs

1use crate::{
2    domain::entities::Event,
3    error::{AllSourceError, Result},
4    infrastructure::observability::metrics::MetricsRegistry,
5};
6use chrono::{DateTime, Duration, Utc};
7use dashmap::DashMap;
8use parking_lot::RwLock;
9use serde::{Deserialize, Serialize};
10use serde_json::Value as JsonValue;
11use std::{
12    collections::{HashMap, VecDeque},
13    sync::Arc,
14};
15use uuid::Uuid;
16
/// Window type for time-based aggregations performed by
/// [`PipelineOperator::Window`].
///
/// Serialized as a lowercase string: `"tumbling"`, `"sliding"` or `"session"`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum WindowType {
    /// Tumbling window (non-overlapping).
    Tumbling,
    /// Sliding window (overlapping).
    Sliding,
    /// Session window (activity-based).
    Session,
}
28
/// Window configuration for a [`PipelineOperator::Window`] operator.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WindowConfig {
    /// Type of window.
    pub window_type: WindowType,

    /// Window size in seconds.
    pub size_seconds: i64,

    /// Slide interval in seconds (for sliding windows); optional.
    pub slide_seconds: Option<i64>,

    /// Session timeout in seconds (for session windows). When `None`, the
    /// pipeline falls back to 300 seconds.
    pub session_timeout_seconds: Option<i64>,
}
44
/// A single processing step in a pipeline.
///
/// Serialized as an internally tagged object whose `"type"` field is the
/// lowercase variant name, e.g.
/// `{"type": "filter", "field": "status", "value": "active", "op": "eq"}`.
///
/// All `field` paths are dot-separated and resolved against the value being
/// processed, which starts as the event payload (not the whole event).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "lowercase")]
pub enum PipelineOperator {
    /// Filter events based on a condition; non-matching events are dropped.
    Filter {
        /// Field path to check, relative to the payload (e.g., "status" or
        /// "user.status").
        field: String,
        /// Expected value to compare against.
        value: JsonValue,
        /// Operator: eq, ne, gt, lt, contains
        op: String,
    },

    /// Transform one field of the payload.
    Map {
        /// Field path to transform.
        field: String,
        /// Transformation: "uppercase", "lowercase", "trim",
        /// "multiply:<number>" or "add:<number>".
        transform: String,
    },

    /// Aggregate a field across events into a running value.
    Reduce {
        /// Field path to aggregate.
        field: String,
        /// Aggregation function: sum, count, avg, min, max
        function: String,
        /// Field path to group by (optional).
        group_by: Option<String>,
    },

    /// Window-based aggregation over buffered events.
    Window {
        /// Window configuration.
        config: WindowConfig,
        /// Operator applied to each buffered event's payload within the window.
        aggregation: Box<PipelineOperator>,
    },

    /// Enrich the payload with additional fields.
    ///
    /// NOTE: enrichment is a placeholder: each listed field is set to the
    /// string `"enriched_<field>"` and `source` is not consulted.
    Enrich {
        /// Source to enrich from.
        source: String,
        /// Field paths to add.
        fields: Vec<String>,
    },

    /// Tag the payload with a route selected by a field's value.
    ///
    /// When the field holds a string that is a key of `branches` and the
    /// payload is a JSON object, the mapped route is stored under `"_route"`.
    Branch {
        /// Condition field path; its string value selects the route.
        field: String,
        /// Branch mapping from field value to route name.
        branches: HashMap<String, String>,
    },
}
101
/// Pipeline configuration.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PipelineConfig {
    /// Pipeline ID; also the key under which `PipelineManager` registers it.
    pub id: Uuid,

    /// Pipeline name, used in log messages and metric labels.
    pub name: String,

    /// Optional free-form description.
    pub description: Option<String>,

    /// Source event types to process; an empty list accepts every event type.
    pub source_event_types: Vec<String>,

    /// Pipeline operators, applied in order to each accepted event's payload.
    pub operators: Vec<PipelineOperator>,

    /// Whether the pipeline is enabled; disabled pipelines produce no output.
    pub enabled: bool,

    /// Output destination (projection name or topic).
    pub output: String,
}
126
/// Pipeline execution statistics.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PipelineStats {
    /// ID of the pipeline these statistics belong to.
    pub pipeline_id: Uuid,
    /// Events that ran through the operators without error (including filtered ones).
    pub events_processed: u64,
    /// Events dropped by an operator; also counted in `events_processed`.
    pub events_filtered: u64,
    /// Events for which an operator returned an error.
    pub events_failed: u64,
    /// Wall-clock time at which the most recent event finished processing, if any.
    pub last_processed: Option<DateTime<Utc>>,
}
136
137/// Stateful operator for maintaining state across events
138pub struct StatefulOperator {
139    /// Operator state storage
140    state: Arc<RwLock<HashMap<String, JsonValue>>>,
141
142    /// Window buffers for time-based operations
143    windows: Arc<RwLock<HashMap<String, VecDeque<(DateTime<Utc>, Event)>>>>,
144}
145
146impl StatefulOperator {
147    pub fn new() -> Self {
148        Self {
149            state: Arc::new(RwLock::new(HashMap::new())),
150            windows: Arc::new(RwLock::new(HashMap::new())),
151        }
152    }
153
154    /// Store state value
155    pub fn set_state(&self, key: String, value: JsonValue) {
156        self.state.write().insert(key, value);
157    }
158
159    /// Get state value
160    pub fn get_state(&self, key: &str) -> Option<JsonValue> {
161        self.state.read().get(key).cloned()
162    }
163
164    /// Add event to window
165    pub fn add_to_window(&self, window_key: &str, event: Event, timestamp: DateTime<Utc>) {
166        let mut windows = self.windows.write();
167        windows
168            .entry(window_key.to_string())
169            .or_default()
170            .push_back((timestamp, event));
171    }
172
173    /// Get events in window
174    pub fn get_window(&self, window_key: &str) -> Vec<Event> {
175        self.windows
176            .read()
177            .get(window_key)
178            .map(|w| w.iter().map(|(_, e)| e.clone()).collect())
179            .unwrap_or_default()
180    }
181
182    /// Evict expired events from window
183    pub fn evict_window(&self, window_key: &str, cutoff: DateTime<Utc>) {
184        if let Some(window) = self.windows.write().get_mut(window_key) {
185            window.retain(|(ts, _)| *ts > cutoff);
186        }
187    }
188
189    /// Clear all state
190    pub fn clear(&self) {
191        self.state.write().clear();
192        self.windows.write().clear();
193    }
194}
195
196impl Default for StatefulOperator {
197    fn default() -> Self {
198        Self::new()
199    }
200}
201
202/// Pipeline execution engine
203pub struct Pipeline {
204    config: PipelineConfig,
205    state: StatefulOperator,
206    stats: Arc<RwLock<PipelineStats>>,
207}
208
209impl Pipeline {
210    pub fn new(config: PipelineConfig) -> Self {
211        let stats = PipelineStats {
212            pipeline_id: config.id,
213            events_processed: 0,
214            events_filtered: 0,
215            events_failed: 0,
216            last_processed: None,
217        };
218
219        Self {
220            config,
221            state: StatefulOperator::new(),
222            stats: Arc::new(RwLock::new(stats)),
223        }
224    }
225
226    /// Process an event through the pipeline
227    pub fn process(&self, event: &Event) -> Result<Option<JsonValue>> {
228        // Check if event type matches source filter
229        if !self.config.source_event_types.is_empty()
230            && !self
231                .config
232                .source_event_types
233                .iter()
234                .any(|t| t == event.event_type_str())
235        {
236            return Ok(None);
237        }
238
239        if !self.config.enabled {
240            return Ok(None);
241        }
242
243        let mut current_value = event.payload.clone();
244        let mut filtered = false;
245
246        // Apply operators in sequence
247        for operator in &self.config.operators {
248            match self.apply_operator(operator, &current_value, event) {
249                Ok(Some(result)) => {
250                    current_value = result;
251                }
252                Ok(None) => {
253                    // Event was filtered out
254                    filtered = true;
255                    self.stats.write().events_filtered += 1;
256                    break;
257                }
258                Err(e) => {
259                    self.stats.write().events_failed += 1;
260                    tracing::error!("Pipeline {} operator failed: {}", self.config.name, e);
261                    return Err(e);
262                }
263            }
264        }
265
266        // Update stats
267        let mut stats = self.stats.write();
268        stats.events_processed += 1;
269        stats.last_processed = Some(Utc::now());
270
271        if filtered {
272            Ok(None)
273        } else {
274            Ok(Some(current_value))
275        }
276    }
277
278    /// Apply a single operator
279    fn apply_operator(
280        &self,
281        operator: &PipelineOperator,
282        value: &JsonValue,
283        event: &Event,
284    ) -> Result<Option<JsonValue>> {
285        match operator {
286            PipelineOperator::Filter {
287                field,
288                value: expected,
289                op,
290            } => self.apply_filter(field, expected, op, value),
291
292            PipelineOperator::Map { field, transform } => self.apply_map(field, transform, value),
293
294            PipelineOperator::Reduce {
295                field,
296                function,
297                group_by,
298            } => self.apply_reduce(field, function, group_by.as_deref(), value, event),
299
300            PipelineOperator::Window {
301                config,
302                aggregation,
303            } => self.apply_window(config, aggregation, event),
304
305            PipelineOperator::Enrich { source, fields } => self.apply_enrich(source, fields, value),
306
307            PipelineOperator::Branch { field, branches } => {
308                self.apply_branch(field, branches, value)
309            }
310        }
311    }
312
313    /// Apply filter operator
314    fn apply_filter(
315        &self,
316        field: &str,
317        expected: &JsonValue,
318        op: &str,
319        value: &JsonValue,
320    ) -> Result<Option<JsonValue>> {
321        let field_value = self.get_field(value, field);
322
323        let matches = match op {
324            "eq" => field_value == Some(expected),
325            "ne" => field_value != Some(expected),
326            "gt" => {
327                if let (Some(JsonValue::Number(a)), JsonValue::Number(b)) =
328                    (field_value.as_ref(), expected)
329                {
330                    a.as_f64().unwrap_or(0.0) > b.as_f64().unwrap_or(0.0)
331                } else {
332                    false
333                }
334            }
335            "lt" => {
336                if let (Some(JsonValue::Number(a)), JsonValue::Number(b)) =
337                    (field_value.as_ref(), expected)
338                {
339                    a.as_f64().unwrap_or(0.0) < b.as_f64().unwrap_or(0.0)
340                } else {
341                    false
342                }
343            }
344            "contains" => {
345                if let (Some(JsonValue::String(a)), JsonValue::String(b)) =
346                    (field_value.as_ref(), expected)
347                {
348                    a.contains(b)
349                } else {
350                    false
351                }
352            }
353            _ => {
354                return Err(AllSourceError::ValidationError(format!(
355                    "Unknown filter operator: {}",
356                    op
357                )));
358            }
359        };
360
361        if matches {
362            Ok(Some(value.clone()))
363        } else {
364            Ok(None) // Filtered out
365        }
366    }
367
368    /// Apply map transformation
369    fn apply_map(
370        &self,
371        field: &str,
372        transform: &str,
373        value: &JsonValue,
374    ) -> Result<Option<JsonValue>> {
375        let mut result = value.clone();
376
377        // Simple transformations
378        let field_value = self.get_field(value, field);
379
380        let transformed = match transform {
381            "uppercase" => field_value
382                .and_then(|v| v.as_str())
383                .map(|s| JsonValue::String(s.to_uppercase())),
384            "lowercase" => field_value
385                .and_then(|v| v.as_str())
386                .map(|s| JsonValue::String(s.to_lowercase())),
387            "trim" => field_value
388                .and_then(|v| v.as_str())
389                .map(|s| JsonValue::String(s.trim().to_string())),
390            _ => {
391                // Try to parse as number operation
392                if let Some(stripped) = transform.strip_prefix("multiply:") {
393                    if let Ok(multiplier) = stripped.parse::<f64>() {
394                        field_value.and_then(|v| v.as_f64()).map(|n| {
395                            JsonValue::Number(serde_json::Number::from_f64(n * multiplier).unwrap())
396                        })
397                    } else {
398                        None
399                    }
400                } else if let Some(stripped) = transform.strip_prefix("add:") {
401                    if let Ok(addend) = stripped.parse::<f64>() {
402                        field_value.and_then(|v| v.as_f64()).map(|n| {
403                            JsonValue::Number(serde_json::Number::from_f64(n + addend).unwrap())
404                        })
405                    } else {
406                        None
407                    }
408                } else {
409                    None
410                }
411            }
412        };
413
414        if let Some(new_value) = transformed {
415            self.set_field(&mut result, field, new_value);
416        }
417
418        Ok(Some(result))
419    }
420
421    /// Apply reduce aggregation
422    fn apply_reduce(
423        &self,
424        field: &str,
425        function: &str,
426        group_by: Option<&str>,
427        value: &JsonValue,
428        event: &Event,
429    ) -> Result<Option<JsonValue>> {
430        // Get group key
431        let group_key = if let Some(group_field) = group_by {
432            self.get_field(value, group_field)
433                .and_then(|v| v.as_str())
434                .unwrap_or("default")
435                .to_string()
436        } else {
437            "default".to_string()
438        };
439
440        let state_key = format!("reduce_{}_{}", function, group_key);
441
442        // Get current aggregate value
443        let current = self.state.get_state(&state_key);
444
445        // Get field value to aggregate
446        let field_value = self.get_field(value, field);
447
448        let new_value = match function {
449            "count" => {
450                let count = current.and_then(|v| v.as_u64()).unwrap_or(0) + 1;
451                JsonValue::Number(count.into())
452            }
453            "sum" => {
454                let current_sum = current.and_then(|v| v.as_f64()).unwrap_or(0.0);
455                let value_to_add = field_value.and_then(|v| v.as_f64()).unwrap_or(0.0);
456                JsonValue::Number(serde_json::Number::from_f64(current_sum + value_to_add).unwrap())
457            }
458            "avg" => {
459                // Store sum and count separately
460                let sum_key = format!("{}_sum", state_key);
461                let count_key = format!("{}_count", state_key);
462
463                let current_sum = self
464                    .state
465                    .get_state(&sum_key)
466                    .and_then(|v| v.as_f64())
467                    .unwrap_or(0.0);
468                let current_count = self
469                    .state
470                    .get_state(&count_key)
471                    .and_then(|v| v.as_u64())
472                    .unwrap_or(0);
473
474                let value_to_add = field_value.and_then(|v| v.as_f64()).unwrap_or(0.0);
475
476                let new_sum = current_sum + value_to_add;
477                let new_count = current_count + 1;
478
479                self.state.set_state(
480                    sum_key,
481                    JsonValue::Number(serde_json::Number::from_f64(new_sum).unwrap()),
482                );
483                self.state
484                    .set_state(count_key, JsonValue::Number(new_count.into()));
485
486                let avg = new_sum / new_count as f64;
487                JsonValue::Number(serde_json::Number::from_f64(avg).unwrap())
488            }
489            "min" => {
490                let current_min = current.and_then(|v| v.as_f64());
491                let new_val = field_value.and_then(|v| v.as_f64());
492
493                match (current_min, new_val) {
494                    (Some(curr), Some(new)) => {
495                        JsonValue::Number(serde_json::Number::from_f64(curr.min(new)).unwrap())
496                    }
497                    (None, Some(new)) => {
498                        JsonValue::Number(serde_json::Number::from_f64(new).unwrap())
499                    }
500                    (Some(curr), None) => {
501                        JsonValue::Number(serde_json::Number::from_f64(curr).unwrap())
502                    }
503                    (None, None) => JsonValue::Null,
504                }
505            }
506            "max" => {
507                let current_max = current.and_then(|v| v.as_f64());
508                let new_val = field_value.and_then(|v| v.as_f64());
509
510                match (current_max, new_val) {
511                    (Some(curr), Some(new)) => {
512                        JsonValue::Number(serde_json::Number::from_f64(curr.max(new)).unwrap())
513                    }
514                    (None, Some(new)) => {
515                        JsonValue::Number(serde_json::Number::from_f64(new).unwrap())
516                    }
517                    (Some(curr), None) => {
518                        JsonValue::Number(serde_json::Number::from_f64(curr).unwrap())
519                    }
520                    (None, None) => JsonValue::Null,
521                }
522            }
523            _ => {
524                return Err(AllSourceError::ValidationError(format!(
525                    "Unknown reduce function: {}",
526                    function
527                )));
528            }
529        };
530
531        // Update state
532        self.state.set_state(state_key.clone(), new_value.clone());
533
534        // Return aggregated result
535        let result = serde_json::json!({
536            "group": group_key,
537            "function": function,
538            "value": new_value
539        });
540
541        Ok(Some(result))
542    }
543
544    /// Apply window aggregation
545    fn apply_window(
546        &self,
547        config: &WindowConfig,
548        aggregation: &PipelineOperator,
549        event: &Event,
550    ) -> Result<Option<JsonValue>> {
551        let window_key = format!("window_{}", self.config.id);
552        let now = Utc::now();
553
554        // Add event to window
555        self.state
556            .add_to_window(&window_key, event.clone(), event.timestamp);
557
558        // Evict expired events based on window type
559        let cutoff = match config.window_type {
560            WindowType::Tumbling => now - Duration::seconds(config.size_seconds),
561            WindowType::Sliding => {
562                let slide = config.slide_seconds.unwrap_or(config.size_seconds);
563                now - Duration::seconds(slide)
564            }
565            WindowType::Session => {
566                let timeout = config.session_timeout_seconds.unwrap_or(300);
567                now - Duration::seconds(timeout)
568            }
569        };
570
571        self.state.evict_window(&window_key, cutoff);
572
573        // Get events in current window
574        let window_events = self.state.get_window(&window_key);
575
576        // Apply aggregation to window
577        let mut aggregate_value = JsonValue::Null;
578        for window_event in &window_events {
579            if let Ok(Some(result)) =
580                self.apply_operator(aggregation, &window_event.payload, window_event)
581            {
582                aggregate_value = result;
583            }
584        }
585
586        Ok(Some(serde_json::json!({
587            "window_type": config.window_type,
588            "window_size_seconds": config.size_seconds,
589            "events_in_window": window_events.len(),
590            "aggregation": aggregate_value
591        })))
592    }
593
594    /// Apply enrichment
595    fn apply_enrich(
596        &self,
597        _source: &str,
598        fields: &[String],
599        value: &JsonValue,
600    ) -> Result<Option<JsonValue>> {
601        // Placeholder for enrichment logic
602        // In production, this would fetch data from external sources
603        let mut result = value.clone();
604
605        for field in fields {
606            let enriched_value = JsonValue::String(format!("enriched_{}", field));
607            self.set_field(&mut result, field, enriched_value);
608        }
609
610        Ok(Some(result))
611    }
612
613    /// Apply branch routing
614    fn apply_branch(
615        &self,
616        field: &str,
617        branches: &HashMap<String, String>,
618        value: &JsonValue,
619    ) -> Result<Option<JsonValue>> {
620        let field_value = self.get_field(value, field);
621
622        if let Some(JsonValue::String(val)) = field_value
623            && let Some(route) = branches.get(val)
624        {
625            let mut result = value.clone();
626            if let JsonValue::Object(ref mut obj) = result {
627                obj.insert("_route".to_string(), JsonValue::String(route.clone()));
628            }
629            return Ok(Some(result));
630        }
631
632        Ok(Some(value.clone()))
633    }
634
635    /// Helper: Get nested field from JSON
636    fn get_field<'a>(&self, value: &'a JsonValue, field: &str) -> Option<&'a JsonValue> {
637        let parts: Vec<&str> = field.split('.').collect();
638        let mut current = value;
639
640        for part in parts {
641            current = current.get(part)?;
642        }
643
644        Some(current)
645    }
646
647    /// Helper: Set nested field in JSON
648    fn set_field(&self, value: &mut JsonValue, field: &str, new_value: JsonValue) {
649        let parts: Vec<&str> = field.split('.').collect();
650
651        if parts.len() == 1 {
652            if let JsonValue::Object(obj) = value {
653                obj.insert(field.to_string(), new_value);
654            }
655            return;
656        }
657
658        // Navigate to parent
659        let mut current = value;
660        for part in &parts[..parts.len() - 1] {
661            if let JsonValue::Object(obj) = current {
662                current = obj
663                    .entry(part.to_string())
664                    .or_insert(JsonValue::Object(Default::default()));
665            }
666        }
667
668        // Set final value
669        if let JsonValue::Object(obj) = current {
670            obj.insert(parts.last().unwrap().to_string(), new_value);
671        }
672    }
673
674    /// Get pipeline statistics
675    pub fn stats(&self) -> PipelineStats {
676        self.stats.read().clone()
677    }
678
679    /// Get pipeline configuration
680    pub fn config(&self) -> &PipelineConfig {
681        &self.config
682    }
683
684    /// Reset pipeline state
685    pub fn reset(&self) {
686        self.state.clear();
687        let mut stats = self.stats.write();
688        stats.events_processed = 0;
689        stats.events_filtered = 0;
690        stats.events_failed = 0;
691        stats.last_processed = None;
692    }
693}
694
695/// Manages multiple pipelines
696pub struct PipelineManager {
697    // Using DashMap for lock-free concurrent access
698    pipelines: Arc<DashMap<Uuid, Arc<Pipeline>>>,
699    metrics: Arc<MetricsRegistry>,
700}
701
702impl PipelineManager {
703    pub fn new() -> Self {
704        Self::with_metrics(MetricsRegistry::new())
705    }
706
707    pub fn with_metrics(metrics: Arc<MetricsRegistry>) -> Self {
708        Self {
709            pipelines: Arc::new(DashMap::new()),
710            metrics,
711        }
712    }
713
714    /// Register a new pipeline
715    pub fn register(&self, config: PipelineConfig) -> Uuid {
716        let id = config.id;
717        let name = config.name.clone();
718        let pipeline = Arc::new(Pipeline::new(config));
719        self.pipelines.insert(id, pipeline);
720
721        let count = self.pipelines.len();
722        self.metrics.pipelines_registered_total.set(count as i64);
723
724        tracing::info!("📊 Registered pipeline: {} ({})", name, id);
725        id
726    }
727
728    /// Get a pipeline by ID
729    pub fn get(&self, id: Uuid) -> Option<Arc<Pipeline>> {
730        self.pipelines.get(&id).map(|entry| entry.value().clone())
731    }
732
733    /// Process event through all matching pipelines
734    pub fn process_event(&self, event: &Event) -> Vec<(Uuid, JsonValue)> {
735        let timer = self.metrics.pipeline_duration_seconds.start_timer();
736
737        let mut results = Vec::new();
738
739        for entry in self.pipelines.iter() {
740            let id = entry.key();
741            let pipeline = entry.value();
742            let pipeline_name = &pipeline.config().name;
743            let pipeline_id = id.to_string();
744
745            match pipeline.process(event) {
746                Ok(Some(result)) => {
747                    self.metrics
748                        .pipeline_events_processed
749                        .with_label_values(&[&pipeline_id, pipeline_name])
750                        .inc();
751                    results.push((*id, result));
752                }
753                Ok(None) => {
754                    // Event filtered out or didn't match - not an error
755                }
756                Err(e) => {
757                    self.metrics
758                        .pipeline_errors_total
759                        .with_label_values(&[pipeline_name])
760                        .inc();
761                    tracing::error!(
762                        "Pipeline '{}' ({}) failed to process event: {}",
763                        pipeline_name,
764                        id,
765                        e
766                    );
767                }
768            }
769        }
770
771        timer.observe_duration();
772        results
773    }
774
775    /// List all pipelines
776    pub fn list(&self) -> Vec<PipelineConfig> {
777        self.pipelines
778            .iter()
779            .map(|entry| entry.value().config().clone())
780            .collect()
781    }
782
783    /// Remove a pipeline
784    pub fn remove(&self, id: Uuid) -> bool {
785        let removed = self.pipelines.remove(&id).is_some();
786
787        if removed {
788            let count = self.pipelines.len();
789            self.metrics.pipelines_registered_total.set(count as i64);
790        }
791
792        removed
793    }
794
795    /// Get statistics for all pipelines
796    pub fn all_stats(&self) -> Vec<PipelineStats> {
797        self.pipelines
798            .iter()
799            .map(|entry| entry.value().stats())
800            .collect()
801    }
802}
803
804impl Default for PipelineManager {
805    fn default() -> Self {
806        Self::new()
807    }
808}
809
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Builds an enabled config named `name` that accepts only `test` events.
    fn config_with(name: &str, operators: Vec<PipelineOperator>) -> PipelineConfig {
        PipelineConfig {
            id: Uuid::new_v4(),
            name: name.to_string(),
            description: None,
            source_event_types: vec!["test".to_string()],
            operators,
            enabled: true,
            output: "test_output".to_string(),
        }
    }

    /// Builds a `test` event for `entity1` carrying `payload`.
    fn test_event(payload: JsonValue) -> Event {
        Event::from_strings(
            "test".to_string(),
            "entity1".to_string(),
            "default".to_string(),
            payload,
            None,
        )
        .unwrap()
    }

    #[test]
    fn test_filter_operator() {
        let filter = PipelineOperator::Filter {
            field: "status".to_string(),
            value: json!("active"),
            op: "eq".to_string(),
        };
        let pipeline = Pipeline::new(config_with("test_filter", vec![filter]));

        let outcome = pipeline
            .process(&test_event(json!({"status": "active"})))
            .unwrap();
        assert!(outcome.is_some());
    }

    #[test]
    fn test_map_operator() {
        let map = PipelineOperator::Map {
            field: "name".to_string(),
            transform: "uppercase".to_string(),
        };
        let pipeline = Pipeline::new(config_with("test_map", vec![map]));

        let output = pipeline
            .process(&test_event(json!({"name": "hello"})))
            .unwrap()
            .unwrap();
        assert_eq!(output["name"], "HELLO");
    }

    #[test]
    fn test_reduce_count() {
        let reduce = PipelineOperator::Reduce {
            field: "value".to_string(),
            function: "count".to_string(),
            group_by: None,
        };
        let pipeline = Pipeline::new(config_with("test_reduce", vec![reduce]));

        (0..5).for_each(|i| {
            pipeline.process(&test_event(json!({"value": i}))).unwrap();
        });

        let last = pipeline
            .process(&test_event(json!({"value": 5})))
            .unwrap()
            .unwrap();
        assert_eq!(last["value"], 6);
    }
}