Skip to main content

lash_core/runtime/
state.rs

1//! Runtime session state and persistence helpers.
2//!
3//! `RuntimeSessionState` is the runtime-private mutable state shape. Public
4//! host/plugin reads use `SessionSnapshot` from the plugin API instead.
5
6use lash_sansio::PromptUsage;
7
8use crate::session_model::{Message, SessionPolicy, TokenUsage, plugin_message_to_message};
9use crate::{PersistedTurnState, SessionSnapshot};
10
11use super::usage::TokenLedgerEntry;
12
13/// The runtime's view of a session: the persistable snapshot fields
14/// **plus** scratch fields the runtime tracks but never persists
15/// (head-revision CAS guard, pending dirty-write buffers, replace-graph
16/// flag). Public serialization goes through [`RuntimeSessionState::to_snapshot`],
17/// which drops runtime-only fields by construction.
18#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
19pub struct RuntimeSessionState {
20    pub session_id: String,
21    #[serde(default)]
22    pub policy: SessionPolicy,
23    #[serde(default)]
24    pub agent_frames: Vec<crate::AgentFrameRecord>,
25    #[serde(default, skip_serializing_if = "String::is_empty")]
26    pub current_agent_frame_id: crate::AgentFrameId,
27    #[serde(default)]
28    pub session_graph: crate::SessionGraph,
29    #[serde(default)]
30    pub turn_index: usize,
31    #[serde(default)]
32    pub token_usage: TokenUsage,
33    #[serde(default, skip_serializing_if = "Option::is_none")]
34    pub last_prompt_usage: Option<PromptUsage>,
35    #[serde(default)]
36    pub protocol_turn_options: crate::ProtocolTurnOptions,
37    #[serde(default, skip_serializing_if = "Option::is_none")]
38    pub tool_state_ref: Option<crate::store::BlobRef>,
39    #[serde(default, skip_serializing_if = "Option::is_none")]
40    pub tool_state_generation: Option<u64>,
41    #[serde(default, skip_serializing_if = "Option::is_none")]
42    pub tool_state_snapshot: Option<crate::ToolState>,
43    #[serde(default, skip_serializing_if = "Option::is_none")]
44    pub plugin_snapshot_ref: Option<crate::store::BlobRef>,
45    #[serde(default, skip_serializing_if = "Option::is_none")]
46    pub plugin_snapshot_revision: Option<u64>,
47    #[serde(default, skip_serializing_if = "Option::is_none")]
48    pub plugin_snapshot: Option<crate::PluginSessionSnapshot>,
49    #[serde(default, skip_serializing_if = "Option::is_none")]
50    pub execution_state_ref: Option<crate::store::BlobRef>,
51    #[serde(default, skip_serializing_if = "Option::is_none")]
52    pub execution_state_snapshot: Option<Vec<u8>>,
53    /// Cost-accounting ledger. Every LLM call (parent turns, subagent
54    /// children, compaction, observers, background helpers) contributes an
55    /// entry keyed by `(source, model)`. Separate from `token_usage`
56    /// which tracks context-window accounting only.
57    #[serde(default, skip_serializing_if = "Vec::is_empty")]
58    pub token_ledger: Vec<TokenLedgerEntry>,
59    #[serde(default, skip_serializing_if = "Option::is_none")]
60    pub checkpoint_ref: Option<crate::store::BlobRef>,
61    /// Store head revision observed by the runtime. Commits use it for
62    /// optimistic concurrency; `None` means the runtime is creating the
63    /// first persisted head.
64    #[serde(skip)]
65    pub head_revision: Option<u64>,
66    /// Signals that the next commit must write the full graph (for example,
67    /// `heal_orphaned_leaf` repaired an invalid leaf). Cleared after the
68    /// next commit.
69    #[serde(skip)]
70    pub graph_replace_required: bool,
71}
72
73impl RuntimeSessionState {
74    pub fn from_snapshot(snapshot: SessionSnapshot) -> Self {
75        let mut state = Self {
76            session_id: snapshot.session_id,
77            policy: snapshot.policy,
78            agent_frames: snapshot.agent_frames,
79            current_agent_frame_id: snapshot.current_agent_frame_id,
80            session_graph: snapshot.session_graph,
81            turn_index: snapshot.turn_index,
82            token_usage: snapshot.token_usage,
83            last_prompt_usage: snapshot.last_prompt_usage,
84            protocol_turn_options: snapshot.protocol_turn_options,
85            tool_state_ref: snapshot.tool_state_ref,
86            tool_state_generation: snapshot.tool_state_generation,
87            tool_state_snapshot: None,
88            plugin_snapshot_ref: snapshot.plugin_snapshot_ref,
89            plugin_snapshot_revision: snapshot.plugin_snapshot_revision,
90            plugin_snapshot: None,
91            execution_state_ref: snapshot.execution_state_ref,
92            execution_state_snapshot: None,
93            token_ledger: snapshot.token_ledger,
94            checkpoint_ref: snapshot.checkpoint_ref,
95            head_revision: None,
96            graph_replace_required: false,
97        };
98        for frame in &mut state.agent_frames {
99            frame.execution_state_snapshot = None;
100        }
101        state.ensure_agent_frame_initialized();
102        state
103    }
104
105    pub fn to_snapshot(&self) -> SessionSnapshot {
106        let mut agent_frames = self.agent_frames.clone();
107        for frame in &mut agent_frames {
108            frame.execution_state_snapshot = None;
109        }
110        SessionSnapshot {
111            session_id: self.session_id.clone(),
112            policy: self.policy.clone(),
113            agent_frames,
114            current_agent_frame_id: self.current_agent_frame_id.clone(),
115            session_graph: self.session_graph.clone(),
116            turn_index: self.turn_index,
117            token_usage: self.token_usage.clone(),
118            last_prompt_usage: self.last_prompt_usage.clone(),
119            protocol_turn_options: self.protocol_turn_options.clone(),
120            tool_state_ref: self.tool_state_ref.clone(),
121            tool_state_generation: self.tool_state_generation,
122            plugin_snapshot_ref: self.plugin_snapshot_ref.clone(),
123            plugin_snapshot_revision: self.plugin_snapshot_revision,
124            execution_state_ref: self.execution_state_ref.clone(),
125            token_ledger: self.token_ledger.clone(),
126            checkpoint_ref: self.checkpoint_ref.clone(),
127        }
128    }
129
130    pub fn apply_snapshot(&mut self, snapshot: &SessionSnapshot) {
131        self.session_id = snapshot.session_id.clone();
132        self.policy = snapshot.policy.clone();
133        self.agent_frames = snapshot.agent_frames.clone();
134        self.current_agent_frame_id = snapshot.current_agent_frame_id.clone();
135        self.ensure_agent_frame_initialized();
136        self.session_graph = snapshot.session_graph.clone();
137        self.turn_index = snapshot.turn_index;
138        self.token_usage = snapshot.token_usage.clone();
139        self.last_prompt_usage = snapshot.last_prompt_usage.clone();
140        self.protocol_turn_options = snapshot.protocol_turn_options.clone();
141        self.tool_state_ref = snapshot.tool_state_ref.clone();
142        self.tool_state_generation = snapshot.tool_state_generation;
143        self.plugin_snapshot_ref = snapshot.plugin_snapshot_ref.clone();
144        self.plugin_snapshot_revision = snapshot.plugin_snapshot_revision;
145        self.execution_state_ref = snapshot.execution_state_ref.clone();
146        self.token_ledger = snapshot.token_ledger.clone();
147        self.checkpoint_ref = snapshot.checkpoint_ref.clone();
148    }
149
150    pub fn stamp_runtime_state(
151        &mut self,
152        tool_state: Option<&crate::ToolState>,
153        plugin_snapshot: Option<&crate::PluginSessionSnapshot>,
154    ) {
155        self.tool_state_snapshot = tool_state.cloned();
156        self.tool_state_generation = tool_state.map(|snapshot| snapshot.generation());
157        self.plugin_snapshot = plugin_snapshot.cloned();
158    }
159
160    pub fn usage_report(&self) -> super::usage::SessionUsageReport {
161        super::usage::SessionUsageReport::from_entries(&self.token_ledger)
162    }
163
164    pub(crate) fn read_model(&self) -> crate::session_graph::SessionReadModel {
165        self.session_graph.read_model_for_agent_frame(
166            &self.current_agent_frame_id,
167            self.current_agent_frame_is_initial(),
168        )
169    }
170
171    pub fn replace_active_read_state(&mut self, messages: &[Message]) {
172        self.session_graph
173            .replace_active_read_state_for_agent_frame(&self.current_agent_frame_id, messages);
174        self.graph_replace_required = false;
175    }
176
177    pub fn append_active_read_delta(&mut self, messages: &[Message]) {
178        self.session_graph
179            .append_active_read_delta_for_agent_frame(&self.current_agent_frame_id, messages);
180    }
181
182    pub fn append_active_conversation_messages(&mut self, messages: &[Message]) {
183        self.session_graph
184            .append_active_conversation_messages_for_agent_frame(
185                &self.current_agent_frame_id,
186                messages,
187            );
188    }
189
190    pub fn read_view(&self) -> crate::SessionReadView {
191        crate::SessionReadView::from_persisted_state(self)
192    }
193
194    pub fn session_graph(&self) -> &crate::SessionGraph {
195        &self.session_graph
196    }
197
198    pub fn policy(&self) -> &SessionPolicy {
199        self.effective_policy()
200    }
201
202    pub fn turn_state(&self) -> PersistedTurnState {
203        PersistedTurnState {
204            turn_index: self.turn_index,
205            token_usage: self.token_usage.clone(),
206            last_prompt_usage: self.last_prompt_usage.clone(),
207            protocol_turn_options: self.protocol_turn_options.clone(),
208        }
209    }
210
211    pub fn token_ledger(&self) -> &[TokenLedgerEntry] {
212        &self.token_ledger
213    }
214
215    pub fn apply_persisted_commit_result(&mut self, result: crate::store::RuntimeCommitResult) {
216        self.head_revision = Some(result.head_revision);
217        self.checkpoint_ref = Some(result.checkpoint_ref);
218        self.tool_state_ref = result.manifest.tool_state_ref;
219        if let Some(snapshot) = self.tool_state_snapshot.as_ref() {
220            self.tool_state_generation = Some(snapshot.generation());
221        } else if self.tool_state_ref.is_none() {
222            self.tool_state_generation = None;
223        }
224        self.plugin_snapshot_ref = result.manifest.plugin_snapshot_ref;
225        self.plugin_snapshot_revision = result.manifest.plugin_snapshot_revision;
226        self.execution_state_ref = result.manifest.execution_state_ref;
227        let execution_state_ref = self.execution_state_ref.clone();
228        if let Some(frame) = self.current_agent_frame_mut() {
229            frame.execution_state_ref = execution_state_ref;
230            frame.execution_state_snapshot = None;
231        }
232        self.graph_replace_required = false;
233        self.tool_state_snapshot = None;
234        self.plugin_snapshot = None;
235        self.execution_state_snapshot = None;
236        if let Some(frame) = self.current_agent_frame_mut() {
237            frame.execution_state_snapshot = None;
238        }
239    }
240
241    pub fn discard_runtime_snapshots(&mut self) {
242        self.tool_state_snapshot = None;
243        self.plugin_snapshot = None;
244        self.execution_state_snapshot = None;
245        if let Some(frame) = self.current_agent_frame_mut() {
246            frame.execution_state_snapshot = None;
247        }
248    }
249
250    pub fn set_execution_state_snapshot(&mut self, execution_state_snapshot: Option<Vec<u8>>) {
251        if execution_state_snapshot.is_none() {
252            self.execution_state_ref = None;
253        }
254        self.execution_state_snapshot = execution_state_snapshot.clone();
255        if let Some(frame) = self.current_agent_frame_mut() {
256            if execution_state_snapshot.is_none() {
257                frame.execution_state_ref = None;
258            }
259            frame.execution_state_snapshot = execution_state_snapshot;
260        }
261    }
262
263    pub fn execution_state_snapshot(&self) -> Option<&[u8]> {
264        self.current_agent_frame()
265            .and_then(|frame| frame.execution_state_snapshot.as_deref())
266            .or(self.execution_state_snapshot.as_deref())
267    }
268
269    pub fn refresh_plugin_snapshots(&mut self, plugins: &crate::PluginSession) {
270        let tool_registry = plugins.tool_registry();
271        let generation = tool_registry.generation();
272        if self.tool_state_ref.is_none() || self.tool_state_generation != Some(generation) {
273            let snapshot = tool_registry.export_state();
274            self.tool_state_generation = Some(snapshot.generation());
275            self.tool_state_snapshot = Some(snapshot);
276        }
277
278        let revision = plugins.snapshot_revision_fingerprint();
279        if self.plugin_snapshot_ref.is_none() || self.plugin_snapshot_revision != Some(revision) {
280            store_plugin_snapshot(&mut self.plugin_snapshot, plugins.snapshot());
281        }
282        self.plugin_snapshot_revision = Some(revision);
283    }
284}
285
286/// Persist a freshly captured plugin snapshot, logging and **retaining the prior
287/// snapshot** when the capture fails.
288///
289/// A failed capture (`Err`) previously collapsed to `None` via `.ok()`, erasing
290/// the last good snapshot — so the next cold rebuild would restore an empty
291/// plugin surface even though a valid snapshot had been captured earlier. Keep
292/// the prior value and surface the error instead.
293pub(crate) fn store_plugin_snapshot(
294    target: &mut Option<crate::PluginSessionSnapshot>,
295    captured: Result<crate::PluginSessionSnapshot, crate::PluginError>,
296) {
297    match captured {
298        Ok(snapshot) => *target = Some(snapshot),
299        Err(err) => tracing::warn!(
300            error = %err,
301            "failed to capture plugin snapshot; retaining the prior snapshot",
302        ),
303    }
304}
305
306impl RuntimeSessionState {
307    pub fn current_agent_frame(&self) -> Option<&crate::AgentFrameRecord> {
308        self.agent_frames
309            .iter()
310            .find(|frame| frame.frame_id == self.current_agent_frame_id)
311    }
312
313    pub fn current_agent_frame_mut(&mut self) -> Option<&mut crate::AgentFrameRecord> {
314        let current_agent_frame_id = self.current_agent_frame_id.clone();
315        self.agent_frames
316            .iter_mut()
317            .find(|frame| frame.frame_id == current_agent_frame_id)
318    }
319
320    pub fn effective_policy(&self) -> &SessionPolicy {
321        self.current_agent_frame()
322            .map(|frame| &frame.assignment.policy)
323            .unwrap_or(&self.policy)
324    }
325
326    pub fn process_execution_env_spec(
327        &self,
328        fallback_policy: &SessionPolicy,
329    ) -> crate::ProcessExecutionEnvSpec {
330        self.current_agent_frame()
331            .map(|frame| {
332                crate::ProcessExecutionEnvSpec::new(
333                    frame.assignment.plugin_options.clone(),
334                    frame.assignment.policy.clone(),
335                )
336            })
337            .unwrap_or_else(|| {
338                crate::ProcessExecutionEnvSpec::new(
339                    crate::PluginOptions::default(),
340                    fallback_policy.clone(),
341                )
342            })
343    }
344
345    pub fn effective_protocol_turn_options(&self) -> &crate::ProtocolTurnOptions {
346        self.current_agent_frame()
347            .map(|frame| &frame.protocol_turn_options)
348            .unwrap_or(&self.protocol_turn_options)
349    }
350
351    pub fn ensure_agent_frame_initialized(&mut self) {
352        if self.current_agent_frame_id.is_empty() {
353            self.current_agent_frame_id = default_agent_frame_id(&self.session_id);
354        }
355        if self
356            .agent_frames
357            .iter()
358            .any(|frame| frame.frame_id == self.current_agent_frame_id)
359        {
360            return;
361        }
362        let mut frame = default_agent_frame(&self.session_id, &self.policy);
363        frame.frame_id = self.current_agent_frame_id.clone();
364        frame.protocol_turn_options = self.protocol_turn_options.clone();
365        frame.execution_state_ref = self.execution_state_ref.clone();
366        frame.execution_state_snapshot = self.execution_state_snapshot.clone();
367        self.agent_frames.push(frame);
368    }
369
370    pub fn reset_initial_agent_frame(
371        &mut self,
372        assignment: crate::AgentFrameAssignment,
373        protocol_turn_options: crate::ProtocolTurnOptions,
374    ) {
375        let frame_id = default_agent_frame_id(&self.session_id);
376        self.policy = assignment.policy.clone();
377        self.protocol_turn_options = protocol_turn_options.clone();
378        self.current_agent_frame_id = frame_id.clone();
379        self.agent_frames = vec![crate::AgentFrameRecord::new(
380            frame_id,
381            self.session_id.clone(),
382            None,
383            crate::AgentFrameReason::initial(),
384            None,
385            assignment,
386            protocol_turn_options,
387        )];
388    }
389
390    pub fn append_agent_frame(&mut self, mut frame: crate::AgentFrameRecord) {
391        let previous_frame_id = self.current_agent_frame_id.clone();
392        for existing in &mut self.agent_frames {
393            if existing.frame_id == previous_frame_id {
394                existing.status = crate::AgentFrameStatus::Superseded;
395            }
396        }
397        if frame.previous_frame_id.is_none() && !previous_frame_id.is_empty() {
398            frame.previous_frame_id = Some(previous_frame_id);
399        }
400        frame.status = crate::AgentFrameStatus::Active;
401        self.policy = frame.assignment.policy.clone();
402        self.protocol_turn_options = frame.protocol_turn_options.clone();
403        self.current_agent_frame_id = frame.frame_id.clone();
404        self.execution_state_ref = frame.execution_state_ref.clone();
405        self.execution_state_snapshot = frame.execution_state_snapshot.clone();
406        self.agent_frames.push(frame);
407    }
408
409    fn current_agent_frame_is_initial(&self) -> bool {
410        self.current_agent_frame()
411            .map(|frame| frame.previous_frame_id.is_none())
412            .unwrap_or(true)
413    }
414}
415
416impl Default for RuntimeSessionState {
417    fn default() -> Self {
418        Self {
419            session_id: "root".to_string(),
420            policy: SessionPolicy::default(),
421            agent_frames: default_agent_frames("root", &SessionPolicy::default()),
422            current_agent_frame_id: default_agent_frame_id("root"),
423            session_graph: crate::SessionGraph::default(),
424            turn_index: 0,
425            token_usage: TokenUsage::default(),
426            last_prompt_usage: None,
427            protocol_turn_options: crate::ProtocolTurnOptions::default(),
428            tool_state_ref: None,
429            tool_state_generation: None,
430            tool_state_snapshot: None,
431            plugin_snapshot_ref: None,
432            plugin_snapshot_revision: None,
433            plugin_snapshot: None,
434            execution_state_ref: None,
435            execution_state_snapshot: None,
436            token_ledger: Vec::new(),
437            checkpoint_ref: None,
438            head_revision: None,
439            graph_replace_required: false,
440        }
441    }
442}
443
#[cfg(test)]
mod tests {
    use super::*;

