use std::collections::HashMap;
use std::sync::Arc;
use car_connectors::ConnectorStatus;
use car_eventlog::{Event, EventKind, EventLog, RetentionPolicy};
use car_memgine::maintenance::{decide_maintenance, MaintenanceDecision, MaintenanceInput};
use car_memgine::self_evolution::{
run_evolution_cycle, ComponentState, EvolutionCycleReport, EvolutionOutcome, EvolutionPolicy,
EvolutionSignals, EvolvableComponent,
};
use car_memgine::{MemgineEngine, TraceEvent};
use serde_json::Value;
use crate::session::ServerState;
/// Derives the harness component's evolution signals from a slice of log events.
///
/// Returns `None` for an empty slice, so "no telemetry" stays distinguishable
/// from "telemetry with zero pressure". Otherwise pressure is the summed
/// `evidence_count` of every diagnosed intervention divided by the number of
/// events, capped at `1.0`.
pub fn harness_component_from_events(events: &[Event]) -> Option<ComponentState> {
    let total = events.len();
    if total == 0 {
        return None;
    }

    // NOTE(review): the `2` is presumably a recurrence threshold for the
    // diagnosis — confirm against `harness_adapt::diagnose`.
    let diagnosis = car_eventlog::harness_adapt::diagnose(events, 2);
    let implicated: usize = diagnosis
        .interventions
        .iter()
        .map(|intervention| intervention.evidence_count)
        .sum();

    let share = implicated as f64 / total as f64;
    let signals = EvolutionSignals {
        pressure: share.min(1.0),
        evidence: total as u64,
        min_evidence: 20,
        cost: 2.0,
    };
    Some(ComponentState {
        component: EvolvableComponent::Harness,
        signals,
    })
}
/// Derives the harness component's evolution signals from aggregated metrics.
///
/// Attempts are `actions_succeeded + failed_attempts` from the trajectory
/// efficiency section. Returns `None` when there were no attempts; otherwise
/// pressure is the failed share of attempts, clamped to `[0, 1]`.
pub fn harness_component_from_metrics(
    m: &car_eventlog::harness_metrics::HarnessMetrics,
) -> Option<ComponentState> {
    let efficiency = &m.trajectory_efficiency;
    let failed = efficiency.failed_attempts;
    let attempts = efficiency.actions_succeeded + failed;
    if attempts == 0 {
        return None;
    }

    let failure_share = failed as f64 / attempts as f64;
    let signals = EvolutionSignals {
        pressure: failure_share.clamp(0.0, 1.0),
        evidence: attempts as u64,
        min_evidence: 20,
        cost: 2.0,
    };
    Some(ComponentState {
        component: EvolvableComponent::Harness,
        signals,
    })
}
/// Derives the tools component's evolution signals from connector statuses.
///
/// Returns `None` when the list is empty. Otherwise pressure is the share of
/// connectors whose `connected` flag is false, and evidence is the number of
/// connectors.
pub fn tools_component_from_connectors(connectors: &[ConnectorStatus]) -> Option<ComponentState> {
    let total = connectors.len();
    if total == 0 {
        return None;
    }

    let disconnected = connectors
        .iter()
        .filter(|status| !status.connected)
        .count();
    let down_share = disconnected as f64 / total as f64;

    let signals = EvolutionSignals {
        pressure: down_share.clamp(0.0, 1.0),
        evidence: total as u64,
        min_evidence: 1,
        cost: 1.0,
    };
    Some(ComponentState {
        component: EvolvableComponent::Tools,
        signals,
    })
}
/// Converts the failure-type events in `events` into memgine trace events.
///
/// Only `ActionFailed`, `ActionRejected`, `PolicyViolation` and
/// `ReplanExhausted` events are kept, in their original order. Each trace
/// carries the serialized kind name (empty if the kind does not serialize to a
/// string), the action id, the `"tool"` data entry when it is a string, a copy
/// of the event data as a JSON object, and a reward of `0.0`.
pub fn failed_trace_events(events: &[Event]) -> Vec<TraceEvent> {
    let mut traces = Vec::new();

    for event in events {
        let is_failure = matches!(
            event.kind,
            EventKind::ActionFailed
                | EventKind::ActionRejected
                | EventKind::PolicyViolation
                | EventKind::ReplanExhausted
        );
        if !is_failure {
            continue;
        }

        // The kind is rendered through serde; anything that is not a plain
        // JSON string collapses to the empty string.
        let kind = match serde_json::to_value(&event.kind) {
            Ok(Value::String(name)) => name,
            _ => String::new(),
        };
        let tool = event
            .data
            .get("tool")
            .and_then(Value::as_str)
            .map(String::from);

        traces.push(TraceEvent {
            kind,
            action_id: event.action_id.clone(),
            tool,
            data: Value::Object(event.data.clone().into_iter().collect()),
            duration_ms: None,
            state_before: None,
            state_after: None,
            reward: Some(0.0),
        });
    }

    traces
}
/// Builds the maintenance-decision input from live memory statistics.
///
/// Dirty regions are `outstanding_outdated + facts_superseded`, total regions
/// are `total_facts`, and both per-region costs are fixed at `1.0`. The global
/// structural gain is the superseded share of all facts clamped to `[0, 1]`,
/// or `0.0` when there are no facts.
pub fn maintenance_input_from_stats(stats: &car_memgine::memsys::MemoryStats) -> MaintenanceInput {
    let total = stats.total_facts;
    let superseded = stats.facts_superseded;

    let global_structural_gain = if total > 0 {
        let superseded_share = superseded as f64 / total as f64;
        superseded_share.clamp(0.0, 1.0)
    } else {
        0.0
    };

    MaintenanceInput {
        dirty_regions: stats.outstanding_outdated + superseded,
        total_regions: total,
        localized_cost_per_region: 1.0,
        global_cost_per_region: 1.0,
        global_structural_gain,
        gain_value: total as f64,
    }
}
/// Runs (or, with `dry_run`, only describes) memory evolution via consolidation.
///
/// The engine lock is taken first and a maintenance decision is computed from
/// the current memory stats. With `dry_run` a no-op outcome describing that
/// decision is returned and nothing is consolidated. Otherwise
/// `consolidate()` runs and its report, together with the decision, is
/// serialized to JSON as the applied outcome's summary.
///
/// # Errors
/// Returns the serializer's message if the summary cannot be rendered as JSON.
pub async fn run_memory_evolution(
    engine: &Arc<tokio::sync::Mutex<MemgineEngine>>,
    dry_run: bool,
) -> Result<EvolutionOutcome, String> {
    let mut eng = engine.lock().await;

    let input = maintenance_input_from_stats(&eng.memory_stats());
    let decision: MaintenanceDecision = decide_maintenance(&input);

    if dry_run {
        let description = format!(
            "dry_run: would consolidate (maintenance: {:?} — {})",
            decision.strategy, decision.rationale
        );
        return Ok(EvolutionOutcome::no_op(description));
    }

    let report = eng.consolidate().await;
    let payload = serde_json::json!({
        "mechanism": "consolidate",
        "maintenance": decision,
        "expired_pruned": report.expired_pruned,
        "superseded_gc": report.superseded_gc,
        "turns_compacted": report.turns_compacted,
        "domains_evolved": report.domains_evolved,
        "total_nodes": report.total_nodes,
    });

    match serde_json::to_string(&payload) {
        Ok(summary) => Ok(EvolutionOutcome::applied(summary)),
        Err(err) => Err(err.to_string()),
    }
}
/// Evolves skills for every domain the engine flags as needing evolution.
///
/// Domains come from `domains_needing_evolution(0.6)`. With no flagged domain
/// the result is a no-op; with `dry_run` it is a no-op that names the domains
/// and the number of failure traces. Otherwise `evolve_skills` is called once
/// per flagged domain with `failed_events`, and the outcome counts as applied
/// only if at least one skill was evolved.
///
/// # Errors
/// Returns `"no inference engine"` when the engine has no inference backend.
pub async fn run_skills_evolution(
    engine: &Arc<tokio::sync::Mutex<MemgineEngine>>,
    failed_events: &[TraceEvent],
    dry_run: bool,
) -> Result<EvolutionOutcome, String> {
    let mut eng = engine.lock().await;
    if !eng.has_inference() {
        return Err("no inference engine".to_string());
    }

    let domains = eng.domains_needing_evolution(0.6);
    let trace_count = failed_events.len();

    if domains.is_empty() {
        return Ok(EvolutionOutcome::no_op(
            "no domain below the evolution threshold (success < 0.6 over ≥3 outcomes) — nothing to evolve",
        ));
    }
    if dry_run {
        let description = format!(
            "dry_run: would evolve domain(s) {:?} over {} failure trace(s)",
            domains, trace_count
        );
        return Ok(EvolutionOutcome::no_op(description));
    }

    let mut evolved = 0usize;
    for domain in &domains {
        let produced = eng.evolve_skills(failed_events, domain).await;
        evolved += produced.len();
    }

    let summary = format!(
        "evolved {} skill(s) across domain(s) {:?} over {} failure trace(s)",
        evolved, domains, trace_count
    );
    let outcome = match evolved {
        0 => EvolutionOutcome::no_op(summary),
        _ => EvolutionOutcome::applied(summary),
    };
    Ok(outcome)
}
/// Per-domain exponential backoff for skills-evolution attempts.
///
/// Time is measured in caller-supplied ticks. Each recorded attempt for a
/// domain doubles the wait before that domain is due again, up to
/// `2^BACKOFF_MAX_EXPONENT` ticks.
#[derive(Debug, Default)]
pub struct SkillsBackoff {
    /// Backoff state keyed by domain name; a missing key means "never attempted".
    map: HashMap<String, DomainAttempts>,
}

