everruns_core/message_retriever.rs
1// MessageRetriever - Retrieval-only trait for conversation messages
2//
3// Design decision: MessageRetriever is retrieval-only.
4// Messages are stored via EventEmitter (input.message, output.message.completed events).
5// This trait provides read access for building LLM context.
6
7use async_trait::async_trait;
8
9use crate::error::Result;
10use crate::message::{ContentPart, Controls, Message, MessageRole};
11use crate::message_filter::MessageQuery;
12use crate::typed_id::{MessageId, SessionId};
13
14// ============================================================================
15// InputMessage - Input structure for message creation
16// ============================================================================
17
18/// Input message for creating a new message
19///
20/// This is the input structure for adding messages, without the ID and timestamp
21/// which are generated by the storage layer.
22///
23/// Note: Message creation happens via EventService/EventEmitter, not MessageRetriever.
24/// This struct is used by the API layer and in-memory stores for tests.
25#[derive(Debug, Clone)]
26pub struct InputMessage {
27 /// Message role (user, assistant, tool_result, system)
28 pub role: MessageRole,
29 /// Message content as array of content parts
30 pub content: Vec<ContentPart>,
31 /// Runtime controls (model, reasoning, etc.)
32 pub controls: Option<Controls>,
33 /// Message-level metadata
34 pub metadata: Option<std::collections::HashMap<String, serde_json::Value>>,
35 /// Tags for filtering/categorization
36 pub tags: Vec<String>,
37}
38
39impl InputMessage {
40 /// Create a new user input message with text content
41 pub fn user(content: impl Into<String>) -> Self {
42 Self {
43 role: MessageRole::User,
44 content: vec![ContentPart::text(content)],
45 controls: None,
46 metadata: None,
47 tags: vec![],
48 }
49 }
50
51 /// Create from a Message (useful for storing existing messages)
52 pub fn from_message(msg: &Message) -> Self {
53 Self {
54 role: msg.role.clone(),
55 content: msg.content.clone(),
56 controls: msg.controls.clone(),
57 metadata: msg.metadata.clone(),
58 tags: vec![],
59 }
60 }
61}
62
63impl From<&str> for InputMessage {
64 fn from(text: &str) -> Self {
65 InputMessage::user(text)
66 }
67}
68
69impl From<String> for InputMessage {
70 fn from(text: String) -> Self {
71 InputMessage::user(text)
72 }
73}
74
75// ============================================================================
76// MessageRetriever trait
77// ============================================================================
78
79/// Trait for retrieving conversation messages
80///
81/// This trait provides read-only access to conversation history for building
82/// LLM context. Message storage is handled separately via EventEmitter
83/// (messages are stored as events: input.message, output.message.completed).
84///
85/// Implementations can:
86/// - Load messages from a database (reconstructing from events)
87/// - Keep messages in memory for testing
88/// - Load messages via gRPC from control-plane
89#[async_trait]
90pub trait MessageRetriever: Send + Sync {
91 /// Get a specific message by ID
92 async fn get(&self, session_id: SessionId, message_id: MessageId) -> Result<Option<Message>>;
93
94 /// Load all messages for a session
95 async fn load(&self, session_id: SessionId) -> Result<Vec<Message>>;
96
97 /// Load messages with filters and injections applied.
98 ///
99 /// This method supports the composable filter system where capabilities
100 /// can contribute filters that modify how messages are loaded.
101 ///
102 /// Default implementation calls `load()` and ignores the query filters,
103 /// maintaining backward compatibility for implementations that don't
104 /// support filtering.
105 async fn load_filtered(&self, query: MessageQuery) -> Result<Vec<Message>> {
106 // Default: load all messages for the session, ignoring filters
107 // Implementations should override this to support filtering
108 self.load(query.session_id).await
109 }
110
111 /// Load messages with pagination
112 async fn load_page(
113 &self,
114 session_id: SessionId,
115 offset: usize,
116 limit: usize,
117 ) -> Result<Vec<Message>> {
118 let all = self.load(session_id).await?;
119 Ok(all.into_iter().skip(offset).take(limit).collect())
120 }
121
122 /// Count messages in a session
123 async fn count(&self, session_id: SessionId) -> Result<usize> {
124 Ok(self.load(session_id).await?.len())
125 }
126}
127
128#[async_trait]
129impl<T: MessageRetriever + ?Sized> MessageRetriever for std::sync::Arc<T> {
130 async fn get(&self, session_id: SessionId, message_id: MessageId) -> Result<Option<Message>> {
131 (**self).get(session_id, message_id).await
132 }
133
134 async fn load(&self, session_id: SessionId) -> Result<Vec<Message>> {
135 (**self).load(session_id).await
136 }
137
138 async fn load_filtered(&self, query: MessageQuery) -> Result<Vec<Message>> {
139 (**self).load_filtered(query).await
140 }
141
142 async fn load_page(
143 &self,
144 session_id: SessionId,
145 offset: usize,
146 limit: usize,
147 ) -> Result<Vec<Message>> {
148 (**self).load_page(session_id, offset, limit).await
149 }
150
151 async fn count(&self, session_id: SessionId) -> Result<usize> {
152 (**self).count(session_id).await
153 }
154}