use std::sync::Arc;
use async_trait::async_trait;
use uuid::Uuid;
use khive_storage::error::StorageError;
use khive_storage::note::{FilterOp, Note, NoteFilter, SortDir};
use khive_storage::types::{BatchWriteSummary, DeleteMode, Page, PageRequest, SqlValue};
use khive_storage::NoteStore;
use khive_storage::StorageCapability;
use crate::error::SqliteError;
use crate::pool::ConnectionPool;
fn map_err(e: rusqlite::Error, op: &'static str) -> StorageError {
StorageError::driver(StorageCapability::Notes, op, e)
}
fn map_sqlite_err(e: SqliteError, op: &'static str) -> StorageError {
StorageError::driver(StorageCapability::Notes, op, e)
}
pub struct SqlNoteStore {
pool: Arc<ConnectionPool>,
is_file_backed: bool,
}
impl SqlNoteStore {
pub fn new(pool: Arc<ConnectionPool>, is_file_backed: bool) -> Self {
Self {
pool,
is_file_backed,
}
}
fn open_standalone_reader(&self) -> Result<rusqlite::Connection, StorageError> {
let config = self.pool.config();
let path = config.path.as_ref().ok_or_else(|| StorageError::Pool {
operation: "note_reader".into(),
message: "in-memory databases do not support standalone connections".into(),
})?;
let conn = rusqlite::Connection::open_with_flags(
path,
rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY
| rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX
| rusqlite::OpenFlags::SQLITE_OPEN_URI,
)
.map_err(|e| map_err(e, "open_note_reader"))?;
conn.busy_timeout(config.busy_timeout)
.map_err(|e| map_err(e, "open_note_reader"))?;
conn.pragma_update(None, "foreign_keys", "ON")
.map_err(|e| map_err(e, "open_note_reader"))?;
conn.pragma_update(None, "synchronous", "NORMAL")
.map_err(|e| map_err(e, "open_note_reader"))?;
Ok(conn)
}
async fn with_writer<F, R>(&self, op: &'static str, f: F) -> Result<R, StorageError>
where
F: FnOnce(&rusqlite::Connection) -> Result<R, rusqlite::Error> + Send + 'static,
R: Send + 'static,
{
let pool = Arc::clone(&self.pool);
tokio::task::spawn_blocking(move || {
let guard = pool.try_writer().map_err(|e| map_sqlite_err(e, op))?;
f(guard.conn()).map_err(|e| map_err(e, op))
})
.await
.map_err(|e| StorageError::driver(StorageCapability::Notes, op, e))?
}
async fn with_reader<F, R>(&self, op: &'static str, f: F) -> Result<R, StorageError>
where
F: FnOnce(&rusqlite::Connection) -> Result<R, rusqlite::Error> + Send + 'static,
R: Send + 'static,
{
if self.is_file_backed {
let conn = self.open_standalone_reader()?;
tokio::task::spawn_blocking(move || f(&conn).map_err(|e| map_err(e, op)))
.await
.map_err(|e| StorageError::driver(StorageCapability::Notes, op, e))?
} else {
let pool = Arc::clone(&self.pool);
tokio::task::spawn_blocking(move || {
let guard = pool.reader().map_err(|e| map_sqlite_err(e, op))?;
f(guard.conn()).map_err(|e| map_err(e, op))
})
.await
.map_err(|e| StorageError::driver(StorageCapability::Notes, op, e))?
}
}
}
fn read_note(row: &rusqlite::Row<'_>) -> Result<Note, rusqlite::Error> {
let id_str: String = row.get(0)?;
let namespace: String = row.get(1)?;
let kind: String = row.get(2)?;
let status: String = row.get(3)?;
let name: Option<String> = row.get(4)?;
let content: String = row.get(5)?;
let salience: Option<f64> = row.get(6)?;
let decay_factor: Option<f64> = row.get(7)?;
let expires_at: Option<i64> = row.get(8)?;
let properties_str: Option<String> = row.get(9)?;
let created_at: i64 = row.get(10)?;
let updated_at: i64 = row.get(11)?;
let deleted_at: Option<i64> = row.get(12)?;
let id = parse_uuid(&id_str)?;
let properties = properties_str
.map(|s| {
serde_json::from_str(&s).map_err(|e| {
rusqlite::Error::FromSqlConversionFailure(
9,
rusqlite::types::Type::Text,
Box::new(e),
)
})
})
.transpose()?;
Ok(Note {
id,
namespace,
kind,
status,
name,
content,
salience,
decay_factor,
expires_at,
properties,
created_at,
updated_at,
deleted_at,
})
}
fn parse_uuid(s: &str) -> Result<Uuid, rusqlite::Error> {
Uuid::parse_str(s).map_err(|e| {
rusqlite::Error::FromSqlConversionFailure(0, rusqlite::types::Type::Text, Box::new(e))
})
}
fn build_note_where(
namespace: &str,
kind: Option<&str>,
) -> (String, Vec<Box<dyn rusqlite::types::ToSql>>) {
let mut conditions: Vec<String> = vec![
"namespace = ?1".to_string(),
"deleted_at IS NULL".to_string(),
];
let mut params: Vec<Box<dyn rusqlite::types::ToSql>> = vec![Box::new(namespace.to_string())];
if let Some(k) = kind {
params.push(Box::new(k.to_string()));
conditions.push(format!("kind = ?{}", params.len()));
}
let clause = format!(" WHERE {}", conditions.join(" AND "));
(clause, params)
}
fn validate_json_path(path: &str) -> Result<(), StorageError> {
let valid = path.starts_with("$.")
&& path[2..].split('.').all(|part| {
!part.is_empty() && part.chars().all(|c| c.is_ascii_alphanumeric() || c == '_')
});
if valid {
Ok(())
} else {
Err(StorageError::InvalidInput {
capability: StorageCapability::Notes,
operation: "query_notes_filtered".into(),
message: format!("invalid JSON path for note filter: {path:?}"),
})
}
}
fn json_extract_expr(path: &str) -> String {
format!("json_extract(properties, '{path}')")
}
fn json_type_expr(path: &str) -> String {
format!("json_type(properties, '{path}')")
}
fn sql_value_param(value: &SqlValue) -> Result<Box<dyn rusqlite::types::ToSql>, rusqlite::Error> {
Ok(match value {
SqlValue::Null => Box::new(Option::<String>::None),
SqlValue::Bool(v) => Box::new(*v as i64),
SqlValue::Integer(v) => Box::new(*v),
SqlValue::Float(v) => Box::new(*v),
SqlValue::Text(v) => Box::new(v.clone()),
SqlValue::Blob(v) => Box::new(v.clone()),
SqlValue::Json(v) => Box::new(
serde_json::to_string(v)
.map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))?,
),
SqlValue::Uuid(v) => Box::new(v.to_string()),
SqlValue::Timestamp(v) => Box::new(v.timestamp_micros()),
})
}
fn build_note_filter_where(
namespace: &str,
filter: &NoteFilter,
) -> Result<(String, Vec<Box<dyn rusqlite::types::ToSql>>), rusqlite::Error> {
let mut conditions = vec![
"namespace = ?1".to_string(),
"deleted_at IS NULL".to_string(),
];
let mut params: Vec<Box<dyn rusqlite::types::ToSql>> = vec![Box::new(namespace.to_string())];
if let Some(kind) = &filter.kind {
params.push(Box::new(kind.clone()));
conditions.push(format!("kind = ?{}", params.len()));
}
for pf in &filter.property_filters {
match pf.op {
FilterOp::EqOrMissing => {
let expr = json_extract_expr(&pf.json_path);
params.push(sql_value_param(&pf.value)?);
conditions.push(format!(
"({expr} = ?{n} OR {expr} IS NULL)",
n = params.len()
));
}
FilterOp::JsonTypeEq => {
let type_expr = json_type_expr(&pf.json_path);
params.push(sql_value_param(&pf.value)?);
conditions.push(format!("{type_expr} = ?{}", params.len()));
}
FilterOp::JsonTypeNeMissing => {
let type_expr = json_type_expr(&pf.json_path);
params.push(sql_value_param(&pf.value)?);
let n = params.len();
conditions.push(format!("({type_expr} IS NULL OR {type_expr} != ?{n})"));
}
_ => {
let expr = json_extract_expr(&pf.json_path);
let op = match pf.op {
FilterOp::Eq => "=",
FilterOp::Ne => "!=",
FilterOp::Lt => "<",
FilterOp::Lte => "<=",
FilterOp::Gt => ">",
FilterOp::Gte => ">=",
FilterOp::EqOrMissing | FilterOp::JsonTypeEq | FilterOp::JsonTypeNeMissing => {
unreachable!()
}
};
params.push(sql_value_param(&pf.value)?);
conditions.push(format!("{expr} {op} ?{}", params.len()));
}
}
}
Ok((format!(" WHERE {}", conditions.join(" AND ")), params))
}
#[async_trait]
impl NoteStore for SqlNoteStore {
async fn upsert_note(&self, note: Note) -> Result<(), StorageError> {
let namespace = note.namespace.clone();
let id_str = note.id.to_string();
let kind_str = note.kind.to_string();
let status_str = note.status.clone();
let properties_str = note
.properties
.as_ref()
.map(|v| serde_json::to_string(v).unwrap_or_default());
self.with_writer("upsert_note", move |conn| {
conn.execute(
"INSERT OR REPLACE INTO notes \
(id, namespace, kind, status, name, content, salience, decay_factor, expires_at, \
properties, created_at, updated_at, deleted_at) \
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)",
rusqlite::params![
id_str,
namespace,
kind_str,
status_str,
note.name,
note.content,
note.salience,
note.decay_factor,
note.expires_at,
properties_str,
note.created_at,
note.updated_at,
note.deleted_at,
],
)?;
Ok(())
})
.await
}
async fn upsert_notes(&self, notes: Vec<Note>) -> Result<BatchWriteSummary, StorageError> {
let attempted = notes.len() as u64;
self.with_writer("upsert_notes", move |conn| {
conn.execute_batch("BEGIN IMMEDIATE")?;
let mut affected = 0u64;
let mut failed = 0u64;
let mut first_error = String::new();
for note in ¬es {
let id_str = note.id.to_string();
let kind_str = note.kind.to_string();
let status_str = note.status.clone();
let properties_str = note
.properties
.as_ref()
.map(|v| serde_json::to_string(v).unwrap_or_default());
match conn.execute(
"INSERT OR REPLACE INTO notes \
(id, namespace, kind, status, name, content, salience, decay_factor, expires_at, \
properties, created_at, updated_at, deleted_at) \
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)",
rusqlite::params![
id_str,
¬e.namespace,
kind_str,
status_str,
¬e.name,
note.content,
note.salience,
note.decay_factor,
note.expires_at,
properties_str,
note.created_at,
note.updated_at,
note.deleted_at,
],
) {
Ok(_) => affected += 1,
Err(e) => {
if first_error.is_empty() {
first_error = e.to_string();
}
failed += 1;
}
}
}
if let Err(e) = conn.execute_batch("COMMIT") {
let _ = conn.execute_batch("ROLLBACK");
return Err(e);
}
Ok(BatchWriteSummary {
attempted,
affected,
failed,
first_error,
})
})
.await
}
async fn get_note(&self, id: Uuid) -> Result<Option<Note>, StorageError> {
let id_str = id.to_string();
self.with_reader("get_note", move |conn| {
let mut stmt = conn.prepare(
"SELECT id, namespace, kind, status, name, content, salience, decay_factor, expires_at, \
properties, created_at, updated_at, deleted_at \
FROM notes WHERE id = ?1 AND deleted_at IS NULL",
)?;
let mut rows = stmt.query(rusqlite::params![id_str])?;
match rows.next()? {
Some(row) => Ok(Some(read_note(row)?)),
None => Ok(None),
}
})
.await
}
async fn get_note_including_deleted(&self, id: Uuid) -> Result<Option<Note>, StorageError> {
let id_str = id.to_string();
self.with_reader("get_note_including_deleted", move |conn| {
let mut stmt = conn.prepare(
"SELECT id, namespace, kind, status, name, content, salience, decay_factor, expires_at, \
properties, created_at, updated_at, deleted_at \
FROM notes WHERE id = ?1",
)?;
let mut rows = stmt.query(rusqlite::params![id_str])?;
match rows.next()? {
Some(row) => Ok(Some(read_note(row)?)),
None => Ok(None),
}
})
.await
}
async fn get_notes_batch(&self, ids: &[Uuid]) -> Result<Vec<Note>, StorageError> {
if ids.is_empty() {
return Ok(vec![]);
}
let id_strings: Vec<String> = ids.iter().map(|id| id.to_string()).collect();
self.with_reader("get_notes_batch", move |conn| {
let placeholders: String = (1..=id_strings.len())
.map(|i| format!("?{i}"))
.collect::<Vec<_>>()
.join(", ");
let sql = format!(
"SELECT id, namespace, kind, status, name, content, salience, decay_factor, expires_at, \
properties, created_at, updated_at, deleted_at \
FROM notes WHERE id IN ({placeholders}) AND deleted_at IS NULL"
);
let mut stmt = conn.prepare(&sql)?;
let params: Vec<&dyn rusqlite::types::ToSql> = id_strings
.iter()
.map(|s| s as &dyn rusqlite::types::ToSql)
.collect();
let rows = stmt.query_map(params.as_slice(), read_note)?;
let mut out = Vec::new();
for row in rows {
out.push(row?);
}
Ok(out)
})
.await
}
async fn delete_note(&self, id: Uuid, mode: DeleteMode) -> Result<bool, StorageError> {
let id_str = id.to_string();
match mode {
DeleteMode::Soft => {
self.with_writer("delete_note_soft", move |conn| {
let now = chrono::Utc::now().timestamp_micros();
let deleted = conn.execute(
"UPDATE notes SET status = 'deleted', deleted_at = ?1 \
WHERE id = ?2 AND deleted_at IS NULL",
rusqlite::params![now, id_str],
)?;
Ok(deleted > 0)
})
.await
}
DeleteMode::Hard => {
self.with_writer("delete_note_hard", move |conn| {
let deleted =
conn.execute("DELETE FROM notes WHERE id = ?1", rusqlite::params![id_str])?;
Ok(deleted > 0)
})
.await
}
}
}
async fn query_notes(
&self,
namespace: &str,
kind: Option<&str>,
page: PageRequest,
) -> Result<Page<Note>, StorageError> {
let namespace = namespace.to_string();
let kind = kind.map(|k| k.to_string());
self.with_reader("query_notes", move |conn| {
let (count_sql, count_params) = build_note_where(&namespace, kind.as_deref());
let total: i64 = {
let sql = format!("SELECT COUNT(*) FROM notes{}", count_sql);
let mut stmt = conn.prepare(&sql)?;
let param_refs: Vec<&dyn rusqlite::types::ToSql> =
count_params.iter().map(|p| p.as_ref()).collect();
stmt.query_row(param_refs.as_slice(), |row| row.get(0))?
};
let (where_sql, mut data_params) = build_note_where(&namespace, kind.as_deref());
data_params.push(Box::new(page.limit as i64));
data_params.push(Box::new(page.offset as i64));
let limit_idx = data_params.len() - 1;
let offset_idx = data_params.len();
let data_sql = format!(
"SELECT id, namespace, kind, status, name, content, salience, decay_factor, expires_at, \
properties, created_at, updated_at, deleted_at \
FROM notes{} ORDER BY created_at DESC LIMIT ?{} OFFSET ?{}",
where_sql, limit_idx, offset_idx,
);
let mut stmt = conn.prepare(&data_sql)?;
let param_refs: Vec<&dyn rusqlite::types::ToSql> =
data_params.iter().map(|p| p.as_ref()).collect();
let rows = stmt.query_map(param_refs.as_slice(), read_note)?;
let mut items = Vec::new();
for row in rows {
items.push(row?);
}
Ok(Page {
items,
total: Some(total as u64),
})
})
.await
}
async fn query_notes_filtered(
&self,
namespace: &str,
filter: &NoteFilter,
page: PageRequest,
) -> Result<Page<Note>, StorageError> {
for pf in &filter.property_filters {
validate_json_path(&pf.json_path)?;
}
if let Some((path, _)) = &filter.order_by {
validate_json_path(path)?;
}
let namespace = namespace.to_string();
let filter = filter.clone();
self.with_reader("query_notes_filtered", move |conn| {
let (count_sql, count_params) = build_note_filter_where(&namespace, &filter)?;
let total: i64 = {
let sql = format!("SELECT COUNT(*) FROM notes{}", count_sql);
let mut stmt = conn.prepare(&sql)?;
let param_refs: Vec<&dyn rusqlite::types::ToSql> =
count_params.iter().map(|p| p.as_ref()).collect();
stmt.query_row(param_refs.as_slice(), |row| row.get(0))?
};
let (where_sql, mut data_params) = build_note_filter_where(&namespace, &filter)?;
data_params.push(Box::new(page.limit as i64));
data_params.push(Box::new(page.offset as i64));
let order_clause = match &filter.order_by {
Some((path, dir)) => {
let dir_str = match dir {
SortDir::Asc => "ASC",
SortDir::Desc => "DESC",
};
format!(" ORDER BY {} {dir_str}", json_extract_expr(path))
}
None => " ORDER BY created_at DESC".to_string(),
};
let limit_idx = data_params.len() - 1;
let offset_idx = data_params.len();
let data_sql = format!(
"SELECT id, namespace, kind, status, name, content, salience, decay_factor, \
expires_at, properties, created_at, updated_at, deleted_at \
FROM notes{}{order_clause} LIMIT ?{} OFFSET ?{}",
where_sql, limit_idx, offset_idx,
);
let mut stmt = conn.prepare(&data_sql)?;
let param_refs: Vec<&dyn rusqlite::types::ToSql> =
data_params.iter().map(|p| p.as_ref()).collect();
let rows = stmt.query_map(param_refs.as_slice(), read_note)?;
let mut items = Vec::new();
for row in rows {
items.push(row?);
}
Ok(Page {
items,
total: Some(total as u64),
})
})
.await
}
async fn count_notes(&self, namespace: &str, kind: Option<&str>) -> Result<u64, StorageError> {
let namespace = namespace.to_string();
let kind = kind.map(|k| k.to_string());
self.with_reader("count_notes", move |conn| {
let (where_sql, params) = build_note_where(&namespace, kind.as_deref());
let sql = format!("SELECT COUNT(*) FROM notes{}", where_sql);
let mut stmt = conn.prepare(&sql)?;
let param_refs: Vec<&dyn rusqlite::types::ToSql> =
params.iter().map(|p| p.as_ref()).collect();
let count: i64 = stmt.query_row(param_refs.as_slice(), |row| row.get(0))?;
Ok(count as u64)
})
.await
}
}
const NOTES_DDL: &str = include_str!("../../sql/notes-ddl.sql");
pub(crate) fn ensure_notes_schema(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
conn.execute_batch(NOTES_DDL)
}
#[cfg(test)]
#[path = "note_tests.rs"]
mod tests;