/// Backoff bookkeeping for one domain.
#[derive(Debug)]
struct DomainAttempts {
    /// Attempts recorded so far, capped at `BACKOFF_MAX_EXPONENT`.
    attempts: u32,
    /// First tick at which the domain may be attempted again.
    next_tick: u64,
}

/// Largest exponent used for the wait, i.e. the wait never exceeds `2^6 = 64` ticks.
const BACKOFF_MAX_EXPONENT: u32 = 6;

impl SkillsBackoff {
    /// Returns the flagged domains that may be attempted at `tick`, in input order.
    ///
    /// A domain with no recorded attempt is always due; otherwise it is due once
    /// `tick` has reached its stored `next_tick`.
    pub fn due(&self, flagged: &[String], tick: u64) -> Vec<String> {
        flagged
            .iter()
            .filter(|domain| match self.map.get(domain.as_str()) {
                Some(state) => tick >= state.next_tick,
                None => true,
            })
            .cloned()
            .collect()
    }

    /// Records an attempt for `domain` at `tick` and schedules its next due tick.
    ///
    /// The wait is `2^attempts` ticks with `attempts` capped at
    /// `BACKOFF_MAX_EXPONENT`. The addition saturates, so a tick close to
    /// `u64::MAX` neither panics nor wraps around to a small (immediately due)
    /// value.
    pub fn note_attempt(&mut self, domain: &str, tick: u64) {
        let entry = self
            .map
            .entry(domain.to_string())
            .or_insert(DomainAttempts {
                attempts: 0,
                next_tick: tick,
            });
        entry.attempts = entry.attempts.saturating_add(1).min(BACKOFF_MAX_EXPONENT);
        entry.next_tick = tick.saturating_add(1u64 << entry.attempts);
    }

