Skip to main content

lash_core/store/
mod.rs

1//! The runtime's settled-session persistence contract and shared store types.
2
3pub mod queued_work;
4
/// Path of the procfs file that `LeaseOwnerLiveness::current_local_process`
/// reads (and trims) to obtain the `boot_id` stored in a `LocalProcess`
/// liveness record.
const PROC_BOOT_ID_PATH: &str = "/proc/sys/kernel/random/boot_id";
6
/// Serde fallback for `session_id` fields: a record that omits the field
/// deserializes with the session id `"root"`.
fn default_root_session_id() -> String {
    String::from("root")
}
10
/// Current `schema_version` value for serialized [`SessionHeadMeta`] records;
/// the tests in this module pass it as the expected version when decoding.
pub const SESSION_HEAD_META_SCHEMA_VERSION: u32 = 1;
/// `schema_version` stamped on every [`SessionCheckpoint`] built through
/// `SessionCheckpoint::new` or `SessionCheckpoint::default`.
pub const SESSION_CHECKPOINT_SCHEMA_VERSION: u32 = 1;
13
// Unit tests for session-head hydration and for the schema-version gate
// applied by `decode_versioned_json_record`.
#[cfg(test)]
mod persisted_state_tests {
    use super::*;

    // Hydrating a stored head must surface the provider id recorded in the
    // persisted config, and carry the stored head revision through as
    // `Some(revision)`.
    #[test]
    fn persisted_state_hydrates_provider_id_without_live_provider_rebinding() {
        let state = persisted_session_state_from_head(
            SessionHead {
                session_id: "stored".to_string(),
                head_revision: 7,
                agent_frames: Vec::new(),
                current_agent_frame_id: String::new(),
                graph: crate::SessionGraph::default(),
                config: crate::PersistedSessionConfig {
                    provider_id: "stored-provider".to_string(),
                    model: crate::ModelSpec::default(),
                },
                checkpoint_ref: None,
                token_ledger: Vec::new(),
            },
            None,
        );

        assert_eq!(state.policy.recorded_provider_id(), "stored-provider");
        // NOTE(review): the input head carries no agent frames; unless
        // hydration synthesizes frames, this `all` check is vacuously true.
        assert!(
            state
                .agent_frames
                .iter()
                .all(|frame| frame.assignment.policy.recorded_provider_id() == "stored-provider")
        );
        assert_eq!(state.head_revision, Some(7));
    }

    // A JSON object without a `schema_version` key is rejected with
    // `MissingRecordSchemaVersion`.
    #[test]
    fn versioned_json_record_rejects_missing_schema_version() {
        let err = decode_versioned_json_record::<SessionHeadMeta>(
            "{}",
            "SessionHeadMeta",
            SESSION_HEAD_META_SCHEMA_VERSION,
        )
        .expect_err("pre-versioned session head should fail");

        assert!(matches!(
            err,
            StoreError::MissingRecordSchemaVersion {
                record_kind: "SessionHeadMeta",
                expected: SESSION_HEAD_META_SCHEMA_VERSION
            }
        ));
    }

    // A `schema_version` that is a JSON string (not an integer) is rejected
    // with `InvalidRecordSchemaVersion`.
    #[test]
    fn versioned_json_record_rejects_invalid_schema_version() {
        let err = decode_versioned_json_record::<SessionHeadMeta>(
            r#"{"schema_version":"1"}"#,
            "SessionHeadMeta",
            SESSION_HEAD_META_SCHEMA_VERSION,
        )
        .expect_err("invalid session head schema version should fail");

        assert!(matches!(
            err,
            StoreError::InvalidRecordSchemaVersion {
                record_kind: "SessionHeadMeta",
                expected: SESSION_HEAD_META_SCHEMA_VERSION,
                ..
            }
        ));
    }

