Skip to main content

lash_core/runtime/
state.rs

1//! Runtime session state and persistence helpers.
2//!
3//! `RuntimeSessionState` is the runtime-private mutable state shape. Public
4//! host/plugin reads use `SessionSnapshot` from the plugin API instead.
5
6use lash_sansio::PromptUsage;
7
8use crate::session_model::{Message, SessionPolicy, TokenUsage, plugin_message_to_message};
9use crate::{PersistedTurnState, SessionSnapshot};
10
11use super::usage::TokenLedgerEntry;
12
13/// The runtime's view of a session: the persistable snapshot fields
14/// **plus** scratch fields the runtime tracks but never persists
15/// (head-revision CAS guard, pending dirty-write buffers, replace-graph
16/// flag). Public serialization goes through [`RuntimeSessionState::to_snapshot`],
17/// which drops runtime-only fields by construction.
18#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
19pub struct RuntimeSessionState {
20    pub session_id: String,
21    #[serde(default)]
22    pub policy: SessionPolicy,
23    #[serde(default)]
24    pub agent_frames: Vec<crate::AgentFrameRecord>,
25    #[serde(default, skip_serializing_if = "String::is_empty")]
26    pub current_agent_frame_id: crate::AgentFrameId,
27    #[serde(default)]
28    pub session_graph: crate::SessionGraph,
29    #[serde(default)]
30    pub turn_index: usize,
31    #[serde(default)]
32    pub token_usage: TokenUsage,
33    #[serde(default, skip_serializing_if = "Option::is_none")]
34    pub last_prompt_usage: Option<PromptUsage>,
35    #[serde(default)]
36    pub protocol_turn_options: crate::ProtocolTurnOptions,
37    #[serde(default, skip_serializing_if = "Option::is_none")]
38    pub tool_state_ref: Option<crate::store::BlobRef>,
39    #[serde(default, skip_serializing_if = "Option::is_none")]
40    pub tool_state_generation: Option<u64>,
41    #[serde(default, skip_serializing_if = "Option::is_none")]
42    pub tool_state_snapshot: Option<crate::ToolState>,
43    #[serde(default, skip_serializing_if = "Option::is_none")]
44    pub plugin_snapshot_ref: Option<crate::store::BlobRef>,
45    #[serde(default, skip_serializing_if = "Option::is_none")]
46    pub plugin_snapshot_revision: Option<u64>,
47    #[serde(default, skip_serializing_if = "Option::is_none")]
48    pub plugin_snapshot: Option<crate::PluginSessionSnapshot>,
49    #[serde(default, skip_serializing_if = "Option::is_none")]
50    pub execution_state_ref: Option<crate::store::BlobRef>,
51    #[serde(default, skip_serializing_if = "Option::is_none")]
52    pub execution_state_snapshot: Option<Vec<u8>>,
53    /// Cost-accounting ledger. Every LLM call (parent turns, subagent
54    /// children, compaction, observers, background helpers) contributes an
55    /// entry keyed by `(source, model)`. Separate from `token_usage`
56    /// which tracks context-window accounting only.
57    #[serde(default, skip_serializing_if = "Vec::is_empty")]
58    pub token_ledger: Vec<TokenLedgerEntry>,
59    #[serde(default, skip_serializing_if = "Option::is_none")]
60    pub checkpoint_ref: Option<crate::store::BlobRef>,
61    /// Store head revision observed by the runtime. Commits use it for
62    /// optimistic concurrency; `None` means the runtime is creating the
63    /// first persisted head.
64    #[serde(skip)]
65    pub head_revision: Option<u64>,
66    /// Signals that the next commit must write the full graph (for example,
67    /// `heal_orphaned_leaf` repaired an invalid leaf). Cleared after the
68    /// next commit.
69    #[serde(skip)]
70    pub graph_replace_required: bool,
71}
72
73impl RuntimeSessionState {
74    pub fn from_snapshot(snapshot: SessionSnapshot) -> Self {
75        let mut state = Self {
76            session_id: snapshot.session_id,
77            policy: snapshot.policy,
78            agent_frames: snapshot.agent_frames,
79            current_agent_frame_id: snapshot.current_agent_frame_id,
80            session_graph: snapshot.session_graph,
81            turn_index: snapshot.turn_index,
82            token_usage: snapshot.token_usage,
83            last_prompt_usage: snapshot.last_prompt_usage,
84            protocol_turn_options: snapshot.protocol_turn_options,
85            tool_state_ref: snapshot.tool_state_ref,
86            tool_state_generation: snapshot.tool_state_generation,
87            tool_state_snapshot: None,
88            plugin_snapshot_ref: snapshot.plugin_snapshot_ref,
89            plugin_snapshot_revision: snapshot.plugin_snapshot_revision,
90            plugin_snapshot: None,
91            execution_state_ref: snapshot.execution_state_ref,
92            execution_state_snapshot: None,
93            token_ledger: snapshot.token_ledger,
94            checkpoint_ref: snapshot.checkpoint_ref,
95            head_revision: None,
96            graph_replace_required: false,
97        };
98        for frame in &mut state.agent_frames {
99            frame.execution_state_snapshot = None;
100        }
101        state.ensure_agent_frame_initialized();
102        state
103    }
104
105    pub fn to_snapshot(&self) -> SessionSnapshot {
106        let mut agent_frames = self.agent_frames.clone();
107        for frame in &mut agent_frames {
108            frame.execution_state_snapshot = None;
109        }
110        SessionSnapshot {
111            session_id: self.session_id.clone(),
112            policy: self.policy.clone(),
113            agent_frames,
114            current_agent_frame_id: self.current_agent_frame_id.clone(),
115            session_graph: self.session_graph.clone(),
116            turn_index: self.turn_index,
117            token_usage: self.token_usage.clone(),
118            last_prompt_usage: self.last_prompt_usage.clone(),
119            protocol_turn_options: self.protocol_turn_options.clone(),
120            tool_state_ref: self.tool_state_ref.clone(),
121            tool_state_generation: self.tool_state_generation,
122            plugin_snapshot_ref: self.plugin_snapshot_ref.clone(),
123            plugin_snapshot_revision: self.plugin_snapshot_revision,
124            execution_state_ref: self.execution_state_ref.clone(),
125            token_ledger: self.token_ledger.clone(),
126            checkpoint_ref: self.checkpoint_ref.clone(),
127        }
128    }
129
130    pub fn apply_snapshot(&mut self, snapshot: &SessionSnapshot) {
131        self.session_id = snapshot.session_id.clone();
132        self.policy = snapshot.policy.clone();
133        self.agent_frames = snapshot.agent_frames.clone();
134        self.current_agent_frame_id = snapshot.current_agent_frame_id.clone();
135        self.ensure_agent_frame_initialized();
136        self.session_graph = snapshot.session_graph.clone();
137        self.turn_index = snapshot.turn_index;
138        self.token_usage = snapshot.token_usage.clone();
139        self.last_prompt_usage = snapshot.last_prompt_usage.clone();
140        self.protocol_turn_options = snapshot.protocol_turn_options.clone();
141        self.tool_state_ref = snapshot.tool_state_ref.clone();
142        self.tool_state_generation = snapshot.tool_state_generation;
143        self.plugin_snapshot_ref = snapshot.plugin_snapshot_ref.clone();
144        self.plugin_snapshot_revision = snapshot.plugin_snapshot_revision;
145        self.execution_state_ref = snapshot.execution_state_ref.clone();
146        self.token_ledger = snapshot.token_ledger.clone();
147        self.checkpoint_ref = snapshot.checkpoint_ref.clone();
148    }
149
150    pub fn stamp_runtime_state(
151        &mut self,
152        tool_state: Option<&crate::ToolState>,
153        plugin_snapshot: Option<&crate::PluginSessionSnapshot>,
154    ) {
155        self.tool_state_snapshot = tool_state.cloned();
156        self.tool_state_generation = tool_state.map(|snapshot| snapshot.generation());
157        self.plugin_snapshot = plugin_snapshot.cloned();
158    }
159
160    pub fn usage_report(&self) -> super::usage::SessionUsageReport {
161        super::usage::SessionUsageReport::from_entries(&self.token_ledger)
162    }
163
164    pub(crate) fn read_model(&self) -> crate::session_graph::SessionReadModel {
165        self.session_graph.read_model_for_agent_frame(
166            &self.current_agent_frame_id,
167            self.current_agent_frame_is_initial(),
168        )
169    }
170
171    pub fn replace_active_read_state(&mut self, messages: &[Message]) {
172        self.session_graph
173            .replace_active_read_state_for_agent_frame(&self.current_agent_frame_id, messages);
174        self.graph_replace_required = false;
175    }
176
177    pub fn append_active_read_delta(&mut self, messages: &[Message]) {
178        self.session_graph
179            .append_active_read_delta_for_agent_frame(&self.current_agent_frame_id, messages);
180    }
181
182    pub fn append_active_conversation_messages(&mut self, messages: &[Message]) {
183        self.session_graph
184            .append_active_conversation_messages_for_agent_frame(
185                &self.current_agent_frame_id,
186                messages,
187            );
188    }
189
190    pub(crate) fn append_active_conversation_messages_with_clock(
191        &mut self,
192        messages: &[Message],
193        clock: &dyn crate::Clock,
194    ) {
195        self.session_graph
196            .append_active_conversation_messages_for_agent_frame_at(
197                &self.current_agent_frame_id,
198                messages,
199                clock.timestamp_rfc3339(),
200            );
201    }
202
203    pub fn read_view(&self) -> crate::SessionReadView {
204        crate::SessionReadView::from_persisted_state(self)
205    }
206
207    pub fn session_graph(&self) -> &crate::SessionGraph {
208        &self.session_graph
209    }
210
211    pub fn policy(&self) -> &SessionPolicy {
212        self.effective_policy()
213    }
214
215    pub fn turn_state(&self) -> PersistedTurnState {
216        PersistedTurnState {
217            turn_index: self.turn_index,
218            token_usage: self.token_usage.clone(),
219            last_prompt_usage: self.last_prompt_usage.clone(),
220            protocol_turn_options: self.protocol_turn_options.clone(),
221        }
222    }
223
224    pub fn token_ledger(&self) -> &[TokenLedgerEntry] {
225        &self.token_ledger
226    }
227
228    pub fn apply_persisted_commit_result(&mut self, result: crate::store::RuntimeCommitResult) {
229        self.head_revision = Some(result.head_revision);
230        self.checkpoint_ref = Some(result.checkpoint_ref);
231        self.tool_state_ref = result.manifest.tool_state_ref;
232        if let Some(snapshot) = self.tool_state_snapshot.as_ref() {
233            self.tool_state_generation = Some(snapshot.generation());
234        } else if self.tool_state_ref.is_none() {
235            self.tool_state_generation = None;
236        }
237        self.plugin_snapshot_ref = result.manifest.plugin_snapshot_ref;
238        self.plugin_snapshot_revision = result.manifest.plugin_snapshot_revision;
239        self.execution_state_ref = result.manifest.execution_state_ref;
240        let execution_state_ref = self.execution_state_ref.clone();
241        if let Some(frame) = self.current_agent_frame_mut() {
242            frame.execution_state_ref = execution_state_ref;
243            frame.execution_state_snapshot = None;
244        }
245        self.graph_replace_required = false;
246        self.tool_state_snapshot = None;
247        self.plugin_snapshot = None;
248        self.execution_state_snapshot = None;
249        if let Some(frame) = self.current_agent_frame_mut() {
250            frame.execution_state_snapshot = None;
251        }
252    }
253
254    pub fn discard_runtime_snapshots(&mut self) {
255        self.tool_state_snapshot = None;
256        self.plugin_snapshot = None;
257        self.execution_state_snapshot = None;
258        if let Some(frame) = self.current_agent_frame_mut() {
259            frame.execution_state_snapshot = None;
260        }
261    }
262
263    pub fn set_execution_state_snapshot(&mut self, execution_state_snapshot: Option<Vec<u8>>) {
264        if execution_state_snapshot.is_none() {
265            self.execution_state_ref = None;
266        }
267        self.execution_state_snapshot = execution_state_snapshot.clone();
268        if let Some(frame) = self.current_agent_frame_mut() {
269            if execution_state_snapshot.is_none() {
270                frame.execution_state_ref = None;
271            }
272            frame.execution_state_snapshot = execution_state_snapshot;
273        }
274    }
275
276    pub fn execution_state_snapshot(&self) -> Option<&[u8]> {
277        self.current_agent_frame()
278            .and_then(|frame| frame.execution_state_snapshot.as_deref())
279            .or(self.execution_state_snapshot.as_deref())
280    }
281
282    pub fn refresh_plugin_snapshots(&mut self, plugins: &crate::PluginSession) {
283        let tool_registry = plugins.tool_registry();
284        let generation = tool_registry.generation();
285        if self.tool_state_ref.is_none() || self.tool_state_generation != Some(generation) {
286            let snapshot = tool_registry.export_state();
287            self.tool_state_generation = Some(snapshot.generation());
288            self.tool_state_snapshot = Some(snapshot);
289        }
290
291        let revision = plugins.snapshot_revision_fingerprint();
292        if self.plugin_snapshot_ref.is_none() || self.plugin_snapshot_revision != Some(revision) {
293            store_plugin_snapshot(&mut self.plugin_snapshot, plugins.snapshot());
294        }
295        self.plugin_snapshot_revision = Some(revision);
296    }
297}
298
299/// Persist a freshly captured plugin snapshot, logging and **retaining the prior
300/// snapshot** when the capture fails.
301///
302/// A failed capture (`Err`) previously collapsed to `None` via `.ok()`, erasing
303/// the last good snapshot — so the next cold rebuild would restore an empty
304/// plugin surface even though a valid snapshot had been captured earlier. Keep
305/// the prior value and surface the error instead.
306pub(crate) fn store_plugin_snapshot(
307    target: &mut Option<crate::PluginSessionSnapshot>,
308    captured: Result<crate::PluginSessionSnapshot, crate::PluginError>,
309) {
310    match captured {
311        Ok(snapshot) => *target = Some(snapshot),
312        Err(err) => tracing::warn!(
313            error = %err,
314            "failed to capture plugin snapshot; retaining the prior snapshot",
315        ),
316    }
317}
318
319impl RuntimeSessionState {
320    pub fn current_agent_frame(&self) -> Option<&crate::AgentFrameRecord> {
321        self.agent_frames
322            .iter()
323            .find(|frame| frame.frame_id == self.current_agent_frame_id)
324    }
325
326    pub fn current_agent_frame_mut(&mut self) -> Option<&mut crate::AgentFrameRecord> {
327        let current_agent_frame_id = self.current_agent_frame_id.clone();
328        self.agent_frames
329            .iter_mut()
330            .find(|frame| frame.frame_id == current_agent_frame_id)
331    }
332
333    pub fn effective_policy(&self) -> &SessionPolicy {
334        self.current_agent_frame()
335            .map(|frame| &frame.assignment.policy)
336            .unwrap_or(&self.policy)
337    }
338
339    pub fn process_execution_env_spec(
340        &self,
341        fallback_policy: &SessionPolicy,
342    ) -> crate::ProcessExecutionEnvSpec {
343        self.current_agent_frame()
344            .map(|frame| {
345                crate::ProcessExecutionEnvSpec::new(
346                    frame.assignment.plugin_options.clone(),
347                    frame.assignment.policy.clone(),
348                )
349            })
350            .unwrap_or_else(|| {
351                crate::ProcessExecutionEnvSpec::new(
352                    crate::PluginOptions::default(),
353                    fallback_policy.clone(),
354                )
355            })
356    }
357
358    pub fn effective_protocol_turn_options(&self) -> &crate::ProtocolTurnOptions {
359        self.current_agent_frame()
360            .map(|frame| &frame.protocol_turn_options)
361            .unwrap_or(&self.protocol_turn_options)
362    }
363
364    pub fn ensure_agent_frame_initialized(&mut self) {
365        self.ensure_agent_frame_initialized_with_clock(&crate::SystemClock);
366    }
367
368    pub fn ensure_agent_frame_initialized_with_clock(&mut self, clock: &dyn crate::Clock) {
369        if self.current_agent_frame_id.is_empty() {
370            self.current_agent_frame_id = default_agent_frame_id(&self.session_id);
371        }
372        if self
373            .agent_frames
374            .iter()
375            .any(|frame| frame.frame_id == self.current_agent_frame_id)
376        {
377            return;
378        }
379        let mut frame = default_agent_frame_with_clock(&self.session_id, &self.policy, clock);
380        frame.frame_id = self.current_agent_frame_id.clone();
381        frame.protocol_turn_options = self.protocol_turn_options.clone();
382        frame.execution_state_ref = self.execution_state_ref.clone();
383        frame.execution_state_snapshot = self.execution_state_snapshot.clone();
384        self.agent_frames.push(frame);
385    }
386
387    pub fn reset_initial_agent_frame(
388        &mut self,
389        assignment: crate::AgentFrameAssignment,
390        protocol_turn_options: crate::ProtocolTurnOptions,
391    ) {
392        self.reset_initial_agent_frame_with_clock(
393            assignment,
394            protocol_turn_options,
395            &crate::SystemClock,
396        );
397    }
398
399    pub fn reset_initial_agent_frame_with_clock(
400        &mut self,
401        assignment: crate::AgentFrameAssignment,
402        protocol_turn_options: crate::ProtocolTurnOptions,
403        clock: &dyn crate::Clock,
404    ) {
405        let frame_id = default_agent_frame_id(&self.session_id);
406        self.policy = assignment.policy.clone();
407        self.protocol_turn_options = protocol_turn_options.clone();
408        self.current_agent_frame_id = frame_id.clone();
409        self.agent_frames = vec![crate::AgentFrameRecord::new_at(
410            frame_id,
411            self.session_id.clone(),
412            None,
413            crate::AgentFrameReason::initial(),
414            None,
415            assignment,
416            protocol_turn_options,
417            clock.timestamp_rfc3339(),
418        )];
419    }
420
421    pub fn append_agent_frame(&mut self, mut frame: crate::AgentFrameRecord) {
422        let previous_frame_id = self.current_agent_frame_id.clone();
423        for existing in &mut self.agent_frames {
424            if existing.frame_id == previous_frame_id {
425                existing.status = crate::AgentFrameStatus::Superseded;
426            }
427        }
428        if frame.previous_frame_id.is_none() && !previous_frame_id.is_empty() {
429            frame.previous_frame_id = Some(previous_frame_id);
430        }
431        frame.status = crate::AgentFrameStatus::Active;
432        self.policy = frame.assignment.policy.clone();
433        self.protocol_turn_options = frame.protocol_turn_options.clone();
434        self.current_agent_frame_id = frame.frame_id.clone();
435        self.execution_state_ref = frame.execution_state_ref.clone();
436        self.execution_state_snapshot = frame.execution_state_snapshot.clone();
437        self.agent_frames.push(frame);
438    }
439
440    fn current_agent_frame_is_initial(&self) -> bool {
441        self.current_agent_frame()
442            .map(|frame| frame.previous_frame_id.is_none())
443            .unwrap_or(true)
444    }
445}
446
447impl Default for RuntimeSessionState {
448    fn default() -> Self {
449        Self {
450            session_id: "root".to_string(),
451            policy: SessionPolicy::default(),
452            agent_frames: default_agent_frames("root", &SessionPolicy::default()),
453            current_agent_frame_id: default_agent_frame_id("root"),
454            session_graph: crate::SessionGraph::default(),
455            turn_index: 0,
456            token_usage: TokenUsage::default(),
457            last_prompt_usage: None,
458            protocol_turn_options: crate::ProtocolTurnOptions::default(),
459            tool_state_ref: None,
460            tool_state_generation: None,
461            tool_state_snapshot: None,
462            plugin_snapshot_ref: None,
463            plugin_snapshot_revision: None,
464            plugin_snapshot: None,
465            execution_state_ref: None,
466            execution_state_snapshot: None,
467            token_ledger: Vec::new(),
468            checkpoint_ref: None,
469            head_revision: None,
470            graph_replace_required: false,
471        }
472    }
473}
474
#[cfg(test)]
mod tests {
    use super::*;