    /// Drops backoff state for every domain that is no longer in `flagged`.
    ///
    /// A domain that is flagged again later starts over from the shortest wait.
    pub fn reset_recovered(&mut self, flagged: &[String]) {
        self.map.retain(|domain, _| flagged.contains(domain));
    }
}
/// Evolves skills for flagged domains, gated by per-domain backoff.
///
/// Domains come from `domains_needing_evolution(0.6)`. Backoff state for
/// domains that are no longer flagged is cleared first, then only the domains
/// that are due at `tick` are attempted (with an empty failure-trace slice).
/// Every attempted domain has an attempt recorded, whether or not any skill
/// was produced. The outcome counts as applied only if at least one skill was
/// evolved.
///
/// The backoff mutex is never held across an `.await`. A poisoned mutex is
/// recovered rather than unwrapped: the state is a plain bookkeeping map, and
/// panicking here would make every later call fail the same way.
///
/// # Errors
/// Returns `"no inference engine"` when the engine has no inference backend.
pub async fn run_skills_evolution_backoff(
    engine: &Arc<tokio::sync::Mutex<MemgineEngine>>,
    backoff: &std::sync::Mutex<SkillsBackoff>,
    tick: u64,
) -> Result<EvolutionOutcome, String> {
    let mut eng = engine.lock().await;
    if !eng.has_inference() {
        return Err("no inference engine".to_string());
    }

    let flagged = eng.domains_needing_evolution(0.6);

    // Scoped so the std mutex guard is dropped before any await below.
    // This runs even when nothing is flagged, so recovered domains are cleared.
    let due = {
        let mut b = backoff
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        b.reset_recovered(&flagged);
        b.due(&flagged, tick)
    };

    if flagged.is_empty() {
        return Ok(EvolutionOutcome::no_op(
            "no domain below the evolution threshold — nothing to evolve",
        ));
    }
    if due.is_empty() {
        return Ok(EvolutionOutcome::no_op(format!(
            "all {} flagged domain(s) in backoff — no attempt this tick",
            flagged.len()
        )));
    }

    let mut evolved = 0usize;
    for domain in &due {
        evolved += eng.evolve_skills(&[], domain).await.len();
        // The guard is a statement temporary, released before the next await.
        backoff
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .note_attempt(domain, tick);
    }

    let summary = format!(
        "evolved {} skill(s) across due domain(s) {:?} ({} flagged total)",
        evolved,
        due,
        flagged.len()
    );
    Ok(if evolved > 0 {
        EvolutionOutcome::applied(summary)
    } else {
        EvolutionOutcome::no_op(summary)
    })
}
/// Single-flight guard: at most one cycle may hold a token at any time.
#[derive(Debug, Default)]
pub struct CycleGuard {
    /// True while a `CycleToken` for this guard is alive.
    running: std::sync::atomic::AtomicBool,
}

