use super::context::ProtectionContext;
use crate::streaming::StreamEvent;
use async_trait::async_trait;
use std::sync::Arc;
/// Result of running a [`StreamEvent`] through an output processor.
///
/// Bundles the (possibly rewritten) event with a flag recording whether it
/// was changed and an optional encrypted payload string.
#[derive(Debug, Clone)]
pub struct ProcessedEvent {
/// The event as it stands after processing.
pub event: StreamEvent,
/// `true` when built via [`ProcessedEvent::modified`], `false` when built
/// via [`ProcessedEvent::unchanged`].
pub was_modified: bool,
/// Encrypted payload attached via [`ProcessedEvent::with_encrypted`];
/// `None` when nothing was attached.
pub encrypted_payload: Option<String>,
}
impl ProcessedEvent {
    /// Wraps `event` as a result that was left untouched.
    ///
    /// The modified flag is `false` and no encrypted payload is attached.
    pub fn unchanged(event: StreamEvent) -> Self {
        Self {
            encrypted_payload: None,
            was_modified: false,
            event,
        }
    }

    /// Wraps `event` as a result that was altered.
    ///
    /// Identical to [`ProcessedEvent::unchanged`] except that the modified
    /// flag is `true`.
    pub fn modified(event: StreamEvent) -> Self {
        Self {
            was_modified: true,
            ..Self::unchanged(event)
        }
    }

    /// Consumes `self` and returns it with `encrypted` stored as the
    /// encrypted payload, replacing any payload that was already present.
    pub fn with_encrypted(self, encrypted: String) -> Self {
        Self {
            encrypted_payload: Some(encrypted),
            ..self
        }
    }
}
/// A single asynchronous stage that inspects or rewrites a [`StreamEvent`].
///
/// Implementors must be `Send + Sync`; [`ProcessorPipeline`] stores them as
/// `Arc<dyn OutputProcessor>`.
#[async_trait]
pub trait OutputProcessor: Send + Sync {
/// Returns the name of this processor.
fn name(&self) -> &str;
/// Processes one event.
///
/// Takes ownership of `event` and borrows the protection context `ctx`.
/// On success returns a [`ProcessedEvent`] holding the resulting event,
/// whether it was modified, and an optional encrypted payload.
///
/// # Errors
///
/// Returns an error when processing fails. When invoked through
/// [`ProcessorPipeline::process`], that error is propagated to the caller
/// and the remaining processors are not run.
async fn process(
&self,
event: StreamEvent,
ctx: &ProtectionContext,
) -> anyhow::Result<ProcessedEvent>;
}
/// An ordered chain of [`OutputProcessor`]s applied to each event in turn.
pub struct ProcessorPipeline {
/// Processors in the order they were added; they run in this order.
processors: Vec<Arc<dyn OutputProcessor>>,
}
impl ProcessorPipeline {
    /// Creates a pipeline that contains no processors.
    pub fn new() -> Self {
        Self {
            processors: Vec::new(),
        }
    }

    /// Appends `processor` to the end of the chain and returns the pipeline,
    /// so calls can be chained builder-style.
    // `add` is a builder method taking a processor, not an arithmetic
    // `std::ops::Add::add`, so the lint suggesting that trait is silenced.
    #[allow(clippy::should_implement_trait)]
    pub fn add(mut self, processor: Arc<dyn OutputProcessor>) -> Self {
        self.processors.push(processor);
        self
    }

    /// Runs `event` through every processor in insertion order.
    ///
    /// Each processor receives the event produced by the previous one. The
    /// returned [`ProcessedEvent`] carries the final event, a modified flag
    /// that is `true` if any processor reported a modification, and the
    /// encrypted payload from the last processor that supplied one.
    ///
    /// With no processors registered, the input event is returned unmodified
    /// and without a payload.
    ///
    /// # Errors
    ///
    /// Returns the first error produced by a processor; processors after the
    /// failing one are not run.
    pub async fn process(
        &self,
        event: StreamEvent,
        ctx: &ProtectionContext,
    ) -> anyhow::Result<ProcessedEvent> {
        let mut event = event;
        let mut was_modified = false;
        let mut payload: Option<String> = None;

        for stage in self.processors.iter() {
            let ProcessedEvent {
                event: next_event,
                was_modified: stage_modified,
                encrypted_payload: stage_payload,
            } = stage.process(event, ctx).await?;

            event = next_event;
            was_modified |= stage_modified;
            // A payload from a later stage overrides an earlier one; a stage
            // that returns none leaves the existing payload in place.
            if stage_payload.is_some() {
                payload = stage_payload;
            }
        }

        Ok(ProcessedEvent {
            event,
            was_modified,
            encrypted_payload: payload,
        })
    }

    /// Returns `true` when the pipeline holds no processors.
    pub fn is_empty(&self) -> bool {
        self.processors.is_empty()
    }

    /// Returns the number of processors in the pipeline.
    pub fn len(&self) -> usize {
        self.processors.len()
    }
}
impl Default for ProcessorPipeline {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// `unchanged` leaves the modified flag clear and attaches no payload.
    #[test]
    fn test_processed_event_unchanged() {
        let result = ProcessedEvent::unchanged(StreamEvent::text_start(None));

        assert!(!result.was_modified);
        assert!(result.encrypted_payload.is_none());
    }

    /// `modified` sets the modified flag.
    #[test]
    fn test_processed_event_modified() {
        let result = ProcessedEvent::modified(StreamEvent::text_start(None));

        assert!(result.was_modified);
    }

    /// `with_encrypted` stores exactly the payload it was given.
    #[test]
    fn test_processed_event_with_encrypted() {
        let payload = String::from("encrypted_data");
        let result =
            ProcessedEvent::modified(StreamEvent::text_start(None)).with_encrypted(payload);

        assert_eq!(result.encrypted_payload.as_deref(), Some("encrypted_data"));
    }

    /// A freshly created pipeline holds no processors.
    #[test]
    fn test_pipeline_new() {
        let empty = ProcessorPipeline::new();

        assert_eq!(empty.len(), 0);
        assert!(empty.is_empty());
    }
}