Skip to main content

enact_core/streaming/
protected_emitter.rs

1//! Protected Event Emitter
2//!
3//! Wraps EventEmitter with output processor pipeline to ensure all events
4//! pass through protection before storage/streaming.
5//!
6//! @see docs/TECHNICAL/17-GUARDRAILS-PROTECTION.md
7
8use super::protection::{OutputProcessor, ProcessorPipeline, ProtectionContext};
9use super::{EventEmitter, StreamEvent, StreamMode};
10use std::sync::Arc;
11
12/// Protected Event Emitter
13///
14/// Wraps EventEmitter with output processor pipeline.
15/// All events pass through protection before being added to the stream.
16///
17/// ## Usage
18///
19/// ```ignore
20/// use enact_core::streaming::{ProtectedEventEmitter, PiiProtectionProcessor};
21///
22/// let emitter = ProtectedEventEmitter::new()
23///     .with_processor(Arc::new(PiiProtectionProcessor::new()))
24///     .with_context(ProtectionContext::for_stream());
25/// ```
26pub struct ProtectedEventEmitter {
27    inner: EventEmitter,
28    pipeline: ProcessorPipeline,
29    context: ProtectionContext,
30}
31
32impl ProtectedEventEmitter {
33    /// Create a new protected event emitter
34    pub fn new() -> Self {
35        Self {
36            inner: EventEmitter::new(),
37            pipeline: ProcessorPipeline::new(),
38            context: ProtectionContext::for_stream(),
39        }
40    }
41
42    /// Create with a specific stream mode
43    pub fn with_mode(mode: StreamMode) -> Self {
44        Self {
45            inner: EventEmitter::with_mode(mode),
46            pipeline: ProcessorPipeline::new(),
47            context: ProtectionContext::for_stream(),
48        }
49    }
50
51    /// Add a processor to the pipeline
52    pub fn with_processor(mut self, processor: Arc<dyn OutputProcessor>) -> Self {
53        self.pipeline = self.pipeline.add(processor);
54        self
55    }
56
57    /// Set the protection context
58    pub fn with_context(mut self, context: ProtectionContext) -> Self {
59        self.context = context;
60        self
61    }
62
63    /// Set the protection context (mutable)
64    pub fn set_context(&mut self, context: ProtectionContext) {
65        self.context = context;
66    }
67
68    /// Get the protection context
69    pub fn context(&self) -> &ProtectionContext {
70        &self.context
71    }
72
73    /// Emit an event (runs through protection pipeline)
74    ///
75    /// This is async because processors may need async operations
76    pub async fn emit(&self, event: StreamEvent) -> anyhow::Result<()> {
77        if self.pipeline.is_empty() {
78            // No processors, emit directly
79            self.inner.emit(event);
80            return Ok(());
81        }
82
83        // Run through protection pipeline
84        let processed = self.pipeline.process(event, &self.context).await?;
85        self.inner.emit(processed.event);
86        Ok(())
87    }
88
89    /// Emit an event synchronously (bypasses protection pipeline)
90    ///
91    /// Use only for events that are guaranteed safe (control events, etc.)
92    pub fn emit_unprotected(&self, event: StreamEvent) {
93        self.inner.emit(event);
94    }
95
96    /// Emit an event unconditionally (ignores mode, runs through protection)
97    pub async fn emit_force(&self, event: StreamEvent) -> anyhow::Result<()> {
98        if self.pipeline.is_empty() {
99            self.inner.emit_force(event);
100            return Ok(());
101        }
102
103        let processed = self.pipeline.process(event, &self.context).await?;
104        self.inner.emit_force(processed.event);
105        Ok(())
106    }
107
108    /// Get all collected events
109    pub fn drain(&self) -> Vec<StreamEvent> {
110        self.inner.drain()
111    }
112
113    /// Get the current stream mode
114    pub fn mode(&self) -> StreamMode {
115        self.inner.mode()
116    }
117
118    /// Get reference to inner emitter (for compatibility)
119    pub fn inner(&self) -> &EventEmitter {
120        &self.inner
121    }
122}
123
124impl Default for ProtectedEventEmitter {
125    fn default() -> Self {
126        Self::new()
127    }
128}
129
130#[cfg(test)]
131mod tests {
132    use super::*;
133    use crate::kernel::ExecutionId;
134
135    #[tokio::test]
136    async fn test_protected_emitter_no_processors() {
137        let emitter = ProtectedEventEmitter::new();
138        let exec_id = ExecutionId::new();
139
140        emitter
141            .emit(StreamEvent::execution_start(&exec_id))
142            .await
143            .unwrap();
144
145        let events = emitter.drain();
146        assert_eq!(events.len(), 1);
147    }
148
149    #[tokio::test]
150    async fn test_protected_emitter_emit_unprotected() {
151        let emitter = ProtectedEventEmitter::new();
152        let exec_id = ExecutionId::new();
153
154        // Emit synchronously
155        emitter.emit_unprotected(StreamEvent::execution_start(&exec_id));
156
157        let events = emitter.drain();
158        assert_eq!(events.len(), 1);
159    }
160
161    #[tokio::test]
162    async fn test_protected_emitter_context() {
163        let emitter = ProtectedEventEmitter::new().with_context(ProtectionContext::for_storage());
164
165        assert!(emitter.context().destination.requires_encryption());
166    }
167
168    #[tokio::test]
169    async fn test_protected_emitter_mode() {
170        let emitter = ProtectedEventEmitter::with_mode(StreamMode::Summary);
171        assert_eq!(emitter.mode(), StreamMode::Summary);
172    }
173}