    /// Top-level keys that a serialized `SessionSnapshot` must not contain.
    const RUNTIME_ONLY_KEYS: [&str; 5] = [
        "head_revision",
        "graph_replace_required",
        "tool_state_snapshot",
        "plugin_snapshot",
        "execution_state_snapshot",
    ];

    #[test]
    fn session_snapshot_serialization_excludes_runtime_only_fields_and_round_trips() {
        // Populate every runtime-only field so a leak would be visible.
        let policy = SessionPolicy {
            provider_id: "mock".to_string(),
            ..SessionPolicy::default()
        };
        let mut state = RuntimeSessionState {
            session_id: "snapshot-test".to_string(),
            policy,
            tool_state_snapshot: Some(crate::ToolState::default()),
            plugin_snapshot: Some(crate::PluginSessionSnapshot::default()),
            execution_state_snapshot: Some(vec![1, 2, 3]),
            head_revision: Some(42),
            graph_replace_required: true,
            ..RuntimeSessionState::default()
        };
        state.ensure_agent_frame_initialized();
        if let Some(frame) = state.current_agent_frame_mut() {
            frame.execution_state_snapshot = Some(vec![4, 5, 6]);
        }

        let value = serde_json::to_value(state.to_snapshot()).expect("serialize snapshot");

        for runtime_key in RUNTIME_ONLY_KEYS {
            assert!(
                value.get(runtime_key).is_none(),
                "snapshot unexpectedly exposed {runtime_key}"
            );
        }
        let serialized_frames = value["agent_frames"].as_array().expect("agent frames");
        for frame in serialized_frames {
            assert!(frame.get("execution_state_snapshot").is_none());
        }

        // Round-trip through JSON and hydrate: runtime-only state stays empty.
        let snapshot: SessionSnapshot = serde_json::from_value(value).expect("round-trip snapshot");
        let hydrated = RuntimeSessionState::from_snapshot(snapshot);

        assert_eq!(hydrated.session_id, "snapshot-test");
        assert_eq!(hydrated.policy.recorded_provider_id(), "mock");
        assert!(hydrated.head_revision.is_none());
        assert!(!hydrated.graph_replace_required);
        assert!(hydrated.tool_state_snapshot.is_none());
        assert!(hydrated.plugin_snapshot.is_none());
        assert!(hydrated.execution_state_snapshot.is_none());
        for frame in &hydrated.agent_frames {
            assert!(frame.execution_state_snapshot.is_none());
        }
    }
}
539
540pub(super) fn apply_persisted_session_config(
541    policy: &mut SessionPolicy,
542    config: &crate::PersistedSessionConfig,
543) {
544    policy.model = config.model.clone();
545    policy.provider_id = config.provider_id.clone();
546}
547
548pub(super) fn apply_session_checkpoint(
549    state: &mut RuntimeSessionState,
550    checkpoint: Option<crate::store::HydratedSessionCheckpoint>,
551) {
552    let Some(checkpoint) = checkpoint else {
553        state.tool_state_ref = None;
554        state.tool_state_generation = None;
555        state.tool_state_snapshot = None;
556        state.plugin_snapshot_ref = None;
557        state.plugin_snapshot_revision = None;
558        state.plugin_snapshot = None;
559        state.execution_state_ref = None;
560        state.execution_state_snapshot = None;
561        state.ensure_agent_frame_initialized();
562        return;
563    };
564    state.turn_index = checkpoint.turn_state.turn_index;
565    state.token_usage = checkpoint.turn_state.token_usage;
566    state.last_prompt_usage = checkpoint.turn_state.last_prompt_usage;
567    state.protocol_turn_options = checkpoint.turn_state.protocol_turn_options;
568    state.tool_state_ref = checkpoint.tool_state_ref.clone();
569    state.tool_state_generation = checkpoint
570        .tool_state
571        .as_ref()
572        .map(|snapshot| snapshot.generation());
573    state.tool_state_snapshot = checkpoint.tool_state;
574    state.plugin_snapshot_ref = checkpoint.plugin_snapshot_ref.clone();
575    state.plugin_snapshot_revision = checkpoint.plugin_snapshot_revision;
576    state.plugin_snapshot = checkpoint.plugin_snapshot;
577    state.execution_state_ref = checkpoint.execution_state_ref.clone();
578    state.execution_state_snapshot = None;
579    state.ensure_agent_frame_initialized();
580    if let Some(frame) = state.current_agent_frame_mut() {
581        frame.execution_state_ref = checkpoint.execution_state_ref.clone();
582        frame.execution_state_snapshot = checkpoint.execution_state;
583    }
584}
585
586pub(super) fn apply_session_head(
587    state: &mut RuntimeSessionState,
588    head: &crate::store::SessionHead,
589) {
590    state.session_graph = head.graph.clone();
591    state.agent_frames = head.agent_frames.clone();
592    state.current_agent_frame_id = head.current_agent_frame_id.clone();
593    state.checkpoint_ref = head.checkpoint_ref.clone();
594    state.token_ledger = head.token_ledger.clone();
595    state.tool_state_ref = None;
596    state.tool_state_generation = None;
597    state.tool_state_snapshot = None;
598    state.plugin_snapshot_ref = None;
599    state.plugin_snapshot_revision = None;
600    state.plugin_snapshot = None;
601    state.execution_state_ref = None;
602    state.execution_state_snapshot = None;
603    state.ensure_agent_frame_initialized();
604    state.head_revision = Some(head.head_revision);
605    state.graph_replace_required = false;
606    apply_persisted_session_config(&mut state.policy, &head.config);
607}
608
609pub(super) fn append_session_nodes_to_state_with_clock(
610    state: &mut RuntimeSessionState,
611    nodes: &[crate::SessionAppendNode],
612    clock: &dyn crate::Clock,
613) -> Vec<String> {
614    let drafts = nodes
615        .iter()
616        .map(session_append_node_draft)
617        .collect::<Vec<_>>();
618    state.ensure_agent_frame_initialized_with_clock(clock);
619    let node_ids = state.session_graph.append_node_drafts_for_agent_frame_at(
620        &state.current_agent_frame_id,
621        drafts,
622        clock.timestamp_rfc3339(),
623    );
624    normalize_session_graph(state);
625    node_ids
626}
627
628pub(super) fn open_agent_frame_in_state_with_clock(
629    state: &mut RuntimeSessionState,
630    request: crate::OpenAgentFrameRequest,
631    clock: &dyn crate::Clock,
632) -> crate::OpenAgentFrameResult {
633    state.ensure_agent_frame_initialized_with_clock(clock);
634    if request.frame_id.trim().is_empty() || state.current_agent_frame_id == request.frame_id {
635        return crate::OpenAgentFrameResult {
636            frame_id: state.current_agent_frame_id.clone(),
637            opened: false,
638            initial_node_ids: Vec::new(),
639        };
640    }
641
642    let previous = state.current_agent_frame().cloned();
643    let assignment = previous
644        .as_ref()
645        .map(|frame| frame.assignment.clone())
646        .unwrap_or_else(|| crate::AgentFrameAssignment::from_policy(state.policy.clone()));
647    let protocol_turn_options = previous
648        .as_ref()
649        .map(|frame| frame.protocol_turn_options.clone())
650        .unwrap_or_else(|| state.protocol_turn_options.clone());
651    let previous_frame_id = previous.map(|frame| frame.frame_id);
652    state.append_agent_frame(crate::AgentFrameRecord::new_at(
653        request.frame_id.clone(),
654        state.session_id.clone(),
655        previous_frame_id,
656        request.reason,
657        request.caused_by,
658        assignment,
659        protocol_turn_options,
660        clock.timestamp_rfc3339(),
661    ));
662
663    let initial_node_ids =
664        append_session_nodes_to_state_with_clock(state, &request.initial_nodes, clock);
665    if !initial_node_ids.is_empty() {
666        state.graph_replace_required = true;
667    }
668    crate::OpenAgentFrameResult {
669        frame_id: state.current_agent_frame_id.clone(),
670        opened: true,
671        initial_node_ids,
672    }
673}
674
675fn session_append_node_draft(
676    node: &crate::SessionAppendNode,
677) -> crate::session_graph::SessionNodeDraft {
678    match node {
679        crate::SessionAppendNode::Message { message, caused_by } => {
680            crate::session_graph::SessionNodeDraft::message(plugin_message_to_message(message))
681                .with_caused_by(caused_by.clone())
682        }
683        crate::SessionAppendNode::ProtocolEvent { event, caused_by } => {
684            crate::session_graph::SessionNodeDraft::protocol_event(event.clone())
685                .with_caused_by(caused_by.clone())
686        }
687        crate::SessionAppendNode::Plugin {
688            plugin_type,
689            body,
690            caused_by,
691        } => crate::session_graph::SessionNodeDraft::plugin(plugin_type.clone(), body.clone())
692            .with_caused_by(caused_by.clone()),
693    }
694}
695
696fn default_agent_frame_id(session_id: &str) -> crate::AgentFrameId {
697    format!("{session_id}:frame:initial")
698}
699
700fn default_agent_frames(session_id: &str, policy: &SessionPolicy) -> Vec<crate::AgentFrameRecord> {
701    vec![default_agent_frame(session_id, policy)]
702}
703
704fn default_agent_frame(session_id: &str, policy: &SessionPolicy) -> crate::AgentFrameRecord {
705    default_agent_frame_with_clock(session_id, policy, &crate::SystemClock)
706}
707
708fn default_agent_frame_with_clock(
709    session_id: &str,
710    policy: &SessionPolicy,
711    clock: &dyn crate::Clock,
712) -> crate::AgentFrameRecord {
713    crate::AgentFrameRecord::new_at(
714        default_agent_frame_id(session_id),
715        session_id.to_string(),
716        None,
717        crate::AgentFrameReason::initial(),
718        None,
719        crate::AgentFrameAssignment::from_policy(policy.clone()),
720        crate::ProtocolTurnOptions::default(),
721        clock.timestamp_rfc3339(),
722    )
723}
724
725/// Heal any graph corruption (orphaned leaf) on load.
726///
727/// Must run BEFORE any residency-based trim (phase-9 feature) because
728/// healing's fallback search relies on having the full node set in RAM.
729/// Under `Residency::ActivePathOnly`, the runtime loads only the active
730/// path; if the leaf doesn't resolve against that reduced set, the
731/// caller falls back to a full `load_session_graph()` + `normalize` +
732/// trim.
733pub(super) fn normalize_session_graph(state: &mut RuntimeSessionState) {
734    if state.session_graph.heal_orphaned_leaf() {
735        state.graph_replace_required = true;
736    }
737}
738
739/// Trim the resident node set according to `Residency`. Called AFTER
740/// `normalize_session_graph` during `from_environment` load. Under
741/// `KeepAll` this is a no-op; under `ActivePathOnly` it replaces the
742/// resident graph with just the active path. Orphans remain on disk —
743/// the host decides whether/when to tombstone + vacuum them via
744/// `LashRuntime::orphaned_node_ids` + the store primitives.
745///
746pub(super) fn apply_residency_on_load(
747    state: &mut RuntimeSessionState,
748    residency: crate::Residency,
749) {
750    match residency {
751        crate::Residency::KeepAll => {}
752        crate::Residency::ActivePathOnly => {
753            state.session_graph = state.session_graph.fork_current_path();
754        }
755    }
756}
757
#[cfg(test)]
mod plugin_snapshot_tests {
    use super::store_plugin_snapshot;
    use crate::{PluginError, PluginSessionSnapshot};

