Skip to main content

lash_core/runtime/
state.rs

1//! Runtime session state and persistence helpers.
2//!
3//! `RuntimeSessionState` is the runtime-private mutable state shape. Public
4//! host/plugin reads use `SessionSnapshot` from the plugin API instead.
5
6use lash_sansio::PromptUsage;
7
8use crate::session_model::{Message, SessionPolicy, TokenUsage, plugin_message_to_message};
9use crate::{PersistedTurnState, SessionSnapshot, ToolCallRecord};
10
11use super::usage::TokenLedgerEntry;
12
/// The runtime's view of a session: the persistable snapshot fields
/// **plus** scratch fields the runtime tracks but never persists
/// (head-revision CAS guard, pending dirty-write buffers, replace-graph
/// flag). Public serialization goes through [`RuntimeSessionState::to_snapshot`],
/// which drops runtime-only fields by construction.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct RuntimeSessionState {
    /// Session identifier; also seeds the id of the default (initial) agent
    /// frame.
    pub session_id: String,
    /// Session-level policy. `effective_policy` falls back to this when no
    /// frame matches `current_agent_frame_id`.
    #[serde(default)]
    pub policy: SessionPolicy,
    /// Agent frame records. The record whose `frame_id` equals
    /// `current_agent_frame_id` is the current frame.
    #[serde(default)]
    pub agent_frames: Vec<crate::AgentFrameRecord>,
    /// Id of the current agent frame. Omitted from serialized output when
    /// empty; `ensure_agent_frame_initialized` fills in the default id.
    #[serde(default, skip_serializing_if = "String::is_empty")]
    pub current_agent_frame_id: crate::AgentFrameId,
    /// Session graph that the read/append helpers operate on, scoped by the
    /// current agent frame id.
    #[serde(default)]
    pub session_graph: crate::SessionGraph,
    /// Turn counter; part of `PersistedTurnState` (see `turn_state`).
    #[serde(default)]
    pub turn_index: usize,
    /// Context-window token accounting; part of `PersistedTurnState`.
    #[serde(default)]
    pub token_usage: TokenUsage,
    /// Most recent prompt usage, if any; part of `PersistedTurnState`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_prompt_usage: Option<PromptUsage>,
    /// Session-level protocol turn options. `effective_protocol_turn_options`
    /// falls back to this when no current frame resolves.
    #[serde(default)]
    pub protocol_turn_options: crate::ProtocolTurnOptions,
    /// Blob reference to the persisted tool state, as reported by the last
    /// commit manifest or checkpoint.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub tool_state_ref: Option<crate::store::BlobRef>,
    /// Generation of the tool state last stamped or captured; compared with
    /// the live tool registry generation in `refresh_plugin_snapshots`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub tool_state_generation: Option<u64>,
    /// In-memory tool state payload. Not copied into `SessionSnapshot` and
    /// cleared once a commit result is applied.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub tool_state_snapshot: Option<crate::ToolState>,
    /// Blob reference to the persisted plugin snapshot.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub plugin_snapshot_ref: Option<crate::store::BlobRef>,
    /// Revision fingerprint associated with the plugin snapshot; compared
    /// with the live plugin session fingerprint in `refresh_plugin_snapshots`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub plugin_snapshot_revision: Option<u64>,
    /// In-memory plugin snapshot payload. Not copied into `SessionSnapshot`
    /// and cleared once a commit result is applied.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub plugin_snapshot: Option<crate::PluginSessionSnapshot>,
    /// Blob reference to the persisted execution state; mirrored onto the
    /// current agent frame by the commit/checkpoint helpers.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub execution_state_ref: Option<crate::store::BlobRef>,
    /// In-memory execution state bytes. Not copied into `SessionSnapshot`;
    /// the per-frame copy takes precedence in the `execution_state_snapshot`
    /// accessor.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub execution_state_snapshot: Option<Vec<u8>>,
    /// Cost-accounting ledger. Every LLM call (parent turns, subagent
    /// children, compaction, observers, background helpers) contributes an
    /// entry keyed by `(source, model)`. Separate from `token_usage`
    /// which tracks context-window accounting only.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub token_ledger: Vec<TokenLedgerEntry>,
    /// Blob reference to the latest checkpoint; set from a commit result or
    /// a loaded session head.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub checkpoint_ref: Option<crate::store::BlobRef>,
    /// Store head revision observed by the runtime. Commits use it for
    /// optimistic concurrency; `None` means the runtime is creating the
    /// first persisted head.
    #[serde(skip)]
    pub head_revision: Option<u64>,
    /// Signals that the next commit must write the full graph (a
    /// destructive rewrite happened, e.g. `heal_orphaned_leaf`). Cleared
    /// after the next commit.
    #[serde(skip)]
    pub graph_replace_required: bool,
}
72
73impl RuntimeSessionState {
74    pub fn from_snapshot(snapshot: SessionSnapshot) -> Self {
75        let mut state = Self {
76            session_id: snapshot.session_id,
77            policy: snapshot.policy,
78            agent_frames: snapshot.agent_frames,
79            current_agent_frame_id: snapshot.current_agent_frame_id,
80            session_graph: snapshot.session_graph,
81            turn_index: snapshot.turn_index,
82            token_usage: snapshot.token_usage,
83            last_prompt_usage: snapshot.last_prompt_usage,
84            protocol_turn_options: snapshot.protocol_turn_options,
85            tool_state_ref: snapshot.tool_state_ref,
86            tool_state_generation: snapshot.tool_state_generation,
87            tool_state_snapshot: None,
88            plugin_snapshot_ref: snapshot.plugin_snapshot_ref,
89            plugin_snapshot_revision: snapshot.plugin_snapshot_revision,
90            plugin_snapshot: None,
91            execution_state_ref: snapshot.execution_state_ref,
92            execution_state_snapshot: None,
93            token_ledger: snapshot.token_ledger,
94            checkpoint_ref: snapshot.checkpoint_ref,
95            head_revision: None,
96            graph_replace_required: false,
97        };
98        for frame in &mut state.agent_frames {
99            frame.execution_state_snapshot = None;
100        }
101        state.ensure_agent_frame_initialized();
102        state
103    }
104
105    pub fn to_snapshot(&self) -> SessionSnapshot {
106        let mut agent_frames = self.agent_frames.clone();
107        for frame in &mut agent_frames {
108            frame.execution_state_snapshot = None;
109        }
110        SessionSnapshot {
111            session_id: self.session_id.clone(),
112            policy: self.policy.clone(),
113            agent_frames,
114            current_agent_frame_id: self.current_agent_frame_id.clone(),
115            session_graph: self.session_graph.clone(),
116            turn_index: self.turn_index,
117            token_usage: self.token_usage.clone(),
118            last_prompt_usage: self.last_prompt_usage.clone(),
119            protocol_turn_options: self.protocol_turn_options.clone(),
120            tool_state_ref: self.tool_state_ref.clone(),
121            tool_state_generation: self.tool_state_generation,
122            plugin_snapshot_ref: self.plugin_snapshot_ref.clone(),
123            plugin_snapshot_revision: self.plugin_snapshot_revision,
124            execution_state_ref: self.execution_state_ref.clone(),
125            token_ledger: self.token_ledger.clone(),
126            checkpoint_ref: self.checkpoint_ref.clone(),
127        }
128    }
129
130    pub fn apply_snapshot(&mut self, snapshot: &SessionSnapshot) {
131        self.session_id = snapshot.session_id.clone();
132        self.policy = snapshot.policy.clone();
133        self.agent_frames = snapshot.agent_frames.clone();
134        self.current_agent_frame_id = snapshot.current_agent_frame_id.clone();
135        self.ensure_agent_frame_initialized();
136        self.session_graph = snapshot.session_graph.clone();
137        self.turn_index = snapshot.turn_index;
138        self.token_usage = snapshot.token_usage.clone();
139        self.last_prompt_usage = snapshot.last_prompt_usage.clone();
140        self.protocol_turn_options = snapshot.protocol_turn_options.clone();
141        self.tool_state_ref = snapshot.tool_state_ref.clone();
142        self.tool_state_generation = snapshot.tool_state_generation;
143        self.plugin_snapshot_ref = snapshot.plugin_snapshot_ref.clone();
144        self.plugin_snapshot_revision = snapshot.plugin_snapshot_revision;
145        self.execution_state_ref = snapshot.execution_state_ref.clone();
146        self.token_ledger = snapshot.token_ledger.clone();
147        self.checkpoint_ref = snapshot.checkpoint_ref.clone();
148    }
149
150    pub fn stamp_runtime_state(
151        &mut self,
152        tool_state: Option<&crate::ToolState>,
153        plugin_snapshot: Option<&crate::PluginSessionSnapshot>,
154    ) {
155        self.tool_state_snapshot = tool_state.cloned();
156        self.tool_state_generation = tool_state.map(|snapshot| snapshot.generation());
157        self.plugin_snapshot = plugin_snapshot.cloned();
158    }
159
160    pub fn usage_report(&self) -> super::usage::SessionUsageReport {
161        super::usage::SessionUsageReport::from_entries(&self.token_ledger)
162    }
163
164    pub(crate) fn read_model(&self) -> crate::session_graph::SessionReadModel {
165        self.session_graph.read_model_for_agent_frame(
166            &self.current_agent_frame_id,
167            self.current_agent_frame_is_initial(),
168        )
169    }
170
171    pub fn replace_active_read_state(
172        &mut self,
173        messages: &[Message],
174        tool_calls: &[ToolCallRecord],
175    ) {
176        self.session_graph
177            .replace_active_read_state_for_agent_frame(
178                &self.current_agent_frame_id,
179                messages,
180                tool_calls,
181            );
182        self.graph_replace_required = false;
183    }
184
185    pub fn replace_active_tool_calls(&mut self, tool_calls: &[ToolCallRecord]) {
186        let messages = self.read_model().messages;
187        self.session_graph
188            .replace_active_read_state_for_agent_frame(
189                &self.current_agent_frame_id,
190                messages.as_slice(),
191                tool_calls,
192            );
193        self.graph_replace_required = false;
194    }
195
196    pub fn append_active_read_delta(
197        &mut self,
198        messages: &[Message],
199        tool_calls: &[ToolCallRecord],
200    ) {
201        self.session_graph.append_active_read_delta_for_agent_frame(
202            &self.current_agent_frame_id,
203            messages,
204            tool_calls,
205        );
206    }
207
208    pub fn append_active_conversation_messages(&mut self, messages: &[Message]) {
209        self.session_graph
210            .append_active_conversation_messages_for_agent_frame(
211                &self.current_agent_frame_id,
212                messages,
213            );
214    }
215
216    pub fn read_view(&self) -> crate::SessionReadView {
217        crate::SessionReadView::from_persisted_state(self)
218    }
219
220    pub fn session_graph(&self) -> &crate::SessionGraph {
221        &self.session_graph
222    }
223
224    pub fn policy(&self) -> &SessionPolicy {
225        self.effective_policy()
226    }
227
228    pub fn turn_state(&self) -> PersistedTurnState {
229        PersistedTurnState {
230            turn_index: self.turn_index,
231            token_usage: self.token_usage.clone(),
232            last_prompt_usage: self.last_prompt_usage.clone(),
233            protocol_turn_options: self.protocol_turn_options.clone(),
234        }
235    }
236
237    pub fn token_ledger(&self) -> &[TokenLedgerEntry] {
238        &self.token_ledger
239    }
240
241    pub fn apply_persisted_commit_result(&mut self, result: crate::store::RuntimeCommitResult) {
242        self.head_revision = Some(result.head_revision);
243        self.checkpoint_ref = Some(result.checkpoint_ref);
244        self.tool_state_ref = result.manifest.tool_state_ref;
245        if let Some(snapshot) = self.tool_state_snapshot.as_ref() {
246            self.tool_state_generation = Some(snapshot.generation());
247        } else if self.tool_state_ref.is_none() {
248            self.tool_state_generation = None;
249        }
250        self.plugin_snapshot_ref = result.manifest.plugin_snapshot_ref;
251        self.plugin_snapshot_revision = result.manifest.plugin_snapshot_revision;
252        self.execution_state_ref = result.manifest.execution_state_ref;
253        let execution_state_ref = self.execution_state_ref.clone();
254        if let Some(frame) = self.current_agent_frame_mut() {
255            frame.execution_state_ref = execution_state_ref;
256            frame.execution_state_snapshot = None;
257        }
258        self.graph_replace_required = false;
259        self.tool_state_snapshot = None;
260        self.plugin_snapshot = None;
261        self.execution_state_snapshot = None;
262        if let Some(frame) = self.current_agent_frame_mut() {
263            frame.execution_state_snapshot = None;
264        }
265    }
266
267    pub fn discard_runtime_snapshots(&mut self) {
268        self.tool_state_snapshot = None;
269        self.plugin_snapshot = None;
270        self.execution_state_snapshot = None;
271        if let Some(frame) = self.current_agent_frame_mut() {
272            frame.execution_state_snapshot = None;
273        }
274    }
275
276    pub fn set_execution_state_snapshot(&mut self, execution_state_snapshot: Option<Vec<u8>>) {
277        if execution_state_snapshot.is_none() {
278            self.execution_state_ref = None;
279        }
280        self.execution_state_snapshot = execution_state_snapshot.clone();
281        if let Some(frame) = self.current_agent_frame_mut() {
282            if execution_state_snapshot.is_none() {
283                frame.execution_state_ref = None;
284            }
285            frame.execution_state_snapshot = execution_state_snapshot;
286        }
287    }
288
289    pub fn execution_state_snapshot(&self) -> Option<&[u8]> {
290        self.current_agent_frame()
291            .and_then(|frame| frame.execution_state_snapshot.as_deref())
292            .or(self.execution_state_snapshot.as_deref())
293    }
294
295    pub fn refresh_plugin_snapshots(&mut self, plugins: &crate::PluginSession) {
296        let tool_registry = plugins.tool_registry();
297        let generation = tool_registry.generation();
298        if self.tool_state_ref.is_none() || self.tool_state_generation != Some(generation) {
299            let snapshot = tool_registry.export_state();
300            self.tool_state_generation = Some(snapshot.generation());
301            self.tool_state_snapshot = Some(snapshot);
302        }
303
304        let revision = plugins.snapshot_revision_fingerprint();
305        if self.plugin_snapshot_ref.is_none() || self.plugin_snapshot_revision != Some(revision) {
306            store_plugin_snapshot(&mut self.plugin_snapshot, plugins.snapshot());
307        }
308        self.plugin_snapshot_revision = Some(revision);
309    }
310}
311
312/// Persist a freshly captured plugin snapshot, logging and **retaining the prior
313/// snapshot** when the capture fails.
314///
315/// A failed capture (`Err`) previously collapsed to `None` via `.ok()`, erasing
316/// the last good snapshot — so the next cold rebuild would restore an empty
317/// plugin surface even though a valid snapshot had been captured earlier. Keep
318/// the prior value and surface the error instead.
319pub(crate) fn store_plugin_snapshot(
320    target: &mut Option<crate::PluginSessionSnapshot>,
321    captured: Result<crate::PluginSessionSnapshot, crate::PluginError>,
322) {
323    match captured {
324        Ok(snapshot) => *target = Some(snapshot),
325        Err(err) => tracing::warn!(
326            error = %err,
327            "failed to capture plugin snapshot; retaining the prior snapshot",
328        ),
329    }
330}
331
332impl RuntimeSessionState {
333    pub fn current_agent_frame(&self) -> Option<&crate::AgentFrameRecord> {
334        self.agent_frames
335            .iter()
336            .find(|frame| frame.frame_id == self.current_agent_frame_id)
337    }
338
339    pub fn current_agent_frame_mut(&mut self) -> Option<&mut crate::AgentFrameRecord> {
340        let current_agent_frame_id = self.current_agent_frame_id.clone();
341        self.agent_frames
342            .iter_mut()
343            .find(|frame| frame.frame_id == current_agent_frame_id)
344    }
345
346    pub fn effective_policy(&self) -> &SessionPolicy {
347        self.current_agent_frame()
348            .map(|frame| &frame.assignment.policy)
349            .unwrap_or(&self.policy)
350    }
351
352    pub fn effective_protocol_turn_options(&self) -> &crate::ProtocolTurnOptions {
353        self.current_agent_frame()
354            .map(|frame| &frame.protocol_turn_options)
355            .unwrap_or(&self.protocol_turn_options)
356    }
357
358    pub fn ensure_agent_frame_initialized(&mut self) {
359        if self.current_agent_frame_id.is_empty() {
360            self.current_agent_frame_id = default_agent_frame_id(&self.session_id);
361        }
362        if self
363            .agent_frames
364            .iter()
365            .any(|frame| frame.frame_id == self.current_agent_frame_id)
366        {
367            return;
368        }
369        let mut frame = default_agent_frame(&self.session_id, &self.policy);
370        frame.frame_id = self.current_agent_frame_id.clone();
371        frame.protocol_turn_options = self.protocol_turn_options.clone();
372        frame.execution_state_ref = self.execution_state_ref.clone();
373        frame.execution_state_snapshot = self.execution_state_snapshot.clone();
374        self.agent_frames.push(frame);
375    }
376
377    pub fn reset_initial_agent_frame(
378        &mut self,
379        assignment: crate::AgentFrameAssignment,
380        protocol_turn_options: crate::ProtocolTurnOptions,
381    ) {
382        let frame_id = default_agent_frame_id(&self.session_id);
383        self.policy = assignment.policy.clone();
384        self.protocol_turn_options = protocol_turn_options.clone();
385        self.current_agent_frame_id = frame_id.clone();
386        self.agent_frames = vec![crate::AgentFrameRecord::new(
387            frame_id,
388            self.session_id.clone(),
389            None,
390            crate::AgentFrameReason::Initial,
391            None,
392            assignment,
393            protocol_turn_options,
394        )];
395    }
396
397    pub fn append_agent_frame(&mut self, mut frame: crate::AgentFrameRecord) {
398        let previous_frame_id = self.current_agent_frame_id.clone();
399        for existing in &mut self.agent_frames {
400            if existing.frame_id == previous_frame_id {
401                existing.status = crate::AgentFrameStatus::Superseded;
402            }
403        }
404        if frame.previous_frame_id.is_none() && !previous_frame_id.is_empty() {
405            frame.previous_frame_id = Some(previous_frame_id);
406        }
407        frame.status = crate::AgentFrameStatus::Active;
408        self.policy = frame.assignment.policy.clone();
409        self.protocol_turn_options = frame.protocol_turn_options.clone();
410        self.current_agent_frame_id = frame.frame_id.clone();
411        self.execution_state_ref = frame.execution_state_ref.clone();
412        self.execution_state_snapshot = frame.execution_state_snapshot.clone();
413        self.agent_frames.push(frame);
414    }
415
416    fn current_agent_frame_is_initial(&self) -> bool {
417        self.current_agent_frame()
418            .map(|frame| frame.previous_frame_id.is_none())
419            .unwrap_or(true)
420    }
421}
422
423impl Default for RuntimeSessionState {
424    fn default() -> Self {
425        Self {
426            session_id: "root".to_string(),
427            policy: SessionPolicy::default(),
428            agent_frames: default_agent_frames("root", &SessionPolicy::default()),
429            current_agent_frame_id: default_agent_frame_id("root"),
430            session_graph: crate::SessionGraph::default(),
431            turn_index: 0,
432            token_usage: TokenUsage::default(),
433            last_prompt_usage: None,
434            protocol_turn_options: crate::ProtocolTurnOptions::default(),
435            tool_state_ref: None,
436            tool_state_generation: None,
437            tool_state_snapshot: None,
438            plugin_snapshot_ref: None,
439            plugin_snapshot_revision: None,
440            plugin_snapshot: None,
441            execution_state_ref: None,
442            execution_state_snapshot: None,
443            token_ledger: Vec::new(),
444            checkpoint_ref: None,
445            head_revision: None,
446            graph_replace_required: false,
447        }
448    }
449}
450
#[cfg(test)]
mod tests {
    use super::*;