/// Proof that the holder owns the guard's single slot; dropping it frees the slot.
pub struct CycleToken<'a> {
    guard: &'a CycleGuard,
}

impl CycleGuard {
    /// Tries to claim the slot.
    ///
    /// Returns a token when no cycle was running, or `None` when one already
    /// holds the slot. The token is only constructed on success, so a failed
    /// claim never releases a slot owned by someone else.
    pub fn try_begin(&self) -> Option<CycleToken<'_>> {
        let claim = self.running.compare_exchange(
            false,
            true,
            std::sync::atomic::Ordering::SeqCst,
            std::sync::atomic::Ordering::SeqCst,
        );
        match claim {
            Ok(_) => Some(CycleToken { guard: self }),
            Err(_) => None,
        }
    }
}

impl Drop for CycleToken<'_> {
    /// Releases the slot, on every exit path that drops the token.
    fn drop(&mut self) {
        let flag = &self.guard.running;
        flag.store(false, std::sync::atomic::Ordering::SeqCst);
    }
}
/// Reads the configured evolution interval (in seconds) from project config.
///
/// The project is discovered starting from `CAR_PROJECT_DIR` if that variable
/// is set, otherwise from the current working directory. Returns `None` when
/// no anchor directory, project, or config overrides are available, when the
/// interval is not set, or when it is `0`.
pub fn seed_evolution_interval() -> Option<u64> {
    let anchor = match std::env::var_os("CAR_PROJECT_DIR") {
        Some(dir) => std::path::PathBuf::from(dir),
        None => std::env::current_dir().ok()?,
    };

    let car_dir = car_memgine::project::discover_project(&anchor)?;
    let overrides = car_memgine::project::load_config_overrides(&car_dir)?;

    match overrides.evolution_interval_secs {
        Some(secs) if secs > 0 => Some(secs),
        _ => None,
    }
}
/// Runs one daemon-level evolution cycle against the shared engine.
///
/// Returns `None` when the server has no shared engine. Otherwise the
/// engine's component states are collected, the tools component is appended
/// when any connector is listed, and `run_evolution_cycle` is driven with the
/// default policy. Only Memory and Skills have an executor here; Harness,
/// Context and Tools report a `not_executable` error explaining why.
pub async fn run_evolution_cadence_cycle(
    state: &Arc<ServerState>,
    backoff: &std::sync::Mutex<SkillsBackoff>,
    tick: u64,
) -> Option<EvolutionCycleReport> {
    let engine = match state.shared_memgine.as_ref() {
        Some(shared) => shared.clone(),
        None => return None,
    };

    // The braces keep the engine lock confined to this one expression.
    let mut states = { engine.lock().await.evolution_component_states() };

    state.ensure_connectors_loaded().await;
    let connectors = state.connectors().list().await;
    states.extend(tools_component_from_connectors(&connectors));

    let policy = EvolutionPolicy::default();
    let report = run_evolution_cycle(&states, &policy, |component| {
        // Each invocation gets its own handle to move into the future.
        let engine = engine.clone();
        async move {
            match component {
                EvolvableComponent::Memory => run_memory_evolution(&engine, false).await,
                EvolvableComponent::Skills => {
                    run_skills_evolution_backoff(&engine, backoff, tick).await
                }
                EvolvableComponent::Harness => Err(
                    "not_executable: harness telemetry and the HITL apply path are per-session — \
                     drive harness evolution via evolution.run on a session"
                        .to_string(),
                ),
                EvolvableComponent::Context => Err(
                    "not_executable: context assembly has no autonomous mutation mechanism yet — \
                     compaction/eviction tuning is an operator config change (.car/config.toml)"
                        .to_string(),
                ),
                EvolvableComponent::Tools => Err(
                    "not_executable: connector health has no autonomous remediation — \
                     reconnect/re-auth via connectors.* is an operator action"
                        .to_string(),
                ),
            }
        }
    })
    .await;

    Some(report)
}
const EVOLUTION_LOG_MAX_EVENTS: usize = 1000;
pub fn spawn_evolution_cadence(
state: Arc<ServerState>,
interval_secs: u64,
) -> Option<tokio::task::JoinHandle<()>> {
if state.shared_memgine.is_none() {
tracing::warn!(
"evolution_interval_secs set but the daemon has no shared engine; cadence not started"
);
return None;
}
let journal = state.journal_dir.join("evolution.jsonl");
Some(tokio::spawn(async move {
let mut log = EventLog::with_journal(journal);
log.set_retention(Some(RetentionPolicy {
max_events: Some(EVOLUTION_LOG_MAX_EVENTS),
max_age_secs: None,
}));
let guard = Arc::new(CycleGuard::default());
let backoff = Arc::new(std::sync::Mutex::new(SkillsBackoff::default()));
let mut tick_no: u64 = 0;
let mut tick =
tokio::time::interval(std::time::Duration::from_secs(interval_secs.max(1)));
tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
tick.tick().await;
loop {
tick.tick().await;
tick_no += 1;
let Some(_token) = guard.try_begin() else {
tracing::warn!("evolution cadence tick skipped: previous cycle still running");
continue;
};
let cycle_state = state.clone();
let cycle_backoff = backoff.clone();
let outcome = tokio::spawn(async move {
run_evolution_cadence_cycle(&cycle_state, &cycle_backoff, tick_no).await
})
.await;
let mut data: HashMap<String, Value> = HashMap::new();
data.insert("source".into(), Value::from("cadence"));
match outcome {
Ok(Some(report)) => {
if report.plan.evolve_now.is_empty() && report.steps.is_empty() {
continue;
}
data.insert(
"report".into(),
serde_json::to_value(&report).unwrap_or(Value::Null),
);
}
Ok(None) => {
data.insert("error".into(), Value::from("no shared engine"));
}
Err(e) => {
data.insert("error".into(), Value::from(format!("cycle panicked: {e}")));
}
}
log.append(EventKind::EvolutionTriggered, None, None, data);
}
}))
}
// Unit tests for the signal builders, the backoff schedule, and the cycle guard.
#[cfg(test)]
mod tests {
use super::*;
// Builds an event of `kind` with an optional action id, empty data, and no hashes.
fn ev(kind: EventKind, action: Option<&str>) -> Event {
Event {
kind,
action_id: action.map(str::to_string),
proposal_id: None,
data: HashMap::new(),
timestamp: chrono::Utc::now(),
prev_hash: None,
hash: None,
}
}
// Builds a connector status; a disconnected one also carries a `last_error`.
fn connector(slug: &str, connected: bool) -> ConnectorStatus {
ConnectorStatus {
slug: slug.into(),
name: slug.into(),
url: format!("https://example.com/{slug}"),
connected,
tool_count: 1,
enabled_count: 1,
last_error: if connected { None } else { Some("dial failed".into()) },
}
}
// Four rejections of the same action among eight events give pressure 0.5.
#[test]
fn harness_pressure_counts_recurring_failure_share() {
let mut events = vec![
ev(EventKind::ActionRejected, Some("a1")),
ev(EventKind::ActionRejected, Some("a1")),
ev(EventKind::ActionRejected, Some("a1")),
ev(EventKind::ActionRejected, Some("a1")),
];
for _ in 0..4 {
events.push(ev(EventKind::ActionSucceeded, Some("ok")));
}
let c = harness_component_from_events(&events).expect("component");
assert_eq!(c.component, EvolvableComponent::Harness);
assert!((c.signals.pressure - 0.5).abs() < 1e-9, "{c:?}");
assert_eq!(c.signals.evidence, 8);
}
// Failures on distinct actions, each seen once, contribute no pressure.
#[test]
fn harness_one_off_failures_are_zero_pressure() {
let events = vec![
ev(EventKind::ActionRejected, Some("a1")),
ev(EventKind::ActionFailed, Some("a2")),
ev(EventKind::ActionSucceeded, Some("a3")),
];
let c = harness_component_from_events(&events).unwrap();
assert_eq!(c.signals.pressure, 0.0);
assert_eq!(c.signals.evidence, 3);
}
// An empty log yields no component at all rather than a zero-pressure one.
#[test]
fn harness_empty_log_is_absent_not_zero() {
assert!(harness_component_from_events(&[]).is_none());
}
// Two of four connectors disconnected gives pressure 0.5 and evidence 4.
#[test]
fn tools_pressure_is_disconnected_share() {
let list = vec![
connector("up", true),
connector("down1", false),
connector("down2", false),
connector("up2", true),
];
let c = tools_component_from_connectors(&list).expect("component");
assert_eq!(c.component, EvolvableComponent::Tools);
assert!((c.signals.pressure - 0.5).abs() < 1e-9, "{c:?}");
assert_eq!(c.signals.evidence, 4);
}
// No connectors means no tools component.
#[test]
fn tools_absent_when_no_connectors_configured() {
assert!(tools_component_from_connectors(&[]).is_none());
}
// Only failure kinds become traces; the success event is dropped and order is kept.
#[test]
fn failed_trace_events_fold_failure_kinds_only() {
let mut failed = ev(EventKind::ActionFailed, Some("a1"));
failed
.data
.insert("tool".into(), Value::from("http_get"));
failed
.data
.insert("error".into(), Value::from("timeout"));
let events = vec![
failed,
ev(EventKind::ActionSucceeded, Some("a2")),
ev(EventKind::PolicyViolation, Some("a3")),
ev(EventKind::ReplanExhausted, None),
];
let traces = failed_trace_events(&events);
assert_eq!(traces.len(), 3);
assert_eq!(traces[0].kind, "action_failed");
assert_eq!(traces[0].tool.as_deref(), Some("http_get"));
assert_eq!(traces[0].action_id.as_deref(), Some("a1"));
assert_eq!(traces[0].reward, Some(0.0));
assert_eq!(traces[1].kind, "policy_violation");
assert_eq!(traces[2].kind, "replan_exhausted");
}
// 5 outdated + 10 superseded of 100 facts: 15 dirty regions, gain 0.10, Localized strategy.
#[test]
fn maintenance_input_prices_backlog_off_live_stats() {
let stats = car_memgine::memsys::MemoryStats {
total_facts: 100,
outstanding_outdated: 5,
facts_superseded: 10,
..Default::default()
};
let input = maintenance_input_from_stats(&stats);
assert_eq!(input.dirty_regions, 15);
assert_eq!(input.total_regions, 100);
assert!((input.global_structural_gain - 0.10).abs() < 1e-9);
let d = decide_maintenance(&input);
assert_eq!(
d.strategy,
car_memgine::maintenance::MaintenanceStrategy::Localized,
"{d:?}"
);
}
// Default (all-zero) stats must lead to the NoOp strategy.
#[test]
fn maintenance_input_clean_store_is_noop() {
let input = maintenance_input_from_stats(&car_memgine::memsys::MemoryStats::default());
let d = decide_maintenance(&input);
assert_eq!(d.strategy, car_memgine::maintenance::MaintenanceStrategy::NoOp);
}
// A dry run reports without being applied; a real run produces the consolidate summary.
#[tokio::test]
async fn memory_evolution_dry_run_reports_without_consolidating() {
let engine = Arc::new(tokio::sync::Mutex::new(MemgineEngine::new(None)));
let out = run_memory_evolution(&engine, true).await.unwrap();
assert!(out.summary.starts_with("dry_run"), "{out:?}");
assert!(!out.applied, "dry run must not count as applied (S2)");
let real = run_memory_evolution(&engine, false).await.unwrap();
assert!(real.summary.contains("\"mechanism\":\"consolidate\""), "{real:?}");
assert!(real.applied);
}
// An engine built with `None` has no inference backend, so skills evolution errors out.
#[tokio::test]
async fn skills_evolution_without_inference_is_an_honest_error() {
let engine = Arc::new(tokio::sync::Mutex::new(MemgineEngine::new(None)));
let err = run_skills_evolution(&engine, &[], false).await.unwrap_err();
assert_eq!(err, "no inference engine");
}
// Waits go 2 then 4 ticks; after a reset the next attempt waits 2 ticks again.
#[test]
fn skills_backoff_widens_exponentially_and_resets_on_recovery() {
let mut b = SkillsBackoff::default();
let flagged = vec!["web".to_string()];
assert_eq!(b.due(&flagged, 1), flagged);
b.note_attempt("web", 1);
assert!(b.due(&flagged, 2).is_empty(), "tick 2 still backing off");
assert_eq!(b.due(&flagged, 3), flagged);
b.note_attempt("web", 3);
assert!(b.due(&flagged, 6).is_empty());
assert_eq!(b.due(&flagged, 7), flagged);
b.reset_recovered(&[]);
b.note_attempt("web", 10);
assert_eq!(b.due(&flagged, 12), flagged);
}
// After 20 attempts the wait is capped at 64 ticks past the last attempt (tick 19).
#[test]
fn skills_backoff_exponent_is_capped() {
let mut b = SkillsBackoff::default();
for t in 0..20 {
b.note_attempt("stuck", t);
}
let flagged = vec!["stuck".to_string()];
assert!(b.due(&flagged, 19 + 63).is_empty());
assert_eq!(b.due(&flagged, 19 + 64), flagged);
}
// Backing off domain "a" leaves the never-attempted domain "b" due.
#[test]
fn skills_backoff_only_gates_the_attempted_domain() {
let mut b = SkillsBackoff::default();
let flagged = vec!["a".to_string(), "b".to_string()];
b.note_attempt("a", 1);
assert_eq!(b.due(&flagged, 2), vec!["b".to_string()]);
}
// A live token blocks a second claim; dropping it frees the guard.
#[test]
fn cycle_guard_blocks_overlap_and_releases_on_drop() {
let guard = CycleGuard::default();
let token = guard.try_begin().expect("first claim");
assert!(guard.try_begin().is_none(), "in-flight cycle must block");
drop(token);
assert!(guard.try_begin().is_some(), "released after drop");
}
// The token is released when its scope exits through an error return.
#[test]
fn cycle_guard_releases_even_when_cycle_errors() {
let guard = CycleGuard::default();
let r: Result<(), ()> = (|| {
let _token = guard.try_begin().unwrap();
Err(())
})();
assert!(r.is_err());
assert!(guard.try_begin().is_some(), "drop on error path releases");
}
}