car-server-core 0.9.0

Transport-neutral library for the CAR daemon JSON-RPC dispatcher (used by car-server and tokhn-daemon)
Documentation
//! Runtime-side convergence machinery for the UI-improvement loop.
//!
//! The agent itself is stateless about loop-level concerns: it
//! decides one patch at a time, on the current report, with a
//! double-patch guard for the same render. What it cannot see is
//! the *temporal* shape of its own decisions across renders —
//! whether successive reports are walking through the same patch
//! over and over (A→B→A oscillation), or whether the loop is
//! making forward progress.
//!
//! That observation belongs to the caller, per neo's review:
//! "controllers use workqueue backoff; reconcilers stay stateless."
//! This module owns the per-surface history needed to detect
//! oscillation and short-circuit the apply path when one is
//! suspected.

use std::collections::VecDeque;

use dashmap::DashMap;

/// Maximum agent-driven patches per surface before the runtime
/// hard-stops the loop. Picked to be comfortably larger than the
/// strategy library's typical convergence depth (1-2 rounds for the
/// builtin strategies, 3-5 plausibly for richer libraries) while
/// being tight enough that an oscillating or buggy strategy can't
/// run away. Tunable per-process by replacing the
/// `IterationBudget::new` call with `IterationBudget::with_max`.
pub const DEFAULT_MAX_ITERATIONS: u32 = 20;

/// Number of recent patch hashes retained per surface. Tuned for
/// the v1 loop where strategies converge in 1-2 rounds; oscillation
/// signals are typically `last == current`, so a small window is
/// plenty. Bumping this hurts cache locality without paying for
/// itself until patch graphs get wider.
pub const HISTORY_DEPTH: usize = 6;

/// Per-surface oscillation detector. Records recent `patch_hash`
/// values from `Decision::Patch` outcomes and reports whether a
/// fresh proposal would repeat one we've seen recently — that's
/// the A→B→A pattern the loop must not commit to.
///
/// Thread-safety: backed by `DashMap` so the handler can hold it
/// behind `Arc` and call `check_and_record` from any async context
/// without taking a tokio Mutex first.
#[derive(Default)]
pub struct OscillationDetector {
    history: DashMap<String, VecDeque<u64>>,
}

impl OscillationDetector {
    pub fn new() -> Self {
        Self::default()
    }

    /// Test whether `patch_hash` would oscillate against the
    /// surface's recent history, and either record it (no
    /// oscillation, proceed) or leave history untouched
    /// (oscillation, caller suppresses the apply).
    ///
    /// Returns `true` when the caller should APPLY the patch.
    /// Returns `false` when the patch repeats one in the recent
    /// window — caller logs and skips.
    pub fn check_and_record(&self, surface_id: &str, patch_hash: u64) -> bool {
        let mut entry = self
            .history
            .entry(surface_id.to_string())
            .or_insert_with(|| VecDeque::with_capacity(HISTORY_DEPTH));
        if entry.iter().any(|h| *h == patch_hash) {
            // Caller will skip; don't record so the window slides
            // forward only on actual applies.
            return false;
        }
        if entry.len() >= HISTORY_DEPTH {
            entry.pop_front();
        }
        entry.push_back(patch_hash);
        true
    }

    /// Clear history for a surface — use when the surface is
    /// recreated (sequence counter resets, prior history is
    /// stale).
    pub fn reset(&self, surface_id: &str) {
        self.history.remove(surface_id);
    }

    /// How many entries are tracked for a surface. Test-only
    /// surface; not part of the steady-state hot path.
    #[cfg(test)]
    pub fn depth(&self, surface_id: &str) -> usize {
        self.history
            .get(surface_id)
            .map(|e| e.len())
            .unwrap_or(0)
    }
}

