enact-core 0.0.2

Core agent runtime for Enact - Graph-Native AI agents
Documentation
//! Protected Event Emitter
//!
//! Wraps `EventEmitter` with an output processor pipeline so that emitted
//! events pass through protection before storage/streaming.
//!
//! See `docs/TECHNICAL/17-GUARDRAILS-PROTECTION.md`.

use super::protection::{OutputProcessor, ProcessorPipeline, ProtectionContext};
use super::{EventEmitter, StreamEvent, StreamMode};
use std::sync::Arc;

/// Protected Event Emitter
///
/// Wraps an `EventEmitter` together with a `ProcessorPipeline` and a
/// `ProtectionContext`. Events sent through `emit` / `emit_force` are run
/// through the pipeline before being handed to the inner emitter.
///
/// Note that `emit_unprotected` and direct use of the emitter returned by
/// `inner()` do NOT go through the pipeline.
///
/// ## Usage
///
/// ```ignore
/// use std::sync::Arc;
/// use enact_core::streaming::{ProtectedEventEmitter, PiiProtectionProcessor};
///
/// let emitter = ProtectedEventEmitter::new()
///     .with_processor(Arc::new(PiiProtectionProcessor::new()))
///     .with_context(ProtectionContext::for_stream());
/// ```
pub struct ProtectedEventEmitter {
    /// Underlying emitter that receives events (processed or, for
    /// `emit_unprotected`, as given).
    inner: EventEmitter,
    /// Processors applied to events by `emit` and `emit_force`.
    pipeline: ProcessorPipeline,
    /// Context passed to the pipeline on every `process` call.
    context: ProtectionContext,
}

impl ProtectedEventEmitter {
    /// Create a new protected event emitter.
    ///
    /// Starts with `EventEmitter::new()`, a fresh `ProcessorPipeline`, and
    /// `ProtectionContext::for_stream()` as the context.
    pub fn new() -> Self {
        Self {
            inner: EventEmitter::new(),
            pipeline: ProcessorPipeline::new(),
            context: ProtectionContext::for_stream(),
        }
    }

    /// Create with a specific stream mode.
    ///
    /// Identical to [`Self::new`] except that the inner emitter is built with
    /// `EventEmitter::with_mode(mode)`.
    pub fn with_mode(mode: StreamMode) -> Self {
        Self {
            inner: EventEmitter::with_mode(mode),
            pipeline: ProcessorPipeline::new(),
            context: ProtectionContext::for_stream(),
        }
    }

    /// Add a processor to the pipeline (builder style).
    ///
    /// Consumes `self` and returns the updated emitter; discarding the return
    /// value drops the emitter, hence `#[must_use]`.
    #[must_use]
    pub fn with_processor(mut self, processor: Arc<dyn OutputProcessor>) -> Self {
        self.pipeline = self.pipeline.add(processor);
        self
    }

    /// Set the protection context (builder style).
    ///
    /// Consumes `self` and returns the updated emitter; discarding the return
    /// value drops the emitter, hence `#[must_use]`.
    #[must_use]
    pub fn with_context(mut self, context: ProtectionContext) -> Self {
        self.context = context;
        self
    }

    /// Set the protection context in place on an existing emitter.
    pub fn set_context(&mut self, context: ProtectionContext) {
        self.context = context;
    }

    /// Get the protection context currently passed to the pipeline.
    pub fn context(&self) -> &ProtectionContext {
        &self.context
    }

    /// Run `event` through the protection pipeline and return the event that
    /// should be emitted.
    ///
    /// Shared by [`Self::emit`] and [`Self::emit_force`] so both apply exactly
    /// the same protection. When the pipeline is empty the event is returned
    /// untouched without invoking the pipeline.
    async fn protect(&self, event: StreamEvent) -> anyhow::Result<StreamEvent> {
        if self.pipeline.is_empty() {
            // No processors: nothing to apply.
            return Ok(event);
        }

        let processed = self.pipeline.process(event, &self.context).await?;
        Ok(processed.event)
    }

    /// Emit an event (runs through protection pipeline).
    ///
    /// This is async because processors may need async operations. The
    /// processed event is forwarded to the inner emitter's `emit`.
    ///
    /// # Errors
    ///
    /// Returns the pipeline's error if processing fails; in that case nothing
    /// is emitted.
    pub async fn emit(&self, event: StreamEvent) -> anyhow::Result<()> {
        let event = self.protect(event).await?;
        self.inner.emit(event);
        Ok(())
    }

    /// Emit an event synchronously (bypasses protection pipeline).
    ///
    /// Use only for events that are guaranteed safe (control events, etc.).
    pub fn emit_unprotected(&self, event: StreamEvent) {
        self.inner.emit(event);
    }

    /// Emit an event unconditionally (ignores mode, runs through protection).
    ///
    /// Same protection as [`Self::emit`], but the processed event is forwarded
    /// to the inner emitter's `emit_force`.
    ///
    /// # Errors
    ///
    /// Returns the pipeline's error if processing fails; in that case nothing
    /// is emitted.
    pub async fn emit_force(&self, event: StreamEvent) -> anyhow::Result<()> {
        let event = self.protect(event).await?;
        self.inner.emit_force(event);
        Ok(())
    }

    /// Get all collected events, as returned by the inner emitter's `drain`.
    pub fn drain(&self) -> Vec<StreamEvent> {
        self.inner.drain()
    }

    /// Get the current stream mode of the inner emitter.
    pub fn mode(&self) -> StreamMode {
        self.inner.mode()
    }

    /// Get reference to inner emitter (for compatibility).
    ///
    /// Events emitted directly on the returned emitter do not pass through
    /// the protection pipeline.
    pub fn inner(&self) -> &EventEmitter {
        &self.inner
    }
}

impl Default for ProtectedEventEmitter {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::kernel::ExecutionId;

    /// `emit` on an emitter with no processors added still records the event.
    #[tokio::test]
    async fn test_protected_emitter_no_processors() {
        let execution = ExecutionId::new();
        let protected = ProtectedEventEmitter::new();

        let outcome = protected
            .emit(StreamEvent::execution_start(&execution))
            .await;
        outcome.unwrap();

        let collected = protected.drain();
        assert_eq!(collected.len(), 1);
    }

    /// `emit_unprotected` is a plain synchronous call that records the event.
    #[tokio::test]
    async fn test_protected_emitter_emit_unprotected() {
        let execution = ExecutionId::new();
        let protected = ProtectedEventEmitter::new();

        // No `.await` here: this path is synchronous.
        let start_event = StreamEvent::execution_start(&execution);
        protected.emit_unprotected(start_event);

        let collected = protected.drain();
        assert_eq!(collected.len(), 1);
    }

    /// A context supplied via `with_context` is the one returned by `context()`.
    #[tokio::test]
    async fn test_protected_emitter_context() {
        let protected =
            ProtectedEventEmitter::new().with_context(ProtectionContext::for_storage());

        let active_context = protected.context();
        assert!(active_context.destination.requires_encryption());
    }

    /// The mode given to `with_mode` is reported back by `mode()`.
    #[tokio::test]
    async fn test_protected_emitter_mode() {
        let protected = ProtectedEventEmitter::with_mode(StreamMode::Summary);

        let reported = protected.mode();
        assert_eq!(reported, StreamMode::Summary);
    }
}