Skip to main content

lash_core/store/
mod.rs

1//! The runtime's settled-session persistence contract and shared store types.
2
3mod lease_timings;
4pub mod queued_work;
5
6pub use lease_timings::{LeaseTimings, LeaseTimingsError};
7
/// Linux procfs file holding the identifier of the running kernel boot.
/// Read when stamping a local-process lease owner so that liveness checks
/// only trust pid comparisons made within the same boot.
const PROC_BOOT_ID_PATH: &str = "/proc/sys/kernel/random/boot_id";
9
/// Serde default for `session_id` on persisted head records that omit the
/// field: such records are attributed to the `"root"` session.
fn default_root_session_id() -> String {
    String::from("root")
}
13
/// Schema version of [`SessionHeadMeta`] records understood by this binary.
/// Decoding via [`decode_versioned_json_record`] rejects any other value.
pub const SESSION_HEAD_META_SCHEMA_VERSION: u32 = 1;
/// Schema version stamped into every [`SessionCheckpoint`] built by this binary.
pub const SESSION_CHECKPOINT_SCHEMA_VERSION: u32 = 1;
16
#[cfg(test)]
mod persisted_state_tests {
    use super::*;

    /// Decodes `json` as a `SessionHeadMeta` record at the current schema
    /// version and returns the error the decoder is expected to produce.
    fn head_meta_decode_error(json: &str, reason: &str) -> StoreError {
        decode_versioned_json_record::<SessionHeadMeta>(
            json,
            "SessionHeadMeta",
            SESSION_HEAD_META_SCHEMA_VERSION,
        )
        .expect_err(reason)
    }

    #[test]
    fn persisted_state_hydrates_provider_id_without_live_provider_rebinding() {
        const PROVIDER: &str = "stored-provider";
        let head = SessionHead {
            session_id: "stored".to_string(),
            head_revision: 7,
            agent_frames: Vec::new(),
            current_agent_frame_id: String::new(),
            graph: crate::SessionGraph::default(),
            config: crate::PersistedSessionConfig {
                provider_id: PROVIDER.to_string(),
                model: crate::ModelSpec::default(),
            },
            checkpoint_ref: None,
            token_ledger: Vec::new(),
        };

        let state = persisted_session_state_from_head(head, None);

        assert_eq!(state.policy.recorded_provider_id(), PROVIDER);
        // Every agent frame must carry the recorded provider id, not a live rebinding.
        let frames_keep_recorded_provider = state
            .agent_frames
            .iter()
            .all(|frame| frame.assignment.policy.recorded_provider_id() == PROVIDER);
        assert!(frames_keep_recorded_provider);
        assert_eq!(state.head_revision, Some(7));
    }

    #[test]
    fn versioned_json_record_rejects_missing_schema_version() {
        let err = head_meta_decode_error("{}", "pre-versioned session head should fail");

        assert!(matches!(
            err,
            StoreError::MissingRecordSchemaVersion {
                record_kind: "SessionHeadMeta",
                expected: SESSION_HEAD_META_SCHEMA_VERSION
            }
        ));
    }

    #[test]
    fn versioned_json_record_rejects_invalid_schema_version() {
        let err = head_meta_decode_error(
            r#"{"schema_version":"1"}"#,
            "invalid session head schema version should fail",
        );

        assert!(matches!(
            err,
            StoreError::InvalidRecordSchemaVersion {
                record_kind: "SessionHeadMeta",
                expected: SESSION_HEAD_META_SCHEMA_VERSION,
                ..
            }
        ));
    }

