Skip to main content

lash_core/store/
mod.rs

1//! The runtime's settled-session persistence contract and shared store types.
2
3pub mod queued_work;
4
/// Session id substituted when a persisted head record omits `session_id`,
/// and the id used by the `Default` impls of [`SessionHead`] and
/// [`SessionHeadMeta`].
fn default_root_session_id() -> String {
    String::from("root")
}
8
#[cfg(test)]
mod persisted_state_tests {
    use super::*;

    /// Hydrating a stored head must carry its recorded provider id into the
    /// session policy and into every agent frame's policy, and must keep the
    /// stored head revision.
    #[test]
    fn persisted_state_hydrates_provider_id_without_live_provider_rebinding() {
        let head = SessionHead {
            session_id: "stored".to_string(),
            head_revision: 7,
            config: crate::PersistedSessionConfig {
                provider_id: "stored-provider".to_string(),
                model: crate::ModelSpec::default(),
            },
            ..SessionHead::default()
        };

        let state = persisted_session_state_from_head(head, None);

        let expected_provider = "stored-provider";
        assert_eq!(state.policy.recorded_provider_id(), expected_provider);
        let every_frame_matches = state.agent_frames.iter().all(|frame| {
            frame.assignment.policy.recorded_provider_id() == expected_provider
        });
        assert!(every_frame_matches);
        assert_eq!(state.head_revision, Some(7));
    }
}
42
/// Errors surfaced by the runtime persistence layer.
#[derive(Debug, thiserror::Error)]
pub enum StoreError {
    /// The store is bound to one session and was asked to operate on a
    /// different one.
    #[error(
        "store is already bound to session `{bound_session_id}` and cannot be reused for `{attempted_session_id}`"
    )]
    SessionBindingMismatch {
        bound_session_id: String,
        attempted_session_id: String,
    },
    /// The backend cannot serve the requested [`SessionReadScope`].
    #[error("store does not support read scope {0:?}")]
    UnsupportedReadScope(SessionReadScope),
    /// The caller's expected head revision does not match the stored head.
    /// `expected` has the same shape as
    /// [`RuntimeCommit::expected_head_revision`].
    #[error("store head revision conflict: expected {expected:?}, actual {actual}")]
    HeadRevisionConflict { expected: Option<u64>, actual: u64 },
    /// A runtime turn id was already committed with a different turn-commit
    /// hash (see [`RuntimeCommit::turn_commit_hash`]).
    #[error(
        "runtime turn `{turn_id}` for session `{session_id}` was already committed with a different commit hash"
    )]
    RuntimeTurnCommitConflict { session_id: String, turn_id: String },
    /// A queued-work claim referenced by the caller no longer exists or has
    /// expired.
    #[error("queued work claim `{claim_id}` for session `{session_id}` is missing or expired")]
    QueuedWorkClaimExpired {
        session_id: String,
        claim_id: String,
    },
    /// The caller's session execution lease fence is stale, released,
    /// superseded, or expired.
    #[error("session execution lease for session `{session_id}` is missing or expired")]
    SessionExecutionLeaseExpired { session_id: String },
    /// A persisted record's `schema_version` differs from the version this
    /// binary supports; produced by [`ensure_supported_schema_version`].
    #[error(
        "{record_kind} schema_version {actual} is not supported by this binary (expected {expected})"
    )]
    UnsupportedRecordSchemaVersion {
        record_kind: &'static str,
        actual: u32,
        expected: u32,
    },
    /// Catch-all for backend failures, including serialization errors and
    /// unsupported optional operations.
    #[error("store backend error: {0}")]
    Backend(String),
}
78
/// Descriptive metadata recorded for a session.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct SessionMeta {
    /// Id of the session this metadata describes.
    pub session_id: String,
    /// Display name of the session.
    pub session_name: String,
    /// Creation timestamp stored as a string; the format is not fixed by this
    /// type.
    pub created_at: String,
    /// Model identifier, stored as a plain string.
    pub model: String,
    /// Working directory associated with the session, if one was recorded.
    pub cwd: Option<String>,
    /// Canonical parent/child relation; `parent_session_id` derives from it.
    pub relation: crate::SessionRelation,
}
88
89impl SessionMeta {
90    /// Returns the parent session id, if any, derived from the canonical
91    /// [`SessionRelation`] field.
92    pub fn parent_session_id(&self) -> Option<&str> {
93        self.relation.parent_session_id()
94    }
95}
96
/// Lightweight session info for the resume picker.
#[derive(Clone, Debug)]
pub struct SessionPickerInfo {
    /// Id of the session offered for resumption.
    pub session_id: String,
    /// Working directory associated with the session, if recorded.
    pub cwd: Option<String>,
    /// Canonical parent/child relation; `parent_session_id` derives from it.
    pub relation: crate::SessionRelation,
    /// Text of the session's first user message, shown as a preview.
    pub first_user_message: String,
    /// Number of user messages in the session.
    pub user_message_count: usize,
}
106
107impl SessionPickerInfo {
108    pub fn parent_session_id(&self) -> Option<&str> {
109        self.relation.parent_session_id()
110    }
111}
112
113#[derive(Clone, Debug, Default, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
114#[serde(transparent)]
115pub struct BlobRef(pub String);
116
117impl BlobRef {
118    pub fn as_str(&self) -> &str {
119        &self.0
120    }
121}
122
123impl std::fmt::Display for BlobRef {
124    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
125        f.write_str(&self.0)
126    }
127}
128
129impl From<String> for BlobRef {
130    fn from(value: String) -> Self {
131        Self(value)
132    }
133}
134
/// Counters reported by a blob garbage-collection pass.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct GcReport {
    /// Number of GC roots the pass started from (presumably live head /
    /// checkpoint references — confirm against the backend).
    pub root_count: usize,
    /// Number of blobs the pass kept.
    pub retained_blob_count: usize,
    /// Number of blobs the pass deleted.
    pub deleted_blob_count: usize,
}
141
/// Result of a `RuntimePersistence::vacuum()` call.
///
/// Returned so hosts can emit metrics.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct VacuumReport {
    /// Number of tombstoned graph-node rows that were physically deleted
    /// from the store.
    pub removed_node_count: usize,
}
149
/// Persisted checkpoint manifest.
///
/// The turn state is stored inline. Tool state, plugin snapshot, and
/// execution state are stored as separate blobs and referenced by
/// [`BlobRef`]. Every field defaults when absent on read, and `None` refs
/// and revisions are omitted on write. See [`HydratedSessionCheckpoint`] for
/// the variant that also carries the payloads.
#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
pub struct SessionCheckpoint {
    #[serde(default)]
    pub turn_state: crate::PersistedTurnState,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub tool_state_ref: Option<BlobRef>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub plugin_snapshot_ref: Option<BlobRef>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub plugin_snapshot_revision: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub execution_state_ref: Option<BlobRef>,
}
163
/// A checkpoint together with the in-memory payloads its refs point at.
///
/// Each `*_ref` is paired with the payload it identifies. The execution state
/// is kept as raw bytes.
#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
pub struct HydratedSessionCheckpoint {
    pub turn_state: crate::PersistedTurnState,
    pub tool_state_ref: Option<BlobRef>,
    pub tool_state: Option<crate::ToolState>,
    pub plugin_snapshot_ref: Option<BlobRef>,
    pub plugin_snapshot: Option<crate::PluginSessionSnapshot>,
    pub plugin_snapshot_revision: Option<u64>,
    pub execution_state_ref: Option<BlobRef>,
    pub execution_state: Option<Vec<u8>>,
}
175
/// Stored head of a session at `head_revision`.
///
/// Holds the full session graph, the persisted config, agent frames, the
/// checkpoint manifest ref, and the token ledger. A missing `session_id`
/// deserializes to `"root"`. See [`SessionHeadMeta`] for the graph-less
/// summary.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct SessionHead {
    #[serde(default = "default_root_session_id")]
    pub session_id: String,
    #[serde(default)]
    pub head_revision: u64,
    #[serde(default)]
    pub agent_frames: Vec<crate::AgentFrameRecord>,
    // Empty string means "not set"; it is omitted on write.
    #[serde(default, skip_serializing_if = "String::is_empty")]
    pub current_agent_frame_id: crate::AgentFrameId,
    pub graph: crate::SessionGraph,
    pub config: crate::PersistedSessionConfig,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub checkpoint_ref: Option<BlobRef>,
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub token_ledger: Vec<crate::TokenLedgerEntry>,
}
193
/// Head summary without the graph body.
///
/// Mirrors [`SessionHead`], but instead of the graph it records only the
/// graph's leaf node id and node count.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct SessionHeadMeta {
    #[serde(default = "default_root_session_id")]
    pub session_id: String,
    #[serde(default)]
    pub head_revision: u64,
    pub config: crate::PersistedSessionConfig,
    #[serde(default)]
    pub agent_frames: Vec<crate::AgentFrameRecord>,
    // Empty string means "not set"; it is omitted on write.
    #[serde(default, skip_serializing_if = "String::is_empty")]
    pub current_agent_frame_id: crate::AgentFrameId,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub checkpoint_ref: Option<BlobRef>,
    /// Leaf node of the stored graph, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub leaf_node_id: Option<String>,
    /// Number of nodes in the stored graph.
    #[serde(default)]
    pub graph_node_count: usize,
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub token_ledger: Vec<crate::TokenLedgerEntry>,
}
214
215fn persisted_session_config_from_state(
216    state: &crate::RuntimeSessionState,
217) -> crate::PersistedSessionConfig {
218    crate::PersistedSessionConfig {
219        provider_id: state.policy.recorded_provider_id().to_string(),
220        model: state.policy.model.clone(),
221    }
222}
223
/// How much of the session graph a load should return.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SessionReadScope {
    /// The entire stored graph.
    FullGraph,
    /// Only the path ending at `leaf_node_id`. `None` presumably means the
    /// stored head's current leaf — confirm against backends.
    ActivePath { leaf_node_id: Option<String> },
}
229
/// A session as read back from the store by `load_session`.
#[derive(Clone, Debug)]
pub struct PersistedSessionRead {
    pub session_id: String,
    pub head_revision: u64,
    pub config: crate::PersistedSessionConfig,
    pub agent_frames: Vec<crate::AgentFrameRecord>,
    pub current_agent_frame_id: crate::AgentFrameId,
    /// Session graph, presumably limited to the requested
    /// [`SessionReadScope`].
    pub graph: crate::SessionGraph,
    pub checkpoint_ref: Option<BlobRef>,
    /// Checkpoint manifest with its payloads, when one could be loaded.
    pub checkpoint: Option<HydratedSessionCheckpoint>,
    pub token_ledger: Vec<crate::TokenLedgerEntry>,
}
242
/// How a [`RuntimeCommit`] changes the stored session graph.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub enum GraphCommitDelta {
    /// No node changes; only the leaf pointer is recorded.
    Unchanged {
        leaf_node_id: Option<String>,
    },
    /// New nodes to append, plus the resulting leaf.
    Append {
        nodes: Vec<crate::SessionNodeRecord>,
        leaf_node_id: Option<String>,
    },
    /// Replace the stored graph wholesale. `RuntimeCommit::persisted_state`
    /// chooses this when the state has no head revision yet or requires a
    /// graph replace.
    ReplaceFull(crate::SessionGraph),
}
254
255impl GraphCommitDelta {
256    pub fn leaf_node_id(&self) -> Option<&String> {
257        match self {
258            Self::Unchanged { leaf_node_id } | Self::Append { leaf_node_id, .. } => {
259                leaf_node_id.as_ref()
260            }
261            Self::ReplaceFull(graph) => graph.leaf_node_id.as_ref(),
262        }
263    }
264}
265
/// One atomic write of settled session state, applied by
/// [`RuntimePersistence::commit_runtime_state`].
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct RuntimeCommit {
    pub session_id: String,
    /// Optimistic-concurrency guard: the head revision the caller last
    /// observed. The constructors take it from the runtime state's
    /// `head_revision`. See [`StoreError::HeadRevisionConflict`].
    pub expected_head_revision: Option<u64>,
    /// Execution-lease fence this commit is made under, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub session_execution_lease: Option<SessionExecutionLeaseFence>,
    /// Execution lease to release together with this commit, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub release_session_execution_lease: Option<SessionExecutionLeaseCompletion>,
    pub config: crate::PersistedSessionConfig,
    pub agent_frames: Vec<crate::AgentFrameRecord>,
    pub current_agent_frame_id: crate::AgentFrameId,
    /// Graph change to apply.
    pub graph: GraphCommitDelta,
    pub checkpoint: HydratedSessionCheckpoint,
    /// Token-ledger entries to add with this commit.
    pub usage_deltas: Vec<crate::TokenLedgerEntry>,
    /// Idempotency stamp for the turn this commit settles, if any.
    pub turn_commit: Option<RuntimeTurnCommitStamp>,
    /// Queued-work claims that this commit completes.
    pub completed_queue_claims: Vec<crate::QueuedWorkCompletion>,
    /// Attachment ids whose bytes are referenced by this commit.
    ///
    /// The backend stamps each id `committed` in the write-ahead manifest
    /// inside the same transaction as the commit, before the commit returns
    /// success. [`AttachmentManifest::commit_refs`] is the out-of-band
    /// equivalent for hosts committing ids outside a turn commit. Hosts
    /// populate this from the attachments emitted by tool calls and inline
    /// LLM-request attachments produced during the turn.
    pub committed_attachment_ids: Vec<crate::AttachmentId>,
}
291
/// Outcome of a successful `commit_runtime_state`.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct RuntimeCommitResult {
    /// Head revision after the commit.
    pub head_revision: u64,
    /// Ref under which the checkpoint manifest was stored.
    pub checkpoint_ref: BlobRef,
    /// The checkpoint manifest that was persisted.
    pub manifest: SessionCheckpoint,
}
298
/// A durable single-writer lease on a session's execution lane, as granted by
/// `try_claim_session_execution_lease`.
#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct SessionExecutionLease {
    pub session_id: String,
    /// Caller-chosen identity of the lease holder.
    pub owner_id: String,
    pub lease_token: String,
    /// Reclaimed leases receive a higher fencing token than the lease they
    /// replace.
    pub fencing_token: u64,
    pub claimed_at_epoch_ms: u64,
    pub expires_at_epoch_ms: u64,
}

