enact_core/streaming/
protected_emitter.rs1use super::protection::{OutputProcessor, ProcessorPipeline, ProtectionContext};
9use super::{EventEmitter, StreamEvent, StreamMode};
10use std::sync::Arc;
11
12pub struct ProtectedEventEmitter {
27 inner: EventEmitter,
28 pipeline: ProcessorPipeline,
29 context: ProtectionContext,
30}
31
32impl ProtectedEventEmitter {
33 pub fn new() -> Self {
35 Self {
36 inner: EventEmitter::new(),
37 pipeline: ProcessorPipeline::new(),
38 context: ProtectionContext::for_stream(),
39 }
40 }
41
42 pub fn with_mode(mode: StreamMode) -> Self {
44 Self {
45 inner: EventEmitter::with_mode(mode),
46 pipeline: ProcessorPipeline::new(),
47 context: ProtectionContext::for_stream(),
48 }
49 }
50
51 pub fn with_processor(mut self, processor: Arc<dyn OutputProcessor>) -> Self {
53 self.pipeline = self.pipeline.add(processor);
54 self
55 }
56
57 pub fn with_context(mut self, context: ProtectionContext) -> Self {
59 self.context = context;
60 self
61 }
62
63 pub fn set_context(&mut self, context: ProtectionContext) {
65 self.context = context;
66 }
67
68 pub fn context(&self) -> &ProtectionContext {
70 &self.context
71 }
72
73 pub async fn emit(&self, event: StreamEvent) -> anyhow::Result<()> {
77 if self.pipeline.is_empty() {
78 self.inner.emit(event);
80 return Ok(());
81 }
82
83 let processed = self.pipeline.process(event, &self.context).await?;
85 self.inner.emit(processed.event);
86 Ok(())
87 }
88
89 pub fn emit_unprotected(&self, event: StreamEvent) {
93 self.inner.emit(event);
94 }
95
96 pub async fn emit_force(&self, event: StreamEvent) -> anyhow::Result<()> {
98 if self.pipeline.is_empty() {
99 self.inner.emit_force(event);
100 return Ok(());
101 }
102
103 let processed = self.pipeline.process(event, &self.context).await?;
104 self.inner.emit_force(processed.event);
105 Ok(())
106 }
107
108 pub fn drain(&self) -> Vec<StreamEvent> {
110 self.inner.drain()
111 }
112
113 pub fn mode(&self) -> StreamMode {
115 self.inner.mode()
116 }
117
118 pub fn inner(&self) -> &EventEmitter {
120 &self.inner
121 }
122}
123
124impl Default for ProtectedEventEmitter {
125 fn default() -> Self {
126 Self::new()
127 }
128}
129
130#[cfg(test)]
131mod tests {
132 use super::*;
133 use crate::kernel::ExecutionId;
134
135 #[tokio::test]
136 async fn test_protected_emitter_no_processors() {
137 let emitter = ProtectedEventEmitter::new();
138 let exec_id = ExecutionId::new();
139
140 emitter
141 .emit(StreamEvent::execution_start(&exec_id))
142 .await
143 .unwrap();
144
145 let events = emitter.drain();
146 assert_eq!(events.len(), 1);
147 }
148
149 #[tokio::test]
150 async fn test_protected_emitter_emit_unprotected() {
151 let emitter = ProtectedEventEmitter::new();
152 let exec_id = ExecutionId::new();
153
154 emitter.emit_unprotected(StreamEvent::execution_start(&exec_id));
156
157 let events = emitter.drain();
158 assert_eq!(events.len(), 1);
159 }
160
161 #[tokio::test]
162 async fn test_protected_emitter_context() {
163 let emitter = ProtectedEventEmitter::new().with_context(ProtectionContext::for_storage());
164
165 assert!(emitter.context().destination.requires_encryption());
166 }
167
168 #[tokio::test]
169 async fn test_protected_emitter_mode() {
170 let emitter = ProtectedEventEmitter::with_mode(StreamMode::Summary);
171 assert_eq!(emitter.mode(), StreamMode::Summary);
172 }
173}