/// Per-surface iteration budget. Counts agent-driven patches per
/// surface; once a surface hits the cap, the handler short-circuits
/// the agent entirely until the surface is reset (typically via
/// `delete_surface` + `create_surface`, or an explicit `reset`).
///
/// The budget is in-memory: a single-process daemon keeps its
/// counters for the lifetime of the process. A durable backend
/// using `car-state::StateStore::durable` is a v2 — the runtime's
/// transient strategy library doesn't yet justify persisting
/// iteration counts across restarts.
pub struct IterationBudget {
    counts: DashMap<String, u32>,
    max_iterations: u32,
}

impl IterationBudget {
    pub fn new() -> Self {
        Self::with_max(DEFAULT_MAX_ITERATIONS)
    }

    pub fn with_max(max: u32) -> Self {
        Self {
            counts: DashMap::new(),
            max_iterations: max,
        }
    }

    /// Atomic check-and-increment. Returns `true` when the surface
    /// had budget AND the count has been bumped to claim it;
    /// `false` when the surface is at the cap and the caller must
    /// skip the agent invocation. The two steps happen under
    /// DashMap's per-shard lock so concurrent reports for the same
    /// surface can't both pass at `count = max - 1`.
    ///
    /// Pairs with `refund` for the failed-apply path: the budget
    /// is consumed up-front; if the patch apply errors, the
    /// handler calls `refund` to release the slot. That keeps the
    /// "budget reflects successful applies" invariant while
    /// eliminating the original `has_budget + record_apply` race.
    pub fn try_consume(&self, surface_id: &str) -> bool {
        let mut entry = self.counts.entry(surface_id.to_string()).or_insert(0);
        if *entry >= self.max_iterations {
            return false;
        }
        *entry += 1;
        true
    }

    /// Release a budget slot consumed by `try_consume` when the
    /// downstream apply turned out to fail. Saturates at zero so a
    /// stray refund without a matching consume can't drive the
    /// counter negative.
    pub fn refund(&self, surface_id: &str) {
        let mut entry = self.counts.entry(surface_id.to_string()).or_insert(0);
        *entry = entry.saturating_sub(1);
    }

    /// Clear the surface's counter. Surface recreation triggers
    /// this via the caller; the loop is allowed to start over.
    pub fn reset(&self, surface_id: &str) {
        self.counts.remove(surface_id);
    }

    /// Current iteration count for a surface. Public so the
    /// hard-stop log can include it.
    pub fn count(&self, surface_id: &str) -> u32 {
        self.counts.get(surface_id).map(|c| *c).unwrap_or(0)
    }

    pub fn max(&self) -> u32 {
        self.max_iterations
    }
}

impl Default for IterationBudget {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn first_patch_records_and_applies() {
        let det = OscillationDetector::new();
        assert!(det.check_and_record("s", 42));
        assert_eq!(det.depth("s"), 1);
    }

    #[test]
    fn same_patch_repeated_blocks_apply() {
        let det = OscillationDetector::new();
        assert!(det.check_and_record("s", 42));
        // Second time around — oscillation, suppress.
        assert!(!det.check_and_record("s", 42));
        // History should still show only one entry — we don't
        // record on skipped applies.
        assert_eq!(det.depth("s"), 1);
    }

    #[test]
    fn distinct_patches_compose_until_window_fills() {
        let det = OscillationDetector::new();
        for i in 0..(HISTORY_DEPTH as u64) {
            assert!(det.check_and_record("s", i));
        }
        assert_eq!(det.depth("s"), HISTORY_DEPTH);
        // One more pushes the oldest out — old hash 0 is now
        // forgotten and would apply again (caller doesn't, in
        // practice, because the agent's no-double-patch guard
        // catches same-sequence repeats; this is the deeper
        // backoff for cross-sequence A→B→A).
        let next = HISTORY_DEPTH as u64;
        assert!(det.check_and_record("s", next));
        assert_eq!(det.depth("s"), HISTORY_DEPTH);
        // Hash 0 fell out — would be allowed if proposed again.
        assert!(det.check_and_record("s", 0));
    }