    /// Runtime-only scratch must not appear in the serialized snapshot, and a
    /// state hydrated from that snapshot must start with the scratch cleared.
    #[test]
    fn session_snapshot_serialization_excludes_runtime_only_fields_and_round_trips() {
        const RUNTIME_ONLY_KEYS: [&str; 5] = [
            "head_revision",
            "graph_replace_required",
            "tool_state_snapshot",
            "plugin_snapshot",
            "execution_state_snapshot",
        ];

        let policy = SessionPolicy {
            provider_id: "mock".to_string(),
            ..SessionPolicy::default()
        };
        let mut state = RuntimeSessionState {
            session_id: "snapshot-test".to_string(),
            policy,
            tool_state_snapshot: Some(crate::ToolState::default()),
            plugin_snapshot: Some(crate::PluginSessionSnapshot::default()),
            execution_state_snapshot: Some(vec![1, 2, 3]),
            head_revision: Some(42),
            graph_replace_required: true,
            ..RuntimeSessionState::default()
        };
        state.ensure_agent_frame_initialized();
        if let Some(frame) = state.current_agent_frame_mut() {
            frame.execution_state_snapshot = Some(vec![4, 5, 6]);
        }

        let value = serde_json::to_value(state.to_snapshot()).expect("serialize snapshot");

        for runtime_key in RUNTIME_ONLY_KEYS {
            assert!(
                value.get(runtime_key).is_none(),
                "snapshot unexpectedly exposed {runtime_key}"
            );
        }
        let serialized_frames = value["agent_frames"].as_array().expect("agent frames");
        for frame in serialized_frames {
            assert!(frame.get("execution_state_snapshot").is_none());
        }

        let snapshot: SessionSnapshot = serde_json::from_value(value).expect("round-trip snapshot");
        let hydrated = RuntimeSessionState::from_snapshot(snapshot);

        assert_eq!(hydrated.session_id, "snapshot-test");
        assert_eq!(hydrated.policy.recorded_provider_id(), "mock");
        assert!(hydrated.head_revision.is_none());
        assert!(!hydrated.graph_replace_required);
        assert!(hydrated.tool_state_snapshot.is_none());
        assert!(hydrated.plugin_snapshot.is_none());
        assert!(hydrated.execution_state_snapshot.is_none());
        for frame in &hydrated.agent_frames {
            assert!(frame.execution_state_snapshot.is_none());
        }
    }
}
515
516pub(super) fn apply_persisted_session_config(
517    policy: &mut SessionPolicy,
518    config: &crate::PersistedSessionConfig,
519) {
520    policy.model = config.model.clone();
521    policy.provider_id = config.provider_id.clone();
522}
523
524pub(super) fn apply_session_checkpoint(
525    state: &mut RuntimeSessionState,
526    checkpoint: Option<crate::store::HydratedSessionCheckpoint>,
527) {
528    let Some(checkpoint) = checkpoint else {
529        state.tool_state_ref = None;
530        state.tool_state_generation = None;
531        state.tool_state_snapshot = None;
532        state.plugin_snapshot_ref = None;
533        state.plugin_snapshot_revision = None;
534        state.plugin_snapshot = None;
535        state.execution_state_ref = None;
536        state.execution_state_snapshot = None;
537        state.ensure_agent_frame_initialized();
538        return;
539    };
540    state.turn_index = checkpoint.turn_state.turn_index;
541    state.token_usage = checkpoint.turn_state.token_usage;
542    state.last_prompt_usage = checkpoint.turn_state.last_prompt_usage;
543    state.protocol_turn_options = checkpoint.turn_state.protocol_turn_options;
544    state.tool_state_ref = checkpoint.tool_state_ref.clone();
545    state.tool_state_generation = checkpoint
546        .tool_state
547        .as_ref()
548        .map(|snapshot| snapshot.generation());
549    state.tool_state_snapshot = checkpoint.tool_state;
550    state.plugin_snapshot_ref = checkpoint.plugin_snapshot_ref.clone();
551    state.plugin_snapshot_revision = checkpoint.plugin_snapshot_revision;
552    state.plugin_snapshot = checkpoint.plugin_snapshot;
553    state.execution_state_ref = checkpoint.execution_state_ref.clone();
554    state.execution_state_snapshot = None;
555    state.ensure_agent_frame_initialized();
556    if let Some(frame) = state.current_agent_frame_mut() {
557        frame.execution_state_ref = checkpoint.execution_state_ref.clone();
558        frame.execution_state_snapshot = checkpoint.execution_state;
559    }
560}
561
562pub(super) fn apply_session_head(
563    state: &mut RuntimeSessionState,
564    head: &crate::store::SessionHead,
565) {
566    state.session_graph = head.graph.clone();
567    state.agent_frames = head.agent_frames.clone();
568    state.current_agent_frame_id = head.current_agent_frame_id.clone();
569    state.checkpoint_ref = head.checkpoint_ref.clone();
570    state.token_ledger = head.token_ledger.clone();
571    state.tool_state_ref = None;
572    state.tool_state_generation = None;
573    state.tool_state_snapshot = None;
574    state.plugin_snapshot_ref = None;
575    state.plugin_snapshot_revision = None;
576    state.plugin_snapshot = None;
577    state.execution_state_ref = None;
578    state.execution_state_snapshot = None;
579    state.ensure_agent_frame_initialized();
580    state.head_revision = Some(head.head_revision);
581    state.graph_replace_required = false;
582    apply_persisted_session_config(&mut state.policy, &head.config);
583}
584
585pub(super) fn append_session_nodes_to_state(
586    state: &mut RuntimeSessionState,
587    nodes: &[crate::SessionAppendNode],
588) -> Vec<String> {
589    let drafts = nodes
590        .iter()
591        .map(session_append_node_draft)
592        .collect::<Vec<_>>();
593    state.ensure_agent_frame_initialized();
594    let node_ids = state
595        .session_graph
596        .append_node_drafts_for_agent_frame(&state.current_agent_frame_id, drafts);
597    normalize_session_graph(state);
598    node_ids
599}
600
601fn session_append_node_draft(
602    node: &crate::SessionAppendNode,
603) -> crate::session_graph::SessionNodeDraft {
604    match node {
605        crate::SessionAppendNode::Message { message, caused_by } => {
606            crate::session_graph::SessionNodeDraft::message(plugin_message_to_message(message))
607                .with_caused_by(caused_by.clone())
608        }
609        crate::SessionAppendNode::ProtocolEvent { event, caused_by } => {
610            crate::session_graph::SessionNodeDraft::protocol_event(event.clone())
611                .with_caused_by(caused_by.clone())
612        }
613        crate::SessionAppendNode::Plugin {
614            plugin_type,
615            body,
616            caused_by,
617        } => crate::session_graph::SessionNodeDraft::plugin(plugin_type.clone(), body.clone())
618            .with_caused_by(caused_by.clone()),
619    }
620}
621
622fn default_agent_frame_id(session_id: &str) -> crate::AgentFrameId {
623    format!("{session_id}:frame:initial")
624}
625
626fn default_agent_frames(session_id: &str, policy: &SessionPolicy) -> Vec<crate::AgentFrameRecord> {
627    vec![default_agent_frame(session_id, policy)]
628}
629
630fn default_agent_frame(session_id: &str, policy: &SessionPolicy) -> crate::AgentFrameRecord {
631    crate::AgentFrameRecord::new(
632        default_agent_frame_id(session_id),
633        session_id.to_string(),
634        None,
635        crate::AgentFrameReason::Initial,
636        None,
637        crate::AgentFrameAssignment::from_policy(policy.clone()),
638        crate::ProtocolTurnOptions::default(),
639    )
640}
641
642/// Heal any graph corruption (orphaned leaf) on load.
643///
644/// Must run BEFORE any residency-based trim (phase-9 feature) because
645/// healing's fallback search relies on having the full node set in RAM.
646/// Under `Residency::ActivePathOnly`, the runtime loads only the active
647/// path; if the leaf doesn't resolve against that reduced set, the
648/// caller falls back to a full `load_session_graph()` + `normalize` +
649/// trim.
650pub(super) fn normalize_session_graph(state: &mut RuntimeSessionState) {
651    if state.session_graph.heal_orphaned_leaf() {
652        state.graph_replace_required = true;
653    }
654}
655
656/// Trim the resident node set according to `Residency`. Called AFTER
657/// `normalize_session_graph` during `from_environment` load. Under
658/// `KeepAll` this is a no-op; under `ActivePathOnly` it replaces the
659/// resident graph with just the active path. Orphans remain on disk —
660/// the host decides whether/when to tombstone + vacuum them via
661/// `LashRuntime::orphaned_node_ids` + the store primitives.
662///
663pub(super) fn apply_residency_on_load(
664    state: &mut RuntimeSessionState,
665    residency: crate::Residency,
666) {
667    match residency {
668        crate::Residency::KeepAll => {}
669        crate::Residency::ActivePathOnly => {
670            state.session_graph = state.session_graph.fork_current_path();
671        }
672    }
673}
674
#[cfg(test)]
mod plugin_snapshot_tests {
    use super::store_plugin_snapshot;
    use crate::{PluginError, PluginSessionSnapshot};

