use car_eventlog::{EventKind, EventLog};
use car_verify::concurrency::{
analyze as analyze_concurrency, gate_concurrency, AgentOp, ConcurrencyGate,
ConcurrencyGatePolicy, ConcurrencyReport, Disposition, Remediation,
};
use serde_json::Value;
use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use tokio::sync::Mutex as TokioMutex;
/// Bundles a concurrency gate policy with a shared logical clock.
///
/// Cloning yields a handle that shares the same clock, because the counter is
/// held behind an `Arc`; the policy itself is cloned.
#[derive(Clone)]
pub struct ConcurrencyControl {
/// Policy passed to `gate_concurrency` whenever `guard` evaluates a batch.
policy: ConcurrencyGatePolicy,
/// Counter advanced by `tick`; starts at zero in `new`.
clock: Arc<AtomicU64>,
}
impl ConcurrencyControl {
    /// Creates a controller that gates batches with `policy`.
    ///
    /// The logical clock starts at zero.
    pub fn new(policy: ConcurrencyGatePolicy) -> Self {
        let clock = Arc::new(AtomicU64::new(0));
        Self { policy, clock }
    }

    /// Creates a controller using `ConcurrencyGatePolicy::default()`.
    pub fn with_default_policy() -> Self {
        let policy = ConcurrencyGatePolicy::default();
        Self::new(policy)
    }

    /// Returns the policy this controller gates with.
    pub fn policy(&self) -> &ConcurrencyGatePolicy {
        &self.policy
    }

    /// Advances the shared clock by one and returns the value it held
    /// *before* the increment, so the first call returns `0`.
    pub fn tick(&self) -> u64 {
        self.clock.fetch_add(1, Ordering::SeqCst)
    }

    /// Evaluates `ops` against the policy and returns the resulting guard.
    ///
    /// The batch is analyzed, the report is gated against the policy, and the
    /// decision is appended to `log` as a single audit event before the guard
    /// is handed back to the caller.
    pub async fn guard(
        &self,
        ops: &[AgentOp],
        log: &Arc<TokioMutex<EventLog>>,
    ) -> ConcurrencyGuard {
        let analysis = analyze_concurrency(ops);
        let verdict = gate_concurrency(&analysis, &self.policy);
        let decision = ConcurrencyGuard::from_gate(analysis, verdict);
        // Record the decision before returning it to the caller.
        decision.audit(log).await;
        decision
    }
}
/// Outcome of gating one batch of ops: the analysis, the gate result, and the
/// commit decision derived from them.
#[derive(Debug, Clone)]
pub struct ConcurrencyGuard {
/// Analysis report the gate was evaluated on.
pub report: ConcurrencyReport,
/// Gate result, including the remediations and their dispositions.
pub gate: ConcurrencyGate,
/// When built by `from_gate`: true if any remediation has the `Abort`
/// disposition. While set, `may_commit` refuses every op.
pub abort: bool,
/// When built by `from_gate`: the primary op id of each remediation whose
/// disposition is `RequireApproval`. `may_commit` refuses these ids.
pub rejected_ops: HashSet<String>,
}
impl ConcurrencyGuard {
    /// Derives the commit decision from an analysis report and its gate result.
    ///
    /// Each remediation in `gate` is inspected by disposition:
    /// * `Abort` marks the whole batch as aborted;
    /// * `RequireApproval` holds back the remediation's primary op (if it has one);
    /// * `AutoRemediate` blocks nothing.
    fn from_gate(report: ConcurrencyReport, gate: ConcurrencyGate) -> Self {
        let mut abort = false;
        let mut rejected_ops: HashSet<String> = HashSet::new();
        for r in &gate.remediations {
            match r.disposition {
                Disposition::Abort => abort = true,
                Disposition::RequireApproval => {
                    if let Some(op) = remediation_primary_op(&r.remediation) {
                        rejected_ops.insert(op);
                    }
                }
                Disposition::AutoRemediate => {}
            }
        }
        Self {
            report,
            gate,
            abort,
            rejected_ops,
        }
    }

    /// Returns the gate's own `safe` flag.
    pub fn is_clean(&self) -> bool {
        self.gate.safe
    }

    /// Returns whether `op_id` is allowed to commit: the batch must not be
    /// aborted and the op must not be among the rejected ones.
    pub fn may_commit(&self, op_id: &str) -> bool {
        !self.abort && !self.rejected_ops.contains(op_id)
    }