    /// A successful capture handed to `store_plugin_snapshot` fills an empty slot.
    #[test]
    fn ok_capture_overwrites_target() {
        let mut slot = None;
        let capture = Ok(PluginSessionSnapshot::default());

        store_plugin_snapshot(&mut slot, capture);

        assert!(slot.is_some(), "a successful capture must be stored");
    }

    /// Regression guard: a failed capture once collapsed to `None` through
    /// `.ok()`, wiping the last good snapshot so that the next cold rebuild
    /// restored an empty plugin surface. After a failure the previously stored
    /// snapshot has to still be there.
    #[test]
    fn failed_capture_retains_prior_snapshot() {
        let mut slot = Some(PluginSessionSnapshot::default());
        let failure = Err(PluginError::Snapshot(String::from("capture failed")));

        store_plugin_snapshot(&mut slot, failure);

        assert!(
            slot.is_some(),
            "a failed capture must retain the prior snapshot, not erase it"
        );
    }
}
788
#[cfg(test)]
mod residency_tests {
    use super::apply_residency_on_load;
    use crate::{
        Message, MessageRole, Part, PartKind, PruneState, Residency, RuntimeSessionState,
        shared_parts,
    };

    /// Builds a user message carrying one intact text part whose id is `{id}.p0`.
    fn text_message(id: &str, content: &str) -> Message {
        let text_part = Part {
            id: format!("{id}.p0"),
            kind: PartKind::Text,
            content: content.to_owned(),
            attachment: None,
            tool_call_id: None,
            tool_name: None,
            tool_replay: None,
            prune_state: PruneState::Intact,
            reasoning_meta: None,
            response_meta: None,
        };

        Message {
            id: id.to_owned(),
            role: MessageRole::User,
            parts: shared_parts(vec![text_part]),
            origin: None,
        }
    }

