Skip to main content

lash_core/runtime/
state.rs

1//! Runtime session state and persistence helpers.
2//!
3//! `RuntimeSessionState` is the runtime-private mutable state shape. Public
4//! host/plugin reads use `SessionSnapshot` from the plugin API instead.
5
6use lash_sansio::PromptUsage;
7
8use crate::session_model::{Message, SessionPolicy, TokenUsage, plugin_message_to_message};
9use crate::{PersistedTurnState, SessionSnapshot};
10
11use super::usage::TokenLedgerEntry;
12
/// The runtime's view of a session: the persistable snapshot fields
/// **plus** scratch fields the runtime tracks but never persists
/// (head-revision CAS guard, pending dirty-write buffers, replace-graph
/// flag). Public serialization goes through [`RuntimeSessionState::to_snapshot`],
/// which drops runtime-only fields by construction.
///
/// NOTE(review): only `head_revision` and `graph_replace_required` carry
/// `#[serde(skip)]`. The in-memory `tool_state_snapshot`, `plugin_snapshot`
/// and `execution_state_snapshot` fields are still emitted by this type's own
/// `Serialize` impl whenever they are `Some` — confirm nothing serializes this
/// struct directly instead of going through `to_snapshot`.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct RuntimeSessionState {
    /// Session identifier. Also used to derive the default agent frame id.
    pub session_id: String,
    /// Session-level policy. `effective_policy` falls back to this when no
    /// current agent frame resolves.
    #[serde(default)]
    pub policy: SessionPolicy,
    /// Agent frame records known to this session.
    #[serde(default)]
    pub agent_frames: Vec<crate::AgentFrameRecord>,
    /// Id of the entry in `agent_frames` that is treated as current. Omitted
    /// from serialized output when empty.
    #[serde(default, skip_serializing_if = "String::is_empty")]
    pub current_agent_frame_id: crate::AgentFrameId,
    /// The session graph that the read-model and append helpers operate on.
    #[serde(default)]
    pub session_graph: crate::SessionGraph,
    /// Turn counter; part of `PersistedTurnState` (see `turn_state`).
    #[serde(default)]
    pub turn_index: usize,
    /// Token usage; part of `PersistedTurnState` (see `turn_state`).
    #[serde(default)]
    pub token_usage: TokenUsage,
    /// Most recent prompt usage, if any; part of `PersistedTurnState`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_prompt_usage: Option<PromptUsage>,
    /// Session-level protocol turn options. `effective_protocol_turn_options`
    /// falls back to this when no current agent frame resolves.
    #[serde(default)]
    pub protocol_turn_options: crate::ProtocolTurnOptions,
    /// Blob reference for the tool state, as reported by the last commit
    /// manifest or checkpoint.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub tool_state_ref: Option<crate::store::BlobRef>,
    /// Generation of the tool state this session last captured or loaded.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub tool_state_generation: Option<u64>,
    /// In-memory tool state. Not copied into `SessionSnapshot`; cleared once a
    /// commit result has been applied.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub tool_state_snapshot: Option<crate::ToolState>,
    /// Blob reference for the plugin snapshot, as reported by the last commit
    /// manifest or checkpoint.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub plugin_snapshot_ref: Option<crate::store::BlobRef>,
    /// Plugin snapshot revision fingerprint last recorded for this session.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub plugin_snapshot_revision: Option<u64>,
    /// In-memory plugin snapshot. Not copied into `SessionSnapshot`; cleared
    /// once a commit result has been applied.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub plugin_snapshot: Option<crate::PluginSessionSnapshot>,
    /// Session-level blob reference for the execution state.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub execution_state_ref: Option<crate::store::BlobRef>,
    /// Session-level in-memory execution state bytes. The
    /// `execution_state_snapshot()` accessor prefers the current agent frame's
    /// copy and only falls back to this field.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub execution_state_snapshot: Option<Vec<u8>>,
    /// Cost-accounting ledger. Every LLM call (parent turns, subagent
    /// children, compaction, observers, background helpers) contributes an
    /// entry keyed by `(source, model)`. Separate from `token_usage`
    /// which tracks context-window accounting only.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub token_ledger: Vec<TokenLedgerEntry>,
    /// Blob reference of the checkpoint recorded by the last commit or loaded
    /// from the session head.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub checkpoint_ref: Option<crate::store::BlobRef>,
    /// Store head revision observed by the runtime. Commits use it for
    /// optimistic concurrency; `None` means the runtime is creating the
    /// first persisted head.
    #[serde(skip)]
    pub head_revision: Option<u64>,
    /// Signals that the next commit must write the full graph (a
    /// destructive rewrite happened, e.g. `heal_orphaned_leaf`). Cleared
    /// after the next commit.
    #[serde(skip)]
    pub graph_replace_required: bool,
}
72
73impl RuntimeSessionState {
74    pub fn from_snapshot(snapshot: SessionSnapshot) -> Self {
75        let mut state = Self {
76            session_id: snapshot.session_id,
77            policy: snapshot.policy,
78            agent_frames: snapshot.agent_frames,
79            current_agent_frame_id: snapshot.current_agent_frame_id,
80            session_graph: snapshot.session_graph,
81            turn_index: snapshot.turn_index,
82            token_usage: snapshot.token_usage,
83            last_prompt_usage: snapshot.last_prompt_usage,
84            protocol_turn_options: snapshot.protocol_turn_options,
85            tool_state_ref: snapshot.tool_state_ref,
86            tool_state_generation: snapshot.tool_state_generation,
87            tool_state_snapshot: None,
88            plugin_snapshot_ref: snapshot.plugin_snapshot_ref,
89            plugin_snapshot_revision: snapshot.plugin_snapshot_revision,
90            plugin_snapshot: None,
91            execution_state_ref: snapshot.execution_state_ref,
92            execution_state_snapshot: None,
93            token_ledger: snapshot.token_ledger,
94            checkpoint_ref: snapshot.checkpoint_ref,
95            head_revision: None,
96            graph_replace_required: false,
97        };
98        for frame in &mut state.agent_frames {
99            frame.execution_state_snapshot = None;
100        }
101        state.ensure_agent_frame_initialized();
102        state
103    }
104
105    pub fn to_snapshot(&self) -> SessionSnapshot {
106        let mut agent_frames = self.agent_frames.clone();
107        for frame in &mut agent_frames {
108            frame.execution_state_snapshot = None;
109        }
110        SessionSnapshot {
111            session_id: self.session_id.clone(),
112            policy: self.policy.clone(),
113            agent_frames,
114            current_agent_frame_id: self.current_agent_frame_id.clone(),
115            session_graph: self.session_graph.clone(),
116            turn_index: self.turn_index,
117            token_usage: self.token_usage.clone(),
118            last_prompt_usage: self.last_prompt_usage.clone(),
119            protocol_turn_options: self.protocol_turn_options.clone(),
120            tool_state_ref: self.tool_state_ref.clone(),
121            tool_state_generation: self.tool_state_generation,
122            plugin_snapshot_ref: self.plugin_snapshot_ref.clone(),
123            plugin_snapshot_revision: self.plugin_snapshot_revision,
124            execution_state_ref: self.execution_state_ref.clone(),
125            token_ledger: self.token_ledger.clone(),
126            checkpoint_ref: self.checkpoint_ref.clone(),
127        }
128    }
129
130    pub fn apply_snapshot(&mut self, snapshot: &SessionSnapshot) {
131        self.session_id = snapshot.session_id.clone();
132        self.policy = snapshot.policy.clone();
133        self.agent_frames = snapshot.agent_frames.clone();
134        self.current_agent_frame_id = snapshot.current_agent_frame_id.clone();
135        self.ensure_agent_frame_initialized();
136        self.session_graph = snapshot.session_graph.clone();
137        self.turn_index = snapshot.turn_index;
138        self.token_usage = snapshot.token_usage.clone();
139        self.last_prompt_usage = snapshot.last_prompt_usage.clone();
140        self.protocol_turn_options = snapshot.protocol_turn_options.clone();
141        self.tool_state_ref = snapshot.tool_state_ref.clone();
142        self.tool_state_generation = snapshot.tool_state_generation;
143        self.plugin_snapshot_ref = snapshot.plugin_snapshot_ref.clone();
144        self.plugin_snapshot_revision = snapshot.plugin_snapshot_revision;
145        self.execution_state_ref = snapshot.execution_state_ref.clone();
146        self.token_ledger = snapshot.token_ledger.clone();
147        self.checkpoint_ref = snapshot.checkpoint_ref.clone();
148    }
149
150    pub fn stamp_runtime_state(
151        &mut self,
152        tool_state: Option<&crate::ToolState>,
153        plugin_snapshot: Option<&crate::PluginSessionSnapshot>,
154    ) {
155        self.tool_state_snapshot = tool_state.cloned();
156        self.tool_state_generation = tool_state.map(|snapshot| snapshot.generation());
157        self.plugin_snapshot = plugin_snapshot.cloned();
158    }
159
160    pub fn usage_report(&self) -> super::usage::SessionUsageReport {
161        super::usage::SessionUsageReport::from_entries(&self.token_ledger)
162    }
163
164    pub(crate) fn read_model(&self) -> crate::session_graph::SessionReadModel {
165        self.session_graph.read_model_for_agent_frame(
166            &self.current_agent_frame_id,
167            self.current_agent_frame_is_initial(),
168        )
169    }
170
171    pub fn replace_active_read_state(&mut self, messages: &[Message]) {
172        self.session_graph
173            .replace_active_read_state_for_agent_frame(&self.current_agent_frame_id, messages);
174        self.graph_replace_required = false;
175    }
176
177    pub fn append_active_read_delta(&mut self, messages: &[Message]) {
178        self.session_graph
179            .append_active_read_delta_for_agent_frame(&self.current_agent_frame_id, messages);
180    }
181
182    pub fn append_active_conversation_messages(&mut self, messages: &[Message]) {
183        self.session_graph
184            .append_active_conversation_messages_for_agent_frame(
185                &self.current_agent_frame_id,
186                messages,
187            );
188    }
189
190    pub fn read_view(&self) -> crate::SessionReadView {
191        crate::SessionReadView::from_persisted_state(self)
192    }
193
194    pub fn session_graph(&self) -> &crate::SessionGraph {
195        &self.session_graph
196    }
197
198    pub fn policy(&self) -> &SessionPolicy {
199        self.effective_policy()
200    }
201
202    pub fn turn_state(&self) -> PersistedTurnState {
203        PersistedTurnState {
204            turn_index: self.turn_index,
205            token_usage: self.token_usage.clone(),
206            last_prompt_usage: self.last_prompt_usage.clone(),
207            protocol_turn_options: self.protocol_turn_options.clone(),
208        }
209    }
210
211    pub fn token_ledger(&self) -> &[TokenLedgerEntry] {
212        &self.token_ledger
213    }
214
215    pub fn apply_persisted_commit_result(&mut self, result: crate::store::RuntimeCommitResult) {
216        self.head_revision = Some(result.head_revision);
217        self.checkpoint_ref = Some(result.checkpoint_ref);
218        self.tool_state_ref = result.manifest.tool_state_ref;
219        if let Some(snapshot) = self.tool_state_snapshot.as_ref() {
220            self.tool_state_generation = Some(snapshot.generation());
221        } else if self.tool_state_ref.is_none() {
222            self.tool_state_generation = None;
223        }
224        self.plugin_snapshot_ref = result.manifest.plugin_snapshot_ref;
225        self.plugin_snapshot_revision = result.manifest.plugin_snapshot_revision;
226        self.execution_state_ref = result.manifest.execution_state_ref;
227        let execution_state_ref = self.execution_state_ref.clone();
228        if let Some(frame) = self.current_agent_frame_mut() {
229            frame.execution_state_ref = execution_state_ref;
230            frame.execution_state_snapshot = None;
231        }
232        self.graph_replace_required = false;
233        self.tool_state_snapshot = None;
234        self.plugin_snapshot = None;
235        self.execution_state_snapshot = None;
236        if let Some(frame) = self.current_agent_frame_mut() {
237            frame.execution_state_snapshot = None;
238        }
239    }
240
241    pub fn discard_runtime_snapshots(&mut self) {
242        self.tool_state_snapshot = None;
243        self.plugin_snapshot = None;
244        self.execution_state_snapshot = None;
245        if let Some(frame) = self.current_agent_frame_mut() {
246            frame.execution_state_snapshot = None;
247        }
248    }
249
250    pub fn set_execution_state_snapshot(&mut self, execution_state_snapshot: Option<Vec<u8>>) {
251        if execution_state_snapshot.is_none() {
252            self.execution_state_ref = None;
253        }
254        self.execution_state_snapshot = execution_state_snapshot.clone();
255        if let Some(frame) = self.current_agent_frame_mut() {
256            if execution_state_snapshot.is_none() {
257                frame.execution_state_ref = None;
258            }
259            frame.execution_state_snapshot = execution_state_snapshot;
260        }
261    }
262
263    pub fn execution_state_snapshot(&self) -> Option<&[u8]> {
264        self.current_agent_frame()
265            .and_then(|frame| frame.execution_state_snapshot.as_deref())
266            .or(self.execution_state_snapshot.as_deref())
267    }
268
269    pub fn refresh_plugin_snapshots(&mut self, plugins: &crate::PluginSession) {
270        let tool_registry = plugins.tool_registry();
271        let generation = tool_registry.generation();
272        if self.tool_state_ref.is_none() || self.tool_state_generation != Some(generation) {
273            let snapshot = tool_registry.export_state();
274            self.tool_state_generation = Some(snapshot.generation());
275            self.tool_state_snapshot = Some(snapshot);
276        }
277
278        let revision = plugins.snapshot_revision_fingerprint();
279        if self.plugin_snapshot_ref.is_none() || self.plugin_snapshot_revision != Some(revision) {
280            store_plugin_snapshot(&mut self.plugin_snapshot, plugins.snapshot());
281        }
282        self.plugin_snapshot_revision = Some(revision);
283    }
284}
285
286/// Persist a freshly captured plugin snapshot, logging and **retaining the prior
287/// snapshot** when the capture fails.
288///
289/// A failed capture (`Err`) previously collapsed to `None` via `.ok()`, erasing
290/// the last good snapshot — so the next cold rebuild would restore an empty
291/// plugin surface even though a valid snapshot had been captured earlier. Keep
292/// the prior value and surface the error instead.
293pub(crate) fn store_plugin_snapshot(
294    target: &mut Option<crate::PluginSessionSnapshot>,
295    captured: Result<crate::PluginSessionSnapshot, crate::PluginError>,
296) {
297    match captured {
298        Ok(snapshot) => *target = Some(snapshot),
299        Err(err) => tracing::warn!(
300            error = %err,
301            "failed to capture plugin snapshot; retaining the prior snapshot",
302        ),
303    }
304}
305
306impl RuntimeSessionState {
307    pub fn current_agent_frame(&self) -> Option<&crate::AgentFrameRecord> {
308        self.agent_frames
309            .iter()
310            .find(|frame| frame.frame_id == self.current_agent_frame_id)
311    }
312
313    pub fn current_agent_frame_mut(&mut self) -> Option<&mut crate::AgentFrameRecord> {
314        let current_agent_frame_id = self.current_agent_frame_id.clone();
315        self.agent_frames
316            .iter_mut()
317            .find(|frame| frame.frame_id == current_agent_frame_id)
318    }
319
320    pub fn effective_policy(&self) -> &SessionPolicy {
321        self.current_agent_frame()
322            .map(|frame| &frame.assignment.policy)
323            .unwrap_or(&self.policy)
324    }
325
326    pub fn effective_protocol_turn_options(&self) -> &crate::ProtocolTurnOptions {
327        self.current_agent_frame()
328            .map(|frame| &frame.protocol_turn_options)
329            .unwrap_or(&self.protocol_turn_options)
330    }
331
332    pub fn ensure_agent_frame_initialized(&mut self) {
333        if self.current_agent_frame_id.is_empty() {
334            self.current_agent_frame_id = default_agent_frame_id(&self.session_id);
335        }
336        if self
337            .agent_frames
338            .iter()
339            .any(|frame| frame.frame_id == self.current_agent_frame_id)
340        {
341            return;
342        }
343        let mut frame = default_agent_frame(&self.session_id, &self.policy);
344        frame.frame_id = self.current_agent_frame_id.clone();
345        frame.protocol_turn_options = self.protocol_turn_options.clone();
346        frame.execution_state_ref = self.execution_state_ref.clone();
347        frame.execution_state_snapshot = self.execution_state_snapshot.clone();
348        self.agent_frames.push(frame);
349    }
350
351    pub fn reset_initial_agent_frame(
352        &mut self,
353        assignment: crate::AgentFrameAssignment,
354        protocol_turn_options: crate::ProtocolTurnOptions,
355    ) {
356        let frame_id = default_agent_frame_id(&self.session_id);
357        self.policy = assignment.policy.clone();
358        self.protocol_turn_options = protocol_turn_options.clone();
359        self.current_agent_frame_id = frame_id.clone();
360        self.agent_frames = vec![crate::AgentFrameRecord::new(
361            frame_id,
362            self.session_id.clone(),
363            None,
364            crate::AgentFrameReason::Initial,
365            None,
366            assignment,
367            protocol_turn_options,
368        )];
369    }
370
371    pub fn append_agent_frame(&mut self, mut frame: crate::AgentFrameRecord) {
372        let previous_frame_id = self.current_agent_frame_id.clone();
373        for existing in &mut self.agent_frames {
374            if existing.frame_id == previous_frame_id {
375                existing.status = crate::AgentFrameStatus::Superseded;
376            }
377        }
378        if frame.previous_frame_id.is_none() && !previous_frame_id.is_empty() {
379            frame.previous_frame_id = Some(previous_frame_id);
380        }
381        frame.status = crate::AgentFrameStatus::Active;
382        self.policy = frame.assignment.policy.clone();
383        self.protocol_turn_options = frame.protocol_turn_options.clone();
384        self.current_agent_frame_id = frame.frame_id.clone();
385        self.execution_state_ref = frame.execution_state_ref.clone();
386        self.execution_state_snapshot = frame.execution_state_snapshot.clone();
387        self.agent_frames.push(frame);
388    }
389
390    fn current_agent_frame_is_initial(&self) -> bool {
391        self.current_agent_frame()
392            .map(|frame| frame.previous_frame_id.is_none())
393            .unwrap_or(true)
394    }
395}
396
397impl Default for RuntimeSessionState {
398    fn default() -> Self {
399        Self {
400            session_id: "root".to_string(),
401            policy: SessionPolicy::default(),
402            agent_frames: default_agent_frames("root", &SessionPolicy::default()),
403            current_agent_frame_id: default_agent_frame_id("root"),
404            session_graph: crate::SessionGraph::default(),
405            turn_index: 0,
406            token_usage: TokenUsage::default(),
407            last_prompt_usage: None,
408            protocol_turn_options: crate::ProtocolTurnOptions::default(),
409            tool_state_ref: None,
410            tool_state_generation: None,
411            tool_state_snapshot: None,
412            plugin_snapshot_ref: None,
413            plugin_snapshot_revision: None,
414            plugin_snapshot: None,
415            execution_state_ref: None,
416            execution_state_snapshot: None,
417            token_ledger: Vec::new(),
418            checkpoint_ref: None,
419            head_revision: None,
420            graph_replace_required: false,
421        }
422    }
423}
424
#[cfg(test)]
mod tests {
    use super::*;

