Skip to main content

car_server_core/
ui_agent_loop.rs

1//! Runtime-side convergence machinery for the UI-improvement loop.
2//!
3//! The agent itself is stateless about loop-level concerns: it
4//! decides one patch at a time, on the current report, with a
5//! double-patch guard for the same render. What it cannot see is
6//! the *temporal* shape of its own decisions across renders —
7//! whether successive reports are walking through the same patch
8//! over and over (A→B→A oscillation), or whether the loop is
9//! making forward progress.
10//!
11//! That observation belongs to the caller, per neo's review:
12//! "controllers use workqueue backoff; reconcilers stay stateless."
13//! This module owns the per-surface history needed to detect
14//! oscillation and short-circuit the apply path when one is
15//! suspected.
16
17use std::collections::VecDeque;
18
19use dashmap::DashMap;
20
/// Default cap on agent-driven patches for a single surface; this is
/// the limit that `IterationBudget::new` installs.
///
/// The value sits well above the strategy library's usual convergence
/// depth (1-2 rounds for the builtin strategies, plausibly 3-5 for
/// richer libraries) yet stays small enough that an oscillating or
/// buggy strategy cannot run away. To pick a different cap for a
/// process, build the budget with `IterationBudget::with_max` instead
/// of `IterationBudget::new`.
pub const DEFAULT_MAX_ITERATIONS: u32 = 20;
29
/// How many recent patch hashes are kept per surface for oscillation
/// detection.
///
/// Sized for the v1 loop, where strategies converge in 1-2 rounds and
/// an oscillation usually shows up as `last == current`, so a short
/// window is enough. A larger window costs cache locality and does not
/// pay for itself until patch graphs get wider.
pub const HISTORY_DEPTH: usize = 6;
36
37/// Per-surface oscillation detector. Records recent `patch_hash`
38/// values from `Decision::Patch` outcomes and reports whether a
39/// fresh proposal would repeat one we've seen recently — that's
40/// the A→B→A pattern the loop must not commit to.
41///
42/// Thread-safety: backed by `DashMap` so the handler can hold it
43/// behind `Arc` and call `check_and_record` from any async context
44/// without taking a tokio Mutex first.
45#[derive(Default)]
46pub struct OscillationDetector {
47    history: DashMap<String, VecDeque<u64>>,
48}
49
50impl OscillationDetector {
51    pub fn new() -> Self {
52        Self::default()
53    }
54
55    /// Test whether `patch_hash` would oscillate against the
56    /// surface's recent history, and either record it (no
57    /// oscillation, proceed) or leave history untouched
58    /// (oscillation, caller suppresses the apply).
59    ///
60    /// Returns `true` when the caller should APPLY the patch.
61    /// Returns `false` when the patch repeats one in the recent
62    /// window — caller logs and skips.
63    pub fn check_and_record(&self, surface_id: &str, patch_hash: u64) -> bool {
64        let mut entry = self
65            .history
66            .entry(surface_id.to_string())
67            .or_insert_with(|| VecDeque::with_capacity(HISTORY_DEPTH));
68        if entry.iter().any(|h| *h == patch_hash) {
69            // Caller will skip; don't record so the window slides
70            // forward only on actual applies.
71            return false;
72        }
73        if entry.len() >= HISTORY_DEPTH {
74            entry.pop_front();
75        }
76        entry.push_back(patch_hash);
77        true
78    }
79
80    /// Clear history for a surface — use when the surface is
81    /// recreated (sequence counter resets, prior history is
82    /// stale).
83    pub fn reset(&self, surface_id: &str) {
84        self.history.remove(surface_id);
85    }
86
87    /// How many entries are tracked for a surface. Test-only
88    /// surface; not part of the steady-state hot path.
89    #[cfg(test)]
90    pub fn depth(&self, surface_id: &str) -> usize {
91        self.history.get(surface_id).map(|e| e.len()).unwrap_or(0)
92    }
93}
94
95/// Per-surface iteration budget. Counts agent-driven patches per
96/// surface; once a surface hits the cap, the handler short-circuits
97/// the agent entirely until the surface is reset (typically via
98/// `delete_surface` + `create_surface`, or an explicit `reset`).
99///
100/// The budget is in-memory: a single-process daemon keeps its
101/// counters for the lifetime of the process. A durable backend
102/// using `car-state::StateStore::durable` is a v2 — the runtime's
103/// transient strategy library doesn't yet justify persisting
104/// iteration counts across restarts.
105pub struct IterationBudget {
106    counts: DashMap<String, u32>,
107    max_iterations: u32,
108}
109
110impl IterationBudget {
111    pub fn new() -> Self {
112        Self::with_max(DEFAULT_MAX_ITERATIONS)
113    }
114
115    pub fn with_max(max: u32) -> Self {
116        Self {
117            counts: DashMap::new(),
118            max_iterations: max,
119        }
120    }
121
122    /// Atomic check-and-increment. Returns `true` when the surface
123    /// had budget AND the count has been bumped to claim it;
124    /// `false` when the surface is at the cap and the caller must
125    /// skip the agent invocation. The two steps happen under
126    /// DashMap's per-shard lock so concurrent reports for the same
127    /// surface can't both pass at `count = max - 1`.
128    ///
129    /// Pairs with `refund` for the failed-apply path: the budget
130    /// is consumed up-front; if the patch apply errors, the
131    /// handler calls `refund` to release the slot. That keeps the
132    /// "budget reflects successful applies" invariant while
133    /// eliminating the original `has_budget + record_apply` race.
134    pub fn try_consume(&self, surface_id: &str) -> bool {
135        let mut entry = self.counts.entry(surface_id.to_string()).or_insert(0);
136        if *entry >= self.max_iterations {
137            return false;
138        }
139        *entry += 1;
140        true
141    }
142
143    /// Release a budget slot consumed by `try_consume` when the
144    /// downstream apply turned out to fail. Saturates at zero so a
145    /// stray refund without a matching consume can't drive the
146    /// counter negative.
147    pub fn refund(&self, surface_id: &str) {
148        let mut entry = self.counts.entry(surface_id.to_string()).or_insert(0);
149        *entry = entry.saturating_sub(1);
150    }
151
152    /// Clear the surface's counter. Surface recreation triggers
153    /// this via the caller; the loop is allowed to start over.
154    pub fn reset(&self, surface_id: &str) {
155        self.counts.remove(surface_id);
156    }
157
158    /// Current iteration count for a surface. Public so the
159    /// hard-stop log can include it.
160    pub fn count(&self, surface_id: &str) -> u32 {
161        self.counts.get(surface_id).map(|c| *c).unwrap_or(0)
162    }
163
164    pub fn max(&self) -> u32 {
165        self.max_iterations
166    }
167}
168
169impl Default for IterationBudget {
170    fn default() -> Self {
171        Self::new()
172    }
173}
174
#[cfg(test)]
mod tests {
    use super::*;