    /// Top-level keys that belong to the runtime state only and must not
    /// appear in a serialized `SessionSnapshot`.
    const RUNTIME_ONLY_KEYS: [&str; 5] = [
        "head_revision",
        "graph_replace_required",
        "tool_state_snapshot",
        "plugin_snapshot",
        "execution_state_snapshot",
    ];

    #[test]
    fn session_snapshot_serialization_excludes_runtime_only_fields_and_round_trips() {
        // Populate every runtime-only field so that a leak would be visible.
        let policy = SessionPolicy {
            provider_id: "mock".to_string(),
            ..SessionPolicy::default()
        };
        let mut state = RuntimeSessionState {
            session_id: "snapshot-test".to_string(),
            policy,
            tool_state_snapshot: Some(crate::ToolState::default()),
            plugin_snapshot: Some(crate::PluginSessionSnapshot::default()),
            execution_state_snapshot: Some(vec![1, 2, 3]),
            head_revision: Some(42),
            graph_replace_required: true,
            ..RuntimeSessionState::default()
        };
        state.ensure_agent_frame_initialized();
        if let Some(frame) = state.current_agent_frame_mut() {
            frame.execution_state_snapshot = Some(vec![4, 5, 6]);
        }

        let value = serde_json::to_value(state.to_snapshot()).expect("serialize snapshot");

        for runtime_key in RUNTIME_ONLY_KEYS {
            assert!(
                value.get(runtime_key).is_none(),
                "snapshot unexpectedly exposed {runtime_key}"
            );
        }
        let serialized_frames = value["agent_frames"].as_array().expect("agent frames");
        assert!(
            serialized_frames
                .iter()
                .all(|frame| frame.get("execution_state_snapshot").is_none())
        );

        // Round-trip: hydrating from the serialized snapshot must yield clean
        // runtime-only fields.
        let snapshot: SessionSnapshot = serde_json::from_value(value).expect("round-trip snapshot");
        let hydrated = RuntimeSessionState::from_snapshot(snapshot);

        assert_eq!(hydrated.session_id, "snapshot-test");
        assert_eq!(hydrated.policy.recorded_provider_id(), "mock");
        assert!(hydrated.head_revision.is_none());
        assert!(!hydrated.graph_replace_required);
        assert!(hydrated.tool_state_snapshot.is_none());
        assert!(hydrated.plugin_snapshot.is_none());
        assert!(hydrated.execution_state_snapshot.is_none());
        assert!(
            hydrated
                .agent_frames
                .iter()
                .all(|frame| frame.execution_state_snapshot.is_none())
        );
    }
}
508
509pub(super) fn apply_persisted_session_config(
510    policy: &mut SessionPolicy,
511    config: &crate::PersistedSessionConfig,
512) {
513    policy.model = config.model.clone();
514    policy.provider_id = config.provider_id.clone();
515}
516
517pub(super) fn apply_session_checkpoint(
518    state: &mut RuntimeSessionState,
519    checkpoint: Option<crate::store::HydratedSessionCheckpoint>,
520) {
521    let Some(checkpoint) = checkpoint else {
522        state.tool_state_ref = None;
523        state.tool_state_generation = None;
524        state.tool_state_snapshot = None;
525        state.plugin_snapshot_ref = None;
526        state.plugin_snapshot_revision = None;
527        state.plugin_snapshot = None;
528        state.execution_state_ref = None;
529        state.execution_state_snapshot = None;
530        state.ensure_agent_frame_initialized();
531        return;
532    };
533    state.turn_index = checkpoint.turn_state.turn_index;
534    state.token_usage = checkpoint.turn_state.token_usage;
535    state.last_prompt_usage = checkpoint.turn_state.last_prompt_usage;
536    state.protocol_turn_options = checkpoint.turn_state.protocol_turn_options;
537    state.tool_state_ref = checkpoint.tool_state_ref.clone();
538    state.tool_state_generation = checkpoint
539        .tool_state
540        .as_ref()
541        .map(|snapshot| snapshot.generation());
542    state.tool_state_snapshot = checkpoint.tool_state;
543    state.plugin_snapshot_ref = checkpoint.plugin_snapshot_ref.clone();
544    state.plugin_snapshot_revision = checkpoint.plugin_snapshot_revision;
545    state.plugin_snapshot = checkpoint.plugin_snapshot;
546    state.execution_state_ref = checkpoint.execution_state_ref.clone();
547    state.execution_state_snapshot = None;
548    state.ensure_agent_frame_initialized();
549    if let Some(frame) = state.current_agent_frame_mut() {
550        frame.execution_state_ref = checkpoint.execution_state_ref.clone();
551        frame.execution_state_snapshot = checkpoint.execution_state;
552    }
553}
554
555pub(super) fn apply_session_head(
556    state: &mut RuntimeSessionState,
557    head: &crate::store::SessionHead,
558) {
559    state.session_graph = head.graph.clone();
560    state.agent_frames = head.agent_frames.clone();
561    state.current_agent_frame_id = head.current_agent_frame_id.clone();
562    state.checkpoint_ref = head.checkpoint_ref.clone();
563    state.token_ledger = head.token_ledger.clone();
564    state.tool_state_ref = None;
565    state.tool_state_generation = None;
566    state.tool_state_snapshot = None;
567    state.plugin_snapshot_ref = None;
568    state.plugin_snapshot_revision = None;
569    state.plugin_snapshot = None;
570    state.execution_state_ref = None;
571    state.execution_state_snapshot = None;
572    state.ensure_agent_frame_initialized();
573    state.head_revision = Some(head.head_revision);
574    state.graph_replace_required = false;
575    apply_persisted_session_config(&mut state.policy, &head.config);
576}
577
578pub(super) fn append_session_nodes_to_state(
579    state: &mut RuntimeSessionState,
580    nodes: &[crate::SessionAppendNode],
581) -> Vec<String> {
582    let drafts = nodes
583        .iter()
584        .map(session_append_node_draft)
585        .collect::<Vec<_>>();
586    state.ensure_agent_frame_initialized();
587    let node_ids = state
588        .session_graph
589        .append_node_drafts_for_agent_frame(&state.current_agent_frame_id, drafts);
590    normalize_session_graph(state);
591    node_ids
592}
593
594pub(super) fn open_agent_frame_in_state(
595    state: &mut RuntimeSessionState,
596    request: crate::OpenAgentFrameRequest,
597) -> crate::OpenAgentFrameResult {
598    state.ensure_agent_frame_initialized();
599    if request.frame_id.trim().is_empty() || state.current_agent_frame_id == request.frame_id {
600        return crate::OpenAgentFrameResult {
601            frame_id: state.current_agent_frame_id.clone(),
602            opened: false,
603            initial_node_ids: Vec::new(),
604        };
605    }
606
607    let previous = state.current_agent_frame().cloned();
608    let assignment = previous
609        .as_ref()
610        .map(|frame| frame.assignment.clone())
611        .unwrap_or_else(|| crate::AgentFrameAssignment::from_policy(state.policy.clone()));
612    let protocol_turn_options = previous
613        .as_ref()
614        .map(|frame| frame.protocol_turn_options.clone())
615        .unwrap_or_else(|| state.protocol_turn_options.clone());
616    let previous_frame_id = previous.map(|frame| frame.frame_id);
617    state.append_agent_frame(crate::AgentFrameRecord::new(
618        request.frame_id.clone(),
619        state.session_id.clone(),
620        previous_frame_id,
621        request.reason,
622        request.caused_by,
623        assignment,
624        protocol_turn_options,
625    ));
626
627    let initial_node_ids = append_session_nodes_to_state(state, &request.initial_nodes);
628    if !initial_node_ids.is_empty() {
629        state.graph_replace_required = true;
630    }
631    crate::OpenAgentFrameResult {
632        frame_id: state.current_agent_frame_id.clone(),
633        opened: true,
634        initial_node_ids,
635    }
636}
637
638fn session_append_node_draft(
639    node: &crate::SessionAppendNode,
640) -> crate::session_graph::SessionNodeDraft {
641    match node {
642        crate::SessionAppendNode::Message { message, caused_by } => {
643            crate::session_graph::SessionNodeDraft::message(plugin_message_to_message(message))
644                .with_caused_by(caused_by.clone())
645        }
646        crate::SessionAppendNode::ProtocolEvent { event, caused_by } => {
647            crate::session_graph::SessionNodeDraft::protocol_event(event.clone())
648                .with_caused_by(caused_by.clone())
649        }
650        crate::SessionAppendNode::Plugin {
651            plugin_type,
652            body,
653            caused_by,
654        } => crate::session_graph::SessionNodeDraft::plugin(plugin_type.clone(), body.clone())
655            .with_caused_by(caused_by.clone()),
656    }
657}
658
659fn default_agent_frame_id(session_id: &str) -> crate::AgentFrameId {
660    format!("{session_id}:frame:initial")
661}
662
663fn default_agent_frames(session_id: &str, policy: &SessionPolicy) -> Vec<crate::AgentFrameRecord> {
664    vec![default_agent_frame(session_id, policy)]
665}
666
667fn default_agent_frame(session_id: &str, policy: &SessionPolicy) -> crate::AgentFrameRecord {
668    crate::AgentFrameRecord::new(
669        default_agent_frame_id(session_id),
670        session_id.to_string(),
671        None,
672        crate::AgentFrameReason::initial(),
673        None,
674        crate::AgentFrameAssignment::from_policy(policy.clone()),
675        crate::ProtocolTurnOptions::default(),
676    )
677}
678
679/// Heal any graph corruption (orphaned leaf) on load.
680///
681/// Must run BEFORE any residency-based trim (phase-9 feature) because
682/// healing's fallback search relies on having the full node set in RAM.
683/// Under `Residency::ActivePathOnly`, the runtime loads only the active
684/// path; if the leaf doesn't resolve against that reduced set, the
685/// caller falls back to a full `load_session_graph()` + `normalize` +
686/// trim.
687pub(super) fn normalize_session_graph(state: &mut RuntimeSessionState) {
688    if state.session_graph.heal_orphaned_leaf() {
689        state.graph_replace_required = true;
690    }
691}
692
693/// Trim the resident node set according to `Residency`. Called AFTER
694/// `normalize_session_graph` during `from_environment` load. Under
695/// `KeepAll` this is a no-op; under `ActivePathOnly` it replaces the
696/// resident graph with just the active path. Orphans remain on disk —
697/// the host decides whether/when to tombstone + vacuum them via
698/// `LashRuntime::orphaned_node_ids` + the store primitives.
699///
700pub(super) fn apply_residency_on_load(
701    state: &mut RuntimeSessionState,
702    residency: crate::Residency,
703) {
704    match residency {
705        crate::Residency::KeepAll => {}
706        crate::Residency::ActivePathOnly => {
707            state.session_graph = state.session_graph.fork_current_path();
708        }
709    }
710}
711
#[cfg(test)]
mod plugin_snapshot_tests {
    use super::store_plugin_snapshot;
    use crate::{PluginError, PluginSessionSnapshot};

