Skip to main content

lash_core/store/
mod.rs

1//! The runtime's settled-session persistence contract and shared store types.
2
3pub mod queued_work;
4
/// Path of the procfs file that `LeaseOwnerLiveness::current_local_process`
/// reads to obtain the boot id recorded in a local-process liveness record.
const PROC_BOOT_ID_PATH: &str = "/proc/sys/kernel/random/boot_id";
6
/// Serde default for `session_id` fields: the literal id `"root"`.
fn default_root_session_id() -> String {
    String::from("root")
}
10
#[cfg(test)]
mod persisted_state_tests {
    use super::*;

    /// Hydrating a stored head without a checkpoint must surface the stored
    /// provider id on the session policy and on every agent frame, and must
    /// carry the stored head revision through.
    #[test]
    fn persisted_state_hydrates_provider_id_without_live_provider_rebinding() {
        let stored_config = crate::PersistedSessionConfig {
            provider_id: "stored-provider".to_string(),
            model: crate::ModelSpec::default(),
        };
        let stored_head = SessionHead {
            session_id: "stored".to_string(),
            head_revision: 7,
            agent_frames: Vec::new(),
            current_agent_frame_id: String::new(),
            graph: crate::SessionGraph::default(),
            config: stored_config,
            checkpoint_ref: None,
            token_ledger: Vec::new(),
        };

        let state = persisted_session_state_from_head(stored_head, None);

        assert_eq!(state.policy.recorded_provider_id(), "stored-provider");
        let every_frame_uses_stored_provider = state
            .agent_frames
            .iter()
            .all(|frame| frame.assignment.policy.recorded_provider_id() == "stored-provider");
        assert!(every_frame_uses_stored_provider);
        assert_eq!(state.head_revision, Some(7));
    }
}
44
/// Errors surfaced by the session store and its shared helpers.
#[derive(Debug, thiserror::Error)]
pub enum StoreError {
    /// The store is already bound to one session and was asked to serve a
    /// different one.
    #[error(
        "store is already bound to session `{bound_session_id}` and cannot be reused for `{attempted_session_id}`"
    )]
    SessionBindingMismatch {
        bound_session_id: String,
        attempted_session_id: String,
    },
    /// The store cannot serve the requested [`SessionReadScope`].
    #[error("store does not support read scope {0:?}")]
    UnsupportedReadScope(SessionReadScope),
    /// The head revision the caller expected does not match the actual one.
    #[error("store head revision conflict: expected {expected:?}, actual {actual}")]
    HeadRevisionConflict { expected: Option<u64>, actual: u64 },
    /// The turn was already committed for this session with a different
    /// commit hash.
    #[error(
        "runtime turn `{turn_id}` for session `{session_id}` was already committed with a different commit hash"
    )]
    RuntimeTurnCommitConflict { session_id: String, turn_id: String },
    /// The referenced queued-work claim is missing or has expired.
    #[error("queued work claim `{claim_id}` for session `{session_id}` is missing or expired")]
    QueuedWorkClaimExpired {
        session_id: String,
        claim_id: String,
    },
    /// The session execution lease is missing or has expired.
    #[error("session execution lease for session `{session_id}` is missing or expired")]
    SessionExecutionLeaseExpired { session_id: String },
    /// A persisted record carries a `schema_version` this binary does not
    /// support; produced by `ensure_supported_schema_version`.
    #[error(
        "{record_kind} schema_version {actual} is not supported by this binary (expected {expected})"
    )]
    UnsupportedRecordSchemaVersion {
        record_kind: &'static str,
        actual: u32,
        expected: u32,
    },
    /// Catch-all for backend failures, carrying a human-readable message.
    #[error("store backend error: {0}")]
    Backend(String),
}
80
/// Serializable metadata describing a stored session.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct SessionMeta {
    /// Identifier of the session.
    pub session_id: String,
    /// Name of the session.
    pub session_name: String,
    /// Creation time as a string. NOTE(review): the timestamp format is not
    /// visible here — confirm against the writer.
    pub created_at: String,
    /// Model name recorded for the session.
    pub model: String,
    /// Working directory recorded for the session, if any.
    pub cwd: Option<String>,
    /// Relation of this session to other sessions; the parent session id is
    /// derived from it (see `parent_session_id`).
    pub relation: crate::SessionRelation,
}
90
91impl SessionMeta {
92    /// Returns the parent session id, if any, derived from the canonical
93    /// [`SessionRelation`] field.
94    pub fn parent_session_id(&self) -> Option<&str> {
95        self.relation.parent_session_id()
96    }
97}
98
/// Lightweight session info for the resume picker.
#[derive(Clone, Debug)]
pub struct SessionPickerInfo {
    /// Identifier of the session.
    pub session_id: String,
    /// Working directory recorded for the session, if any.
    pub cwd: Option<String>,
    /// Relation of this session to other sessions; the parent session id is
    /// derived from it (see `parent_session_id`).
    pub relation: crate::SessionRelation,
    /// Text of the first user message in the session.
    pub first_user_message: String,
    /// Number of user messages in the session.
    pub user_message_count: usize,
}
108
109impl SessionPickerInfo {
110    pub fn parent_session_id(&self) -> Option<&str> {
111        self.relation.parent_session_id()
112    }
113}
114
/// Newtype around the string reference to a stored blob.
///
/// `#[serde(transparent)]` makes it serialize and deserialize exactly like
/// the inner `String`.
#[derive(Clone, Debug, Default, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
#[serde(transparent)]
pub struct BlobRef(pub String);
118
119impl BlobRef {
120    pub fn as_str(&self) -> &str {
121        &self.0
122    }
123}
124
125impl std::fmt::Display for BlobRef {
126    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
127        f.write_str(&self.0)
128    }
129}
130
131impl From<String> for BlobRef {
132    fn from(value: String) -> Self {
133        Self(value)
134    }
135}
136
/// Counters reported by a garbage-collection pass.
///
/// NOTE(review): the producer of this report is not visible in this file;
/// the field descriptions below follow the field names only — confirm.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct GcReport {
    /// Number of roots considered.
    pub root_count: usize,
    /// Number of blobs that were kept.
    pub retained_blob_count: usize,
    /// Number of blobs that were deleted.
    pub deleted_blob_count: usize,
}
143
/// Result of a `RuntimePersistence::vacuum()` call.
///
/// Returned so hosts can emit metrics.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct VacuumReport {
    /// Number of tombstoned graph-node rows that were physically deleted
    /// from the store.
    pub removed_node_count: usize,
}
151
/// Serializable checkpoint manifest: the persisted turn state plus optional
/// blob references.
///
/// Every field falls back to its default when absent on deserialization, and
/// the optional fields are omitted from the serialized form when `None`.
#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
pub struct SessionCheckpoint {
    /// Persisted per-turn state.
    #[serde(default)]
    pub turn_state: crate::PersistedTurnState,
    /// Reference to the tool-state blob, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub tool_state_ref: Option<BlobRef>,
    /// Reference to the plugin-snapshot blob, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub plugin_snapshot_ref: Option<BlobRef>,
    /// Revision number of the plugin snapshot, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub plugin_snapshot_revision: Option<u64>,
    /// Reference to the execution-state blob, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub execution_state_ref: Option<BlobRef>,
}
165
/// Checkpoint that carries in-memory payloads alongside the blob references
/// found in [`SessionCheckpoint`].
///
/// NOTE(review): each `*_ref` presumably identifies the stored blob for the
/// payload next to it — confirm against the backend that hydrates this type.
#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
pub struct HydratedSessionCheckpoint {
    /// Persisted per-turn state.
    pub turn_state: crate::PersistedTurnState,
    /// Reference to the tool-state blob, if any.
    pub tool_state_ref: Option<BlobRef>,
    /// Tool-state payload, if any.
    pub tool_state: Option<crate::ToolState>,
    /// Reference to the plugin-snapshot blob, if any.
    pub plugin_snapshot_ref: Option<BlobRef>,
    /// Plugin-snapshot payload, if any.
    pub plugin_snapshot: Option<crate::PluginSessionSnapshot>,
    /// Revision number of the plugin snapshot, if any.
    pub plugin_snapshot_revision: Option<u64>,
    /// Reference to the execution-state blob, if any.
    pub execution_state_ref: Option<BlobRef>,
    /// Execution-state payload as raw bytes, if any.
    pub execution_state: Option<Vec<u8>>,
}
177
/// Serializable head record of a session, including its full graph.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct SessionHead {
    /// Session identifier; deserializes to `"root"` when the field is absent.
    #[serde(default = "default_root_session_id")]
    pub session_id: String,
    /// Revision of this head; defaults to `0` when absent.
    #[serde(default)]
    pub head_revision: u64,
    /// Agent frame records; defaults to empty when absent.
    #[serde(default)]
    pub agent_frames: Vec<crate::AgentFrameRecord>,
    /// Id of the current agent frame; omitted from serialization when empty.
    #[serde(default, skip_serializing_if = "String::is_empty")]
    pub current_agent_frame_id: crate::AgentFrameId,
    /// The session graph (required on deserialization).
    pub graph: crate::SessionGraph,
    /// Persisted session configuration (required on deserialization).
    pub config: crate::PersistedSessionConfig,
    /// Reference to the checkpoint blob, if any; omitted when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub checkpoint_ref: Option<BlobRef>,
    /// Token ledger entries; omitted from serialization when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub token_ledger: Vec<crate::TokenLedgerEntry>,
}
195
/// Serializable head record without the graph itself: it carries the same
/// bookkeeping fields as [`SessionHead`] plus a leaf node id and a node count
/// in place of the full graph.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct SessionHeadMeta {
    /// Session identifier; deserializes to `"root"` when the field is absent.
    #[serde(default = "default_root_session_id")]
    pub session_id: String,
    /// Revision of this head; defaults to `0` when absent.
    #[serde(default)]
    pub head_revision: u64,
    /// Persisted session configuration (required on deserialization).
    pub config: crate::PersistedSessionConfig,
    /// Agent frame records; defaults to empty when absent.
    #[serde(default)]
    pub agent_frames: Vec<crate::AgentFrameRecord>,
    /// Id of the current agent frame; omitted from serialization when empty.
    #[serde(default, skip_serializing_if = "String::is_empty")]
    pub current_agent_frame_id: crate::AgentFrameId,
    /// Reference to the checkpoint blob, if any; omitted when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub checkpoint_ref: Option<BlobRef>,
    /// Id of the graph's leaf node, if any; omitted when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub leaf_node_id: Option<String>,
    /// Number of nodes in the graph; defaults to `0` when absent.
    #[serde(default)]
    pub graph_node_count: usize,
    /// Token ledger entries; omitted from serialization when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub token_ledger: Vec<crate::TokenLedgerEntry>,
}
216
217fn persisted_session_config_from_state(
218    state: &crate::RuntimeSessionState,
219) -> crate::PersistedSessionConfig {
220    crate::PersistedSessionConfig {
221        provider_id: state.policy.recorded_provider_id().to_string(),
222        model: state.policy.model.clone(),
223    }
224}
225
/// How much of a session a read should cover.
///
/// A store that cannot serve a scope can report
/// [`StoreError::UnsupportedReadScope`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SessionReadScope {
    /// Read the full session graph.
    FullGraph,
    /// Read only the active path.
    /// NOTE(review): presumably the path ending at `leaf_node_id`, with
    /// `None` meaning the stored leaf — confirm against the backends.
    ActivePath { leaf_node_id: Option<String> },
}
231
/// Result of reading a persisted session: the head fields plus the hydrated
/// checkpoint, when one is available.
#[derive(Clone, Debug)]
pub struct PersistedSessionRead {
    /// Identifier of the session that was read.
    pub session_id: String,
    /// Revision of the head that was read.
    pub head_revision: u64,
    /// Persisted session configuration.
    pub config: crate::PersistedSessionConfig,
    /// Agent frame records.
    pub agent_frames: Vec<crate::AgentFrameRecord>,
    /// Id of the current agent frame.
    pub current_agent_frame_id: crate::AgentFrameId,
    /// Session graph returned by the read.
    pub graph: crate::SessionGraph,
    /// Reference to the checkpoint blob, if any.
    pub checkpoint_ref: Option<BlobRef>,
    /// Hydrated checkpoint, if any.
    pub checkpoint: Option<HydratedSessionCheckpoint>,
    /// Token ledger entries.
    pub token_ledger: Vec<crate::TokenLedgerEntry>,
}
244
/// Graph portion of a [`RuntimeCommit`]: either no node changes, a set of
/// appended nodes, or a full replacement graph.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub enum GraphCommitDelta {
    /// No nodes are supplied; only the leaf node id is carried.
    Unchanged {
        leaf_node_id: Option<String>,
    },
    /// Node records to append, together with the leaf node id.
    Append {
        nodes: Vec<crate::SessionNodeRecord>,
        leaf_node_id: Option<String>,
    },
    /// A complete graph; `RuntimeCommit::persisted_state` uses this when a
    /// graph replacement is required or no head revision is known yet.
    ReplaceFull(crate::SessionGraph),
}
256
257impl GraphCommitDelta {
258    pub fn leaf_node_id(&self) -> Option<&String> {
259        match self {
260            Self::Unchanged { leaf_node_id } | Self::Append { leaf_node_id, .. } => {
261                leaf_node_id.as_ref()
262            }
263            Self::ReplaceFull(graph) => graph.leaf_node_id.as_ref(),
264        }
265    }
266}
267
/// Everything a runtime hands to the store for one commit of session state.
///
/// `turn_commit_hash` hashes the JSON form of this struct after clearing
/// `expected_head_revision`, both lease fields and `turn_commit`.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct RuntimeCommit {
    /// Session this commit applies to.
    pub session_id: String,
    /// Head revision the committer expects the store to be at; `None` when
    /// the committing state has no known head revision.
    pub expected_head_revision: Option<u64>,
    /// Lease fence accompanying the commit, if any; omitted from
    /// serialization when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub session_execution_lease: Option<SessionExecutionLeaseFence>,
    /// Lease completion accompanying the commit, if any; omitted from
    /// serialization when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub release_session_execution_lease: Option<SessionExecutionLeaseCompletion>,
    /// Persisted session configuration (provider id and model).
    pub config: crate::PersistedSessionConfig,
    /// Agent frame records.
    pub agent_frames: Vec<crate::AgentFrameRecord>,
    /// Id of the current agent frame.
    pub current_agent_frame_id: crate::AgentFrameId,
    /// Graph change carried by this commit.
    pub graph: GraphCommitDelta,
    /// Checkpoint payloads and references to persist.
    pub checkpoint: HydratedSessionCheckpoint,
    /// Token ledger entries added by this commit.
    pub usage_deltas: Vec<crate::TokenLedgerEntry>,
    /// Turn commit stamp, if this commit settles a runtime turn.
    pub turn_commit: Option<RuntimeTurnCommitStamp>,
    /// Queued-work completions carried by this commit.
    pub completed_queue_claims: Vec<crate::QueuedWorkCompletion>,
    /// Attachment ids whose bytes are referenced by this commit and
    /// should be stamped `committed` in the write-ahead manifest as
    /// part of the same SQL transaction. The backend marks each id
    /// committed via [`AttachmentManifest::commit_refs`] before the
    /// commit returns success. Hosts populate this from the
    /// attachments emitted by tool calls and inline LLM-request
    /// attachments produced during the turn.
    pub committed_attachment_ids: Vec<crate::AttachmentId>,
}
293
/// Values a store hands back for a runtime commit.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct RuntimeCommitResult {
    /// Head revision reported by the store.
    pub head_revision: u64,
    /// Reference to the checkpoint blob.
    pub checkpoint_ref: BlobRef,
    /// Checkpoint manifest.
    pub manifest: SessionCheckpoint,
}
300
/// Identity of a lease owner, with optional liveness information that lets a
/// claimant decide whether the owner is definitely dead.
#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct LeaseOwnerIdentity {
    /// Identifier of the owner.
    pub owner_id: String,
    /// Identifier of this particular incarnation of the owner; compared
    /// together with `owner_id` by `same_incarnation`.
    pub incarnation_id: String,
    /// Liveness information; deserializes to `Opaque` when absent.
    #[serde(default)]
    pub liveness: LeaseOwnerLiveness,
}
308
309impl LeaseOwnerIdentity {
310    pub fn opaque(
311        owner_id: impl Into<String>,
312        incarnation_id: impl Into<String>,
313    ) -> LeaseOwnerIdentity {
314        LeaseOwnerIdentity {
315            owner_id: owner_id.into(),
316            incarnation_id: incarnation_id.into(),
317            liveness: LeaseOwnerLiveness::Opaque,
318        }
319    }
320
321    pub fn local_process(
322        owner_id: impl Into<String>,
323        incarnation_id: impl Into<String>,
324        host_id: impl Into<String>,
325    ) -> LeaseOwnerIdentity {
326        let liveness = LeaseOwnerLiveness::current_local_process(host_id.into())
327            .unwrap_or(LeaseOwnerLiveness::Opaque);
328        LeaseOwnerIdentity {
329            owner_id: owner_id.into(),
330            incarnation_id: incarnation_id.into(),
331            liveness,
332        }
333    }
334
335    pub fn same_incarnation(&self, other: &LeaseOwnerIdentity) -> bool {
336        self.owner_id == other.owner_id && self.incarnation_id == other.incarnation_id
337    }
338
339    pub fn is_definitely_dead_for_claimant(&self, claimant: &LeaseOwnerIdentity) -> bool {
340        self.liveness
341            .is_definitely_dead_for_claimant(&claimant.liveness)
342    }
343}
344
/// Liveness information attached to a lease owner.
///
/// Serialized as an internally tagged enum: the variant name is stored in
/// snake_case under the `kind` key.
#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize, Default)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum LeaseOwnerLiveness {
    /// The owner is a process identified by host, boot id, pid and process
    /// start value (as read from `/proc/<pid>/stat`).
    LocalProcess {
        host_id: String,
        boot_id: String,
        pid: u32,
        process_start: String,
    },
    /// No liveness information; such an owner is never reported as
    /// definitely dead. This is the default variant.
    #[default]
    Opaque,
}
357
358impl LeaseOwnerLiveness {
359    pub fn current_local_process(host_id: impl Into<String>) -> Option<LeaseOwnerLiveness> {
360        let boot_id = std::fs::read_to_string(PROC_BOOT_ID_PATH)
361            .ok()
362            .map(|value| value.trim().to_string())
363            .filter(|value| !value.is_empty())?;
364        let pid = std::process::id();
365        let process_start = read_linux_process_start(pid)?;
366        Some(LeaseOwnerLiveness::LocalProcess {
367            host_id: host_id.into(),
368            boot_id,
369            pid,
370            process_start,
371        })
372    }
373
374    pub fn local_process_for_test(
375        host_id: impl Into<String>,
376        boot_id: impl Into<String>,
377        pid: u32,
378        process_start: impl Into<String>,
379    ) -> LeaseOwnerLiveness {
380        LeaseOwnerLiveness::LocalProcess {
381            host_id: host_id.into(),
382            boot_id: boot_id.into(),
383            pid,
384            process_start: process_start.into(),
385        }
386    }
387
388    pub fn is_definitely_dead_for_claimant(&self, claimant: &LeaseOwnerLiveness) -> bool {
389        let (
390            LeaseOwnerLiveness::LocalProcess {
391                host_id,
392                boot_id,
393                pid,
394                process_start,
395            },
396            LeaseOwnerLiveness::LocalProcess {
397                host_id: claimant_host_id,
398                boot_id: claimant_boot_id,
399                ..
400            },
401        ) = (self, claimant)
402        else {
403            return false;
404        };
405        if host_id != claimant_host_id || boot_id != claimant_boot_id {
406            return false;
407        }
408        matches!(linux_process_is_live(*pid, process_start), Some(false))
409    }
410}
411
412fn read_linux_process_start(pid: u32) -> Option<String> {
413    let stat = std::fs::read_to_string(format!("/proc/{pid}/stat")).ok()?;
414    parse_linux_process_start(&stat)
415}
416
417fn linux_process_is_live(pid: u32, expected_process_start: &str) -> Option<bool> {
418    match std::fs::read_to_string(format!("/proc/{pid}/stat")) {
419        Ok(stat) => parse_linux_process_start(&stat).map(|start| start == expected_process_start),
420        Err(err) if err.kind() == std::io::ErrorKind::NotFound => Some(false),
421        Err(_) => None,
422    }
423}
424
/// Extracts the process start field from the contents of a `/proc/<pid>/stat`
/// file.
///
/// Everything up to the last `") "` is skipped, so a command name containing
/// spaces or parentheses cannot shift the field positions. The value returned
/// is the 20th whitespace-separated token after that point. Returns `None`
/// when the separator or the token is missing.
fn parse_linux_process_start(stat: &str) -> Option<String> {
    let (_, fields_after_comm) = stat.rsplit_once(") ")?;
    let start_field = fields_after_comm.split_whitespace().nth(19)?;
    Some(start_field.to_owned())
}
429
/// A session execution lease record.
#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct SessionExecutionLease {
    /// Session the lease covers.
    pub session_id: String,
    /// Identity of the lease owner.
    pub owner: LeaseOwnerIdentity,
    /// Token identifying this lease.
    pub lease_token: String,
    /// Fencing token of this lease.
    pub fencing_token: u64,
    /// Claim time, in epoch milliseconds.
    pub claimed_at_epoch_ms: u64,
    /// Expiry time, in epoch milliseconds.
    pub expires_at_epoch_ms: u64,
}
439
/// The identifying fields of a [`SessionExecutionLease`] without its
/// timestamps; built by `SessionExecutionLease::fence` and attached to a
/// [`RuntimeCommit`] as `session_execution_lease`.
#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct SessionExecutionLeaseFence {
    /// Session the lease covers.
    pub session_id: String,
    /// Identity of the lease owner.
    pub owner: LeaseOwnerIdentity,
    /// Token identifying the lease.
    pub lease_token: String,
    /// Fencing token of the lease.
    pub fencing_token: u64,
}
447
/// The identifying fields of a [`SessionExecutionLease`] without its
/// timestamps; built by `SessionExecutionLease::completion` and attached to a
/// [`RuntimeCommit`] as `release_session_execution_lease`.
#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct SessionExecutionLeaseCompletion {
    /// Session the lease covers.
    pub session_id: String,
    /// Identity of the lease owner.
    pub owner: LeaseOwnerIdentity,
    /// Token identifying the lease.
    pub lease_token: String,
    /// Fencing token of the lease.
    pub fencing_token: u64,
}
455
456impl SessionExecutionLease {
457    pub fn fence(&self) -> SessionExecutionLeaseFence {
458        SessionExecutionLeaseFence {
459            session_id: self.session_id.clone(),
460            owner: self.owner.clone(),
461            lease_token: self.lease_token.clone(),
462            fencing_token: self.fencing_token,
463        }
464    }
465
466    pub fn completion(&self) -> SessionExecutionLeaseCompletion {
467        SessionExecutionLeaseCompletion {
468            session_id: self.session_id.clone(),
469            owner: self.owner.clone(),
470            lease_token: self.lease_token.clone(),
471            fencing_token: self.fencing_token,
472        }
473    }
474}
475
476impl SessionExecutionLeaseCompletion {
477    pub fn from_lease(lease: &SessionExecutionLease) -> Self {
478        lease.completion()
479    }
480}
481
/// Outcome of attempting to claim a session execution lease.
#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum SessionExecutionLeaseClaimOutcome {
    /// The claim succeeded; carries the lease.
    Acquired(SessionExecutionLease),
    /// The claim did not succeed; carries the lease of the current holder.
    Busy { holder: SessionExecutionLease },
}
487
488impl SessionExecutionLeaseClaimOutcome {
489    pub fn acquired(self) -> Option<SessionExecutionLease> {
490        match self {
491            Self::Acquired(lease) => Some(lease),
492            Self::Busy { .. } => None,
493        }
494    }
495}
496
497// =============================================================================
498// Attachment write-ahead manifest
499// =============================================================================
500
/// A pending attachment write recorded *before* the bytes hit the
/// [`AttachmentStore`](crate::AttachmentStore) backend.
///
/// The runtime calls [`AttachmentManifest::record_intent`] from the
/// [`SessionScopedAttachmentStore`](crate::SessionScopedAttachmentStore)
/// wrapper before each `put`, so the manifest is a durable record that
/// "some bytes are about to land at this URI." When the turn that
/// references the attachment commits successfully via
/// [`RuntimePersistence::commit_runtime_state`], the same transaction
/// stamps `committed_at_epoch_ms`. Periodic GC sweeps manifest rows
/// whose intent has aged past a host-chosen threshold without ever
/// being committed and deletes the corresponding bytes — that's how we
/// reconcile orphaned files left behind by crashes between `put` and
/// the next turn commit.
#[derive(Clone, Debug)]
pub struct AttachmentIntent {
    /// Id of the attachment about to be written.
    pub attachment_id: crate::AttachmentId,
    /// Session the attachment belongs to.
    pub session_id: String,
    /// Canonical URI for the attachment payload in the backing store.
    /// For file-backed stores this is the absolute on-disk path; for
    /// blob-backed stores it can be any stable identifier the host
    /// uses to clean the payload up.
    pub canonical_uri: String,
    /// Time of the intent, in epoch milliseconds.
    pub intent_at_epoch_ms: u64,
}
526
/// A row of the attachment write-ahead manifest: the fields of an
/// [`AttachmentIntent`] plus the commit timestamp, when there is one.
#[derive(Clone, Debug)]
pub struct AttachmentManifestEntry {
    /// Id of the attachment.
    pub attachment_id: crate::AttachmentId,
    /// Session the attachment belongs to.
    pub session_id: String,
    /// Canonical URI for the attachment payload in the backing store.
    pub canonical_uri: String,
    /// Time of the intent, in epoch milliseconds.
    pub intent_at_epoch_ms: u64,
    /// Commit time in epoch milliseconds; `None` while uncommitted.
    pub committed_at_epoch_ms: Option<u64>,
}
535
/// Trait alias for the synchronous attachment-manifest surface on
/// [`RuntimePersistence`]. Used by
/// [`SessionScopedAttachmentStore`](crate::SessionScopedAttachmentStore)
/// to record intent rows before `put` and by GC sweeps to reconcile
/// orphans. See the [`AttachmentIntent`] doc comment for the full
/// crash-safety story.
///
/// Backends with no attachment story (in-memory tests, mock stores)
/// inherit the default no-op impls on [`RuntimePersistence`] and
/// participate transparently — `record_intent` is a no-op, the
/// scoped wrapper still works, and GC sweeps return empty.
///
/// NOTE(review): the methods declared here have no default bodies; within
/// this file the no-op implementations come from the
/// `impl_noop_attachment_manifest!` macro — confirm the wording above
/// against `RuntimePersistence`.
pub trait AttachmentManifest: Send + Sync {
    /// Record that an attachment write is about to happen. Per the
    /// [`AttachmentIntent`] docs, this is called before each `put`.
    fn record_intent(&self, intent: AttachmentIntent) -> Result<(), StoreError>;

