enact_core/streaming/protection/
pii_protection.rs1use super::context::{DataDestination, ProtectionContext};
7use super::processor::{OutputProcessor, ProcessedEvent};
8use crate::streaming::StreamEvent;
9use async_trait::async_trait;
10
/// Output processor that runs stream events through PII masking.
///
/// The type carries no state; construct it with [`PiiProtectionProcessor::new`] or
/// `Default::default()`.
#[derive(Debug, Clone)]
pub struct PiiProtectionProcessor;
16
17impl PiiProtectionProcessor {
18 pub fn new() -> Self {
20 Self
21 }
22
23 fn mask_text(&self, text: &str) -> (String, bool) {
25 (text.to_string(), false)
26 }
27
28 fn mask_event(&self, event: StreamEvent, ctx: &ProtectionContext) -> ProcessedEvent {
30 if ctx.destination == DataDestination::AuditExport && ctx.is_internal_audit {
32 return ProcessedEvent::unchanged(event);
33 }
34
35 match event {
36 StreamEvent::TextDelta { id, delta } => {
37 let (masked_delta, was_modified) = self.mask_text(&delta);
38 if was_modified {
39 ProcessedEvent::modified(StreamEvent::TextDelta {
40 id,
41 delta: masked_delta,
42 })
43 } else {
44 ProcessedEvent::unchanged(StreamEvent::TextDelta { id, delta })
45 }
46 }
47 StreamEvent::StepEnd {
48 execution_id,
49 step_id,
50 output,
51 duration_ms,
52 timestamp,
53 } => {
54 let (masked_output, was_modified) = match output {
55 Some(ref text) => {
56 let (masked, modified) = self.mask_text(text);
57 (Some(masked), modified)
58 }
59 None => (None, false),
60 };
61 if was_modified {
62 ProcessedEvent::modified(StreamEvent::StepEnd {
63 execution_id,
64 step_id,
65 output: masked_output,
66 duration_ms,
67 timestamp,
68 })
69 } else {
70 ProcessedEvent::unchanged(StreamEvent::StepEnd {
71 execution_id,
72 step_id,
73 output,
74 duration_ms,
75 timestamp,
76 })
77 }
78 }
79 StreamEvent::ExecutionEnd {
80 execution_id,
81 final_output,
82 duration_ms,
83 timestamp,
84 } => {
85 let (masked_output, was_modified) = match final_output {
86 Some(ref text) => {
87 let (masked, modified) = self.mask_text(text);
88 (Some(masked), modified)
89 }
90 None => (None, false),
91 };
92 if was_modified {
93 ProcessedEvent::modified(StreamEvent::ExecutionEnd {
94 execution_id,
95 final_output: masked_output,
96 duration_ms,
97 timestamp,
98 })
99 } else {
100 ProcessedEvent::unchanged(StreamEvent::ExecutionEnd {
101 execution_id,
102 final_output,
103 duration_ms,
104 timestamp,
105 })
106 }
107 }
108 StreamEvent::Error { error } => {
109 let (masked_message, was_modified) = self.mask_text(&error.message);
111 if was_modified {
112 let mut masked_error = error.clone();
113 masked_error.message = masked_message;
114 ProcessedEvent::modified(StreamEvent::Error {
115 error: masked_error,
116 })
117 } else {
118 ProcessedEvent::unchanged(StreamEvent::Error { error })
119 }
120 }
121 _ => ProcessedEvent::unchanged(event),
123 }
124 }
125}
126
127impl Default for PiiProtectionProcessor {
128 fn default() -> Self {
129 Self::new()
130 }
131}
132
133#[async_trait]
134impl OutputProcessor for PiiProtectionProcessor {
135 fn name(&self) -> &str {
136 "pii-protection"
137 }
138
139 async fn process(
140 &self,
141 event: StreamEvent,
142 ctx: &ProtectionContext,
143 ) -> anyhow::Result<ProcessedEvent> {
144 Ok(self.mask_event(event, ctx))
145 }
146}
147
#[cfg(test)]
mod tests {
    use super::*;
    use crate::kernel::ExecutionId;

    /// The processor reports the name `"pii-protection"`.
    #[tokio::test]
    async fn test_pii_protection_processor_name() {
        let pii = PiiProtectionProcessor::new();
        let reported = pii.name();
        assert_eq!(reported, "pii-protection");
    }

    /// An execution-start event comes back flagged as not modified.
    #[tokio::test]
    async fn test_pii_protection_unchanged_event() {
        let pii = PiiProtectionProcessor::new();
        let stream_ctx = ProtectionContext::for_stream();

        let execution_id = ExecutionId::new();
        let start_event = StreamEvent::execution_start(&execution_id);

        let outcome = pii.process(start_event, &stream_ctx).await.unwrap();
        assert!(!outcome.was_modified);
    }
}