Skip to main content

lash_core/
session_graph.rs

1use std::collections::{HashMap, HashSet};
2use std::ops::Deref;
3use std::sync::{Arc, OnceLock};
4
5use chrono::Utc;
6use sha2::Digest;
7
8use crate::session_model::{ConversationRecord, ProtocolEvent, SessionEventRecord, ToolEvent};
9use crate::{BaseRenderCache, Message, MessageRole, PromptUsage, TokenUsage, ToolCallRecord};
10
11#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
12pub struct SessionGraphData {
13    #[serde(default)]
14    pub nodes: Vec<SessionNodeRecord>,
15    #[serde(default, skip_serializing_if = "Option::is_none")]
16    pub leaf_node_id: Option<String>,
17}
18
19#[derive(Debug)]
20pub struct SessionGraph {
21    inner: Arc<SessionGraphData>,
22    cache: Arc<OnceLock<SessionGraphCache>>,
23}
24
25impl Default for SessionGraph {
26    fn default() -> Self {
27        Self {
28            inner: Arc::new(SessionGraphData::default()),
29            cache: Arc::new(OnceLock::new()),
30        }
31    }
32}
33
34impl Clone for SessionGraph {
35    fn clone(&self) -> Self {
36        Self {
37            inner: Arc::clone(&self.inner),
38            cache: Arc::clone(&self.cache),
39        }
40    }
41}
42
43impl serde::Serialize for SessionGraph {
44    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
45    where
46        S: serde::Serializer,
47    {
48        self.inner.serialize(serializer)
49    }
50}
51
52impl<'de> serde::Deserialize<'de> for SessionGraph {
53    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
54    where
55        D: serde::Deserializer<'de>,
56    {
57        let inner = SessionGraphData::deserialize(deserializer)?;
58        Ok(Self {
59            inner: Arc::new(inner),
60            cache: Arc::new(OnceLock::new()),
61        })
62    }
63}
64
65impl Deref for SessionGraph {
66    type Target = SessionGraphData;
67
68    fn deref(&self) -> &Self::Target {
69        self.inner.as_ref()
70    }
71}
72
73#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
74pub struct SessionNodeRecord {
75    pub node_id: String,
76    #[serde(default, skip_serializing_if = "Option::is_none")]
77    pub parent_node_id: Option<String>,
78    #[serde(default, skip_serializing_if = "Option::is_none")]
79    pub caused_by: Option<crate::CausalRef>,
80    #[serde(default, skip_serializing_if = "Option::is_none")]
81    pub agent_frame_id: Option<crate::AgentFrameId>,
82    pub timestamp: String,
83    #[serde(flatten)]
84    pub payload: SessionNodePayload,
85}
86
87#[derive(Clone, Debug)]
88pub(crate) struct SessionNodeDraft {
89    payload: SessionNodeDraftPayload,
90    caused_by: Option<crate::CausalRef>,
91}
92
93#[derive(Clone, Debug)]
94enum SessionNodeDraftPayload {
95    Message(Message),
96    Plugin {
97        plugin_type: String,
98        body: serde_json::Value,
99    },
100    ProtocolEvent(ProtocolEvent),
101}
102
103impl SessionNodeDraft {
104    pub(crate) fn message(message: Message) -> Self {
105        Self {
106            payload: SessionNodeDraftPayload::Message(message),
107            caused_by: None,
108        }
109    }
110
111    pub(crate) fn plugin(plugin_type: impl Into<String>, body: serde_json::Value) -> Self {
112        Self {
113            payload: SessionNodeDraftPayload::Plugin {
114                plugin_type: plugin_type.into(),
115                body,
116            },
117            caused_by: None,
118        }
119    }
120
121    pub(crate) fn protocol_event(event: ProtocolEvent) -> Self {
122        Self {
123            payload: SessionNodeDraftPayload::ProtocolEvent(event),
124            caused_by: None,
125        }
126    }
127
128    pub(crate) fn with_caused_by(mut self, caused_by: Option<crate::CausalRef>) -> Self {
129        self.caused_by = caused_by;
130        self
131    }
132}
133
134#[derive(Clone, Debug, Default, PartialEq)]
135pub struct SharedJsonValue(pub Arc<serde_json::Value>);
136
137impl SharedJsonValue {
138    pub fn new(value: serde_json::Value) -> Self {
139        Self(Arc::new(value))
140    }
141
142    pub fn to_owned(&self) -> serde_json::Value {
143        self.0.as_ref().clone()
144    }
145}
146
147impl AsRef<serde_json::Value> for SharedJsonValue {
148    fn as_ref(&self) -> &serde_json::Value {
149        self.0.as_ref()
150    }
151}
152
153impl serde::Serialize for SharedJsonValue {
154    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
155    where
156        S: serde::Serializer,
157    {
158        self.0.serialize(serializer)
159    }
160}
161
162impl<'de> serde::Deserialize<'de> for SharedJsonValue {
163    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
164    where
165        D: serde::Deserializer<'de>,
166    {
167        let value = serde_json::Value::deserialize(deserializer)?;
168        Ok(Self::new(value))
169    }
170}
171
172#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
173#[serde(tag = "kind", rename_all = "snake_case")]
174#[allow(clippy::large_enum_variant)]
175pub enum SessionNodePayload {
176    Event {
177        event: SessionEventRecord,
178    },
179    Plugin {
180        plugin_type: String,
181        body: SharedJsonValue,
182    },
183}
184
185#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
186pub struct PersistedSessionConfig {
187    pub provider_id: String,
188    pub model: crate::ModelSpec,
189}
190
191#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
192pub struct PersistedTurnState {
193    pub turn_index: usize,
194    #[serde(default)]
195    pub token_usage: TokenUsage,
196    #[serde(default, skip_serializing_if = "Option::is_none")]
197    pub last_prompt_usage: Option<PromptUsage>,
198    #[serde(default)]
199    pub protocol_turn_options: crate::ProtocolTurnOptions,
200}
201
202#[derive(Clone, Debug)]
203pub struct SessionMessageTreeNode {
204    pub node_id: String,
205    pub parent_message_node_id: Option<String>,
206    pub message: Message,
207    pub timestamp: String,
208    pub children: Vec<SessionMessageTreeNode>,
209    pub active: bool,
210}
211
212#[derive(Clone)]
213enum ActiveReadItem<'a> {
214    Message(&'a Message),
215    ToolCall {
216        stable_key: String,
217        record: &'a ToolCallRecord,
218    },
219}
220
221#[derive(Debug)]
222pub(crate) struct ActiveReadReplacement {
223    pub(crate) leaf_node_id: Option<String>,
224    pub(crate) new_tail_nodes: Vec<SessionNodeRecord>,
225    pub(crate) active_events: Vec<SessionEventRecord>,
226    pub(crate) active_messages: Vec<Message>,
227    pub(crate) active_tool_calls: Vec<ToolCallRecord>,
228}
229
230#[derive(Clone, Debug)]
231pub(crate) struct SessionReadModel {
232    pub(crate) active_events: Arc<Vec<SessionEventRecord>>,
233    pub(crate) messages: Arc<Vec<Message>>,
234    pub(crate) tool_calls: Arc<Vec<ToolCallRecord>>,
235    pub(crate) prompt_render_cache: Arc<BaseRenderCache>,
236}
237
238#[derive(Clone, Debug)]
239pub(crate) struct SessionGraphAppendBuilder {
240    existing_ids: HashSet<String>,
241    leaf_node_id: Option<String>,
242    agent_frame_id: Option<crate::AgentFrameId>,
243}
244
245impl SessionGraphAppendBuilder {
246    pub(crate) fn with_agent_frame_id(
247        mut self,
248        agent_frame_id: impl Into<crate::AgentFrameId>,
249    ) -> Self {
250        self.agent_frame_id = Some(agent_frame_id.into());
251        self
252    }
253
254    pub(crate) fn agent_frame_id(&self) -> Option<&str> {
255        self.agent_frame_id.as_deref()
256    }
257
258    pub(crate) fn leaf_node_id(&self) -> Option<&String> {
259        self.leaf_node_id.as_ref()
260    }
261
262    pub(crate) fn set_leaf_node_id(&mut self, leaf_node_id: Option<String>) {
263        self.leaf_node_id = leaf_node_id;
264    }
265
266    pub(crate) fn register_existing_node_ids<'a>(
267        &mut self,
268        node_ids: impl IntoIterator<Item = &'a str>,
269    ) {
270        self.existing_ids
271            .extend(node_ids.into_iter().map(ToOwned::to_owned));
272    }
273
274    pub(crate) fn existing_node_ids(&self) -> &HashSet<String> {
275        &self.existing_ids
276    }
277
278    pub(crate) fn append_messages<I>(&mut self, messages: I) -> Vec<SessionNodeRecord>
279    where
280        I: IntoIterator<Item = Message>,
281    {
282        self.append_drafts(messages.into_iter().map(SessionNodeDraft::message))
283    }
284
285    pub(crate) fn append_tool_call_records<I>(&mut self, records: I) -> Vec<SessionNodeRecord>
286    where
287        I: IntoIterator<Item = ToolCallRecord>,
288    {
289        let mut nodes = Vec::new();
290        for record in records {
291            let stable_key = stable_tool_call_key(&record);
292            let node_id = unique_tool_node_id(&stable_key, &self.existing_ids);
293            self.existing_ids.insert(node_id.clone());
294            let parent_node_id = self.leaf_node_id.clone();
295            self.leaf_node_id = Some(node_id.clone());
296            nodes.push(SessionNodeRecord {
297                node_id,
298                parent_node_id,
299                caused_by: None,
300                agent_frame_id: self.agent_frame_id.clone(),
301                timestamp: Utc::now().to_rfc3339(),
302                payload: SessionNodePayload::Event {
303                    event: SessionEventRecord::Tool(ToolEvent::Invocation { stable_key, record }),
304                },
305            });
306        }
307        nodes
308    }
309
310    pub(crate) fn append_protocol_events<I>(&mut self, events: I) -> Vec<SessionNodeRecord>
311    where
312        I: IntoIterator<Item = ProtocolEvent>,
313    {
314        self.append_drafts(events.into_iter().map(SessionNodeDraft::protocol_event))
315    }
316
317    pub(crate) fn append_drafts<I>(&mut self, drafts: I) -> Vec<SessionNodeRecord>
318    where
319        I: IntoIterator<Item = SessionNodeDraft>,
320    {
321        let mut nodes = Vec::new();
322        for draft in drafts {
323            let parent_node_id = self.leaf_node_id.clone();
324            let (node_id, caused_by, payload) = match draft.payload {
325                SessionNodeDraftPayload::Message(mut message) => {
326                    if message.id.is_empty() {
327                        message.id = fresh_node_id("m");
328                    }
329                    let node_id = unique_message_node_id(&message.id, &self.existing_ids);
330                    let caused_by = draft
331                        .caused_by
332                        .or_else(|| causal_ref_from_message_origin(&message.origin));
333                    (
334                        node_id,
335                        caused_by,
336                        SessionNodePayload::Event {
337                            event: SessionEventRecord::Conversation(
338                                ConversationRecord::from_message(message),
339                            ),
340                        },
341                    )
342                }
343                SessionNodeDraftPayload::Plugin { plugin_type, body } => {
344                    let node_id = fresh_semantic_node_id("plugin", &self.existing_ids);
345                    (
346                        node_id,
347                        draft.caused_by,
348                        SessionNodePayload::Plugin {
349                            plugin_type,
350                            body: SharedJsonValue::new(body),
351                        },
352                    )
353                }
354                SessionNodeDraftPayload::ProtocolEvent(event) => {
355                    let node_id = fresh_semantic_node_id("protocol", &self.existing_ids);
356                    (
357                        node_id,
358                        draft.caused_by,
359                        SessionNodePayload::Event {
360                            event: SessionEventRecord::Protocol(event),
361                        },
362                    )
363                }
364            };
365            self.existing_ids.insert(node_id.clone());
366            self.leaf_node_id = Some(node_id.clone());
367            nodes.push(SessionNodeRecord {
368                node_id,
369                parent_node_id,
370                caused_by,
371                agent_frame_id: self.agent_frame_id.clone(),
372                timestamp: Utc::now().to_rfc3339(),
373                payload,
374            });
375        }
376        nodes
377    }
378}
379
380#[derive(Debug, Clone)]
381struct SessionGraphCache {
382    by_id: HashMap<String, usize>,
383    active_path_indices: Vec<usize>,
384    active_events: Arc<Vec<SessionEventRecord>>,
385    active_messages: Arc<Vec<Message>>,
386    /// Index from `Message::id` to its position in `active_messages`,
387    /// kept in sync with the vec so dedup on append is O(1) instead of an
388    /// O(n) linear scan (which made long sessions quadratic in message
389    /// count).
390    active_message_ids: HashMap<String, usize>,
391    active_tool_calls: Arc<Vec<ToolCallRecord>>,
392    /// Memoized render of `active_messages`. Shared with every
393    /// `MessageSequence` built off this read model so the chat projector's
394    /// per-iteration `render_prompt` walk only happens once per turn.
395    /// Replaced (not invalidated in-place) whenever `active_messages`
396    /// changes — the `Arc` identity tracks the cache's validity.
397    prompt_render_cache: Arc<BaseRenderCache>,
398}
399
400impl SessionGraphCache {
401    fn build(graph: &SessionGraph) -> Self {
402        let by_id = graph
403            .nodes
404            .iter()
405            .enumerate()
406            .map(|(idx, node)| (node.node_id.clone(), idx))
407            .collect::<HashMap<_, _>>();
408        let mut active_path_indices = Vec::new();
409        let mut current = graph
410            .leaf_node_id
411            .as_ref()
412            .and_then(|node_id| by_id.get(node_id).copied());
413        while let Some(idx) = current {
414            active_path_indices.push(idx);
415            current = graph.nodes[idx]
416                .parent_node_id
417                .as_ref()
418                .and_then(|node_id| by_id.get(node_id).copied());
419        }
420        active_path_indices.reverse();
421
422        let mut cache = Self {
423            by_id,
424            active_path_indices,
425            active_events: Arc::new(Vec::new()),
426            active_messages: Arc::new(Vec::new()),
427            active_message_ids: HashMap::new(),
428            active_tool_calls: Arc::new(Vec::new()),
429            prompt_render_cache: Arc::new(BaseRenderCache::new()),
430        };
431        cache.rebuild_read_model(graph);
432        cache
433    }
434
435    fn rebuild_read_model(&mut self, graph: &SessionGraph) {
436        let mut active_messages = Vec::with_capacity(self.active_path_indices.len());
437        let mut active_message_ids: HashMap<String, usize> =
438            HashMap::with_capacity(self.active_path_indices.len());
439        let mut active_tool_calls = Vec::with_capacity(self.active_path_indices.len());
440        let mut active_events = Vec::with_capacity(self.active_path_indices.len());
441        for idx in &self.active_path_indices {
442            let node = &graph.nodes[*idx];
443            if let Some(event) = node.event() {
444                active_events.push(event.clone());
445            }
446            if let Some(message) = node.message() {
447                if !message.is_transient() && !active_message_ids.contains_key(&message.id) {
448                    active_message_ids.insert(message.id.clone(), active_messages.len());
449                    active_messages.push(message);
450                }
451                continue;
452            }
453            if let Some(event) = node.event()
454                && let SessionEventRecord::Tool(ToolEvent::Invocation { record, .. }) = event
455            {
456                active_tool_calls.push(record.clone());
457                continue;
458            }
459        }
460        self.active_messages = Arc::new(active_messages);
461        self.active_message_ids = active_message_ids;
462        self.active_events = Arc::new(active_events);
463        self.active_tool_calls = Arc::new(active_tool_calls);
464        self.prompt_render_cache = Arc::new(BaseRenderCache::new());
465    }
466
467    fn read_model_for_agent_frame(
468        &self,
469        graph: &SessionGraph,
470        frame_id: &str,
471        include_unscoped: bool,
472    ) -> SessionReadModel {
473        let mut active_messages = Vec::with_capacity(self.active_path_indices.len());
474        let mut active_message_ids = HashSet::new();
475        let mut active_tool_calls = Vec::with_capacity(self.active_path_indices.len());
476        let mut active_events = Vec::with_capacity(self.active_path_indices.len());
477        for idx in &self.active_path_indices {
478            let node = &graph.nodes[*idx];
479            if !node_belongs_to_agent_frame(node, frame_id, include_unscoped) {
480                continue;
481            }
482            if let Some(event) = node.event() {
483                active_events.push(event.clone());
484            }
485            if let Some(message) = node.message() {
486                if !message.is_transient() && active_message_ids.insert(message.id.clone()) {
487                    active_messages.push(message);
488                }
489                continue;
490            }
491            if let Some(event) = node.event()
492                && let SessionEventRecord::Tool(ToolEvent::Invocation { record, .. }) = event
493            {
494                active_tool_calls.push(record.clone());
495            }
496        }
497        SessionReadModel {
498            active_events: Arc::new(active_events),
499            messages: Arc::new(active_messages),
500            tool_calls: Arc::new(active_tool_calls),
501            prompt_render_cache: Arc::new(BaseRenderCache::new()),
502        }
503    }
504
505    fn append_node(
506        &mut self,
507        node_index: usize,
508        node: &SessionNodeRecord,
509        previous_leaf_node_id: Option<&str>,
510    ) {
511        self.by_id.insert(node.node_id.clone(), node_index);
512        let parent_matches_leaf = node.parent_node_id.as_deref() == previous_leaf_node_id;
513        if !parent_matches_leaf {
514            return;
515        }
516        self.active_path_indices.push(node_index);
517        if let Some(event) = node.event() {
518            Arc::make_mut(&mut self.active_events).push(event.clone());
519        }
520        if let Some(message) = node.message() {
521            if !message.is_transient() && !self.active_message_ids.contains_key(&message.id) {
522                let messages = Arc::make_mut(&mut self.active_messages);
523                self.active_message_ids
524                    .insert(message.id.clone(), messages.len());
525                messages.push(message);
526                self.prompt_render_cache = Arc::new(BaseRenderCache::new());
527            }
528            return;
529        }
530        if let Some(event) = node.event()
531            && let SessionEventRecord::Tool(ToolEvent::Invocation { record, .. }) = event
532        {
533            Arc::make_mut(&mut self.active_tool_calls).push(record.clone());
534        }
535    }
536
537    fn reserve_append_capacity(
538        &mut self,
539        additional_nodes: usize,
540        additional_messages: usize,
541        additional_tool_calls: usize,
542    ) {
543        self.by_id.reserve(additional_nodes);
544        self.active_path_indices.reserve(additional_nodes);
545        if additional_messages > 0 {
546            Arc::make_mut(&mut self.active_messages).reserve(additional_messages);
547        }
548        if additional_tool_calls > 0 {
549            Arc::make_mut(&mut self.active_tool_calls).reserve(additional_tool_calls);
550        }
551    }
552}
553
554impl SessionNodeRecord {
555    pub fn event(&self) -> Option<&SessionEventRecord> {
556        match &self.payload {
557            SessionNodePayload::Event { event } => Some(event),
558            SessionNodePayload::Plugin { .. } => None,
559        }
560    }
561
562    pub fn message(&self) -> Option<Message> {
563        match self.event()? {
564            SessionEventRecord::Conversation(record) => Some(record.to_message()),
565            _ => None,
566        }
567    }
568
569    pub fn plugin(&self) -> Option<(&str, &serde_json::Value)> {
570        match &self.payload {
571            SessionNodePayload::Event { .. } => None,
572            SessionNodePayload::Plugin { plugin_type, body } => {
573                Some((plugin_type.as_str(), body.as_ref()))
574            }
575        }
576    }
577
578    pub fn plugin_body<T>(&self) -> Option<T>
579    where
580        T: for<'de> serde::Deserialize<'de>,
581    {
582        let (_, body) = self.plugin()?;
583        T::deserialize(body).ok()
584    }
585}
586
587impl SessionGraph {
588    pub fn append_active_read_delta(
589        &mut self,
590        messages: &[Message],
591        tool_calls: &[ToolCallRecord],
592    ) {
593        self.append_active_read_delta_scoped(None, messages, tool_calls);
594    }
595
596    pub fn append_active_read_delta_for_agent_frame(
597        &mut self,
598        agent_frame_id: &str,
599        messages: &[Message],
600        tool_calls: &[ToolCallRecord],
601    ) {
602        self.append_active_read_delta_scoped(Some(agent_frame_id), messages, tool_calls);
603    }
604
605    fn append_active_read_delta_scoped(
606        &mut self,
607        agent_frame_id: Option<&str>,
608        messages: &[Message],
609        tool_calls: &[ToolCallRecord],
610    ) {
611        let appendable_messages = {
612            let read_model = agent_frame_id
613                .map(|frame_id| self.read_model_for_agent_frame(frame_id, false))
614                .unwrap_or_else(|| self.read_model());
615            let mut seen_message_ids = read_model
616                .messages
617                .iter()
618                .map(|message| message.id.as_str())
619                .collect::<HashSet<_>>();
620            messages
621                .iter()
622                .filter(|message| {
623                    !message.is_transient() && seen_message_ids.insert(message.id.as_str())
624                })
625                .cloned()
626                .collect::<Vec<_>>()
627        };
628        let read_model = agent_frame_id
629            .map(|frame_id| self.read_model_for_agent_frame(frame_id, false))
630            .unwrap_or_else(|| self.read_model());
631        let mut seen_tool_call_keys = read_model
632            .tool_calls
633            .iter()
634            .map(|record| tool_call_active_read_key(&stable_tool_call_key(record), record))
635            .collect::<HashSet<_>>();
636        let appendable_tool_calls = tool_calls
637            .iter()
638            .filter_map(|record| {
639                let stable_key = stable_tool_call_key(record);
640                let active_read_key = tool_call_active_read_key(&stable_key, record);
641                seen_tool_call_keys
642                    .insert(active_read_key)
643                    .then_some(record.clone())
644            })
645            .collect::<Vec<_>>();
646
647        self.reserve_append_capacity(
648            appendable_messages.len() + appendable_tool_calls.len(),
649            appendable_messages.len(),
650            appendable_tool_calls.len(),
651        );
652        self.append_message_batch_scoped(agent_frame_id, appendable_messages);
653        self.append_tool_call_records_scoped(agent_frame_id, appendable_tool_calls);
654    }
655
656    pub(crate) fn append_active_conversation_messages_for_agent_frame(
657        &mut self,
658        agent_frame_id: &str,
659        messages: &[Message],
660    ) {
661        self.append_active_conversation_messages_scoped(Some(agent_frame_id), messages);
662    }
663
664    fn append_active_conversation_messages_scoped(
665        &mut self,
666        agent_frame_id: Option<&str>,
667        messages: &[Message],
668    ) {
669        let appendable_messages = messages
670            .iter()
671            .filter(|message| !message.is_transient())
672            .cloned()
673            .collect::<Vec<_>>();
674        self.reserve_append_capacity(appendable_messages.len(), appendable_messages.len(), 0);
675        self.append_message_batch_scoped(agent_frame_id, appendable_messages);
676    }
677
678    pub fn from_nodes(nodes: Vec<SessionNodeRecord>, leaf_node_id: Option<String>) -> Self {
679        Self {
680            inner: Arc::new(SessionGraphData {
681                nodes,
682                leaf_node_id,
683            }),
684            cache: Arc::new(OnceLock::new()),
685        }
686    }
687
688    pub(crate) fn append_builder(&self) -> SessionGraphAppendBuilder {
689        SessionGraphAppendBuilder {
690            existing_ids: self.nodes.iter().map(|node| node.node_id.clone()).collect(),
691            leaf_node_id: self.leaf_node_id.clone(),
692            agent_frame_id: None,
693        }
694    }
695
696    fn invalidate_cache(&mut self) {
697        self.cache = Arc::new(OnceLock::new());
698    }
699
700    fn data_mut(&mut self) -> &mut SessionGraphData {
701        self.invalidate_cache();
702        Arc::make_mut(&mut self.inner)
703    }
704
705    fn reserve_append_capacity(
706        &mut self,
707        additional_nodes: usize,
708        additional_messages: usize,
709        additional_tool_calls: usize,
710    ) {
711        if additional_nodes == 0 {
712            return;
713        }
714        self.detach_initialized_cache_for_append();
715        Arc::make_mut(&mut self.inner)
716            .nodes
717            .reserve(additional_nodes);
718        if let Some(cache_lock) = Arc::get_mut(&mut self.cache)
719            && let Some(cache) = cache_lock.get_mut()
720        {
721            cache.reserve_append_capacity(
722                additional_nodes,
723                additional_messages,
724                additional_tool_calls,
725            );
726        }
727    }
728
729    fn detach_initialized_cache_for_append(&mut self) {
730        if Arc::get_mut(&mut self.cache).is_some() {
731            return;
732        }
733        let Some(cache) = self.cache.get().cloned() else {
734            self.invalidate_cache();
735            return;
736        };
737        let lock = OnceLock::new();
738        let _ = lock.set(cache);
739        self.cache = Arc::new(lock);
740    }
741
742    fn cache(&self) -> &SessionGraphCache {
743        self.cache.get_or_init(|| SessionGraphCache::build(self))
744    }
745
746    fn append_message_batch_scoped(
747        &mut self,
748        agent_frame_id: Option<&str>,
749        messages: Vec<Message>,
750    ) {
751        if messages.is_empty() {
752            return;
753        }
754        self.append_node_drafts_scoped(
755            agent_frame_id,
756            messages.into_iter().map(SessionNodeDraft::message),
757        );
758    }
759
760    fn append_prebuilt_nodes(&mut self, nodes: Vec<SessionNodeRecord>) {
761        if nodes.is_empty() {
762            return;
763        }
764
765        self.detach_initialized_cache_for_append();
766        if let Some(cache_lock) = Arc::get_mut(&mut self.cache)
767            && let Some(cache) = cache_lock.get_mut()
768        {
769            let data = Arc::make_mut(&mut self.inner);
770            for node in nodes {
771                let previous_leaf = data.leaf_node_id.clone();
772                let node_id = node.node_id.clone();
773                data.nodes.push(node);
774                cache.append_node(
775                    data.nodes.len() - 1,
776                    data.nodes.last().expect("just appended graph node"),
777                    previous_leaf.as_deref(),
778                );
779                data.leaf_node_id = Some(node_id);
780            }
781            return;
782        }
783
784        let data = self.data_mut();
785        for node in nodes {
786            data.leaf_node_id = Some(node.node_id.clone());
787            data.nodes.push(node);
788        }
789    }
790
791    pub fn append_message(&mut self, message: Message) -> String {
792        self.append_node_draft(SessionNodeDraft::message(message))
793    }
794
795    pub fn append_plugin(
796        &mut self,
797        plugin_type: impl Into<String>,
798        body: serde_json::Value,
799    ) -> String {
800        self.append_node_draft(SessionNodeDraft::plugin(plugin_type, body))
801    }
802
803    pub fn active_path_nodes(&self) -> Vec<&SessionNodeRecord> {
804        self.cache()
805            .active_path_indices
806            .iter()
807            .map(|idx| &self.nodes[*idx])
808            .collect()
809    }
810
811    pub(crate) fn read_model(&self) -> SessionReadModel {
812        let cache = self.cache();
813        SessionReadModel {
814            active_events: Arc::clone(&cache.active_events),
815            messages: Arc::clone(&cache.active_messages),
816            tool_calls: Arc::clone(&cache.active_tool_calls),
817            prompt_render_cache: Arc::clone(&cache.prompt_render_cache),
818        }
819    }
820
821    pub(crate) fn read_model_for_agent_frame(
822        &self,
823        frame_id: &str,
824        include_unscoped: bool,
825    ) -> SessionReadModel {
826        if frame_id.is_empty() {
827            return self.read_model();
828        }
829        self.cache()
830            .read_model_for_agent_frame(self, frame_id, include_unscoped)
831    }
832
833    pub fn replace_active_tool_calls(&mut self, tool_calls: &[ToolCallRecord]) {
834        let messages = Arc::clone(&self.cache().active_messages);
835        self.replace_active_read_state(messages.as_slice(), tool_calls);
836    }
837
838    pub fn append_protocol_event(&mut self, event: ProtocolEvent) -> String {
839        self.append_node_draft(SessionNodeDraft::protocol_event(event))
840    }
841
842    pub(crate) fn append_node_draft(&mut self, draft: SessionNodeDraft) -> String {
843        self.append_node_drafts([draft])
844            .into_iter()
845            .next()
846            .expect("single draft append must create one node")
847    }
848
849    pub(crate) fn append_node_drafts<I>(&mut self, drafts: I) -> Vec<String>
850    where
851        I: IntoIterator<Item = SessionNodeDraft>,
852    {
853        self.append_node_drafts_scoped(None, drafts)
854    }
855
856    pub(crate) fn append_node_drafts_for_agent_frame<I>(
857        &mut self,
858        agent_frame_id: &str,
859        drafts: I,
860    ) -> Vec<String>
861    where
862        I: IntoIterator<Item = SessionNodeDraft>,
863    {
864        self.append_node_drafts_scoped(Some(agent_frame_id), drafts)
865    }
866
867    fn append_node_drafts_scoped<I>(
868        &mut self,
869        agent_frame_id: Option<&str>,
870        drafts: I,
871    ) -> Vec<String>
872    where
873        I: IntoIterator<Item = SessionNodeDraft>,
874    {
875        let mut builder = self.append_builder();
876        if let Some(agent_frame_id) = agent_frame_id {
877            builder = builder.with_agent_frame_id(agent_frame_id.to_string());
878        }
879        let nodes = builder.append_drafts(drafts);
880        let node_ids = nodes
881            .iter()
882            .map(|node| node.node_id.clone())
883            .collect::<Vec<_>>();
884        self.append_prebuilt_nodes(nodes);
885        node_ids
886    }
887
888    fn append_tool_call_records_scoped<I>(
889        &mut self,
890        agent_frame_id: Option<&str>,
891        records: I,
892    ) -> Vec<String>
893    where
894        I: IntoIterator<Item = ToolCallRecord>,
895    {
896        let mut builder = self.append_builder();
897        if let Some(agent_frame_id) = agent_frame_id {
898            builder = builder.with_agent_frame_id(agent_frame_id.to_string());
899        }
900        let nodes = builder.append_tool_call_records(records);
901        let node_ids = nodes
902            .iter()
903            .map(|node| node.node_id.clone())
904            .collect::<Vec<_>>();
905        self.append_prebuilt_nodes(nodes);
906        node_ids
907    }
908
909    pub fn user_message_count(&self) -> usize {
910        self.nodes
911            .iter()
912            .filter_map(SessionNodeRecord::message)
913            .filter(|message| matches!(message.role, MessageRole::User))
914            .count()
915    }
916
917    pub fn first_user_message(&self) -> String {
918        self.nodes
919            .iter()
920            .filter_map(SessionNodeRecord::message)
921            .find(|message| matches!(message.role, MessageRole::User))
922            .map(|message| first_message_search_text(&message))
923            .unwrap_or_default()
924    }
925
926    pub fn branch_to(&mut self, node_id: Option<String>) {
927        self.data_mut().leaf_node_id = node_id;
928    }
929
930    pub fn set_leaf_node_id(&mut self, node_id: Option<String>) {
931        self.data_mut().leaf_node_id = node_id;
932    }
933
934    pub fn push_node_record(&mut self, node: SessionNodeRecord) {
935        self.data_mut().nodes.push(node);
936    }
937
938    pub fn extend_node_records<I>(&mut self, nodes: I)
939    where
940        I: IntoIterator<Item = SessionNodeRecord>,
941    {
942        self.data_mut().nodes.extend(nodes);
943    }
944
945    /// Append nodes that extend the current active path, advancing the
946    /// leaf to the last node and updating the cache incrementally
947    /// instead of invalidating it. Use this when the appended nodes are
948    /// genuinely new descendants of the current leaf — e.g. the
949    /// turn-driver merging turn-local graph editor deltas into the base graph.
950    /// Use `extend_node_records` + `set_leaf_node_id` for store-side
951    /// replay paths that don't follow the active-path append shape.
952    pub fn extend_active_path(&mut self, nodes: Vec<SessionNodeRecord>) {
953        self.append_prebuilt_nodes(nodes);
954    }
955
956    pub fn active_path_contains(&self, node_id: &str) -> bool {
957        self.active_path_nodes()
958            .into_iter()
959            .any(|node| node.node_id == node_id)
960    }
961
962    /// If `leaf_node_id` points to a node that no longer exists in
963    /// `self.nodes` (e.g. after compaction rewrote the graph, or a
964    /// stored session referenced a node that was later purged), fall
965    /// back to the most recent message node. Returns `true` if the
966    /// leaf was repaired. Call this on load paths where an orphan
967    /// leaf would project to an empty transcript and silently drop
968    /// the user's history.
969    pub fn heal_orphaned_leaf(&mut self) -> bool {
970        if let Some(leaf) = self.leaf_node_id.as_ref()
971            && self.find_node(leaf).is_none()
972        {
973            let fallback = self
974                .nodes
975                .iter()
976                .rev()
977                .find(|node| node.message().is_some())
978                .map(|node| node.node_id.clone());
979            self.data_mut().leaf_node_id = fallback;
980            return true;
981        }
982        false
983    }
984
985    pub fn fork_current_path(&self) -> SessionGraph {
986        let path = self.active_path_nodes();
987        SessionGraph::from_nodes(
988            path.into_iter().cloned().collect(),
989            self.leaf_node_id.clone(),
990        )
991    }
992
993    pub fn find_node(&self, node_id: &str) -> Option<&SessionNodeRecord> {
994        self.cache()
995            .by_id
996            .get(node_id)
997            .and_then(|idx| self.nodes.get(*idx))
998    }
999
1000    pub fn node_index(&self, node_id: &str) -> Option<usize> {
1001        self.cache().by_id.get(node_id).copied()
1002    }
1003
1004    pub fn replace_active_read_state(
1005        &mut self,
1006        messages: &[Message],
1007        tool_calls: &[ToolCallRecord],
1008    ) {
1009        self.replace_active_read_state_scoped(None, messages, tool_calls);
1010    }
1011
1012    pub fn replace_active_read_state_for_agent_frame(
1013        &mut self,
1014        agent_frame_id: &str,
1015        messages: &[Message],
1016        tool_calls: &[ToolCallRecord],
1017    ) {
1018        self.replace_active_read_state_scoped(Some(agent_frame_id), messages, tool_calls);
1019    }
1020
1021    fn replace_active_read_state_scoped(
1022        &mut self,
1023        agent_frame_id: Option<&str>,
1024        messages: &[Message],
1025        tool_calls: &[ToolCallRecord],
1026    ) {
1027        let current_nodes = self.active_path_nodes();
1028        let existing_ids = self
1029            .nodes
1030            .iter()
1031            .map(|node| node.node_id.clone())
1032            .collect::<HashSet<_>>();
1033        let replacement = build_active_read_replacement(
1034            current_nodes,
1035            &existing_ids,
1036            agent_frame_id,
1037            messages,
1038            tool_calls,
1039        );
1040        let data = self.data_mut();
1041        data.leaf_node_id = replacement.leaf_node_id;
1042        data.nodes.extend(replacement.new_tail_nodes);
1043    }
1044
1045    pub fn from_active_read_state(messages: &[Message], tool_calls: &[ToolCallRecord]) -> Self {
1046        let mut graph = Self::default();
1047        graph.replace_active_read_state(messages, tool_calls);
1048        graph
1049    }
1050
1051    pub fn message_tree(&self) -> Vec<SessionMessageTreeNode> {
1052        let active_message_ids = self
1053            .active_path_nodes()
1054            .into_iter()
1055            .filter_map(|node| node.message().map(|message| message.id.clone()))
1056            .collect::<HashSet<_>>();
1057
1058        let message_nodes = self
1059            .nodes
1060            .iter()
1061            .filter_map(|node| {
1062                let message = node.message()?.clone();
1063                let parent_message_node_id =
1064                    self.nearest_message_ancestor(node.parent_node_id.as_deref());
1065                Some(SessionMessageTreeNode {
1066                    node_id: node.node_id.clone(),
1067                    parent_message_node_id,
1068                    message,
1069                    timestamp: node.timestamp.clone(),
1070                    children: Vec::new(),
1071                    active: active_message_ids.contains(&node.node_id),
1072                })
1073            })
1074            .collect::<Vec<_>>();
1075
1076        build_tree(message_nodes)
1077    }
1078
1079    fn nearest_message_ancestor(&self, node_id: Option<&str>) -> Option<String> {
1080        let by_id = self
1081            .nodes
1082            .iter()
1083            .map(|node| (node.node_id.as_str(), node))
1084            .collect::<HashMap<_, _>>();
1085        let mut current = node_id.and_then(|id| by_id.get(id).copied());
1086        while let Some(node) = current {
1087            if node.message().is_some() {
1088                return Some(node.node_id.clone());
1089            }
1090            current = node
1091                .parent_node_id
1092                .as_deref()
1093                .and_then(|parent| by_id.get(parent).copied());
1094        }
1095        None
1096    }
1097}
1098
1099fn build_tree(mut nodes: Vec<SessionMessageTreeNode>) -> Vec<SessionMessageTreeNode> {
1100    let mut children_by_parent = HashMap::<Option<String>, Vec<SessionMessageTreeNode>>::new();
1101    for node in nodes.drain(..) {
1102        children_by_parent
1103            .entry(node.parent_message_node_id.clone())
1104            .or_default()
1105            .push(node);
1106    }
1107    let mut roots = build_tree_children(None, &mut children_by_parent);
1108    sort_tree(&mut roots);
1109    roots
1110}
1111
1112fn sort_tree(nodes: &mut [SessionMessageTreeNode]) {
1113    nodes.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
1114    for node in nodes {
1115        sort_tree(&mut node.children);
1116    }
1117}
1118
1119fn build_tree_children(
1120    parent_id: Option<String>,
1121    children_by_parent: &mut HashMap<Option<String>, Vec<SessionMessageTreeNode>>,
1122) -> Vec<SessionMessageTreeNode> {
1123    let mut children = children_by_parent.remove(&parent_id).unwrap_or_default();
1124    for child in &mut children {
1125        child.children = build_tree_children(Some(child.node_id.clone()), children_by_parent);
1126    }
1127    children
1128}
1129
1130fn node_belongs_to_agent_frame(
1131    node: &SessionNodeRecord,
1132    frame_id: &str,
1133    include_unscoped: bool,
1134) -> bool {
1135    match node.agent_frame_id.as_deref() {
1136        Some(node_frame_id) => node_frame_id == frame_id,
1137        None => include_unscoped,
1138    }
1139}
1140
1141fn build_active_read_items<'a>(
1142    messages: &'a [Message],
1143    tool_calls: &'a [ToolCallRecord],
1144) -> Vec<ActiveReadItem<'a>> {
1145    let mut first_message_for_call = HashMap::<String, usize>::new();
1146    let active_messages = messages
1147        .iter()
1148        .filter(|message| !message.is_transient())
1149        .collect::<Vec<_>>();
1150    for (idx, message) in active_messages.iter().enumerate() {
1151        for part in message.parts.iter() {
1152            if let Some(call_id) = &part.tool_call_id {
1153                first_message_for_call.entry(call_id.clone()).or_insert(idx);
1154            }
1155        }
1156    }
1157
1158    let mut anchored = HashMap::<usize, Vec<ActiveReadItem<'a>>>::new();
1159    for record in tool_calls {
1160        let stable_key = stable_tool_call_key(record);
1161        let anchor = record
1162            .call_id
1163            .as_ref()
1164            .and_then(|call_id| first_message_for_call.get(call_id).copied())
1165            .unwrap_or_else(|| active_messages.len().saturating_sub(1));
1166        anchored
1167            .entry(anchor)
1168            .or_default()
1169            .push(ActiveReadItem::ToolCall { stable_key, record });
1170    }
1171
1172    let mut out = Vec::new();
1173    for (idx, message) in active_messages.iter().enumerate() {
1174        out.push(ActiveReadItem::Message(message));
1175        if let Some(items) = anchored.remove(&idx) {
1176            out.extend(items);
1177        }
1178    }
1179    out
1180}
1181
1182pub(crate) fn build_active_read_replacement<'a>(
1183    current_nodes: impl IntoIterator<Item = &'a SessionNodeRecord>,
1184    existing_node_ids: &HashSet<String>,
1185    agent_frame_id: Option<&str>,
1186    messages: &[Message],
1187    tool_calls: &[ToolCallRecord],
1188) -> ActiveReadReplacement {
1189    let target = build_active_read_items(messages, tool_calls);
1190
1191    let mut active_events = Vec::new();
1192    let mut active_messages = Vec::new();
1193    let mut active_message_ids = HashSet::new();
1194    let mut active_tool_calls = Vec::new();
1195    let mut seen_active_read_keys = HashSet::new();
1196    let mut target_idx = 0usize;
1197    let mut leaf_node_id = None;
1198    for node in current_nodes {
1199        if node
1200            .message()
1201            .map(|message| message.is_transient())
1202            .unwrap_or(false)
1203        {
1204            continue;
1205        }
1206        if let Some(key) = recognized_active_read_key(node) {
1207            if !seen_active_read_keys.insert(key.clone()) {
1208                continue;
1209            }
1210            let Some(target_item) = target.get(target_idx) else {
1211                break;
1212            };
1213            if key != active_read_item_key(target_item) {
1214                break;
1215            }
1216            push_active_read_node(
1217                node,
1218                &mut active_events,
1219                &mut active_messages,
1220                &mut active_message_ids,
1221                &mut active_tool_calls,
1222            );
1223            leaf_node_id = Some(node.node_id.clone());
1224            target_idx += 1;
1225        } else {
1226            push_active_read_node(
1227                node,
1228                &mut active_events,
1229                &mut active_messages,
1230                &mut active_message_ids,
1231                &mut active_tool_calls,
1232            );
1233            leaf_node_id = Some(node.node_id.clone());
1234        }
1235    }
1236
1237    let mut new_node_ids = HashSet::new();
1238    let mut new_tail_nodes = Vec::new();
1239
1240    for item in target.into_iter().skip(target_idx) {
1241        let parent_node_id = leaf_node_id.clone();
1242        let node = match item {
1243            ActiveReadItem::Message(message) => {
1244                let node_id = unique_message_node_id_for_replacement(
1245                    &message.id,
1246                    existing_node_ids,
1247                    &new_node_ids,
1248                );
1249                SessionNodeRecord {
1250                    node_id,
1251                    parent_node_id,
1252                    caused_by: causal_ref_from_message_origin(&message.origin),
1253                    agent_frame_id: agent_frame_id.map(ToOwned::to_owned),
1254                    timestamp: Utc::now().to_rfc3339(),
1255                    payload: SessionNodePayload::Event {
1256                        event: SessionEventRecord::Conversation(ConversationRecord::from_message(
1257                            message.clone(),
1258                        )),
1259                    },
1260                }
1261            }
1262            ActiveReadItem::ToolCall { stable_key, record } => {
1263                let node_id = unique_tool_node_id_for_replacement(
1264                    &stable_key,
1265                    existing_node_ids,
1266                    &new_node_ids,
1267                );
1268                SessionNodeRecord {
1269                    node_id,
1270                    parent_node_id,
1271                    caused_by: None,
1272                    agent_frame_id: agent_frame_id.map(ToOwned::to_owned),
1273                    timestamp: Utc::now().to_rfc3339(),
1274                    payload: SessionNodePayload::Event {
1275                        event: SessionEventRecord::Tool(ToolEvent::Invocation {
1276                            stable_key,
1277                            record: record.clone(),
1278                        }),
1279                    },
1280                }
1281            }
1282        };
1283        new_node_ids.insert(node.node_id.clone());
1284        leaf_node_id = Some(node.node_id.clone());
1285        push_active_read_node(
1286            &node,
1287            &mut active_events,
1288            &mut active_messages,
1289            &mut active_message_ids,
1290            &mut active_tool_calls,
1291        );
1292        new_tail_nodes.push(node);
1293    }
1294
1295    ActiveReadReplacement {
1296        leaf_node_id,
1297        new_tail_nodes,
1298        active_events,
1299        active_messages,
1300        active_tool_calls,
1301    }
1302}
1303
1304fn push_active_read_node(
1305    node: &SessionNodeRecord,
1306    active_events: &mut Vec<SessionEventRecord>,
1307    active_messages: &mut Vec<Message>,
1308    active_message_ids: &mut HashSet<String>,
1309    active_tool_calls: &mut Vec<ToolCallRecord>,
1310) {
1311    if let Some(event) = node.event() {
1312        active_events.push(event.clone());
1313    }
1314    if let Some(message) = node.message() {
1315        if !message.is_transient() && active_message_ids.insert(message.id.clone()) {
1316            active_messages.push(message);
1317        }
1318        return;
1319    }
1320    if let Some(SessionEventRecord::Tool(ToolEvent::Invocation { record, .. })) = node.event() {
1321        active_tool_calls.push(record.clone());
1322    }
1323}
1324
1325fn recognized_active_read_key(node: &SessionNodeRecord) -> Option<String> {
1326    match &node.payload {
1327        SessionNodePayload::Event { event } => match event {
1328            SessionEventRecord::Conversation(record) => Some(format!("message:{}", record.id)),
1329            SessionEventRecord::Tool(ToolEvent::Invocation { stable_key, record }) => {
1330                Some(tool_call_active_read_key(stable_key, record))
1331            }
1332            _ => None,
1333        },
1334        SessionNodePayload::Plugin { .. } => None,
1335    }
1336}
1337
1338fn active_read_item_key(item: &ActiveReadItem<'_>) -> String {
1339    match item {
1340        ActiveReadItem::Message(message) => format!("message:{}", message.id),
1341        ActiveReadItem::ToolCall { stable_key, record } => {
1342            tool_call_active_read_key(stable_key, record)
1343        }
1344    }
1345}
1346
1347fn tool_call_active_read_key(stable_key: &str, record: &ToolCallRecord) -> String {
1348    let fingerprint = serde_json::to_string(record).unwrap_or_default();
1349    format!("tool_call:{stable_key}:{fingerprint}")
1350}
1351
1352pub(crate) fn tool_call_record_active_read_key(record: &ToolCallRecord) -> String {
1353    let stable_key = stable_tool_call_key(record);
1354    tool_call_active_read_key(&stable_key, record)
1355}
1356
1357fn causal_ref_from_message_origin(
1358    origin: &Option<crate::MessageOrigin>,
1359) -> Option<crate::CausalRef> {
1360    let Some(crate::MessageOrigin::Process {
1361        process_id,
1362        sequence,
1363        ..
1364    }) = origin
1365    else {
1366        return None;
1367    };
1368    Some(crate::CausalRef::ProcessEvent {
1369        process_id: process_id.clone(),
1370        sequence: *sequence,
1371    })
1372}
1373
1374fn unique_tool_node_id(stable_key: &str, existing_ids: &HashSet<String>) -> String {
1375    let base = format!("tool:{}", stable_key);
1376    if !existing_ids.contains(&base) {
1377        return base;
1378    }
1379    loop {
1380        let candidate = format!("tool:{}:{}", stable_key, uuid::Uuid::new_v4());
1381        if !existing_ids.contains(&candidate) {
1382            return candidate;
1383        }
1384    }
1385}
1386
1387fn unique_tool_node_id_for_replacement(
1388    stable_key: &str,
1389    existing_ids: &HashSet<String>,
1390    new_ids: &HashSet<String>,
1391) -> String {
1392    let base = format!("tool:{}", stable_key);
1393    if !existing_ids.contains(&base) && !new_ids.contains(&base) {
1394        return base;
1395    }
1396    loop {
1397        let candidate = format!("tool:{}:{}", stable_key, uuid::Uuid::new_v4());
1398        if !existing_ids.contains(&candidate) && !new_ids.contains(&candidate) {
1399            return candidate;
1400        }
1401    }
1402}
1403
1404fn fresh_semantic_node_id(prefix: &str, existing_ids: &HashSet<String>) -> String {
1405    loop {
1406        let candidate = format!("{prefix}:{}", uuid::Uuid::new_v4().simple());
1407        if !existing_ids.contains(&candidate) {
1408            return candidate;
1409        }
1410    }
1411}
1412
/// Picks a node id for a message that is not present in `existing_ids`.
///
/// Candidates are tried in this order: the bare `message_id`, then
/// `message:<message_id>`, then `message:<message_id>:<n>` for `n` counting
/// up from 2. The first free candidate is returned.
fn unique_message_node_id(message_id: &str, existing_ids: &HashSet<String>) -> String {
    let is_free = |candidate: &str| !existing_ids.contains(candidate);

    if is_free(message_id) {
        return message_id.to_string();
    }
    let prefixed = format!("message:{message_id}");
    if is_free(prefixed.as_str()) {
        return prefixed;
    }
    (2..)
        .map(|suffix| format!("{prefixed}:{suffix}"))
        .find(|candidate| is_free(candidate.as_str()))
        .unwrap_or_else(|| unreachable!("message node id space exhausted"))
}
1429
/// Picks a node id for a message that is present in neither `existing_ids`
/// nor `new_ids`.
///
/// Candidates are tried in this order: the bare `message_id`, then
/// `message:<message_id>`, then `message:<message_id>:<n>` for `n` counting
/// up from 2. The first candidate free in both sets is returned.
fn unique_message_node_id_for_replacement(
    message_id: &str,
    existing_ids: &HashSet<String>,
    new_ids: &HashSet<String>,
) -> String {
    let is_taken =
        |candidate: &str| existing_ids.contains(candidate) || new_ids.contains(candidate);

    if !is_taken(message_id) {
        return message_id.to_string();
    }
    let prefixed = format!("message:{message_id}");
    if !is_taken(prefixed.as_str()) {
        return prefixed;
    }
    let numbered = (2..)
        .map(|suffix| format!("{prefixed}:{suffix}"))
        .find(|candidate| !is_taken(candidate.as_str()));
    match numbered {
        Some(candidate) => candidate,
        None => unreachable!("message node id space exhausted"),
    }
}
1450
1451fn fresh_node_id(prefix: &str) -> String {
1452    format!("{prefix}{}", uuid::Uuid::new_v4().simple())
1453}
1454
1455fn stable_tool_call_key(record: &ToolCallRecord) -> String {
1456    if let Some(call_id) = record
1457        .call_id
1458        .as_ref()
1459        .filter(|call_id| !call_id.is_empty())
1460    {
1461        return call_id.clone();
1462    }
1463    let raw = serde_json::to_vec(&(record.tool.clone(), &record.args, &record.output))
1464        .unwrap_or_else(|_| b"tool-call".to_vec());
1465    let digest = sha2::Sha256::digest(raw);
1466    format!("anon-{}", &format!("{digest:x}")[..12])
1467}
1468
1469fn first_message_search_text(message: &Message) -> String {
1470    message
1471        .parts
1472        .iter()
1473        .filter_map(|part| match part.kind {
1474            crate::PartKind::ToolCall | crate::PartKind::ToolResult => None,
1475            crate::PartKind::Image => Some("[Image attached]".to_string()),
1476            _ => (!part.content.trim().is_empty()).then(|| part.content.clone()),
1477        })
1478        .collect::<Vec<_>>()
1479        .join("\n\n")
1480        .trim()
1481        .to_string()
1482}
1483
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{Part, PartKind, PruneState, ToolCallOutput, shared_parts};

    /// Builds a message with id `id` and a single intact text part holding
    /// `content`.
    fn text_message(id: &str, role: MessageRole, content: &str) -> Message {
        let only_part = Part {
            id: format!("{id}.p0"),
            kind: PartKind::Text,
            content: content.to_string(),
            attachment: None,
            tool_call_id: None,
            tool_name: None,
            tool_replay: None,
            prune_state: PruneState::Intact,
            reasoning_meta: None,
            response_meta: None,
        };
        Message {
            id: id.to_string(),
            role,
            parts: shared_parts(vec![only_part]),
            origin: None,
        }
    }

    /// Builds a successful `lookup` tool call record with the given call id.
    fn tool_record(call_id: &str) -> ToolCallRecord {
        let args = serde_json::json!({"q": "x"});
        let output = ToolCallOutput::success(serde_json::json!({"answer": "y"}));
        ToolCallRecord {
            call_id: Some(call_id.to_string()),
            tool: "lookup".to_string(),
            args,
            output,
            duration_ms: 3,
        }
    }

    /// Builds a typed protocol event used as a fixture.
    fn protocol_event() -> ProtocolEvent {
        let payload = serde_json::json!({"step": "started"});
        ProtocolEvent::typed("test_protocol", payload).expect("protocol event serializes")
    }

    /// Returns the first node in storage order that carries a tool event.
    fn first_tool_node(graph: &SessionGraph) -> &SessionNodeRecord {
        graph
            .nodes
            .iter()
            .find(|node| matches!(node.event(), Some(SessionEventRecord::Tool(_))))
            .expect("tool node")
    }

    /// Appends one node of each kind to `graph`: message, tool call,
    /// protocol event and plugin. Returns the message, protocol and plugin
    /// node ids.
    fn append_one_of_each(graph: &mut SessionGraph) -> (String, String, String) {
        let message_id = graph.append_message(text_message("m1", MessageRole::User, "hello"));
        graph.append_active_read_delta(&[], &[tool_record("call-1")]);
        let protocol_id = graph.append_protocol_event(protocol_event());
        let plugin_id = graph.append_plugin("example", serde_json::json!({"ok": true}));
        (message_id, protocol_id, plugin_id)
    }

    #[test]
    fn typed_append_node_ids_use_semantic_prefixes() {
        let mut graph = SessionGraph::default();
        let (message_id, protocol_id, plugin_id) = append_one_of_each(&mut graph);

        assert_eq!(message_id, "m1");
        assert!(protocol_id.starts_with("protocol:"));
        assert!(plugin_id.starts_with("plugin:"));
        assert_eq!(first_tool_node(&graph).node_id, "tool:call-1");
    }

    #[test]
    fn active_read_replacement_uses_tool_prefix() {
        let record = tool_record("call-replace");
        let messages = [text_message("m1", MessageRole::User, "hello")];
        let graph = SessionGraph::from_active_read_state(&messages, std::slice::from_ref(&record));

        assert_eq!(first_tool_node(&graph).node_id, "tool:call-replace");
    }

    #[test]
    fn graph_writers_do_not_put_active_read_events_under_plugin_ids() {
        let mut graph = SessionGraph::default();
        append_one_of_each(&mut graph);

        for node in graph.nodes.iter() {
            let node_id = node.node_id.as_str();
            match node.event() {
                Some(SessionEventRecord::Conversation(_) | SessionEventRecord::Tool(_)) => {
                    assert!(!node_id.starts_with("plugin:"), "{:?}", node);
                }
                Some(SessionEventRecord::Protocol(_)) => {
                    assert!(node_id.starts_with("protocol:"), "{:?}", node);
                }
                None => {
                    assert!(node_id.starts_with("plugin:"), "{:?}", node);
                }
            }
        }
    }
}