Skip to main content

car_server_core/
ui_agent_loop.rs

//! Runtime-side convergence machinery for the UI-improvement loop.
//!
//! The agent itself is stateless about loop-level concerns: it
//! decides one patch at a time, on the current report, with a
//! double-patch guard for the same render. What it cannot see is
//! the *temporal* shape of its own decisions across renders —
//! whether successive reports are walking through the same patch
//! over and over (A→B→A oscillation), or whether the loop is
//! making forward progress.
//!
//! That observation belongs to the caller, per neo's review:
//! "controllers use workqueue backoff; reconcilers stay stateless."
//! This module owns the per-surface history needed to detect
//! oscillation and short-circuit the apply path when one is
//! suspected.
16
17use std::collections::VecDeque;
18
19use dashmap::DashMap;
20
/// Maximum agent-driven patches per surface before the runtime
/// hard-stops the loop. Picked to be comfortably larger than the
/// strategy library's typical convergence depth (1-2 rounds for the
/// builtin strategies, 3-5 plausibly for richer libraries) while
/// being tight enough that an oscillating or buggy strategy can't
/// run away.
///
/// This is the cap that [`IterationBudget::new`] installs. To run a
/// process with a different cap, construct the budget with
/// [`IterationBudget::with_max`] instead.
pub const DEFAULT_MAX_ITERATIONS: u32 = 20;
29
/// Number of recent patch hashes retained per surface by
/// [`OscillationDetector`]. Tuned for the v1 loop where strategies
/// converge in 1-2 rounds; oscillation signals are typically
/// `last == current`, so a small window is plenty. Bumping this
/// hurts cache locality without paying for itself until patch
/// graphs get wider.
///
/// The detector scans the whole window on every check, so this
/// value also bounds the number of comparisons per check.
pub const HISTORY_DEPTH: usize = 6;
36
37/// Per-surface oscillation detector. Records recent `patch_hash`
38/// values from `Decision::Patch` outcomes and reports whether a
39/// fresh proposal would repeat one we've seen recently — that's
40/// the A→B→A pattern the loop must not commit to.
41///
42/// Thread-safety: backed by `DashMap` so the handler can hold it
43/// behind `Arc` and call `check_and_record` from any async context
44/// without taking a tokio Mutex first.
45#[derive(Default)]
46pub struct OscillationDetector {
47    history: DashMap<String, VecDeque<u64>>,
48}
49
50impl OscillationDetector {
51    pub fn new() -> Self {
52        Self::default()
53    }
54
55    /// Test whether `patch_hash` would oscillate against the
56    /// surface's recent history, and either record it (no
57    /// oscillation, proceed) or leave history untouched
58    /// (oscillation, caller suppresses the apply).
59    ///
60    /// Returns `true` when the caller should APPLY the patch.
61    /// Returns `false` when the patch repeats one in the recent
62    /// window — caller logs and skips.
63    pub fn check_and_record(&self, surface_id: &str, patch_hash: u64) -> bool {
64        let mut entry = self
65            .history
66            .entry(surface_id.to_string())
67            .or_insert_with(|| VecDeque::with_capacity(HISTORY_DEPTH));
68        if entry.iter().any(|h| *h == patch_hash) {
69            // Caller will skip; don't record so the window slides
70            // forward only on actual applies.
71            return false;
72        }
73        if entry.len() >= HISTORY_DEPTH {
74            entry.pop_front();
75        }
76        entry.push_back(patch_hash);
77        true
78    }
79
80    /// Clear history for a surface — use when the surface is
81    /// recreated (sequence counter resets, prior history is
82    /// stale).
83    pub fn reset(&self, surface_id: &str) {
84        self.history.remove(surface_id);
85    }
86
87    /// How many entries are tracked for a surface. Test-only
88    /// surface; not part of the steady-state hot path.
89    #[cfg(test)]
90    pub fn depth(&self, surface_id: &str) -> usize {
91        self.history
92            .get(surface_id)
93            .map(|e| e.len())
94            .unwrap_or(0)
95    }
96}
97
98/// Per-surface iteration budget. Counts agent-driven patches per
99/// surface; once a surface hits the cap, the handler short-circuits
100/// the agent entirely until the surface is reset (typically via
101/// `delete_surface` + `create_surface`, or an explicit `reset`).
102///
103/// The budget is in-memory: a single-process daemon keeps its
104/// counters for the lifetime of the process. A durable backend
105/// using `car-state::StateStore::durable` is a v2 — the runtime's
106/// transient strategy library doesn't yet justify persisting
107/// iteration counts across restarts.
108pub struct IterationBudget {
109    counts: DashMap<String, u32>,
110    max_iterations: u32,
111}
112
113impl IterationBudget {
114    pub fn new() -> Self {
115        Self::with_max(DEFAULT_MAX_ITERATIONS)
116    }
117
118    pub fn with_max(max: u32) -> Self {
119        Self {
120            counts: DashMap::new(),
121            max_iterations: max,
122        }
123    }
124
125    /// Atomic check-and-increment. Returns `true` when the surface
126    /// had budget AND the count has been bumped to claim it;
127    /// `false` when the surface is at the cap and the caller must
128    /// skip the agent invocation. The two steps happen under
129    /// DashMap's per-shard lock so concurrent reports for the same
130    /// surface can't both pass at `count = max - 1`.
131    ///
132    /// Pairs with `refund` for the failed-apply path: the budget
133    /// is consumed up-front; if the patch apply errors, the
134    /// handler calls `refund` to release the slot. That keeps the
135    /// "budget reflects successful applies" invariant while
136    /// eliminating the original `has_budget + record_apply` race.
137    pub fn try_consume(&self, surface_id: &str) -> bool {
138        let mut entry = self.counts.entry(surface_id.to_string()).or_insert(0);
139        if *entry >= self.max_iterations {
140            return false;
141        }
142        *entry += 1;
143        true
144    }
145
146    /// Release a budget slot consumed by `try_consume` when the
147    /// downstream apply turned out to fail. Saturates at zero so a
148    /// stray refund without a matching consume can't drive the
149    /// counter negative.
150    pub fn refund(&self, surface_id: &str) {
151        let mut entry = self.counts.entry(surface_id.to_string()).or_insert(0);
152        *entry = entry.saturating_sub(1);
153    }
154
155    /// Clear the surface's counter. Surface recreation triggers
156    /// this via the caller; the loop is allowed to start over.
157    pub fn reset(&self, surface_id: &str) {
158        self.counts.remove(surface_id);
159    }
160
161    /// Current iteration count for a surface. Public so the
162    /// hard-stop log can include it.
163    pub fn count(&self, surface_id: &str) -> u32 {
164        self.counts.get(surface_id).map(|c| *c).unwrap_or(0)
165    }
166
167    pub fn max(&self) -> u32 {
168        self.max_iterations
169    }
170}
171
172impl Default for IterationBudget {
173    fn default() -> Self {
174        Self::new()
175    }
176}
177
#[cfg(test)]
mod tests {
    use super::*;

