1pub mod queued_work;
4
/// Returns the fallback session id, `"root"`.
///
/// Used as the serde default for the `session_id` field of [`SessionHead`]
/// and [`SessionHeadMeta`], so records that lack a `session_id` deserialize
/// with this value.
fn default_root_session_id() -> String {
    String::from("root")
}
8
#[cfg(test)]
mod persisted_state_tests {
    use super::*;

    /// Hydrating runtime state from a stored head must surface the stored
    /// provider id on the session policy and on every agent frame, and must
    /// expose the stored head revision.
    #[test]
    fn persisted_state_hydrates_provider_id_without_live_provider_rebinding() {
        // Only the fields under test are spelled out; the rest match
        // `SessionHead::default()` (empty frames, default graph, no checkpoint).
        let stored_head = SessionHead {
            session_id: "stored".to_string(),
            head_revision: 7,
            config: crate::PersistedSessionConfig {
                provider_id: "stored-provider".to_string(),
                model: crate::ModelSpec::default(),
            },
            ..SessionHead::default()
        };

        let state = persisted_session_state_from_head(stored_head, None);

        assert_eq!(state.policy.recorded_provider_id(), "stored-provider");
        for frame in &state.agent_frames {
            assert_eq!(
                frame.assignment.policy.recorded_provider_id(),
                "stored-provider"
            );
        }
        assert_eq!(state.head_revision, Some(7));
    }
}
42
43#[derive(Debug, thiserror::Error)]
44pub enum StoreError {
45 #[error(
46 "store is already bound to session `{bound_session_id}` and cannot be reused for `{attempted_session_id}`"
47 )]
48 SessionBindingMismatch {
49 bound_session_id: String,
50 attempted_session_id: String,
51 },
52 #[error("store does not support read scope {0:?}")]
53 UnsupportedReadScope(SessionReadScope),
54 #[error("store head revision conflict: expected {expected:?}, actual {actual}")]
55 HeadRevisionConflict { expected: Option<u64>, actual: u64 },
56 #[error(
57 "runtime turn `{turn_id}` for session `{session_id}` was already committed with a different commit hash"
58 )]
59 RuntimeTurnCommitConflict { session_id: String, turn_id: String },
60 #[error("queued work claim `{claim_id}` for session `{session_id}` is missing or expired")]
61 QueuedWorkClaimExpired {
62 session_id: String,
63 claim_id: String,
64 },
65 #[error("session execution lease for session `{session_id}` is missing or expired")]
66 SessionExecutionLeaseExpired { session_id: String },
67 #[error(
68 "{record_kind} schema_version {actual} is not supported by this binary (expected {expected})"
69 )]
70 UnsupportedRecordSchemaVersion {
71 record_kind: &'static str,
72 actual: u32,
73 expected: u32,
74 },
75 #[error("store backend error: {0}")]
76 Backend(String),
77}
78
79#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
80pub struct SessionMeta {
81 pub session_id: String,
82 pub session_name: String,
83 pub created_at: String,
84 pub model: String,
85 pub cwd: Option<String>,
86 pub relation: crate::SessionRelation,
87}
88
89impl SessionMeta {
90 pub fn parent_session_id(&self) -> Option<&str> {
93 self.relation.parent_session_id()
94 }
95}
96
97#[derive(Clone, Debug)]
99pub struct SessionPickerInfo {
100 pub session_id: String,
101 pub cwd: Option<String>,
102 pub relation: crate::SessionRelation,
103 pub first_user_message: String,
104 pub user_message_count: usize,
105}
106
107impl SessionPickerInfo {
108 pub fn parent_session_id(&self) -> Option<&str> {
109 self.relation.parent_session_id()
110 }
111}
112
113#[derive(Clone, Debug, Default, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
114#[serde(transparent)]
115pub struct BlobRef(pub String);
116
117impl BlobRef {
118 pub fn as_str(&self) -> &str {
119 &self.0
120 }
121}
122
123impl std::fmt::Display for BlobRef {
124 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
125 f.write_str(&self.0)
126 }
127}
128
129impl From<String> for BlobRef {
130 fn from(value: String) -> Self {
131 Self(value)
132 }
133}
134
/// Counters returned by `RuntimePersistence::gc_unreachable`.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct GcReport {
    /// Number of roots counted by the collection pass.
    pub root_count: usize,
    /// Number of blobs that were retained.
    pub retained_blob_count: usize,
    /// Number of blobs that were deleted.
    pub deleted_blob_count: usize,
}
141
/// Counters returned by `RuntimePersistence::vacuum`.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct VacuumReport {
    /// Number of nodes that were removed.
    pub removed_node_count: usize,
}
149
150#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
151pub struct SessionCheckpoint {
152 #[serde(default)]
153 pub turn_state: crate::PersistedTurnState,
154 #[serde(default, skip_serializing_if = "Option::is_none")]
155 pub tool_state_ref: Option<BlobRef>,
156 #[serde(default, skip_serializing_if = "Option::is_none")]
157 pub plugin_snapshot_ref: Option<BlobRef>,
158 #[serde(default, skip_serializing_if = "Option::is_none")]
159 pub plugin_snapshot_revision: Option<u64>,
160 #[serde(default, skip_serializing_if = "Option::is_none")]
161 pub execution_state_ref: Option<BlobRef>,
162}
163
164#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
165pub struct HydratedSessionCheckpoint {
166 pub turn_state: crate::PersistedTurnState,
167 pub tool_state_ref: Option<BlobRef>,
168 pub tool_state: Option<crate::ToolState>,
169 pub plugin_snapshot_ref: Option<BlobRef>,
170 pub plugin_snapshot: Option<crate::PluginSessionSnapshot>,
171 pub plugin_snapshot_revision: Option<u64>,
172 pub execution_state_ref: Option<BlobRef>,
173 pub execution_state: Option<Vec<u8>>,
174}
175
176#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
177pub struct SessionHead {
178 #[serde(default = "default_root_session_id")]
179 pub session_id: String,
180 #[serde(default)]
181 pub head_revision: u64,
182 #[serde(default)]
183 pub agent_frames: Vec<crate::AgentFrameRecord>,
184 #[serde(default, skip_serializing_if = "String::is_empty")]
185 pub current_agent_frame_id: crate::AgentFrameId,
186 pub graph: crate::SessionGraph,
187 pub config: crate::PersistedSessionConfig,
188 #[serde(default, skip_serializing_if = "Option::is_none")]
189 pub checkpoint_ref: Option<BlobRef>,
190 #[serde(default, skip_serializing_if = "Vec::is_empty")]
191 pub token_ledger: Vec<crate::TokenLedgerEntry>,
192}
193
194#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
195pub struct SessionHeadMeta {
196 #[serde(default = "default_root_session_id")]
197 pub session_id: String,
198 #[serde(default)]
199 pub head_revision: u64,
200 pub config: crate::PersistedSessionConfig,
201 #[serde(default)]
202 pub agent_frames: Vec<crate::AgentFrameRecord>,
203 #[serde(default, skip_serializing_if = "String::is_empty")]
204 pub current_agent_frame_id: crate::AgentFrameId,
205 #[serde(default, skip_serializing_if = "Option::is_none")]
206 pub checkpoint_ref: Option<BlobRef>,
207 #[serde(default, skip_serializing_if = "Option::is_none")]
208 pub leaf_node_id: Option<String>,
209 #[serde(default)]
210 pub graph_node_count: usize,
211 #[serde(default, skip_serializing_if = "Vec::is_empty")]
212 pub token_ledger: Vec<crate::TokenLedgerEntry>,
213}
214
215fn persisted_session_config_from_state(
216 state: &crate::RuntimeSessionState,
217) -> crate::PersistedSessionConfig {
218 crate::PersistedSessionConfig {
219 provider_id: state.policy.recorded_provider_id().to_string(),
220 model: state.policy.model.clone(),
221 }
222}
223
/// Selects how much of a session `RuntimePersistence::load_session` reads.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SessionReadScope {
    /// Read the full session graph.
    FullGraph,
    /// Read the active path only.
    // NOTE(review): presumably the path ending at `leaf_node_id`, with `None`
    // meaning the stored leaf — confirm with store implementations.
    ActivePath {
        /// Leaf node id to read for, if any.
        leaf_node_id: Option<String>,
    },
}
229
230#[derive(Clone, Debug)]
231pub struct PersistedSessionRead {
232 pub session_id: String,
233 pub head_revision: u64,
234 pub config: crate::PersistedSessionConfig,
235 pub agent_frames: Vec<crate::AgentFrameRecord>,
236 pub current_agent_frame_id: crate::AgentFrameId,
237 pub graph: crate::SessionGraph,
238 pub checkpoint_ref: Option<BlobRef>,
239 pub checkpoint: Option<HydratedSessionCheckpoint>,
240 pub token_ledger: Vec<crate::TokenLedgerEntry>,
241}
242
243#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
244pub enum GraphCommitDelta {
245 Unchanged {
246 leaf_node_id: Option<String>,
247 },
248 Append {
249 nodes: Vec<crate::SessionNodeRecord>,
250 leaf_node_id: Option<String>,
251 },
252 ReplaceFull(crate::SessionGraph),
253}
254
255impl GraphCommitDelta {
256 pub fn leaf_node_id(&self) -> Option<&String> {
257 match self {
258 Self::Unchanged { leaf_node_id } | Self::Append { leaf_node_id, .. } => {
259 leaf_node_id.as_ref()
260 }
261 Self::ReplaceFull(graph) => graph.leaf_node_id.as_ref(),
262 }
263 }
264}
265
266#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
267pub struct RuntimeCommit {
268 pub session_id: String,
269 pub expected_head_revision: Option<u64>,
270 #[serde(default, skip_serializing_if = "Option::is_none")]
271 pub session_execution_lease: Option<SessionExecutionLeaseFence>,
272 #[serde(default, skip_serializing_if = "Option::is_none")]
273 pub release_session_execution_lease: Option<SessionExecutionLeaseCompletion>,
274 pub config: crate::PersistedSessionConfig,
275 pub agent_frames: Vec<crate::AgentFrameRecord>,
276 pub current_agent_frame_id: crate::AgentFrameId,
277 pub graph: GraphCommitDelta,
278 pub checkpoint: HydratedSessionCheckpoint,
279 pub usage_deltas: Vec<crate::TokenLedgerEntry>,
280 pub turn_commit: Option<RuntimeTurnCommitStamp>,
281 pub completed_queue_claims: Vec<crate::QueuedWorkCompletion>,
282 pub committed_attachment_ids: Vec<crate::AttachmentId>,
290}
291
292#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
293pub struct RuntimeCommitResult {
294 pub head_revision: u64,
295 pub checkpoint_ref: BlobRef,
296 pub manifest: SessionCheckpoint,
297}
298
299#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
300pub struct SessionExecutionLease {
301 pub session_id: String,
302 pub owner_id: String,
303 pub lease_token: String,
304 pub fencing_token: u64,
305 pub claimed_at_epoch_ms: u64,
306 pub expires_at_epoch_ms: u64,
307}
308
309#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
310pub struct SessionExecutionLeaseFence {
311 pub session_id: String,
312 pub owner_id: String,
313 pub lease_token: String,
314 pub fencing_token: u64,
315}
316
317#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
318pub struct SessionExecutionLeaseCompletion {
319 pub session_id: String,
320 pub owner_id: String,
321 pub lease_token: String,
322 pub fencing_token: u64,
323}
324
325impl SessionExecutionLease {
326 pub fn fence(&self) -> SessionExecutionLeaseFence {
327 SessionExecutionLeaseFence {
328 session_id: self.session_id.clone(),
329 owner_id: self.owner_id.clone(),
330 lease_token: self.lease_token.clone(),
331 fencing_token: self.fencing_token,
332 }
333 }
334
335 pub fn completion(&self) -> SessionExecutionLeaseCompletion {
336 SessionExecutionLeaseCompletion {
337 session_id: self.session_id.clone(),
338 owner_id: self.owner_id.clone(),
339 lease_token: self.lease_token.clone(),
340 fencing_token: self.fencing_token,
341 }
342 }
343}
344
345impl SessionExecutionLeaseCompletion {
346 pub fn from_lease(lease: &SessionExecutionLease) -> Self {
347 lease.completion()
348 }
349}
350
351#[derive(Clone, Debug)]
370pub struct AttachmentIntent {
371 pub attachment_id: crate::AttachmentId,
372 pub session_id: String,
373 pub canonical_uri: String,
378 pub intent_at_epoch_ms: u64,
379}
380
381#[derive(Clone, Debug)]
382pub struct AttachmentManifestEntry {
383 pub attachment_id: crate::AttachmentId,
384 pub session_id: String,
385 pub canonical_uri: String,
386 pub intent_at_epoch_ms: u64,
387 pub committed_at_epoch_ms: Option<u64>,
388}
389
390pub trait AttachmentManifest: Send + Sync {
402 fn record_intent(&self, intent: AttachmentIntent) -> Result<(), StoreError>;
403
404 fn commit_refs(
411 &self,
412 session_id: &str,
413 attachment_ids: &[crate::AttachmentId],
414 ) -> Result<(), StoreError>;
415
416 fn list_uncommitted(
421 &self,
422 older_than_epoch_ms: u64,
423 ) -> Result<Vec<AttachmentManifestEntry>, StoreError>;
424
425 fn forget(&self, attachment_id: &crate::AttachmentId) -> Result<(), StoreError>;
429}
430
/// Implements [`AttachmentManifest`](crate::AttachmentManifest) for `$ty`
/// with no-op methods: every call succeeds, and `list_uncommitted` always
/// returns an empty list.
#[macro_export]
macro_rules! impl_noop_attachment_manifest {
    ($ty:ty) => {
        impl $crate::AttachmentManifest for $ty {
            // Accepts the intent and discards it.
            fn record_intent(
                &self,
                _intent: $crate::AttachmentIntent,
            ) -> ::std::result::Result<(), $crate::StoreError> {
                ::std::result::Result::Ok(())
            }

            // Nothing is tracked, so there is nothing to mark as committed.
            fn commit_refs(
                &self,
                _session_id: &str,
                _attachment_ids: &[$crate::AttachmentId],
            ) -> ::std::result::Result<(), $crate::StoreError> {
                ::std::result::Result::Ok(())
            }

            // Nothing is tracked, so nothing is ever uncommitted.
            fn list_uncommitted(
                &self,
                _older_than_epoch_ms: u64,
            ) -> ::std::result::Result<
                ::std::vec::Vec<$crate::AttachmentManifestEntry>,
                $crate::StoreError,
            > {
                ::std::result::Result::Ok(::std::vec::Vec::new())
            }

            // Nothing is tracked, so there is nothing to forget.
            fn forget(
                &self,
                _attachment_id: &$crate::AttachmentId,
            ) -> ::std::result::Result<(), $crate::StoreError> {
                ::std::result::Result::Ok(())
            }
        }
    };
}
471
472pub fn ensure_supported_schema_version(
476 record_kind: &'static str,
477 actual: u32,
478 expected: u32,
479) -> Result<(), StoreError> {
480 if actual == expected {
481 Ok(())
482 } else {
483 Err(StoreError::UnsupportedRecordSchemaVersion {
484 record_kind,
485 actual,
486 expected,
487 })
488 }
489}
490
491#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
492pub struct RuntimeTurnCommitStamp {
493 pub session_id: String,
494 pub turn_id: String,
495 pub turn_commit_hash: String,
496}
497
498impl RuntimeTurnCommitStamp {
499 pub fn new(
500 session_id: impl Into<String>,
501 turn_id: impl Into<String>,
502 turn_commit_hash: impl Into<String>,
503 ) -> Self {
504 Self {
505 session_id: session_id.into(),
506 turn_id: turn_id.into(),
507 turn_commit_hash: turn_commit_hash.into(),
508 }
509 }
510}
511
512fn build_persisted_turn_state(state: &crate::RuntimeSessionState) -> crate::PersistedTurnState {
513 crate::PersistedTurnState {
514 turn_index: state.turn_index,
515 token_usage: state.token_usage.clone(),
516 last_prompt_usage: state.last_prompt_usage.clone(),
517 protocol_turn_options: state.protocol_turn_options.clone(),
518 }
519}
520
521fn build_checkpoint_from_persisted_state(
522 state: &crate::RuntimeSessionState,
523) -> HydratedSessionCheckpoint {
524 HydratedSessionCheckpoint {
525 turn_state: build_persisted_turn_state(state),
526 tool_state_ref: state.tool_state_ref.clone(),
527 tool_state: state.tool_state_snapshot.clone(),
528 plugin_snapshot_ref: state.plugin_snapshot_ref.clone(),
529 plugin_snapshot_revision: state.plugin_snapshot_revision,
530 plugin_snapshot: state.plugin_snapshot.clone(),
531 execution_state_ref: state.execution_state_ref.clone(),
532 execution_state: state.execution_state_snapshot.clone(),
533 }
534}
535
536impl RuntimeCommit {
537 pub fn turn_commit_hash(&self) -> Result<String, StoreError> {
538 let mut semantic_commit = self.clone();
539 semantic_commit.expected_head_revision = None;
540 semantic_commit.session_execution_lease = None;
541 semantic_commit.release_session_execution_lease = None;
542 semantic_commit.turn_commit = None;
543 let mut semantic_commit = serde_json::to_value(&semantic_commit).map_err(|err| {
544 StoreError::Backend(format!("failed to serialize runtime turn commit: {err}"))
545 })?;
546 scrub_turn_commit_hash_value(&mut semantic_commit);
547 crate::stable_hash::stable_json_sha256_hex(&semantic_commit).map_err(|err| {
548 StoreError::Backend(format!(
549 "failed to serialize runtime turn commit hash: {err}"
550 ))
551 })
552 }
553
554 pub fn persisted_state(
555 state: &crate::RuntimeSessionState,
556 usage_deltas: &[crate::TokenLedgerEntry],
557 ) -> Self {
558 Self {
559 session_id: state.session_id.clone(),
560 expected_head_revision: state.head_revision,
561 session_execution_lease: None,
562 release_session_execution_lease: None,
563 config: persisted_session_config_from_state(state),
564 agent_frames: state.agent_frames.clone(),
565 current_agent_frame_id: state.current_agent_frame_id.clone(),
566 graph: if state.graph_replace_required || state.head_revision.is_none() {
567 GraphCommitDelta::ReplaceFull(state.session_graph.clone())
568 } else {
569 GraphCommitDelta::Unchanged {
570 leaf_node_id: state.session_graph.leaf_node_id.clone(),
571 }
572 },
573 checkpoint: build_checkpoint_from_persisted_state(state),
574 usage_deltas: usage_deltas.to_vec(),
575 turn_commit: None,
576 completed_queue_claims: Vec::new(),
577 committed_attachment_ids: Vec::new(),
578 }
579 }
580
581 pub(crate) fn persisted_state_with_graph_commit(
582 state: &crate::RuntimeSessionState,
583 graph: GraphCommitDelta,
584 usage_deltas: &[crate::TokenLedgerEntry],
585 ) -> Self {
586 Self {
587 session_id: state.session_id.clone(),
588 expected_head_revision: state.head_revision,
589 session_execution_lease: None,
590 release_session_execution_lease: None,
591 config: persisted_session_config_from_state(state),
592 agent_frames: state.agent_frames.clone(),
593 current_agent_frame_id: state.current_agent_frame_id.clone(),
594 graph,
595 checkpoint: build_checkpoint_from_persisted_state(state),
596 usage_deltas: usage_deltas.to_vec(),
597 turn_commit: None,
598 completed_queue_claims: Vec::new(),
599 committed_attachment_ids: Vec::new(),
600 }
601 }
602
603 pub fn with_turn_commit(mut self, turn_commit: RuntimeTurnCommitStamp) -> Self {
604 self.turn_commit = Some(turn_commit);
605 self
606 }
607
608 pub fn with_session_execution_lease(mut self, lease: SessionExecutionLeaseFence) -> Self {
609 self.session_execution_lease = Some(lease);
610 self
611 }
612
613 pub fn releasing_session_execution_lease(
614 mut self,
615 completion: SessionExecutionLeaseCompletion,
616 ) -> Self {
617 self.release_session_execution_lease = Some(completion);
618 self
619 }
620
621 pub fn completing_queue_claim(
622 mut self,
623 completed_queue_claim: crate::QueuedWorkCompletion,
624 ) -> Self {
625 self.completed_queue_claims.push(completed_queue_claim);
626 self
627 }
628
629 pub fn completing_queue_claims(
630 mut self,
631 completed_queue_claims: impl IntoIterator<Item = crate::QueuedWorkCompletion>,
632 ) -> Self {
633 self.completed_queue_claims.extend(completed_queue_claims);
634 self
635 }
636
637 pub fn with_committed_attachments(
638 mut self,
639 attachment_ids: impl IntoIterator<Item = crate::AttachmentId>,
640 ) -> Self {
641 self.committed_attachment_ids = attachment_ids.into_iter().collect();
642 self
643 }
644}
645
646fn scrub_turn_commit_hash_value(value: &mut serde_json::Value) {
647 match value {
648 serde_json::Value::Object(map) => {
649 let is_message = map.contains_key("role") && map.contains_key("parts");
650 let is_message_part = map.contains_key("kind")
651 && map.contains_key("content")
652 && map.contains_key("prune_state");
653 if is_message || is_message_part {
654 map.remove("id");
655 }
656 for volatile_key in ["node_id", "parent_node_id", "leaf_node_id", "timestamp"] {
657 map.remove(volatile_key);
658 }
659 for child in map.values_mut() {
660 scrub_turn_commit_hash_value(child);
661 }
662 }
663 serde_json::Value::Array(items) => {
664 for item in items {
665 scrub_turn_commit_hash_value(item);
666 }
667 }
668 _ => {}
669 }
670}
671
672fn persisted_session_state_from_head(
673 head: SessionHead,
674 checkpoint: Option<HydratedSessionCheckpoint>,
675) -> crate::RuntimeSessionState {
676 let mut state = crate::RuntimeSessionState {
677 session_id: head.session_id,
678 policy: crate::SessionPolicy::default(),
679 agent_frames: head.agent_frames,
680 current_agent_frame_id: head.current_agent_frame_id,
681 session_graph: head.graph,
682 turn_index: 0,
683 token_usage: crate::TokenUsage::default(),
684 last_prompt_usage: None,
685 protocol_turn_options: crate::ProtocolTurnOptions::default(),
686 tool_state_ref: None,
687 tool_state_generation: None,
688 tool_state_snapshot: None,
689 plugin_snapshot_ref: None,
690 plugin_snapshot_revision: None,
691 plugin_snapshot: None,
692 execution_state_ref: None,
693 execution_state_snapshot: None,
694 token_ledger: head.token_ledger,
695 checkpoint_ref: head.checkpoint_ref.clone(),
696 head_revision: Some(head.head_revision),
697 graph_replace_required: false,
698 };
699 state.policy.model = head.config.model.clone();
700 state.policy.provider_id = head.config.provider_id.clone();
701 if let Some(checkpoint) = checkpoint {
702 state.turn_index = checkpoint.turn_state.turn_index;
703 state.token_usage = checkpoint.turn_state.token_usage;
704 state.last_prompt_usage = checkpoint.turn_state.last_prompt_usage;
705 state.protocol_turn_options = checkpoint.turn_state.protocol_turn_options;
706 state.tool_state_ref = checkpoint.tool_state_ref.clone();
707 state.tool_state_generation = checkpoint
708 .tool_state
709 .as_ref()
710 .map(|snapshot| snapshot.generation());
711 state.tool_state_snapshot = checkpoint.tool_state;
712 state.plugin_snapshot_ref = checkpoint.plugin_snapshot_ref.clone();
713 state.plugin_snapshot_revision = checkpoint.plugin_snapshot_revision;
714 state.plugin_snapshot = checkpoint.plugin_snapshot;
715 state.execution_state_ref = checkpoint.execution_state_ref.clone();
716 state.execution_state_snapshot = checkpoint.execution_state;
717 }
718 state.ensure_agent_frame_initialized();
719 state
720}
721
722impl Default for SessionHead {
723 fn default() -> Self {
724 Self {
725 session_id: default_root_session_id(),
726 head_revision: 0,
727 agent_frames: Vec::new(),
728 current_agent_frame_id: String::new(),
729 graph: crate::SessionGraph::default(),
730 config: crate::PersistedSessionConfig::default(),
731 checkpoint_ref: None,
732 token_ledger: Vec::new(),
733 }
734 }
735}
736
737impl Default for SessionHeadMeta {
738 fn default() -> Self {
739 Self {
740 session_id: default_root_session_id(),
741 head_revision: 0,
742 config: crate::PersistedSessionConfig::default(),
743 agent_frames: Vec::new(),
744 current_agent_frame_id: String::new(),
745 checkpoint_ref: None,
746 leaf_node_id: None,
747 graph_node_count: 0,
748 token_ledger: Vec::new(),
749 }
750 }
751}
752
753#[async_trait::async_trait]
768pub trait RuntimePersistence: AttachmentManifest + Send + Sync {
769 fn durability_tier(&self) -> crate::DurabilityTier {
772 crate::DurabilityTier::Inline
773 }
774
775 async fn load_session(
776 &self,
777 scope: SessionReadScope,
778 ) -> Result<Option<PersistedSessionRead>, StoreError>;
779
780 async fn load_node(
781 &self,
782 node_id: &str,
783 ) -> Result<Option<crate::SessionNodeRecord>, StoreError>;
784
785 async fn commit_runtime_state(
786 &self,
787 commit: RuntimeCommit,
788 ) -> Result<RuntimeCommitResult, StoreError>;
789
790 async fn try_claim_session_execution_lease(
795 &self,
796 session_id: &str,
797 owner_id: &str,
798 lease_ttl_ms: u64,
799 ) -> Result<Option<SessionExecutionLease>, StoreError>;
800
801 async fn renew_session_execution_lease(
806 &self,
807 fence: &SessionExecutionLeaseFence,
808 lease_ttl_ms: u64,
809 ) -> Result<SessionExecutionLease, StoreError>;
810
811 async fn release_session_execution_lease(
815 &self,
816 completion: &SessionExecutionLeaseCompletion,
817 ) -> Result<(), StoreError>;
818
819 async fn enqueue_queued_work(
825 &self,
826 _batch: crate::QueuedWorkBatchDraft,
827 ) -> Result<crate::QueuedWorkBatch, StoreError> {
828 Err(StoreError::Backend(
829 "queued work is not supported by this test store".to_string(),
830 ))
831 }
832
833 async fn claim_ready_queued_work(
837 &self,
838 session_id: &str,
839 _session_execution_lease: &SessionExecutionLeaseFence,
840 _owner_id: &str,
841 _boundary: crate::QueuedWorkClaimBoundary,
842 _lease_ttl_ms: u64,
843 _max_batches: usize,
844 ) -> Result<Option<crate::QueuedWorkClaim>, StoreError> {
845 Err(StoreError::Backend(format!(
846 "queued work is not supported for session `{session_id}` by this test store"
847 )))
848 }
849
850 async fn claim_ready_queued_work_by_batch_ids(
859 &self,
860 session_id: &str,
861 session_execution_lease: &SessionExecutionLeaseFence,
862 owner_id: &str,
863 boundary: crate::QueuedWorkClaimBoundary,
864 lease_ttl_ms: u64,
865 batch_ids: &[String],
866 ) -> Result<Option<crate::QueuedWorkClaim>, StoreError> {
867 if batch_ids.is_empty() {
868 return Ok(None);
869 }
870 let Some(claim) = self
871 .claim_ready_queued_work(
872 session_id,
873 session_execution_lease,
874 owner_id,
875 boundary,
876 lease_ttl_ms,
877 batch_ids.len(),
878 )
879 .await?
880 else {
881 return Ok(None);
882 };
883 let claimed_ids = claim
884 .batches
885 .iter()
886 .map(|batch| batch.batch_id.as_str())
887 .collect::<Vec<_>>();
888 if claimed_ids == batch_ids.iter().map(String::as_str).collect::<Vec<_>>() {
889 return Ok(Some(claim));
890 }
891 self.abandon_queued_work_claim(&claim).await?;
892 Ok(None)
893 }
894
895 async fn renew_queued_work_claim(
900 &self,
901 claim: &crate::QueuedWorkClaim,
902 _lease_ttl_ms: u64,
903 ) -> Result<crate::QueuedWorkClaim, StoreError> {
904 Err(StoreError::QueuedWorkClaimExpired {
905 session_id: claim.session_id.clone(),
906 claim_id: claim.claim_id.clone(),
907 })
908 }
909
910 async fn abandon_queued_work_claim(
915 &self,
916 _claim: &crate::QueuedWorkClaim,
917 ) -> Result<(), StoreError> {
918 Ok(())
919 }
920
921 async fn cancel_queued_work_batch(
931 &self,
932 _session_id: &str,
933 _batch_id: &str,
934 ) -> Result<Option<crate::QueuedWorkBatch>, StoreError> {
935 Ok(None)
936 }
937
938 async fn list_queued_work(
942 &self,
943 _session_id: &str,
944 ) -> Result<Vec<crate::QueuedWorkBatch>, StoreError> {
945 Ok(Vec::new())
946 }
947
948 async fn list_pending_queued_work(
953 &self,
954 session_id: &str,
955 ) -> Result<Vec<crate::QueuedWorkBatch>, StoreError> {
956 self.list_queued_work(session_id).await
957 }
958
959 async fn save_session_meta(&self, meta: SessionMeta) -> Result<(), StoreError>;
960 async fn load_session_meta(&self) -> Result<Option<SessionMeta>, StoreError>;
961
962 async fn tombstone_nodes(&self, ids: &[String]) -> Result<(), StoreError>;
963 async fn vacuum(&self) -> Result<VacuumReport, StoreError>;
964 async fn gc_unreachable(&self) -> Result<GcReport, StoreError>;
965}
966
967fn persisted_session_state_from_read(read: PersistedSessionRead) -> crate::RuntimeSessionState {
968 persisted_session_state_from_head(
969 SessionHead {
970 session_id: read.session_id,
971 head_revision: read.head_revision,
972 agent_frames: read.agent_frames,
973 current_agent_frame_id: read.current_agent_frame_id,
974 graph: read.graph,
975 config: read.config,
976 checkpoint_ref: read.checkpoint_ref,
977 token_ledger: read.token_ledger,
978 },
979 read.checkpoint,
980 )
981}
982
983pub async fn load_persisted_session_state(
984 store: &(dyn RuntimePersistence + '_),
985) -> Result<Option<crate::RuntimeSessionState>, StoreError> {
986 Ok(store
987 .load_session(SessionReadScope::FullGraph)
988 .await?
989 .map(persisted_session_state_from_read))
990}
991
992pub async fn load_persisted_session_state_active_path(
993 store: &(dyn RuntimePersistence + '_),
994 leaf_node_id: Option<String>,
995) -> Result<Option<crate::RuntimeSessionState>, StoreError> {
996 Ok(store
997 .load_session(SessionReadScope::ActivePath { leaf_node_id })
998 .await?
999 .map(persisted_session_state_from_read))
1000}
1001
1002pub async fn refresh_persisted_session_state(
1003 store: &(dyn RuntimePersistence + '_),
1004 state: &mut crate::RuntimeSessionState,
1005) -> Result<(), StoreError> {
1006 if let Some(mut fresh) = load_persisted_session_state(store).await? {
1007 fresh.policy.session_id = state.policy.session_id.clone();
1008 fresh.policy.max_turns = state.policy.max_turns;
1009 *state = fresh;
1010 }
1011 Ok(())
1012}