lash_sqlite_store/
graph.rs1use super::*;
17
18impl Store {
19 pub(crate) fn load_session_graph_from_conn(
20 conn: &Connection,
21 leaf_node_id: Option<String>,
22 ) -> lash_core::SessionGraph {
23 let mut stmt = match conn
26 .prepare("SELECT node_json FROM graph_nodes WHERE tombstoned = 0 ORDER BY seq ASC")
27 {
28 Ok(stmt) => stmt,
29 Err(err) => {
30 tracing::warn!(error = %err, "failed to prepare graph load statement");
31 return lash_core::SessionGraph::from_nodes(Vec::new(), leaf_node_id);
32 }
33 };
34 let rows = match stmt.query_map([], |row| row.get::<_, String>(0)) {
35 Ok(rows) => rows,
36 Err(err) => {
37 tracing::warn!(error = %err, "failed to query graph rows");
38 return lash_core::SessionGraph::from_nodes(Vec::new(), leaf_node_id);
39 }
40 };
41 let nodes = rows
42 .filter_map(Result::ok)
43 .filter_map(|node_json| {
44 serde_json::from_str::<lash_core::SessionNodeRecord>(&node_json).ok()
45 })
46 .collect();
47 lash_core::SessionGraph::from_nodes(nodes, leaf_node_id)
48 }
49
50 pub(crate) fn load_active_path_session_graph_from_conn(
51 conn: &Connection,
52 leaf_node_id: Option<String>,
53 ) -> rusqlite::Result<lash_core::SessionGraph> {
54 let Some(leaf_node_id) = leaf_node_id else {
55 return Ok(lash_core::SessionGraph::default());
56 };
57 let mut stmt = conn.prepare(
58 "WITH RECURSIVE active(node_id, node_json, parent_node_id, depth) AS (
59 SELECT
60 node_id,
61 node_json,
62 json_extract(node_json, '$.parent_node_id'),
63 0
64 FROM graph_nodes
65 WHERE node_id = ?1 AND tombstoned = 0
66 UNION ALL
67 SELECT
68 g.node_id,
69 g.node_json,
70 json_extract(g.node_json, '$.parent_node_id'),
71 active.depth + 1
72 FROM graph_nodes g
73 JOIN active ON g.node_id = active.parent_node_id
74 WHERE g.tombstoned = 0
75 )
76 SELECT node_json FROM active ORDER BY depth DESC",
77 )?;
78 let rows = stmt.query_map(params![leaf_node_id.as_str()], |row| {
79 row.get::<_, String>(0)
80 })?;
81 let mut nodes = Vec::new();
82 for row in rows {
83 let node_json = row?;
84 if let Ok(node) = serde_json::from_str::<lash_core::SessionNodeRecord>(&node_json) {
85 nodes.push(node);
86 }
87 }
88 Ok(lash_core::SessionGraph::from_nodes(
89 nodes,
90 Some(leaf_node_id),
91 ))
92 }
93
94 pub(crate) async fn maybe_auto_gc(&self) {
95 let Some(interval) = self.options.gc_policy.auto_run_every_commits else {
96 return;
97 };
98 let commits = self.commit_count.fetch_add(1, AtomicOrdering::Relaxed) + 1;
99 if interval != 0 && commits % interval == 0 {
100 let _ = self.gc_unreachable().await;
101 }
102 }
103
104 pub async fn replace_session_graph(&self, graph: &lash_core::SessionGraph) {
105 let nodes = graph.nodes.clone();
106 let result = self
107 .conn
108 .write(move |tx| {
109 tx.execute("DELETE FROM graph_nodes", [])?;
110 let mut stmt =
111 tx.prepare("INSERT INTO graph_nodes (node_id, node_json) VALUES (?1, ?2)")?;
112 for node in &nodes {
113 let node_json = encode_json(node);
114 stmt.execute(params![node.node_id, node_json])?;
115 }
116 Ok(())
117 })
118 .await;
119 if let Err(err) = result {
120 tracing::warn!(error = %err, "failed to replace session graph");
121 }
122 }
123
124 pub async fn append_session_graph_nodes(&self, nodes: &[lash_core::SessionNodeRecord]) {
125 if nodes.is_empty() {
126 return;
127 }
128 let nodes = nodes.to_vec();
129 let result = self
130 .conn
131 .write(move |tx| {
132 let mut stmt =
133 tx.prepare("INSERT INTO graph_nodes (node_id, node_json) VALUES (?1, ?2)")?;
134 for node in &nodes {
135 let node_json = encode_json(node);
136 stmt.execute(params![node.node_id, node_json])?;
137 }
138 Ok(())
139 })
140 .await;
141 if let Err(err) = result {
142 tracing::warn!(error = %err, "failed to append session graph nodes");
143 }
144 }
145
146 pub async fn load_session_graph(&self) -> lash_core::SessionGraph {
147 self.conn
148 .call(|conn| Ok(Self::load_session_graph_from_conn(conn, None)))
149 .await
150 .unwrap_or_else(|_| lash_core::SessionGraph::from_nodes(Vec::new(), None))
151 }
152
153 pub async fn gc_unreachable(&self) -> GcReport {
154 match self.try_gc_unreachable().await {
155 Ok(report) => report,
156 Err(err) => {
157 tracing::warn!(error = %err, "gc_unreachable failed; retaining all blobs");
161 GcReport::default()
162 }
163 }
164 }
165
166 fn live_checkpoint_roots(conn: &Connection) -> Result<Vec<RetainedArtifactRef>, StoreError> {
173 let mut roots = Vec::new();
174 if let Some(checkpoint_ref) = load_session_head_meta_from_conn(conn)
175 .as_ref()
176 .and_then(|meta| meta.checkpoint_ref.as_ref())
177 .cloned()
178 {
179 roots.push(RetainedArtifactRef {
180 blob_ref: checkpoint_ref,
181 kind: PersistedArtifactKind::CheckpointManifest,
182 });
183 }
184 Ok(roots)
185 }
186
187 async fn try_gc_unreachable(&self) -> Result<GcReport, StoreError> {
188 self.conn
189 .write(|tx| {
190 Self::gc_unreachable_in_tx(tx).map_err(|err| {
191 rusqlite::Error::ToSqlConversionFailure(Box::new(std::io::Error::other(
192 err.to_string(),
193 )))
194 })
195 })
196 .await
197 .map_err(sqlite_error)
198 }
199
200 fn gc_unreachable_in_tx(tx: &Transaction<'_>) -> Result<GcReport, StoreError> {
204 let mut roots = Self::live_checkpoint_roots(tx)?;
205 {
206 let mut stmt = tx
207 .prepare("SELECT blob_ref FROM artifact_refs ORDER BY artifact_ref")
208 .map_err(sqlite_error)?;
209 let rows = stmt
210 .query_map([], |row| row.get::<_, String>(0))
211 .map_err(sqlite_error)?;
212 for row in rows {
213 roots.push(RetainedArtifactRef {
214 blob_ref: BlobRef(row.map_err(sqlite_error)?),
215 kind: PersistedArtifactKind::LashlangModule,
216 });
217 }
218 }
219 let root_count = roots.len();
220 let mut retained = std::collections::BTreeMap::<String, PersistedArtifactKind>::new();
221 let mut stack = roots;
222 while let Some(current) = stack.pop() {
223 if retained
224 .insert(current.blob_ref.0.clone(), current.kind)
225 .is_some()
226 {
227 continue;
228 }
229 if current.kind != PersistedArtifactKind::CheckpointManifest {
230 continue;
231 }
232 let bytes: Option<Vec<u8>> = tx
240 .query_row(
241 "SELECT content FROM blobs WHERE hash = ?1",
242 params![current.blob_ref.as_str()],
243 |row| row.get::<_, Vec<u8>>(0),
244 )
245 .optional()
246 .map_err(sqlite_error)?;
247 let Some(bytes) = bytes else {
248 continue;
249 };
250 let content = decode_artifact_blob(&bytes).unwrap_or(bytes);
251 let checkpoint = decode_checkpoint(&content).ok_or_else(|| {
252 StoreError::Backend(format!(
253 "gc: live checkpoint manifest `{}` could not be decoded",
254 current.blob_ref
255 ))
256 })?;
257 stack.extend(retained_artifact_refs(&checkpoint));
258 }
259 let all_hashes = {
260 let mut stmt = tx
261 .prepare("SELECT hash FROM blobs ORDER BY hash ASC")
262 .map_err(sqlite_error)?;
263 let rows = stmt
264 .query_map([], |row| row.get::<_, String>(0))
265 .map_err(sqlite_error)?;
266 rows.collect::<Result<Vec<_>, _>>().map_err(sqlite_error)?
267 };
268 let mut deleted_blob_count = 0usize;
269 for hash in &all_hashes {
270 if retained.contains_key(hash) {
271 continue;
272 }
273 tx.execute("DELETE FROM blobs WHERE hash = ?1", params![hash])
274 .map_err(sqlite_error)?;
275 deleted_blob_count += 1;
276 }
277 Ok(GcReport {
278 root_count,
279 retained_blob_count: retained.len(),
280 deleted_blob_count,
281 })
282 }
283}