Skip to main content

everruns_core/
message_retriever.rs

1// MessageRetriever - Retrieval-only trait for conversation messages
2//
3// Design decision: MessageRetriever is retrieval-only.
4// Messages are stored via EventEmitter (input.message, output.message.completed events).
5// This trait provides read access for building LLM context.
6
7use async_trait::async_trait;
8
9use crate::error::Result;
10use crate::message::{ContentPart, Controls, Message, MessageRole};
11use crate::message_filter::MessageQuery;
12use crate::typed_id::{MessageId, SessionId};
13
14// ============================================================================
15// InputMessage - Input structure for message creation
16// ============================================================================
17
18/// Input message for creating a new message
19///
20/// This is the input structure for adding messages, without the ID and timestamp
21/// which are generated by the storage layer.
22///
23/// Note: Message creation happens via EventService/EventEmitter, not MessageRetriever.
24/// This struct is used by the API layer and in-memory stores for tests.
25#[derive(Debug, Clone)]
26pub struct InputMessage {
27    /// Message role (user, assistant, tool_result, system)
28    pub role: MessageRole,
29    /// Message content as array of content parts
30    pub content: Vec<ContentPart>,
31    /// Runtime controls (model, reasoning, etc.)
32    pub controls: Option<Controls>,
33    /// Message-level metadata
34    pub metadata: Option<std::collections::HashMap<String, serde_json::Value>>,
35    /// Tags for filtering/categorization
36    pub tags: Vec<String>,
37}
38
39impl InputMessage {
40    /// Create a new user input message with text content
41    pub fn user(content: impl Into<String>) -> Self {
42        Self {
43            role: MessageRole::User,
44            content: vec![ContentPart::text(content)],
45            controls: None,
46            metadata: None,
47            tags: vec![],
48        }
49    }
50
51    /// Create from a Message (useful for storing existing messages)
52    pub fn from_message(msg: &Message) -> Self {
53        Self {
54            role: msg.role.clone(),
55            content: msg.content.clone(),
56            controls: msg.controls.clone(),
57            metadata: msg.metadata.clone(),
58            tags: vec![],
59        }
60    }
61}
62
63impl From<&str> for InputMessage {
64    fn from(text: &str) -> Self {
65        InputMessage::user(text)
66    }
67}
68
69impl From<String> for InputMessage {
70    fn from(text: String) -> Self {
71        InputMessage::user(text)
72    }
73}
74
75// ============================================================================
76// MessageRetriever trait
77// ============================================================================
78
79/// Trait for retrieving conversation messages
80///
81/// This trait provides read-only access to conversation history for building
82/// LLM context. Message storage is handled separately via EventEmitter
83/// (messages are stored as events: input.message, output.message.completed).
84///
85/// Implementations can:
86/// - Load messages from a database (reconstructing from events)
87/// - Keep messages in memory for testing
88/// - Load messages via gRPC from control-plane
89#[async_trait]
90pub trait MessageRetriever: Send + Sync {
91    /// Get a specific message by ID
92    async fn get(&self, session_id: SessionId, message_id: MessageId) -> Result<Option<Message>>;
93
94    /// Load all messages for a session
95    async fn load(&self, session_id: SessionId) -> Result<Vec<Message>>;
96
97    /// Load messages with filters and injections applied.
98    ///
99    /// This method supports the composable filter system where capabilities
100    /// can contribute filters that modify how messages are loaded.
101    ///
102    /// Default implementation calls `load()` and ignores the query filters,
103    /// maintaining backward compatibility for implementations that don't
104    /// support filtering.
105    async fn load_filtered(&self, query: MessageQuery) -> Result<Vec<Message>> {
106        // Default: load all messages for the session, ignoring filters
107        // Implementations should override this to support filtering
108        self.load(query.session_id).await
109    }
110
111    /// Load messages with pagination
112    async fn load_page(
113        &self,
114        session_id: SessionId,
115        offset: usize,
116        limit: usize,
117    ) -> Result<Vec<Message>> {
118        let all = self.load(session_id).await?;
119        Ok(all.into_iter().skip(offset).take(limit).collect())
120    }
121
122    /// Count messages in a session
123    async fn count(&self, session_id: SessionId) -> Result<usize> {
124        Ok(self.load(session_id).await?.len())
125    }
126}
127
128#[async_trait]
129impl<T: MessageRetriever + ?Sized> MessageRetriever for std::sync::Arc<T> {
130    async fn get(&self, session_id: SessionId, message_id: MessageId) -> Result<Option<Message>> {
131        (**self).get(session_id, message_id).await
132    }
133
134    async fn load(&self, session_id: SessionId) -> Result<Vec<Message>> {
135        (**self).load(session_id).await
136    }
137
138    async fn load_filtered(&self, query: MessageQuery) -> Result<Vec<Message>> {
139        (**self).load_filtered(query).await
140    }
141
142    async fn load_page(
143        &self,
144        session_id: SessionId,
145        offset: usize,
146        limit: usize,
147    ) -> Result<Vec<Message>> {
148        (**self).load_page(session_id, offset, limit).await
149    }
150
151    async fn count(&self, session_id: SessionId) -> Result<usize> {
152        (**self).count(session_id).await
153    }
154}