    // An integer `schema_version` different from the expected one is rejected
    // with `UnsupportedRecordSchemaVersion`, reporting the version found.
    #[test]
    fn versioned_json_record_rejects_unsupported_schema_version() {
        let err = decode_versioned_json_record::<SessionHeadMeta>(
            r#"{"schema_version":2}"#,
            "SessionHeadMeta",
            SESSION_HEAD_META_SCHEMA_VERSION,
        )
        .expect_err("unsupported session head schema version should fail");

        assert!(matches!(
            err,
            StoreError::UnsupportedRecordSchemaVersion {
                record_kind: "SessionHeadMeta",
                actual: 2,
                expected: SESSION_HEAD_META_SCHEMA_VERSION
            }
        ));
    }
}
103
/// Errors reported by the persistence layer. The `Display` text of each
/// variant comes from its `#[error(...)]` attribute.
#[derive(Debug, thiserror::Error)]
pub enum StoreError {
    /// A store bound to one session id was used with a different session id.
    #[error(
        "store is already bound to session `{bound_session_id}` and cannot be reused for `{attempted_session_id}`"
    )]
    SessionBindingMismatch {
        bound_session_id: String,
        attempted_session_id: String,
    },
    /// The store cannot serve the requested [`SessionReadScope`].
    #[error("store does not support read scope {0:?}")]
    UnsupportedReadScope(SessionReadScope),
    /// The caller's expected head revision did not match the stored one.
    #[error("store head revision conflict: expected {expected:?}, actual {actual}")]
    HeadRevisionConflict { expected: Option<u64>, actual: u64 },
    /// The same turn id was already committed with a different commit hash.
    #[error(
        "runtime turn `{turn_id}` for session `{session_id}` was already committed with a different commit hash"
    )]
    RuntimeTurnCommitConflict { session_id: String, turn_id: String },
    /// The referenced queued-work claim is missing or has expired.
    #[error("queued work claim `{claim_id}` for session `{session_id}` is missing or expired")]
    QueuedWorkClaimExpired {
        session_id: String,
        claim_id: String,
    },
    /// The referenced turn-input claim is missing or has expired.
    #[error("turn input claim `{claim_id}` for session `{session_id}` is missing or expired")]
    TurnInputClaimExpired {
        session_id: String,
        claim_id: String,
    },
    /// A pending turn input reused a `source_key` already bound to another
    /// input whose submitted content differs.
    #[error(
        "pending turn input source_key `{source_key}` for session `{session_id}` is already bound to input `{existing_input_id}` with different submitted content"
    )]
    PendingTurnInputSourceKeyConflict {
        session_id: String,
        source_key: String,
        existing_input_id: String,
    },
    /// The session execution lease is missing or has expired.
    #[error("session execution lease for session `{session_id}` is missing or expired")]
    SessionExecutionLeaseExpired { session_id: String },
    /// Returned by `ensure_supported_schema_version` when a record's
    /// `schema_version` differs from the expected version.
    #[error(
        "{record_kind} schema_version {actual} is not supported by this binary (expected {expected})"
    )]
    UnsupportedRecordSchemaVersion {
        record_kind: &'static str,
        actual: u32,
        expected: u32,
    },
    /// Returned by `ensure_supported_record_schema_version` when the JSON
    /// record has no `schema_version` key at all.
    #[error(
        "{record_kind} is missing schema_version and was written by unsupported pre-versioned state (expected {expected})"
    )]
    MissingRecordSchemaVersion {
        record_kind: &'static str,
        expected: u32,
    },
    /// Returned by `ensure_supported_record_schema_version` when
    /// `schema_version` is present but is not an unsigned integer that fits
    /// in `u32`; `actual` holds the offending JSON value rendered as text.
    #[error("{record_kind} schema_version {actual} is invalid (expected integer {expected})")]
    InvalidRecordSchemaVersion {
        record_kind: &'static str,
        actual: String,
        expected: u32,
    },
    /// Free-form backend failure; this file uses it for JSON
    /// (de)serialization errors.
    #[error("store backend error: {0}")]
    Backend(String),
}
165
/// Serializable descriptive metadata for a session.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct SessionMeta {
    pub session_id: String,
    pub session_name: String,
    // NOTE(review): timestamp format is not visible in this file — confirm
    // against the writer before parsing.
    pub created_at: String,
    pub model: String,
    pub cwd: Option<String>,
    /// Relation to other sessions; the source of `parent_session_id()`.
    pub relation: crate::SessionRelation,
}
175
176impl SessionMeta {
177    /// Returns the parent session id, if any, derived from the canonical
178    /// [`SessionRelation`] field.
179    pub fn parent_session_id(&self) -> Option<&str> {
180        self.relation.parent_session_id()
181    }
182}
183
/// Lightweight session info for the resume picker.
///
/// Not serializable: unlike [`SessionMeta`], this type derives only `Clone`
/// and `Debug`.
#[derive(Clone, Debug)]
pub struct SessionPickerInfo {
    pub session_id: String,
    pub cwd: Option<String>,
    /// Relation to other sessions; the source of `parent_session_id()`.
    pub relation: crate::SessionRelation,
    pub first_user_message: String,
    pub user_message_count: usize,
}
193
194impl SessionPickerInfo {
195    pub fn parent_session_id(&self) -> Option<&str> {
196        self.relation.parent_session_id()
197    }
198}
199
/// String newtype naming a stored blob.
///
/// `#[serde(transparent)]` makes it (de)serialize as the bare inner string
/// rather than as a wrapper object.
#[derive(Clone, Debug, Default, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
#[serde(transparent)]
pub struct BlobRef(pub String);
203
204impl BlobRef {
205    pub fn as_str(&self) -> &str {
206        &self.0
207    }
208}
209
210impl std::fmt::Display for BlobRef {
211    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
212        f.write_str(&self.0)
213    }
214}
215
216impl From<String> for BlobRef {
217    fn from(value: String) -> Self {
218        Self(value)
219    }
220}
221
/// Counters describing a garbage-collection result.
// NOTE(review): the producer is not visible in this file; field meanings are
// taken from their names (roots scanned, blobs kept, blobs deleted) — confirm.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct GcReport {
    pub root_count: usize,
    pub retained_blob_count: usize,
    pub deleted_blob_count: usize,
}
228
/// Result of a `RuntimePersistence::vacuum()` call, returned so hosts can
/// emit metrics.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct VacuumReport {
    /// Number of tombstoned graph-node rows that were physically deleted
    /// from the store.
    pub removed_node_count: usize,
    /// Number of terminal pending-input evidence rows pruned by
    /// host-scheduled retention.
    pub removed_pending_turn_input_tombstone_count: usize,
}
239
/// Serialized checkpoint manifest: the persisted turn state plus optional
/// references to separately stored blobs.
///
/// Every optional field is omitted from the serialized form when `None` and
/// defaults to `None` when absent on read.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct SessionCheckpoint {
    /// Required on read (no serde default); constructors in this file set it
    /// to `SESSION_CHECKPOINT_SCHEMA_VERSION`.
    pub schema_version: u32,
    pub turn_state: crate::PersistedTurnState,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub tool_state_ref: Option<BlobRef>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub plugin_snapshot_ref: Option<BlobRef>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub plugin_snapshot_revision: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub execution_state_ref: Option<BlobRef>,
}
253
254impl Default for SessionCheckpoint {
255    fn default() -> Self {
256        Self {
257            schema_version: SESSION_CHECKPOINT_SCHEMA_VERSION,
258            turn_state: crate::PersistedTurnState::default(),
259            tool_state_ref: None,
260            plugin_snapshot_ref: None,
261            plugin_snapshot_revision: None,
262            execution_state_ref: None,
263        }
264    }
265}
266
267impl SessionCheckpoint {
268    pub fn new(
269        turn_state: crate::PersistedTurnState,
270        tool_state_ref: Option<BlobRef>,
271        plugin_snapshot_ref: Option<BlobRef>,
272        plugin_snapshot_revision: Option<u64>,
273        execution_state_ref: Option<BlobRef>,
274    ) -> Self {
275        Self {
276            schema_version: SESSION_CHECKPOINT_SCHEMA_VERSION,
277            turn_state,
278            tool_state_ref,
279            plugin_snapshot_ref,
280            plugin_snapshot_revision,
281            execution_state_ref,
282        }
283    }
284}
285
/// Checkpoint that pairs each optional blob reference with the optional
/// in-memory payload it refers to (tool state, plugin snapshot, execution
/// state), alongside the persisted turn state.
///
/// Unlike [`SessionCheckpoint`], it carries no `schema_version` field.
#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
pub struct HydratedSessionCheckpoint {
    pub turn_state: crate::PersistedTurnState,
    pub tool_state_ref: Option<BlobRef>,
    pub tool_state: Option<crate::ToolState>,
    pub plugin_snapshot_ref: Option<BlobRef>,
    pub plugin_snapshot: Option<crate::PluginSessionSnapshot>,
    pub plugin_snapshot_revision: Option<u64>,
    pub execution_state_ref: Option<BlobRef>,
    /// Raw bytes of the execution state.
    pub execution_state: Option<Vec<u8>>,
}
297
/// Serializable session head that embeds the full [`crate::SessionGraph`].
///
/// `graph` and `config` are required on read; every other field has a serde
/// default.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct SessionHead {
    /// Falls back to `"root"` (via `default_root_session_id`) when absent.
    #[serde(default = "default_root_session_id")]
    pub session_id: String,
    #[serde(default)]
    pub head_revision: u64,
    #[serde(default)]
    pub agent_frames: Vec<crate::AgentFrameRecord>,
    // `String::is_empty` as the skip predicate implies `AgentFrameId` is a
    // `String` (the test module assigns `String::new()` to this field).
    #[serde(default, skip_serializing_if = "String::is_empty")]
    pub current_agent_frame_id: crate::AgentFrameId,
    pub graph: crate::SessionGraph,
    pub config: crate::PersistedSessionConfig,
    /// Omitted from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub checkpoint_ref: Option<BlobRef>,
    /// Omitted from the serialized form when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub token_ledger: Vec<crate::TokenLedgerEntry>,
}
315
/// Versioned, serializable session-head record that describes the graph by
/// `leaf_node_id` and `graph_node_count` instead of embedding it.
///
/// `schema_version` and `config` are required on read; the tests in this file
/// decode it through `decode_versioned_json_record`, which checks
/// `schema_version` before full deserialization.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct SessionHeadMeta {
    pub schema_version: u32,
    /// Falls back to `"root"` (via `default_root_session_id`) when absent.
    #[serde(default = "default_root_session_id")]
    pub session_id: String,
    #[serde(default)]
    pub head_revision: u64,
    pub config: crate::PersistedSessionConfig,
    #[serde(default)]
    pub agent_frames: Vec<crate::AgentFrameRecord>,
    // `String::is_empty` as the skip predicate implies `AgentFrameId` is a
    // `String`.
    #[serde(default, skip_serializing_if = "String::is_empty")]
    pub current_agent_frame_id: crate::AgentFrameId,
    /// Omitted from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub checkpoint_ref: Option<BlobRef>,
    /// Omitted from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub leaf_node_id: Option<String>,
    #[serde(default)]
    pub graph_node_count: usize,
    /// Omitted from the serialized form when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub token_ledger: Vec<crate::TokenLedgerEntry>,
}
337
338fn persisted_session_config_from_state(
339    state: &crate::RuntimeSessionState,
340) -> crate::PersistedSessionConfig {
341    crate::PersistedSessionConfig {
342        provider_id: state.policy.recorded_provider_id().to_string(),
343        model: state.policy.model.clone(),
344    }
345}
346
/// Portion of the session graph a read request covers. A store that cannot
/// serve a scope reports it via `StoreError::UnsupportedReadScope`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SessionReadScope {
    /// The whole graph.
    FullGraph,
    /// Only the active path.
    // NOTE(review): the meaning of `leaf_node_id: None` (presumably "use the
    // stored leaf") is not visible in this file — confirm against backends.
    ActivePath { leaf_node_id: Option<String> },
}
352
/// In-memory result of reading a persisted session: head fields, the graph,
/// and the checkpoint both as a reference and (optionally) hydrated.
///
/// Not serializable; derives only `Clone` and `Debug`.
#[derive(Clone, Debug)]
pub struct PersistedSessionRead {
    pub session_id: String,
    pub head_revision: u64,
    pub config: crate::PersistedSessionConfig,
    pub agent_frames: Vec<crate::AgentFrameRecord>,
    pub current_agent_frame_id: crate::AgentFrameId,
    pub graph: crate::SessionGraph,
    pub checkpoint_ref: Option<BlobRef>,
    pub checkpoint: Option<HydratedSessionCheckpoint>,
    pub token_ledger: Vec<crate::TokenLedgerEntry>,
}
365
/// How a commit changes the session graph.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub enum GraphCommitDelta {
    /// No nodes change; only the leaf node id is carried.
    Unchanged {
        leaf_node_id: Option<String>,
    },
    /// New node records to append, plus the leaf node id.
    Append {
        nodes: Vec<crate::SessionNodeRecord>,
        leaf_node_id: Option<String>,
    },
    /// Replace the stored graph with this one; `RuntimeCommit::persisted_state`
    /// picks this when a replace is required or no head revision exists yet.
    ReplaceFull(crate::SessionGraph),
}
377
378impl GraphCommitDelta {
379    pub fn leaf_node_id(&self) -> Option<&String> {
380        match self {
381            Self::Unchanged { leaf_node_id } | Self::Append { leaf_node_id, .. } => {
382                leaf_node_id.as_ref()
383            }
384            Self::ReplaceFull(graph) => graph.leaf_node_id.as_ref(),
385        }
386    }
387}
388
/// A single commit of runtime session state submitted to the store.
///
/// `RuntimeCommit::turn_commit_hash` clears `expected_head_revision`, both
/// lease fields, and `turn_commit` before hashing.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct RuntimeCommit {
    pub session_id: String,
    /// Head revision the caller expects; `None` when the state has no head
    /// revision yet.
    pub expected_head_revision: Option<u64>,
    /// Omitted from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub session_execution_lease: Option<SessionExecutionLeaseFence>,
    /// Omitted from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub release_session_execution_lease: Option<SessionExecutionLeaseCompletion>,
    pub config: crate::PersistedSessionConfig,
    pub agent_frames: Vec<crate::AgentFrameRecord>,
    pub current_agent_frame_id: crate::AgentFrameId,
    pub graph: GraphCommitDelta,
    pub checkpoint: HydratedSessionCheckpoint,
    pub usage_deltas: Vec<crate::TokenLedgerEntry>,
    pub turn_commit: Option<RuntimeTurnCommitStamp>,
    pub completed_queue_claims: Vec<crate::QueuedWorkCompletion>,
    pub completed_turn_input_claims: Vec<crate::TurnInputCompletion>,
    /// Omitted from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub interrupted_turn_input_turn_id: Option<String>,
    /// Attachment ids whose bytes are referenced by this commit and
    /// should be stamped `committed` in the write-ahead manifest as
    /// part of the same SQL transaction. The backend marks each id
    /// committed via [`AttachmentManifest::commit_refs`] before the
    /// commit returns success. Hosts populate this from the
    /// attachments emitted by tool calls and inline LLM-request
    /// attachments produced during the turn.
    pub committed_attachment_ids: Vec<crate::AttachmentId>,
}
417
/// Outcome of a runtime commit: a head revision, the reference to the
/// checkpoint, and the checkpoint manifest itself.
// NOTE(review): presumably `head_revision` is the revision *after* the commit;
// the producing backend is not visible here — confirm.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct RuntimeCommitResult {
    pub head_revision: u64,
    pub checkpoint_ref: BlobRef,
    pub manifest: SessionCheckpoint,
}
424
/// Identity of a lease owner: a stable owner id, an incarnation id, and
/// optional liveness evidence.
#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct LeaseOwnerIdentity {
    pub owner_id: String,
    pub incarnation_id: String,
    /// Defaults to `LeaseOwnerLiveness::Opaque` when absent from the
    /// serialized form.
    #[serde(default)]
    pub liveness: LeaseOwnerLiveness,
}
432
433impl LeaseOwnerIdentity {
434    pub fn opaque(
435        owner_id: impl Into<String>,
436        incarnation_id: impl Into<String>,
437    ) -> LeaseOwnerIdentity {
438        LeaseOwnerIdentity {
439            owner_id: owner_id.into(),
440            incarnation_id: incarnation_id.into(),
441            liveness: LeaseOwnerLiveness::Opaque,
442        }
443    }
444
445    pub fn local_process(
446        owner_id: impl Into<String>,
447        incarnation_id: impl Into<String>,
448        host_id: impl Into<String>,
449    ) -> LeaseOwnerIdentity {
450        let liveness = LeaseOwnerLiveness::current_local_process(host_id.into())
451            .unwrap_or(LeaseOwnerLiveness::Opaque);
452        LeaseOwnerIdentity {
453            owner_id: owner_id.into(),
454            incarnation_id: incarnation_id.into(),
455            liveness,
456        }
457    }
458
459    pub fn same_incarnation(&self, other: &LeaseOwnerIdentity) -> bool {
460        self.owner_id == other.owner_id && self.incarnation_id == other.incarnation_id
461    }
462
463    pub fn is_definitely_dead_for_claimant(&self, claimant: &LeaseOwnerIdentity) -> bool {
464        self.liveness
465            .is_definitely_dead_for_claimant(&claimant.liveness)
466    }
467}
468
/// Evidence a claimant can use to decide whether a lease owner is still
/// alive.
///
/// Serialized as an internally tagged enum with a snake_case `kind` tag
/// (`"local_process"` / `"opaque"`).
#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize, Default)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum LeaseOwnerLiveness {
    /// A process on a specific host and boot.
    LocalProcess {
        host_id: String,
        /// Trimmed contents of `PROC_BOOT_ID_PATH` when captured by
        /// `current_local_process`.
        boot_id: String,
        pid: u32,
        /// Start-time field parsed from `/proc/<pid>/stat`, kept as text.
        process_start: String,
    },
    /// No liveness evidence; never proven dead.
    #[default]
    Opaque,
}
481
482impl LeaseOwnerLiveness {
483    pub fn current_local_process(host_id: impl Into<String>) -> Option<LeaseOwnerLiveness> {
484        let boot_id = std::fs::read_to_string(PROC_BOOT_ID_PATH)
485            .ok()
486            .map(|value| value.trim().to_string())
487            .filter(|value| !value.is_empty())?;
488        let pid = std::process::id();
489        let process_start = read_linux_process_start(pid)?;
490        Some(LeaseOwnerLiveness::LocalProcess {
491            host_id: host_id.into(),
492            boot_id,
493            pid,
494            process_start,
495        })
496    }
497
498    pub fn local_process_for_test(
499        host_id: impl Into<String>,
500        boot_id: impl Into<String>,
501        pid: u32,
502        process_start: impl Into<String>,
503    ) -> LeaseOwnerLiveness {
504        LeaseOwnerLiveness::LocalProcess {
505            host_id: host_id.into(),
506            boot_id: boot_id.into(),
507            pid,
508            process_start: process_start.into(),
509        }
510    }
511
512    pub fn is_definitely_dead_for_claimant(&self, claimant: &LeaseOwnerLiveness) -> bool {
513        let (
514            LeaseOwnerLiveness::LocalProcess {
515                host_id,
516                boot_id,
517                pid,
518                process_start,
519            },
520            LeaseOwnerLiveness::LocalProcess {
521                host_id: claimant_host_id,
522                boot_id: claimant_boot_id,
523                ..
524            },
525        ) = (self, claimant)
526        else {
527            return false;
528        };
529        if host_id != claimant_host_id || boot_id != claimant_boot_id {
530            return false;
531        }
532        matches!(linux_process_is_live(*pid, process_start), Some(false))
533    }
534}
535
536fn read_linux_process_start(pid: u32) -> Option<String> {
537    let stat = std::fs::read_to_string(format!("/proc/{pid}/stat")).ok()?;
538    parse_linux_process_start(&stat)
539}
540
541fn linux_process_is_live(pid: u32, expected_process_start: &str) -> Option<bool> {
542    match std::fs::read_to_string(format!("/proc/{pid}/stat")) {
543        Ok(stat) => parse_linux_process_start(&stat).map(|start| start == expected_process_start),
544        Err(err) if err.kind() == std::io::ErrorKind::NotFound => Some(false),
545        Err(_) => None,
546    }
547}
548
/// Extracts the start-time field from the text of a `/proc/<pid>/stat` file.
///
/// Splits after the LAST `") "` so that a command name containing spaces or
/// parentheses cannot shift the field positions, then takes the 20th
/// whitespace-separated field that follows. Returns `None` when the
/// separator or the field is missing.
fn parse_linux_process_start(stat: &str) -> Option<String> {
    let (_, fields_after_comm) = stat.rsplit_once(") ")?;
    let start_time = fields_after_comm.split_whitespace().nth(19)?;
    Some(start_time.to_owned())
}
553
/// A claimed execution lease for one session, including its validity window
/// in epoch milliseconds.
#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct SessionExecutionLease {
    pub session_id: String,
    pub owner: LeaseOwnerIdentity,
    pub lease_token: String,
    pub fencing_token: u64,
    pub claimed_at_epoch_ms: u64,
    pub expires_at_epoch_ms: u64,
}
563
/// The identifying subset of a [`SessionExecutionLease`] (no timestamps),
/// produced by `SessionExecutionLease::fence` and carried on a
/// `RuntimeCommit` as `session_execution_lease`.
#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct SessionExecutionLeaseFence {
    pub session_id: String,
    pub owner: LeaseOwnerIdentity,
    pub lease_token: String,
    pub fencing_token: u64,
}
571
/// The identifying subset of a [`SessionExecutionLease`] (no timestamps),
/// produced by `SessionExecutionLease::completion` and carried on a
/// `RuntimeCommit` as `release_session_execution_lease`.
#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct SessionExecutionLeaseCompletion {
    pub session_id: String,
    pub owner: LeaseOwnerIdentity,
    pub lease_token: String,
    pub fencing_token: u64,
}
579
580impl SessionExecutionLease {
581    pub fn fence(&self) -> SessionExecutionLeaseFence {
582        SessionExecutionLeaseFence {
583            session_id: self.session_id.clone(),
584            owner: self.owner.clone(),
585            lease_token: self.lease_token.clone(),
586            fencing_token: self.fencing_token,
587        }
588    }
589
590    pub fn completion(&self) -> SessionExecutionLeaseCompletion {
591        SessionExecutionLeaseCompletion {
592            session_id: self.session_id.clone(),
593            owner: self.owner.clone(),
594            lease_token: self.lease_token.clone(),
595            fencing_token: self.fencing_token,
596        }
597    }
598}
599
600impl SessionExecutionLeaseCompletion {
601    pub fn from_lease(lease: &SessionExecutionLease) -> Self {
602        lease.completion()
603    }
604}
605
/// Result of attempting to claim a session execution lease.
#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum SessionExecutionLeaseClaimOutcome {
    /// The claim succeeded; carries the lease that was granted.
    Acquired(SessionExecutionLease),
    /// The claim did not succeed; carries the lease of the current holder.
    Busy { holder: SessionExecutionLease },
}
611
612impl SessionExecutionLeaseClaimOutcome {
613    pub fn acquired(self) -> Option<SessionExecutionLease> {
614        match self {
615            Self::Acquired(lease) => Some(lease),
616            Self::Busy { .. } => None,
617        }
618    }
619}
620
621// =============================================================================
622// Attachment write-ahead manifest
623// =============================================================================
624
/// A pending attachment write recorded *before* the bytes hit the
/// [`AttachmentStore`](crate::AttachmentStore) backend.
///
/// The runtime calls [`AttachmentManifest::record_intent`] from the
/// [`SessionScopedAttachmentStore`](crate::SessionScopedAttachmentStore)
/// wrapper before each `put`, so the manifest is a durable record that
/// "some bytes are about to land at this URI." When the turn that
/// references the attachment commits successfully via
/// [`RuntimePersistence::commit_runtime_state`], the same transaction
/// stamps `committed_at_epoch_ms`. Periodic GC sweeps manifest rows
/// whose intent has aged past a host-chosen threshold without ever
/// being committed and deletes the corresponding bytes — that is how
/// orphaned files left behind by crashes between `put` and the next
/// turn commit are reconciled.
#[derive(Clone, Debug)]
pub struct AttachmentIntent {
    pub attachment_id: crate::AttachmentId,
    pub session_id: String,
    /// Canonical URI for the attachment payload in the backing store.
    /// For file-backed stores this is the absolute on-disk path; for
    /// blob-backed stores it can be any stable identifier the host
    /// uses to clean the payload up.
    pub canonical_uri: String,
    /// When the intent was recorded, in epoch milliseconds; this is the age
    /// that [`AttachmentManifest::list_uncommitted`] filters on.
    pub intent_at_epoch_ms: u64,
}
650
/// A manifest row as returned by [`AttachmentManifest::list_uncommitted`]:
/// the fields of an [`AttachmentIntent`] plus the optional commit timestamp.
#[derive(Clone, Debug)]
pub struct AttachmentManifestEntry {
    pub attachment_id: crate::AttachmentId,
    pub session_id: String,
    pub canonical_uri: String,
    pub intent_at_epoch_ms: u64,
    /// `None` until the attachment has been stamped as committed.
    pub committed_at_epoch_ms: Option<u64>,
}
659
/// Trait for the synchronous attachment-manifest surface on
/// [`RuntimePersistence`]. Used by
/// [`SessionScopedAttachmentStore`](crate::SessionScopedAttachmentStore)
/// to record intent rows before `put` and by GC sweeps to reconcile
/// orphans. See the [`AttachmentIntent`] doc comment for the full
/// crash-safety story.
///
/// Backends with no attachment story (in-memory tests, mock stores)
/// inherit the default no-op impls on [`RuntimePersistence`] and
/// participate transparently — `record_intent` is a no-op, the
/// scoped wrapper still works, and GC sweeps return empty.
// NOTE(review): this is declared as an ordinary trait with no default method
// bodies, and this file generates no-op impls via the
// `impl_noop_attachment_manifest!` macro; confirm that the "default no-op
// impls on RuntimePersistence" wording above is still accurate.
pub trait AttachmentManifest: Send + Sync {
    /// Record that the payload described by `intent` is about to be
    /// written to the backing store.
    fn record_intent(&self, intent: AttachmentIntent) -> Result<(), StoreError>;

