1pub mod queued_work;
4
/// Path of the procfs file that `LeaseOwnerLiveness::current_local_process` reads to obtain
/// the `boot_id` recorded in a `LocalProcess` liveness entry.
const PROC_BOOT_ID_PATH: &str = "/proc/sys/kernel/random/boot_id";
6
/// Serde default for `session_id` fields: yields the literal id `"root"`.
fn default_root_session_id() -> String {
    String::from("root")
}
10
/// Schema version written into `SessionHeadMeta::schema_version` by its `Default` impl.
pub const SESSION_HEAD_META_SCHEMA_VERSION: u32 = 1;
/// Schema version written into `SessionCheckpoint::schema_version` by `new` and `Default`.
pub const SESSION_CHECKPOINT_SCHEMA_VERSION: u32 = 1;
13
// Unit tests for hydrating runtime state from a stored head and for the schema-version
// checks performed by `decode_versioned_json_record`.
#[cfg(test)]
mod persisted_state_tests {
    use super::*;

    // Hydrating from a stored head must surface the stored provider id on the session policy
    // and on every agent frame, and must carry the stored head revision.
    #[test]
    fn persisted_state_hydrates_provider_id_without_live_provider_rebinding() {
        let state = persisted_session_state_from_head(
            SessionHead {
                session_id: "stored".to_string(),
                head_revision: 7,
                agent_frames: Vec::new(),
                current_agent_frame_id: String::new(),
                graph: crate::SessionGraph::default(),
                config: crate::PersistedSessionConfig {
                    provider_id: "stored-provider".to_string(),
                    model: crate::ModelSpec::default(),
                },
                checkpoint_ref: None,
                token_ledger: Vec::new(),
            },
            None,
        );

        assert_eq!(state.policy.recorded_provider_id(), "stored-provider");
        assert!(
            state
                .agent_frames
                .iter()
                .all(|frame| frame.assignment.policy.recorded_provider_id() == "stored-provider")
        );
        assert_eq!(state.head_revision, Some(7));
    }

    // A JSON object without a `schema_version` key is rejected as pre-versioned state.
    #[test]
    fn versioned_json_record_rejects_missing_schema_version() {
        let err = decode_versioned_json_record::<SessionHeadMeta>(
            "{}",
            "SessionHeadMeta",
            SESSION_HEAD_META_SCHEMA_VERSION,
        )
        .expect_err("pre-versioned session head should fail");

        assert!(matches!(
            err,
            StoreError::MissingRecordSchemaVersion {
                record_kind: "SessionHeadMeta",
                expected: SESSION_HEAD_META_SCHEMA_VERSION
            }
        ));
    }

    // A `schema_version` that is not an integer (here the string "1") is rejected as invalid.
    #[test]
    fn versioned_json_record_rejects_invalid_schema_version() {
        let err = decode_versioned_json_record::<SessionHeadMeta>(
            r#"{"schema_version":"1"}"#,
            "SessionHeadMeta",
            SESSION_HEAD_META_SCHEMA_VERSION,
        )
        .expect_err("invalid session head schema version should fail");

        assert!(matches!(
            err,
            StoreError::InvalidRecordSchemaVersion {
                record_kind: "SessionHeadMeta",
                expected: SESSION_HEAD_META_SCHEMA_VERSION,
                ..
            }
        ));
    }

