1mod lease_timings;
4pub mod queued_work;
5
6pub use lease_timings::{LeaseTimings, LeaseTimingsError};
7
/// procfs path read by `LeaseOwnerLiveness::current_local_process` to obtain the
/// boot id recorded in a `LocalProcess` liveness descriptor.
const PROC_BOOT_ID_PATH: &str = "/proc/sys/kernel/random/boot_id";
9
/// Returns the session id (`"root"`) used when a head record carries no `session_id`.
///
/// Wired in through `#[serde(default = "default_root_session_id")]` on the head
/// records and reused by their `Default` implementations.
fn default_root_session_id() -> String {
    String::from("root")
}
13
/// Schema version written into `SessionHeadMeta::schema_version` by its `Default` impl.
pub const SESSION_HEAD_META_SCHEMA_VERSION: u32 = 1;
/// Schema version written into `SessionCheckpoint::schema_version` by `new` and `Default`.
pub const SESSION_CHECKPOINT_SCHEMA_VERSION: u32 = 1;
16
// Unit tests for head hydration and for the versioned JSON record decoder.
#[cfg(test)]
mod persisted_state_tests {
    use super::*;

    // Hydrating from a stored head must carry the stored provider id into the
    // session policy and into every agent frame, and surface the head revision.
    #[test]
    fn persisted_state_hydrates_provider_id_without_live_provider_rebinding() {
        let state = persisted_session_state_from_head(
            SessionHead {
                session_id: "stored".to_string(),
                head_revision: 7,
                agent_frames: Vec::new(),
                current_agent_frame_id: String::new(),
                graph: crate::SessionGraph::default(),
                config: crate::PersistedSessionConfig {
                    provider_id: "stored-provider".to_string(),
                    model: crate::ModelSpec::default(),
                },
                checkpoint_ref: None,
                token_ledger: Vec::new(),
            },
            None,
        );

        assert_eq!(state.policy.recorded_provider_id(), "stored-provider");
        assert!(
            state
                .agent_frames
                .iter()
                .all(|frame| frame.assignment.policy.recorded_provider_id() == "stored-provider")
        );
        assert_eq!(state.head_revision, Some(7));
    }

    // A record without any `schema_version` key is rejected as pre-versioned state.
    #[test]
    fn versioned_json_record_rejects_missing_schema_version() {
        let err = decode_versioned_json_record::<SessionHeadMeta>(
            "{}",
            "SessionHeadMeta",
            SESSION_HEAD_META_SCHEMA_VERSION,
        )
        .expect_err("pre-versioned session head should fail");

        assert!(matches!(
            err,
            StoreError::MissingRecordSchemaVersion {
                record_kind: "SessionHeadMeta",
                expected: SESSION_HEAD_META_SCHEMA_VERSION
            }
        ));
    }

    // A `schema_version` that is a JSON string rather than an integer is invalid.
    #[test]
    fn versioned_json_record_rejects_invalid_schema_version() {
        let err = decode_versioned_json_record::<SessionHeadMeta>(
            r#"{"schema_version":"1"}"#,
            "SessionHeadMeta",
            SESSION_HEAD_META_SCHEMA_VERSION,
        )
        .expect_err("invalid session head schema version should fail");

        assert!(matches!(
            err,
            StoreError::InvalidRecordSchemaVersion {
                record_kind: "SessionHeadMeta",
                expected: SESSION_HEAD_META_SCHEMA_VERSION,
                ..
            }
        ));
    }