    /// A successful capture lands in the target slot.
    #[test]
    fn ok_capture_overwrites_target() {
        let mut slot: Option<PluginSessionSnapshot> = None;
        let captured = Ok(PluginSessionSnapshot::default());
        store_plugin_snapshot(&mut slot, captured);
        assert!(slot.is_some(), "a successful capture must be stored");
    }

    /// Regression guard: a failed snapshot capture used to collapse to `None`
    /// via `.ok()`, erasing the last good snapshot so the next cold rebuild
    /// would restore an empty plugin surface. A failure must leave the prior
    /// snapshot intact.
    #[test]
    fn failed_capture_retains_prior_snapshot() {
        let mut slot = Some(PluginSessionSnapshot::default());
        let failure = Err(PluginError::Snapshot("capture failed".to_string()));
        store_plugin_snapshot(&mut slot, failure);
        assert!(
            slot.is_some(),
            "a failed capture must retain the prior snapshot, not erase it"
        );
    }
}
705
#[cfg(test)]
mod residency_tests {
    use super::apply_residency_on_load;
    use crate::{
        Message, MessageRole, Part, PartKind, PruneState, Residency, RuntimeSessionState,
        shared_parts,
    };

    /// Builds a user message holding a single intact text part.
    fn text_message(id: &str, content: &str) -> Message {
        let text_part = Part {
            id: format!("{id}.p0"),
            kind: PartKind::Text,
            content: content.to_string(),
            attachment: None,
            tool_call_id: None,
            tool_name: None,
            tool_replay: None,
            prune_state: PruneState::Intact,
            reasoning_meta: None,
            response_meta: None,
        };
        Message {
            id: id.to_string(),
            role: MessageRole::User,
            parts: shared_parts(vec![text_part]),
            origin: None,
        }
    }