    #[test]
    fn versioned_json_record_rejects_unsupported_schema_version() {
        let err = head_meta_decode_error(
            r#"{"schema_version":2}"#,
            "unsupported session head schema version should fail",
        );

        assert!(matches!(
            err,
            StoreError::UnsupportedRecordSchemaVersion {
                record_kind: "SessionHeadMeta",
                actual: 2,
                expected: SESSION_HEAD_META_SCHEMA_VERSION
            }
        ));
    }
}
106
107#[derive(Debug, thiserror::Error)]
108pub enum StoreError {
109    #[error(
110        "store is already bound to session `{bound_session_id}` and cannot be reused for `{attempted_session_id}`"
111    )]
112    SessionBindingMismatch {
113        bound_session_id: String,
114        attempted_session_id: String,
115    },
116    #[error("store does not support read scope {0:?}")]
117    UnsupportedReadScope(SessionReadScope),
118    #[error("store head revision conflict: expected {expected:?}, actual {actual}")]
119    HeadRevisionConflict { expected: Option<u64>, actual: u64 },
120    #[error(
121        "runtime turn `{turn_id}` for session `{session_id}` was already committed with a different commit hash"
122    )]
123    RuntimeTurnCommitConflict { session_id: String, turn_id: String },
124    #[error("queued work claim `{claim_id}` for session `{session_id}` is missing or expired")]
125    QueuedWorkClaimExpired {
126        session_id: String,
127        claim_id: String,
128    },
129    #[error("turn input claim `{claim_id}` for session `{session_id}` is missing or expired")]
130    TurnInputClaimExpired {
131        session_id: String,
132        claim_id: String,
133    },
134    #[error(
135        "pending turn input source_key `{source_key}` for session `{session_id}` is already bound to input `{existing_input_id}` with different submitted content"
136    )]
137    PendingTurnInputSourceKeyConflict {
138        session_id: String,
139        source_key: String,
140        existing_input_id: String,
141    },
142    #[error("session execution lease for session `{session_id}` is missing or expired")]
143    SessionExecutionLeaseExpired { session_id: String },
144    #[error(
145        "{record_kind} schema_version {actual} is not supported by this binary (expected {expected})"
146    )]
147    UnsupportedRecordSchemaVersion {
148        record_kind: &'static str,
149        actual: u32,
150        expected: u32,
151    },
152    #[error(
153        "{record_kind} is missing schema_version and was written by unsupported pre-versioned state (expected {expected})"
154    )]
155    MissingRecordSchemaVersion {
156        record_kind: &'static str,
157        expected: u32,
158    },
159    #[error("{record_kind} schema_version {actual} is invalid (expected integer {expected})")]
160    InvalidRecordSchemaVersion {
161        record_kind: &'static str,
162        actual: String,
163        expected: u32,
164    },
165    #[error("store backend error: {0}")]
166    Backend(String),
167}
168
169#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
170pub struct SessionMeta {
171    pub session_id: String,
172    pub session_name: String,
173    pub created_at: String,
174    pub model: String,
175    pub cwd: Option<String>,
176    pub relation: crate::SessionRelation,
177}
178
179impl SessionMeta {
180    /// Returns the parent session id, if any, derived from the canonical
181    /// [`SessionRelation`] field.
182    pub fn parent_session_id(&self) -> Option<&str> {
183        self.relation.parent_session_id()
184    }
185}
186
187/// Lightweight session info for the resume picker.
188#[derive(Clone, Debug)]
189pub struct SessionPickerInfo {
190    pub session_id: String,
191    pub cwd: Option<String>,
192    pub relation: crate::SessionRelation,
193    pub first_user_message: String,
194    pub user_message_count: usize,
195}
196
197impl SessionPickerInfo {
198    pub fn parent_session_id(&self) -> Option<&str> {
199        self.relation.parent_session_id()
200    }
201}
202
203#[derive(Clone, Debug, Default, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
204#[serde(transparent)]
205pub struct BlobRef(pub String);
206
207impl BlobRef {
208    pub fn as_str(&self) -> &str {
209        &self.0
210    }
211}
212
213impl std::fmt::Display for BlobRef {
214    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
215        f.write_str(&self.0)
216    }
217}
218
219impl From<String> for BlobRef {
220    fn from(value: String) -> Self {
221        Self(value)
222    }
223}
224
/// Counters produced by a blob garbage-collection pass, returned so hosts
/// can report them.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct GcReport {
    /// Number of roots the pass started from.
    pub root_count: usize,
    /// Number of blobs the pass kept.
    pub retained_blob_count: usize,
    /// Number of blobs the pass deleted.
    pub deleted_blob_count: usize,
}
231
/// Outcome of a `StoreMaintenance::vacuum()` call, returned so hosts can
/// emit metrics.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct VacuumReport {
    /// Tombstoned graph-node rows that were physically deleted from the store.
    pub removed_node_count: usize,
    /// Terminal pending-input evidence rows pruned by host-scheduled retention.
    pub removed_pending_turn_input_tombstone_count: usize,
}
242
243#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
244pub struct SessionCheckpoint {
245    pub schema_version: u32,
246    pub turn_state: crate::PersistedTurnState,
247    #[serde(default, skip_serializing_if = "Option::is_none")]
248    pub tool_state_ref: Option<BlobRef>,
249    #[serde(default, skip_serializing_if = "Option::is_none")]
250    pub plugin_snapshot_ref: Option<BlobRef>,
251    #[serde(default, skip_serializing_if = "Option::is_none")]
252    pub plugin_snapshot_revision: Option<u64>,
253    #[serde(default, skip_serializing_if = "Option::is_none")]
254    pub execution_state_ref: Option<BlobRef>,
255}
256
257impl Default for SessionCheckpoint {
258    fn default() -> Self {
259        Self {
260            schema_version: SESSION_CHECKPOINT_SCHEMA_VERSION,
261            turn_state: crate::PersistedTurnState::default(),
262            tool_state_ref: None,
263            plugin_snapshot_ref: None,
264            plugin_snapshot_revision: None,
265            execution_state_ref: None,
266        }
267    }
268}
269
270impl SessionCheckpoint {
271    pub fn new(
272        turn_state: crate::PersistedTurnState,
273        tool_state_ref: Option<BlobRef>,
274        plugin_snapshot_ref: Option<BlobRef>,
275        plugin_snapshot_revision: Option<u64>,
276        execution_state_ref: Option<BlobRef>,
277    ) -> Self {
278        Self {
279            schema_version: SESSION_CHECKPOINT_SCHEMA_VERSION,
280            turn_state,
281            tool_state_ref,
282            plugin_snapshot_ref,
283            plugin_snapshot_revision,
284            execution_state_ref,
285        }
286    }
287}
288
289#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
290pub struct HydratedSessionCheckpoint {
291    pub turn_state: crate::PersistedTurnState,
292    pub tool_state_ref: Option<BlobRef>,
293    pub tool_state: Option<crate::ToolState>,
294    pub plugin_snapshot_ref: Option<BlobRef>,
295    pub plugin_snapshot: Option<crate::PluginSessionSnapshot>,
296    pub plugin_snapshot_revision: Option<u64>,
297    pub execution_state_ref: Option<BlobRef>,
298    pub execution_state: Option<Vec<u8>>,
299}
300
301#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
302pub struct SessionHead {
303    #[serde(default = "default_root_session_id")]
304    pub session_id: String,
305    #[serde(default)]
306    pub head_revision: u64,
307    #[serde(default)]
308    pub agent_frames: Vec<crate::AgentFrameRecord>,
309    #[serde(default, skip_serializing_if = "String::is_empty")]
310    pub current_agent_frame_id: crate::AgentFrameId,
311    pub graph: crate::SessionGraph,
312    pub config: crate::PersistedSessionConfig,
313    #[serde(default, skip_serializing_if = "Option::is_none")]
314    pub checkpoint_ref: Option<BlobRef>,
315    #[serde(default, skip_serializing_if = "Vec::is_empty")]
316    pub token_ledger: Vec<crate::TokenLedgerEntry>,
317}
318
319#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
320pub struct SessionHeadMeta {
321    pub schema_version: u32,
322    #[serde(default = "default_root_session_id")]
323    pub session_id: String,
324    #[serde(default)]
325    pub head_revision: u64,
326    pub config: crate::PersistedSessionConfig,
327    #[serde(default)]
328    pub agent_frames: Vec<crate::AgentFrameRecord>,
329    #[serde(default, skip_serializing_if = "String::is_empty")]
330    pub current_agent_frame_id: crate::AgentFrameId,
331    #[serde(default, skip_serializing_if = "Option::is_none")]
332    pub checkpoint_ref: Option<BlobRef>,
333    #[serde(default, skip_serializing_if = "Option::is_none")]
334    pub leaf_node_id: Option<String>,
335    #[serde(default)]
336    pub graph_node_count: usize,
337    #[serde(default, skip_serializing_if = "Vec::is_empty")]
338    pub token_ledger: Vec<crate::TokenLedgerEntry>,
339}
340
341fn persisted_session_config_from_state(
342    state: &crate::RuntimeSessionState,
343) -> crate::PersistedSessionConfig {
344    crate::PersistedSessionConfig {
345        provider_id: state.policy.recorded_provider_id().to_string(),
346        model: state.policy.model.clone(),
347    }
348}
349
/// Which part of a session graph a read should load.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SessionReadScope {
    /// Load every graph node.
    FullGraph,
    /// Load only the path ending at `leaf_node_id`; the handling of `None`
    /// is left to the backend.
    ActivePath { leaf_node_id: Option<String> },
}
355
356#[derive(Clone, Debug)]
357pub struct PersistedSessionRead {
358    pub session_id: String,
359    pub head_revision: u64,
360    pub config: crate::PersistedSessionConfig,
361    pub agent_frames: Vec<crate::AgentFrameRecord>,
362    pub current_agent_frame_id: crate::AgentFrameId,
363    pub graph: crate::SessionGraph,
364    pub checkpoint_ref: Option<BlobRef>,
365    pub checkpoint: Option<HydratedSessionCheckpoint>,
366    pub token_ledger: Vec<crate::TokenLedgerEntry>,
367}
368
369#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
370pub enum GraphCommitDelta {
371    Unchanged {
372        leaf_node_id: Option<String>,
373    },
374    Append {
375        nodes: Vec<crate::SessionNodeRecord>,
376        leaf_node_id: Option<String>,
377    },
378    ReplaceFull(crate::SessionGraph),
379}
380
381impl GraphCommitDelta {
382    pub fn leaf_node_id(&self) -> Option<&String> {
383        match self {
384            Self::Unchanged { leaf_node_id } | Self::Append { leaf_node_id, .. } => {
385                leaf_node_id.as_ref()
386            }
387            Self::ReplaceFull(graph) => graph.leaf_node_id.as_ref(),
388        }
389    }
390}
391
392#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
393pub struct RuntimeCommit {
394    pub session_id: String,
395    pub expected_head_revision: Option<u64>,
396    #[serde(default, skip_serializing_if = "Option::is_none")]
397    pub session_execution_lease: Option<SessionExecutionLeaseFence>,
398    #[serde(default, skip_serializing_if = "Option::is_none")]
399    pub release_session_execution_lease: Option<SessionExecutionLeaseCompletion>,
400    pub config: crate::PersistedSessionConfig,
401    pub agent_frames: Vec<crate::AgentFrameRecord>,
402    pub current_agent_frame_id: crate::AgentFrameId,
403    pub graph: GraphCommitDelta,
404    pub checkpoint: HydratedSessionCheckpoint,
405    pub usage_deltas: Vec<crate::TokenLedgerEntry>,
406    pub turn_commit: Option<RuntimeTurnCommitStamp>,
407    pub completed_queue_claims: Vec<crate::QueuedWorkCompletion>,
408    pub completed_turn_input_claims: Vec<crate::TurnInputCompletion>,
409    #[serde(default, skip_serializing_if = "Option::is_none")]
410    pub interrupted_turn_input_turn_id: Option<String>,
411    /// Attachment ids whose bytes are referenced by this commit and
412    /// should be stamped `committed` in the write-ahead manifest as
413    /// part of the same SQL transaction. The backend marks each id
414    /// committed via [`AttachmentManifest::commit_refs`] before the
415    /// commit returns success. Hosts populate this from the
416    /// attachments emitted by tool calls and inline LLM-request
417    /// attachments produced during the turn.
418    pub committed_attachment_ids: Vec<crate::AttachmentId>,
419}
420
421#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
422pub struct RuntimeCommitResult {
423    pub head_revision: u64,
424    pub checkpoint_ref: BlobRef,
425    pub manifest: SessionCheckpoint,
426}
427
428#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
429pub struct LeaseOwnerIdentity {
430    pub owner_id: String,
431    pub incarnation_id: String,
432    #[serde(default)]
433    pub liveness: LeaseOwnerLiveness,
434}
435
436impl LeaseOwnerIdentity {
437    pub fn opaque(
438        owner_id: impl Into<String>,
439        incarnation_id: impl Into<String>,
440    ) -> LeaseOwnerIdentity {
441        LeaseOwnerIdentity {
442            owner_id: owner_id.into(),
443            incarnation_id: incarnation_id.into(),
444            liveness: LeaseOwnerLiveness::Opaque,
445        }
446    }
447
448    pub fn local_process(
449        owner_id: impl Into<String>,
450        incarnation_id: impl Into<String>,
451        host_id: impl Into<String>,
452    ) -> LeaseOwnerIdentity {
453        let liveness = LeaseOwnerLiveness::current_local_process(host_id.into())
454            .unwrap_or(LeaseOwnerLiveness::Opaque);
455        LeaseOwnerIdentity {
456            owner_id: owner_id.into(),
457            incarnation_id: incarnation_id.into(),
458            liveness,
459        }
460    }
461
462    pub fn same_incarnation(&self, other: &LeaseOwnerIdentity) -> bool {
463        self.owner_id == other.owner_id && self.incarnation_id == other.incarnation_id
464    }
465
466    pub fn is_definitely_dead_for_claimant(&self, claimant: &LeaseOwnerIdentity) -> bool {
467        self.liveness
468            .is_definitely_dead_for_claimant(&claimant.liveness)
469    }
470}
471
472#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize, Default)]
473#[serde(tag = "kind", rename_all = "snake_case")]
474pub enum LeaseOwnerLiveness {
475    LocalProcess {
476        host_id: String,
477        boot_id: String,
478        pid: u32,
479        process_start: String,
480    },
481    #[default]
482    Opaque,
483}
484
485impl LeaseOwnerLiveness {
486    pub fn current_local_process(host_id: impl Into<String>) -> Option<LeaseOwnerLiveness> {
487        let boot_id = std::fs::read_to_string(PROC_BOOT_ID_PATH)
488            .ok()
489            .map(|value| value.trim().to_string())
490            .filter(|value| !value.is_empty())?;
491        let pid = std::process::id();
492        let process_start = read_linux_process_start(pid)?;
493        Some(LeaseOwnerLiveness::LocalProcess {
494            host_id: host_id.into(),
495            boot_id,
496            pid,
497            process_start,
498        })
499    }
500
501    pub fn local_process_for_test(
502        host_id: impl Into<String>,
503        boot_id: impl Into<String>,
504        pid: u32,
505        process_start: impl Into<String>,
506    ) -> LeaseOwnerLiveness {
507        LeaseOwnerLiveness::LocalProcess {
508            host_id: host_id.into(),
509            boot_id: boot_id.into(),
510            pid,
511            process_start: process_start.into(),
512        }
513    }
514
515    pub fn is_definitely_dead_for_claimant(&self, claimant: &LeaseOwnerLiveness) -> bool {
516        let (
517            LeaseOwnerLiveness::LocalProcess {
518                host_id,
519                boot_id,
520                pid,
521                process_start,
522            },
523            LeaseOwnerLiveness::LocalProcess {
524                host_id: claimant_host_id,
525                boot_id: claimant_boot_id,
526                ..
527            },
528        ) = (self, claimant)
529        else {
530            return false;
531        };
532        if host_id != claimant_host_id || boot_id != claimant_boot_id {
533            return false;
534        }
535        matches!(linux_process_is_live(*pid, process_start), Some(false))
536    }
537}
538
/// Reads the start-time field of process `pid` from `/proc/<pid>/stat`.
/// Returns `None` if the file cannot be read or parsed.
fn read_linux_process_start(pid: u32) -> Option<String> {
    std::fs::read_to_string(format!("/proc/{pid}/stat"))
        .ok()
        .and_then(|stat| parse_linux_process_start(&stat))
}

