// allsource_core/application/services/pipeline.rs

1use crate::{
2    domain::entities::Event,
3    error::{AllSourceError, Result},
4    infrastructure::observability::metrics::MetricsRegistry,
5};
6use chrono::{DateTime, Duration, Utc};
7use dashmap::DashMap;
8use parking_lot::RwLock;
9use serde::{Deserialize, Serialize};
10use serde_json::Value as JsonValue;
11use std::{
12    collections::{HashMap, VecDeque},
13    sync::Arc,
14};
15use uuid::Uuid;
16
/// Window type for time-based aggregations.
///
/// Variants are (de)serialized under their lowercase names
/// (`"tumbling"`, `"sliding"`, `"session"`).
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum WindowType {
    /// Tumbling window (non-overlapping)
    Tumbling,
    /// Sliding window (overlapping)
    Sliding,
    /// Session window (activity-based)
    Session,
}
28
/// Window configuration.
///
/// Every duration is a whole number of seconds. The two optional fields only
/// apply to the window type named in their description.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WindowConfig {
    /// Type of window
    pub window_type: WindowType,

    /// Window size in seconds
    pub size_seconds: i64,

    /// Slide interval in seconds (for sliding windows)
    pub slide_seconds: Option<i64>,

    /// Session timeout in seconds (for session windows)
    pub session_timeout_seconds: Option<i64>,
}
44
/// Pipeline operator types.
///
/// Serialized as an internally tagged object: the variant is selected by a
/// `"type"` field holding the lowercase variant name (for example
/// `{"type": "filter", ...}`), with the variant's fields alongside it.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "lowercase")]
pub enum PipelineOperator {
    /// Filter events based on a condition
    Filter {
        /// Field path to check (e.g., "payload.status"); segments are
        /// separated by `.`
        field: String,
        /// Expected value
        value: JsonValue,
        /// Operator: eq, ne, gt, lt, contains
        op: String,
    },

    /// Transform event payload
    Map {
        /// Field to transform
        field: String,
        /// Transformation expression (simple for now): `uppercase`,
        /// `lowercase`, `trim`, `multiply:<number>` or `add:<number>`
        transform: String,
    },

    /// Aggregate events
    Reduce {
        /// Field to aggregate
        field: String,
        /// Aggregation function: sum, count, avg, min, max
        function: String,
        /// Group by field (optional)
        group_by: Option<String>,
    },

    /// Window-based aggregation
    Window {
        /// Window configuration
        config: WindowConfig,
        /// Aggregation to apply within window (boxed because the enum is
        /// recursive)
        aggregation: Box<PipelineOperator>,
    },

    /// Enrich event with external data
    Enrich {
        /// Source to enrich from
        source: String,
        /// Fields to add
        fields: Vec<String>,
    },

    /// Split stream based on condition
    Branch {
        /// Condition field
        field: String,
        /// Branch mapping: value of `field` -> route name
        branches: HashMap<String, String>,
    },
}
101
/// Pipeline configuration.
///
/// Describes which events a pipeline accepts, the ordered operator chain it
/// applies to them, and where its output is meant to go.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PipelineConfig {
    /// Pipeline ID
    pub id: Uuid,

    /// Pipeline name
    pub name: String,

    /// Description
    pub description: Option<String>,

    /// Source event types to process; an empty list accepts every event type
    pub source_event_types: Vec<String>,

    /// Pipeline operators in order
    pub operators: Vec<PipelineOperator>,

    /// Whether pipeline is enabled; a disabled pipeline produces no output
    pub enabled: bool,

    /// Output destination (projection name or topic)
    pub output: String,
}
126
/// Pipeline execution statistics.
///
/// A snapshot of the counters a [`Pipeline`] maintains while processing.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PipelineStats {
    /// ID of the pipeline these statistics belong to.
    pub pipeline_id: Uuid,
    /// Events that went through the operator chain without an operator error.
    /// Events dropped by an operator are counted here as well.
    pub events_processed: u64,
    /// Events dropped because an operator returned no value for them.
    pub events_filtered: u64,
    /// Events for which an operator returned an error.
    pub events_failed: u64,
    /// When `events_processed` was last incremented, if ever.
    pub last_processed: Option<DateTime<Utc>>,
}
136
/// Stateful operator for maintaining state across events.
///
/// Holds two independent stores, each behind its own lock: keyed JSON values
/// and keyed buffers of timestamped events.
pub struct StatefulOperator {
    /// Operator state storage: state key -> JSON value
    state: Arc<RwLock<HashMap<String, JsonValue>>>,

