Skip to main content

lash_core/
store.rs

/// Fallback session id used when a persisted head record omits `session_id`
/// (wired in via `#[serde(default = "default_root_session_id")]`) and by the
/// `Default` impls of [`SessionHead`] and [`SessionHeadMeta`].
fn default_root_session_id() -> String {
    String::from("root")
}
4
#[cfg(test)]
mod persisted_state_tests {
    use super::*;

    /// Hydrating from a stored head must carry the recorded provider id into
    /// the session policy and into every agent frame's policy, and must keep
    /// the stored head revision.
    #[test]
    fn persisted_state_hydrates_provider_id_without_live_provider_rebinding() {
        let stored_head = SessionHead {
            session_id: "stored".to_string(),
            head_revision: 7,
            agent_frames: Vec::new(),
            current_agent_frame_id: String::new(),
            graph: crate::SessionGraph::default(),
            config: crate::PersistedSessionConfig {
                provider_id: "stored-provider".to_string(),
                model: crate::ModelSpec::default(),
            },
            checkpoint_ref: None,
            token_ledger: Vec::new(),
        };

        let hydrated = persisted_session_state_from_head(stored_head, None);

        assert_eq!(hydrated.policy.recorded_provider_id(), "stored-provider");
        for frame in &hydrated.agent_frames {
            assert_eq!(
                frame.assignment.policy.recorded_provider_id(),
                "stored-provider"
            );
        }
        assert_eq!(hydrated.head_revision, Some(7));
    }
}
38
39#[derive(Debug, thiserror::Error)]
40pub enum StoreError {
41    #[error(
42        "store is already bound to session `{bound_session_id}` and cannot be reused for `{attempted_session_id}`"
43    )]
44    SessionBindingMismatch {
45        bound_session_id: String,
46        attempted_session_id: String,
47    },
48    #[error("store does not support read scope {0:?}")]
49    UnsupportedReadScope(SessionReadScope),
50    #[error("store head revision conflict: expected {expected:?}, actual {actual}")]
51    HeadRevisionConflict { expected: Option<u64>, actual: u64 },
52    #[error(
53        "runtime turn `{turn_id}` for session `{session_id}` was already committed with a different commit hash"
54    )]
55    RuntimeTurnCommitConflict { session_id: String, turn_id: String },
56    #[error("queued work claim `{claim_id}` for session `{session_id}` is missing or expired")]
57    QueuedWorkClaimExpired {
58        session_id: String,
59        claim_id: String,
60    },
61    #[error(
62        "{record_kind} schema_version {actual} is not supported by this binary (expected {expected})"
63    )]
64    UnsupportedRecordSchemaVersion {
65        record_kind: &'static str,
66        actual: u32,
67        expected: u32,
68    },
69    #[error("store backend error: {0}")]
70    Backend(String),
71}
72
/// Descriptive metadata stored for a session (name, creation time, model,
/// working directory, and lineage).
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct SessionMeta {
    pub session_id: String,
    /// Human-readable session name.
    pub session_name: String,
    /// Creation timestamp as recorded by the backend (format not fixed by
    /// this type).
    pub created_at: String,
    /// Model identifier recorded for the session.
    pub model: String,
    /// Working directory associated with the session, if recorded.
    pub cwd: Option<String>,
    /// Canonical lineage record; see [`SessionMeta::parent_session_id`].
    pub relation: crate::SessionRelation,
}
82
83impl SessionMeta {
84    /// Returns the parent session id, if any, derived from the canonical
85    /// [`SessionRelation`] field.
86    pub fn parent_session_id(&self) -> Option<&str> {
87        self.relation.parent_session_id()
88    }
89}
90
/// Lightweight session info for the resume picker.
#[derive(Clone, Debug)]
pub struct SessionPickerInfo {
    pub session_id: String,
    /// Working directory associated with the session, if recorded.
    pub cwd: Option<String>,
    /// Canonical lineage record; see [`SessionPickerInfo::parent_session_id`].
    pub relation: crate::SessionRelation,
    /// Text of the session's first user message (presumably used as the
    /// picker label).
    pub first_user_message: String,
    /// Number of user messages in the session.
    pub user_message_count: usize,
}
100
101impl SessionPickerInfo {
102    pub fn parent_session_id(&self) -> Option<&str> {
103        self.relation.parent_session_id()
104    }
105}
106
/// Opaque reference to a blob held by the store.
///
/// Serializes transparently as its inner string, so on disk it is
/// indistinguishable from a plain string.
#[derive(Clone, Debug, Default, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
#[serde(transparent)]
pub struct BlobRef(pub String);
110
111impl BlobRef {
112    pub fn as_str(&self) -> &str {
113        &self.0
114    }
115}
116
117impl std::fmt::Display for BlobRef {
118    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
119        f.write_str(&self.0)
120    }
121}
122
123impl From<String> for BlobRef {
124    fn from(value: String) -> Self {
125        Self(value)
126    }
127}
128
/// Counters reported by a blob garbage-collection pass.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct GcReport {
    /// Number of roots the pass started from (presumably the blob refs held
    /// by live heads/checkpoints — confirm against the GC implementation).
    pub root_count: usize,
    /// Number of blobs the pass kept.
    pub retained_blob_count: usize,
    /// Number of blobs the pass deleted.
    pub deleted_blob_count: usize,
}
135
/// Result of a `RuntimePersistence::vacuum()` call.
/// `removed_node_count` counts the tombstoned graph-node rows that were
/// physically deleted from the store. Returned so hosts can emit metrics.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct VacuumReport {
    /// Number of tombstoned graph-node rows physically deleted.
    pub removed_node_count: usize,
}
143
/// Reference-only checkpoint manifest for a session.
///
/// Holds the turn counters inline and points at the heavier snapshots (tool
/// state, plugin snapshot, execution state) by [`BlobRef`];
/// [`HydratedSessionCheckpoint`] is the counterpart that can carry those
/// payloads as well. Every field has a serde default, so records missing a
/// field still deserialize, and absent refs are omitted when serializing.
#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
pub struct SessionCheckpoint {
    #[serde(default)]
    pub turn_state: crate::PersistedTurnState,
    /// Blob holding the serialized tool state, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub tool_state_ref: Option<BlobRef>,
    /// Blob holding the plugin session snapshot, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub plugin_snapshot_ref: Option<BlobRef>,
    /// Revision of the plugin snapshot referenced above, if recorded.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub plugin_snapshot_revision: Option<u64>,
    /// Blob holding the execution-state bytes, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub execution_state_ref: Option<BlobRef>,
}
157
/// Checkpoint that can carry each snapshot payload next to its blob ref.
///
/// Built from live runtime state when committing and consumed when
/// rehydrating a session from its head. Each `*_ref` / payload pair is
/// independent: either, both, or neither may be present.
#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
pub struct HydratedSessionCheckpoint {
    pub turn_state: crate::PersistedTurnState,
    pub tool_state_ref: Option<BlobRef>,
    pub tool_state: Option<crate::ToolState>,
    pub plugin_snapshot_ref: Option<BlobRef>,
    pub plugin_snapshot: Option<crate::PluginSessionSnapshot>,
    pub plugin_snapshot_revision: Option<u64>,
    pub execution_state_ref: Option<BlobRef>,
    /// Raw execution-state bytes (opaque to this module).
    pub execution_state: Option<Vec<u8>>,
}
169
/// Complete persisted head of a session: revision, agent frames, the full
/// session graph, config, checkpoint pointer, and token ledger.
///
/// Serde defaults let records omit most fields; a missing `session_id`
/// deserializes as `"root"`. `graph` and `config` are required.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct SessionHead {
    #[serde(default = "default_root_session_id")]
    pub session_id: String,
    /// Revision number of this head (0 when absent from the record).
    #[serde(default)]
    pub head_revision: u64,
    #[serde(default)]
    pub agent_frames: Vec<crate::AgentFrameRecord>,
    /// Id of the current agent frame; omitted from output when empty.
    #[serde(default, skip_serializing_if = "String::is_empty")]
    pub current_agent_frame_id: crate::AgentFrameId,
    pub graph: crate::SessionGraph,
    pub config: crate::PersistedSessionConfig,
    /// Blob reference for the session checkpoint (presumably the
    /// `SessionCheckpoint` manifest), if one has been written.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub checkpoint_ref: Option<BlobRef>,
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub token_ledger: Vec<crate::TokenLedgerEntry>,
}
187
/// Session head metadata without the graph body.
///
/// Mirrors [`SessionHead`] except that the graph is summarized by
/// `leaf_node_id` and `graph_node_count` instead of being embedded. A missing
/// `session_id` deserializes as `"root"`; `config` is required.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct SessionHeadMeta {
    #[serde(default = "default_root_session_id")]
    pub session_id: String,
    #[serde(default)]
    pub head_revision: u64,
    pub config: crate::PersistedSessionConfig,
    #[serde(default)]
    pub agent_frames: Vec<crate::AgentFrameRecord>,
    #[serde(default, skip_serializing_if = "String::is_empty")]
    pub current_agent_frame_id: crate::AgentFrameId,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub checkpoint_ref: Option<BlobRef>,
    /// Current leaf of the session graph, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub leaf_node_id: Option<String>,
    /// Number of nodes in the session graph (0 when absent from the record).
    #[serde(default)]
    pub graph_node_count: usize,
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub token_ledger: Vec<crate::TokenLedgerEntry>,
}
208
209fn persisted_session_config_from_state(
210    state: &crate::RuntimeSessionState,
211) -> crate::PersistedSessionConfig {
212    crate::PersistedSessionConfig {
213        provider_id: state.policy.recorded_provider_id().to_string(),
214        model: state.policy.model.clone(),
215    }
216}
217
/// How much of the session graph `load_session` should return.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SessionReadScope {
    /// The entire session graph.
    FullGraph,
    /// Only the path ending at `leaf_node_id` (`None` presumably means the
    /// stored leaf — confirm against backends).
    ActivePath { leaf_node_id: Option<String> },
}
223
/// Session snapshot returned by [`RuntimePersistence::load_session`].
#[derive(Clone, Debug)]
pub struct PersistedSessionRead {
    pub session_id: String,
    pub head_revision: u64,
    pub config: crate::PersistedSessionConfig,
    pub agent_frames: Vec<crate::AgentFrameRecord>,
    pub current_agent_frame_id: crate::AgentFrameId,
    /// Session graph, presumably limited to the requested
    /// [`SessionReadScope`].
    pub graph: crate::SessionGraph,
    pub checkpoint_ref: Option<BlobRef>,
    /// Checkpoint with payloads loaded, if the head has one.
    pub checkpoint: Option<HydratedSessionCheckpoint>,
    pub token_ledger: Vec<crate::TokenLedgerEntry>,
}
236
/// How a [`RuntimeCommit`] changes the stored session graph.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub enum GraphCommitDelta {
    /// Graph content is unchanged; only the leaf pointer is recorded.
    Unchanged {
        leaf_node_id: Option<String>,
    },
    /// `nodes` are added to the graph and the leaf moves to `leaf_node_id`.
    Append {
        nodes: Vec<crate::SessionNodeRecord>,
        leaf_node_id: Option<String>,
    },
    /// The stored graph is replaced wholesale by this one.
    ReplaceFull(crate::SessionGraph),
}
248
249impl GraphCommitDelta {
250    pub fn leaf_node_id(&self) -> Option<&String> {
251        match self {
252            Self::Unchanged { leaf_node_id } | Self::Append { leaf_node_id, .. } => {
253                leaf_node_id.as_ref()
254            }
255            Self::ReplaceFull(graph) => graph.leaf_node_id.as_ref(),
256        }
257    }
258}
259
/// Everything a backend needs to advance a session head in one
/// [`RuntimePersistence::commit_runtime_state`] call.
///
/// Field order matters: the serialized form of this struct is hashed by
/// `RuntimeCommit::turn_commit_hash`.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct RuntimeCommit {
    pub session_id: String,
    /// Head revision the caller believes is current (`None` for a session
    /// that has never been committed). Presumably checked by backends,
    /// which report a mismatch as [`StoreError::HeadRevisionConflict`].
    pub expected_head_revision: Option<u64>,
    pub config: crate::PersistedSessionConfig,
    pub agent_frames: Vec<crate::AgentFrameRecord>,
    pub current_agent_frame_id: crate::AgentFrameId,
    pub graph: GraphCommitDelta,
    pub checkpoint: HydratedSessionCheckpoint,
    /// Token-ledger entries produced since the previous commit.
    pub usage_deltas: Vec<crate::TokenLedgerEntry>,
    /// Identifies the turn being committed, when this commit finalizes one.
    pub turn_commit: Option<RuntimeTurnCommitStamp>,
    /// Queued-work claims completed by this commit.
    pub completed_queue_claims: Vec<crate::QueuedWorkCompletion>,
    /// Attachment ids whose bytes are referenced by this commit and
    /// should be stamped `committed` in the write-ahead manifest as
    /// part of the same SQL transaction. The backend marks each id
    /// committed via [`AttachmentManifest::commit_refs`] before the
    /// commit returns success. Hosts populate this from the
    /// attachments emitted by tool calls and inline LLM-request
    /// attachments produced during the turn.
    pub committed_attachment_ids: Vec<crate::AttachmentId>,
}
281
/// Outcome of a successful [`RuntimePersistence::commit_runtime_state`].
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct RuntimeCommitResult {
    /// Head revision after the commit.
    pub head_revision: u64,
    /// Blob reference under which the checkpoint was stored.
    pub checkpoint_ref: BlobRef,
    /// Reference-only checkpoint manifest that was stored.
    pub manifest: SessionCheckpoint,
}
288
289// =============================================================================
290// Attachment write-ahead manifest
291// =============================================================================
292
/// A pending attachment write recorded *before* the bytes hit the
/// [`AttachmentStore`](crate::AttachmentStore) backend.
///
/// The runtime calls [`AttachmentManifest::record_intent`] from the
/// [`SessionScopedAttachmentStore`](crate::SessionScopedAttachmentStore)
/// wrapper before each `put`, so the manifest is a durable record that
/// "some bytes are about to land at this URI." When the turn that
/// references the attachment commits successfully via
/// [`RuntimePersistence::commit_runtime_state`], the same transaction
/// stamps the manifest row's `committed_at_epoch_ms` (see
/// [`AttachmentManifestEntry`]). Periodic GC sweeps manifest rows
/// whose intent has aged past a host-chosen threshold without ever
/// being committed and deletes the corresponding bytes — that's how we
/// reconcile orphaned files left behind by crashes between `put` and
/// the next turn commit.
#[derive(Clone, Debug)]
pub struct AttachmentIntent {
    /// Id of the attachment about to be written.
    pub attachment_id: crate::AttachmentId,
    /// Session the write is scoped to.
    pub session_id: String,
    /// Canonical URI for the attachment payload in the backing store.
    /// For file-backed stores this is the absolute on-disk path; for
    /// blob-backed stores it can be any stable identifier the host
    /// uses to clean the payload up.
    pub canonical_uri: String,
    /// When the intent was recorded, in milliseconds since the Unix epoch;
    /// compared against the threshold passed to
    /// [`AttachmentManifest::list_uncommitted`].
    pub intent_at_epoch_ms: u64,
}
318
/// One row of the attachment write-ahead manifest, as returned by
/// [`AttachmentManifest::list_uncommitted`].
#[derive(Clone, Debug)]
pub struct AttachmentManifestEntry {
    pub attachment_id: crate::AttachmentId,
    pub session_id: String,
    /// Where the payload bytes live; see [`AttachmentIntent::canonical_uri`].
    pub canonical_uri: String,
    /// When the intent was recorded (milliseconds since the Unix epoch).
    pub intent_at_epoch_ms: u64,
    /// When a commit referenced the attachment; `None` while uncommitted.
    pub committed_at_epoch_ms: Option<u64>,
}
327
/// Synchronous attachment-manifest surface, required as a supertrait of
/// [`RuntimePersistence`]. Used by
/// [`SessionScopedAttachmentStore`](crate::SessionScopedAttachmentStore)
/// to record intent rows before `put` and by GC sweeps to reconcile
/// orphans. See the [`AttachmentIntent`] doc comment for the full
/// crash-safety story.
///
/// The trait has no default method bodies. Backends with no attachment
/// story (in-memory tests, mock stores) can implement it with
/// [`impl_noop_attachment_manifest!`](crate::impl_noop_attachment_manifest).
/// That makes `record_intent`, `commit_refs`, and `forget` no-ops and makes
/// `list_uncommitted` return an empty list. The scoped wrapper still works,
/// and GC sweeps find nothing to reconcile.
pub trait AttachmentManifest: Send + Sync {
    /// Durably record that bytes for `intent.attachment_id` are about to be
    /// written at `intent.canonical_uri`. The scoped attachment store calls
    /// this before the corresponding `put`.
    fn record_intent(&self, intent: AttachmentIntent) -> Result<(), StoreError>;