/// Checks whether `pid` is still the process recorded with `expected_process_start`.
///
/// - `Some(true)`: the stat file was read and its start time matches.
/// - `Some(false)`: the stat file does not exist, or its start time differs.
/// - `None`: any other read error, or an unparsable stat line (inconclusive).
fn linux_process_is_live(pid: u32, expected_process_start: &str) -> Option<bool> {
    let stat = match std::fs::read_to_string(format!("/proc/{pid}/stat")) {
        Ok(contents) => contents,
        Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Some(false),
        Err(_) => return None,
    };
    let observed_start = parse_linux_process_start(&stat)?;
    Some(observed_start == expected_process_start)
}

/// Extracts the `starttime` field (field 22 in proc(5)) from the contents of
/// a `/proc/<pid>/stat` file.
///
/// The parenthesised command name may itself contain `") "`, so parsing
/// resumes after the *last* `") "`. From there, the state field is index 0
/// and `starttime` is index 19.
fn parse_linux_process_start(stat: &str) -> Option<String> {
    let (_, fields_after_comm) = stat.rsplit_once(") ")?;
    let start_field = fields_after_comm.split_whitespace().nth(19)?;
    Some(start_field.to_owned())
}
556
557#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
558pub struct SessionExecutionLease {
559    pub session_id: String,
560    pub owner: LeaseOwnerIdentity,
561    pub lease_token: String,
562    pub fencing_token: u64,
563    pub claimed_at_epoch_ms: u64,
564    pub expires_at_epoch_ms: u64,
565}
566
567#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
568pub struct SessionExecutionLeaseFence {
569    pub session_id: String,
570    pub owner: LeaseOwnerIdentity,
571    pub lease_token: String,
572    pub fencing_token: u64,
573}
574
575#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
576pub struct SessionExecutionLeaseCompletion {
577    pub session_id: String,
578    pub owner: LeaseOwnerIdentity,
579    pub lease_token: String,
580    pub fencing_token: u64,
581}
582
583impl SessionExecutionLease {
584    pub fn fence(&self) -> SessionExecutionLeaseFence {
585        SessionExecutionLeaseFence {
586            session_id: self.session_id.clone(),
587            owner: self.owner.clone(),
588            lease_token: self.lease_token.clone(),
589            fencing_token: self.fencing_token,
590        }
591    }
592
593    pub fn completion(&self) -> SessionExecutionLeaseCompletion {
594        SessionExecutionLeaseCompletion {
595            session_id: self.session_id.clone(),
596            owner: self.owner.clone(),
597            lease_token: self.lease_token.clone(),
598            fencing_token: self.fencing_token,
599        }
600    }
601}
602
603impl SessionExecutionLeaseCompletion {
604    pub fn from_lease(lease: &SessionExecutionLease) -> Self {
605        lease.completion()
606    }
607}
608
609#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
610pub enum SessionExecutionLeaseClaimOutcome {
611    Acquired(SessionExecutionLease),
612    Busy { holder: SessionExecutionLease },
613}
614
615impl SessionExecutionLeaseClaimOutcome {
616    pub fn acquired(self) -> Option<SessionExecutionLease> {
617        match self {
618            Self::Acquired(lease) => Some(lease),
619            Self::Busy { .. } => None,
620        }
621    }
622}
623
624// =============================================================================
625// Attachment write-ahead manifest
626// =============================================================================
627
628/// A pending attachment write recorded *before* the bytes hit the
629/// [`AttachmentStore`](crate::AttachmentStore) backend.
630///
631/// The runtime calls [`AttachmentManifest::record_intent`] from the
632/// [`SessionScopedAttachmentStore`](crate::SessionScopedAttachmentStore)
633/// wrapper before each `put`, so the manifest is a durable record that
634/// "some bytes are about to land at this URI." When the turn that
635/// references the attachment commits successfully via
636/// [`SessionCommitStore::commit_runtime_state`], the same transaction
637/// stamps `committed_at_epoch_ms`. Periodic GC sweeps manifest rows
638/// whose intent has aged past a host-chosen threshold without ever
639/// being committed and deletes the corresponding bytes — that's how we
640/// reconcile orphaned files left behind by crashes between `put` and
641/// the next turn commit.
642#[derive(Clone, Debug)]
643pub struct AttachmentIntent {
644    pub attachment_id: crate::AttachmentId,
645    pub session_id: String,
646    /// Canonical URI for the attachment payload in the backing store.
647    /// For file-backed stores this is the absolute on-disk path; for
648    /// blob-backed stores it can be any stable identifier the host
649    /// uses to clean the payload up.
650    pub canonical_uri: String,
651    pub intent_at_epoch_ms: u64,
652}
653
654#[derive(Clone, Debug)]
655pub struct AttachmentManifestEntry {
656    pub attachment_id: crate::AttachmentId,
657    pub session_id: String,
658    pub canonical_uri: String,
659    pub intent_at_epoch_ms: u64,
660    pub committed_at_epoch_ms: Option<u64>,
661}
662
663/// The synchronous attachment-manifest surface required from every
664/// [`SessionCommitStore`]. Used by
665/// [`SessionScopedAttachmentStore`](crate::SessionScopedAttachmentStore)
666/// to record intent rows before `put` and by GC sweeps to reconcile
667/// orphans. See the [`AttachmentIntent`] doc comment for the full
668/// crash-safety story.
669///
670/// Backends with no attachment story (in-memory tests, mock stores)
671/// paste no-op impls via [`impl_noop_attachment_manifest!`] and
672/// participate transparently — `record_intent` is a no-op, the
673/// scoped wrapper still works, and GC sweeps return empty.
674pub trait AttachmentManifest: Send + Sync {
675    fn record_intent(&self, intent: AttachmentIntent) -> Result<(), StoreError>;
676
677    /// Mark a set of attachment ids as committed (i.e. now referenced
678    /// by a durable session-graph commit). Backends that store
679    /// commits and manifest in the same database stamp this inside
680    /// the commit transaction; the trait-level method is the
681    /// out-of-band entry point for hosts that want to commit an id
682    /// outside the normal turn-commit flow.
683    fn commit_refs(
684        &self,
685        session_id: &str,
686        attachment_ids: &[crate::AttachmentId],
687    ) -> Result<(), StoreError>;
688
689    /// Return manifest entries whose intent has aged past
690    /// `older_than_epoch_ms` without ever being committed. Hosts run
691    /// this periodically to find orphans left by crashes between
692    /// `record_intent` and the next turn commit.
693    fn list_uncommitted(
694        &self,
695        older_than_epoch_ms: u64,
696    ) -> Result<Vec<AttachmentManifestEntry>, StoreError>;
697
698    /// Remove a manifest row entirely. Called by the GC coordinator
699    /// after the corresponding bytes have been removed from the
700    /// backing [`AttachmentStore`](crate::AttachmentStore).
701    fn forget(&self, attachment_id: &crate::AttachmentId) -> Result<(), StoreError>;
702}
703
/// Mixin macro for [`SessionCommitStore`] implementors that have no
/// attachment-write story (mock backends, in-memory test stores,
/// runtime-perf harnesses). Expands to no-op impls of every
/// [`AttachmentManifest`] method.
///
/// Every path in the expansion is fully qualified (`::std::…` / `$crate::…`).
/// Paths inside `macro_rules!` resolve at the *call site*, so bare `Ok` or
/// `Vec` would break in crates that use `#![no_implicit_prelude]` or that
/// shadow those names.
#[macro_export]
macro_rules! impl_noop_attachment_manifest {
    ($ty:ty) => {
        impl $crate::AttachmentManifest for $ty {
            fn record_intent(
                &self,
                _intent: $crate::AttachmentIntent,
            ) -> ::std::result::Result<(), $crate::StoreError> {
                ::std::result::Result::Ok(())
            }

            fn commit_refs(
                &self,
                _session_id: &str,
                _attachment_ids: &[$crate::AttachmentId],
            ) -> ::std::result::Result<(), $crate::StoreError> {
                ::std::result::Result::Ok(())
            }

            fn list_uncommitted(
                &self,
                _older_than_epoch_ms: u64,
            ) -> ::std::result::Result<
                ::std::vec::Vec<$crate::AttachmentManifestEntry>,
                $crate::StoreError,
            > {
                ::std::result::Result::Ok(::std::vec::Vec::new())
            }

            fn forget(
                &self,
                _attachment_id: &$crate::AttachmentId,
            ) -> ::std::result::Result<(), $crate::StoreError> {
                ::std::result::Result::Ok(())
            }
        }
    };
}
744
745/// Reject a persisted record whose `schema_version` does not match the
746/// version this binary supports. Backends call this immediately after
747/// deserializing a record from durable storage.
748pub fn ensure_supported_schema_version(
749    record_kind: &'static str,
750    actual: u32,
751    expected: u32,
752) -> Result<(), StoreError> {
753    if actual == expected {
754        Ok(())
755    } else {
756        Err(StoreError::UnsupportedRecordSchemaVersion {
757            record_kind,
758            actual,
759            expected,
760        })
761    }
762}
763
764pub fn ensure_supported_record_schema_version(
765    record_kind: &'static str,
766    value: &serde_json::Value,
767    expected: u32,
768) -> Result<(), StoreError> {
769    let Some(schema_version) = value.get("schema_version") else {
770        return Err(StoreError::MissingRecordSchemaVersion {
771            record_kind,
772            expected,
773        });
774    };
775    let Some(actual) = schema_version
776        .as_u64()
777        .and_then(|version| u32::try_from(version).ok())
778    else {
779        return Err(StoreError::InvalidRecordSchemaVersion {
780            record_kind,
781            actual: schema_version.to_string(),
782            expected,
783        });
784    };
785    ensure_supported_schema_version(record_kind, actual, expected)
786}
787
788pub fn decode_versioned_json_record<T>(
789    json: &str,
790    record_kind: &'static str,
791    expected: u32,
792) -> Result<T, StoreError>
793where
794    T: serde::de::DeserializeOwned,
795{
796    let value: serde_json::Value = serde_json::from_str(json)
797        .map_err(|err| StoreError::Backend(format!("failed to decode {record_kind}: {err}")))?;
798    ensure_supported_record_schema_version(record_kind, &value, expected)?;
799    serde_json::from_value(value)
800        .map_err(|err| StoreError::Backend(format!("failed to decode {record_kind}: {err}")))
801}
802
803#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
804pub struct RuntimeTurnCommitStamp {
805    pub session_id: String,
806    pub turn_id: String,
807    pub turn_commit_hash: String,
808}
809
810impl RuntimeTurnCommitStamp {
811    pub fn new(
812        session_id: impl Into<String>,
813        turn_id: impl Into<String>,
814        turn_commit_hash: impl Into<String>,
815    ) -> Self {
816        Self {
817            session_id: session_id.into(),
818            turn_id: turn_id.into(),
819            turn_commit_hash: turn_commit_hash.into(),
820        }
821    }
822}
823
824fn build_persisted_turn_state(state: &crate::RuntimeSessionState) -> crate::PersistedTurnState {
825    crate::PersistedTurnState {
826        turn_index: state.turn_index,
827        token_usage: state.token_usage.clone(),
828        last_prompt_usage: state.last_prompt_usage.clone(),
829        protocol_turn_options: state.protocol_turn_options.clone(),
830    }
831}
832
833fn build_checkpoint_from_persisted_state(
834    state: &crate::RuntimeSessionState,
835) -> HydratedSessionCheckpoint {
836    HydratedSessionCheckpoint {
837        turn_state: build_persisted_turn_state(state),
838        tool_state_ref: state.tool_state_ref.clone(),
839        tool_state: state.tool_state_snapshot.clone(),
840        plugin_snapshot_ref: state.plugin_snapshot_ref.clone(),
841        plugin_snapshot_revision: state.plugin_snapshot_revision,
842        plugin_snapshot: state.plugin_snapshot.clone(),
843        execution_state_ref: state.execution_state_ref.clone(),
844        execution_state: state.execution_state_snapshot.clone(),
845    }
846}
847
848impl RuntimeCommit {
849    pub fn turn_commit_hash(&self) -> Result<String, StoreError> {
850        let mut semantic_commit = self.clone();
851        semantic_commit.expected_head_revision = None;
852        semantic_commit.session_execution_lease = None;
853        semantic_commit.release_session_execution_lease = None;
854        semantic_commit.turn_commit = None;
855        let mut semantic_commit = serde_json::to_value(&semantic_commit).map_err(|err| {
856            StoreError::Backend(format!("failed to serialize runtime turn commit: {err}"))
857        })?;
858        scrub_turn_commit_hash_value(&mut semantic_commit);
859        crate::stable_hash::stable_json_sha256_hex(&semantic_commit).map_err(|err| {
860            StoreError::Backend(format!(
861                "failed to serialize runtime turn commit hash: {err}"
862            ))
863        })
864    }
865
866    pub fn persisted_state(
867        state: &crate::RuntimeSessionState,
868        usage_deltas: &[crate::TokenLedgerEntry],
869    ) -> Self {
870        Self {
871            session_id: state.session_id.clone(),
872            expected_head_revision: state.head_revision,
873            session_execution_lease: None,
874            release_session_execution_lease: None,
875            config: persisted_session_config_from_state(state),
876            agent_frames: state.agent_frames.clone(),
877            current_agent_frame_id: state.current_agent_frame_id.clone(),
878            graph: if state.graph_replace_required || state.head_revision.is_none() {
879                GraphCommitDelta::ReplaceFull(state.session_graph.clone())
880            } else {
881                GraphCommitDelta::Unchanged {
882                    leaf_node_id: state.session_graph.leaf_node_id.clone(),
883                }
884            },
885            checkpoint: build_checkpoint_from_persisted_state(state),
886            usage_deltas: usage_deltas.to_vec(),
887            turn_commit: None,
888            completed_queue_claims: Vec::new(),
889            completed_turn_input_claims: Vec::new(),
890            interrupted_turn_input_turn_id: None,
891            committed_attachment_ids: Vec::new(),
892        }
893    }
894
895    pub(crate) fn persisted_state_with_graph_commit(
896        state: &crate::RuntimeSessionState,
897        graph: GraphCommitDelta,
898        usage_deltas: &[crate::TokenLedgerEntry],
899    ) -> Self {
900        Self {
901            session_id: state.session_id.clone(),
902            expected_head_revision: state.head_revision,
903            session_execution_lease: None,
904            release_session_execution_lease: None,
905            config: persisted_session_config_from_state(state),
906            agent_frames: state.agent_frames.clone(),
907            current_agent_frame_id: state.current_agent_frame_id.clone(),
908            graph,
909            checkpoint: build_checkpoint_from_persisted_state(state),
910            usage_deltas: usage_deltas.to_vec(),
911            turn_commit: None,
912            completed_queue_claims: Vec::new(),
913            completed_turn_input_claims: Vec::new(),
914            interrupted_turn_input_turn_id: None,
915            committed_attachment_ids: Vec::new(),
916        }
917    }
918
919    pub fn with_turn_commit(mut self, turn_commit: RuntimeTurnCommitStamp) -> Self {
920        self.turn_commit = Some(turn_commit);
921        self
922    }
923
924    pub fn with_session_execution_lease(mut self, lease: SessionExecutionLeaseFence) -> Self {
925        self.session_execution_lease = Some(lease);
926        self
927    }
928
929    pub fn releasing_session_execution_lease(
930        mut self,
931        completion: SessionExecutionLeaseCompletion,
932    ) -> Self {
933        self.release_session_execution_lease = Some(completion);
934        self
935    }
936
937    pub fn completing_queue_claim(
938        mut self,
939        completed_queue_claim: crate::QueuedWorkCompletion,
940    ) -> Self {
941        self.completed_queue_claims.push(completed_queue_claim);
942        self
943    }
944
945    pub fn completing_queue_claims(
946        mut self,
947        completed_queue_claims: impl IntoIterator<Item = crate::QueuedWorkCompletion>,
948    ) -> Self {
949        self.completed_queue_claims.extend(completed_queue_claims);
950        self
951    }
952
953    pub fn completing_turn_input_claim(
954        mut self,
955        completed_turn_input_claim: crate::TurnInputCompletion,
956    ) -> Self {
957        self.completed_turn_input_claims
958            .push(completed_turn_input_claim);
959        self
960    }
961
962    pub fn completing_turn_input_claims(
963        mut self,
964        completed_turn_input_claims: impl IntoIterator<Item = crate::TurnInputCompletion>,
965    ) -> Self {
966        self.completed_turn_input_claims
967            .extend(completed_turn_input_claims);
968        self
969    }
970
971    pub fn deferring_interrupted_turn_inputs(mut self, turn_id: impl Into<String>) -> Self {
972        self.interrupted_turn_input_turn_id = Some(turn_id.into());
973        self
974    }
975
976    pub fn with_committed_attachments(
977        mut self,
978        attachment_ids: impl IntoIterator<Item = crate::AttachmentId>,
979    ) -> Self {
980        self.committed_attachment_ids = attachment_ids.into_iter().collect();
981        self
982    }
983}
984
985fn scrub_turn_commit_hash_value(value: &mut serde_json::Value) {
986    match value {
987        serde_json::Value::Object(map) => {
988            let is_message = map.contains_key("role") && map.contains_key("parts");
989            let is_message_part = map.contains_key("kind")
990                && map.contains_key("content")
991                && map.contains_key("prune_state");
992            if is_message || is_message_part {
993                map.remove("id");
994            }
995            for volatile_key in ["node_id", "parent_node_id", "leaf_node_id", "timestamp"] {
996                map.remove(volatile_key);
997            }
998            for child in map.values_mut() {
999                scrub_turn_commit_hash_value(child);
1000            }
1001        }
1002        serde_json::Value::Array(items) => {
1003            for item in items {
1004                scrub_turn_commit_hash_value(item);
1005            }
1006        }
1007        _ => {}
1008    }
1009}
1010
1011fn persisted_session_state_from_head(
1012    head: SessionHead,
1013    checkpoint: Option<HydratedSessionCheckpoint>,
1014) -> crate::RuntimeSessionState {
1015    let mut state = crate::RuntimeSessionState {
1016        session_id: head.session_id,
1017        policy: crate::SessionPolicy::default(),
1018        agent_frames: head.agent_frames,
1019        current_agent_frame_id: head.current_agent_frame_id,
1020        session_graph: head.graph,
1021        turn_index: 0,
1022        token_usage: crate::TokenUsage::default(),
1023        last_prompt_usage: None,
1024        protocol_turn_options: crate::ProtocolTurnOptions::default(),
1025        tool_state_ref: None,
1026        tool_state_generation: None,
1027        tool_state_snapshot: None,
1028        plugin_snapshot_ref: None,
1029        plugin_snapshot_revision: None,
1030        plugin_snapshot: None,
1031        execution_state_ref: None,
1032        execution_state_snapshot: None,
1033        token_ledger: head.token_ledger,
1034        checkpoint_ref: head.checkpoint_ref.clone(),
1035        head_revision: Some(head.head_revision),
1036        graph_replace_required: false,
1037    };
1038    state.policy.model = head.config.model.clone();
1039    state.policy.provider_id = head.config.provider_id.clone();
1040    if let Some(checkpoint) = checkpoint {
1041        state.turn_index = checkpoint.turn_state.turn_index;
1042        state.token_usage = checkpoint.turn_state.token_usage;
1043        state.last_prompt_usage = checkpoint.turn_state.last_prompt_usage;
1044        state.protocol_turn_options = checkpoint.turn_state.protocol_turn_options;
1045        state.tool_state_ref = checkpoint.tool_state_ref.clone();
1046        state.tool_state_generation = checkpoint
1047            .tool_state
1048            .as_ref()
1049            .map(|snapshot| snapshot.generation());
1050        state.tool_state_snapshot = checkpoint.tool_state;
1051        state.plugin_snapshot_ref = checkpoint.plugin_snapshot_ref.clone();
1052        state.plugin_snapshot_revision = checkpoint.plugin_snapshot_revision;
1053        state.plugin_snapshot = checkpoint.plugin_snapshot;
1054        state.execution_state_ref = checkpoint.execution_state_ref.clone();
1055        state.execution_state_snapshot = checkpoint.execution_state;
1056    }
1057    state.ensure_agent_frame_initialized();
1058    state
1059}
1060
1061impl Default for SessionHead {
1062    fn default() -> Self {
1063        Self {
1064            session_id: default_root_session_id(),
1065            head_revision: 0,
1066            agent_frames: Vec::new(),
1067            current_agent_frame_id: String::new(),
1068            graph: crate::SessionGraph::default(),
1069            config: crate::PersistedSessionConfig::default(),
1070            checkpoint_ref: None,
1071            token_ledger: Vec::new(),
1072        }
1073    }
1074}
1075
1076impl Default for SessionHeadMeta {
1077    fn default() -> Self {
1078        Self {
1079            schema_version: SESSION_HEAD_META_SCHEMA_VERSION,
1080            session_id: default_root_session_id(),
1081            head_revision: 0,
1082            config: crate::PersistedSessionConfig::default(),
1083            agent_frames: Vec::new(),
1084            current_agent_frame_id: String::new(),
1085            checkpoint_ref: None,
1086            leaf_node_id: None,
1087            graph_node_count: 0,
1088            token_ledger: Vec::new(),
1089        }
1090    }
1091}
1092
/// Settled-session commit/read capability: the runtime's atomic transaction
/// facade for visible session state.
///
/// This segment owns session graph/head commits, checkpoint hydration and
/// usage, final turn-commit idempotency, session metadata, and the attachment
/// write-ahead manifest. Queued-work and turn-input *completions* also settle
/// here — [`commit_runtime_state`](Self::commit_runtime_state) consumes claims
/// granted by [`QueuedWorkStore`] and [`TurnInputStore`] in the same atomic
/// commit. In-flight nondeterministic work belongs to the active
/// [`EffectHost`](crate::EffectHost), not to the store contract.
///
/// The [`AttachmentManifest`] supertrait is required so the runtime can wrap
/// any persistence backend with a
/// [`SessionScopedAttachmentStore`](crate::SessionScopedAttachmentStore)
/// without dual-trait casting. Backends that never write attachments can
/// generate no-op manifest impls with [`impl_noop_attachment_manifest!`].
#[async_trait::async_trait]
pub trait SessionCommitStore: AttachmentManifest + Send + Sync {
    /// Durability tier this session store provides; defaults to
    /// [`DurabilityTier::Inline`](crate::DurabilityTier::Inline).
    fn durability_tier(&self) -> crate::DurabilityTier {
        crate::DurabilityTier::Inline
    }