    /// Mark a set of attachment ids as committed (i.e. now referenced
    /// by a durable session-graph commit). Backends that store
    /// commits and manifest in the same database stamp this inside
    /// the commit transaction; the trait-level method is the
    /// out-of-band entry point for hosts that want to commit an id
    /// outside the normal turn-commit flow.
    fn commit_refs(
        &self,
        session_id: &str,
        attachment_ids: &[crate::AttachmentId],
    ) -> Result<(), StoreError>;

    /// Return manifest entries whose intent has aged past
    /// `older_than_epoch_ms` without ever being committed. Hosts run
    /// this periodically to find orphans left by crashes between
    /// `record_intent` and the next turn commit.
    fn list_uncommitted(
        &self,
        older_than_epoch_ms: u64,
    ) -> Result<Vec<AttachmentManifestEntry>, StoreError>;

    /// Remove a manifest row entirely. Called by the GC coordinator
    /// after the corresponding bytes have been removed from the
    /// backing [`AttachmentStore`](crate::AttachmentStore).
    fn forget(&self, attachment_id: &crate::AttachmentId) -> Result<(), StoreError>;
}
576
/// Mixin macro for [`RuntimePersistence`] implementors that have no
/// attachment-write story (mock backends, in-memory test stores,
/// runtime-perf harnesses). Pastes no-op impls of every
/// [`AttachmentManifest`] method.
///
/// Invoke it with the implementing type, e.g.
/// `impl_noop_attachment_manifest!(MyStore);`. All paths are spelled through
/// `$crate` so the expansion also resolves from other crates.
#[macro_export]
macro_rules! impl_noop_attachment_manifest {
    ($ty:ty) => {
        impl $crate::AttachmentManifest for $ty {
            // Accepts and discards the intent.
            fn record_intent(
                &self,
                _intent: $crate::AttachmentIntent,
            ) -> ::std::result::Result<(), $crate::StoreError> {
                Ok(())
            }

            // Nothing is tracked, so there is nothing to mark committed.
            fn commit_refs(
                &self,
                _session_id: &str,
                _attachment_ids: &[$crate::AttachmentId],
            ) -> ::std::result::Result<(), $crate::StoreError> {
                Ok(())
            }

            // Always reports no uncommitted entries.
            fn list_uncommitted(
                &self,
                _older_than_epoch_ms: u64,
            ) -> ::std::result::Result<Vec<$crate::AttachmentManifestEntry>, $crate::StoreError>
            {
                Ok(Vec::new())
            }

            // Nothing is tracked, so there is nothing to forget.
            fn forget(
                &self,
                _attachment_id: &$crate::AttachmentId,
            ) -> ::std::result::Result<(), $crate::StoreError> {
                Ok(())
            }
        }
    };
}
617
618/// Reject a persisted record whose `schema_version` does not match the
619/// version this binary supports. Backends call this immediately after
620/// deserializing a record from durable storage.
621pub fn ensure_supported_schema_version(
622    record_kind: &'static str,
623    actual: u32,
624    expected: u32,
625) -> Result<(), StoreError> {
626    if actual == expected {
627        Ok(())
628    } else {
629        Err(StoreError::UnsupportedRecordSchemaVersion {
630            record_kind,
631            actual,
632            expected,
633        })
634    }
635}
636
/// Stamp identifying a committed runtime turn: the session, the turn, and
/// the turn's commit hash.
///
/// [`StoreError::RuntimeTurnCommitConflict`] describes a turn that was
/// already committed with a different hash.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct RuntimeTurnCommitStamp {
    /// Session the turn belongs to.
    pub session_id: String,
    /// Identifier of the turn.
    pub turn_id: String,
    /// Commit hash recorded for the turn.
    pub turn_commit_hash: String,
}
643
644impl RuntimeTurnCommitStamp {
645    pub fn new(
646        session_id: impl Into<String>,
647        turn_id: impl Into<String>,
648        turn_commit_hash: impl Into<String>,
649    ) -> Self {
650        Self {
651            session_id: session_id.into(),
652            turn_id: turn_id.into(),
653            turn_commit_hash: turn_commit_hash.into(),
654        }
655    }
656}
657
658fn build_persisted_turn_state(state: &crate::RuntimeSessionState) -> crate::PersistedTurnState {
659    crate::PersistedTurnState {
660        turn_index: state.turn_index,
661        token_usage: state.token_usage.clone(),
662        last_prompt_usage: state.last_prompt_usage.clone(),
663        protocol_turn_options: state.protocol_turn_options.clone(),
664    }
665}
666
667fn build_checkpoint_from_persisted_state(
668    state: &crate::RuntimeSessionState,
669) -> HydratedSessionCheckpoint {
670    HydratedSessionCheckpoint {
671        turn_state: build_persisted_turn_state(state),
672        tool_state_ref: state.tool_state_ref.clone(),
673        tool_state: state.tool_state_snapshot.clone(),
674        plugin_snapshot_ref: state.plugin_snapshot_ref.clone(),
675        plugin_snapshot_revision: state.plugin_snapshot_revision,
676        plugin_snapshot: state.plugin_snapshot.clone(),
677        execution_state_ref: state.execution_state_ref.clone(),
678        execution_state: state.execution_state_snapshot.clone(),
679    }
680}
681
682impl RuntimeCommit {
683    pub fn turn_commit_hash(&self) -> Result<String, StoreError> {
684        let mut semantic_commit = self.clone();
685        semantic_commit.expected_head_revision = None;
686        semantic_commit.session_execution_lease = None;
687        semantic_commit.release_session_execution_lease = None;
688        semantic_commit.turn_commit = None;
689        let mut semantic_commit = serde_json::to_value(&semantic_commit).map_err(|err| {
690            StoreError::Backend(format!("failed to serialize runtime turn commit: {err}"))
691        })?;
692        scrub_turn_commit_hash_value(&mut semantic_commit);
693        crate::stable_hash::stable_json_sha256_hex(&semantic_commit).map_err(|err| {
694            StoreError::Backend(format!(
695                "failed to serialize runtime turn commit hash: {err}"
696            ))
697        })
698    }
699
700    pub fn persisted_state(
701        state: &crate::RuntimeSessionState,
702        usage_deltas: &[crate::TokenLedgerEntry],
703    ) -> Self {
704        Self {
705            session_id: state.session_id.clone(),
706            expected_head_revision: state.head_revision,
707            session_execution_lease: None,
708            release_session_execution_lease: None,
709            config: persisted_session_config_from_state(state),
710            agent_frames: state.agent_frames.clone(),
711            current_agent_frame_id: state.current_agent_frame_id.clone(),
712            graph: if state.graph_replace_required || state.head_revision.is_none() {
713                GraphCommitDelta::ReplaceFull(state.session_graph.clone())
714            } else {
715                GraphCommitDelta::Unchanged {
716                    leaf_node_id: state.session_graph.leaf_node_id.clone(),
717                }
718            },
719            checkpoint: build_checkpoint_from_persisted_state(state),
720            usage_deltas: usage_deltas.to_vec(),
721            turn_commit: None,
722            completed_queue_claims: Vec::new(),
723            committed_attachment_ids: Vec::new(),
724        }
725    }
726
727    pub(crate) fn persisted_state_with_graph_commit(
728        state: &crate::RuntimeSessionState,
729        graph: GraphCommitDelta,
730        usage_deltas: &[crate::TokenLedgerEntry],
731    ) -> Self {
732        Self {
733            session_id: state.session_id.clone(),
734            expected_head_revision: state.head_revision,
735            session_execution_lease: None,
736            release_session_execution_lease: None,
737            config: persisted_session_config_from_state(state),
738            agent_frames: state.agent_frames.clone(),
739            current_agent_frame_id: state.current_agent_frame_id.clone(),
740            graph,
741            checkpoint: build_checkpoint_from_persisted_state(state),
742            usage_deltas: usage_deltas.to_vec(),
743            turn_commit: None,
744            completed_queue_claims: Vec::new(),
745            committed_attachment_ids: Vec::new(),
746        }
747    }
748
749    pub fn with_turn_commit(mut self, turn_commit: RuntimeTurnCommitStamp) -> Self {
750        self.turn_commit = Some(turn_commit);
751        self
752    }
753
754    pub fn with_session_execution_lease(mut self, lease: SessionExecutionLeaseFence) -> Self {
755        self.session_execution_lease = Some(lease);
756        self
757    }
758
759    pub fn releasing_session_execution_lease(
760        mut self,
761        completion: SessionExecutionLeaseCompletion,
762    ) -> Self {
763        self.release_session_execution_lease = Some(completion);
764        self
765    }
766
767    pub fn completing_queue_claim(
768        mut self,
769        completed_queue_claim: crate::QueuedWorkCompletion,
770    ) -> Self {
771        self.completed_queue_claims.push(completed_queue_claim);
772        self
773    }
774
775    pub fn completing_queue_claims(
776        mut self,
777        completed_queue_claims: impl IntoIterator<Item = crate::QueuedWorkCompletion>,
778    ) -> Self {
779        self.completed_queue_claims.extend(completed_queue_claims);
780        self
781    }
782
783    pub fn with_committed_attachments(
784        mut self,
785        attachment_ids: impl IntoIterator<Item = crate::AttachmentId>,
786    ) -> Self {
787        self.committed_attachment_ids = attachment_ids.into_iter().collect();
788        self
789    }
790}
791
792fn scrub_turn_commit_hash_value(value: &mut serde_json::Value) {
793    match value {
794        serde_json::Value::Object(map) => {
795            let is_message = map.contains_key("role") && map.contains_key("parts");
796            let is_message_part = map.contains_key("kind")
797                && map.contains_key("content")
798                && map.contains_key("prune_state");
799            if is_message || is_message_part {
800                map.remove("id");
801            }
802            for volatile_key in ["node_id", "parent_node_id", "leaf_node_id", "timestamp"] {
803                map.remove(volatile_key);
804            }
805            for child in map.values_mut() {
806                scrub_turn_commit_hash_value(child);
807            }
808        }
809        serde_json::Value::Array(items) => {
810            for item in items {
811                scrub_turn_commit_hash_value(item);
812            }
813        }
814        _ => {}
815    }
816}
817
818fn persisted_session_state_from_head(
819    head: SessionHead,
820    checkpoint: Option<HydratedSessionCheckpoint>,
821) -> crate::RuntimeSessionState {
822    let mut state = crate::RuntimeSessionState {
823        session_id: head.session_id,
824        policy: crate::SessionPolicy::default(),
825        agent_frames: head.agent_frames,
826        current_agent_frame_id: head.current_agent_frame_id,
827        session_graph: head.graph,
828        turn_index: 0,
829        token_usage: crate::TokenUsage::default(),
830        last_prompt_usage: None,
831        protocol_turn_options: crate::ProtocolTurnOptions::default(),
832        tool_state_ref: None,
833        tool_state_generation: None,
834        tool_state_snapshot: None,
835        plugin_snapshot_ref: None,
836        plugin_snapshot_revision: None,
837        plugin_snapshot: None,
838        execution_state_ref: None,
839        execution_state_snapshot: None,
840        token_ledger: head.token_ledger,
841        checkpoint_ref: head.checkpoint_ref.clone(),
842        head_revision: Some(head.head_revision),
843        graph_replace_required: false,
844    };
845    state.policy.model = head.config.model.clone();
846    state.policy.provider_id = head.config.provider_id.clone();
847    if let Some(checkpoint) = checkpoint {
848        state.turn_index = checkpoint.turn_state.turn_index;
849        state.token_usage = checkpoint.turn_state.token_usage;
850        state.last_prompt_usage = checkpoint.turn_state.last_prompt_usage;
851        state.protocol_turn_options = checkpoint.turn_state.protocol_turn_options;
852        state.tool_state_ref = checkpoint.tool_state_ref.clone();
853        state.tool_state_generation = checkpoint
854            .tool_state
855            .as_ref()
856            .map(|snapshot| snapshot.generation());
857        state.tool_state_snapshot = checkpoint.tool_state;
858        state.plugin_snapshot_ref = checkpoint.plugin_snapshot_ref.clone();
859        state.plugin_snapshot_revision = checkpoint.plugin_snapshot_revision;
860        state.plugin_snapshot = checkpoint.plugin_snapshot;
861        state.execution_state_ref = checkpoint.execution_state_ref.clone();
862        state.execution_state_snapshot = checkpoint.execution_state;
863    }
864    state.ensure_agent_frame_initialized();
865    state
866}
867
868impl Default for SessionHead {
869    fn default() -> Self {
870        Self {
871            session_id: default_root_session_id(),
872            head_revision: 0,
873            agent_frames: Vec::new(),
874            current_agent_frame_id: String::new(),
875            graph: crate::SessionGraph::default(),
876            config: crate::PersistedSessionConfig::default(),
877            checkpoint_ref: None,
878            token_ledger: Vec::new(),
879        }
880    }
881}
882
883impl Default for SessionHeadMeta {
884    fn default() -> Self {
885        Self {
886            session_id: default_root_session_id(),
887            head_revision: 0,
888            config: crate::PersistedSessionConfig::default(),
889            agent_frames: Vec::new(),
890            current_agent_frame_id: String::new(),
891            checkpoint_ref: None,
892            leaf_node_id: None,
893            graph_node_count: 0,
894            token_ledger: Vec::new(),
895        }
896    }
897}
898
899/// Exact settled-session persistence protocol required by the runtime.
900///
901/// This is the runtime's atomic transaction facade for visible session state:
902/// session graph/head commits, queued-work ingress and completion, final
903/// turn-commit idempotency, metadata, usage, and the attachment write-ahead
904/// manifest. In-flight nondeterministic work belongs to the active
905/// [`EffectHost`](crate::EffectHost), not to the store contract.
906///
907/// The [`AttachmentManifest`] supertrait is required so the runtime can wrap
908/// any persistence backend with a
909/// [`SessionScopedAttachmentStore`](crate::SessionScopedAttachmentStore)
910/// without dual-trait casting. Backends with no attachment-write story can
911/// implement the manifest methods as no-ops via
912/// [`NoopAttachmentManifest`]'s blanket helpers.
913#[async_trait::async_trait]
914pub trait RuntimePersistence: AttachmentManifest + Send + Sync {
915    /// Durability tier this session store provides; defaults to
916    /// [`DurabilityTier::Inline`].
917    fn durability_tier(&self) -> crate::DurabilityTier {
918        crate::DurabilityTier::Inline
919    }
920
921    async fn load_session(
922        &self,
923        scope: SessionReadScope,
924    ) -> Result<Option<PersistedSessionRead>, StoreError>;
925
926    async fn load_node(
927        &self,
928        node_id: &str,
929    ) -> Result<Option<crate::SessionNodeRecord>, StoreError>;
930
931    async fn commit_runtime_state(
932        &self,
933        commit: RuntimeCommit,
934    ) -> Result<RuntimeCommitResult, StoreError>;
935
936    /// Try to claim the durable single-writer execution lane for `session_id`.
937    ///
938    /// Returns [`SessionExecutionLeaseClaimOutcome::Busy`] when another owner
939    /// holds an unexpired lease. Expired or released leases may be reclaimed
940    /// and receive a higher fencing token. An unexpired lease held by the same
941    /// owner id but a different incarnation is busy.
942    async fn try_claim_session_execution_lease(
943        &self,
944        session_id: &str,
945        owner: &LeaseOwnerIdentity,
946        lease_ttl_ms: u64,
947    ) -> Result<SessionExecutionLeaseClaimOutcome, StoreError>;
948
949    /// Reclaim an unexpired session execution lease whose observed holder is
950    /// definitely dead according to persisted local-process liveness metadata.
951    ///
952    /// Backends must CAS on `observed_holder` so a stale claimant cannot clear
953    /// a newer live lease that won the race after the busy observation.
954    async fn reclaim_session_execution_lease(
955        &self,
956        session_id: &str,
957        owner: &LeaseOwnerIdentity,
958        observed_holder: &SessionExecutionLeaseFence,
959        lease_ttl_ms: u64,
960    ) -> Result<SessionExecutionLeaseClaimOutcome, StoreError>;
961
962    /// Extend a live session execution lease owned by the caller.
963    ///
964    /// Backends must reject stale, released, superseded, or expired fences with
965    /// [`StoreError::SessionExecutionLeaseExpired`].
966    async fn renew_session_execution_lease(
967        &self,
968        fence: &SessionExecutionLeaseFence,
969        lease_ttl_ms: u64,
970    ) -> Result<SessionExecutionLease, StoreError>;
971
972    /// Release a session execution lease fenced by its completion token.
973    ///
974    /// This operation is idempotent and must not clear a newer owner's lease.
975    async fn release_session_execution_lease(
976        &self,
977        completion: &SessionExecutionLeaseCompletion,
978    ) -> Result<(), StoreError>;
979
980    /// Persist a queued-work batch for later claiming.
981    ///
982    /// The default implementation rejects the batch: backends that do not
983    /// support queued work inherit it and stay loud rather than silently
984    /// dropping work.
985    async fn enqueue_queued_work(
986        &self,
987        _batch: crate::QueuedWorkBatchDraft,
988    ) -> Result<crate::QueuedWorkBatch, StoreError> {
989        Err(StoreError::Backend(
990            "queued work is not supported by this test store".to_string(),
991        ))
992    }
993
994    /// Claim the next ready queued-work group for `owner_id`.
995    ///
996    /// The default implementation reports queued work as unsupported.
997    async fn claim_ready_queued_work(
998        &self,
999        session_id: &str,
1000        _session_execution_lease: &SessionExecutionLeaseFence,
1001        _owner: &LeaseOwnerIdentity,
1002        _boundary: crate::QueuedWorkClaimBoundary,
1003        _lease_ttl_ms: u64,
1004        _max_batches: usize,
1005    ) -> Result<Option<crate::QueuedWorkClaim>, StoreError> {
1006        Err(StoreError::Backend(format!(
1007            "queued work is not supported for session `{session_id}` by this test store"
1008        )))
1009    }
1010
1011    /// Claim a specific ready batch set selected from the durable queue.
1012    ///
1013    /// This is the host-facing counterpart to [`claim_ready_queued_work`]:
1014    /// callers that project queued work into a UI can claim the exact batch ids
1015    /// they rendered instead of reconstructing authority from local draft state.
1016    /// The default implementation preserves the ordered queue contract by
1017    /// claiming the next ready group and returning it only when the durable ids
1018    /// match exactly.
1019    async fn claim_ready_queued_work_by_batch_ids(
1020        &self,
1021        session_id: &str,
1022        session_execution_lease: &SessionExecutionLeaseFence,
1023        owner: &LeaseOwnerIdentity,
1024        boundary: crate::QueuedWorkClaimBoundary,
1025        lease_ttl_ms: u64,
1026        batch_ids: &[String],
1027    ) -> Result<Option<crate::QueuedWorkClaim>, StoreError> {
1028        if batch_ids.is_empty() {
1029            return Ok(None);
1030        }
1031        let Some(claim) = self
1032            .claim_ready_queued_work(
1033                session_id,
1034                session_execution_lease,
1035                owner,
1036                boundary,
1037                lease_ttl_ms,
1038                batch_ids.len(),
1039            )
1040            .await?
1041        else {
1042            return Ok(None);
1043        };
1044        let claimed_ids = claim
1045            .batches
1046            .iter()
1047            .map(|batch| batch.batch_id.as_str())
1048            .collect::<Vec<_>>();
1049        if claimed_ids == batch_ids.iter().map(String::as_str).collect::<Vec<_>>() {
1050            return Ok(Some(claim));
1051        }
1052        self.abandon_queued_work_claim(&claim).await?;
1053        Ok(None)
1054    }
1055
1056    /// Extend the lease on a held queued-work claim.
1057    ///
1058    /// The default implementation reports the claim as expired, matching a
1059    /// backend that never granted one.
1060    async fn renew_queued_work_claim(
1061        &self,
1062        claim: &crate::QueuedWorkClaim,
1063        _lease_ttl_ms: u64,
1064    ) -> Result<crate::QueuedWorkClaim, StoreError> {
1065        Err(StoreError::QueuedWorkClaimExpired {
1066            session_id: claim.session_id.clone(),
1067            claim_id: claim.claim_id.clone(),
1068        })
1069    }
1070
1071    /// Release a held queued-work claim without completing it.
1072    ///
1073    /// The default implementation is a no-op: with no queued work there is
1074    /// nothing to release.
1075    async fn abandon_queued_work_claim(
1076        &self,
1077        _claim: &crate::QueuedWorkClaim,
1078    ) -> Result<(), StoreError> {
1079        Ok(())
1080    }
1081
1082    /// Remove an unclaimed queued-work batch from durable ingress.
1083    ///
1084    /// Returns the removed batch when cancellation won the race. Returns `None`
1085    /// when the batch is missing or currently held by a live claim; callers must
1086    /// treat that as "already claimed or completed" and must not restore any
1087    /// stale local draft state.
1088    ///
1089    /// The default implementation reports `None` (nothing queued, nothing to
1090    /// cancel).
1091    async fn cancel_queued_work_batch(
1092        &self,
1093        _session_id: &str,
1094        _batch_id: &str,
1095    ) -> Result<Option<crate::QueuedWorkBatch>, StoreError> {
1096        Ok(None)
1097    }
1098
1099    /// List all queued-work batches for a session.
1100    ///
1101    /// The default implementation reports an empty queue.
1102    async fn list_queued_work(
1103        &self,
1104        _session_id: &str,
1105    ) -> Result<Vec<crate::QueuedWorkBatch>, StoreError> {
1106        Ok(Vec::new())
1107    }
1108
1109    /// List queued-work batches that are still pending presentation/editing.
1110    ///
1111    /// This excludes batches currently held by a live claim. Expired claims are
1112    /// considered pending again because they can be reclaimed or cancelled.
1113    async fn list_pending_queued_work(
1114        &self,
1115        session_id: &str,
1116    ) -> Result<Vec<crate::QueuedWorkBatch>, StoreError> {
1117        self.list_queued_work(session_id).await
1118    }
1119
1120    async fn save_session_meta(&self, meta: SessionMeta) -> Result<(), StoreError>;
1121    async fn load_session_meta(&self) -> Result<Option<SessionMeta>, StoreError>;
1122
1123    async fn tombstone_nodes(&self, ids: &[String]) -> Result<(), StoreError>;
1124    async fn vacuum(&self) -> Result<VacuumReport, StoreError>;
1125    async fn gc_unreachable(&self) -> Result<GcReport, StoreError>;
1126}
1127
1128fn persisted_session_state_from_read(read: PersistedSessionRead) -> crate::RuntimeSessionState {
1129    persisted_session_state_from_head(
1130        SessionHead {
1131            session_id: read.session_id,
1132            head_revision: read.head_revision,
1133            agent_frames: read.agent_frames,
1134            current_agent_frame_id: read.current_agent_frame_id,
1135            graph: read.graph,
1136            config: read.config,
1137            checkpoint_ref: read.checkpoint_ref,
1138            token_ledger: read.token_ledger,
1139        },
1140        read.checkpoint,
1141    )
1142}
1143
1144pub async fn load_persisted_session_state(
1145    store: &(dyn RuntimePersistence + '_),
1146) -> Result<Option<crate::RuntimeSessionState>, StoreError> {
1147    Ok(store
1148        .load_session(SessionReadScope::FullGraph)
1149        .await?
1150        .map(persisted_session_state_from_read))
1151}
1152
1153pub async fn load_persisted_session_state_active_path(
1154    store: &(dyn RuntimePersistence + '_),
1155    leaf_node_id: Option<String>,
1156) -> Result<Option<crate::RuntimeSessionState>, StoreError> {
1157    Ok(store
1158        .load_session(SessionReadScope::ActivePath { leaf_node_id })
1159        .await?
1160        .map(persisted_session_state_from_read))
1161}
1162
1163pub async fn refresh_persisted_session_state(
1164    store: &(dyn RuntimePersistence + '_),
1165    state: &mut crate::RuntimeSessionState,
1166) -> Result<(), StoreError> {
1167    if let Some(mut fresh) = load_persisted_session_state(store).await? {
1168        fresh.policy.session_id = state.policy.session_id.clone();
1169        fresh.policy.max_turns = state.policy.max_turns;
1170        *state = fresh;
1171    }
1172    Ok(())
1173}
1174
#[cfg(test)]
mod tests {
    use super::{LeaseOwnerIdentity, LeaseOwnerLiveness};

    /// Shorthand for building a local-process liveness record from explicit
    /// parts via the test-only constructor.
    fn local_liveness(
        host_id: &str,
        boot_id: &str,
        pid: u32,
        process_start: &str,
    ) -> LeaseOwnerLiveness {
        LeaseOwnerLiveness::local_process_for_test(host_id, boot_id, pid, process_start)
    }

    /// Two identities share an incarnation only when both the owner id and the
    /// incarnation string match.
    #[test]
    fn lease_owner_identity_requires_same_incarnation() {
        let original = LeaseOwnerIdentity::opaque("owner", "incarnation-a");
        let twin = LeaseOwnerIdentity::opaque("owner", "incarnation-a");
        let successor = LeaseOwnerIdentity::opaque("owner", "incarnation-b");

        assert!(original.same_incarnation(&twin));
        assert!(!original.same_incarnation(&successor));
    }

    /// A holder is reported definitely dead only to a local claimant on the
    /// same host and boot; a different host, a different boot, or an opaque
    /// party on either side never yields that verdict.
    #[test]
    fn local_liveness_only_proves_same_host_boot_dead_processes() {
        let pid = std::process::id();
        // The holder reuses this process's pid with a process-start string
        // that is not the real one.
        let holder = local_liveness("host-a", "boot-a", pid, "not-the-current-process-start");
        let claimant_on =
            |host_id: &str, boot_id: &str| local_liveness(host_id, boot_id, pid, "claimant");

        let same_host_boot = claimant_on("host-a", "boot-a");
        let other_host = claimant_on("host-b", "boot-a");
        let other_boot = claimant_on("host-a", "boot-b");

        assert!(holder.is_definitely_dead_for_claimant(&same_host_boot));
        assert!(!holder.is_definitely_dead_for_claimant(&other_host));
        assert!(!holder.is_definitely_dead_for_claimant(&other_boot));
        assert!(!holder.is_definitely_dead_for_claimant(&LeaseOwnerLiveness::Opaque));
        assert!(!LeaseOwnerLiveness::Opaque.is_definitely_dead_for_claimant(&same_host_boot));
    }
}
1216}