    /// Populates every runtime-only field, serializes the snapshot to JSON,
    /// and checks that none of those fields leak out or come back after a
    /// round trip through `SessionSnapshot`.
    #[test]
    fn session_snapshot_serialization_excludes_runtime_only_fields_and_round_trips() {
        let policy = SessionPolicy {
            provider_id: "mock".to_string(),
            ..SessionPolicy::default()
        };
        let mut state = RuntimeSessionState {
            session_id: "snapshot-test".to_string(),
            policy,
            tool_state_snapshot: Some(crate::ToolState::default()),
            plugin_snapshot: Some(crate::PluginSessionSnapshot::default()),
            execution_state_snapshot: Some(vec![1, 2, 3]),
            head_revision: Some(42),
            graph_replace_required: true,
            ..RuntimeSessionState::default()
        };
        state.ensure_agent_frame_initialized();
        if let Some(current) = state.current_agent_frame_mut() {
            current.execution_state_snapshot = Some(vec![4, 5, 6]);
        }

        let serialized = serde_json::to_value(state.to_snapshot()).expect("serialize snapshot");

        // None of the runtime-only keys may appear at the top level.
        let runtime_only_keys = [
            "head_revision",
            "graph_replace_required",
            "tool_state_snapshot",
            "plugin_snapshot",
            "execution_state_snapshot",
        ];
        for runtime_key in runtime_only_keys {
            assert!(
                serialized.get(runtime_key).is_none(),
                "snapshot unexpectedly exposed {runtime_key}"
            );
        }

        // Nor may any serialized frame carry execution-state bytes.
        let serialized_frames = serialized["agent_frames"]
            .as_array()
            .expect("agent frames");
        assert!(
            serialized_frames
                .iter()
                .all(|frame| frame.get("execution_state_snapshot").is_none())
        );

        let round_tripped: SessionSnapshot =
            serde_json::from_value(serialized).expect("round-trip snapshot");
        let hydrated = RuntimeSessionState::from_snapshot(round_tripped);

        assert_eq!(hydrated.session_id, "snapshot-test");
        assert_eq!(hydrated.policy.recorded_provider_id(), "mock");
        assert!(hydrated.head_revision.is_none());
        assert!(!hydrated.graph_replace_required);
        assert!(hydrated.tool_state_snapshot.is_none());
        assert!(hydrated.plugin_snapshot.is_none());
        assert!(hydrated.execution_state_snapshot.is_none());
        assert!(
            hydrated
                .agent_frames
                .iter()
                .all(|frame| frame.execution_state_snapshot.is_none())
        );
    }
}
489
490pub(super) fn apply_persisted_session_config(
491    policy: &mut SessionPolicy,
492    config: &crate::PersistedSessionConfig,
493) {
494    policy.model = config.model.clone();
495    policy.provider_id = config.provider_id.clone();
496}
497
498pub(super) fn apply_session_checkpoint(
499    state: &mut RuntimeSessionState,
500    checkpoint: Option<crate::store::HydratedSessionCheckpoint>,
501) {
502    let Some(checkpoint) = checkpoint else {
503        state.tool_state_ref = None;
504        state.tool_state_generation = None;
505        state.tool_state_snapshot = None;
506        state.plugin_snapshot_ref = None;
507        state.plugin_snapshot_revision = None;
508        state.plugin_snapshot = None;
509        state.execution_state_ref = None;
510        state.execution_state_snapshot = None;
511        state.ensure_agent_frame_initialized();
512        return;
513    };
514    state.turn_index = checkpoint.turn_state.turn_index;
515    state.token_usage = checkpoint.turn_state.token_usage;
516    state.last_prompt_usage = checkpoint.turn_state.last_prompt_usage;
517    state.protocol_turn_options = checkpoint.turn_state.protocol_turn_options;
518    state.tool_state_ref = checkpoint.tool_state_ref.clone();
519    state.tool_state_generation = checkpoint
520        .tool_state
521        .as_ref()
522        .map(|snapshot| snapshot.generation());
523    state.tool_state_snapshot = checkpoint.tool_state;
524    state.plugin_snapshot_ref = checkpoint.plugin_snapshot_ref.clone();
525    state.plugin_snapshot_revision = checkpoint.plugin_snapshot_revision;
526    state.plugin_snapshot = checkpoint.plugin_snapshot;
527    state.execution_state_ref = checkpoint.execution_state_ref.clone();
528    state.execution_state_snapshot = None;
529    state.ensure_agent_frame_initialized();
530    if let Some(frame) = state.current_agent_frame_mut() {
531        frame.execution_state_ref = checkpoint.execution_state_ref.clone();
532        frame.execution_state_snapshot = checkpoint.execution_state;
533    }
534}
535
536pub(super) fn apply_session_head(
537    state: &mut RuntimeSessionState,
538    head: &crate::store::SessionHead,
539) {
540    state.session_graph = head.graph.clone();
541    state.agent_frames = head.agent_frames.clone();
542    state.current_agent_frame_id = head.current_agent_frame_id.clone();
543    state.checkpoint_ref = head.checkpoint_ref.clone();
544    state.token_ledger = head.token_ledger.clone();
545    state.tool_state_ref = None;
546    state.tool_state_generation = None;
547    state.tool_state_snapshot = None;
548    state.plugin_snapshot_ref = None;
549    state.plugin_snapshot_revision = None;
550    state.plugin_snapshot = None;
551    state.execution_state_ref = None;
552    state.execution_state_snapshot = None;
553    state.ensure_agent_frame_initialized();
554    state.head_revision = Some(head.head_revision);
555    state.graph_replace_required = false;
556    apply_persisted_session_config(&mut state.policy, &head.config);
557}
558
559pub(super) fn append_session_nodes_to_state(
560    state: &mut RuntimeSessionState,
561    nodes: &[crate::SessionAppendNode],
562) -> Vec<String> {
563    let drafts = nodes
564        .iter()
565        .map(session_append_node_draft)
566        .collect::<Vec<_>>();
567    state.ensure_agent_frame_initialized();
568    let node_ids = state
569        .session_graph
570        .append_node_drafts_for_agent_frame(&state.current_agent_frame_id, drafts);
571    normalize_session_graph(state);
572    node_ids
573}
574
575fn session_append_node_draft(
576    node: &crate::SessionAppendNode,
577) -> crate::session_graph::SessionNodeDraft {
578    match node {
579        crate::SessionAppendNode::Message { message, caused_by } => {
580            crate::session_graph::SessionNodeDraft::message(plugin_message_to_message(message))
581                .with_caused_by(caused_by.clone())
582        }
583        crate::SessionAppendNode::ProtocolEvent { event, caused_by } => {
584            crate::session_graph::SessionNodeDraft::protocol_event(event.clone())
585                .with_caused_by(caused_by.clone())
586        }
587        crate::SessionAppendNode::Plugin {
588            plugin_type,
589            body,
590            caused_by,
591        } => crate::session_graph::SessionNodeDraft::plugin(plugin_type.clone(), body.clone())
592            .with_caused_by(caused_by.clone()),
593    }
594}
595
596fn default_agent_frame_id(session_id: &str) -> crate::AgentFrameId {
597    format!("{session_id}:frame:initial")
598}
599
600fn default_agent_frames(session_id: &str, policy: &SessionPolicy) -> Vec<crate::AgentFrameRecord> {
601    vec![default_agent_frame(session_id, policy)]
602}
603
604fn default_agent_frame(session_id: &str, policy: &SessionPolicy) -> crate::AgentFrameRecord {
605    crate::AgentFrameRecord::new(
606        default_agent_frame_id(session_id),
607        session_id.to_string(),
608        None,
609        crate::AgentFrameReason::Initial,
610        None,
611        crate::AgentFrameAssignment::from_policy(policy.clone()),
612        crate::ProtocolTurnOptions::default(),
613    )
614}
615
616/// Heal any graph corruption (orphaned leaf) on load.
617///
618/// Must run BEFORE any residency-based trim (phase-9 feature) because
619/// healing's fallback search relies on having the full node set in RAM.
620/// Under `Residency::ActivePathOnly`, the runtime loads only the active
621/// path; if the leaf doesn't resolve against that reduced set, the
622/// caller falls back to a full `load_session_graph()` + `normalize` +
623/// trim.
624pub(super) fn normalize_session_graph(state: &mut RuntimeSessionState) {
625    if state.session_graph.heal_orphaned_leaf() {
626        state.graph_replace_required = true;
627    }
628}
629
630/// Trim the resident node set according to `Residency`. Called AFTER
631/// `normalize_session_graph` during `from_environment` load. Under
632/// `KeepAll` this is a no-op; under `ActivePathOnly` it replaces the
633/// resident graph with just the active path. Orphans remain on disk —
634/// the host decides whether/when to tombstone + vacuum them via
635/// `LashRuntime::orphaned_node_ids` + the store primitives.
636///
637pub(super) fn apply_residency_on_load(
638    state: &mut RuntimeSessionState,
639    residency: crate::Residency,
640) {
641    match residency {
642        crate::Residency::KeepAll => {}
643        crate::Residency::ActivePathOnly => {
644            state.session_graph = state.session_graph.fork_current_path();
645        }
646    }
647}
648
#[cfg(test)]
mod plugin_snapshot_tests {
    use super::store_plugin_snapshot;
    use crate::{PluginError, PluginSessionSnapshot};