    /// Read the persisted session, loading the portion of the graph selected
    /// by `scope` (e.g. `SessionReadScope::FullGraph` or
    /// `SessionReadScope::ActivePath { leaf_node_id }`).
    ///
    /// NOTE(review): `Ok(None)` presumably means no session has been persisted
    /// yet — confirm against backends.
    async fn load_session(
        &self,
        scope: SessionReadScope,
    ) -> Result<Option<PersistedSessionRead>, StoreError>;

    /// Load a single session-graph node record by id.
    ///
    /// NOTE(review): `Ok(None)` presumably means no readable node with that id
    /// exists — confirm whether tombstoned nodes are reported as `None`.
    async fn load_node(
        &self,
        node_id: &str,
    ) -> Result<Option<crate::SessionNodeRecord>, StoreError>;

    /// Atomically apply a [`RuntimeCommit`] in a single commit.
    ///
    /// The commit carries:
    /// - the graph delta, config, agent frames and checkpoint;
    /// - usage deltas and the optional turn-commit stamp;
    /// - queued-work and turn-input claim completions;
    /// - committed attachment ids.
    ///
    /// NOTE(review): `expected_head_revision` and the optional session
    /// execution lease fence look like optimistic-concurrency / single-writer
    /// checks — confirm the exact rejection errors per backend.
    async fn commit_runtime_state(
        &self,
        commit: RuntimeCommit,
    ) -> Result<RuntimeCommitResult, StoreError>;