    #[test]
    fn detects_a_b_a_pattern() {
        let det = OscillationDetector::new();
        let a: u64 = 0xAAAA;
        let b: u64 = 0xBBBB;
        assert!(det.check_and_record("s", a)); // A applied
        assert!(det.check_and_record("s", b)); // B applied
        // C is fine — fresh hash
        assert!(det.check_and_record("s", 0xCCCC));
        // Re-proposing A within the window — block.
        assert!(!det.check_and_record("s", a));
    }

    #[test]
    fn reset_clears_history() {
        let det = OscillationDetector::new();
        det.check_and_record("s", 1);
        det.check_and_record("s", 2);
        det.reset("s");
        assert_eq!(det.depth("s"), 0);
        // After reset, the old hashes are free to apply again.
        assert!(det.check_and_record("s", 1));
    }

    // --- IterationBudget tests ---

    #[test]
    fn iteration_budget_starts_full() {
        let b = IterationBudget::with_max(3);
        assert_eq!(b.count("s"), 0);
        assert!(b.try_consume("s"));
        assert_eq!(b.count("s"), 1);
    }

    #[test]
    fn iteration_budget_caps_after_max() {
        let b = IterationBudget::with_max(2);
        assert!(b.try_consume("s"));
        assert!(b.try_consume("s"));
        // Cap reached — further try_consume calls return false
        // and DO NOT bump the counter.
        assert!(!b.try_consume("s"));
        assert!(!b.try_consume("s"));
        assert_eq!(b.count("s"), 2);
    }

    #[test]
    fn iteration_budget_reset_clears() {
        let b = IterationBudget::with_max(2);
        b.try_consume("s");
        b.try_consume("s");
        assert!(!b.try_consume("s"));
        b.reset("s");
        assert!(b.try_consume("s"));
        assert_eq!(b.count("s"), 1);
    }

    #[test]
    fn iteration_budget_per_surface() {
        let b = IterationBudget::with_max(1);
        assert!(b.try_consume("a"));
        assert!(!b.try_consume("a"));
        assert!(b.try_consume("b"), "budget is per-surface");
    }

    #[test]
    fn iteration_budget_refund_releases_slot() {
        let b = IterationBudget::with_max(2);
        assert!(b.try_consume("s"));
        assert!(b.try_consume("s"));
        assert!(!b.try_consume("s"), "at cap");
        b.refund("s");
        assert!(b.try_consume("s"), "refund opens a slot back up");
    }

    #[test]
    fn iteration_budget_refund_saturates_at_zero() {
        let b = IterationBudget::with_max(2);
        // No consume yet — stray refund should not underflow.
        b.refund("s");
        b.refund("s");
        assert_eq!(b.count("s"), 0);
        assert!(b.try_consume("s"));
    }

    #[test]
    fn iteration_budget_concurrent_consume_respects_cap() {
        use std::sync::Arc;
        // 32 threads all try to consume on the same surface
        // simultaneously. Cap is 10 — exactly 10 should succeed,
        // no more.
        let b = Arc::new(IterationBudget::with_max(10));
        let mut handles = vec![];
        for _ in 0..32 {
            let b = b.clone();
            handles.push(std::thread::spawn(move || b.try_consume("s")));
        }
        let successes = handles
            .into_iter()
            .map(|h| h.join().unwrap())
            .filter(|ok| *ok)
            .count();
        assert_eq!(successes, 10, "exactly cap-many consumes must succeed");
        assert_eq!(b.count("s"), 10);
    }

    #[test]
    fn per_surface_history_isolated() {
        let det = OscillationDetector::new();
        det.check_and_record("a", 42);
        det.check_and_record("b", 42);
        // Same hash on a different surface is independent.
        assert!(!det.check_and_record("a", 42));
        assert!(!det.check_and_record("b", 42));
    }
}