    // --- OscillationDetector tests ---

    #[test]
    fn first_patch_records_and_applies() {
        let det = OscillationDetector::new();
        assert!(det.check_and_record("s", 42));
        assert_eq!(det.depth("s"), 1);
    }

    #[test]
    fn same_patch_repeated_blocks_apply() {
        let det = OscillationDetector::new();
        assert!(det.check_and_record("s", 42));
        // Second time around — oscillation, suppress.
        assert!(!det.check_and_record("s", 42));
        // History should still show only one entry — we don't
        // record on skipped applies.
        assert_eq!(det.depth("s"), 1);
    }

    #[test]
    fn distinct_patches_compose_until_window_fills() {
        let det = OscillationDetector::new();
        for i in 0..(HISTORY_DEPTH as u64) {
            assert!(det.check_and_record("s", i));
        }
        assert_eq!(det.depth("s"), HISTORY_DEPTH);
        // One more pushes the oldest out — old hash 0 is now
        // forgotten and would apply again (caller doesn't, in
        // practice, because the agent's no-double-patch guard
        // catches same-sequence repeats; this is the deeper
        // backoff for cross-sequence A→B→A).
        let next = HISTORY_DEPTH as u64;
        assert!(det.check_and_record("s", next));
        assert_eq!(det.depth("s"), HISTORY_DEPTH);
        // Hash 0 fell out — would be allowed if proposed again.
        assert!(det.check_and_record("s", 0));
    }

