/// Returns the session id used when a persisted record does not carry one.
///
/// Referenced as the serde default for the `session_id` field of [`SessionHead`] and
/// [`SessionHeadMeta`], and by their `Default` implementations.
fn default_root_session_id() -> String {
    String::from("root")
}
4
#[cfg(test)]
mod persisted_state_tests {
    use super::*;

    /// Hydrating from a stored head must expose the recorded provider id on the session
    /// policy and on every agent frame, and must carry the stored head revision through.
    #[test]
    fn persisted_state_hydrates_provider_id_without_live_provider_rebinding() {
        let stored_config = crate::PersistedSessionConfig {
            provider_id: "stored-provider".to_string(),
            model: crate::ModelSpec::default(),
        };
        let stored_head = SessionHead {
            session_id: "stored".to_string(),
            head_revision: 7,
            agent_frames: Vec::new(),
            current_agent_frame_id: String::new(),
            graph: crate::SessionGraph::default(),
            config: stored_config,
            checkpoint_ref: None,
            token_ledger: Vec::new(),
        };

        // No checkpoint is supplied: only the head contributes to the hydrated state.
        let state = persisted_session_state_from_head(stored_head, None);

        assert_eq!(state.policy.recorded_provider_id(), "stored-provider");
        let every_frame_matches = state
            .agent_frames
            .iter()
            .all(|frame| frame.assignment.policy.recorded_provider_id() == "stored-provider");
        assert!(every_frame_matches);
        assert_eq!(state.head_revision, Some(7));
    }
}
38
/// Errors surfaced by the persistence layer declared in this file.
///
/// The `Display` text of each variant comes from its `#[error(...)]` attribute.
#[derive(Debug, thiserror::Error)]
pub enum StoreError {
    /// The store is already bound to one session and was asked to serve a different one.
    #[error(
        "store is already bound to session `{bound_session_id}` and cannot be reused for `{attempted_session_id}`"
    )]
    SessionBindingMismatch {
        bound_session_id: String,
        attempted_session_id: String,
    },
    /// The store cannot serve the requested [`SessionReadScope`].
    #[error("store does not support read scope {0:?}")]
    UnsupportedReadScope(SessionReadScope),
    /// The head revision the caller expected differs from the one the store holds.
    #[error("store head revision conflict: expected {expected:?}, actual {actual}")]
    HeadRevisionConflict { expected: Option<u64>, actual: u64 },
    /// A turn was already committed for this session under a different commit hash.
    #[error(
        "runtime turn `{turn_id}` for session `{session_id}` was already committed with a different commit hash"
    )]
    RuntimeTurnCommitConflict { session_id: String, turn_id: String },
    /// The referenced queued-work claim is missing or has expired.
    #[error("queued work claim `{claim_id}` for session `{session_id}` is missing or expired")]
    QueuedWorkClaimExpired {
        session_id: String,
        claim_id: String,
    },
    /// A record's `schema_version` differs from the one this binary expects.
    ///
    /// Produced by [`ensure_supported_schema_version`].
    #[error(
        "{record_kind} schema_version {actual} is not supported by this binary (expected {expected})"
    )]
    UnsupportedRecordSchemaVersion {
        record_kind: &'static str,
        actual: u32,
        expected: u32,
    },
    /// Free-form failure carrying a human-readable message.
    ///
    /// Within this file it wraps serialization failures and "not supported" responses.
    #[error("store backend error: {0}")]
    Backend(String),
}
72
/// Descriptive metadata about a session.
///
/// Saved and loaded through `RuntimePersistence::save_session_meta` and
/// `RuntimePersistence::load_session_meta`.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct SessionMeta {
    /// Identifier of the session this metadata describes.
    pub session_id: String,
    /// Name of the session.
    pub session_name: String,
    /// Creation time as a string. NOTE(review): the format is not established here — confirm.
    pub created_at: String,
    /// Model recorded for the session, as a plain string.
    pub model: String,
    /// Working directory, if one was recorded.
    pub cwd: Option<String>,
    /// Relation to other sessions; the source of [`SessionMeta::parent_session_id`].
    pub relation: crate::SessionRelation,
}
82
83impl SessionMeta {
84 pub fn parent_session_id(&self) -> Option<&str> {
87 self.relation.parent_session_id()
88 }
89}
90
/// Summary of a session.
///
/// NOTE(review): the name suggests this backs a session-selection UI — confirm against callers.
#[derive(Clone, Debug)]
pub struct SessionPickerInfo {
    /// Identifier of the summarized session.
    pub session_id: String,
    /// Working directory, if one was recorded.
    pub cwd: Option<String>,
    /// Relation to other sessions; the source of [`SessionPickerInfo::parent_session_id`].
    pub relation: crate::SessionRelation,
    /// Text of the first user message.
    pub first_user_message: String,
    /// Number of user messages.
    pub user_message_count: usize,
}
100
101impl SessionPickerInfo {
102 pub fn parent_session_id(&self) -> Option<&str> {
103 self.relation.parent_session_id()
104 }
105}
106
/// String newtype used to reference stored data (checkpoints, tool state, plugin snapshots,
/// execution state).
///
/// `#[serde(transparent)]` makes it serialize as the bare inner string.
#[derive(Clone, Debug, Default, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
#[serde(transparent)]
pub struct BlobRef(pub String);
110
111impl BlobRef {
112 pub fn as_str(&self) -> &str {
113 &self.0
114 }
115}
116
117impl std::fmt::Display for BlobRef {
118 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
119 f.write_str(&self.0)
120 }
121}
122
123impl From<String> for BlobRef {
124 fn from(value: String) -> Self {
125 Self(value)
126 }
127}
128
/// Counters returned by `RuntimePersistence::gc_unreachable`.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct GcReport {
    /// Number of roots. NOTE(review): presumably the starting points of the reachability
    /// walk — confirm against implementations.
    pub root_count: usize,
    /// Number of blobs retained.
    pub retained_blob_count: usize,
    /// Number of blobs deleted.
    pub deleted_blob_count: usize,
}
135
/// Counters returned by `RuntimePersistence::vacuum`.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct VacuumReport {
    /// Number of nodes removed.
    pub removed_node_count: usize,
}
143
/// Serializable checkpoint record: turn state plus references to separately stored payloads.
///
/// Returned as the `manifest` of a [`RuntimeCommitResult`]. Every field falls back to its
/// default when absent on deserialize, and the optional fields are omitted from the output
/// when `None`.
#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
pub struct SessionCheckpoint {
    /// Persisted turn-level state.
    #[serde(default)]
    pub turn_state: crate::PersistedTurnState,
    /// Reference to the stored tool state, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub tool_state_ref: Option<BlobRef>,
    /// Reference to the stored plugin snapshot, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub plugin_snapshot_ref: Option<BlobRef>,
    /// Revision of the plugin snapshot, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub plugin_snapshot_revision: Option<u64>,
    /// Reference to the stored execution state, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub execution_state_ref: Option<BlobRef>,
}
157
/// Checkpoint that carries the payload values alongside their [`BlobRef`]s.
///
/// It has the fields of [`SessionCheckpoint`] plus the payloads themselves (`tool_state`,
/// `plugin_snapshot`, `execution_state`).
#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
pub struct HydratedSessionCheckpoint {
    /// Persisted turn-level state.
    pub turn_state: crate::PersistedTurnState,
    /// Reference to the stored tool state, if any.
    pub tool_state_ref: Option<BlobRef>,
    /// Tool state value, if present.
    pub tool_state: Option<crate::ToolState>,
    /// Reference to the stored plugin snapshot, if any.
    pub plugin_snapshot_ref: Option<BlobRef>,
    /// Plugin snapshot value, if present.
    pub plugin_snapshot: Option<crate::PluginSessionSnapshot>,
    /// Revision of the plugin snapshot, if any.
    pub plugin_snapshot_revision: Option<u64>,
    /// Reference to the stored execution state, if any.
    pub execution_state_ref: Option<BlobRef>,
    /// Execution state as raw bytes, if present.
    pub execution_state: Option<Vec<u8>>,
}
169
/// Persisted head record of a session, including its full graph.
///
/// `graph` and `config` are required on deserialize; every other field has a serde default.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct SessionHead {
    /// Session identifier; defaults to `default_root_session_id()` when absent.
    #[serde(default = "default_root_session_id")]
    pub session_id: String,
    /// Revision of this head; defaults to 0 when absent.
    #[serde(default)]
    pub head_revision: u64,
    /// Recorded agent frames.
    #[serde(default)]
    pub agent_frames: Vec<crate::AgentFrameRecord>,
    /// Id of the current agent frame; omitted from the output when empty.
    #[serde(default, skip_serializing_if = "String::is_empty")]
    pub current_agent_frame_id: crate::AgentFrameId,
    /// The session graph.
    pub graph: crate::SessionGraph,
    /// Persisted provider/model configuration.
    pub config: crate::PersistedSessionConfig,
    /// Reference to the checkpoint, if any; omitted from the output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub checkpoint_ref: Option<BlobRef>,
    /// Token ledger entries; omitted from the output when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub token_ledger: Vec<crate::TokenLedgerEntry>,
}
187
/// Head record without the graph itself.
///
/// It mirrors [`SessionHead`] but replaces `graph` with `leaf_node_id` and
/// `graph_node_count`. Only `config` is required on deserialize.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct SessionHeadMeta {
    /// Session identifier; defaults to `default_root_session_id()` when absent.
    #[serde(default = "default_root_session_id")]
    pub session_id: String,
    /// Revision of this head; defaults to 0 when absent.
    #[serde(default)]
    pub head_revision: u64,
    /// Persisted provider/model configuration.
    pub config: crate::PersistedSessionConfig,
    /// Recorded agent frames.
    #[serde(default)]
    pub agent_frames: Vec<crate::AgentFrameRecord>,
    /// Id of the current agent frame; omitted from the output when empty.
    #[serde(default, skip_serializing_if = "String::is_empty")]
    pub current_agent_frame_id: crate::AgentFrameId,
    /// Reference to the checkpoint, if any; omitted from the output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub checkpoint_ref: Option<BlobRef>,
    /// Id of the graph's leaf node, if any; omitted from the output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub leaf_node_id: Option<String>,
    /// Number of nodes in the graph; defaults to 0 when absent.
    #[serde(default)]
    pub graph_node_count: usize,
    /// Token ledger entries; omitted from the output when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub token_ledger: Vec<crate::TokenLedgerEntry>,
}
208
209fn persisted_session_config_from_state(
210 state: &crate::RuntimeSessionState,
211) -> crate::PersistedSessionConfig {
212 crate::PersistedSessionConfig {
213 provider_id: state.policy.recorded_provider_id().to_string(),
214 model: state.policy.model.clone(),
215 }
216}
217
/// Selects how much of a session `RuntimePersistence::load_session` should read.
///
/// A store that cannot serve a scope can report `StoreError::UnsupportedReadScope`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SessionReadScope {
    /// Read the full session graph.
    FullGraph,
    /// Read only the active path.
    ///
    /// NOTE(review): presumably the path ending at `leaf_node_id`, falling back to the
    /// stored leaf when `None` — confirm against store implementations.
    ActivePath { leaf_node_id: Option<String> },
}
223
/// Result of `RuntimePersistence::load_session`.
///
/// It holds the fields of a [`SessionHead`] plus the checkpoint, if one was loaded.
/// `persisted_session_state_from_read` converts it into a runtime session state.
#[derive(Clone, Debug)]
pub struct PersistedSessionRead {
    /// Session identifier.
    pub session_id: String,
    /// Revision of the head that was read.
    pub head_revision: u64,
    /// Persisted provider/model configuration.
    pub config: crate::PersistedSessionConfig,
    /// Recorded agent frames.
    pub agent_frames: Vec<crate::AgentFrameRecord>,
    /// Id of the current agent frame.
    pub current_agent_frame_id: crate::AgentFrameId,
    /// Session graph. NOTE(review): may be partial for `ActivePath` reads — confirm.
    pub graph: crate::SessionGraph,
    /// Reference to the checkpoint, if any.
    pub checkpoint_ref: Option<BlobRef>,
    /// Checkpoint with its payloads, if one was loaded.
    pub checkpoint: Option<HydratedSessionCheckpoint>,
    /// Token ledger entries.
    pub token_ledger: Vec<crate::TokenLedgerEntry>,
}
236
/// Describes how a [`RuntimeCommit`] changes the persisted session graph.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub enum GraphCommitDelta {
    /// No node changes; only carries a leaf node id.
    Unchanged {
        leaf_node_id: Option<String>,
    },
    /// Node records to append, together with a leaf node id.
    Append {
        nodes: Vec<crate::SessionNodeRecord>,
        leaf_node_id: Option<String>,
    },
    /// A complete graph that replaces the stored one.
    ReplaceFull(crate::SessionGraph),
}
248
249impl GraphCommitDelta {
250 pub fn leaf_node_id(&self) -> Option<&String> {
251 match self {
252 Self::Unchanged { leaf_node_id } | Self::Append { leaf_node_id, .. } => {
253 leaf_node_id.as_ref()
254 }
255 Self::ReplaceFull(graph) => graph.leaf_node_id.as_ref(),
256 }
257 }
258}
259
/// Everything handed to `RuntimePersistence::commit_runtime_state` for a single commit.
///
/// `RuntimeCommit::turn_commit_hash` hashes the serialized form of this struct after
/// clearing `expected_head_revision` and `turn_commit`.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct RuntimeCommit {
    /// Session the commit belongs to.
    pub session_id: String,
    /// Head revision of the state the commit was built from; `None` when that state has no
    /// revision. NOTE(review): presumably checked by the store, which can report
    /// `StoreError::HeadRevisionConflict` — confirm.
    pub expected_head_revision: Option<u64>,
    /// Provider/model configuration to persist.
    pub config: crate::PersistedSessionConfig,
    /// Agent frames to persist.
    pub agent_frames: Vec<crate::AgentFrameRecord>,
    /// Id of the current agent frame.
    pub current_agent_frame_id: crate::AgentFrameId,
    /// How the graph changes with this commit.
    pub graph: GraphCommitDelta,
    /// Checkpoint contents, including payloads.
    pub checkpoint: HydratedSessionCheckpoint,
    /// Token ledger entries added by this commit.
    pub usage_deltas: Vec<crate::TokenLedgerEntry>,
    /// Optional stamp identifying the runtime turn this commit belongs to.
    pub turn_commit: Option<RuntimeTurnCommitStamp>,
    /// Queued-work completions recorded with this commit.
    pub completed_queue_claims: Vec<crate::QueuedWorkCompletion>,
    /// Attachment ids committed with this commit.
    pub committed_attachment_ids: Vec<crate::AttachmentId>,
}
281
/// Outcome returned by `RuntimePersistence::commit_runtime_state`.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct RuntimeCommitResult {
    /// Head revision reported by the store for this commit.
    pub head_revision: u64,
    /// Reference to the checkpoint.
    pub checkpoint_ref: BlobRef,
    /// Checkpoint record holding references rather than payloads.
    pub manifest: SessionCheckpoint,
}
288
/// Argument to `AttachmentManifest::record_intent`.
#[derive(Clone, Debug)]
pub struct AttachmentIntent {
    /// Identifier of the attachment.
    pub attachment_id: crate::AttachmentId,
    /// Session the attachment belongs to.
    pub session_id: String,
    /// Canonical URI of the attachment.
    pub canonical_uri: String,
    /// Time of the intent, in milliseconds since the epoch.
    pub intent_at_epoch_ms: u64,
}
318
/// Manifest entry returned by `AttachmentManifest::list_uncommitted`.
///
/// It has the fields of [`AttachmentIntent`] plus an optional commit time.
#[derive(Clone, Debug)]
pub struct AttachmentManifestEntry {
    /// Identifier of the attachment.
    pub attachment_id: crate::AttachmentId,
    /// Session the attachment belongs to.
    pub session_id: String,
    /// Canonical URI of the attachment.
    pub canonical_uri: String,
    /// Time of the intent, in milliseconds since the epoch.
    pub intent_at_epoch_ms: u64,
    /// Commit time in milliseconds since the epoch, if set.
    pub committed_at_epoch_ms: Option<u64>,
}
327
/// Bookkeeping interface for attachments; a supertrait of `RuntimePersistence`.
///
/// All methods are synchronous and report failures as [`StoreError`].
///
/// NOTE(review): the method names suggest a record-intent / commit / sweep-uncommitted
/// lifecycle — confirm the exact contract against an implementation.
pub trait AttachmentManifest: Send + Sync {
    /// Records an attachment intent.
    fn record_intent(&self, intent: AttachmentIntent) -> Result<(), StoreError>;

