car_server_core/ui_agent_loop.rs
1//! Runtime-side convergence machinery for the UI-improvement loop.
2//!
3//! The agent itself is stateless about loop-level concerns: it
4//! decides one patch at a time, on the current report, with a
5//! double-patch guard for the same render. What it cannot see is
6//! the *temporal* shape of its own decisions across renders —
7//! whether successive reports are walking through the same patch
8//! over and over (A→B→A oscillation), or whether the loop is
9//! making forward progress.
10//!
11//! That observation belongs to the caller, per neo's review:
12//! "controllers use workqueue backoff; reconcilers stay stateless."
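//! This module owns the per-surface history needed to detect
//! oscillation and short-circuit the apply path when one is
//! suspected, plus the per-surface iteration budget that
//! hard-stops a loop which keeps patching without converging.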
16
17use std::collections::VecDeque;
18
19use dashmap::DashMap;
20
/// Default cap on agent-driven patches per surface.
///
/// Once a surface has consumed this many patches the runtime
/// hard-stops the loop for it. The value sits comfortably above
/// the strategy library's typical convergence depth (1-2 rounds
/// for the builtin strategies, plausibly 3-5 for richer
/// libraries) while staying tight enough that an oscillating or
/// buggy strategy cannot run away. To use a different cap for a
/// process, build the budget with `IterationBudget::with_max`
/// instead of `IterationBudget::new`.
pub const DEFAULT_MAX_ITERATIONS: u32 = 20;
29
/// Size of the per-surface window of recent patch hashes.
///
/// Sized for the v1 loop, where strategies converge in 1-2 rounds
/// and an oscillation usually shows up as `last == current`, so a
/// short window is plenty. Raising it hurts cache locality and
/// lengthens the per-check scan of the window, which will not pay
/// for itself until patch graphs get wider.
pub const HISTORY_DEPTH: usize = 6;
36
37/// Per-surface oscillation detector. Records recent `patch_hash`
38/// values from `Decision::Patch` outcomes and reports whether a
39/// fresh proposal would repeat one we've seen recently — that's
40/// the A→B→A pattern the loop must not commit to.
41///
42/// Thread-safety: backed by `DashMap` so the handler can hold it
43/// behind `Arc` and call `check_and_record` from any async context
44/// without taking a tokio Mutex first.
45#[derive(Default)]
46pub struct OscillationDetector {
47 history: DashMap<String, VecDeque<u64>>,
48}
49
50impl OscillationDetector {
51 pub fn new() -> Self {
52 Self::default()
53 }
54
55 /// Test whether `patch_hash` would oscillate against the
56 /// surface's recent history, and either record it (no
57 /// oscillation, proceed) or leave history untouched
58 /// (oscillation, caller suppresses the apply).
59 ///
60 /// Returns `true` when the caller should APPLY the patch.
61 /// Returns `false` when the patch repeats one in the recent
62 /// window — caller logs and skips.
63 pub fn check_and_record(&self, surface_id: &str, patch_hash: u64) -> bool {
64 let mut entry = self
65 .history
66 .entry(surface_id.to_string())
67 .or_insert_with(|| VecDeque::with_capacity(HISTORY_DEPTH));
68 if entry.iter().any(|h| *h == patch_hash) {
69 // Caller will skip; don't record so the window slides
70 // forward only on actual applies.
71 return false;
72 }
73 if entry.len() >= HISTORY_DEPTH {
74 entry.pop_front();
75 }
76 entry.push_back(patch_hash);
77 true
78 }
79
80 /// Clear history for a surface — use when the surface is
81 /// recreated (sequence counter resets, prior history is
82 /// stale).
83 pub fn reset(&self, surface_id: &str) {
84 self.history.remove(surface_id);
85 }
86
87 /// How many entries are tracked for a surface. Test-only
88 /// surface; not part of the steady-state hot path.
89 #[cfg(test)]
90 pub fn depth(&self, surface_id: &str) -> usize {
91 self.history
92 .get(surface_id)
93 .map(|e| e.len())
94 .unwrap_or(0)
95 }
96}
97
98/// Per-surface iteration budget. Counts agent-driven patches per
99/// surface; once a surface hits the cap, the handler short-circuits
100/// the agent entirely until the surface is reset (typically via
101/// `delete_surface` + `create_surface`, or an explicit `reset`).
102///
103/// The budget is in-memory: a single-process daemon keeps its
104/// counters for the lifetime of the process. A durable backend
105/// using `car-state::StateStore::durable` is a v2 — the runtime's
106/// transient strategy library doesn't yet justify persisting
107/// iteration counts across restarts.
108pub struct IterationBudget {
109 counts: DashMap<String, u32>,
110 max_iterations: u32,
111}
112
113impl IterationBudget {
114 pub fn new() -> Self {
115 Self::with_max(DEFAULT_MAX_ITERATIONS)
116 }
117
118 pub fn with_max(max: u32) -> Self {
119 Self {
120 counts: DashMap::new(),
121 max_iterations: max,
122 }
123 }
124
125 /// Atomic check-and-increment. Returns `true` when the surface
126 /// had budget AND the count has been bumped to claim it;
127 /// `false` when the surface is at the cap and the caller must
128 /// skip the agent invocation. The two steps happen under
129 /// DashMap's per-shard lock so concurrent reports for the same
130 /// surface can't both pass at `count = max - 1`.
131 ///
132 /// Pairs with `refund` for the failed-apply path: the budget
133 /// is consumed up-front; if the patch apply errors, the
134 /// handler calls `refund` to release the slot. That keeps the
135 /// "budget reflects successful applies" invariant while
136 /// eliminating the original `has_budget + record_apply` race.
137 pub fn try_consume(&self, surface_id: &str) -> bool {
138 let mut entry = self.counts.entry(surface_id.to_string()).or_insert(0);
139 if *entry >= self.max_iterations {
140 return false;
141 }
142 *entry += 1;
143 true
144 }
145
146 /// Release a budget slot consumed by `try_consume` when the
147 /// downstream apply turned out to fail. Saturates at zero so a
148 /// stray refund without a matching consume can't drive the
149 /// counter negative.
150 pub fn refund(&self, surface_id: &str) {
151 let mut entry = self.counts.entry(surface_id.to_string()).or_insert(0);
152 *entry = entry.saturating_sub(1);
153 }
154
155 /// Clear the surface's counter. Surface recreation triggers
156 /// this via the caller; the loop is allowed to start over.
157 pub fn reset(&self, surface_id: &str) {
158 self.counts.remove(surface_id);
159 }
160
161 /// Current iteration count for a surface. Public so the
162 /// hard-stop log can include it.
163 pub fn count(&self, surface_id: &str) -> u32 {
164 self.counts.get(surface_id).map(|c| *c).unwrap_or(0)
165 }
166
167 pub fn max(&self) -> u32 {
168 self.max_iterations
169 }
170}
171
172impl Default for IterationBudget {
173 fn default() -> Self {
174 Self::new()
175 }
176}
177
#[cfg(test)]
mod tests {
    use super::*;

    // --- OscillationDetector tests ---

    #[test]
    fn first_patch_records_and_applies() {
        let det = OscillationDetector::new();
        assert!(det.check_and_record("s", 42));
        assert_eq!(det.depth("s"), 1);
    }

    #[test]
    fn same_patch_repeated_blocks_apply() {
        let det = OscillationDetector::new();
        assert!(det.check_and_record("s", 42));
        // Second time around — oscillation, suppress.
        assert!(!det.check_and_record("s", 42));
        // History should still show only one entry — we don't
        // record on skipped applies.
        assert_eq!(det.depth("s"), 1);
    }

    #[test]
    fn distinct_patches_compose_until_window_fills() {
        let det = OscillationDetector::new();
        for i in 0..(HISTORY_DEPTH as u64) {
            assert!(det.check_and_record("s", i));
        }
        assert_eq!(det.depth("s"), HISTORY_DEPTH);
        // One more pushes the oldest out — old hash 0 is now
        // forgotten and would apply again (caller doesn't, in
        // practice, because the agent's no-double-patch guard
        // catches same-sequence repeats; this is the deeper
        // backoff for cross-sequence A→B→A).
        let next = HISTORY_DEPTH as u64;
        assert!(det.check_and_record("s", next));
        assert_eq!(det.depth("s"), HISTORY_DEPTH);
        // Hash 0 fell out — would be allowed if proposed again.
        assert!(det.check_and_record("s", 0));
    }

    #[test]
    fn detects_a_b_a_pattern() {
        let det = OscillationDetector::new();
        let a: u64 = 0xAAAA;
        let b: u64 = 0xBBBB;
        assert!(det.check_and_record("s", a)); // A applied
        assert!(det.check_and_record("s", b)); // B applied
        // C is fine — fresh hash
        assert!(det.check_and_record("s", 0xCCCC));
        // Re-proposing A within the window — block.
        assert!(!det.check_and_record("s", a));
    }

    #[test]
    fn reset_clears_history() {
        let det = OscillationDetector::new();
        assert!(det.check_and_record("s", 1));
        assert!(det.check_and_record("s", 2));
        det.reset("s");
        assert_eq!(det.depth("s"), 0);
        // After reset, the old hashes are free to apply again.
        assert!(det.check_and_record("s", 1));
    }

    #[test]
    fn per_surface_history_isolated() {
        let det = OscillationDetector::new();
        // The same hash is fresh on every surface: recording it
        // for "a" must not turn it into a repeat for "b". Without
        // these two assertions the test would also pass against a
        // detector that shared one history across surfaces.
        assert!(det.check_and_record("a", 42));
        assert!(det.check_and_record("b", 42));
        // Each surface then blocks the repeat against its own
        // window.
        assert!(!det.check_and_record("a", 42));
        assert!(!det.check_and_record("b", 42));
        assert_eq!(det.depth("a"), 1);
        assert_eq!(det.depth("b"), 1);
    }

    // --- IterationBudget tests ---

    #[test]
    fn iteration_budget_starts_full() {
        let b = IterationBudget::with_max(3);
        assert_eq!(b.count("s"), 0);
        assert!(b.try_consume("s"));
        assert_eq!(b.count("s"), 1);
    }

    #[test]
    fn iteration_budget_caps_after_max() {
        let b = IterationBudget::with_max(2);
        assert!(b.try_consume("s"));
        assert!(b.try_consume("s"));
        // Cap reached — further try_consume calls return false
        // and DO NOT bump the counter.
        assert!(!b.try_consume("s"));
        assert!(!b.try_consume("s"));
        assert_eq!(b.count("s"), 2);
    }

    #[test]
    fn iteration_budget_reset_clears() {
        let b = IterationBudget::with_max(2);
        assert!(b.try_consume("s"));
        assert!(b.try_consume("s"));
        assert!(!b.try_consume("s"));
        b.reset("s");
        assert!(b.try_consume("s"));
        assert_eq!(b.count("s"), 1);
    }

    #[test]
    fn iteration_budget_per_surface() {
        let b = IterationBudget::with_max(1);
        assert!(b.try_consume("a"));
        assert!(!b.try_consume("a"));
        assert!(b.try_consume("b"), "budget is per-surface");
    }

    #[test]
    fn iteration_budget_refund_releases_slot() {
        let b = IterationBudget::with_max(2);
        assert!(b.try_consume("s"));
        assert!(b.try_consume("s"));
        assert!(!b.try_consume("s"), "at cap");
        b.refund("s");
        assert!(b.try_consume("s"), "refund opens a slot back up");
    }

    #[test]
    fn iteration_budget_refund_saturates_at_zero() {
        let b = IterationBudget::with_max(2);
        // No consume yet — stray refund should not underflow.
        b.refund("s");
        b.refund("s");
        assert_eq!(b.count("s"), 0);
        assert!(b.try_consume("s"));
    }

    #[test]
    fn iteration_budget_concurrent_consume_respects_cap() {
        use std::sync::Arc;
        // 32 threads all try to consume on the same surface
        // simultaneously. Cap is 10 — exactly 10 should succeed,
        // no more.
        let b = Arc::new(IterationBudget::with_max(10));
        let mut handles = vec![];
        for _ in 0..32 {
            let b = b.clone();
            handles.push(std::thread::spawn(move || b.try_consume("s")));
        }
        let successes = handles
            .into_iter()
            .map(|h| h.join().unwrap())
            .filter(|ok| *ok)
            .count();
        assert_eq!(successes, 10, "exactly cap-many consumes must succeed");
        assert_eq!(b.count("s"), 10);
    }
}
334}