Skip to main content

lash_core/
store.rs

/// Session id used when serialized data does not carry one.
///
/// Serves as the serde default for the `session_id` fields in this file and
/// as the id produced by the `Default` impls of the head types.
fn default_root_session_id() -> String {
    String::from("root")
}
4
/// Errors returned by the session store operations declared in this file.
#[derive(Debug, thiserror::Error)]
pub enum StoreError {
    /// The store is already bound to one session id and was asked to serve a
    /// different one.
    #[error(
        "store is already bound to session `{bound_session_id}` and cannot be reused for `{attempted_session_id}`"
    )]
    SessionBindingMismatch {
        /// Session id the store is currently bound to.
        bound_session_id: String,
        /// Session id the rejected operation tried to use.
        attempted_session_id: String,
    },
    /// The store cannot serve the requested [`SessionReadScope`].
    #[error("store does not support read scope {0:?}")]
    UnsupportedReadScope(SessionReadScope),
    /// The head revision found in the store (`actual`) differs from the one
    /// the caller expected (`expected`).
    #[error("store head revision conflict: expected {expected:?}, actual {actual}")]
    HeadRevisionConflict { expected: Option<u64>, actual: u64 },
    /// Failure reported by the underlying backend, carried as a message.
    #[error("store backend error: {0}")]
    Backend(String),
}
21
/// Descriptive metadata for a session.
///
/// Saved and loaded through `RuntimePersistence::save_session_meta` and
/// `RuntimePersistence::load_session_meta`.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct SessionMeta {
    /// Identifier of the session.
    pub session_id: String,
    /// Name of the session.
    pub session_name: String,
    /// Creation time as a string. NOTE(review): the expected format is not
    /// visible in this file — confirm against the writer.
    pub created_at: String,
    /// Model associated with the session.
    pub model: String,
    /// NOTE(review): presumably the working directory of the session, when
    /// recorded — confirm.
    pub cwd: Option<String>,
    /// Identifier of the parent session, if there is one.
    pub parent_session_id: Option<String>,
}
31
/// Lightweight session info for the resume picker.
///
/// Unlike [`SessionMeta`], this type has no serde derives, so it is an
/// in-memory value only.
#[derive(Clone, Debug)]
pub struct SessionPickerInfo {
    /// Identifier of the session.
    pub session_id: String,
    /// NOTE(review): presumably the working directory of the session, when
    /// recorded — confirm.
    pub cwd: Option<String>,
    /// Identifier of the parent session, if there is one.
    pub parent_session_id: Option<String>,
    /// First user message of the session.
    pub first_user_message: String,
    /// Number of user messages in the session.
    pub user_message_count: usize,
}
41
/// Reference to a stored blob, held as a string.
///
/// `#[serde(transparent)]` makes the wrapper serialize and deserialize as the
/// bare inner string. The `Default` value wraps an empty string.
#[derive(Clone, Debug, Default, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
#[serde(transparent)]
pub struct BlobRef(pub String);
45
46impl BlobRef {
47    pub fn as_str(&self) -> &str {
48        &self.0
49    }
50}
51
52impl std::fmt::Display for BlobRef {
53    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
54        f.write_str(&self.0)
55    }
56}
57
58impl From<String> for BlobRef {
59    fn from(value: String) -> Self {
60        Self(value)
61    }
62}
63
/// Result of a `RuntimePersistence::gc_unreachable()` call.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct GcReport {
    /// Number of roots. NOTE(review): presumably the roots from which
    /// reachability was computed — confirm against an implementation.
    pub root_count: usize,
    /// Number of blobs that were kept.
    pub retained_blob_count: usize,
    /// Number of blobs that were deleted.
    pub deleted_blob_count: usize,
}
70
/// Result of a `RuntimePersistence::vacuum()` call.
///
/// Returned so hosts can emit metrics.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct VacuumReport {
    /// Number of tombstoned graph-node rows that were physically deleted
    /// from the store.
    pub removed_node_count: usize,
}
78
/// Serializable checkpoint: the turn state plus optional blob references.
///
/// Every field falls back to its default when missing from the serialized
/// input, and the optional fields are omitted from the output when `None`.
/// Returned as the `manifest` of a [`RuntimeCommitResult`].
#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
pub struct SessionCheckpoint {
    /// Persisted turn state.
    #[serde(default)]
    pub turn_state: crate::PersistedTurnState,
    /// Reference to the tool state blob, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub tool_state_ref: Option<BlobRef>,
    /// Reference to the plugin snapshot blob, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub plugin_snapshot_ref: Option<BlobRef>,
    /// Revision of the plugin snapshot, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub plugin_snapshot_revision: Option<u64>,
    /// Reference to the execution state blob, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub execution_state_ref: Option<BlobRef>,
}
92
/// Checkpoint that carries in-memory payloads next to their blob references.
///
/// Has the same reference fields as [`SessionCheckpoint`], each paired with
/// an optional value. NOTE(review): presumably each value is the loaded
/// content of the matching `*_ref` blob — confirm against the store
/// implementations. This type has no serde derives.
#[derive(Clone, Debug, Default)]
pub struct HydratedSessionCheckpoint {
    /// Persisted turn state.
    pub turn_state: crate::PersistedTurnState,
    /// Reference to the tool state blob, if any.
    pub tool_state_ref: Option<BlobRef>,
    /// In-memory tool state, if any.
    pub tool_state: Option<crate::ToolState>,
    /// Reference to the plugin snapshot blob, if any.
    pub plugin_snapshot_ref: Option<BlobRef>,
    /// In-memory plugin snapshot, if any.
    pub plugin_snapshot: Option<crate::PluginSessionSnapshot>,
    /// Revision of the plugin snapshot, if any.
    pub plugin_snapshot_revision: Option<u64>,
    /// Reference to the execution state blob, if any.
    pub execution_state_ref: Option<BlobRef>,
    /// In-memory execution state as raw bytes, if any.
    pub execution_state: Option<Vec<u8>>,
}
104
/// Persisted head of a session, including the whole session graph.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct SessionHead {
    /// Session identifier; deserializes to `"root"` when the field is absent.
    #[serde(default = "default_root_session_id")]
    pub session_id: String,
    /// Revision number of this head; deserializes to 0 when absent.
    #[serde(default)]
    pub head_revision: u64,
    /// The session graph.
    pub graph: crate::SessionGraph,
    /// Persisted session configuration.
    pub config: crate::PersistedSessionConfig,
    /// Reference to the checkpoint blob, if any; omitted from the serialized
    /// output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub checkpoint_ref: Option<BlobRef>,
    /// Token ledger entries; omitted from the serialized output when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub token_ledger: Vec<crate::TokenLedgerEntry>,
}
118
/// Persisted head of a session without the graph itself.
///
/// Carries the same fields as [`SessionHead`] except `graph`, which is
/// replaced by a leaf node id and a node count.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct SessionHeadMeta {
    /// Session identifier; deserializes to `"root"` when the field is absent.
    #[serde(default = "default_root_session_id")]
    pub session_id: String,
    /// Revision number of this head; deserializes to 0 when absent.
    #[serde(default)]
    pub head_revision: u64,
    /// Persisted session configuration.
    pub config: crate::PersistedSessionConfig,
    /// Reference to the checkpoint blob, if any; omitted from the serialized
    /// output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub checkpoint_ref: Option<BlobRef>,
    /// Id of the graph's leaf node, if any; omitted from the serialized
    /// output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub leaf_node_id: Option<String>,
    /// Number of nodes in the graph; deserializes to 0 when absent.
    #[serde(default)]
    pub graph_node_count: usize,
    /// Token ledger entries; omitted from the serialized output when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub token_ledger: Vec<crate::TokenLedgerEntry>,
}
135
136fn persisted_session_config_from_state(
137    state: &crate::PersistedSessionState,
138) -> crate::PersistedSessionConfig {
139    crate::PersistedSessionConfig {
140        provider_id: state.policy.provider.kind().to_string(),
141        configured_model: state.policy.model.clone(),
142        context_window: state.policy.max_context_tokens.unwrap_or_default() as u64,
143        execution_mode: state.policy.execution_mode.clone(),
144        standard_context_approach: state.policy.standard_context_approach.clone(),
145        model_variant: state.policy.model_variant.clone(),
146    }
147}
148
/// Scope passed to `RuntimePersistence::load_session`.
///
/// A store that cannot serve a scope can report
/// [`StoreError::UnsupportedReadScope`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SessionReadScope {
    /// Read the full session graph.
    FullGraph,
    /// Read the active path only.
    ///
    /// NOTE(review): `leaf_node_id` presumably selects the leaf whose path is
    /// loaded, with `None` deferring to the store's current leaf — confirm
    /// against the store implementations.
    ActivePath { leaf_node_id: Option<String> },
}
154
/// Session data returned by `RuntimePersistence::load_session`.
///
/// Converted into a `crate::PersistedSessionState` by
/// `persisted_session_state_from_read`.
#[derive(Clone, Debug)]
pub struct PersistedSessionRead {
    /// Identifier of the session.
    pub session_id: String,
    /// Head revision of the session at read time.
    pub head_revision: u64,
    /// Persisted session configuration.
    pub config: crate::PersistedSessionConfig,
    /// Session graph returned for the requested scope.
    pub graph: crate::SessionGraph,
    /// Reference to the checkpoint blob, if any.
    pub checkpoint_ref: Option<BlobRef>,
    /// Checkpoint with its in-memory payloads, if any.
    pub checkpoint: Option<HydratedSessionCheckpoint>,
    /// Token ledger entries.
    pub token_ledger: Vec<crate::TokenLedgerEntry>,
}
165
/// Graph portion of a [`RuntimeCommit`].
#[derive(Clone, Debug)]
pub enum GraphCommitDelta {
    /// No graph nodes are sent; only the leaf node id is carried.
    Unchanged {
        /// Leaf node id to record, if any.
        leaf_node_id: Option<String>,
    },
    /// Node records to append, together with the leaf node id.
    Append {
        /// Node records to append.
        nodes: Vec<crate::SessionNodeRecord>,
        /// Leaf node id to record, if any.
        leaf_node_id: Option<String>,
    },
    /// A complete graph. `RuntimeCommit::persisted_state` uses this variant
    /// when the state requires a graph replace or has no head revision yet.
    ReplaceFull(crate::SessionGraph),
}
177
178impl GraphCommitDelta {
179    pub fn leaf_node_id(&self) -> Option<&String> {
180        match self {
181            Self::Unchanged { leaf_node_id } | Self::Append { leaf_node_id, .. } => {
182                leaf_node_id.as_ref()
183            }
184            Self::ReplaceFull(graph) => graph.leaf_node_id.as_ref(),
185        }
186    }
187}
188
/// Input to `RuntimePersistence::commit_runtime_state`.
#[derive(Clone, Debug)]
pub struct RuntimeCommit {
    /// Identifier of the session being committed.
    pub session_id: String,
    /// Head revision the caller last observed; `None` when the state has no
    /// head revision yet. NOTE(review): presumably the store compares this
    /// with its own head and reports `StoreError::HeadRevisionConflict` on a
    /// mismatch — confirm against the store implementations.
    pub expected_head_revision: Option<u64>,
    /// Session configuration to persist.
    pub config: crate::PersistedSessionConfig,
    /// Graph change to persist.
    pub graph: GraphCommitDelta,
    /// Checkpoint, including its in-memory payloads.
    pub checkpoint: HydratedSessionCheckpoint,
    /// Token ledger entries supplied with this commit.
    pub usage_deltas: Vec<crate::TokenLedgerEntry>,
}
198
/// Output of `RuntimePersistence::commit_runtime_state`.
#[derive(Clone, Debug)]
pub struct RuntimeCommitResult {
    /// Head revision reported by the store for this commit.
    pub head_revision: u64,
    /// Reference to the checkpoint blob.
    pub checkpoint_ref: BlobRef,
    /// Checkpoint manifest in its serializable form.
    pub manifest: SessionCheckpoint,
}
205
206fn build_persisted_turn_state(state: &crate::PersistedSessionState) -> crate::PersistedTurnState {
207    crate::PersistedTurnState {
208        turn_index: state.turn_index,
209        token_usage: state.token_usage.clone(),
210        last_prompt_usage: state.last_prompt_usage.clone(),
211        mode_turn_options: state.mode_turn_options.clone(),
212    }
213}
214
215fn build_checkpoint_from_persisted_state(
216    state: &crate::PersistedSessionState,
217) -> HydratedSessionCheckpoint {
218    HydratedSessionCheckpoint {
219        turn_state: build_persisted_turn_state(state),
220        tool_state_ref: state.tool_state_ref.clone(),
221        tool_state: state.tool_state_snapshot.clone(),
222        plugin_snapshot_ref: state.plugin_snapshot_ref.clone(),
223        plugin_snapshot_revision: state.plugin_snapshot_revision,
224        plugin_snapshot: state.plugin_snapshot.clone(),
225        execution_state_ref: state.execution_state_ref.clone(),
226        execution_state: state.execution_state_snapshot.clone(),
227    }
228}
229
230impl RuntimeCommit {
231    pub fn persisted_state(
232        state: &crate::PersistedSessionState,
233        usage_deltas: &[crate::TokenLedgerEntry],
234    ) -> Self {
235        Self {
236            session_id: state.session_id.clone(),
237            expected_head_revision: state.head_revision,
238            config: persisted_session_config_from_state(state),
239            graph: if state.graph_replace_required || state.head_revision.is_none() {
240                GraphCommitDelta::ReplaceFull(state.session_graph.clone())
241            } else {
242                GraphCommitDelta::Unchanged {
243                    leaf_node_id: state.session_graph.leaf_node_id.clone(),
244                }
245            },
246            checkpoint: build_checkpoint_from_persisted_state(state),
247            usage_deltas: usage_deltas.to_vec(),
248        }
249    }
250
251    pub(crate) fn persisted_state_with_graph_commit(
252        state: &crate::PersistedSessionState,
253        graph: GraphCommitDelta,
254        usage_deltas: &[crate::TokenLedgerEntry],
255    ) -> Self {
256        Self {
257            session_id: state.session_id.clone(),
258            expected_head_revision: state.head_revision,
259            config: persisted_session_config_from_state(state),
260            graph,
261            checkpoint: build_checkpoint_from_persisted_state(state),
262            usage_deltas: usage_deltas.to_vec(),
263        }
264    }
265}
266
267fn persisted_session_state_from_head(
268    head: SessionHead,
269    checkpoint: Option<HydratedSessionCheckpoint>,
270) -> crate::PersistedSessionState {
271    let mut state = crate::PersistedSessionState {
272        session_id: head.session_id,
273        policy: crate::SessionPolicy::default(),
274        session_graph: head.graph,
275        turn_index: 0,
276        token_usage: crate::TokenUsage::default(),
277        last_prompt_usage: None,
278        mode_turn_options: crate::ModeTurnOptions::default(),
279        tool_state_ref: None,
280        tool_state_generation: None,
281        tool_state_snapshot: None,
282        plugin_snapshot_ref: None,
283        plugin_snapshot_revision: None,
284        plugin_snapshot: None,
285        execution_state_ref: None,
286        execution_state_snapshot: None,
287        token_ledger: head.token_ledger,
288        checkpoint_ref: head.checkpoint_ref.clone(),
289        head_revision: Some(head.head_revision),
290        graph_replace_required: false,
291    };
292    state.policy.model = head.config.configured_model.clone();
293    if head.config.context_window > 0 {
294        state.policy.max_context_tokens = Some(head.config.context_window as usize);
295    }
296    state.policy.execution_mode = head.config.execution_mode;
297    state.policy.standard_context_approach = head.config.standard_context_approach.clone();
298    state.policy.model_variant = head.config.model_variant.clone();
299    if let Some(checkpoint) = checkpoint {
300        state.turn_index = checkpoint.turn_state.turn_index;
301        state.token_usage = checkpoint.turn_state.token_usage;
302        state.last_prompt_usage = checkpoint.turn_state.last_prompt_usage;
303        state.mode_turn_options = checkpoint.turn_state.mode_turn_options;
304        state.tool_state_ref = checkpoint.tool_state_ref.clone();
305        state.tool_state_generation = checkpoint
306            .tool_state
307            .as_ref()
308            .map(|snapshot| snapshot.generation());
309        state.tool_state_snapshot = checkpoint.tool_state;
310        state.plugin_snapshot_ref = checkpoint.plugin_snapshot_ref.clone();
311        state.plugin_snapshot_revision = checkpoint.plugin_snapshot_revision;
312        state.plugin_snapshot = checkpoint.plugin_snapshot;
313        state.execution_state_ref = checkpoint.execution_state_ref.clone();
314        state.execution_state_snapshot = checkpoint.execution_state;
315    }
316    state
317}
318
319impl Default for SessionHead {
320    fn default() -> Self {
321        Self {
322            session_id: default_root_session_id(),
323            head_revision: 0,
324            graph: crate::SessionGraph::default(),
325            config: crate::PersistedSessionConfig::default(),
326            checkpoint_ref: None,
327            token_ledger: Vec::new(),
328        }
329    }
330}
331
332impl Default for SessionHeadMeta {
333    fn default() -> Self {
334        Self {
335            session_id: default_root_session_id(),
336            head_revision: 0,
337            config: crate::PersistedSessionConfig::default(),
338            checkpoint_ref: None,
339            leaf_node_id: None,
340            graph_node_count: 0,
341            token_ledger: Vec::new(),
342        }
343    }
344}
345
/// Exact persistence protocol required by the runtime.
///
/// Every method is async and reports failures as [`StoreError`].
/// Implementations must be `Send + Sync`.
#[async_trait::async_trait]
pub trait RuntimePersistence: Send + Sync {
    /// Loads the persisted session for the requested `scope`.
    ///
    /// Returns `Ok(None)` when the store has no session to return.
    /// NOTE(review): presumably that means nothing has been committed yet —
    /// confirm against the store implementations.
    async fn load_session(
        &self,
        scope: SessionReadScope,
    ) -> Result<Option<PersistedSessionRead>, StoreError>;

