use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use std::time::{SystemTime, UNIX_EPOCH};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use rusqlite::{OptionalExtension, params_from_iter};
use crate::error::MiniAppError;
use crate::filter::ListFilter;
use crate::schema::SchemaConfig;
#[derive(Debug, Clone, Serialize)]
pub struct RowRecord {
pub id: String,
pub data: serde_json::Value,
pub created_at: i64,
pub updated_at: i64,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Deserialize, JsonSchema)]
#[serde(rename_all = "lowercase")]
pub enum UpdateMode {
#[default]
Merge,
Replace,
}
pub struct Store {
conn: Arc<Mutex<rusqlite::Connection>>,
schema: SchemaConfig,
db_path: PathBuf,
}
const CREATE_TABLE_SQL: &str = "
CREATE TABLE IF NOT EXISTS rows (
id TEXT PRIMARY KEY,
data TEXT NOT NULL,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL
)
";
const CREATE_ALIASES_TABLE_SQL: &str = "
CREATE TABLE IF NOT EXISTS _aliases (
name TEXT PRIMARY KEY,
filter TEXT NOT NULL,
default_limit INTEGER,
description TEXT,
params_schema TEXT
)
";
#[derive(Debug, Clone)]
pub struct AliasRecord {
pub name: String,
pub filter: String,
pub default_limit: Option<u32>,
pub description: Option<String>,
pub params_schema: Option<String>,
}
fn now_secs() -> i64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs() as i64
}
fn parse_data(json_str: &str) -> Result<serde_json::Value, MiniAppError> {
serde_json::from_str(json_str).map_err(|e| MiniAppError::Schema(format!("data column: {e}")))
}
fn resolve_id(conn: &rusqlite::Connection, id: &str) -> Result<String, MiniAppError> {
if id.len() == 36 {
return Ok(id.to_string());
}
let mut stmt = conn.prepare("SELECT id FROM rows WHERE id LIKE ?1")?;
let candidates: Vec<String> = stmt
.query_map(rusqlite::params![format!("{}%", id)], |row| {
row.get::<_, String>(0)
})?
.collect::<Result<Vec<_>, _>>()?;
match candidates.len() {
0 => Err(MiniAppError::NotFound { id: id.to_string() }),
1 => {
Ok(candidates.into_iter().next().unwrap())
}
_ => Err(MiniAppError::AmbiguousId {
id_prefix: id.to_string(),
candidates,
}),
}
}
fn shallow_merge(
mut current: serde_json::Value,
patch: serde_json::Value,
schema: &SchemaConfig,
) -> Result<serde_json::Value, MiniAppError> {
let patch_map = patch.as_object().ok_or_else(|| MiniAppError::Validation {
field: "(root)".to_string(),
reason: "patch must be a JSON object".to_string(),
})?;
let current_map = current
.as_object_mut()
.ok_or_else(|| MiniAppError::Validation {
field: "(root)".to_string(),
reason: "stored row is not a JSON object".to_string(),
})?;
for (key, value) in patch_map {
if value.is_null() {
let is_required = schema
.fields
.iter()
.find(|f| &f.name == key)
.map(|f| f.required)
.unwrap_or(false);
if is_required {
return Err(MiniAppError::Validation {
field: key.clone(),
reason: "required field cannot be deleted via null".to_string(),
});
}
current_map.remove(key);
} else {
current_map.insert(key.clone(), value.clone());
}
}
Ok(current)
}
impl Store {
pub async fn open(db_path: &Path, schema: SchemaConfig) -> Result<Self, MiniAppError> {
if let Some(crate::dump::SyncMode::Bidirectional) =
schema.dump.as_ref().and_then(|d| d.sync.as_ref())
{
tracing::warn!(
target: "mini_app_mcp::dump",
"sync=bidirectional configured but not implemented yet; behaving as write-only"
);
}
let stored_db_path = db_path.to_path_buf();
let db_path = db_path.to_path_buf();
let conn =
tokio::task::spawn_blocking(move || -> Result<rusqlite::Connection, MiniAppError> {
let c = rusqlite::Connection::open(&db_path)?;
c.pragma_update(None, "journal_mode", "WAL")?;
let actual_mode: String = c.query_row("PRAGMA journal_mode", [], |r| r.get(0))?;
if actual_mode.to_lowercase() != "wal" {
tracing::warn!(
actual_mode = %actual_mode,
"PRAGMA journal_mode=WAL fell back to non-WAL mode; \
concurrent reload may hit SQLITE_BUSY"
);
}
c.execute_batch(CREATE_TABLE_SQL)?;
c.execute_batch(CREATE_ALIASES_TABLE_SQL)?;
let has_params_schema = c
.prepare("PRAGMA table_info(_aliases)")?
.query_map([], |row| row.get::<_, String>(1))?
.collect::<Result<Vec<_>, _>>()?
.iter()
.any(|name| name == "params_schema");
if !has_params_schema {
c.execute_batch("ALTER TABLE _aliases ADD COLUMN params_schema TEXT")?;
}
Ok(c)
})
.await
.map_err(|e| MiniAppError::Schema(format!("blocking task panic: {e}")))??;
Ok(Store {
conn: Arc::new(Mutex::new(conn)),
schema,
db_path: stored_db_path,
})
}
pub fn db_path(&self) -> &Path {
&self.db_path
}
pub fn conn(&self) -> Arc<Mutex<rusqlite::Connection>> {
Arc::clone(&self.conn)
}
pub async fn create(&self, value: serde_json::Value) -> Result<RowRecord, MiniAppError> {
self.schema.validate(&value)?;
let id = uuid::Uuid::new_v4().to_string();
let now = now_secs();
let data_str =
serde_json::to_string(&value).expect("serde_json::Value serialization is infallible");
let conn = self.conn.clone();
let id_inner = id.clone();
let record = tokio::task::spawn_blocking(move || -> Result<RowRecord, MiniAppError> {
let conn = conn
.lock()
.map_err(|_| MiniAppError::Schema("mutex poisoned".to_string()))?;
conn.execute(
"INSERT INTO rows (id, data, created_at, updated_at) VALUES (?1, ?2, ?3, ?4)",
rusqlite::params![id_inner, data_str, now, now],
)?;
Ok(RowRecord {
id: id_inner,
data: value,
created_at: now,
updated_at: now,
})
})
.await
.map_err(|e| MiniAppError::Schema(format!("blocking task panic: {e}")))??;
crate::dump::on_change(&self.schema, &record).await?;
Ok(record)
}
pub async fn get(&self, id: &str) -> Result<RowRecord, MiniAppError> {
let conn = self.conn.clone();
let id = id.to_string();
tokio::task::spawn_blocking(move || -> Result<RowRecord, MiniAppError> {
let conn = conn
.lock()
.map_err(|_| MiniAppError::Schema("mutex poisoned".to_string()))?;
let id = resolve_id(&conn, &id)?;
let mut stmt =
conn.prepare("SELECT id, data, created_at, updated_at FROM rows WHERE id = ?1")?;
let row = stmt
.query_row(rusqlite::params![id], |row| {
Ok((
row.get::<_, String>(0)?,
row.get::<_, String>(1)?,
row.get::<_, i64>(2)?,
row.get::<_, i64>(3)?,
))
})
.optional()?
.ok_or_else(|| MiniAppError::NotFound { id: id.clone() })?;
let data = parse_data(&row.1)?;
Ok(RowRecord {
id: row.0,
data,
created_at: row.2,
updated_at: row.3,
})
})
.await
.map_err(|e| MiniAppError::Schema(format!("blocking task panic: {e}")))?
}
pub async fn list(
&self,
limit: Option<u32>,
offset: Option<u32>,
filter: Option<ListFilter>,
) -> Result<Vec<RowRecord>, MiniAppError> {
let conn = self.conn.clone();
let limit = limit.unwrap_or(100).min(1000) as i64;
let offset = offset.unwrap_or(0) as i64;
let (where_clause, filter_params) = match filter {
None => (String::new(), Vec::new()),
Some(f) => {
let (fragment, params) = f.build_sql()?;
(format!(" WHERE {fragment}"), params)
}
};
tokio::task::spawn_blocking(move || -> Result<Vec<RowRecord>, MiniAppError> {
let conn = conn
.lock()
.map_err(|_| MiniAppError::Schema("mutex poisoned".to_string()))?;
let sql = format!(
"SELECT id, data, created_at, updated_at FROM rows{where_clause} \
ORDER BY created_at DESC LIMIT ? OFFSET ?"
);
let mut all_params: Vec<Box<dyn rusqlite::ToSql>> = filter_params
.into_iter()
.map(|p| -> Box<dyn rusqlite::ToSql> { Box::new(p) })
.collect();
all_params.push(Box::new(limit));
all_params.push(Box::new(offset));
let mut stmt = conn.prepare(&sql)?;
let rows = stmt
.query_map(
params_from_iter(all_params.iter().map(|p| p.as_ref())),
|row| {
Ok((
row.get::<_, String>(0)?,
row.get::<_, String>(1)?,
row.get::<_, i64>(2)?,
row.get::<_, i64>(3)?,
))
},
)?
.map(|r| {
r.map_err(MiniAppError::Storage).and_then(|row| {
let data = parse_data(&row.1)?;
Ok(RowRecord {
id: row.0,
data,
created_at: row.2,
updated_at: row.3,
})
})
})
.collect::<Result<Vec<_>, _>>()?;
Ok(rows)
})
.await
.map_err(|e| MiniAppError::Schema(format!("blocking task panic: {e}")))?
}
pub async fn row_count(&self) -> Result<u64, MiniAppError> {
let conn = self.conn.clone();
tokio::task::spawn_blocking(move || -> Result<u64, MiniAppError> {
let conn = conn
.lock()
.map_err(|_| MiniAppError::Schema("mutex poisoned".to_string()))?;
let count: i64 = conn.query_row("SELECT COUNT(*) FROM rows", [], |row| row.get(0))?;
Ok(count.max(0) as u64)
})
.await
.map_err(|e| MiniAppError::Schema(format!("blocking task panic: {e}")))?
}
pub async fn update(
&self,
id: &str,
value: serde_json::Value,
mode: UpdateMode,
) -> Result<RowRecord, MiniAppError> {
let now = now_secs();
let conn = self.conn.clone();
let id_str = id.to_string();
let schema = self.schema.clone();
let record = tokio::task::spawn_blocking(move || -> Result<RowRecord, MiniAppError> {
let conn = conn
.lock()
.map_err(|_| MiniAppError::Schema("mutex poisoned".to_string()))?;
let id_str = resolve_id(&conn, &id_str)?;
let row_data: Option<(String, i64)> = conn
.query_row(
"SELECT data, created_at FROM rows WHERE id = ?1",
rusqlite::params![id_str],
|row| Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?)),
)
.optional()?;
let (current_data_str, created_at) =
row_data.ok_or_else(|| MiniAppError::NotFound { id: id_str.clone() })?;
let merged = match mode {
UpdateMode::Merge => {
let current: serde_json::Value = parse_data(¤t_data_str)?;
let merged = shallow_merge(current, value, &schema)?;
schema.validate(&merged)?;
merged
}
UpdateMode::Replace => {
schema.validate(&value)?;
value
}
};
let merged_str = serde_json::to_string(&merged)
.expect("serde_json::Value serialization is infallible");
conn.execute(
"UPDATE rows SET data = ?1, updated_at = ?2 WHERE id = ?3",
rusqlite::params![merged_str, now, id_str],
)?;
Ok(RowRecord {
id: id_str,
data: merged,
created_at,
updated_at: now,
})
})
.await
.map_err(|e| MiniAppError::Schema(format!("blocking task panic: {e}")))??;
crate::dump::on_change(&self.schema, &record).await?;
Ok(record)
}
pub async fn execute_under_savepoint<F, R>(&self, f: F) -> Result<R, MiniAppError>
where
F: FnOnce(&mut rusqlite::Savepoint<'_>) -> Result<R, MiniAppError> + Send + 'static,
R: Send + 'static,
{
let conn = self.conn.clone();
tokio::task::spawn_blocking(move || -> Result<R, MiniAppError> {
let mut guard = conn
.lock()
.map_err(|_| MiniAppError::Schema("mutex poisoned".to_string()))?;
let mut sp = guard.savepoint()?;
sp.set_drop_behavior(rusqlite::DropBehavior::Rollback);
let result = f(&mut sp)?;
sp.commit()?;
Ok(result)
})
.await
.map_err(|e| MiniAppError::Schema(format!("blocking task panic: {e}")))?
}
pub async fn delete(&self, id: &str) -> Result<(), MiniAppError> {
let conn = self.conn.clone();
let id = id.to_string();
let resolved_id = tokio::task::spawn_blocking(move || -> Result<String, MiniAppError> {
let conn = conn
.lock()
.map_err(|_| MiniAppError::Schema("mutex poisoned".to_string()))?;
let resolved = resolve_id(&conn, &id)?;
let n = conn.execute(
"DELETE FROM rows WHERE id = ?1",
rusqlite::params![resolved],
)?;
if n == 0 {
return Err(MiniAppError::NotFound { id: resolved });
}
Ok(resolved)
})
.await
.map_err(|e| MiniAppError::Schema(format!("blocking task panic: {e}")))??;
crate::dump::on_delete(&self.schema, &resolved_id).await?;
Ok(())
}
pub async fn alias_create(
&self,
name: &str,
filter_json: &str,
default_limit: Option<u32>,
description: Option<String>,
params_schema: Option<String>,
) -> Result<(), MiniAppError> {
let conn = self.conn.clone();
let name = name.to_string();
let filter_json = filter_json.to_string();
tokio::task::spawn_blocking(move || -> Result<(), MiniAppError> {
let conn = conn
.lock()
.map_err(|_| MiniAppError::Schema("mutex poisoned".to_string()))?;
conn.execute(
"INSERT OR IGNORE INTO _aliases \
(name, filter, default_limit, description, params_schema) \
VALUES (?1, ?2, ?3, ?4, ?5)",
rusqlite::params![name, filter_json, default_limit, description, params_schema],
)?;
if conn.changes() == 0 {
return Err(MiniAppError::AliasAlreadyExists { name });
}
Ok(())
})
.await
.map_err(|e| MiniAppError::Schema(format!("blocking task panic: {e}")))?
}
pub async fn alias_get(&self, name: &str) -> Result<AliasRecord, MiniAppError> {
let conn = self.conn.clone();
let name = name.to_string();
tokio::task::spawn_blocking(move || -> Result<AliasRecord, MiniAppError> {
let conn = conn
.lock()
.map_err(|_| MiniAppError::Schema("mutex poisoned".to_string()))?;
let mut stmt = conn.prepare(
"SELECT name, filter, default_limit, description, params_schema \
FROM _aliases WHERE name = ?1",
)?;
let record = stmt
.query_row(rusqlite::params![name], |row| {
Ok((
row.get::<_, String>(0)?,
row.get::<_, String>(1)?,
row.get::<_, Option<u32>>(2)?,
row.get::<_, Option<String>>(3)?,
row.get::<_, Option<String>>(4)?,
))
})
.optional()?
.ok_or_else(|| MiniAppError::AliasNotFound { name: name.clone() })?;
Ok(AliasRecord {
name: record.0,
filter: record.1,
default_limit: record.2,
description: record.3,
params_schema: record.4,
})
})
.await
.map_err(|e| MiniAppError::Schema(format!("blocking task panic: {e}")))?
}
pub async fn alias_list(&self) -> Result<Vec<AliasRecord>, MiniAppError> {
let conn = self.conn.clone();
tokio::task::spawn_blocking(move || -> Result<Vec<AliasRecord>, MiniAppError> {
let conn = conn
.lock()
.map_err(|_| MiniAppError::Schema("mutex poisoned".to_string()))?;
let mut stmt = conn.prepare(
"SELECT name, filter, default_limit, description, params_schema \
FROM _aliases ORDER BY name ASC",
)?;
let records = stmt
.query_map([], |row| {
Ok((
row.get::<_, String>(0)?,
row.get::<_, String>(1)?,
row.get::<_, Option<u32>>(2)?,
row.get::<_, Option<String>>(3)?,
row.get::<_, Option<String>>(4)?,
))
})?
.collect::<Result<Vec<_>, _>>()?;
Ok(records
.into_iter()
.map(
|(name, filter, default_limit, description, params_schema)| AliasRecord {
name,
filter,
default_limit,
description,
params_schema,
},
)
.collect())
})
.await
.map_err(|e| MiniAppError::Schema(format!("blocking task panic: {e}")))?
}
pub async fn alias_delete(&self, name: &str) -> Result<(), MiniAppError> {
let conn = self.conn.clone();
let name = name.to_string();
tokio::task::spawn_blocking(move || -> Result<(), MiniAppError> {
let conn = conn
.lock()
.map_err(|_| MiniAppError::Schema("mutex poisoned".to_string()))?;
let n = conn.execute(
"DELETE FROM _aliases WHERE name = ?1",
rusqlite::params![name],
)?;
if n == 0 {
return Err(MiniAppError::AliasNotFound { name });
}
Ok(())
})
.await
.map_err(|e| MiniAppError::Schema(format!("blocking task panic: {e}")))?
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use super::*;
use crate::schema::{FieldDef, FieldType};
async fn make_test_store() -> Store {
let schema = SchemaConfig {
table: "test".into(),
title: None,
description: None,
fields: vec![
FieldDef {
name: "title".into(),
ty: FieldType::String,
required: true,
description: None,
},
FieldDef {
name: "state".into(),
ty: FieldType::String,
required: false,
description: None,
},
],
dump: None,
};
Store::open(Path::new(":memory:"), schema).await.unwrap()
}
async fn make_test_store_with_dump(dir: &Path) -> Store {
use crate::dump::{DumpConfig, SyncMode};
let schema = SchemaConfig {
table: "test".into(),
title: None,
description: None,
fields: vec![
FieldDef {
name: "title".into(),
ty: FieldType::String,
required: true,
description: None,
},
FieldDef {
name: "body".into(),
ty: FieldType::String,
required: false,
description: None,
},
],
dump: Some(DumpConfig {
dir: Some(dir.to_path_buf()),
title_field: None,
body_field: None,
sync: Some(SyncMode::WriteOnly),
}),
};
Store::open(Path::new(":memory:"), schema).await.unwrap()
}
#[tokio::test]
async fn test_create_and_get_roundtrip() {
let store = make_test_store().await;
let value = serde_json::json!({"title": "hello", "state": "open"});
let row = store.create(value.clone()).await.unwrap();
let fetched = store.get(&row.id).await.unwrap();
assert_eq!(fetched.id, row.id);
assert_eq!(fetched.data, value);
}
#[tokio::test]
async fn test_create_then_list() {
let store = make_test_store().await;
store
.create(serde_json::json!({"title": "t1"}))
.await
.unwrap();
let rows = store.list(None, None, None).await.unwrap();
assert_eq!(rows.len(), 1);
}
#[tokio::test]
async fn test_list_limit_offset() {
let store = make_test_store().await;
for i in 0..5 {
store
.create(serde_json::json!({"title": format!("item-{i}")}))
.await
.unwrap();
}
let page1 = store.list(Some(2), Some(0), None).await.unwrap();
assert_eq!(page1.len(), 2);
let page2 = store.list(Some(2), Some(2), None).await.unwrap();
assert_eq!(page2.len(), 2);
let page3 = store.list(Some(2), Some(4), None).await.unwrap();
assert_eq!(page3.len(), 1);
}
#[tokio::test]
async fn test_update_timestamps() {
let store = make_test_store().await;
let row = store
.create(serde_json::json!({"title": "original"}))
.await
.unwrap();
let updated = store
.update(
&row.id,
serde_json::json!({"title": "changed"}),
UpdateMode::Replace,
)
.await
.unwrap();
assert_eq!(updated.created_at, row.created_at);
assert_eq!(updated.id, row.id);
assert_eq!(updated.data["title"], "changed");
}
#[tokio::test]
async fn test_create_delete_get_not_found() {
let store = make_test_store().await;
let row = store
.create(serde_json::json!({"title": "to-delete"}))
.await
.unwrap();
store.delete(&row.id).await.unwrap();
let err = store.get(&row.id).await.unwrap_err();
assert!(matches!(err, MiniAppError::NotFound { .. }));
}
#[tokio::test]
async fn test_get_unknown_id_not_found() {
let store = make_test_store().await;
let err = store.get("nonexistent-id").await.unwrap_err();
assert!(matches!(err, MiniAppError::NotFound { .. }));
}
#[tokio::test]
async fn test_update_unknown_id_not_found() {
let store = make_test_store().await;
let err = store
.update(
"nonexistent-id",
serde_json::json!({"title": "x"}),
UpdateMode::Replace,
)
.await
.unwrap_err();
assert!(matches!(err, MiniAppError::NotFound { .. }));
}
#[tokio::test]
async fn test_delete_unknown_id_not_found() {
let store = make_test_store().await;
let err = store.delete("nonexistent-id").await.unwrap_err();
assert!(matches!(err, MiniAppError::NotFound { .. }));
}
#[tokio::test]
async fn test_create_missing_required_field_validation_error() {
let store = make_test_store().await;
let err = store
.create(serde_json::json!({"state": "open"}))
.await
.unwrap_err();
assert!(
matches!(err, MiniAppError::Validation { .. }),
"expected Validation, got: {err:?}"
);
}
#[tokio::test]
async fn test_create_type_mismatch_validation_error() {
let store = make_test_store().await;
let err = store
.create(serde_json::json!({"title": 42}))
.await
.unwrap_err();
assert!(
matches!(err, MiniAppError::Validation { .. }),
"expected Validation, got: {err:?}"
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_store_create_concurrent() {
let store = Arc::new(make_test_store().await);
let handles: Vec<_> = (0..4)
.map(|i| {
let s = store.clone();
tokio::spawn(async move {
s.create(serde_json::json!({"title": format!("task-{i}"), "state": "open"}))
.await
})
})
.collect();
let results: Vec<_> = futures::future::join_all(handles).await;
assert!(results.iter().all(|r| r.as_ref().unwrap().is_ok()));
let rows = store.list(None, None, None).await.unwrap();
assert_eq!(rows.len(), 4);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_store_mutex_no_await_holding_lock() {
let store = Arc::new(make_test_store().await);
let id = store
.create(serde_json::json!({"title": "init", "state": "open"}))
.await
.unwrap()
.id;
let s1 = store.clone();
let id1 = id.clone();
let h1 = tokio::spawn(async move { s1.get(&id1).await });
let s2 = store.clone();
let id2 = id.clone();
let h2 = tokio::spawn(async move {
s2.update(
&id2,
serde_json::json!({"title": "updated", "state": "closed"}),
UpdateMode::Replace,
)
.await
});
let (r1, r2) = tokio::join!(h1, h2);
assert!(r1.unwrap().is_ok());
assert!(r2.unwrap().is_ok());
}
#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
async fn test_store_arc_clone_across_tasks() {
let store = Arc::new(make_test_store().await);
let handles: Vec<_> = (0..8)
.map(|i| {
let s = Arc::clone(&store);
tokio::spawn(async move {
s.create(serde_json::json!({"title": format!("row-{i}"), "state": "open"}))
.await
})
})
.collect();
futures::future::join_all(handles).await;
assert_eq!(store.list(None, None, None).await.unwrap().len(), 8);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_spawn_blocking_join_error_propagation() {
let result: Result<(), _> = tokio::task::spawn_blocking(|| panic!("intentional"))
.await
.map_err(|e| MiniAppError::Schema(format!("blocking task panic: {e}")));
assert!(matches!(result, Err(MiniAppError::Schema(_))));
}
#[tokio::test]
async fn create_triggers_dump_when_configured() {
let tmp = tempfile::tempdir().expect("tempdir");
let store = make_test_store_with_dump(tmp.path()).await;
let row = store
.create(serde_json::json!({"title": "My Issue", "body": "Details"}))
.await
.expect("create ok");
let dump_file = tmp.path().join(format!("{}.md", row.id));
assert!(dump_file.exists(), "dump file must be created after create");
let content = std::fs::read_to_string(&dump_file).expect("read dump file");
assert!(content.starts_with("# My Issue\n"));
assert!(content.contains("Details"));
}
#[tokio::test]
async fn update_overwrites_dump_file() {
let tmp = tempfile::tempdir().expect("tempdir");
let store = make_test_store_with_dump(tmp.path()).await;
let row = store
.create(serde_json::json!({"title": "Original", "body": "v1"}))
.await
.expect("create ok");
store
.update(
&row.id,
serde_json::json!({"title": "Updated", "body": "v2"}),
UpdateMode::Replace,
)
.await
.expect("update ok");
let dump_file = tmp.path().join(format!("{}.md", row.id));
let content = std::fs::read_to_string(&dump_file).expect("read dump file");
assert!(
content.starts_with("# Updated\n"),
"dump file must reflect updated title"
);
assert!(
content.contains("v2"),
"dump file must reflect updated body"
);
}
#[tokio::test]
async fn delete_keeps_dump_file_by_default() {
let tmp = tempfile::tempdir().expect("tempdir");
let store = make_test_store_with_dump(tmp.path()).await;
let row = store
.create(serde_json::json!({"title": "Keep Me", "body": ""}))
.await
.expect("create ok");
let dump_file = tmp.path().join(format!("{}.md", row.id));
assert!(dump_file.exists(), "dump file must exist after create");
store.delete(&row.id).await.expect("delete ok");
assert!(
dump_file.exists(),
"dump file must remain after delete (default: keep)"
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_store_create_concurrent_dump_writes_all_files() {
let tmp = tempfile::tempdir().expect("tempdir");
let store = Arc::new(make_test_store_with_dump(tmp.path()).await);
let handles: Vec<_> = (0..4)
.map(|i| {
let s = store.clone();
tokio::spawn(async move {
s.create(serde_json::json!({
"title": format!("concurrent-{i}"),
"body": format!("body-{i}"),
}))
.await
})
})
.collect();
let results: Vec<_> = futures::future::join_all(handles).await;
let rows: Vec<_> = results
.into_iter()
.map(|r| r.expect("spawn ok").expect("create ok"))
.collect();
for row in &rows {
let path = tmp.path().join(format!("{}.md", row.id));
assert!(path.exists(), "dump file must exist for row {}", row.id);
}
assert_eq!(rows.len(), 4);
}
#[tokio::test]
async fn store_open_with_bidirectional_sync_returns_ok() {
use crate::dump::{DumpConfig, SyncMode};
let schema = SchemaConfig {
table: "test".into(),
title: None,
description: None,
fields: vec![FieldDef {
name: "title".into(),
ty: FieldType::String,
required: false,
description: None,
}],
dump: Some(DumpConfig {
dir: None,
title_field: None,
body_field: None,
sync: Some(SyncMode::Bidirectional),
}),
};
let store = Store::open(Path::new(":memory:"), schema).await;
assert!(
store.is_ok(),
"Store::open must succeed even with bidirectional sync configured"
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_savepoint_atomic_rollback_on_op_failure() {
let store = make_test_store().await;
let result: Result<(), MiniAppError> = store
.execute_under_savepoint(|sp| {
sp.execute(
"INSERT INTO rows (id, data, created_at, updated_at) VALUES (?1, ?2, ?3, ?4)",
rusqlite::params!["sp-test-id", r#"{"title":"t"}"#, 1000_i64, 1000_i64],
)?;
Err(MiniAppError::Validation {
field: "test".into(),
reason: "forced rollback".into(),
})
})
.await;
assert!(
result.is_err(),
"execute_under_savepoint must propagate the closure error"
);
assert!(
matches!(result.unwrap_err(), MiniAppError::Validation { .. }),
"error variant must be preserved"
);
let rows = store.list(Some(1000), None, None).await.unwrap();
assert_eq!(
rows.len(),
0,
"SAVEPOINT rollback must revert the INSERT (Crux: SAVEPOINT atomicity)"
);
store
.create(serde_json::json!({"title": "after-rollback"}))
.await
.expect("store must be usable after SAVEPOINT rollback");
assert_eq!(store.list(None, None, None).await.unwrap().len(), 1);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_savepoint_commit_on_success() {
let store = make_test_store().await;
let result = store
.execute_under_savepoint(|sp| {
sp.execute(
"INSERT INTO rows (id, data, created_at, updated_at) VALUES (?1, ?2, ?3, ?4)",
rusqlite::params!["sp-ok-id", r#"{"title":"committed"}"#, 1000_i64, 1000_i64],
)?;
Ok(42_u32)
})
.await;
assert_eq!(
result.unwrap(),
42_u32,
"successful SAVEPOINT must return value"
);
let rows = store.list(Some(10), None, None).await.unwrap();
assert_eq!(rows.len(), 1, "committed INSERT must persist");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_store_concurrent_create() {
let store = Arc::new(make_test_store().await);
let task_count = 8_usize;
let rows_per_task = 100_usize;
let handles: Vec<_> = (0..task_count)
.map(|task_id| {
let s = Arc::clone(&store);
tokio::spawn(async move {
for i in 0..rows_per_task {
s.create(serde_json::json!({"title": format!("task-{task_id}-row-{i}")}))
.await
.expect("concurrent create must succeed");
}
})
})
.collect();
futures::future::join_all(handles)
.await
.into_iter()
.for_each(|r| r.expect("task must not panic"));
let total = store.list(Some(1000), None, None).await.unwrap().len();
assert_eq!(
total,
task_count * rows_per_task,
"all {total} rows must be present; expected {}",
task_count * rows_per_task
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_store_concurrent_update_same_id() {
let store = Arc::new(make_test_store().await);
let row = store
.create(serde_json::json!({"title": "initial"}))
.await
.unwrap();
let id = row.id.clone();
let task_count = 4_usize;
let updates_per_task = 50_usize;
let handles: Vec<_> = (0..task_count)
.map(|task_id| {
let s = Arc::clone(&store);
let row_id = id.clone();
tokio::spawn(async move {
for i in 0..updates_per_task {
s.update(
&row_id,
serde_json::json!({"title": format!("task-{task_id}-update-{i}")}),
UpdateMode::Replace,
)
.await
.expect("concurrent update must succeed");
}
})
})
.collect();
futures::future::join_all(handles)
.await
.into_iter()
.for_each(|r| r.expect("task must not panic"));
let rows = store.list(None, None, None).await.unwrap();
assert_eq!(rows.len(), 1, "update must not insert extra rows");
assert!(
rows[0].data["title"].is_string(),
"title must be a string after concurrent updates"
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_store_mutex_poison_propagated_as_error() {
let store = Arc::new(make_test_store().await);
let conn = store.conn.clone();
let _ = tokio::task::spawn_blocking(move || {
let _guard = conn.lock().unwrap(); panic!("intentional poison"); })
.await;
let err = store.get("any-id").await.unwrap_err();
assert!(
matches!(&err, MiniAppError::Schema(msg) if msg.contains("mutex poisoned")),
"expected Schema(\"mutex poisoned\"), got: {err:?}"
);
}
#[tokio::test]
async fn store_open_sets_wal_journal_mode() {
let tmp = tempfile::tempdir().expect("tempdir");
let db_path = tmp.path().join("test.db");
let schema = SchemaConfig {
table: "test".into(),
title: None,
description: None,
fields: vec![FieldDef {
name: "title".into(),
ty: FieldType::String,
required: false,
description: None,
}],
dump: None,
};
let store = Store::open(&db_path, schema)
.await
.expect("Store::open should succeed");
let mode = {
let conn = store.conn.lock().expect("lock");
conn.query_row("PRAGMA journal_mode", [], |row| row.get::<_, String>(0))
.expect("PRAGMA journal_mode query")
};
assert_eq!(
mode.to_lowercase(),
"wal",
"Store::open must set journal_mode = WAL for dual-registry safety (Crux #1)"
);
}
fn make_schema(fields: Vec<FieldDef>) -> SchemaConfig {
SchemaConfig {
table: "test".into(),
title: None,
description: None,
fields,
dump: None,
}
}
#[test]
fn shallow_merge_preserves_absent_fields() {
let schema = make_schema(vec![
FieldDef {
name: "a".into(),
ty: FieldType::Number,
required: true,
description: None,
},
FieldDef {
name: "b".into(),
ty: FieldType::Number,
required: false,
description: None,
},
]);
let current = serde_json::json!({"a": 1, "b": 2});
let patch = serde_json::json!({"a": 9});
let merged = shallow_merge(current, patch, &schema).expect("merge ok");
assert_eq!(merged["a"], 9, "patched field must be updated");
assert_eq!(
merged["b"], 2,
"absent patch field must be preserved from current"
);
}
#[test]
fn shallow_merge_deletes_null_for_optional_field() {
let schema = make_schema(vec![
FieldDef {
name: "a".into(),
ty: FieldType::Number,
required: true,
description: None,
},
FieldDef {
name: "b".into(),
ty: FieldType::Number,
required: false,
description: None,
},
]);
let current = serde_json::json!({"a": 1, "b": 2});
let patch = serde_json::json!({"b": null});
let merged = shallow_merge(current, patch, &schema).expect("merge ok");
assert_eq!(merged["a"], 1);
assert!(
merged.get("b").is_none(),
"null-patched optional field must be physically removed (not set to null)"
);
}
#[test]
fn shallow_merge_errors_on_null_for_required_field() {
let schema = make_schema(vec![FieldDef {
name: "title".into(),
ty: FieldType::String,
required: true,
description: None,
}]);
let current = serde_json::json!({"title": "hello"});
let patch = serde_json::json!({"title": null});
let err = shallow_merge(current, patch, &schema).expect_err("must error");
match err {
MiniAppError::Validation { field, reason } => {
assert_eq!(field, "title");
assert!(
reason.contains("required field cannot be deleted via null"),
"unexpected reason: {reason}"
);
}
other => panic!("expected Validation error, got: {other:?}"),
}
}
#[test]
fn shallow_merge_replaces_nested_object_wholesale() {
let schema = make_schema(vec![FieldDef {
name: "cfg".into(),
ty: FieldType::Object,
required: false,
description: None,
}]);
let current = serde_json::json!({"cfg": {"x": 1, "y": 2}});
let patch = serde_json::json!({"cfg": {"x": 9}});
let merged = shallow_merge(current, patch, &schema).expect("merge ok");
assert_eq!(merged["cfg"]["x"], 9, "x must be updated");
assert!(
merged["cfg"].get("y").is_none(),
"y must be absent (nested object replaced wholesale, not deep-merged)"
);
}
#[test]
fn shallow_merge_rejects_non_object_patch() {
let schema = make_schema(vec![]);
let current = serde_json::json!({"a": 1});
for bad_patch in [
serde_json::json!([1, 2, 3]),
serde_json::json!(42),
serde_json::json!("string"),
] {
let err = shallow_merge(current.clone(), bad_patch, &schema)
.expect_err("non-object patch must be rejected");
match err {
MiniAppError::Validation { field, .. } => {
assert_eq!(field, "(root)", "error field must be '(root)'");
}
other => panic!("expected Validation error, got: {other:?}"),
}
}
}
#[tokio::test]
async fn store_update_merge_runs_post_merge_validation() {
let store = make_test_store().await;
let row = store
.create(serde_json::json!({"title": "x", "state": "open"}))
.await
.unwrap();
let err = store
.update(&row.id, serde_json::json!({"state": 42}), UpdateMode::Merge)
.await
.expect_err("type mismatch must fail post-merge validation");
assert!(
matches!(err, MiniAppError::Validation { .. }),
"expected Validation error, got: {err:?}"
);
}
use crate::filter::ListFilter;
fn make_filter() -> ListFilter {
ListFilter::Eq {
field: "state".to_string(),
value: serde_json::json!("open"),
}
}
#[tokio::test]
async fn alias_create_and_get_round_trip() {
let store = make_test_store().await;
let filter = make_filter();
let filter_json = serde_json::to_string(&filter).unwrap();
store
.alias_create(
"recent_open",
&filter_json,
Some(20),
Some("desc".to_string()),
None,
)
.await
.expect("alias_create must succeed");
let record = store
.alias_get("recent_open")
.await
.expect("alias_get must succeed");
assert_eq!(record.name, "recent_open");
assert_eq!(record.default_limit, Some(20));
assert_eq!(record.description.as_deref(), Some("desc"));
let restored: ListFilter =
serde_json::from_str(&record.filter).expect("filter must deserialise");
let stored_back = serde_json::to_string(&filter).unwrap();
let stored_back2 = serde_json::to_string(&restored).unwrap();
assert_eq!(
stored_back, stored_back2,
"filter must survive a JSON round-trip"
);
}
#[tokio::test]
async fn alias_create_with_optional_nulls() {
let store = make_test_store().await;
let filter = make_filter();
let filter_json = serde_json::to_string(&filter).unwrap();
store
.alias_create("no_opts", &filter_json, None, None, None)
.await
.expect("alias_create must succeed with None optionals");
let record = store
.alias_get("no_opts")
.await
.expect("alias_get must succeed");
assert_eq!(record.name, "no_opts");
assert!(record.default_limit.is_none());
assert!(record.description.is_none());
}
#[tokio::test]
async fn alias_list_returns_all() {
let store = make_test_store().await;
let filter = make_filter();
let list = store
.alias_list()
.await
.expect("alias_list must succeed on empty store");
assert!(list.is_empty(), "empty store should return empty list");
let filter_json = serde_json::to_string(&filter).unwrap();
store
.alias_create("b_alias", &filter_json, None, None, None)
.await
.unwrap();
store
.alias_create("a_alias", &filter_json, None, None, None)
.await
.unwrap();
let list = store.alias_list().await.expect("alias_list must succeed");
assert_eq!(list.len(), 2, "must return 2 aliases");
assert_eq!(list[0].name, "a_alias");
assert_eq!(list[1].name, "b_alias");
}
#[tokio::test]
async fn alias_delete_removes_alias() {
let store = make_test_store().await;
let filter = make_filter();
let filter_json = serde_json::to_string(&filter).unwrap();
store
.alias_create("to_delete", &filter_json, None, None, None)
.await
.unwrap();
store
.alias_delete("to_delete")
.await
.expect("alias_delete must succeed");
let err = store
.alias_get("to_delete")
.await
.expect_err("alias_get after delete must fail");
assert!(
matches!(err, MiniAppError::AliasNotFound { ref name } if name == "to_delete"),
"expected AliasNotFound, got: {err:?}"
);
}
#[tokio::test]
async fn alias_create_duplicate_returns_already_exists() {
let store = make_test_store().await;
let filter = make_filter();
let filter_json = serde_json::to_string(&filter).unwrap();
store
.alias_create("dup", &filter_json, None, None, None)
.await
.expect("first alias_create must succeed");
let err = store
.alias_create("dup", &filter_json, None, None, None)
.await
.expect_err("second alias_create must fail");
assert!(
matches!(err, MiniAppError::AliasAlreadyExists { ref name } if name == "dup"),
"expected AliasAlreadyExists, got: {err:?}"
);
}
#[tokio::test]
async fn alias_get_missing_returns_not_found() {
let store = make_test_store().await;
let err = store
.alias_get("nonexistent")
.await
.expect_err("alias_get on missing alias must fail");
assert!(
matches!(err, MiniAppError::AliasNotFound { ref name } if name == "nonexistent"),
"expected AliasNotFound, got: {err:?}"
);
}
#[tokio::test]
async fn alias_delete_missing_returns_not_found() {
let store = make_test_store().await;
let err = store
.alias_delete("nonexistent")
.await
.expect_err("alias_delete on missing alias must fail");
assert!(
matches!(err, MiniAppError::AliasNotFound { ref name } if name == "nonexistent"),
"expected AliasNotFound, got: {err:?}"
);
}
#[tokio::test]
async fn alias_namespace_isolation_between_stores() {
let store_a = make_test_store().await;
let store_b = make_test_store().await;
let filter = make_filter();
let filter_json = serde_json::to_string(&filter).unwrap();
store_a
.alias_create("shared_name", &filter_json, None, None, None)
.await
.expect("store_a alias_create must succeed");
let err = store_b
.alias_get("shared_name")
.await
.expect_err("alias created in store_a must not be visible in store_b");
assert!(
matches!(err, MiniAppError::AliasNotFound { .. }),
"expected AliasNotFound in store_b, got: {err:?}"
);
}
#[tokio::test]
async fn test_get_prefix_match_single() {
let store = make_test_store().await;
let row = store
.create(serde_json::json!({"title": "prefix-test"}))
.await
.unwrap();
let prefix = &row.id[..8];
let fetched = store.get(prefix).await.unwrap();
assert_eq!(fetched.id, row.id);
assert_eq!(fetched.data["title"], "prefix-test");
}
#[tokio::test]
async fn test_get_prefix_match_not_found() {
let store = make_test_store().await;
let err = store.get("zzzzzzzz").await.unwrap_err();
assert!(
matches!(err, MiniAppError::NotFound { .. }),
"expected NotFound, got: {err:?}"
);
}
#[tokio::test]
async fn test_get_prefix_match_ambiguous() {
let store = make_test_store().await;
let id1 = "aaaaaaaa-0000-4000-8000-000000000001".to_string();
let id2 = "aaaaaaaa-0000-4000-8000-000000000002".to_string();
let id1_clone = id1.clone();
let id2_clone = id2.clone();
store
.execute_under_savepoint(move |sp| {
sp.execute(
"INSERT INTO rows (id, data, created_at, updated_at) VALUES (?1, ?2, 0, 0)",
rusqlite::params![id1_clone, r#"{"title":"a1"}"#],
)?;
sp.execute(
"INSERT INTO rows (id, data, created_at, updated_at) VALUES (?1, ?2, 0, 0)",
rusqlite::params![id2_clone, r#"{"title":"a2"}"#],
)?;
Ok(())
})
.await
.unwrap();
let err = store.get("aaaaaaaa").await.unwrap_err();
match err {
MiniAppError::AmbiguousId {
ref id_prefix,
ref candidates,
} => {
assert_eq!(id_prefix, "aaaaaaaa");
assert_eq!(candidates.len(), 2);
let mut sorted = candidates.clone();
sorted.sort();
assert_eq!(sorted[0], id1);
assert_eq!(sorted[1], id2);
}
other => panic!("expected AmbiguousId, got: {other:?}"),
}
}
#[tokio::test]
async fn test_get_full_uuid_bypass() {
let store = make_test_store().await;
let row = store
.create(serde_json::json!({"title": "bypass-test"}))
.await
.unwrap();
assert_eq!(row.id.len(), 36, "UUID must be 36 chars");
let fetched = store.get(&row.id).await.unwrap();
assert_eq!(fetched.id, row.id);
}
#[tokio::test]
async fn test_update_prefix_match_single() {
let store = make_test_store().await;
let row = store
.create(serde_json::json!({"title": "before"}))
.await
.unwrap();
let prefix = &row.id[..8];
let updated = store
.update(
prefix,
serde_json::json!({"title": "after"}),
UpdateMode::Replace,
)
.await
.unwrap();
assert_eq!(updated.id, row.id);
assert_eq!(updated.data["title"], "after");
}
#[tokio::test]
async fn test_update_prefix_match_ambiguous() {
let store = make_test_store().await;
let id1 = "bbbbbbbb-0000-4000-8000-000000000001".to_string();
let id2 = "bbbbbbbb-0000-4000-8000-000000000002".to_string();
store
.execute_under_savepoint(move |sp| {
sp.execute(
"INSERT INTO rows (id, data, created_at, updated_at) VALUES (?1, ?2, 0, 0)",
rusqlite::params![id1, r#"{"title":"b1"}"#],
)?;
sp.execute(
"INSERT INTO rows (id, data, created_at, updated_at) VALUES (?1, ?2, 0, 0)",
rusqlite::params![id2, r#"{"title":"b2"}"#],
)?;
Ok(())
})
.await
.unwrap();
let err = store
.update(
"bbbbbbbb",
serde_json::json!({"title": "x"}),
UpdateMode::Replace,
)
.await
.unwrap_err();
assert!(
matches!(err, MiniAppError::AmbiguousId { .. }),
"expected AmbiguousId, got: {err:?}"
);
}
#[tokio::test]
async fn test_delete_prefix_match_single() {
let store = make_test_store().await;
let row = store
.create(serde_json::json!({"title": "to-delete-prefix"}))
.await
.unwrap();
let prefix = &row.id[..8];
store.delete(prefix).await.unwrap();
let err = store.get(&row.id).await.unwrap_err();
assert!(
matches!(err, MiniAppError::NotFound { .. }),
"expected NotFound after delete, got: {err:?}"
);
}
#[tokio::test]
async fn test_delete_prefix_match_ambiguous() {
let store = make_test_store().await;
let id1 = "cccccccc-0000-4000-8000-000000000001".to_string();
let id2 = "cccccccc-0000-4000-8000-000000000002".to_string();
store
.execute_under_savepoint(move |sp| {
sp.execute(
"INSERT INTO rows (id, data, created_at, updated_at) VALUES (?1, ?2, 0, 0)",
rusqlite::params![id1, r#"{"title":"c1"}"#],
)?;
sp.execute(
"INSERT INTO rows (id, data, created_at, updated_at) VALUES (?1, ?2, 0, 0)",
rusqlite::params![id2, r#"{"title":"c2"}"#],
)?;
Ok(())
})
.await
.unwrap();
let err = store.delete("cccccccc").await.unwrap_err();
assert!(
matches!(err, MiniAppError::AmbiguousId { .. }),
"expected AmbiguousId, got: {err:?}"
);
}
}