1use super::*;
12
13impl Store {
14 pub(crate) fn insert_artifact_blob_conn(
15 conn: &Connection,
16 descriptor: BlobArtifactDescriptor,
17 content: &[u8],
18 profile: BuiltinBlobProfile,
19 ) -> rusqlite::Result<BlobRef> {
20 let hash = format!("{:x}", Sha256::digest(content));
21 let stored = encode_artifact_blob(&descriptor, profile, content);
22 conn.execute(
23 "INSERT OR IGNORE INTO blobs (hash, content) VALUES (?1, ?2)",
24 params![hash, stored],
25 )?;
26 Ok(BlobRef(hash))
27 }
28
29 pub(crate) fn put_typed_artifact_blob_conn<T: serde::Serialize>(
30 conn: &Connection,
31 descriptor: BlobArtifactDescriptor,
32 value: &T,
33 profile: BuiltinBlobProfile,
34 ) -> rusqlite::Result<BlobRef> {
35 let bytes = encode_msgpack(value);
36 Self::insert_artifact_blob_conn(conn, descriptor, &bytes, profile)
37 }
38
39 pub(crate) fn put_checkpoint_conn(
40 conn: &Connection,
41 checkpoint: &HydratedSessionCheckpoint,
42 profile: BuiltinBlobProfile,
43 ) -> rusqlite::Result<StoredSessionCheckpoint> {
44 let tool_state_ref = match checkpoint.tool_state.as_ref() {
45 Some(snapshot) => Some(Self::put_typed_artifact_blob_conn(
46 conn,
47 BlobArtifactDescriptor::tool_state_snapshot(),
48 snapshot,
49 profile,
50 )?),
51 None => checkpoint.tool_state_ref.clone(),
52 };
53 let plugin_snapshot_ref = match checkpoint.plugin_snapshot.as_ref() {
54 Some(snapshot) => Some(Self::put_typed_artifact_blob_conn(
55 conn,
56 BlobArtifactDescriptor::plugin_session_snapshot(),
57 snapshot,
58 profile,
59 )?),
60 None => checkpoint.plugin_snapshot_ref.clone(),
61 };
62 let execution_state_ref = match checkpoint.execution_state.as_ref() {
63 Some(snapshot) => Some(Self::put_typed_artifact_blob_conn(
64 conn,
65 BlobArtifactDescriptor::execution_state_snapshot(),
66 snapshot,
67 profile,
68 )?),
69 None => checkpoint.execution_state_ref.clone(),
70 };
71 let manifest = SessionCheckpoint {
72 turn_state: checkpoint.turn_state.clone(),
73 tool_state_ref,
74 plugin_snapshot_ref,
75 plugin_snapshot_revision: checkpoint.plugin_snapshot_revision,
76 execution_state_ref,
77 };
78 let checkpoint_ref = Self::put_typed_artifact_blob_conn(
79 conn,
80 BlobArtifactDescriptor::checkpoint_manifest(),
81 &manifest,
82 profile,
83 )?;
84 Ok(StoredSessionCheckpoint {
85 checkpoint_ref,
86 manifest,
87 })
88 }
89
90 pub(crate) fn get_blob_conn(conn: &Connection, blob_ref: &BlobRef) -> Option<Vec<u8>> {
91 let bytes: Vec<u8> = conn
92 .query_row(
93 "SELECT content FROM blobs WHERE hash = ?1",
94 params![blob_ref.as_str()],
95 |row| row.get(0),
96 )
97 .optional()
98 .ok()
99 .flatten()?;
100 decode_artifact_blob(&bytes).or(Some(bytes))
101 }
102
103 pub(crate) fn get_typed_blob_conn<T: serde::de::DeserializeOwned>(
104 conn: &Connection,
105 blob_ref: &BlobRef,
106 ) -> Option<T> {
107 let bytes = Self::get_blob_conn(conn, blob_ref)?;
108 decode_msgpack(&bytes)
109 }
110
111 pub(crate) fn get_checkpoint_conn(
112 conn: &Connection,
113 blob_ref: &BlobRef,
114 ) -> Option<HydratedSessionCheckpoint> {
115 let record: SessionCheckpoint = Self::get_typed_blob_conn(conn, blob_ref)?;
116 Some(HydratedSessionCheckpoint {
117 turn_state: record.turn_state,
118 tool_state_ref: record.tool_state_ref.clone(),
119 tool_state: record
120 .tool_state_ref
121 .as_ref()
122 .and_then(|blob_ref| Self::get_typed_blob_conn(conn, blob_ref)),
123 plugin_snapshot_ref: record.plugin_snapshot_ref.clone(),
124 plugin_snapshot: record
125 .plugin_snapshot_ref
126 .as_ref()
127 .and_then(|blob_ref| Self::get_typed_blob_conn(conn, blob_ref)),
128 plugin_snapshot_revision: record.plugin_snapshot_revision,
129 execution_state_ref: record.execution_state_ref.clone(),
130 execution_state: record
131 .execution_state_ref
132 .as_ref()
133 .and_then(|blob_ref| Self::get_typed_blob_conn(conn, blob_ref)),
134 })
135 }
136
137 pub(crate) fn load_usage_deltas_conn(conn: &Connection) -> Vec<lash_core::TokenLedgerEntry> {
138 let mut stmt = match conn.prepare(
139 "SELECT source, model, input_tokens, output_tokens, cached_input_tokens, reasoning_tokens
140 FROM usage_deltas ORDER BY seq ASC",
141 ) {
142 Ok(stmt) => stmt,
143 Err(_) => return Vec::new(),
144 };
145 let rows = match stmt.query_map([], |row| {
146 Ok(lash_core::TokenLedgerEntry {
147 source: row.get(0)?,
148 model: row.get(1)?,
149 usage: lash_core::TokenUsage {
150 input_tokens: row.get(2)?,
151 output_tokens: row.get(3)?,
152 cached_input_tokens: row.get(4)?,
153 reasoning_tokens: row.get(5)?,
154 },
155 })
156 }) {
157 Ok(rows) => rows,
158 Err(_) => return Vec::new(),
159 };
160 rows.filter_map(Result::ok).collect()
161 }
162
163 pub async fn put_blob(&self, content: &[u8]) -> BlobRef {
164 let hash = format!("{:x}", Sha256::digest(content));
165 let hash_for_row = hash.clone();
166 let content = content.to_vec();
167 let result = self
168 .conn
169 .call(move |conn| {
170 conn.execute(
171 "INSERT OR IGNORE INTO blobs (hash, content) VALUES (?1, ?2)",
172 params![hash_for_row, content],
173 )
174 })
175 .await;
176 if let Err(err) = result {
177 tracing::warn!(error = %err, hash, "failed to persist checkpoint blob");
178 }
179 BlobRef(hash)
180 }
181
182 pub async fn put_artifact_blob(
183 &self,
184 descriptor: BlobArtifactDescriptor,
185 content: &[u8],
186 ) -> BlobRef {
187 let hash = format!("{:x}", Sha256::digest(content));
188 let stored = encode_artifact_blob(&descriptor, self.options.blob_profile, content);
189 let hash_for_row = hash.clone();
190 let result = self
191 .conn
192 .call(move |conn| {
193 conn.execute(
194 "INSERT OR IGNORE INTO blobs (hash, content) VALUES (?1, ?2)",
195 params![hash_for_row, stored],
196 )
197 })
198 .await;
199 if let Err(err) = result {
200 tracing::warn!(error = %err, hash, "failed to persist artifact blob");
201 }
202 BlobRef(hash)
203 }
204
205 pub async fn get_blob(&self, blob_ref: &BlobRef) -> Option<Vec<u8>> {
206 let blob_ref = blob_ref.clone();
207 self.conn
208 .call(move |conn| Ok(Self::get_blob_conn(conn, &blob_ref)))
209 .await
210 .ok()
211 .flatten()
212 }
213
214 pub async fn put_typed_blob<T: serde::Serialize>(&self, value: &T) -> BlobRef {
215 let bytes = encode_msgpack(value);
216 self.put_blob(&bytes).await
217 }
218
219 pub async fn put_typed_artifact_blob<T: serde::Serialize>(
220 &self,
221 descriptor: BlobArtifactDescriptor,
222 value: &T,
223 ) -> BlobRef {
224 let bytes = encode_msgpack(value);
225 self.put_artifact_blob(descriptor, &bytes).await
226 }
227
228 pub async fn get_typed_blob<T: serde::de::DeserializeOwned>(
229 &self,
230 blob_ref: &BlobRef,
231 ) -> Option<T> {
232 let bytes = self.get_blob(blob_ref).await?;
233 decode_msgpack(&bytes)
234 }
235
236 pub async fn put_checkpoint(
237 &self,
238 checkpoint: &HydratedSessionCheckpoint,
239 ) -> StoredSessionCheckpoint {
240 let checkpoint = checkpoint.clone();
241 let profile = self.options.blob_profile;
242 self.conn
243 .write(move |tx| Self::put_checkpoint_conn(tx, &checkpoint, profile))
244 .await
245 .expect("checkpoint blob should persist")
246 }
247
248 pub async fn get_checkpoint(&self, blob_ref: &BlobRef) -> Option<HydratedSessionCheckpoint> {
249 let blob_ref = blob_ref.clone();
250 self.conn
251 .call(move |conn| Ok(Self::get_checkpoint_conn(conn, &blob_ref)))
252 .await
253 .ok()
254 .flatten()
255 }
256
257 pub async fn append_usage_deltas(&self, entries: &[lash_core::TokenLedgerEntry]) {
258 if entries.is_empty() {
259 return;
260 }
261 let entries = entries.to_vec();
262 let result = self
263 .conn
264 .write(move |tx| {
265 let mut stmt = tx.prepare(
266 "INSERT INTO usage_deltas (
267 source, model, input_tokens, output_tokens, cached_input_tokens, reasoning_tokens
268 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
269 )?;
270 for entry in &entries {
271 stmt.execute(params![
272 entry.source,
273 entry.model,
274 entry.usage.input_tokens,
275 entry.usage.output_tokens,
276 entry.usage.cached_input_tokens,
277 entry.usage.reasoning_tokens,
278 ])?;
279 }
280 Ok(())
281 })
282 .await;
283 if let Err(err) = result {
284 tracing::warn!(error = %err, "failed to persist usage deltas");
285 }
286 }
287
288 pub async fn load_usage_deltas(&self) -> Vec<lash_core::TokenLedgerEntry> {
289 self.conn
290 .call(|conn| Ok(Self::load_usage_deltas_conn(conn)))
291 .await
292 .unwrap_or_default()
293 }
294}