    /// Appends a single text message to the active conversation of `state`.
    fn append_text(state: &mut RuntimeSessionState, id: &str, content: &str) {
        state.append_active_conversation_messages(&[text_message(id, content)]);
    }

    /// Returns the graph's current leaf id, panicking with `label` when it is unset.
    fn leaf_id(state: &RuntimeSessionState, label: &str) -> String {
        state.session_graph.leaf_node_id.clone().expect(label)
    }

    /// Fixture: a root, an inactive branch off the root, then an active branch
    /// off the root. Yields the state together with the inactive and active
    /// branch node ids, in that order.
    fn branching_state() -> (RuntimeSessionState, String, String) {
        let mut state = RuntimeSessionState::default();

        append_text(&mut state, "root", "root");
        let root = state.session_graph.leaf_node_id.clone();

        append_text(&mut state, "inactive", "inactive branch");
        let inactive_node = leaf_id(&state, "inactive node");

        // Rewind to the root so the next append starts a sibling branch.
        state.session_graph.branch_to(root);
        append_text(&mut state, "active", "active branch");
        let active_node = leaf_id(&state, "active node");

        (state, inactive_node, active_node)
    }

    /// The durable worker rebuild (and session resume) call this to match the
    /// live runtime's residency. `ActivePathOnly` drops nodes off the active
    /// path so a rebuilt session does not silently retain the full graph.
    #[test]
    fn active_path_only_trims_orphan_branches_on_load() {
        let (mut state, inactive_node, active_node) = branching_state();

        let resident_before = state.session_graph.find_node(&inactive_node).is_some();
        assert!(resident_before, "the inactive branch is resident before trimming");

        apply_residency_on_load(&mut state, Residency::ActivePathOnly);

        let inactive_gone = state.session_graph.find_node(&inactive_node).is_none();
        assert!(
            inactive_gone,
            "ActivePathOnly must drop the orphaned inactive branch on rebuild"
        );
        let active_kept = state.session_graph.find_node(&active_node).is_some();
        assert!(active_kept, "the active path must be retained");
    }

    /// `KeepAll` leaves the inactive branch resident after load.
    #[test]
    fn keep_all_retains_orphan_branches_on_load() {
        let (mut state, inactive_node, _active_node) = branching_state();

        apply_residency_on_load(&mut state, Residency::KeepAll);

        let inactive_kept = state.session_graph.find_node(&inactive_node).is_some();
        assert!(inactive_kept, "KeepAll must retain the full resident graph");
    }
}