enact-core 0.0.1

Core agent runtime for Enact - Graph-Native AI agents
Documentation
//! PII Protection Processor
//!
//! Masks PII in stream events before storage or streaming to frontend.
//! Uses `enact-guardrails` for detection and redaction.

use super::context::{DataDestination, ProtectionContext};
use super::processor::{OutputProcessor, ProcessedEvent};
use crate::streaming::StreamEvent;
use async_trait::async_trait;

#[cfg(feature = "guardrails")]
use enact_guardrails::{PiiDetector, PiiRedactor};

/// PII Protection Processor
///
/// Detects and masks PII in the text payloads of stream events before they
/// leave the runtime.
///
/// The masking applied by this type is currently the same for every
/// destination, with a single exception:
/// - `AuditExport` with `is_internal_audit` set: events pass through unmasked
///
/// Destination-specific handling from the original design is not implemented
/// in this file:
/// - Storage: optional encryption on top of masking (future)
/// - Log: hashing of sensitive fields instead of masking
///
/// Without the `guardrails` feature the struct has no fields and no detection
/// is performed; every event is returned unchanged.
pub struct PiiProtectionProcessor {
    /// Detector used to locate PII spans in text (only with `guardrails`).
    #[cfg(feature = "guardrails")]
    detector: PiiDetector,
}

impl PiiProtectionProcessor {
    /// Create a new PII protection processor
    #[cfg(feature = "guardrails")]
    pub fn new() -> Self {
        Self {
            detector: PiiDetector::new(),
        }
    }

    /// Create a new PII protection processor (no-op when guardrails feature disabled)
    #[cfg(not(feature = "guardrails"))]
    pub fn new() -> Self {
        Self {}
    }

    /// Mask PII in a text field
    #[cfg(feature = "guardrails")]
    fn mask_text(&self, text: &str) -> (String, bool) {
        let matches = self.detector.detect(text);
        if matches.is_empty() {
            (text.to_string(), false)
        } else {
            (PiiRedactor::redact_matches(text, &matches), true)
        }
    }

    /// Mask PII in a text field (no-op when guardrails disabled)
    #[cfg(not(feature = "guardrails"))]
    fn mask_text(&self, text: &str) -> (String, bool) {
        (text.to_string(), false)
    }

    /// Process a StreamEvent, masking any PII in text fields
    fn mask_event(&self, event: StreamEvent, ctx: &ProtectionContext) -> ProcessedEvent {
        // For internal audit exports, don't mask (they need full access)
        if ctx.destination == DataDestination::AuditExport && ctx.is_internal_audit {
            return ProcessedEvent::unchanged(event);
        }

        match event {
            StreamEvent::TextDelta { id, delta } => {
                let (masked_delta, was_modified) = self.mask_text(&delta);
                if was_modified {
                    ProcessedEvent::modified(StreamEvent::TextDelta {
                        id,
                        delta: masked_delta,
                    })
                } else {
                    ProcessedEvent::unchanged(StreamEvent::TextDelta { id, delta })
                }
            }
            StreamEvent::StepEnd {
                execution_id,
                step_id,
                output,
                duration_ms,
                timestamp,
            } => {
                let (masked_output, was_modified) = match output {
                    Some(ref text) => {
                        let (masked, modified) = self.mask_text(text);
                        (Some(masked), modified)
                    }
                    None => (None, false),
                };
                if was_modified {
                    ProcessedEvent::modified(StreamEvent::StepEnd {
                        execution_id,
                        step_id,
                        output: masked_output,
                        duration_ms,
                        timestamp,
                    })
                } else {
                    ProcessedEvent::unchanged(StreamEvent::StepEnd {
                        execution_id,
                        step_id,
                        output,
                        duration_ms,
                        timestamp,
                    })
                }
            }
            StreamEvent::ExecutionEnd {
                execution_id,
                final_output,
                duration_ms,
                timestamp,
            } => {
                let (masked_output, was_modified) = match final_output {
                    Some(ref text) => {
                        let (masked, modified) = self.mask_text(text);
                        (Some(masked), modified)
                    }
                    None => (None, false),
                };
                if was_modified {
                    ProcessedEvent::modified(StreamEvent::ExecutionEnd {
                        execution_id,
                        final_output: masked_output,
                        duration_ms,
                        timestamp,
                    })
                } else {
                    ProcessedEvent::unchanged(StreamEvent::ExecutionEnd {
                        execution_id,
                        final_output,
                        duration_ms,
                        timestamp,
                    })
                }
            }
            StreamEvent::Error { error } => {
                // Mask error messages - they might contain PII
                let (masked_message, was_modified) = self.mask_text(&error.message);
                if was_modified {
                    let mut masked_error = error.clone();
                    masked_error.message = masked_message;
                    ProcessedEvent::modified(StreamEvent::Error {
                        error: masked_error,
                    })
                } else {
                    ProcessedEvent::unchanged(StreamEvent::Error { error })
                }
            }
            // Events without text content pass through unchanged
            _ => ProcessedEvent::unchanged(event),
        }
    }
}

