use std::path::Path;
use std::sync::Once;
use async_trait::async_trait;
use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions};
use sqlx::{Row, SqlitePool};
/// Registers the sqlite-vec entry point as a SQLite auto-extension, at most
/// once per process.
///
/// The registration is guarded by a function-local [`Once`], so every call
/// after the first is a no-op. Both `SqliteMetadata::connect_file` and
/// `SqliteMetadata::connect_memory` call this before opening a pool.
// `unsafe_code` is allowed on this function only: the registration needs an
// FFI call and a function-pointer transmute.
#[allow(unsafe_code)]
fn ensure_sqlite_vec_loaded() {
static ONCE: Once = Once::new();
ONCE.call_once(|| {
// SAFETY (NOTE(review): confirm against the sqlite-vec and libsqlite3-sys
// docs): `sqlite3_vec_init` is exposed with a zero-argument signature, while
// `sqlite3_auto_extension` expects the three-argument extension entry-point
// signature spelled out in the transmute target type below. The transmute
// only re-types the function pointer; it presumes the underlying C symbol
// really has the entry-point ABI.
// The integer status returned by `sqlite3_auto_extension` is not inspected.
unsafe {
libsqlite3_sys::sqlite3_auto_extension(Some(std::mem::transmute::<
unsafe extern "C" fn(),
unsafe extern "C" fn(
*mut libsqlite3_sys::sqlite3,
*mut *mut std::ffi::c_char,
*const libsqlite3_sys::sqlite3_api_routines,
) -> std::ffi::c_int,
>(
sqlite_vec::sqlite3_vec_init
)));
}
});
}
use crate::attribute::AttributeValue;
use crate::content::ContentHash;
use crate::error::{Error, Result};
use crate::link::{Link, LinkKind};
use crate::memory::MemoryId;
use crate::metadata::{
AppendMemoryOutcome, AppendMemoryRequest, AuditEntry, InsertSummaryRequest, MemoryRow,
MetadataStore, MigrationStateRow, SchemaMeta, SnapshotRow, SummaryRow,
};
use crate::partition::PartitionPath;
use crate::summarizer::SummaryStyle;
use crate::summary::content::SummaryInputRange;
use crate::summary::{Scope, SummaryId, SummarySubject};
/// SQLite-backed metadata store built on an `sqlx` connection pool.
///
/// Construct it with `connect_file` or `connect_memory`; it implements
/// [`MetadataStore`].
#[derive(Debug, Clone)]
pub struct SqliteMetadata {
/// Connection pool that every query in this store runs against.
pool: SqlitePool,
/// Identifier returned by `MetadataStore::id`: `sqlite:<path>` for
/// file-backed stores, `sqlite::memory:` for in-memory ones.
id: String,
}
impl SqliteMetadata {
    /// Opens the SQLite database file at `path`, creating it when missing.
    ///
    /// The sqlite-vec auto-extension is registered first. The pool holds up
    /// to 8 connections, each configured with WAL journaling,
    /// `synchronous = NORMAL`, foreign keys enabled and the pragmas below.
    ///
    /// # Errors
    /// Returns the error built by `Error::metadata("sqlite connect", ..)`
    /// when the pool cannot be opened.
    pub async fn connect_file(path: impl AsRef<Path>) -> Result<Self> {
        ensure_sqlite_vec_loaded();
        let path = path.as_ref();
        let opts = SqliteConnectOptions::new()
            .filename(path)
            .create_if_missing(true)
            // 16 KiB pages.
            .pragma("page_size", "16384")
            .pragma("auto_vacuum", "INCREMENTAL")
            .journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
            .synchronous(sqlx::sqlite::SqliteSynchronous::Normal)
            // 256 MiB of memory-mapped I/O.
            .pragma("mmap_size", "268435456")
            // Negative cache_size is expressed in KiB: 64 MiB page cache.
            .pragma("cache_size", "-65536")
            .pragma("temp_store", "MEMORY")
            .pragma("wal_autocheckpoint", "1000")
            // 64 MiB cap on the WAL/journal file left on disk.
            .pragma("journal_size_limit", "67108864")
            .foreign_keys(true);
        let pool = SqlitePoolOptions::new()
            .max_connections(8)
            .connect_with(opts)
            .await
            .map_err(|e| Error::metadata("sqlite connect", e))?;
        Ok(Self {
            pool,
            id: format!("sqlite:{}", path.display()),
        })
    }

    /// Opens a private in-memory SQLite database.
    ///
    /// The pool is capped at one connection because each SQLite in-memory
    /// connection owns its own database. For the same reason the pool's idle
    /// timeout and maximum connection lifetime are disabled: if the pool
    /// recycled that single connection, the whole database would be
    /// silently discarded and replaced by an empty one.
    ///
    /// # Errors
    /// Returns the error built by
    /// `Error::metadata("sqlite connect (memory)", ..)` when the pool cannot
    /// be opened.
    pub async fn connect_memory() -> Result<Self> {
        ensure_sqlite_vec_loaded();
        let opts = SqliteConnectOptions::new()
            .filename(":memory:")
            .in_memory(true)
            .pragma("temp_store", "MEMORY")
            .foreign_keys(true);
        let pool = SqlitePoolOptions::new()
            .max_connections(1)
            // Never reap the only connection: it *is* the database.
            .idle_timeout(None::<std::time::Duration>)
            .max_lifetime(None::<std::time::Duration>)
            .connect_with(opts)
            .await
            .map_err(|e| Error::metadata("sqlite connect (memory)", e))?;
        Ok(Self {
            pool,
            id: "sqlite::memory:".to_string(),
        })
    }

    /// Converts an `sqlx` result into this crate's `Result`, tagging any
    /// error with `label` via `Error::metadata`.
    fn map_sqlx<T>(label: &'static str, r: std::result::Result<T, sqlx::Error>) -> Result<T> {
        r.map_err(|e| Error::metadata(label, e))
    }

    /// Reports whether a table or view called `name` is listed in
    /// `sqlite_master`.
    ///
    /// This is a best-effort probe: any query error is reported as `false`
    /// rather than surfaced to the caller.
    async fn index_table_exists<'a, E>(executor: E, name: &str) -> bool
    where
        E: sqlx::Executor<'a, Database = sqlx::Sqlite>,
    {
        let row = sqlx::query_scalar::<_, i64>(
            "SELECT count(*) FROM sqlite_master WHERE type IN ('table','view') AND name = ?1",
        )
        .bind(name)
        .fetch_one(executor)
        .await;
        matches!(row, Ok(n) if n > 0)
    }