    /// Window buffers for time-based operations: window key -> events paired
    /// with the timestamp they were stored under, in insertion order
    windows: Arc<RwLock<HashMap<String, VecDeque<(DateTime<Utc>, Event)>>>>,
}
145
146impl StatefulOperator {
147    pub fn new() -> Self {
148        Self {
149            state: Arc::new(RwLock::new(HashMap::new())),
150            windows: Arc::new(RwLock::new(HashMap::new())),
151        }
152    }
153
154    /// Store state value
155    pub fn set_state(&self, key: String, value: JsonValue) {
156        self.state.write().insert(key, value);
157    }
158
159    /// Get state value
160    pub fn get_state(&self, key: &str) -> Option<JsonValue> {
161        self.state.read().get(key).cloned()
162    }
163
164    /// Add event to window
165    pub fn add_to_window(&self, window_key: &str, event: Event, timestamp: DateTime<Utc>) {
166        let mut windows = self.windows.write();
167        windows
168            .entry(window_key.to_string())
169            .or_default()
170            .push_back((timestamp, event));
171    }
172
173    /// Get events in window
174    pub fn get_window(&self, window_key: &str) -> Vec<Event> {
175        self.windows
176            .read()
177            .get(window_key)
178            .map(|w| w.iter().map(|(_, e)| e.clone()).collect())
179            .unwrap_or_default()
180    }
181
182    /// Evict expired events from window
183    pub fn evict_window(&self, window_key: &str, cutoff: DateTime<Utc>) {
184        if let Some(window) = self.windows.write().get_mut(window_key) {
185            window.retain(|(ts, _)| *ts > cutoff);
186        }
187    }
188
189    /// Clear all state
190    pub fn clear(&self) {
191        self.state.write().clear();
192        self.windows.write().clear();
193    }
194}
195
196impl Default for StatefulOperator {
197    fn default() -> Self {
198        Self::new()
199    }
200}
201
/// Pipeline execution engine.
///
/// Couples one [`PipelineConfig`] with the state its operators accumulate and
/// the counters describing what it has processed.
pub struct Pipeline {
    /// Configuration this pipeline was built from.
    config: PipelineConfig,
    /// Aggregation values and window buffers used by the operators.
    state: StatefulOperator,
    /// Execution counters, behind a lock so they can be updated through `&self`.
    stats: Arc<RwLock<PipelineStats>>,
}
208
209impl Pipeline {
210    pub fn new(config: PipelineConfig) -> Self {
211        let stats = PipelineStats {
212            pipeline_id: config.id,
213            events_processed: 0,
214            events_filtered: 0,
215            events_failed: 0,
216            last_processed: None,
217        };
218
219        Self {
220            config,
221            state: StatefulOperator::new(),
222            stats: Arc::new(RwLock::new(stats)),
223        }
224    }
225
226    /// Process an event through the pipeline
227    pub fn process(&self, event: &Event) -> Result<Option<JsonValue>> {
228        // Check if event type matches source filter
229        if !self.config.source_event_types.is_empty()
230            && !self
231                .config
232                .source_event_types
233                .iter()
234                .any(|t| t == event.event_type_str())
235        {
236            return Ok(None);
237        }
238
239        if !self.config.enabled {
240            return Ok(None);
241        }
242
243        let mut current_value = event.payload.clone();
244        let mut filtered = false;
245
246        // Apply operators in sequence
247        for operator in &self.config.operators {
248            match self.apply_operator(operator, &current_value, event) {
249                Ok(Some(result)) => {
250                    current_value = result;
251                }
252                Ok(None) => {
253                    // Event was filtered out
254                    filtered = true;
255                    self.stats.write().events_filtered += 1;
256                    break;
257                }
258                Err(e) => {
259                    self.stats.write().events_failed += 1;
260                    tracing::error!("Pipeline {} operator failed: {}", self.config.name, e);
261                    return Err(e);
262                }
263            }
264        }
265
266        // Update stats
267        let mut stats = self.stats.write();
268        stats.events_processed += 1;
269        stats.last_processed = Some(Utc::now());
270
271        if filtered {
272            Ok(None)
273        } else {
274            Ok(Some(current_value))
275        }
276    }
277
278    /// Apply a single operator
279    fn apply_operator(
280        &self,
281        operator: &PipelineOperator,
282        value: &JsonValue,
283        event: &Event,
284    ) -> Result<Option<JsonValue>> {
285        match operator {
286            PipelineOperator::Filter {
287                field,
288                value: expected,
289                op,
290            } => self.apply_filter(field, expected, op, value),
291
292            PipelineOperator::Map { field, transform } => self.apply_map(field, transform, value),
293
294            PipelineOperator::Reduce {
295                field,
296                function,
297                group_by,
298            } => self.apply_reduce(field, function, group_by.as_deref(), value, event),
299
300            PipelineOperator::Window {
301                config,
302                aggregation,
303            } => self.apply_window(config, aggregation, event),
304
305            PipelineOperator::Enrich { source, fields } => self.apply_enrich(source, fields, value),
306
307            PipelineOperator::Branch { field, branches } => {
308                self.apply_branch(field, branches, value)
309            }
310        }
311    }
312
313    /// Apply filter operator
314    fn apply_filter(
315        &self,
316        field: &str,
317        expected: &JsonValue,
318        op: &str,
319        value: &JsonValue,
320    ) -> Result<Option<JsonValue>> {
321        let field_value = self.get_field(value, field);
322
323        let matches = match op {
324            "eq" => field_value == Some(expected),
325            "ne" => field_value != Some(expected),
326            "gt" => {
327                if let (Some(JsonValue::Number(a)), JsonValue::Number(b)) =
328                    (field_value.as_ref(), expected)
329                {
330                    a.as_f64().unwrap_or(0.0) > b.as_f64().unwrap_or(0.0)
331                } else {
332                    false
333                }
334            }
335            "lt" => {
336                if let (Some(JsonValue::Number(a)), JsonValue::Number(b)) =
337                    (field_value.as_ref(), expected)
338                {
339                    a.as_f64().unwrap_or(0.0) < b.as_f64().unwrap_or(0.0)
340                } else {
341                    false
342                }
343            }
344            "contains" => {
345                if let (Some(JsonValue::String(a)), JsonValue::String(b)) =
346                    (field_value.as_ref(), expected)
347                {
348                    a.contains(b)
349                } else {
350                    false
351                }
352            }
353            _ => {
354                return Err(AllSourceError::ValidationError(format!(
355                    "Unknown filter operator: {op}"
356                )));
357            }
358        };
359
360        if matches {
361            Ok(Some(value.clone()))
362        } else {
363            Ok(None) // Filtered out
364        }
365    }
366
367    /// Apply map transformation
368    fn apply_map(
369        &self,
370        field: &str,
371        transform: &str,
372        value: &JsonValue,
373    ) -> Result<Option<JsonValue>> {
374        let mut result = value.clone();
375
376        // Simple transformations
377        let field_value = self.get_field(value, field);
378
379        let transformed = match transform {
380            "uppercase" => field_value
381                .and_then(|v| v.as_str())
382                .map(|s| JsonValue::String(s.to_uppercase())),
383            "lowercase" => field_value
384                .and_then(|v| v.as_str())
385                .map(|s| JsonValue::String(s.to_lowercase())),
386            "trim" => field_value
387                .and_then(|v| v.as_str())
388                .map(|s| JsonValue::String(s.trim().to_string())),
389            _ => {
390                // Try to parse as number operation
391                if let Some(stripped) = transform.strip_prefix("multiply:") {
392                    if let Ok(multiplier) = stripped.parse::<f64>() {
393                        field_value.and_then(serde_json::Value::as_f64).map(|n| {
394                            JsonValue::Number(serde_json::Number::from_f64(n * multiplier).unwrap())
395                        })
396                    } else {
397                        None
398                    }
399                } else if let Some(stripped) = transform.strip_prefix("add:") {
400                    if let Ok(addend) = stripped.parse::<f64>() {
401                        field_value.and_then(serde_json::Value::as_f64).map(|n| {
402                            JsonValue::Number(serde_json::Number::from_f64(n + addend).unwrap())
403                        })
404                    } else {
405                        None
406                    }
407                } else {
408                    None
409                }
410            }
411        };
412
413        if let Some(new_value) = transformed {
414            self.set_field(&mut result, field, new_value);
415        }
416
417        Ok(Some(result))
418    }
419
420    /// Apply reduce aggregation
421    fn apply_reduce(
422        &self,
423        field: &str,
424        function: &str,
425        group_by: Option<&str>,
426        value: &JsonValue,
427        event: &Event,
428    ) -> Result<Option<JsonValue>> {
429        // Get group key
430        let group_key = if let Some(group_field) = group_by {
431            self.get_field(value, group_field)
432                .and_then(|v| v.as_str())
433                .unwrap_or("default")
434                .to_string()
435        } else {
436            "default".to_string()
437        };
438
439        let state_key = format!("reduce_{function}_{group_key}");
440
441        // Get current aggregate value
442        let current = self.state.get_state(&state_key);
443
444        // Get field value to aggregate
445        let field_value = self.get_field(value, field);
446
447        let new_value = match function {
448            "count" => {
449                let count = current.and_then(|v| v.as_u64()).unwrap_or(0) + 1;
450                JsonValue::Number(count.into())
451            }
452            "sum" => {
453                let current_sum = current.and_then(|v| v.as_f64()).unwrap_or(0.0);
454                let value_to_add = field_value
455                    .and_then(serde_json::Value::as_f64)
456                    .unwrap_or(0.0);
457                JsonValue::Number(serde_json::Number::from_f64(current_sum + value_to_add).unwrap())
458            }
459            "avg" => {
460                // Store sum and count separately
461                let sum_key = format!("{state_key}_sum");
462                let count_key = format!("{state_key}_count");
463
464                let current_sum = self
465                    .state
466                    .get_state(&sum_key)
467                    .and_then(|v| v.as_f64())
468                    .unwrap_or(0.0);
469                let current_count = self
470                    .state
471                    .get_state(&count_key)
472                    .and_then(|v| v.as_u64())
473                    .unwrap_or(0);
474
475                let value_to_add = field_value
476                    .and_then(serde_json::Value::as_f64)
477                    .unwrap_or(0.0);
478
479                let new_sum = current_sum + value_to_add;
480                let new_count = current_count + 1;
481
482                self.state.set_state(
483                    sum_key,
484                    JsonValue::Number(serde_json::Number::from_f64(new_sum).unwrap()),
485                );
486                self.state
487                    .set_state(count_key, JsonValue::Number(new_count.into()));
488
489                let avg = new_sum / new_count as f64;
490                JsonValue::Number(serde_json::Number::from_f64(avg).unwrap())
491            }
492            "min" => {
493                let current_min = current.and_then(|v| v.as_f64());
494                let new_val = field_value.and_then(serde_json::Value::as_f64);
495
496                match (current_min, new_val) {
497                    (Some(curr), Some(new)) => {
498                        JsonValue::Number(serde_json::Number::from_f64(curr.min(new)).unwrap())
499                    }
500                    (None, Some(new)) => {
501                        JsonValue::Number(serde_json::Number::from_f64(new).unwrap())
502                    }
503                    (Some(curr), None) => {
504                        JsonValue::Number(serde_json::Number::from_f64(curr).unwrap())
505                    }
506                    (None, None) => JsonValue::Null,
507                }
508            }
509            "max" => {
510                let current_max = current.and_then(|v| v.as_f64());
511                let new_val = field_value.and_then(serde_json::Value::as_f64);
512
513                match (current_max, new_val) {
514                    (Some(curr), Some(new)) => {
515                        JsonValue::Number(serde_json::Number::from_f64(curr.max(new)).unwrap())
516                    }
517                    (None, Some(new)) => {
518                        JsonValue::Number(serde_json::Number::from_f64(new).unwrap())
519                    }
520                    (Some(curr), None) => {
521                        JsonValue::Number(serde_json::Number::from_f64(curr).unwrap())
522                    }
523                    (None, None) => JsonValue::Null,
524                }
525            }
526            _ => {
527                return Err(AllSourceError::ValidationError(format!(
528                    "Unknown reduce function: {function}"
529                )));
530            }
531        };
532
533        // Update state
534        self.state.set_state(state_key.clone(), new_value.clone());
535
536        // Return aggregated result
537        let result = serde_json::json!({
538            "group": group_key,
539            "function": function,
540            "value": new_value
541        });
542
543        Ok(Some(result))
544    }
545
546    /// Apply window aggregation
547    fn apply_window(
548        &self,
549        config: &WindowConfig,
550        aggregation: &PipelineOperator,
551        event: &Event,
552    ) -> Result<Option<JsonValue>> {
553        let window_key = format!("window_{}", self.config.id);
554        let now = Utc::now();
555
556        // Add event to window
557        self.state
558            .add_to_window(&window_key, event.clone(), event.timestamp);
559
560        // Evict expired events based on window type
561        let cutoff = match config.window_type {
562            WindowType::Tumbling => now - Duration::seconds(config.size_seconds),
563            WindowType::Sliding => {
564                let slide = config.slide_seconds.unwrap_or(config.size_seconds);
565                now - Duration::seconds(slide)
566            }
567            WindowType::Session => {
568                let timeout = config.session_timeout_seconds.unwrap_or(300);
569                now - Duration::seconds(timeout)
570            }
571        };
572
573        self.state.evict_window(&window_key, cutoff);
574
575        // Get events in current window
576        let window_events = self.state.get_window(&window_key);
577
578        // Apply aggregation to window
579        let mut aggregate_value = JsonValue::Null;
580        for window_event in &window_events {
581            if let Ok(Some(result)) =
582                self.apply_operator(aggregation, &window_event.payload, window_event)
583            {
584                aggregate_value = result;
585            }
586        }
587
588        Ok(Some(serde_json::json!({
589            "window_type": config.window_type,
590            "window_size_seconds": config.size_seconds,
591            "events_in_window": window_events.len(),
592            "aggregation": aggregate_value
593        })))
594    }
595
596    /// Apply enrichment
597    fn apply_enrich(
598        &self,
599        _source: &str,
600        fields: &[String],
601        value: &JsonValue,
602    ) -> Result<Option<JsonValue>> {
603        // Placeholder for enrichment logic
604        // In production, this would fetch data from external sources
605        let mut result = value.clone();
606
607        for field in fields {
608            let enriched_value = JsonValue::String(format!("enriched_{field}"));
609            self.set_field(&mut result, field, enriched_value);
610        }
611
612        Ok(Some(result))
613    }
614
615    /// Apply branch routing
616    fn apply_branch(
617        &self,
618        field: &str,
619        branches: &HashMap<String, String>,
620        value: &JsonValue,
621    ) -> Result<Option<JsonValue>> {
622        let field_value = self.get_field(value, field);
623
624        if let Some(JsonValue::String(val)) = field_value
625            && let Some(route) = branches.get(val)
626        {
627            let mut result = value.clone();
628            if let JsonValue::Object(ref mut obj) = result {
629                obj.insert("_route".to_string(), JsonValue::String(route.clone()));
630            }
631            return Ok(Some(result));
632        }
633
634        Ok(Some(value.clone()))
635    }
636
637    /// Helper: Get nested field from JSON
638    fn get_field<'a>(&self, value: &'a JsonValue, field: &str) -> Option<&'a JsonValue> {
639        let parts: Vec<&str> = field.split('.').collect();
640        let mut current = value;
641
642        for part in parts {
643            current = current.get(part)?;
644        }
645
646        Some(current)
647    }
648
649    /// Helper: Set nested field in JSON
650    fn set_field(&self, value: &mut JsonValue, field: &str, new_value: JsonValue) {
651        let parts: Vec<&str> = field.split('.').collect();
652
653        if parts.len() == 1 {
654            if let JsonValue::Object(obj) = value {
655                obj.insert(field.to_string(), new_value);
656            }
657            return;
658        }
659
660        // Navigate to parent
661        let mut current = value;
662        for part in &parts[..parts.len() - 1] {
663            if let JsonValue::Object(obj) = current {
664                current = obj
665                    .entry((*part).to_string())
666                    .or_insert(JsonValue::Object(Default::default()));
667            }
668        }
669
670        // Set final value
671        if let JsonValue::Object(obj) = current {
672            obj.insert((*parts.last().unwrap()).to_string(), new_value);
673        }
674    }
675
676    /// Get pipeline statistics
677    pub fn stats(&self) -> PipelineStats {
678        self.stats.read().clone()
679    }
680
681    /// Get pipeline configuration
682    pub fn config(&self) -> &PipelineConfig {
683        &self.config
684    }
685
686    /// Reset pipeline state
687    pub fn reset(&self) {
688        self.state.clear();
689        let mut stats = self.stats.write();
690        stats.events_processed = 0;
691        stats.events_filtered = 0;
692        stats.events_failed = 0;
693        stats.last_processed = None;
694    }
695}
696
/// Manages multiple pipelines.
///
/// Pipelines are keyed by id and handed out as shared `Arc` handles.
pub struct PipelineManager {
    // DashMap gives concurrent access through `&self` without one outer lock.
    // NOTE(review): presumably sharded locking rather than truly lock-free —
    // confirm against the dashmap documentation.
    pipelines: Arc<DashMap<Uuid, Arc<Pipeline>>>,
    // Registry that receives the pipeline gauges, counters and timings.
    metrics: Arc<MetricsRegistry>,
}
703
704impl PipelineManager {
705    pub fn new() -> Self {
706        Self::with_metrics(MetricsRegistry::new())
707    }
708
709    pub fn with_metrics(metrics: Arc<MetricsRegistry>) -> Self {
710        Self {
711            pipelines: Arc::new(DashMap::new()),
712            metrics,
713        }
714    }
715
716    /// Register a new pipeline
717    pub fn register(&self, config: PipelineConfig) -> Uuid {
718        let id = config.id;
719        let name = config.name.clone();
720        let pipeline = Arc::new(Pipeline::new(config));
721        self.pipelines.insert(id, pipeline);
722
723        let count = self.pipelines.len();
724        self.metrics.pipelines_registered_total.set(count as i64);
725
726        tracing::info!("📊 Registered pipeline: {} ({})", name, id);
727        id
728    }
729
730    /// Get a pipeline by ID
731    pub fn get(&self, id: Uuid) -> Option<Arc<Pipeline>> {
732        self.pipelines.get(&id).map(|entry| entry.value().clone())
733    }
734
735    /// Process event through all matching pipelines
736    pub fn process_event(&self, event: &Event) -> Vec<(Uuid, JsonValue)> {
737        let timer = self.metrics.pipeline_duration_seconds.start_timer();
738
739        let mut results = Vec::new();
740
741        for entry in self.pipelines.iter() {
742            let id = entry.key();
743            let pipeline = entry.value();
744            let pipeline_name = &pipeline.config().name;
745            let pipeline_id = id.to_string();
746
747            match pipeline.process(event) {
748                Ok(Some(result)) => {
749                    self.metrics
750                        .pipeline_events_processed
751                        .with_label_values(&[&pipeline_id, pipeline_name])
752                        .inc();
753                    results.push((*id, result));
754                }
755                Ok(None) => {
756                    // Event filtered out or didn't match - not an error
757                }
758                Err(e) => {
759                    self.metrics
760                        .pipeline_errors_total
761                        .with_label_values(&[pipeline_name])
762                        .inc();
763                    tracing::error!(
764                        "Pipeline '{}' ({}) failed to process event: {}",
765                        pipeline_name,
766                        id,
767                        e
768                    );
769                }
770            }
771        }
772
773        timer.observe_duration();
774        results
775    }
776
777    /// List all pipelines
778    pub fn list(&self) -> Vec<PipelineConfig> {
779        self.pipelines
780            .iter()
781            .map(|entry| entry.value().config().clone())
782            .collect()
783    }
784
785    /// Remove a pipeline
786    pub fn remove(&self, id: Uuid) -> bool {
787        let removed = self.pipelines.remove(&id).is_some();
788
789        if removed {
790            let count = self.pipelines.len();
791            self.metrics.pipelines_registered_total.set(count as i64);
792        }
793
794        removed
795    }
796
797    /// Get statistics for all pipelines
798    pub fn all_stats(&self) -> Vec<PipelineStats> {
799        self.pipelines
800            .iter()
801            .map(|entry| entry.value().stats())
802            .collect()
803    }
804}
805
806impl Default for PipelineManager {
807    fn default() -> Self {
808        Self::new()
809    }
810}
811
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Builds an enabled pipeline that accepts `"test"` events and applies a
    /// single operator.
    fn pipeline_with(name: &str, operator: PipelineOperator) -> Pipeline {
        Pipeline::new(PipelineConfig {
            id: Uuid::new_v4(),
            name: name.to_string(),
            description: None,
            source_event_types: vec!["test".to_string()],
            operators: vec![operator],
            enabled: true,
            output: "test_output".to_string(),
        })
    }

    /// Builds a `"test"` event carrying `payload`.
    fn test_event(payload: JsonValue) -> Event {
        Event::from_strings(
            "test".to_string(),
            "entity1".to_string(),
            "default".to_string(),
            payload,
            None,
        )
        .unwrap()
    }

    #[test]
    fn test_filter_operator() {
        let pipeline = pipeline_with(
            "test_filter",
            PipelineOperator::Filter {
                field: "status".to_string(),
                value: json!("active"),
                op: "eq".to_string(),
            },
        );

        let outcome = pipeline
            .process(&test_event(json!({"status": "active"})))
            .unwrap();

        assert!(outcome.is_some());
    }

    #[test]
    fn test_map_operator() {
        let pipeline = pipeline_with(
            "test_map",
            PipelineOperator::Map {
                field: "name".to_string(),
                transform: "uppercase".to_string(),
            },
        );

        let output = pipeline
            .process(&test_event(json!({"name": "hello"})))
            .unwrap()
            .unwrap();

        assert_eq!(output["name"], "HELLO");
    }

    #[test]
    fn test_reduce_count() {
        let pipeline = pipeline_with(
            "test_reduce",
            PipelineOperator::Reduce {
                field: "value".to_string(),
                function: "count".to_string(),
                group_by: None,
            },
        );

        // Six events in total; only the output of the last one is inspected.
        let last_output = (0..=5)
            .map(|i| pipeline.process(&test_event(json!({"value": i}))).unwrap())
            .last()
            .flatten()
            .unwrap();

        assert_eq!(last_output["value"], 6);
    }
}
922}