    /// Persist session-level metadata.
    async fn save_session_meta(&self, meta: SessionMeta) -> Result<(), StoreError>;
    /// Load session-level metadata. `Ok(None)` presumably means none has been
    /// saved yet.
    async fn load_session_meta(&self) -> Result<Option<SessionMeta>, StoreError>;
}
1135
1136/// Pending turn-input lifecycle capability: durable ingress for model-visible
1137/// user input.
1138///
1139/// Active-turn ingress is claimed only by the matching live turn at a
1140/// checkpoint. Next-turn ingress is claimed only by idle dispatch. User input
1141/// must not be represented as generic queued work. Claims granted here are
1142/// completed atomically by [`SessionCommitStore::commit_runtime_state`].
1143#[async_trait::async_trait]
1144pub trait TurnInputStore: Send + Sync {
1145    /// Persist model-visible user input into the pending turn-input lifecycle.
1146    async fn enqueue_pending_turn_input(
1147        &self,
1148        input: crate::PendingTurnInputDraft,
1149    ) -> Result<crate::PendingTurnInput, StoreError>;
1150
1151    /// List pending user inputs for UI reconciliation and queue preview.
1152    ///
1153    /// This excludes completed/cancelled rows and rows currently held by a live
1154    /// claim. Expired claims are visible again according to their state.
1155    async fn list_pending_turn_inputs(
1156        &self,
1157        session_id: &str,
1158    ) -> Result<Vec<crate::PendingTurnInput>, StoreError>;
1159
1160    /// Cancel an unclaimed pending user input by id.
1161    ///
1162    /// Provided convenience: the singular form is exactly
1163    /// [`cancel_pending_turn_inputs`](Self::cancel_pending_turn_inputs) with a
1164    /// one-element target list, so backends implement only the plural
1165    /// primitive.
1166    async fn cancel_pending_turn_input(
1167        &self,
1168        session_id: &str,
1169        input_id: &str,
1170    ) -> Result<crate::PendingTurnInputCancelOutcome, StoreError> {
1171        let target = crate::PendingTurnInputCancelTarget::input_id(input_id);
1172        let targets = vec![target];
1173        let mut outcomes = self
1174            .cancel_pending_turn_inputs(session_id, &targets)
1175            .await?;
1176        Ok(outcomes
1177            .pop()
1178            .map(|result| result.outcome)
1179            .unwrap_or(crate::PendingTurnInputCancelOutcome::NotFound))
1180    }
1181
1182    /// Atomically cancel a list of pending user inputs by input id or source key.
1183    async fn cancel_pending_turn_inputs(
1184        &self,
1185        session_id: &str,
1186        targets: &[crate::PendingTurnInputCancelTarget],
1187    ) -> Result<Vec<crate::PendingTurnInputCancelResult>, StoreError>;
1188
1189    /// Atomically cancel the same-session runtime-admission suffix from an anchor.
1190    async fn cancel_pending_turn_input_suffix(
1191        &self,
1192        session_id: &str,
1193        anchor: &crate::PendingTurnInputCancelTarget,
1194    ) -> Result<crate::PendingTurnInputSuffixCancelOutcome, StoreError>;
1195
1196    /// Claim active-turn input at a checkpoint for the live turn id.
1197    // The parameters are the cohesive, all-required identity/lease inputs of a
1198    // durable active-turn claim (session, execution-lease fence, owner, turn,
1199    // checkpoint, lease TTL, batch size). They exceed the default threshold by
1200    // one and have no non-artificial sub-grouping; the sibling
1201    // `claim_next_turn_inputs` uses the same positional shape.
1202    #[allow(clippy::too_many_arguments)]
1203    async fn claim_active_turn_inputs(
1204        &self,
1205        session_id: &str,
1206        session_execution_lease: &SessionExecutionLeaseFence,
1207        owner: &LeaseOwnerIdentity,
1208        turn_id: &str,
1209        checkpoint: crate::CheckpointKind,
1210        lease_ttl_ms: u64,
1211        max_inputs: usize,
1212    ) -> Result<Option<crate::TurnInputClaim>, StoreError>;
1213
1214    /// Claim queued next-turn input at idle.
1215    async fn claim_next_turn_inputs(
1216        &self,
1217        session_id: &str,
1218        session_execution_lease: &SessionExecutionLeaseFence,
1219        owner: &LeaseOwnerIdentity,
1220        lease_ttl_ms: u64,
1221        max_inputs: usize,
1222    ) -> Result<Option<crate::TurnInputClaim>, StoreError>;
1223
1224    /// Abandon a held pending-turn-input claim so it can be reclaimed.
1225    async fn abandon_turn_input_claim(
1226        &self,
1227        claim: &crate::TurnInputClaim,
1228    ) -> Result<(), StoreError>;
1229}
1230
/// Durable single-writer execution-lane capability, fenced by monotonic
/// fencing tokens.
///
/// The fences and completions handed out here can also ride on a
/// `RuntimeCommit` (`with_session_execution_lease` /
/// `releasing_session_execution_lease`), so a commit can be tied to the lease
/// that authorized it.
#[async_trait::async_trait]
pub trait SessionExecutionLeaseStore: Send + Sync {
    /// Try to claim the durable single-writer execution lane for `session_id`.
    ///
    /// Returns [`SessionExecutionLeaseClaimOutcome::Busy`] when another owner
    /// holds an unexpired lease. Expired or released leases may be reclaimed
    /// and receive a higher fencing token. An unexpired lease held by the same
    /// owner id but a different incarnation is busy.
    async fn try_claim_session_execution_lease(
        &self,
        session_id: &str,
        owner: &LeaseOwnerIdentity,
        lease_ttl_ms: u64,
    ) -> Result<SessionExecutionLeaseClaimOutcome, StoreError>;