    /// Root, an inactive branch off the root, then an active branch off the root.
    /// Returns the state plus the inactive and active branch node ids.
    fn branching_state() -> (RuntimeSessionState, String, String) {
        let mut state = RuntimeSessionState::default();

        state.append_active_conversation_messages(&[text_message("root", "root")]);
        let root_leaf = state.session_graph.leaf_node_id.clone();

        state.append_active_conversation_messages(&[text_message("inactive", "inactive branch")]);
        let inactive_id = state
            .session_graph
            .leaf_node_id
            .clone()
            .expect("inactive node");

        // Rewind to the root so the next append forks a second branch.
        state.session_graph.branch_to(root_leaf);
        state.append_active_conversation_messages(&[text_message("active", "active branch")]);
        let active_id = state
            .session_graph
            .leaf_node_id
            .clone()
            .expect("active node");

        (state, inactive_id, active_id)
    }

    /// The durable worker rebuild (and session resume) call this to match the
    /// live runtime's residency. ActivePathOnly drops nodes off the active
    /// path so a rebuilt session does not silently retain the full graph.
    #[test]
    fn active_path_only_trims_orphan_branches_on_load() {
        let (mut state, inactive_id, active_id) = branching_state();
        let inactive_before = state.session_graph.find_node(&inactive_id).is_some();
        assert!(
            inactive_before,
            "the inactive branch is resident before trimming"
        );

        apply_residency_on_load(&mut state, Residency::ActivePathOnly);

        let graph = &state.session_graph;
        assert!(
            graph.find_node(&inactive_id).is_none(),
            "ActivePathOnly must drop the orphaned inactive branch on rebuild"
        );
        assert!(
            graph.find_node(&active_id).is_some(),
            "the active path must be retained"
        );
    }

    /// KeepAll is a no-op: branches off the active path stay resident.
    #[test]
    fn keep_all_retains_orphan_branches_on_load() {
        let (mut state, inactive_id, _active_id) = branching_state();

        apply_residency_on_load(&mut state, Residency::KeepAll);

        assert!(
            state.session_graph.find_node(&inactive_id).is_some(),
            "KeepAll must retain the full resident graph"
        );
    }
}
786}