    // An integer `schema_version` that differs from the expected one is rejected as unsupported.
    #[test]
    fn versioned_json_record_rejects_unsupported_schema_version() {
        let err = decode_versioned_json_record::<SessionHeadMeta>(
            r#"{"schema_version":2}"#,
            "SessionHeadMeta",
            SESSION_HEAD_META_SCHEMA_VERSION,
        )
        .expect_err("unsupported session head schema version should fail");

        assert!(matches!(
            err,
            StoreError::UnsupportedRecordSchemaVersion {
                record_kind: "SessionHeadMeta",
                actual: 2,
                expected: SESSION_HEAD_META_SCHEMA_VERSION
            }
        ));
    }
}
103
/// Errors returned by the store APIs and record decoding helpers in this module.
///
/// The `Display` text of each variant comes from its `#[error(...)]` attribute.
#[derive(Debug, thiserror::Error)]
pub enum StoreError {
    /// The store is bound to one session id and was used with a different one.
    #[error(
        "store is already bound to session `{bound_session_id}` and cannot be reused for `{attempted_session_id}`"
    )]
    SessionBindingMismatch {
        bound_session_id: String,
        attempted_session_id: String,
    },
    /// The requested `SessionReadScope` is not supported by the store.
    #[error("store does not support read scope {0:?}")]
    UnsupportedReadScope(SessionReadScope),
    /// The head revision expected by the caller does not match the stored one.
    #[error("store head revision conflict: expected {expected:?}, actual {actual}")]
    HeadRevisionConflict { expected: Option<u64>, actual: u64 },
    /// The same runtime turn was already committed with a different commit hash.
    #[error(
        "runtime turn `{turn_id}` for session `{session_id}` was already committed with a different commit hash"
    )]
    RuntimeTurnCommitConflict { session_id: String, turn_id: String },
    /// A queued-work claim is missing or expired.
    #[error("queued work claim `{claim_id}` for session `{session_id}` is missing or expired")]
    QueuedWorkClaimExpired {
        session_id: String,
        claim_id: String,
    },
    /// A turn-input claim is missing or expired.
    #[error("turn input claim `{claim_id}` for session `{session_id}` is missing or expired")]
    TurnInputClaimExpired {
        session_id: String,
        claim_id: String,
    },
    /// A pending turn input `source_key` is already bound to another input whose submitted
    /// content differs.
    #[error(
        "pending turn input source_key `{source_key}` for session `{session_id}` is already bound to input `{existing_input_id}` with different submitted content"
    )]
    PendingTurnInputSourceKeyConflict {
        session_id: String,
        source_key: String,
        existing_input_id: String,
    },
    /// The session execution lease is missing or expired.
    #[error("session execution lease for session `{session_id}` is missing or expired")]
    SessionExecutionLeaseExpired { session_id: String },
    /// A record carries an integer `schema_version` other than the expected one.
    #[error(
        "{record_kind} schema_version {actual} is not supported by this binary (expected {expected})"
    )]
    UnsupportedRecordSchemaVersion {
        record_kind: &'static str,
        actual: u32,
        expected: u32,
    },
    /// A record has no `schema_version` field at all.
    #[error(
        "{record_kind} is missing schema_version and was written by unsupported pre-versioned state (expected {expected})"
    )]
    MissingRecordSchemaVersion {
        record_kind: &'static str,
        expected: u32,
    },
    /// A record has a `schema_version` that is not representable as a `u32`; `actual` holds
    /// the offending JSON value rendered as text.
    #[error("{record_kind} schema_version {actual} is invalid (expected integer {expected})")]
    InvalidRecordSchemaVersion {
        record_kind: &'static str,
        actual: String,
        expected: u32,
    },
    /// Any other failure, carried as a message (also used for JSON encode/decode failures
    /// in this module).
    #[error("store backend error: {0}")]
    Backend(String),
}
165
/// Serializable descriptive metadata for a session.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct SessionMeta {
    pub session_id: String,
    pub session_name: String,
    /// Creation time as text. NOTE(review): the format is not visible here — confirm.
    pub created_at: String,
    pub model: String,
    /// Working directory, when known. NOTE(review): presumably of the session — confirm.
    pub cwd: Option<String>,
    /// Relation to other sessions; the source of `parent_session_id()`.
    pub relation: crate::SessionRelation,
}
175
176impl SessionMeta {
177 pub fn parent_session_id(&self) -> Option<&str> {
180 self.relation.parent_session_id()
181 }
182}
183
/// Summary information about a session.
/// NOTE(review): presumably shown in a session picker UI — confirm against callers.
#[derive(Clone, Debug)]
pub struct SessionPickerInfo {
    pub session_id: String,
    pub cwd: Option<String>,
    /// Relation to other sessions; the source of `parent_session_id()`.
    pub relation: crate::SessionRelation,
    pub first_user_message: String,
    pub user_message_count: usize,
}
193
194impl SessionPickerInfo {
195 pub fn parent_session_id(&self) -> Option<&str> {
196 self.relation.parent_session_id()
197 }
198}
199
/// String newtype used as a reference to a stored blob.
///
/// `#[serde(transparent)]` makes it serialize exactly like the inner `String`.
#[derive(Clone, Debug, Default, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
#[serde(transparent)]
pub struct BlobRef(pub String);
203
204impl BlobRef {
205 pub fn as_str(&self) -> &str {
206 &self.0
207 }
208}
209
210impl std::fmt::Display for BlobRef {
211 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
212 f.write_str(&self.0)
213 }
214}
215
216impl From<String> for BlobRef {
217 fn from(value: String) -> Self {
218 Self(value)
219 }
220}
221
/// Counters reported by a garbage-collection pass.
/// NOTE(review): the pass itself is not visible in this section; the field meanings below
/// are taken from their names — confirm against the GC implementation.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct GcReport {
    pub root_count: usize,
    pub retained_blob_count: usize,
    pub deleted_blob_count: usize,
}
228
/// Counters reported by a vacuum pass.
/// NOTE(review): the pass itself is not visible in this section — confirm field semantics
/// against the vacuum implementation.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct VacuumReport {
    pub removed_node_count: usize,
    pub removed_pending_turn_input_tombstone_count: usize,
}
239
/// Versioned, serializable checkpoint record: inline turn state plus optional blob
/// references for the larger pieces of state.
///
/// The optional fields are omitted from the serialized form when `None` and default to
/// `None` when absent on input.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct SessionCheckpoint {
    /// Record schema version; constructors set `SESSION_CHECKPOINT_SCHEMA_VERSION`.
    pub schema_version: u32,
    pub turn_state: crate::PersistedTurnState,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub tool_state_ref: Option<BlobRef>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub plugin_snapshot_ref: Option<BlobRef>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub plugin_snapshot_revision: Option<u64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub execution_state_ref: Option<BlobRef>,
}
253
254impl Default for SessionCheckpoint {
255 fn default() -> Self {
256 Self {
257 schema_version: SESSION_CHECKPOINT_SCHEMA_VERSION,
258 turn_state: crate::PersistedTurnState::default(),
259 tool_state_ref: None,
260 plugin_snapshot_ref: None,
261 plugin_snapshot_revision: None,
262 execution_state_ref: None,
263 }
264 }
265}
266
267impl SessionCheckpoint {
268 pub fn new(
269 turn_state: crate::PersistedTurnState,
270 tool_state_ref: Option<BlobRef>,
271 plugin_snapshot_ref: Option<BlobRef>,
272 plugin_snapshot_revision: Option<u64>,
273 execution_state_ref: Option<BlobRef>,
274 ) -> Self {
275 Self {
276 schema_version: SESSION_CHECKPOINT_SCHEMA_VERSION,
277 turn_state,
278 tool_state_ref,
279 plugin_snapshot_ref,
280 plugin_snapshot_revision,
281 execution_state_ref,
282 }
283 }
284}
285
/// Checkpoint contents in hydrated form: each blob reference is paired with an optional
/// in-memory value of the state it refers to.
#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
pub struct HydratedSessionCheckpoint {
    pub turn_state: crate::PersistedTurnState,
    pub tool_state_ref: Option<BlobRef>,
    pub tool_state: Option<crate::ToolState>,
    pub plugin_snapshot_ref: Option<BlobRef>,
    pub plugin_snapshot: Option<crate::PluginSessionSnapshot>,
    pub plugin_snapshot_revision: Option<u64>,
    pub execution_state_ref: Option<BlobRef>,
    /// Raw execution state bytes. NOTE(review): the encoding is not visible here.
    pub execution_state: Option<Vec<u8>>,
}
297
/// Serializable session head that embeds the full session graph.
///
/// Only `graph` and `config` are required on input; every other field has a serde default.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct SessionHead {
    /// Defaults to `"root"` when absent from the serialized form.
    #[serde(default = "default_root_session_id")]
    pub session_id: String,
    #[serde(default)]
    pub head_revision: u64,
    #[serde(default)]
    pub agent_frames: Vec<crate::AgentFrameRecord>,
    /// Omitted from the serialized form when empty.
    #[serde(default, skip_serializing_if = "String::is_empty")]
    pub current_agent_frame_id: crate::AgentFrameId,
    pub graph: crate::SessionGraph,
    pub config: crate::PersistedSessionConfig,
    /// Omitted from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub checkpoint_ref: Option<BlobRef>,
    /// Omitted from the serialized form when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub token_ledger: Vec<crate::TokenLedgerEntry>,
}
315
/// Versioned, serializable session head record that carries `leaf_node_id` and
/// `graph_node_count` instead of an embedded graph.
///
/// `schema_version` and `config` are required on input; the other fields have serde defaults.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct SessionHeadMeta {
    /// Record schema version; `Default` sets `SESSION_HEAD_META_SCHEMA_VERSION`.
    pub schema_version: u32,
    /// Defaults to `"root"` when absent from the serialized form.
    #[serde(default = "default_root_session_id")]
    pub session_id: String,
    #[serde(default)]
    pub head_revision: u64,
    pub config: crate::PersistedSessionConfig,
    #[serde(default)]
    pub agent_frames: Vec<crate::AgentFrameRecord>,
    /// Omitted from the serialized form when empty.
    #[serde(default, skip_serializing_if = "String::is_empty")]
    pub current_agent_frame_id: crate::AgentFrameId,
    /// Omitted from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub checkpoint_ref: Option<BlobRef>,
    /// Omitted from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub leaf_node_id: Option<String>,
    #[serde(default)]
    pub graph_node_count: usize,
    /// Omitted from the serialized form when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub token_ledger: Vec<crate::TokenLedgerEntry>,
}
337
338fn persisted_session_config_from_state(
339 state: &crate::RuntimeSessionState,
340) -> crate::PersistedSessionConfig {
341 crate::PersistedSessionConfig {
342 provider_id: state.policy.recorded_provider_id().to_string(),
343 model: state.policy.model.clone(),
344 }
345}
346
/// Selects how much of the session graph a read should return.
/// NOTE(review): the exact loading behaviour is defined by store implementations that are
/// not visible in this section.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SessionReadScope {
    /// The whole graph.
    FullGraph,
    /// Only the active path. NOTE(review): presumably the path ending at `leaf_node_id`,
    /// with `None` meaning the stored leaf — confirm.
    ActivePath { leaf_node_id: Option<String> },
}
352
/// Result of loading a session from a store: head fields, the graph, and the checkpoint
/// in hydrated form when one is available.
#[derive(Clone, Debug)]
pub struct PersistedSessionRead {
    pub session_id: String,
    pub head_revision: u64,
    pub config: crate::PersistedSessionConfig,
    pub agent_frames: Vec<crate::AgentFrameRecord>,
    pub current_agent_frame_id: crate::AgentFrameId,
    pub graph: crate::SessionGraph,
    pub checkpoint_ref: Option<BlobRef>,
    pub checkpoint: Option<HydratedSessionCheckpoint>,
    pub token_ledger: Vec<crate::TokenLedgerEntry>,
}
365
/// Describes how a commit changes the session graph.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub enum GraphCommitDelta {
    /// No nodes are added; only the leaf node id is carried.
    Unchanged {
        leaf_node_id: Option<String>,
    },
    /// The listed nodes are added and the leaf node id is carried.
    Append {
        nodes: Vec<crate::SessionNodeRecord>,
        leaf_node_id: Option<String>,
    },
    /// The whole graph is supplied.
    ReplaceFull(crate::SessionGraph),
}
377
378impl GraphCommitDelta {
379 pub fn leaf_node_id(&self) -> Option<&String> {
380 match self {
381 Self::Unchanged { leaf_node_id } | Self::Append { leaf_node_id, .. } => {
382 leaf_node_id.as_ref()
383 }
384 Self::ReplaceFull(graph) => graph.leaf_node_id.as_ref(),
385 }
386 }
387}
388
/// A unit of runtime state to be committed to a store.
///
/// `RuntimeCommit::turn_commit_hash` ignores `expected_head_revision`, both lease fields
/// and `turn_commit` when hashing.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct RuntimeCommit {
    pub session_id: String,
    /// Head revision the caller expects to find. NOTE(review): presumably checked by the
    /// store and reported as `StoreError::HeadRevisionConflict` — confirm.
    pub expected_head_revision: Option<u64>,
    /// Lease fence attached to the commit; omitted from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub session_execution_lease: Option<SessionExecutionLeaseFence>,
    /// Lease completion attached to the commit; omitted from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub release_session_execution_lease: Option<SessionExecutionLeaseCompletion>,
    pub config: crate::PersistedSessionConfig,
    pub agent_frames: Vec<crate::AgentFrameRecord>,
    pub current_agent_frame_id: crate::AgentFrameId,
    pub graph: GraphCommitDelta,
    pub checkpoint: HydratedSessionCheckpoint,
    pub usage_deltas: Vec<crate::TokenLedgerEntry>,
    pub turn_commit: Option<RuntimeTurnCommitStamp>,
    pub completed_queue_claims: Vec<crate::QueuedWorkCompletion>,
    pub completed_turn_input_claims: Vec<crate::TurnInputCompletion>,
    /// Set by `deferring_interrupted_turn_inputs`; omitted from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub interrupted_turn_input_turn_id: Option<String>,
    pub committed_attachment_ids: Vec<crate::AttachmentId>,
}
417
/// Outcome of a runtime commit: the resulting head revision, the reference of the stored
/// checkpoint, and the checkpoint manifest.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct RuntimeCommitResult {
    pub head_revision: u64,
    pub checkpoint_ref: BlobRef,
    pub manifest: SessionCheckpoint,
}
424
/// Identity of a lease owner: an owner id, an incarnation id, and liveness evidence.
#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct LeaseOwnerIdentity {
    pub owner_id: String,
    pub incarnation_id: String,
    /// Defaults to `LeaseOwnerLiveness::Opaque` when absent from the serialized form.
    #[serde(default)]
    pub liveness: LeaseOwnerLiveness,
}
432
433impl LeaseOwnerIdentity {
434 pub fn opaque(
435 owner_id: impl Into<String>,
436 incarnation_id: impl Into<String>,
437 ) -> LeaseOwnerIdentity {
438 LeaseOwnerIdentity {
439 owner_id: owner_id.into(),
440 incarnation_id: incarnation_id.into(),
441 liveness: LeaseOwnerLiveness::Opaque,
442 }
443 }
444
445 pub fn local_process(
446 owner_id: impl Into<String>,
447 incarnation_id: impl Into<String>,
448 host_id: impl Into<String>,
449 ) -> LeaseOwnerIdentity {
450 let liveness = LeaseOwnerLiveness::current_local_process(host_id.into())
451 .unwrap_or(LeaseOwnerLiveness::Opaque);
452 LeaseOwnerIdentity {
453 owner_id: owner_id.into(),
454 incarnation_id: incarnation_id.into(),
455 liveness,
456 }
457 }
458
459 pub fn same_incarnation(&self, other: &LeaseOwnerIdentity) -> bool {
460 self.owner_id == other.owner_id && self.incarnation_id == other.incarnation_id
461 }
462
463 pub fn is_definitely_dead_for_claimant(&self, claimant: &LeaseOwnerIdentity) -> bool {
464 self.liveness
465 .is_definitely_dead_for_claimant(&claimant.liveness)
466 }
467}
468
/// Liveness evidence attached to a lease owner.
///
/// Serialized with an internal `kind` tag and snake_case variant names; the default is
/// `Opaque`.
#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize, Default)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum LeaseOwnerLiveness {
    /// The owner is an OS process identified by host id, boot id, pid and the process
    /// start token read from `/proc/<pid>/stat`.
    LocalProcess {
        host_id: String,
        boot_id: String,
        pid: u32,
        process_start: String,
    },
    /// No liveness evidence; `is_definitely_dead_for_claimant` returns `false` whenever
    /// either side is `Opaque`.
    #[default]
    Opaque,
}
481
482impl LeaseOwnerLiveness {
483 pub fn current_local_process(host_id: impl Into<String>) -> Option<LeaseOwnerLiveness> {
484 let boot_id = std::fs::read_to_string(PROC_BOOT_ID_PATH)
485 .ok()
486 .map(|value| value.trim().to_string())
487 .filter(|value| !value.is_empty())?;
488 let pid = std::process::id();
489 let process_start = read_linux_process_start(pid)?;
490 Some(LeaseOwnerLiveness::LocalProcess {
491 host_id: host_id.into(),
492 boot_id,
493 pid,
494 process_start,
495 })
496 }
497
498 pub fn local_process_for_test(
499 host_id: impl Into<String>,
500 boot_id: impl Into<String>,
501 pid: u32,
502 process_start: impl Into<String>,
503 ) -> LeaseOwnerLiveness {
504 LeaseOwnerLiveness::LocalProcess {
505 host_id: host_id.into(),
506 boot_id: boot_id.into(),
507 pid,
508 process_start: process_start.into(),
509 }
510 }
511
512 pub fn is_definitely_dead_for_claimant(&self, claimant: &LeaseOwnerLiveness) -> bool {
513 let (
514 LeaseOwnerLiveness::LocalProcess {
515 host_id,
516 boot_id,
517 pid,
518 process_start,
519 },
520 LeaseOwnerLiveness::LocalProcess {
521 host_id: claimant_host_id,
522 boot_id: claimant_boot_id,
523 ..
524 },
525 ) = (self, claimant)
526 else {
527 return false;
528 };
529 if host_id != claimant_host_id || boot_id != claimant_boot_id {
530 return false;
531 }
532 matches!(linux_process_is_live(*pid, process_start), Some(false))
533 }
534}
535
536fn read_linux_process_start(pid: u32) -> Option<String> {
537 let stat = std::fs::read_to_string(format!("/proc/{pid}/stat")).ok()?;
538 parse_linux_process_start(&stat)
539}
540
541fn linux_process_is_live(pid: u32, expected_process_start: &str) -> Option<bool> {
542 match std::fs::read_to_string(format!("/proc/{pid}/stat")) {
543 Ok(stat) => parse_linux_process_start(&stat).map(|start| start == expected_process_start),
544 Err(err) if err.kind() == std::io::ErrorKind::NotFound => Some(false),
545 Err(_) => None,
546 }
547}
548
/// Extracts the process start token from the contents of a `/proc/<pid>/stat` file.
///
/// The text is split at the last `") "`, so a command name containing parentheses or
/// spaces is skipped as a whole, and the 20th whitespace-separated token after it is
/// returned. In the `proc(5)` stat layout that token is field 22, `starttime`.
///
/// Returns `None` when there is no `") "` separator or fewer than 20 tokens follow it.
fn parse_linux_process_start(stat: &str) -> Option<String> {
    let (_, after_comm) = stat.rsplit_once(") ")?;
    let start_time = after_comm.split_whitespace().nth(19)?;
    Some(start_time.to_owned())
}
553
/// A session execution lease: which owner holds it, its tokens, and its claim and expiry
/// timestamps in epoch milliseconds.
#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct SessionExecutionLease {
    pub session_id: String,
    pub owner: LeaseOwnerIdentity,
    pub lease_token: String,
    pub fencing_token: u64,
    pub claimed_at_epoch_ms: u64,
    pub expires_at_epoch_ms: u64,
}
563
/// The identifying part of a `SessionExecutionLease` (no timestamps), as produced by
/// `SessionExecutionLease::fence` and attached to commits via
/// `RuntimeCommit::with_session_execution_lease`.
#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct SessionExecutionLeaseFence {
    pub session_id: String,
    pub owner: LeaseOwnerIdentity,
    pub lease_token: String,
    pub fencing_token: u64,
}
571
/// The identifying part of a `SessionExecutionLease` (no timestamps), as produced by
/// `SessionExecutionLease::completion` and attached to commits via
/// `RuntimeCommit::releasing_session_execution_lease`.
#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct SessionExecutionLeaseCompletion {
    pub session_id: String,
    pub owner: LeaseOwnerIdentity,
    pub lease_token: String,
    pub fencing_token: u64,
}
579
580impl SessionExecutionLease {
581 pub fn fence(&self) -> SessionExecutionLeaseFence {
582 SessionExecutionLeaseFence {
583 session_id: self.session_id.clone(),
584 owner: self.owner.clone(),
585 lease_token: self.lease_token.clone(),
586 fencing_token: self.fencing_token,
587 }
588 }
589
590 pub fn completion(&self) -> SessionExecutionLeaseCompletion {
591 SessionExecutionLeaseCompletion {
592 session_id: self.session_id.clone(),
593 owner: self.owner.clone(),
594 lease_token: self.lease_token.clone(),
595 fencing_token: self.fencing_token,
596 }
597 }
598}
599
600impl SessionExecutionLeaseCompletion {
601 pub fn from_lease(lease: &SessionExecutionLease) -> Self {
602 lease.completion()
603 }
604}
605
/// Result of attempting to claim a session execution lease.
#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum SessionExecutionLeaseClaimOutcome {
    /// The lease was acquired.
    Acquired(SessionExecutionLease),
    /// The lease was not acquired; `holder` carries the lease reported as held.
    Busy { holder: SessionExecutionLease },
}
611
612impl SessionExecutionLeaseClaimOutcome {
613 pub fn acquired(self) -> Option<SessionExecutionLease> {
614 match self {
615 Self::Acquired(lease) => Some(lease),
616 Self::Busy { .. } => None,
617 }
618 }
619}
620
/// Input to `AttachmentManifest::record_intent`: an attachment id, its session, its
/// canonical URI, and the intent timestamp in epoch milliseconds.
#[derive(Clone, Debug)]
pub struct AttachmentIntent {
    pub attachment_id: crate::AttachmentId,
    pub session_id: String,
    pub canonical_uri: String,
    pub intent_at_epoch_ms: u64,
}
650
/// An attachment manifest entry as returned by `AttachmentManifest::list_uncommitted`.
#[derive(Clone, Debug)]
pub struct AttachmentManifestEntry {
    pub attachment_id: crate::AttachmentId,
    pub session_id: String,
    pub canonical_uri: String,
    pub intent_at_epoch_ms: u64,
    /// Commit timestamp in epoch milliseconds. NOTE(review): presumably `None` until the
    /// attachment is committed — confirm against implementations.
    pub committed_at_epoch_ms: Option<u64>,
}
659
/// Bookkeeping interface for attachments: record an intent, mark ids as committed, list
/// uncommitted entries, and forget an entry.
///
/// NOTE(review): the precise semantics are defined by implementations that are not visible
/// in this section; the per-method notes below are hedged accordingly.
pub trait AttachmentManifest: Send + Sync {
    /// Records `intent` in the manifest.
    fn record_intent(&self, intent: AttachmentIntent) -> Result<(), StoreError>;

