use ferro_audit::{AuditActor, AuditEntry, AuditTarget};
use ferro_projections::{ActionDef, CrudPlan};
use sea_orm::{ConnectionTrait, DatabaseBackend, DatabaseConnection, Statement};
use serde_json::Value;
use std::future::Future;
use std::pin::Pin;
use thiserror::Error;
#[derive(Error, Debug)]
pub enum WriteError {
#[error("database error: {0}")]
Database(String),
#[error("serialization error: {0}")]
Serialization(#[from] serde_json::Error),
#[error("guard failed: {0}")]
GuardFailed(String),
#[error("validation error: {0}")]
Validation(String),
#[error("action not found: {0}")]
ActionNotFound(String),
#[cfg(feature = "confirmation")]
#[error("confirmation required for action: {0}")]
ConfirmationRequired(String),
#[error("crud verb not enabled: {0}")]
CrudVerbNotEnabled(String),
#[error("record not found or already deleted")]
RecordNotFound,
}
pub type WriteResult<T> = Result<T, WriteError>;
pub type ExecutorFn = Box<
dyn Fn(
&str, // action_name
&Value, // validated inputs
i64, // tenant_id (from auth, never from payload)
&DatabaseConnection,
) -> Pin<Box<dyn Future<Output = WriteResult<Value>> + Send>>
+ Send
+ Sync,
>;
pub type GuardEvaluatorFn = Box<
dyn Fn(
&str, // guard_name
i64, // tenant_id
&Value, // validated inputs (for record-scoped guards)
&DatabaseConnection,
) -> Pin<Box<dyn Future<Output = WriteResult<bool>> + Send>>
+ Send
+ Sync,
>;
pub type OverrideFn = Box<
dyn Fn(
&str, // action_name
&Value, // validated inputs
i64, // tenant_id (from auth, never from payload)
&DatabaseConnection,
&Value, // base persist result
) -> Pin<Box<dyn Future<Output = WriteResult<()>> + Send>>
+ Send
+ Sync,
>;
pub struct WriteDispatcher {
pub executor: ExecutorFn,
pub guard_evaluator: GuardEvaluatorFn,
pub overrides: std::collections::HashMap<String, OverrideFn>,
}
impl WriteDispatcher {
pub fn new(executor: ExecutorFn, guard_evaluator: GuardEvaluatorFn) -> Self {
Self {
executor,
guard_evaluator,
overrides: std::collections::HashMap::new(),
}
}
pub fn with_override(mut self, action: impl Into<String>, hook: OverrideFn) -> Self {
self.overrides.insert(action.into(), hook);
self
}
}
fn placeholder(backend: DatabaseBackend, index: usize) -> String {
match backend {
DatabaseBackend::Postgres => format!("${index}"),
_ => "?".to_string(),
}
}
fn json_to_sea_value(val: &serde_json::Value) -> sea_orm::Value {
match val {
serde_json::Value::Null => sea_orm::Value::String(None),
serde_json::Value::Bool(b) => sea_orm::Value::Bool(Some(*b)),
serde_json::Value::Number(n) => {
if let Some(i) = n.as_i64() {
sea_orm::Value::BigInt(Some(i))
} else {
sea_orm::Value::Double(n.as_f64())
}
}
serde_json::Value::String(s) => sea_orm::Value::String(Some(Box::new(s.clone()))),
other => sea_orm::Value::String(Some(Box::new(other.to_string()))),
}
}
fn row_to_json(row: &sea_orm::QueryResult) -> serde_json::Value {
let columns: Vec<String> = row.column_names().iter().map(|s| s.to_string()).collect();
let mut obj = serde_json::Map::new();
for col in &columns {
let val = row
.try_get_by::<i64, _>(col.as_str())
.map(|v| serde_json::Value::Number(v.into()))
.or_else(|_| {
row.try_get_by::<f64, _>(col.as_str()).map(|v| {
serde_json::Number::from_f64(v)
.map(serde_json::Value::Number)
.unwrap_or(serde_json::Value::Null)
})
})
.or_else(|_| {
row.try_get_by::<bool, _>(col.as_str())
.map(serde_json::Value::Bool)
})
.or_else(|_| {
row.try_get_by::<String, _>(col.as_str())
.map(serde_json::Value::String)
})
.unwrap_or(serde_json::Value::Null);
obj.insert(col.clone(), val);
}
serde_json::Value::Object(obj)
}
async fn execute_crud_plan(
plan: &CrudPlan,
tenant_id: i64,
db: &DatabaseConnection,
) -> WriteResult<Value> {
let backend = db.get_database_backend();
let now_expr = match backend {
DatabaseBackend::Postgres => "NOW()",
_ => "datetime('now')",
};
match plan {
CrudPlan::Create {
table,
columns,
tenant_column,
} => {
let mut col_names: Vec<String> = columns.iter().map(|(c, _)| c.clone()).collect();
col_names.push("created_at".to_string());
if let Some(ref tc) = tenant_column {
col_names.push(tc.column.clone());
}
let mut ph_parts: Vec<String> = (1..=columns.len())
.map(|i| placeholder(backend, i))
.collect();
ph_parts.push(now_expr.to_string()); if tenant_column.is_some() {
ph_parts.push(placeholder(backend, columns.len() + 1));
}
let col_list = col_names.join(", ");
let ph_list = ph_parts.join(", ");
let mut values: Vec<sea_orm::Value> =
columns.iter().map(|(_, v)| json_to_sea_value(v)).collect();
if tenant_column.is_some() {
values.push(sea_orm::Value::BigInt(Some(tenant_id)));
}
let sql = format!("INSERT INTO {table} ({col_list}) VALUES ({ph_list}) RETURNING *");
let stmt = Statement::from_sql_and_values(backend, &sql, values);
let row = db
.query_one(stmt)
.await
.map_err(|e| WriteError::Database(e.to_string()))?
.ok_or_else(|| {
WriteError::Database("INSERT RETURNING returned no row".to_string())
})?;
Ok(row_to_json(&row))
}
CrudPlan::Update {
table,
id_column,
id_value,
patch,
soft_delete_column,
tenant_column,
} => {
if patch.is_empty() {
return Err(WriteError::Validation(
"patch must contain at least one field".into(),
));
}
let set_clauses: Vec<String> = patch
.iter()
.enumerate()
.map(|(i, (col, _))| format!("{col} = {}", placeholder(backend, i + 1)))
.collect();
let set_sql = set_clauses.join(", ");
let id_ph = placeholder(backend, patch.len() + 1);
let sql = if let Some(ref tc) = tenant_column {
let tenant_ph = placeholder(backend, patch.len() + 2);
format!(
"UPDATE {table} SET {set_sql} WHERE {id_column} = {id_ph} \
AND {soft_delete_column} IS NULL AND {tc_col} = {tenant_ph}",
tc_col = tc.column
)
} else {
format!(
"UPDATE {table} SET {set_sql} WHERE {id_column} = {id_ph} AND {soft_delete_column} IS NULL"
)
};
let mut values: Vec<sea_orm::Value> =
patch.iter().map(|(_, v)| json_to_sea_value(v)).collect();
values.push(json_to_sea_value(id_value));
if tenant_column.is_some() {
values.push(sea_orm::Value::BigInt(Some(tenant_id)));
}
let stmt = Statement::from_sql_and_values(backend, &sql, values);
let exec_result = db
.execute(stmt)
.await
.map_err(|e| WriteError::Database(e.to_string()))?;
if exec_result.rows_affected() == 0 {
return Err(WriteError::RecordNotFound);
}
let id_ph2 = placeholder(backend, 1);
let (select_sql, select_values) = if let Some(ref tc) = tenant_column {
let t_ph2 = placeholder(backend, 2);
let sql = format!(
"SELECT * FROM {table} WHERE {id_column} = {id_ph2} \
AND {soft_delete_column} IS NULL AND {tc_col} = {t_ph2}",
tc_col = tc.column
);
let vals = vec![
json_to_sea_value(id_value),
sea_orm::Value::BigInt(Some(tenant_id)),
];
(sql, vals)
} else {
let sql = format!(
"SELECT * FROM {table} WHERE {id_column} = {id_ph2} AND {soft_delete_column} IS NULL"
);
(sql, vec![json_to_sea_value(id_value)])
};
let select_stmt = Statement::from_sql_and_values(backend, &select_sql, select_values);
let row = db
.query_one(select_stmt)
.await
.map_err(|e| WriteError::Database(e.to_string()))?
.ok_or_else(|| {
WriteError::Database("SELECT after UPDATE returned no row".to_string())
})?;
Ok(row_to_json(&row))
}
CrudPlan::Delete {
table,
id_column,
id_value,
soft_delete_column,
tenant_column,
} => {
let id_ph = placeholder(backend, 1);
let sql = if let Some(ref tc) = tenant_column {
let tenant_ph = placeholder(backend, 2);
format!(
"UPDATE {table} SET {soft_delete_column} = {now_expr} \
WHERE {id_column} = {id_ph} AND {soft_delete_column} IS NULL \
AND {tc_col} = {tenant_ph}",
tc_col = tc.column
)
} else {
format!(
"UPDATE {table} SET {soft_delete_column} = {now_expr} \
WHERE {id_column} = {id_ph} AND {soft_delete_column} IS NULL"
)
};
let mut stmt_values = vec![json_to_sea_value(id_value)];
if tenant_column.is_some() {
stmt_values.push(sea_orm::Value::BigInt(Some(tenant_id)));
}
let stmt = Statement::from_sql_and_values(backend, &sql, stmt_values);
let exec_result = db
.execute(stmt)
.await
.map_err(|e| WriteError::Database(e.to_string()))?;
if exec_result.rows_affected() == 0 {
return Err(WriteError::RecordNotFound);
}
Ok(serde_json::json!({ "id": id_value, "deleted": true }))
}
}
}
pub fn merged_guards(preconditions: &[String], transition_guard: Option<&str>) -> Vec<String> {
let mut guards: Vec<String> = preconditions.to_vec();
if let Some(g) = transition_guard {
if !guards.iter().any(|existing| existing == g) {
guards.push(g.to_string());
}
}
guards
}
async fn lookup_idempotency(
tenant_id: i64,
key: &str,
db: &DatabaseConnection,
) -> WriteResult<Option<Value>> {
let backend = db.get_database_backend();
let (sql, values) = match backend {
DatabaseBackend::Postgres => (
"SELECT result FROM mcp_idempotency_keys WHERE tenant_id = $1 AND idempotency_key = $2"
.to_string(),
vec![
sea_orm::Value::BigInt(Some(tenant_id)),
sea_orm::Value::String(Some(Box::new(key.to_string()))),
],
),
_ => (
"SELECT result FROM mcp_idempotency_keys WHERE tenant_id = ? AND idempotency_key = ?"
.to_string(),
vec![
sea_orm::Value::BigInt(Some(tenant_id)),
sea_orm::Value::String(Some(Box::new(key.to_string()))),
],
),
};
let stmt = Statement::from_sql_and_values(backend, &sql, values);
match db
.query_one(stmt)
.await
.map_err(|e| WriteError::Database(e.to_string()))?
{
None => Ok(None),
Some(row) => {
let json_text: String = row
.try_get("", "result")
.map_err(|e| WriteError::Database(e.to_string()))?;
let value: Value = serde_json::from_str(&json_text)
.map_err(|e| WriteError::Database(e.to_string()))?;
Ok(Some(value))
}
}
}
async fn store_idempotency(
tenant_id: i64,
key: &str,
result: &Value,
db: &DatabaseConnection,
) -> WriteResult<()> {
let backend = db.get_database_backend();
let json_text = serde_json::to_string(result).map_err(WriteError::Serialization)?;
let (sql, values) = match backend {
DatabaseBackend::Postgres => (
"INSERT INTO mcp_idempotency_keys (tenant_id, idempotency_key, result, created_at) \
VALUES ($1, $2, $3, NOW()) ON CONFLICT (tenant_id, idempotency_key) DO NOTHING"
.to_string(),
vec![
sea_orm::Value::BigInt(Some(tenant_id)),
sea_orm::Value::String(Some(Box::new(key.to_string()))),
sea_orm::Value::String(Some(Box::new(json_text))),
],
),
_ => (
"INSERT OR IGNORE INTO mcp_idempotency_keys \
(tenant_id, idempotency_key, result) VALUES (?, ?, ?)"
.to_string(),
vec![
sea_orm::Value::BigInt(Some(tenant_id)),
sea_orm::Value::String(Some(Box::new(key.to_string()))),
sea_orm::Value::String(Some(Box::new(json_text))),
],
),
};
let stmt = Statement::from_sql_and_values(backend, &sql, values);
db.execute(stmt)
.await
.map_err(|e| WriteError::Database(e.to_string()))?;
Ok(())
}
#[allow(clippy::too_many_arguments)]
pub async fn dispatch_write(
action: &ActionDef,
inputs: &Value,
tenant_id: i64,
db: &DatabaseConnection,
dispatcher: &WriteDispatcher,
transition_guard: Option<&str>,
channel: &str,
#[cfg(feature = "confirmation")] is_confirmed: bool,
crud_plan: Option<&CrudPlan>,
) -> WriteResult<Value> {
let guards = merged_guards(&action.preconditions, transition_guard);
for guard_name in &guards {
let passes = (dispatcher.guard_evaluator)(guard_name, tenant_id, inputs, db)
.await
.map_err(|e| WriteError::GuardFailed(format!("{guard_name}: {e}")))?;
if !passes {
return Err(WriteError::GuardFailed(format!(
"precondition '{guard_name}' not met"
)));
}
}
if let Some(key) = inputs.get("idempotency_key").and_then(|v| v.as_str()) {
if key.len() > 128 {
return Err(WriteError::Validation(
"idempotency_key must not exceed 128 characters".into(),
));
}
}
let idempotency_key = inputs.get("idempotency_key").and_then(|v| v.as_str());
if let Some(key) = idempotency_key {
if let Some(stored_result) = lookup_idempotency(tenant_id, key, db).await? {
return Ok(stored_result);
}
}
#[cfg(feature = "confirmation")]
{
let is_destructive = action.transition_trigger.is_some()
|| matches!(crud_plan, Some(CrudPlan::Delete { .. }));
if is_destructive && !is_confirmed {
return Err(WriteError::ConfirmationRequired(action.name.clone()));
}
}
#[cfg(not(feature = "confirmation"))]
let _ = (&action.transition_trigger, crud_plan);
let result = if let Some(plan) = crud_plan {
execute_crud_plan(plan, tenant_id, db).await?
} else {
(dispatcher.executor)(&action.name, inputs, tenant_id, db).await?
};
if let Some(key) = idempotency_key {
store_idempotency(tenant_id, key, &result, db).await?;
}
let record_id = inputs.get("id").map(|v| v.to_string()).unwrap_or_default();
let audit_action = if crud_plan.is_some() {
format!("{channel}.crud.{}", &action.name)
} else {
format!("{channel}.action.{}", &action.name)
};
AuditEntry::record(audit_action)
.tenant(tenant_id.to_string())
.actor(AuditActor::User(tenant_id.to_string()))
.target(AuditTarget::new(&action.name, record_id))
.after(result.clone())
.reason(&action.name)
.write(db)
.await
.map_err(|e| WriteError::Database(e.to_string()))?;
if let Some(hook) = dispatcher.overrides.get(&action.name) {
(hook)(&action.name, inputs, tenant_id, db, &result).await?;
}
Ok(result)
}
#[cfg(test)]
mod tests {
use super::*;
use ferro_projections::ActionDef;
use sea_orm::{ConnectionTrait, Database, DatabaseBackend, Statement};
use serde_json::json;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
async fn setup_db() -> sea_orm::DatabaseConnection {
let db = Database::connect("sqlite::memory:")
.await
.expect("in-memory SQLite connect failed");
db.execute(Statement::from_string(
DatabaseBackend::Sqlite,
"CREATE TABLE IF NOT EXISTS mcp_idempotency_keys (
id INTEGER PRIMARY KEY AUTOINCREMENT,
tenant_id INTEGER NOT NULL,
idempotency_key TEXT NOT NULL,
result TEXT NOT NULL,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
UNIQUE (tenant_id, idempotency_key)
)"
.to_string(),
))
.await
.expect("create mcp_idempotency_keys table");
db.execute(Statement::from_string(
DatabaseBackend::Sqlite,
"CREATE TABLE IF NOT EXISTS audit_log (
id TEXT PRIMARY KEY NOT NULL,
tenant_id TEXT,
actor_kind TEXT NOT NULL,
actor_id TEXT,
action TEXT NOT NULL,
target_kind TEXT,
target_id TEXT,
before TEXT,
after TEXT,
reason TEXT,
correlation_id TEXT,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
)"
.to_string(),
))
.await
.expect("create audit_log table");
db.execute(Statement::from_string(
DatabaseBackend::Sqlite,
"CREATE TABLE IF NOT EXISTS orders (
id INTEGER PRIMARY KEY AUTOINCREMENT,
status TEXT NOT NULL DEFAULT 'draft',
amount TEXT,
created_at TEXT DEFAULT (datetime('now')),
deleted_at TEXT
)"
.to_string(),
))
.await
.expect("create orders table");
db
}
fn approve_action() -> ActionDef {
ActionDef::new("approve")
.transition_trigger("approve")
.precondition("is_manager")
}
fn submit_action() -> ActionDef {
ActionDef::new("submit").transition_trigger("submit")
}
fn update_action() -> ActionDef {
ActionDef::new("update")
}
#[tokio::test]
async fn guard_denied_at_call_time() {
let db = setup_db().await;
let dispatcher = WriteDispatcher {
guard_evaluator: Box::new(|_, _, _, _| Box::pin(async { Ok(false) })),
executor: Box::new(|_, _, _, _| {
Box::pin(async { panic!("executor must not run when guard fails") })
}),
overrides: std::collections::HashMap::new(),
};
let result = dispatch_write(
&approve_action(),
&json!({"id": 1}),
1,
&db,
&dispatcher,
None,
"mcp",
#[cfg(feature = "confirmation")]
false,
None,
)
.await;
assert!(
matches!(result, Err(WriteError::GuardFailed(_))),
"expected Err(GuardFailed(_)), got: {result:?}"
);
}
#[tokio::test]
async fn guard_rejects_illegal_transition() {
let db = setup_db().await;
let dispatcher = WriteDispatcher {
guard_evaluator: Box::new(|_, _, _, _| Box::pin(async { Ok(false) })),
executor: Box::new(|_, _, _, _| {
Box::pin(async { panic!("executor must not run when transition guard fails") })
}),
overrides: std::collections::HashMap::new(),
};
let result = dispatch_write(
&submit_action(),
&json!({"id": 1}),
1,
&db,
&dispatcher,
Some("is_manager"),
"mcp",
#[cfg(feature = "confirmation")]
false,
None,
)
.await;
assert!(
matches!(result, Err(WriteError::GuardFailed(_))),
"expected Err(GuardFailed(_)) from the transition guard, got: {result:?}"
);
}
#[tokio::test]
async fn transition_guard_evaluated_at_call_time() {
let db = setup_db().await;
let saw_transition_guard = Arc::new(std::sync::atomic::AtomicBool::new(false));
let dispatcher = WriteDispatcher {
guard_evaluator: Box::new({
let flag = saw_transition_guard.clone();
move |name: &str, _, _, _| {
if name == "is_manager" {
flag.store(true, Ordering::SeqCst);
}
Box::pin(async { Ok(true) })
}
}),
executor: Box::new(|_, _, _, _| {
Box::pin(async { Ok(json!({ "status": "submitted" })) })
}),
overrides: std::collections::HashMap::new(),
};
let result = dispatch_write(
&submit_action(),
&json!({"id": 1}),
1,
&db,
&dispatcher,
Some("is_manager"),
"mcp",
#[cfg(feature = "confirmation")]
true,
None,
)
.await;
assert!(
result.is_ok(),
"guard returns true, write must succeed: {result:?}"
);
assert!(
saw_transition_guard.load(Ordering::SeqCst),
"the transition-level guard 'is_manager' must be evaluated live"
);
}
#[tokio::test]
async fn guard_deduped_when_on_both() {
let db = setup_db().await;
let call_count = Arc::new(AtomicUsize::new(0));
let dispatcher = WriteDispatcher {
guard_evaluator: Box::new({
let count = call_count.clone();
move |name: &str, _, _, _| {
if name == "is_manager" {
count.fetch_add(1, Ordering::SeqCst);
}
Box::pin(async { Ok(true) })
}
}),
executor: Box::new(|_, _, _, _| {
Box::pin(async { Ok(json!({ "status": "approved" })) })
}),
overrides: std::collections::HashMap::new(),
};
let result = dispatch_write(
&approve_action(),
&json!({"id": 1}),
1,
&db,
&dispatcher,
Some("is_manager"),
"mcp",
#[cfg(feature = "confirmation")]
true,
None,
)
.await;
assert!(
result.is_ok(),
"both guards pass; write must succeed: {result:?}"
);
assert_eq!(
call_count.load(Ordering::SeqCst),
1,
"is_manager must be evaluated exactly once (deduped), not twice"
);
}
#[tokio::test]
async fn override_hook_runs_post_persist() {
let db = setup_db().await;
let order = Arc::new(std::sync::Mutex::new(Vec::<&'static str>::new()));
let dispatcher = WriteDispatcher {
guard_evaluator: Box::new(|_, _, _, _| Box::pin(async { Ok(true) })),
executor: Box::new({
let order = order.clone();
move |_, _, _, _| {
order.lock().unwrap().push("persist");
Box::pin(async { Ok(json!({ "status": "submitted" })) })
}
}),
overrides: std::collections::HashMap::new(),
}
.with_override(
"submit",
Box::new({
let order = order.clone();
move |_action, _inputs, _tenant, _db, base_result: &Value| {
assert_eq!(base_result["status"], "submitted");
order.lock().unwrap().push("override");
Box::pin(async { Ok(()) })
}
}),
);
let result = dispatch_write(
&submit_action(),
&json!({"id": 1}),
1,
&db,
&dispatcher,
None,
"mcp",
#[cfg(feature = "confirmation")]
true,
None,
)
.await;
assert!(result.is_ok(), "write must succeed: {result:?}");
assert_eq!(
result.unwrap(),
json!({ "status": "submitted" }),
"base result must be returned unchanged by the override"
);
assert_eq!(
*order.lock().unwrap(),
vec!["persist", "override"],
"override must run AFTER the base persist"
);
}
#[tokio::test]
async fn no_override_is_declaration_only() {
let db = setup_db().await;
let dispatcher = WriteDispatcher {
guard_evaluator: Box::new(|_, _, _, _| Box::pin(async { Ok(true) })),
executor: Box::new(|_, _, _, _| {
Box::pin(async { Ok(json!({ "status": "submitted" })) })
}),
overrides: std::collections::HashMap::new(),
};
let result = dispatch_write(
&submit_action(),
&json!({"id": 1}),
1,
&db,
&dispatcher,
None,
"mcp",
#[cfg(feature = "confirmation")]
true,
None,
)
.await;
assert!(result.is_ok(), "no-override path must succeed: {result:?}");
assert_eq!(result.unwrap(), json!({ "status": "submitted" }));
}
#[tokio::test]
async fn override_error_surfaces() {
let db = setup_db().await;
let dispatcher = WriteDispatcher {
guard_evaluator: Box::new(|_, _, _, _| Box::pin(async { Ok(true) })),
executor: Box::new(|_, _, _, _| {
Box::pin(async { Ok(json!({ "status": "submitted" })) })
}),
overrides: std::collections::HashMap::new(),
}
.with_override(
"submit",
Box::new(|_action, _inputs, _tenant, _db, _base| {
Box::pin(async { Err(WriteError::Validation("override failed".into())) })
}),
);
let result = dispatch_write(
&submit_action(),
&json!({"id": 1, "idempotency_key": "ovr-fail-1"}),
1,
&db,
&dispatcher,
None,
"mcp",
#[cfg(feature = "confirmation")]
true,
None,
)
.await;
assert!(
matches!(result, Err(WriteError::Validation(ref m)) if m == "override failed"),
"override error must propagate, got: {result:?}"
);
let audit_count: i64 = db
.query_one(Statement::from_string(
DatabaseBackend::Sqlite,
"SELECT COUNT(*) AS c FROM audit_log WHERE action = 'mcp.action.submit'"
.to_string(),
))
.await
.expect("audit query must succeed")
.expect("audit count row must exist")
.try_get::<i64>("", "c")
.expect("audit count column");
assert_eq!(
audit_count, 1,
"base persist must be audited even when the override fails (WR-01)"
);
let stored = lookup_idempotency(1, "ovr-fail-1", &db)
.await
.expect("idempotency lookup must succeed");
assert_eq!(
stored,
Some(json!({ "status": "submitted" })),
"base persist's idempotency key must be stored even when the override fails (WR-01)"
);
}
#[tokio::test]
async fn idempotent_replay_does_not_re_execute() {
let db = setup_db().await;
let exec_count = Arc::new(AtomicUsize::new(0));
let dispatcher = WriteDispatcher {
executor: Box::new({
let count = exec_count.clone();
move |_, _, _, _| {
count.fetch_add(1, Ordering::SeqCst);
Box::pin(async { Ok(json!({ "status": "submitted" })) })
}
}),
guard_evaluator: Box::new(|_, _, _, _| Box::pin(async { Ok(true) })),
overrides: std::collections::HashMap::new(),
};
let args = json!({ "id": 1, "idempotency_key": "k-abc" });
let result1 = dispatch_write(
&update_action(),
&args,
1,
&db,
&dispatcher,
None,
"mcp",
#[cfg(feature = "confirmation")]
false,
None,
)
.await;
let result2 = dispatch_write(
&update_action(),
&args,
1,
&db,
&dispatcher,
None,
"mcp",
#[cfg(feature = "confirmation")]
false,
None,
)
.await;
assert!(result1.is_ok(), "first call must succeed; got: {result1:?}");
assert!(
result2.is_ok(),
"second call must succeed (replay); got: {result2:?}"
);
assert_eq!(
result1.unwrap(),
result2.unwrap(),
"idempotent replay must return the same result"
);
assert_eq!(
exec_count.load(Ordering::SeqCst),
1,
"executor must fire exactly once despite two identical calls"
);
}
#[tokio::test]
async fn audit_channel_is_parameterized() {
let db = setup_db().await;
let dispatcher = WriteDispatcher {
guard_evaluator: Box::new(|_, _, _, _| Box::pin(async { Ok(true) })),
executor: Box::new(|_, _, _, _| {
Box::pin(async { Ok(json!({ "status": "submitted" })) })
}),
overrides: std::collections::HashMap::new(),
};
let result = dispatch_write(
&submit_action(),
&json!({"id": 1}),
1,
&db,
&dispatcher,
None,
"web",
#[cfg(feature = "confirmation")]
true,
None,
)
.await;
assert!(result.is_ok(), "write must succeed: {result:?}");
let web_count: i64 = db
.query_one(Statement::from_string(
DatabaseBackend::Sqlite,
"SELECT COUNT(*) AS c FROM audit_log WHERE action = 'web.action.submit'"
.to_string(),
))
.await
.expect("audit query must succeed")
.expect("audit count row must exist")
.try_get::<i64>("", "c")
.expect("audit count column");
assert_eq!(
web_count, 1,
"audit action must be prefixed by the channel argument ('web')"
);
let mcp_count: i64 = db
.query_one(Statement::from_string(
DatabaseBackend::Sqlite,
"SELECT COUNT(*) AS c FROM audit_log WHERE action = 'mcp.action.submit'"
.to_string(),
))
.await
.expect("audit query must succeed")
.expect("audit count row must exist")
.try_get::<i64>("", "c")
.expect("audit count column");
assert_eq!(
mcp_count, 0,
"no mcp-channel audit must be written when channel is 'web'"
);
}
fn crud_action(name: &str) -> ActionDef {
ActionDef::new(name)
}
fn allow_dispatcher() -> WriteDispatcher {
WriteDispatcher {
guard_evaluator: Box::new(|_, _, _, _| Box::pin(async { Ok(true) })),
executor: Box::new(|_, _, _, _| {
Box::pin(async {
panic!("executor must NOT run on the CRUD path (crud_plan=Some)")
})
}),
overrides: std::collections::HashMap::new(),
}
}
fn create_plan(columns: Vec<(&str, serde_json::Value)>) -> CrudPlan {
CrudPlan::Create {
table: "orders".to_string(),
columns: columns
.into_iter()
.map(|(k, v)| (k.to_string(), v))
.collect(),
tenant_column: None,
}
}
fn update_plan(id: i64, patch: Vec<(&str, serde_json::Value)>) -> CrudPlan {
CrudPlan::Update {
table: "orders".to_string(),
id_column: "id".to_string(),
id_value: serde_json::json!(id),
patch: patch.into_iter().map(|(k, v)| (k.to_string(), v)).collect(),
soft_delete_column: "deleted_at".to_string(),
tenant_column: None,
}
}
fn delete_plan(id: i64) -> CrudPlan {
CrudPlan::Delete {
table: "orders".to_string(),
id_column: "id".to_string(),
id_value: serde_json::json!(id),
soft_delete_column: "deleted_at".to_string(),
tenant_column: None,
}
}
#[tokio::test]
async fn crud_create_inserts_row() {
let db = setup_db().await;
let plan = create_plan(vec![("status", json!("draft")), ("amount", json!("99.00"))]);
let result = dispatch_write(
&crud_action("create_order"),
&json!({}),
1,
&db,
&allow_dispatcher(),
None,
"mcp",
#[cfg(feature = "confirmation")]
false,
Some(&plan),
)
.await
.expect("crud create must succeed");
assert!(
result.get("id").is_some(),
"returned record must have an id: {result:?}"
);
let count: i64 = db
.query_one(Statement::from_string(
DatabaseBackend::Sqlite,
"SELECT COUNT(*) AS c FROM orders".to_string(),
))
.await
.expect("count query")
.expect("count row")
.try_get::<i64>("", "c")
.expect("count column");
assert_eq!(count, 1, "exactly one row must be in orders after create");
}
#[tokio::test]
async fn crud_update_patches_row() {
let db = setup_db().await;
let create = create_plan(vec![("status", json!("draft")), ("amount", json!("10.00"))]);
dispatch_write(
&crud_action("create_order"),
&json!({}),
1,
&db,
&allow_dispatcher(),
None,
"mcp",
#[cfg(feature = "confirmation")]
false,
Some(&create),
)
.await
.expect("pre-insert must succeed");
let upd = update_plan(1, vec![("amount", json!("55.00"))]);
let result = dispatch_write(
&crud_action("update_order"),
&json!({"id": 1}),
1,
&db,
&allow_dispatcher(),
None,
"mcp",
#[cfg(feature = "confirmation")]
false,
Some(&upd),
)
.await
.expect("crud update must succeed");
assert_eq!(
result["amount"],
json!("55.00"),
"amount must be updated: {result:?}"
);
assert_eq!(
result["status"],
json!("draft"),
"status must remain draft: {result:?}"
);
}
#[tokio::test]
async fn crud_update_soft_deleted_not_found() {
let db = setup_db().await;
let create = create_plan(vec![("status", json!("draft"))]);
dispatch_write(
&crud_action("create_order"),
&json!({}),
1,
&db,
&allow_dispatcher(),
None,
"mcp",
#[cfg(feature = "confirmation")]
false,
Some(&create),
)
.await
.expect("pre-insert must succeed");
db.execute(Statement::from_string(
DatabaseBackend::Sqlite,
"UPDATE orders SET deleted_at = datetime('now') WHERE id = 1".to_string(),
))
.await
.expect("manual soft-delete must succeed");
let upd = update_plan(1, vec![("status", json!("submitted"))]);
let result = dispatch_write(
&crud_action("update_order"),
&json!({"id": 1}),
1,
&db,
&allow_dispatcher(),
None,
"mcp",
#[cfg(feature = "confirmation")]
false,
Some(&upd),
)
.await;
assert!(
matches!(result, Err(WriteError::RecordNotFound)),
"update on soft-deleted row must return RecordNotFound, got: {result:?}"
);
}
#[tokio::test]
async fn crud_delete_sets_deleted_at() {
let db = setup_db().await;
let create = create_plan(vec![("status", json!("draft"))]);
dispatch_write(
&crud_action("create_order"),
&json!({}),
1,
&db,
&allow_dispatcher(),
None,
"mcp",
#[cfg(feature = "confirmation")]
false,
Some(&create),
)
.await
.expect("pre-insert must succeed");
let del = delete_plan(1);
let result = dispatch_write(
&crud_action("delete_order"),
&json!({"id": 1}),
1,
&db,
&allow_dispatcher(),
None,
"mcp",
#[cfg(feature = "confirmation")]
true,
Some(&del),
)
.await
.expect("confirmed delete must succeed");
assert_eq!(
result["deleted"],
json!(true),
"result must carry deleted:true: {result:?}"
);
let count: i64 = db
.query_one(Statement::from_string(
DatabaseBackend::Sqlite,
"SELECT COUNT(*) AS c FROM orders WHERE id = 1".to_string(),
))
.await
.expect("count query")
.expect("count row")
.try_get::<i64>("", "c")
.expect("count column");
assert_eq!(
count, 1,
"row must still exist physically after soft-delete"
);
let deleted_at: Option<String> = db
.query_one(Statement::from_string(
DatabaseBackend::Sqlite,
"SELECT deleted_at FROM orders WHERE id = 1".to_string(),
))
.await
.expect("deleted_at query")
.expect("deleted_at row")
.try_get::<Option<String>>("", "deleted_at")
.expect("deleted_at column");
assert!(
deleted_at.is_some(),
"deleted_at must be set after soft-delete"
);
}
#[tokio::test]
async fn crud_deleted_row_hidden_from_list() {
let db = setup_db().await;
let create = create_plan(vec![("status", json!("draft"))]);
dispatch_write(
&crud_action("create_order"),
&json!({}),
1,
&db,
&allow_dispatcher(),
None,
"mcp",
#[cfg(feature = "confirmation")]
false,
Some(&create),
)
.await
.expect("pre-insert must succeed");
let del = delete_plan(1);
dispatch_write(
&crud_action("delete_order"),
&json!({"id": 1}),
1,
&db,
&allow_dispatcher(),
None,
"mcp",
#[cfg(feature = "confirmation")]
true,
Some(&del),
)
.await
.expect("confirmed delete must succeed");
let hidden_count: i64 = db
.query_one(Statement::from_string(
DatabaseBackend::Sqlite,
"SELECT COUNT(*) AS c FROM orders WHERE id = 1 AND deleted_at IS NULL".to_string(),
))
.await
.expect("hidden count query")
.expect("hidden count row")
.try_get::<i64>("", "c")
.expect("hidden count column");
assert_eq!(
hidden_count, 0,
"soft-deleted row must be hidden by deleted_at IS NULL filter"
);
}
#[cfg(feature = "confirmation")]
#[tokio::test]
async fn crud_delete_requires_confirmation() {
let db = setup_db().await;
let create = create_plan(vec![("status", json!("draft"))]);
dispatch_write(
&crud_action("create_order"),
&json!({}),
1,
&db,
&allow_dispatcher(),
None,
"mcp",
false,
Some(&create),
)
.await
.expect("pre-insert must succeed");
let del = delete_plan(1);
let result = dispatch_write(
&crud_action("delete_order"),
&json!({"id": 1}),
1,
&db,
&allow_dispatcher(),
None,
"mcp",
false, Some(&del),
)
.await;
assert!(
matches!(result, Err(WriteError::ConfirmationRequired(_))),
"bare delete without confirmation must return ConfirmationRequired, got: {result:?}"
);
}
#[tokio::test]
async fn crud_override_replaces_generic() {
let db = setup_db().await;
let hook_fired = Arc::new(std::sync::atomic::AtomicBool::new(false));
let dispatcher = WriteDispatcher {
guard_evaluator: Box::new(|_, _, _, _| Box::pin(async { Ok(true) })),
executor: Box::new(|_, _, _, _| {
Box::pin(async { panic!("executor must NOT run on the CRUD path") })
}),
overrides: std::collections::HashMap::new(),
}
.with_override(
"create_order",
Box::new({
let flag = hook_fired.clone();
move |_action, _inputs, _tenant, _db, base_result: &Value| {
assert!(
base_result.get("id").is_some(),
"override receives the inserted record with id: {base_result:?}"
);
flag.store(true, std::sync::atomic::Ordering::SeqCst);
Box::pin(async { Ok(()) })
}
}),
);
let plan = create_plan(vec![("status", json!("draft"))]);
let result = dispatch_write(
&crud_action("create_order"),
&json!({}),
1,
&db,
&dispatcher,
None,
"mcp",
#[cfg(feature = "confirmation")]
false,
Some(&plan),
)
.await
.expect("create with override must succeed");
assert!(
result.get("id").is_some(),
"result must have id: {result:?}"
);
assert!(
hook_fired.load(std::sync::atomic::Ordering::SeqCst),
"override hook must have fired"
);
let count: i64 = db
.query_one(Statement::from_string(
DatabaseBackend::Sqlite,
"SELECT COUNT(*) AS c FROM orders".to_string(),
))
.await
.expect("count query")
.expect("count row")
.try_get::<i64>("", "c")
.expect("count column");
assert_eq!(
count, 1,
"generic insert must have run alongside the override hook"
);
}
#[tokio::test]
async fn crud_create_idempotent() {
let db = setup_db().await;
let plan = create_plan(vec![("status", json!("draft"))]);
let inputs = json!({ "idempotency_key": "idem-create-1" });
let result1 = dispatch_write(
&crud_action("create_order"),
&inputs,
1,
&db,
&allow_dispatcher(),
None,
"mcp",
#[cfg(feature = "confirmation")]
false,
Some(&plan),
)
.await
.expect("first create must succeed");
let result2 = dispatch_write(
&crud_action("create_order"),
&inputs,
1,
&db,
&allow_dispatcher(),
None,
"mcp",
#[cfg(feature = "confirmation")]
false,
Some(&plan),
)
.await
.expect("second create (idempotent replay) must succeed");
assert_eq!(
result1, result2,
"idempotent replay must return the same result"
);
let count: i64 = db
.query_one(Statement::from_string(
DatabaseBackend::Sqlite,
"SELECT COUNT(*) AS c FROM orders".to_string(),
))
.await
.expect("count query")
.expect("count row")
.try_get::<i64>("", "c")
.expect("count column");
assert_eq!(count, 1, "idempotent replay must not insert a second row");
}
async fn setup_tenant_db() -> sea_orm::DatabaseConnection {
let db = Database::connect("sqlite::memory:")
.await
.expect("in-memory SQLite connect failed");
db.execute(Statement::from_string(
DatabaseBackend::Sqlite,
"CREATE TABLE IF NOT EXISTS mcp_idempotency_keys (
id INTEGER PRIMARY KEY AUTOINCREMENT,
tenant_id INTEGER NOT NULL,
idempotency_key TEXT NOT NULL,
result TEXT NOT NULL,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
UNIQUE (tenant_id, idempotency_key)
)"
.to_string(),
))
.await
.expect("create mcp_idempotency_keys table");
db.execute(Statement::from_string(
DatabaseBackend::Sqlite,
"CREATE TABLE IF NOT EXISTS audit_log (
id TEXT PRIMARY KEY NOT NULL,
tenant_id TEXT,
actor_kind TEXT NOT NULL,
actor_id TEXT,
action TEXT NOT NULL,
target_kind TEXT,
target_id TEXT,
before TEXT,
after TEXT,
reason TEXT,
correlation_id TEXT,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
)"
.to_string(),
))
.await
.expect("create audit_log table");
db.execute(Statement::from_string(
DatabaseBackend::Sqlite,
"CREATE TABLE IF NOT EXISTS tenanted_orders (
id INTEGER PRIMARY KEY AUTOINCREMENT,
tenant_id INTEGER NOT NULL,
status TEXT NOT NULL DEFAULT 'draft',
amount TEXT,
created_at TEXT DEFAULT (datetime('now')),
deleted_at TEXT
)"
.to_string(),
))
.await
.expect("create tenanted_orders table");
db
}
fn tenanted_create_plan(columns: Vec<(&str, serde_json::Value)>) -> CrudPlan {
use ferro_projections::TenantColumn;
CrudPlan::Create {
table: "tenanted_orders".to_string(),
columns: columns
.into_iter()
.map(|(k, v)| (k.to_string(), v))
.collect(),
tenant_column: Some(TenantColumn {
column: "tenant_id".to_string(),
}),
}
}
fn tenanted_update_plan(id: i64, patch: Vec<(&str, serde_json::Value)>) -> CrudPlan {
use ferro_projections::TenantColumn;
CrudPlan::Update {
table: "tenanted_orders".to_string(),
id_column: "id".to_string(),
id_value: serde_json::json!(id),
patch: patch.into_iter().map(|(k, v)| (k.to_string(), v)).collect(),
soft_delete_column: "deleted_at".to_string(),
tenant_column: Some(TenantColumn {
column: "tenant_id".to_string(),
}),
}
}
fn tenanted_delete_plan(id: i64) -> CrudPlan {
use ferro_projections::TenantColumn;
CrudPlan::Delete {
table: "tenanted_orders".to_string(),
id_column: "id".to_string(),
id_value: serde_json::json!(id),
soft_delete_column: "deleted_at".to_string(),
tenant_column: Some(TenantColumn {
column: "tenant_id".to_string(),
}),
}
}
async fn seed_tenanted_row(
db: &sea_orm::DatabaseConnection,
tenant_id: i64,
status: &str,
) -> i64 {
db.execute(Statement::from_sql_and_values(
DatabaseBackend::Sqlite,
"INSERT INTO tenanted_orders (tenant_id, status) VALUES (?, ?)",
vec![
sea_orm::Value::BigInt(Some(tenant_id)),
sea_orm::Value::String(Some(Box::new(status.to_string()))),
],
))
.await
.expect("seed tenanted row");
let id_row = db
.query_one(Statement::from_string(
DatabaseBackend::Sqlite,
"SELECT last_insert_rowid() AS id".to_string(),
))
.await
.expect("last_insert_rowid query")
.expect("last_insert_rowid row");
id_row.try_get::<i64>("", "id").expect("last_insert_rowid")
}
#[tokio::test]
async fn crud_create_injects_tenant() {
let db = setup_tenant_db().await;
let plan = tenanted_create_plan(vec![("status", json!("draft"))]);
execute_crud_plan(&plan, 7, &db)
.await
.expect("tenant-aware create must succeed");
let tid: i64 = db
.query_one(Statement::from_string(
DatabaseBackend::Sqlite,
"SELECT tenant_id FROM tenanted_orders WHERE id = 1".to_string(),
))
.await
.expect("select query")
.expect("row must exist")
.try_get::<i64>("", "tenant_id")
.expect("tenant_id column");
assert_eq!(
tid, 7,
"CREATE must inject tenant_id=7 into the row, got: {tid}"
);
}
#[tokio::test]
async fn crud_update_tenant_predicate() {
let db = setup_tenant_db().await;
let row_id = seed_tenanted_row(&db, 7, "draft").await;
let plan = tenanted_update_plan(row_id, vec![("status", json!("approved"))]);
let result = execute_crud_plan(&plan, 7, &db)
.await
.expect("same-tenant update must succeed");
assert_eq!(
result["status"],
json!("approved"),
"status must be updated for same-tenant row: {result:?}"
);
}
#[tokio::test]
async fn crud_delete_tenant_predicate() {
let db = setup_tenant_db().await;
let row_id = seed_tenanted_row(&db, 7, "draft").await;
let plan = tenanted_delete_plan(row_id);
let result = execute_crud_plan(&plan, 7, &db)
.await
.expect("same-tenant delete must succeed");
assert_eq!(
result["deleted"],
json!(true),
"same-tenant delete must return deleted:true: {result:?}"
);
let deleted_at: Option<String> = db
.query_one(Statement::from_string(
DatabaseBackend::Sqlite,
format!("SELECT deleted_at FROM tenanted_orders WHERE id = {row_id}"),
))
.await
.expect("deleted_at query")
.expect("row")
.try_get::<Option<String>>("", "deleted_at")
.expect("deleted_at column");
assert!(
deleted_at.is_some(),
"deleted_at must be set after same-tenant delete"
);
}
#[tokio::test]
async fn crud_cross_tenant_update_not_found() {
let db = setup_tenant_db().await;
let row_id = seed_tenanted_row(&db, 2, "original").await;
let plan = tenanted_update_plan(row_id, vec![("status", json!("tampered"))]);
let result = execute_crud_plan(&plan, 7, &db).await;
assert!(
matches!(result, Err(WriteError::RecordNotFound)),
"cross-tenant update must return RecordNotFound, got: {result:?}"
);
let status: String = db
.query_one(Statement::from_string(
DatabaseBackend::Sqlite,
format!("SELECT status FROM tenanted_orders WHERE id = {row_id}"),
))
.await
.expect("status query")
.expect("row must still exist")
.try_get::<String>("", "status")
.expect("status column");
assert_eq!(
status, "original",
"cross-tenant update must leave the row completely unchanged; got status: {status}"
);
}
#[tokio::test]
async fn crud_cross_tenant_delete_not_found() {
let db = setup_tenant_db().await;
let row_id = seed_tenanted_row(&db, 2, "draft").await;
let plan = tenanted_delete_plan(row_id);
let result = execute_crud_plan(&plan, 7, &db).await;
assert!(
matches!(result, Err(WriteError::RecordNotFound)),
"cross-tenant delete must return RecordNotFound, got: {result:?}"
);
let deleted_at: Option<String> = db
.query_one(Statement::from_string(
DatabaseBackend::Sqlite,
format!("SELECT deleted_at FROM tenanted_orders WHERE id = {row_id}"),
))
.await
.expect("deleted_at query")
.expect("row must still exist")
.try_get::<Option<String>>("", "deleted_at")
.expect("deleted_at column");
assert!(
deleted_at.is_none(),
"cross-tenant delete must leave deleted_at NULL; row is untouched"
);
}
}