    #[test]
    fn ok_capture_overwrites_target() {
        let mut target: Option<PluginSessionSnapshot> = None;
        let captured = Ok(PluginSessionSnapshot::default());

        store_plugin_snapshot(&mut target, captured);

        assert!(target.is_some(), "a successful capture must be stored");
    }

    #[test]
    fn failed_capture_retains_prior_snapshot() {
        // Regression guard: a failed capture once collapsed to `None` through
        // `.ok()`, wiping the last good snapshot, so the next cold rebuild
        // restored an empty plugin surface. A failure has to leave the prior
        // snapshot in place.
        let mut target = Some(PluginSessionSnapshot::default());
        let failure = Err(PluginError::Snapshot("capture failed".to_string()));

        store_plugin_snapshot(&mut target, failure);

        assert!(
            target.is_some(),
            "a failed capture must retain the prior snapshot, not erase it"
        );
    }
}
742
#[cfg(test)]
mod residency_tests {
    use super::apply_residency_on_load;
    use crate::{
        Message, MessageRole, Part, PartKind, PruneState, Residency, RuntimeSessionState,
        shared_parts,
    };

    /// A user message with a single intact text part.
    fn text_message(id: &str, content: &str) -> Message {
        let text_part = Part {
            id: format!("{id}.p0"),
            kind: PartKind::Text,
            content: content.to_string(),
            attachment: None,
            tool_call_id: None,
            tool_name: None,
            tool_replay: None,
            prune_state: PruneState::Intact,
            reasoning_meta: None,
            response_meta: None,
        };
        Message {
            id: id.to_string(),
            role: MessageRole::User,
            parts: shared_parts(vec![text_part]),
            origin: None,
        }
    }