    /// Mark a set of attachment ids as committed (i.e. now referenced
    /// by a durable session-graph commit). Backends that store
    /// commits and manifest in the same database stamp this inside
    /// the commit transaction; the trait-level method is the
    /// out-of-band entry point for hosts that want to commit an id
    /// outside the normal turn-commit flow.
    fn commit_refs(
        &self,
        session_id: &str,
        attachment_ids: &[crate::AttachmentId],
    ) -> Result<(), StoreError>;

    /// Return manifest entries whose intent has aged past
    /// `older_than_epoch_ms` without ever being committed. Hosts run
    /// this periodically to find orphans left by crashes between
    /// `record_intent` and the next turn commit.
    fn list_uncommitted(
        &self,
        older_than_epoch_ms: u64,
    ) -> Result<Vec<AttachmentManifestEntry>, StoreError>;

    /// Remove a manifest row entirely. Called by the GC coordinator
    /// after the corresponding bytes have been removed from the
    /// backing [`AttachmentStore`](crate::AttachmentStore).
    fn forget(&self, attachment_id: &crate::AttachmentId) -> Result<(), StoreError>;
}
368
/// Implements [`AttachmentManifest`] for `$ty` with no-op methods.
///
/// Intended for [`RuntimePersistence`] implementors that have no
/// attachment-write story (mock backends, in-memory test stores,
/// runtime-perf harnesses):
/// - `record_intent`, `commit_refs`, and `forget` succeed without doing
///   anything.
/// - `list_uncommitted` always returns an empty list, so GC sweeps find no
///   orphans.
///
/// Every standard-library name in the expansion (`Result`, `Ok`, `Vec`) is
/// written as an absolute `::std::…` path. `macro_rules!` resolves
/// non-local names at the invocation site, so a bare `Ok`/`Vec` would pick
/// up a caller's own definitions (e.g. `use anyhow::Ok;`) and fail to
/// type-check.
#[macro_export]
macro_rules! impl_noop_attachment_manifest {
    ($ty:ty) => {
        impl $crate::AttachmentManifest for $ty {
            fn record_intent(
                &self,
                _intent: $crate::AttachmentIntent,
            ) -> ::std::result::Result<(), $crate::StoreError> {
                ::std::result::Result::Ok(())
            }

            fn commit_refs(
                &self,
                _session_id: &str,
                _attachment_ids: &[$crate::AttachmentId],
            ) -> ::std::result::Result<(), $crate::StoreError> {
                ::std::result::Result::Ok(())
            }

            fn list_uncommitted(
                &self,
                _older_than_epoch_ms: u64,
            ) -> ::std::result::Result<
                ::std::vec::Vec<$crate::AttachmentManifestEntry>,
                $crate::StoreError,
            > {
                ::std::result::Result::Ok(::std::vec::Vec::new())
            }

            fn forget(
                &self,
                _attachment_id: &$crate::AttachmentId,
            ) -> ::std::result::Result<(), $crate::StoreError> {
                ::std::result::Result::Ok(())
            }
        }
    };
}
409
/// Pastes stub implementations of the queued-work methods of
/// [`RuntimePersistence`] into an `impl` block. Intended for test stores that
/// do not support queued work.
///
/// Invoke it with no arguments inside the store's
/// `impl RuntimePersistence for …` block. The methods are written in the
/// desugared `async_trait` form (`Pin<Box<dyn Future + Send + 'async_trait>>`
/// with explicit `'life*` lifetimes). Presumably this matches the signatures
/// `#[async_trait]` generates for the trait's `async fn`s, so keep them in
/// sync if those change.
///
/// Behavior of the generated methods:
/// - `enqueue_queued_work` fails with [`StoreError::Backend`].
/// - `claim_ready_queued_work` fails with [`StoreError::Backend`] naming the
///   session id.
/// - `renew_queued_work_claim` fails with
///   [`StoreError::QueuedWorkClaimExpired`] for the given claim.
/// - `abandon_queued_work_claim` succeeds without doing anything.
/// - `cancel_queued_work_batch` returns `Ok(None)`.
/// - `list_queued_work` returns an empty list.
///
/// Types are referenced through `$crate::runtime::…` and `$crate::store::…`.
/// These presumably re-export the same items the trait names via `crate::…`
/// (TODO: confirm).
#[macro_export]
macro_rules! impl_unsupported_queued_work_methods {
    () => {
        fn enqueue_queued_work<'life0, 'async_trait>(
            &'life0 self,
            _batch: $crate::runtime::QueuedWorkBatchDraft,
        ) -> ::core::pin::Pin<
            Box<
                dyn ::core::future::Future<
                        Output = ::std::result::Result<
                            $crate::runtime::QueuedWorkBatch,
                            $crate::store::StoreError,
                        >,
                    > + Send
                    + 'async_trait,
            >,
        >
        where
            'life0: 'async_trait,
            Self: 'async_trait,
        {
            Box::pin(async move {
                Err($crate::store::StoreError::Backend(
                    "queued work is not supported by this test store".to_string(),
                ))
            })
        }

        // `session_id` is captured by the future only to name it in the
        // error message.
        fn claim_ready_queued_work<'life0, 'life1, 'life2, 'async_trait>(
            &'life0 self,
            session_id: &'life1 str,
            _owner_id: &'life2 str,
            _boundary: $crate::runtime::QueuedWorkClaimBoundary,
            _lease_ttl_ms: u64,
            _max_batches: usize,
        ) -> ::core::pin::Pin<
            Box<
                dyn ::core::future::Future<
                        Output = ::std::result::Result<
                            Option<$crate::runtime::QueuedWorkClaim>,
                            $crate::store::StoreError,
                        >,
                    > + Send
                    + 'async_trait,
            >,
        >
        where
            'life0: 'async_trait,
            'life1: 'async_trait,
            'life2: 'async_trait,
            Self: 'async_trait,
        {
            Box::pin(async move {
                Err($crate::store::StoreError::Backend(format!(
                    "queued work is not supported for session `{session_id}` by this test store"
                )))
            })
        }

        // No claim can exist in a store without queued work, so any renewal
        // reports the claim as expired.
        fn renew_queued_work_claim<'life0, 'life1, 'async_trait>(
            &'life0 self,
            claim: &'life1 $crate::runtime::QueuedWorkClaim,
            _lease_ttl_ms: u64,
        ) -> ::core::pin::Pin<
            Box<
                dyn ::core::future::Future<
                        Output = ::std::result::Result<
                            $crate::runtime::QueuedWorkClaim,
                            $crate::store::StoreError,
                        >,
                    > + Send
                    + 'async_trait,
            >,
        >
        where
            'life0: 'async_trait,
            'life1: 'async_trait,
            Self: 'async_trait,
        {
            Box::pin(async move {
                Err($crate::store::StoreError::QueuedWorkClaimExpired {
                    session_id: claim.session_id.clone(),
                    claim_id: claim.claim_id.clone(),
                })
            })
        }

        fn abandon_queued_work_claim<'life0, 'life1, 'async_trait>(
            &'life0 self,
            _claim: &'life1 $crate::runtime::QueuedWorkClaim,
        ) -> ::core::pin::Pin<
            Box<
                dyn ::core::future::Future<
                        Output = ::std::result::Result<(), $crate::store::StoreError>,
                    > + Send
                    + 'async_trait,
            >,
        >
        where
            'life0: 'async_trait,
            'life1: 'async_trait,
            Self: 'async_trait,
        {
            Box::pin(async move { Ok(()) })
        }

        fn cancel_queued_work_batch<'life0, 'life1, 'life2, 'async_trait>(
            &'life0 self,
            _session_id: &'life1 str,
            _batch_id: &'life2 str,
        ) -> ::core::pin::Pin<
            Box<
                dyn ::core::future::Future<
                        Output = ::std::result::Result<
                            Option<$crate::runtime::QueuedWorkBatch>,
                            $crate::store::StoreError,
                        >,
                    > + Send
                    + 'async_trait,
            >,
        >
        where
            'life0: 'async_trait,
            'life1: 'async_trait,
            'life2: 'async_trait,
            Self: 'async_trait,
        {
            Box::pin(async move { Ok(None) })
        }

        fn list_queued_work<'life0, 'life1, 'async_trait>(
            &'life0 self,
            _session_id: &'life1 str,
        ) -> ::core::pin::Pin<
            Box<
                dyn ::core::future::Future<
                        Output = ::std::result::Result<
                            Vec<$crate::runtime::QueuedWorkBatch>,
                            $crate::store::StoreError,
                        >,
                    > + Send
                    + 'async_trait,
            >,
        >
        where
            'life0: 'async_trait,
            'life1: 'async_trait,
            Self: 'async_trait,
        {
            Box::pin(async move { Ok(Vec::new()) })
        }
    };
}
563
564/// Reject a persisted record whose `schema_version` does not match the
565/// version this binary supports. Backends call this immediately after
566/// deserializing a record from durable storage.
567pub fn ensure_supported_schema_version(
568    record_kind: &'static str,
569    actual: u32,
570    expected: u32,
571) -> Result<(), StoreError> {
572    if actual == expected {
573        Ok(())
574    } else {
575        Err(StoreError::UnsupportedRecordSchemaVersion {
576            record_kind,
577            actual,
578            expected,
579        })
580    }
581}
582
/// Identifies a committed turn together with the hash of its semantic
/// content (see `RuntimeCommit::turn_commit_hash`).
///
/// Re-committing the same `turn_id` with a different hash is reported as
/// [`StoreError::RuntimeTurnCommitConflict`].
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct RuntimeTurnCommitStamp {
    pub session_id: String,
    pub turn_id: String,
    pub turn_commit_hash: String,
}
589
590impl RuntimeTurnCommitStamp {
591    pub fn new(
592        session_id: impl Into<String>,
593        turn_id: impl Into<String>,
594        turn_commit_hash: impl Into<String>,
595    ) -> Self {
596        Self {
597            session_id: session_id.into(),
598            turn_id: turn_id.into(),
599            turn_commit_hash: turn_commit_hash.into(),
600        }
601    }
602}
603
604fn build_persisted_turn_state(state: &crate::RuntimeSessionState) -> crate::PersistedTurnState {
605    crate::PersistedTurnState {
606        turn_index: state.turn_index,
607        token_usage: state.token_usage.clone(),
608        last_prompt_usage: state.last_prompt_usage.clone(),
609        protocol_turn_options: state.protocol_turn_options.clone(),
610    }
611}
612
613fn build_checkpoint_from_persisted_state(
614    state: &crate::RuntimeSessionState,
615) -> HydratedSessionCheckpoint {
616    HydratedSessionCheckpoint {
617        turn_state: build_persisted_turn_state(state),
618        tool_state_ref: state.tool_state_ref.clone(),
619        tool_state: state.tool_state_snapshot.clone(),
620        plugin_snapshot_ref: state.plugin_snapshot_ref.clone(),
621        plugin_snapshot_revision: state.plugin_snapshot_revision,
622        plugin_snapshot: state.plugin_snapshot.clone(),
623        execution_state_ref: state.execution_state_ref.clone(),
624        execution_state: state.execution_state_snapshot.clone(),
625    }
626}
627
628impl RuntimeCommit {
629    pub fn turn_commit_hash(&self) -> Result<String, StoreError> {
630        let mut semantic_commit = self.clone();
631        semantic_commit.expected_head_revision = None;
632        semantic_commit.turn_commit = None;
633        let mut semantic_commit = serde_json::to_value(&semantic_commit).map_err(|err| {
634            StoreError::Backend(format!("failed to serialize runtime turn commit: {err}"))
635        })?;
636        scrub_turn_commit_hash_value(&mut semantic_commit);
637        crate::stable_hash::stable_json_sha256_hex(&semantic_commit).map_err(|err| {
638            StoreError::Backend(format!(
639                "failed to serialize runtime turn commit hash: {err}"
640            ))
641        })
642    }
643
644    pub fn persisted_state(
645        state: &crate::RuntimeSessionState,
646        usage_deltas: &[crate::TokenLedgerEntry],
647    ) -> Self {
648        Self {
649            session_id: state.session_id.clone(),
650            expected_head_revision: state.head_revision,
651            config: persisted_session_config_from_state(state),
652            agent_frames: state.agent_frames.clone(),
653            current_agent_frame_id: state.current_agent_frame_id.clone(),
654            graph: if state.graph_replace_required || state.head_revision.is_none() {
655                GraphCommitDelta::ReplaceFull(state.session_graph.clone())
656            } else {
657                GraphCommitDelta::Unchanged {
658                    leaf_node_id: state.session_graph.leaf_node_id.clone(),
659                }
660            },
661            checkpoint: build_checkpoint_from_persisted_state(state),
662            usage_deltas: usage_deltas.to_vec(),
663            turn_commit: None,
664            completed_queue_claims: Vec::new(),
665            committed_attachment_ids: Vec::new(),
666        }
667    }
668
669    pub(crate) fn persisted_state_with_graph_commit(
670        state: &crate::RuntimeSessionState,
671        graph: GraphCommitDelta,
672        usage_deltas: &[crate::TokenLedgerEntry],
673    ) -> Self {
674        Self {
675            session_id: state.session_id.clone(),
676            expected_head_revision: state.head_revision,
677            config: persisted_session_config_from_state(state),
678            agent_frames: state.agent_frames.clone(),
679            current_agent_frame_id: state.current_agent_frame_id.clone(),
680            graph,
681            checkpoint: build_checkpoint_from_persisted_state(state),
682            usage_deltas: usage_deltas.to_vec(),
683            turn_commit: None,
684            completed_queue_claims: Vec::new(),
685            committed_attachment_ids: Vec::new(),
686        }
687    }
688
689    pub fn with_turn_commit(mut self, turn_commit: RuntimeTurnCommitStamp) -> Self {
690        self.turn_commit = Some(turn_commit);
691        self
692    }
693
694    pub fn completing_queue_claim(
695        mut self,
696        completed_queue_claim: crate::QueuedWorkCompletion,
697    ) -> Self {
698        self.completed_queue_claims.push(completed_queue_claim);
699        self
700    }
701
702    pub fn completing_queue_claims(
703        mut self,
704        completed_queue_claims: impl IntoIterator<Item = crate::QueuedWorkCompletion>,
705    ) -> Self {
706        self.completed_queue_claims.extend(completed_queue_claims);
707        self
708    }
709
710    pub fn with_committed_attachments(
711        mut self,
712        attachment_ids: impl IntoIterator<Item = crate::AttachmentId>,
713    ) -> Self {
714        self.committed_attachment_ids = attachment_ids.into_iter().collect();
715        self
716    }
717}
718
719fn scrub_turn_commit_hash_value(value: &mut serde_json::Value) {
720    match value {
721        serde_json::Value::Object(map) => {
722            let is_message = map.contains_key("role") && map.contains_key("parts");
723            let is_message_part = map.contains_key("kind")
724                && map.contains_key("content")
725                && map.contains_key("prune_state");
726            if is_message || is_message_part {
727                map.remove("id");
728            }
729            for volatile_key in ["node_id", "parent_node_id", "leaf_node_id", "timestamp"] {
730                map.remove(volatile_key);
731            }
732            for child in map.values_mut() {
733                scrub_turn_commit_hash_value(child);
734            }
735        }
736        serde_json::Value::Array(items) => {
737            for item in items {
738                scrub_turn_commit_hash_value(item);
739            }
740        }
741        _ => {}
742    }
743}
744
745fn persisted_session_state_from_head(
746    head: SessionHead,
747    checkpoint: Option<HydratedSessionCheckpoint>,
748) -> crate::RuntimeSessionState {
749    let mut state = crate::RuntimeSessionState {
750        session_id: head.session_id,
751        policy: crate::SessionPolicy::default(),
752        agent_frames: head.agent_frames,
753        current_agent_frame_id: head.current_agent_frame_id,
754        session_graph: head.graph,
755        turn_index: 0,
756        token_usage: crate::TokenUsage::default(),
757        last_prompt_usage: None,
758        protocol_turn_options: crate::ProtocolTurnOptions::default(),
759        tool_state_ref: None,
760        tool_state_generation: None,
761        tool_state_snapshot: None,
762        plugin_snapshot_ref: None,
763        plugin_snapshot_revision: None,
764        plugin_snapshot: None,
765        execution_state_ref: None,
766        execution_state_snapshot: None,
767        token_ledger: head.token_ledger,
768        checkpoint_ref: head.checkpoint_ref.clone(),
769        head_revision: Some(head.head_revision),
770        graph_replace_required: false,
771    };
772    state.policy.model = head.config.model.clone();
773    state.policy.provider_id = head.config.provider_id.clone();
774    if let Some(checkpoint) = checkpoint {
775        state.turn_index = checkpoint.turn_state.turn_index;
776        state.token_usage = checkpoint.turn_state.token_usage;
777        state.last_prompt_usage = checkpoint.turn_state.last_prompt_usage;
778        state.protocol_turn_options = checkpoint.turn_state.protocol_turn_options;
779        state.tool_state_ref = checkpoint.tool_state_ref.clone();
780        state.tool_state_generation = checkpoint
781            .tool_state
782            .as_ref()
783            .map(|snapshot| snapshot.generation());
784        state.tool_state_snapshot = checkpoint.tool_state;
785        state.plugin_snapshot_ref = checkpoint.plugin_snapshot_ref.clone();
786        state.plugin_snapshot_revision = checkpoint.plugin_snapshot_revision;
787        state.plugin_snapshot = checkpoint.plugin_snapshot;
788        state.execution_state_ref = checkpoint.execution_state_ref.clone();
789        state.execution_state_snapshot = checkpoint.execution_state;
790    }
791    state.ensure_agent_frame_initialized();
792    state
793}
794
795impl Default for SessionHead {
796    fn default() -> Self {
797        Self {
798            session_id: default_root_session_id(),
799            head_revision: 0,
800            agent_frames: Vec::new(),
801            current_agent_frame_id: String::new(),
802            graph: crate::SessionGraph::default(),
803            config: crate::PersistedSessionConfig::default(),
804            checkpoint_ref: None,
805            token_ledger: Vec::new(),
806        }
807    }
808}
809
810impl Default for SessionHeadMeta {
811    fn default() -> Self {
812        Self {
813            session_id: default_root_session_id(),
814            head_revision: 0,
815            config: crate::PersistedSessionConfig::default(),
816            agent_frames: Vec::new(),
817            current_agent_frame_id: String::new(),
818            checkpoint_ref: None,
819            leaf_node_id: None,
820            graph_node_count: 0,
821            token_ledger: Vec::new(),
822        }
823    }
824}
825
826/// Exact settled-session persistence protocol required by the runtime.
827///
828/// This is the runtime's atomic transaction facade for visible session state:
829/// session graph/head commits, queued-work ingress and completion, final
830/// turn-commit idempotency, metadata, usage, and the attachment write-ahead
831/// manifest. In-flight nondeterministic work belongs to the active
832/// [`EffectHost`](crate::EffectHost), not to the store contract.
833///
834/// The [`AttachmentManifest`] supertrait is required so the runtime can wrap
835/// any persistence backend with a
836/// [`SessionScopedAttachmentStore`](crate::SessionScopedAttachmentStore)
837/// without dual-trait casting. Backends with no attachment-write story can
838/// implement the manifest methods as no-ops via
839/// [`NoopAttachmentManifest`]'s blanket helpers.
840#[async_trait::async_trait]
841pub trait RuntimePersistence: AttachmentManifest + Send + Sync {
842    /// Durability tier this session store provides; defaults to
843    /// [`DurabilityTier::Inline`].
844    fn durability_tier(&self) -> crate::DurabilityTier {
845        crate::DurabilityTier::Inline
846    }
847
848    async fn load_session(
849        &self,
850        scope: SessionReadScope,
851    ) -> Result<Option<PersistedSessionRead>, StoreError>;
852
853    async fn load_node(
854        &self,
855        node_id: &str,
856    ) -> Result<Option<crate::SessionNodeRecord>, StoreError>;
857
858    async fn commit_runtime_state(
859        &self,
860        commit: RuntimeCommit,
861    ) -> Result<RuntimeCommitResult, StoreError>;
862
863    async fn enqueue_queued_work(
864        &self,
865        batch: crate::QueuedWorkBatchDraft,
866    ) -> Result<crate::QueuedWorkBatch, StoreError>;
867
868    async fn claim_ready_queued_work(
869        &self,
870        session_id: &str,
871        owner_id: &str,
872        boundary: crate::QueuedWorkClaimBoundary,
873        lease_ttl_ms: u64,
874        max_batches: usize,
875    ) -> Result<Option<crate::QueuedWorkClaim>, StoreError>;
876
877    /// Claim a specific ready batch set selected from the durable queue.
878    ///
879    /// This is the host-facing counterpart to [`claim_ready_queued_work`]:
880    /// callers that project queued work into a UI can claim the exact batch ids
881    /// they rendered instead of reconstructing authority from local draft state.
882    /// The default implementation preserves the ordered queue contract by
883    /// claiming the next ready group and returning it only when the durable ids
884    /// match exactly.
885    async fn claim_ready_queued_work_by_batch_ids(
886        &self,
887        session_id: &str,
888        owner_id: &str,
889        boundary: crate::QueuedWorkClaimBoundary,
890        lease_ttl_ms: u64,
891        batch_ids: &[String],
892    ) -> Result<Option<crate::QueuedWorkClaim>, StoreError> {
893        if batch_ids.is_empty() {
894            return Ok(None);
895        }
896        let Some(claim) = self
897            .claim_ready_queued_work(
898                session_id,
899                owner_id,
900                boundary,
901                lease_ttl_ms,
902                batch_ids.len(),
903            )
904            .await?
905        else {
906            return Ok(None);
907        };
908        let claimed_ids = claim
909            .batches
910            .iter()
911            .map(|batch| batch.batch_id.as_str())
912            .collect::<Vec<_>>();
913        if claimed_ids == batch_ids.iter().map(String::as_str).collect::<Vec<_>>() {
914            return Ok(Some(claim));
915        }
916        self.abandon_queued_work_claim(&claim).await?;
917        Ok(None)
918    }
919
920    async fn renew_queued_work_claim(
921        &self,
922        claim: &crate::QueuedWorkClaim,
923        lease_ttl_ms: u64,
924    ) -> Result<crate::QueuedWorkClaim, StoreError>;
925
926    async fn abandon_queued_work_claim(
927        &self,
928        claim: &crate::QueuedWorkClaim,
929    ) -> Result<(), StoreError>;
930
931    /// Remove an unclaimed queued-work batch from durable ingress.
932    ///
933    /// Returns the removed batch when cancellation won the race. Returns `None`
934    /// when the batch is missing or currently held by a live claim; callers must
935    /// treat that as "already claimed or completed" and must not restore any
936    /// stale local draft state.
937    async fn cancel_queued_work_batch(
938        &self,
939        session_id: &str,
940        batch_id: &str,
941    ) -> Result<Option<crate::QueuedWorkBatch>, StoreError>;
942
943    async fn list_queued_work(
944        &self,
945        session_id: &str,
946    ) -> Result<Vec<crate::QueuedWorkBatch>, StoreError>;
947
948    /// List queued-work batches that are still pending presentation/editing.
949    ///
950    /// This excludes batches currently held by a live claim. Expired claims are
951    /// considered pending again because they can be reclaimed or cancelled.
952    async fn list_pending_queued_work(
953        &self,
954        session_id: &str,
955    ) -> Result<Vec<crate::QueuedWorkBatch>, StoreError> {
956        self.list_queued_work(session_id).await
957    }
958
959    async fn save_session_meta(&self, meta: SessionMeta) -> Result<(), StoreError>;
960    async fn load_session_meta(&self) -> Result<Option<SessionMeta>, StoreError>;
961
962    async fn tombstone_nodes(&self, ids: &[String]) -> Result<(), StoreError>;
963    async fn vacuum(&self) -> Result<VacuumReport, StoreError>;
964    async fn gc_unreachable(&self) -> Result<GcReport, StoreError>;
965}
966
967fn persisted_session_state_from_read(read: PersistedSessionRead) -> crate::RuntimeSessionState {
968    persisted_session_state_from_head(
969        SessionHead {
970            session_id: read.session_id,
971            head_revision: read.head_revision,
972            agent_frames: read.agent_frames,
973            current_agent_frame_id: read.current_agent_frame_id,
974            graph: read.graph,
975            config: read.config,
976            checkpoint_ref: read.checkpoint_ref,
977            token_ledger: read.token_ledger,
978        },
979        read.checkpoint,
980    )
981}
982
983pub async fn load_persisted_session_state(
984    store: &(dyn RuntimePersistence + '_),
985) -> Result<Option<crate::RuntimeSessionState>, StoreError> {
986    Ok(store
987        .load_session(SessionReadScope::FullGraph)
988        .await?
989        .map(persisted_session_state_from_read))
990}
991
992pub async fn load_persisted_session_state_active_path(
993    store: &(dyn RuntimePersistence + '_),
994    leaf_node_id: Option<String>,
995) -> Result<Option<crate::RuntimeSessionState>, StoreError> {
996    Ok(store
997        .load_session(SessionReadScope::ActivePath { leaf_node_id })
998        .await?
999        .map(persisted_session_state_from_read))
1000}
1001
1002pub async fn refresh_persisted_session_state(
1003    store: &(dyn RuntimePersistence + '_),
1004    state: &mut crate::RuntimeSessionState,
1005) -> Result<(), StoreError> {
1006    if let Some(mut fresh) = load_persisted_session_state(store).await? {
1007        fresh.policy.session_id = state.policy.session_id.clone();
1008        fresh.policy.max_turns = state.policy.max_turns;
1009        *state = fresh;
1010    }
1011    Ok(())
1012}