    /// Commits the given attachment ids for `session_id`.
    fn commit_refs(
        &self,
        session_id: &str,
        attachment_ids: &[crate::AttachmentId],
    ) -> Result<(), StoreError>;

    /// Lists uncommitted entries, filtered by `older_than_epoch_ms`.
    ///
    /// NOTE(review): presumably returns entries whose intent predates the cutoff and which
    /// have no commit time — confirm.
    fn list_uncommitted(
        &self,
        older_than_epoch_ms: u64,
    ) -> Result<Vec<AttachmentManifestEntry>, StoreError>;

    /// Forgets the entry for `attachment_id`.
    fn forget(&self, attachment_id: &crate::AttachmentId) -> Result<(), StoreError>;
}
368
/// Implements `AttachmentManifest` for `$ty` with methods that do nothing.
///
/// Every generated method returns `Ok`: `record_intent`, `commit_refs` and `forget` return
/// `Ok(())`, and `list_uncommitted` returns an empty `Vec`. All arguments are ignored.
#[macro_export]
macro_rules! impl_noop_attachment_manifest {
    ($ty:ty) => {
        impl $crate::AttachmentManifest for $ty {
            // Accepts and discards the intent.
            fn record_intent(
                &self,
                _intent: $crate::AttachmentIntent,
            ) -> ::std::result::Result<(), $crate::StoreError> {
                Ok(())
            }

            // Accepts and discards the committed ids.
            fn commit_refs(
                &self,
                _session_id: &str,
                _attachment_ids: &[$crate::AttachmentId],
            ) -> ::std::result::Result<(), $crate::StoreError> {
                Ok(())
            }

            // Nothing is ever recorded, so nothing is ever uncommitted.
            fn list_uncommitted(
                &self,
                _older_than_epoch_ms: u64,
            ) -> ::std::result::Result<Vec<$crate::AttachmentManifestEntry>, $crate::StoreError>
            {
                Ok(Vec::new())
            }

            // Nothing to forget.
            fn forget(
                &self,
                _attachment_id: &$crate::AttachmentId,
            ) -> ::std::result::Result<(), $crate::StoreError> {
                Ok(())
            }
        }
    };
}
409
/// Expands to stub definitions of the queued-work methods for a store that does not
/// support queued work.
///
/// The expansion is a sequence of bare `fn` items taking `&self`, so it must be invoked
/// inside an `impl` block. Each method returns a boxed, pinned, `Send` future and is written
/// with explicit `'life*` / `'async_trait` lifetimes.
///
/// NOTE(review): the signatures appear to mirror what `#[async_trait]` generates for the
/// matching `RuntimePersistence` methods — confirm they stay in sync with the trait and with
/// the `async-trait` version in use.
///
/// Generated behavior:
/// - `enqueue_queued_work` and `claim_ready_queued_work` fail with `StoreError::Backend`.
/// - `renew_queued_work_claim` fails with `StoreError::QueuedWorkClaimExpired`.
/// - `abandon_queued_work_claim` returns `Ok(())`.
/// - `cancel_queued_work_batch` returns `Ok(None)`.
/// - `list_queued_work` returns an empty `Vec`.
#[macro_export]
macro_rules! impl_unsupported_queued_work_methods {
    () => {
        // Always fails: this store cannot enqueue work.
        fn enqueue_queued_work<'life0, 'async_trait>(
            &'life0 self,
            _batch: $crate::runtime::QueuedWorkBatchDraft,
        ) -> ::core::pin::Pin<
            Box<
                dyn ::core::future::Future<
                        Output = ::std::result::Result<
                            $crate::runtime::QueuedWorkBatch,
                            $crate::store::StoreError,
                        >,
                    > + Send
                    + 'async_trait,
            >,
        >
        where
            'life0: 'async_trait,
            Self: 'async_trait,
        {
            Box::pin(async move {
                Err($crate::store::StoreError::Backend(
                    "queued work is not supported by this test store".to_string(),
                ))
            })
        }

        // Always fails; the error message names the requested session.
        fn claim_ready_queued_work<'life0, 'life1, 'life2, 'async_trait>(
            &'life0 self,
            session_id: &'life1 str,
            _owner_id: &'life2 str,
            _boundary: $crate::runtime::QueuedWorkClaimBoundary,
            _lease_ttl_ms: u64,
            _max_batches: usize,
        ) -> ::core::pin::Pin<
            Box<
                dyn ::core::future::Future<
                        Output = ::std::result::Result<
                            Option<$crate::runtime::QueuedWorkClaim>,
                            $crate::store::StoreError,
                        >,
                    > + Send
                    + 'async_trait,
            >,
        >
        where
            'life0: 'async_trait,
            'life1: 'async_trait,
            'life2: 'async_trait,
            Self: 'async_trait,
        {
            Box::pin(async move {
                Err($crate::store::StoreError::Backend(format!(
                    "queued work is not supported for session `{session_id}` by this test store"
                )))
            })
        }

        // Always reports the claim as missing or expired.
        fn renew_queued_work_claim<'life0, 'life1, 'async_trait>(
            &'life0 self,
            claim: &'life1 $crate::runtime::QueuedWorkClaim,
            _lease_ttl_ms: u64,
        ) -> ::core::pin::Pin<
            Box<
                dyn ::core::future::Future<
                        Output = ::std::result::Result<
                            $crate::runtime::QueuedWorkClaim,
                            $crate::store::StoreError,
                        >,
                    > + Send
                    + 'async_trait,
            >,
        >
        where
            'life0: 'async_trait,
            'life1: 'async_trait,
            Self: 'async_trait,
        {
            Box::pin(async move {
                Err($crate::store::StoreError::QueuedWorkClaimExpired {
                    session_id: claim.session_id.clone(),
                    claim_id: claim.claim_id.clone(),
                })
            })
        }

        // Succeeds without doing anything.
        fn abandon_queued_work_claim<'life0, 'life1, 'async_trait>(
            &'life0 self,
            _claim: &'life1 $crate::runtime::QueuedWorkClaim,
        ) -> ::core::pin::Pin<
            Box<
                dyn ::core::future::Future<
                        Output = ::std::result::Result<(), $crate::store::StoreError>,
                    > + Send
                    + 'async_trait,
            >,
        >
        where
            'life0: 'async_trait,
            'life1: 'async_trait,
            Self: 'async_trait,
        {
            Box::pin(async move { Ok(()) })
        }

        // Reports that no batch was found to cancel.
        fn cancel_queued_work_batch<'life0, 'life1, 'life2, 'async_trait>(
            &'life0 self,
            _session_id: &'life1 str,
            _batch_id: &'life2 str,
        ) -> ::core::pin::Pin<
            Box<
                dyn ::core::future::Future<
                        Output = ::std::result::Result<
                            Option<$crate::runtime::QueuedWorkBatch>,
                            $crate::store::StoreError,
                        >,
                    > + Send
                    + 'async_trait,
            >,
        >
        where
            'life0: 'async_trait,
            'life1: 'async_trait,
            'life2: 'async_trait,
            Self: 'async_trait,
        {
            Box::pin(async move { Ok(None) })
        }

        // Reports an empty queue.
        fn list_queued_work<'life0, 'life1, 'async_trait>(
            &'life0 self,
            _session_id: &'life1 str,
        ) -> ::core::pin::Pin<
            Box<
                dyn ::core::future::Future<
                        Output = ::std::result::Result<
                            Vec<$crate::runtime::QueuedWorkBatch>,
                            $crate::store::StoreError,
                        >,
                    > + Send
                    + 'async_trait,
            >,
        >
        where
            'life0: 'async_trait,
            'life1: 'async_trait,
            Self: 'async_trait,
        {
            Box::pin(async move { Ok(Vec::new()) })
        }
    };
}
563
564pub fn ensure_supported_schema_version(
568 record_kind: &'static str,
569 actual: u32,
570 expected: u32,
571) -> Result<(), StoreError> {
572 if actual == expected {
573 Ok(())
574 } else {
575 Err(StoreError::UnsupportedRecordSchemaVersion {
576 record_kind,
577 actual,
578 expected,
579 })
580 }
581}
582
/// Identifies a runtime turn commit by session, turn and commit hash.
///
/// Attached to a [`RuntimeCommit`] through `RuntimeCommit::with_turn_commit`.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct RuntimeTurnCommitStamp {
    /// Session the turn belongs to.
    pub session_id: String,
    /// Identifier of the turn.
    pub turn_id: String,
    /// Hash of the turn commit. NOTE(review): presumably the value produced by
    /// `RuntimeCommit::turn_commit_hash` — confirm against callers.
    pub turn_commit_hash: String,
}
589
590impl RuntimeTurnCommitStamp {
591 pub fn new(
592 session_id: impl Into<String>,
593 turn_id: impl Into<String>,
594 turn_commit_hash: impl Into<String>,
595 ) -> Self {
596 Self {
597 session_id: session_id.into(),
598 turn_id: turn_id.into(),
599 turn_commit_hash: turn_commit_hash.into(),
600 }
601 }
602}
603
604fn build_persisted_turn_state(state: &crate::RuntimeSessionState) -> crate::PersistedTurnState {
605 crate::PersistedTurnState {
606 turn_index: state.turn_index,
607 token_usage: state.token_usage.clone(),
608 last_prompt_usage: state.last_prompt_usage.clone(),
609 protocol_turn_options: state.protocol_turn_options.clone(),
610 }
611}
612
613fn build_checkpoint_from_persisted_state(
614 state: &crate::RuntimeSessionState,
615) -> HydratedSessionCheckpoint {
616 HydratedSessionCheckpoint {
617 turn_state: build_persisted_turn_state(state),
618 tool_state_ref: state.tool_state_ref.clone(),
619 tool_state: state.tool_state_snapshot.clone(),
620 plugin_snapshot_ref: state.plugin_snapshot_ref.clone(),
621 plugin_snapshot_revision: state.plugin_snapshot_revision,
622 plugin_snapshot: state.plugin_snapshot.clone(),
623 execution_state_ref: state.execution_state_ref.clone(),
624 execution_state: state.execution_state_snapshot.clone(),
625 }
626}
627
628impl RuntimeCommit {
629 pub fn turn_commit_hash(&self) -> Result<String, StoreError> {
630 let mut semantic_commit = self.clone();
631 semantic_commit.expected_head_revision = None;
632 semantic_commit.turn_commit = None;
633 let mut semantic_commit = serde_json::to_value(&semantic_commit).map_err(|err| {
634 StoreError::Backend(format!("failed to serialize runtime turn commit: {err}"))
635 })?;
636 scrub_turn_commit_hash_value(&mut semantic_commit);
637 crate::stable_hash::stable_json_sha256_hex(&semantic_commit).map_err(|err| {
638 StoreError::Backend(format!(
639 "failed to serialize runtime turn commit hash: {err}"
640 ))
641 })
642 }
643
644 pub fn persisted_state(
645 state: &crate::RuntimeSessionState,
646 usage_deltas: &[crate::TokenLedgerEntry],
647 ) -> Self {
648 Self {
649 session_id: state.session_id.clone(),
650 expected_head_revision: state.head_revision,
651 config: persisted_session_config_from_state(state),
652 agent_frames: state.agent_frames.clone(),
653 current_agent_frame_id: state.current_agent_frame_id.clone(),
654 graph: if state.graph_replace_required || state.head_revision.is_none() {
655 GraphCommitDelta::ReplaceFull(state.session_graph.clone())
656 } else {
657 GraphCommitDelta::Unchanged {
658 leaf_node_id: state.session_graph.leaf_node_id.clone(),
659 }
660 },
661 checkpoint: build_checkpoint_from_persisted_state(state),
662 usage_deltas: usage_deltas.to_vec(),
663 turn_commit: None,
664 completed_queue_claims: Vec::new(),
665 committed_attachment_ids: Vec::new(),
666 }
667 }
668
669 pub(crate) fn persisted_state_with_graph_commit(
670 state: &crate::RuntimeSessionState,
671 graph: GraphCommitDelta,
672 usage_deltas: &[crate::TokenLedgerEntry],
673 ) -> Self {
674 Self {
675 session_id: state.session_id.clone(),
676 expected_head_revision: state.head_revision,
677 config: persisted_session_config_from_state(state),
678 agent_frames: state.agent_frames.clone(),
679 current_agent_frame_id: state.current_agent_frame_id.clone(),
680 graph,
681 checkpoint: build_checkpoint_from_persisted_state(state),
682 usage_deltas: usage_deltas.to_vec(),
683 turn_commit: None,
684 completed_queue_claims: Vec::new(),
685 committed_attachment_ids: Vec::new(),
686 }
687 }
688
689 pub fn with_turn_commit(mut self, turn_commit: RuntimeTurnCommitStamp) -> Self {
690 self.turn_commit = Some(turn_commit);
691 self
692 }
693
694 pub fn completing_queue_claim(
695 mut self,
696 completed_queue_claim: crate::QueuedWorkCompletion,
697 ) -> Self {
698 self.completed_queue_claims.push(completed_queue_claim);
699 self
700 }
701
702 pub fn completing_queue_claims(
703 mut self,
704 completed_queue_claims: impl IntoIterator<Item = crate::QueuedWorkCompletion>,
705 ) -> Self {
706 self.completed_queue_claims.extend(completed_queue_claims);
707 self
708 }
709
710 pub fn with_committed_attachments(
711 mut self,
712 attachment_ids: impl IntoIterator<Item = crate::AttachmentId>,
713 ) -> Self {
714 self.committed_attachment_ids = attachment_ids.into_iter().collect();
715 self
716 }
717}
718
719fn scrub_turn_commit_hash_value(value: &mut serde_json::Value) {
720 match value {
721 serde_json::Value::Object(map) => {
722 let is_message = map.contains_key("role") && map.contains_key("parts");
723 let is_message_part = map.contains_key("kind")
724 && map.contains_key("content")
725 && map.contains_key("prune_state");
726 if is_message || is_message_part {
727 map.remove("id");
728 }
729 for volatile_key in ["node_id", "parent_node_id", "leaf_node_id", "timestamp"] {
730 map.remove(volatile_key);
731 }
732 for child in map.values_mut() {
733 scrub_turn_commit_hash_value(child);
734 }
735 }
736 serde_json::Value::Array(items) => {
737 for item in items {
738 scrub_turn_commit_hash_value(item);
739 }
740 }
741 _ => {}
742 }
743}
744
745fn persisted_session_state_from_head(
746 head: SessionHead,
747 checkpoint: Option<HydratedSessionCheckpoint>,
748) -> crate::RuntimeSessionState {
749 let mut state = crate::RuntimeSessionState {
750 session_id: head.session_id,
751 policy: crate::SessionPolicy::default(),
752 agent_frames: head.agent_frames,
753 current_agent_frame_id: head.current_agent_frame_id,
754 session_graph: head.graph,
755 turn_index: 0,
756 token_usage: crate::TokenUsage::default(),
757 last_prompt_usage: None,
758 protocol_turn_options: crate::ProtocolTurnOptions::default(),
759 tool_state_ref: None,
760 tool_state_generation: None,
761 tool_state_snapshot: None,
762 plugin_snapshot_ref: None,
763 plugin_snapshot_revision: None,
764 plugin_snapshot: None,
765 execution_state_ref: None,
766 execution_state_snapshot: None,
767 token_ledger: head.token_ledger,
768 checkpoint_ref: head.checkpoint_ref.clone(),
769 head_revision: Some(head.head_revision),
770 graph_replace_required: false,
771 };
772 state.policy.model = head.config.model.clone();
773 state.policy.provider_id = head.config.provider_id.clone();
774 if let Some(checkpoint) = checkpoint {
775 state.turn_index = checkpoint.turn_state.turn_index;
776 state.token_usage = checkpoint.turn_state.token_usage;
777 state.last_prompt_usage = checkpoint.turn_state.last_prompt_usage;
778 state.protocol_turn_options = checkpoint.turn_state.protocol_turn_options;
779 state.tool_state_ref = checkpoint.tool_state_ref.clone();
780 state.tool_state_generation = checkpoint
781 .tool_state
782 .as_ref()
783 .map(|snapshot| snapshot.generation());
784 state.tool_state_snapshot = checkpoint.tool_state;
785 state.plugin_snapshot_ref = checkpoint.plugin_snapshot_ref.clone();
786 state.plugin_snapshot_revision = checkpoint.plugin_snapshot_revision;
787 state.plugin_snapshot = checkpoint.plugin_snapshot;
788 state.execution_state_ref = checkpoint.execution_state_ref.clone();
789 state.execution_state_snapshot = checkpoint.execution_state;
790 }
791 state.ensure_agent_frame_initialized();
792 state
793}
794
795impl Default for SessionHead {
796 fn default() -> Self {
797 Self {
798 session_id: default_root_session_id(),
799 head_revision: 0,
800 agent_frames: Vec::new(),
801 current_agent_frame_id: String::new(),
802 graph: crate::SessionGraph::default(),
803 config: crate::PersistedSessionConfig::default(),
804 checkpoint_ref: None,
805 token_ledger: Vec::new(),
806 }
807 }
808}
809
810impl Default for SessionHeadMeta {
811 fn default() -> Self {
812 Self {
813 session_id: default_root_session_id(),
814 head_revision: 0,
815 config: crate::PersistedSessionConfig::default(),
816 agent_frames: Vec::new(),
817 current_agent_frame_id: String::new(),
818 checkpoint_ref: None,
819 leaf_node_id: None,
820 graph_node_count: 0,
821 token_ledger: Vec::new(),
822 }
823 }
824}
825
826#[async_trait::async_trait]
841pub trait RuntimePersistence: AttachmentManifest + Send + Sync {
842 fn durability_tier(&self) -> crate::DurabilityTier {
845 crate::DurabilityTier::Inline
846 }
847
848 async fn load_session(
849 &self,
850 scope: SessionReadScope,
851 ) -> Result<Option<PersistedSessionRead>, StoreError>;
852
853 async fn load_node(
854 &self,
855 node_id: &str,
856 ) -> Result<Option<crate::SessionNodeRecord>, StoreError>;
857
858 async fn commit_runtime_state(
859 &self,
860 commit: RuntimeCommit,
861 ) -> Result<RuntimeCommitResult, StoreError>;
862
863 async fn enqueue_queued_work(
864 &self,
865 batch: crate::QueuedWorkBatchDraft,
866 ) -> Result<crate::QueuedWorkBatch, StoreError>;
867
868 async fn claim_ready_queued_work(
869 &self,
870 session_id: &str,
871 owner_id: &str,
872 boundary: crate::QueuedWorkClaimBoundary,
873 lease_ttl_ms: u64,
874 max_batches: usize,
875 ) -> Result<Option<crate::QueuedWorkClaim>, StoreError>;
876
877 async fn claim_ready_queued_work_by_batch_ids(
886 &self,
887 session_id: &str,
888 owner_id: &str,
889 boundary: crate::QueuedWorkClaimBoundary,
890 lease_ttl_ms: u64,
891 batch_ids: &[String],
892 ) -> Result<Option<crate::QueuedWorkClaim>, StoreError> {
893 if batch_ids.is_empty() {
894 return Ok(None);
895 }
896 let Some(claim) = self
897 .claim_ready_queued_work(
898 session_id,
899 owner_id,
900 boundary,
901 lease_ttl_ms,
902 batch_ids.len(),
903 )
904 .await?
905 else {
906 return Ok(None);
907 };
908 let claimed_ids = claim
909 .batches
910 .iter()
911 .map(|batch| batch.batch_id.as_str())
912 .collect::<Vec<_>>();
913 if claimed_ids == batch_ids.iter().map(String::as_str).collect::<Vec<_>>() {
914 return Ok(Some(claim));
915 }
916 self.abandon_queued_work_claim(&claim).await?;
917 Ok(None)
918 }
919
920 async fn renew_queued_work_claim(
921 &self,
922 claim: &crate::QueuedWorkClaim,
923 lease_ttl_ms: u64,
924 ) -> Result<crate::QueuedWorkClaim, StoreError>;
925
926 async fn abandon_queued_work_claim(
927 &self,
928 claim: &crate::QueuedWorkClaim,
929 ) -> Result<(), StoreError>;
930
931 async fn cancel_queued_work_batch(
938 &self,
939 session_id: &str,
940 batch_id: &str,
941 ) -> Result<Option<crate::QueuedWorkBatch>, StoreError>;
942
943 async fn list_queued_work(
944 &self,
945 session_id: &str,
946 ) -> Result<Vec<crate::QueuedWorkBatch>, StoreError>;
947
948 async fn list_pending_queued_work(
953 &self,
954 session_id: &str,
955 ) -> Result<Vec<crate::QueuedWorkBatch>, StoreError> {
956 self.list_queued_work(session_id).await
957 }
958
959 async fn save_session_meta(&self, meta: SessionMeta) -> Result<(), StoreError>;
960 async fn load_session_meta(&self) -> Result<Option<SessionMeta>, StoreError>;
961
962 async fn tombstone_nodes(&self, ids: &[String]) -> Result<(), StoreError>;
963 async fn vacuum(&self) -> Result<VacuumReport, StoreError>;
964 async fn gc_unreachable(&self) -> Result<GcReport, StoreError>;
965}
966
967fn persisted_session_state_from_read(read: PersistedSessionRead) -> crate::RuntimeSessionState {
968 persisted_session_state_from_head(
969 SessionHead {
970 session_id: read.session_id,
971 head_revision: read.head_revision,
972 agent_frames: read.agent_frames,
973 current_agent_frame_id: read.current_agent_frame_id,
974 graph: read.graph,
975 config: read.config,
976 checkpoint_ref: read.checkpoint_ref,
977 token_ledger: read.token_ledger,
978 },
979 read.checkpoint,
980 )
981}
982
983pub async fn load_persisted_session_state(
984 store: &(dyn RuntimePersistence + '_),
985) -> Result<Option<crate::RuntimeSessionState>, StoreError> {
986 Ok(store
987 .load_session(SessionReadScope::FullGraph)
988 .await?
989 .map(persisted_session_state_from_read))
990}
991
992pub async fn load_persisted_session_state_active_path(
993 store: &(dyn RuntimePersistence + '_),
994 leaf_node_id: Option<String>,
995) -> Result<Option<crate::RuntimeSessionState>, StoreError> {
996 Ok(store
997 .load_session(SessionReadScope::ActivePath { leaf_node_id })
998 .await?
999 .map(persisted_session_state_from_read))
1000}
1001
1002pub async fn refresh_persisted_session_state(
1003 store: &(dyn RuntimePersistence + '_),
1004 state: &mut crate::RuntimeSessionState,
1005) -> Result<(), StoreError> {
1006 if let Some(mut fresh) = load_persisted_session_state(store).await? {
1007 fresh.policy.session_id = state.policy.session_id.clone();
1008 fresh.policy.max_turns = state.policy.max_turns;
1009 *state = fresh;
1010 }
1011 Ok(())
1012}