enact-core 0.0.1

Core agent runtime for Enact - Graph-Native AI agents
Documentation
//! Inbox Store - Storage for inbox messages
//!
//! ## Invariants
//!
//! - **INV-INBOX-002**: Control messages (pause/cancel) MUST be processed first
//!   - Implemented via `priority_order()` sorting in `drain_messages()`
//! - **INV-INBOX-004**: Messages are scoped to a specific ExecutionId
//!   - All operations require ExecutionId parameter
//!
//! @see docs/TECHNICAL/31-MID-EXECUTION-GUIDANCE.md

use super::message::InboxMessage;
use crate::kernel::ExecutionId;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

/// Storage interface for inbox messages, scoped per execution
///
/// Every method is a plain synchronous `fn` taking `&self`. The `Send + Sync`
/// bound means implementations must be safe to share across threads, so any
/// mutation has to go through interior mutability (locks, atomics, ...).
pub trait InboxStore: Send + Sync {
    /// Push a message to the inbox for a specific execution
    ///
    /// ## Arguments
    /// * `execution_id` - Target execution (INV-INBOX-004)
    /// * `message` - The message to push; ownership moves into the store
    fn push(&self, execution_id: &ExecutionId, message: InboxMessage);

    /// Get the number of pending messages for an execution
    fn len(&self, execution_id: &ExecutionId) -> usize;

    /// Check whether the execution has no pending messages
    ///
    /// Default implementation: `len(execution_id) == 0`. Implementations with
    /// a cheaper emptiness check may override it.
    fn is_empty(&self, execution_id: &ExecutionId) -> bool {
        self.len(execution_id) == 0
    }

    /// Check if there are any control messages (highest priority)
    ///
    /// Used for fast-path cancellation/pause checks without draining.
    fn has_control_messages(&self, execution_id: &ExecutionId) -> bool;

    /// Remove and return all messages for an execution, sorted by priority
    ///
    /// ## Invariant INV-INBOX-002
    /// Messages are returned sorted by priority_order():
    /// 1. Control (pause/resume/cancel) - highest
    /// 2. Evidence (contradicts_plan)
    /// 3. Evidence (other)
    /// 4. Guidance (high priority)
    /// 5. Guidance (other)
    /// 6. A2A - lowest
    fn drain_messages(&self, execution_id: &ExecutionId) -> Vec<InboxMessage>;

    /// Peek at the next message without removing it
    ///
    /// Returns an owned `InboxMessage`, so the stored message stays in place.
    /// `None` means there is nothing to peek at.
    fn peek(&self, execution_id: &ExecutionId) -> Option<InboxMessage>;

    /// Remove and return the highest-priority message
    ///
    /// `None` means there is nothing to pop.
    fn pop(&self, execution_id: &ExecutionId) -> Option<InboxMessage>;

    /// Discard all messages for an execution without returning them
    fn clear(&self, execution_id: &ExecutionId);
}

/// In-memory inbox store implementation
///
/// All executions share one `HashMap` guarded by a single `RwLock`, which is
/// what makes the store usable from several threads. State lives only in this
/// process, so it suits single-node deployments; the original design note
/// points distributed deployments at a Redis-backed implementation (not part
/// of this file).
#[derive(Default)]
pub struct InMemoryInboxStore {
    /// Pending messages per execution. The key is the `ExecutionId` rendered
    /// with `to_string()`; each `Vec` holds that execution's messages in the
    /// order they were pushed.
    messages: RwLock<HashMap<String, Vec<InboxMessage>>>,
}

impl InMemoryInboxStore {
    /// Build a store that holds no messages for any execution.
    pub fn new() -> Self {
        let messages = RwLock::new(HashMap::new());
        Self { messages }
    }

    /// Build an empty store that is already wrapped in an `Arc`, so it can be
    /// cloned and handed to several owners.
    pub fn shared() -> Arc<Self> {
        let store = Self::new();
        Arc::new(store)
    }
}

/// `InboxStore` backed by the process-local map in [`InMemoryInboxStore`].
///
/// Each method builds the map key (`execution_id.to_string()`) *before*
/// taking the lock so the allocation stays outside the critical section.
///
/// # Panics
///
/// Every method panics with `"lock poisoned"` if another thread panicked
/// while holding the internal `RwLock`.
impl InboxStore for InMemoryInboxStore {
    /// Append `message` to the execution's queue, creating the queue on first use.
    fn push(&self, execution_id: &ExecutionId, message: InboxMessage) {
        let key = execution_id.to_string();
        let mut guard = self.messages.write().expect("lock poisoned");
        guard.entry(key).or_default().push(message);
    }

    /// Number of queued messages; `0` for an execution with no queue.
    fn len(&self, execution_id: &ExecutionId) -> usize {
        let key = execution_id.to_string();
        let guard = self.messages.read().expect("lock poisoned");
        guard.get(&key).map_or(0, Vec::len)
    }

