Skip to main content

lash_sqlite_store/
blobs.rs

1//! Content-addressed blob, artifact, checkpoint, and usage-ledger storage on
2//! [`Store`].
3//!
4//! Reference module (with `lifecycle.rs`) for the translation pattern. The
5//! `*_conn` helpers here are **synchronous** and take a `&rusqlite::Connection`
6//! so they can be reused from inside any `conn.call`/`conn.write` closure (the
7//! checkpoint/persistence/graph modules call them while already on the
8//! connection thread). The public async methods wrap a single helper call in
9//! `self.conn.call(...)`.
10
11use super::*;
12
13impl Store {
14    pub(crate) fn insert_artifact_blob_conn(
15        conn: &Connection,
16        descriptor: BlobArtifactDescriptor,
17        content: &[u8],
18        profile: BuiltinBlobProfile,
19    ) -> rusqlite::Result<BlobRef> {
20        let hash = format!("{:x}", Sha256::digest(content));
21        let stored = encode_artifact_blob(&descriptor, profile, content);
22        conn.execute(
23            "INSERT OR IGNORE INTO blobs (hash, content) VALUES (?1, ?2)",
24            params![hash, stored],
25        )?;
26        Ok(BlobRef(hash))
27    }
28
29    pub(crate) fn put_typed_artifact_blob_conn<T: serde::Serialize>(
30        conn: &Connection,
31        descriptor: BlobArtifactDescriptor,
32        value: &T,
33        profile: BuiltinBlobProfile,
34    ) -> rusqlite::Result<BlobRef> {
35        let bytes = encode_msgpack(value);
36        Self::insert_artifact_blob_conn(conn, descriptor, &bytes, profile)
37    }
38
39    pub(crate) fn put_checkpoint_conn(
40        conn: &Connection,
41        checkpoint: &HydratedSessionCheckpoint,
42        profile: BuiltinBlobProfile,
43    ) -> rusqlite::Result<StoredSessionCheckpoint> {
44        let tool_state_ref = match checkpoint.tool_state.as_ref() {
45            Some(snapshot) => Some(Self::put_typed_artifact_blob_conn(
46                conn,
47                BlobArtifactDescriptor::tool_state_snapshot(),
48                snapshot,
49                profile,
50            )?),
51            None => checkpoint.tool_state_ref.clone(),
52        };
53        let plugin_snapshot_ref = match checkpoint.plugin_snapshot.as_ref() {
54            Some(snapshot) => Some(Self::put_typed_artifact_blob_conn(
55                conn,
56                BlobArtifactDescriptor::plugin_session_snapshot(),
57                snapshot,
58                profile,
59            )?),
60            None => checkpoint.plugin_snapshot_ref.clone(),
61        };
62        let execution_state_ref = match checkpoint.execution_state.as_ref() {
63            Some(snapshot) => Some(Self::put_typed_artifact_blob_conn(
64                conn,
65                BlobArtifactDescriptor::execution_state_snapshot(),
66                snapshot,
67                profile,
68            )?),
69            None => checkpoint.execution_state_ref.clone(),
70        };
71        let manifest = SessionCheckpoint {
72            turn_state: checkpoint.turn_state.clone(),
73            tool_state_ref,
74            plugin_snapshot_ref,
75            plugin_snapshot_revision: checkpoint.plugin_snapshot_revision,
76            execution_state_ref,
77        };
78        let checkpoint_ref = Self::put_typed_artifact_blob_conn(
79            conn,
80            BlobArtifactDescriptor::checkpoint_manifest(),
81            &manifest,
82            profile,
83        )?;
84        Ok(StoredSessionCheckpoint {
85            checkpoint_ref,
86            manifest,
87        })
88    }
89
90    pub(crate) fn get_blob_conn(conn: &Connection, blob_ref: &BlobRef) -> Option<Vec<u8>> {
91        let bytes: Vec<u8> = conn
92            .query_row(
93                "SELECT content FROM blobs WHERE hash = ?1",
94                params![blob_ref.as_str()],
95                |row| row.get(0),
96            )
97            .optional()
98            .ok()
99            .flatten()?;
100        decode_artifact_blob(&bytes).or(Some(bytes))
101    }
102
103    pub(crate) fn get_typed_blob_conn<T: serde::de::DeserializeOwned>(
104        conn: &Connection,
105        blob_ref: &BlobRef,
106    ) -> Option<T> {
107        let bytes = Self::get_blob_conn(conn, blob_ref)?;
108        decode_msgpack(&bytes)
109    }
110
111    pub(crate) fn get_checkpoint_conn(
112        conn: &Connection,
113        blob_ref: &BlobRef,
114    ) -> Option<HydratedSessionCheckpoint> {
115        let record: SessionCheckpoint = Self::get_typed_blob_conn(conn, blob_ref)?;
116        Some(HydratedSessionCheckpoint {
117            turn_state: record.turn_state,
118            tool_state_ref: record.tool_state_ref.clone(),
119            tool_state: record
120                .tool_state_ref
121                .as_ref()
122                .and_then(|blob_ref| Self::get_typed_blob_conn(conn, blob_ref)),
123            plugin_snapshot_ref: record.plugin_snapshot_ref.clone(),
124            plugin_snapshot: record
125                .plugin_snapshot_ref
126                .as_ref()
127                .and_then(|blob_ref| Self::get_typed_blob_conn(conn, blob_ref)),
128            plugin_snapshot_revision: record.plugin_snapshot_revision,
129            execution_state_ref: record.execution_state_ref.clone(),
130            execution_state: record
131                .execution_state_ref
132                .as_ref()
133                .and_then(|blob_ref| Self::get_typed_blob_conn(conn, blob_ref)),
134        })
135    }
136
137    pub(crate) fn load_usage_deltas_conn(conn: &Connection) -> Vec<lash_core::TokenLedgerEntry> {
138        let mut stmt = match conn.prepare(
139            "SELECT source, model, input_tokens, output_tokens, cached_input_tokens, reasoning_tokens
140             FROM usage_deltas ORDER BY seq ASC",
141        ) {
142            Ok(stmt) => stmt,
143            Err(_) => return Vec::new(),
144        };
145        let rows = match stmt.query_map([], |row| {
146            Ok(lash_core::TokenLedgerEntry {
147                source: row.get(0)?,
148                model: row.get(1)?,
149                usage: lash_core::TokenUsage {
150                    input_tokens: row.get(2)?,
151                    output_tokens: row.get(3)?,
152                    cached_input_tokens: row.get(4)?,
153                    reasoning_tokens: row.get(5)?,
154                },
155            })
156        }) {
157            Ok(rows) => rows,
158            Err(_) => return Vec::new(),
159        };
160        rows.filter_map(Result::ok).collect()
161    }
162
163    pub async fn put_blob(&self, content: &[u8]) -> BlobRef {
164        let hash = format!("{:x}", Sha256::digest(content));
165        let hash_for_row = hash.clone();
166        let content = content.to_vec();
167        let result = self
168            .conn
169            .call(move |conn| {
170                conn.execute(
171                    "INSERT OR IGNORE INTO blobs (hash, content) VALUES (?1, ?2)",
172                    params![hash_for_row, content],
173                )
174            })
175            .await;
176        if let Err(err) = result {
177            tracing::warn!(error = %err, hash, "failed to persist checkpoint blob");
178        }
179        BlobRef(hash)
180    }
181
182    pub async fn put_artifact_blob(
183        &self,
184        descriptor: BlobArtifactDescriptor,
185        content: &[u8],
186    ) -> BlobRef {
187        let hash = format!("{:x}", Sha256::digest(content));
188        let stored = encode_artifact_blob(&descriptor, self.options.blob_profile, content);
189        let hash_for_row = hash.clone();
190        let result = self
191            .conn
192            .call(move |conn| {
193                conn.execute(
194                    "INSERT OR IGNORE INTO blobs (hash, content) VALUES (?1, ?2)",
195                    params![hash_for_row, stored],
196                )
197            })
198            .await;
199        if let Err(err) = result {
200            tracing::warn!(error = %err, hash, "failed to persist artifact blob");
201        }
202        BlobRef(hash)
203    }
204
205    pub async fn get_blob(&self, blob_ref: &BlobRef) -> Option<Vec<u8>> {
206        let blob_ref = blob_ref.clone();
207        self.conn
208            .call(move |conn| Ok(Self::get_blob_conn(conn, &blob_ref)))
209            .await
210            .ok()
211            .flatten()
212    }
213
214    pub async fn put_typed_blob<T: serde::Serialize>(&self, value: &T) -> BlobRef {
215        let bytes = encode_msgpack(value);
216        self.put_blob(&bytes).await
217    }
218
219    pub async fn put_typed_artifact_blob<T: serde::Serialize>(
220        &self,
221        descriptor: BlobArtifactDescriptor,
222        value: &T,
223    ) -> BlobRef {
224        let bytes = encode_msgpack(value);
225        self.put_artifact_blob(descriptor, &bytes).await
226    }
227
228    pub async fn get_typed_blob<T: serde::de::DeserializeOwned>(
229        &self,
230        blob_ref: &BlobRef,
231    ) -> Option<T> {
232        let bytes = self.get_blob(blob_ref).await?;
233        decode_msgpack(&bytes)
234    }
235
236    pub async fn put_checkpoint(
237        &self,
238        checkpoint: &HydratedSessionCheckpoint,
239    ) -> StoredSessionCheckpoint {
240        let checkpoint = checkpoint.clone();
241        let profile = self.options.blob_profile;
242        self.conn
243            .write(move |tx| Self::put_checkpoint_conn(tx, &checkpoint, profile))
244            .await
245            .expect("checkpoint blob should persist")
246    }
247
248    pub async fn get_checkpoint(&self, blob_ref: &BlobRef) -> Option<HydratedSessionCheckpoint> {
249        let blob_ref = blob_ref.clone();
250        self.conn
251            .call(move |conn| Ok(Self::get_checkpoint_conn(conn, &blob_ref)))
252            .await
253            .ok()
254            .flatten()
255    }
256
257    pub async fn append_usage_deltas(&self, entries: &[lash_core::TokenLedgerEntry]) {
258        if entries.is_empty() {
259            return;
260        }
261        let entries = entries.to_vec();
262        let result = self
263            .conn
264            .write(move |tx| {
265                let mut stmt = tx.prepare(
266                    "INSERT INTO usage_deltas (
267                        source, model, input_tokens, output_tokens, cached_input_tokens, reasoning_tokens
268                    ) VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
269                )?;
270                for entry in &entries {
271                    stmt.execute(params![
272                        entry.source,
273                        entry.model,
274                        entry.usage.input_tokens,
275                        entry.usage.output_tokens,
276                        entry.usage.cached_input_tokens,
277                        entry.usage.reasoning_tokens,
278                    ])?;
279                }
280                Ok(())
281            })
282            .await;
283        if let Err(err) = result {
284            tracing::warn!(error = %err, "failed to persist usage deltas");
285        }
286    }
287
288    pub async fn load_usage_deltas(&self) -> Vec<lash_core::TokenLedgerEntry> {
289        self.conn
290            .call(|conn| Ok(Self::load_usage_deltas_conn(conn)))
291            .await
292            .unwrap_or_default()
293    }
294}