    /// Borrows the underlying connection pool.
    #[must_use]
    pub fn pool(&self) -> &SqlitePool {
        &self.pool
    }
}
#[async_trait]
impl MetadataStore for SqliteMetadata {
/// Returns an owned copy of this store's identifier string.
fn id(&self) -> String {
    String::from(self.id.as_str())
}
/// Exposes the SQLite pool backing this store; always `Some` for this
/// implementation.
fn sqlite_pool(&self) -> Option<&SqlitePool> {
    let pool: &SqlitePool = &self.pool;
    Some(pool)
}
async fn migrate(&self) -> Result<()> {
let has_column: i64 = sqlx::query_scalar(
"SELECT count(*) FROM pragma_table_info('memory') \
WHERE name = 'inline_summary'",
)
.fetch_one(&self.pool)
.await
.unwrap_or(0);
if has_column > 0 {
let stragglers: i64 =
sqlx::query_scalar("SELECT count(*) FROM memory WHERE inline_summary IS NOT NULL")
.fetch_one(&self.pool)
.await
.unwrap_or(0);
if stragglers > 0 {
return Err(Error::metadata(
"drop inline_summary",
std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!(
"{stragglers} memory rows still carry a non-NULL \
inline_summary; open the store once with kiromi-ai-memory \
0.0.x to drain into first-class summaries before upgrading"
),
),
));
}
}
sqlx::migrate!("./migrations")
.run(&self.pool)
.await
.map_err(|e| Error::metadata("apply migrations", e))?;
Ok(())
}
async fn create_indices_if_missing(&self, dim: usize) -> Result<()> {
let memory_vec_sql = format!(
"CREATE VIRTUAL TABLE IF NOT EXISTS memory_vec USING vec0(\
memory_id TEXT PRIMARY KEY,\
partition_path TEXT,\
kind TEXT,\
embedding FLOAT[{dim}]\
)"
);
sqlx::query(&memory_vec_sql)
.execute(&self.pool)
.await
.map_err(|e| Error::metadata("create memory_vec", e))?;
let summary_vec_sql = format!(
"CREATE VIRTUAL TABLE IF NOT EXISTS summary_vec USING vec0(\
summary_id TEXT PRIMARY KEY,\
parent_path TEXT,\
style TEXT,\
embedding FLOAT[{dim}]\
)"
);
sqlx::query(&summary_vec_sql)
.execute(&self.pool)
.await
.map_err(|e| Error::metadata("create summary_vec", e))?;
sqlx::query(
"CREATE VIRTUAL TABLE IF NOT EXISTS memory_fts USING fts5(\
content,\
memory_id UNINDEXED,\
partition_path UNINDEXED,\
tokenize = 'porter unicode61 remove_diacritics 1'\
)",
)
.execute(&self.pool)
.await
.map_err(|e| Error::metadata("create memory_fts", e))?;
sqlx::query(
"CREATE VIRTUAL TABLE IF NOT EXISTS summary_fts USING fts5(\
content,\
summary_id UNINDEXED,\
parent_path UNINDEXED,\
tokenize = 'porter unicode61 remove_diacritics 1'\
)",
)
.execute(&self.pool)
.await
.map_err(|e| Error::metadata("create summary_fts", e))?;
Ok(())
}
/// Loads the singleton `schema_meta` row (`id = 1`), or `None` when it has
/// not been written yet.
async fn read_schema_meta(&self) -> Result<Option<SchemaMeta>> {
    const SQL: &str = "SELECT partition_scheme, scheme_version, embedder_id, embedder_dims, created_at FROM schema_meta WHERE id = 1";
    let fetched = sqlx::query(SQL).fetch_optional(&self.pool).await;
    let Some(r) = Self::map_sqlx("read schema_meta", fetched)? else {
        return Ok(None);
    };
    let partition_scheme = r.get::<String, _>("partition_scheme");
    let scheme_version = r.get::<i64, _>("scheme_version");
    let embedder_id = r.get::<Option<String>, _>("embedder_id");
    let embedder_dims = r.get::<Option<i64>, _>("embedder_dims");
    let created_at_ms = r.get::<i64, _>("created_at");
    Ok(Some(SchemaMeta {
        partition_scheme,
        scheme_version,
        embedder_id,
        embedder_dims,
        created_at_ms,
    }))
}
/// Upserts the singleton `schema_meta` row (`id = 1`).
///
/// On conflict every column except `created_at` is overwritten with the
/// values from `meta`.
async fn write_schema_meta(&self, meta: &SchemaMeta) -> Result<()> {
    const SQL: &str = "INSERT INTO schema_meta(id, partition_scheme, scheme_version, embedder_id, embedder_dims, created_at)\n\
        VALUES (1, ?1, ?2, ?3, ?4, ?5)\n\
        ON CONFLICT(id) DO UPDATE SET\n\
        partition_scheme = excluded.partition_scheme,\n\
        scheme_version = excluded.scheme_version,\n\
        embedder_id = excluded.embedder_id,\n\
        embedder_dims = excluded.embedder_dims";
    let outcome = sqlx::query(SQL)
        .bind(&meta.partition_scheme)
        .bind(meta.scheme_version)
        .bind(meta.embedder_id.as_deref())
        .bind(meta.embedder_dims)
        .bind(meta.created_at_ms)
        .execute(&self.pool)
        .await;
    Self::map_sqlx("insert schema_meta", outcome)?;
    Ok(())
}
/// Fetches the `migration_state` row keyed by `name`, or `None` when no such
/// row exists.
async fn read_migration_state(&self, name: &str) -> Result<Option<MigrationStateRow>> {
    const SQL: &str = "SELECT name, status, cursor, started_at, finished_at FROM migration_state WHERE name = ?1";
    let fetched = sqlx::query(SQL)
        .bind(name)
        .fetch_optional(&self.pool)
        .await;
    match Self::map_sqlx("read migration_state", fetched)? {
        None => Ok(None),
        Some(r) => Ok(Some(MigrationStateRow {
            name: r.get::<String, _>("name"),
            status: r.get::<String, _>("status"),
            cursor: r.get::<Option<String>, _>("cursor"),
            started_at_ms: r.get::<i64, _>("started_at"),
            finished_at_ms: r.get::<Option<i64>, _>("finished_at"),
        })),
    }
}
/// Inserts `row` into `migration_state`, or overwrites every non-key column
/// of the existing row with the same `name`.
async fn upsert_migration_state(&self, row: MigrationStateRow) -> Result<()> {
    const SQL: &str = "INSERT INTO migration_state(name, status, cursor, started_at, finished_at)\n\
        VALUES (?1, ?2, ?3, ?4, ?5)\n\
        ON CONFLICT(name) DO UPDATE SET\n\
        status = excluded.status,\n\
        cursor = excluded.cursor,\n\
        started_at = excluded.started_at,\n\
        finished_at = excluded.finished_at";
    let outcome = sqlx::query(SQL)
        .bind(&row.name)
        .bind(&row.status)
        .bind(row.cursor.as_deref())
        .bind(row.started_at_ms)
        .bind(row.finished_at_ms)
        .execute(&self.pool)
        .await;
    Self::map_sqlx("upsert migration_state", outcome)?;
    Ok(())
}
/// Overwrites `scheme_version` on the singleton `schema_meta` row.
///
/// The number of affected rows is not checked, so this succeeds even when
/// the row does not exist.
async fn set_scheme_version(&self, version: i64) -> Result<()> {
    let outcome = sqlx::query("UPDATE schema_meta SET scheme_version = ?1 WHERE id = 1")
        .bind(version)
        .execute(&self.pool)
        .await;
    Self::map_sqlx("set scheme_version", outcome)?;
    Ok(())
}
/// Ensures that `path` and all of its ancestors have a `partition` row,
/// inside a single transaction.
///
/// The chain is built by reversing `path.ancestors()` and appending `path`
/// itself; each entry is inserted with its index in that chain as `level`
/// and the previous entry as `parent_path`. Only the last entry receives
/// `is_leaf`; all others are written as non-leaf. Rows that already exist
/// are left untouched (`INSERT OR IGNORE`).
async fn ensure_partition_chain(
    &self,
    path: &PartitionPath,
    is_leaf: bool,
    ts_ms: i64,
) -> Result<()> {
    let mut chain: Vec<PartitionPath> = path.ancestors().collect();
    chain.reverse();
    chain.push(path.clone());
    // `chain` holds at least `path`, so the subtraction cannot underflow.
    let last_index = chain.len() - 1;

    let mut tx = Self::map_sqlx("begin partition tx", self.pool.begin().await)?;
    let mut parent: Option<&str> = None;
    for (level, node) in chain.iter().enumerate() {
        let leaf_flag = if level == last_index {
            i64::from(is_leaf)
        } else {
            0
        };
        let outcome = sqlx::query(
            "INSERT OR IGNORE INTO partition(path, parent_path, level, is_leaf, created_at)\n\
             VALUES (?1, ?2, ?3, ?4, ?5)",
        )
        .bind(node.as_str())
        .bind(parent)
        .bind(i64::try_from(level).unwrap_or(0))
        .bind(leaf_flag)
        .bind(ts_ms)
        .execute(&mut *tx)
        .await;
        Self::map_sqlx("insert partition", outcome)?;
        parent = Some(node.as_str());
    }
    Self::map_sqlx("commit partition tx", tx.commit().await)
}
/// Persists a new memory and its derived index rows in one transaction.
///
/// In order, inside the transaction: insert the `memory` row; mirror the
/// embedding into `memory_vec` (only when that table exists and the blob is
/// non-empty); refresh the `memory_fts` entry (only when that table exists);
/// insert both directions of every explicit link; write the audit entry.
/// Returns the value produced by `insert_audit` as `audit_seq`.
///
/// Any failing statement returns early, dropping the transaction without a
/// commit.
async fn append_memory(&self, req: AppendMemoryRequest) -> Result<AppendMemoryOutcome> {
let row = req.row;
// The index tables are optional; probe for them before opening the
// transaction. A probe error is treated as "table absent".
let memory_vec_exists = Self::index_table_exists(&self.pool, "memory_vec").await;
let memory_fts_exists = Self::index_table_exists(&self.pool, "memory_fts").await;
let mut tx = Self::map_sqlx("begin append tx", self.pool.begin().await)?;
// New rows are always written with `tombstoned = 0`.
let res = sqlx::query(
"INSERT INTO memory(id, partition_path, data_path, content_kind, content_hash, bytes, embedder_id, tombstoned, created_at, updated_at, valid_from_ms, valid_until_ms, kind, embedding_blob)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, 0, ?8, ?9, ?10, ?11, ?12, ?13)",
)
.bind(row.id.to_string())
.bind(row.partition_path.as_str())
.bind(&row.data_path)
.bind(&row.content_kind)
.bind(row.content_hash.as_bytes().as_slice())
.bind(row.bytes)
.bind(&row.embedder_id)
.bind(row.created_at_ms)
.bind(row.updated_at_ms)
.bind(row.valid_from_ms)
.bind(row.valid_until_ms)
.bind(row.kind.map(crate::memory::MemoryKind::as_persisted_str))
// The blob is bound even when empty, so an embedding-less memory stores a
// zero-length blob here.
.bind(req.embedding_blob.as_slice())
.execute(&mut *tx)
.await;
Self::map_sqlx("insert memory", res)?;
// An empty blob means "no embedding": skip the vector index in that case.
if memory_vec_exists && !req.embedding_blob.is_empty() {
let res = sqlx::query(
"INSERT OR REPLACE INTO memory_vec(memory_id, partition_path, kind, embedding) \
VALUES (?1, ?2, ?3, ?4)",
)
.bind(row.id.to_string())
.bind(row.partition_path.as_str())
.bind(row.kind.map(crate::memory::MemoryKind::as_persisted_str))
.bind(req.embedding_blob.as_slice())
.execute(&mut *tx)
.await;
Self::map_sqlx("insert memory_vec", res)?;
}
if memory_fts_exists {
// Delete-then-insert keeps at most one FTS row per memory id.
let res = sqlx::query("DELETE FROM memory_fts WHERE memory_id = ?1")
.bind(row.id.to_string())
.execute(&mut *tx)
.await;
Self::map_sqlx("delete memory_fts (idempotent)", res)?;
let res = sqlx::query(
"INSERT INTO memory_fts(content, memory_id, partition_path) VALUES (?1, ?2, ?3)",
)
.bind(&req.content_for_index)
.bind(row.id.to_string())
.bind(row.partition_path.as_str())
.execute(&mut *tx)
.await;
Self::map_sqlx("insert memory_fts", res)?;
}
// Explicit links are stored as two directed rows (a -> b and b -> a);
// `INSERT OR IGNORE` leaves pre-existing rows untouched.
for dst in &req.explicit_links {
for (a, b) in [(row.id, dst.id), (dst.id, row.id)] {
let res = sqlx::query(
"INSERT OR IGNORE INTO node_link(src_kind, src_id, dst_kind, dst_id, kind, created_at, note) \
VALUES ('memory', ?1, 'memory', ?2, ?3, ?4, NULL)",
)
.bind(a.to_string())
.bind(b.to_string())
.bind(LinkKind::Explicit.as_persisted_str())
.bind(req.audit.ts_ms)
.execute(&mut *tx)
.await;
Self::map_sqlx("insert link", res)?;
}
}
// The audit entry is written last, in the same transaction as the data.
let audit_seq = insert_audit(&mut tx, &req.audit).await?;
Self::map_sqlx("commit append tx", tx.commit().await)?;
Ok(AppendMemoryOutcome { audit_seq })
}
async fn get_memory(&self, id: &MemoryId) -> Result<Option<MemoryRow>> {
let row = sqlx::query(
"SELECT id, partition_path, data_path, content_kind, content_hash, bytes, embedder_id, tombstoned, created_at, updated_at, valid_from_ms, valid_until_ms, kind
FROM memory WHERE id = ?1",
)
.bind(id.to_string())
.fetch_optional(&self.pool)
.await;
let row = Self::map_sqlx("get memory", row)?;
row.map(row_to_memory).transpose()
}
async fn list_memories(
&self,
path: &PartitionPath,
limit: u32,
cursor: Option<&str>,
include_tombstoned: bool,
) -> Result<(Vec<MemoryRow>, Option<String>)> {
let lim = if limit == 0 {
1_000_000
} else {
i64::from(limit)
};
let cursor_clause = if cursor.is_some() { "AND id > ?3" } else { "" };
let tombstone_clause = if include_tombstoned {
""
} else {
"AND tombstoned = 0"
};
let sql = format!(
"SELECT id, partition_path, data_path, content_kind, content_hash, bytes, embedder_id, tombstoned, created_at, updated_at, valid_from_ms, valid_until_ms, kind
FROM memory
WHERE partition_path = ?1 {tombstone_clause} {cursor_clause}
ORDER BY id
LIMIT ?2"
);
let mut q = sqlx::query(&sql).bind(path.as_str()).bind(lim);
if let Some(c) = cursor {
q = q.bind(c.to_string());
}
let rows = Self::map_sqlx("list memories", q.fetch_all(&self.pool).await)?;
let items: Vec<MemoryRow> = rows
.into_iter()
.map(row_to_memory)
.collect::<Result<Vec<_>>>()?;
let next_cursor = if i64::try_from(items.len()).unwrap_or(0) >= lim {
items.last().map(|m| m.id.to_string())
} else {
None
};
Ok((items, next_cursor))
}
/// Marks one memory as tombstoned and records `audit`, atomically.
///
/// `updated_at` is set to the audit timestamp. When no row has the given id
/// the transaction is abandoned and `Error::MemoryNotFound` is returned.
async fn tombstone_memory(&self, id: &MemoryId, audit: AuditEntry) -> Result<()> {
    let mut tx = Self::map_sqlx("begin tombstone tx", self.pool.begin().await)?;
    let outcome = sqlx::query("UPDATE memory SET tombstoned = 1, updated_at = ?1 WHERE id = ?2")
        .bind(audit.ts_ms)
        .bind(id.to_string())
        .execute(&mut *tx)
        .await;
    let touched = Self::map_sqlx("tombstone memory", outcome)?.rows_affected();
    if touched == 0 {
        return Err(Error::MemoryNotFound(id.to_string()));
    }
    insert_audit(&mut tx, &audit).await?;
    Self::map_sqlx("commit tombstone tx", tx.commit().await)
}
/// Tombstones every live memory in `path` and in all partitions beneath it,
/// then records `audit`, all in one transaction.
///
/// Returns the number of memory rows that were newly tombstoned.
///
/// The subtree match is an exact, case-sensitive prefix comparison on
/// `"<path>/"`. `LIKE` is deliberately not used: SQLite's `LIKE` is
/// case-insensitive for ASCII by default and treats `_` and `%` inside the
/// path as wildcards, so it can match partitions outside the subtree.
async fn tombstone_partition(&self, path: &PartitionPath, audit: AuditEntry) -> Result<u64> {
    // Every descendant path starts with the parent path followed by '/'.
    let subtree_prefix = format!("{}/", path.as_str());
    let mut tx = Self::map_sqlx("begin partition tombstone tx", self.pool.begin().await)?;
    let res = sqlx::query(
        "UPDATE memory SET tombstoned = 1, updated_at = ?1 \
         WHERE (partition_path = ?2 \
         OR substr(partition_path, 1, length(?3)) = ?3) \
         AND tombstoned = 0",
    )
    .bind(audit.ts_ms)
    .bind(path.as_str())
    .bind(subtree_prefix)
    .execute(&mut *tx)
    .await;
    let result = Self::map_sqlx("tombstone partition", res)?;
    let n = result.rows_affected();
    let _ = insert_audit(&mut tx, &audit).await?;
    Self::map_sqlx("commit partition tombstone tx", tx.commit().await)?;
    Ok(n)
}
/// Stores `link` as two directed `node_link` rows (src -> dst and
/// dst -> src) and records `audit`, in one transaction.
///
/// A link from a memory to itself is rejected with `Error::LinkInvalid`.
/// Rows that already exist are left untouched (`INSERT OR IGNORE`).
async fn add_link(&self, link: &Link, audit: AuditEntry) -> Result<()> {
    if link.src == link.dst {
        return Err(Error::LinkInvalid("self-link not allowed".into()));
    }
    const SQL: &str = "INSERT OR IGNORE INTO node_link(src_kind, src_id, dst_kind, dst_id, kind, created_at, note) \
        VALUES ('memory', ?1, 'memory', ?2, ?3, ?4, NULL)";
    let mut tx = Self::map_sqlx("begin add link tx", self.pool.begin().await)?;
    let kind_tag = link.kind.as_persisted_str();
    let directions = [(link.src, link.dst), (link.dst, link.src)];
    for (from, to) in directions {
        let outcome = sqlx::query(SQL)
            .bind(from.to_string())
            .bind(to.to_string())
            .bind(kind_tag)
            .bind(link.created_at_ms)
            .execute(&mut *tx)
            .await;
        Self::map_sqlx("insert link", outcome)?;
    }
    insert_audit(&mut tx, &audit).await?;
    Self::map_sqlx("commit add link tx", tx.commit().await)
}
/// Deletes both directed memory-to-memory `node_link` rows between `src` and
/// `dst` (any link kind) and records `audit`, in one transaction.
///
/// Succeeds even when no such rows exist.
async fn remove_link(&self, src: &MemoryId, dst: &MemoryId, audit: AuditEntry) -> Result<()> {
    const SQL: &str = "DELETE FROM node_link \
        WHERE src_kind = 'memory' AND src_id = ?1 AND dst_kind = 'memory' AND dst_id = ?2";
    let mut tx = Self::map_sqlx("begin remove link tx", self.pool.begin().await)?;
    let directions = [(src, dst), (dst, src)];
    for (from, to) in directions {
        let outcome = sqlx::query(SQL)
            .bind(from.to_string())
            .bind(to.to_string())
            .execute(&mut *tx)
            .await;
        Self::map_sqlx("delete link", outcome)?;
    }
    insert_audit(&mut tx, &audit).await?;
    Self::map_sqlx("commit remove link tx", tx.commit().await)
}
/// Returns the path of every partition flagged `is_leaf = 1`.
///
/// NOTE(review): stored paths that fail to parse are silently skipped here,
/// whereas `children_of` and `top_level_partitions` turn the same failure
/// into an error — confirm the lenient behaviour is intended.
async fn list_leaf_partitions(&self) -> Result<Vec<PartitionPath>> {
    let fetched = sqlx::query("SELECT path FROM partition WHERE is_leaf = 1")
        .fetch_all(&self.pool)
        .await;
    let fetched = Self::map_sqlx("list leaves", fetched)?;
    let mut leaves: Vec<PartitionPath> = Vec::new();
    for r in fetched {
        let raw = r.get::<String, _>("path");
        if let Ok(parsed) = raw.parse::<PartitionPath>() {
            leaves.push(parsed);
        }
    }
    Ok(leaves)
}
async fn audit_since(
&self,
partition: &PartitionPath,
since: i64,
) -> Result<Vec<crate::metadata::AuditReplayRow>> {
let rows = sqlx::query(
"SELECT seq, op, memory_id FROM audit_log \
WHERE seq > ? AND partition_path = ? AND op IN ('append','delete') \
ORDER BY seq ASC",
)
.bind(since)
.bind(partition.as_str())
.fetch_all(&self.pool)
.await;
let rows = Self::map_sqlx("audit_since", rows)?;
let mut out = Vec::with_capacity(rows.len());
for r in rows {
let op = match r.get::<String, _>("op").as_str() {
"append" => crate::audit::AuditOp::Append,
"delete" => crate::audit::AuditOp::Delete,
other => {
return Err(Error::Recovery(format!("unexpected audit op {other}")));
}
};
let id_str: String = r.get("memory_id");
let memory_id: MemoryId = id_str
.parse()
.map_err(|e: ulid::DecodeError| Error::Recovery(format!("ulid: {e}")))?;
out.push(crate::metadata::AuditReplayRow {
seq: r.get::<i64, _>("seq"),
op,
memory_id,
});
}
Ok(out)
}
async fn lookup_for_recovery(&self, id: &MemoryId) -> Result<Option<MemoryRow>> {
let row = sqlx::query(
"SELECT id, partition_path, data_path, content_kind, content_hash, bytes, \
embedder_id, tombstoned, created_at, updated_at, \
valid_from_ms, valid_until_ms, kind \
FROM memory WHERE id = ?",
)
.bind(id.to_string())
.fetch_optional(&self.pool)
.await;
let row = Self::map_sqlx("lookup_for_recovery", row)?;
row.map(row_to_memory).transpose()
}
/// Replaces the stored embedding of a live memory and records `audit`, in
/// one transaction.
///
/// Updates `embedding_blob`, `embedder_id` and `updated_at` (set to the audit
/// timestamp) on the `memory` row, then rewrites the matching `memory_vec`
/// row when that table exists and the new blob is non-empty.
///
/// # Errors
/// `Error::MemoryNotFound` when no non-tombstoned row has the given id; the
/// transaction is then dropped without committing.
async fn regenerate_memory_embedding(
&self,
id: &MemoryId,
partition_path: &PartitionPath,
kind: Option<crate::memory::MemoryKind>,
embedding_blob: &[u8],
new_embedder_id: &str,
audit: AuditEntry,
) -> Result<()> {
// Probe for the optional vector table before opening the transaction; a
// probe error is treated as "table absent".
let memory_vec_exists = Self::index_table_exists(&self.pool, "memory_vec").await;
let mut tx = Self::map_sqlx("begin regen embedding tx", self.pool.begin().await)?;
let res = sqlx::query(
"UPDATE memory SET embedding_blob = ?1, embedder_id = ?2, updated_at = ?3 \
WHERE id = ?4 AND tombstoned = 0",
)
.bind(embedding_blob)
.bind(new_embedder_id)
.bind(audit.ts_ms)
.bind(id.to_string())
.execute(&mut *tx)
.await;
let r = Self::map_sqlx("regen embedding update", res)?;
if r.rows_affected() == 0 {
return Err(Error::MemoryNotFound(id.to_string()));
}
// NOTE(review): with an empty blob the existing `memory_vec` row (if any)
// is left as-is rather than removed — confirm that is intended.
if memory_vec_exists && !embedding_blob.is_empty() {
// Delete-then-insert replaces the vector row for this memory id.
let res = sqlx::query("DELETE FROM memory_vec WHERE memory_id = ?1")
.bind(id.to_string())
.execute(&mut *tx)
.await;
Self::map_sqlx("regen embedding delete memory_vec", res)?;
let res = sqlx::query(
"INSERT INTO memory_vec(memory_id, partition_path, kind, embedding) \
VALUES (?1, ?2, ?3, ?4)",
)
.bind(id.to_string())
.bind(partition_path.as_str())
.bind(kind.map(crate::memory::MemoryKind::as_persisted_str))
.bind(embedding_blob)
.execute(&mut *tx)
.await;
Self::map_sqlx("regen embedding insert memory_vec", res)?;
}
// The audit entry is written in the same transaction as the update.
let _ = insert_audit(&mut tx, &audit).await?;
Self::map_sqlx("commit regen embedding tx", tx.commit().await)
}
/// Returns `(id, embedding)` for every live memory stored directly in `path`
/// that has an embedding, ordered by id.
///
/// `dims` is the expected vector width; each blob must be exactly `dims * 4`
/// bytes and is decoded as native-endian `f32` values.
///
/// Rows whose blob is empty are skipped. `append_memory` writes a
/// zero-length blob for memories without an embedding, so an empty blob
/// means "no embedding", not corruption — the same reading
/// `get_memory_embedding` applies.
///
/// # Errors
/// `Error::IndexCorrupt` when an id fails to decode or a non-empty blob has
/// the wrong length.
async fn list_embeddings_for_partition(
    &self,
    path: &PartitionPath,
    dims: usize,
) -> Result<Vec<(MemoryId, Vec<f32>)>> {
    let rows = sqlx::query(
        "SELECT id, embedding_blob FROM memory \
         WHERE partition_path = ?1 \
         AND tombstoned = 0 \
         AND embedding_blob IS NOT NULL \
         ORDER BY id",
    )
    .bind(path.as_str())
    .fetch_all(&self.pool)
    .await;
    let rows = Self::map_sqlx("list embeddings for partition", rows)?;
    let want = dims.checked_mul(4).unwrap_or(0);
    let mut out: Vec<(MemoryId, Vec<f32>)> = Vec::with_capacity(rows.len());
    for r in rows {
        let id_str: String = r.get("id");
        let id: MemoryId = id_str.parse().map_err(|e: ulid::DecodeError| {
            Error::IndexCorrupt(format!("decode embedding id {id_str}: {e}"))
        })?;
        let blob: Vec<u8> = r.get("embedding_blob");
        if blob.is_empty() {
            // Stored without an embedding; nothing to report for this row.
            continue;
        }
        if blob.len() != want {
            return Err(Error::IndexCorrupt(format!(
                "embedding_blob for {id} has {} bytes, expected {want}",
                blob.len()
            )));
        }
        // Decode by value instead of reinterpreting the buffer: a `Vec<u8>`
        // is only guaranteed 1-byte alignment, so an in-place cast to `f32`
        // may be rejected. The length check above makes the chunking exact.
        let v: Vec<f32> = blob
            .chunks_exact(4)
            .map(|b| f32::from_ne_bytes([b[0], b[1], b[2], b[3]]))
            .collect();
        out.push((id, v));
    }
    Ok(out)
}
/// Returns the stored embedding of a live memory as `dims` native-endian
/// `f32` values.
///
/// Yields `Ok(None)` when the memory does not exist, is tombstoned, or has
/// no embedding (NULL or zero-length blob).
///
/// # Errors
/// `Error::IndexCorrupt` when a non-empty blob is not exactly `dims * 4`
/// bytes long.
async fn get_memory_embedding(&self, id: &MemoryId, dims: usize) -> Result<Option<Vec<f32>>> {
    let row = sqlx::query(
        "SELECT embedding_blob FROM memory \
         WHERE id = ?1 AND tombstoned = 0",
    )
    .bind(id.to_string())
    .fetch_optional(&self.pool)
    .await;
    let row = Self::map_sqlx("get memory embedding", row)?;
    let Some(row) = row else { return Ok(None) };
    // A NULL column (or any decode failure) is treated as "no embedding",
    // as before.
    let blob: Option<Vec<u8>> = row.try_get("embedding_blob").ok();
    let Some(blob) = blob else { return Ok(None) };
    if blob.is_empty() {
        return Ok(None);
    }
    let want = dims.checked_mul(4).unwrap_or(0);
    if blob.len() != want {
        return Err(Error::IndexCorrupt(format!(
            "embedding_blob for {id} has {} bytes, expected {want}",
            blob.len()
        )));
    }
    // Decode by value instead of reinterpreting the buffer: a `Vec<u8>` is
    // only guaranteed 1-byte alignment, so an in-place cast to `f32` may be
    // rejected. The length check above makes the chunking exact.
    let v: Vec<f32> = blob
        .chunks_exact(4)
        .map(|b| f32::from_ne_bytes([b[0], b[1], b[2], b[3]]))
        .collect();
    Ok(Some(v))
}
/// Lists the memory-to-memory links whose source is `id`, ordered by
/// creation time and then destination id.
///
/// # Errors
/// A metadata error when a stored source or destination id fails to decode.
async fn links_of(&self, id: &MemoryId) -> Result<Vec<Link>> {
    const SQL: &str = "SELECT src_id, dst_id, kind, created_at FROM node_link \
        WHERE src_kind = 'memory' AND dst_kind = 'memory' AND src_id = ?1 \
        ORDER BY created_at, dst_id";
    let fetched = sqlx::query(SQL)
        .bind(id.to_string())
        .fetch_all(&self.pool)
        .await;
    let fetched = Self::map_sqlx("links of", fetched)?;
    fetched
        .into_iter()
        .map(|r| -> Result<Link> {
            Ok(Link {
                src: r
                    .get::<String, _>("src_id")
                    .parse()
                    .map_err(|e: ulid::DecodeError| {
                        Error::metadata("decode src_id", io_invalid(&e.to_string()))
                    })?,
                dst: r
                    .get::<String, _>("dst_id")
                    .parse()
                    .map_err(|e: ulid::DecodeError| {
                        Error::metadata("decode dst_id", io_invalid(&e.to_string()))
                    })?,
                kind: LinkKind::from_persisted(&r.get::<String, _>("kind")),
                created_at_ms: r.get::<i64, _>("created_at"),
            })
        })
        .collect()
}
/// Persists a new summary and its derived index rows in one transaction.
///
/// In order, inside the transaction: insert the `summary` row; mirror the
/// embedding into `summary_vec` (only when that table exists and a non-empty
/// blob was supplied); refresh the `summary_fts` entry (only when that table
/// exists); write the audit entry.
///
/// Any failing statement returns early, dropping the transaction without a
/// commit.
async fn insert_summary(&self, req: InsertSummaryRequest) -> Result<()> {
let InsertSummaryRequest {
row,
audit,
embedding_blob,
content_for_index,
parent_path,
} = req;
// The index tables are optional; probe for them before opening the
// transaction. A probe error is treated as "table absent".
let summary_vec_exists = Self::index_table_exists(&self.pool, "summary_vec").await;
let summary_fts_exists = Self::index_table_exists(&self.pool, "summary_fts").await;
let mut tx = Self::map_sqlx("begin insert_summary tx", self.pool.begin().await)?;
// A serialization failure is not surfaced: the inputs column falls back
// to an empty JSON array.
let inputs_json = serde_json::to_string(&row.inputs).unwrap_or_else(|_| "[]".to_string());
// New rows are always written with `tombstoned = 0`.
let res = sqlx::query(
"INSERT INTO summary(id, subject_kind, subject_path, subject_memory, style, version, data_path, content_hash, bytes, summarizer_id, inputs, superseded_by, tombstoned, created_at)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, 0, ?13)",
)
.bind(row.id.to_string())
.bind(&row.subject_kind)
.bind(row.subject_path.as_ref().map(|p| p.as_str()))
.bind(row.subject_memory.map(|m| m.to_string()))
.bind(&row.style)
.bind(row.version)
.bind(&row.data_path)
.bind(row.content_hash.as_bytes().as_slice())
.bind(row.bytes)
.bind(&row.summarizer_id)
.bind(inputs_json)
.bind(row.superseded_by.map(|s| s.to_string()))
.bind(row.created_at_ms)
.execute(&mut *tx)
.await;
Self::map_sqlx("insert summary", res)?;
// The vector row is only written when the table exists AND an embedding
// was supplied AND it is non-empty.
if summary_vec_exists
&& let Some(blob) = embedding_blob.as_ref()
&& !blob.is_empty()
{
let res = sqlx::query(
"INSERT OR REPLACE INTO summary_vec(summary_id, parent_path, style, embedding) \
VALUES (?1, ?2, ?3, ?4)",
)
.bind(row.id.to_string())
.bind(&parent_path)
.bind(&row.style)
.bind(blob.as_slice())
.execute(&mut *tx)
.await;
Self::map_sqlx("insert summary_vec", res)?;
}
if summary_fts_exists {
// Delete-then-insert keeps at most one FTS row per summary id.
let res = sqlx::query("DELETE FROM summary_fts WHERE summary_id = ?1")
.bind(row.id.to_string())
.execute(&mut *tx)
.await;
Self::map_sqlx("delete summary_fts (idempotent)", res)?;
let res = sqlx::query(
"INSERT INTO summary_fts(content, summary_id, parent_path) VALUES (?1, ?2, ?3)",
)
.bind(&content_for_index)
.bind(row.id.to_string())
.bind(&parent_path)
.execute(&mut *tx)
.await;
Self::map_sqlx("insert summary_fts", res)?;
}
// The audit entry is written last, in the same transaction as the data.
let _ = insert_audit(&mut tx, &audit).await?;
Self::map_sqlx("commit insert_summary tx", tx.commit().await)
}
async fn get_summary(&self, id: &SummaryId) -> Result<Option<SummaryRow>> {
let row = sqlx::query(
"SELECT id, subject_kind, subject_path, subject_memory, style, version, data_path, content_hash, bytes, summarizer_id, inputs, superseded_by, tombstoned, created_at
FROM summary WHERE id = ?1",
)
.bind(id.to_string())
.fetch_optional(&self.pool)
.await;
let row = Self::map_sqlx("get summary", row)?;
row.map(row_to_summary).transpose()
}
async fn list_summaries_of(&self, subject: &SummarySubject) -> Result<Vec<SummaryRow>> {
let kind = subject.kind_str();
let path = subject.partition_path().map(|p| p.as_str().to_string());
let mid = subject.memory_id().map(|m| m.to_string());
let rows = sqlx::query(
"SELECT id, subject_kind, subject_path, subject_memory, style, version, data_path, content_hash, bytes, summarizer_id, inputs, superseded_by, tombstoned, created_at
FROM summary
WHERE subject_kind = ?1
AND ((?2 IS NULL AND subject_path IS NULL) OR subject_path = ?2)
AND ((?3 IS NULL AND subject_memory IS NULL) OR subject_memory = ?3)
AND tombstoned = 0
ORDER BY version DESC, created_at DESC",
)
.bind(kind)
.bind(path)
.bind(mid)
.fetch_all(&self.pool)
.await;
let rows = Self::map_sqlx("list_summaries_of", rows)?;
rows.into_iter().map(row_to_summary).collect()
}
async fn latest_summary(
&self,
subject: &SummarySubject,
style: &SummaryStyle,
) -> Result<Option<SummaryRow>> {
let kind = subject.kind_str();
let path = subject.partition_path().map(|p| p.as_str().to_string());
let mid = subject.memory_id().map(|m| m.to_string());
let style_tag = style.as_str().into_owned();
let row = sqlx::query(
"SELECT id, subject_kind, subject_path, subject_memory, style, version, data_path, content_hash, bytes, summarizer_id, inputs, superseded_by, tombstoned, created_at
FROM summary
WHERE subject_kind = ?1
AND ((?2 IS NULL AND subject_path IS NULL) OR subject_path = ?2)
AND ((?3 IS NULL AND subject_memory IS NULL) OR subject_memory = ?3)
AND style = ?4
AND tombstoned = 0
AND superseded_by IS NULL
ORDER BY version DESC LIMIT 1",
)
.bind(kind)
.bind(path)
.bind(mid)
.bind(style_tag)
.fetch_optional(&self.pool)
.await;
let row = Self::map_sqlx("latest_summary", row)?;
row.map(row_to_summary).transpose()
}
/// Fetches, for each id in `memory_ids`, the latest live (non-tombstoned,
/// non-superseded) memory summary in the given `style`.
///
/// Ids are queried in batches of 256 via an `IN (...)` list. Memories with
/// no matching summary are absent from the returned map. The
/// `_partition_paths` argument is not used by this implementation.
async fn latest_memory_summaries_batch(
    &self,
    memory_ids: &[crate::memory::MemoryId],
    _partition_paths: &[PartitionPath],
    style: &SummaryStyle,
) -> Result<std::collections::BTreeMap<crate::memory::MemoryId, SummaryRow>> {
    const BATCH: usize = 256;
    let mut latest = std::collections::BTreeMap::new();
    if memory_ids.is_empty() {
        return Ok(latest);
    }
    let style_tag = style.as_str().into_owned();
    for batch in memory_ids.chunks(BATCH) {
        // One `?` per id in this batch, comma-separated.
        let placeholders = vec!["?"; batch.len()].join(",");
        let sql = format!(
            "SELECT id, subject_kind, subject_path, subject_memory, style, version, data_path, content_hash, bytes, summarizer_id, inputs, superseded_by, tombstoned, created_at \
             FROM summary s1 \
             WHERE subject_kind = 'memory' \
             AND subject_memory IN ({placeholders}) \
             AND style = ? \
             AND tombstoned = 0 \
             AND superseded_by IS NULL \
             AND version = (SELECT MAX(version) FROM summary s2 \
             WHERE s2.subject_memory = s1.subject_memory \
             AND s2.style = s1.style \
             AND s2.tombstoned = 0 \
             AND s2.superseded_by IS NULL)",
        );
        // Bind order follows the placeholders: all ids first, then the style.
        let mut query = sqlx::query(&sql);
        for memory_id in batch {
            query = query.bind(memory_id.to_string());
        }
        query = query.bind(&style_tag);
        let fetched = Self::map_sqlx(
            "latest_memory_summaries_batch",
            query.fetch_all(&self.pool).await,
        )?;
        for r in fetched {
            let summary = row_to_summary(r)?;
            // Rows without a subject memory cannot be keyed and are dropped.
            if let Some(owner) = summary.subject_memory {
                latest.insert(owner, summary);
            }
        }
    }
    Ok(latest)
}
/// Points the `superseded_by` column of summary `prior_id` at `new_id`.
///
/// The number of affected rows is not checked, so this succeeds even when
/// `prior_id` does not exist.
async fn supersede_summary(&self, prior_id: &SummaryId, new_id: &SummaryId) -> Result<()> {
    let outcome = sqlx::query("UPDATE summary SET superseded_by = ?1 WHERE id = ?2")
        .bind(new_id.to_string())
        .bind(prior_id.to_string())
        .execute(&self.pool)
        .await;
    Self::map_sqlx("supersede summary", outcome)?;
    Ok(())
}
/// Soft-deletes a summary by setting `tombstoned = 1` and records `audit`,
/// in one transaction.
///
/// The number of affected rows is not checked, so an unknown id still
/// produces an audit entry and succeeds.
async fn delete_summary(&self, id: &SummaryId, audit: AuditEntry) -> Result<()> {
    let mut tx = Self::map_sqlx("begin delete_summary tx", self.pool.begin().await)?;
    let outcome = sqlx::query("UPDATE summary SET tombstoned = 1 WHERE id = ?1")
        .bind(id.to_string())
        .execute(&mut *tx)
        .await;
    Self::map_sqlx("tombstone summary", outcome)?;
    insert_audit(&mut tx, &audit).await?;
    Self::map_sqlx("commit delete_summary tx", tx.commit().await)
}
/// Sets `summary_stale = 1` on each listed partition, all in one
/// transaction. An empty list is a no-op that opens no transaction.
async fn mark_summary_stale(&self, paths: &[PartitionPath]) -> Result<()> {
    if paths.first().is_none() {
        return Ok(());
    }
    let mut tx = Self::map_sqlx("begin mark_summary_stale tx", self.pool.begin().await)?;
    for partition in paths.iter() {
        let outcome = sqlx::query("UPDATE partition SET summary_stale = 1 WHERE path = ?1")
            .bind(partition.as_str())
            .execute(&mut *tx)
            .await;
        Self::map_sqlx("mark_summary_stale", outcome)?;
    }
    Self::map_sqlx("commit mark_summary_stale tx", tx.commit().await)
}
/// Resets `summary_stale` to 0 on the partition at `path`.
async fn clear_summary_stale(&self, path: &PartitionPath) -> Result<()> {
    let outcome = sqlx::query("UPDATE partition SET summary_stale = 0 WHERE path = ?1")
        .bind(path.as_str())
        .execute(&self.pool)
        .await;
    Self::map_sqlx("clear_summary_stale", outcome)?;
    Ok(())
}
/// Sets `child_index_stale = 1` on each listed partition, all in one
/// transaction. An empty list is a no-op that opens no transaction.
async fn mark_child_index_stale(&self, paths: &[PartitionPath]) -> Result<()> {
    if paths.first().is_none() {
        return Ok(());
    }
    let mut tx = Self::map_sqlx("begin mark_child_index_stale tx", self.pool.begin().await)?;
    for partition in paths.iter() {
        let outcome = sqlx::query("UPDATE partition SET child_index_stale = 1 WHERE path = ?1")
            .bind(partition.as_str())
            .execute(&mut *tx)
            .await;
        Self::map_sqlx("mark_child_index_stale", outcome)?;
    }
    Self::map_sqlx("commit mark_child_index_stale tx", tx.commit().await)
}
async fn children_of(&self, p: &PartitionPath) -> Result<Vec<PartitionPath>> {
let rows = sqlx::query("SELECT path FROM partition WHERE parent_path = ?1 ORDER BY path")
.bind(p.as_str())
.fetch_all(&self.pool)
.await;
let rows = Self::map_sqlx("children_of", rows)?;
let mut out = Vec::with_capacity(rows.len());
for r in rows {
let path: PartitionPath = r.get::<String, _>("path").parse().map_err(|e| {
Error::metadata("decode children path", io_invalid(&format!("{e}")))
})?;
out.push(path);
}
Ok(out)
}
async fn top_level_partitions(&self) -> Result<Vec<PartitionPath>> {
let rows = sqlx::query("SELECT path FROM partition WHERE level = 0 ORDER BY path")
.fetch_all(&self.pool)
.await;
let rows = Self::map_sqlx("top_level_partitions", rows)?;
let mut out = Vec::with_capacity(rows.len());
for r in rows {
let path: PartitionPath = r
.get::<String, _>("path")
.parse()
.map_err(|e| Error::metadata("decode top path", io_invalid(&format!("{e}"))))?;
out.push(path);
}
Ok(out)
}
/// Reports whether partition `p` is flagged as a leaf. An unknown partition
/// is reported as `false`.
async fn partition_is_leaf(&self, p: &PartitionPath) -> Result<bool> {
    let fetched = sqlx::query("SELECT is_leaf FROM partition WHERE path = ?1")
        .bind(p.as_str())
        .fetch_optional(&self.pool)
        .await;
    let is_leaf = match Self::map_sqlx("partition_is_leaf", fetched)? {
        Some(r) => r.get::<i64, _>("is_leaf") != 0,
        None => false,
    };
    Ok(is_leaf)
}
/// Computes the subjects that need (re)summarising within `scope`.
///
/// * `Scope::All` / `Scope::Tenant` — every stale partition (deepest level
///   first, then by path), followed by the tenant subject.
/// * `Scope::Partition(p)` — the stale partitions equal to `p` or beneath it.
/// * `Scope::Memory(r)` — exactly that memory.
///
/// The stale-partition query runs, and its paths are validated, for every
/// scope. `_style` is not used by this implementation.
async fn subjects_needing_summary(
    &self,
    scope: &Scope,
    _style: &SummaryStyle,
) -> Result<Vec<SummarySubject>> {
    let fetched = sqlx::query(
        "SELECT path, level FROM partition WHERE summary_stale = 1 ORDER BY level DESC, path ASC",
    )
    .fetch_all(&self.pool)
    .await;
    let fetched = Self::map_sqlx("subjects_needing_summary", fetched)?;
    let mut stale: Vec<SummarySubject> = Vec::with_capacity(fetched.len());
    for r in fetched {
        let candidate: PartitionPath = r.get::<String, _>("path").parse().map_err(|e| {
            Error::metadata("decode partition path", io_invalid(&format!("{e}")))
        })?;
        let in_scope = match scope {
            Scope::Memory(_) => false,
            // In scope when the candidate is the partition itself or when
            // the remainder after the partition's path starts with '/'.
            Scope::Partition(root) => candidate
                .as_str()
                .strip_prefix(root.as_str())
                .is_some_and(|rest| rest.is_empty() || rest.starts_with('/')),
            Scope::All | Scope::Tenant => true,
        };
        if in_scope {
            stale.push(SummarySubject::Partition(candidate));
        }
    }
    Ok(match scope {
        Scope::Memory(r) => vec![SummarySubject::Memory(r.clone())],
        Scope::All | Scope::Tenant => {
            stale.push(SummarySubject::Tenant);
            stale
        }
        Scope::Partition(_) => stale,
    })
}
/// Records that `summary_id` cites the given input, using the default
/// `SummaryInputRange`. Delegates to `insert_summary_input_with_range`.
async fn insert_summary_input(
    &self,
    summary_id: &SummaryId,
    input_kind: &str,
    input_id: &str,
) -> Result<()> {
    let default_range = SummaryInputRange::default();
    self.insert_summary_input_with_range(summary_id, input_kind, input_id, &default_range)
        .await
}
/// Lists the ids of non-tombstoned summaries that cite the input identified
/// by (`input_kind`, `input_id`), newest summary first.
///
/// # Errors
/// A metadata error when a stored summary id fails to decode.
async fn summaries_citing(&self, input_kind: &str, input_id: &str) -> Result<Vec<SummaryId>> {
    const SQL: &str = "SELECT si.summary_id FROM summary_input si \
        INNER JOIN summary s ON s.id = si.summary_id \
        WHERE si.input_kind = ? AND si.input_id = ? AND s.tombstoned = 0 \
        ORDER BY s.created_at DESC";
    let fetched = sqlx::query(SQL)
        .bind(input_kind)
        .bind(input_id)
        .fetch_all(&self.pool)
        .await;
    Self::map_sqlx("summaries_citing", fetched)?
        .into_iter()
        .map(|r| {
            r.get::<String, _>("summary_id")
                .parse::<SummaryId>()
                .map_err(|e: ulid::DecodeError| {
                    Error::metadata("decode summary_id", io_invalid(&e.to_string()))
                })
        })
        .collect()
}
async fn list_all_summaries_for_backfill(&self) -> Result<Vec<SummaryRow>> {
let rows = sqlx::query(
"SELECT id, subject_kind, subject_path, subject_memory, style, version, data_path, content_hash, bytes, summarizer_id, inputs, superseded_by, tombstoned, created_at \
FROM summary",
)
.fetch_all(&self.pool)
.await;
let rows = Self::map_sqlx("list_all_summaries_for_backfill", rows)?;
rows.into_iter().map(row_to_summary).collect()
}
/// Returns the memory-to-memory links visible within `scope`.
///
/// * `Scope::All` / `Scope::Tenant` — every link whose two endpoints are non-tombstoned memories.
/// * `Scope::Partition` — as above, additionally requiring both endpoints to live in the
///   partition itself or in a path nested under it.
/// * `Scope::Memory` — every memory-to-memory link touching that memory id; this branch does
///   not join `memory`, so tombstone state is not checked.
///
/// Rows are ordered by `created_at`, then source id, then destination id.
///
/// # Errors
/// Returns a metadata error when the query fails or a stored id does not parse.
async fn links_in_subtree(&self, scope: &Scope) -> Result<Vec<crate::link::Link>> {
    let (sql, binds) = match scope {
        Scope::All | Scope::Tenant => (
            "SELECT DISTINCT l.src_id, l.dst_id, l.kind, l.created_at \
             FROM node_link l \
             INNER JOIN memory ms ON ms.id = l.src_id AND ms.tombstoned = 0 \
             INNER JOIN memory md ON md.id = l.dst_id AND md.tombstoned = 0 \
             WHERE l.src_kind = 'memory' AND l.dst_kind = 'memory' \
             ORDER BY l.created_at, l.src_id, l.dst_id"
                .to_string(),
            Vec::<String>::new(),
        ),
        Scope::Partition(p) => {
            // `_` and `%` are LIKE wildcards; escape them (and the escape character
            // itself) so a path such as `a_b` cannot match the sibling subtree `axb/...`.
            let escaped = p
                .as_str()
                .replace('\\', "\\\\")
                .replace('%', "\\%")
                .replace('_', "\\_");
            (
                "SELECT DISTINCT l.src_id, l.dst_id, l.kind, l.created_at \
                 FROM node_link l \
                 INNER JOIN memory ms ON ms.id = l.src_id AND ms.tombstoned = 0 \
                 INNER JOIN memory md ON md.id = l.dst_id AND md.tombstoned = 0 \
                 WHERE l.src_kind = 'memory' AND l.dst_kind = 'memory' \
                 AND (ms.partition_path = ?1 OR ms.partition_path LIKE ?2 ESCAPE '\\') \
                 AND (md.partition_path = ?1 OR md.partition_path LIKE ?2 ESCAPE '\\') \
                 ORDER BY l.created_at, l.src_id, l.dst_id"
                    .to_string(),
                vec![p.as_str().to_string(), format!("{escaped}/%")],
            )
        }
        Scope::Memory(r) => (
            "SELECT DISTINCT l.src_id, l.dst_id, l.kind, l.created_at \
             FROM node_link l \
             WHERE l.src_kind = 'memory' AND l.dst_kind = 'memory' \
             AND (l.src_id = ?1 OR l.dst_id = ?1) \
             ORDER BY l.created_at, l.src_id, l.dst_id"
                .to_string(),
            vec![r.id.to_string()],
        ),
    };
    let mut q = sqlx::query(&sql);
    for b in &binds {
        q = q.bind(b);
    }
    let rows = Self::map_sqlx("links_in_subtree", q.fetch_all(&self.pool).await)?;
    let mut out = Vec::with_capacity(rows.len());
    for r in rows {
        out.push(crate::link::Link {
            src: r
                .get::<String, _>("src_id")
                .parse()
                .map_err(|e: ulid::DecodeError| {
                    Error::metadata("decode src_id", io_invalid(&e.to_string()))
                })?,
            dst: r
                .get::<String, _>("dst_id")
                .parse()
                .map_err(|e: ulid::DecodeError| {
                    Error::metadata("decode dst_id", io_invalid(&e.to_string()))
                })?,
            kind: LinkKind::from_persisted(&r.get::<String, _>("kind")),
            created_at_ms: r.get::<i64, _>("created_at"),
        });
    }
    Ok(out)
}
/// Returns the non-tombstoned summaries that belong to `scope`, newest first.
///
/// * `Scope::All` / `Scope::Tenant` — every live summary.
/// * `Scope::Partition` — summaries whose `subject_path` is the partition or nested under it.
/// * `Scope::Memory` — summaries whose `subject_memory` is that memory id.
///
/// # Errors
/// Returns a metadata error when the query fails, or whatever `row_to_summary` reports.
async fn summaries_in_subtree(&self, scope: &Scope) -> Result<Vec<SummaryRow>> {
    // Shared projection and tombstone filter; each scope only appends its own tail.
    const LIVE_SUMMARIES: &str = "SELECT id, subject_kind, subject_path, subject_memory, style, version, data_path, content_hash, bytes, summarizer_id, inputs, superseded_by, tombstoned, created_at \
         FROM summary WHERE tombstoned = 0";
    let (sql, binds) = match scope {
        Scope::All | Scope::Tenant => (
            format!("{LIVE_SUMMARIES} ORDER BY created_at DESC"),
            Vec::<String>::new(),
        ),
        Scope::Partition(p) => {
            // `_` and `%` are LIKE wildcards; escape them (and the escape character
            // itself) so the pattern matches the literal path prefix only.
            let escaped = p
                .as_str()
                .replace('\\', "\\\\")
                .replace('%', "\\%")
                .replace('_', "\\_");
            (
                format!(
                    "{LIVE_SUMMARIES} AND (subject_path = ?1 OR subject_path LIKE ?2 ESCAPE '\\') ORDER BY created_at DESC"
                ),
                vec![p.as_str().to_string(), format!("{escaped}/%")],
            )
        }
        Scope::Memory(r) => (
            format!("{LIVE_SUMMARIES} AND subject_memory = ?1 ORDER BY created_at DESC"),
            vec![r.id.to_string()],
        ),
    };
    let mut q = sqlx::query(&sql);
    for b in &binds {
        q = q.bind(b);
    }
    let rows = Self::map_sqlx("summaries_in_subtree", q.fetch_all(&self.pool).await)?;
    rows.into_iter().map(row_to_summary).collect()
}
/// Returns the non-tombstoned memories that belong to `scope`.
///
/// * `Scope::All` / `Scope::Tenant` — every live memory, ordered by `created_at`.
/// * `Scope::Partition` — memories stored in the partition or in a path nested under it,
///   ordered by `created_at`.
/// * `Scope::Memory` — the single memory with that id, if it is not tombstoned.
///
/// # Errors
/// Returns a metadata error when the query fails, or whatever `row_to_memory` reports.
async fn memories_in_subtree(&self, scope: &Scope) -> Result<Vec<MemoryRow>> {
    // Shared projection and tombstone filter; each scope only appends its own tail.
    const LIVE_MEMORIES: &str = "SELECT id, partition_path, data_path, content_kind, content_hash, bytes, embedder_id, tombstoned, created_at, updated_at, valid_from_ms, valid_until_ms, kind \
         FROM memory WHERE tombstoned = 0";
    let (sql, binds) = match scope {
        Scope::All | Scope::Tenant => (
            format!("{LIVE_MEMORIES} ORDER BY created_at"),
            Vec::<String>::new(),
        ),
        Scope::Partition(p) => {
            // `_` and `%` are LIKE wildcards; escape them (and the escape character
            // itself) so the pattern matches the literal path prefix only.
            let escaped = p
                .as_str()
                .replace('\\', "\\\\")
                .replace('%', "\\%")
                .replace('_', "\\_");
            (
                format!(
                    "{LIVE_MEMORIES} AND (partition_path = ?1 OR partition_path LIKE ?2 ESCAPE '\\') ORDER BY created_at"
                ),
                vec![p.as_str().to_string(), format!("{escaped}/%")],
            )
        }
        Scope::Memory(r) => (
            format!("{LIVE_MEMORIES} AND id = ?1"),
            vec![r.id.to_string()],
        ),
    };
    let mut q = sqlx::query(&sql);
    for b in &binds {
        q = q.bind(b);
    }
    let rows = Self::map_sqlx("memories_in_subtree", q.fetch_all(&self.pool).await)?;
    rows.into_iter().map(row_to_memory).collect()
}
/// Collects the summary ids named by `summary_attach` audit entries logged under `parent`
/// with a sequence number greater than `since`, in ascending sequence order.
///
/// The path filter matches any partition path beginning with `"{parent}/"`, i.e. every
/// path nested below `parent`, not only its immediate children.
///
/// Entries are skipped silently when their `detail` is not valid JSON, has no string
/// `summary_id` field, or that field does not parse as a `SummaryId`.
///
/// # Errors
/// Returns a metadata error when the query fails.
async fn audit_attaches_for_children_since(
    &self,
    parent: &PartitionPath,
    since: i64,
) -> Result<Vec<crate::summary::SummaryId>> {
    // `_` and `%` are LIKE wildcards; escape them (and the escape character itself)
    // so only paths literally nested under `parent` match.
    let escaped = parent
        .as_str()
        .replace('\\', "\\\\")
        .replace('%', "\\%")
        .replace('_', "\\_");
    let prefix = format!("{escaped}/%");
    let rows = sqlx::query(
        "SELECT detail FROM audit_log \
         WHERE seq > ?1 AND op = 'summary_attach' \
         AND partition_path LIKE ?2 ESCAPE '\\' \
         ORDER BY seq ASC",
    )
    .bind(since)
    .bind(prefix)
    .fetch_all(&self.pool)
    .await;
    let rows = Self::map_sqlx("audit_attaches_for_children_since", rows)?;
    let mut out = Vec::with_capacity(rows.len());
    for r in rows {
        let detail: String = r.get("detail");
        // Best effort: a malformed audit payload is ignored rather than failing the scan.
        let Ok(v) = serde_json::from_str::<serde_json::Value>(&detail) else {
            continue;
        };
        if let Some(sid_str) = v.get("summary_id").and_then(|x| x.as_str())
            && let Ok(sid) = sid_str.parse::<crate::summary::SummaryId>()
        {
            out.push(sid);
        }
    }
    Ok(out)
}
/// Lists partitions together with their count of non-tombstoned memories.
///
/// With `prefix = Some(p)` only `p` itself and paths nested under it are returned;
/// with `None` every partition is returned. Rows are ordered by level, then path.
/// Partitions without live memories report a count of 0.
///
/// A stored `level` that does not fit in `u32` is reported as `u32::MAX`, and a negative
/// memory count as 0.
///
/// # Errors
/// Returns a metadata error when the query fails or a stored path does not parse.
async fn list_partitions(
    &self,
    prefix: Option<&PartitionPath>,
) -> Result<Vec<crate::metadata::PartitionInfo>> {
    let rows = if let Some(p) = prefix {
        // `_` and `%` are LIKE wildcards; escape them (and the escape character itself)
        // so the pattern matches the literal path prefix only.
        let escaped = p
            .as_str()
            .replace('\\', "\\\\")
            .replace('%', "\\%")
            .replace('_', "\\_");
        let like = format!("{escaped}/%");
        sqlx::query(
            "SELECT p.path, p.level, p.is_leaf, p.created_at, \
             COALESCE(c.n, 0) AS memory_count \
             FROM partition p \
             LEFT JOIN ( \
             SELECT partition_path, COUNT(*) AS n \
             FROM memory WHERE tombstoned = 0 \
             GROUP BY partition_path \
             ) c ON c.partition_path = p.path \
             WHERE p.path = ?1 OR p.path LIKE ?2 ESCAPE '\\' \
             ORDER BY p.level, p.path",
        )
        .bind(p.as_str())
        .bind(like)
        .fetch_all(&self.pool)
        .await
    } else {
        sqlx::query(
            "SELECT p.path, p.level, p.is_leaf, p.created_at, \
             COALESCE(c.n, 0) AS memory_count \
             FROM partition p \
             LEFT JOIN ( \
             SELECT partition_path, COUNT(*) AS n \
             FROM memory WHERE tombstoned = 0 \
             GROUP BY partition_path \
             ) c ON c.partition_path = p.path \
             ORDER BY p.level, p.path",
        )
        .fetch_all(&self.pool)
        .await
    };
    let rows = Self::map_sqlx("list_partitions", rows)?;
    let mut out = Vec::with_capacity(rows.len());
    for r in rows {
        let path: PartitionPath = r.get::<String, _>("path").parse().map_err(|e| {
            Error::metadata("decode partition.path", io_invalid(&format!("{e}")))
        })?;
        let is_leaf: i64 = r.get("is_leaf");
        let memory_count: i64 = r.get("memory_count");
        out.push(crate::metadata::PartitionInfo {
            path,
            level: r.get::<i64, _>("level").try_into().unwrap_or(u32::MAX),
            is_leaf: is_leaf != 0,
            created_at_ms: r.get("created_at"),
            memory_count: u64::try_from(memory_count).unwrap_or(0),
        });
    }
    Ok(out)
}
/// Writes one attribute on a memory and its audit entry in a single transaction.
///
/// # Errors
/// Propagates failures from beginning the transaction, `write_attribute`,
/// `insert_audit`, or the commit.
async fn set_attribute(
    &self,
    memory_id: &MemoryId,
    key: &str,
    value: &AttributeValue,
    audit: AuditEntry,
) -> Result<()> {
    let mut txn = Self::map_sqlx("begin attr tx", self.pool.begin().await)?;
    write_attribute(&mut txn, memory_id, key, value).await?;
    // The audit sequence number is not needed by this caller.
    insert_audit(&mut txn, &audit).await?;
    let committed = txn.commit().await;
    Self::map_sqlx("commit attr tx", committed)
}
/// Fetches a single attribute of a memory, or `None` when no row exists for `key`.
///
/// # Errors
/// Returns a metadata error when the query fails, or whatever
/// `row_to_attribute_value` reports for the fetched row.
async fn get_attribute(
    &self,
    memory_id: &MemoryId,
    key: &str,
) -> Result<Option<AttributeValue>> {
    let fetched = sqlx::query(
        "SELECT kind, v_string, v_int, v_decimal, v_bool, v_ts, v_array \
         FROM memory_attribute WHERE memory_id = ?1 AND key = ?2",
    )
    .bind(memory_id.to_string())
    .bind(key)
    .fetch_optional(&self.pool)
    .await;
    Self::map_sqlx("get attribute", fetched)?
        .map(|found| row_to_attribute_value(&found))
        .transpose()
}
/// Deletes one attribute of a memory and records the audit entry in the same transaction.
///
/// The number of deleted rows is not inspected, so clearing a missing key still succeeds
/// and still writes the audit row.
async fn clear_attribute(
    &self,
    memory_id: &MemoryId,
    key: &str,
    audit: AuditEntry,
) -> Result<()> {
    let mut txn = Self::map_sqlx("begin clear attr tx", self.pool.begin().await)?;
    let deleted = sqlx::query("DELETE FROM memory_attribute WHERE memory_id = ?1 AND key = ?2")
        .bind(memory_id.to_string())
        .bind(key)
        .execute(&mut *txn)
        .await;
    Self::map_sqlx("clear attribute", deleted)?;
    insert_audit(&mut txn, &audit).await?;
    let committed = txn.commit().await;
    Self::map_sqlx("commit clear attr tx", committed)
}
/// Returns every attribute of a memory as a key-sorted map.
///
/// # Errors
/// Returns a metadata error when the query fails, or whatever
/// `row_to_attribute_value` reports for a row.
async fn list_attributes(
    &self,
    memory_id: &MemoryId,
) -> Result<std::collections::BTreeMap<String, AttributeValue>> {
    let fetched = sqlx::query(
        "SELECT key, kind, v_string, v_int, v_decimal, v_bool, v_ts, v_array \
         FROM memory_attribute WHERE memory_id = ?1 ORDER BY key",
    )
    .bind(memory_id.to_string())
    .fetch_all(&self.pool)
    .await;
    Self::map_sqlx("list attributes", fetched)?
        .into_iter()
        .map(|row| {
            let name: String = row.get("key");
            row_to_attribute_value(&row).map(|value| (name, value))
        })
        .collect()
}
/// Finds the memories whose attribute `key` equals `value`, ordered by memory id.
///
/// The comparison column is chosen from the variant of `value`. Decimals are compared
/// through their `to_string()` form and arrays through their JSON serialization, so a
/// match requires the stored text to be identical.
///
/// # Errors
/// Returns `Error::InvalidAttribute` when an array value cannot be serialized, and a
/// metadata error when the query fails or a stored id does not parse.
async fn find_by_attribute(&self, key: &str, value: &AttributeValue) -> Result<Vec<MemoryId>> {
    // The query is only built and consumed, so the binding does not need to be mutable.
    let q = match value {
        AttributeValue::String(s) => sqlx::query(
            "SELECT memory_id FROM memory_attribute \
             WHERE key = ?1 AND v_string = ?2 ORDER BY memory_id",
        )
        .bind(key)
        .bind(s),
        AttributeValue::Int(n) => sqlx::query(
            "SELECT memory_id FROM memory_attribute \
             WHERE key = ?1 AND v_int = ?2 ORDER BY memory_id",
        )
        .bind(key)
        .bind(*n),
        AttributeValue::Decimal(d) => sqlx::query(
            "SELECT memory_id FROM memory_attribute \
             WHERE key = ?1 AND v_decimal = ?2 ORDER BY memory_id",
        )
        .bind(key)
        .bind(d.to_string()),
        AttributeValue::Bool(b) => sqlx::query(
            "SELECT memory_id FROM memory_attribute \
             WHERE key = ?1 AND v_bool = ?2 ORDER BY memory_id",
        )
        .bind(key)
        .bind(i64::from(*b)),
        AttributeValue::Timestamp(n) => sqlx::query(
            "SELECT memory_id FROM memory_attribute \
             WHERE key = ?1 AND v_ts = ?2 ORDER BY memory_id",
        )
        .bind(key)
        .bind(*n),
        AttributeValue::Array(_) => {
            let json = serde_json::to_string(value).map_err(|e| Error::InvalidAttribute {
                reason: format!("encode array: {e}"),
            })?;
            sqlx::query(
                "SELECT memory_id FROM memory_attribute \
                 WHERE key = ?1 AND v_array = ?2 ORDER BY memory_id",
            )
            .bind(key)
            .bind(json)
        }
    };
    let rows = Self::map_sqlx("find_by_attribute", q.fetch_all(&self.pool).await)?;
    let mut out = Vec::with_capacity(rows.len());
    for r in rows {
        let id: MemoryId =
            r.get::<String, _>("memory_id")
                .parse()
                .map_err(|e: ulid::DecodeError| {
                    Error::metadata("decode memory_id", io_invalid(&e.to_string()))
                })?;
        out.push(id);
    }
    Ok(out)
}
async fn find_by_attribute_range(
&self,
key: &str,
min: &AttributeValue,
max: &AttributeValue,
) -> Result<Vec<MemoryId>> {
match (min, max) {
(AttributeValue::Int(a), AttributeValue::Int(b)) => {
let rows = sqlx::query(
"SELECT memory_id FROM memory_attribute \
WHERE key = ?1 AND v_int BETWEEN ?2 AND ?3 ORDER BY memory_id",
)
.bind(key)
.bind(*a)
.bind(*b)
.fetch_all(&self.pool)
.await;
let rows = Self::map_sqlx("find_by_attribute_range int", rows)?;
rows_to_ids(rows)
}
(AttributeValue::Timestamp(a), AttributeValue::Timestamp(b)) => {
let rows = sqlx::query(
"SELECT memory_id FROM memory_attribute \
WHERE key = ?1 AND v_ts BETWEEN ?2 AND ?3 ORDER BY memory_id",
)
.bind(key)
.bind(*a)
.bind(*b)
.fetch_all(&self.pool)
.await;
let rows = Self::map_sqlx("find_by_attribute_range ts", rows)?;
rows_to_ids(rows)
}
(AttributeValue::Decimal(a), AttributeValue::Decimal(b)) => {
let rows = sqlx::query(
"SELECT memory_id, v_decimal FROM memory_attribute \
WHERE key = ?1 AND v_decimal IS NOT NULL ORDER BY memory_id",
)
.bind(key)
.fetch_all(&self.pool)
.await;
let rows = Self::map_sqlx("find_by_attribute_range decimal", rows)?;
let mut out = Vec::with_capacity(rows.len());
for r in rows {
let s: String = r.get("v_decimal");
let d: rust_decimal::Decimal =
s.parse().map_err(|e: rust_decimal::Error| {
Error::metadata("decode v_decimal", io_invalid(&e.to_string()))
})?;
if d >= *a && d <= *b {
let id: MemoryId = r.get::<String, _>("memory_id").parse().map_err(
|e: ulid::DecodeError| {
Error::metadata("decode memory_id", io_invalid(&e.to_string()))
},
)?;
out.push(id);
}
}
Ok(out)
}
(a, b) => Err(Error::InvalidAttribute {
reason: format!(
"range requires both ends of the same orderable kind (int / decimal / timestamp); got {} / {}",
a.kind_str(),
b.kind_str()
),
}),
}
}
/// Records that `summary_id` cites an input, together with the optional byte / line /
/// time range and note carried by `range`.
///
/// Uses `INSERT OR IGNORE`, so a conflicting existing row is left untouched.
///
/// # Errors
/// Returns a metadata error when the insert fails.
async fn insert_summary_input_with_range(
    &self,
    summary_id: &SummaryId,
    input_kind: &str,
    input_id: &str,
    range: &SummaryInputRange,
) -> Result<()> {
    // Every numeric bound is widened to i64 before binding.
    let byte_start = range.byte_start.map(i64::from);
    let byte_end = range.byte_end.map(i64::from);
    let line_start = range.line_start.map(i64::from);
    let line_end = range.line_end.map(i64::from);
    let time_start = range.time_start_ms.map(i64::from);
    let time_end = range.time_end_ms.map(i64::from);
    let inserted = sqlx::query(
        "INSERT OR IGNORE INTO summary_input(\
         summary_id, input_kind, input_id, \
         byte_start, byte_end, line_start, line_end, time_start_ms, time_end_ms, note\
         ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
    )
    .bind(summary_id.to_string())
    .bind(input_kind)
    .bind(input_id)
    .bind(byte_start)
    .bind(byte_end)
    .bind(line_start)
    .bind(line_end)
    .bind(time_start)
    .bind(time_end)
    .bind(range.note.clone())
    .execute(&self.pool)
    .await;
    Self::map_sqlx("insert summary_input_with_range", inserted)?;
    Ok(())
}
/// Returns the non-tombstoned summaries citing the given input, each paired with the
/// range stored on the citation, most recently created summary first.
///
/// Range columns are read as nullable integers; a value that does not fit in `u32`
/// (including negatives) is returned as `None`.
///
/// # Errors
/// Returns a metadata error when the query fails or a stored id does not parse.
async fn summaries_citing_with_ranges(
    &self,
    input_kind: &str,
    input_id: &str,
) -> Result<Vec<(SummaryId, SummaryInputRange)>> {
    let fetched = sqlx::query(
        "SELECT si.summary_id, si.byte_start, si.byte_end, si.line_start, si.line_end, \
         si.time_start_ms, si.time_end_ms, si.note \
         FROM summary_input si \
         INNER JOIN summary s ON s.id = si.summary_id \
         WHERE si.input_kind = ?1 AND si.input_id = ?2 AND s.tombstoned = 0 \
         ORDER BY s.created_at DESC",
    )
    .bind(input_kind)
    .bind(input_id)
    .fetch_all(&self.pool)
    .await;
    let rows = Self::map_sqlx("summaries_citing_with_ranges", fetched)?;
    let mut citations = Vec::with_capacity(rows.len());
    for row in rows {
        let id = row
            .get::<String, _>("summary_id")
            .parse::<SummaryId>()
            .map_err(|e: ulid::DecodeError| {
                Error::metadata("decode summary_id", io_invalid(&e.to_string()))
            })?;
        // Shared decoder for the six nullable bound columns.
        let bound = |column: &str| {
            row.get::<Option<i64>, _>(column)
                .and_then(|n| u32::try_from(n).ok())
        };
        let range = SummaryInputRange {
            byte_start: bound("byte_start"),
            byte_end: bound("byte_end"),
            line_start: bound("line_start"),
            line_end: bound("line_end"),
            time_start_ms: bound("time_start_ms"),
            time_end_ms: bound("time_end_ms"),
            note: row.get::<Option<String>, _>("note"),
        };
        citations.push((id, range));
    }
    Ok(citations)
}
async fn insert_snapshot(&self, row: SnapshotRow, audit: AuditEntry) -> Result<()> {
let mut tx = self
.pool
.begin()
.await
.map_err(|e| Error::metadata("insert_snapshot begin", e))?;
sqlx::query(
"INSERT INTO snapshot(id, seq, tag, reason, manifest_path, created_at) \
VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
)
.bind(&row.id)
.bind(row.seq)
.bind(row.tag.as_deref())
.bind(row.reason.as_deref())
.bind(&row.manifest_path)
.bind(row.created_at_ms)
.execute(&mut *tx)
.await
.map_err(|e| Error::metadata("insert_snapshot row", e))?;
insert_audit(&mut tx, &audit).await?;
tx.commit()
.await
.map_err(|e| Error::metadata("insert_snapshot commit", e))?;
Ok(())
}
/// Lists every snapshot, newest first (ties broken by descending id).
///
/// # Errors
/// Returns a metadata error when the query fails.
async fn list_snapshots(&self) -> Result<Vec<SnapshotRow>> {
    let fetched = sqlx::query(
        "SELECT id, seq, tag, reason, manifest_path, created_at \
         FROM snapshot ORDER BY created_at DESC, id DESC",
    )
    .fetch_all(&self.pool)
    .await;
    let rows = Self::map_sqlx("list_snapshots", fetched)?;
    let mut snapshots = Vec::with_capacity(rows.len());
    for row in rows {
        snapshots.push(SnapshotRow {
            id: row.get::<String, _>("id"),
            seq: row.get::<i64, _>("seq"),
            tag: row.get::<Option<String>, _>("tag"),
            reason: row.get::<Option<String>, _>("reason"),
            manifest_path: row.get::<String, _>("manifest_path"),
            created_at_ms: row.get::<i64, _>("created_at"),
        });
    }
    Ok(snapshots)
}
/// Looks up one snapshot by id; returns `None` when no such row exists.
///
/// # Errors
/// Returns a metadata error when the query fails.
async fn get_snapshot(&self, id: &str) -> Result<Option<SnapshotRow>> {
    let fetched = sqlx::query(
        "SELECT id, seq, tag, reason, manifest_path, created_at \
         FROM snapshot WHERE id = ?1",
    )
    .bind(id)
    .fetch_optional(&self.pool)
    .await;
    let snapshot = match Self::map_sqlx("get_snapshot", fetched)? {
        Some(found) => Some(SnapshotRow {
            id: found.get::<String, _>("id"),
            seq: found.get::<i64, _>("seq"),
            tag: found.get::<Option<String>, _>("tag"),
            reason: found.get::<Option<String>, _>("reason"),
            manifest_path: found.get::<String, _>("manifest_path"),
            created_at_ms: found.get::<i64, _>("created_at"),
        }),
        None => None,
    };
    Ok(snapshot)
}
async fn delete_snapshot(&self, id: &str, audit: AuditEntry) -> Result<()> {
let mut tx = self
.pool
.begin()
.await
.map_err(|e| Error::metadata("delete_snapshot begin", e))?;
sqlx::query("DELETE FROM snapshot WHERE id = ?1")
.bind(id)
.execute(&mut *tx)
.await
.map_err(|e| Error::metadata("delete_snapshot row", e))?;
insert_audit(&mut tx, &audit).await?;
tx.commit()
.await
.map_err(|e| Error::metadata("delete_snapshot commit", e))?;
Ok(())
}
/// Returns the highest `seq` in `audit_log`, or 0 when the log is empty.
///
/// # Errors
/// Returns a metadata error when the query fails.
async fn current_audit_seq(&self) -> Result<i64> {
    let fetched = sqlx::query("SELECT COALESCE(MAX(seq), 0) AS seq FROM audit_log")
        .fetch_one(&self.pool)
        .await;
    Self::map_sqlx("current_audit_seq", fetched).map(|row| row.get::<i64, _>("seq"))
}
async fn set_memory_tombstone(
&self,
id: &MemoryId,
tombstoned: bool,
ts_ms: i64,
) -> Result<()> {
let res = sqlx::query("UPDATE memory SET tombstoned = ?1, updated_at = ?2 WHERE id = ?3")
.bind(i64::from(tombstoned))
.bind(ts_ms)
.bind(id.to_string())
.execute(&self.pool)
.await;
let r = Self::map_sqlx("set memory tombstone", res)?;
if r.rows_affected() == 0 {
return Err(Error::MemoryNotFound(id.to_string()));
}
Ok(())
}
async fn set_summary_tombstone(&self, id: &SummaryId, tombstoned: bool) -> Result<()> {
let res = sqlx::query("UPDATE summary SET tombstoned = ?1 WHERE id = ?2")
.bind(i64::from(tombstoned))
.bind(id.to_string())
.execute(&self.pool)
.await;
let r = Self::map_sqlx("set summary tombstone", res)?;
if r.rows_affected() == 0 {
return Err(Error::SummaryNotFound(id.to_string()));
}
Ok(())
}
async fn add_link_unaudited(&self, src: &MemoryId, dst: &MemoryId, ts_ms: i64) -> Result<()> {
if src == dst {
return Ok(());
}
let mut tx = Self::map_sqlx("begin add_link_unaudited tx", self.pool.begin().await)?;
let kind_tag = LinkKind::Explicit.as_persisted_str();
for (a, b) in [(*src, *dst), (*dst, *src)] {
let res = sqlx::query(
"INSERT OR IGNORE INTO node_link(src_kind, src_id, dst_kind, dst_id, kind, created_at, note) \
VALUES ('memory', ?1, 'memory', ?2, ?3, ?4, NULL)",
)
.bind(a.to_string())
.bind(b.to_string())
.bind(kind_tag)
.bind(ts_ms)
.execute(&mut *tx)
.await;
if let Err(sqlx::Error::Database(db)) = &res
&& (db.code().as_deref() == Some("787")
|| db.code().as_deref() == Some("FOREIGN KEY"))
{
continue;
}
Self::map_sqlx("insert link unaudited", res)?;
}
Self::map_sqlx("commit add_link_unaudited tx", tx.commit().await)
}
/// Deletes memory-to-memory links between `src` and `dst` in both directions, without an
/// audit row.
///
/// The delete does not filter on link kind, so links of every kind between the pair are removed.
async fn remove_link_unaudited(&self, src: &MemoryId, dst: &MemoryId) -> Result<()> {
    let mut txn = Self::map_sqlx("begin remove_link_unaudited tx", self.pool.begin().await)?;
    let directions = [(*src, *dst), (*dst, *src)];
    for (from, to) in directions {
        let deleted = sqlx::query(
            "DELETE FROM node_link \
             WHERE src_kind = 'memory' AND src_id = ?1 AND dst_kind = 'memory' AND dst_id = ?2",
        )
        .bind(from.to_string())
        .bind(to.to_string())
        .execute(&mut *txn)
        .await;
        Self::map_sqlx("delete link unaudited", deleted)?;
    }
    let committed = txn.commit().await;
    Self::map_sqlx("commit remove_link_unaudited tx", committed)
}
/// Returns every memory attribute as `(memory id, key, value)`, ordered by memory id then key.
///
/// # Errors
/// Returns a metadata error when the query fails or a stored id does not parse, and
/// propagates any error from `row_to_attribute_value`.
async fn list_all_attributes(&self) -> Result<Vec<(MemoryId, String, AttributeValue)>> {
    let fetched = sqlx::query(
        "SELECT memory_id, key, kind, v_string, v_int, v_decimal, v_bool, v_ts, v_array \
         FROM memory_attribute ORDER BY memory_id, key",
    )
    .fetch_all(&self.pool)
    .await;
    Self::map_sqlx("list all attributes", fetched)?
        .into_iter()
        .map(|row| -> Result<(MemoryId, String, AttributeValue)> {
            let owner = row
                .get::<String, _>("memory_id")
                .parse::<MemoryId>()
                .map_err(|e: ulid::DecodeError| {
                    Error::metadata("decode memory_id", io_invalid(&e.to_string()))
                })?;
            let name: String = row.get("key");
            let value = row_to_attribute_value(&row)?;
            Ok((owner, name, value))
        })
        .collect()
}
/// Returns the id of every row in `memory`, ordered by id.
///
/// The query has no tombstone filter, so tombstoned memories are included.
///
/// # Errors
/// Returns a metadata error when the query fails or a stored id does not parse.
async fn list_all_memory_ids(&self) -> Result<Vec<MemoryId>> {
    let fetched = sqlx::query("SELECT id FROM memory ORDER BY id")
        .fetch_all(&self.pool)
        .await;
    Self::map_sqlx("list_all_memory_ids", fetched)?
        .into_iter()
        .map(|row| {
            row.get::<String, _>("id")
                .parse::<MemoryId>()
                .map_err(|e: ulid::DecodeError| {
                    Error::metadata("decode memory id", io_invalid(&e.to_string()))
                })
        })
        .collect()
}
/// Returns the id of every row in `summary`, ordered by id.
///
/// The query has no tombstone filter, so tombstoned summaries are included.
///
/// # Errors
/// Returns a metadata error when the query fails or a stored id does not parse.
async fn list_all_summary_ids(&self) -> Result<Vec<SummaryId>> {
    let fetched = sqlx::query("SELECT id FROM summary ORDER BY id")
        .fetch_all(&self.pool)
        .await;
    Self::map_sqlx("list_all_summary_ids", fetched)?
        .into_iter()
        .map(|row| {
            row.get::<String, _>("id")
                .parse::<SummaryId>()
                .map_err(|e: ulid::DecodeError| {
                    Error::metadata("decode summary id", io_invalid(&e.to_string()))
                })
        })
        .collect()
}
/// Writes a single audit entry in its own transaction and returns the sequence number
/// reported by `insert_audit`.
async fn insert_restore_audit(&self, audit: AuditEntry) -> Result<i64> {
    let mut txn = Self::map_sqlx("begin restore audit tx", self.pool.begin().await)?;
    let audit_seq = insert_audit(&mut txn, &audit).await?;
    let committed = txn.commit().await;
    Self::map_sqlx("commit restore audit tx", committed).map(|()| audit_seq)
}
async fn relocate_memory(
&self,
id: &MemoryId,
new_partition: &PartitionPath,
new_data_path: &str,
ts_ms: i64,
) -> Result<()> {
let res = sqlx::query(
"UPDATE memory SET partition_path = ?1, data_path = ?2, updated_at = ?3 WHERE id = ?4",
)
.bind(new_partition.as_str())
.bind(new_data_path)
.bind(ts_ms)
.bind(id.to_string())
.execute(&self.pool)
.await;
let r = Self::map_sqlx("relocate memory", res)?;
if r.rows_affected() == 0 {
return Err(Error::MemoryNotFound(id.to_string()));
}
Ok(())
}
/// Replaces the validity window of a memory and records the audit entry in one transaction.
///
/// `updated_at` is set to the audit entry's timestamp.
///
/// # Errors
/// Returns `Error::MemoryNotFound` (before the audit row is written or anything is
/// committed) when no row has the given id; otherwise propagates update, audit, or
/// commit failures.
async fn set_memory_validity(
    &self,
    id: &MemoryId,
    valid_from_ms: Option<i64>,
    valid_until_ms: Option<i64>,
    audit: AuditEntry,
) -> Result<()> {
    let mut txn = Self::map_sqlx("begin set_validity tx", self.pool.begin().await)?;
    let updated = sqlx::query(
        "UPDATE memory SET valid_from_ms = ?1, valid_until_ms = ?2, updated_at = ?3 WHERE id = ?4",
    )
    .bind(valid_from_ms)
    .bind(valid_until_ms)
    .bind(audit.ts_ms)
    .bind(id.to_string())
    .execute(&mut *txn)
    .await;
    let touched = Self::map_sqlx("set_memory_validity", updated)?.rows_affected();
    if touched == 0 {
        return Err(Error::MemoryNotFound(id.to_string()));
    }
    insert_audit(&mut txn, &audit).await?;
    let committed = txn.commit().await;
    Self::map_sqlx("commit set_validity tx", committed)
}
/// Writes one attribute on a summary and its audit entry in a single transaction.
///
/// # Errors
/// Propagates failures from beginning the transaction, `write_summary_attribute`,
/// `insert_audit`, or the commit.
async fn set_summary_attribute(
    &self,
    summary_id: &SummaryId,
    key: &str,
    value: &AttributeValue,
    audit: AuditEntry,
) -> Result<()> {
    let mut txn = Self::map_sqlx("begin summary attr tx", self.pool.begin().await)?;
    write_summary_attribute(&mut txn, summary_id, key, value).await?;
    // The audit sequence number is not needed by this caller.
    insert_audit(&mut txn, &audit).await?;
    let committed = txn.commit().await;
    Self::map_sqlx("commit summary attr tx", committed)
}
/// Fetches a single attribute of a summary, or `None` when no row exists for `key`.
///
/// # Errors
/// Returns a metadata error when the query fails, or whatever
/// `row_to_attribute_value` reports for the fetched row.
async fn get_summary_attribute(
    &self,
    summary_id: &SummaryId,
    key: &str,
) -> Result<Option<AttributeValue>> {
    let fetched = sqlx::query(
        "SELECT kind, v_string, v_int, v_decimal, v_bool, v_ts, v_array \
         FROM summary_attribute WHERE summary_id = ?1 AND key = ?2",
    )
    .bind(summary_id.to_string())
    .bind(key)
    .fetch_optional(&self.pool)
    .await;
    Self::map_sqlx("get summary attribute", fetched)?
        .map(|found| row_to_attribute_value(&found))
        .transpose()
}
/// Deletes one attribute of a summary and records the audit entry in the same transaction.
///
/// The number of deleted rows is not inspected, so clearing a missing key still succeeds
/// and still writes the audit row.
async fn clear_summary_attribute(
    &self,
    summary_id: &SummaryId,
    key: &str,
    audit: AuditEntry,
) -> Result<()> {
    let mut txn = Self::map_sqlx("begin clear summary attr tx", self.pool.begin().await)?;
    let deleted = sqlx::query("DELETE FROM summary_attribute WHERE summary_id = ?1 AND key = ?2")
        .bind(summary_id.to_string())
        .bind(key)
        .execute(&mut *txn)
        .await;
    Self::map_sqlx("clear summary attribute", deleted)?;
    insert_audit(&mut txn, &audit).await?;
    let committed = txn.commit().await;
    Self::map_sqlx("commit clear summary attr tx", committed)
}
/// Returns every attribute of a summary as a key-sorted map.
///
/// # Errors
/// Returns a metadata error when the query fails, or whatever
/// `row_to_attribute_value` reports for a row.
async fn list_summary_attributes(
    &self,
    summary_id: &SummaryId,
) -> Result<std::collections::BTreeMap<String, AttributeValue>> {
    let fetched = sqlx::query(
        "SELECT key, kind, v_string, v_int, v_decimal, v_bool, v_ts, v_array \
         FROM summary_attribute WHERE summary_id = ?1 ORDER BY key",
    )
    .bind(summary_id.to_string())
    .fetch_all(&self.pool)
    .await;
    Self::map_sqlx("list summary attributes", fetched)?
        .into_iter()
        .map(|row| {
            let name: String = row.get("key");
            row_to_attribute_value(&row).map(|value| (name, value))
        })
        .collect()
}
async fn find_summaries_by_attribute(
&self,
key: &str,
value: &AttributeValue,
) -> Result<Vec<SummaryId>> {
let q = match value {
AttributeValue::String(s) => sqlx::query(
"SELECT summary_id FROM summary_attribute \
WHERE key = ?1 AND v_string = ?2 ORDER BY summary_id",
)
.bind(key)
.bind(s),
AttributeValue::Int(n) => sqlx::query(
"SELECT summary_id FROM summary_attribute \
WHERE key = ?1 AND v_int = ?2 ORDER BY summary_id",
)
.bind(key)
.bind(*n),
AttributeValue::Decimal(d) => sqlx::query(
"SELECT summary_id FROM summary_attribute \
WHERE key = ?1 AND v_decimal = ?2 ORDER BY summary_id",
)
.bind(key)
.bind(d.to_string()),
AttributeValue::Bool(b) => sqlx::query(
"SELECT summary_id FROM summary_attribute \
WHERE key = ?1 AND v_bool = ?2 ORDER BY summary_id",
)
.bind(key)
.bind(i64::from(*b)),
AttributeValue::Timestamp(n) => sqlx::query(
"SELECT summary_id FROM summary_attribute \
WHERE key = ?1 AND v_ts = ?2 ORDER BY summary_id",
)
.bind(key)
.bind(*n),
AttributeValue::Array(_) => {
let json = serde_json::to_string(value).map_err(|e| Error::InvalidAttribute {
reason: format!("encode array: {e}"),
})?;
sqlx::query(
"SELECT summary_id FROM summary_attribute \
WHERE key = ?1 AND v_array = ?2 ORDER BY summary_id",
)
.bind(key)
.bind(json)
}
};
let rows = Self::map_sqlx("find_summaries_by_attribute", q.fetch_all(&self.pool).await)?;
let mut out = Vec::with_capacity(rows.len());
for r in rows {
let id: SummaryId =
r.get::<String, _>("summary_id")
.parse()
.map_err(|e: ulid::DecodeError| {
Error::metadata("decode summary_id", io_invalid(&e.to_string()))
})?;
out.push(id);
}
Ok(out)
}
/// Inserts a typed link between two graph nodes and records the audit entry in one transaction.
///
/// Uses `INSERT OR IGNORE`, so a conflicting existing link is left untouched; the audit
/// row is written either way.
async fn add_node_link(
    &self,
    src: &crate::graph::NodeRef,
    dst: &crate::graph::NodeRef,
    kind: LinkKind,
    ts_ms: i64,
    note: Option<&str>,
    audit: AuditEntry,
) -> Result<()> {
    let (from_kind, from_id) = node_ref_persisted(src);
    let (to_kind, to_id) = node_ref_persisted(dst);
    let mut txn = Self::map_sqlx("begin add node_link tx", self.pool.begin().await)?;
    let inserted = sqlx::query(
        "INSERT OR IGNORE INTO node_link(src_kind, src_id, dst_kind, dst_id, kind, created_at, note) \
         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
    )
    .bind(from_kind)
    .bind(&from_id)
    .bind(to_kind)
    .bind(&to_id)
    .bind(kind.as_persisted_str())
    .bind(ts_ms)
    .bind(note)
    .execute(&mut *txn)
    .await;
    Self::map_sqlx("insert node_link", inserted)?;
    insert_audit(&mut txn, &audit).await?;
    let committed = txn.commit().await;
    Self::map_sqlx("commit add node_link tx", committed)
}
/// Deletes the link of the given kind from `src` to `dst` and records the audit entry in
/// one transaction.
///
/// Only the `src -> dst` direction is deleted. The number of deleted rows is not checked.
async fn remove_node_link(
    &self,
    src: &crate::graph::NodeRef,
    dst: &crate::graph::NodeRef,
    kind: LinkKind,
    audit: AuditEntry,
) -> Result<()> {
    let (from_kind, from_id) = node_ref_persisted(src);
    let (to_kind, to_id) = node_ref_persisted(dst);
    let mut txn = Self::map_sqlx("begin remove node_link tx", self.pool.begin().await)?;
    let deleted = sqlx::query(
        "DELETE FROM node_link \
         WHERE src_kind = ?1 AND src_id = ?2 AND dst_kind = ?3 AND dst_id = ?4 AND kind = ?5",
    )
    .bind(from_kind)
    .bind(&from_id)
    .bind(to_kind)
    .bind(&to_id)
    .bind(kind.as_persisted_str())
    .execute(&mut *txn)
    .await;
    Self::map_sqlx("delete node_link", deleted)?;
    insert_audit(&mut txn, &audit).await?;
    let committed = txn.commit().await;
    Self::map_sqlx("commit remove node_link tx", committed)
}
/// Returns the outgoing edges of `src`, ordered by creation time, then destination kind,
/// destination id, and link kind. Row decoding is delegated to `rows_to_edges`.
async fn node_links_from(&self, src: &crate::graph::NodeRef) -> Result<Vec<crate::link::Edge>> {
    let (origin_kind, origin_id) = node_ref_persisted(src);
    let fetched = sqlx::query(
        "SELECT src_kind, src_id, dst_kind, dst_id, kind, created_at, note \
         FROM node_link WHERE src_kind = ?1 AND src_id = ?2 \
         ORDER BY created_at, dst_kind, dst_id, kind",
    )
    .bind(origin_kind)
    .bind(&origin_id)
    .fetch_all(&self.pool)
    .await;
    let outgoing = Self::map_sqlx("node_links_from", fetched)?;
    rows_to_edges(self, outgoing).await
}
/// Returns the incoming edges of `dst`, ordered by creation time, then source kind,
/// source id, and link kind. Row decoding is delegated to `rows_to_edges`.
async fn node_links_to(&self, dst: &crate::graph::NodeRef) -> Result<Vec<crate::link::Edge>> {
    let (target_kind, target_id) = node_ref_persisted(dst);
    let fetched = sqlx::query(
        "SELECT src_kind, src_id, dst_kind, dst_id, kind, created_at, note \
         FROM node_link WHERE dst_kind = ?1 AND dst_id = ?2 \
         ORDER BY created_at, src_kind, src_id, kind",
    )
    .bind(target_kind)
    .bind(&target_id)
    .fetch_all(&self.pool)
    .await;
    let incoming = Self::map_sqlx("node_links_to", fetched)?;
    rows_to_edges(self, incoming).await
}
/// Applies a batch of evolution operations plus one audit entry inside a single transaction.
///
/// Phases run in this fixed order: memory attribute sets/clears, summary attribute
/// sets/clears, link inserts, link deletes, validity updates, kind updates, and finally
/// the audit row. Any failure returns before `commit` is reached.
///
/// Returns `ops.op_count()` as the applied count together with the sequence number
/// reported by `insert_audit`.
///
/// # Errors
/// Returns `Error::MemoryNotFound` when a validity or kind update matches no `memory`
/// row; otherwise propagates statement, audit, or commit failures.
async fn apply_evolution(
&self,
ops: &crate::evolve::EvolutionOps,
audit: AuditEntry,
) -> Result<crate::evolve::EvolutionApplied> {
use crate::evolve::{MemoryAttributeOp, SummaryAttributeOp};
let mut tx = Self::map_sqlx("begin evolve tx", self.pool.begin().await)?;
// Phase 1: memory attributes. Clears do not check how many rows were deleted.
for op in &ops.memory_attributes {
match op {
MemoryAttributeOp::Set { mref, key, value } => {
write_attribute(&mut tx, &mref.id, key, value).await?;
}
MemoryAttributeOp::Clear { mref, key } => {
let res = sqlx::query(
"DELETE FROM memory_attribute WHERE memory_id = ?1 AND key = ?2",
)
.bind(mref.id.to_string())
.bind(key)
.execute(&mut *tx)
.await;
Self::map_sqlx("evolve clear memory attribute", res)?;
}
}
}
// Phase 2: summary attributes, handled the same way as memory attributes.
for op in &ops.summary_attributes {
match op {
SummaryAttributeOp::Set { sref, key, value } => {
write_summary_attribute(&mut tx, &sref.id, key, value).await?;
}
SummaryAttributeOp::Clear { sref, key } => {
let res = sqlx::query(
"DELETE FROM summary_attribute WHERE summary_id = ?1 AND key = ?2",
)
.bind(sref.id.to_string())
.bind(key)
.execute(&mut *tx)
.await;
Self::map_sqlx("evolve clear summary attribute", res)?;
}
}
}
// Phase 3: new links. They are stamped with the audit timestamp, carry a NULL note,
// and use INSERT OR IGNORE.
for (src, dst, kind) in &ops.links_added {
let (sk, si) = node_ref_persisted(src);
let (dk, di) = node_ref_persisted(dst);
let res = sqlx::query(
"INSERT OR IGNORE INTO node_link(src_kind, src_id, dst_kind, dst_id, kind, created_at, note) \
VALUES (?1, ?2, ?3, ?4, ?5, ?6, NULL)",
)
.bind(sk)
.bind(&si)
.bind(dk)
.bind(&di)
.bind(kind.as_persisted_str())
.bind(audit.ts_ms)
.execute(&mut *tx)
.await;
Self::map_sqlx("evolve add link", res)?;
}
// Phase 4: link removals, matched on both endpoints and the link kind.
for (src, dst, kind) in &ops.links_removed {
let (sk, si) = node_ref_persisted(src);
let (dk, di) = node_ref_persisted(dst);
let res = sqlx::query(
"DELETE FROM node_link \
WHERE src_kind = ?1 AND src_id = ?2 AND dst_kind = ?3 AND dst_id = ?4 AND kind = ?5",
)
.bind(sk)
.bind(&si)
.bind(dk)
.bind(&di)
.bind(kind.as_persisted_str())
.execute(&mut *tx)
.await;
Self::map_sqlx("evolve remove link", res)?;
}
// Phase 5: validity windows. `updated_at` takes the audit timestamp; a missing
// memory aborts the whole batch.
for (mref, from_ms, until_ms) in &ops.validity_updates {
let res = sqlx::query(
"UPDATE memory SET valid_from_ms = ?1, valid_until_ms = ?2, updated_at = ?3 WHERE id = ?4",
)
.bind(*from_ms)
.bind(*until_ms)
.bind(audit.ts_ms)
.bind(mref.id.to_string())
.execute(&mut *tx)
.await;
let r = Self::map_sqlx("evolve validity update", res)?;
if r.rows_affected() == 0 {
return Err(Error::MemoryNotFound(mref.id.to_string()));
}
}
// Phase 6: memory kind changes, with the same missing-row handling as phase 5.
for (mref, kind) in &ops.kind_updates {
let res = sqlx::query("UPDATE memory SET kind = ?1, updated_at = ?2 WHERE id = ?3")
.bind(kind.as_persisted_str())
.bind(audit.ts_ms)
.bind(mref.id.to_string())
.execute(&mut *tx)
.await;
let r = Self::map_sqlx("evolve kind update", res)?;
if r.rows_affected() == 0 {
return Err(Error::MemoryNotFound(mref.id.to_string()));
}
}
// The audit row is written last, inside the same transaction as the changes.
let audit_seq = insert_audit(&mut tx, &audit).await?;
Self::map_sqlx("commit evolve tx", tx.commit().await)?;
Ok(crate::evolve::EvolutionApplied {
applied: ops.op_count(),
audit_seq,
})
}
/// Returns the non-tombstoned memories of the given `kind` that belong to `scope`.
///
/// * `Scope::All` / `Scope::Tenant` — every matching memory, ordered by `created_at`.
/// * `Scope::Partition` — matching memories stored in the partition or nested under it,
///   ordered by `created_at`.
/// * `Scope::Memory` — that single memory, if it is live and of the requested kind.
///
/// # Errors
/// Returns a metadata error when the query fails, or whatever `row_to_memory` reports.
async fn find_memories_by_kind(
    &self,
    kind: crate::memory::MemoryKind,
    scope: &Scope,
) -> Result<Vec<MemoryRow>> {
    // Shared projection and filters; each scope only appends its own tail.
    const LIVE_BY_KIND: &str = "SELECT id, partition_path, data_path, content_kind, content_hash, bytes, embedder_id, tombstoned, created_at, updated_at, valid_from_ms, valid_until_ms, kind \
         FROM memory WHERE tombstoned = 0 AND kind = ?1";
    let kind_tag = kind.as_persisted_str();
    let (sql, binds) = match scope {
        Scope::All | Scope::Tenant => (
            format!("{LIVE_BY_KIND} ORDER BY created_at"),
            vec![kind_tag.to_string()],
        ),
        Scope::Partition(p) => {
            // `_` and `%` are LIKE wildcards; escape them (and the escape character
            // itself) so the pattern matches the literal path prefix only.
            let escaped = p
                .as_str()
                .replace('\\', "\\\\")
                .replace('%', "\\%")
                .replace('_', "\\_");
            (
                format!(
                    "{LIVE_BY_KIND} AND (partition_path = ?2 OR partition_path LIKE ?3 ESCAPE '\\') ORDER BY created_at"
                ),
                vec![
                    kind_tag.to_string(),
                    p.as_str().to_string(),
                    format!("{escaped}/%"),
                ],
            )
        }
        Scope::Memory(r) => (
            format!("{LIVE_BY_KIND} AND id = ?2"),
            vec![kind_tag.to_string(), r.id.to_string()],
        ),
    };
    let mut q = sqlx::query(&sql);
    // The bind list is consumed here, so the values are moved rather than drained.
    for b in binds {
        q = q.bind(b);
    }
    let rows = Self::map_sqlx("find_memories_by_kind", q.fetch_all(&self.pool).await)?;
    rows.into_iter().map(row_to_memory).collect()
}
}
async fn insert_audit(
tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>,
audit: &AuditEntry,
) -> Result<i64> {
let res = sqlx::query(
"INSERT INTO audit_log(ts, actor, op, partition_path, memory_id, detail) VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
)
.bind(audit.ts_ms)
.bind(audit.actor.as_deref())
.bind(audit.op.as_str())
.bind(audit.partition_path.as_ref().map(|p| p.as_str()))
.bind(audit.memory_id.map(|m| m.to_string()))
.bind(audit.detail.to_string())
.execute(&mut **tx)
.await
.map_err(|e| Error::metadata("insert audit", e))?;
Ok(res.last_insert_rowid())
}
/// Builds an `std::io::Error` of kind `InvalidData` whose message is `msg`.
///
/// Used as the source error when a stored value cannot be decoded.
fn io_invalid(msg: &str) -> std::io::Error {
    let kind = std::io::ErrorKind::InvalidData;
    std::io::Error::new(kind, String::from(msg))
}
fn node_ref_persisted(n: &crate::graph::NodeRef) -> (&'static str, String) {
match n {
crate::graph::NodeRef::Memory(m) => ("memory", m.id.to_string()),
crate::graph::NodeRef::Summary(s) => ("summary", s.id.to_string()),
crate::graph::NodeRef::Partition(p) => ("partition", p.as_str().to_string()),
}
}
/// Turns `node_link` result rows into `Edge` values.
///
/// Both endpoints of every row are resolved through `hydrate_node_ref`,
/// so each row costs up to two additional store lookups.
///
/// # Errors
///
/// Propagates the first endpoint hydration failure.
async fn rows_to_edges(
    store: &SqliteMetadata,
    rows: Vec<sqlx::sqlite::SqliteRow>,
) -> Result<Vec<crate::link::Edge>> {
    let mut edges = Vec::with_capacity(rows.len());
    for row in rows {
        // Read every column before hydrating the endpoints.
        let (src_kind, src_id): (String, String) = (row.get("src_kind"), row.get("src_id"));
        let (dst_kind, dst_id): (String, String) = (row.get("dst_kind"), row.get("dst_id"));
        let link_tag = row.get::<String, _>("kind");
        let created_at_ms = row.get::<i64, _>("created_at");
        let note = row.get::<Option<String>, _>("note");

        let src = hydrate_node_ref(store, &src_kind, &src_id).await?;
        let dst = hydrate_node_ref(store, &dst_kind, &dst_id).await?;

        let edge = crate::link::Edge {
            src,
            dst,
            kind: LinkKind::from_persisted(&link_tag),
            created_at_ms,
            note,
        };
        edges.push(edge);
    }
    Ok(edges)
}
async fn hydrate_node_ref(
store: &SqliteMetadata,
kind: &str,
id: &str,
) -> Result<crate::graph::NodeRef> {
match kind {
"memory" => {
let mid: MemoryId = id.parse().map_err(|e: ulid::DecodeError| {
Error::metadata("decode memory id", io_invalid(&e.to_string()))
})?;
let row = store.get_memory(&mid).await?;
let partition = row
.map(|r| r.partition_path)
.unwrap_or_else(crate::partition::tenant_root_path);
Ok(crate::graph::NodeRef::Memory(crate::memory::MemoryRef {
id: mid,
partition,
}))
}
"summary" => {
let sid: SummaryId = id.parse().map_err(|e: ulid::DecodeError| {
Error::metadata("decode summary id", io_invalid(&e.to_string()))
})?;
let row = store.get_summary(&sid).await?;
match row {
Some(r) => {
let subject = match r.subject_kind.as_str() {
"memory" => match (r.subject_path, r.subject_memory) {
(Some(p), Some(m)) => {
SummarySubject::Memory(crate::memory::MemoryRef {
id: m,
partition: p,
})
}
_ => SummarySubject::Tenant,
},
"partition" => match r.subject_path {
Some(p) => SummarySubject::Partition(p),
None => SummarySubject::Tenant,
},
_ => SummarySubject::Tenant,
};
let style = SummaryStyle::from_persisted(&r.style);
Ok(crate::graph::NodeRef::Summary(crate::summary::SummaryRef {
id: r.id,
subject,
style,
version: u32::try_from(r.version).unwrap_or(0),
}))
}
None => {
Ok(crate::graph::NodeRef::Summary(crate::summary::SummaryRef {
id: sid,
subject: SummarySubject::Tenant,
style: SummaryStyle::Compact,
version: 0,
}))
}
}
}
"partition" => {
let path: PartitionPath = id.parse().map_err(|e| {
Error::metadata("decode partition path", io_invalid(&format!("{e}")))
})?;
Ok(crate::graph::NodeRef::Partition(path))
}
other => Err(Error::metadata("unknown node_link kind", io_invalid(other))),
}
}
/// Upserts one attribute of a summary inside the caller's transaction.
///
/// Exactly one of the typed `v_*` columns receives the value; the others
/// are bound as NULL. Arrays are stored as the JSON encoding of the whole
/// `AttributeValue`.
///
/// # Errors
///
/// `Error::InvalidAttribute` when an array cannot be JSON-encoded, or the
/// mapped sqlx error when the statement fails.
async fn write_summary_attribute(
    tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>,
    summary_id: &SummaryId,
    key: &str,
    value: &AttributeValue,
) -> Result<()> {
    // (v_string, v_int, v_decimal, v_bool, v_ts, v_array)
    type Columns = (
        Option<String>,
        Option<i64>,
        Option<String>,
        Option<i64>,
        Option<i64>,
        Option<String>,
    );
    let (text, integer, decimal, boolean, timestamp, array): Columns = match value {
        AttributeValue::String(s) => (Some(s.clone()), None, None, None, None, None),
        AttributeValue::Int(n) => (None, Some(*n), None, None, None, None),
        AttributeValue::Decimal(d) => (None, None, Some(d.to_string()), None, None, None),
        AttributeValue::Bool(b) => (None, None, None, Some(i64::from(*b)), None, None),
        AttributeValue::Timestamp(t) => (None, None, None, None, Some(*t), None),
        AttributeValue::Array(_) => {
            let encoded =
                serde_json::to_string(value).map_err(|e| Error::InvalidAttribute {
                    reason: format!("encode array: {e}"),
                })?;
            (None, None, None, None, None, Some(encoded))
        }
    };

    let outcome = sqlx::query(
        "INSERT OR REPLACE INTO summary_attribute(\
         summary_id, key, kind, v_string, v_int, v_decimal, v_bool, v_ts, v_array\
         ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
    )
    .bind(summary_id.to_string())
    .bind(key)
    .bind(value.kind_str())
    .bind(text)
    .bind(integer)
    .bind(decimal)
    .bind(boolean)
    .bind(timestamp)
    .bind(array)
    .execute(&mut **tx)
    .await;
    SqliteMetadata::map_sqlx("write summary attribute", outcome)?;
    Ok(())
}
/// Upserts one attribute of a memory inside the caller's transaction.
///
/// Exactly one of the typed `v_*` columns receives the value; the others
/// are bound as NULL. Arrays are stored as the JSON encoding of the whole
/// `AttributeValue`.
///
/// # Errors
///
/// `Error::InvalidAttribute` when an array cannot be JSON-encoded, or the
/// mapped sqlx error when the statement fails.
async fn write_attribute(
    tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>,
    memory_id: &MemoryId,
    key: &str,
    value: &AttributeValue,
) -> Result<()> {
    // (v_string, v_int, v_decimal, v_bool, v_ts, v_array)
    type Columns = (
        Option<String>,
        Option<i64>,
        Option<String>,
        Option<i64>,
        Option<i64>,
        Option<String>,
    );
    let (text, integer, decimal, boolean, timestamp, array): Columns = match value {
        AttributeValue::String(s) => (Some(s.clone()), None, None, None, None, None),
        AttributeValue::Int(n) => (None, Some(*n), None, None, None, None),
        AttributeValue::Decimal(d) => (None, None, Some(d.to_string()), None, None, None),
        AttributeValue::Bool(b) => (None, None, None, Some(i64::from(*b)), None, None),
        AttributeValue::Timestamp(t) => (None, None, None, None, Some(*t), None),
        AttributeValue::Array(_) => {
            let encoded =
                serde_json::to_string(value).map_err(|e| Error::InvalidAttribute {
                    reason: format!("encode array: {e}"),
                })?;
            (None, None, None, None, None, Some(encoded))
        }
    };

    let outcome = sqlx::query(
        "INSERT OR REPLACE INTO memory_attribute(\
         memory_id, key, kind, v_string, v_int, v_decimal, v_bool, v_ts, v_array\
         ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
    )
    .bind(memory_id.to_string())
    .bind(key)
    .bind(value.kind_str())
    .bind(text)
    .bind(integer)
    .bind(decimal)
    .bind(boolean)
    .bind(timestamp)
    .bind(array)
    .execute(&mut **tx)
    .await;
    SqliteMetadata::map_sqlx("write attribute", outcome)?;
    Ok(())
}
fn row_to_attribute_value(r: &sqlx::sqlite::SqliteRow) -> Result<AttributeValue> {
let kind: String = r.get("kind");
Ok(match kind.as_str() {
"string" => AttributeValue::String(
r.get::<Option<String>, _>("v_string")
.ok_or_else(|| Error::metadata("decode attr v_string", io_invalid("null")))?,
),
"int" => AttributeValue::Int(
r.get::<Option<i64>, _>("v_int")
.ok_or_else(|| Error::metadata("decode attr v_int", io_invalid("null")))?,
),
"decimal" => {
let s: String = r
.get::<Option<String>, _>("v_decimal")
.ok_or_else(|| Error::metadata("decode attr v_decimal", io_invalid("null")))?;
let d: rust_decimal::Decimal = s.parse().map_err(|e: rust_decimal::Error| {
Error::metadata("decode v_decimal", io_invalid(&e.to_string()))
})?;
AttributeValue::Decimal(d)
}
"bool" => AttributeValue::Bool(
r.get::<Option<i64>, _>("v_bool")
.ok_or_else(|| Error::metadata("decode attr v_bool", io_invalid("null")))?
!= 0,
),
"timestamp" => AttributeValue::Timestamp(
r.get::<Option<i64>, _>("v_ts")
.ok_or_else(|| Error::metadata("decode attr v_ts", io_invalid("null")))?,
),
"array" => {
let s: String = r
.get::<Option<String>, _>("v_array")
.ok_or_else(|| Error::metadata("decode attr v_array", io_invalid("null")))?;
serde_json::from_str(&s).map_err(|e| Error::InvalidAttribute {
reason: format!("decode array: {e}"),
})?
}
other => return Err(Error::metadata("unknown attribute kind", io_invalid(other))),
})
}
/// Parses the `memory_id` column of every row into a `MemoryId`.
///
/// # Errors
///
/// Stops at the first id that fails to parse and returns it as a metadata
/// error tagged `"decode memory_id"`.
fn rows_to_ids(rows: Vec<sqlx::sqlite::SqliteRow>) -> Result<Vec<MemoryId>> {
    rows.into_iter()
        .map(|row| {
            let raw: String = row.get("memory_id");
            raw.parse::<MemoryId>().map_err(|e: ulid::DecodeError| {
                Error::metadata("decode memory_id", io_invalid(&e.to_string()))
            })
        })
        .collect()
}
fn row_to_summary(r: sqlx::sqlite::SqliteRow) -> Result<SummaryRow> {
let hash_blob: Vec<u8> = r.get("content_hash");
if hash_blob.len() != 32 {
return Err(Error::metadata(
"decode summary content_hash",
io_invalid("content_hash blob is not 32 bytes"),
));
}
let mut hash = [0u8; 32];
hash.copy_from_slice(&hash_blob);
let id: SummaryId = r
.get::<String, _>("id")
.parse()
.map_err(|e: ulid::DecodeError| {
Error::metadata("decode summary id", io_invalid(&e.to_string()))
})?;
let subject_path: Option<PartitionPath> = r
.get::<Option<String>, _>("subject_path")
.map(|s| s.parse())
.transpose()
.map_err(|e| Error::metadata("decode subject_path", io_invalid(&format!("{e}"))))?;
let subject_memory: Option<MemoryId> = r
.get::<Option<String>, _>("subject_memory")
.map(|s| s.parse())
.transpose()
.map_err(|e: ulid::DecodeError| {
Error::metadata("decode subject_memory", io_invalid(&e.to_string()))
})?;
let inputs_json: String = r.get::<String, _>("inputs");
let inputs: Vec<SummarySubject> = serde_json::from_str(&inputs_json).unwrap_or_default();
let superseded_by: Option<SummaryId> = r
.get::<Option<String>, _>("superseded_by")
.map(|s| s.parse())
.transpose()
.map_err(|e: ulid::DecodeError| {
Error::metadata("decode superseded_by", io_invalid(&e.to_string()))
})?;
Ok(SummaryRow {
id,
subject_kind: r.get::<String, _>("subject_kind"),
subject_path,
subject_memory,
style: r.get::<String, _>("style"),
version: r.get::<i64, _>("version"),
data_path: r.get::<String, _>("data_path"),
content_hash: ContentHash(hash),
bytes: r.get::<i64, _>("bytes"),
summarizer_id: r.get::<String, _>("summarizer_id"),
inputs,
superseded_by,
tombstoned: r.get::<i64, _>("tombstoned") != 0,
created_at_ms: r.get::<i64, _>("created_at"),
})
}
fn row_to_memory(r: sqlx::sqlite::SqliteRow) -> Result<MemoryRow> {
let hash_blob: Vec<u8> = r.get("content_hash");
if hash_blob.len() != 32 {
return Err(Error::metadata(
"decode content_hash",
io_invalid("content_hash blob is not 32 bytes"),
));
}
let mut hash = [0u8; 32];
hash.copy_from_slice(&hash_blob);
let id: MemoryId = r
.get::<String, _>("id")
.parse()
.map_err(|e: ulid::DecodeError| Error::metadata("decode id", io_invalid(&e.to_string())))?;
let partition_path: PartitionPath = r
.get::<String, _>("partition_path")
.parse()
.map_err(|e| Error::metadata("decode partition_path", io_invalid(&format!("{e}"))))?;
let valid_from_ms = r.try_get::<Option<i64>, _>("valid_from_ms").unwrap_or(None);
let valid_until_ms = r
.try_get::<Option<i64>, _>("valid_until_ms")
.unwrap_or(None);
let kind = r
.try_get::<Option<String>, _>("kind")
.ok()
.flatten()
.as_deref()
.and_then(crate::memory::MemoryKind::from_persisted);
Ok(MemoryRow {
id,
partition_path,
data_path: r.get::<String, _>("data_path"),
content_kind: r.get::<String, _>("content_kind"),
content_hash: ContentHash(hash),
bytes: r.get::<i64, _>("bytes"),
embedder_id: r.get::<String, _>("embedder_id"),
tombstoned: r.get::<i64, _>("tombstoned") != 0,
created_at_ms: r.get::<i64, _>("created_at"),
updated_at_ms: r.get::<i64, _>("updated_at"),
valid_from_ms,
valid_until_ms,
kind,
})
}
#[cfg(test)]
mod tests {
use super::*;
/// A freshly migrated in-memory store has no schema metadata yet.
#[tokio::test]
async fn migrate_then_read_meta() {
    let store = SqliteMetadata::connect_memory().await.unwrap();
    store.migrate().await.unwrap();
    let meta = store.read_schema_meta().await.unwrap();
    assert!(meta.is_none());
}
/// Recording the same summary input twice succeeds and the summary is
/// reported exactly once by `summaries_citing`.
#[tokio::test]
async fn summary_input_insert_and_lookup() {
    const CITED_ID: &str = "01J0Z00000000000000000000Z";

    let store = SqliteMetadata::connect_memory().await.unwrap();
    store.migrate().await.unwrap();
    let partition: PartitionPath = "user=alex".parse().unwrap();
    store
        .ensure_partition_chain(&partition, true, 1)
        .await
        .unwrap();

    let summary_id = SummaryId::generate();
    let request = crate::metadata::InsertSummaryRequest {
        row: SummaryRow {
            id: summary_id,
            subject_kind: "partition".into(),
            subject_path: Some(partition.clone()),
            subject_memory: None,
            style: "compact".into(),
            version: 1,
            data_path: "x".into(),
            content_hash: ContentHash([0; 32]),
            bytes: 0,
            summarizer_id: "x:1".into(),
            inputs: vec![],
            superseded_by: None,
            tombstoned: false,
            created_at_ms: 1,
        },
        audit: AuditEntry {
            ts_ms: 1,
            actor: None,
            op: crate::audit::AuditOp::SummaryAttach,
            partition_path: Some(partition.clone()),
            memory_id: None,
            detail: serde_json::Value::Null,
        },
        embedding_blob: None,
        content_for_index: String::new(),
        parent_path: partition.as_str().to_string(),
    };
    store.insert_summary(request).await.unwrap();

    // Insert the same citation twice; both calls must succeed.
    for _ in 0..2 {
        store
            .insert_summary_input(&summary_id, "memory", CITED_ID)
            .await
            .unwrap();
    }

    let citing = store.summaries_citing("memory", CITED_ID).await.unwrap();
    assert_eq!(citing, vec![summary_id]);
}
/// Schema metadata written to the store is read back unchanged.
#[tokio::test]
async fn write_then_read_meta() {
    let store = SqliteMetadata::connect_memory().await.unwrap();
    store.migrate().await.unwrap();

    let written = SchemaMeta {
        partition_scheme: "user={user}".into(),
        scheme_version: 1,
        embedder_id: Some("mock:hash:v1".into()),
        embedder_dims: Some(384),
        created_at_ms: 1,
    };
    store.write_schema_meta(&written).await.unwrap();

    let read_back = store.read_schema_meta().await.unwrap();
    assert_eq!(read_back, Some(written));
}
/// Test fixture: a migrated in-memory store holding one memory in the
/// `user=alex` partition. Returns the store and that memory's id.
async fn fresh_attr_store() -> (SqliteMetadata, MemoryId) {
    use crate::content::ContentHash;

    let store = SqliteMetadata::connect_memory().await.unwrap();
    store.migrate().await.unwrap();
    let partition: PartitionPath = "user=alex".parse().unwrap();
    store
        .ensure_partition_chain(&partition, true, 1)
        .await
        .unwrap();

    let memory_id = MemoryId::generate();
    let row = MemoryRow {
        id: memory_id,
        partition_path: partition.clone(),
        data_path: "alex.md".into(),
        content_kind: "md".into(),
        content_hash: ContentHash([0u8; 32]),
        bytes: 0,
        embedder_id: "mock:hash:v1".into(),
        tombstoned: false,
        created_at_ms: 1,
        updated_at_ms: 1,
        valid_from_ms: None,
        valid_until_ms: None,
        kind: None,
    };
    let audit = AuditEntry {
        ts_ms: 1,
        actor: None,
        op: crate::audit::AuditOp::Append,
        partition_path: Some(partition),
        memory_id: Some(memory_id),
        detail: serde_json::Value::Null,
    };
    let request = AppendMemoryRequest {
        row,
        explicit_links: vec![],
        audit,
        embedding_blob: vec![],
        content_for_index: String::new(),
    };
    store.append_memory(request).await.unwrap();

    (store, memory_id)
}
/// Builds an `AttributeSet` audit entry for `mid` whose detail records `key`.
fn attr_audit(mid: MemoryId, key: &str) -> AuditEntry {
    let detail = serde_json::json!({ "key": key });
    AuditEntry {
        ts_ms: 1,
        actor: None,
        op: crate::audit::AuditOp::AttributeSet,
        partition_path: None,
        memory_id: Some(mid),
        detail,
    }
}
/// Every attribute kind survives a set/get round trip, and
/// `list_attributes` reports one entry per key written.
#[tokio::test]
async fn attribute_round_trip_each_kind() {
    use rust_decimal::prelude::FromStr;

    let (store, memory_id) = fresh_attr_store().await;
    let amount = rust_decimal::Decimal::from_str("12.5").unwrap();
    let cases = [
        ("speaker", AttributeValue::String("alex".into())),
        ("seq", AttributeValue::Int(7)),
        ("amount", AttributeValue::Decimal(amount)),
        ("flagged", AttributeValue::Bool(true)),
        ("ts_start_ms", AttributeValue::Timestamp(1_700_000)),
        (
            "tags",
            AttributeValue::Array(vec![AttributeValue::String("a".into())]),
        ),
    ];

    for (key, value) in &cases {
        store
            .set_attribute(&memory_id, key, value, attr_audit(memory_id, key))
            .await
            .unwrap();
        let stored = store.get_attribute(&memory_id, key).await.unwrap();
        assert_eq!(stored, Some(value.clone()), "kind={}", value.kind_str());
    }

    let listed = store.list_attributes(&memory_id).await.unwrap();
    assert_eq!(listed.len(), cases.len());
}
/// Setting the same key twice keeps only the most recent value.
#[tokio::test]
async fn attribute_overwrite_is_idempotent() {
    let (store, memory_id) = fresh_attr_store().await;

    // Write "alex" first, then overwrite it with "bob".
    for speaker in ["alex", "bob"] {
        store
            .set_attribute(
                &memory_id,
                "speaker",
                &AttributeValue::String(speaker.into()),
                attr_audit(memory_id, "speaker"),
            )
            .await
            .unwrap();
    }

    let stored = store.get_attribute(&memory_id, "speaker").await.unwrap();
    assert_eq!(stored, Some(AttributeValue::String("bob".into())));
}
/// `find_by_attribute` returns the memory for a matching value and nothing
/// for a value that was never stored.
#[tokio::test]
async fn find_by_attribute_indexed_query() {
    let (store, memory_id) = fresh_attr_store().await;
    let alex = AttributeValue::String("alex".into());
    let bob = AttributeValue::String("bob".into());

    store
        .set_attribute(
            &memory_id,
            "speaker",
            &alex,
            attr_audit(memory_id, "speaker"),
        )
        .await
        .unwrap();

    let matching = store.find_by_attribute("speaker", &alex).await.unwrap();
    assert_eq!(matching, vec![memory_id]);

    let missing = store.find_by_attribute("speaker", &bob).await.unwrap();
    assert!(missing.is_empty());
}
/// Range queries over int and timestamp attributes include values inside
/// the bounds and exclude values outside them.
#[tokio::test]
async fn find_by_attribute_range_int_and_timestamp() {
    let (store, memory_id) = fresh_attr_store().await;

    store
        .set_attribute(
            &memory_id,
            "seq",
            &AttributeValue::Int(50),
            attr_audit(memory_id, "seq"),
        )
        .await
        .unwrap();
    store
        .set_attribute(
            &memory_id,
            "ts",
            &AttributeValue::Timestamp(100),
            attr_audit(memory_id, "ts"),
        )
        .await
        .unwrap();

    // seq = 50 lies within [0, 100].
    let in_int_range = store
        .find_by_attribute_range("seq", &AttributeValue::Int(0), &AttributeValue::Int(100))
        .await
        .unwrap();
    assert_eq!(in_int_range, vec![memory_id]);

    // ts = 100 lies outside [0, 50].
    let lo = AttributeValue::Timestamp(0);
    let outside = store
        .find_by_attribute_range("ts", &lo, &AttributeValue::Timestamp(50))
        .await
        .unwrap();
    assert!(outside.is_empty());

    // ts = 100 lies within [0, 200].
    let inside = store
        .find_by_attribute_range("ts", &lo, &AttributeValue::Timestamp(200))
        .await
        .unwrap();
    assert_eq!(inside, vec![memory_id]);
}
#[tokio::test]
async fn find_by_attribute_range_rejects_non_orderable() {
let (m, _) = fresh_attr_store().await;
let r = m
.find_by_attribute_range(
"x",
&AttributeValue::String("a".into()),
&AttributeValue::String("b".into()),
)
.await;
assert!(matches!(r, Err(Error::InvalidAttribute { .. })));
let r = m
.find_by_attribute_range("x", &AttributeValue::Int(0), &AttributeValue::Timestamp(1))
.await;
assert!(matches!(r, Err(Error::InvalidAttribute { .. })));
}
/// Keys come back from `list_attributes` in sorted order even when they
/// were written in reverse order.
#[tokio::test]
async fn list_attributes_returns_btreemap() {
    let (store, memory_id) = fresh_attr_store().await;

    // Write "z" before "a" on purpose.
    store
        .set_attribute(
            &memory_id,
            "z",
            &AttributeValue::Int(1),
            attr_audit(memory_id, "z"),
        )
        .await
        .unwrap();
    store
        .set_attribute(
            &memory_id,
            "a",
            &AttributeValue::String("hi".into()),
            attr_audit(memory_id, "a"),
        )
        .await
        .unwrap();

    let listed = store.list_attributes(&memory_id).await.unwrap();
    let keys: Vec<_> = listed.keys().cloned().collect();
    assert_eq!(keys, vec!["a".to_string(), "z".to_string()]);
}
/// Clearing an attribute removes it, and clearing it again still succeeds.
#[tokio::test]
async fn clear_attribute_removes_row() {
    let (store, memory_id) = fresh_attr_store().await;
    store
        .set_attribute(
            &memory_id,
            "speaker",
            &AttributeValue::String("alex".into()),
            attr_audit(memory_id, "speaker"),
        )
        .await
        .unwrap();

    store
        .clear_attribute(&memory_id, "speaker", attr_audit(memory_id, "speaker"))
        .await
        .unwrap();
    let after_clear = store.get_attribute(&memory_id, "speaker").await.unwrap();
    assert!(after_clear.is_none());

    // Second clear on the now-absent key must not fail.
    store
        .clear_attribute(&memory_id, "speaker", attr_audit(memory_id, "speaker"))
        .await
        .unwrap();
}
/// A summary input recorded with a line/time range is returned with that
/// range, and recording the identical citation twice yields a single hit.
#[tokio::test]
async fn summary_input_with_range_round_trips() {
    let (store, _) = fresh_attr_store().await;
    let partition: PartitionPath = "user=alex".parse().unwrap();
    let summary_id = SummaryId::generate();

    let request = crate::metadata::InsertSummaryRequest {
        row: SummaryRow {
            id: summary_id,
            subject_kind: "partition".into(),
            subject_path: Some(partition.clone()),
            subject_memory: None,
            style: "compact".into(),
            version: 1,
            data_path: "x".into(),
            content_hash: ContentHash([0; 32]),
            bytes: 0,
            summarizer_id: "x:1".into(),
            inputs: vec![],
            superseded_by: None,
            tombstoned: false,
            created_at_ms: 1,
        },
        audit: AuditEntry {
            ts_ms: 1,
            actor: None,
            op: crate::audit::AuditOp::SummaryAttach,
            partition_path: Some(partition.clone()),
            memory_id: None,
            detail: serde_json::Value::Null,
        },
        embedding_blob: None,
        content_for_index: String::new(),
        parent_path: partition.as_str().to_string(),
    };
    store.insert_summary(request).await.unwrap();

    let cited_id = "01J0Z00000000000000000000Z";
    let range = SummaryInputRange {
        line_start: Some(42),
        line_end: Some(44),
        time_start_ms: Some(127_000),
        time_end_ms: Some(130_000),
        ..Default::default()
    };

    // Record the identical citation twice; both calls must succeed.
    for _ in 0..2 {
        store
            .insert_summary_input_with_range(&summary_id, "memory", cited_id, &range)
            .await
            .unwrap();
    }

    let hits = store
        .summaries_citing_with_ranges("memory", cited_id)
        .await
        .unwrap();
    assert_eq!(hits.len(), 1);
    let first = &hits[0];
    assert_eq!(first.0, summary_id);
    assert_eq!(first.1.line_start, Some(42));
    assert_eq!(first.1.time_end_ms, Some(130_000));
}
/// An inserted snapshot can be fetched by id and appears as the only entry
/// in the snapshot list.
#[tokio::test]
async fn snapshot_round_trips() {
    let store = SqliteMetadata::connect_memory().await.unwrap();
    store.migrate().await.unwrap();

    let snapshot = SnapshotRow {
        id: "01J0SNAPSHOT0000000000000Z".into(),
        seq: 7,
        tag: Some("pre-experiment".into()),
        reason: Some("about to run risky migration".into()),
        manifest_path: "snapshots/01J0SNAPSHOT0000000000000Z.manifest.json".into(),
        created_at_ms: 1700,
    };
    let audit = AuditEntry {
        ts_ms: 1700,
        actor: None,
        op: crate::audit::AuditOp::Snapshot,
        partition_path: None,
        memory_id: None,
        detail: serde_json::Value::Null,
    };
    store.insert_snapshot(snapshot.clone(), audit).await.unwrap();

    let fetched = store.get_snapshot(&snapshot.id).await.unwrap();
    assert_eq!(fetched.as_ref().unwrap(), &snapshot);

    let listed = store.list_snapshots().await.unwrap();
    assert_eq!(listed.len(), 1);
    assert_eq!(listed[0], snapshot);
}
/// Deleting a snapshot removes it, and deleting it a second time still
/// succeeds.
#[tokio::test]
async fn delete_snapshot_removes_row_idempotently() {
    let store = SqliteMetadata::connect_memory().await.unwrap();
    store.migrate().await.unwrap();

    let snapshot = SnapshotRow {
        id: "01J0SNAPSHOT0000000000001Z".into(),
        seq: 1,
        tag: None,
        reason: None,
        manifest_path: "snapshots/x.manifest.json".into(),
        created_at_ms: 1,
    };
    let insert_audit_entry = AuditEntry {
        ts_ms: 1,
        actor: None,
        op: crate::audit::AuditOp::Snapshot,
        partition_path: None,
        memory_id: None,
        detail: serde_json::Value::Null,
    };
    store
        .insert_snapshot(snapshot.clone(), insert_audit_entry)
        .await
        .unwrap();

    let delete_audit_entry = AuditEntry {
        ts_ms: 2,
        actor: None,
        op: crate::audit::AuditOp::SnapshotDelete,
        partition_path: None,
        memory_id: None,
        detail: serde_json::Value::Null,
    };
    store
        .delete_snapshot(&snapshot.id, delete_audit_entry.clone())
        .await
        .unwrap();
    let after_delete = store.get_snapshot(&snapshot.id).await.unwrap();
    assert!(after_delete.is_none());

    // Deleting the already-removed snapshot must not fail.
    store
        .delete_snapshot(&snapshot.id, delete_audit_entry)
        .await
        .unwrap();
}
/// The audit sequence is 0 on a fresh store and at least 1 after a
/// snapshot insert.
#[tokio::test]
async fn current_audit_seq_starts_at_zero_then_grows() {
    let store = SqliteMetadata::connect_memory().await.unwrap();
    store.migrate().await.unwrap();

    let before = store.current_audit_seq().await.unwrap();
    assert_eq!(before, 0);

    let audit = AuditEntry {
        ts_ms: 1,
        actor: None,
        op: crate::audit::AuditOp::Snapshot,
        partition_path: None,
        memory_id: None,
        detail: serde_json::Value::Null,
    };
    let snapshot = SnapshotRow {
        id: "01J0SNAPSHOT0000000000002Z".into(),
        seq: 0,
        tag: None,
        reason: None,
        manifest_path: "x".into(),
        created_at_ms: 1,
    };
    store.insert_snapshot(snapshot, audit).await.unwrap();

    let after = store.current_audit_seq().await.unwrap();
    assert!(after >= 1);
}
}