Skip to main content

lash_sqlite_store/
blobs.rs

1//! Content-addressed blob, artifact, checkpoint, and usage-ledger storage on
2//! [`Store`].
3//!
4//! Reference module (with `lifecycle.rs`) for the translation pattern. The
5//! `*_conn` helpers here are **synchronous** and take a `&rusqlite::Connection`
6//! so they can be reused from inside any `conn.call`/`conn.write` closure (the
7//! checkpoint/persistence/graph modules call them while already on the
8//! connection thread). The public async methods wrap a single helper call in
9//! `self.conn.call(...)`.
10
11use super::*;
12
13/// Hex SHA-256 content address that keys every row in the `blobs` table.
14fn blob_content_hash(content: &[u8]) -> String {
15    format!("{:x}", Sha256::digest(content))
16}
17
18impl Store {
19    pub(crate) fn insert_artifact_blob_conn(
20        conn: &Connection,
21        descriptor: BlobArtifactDescriptor,
22        content: &[u8],
23        profile: BuiltinBlobProfile,
24    ) -> rusqlite::Result<BlobRef> {
25        let hash = blob_content_hash(content);
26        let stored = encode_artifact_blob(&descriptor, profile, content);
27        conn.execute(
28            "INSERT OR IGNORE INTO blobs (hash, content) VALUES (?1, ?2)",
29            params![hash, stored],
30        )?;
31        Ok(BlobRef(hash))
32    }
33
34    pub(crate) fn put_typed_artifact_blob_conn<T: serde::Serialize>(
35        conn: &Connection,
36        descriptor: BlobArtifactDescriptor,
37        value: &T,
38        profile: BuiltinBlobProfile,
39    ) -> rusqlite::Result<BlobRef> {
40        let bytes = encode_msgpack(value);
41        Self::insert_artifact_blob_conn(conn, descriptor, &bytes, profile)
42    }
43
44    pub(crate) fn put_checkpoint_conn(
45        conn: &Connection,
46        checkpoint: &HydratedSessionCheckpoint,
47        profile: BuiltinBlobProfile,
48    ) -> rusqlite::Result<StoredSessionCheckpoint> {
49        let tool_state_ref = match checkpoint.tool_state.as_ref() {
50            Some(snapshot) => Some(Self::put_typed_artifact_blob_conn(
51                conn,
52                BlobArtifactDescriptor::tool_state_snapshot(),
53                snapshot,
54                profile,
55            )?),
56            None => checkpoint.tool_state_ref.clone(),
57        };
58        let plugin_snapshot_ref = match checkpoint.plugin_snapshot.as_ref() {
59            Some(snapshot) => Some(Self::put_typed_artifact_blob_conn(
60                conn,
61                BlobArtifactDescriptor::plugin_session_snapshot(),
62                snapshot,
63                profile,
64            )?),
65            None => checkpoint.plugin_snapshot_ref.clone(),
66        };
67        let execution_state_ref = match checkpoint.execution_state.as_ref() {
68            Some(snapshot) => Some(Self::put_typed_artifact_blob_conn(
69                conn,
70                BlobArtifactDescriptor::execution_state_snapshot(),
71                snapshot,
72                profile,
73            )?),
74            None => checkpoint.execution_state_ref.clone(),
75        };
76        let manifest = SessionCheckpoint::new(
77            checkpoint.turn_state.clone(),
78            tool_state_ref,
79            plugin_snapshot_ref,
80            checkpoint.plugin_snapshot_revision,
81            execution_state_ref,
82        );
83        let checkpoint_ref = Self::put_typed_artifact_blob_conn(
84            conn,
85            BlobArtifactDescriptor::checkpoint_manifest(),
86            &manifest,
87            profile,
88        )?;
89        Ok(StoredSessionCheckpoint {
90            checkpoint_ref,
91            manifest,
92        })
93    }
94
95    pub(crate) fn get_blob_conn(conn: &Connection, blob_ref: &BlobRef) -> Option<Vec<u8>> {
96        let bytes: Vec<u8> = conn
97            .query_row(
98                "SELECT content FROM blobs WHERE hash = ?1",
99                params![blob_ref.as_str()],
100                |row| row.get(0),
101            )
102            .optional()
103            .ok()
104            .flatten()?;
105        decode_artifact_blob(&bytes).or(Some(bytes))
106    }
107
108    pub(crate) fn get_typed_blob_conn<T: serde::de::DeserializeOwned>(
109        conn: &Connection,
110        blob_ref: &BlobRef,
111    ) -> Option<T> {
112        let bytes = Self::get_blob_conn(conn, blob_ref)?;
113        decode_msgpack(&bytes)
114    }
115
116    pub(crate) fn get_checkpoint_conn(
117        conn: &Connection,
118        blob_ref: &BlobRef,
119    ) -> Result<Option<HydratedSessionCheckpoint>, StoreError> {
120        let Some(bytes) = Self::get_blob_conn(conn, blob_ref) else {
121            return Ok(None);
122        };
123        let record = decode_checkpoint(&bytes)?;
124        Ok(Some(HydratedSessionCheckpoint {
125            turn_state: record.turn_state,
126            tool_state_ref: record.tool_state_ref.clone(),
127            tool_state: record
128                .tool_state_ref
129                .as_ref()
130                .and_then(|blob_ref| Self::get_typed_blob_conn(conn, blob_ref)),
131            plugin_snapshot_ref: record.plugin_snapshot_ref.clone(),
132            plugin_snapshot: record
133                .plugin_snapshot_ref
134                .as_ref()
135                .and_then(|blob_ref| Self::get_typed_blob_conn(conn, blob_ref)),
136            plugin_snapshot_revision: record.plugin_snapshot_revision,
137            execution_state_ref: record.execution_state_ref.clone(),
138            execution_state: record
139                .execution_state_ref
140                .as_ref()
141                .and_then(|blob_ref| Self::get_typed_blob_conn(conn, blob_ref)),
142        }))
143    }
144
145    pub(crate) fn load_usage_deltas_conn(conn: &Connection) -> Vec<lash_core::TokenLedgerEntry> {
146        let mut stmt = match conn.prepare(
147            "SELECT source, model, input_tokens, output_tokens, cache_read_input_tokens, cache_write_input_tokens, reasoning_output_tokens
148             FROM usage_deltas ORDER BY seq ASC",
149        ) {
150            Ok(stmt) => stmt,
151            Err(_) => return Vec::new(),
152        };
153        let rows = match stmt.query_map([], |row| {
154            Ok(lash_core::TokenLedgerEntry {
155                source: row.get(0)?,
156                model: row.get(1)?,
157                usage: lash_core::TokenUsage {
158                    input_tokens: row.get(2)?,
159                    output_tokens: row.get(3)?,
160                    cache_read_input_tokens: row.get(4)?,
161                    cache_write_input_tokens: row.get(5)?,
162                    reasoning_output_tokens: row.get(6)?,
163                },
164            })
165        }) {
166            Ok(rows) => rows,
167            Err(_) => return Vec::new(),
168        };
169        rows.filter_map(Result::ok).collect()
170    }
171
172    /// Persist `stored` bytes under `hash` in the `blobs` table, warning with
173    /// `warn_label` (and dropping the row) if the write fails.
174    async fn insert_blob_row(&self, hash: String, stored: Vec<u8>, warn_label: &str) -> BlobRef {
175        let hash_for_row = hash.clone();
176        let result = self
177            .conn
178            .call(move |conn| {
179                conn.execute(
180                    "INSERT OR IGNORE INTO blobs (hash, content) VALUES (?1, ?2)",
181                    params![hash_for_row, stored],
182                )
183            })
184            .await;
185        if let Err(err) = result {
186            tracing::warn!(error = %err, hash, "{warn_label}");
187        }
188        BlobRef(hash)
189    }
190
191    pub async fn put_blob(&self, content: &[u8]) -> BlobRef {
192        let hash = blob_content_hash(content);
193        self.insert_blob_row(hash, content.to_vec(), "failed to persist checkpoint blob")
194            .await
195    }
196
197    pub async fn put_artifact_blob(
198        &self,
199        descriptor: BlobArtifactDescriptor,
200        content: &[u8],
201    ) -> BlobRef {
202        let hash = blob_content_hash(content);
203        let stored = encode_artifact_blob(&descriptor, self.options.blob_profile, content);
204        self.insert_blob_row(hash, stored, "failed to persist artifact blob")
205            .await
206    }
207
208    pub async fn get_blob(&self, blob_ref: &BlobRef) -> Option<Vec<u8>> {
209        let blob_ref = blob_ref.clone();
210        self.conn
211            .call(move |conn| Ok(Self::get_blob_conn(conn, &blob_ref)))
212            .await
213            .ok()
214            .flatten()
215    }
216
217    pub async fn put_typed_blob<T: serde::Serialize>(&self, value: &T) -> BlobRef {
218        let bytes = encode_msgpack(value);
219        self.put_blob(&bytes).await
220    }
221
222    pub async fn put_typed_artifact_blob<T: serde::Serialize>(
223        &self,
224        descriptor: BlobArtifactDescriptor,
225        value: &T,
226    ) -> BlobRef {
227        let bytes = encode_msgpack(value);
228        self.put_artifact_blob(descriptor, &bytes).await
229    }
230
231    pub async fn get_typed_blob<T: serde::de::DeserializeOwned>(
232        &self,
233        blob_ref: &BlobRef,
234    ) -> Option<T> {
235        let bytes = self.get_blob(blob_ref).await?;
236        decode_msgpack(&bytes)
237    }
238
239    pub async fn put_checkpoint(
240        &self,
241        checkpoint: &HydratedSessionCheckpoint,
242    ) -> StoredSessionCheckpoint {
243        let checkpoint = checkpoint.clone();
244        let profile = self.options.blob_profile;
245        self.conn
246            .write(move |tx| Self::put_checkpoint_conn(tx, &checkpoint, profile))
247            .await
248            .expect("checkpoint blob should persist")
249    }
250
251    pub async fn get_checkpoint(&self, blob_ref: &BlobRef) -> Option<HydratedSessionCheckpoint> {
252        let blob_ref = blob_ref.clone();
253        self.conn
254            .call(move |conn| Ok(Self::get_checkpoint_conn(conn, &blob_ref)))
255            .await
256            .ok()
257            .and_then(Result::ok)
258            .flatten()
259    }
260
261    pub async fn append_usage_deltas(&self, entries: &[lash_core::TokenLedgerEntry]) {
262        if entries.is_empty() {
263            return;
264        }
265        let entries = entries.to_vec();
266        let result = self
267            .conn
268            .write(move |tx| {
269                let mut stmt = tx.prepare(
270                    "INSERT INTO usage_deltas (
271                        source, model, input_tokens, output_tokens, cache_read_input_tokens, cache_write_input_tokens, reasoning_output_tokens
272                    ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
273                )?;
274                for entry in &entries {
275                    stmt.execute(params![
276                        entry.source,
277                        entry.model,
278                        entry.usage.input_tokens,
279                        entry.usage.output_tokens,
280                        entry.usage.cache_read_input_tokens,
281                        entry.usage.cache_write_input_tokens,
282                        entry.usage.reasoning_output_tokens,
283                    ])?;
284                }
285                Ok(())
286            })
287            .await;
288        if let Err(err) = result {
289            tracing::warn!(error = %err, "failed to persist usage deltas");
290        }
291    }
292
293    pub async fn load_usage_deltas(&self) -> Vec<lash_core::TokenLedgerEntry> {
294        self.conn
295            .call(|conn| Ok(Self::load_usage_deltas_conn(conn)))
296            .await
297            .unwrap_or_default()
298    }
299}