Skip to main content

lash_sqlite_store/
graph.rs

1//! Session-graph persistence and garbage collection on [`Store`].
2//!
3//! Ported from the prior store. The public async surface
4//! (`replace_session_graph`, `append_session_graph_nodes`, `load_session_graph`,
5//! `gc_unreachable`) keeps the exact signatures of the prior store. The shared
6//! `*_from_conn` helpers are **synchronous** and take a `&rusqlite::Connection`
7//! so `lifecycle::load_picker_info` (and any future caller already on the
8//! connection thread) can reuse them inside a `conn.call` closure — this is the
9//! load-bearing change from the prior store, which had them `async`.
10//!
11//! Read paths go through `self.conn.call(...)`; the graph-mutating and GC paths
12//! go through `self.conn.write(...)` so `BEGIN IMMEDIATE` takes the write lock
13//! up front, replacing the prior store's `BEGIN IMMEDIATE` / `COMMIT` / `ROLLBACK`
14//! ceremony.
15
16use super::*;
17
18impl Store {
19    pub(crate) fn load_session_graph_from_conn(
20        conn: &Connection,
21        leaf_node_id: Option<String>,
22    ) -> lash_core::SessionGraph {
23        // Tombstoned rows are physically still present until `vacuum()` is
24        // called; the runtime view should never see them.
25        let mut stmt = match conn
26            .prepare("SELECT node_json FROM graph_nodes WHERE tombstoned = 0 ORDER BY seq ASC")
27        {
28            Ok(stmt) => stmt,
29            Err(err) => {
30                tracing::warn!(error = %err, "failed to prepare graph load statement");
31                return lash_core::SessionGraph::from_nodes(Vec::new(), leaf_node_id);
32            }
33        };
34        let rows = match stmt.query_map([], |row| row.get::<_, String>(0)) {
35            Ok(rows) => rows,
36            Err(err) => {
37                tracing::warn!(error = %err, "failed to query graph rows");
38                return lash_core::SessionGraph::from_nodes(Vec::new(), leaf_node_id);
39            }
40        };
41        let nodes = rows
42            .filter_map(Result::ok)
43            .filter_map(|node_json| {
44                serde_json::from_str::<lash_core::SessionNodeRecord>(&node_json).ok()
45            })
46            .collect();
47        lash_core::SessionGraph::from_nodes(nodes, leaf_node_id)
48    }
49
50    pub(crate) fn load_active_path_session_graph_from_conn(
51        conn: &Connection,
52        leaf_node_id: Option<String>,
53    ) -> rusqlite::Result<lash_core::SessionGraph> {
54        let Some(leaf_node_id) = leaf_node_id else {
55            return Ok(lash_core::SessionGraph::default());
56        };
57        let mut stmt = conn.prepare(
58            "WITH RECURSIVE active(node_id, node_json, parent_node_id, depth) AS (
59                SELECT
60                    node_id,
61                    node_json,
62                    json_extract(node_json, '$.parent_node_id'),
63                    0
64                FROM graph_nodes
65                WHERE node_id = ?1 AND tombstoned = 0
66              UNION ALL
67                SELECT
68                    g.node_id,
69                    g.node_json,
70                    json_extract(g.node_json, '$.parent_node_id'),
71                    active.depth + 1
72                FROM graph_nodes g
73                JOIN active ON g.node_id = active.parent_node_id
74                WHERE g.tombstoned = 0
75            )
76            SELECT node_json FROM active ORDER BY depth DESC",
77        )?;
78        let rows = stmt.query_map(params![leaf_node_id.as_str()], |row| {
79            row.get::<_, String>(0)
80        })?;
81        let mut nodes = Vec::new();
82        for row in rows {
83            let node_json = row?;
84            if let Ok(node) = serde_json::from_str::<lash_core::SessionNodeRecord>(&node_json) {
85                nodes.push(node);
86            }
87        }
88        Ok(lash_core::SessionGraph::from_nodes(
89            nodes,
90            Some(leaf_node_id),
91        ))
92    }
93
94    pub(crate) async fn maybe_auto_gc(&self) {
95        let Some(interval) = self.options.gc_policy.auto_run_every_commits else {
96            return;
97        };
98        let commits = self.commit_count.fetch_add(1, AtomicOrdering::Relaxed) + 1;
99        if interval != 0 && commits % interval == 0 {
100            let _ = self.gc_unreachable().await;
101        }
102    }
103
104    pub async fn replace_session_graph(&self, graph: &lash_core::SessionGraph) {
105        let nodes = graph.nodes.clone();
106        let result = self
107            .conn
108            .write(move |tx| {
109                tx.execute("DELETE FROM graph_nodes", [])?;
110                let mut stmt =
111                    tx.prepare("INSERT INTO graph_nodes (node_id, node_json) VALUES (?1, ?2)")?;
112                for node in &nodes {
113                    let node_json = encode_json(node);
114                    stmt.execute(params![node.node_id, node_json])?;
115                }
116                Ok(())
117            })
118            .await;
119        if let Err(err) = result {
120            tracing::warn!(error = %err, "failed to replace session graph");
121        }
122    }
123
124    pub async fn append_session_graph_nodes(&self, nodes: &[lash_core::SessionNodeRecord]) {
125        if nodes.is_empty() {
126            return;
127        }
128        let nodes = nodes.to_vec();
129        let result = self
130            .conn
131            .write(move |tx| {
132                let mut stmt =
133                    tx.prepare("INSERT INTO graph_nodes (node_id, node_json) VALUES (?1, ?2)")?;
134                for node in &nodes {
135                    let node_json = encode_json(node);
136                    stmt.execute(params![node.node_id, node_json])?;
137                }
138                Ok(())
139            })
140            .await;
141        if let Err(err) = result {
142            tracing::warn!(error = %err, "failed to append session graph nodes");
143        }
144    }
145
146    pub async fn load_session_graph(&self) -> lash_core::SessionGraph {
147        self.conn
148            .call(|conn| Ok(Self::load_session_graph_from_conn(conn, None)))
149            .await
150            .unwrap_or_else(|_| lash_core::SessionGraph::from_nodes(Vec::new(), None))
151    }
152
153    pub async fn gc_unreachable(&self) -> GcReport {
154        match self.try_gc_unreachable().await {
155            Ok(report) => report,
156            Err(err) => {
157                // GC is best-effort space reclamation. A backend failure must
158                // never panic inside the commit and brick the store; log and
159                // leave every blob in place (the conservative outcome).
160                tracing::warn!(error = %err, "gc_unreachable failed; retaining all blobs");
161                GcReport::default()
162            }
163        }
164    }
165
166    /// Collect the checkpoint-manifest roots that must survive GC.
167    ///
168    /// The session head's current `checkpoint_ref` is the live checkpoint; its
169    /// manifest blob (and, transitively, the tool/plugin/execution snapshot
170    /// blobs it references) is reachable and must be kept. Synchronous: runs
171    /// inside the GC `conn.write` closure on the connection thread.
172    fn live_checkpoint_roots(conn: &Connection) -> Result<Vec<RetainedArtifactRef>, StoreError> {
173        let mut roots = Vec::new();
174        if let Some(checkpoint_ref) = load_session_head_meta_from_conn(conn)
175            .as_ref()
176            .and_then(|meta| meta.checkpoint_ref.as_ref())
177            .cloned()
178        {
179            roots.push(RetainedArtifactRef {
180                blob_ref: checkpoint_ref,
181                kind: PersistedArtifactKind::CheckpointManifest,
182            });
183        }
184        Ok(roots)
185    }
186
187    async fn try_gc_unreachable(&self) -> Result<GcReport, StoreError> {
188        self.conn
189            .write(|tx| {
190                Self::gc_unreachable_in_tx(tx).map_err(|err| {
191                    rusqlite::Error::ToSqlConversionFailure(Box::new(std::io::Error::other(
192                        err.to_string(),
193                    )))
194                })
195            })
196            .await
197            .map_err(sqlite_error)
198    }
199
200    /// Synchronous body of [`try_gc_unreachable`], run on the connection thread
201    /// inside the `BEGIN IMMEDIATE` transaction so the mark/sweep is atomic and
202    /// holds the write lock for its duration.
203    fn gc_unreachable_in_tx(tx: &Transaction<'_>) -> Result<GcReport, StoreError> {
204        let mut roots = Self::live_checkpoint_roots(tx)?;
205        {
206            let mut stmt = tx
207                .prepare("SELECT blob_ref FROM artifact_refs ORDER BY artifact_ref")
208                .map_err(sqlite_error)?;
209            let rows = stmt
210                .query_map([], |row| row.get::<_, String>(0))
211                .map_err(sqlite_error)?;
212            for row in rows {
213                roots.push(RetainedArtifactRef {
214                    blob_ref: BlobRef(row.map_err(sqlite_error)?),
215                    kind: PersistedArtifactKind::LashlangModule,
216                });
217            }
218        }
219        let root_count = roots.len();
220        let mut retained = std::collections::BTreeMap::<String, PersistedArtifactKind>::new();
221        let mut stack = roots;
222        while let Some(current) = stack.pop() {
223            if retained
224                .insert(current.blob_ref.0.clone(), current.kind)
225                .is_some()
226            {
227                continue;
228            }
229            if current.kind != PersistedArtifactKind::CheckpointManifest {
230                continue;
231            }
232            // A rooted checkpoint manifest is *live*. If we cannot read or
233            // decode it we must not silently drop the child blobs it points at
234            // (tool/plugin/execution snapshots) — doing so would delete blobs
235            // that belong to a live checkpoint. Skip a manifest that simply
236            // isn't present (it may have been collected on a prior run), but
237            // treat a present-yet-undecodable manifest as a hard error so GC
238            // aborts rather than deleting live data.
239            let bytes: Option<Vec<u8>> = tx
240                .query_row(
241                    "SELECT content FROM blobs WHERE hash = ?1",
242                    params![current.blob_ref.as_str()],
243                    |row| row.get::<_, Vec<u8>>(0),
244                )
245                .optional()
246                .map_err(sqlite_error)?;
247            let Some(bytes) = bytes else {
248                continue;
249            };
250            let content = decode_artifact_blob(&bytes).unwrap_or(bytes);
251            let checkpoint = decode_checkpoint(&content).ok_or_else(|| {
252                StoreError::Backend(format!(
253                    "gc: live checkpoint manifest `{}` could not be decoded",
254                    current.blob_ref
255                ))
256            })?;
257            stack.extend(retained_artifact_refs(&checkpoint));
258        }
259        let all_hashes = {
260            let mut stmt = tx
261                .prepare("SELECT hash FROM blobs ORDER BY hash ASC")
262                .map_err(sqlite_error)?;
263            let rows = stmt
264                .query_map([], |row| row.get::<_, String>(0))
265                .map_err(sqlite_error)?;
266            rows.collect::<Result<Vec<_>, _>>().map_err(sqlite_error)?
267        };
268        let mut deleted_blob_count = 0usize;
269        for hash in &all_hashes {
270            if retained.contains_key(hash) {
271                continue;
272            }
273            tx.execute("DELETE FROM blobs WHERE hash = ?1", params![hash])
274                .map_err(sqlite_error)?;
275            deleted_blob_count += 1;
276        }
277        Ok(GcReport {
278            root_count,
279            retained_blob_count: retained.len(),
280            deleted_blob_count,
281        })
282    }
283}