    /// Reclaim an unexpired session execution lease whose observed holder is
    /// definitely dead according to persisted local-process liveness metadata.
    ///
    /// Backends must CAS on `observed_holder` so a stale claimant cannot clear
    /// a newer live lease that won the race after the busy observation.
    async fn reclaim_session_execution_lease(
        &self,
        session_id: &str,
        owner: &LeaseOwnerIdentity,
        observed_holder: &SessionExecutionLeaseFence,
        lease_ttl_ms: u64,
    ) -> Result<SessionExecutionLeaseClaimOutcome, StoreError>;

    /// Extend a live session execution lease owned by the caller and return
    /// the renewed lease.
    ///
    /// Backends must reject stale, released, superseded, or expired fences with
    /// [`StoreError::SessionExecutionLeaseExpired`].
    async fn renew_session_execution_lease(
        &self,
        fence: &SessionExecutionLeaseFence,
        lease_ttl_ms: u64,
    ) -> Result<SessionExecutionLease, StoreError>;

    /// Release a session execution lease fenced by its completion token.
    ///
    /// This operation is idempotent and must not clear a newer owner's lease.
    async fn release_session_execution_lease(
        &self,
        completion: &SessionExecutionLeaseCompletion,
    ) -> Result<(), StoreError>;
}
1279
1280/// Durable queued-work capability: ingress, ordered claiming, and claim leases
1281/// for non-input work (process wakes and session commands).
1282///
1283/// Claims granted here are completed atomically by
1284/// [`SessionCommitStore::commit_runtime_state`].
1285#[async_trait::async_trait]
1286pub trait QueuedWorkStore: Send + Sync {
1287    /// Persist a queued-work batch for later claiming.
1288    async fn enqueue_queued_work(
1289        &self,
1290        batch: crate::QueuedWorkBatchDraft,
1291    ) -> Result<crate::QueuedWorkBatch, StoreError>;
1292
1293    /// Claim a leading ready session-command batch for `owner_id`.
1294    ///
1295    /// A command claim is returned only when the earliest ready claimable batch
1296    /// is classified as [`crate::runtime::QueuedWorkClass::SessionCommand`].
1297    /// Backends derive the class from queued payloads; no schema column is
1298    /// required.
1299    async fn claim_leading_ready_session_command(
1300        &self,
1301        session_id: &str,
1302        session_execution_lease: &SessionExecutionLeaseFence,
1303        owner: &LeaseOwnerIdentity,
1304        lease_ttl_ms: u64,
1305    ) -> Result<Option<crate::QueuedWorkClaim>, StoreError>;
1306
1307    /// Claim the next ready turn-work group for `owner_id`.
1308    ///
1309    /// A turn-work claim is returned only when the earliest ready claimable
1310    /// batch is classified as [`crate::runtime::QueuedWorkClass::TurnWork`].
1311    /// Earlier ready session commands are not skipped and are never
1312    /// materialized as turn input.
1313    async fn claim_ready_queued_work(
1314        &self,
1315        session_id: &str,
1316        session_execution_lease: &SessionExecutionLeaseFence,
1317        owner: &LeaseOwnerIdentity,
1318        boundary: crate::QueuedWorkClaimBoundary,
1319        lease_ttl_ms: u64,
1320        max_batches: usize,
1321    ) -> Result<Option<crate::QueuedWorkClaim>, StoreError>;
1322
1323    /// Claim a specific ready batch set selected from the durable queue.
1324    ///
1325    /// This is the host-facing counterpart to
1326    /// [`claim_ready_queued_work`](Self::claim_ready_queued_work): callers that
1327    /// project queued work into a UI can claim the exact batch ids they
1328    /// rendered instead of reconstructing authority from local draft state.
1329    ///
1330    /// Provided derivation: preserves the ordered queue contract by claiming
1331    /// the next ready group and returning it only when the durable ids match
1332    /// exactly, abandoning the claim otherwise. Backends may override it with
1333    /// a native query; none currently need to.
1334    async fn claim_ready_queued_work_by_batch_ids(
1335        &self,
1336        session_id: &str,
1337        session_execution_lease: &SessionExecutionLeaseFence,
1338        owner: &LeaseOwnerIdentity,
1339        boundary: crate::QueuedWorkClaimBoundary,
1340        lease_ttl_ms: u64,
1341        batch_ids: &[String],
1342    ) -> Result<Option<crate::QueuedWorkClaim>, StoreError> {
1343        if batch_ids.is_empty() {
1344            return Ok(None);
1345        }
1346        let Some(claim) = self
1347            .claim_ready_queued_work(
1348                session_id,
1349                session_execution_lease,
1350                owner,
1351                boundary,
1352                lease_ttl_ms,
1353                batch_ids.len(),
1354            )
1355            .await?
1356        else {
1357            return Ok(None);
1358        };
1359        let claimed_ids = claim
1360            .batches
1361            .iter()
1362            .map(|batch| batch.batch_id.as_str())
1363            .collect::<Vec<_>>();
1364        if claimed_ids == batch_ids.iter().map(String::as_str).collect::<Vec<_>>() {
1365            return Ok(Some(claim));
1366        }
1367        self.abandon_queued_work_claim(&claim).await?;
1368        Ok(None)
1369    }
1370
1371    /// Extend the lease on a held queued-work claim.
1372    async fn renew_queued_work_claim(
1373        &self,
1374        claim: &crate::QueuedWorkClaim,
1375        lease_ttl_ms: u64,
1376    ) -> Result<crate::QueuedWorkClaim, StoreError>;
1377
1378    /// Release a held queued-work claim without completing it.
1379    async fn abandon_queued_work_claim(
1380        &self,
1381        claim: &crate::QueuedWorkClaim,
1382    ) -> Result<(), StoreError>;
1383
1384    /// Remove an unclaimed queued-work batch from durable ingress.
1385    ///
1386    /// Returns the removed batch when cancellation won the race. Returns `None`
1387    /// when the batch is missing or currently held by a live claim; callers must
1388    /// treat that as "already claimed or completed" and must not restore any
1389    /// stale local draft state.
1390    async fn cancel_queued_work_batch(
1391        &self,
1392        session_id: &str,
1393        batch_id: &str,
1394    ) -> Result<Option<crate::QueuedWorkBatch>, StoreError>;
1395
1396    /// List all queued-work batches for a session, including batches held by a
1397    /// live claim.
1398    async fn list_queued_work(
1399        &self,
1400        session_id: &str,
1401    ) -> Result<Vec<crate::QueuedWorkBatch>, StoreError>;
1402
1403    /// List queued-work batches that are still pending presentation/editing.
1404    ///
1405    /// This excludes batches currently held by a live claim. Expired claims are
1406    /// considered pending again because they can be reclaimed or cancelled.
1407    ///
1408    /// This is a distinct required query, not a derivation of
1409    /// [`list_queued_work`](Self::list_queued_work): the two differ by
1410    /// claim-state filter, and backends answer each with its own query over
1411    /// claim rows rather than leaking claim state to callers for client-side
1412    /// filtering.
1413    async fn list_pending_queued_work(
1414        &self,
1415        session_id: &str,
1416    ) -> Result<Vec<crate::QueuedWorkBatch>, StoreError>;
1417}
1418
/// Host-scheduled retention and garbage-collection capability over settled
/// state.
///
/// Deletion of graph nodes is two-phase:
/// 1. [`tombstone_nodes`](Self::tombstone_nodes) hides the nodes from reads.
/// 2. [`vacuum`](Self::vacuum) later removes the rows physically.
///
/// Blob reclamation is separate: see [`gc_unreachable`](Self::gc_unreachable).
#[async_trait::async_trait]
pub trait StoreMaintenance: Send + Sync {
    /// Mark graph nodes as tombstoned so reads exclude them until
    /// [`vacuum`](Self::vacuum) physically removes them.
    async fn tombstone_nodes(&self, ids: &[String]) -> Result<(), StoreError>;