    #[test]
    fn detects_a_b_a_pattern() {
        let det = OscillationDetector::new();
        let a: u64 = 0xAAAA;
        let b: u64 = 0xBBBB;
        assert!(det.check_and_record("s", a)); // A applied
        assert!(det.check_and_record("s", b)); // B applied
        // C is fine — fresh hash.
        assert!(det.check_and_record("s", 0xCCCC));
        // Re-proposing A within the window — block.
        assert!(!det.check_and_record("s", a));
    }

    #[test]
    fn reset_clears_history() {
        let det = OscillationDetector::new();
        det.check_and_record("s", 1);
        det.check_and_record("s", 2);
        det.reset("s");
        assert_eq!(det.depth("s"), 0);
        // After reset, the old hashes are free to apply again.
        assert!(det.check_and_record("s", 1));
    }

    #[test]
    fn per_surface_history_isolated() {
        let det = OscillationDetector::new();
        assert!(det.check_and_record("a", 42));
        // Same hash on a different surface is independent: "b" has
        // never seen 42, so it must still be allowed there. Without
        // this assertion a detector that shared one history across
        // all surfaces would pass the test.
        assert!(det.check_and_record("b", 42), "history is per-surface");
        // Each surface now blocks its own repeat.
        assert!(!det.check_and_record("a", 42));
        assert!(!det.check_and_record("b", 42));
        assert_eq!(det.depth("a"), 1);
        assert_eq!(det.depth("b"), 1);
    }

    // --- IterationBudget tests ---

    #[test]
    fn iteration_budget_starts_full() {
        let b = IterationBudget::with_max(3);
        assert_eq!(b.count("s"), 0);
        assert!(b.try_consume("s"));
        assert_eq!(b.count("s"), 1);
    }

    #[test]
    fn iteration_budget_caps_after_max() {
        let b = IterationBudget::with_max(2);
        assert!(b.try_consume("s"));
        assert!(b.try_consume("s"));
        // Cap reached — further try_consume calls return false
        // and DO NOT bump the counter.
        assert!(!b.try_consume("s"));
        assert!(!b.try_consume("s"));
        assert_eq!(b.count("s"), 2);
    }

    #[test]
    fn iteration_budget_reset_clears() {
        let b = IterationBudget::with_max(2);
        b.try_consume("s");
        b.try_consume("s");
        assert!(!b.try_consume("s"));
        b.reset("s");
        assert!(b.try_consume("s"));
        assert_eq!(b.count("s"), 1);
    }

    #[test]
    fn iteration_budget_per_surface() {
        let b = IterationBudget::with_max(1);
        assert!(b.try_consume("a"));
        assert!(!b.try_consume("a"));
        assert!(b.try_consume("b"), "budget is per-surface");
    }

    #[test]
    fn iteration_budget_refund_releases_slot() {
        let b = IterationBudget::with_max(2);
        assert!(b.try_consume("s"));
        assert!(b.try_consume("s"));
        assert!(!b.try_consume("s"), "at cap");
        b.refund("s");
        assert!(b.try_consume("s"), "refund opens a slot back up");
    }

    #[test]
    fn iteration_budget_refund_saturates_at_zero() {
        let b = IterationBudget::with_max(2);
        // No consume yet — stray refund should not underflow.
        b.refund("s");
        b.refund("s");
        assert_eq!(b.count("s"), 0);
        assert!(b.try_consume("s"));
    }

    #[test]
    fn iteration_budget_concurrent_consume_respects_cap() {
        use std::sync::Arc;
        // 32 threads all try to consume on the same surface
        // simultaneously. Cap is 10 — exactly 10 should succeed,
        // no more.
        let b = Arc::new(IterationBudget::with_max(10));
        let mut handles = vec![];
        for _ in 0..32 {
            let b = b.clone();
            handles.push(std::thread::spawn(move || b.try_consume("s")));
        }
        let successes = handles
            .into_iter()
            .map(|h| h.join().unwrap())
            .filter(|ok| *ok)
            .count();
        assert_eq!(successes, 10, "exactly cap-many consumes must succeed");
        assert_eq!(b.count("s"), 10);
    }
}