1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
// MessageRetriever - Retrieval-only trait for conversation messages
//
// Design decision: MessageRetriever is retrieval-only.
// Messages are stored via EventEmitter (input.message, output.message.completed events).
// This trait provides read access for building LLM context.
use async_trait::async_trait;
use crate::error::Result;
use crate::message::{ContentPart, Controls, Message, MessageRole};
use crate::message_filter::MessageQuery;
use crate::typed_id::{MessageId, SessionId};
// ============================================================================
// InputMessage - Input structure for message creation
// ============================================================================
/// Input payload describing a message that is about to be created.
///
/// This is the input structure for adding messages. It deliberately has no ID
/// or timestamp fields; per the design notes of this module those are generated
/// by the storage layer.
///
/// Note: Message creation happens via EventService/EventEmitter, not MessageRetriever.
/// This struct is used by the API layer and in-memory stores for tests.
///
/// Convenience constructors live in the inherent `impl` ([`InputMessage::user`],
/// [`InputMessage::from_message`]) and in the `From<&str>` / `From<String>`
/// conversions.
#[derive(Debug, Clone)]
pub struct InputMessage {
/// Message role (user, assistant, tool_result, system)
pub role: MessageRole,
/// Message content as an ordered array of content parts
pub content: Vec<ContentPart>,
/// Runtime controls (model, reasoning, etc.); `None` when the caller supplies none
pub controls: Option<Controls>,
/// Message-level metadata as free-form JSON values keyed by string;
/// `None` when no metadata is attached
pub metadata: Option<std::collections::HashMap<String, serde_json::Value>>,
/// Tags for filtering/categorization; may be empty
pub tags: Vec<String>,
}
impl InputMessage {
/// Create a new user input message with text content
pub fn user(content: impl Into<String>) -> Self {
Self {
role: MessageRole::User,
content: vec![ContentPart::text(content)],
controls: None,
metadata: None,
tags: vec![],
}
}
/// Create from a Message (useful for storing existing messages)
pub fn from_message(msg: &Message) -> Self {
Self {
role: msg.role.clone(),
content: msg.content.clone(),
controls: msg.controls.clone(),
metadata: msg.metadata.clone(),
tags: vec![],
}
}
}
impl From<&str> for InputMessage {
fn from(text: &str) -> Self {
InputMessage::user(text)
}
}
impl From<String> for InputMessage {
fn from(text: String) -> Self {
InputMessage::user(text)
}
}
// ============================================================================
// MessageRetriever trait
// ============================================================================
/// Read-only access to the messages of a conversation.
///
/// The trait exists so that LLM context can be assembled from conversation
/// history. It intentionally offers no write operations: message storage is
/// handled separately via EventEmitter (messages are stored as events:
/// input.message, output.message.completed).
///
/// Implementations can:
/// - Load messages from a database (reconstructing from events)
/// - Keep messages in memory for testing
/// - Load messages via gRPC from control-plane
///
/// Only [`get`](MessageRetriever::get) and [`load`](MessageRetriever::load)
/// are required; the remaining methods have defaults built on top of `load`.
#[async_trait]
pub trait MessageRetriever: Send + Sync {
    /// Look up one message of a session by its ID.
    async fn get(&self, session_id: SessionId, message_id: MessageId) -> Result<Option<Message>>;

    /// Load every message belonging to a session.
    async fn load(&self, session_id: SessionId) -> Result<Vec<Message>>;

    /// Load messages with filters and injections applied.
    ///
    /// This is the entry point for the composable filter system, where
    /// capabilities can contribute filters that modify how messages are loaded.
    ///
    /// The default implementation only reads `query.session_id` and forwards
    /// to `load()`; all filters in the query are ignored. This keeps
    /// implementations without filtering support backward compatible.
    /// Implementations should override this method to honour the filters.
    async fn load_filtered(&self, query: MessageQuery) -> Result<Vec<Message>> {
        // Default behaviour: the filters are ignored, only the session matters.
        let session_id = query.session_id;
        self.load(session_id).await
    }

    /// Load one page of a session's messages.
    ///
    /// The default implementation loads the whole session through `load()`
    /// and then, in memory, skips the first `offset` messages and keeps at
    /// most `limit` of the rest.
    async fn load_page(
        &self,
        session_id: SessionId,
        offset: usize,
        limit: usize,
    ) -> Result<Vec<Message>> {
        let page: Vec<Message> = self
            .load(session_id)
            .await?
            .into_iter()
            .skip(offset)
            .take(limit)
            .collect();
        Ok(page)
    }

    /// Count the messages in a session.
    ///
    /// The default implementation loads the whole session through `load()`
    /// and returns the length of the result.
    async fn count(&self, session_id: SessionId) -> Result<usize> {
        let messages = self.load(session_id).await?;
        Ok(messages.len())
    }
}
/// Lets an `Arc`-wrapped retriever (including `Arc<dyn MessageRetriever>`,
/// thanks to `?Sized`) be used wherever a [`MessageRetriever`] is expected.
///
/// Every method, including those with default bodies, is forwarded explicitly
/// to the wrapped value so that overrides provided by `T` are honoured.
#[async_trait]
impl<T: MessageRetriever + ?Sized> MessageRetriever for std::sync::Arc<T> {
    async fn get(&self, session_id: SessionId, message_id: MessageId) -> Result<Option<Message>> {
        // `&**self` reaches the inner `T`; calling through `T::` makes it
        // explicit that this is not a recursive call on the `Arc` impl.
        T::get(&**self, session_id, message_id).await
    }

    async fn load(&self, session_id: SessionId) -> Result<Vec<Message>> {
        T::load(&**self, session_id).await
    }

    async fn load_filtered(&self, query: MessageQuery) -> Result<Vec<Message>> {
        T::load_filtered(&**self, query).await
    }

    async fn load_page(
        &self,
        session_id: SessionId,
        offset: usize,
        limit: usize,
    ) -> Result<Vec<Message>> {
        T::load_page(&**self, session_id, offset, limit).await
    }

    async fn count(&self, session_id: SessionId) -> Result<usize> {
        T::count(&**self, session_id).await
    }
}