Skip to main content

enact_core/streaming/protection/
pii_protection.rs

1//! PII Protection Processor
2//!
3//! Masks PII in stream events before storage or streaming to frontend.
4//! (Full detection/redaction requires the optional `enact-guardrails` crate.)
5
6use super::context::{DataDestination, ProtectionContext};
7use super::processor::{OutputProcessor, ProcessedEvent};
8use crate::streaming::StreamEvent;
9use async_trait::async_trait;
10
/// PII protection processor.
///
/// Detects and masks PII in event payloads based on destination.
/// Without `enact-guardrails`, events pass through unchanged.
///
/// The type carries no state, so it is cheap to copy and safe to print in
/// diagnostics.
#[derive(Debug, Clone, Copy)]
pub struct PiiProtectionProcessor;
16
17impl PiiProtectionProcessor {
18    /// Create a new PII protection processor (no-op without enact-guardrails)
19    pub fn new() -> Self {
20        Self
21    }
22
23    /// Mask PII in a text field (no-op without enact-guardrails)
24    fn mask_text(&self, text: &str) -> (String, bool) {
25        (text.to_string(), false)
26    }
27
28    /// Process a StreamEvent, masking any PII in text fields
29    fn mask_event(&self, event: StreamEvent, ctx: &ProtectionContext) -> ProcessedEvent {
30        // For internal audit exports, don't mask (they need full access)
31        if ctx.destination == DataDestination::AuditExport && ctx.is_internal_audit {
32            return ProcessedEvent::unchanged(event);
33        }
34
35        match event {
36            StreamEvent::TextDelta { id, delta } => {
37                let (masked_delta, was_modified) = self.mask_text(&delta);
38                if was_modified {
39                    ProcessedEvent::modified(StreamEvent::TextDelta {
40                        id,
41                        delta: masked_delta,
42                    })
43                } else {
44                    ProcessedEvent::unchanged(StreamEvent::TextDelta { id, delta })
45                }
46            }
47            StreamEvent::StepEnd {
48                execution_id,
49                step_id,
50                output,
51                duration_ms,
52                timestamp,
53            } => {
54                let (masked_output, was_modified) = match output {
55                    Some(ref text) => {
56                        let (masked, modified) = self.mask_text(text);
57                        (Some(masked), modified)
58                    }
59                    None => (None, false),
60                };
61                if was_modified {
62                    ProcessedEvent::modified(StreamEvent::StepEnd {
63                        execution_id,
64                        step_id,
65                        output: masked_output,
66                        duration_ms,
67                        timestamp,
68                    })
69                } else {
70                    ProcessedEvent::unchanged(StreamEvent::StepEnd {
71                        execution_id,
72                        step_id,
73                        output,
74                        duration_ms,
75                        timestamp,
76                    })
77                }
78            }
79            StreamEvent::ExecutionEnd {
80                execution_id,
81                final_output,
82                duration_ms,
83                timestamp,
84            } => {
85                let (masked_output, was_modified) = match final_output {
86                    Some(ref text) => {
87                        let (masked, modified) = self.mask_text(text);
88                        (Some(masked), modified)
89                    }
90                    None => (None, false),
91                };
92                if was_modified {
93                    ProcessedEvent::modified(StreamEvent::ExecutionEnd {
94                        execution_id,
95                        final_output: masked_output,
96                        duration_ms,
97                        timestamp,
98                    })
99                } else {
100                    ProcessedEvent::unchanged(StreamEvent::ExecutionEnd {
101                        execution_id,
102                        final_output,
103                        duration_ms,
104                        timestamp,
105                    })
106                }
107            }
108            StreamEvent::Error { error } => {
109                // Mask error messages - they might contain PII
110                let (masked_message, was_modified) = self.mask_text(&error.message);
111                if was_modified {
112                    let mut masked_error = error.clone();
113                    masked_error.message = masked_message;
114                    ProcessedEvent::modified(StreamEvent::Error {
115                        error: masked_error,
116                    })
117                } else {
118                    ProcessedEvent::unchanged(StreamEvent::Error { error })
119                }
120            }
121            // Events without text content pass through unchanged
122            _ => ProcessedEvent::unchanged(event),
123        }
124    }
125}
126
127impl Default for PiiProtectionProcessor {
128    fn default() -> Self {
129        Self::new()
130    }
131}
132
133#[async_trait]
134impl OutputProcessor for PiiProtectionProcessor {
135    fn name(&self) -> &str {
136        "pii-protection"
137    }
138
139    async fn process(
140        &self,
141        event: StreamEvent,
142        ctx: &ProtectionContext,
143    ) -> anyhow::Result<ProcessedEvent> {
144        Ok(self.mask_event(event, ctx))
145    }
146}
147
#[cfg(test)]
mod tests {
    use super::*;
    use crate::kernel::ExecutionId;

    /// `name` is synchronous, so this test does not need an async runtime.
    #[test]
    fn test_pii_protection_processor_name() {
        let processor = PiiProtectionProcessor::new();
        assert_eq!(processor.name(), "pii-protection");
    }

    /// `Default` must hand back the same processor as `new`.
    #[test]
    fn test_pii_protection_processor_default() {
        let processor = PiiProtectionProcessor::default();
        assert_eq!(processor.name(), "pii-protection");
    }

    /// Events without text content pass through unchanged.
    #[tokio::test]
    async fn test_pii_protection_unchanged_event() {
        let processor = PiiProtectionProcessor::new();
        let ctx = ProtectionContext::for_stream();

        let exec_id = ExecutionId::new();
        let event = StreamEvent::execution_start(&exec_id);

        let result = processor
            .process(event, &ctx)
            .await
            .expect("processing an event must not fail");
        assert!(
            !result.was_modified,
            "an event without text content must not be reported as modified"
        );
    }
}