    /// Physically delete tombstoned graph-node rows and prune terminal
    /// pending-turn-input evidence rows. See [`VacuumReport`].
    async fn vacuum(&self) -> Result<VacuumReport, StoreError>;

    /// Delete blobs no longer reachable from any retained root. See
    /// [`GcReport`].
    async fn gc_unreachable(&self) -> Result<GcReport, StoreError>;
}
1434
/// Exact settled-session persistence protocol required by the runtime.
///
/// `Arc<dyn RuntimePersistence>` is *the* runtime storage handle: one object
/// implementing every persistence capability segment:
/// - [`SessionCommitStore`]: atomic graph/head commits, reads, metadata, and
///   the attachment write-ahead manifest;
/// - [`TurnInputStore`]: the pending turn-input lifecycle;
/// - [`QueuedWorkStore`]: queued-work ingress and claiming;
/// - [`SessionExecutionLeaseStore`]: the single-writer execution lane;
/// - [`StoreMaintenance`]: vacuum/GC.
///
/// The segments share one transactional domain: claims granted by the input
/// and queue segments settle atomically in
/// [`SessionCommitStore::commit_runtime_state`]. In-flight nondeterministic
/// work belongs to the active [`EffectHost`](crate::EffectHost), not to the
/// store contract.
///
/// Blanket-implemented for every type that implements all five segments;
/// backends implement the segment traits and never this trait directly.
pub trait RuntimePersistence:
    SessionCommitStore
    + TurnInputStore
    + SessionExecutionLeaseStore
    + QueuedWorkStore
    + StoreMaintenance
{
}