    // --- OscillationDetector tests ---

    #[test]
    fn first_patch_records_and_applies() {
        let det = OscillationDetector::new();
        assert!(det.check_and_record("s", 42));
        assert_eq!(det.depth("s"), 1);
    }

    #[test]
    fn same_patch_repeated_blocks_apply() {
        let det = OscillationDetector::new();
        assert!(det.check_and_record("s", 42));
        // Second time around — oscillation, suppress.
        assert!(!det.check_and_record("s", 42));
        // History should still show only one entry — we don't
        // record on skipped applies.
        assert_eq!(det.depth("s"), 1);
    }

    #[test]
    fn distinct_patches_compose_until_window_fills() {
        let det = OscillationDetector::new();
        for i in 0..(HISTORY_DEPTH as u64) {
            assert!(det.check_and_record("s", i));
        }
        assert_eq!(det.depth("s"), HISTORY_DEPTH);
        // One more pushes the oldest out — old hash 0 is now
        // forgotten and would apply again (caller doesn't, in
        // practice, because the agent's no-double-patch guard
        // catches same-sequence repeats; this is the deeper
        // backoff for cross-sequence A→B→A).
        let next = HISTORY_DEPTH as u64;
        assert!(det.check_and_record("s", next));
        assert_eq!(det.depth("s"), HISTORY_DEPTH);
        // Hash 0 fell out — would be allowed if proposed again.
        assert!(det.check_and_record("s", 0));
    }