    /// `true` if at least one queued message reports `is_control()`.
    fn has_control_messages(&self, execution_id: &ExecutionId) -> bool {
        let key = execution_id.to_string();
        let guard = self.messages.read().expect("lock poisoned");
        guard
            .get(&key)
            .map(|queue| queue.iter().any(|m| m.is_control()))
            .unwrap_or(false)
    }

    /// Remove the execution's whole queue and return it sorted by
    /// `priority_order()` (INV-INBOX-002).
    fn drain_messages(&self, execution_id: &ExecutionId) -> Vec<InboxMessage> {
        let key = execution_id.to_string();

        // The write guard is a temporary of this statement, so the lock is
        // released before the (potentially slower) sort runs.
        let mut messages = self
            .messages
            .write()
            .expect("lock poisoned")
            .remove(&key)
            .unwrap_or_default();

        // INV-INBOX-002: control messages first. `sort_by_key` is a stable
        // sort, so messages with equal priority keep their push order.
        messages.sort_by_key(|m| m.priority_order());

        messages
    }

    /// Clone of the message `pop` would return next, or `None` if nothing is queued.
    fn peek(&self, execution_id: &ExecutionId) -> Option<InboxMessage> {
        let key = execution_id.to_string();
        let guard = self.messages.read().expect("lock poisoned");
        guard
            .get(&key)
            // `min_by_key` yields the first of several equally-minimal
            // elements, i.e. the oldest message of the top priority.
            .and_then(|queue| queue.iter().min_by_key(|m| m.priority_order()))
            .cloned()
    }

    /// Remove and return the message with the smallest `priority_order()`
    /// (the oldest one on ties), or `None` if nothing is queued.
    ///
    /// Once the queue becomes empty its map entry is removed, so executions
    /// that are popped dry do not leave an empty `Vec` behind indefinitely.
    fn pop(&self, execution_id: &ExecutionId) -> Option<InboxMessage> {
        let key = execution_id.to_string();
        let mut guard = self.messages.write().expect("lock poisoned");
        let queue = guard.get_mut(&key)?;

        // Find index of highest priority message (INV-INBOX-002).
        // An empty queue simply yields `None` here.
        let min_idx = queue
            .iter()
            .enumerate()
            .min_by_key(|(_, m)| m.priority_order())
            .map(|(i, _)| i);

        // `Vec::remove` (not `swap_remove`) keeps the remaining messages in
        // push order, which later pops and drains rely on for tie-breaking.
        let popped = min_idx.map(|i| queue.remove(i));

        if queue.is_empty() {
            guard.remove(&key);
        }

        popped
    }

    /// Drop the execution's queue, discarding any messages in it.
    fn clear(&self, execution_id: &ExecutionId) {
        let key = execution_id.to_string();
        self.messages.write().expect("lock poisoned").remove(&key);
    }
}

