/// Returns the fallback session id, the literal `"root"`.
///
/// Used by name as the serde `default` for the `session_id` field of
/// [`SessionHead`] and [`SessionHeadMeta`], and by their `Default` impls.
fn default_root_session_id() -> String {
    String::from("root")
}
4
5#[derive(Debug, thiserror::Error)]
6pub enum StoreError {
7 #[error(
8 "store is already bound to session `{bound_session_id}` and cannot be reused for `{attempted_session_id}`"
9 )]
10 SessionBindingMismatch {
11 bound_session_id: String,
12 attempted_session_id: String,
13 },
14 #[error("store does not support read scope {0:?}")]
15 UnsupportedReadScope(SessionReadScope),
16 #[error("store head revision conflict: expected {expected:?}, actual {actual}")]
17 HeadRevisionConflict { expected: Option<u64>, actual: u64 },
18 #[error("store backend error: {0}")]
19 Backend(String),
20}
21
22#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
23pub struct SessionMeta {
24 pub session_id: String,
25 pub session_name: String,
26 pub created_at: String,
27 pub model: String,
28 pub cwd: Option<String>,
29 pub parent_session_id: Option<String>,
30}
31
/// Summary of a session.
///
/// NOTE(review): the name suggests this feeds a session picker; confirm
/// against the callers.
#[derive(Debug, Clone)]
pub struct SessionPickerInfo {
    /// Identifier of the session.
    pub session_id: String,
    /// Working directory, when one was recorded.
    pub cwd: Option<String>,
    /// Identifier of the parent session, when there is one.
    pub parent_session_id: Option<String>,
    /// Text of the first user message.
    pub first_user_message: String,
    /// Number of user messages.
    pub user_message_count: usize,
}
41
42#[derive(Clone, Debug, Default, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
43#[serde(transparent)]
44pub struct BlobRef(pub String);
45
46impl BlobRef {
47 pub fn as_str(&self) -> &str {
48 &self.0
49 }
50}
51
52impl std::fmt::Display for BlobRef {
53 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
54 f.write_str(&self.0)
55 }
56}
57
58impl From<String> for BlobRef {
59 fn from(value: String) -> Self {
60 Self(value)
61 }
62}
63
/// Counters returned by [`RuntimePersistence::gc_unreachable`].
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct GcReport {
    /// Number of roots.
    pub root_count: usize,
    /// Number of blobs retained.
    pub retained_blob_count: usize,
    /// Number of blobs deleted.
    pub deleted_blob_count: usize,
}
70
/// Counter returned by [`RuntimePersistence::vacuum`].
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct VacuumReport {
    /// Number of nodes removed.
    pub removed_node_count: usize,
}
78
79#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
80pub struct SessionCheckpoint {
81 #[serde(default)]
82 pub turn_state: crate::PersistedTurnState,
83 #[serde(default, skip_serializing_if = "Option::is_none")]
84 pub tool_state_ref: Option<BlobRef>,
85 #[serde(default, skip_serializing_if = "Option::is_none")]
86 pub plugin_snapshot_ref: Option<BlobRef>,
87 #[serde(default, skip_serializing_if = "Option::is_none")]
88 pub plugin_snapshot_revision: Option<u64>,
89 #[serde(default, skip_serializing_if = "Option::is_none")]
90 pub execution_state_ref: Option<BlobRef>,
91}
92
93#[derive(Clone, Debug, Default)]
94pub struct HydratedSessionCheckpoint {
95 pub turn_state: crate::PersistedTurnState,
96 pub tool_state_ref: Option<BlobRef>,
97 pub tool_state: Option<crate::ToolState>,
98 pub plugin_snapshot_ref: Option<BlobRef>,
99 pub plugin_snapshot: Option<crate::PluginSessionSnapshot>,
100 pub plugin_snapshot_revision: Option<u64>,
101 pub execution_state_ref: Option<BlobRef>,
102 pub execution_state: Option<Vec<u8>>,
103}
104
105#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
106pub struct SessionHead {
107 #[serde(default = "default_root_session_id")]
108 pub session_id: String,
109 #[serde(default)]
110 pub head_revision: u64,
111 pub graph: crate::SessionGraph,
112 pub config: crate::PersistedSessionConfig,
113 #[serde(default, skip_serializing_if = "Option::is_none")]
114 pub checkpoint_ref: Option<BlobRef>,
115 #[serde(default, skip_serializing_if = "Vec::is_empty")]
116 pub token_ledger: Vec<crate::TokenLedgerEntry>,
117}
118
119#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
120pub struct SessionHeadMeta {
121 #[serde(default = "default_root_session_id")]
122 pub session_id: String,
123 #[serde(default)]
124 pub head_revision: u64,
125 pub config: crate::PersistedSessionConfig,
126 #[serde(default, skip_serializing_if = "Option::is_none")]
127 pub checkpoint_ref: Option<BlobRef>,
128 #[serde(default, skip_serializing_if = "Option::is_none")]
129 pub leaf_node_id: Option<String>,
130 #[serde(default)]
131 pub graph_node_count: usize,
132 #[serde(default, skip_serializing_if = "Vec::is_empty")]
133 pub token_ledger: Vec<crate::TokenLedgerEntry>,
134}
135
136fn persisted_session_config_from_state(
137 state: &crate::PersistedSessionState,
138) -> crate::PersistedSessionConfig {
139 crate::PersistedSessionConfig {
140 provider_id: state.policy.provider.kind().to_string(),
141 configured_model: state.policy.model.clone(),
142 context_window: state.policy.max_context_tokens.unwrap_or_default() as u64,
143 execution_mode: state.policy.execution_mode.clone(),
144 standard_context_approach: state.policy.standard_context_approach.clone(),
145 model_variant: state.policy.model_variant.clone(),
146 }
147}
148
/// Selects how much of a session's graph a load should return.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SessionReadScope {
    /// The whole graph.
    FullGraph,
    /// Only the active path.
    ActivePath {
        /// Leaf node identifying the path.
        ///
        /// NOTE(review): how `None` is resolved is up to the store; confirm.
        leaf_node_id: Option<String>,
    },
}
154
155#[derive(Clone, Debug)]
156pub struct PersistedSessionRead {
157 pub session_id: String,
158 pub head_revision: u64,
159 pub config: crate::PersistedSessionConfig,
160 pub graph: crate::SessionGraph,
161 pub checkpoint_ref: Option<BlobRef>,
162 pub checkpoint: Option<HydratedSessionCheckpoint>,
163 pub token_ledger: Vec<crate::TokenLedgerEntry>,
164}
165
166#[derive(Clone, Debug)]
167pub enum GraphCommitDelta {
168 Unchanged {
169 leaf_node_id: Option<String>,
170 },
171 Append {
172 nodes: Vec<crate::SessionNodeRecord>,
173 leaf_node_id: Option<String>,
174 },
175 ReplaceFull(crate::SessionGraph),
176}
177
178impl GraphCommitDelta {
179 pub fn leaf_node_id(&self) -> Option<&String> {
180 match self {
181 Self::Unchanged { leaf_node_id } | Self::Append { leaf_node_id, .. } => {
182 leaf_node_id.as_ref()
183 }
184 Self::ReplaceFull(graph) => graph.leaf_node_id.as_ref(),
185 }
186 }
187}
188
189#[derive(Clone, Debug)]
190pub struct RuntimeCommit {
191 pub session_id: String,
192 pub expected_head_revision: Option<u64>,
193 pub config: crate::PersistedSessionConfig,
194 pub graph: GraphCommitDelta,
195 pub checkpoint: HydratedSessionCheckpoint,
196 pub usage_deltas: Vec<crate::TokenLedgerEntry>,
197}
198
199#[derive(Clone, Debug)]
200pub struct RuntimeCommitResult {
201 pub head_revision: u64,
202 pub checkpoint_ref: BlobRef,
203 pub manifest: SessionCheckpoint,
204}
205
206fn build_persisted_turn_state(state: &crate::PersistedSessionState) -> crate::PersistedTurnState {
207 crate::PersistedTurnState {
208 turn_index: state.turn_index,
209 token_usage: state.token_usage.clone(),
210 last_prompt_usage: state.last_prompt_usage.clone(),
211 mode_turn_options: state.mode_turn_options.clone(),
212 }
213}
214
215fn build_checkpoint_from_persisted_state(
216 state: &crate::PersistedSessionState,
217) -> HydratedSessionCheckpoint {
218 HydratedSessionCheckpoint {
219 turn_state: build_persisted_turn_state(state),
220 tool_state_ref: state.tool_state_ref.clone(),
221 tool_state: state.tool_state_snapshot.clone(),
222 plugin_snapshot_ref: state.plugin_snapshot_ref.clone(),
223 plugin_snapshot_revision: state.plugin_snapshot_revision,
224 plugin_snapshot: state.plugin_snapshot.clone(),
225 execution_state_ref: state.execution_state_ref.clone(),
226 execution_state: state.execution_state_snapshot.clone(),
227 }
228}
229
230impl RuntimeCommit {
231 pub fn persisted_state(
232 state: &crate::PersistedSessionState,
233 usage_deltas: &[crate::TokenLedgerEntry],
234 ) -> Self {
235 Self {
236 session_id: state.session_id.clone(),
237 expected_head_revision: state.head_revision,
238 config: persisted_session_config_from_state(state),
239 graph: if state.graph_replace_required || state.head_revision.is_none() {
240 GraphCommitDelta::ReplaceFull(state.session_graph.clone())
241 } else {
242 GraphCommitDelta::Unchanged {
243 leaf_node_id: state.session_graph.leaf_node_id.clone(),
244 }
245 },
246 checkpoint: build_checkpoint_from_persisted_state(state),
247 usage_deltas: usage_deltas.to_vec(),
248 }
249 }
250
251 pub(crate) fn persisted_state_with_graph_commit(
252 state: &crate::PersistedSessionState,
253 graph: GraphCommitDelta,
254 usage_deltas: &[crate::TokenLedgerEntry],
255 ) -> Self {
256 Self {
257 session_id: state.session_id.clone(),
258 expected_head_revision: state.head_revision,
259 config: persisted_session_config_from_state(state),
260 graph,
261 checkpoint: build_checkpoint_from_persisted_state(state),
262 usage_deltas: usage_deltas.to_vec(),
263 }
264 }
265}
266
267fn persisted_session_state_from_head(
268 head: SessionHead,
269 checkpoint: Option<HydratedSessionCheckpoint>,
270) -> crate::PersistedSessionState {
271 let mut state = crate::PersistedSessionState {
272 session_id: head.session_id,
273 policy: crate::SessionPolicy::default(),
274 session_graph: head.graph,
275 turn_index: 0,
276 token_usage: crate::TokenUsage::default(),
277 last_prompt_usage: None,
278 mode_turn_options: crate::ModeTurnOptions::default(),
279 tool_state_ref: None,
280 tool_state_generation: None,
281 tool_state_snapshot: None,
282 plugin_snapshot_ref: None,
283 plugin_snapshot_revision: None,
284 plugin_snapshot: None,
285 execution_state_ref: None,
286 execution_state_snapshot: None,
287 token_ledger: head.token_ledger,
288 checkpoint_ref: head.checkpoint_ref.clone(),
289 head_revision: Some(head.head_revision),
290 graph_replace_required: false,
291 };
292 state.policy.model = head.config.configured_model.clone();
293 if head.config.context_window > 0 {
294 state.policy.max_context_tokens = Some(head.config.context_window as usize);
295 }
296 state.policy.execution_mode = head.config.execution_mode;
297 state.policy.standard_context_approach = head.config.standard_context_approach.clone();
298 state.policy.model_variant = head.config.model_variant.clone();
299 if let Some(checkpoint) = checkpoint {
300 state.turn_index = checkpoint.turn_state.turn_index;
301 state.token_usage = checkpoint.turn_state.token_usage;
302 state.last_prompt_usage = checkpoint.turn_state.last_prompt_usage;
303 state.mode_turn_options = checkpoint.turn_state.mode_turn_options;
304 state.tool_state_ref = checkpoint.tool_state_ref.clone();
305 state.tool_state_generation = checkpoint
306 .tool_state
307 .as_ref()
308 .map(|snapshot| snapshot.generation());
309 state.tool_state_snapshot = checkpoint.tool_state;
310 state.plugin_snapshot_ref = checkpoint.plugin_snapshot_ref.clone();
311 state.plugin_snapshot_revision = checkpoint.plugin_snapshot_revision;
312 state.plugin_snapshot = checkpoint.plugin_snapshot;
313 state.execution_state_ref = checkpoint.execution_state_ref.clone();
314 state.execution_state_snapshot = checkpoint.execution_state;
315 }
316 state
317}
318
319impl Default for SessionHead {
320 fn default() -> Self {
321 Self {
322 session_id: default_root_session_id(),
323 head_revision: 0,
324 graph: crate::SessionGraph::default(),
325 config: crate::PersistedSessionConfig::default(),
326 checkpoint_ref: None,
327 token_ledger: Vec::new(),
328 }
329 }
330}
331
332impl Default for SessionHeadMeta {
333 fn default() -> Self {
334 Self {
335 session_id: default_root_session_id(),
336 head_revision: 0,
337 config: crate::PersistedSessionConfig::default(),
338 checkpoint_ref: None,
339 leaf_node_id: None,
340 graph_node_count: 0,
341 token_ledger: Vec::new(),
342 }
343 }
344}
345
346#[async_trait::async_trait]
348pub trait RuntimePersistence: Send + Sync {
349 async fn load_session(
350 &self,
351 scope: SessionReadScope,
352 ) -> Result<Option<PersistedSessionRead>, StoreError>;
353
354 async fn load_node(
355 &self,
356 node_id: &str,
357 ) -> Result<Option<crate::SessionNodeRecord>, StoreError>;
358
359 async fn commit_runtime_state(
360 &self,
361 commit: RuntimeCommit,
362 ) -> Result<RuntimeCommitResult, StoreError>;
363
364 async fn save_session_meta(&self, meta: SessionMeta) -> Result<(), StoreError>;
365 async fn load_session_meta(&self) -> Result<Option<SessionMeta>, StoreError>;
366
367 async fn tombstone_nodes(&self, ids: &[String]) -> Result<(), StoreError>;
368 async fn vacuum(&self) -> Result<VacuumReport, StoreError>;
369 async fn gc_unreachable(&self) -> Result<GcReport, StoreError>;
370}
371
372fn persisted_session_state_from_read(read: PersistedSessionRead) -> crate::PersistedSessionState {
373 persisted_session_state_from_head(
374 SessionHead {
375 session_id: read.session_id,
376 head_revision: read.head_revision,
377 graph: read.graph,
378 config: read.config,
379 checkpoint_ref: read.checkpoint_ref,
380 token_ledger: read.token_ledger,
381 },
382 read.checkpoint,
383 )
384}
385
386pub async fn load_persisted_session_state(
387 store: &(dyn RuntimePersistence + '_),
388) -> Result<Option<crate::PersistedSessionState>, StoreError> {
389 Ok(store
390 .load_session(SessionReadScope::FullGraph)
391 .await?
392 .map(persisted_session_state_from_read))
393}
394
395pub async fn load_persisted_session_state_active_path(
396 store: &(dyn RuntimePersistence + '_),
397 leaf_node_id: Option<String>,
398) -> Result<Option<crate::PersistedSessionState>, StoreError> {
399 Ok(store
400 .load_session(SessionReadScope::ActivePath { leaf_node_id })
401 .await?
402 .map(persisted_session_state_from_read))
403}
404
405pub async fn refresh_persisted_session_state(
406 store: &(dyn RuntimePersistence + '_),
407 state: &mut crate::PersistedSessionState,
408) -> Result<(), StoreError> {
409 if let Some(mut fresh) = load_persisted_session_state(store).await? {
410 fresh.policy.provider = state.policy.provider.clone();
413 fresh.policy.session_id = state.policy.session_id.clone();
414 fresh.policy.max_turns = state.policy.max_turns;
415 *state = fresh;
416 }
417 Ok(())
418}