use super::protection::{OutputProcessor, ProcessorPipeline, ProtectionContext};
use super::{EventEmitter, StreamEvent, StreamMode};
use std::sync::Arc;
pub struct ProtectedEventEmitter {
inner: EventEmitter,
pipeline: ProcessorPipeline,
context: ProtectionContext,
}
impl ProtectedEventEmitter {
pub fn new() -> Self {
Self {
inner: EventEmitter::new(),
pipeline: ProcessorPipeline::new(),
context: ProtectionContext::for_stream(),
}
}
pub fn with_mode(mode: StreamMode) -> Self {
Self {
inner: EventEmitter::with_mode(mode),
pipeline: ProcessorPipeline::new(),
context: ProtectionContext::for_stream(),
}
}
pub fn with_processor(mut self, processor: Arc<dyn OutputProcessor>) -> Self {
self.pipeline = self.pipeline.add(processor);
self
}
pub fn with_context(mut self, context: ProtectionContext) -> Self {
self.context = context;
self
}
pub fn set_context(&mut self, context: ProtectionContext) {
self.context = context;
}
pub fn context(&self) -> &ProtectionContext {
&self.context
}
pub async fn emit(&self, event: StreamEvent) -> anyhow::Result<()> {
if self.pipeline.is_empty() {
self.inner.emit(event);
return Ok(());
}
let processed = self.pipeline.process(event, &self.context).await?;
self.inner.emit(processed.event);
Ok(())
}
pub fn emit_unprotected(&self, event: StreamEvent) {
self.inner.emit(event);
}
pub async fn emit_force(&self, event: StreamEvent) -> anyhow::Result<()> {
if self.pipeline.is_empty() {
self.inner.emit_force(event);
return Ok(());
}
let processed = self.pipeline.process(event, &self.context).await?;
self.inner.emit_force(processed.event);
Ok(())
}
pub fn drain(&self) -> Vec<StreamEvent> {
self.inner.drain()
}
pub fn mode(&self) -> StreamMode {
self.inner.mode()
}
pub fn inner(&self) -> &EventEmitter {
&self.inner
}
}
impl Default for ProtectedEventEmitter {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::kernel::ExecutionId;
#[tokio::test]
async fn test_protected_emitter_no_processors() {
let emitter = ProtectedEventEmitter::new();
let exec_id = ExecutionId::new();
emitter
.emit(StreamEvent::execution_start(&exec_id))
.await
.unwrap();
let events = emitter.drain();
assert_eq!(events.len(), 1);
}
#[tokio::test]
async fn test_protected_emitter_emit_unprotected() {
let emitter = ProtectedEventEmitter::new();
let exec_id = ExecutionId::new();
emitter.emit_unprotected(StreamEvent::execution_start(&exec_id));
let events = emitter.drain();
assert_eq!(events.len(), 1);
}
#[tokio::test]
async fn test_protected_emitter_context() {
let emitter = ProtectedEventEmitter::new().with_context(ProtectionContext::for_storage());
assert!(emitter.context().destination.requires_encryption());
}
#[tokio::test]
async fn test_protected_emitter_mode() {
let emitter = ProtectedEventEmitter::with_mode(StreamMode::Summary);
assert_eq!(emitter.mode(), StreamMode::Summary);
}
}