/// Unit tests for `InMemoryInboxStore`, covering the priority invariant
/// (INV-INBOX-002), per-execution isolation (INV-INBOX-004) and the
/// empty / unknown-execution paths.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::inbox::message::{
        ControlAction, ControlMessage, EvidenceImpact, EvidenceUpdate, EvidenceSource,
        GuidanceMessage,
    };

    /// Fresh execution id for a test.
    fn test_execution_id() -> ExecutionId {
        ExecutionId::new()
    }

    /// Control message for `exec_id` carrying `action`; the third constructor
    /// argument is the fixed `"admin"` string used throughout these tests.
    fn control_message(exec_id: &ExecutionId, action: ControlAction) -> InboxMessage {
        InboxMessage::Control(ControlMessage::new(exec_id.clone(), action, "admin"))
    }

    /// User guidance message for `exec_id` with the given text.
    fn guidance_message(exec_id: &ExecutionId, text: &'static str) -> InboxMessage {
        InboxMessage::Guidance(GuidanceMessage::from_user(exec_id.clone(), text))
    }

    #[test]
    fn test_push_and_drain() {
        let store = InMemoryInboxStore::new();
        let exec_id = test_execution_id();

        // Push messages
        store.push(&exec_id, guidance_message(&exec_id, "Focus on EU"));

        assert_eq!(store.len(&exec_id), 1);
        assert!(!store.is_empty(&exec_id));

        // Drain
        let messages = store.drain_messages(&exec_id);
        assert_eq!(messages.len(), 1);
        assert!(store.is_empty(&exec_id));
    }

    #[test]
    fn test_priority_sorting_inv_inbox_002() {
        let store = InMemoryInboxStore::new();
        let exec_id = test_execution_id();

        // Push in reverse priority order
        let guidance = guidance_message(&exec_id, "low priority");
        let evidence = InboxMessage::Evidence(EvidenceUpdate::new(
            exec_id.clone(),
            EvidenceSource::Discovery,
            "Found something",
            serde_json::json!({}),
            EvidenceImpact::Informational,
        ));
        let control = control_message(&exec_id, ControlAction::Pause);

        store.push(&exec_id, guidance);
        store.push(&exec_id, evidence);
        store.push(&exec_id, control);

        // Drain should return sorted by priority
        let messages = store.drain_messages(&exec_id);
        assert_eq!(messages.len(), 3);

        // INV-INBOX-002: Control first
        assert!(messages[0].is_control());
        assert!(matches!(messages[1], InboxMessage::Evidence(_)));
        assert!(matches!(messages[2], InboxMessage::Guidance(_)));
    }

    #[test]
    fn test_has_control_messages() {
        let store = InMemoryInboxStore::new();
        let exec_id = test_execution_id();

        // No messages
        assert!(!store.has_control_messages(&exec_id));

        // Add guidance only
        store.push(&exec_id, guidance_message(&exec_id, "test"));
        assert!(!store.has_control_messages(&exec_id));

        // Add control message
        store.push(&exec_id, control_message(&exec_id, ControlAction::Cancel));
        assert!(store.has_control_messages(&exec_id));
    }

    #[test]
    fn test_pop_returns_highest_priority() {
        let store = InMemoryInboxStore::new();
        let exec_id = test_execution_id();

        // Push low priority first, then high priority
        store.push(&exec_id, guidance_message(&exec_id, "low"));
        store.push(&exec_id, control_message(&exec_id, ControlAction::Pause));

        // Pop should return control first (highest priority)
        let msg = store.pop(&exec_id).unwrap();
        assert!(msg.is_control());

        // Next pop returns guidance
        let msg = store.pop(&exec_id).unwrap();
        assert!(matches!(msg, InboxMessage::Guidance(_)));

        // Empty now
        assert!(store.pop(&exec_id).is_none());
        assert!(store.is_empty(&exec_id));
    }

    #[test]
    fn test_peek_does_not_remove() {
        let store = InMemoryInboxStore::new();
        let exec_id = test_execution_id();

        store.push(&exec_id, control_message(&exec_id, ControlAction::Pause));

        // Peek multiple times - message remains
        let msg1 = store.peek(&exec_id);
        let msg2 = store.peek(&exec_id);

        assert!(msg1.is_some());
        assert!(msg2.is_some());
        assert_eq!(store.len(&exec_id), 1);
    }

    #[test]
    fn test_peek_returns_highest_priority() {
        let store = InMemoryInboxStore::new();
        let exec_id = test_execution_id();

        // Lower-priority message arrives first; peek must still surface control.
        store.push(&exec_id, guidance_message(&exec_id, "low"));
        store.push(&exec_id, control_message(&exec_id, ControlAction::Pause));

        let next = store.peek(&exec_id).expect("inbox holds two messages");
        assert!(next.is_control());

        // Nothing was consumed by peeking.
        assert_eq!(store.len(&exec_id), 2);
    }

    #[test]
    fn test_unknown_execution_is_empty() {
        let store = InMemoryInboxStore::new();
        let exec_id = test_execution_id();

        // An execution that never received a message behaves like an empty inbox.
        assert_eq!(store.len(&exec_id), 0);
        assert!(store.is_empty(&exec_id));
        assert!(!store.has_control_messages(&exec_id));
        assert!(store.peek(&exec_id).is_none());
        assert!(store.pop(&exec_id).is_none());
        assert!(store.drain_messages(&exec_id).is_empty());

        // Clearing it is a harmless no-op.
        store.clear(&exec_id);
        assert!(store.is_empty(&exec_id));
    }

    #[test]
    fn test_execution_isolation_inv_inbox_004() {
        let store = InMemoryInboxStore::new();
        let exec_id_1 = test_execution_id();
        let exec_id_2 = test_execution_id();

        // Push to different executions
        store.push(&exec_id_1, control_message(&exec_id_1, ControlAction::Pause));
        store.push(&exec_id_2, control_message(&exec_id_2, ControlAction::Cancel));

        // Each execution has its own messages
        assert_eq!(store.len(&exec_id_1), 1);
        assert_eq!(store.len(&exec_id_2), 1);

        // Drain exec_1 doesn't affect exec_2
        let msgs = store.drain_messages(&exec_id_1);
        assert_eq!(msgs.len(), 1);
        assert_eq!(store.len(&exec_id_1), 0);
        assert_eq!(store.len(&exec_id_2), 1);
    }

    #[test]
    fn test_clear() {
        let store = InMemoryInboxStore::new();
        let exec_id = test_execution_id();

        for _ in 0..5 {
            store.push(&exec_id, control_message(&exec_id, ControlAction::Pause));
        }

        assert_eq!(store.len(&exec_id), 5);

        store.clear(&exec_id);

        assert_eq!(store.len(&exec_id), 0);
        assert!(store.is_empty(&exec_id));
    }
}