    /// Explains why `op_id` may not commit, or returns `None` if it may.
    ///
    /// A batch-level abort takes precedence over a per-op rejection.
    pub fn rejection_reason(&self, op_id: &str) -> Option<String> {
        if self.abort {
            return Some(format!(
                "concurrency gate aborted the batch at consistency level {:?}: {}",
                self.report.level,
                self.anomaly_summary()
            ));
        }
        if self.rejected_ops.contains(op_id) {
            // NOTE(review): the wording names lost-update/stale generation, but
            // `from_gate` holds back the primary op of *any* remediation gated
            // as `RequireApproval` — confirm the message is intended to be
            // this specific.
            return Some(format!(
                "concurrency gate rejected this commit (lost-update/stale generation) — {}",
                self.anomaly_summary()
            ));
        }
        None
    }

    /// Joins the explanations of all reported anomalies with `"; "`, or
    /// returns `"no anomalies"` when the report has none.
    pub fn anomaly_summary(&self) -> String {
        if self.report.anomalies.is_empty() {
            return "no anomalies".to_string();
        }
        self.report
            .anomalies
            .iter()
            .map(|a| a.explanation.clone())
            .collect::<Vec<_>>()
            .join("; ")
    }

    /// Label written to the audit event's `decision` field.
    ///
    /// A batch that is neither aborted nor partially held back is labelled
    /// `"allow"` whether or not remediations exist; the remediations are
    /// recorded separately in the audit payload.
    fn decision_label(&self) -> &'static str {
        if self.abort {
            "reject"
        } else if !self.rejected_ops.is_empty() {
            "needs_approval"
        } else {
            "allow"
        }
    }

    /// Appends one `AdmissionGateDecision` event describing this decision to `log`.
    ///
    /// `gate`, `phase`, `decision`, `level` and `abort` are always present.
    /// `reason`/`anomalies`, `blocked` and `remediations` are added only when
    /// the corresponding collection is non-empty. A value that fails to
    /// serialize is recorded as JSON `null` rather than failing the audit.
    async fn audit(&self, log: &Arc<TokioMutex<EventLog>>) {
        let mut data: HashMap<String, Value> = HashMap::new();
        data.insert("gate".to_string(), Value::from("concurrency"));
        data.insert("phase".to_string(), Value::from("commit_barrier"));
        data.insert("decision".to_string(), Value::from(self.decision_label()));
        data.insert(
            "level".to_string(),
            serde_json::to_value(self.report.level).unwrap_or(Value::Null),
        );
        data.insert("abort".to_string(), Value::from(self.abort));

        if !self.report.anomalies.is_empty() {
            data.insert("reason".to_string(), Value::from(self.anomaly_summary()));
            data.insert(
                "anomalies".to_string(),
                serde_json::to_value(&self.report.anomalies).unwrap_or(Value::Null),
            );
        }

        if !self.rejected_ops.is_empty() {
            // Sort so the payload does not depend on hash-set iteration order.
            let mut blocked: Vec<String> = self.rejected_ops.iter().cloned().collect();
            blocked.sort();
            data.insert(
                "blocked".to_string(),
                serde_json::to_value(blocked).unwrap_or(Value::Null),
            );
        }

        if !self.gate.remediations.is_empty() {
            data.insert(
                "remediations".to_string(),
                serde_json::to_value(&self.gate.remediations).unwrap_or(Value::Null),
            );
        }

        // The payload is fully built before the lock is taken, so the log is
        // only held for the append itself.
        let mut log = log.lock().await;
        log.append(EventKind::AdmissionGateDecision, None, None, data);
    }
}
/// Picks the single op id a remediation is primarily about.
///
/// * `RereadAndRegenerate` / `PinToolRegistry`: their `op`.
/// * `EnforceCausalOrder`: the `dependent` op.
/// * `SerializeWriters`: the first entry of `ops`, or `None` when the list is empty.
fn remediation_primary_op(r: &Remediation) -> Option<String> {
    let primary = match r {
        Remediation::RereadAndRegenerate { op, .. }
        | Remediation::PinToolRegistry { op, .. } => op,
        Remediation::EnforceCausalOrder { dependent, .. } => dependent,
        // An empty writer list has no primary op; bail out with `None`.
        Remediation::SerializeWriters { ops, .. } => ops.first()?,
    };
    Some(primary.clone())
}
#[cfg(test)]
mod tests {
    use super::*;
    use car_eventlog::EventLog;

