1pub mod queued_work;
4
/// procfs file that [`LeaseOwnerLiveness::current_local_process`] reads to obtain the
/// boot id stored in a `LocalProcess` liveness record.
const PROC_BOOT_ID_PATH: &str = "/proc/sys/kernel/random/boot_id";
6
/// Returns the session id used when a persisted record does not carry one.
///
/// Used as the serde default for the `session_id` field of `SessionHead` and
/// `SessionHeadMeta`, and by their `Default` implementations.
fn default_root_session_id() -> String {
    String::from("root")
}
10
#[cfg(test)]
mod persisted_state_tests {
    use super::*;

    /// Hydrating from a stored head must expose the stored provider id on the session
    /// policy and on every agent frame, and must carry the stored head revision over.
    #[test]
    fn persisted_state_hydrates_provider_id_without_live_provider_rebinding() {
        let stored_config = crate::PersistedSessionConfig {
            provider_id: "stored-provider".to_string(),
            model: crate::ModelSpec::default(),
        };
        let stored_head = SessionHead {
            session_id: "stored".to_string(),
            head_revision: 7,
            agent_frames: Vec::new(),
            current_agent_frame_id: String::new(),
            graph: crate::SessionGraph::default(),
            config: stored_config,
            checkpoint_ref: None,
            token_ledger: Vec::new(),
        };

        let state = persisted_session_state_from_head(stored_head, None);

        assert_eq!(state.policy.recorded_provider_id(), "stored-provider");
        let every_frame_uses_stored_provider = state
            .agent_frames
            .iter()
            .all(|frame| frame.assignment.policy.recorded_provider_id() == "stored-provider");
        assert!(every_frame_uses_stored_provider);
        assert_eq!(state.head_revision, Some(7));
    }
}
44
45#[derive(Debug, thiserror::Error)]
46pub enum StoreError {
47 #[error(
48 "store is already bound to session `{bound_session_id}` and cannot be reused for `{attempted_session_id}`"
49 )]
50 SessionBindingMismatch {
51 bound_session_id: String,
52 attempted_session_id: String,
53 },
54 #[error("store does not support read scope {0:?}")]
55 UnsupportedReadScope(SessionReadScope),
56 #[error("store head revision conflict: expected {expected:?}, actual {actual}")]
57 HeadRevisionConflict { expected: Option<u64>, actual: u64 },
58 #[error(
59 "runtime turn `{turn_id}` for session `{session_id}` was already committed with a different commit hash"
60 )]
61 RuntimeTurnCommitConflict { session_id: String, turn_id: String },
62 #[error("queued work claim `{claim_id}` for session `{session_id}` is missing or expired")]
63 QueuedWorkClaimExpired {
64 session_id: String,
65 claim_id: String,
66 },
67 #[error("session execution lease for session `{session_id}` is missing or expired")]
68 SessionExecutionLeaseExpired { session_id: String },
69 #[error(
70 "{record_kind} schema_version {actual} is not supported by this binary (expected {expected})"
71 )]
72 UnsupportedRecordSchemaVersion {
73 record_kind: &'static str,
74 actual: u32,
75 expected: u32,
76 },
77 #[error("store backend error: {0}")]
78 Backend(String),
79}
80
81#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
82pub struct SessionMeta {
83 pub session_id: String,
84 pub session_name: String,
85 pub created_at: String,
86 pub model: String,
87 pub cwd: Option<String>,
88 pub relation: crate::SessionRelation,
89}
90
91impl SessionMeta {
92 pub fn parent_session_id(&self) -> Option<&str> {
95 self.relation.parent_session_id()
96 }
97}
98
99#[derive(Clone, Debug)]
101pub struct SessionPickerInfo {
102 pub session_id: String,
103 pub cwd: Option<String>,
104 pub relation: crate::SessionRelation,
105 pub first_user_message: String,
106 pub user_message_count: usize,
107}
108
109impl SessionPickerInfo {
110 pub fn parent_session_id(&self) -> Option<&str> {
111 self.relation.parent_session_id()
112 }
113}
114
115#[derive(Clone, Debug, Default, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
116#[serde(transparent)]
117pub struct BlobRef(pub String);
118
119impl BlobRef {
120 pub fn as_str(&self) -> &str {
121 &self.0
122 }
123}
124
125impl std::fmt::Display for BlobRef {
126 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
127 f.write_str(&self.0)
128 }
129}
130
131impl From<String> for BlobRef {
132 fn from(value: String) -> Self {
133 Self(value)
134 }
135}
136
/// Counters returned by `RuntimePersistence::gc_unreachable`.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct GcReport {
    /// Number of roots.
    pub root_count: usize,
    /// Number of blobs that were retained.
    pub retained_blob_count: usize,
    /// Number of blobs that were deleted.
    pub deleted_blob_count: usize,
}
143
/// Counter returned by `RuntimePersistence::vacuum`.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct VacuumReport {
    /// Number of nodes that were removed.
    pub removed_node_count: usize,
}
151
152#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
153pub struct SessionCheckpoint {
154 #[serde(default)]
155 pub turn_state: crate::PersistedTurnState,
156 #[serde(default, skip_serializing_if = "Option::is_none")]
157 pub tool_state_ref: Option<BlobRef>,
158 #[serde(default, skip_serializing_if = "Option::is_none")]
159 pub plugin_snapshot_ref: Option<BlobRef>,
160 #[serde(default, skip_serializing_if = "Option::is_none")]
161 pub plugin_snapshot_revision: Option<u64>,
162 #[serde(default, skip_serializing_if = "Option::is_none")]
163 pub execution_state_ref: Option<BlobRef>,
164}
165
166#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
167pub struct HydratedSessionCheckpoint {
168 pub turn_state: crate::PersistedTurnState,
169 pub tool_state_ref: Option<BlobRef>,
170 pub tool_state: Option<crate::ToolState>,
171 pub plugin_snapshot_ref: Option<BlobRef>,
172 pub plugin_snapshot: Option<crate::PluginSessionSnapshot>,
173 pub plugin_snapshot_revision: Option<u64>,
174 pub execution_state_ref: Option<BlobRef>,
175 pub execution_state: Option<Vec<u8>>,
176}
177
178#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
179pub struct SessionHead {
180 #[serde(default = "default_root_session_id")]
181 pub session_id: String,
182 #[serde(default)]
183 pub head_revision: u64,
184 #[serde(default)]
185 pub agent_frames: Vec<crate::AgentFrameRecord>,
186 #[serde(default, skip_serializing_if = "String::is_empty")]
187 pub current_agent_frame_id: crate::AgentFrameId,
188 pub graph: crate::SessionGraph,
189 pub config: crate::PersistedSessionConfig,
190 #[serde(default, skip_serializing_if = "Option::is_none")]
191 pub checkpoint_ref: Option<BlobRef>,
192 #[serde(default, skip_serializing_if = "Vec::is_empty")]
193 pub token_ledger: Vec<crate::TokenLedgerEntry>,
194}
195
196#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
197pub struct SessionHeadMeta {
198 #[serde(default = "default_root_session_id")]
199 pub session_id: String,
200 #[serde(default)]
201 pub head_revision: u64,
202 pub config: crate::PersistedSessionConfig,
203 #[serde(default)]
204 pub agent_frames: Vec<crate::AgentFrameRecord>,
205 #[serde(default, skip_serializing_if = "String::is_empty")]
206 pub current_agent_frame_id: crate::AgentFrameId,
207 #[serde(default, skip_serializing_if = "Option::is_none")]
208 pub checkpoint_ref: Option<BlobRef>,
209 #[serde(default, skip_serializing_if = "Option::is_none")]
210 pub leaf_node_id: Option<String>,
211 #[serde(default)]
212 pub graph_node_count: usize,
213 #[serde(default, skip_serializing_if = "Vec::is_empty")]
214 pub token_ledger: Vec<crate::TokenLedgerEntry>,
215}
216
217fn persisted_session_config_from_state(
218 state: &crate::RuntimeSessionState,
219) -> crate::PersistedSessionConfig {
220 crate::PersistedSessionConfig {
221 provider_id: state.policy.recorded_provider_id().to_string(),
222 model: state.policy.model.clone(),
223 }
224}
225
/// Selects how much of a session `RuntimePersistence::load_session` should read.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SessionReadScope {
    /// Read the full session graph.
    FullGraph,
    /// Read the active path, optionally identified by an explicit leaf node id.
    ActivePath { leaf_node_id: Option<String> },
}
231
232#[derive(Clone, Debug)]
233pub struct PersistedSessionRead {
234 pub session_id: String,
235 pub head_revision: u64,
236 pub config: crate::PersistedSessionConfig,
237 pub agent_frames: Vec<crate::AgentFrameRecord>,
238 pub current_agent_frame_id: crate::AgentFrameId,
239 pub graph: crate::SessionGraph,
240 pub checkpoint_ref: Option<BlobRef>,
241 pub checkpoint: Option<HydratedSessionCheckpoint>,
242 pub token_ledger: Vec<crate::TokenLedgerEntry>,
243}
244
245#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
246pub enum GraphCommitDelta {
247 Unchanged {
248 leaf_node_id: Option<String>,
249 },
250 Append {
251 nodes: Vec<crate::SessionNodeRecord>,
252 leaf_node_id: Option<String>,
253 },
254 ReplaceFull(crate::SessionGraph),
255}
256
257impl GraphCommitDelta {
258 pub fn leaf_node_id(&self) -> Option<&String> {
259 match self {
260 Self::Unchanged { leaf_node_id } | Self::Append { leaf_node_id, .. } => {
261 leaf_node_id.as_ref()
262 }
263 Self::ReplaceFull(graph) => graph.leaf_node_id.as_ref(),
264 }
265 }
266}
267
268#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
269pub struct RuntimeCommit {
270 pub session_id: String,
271 pub expected_head_revision: Option<u64>,
272 #[serde(default, skip_serializing_if = "Option::is_none")]
273 pub session_execution_lease: Option<SessionExecutionLeaseFence>,
274 #[serde(default, skip_serializing_if = "Option::is_none")]
275 pub release_session_execution_lease: Option<SessionExecutionLeaseCompletion>,
276 pub config: crate::PersistedSessionConfig,
277 pub agent_frames: Vec<crate::AgentFrameRecord>,
278 pub current_agent_frame_id: crate::AgentFrameId,
279 pub graph: GraphCommitDelta,
280 pub checkpoint: HydratedSessionCheckpoint,
281 pub usage_deltas: Vec<crate::TokenLedgerEntry>,
282 pub turn_commit: Option<RuntimeTurnCommitStamp>,
283 pub completed_queue_claims: Vec<crate::QueuedWorkCompletion>,
284 pub committed_attachment_ids: Vec<crate::AttachmentId>,
292}
293
294#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
295pub struct RuntimeCommitResult {
296 pub head_revision: u64,
297 pub checkpoint_ref: BlobRef,
298 pub manifest: SessionCheckpoint,
299}
300
301#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
302pub struct LeaseOwnerIdentity {
303 pub owner_id: String,
304 pub incarnation_id: String,
305 #[serde(default)]
306 pub liveness: LeaseOwnerLiveness,
307}
308
309impl LeaseOwnerIdentity {
310 pub fn opaque(
311 owner_id: impl Into<String>,
312 incarnation_id: impl Into<String>,
313 ) -> LeaseOwnerIdentity {
314 LeaseOwnerIdentity {
315 owner_id: owner_id.into(),
316 incarnation_id: incarnation_id.into(),
317 liveness: LeaseOwnerLiveness::Opaque,
318 }
319 }
320
321 pub fn local_process(
322 owner_id: impl Into<String>,
323 incarnation_id: impl Into<String>,
324 host_id: impl Into<String>,
325 ) -> LeaseOwnerIdentity {
326 let liveness = LeaseOwnerLiveness::current_local_process(host_id.into())
327 .unwrap_or(LeaseOwnerLiveness::Opaque);
328 LeaseOwnerIdentity {
329 owner_id: owner_id.into(),
330 incarnation_id: incarnation_id.into(),
331 liveness,
332 }
333 }
334
335 pub fn same_incarnation(&self, other: &LeaseOwnerIdentity) -> bool {
336 self.owner_id == other.owner_id && self.incarnation_id == other.incarnation_id
337 }
338
339 pub fn is_definitely_dead_for_claimant(&self, claimant: &LeaseOwnerIdentity) -> bool {
340 self.liveness
341 .is_definitely_dead_for_claimant(&claimant.liveness)
342 }
343}
344
345#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize, Default)]
346#[serde(tag = "kind", rename_all = "snake_case")]
347pub enum LeaseOwnerLiveness {
348 LocalProcess {
349 host_id: String,
350 boot_id: String,
351 pid: u32,
352 process_start: String,
353 },
354 #[default]
355 Opaque,
356}
357
358impl LeaseOwnerLiveness {
359 pub fn current_local_process(host_id: impl Into<String>) -> Option<LeaseOwnerLiveness> {
360 let boot_id = std::fs::read_to_string(PROC_BOOT_ID_PATH)
361 .ok()
362 .map(|value| value.trim().to_string())
363 .filter(|value| !value.is_empty())?;
364 let pid = std::process::id();
365 let process_start = read_linux_process_start(pid)?;
366 Some(LeaseOwnerLiveness::LocalProcess {
367 host_id: host_id.into(),
368 boot_id,
369 pid,
370 process_start,
371 })
372 }
373
374 pub fn local_process_for_test(
375 host_id: impl Into<String>,
376 boot_id: impl Into<String>,
377 pid: u32,
378 process_start: impl Into<String>,
379 ) -> LeaseOwnerLiveness {
380 LeaseOwnerLiveness::LocalProcess {
381 host_id: host_id.into(),
382 boot_id: boot_id.into(),
383 pid,
384 process_start: process_start.into(),
385 }
386 }
387
388 pub fn is_definitely_dead_for_claimant(&self, claimant: &LeaseOwnerLiveness) -> bool {
389 let (
390 LeaseOwnerLiveness::LocalProcess {
391 host_id,
392 boot_id,
393 pid,
394 process_start,
395 },
396 LeaseOwnerLiveness::LocalProcess {
397 host_id: claimant_host_id,
398 boot_id: claimant_boot_id,
399 ..
400 },
401 ) = (self, claimant)
402 else {
403 return false;
404 };
405 if host_id != claimant_host_id || boot_id != claimant_boot_id {
406 return false;
407 }
408 matches!(linux_process_is_live(*pid, process_start), Some(false))
409 }
410}
411
412fn read_linux_process_start(pid: u32) -> Option<String> {
413 let stat = std::fs::read_to_string(format!("/proc/{pid}/stat")).ok()?;
414 parse_linux_process_start(&stat)
415}
416
417fn linux_process_is_live(pid: u32, expected_process_start: &str) -> Option<bool> {
418 match std::fs::read_to_string(format!("/proc/{pid}/stat")) {
419 Ok(stat) => parse_linux_process_start(&stat).map(|start| start == expected_process_start),
420 Err(err) if err.kind() == std::io::ErrorKind::NotFound => Some(false),
421 Err(_) => None,
422 }
423}
424
/// Extracts the process start marker from the text of a `/proc/<pid>/stat` file.
///
/// The text is split at the last `") "` so that a command name containing spaces or
/// parentheses cannot shift the fields; the 20th whitespace-separated field after that
/// point is returned (field 22, `starttime`, in proc(5) numbering). Returns `None` when
/// the separator or the field is missing.
fn parse_linux_process_start(stat: &str) -> Option<String> {
    let (_, after_comm) = stat.rsplit_once(") ")?;
    let start_field = after_comm.split_whitespace().nth(19)?;
    Some(start_field.to_owned())
}
429
430#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
431pub struct SessionExecutionLease {
432 pub session_id: String,
433 pub owner: LeaseOwnerIdentity,
434 pub lease_token: String,
435 pub fencing_token: u64,
436 pub claimed_at_epoch_ms: u64,
437 pub expires_at_epoch_ms: u64,
438}
439
440#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
441pub struct SessionExecutionLeaseFence {
442 pub session_id: String,
443 pub owner: LeaseOwnerIdentity,
444 pub lease_token: String,
445 pub fencing_token: u64,
446}
447
448#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
449pub struct SessionExecutionLeaseCompletion {
450 pub session_id: String,
451 pub owner: LeaseOwnerIdentity,
452 pub lease_token: String,
453 pub fencing_token: u64,
454}
455
456impl SessionExecutionLease {
457 pub fn fence(&self) -> SessionExecutionLeaseFence {
458 SessionExecutionLeaseFence {
459 session_id: self.session_id.clone(),
460 owner: self.owner.clone(),
461 lease_token: self.lease_token.clone(),
462 fencing_token: self.fencing_token,
463 }
464 }
465
466 pub fn completion(&self) -> SessionExecutionLeaseCompletion {
467 SessionExecutionLeaseCompletion {
468 session_id: self.session_id.clone(),
469 owner: self.owner.clone(),
470 lease_token: self.lease_token.clone(),
471 fencing_token: self.fencing_token,
472 }
473 }
474}
475
476impl SessionExecutionLeaseCompletion {
477 pub fn from_lease(lease: &SessionExecutionLease) -> Self {
478 lease.completion()
479 }
480}
481
482#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
483pub enum SessionExecutionLeaseClaimOutcome {
484 Acquired(SessionExecutionLease),
485 Busy { holder: SessionExecutionLease },
486}
487
488impl SessionExecutionLeaseClaimOutcome {
489 pub fn acquired(self) -> Option<SessionExecutionLease> {
490 match self {
491 Self::Acquired(lease) => Some(lease),
492 Self::Busy { .. } => None,
493 }
494 }
495}
496
497#[derive(Clone, Debug)]
516pub struct AttachmentIntent {
517 pub attachment_id: crate::AttachmentId,
518 pub session_id: String,
519 pub canonical_uri: String,
524 pub intent_at_epoch_ms: u64,
525}
526
527#[derive(Clone, Debug)]
528pub struct AttachmentManifestEntry {
529 pub attachment_id: crate::AttachmentId,
530 pub session_id: String,
531 pub canonical_uri: String,
532 pub intent_at_epoch_ms: u64,
533 pub committed_at_epoch_ms: Option<u64>,
534}
535
536pub trait AttachmentManifest: Send + Sync {
548 fn record_intent(&self, intent: AttachmentIntent) -> Result<(), StoreError>;
549
550 fn commit_refs(
557 &self,
558 session_id: &str,
559 attachment_ids: &[crate::AttachmentId],
560 ) -> Result<(), StoreError>;
561
562 fn list_uncommitted(
567 &self,
568 older_than_epoch_ms: u64,
569 ) -> Result<Vec<AttachmentManifestEntry>, StoreError>;
570
571 fn forget(&self, attachment_id: &crate::AttachmentId) -> Result<(), StoreError>;
575}
576
/// Implements `AttachmentManifest` for `$ty` with no-op methods: `record_intent`,
/// `commit_refs` and `forget` return `Ok(())`, and `list_uncommitted` returns an empty
/// list.
///
/// All standard-library items are named by absolute path so the expansion does not
/// depend on what the invoking module has in scope.
#[macro_export]
macro_rules! impl_noop_attachment_manifest {
    ($ty:ty) => {
        impl $crate::AttachmentManifest for $ty {
            fn record_intent(
                &self,
                _intent: $crate::AttachmentIntent,
            ) -> ::std::result::Result<(), $crate::StoreError> {
                ::std::result::Result::Ok(())
            }

            fn commit_refs(
                &self,
                _session_id: &str,
                _attachment_ids: &[$crate::AttachmentId],
            ) -> ::std::result::Result<(), $crate::StoreError> {
                ::std::result::Result::Ok(())
            }

            fn list_uncommitted(
                &self,
                _older_than_epoch_ms: u64,
            ) -> ::std::result::Result<
                ::std::vec::Vec<$crate::AttachmentManifestEntry>,
                $crate::StoreError,
            > {
                ::std::result::Result::Ok(::std::vec::Vec::new())
            }

            fn forget(
                &self,
                _attachment_id: &$crate::AttachmentId,
            ) -> ::std::result::Result<(), $crate::StoreError> {
                ::std::result::Result::Ok(())
            }
        }
    };
}
617
618pub fn ensure_supported_schema_version(
622 record_kind: &'static str,
623 actual: u32,
624 expected: u32,
625) -> Result<(), StoreError> {
626 if actual == expected {
627 Ok(())
628 } else {
629 Err(StoreError::UnsupportedRecordSchemaVersion {
630 record_kind,
631 actual,
632 expected,
633 })
634 }
635}
636
637#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
638pub struct RuntimeTurnCommitStamp {
639 pub session_id: String,
640 pub turn_id: String,
641 pub turn_commit_hash: String,
642}
643
644impl RuntimeTurnCommitStamp {
645 pub fn new(
646 session_id: impl Into<String>,
647 turn_id: impl Into<String>,
648 turn_commit_hash: impl Into<String>,
649 ) -> Self {
650 Self {
651 session_id: session_id.into(),
652 turn_id: turn_id.into(),
653 turn_commit_hash: turn_commit_hash.into(),
654 }
655 }
656}
657
658fn build_persisted_turn_state(state: &crate::RuntimeSessionState) -> crate::PersistedTurnState {
659 crate::PersistedTurnState {
660 turn_index: state.turn_index,
661 token_usage: state.token_usage.clone(),
662 last_prompt_usage: state.last_prompt_usage.clone(),
663 protocol_turn_options: state.protocol_turn_options.clone(),
664 }
665}
666
667fn build_checkpoint_from_persisted_state(
668 state: &crate::RuntimeSessionState,
669) -> HydratedSessionCheckpoint {
670 HydratedSessionCheckpoint {
671 turn_state: build_persisted_turn_state(state),
672 tool_state_ref: state.tool_state_ref.clone(),
673 tool_state: state.tool_state_snapshot.clone(),
674 plugin_snapshot_ref: state.plugin_snapshot_ref.clone(),
675 plugin_snapshot_revision: state.plugin_snapshot_revision,
676 plugin_snapshot: state.plugin_snapshot.clone(),
677 execution_state_ref: state.execution_state_ref.clone(),
678 execution_state: state.execution_state_snapshot.clone(),
679 }
680}
681
682impl RuntimeCommit {
683 pub fn turn_commit_hash(&self) -> Result<String, StoreError> {
684 let mut semantic_commit = self.clone();
685 semantic_commit.expected_head_revision = None;
686 semantic_commit.session_execution_lease = None;
687 semantic_commit.release_session_execution_lease = None;
688 semantic_commit.turn_commit = None;
689 let mut semantic_commit = serde_json::to_value(&semantic_commit).map_err(|err| {
690 StoreError::Backend(format!("failed to serialize runtime turn commit: {err}"))
691 })?;
692 scrub_turn_commit_hash_value(&mut semantic_commit);
693 crate::stable_hash::stable_json_sha256_hex(&semantic_commit).map_err(|err| {
694 StoreError::Backend(format!(
695 "failed to serialize runtime turn commit hash: {err}"
696 ))
697 })
698 }
699
700 pub fn persisted_state(
701 state: &crate::RuntimeSessionState,
702 usage_deltas: &[crate::TokenLedgerEntry],
703 ) -> Self {
704 Self {
705 session_id: state.session_id.clone(),
706 expected_head_revision: state.head_revision,
707 session_execution_lease: None,
708 release_session_execution_lease: None,
709 config: persisted_session_config_from_state(state),
710 agent_frames: state.agent_frames.clone(),
711 current_agent_frame_id: state.current_agent_frame_id.clone(),
712 graph: if state.graph_replace_required || state.head_revision.is_none() {
713 GraphCommitDelta::ReplaceFull(state.session_graph.clone())
714 } else {
715 GraphCommitDelta::Unchanged {
716 leaf_node_id: state.session_graph.leaf_node_id.clone(),
717 }
718 },
719 checkpoint: build_checkpoint_from_persisted_state(state),
720 usage_deltas: usage_deltas.to_vec(),
721 turn_commit: None,
722 completed_queue_claims: Vec::new(),
723 committed_attachment_ids: Vec::new(),
724 }
725 }
726
727 pub(crate) fn persisted_state_with_graph_commit(
728 state: &crate::RuntimeSessionState,
729 graph: GraphCommitDelta,
730 usage_deltas: &[crate::TokenLedgerEntry],
731 ) -> Self {
732 Self {
733 session_id: state.session_id.clone(),
734 expected_head_revision: state.head_revision,
735 session_execution_lease: None,
736 release_session_execution_lease: None,
737 config: persisted_session_config_from_state(state),
738 agent_frames: state.agent_frames.clone(),
739 current_agent_frame_id: state.current_agent_frame_id.clone(),
740 graph,
741 checkpoint: build_checkpoint_from_persisted_state(state),
742 usage_deltas: usage_deltas.to_vec(),
743 turn_commit: None,
744 completed_queue_claims: Vec::new(),
745 committed_attachment_ids: Vec::new(),
746 }
747 }
748
749 pub fn with_turn_commit(mut self, turn_commit: RuntimeTurnCommitStamp) -> Self {
750 self.turn_commit = Some(turn_commit);
751 self
752 }
753
754 pub fn with_session_execution_lease(mut self, lease: SessionExecutionLeaseFence) -> Self {
755 self.session_execution_lease = Some(lease);
756 self
757 }
758
759 pub fn releasing_session_execution_lease(
760 mut self,
761 completion: SessionExecutionLeaseCompletion,
762 ) -> Self {
763 self.release_session_execution_lease = Some(completion);
764 self
765 }
766
767 pub fn completing_queue_claim(
768 mut self,
769 completed_queue_claim: crate::QueuedWorkCompletion,
770 ) -> Self {
771 self.completed_queue_claims.push(completed_queue_claim);
772 self
773 }
774
775 pub fn completing_queue_claims(
776 mut self,
777 completed_queue_claims: impl IntoIterator<Item = crate::QueuedWorkCompletion>,
778 ) -> Self {
779 self.completed_queue_claims.extend(completed_queue_claims);
780 self
781 }
782
783 pub fn with_committed_attachments(
784 mut self,
785 attachment_ids: impl IntoIterator<Item = crate::AttachmentId>,
786 ) -> Self {
787 self.committed_attachment_ids = attachment_ids.into_iter().collect();
788 self
789 }
790}
791
792fn scrub_turn_commit_hash_value(value: &mut serde_json::Value) {
793 match value {
794 serde_json::Value::Object(map) => {
795 let is_message = map.contains_key("role") && map.contains_key("parts");
796 let is_message_part = map.contains_key("kind")
797 && map.contains_key("content")
798 && map.contains_key("prune_state");
799 if is_message || is_message_part {
800 map.remove("id");
801 }
802 for volatile_key in ["node_id", "parent_node_id", "leaf_node_id", "timestamp"] {
803 map.remove(volatile_key);
804 }
805 for child in map.values_mut() {
806 scrub_turn_commit_hash_value(child);
807 }
808 }
809 serde_json::Value::Array(items) => {
810 for item in items {
811 scrub_turn_commit_hash_value(item);
812 }
813 }
814 _ => {}
815 }
816}
817
818fn persisted_session_state_from_head(
819 head: SessionHead,
820 checkpoint: Option<HydratedSessionCheckpoint>,
821) -> crate::RuntimeSessionState {
822 let mut state = crate::RuntimeSessionState {
823 session_id: head.session_id,
824 policy: crate::SessionPolicy::default(),
825 agent_frames: head.agent_frames,
826 current_agent_frame_id: head.current_agent_frame_id,
827 session_graph: head.graph,
828 turn_index: 0,
829 token_usage: crate::TokenUsage::default(),
830 last_prompt_usage: None,
831 protocol_turn_options: crate::ProtocolTurnOptions::default(),
832 tool_state_ref: None,
833 tool_state_generation: None,
834 tool_state_snapshot: None,
835 plugin_snapshot_ref: None,
836 plugin_snapshot_revision: None,
837 plugin_snapshot: None,
838 execution_state_ref: None,
839 execution_state_snapshot: None,
840 token_ledger: head.token_ledger,
841 checkpoint_ref: head.checkpoint_ref.clone(),
842 head_revision: Some(head.head_revision),
843 graph_replace_required: false,
844 };
845 state.policy.model = head.config.model.clone();
846 state.policy.provider_id = head.config.provider_id.clone();
847 if let Some(checkpoint) = checkpoint {
848 state.turn_index = checkpoint.turn_state.turn_index;
849 state.token_usage = checkpoint.turn_state.token_usage;
850 state.last_prompt_usage = checkpoint.turn_state.last_prompt_usage;
851 state.protocol_turn_options = checkpoint.turn_state.protocol_turn_options;
852 state.tool_state_ref = checkpoint.tool_state_ref.clone();
853 state.tool_state_generation = checkpoint
854 .tool_state
855 .as_ref()
856 .map(|snapshot| snapshot.generation());
857 state.tool_state_snapshot = checkpoint.tool_state;
858 state.plugin_snapshot_ref = checkpoint.plugin_snapshot_ref.clone();
859 state.plugin_snapshot_revision = checkpoint.plugin_snapshot_revision;
860 state.plugin_snapshot = checkpoint.plugin_snapshot;
861 state.execution_state_ref = checkpoint.execution_state_ref.clone();
862 state.execution_state_snapshot = checkpoint.execution_state;
863 }
864 state.ensure_agent_frame_initialized();
865 state
866}
867
868impl Default for SessionHead {
869 fn default() -> Self {
870 Self {
871 session_id: default_root_session_id(),
872 head_revision: 0,
873 agent_frames: Vec::new(),
874 current_agent_frame_id: String::new(),
875 graph: crate::SessionGraph::default(),
876 config: crate::PersistedSessionConfig::default(),
877 checkpoint_ref: None,
878 token_ledger: Vec::new(),
879 }
880 }
881}
882
883impl Default for SessionHeadMeta {
884 fn default() -> Self {
885 Self {
886 session_id: default_root_session_id(),
887 head_revision: 0,
888 config: crate::PersistedSessionConfig::default(),
889 agent_frames: Vec::new(),
890 current_agent_frame_id: String::new(),
891 checkpoint_ref: None,
892 leaf_node_id: None,
893 graph_node_count: 0,
894 token_ledger: Vec::new(),
895 }
896 }
897}
898
899#[async_trait::async_trait]
914pub trait RuntimePersistence: AttachmentManifest + Send + Sync {
915 fn durability_tier(&self) -> crate::DurabilityTier {
918 crate::DurabilityTier::Inline
919 }
920
921 async fn load_session(
922 &self,
923 scope: SessionReadScope,
924 ) -> Result<Option<PersistedSessionRead>, StoreError>;
925
926 async fn load_node(
927 &self,
928 node_id: &str,
929 ) -> Result<Option<crate::SessionNodeRecord>, StoreError>;
930
931 async fn commit_runtime_state(
932 &self,
933 commit: RuntimeCommit,
934 ) -> Result<RuntimeCommitResult, StoreError>;
935
936 async fn try_claim_session_execution_lease(
943 &self,
944 session_id: &str,
945 owner: &LeaseOwnerIdentity,
946 lease_ttl_ms: u64,
947 ) -> Result<SessionExecutionLeaseClaimOutcome, StoreError>;
948
949 async fn reclaim_session_execution_lease(
955 &self,
956 session_id: &str,
957 owner: &LeaseOwnerIdentity,
958 observed_holder: &SessionExecutionLeaseFence,
959 lease_ttl_ms: u64,
960 ) -> Result<SessionExecutionLeaseClaimOutcome, StoreError>;
961
962 async fn renew_session_execution_lease(
967 &self,
968 fence: &SessionExecutionLeaseFence,
969 lease_ttl_ms: u64,
970 ) -> Result<SessionExecutionLease, StoreError>;
971
972 async fn release_session_execution_lease(
976 &self,
977 completion: &SessionExecutionLeaseCompletion,
978 ) -> Result<(), StoreError>;
979
980 async fn enqueue_queued_work(
986 &self,
987 _batch: crate::QueuedWorkBatchDraft,
988 ) -> Result<crate::QueuedWorkBatch, StoreError> {
989 Err(StoreError::Backend(
990 "queued work is not supported by this test store".to_string(),
991 ))
992 }
993
994 async fn claim_ready_queued_work(
998 &self,
999 session_id: &str,
1000 _session_execution_lease: &SessionExecutionLeaseFence,
1001 _owner: &LeaseOwnerIdentity,
1002 _boundary: crate::QueuedWorkClaimBoundary,
1003 _lease_ttl_ms: u64,
1004 _max_batches: usize,
1005 ) -> Result<Option<crate::QueuedWorkClaim>, StoreError> {
1006 Err(StoreError::Backend(format!(
1007 "queued work is not supported for session `{session_id}` by this test store"
1008 )))
1009 }
1010
1011 async fn claim_ready_queued_work_by_batch_ids(
1020 &self,
1021 session_id: &str,
1022 session_execution_lease: &SessionExecutionLeaseFence,
1023 owner: &LeaseOwnerIdentity,
1024 boundary: crate::QueuedWorkClaimBoundary,
1025 lease_ttl_ms: u64,
1026 batch_ids: &[String],
1027 ) -> Result<Option<crate::QueuedWorkClaim>, StoreError> {
1028 if batch_ids.is_empty() {
1029 return Ok(None);
1030 }
1031 let Some(claim) = self
1032 .claim_ready_queued_work(
1033 session_id,
1034 session_execution_lease,
1035 owner,
1036 boundary,
1037 lease_ttl_ms,
1038 batch_ids.len(),
1039 )
1040 .await?
1041 else {
1042 return Ok(None);
1043 };
1044 let claimed_ids = claim
1045 .batches
1046 .iter()
1047 .map(|batch| batch.batch_id.as_str())
1048 .collect::<Vec<_>>();
1049 if claimed_ids == batch_ids.iter().map(String::as_str).collect::<Vec<_>>() {
1050 return Ok(Some(claim));
1051 }
1052 self.abandon_queued_work_claim(&claim).await?;
1053 Ok(None)
1054 }
1055
1056 async fn renew_queued_work_claim(
1061 &self,
1062 claim: &crate::QueuedWorkClaim,
1063 _lease_ttl_ms: u64,
1064 ) -> Result<crate::QueuedWorkClaim, StoreError> {
1065 Err(StoreError::QueuedWorkClaimExpired {
1066 session_id: claim.session_id.clone(),
1067 claim_id: claim.claim_id.clone(),
1068 })
1069 }
1070
1071 async fn abandon_queued_work_claim(
1076 &self,
1077 _claim: &crate::QueuedWorkClaim,
1078 ) -> Result<(), StoreError> {
1079 Ok(())
1080 }
1081
1082 async fn cancel_queued_work_batch(
1092 &self,
1093 _session_id: &str,
1094 _batch_id: &str,
1095 ) -> Result<Option<crate::QueuedWorkBatch>, StoreError> {
1096 Ok(None)
1097 }
1098
1099 async fn list_queued_work(
1103 &self,
1104 _session_id: &str,
1105 ) -> Result<Vec<crate::QueuedWorkBatch>, StoreError> {
1106 Ok(Vec::new())
1107 }
1108
1109 async fn list_pending_queued_work(
1114 &self,
1115 session_id: &str,
1116 ) -> Result<Vec<crate::QueuedWorkBatch>, StoreError> {
1117 self.list_queued_work(session_id).await
1118 }
1119
1120 async fn save_session_meta(&self, meta: SessionMeta) -> Result<(), StoreError>;
1121 async fn load_session_meta(&self) -> Result<Option<SessionMeta>, StoreError>;
1122
1123 async fn tombstone_nodes(&self, ids: &[String]) -> Result<(), StoreError>;
1124 async fn vacuum(&self) -> Result<VacuumReport, StoreError>;
1125 async fn gc_unreachable(&self) -> Result<GcReport, StoreError>;
1126}
1127
1128fn persisted_session_state_from_read(read: PersistedSessionRead) -> crate::RuntimeSessionState {
1129 persisted_session_state_from_head(
1130 SessionHead {
1131 session_id: read.session_id,
1132 head_revision: read.head_revision,
1133 agent_frames: read.agent_frames,
1134 current_agent_frame_id: read.current_agent_frame_id,
1135 graph: read.graph,
1136 config: read.config,
1137 checkpoint_ref: read.checkpoint_ref,
1138 token_ledger: read.token_ledger,
1139 },
1140 read.checkpoint,
1141 )
1142}
1143
1144pub async fn load_persisted_session_state(
1145 store: &(dyn RuntimePersistence + '_),
1146) -> Result<Option<crate::RuntimeSessionState>, StoreError> {
1147 Ok(store
1148 .load_session(SessionReadScope::FullGraph)
1149 .await?
1150 .map(persisted_session_state_from_read))
1151}
1152
1153pub async fn load_persisted_session_state_active_path(
1154 store: &(dyn RuntimePersistence + '_),
1155 leaf_node_id: Option<String>,
1156) -> Result<Option<crate::RuntimeSessionState>, StoreError> {
1157 Ok(store
1158 .load_session(SessionReadScope::ActivePath { leaf_node_id })
1159 .await?
1160 .map(persisted_session_state_from_read))
1161}
1162
1163pub async fn refresh_persisted_session_state(
1164 store: &(dyn RuntimePersistence + '_),
1165 state: &mut crate::RuntimeSessionState,
1166) -> Result<(), StoreError> {
1167 if let Some(mut fresh) = load_persisted_session_state(store).await? {
1168 fresh.policy.session_id = state.policy.session_id.clone();
1169 fresh.policy.max_turns = state.policy.max_turns;
1170 *state = fresh;
1171 }
1172 Ok(())
1173}
1174
#[cfg(test)]
mod tests {
    use super::{LeaseOwnerIdentity, LeaseOwnerLiveness};

    /// Shorthand for building a `LocalProcess` liveness record from string slices.
    fn local_liveness(
        host_id: &str,
        boot_id: &str,
        pid: u32,
        process_start: &str,
    ) -> LeaseOwnerLiveness {
        LeaseOwnerLiveness::local_process_for_test(host_id, boot_id, pid, process_start)
    }

    /// Two identities are the same incarnation only when owner id and incarnation id
    /// both match.
    #[test]
    fn lease_owner_identity_requires_same_incarnation() {
        let original = LeaseOwnerIdentity::opaque("owner", "incarnation-a");
        let twin = LeaseOwnerIdentity::opaque("owner", "incarnation-a");
        let successor = LeaseOwnerIdentity::opaque("owner", "incarnation-b");

        assert!(original.same_incarnation(&twin));
        assert!(!original.same_incarnation(&successor));
    }

    /// A holder whose recorded start marker cannot match the running process is reported
    /// dead only to a claimant on the same host and boot; every other pairing is not.
    #[test]
    fn local_liveness_only_proves_same_host_boot_dead_processes() {
        let own_pid = std::process::id();
        let holder = local_liveness("host-a", "boot-a", own_pid, "not-the-current-process-start");
        let claimant_on =
            |host_id: &str, boot_id: &str| local_liveness(host_id, boot_id, own_pid, "claimant");
        let same_host_boot = claimant_on("host-a", "boot-a");
        let other_host = claimant_on("host-b", "boot-a");
        let other_boot = claimant_on("host-a", "boot-b");
        let opaque = LeaseOwnerLiveness::Opaque;

        assert!(holder.is_definitely_dead_for_claimant(&same_host_boot));
        assert!(!holder.is_definitely_dead_for_claimant(&other_host));
        assert!(!holder.is_definitely_dead_for_claimant(&other_boot));
        assert!(!holder.is_definitely_dead_for_claimant(&opaque));
        assert!(!opaque.is_definitely_dead_for_claimant(&same_host_boot));
    }
}