/// Proof of lease ownership presented when committing under a lease or
/// renewing it. Stale or superseded fences are rejected with
/// [`StoreError::SessionExecutionLeaseExpired`].
#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct SessionExecutionLeaseFence {
    pub session_id: String,
    pub owner_id: String,
    pub lease_token: String,
    pub fencing_token: u64,
}

/// Token used to release a lease, either directly or together with a
/// [`RuntimeCommit`]. Release is idempotent and must not clear a newer
/// owner's lease.
#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct SessionExecutionLeaseCompletion {
    pub session_id: String,
    pub owner_id: String,
    pub lease_token: String,
    pub fencing_token: u64,
}
324
325impl SessionExecutionLease {
326    pub fn fence(&self) -> SessionExecutionLeaseFence {
327        SessionExecutionLeaseFence {
328            session_id: self.session_id.clone(),
329            owner_id: self.owner_id.clone(),
330            lease_token: self.lease_token.clone(),
331            fencing_token: self.fencing_token,
332        }
333    }
334
335    pub fn completion(&self) -> SessionExecutionLeaseCompletion {
336        SessionExecutionLeaseCompletion {
337            session_id: self.session_id.clone(),
338            owner_id: self.owner_id.clone(),
339            lease_token: self.lease_token.clone(),
340            fencing_token: self.fencing_token,
341        }
342    }
343}
344
345impl SessionExecutionLeaseCompletion {
346    pub fn from_lease(lease: &SessionExecutionLease) -> Self {
347        lease.completion()
348    }
349}
350
351// =============================================================================
352// Attachment write-ahead manifest
353// =============================================================================
354
/// A pending attachment write recorded *before* the bytes hit the
/// [`AttachmentStore`](crate::AttachmentStore) backend.
///
/// The runtime calls [`AttachmentManifest::record_intent`] from the
/// [`SessionScopedAttachmentStore`](crate::SessionScopedAttachmentStore)
/// wrapper before each `put`. The manifest is therefore a durable record that
/// "some bytes are about to land at this URI."
///
/// When the turn that references the attachment commits successfully via
/// [`RuntimePersistence::commit_runtime_state`], the same transaction stamps
/// `committed_at_epoch_ms`. Periodic GC sweeps find manifest rows whose
/// intent has aged past a host-chosen threshold without ever being committed,
/// and delete the corresponding bytes. That is how orphaned files left behind
/// by crashes between `put` and the next turn commit are reconciled.
#[derive(Clone, Debug)]
pub struct AttachmentIntent {
    /// Id of the attachment about to be written.
    pub attachment_id: crate::AttachmentId,
    /// Session the attachment belongs to.
    pub session_id: String,
    /// Canonical URI for the attachment payload in the backing store.
    /// For file-backed stores this is the absolute on-disk path; for
    /// blob-backed stores it can be any stable identifier the host
    /// uses to clean the payload up.
    pub canonical_uri: String,
    /// When the intent was recorded, in milliseconds since the Unix epoch.
    pub intent_at_epoch_ms: u64,
}
380
/// A manifest row, as returned by [`AttachmentManifest::list_uncommitted`].
#[derive(Clone, Debug)]
pub struct AttachmentManifestEntry {
    pub attachment_id: crate::AttachmentId,
    pub session_id: String,
    /// Same meaning as [`AttachmentIntent::canonical_uri`].
    pub canonical_uri: String,
    pub intent_at_epoch_ms: u64,
    /// `None` until a commit references the attachment.
    pub committed_at_epoch_ms: Option<u64>,
}
389
390/// Trait alias for the synchronous attachment-manifest surface on
391/// [`RuntimePersistence`]. Used by
392/// [`SessionScopedAttachmentStore`](crate::SessionScopedAttachmentStore)
393/// to record intent rows before `put` and by GC sweeps to reconcile
394/// orphans. See the [`AttachmentIntent`] doc comment for the full
395/// crash-safety story.
396///
397/// Backends with no attachment story (in-memory tests, mock stores)
398/// inherit the default no-op impls on [`RuntimePersistence`] and
399/// participate transparently — `record_intent` is a no-op, the
400/// scoped wrapper still works, and GC sweeps return empty.
401pub trait AttachmentManifest: Send + Sync {
402    fn record_intent(&self, intent: AttachmentIntent) -> Result<(), StoreError>;
403
404    /// Mark a set of attachment ids as committed (i.e. now referenced
405    /// by a durable session-graph commit). Backends that store
406    /// commits and manifest in the same database stamp this inside
407    /// the commit transaction; the trait-level method is the
408    /// out-of-band entry point for hosts that want to commit an id
409    /// outside the normal turn-commit flow.
410    fn commit_refs(
411        &self,
412        session_id: &str,
413        attachment_ids: &[crate::AttachmentId],
414    ) -> Result<(), StoreError>;
415
416    /// Return manifest entries whose intent has aged past
417    /// `older_than_epoch_ms` without ever being committed. Hosts run
418    /// this periodically to find orphans left by crashes between
419    /// `record_intent` and the next turn commit.
420    fn list_uncommitted(
421        &self,
422        older_than_epoch_ms: u64,
423    ) -> Result<Vec<AttachmentManifestEntry>, StoreError>;
424
425    /// Remove a manifest row entirely. Called by the GC coordinator
426    /// after the corresponding bytes have been removed from the
427    /// backing [`AttachmentStore`](crate::AttachmentStore).
428    fn forget(&self, attachment_id: &crate::AttachmentId) -> Result<(), StoreError>;
429}
430
/// Mixin macro for [`RuntimePersistence`] implementors that have no
/// attachment-write story (mock backends, in-memory test stores,
/// runtime-perf harnesses).
///
/// Pastes no-op impls of every [`AttachmentManifest`] method:
/// `record_intent`, `commit_refs`, and `forget` return `Ok(())`, and
/// `list_uncommitted` returns an empty list.
///
/// Every path in the expansion is fully qualified. The macro therefore
/// expands correctly even where the standard prelude is absent
/// (`#![no_implicit_prelude]`) or where `Ok`/`Vec` are shadowed at the call
/// site.
///
/// ```ignore
/// struct MemoryStore;
/// impl_noop_attachment_manifest!(MemoryStore);
/// ```
#[macro_export]
macro_rules! impl_noop_attachment_manifest {
    ($ty:ty) => {
        impl $crate::AttachmentManifest for $ty {
            fn record_intent(
                &self,
                _intent: $crate::AttachmentIntent,
            ) -> ::std::result::Result<(), $crate::StoreError> {
                ::std::result::Result::Ok(())
            }

            fn commit_refs(
                &self,
                _session_id: &str,
                _attachment_ids: &[$crate::AttachmentId],
            ) -> ::std::result::Result<(), $crate::StoreError> {
                ::std::result::Result::Ok(())
            }

            fn list_uncommitted(
                &self,
                _older_than_epoch_ms: u64,
            ) -> ::std::result::Result<
                ::std::vec::Vec<$crate::AttachmentManifestEntry>,
                $crate::StoreError,
            > {
                ::std::result::Result::Ok(::std::vec::Vec::new())
            }

            fn forget(
                &self,
                _attachment_id: &$crate::AttachmentId,
            ) -> ::std::result::Result<(), $crate::StoreError> {
                ::std::result::Result::Ok(())
            }
        }
    };
}
471
472/// Reject a persisted record whose `schema_version` does not match the
473/// version this binary supports. Backends call this immediately after
474/// deserializing a record from durable storage.
475pub fn ensure_supported_schema_version(
476    record_kind: &'static str,
477    actual: u32,
478    expected: u32,
479) -> Result<(), StoreError> {
480    if actual == expected {
481        Ok(())
482    } else {
483        Err(StoreError::UnsupportedRecordSchemaVersion {
484            record_kind,
485            actual,
486            expected,
487        })
488    }
489}
490
/// Idempotency stamp for a runtime turn.
///
/// Identifies the turn and the [`RuntimeCommit::turn_commit_hash`] of the
/// commit that settles it. Re-committing the same turn id with a different
/// hash is reported as [`StoreError::RuntimeTurnCommitConflict`].
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct RuntimeTurnCommitStamp {
    pub session_id: String,
    pub turn_id: String,
    pub turn_commit_hash: String,
}
497
498impl RuntimeTurnCommitStamp {
499    pub fn new(
500        session_id: impl Into<String>,
501        turn_id: impl Into<String>,
502        turn_commit_hash: impl Into<String>,
503    ) -> Self {
504        Self {
505            session_id: session_id.into(),
506            turn_id: turn_id.into(),
507            turn_commit_hash: turn_commit_hash.into(),
508        }
509    }
510}
511
512fn build_persisted_turn_state(state: &crate::RuntimeSessionState) -> crate::PersistedTurnState {
513    crate::PersistedTurnState {
514        turn_index: state.turn_index,
515        token_usage: state.token_usage.clone(),
516        last_prompt_usage: state.last_prompt_usage.clone(),
517        protocol_turn_options: state.protocol_turn_options.clone(),
518    }
519}
520
521fn build_checkpoint_from_persisted_state(
522    state: &crate::RuntimeSessionState,
523) -> HydratedSessionCheckpoint {
524    HydratedSessionCheckpoint {
525        turn_state: build_persisted_turn_state(state),
526        tool_state_ref: state.tool_state_ref.clone(),
527        tool_state: state.tool_state_snapshot.clone(),
528        plugin_snapshot_ref: state.plugin_snapshot_ref.clone(),
529        plugin_snapshot_revision: state.plugin_snapshot_revision,
530        plugin_snapshot: state.plugin_snapshot.clone(),
531        execution_state_ref: state.execution_state_ref.clone(),
532        execution_state: state.execution_state_snapshot.clone(),
533    }
534}
535
536impl RuntimeCommit {
537    pub fn turn_commit_hash(&self) -> Result<String, StoreError> {
538        let mut semantic_commit = self.clone();
539        semantic_commit.expected_head_revision = None;
540        semantic_commit.session_execution_lease = None;
541        semantic_commit.release_session_execution_lease = None;
542        semantic_commit.turn_commit = None;
543        let mut semantic_commit = serde_json::to_value(&semantic_commit).map_err(|err| {
544            StoreError::Backend(format!("failed to serialize runtime turn commit: {err}"))
545        })?;
546        scrub_turn_commit_hash_value(&mut semantic_commit);
547        crate::stable_hash::stable_json_sha256_hex(&semantic_commit).map_err(|err| {
548            StoreError::Backend(format!(
549                "failed to serialize runtime turn commit hash: {err}"
550            ))
551        })
552    }
553
554    pub fn persisted_state(
555        state: &crate::RuntimeSessionState,
556        usage_deltas: &[crate::TokenLedgerEntry],
557    ) -> Self {
558        Self {
559            session_id: state.session_id.clone(),
560            expected_head_revision: state.head_revision,
561            session_execution_lease: None,
562            release_session_execution_lease: None,
563            config: persisted_session_config_from_state(state),
564            agent_frames: state.agent_frames.clone(),
565            current_agent_frame_id: state.current_agent_frame_id.clone(),
566            graph: if state.graph_replace_required || state.head_revision.is_none() {
567                GraphCommitDelta::ReplaceFull(state.session_graph.clone())
568            } else {
569                GraphCommitDelta::Unchanged {
570                    leaf_node_id: state.session_graph.leaf_node_id.clone(),
571                }
572            },
573            checkpoint: build_checkpoint_from_persisted_state(state),
574            usage_deltas: usage_deltas.to_vec(),
575            turn_commit: None,
576            completed_queue_claims: Vec::new(),
577            committed_attachment_ids: Vec::new(),
578        }
579    }
580
581    pub(crate) fn persisted_state_with_graph_commit(
582        state: &crate::RuntimeSessionState,
583        graph: GraphCommitDelta,
584        usage_deltas: &[crate::TokenLedgerEntry],
585    ) -> Self {
586        Self {
587            session_id: state.session_id.clone(),
588            expected_head_revision: state.head_revision,
589            session_execution_lease: None,
590            release_session_execution_lease: None,
591            config: persisted_session_config_from_state(state),
592            agent_frames: state.agent_frames.clone(),
593            current_agent_frame_id: state.current_agent_frame_id.clone(),
594            graph,
595            checkpoint: build_checkpoint_from_persisted_state(state),
596            usage_deltas: usage_deltas.to_vec(),
597            turn_commit: None,
598            completed_queue_claims: Vec::new(),
599            committed_attachment_ids: Vec::new(),
600        }
601    }
602
603    pub fn with_turn_commit(mut self, turn_commit: RuntimeTurnCommitStamp) -> Self {
604        self.turn_commit = Some(turn_commit);
605        self
606    }
607
608    pub fn with_session_execution_lease(mut self, lease: SessionExecutionLeaseFence) -> Self {
609        self.session_execution_lease = Some(lease);
610        self
611    }
612
613    pub fn releasing_session_execution_lease(
614        mut self,
615        completion: SessionExecutionLeaseCompletion,
616    ) -> Self {
617        self.release_session_execution_lease = Some(completion);
618        self
619    }
620
621    pub fn completing_queue_claim(
622        mut self,
623        completed_queue_claim: crate::QueuedWorkCompletion,
624    ) -> Self {
625        self.completed_queue_claims.push(completed_queue_claim);
626        self
627    }
628
629    pub fn completing_queue_claims(
630        mut self,
631        completed_queue_claims: impl IntoIterator<Item = crate::QueuedWorkCompletion>,
632    ) -> Self {
633        self.completed_queue_claims.extend(completed_queue_claims);
634        self
635    }
636
637    pub fn with_committed_attachments(
638        mut self,
639        attachment_ids: impl IntoIterator<Item = crate::AttachmentId>,
640    ) -> Self {
641        self.committed_attachment_ids = attachment_ids.into_iter().collect();
642        self
643    }
644}
645
646fn scrub_turn_commit_hash_value(value: &mut serde_json::Value) {
647    match value {
648        serde_json::Value::Object(map) => {
649            let is_message = map.contains_key("role") && map.contains_key("parts");
650            let is_message_part = map.contains_key("kind")
651                && map.contains_key("content")
652                && map.contains_key("prune_state");
653            if is_message || is_message_part {
654                map.remove("id");
655            }
656            for volatile_key in ["node_id", "parent_node_id", "leaf_node_id", "timestamp"] {
657                map.remove(volatile_key);
658            }
659            for child in map.values_mut() {
660                scrub_turn_commit_hash_value(child);
661            }
662        }
663        serde_json::Value::Array(items) => {
664            for item in items {
665                scrub_turn_commit_hash_value(item);
666            }
667        }
668        _ => {}
669    }
670}
671
672fn persisted_session_state_from_head(
673    head: SessionHead,
674    checkpoint: Option<HydratedSessionCheckpoint>,
675) -> crate::RuntimeSessionState {
676    let mut state = crate::RuntimeSessionState {
677        session_id: head.session_id,
678        policy: crate::SessionPolicy::default(),
679        agent_frames: head.agent_frames,
680        current_agent_frame_id: head.current_agent_frame_id,
681        session_graph: head.graph,
682        turn_index: 0,
683        token_usage: crate::TokenUsage::default(),
684        last_prompt_usage: None,
685        protocol_turn_options: crate::ProtocolTurnOptions::default(),
686        tool_state_ref: None,
687        tool_state_generation: None,
688        tool_state_snapshot: None,
689        plugin_snapshot_ref: None,
690        plugin_snapshot_revision: None,
691        plugin_snapshot: None,
692        execution_state_ref: None,
693        execution_state_snapshot: None,
694        token_ledger: head.token_ledger,
695        checkpoint_ref: head.checkpoint_ref.clone(),
696        head_revision: Some(head.head_revision),
697        graph_replace_required: false,
698    };
699    state.policy.model = head.config.model.clone();
700    state.policy.provider_id = head.config.provider_id.clone();
701    if let Some(checkpoint) = checkpoint {
702        state.turn_index = checkpoint.turn_state.turn_index;
703        state.token_usage = checkpoint.turn_state.token_usage;
704        state.last_prompt_usage = checkpoint.turn_state.last_prompt_usage;
705        state.protocol_turn_options = checkpoint.turn_state.protocol_turn_options;
706        state.tool_state_ref = checkpoint.tool_state_ref.clone();
707        state.tool_state_generation = checkpoint
708            .tool_state
709            .as_ref()
710            .map(|snapshot| snapshot.generation());
711        state.tool_state_snapshot = checkpoint.tool_state;
712        state.plugin_snapshot_ref = checkpoint.plugin_snapshot_ref.clone();
713        state.plugin_snapshot_revision = checkpoint.plugin_snapshot_revision;
714        state.plugin_snapshot = checkpoint.plugin_snapshot;
715        state.execution_state_ref = checkpoint.execution_state_ref.clone();
716        state.execution_state_snapshot = checkpoint.execution_state;
717    }
718    state.ensure_agent_frame_initialized();
719    state
720}
721
722impl Default for SessionHead {
723    fn default() -> Self {
724        Self {
725            session_id: default_root_session_id(),
726            head_revision: 0,
727            agent_frames: Vec::new(),
728            current_agent_frame_id: String::new(),
729            graph: crate::SessionGraph::default(),
730            config: crate::PersistedSessionConfig::default(),
731            checkpoint_ref: None,
732            token_ledger: Vec::new(),
733        }
734    }
735}
736
737impl Default for SessionHeadMeta {
738    fn default() -> Self {
739        Self {
740            session_id: default_root_session_id(),
741            head_revision: 0,
742            config: crate::PersistedSessionConfig::default(),
743            agent_frames: Vec::new(),
744            current_agent_frame_id: String::new(),
745            checkpoint_ref: None,
746            leaf_node_id: None,
747            graph_node_count: 0,
748            token_ledger: Vec::new(),
749        }
750    }
751}
752
753/// Exact settled-session persistence protocol required by the runtime.
754///
755/// This is the runtime's atomic transaction facade for visible session state:
756/// session graph/head commits, queued-work ingress and completion, final
757/// turn-commit idempotency, metadata, usage, and the attachment write-ahead
758/// manifest. In-flight nondeterministic work belongs to the active
759/// [`EffectHost`](crate::EffectHost), not to the store contract.
760///
761/// The [`AttachmentManifest`] supertrait is required so the runtime can wrap
762/// any persistence backend with a
763/// [`SessionScopedAttachmentStore`](crate::SessionScopedAttachmentStore)
764/// without dual-trait casting. Backends with no attachment-write story can
765/// implement the manifest methods as no-ops via
766/// [`NoopAttachmentManifest`]'s blanket helpers.
767#[async_trait::async_trait]
768pub trait RuntimePersistence: AttachmentManifest + Send + Sync {
769    /// Durability tier this session store provides; defaults to
770    /// [`DurabilityTier::Inline`].
771    fn durability_tier(&self) -> crate::DurabilityTier {
772        crate::DurabilityTier::Inline
773    }
774
775    async fn load_session(
776        &self,
777        scope: SessionReadScope,
778    ) -> Result<Option<PersistedSessionRead>, StoreError>;
779
780    async fn load_node(
781        &self,
782        node_id: &str,
783    ) -> Result<Option<crate::SessionNodeRecord>, StoreError>;
784
785    async fn commit_runtime_state(
786        &self,
787        commit: RuntimeCommit,
788    ) -> Result<RuntimeCommitResult, StoreError>;
789
790    /// Try to claim the durable single-writer execution lane for `session_id`.
791    ///
792    /// Returns `Ok(None)` when another owner holds an unexpired lease. Expired
793    /// or released leases may be reclaimed and receive a higher fencing token.
794    async fn try_claim_session_execution_lease(
795        &self,
796        session_id: &str,
797        owner_id: &str,
798        lease_ttl_ms: u64,
799    ) -> Result<Option<SessionExecutionLease>, StoreError>;
800
801    /// Extend a live session execution lease owned by the caller.
802    ///
803    /// Backends must reject stale, released, superseded, or expired fences with
804    /// [`StoreError::SessionExecutionLeaseExpired`].
805    async fn renew_session_execution_lease(
806        &self,
807        fence: &SessionExecutionLeaseFence,
808        lease_ttl_ms: u64,
809    ) -> Result<SessionExecutionLease, StoreError>;
810
811    /// Release a session execution lease fenced by its completion token.
812    ///
813    /// This operation is idempotent and must not clear a newer owner's lease.
814    async fn release_session_execution_lease(
815        &self,
816        completion: &SessionExecutionLeaseCompletion,
817    ) -> Result<(), StoreError>;
818
819    /// Persist a queued-work batch for later claiming.
820    ///
821    /// The default implementation rejects the batch: backends that do not
822    /// support queued work inherit it and stay loud rather than silently
823    /// dropping work.
824    async fn enqueue_queued_work(
825        &self,
826        _batch: crate::QueuedWorkBatchDraft,
827    ) -> Result<crate::QueuedWorkBatch, StoreError> {
828        Err(StoreError::Backend(
829            "queued work is not supported by this test store".to_string(),
830        ))
831    }
832
833    /// Claim the next ready queued-work group for `owner_id`.
834    ///
835    /// The default implementation reports queued work as unsupported.
836    async fn claim_ready_queued_work(
837        &self,
838        session_id: &str,
839        _session_execution_lease: &SessionExecutionLeaseFence,
840        _owner_id: &str,
841        _boundary: crate::QueuedWorkClaimBoundary,
842        _lease_ttl_ms: u64,
843        _max_batches: usize,
844    ) -> Result<Option<crate::QueuedWorkClaim>, StoreError> {
845        Err(StoreError::Backend(format!(
846            "queued work is not supported for session `{session_id}` by this test store"
847        )))
848    }
849
850    /// Claim a specific ready batch set selected from the durable queue.
851    ///
852    /// This is the host-facing counterpart to [`claim_ready_queued_work`]:
853    /// callers that project queued work into a UI can claim the exact batch ids
854    /// they rendered instead of reconstructing authority from local draft state.
855    /// The default implementation preserves the ordered queue contract by
856    /// claiming the next ready group and returning it only when the durable ids
857    /// match exactly.
858    async fn claim_ready_queued_work_by_batch_ids(
859        &self,
860        session_id: &str,
861        session_execution_lease: &SessionExecutionLeaseFence,
862        owner_id: &str,
863        boundary: crate::QueuedWorkClaimBoundary,
864        lease_ttl_ms: u64,
865        batch_ids: &[String],
866    ) -> Result<Option<crate::QueuedWorkClaim>, StoreError> {
867        if batch_ids.is_empty() {
868            return Ok(None);
869        }
870        let Some(claim) = self
871            .claim_ready_queued_work(
872                session_id,
873                session_execution_lease,
874                owner_id,
875                boundary,
876                lease_ttl_ms,
877                batch_ids.len(),
878            )
879            .await?
880        else {
881            return Ok(None);
882        };
883        let claimed_ids = claim
884            .batches
885            .iter()
886            .map(|batch| batch.batch_id.as_str())
887            .collect::<Vec<_>>();
888        if claimed_ids == batch_ids.iter().map(String::as_str).collect::<Vec<_>>() {
889            return Ok(Some(claim));
890        }
891        self.abandon_queued_work_claim(&claim).await?;
892        Ok(None)
893    }
894
895    /// Extend the lease on a held queued-work claim.
896    ///
897    /// The default implementation reports the claim as expired, matching a
898    /// backend that never granted one.
899    async fn renew_queued_work_claim(
900        &self,
901        claim: &crate::QueuedWorkClaim,
902        _lease_ttl_ms: u64,
903    ) -> Result<crate::QueuedWorkClaim, StoreError> {
904        Err(StoreError::QueuedWorkClaimExpired {
905            session_id: claim.session_id.clone(),
906            claim_id: claim.claim_id.clone(),
907        })
908    }
909
    /// Release a held queued-work claim without completing it.
    ///
    /// The default implementation ignores `_claim` and returns `Ok(())`. A
    /// backend without queued-work support never grants claims, so there is
    /// nothing to release.
    async fn abandon_queued_work_claim(
        &self,
        _claim: &crate::QueuedWorkClaim,
    ) -> Result<(), StoreError> {
        Ok(())
    }
920
    /// Remove an unclaimed queued-work batch from durable ingress.
    ///
    /// Returns the removed batch when cancellation won the race. Returns `None`
    /// when the batch is missing or currently held by a live claim; callers must
    /// treat that as "already claimed or completed" and must not restore any
    /// stale local draft state.
    ///
    /// The default implementation ignores its arguments and always returns
    /// `Ok(None)` (nothing queued, nothing to cancel).
    async fn cancel_queued_work_batch(
        &self,
        _session_id: &str,
        _batch_id: &str,
    ) -> Result<Option<crate::QueuedWorkBatch>, StoreError> {
        Ok(None)
    }
937
938    /// List all queued-work batches for a session.
939    ///
940    /// The default implementation reports an empty queue.
941    async fn list_queued_work(
942        &self,
943        _session_id: &str,
944    ) -> Result<Vec<crate::QueuedWorkBatch>, StoreError> {
945        Ok(Vec::new())
946    }
947
948    /// List queued-work batches that are still pending presentation/editing.
949    ///
950    /// This excludes batches currently held by a live claim. Expired claims are
951    /// considered pending again because they can be reclaimed or cancelled.
952    async fn list_pending_queued_work(
953        &self,
954        session_id: &str,
955    ) -> Result<Vec<crate::QueuedWorkBatch>, StoreError> {
956        self.list_queued_work(session_id).await
957    }
958
    /// Persist the [`SessionMeta`] record for the session this store serves.
    async fn save_session_meta(&self, meta: SessionMeta) -> Result<(), StoreError>;
    /// Load the stored [`SessionMeta`] record. `Ok(None)` presumably means none
    /// has been saved yet — confirm against backend implementations.
    async fn load_session_meta(&self) -> Result<Option<SessionMeta>, StoreError>;
961
    /// Tombstone the records identified by `ids` (presumably session node ids,
    /// as read by `load_node` — confirm against backend implementations).
    async fn tombstone_nodes(&self, ids: &[String]) -> Result<(), StoreError>;
    /// Vacuum the backing store and summarize the work in a [`VacuumReport`];
    /// what vacuuming entails is left to each backend.
    async fn vacuum(&self) -> Result<VacuumReport, StoreError>;
    /// Collect data that is no longer reachable and summarize the work in a
    /// [`GcReport`]; reachability rules are left to each backend.
    async fn gc_unreachable(&self) -> Result<GcReport, StoreError>;
965}
966
967fn persisted_session_state_from_read(read: PersistedSessionRead) -> crate::RuntimeSessionState {
968    persisted_session_state_from_head(
969        SessionHead {
970            session_id: read.session_id,
971            head_revision: read.head_revision,
972            agent_frames: read.agent_frames,
973            current_agent_frame_id: read.current_agent_frame_id,
974            graph: read.graph,
975            config: read.config,
976            checkpoint_ref: read.checkpoint_ref,
977            token_ledger: read.token_ledger,
978        },
979        read.checkpoint,
980    )
981}
982
983pub async fn load_persisted_session_state(
984    store: &(dyn RuntimePersistence + '_),
985) -> Result<Option<crate::RuntimeSessionState>, StoreError> {
986    Ok(store
987        .load_session(SessionReadScope::FullGraph)
988        .await?
989        .map(persisted_session_state_from_read))
990}
991
992pub async fn load_persisted_session_state_active_path(
993    store: &(dyn RuntimePersistence + '_),
994    leaf_node_id: Option<String>,
995) -> Result<Option<crate::RuntimeSessionState>, StoreError> {
996    Ok(store
997        .load_session(SessionReadScope::ActivePath { leaf_node_id })
998        .await?
999        .map(persisted_session_state_from_read))
1000}
1001
1002pub async fn refresh_persisted_session_state(
1003    store: &(dyn RuntimePersistence + '_),
1004    state: &mut crate::RuntimeSessionState,
1005) -> Result<(), StoreError> {
1006    if let Some(mut fresh) = load_persisted_session_state(store).await? {
1007        fresh.policy.session_id = state.policy.session_id.clone();
1008        fresh.policy.max_turns = state.policy.max_turns;
1009        *state = fresh;
1010    }
1011    Ok(())
1012}