    #[test]
    fn detects_a_b_a_pattern() {
        let det = OscillationDetector::new();
        let a: u64 = 0xAAAA;
        let b: u64 = 0xBBBB;
        assert!(det.check_and_record("s", a)); // A applied
        assert!(det.check_and_record("s", b)); // B applied
        // C is fine — fresh hash
        assert!(det.check_and_record("s", 0xCCCC));
        // Re-proposing A within the window — block.
        assert!(!det.check_and_record("s", a));
    }

    #[test]
    fn reset_clears_history() {
        let det = OscillationDetector::new();
        assert!(det.check_and_record("s", 1));
        assert!(det.check_and_record("s", 2));
        det.reset("s");
        assert_eq!(det.depth("s"), 0);
        // After reset, the old hashes are free to apply again.
        assert!(det.check_and_record("s", 1));
    }

    #[test]
    fn per_surface_history_isolated() {
        let det = OscillationDetector::new();
        assert!(det.check_and_record("a", 42));
        // Same hash on a different surface is independent: "b" has
        // never seen 42, so it must be accepted even though "a"
        // already recorded it. Without this assertion the test
        // would also pass against a single shared history.
        assert!(det.check_and_record("b", 42), "history is per-surface");
        // Each surface still blocks its own repeat.
        assert!(!det.check_and_record("a", 42));
        assert!(!det.check_and_record("b", 42));
        // Resetting one surface leaves the other's history alone.
        det.reset("a");
        assert!(det.check_and_record("a", 42));
        assert!(!det.check_and_record("b", 42));
    }

    // --- IterationBudget tests ---

    #[test]
    fn iteration_budget_starts_full() {
        let b = IterationBudget::with_max(3);
        assert_eq!(b.count("s"), 0);
        assert!(b.try_consume("s"));
        assert_eq!(b.count("s"), 1);
    }

    #[test]
    fn iteration_budget_caps_after_max() {
        let b = IterationBudget::with_max(2);
        assert!(b.try_consume("s"));
        assert!(b.try_consume("s"));
        // Cap reached — further try_consume calls return false
        // and DO NOT bump the counter.
        assert!(!b.try_consume("s"));
        assert!(!b.try_consume("s"));
        assert_eq!(b.count("s"), 2);
    }

    #[test]
    fn iteration_budget_reset_clears() {
        let b = IterationBudget::with_max(2);
        assert!(b.try_consume("s"));
        assert!(b.try_consume("s"));
        assert!(!b.try_consume("s"));
        b.reset("s");
        assert!(b.try_consume("s"));
        assert_eq!(b.count("s"), 1);
    }

    #[test]
    fn iteration_budget_per_surface() {
        let b = IterationBudget::with_max(1);
        assert!(b.try_consume("a"));
        assert!(!b.try_consume("a"));
        assert!(b.try_consume("b"), "budget is per-surface");
    }

    #[test]
    fn iteration_budget_refund_releases_slot() {
        let b = IterationBudget::with_max(2);
        assert!(b.try_consume("s"));
        assert!(b.try_consume("s"));
        assert!(!b.try_consume("s"), "at cap");
        b.refund("s");
        assert!(b.try_consume("s"), "refund opens a slot back up");
    }

    #[test]
    fn iteration_budget_refund_saturates_at_zero() {
        let b = IterationBudget::with_max(2);
        // No consume yet — stray refund should not underflow.
        b.refund("s");
        b.refund("s");
        assert_eq!(b.count("s"), 0);
        assert!(b.try_consume("s"));
    }

    #[test]
    fn iteration_budget_concurrent_consume_respects_cap() {
        use std::sync::Arc;
        // 32 threads all try to consume on the same surface
        // simultaneously. Cap is 10 — exactly 10 should succeed,
        // no more.
        let b = Arc::new(IterationBudget::with_max(10));
        let mut handles = vec![];
        for _ in 0..32 {
            let b = b.clone();
            handles.push(std::thread::spawn(move || b.try_consume("s")));
        }
        let successes = handles
            .into_iter()
            .map(|h| h.join().unwrap())
            .filter(|ok| *ok)
            .count();
        assert_eq!(successes, 10, "exactly cap-many consumes must succeed");
        assert_eq!(b.count("s"), 10);
    }
}