    // A well-formed integer version that differs from the expected one is unsupported.
    #[test]
    fn versioned_json_record_rejects_unsupported_schema_version() {
        let err = decode_versioned_json_record::<SessionHeadMeta>(
            r#"{"schema_version":2}"#,
            "SessionHeadMeta",
            SESSION_HEAD_META_SCHEMA_VERSION,
        )
        .expect_err("unsupported session head schema version should fail");

        assert!(matches!(
            err,
            StoreError::UnsupportedRecordSchemaVersion {
                record_kind: "SessionHeadMeta",
                actual: 2,
                expected: SESSION_HEAD_META_SCHEMA_VERSION
            }
        ));
    }
}
106
/// Errors reported by the session store APIs declared in this module.
///
/// `Display` text is generated by `thiserror` from the `#[error(...)]` attributes.
#[derive(Debug, thiserror::Error)]
pub enum StoreError {
    /// A store bound to one session id was used with a different session id.
    #[error(
        "store is already bound to session `{bound_session_id}` and cannot be reused for `{attempted_session_id}`"
    )]
    SessionBindingMismatch {
        bound_session_id: String,
        attempted_session_id: String,
    },
    /// The store cannot serve the requested `SessionReadScope`.
    #[error("store does not support read scope {0:?}")]
    UnsupportedReadScope(SessionReadScope),
    /// The caller's expected head revision did not match the stored one.
    #[error("store head revision conflict: expected {expected:?}, actual {actual}")]
    HeadRevisionConflict { expected: Option<u64>, actual: u64 },
    /// The same runtime turn was already committed with a different commit hash.
    #[error(
        "runtime turn `{turn_id}` for session `{session_id}` was already committed with a different commit hash"
    )]
    RuntimeTurnCommitConflict { session_id: String, turn_id: String },
    /// The referenced queued work claim is missing or expired.
    #[error("queued work claim `{claim_id}` for session `{session_id}` is missing or expired")]
    QueuedWorkClaimExpired {
        session_id: String,
        claim_id: String,
    },
    /// The referenced turn input claim is missing or expired.
    #[error("turn input claim `{claim_id}` for session `{session_id}` is missing or expired")]
    TurnInputClaimExpired {
        session_id: String,
        claim_id: String,
    },
    /// A pending turn input `source_key` is already bound to another input whose
    /// submitted content differs.
    #[error(
        "pending turn input source_key `{source_key}` for session `{session_id}` is already bound to input `{existing_input_id}` with different submitted content"
    )]
    PendingTurnInputSourceKeyConflict {
        session_id: String,
        source_key: String,
        existing_input_id: String,
    },
    /// The session execution lease is missing or expired.
    #[error("session execution lease for session `{session_id}` is missing or expired")]
    SessionExecutionLeaseExpired { session_id: String },
    /// A record's integer `schema_version` differs from the version this binary expects.
    #[error(
        "{record_kind} schema_version {actual} is not supported by this binary (expected {expected})"
    )]
    UnsupportedRecordSchemaVersion {
        record_kind: &'static str,
        actual: u32,
        expected: u32,
    },
    /// A record has no `schema_version` field at all.
    #[error(
        "{record_kind} is missing schema_version and was written by unsupported pre-versioned state (expected {expected})"
    )]
    MissingRecordSchemaVersion {
        record_kind: &'static str,
        expected: u32,
    },
    /// A record's `schema_version` is present but is not an integer that fits in `u32`;
    /// `actual` holds the JSON rendering of the offending value.
    #[error("{record_kind} schema_version {actual} is invalid (expected integer {expected})")]
    InvalidRecordSchemaVersion {
        record_kind: &'static str,
        actual: String,
        expected: u32,
    },
    /// Any other backend failure, carried as a message string.
    #[error("store backend error: {0}")]
    Backend(String),
}
168
/// Serializable descriptive metadata for a session, saved and loaded through
/// `SessionCommitStore::save_session_meta` / `load_session_meta`.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct SessionMeta {
    /// Identifier of the session.
    pub session_id: String,
    /// Name of the session.
    pub session_name: String,
    /// Creation time as a string; the format is not fixed by this module.
    pub created_at: String,
    /// Model identifier as a string.
    pub model: String,
    /// Working directory, when one is recorded.
    pub cwd: Option<String>,
    /// Relation of this session to other sessions (see `parent_session_id`).
    pub relation: crate::SessionRelation,
}
178
impl SessionMeta {
    /// Returns the parent session id reported by `relation`, if any.
    pub fn parent_session_id(&self) -> Option<&str> {
        self.relation.parent_session_id()
    }
}
186
/// Summary of a session: id, working directory, relation, and user-message statistics.
///
/// NOTE(review): the name suggests this backs a session picker UI — confirm against callers.
#[derive(Clone, Debug)]
pub struct SessionPickerInfo {
    /// Identifier of the session.
    pub session_id: String,
    /// Working directory, when one is recorded.
    pub cwd: Option<String>,
    /// Relation of this session to other sessions (see `parent_session_id`).
    pub relation: crate::SessionRelation,
    /// Text of the first user message.
    pub first_user_message: String,
    /// Number of user messages.
    pub user_message_count: usize,
}
196
impl SessionPickerInfo {
    /// Returns the parent session id reported by `relation`, if any.
    pub fn parent_session_id(&self) -> Option<&str> {
        self.relation.parent_session_id()
    }
}
202
/// String-backed reference to a stored blob.
///
/// `#[serde(transparent)]` makes it serialize exactly like the inner `String`.
#[derive(Clone, Debug, Default, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
#[serde(transparent)]
pub struct BlobRef(pub String);
206
207impl BlobRef {
208 pub fn as_str(&self) -> &str {
209 &self.0
210 }
211}
212
213impl std::fmt::Display for BlobRef {
214 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
215 f.write_str(&self.0)
216 }
217}
218
219impl From<String> for BlobRef {
220 fn from(value: String) -> Self {
221 Self(value)
222 }
223}
224
/// Counters describing a garbage-collection pass; all counters default to zero.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct GcReport {
    /// Number of roots (presumably the GC starting points — confirm with the collector).
    pub root_count: usize,
    /// Number of blobs retained.
    pub retained_blob_count: usize,
    /// Number of blobs deleted.
    pub deleted_blob_count: usize,
}
231
/// Counters describing a vacuum pass; all counters default to zero.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct VacuumReport {
    /// Number of nodes removed.
    pub removed_node_count: usize,
    /// Number of pending-turn-input tombstones removed.
    pub removed_pending_turn_input_tombstone_count: usize,
}
242
/// Versioned, serializable checkpoint manifest: the turn state plus optional
/// blob references to larger pieces of state.
///
/// Every optional field defaults to `None` on deserialization and is omitted
/// from the serialized form when `None`.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct SessionCheckpoint {
    /// Schema version of this record; constructors set `SESSION_CHECKPOINT_SCHEMA_VERSION`.
    pub schema_version: u32,
    /// Persisted turn-level state.
    pub turn_state: crate::PersistedTurnState,
    /// Reference to the stored tool state, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub tool_state_ref: Option<BlobRef>,
    /// Reference to the stored plugin snapshot, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub plugin_snapshot_ref: Option<BlobRef>,
    /// Revision of the plugin snapshot, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub plugin_snapshot_revision: Option<u64>,
    /// Reference to the stored execution state, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub execution_state_ref: Option<BlobRef>,
}
256
257impl Default for SessionCheckpoint {
258 fn default() -> Self {
259 Self {
260 schema_version: SESSION_CHECKPOINT_SCHEMA_VERSION,
261 turn_state: crate::PersistedTurnState::default(),
262 tool_state_ref: None,
263 plugin_snapshot_ref: None,
264 plugin_snapshot_revision: None,
265 execution_state_ref: None,
266 }
267 }
268}
269
270impl SessionCheckpoint {
271 pub fn new(
272 turn_state: crate::PersistedTurnState,
273 tool_state_ref: Option<BlobRef>,
274 plugin_snapshot_ref: Option<BlobRef>,
275 plugin_snapshot_revision: Option<u64>,
276 execution_state_ref: Option<BlobRef>,
277 ) -> Self {
278 Self {
279 schema_version: SESSION_CHECKPOINT_SCHEMA_VERSION,
280 turn_state,
281 tool_state_ref,
282 plugin_snapshot_ref,
283 plugin_snapshot_revision,
284 execution_state_ref,
285 }
286 }
287}
288
/// Checkpoint carrying, for each piece of state, both its optional blob
/// reference and its optional in-memory value.
#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
pub struct HydratedSessionCheckpoint {
    /// Persisted turn-level state.
    pub turn_state: crate::PersistedTurnState,
    /// Reference to the stored tool state, if any.
    pub tool_state_ref: Option<BlobRef>,
    /// Tool state value, if present.
    pub tool_state: Option<crate::ToolState>,
    /// Reference to the stored plugin snapshot, if any.
    pub plugin_snapshot_ref: Option<BlobRef>,
    /// Plugin snapshot value, if present.
    pub plugin_snapshot: Option<crate::PluginSessionSnapshot>,
    /// Revision of the plugin snapshot, if any.
    pub plugin_snapshot_revision: Option<u64>,
    /// Reference to the stored execution state, if any.
    pub execution_state_ref: Option<BlobRef>,
    /// Execution state as raw bytes, if present.
    pub execution_state: Option<Vec<u8>>,
}
300
/// Serializable session head: identity, revision, agent frames, the full
/// session graph, configuration, checkpoint reference and token ledger.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct SessionHead {
    /// Session id; falls back to `default_root_session_id()` when absent from the input.
    #[serde(default = "default_root_session_id")]
    pub session_id: String,
    /// Head revision; defaults to 0 when absent.
    #[serde(default)]
    pub head_revision: u64,
    /// Agent frame records; defaults to empty when absent.
    #[serde(default)]
    pub agent_frames: Vec<crate::AgentFrameRecord>,
    /// Id of the current agent frame; omitted from output when empty.
    #[serde(default, skip_serializing_if = "String::is_empty")]
    pub current_agent_frame_id: crate::AgentFrameId,
    /// The session graph (required on deserialization).
    pub graph: crate::SessionGraph,
    /// Persisted provider/model configuration (required on deserialization).
    pub config: crate::PersistedSessionConfig,
    /// Reference to the checkpoint blob, if any; omitted from output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub checkpoint_ref: Option<BlobRef>,
    /// Token ledger entries; omitted from output when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub token_ledger: Vec<crate::TokenLedgerEntry>,
}
318
/// Versioned head record that carries a leaf node id and a node count instead
/// of the full graph held by `SessionHead`.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct SessionHeadMeta {
    /// Schema version of this record (required; `Default` uses
    /// `SESSION_HEAD_META_SCHEMA_VERSION`).
    pub schema_version: u32,
    /// Session id; falls back to `default_root_session_id()` when absent from the input.
    #[serde(default = "default_root_session_id")]
    pub session_id: String,
    /// Head revision; defaults to 0 when absent.
    #[serde(default)]
    pub head_revision: u64,
    /// Persisted provider/model configuration (required on deserialization).
    pub config: crate::PersistedSessionConfig,
    /// Agent frame records; defaults to empty when absent.
    #[serde(default)]
    pub agent_frames: Vec<crate::AgentFrameRecord>,
    /// Id of the current agent frame; omitted from output when empty.
    #[serde(default, skip_serializing_if = "String::is_empty")]
    pub current_agent_frame_id: crate::AgentFrameId,
    /// Reference to the checkpoint blob, if any; omitted from output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub checkpoint_ref: Option<BlobRef>,
    /// Id of the graph's leaf node, if any; omitted from output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub leaf_node_id: Option<String>,
    /// Number of nodes in the graph; defaults to 0 when absent.
    #[serde(default)]
    pub graph_node_count: usize,
    /// Token ledger entries; omitted from output when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub token_ledger: Vec<crate::TokenLedgerEntry>,
}
340
341fn persisted_session_config_from_state(
342 state: &crate::RuntimeSessionState,
343) -> crate::PersistedSessionConfig {
344 crate::PersistedSessionConfig {
345 provider_id: state.policy.recorded_provider_id().to_string(),
346 model: state.policy.model.clone(),
347 }
348}
349
/// Selects how much of the session graph `SessionCommitStore::load_session` should read.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SessionReadScope {
    /// Read the whole graph.
    FullGraph,
    /// Read only the active path; `leaf_node_id` optionally names the leaf.
    /// NOTE(review): the meaning of `None` (presumably "use the stored leaf") is
    /// decided by the store implementation — confirm.
    ActivePath { leaf_node_id: Option<String> },
}
355
/// Result of `SessionCommitStore::load_session`: the head fields plus the
/// optional hydrated checkpoint.
#[derive(Clone, Debug)]
pub struct PersistedSessionRead {
    /// Identifier of the session.
    pub session_id: String,
    /// Head revision that was read.
    pub head_revision: u64,
    /// Persisted provider/model configuration.
    pub config: crate::PersistedSessionConfig,
    /// Agent frame records.
    pub agent_frames: Vec<crate::AgentFrameRecord>,
    /// Id of the current agent frame.
    pub current_agent_frame_id: crate::AgentFrameId,
    /// Session graph (extent presumably depends on the requested read scope).
    pub graph: crate::SessionGraph,
    /// Reference to the checkpoint blob, if any.
    pub checkpoint_ref: Option<BlobRef>,
    /// Hydrated checkpoint, if any.
    pub checkpoint: Option<HydratedSessionCheckpoint>,
    /// Token ledger entries.
    pub token_ledger: Vec<crate::TokenLedgerEntry>,
}
368
/// Describes how a commit changes the persisted session graph.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub enum GraphCommitDelta {
    /// No nodes are added; only the leaf node id is supplied.
    Unchanged {
        leaf_node_id: Option<String>,
    },
    /// The listed nodes are appended and the leaf node id is supplied.
    Append {
        nodes: Vec<crate::SessionNodeRecord>,
        leaf_node_id: Option<String>,
    },
    /// The whole graph is supplied.
    ReplaceFull(crate::SessionGraph),
}
380
381impl GraphCommitDelta {
382 pub fn leaf_node_id(&self) -> Option<&String> {
383 match self {
384 Self::Unchanged { leaf_node_id } | Self::Append { leaf_node_id, .. } => {
385 leaf_node_id.as_ref()
386 }
387 Self::ReplaceFull(graph) => graph.leaf_node_id.as_ref(),
388 }
389 }
390}
391
/// A single commit request passed to `SessionCommitStore::commit_runtime_state`.
///
/// Built with `RuntimeCommit::persisted_state` and refined through the
/// builder-style methods on the type.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct RuntimeCommit {
    /// Identifier of the session being committed.
    pub session_id: String,
    /// Head revision the caller expects; excluded from `turn_commit_hash`.
    pub expected_head_revision: Option<u64>,
    /// Lease fence accompanying the commit, if any; excluded from `turn_commit_hash`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub session_execution_lease: Option<SessionExecutionLeaseFence>,
    /// Lease completion accompanying the commit, if any; excluded from `turn_commit_hash`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub release_session_execution_lease: Option<SessionExecutionLeaseCompletion>,
    /// Persisted provider/model configuration.
    pub config: crate::PersistedSessionConfig,
    /// Agent frame records.
    pub agent_frames: Vec<crate::AgentFrameRecord>,
    /// Id of the current agent frame.
    pub current_agent_frame_id: crate::AgentFrameId,
    /// Graph change carried by this commit.
    pub graph: GraphCommitDelta,
    /// Checkpoint carried by this commit.
    pub checkpoint: HydratedSessionCheckpoint,
    /// Token ledger entries contributed by this commit.
    pub usage_deltas: Vec<crate::TokenLedgerEntry>,
    /// Turn commit stamp, if any; excluded from `turn_commit_hash`.
    pub turn_commit: Option<RuntimeTurnCommitStamp>,
    /// Queued work completions carried by this commit.
    pub completed_queue_claims: Vec<crate::QueuedWorkCompletion>,
    /// Turn input completions carried by this commit.
    pub completed_turn_input_claims: Vec<crate::TurnInputCompletion>,
    /// Turn id set by `deferring_interrupted_turn_inputs`, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub interrupted_turn_input_turn_id: Option<String>,
    /// Attachment ids set by `with_committed_attachments`.
    pub committed_attachment_ids: Vec<crate::AttachmentId>,
}
420
/// Value returned by `SessionCommitStore::commit_runtime_state` on success.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct RuntimeCommitResult {
    /// Head revision reported by the store for this commit.
    pub head_revision: u64,
    /// Reference to the checkpoint blob.
    pub checkpoint_ref: BlobRef,
    /// Checkpoint manifest.
    pub manifest: SessionCheckpoint,
}
427
/// Identity of a lease owner: an owner id, an incarnation id, and a liveness
/// descriptor.
#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct LeaseOwnerIdentity {
    /// Identifier of the owner.
    pub owner_id: String,
    /// Identifier of the owner's incarnation; compared together with
    /// `owner_id` by `same_incarnation`.
    pub incarnation_id: String,
    /// Liveness descriptor; deserializes to `LeaseOwnerLiveness::Opaque` when absent.
    #[serde(default)]
    pub liveness: LeaseOwnerLiveness,
}
435
436impl LeaseOwnerIdentity {
437 pub fn opaque(
438 owner_id: impl Into<String>,
439 incarnation_id: impl Into<String>,
440 ) -> LeaseOwnerIdentity {
441 LeaseOwnerIdentity {
442 owner_id: owner_id.into(),
443 incarnation_id: incarnation_id.into(),
444 liveness: LeaseOwnerLiveness::Opaque,
445 }
446 }
447
448 pub fn local_process(
449 owner_id: impl Into<String>,
450 incarnation_id: impl Into<String>,
451 host_id: impl Into<String>,
452 ) -> LeaseOwnerIdentity {
453 let liveness = LeaseOwnerLiveness::current_local_process(host_id.into())
454 .unwrap_or(LeaseOwnerLiveness::Opaque);
455 LeaseOwnerIdentity {
456 owner_id: owner_id.into(),
457 incarnation_id: incarnation_id.into(),
458 liveness,
459 }
460 }
461
462 pub fn same_incarnation(&self, other: &LeaseOwnerIdentity) -> bool {
463 self.owner_id == other.owner_id && self.incarnation_id == other.incarnation_id
464 }
465
466 pub fn is_definitely_dead_for_claimant(&self, claimant: &LeaseOwnerIdentity) -> bool {
467 self.liveness
468 .is_definitely_dead_for_claimant(&claimant.liveness)
469 }
470}
471
/// Information a claimant can use to decide whether a lease owner is dead.
///
/// Serialized with an internal `kind` tag whose values are the snake_case
/// variant names.
#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize, Default)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum LeaseOwnerLiveness {
    /// The owner is a process on a specific host and boot.
    LocalProcess {
        /// Caller-supplied host identifier.
        host_id: String,
        /// Boot id (read from `PROC_BOOT_ID_PATH` by `current_local_process`).
        boot_id: String,
        /// Process id of the owner.
        pid: u32,
        /// Process start field taken from `/proc/<pid>/stat`, kept as text.
        process_start: String,
    },
    /// No liveness information; such an owner is never reported as definitely dead.
    #[default]
    Opaque,
}
484
485impl LeaseOwnerLiveness {
486 pub fn current_local_process(host_id: impl Into<String>) -> Option<LeaseOwnerLiveness> {
487 let boot_id = std::fs::read_to_string(PROC_BOOT_ID_PATH)
488 .ok()
489 .map(|value| value.trim().to_string())
490 .filter(|value| !value.is_empty())?;
491 let pid = std::process::id();
492 let process_start = read_linux_process_start(pid)?;
493 Some(LeaseOwnerLiveness::LocalProcess {
494 host_id: host_id.into(),
495 boot_id,
496 pid,
497 process_start,
498 })
499 }
500
501 pub fn local_process_for_test(
502 host_id: impl Into<String>,
503 boot_id: impl Into<String>,
504 pid: u32,
505 process_start: impl Into<String>,
506 ) -> LeaseOwnerLiveness {
507 LeaseOwnerLiveness::LocalProcess {
508 host_id: host_id.into(),
509 boot_id: boot_id.into(),
510 pid,
511 process_start: process_start.into(),
512 }
513 }
514
515 pub fn is_definitely_dead_for_claimant(&self, claimant: &LeaseOwnerLiveness) -> bool {
516 let (
517 LeaseOwnerLiveness::LocalProcess {
518 host_id,
519 boot_id,
520 pid,
521 process_start,
522 },
523 LeaseOwnerLiveness::LocalProcess {
524 host_id: claimant_host_id,
525 boot_id: claimant_boot_id,
526 ..
527 },
528 ) = (self, claimant)
529 else {
530 return false;
531 };
532 if host_id != claimant_host_id || boot_id != claimant_boot_id {
533 return false;
534 }
535 matches!(linux_process_is_live(*pid, process_start), Some(false))
536 }
537}
538
539fn read_linux_process_start(pid: u32) -> Option<String> {
540 let stat = std::fs::read_to_string(format!("/proc/{pid}/stat")).ok()?;
541 parse_linux_process_start(&stat)
542}
543
544fn linux_process_is_live(pid: u32, expected_process_start: &str) -> Option<bool> {
545 match std::fs::read_to_string(format!("/proc/{pid}/stat")) {
546 Ok(stat) => parse_linux_process_start(&stat).map(|start| start == expected_process_start),
547 Err(err) if err.kind() == std::io::ErrorKind::NotFound => Some(false),
548 Err(_) => None,
549 }
550}
551
/// Extracts the process start field from the text of a `/proc/<pid>/stat` file.
///
/// The text is split at the last `") "` so that a command name containing
/// spaces or parentheses cannot shift the fields; the result is the 20th
/// whitespace-separated field after that point (index 19). Returns `None`
/// when `") "` is absent or there are fewer than 20 fields after it.
fn parse_linux_process_start(stat: &str) -> Option<String> {
    let (_, after_comm) = stat.rsplit_once(") ")?;
    let mut fields = after_comm.split_whitespace();
    fields.nth(19).map(str::to_owned)
}
556
/// A session execution lease: who holds it, its tokens, and its time window.
#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct SessionExecutionLease {
    /// Session the lease applies to.
    pub session_id: String,
    /// Identity of the lease holder.
    pub owner: LeaseOwnerIdentity,
    /// Token identifying this lease.
    pub lease_token: String,
    /// Numeric fencing token associated with this lease.
    pub fencing_token: u64,
    /// Claim time in milliseconds since the epoch.
    pub claimed_at_epoch_ms: u64,
    /// Expiry time in milliseconds since the epoch.
    pub expires_at_epoch_ms: u64,
}
566
/// The identifying subset of a `SessionExecutionLease` (no timestamps),
/// produced by `SessionExecutionLease::fence` and attached to commits via
/// `RuntimeCommit::with_session_execution_lease`.
#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct SessionExecutionLeaseFence {
    /// Session the lease applies to.
    pub session_id: String,
    /// Identity of the lease holder.
    pub owner: LeaseOwnerIdentity,
    /// Token identifying the lease.
    pub lease_token: String,
    /// Numeric fencing token associated with the lease.
    pub fencing_token: u64,
}
574
/// The identifying subset of a `SessionExecutionLease` (no timestamps),
/// produced by `SessionExecutionLease::completion` and attached to commits via
/// `RuntimeCommit::releasing_session_execution_lease`.
#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct SessionExecutionLeaseCompletion {
    /// Session the lease applies to.
    pub session_id: String,
    /// Identity of the lease holder.
    pub owner: LeaseOwnerIdentity,
    /// Token identifying the lease.
    pub lease_token: String,
    /// Numeric fencing token associated with the lease.
    pub fencing_token: u64,
}
582
583impl SessionExecutionLease {
584 pub fn fence(&self) -> SessionExecutionLeaseFence {
585 SessionExecutionLeaseFence {
586 session_id: self.session_id.clone(),
587 owner: self.owner.clone(),
588 lease_token: self.lease_token.clone(),
589 fencing_token: self.fencing_token,
590 }
591 }
592
593 pub fn completion(&self) -> SessionExecutionLeaseCompletion {
594 SessionExecutionLeaseCompletion {
595 session_id: self.session_id.clone(),
596 owner: self.owner.clone(),
597 lease_token: self.lease_token.clone(),
598 fencing_token: self.fencing_token,
599 }
600 }
601}
602
603impl SessionExecutionLeaseCompletion {
604 pub fn from_lease(lease: &SessionExecutionLease) -> Self {
605 lease.completion()
606 }
607}
608
/// Outcome of an attempt to claim a session execution lease.
#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum SessionExecutionLeaseClaimOutcome {
    /// The lease was acquired; carries the acquired lease.
    Acquired(SessionExecutionLease),
    /// The lease was not acquired; `holder` is the lease currently held.
    Busy { holder: SessionExecutionLease },
}
614
615impl SessionExecutionLeaseClaimOutcome {
616 pub fn acquired(self) -> Option<SessionExecutionLease> {
617 match self {
618 Self::Acquired(lease) => Some(lease),
619 Self::Busy { .. } => None,
620 }
621 }
622}
623
/// Argument to `AttachmentManifest::record_intent`.
#[derive(Clone, Debug)]
pub struct AttachmentIntent {
    /// Identifier of the attachment.
    pub attachment_id: crate::AttachmentId,
    /// Session the attachment belongs to.
    pub session_id: String,
    /// Canonical URI of the attachment.
    pub canonical_uri: String,
    /// Time of the intent in milliseconds since the epoch.
    pub intent_at_epoch_ms: u64,
}
653
/// Entry returned by `AttachmentManifest::list_uncommitted`.
#[derive(Clone, Debug)]
pub struct AttachmentManifestEntry {
    /// Identifier of the attachment.
    pub attachment_id: crate::AttachmentId,
    /// Session the attachment belongs to.
    pub session_id: String,
    /// Canonical URI of the attachment.
    pub canonical_uri: String,
    /// Time of the intent in milliseconds since the epoch.
    pub intent_at_epoch_ms: u64,
    /// Commit time in milliseconds since the epoch, when one is recorded.
    pub committed_at_epoch_ms: Option<u64>,
}
662
/// Bookkeeping interface for attachments; a supertrait of `SessionCommitStore`.
///
/// NOTE(review): the semantics below are inferred from the signatures only —
/// confirm against an implementation before relying on them.
pub trait AttachmentManifest: Send + Sync {
    /// Records an attachment intent.
    fn record_intent(&self, intent: AttachmentIntent) -> Result<(), StoreError>;