// `?Sized` lets the blanket impl also cover unsized implementors (for example
// trait objects), not just concrete backend types.
impl<T> RuntimePersistence for T where
    T: SessionCommitStore
        + TurnInputStore
        + SessionExecutionLeaseStore
        + QueuedWorkStore
        + StoreMaintenance
        + ?Sized
{
}
1469
1470fn persisted_session_state_from_read(read: PersistedSessionRead) -> crate::RuntimeSessionState {
1471    persisted_session_state_from_head(
1472        SessionHead {
1473            session_id: read.session_id,
1474            head_revision: read.head_revision,
1475            agent_frames: read.agent_frames,
1476            current_agent_frame_id: read.current_agent_frame_id,
1477            graph: read.graph,
1478            config: read.config,
1479            checkpoint_ref: read.checkpoint_ref,
1480            token_ledger: read.token_ledger,
1481        },
1482        read.checkpoint,
1483    )
1484}
1485
1486pub async fn load_persisted_session_state(
1487    store: &(dyn RuntimePersistence + '_),
1488) -> Result<Option<crate::RuntimeSessionState>, StoreError> {
1489    Ok(store
1490        .load_session(SessionReadScope::FullGraph)
1491        .await?
1492        .map(persisted_session_state_from_read))
1493}
1494
1495pub async fn load_persisted_session_state_active_path(
1496    store: &(dyn RuntimePersistence + '_),
1497    leaf_node_id: Option<String>,
1498) -> Result<Option<crate::RuntimeSessionState>, StoreError> {
1499    Ok(store
1500        .load_session(SessionReadScope::ActivePath { leaf_node_id })
1501        .await?
1502        .map(persisted_session_state_from_read))
1503}
1504
1505pub async fn refresh_persisted_session_state(
1506    store: &(dyn RuntimePersistence + '_),
1507    state: &mut crate::RuntimeSessionState,
1508) -> Result<(), StoreError> {
1509    if let Some(mut fresh) = load_persisted_session_state(store).await? {
1510        fresh.policy.session_id = state.policy.session_id.clone();
1511        fresh.policy.max_turns = state.policy.max_turns;
1512        *state = fresh;
1513    }
1514    Ok(())
1515}
1516
#[cfg(test)]
mod tests {
    use super::{LeaseOwnerIdentity, LeaseOwnerLiveness};

    /// Shorthand for building local-process liveness metadata in tests.
    fn local_liveness(
        host_id: &str,
        boot_id: &str,
        pid: u32,
        process_start: &str,
    ) -> LeaseOwnerLiveness {
        LeaseOwnerLiveness::local_process_for_test(host_id, boot_id, pid, process_start)
    }

    #[test]
    fn lease_owner_identity_requires_same_incarnation() {
        let incarnation_a = LeaseOwnerIdentity::opaque("owner", "incarnation-a");
        let incarnation_a_twin = LeaseOwnerIdentity::opaque("owner", "incarnation-a");
        let incarnation_b = LeaseOwnerIdentity::opaque("owner", "incarnation-b");

        assert!(incarnation_a.same_incarnation(&incarnation_a_twin));
        assert!(!incarnation_a.same_incarnation(&incarnation_b));
    }

    #[test]
    fn local_liveness_only_proves_same_host_boot_dead_processes() {
        let pid = std::process::id();
        // The holder reuses this process's pid but records a process start
        // that does not match the current process.
        let holder = local_liveness("host-a", "boot-a", pid, "not-the-current-process-start");
        let claimant = |host_id: &str, boot_id: &str| {
            local_liveness(host_id, boot_id, pid, "claimant")
        };
        let same_host_boot = claimant("host-a", "boot-a");

        assert!(holder.is_definitely_dead_for_claimant(&same_host_boot));
        assert!(!holder.is_definitely_dead_for_claimant(&claimant("host-b", "boot-a")));
        assert!(!holder.is_definitely_dead_for_claimant(&claimant("host-a", "boot-b")));
        assert!(!holder.is_definitely_dead_for_claimant(&LeaseOwnerLiveness::Opaque));
        assert!(!LeaseOwnerLiveness::Opaque.is_definitely_dead_for_claimant(&same_host_boot));
    }
}