    /// Loads a single graph node record by id, or `Ok(None)` when the store
    /// returns no record for `node_id`.
    async fn load_node(
        &self,
        node_id: &str,
    ) -> Result<Option<crate::SessionNodeRecord>, StoreError>;

    /// Persists `commit` and returns the resulting head revision, checkpoint
    /// reference and checkpoint manifest.
    async fn commit_runtime_state(
        &self,
        commit: RuntimeCommit,
    ) -> Result<RuntimeCommitResult, StoreError>;

    /// Stores the session metadata.
    async fn save_session_meta(&self, meta: SessionMeta) -> Result<(), StoreError>;
    /// Loads the session metadata, or `Ok(None)` when the store returns none.
    async fn load_session_meta(&self) -> Result<Option<SessionMeta>, StoreError>;

    /// Tombstones the graph nodes with the given ids.
    async fn tombstone_nodes(&self, ids: &[String]) -> Result<(), StoreError>;
    /// Physically deletes tombstoned graph-node rows and reports how many
    /// were removed.
    async fn vacuum(&self) -> Result<VacuumReport, StoreError>;
    /// Runs garbage collection and reports root, retained-blob and
    /// deleted-blob counts. NOTE(review): presumably deletes blobs that are
    /// no longer reachable from any root — confirm against implementations.
    async fn gc_unreachable(&self) -> Result<GcReport, StoreError>;
}
371
372fn persisted_session_state_from_read(read: PersistedSessionRead) -> crate::PersistedSessionState {
373    persisted_session_state_from_head(
374        SessionHead {
375            session_id: read.session_id,
376            head_revision: read.head_revision,
377            graph: read.graph,
378            config: read.config,
379            checkpoint_ref: read.checkpoint_ref,
380            token_ledger: read.token_ledger,
381        },
382        read.checkpoint,
383    )
384}
385
386pub async fn load_persisted_session_state(
387    store: &(dyn RuntimePersistence + '_),
388) -> Result<Option<crate::PersistedSessionState>, StoreError> {
389    Ok(store
390        .load_session(SessionReadScope::FullGraph)
391        .await?
392        .map(persisted_session_state_from_read))
393}
394
395pub async fn load_persisted_session_state_active_path(
396    store: &(dyn RuntimePersistence + '_),
397    leaf_node_id: Option<String>,
398) -> Result<Option<crate::PersistedSessionState>, StoreError> {
399    Ok(store
400        .load_session(SessionReadScope::ActivePath { leaf_node_id })
401        .await?
402        .map(persisted_session_state_from_read))
403}
404
405pub async fn refresh_persisted_session_state(
406    store: &(dyn RuntimePersistence + '_),
407    state: &mut crate::PersistedSessionState,
408) -> Result<(), StoreError> {
409    if let Some(mut fresh) = load_persisted_session_state(store).await? {
410        // The store owns persisted graph/checkpoint/config state, but not
411        // live provider credentials or other runtime-only policy fields.
412        fresh.policy.provider = state.policy.provider.clone();
413        fresh.policy.session_id = state.policy.session_id.clone();
414        fresh.policy.max_turns = state.policy.max_turns;
415        *state = fresh;
416    }
417    Ok(())
418}