    /// Builds an op with the given id and read/commit times; every other
    /// field takes its default value.
    fn op(id: &str, read_at: u64, commit_at: u64) -> AgentOp {
        AgentOp {
            id: id.to_string(),
            read_at,
            commit_at,
            ..Default::default()
        }
    }

    /// Fresh, empty event log behind the shared-mutex handle `guard` expects.
    fn log() -> Arc<TokioMutex<EventLog>> {
        Arc::new(TokioMutex::new(EventLog::new()))
    }

    /// Two ops writing different keys at non-overlapping times are all allowed.
    #[tokio::test]
    async fn clean_schedule_permits_all_commits() {
        let control = ConcurrencyControl::with_default_policy();
        let first = AgentOp {
            write_set: vec!["x".into()],
            ..op("a", 0, 1)
        };
        let second = AgentOp {
            write_set: vec!["y".into()],
            ..op("b", 2, 3)
        };

        let outcome = control.guard(&[first, second], &log()).await;

        assert!(outcome.is_clean());
        assert!(!outcome.abort);
        assert!(outcome.may_commit("a") && outcome.may_commit("b"));
    }

    /// An op committing before the op it depends on aborts the whole batch.
    #[tokio::test]
    async fn causal_cascade_aborts_whole_batch() {
        let control = ConcurrencyControl::with_default_policy();
        let upstream = AgentOp {
            write_set: vec!["k".into()],
            ..op("c", 0, 5)
        };
        let downstream = AgentOp {
            depends_on: vec!["c".into()],
            ..op("d", 0, 1)
        };

        let outcome = control.guard(&[upstream, downstream], &log()).await;

        assert!(outcome.abort);
        assert!(!outcome.may_commit("c"));
        assert!(!outcome.may_commit("d"));
        assert!(outcome.rejection_reason("c").is_some());
    }

    /// A read-modify-write overtaken by another writer is held back alone.
    #[tokio::test]
    async fn stale_generation_rejects_offending_op_only() {
        let control = ConcurrencyControl::with_default_policy();
        let stale = AgentOp {
            read_set: vec!["k".into()],
            write_set: vec!["k".into()],
            ..op("a", 0, 2)
        };
        let other = AgentOp {
            write_set: vec!["k".into()],
            ..op("b", 1, 1)
        };

        let outcome = control.guard(&[stale, other], &log()).await;

        assert!(
            !outcome.abort,
            "stale generation must not abort the whole batch"
        );
        assert!(!outcome.may_commit("a"), "the stale writer is held back");
        assert!(outcome.may_commit("b"), "the other writer still commits");
        assert!(outcome.rejection_reason("a").is_some());
    }

    /// Overlapping blind writers are flagged but nothing is blocked.
    #[tokio::test]
    async fn reorder_auto_remediates_all_commit() {
        let control = ConcurrencyControl::with_default_policy();
        let first = AgentOp {
            write_set: vec!["k".into()],
            ..op("a", 0, 3)
        };
        let second = AgentOp {
            write_set: vec!["k".into()],
            ..op("b", 1, 4)
        };

        let outcome = control.guard(&[first, second], &log()).await;

        assert!(!outcome.abort);
        assert!(outcome.rejected_ops.is_empty());
        assert!(outcome.may_commit("a") && outcome.may_commit("b"));
        assert!(
            !outcome.is_clean(),
            "an anomaly was still detected and audited"
        );
    }

    /// Every `guard` call appends exactly one admission-gate event.
    #[tokio::test]
    async fn guard_emits_admission_event() {
        let control = ConcurrencyControl::with_default_policy();
        let shared_log = log();
        let first = AgentOp {
            write_set: vec!["k".into()],
            ..op("a", 0, 3)
        };
        let second = AgentOp {
            write_set: vec!["k".into()],
            ..op("b", 1, 4)
        };

        control.guard(&[first, second], &shared_log).await;

        let locked = shared_log.lock().await;
        let events = locked.events();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].kind, EventKind::AdmissionGateDecision);
        assert_eq!(
            events[0].data.get("gate").and_then(|v| v.as_str()),
            Some("concurrency")
        );
    }
}