use super::*;
impl Store {
pub(crate) fn insert_artifact_blob_conn(
conn: &Connection,
descriptor: BlobArtifactDescriptor,
content: &[u8],
profile: BuiltinBlobProfile,
) -> rusqlite::Result<BlobRef> {
let hash = format!("{:x}", Sha256::digest(content));
let stored = encode_artifact_blob(&descriptor, profile, content);
conn.execute(
"INSERT OR IGNORE INTO blobs (hash, content) VALUES (?1, ?2)",
params![hash, stored],
)?;
Ok(BlobRef(hash))
}
pub(crate) fn put_typed_artifact_blob_conn<T: serde::Serialize>(
conn: &Connection,
descriptor: BlobArtifactDescriptor,
value: &T,
profile: BuiltinBlobProfile,
) -> rusqlite::Result<BlobRef> {
let bytes = encode_msgpack(value);
Self::insert_artifact_blob_conn(conn, descriptor, &bytes, profile)
}
pub(crate) fn put_checkpoint_conn(
conn: &Connection,
checkpoint: &HydratedSessionCheckpoint,
profile: BuiltinBlobProfile,
) -> rusqlite::Result<StoredSessionCheckpoint> {
let tool_state_ref = match checkpoint.tool_state.as_ref() {
Some(snapshot) => Some(Self::put_typed_artifact_blob_conn(
conn,
BlobArtifactDescriptor::tool_state_snapshot(),
snapshot,
profile,
)?),
None => checkpoint.tool_state_ref.clone(),
};
let plugin_snapshot_ref = match checkpoint.plugin_snapshot.as_ref() {
Some(snapshot) => Some(Self::put_typed_artifact_blob_conn(
conn,
BlobArtifactDescriptor::plugin_session_snapshot(),
snapshot,
profile,
)?),
None => checkpoint.plugin_snapshot_ref.clone(),
};
let execution_state_ref = match checkpoint.execution_state.as_ref() {
Some(snapshot) => Some(Self::put_typed_artifact_blob_conn(
conn,
BlobArtifactDescriptor::execution_state_snapshot(),
snapshot,
profile,
)?),
None => checkpoint.execution_state_ref.clone(),
};
let manifest = SessionCheckpoint {
turn_state: checkpoint.turn_state.clone(),
tool_state_ref,
plugin_snapshot_ref,
plugin_snapshot_revision: checkpoint.plugin_snapshot_revision,
execution_state_ref,
};
let checkpoint_ref = Self::put_typed_artifact_blob_conn(
conn,
BlobArtifactDescriptor::checkpoint_manifest(),
&manifest,
profile,
)?;
Ok(StoredSessionCheckpoint {
checkpoint_ref,
manifest,
})
}
pub(crate) fn get_blob_conn(conn: &Connection, blob_ref: &BlobRef) -> Option<Vec<u8>> {
let bytes: Vec<u8> = conn
.query_row(
"SELECT content FROM blobs WHERE hash = ?1",
params![blob_ref.as_str()],
|row| row.get(0),
)
.optional()
.ok()
.flatten()?;
decode_artifact_blob(&bytes).or(Some(bytes))
}
pub(crate) fn get_typed_blob_conn<T: serde::de::DeserializeOwned>(
conn: &Connection,
blob_ref: &BlobRef,
) -> Option<T> {
let bytes = Self::get_blob_conn(conn, blob_ref)?;
decode_msgpack(&bytes)
}
pub(crate) fn get_checkpoint_conn(
conn: &Connection,
blob_ref: &BlobRef,
) -> Option<HydratedSessionCheckpoint> {
let record: SessionCheckpoint = Self::get_typed_blob_conn(conn, blob_ref)?;
Some(HydratedSessionCheckpoint {
turn_state: record.turn_state,
tool_state_ref: record.tool_state_ref.clone(),
tool_state: record
.tool_state_ref
.as_ref()
.and_then(|blob_ref| Self::get_typed_blob_conn(conn, blob_ref)),
plugin_snapshot_ref: record.plugin_snapshot_ref.clone(),
plugin_snapshot: record
.plugin_snapshot_ref
.as_ref()
.and_then(|blob_ref| Self::get_typed_blob_conn(conn, blob_ref)),
plugin_snapshot_revision: record.plugin_snapshot_revision,
execution_state_ref: record.execution_state_ref.clone(),
execution_state: record
.execution_state_ref
.as_ref()
.and_then(|blob_ref| Self::get_typed_blob_conn(conn, blob_ref)),
})
}
pub(crate) fn load_usage_deltas_conn(conn: &Connection) -> Vec<lash_core::TokenLedgerEntry> {
let mut stmt = match conn.prepare(
"SELECT source, model, input_tokens, output_tokens, cached_input_tokens, reasoning_tokens
FROM usage_deltas ORDER BY seq ASC",
) {
Ok(stmt) => stmt,
Err(_) => return Vec::new(),
};
let rows = match stmt.query_map([], |row| {
Ok(lash_core::TokenLedgerEntry {
source: row.get(0)?,
model: row.get(1)?,
usage: lash_core::TokenUsage {
input_tokens: row.get(2)?,
output_tokens: row.get(3)?,
cached_input_tokens: row.get(4)?,
reasoning_tokens: row.get(5)?,
},
})
}) {
Ok(rows) => rows,
Err(_) => return Vec::new(),
};
rows.filter_map(Result::ok).collect()
}
pub async fn put_blob(&self, content: &[u8]) -> BlobRef {
let hash = format!("{:x}", Sha256::digest(content));
let hash_for_row = hash.clone();
let content = content.to_vec();
let result = self
.conn
.call(move |conn| {
conn.execute(
"INSERT OR IGNORE INTO blobs (hash, content) VALUES (?1, ?2)",
params![hash_for_row, content],
)
})
.await;
if let Err(err) = result {
tracing::warn!(error = %err, hash, "failed to persist checkpoint blob");
}
BlobRef(hash)
}
pub async fn put_artifact_blob(
&self,
descriptor: BlobArtifactDescriptor,
content: &[u8],
) -> BlobRef {
let hash = format!("{:x}", Sha256::digest(content));
let stored = encode_artifact_blob(&descriptor, self.options.blob_profile, content);
let hash_for_row = hash.clone();
let result = self
.conn
.call(move |conn| {
conn.execute(
"INSERT OR IGNORE INTO blobs (hash, content) VALUES (?1, ?2)",
params![hash_for_row, stored],
)
})
.await;
if let Err(err) = result {
tracing::warn!(error = %err, hash, "failed to persist artifact blob");
}
BlobRef(hash)
}
pub async fn get_blob(&self, blob_ref: &BlobRef) -> Option<Vec<u8>> {
let blob_ref = blob_ref.clone();
self.conn
.call(move |conn| Ok(Self::get_blob_conn(conn, &blob_ref)))
.await
.ok()
.flatten()
}
pub async fn put_typed_blob<T: serde::Serialize>(&self, value: &T) -> BlobRef {
let bytes = encode_msgpack(value);
self.put_blob(&bytes).await
}
pub async fn put_typed_artifact_blob<T: serde::Serialize>(
&self,
descriptor: BlobArtifactDescriptor,
value: &T,
) -> BlobRef {
let bytes = encode_msgpack(value);
self.put_artifact_blob(descriptor, &bytes).await
}
pub async fn get_typed_blob<T: serde::de::DeserializeOwned>(
&self,
blob_ref: &BlobRef,
) -> Option<T> {
let bytes = self.get_blob(blob_ref).await?;
decode_msgpack(&bytes)
}
pub async fn put_checkpoint(
&self,
checkpoint: &HydratedSessionCheckpoint,
) -> StoredSessionCheckpoint {
let checkpoint = checkpoint.clone();
let profile = self.options.blob_profile;
self.conn
.write(move |tx| Self::put_checkpoint_conn(tx, &checkpoint, profile))
.await
.expect("checkpoint blob should persist")
}
pub async fn get_checkpoint(&self, blob_ref: &BlobRef) -> Option<HydratedSessionCheckpoint> {
let blob_ref = blob_ref.clone();
self.conn
.call(move |conn| Ok(Self::get_checkpoint_conn(conn, &blob_ref)))
.await
.ok()
.flatten()
}
pub async fn append_usage_deltas(&self, entries: &[lash_core::TokenLedgerEntry]) {
if entries.is_empty() {
return;
}
let entries = entries.to_vec();
let result = self
.conn
.write(move |tx| {
let mut stmt = tx.prepare(
"INSERT INTO usage_deltas (
source, model, input_tokens, output_tokens, cached_input_tokens, reasoning_tokens
) VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
)?;
for entry in &entries {
stmt.execute(params![
entry.source,
entry.model,
entry.usage.input_tokens,
entry.usage.output_tokens,
entry.usage.cached_input_tokens,
entry.usage.reasoning_tokens,
])?;
}
Ok(())
})
.await;
if let Err(err) = result {
tracing::warn!(error = %err, "failed to persist usage deltas");
}
}
pub async fn load_usage_deltas(&self) -> Vec<lash_core::TokenLedgerEntry> {
self.conn
.call(|conn| Ok(Self::load_usage_deltas_conn(conn)))
.await
.unwrap_or_default()
}
}