    /// A successful capture lands in the target slot.
    #[test]
    fn ok_capture_overwrites_target() {
        let mut slot = None;
        let captured = Ok(PluginSessionSnapshot::default());
        store_plugin_snapshot(&mut slot, captured);
        assert!(slot.is_some(), "a successful capture must be stored");
    }

    /// Regression guard: a failed capture used to collapse to `None` via
    /// `.ok()`, wiping the last good snapshot so the next cold rebuild
    /// restored an empty plugin surface. A failure must leave the prior
    /// snapshot in place.
    #[test]
    fn failed_capture_retains_prior_snapshot() {
        let mut slot = Some(PluginSessionSnapshot::default());
        let failure = Err(PluginError::Snapshot("capture failed".to_string()));
        store_plugin_snapshot(&mut slot, failure);
        assert!(
            slot.is_some(),
            "a failed capture must retain the prior snapshot, not erase it"
        );
    }
}
679
#[cfg(test)]
mod residency_tests {
    use super::apply_residency_on_load;
    use crate::{
        Message, MessageRole, Part, PartKind, PruneState, Residency, RuntimeSessionState,
        shared_parts,
    };

    /// Builds a user message with a single intact text part.
    fn text_message(id: &str, content: &str) -> Message {
        let text_part = Part {
            id: format!("{id}.p0"),
            kind: PartKind::Text,
            content: content.to_string(),
            attachment: None,
            tool_call_id: None,
            tool_name: None,
            tool_replay: None,
            prune_state: PruneState::Intact,
            reasoning_meta: None,
            response_meta: None,
        };
        Message {
            id: id.to_string(),
            role: MessageRole::User,
            parts: shared_parts(vec![text_part]),
            origin: None,
        }
    }

