use crate::core::error;
use crate::core::pool;
use crate::core::time;
use crate::plugins::policy;
use rusqlite::Connection;
use serde::{Deserialize, Serialize};
use serde_json::Value as JsonValue;
use std::collections::HashMap;
use std::env;
use std::path::{Path, PathBuf};
use std::sync::mpsc::Sender;
use std::sync::{Mutex, OnceLock};
use std::time::{Duration, Instant};
use ulid::Ulid;
#[allow(dead_code)]
pub struct DbBroker {
audit_log_path: PathBuf,
write_queue: Option<Sender<WriteRequest>>,
}
#[derive(Clone)]
struct CacheEntry {
value: JsonValue,
expires_at: Instant,
}
#[allow(dead_code)]
struct WriteRequest {
db_path: PathBuf,
sql: String,
params: Vec<Box<dyn rusqlite::ToSql + Send>>,
result_tx: std::sync::mpsc::Sender<Result<u64, error::DecapodError>>,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct BrokerEvent {
#[serde(default = "default_broker_schema_version")]
pub schema_version: String,
#[serde(default)]
pub request_id: String,
pub ts: String,
pub event_id: String,
pub actor: String,
#[serde(default)]
pub actor_id: String,
#[serde(default)]
pub session_id: Option<String>,
#[serde(default)]
pub correlation_id: Option<String>,
#[serde(default)]
pub causation_id: Option<String>,
#[serde(default)]
pub idempotency_key: Option<String>,
pub intent_ref: Option<String>,
pub op: String,
pub db_id: String,
pub status: String,
}
impl DbBroker {
pub fn new(root: &Path) -> Self {
Self {
audit_log_path: root.join("broker.events.jsonl"),
write_queue: None, }
}
pub fn execute_write_sync(
&self,
db_path: &Path,
sql: &str,
params: &[(&str, i64)],
) -> Result<u64, error::DecapodError> {
let audit_path = self.audit_log_path.clone();
let sql = sql.to_string();
let params: Vec<i64> = params.iter().map(|(_, v)| *v).collect();
let db_path_owned = db_path.to_path_buf();
pool::global_pool().with_write(db_path, |conn| {
let mut stmt = conn.prepare(&sql)?;
let param_vec: Vec<Box<dyn rusqlite::ToSql>> = params
.iter()
.map(|v| Box::new(*v) as Box<dyn rusqlite::ToSql>)
.collect();
let params_refs: Vec<&dyn rusqlite::ToSql> =
param_vec.iter().map(|p| p.as_ref()).collect();
stmt.execute(params_refs.as_slice())?;
let rowid = conn.last_insert_rowid();
log_write_event(&audit_path, "queued_write", &db_path_owned)?;
Ok(rowid as u64)
})
}
pub fn with_conn<F, R>(
&self,
db_path: &Path,
actor: &str,
intent_ref: Option<&str>,
op_name: &str,
f: F,
) -> Result<R, error::DecapodError>
where
F: FnOnce(&Connection) -> Result<R, error::DecapodError>,
{
let is_read = policy::is_read_only_operation(op_name) && !op_name.ends_with(".init"); let effective_intent = if let Some(i) = intent_ref {
Some(i.to_string())
} else if !is_read {
Some(format!("intent:auto:{}:{}", op_name, Ulid::new()))
} else {
None
};
if !is_read {
let store_root = self
.audit_log_path
.parent()
.ok_or_else(|| error::DecapodError::PathError("invalid broker root".to_string()))?;
policy::enforce_broker_mutation_policy(store_root, actor, op_name)?;
}
let db_id = db_path
.file_name()
.unwrap_or_default()
.to_string_lossy()
.to_string();
if is_read {
let result = pool::global_pool().with_read(db_path, f);
let status = if result.is_ok() { "success" } else { "error" };
self.log_event(actor, effective_intent.as_deref(), op_name, &db_id, status)?;
result
} else {
self.log_event(
actor,
effective_intent.as_deref(),
op_name,
&db_id,
"pending",
)?;
let result = pool::global_pool().with_write(db_path, f);
let status = if result.is_ok() { "success" } else { "error" };
self.log_event(actor, effective_intent.as_deref(), op_name, &db_id, status)?;
result
}
}
fn log_event(
&self,
actor: &str,
intent_ref: Option<&str>,
op: &str,
db_id: &str,
status: &str,
) -> Result<(), error::DecapodError> {
use std::fs::OpenOptions;
use std::io::Write;
let ts = time::now_epoch_z();
let request_id = time::new_event_id();
let event_id = time::new_event_id();
let session_id = env::var("DECAPOD_SESSION_ID").ok();
let correlation_id = env::var("DECAPOD_CORRELATION_ID")
.ok()
.or_else(|| intent_ref.map(|s| s.to_string()));
let causation_id = env::var("DECAPOD_CAUSATION_ID").ok();
let idempotency_key = env::var("DECAPOD_IDEMPOTENCY_KEY").ok();
let ev = BrokerEvent {
schema_version: default_broker_schema_version(),
request_id,
ts,
event_id,
actor: actor.to_string(),
actor_id: actor.to_string(),
session_id,
correlation_id,
causation_id,
idempotency_key,
intent_ref: intent_ref.map(|s| s.to_string()),
op: op.to_string(),
db_id: db_id.to_string(),
status: status.to_string(),
};
let audit_lock = get_audit_lock();
let _audit_guard = audit_lock
.lock()
.map_err(|_| error::DecapodError::ValidationError("Audit lock poisoned".into()))?;
let mut f = OpenOptions::new()
.create(true)
.append(true)
.open(&self.audit_log_path)
.map_err(error::DecapodError::IoError)?;
let mut line = serde_json::to_string(&ev).unwrap();
line.push('\n');
f.write_all(line.as_bytes())
.map_err(error::DecapodError::IoError)?;
Ok(())
}
fn cache_compound_key(db_path: &Path, scope: &str, key: &str) -> String {
format!("{}::{}::{}", db_path.to_string_lossy(), scope, key)
}
pub fn cache_get_json(db_path: &Path, scope: &str, key: &str) -> Option<JsonValue> {
let compound = Self::cache_compound_key(db_path, scope, key);
let cache = broker_read_cache();
let mut map = cache.lock().ok()?;
if let Some(entry) = map.get(&compound) {
if entry.expires_at > Instant::now() {
return Some(entry.value.clone());
}
}
map.remove(&compound);
None
}
pub fn cache_put_json(
db_path: &Path,
scope: &str,
key: &str,
value: JsonValue,
ttl_secs: u64,
) -> Result<(), error::DecapodError> {
let compound = Self::cache_compound_key(db_path, scope, key);
let expires_at = Instant::now()
.checked_add(Duration::from_secs(ttl_secs.max(1)))
.unwrap_or_else(Instant::now);
let mut map = broker_read_cache().lock().map_err(|_| {
error::DecapodError::ValidationError("broker read cache lock poisoned".to_string())
})?;
map.insert(compound, CacheEntry { value, expires_at });
Ok(())
}
pub fn cache_invalidate_scope(db_path: &Path, scope: &str) -> Result<(), error::DecapodError> {
let prefix = format!("{}::{}::", db_path.to_string_lossy(), scope);
let mut map = broker_read_cache().lock().map_err(|_| {
error::DecapodError::ValidationError("broker read cache lock poisoned".to_string())
})?;
map.retain(|k, _| !k.starts_with(&prefix));
Ok(())
}
pub fn cache_invalidate_key(
db_path: &Path,
scope: &str,
key: &str,
) -> Result<(), error::DecapodError> {
let compound = Self::cache_compound_key(db_path, scope, key);
let mut map = broker_read_cache().lock().map_err(|_| {
error::DecapodError::ValidationError("broker read cache lock poisoned".to_string())
})?;
map.remove(&compound);
Ok(())
}
pub fn verify_replay(&self) -> Result<ReplayReport, error::DecapodError> {
use std::io::BufRead;
if !self.audit_log_path.exists() {
return Ok(ReplayReport {
ts: time::now_epoch_z(),
divergences: vec![],
total_events: 0,
});
}
let f = std::fs::File::open(&self.audit_log_path).map_err(error::DecapodError::IoError)?;
let reader = std::io::BufReader::new(f);
let mut pending_map = HashMap::new();
let mut total_events = 0usize;
for line in reader.lines() {
let line = line.map_err(error::DecapodError::IoError)?;
let ev: BrokerEvent = serde_json::from_str(&line).map_err(|e| {
error::DecapodError::ValidationError(format!("Invalid audit log entry: {}", e))
})?;
total_events += 1;
match ev.status.as_str() {
"pending" => {
pending_map.insert(ev.event_id.clone(), ev);
}
"success" | "error" => {
pending_map.retain(|_, v| {
let intent_match = match (&v.intent_ref, &ev.intent_ref) {
(Some(a), Some(b)) => a == b,
(None, None) => true,
_ => false,
};
!(intent_match && v.op == ev.op && v.db_id == ev.db_id)
});
}
_ => {}
}
}
let divergences = pending_map
.into_values()
.map(|ev| Divergence {
event_id: ev.event_id,
op: ev.op,
db_id: ev.db_id,
ts: ev.ts,
intent_ref: ev.intent_ref,
reason: "Pending event without terminal status (potential crash)".to_string(),
})
.collect();
Ok(ReplayReport {
ts: time::now_epoch_z(),
divergences,
total_events,
})
}
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct ReplayReport {
pub ts: String,
pub divergences: Vec<Divergence>,
pub total_events: usize,
}
fn log_write_event(audit_path: &Path, op: &str, db_path: &Path) -> Result<(), error::DecapodError> {
use std::fs::OpenOptions;
use std::io::Write;
let event = serde_json::json!({
"op": op,
"db": db_path.file_name().map(|s| s.to_string_lossy().to_string()),
"ts": time::now_epoch_z(),
});
if let Ok(mut file) = OpenOptions::new()
.create(true)
.append(true)
.open(audit_path)
{
let _ = writeln!(file, "{}", event);
}
Ok(())
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Divergence {
pub event_id: String,
pub op: String,
pub db_id: String,
pub ts: String,
pub intent_ref: Option<String>,
pub reason: String,
}
fn get_audit_lock() -> &'static Mutex<()> {
static AUDIT_LOCK: OnceLock<Mutex<()>> = OnceLock::new();
AUDIT_LOCK.get_or_init(|| Mutex::new(()))
}
fn broker_read_cache() -> &'static Mutex<HashMap<String, CacheEntry>> {
static READ_CACHE: OnceLock<Mutex<HashMap<String, CacheEntry>>> = OnceLock::new();
READ_CACHE.get_or_init(|| Mutex::new(HashMap::new()))
}
pub fn schema() -> serde_json::Value {
serde_json::json!({
"name": "broker",
"version": "0.1.0",
"description": "State mutation broker (The Thin Waist)",
"commands": [
{ "name": "audit", "description": "Show the mutation audit log" }
],
"envelope": {
"schema_version": "1.0.0",
"fields": [
"schema_version",
"request_id",
"event_id",
"ts",
"actor",
"actor_id",
"session_id",
"correlation_id",
"causation_id",
"idempotency_key",
"intent_ref",
"op",
"db_id",
"status"
]
},
"storage": ["broker.events.jsonl"]
})
}
fn default_broker_schema_version() -> String {
"1.0.0".to_string()
}