Skip to main content

lash_core/
chronological.rs

1use std::collections::HashSet;
2
3use crate::session_model::{ConversationRecord, ProtocolEvent, SessionEventRecord};
4use crate::{Message, MessageOrigin, MessageRole, MessageSequence, Part};
5
6#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
7pub struct ChronologicalProjection {
8    entries: Vec<ChronologicalEntry>,
9}
10
11#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
12pub struct ChronologicalEntry {
13    pub index: usize,
14    pub payload: ChronologicalPayload,
15}
16
17#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
18#[serde(tag = "kind", rename_all = "snake_case")]
19#[allow(clippy::large_enum_variant)]
20pub enum ChronologicalPayload {
21    Message(Message),
22    ProtocolEvent(crate::session_model::ProtocolEvent),
23}
24
25#[derive(Clone, Copy, Debug)]
26pub struct BorrowedChronologicalEntry<'a> {
27    pub index: usize,
28    pub payload: BorrowedChronologicalPayload<'a>,
29}
30
31#[derive(Clone, Copy, Debug)]
32pub enum BorrowedChronologicalPayload<'a> {
33    Message(BorrowedChronologicalMessage<'a>),
34    ProtocolEvent(&'a ProtocolEvent),
35}
36
37#[derive(Clone, Copy, Debug)]
38pub struct BorrowedChronologicalMessage<'a> {
39    pub id: &'a str,
40    pub role: MessageRole,
41    pub parts: &'a [Part],
42    pub origin: Option<&'a MessageOrigin>,
43}
44
45impl<'a> BorrowedChronologicalMessage<'a> {
46    fn from_message(message: &'a Message) -> Self {
47        Self {
48            id: &message.id,
49            role: message.role,
50            parts: message.parts.as_slice(),
51            origin: message.origin.as_ref(),
52        }
53    }
54
55    fn from_record(record: &'a ConversationRecord) -> Self {
56        Self {
57            id: &record.id,
58            role: record.role,
59            parts: record.parts.as_slice(),
60            origin: record.origin.as_ref(),
61        }
62    }
63
64    pub fn is_transient(&self) -> bool {
65        matches!(
66            self.origin,
67            Some(MessageOrigin::Plugin {
68                transient: true,
69                ..
70            })
71        )
72    }
73
74    fn to_owned(self) -> Message {
75        Message {
76            id: self.id.to_string(),
77            role: self.role,
78            parts: std::sync::Arc::new(self.parts.to_vec()),
79            origin: self.origin.cloned(),
80        }
81    }
82}
83
84impl ChronologicalProjection {
85    pub(crate) fn from_read_model(read_model: &crate::session_graph::SessionReadModel) -> Self {
86        Self::from_active_read(
87            read_model.active_events.as_slice(),
88            read_model.messages.as_slice(),
89        )
90    }
91
92    pub fn from_turn_view(events: &[SessionEventRecord], messages: &MessageSequence) -> Self {
93        Self::from_active_read(events, messages.as_slice())
94    }
95
96    fn from_active_read(active_events: &[SessionEventRecord], messages: &[Message]) -> Self {
97        let mut projection = Self::default();
98        projection
99            .entries
100            .reserve(active_events.len().saturating_add(messages.len()));
101        visit_active_read(active_events, messages, |entry| {
102            projection.push(match entry.payload {
103                BorrowedChronologicalPayload::Message(message) => {
104                    ChronologicalPayload::Message(message.to_owned())
105                }
106                BorrowedChronologicalPayload::ProtocolEvent(event) => {
107                    ChronologicalPayload::ProtocolEvent(event.clone())
108                }
109            });
110        });
111        projection
112    }
113
114    fn push(&mut self, payload: ChronologicalPayload) {
115        let index = self.entries.len();
116        self.entries.push(ChronologicalEntry { index, payload });
117    }
118
119    pub fn entries(&self) -> &[ChronologicalEntry] {
120        self.entries.as_slice()
121    }
122
123    pub fn into_entries(self) -> Vec<ChronologicalEntry> {
124        self.entries
125    }
126}
127
128pub fn visit_turn_view<'a>(
129    events: &'a [SessionEventRecord],
130    messages: &'a MessageSequence,
131    visit: impl FnMut(BorrowedChronologicalEntry<'a>),
132) {
133    visit_active_read(events, messages.as_slice(), visit);
134}
135
136fn visit_active_read<'a>(
137    active_events: &'a [SessionEventRecord],
138    messages: &'a [Message],
139    mut visit: impl FnMut(BorrowedChronologicalEntry<'a>),
140) {
141    if active_events.is_empty() {
142        visit_transcript(messages, visit);
143        return;
144    }
145
146    let mut index = 0;
147    let mut seen_messages = HashSet::new();
148
149    for event in active_events {
150        match event {
151            SessionEventRecord::Conversation(record) => {
152                let message = BorrowedChronologicalMessage::from_record(record);
153                if !message.is_transient() && seen_messages.insert(message.id.to_string()) {
154                    visit(BorrowedChronologicalEntry {
155                        index,
156                        payload: BorrowedChronologicalPayload::Message(message),
157                    });
158                    index += 1;
159                }
160            }
161            SessionEventRecord::Protocol(event) => {
162                visit(BorrowedChronologicalEntry {
163                    index,
164                    payload: BorrowedChronologicalPayload::ProtocolEvent(event),
165                });
166                index += 1;
167            }
168        }
169    }
170
171    seen_messages.reserve(messages.len());
172    for message in messages {
173        let message = BorrowedChronologicalMessage::from_message(message);
174        if !message.is_transient() && seen_messages.insert(message.id.to_string()) {
175            visit(BorrowedChronologicalEntry {
176                index,
177                payload: BorrowedChronologicalPayload::Message(message),
178            });
179            index += 1;
180        }
181    }
182}
183
184fn visit_transcript<'a>(
185    messages: &'a [Message],
186    mut visit: impl FnMut(BorrowedChronologicalEntry<'a>),
187) {
188    for (index, message) in messages
189        .iter()
190        .filter(|message| !message.is_transient())
191        .enumerate()
192    {
193        visit(BorrowedChronologicalEntry {
194            index,
195            payload: BorrowedChronologicalPayload::Message(
196                BorrowedChronologicalMessage::from_message(message),
197            ),
198        });
199    }
200}
201
202#[cfg(test)]
203mod tests {
204    use super::*;
205    use crate::session_model::ConversationRecord;
206    use crate::{PartKind, PruneState, shared_parts};
207
208    fn text_message(id: &str, role: MessageRole, text: &str) -> Message {
209        Message {
210            id: id.to_string(),
211            role,
212            parts: shared_parts(vec![Part {
213                id: format!("{id}.p0"),
214                kind: PartKind::Text,
215                content: text.to_string(),
216                attachment: None,
217                tool_call_id: None,
218                tool_name: None,
219                tool_replay: None,
220                prune_state: PruneState::Intact,
221                reasoning_meta: None,
222                response_meta: None,
223            }]),
224            origin: None,
225        }
226    }
227
228    fn tool_result_message(id: &str, call_id: &str) -> Message {
229        let mut message = text_message(id, MessageRole::User, "tool result");
230        std::sync::Arc::make_mut(&mut message.parts)[0].tool_call_id = Some(call_id.to_string());
231        message
232    }
233
234    fn transient_message(id: &str) -> Message {
235        let mut message = text_message(id, MessageRole::System, "transient");
236        message.origin = Some(MessageOrigin::Plugin {
237            plugin_id: "test".to_string(),
238            transient: true,
239        });
240        message
241    }
242
243    fn protocol_event(value: &str) -> ProtocolEvent {
244        ProtocolEvent {
245            plugin_id: "test_protocol".to_string(),
246            payload: serde_json::json!({ "value": value }),
247        }
248    }
249
250    fn owned_summary(projection: &ChronologicalProjection) -> Vec<String> {
251        projection
252            .entries()
253            .iter()
254            .map(|entry| match &entry.payload {
255                ChronologicalPayload::Message(message) => {
256                    format!("{}:message:{}", entry.index, message.id)
257                }
258                ChronologicalPayload::ProtocolEvent(event) => {
259                    format!("{}:protocol:{}", entry.index, event.payload)
260                }
261            })
262            .collect()
263    }
264
265    fn borrowed_summary(events: &[SessionEventRecord], messages: &MessageSequence) -> Vec<String> {
266        let mut summary = Vec::new();
267        visit_turn_view(events, messages, |entry| {
268            summary.push(match entry.payload {
269                BorrowedChronologicalPayload::Message(message) => {
270                    format!("{}:message:{}", entry.index, message.id)
271                }
272                BorrowedChronologicalPayload::ProtocolEvent(event) => {
273                    format!("{}:protocol:{}", entry.index, event.payload)
274                }
275            });
276        });
277        summary
278    }
279
280    #[test]
281    fn borrowed_turn_view_matches_owned_active_event_projection() {
282        let m1 = text_message("m1", MessageRole::User, "first");
283        let m2 = text_message("m2", MessageRole::Assistant, "second");
284        let events = vec![
285            SessionEventRecord::Conversation(ConversationRecord::from_message(m1.clone())),
286            SessionEventRecord::Protocol(protocol_event("step")),
287            SessionEventRecord::Conversation(ConversationRecord::from_message(m1.clone())),
288        ];
289        let messages = MessageSequence::from_owned(vec![m1, m2]);
290        let projection = ChronologicalProjection::from_turn_view(&events, &messages);
291
292        assert_eq!(
293            borrowed_summary(&events, &messages),
294            owned_summary(&projection)
295        );
296    }
297
298    #[test]
299    fn borrowed_turn_view_matches_owned_transcript_fallback_projection() {
300        let messages = MessageSequence::from_owned(vec![
301            tool_result_message("m1", "call-1"),
302            transient_message("transient"),
303            text_message("m2", MessageRole::Assistant, "second"),
304        ]);
305        let events = Vec::new();
306        let projection = ChronologicalProjection::from_turn_view(&events, &messages);
307
308        assert_eq!(
309            borrowed_summary(&events, &messages),
310            owned_summary(&projection)
311        );
312    }
313}