1use super::*;
12
13fn blob_content_hash(content: &[u8]) -> String {
15 format!("{:x}", Sha256::digest(content))
16}
17
18impl Store {
19 pub(crate) fn insert_artifact_blob_conn(
20 conn: &Connection,
21 descriptor: BlobArtifactDescriptor,
22 content: &[u8],
23 profile: BuiltinBlobProfile,
24 ) -> rusqlite::Result<BlobRef> {
25 let hash = blob_content_hash(content);
26 let stored = encode_artifact_blob(&descriptor, profile, content);
27 conn.execute(
28 "INSERT OR IGNORE INTO blobs (hash, content) VALUES (?1, ?2)",
29 params![hash, stored],
30 )?;
31 Ok(BlobRef(hash))
32 }
33
34 pub(crate) fn put_typed_artifact_blob_conn<T: serde::Serialize>(
35 conn: &Connection,
36 descriptor: BlobArtifactDescriptor,
37 value: &T,
38 profile: BuiltinBlobProfile,
39 ) -> rusqlite::Result<BlobRef> {
40 let bytes = encode_msgpack(value);
41 Self::insert_artifact_blob_conn(conn, descriptor, &bytes, profile)
42 }
43
44 pub(crate) fn put_checkpoint_conn(
45 conn: &Connection,
46 checkpoint: &HydratedSessionCheckpoint,
47 profile: BuiltinBlobProfile,
48 ) -> rusqlite::Result<StoredSessionCheckpoint> {
49 let tool_state_ref = match checkpoint.tool_state.as_ref() {
50 Some(snapshot) => Some(Self::put_typed_artifact_blob_conn(
51 conn,
52 BlobArtifactDescriptor::tool_state_snapshot(),
53 snapshot,
54 profile,
55 )?),
56 None => checkpoint.tool_state_ref.clone(),
57 };
58 let plugin_snapshot_ref = match checkpoint.plugin_snapshot.as_ref() {
59 Some(snapshot) => Some(Self::put_typed_artifact_blob_conn(
60 conn,
61 BlobArtifactDescriptor::plugin_session_snapshot(),
62 snapshot,
63 profile,
64 )?),
65 None => checkpoint.plugin_snapshot_ref.clone(),
66 };
67 let execution_state_ref = match checkpoint.execution_state.as_ref() {
68 Some(snapshot) => Some(Self::put_typed_artifact_blob_conn(
69 conn,
70 BlobArtifactDescriptor::execution_state_snapshot(),
71 snapshot,
72 profile,
73 )?),
74 None => checkpoint.execution_state_ref.clone(),
75 };
76 let manifest = SessionCheckpoint::new(
77 checkpoint.turn_state.clone(),
78 tool_state_ref,
79 plugin_snapshot_ref,
80 checkpoint.plugin_snapshot_revision,
81 execution_state_ref,
82 );
83 let checkpoint_ref = Self::put_typed_artifact_blob_conn(
84 conn,
85 BlobArtifactDescriptor::checkpoint_manifest(),
86 &manifest,
87 profile,
88 )?;
89 Ok(StoredSessionCheckpoint {
90 checkpoint_ref,
91 manifest,
92 })
93 }
94
95 pub(crate) fn get_blob_conn(conn: &Connection, blob_ref: &BlobRef) -> Option<Vec<u8>> {
96 let bytes: Vec<u8> = conn
97 .query_row(
98 "SELECT content FROM blobs WHERE hash = ?1",
99 params![blob_ref.as_str()],
100 |row| row.get(0),
101 )
102 .optional()
103 .ok()
104 .flatten()?;
105 decode_artifact_blob(&bytes).or(Some(bytes))
106 }
107
108 pub(crate) fn get_typed_blob_conn<T: serde::de::DeserializeOwned>(
109 conn: &Connection,
110 blob_ref: &BlobRef,
111 ) -> Option<T> {
112 let bytes = Self::get_blob_conn(conn, blob_ref)?;
113 decode_msgpack(&bytes)
114 }
115
116 pub(crate) fn get_checkpoint_conn(
117 conn: &Connection,
118 blob_ref: &BlobRef,
119 ) -> Result<Option<HydratedSessionCheckpoint>, StoreError> {
120 let Some(bytes) = Self::get_blob_conn(conn, blob_ref) else {
121 return Ok(None);
122 };
123 let record = decode_checkpoint(&bytes)?;
124 Ok(Some(HydratedSessionCheckpoint {
125 turn_state: record.turn_state,
126 tool_state_ref: record.tool_state_ref.clone(),
127 tool_state: record
128 .tool_state_ref
129 .as_ref()
130 .and_then(|blob_ref| Self::get_typed_blob_conn(conn, blob_ref)),
131 plugin_snapshot_ref: record.plugin_snapshot_ref.clone(),
132 plugin_snapshot: record
133 .plugin_snapshot_ref
134 .as_ref()
135 .and_then(|blob_ref| Self::get_typed_blob_conn(conn, blob_ref)),
136 plugin_snapshot_revision: record.plugin_snapshot_revision,
137 execution_state_ref: record.execution_state_ref.clone(),
138 execution_state: record
139 .execution_state_ref
140 .as_ref()
141 .and_then(|blob_ref| Self::get_typed_blob_conn(conn, blob_ref)),
142 }))
143 }
144
145 pub(crate) fn load_usage_deltas_conn(conn: &Connection) -> Vec<lash_core::TokenLedgerEntry> {
146 let mut stmt = match conn.prepare(
147 "SELECT source, model, input_tokens, output_tokens, cache_read_input_tokens, cache_write_input_tokens, reasoning_output_tokens
148 FROM usage_deltas ORDER BY seq ASC",
149 ) {
150 Ok(stmt) => stmt,
151 Err(_) => return Vec::new(),
152 };
153 let rows = match stmt.query_map([], |row| {
154 Ok(lash_core::TokenLedgerEntry {
155 source: row.get(0)?,
156 model: row.get(1)?,
157 usage: lash_core::TokenUsage {
158 input_tokens: row.get(2)?,
159 output_tokens: row.get(3)?,
160 cache_read_input_tokens: row.get(4)?,
161 cache_write_input_tokens: row.get(5)?,
162 reasoning_output_tokens: row.get(6)?,
163 },
164 })
165 }) {
166 Ok(rows) => rows,
167 Err(_) => return Vec::new(),
168 };
169 rows.filter_map(Result::ok).collect()
170 }
171
172 async fn insert_blob_row(&self, hash: String, stored: Vec<u8>, warn_label: &str) -> BlobRef {
175 let hash_for_row = hash.clone();
176 let result = self
177 .conn
178 .call(move |conn| {
179 conn.execute(
180 "INSERT OR IGNORE INTO blobs (hash, content) VALUES (?1, ?2)",
181 params![hash_for_row, stored],
182 )
183 })
184 .await;
185 if let Err(err) = result {
186 tracing::warn!(error = %err, hash, "{warn_label}");
187 }
188 BlobRef(hash)
189 }
190
191 pub async fn put_blob(&self, content: &[u8]) -> BlobRef {
192 let hash = blob_content_hash(content);
193 self.insert_blob_row(hash, content.to_vec(), "failed to persist checkpoint blob")
194 .await
195 }
196
197 pub async fn put_artifact_blob(
198 &self,
199 descriptor: BlobArtifactDescriptor,
200 content: &[u8],
201 ) -> BlobRef {
202 let hash = blob_content_hash(content);
203 let stored = encode_artifact_blob(&descriptor, self.options.blob_profile, content);
204 self.insert_blob_row(hash, stored, "failed to persist artifact blob")
205 .await
206 }
207
208 pub async fn get_blob(&self, blob_ref: &BlobRef) -> Option<Vec<u8>> {
209 let blob_ref = blob_ref.clone();
210 self.conn
211 .call(move |conn| Ok(Self::get_blob_conn(conn, &blob_ref)))
212 .await
213 .ok()
214 .flatten()
215 }
216
217 pub async fn put_typed_blob<T: serde::Serialize>(&self, value: &T) -> BlobRef {
218 let bytes = encode_msgpack(value);
219 self.put_blob(&bytes).await
220 }
221
222 pub async fn put_typed_artifact_blob<T: serde::Serialize>(
223 &self,
224 descriptor: BlobArtifactDescriptor,
225 value: &T,
226 ) -> BlobRef {
227 let bytes = encode_msgpack(value);
228 self.put_artifact_blob(descriptor, &bytes).await
229 }
230
231 pub async fn get_typed_blob<T: serde::de::DeserializeOwned>(
232 &self,
233 blob_ref: &BlobRef,
234 ) -> Option<T> {
235 let bytes = self.get_blob(blob_ref).await?;
236 decode_msgpack(&bytes)
237 }
238
239 pub async fn put_checkpoint(
240 &self,
241 checkpoint: &HydratedSessionCheckpoint,
242 ) -> StoredSessionCheckpoint {
243 let checkpoint = checkpoint.clone();
244 let profile = self.options.blob_profile;
245 self.conn
246 .write(move |tx| Self::put_checkpoint_conn(tx, &checkpoint, profile))
247 .await
248 .expect("checkpoint blob should persist")
249 }
250
251 pub async fn get_checkpoint(&self, blob_ref: &BlobRef) -> Option<HydratedSessionCheckpoint> {
252 let blob_ref = blob_ref.clone();
253 self.conn
254 .call(move |conn| Ok(Self::get_checkpoint_conn(conn, &blob_ref)))
255 .await
256 .ok()
257 .and_then(Result::ok)
258 .flatten()
259 }
260
261 pub async fn append_usage_deltas(&self, entries: &[lash_core::TokenLedgerEntry]) {
262 if entries.is_empty() {
263 return;
264 }
265 let entries = entries.to_vec();
266 let result = self
267 .conn
268 .write(move |tx| {
269 let mut stmt = tx.prepare(
270 "INSERT INTO usage_deltas (
271 source, model, input_tokens, output_tokens, cache_read_input_tokens, cache_write_input_tokens, reasoning_output_tokens
272 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
273 )?;
274 for entry in &entries {
275 stmt.execute(params![
276 entry.source,
277 entry.model,
278 entry.usage.input_tokens,
279 entry.usage.output_tokens,
280 entry.usage.cache_read_input_tokens,
281 entry.usage.cache_write_input_tokens,
282 entry.usage.reasoning_output_tokens,
283 ])?;
284 }
285 Ok(())
286 })
287 .await;
288 if let Err(err) = result {
289 tracing::warn!(error = %err, "failed to persist usage deltas");
290 }
291 }
292
293 pub async fn load_usage_deltas(&self) -> Vec<lash_core::TokenLedgerEntry> {
294 self.conn
295 .call(|conn| Ok(Self::load_usage_deltas_conn(conn)))
296 .await
297 .unwrap_or_default()
298 }
299}