use super::context::{DataDestination, ProtectionContext};
use super::processor::{OutputProcessor, ProcessedEvent};
use crate::streaming::StreamEvent;
use async_trait::async_trait;
/// Stateless output processor registered under the name `"pii-protection"`.
///
/// The type carries no fields, so every instance is interchangeable and
/// cloning is free. `Debug` is derived so the processor can be logged or
/// embedded in other `Debug` types.
#[derive(Debug, Clone)]
pub struct PiiProtectionProcessor;
impl PiiProtectionProcessor {
pub fn new() -> Self {
Self
}
fn mask_text(&self, text: &str) -> (String, bool) {
(text.to_string(), false)
}
fn mask_event(&self, event: StreamEvent, ctx: &ProtectionContext) -> ProcessedEvent {
if ctx.destination == DataDestination::AuditExport && ctx.is_internal_audit {
return ProcessedEvent::unchanged(event);
}
match event {
StreamEvent::TextDelta { id, delta } => {
let (masked_delta, was_modified) = self.mask_text(&delta);
if was_modified {
ProcessedEvent::modified(StreamEvent::TextDelta {
id,
delta: masked_delta,
})
} else {
ProcessedEvent::unchanged(StreamEvent::TextDelta { id, delta })
}
}
StreamEvent::StepEnd {
execution_id,
step_id,
output,
duration_ms,
timestamp,
} => {
let (masked_output, was_modified) = match output {
Some(ref text) => {
let (masked, modified) = self.mask_text(text);
(Some(masked), modified)
}
None => (None, false),
};
if was_modified {
ProcessedEvent::modified(StreamEvent::StepEnd {
execution_id,
step_id,
output: masked_output,
duration_ms,
timestamp,
})
} else {
ProcessedEvent::unchanged(StreamEvent::StepEnd {
execution_id,
step_id,
output,
duration_ms,
timestamp,
})
}
}
StreamEvent::ExecutionEnd {
execution_id,
final_output,
duration_ms,
timestamp,
} => {
let (masked_output, was_modified) = match final_output {
Some(ref text) => {
let (masked, modified) = self.mask_text(text);
(Some(masked), modified)
}
None => (None, false),
};
if was_modified {
ProcessedEvent::modified(StreamEvent::ExecutionEnd {
execution_id,
final_output: masked_output,
duration_ms,
timestamp,
})
} else {
ProcessedEvent::unchanged(StreamEvent::ExecutionEnd {
execution_id,
final_output,
duration_ms,
timestamp,
})
}
}
StreamEvent::Error { error } => {
let (masked_message, was_modified) = self.mask_text(&error.message);
if was_modified {
let mut masked_error = error.clone();
masked_error.message = masked_message;
ProcessedEvent::modified(StreamEvent::Error {
error: masked_error,
})
} else {
ProcessedEvent::unchanged(StreamEvent::Error { error })
}
}
_ => ProcessedEvent::unchanged(event),
}
}
}
impl Default for PiiProtectionProcessor {
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl OutputProcessor for PiiProtectionProcessor {
    /// Identifier under which this processor reports itself.
    fn name(&self) -> &str {
        "pii-protection"
    }

    /// Processes a single stream event by delegating to `mask_event`.
    ///
    /// The delegation itself cannot fail, so this always resolves to `Ok`.
    async fn process(
        &self,
        event: StreamEvent,
        ctx: &ProtectionContext,
    ) -> anyhow::Result<ProcessedEvent> {
        let processed = self.mask_event(event, ctx);
        Ok(processed)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::kernel::ExecutionId;

    /// The processor reports the `"pii-protection"` name.
    #[tokio::test]
    async fn test_pii_protection_processor_name() {
        let pii = PiiProtectionProcessor::new();
        assert_eq!(pii.name(), "pii-protection");
    }

    /// An execution-start event processed in a stream context comes back
    /// flagged as not modified.
    #[tokio::test]
    async fn test_pii_protection_unchanged_event() {
        let pii = PiiProtectionProcessor::new();
        let stream_ctx = ProtectionContext::for_stream();
        let execution_id = ExecutionId::new();
        let start_event = StreamEvent::execution_start(&execution_id);

        let outcome = pii.process(start_event, &stream_ctx).await.unwrap();

        assert!(!outcome.was_modified);
    }
}