1use std::collections::{HashMap, HashSet};
2use std::ops::Deref;
3use std::sync::{Arc, OnceLock};
4
5use chrono::Utc;
6use sha2::Digest;
7
8use crate::session_model::{ConversationRecord, ProtocolEvent, SessionEventRecord, ToolEvent};
9use crate::{BaseRenderCache, Message, MessageRole, PromptUsage, TokenUsage, ToolCallRecord};
10
11#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
12pub struct SessionGraphData {
13 #[serde(default)]
14 pub nodes: Vec<SessionNodeRecord>,
15 #[serde(default, skip_serializing_if = "Option::is_none")]
16 pub leaf_node_id: Option<String>,
17}
18
19#[derive(Debug)]
20pub struct SessionGraph {
21 inner: Arc<SessionGraphData>,
22 cache: Arc<OnceLock<SessionGraphCache>>,
23}
24
25impl Default for SessionGraph {
26 fn default() -> Self {
27 Self {
28 inner: Arc::new(SessionGraphData::default()),
29 cache: Arc::new(OnceLock::new()),
30 }
31 }
32}
33
34impl Clone for SessionGraph {
35 fn clone(&self) -> Self {
36 Self {
37 inner: Arc::clone(&self.inner),
38 cache: Arc::clone(&self.cache),
39 }
40 }
41}
42
43impl serde::Serialize for SessionGraph {
44 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
45 where
46 S: serde::Serializer,
47 {
48 self.inner.serialize(serializer)
49 }
50}
51
52impl<'de> serde::Deserialize<'de> for SessionGraph {
53 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
54 where
55 D: serde::Deserializer<'de>,
56 {
57 let inner = SessionGraphData::deserialize(deserializer)?;
58 Ok(Self {
59 inner: Arc::new(inner),
60 cache: Arc::new(OnceLock::new()),
61 })
62 }
63}
64
65impl Deref for SessionGraph {
66 type Target = SessionGraphData;
67
68 fn deref(&self) -> &Self::Target {
69 self.inner.as_ref()
70 }
71}
72
73#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
74pub struct SessionNodeRecord {
75 pub node_id: String,
76 #[serde(default, skip_serializing_if = "Option::is_none")]
77 pub parent_node_id: Option<String>,
78 #[serde(default, skip_serializing_if = "Option::is_none")]
79 pub caused_by: Option<crate::CausalRef>,
80 #[serde(default, skip_serializing_if = "Option::is_none")]
81 pub agent_frame_id: Option<crate::AgentFrameId>,
82 pub timestamp: String,
83 #[serde(flatten)]
84 pub payload: SessionNodePayload,
85}
86
87#[derive(Clone, Debug)]
88pub(crate) struct SessionNodeDraft {
89 payload: SessionNodeDraftPayload,
90 caused_by: Option<crate::CausalRef>,
91}
92
93#[derive(Clone, Debug)]
94enum SessionNodeDraftPayload {
95 Message(Message),
96 Plugin {
97 plugin_type: String,
98 body: serde_json::Value,
99 },
100 ProtocolEvent(ProtocolEvent),
101}
102
103impl SessionNodeDraft {
104 pub(crate) fn message(message: Message) -> Self {
105 Self {
106 payload: SessionNodeDraftPayload::Message(message),
107 caused_by: None,
108 }
109 }
110
111 pub(crate) fn plugin(plugin_type: impl Into<String>, body: serde_json::Value) -> Self {
112 Self {
113 payload: SessionNodeDraftPayload::Plugin {
114 plugin_type: plugin_type.into(),
115 body,
116 },
117 caused_by: None,
118 }
119 }
120
121 pub(crate) fn protocol_event(event: ProtocolEvent) -> Self {
122 Self {
123 payload: SessionNodeDraftPayload::ProtocolEvent(event),
124 caused_by: None,
125 }
126 }
127
128 pub(crate) fn with_caused_by(mut self, caused_by: Option<crate::CausalRef>) -> Self {
129 self.caused_by = caused_by;
130 self
131 }
132}
133
134#[derive(Clone, Debug, Default, PartialEq)]
135pub struct SharedJsonValue(pub Arc<serde_json::Value>);
136
137impl SharedJsonValue {
138 pub fn new(value: serde_json::Value) -> Self {
139 Self(Arc::new(value))
140 }
141
142 pub fn to_owned(&self) -> serde_json::Value {
143 self.0.as_ref().clone()
144 }
145}
146
147impl AsRef<serde_json::Value> for SharedJsonValue {
148 fn as_ref(&self) -> &serde_json::Value {
149 self.0.as_ref()
150 }
151}
152
153impl serde::Serialize for SharedJsonValue {
154 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
155 where
156 S: serde::Serializer,
157 {
158 self.0.serialize(serializer)
159 }
160}
161
162impl<'de> serde::Deserialize<'de> for SharedJsonValue {
163 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
164 where
165 D: serde::Deserializer<'de>,
166 {
167 let value = serde_json::Value::deserialize(deserializer)?;
168 Ok(Self::new(value))
169 }
170}
171
172#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
173#[serde(tag = "kind", rename_all = "snake_case")]
174#[allow(clippy::large_enum_variant)]
175pub enum SessionNodePayload {
176 Event {
177 event: SessionEventRecord,
178 },
179 Plugin {
180 plugin_type: String,
181 body: SharedJsonValue,
182 },
183}
184
185#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
186pub struct PersistedSessionConfig {
187 pub provider_id: String,
188 pub model: crate::ModelSpec,
189}
190
191#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
192pub struct PersistedTurnState {
193 pub turn_index: usize,
194 #[serde(default)]
195 pub token_usage: TokenUsage,
196 #[serde(default, skip_serializing_if = "Option::is_none")]
197 pub last_prompt_usage: Option<PromptUsage>,
198 #[serde(default)]
199 pub protocol_turn_options: crate::ProtocolTurnOptions,
200}
201
202#[derive(Clone, Debug)]
203pub struct SessionMessageTreeNode {
204 pub node_id: String,
205 pub parent_message_node_id: Option<String>,
206 pub message: Message,
207 pub timestamp: String,
208 pub children: Vec<SessionMessageTreeNode>,
209 pub active: bool,
210}
211
212#[derive(Clone)]
213enum ActiveReadItem<'a> {
214 Message(&'a Message),
215 ToolCall {
216 stable_key: String,
217 record: &'a ToolCallRecord,
218 },
219}
220
221#[derive(Debug)]
222pub(crate) struct ActiveReadReplacement {
223 pub(crate) leaf_node_id: Option<String>,
224 pub(crate) new_tail_nodes: Vec<SessionNodeRecord>,
225 pub(crate) active_events: Vec<SessionEventRecord>,
226 pub(crate) active_messages: Vec<Message>,
227 pub(crate) active_tool_calls: Vec<ToolCallRecord>,
228}
229
230#[derive(Clone, Debug)]
231pub(crate) struct SessionReadModel {
232 pub(crate) active_events: Arc<Vec<SessionEventRecord>>,
233 pub(crate) messages: Arc<Vec<Message>>,
234 pub(crate) tool_calls: Arc<Vec<ToolCallRecord>>,
235 pub(crate) prompt_render_cache: Arc<BaseRenderCache>,
236}
237
238#[derive(Clone, Debug)]
239pub(crate) struct SessionGraphAppendBuilder {
240 existing_ids: HashSet<String>,
241 leaf_node_id: Option<String>,
242 agent_frame_id: Option<crate::AgentFrameId>,
243}
244
245impl SessionGraphAppendBuilder {
246 pub(crate) fn with_agent_frame_id(
247 mut self,
248 agent_frame_id: impl Into<crate::AgentFrameId>,
249 ) -> Self {
250 self.agent_frame_id = Some(agent_frame_id.into());
251 self
252 }
253
254 pub(crate) fn agent_frame_id(&self) -> Option<&str> {
255 self.agent_frame_id.as_deref()
256 }
257
258 pub(crate) fn leaf_node_id(&self) -> Option<&String> {
259 self.leaf_node_id.as_ref()
260 }
261
262 pub(crate) fn set_leaf_node_id(&mut self, leaf_node_id: Option<String>) {
263 self.leaf_node_id = leaf_node_id;
264 }
265
266 pub(crate) fn register_existing_node_ids<'a>(
267 &mut self,
268 node_ids: impl IntoIterator<Item = &'a str>,
269 ) {
270 self.existing_ids
271 .extend(node_ids.into_iter().map(ToOwned::to_owned));
272 }
273
274 pub(crate) fn existing_node_ids(&self) -> &HashSet<String> {
275 &self.existing_ids
276 }
277
278 pub(crate) fn append_messages<I>(&mut self, messages: I) -> Vec<SessionNodeRecord>
279 where
280 I: IntoIterator<Item = Message>,
281 {
282 self.append_drafts(messages.into_iter().map(SessionNodeDraft::message))
283 }
284
285 pub(crate) fn append_tool_call_records<I>(&mut self, records: I) -> Vec<SessionNodeRecord>
286 where
287 I: IntoIterator<Item = ToolCallRecord>,
288 {
289 let mut nodes = Vec::new();
290 for record in records {
291 let stable_key = stable_tool_call_key(&record);
292 let node_id = unique_tool_node_id(&stable_key, &self.existing_ids);
293 self.existing_ids.insert(node_id.clone());
294 let parent_node_id = self.leaf_node_id.clone();
295 self.leaf_node_id = Some(node_id.clone());
296 nodes.push(SessionNodeRecord {
297 node_id,
298 parent_node_id,
299 caused_by: None,
300 agent_frame_id: self.agent_frame_id.clone(),
301 timestamp: Utc::now().to_rfc3339(),
302 payload: SessionNodePayload::Event {
303 event: SessionEventRecord::Tool(ToolEvent::Invocation { stable_key, record }),
304 },
305 });
306 }
307 nodes
308 }
309
310 pub(crate) fn append_protocol_events<I>(&mut self, events: I) -> Vec<SessionNodeRecord>
311 where
312 I: IntoIterator<Item = ProtocolEvent>,
313 {
314 self.append_drafts(events.into_iter().map(SessionNodeDraft::protocol_event))
315 }
316
317 pub(crate) fn append_drafts<I>(&mut self, drafts: I) -> Vec<SessionNodeRecord>
318 where
319 I: IntoIterator<Item = SessionNodeDraft>,
320 {
321 let mut nodes = Vec::new();
322 for draft in drafts {
323 let parent_node_id = self.leaf_node_id.clone();
324 let (node_id, caused_by, payload) = match draft.payload {
325 SessionNodeDraftPayload::Message(mut message) => {
326 if message.id.is_empty() {
327 message.id = fresh_node_id("m");
328 }
329 let node_id = unique_message_node_id(&message.id, &self.existing_ids);
330 let caused_by = draft
331 .caused_by
332 .or_else(|| causal_ref_from_message_origin(&message.origin));
333 (
334 node_id,
335 caused_by,
336 SessionNodePayload::Event {
337 event: SessionEventRecord::Conversation(
338 ConversationRecord::from_message(message),
339 ),
340 },
341 )
342 }
343 SessionNodeDraftPayload::Plugin { plugin_type, body } => {
344 let node_id = fresh_semantic_node_id("plugin", &self.existing_ids);
345 (
346 node_id,
347 draft.caused_by,
348 SessionNodePayload::Plugin {
349 plugin_type,
350 body: SharedJsonValue::new(body),
351 },
352 )
353 }
354 SessionNodeDraftPayload::ProtocolEvent(event) => {
355 let node_id = fresh_semantic_node_id("protocol", &self.existing_ids);
356 (
357 node_id,
358 draft.caused_by,
359 SessionNodePayload::Event {
360 event: SessionEventRecord::Protocol(event),
361 },
362 )
363 }
364 };
365 self.existing_ids.insert(node_id.clone());
366 self.leaf_node_id = Some(node_id.clone());
367 nodes.push(SessionNodeRecord {
368 node_id,
369 parent_node_id,
370 caused_by,
371 agent_frame_id: self.agent_frame_id.clone(),
372 timestamp: Utc::now().to_rfc3339(),
373 payload,
374 });
375 }
376 nodes
377 }
378}
379
380#[derive(Debug, Clone)]
381struct SessionGraphCache {
382 by_id: HashMap<String, usize>,
383 active_path_indices: Vec<usize>,
384 active_events: Arc<Vec<SessionEventRecord>>,
385 active_messages: Arc<Vec<Message>>,
386 active_message_ids: HashMap<String, usize>,
391 active_tool_calls: Arc<Vec<ToolCallRecord>>,
392 prompt_render_cache: Arc<BaseRenderCache>,
398}
399
400impl SessionGraphCache {
401 fn build(graph: &SessionGraph) -> Self {
402 let by_id = graph
403 .nodes
404 .iter()
405 .enumerate()
406 .map(|(idx, node)| (node.node_id.clone(), idx))
407 .collect::<HashMap<_, _>>();
408 let mut active_path_indices = Vec::new();
409 let mut current = graph
410 .leaf_node_id
411 .as_ref()
412 .and_then(|node_id| by_id.get(node_id).copied());
413 while let Some(idx) = current {
414 active_path_indices.push(idx);
415 current = graph.nodes[idx]
416 .parent_node_id
417 .as_ref()
418 .and_then(|node_id| by_id.get(node_id).copied());
419 }
420 active_path_indices.reverse();
421
422 let mut cache = Self {
423 by_id,
424 active_path_indices,
425 active_events: Arc::new(Vec::new()),
426 active_messages: Arc::new(Vec::new()),
427 active_message_ids: HashMap::new(),
428 active_tool_calls: Arc::new(Vec::new()),
429 prompt_render_cache: Arc::new(BaseRenderCache::new()),
430 };
431 cache.rebuild_read_model(graph);
432 cache
433 }
434
435 fn rebuild_read_model(&mut self, graph: &SessionGraph) {
436 let mut active_messages = Vec::with_capacity(self.active_path_indices.len());
437 let mut active_message_ids: HashMap<String, usize> =
438 HashMap::with_capacity(self.active_path_indices.len());
439 let mut active_tool_calls = Vec::with_capacity(self.active_path_indices.len());
440 let mut active_events = Vec::with_capacity(self.active_path_indices.len());
441 for idx in &self.active_path_indices {
442 let node = &graph.nodes[*idx];
443 if let Some(event) = node.event() {
444 active_events.push(event.clone());
445 }
446 if let Some(message) = node.message() {
447 if !message.is_transient() && !active_message_ids.contains_key(&message.id) {
448 active_message_ids.insert(message.id.clone(), active_messages.len());
449 active_messages.push(message);
450 }
451 continue;
452 }
453 if let Some(event) = node.event()
454 && let SessionEventRecord::Tool(ToolEvent::Invocation { record, .. }) = event
455 {
456 active_tool_calls.push(record.clone());
457 continue;
458 }
459 }
460 self.active_messages = Arc::new(active_messages);
461 self.active_message_ids = active_message_ids;
462 self.active_events = Arc::new(active_events);
463 self.active_tool_calls = Arc::new(active_tool_calls);
464 self.prompt_render_cache = Arc::new(BaseRenderCache::new());
465 }
466
467 fn read_model_for_agent_frame(
468 &self,
469 graph: &SessionGraph,
470 frame_id: &str,
471 include_unscoped: bool,
472 ) -> SessionReadModel {
473 let mut active_messages = Vec::with_capacity(self.active_path_indices.len());
474 let mut active_message_ids = HashSet::new();
475 let mut active_tool_calls = Vec::with_capacity(self.active_path_indices.len());
476 let mut active_events = Vec::with_capacity(self.active_path_indices.len());
477 for idx in &self.active_path_indices {
478 let node = &graph.nodes[*idx];
479 if !node_belongs_to_agent_frame(node, frame_id, include_unscoped) {
480 continue;
481 }
482 if let Some(event) = node.event() {
483 active_events.push(event.clone());
484 }
485 if let Some(message) = node.message() {
486 if !message.is_transient() && active_message_ids.insert(message.id.clone()) {
487 active_messages.push(message);
488 }
489 continue;
490 }
491 if let Some(event) = node.event()
492 && let SessionEventRecord::Tool(ToolEvent::Invocation { record, .. }) = event
493 {
494 active_tool_calls.push(record.clone());
495 }
496 }
497 SessionReadModel {
498 active_events: Arc::new(active_events),
499 messages: Arc::new(active_messages),
500 tool_calls: Arc::new(active_tool_calls),
501 prompt_render_cache: Arc::new(BaseRenderCache::new()),
502 }
503 }
504
505 fn append_node(
506 &mut self,
507 node_index: usize,
508 node: &SessionNodeRecord,
509 previous_leaf_node_id: Option<&str>,
510 ) {
511 self.by_id.insert(node.node_id.clone(), node_index);
512 let parent_matches_leaf = node.parent_node_id.as_deref() == previous_leaf_node_id;
513 if !parent_matches_leaf {
514 return;
515 }
516 self.active_path_indices.push(node_index);
517 if let Some(event) = node.event() {
518 Arc::make_mut(&mut self.active_events).push(event.clone());
519 }
520 if let Some(message) = node.message() {
521 if !message.is_transient() && !self.active_message_ids.contains_key(&message.id) {
522 let messages = Arc::make_mut(&mut self.active_messages);
523 self.active_message_ids
524 .insert(message.id.clone(), messages.len());
525 messages.push(message);
526 self.prompt_render_cache = Arc::new(BaseRenderCache::new());
527 }
528 return;
529 }
530 if let Some(event) = node.event()
531 && let SessionEventRecord::Tool(ToolEvent::Invocation { record, .. }) = event
532 {
533 Arc::make_mut(&mut self.active_tool_calls).push(record.clone());
534 }
535 }
536
537 fn reserve_append_capacity(
538 &mut self,
539 additional_nodes: usize,
540 additional_messages: usize,
541 additional_tool_calls: usize,
542 ) {
543 self.by_id.reserve(additional_nodes);
544 self.active_path_indices.reserve(additional_nodes);
545 if additional_messages > 0 {
546 Arc::make_mut(&mut self.active_messages).reserve(additional_messages);
547 }
548 if additional_tool_calls > 0 {
549 Arc::make_mut(&mut self.active_tool_calls).reserve(additional_tool_calls);
550 }
551 }
552}
553
554impl SessionNodeRecord {
555 pub fn event(&self) -> Option<&SessionEventRecord> {
556 match &self.payload {
557 SessionNodePayload::Event { event } => Some(event),
558 SessionNodePayload::Plugin { .. } => None,
559 }
560 }
561
562 pub fn message(&self) -> Option<Message> {
563 match self.event()? {
564 SessionEventRecord::Conversation(record) => Some(record.to_message()),
565 _ => None,
566 }
567 }
568
569 pub fn plugin(&self) -> Option<(&str, &serde_json::Value)> {
570 match &self.payload {
571 SessionNodePayload::Event { .. } => None,
572 SessionNodePayload::Plugin { plugin_type, body } => {
573 Some((plugin_type.as_str(), body.as_ref()))
574 }
575 }
576 }
577
578 pub fn plugin_body<T>(&self) -> Option<T>
579 where
580 T: for<'de> serde::Deserialize<'de>,
581 {
582 let (_, body) = self.plugin()?;
583 T::deserialize(body).ok()
584 }
585}
586
587impl SessionGraph {
588 pub fn append_active_read_delta(
589 &mut self,
590 messages: &[Message],
591 tool_calls: &[ToolCallRecord],
592 ) {
593 self.append_active_read_delta_scoped(None, messages, tool_calls);
594 }
595
596 pub fn append_active_read_delta_for_agent_frame(
597 &mut self,
598 agent_frame_id: &str,
599 messages: &[Message],
600 tool_calls: &[ToolCallRecord],
601 ) {
602 self.append_active_read_delta_scoped(Some(agent_frame_id), messages, tool_calls);
603 }
604
605 fn append_active_read_delta_scoped(
606 &mut self,
607 agent_frame_id: Option<&str>,
608 messages: &[Message],
609 tool_calls: &[ToolCallRecord],
610 ) {
611 let appendable_messages = {
612 let read_model = agent_frame_id
613 .map(|frame_id| self.read_model_for_agent_frame(frame_id, false))
614 .unwrap_or_else(|| self.read_model());
615 let mut seen_message_ids = read_model
616 .messages
617 .iter()
618 .map(|message| message.id.as_str())
619 .collect::<HashSet<_>>();
620 messages
621 .iter()
622 .filter(|message| {
623 !message.is_transient() && seen_message_ids.insert(message.id.as_str())
624 })
625 .cloned()
626 .collect::<Vec<_>>()
627 };
628 let read_model = agent_frame_id
629 .map(|frame_id| self.read_model_for_agent_frame(frame_id, false))
630 .unwrap_or_else(|| self.read_model());
631 let mut seen_tool_call_keys = read_model
632 .tool_calls
633 .iter()
634 .map(|record| tool_call_active_read_key(&stable_tool_call_key(record), record))
635 .collect::<HashSet<_>>();
636 let appendable_tool_calls = tool_calls
637 .iter()
638 .filter_map(|record| {
639 let stable_key = stable_tool_call_key(record);
640 let active_read_key = tool_call_active_read_key(&stable_key, record);
641 seen_tool_call_keys
642 .insert(active_read_key)
643 .then_some(record.clone())
644 })
645 .collect::<Vec<_>>();
646
647 self.reserve_append_capacity(
648 appendable_messages.len() + appendable_tool_calls.len(),
649 appendable_messages.len(),
650 appendable_tool_calls.len(),
651 );
652 self.append_message_batch_scoped(agent_frame_id, appendable_messages);
653 self.append_tool_call_records_scoped(agent_frame_id, appendable_tool_calls);
654 }
655
656 pub(crate) fn append_active_conversation_messages_for_agent_frame(
657 &mut self,
658 agent_frame_id: &str,
659 messages: &[Message],
660 ) {
661 self.append_active_conversation_messages_scoped(Some(agent_frame_id), messages);
662 }
663
664 fn append_active_conversation_messages_scoped(
665 &mut self,
666 agent_frame_id: Option<&str>,
667 messages: &[Message],
668 ) {
669 let appendable_messages = messages
670 .iter()
671 .filter(|message| !message.is_transient())
672 .cloned()
673 .collect::<Vec<_>>();
674 self.reserve_append_capacity(appendable_messages.len(), appendable_messages.len(), 0);
675 self.append_message_batch_scoped(agent_frame_id, appendable_messages);
676 }
677
678 pub fn from_nodes(nodes: Vec<SessionNodeRecord>, leaf_node_id: Option<String>) -> Self {
679 Self {
680 inner: Arc::new(SessionGraphData {
681 nodes,
682 leaf_node_id,
683 }),
684 cache: Arc::new(OnceLock::new()),
685 }
686 }
687
688 pub(crate) fn append_builder(&self) -> SessionGraphAppendBuilder {
689 SessionGraphAppendBuilder {
690 existing_ids: self.nodes.iter().map(|node| node.node_id.clone()).collect(),
691 leaf_node_id: self.leaf_node_id.clone(),
692 agent_frame_id: None,
693 }
694 }
695
696 fn invalidate_cache(&mut self) {
697 self.cache = Arc::new(OnceLock::new());
698 }
699
700 fn data_mut(&mut self) -> &mut SessionGraphData {
701 self.invalidate_cache();
702 Arc::make_mut(&mut self.inner)
703 }
704
705 fn reserve_append_capacity(
706 &mut self,
707 additional_nodes: usize,
708 additional_messages: usize,
709 additional_tool_calls: usize,
710 ) {
711 if additional_nodes == 0 {
712 return;
713 }
714 self.detach_initialized_cache_for_append();
715 Arc::make_mut(&mut self.inner)
716 .nodes
717 .reserve(additional_nodes);
718 if let Some(cache_lock) = Arc::get_mut(&mut self.cache)
719 && let Some(cache) = cache_lock.get_mut()
720 {
721 cache.reserve_append_capacity(
722 additional_nodes,
723 additional_messages,
724 additional_tool_calls,
725 );
726 }
727 }
728
729 fn detach_initialized_cache_for_append(&mut self) {
730 if Arc::get_mut(&mut self.cache).is_some() {
731 return;
732 }
733 let Some(cache) = self.cache.get().cloned() else {
734 self.invalidate_cache();
735 return;
736 };
737 let lock = OnceLock::new();
738 let _ = lock.set(cache);
739 self.cache = Arc::new(lock);
740 }
741
742 fn cache(&self) -> &SessionGraphCache {
743 self.cache.get_or_init(|| SessionGraphCache::build(self))
744 }
745
746 fn append_message_batch_scoped(
747 &mut self,
748 agent_frame_id: Option<&str>,
749 messages: Vec<Message>,
750 ) {
751 if messages.is_empty() {
752 return;
753 }
754 self.append_node_drafts_scoped(
755 agent_frame_id,
756 messages.into_iter().map(SessionNodeDraft::message),
757 );
758 }
759
760 fn append_prebuilt_nodes(&mut self, nodes: Vec<SessionNodeRecord>) {
761 if nodes.is_empty() {
762 return;
763 }
764
765 self.detach_initialized_cache_for_append();
766 if let Some(cache_lock) = Arc::get_mut(&mut self.cache)
767 && let Some(cache) = cache_lock.get_mut()
768 {
769 let data = Arc::make_mut(&mut self.inner);
770 for node in nodes {
771 let previous_leaf = data.leaf_node_id.clone();
772 let node_id = node.node_id.clone();
773 data.nodes.push(node);
774 cache.append_node(
775 data.nodes.len() - 1,
776 data.nodes.last().expect("just appended graph node"),
777 previous_leaf.as_deref(),
778 );
779 data.leaf_node_id = Some(node_id);
780 }
781 return;
782 }
783
784 let data = self.data_mut();
785 for node in nodes {
786 data.leaf_node_id = Some(node.node_id.clone());
787 data.nodes.push(node);
788 }
789 }
790
791 pub fn append_message(&mut self, message: Message) -> String {
792 self.append_node_draft(SessionNodeDraft::message(message))
793 }
794
795 pub fn append_plugin(
796 &mut self,
797 plugin_type: impl Into<String>,
798 body: serde_json::Value,
799 ) -> String {
800 self.append_node_draft(SessionNodeDraft::plugin(plugin_type, body))
801 }
802
803 pub fn active_path_nodes(&self) -> Vec<&SessionNodeRecord> {
804 self.cache()
805 .active_path_indices
806 .iter()
807 .map(|idx| &self.nodes[*idx])
808 .collect()
809 }
810
811 pub(crate) fn read_model(&self) -> SessionReadModel {
812 let cache = self.cache();
813 SessionReadModel {
814 active_events: Arc::clone(&cache.active_events),
815 messages: Arc::clone(&cache.active_messages),
816 tool_calls: Arc::clone(&cache.active_tool_calls),
817 prompt_render_cache: Arc::clone(&cache.prompt_render_cache),
818 }
819 }
820
821 pub(crate) fn read_model_for_agent_frame(
822 &self,
823 frame_id: &str,
824 include_unscoped: bool,
825 ) -> SessionReadModel {
826 if frame_id.is_empty() {
827 return self.read_model();
828 }
829 self.cache()
830 .read_model_for_agent_frame(self, frame_id, include_unscoped)
831 }
832
833 pub fn replace_active_tool_calls(&mut self, tool_calls: &[ToolCallRecord]) {
834 let messages = Arc::clone(&self.cache().active_messages);
835 self.replace_active_read_state(messages.as_slice(), tool_calls);
836 }
837
838 pub fn append_protocol_event(&mut self, event: ProtocolEvent) -> String {
839 self.append_node_draft(SessionNodeDraft::protocol_event(event))
840 }
841
842 pub(crate) fn append_node_draft(&mut self, draft: SessionNodeDraft) -> String {
843 self.append_node_drafts([draft])
844 .into_iter()
845 .next()
846 .expect("single draft append must create one node")
847 }
848
849 pub(crate) fn append_node_drafts<I>(&mut self, drafts: I) -> Vec<String>
850 where
851 I: IntoIterator<Item = SessionNodeDraft>,
852 {
853 self.append_node_drafts_scoped(None, drafts)
854 }
855
856 pub(crate) fn append_node_drafts_for_agent_frame<I>(
857 &mut self,
858 agent_frame_id: &str,
859 drafts: I,
860 ) -> Vec<String>
861 where
862 I: IntoIterator<Item = SessionNodeDraft>,
863 {
864 self.append_node_drafts_scoped(Some(agent_frame_id), drafts)
865 }
866
867 fn append_node_drafts_scoped<I>(
868 &mut self,
869 agent_frame_id: Option<&str>,
870 drafts: I,
871 ) -> Vec<String>
872 where
873 I: IntoIterator<Item = SessionNodeDraft>,
874 {
875 let mut builder = self.append_builder();
876 if let Some(agent_frame_id) = agent_frame_id {
877 builder = builder.with_agent_frame_id(agent_frame_id.to_string());
878 }
879 let nodes = builder.append_drafts(drafts);
880 let node_ids = nodes
881 .iter()
882 .map(|node| node.node_id.clone())
883 .collect::<Vec<_>>();
884 self.append_prebuilt_nodes(nodes);
885 node_ids
886 }
887
888 fn append_tool_call_records_scoped<I>(
889 &mut self,
890 agent_frame_id: Option<&str>,
891 records: I,
892 ) -> Vec<String>
893 where
894 I: IntoIterator<Item = ToolCallRecord>,
895 {
896 let mut builder = self.append_builder();
897 if let Some(agent_frame_id) = agent_frame_id {
898 builder = builder.with_agent_frame_id(agent_frame_id.to_string());
899 }
900 let nodes = builder.append_tool_call_records(records);
901 let node_ids = nodes
902 .iter()
903 .map(|node| node.node_id.clone())
904 .collect::<Vec<_>>();
905 self.append_prebuilt_nodes(nodes);
906 node_ids
907 }
908
909 pub fn user_message_count(&self) -> usize {
910 self.nodes
911 .iter()
912 .filter_map(SessionNodeRecord::message)
913 .filter(|message| matches!(message.role, MessageRole::User))
914 .count()
915 }
916
917 pub fn first_user_message(&self) -> String {
918 self.nodes
919 .iter()
920 .filter_map(SessionNodeRecord::message)
921 .find(|message| matches!(message.role, MessageRole::User))
922 .map(|message| first_message_search_text(&message))
923 .unwrap_or_default()
924 }
925
926 pub fn branch_to(&mut self, node_id: Option<String>) {
927 self.data_mut().leaf_node_id = node_id;
928 }
929
930 pub fn set_leaf_node_id(&mut self, node_id: Option<String>) {
931 self.data_mut().leaf_node_id = node_id;
932 }
933
934 pub fn push_node_record(&mut self, node: SessionNodeRecord) {
935 self.data_mut().nodes.push(node);
936 }
937
938 pub fn extend_node_records<I>(&mut self, nodes: I)
939 where
940 I: IntoIterator<Item = SessionNodeRecord>,
941 {
942 self.data_mut().nodes.extend(nodes);
943 }
944
945 pub fn extend_active_path(&mut self, nodes: Vec<SessionNodeRecord>) {
953 self.append_prebuilt_nodes(nodes);
954 }
955
956 pub fn active_path_contains(&self, node_id: &str) -> bool {
957 self.active_path_nodes()
958 .into_iter()
959 .any(|node| node.node_id == node_id)
960 }
961
962 pub fn heal_orphaned_leaf(&mut self) -> bool {
970 if let Some(leaf) = self.leaf_node_id.as_ref()
971 && self.find_node(leaf).is_none()
972 {
973 let fallback = self
974 .nodes
975 .iter()
976 .rev()
977 .find(|node| node.message().is_some())
978 .map(|node| node.node_id.clone());
979 self.data_mut().leaf_node_id = fallback;
980 return true;
981 }
982 false
983 }
984
985 pub fn fork_current_path(&self) -> SessionGraph {
986 let path = self.active_path_nodes();
987 SessionGraph::from_nodes(
988 path.into_iter().cloned().collect(),
989 self.leaf_node_id.clone(),
990 )
991 }
992
993 pub fn find_node(&self, node_id: &str) -> Option<&SessionNodeRecord> {
994 self.cache()
995 .by_id
996 .get(node_id)
997 .and_then(|idx| self.nodes.get(*idx))
998 }
999
1000 pub fn node_index(&self, node_id: &str) -> Option<usize> {
1001 self.cache().by_id.get(node_id).copied()
1002 }
1003
1004 pub fn replace_active_read_state(
1005 &mut self,
1006 messages: &[Message],
1007 tool_calls: &[ToolCallRecord],
1008 ) {
1009 self.replace_active_read_state_scoped(None, messages, tool_calls);
1010 }
1011
1012 pub fn replace_active_read_state_for_agent_frame(
1013 &mut self,
1014 agent_frame_id: &str,
1015 messages: &[Message],
1016 tool_calls: &[ToolCallRecord],
1017 ) {
1018 self.replace_active_read_state_scoped(Some(agent_frame_id), messages, tool_calls);
1019 }
1020
1021 fn replace_active_read_state_scoped(
1022 &mut self,
1023 agent_frame_id: Option<&str>,
1024 messages: &[Message],
1025 tool_calls: &[ToolCallRecord],
1026 ) {
1027 let current_nodes = self.active_path_nodes();
1028 let existing_ids = self
1029 .nodes
1030 .iter()
1031 .map(|node| node.node_id.clone())
1032 .collect::<HashSet<_>>();
1033 let replacement = build_active_read_replacement(
1034 current_nodes,
1035 &existing_ids,
1036 agent_frame_id,
1037 messages,
1038 tool_calls,
1039 );
1040 let data = self.data_mut();
1041 data.leaf_node_id = replacement.leaf_node_id;
1042 data.nodes.extend(replacement.new_tail_nodes);
1043 }
1044
1045 pub fn from_active_read_state(messages: &[Message], tool_calls: &[ToolCallRecord]) -> Self {
1046 let mut graph = Self::default();
1047 graph.replace_active_read_state(messages, tool_calls);
1048 graph
1049 }
1050
1051 pub fn message_tree(&self) -> Vec<SessionMessageTreeNode> {
1052 let active_message_ids = self
1053 .active_path_nodes()
1054 .into_iter()
1055 .filter_map(|node| node.message().map(|message| message.id.clone()))
1056 .collect::<HashSet<_>>();
1057
1058 let message_nodes = self
1059 .nodes
1060 .iter()
1061 .filter_map(|node| {
1062 let message = node.message()?.clone();
1063 let parent_message_node_id =
1064 self.nearest_message_ancestor(node.parent_node_id.as_deref());
1065 Some(SessionMessageTreeNode {
1066 node_id: node.node_id.clone(),
1067 parent_message_node_id,
1068 message,
1069 timestamp: node.timestamp.clone(),
1070 children: Vec::new(),
1071 active: active_message_ids.contains(&node.node_id),
1072 })
1073 })
1074 .collect::<Vec<_>>();
1075
1076 build_tree(message_nodes)
1077 }
1078
1079 fn nearest_message_ancestor(&self, node_id: Option<&str>) -> Option<String> {
1080 let by_id = self
1081 .nodes
1082 .iter()
1083 .map(|node| (node.node_id.as_str(), node))
1084 .collect::<HashMap<_, _>>();
1085 let mut current = node_id.and_then(|id| by_id.get(id).copied());
1086 while let Some(node) = current {
1087 if node.message().is_some() {
1088 return Some(node.node_id.clone());
1089 }
1090 current = node
1091 .parent_node_id
1092 .as_deref()
1093 .and_then(|parent| by_id.get(parent).copied());
1094 }
1095 None
1096 }
1097}
1098
1099fn build_tree(mut nodes: Vec<SessionMessageTreeNode>) -> Vec<SessionMessageTreeNode> {
1100 let mut children_by_parent = HashMap::<Option<String>, Vec<SessionMessageTreeNode>>::new();
1101 for node in nodes.drain(..) {
1102 children_by_parent
1103 .entry(node.parent_message_node_id.clone())
1104 .or_default()
1105 .push(node);
1106 }
1107 let mut roots = build_tree_children(None, &mut children_by_parent);
1108 sort_tree(&mut roots);
1109 roots
1110}
1111
1112fn sort_tree(nodes: &mut [SessionMessageTreeNode]) {
1113 nodes.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
1114 for node in nodes {
1115 sort_tree(&mut node.children);
1116 }
1117}
1118
1119fn build_tree_children(
1120 parent_id: Option<String>,
1121 children_by_parent: &mut HashMap<Option<String>, Vec<SessionMessageTreeNode>>,
1122) -> Vec<SessionMessageTreeNode> {
1123 let mut children = children_by_parent.remove(&parent_id).unwrap_or_default();
1124 for child in &mut children {
1125 child.children = build_tree_children(Some(child.node_id.clone()), children_by_parent);
1126 }
1127 children
1128}
1129
1130fn node_belongs_to_agent_frame(
1131 node: &SessionNodeRecord,
1132 frame_id: &str,
1133 include_unscoped: bool,
1134) -> bool {
1135 match node.agent_frame_id.as_deref() {
1136 Some(node_frame_id) => node_frame_id == frame_id,
1137 None => include_unscoped,
1138 }
1139}
1140
1141fn build_active_read_items<'a>(
1142 messages: &'a [Message],
1143 tool_calls: &'a [ToolCallRecord],
1144) -> Vec<ActiveReadItem<'a>> {
1145 let mut first_message_for_call = HashMap::<String, usize>::new();
1146 let active_messages = messages
1147 .iter()
1148 .filter(|message| !message.is_transient())
1149 .collect::<Vec<_>>();
1150 for (idx, message) in active_messages.iter().enumerate() {
1151 for part in message.parts.iter() {
1152 if let Some(call_id) = &part.tool_call_id {
1153 first_message_for_call.entry(call_id.clone()).or_insert(idx);
1154 }
1155 }
1156 }
1157
1158 let mut anchored = HashMap::<usize, Vec<ActiveReadItem<'a>>>::new();
1159 for record in tool_calls {
1160 let stable_key = stable_tool_call_key(record);
1161 let anchor = record
1162 .call_id
1163 .as_ref()
1164 .and_then(|call_id| first_message_for_call.get(call_id).copied())
1165 .unwrap_or_else(|| active_messages.len().saturating_sub(1));
1166 anchored
1167 .entry(anchor)
1168 .or_default()
1169 .push(ActiveReadItem::ToolCall { stable_key, record });
1170 }
1171
1172 let mut out = Vec::new();
1173 for (idx, message) in active_messages.iter().enumerate() {
1174 out.push(ActiveReadItem::Message(message));
1175 if let Some(items) = anchored.remove(&idx) {
1176 out.extend(items);
1177 }
1178 }
1179 out
1180}
1181
1182pub(crate) fn build_active_read_replacement<'a>(
1183 current_nodes: impl IntoIterator<Item = &'a SessionNodeRecord>,
1184 existing_node_ids: &HashSet<String>,
1185 agent_frame_id: Option<&str>,
1186 messages: &[Message],
1187 tool_calls: &[ToolCallRecord],
1188) -> ActiveReadReplacement {
1189 let target = build_active_read_items(messages, tool_calls);
1190
1191 let mut active_events = Vec::new();
1192 let mut active_messages = Vec::new();
1193 let mut active_message_ids = HashSet::new();
1194 let mut active_tool_calls = Vec::new();
1195 let mut seen_active_read_keys = HashSet::new();
1196 let mut target_idx = 0usize;
1197 let mut leaf_node_id = None;
1198 for node in current_nodes {
1199 if node
1200 .message()
1201 .map(|message| message.is_transient())
1202 .unwrap_or(false)
1203 {
1204 continue;
1205 }
1206 if let Some(key) = recognized_active_read_key(node) {
1207 if !seen_active_read_keys.insert(key.clone()) {
1208 continue;
1209 }
1210 let Some(target_item) = target.get(target_idx) else {
1211 break;
1212 };
1213 if key != active_read_item_key(target_item) {
1214 break;
1215 }
1216 push_active_read_node(
1217 node,
1218 &mut active_events,
1219 &mut active_messages,
1220 &mut active_message_ids,
1221 &mut active_tool_calls,
1222 );
1223 leaf_node_id = Some(node.node_id.clone());
1224 target_idx += 1;
1225 } else {
1226 push_active_read_node(
1227 node,
1228 &mut active_events,
1229 &mut active_messages,
1230 &mut active_message_ids,
1231 &mut active_tool_calls,
1232 );
1233 leaf_node_id = Some(node.node_id.clone());
1234 }
1235 }
1236
1237 let mut new_node_ids = HashSet::new();
1238 let mut new_tail_nodes = Vec::new();
1239
1240 for item in target.into_iter().skip(target_idx) {
1241 let parent_node_id = leaf_node_id.clone();
1242 let node = match item {
1243 ActiveReadItem::Message(message) => {
1244 let node_id = unique_message_node_id_for_replacement(
1245 &message.id,
1246 existing_node_ids,
1247 &new_node_ids,
1248 );
1249 SessionNodeRecord {
1250 node_id,
1251 parent_node_id,
1252 caused_by: causal_ref_from_message_origin(&message.origin),
1253 agent_frame_id: agent_frame_id.map(ToOwned::to_owned),
1254 timestamp: Utc::now().to_rfc3339(),
1255 payload: SessionNodePayload::Event {
1256 event: SessionEventRecord::Conversation(ConversationRecord::from_message(
1257 message.clone(),
1258 )),
1259 },
1260 }
1261 }
1262 ActiveReadItem::ToolCall { stable_key, record } => {
1263 let node_id = unique_tool_node_id_for_replacement(
1264 &stable_key,
1265 existing_node_ids,
1266 &new_node_ids,
1267 );
1268 SessionNodeRecord {
1269 node_id,
1270 parent_node_id,
1271 caused_by: None,
1272 agent_frame_id: agent_frame_id.map(ToOwned::to_owned),
1273 timestamp: Utc::now().to_rfc3339(),
1274 payload: SessionNodePayload::Event {
1275 event: SessionEventRecord::Tool(ToolEvent::Invocation {
1276 stable_key,
1277 record: record.clone(),
1278 }),
1279 },
1280 }
1281 }
1282 };
1283 new_node_ids.insert(node.node_id.clone());
1284 leaf_node_id = Some(node.node_id.clone());
1285 push_active_read_node(
1286 &node,
1287 &mut active_events,
1288 &mut active_messages,
1289 &mut active_message_ids,
1290 &mut active_tool_calls,
1291 );
1292 new_tail_nodes.push(node);
1293 }
1294
1295 ActiveReadReplacement {
1296 leaf_node_id,
1297 new_tail_nodes,
1298 active_events,
1299 active_messages,
1300 active_tool_calls,
1301 }
1302}
1303
1304fn push_active_read_node(
1305 node: &SessionNodeRecord,
1306 active_events: &mut Vec<SessionEventRecord>,
1307 active_messages: &mut Vec<Message>,
1308 active_message_ids: &mut HashSet<String>,
1309 active_tool_calls: &mut Vec<ToolCallRecord>,
1310) {
1311 if let Some(event) = node.event() {
1312 active_events.push(event.clone());
1313 }
1314 if let Some(message) = node.message() {
1315 if !message.is_transient() && active_message_ids.insert(message.id.clone()) {
1316 active_messages.push(message);
1317 }
1318 return;
1319 }
1320 if let Some(SessionEventRecord::Tool(ToolEvent::Invocation { record, .. })) = node.event() {
1321 active_tool_calls.push(record.clone());
1322 }
1323}
1324
1325fn recognized_active_read_key(node: &SessionNodeRecord) -> Option<String> {
1326 match &node.payload {
1327 SessionNodePayload::Event { event } => match event {
1328 SessionEventRecord::Conversation(record) => Some(format!("message:{}", record.id)),
1329 SessionEventRecord::Tool(ToolEvent::Invocation { stable_key, record }) => {
1330 Some(tool_call_active_read_key(stable_key, record))
1331 }
1332 _ => None,
1333 },
1334 SessionNodePayload::Plugin { .. } => None,
1335 }
1336}
1337
1338fn active_read_item_key(item: &ActiveReadItem<'_>) -> String {
1339 match item {
1340 ActiveReadItem::Message(message) => format!("message:{}", message.id),
1341 ActiveReadItem::ToolCall { stable_key, record } => {
1342 tool_call_active_read_key(stable_key, record)
1343 }
1344 }
1345}
1346
1347fn tool_call_active_read_key(stable_key: &str, record: &ToolCallRecord) -> String {
1348 let fingerprint = serde_json::to_string(record).unwrap_or_default();
1349 format!("tool_call:{stable_key}:{fingerprint}")
1350}
1351
1352pub(crate) fn tool_call_record_active_read_key(record: &ToolCallRecord) -> String {
1353 let stable_key = stable_tool_call_key(record);
1354 tool_call_active_read_key(&stable_key, record)
1355}
1356
1357fn causal_ref_from_message_origin(
1358 origin: &Option<crate::MessageOrigin>,
1359) -> Option<crate::CausalRef> {
1360 let Some(crate::MessageOrigin::Process {
1361 process_id,
1362 sequence,
1363 ..
1364 }) = origin
1365 else {
1366 return None;
1367 };
1368 Some(crate::CausalRef::ProcessEvent {
1369 process_id: process_id.clone(),
1370 sequence: *sequence,
1371 })
1372}
1373
1374fn unique_tool_node_id(stable_key: &str, existing_ids: &HashSet<String>) -> String {
1375 let base = format!("tool:{}", stable_key);
1376 if !existing_ids.contains(&base) {
1377 return base;
1378 }
1379 loop {
1380 let candidate = format!("tool:{}:{}", stable_key, uuid::Uuid::new_v4());
1381 if !existing_ids.contains(&candidate) {
1382 return candidate;
1383 }
1384 }
1385}
1386
1387fn unique_tool_node_id_for_replacement(
1388 stable_key: &str,
1389 existing_ids: &HashSet<String>,
1390 new_ids: &HashSet<String>,
1391) -> String {
1392 let base = format!("tool:{}", stable_key);
1393 if !existing_ids.contains(&base) && !new_ids.contains(&base) {
1394 return base;
1395 }
1396 loop {
1397 let candidate = format!("tool:{}:{}", stable_key, uuid::Uuid::new_v4());
1398 if !existing_ids.contains(&candidate) && !new_ids.contains(&candidate) {
1399 return candidate;
1400 }
1401 }
1402}
1403
1404fn fresh_semantic_node_id(prefix: &str, existing_ids: &HashSet<String>) -> String {
1405 loop {
1406 let candidate = format!("{prefix}:{}", uuid::Uuid::new_v4().simple());
1407 if !existing_ids.contains(&candidate) {
1408 return candidate;
1409 }
1410 }
1411}
1412
1413fn unique_message_node_id(message_id: &str, existing_ids: &HashSet<String>) -> String {
1414 if !existing_ids.contains(message_id) {
1415 return message_id.to_string();
1416 }
1417 let base = format!("message:{message_id}");
1418 if !existing_ids.contains(&base) {
1419 return base;
1420 }
1421 for suffix in 2.. {
1422 let candidate = format!("{base}:{suffix}");
1423 if !existing_ids.contains(&candidate) {
1424 return candidate;
1425 }
1426 }
1427 unreachable!("message node id space exhausted")
1428}
1429
1430fn unique_message_node_id_for_replacement(
1431 message_id: &str,
1432 existing_ids: &HashSet<String>,
1433 new_ids: &HashSet<String>,
1434) -> String {
1435 if !existing_ids.contains(message_id) && !new_ids.contains(message_id) {
1436 return message_id.to_string();
1437 }
1438 let base = format!("message:{message_id}");
1439 if !existing_ids.contains(&base) && !new_ids.contains(&base) {
1440 return base;
1441 }
1442 for suffix in 2.. {
1443 let candidate = format!("{base}:{suffix}");
1444 if !existing_ids.contains(&candidate) && !new_ids.contains(&candidate) {
1445 return candidate;
1446 }
1447 }
1448 unreachable!("message node id space exhausted")
1449}
1450
1451fn fresh_node_id(prefix: &str) -> String {
1452 format!("{prefix}{}", uuid::Uuid::new_v4().simple())
1453}
1454
1455fn stable_tool_call_key(record: &ToolCallRecord) -> String {
1456 if let Some(call_id) = record
1457 .call_id
1458 .as_ref()
1459 .filter(|call_id| !call_id.is_empty())
1460 {
1461 return call_id.clone();
1462 }
1463 let raw = serde_json::to_vec(&(record.tool.clone(), &record.args, &record.output))
1464 .unwrap_or_else(|_| b"tool-call".to_vec());
1465 let digest = sha2::Sha256::digest(raw);
1466 format!("anon-{}", &format!("{digest:x}")[..12])
1467}
1468
1469fn first_message_search_text(message: &Message) -> String {
1470 message
1471 .parts
1472 .iter()
1473 .filter_map(|part| match part.kind {
1474 crate::PartKind::ToolCall | crate::PartKind::ToolResult => None,
1475 crate::PartKind::Image => Some("[Image attached]".to_string()),
1476 _ => (!part.content.trim().is_empty()).then(|| part.content.clone()),
1477 })
1478 .collect::<Vec<_>>()
1479 .join("\n\n")
1480 .trim()
1481 .to_string()
1482}
1483
1484#[cfg(test)]
1485mod tests {
1486 use super::*;
1487 use crate::{Part, PartKind, PruneState, ToolCallOutput, shared_parts};
1488
1489 fn text_message(id: &str, role: MessageRole, content: &str) -> Message {
1490 Message {
1491 id: id.to_string(),
1492 role,
1493 parts: shared_parts(vec![Part {
1494 id: format!("{id}.p0"),
1495 kind: PartKind::Text,
1496 content: content.to_string(),
1497 attachment: None,
1498 tool_call_id: None,
1499 tool_name: None,
1500 tool_replay: None,
1501 prune_state: PruneState::Intact,
1502 reasoning_meta: None,
1503 response_meta: None,
1504 }]),
1505 origin: None,
1506 }
1507 }
1508
1509 fn tool_record(call_id: &str) -> ToolCallRecord {
1510 ToolCallRecord {
1511 call_id: Some(call_id.to_string()),
1512 tool: "lookup".to_string(),
1513 args: serde_json::json!({"q": "x"}),
1514 output: ToolCallOutput::success(serde_json::json!({"answer": "y"})),
1515 duration_ms: 3,
1516 }
1517 }
1518
1519 fn protocol_event() -> ProtocolEvent {
1520 ProtocolEvent::typed("test_protocol", serde_json::json!({"step": "started"}))
1521 .expect("protocol event serializes")
1522 }
1523
1524 #[test]
1525 fn typed_append_node_ids_use_semantic_prefixes() {
1526 let mut graph = SessionGraph::default();
1527
1528 let message_id = graph.append_message(text_message("m1", MessageRole::User, "hello"));
1529 graph.append_active_read_delta(&[], &[tool_record("call-1")]);
1530 let protocol_id = graph.append_protocol_event(protocol_event());
1531 let plugin_id = graph.append_plugin("example", serde_json::json!({"ok": true}));
1532
1533 assert_eq!(message_id, "m1");
1534 assert!(protocol_id.starts_with("protocol:"));
1535 assert!(plugin_id.starts_with("plugin:"));
1536
1537 let tool_node = graph
1538 .nodes
1539 .iter()
1540 .find(|node| matches!(node.event(), Some(SessionEventRecord::Tool(_))))
1541 .expect("tool node");
1542 assert_eq!(tool_node.node_id, "tool:call-1");
1543 }
1544
1545 #[test]
1546 fn active_read_replacement_uses_tool_prefix() {
1547 let record = tool_record("call-replace");
1548 let message = text_message("m1", MessageRole::User, "hello");
1549 let graph = SessionGraph::from_active_read_state(&[message], std::slice::from_ref(&record));
1550
1551 let tool_node = graph
1552 .nodes
1553 .iter()
1554 .find(|node| matches!(node.event(), Some(SessionEventRecord::Tool(_))))
1555 .expect("tool node");
1556 assert_eq!(tool_node.node_id, "tool:call-replace");
1557 }
1558
1559 #[test]
1560 fn graph_writers_do_not_put_active_read_events_under_plugin_ids() {
1561 let mut graph = SessionGraph::default();
1562 graph.append_message(text_message("m1", MessageRole::User, "hello"));
1563 graph.append_active_read_delta(&[], &[tool_record("call-1")]);
1564 graph.append_protocol_event(protocol_event());
1565 graph.append_plugin("example", serde_json::json!({"ok": true}));
1566
1567 for node in &graph.nodes {
1568 match node.event() {
1569 Some(SessionEventRecord::Conversation(_) | SessionEventRecord::Tool(_)) => {
1570 assert!(!node.node_id.starts_with("plugin:"), "{:?}", node);
1571 }
1572 Some(SessionEventRecord::Protocol(_)) => {
1573 assert!(node.node_id.starts_with("protocol:"), "{:?}", node);
1574 }
1575 None => {
1576 assert!(node.node_id.starts_with("plugin:"), "{:?}", node);
1577 }
1578 }
1579 }
1580 }
1581}