// everruns-core 0.9.0
//
// Core agent abstractions for Everruns - agent loop, events, tools, LLM providers
// Documentation
// MessageRetriever - Retrieval-only trait for conversation messages
//
// Design decision: MessageRetriever is retrieval-only.
// Messages are stored via EventEmitter (input.message, output.message.completed events).
// This trait provides read access for building LLM context.

use async_trait::async_trait;

use crate::error::Result;
use crate::message::{ContentPart, Controls, Message, MessageRole};
use crate::message_filter::MessageQuery;
use crate::typed_id::{MessageId, SessionId};

// ============================================================================
// InputMessage - Input structure for message creation
// ============================================================================

/// Payload describing a message that is about to be created.
///
/// Holds only what the caller supplies. There are no ID or timestamp fields
/// here; those are generated by the storage layer.
///
/// Note: messages are created through EventService/EventEmitter, not through
/// `MessageRetriever`. This struct is used by the API layer and by in-memory
/// stores in tests.
#[derive(Clone, Debug)]
pub struct InputMessage {
    /// Role of the message (user, assistant, tool_result, system)
    pub role: MessageRole,
    /// Content parts that make up the message
    pub content: Vec<ContentPart>,
    /// Optional runtime controls (model, reasoning, etc.)
    pub controls: Option<Controls>,
    /// Optional message-level metadata: string keys mapped to JSON values
    pub metadata: Option<std::collections::HashMap<String, serde_json::Value>>,
    /// Tags used for filtering/categorization
    pub tags: Vec<String>,
}

impl InputMessage {
    /// Build a user-role input message holding a single text content part.
    ///
    /// `controls` and `metadata` are left unset and the tag list is empty.
    pub fn user(content: impl Into<String>) -> Self {
        let text_part = ContentPart::text(content);
        Self {
            role: MessageRole::User,
            content: vec![text_part],
            controls: None,
            metadata: None,
            tags: Vec::new(),
        }
    }

    /// Build an input message from an existing [`Message`] (useful for
    /// storing messages that already exist).
    ///
    /// Role, content, controls and metadata are cloned from `msg`. The tag
    /// list of the result is always empty.
    // NOTE(review): tags are reset rather than copied — confirm whether
    // `Message` carries tags that should be preserved here.
    pub fn from_message(msg: &Message) -> Self {
        let role = msg.role.clone();
        let content = msg.content.clone();
        let controls = msg.controls.clone();
        let metadata = msg.metadata.clone();
        Self {
            role,
            content,
            controls,
            metadata,
            tags: Vec::new(),
        }
    }
}

impl From<&str> for InputMessage {
    fn from(text: &str) -> Self {
        InputMessage::user(text)
    }
}

impl From<String> for InputMessage {
    fn from(text: String) -> Self {
        InputMessage::user(text)
    }
}

// ============================================================================
// MessageRetriever trait
// ============================================================================

/// Read-only access to the messages of a conversation.
///
/// Used to fetch conversation history when building LLM context. Writing
/// messages is out of scope for this trait: storage goes through
/// EventEmitter, where messages are recorded as events
/// (input.message, output.message.completed).
///
/// Possible implementations include:
/// - Loading messages from a database (reconstructing them from events)
/// - Holding messages in memory for tests
/// - Loading messages over gRPC from the control-plane
///
/// Only `get` and `load` are required. The remaining methods have default
/// bodies built on top of `load` and may be overridden.
#[async_trait]
pub trait MessageRetriever: Send + Sync {
    /// Look up a single message in a session by its ID.
    ///
    /// The result is optional; presumably `None` means no such message
    /// exists, but that is up to the implementation.
    async fn get(&self, session_id: SessionId, message_id: MessageId) -> Result<Option<Message>>;

    /// Load every message belonging to a session.
    async fn load(&self, session_id: SessionId) -> Result<Vec<Message>>;

    /// Load messages with filters and injections applied.
    ///
    /// This is the hook for the composable filter system, in which
    /// capabilities contribute filters that change how messages are loaded.
    ///
    /// The default body does NOT apply any filtering: it reads only
    /// `query.session_id` and returns whatever `load()` returns for it. This
    /// keeps implementations without filter support working unchanged.
    async fn load_filtered(&self, query: MessageQuery) -> Result<Vec<Message>> {
        // Fallback: only the session ID is consulted; the rest of the query
        // is ignored. Override this method to honour the filters.
        let session_id = query.session_id;
        self.load(session_id).await
    }

    /// Load one page of a session's messages.
    ///
    /// The default body loads the full session via `load()`, skips the first
    /// `offset` messages and returns at most `limit` of the remainder.
    async fn load_page(
        &self,
        session_id: SessionId,
        offset: usize,
        limit: usize,
    ) -> Result<Vec<Message>> {
        let messages = self.load(session_id).await?;
        let page: Vec<Message> = messages.into_iter().skip(offset).take(limit).collect();
        Ok(page)
    }

    /// Count the messages in a session.
    ///
    /// The default body loads the full session via `load()` and returns the
    /// length of the result.
    async fn count(&self, session_id: SessionId) -> Result<usize> {
        let messages = self.load(session_id).await?;
        Ok(messages.len())
    }
}

/// Forwarding implementation: an `Arc<T>` is a retriever whenever `T` is.
///
/// `T` may be unsized (`?Sized`). Every method — including the ones that
/// have default bodies on the trait — is delegated to the wrapped value, so
/// any overrides provided by `T` are the ones that run.
#[async_trait]
impl<T: MessageRetriever + ?Sized> MessageRetriever for std::sync::Arc<T> {
    async fn get(&self, session_id: SessionId, message_id: MessageId) -> Result<Option<Message>> {
        // Borrow the wrapped value explicitly so the call resolves to `T`'s
        // implementation rather than back to this impl.
        let inner: &T = &**self;
        inner.get(session_id, message_id).await
    }

    async fn load(&self, session_id: SessionId) -> Result<Vec<Message>> {
        let inner: &T = &**self;
        inner.load(session_id).await
    }

    async fn load_filtered(&self, query: MessageQuery) -> Result<Vec<Message>> {
        let inner: &T = &**self;
        inner.load_filtered(query).await
    }

    async fn load_page(
        &self,
        session_id: SessionId,
        offset: usize,
        limit: usize,
    ) -> Result<Vec<Message>> {
        let inner: &T = &**self;
        inner.load_page(session_id, offset, limit).await
    }

    async fn count(&self, session_id: SessionId) -> Result<usize> {
        let inner: &T = &**self;
        inner.count(session_id).await
    }
}