Skip to main content

lash_sqlite_store/
blobs.rs

1//! Content-addressed blob, artifact, checkpoint, and usage-ledger storage on
2//! [`Store`].
3//!
4//! Reference module (with `lifecycle.rs`) for the translation pattern. The
5//! `*_conn` helpers here are **synchronous** and take a `&rusqlite::Connection`
6//! so they can be reused from inside any `conn.call`/`conn.write` closure (the
7//! checkpoint/persistence/graph modules call them while already on the
8//! connection thread). The public async methods wrap a single helper call in
9//! `self.conn.call(...)`.
10
11use super::*;
12
13impl Store {
14    pub(crate) fn insert_artifact_blob_conn(
15        conn: &Connection,
16        descriptor: BlobArtifactDescriptor,
17        content: &[u8],
18        profile: BuiltinBlobProfile,
19    ) -> rusqlite::Result<BlobRef> {
20        let hash = format!("{:x}", Sha256::digest(content));
21        let stored = encode_artifact_blob(&descriptor, profile, content);
22        conn.execute(
23            "INSERT OR IGNORE INTO blobs (hash, content) VALUES (?1, ?2)",
24            params![hash, stored],
25        )?;
26        Ok(BlobRef(hash))
27    }
28
29    pub(crate) fn put_typed_artifact_blob_conn<T: serde::Serialize>(
30        conn: &Connection,
31        descriptor: BlobArtifactDescriptor,
32        value: &T,
33        profile: BuiltinBlobProfile,
34    ) -> rusqlite::Result<BlobRef> {
35        let bytes = encode_msgpack(value);
36        Self::insert_artifact_blob_conn(conn, descriptor, &bytes, profile)
37    }
38
39    pub(crate) fn put_checkpoint_conn(
40        conn: &Connection,
41        checkpoint: &HydratedSessionCheckpoint,
42        profile: BuiltinBlobProfile,
43    ) -> rusqlite::Result<StoredSessionCheckpoint> {
44        let tool_state_ref = match checkpoint.tool_state.as_ref() {
45            Some(snapshot) => Some(Self::put_typed_artifact_blob_conn(
46                conn,
47                BlobArtifactDescriptor::tool_state_snapshot(),
48                snapshot,
49                profile,
50            )?),
51            None => checkpoint.tool_state_ref.clone(),
52        };
53        let plugin_snapshot_ref = match checkpoint.plugin_snapshot.as_ref() {
54            Some(snapshot) => Some(Self::put_typed_artifact_blob_conn(
55                conn,
56                BlobArtifactDescriptor::plugin_session_snapshot(),
57                snapshot,
58                profile,
59            )?),
60            None => checkpoint.plugin_snapshot_ref.clone(),
61        };
62        let execution_state_ref = match checkpoint.execution_state.as_ref() {
63            Some(snapshot) => Some(Self::put_typed_artifact_blob_conn(
64                conn,
65                BlobArtifactDescriptor::execution_state_snapshot(),
66                snapshot,
67                profile,
68            )?),
69            None => checkpoint.execution_state_ref.clone(),
70        };
71        let manifest = SessionCheckpoint::new(
72            checkpoint.turn_state.clone(),
73            tool_state_ref,
74            plugin_snapshot_ref,
75            checkpoint.plugin_snapshot_revision,
76            execution_state_ref,
77        );
78        let checkpoint_ref = Self::put_typed_artifact_blob_conn(
79            conn,
80            BlobArtifactDescriptor::checkpoint_manifest(),
81            &manifest,
82            profile,
83        )?;
84        Ok(StoredSessionCheckpoint {
85            checkpoint_ref,
86            manifest,
87        })
88    }
89
90    pub(crate) fn get_blob_conn(conn: &Connection, blob_ref: &BlobRef) -> Option<Vec<u8>> {
91        let bytes: Vec<u8> = conn
92            .query_row(
93                "SELECT content FROM blobs WHERE hash = ?1",
94                params![blob_ref.as_str()],
95                |row| row.get(0),
96            )
97            .optional()
98            .ok()
99            .flatten()?;
100        decode_artifact_blob(&bytes).or(Some(bytes))
101    }
102
103    pub(crate) fn get_typed_blob_conn<T: serde::de::DeserializeOwned>(
104        conn: &Connection,
105        blob_ref: &BlobRef,
106    ) -> Option<T> {
107        let bytes = Self::get_blob_conn(conn, blob_ref)?;
108        decode_msgpack(&bytes)
109    }
110
111    pub(crate) fn get_checkpoint_conn(
112        conn: &Connection,
113        blob_ref: &BlobRef,
114    ) -> Result<Option<HydratedSessionCheckpoint>, StoreError> {
115        let Some(bytes) = Self::get_blob_conn(conn, blob_ref) else {
116            return Ok(None);
117        };
118        let record = decode_checkpoint(&bytes)?;
119        Ok(Some(HydratedSessionCheckpoint {
120            turn_state: record.turn_state,
121            tool_state_ref: record.tool_state_ref.clone(),
122            tool_state: record
123                .tool_state_ref
124                .as_ref()
125                .and_then(|blob_ref| Self::get_typed_blob_conn(conn, blob_ref)),
126            plugin_snapshot_ref: record.plugin_snapshot_ref.clone(),
127            plugin_snapshot: record
128                .plugin_snapshot_ref
129                .as_ref()
130                .and_then(|blob_ref| Self::get_typed_blob_conn(conn, blob_ref)),
131            plugin_snapshot_revision: record.plugin_snapshot_revision,
132            execution_state_ref: record.execution_state_ref.clone(),
133            execution_state: record
134                .execution_state_ref
135                .as_ref()
136                .and_then(|blob_ref| Self::get_typed_blob_conn(conn, blob_ref)),
137        }))
138    }
139
140    pub(crate) fn load_usage_deltas_conn(conn: &Connection) -> Vec<lash_core::TokenLedgerEntry> {
141        let mut stmt = match conn.prepare(
142            "SELECT source, model, input_tokens, output_tokens, cached_input_tokens, reasoning_tokens
143             FROM usage_deltas ORDER BY seq ASC",
144        ) {
145            Ok(stmt) => stmt,
146            Err(_) => return Vec::new(),
147        };
148        let rows = match stmt.query_map([], |row| {
149            Ok(lash_core::TokenLedgerEntry {
150                source: row.get(0)?,
151                model: row.get(1)?,
152                usage: lash_core::TokenUsage {
153                    input_tokens: row.get(2)?,
154                    output_tokens: row.get(3)?,
155                    cached_input_tokens: row.get(4)?,
156                    reasoning_tokens: row.get(5)?,
157                },
158            })
159        }) {
160            Ok(rows) => rows,
161            Err(_) => return Vec::new(),
162        };
163        rows.filter_map(Result::ok).collect()
164    }
165
166    pub async fn put_blob(&self, content: &[u8]) -> BlobRef {
167        let hash = format!("{:x}", Sha256::digest(content));
168        let hash_for_row = hash.clone();
169        let content = content.to_vec();
170        let result = self
171            .conn
172            .call(move |conn| {
173                conn.execute(
174                    "INSERT OR IGNORE INTO blobs (hash, content) VALUES (?1, ?2)",
175                    params![hash_for_row, content],
176                )
177            })
178            .await;
179        if let Err(err) = result {
180            tracing::warn!(error = %err, hash, "failed to persist checkpoint blob");
181        }
182        BlobRef(hash)
183    }
184
185    pub async fn put_artifact_blob(
186        &self,
187        descriptor: BlobArtifactDescriptor,
188        content: &[u8],
189    ) -> BlobRef {
190        let hash = format!("{:x}", Sha256::digest(content));
191        let stored = encode_artifact_blob(&descriptor, self.options.blob_profile, content);
192        let hash_for_row = hash.clone();
193        let result = self
194            .conn
195            .call(move |conn| {
196                conn.execute(
197                    "INSERT OR IGNORE INTO blobs (hash, content) VALUES (?1, ?2)",
198                    params![hash_for_row, stored],
199                )
200            })
201            .await;
202        if let Err(err) = result {
203            tracing::warn!(error = %err, hash, "failed to persist artifact blob");
204        }
205        BlobRef(hash)
206    }
207
208    pub async fn get_blob(&self, blob_ref: &BlobRef) -> Option<Vec<u8>> {
209        let blob_ref = blob_ref.clone();
210        self.conn
211            .call(move |conn| Ok(Self::get_blob_conn(conn, &blob_ref)))
212            .await
213            .ok()
214            .flatten()
215    }
216
217    pub async fn put_typed_blob<T: serde::Serialize>(&self, value: &T) -> BlobRef {
218        let bytes = encode_msgpack(value);
219        self.put_blob(&bytes).await
220    }
221
222    pub async fn put_typed_artifact_blob<T: serde::Serialize>(
223        &self,
224        descriptor: BlobArtifactDescriptor,
225        value: &T,
226    ) -> BlobRef {
227        let bytes = encode_msgpack(value);
228        self.put_artifact_blob(descriptor, &bytes).await
229    }
230
231    pub async fn get_typed_blob<T: serde::de::DeserializeOwned>(
232        &self,
233        blob_ref: &BlobRef,
234    ) -> Option<T> {
235        let bytes = self.get_blob(blob_ref).await?;
236        decode_msgpack(&bytes)
237    }
238
239    pub async fn put_checkpoint(
240        &self,
241        checkpoint: &HydratedSessionCheckpoint,
242    ) -> StoredSessionCheckpoint {
243        let checkpoint = checkpoint.clone();
244        let profile = self.options.blob_profile;
245        self.conn
246            .write(move |tx| Self::put_checkpoint_conn(tx, &checkpoint, profile))
247            .await
248            .expect("checkpoint blob should persist")
249    }
250
251    pub async fn get_checkpoint(&self, blob_ref: &BlobRef) -> Option<HydratedSessionCheckpoint> {
252        let blob_ref = blob_ref.clone();
253        self.conn
254            .call(move |conn| Ok(Self::get_checkpoint_conn(conn, &blob_ref)))
255            .await
256            .ok()
257            .and_then(Result::ok)
258            .flatten()
259    }
260
261    pub async fn append_usage_deltas(&self, entries: &[lash_core::TokenLedgerEntry]) {
262        if entries.is_empty() {
263            return;
264        }
265        let entries = entries.to_vec();
266        let result = self
267            .conn
268            .write(move |tx| {
269                let mut stmt = tx.prepare(
270                    "INSERT INTO usage_deltas (
271                        source, model, input_tokens, output_tokens, cached_input_tokens, reasoning_tokens
272                    ) VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
273                )?;
274                for entry in &entries {
275                    stmt.execute(params![
276                        entry.source,
277                        entry.model,
278                        entry.usage.input_tokens,
279                        entry.usage.output_tokens,
280                        entry.usage.cached_input_tokens,
281                        entry.usage.reasoning_tokens,
282                    ])?;
283                }
284                Ok(())
285            })
286            .await;
287        if let Err(err) = result {
288            tracing::warn!(error = %err, "failed to persist usage deltas");
289        }
290    }
291
292    pub async fn load_usage_deltas(&self) -> Vec<lash_core::TokenLedgerEntry> {
293        self.conn
294            .call(|conn| Ok(Self::load_usage_deltas_conn(conn)))
295            .await
296            .unwrap_or_default()
297    }
298}