1use std::collections::HashSet;
2
3use crate::session_model::{ConversationRecord, ProtocolEvent, SessionEventRecord};
4use crate::{Message, MessageOrigin, MessageRole, MessageSequence, Part};
5
6#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
7pub struct ChronologicalProjection {
8 entries: Vec<ChronologicalEntry>,
9}
10
11#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
12pub struct ChronologicalEntry {
13 pub index: usize,
14 pub payload: ChronologicalPayload,
15}
16
17#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
18#[serde(tag = "kind", rename_all = "snake_case")]
19#[allow(clippy::large_enum_variant)]
20pub enum ChronologicalPayload {
21 Message(Message),
22 ProtocolEvent(crate::session_model::ProtocolEvent),
23}
24
25#[derive(Clone, Copy, Debug)]
26pub struct BorrowedChronologicalEntry<'a> {
27 pub index: usize,
28 pub payload: BorrowedChronologicalPayload<'a>,
29}
30
31#[derive(Clone, Copy, Debug)]
32pub enum BorrowedChronologicalPayload<'a> {
33 Message(BorrowedChronologicalMessage<'a>),
34 ProtocolEvent(&'a ProtocolEvent),
35}
36
37#[derive(Clone, Copy, Debug)]
38pub struct BorrowedChronologicalMessage<'a> {
39 pub id: &'a str,
40 pub role: MessageRole,
41 pub parts: &'a [Part],
42 pub origin: Option<&'a MessageOrigin>,
43}
44
45impl<'a> BorrowedChronologicalMessage<'a> {
46 fn from_message(message: &'a Message) -> Self {
47 Self {
48 id: &message.id,
49 role: message.role,
50 parts: message.parts.as_slice(),
51 origin: message.origin.as_ref(),
52 }
53 }
54
55 fn from_record(record: &'a ConversationRecord) -> Self {
56 Self {
57 id: &record.id,
58 role: record.role,
59 parts: record.parts.as_slice(),
60 origin: record.origin.as_ref(),
61 }
62 }
63
64 pub fn is_transient(&self) -> bool {
65 matches!(
66 self.origin,
67 Some(MessageOrigin::Plugin {
68 transient: true,
69 ..
70 })
71 )
72 }
73
74 fn to_owned(self) -> Message {
75 Message {
76 id: self.id.to_string(),
77 role: self.role,
78 parts: std::sync::Arc::new(self.parts.to_vec()),
79 origin: self.origin.cloned(),
80 }
81 }
82}
83
84impl ChronologicalProjection {
85 pub(crate) fn from_read_model(read_model: &crate::session_graph::SessionReadModel) -> Self {
86 Self::from_active_read(
87 read_model.active_events.as_slice(),
88 read_model.messages.as_slice(),
89 )
90 }
91
92 pub fn from_turn_view(events: &[SessionEventRecord], messages: &MessageSequence) -> Self {
93 Self::from_active_read(events, messages.as_slice())
94 }
95
96 fn from_active_read(active_events: &[SessionEventRecord], messages: &[Message]) -> Self {
97 let mut projection = Self::default();
98 projection
99 .entries
100 .reserve(active_events.len().saturating_add(messages.len()));
101 visit_active_read(active_events, messages, |entry| {
102 projection.push(match entry.payload {
103 BorrowedChronologicalPayload::Message(message) => {
104 ChronologicalPayload::Message(message.to_owned())
105 }
106 BorrowedChronologicalPayload::ProtocolEvent(event) => {
107 ChronologicalPayload::ProtocolEvent(event.clone())
108 }
109 });
110 });
111 projection
112 }
113
114 fn push(&mut self, payload: ChronologicalPayload) {
115 let index = self.entries.len();
116 self.entries.push(ChronologicalEntry { index, payload });
117 }
118
119 pub fn entries(&self) -> &[ChronologicalEntry] {
120 self.entries.as_slice()
121 }
122
123 pub fn into_entries(self) -> Vec<ChronologicalEntry> {
124 self.entries
125 }
126}
127
128pub fn visit_turn_view<'a>(
129 events: &'a [SessionEventRecord],
130 messages: &'a MessageSequence,
131 visit: impl FnMut(BorrowedChronologicalEntry<'a>),
132) {
133 visit_active_read(events, messages.as_slice(), visit);
134}
135
136fn visit_active_read<'a>(
137 active_events: &'a [SessionEventRecord],
138 messages: &'a [Message],
139 mut visit: impl FnMut(BorrowedChronologicalEntry<'a>),
140) {
141 if active_events.is_empty() {
142 visit_transcript(messages, visit);
143 return;
144 }
145
146 let mut index = 0;
147 let mut seen_messages = HashSet::new();
148
149 for event in active_events {
150 match event {
151 SessionEventRecord::Conversation(record) => {
152 let message = BorrowedChronologicalMessage::from_record(record);
153 if !message.is_transient() && seen_messages.insert(message.id.to_string()) {
154 visit(BorrowedChronologicalEntry {
155 index,
156 payload: BorrowedChronologicalPayload::Message(message),
157 });
158 index += 1;
159 }
160 }
161 SessionEventRecord::Protocol(event) => {
162 visit(BorrowedChronologicalEntry {
163 index,
164 payload: BorrowedChronologicalPayload::ProtocolEvent(event),
165 });
166 index += 1;
167 }
168 }
169 }
170
171 seen_messages.reserve(messages.len());
172 for message in messages {
173 let message = BorrowedChronologicalMessage::from_message(message);
174 if !message.is_transient() && seen_messages.insert(message.id.to_string()) {
175 visit(BorrowedChronologicalEntry {
176 index,
177 payload: BorrowedChronologicalPayload::Message(message),
178 });
179 index += 1;
180 }
181 }
182}
183
184fn visit_transcript<'a>(
185 messages: &'a [Message],
186 mut visit: impl FnMut(BorrowedChronologicalEntry<'a>),
187) {
188 for (index, message) in messages
189 .iter()
190 .filter(|message| !message.is_transient())
191 .enumerate()
192 {
193 visit(BorrowedChronologicalEntry {
194 index,
195 payload: BorrowedChronologicalPayload::Message(
196 BorrowedChronologicalMessage::from_message(message),
197 ),
198 });
199 }
200}
201
202#[cfg(test)]
203mod tests {
204 use super::*;
205 use crate::session_model::ConversationRecord;
206 use crate::{PartKind, PruneState, shared_parts};
207
208 fn text_message(id: &str, role: MessageRole, text: &str) -> Message {
209 Message {
210 id: id.to_string(),
211 role,
212 parts: shared_parts(vec![Part {
213 id: format!("{id}.p0"),
214 kind: PartKind::Text,
215 content: text.to_string(),
216 attachment: None,
217 tool_call_id: None,
218 tool_name: None,
219 tool_replay: None,
220 prune_state: PruneState::Intact,
221 reasoning_meta: None,
222 response_meta: None,
223 }]),
224 origin: None,
225 }
226 }
227
228 fn tool_result_message(id: &str, call_id: &str) -> Message {
229 let mut message = text_message(id, MessageRole::User, "tool result");
230 std::sync::Arc::make_mut(&mut message.parts)[0].tool_call_id = Some(call_id.to_string());
231 message
232 }
233
234 fn transient_message(id: &str) -> Message {
235 let mut message = text_message(id, MessageRole::System, "transient");
236 message.origin = Some(MessageOrigin::Plugin {
237 plugin_id: "test".to_string(),
238 transient: true,
239 });
240 message
241 }
242
243 fn protocol_event(value: &str) -> ProtocolEvent {
244 ProtocolEvent {
245 plugin_id: "test_protocol".to_string(),
246 payload: serde_json::json!({ "value": value }),
247 }
248 }
249
250 fn owned_summary(projection: &ChronologicalProjection) -> Vec<String> {
251 projection
252 .entries()
253 .iter()
254 .map(|entry| match &entry.payload {
255 ChronologicalPayload::Message(message) => {
256 format!("{}:message:{}", entry.index, message.id)
257 }
258 ChronologicalPayload::ProtocolEvent(event) => {
259 format!("{}:protocol:{}", entry.index, event.payload)
260 }
261 })
262 .collect()
263 }
264
265 fn borrowed_summary(events: &[SessionEventRecord], messages: &MessageSequence) -> Vec<String> {
266 let mut summary = Vec::new();
267 visit_turn_view(events, messages, |entry| {
268 summary.push(match entry.payload {
269 BorrowedChronologicalPayload::Message(message) => {
270 format!("{}:message:{}", entry.index, message.id)
271 }
272 BorrowedChronologicalPayload::ProtocolEvent(event) => {
273 format!("{}:protocol:{}", entry.index, event.payload)
274 }
275 });
276 });
277 summary
278 }
279
280 #[test]
281 fn borrowed_turn_view_matches_owned_active_event_projection() {
282 let m1 = text_message("m1", MessageRole::User, "first");
283 let m2 = text_message("m2", MessageRole::Assistant, "second");
284 let events = vec![
285 SessionEventRecord::Conversation(ConversationRecord::from_message(m1.clone())),
286 SessionEventRecord::Protocol(protocol_event("step")),
287 SessionEventRecord::Conversation(ConversationRecord::from_message(m1.clone())),
288 ];
289 let messages = MessageSequence::from_owned(vec![m1, m2]);
290 let projection = ChronologicalProjection::from_turn_view(&events, &messages);
291
292 assert_eq!(
293 borrowed_summary(&events, &messages),
294 owned_summary(&projection)
295 );
296 }
297
298 #[test]
299 fn borrowed_turn_view_matches_owned_transcript_fallback_projection() {
300 let messages = MessageSequence::from_owned(vec![
301 tool_result_message("m1", "call-1"),
302 transient_message("transient"),
303 text_message("m2", MessageRole::Assistant, "second"),
304 ]);
305 let events = Vec::new();
306 let projection = ChronologicalProjection::from_turn_view(&events, &messages);
307
308 assert_eq!(
309 borrowed_summary(&events, &messages),
310 owned_summary(&projection)
311 );
312 }
313}