1pub mod queued_work;
4
/// Returns the fallback session id, `"root"`, as an owned `String`.
///
/// Used as the `#[serde(default = "default_root_session_id")]` provider in this file, so
/// records without a `session_id` deserialize with this value.
fn default_root_session_id() -> String {
    String::from("root")
}
8
#[cfg(test)]
mod persisted_state_tests {
    use super::*;

    /// Hydrating from a stored head must carry the stored provider id into the session
    /// policy and into every agent frame, and must record the stored head revision.
    #[test]
    fn persisted_state_hydrates_provider_id_without_live_provider_rebinding() {
        let stored_config = crate::PersistedSessionConfig {
            provider_id: "stored-provider".to_string(),
            model: crate::ModelSpec::default(),
        };
        let stored_head = SessionHead {
            session_id: "stored".to_string(),
            head_revision: 7,
            agent_frames: Vec::new(),
            current_agent_frame_id: String::new(),
            graph: crate::SessionGraph::default(),
            config: stored_config,
            checkpoint_ref: None,
            token_ledger: Vec::new(),
        };

        // No checkpoint is supplied: only the head contributes to the hydrated state.
        let hydrated = persisted_session_state_from_head(stored_head, None);

        assert_eq!(hydrated.policy.recorded_provider_id(), "stored-provider");
        for frame in &hydrated.agent_frames {
            assert!(frame.assignment.policy.recorded_provider_id() == "stored-provider");
        }
        assert_eq!(hydrated.head_revision, Some(7));
    }
}
42
43#[derive(Debug, thiserror::Error)]
44pub enum StoreError {
45 #[error(
46 "store is already bound to session `{bound_session_id}` and cannot be reused for `{attempted_session_id}`"
47 )]
48 SessionBindingMismatch {
49 bound_session_id: String,
50 attempted_session_id: String,
51 },
52 #[error("store does not support read scope {0:?}")]
53 UnsupportedReadScope(SessionReadScope),
54 #[error("store head revision conflict: expected {expected:?}, actual {actual}")]
55 HeadRevisionConflict { expected: Option<u64>, actual: u64 },
56 #[error(
57 "runtime turn `{turn_id}` for session `{session_id}` was already committed with a different commit hash"
58 )]
59 RuntimeTurnCommitConflict { session_id: String, turn_id: String },
60 #[error("queued work claim `{claim_id}` for session `{session_id}` is missing or expired")]
61 QueuedWorkClaimExpired {
62 session_id: String,
63 claim_id: String,
64 },
65 #[error(
66 "{record_kind} schema_version {actual} is not supported by this binary (expected {expected})"
67 )]
68 UnsupportedRecordSchemaVersion {
69 record_kind: &'static str,
70 actual: u32,
71 expected: u32,
72 },
73 #[error("store backend error: {0}")]
74 Backend(String),
75}
76
77#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
78pub struct SessionMeta {
79 pub session_id: String,
80 pub session_name: String,
81 pub created_at: String,
82 pub model: String,
83 pub cwd: Option<String>,
84 pub relation: crate::SessionRelation,
85}
86
87impl SessionMeta {
88 pub fn parent_session_id(&self) -> Option<&str> {
91 self.relation.parent_session_id()
92 }
93}
94
95#[derive(Clone, Debug)]
97pub struct SessionPickerInfo {
98 pub session_id: String,
99 pub cwd: Option<String>,
100 pub relation: crate::SessionRelation,
101 pub first_user_message: String,
102 pub user_message_count: usize,
103}
104
105impl SessionPickerInfo {
106 pub fn parent_session_id(&self) -> Option<&str> {
107 self.relation.parent_session_id()
108 }
109}
110
111#[derive(Clone, Debug, Default, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
112#[serde(transparent)]
113pub struct BlobRef(pub String);
114
115impl BlobRef {
116 pub fn as_str(&self) -> &str {
117 &self.0
118 }
119}
120
121impl std::fmt::Display for BlobRef {
122 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
123 f.write_str(&self.0)
124 }
125}
126
127impl From<String> for BlobRef {
128 fn from(value: String) -> Self {
129 Self(value)
130 }
131}
132
/// Counters returned by `RuntimePersistence::gc_unreachable`.
///
/// All counters are zero in the `Default` value.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct GcReport {
    /// Number of roots. NOTE(review): presumably the starting points of the
    /// reachability walk — confirm against store implementations.
    pub root_count: usize,
    /// Number of blobs kept.
    pub retained_blob_count: usize,
    /// Number of blobs deleted.
    pub deleted_blob_count: usize,
}
139
/// Result returned by `RuntimePersistence::vacuum`.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct VacuumReport {
    /// Number of nodes removed; zero in the `Default` value.
    pub removed_node_count: usize,
}
147
148#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
149pub struct SessionCheckpoint {
150 #[serde(default)]
151 pub turn_state: crate::PersistedTurnState,
152 #[serde(default, skip_serializing_if = "Option::is_none")]
153 pub tool_state_ref: Option<BlobRef>,
154 #[serde(default, skip_serializing_if = "Option::is_none")]
155 pub plugin_snapshot_ref: Option<BlobRef>,
156 #[serde(default, skip_serializing_if = "Option::is_none")]
157 pub plugin_snapshot_revision: Option<u64>,
158 #[serde(default, skip_serializing_if = "Option::is_none")]
159 pub execution_state_ref: Option<BlobRef>,
160}
161
162#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
163pub struct HydratedSessionCheckpoint {
164 pub turn_state: crate::PersistedTurnState,
165 pub tool_state_ref: Option<BlobRef>,
166 pub tool_state: Option<crate::ToolState>,
167 pub plugin_snapshot_ref: Option<BlobRef>,
168 pub plugin_snapshot: Option<crate::PluginSessionSnapshot>,
169 pub plugin_snapshot_revision: Option<u64>,
170 pub execution_state_ref: Option<BlobRef>,
171 pub execution_state: Option<Vec<u8>>,
172}
173
174#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
175pub struct SessionHead {
176 #[serde(default = "default_root_session_id")]
177 pub session_id: String,
178 #[serde(default)]
179 pub head_revision: u64,
180 #[serde(default)]
181 pub agent_frames: Vec<crate::AgentFrameRecord>,
182 #[serde(default, skip_serializing_if = "String::is_empty")]
183 pub current_agent_frame_id: crate::AgentFrameId,
184 pub graph: crate::SessionGraph,
185 pub config: crate::PersistedSessionConfig,
186 #[serde(default, skip_serializing_if = "Option::is_none")]
187 pub checkpoint_ref: Option<BlobRef>,
188 #[serde(default, skip_serializing_if = "Vec::is_empty")]
189 pub token_ledger: Vec<crate::TokenLedgerEntry>,
190}
191
192#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
193pub struct SessionHeadMeta {
194 #[serde(default = "default_root_session_id")]
195 pub session_id: String,
196 #[serde(default)]
197 pub head_revision: u64,
198 pub config: crate::PersistedSessionConfig,
199 #[serde(default)]
200 pub agent_frames: Vec<crate::AgentFrameRecord>,
201 #[serde(default, skip_serializing_if = "String::is_empty")]
202 pub current_agent_frame_id: crate::AgentFrameId,
203 #[serde(default, skip_serializing_if = "Option::is_none")]
204 pub checkpoint_ref: Option<BlobRef>,
205 #[serde(default, skip_serializing_if = "Option::is_none")]
206 pub leaf_node_id: Option<String>,
207 #[serde(default)]
208 pub graph_node_count: usize,
209 #[serde(default, skip_serializing_if = "Vec::is_empty")]
210 pub token_ledger: Vec<crate::TokenLedgerEntry>,
211}
212
213fn persisted_session_config_from_state(
214 state: &crate::RuntimeSessionState,
215) -> crate::PersistedSessionConfig {
216 crate::PersistedSessionConfig {
217 provider_id: state.policy.recorded_provider_id().to_string(),
218 model: state.policy.model.clone(),
219 }
220}
221
/// Selects how much of a session `RuntimePersistence::load_session` should read.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SessionReadScope {
    /// Read the full graph.
    FullGraph,
    /// Read the active path, optionally identified by a leaf node id.
    /// NOTE(review): the meaning of `None` here is decided by store implementations —
    /// confirm.
    ActivePath { leaf_node_id: Option<String> },
}
227
228#[derive(Clone, Debug)]
229pub struct PersistedSessionRead {
230 pub session_id: String,
231 pub head_revision: u64,
232 pub config: crate::PersistedSessionConfig,
233 pub agent_frames: Vec<crate::AgentFrameRecord>,
234 pub current_agent_frame_id: crate::AgentFrameId,
235 pub graph: crate::SessionGraph,
236 pub checkpoint_ref: Option<BlobRef>,
237 pub checkpoint: Option<HydratedSessionCheckpoint>,
238 pub token_ledger: Vec<crate::TokenLedgerEntry>,
239}
240
241#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
242pub enum GraphCommitDelta {
243 Unchanged {
244 leaf_node_id: Option<String>,
245 },
246 Append {
247 nodes: Vec<crate::SessionNodeRecord>,
248 leaf_node_id: Option<String>,
249 },
250 ReplaceFull(crate::SessionGraph),
251}
252
253impl GraphCommitDelta {
254 pub fn leaf_node_id(&self) -> Option<&String> {
255 match self {
256 Self::Unchanged { leaf_node_id } | Self::Append { leaf_node_id, .. } => {
257 leaf_node_id.as_ref()
258 }
259 Self::ReplaceFull(graph) => graph.leaf_node_id.as_ref(),
260 }
261 }
262}
263
264#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
265pub struct RuntimeCommit {
266 pub session_id: String,
267 pub expected_head_revision: Option<u64>,
268 pub config: crate::PersistedSessionConfig,
269 pub agent_frames: Vec<crate::AgentFrameRecord>,
270 pub current_agent_frame_id: crate::AgentFrameId,
271 pub graph: GraphCommitDelta,
272 pub checkpoint: HydratedSessionCheckpoint,
273 pub usage_deltas: Vec<crate::TokenLedgerEntry>,
274 pub turn_commit: Option<RuntimeTurnCommitStamp>,
275 pub completed_queue_claims: Vec<crate::QueuedWorkCompletion>,
276 pub committed_attachment_ids: Vec<crate::AttachmentId>,
284}
285
286#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
287pub struct RuntimeCommitResult {
288 pub head_revision: u64,
289 pub checkpoint_ref: BlobRef,
290 pub manifest: SessionCheckpoint,
291}
292
293#[derive(Clone, Debug)]
312pub struct AttachmentIntent {
313 pub attachment_id: crate::AttachmentId,
314 pub session_id: String,
315 pub canonical_uri: String,
320 pub intent_at_epoch_ms: u64,
321}
322
323#[derive(Clone, Debug)]
324pub struct AttachmentManifestEntry {
325 pub attachment_id: crate::AttachmentId,
326 pub session_id: String,
327 pub canonical_uri: String,
328 pub intent_at_epoch_ms: u64,
329 pub committed_at_epoch_ms: Option<u64>,
330}
331
332pub trait AttachmentManifest: Send + Sync {
344 fn record_intent(&self, intent: AttachmentIntent) -> Result<(), StoreError>;
345
346 fn commit_refs(
353 &self,
354 session_id: &str,
355 attachment_ids: &[crate::AttachmentId],
356 ) -> Result<(), StoreError>;
357
358 fn list_uncommitted(
363 &self,
364 older_than_epoch_ms: u64,
365 ) -> Result<Vec<AttachmentManifestEntry>, StoreError>;
366
367 fn forget(&self, attachment_id: &crate::AttachmentId) -> Result<(), StoreError>;
371}
372
/// Implements `AttachmentManifest` for the given type with methods that accept every
/// call and record nothing: each returns `Ok(())`, and `list_uncommitted` returns an
/// empty `Vec`.
#[macro_export]
macro_rules! impl_noop_attachment_manifest {
    ($store:ty) => {
        impl $crate::AttachmentManifest for $store {
            fn record_intent(
                &self,
                _intent: $crate::AttachmentIntent,
            ) -> ::std::result::Result<(), $crate::StoreError> {
                ::std::result::Result::Ok(())
            }

            fn commit_refs(
                &self,
                _session_id: &str,
                _attachment_ids: &[$crate::AttachmentId],
            ) -> ::std::result::Result<(), $crate::StoreError> {
                ::std::result::Result::Ok(())
            }

            fn list_uncommitted(
                &self,
                _older_than_epoch_ms: u64,
            ) -> ::std::result::Result<
                ::std::vec::Vec<$crate::AttachmentManifestEntry>,
                $crate::StoreError,
            > {
                ::std::result::Result::Ok(::std::vec::Vec::new())
            }

            fn forget(
                &self,
                _attachment_id: &$crate::AttachmentId,
            ) -> ::std::result::Result<(), $crate::StoreError> {
                ::std::result::Result::Ok(())
            }
        }
    };
}
413
414pub fn ensure_supported_schema_version(
418 record_kind: &'static str,
419 actual: u32,
420 expected: u32,
421) -> Result<(), StoreError> {
422 if actual == expected {
423 Ok(())
424 } else {
425 Err(StoreError::UnsupportedRecordSchemaVersion {
426 record_kind,
427 actual,
428 expected,
429 })
430 }
431}
432
433#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
434pub struct RuntimeTurnCommitStamp {
435 pub session_id: String,
436 pub turn_id: String,
437 pub turn_commit_hash: String,
438}
439
440impl RuntimeTurnCommitStamp {
441 pub fn new(
442 session_id: impl Into<String>,
443 turn_id: impl Into<String>,
444 turn_commit_hash: impl Into<String>,
445 ) -> Self {
446 Self {
447 session_id: session_id.into(),
448 turn_id: turn_id.into(),
449 turn_commit_hash: turn_commit_hash.into(),
450 }
451 }
452}
453
454fn build_persisted_turn_state(state: &crate::RuntimeSessionState) -> crate::PersistedTurnState {
455 crate::PersistedTurnState {
456 turn_index: state.turn_index,
457 token_usage: state.token_usage.clone(),
458 last_prompt_usage: state.last_prompt_usage.clone(),
459 protocol_turn_options: state.protocol_turn_options.clone(),
460 }
461}
462
463fn build_checkpoint_from_persisted_state(
464 state: &crate::RuntimeSessionState,
465) -> HydratedSessionCheckpoint {
466 HydratedSessionCheckpoint {
467 turn_state: build_persisted_turn_state(state),
468 tool_state_ref: state.tool_state_ref.clone(),
469 tool_state: state.tool_state_snapshot.clone(),
470 plugin_snapshot_ref: state.plugin_snapshot_ref.clone(),
471 plugin_snapshot_revision: state.plugin_snapshot_revision,
472 plugin_snapshot: state.plugin_snapshot.clone(),
473 execution_state_ref: state.execution_state_ref.clone(),
474 execution_state: state.execution_state_snapshot.clone(),
475 }
476}
477
478impl RuntimeCommit {
479 pub fn turn_commit_hash(&self) -> Result<String, StoreError> {
480 let mut semantic_commit = self.clone();
481 semantic_commit.expected_head_revision = None;
482 semantic_commit.turn_commit = None;
483 let mut semantic_commit = serde_json::to_value(&semantic_commit).map_err(|err| {
484 StoreError::Backend(format!("failed to serialize runtime turn commit: {err}"))
485 })?;
486 scrub_turn_commit_hash_value(&mut semantic_commit);
487 crate::stable_hash::stable_json_sha256_hex(&semantic_commit).map_err(|err| {
488 StoreError::Backend(format!(
489 "failed to serialize runtime turn commit hash: {err}"
490 ))
491 })
492 }
493
494 pub fn persisted_state(
495 state: &crate::RuntimeSessionState,
496 usage_deltas: &[crate::TokenLedgerEntry],
497 ) -> Self {
498 Self {
499 session_id: state.session_id.clone(),
500 expected_head_revision: state.head_revision,
501 config: persisted_session_config_from_state(state),
502 agent_frames: state.agent_frames.clone(),
503 current_agent_frame_id: state.current_agent_frame_id.clone(),
504 graph: if state.graph_replace_required || state.head_revision.is_none() {
505 GraphCommitDelta::ReplaceFull(state.session_graph.clone())
506 } else {
507 GraphCommitDelta::Unchanged {
508 leaf_node_id: state.session_graph.leaf_node_id.clone(),
509 }
510 },
511 checkpoint: build_checkpoint_from_persisted_state(state),
512 usage_deltas: usage_deltas.to_vec(),
513 turn_commit: None,
514 completed_queue_claims: Vec::new(),
515 committed_attachment_ids: Vec::new(),
516 }
517 }
518
519 pub(crate) fn persisted_state_with_graph_commit(
520 state: &crate::RuntimeSessionState,
521 graph: GraphCommitDelta,
522 usage_deltas: &[crate::TokenLedgerEntry],
523 ) -> Self {
524 Self {
525 session_id: state.session_id.clone(),
526 expected_head_revision: state.head_revision,
527 config: persisted_session_config_from_state(state),
528 agent_frames: state.agent_frames.clone(),
529 current_agent_frame_id: state.current_agent_frame_id.clone(),
530 graph,
531 checkpoint: build_checkpoint_from_persisted_state(state),
532 usage_deltas: usage_deltas.to_vec(),
533 turn_commit: None,
534 completed_queue_claims: Vec::new(),
535 committed_attachment_ids: Vec::new(),
536 }
537 }
538
539 pub fn with_turn_commit(mut self, turn_commit: RuntimeTurnCommitStamp) -> Self {
540 self.turn_commit = Some(turn_commit);
541 self
542 }
543
544 pub fn completing_queue_claim(
545 mut self,
546 completed_queue_claim: crate::QueuedWorkCompletion,
547 ) -> Self {
548 self.completed_queue_claims.push(completed_queue_claim);
549 self
550 }
551
552 pub fn completing_queue_claims(
553 mut self,
554 completed_queue_claims: impl IntoIterator<Item = crate::QueuedWorkCompletion>,
555 ) -> Self {
556 self.completed_queue_claims.extend(completed_queue_claims);
557 self
558 }
559
560 pub fn with_committed_attachments(
561 mut self,
562 attachment_ids: impl IntoIterator<Item = crate::AttachmentId>,
563 ) -> Self {
564 self.committed_attachment_ids = attachment_ids.into_iter().collect();
565 self
566 }
567}
568
569fn scrub_turn_commit_hash_value(value: &mut serde_json::Value) {
570 match value {
571 serde_json::Value::Object(map) => {
572 let is_message = map.contains_key("role") && map.contains_key("parts");
573 let is_message_part = map.contains_key("kind")
574 && map.contains_key("content")
575 && map.contains_key("prune_state");
576 if is_message || is_message_part {
577 map.remove("id");
578 }
579 for volatile_key in ["node_id", "parent_node_id", "leaf_node_id", "timestamp"] {
580 map.remove(volatile_key);
581 }
582 for child in map.values_mut() {
583 scrub_turn_commit_hash_value(child);
584 }
585 }
586 serde_json::Value::Array(items) => {
587 for item in items {
588 scrub_turn_commit_hash_value(item);
589 }
590 }
591 _ => {}
592 }
593}
594
595fn persisted_session_state_from_head(
596 head: SessionHead,
597 checkpoint: Option<HydratedSessionCheckpoint>,
598) -> crate::RuntimeSessionState {
599 let mut state = crate::RuntimeSessionState {
600 session_id: head.session_id,
601 policy: crate::SessionPolicy::default(),
602 agent_frames: head.agent_frames,
603 current_agent_frame_id: head.current_agent_frame_id,
604 session_graph: head.graph,
605 turn_index: 0,
606 token_usage: crate::TokenUsage::default(),
607 last_prompt_usage: None,
608 protocol_turn_options: crate::ProtocolTurnOptions::default(),
609 tool_state_ref: None,
610 tool_state_generation: None,
611 tool_state_snapshot: None,
612 plugin_snapshot_ref: None,
613 plugin_snapshot_revision: None,
614 plugin_snapshot: None,
615 execution_state_ref: None,
616 execution_state_snapshot: None,
617 token_ledger: head.token_ledger,
618 checkpoint_ref: head.checkpoint_ref.clone(),
619 head_revision: Some(head.head_revision),
620 graph_replace_required: false,
621 };
622 state.policy.model = head.config.model.clone();
623 state.policy.provider_id = head.config.provider_id.clone();
624 if let Some(checkpoint) = checkpoint {
625 state.turn_index = checkpoint.turn_state.turn_index;
626 state.token_usage = checkpoint.turn_state.token_usage;
627 state.last_prompt_usage = checkpoint.turn_state.last_prompt_usage;
628 state.protocol_turn_options = checkpoint.turn_state.protocol_turn_options;
629 state.tool_state_ref = checkpoint.tool_state_ref.clone();
630 state.tool_state_generation = checkpoint
631 .tool_state
632 .as_ref()
633 .map(|snapshot| snapshot.generation());
634 state.tool_state_snapshot = checkpoint.tool_state;
635 state.plugin_snapshot_ref = checkpoint.plugin_snapshot_ref.clone();
636 state.plugin_snapshot_revision = checkpoint.plugin_snapshot_revision;
637 state.plugin_snapshot = checkpoint.plugin_snapshot;
638 state.execution_state_ref = checkpoint.execution_state_ref.clone();
639 state.execution_state_snapshot = checkpoint.execution_state;
640 }
641 state.ensure_agent_frame_initialized();
642 state
643}
644
645impl Default for SessionHead {
646 fn default() -> Self {
647 Self {
648 session_id: default_root_session_id(),
649 head_revision: 0,
650 agent_frames: Vec::new(),
651 current_agent_frame_id: String::new(),
652 graph: crate::SessionGraph::default(),
653 config: crate::PersistedSessionConfig::default(),
654 checkpoint_ref: None,
655 token_ledger: Vec::new(),
656 }
657 }
658}
659
660impl Default for SessionHeadMeta {
661 fn default() -> Self {
662 Self {
663 session_id: default_root_session_id(),
664 head_revision: 0,
665 config: crate::PersistedSessionConfig::default(),
666 agent_frames: Vec::new(),
667 current_agent_frame_id: String::new(),
668 checkpoint_ref: None,
669 leaf_node_id: None,
670 graph_node_count: 0,
671 token_ledger: Vec::new(),
672 }
673 }
674}
675
676#[async_trait::async_trait]
691pub trait RuntimePersistence: AttachmentManifest + Send + Sync {
692 fn durability_tier(&self) -> crate::DurabilityTier {
695 crate::DurabilityTier::Inline
696 }
697
698 async fn load_session(
699 &self,
700 scope: SessionReadScope,
701 ) -> Result<Option<PersistedSessionRead>, StoreError>;
702
703 async fn load_node(
704 &self,
705 node_id: &str,
706 ) -> Result<Option<crate::SessionNodeRecord>, StoreError>;
707
708 async fn commit_runtime_state(
709 &self,
710 commit: RuntimeCommit,
711 ) -> Result<RuntimeCommitResult, StoreError>;
712
713 async fn enqueue_queued_work(
719 &self,
720 _batch: crate::QueuedWorkBatchDraft,
721 ) -> Result<crate::QueuedWorkBatch, StoreError> {
722 Err(StoreError::Backend(
723 "queued work is not supported by this test store".to_string(),
724 ))
725 }
726
727 async fn claim_ready_queued_work(
731 &self,
732 session_id: &str,
733 _owner_id: &str,
734 _boundary: crate::QueuedWorkClaimBoundary,
735 _lease_ttl_ms: u64,
736 _max_batches: usize,
737 ) -> Result<Option<crate::QueuedWorkClaim>, StoreError> {
738 Err(StoreError::Backend(format!(
739 "queued work is not supported for session `{session_id}` by this test store"
740 )))
741 }
742
743 async fn claim_ready_queued_work_by_batch_ids(
752 &self,
753 session_id: &str,
754 owner_id: &str,
755 boundary: crate::QueuedWorkClaimBoundary,
756 lease_ttl_ms: u64,
757 batch_ids: &[String],
758 ) -> Result<Option<crate::QueuedWorkClaim>, StoreError> {
759 if batch_ids.is_empty() {
760 return Ok(None);
761 }
762 let Some(claim) = self
763 .claim_ready_queued_work(
764 session_id,
765 owner_id,
766 boundary,
767 lease_ttl_ms,
768 batch_ids.len(),
769 )
770 .await?
771 else {
772 return Ok(None);
773 };
774 let claimed_ids = claim
775 .batches
776 .iter()
777 .map(|batch| batch.batch_id.as_str())
778 .collect::<Vec<_>>();
779 if claimed_ids == batch_ids.iter().map(String::as_str).collect::<Vec<_>>() {
780 return Ok(Some(claim));
781 }
782 self.abandon_queued_work_claim(&claim).await?;
783 Ok(None)
784 }
785
786 async fn renew_queued_work_claim(
791 &self,
792 claim: &crate::QueuedWorkClaim,
793 _lease_ttl_ms: u64,
794 ) -> Result<crate::QueuedWorkClaim, StoreError> {
795 Err(StoreError::QueuedWorkClaimExpired {
796 session_id: claim.session_id.clone(),
797 claim_id: claim.claim_id.clone(),
798 })
799 }
800
801 async fn abandon_queued_work_claim(
806 &self,
807 _claim: &crate::QueuedWorkClaim,
808 ) -> Result<(), StoreError> {
809 Ok(())
810 }
811
812 async fn cancel_queued_work_batch(
822 &self,
823 _session_id: &str,
824 _batch_id: &str,
825 ) -> Result<Option<crate::QueuedWorkBatch>, StoreError> {
826 Ok(None)
827 }
828
829 async fn list_queued_work(
833 &self,
834 _session_id: &str,
835 ) -> Result<Vec<crate::QueuedWorkBatch>, StoreError> {
836 Ok(Vec::new())
837 }
838
839 async fn list_pending_queued_work(
844 &self,
845 session_id: &str,
846 ) -> Result<Vec<crate::QueuedWorkBatch>, StoreError> {
847 self.list_queued_work(session_id).await
848 }
849
850 async fn save_session_meta(&self, meta: SessionMeta) -> Result<(), StoreError>;
851 async fn load_session_meta(&self) -> Result<Option<SessionMeta>, StoreError>;
852
853 async fn tombstone_nodes(&self, ids: &[String]) -> Result<(), StoreError>;
854 async fn vacuum(&self) -> Result<VacuumReport, StoreError>;
855 async fn gc_unreachable(&self) -> Result<GcReport, StoreError>;
856}
857
858fn persisted_session_state_from_read(read: PersistedSessionRead) -> crate::RuntimeSessionState {
859 persisted_session_state_from_head(
860 SessionHead {
861 session_id: read.session_id,
862 head_revision: read.head_revision,
863 agent_frames: read.agent_frames,
864 current_agent_frame_id: read.current_agent_frame_id,
865 graph: read.graph,
866 config: read.config,
867 checkpoint_ref: read.checkpoint_ref,
868 token_ledger: read.token_ledger,
869 },
870 read.checkpoint,
871 )
872}
873
874pub async fn load_persisted_session_state(
875 store: &(dyn RuntimePersistence + '_),
876) -> Result<Option<crate::RuntimeSessionState>, StoreError> {
877 Ok(store
878 .load_session(SessionReadScope::FullGraph)
879 .await?
880 .map(persisted_session_state_from_read))
881}
882
883pub async fn load_persisted_session_state_active_path(
884 store: &(dyn RuntimePersistence + '_),
885 leaf_node_id: Option<String>,
886) -> Result<Option<crate::RuntimeSessionState>, StoreError> {
887 Ok(store
888 .load_session(SessionReadScope::ActivePath { leaf_node_id })
889 .await?
890 .map(persisted_session_state_from_read))
891}
892
893pub async fn refresh_persisted_session_state(
894 store: &(dyn RuntimePersistence + '_),
895 state: &mut crate::RuntimeSessionState,
896) -> Result<(), StoreError> {
897 if let Some(mut fresh) = load_persisted_session_state(store).await? {
898 fresh.policy.session_id = state.policy.session_id.clone();
899 fresh.policy.max_turns = state.policy.max_turns;
900 *state = fresh;
901 }
902 Ok(())
903}