    /// Mark a set of attachment ids as committed (i.e. now referenced
    /// by a durable session-graph commit). Backends that store
    /// commits and manifest in the same database stamp this inside
    /// the commit transaction; the trait-level method is the
    /// out-of-band entry point for hosts that want to commit an id
    /// outside the normal turn-commit flow.
    fn commit_refs(
        &self,
        session_id: &str,
        attachment_ids: &[crate::AttachmentId],
    ) -> Result<(), StoreError>;

    /// Return manifest entries whose intent has aged past
    /// `older_than_epoch_ms` without ever being committed. Hosts run
    /// this periodically to find orphans left by crashes between
    /// `record_intent` and the next turn commit.
    fn list_uncommitted(
        &self,
        older_than_epoch_ms: u64,
    ) -> Result<Vec<AttachmentManifestEntry>, StoreError>;

    /// Remove a manifest row entirely. Called by the GC coordinator
    /// after the corresponding bytes have been removed from the
    /// backing [`AttachmentStore`](crate::AttachmentStore).
    fn forget(&self, attachment_id: &crate::AttachmentId) -> Result<(), StoreError>;
}
700
/// Mixin macro for [`RuntimePersistence`] implementors that have no
/// attachment-write story (mock backends, in-memory test stores,
/// runtime-perf harnesses). Pastes no-op impls of every
/// [`AttachmentManifest`] method: each returns `Ok`, and
/// `list_uncommitted` returns an empty vector.
///
/// Every standard-library name in the expansion is written as an absolute
/// path. `macro_rules!` does not make type or constructor paths hygienic,
/// so a bare `Vec` or `Ok` would resolve at the call site and break the
/// expansion wherever the caller shadows those names.
#[macro_export]
macro_rules! impl_noop_attachment_manifest {
    ($ty:ty) => {
        impl $crate::AttachmentManifest for $ty {
            fn record_intent(
                &self,
                _intent: $crate::AttachmentIntent,
            ) -> ::std::result::Result<(), $crate::StoreError> {
                ::std::result::Result::Ok(())
            }

            fn commit_refs(
                &self,
                _session_id: &str,
                _attachment_ids: &[$crate::AttachmentId],
            ) -> ::std::result::Result<(), $crate::StoreError> {
                ::std::result::Result::Ok(())
            }

            fn list_uncommitted(
                &self,
                _older_than_epoch_ms: u64,
            ) -> ::std::result::Result<
                ::std::vec::Vec<$crate::AttachmentManifestEntry>,
                $crate::StoreError,
            > {
                ::std::result::Result::Ok(::std::vec::Vec::new())
            }

            fn forget(
                &self,
                _attachment_id: &$crate::AttachmentId,
            ) -> ::std::result::Result<(), $crate::StoreError> {
                ::std::result::Result::Ok(())
            }
        }
    };
}
741
742/// Reject a persisted record whose `schema_version` does not match the
743/// version this binary supports. Backends call this immediately after
744/// deserializing a record from durable storage.
745pub fn ensure_supported_schema_version(
746    record_kind: &'static str,
747    actual: u32,
748    expected: u32,
749) -> Result<(), StoreError> {
750    if actual == expected {
751        Ok(())
752    } else {
753        Err(StoreError::UnsupportedRecordSchemaVersion {
754            record_kind,
755            actual,
756            expected,
757        })
758    }
759}
760
761pub fn ensure_supported_record_schema_version(
762    record_kind: &'static str,
763    value: &serde_json::Value,
764    expected: u32,
765) -> Result<(), StoreError> {
766    let Some(schema_version) = value.get("schema_version") else {
767        return Err(StoreError::MissingRecordSchemaVersion {
768            record_kind,
769            expected,
770        });
771    };
772    let Some(actual) = schema_version
773        .as_u64()
774        .and_then(|version| u32::try_from(version).ok())
775    else {
776        return Err(StoreError::InvalidRecordSchemaVersion {
777            record_kind,
778            actual: schema_version.to_string(),
779            expected,
780        });
781    };
782    ensure_supported_schema_version(record_kind, actual, expected)
783}
784
785pub fn decode_versioned_json_record<T>(
786    json: &str,
787    record_kind: &'static str,
788    expected: u32,
789) -> Result<T, StoreError>
790where
791    T: serde::de::DeserializeOwned,
792{
793    let value: serde_json::Value = serde_json::from_str(json)
794        .map_err(|err| StoreError::Backend(format!("failed to decode {record_kind}: {err}")))?;
795    ensure_supported_record_schema_version(record_kind, &value, expected)?;
796    serde_json::from_value(value)
797        .map_err(|err| StoreError::Backend(format!("failed to decode {record_kind}: {err}")))
798}
799
/// Identifies a committed turn by session id, turn id, and commit hash.
/// `StoreError::RuntimeTurnCommitConflict` describes the case where the same
/// turn id is committed again with a different hash.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct RuntimeTurnCommitStamp {
    pub session_id: String,
    pub turn_id: String,
    pub turn_commit_hash: String,
}
806
807impl RuntimeTurnCommitStamp {
808    pub fn new(
809        session_id: impl Into<String>,
810        turn_id: impl Into<String>,
811        turn_commit_hash: impl Into<String>,
812    ) -> Self {
813        Self {
814            session_id: session_id.into(),
815            turn_id: turn_id.into(),
816            turn_commit_hash: turn_commit_hash.into(),
817        }
818    }
819}
820
821fn build_persisted_turn_state(state: &crate::RuntimeSessionState) -> crate::PersistedTurnState {
822    crate::PersistedTurnState {
823        turn_index: state.turn_index,
824        token_usage: state.token_usage.clone(),
825        last_prompt_usage: state.last_prompt_usage.clone(),
826        protocol_turn_options: state.protocol_turn_options.clone(),
827    }
828}
829
830fn build_checkpoint_from_persisted_state(
831    state: &crate::RuntimeSessionState,
832) -> HydratedSessionCheckpoint {
833    HydratedSessionCheckpoint {
834        turn_state: build_persisted_turn_state(state),
835        tool_state_ref: state.tool_state_ref.clone(),
836        tool_state: state.tool_state_snapshot.clone(),
837        plugin_snapshot_ref: state.plugin_snapshot_ref.clone(),
838        plugin_snapshot_revision: state.plugin_snapshot_revision,
839        plugin_snapshot: state.plugin_snapshot.clone(),
840        execution_state_ref: state.execution_state_ref.clone(),
841        execution_state: state.execution_state_snapshot.clone(),
842    }
843}
844
845impl RuntimeCommit {
846    pub fn turn_commit_hash(&self) -> Result<String, StoreError> {
847        let mut semantic_commit = self.clone();
848        semantic_commit.expected_head_revision = None;
849        semantic_commit.session_execution_lease = None;
850        semantic_commit.release_session_execution_lease = None;
851        semantic_commit.turn_commit = None;
852        let mut semantic_commit = serde_json::to_value(&semantic_commit).map_err(|err| {
853            StoreError::Backend(format!("failed to serialize runtime turn commit: {err}"))
854        })?;
855        scrub_turn_commit_hash_value(&mut semantic_commit);
856        crate::stable_hash::stable_json_sha256_hex(&semantic_commit).map_err(|err| {
857            StoreError::Backend(format!(
858                "failed to serialize runtime turn commit hash: {err}"
859            ))
860        })
861    }
862
863    pub fn persisted_state(
864        state: &crate::RuntimeSessionState,
865        usage_deltas: &[crate::TokenLedgerEntry],
866    ) -> Self {
867        Self {
868            session_id: state.session_id.clone(),
869            expected_head_revision: state.head_revision,
870            session_execution_lease: None,
871            release_session_execution_lease: None,
872            config: persisted_session_config_from_state(state),
873            agent_frames: state.agent_frames.clone(),
874            current_agent_frame_id: state.current_agent_frame_id.clone(),
875            graph: if state.graph_replace_required || state.head_revision.is_none() {
876                GraphCommitDelta::ReplaceFull(state.session_graph.clone())
877            } else {
878                GraphCommitDelta::Unchanged {
879                    leaf_node_id: state.session_graph.leaf_node_id.clone(),
880                }
881            },
882            checkpoint: build_checkpoint_from_persisted_state(state),
883            usage_deltas: usage_deltas.to_vec(),
884            turn_commit: None,
885            completed_queue_claims: Vec::new(),
886            completed_turn_input_claims: Vec::new(),
887            interrupted_turn_input_turn_id: None,
888            committed_attachment_ids: Vec::new(),
889        }
890    }
891
892    pub(crate) fn persisted_state_with_graph_commit(
893        state: &crate::RuntimeSessionState,
894        graph: GraphCommitDelta,
895        usage_deltas: &[crate::TokenLedgerEntry],
896    ) -> Self {
897        Self {
898            session_id: state.session_id.clone(),
899            expected_head_revision: state.head_revision,
900            session_execution_lease: None,
901            release_session_execution_lease: None,
902            config: persisted_session_config_from_state(state),
903            agent_frames: state.agent_frames.clone(),
904            current_agent_frame_id: state.current_agent_frame_id.clone(),
905            graph,
906            checkpoint: build_checkpoint_from_persisted_state(state),
907            usage_deltas: usage_deltas.to_vec(),
908            turn_commit: None,
909            completed_queue_claims: Vec::new(),
910            completed_turn_input_claims: Vec::new(),
911            interrupted_turn_input_turn_id: None,
912            committed_attachment_ids: Vec::new(),
913        }
914    }
915
916    pub fn with_turn_commit(mut self, turn_commit: RuntimeTurnCommitStamp) -> Self {
917        self.turn_commit = Some(turn_commit);
918        self
919    }
920
921    pub fn with_session_execution_lease(mut self, lease: SessionExecutionLeaseFence) -> Self {
922        self.session_execution_lease = Some(lease);
923        self
924    }
925
926    pub fn releasing_session_execution_lease(
927        mut self,
928        completion: SessionExecutionLeaseCompletion,
929    ) -> Self {
930        self.release_session_execution_lease = Some(completion);
931        self
932    }
933
934    pub fn completing_queue_claim(
935        mut self,
936        completed_queue_claim: crate::QueuedWorkCompletion,
937    ) -> Self {
938        self.completed_queue_claims.push(completed_queue_claim);
939        self
940    }
941
942    pub fn completing_queue_claims(
943        mut self,
944        completed_queue_claims: impl IntoIterator<Item = crate::QueuedWorkCompletion>,
945    ) -> Self {
946        self.completed_queue_claims.extend(completed_queue_claims);
947        self
948    }
949
950    pub fn completing_turn_input_claim(
951        mut self,
952        completed_turn_input_claim: crate::TurnInputCompletion,
953    ) -> Self {
954        self.completed_turn_input_claims
955            .push(completed_turn_input_claim);
956        self
957    }
958
959    pub fn completing_turn_input_claims(
960        mut self,
961        completed_turn_input_claims: impl IntoIterator<Item = crate::TurnInputCompletion>,
962    ) -> Self {
963        self.completed_turn_input_claims
964            .extend(completed_turn_input_claims);
965        self
966    }
967
968    pub fn deferring_interrupted_turn_inputs(mut self, turn_id: impl Into<String>) -> Self {
969        self.interrupted_turn_input_turn_id = Some(turn_id.into());
970        self
971    }
972
973    pub fn with_committed_attachments(
974        mut self,
975        attachment_ids: impl IntoIterator<Item = crate::AttachmentId>,
976    ) -> Self {
977        self.committed_attachment_ids = attachment_ids.into_iter().collect();
978        self
979    }
980}
981
982fn scrub_turn_commit_hash_value(value: &mut serde_json::Value) {
983    match value {
984        serde_json::Value::Object(map) => {
985            let is_message = map.contains_key("role") && map.contains_key("parts");
986            let is_message_part = map.contains_key("kind")
987                && map.contains_key("content")
988                && map.contains_key("prune_state");
989            if is_message || is_message_part {
990                map.remove("id");
991            }
992            for volatile_key in ["node_id", "parent_node_id", "leaf_node_id", "timestamp"] {
993                map.remove(volatile_key);
994            }
995            for child in map.values_mut() {
996                scrub_turn_commit_hash_value(child);
997            }
998        }
999        serde_json::Value::Array(items) => {
1000            for item in items {
1001                scrub_turn_commit_hash_value(item);
1002            }
1003        }
1004        _ => {}
1005    }
1006}
1007
1008fn persisted_session_state_from_head(
1009    head: SessionHead,
1010    checkpoint: Option<HydratedSessionCheckpoint>,
1011) -> crate::RuntimeSessionState {
1012    let mut state = crate::RuntimeSessionState {
1013        session_id: head.session_id,
1014        policy: crate::SessionPolicy::default(),
1015        agent_frames: head.agent_frames,
1016        current_agent_frame_id: head.current_agent_frame_id,
1017        session_graph: head.graph,
1018        turn_index: 0,
1019        token_usage: crate::TokenUsage::default(),
1020        last_prompt_usage: None,
1021        protocol_turn_options: crate::ProtocolTurnOptions::default(),
1022        tool_state_ref: None,
1023        tool_state_generation: None,
1024        tool_state_snapshot: None,
1025        plugin_snapshot_ref: None,
1026        plugin_snapshot_revision: None,
1027        plugin_snapshot: None,
1028        execution_state_ref: None,
1029        execution_state_snapshot: None,
1030        token_ledger: head.token_ledger,
1031        checkpoint_ref: head.checkpoint_ref.clone(),
1032        head_revision: Some(head.head_revision),
1033        graph_replace_required: false,
1034    };
1035    state.policy.model = head.config.model.clone();
1036    state.policy.provider_id = head.config.provider_id.clone();
1037    if let Some(checkpoint) = checkpoint {
1038        state.turn_index = checkpoint.turn_state.turn_index;
1039        state.token_usage = checkpoint.turn_state.token_usage;
1040        state.last_prompt_usage = checkpoint.turn_state.last_prompt_usage;
1041        state.protocol_turn_options = checkpoint.turn_state.protocol_turn_options;
1042        state.tool_state_ref = checkpoint.tool_state_ref.clone();
1043        state.tool_state_generation = checkpoint
1044            .tool_state
1045            .as_ref()
1046            .map(|snapshot| snapshot.generation());
1047        state.tool_state_snapshot = checkpoint.tool_state;
1048        state.plugin_snapshot_ref = checkpoint.plugin_snapshot_ref.clone();
1049        state.plugin_snapshot_revision = checkpoint.plugin_snapshot_revision;
1050        state.plugin_snapshot = checkpoint.plugin_snapshot;
1051        state.execution_state_ref = checkpoint.execution_state_ref.clone();
1052        state.execution_state_snapshot = checkpoint.execution_state;
1053    }
1054    state.ensure_agent_frame_initialized();
1055    state
1056}
1057
1058impl Default for SessionHead {
1059    fn default() -> Self {
1060        Self {
1061            session_id: default_root_session_id(),
1062            head_revision: 0,
1063            agent_frames: Vec::new(),
1064            current_agent_frame_id: String::new(),
1065            graph: crate::SessionGraph::default(),
1066            config: crate::PersistedSessionConfig::default(),
1067            checkpoint_ref: None,
1068            token_ledger: Vec::new(),
1069        }
1070    }
1071}
1072
1073impl Default for SessionHeadMeta {
1074    fn default() -> Self {
1075        Self {
1076            schema_version: SESSION_HEAD_META_SCHEMA_VERSION,
1077            session_id: default_root_session_id(),
1078            head_revision: 0,
1079            config: crate::PersistedSessionConfig::default(),
1080            agent_frames: Vec::new(),
1081            current_agent_frame_id: String::new(),
1082            checkpoint_ref: None,
1083            leaf_node_id: None,
1084            graph_node_count: 0,
1085            token_ledger: Vec::new(),
1086        }
1087    }
1088}
1089
1090/// Exact settled-session persistence protocol required by the runtime.
1091///
1092/// This is the runtime's atomic transaction facade for visible session state:
1093/// session graph/head commits, queued-work ingress and completion, final
1094/// turn-commit idempotency, metadata, usage, and the attachment write-ahead
1095/// manifest. In-flight nondeterministic work belongs to the active
1096/// [`EffectHost`](crate::EffectHost), not to the store contract.
1097///
1098/// The [`AttachmentManifest`] supertrait is required so the runtime can wrap
1099/// any persistence backend with a
1100/// [`SessionScopedAttachmentStore`](crate::SessionScopedAttachmentStore)
1101/// without dual-trait casting. Backends with no attachment-write story can
1102/// implement the manifest methods as no-ops via
1103/// [`NoopAttachmentManifest`]'s blanket helpers.
1104#[async_trait::async_trait]
1105pub trait RuntimePersistence: AttachmentManifest + Send + Sync {
1106    /// Durability tier this session store provides; defaults to
1107    /// [`DurabilityTier::Inline`].
1108    fn durability_tier(&self) -> crate::DurabilityTier {
1109        crate::DurabilityTier::Inline
1110    }
1111
1112    async fn load_session(
1113        &self,
1114        scope: SessionReadScope,
1115    ) -> Result<Option<PersistedSessionRead>, StoreError>;
1116
1117    async fn load_node(
1118        &self,
1119        node_id: &str,
1120    ) -> Result<Option<crate::SessionNodeRecord>, StoreError>;
1121
1122    async fn commit_runtime_state(
1123        &self,
1124        commit: RuntimeCommit,
1125    ) -> Result<RuntimeCommitResult, StoreError>;
1126
1127    /// Persist model-visible user input into the pending turn-input lifecycle.
1128    ///
1129    /// Active-turn ingress is claimed only by the matching live turn at a
1130    /// checkpoint. Next-turn ingress is claimed only by idle dispatch. User
1131    /// input must not be represented as generic queued work.
1132    async fn enqueue_pending_turn_input(
1133        &self,
1134        _input: crate::PendingTurnInputDraft,
1135    ) -> Result<crate::PendingTurnInput, StoreError> {
1136        Err(StoreError::Backend(
1137            "pending turn input is not supported by this test store".to_string(),
1138        ))
1139    }
1140
1141    /// List pending user inputs for UI reconciliation and queue preview.
1142    ///
1143    /// This excludes completed/cancelled rows and rows currently held by a live
1144    /// claim. Expired claims are visible again according to their state.
1145    async fn list_pending_turn_inputs(
1146        &self,
1147        _session_id: &str,
1148    ) -> Result<Vec<crate::PendingTurnInput>, StoreError> {
1149        Ok(Vec::new())
1150    }
1151
1152    /// Cancel an unclaimed pending user input by id.
1153    async fn cancel_pending_turn_input(
1154        &self,
1155        session_id: &str,
1156        input_id: &str,
1157    ) -> Result<crate::PendingTurnInputCancelOutcome, StoreError> {
1158        let target = crate::PendingTurnInputCancelTarget::input_id(input_id);
1159        let targets = vec![target];
1160        let mut outcomes = self
1161            .cancel_pending_turn_inputs(session_id, &targets)
1162            .await?;
1163        Ok(outcomes
1164            .pop()
1165            .map(|result| result.outcome)
1166            .unwrap_or(crate::PendingTurnInputCancelOutcome::NotFound))
1167    }
1168
1169    /// Atomically cancel a list of pending user inputs by input id or source key.
1170    async fn cancel_pending_turn_inputs(
1171        &self,
1172        _session_id: &str,
1173        _targets: &[crate::PendingTurnInputCancelTarget],
1174    ) -> Result<Vec<crate::PendingTurnInputCancelResult>, StoreError> {
1175        Err(StoreError::Backend(
1176            "pending turn input is not supported by this test store".to_string(),
1177        ))
1178    }
1179
1180    /// Atomically cancel the same-session runtime-admission suffix from an anchor.
1181    async fn cancel_pending_turn_input_suffix(
1182        &self,
1183        _session_id: &str,
1184        anchor: &crate::PendingTurnInputCancelTarget,
1185    ) -> Result<crate::PendingTurnInputSuffixCancelOutcome, StoreError> {
1186        Err(StoreError::Backend(format!(
1187            "pending turn input suffix cancellation is not supported by this test store for anchor `{anchor:?}`"
1188        )))
1189    }
1190
1191    /// Claim active-turn input at a checkpoint for the live turn id.
1192    async fn claim_active_turn_inputs(
1193        &self,
1194        session_id: &str,
1195        _session_execution_lease: &SessionExecutionLeaseFence,
1196        _owner: &LeaseOwnerIdentity,
1197        _turn_id: &str,
1198        _checkpoint: crate::CheckpointKind,
1199        _lease_ttl_ms: u64,
1200        _max_inputs: usize,
1201    ) -> Result<Option<crate::TurnInputClaim>, StoreError> {
1202        Err(StoreError::Backend(format!(
1203            "pending turn input is not supported for session `{session_id}` by this test store"
1204        )))
1205    }
1206
1207    /// Claim queued next-turn input at idle.
1208    async fn claim_next_turn_inputs(
1209        &self,
1210        session_id: &str,
1211        _session_execution_lease: &SessionExecutionLeaseFence,
1212        _owner: &LeaseOwnerIdentity,
1213        _lease_ttl_ms: u64,
1214        _max_inputs: usize,
1215    ) -> Result<Option<crate::TurnInputClaim>, StoreError> {
1216        Err(StoreError::Backend(format!(
1217            "pending turn input is not supported for session `{session_id}` by this test store"
1218        )))
1219    }
1220
1221    /// Abandon a held pending-turn-input claim so it can be reclaimed.
1222    async fn abandon_turn_input_claim(
1223        &self,
1224        _claim: &crate::TurnInputClaim,
1225    ) -> Result<(), StoreError> {
1226        Ok(())
1227    }
1228
1229    /// Try to claim the durable single-writer execution lane for `session_id`.
1230    ///
1231    /// Returns [`SessionExecutionLeaseClaimOutcome::Busy`] when another owner
1232    /// holds an unexpired lease. Expired or released leases may be reclaimed
1233    /// and receive a higher fencing token. An unexpired lease held by the same
1234    /// owner id but a different incarnation is busy.
1235    async fn try_claim_session_execution_lease(
1236        &self,
1237        session_id: &str,
1238        owner: &LeaseOwnerIdentity,
1239        lease_ttl_ms: u64,
1240    ) -> Result<SessionExecutionLeaseClaimOutcome, StoreError>;
1241
1242    /// Reclaim an unexpired session execution lease whose observed holder is
1243    /// definitely dead according to persisted local-process liveness metadata.
1244    ///
1245    /// Backends must CAS on `observed_holder` so a stale claimant cannot clear
1246    /// a newer live lease that won the race after the busy observation.
1247    async fn reclaim_session_execution_lease(
1248        &self,
1249        session_id: &str,
1250        owner: &LeaseOwnerIdentity,
1251        observed_holder: &SessionExecutionLeaseFence,
1252        lease_ttl_ms: u64,
1253    ) -> Result<SessionExecutionLeaseClaimOutcome, StoreError>;
1254
1255    /// Extend a live session execution lease owned by the caller.
1256    ///
1257    /// Backends must reject stale, released, superseded, or expired fences with
1258    /// [`StoreError::SessionExecutionLeaseExpired`].
1259    async fn renew_session_execution_lease(
1260        &self,
1261        fence: &SessionExecutionLeaseFence,
1262        lease_ttl_ms: u64,
1263    ) -> Result<SessionExecutionLease, StoreError>;
1264
1265    /// Release a session execution lease fenced by its completion token.
1266    ///
1267    /// This operation is idempotent and must not clear a newer owner's lease.
1268    async fn release_session_execution_lease(
1269        &self,
1270        completion: &SessionExecutionLeaseCompletion,
1271    ) -> Result<(), StoreError>;
1272
1273    /// Persist a queued-work batch for later claiming.
1274    ///
1275    /// The default implementation rejects the batch: backends that do not
1276    /// support queued work inherit it and stay loud rather than silently
1277    /// dropping work.
1278    async fn enqueue_queued_work(
1279        &self,
1280        _batch: crate::QueuedWorkBatchDraft,
1281    ) -> Result<crate::QueuedWorkBatch, StoreError> {
1282        Err(StoreError::Backend(
1283            "queued work is not supported by this test store".to_string(),
1284        ))
1285    }
1286
1287    /// Claim a leading ready session-command batch for `owner_id`.
1288    ///
1289    /// A command claim is returned only when the earliest ready claimable batch
1290    /// is classified as [`crate::runtime::QueuedWorkClass::SessionCommand`].
1291    /// Backends derive the class from queued payloads; no schema column is
1292    /// required.
1293    /// The default implementation reports queued work as unsupported.
1294    async fn claim_leading_ready_session_command(
1295        &self,
1296        session_id: &str,
1297        _session_execution_lease: &SessionExecutionLeaseFence,
1298        _owner: &LeaseOwnerIdentity,
1299        _lease_ttl_ms: u64,
1300    ) -> Result<Option<crate::QueuedWorkClaim>, StoreError> {
1301        Err(StoreError::Backend(format!(
1302            "queued work is not supported for session `{session_id}` by this test store"
1303        )))
1304    }
1305
1306    /// Claim the next ready turn-work group for `owner_id`.
1307    ///
1308    /// A turn-work claim is returned only when the earliest ready claimable
1309    /// batch is classified as [`crate::runtime::QueuedWorkClass::TurnWork`].
1310    /// Earlier ready session commands are not skipped and are never
1311    /// materialized as turn input.
1312    ///
1313    /// The default implementation reports queued work as unsupported.
1314    async fn claim_ready_queued_work(
1315        &self,
1316        session_id: &str,
1317        _session_execution_lease: &SessionExecutionLeaseFence,
1318        _owner: &LeaseOwnerIdentity,
1319        _boundary: crate::QueuedWorkClaimBoundary,
1320        _lease_ttl_ms: u64,
1321        _max_batches: usize,
1322    ) -> Result<Option<crate::QueuedWorkClaim>, StoreError> {
1323        Err(StoreError::Backend(format!(
1324            "queued work is not supported for session `{session_id}` by this test store"
1325        )))
1326    }
1327
1328    /// Claim a specific ready batch set selected from the durable queue.
1329    ///
1330    /// This is the host-facing counterpart to [`claim_ready_queued_work`]:
1331    /// callers that project queued work into a UI can claim the exact batch ids
1332    /// they rendered instead of reconstructing authority from local draft state.
1333    /// The default implementation preserves the ordered queue contract by
1334    /// claiming the next ready group and returning it only when the durable ids
1335    /// match exactly.
1336    async fn claim_ready_queued_work_by_batch_ids(
1337        &self,
1338        session_id: &str,
1339        session_execution_lease: &SessionExecutionLeaseFence,
1340        owner: &LeaseOwnerIdentity,
1341        boundary: crate::QueuedWorkClaimBoundary,
1342        lease_ttl_ms: u64,
1343        batch_ids: &[String],
1344    ) -> Result<Option<crate::QueuedWorkClaim>, StoreError> {
1345        if batch_ids.is_empty() {
1346            return Ok(None);
1347        }
1348        let Some(claim) = self
1349            .claim_ready_queued_work(
1350                session_id,
1351                session_execution_lease,
1352                owner,
1353                boundary,
1354                lease_ttl_ms,
1355                batch_ids.len(),
1356            )
1357            .await?
1358        else {
1359            return Ok(None);
1360        };
1361        let claimed_ids = claim
1362            .batches
1363            .iter()
1364            .map(|batch| batch.batch_id.as_str())
1365            .collect::<Vec<_>>();
1366        if claimed_ids == batch_ids.iter().map(String::as_str).collect::<Vec<_>>() {
1367            return Ok(Some(claim));
1368        }
1369        self.abandon_queued_work_claim(&claim).await?;
1370        Ok(None)
1371    }
1372
1373    /// Extend the lease on a held queued-work claim.
1374    ///
1375    /// The default implementation reports the claim as expired, matching a
1376    /// backend that never granted one.
1377    async fn renew_queued_work_claim(
1378        &self,
1379        claim: &crate::QueuedWorkClaim,
1380        _lease_ttl_ms: u64,
1381    ) -> Result<crate::QueuedWorkClaim, StoreError> {
1382        Err(StoreError::QueuedWorkClaimExpired {
1383            session_id: claim.session_id.clone(),
1384            claim_id: claim.claim_id.clone(),
1385        })
1386    }
1387
1388    /// Release a held queued-work claim without completing it.
1389    ///
1390    /// The default implementation is a no-op: with no queued work there is
1391    /// nothing to release.
1392    async fn abandon_queued_work_claim(
1393        &self,
1394        _claim: &crate::QueuedWorkClaim,
1395    ) -> Result<(), StoreError> {
1396        Ok(())
1397    }
1398
1399    /// Remove an unclaimed queued-work batch from durable ingress.
1400    ///
1401    /// Returns the removed batch when cancellation won the race. Returns `None`
1402    /// when the batch is missing or currently held by a live claim; callers must
1403    /// treat that as "already claimed or completed" and must not restore any
1404    /// stale local draft state.
1405    ///
1406    /// The default implementation reports `None` (nothing queued, nothing to
1407    /// cancel).
1408    async fn cancel_queued_work_batch(
1409        &self,
1410        _session_id: &str,
1411        _batch_id: &str,
1412    ) -> Result<Option<crate::QueuedWorkBatch>, StoreError> {
1413        Ok(None)
1414    }
1415
1416    /// List all queued-work batches for a session.
1417    ///
1418    /// The default implementation reports an empty queue.
1419    async fn list_queued_work(
1420        &self,
1421        _session_id: &str,
1422    ) -> Result<Vec<crate::QueuedWorkBatch>, StoreError> {
1423        Ok(Vec::new())
1424    }
1425
1426    /// List queued-work batches that are still pending presentation/editing.
1427    ///
1428    /// This excludes batches currently held by a live claim. Expired claims are
1429    /// considered pending again because they can be reclaimed or cancelled.
1430    async fn list_pending_queued_work(
1431        &self,
1432        session_id: &str,
1433    ) -> Result<Vec<crate::QueuedWorkBatch>, StoreError> {
1434        self.list_queued_work(session_id).await
1435    }
1436
1437    async fn save_session_meta(&self, meta: SessionMeta) -> Result<(), StoreError>;
1438    async fn load_session_meta(&self) -> Result<Option<SessionMeta>, StoreError>;
1439
1440    async fn tombstone_nodes(&self, ids: &[String]) -> Result<(), StoreError>;
1441    async fn vacuum(&self) -> Result<VacuumReport, StoreError>;
1442    async fn gc_unreachable(&self) -> Result<GcReport, StoreError>;
1443}
1444
1445fn persisted_session_state_from_read(read: PersistedSessionRead) -> crate::RuntimeSessionState {
1446    persisted_session_state_from_head(
1447        SessionHead {
1448            session_id: read.session_id,
1449            head_revision: read.head_revision,
1450            agent_frames: read.agent_frames,
1451            current_agent_frame_id: read.current_agent_frame_id,
1452            graph: read.graph,
1453            config: read.config,
1454            checkpoint_ref: read.checkpoint_ref,
1455            token_ledger: read.token_ledger,
1456        },
1457        read.checkpoint,
1458    )
1459}
1460
1461pub async fn load_persisted_session_state(
1462    store: &(dyn RuntimePersistence + '_),
1463) -> Result<Option<crate::RuntimeSessionState>, StoreError> {
1464    Ok(store
1465        .load_session(SessionReadScope::FullGraph)
1466        .await?
1467        .map(persisted_session_state_from_read))
1468}
1469
1470pub async fn load_persisted_session_state_active_path(
1471    store: &(dyn RuntimePersistence + '_),
1472    leaf_node_id: Option<String>,
1473) -> Result<Option<crate::RuntimeSessionState>, StoreError> {
1474    Ok(store
1475        .load_session(SessionReadScope::ActivePath { leaf_node_id })
1476        .await?
1477        .map(persisted_session_state_from_read))
1478}
1479
1480pub async fn refresh_persisted_session_state(
1481    store: &(dyn RuntimePersistence + '_),
1482    state: &mut crate::RuntimeSessionState,
1483) -> Result<(), StoreError> {
1484    if let Some(mut fresh) = load_persisted_session_state(store).await? {
1485        fresh.policy.session_id = state.policy.session_id.clone();
1486        fresh.policy.max_turns = state.policy.max_turns;
1487        *state = fresh;
1488    }
1489    Ok(())
1490}
1491
#[cfg(test)]
mod tests {
    use super::{LeaseOwnerIdentity, LeaseOwnerLiveness};

    /// Shorthand for a local-process liveness record built through the
    /// test-only constructor.
    fn local_liveness(
        host_id: &str,
        boot_id: &str,
        pid: u32,
        process_start: &str,
    ) -> LeaseOwnerLiveness {
        LeaseOwnerLiveness::local_process_for_test(host_id, boot_id, pid, process_start)
    }

    #[test]
    fn lease_owner_identity_requires_same_incarnation() {
        let original = LeaseOwnerIdentity::opaque("owner", "incarnation-a");
        let identical = LeaseOwnerIdentity::opaque("owner", "incarnation-a");
        let restarted = LeaseOwnerIdentity::opaque("owner", "incarnation-b");

        assert!(original.same_incarnation(&identical));
        assert!(!original.same_incarnation(&restarted));
    }

    #[test]
    fn local_liveness_only_proves_same_host_boot_dead_processes() {
        let pid = std::process::id();
        let claimant =
            |host_id: &str, boot_id: &str| local_liveness(host_id, boot_id, pid, "claimant");

        // The holder reuses this process's pid but records a process-start
        // marker that is deliberately not the real one.
        let holder = local_liveness("host-a", "boot-a", pid, "not-the-current-process-start");
        let same_host_boot = claimant("host-a", "boot-a");
        let other_host = claimant("host-b", "boot-a");
        let other_boot = claimant("host-a", "boot-b");

        assert!(holder.is_definitely_dead_for_claimant(&same_host_boot));
        assert!(!holder.is_definitely_dead_for_claimant(&other_host));
        assert!(!holder.is_definitely_dead_for_claimant(&other_boot));
        assert!(!holder.is_definitely_dead_for_claimant(&LeaseOwnerLiveness::Opaque));
        assert!(!LeaseOwnerLiveness::Opaque.is_definitely_dead_for_claimant(&same_host_boot));
    }
}