enact-core 0.0.2

Core agent runtime for Enact - Graph-Native AI agents
Documentation
//! Output Processor Trait
//!
//! Defines the trait for output processors that transform events before
//! storage or streaming.

use super::context::ProtectionContext;
use crate::streaming::StreamEvent;
use async_trait::async_trait;
use std::sync::Arc;

/// Result of processing an event
///
/// Bundles the event handed back by a processor with a flag recording
/// whether it was changed and an optional encrypted payload. Build values
/// with [`ProcessedEvent::unchanged`] or [`ProcessedEvent::modified`], and
/// attach a payload with [`ProcessedEvent::with_encrypted`].
#[derive(Debug, Clone)]
pub struct ProcessedEvent {
    /// The processed event (may be transformed)
    pub event: StreamEvent,

    /// Whether the event was modified
    ///
    /// `false` when built via `unchanged`, `true` when built via `modified`.
    pub was_modified: bool,

    /// Optional: encrypted payload for storage (if applicable)
    ///
    /// `None` until one is attached with `with_encrypted`.
    pub encrypted_payload: Option<String>,
}

impl ProcessedEvent {
    /// Create a processed event that was not modified
    pub fn unchanged(event: StreamEvent) -> Self {
        Self {
            event,
            was_modified: false,
            encrypted_payload: None,
        }
    }

    /// Create a processed event that was modified
    pub fn modified(event: StreamEvent) -> Self {
        Self {
            event,
            was_modified: true,
            encrypted_payload: None,
        }
    }

    /// Add an encrypted payload (for storage destination)
    pub fn with_encrypted(mut self, encrypted: String) -> Self {
        self.encrypted_payload = Some(encrypted);
        self
    }
}

/// Output Processor trait
///
/// Processors run AFTER kernel execution, BEFORE storage/streaming.
/// They MUST NOT mutate execution state (streaming is read-only).
///
/// Implementors must be `Send + Sync`; [`ProcessorPipeline`] holds them as
/// `Arc<dyn OutputProcessor>`.
#[async_trait]
pub trait OutputProcessor: Send + Sync {
    /// Processor name for logging/metrics
    fn name(&self) -> &str;

    /// Process an event before storage/streaming
    ///
    /// Takes ownership of `event` and hands back a [`ProcessedEvent`]
    /// wrapping the event to pass on.
    ///
    /// # Arguments
    /// * `event` - The event to process
    /// * `ctx` - Protection context (destination, tenant, etc.)
    ///
    /// # Returns
    /// The processed event, potentially transformed
    ///
    /// # Errors
    /// Implementations report failure through `anyhow::Error`. When run via
    /// [`ProcessorPipeline::process`], the first error stops the pipeline
    /// and is returned to the caller.
    async fn process(
        &self,
        event: StreamEvent,
        ctx: &ProtectionContext,
    ) -> anyhow::Result<ProcessedEvent>;
}

/// Processor Pipeline - chains multiple processors
///
/// Holds shared [`OutputProcessor`]s and runs an event through each of them
/// in turn via [`ProcessorPipeline::process`]. Processors are appended with
/// [`ProcessorPipeline::add`].
pub struct ProcessorPipeline {
    /// Registered processors, in the order they were added; `process`
    /// iterates them in this same order.
    processors: Vec<Arc<dyn OutputProcessor>>,
}

impl ProcessorPipeline {
    /// Build a pipeline with no processors registered.
    pub fn new() -> Self {
        Self {
            processors: Vec::new(),
        }
    }

    /// Append `processor` to the end of the pipeline.
    ///
    /// Builder-style: consumes the pipeline and returns it so calls can be
    /// chained.
    // The lint fires because `add` shares its name with `std::ops::Add::add`;
    // this method is a builder step, not an arithmetic operator.
    #[allow(clippy::should_implement_trait)]
    pub fn add(self, processor: Arc<dyn OutputProcessor>) -> Self {
        let Self { mut processors } = self;
        processors.push(processor);
        Self { processors }
    }

    /// Feed `event` through every processor, in the order they were added.
    ///
    /// Each processor receives the event produced by the one before it.
    ///
    /// # Returns
    /// A [`ProcessedEvent`] holding the event returned by the last
    /// processor (or the input event when the pipeline is empty), a
    /// `was_modified` flag that is `true` if any processor reported a
    /// modification, and the encrypted payload from the most recent
    /// processor that supplied one.
    ///
    /// # Errors
    /// Stops at the first processor that fails and returns its error.
    pub async fn process(
        &self,
        event: StreamEvent,
        ctx: &ProtectionContext,
    ) -> anyhow::Result<ProcessedEvent> {
        let mut latest = event;
        let mut modified = false;
        let mut payload: Option<String> = None;

        for stage in self.processors.iter() {
            let ProcessedEvent {
                event: next,
                was_modified,
                encrypted_payload: fresh,
            } = stage.process(latest, ctx).await?;

            modified |= was_modified;
            // A newer payload replaces the stored one; a stage that returns
            // none leaves the earlier payload in place.
            if fresh.is_some() {
                payload = fresh;
            }
            latest = next;
        }

        Ok(ProcessedEvent {
            event: latest,
            was_modified: modified,
            encrypted_payload: payload,
        })
    }

    /// Report whether no processors have been added.
    pub fn is_empty(&self) -> bool {
        self.processors.is_empty()
    }

    /// Report how many processors the pipeline holds.
    pub fn len(&self) -> usize {
        self.processors.len()
    }
}

impl Default for ProcessorPipeline {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// `unchanged` leaves the modified flag clear and attaches no payload.
    #[test]
    fn test_processed_event_unchanged() {
        let result = ProcessedEvent::unchanged(StreamEvent::text_start(None));

        assert!(!result.was_modified);
        assert!(result.encrypted_payload.is_none());
    }

    /// `modified` sets the modified flag.
    #[test]
    fn test_processed_event_modified() {
        let result = ProcessedEvent::modified(StreamEvent::text_start(None));

        assert!(result.was_modified);
    }

    /// `with_encrypted` stores exactly the payload it was given.
    #[test]
    fn test_processed_event_with_encrypted() {
        let payload = "encrypted_data".to_string();
        let result =
            ProcessedEvent::modified(StreamEvent::text_start(None)).with_encrypted(payload);

        assert_eq!(result.encrypted_payload.as_deref(), Some("encrypted_data"));
    }

    /// A freshly created pipeline holds no processors.
    #[test]
    fn test_pipeline_new() {
        let fresh = ProcessorPipeline::new();

        assert_eq!(fresh.len(), 0);
        assert!(fresh.is_empty());
    }
}