car_server_core/ui_agent_loop.rs
1//! Runtime-side convergence machinery for the UI-improvement loop.
2//!
3//! The agent itself is stateless about loop-level concerns: it
4//! decides one patch at a time, on the current report, with a
5//! double-patch guard for the same render. What it cannot see is
6//! the *temporal* shape of its own decisions across renders —
7//! whether successive reports are walking through the same patch
8//! over and over (A→B→A oscillation), or whether the loop is
9//! making forward progress.
10//!
11//! That observation belongs to the caller, per neo's review:
12//! "controllers use workqueue backoff; reconcilers stay stateless."
13//! This module owns the per-surface history needed to detect
14//! oscillation and short-circuit the apply path when one is
15//! suspected.
16
17use std::collections::VecDeque;
18
19use dashmap::DashMap;
20
/// Upper bound on agent-driven patches a single surface may receive before
/// the runtime hard-stops the loop for it.
///
/// The figure deliberately sits well above the typical convergence depth of
/// the strategy library (1-2 rounds for the builtin strategies, plausibly
/// 3-5 for richer libraries). It stays small enough that an oscillating or
/// buggy strategy cannot run away. A process that needs a different ceiling
/// constructs its budget with `IterationBudget::with_max` rather than
/// `IterationBudget::new`.
pub const DEFAULT_MAX_ITERATIONS: u32 = 20;
29
/// Size of the per-surface sliding window of recently applied patch hashes.
///
/// Sized for the v1 loop, where strategies converge in 1-2 rounds and an
/// oscillation usually shows up as the newest proposal matching the last
/// applied one. A handful of entries is therefore enough. Growing the window
/// costs cache locality and only pays off once patch graphs get wider.
pub const HISTORY_DEPTH: usize = 6;
36
37/// Per-surface oscillation detector. Records recent `patch_hash`
38/// values from `Decision::Patch` outcomes and reports whether a
39/// fresh proposal would repeat one we've seen recently — that's
40/// the A→B→A pattern the loop must not commit to.
41///
42/// Thread-safety: backed by `DashMap` so the handler can hold it
43/// behind `Arc` and call `check_and_record` from any async context
44/// without taking a tokio Mutex first.
45#[derive(Default)]
46pub struct OscillationDetector {
47 history: DashMap<String, VecDeque<u64>>,
48}
49
50impl OscillationDetector {
51 pub fn new() -> Self {
52 Self::default()
53 }
54
55 /// Test whether `patch_hash` would oscillate against the
56 /// surface's recent history, and either record it (no
57 /// oscillation, proceed) or leave history untouched
58 /// (oscillation, caller suppresses the apply).
59 ///
60 /// Returns `true` when the caller should APPLY the patch.
61 /// Returns `false` when the patch repeats one in the recent
62 /// window — caller logs and skips.
63 pub fn check_and_record(&self, surface_id: &str, patch_hash: u64) -> bool {
64 let mut entry = self
65 .history
66 .entry(surface_id.to_string())
67 .or_insert_with(|| VecDeque::with_capacity(HISTORY_DEPTH));
68 if entry.iter().any(|h| *h == patch_hash) {
69 // Caller will skip; don't record so the window slides
70 // forward only on actual applies.
71 return false;
72 }
73 if entry.len() >= HISTORY_DEPTH {
74 entry.pop_front();
75 }
76 entry.push_back(patch_hash);
77 true
78 }
79
80 /// Clear history for a surface — use when the surface is
81 /// recreated (sequence counter resets, prior history is
82 /// stale).
83 pub fn reset(&self, surface_id: &str) {
84 self.history.remove(surface_id);
85 }
86
87 /// How many entries are tracked for a surface. Test-only
88 /// surface; not part of the steady-state hot path.
89 #[cfg(test)]
90 pub fn depth(&self, surface_id: &str) -> usize {
91 self.history.get(surface_id).map(|e| e.len()).unwrap_or(0)
92 }
93}
94
95/// Per-surface iteration budget. Counts agent-driven patches per
96/// surface; once a surface hits the cap, the handler short-circuits
97/// the agent entirely until the surface is reset (typically via
98/// `delete_surface` + `create_surface`, or an explicit `reset`).
99///
100/// The budget is in-memory: a single-process daemon keeps its
101/// counters for the lifetime of the process. A durable backend
102/// using `car-state::StateStore::durable` is a v2 — the runtime's
103/// transient strategy library doesn't yet justify persisting
104/// iteration counts across restarts.
105pub struct IterationBudget {
106 counts: DashMap<String, u32>,
107 max_iterations: u32,
108}
109
110impl IterationBudget {
111 pub fn new() -> Self {
112 Self::with_max(DEFAULT_MAX_ITERATIONS)
113 }
114
115 pub fn with_max(max: u32) -> Self {
116 Self {
117 counts: DashMap::new(),
118 max_iterations: max,
119 }
120 }
121
122 /// Atomic check-and-increment. Returns `true` when the surface
123 /// had budget AND the count has been bumped to claim it;
124 /// `false` when the surface is at the cap and the caller must
125 /// skip the agent invocation. The two steps happen under
126 /// DashMap's per-shard lock so concurrent reports for the same
127 /// surface can't both pass at `count = max - 1`.
128 ///
129 /// Pairs with `refund` for the failed-apply path: the budget
130 /// is consumed up-front; if the patch apply errors, the
131 /// handler calls `refund` to release the slot. That keeps the
132 /// "budget reflects successful applies" invariant while
133 /// eliminating the original `has_budget + record_apply` race.
134 pub fn try_consume(&self, surface_id: &str) -> bool {
135 let mut entry = self.counts.entry(surface_id.to_string()).or_insert(0);
136 if *entry >= self.max_iterations {
137 return false;
138 }
139 *entry += 1;
140 true
141 }
142
143 /// Release a budget slot consumed by `try_consume` when the
144 /// downstream apply turned out to fail. Saturates at zero so a
145 /// stray refund without a matching consume can't drive the
146 /// counter negative.
147 pub fn refund(&self, surface_id: &str) {
148 let mut entry = self.counts.entry(surface_id.to_string()).or_insert(0);
149 *entry = entry.saturating_sub(1);
150 }
151
152 /// Clear the surface's counter. Surface recreation triggers
153 /// this via the caller; the loop is allowed to start over.
154 pub fn reset(&self, surface_id: &str) {
155 self.counts.remove(surface_id);
156 }
157
158 /// Current iteration count for a surface. Public so the
159 /// hard-stop log can include it.
160 pub fn count(&self, surface_id: &str) -> u32 {
161 self.counts.get(surface_id).map(|c| *c).unwrap_or(0)
162 }
163
164 pub fn max(&self) -> u32 {
165 self.max_iterations
166 }
167}
168
169impl Default for IterationBudget {
170 fn default() -> Self {
171 Self::new()
172 }
173}
174
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn first_patch_records_and_applies() {
        let det = OscillationDetector::new();
        assert!(det.check_and_record("s", 42));
        assert_eq!(det.depth("s"), 1);
    }

    #[test]
    fn same_patch_repeated_blocks_apply() {
        let det = OscillationDetector::new();
        assert!(det.check_and_record("s", 42));
        // Second time around — oscillation, suppress.
        assert!(!det.check_and_record("s", 42));
        // History should still show only one entry — we don't
        // record on skipped applies.
        assert_eq!(det.depth("s"), 1);
    }

    #[test]
    fn distinct_patches_compose_until_window_fills() {
        let det = OscillationDetector::new();
        for i in 0..(HISTORY_DEPTH as u64) {
            assert!(det.check_and_record("s", i));
        }
        assert_eq!(det.depth("s"), HISTORY_DEPTH);
        // One more pushes the oldest out — old hash 0 is now
        // forgotten and would apply again (caller doesn't, in
        // practice, because the agent's no-double-patch guard
        // catches same-sequence repeats; this is the deeper
        // backoff for cross-sequence A→B→A).
        let next = HISTORY_DEPTH as u64;
        assert!(det.check_and_record("s", next));
        assert_eq!(det.depth("s"), HISTORY_DEPTH);
        // Hash 0 fell out — would be allowed if proposed again.
        assert!(det.check_and_record("s", 0));
    }

    #[test]
    fn blocked_repeat_does_not_slide_window() {
        let det = OscillationDetector::new();
        for i in 0..(HISTORY_DEPTH as u64) {
            assert!(det.check_and_record("s", i));
        }
        // Re-proposing the oldest hash is blocked, and because skipped
        // applies are not recorded, it must not evict anything either.
        assert!(!det.check_and_record("s", 0));
        assert_eq!(det.depth("s"), HISTORY_DEPTH);
        assert!(!det.check_and_record("s", 0), "oldest hash still in window");
    }

    #[test]
    fn detects_a_b_a_pattern() {
        let det = OscillationDetector::new();
        let a: u64 = 0xAAAA;
        let b: u64 = 0xBBBB;
        assert!(det.check_and_record("s", a)); // A applied
        assert!(det.check_and_record("s", b)); // B applied
        // C is fine — fresh hash
        assert!(det.check_and_record("s", 0xCCCC));
        // Re-proposing A within the window — block.
        assert!(!det.check_and_record("s", a));
    }

    #[test]
    fn reset_clears_history() {
        let det = OscillationDetector::new();
        assert!(det.check_and_record("s", 1));
        assert!(det.check_and_record("s", 2));
        det.reset("s");
        assert_eq!(det.depth("s"), 0);
        // After reset, the old hashes are free to apply again.
        assert!(det.check_and_record("s", 1));
    }

    #[test]
    fn per_surface_history_isolated() {
        let det = OscillationDetector::new();
        assert!(det.check_and_record("a", 42));
        // Same hash on a different surface is independent: "a"'s
        // history must not block the first proposal on "b".
        assert!(det.check_and_record("b", 42), "history is per-surface");
        // Each surface still blocks its own repeat.
        assert!(!det.check_and_record("a", 42));
        assert!(!det.check_and_record("b", 42));
        assert_eq!(det.depth("a"), 1);
        assert_eq!(det.depth("b"), 1);
        // Resetting one surface leaves the other's history intact.
        det.reset("a");
        assert_eq!(det.depth("a"), 0);
        assert_eq!(det.depth("b"), 1);
    }

    // --- IterationBudget tests ---

    #[test]
    fn iteration_budget_starts_full() {
        let b = IterationBudget::with_max(3);
        assert_eq!(b.count("s"), 0);
        assert!(b.try_consume("s"));
        assert_eq!(b.count("s"), 1);
    }

    #[test]
    fn iteration_budget_caps_after_max() {
        let b = IterationBudget::with_max(2);
        assert!(b.try_consume("s"));
        assert!(b.try_consume("s"));
        // Cap reached — further try_consume calls return false
        // and DO NOT bump the counter.
        assert!(!b.try_consume("s"));
        assert!(!b.try_consume("s"));
        assert_eq!(b.count("s"), 2);
    }

    #[test]
    fn iteration_budget_zero_max_never_grants() {
        let b = IterationBudget::with_max(0);
        assert_eq!(b.max(), 0);
        assert!(!b.try_consume("s"));
        assert_eq!(b.count("s"), 0);
    }

    #[test]
    fn iteration_budget_reset_clears() {
        let b = IterationBudget::with_max(2);
        assert!(b.try_consume("s"));
        assert!(b.try_consume("s"));
        assert!(!b.try_consume("s"));
        b.reset("s");
        assert_eq!(b.count("s"), 0);
        assert!(b.try_consume("s"));
        assert_eq!(b.count("s"), 1);
    }

    #[test]
    fn iteration_budget_per_surface() {
        let b = IterationBudget::with_max(1);
        assert!(b.try_consume("a"));
        assert!(!b.try_consume("a"));
        assert!(b.try_consume("b"), "budget is per-surface");
    }

    #[test]
    fn iteration_budget_refund_releases_slot() {
        let b = IterationBudget::with_max(2);
        assert!(b.try_consume("s"));
        assert!(b.try_consume("s"));
        assert!(!b.try_consume("s"), "at cap");
        b.refund("s");
        assert_eq!(b.count("s"), 1);
        assert!(b.try_consume("s"), "refund opens a slot back up");
    }

    #[test]
    fn iteration_budget_refund_saturates_at_zero() {
        let b = IterationBudget::with_max(2);
        // No consume yet — stray refund should not underflow.
        b.refund("s");
        b.refund("s");
        assert_eq!(b.count("s"), 0);
        assert!(b.try_consume("s"));
    }

    #[test]
    fn iteration_budget_concurrent_consume_respects_cap() {
        use std::sync::Arc;
        // 32 threads all try to consume on the same surface
        // simultaneously. Cap is 10 — exactly 10 should succeed,
        // no more.
        let b = Arc::new(IterationBudget::with_max(10));
        let handles: Vec<_> = (0..32)
            .map(|_| {
                let b = Arc::clone(&b);
                std::thread::spawn(move || b.try_consume("s"))
            })
            .collect();
        let successes = handles
            .into_iter()
            .map(|h| h.join().unwrap())
            .filter(|ok| *ok)
            .count();
        assert_eq!(successes, 10, "exactly cap-many consumes must succeed");
        assert_eq!(b.count("s"), 10);
    }
}