enact_core/streaming/protection/
processor.rs1use super::context::ProtectionContext;
7use crate::streaming::StreamEvent;
8use async_trait::async_trait;
9use std::sync::Arc;
10
11#[derive(Debug, Clone)]
13pub struct ProcessedEvent {
14 pub event: StreamEvent,
16
17 pub was_modified: bool,
19
20 pub encrypted_payload: Option<String>,
22}
23
24impl ProcessedEvent {
25 pub fn unchanged(event: StreamEvent) -> Self {
27 Self {
28 event,
29 was_modified: false,
30 encrypted_payload: None,
31 }
32 }
33
34 pub fn modified(event: StreamEvent) -> Self {
36 Self {
37 event,
38 was_modified: true,
39 encrypted_payload: None,
40 }
41 }
42
43 pub fn with_encrypted(mut self, encrypted: String) -> Self {
45 self.encrypted_payload = Some(encrypted);
46 self
47 }
48}
49
50#[async_trait]
55pub trait OutputProcessor: Send + Sync {
56 fn name(&self) -> &str;
58
59 async fn process(
68 &self,
69 event: StreamEvent,
70 ctx: &ProtectionContext,
71 ) -> anyhow::Result<ProcessedEvent>;
72}
73
74pub struct ProcessorPipeline {
76 processors: Vec<Arc<dyn OutputProcessor>>,
77}
78
79impl ProcessorPipeline {
80 pub fn new() -> Self {
82 Self { processors: vec![] }
83 }
84
85 #[allow(clippy::should_implement_trait)]
87 pub fn add(mut self, processor: Arc<dyn OutputProcessor>) -> Self {
88 self.processors.push(processor);
89 self
90 }
91
92 pub async fn process(
94 &self,
95 event: StreamEvent,
96 ctx: &ProtectionContext,
97 ) -> anyhow::Result<ProcessedEvent> {
98 let mut current_event = event;
99 let mut any_modified = false;
100 let mut encrypted_payload = None;
101
102 for processor in &self.processors {
103 let result = processor.process(current_event, ctx).await?;
104 any_modified = any_modified || result.was_modified;
105 encrypted_payload = result.encrypted_payload.or(encrypted_payload);
106 current_event = result.event;
107 }
108
109 Ok(ProcessedEvent {
110 event: current_event,
111 was_modified: any_modified,
112 encrypted_payload,
113 })
114 }
115
116 pub fn is_empty(&self) -> bool {
118 self.processors.is_empty()
119 }
120
121 pub fn len(&self) -> usize {
123 self.processors.len()
124 }
125}
126
127impl Default for ProcessorPipeline {
128 fn default() -> Self {
129 Self::new()
130 }
131}
132
133#[cfg(test)]
134mod tests {
135 use super::*;
136
137 #[test]
138 fn test_processed_event_unchanged() {
139 let event = StreamEvent::text_start(None);
140 let processed = ProcessedEvent::unchanged(event);
141 assert!(!processed.was_modified);
142 assert!(processed.encrypted_payload.is_none());
143 }
144
145 #[test]
146 fn test_processed_event_modified() {
147 let event = StreamEvent::text_start(None);
148 let processed = ProcessedEvent::modified(event);
149 assert!(processed.was_modified);
150 }
151
152 #[test]
153 fn test_processed_event_with_encrypted() {
154 let event = StreamEvent::text_start(None);
155 let processed =
156 ProcessedEvent::modified(event).with_encrypted("encrypted_data".to_string());
157 assert!(processed.encrypted_payload.is_some());
158 assert_eq!(processed.encrypted_payload.unwrap(), "encrypted_data");
159 }
160
161 #[test]
162 fn test_pipeline_new() {
163 let pipeline = ProcessorPipeline::new();
164 assert!(pipeline.is_empty());
165 assert_eq!(pipeline.len(), 0);
166 }
167}