    /// Marks the given attachment ids as committed for `session_id`
    /// (presumably the ids a commit references — confirm).
    fn commit_refs(
        &self,
        session_id: &str,
        attachment_ids: &[crate::AttachmentId],
    ) -> Result<(), StoreError>;

    /// Lists uncommitted entries relative to the `older_than_epoch_ms` cutoff
    /// (presumably compared with `intent_at_epoch_ms` — confirm).
    fn list_uncommitted(
        &self,
        older_than_epoch_ms: u64,
    ) -> Result<Vec<AttachmentManifestEntry>, StoreError>;

    /// Removes the manifest entry for `attachment_id`.
    fn forget(&self, attachment_id: &crate::AttachmentId) -> Result<(), StoreError>;
}
703
/// Implements `AttachmentManifest` for `$ty` with do-nothing methods.
///
/// `record_intent`, `commit_refs` and `forget` return `Ok(())` and ignore
/// their arguments; `list_uncommitted` always returns an empty list. All paths
/// go through `$crate` so the macro can be invoked from other crates.
#[macro_export]
macro_rules! impl_noop_attachment_manifest {
    ($ty:ty) => {
        impl $crate::AttachmentManifest for $ty {
            fn record_intent(
                &self,
                _intent: $crate::AttachmentIntent,
            ) -> ::std::result::Result<(), $crate::StoreError> {
                Ok(())
            }

            fn commit_refs(
                &self,
                _session_id: &str,
                _attachment_ids: &[$crate::AttachmentId],
            ) -> ::std::result::Result<(), $crate::StoreError> {
                Ok(())
            }

            fn list_uncommitted(
                &self,
                _older_than_epoch_ms: u64,
            ) -> ::std::result::Result<Vec<$crate::AttachmentManifestEntry>, $crate::StoreError>
            {
                Ok(Vec::new())
            }

            fn forget(
                &self,
                _attachment_id: &$crate::AttachmentId,
            ) -> ::std::result::Result<(), $crate::StoreError> {
                Ok(())
            }
        }
    };
}
744
745pub fn ensure_supported_schema_version(
749 record_kind: &'static str,
750 actual: u32,
751 expected: u32,
752) -> Result<(), StoreError> {
753 if actual == expected {
754 Ok(())
755 } else {
756 Err(StoreError::UnsupportedRecordSchemaVersion {
757 record_kind,
758 actual,
759 expected,
760 })
761 }
762}
763
764pub fn ensure_supported_record_schema_version(
765 record_kind: &'static str,
766 value: &serde_json::Value,
767 expected: u32,
768) -> Result<(), StoreError> {
769 let Some(schema_version) = value.get("schema_version") else {
770 return Err(StoreError::MissingRecordSchemaVersion {
771 record_kind,
772 expected,
773 });
774 };
775 let Some(actual) = schema_version
776 .as_u64()
777 .and_then(|version| u32::try_from(version).ok())
778 else {
779 return Err(StoreError::InvalidRecordSchemaVersion {
780 record_kind,
781 actual: schema_version.to_string(),
782 expected,
783 });
784 };
785 ensure_supported_schema_version(record_kind, actual, expected)
786}
787
788pub fn decode_versioned_json_record<T>(
789 json: &str,
790 record_kind: &'static str,
791 expected: u32,
792) -> Result<T, StoreError>
793where
794 T: serde::de::DeserializeOwned,
795{
796 let value: serde_json::Value = serde_json::from_str(json)
797 .map_err(|err| StoreError::Backend(format!("failed to decode {record_kind}: {err}")))?;
798 ensure_supported_record_schema_version(record_kind, &value, expected)?;
799 serde_json::from_value(value)
800 .map_err(|err| StoreError::Backend(format!("failed to decode {record_kind}: {err}")))
801}
802
/// Identifies a committed runtime turn by session id, turn id, and commit hash.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct RuntimeTurnCommitStamp {
    /// Session the turn belongs to.
    pub session_id: String,
    /// Identifier of the turn.
    pub turn_id: String,
    /// Commit hash of the turn (presumably from `RuntimeCommit::turn_commit_hash` — confirm).
    pub turn_commit_hash: String,
}
809
810impl RuntimeTurnCommitStamp {
811 pub fn new(
812 session_id: impl Into<String>,
813 turn_id: impl Into<String>,
814 turn_commit_hash: impl Into<String>,
815 ) -> Self {
816 Self {
817 session_id: session_id.into(),
818 turn_id: turn_id.into(),
819 turn_commit_hash: turn_commit_hash.into(),
820 }
821 }
822}
823
824fn build_persisted_turn_state(state: &crate::RuntimeSessionState) -> crate::PersistedTurnState {
825 crate::PersistedTurnState {
826 turn_index: state.turn_index,
827 token_usage: state.token_usage.clone(),
828 last_prompt_usage: state.last_prompt_usage.clone(),
829 protocol_turn_options: state.protocol_turn_options.clone(),
830 }
831}
832
833fn build_checkpoint_from_persisted_state(
834 state: &crate::RuntimeSessionState,
835) -> HydratedSessionCheckpoint {
836 HydratedSessionCheckpoint {
837 turn_state: build_persisted_turn_state(state),
838 tool_state_ref: state.tool_state_ref.clone(),
839 tool_state: state.tool_state_snapshot.clone(),
840 plugin_snapshot_ref: state.plugin_snapshot_ref.clone(),
841 plugin_snapshot_revision: state.plugin_snapshot_revision,
842 plugin_snapshot: state.plugin_snapshot.clone(),
843 execution_state_ref: state.execution_state_ref.clone(),
844 execution_state: state.execution_state_snapshot.clone(),
845 }
846}
847
848impl RuntimeCommit {
849 pub fn turn_commit_hash(&self) -> Result<String, StoreError> {
850 let mut semantic_commit = self.clone();
851 semantic_commit.expected_head_revision = None;
852 semantic_commit.session_execution_lease = None;
853 semantic_commit.release_session_execution_lease = None;
854 semantic_commit.turn_commit = None;
855 let mut semantic_commit = serde_json::to_value(&semantic_commit).map_err(|err| {
856 StoreError::Backend(format!("failed to serialize runtime turn commit: {err}"))
857 })?;
858 scrub_turn_commit_hash_value(&mut semantic_commit);
859 crate::stable_hash::stable_json_sha256_hex(&semantic_commit).map_err(|err| {
860 StoreError::Backend(format!(
861 "failed to serialize runtime turn commit hash: {err}"
862 ))
863 })
864 }
865
866 pub fn persisted_state(
867 state: &crate::RuntimeSessionState,
868 usage_deltas: &[crate::TokenLedgerEntry],
869 ) -> Self {
870 Self {
871 session_id: state.session_id.clone(),
872 expected_head_revision: state.head_revision,
873 session_execution_lease: None,
874 release_session_execution_lease: None,
875 config: persisted_session_config_from_state(state),
876 agent_frames: state.agent_frames.clone(),
877 current_agent_frame_id: state.current_agent_frame_id.clone(),
878 graph: if state.graph_replace_required || state.head_revision.is_none() {
879 GraphCommitDelta::ReplaceFull(state.session_graph.clone())
880 } else {
881 GraphCommitDelta::Unchanged {
882 leaf_node_id: state.session_graph.leaf_node_id.clone(),
883 }
884 },
885 checkpoint: build_checkpoint_from_persisted_state(state),
886 usage_deltas: usage_deltas.to_vec(),
887 turn_commit: None,
888 completed_queue_claims: Vec::new(),
889 completed_turn_input_claims: Vec::new(),
890 interrupted_turn_input_turn_id: None,
891 committed_attachment_ids: Vec::new(),
892 }
893 }
894
895 pub(crate) fn persisted_state_with_graph_commit(
896 state: &crate::RuntimeSessionState,
897 graph: GraphCommitDelta,
898 usage_deltas: &[crate::TokenLedgerEntry],
899 ) -> Self {
900 Self {
901 session_id: state.session_id.clone(),
902 expected_head_revision: state.head_revision,
903 session_execution_lease: None,
904 release_session_execution_lease: None,
905 config: persisted_session_config_from_state(state),
906 agent_frames: state.agent_frames.clone(),
907 current_agent_frame_id: state.current_agent_frame_id.clone(),
908 graph,
909 checkpoint: build_checkpoint_from_persisted_state(state),
910 usage_deltas: usage_deltas.to_vec(),
911 turn_commit: None,
912 completed_queue_claims: Vec::new(),
913 completed_turn_input_claims: Vec::new(),
914 interrupted_turn_input_turn_id: None,
915 committed_attachment_ids: Vec::new(),
916 }
917 }
918
919 pub fn with_turn_commit(mut self, turn_commit: RuntimeTurnCommitStamp) -> Self {
920 self.turn_commit = Some(turn_commit);
921 self
922 }
923
924 pub fn with_session_execution_lease(mut self, lease: SessionExecutionLeaseFence) -> Self {
925 self.session_execution_lease = Some(lease);
926 self
927 }
928
929 pub fn releasing_session_execution_lease(
930 mut self,
931 completion: SessionExecutionLeaseCompletion,
932 ) -> Self {
933 self.release_session_execution_lease = Some(completion);
934 self
935 }
936
937 pub fn completing_queue_claim(
938 mut self,
939 completed_queue_claim: crate::QueuedWorkCompletion,
940 ) -> Self {
941 self.completed_queue_claims.push(completed_queue_claim);
942 self
943 }
944
945 pub fn completing_queue_claims(
946 mut self,
947 completed_queue_claims: impl IntoIterator<Item = crate::QueuedWorkCompletion>,
948 ) -> Self {
949 self.completed_queue_claims.extend(completed_queue_claims);
950 self
951 }
952
953 pub fn completing_turn_input_claim(
954 mut self,
955 completed_turn_input_claim: crate::TurnInputCompletion,
956 ) -> Self {
957 self.completed_turn_input_claims
958 .push(completed_turn_input_claim);
959 self
960 }
961
962 pub fn completing_turn_input_claims(
963 mut self,
964 completed_turn_input_claims: impl IntoIterator<Item = crate::TurnInputCompletion>,
965 ) -> Self {
966 self.completed_turn_input_claims
967 .extend(completed_turn_input_claims);
968 self
969 }
970
971 pub fn deferring_interrupted_turn_inputs(mut self, turn_id: impl Into<String>) -> Self {
972 self.interrupted_turn_input_turn_id = Some(turn_id.into());
973 self
974 }
975
976 pub fn with_committed_attachments(
977 mut self,
978 attachment_ids: impl IntoIterator<Item = crate::AttachmentId>,
979 ) -> Self {
980 self.committed_attachment_ids = attachment_ids.into_iter().collect();
981 self
982 }
983}
984
985fn scrub_turn_commit_hash_value(value: &mut serde_json::Value) {
986 match value {
987 serde_json::Value::Object(map) => {
988 let is_message = map.contains_key("role") && map.contains_key("parts");
989 let is_message_part = map.contains_key("kind")
990 && map.contains_key("content")
991 && map.contains_key("prune_state");
992 if is_message || is_message_part {
993 map.remove("id");
994 }
995 for volatile_key in ["node_id", "parent_node_id", "leaf_node_id", "timestamp"] {
996 map.remove(volatile_key);
997 }
998 for child in map.values_mut() {
999 scrub_turn_commit_hash_value(child);
1000 }
1001 }
1002 serde_json::Value::Array(items) => {
1003 for item in items {
1004 scrub_turn_commit_hash_value(item);
1005 }
1006 }
1007 _ => {}
1008 }
1009}
1010
1011fn persisted_session_state_from_head(
1012 head: SessionHead,
1013 checkpoint: Option<HydratedSessionCheckpoint>,
1014) -> crate::RuntimeSessionState {
1015 let mut state = crate::RuntimeSessionState {
1016 session_id: head.session_id,
1017 policy: crate::SessionPolicy::default(),
1018 agent_frames: head.agent_frames,
1019 current_agent_frame_id: head.current_agent_frame_id,
1020 session_graph: head.graph,
1021 turn_index: 0,
1022 token_usage: crate::TokenUsage::default(),
1023 last_prompt_usage: None,
1024 protocol_turn_options: crate::ProtocolTurnOptions::default(),
1025 tool_state_ref: None,
1026 tool_state_generation: None,
1027 tool_state_snapshot: None,
1028 plugin_snapshot_ref: None,
1029 plugin_snapshot_revision: None,
1030 plugin_snapshot: None,
1031 execution_state_ref: None,
1032 execution_state_snapshot: None,
1033 token_ledger: head.token_ledger,
1034 checkpoint_ref: head.checkpoint_ref.clone(),
1035 head_revision: Some(head.head_revision),
1036 graph_replace_required: false,
1037 };
1038 state.policy.model = head.config.model.clone();
1039 state.policy.provider_id = head.config.provider_id.clone();
1040 if let Some(checkpoint) = checkpoint {
1041 state.turn_index = checkpoint.turn_state.turn_index;
1042 state.token_usage = checkpoint.turn_state.token_usage;
1043 state.last_prompt_usage = checkpoint.turn_state.last_prompt_usage;
1044 state.protocol_turn_options = checkpoint.turn_state.protocol_turn_options;
1045 state.tool_state_ref = checkpoint.tool_state_ref.clone();
1046 state.tool_state_generation = checkpoint
1047 .tool_state
1048 .as_ref()
1049 .map(|snapshot| snapshot.generation());
1050 state.tool_state_snapshot = checkpoint.tool_state;
1051 state.plugin_snapshot_ref = checkpoint.plugin_snapshot_ref.clone();
1052 state.plugin_snapshot_revision = checkpoint.plugin_snapshot_revision;
1053 state.plugin_snapshot = checkpoint.plugin_snapshot;
1054 state.execution_state_ref = checkpoint.execution_state_ref.clone();
1055 state.execution_state_snapshot = checkpoint.execution_state;
1056 }
1057 state.ensure_agent_frame_initialized();
1058 state
1059}
1060
1061impl Default for SessionHead {
1062 fn default() -> Self {
1063 Self {
1064 session_id: default_root_session_id(),
1065 head_revision: 0,
1066 agent_frames: Vec::new(),
1067 current_agent_frame_id: String::new(),
1068 graph: crate::SessionGraph::default(),
1069 config: crate::PersistedSessionConfig::default(),
1070 checkpoint_ref: None,
1071 token_ledger: Vec::new(),
1072 }
1073 }
1074}
1075
1076impl Default for SessionHeadMeta {
1077 fn default() -> Self {
1078 Self {
1079 schema_version: SESSION_HEAD_META_SCHEMA_VERSION,
1080 session_id: default_root_session_id(),
1081 head_revision: 0,
1082 config: crate::PersistedSessionConfig::default(),
1083 agent_frames: Vec::new(),
1084 current_agent_frame_id: String::new(),
1085 checkpoint_ref: None,
1086 leaf_node_id: None,
1087 graph_node_count: 0,
1088 token_ledger: Vec::new(),
1089 }
1090 }
1091}
1092
/// Asynchronous store interface for loading and committing session state.
///
/// Implementors must also implement `AttachmentManifest` and be `Send + Sync`.
#[async_trait::async_trait]
pub trait SessionCommitStore: AttachmentManifest + Send + Sync {
    /// Durability tier of this store; the provided implementation returns
    /// `DurabilityTier::Inline`.
    fn durability_tier(&self) -> crate::DurabilityTier {
        crate::DurabilityTier::Inline
    }