    /// Appends one text message to the active conversation and returns the
    /// graph's leaf node id afterwards.
    fn append_leaf(state: &mut RuntimeSessionState, id: &str, content: &str) -> Option<String> {
        state.append_active_conversation_messages(&[text_message(id, content)]);
        state.session_graph.leaf_node_id.clone()
    }

    /// Root, an inactive branch off the root, then an active branch off the root.
    /// Returns the state plus the inactive and active branch node ids.
    fn branching_state() -> (RuntimeSessionState, String, String) {
        let mut state = RuntimeSessionState::default();
        let root = append_leaf(&mut state, "root", "root");
        let inactive_node =
            append_leaf(&mut state, "inactive", "inactive branch").expect("inactive node");
        state.session_graph.branch_to(root);
        let active_node = append_leaf(&mut state, "active", "active branch").expect("active node");
        (state, inactive_node, active_node)
    }

    #[test]
    fn active_path_only_trims_orphan_branches_on_load() {
        // The durable worker rebuild and session resume call this to match the
        // live runtime's residency: ActivePathOnly drops nodes that are off
        // the active path, so a rebuilt session does not silently keep the
        // full graph.
        let (mut state, inactive_node, active_node) = branching_state();
        assert!(
            state.session_graph.find_node(&inactive_node).is_some(),
            "the inactive branch is resident before trimming"
        );

        apply_residency_on_load(&mut state, Residency::ActivePathOnly);

        assert!(
            state.session_graph.find_node(&inactive_node).is_none(),
            "ActivePathOnly must drop the orphaned inactive branch on rebuild"
        );
        assert!(
            state.session_graph.find_node(&active_node).is_some(),
            "the active path must be retained"
        );
    }

    #[test]
    fn keep_all_retains_orphan_branches_on_load() {
        let (mut state, inactive_node, _active_node) = branching_state();

        apply_residency_on_load(&mut state, Residency::KeepAll);

        assert!(
            state.session_graph.find_node(&inactive_node).is_some(),
            "KeepAll must retain the full resident graph"
        );
    }
}
818        assert!(
819            state.session_graph.find_node(&inactive_node).is_some(),
820            "KeepAll must retain the full resident graph"
821        );
822    }
823}