    /// Root, an inactive branch off the root, then an active branch off the root.
    /// Returns the state plus the inactive and active branch node ids.
    fn branching_state() -> (RuntimeSessionState, String, String) {
        let mut state = RuntimeSessionState::default();

        state.append_active_conversation_messages(&[text_message("root", "root")]);
        let root_leaf = state.session_graph.leaf_node_id.clone();

        state.append_active_conversation_messages(&[text_message("inactive", "inactive branch")]);
        let inactive_id = state
            .session_graph
            .leaf_node_id
            .clone()
            .expect("inactive node");

        // Rewind to the root so the next append forms a sibling branch.
        state.session_graph.branch_to(root_leaf);
        state.append_active_conversation_messages(&[text_message("active", "active branch")]);
        let active_id = state
            .session_graph
            .leaf_node_id
            .clone()
            .expect("active node");

        (state, inactive_id, active_id)
    }

    /// The durable worker rebuild (and session resume) call this to match the
    /// live runtime's residency. ActivePathOnly drops nodes off the active
    /// path so a rebuilt session does not silently retain the full graph.
    #[test]
    fn active_path_only_trims_orphan_branches_on_load() {
        let (mut state, inactive_id, active_id) = branching_state();
        assert!(
            state.session_graph.find_node(&inactive_id).is_some(),
            "the inactive branch is resident before trimming"
        );

        apply_residency_on_load(&mut state, Residency::ActivePathOnly);

        assert!(
            state.session_graph.find_node(&inactive_id).is_none(),
            "ActivePathOnly must drop the orphaned inactive branch on rebuild"
        );
        assert!(
            state.session_graph.find_node(&active_id).is_some(),
            "the active path must be retained"
        );
    }

    /// KeepAll leaves the resident graph untouched, inactive branch included.
    #[test]
    fn keep_all_retains_orphan_branches_on_load() {
        let (mut state, inactive_id, _active_id) = branching_state();

        apply_residency_on_load(&mut state, Residency::KeepAll);

        assert!(
            state.session_graph.find_node(&inactive_id).is_some(),
            "KeepAll must retain the full resident graph"
        );
    }
}