Skip to main content

enact_core/streaming/protection/
processor.rs

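//! Output Processor Trait
//!
//! Defines the [`OutputProcessor`] trait for processors that transform events
//! before storage or streaming, together with the [`ProcessedEvent`] result
//! type and the [`ProcessorPipeline`] that chains processors in sequence.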
5
6use super::context::ProtectionContext;
7use crate::streaming::StreamEvent;
8use async_trait::async_trait;
9use std::sync::Arc;
10
11/// Result of processing an event
12#[derive(Debug, Clone)]
13pub struct ProcessedEvent {
14    /// The processed event (may be transformed)
15    pub event: StreamEvent,
16
17    /// Whether the event was modified
18    pub was_modified: bool,
19
20    /// Optional: encrypted payload for storage (if applicable)
21    pub encrypted_payload: Option<String>,
22}
23
24impl ProcessedEvent {
25    /// Create a processed event that was not modified
26    pub fn unchanged(event: StreamEvent) -> Self {
27        Self {
28            event,
29            was_modified: false,
30            encrypted_payload: None,
31        }
32    }
33
34    /// Create a processed event that was modified
35    pub fn modified(event: StreamEvent) -> Self {
36        Self {
37            event,
38            was_modified: true,
39            encrypted_payload: None,
40        }
41    }
42
43    /// Add an encrypted payload (for storage destination)
44    pub fn with_encrypted(mut self, encrypted: String) -> Self {
45        self.encrypted_payload = Some(encrypted);
46        self
47    }
48}
49
50/// Output Processor trait
51///
52/// Processors run AFTER kernel execution, BEFORE storage/streaming.
53/// They MUST NOT mutate execution state (streaming is read-only).
54#[async_trait]
55pub trait OutputProcessor: Send + Sync {
56    /// Processor name for logging/metrics
57    fn name(&self) -> &str;
58
59    /// Process an event before storage/streaming
60    ///
61    /// # Arguments
62    /// * `event` - The event to process
63    /// * `ctx` - Protection context (destination, tenant, etc.)
64    ///
65    /// # Returns
66    /// The processed event, potentially transformed
67    async fn process(
68        &self,
69        event: StreamEvent,
70        ctx: &ProtectionContext,
71    ) -> anyhow::Result<ProcessedEvent>;
72}
73
74/// Processor Pipeline - chains multiple processors
75pub struct ProcessorPipeline {
76    processors: Vec<Arc<dyn OutputProcessor>>,
77}
78
79impl ProcessorPipeline {
80    /// Create a new empty pipeline
81    pub fn new() -> Self {
82        Self { processors: vec![] }
83    }
84
85    /// Add a processor to the pipeline
86    #[allow(clippy::should_implement_trait)]
87    pub fn add(mut self, processor: Arc<dyn OutputProcessor>) -> Self {
88        self.processors.push(processor);
89        self
90    }
91
92    /// Run all processors in sequence
93    pub async fn process(
94        &self,
95        event: StreamEvent,
96        ctx: &ProtectionContext,
97    ) -> anyhow::Result<ProcessedEvent> {
98        let mut current_event = event;
99        let mut any_modified = false;
100        let mut encrypted_payload = None;
101
102        for processor in &self.processors {
103            let result = processor.process(current_event, ctx).await?;
104            any_modified = any_modified || result.was_modified;
105            encrypted_payload = result.encrypted_payload.or(encrypted_payload);
106            current_event = result.event;
107        }
108
109        Ok(ProcessedEvent {
110            event: current_event,
111            was_modified: any_modified,
112            encrypted_payload,
113        })
114    }
115
116    /// Check if the pipeline is empty
117    pub fn is_empty(&self) -> bool {
118        self.processors.is_empty()
119    }
120
121    /// Get the number of processors
122    pub fn len(&self) -> usize {
123        self.processors.len()
124    }
125}
126
127impl Default for ProcessorPipeline {
128    fn default() -> Self {
129        Self::new()
130    }
131}
132
#[cfg(test)]
mod tests {
    use super::*;

    /// `unchanged` leaves the modified flag clear and attaches no payload.
    #[test]
    fn test_processed_event_unchanged() {
        let ProcessedEvent {
            was_modified,
            encrypted_payload,
            ..
        } = ProcessedEvent::unchanged(StreamEvent::text_start(None));

        assert!(!was_modified);
        assert!(encrypted_payload.is_none());
    }

    /// `modified` sets the modified flag.
    #[test]
    fn test_processed_event_modified() {
        let ProcessedEvent { was_modified, .. } =
            ProcessedEvent::modified(StreamEvent::text_start(None));

        assert!(was_modified);
    }

    /// `with_encrypted` stores exactly the payload it was given.
    #[test]
    fn test_processed_event_with_encrypted() {
        let payload = "encrypted_data".to_string();
        let processed =
            ProcessedEvent::modified(StreamEvent::text_start(None)).with_encrypted(payload);

        assert_eq!(
            processed.encrypted_payload.as_deref(),
            Some("encrypted_data")
        );
    }

    /// A freshly created pipeline holds no processors.
    #[test]
    fn test_pipeline_new() {
        let empty = ProcessorPipeline::new();

        assert_eq!(empty.len(), 0);
        assert!(empty.is_empty());
    }
}
167}