1use super::*;
12
13impl Store {
14 pub(crate) fn insert_artifact_blob_conn(
15 conn: &Connection,
16 descriptor: BlobArtifactDescriptor,
17 content: &[u8],
18 profile: BuiltinBlobProfile,
19 ) -> rusqlite::Result<BlobRef> {
20 let hash = format!("{:x}", Sha256::digest(content));
21 let stored = encode_artifact_blob(&descriptor, profile, content);
22 conn.execute(
23 "INSERT OR IGNORE INTO blobs (hash, content) VALUES (?1, ?2)",
24 params![hash, stored],
25 )?;
26 Ok(BlobRef(hash))
27 }
28
29 pub(crate) fn put_typed_artifact_blob_conn<T: serde::Serialize>(
30 conn: &Connection,
31 descriptor: BlobArtifactDescriptor,
32 value: &T,
33 profile: BuiltinBlobProfile,
34 ) -> rusqlite::Result<BlobRef> {
35 let bytes = encode_msgpack(value);
36 Self::insert_artifact_blob_conn(conn, descriptor, &bytes, profile)
37 }
38
39 pub(crate) fn put_checkpoint_conn(
40 conn: &Connection,
41 checkpoint: &HydratedSessionCheckpoint,
42 profile: BuiltinBlobProfile,
43 ) -> rusqlite::Result<StoredSessionCheckpoint> {
44 let tool_state_ref = match checkpoint.tool_state.as_ref() {
45 Some(snapshot) => Some(Self::put_typed_artifact_blob_conn(
46 conn,
47 BlobArtifactDescriptor::tool_state_snapshot(),
48 snapshot,
49 profile,
50 )?),
51 None => checkpoint.tool_state_ref.clone(),
52 };
53 let plugin_snapshot_ref = match checkpoint.plugin_snapshot.as_ref() {
54 Some(snapshot) => Some(Self::put_typed_artifact_blob_conn(
55 conn,
56 BlobArtifactDescriptor::plugin_session_snapshot(),
57 snapshot,
58 profile,
59 )?),
60 None => checkpoint.plugin_snapshot_ref.clone(),
61 };
62 let execution_state_ref = match checkpoint.execution_state.as_ref() {
63 Some(snapshot) => Some(Self::put_typed_artifact_blob_conn(
64 conn,
65 BlobArtifactDescriptor::execution_state_snapshot(),
66 snapshot,
67 profile,
68 )?),
69 None => checkpoint.execution_state_ref.clone(),
70 };
71 let manifest = SessionCheckpoint::new(
72 checkpoint.turn_state.clone(),
73 tool_state_ref,
74 plugin_snapshot_ref,
75 checkpoint.plugin_snapshot_revision,
76 execution_state_ref,
77 );
78 let checkpoint_ref = Self::put_typed_artifact_blob_conn(
79 conn,
80 BlobArtifactDescriptor::checkpoint_manifest(),
81 &manifest,
82 profile,
83 )?;
84 Ok(StoredSessionCheckpoint {
85 checkpoint_ref,
86 manifest,
87 })
88 }
89
90 pub(crate) fn get_blob_conn(conn: &Connection, blob_ref: &BlobRef) -> Option<Vec<u8>> {
91 let bytes: Vec<u8> = conn
92 .query_row(
93 "SELECT content FROM blobs WHERE hash = ?1",
94 params![blob_ref.as_str()],
95 |row| row.get(0),
96 )
97 .optional()
98 .ok()
99 .flatten()?;
100 decode_artifact_blob(&bytes).or(Some(bytes))
101 }
102
103 pub(crate) fn get_typed_blob_conn<T: serde::de::DeserializeOwned>(
104 conn: &Connection,
105 blob_ref: &BlobRef,
106 ) -> Option<T> {
107 let bytes = Self::get_blob_conn(conn, blob_ref)?;
108 decode_msgpack(&bytes)
109 }
110
111 pub(crate) fn get_checkpoint_conn(
112 conn: &Connection,
113 blob_ref: &BlobRef,
114 ) -> Result<Option<HydratedSessionCheckpoint>, StoreError> {
115 let Some(bytes) = Self::get_blob_conn(conn, blob_ref) else {
116 return Ok(None);
117 };
118 let record = decode_checkpoint(&bytes)?;
119 Ok(Some(HydratedSessionCheckpoint {
120 turn_state: record.turn_state,
121 tool_state_ref: record.tool_state_ref.clone(),
122 tool_state: record
123 .tool_state_ref
124 .as_ref()
125 .and_then(|blob_ref| Self::get_typed_blob_conn(conn, blob_ref)),
126 plugin_snapshot_ref: record.plugin_snapshot_ref.clone(),
127 plugin_snapshot: record
128 .plugin_snapshot_ref
129 .as_ref()
130 .and_then(|blob_ref| Self::get_typed_blob_conn(conn, blob_ref)),
131 plugin_snapshot_revision: record.plugin_snapshot_revision,
132 execution_state_ref: record.execution_state_ref.clone(),
133 execution_state: record
134 .execution_state_ref
135 .as_ref()
136 .and_then(|blob_ref| Self::get_typed_blob_conn(conn, blob_ref)),
137 }))
138 }
139
140 pub(crate) fn load_usage_deltas_conn(conn: &Connection) -> Vec<lash_core::TokenLedgerEntry> {
141 let mut stmt = match conn.prepare(
142 "SELECT source, model, input_tokens, output_tokens, cached_input_tokens, reasoning_tokens
143 FROM usage_deltas ORDER BY seq ASC",
144 ) {
145 Ok(stmt) => stmt,
146 Err(_) => return Vec::new(),
147 };
148 let rows = match stmt.query_map([], |row| {
149 Ok(lash_core::TokenLedgerEntry {
150 source: row.get(0)?,
151 model: row.get(1)?,
152 usage: lash_core::TokenUsage {
153 input_tokens: row.get(2)?,
154 output_tokens: row.get(3)?,
155 cached_input_tokens: row.get(4)?,
156 reasoning_tokens: row.get(5)?,
157 },
158 })
159 }) {
160 Ok(rows) => rows,
161 Err(_) => return Vec::new(),
162 };
163 rows.filter_map(Result::ok).collect()
164 }
165
166 pub async fn put_blob(&self, content: &[u8]) -> BlobRef {
167 let hash = format!("{:x}", Sha256::digest(content));
168 let hash_for_row = hash.clone();
169 let content = content.to_vec();
170 let result = self
171 .conn
172 .call(move |conn| {
173 conn.execute(
174 "INSERT OR IGNORE INTO blobs (hash, content) VALUES (?1, ?2)",
175 params![hash_for_row, content],
176 )
177 })
178 .await;
179 if let Err(err) = result {
180 tracing::warn!(error = %err, hash, "failed to persist checkpoint blob");
181 }
182 BlobRef(hash)
183 }
184
185 pub async fn put_artifact_blob(
186 &self,
187 descriptor: BlobArtifactDescriptor,
188 content: &[u8],
189 ) -> BlobRef {
190 let hash = format!("{:x}", Sha256::digest(content));
191 let stored = encode_artifact_blob(&descriptor, self.options.blob_profile, content);
192 let hash_for_row = hash.clone();
193 let result = self
194 .conn
195 .call(move |conn| {
196 conn.execute(
197 "INSERT OR IGNORE INTO blobs (hash, content) VALUES (?1, ?2)",
198 params![hash_for_row, stored],
199 )
200 })
201 .await;
202 if let Err(err) = result {
203 tracing::warn!(error = %err, hash, "failed to persist artifact blob");
204 }
205 BlobRef(hash)
206 }
207
208 pub async fn get_blob(&self, blob_ref: &BlobRef) -> Option<Vec<u8>> {
209 let blob_ref = blob_ref.clone();
210 self.conn
211 .call(move |conn| Ok(Self::get_blob_conn(conn, &blob_ref)))
212 .await
213 .ok()
214 .flatten()
215 }
216
217 pub async fn put_typed_blob<T: serde::Serialize>(&self, value: &T) -> BlobRef {
218 let bytes = encode_msgpack(value);
219 self.put_blob(&bytes).await
220 }
221
222 pub async fn put_typed_artifact_blob<T: serde::Serialize>(
223 &self,
224 descriptor: BlobArtifactDescriptor,
225 value: &T,
226 ) -> BlobRef {
227 let bytes = encode_msgpack(value);
228 self.put_artifact_blob(descriptor, &bytes).await
229 }
230
231 pub async fn get_typed_blob<T: serde::de::DeserializeOwned>(
232 &self,
233 blob_ref: &BlobRef,
234 ) -> Option<T> {
235 let bytes = self.get_blob(blob_ref).await?;
236 decode_msgpack(&bytes)
237 }
238
239 pub async fn put_checkpoint(
240 &self,
241 checkpoint: &HydratedSessionCheckpoint,
242 ) -> StoredSessionCheckpoint {
243 let checkpoint = checkpoint.clone();
244 let profile = self.options.blob_profile;
245 self.conn
246 .write(move |tx| Self::put_checkpoint_conn(tx, &checkpoint, profile))
247 .await
248 .expect("checkpoint blob should persist")
249 }
250
251 pub async fn get_checkpoint(&self, blob_ref: &BlobRef) -> Option<HydratedSessionCheckpoint> {
252 let blob_ref = blob_ref.clone();
253 self.conn
254 .call(move |conn| Ok(Self::get_checkpoint_conn(conn, &blob_ref)))
255 .await
256 .ok()
257 .and_then(Result::ok)
258 .flatten()
259 }
260
261 pub async fn append_usage_deltas(&self, entries: &[lash_core::TokenLedgerEntry]) {
262 if entries.is_empty() {
263 return;
264 }
265 let entries = entries.to_vec();
266 let result = self
267 .conn
268 .write(move |tx| {
269 let mut stmt = tx.prepare(
270 "INSERT INTO usage_deltas (
271 source, model, input_tokens, output_tokens, cached_input_tokens, reasoning_tokens
272 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
273 )?;
274 for entry in &entries {
275 stmt.execute(params![
276 entry.source,
277 entry.model,
278 entry.usage.input_tokens,
279 entry.usage.output_tokens,
280 entry.usage.cached_input_tokens,
281 entry.usage.reasoning_tokens,
282 ])?;
283 }
284 Ok(())
285 })
286 .await;
287 if let Err(err) = result {
288 tracing::warn!(error = %err, "failed to persist usage deltas");
289 }
290 }
291
292 pub async fn load_usage_deltas(&self) -> Vec<lash_core::TokenLedgerEntry> {
293 self.conn
294 .call(|conn| Ok(Self::load_usage_deltas_conn(conn)))
295 .await
296 .unwrap_or_default()
297 }
298}