impl Default for PiiProtectionProcessor {
    fn default() -> Self {
        Self::new()
    }
}

#[async_trait]
impl OutputProcessor for PiiProtectionProcessor {
    /// Identifier under which this processor is reported.
    fn name(&self) -> &str {
        const NAME: &str = "pii-protection";
        NAME
    }

    /// Mask PII in `event` for the destination described by `ctx`.
    ///
    /// Masking itself is synchronous and cannot fail, so this always
    /// resolves to `Ok`.
    async fn process(
        &self,
        event: StreamEvent,
        ctx: &ProtectionContext,
    ) -> anyhow::Result<ProcessedEvent> {
        let processed = self.mask_event(event, ctx);
        Ok(processed)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::kernel::ExecutionId;

    /// The processor reports its fixed name.
    #[tokio::test]
    async fn test_pii_protection_processor_name() {
        let processor = PiiProtectionProcessor::new();
        assert_eq!(processor.name(), "pii-protection");
    }

    /// An email address in a text delta is redacted for the stream destination.
    #[cfg(feature = "guardrails")]
    #[tokio::test]
    async fn test_pii_protection_masks_email_in_text_delta() {
        let processor = PiiProtectionProcessor::new();
        let ctx = ProtectionContext::for_stream();

        let event = StreamEvent::TextDelta {
            id: "test".to_string(),
            delta: "Contact me at user@example.com".to_string(),
        };

        let result = processor.process(event, &ctx).await.unwrap();
        assert!(result.was_modified);

        if let StreamEvent::TextDelta { delta, .. } = result.event {
            assert!(delta.contains("us***@example.com"));
            assert!(!delta.contains("user@example.com"));
        } else {
            panic!("Expected TextDelta event");
        }
    }

    /// Internal audit exports receive the original, unmasked event.
    #[cfg(feature = "guardrails")]
    #[tokio::test]
    async fn test_pii_protection_skips_internal_audit() {
        let processor = PiiProtectionProcessor::new();
        let ctx = ProtectionContext::for_audit(true); // internal audit

        let event = StreamEvent::TextDelta {
            id: "test".to_string(),
            delta: "Contact me at user@example.com".to_string(),
        };

        let result = processor.process(event, &ctx).await.unwrap();
        // Internal audit should NOT mask
        assert!(!result.was_modified);

        if let StreamEvent::TextDelta { delta, .. } = result.event {
            assert!(delta.contains("user@example.com")); // unmasked
        } else {
            // Without this branch the test would pass silently if the
            // processor ever returned a different variant.
            panic!("Expected TextDelta event");
        }
    }

    /// Text without PII is returned as-is and not flagged as modified.
    #[tokio::test]
    async fn test_pii_protection_plain_text_unchanged() {
        let processor = PiiProtectionProcessor::new();
        let ctx = ProtectionContext::for_stream();

        let event = StreamEvent::TextDelta {
            id: "test".to_string(),
            delta: "The build finished successfully".to_string(),
        };

        let result = processor.process(event, &ctx).await.unwrap();
        assert!(!result.was_modified);

        if let StreamEvent::TextDelta { delta, .. } = result.event {
            assert_eq!(delta, "The build finished successfully");
        } else {
            panic!("Expected TextDelta event");
        }
    }

    /// Events that carry no text payload pass through untouched.
    #[tokio::test]
    async fn test_pii_protection_unchanged_event() {
        let processor = PiiProtectionProcessor::new();
        let ctx = ProtectionContext::for_stream();

        // Events without text content pass through unchanged
        let exec_id = ExecutionId::new();
        let event = StreamEvent::execution_start(&exec_id);

        let result = processor.process(event, &ctx).await.unwrap();
        assert!(!result.was_modified);
    }
}