    /// Loads the persisted session for the requested read scope.
    ///
    /// Returns `Ok(None)` when there is nothing to return (presumably no
    /// stored session — confirm against implementations).
    async fn load_session(
        &self,
        scope: SessionReadScope,
    ) -> Result<Option<PersistedSessionRead>, StoreError>;

    /// Loads a single node record by id, or `Ok(None)` when none is returned.
    async fn load_node(
        &self,
        node_id: &str,
    ) -> Result<Option<crate::SessionNodeRecord>, StoreError>;

    /// Applies `commit` and returns the resulting head revision, checkpoint
    /// reference and manifest.
    async fn commit_runtime_state(
        &self,
        commit: RuntimeCommit,
    ) -> Result<RuntimeCommitResult, StoreError>;

    /// Saves the session metadata.
    async fn save_session_meta(&self, meta: SessionMeta) -> Result<(), StoreError>;
    /// Loads the session metadata, or `Ok(None)` when none is returned.
    async fn load_session_meta(&self) -> Result<Option<SessionMeta>, StoreError>;
}
1135
1136#[async_trait::async_trait]
1144pub trait TurnInputStore: Send + Sync {
1145 async fn enqueue_pending_turn_input(
1147 &self,
1148 input: crate::PendingTurnInputDraft,
1149 ) -> Result<crate::PendingTurnInput, StoreError>;
1150
1151 async fn list_pending_turn_inputs(
1156 &self,
1157 session_id: &str,
1158 ) -> Result<Vec<crate::PendingTurnInput>, StoreError>;
1159
1160 async fn cancel_pending_turn_input(
1167 &self,
1168 session_id: &str,
1169 input_id: &str,
1170 ) -> Result<crate::PendingTurnInputCancelOutcome, StoreError> {
1171 let target = crate::PendingTurnInputCancelTarget::input_id(input_id);
1172 let targets = vec![target];
1173 let mut outcomes = self
1174 .cancel_pending_turn_inputs(session_id, &targets)
1175 .await?;
1176 Ok(outcomes
1177 .pop()
1178 .map(|result| result.outcome)
1179 .unwrap_or(crate::PendingTurnInputCancelOutcome::NotFound))
1180 }
1181
1182 async fn cancel_pending_turn_inputs(
1184 &self,
1185 session_id: &str,
1186 targets: &[crate::PendingTurnInputCancelTarget],
1187 ) -> Result<Vec<crate::PendingTurnInputCancelResult>, StoreError>;
1188
1189 async fn cancel_pending_turn_input_suffix(
1191 &self,
1192 session_id: &str,
1193 anchor: &crate::PendingTurnInputCancelTarget,
1194 ) -> Result<crate::PendingTurnInputSuffixCancelOutcome, StoreError>;
1195
1196 #[allow(clippy::too_many_arguments)]
1203 async fn claim_active_turn_inputs(
1204 &self,
1205 session_id: &str,
1206 session_execution_lease: &SessionExecutionLeaseFence,
1207 owner: &LeaseOwnerIdentity,
1208 turn_id: &str,
1209 checkpoint: crate::CheckpointKind,
1210 lease_ttl_ms: u64,
1211 max_inputs: usize,
1212 ) -> Result<Option<crate::TurnInputClaim>, StoreError>;
1213
1214 async fn claim_next_turn_inputs(
1216 &self,
1217 session_id: &str,
1218 session_execution_lease: &SessionExecutionLeaseFence,
1219 owner: &LeaseOwnerIdentity,
1220 lease_ttl_ms: u64,
1221 max_inputs: usize,
1222 ) -> Result<Option<crate::TurnInputClaim>, StoreError>;
1223
1224 async fn abandon_turn_input_claim(
1226 &self,
1227 claim: &crate::TurnInputClaim,
1228 ) -> Result<(), StoreError>;
1229}
1230
1231#[async_trait::async_trait]
1234pub trait SessionExecutionLeaseStore: Send + Sync {
1235 async fn try_claim_session_execution_lease(
1242 &self,
1243 session_id: &str,
1244 owner: &LeaseOwnerIdentity,
1245 lease_ttl_ms: u64,
1246 ) -> Result<SessionExecutionLeaseClaimOutcome, StoreError>;
1247
1248 async fn reclaim_session_execution_lease(
1254 &self,
1255 session_id: &str,
1256 owner: &LeaseOwnerIdentity,
1257 observed_holder: &SessionExecutionLeaseFence,
1258 lease_ttl_ms: u64,
1259 ) -> Result<SessionExecutionLeaseClaimOutcome, StoreError>;
1260
1261 async fn renew_session_execution_lease(
1266 &self,
1267 fence: &SessionExecutionLeaseFence,
1268 lease_ttl_ms: u64,
1269 ) -> Result<SessionExecutionLease, StoreError>;
1270
1271 async fn release_session_execution_lease(
1275 &self,
1276 completion: &SessionExecutionLeaseCompletion,
1277 ) -> Result<(), StoreError>;
1278}
1279
1280#[async_trait::async_trait]
1286pub trait QueuedWorkStore: Send + Sync {
1287 async fn enqueue_queued_work(
1289 &self,
1290 batch: crate::QueuedWorkBatchDraft,
1291 ) -> Result<crate::QueuedWorkBatch, StoreError>;
1292
1293 async fn claim_leading_ready_session_command(
1300 &self,
1301 session_id: &str,
1302 session_execution_lease: &SessionExecutionLeaseFence,
1303 owner: &LeaseOwnerIdentity,
1304 lease_ttl_ms: u64,
1305 ) -> Result<Option<crate::QueuedWorkClaim>, StoreError>;
1306
1307 async fn claim_ready_queued_work(
1314 &self,
1315 session_id: &str,
1316 session_execution_lease: &SessionExecutionLeaseFence,
1317 owner: &LeaseOwnerIdentity,
1318 boundary: crate::QueuedWorkClaimBoundary,
1319 lease_ttl_ms: u64,
1320 max_batches: usize,
1321 ) -> Result<Option<crate::QueuedWorkClaim>, StoreError>;
1322
1323 async fn claim_ready_queued_work_by_batch_ids(
1335 &self,
1336 session_id: &str,
1337 session_execution_lease: &SessionExecutionLeaseFence,
1338 owner: &LeaseOwnerIdentity,
1339 boundary: crate::QueuedWorkClaimBoundary,
1340 lease_ttl_ms: u64,
1341 batch_ids: &[String],
1342 ) -> Result<Option<crate::QueuedWorkClaim>, StoreError> {
1343 if batch_ids.is_empty() {
1344 return Ok(None);
1345 }
1346 let Some(claim) = self
1347 .claim_ready_queued_work(
1348 session_id,
1349 session_execution_lease,
1350 owner,
1351 boundary,
1352 lease_ttl_ms,
1353 batch_ids.len(),
1354 )
1355 .await?
1356 else {
1357 return Ok(None);
1358 };
1359 let claimed_ids = claim
1360 .batches
1361 .iter()
1362 .map(|batch| batch.batch_id.as_str())
1363 .collect::<Vec<_>>();
1364 if claimed_ids == batch_ids.iter().map(String::as_str).collect::<Vec<_>>() {
1365 return Ok(Some(claim));
1366 }
1367 self.abandon_queued_work_claim(&claim).await?;
1368 Ok(None)
1369 }
1370
1371 async fn renew_queued_work_claim(
1373 &self,
1374 claim: &crate::QueuedWorkClaim,
1375 lease_ttl_ms: u64,
1376 ) -> Result<crate::QueuedWorkClaim, StoreError>;
1377
1378 async fn abandon_queued_work_claim(
1380 &self,
1381 claim: &crate::QueuedWorkClaim,
1382 ) -> Result<(), StoreError>;
1383
1384 async fn cancel_queued_work_batch(
1391 &self,
1392 session_id: &str,
1393 batch_id: &str,
1394 ) -> Result<Option<crate::QueuedWorkBatch>, StoreError>;
1395
1396 async fn list_queued_work(
1399 &self,
1400 session_id: &str,
1401 ) -> Result<Vec<crate::QueuedWorkBatch>, StoreError>;
1402
1403 async fn list_pending_queued_work(
1414 &self,
1415 session_id: &str,
1416 ) -> Result<Vec<crate::QueuedWorkBatch>, StoreError>;
1417}
1418
1419#[async_trait::async_trait]
1422pub trait StoreMaintenance: Send + Sync {
1423 async fn tombstone_nodes(&self, ids: &[String]) -> Result<(), StoreError>;
1426
1427 async fn vacuum(&self) -> Result<VacuumReport, StoreError>;
1430
1431 async fn gc_unreachable(&self) -> Result<GcReport, StoreError>;
1433}
1434
1435pub trait RuntimePersistence:
1452 SessionCommitStore
1453 + TurnInputStore
1454 + SessionExecutionLeaseStore
1455 + QueuedWorkStore
1456 + StoreMaintenance
1457{
1458}
1459
1460impl<T> RuntimePersistence for T where
1461 T: SessionCommitStore
1462 + TurnInputStore
1463 + SessionExecutionLeaseStore
1464 + QueuedWorkStore
1465 + StoreMaintenance
1466 + ?Sized
1467{
1468}
1469
1470fn persisted_session_state_from_read(read: PersistedSessionRead) -> crate::RuntimeSessionState {
1471 persisted_session_state_from_head(
1472 SessionHead {
1473 session_id: read.session_id,
1474 head_revision: read.head_revision,
1475 agent_frames: read.agent_frames,
1476 current_agent_frame_id: read.current_agent_frame_id,
1477 graph: read.graph,
1478 config: read.config,
1479 checkpoint_ref: read.checkpoint_ref,
1480 token_ledger: read.token_ledger,
1481 },
1482 read.checkpoint,
1483 )
1484}
1485
1486pub async fn load_persisted_session_state(
1487 store: &(dyn RuntimePersistence + '_),
1488) -> Result<Option<crate::RuntimeSessionState>, StoreError> {
1489 Ok(store
1490 .load_session(SessionReadScope::FullGraph)
1491 .await?
1492 .map(persisted_session_state_from_read))
1493}
1494
1495pub async fn load_persisted_session_state_active_path(
1496 store: &(dyn RuntimePersistence + '_),
1497 leaf_node_id: Option<String>,
1498) -> Result<Option<crate::RuntimeSessionState>, StoreError> {
1499 Ok(store
1500 .load_session(SessionReadScope::ActivePath { leaf_node_id })
1501 .await?
1502 .map(persisted_session_state_from_read))
1503}
1504
1505pub async fn refresh_persisted_session_state(
1506 store: &(dyn RuntimePersistence + '_),
1507 state: &mut crate::RuntimeSessionState,
1508) -> Result<(), StoreError> {
1509 if let Some(mut fresh) = load_persisted_session_state(store).await? {
1510 fresh.policy.session_id = state.policy.session_id.clone();
1511 fresh.policy.max_turns = state.policy.max_turns;
1512 *state = fresh;
1513 }
1514 Ok(())
1515}
1516
#[cfg(test)]
mod tests {
    use super::{LeaseOwnerIdentity, LeaseOwnerLiveness};

    /// Builds a local-process liveness record through the test-only constructor.
    fn local_liveness(
        host_id: &str,
        boot_id: &str,
        pid: u32,
        process_start: &str,
    ) -> LeaseOwnerLiveness {
        LeaseOwnerLiveness::local_process_for_test(host_id, boot_id, pid, process_start)
    }

    /// Two identities share an incarnation only when the incarnation value matches.
    #[test]
    fn lease_owner_identity_requires_same_incarnation() {
        let original = LeaseOwnerIdentity::opaque("owner", "incarnation-a");
        let twin = LeaseOwnerIdentity::opaque("owner", "incarnation-a");
        let successor = LeaseOwnerIdentity::opaque("owner", "incarnation-b");

        assert!(original.same_incarnation(&twin));
        assert!(!original.same_incarnation(&successor));
    }

    /// A holder is reported as definitely dead only to a claimant on the same host
    /// and boot; other hosts, other boots, and opaque liveness never prove death.
    #[test]
    fn local_liveness_only_proves_same_host_boot_dead_processes() {
        let pid = std::process::id();
        let claimant_on = |host_id: &str, boot_id: &str| {
            local_liveness(host_id, boot_id, pid, "claimant")
        };

        // The holder reuses this process's pid but carries a process-start marker
        // that is deliberately not the real one.
        let holder = local_liveness("host-a", "boot-a", pid, "not-the-current-process-start");
        let same_host_boot = claimant_on("host-a", "boot-a");
        let other_host = claimant_on("host-b", "boot-a");
        let other_boot = claimant_on("host-a", "boot-b");
        let opaque = LeaseOwnerLiveness::Opaque;

        assert!(holder.is_definitely_dead_for_claimant(&same_host_boot));
        assert!(!holder.is_definitely_dead_for_claimant(&other_host));
        assert!(!holder.is_definitely_dead_for_claimant(&other_boot));
        assert!(!holder.is_definitely_dead_for_claimant(&opaque));
        assert!(!LeaseOwnerLiveness::Opaque.is_definitely_dead_for_claimant(&same_host_boot));
    }
}