    /// Marks `attachment_ids` as committed for `session_id`.
    /// NOTE(review): presumably sets `committed_at_epoch_ms` on the entries — confirm.
    fn commit_refs(
        &self,
        session_id: &str,
        attachment_ids: &[crate::AttachmentId],
    ) -> Result<(), StoreError>;

    /// Lists uncommitted entries selected by `older_than_epoch_ms`.
    /// NOTE(review): presumably compares against `intent_at_epoch_ms` — confirm.
    fn list_uncommitted(
        &self,
        older_than_epoch_ms: u64,
    ) -> Result<Vec<AttachmentManifestEntry>, StoreError>;

    /// Drops the manifest entry for `attachment_id`.
    fn forget(&self, attachment_id: &crate::AttachmentId) -> Result<(), StoreError>;
}
700
/// Implements `AttachmentManifest` for `$ty` with no-op methods.
///
/// `record_intent`, `commit_refs` and `forget` ignore their arguments and return `Ok(())`;
/// `list_uncommitted` returns an empty vector.
#[macro_export]
macro_rules! impl_noop_attachment_manifest {
    ($ty:ty) => {
        impl $crate::AttachmentManifest for $ty {
            fn record_intent(
                &self,
                _intent: $crate::AttachmentIntent,
            ) -> ::std::result::Result<(), $crate::StoreError> {
                Ok(())
            }

            fn commit_refs(
                &self,
                _session_id: &str,
                _attachment_ids: &[$crate::AttachmentId],
            ) -> ::std::result::Result<(), $crate::StoreError> {
                Ok(())
            }

            fn list_uncommitted(
                &self,
                _older_than_epoch_ms: u64,
            ) -> ::std::result::Result<Vec<$crate::AttachmentManifestEntry>, $crate::StoreError>
            {
                Ok(Vec::new())
            }

            fn forget(
                &self,
                _attachment_id: &$crate::AttachmentId,
            ) -> ::std::result::Result<(), $crate::StoreError> {
                Ok(())
            }
        }
    };
}
741
742pub fn ensure_supported_schema_version(
746 record_kind: &'static str,
747 actual: u32,
748 expected: u32,
749) -> Result<(), StoreError> {
750 if actual == expected {
751 Ok(())
752 } else {
753 Err(StoreError::UnsupportedRecordSchemaVersion {
754 record_kind,
755 actual,
756 expected,
757 })
758 }
759}
760
761pub fn ensure_supported_record_schema_version(
762 record_kind: &'static str,
763 value: &serde_json::Value,
764 expected: u32,
765) -> Result<(), StoreError> {
766 let Some(schema_version) = value.get("schema_version") else {
767 return Err(StoreError::MissingRecordSchemaVersion {
768 record_kind,
769 expected,
770 });
771 };
772 let Some(actual) = schema_version
773 .as_u64()
774 .and_then(|version| u32::try_from(version).ok())
775 else {
776 return Err(StoreError::InvalidRecordSchemaVersion {
777 record_kind,
778 actual: schema_version.to_string(),
779 expected,
780 });
781 };
782 ensure_supported_schema_version(record_kind, actual, expected)
783}
784
785pub fn decode_versioned_json_record<T>(
786 json: &str,
787 record_kind: &'static str,
788 expected: u32,
789) -> Result<T, StoreError>
790where
791 T: serde::de::DeserializeOwned,
792{
793 let value: serde_json::Value = serde_json::from_str(json)
794 .map_err(|err| StoreError::Backend(format!("failed to decode {record_kind}: {err}")))?;
795 ensure_supported_record_schema_version(record_kind, &value, expected)?;
796 serde_json::from_value(value)
797 .map_err(|err| StoreError::Backend(format!("failed to decode {record_kind}: {err}")))
798}
799
/// Identifies a committed runtime turn by session id, turn id and turn commit hash.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct RuntimeTurnCommitStamp {
    pub session_id: String,
    pub turn_id: String,
    /// NOTE(review): presumably the value of `RuntimeCommit::turn_commit_hash` — confirm.
    pub turn_commit_hash: String,
}
806
807impl RuntimeTurnCommitStamp {
808 pub fn new(
809 session_id: impl Into<String>,
810 turn_id: impl Into<String>,
811 turn_commit_hash: impl Into<String>,
812 ) -> Self {
813 Self {
814 session_id: session_id.into(),
815 turn_id: turn_id.into(),
816 turn_commit_hash: turn_commit_hash.into(),
817 }
818 }
819}
820
821fn build_persisted_turn_state(state: &crate::RuntimeSessionState) -> crate::PersistedTurnState {
822 crate::PersistedTurnState {
823 turn_index: state.turn_index,
824 token_usage: state.token_usage.clone(),
825 last_prompt_usage: state.last_prompt_usage.clone(),
826 protocol_turn_options: state.protocol_turn_options.clone(),
827 }
828}
829
830fn build_checkpoint_from_persisted_state(
831 state: &crate::RuntimeSessionState,
832) -> HydratedSessionCheckpoint {
833 HydratedSessionCheckpoint {
834 turn_state: build_persisted_turn_state(state),
835 tool_state_ref: state.tool_state_ref.clone(),
836 tool_state: state.tool_state_snapshot.clone(),
837 plugin_snapshot_ref: state.plugin_snapshot_ref.clone(),
838 plugin_snapshot_revision: state.plugin_snapshot_revision,
839 plugin_snapshot: state.plugin_snapshot.clone(),
840 execution_state_ref: state.execution_state_ref.clone(),
841 execution_state: state.execution_state_snapshot.clone(),
842 }
843}
844
845impl RuntimeCommit {
846 pub fn turn_commit_hash(&self) -> Result<String, StoreError> {
847 let mut semantic_commit = self.clone();
848 semantic_commit.expected_head_revision = None;
849 semantic_commit.session_execution_lease = None;
850 semantic_commit.release_session_execution_lease = None;
851 semantic_commit.turn_commit = None;
852 let mut semantic_commit = serde_json::to_value(&semantic_commit).map_err(|err| {
853 StoreError::Backend(format!("failed to serialize runtime turn commit: {err}"))
854 })?;
855 scrub_turn_commit_hash_value(&mut semantic_commit);
856 crate::stable_hash::stable_json_sha256_hex(&semantic_commit).map_err(|err| {
857 StoreError::Backend(format!(
858 "failed to serialize runtime turn commit hash: {err}"
859 ))
860 })
861 }
862
863 pub fn persisted_state(
864 state: &crate::RuntimeSessionState,
865 usage_deltas: &[crate::TokenLedgerEntry],
866 ) -> Self {
867 Self {
868 session_id: state.session_id.clone(),
869 expected_head_revision: state.head_revision,
870 session_execution_lease: None,
871 release_session_execution_lease: None,
872 config: persisted_session_config_from_state(state),
873 agent_frames: state.agent_frames.clone(),
874 current_agent_frame_id: state.current_agent_frame_id.clone(),
875 graph: if state.graph_replace_required || state.head_revision.is_none() {
876 GraphCommitDelta::ReplaceFull(state.session_graph.clone())
877 } else {
878 GraphCommitDelta::Unchanged {
879 leaf_node_id: state.session_graph.leaf_node_id.clone(),
880 }
881 },
882 checkpoint: build_checkpoint_from_persisted_state(state),
883 usage_deltas: usage_deltas.to_vec(),
884 turn_commit: None,
885 completed_queue_claims: Vec::new(),
886 completed_turn_input_claims: Vec::new(),
887 interrupted_turn_input_turn_id: None,
888 committed_attachment_ids: Vec::new(),
889 }
890 }
891
892 pub(crate) fn persisted_state_with_graph_commit(
893 state: &crate::RuntimeSessionState,
894 graph: GraphCommitDelta,
895 usage_deltas: &[crate::TokenLedgerEntry],
896 ) -> Self {
897 Self {
898 session_id: state.session_id.clone(),
899 expected_head_revision: state.head_revision,
900 session_execution_lease: None,
901 release_session_execution_lease: None,
902 config: persisted_session_config_from_state(state),
903 agent_frames: state.agent_frames.clone(),
904 current_agent_frame_id: state.current_agent_frame_id.clone(),
905 graph,
906 checkpoint: build_checkpoint_from_persisted_state(state),
907 usage_deltas: usage_deltas.to_vec(),
908 turn_commit: None,
909 completed_queue_claims: Vec::new(),
910 completed_turn_input_claims: Vec::new(),
911 interrupted_turn_input_turn_id: None,
912 committed_attachment_ids: Vec::new(),
913 }
914 }
915
916 pub fn with_turn_commit(mut self, turn_commit: RuntimeTurnCommitStamp) -> Self {
917 self.turn_commit = Some(turn_commit);
918 self
919 }
920
921 pub fn with_session_execution_lease(mut self, lease: SessionExecutionLeaseFence) -> Self {
922 self.session_execution_lease = Some(lease);
923 self
924 }
925
926 pub fn releasing_session_execution_lease(
927 mut self,
928 completion: SessionExecutionLeaseCompletion,
929 ) -> Self {
930 self.release_session_execution_lease = Some(completion);
931 self
932 }
933
934 pub fn completing_queue_claim(
935 mut self,
936 completed_queue_claim: crate::QueuedWorkCompletion,
937 ) -> Self {
938 self.completed_queue_claims.push(completed_queue_claim);
939 self
940 }
941
942 pub fn completing_queue_claims(
943 mut self,
944 completed_queue_claims: impl IntoIterator<Item = crate::QueuedWorkCompletion>,
945 ) -> Self {
946 self.completed_queue_claims.extend(completed_queue_claims);
947 self
948 }
949
950 pub fn completing_turn_input_claim(
951 mut self,
952 completed_turn_input_claim: crate::TurnInputCompletion,
953 ) -> Self {
954 self.completed_turn_input_claims
955 .push(completed_turn_input_claim);
956 self
957 }
958
959 pub fn completing_turn_input_claims(
960 mut self,
961 completed_turn_input_claims: impl IntoIterator<Item = crate::TurnInputCompletion>,
962 ) -> Self {
963 self.completed_turn_input_claims
964 .extend(completed_turn_input_claims);
965 self
966 }
967
968 pub fn deferring_interrupted_turn_inputs(mut self, turn_id: impl Into<String>) -> Self {
969 self.interrupted_turn_input_turn_id = Some(turn_id.into());
970 self
971 }
972
973 pub fn with_committed_attachments(
974 mut self,
975 attachment_ids: impl IntoIterator<Item = crate::AttachmentId>,
976 ) -> Self {
977 self.committed_attachment_ids = attachment_ids.into_iter().collect();
978 self
979 }
980}
981
982fn scrub_turn_commit_hash_value(value: &mut serde_json::Value) {
983 match value {
984 serde_json::Value::Object(map) => {
985 let is_message = map.contains_key("role") && map.contains_key("parts");
986 let is_message_part = map.contains_key("kind")
987 && map.contains_key("content")
988 && map.contains_key("prune_state");
989 if is_message || is_message_part {
990 map.remove("id");
991 }
992 for volatile_key in ["node_id", "parent_node_id", "leaf_node_id", "timestamp"] {
993 map.remove(volatile_key);
994 }
995 for child in map.values_mut() {
996 scrub_turn_commit_hash_value(child);
997 }
998 }
999 serde_json::Value::Array(items) => {
1000 for item in items {
1001 scrub_turn_commit_hash_value(item);
1002 }
1003 }
1004 _ => {}
1005 }
1006}
1007
1008fn persisted_session_state_from_head(
1009 head: SessionHead,
1010 checkpoint: Option<HydratedSessionCheckpoint>,
1011) -> crate::RuntimeSessionState {
1012 let mut state = crate::RuntimeSessionState {
1013 session_id: head.session_id,
1014 policy: crate::SessionPolicy::default(),
1015 agent_frames: head.agent_frames,
1016 current_agent_frame_id: head.current_agent_frame_id,
1017 session_graph: head.graph,
1018 turn_index: 0,
1019 token_usage: crate::TokenUsage::default(),
1020 last_prompt_usage: None,
1021 protocol_turn_options: crate::ProtocolTurnOptions::default(),
1022 tool_state_ref: None,
1023 tool_state_generation: None,
1024 tool_state_snapshot: None,
1025 plugin_snapshot_ref: None,
1026 plugin_snapshot_revision: None,
1027 plugin_snapshot: None,
1028 execution_state_ref: None,
1029 execution_state_snapshot: None,
1030 token_ledger: head.token_ledger,
1031 checkpoint_ref: head.checkpoint_ref.clone(),
1032 head_revision: Some(head.head_revision),
1033 graph_replace_required: false,
1034 };
1035 state.policy.model = head.config.model.clone();
1036 state.policy.provider_id = head.config.provider_id.clone();
1037 if let Some(checkpoint) = checkpoint {
1038 state.turn_index = checkpoint.turn_state.turn_index;
1039 state.token_usage = checkpoint.turn_state.token_usage;
1040 state.last_prompt_usage = checkpoint.turn_state.last_prompt_usage;
1041 state.protocol_turn_options = checkpoint.turn_state.protocol_turn_options;
1042 state.tool_state_ref = checkpoint.tool_state_ref.clone();
1043 state.tool_state_generation = checkpoint
1044 .tool_state
1045 .as_ref()
1046 .map(|snapshot| snapshot.generation());
1047 state.tool_state_snapshot = checkpoint.tool_state;
1048 state.plugin_snapshot_ref = checkpoint.plugin_snapshot_ref.clone();
1049 state.plugin_snapshot_revision = checkpoint.plugin_snapshot_revision;
1050 state.plugin_snapshot = checkpoint.plugin_snapshot;
1051 state.execution_state_ref = checkpoint.execution_state_ref.clone();
1052 state.execution_state_snapshot = checkpoint.execution_state;
1053 }
1054 state.ensure_agent_frame_initialized();
1055 state
1056}
1057
1058impl Default for SessionHead {
1059 fn default() -> Self {
1060 Self {
1061 session_id: default_root_session_id(),
1062 head_revision: 0,
1063 agent_frames: Vec::new(),
1064 current_agent_frame_id: String::new(),
1065 graph: crate::SessionGraph::default(),
1066 config: crate::PersistedSessionConfig::default(),
1067 checkpoint_ref: None,
1068 token_ledger: Vec::new(),
1069 }
1070 }
1071}
1072
1073impl Default for SessionHeadMeta {
1074 fn default() -> Self {
1075 Self {
1076 schema_version: SESSION_HEAD_META_SCHEMA_VERSION,
1077 session_id: default_root_session_id(),
1078 head_revision: 0,
1079 config: crate::PersistedSessionConfig::default(),
1080 agent_frames: Vec::new(),
1081 current_agent_frame_id: String::new(),
1082 checkpoint_ref: None,
1083 leaf_node_id: None,
1084 graph_node_count: 0,
1085 token_ledger: Vec::new(),
1086 }
1087 }
1088}
1089
1090#[async_trait::async_trait]
1105pub trait RuntimePersistence: AttachmentManifest + Send + Sync {
1106 fn durability_tier(&self) -> crate::DurabilityTier {
1109 crate::DurabilityTier::Inline
1110 }
1111
1112 async fn load_session(
1113 &self,
1114 scope: SessionReadScope,
1115 ) -> Result<Option<PersistedSessionRead>, StoreError>;
1116
1117 async fn load_node(
1118 &self,
1119 node_id: &str,
1120 ) -> Result<Option<crate::SessionNodeRecord>, StoreError>;
1121
1122 async fn commit_runtime_state(
1123 &self,
1124 commit: RuntimeCommit,
1125 ) -> Result<RuntimeCommitResult, StoreError>;
1126
1127 async fn enqueue_pending_turn_input(
1133 &self,
1134 _input: crate::PendingTurnInputDraft,
1135 ) -> Result<crate::PendingTurnInput, StoreError> {
1136 Err(StoreError::Backend(
1137 "pending turn input is not supported by this test store".to_string(),
1138 ))
1139 }
1140
1141 async fn list_pending_turn_inputs(
1146 &self,
1147 _session_id: &str,
1148 ) -> Result<Vec<crate::PendingTurnInput>, StoreError> {
1149 Ok(Vec::new())
1150 }
1151
1152 async fn cancel_pending_turn_input(
1154 &self,
1155 session_id: &str,
1156 input_id: &str,
1157 ) -> Result<crate::PendingTurnInputCancelOutcome, StoreError> {
1158 let target = crate::PendingTurnInputCancelTarget::input_id(input_id);
1159 let targets = vec![target];
1160 let mut outcomes = self
1161 .cancel_pending_turn_inputs(session_id, &targets)
1162 .await?;
1163 Ok(outcomes
1164 .pop()
1165 .map(|result| result.outcome)
1166 .unwrap_or(crate::PendingTurnInputCancelOutcome::NotFound))
1167 }
1168
1169 async fn cancel_pending_turn_inputs(
1171 &self,
1172 _session_id: &str,
1173 _targets: &[crate::PendingTurnInputCancelTarget],
1174 ) -> Result<Vec<crate::PendingTurnInputCancelResult>, StoreError> {
1175 Err(StoreError::Backend(
1176 "pending turn input is not supported by this test store".to_string(),
1177 ))
1178 }
1179
1180 async fn cancel_pending_turn_input_suffix(
1182 &self,
1183 _session_id: &str,
1184 anchor: &crate::PendingTurnInputCancelTarget,
1185 ) -> Result<crate::PendingTurnInputSuffixCancelOutcome, StoreError> {
1186 Err(StoreError::Backend(format!(
1187 "pending turn input suffix cancellation is not supported by this test store for anchor `{anchor:?}`"
1188 )))
1189 }
1190
1191 async fn claim_active_turn_inputs(
1193 &self,
1194 session_id: &str,
1195 _session_execution_lease: &SessionExecutionLeaseFence,
1196 _owner: &LeaseOwnerIdentity,
1197 _turn_id: &str,
1198 _checkpoint: crate::CheckpointKind,
1199 _lease_ttl_ms: u64,
1200 _max_inputs: usize,
1201 ) -> Result<Option<crate::TurnInputClaim>, StoreError> {
1202 Err(StoreError::Backend(format!(
1203 "pending turn input is not supported for session `{session_id}` by this test store"
1204 )))
1205 }
1206
1207 async fn claim_next_turn_inputs(
1209 &self,
1210 session_id: &str,
1211 _session_execution_lease: &SessionExecutionLeaseFence,
1212 _owner: &LeaseOwnerIdentity,
1213 _lease_ttl_ms: u64,
1214 _max_inputs: usize,
1215 ) -> Result<Option<crate::TurnInputClaim>, StoreError> {
1216 Err(StoreError::Backend(format!(
1217 "pending turn input is not supported for session `{session_id}` by this test store"
1218 )))
1219 }
1220
1221 async fn abandon_turn_input_claim(
1223 &self,
1224 _claim: &crate::TurnInputClaim,
1225 ) -> Result<(), StoreError> {
1226 Ok(())
1227 }
1228
1229 async fn try_claim_session_execution_lease(
1236 &self,
1237 session_id: &str,
1238 owner: &LeaseOwnerIdentity,
1239 lease_ttl_ms: u64,
1240 ) -> Result<SessionExecutionLeaseClaimOutcome, StoreError>;
1241
1242 async fn reclaim_session_execution_lease(
1248 &self,
1249 session_id: &str,
1250 owner: &LeaseOwnerIdentity,
1251 observed_holder: &SessionExecutionLeaseFence,
1252 lease_ttl_ms: u64,
1253 ) -> Result<SessionExecutionLeaseClaimOutcome, StoreError>;
1254
1255 async fn renew_session_execution_lease(
1260 &self,
1261 fence: &SessionExecutionLeaseFence,
1262 lease_ttl_ms: u64,
1263 ) -> Result<SessionExecutionLease, StoreError>;
1264
1265 async fn release_session_execution_lease(
1269 &self,
1270 completion: &SessionExecutionLeaseCompletion,
1271 ) -> Result<(), StoreError>;
1272
1273 async fn enqueue_queued_work(
1279 &self,
1280 _batch: crate::QueuedWorkBatchDraft,
1281 ) -> Result<crate::QueuedWorkBatch, StoreError> {
1282 Err(StoreError::Backend(
1283 "queued work is not supported by this test store".to_string(),
1284 ))
1285 }
1286
1287 async fn claim_leading_ready_session_command(
1295 &self,
1296 session_id: &str,
1297 _session_execution_lease: &SessionExecutionLeaseFence,
1298 _owner: &LeaseOwnerIdentity,
1299 _lease_ttl_ms: u64,
1300 ) -> Result<Option<crate::QueuedWorkClaim>, StoreError> {
1301 Err(StoreError::Backend(format!(
1302 "queued work is not supported for session `{session_id}` by this test store"
1303 )))
1304 }
1305
1306 async fn claim_ready_queued_work(
1315 &self,
1316 session_id: &str,
1317 _session_execution_lease: &SessionExecutionLeaseFence,
1318 _owner: &LeaseOwnerIdentity,
1319 _boundary: crate::QueuedWorkClaimBoundary,
1320 _lease_ttl_ms: u64,
1321 _max_batches: usize,
1322 ) -> Result<Option<crate::QueuedWorkClaim>, StoreError> {
1323 Err(StoreError::Backend(format!(
1324 "queued work is not supported for session `{session_id}` by this test store"
1325 )))
1326 }
1327
1328 async fn claim_ready_queued_work_by_batch_ids(
1337 &self,
1338 session_id: &str,
1339 session_execution_lease: &SessionExecutionLeaseFence,
1340 owner: &LeaseOwnerIdentity,
1341 boundary: crate::QueuedWorkClaimBoundary,
1342 lease_ttl_ms: u64,
1343 batch_ids: &[String],
1344 ) -> Result<Option<crate::QueuedWorkClaim>, StoreError> {
1345 if batch_ids.is_empty() {
1346 return Ok(None);
1347 }
1348 let Some(claim) = self
1349 .claim_ready_queued_work(
1350 session_id,
1351 session_execution_lease,
1352 owner,
1353 boundary,
1354 lease_ttl_ms,
1355 batch_ids.len(),
1356 )
1357 .await?
1358 else {
1359 return Ok(None);
1360 };
1361 let claimed_ids = claim
1362 .batches
1363 .iter()
1364 .map(|batch| batch.batch_id.as_str())
1365 .collect::<Vec<_>>();
1366 if claimed_ids == batch_ids.iter().map(String::as_str).collect::<Vec<_>>() {
1367 return Ok(Some(claim));
1368 }
1369 self.abandon_queued_work_claim(&claim).await?;
1370 Ok(None)
1371 }
1372
1373 async fn renew_queued_work_claim(
1378 &self,
1379 claim: &crate::QueuedWorkClaim,
1380 _lease_ttl_ms: u64,
1381 ) -> Result<crate::QueuedWorkClaim, StoreError> {
1382 Err(StoreError::QueuedWorkClaimExpired {
1383 session_id: claim.session_id.clone(),
1384 claim_id: claim.claim_id.clone(),
1385 })
1386 }
1387
1388 async fn abandon_queued_work_claim(
1393 &self,
1394 _claim: &crate::QueuedWorkClaim,
1395 ) -> Result<(), StoreError> {
1396 Ok(())
1397 }
1398
1399 async fn cancel_queued_work_batch(
1409 &self,
1410 _session_id: &str,
1411 _batch_id: &str,
1412 ) -> Result<Option<crate::QueuedWorkBatch>, StoreError> {
1413 Ok(None)
1414 }
1415
1416 async fn list_queued_work(
1420 &self,
1421 _session_id: &str,
1422 ) -> Result<Vec<crate::QueuedWorkBatch>, StoreError> {
1423 Ok(Vec::new())
1424 }
1425
1426 async fn list_pending_queued_work(
1431 &self,
1432 session_id: &str,
1433 ) -> Result<Vec<crate::QueuedWorkBatch>, StoreError> {
1434 self.list_queued_work(session_id).await
1435 }
1436
1437 async fn save_session_meta(&self, meta: SessionMeta) -> Result<(), StoreError>;
1438 async fn load_session_meta(&self) -> Result<Option<SessionMeta>, StoreError>;
1439
1440 async fn tombstone_nodes(&self, ids: &[String]) -> Result<(), StoreError>;
1441 async fn vacuum(&self) -> Result<VacuumReport, StoreError>;
1442 async fn gc_unreachable(&self) -> Result<GcReport, StoreError>;
1443}
1444
1445fn persisted_session_state_from_read(read: PersistedSessionRead) -> crate::RuntimeSessionState {
1446 persisted_session_state_from_head(
1447 SessionHead {
1448 session_id: read.session_id,
1449 head_revision: read.head_revision,
1450 agent_frames: read.agent_frames,
1451 current_agent_frame_id: read.current_agent_frame_id,
1452 graph: read.graph,
1453 config: read.config,
1454 checkpoint_ref: read.checkpoint_ref,
1455 token_ledger: read.token_ledger,
1456 },
1457 read.checkpoint,
1458 )
1459}
1460
1461pub async fn load_persisted_session_state(
1462 store: &(dyn RuntimePersistence + '_),
1463) -> Result<Option<crate::RuntimeSessionState>, StoreError> {
1464 Ok(store
1465 .load_session(SessionReadScope::FullGraph)
1466 .await?
1467 .map(persisted_session_state_from_read))
1468}
1469
1470pub async fn load_persisted_session_state_active_path(
1471 store: &(dyn RuntimePersistence + '_),
1472 leaf_node_id: Option<String>,
1473) -> Result<Option<crate::RuntimeSessionState>, StoreError> {
1474 Ok(store
1475 .load_session(SessionReadScope::ActivePath { leaf_node_id })
1476 .await?
1477 .map(persisted_session_state_from_read))
1478}
1479
1480pub async fn refresh_persisted_session_state(
1481 store: &(dyn RuntimePersistence + '_),
1482 state: &mut crate::RuntimeSessionState,
1483) -> Result<(), StoreError> {
1484 if let Some(mut fresh) = load_persisted_session_state(store).await? {
1485 fresh.policy.session_id = state.policy.session_id.clone();
1486 fresh.policy.max_turns = state.policy.max_turns;
1487 *state = fresh;
1488 }
1489 Ok(())
1490}
1491
#[cfg(test)]
mod tests {
    use super::{LeaseOwnerIdentity, LeaseOwnerLiveness};

    /// Builds a local-process liveness record through the test-only constructor.
    fn local_liveness(
        host_id: &str,
        boot_id: &str,
        pid: u32,
        process_start: &str,
    ) -> LeaseOwnerLiveness {
        LeaseOwnerLiveness::local_process_for_test(host_id, boot_id, pid, process_start)
    }

    /// Two identities with the same owner name share an incarnation only when
    /// their incarnation strings match.
    #[test]
    fn lease_owner_identity_requires_same_incarnation() {
        let original = LeaseOwnerIdentity::opaque("owner", "incarnation-a");
        let twin = LeaseOwnerIdentity::opaque("owner", "incarnation-a");
        let successor = LeaseOwnerIdentity::opaque("owner", "incarnation-b");

        assert!(original.same_incarnation(&twin));
        assert!(!original.same_incarnation(&successor));
    }

    /// A holder is reported definitely dead only to a local claimant on the same
    /// host and boot; a different host, a different boot, or an opaque party on
    /// either side never yields that verdict.
    #[test]
    fn local_liveness_only_proves_same_host_boot_dead_processes() {
        let pid = std::process::id();
        let claimant_on =
            |host_id: &str, boot_id: &str| local_liveness(host_id, boot_id, pid, "claimant");

        // Holder reuses this process's pid but carries a process-start marker that
        // is presumably never equal to the real one.
        let holder = local_liveness("host-a", "boot-a", pid, "not-the-current-process-start");
        let same_host_boot = claimant_on("host-a", "boot-a");
        let other_host = claimant_on("host-b", "boot-a");
        let other_boot = claimant_on("host-a", "boot-b");
        let opaque = LeaseOwnerLiveness::Opaque;

        assert!(holder.is_definitely_dead_for_claimant(&same_host_boot));
        assert!(!holder.is_definitely_dead_for_claimant(&other_host));
        assert!(!holder.is_definitely_dead_for_claimant(&other_boot));
        assert!(!holder.is_definitely_dead_for_claimant(&opaque));
        assert!(!LeaseOwnerLiveness::Opaque.is_definitely_dead_for_claimant(&same_host_boot));
    }
}