Skip to main content

lash_core/store/
mod.rs

1//! The runtime's settled-session persistence contract and shared store types.
2
3pub mod queued_work;
4
/// Fallback session id used when a persisted head carries no `session_id`.
///
/// Referenced by the `#[serde(default = "default_root_session_id")]`
/// attributes and the `Default` impls in this module; always yields `"root"`.
fn default_root_session_id() -> String {
    String::from("root")
}
8
#[cfg(test)]
mod persisted_state_tests {
    use super::*;

    /// Hydrating a head without a checkpoint must carry the stored provider id
    /// onto the session policy and every agent frame, and keep the head
    /// revision.
    #[test]
    fn persisted_state_hydrates_provider_id_without_live_provider_rebinding() {
        const STORED_PROVIDER: &str = "stored-provider";

        let head = SessionHead {
            session_id: String::from("stored"),
            head_revision: 7,
            agent_frames: Vec::new(),
            current_agent_frame_id: String::new(),
            graph: crate::SessionGraph::default(),
            config: crate::PersistedSessionConfig {
                provider_id: STORED_PROVIDER.to_string(),
                model: crate::ModelSpec::default(),
            },
            checkpoint_ref: None,
            token_ledger: Vec::new(),
        };

        let state = persisted_session_state_from_head(head, None);

        assert_eq!(state.policy.recorded_provider_id(), STORED_PROVIDER);
        let every_frame_matches = state
            .agent_frames
            .iter()
            .all(|frame| frame.assignment.policy.recorded_provider_id() == STORED_PROVIDER);
        assert!(every_frame_matches);
        assert_eq!(state.head_revision, Some(7));
    }
}
42
43#[derive(Debug, thiserror::Error)]
44pub enum StoreError {
45    #[error(
46        "store is already bound to session `{bound_session_id}` and cannot be reused for `{attempted_session_id}`"
47    )]
48    SessionBindingMismatch {
49        bound_session_id: String,
50        attempted_session_id: String,
51    },
52    #[error("store does not support read scope {0:?}")]
53    UnsupportedReadScope(SessionReadScope),
54    #[error("store head revision conflict: expected {expected:?}, actual {actual}")]
55    HeadRevisionConflict { expected: Option<u64>, actual: u64 },
56    #[error(
57        "runtime turn `{turn_id}` for session `{session_id}` was already committed with a different commit hash"
58    )]
59    RuntimeTurnCommitConflict { session_id: String, turn_id: String },
60    #[error("queued work claim `{claim_id}` for session `{session_id}` is missing or expired")]
61    QueuedWorkClaimExpired {
62        session_id: String,
63        claim_id: String,
64    },
65    #[error(
66        "{record_kind} schema_version {actual} is not supported by this binary (expected {expected})"
67    )]
68    UnsupportedRecordSchemaVersion {
69        record_kind: &'static str,
70        actual: u32,
71        expected: u32,
72    },
73    #[error("store backend error: {0}")]
74    Backend(String),
75}
76
77#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
78pub struct SessionMeta {
79    pub session_id: String,
80    pub session_name: String,
81    pub created_at: String,
82    pub model: String,
83    pub cwd: Option<String>,
84    pub relation: crate::SessionRelation,
85}
86
87impl SessionMeta {
88    /// Returns the parent session id, if any, derived from the canonical
89    /// [`SessionRelation`] field.
90    pub fn parent_session_id(&self) -> Option<&str> {
91        self.relation.parent_session_id()
92    }
93}
94
95/// Lightweight session info for the resume picker.
96#[derive(Clone, Debug)]
97pub struct SessionPickerInfo {
98    pub session_id: String,
99    pub cwd: Option<String>,
100    pub relation: crate::SessionRelation,
101    pub first_user_message: String,
102    pub user_message_count: usize,
103}
104
105impl SessionPickerInfo {
106    pub fn parent_session_id(&self) -> Option<&str> {
107        self.relation.parent_session_id()
108    }
109}
110
111#[derive(Clone, Debug, Default, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
112#[serde(transparent)]
113pub struct BlobRef(pub String);
114
115impl BlobRef {
116    pub fn as_str(&self) -> &str {
117        &self.0
118    }
119}
120
121impl std::fmt::Display for BlobRef {
122    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
123        f.write_str(&self.0)
124    }
125}
126
127impl From<String> for BlobRef {
128    fn from(value: String) -> Self {
129        Self(value)
130    }
131}
132
/// Counters returned by `RuntimePersistence::gc_unreachable()`.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct GcReport {
    /// Number of roots counted by the collection pass.
    pub root_count: usize,
    /// Number of blobs that were kept.
    pub retained_blob_count: usize,
    /// Number of blobs that were deleted.
    pub deleted_blob_count: usize,
}
139
/// Outcome of a `RuntimePersistence::vacuum()` call, returned so hosts can
/// emit metrics.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct VacuumReport {
    /// Count of tombstoned graph-node rows that were physically deleted from
    /// the store.
    pub removed_node_count: usize,
}
147
148#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
149pub struct SessionCheckpoint {
150    #[serde(default)]
151    pub turn_state: crate::PersistedTurnState,
152    #[serde(default, skip_serializing_if = "Option::is_none")]
153    pub tool_state_ref: Option<BlobRef>,
154    #[serde(default, skip_serializing_if = "Option::is_none")]
155    pub plugin_snapshot_ref: Option<BlobRef>,
156    #[serde(default, skip_serializing_if = "Option::is_none")]
157    pub plugin_snapshot_revision: Option<u64>,
158    #[serde(default, skip_serializing_if = "Option::is_none")]
159    pub execution_state_ref: Option<BlobRef>,
160}
161
162#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
163pub struct HydratedSessionCheckpoint {
164    pub turn_state: crate::PersistedTurnState,
165    pub tool_state_ref: Option<BlobRef>,
166    pub tool_state: Option<crate::ToolState>,
167    pub plugin_snapshot_ref: Option<BlobRef>,
168    pub plugin_snapshot: Option<crate::PluginSessionSnapshot>,
169    pub plugin_snapshot_revision: Option<u64>,
170    pub execution_state_ref: Option<BlobRef>,
171    pub execution_state: Option<Vec<u8>>,
172}
173
174#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
175pub struct SessionHead {
176    #[serde(default = "default_root_session_id")]
177    pub session_id: String,
178    #[serde(default)]
179    pub head_revision: u64,
180    #[serde(default)]
181    pub agent_frames: Vec<crate::AgentFrameRecord>,
182    #[serde(default, skip_serializing_if = "String::is_empty")]
183    pub current_agent_frame_id: crate::AgentFrameId,
184    pub graph: crate::SessionGraph,
185    pub config: crate::PersistedSessionConfig,
186    #[serde(default, skip_serializing_if = "Option::is_none")]
187    pub checkpoint_ref: Option<BlobRef>,
188    #[serde(default, skip_serializing_if = "Vec::is_empty")]
189    pub token_ledger: Vec<crate::TokenLedgerEntry>,
190}
191
192#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
193pub struct SessionHeadMeta {
194    #[serde(default = "default_root_session_id")]
195    pub session_id: String,
196    #[serde(default)]
197    pub head_revision: u64,
198    pub config: crate::PersistedSessionConfig,
199    #[serde(default)]
200    pub agent_frames: Vec<crate::AgentFrameRecord>,
201    #[serde(default, skip_serializing_if = "String::is_empty")]
202    pub current_agent_frame_id: crate::AgentFrameId,
203    #[serde(default, skip_serializing_if = "Option::is_none")]
204    pub checkpoint_ref: Option<BlobRef>,
205    #[serde(default, skip_serializing_if = "Option::is_none")]
206    pub leaf_node_id: Option<String>,
207    #[serde(default)]
208    pub graph_node_count: usize,
209    #[serde(default, skip_serializing_if = "Vec::is_empty")]
210    pub token_ledger: Vec<crate::TokenLedgerEntry>,
211}
212
213fn persisted_session_config_from_state(
214    state: &crate::RuntimeSessionState,
215) -> crate::PersistedSessionConfig {
216    crate::PersistedSessionConfig {
217        provider_id: state.policy.recorded_provider_id().to_string(),
218        model: state.policy.model.clone(),
219    }
220}
221
/// How much of a session `RuntimePersistence::load_session` should read.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SessionReadScope {
    /// Read the whole session graph.
    FullGraph,
    /// Read only the active path, optionally identified by a leaf node id.
    ActivePath { leaf_node_id: Option<String> },
}
227
228#[derive(Clone, Debug)]
229pub struct PersistedSessionRead {
230    pub session_id: String,
231    pub head_revision: u64,
232    pub config: crate::PersistedSessionConfig,
233    pub agent_frames: Vec<crate::AgentFrameRecord>,
234    pub current_agent_frame_id: crate::AgentFrameId,
235    pub graph: crate::SessionGraph,
236    pub checkpoint_ref: Option<BlobRef>,
237    pub checkpoint: Option<HydratedSessionCheckpoint>,
238    pub token_ledger: Vec<crate::TokenLedgerEntry>,
239}
240
241#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
242pub enum GraphCommitDelta {
243    Unchanged {
244        leaf_node_id: Option<String>,
245    },
246    Append {
247        nodes: Vec<crate::SessionNodeRecord>,
248        leaf_node_id: Option<String>,
249    },
250    ReplaceFull(crate::SessionGraph),
251}
252
253impl GraphCommitDelta {
254    pub fn leaf_node_id(&self) -> Option<&String> {
255        match self {
256            Self::Unchanged { leaf_node_id } | Self::Append { leaf_node_id, .. } => {
257                leaf_node_id.as_ref()
258            }
259            Self::ReplaceFull(graph) => graph.leaf_node_id.as_ref(),
260        }
261    }
262}
263
264#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
265pub struct RuntimeCommit {
266    pub session_id: String,
267    pub expected_head_revision: Option<u64>,
268    pub config: crate::PersistedSessionConfig,
269    pub agent_frames: Vec<crate::AgentFrameRecord>,
270    pub current_agent_frame_id: crate::AgentFrameId,
271    pub graph: GraphCommitDelta,
272    pub checkpoint: HydratedSessionCheckpoint,
273    pub usage_deltas: Vec<crate::TokenLedgerEntry>,
274    pub turn_commit: Option<RuntimeTurnCommitStamp>,
275    pub completed_queue_claims: Vec<crate::QueuedWorkCompletion>,
276    /// Attachment ids whose bytes are referenced by this commit and
277    /// should be stamped `committed` in the write-ahead manifest as
278    /// part of the same SQL transaction. The backend marks each id
279    /// committed via [`AttachmentManifest::commit_refs`] before the
280    /// commit returns success. Hosts populate this from the
281    /// attachments emitted by tool calls and inline LLM-request
282    /// attachments produced during the turn.
283    pub committed_attachment_ids: Vec<crate::AttachmentId>,
284}
285
286#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
287pub struct RuntimeCommitResult {
288    pub head_revision: u64,
289    pub checkpoint_ref: BlobRef,
290    pub manifest: SessionCheckpoint,
291}
292
293// =============================================================================
294// Attachment write-ahead manifest
295// =============================================================================
296
297/// A pending attachment write recorded *before* the bytes hit the
298/// [`AttachmentStore`](crate::AttachmentStore) backend.
299///
300/// The runtime calls [`AttachmentManifest::record_intent`] from the
301/// [`SessionScopedAttachmentStore`](crate::SessionScopedAttachmentStore)
302/// wrapper before each `put`, so the manifest is a durable record that
303/// "some bytes are about to land at this URI." When the turn that
304/// references the attachment commits successfully via
305/// [`RuntimePersistence::commit_runtime_state`], the same transaction
306/// stamps `committed_at_epoch_ms`. Periodic GC sweeps manifest rows
307/// whose intent has aged past a host-chosen threshold without ever
308/// being committed and deletes the corresponding bytes — that's how we
309/// reconcile orphaned files left behind by crashes between `put` and
310/// the next turn commit.
311#[derive(Clone, Debug)]
312pub struct AttachmentIntent {
313    pub attachment_id: crate::AttachmentId,
314    pub session_id: String,
315    /// Canonical URI for the attachment payload in the backing store.
316    /// For file-backed stores this is the absolute on-disk path; for
317    /// blob-backed stores it can be any stable identifier the host
318    /// uses to clean the payload up.
319    pub canonical_uri: String,
320    pub intent_at_epoch_ms: u64,
321}
322
323#[derive(Clone, Debug)]
324pub struct AttachmentManifestEntry {
325    pub attachment_id: crate::AttachmentId,
326    pub session_id: String,
327    pub canonical_uri: String,
328    pub intent_at_epoch_ms: u64,
329    pub committed_at_epoch_ms: Option<u64>,
330}
331
332/// Trait alias for the synchronous attachment-manifest surface on
333/// [`RuntimePersistence`]. Used by
334/// [`SessionScopedAttachmentStore`](crate::SessionScopedAttachmentStore)
335/// to record intent rows before `put` and by GC sweeps to reconcile
336/// orphans. See the [`AttachmentIntent`] doc comment for the full
337/// crash-safety story.
338///
339/// Backends with no attachment story (in-memory tests, mock stores)
340/// inherit the default no-op impls on [`RuntimePersistence`] and
341/// participate transparently — `record_intent` is a no-op, the
342/// scoped wrapper still works, and GC sweeps return empty.
343pub trait AttachmentManifest: Send + Sync {
344    fn record_intent(&self, intent: AttachmentIntent) -> Result<(), StoreError>;
345
346    /// Mark a set of attachment ids as committed (i.e. now referenced
347    /// by a durable session-graph commit). Backends that store
348    /// commits and manifest in the same database stamp this inside
349    /// the commit transaction; the trait-level method is the
350    /// out-of-band entry point for hosts that want to commit an id
351    /// outside the normal turn-commit flow.
352    fn commit_refs(
353        &self,
354        session_id: &str,
355        attachment_ids: &[crate::AttachmentId],
356    ) -> Result<(), StoreError>;
357
358    /// Return manifest entries whose intent has aged past
359    /// `older_than_epoch_ms` without ever being committed. Hosts run
360    /// this periodically to find orphans left by crashes between
361    /// `record_intent` and the next turn commit.
362    fn list_uncommitted(
363        &self,
364        older_than_epoch_ms: u64,
365    ) -> Result<Vec<AttachmentManifestEntry>, StoreError>;
366
367    /// Remove a manifest row entirely. Called by the GC coordinator
368    /// after the corresponding bytes have been removed from the
369    /// backing [`AttachmentStore`](crate::AttachmentStore).
370    fn forget(&self, attachment_id: &crate::AttachmentId) -> Result<(), StoreError>;
371}
372
/// Generates a do-nothing [`AttachmentManifest`] implementation for `$ty`.
///
/// Intended for [`RuntimePersistence`] implementors that have no
/// attachment-write story (mock backends, in-memory test stores,
/// runtime-perf harnesses). Every generated method succeeds immediately;
/// `list_uncommitted` reports an empty list.
#[macro_export]
macro_rules! impl_noop_attachment_manifest {
    ($ty:ty) => {
        impl $crate::AttachmentManifest for $ty {
            fn record_intent(
                &self,
                _intent: $crate::AttachmentIntent,
            ) -> ::std::result::Result<(), $crate::StoreError> {
                ::std::result::Result::Ok(())
            }

            fn commit_refs(
                &self,
                _session_id: &str,
                _attachment_ids: &[$crate::AttachmentId],
            ) -> ::std::result::Result<(), $crate::StoreError> {
                ::std::result::Result::Ok(())
            }

            fn list_uncommitted(
                &self,
                _older_than_epoch_ms: u64,
            ) -> ::std::result::Result<
                ::std::vec::Vec<$crate::AttachmentManifestEntry>,
                $crate::StoreError,
            > {
                ::std::result::Result::Ok(::std::vec::Vec::new())
            }

            fn forget(
                &self,
                _attachment_id: &$crate::AttachmentId,
            ) -> ::std::result::Result<(), $crate::StoreError> {
                ::std::result::Result::Ok(())
            }
        }
    };
}
413
414/// Reject a persisted record whose `schema_version` does not match the
415/// version this binary supports. Backends call this immediately after
416/// deserializing a record from durable storage.
417pub fn ensure_supported_schema_version(
418    record_kind: &'static str,
419    actual: u32,
420    expected: u32,
421) -> Result<(), StoreError> {
422    if actual == expected {
423        Ok(())
424    } else {
425        Err(StoreError::UnsupportedRecordSchemaVersion {
426            record_kind,
427            actual,
428            expected,
429        })
430    }
431}
432
433#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
434pub struct RuntimeTurnCommitStamp {
435    pub session_id: String,
436    pub turn_id: String,
437    pub turn_commit_hash: String,
438}
439
440impl RuntimeTurnCommitStamp {
441    pub fn new(
442        session_id: impl Into<String>,
443        turn_id: impl Into<String>,
444        turn_commit_hash: impl Into<String>,
445    ) -> Self {
446        Self {
447            session_id: session_id.into(),
448            turn_id: turn_id.into(),
449            turn_commit_hash: turn_commit_hash.into(),
450        }
451    }
452}
453
454fn build_persisted_turn_state(state: &crate::RuntimeSessionState) -> crate::PersistedTurnState {
455    crate::PersistedTurnState {
456        turn_index: state.turn_index,
457        token_usage: state.token_usage.clone(),
458        last_prompt_usage: state.last_prompt_usage.clone(),
459        protocol_turn_options: state.protocol_turn_options.clone(),
460    }
461}
462
463fn build_checkpoint_from_persisted_state(
464    state: &crate::RuntimeSessionState,
465) -> HydratedSessionCheckpoint {
466    HydratedSessionCheckpoint {
467        turn_state: build_persisted_turn_state(state),
468        tool_state_ref: state.tool_state_ref.clone(),
469        tool_state: state.tool_state_snapshot.clone(),
470        plugin_snapshot_ref: state.plugin_snapshot_ref.clone(),
471        plugin_snapshot_revision: state.plugin_snapshot_revision,
472        plugin_snapshot: state.plugin_snapshot.clone(),
473        execution_state_ref: state.execution_state_ref.clone(),
474        execution_state: state.execution_state_snapshot.clone(),
475    }
476}
477
478impl RuntimeCommit {
479    pub fn turn_commit_hash(&self) -> Result<String, StoreError> {
480        let mut semantic_commit = self.clone();
481        semantic_commit.expected_head_revision = None;
482        semantic_commit.turn_commit = None;
483        let mut semantic_commit = serde_json::to_value(&semantic_commit).map_err(|err| {
484            StoreError::Backend(format!("failed to serialize runtime turn commit: {err}"))
485        })?;
486        scrub_turn_commit_hash_value(&mut semantic_commit);
487        crate::stable_hash::stable_json_sha256_hex(&semantic_commit).map_err(|err| {
488            StoreError::Backend(format!(
489                "failed to serialize runtime turn commit hash: {err}"
490            ))
491        })
492    }
493
494    pub fn persisted_state(
495        state: &crate::RuntimeSessionState,
496        usage_deltas: &[crate::TokenLedgerEntry],
497    ) -> Self {
498        Self {
499            session_id: state.session_id.clone(),
500            expected_head_revision: state.head_revision,
501            config: persisted_session_config_from_state(state),
502            agent_frames: state.agent_frames.clone(),
503            current_agent_frame_id: state.current_agent_frame_id.clone(),
504            graph: if state.graph_replace_required || state.head_revision.is_none() {
505                GraphCommitDelta::ReplaceFull(state.session_graph.clone())
506            } else {
507                GraphCommitDelta::Unchanged {
508                    leaf_node_id: state.session_graph.leaf_node_id.clone(),
509                }
510            },
511            checkpoint: build_checkpoint_from_persisted_state(state),
512            usage_deltas: usage_deltas.to_vec(),
513            turn_commit: None,
514            completed_queue_claims: Vec::new(),
515            committed_attachment_ids: Vec::new(),
516        }
517    }
518
519    pub(crate) fn persisted_state_with_graph_commit(
520        state: &crate::RuntimeSessionState,
521        graph: GraphCommitDelta,
522        usage_deltas: &[crate::TokenLedgerEntry],
523    ) -> Self {
524        Self {
525            session_id: state.session_id.clone(),
526            expected_head_revision: state.head_revision,
527            config: persisted_session_config_from_state(state),
528            agent_frames: state.agent_frames.clone(),
529            current_agent_frame_id: state.current_agent_frame_id.clone(),
530            graph,
531            checkpoint: build_checkpoint_from_persisted_state(state),
532            usage_deltas: usage_deltas.to_vec(),
533            turn_commit: None,
534            completed_queue_claims: Vec::new(),
535            committed_attachment_ids: Vec::new(),
536        }
537    }
538
539    pub fn with_turn_commit(mut self, turn_commit: RuntimeTurnCommitStamp) -> Self {
540        self.turn_commit = Some(turn_commit);
541        self
542    }
543
544    pub fn completing_queue_claim(
545        mut self,
546        completed_queue_claim: crate::QueuedWorkCompletion,
547    ) -> Self {
548        self.completed_queue_claims.push(completed_queue_claim);
549        self
550    }
551
552    pub fn completing_queue_claims(
553        mut self,
554        completed_queue_claims: impl IntoIterator<Item = crate::QueuedWorkCompletion>,
555    ) -> Self {
556        self.completed_queue_claims.extend(completed_queue_claims);
557        self
558    }
559
560    pub fn with_committed_attachments(
561        mut self,
562        attachment_ids: impl IntoIterator<Item = crate::AttachmentId>,
563    ) -> Self {
564        self.committed_attachment_ids = attachment_ids.into_iter().collect();
565        self
566    }
567}
568
569fn scrub_turn_commit_hash_value(value: &mut serde_json::Value) {
570    match value {
571        serde_json::Value::Object(map) => {
572            let is_message = map.contains_key("role") && map.contains_key("parts");
573            let is_message_part = map.contains_key("kind")
574                && map.contains_key("content")
575                && map.contains_key("prune_state");
576            if is_message || is_message_part {
577                map.remove("id");
578            }
579            for volatile_key in ["node_id", "parent_node_id", "leaf_node_id", "timestamp"] {
580                map.remove(volatile_key);
581            }
582            for child in map.values_mut() {
583                scrub_turn_commit_hash_value(child);
584            }
585        }
586        serde_json::Value::Array(items) => {
587            for item in items {
588                scrub_turn_commit_hash_value(item);
589            }
590        }
591        _ => {}
592    }
593}
594
595fn persisted_session_state_from_head(
596    head: SessionHead,
597    checkpoint: Option<HydratedSessionCheckpoint>,
598) -> crate::RuntimeSessionState {
599    let mut state = crate::RuntimeSessionState {
600        session_id: head.session_id,
601        policy: crate::SessionPolicy::default(),
602        agent_frames: head.agent_frames,
603        current_agent_frame_id: head.current_agent_frame_id,
604        session_graph: head.graph,
605        turn_index: 0,
606        token_usage: crate::TokenUsage::default(),
607        last_prompt_usage: None,
608        protocol_turn_options: crate::ProtocolTurnOptions::default(),
609        tool_state_ref: None,
610        tool_state_generation: None,
611        tool_state_snapshot: None,
612        plugin_snapshot_ref: None,
613        plugin_snapshot_revision: None,
614        plugin_snapshot: None,
615        execution_state_ref: None,
616        execution_state_snapshot: None,
617        token_ledger: head.token_ledger,
618        checkpoint_ref: head.checkpoint_ref.clone(),
619        head_revision: Some(head.head_revision),
620        graph_replace_required: false,
621    };
622    state.policy.model = head.config.model.clone();
623    state.policy.provider_id = head.config.provider_id.clone();
624    if let Some(checkpoint) = checkpoint {
625        state.turn_index = checkpoint.turn_state.turn_index;
626        state.token_usage = checkpoint.turn_state.token_usage;
627        state.last_prompt_usage = checkpoint.turn_state.last_prompt_usage;
628        state.protocol_turn_options = checkpoint.turn_state.protocol_turn_options;
629        state.tool_state_ref = checkpoint.tool_state_ref.clone();
630        state.tool_state_generation = checkpoint
631            .tool_state
632            .as_ref()
633            .map(|snapshot| snapshot.generation());
634        state.tool_state_snapshot = checkpoint.tool_state;
635        state.plugin_snapshot_ref = checkpoint.plugin_snapshot_ref.clone();
636        state.plugin_snapshot_revision = checkpoint.plugin_snapshot_revision;
637        state.plugin_snapshot = checkpoint.plugin_snapshot;
638        state.execution_state_ref = checkpoint.execution_state_ref.clone();
639        state.execution_state_snapshot = checkpoint.execution_state;
640    }
641    state.ensure_agent_frame_initialized();
642    state
643}
644
645impl Default for SessionHead {
646    fn default() -> Self {
647        Self {
648            session_id: default_root_session_id(),
649            head_revision: 0,
650            agent_frames: Vec::new(),
651            current_agent_frame_id: String::new(),
652            graph: crate::SessionGraph::default(),
653            config: crate::PersistedSessionConfig::default(),
654            checkpoint_ref: None,
655            token_ledger: Vec::new(),
656        }
657    }
658}
659
660impl Default for SessionHeadMeta {
661    fn default() -> Self {
662        Self {
663            session_id: default_root_session_id(),
664            head_revision: 0,
665            config: crate::PersistedSessionConfig::default(),
666            agent_frames: Vec::new(),
667            current_agent_frame_id: String::new(),
668            checkpoint_ref: None,
669            leaf_node_id: None,
670            graph_node_count: 0,
671            token_ledger: Vec::new(),
672        }
673    }
674}
675
676/// Exact settled-session persistence protocol required by the runtime.
677///
678/// This is the runtime's atomic transaction facade for visible session state:
679/// session graph/head commits, queued-work ingress and completion, final
680/// turn-commit idempotency, metadata, usage, and the attachment write-ahead
681/// manifest. In-flight nondeterministic work belongs to the active
682/// [`EffectHost`](crate::EffectHost), not to the store contract.
683///
684/// The [`AttachmentManifest`] supertrait is required so the runtime can wrap
685/// any persistence backend with a
686/// [`SessionScopedAttachmentStore`](crate::SessionScopedAttachmentStore)
687/// without dual-trait casting. Backends with no attachment-write story can
688/// implement the manifest methods as no-ops via
689/// [`NoopAttachmentManifest`]'s blanket helpers.
690#[async_trait::async_trait]
691pub trait RuntimePersistence: AttachmentManifest + Send + Sync {
692    /// Durability tier this session store provides; defaults to
693    /// [`DurabilityTier::Inline`].
694    fn durability_tier(&self) -> crate::DurabilityTier {
695        crate::DurabilityTier::Inline
696    }
697
698    async fn load_session(
699        &self,
700        scope: SessionReadScope,
701    ) -> Result<Option<PersistedSessionRead>, StoreError>;
702
703    async fn load_node(
704        &self,
705        node_id: &str,
706    ) -> Result<Option<crate::SessionNodeRecord>, StoreError>;
707
708    async fn commit_runtime_state(
709        &self,
710        commit: RuntimeCommit,
711    ) -> Result<RuntimeCommitResult, StoreError>;
712
713    /// Persist a queued-work batch for later claiming.
714    ///
715    /// The default implementation rejects the batch: backends that do not
716    /// support queued work inherit it and stay loud rather than silently
717    /// dropping work.
718    async fn enqueue_queued_work(
719        &self,
720        _batch: crate::QueuedWorkBatchDraft,
721    ) -> Result<crate::QueuedWorkBatch, StoreError> {
722        Err(StoreError::Backend(
723            "queued work is not supported by this test store".to_string(),
724        ))
725    }
726
727    /// Claim the next ready queued-work group for `owner_id`.
728    ///
729    /// The default implementation reports queued work as unsupported.
730    async fn claim_ready_queued_work(
731        &self,
732        session_id: &str,
733        _owner_id: &str,
734        _boundary: crate::QueuedWorkClaimBoundary,
735        _lease_ttl_ms: u64,
736        _max_batches: usize,
737    ) -> Result<Option<crate::QueuedWorkClaim>, StoreError> {
738        Err(StoreError::Backend(format!(
739            "queued work is not supported for session `{session_id}` by this test store"
740        )))
741    }
742
743    /// Claim a specific ready batch set selected from the durable queue.
744    ///
745    /// This is the host-facing counterpart to [`claim_ready_queued_work`]:
746    /// callers that project queued work into a UI can claim the exact batch ids
747    /// they rendered instead of reconstructing authority from local draft state.
748    /// The default implementation preserves the ordered queue contract by
749    /// claiming the next ready group and returning it only when the durable ids
750    /// match exactly.
751    async fn claim_ready_queued_work_by_batch_ids(
752        &self,
753        session_id: &str,
754        owner_id: &str,
755        boundary: crate::QueuedWorkClaimBoundary,
756        lease_ttl_ms: u64,
757        batch_ids: &[String],
758    ) -> Result<Option<crate::QueuedWorkClaim>, StoreError> {
759        if batch_ids.is_empty() {
760            return Ok(None);
761        }
762        let Some(claim) = self
763            .claim_ready_queued_work(
764                session_id,
765                owner_id,
766                boundary,
767                lease_ttl_ms,
768                batch_ids.len(),
769            )
770            .await?
771        else {
772            return Ok(None);
773        };
774        let claimed_ids = claim
775            .batches
776            .iter()
777            .map(|batch| batch.batch_id.as_str())
778            .collect::<Vec<_>>();
779        if claimed_ids == batch_ids.iter().map(String::as_str).collect::<Vec<_>>() {
780            return Ok(Some(claim));
781        }
782        self.abandon_queued_work_claim(&claim).await?;
783        Ok(None)
784    }
785
786    /// Extend the lease on a held queued-work claim.
787    ///
788    /// The default implementation reports the claim as expired, matching a
789    /// backend that never granted one.
790    async fn renew_queued_work_claim(
791        &self,
792        claim: &crate::QueuedWorkClaim,
793        _lease_ttl_ms: u64,
794    ) -> Result<crate::QueuedWorkClaim, StoreError> {
795        Err(StoreError::QueuedWorkClaimExpired {
796            session_id: claim.session_id.clone(),
797            claim_id: claim.claim_id.clone(),
798        })
799    }
800
801    /// Release a held queued-work claim without completing it.
802    ///
803    /// The default implementation is a no-op: with no queued work there is
804    /// nothing to release.
805    async fn abandon_queued_work_claim(
806        &self,
807        _claim: &crate::QueuedWorkClaim,
808    ) -> Result<(), StoreError> {
809        Ok(())
810    }
811
812    /// Remove an unclaimed queued-work batch from durable ingress.
813    ///
814    /// Returns the removed batch when cancellation won the race. Returns `None`
815    /// when the batch is missing or currently held by a live claim; callers must
816    /// treat that as "already claimed or completed" and must not restore any
817    /// stale local draft state.
818    ///
819    /// The default implementation reports `None` (nothing queued, nothing to
820    /// cancel).
821    async fn cancel_queued_work_batch(
822        &self,
823        _session_id: &str,
824        _batch_id: &str,
825    ) -> Result<Option<crate::QueuedWorkBatch>, StoreError> {
826        Ok(None)
827    }
828
829    /// List all queued-work batches for a session.
830    ///
831    /// The default implementation reports an empty queue.
832    async fn list_queued_work(
833        &self,
834        _session_id: &str,
835    ) -> Result<Vec<crate::QueuedWorkBatch>, StoreError> {
836        Ok(Vec::new())
837    }
838
839    /// List queued-work batches that are still pending presentation/editing.
840    ///
841    /// This excludes batches currently held by a live claim. Expired claims are
842    /// considered pending again because they can be reclaimed or cancelled.
843    async fn list_pending_queued_work(
844        &self,
845        session_id: &str,
846    ) -> Result<Vec<crate::QueuedWorkBatch>, StoreError> {
847        self.list_queued_work(session_id).await
848    }
849
850    async fn save_session_meta(&self, meta: SessionMeta) -> Result<(), StoreError>;
851    async fn load_session_meta(&self) -> Result<Option<SessionMeta>, StoreError>;
852
853    async fn tombstone_nodes(&self, ids: &[String]) -> Result<(), StoreError>;
854    async fn vacuum(&self) -> Result<VacuumReport, StoreError>;
855    async fn gc_unreachable(&self) -> Result<GcReport, StoreError>;
856}
857
858fn persisted_session_state_from_read(read: PersistedSessionRead) -> crate::RuntimeSessionState {
859    persisted_session_state_from_head(
860        SessionHead {
861            session_id: read.session_id,
862            head_revision: read.head_revision,
863            agent_frames: read.agent_frames,
864            current_agent_frame_id: read.current_agent_frame_id,
865            graph: read.graph,
866            config: read.config,
867            checkpoint_ref: read.checkpoint_ref,
868            token_ledger: read.token_ledger,
869        },
870        read.checkpoint,
871    )
872}
873
874pub async fn load_persisted_session_state(
875    store: &(dyn RuntimePersistence + '_),
876) -> Result<Option<crate::RuntimeSessionState>, StoreError> {
877    Ok(store
878        .load_session(SessionReadScope::FullGraph)
879        .await?
880        .map(persisted_session_state_from_read))
881}
882
883pub async fn load_persisted_session_state_active_path(
884    store: &(dyn RuntimePersistence + '_),
885    leaf_node_id: Option<String>,
886) -> Result<Option<crate::RuntimeSessionState>, StoreError> {
887    Ok(store
888        .load_session(SessionReadScope::ActivePath { leaf_node_id })
889        .await?
890        .map(persisted_session_state_from_read))
891}
892
893pub async fn refresh_persisted_session_state(
894    store: &(dyn RuntimePersistence + '_),
895    state: &mut crate::RuntimeSessionState,
896) -> Result<(), StoreError> {
897    if let Some(mut fresh) = load_persisted_session_state(store).await? {
898        fresh.policy.session_id = state.policy.session_id.clone();
899        fresh.policy.max_turns = state.policy.max_turns;
900        *state = fresh;
901    }
902    Ok(())
903}