// pi/hostcall_amac.rs
1//! AMAC-style interleaved hostcall batch executor with stall-aware toggling.
2//!
3//! AMAC (Asynchronous Memory Access Chaining) interleaves multiple independent
4//! hostcall state machines per scheduler tick to hide memory-access latency.
5//! When the working set exceeds the LLC, sequential dispatch stalls on cache
6//! misses; interleaving lets one request's computation overlap another's
7//! memory fetch, improving throughput by up to the memory-level parallelism
8//! ratio.
9//!
10//! The executor dynamically toggles between batched-interleaved and sequential
11//! dispatch based on observed per-call timing telemetry as a proxy for LLC miss
12//! rates and stall cycles.
13
14use crate::extensions_js::HostcallKind;
15use crate::extensions_js::HostcallRequest;
16use crate::scheduler::HostcallOutcome;
17use serde::{Deserialize, Serialize};
18
19// ── Configuration constants ──────────────────────────────────────────────
20
/// Minimum batch size to consider AMAC interleaving (below this, sequential
/// dispatch has less overhead).
const AMAC_MIN_BATCH_SIZE: usize = 4;

/// Maximum number of in-flight state machines per interleave round.
const AMAC_MAX_INTERLEAVE_WIDTH: usize = 16;

/// Stall-detection threshold: if a request takes longer than this many
/// nanoseconds, it's treated as a "stall" (likely LLC miss or IO wait).
const AMAC_STALL_THRESHOLD_NS: u64 = 100_000; // 100us

/// Exponential moving average decay factor (fixed-point, 0..256 maps to 0..1).
/// EMA_ALPHA=51 ≈ 0.2, giving 80% weight to history.
const EMA_ALPHA: u64 = 51;
/// Fixed-point denominator for the EMA arithmetic (`EMA_ALPHA / EMA_SCALE`
/// is the effective alpha).
const EMA_SCALE: u64 = 256;

/// Minimum stall ratio (fixed-point, 0..1000) to enable AMAC interleaving.
/// 200 = 20% stall rate.
const AMAC_STALL_RATIO_THRESHOLD: u64 = 200;

/// Maximum ratio (fixed-point, 0..1000) above which we assume all calls are
/// memory-bound and interleaving provides maximum benefit.
const AMAC_STALL_RATIO_SATURATED: u64 = 800;

/// How many recent timing samples to retain for decision-making.
const TELEMETRY_WINDOW_SIZE: usize = 64;
47
48// ── Core types ───────────────────────────────────────────────────────────
49
/// Grouping key for batching compatible hostcall requests together.
///
/// Note: `PartialOrd`/`Ord` are derived, so ordering follows variant
/// declaration order — do not reorder variants without checking callers
/// that sort by this key.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub enum AmacGroupKey {
    /// Session read operations (get_state, get_messages, etc.) - highly batchable.
    SessionRead,
    /// Session write operations (set_model, set_name, etc.) - must preserve order.
    SessionWrite,
    /// Event queries (get_model, get_flag, list_flags) - batchable.
    EventRead,
    /// Event mutations (set_model, register_*) - preserve order.
    EventWrite,
    /// Tool invocations - independent, can interleave.
    Tool,
    /// Exec invocations - independent but may have side effects.
    Exec,
    /// HTTP requests - independent, high latency → good AMAC candidates.
    Http,
    /// UI operations - typically sequential.
    Ui,
    /// Log operations - fire-and-forget, trivially batchable.
    Log,
}
72
73impl AmacGroupKey {
74    /// Classify a hostcall request into its batch group.
75    #[must_use]
76    pub fn from_request(request: &HostcallRequest) -> Self {
77        match &request.kind {
78            HostcallKind::Session { op } => {
79                if is_session_read_op(op) {
80                    Self::SessionRead
81                } else {
82                    Self::SessionWrite
83                }
84            }
85            HostcallKind::Events { op } => {
86                if is_event_read_op(op) {
87                    Self::EventRead
88                } else {
89                    Self::EventWrite
90                }
91            }
92            HostcallKind::Tool { .. } => Self::Tool,
93            HostcallKind::Exec { .. } => Self::Exec,
94            HostcallKind::Http => Self::Http,
95            HostcallKind::Ui { .. } => Self::Ui,
96            HostcallKind::Log => Self::Log,
97        }
98    }
99
100    /// Whether requests in this group are safe to interleave (no ordering
101    /// dependencies within the group).
102    #[must_use]
103    pub const fn interleave_safe(&self) -> bool {
104        matches!(
105            self,
106            Self::SessionRead | Self::EventRead | Self::Tool | Self::Http | Self::Log
107        )
108    }
109
110    /// Estimated memory-boundedness weight for this group (0..100).
111    /// Higher means more likely to benefit from interleaving.
112    #[must_use]
113    pub const fn memory_weight(&self) -> u32 {
114        match self {
115            Self::Http => 90,              // Network IO = high stall
116            Self::Tool | Self::Exec => 70, // File IO or subprocess
117            Self::SessionRead => 50,       // In-memory but large working set
118            Self::EventRead => 40,         // Small working set, fast
119            Self::SessionWrite => 30,
120            Self::EventWrite => 20,
121            Self::Ui => 10,
122            Self::Log => 5,
123        }
124    }
125}
126
/// A group of hostcall requests that share a batch key and can be
/// dispatched together.
#[derive(Debug)]
pub struct AmacBatchGroup {
    /// The batch key for this group.
    pub key: AmacGroupKey,
    /// Requests in this group, in original drain order.
    pub requests: Vec<HostcallRequest>,
}
136
137impl AmacBatchGroup {
138    #[must_use]
139    pub fn len(&self) -> usize {
140        self.requests.len()
141    }
142
143    #[must_use]
144    pub fn is_empty(&self) -> bool {
145        self.requests.is_empty()
146    }
147}
148
/// The decision of whether to use AMAC interleaving or sequential dispatch.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum AmacToggleDecision {
    /// Use interleaved dispatch for this batch.
    Interleave {
        /// Number of concurrent state machines per round.
        width: usize,
    },
    /// Use sequential dispatch (AMAC overhead not justified).
    Sequential {
        /// Reason for falling back (static label, e.g. "batch_too_small").
        reason: &'static str,
    },
}
163
164impl AmacToggleDecision {
165    #[must_use]
166    pub const fn is_interleave(&self) -> bool {
167        matches!(self, Self::Interleave { .. })
168    }
169}
170
/// Per-call timing observation for stall detection.
///
/// Internal to the telemetry window; not serialized (the containing field
/// is `#[serde(skip)]`).
#[derive(Debug, Clone, Copy)]
struct TimingSample {
    /// Wall-clock nanoseconds for this dispatch.
    elapsed_ns: u64,
    /// Whether this was classified as a stall (elapsed above threshold).
    stalled: bool,
}
179
/// Stall telemetry tracker using exponential moving averages.
///
/// EMAs are stored pre-scaled by `EMA_SCALE` (×256 fixed point); accessors
/// divide the scale back out.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AmacStallTelemetry {
    /// EMA of per-call dispatch time (nanoseconds, fixed-point ×256).
    ema_elapsed_scaled: u64,
    /// EMA of stall ratio (fixed-point, 0..1000 ×256).
    ema_stall_ratio_scaled: u64,
    /// Total calls observed.
    total_calls: u64,
    /// Total stalls observed.
    total_stalls: u64,
    /// Recent timing window for variance estimation.
    /// Skipped by serde: a deserialized tracker starts with an empty window.
    #[serde(skip)]
    recent_samples: Vec<TimingSample>,
    /// Stall threshold in nanoseconds.
    stall_threshold_ns: u64,
    /// Number of AMAC toggle decisions made.
    pub toggle_decisions: u64,
    /// Number of times AMAC was selected over sequential.
    pub interleave_selections: u64,
}
201
impl Default for AmacStallTelemetry {
    /// Construct with the default stall threshold (`AMAC_STALL_THRESHOLD_NS`).
    fn default() -> Self {
        Self::new(AMAC_STALL_THRESHOLD_NS)
    }
}
207
208impl AmacStallTelemetry {
209    #[must_use]
210    pub fn new(stall_threshold_ns: u64) -> Self {
211        Self {
212            ema_elapsed_scaled: 0,
213            ema_stall_ratio_scaled: 0,
214            total_calls: 0,
215            total_stalls: 0,
216            recent_samples: Vec::with_capacity(TELEMETRY_WINDOW_SIZE),
217            stall_threshold_ns,
218            toggle_decisions: 0,
219            interleave_selections: 0,
220        }
221    }
222
223    /// Record a timing observation.
224    pub fn record(&mut self, elapsed_ns: u64) {
225        let stalled = elapsed_ns > self.stall_threshold_ns;
226        self.total_calls = self.total_calls.saturating_add(1);
227        if stalled {
228            self.total_stalls = self.total_stalls.saturating_add(1);
229        }
230
231        // Update EMA for elapsed time.
232        let scaled_elapsed = elapsed_ns.saturating_mul(EMA_SCALE);
233        self.ema_elapsed_scaled = if self.total_calls == 1 {
234            scaled_elapsed
235        } else {
236            // EMA = alpha * new + (1 - alpha) * old
237            let alpha_new = scaled_elapsed.saturating_mul(EMA_ALPHA) / EMA_SCALE;
238            let alpha_old = self
239                .ema_elapsed_scaled
240                .saturating_mul(EMA_SCALE.saturating_sub(EMA_ALPHA))
241                / EMA_SCALE;
242            alpha_new.saturating_add(alpha_old)
243        };
244
245        // Update EMA for stall ratio.
246        let stall_point = if stalled { 1000 * EMA_SCALE } else { 0 };
247        self.ema_stall_ratio_scaled = if self.total_calls == 1 {
248            stall_point
249        } else {
250            let alpha_new = stall_point.saturating_mul(EMA_ALPHA) / EMA_SCALE;
251            let alpha_old = self
252                .ema_stall_ratio_scaled
253                .saturating_mul(EMA_SCALE.saturating_sub(EMA_ALPHA))
254                / EMA_SCALE;
255            alpha_new.saturating_add(alpha_old)
256        };
257
258        // Maintain sliding window.
259        let sample = TimingSample {
260            elapsed_ns,
261            stalled,
262        };
263        if self.recent_samples.len() >= TELEMETRY_WINDOW_SIZE {
264            self.recent_samples.remove(0);
265        }
266        self.recent_samples.push(sample);
267    }
268
269    /// Current smoothed stall ratio (0..1000).
270    #[must_use]
271    pub fn stall_ratio(&self) -> u64 {
272        self.ema_stall_ratio_scaled / EMA_SCALE.max(1)
273    }
274
275    /// Current smoothed average elapsed nanoseconds.
276    #[must_use]
277    pub fn avg_elapsed_ns(&self) -> u64 {
278        self.ema_elapsed_scaled / EMA_SCALE.max(1)
279    }
280
281    /// Variance of recent timing samples (nanoseconds squared).
282    #[must_use]
283    pub fn recent_variance(&self) -> u64 {
284        if self.recent_samples.len() < 2 {
285            return 0;
286        }
287        let n = self.recent_samples.len() as u64;
288        let sum: u64 = self
289            .recent_samples
290            .iter()
291            .fold(0u64, |acc, sample| acc.saturating_add(sample.elapsed_ns));
292        let mean = sum / n;
293        let variance_u128 = self
294            .recent_samples
295            .iter()
296            .map(|sample| {
297                let diff = u128::from(sample.elapsed_ns.abs_diff(mean));
298                diff * diff
299            })
300            .fold(0u128, u128::saturating_add)
301            / u128::from(n);
302        u64::try_from(variance_u128).unwrap_or(u64::MAX)
303    }
304
305    /// Snapshot of current telemetry state.
306    #[must_use]
307    pub fn snapshot(&self) -> AmacStallTelemetrySnapshot {
308        AmacStallTelemetrySnapshot {
309            stall_ratio: self.stall_ratio(),
310            avg_elapsed_ns: self.avg_elapsed_ns(),
311            recent_variance: self.recent_variance(),
312            total_calls: self.total_calls,
313            total_stalls: self.total_stalls,
314            stall_threshold_ns: self.stall_threshold_ns,
315            toggle_decisions: self.toggle_decisions,
316            interleave_selections: self.interleave_selections,
317            recent_window_size: self.recent_samples.len(),
318        }
319    }
320}
321
/// Immutable snapshot of stall telemetry for reporting.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AmacStallTelemetrySnapshot {
    /// Smoothed stall ratio at snapshot time (0..1000).
    pub stall_ratio: u64,
    /// Smoothed average per-call dispatch time in nanoseconds.
    pub avg_elapsed_ns: u64,
    /// Variance of the recent timing window (nanoseconds squared).
    pub recent_variance: u64,
    /// Total calls observed.
    pub total_calls: u64,
    /// Total calls classified as stalls.
    pub total_stalls: u64,
    /// Stall classification threshold in nanoseconds.
    pub stall_threshold_ns: u64,
    /// Number of AMAC toggle decisions made.
    pub toggle_decisions: u64,
    /// Number of times interleaving was selected over sequential.
    pub interleave_selections: u64,
    /// Number of samples currently in the sliding window.
    pub recent_window_size: usize,
}
335
336// ── Batch plan ───────────────────────────────────────────────────────────
337
/// Execution plan for a batch of hostcall requests.
#[derive(Debug)]
pub struct AmacBatchPlan {
    /// Groups to dispatch, in priority order.
    pub groups: Vec<AmacBatchGroup>,
    /// Per-group toggle decisions. Parallel to `groups`: `decisions[i]`
    /// applies to `groups[i]`.
    pub decisions: Vec<AmacToggleDecision>,
    /// Total requests in the batch.
    pub total_requests: usize,
    /// How many groups will use interleaving.
    pub interleaved_groups: usize,
    /// How many groups will use sequential dispatch.
    pub sequential_groups: usize,
}
352
/// Result of executing a batch plan.
#[derive(Debug)]
pub struct AmacBatchResult {
    /// Completed hostcall outcomes, in call_id order for deterministic
    /// scheduler enqueuing.
    pub completions: Vec<(String, HostcallOutcome)>,
    /// Telemetry from this batch execution.
    pub batch_telemetry: AmacBatchTelemetry,
}
362
/// Per-batch execution telemetry.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AmacBatchTelemetry {
    /// Total requests handled in the batch.
    pub total_requests: usize,
    /// Number of groups dispatched.
    pub groups_dispatched: usize,
    /// Groups dispatched with interleaving.
    pub interleaved_groups: usize,
    /// Groups dispatched sequentially.
    pub sequential_groups: usize,
    /// Wall-clock nanoseconds for the whole batch.
    pub total_elapsed_ns: u64,
}
372
373// ── Executor ─────────────────────────────────────────────────────────────
374
/// AMAC batch executor configuration.
///
/// Built either from environment variables (`from_env`, also used by
/// `Default`) or explicitly (`new` / `with_thresholds`).
#[derive(Debug, Clone)]
pub struct AmacBatchExecutorConfig {
    /// Minimum batch size to consider interleaving.
    pub min_batch_size: usize,
    /// Maximum interleave width (concurrent state machines).
    pub max_interleave_width: usize,
    /// Enable/disable AMAC globally.
    pub enabled: bool,
    /// Stall classification threshold in nanoseconds.
    pub stall_threshold_ns: u64,
    /// Stall-ratio threshold (0..1000) required before AMAC interleaving.
    pub stall_ratio_threshold: u64,
}
389
impl Default for AmacBatchExecutorConfig {
    /// Defaults are read from `PI_HOSTCALL_AMAC*` environment variables
    /// (see [`Self::from_env`]).
    fn default() -> Self {
        Self::from_env()
    }
}
395
396impl AmacBatchExecutorConfig {
397    #[must_use]
398    pub fn from_env() -> Self {
399        let enabled = std::env::var("PI_HOSTCALL_AMAC")
400            .ok()
401            .as_deref()
402            .is_none_or(|value| {
403                !matches!(
404                    value.trim().to_ascii_lowercase().as_str(),
405                    "0" | "false" | "off" | "disabled"
406                )
407            });
408        let min_batch_size = std::env::var("PI_HOSTCALL_AMAC_MIN_BATCH")
409            .ok()
410            .and_then(|raw| raw.trim().parse::<usize>().ok())
411            .unwrap_or(AMAC_MIN_BATCH_SIZE)
412            .max(2);
413        let max_interleave_width = std::env::var("PI_HOSTCALL_AMAC_MAX_WIDTH")
414            .ok()
415            .and_then(|raw| raw.trim().parse::<usize>().ok())
416            .unwrap_or(AMAC_MAX_INTERLEAVE_WIDTH)
417            .max(2);
418        let stall_threshold_ns = std::env::var("PI_HOSTCALL_AMAC_STALL_THRESHOLD_NS")
419            .ok()
420            .and_then(|raw| raw.trim().parse::<u64>().ok())
421            .unwrap_or(AMAC_STALL_THRESHOLD_NS)
422            .max(1);
423        let stall_ratio_threshold = std::env::var("PI_HOSTCALL_AMAC_STALL_RATIO_THRESHOLD")
424            .ok()
425            .and_then(|raw| raw.trim().parse::<u64>().ok())
426            .unwrap_or(AMAC_STALL_RATIO_THRESHOLD)
427            .clamp(1, 1_000);
428        Self {
429            min_batch_size,
430            max_interleave_width,
431            enabled,
432            stall_threshold_ns,
433            stall_ratio_threshold,
434        }
435    }
436
437    #[must_use]
438    pub const fn new(enabled: bool, min_batch_size: usize, max_interleave_width: usize) -> Self {
439        Self {
440            min_batch_size,
441            max_interleave_width,
442            enabled,
443            stall_threshold_ns: AMAC_STALL_THRESHOLD_NS,
444            stall_ratio_threshold: AMAC_STALL_RATIO_THRESHOLD,
445        }
446    }
447
448    #[must_use]
449    pub fn with_thresholds(mut self, stall_threshold_ns: u64, stall_ratio_threshold: u64) -> Self {
450        self.stall_threshold_ns = stall_threshold_ns.max(1);
451        self.stall_ratio_threshold = stall_ratio_threshold.clamp(1, 1_000);
452        self
453    }
454}
455
/// The AMAC batch executor.
///
/// Groups incoming hostcall requests by kind, decides per-group whether
/// interleaving or sequential dispatch is optimal based on stall telemetry,
/// and produces an execution plan.
#[derive(Debug, Clone)]
pub struct AmacBatchExecutor {
    /// Static configuration (thresholds, widths, global enable).
    config: AmacBatchExecutorConfig,
    /// Mutable stall telemetry updated by `observe_call` / `plan_batch`.
    telemetry: AmacStallTelemetry,
}
466
impl AmacBatchExecutor {
    /// Build an executor; telemetry starts empty, seeded with the config's
    /// stall threshold.
    #[must_use]
    pub fn new(config: AmacBatchExecutorConfig) -> Self {
        Self {
            telemetry: AmacStallTelemetry::new(config.stall_threshold_ns),
            config,
        }
    }

    /// Build an executor from a config plus pre-existing telemetry (e.g.
    /// state carried over from a previous executor).
    #[must_use]
    pub const fn with_telemetry(
        config: AmacBatchExecutorConfig,
        telemetry: AmacStallTelemetry,
    ) -> Self {
        Self { config, telemetry }
    }

    /// Access the current stall telemetry.
    #[must_use]
    pub const fn telemetry(&self) -> &AmacStallTelemetry {
        &self.telemetry
    }

    /// Mutable access to telemetry for recording observations.
    pub const fn telemetry_mut(&mut self) -> &mut AmacStallTelemetry {
        &mut self.telemetry
    }

    /// Whether AMAC is enabled.
    #[must_use]
    pub const fn enabled(&self) -> bool {
        self.config.enabled
    }

    /// Group a batch of drained hostcall requests and produce an execution plan.
    ///
    /// The plan preserves original request ordering within each group and
    /// chooses interleave vs. sequential per group based on telemetry.
    ///
    /// Log requests are "sunk": pulled out of the stream as encountered and
    /// emitted as their own group after the non-log run in progress (or at
    /// the end of the plan), so they never split an otherwise-contiguous run
    /// of batchable requests.
    ///
    /// NOTE(review): when `enabled` is false and `requests` is non-empty, the
    /// requests are dropped here and the returned plan contains no groups —
    /// presumably callers check `enabled()` first or fall back to their own
    /// dispatch path; confirm against call sites.
    #[must_use]
    #[allow(clippy::too_many_lines)]
    pub fn plan_batch(&mut self, requests: Vec<HostcallRequest>) -> AmacBatchPlan {
        let total_requests = requests.len();

        if !self.config.enabled || total_requests == 0 {
            return AmacBatchPlan {
                groups: Vec::new(),
                decisions: Vec::new(),
                total_requests,
                interleaved_groups: 0,
                sequential_groups: 0,
            };
        }

        let mut groups = Vec::new();
        let mut decisions = Vec::new();
        let mut interleaved_groups = 0_usize;
        let mut sequential_groups = 0_usize;

        // Group by contiguous runs of the same key to preserve global ordering,
        // with "Log Sinking" optimization.
        let request_iter = requests.into_iter();
        let mut buffered_logs = Vec::new();

        // `current_key_opt`/`current_requests` track the in-progress run of
        // same-keyed, non-log requests.
        let mut current_key_opt: Option<AmacGroupKey> = None;
        let mut current_requests = Vec::new();

        for request in request_iter {
            let key = AmacGroupKey::from_request(&request);

            // Logs are sunk: buffered and flushed as their own group after
            // the current run completes (or at the end of the batch).
            if key == AmacGroupKey::Log {
                buffered_logs.push(request);
                continue;
            }

            let key_changed = current_key_opt
                .as_ref()
                .is_none_or(|current| *current != key);

            if key_changed {
                // Flush previous batch if it existed
                if let Some(prev_key) = current_key_opt.take() {
                    let decision = self.decide_toggle(&prev_key, current_requests.len());
                    if decision.is_interleave() {
                        interleaved_groups += 1;
                    } else {
                        sequential_groups += 1;
                    }
                    groups.push(AmacBatchGroup {
                        key: prev_key,
                        requests: std::mem::take(&mut current_requests),
                    });
                    decisions.push(decision);

                    // Flush logs sunk during the previous batch
                    if !buffered_logs.is_empty() {
                        let log_reqs = std::mem::take(&mut buffered_logs);
                        let decision = self.decide_toggle(&AmacGroupKey::Log, log_reqs.len());
                        if decision.is_interleave() {
                            interleaved_groups += 1;
                        } else {
                            sequential_groups += 1;
                        }
                        groups.push(AmacBatchGroup {
                            key: AmacGroupKey::Log,
                            requests: log_reqs,
                        });
                        decisions.push(decision);
                    }
                }
            }

            // First non-log request, or first request after a flushed run.
            current_key_opt = Some(key);
            current_requests.push(request);
        }

        // Flush final run
        if let Some(current_key) = current_key_opt {
            if !current_requests.is_empty() {
                let decision = self.decide_toggle(&current_key, current_requests.len());
                if decision.is_interleave() {
                    interleaved_groups += 1;
                } else {
                    sequential_groups += 1;
                }
                groups.push(AmacBatchGroup {
                    key: current_key,
                    requests: current_requests,
                });
                decisions.push(decision);
            }
        }

        // Flush trailing logs (or if input was only logs).
        if !buffered_logs.is_empty() {
            let decision = self.decide_toggle(&AmacGroupKey::Log, buffered_logs.len());
            if decision.is_interleave() {
                interleaved_groups += 1;
            } else {
                sequential_groups += 1;
            }
            groups.push(AmacBatchGroup {
                key: AmacGroupKey::Log,
                requests: buffered_logs,
            });
            decisions.push(decision);
        }

        // Account every group as one toggle decision; interleave selections
        // are tracked separately for the selection-rate metric.
        self.telemetry.toggle_decisions = self
            .telemetry
            .toggle_decisions
            .saturating_add(groups.len() as u64);
        self.telemetry.interleave_selections = self
            .telemetry
            .interleave_selections
            .saturating_add(interleaved_groups as u64);

        AmacBatchPlan {
            groups,
            decisions,
            total_requests,
            interleaved_groups,
            sequential_groups,
        }
    }

    /// Decide whether a group should use interleaved or sequential dispatch.
    ///
    /// Rules are evaluated in order; the first matching fallback wins, and
    /// the returned `Sequential.reason` names the rule that triggered.
    fn decide_toggle(&self, key: &AmacGroupKey, group_size: usize) -> AmacToggleDecision {
        // Rule 1: Too small to benefit from interleaving.
        if group_size < self.config.min_batch_size {
            return AmacToggleDecision::Sequential {
                reason: "batch_too_small",
            };
        }

        // Rule 2: Group is not safe to interleave (ordering dependencies).
        if !key.interleave_safe() {
            return AmacToggleDecision::Sequential {
                reason: "ordering_dependency",
            };
        }

        // Rule 3: Insufficient telemetry history → conservative sequential.
        if self.telemetry.total_calls < TELEMETRY_WINDOW_SIZE as u64 {
            return AmacToggleDecision::Sequential {
                reason: "insufficient_telemetry",
            };
        }

        // Rule 4: Stall ratio below threshold → sequential is fine.
        let stall_ratio = self.telemetry.stall_ratio();
        if stall_ratio < self.config.stall_ratio_threshold {
            return AmacToggleDecision::Sequential {
                reason: "low_stall_ratio",
            };
        }

        // Rule 5: Compute interleave width based on stall severity.
        let width = compute_interleave_width(
            stall_ratio,
            key.memory_weight(),
            group_size,
            self.config.max_interleave_width,
        );

        if width < 2 {
            return AmacToggleDecision::Sequential {
                reason: "computed_width_too_low",
            };
        }

        AmacToggleDecision::Interleave { width }
    }

    /// Record a per-call timing observation for stall detection.
    pub fn observe_call(&mut self, elapsed_ns: u64) {
        self.telemetry.record(elapsed_ns);
    }
}
686
impl Default for AmacBatchExecutor {
    /// Executor with environment-derived configuration
    /// (see [`AmacBatchExecutorConfig::default`]).
    fn default() -> Self {
        Self::new(AmacBatchExecutorConfig::default())
    }
}
692
693// ── Helper functions ─────────────────────────────────────────────────────
694
695/// Compute optimal interleave width from stall ratio and group characteristics.
696fn compute_interleave_width(
697    stall_ratio: u64,
698    memory_weight: u32,
699    group_size: usize,
700    max_width: usize,
701) -> usize {
702    // Scale width proportionally to stall severity × memory weight.
703    // At AMAC_STALL_RATIO_SATURATED, we use max_width.
704    let effective_ratio = stall_ratio
705        .saturating_sub(AMAC_STALL_RATIO_THRESHOLD)
706        .min(AMAC_STALL_RATIO_SATURATED - AMAC_STALL_RATIO_THRESHOLD);
707    let ratio_range = AMAC_STALL_RATIO_SATURATED.saturating_sub(AMAC_STALL_RATIO_THRESHOLD);
708
709    // Avoid division by zero.
710    if ratio_range == 0 {
711        return 2;
712    }
713
714    let base_width = 2_u64
715        + (effective_ratio * u64::from(memory_weight) * (max_width as u64 - 2))
716            / (ratio_range * 100);
717
718    // Safe: base_width is bounded by max_width (which fits in usize).
719    let width = usize::try_from(base_width).unwrap_or(max_width);
720    width.min(max_width).min(group_size).max(2)
721}
722
/// Check if a session operation is read-only.
///
/// Matching is case-insensitive, trims surrounding whitespace, and ignores
/// underscores, so `"get_state"`, `"GET_STATE"`, and `"getstate"` all match.
fn is_session_read_op(op: &str) -> bool {
    let mut normalized = op.trim().to_ascii_lowercase();
    normalized.retain(|c| c != '_');
    [
        "getstate",
        "getmessages",
        "getentries",
        "getname",
        "getmodel",
        "getlabel",
        "getlabels",
        "getallsessions",
    ]
    .contains(&normalized.as_str())
}
739
/// Check if an event operation is read-only.
///
/// Matching is case-insensitive, trims surrounding whitespace, and ignores
/// underscores (same normalization as `is_session_read_op`).
fn is_event_read_op(op: &str) -> bool {
    let mut normalized = op.trim().to_ascii_lowercase();
    normalized.retain(|c| c != '_');
    [
        "getactivetools",
        "getalltools",
        "getmodel",
        "getthinkinglevel",
        "getflag",
        "listflags",
    ]
    .contains(&normalized.as_str())
}
754
755// ── Tests ────────────────────────────────────────────────────────────────
756
757#[cfg(test)]
758mod tests {
759    use super::*;
760    use serde_json::json;
761
762    fn make_request(kind: HostcallKind) -> HostcallRequest {
763        HostcallRequest {
764            call_id: format!("test-{}", rand_id()),
765            kind,
766            payload: json!({}),
767            trace_id: 0,
768            extension_id: None,
769        }
770    }
771
772    fn rand_id() -> u64 {
773        static COUNTER: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
774        COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
775    }
776
777    fn session_read_request() -> HostcallRequest {
778        make_request(HostcallKind::Session {
779            op: "get_state".to_string(),
780        })
781    }
782
783    fn session_write_request() -> HostcallRequest {
784        make_request(HostcallKind::Session {
785            op: "set_model".to_string(),
786        })
787    }
788
789    fn event_read_request() -> HostcallRequest {
790        make_request(HostcallKind::Events {
791            op: "get_model".to_string(),
792        })
793    }
794
795    fn tool_request() -> HostcallRequest {
796        make_request(HostcallKind::Tool {
797            name: "read".to_string(),
798        })
799    }
800
801    fn http_request() -> HostcallRequest {
802        make_request(HostcallKind::Http)
803    }
804
805    fn log_request() -> HostcallRequest {
806        make_request(HostcallKind::Log)
807    }
808
809    // ── AmacGroupKey tests ───────────────────────────────────────────
810
811    #[test]
812    fn group_key_classifies_session_reads_correctly() {
813        let req = session_read_request();
814        assert_eq!(AmacGroupKey::from_request(&req), AmacGroupKey::SessionRead);
815    }
816
817    #[test]
818    fn group_key_classifies_session_writes_correctly() {
819        let req = session_write_request();
820        assert_eq!(AmacGroupKey::from_request(&req), AmacGroupKey::SessionWrite);
821    }
822
823    #[test]
824    fn group_key_classifies_event_reads_correctly() {
825        let req = event_read_request();
826        assert_eq!(AmacGroupKey::from_request(&req), AmacGroupKey::EventRead);
827    }
828
829    #[test]
830    fn group_key_classifies_tools_correctly() {
831        let req = tool_request();
832        assert_eq!(AmacGroupKey::from_request(&req), AmacGroupKey::Tool);
833    }
834
835    #[test]
836    fn group_key_classifies_http_correctly() {
837        let req = http_request();
838        assert_eq!(AmacGroupKey::from_request(&req), AmacGroupKey::Http);
839    }
840
841    #[test]
842    fn group_key_classifies_log_correctly() {
843        let req = log_request();
844        assert_eq!(AmacGroupKey::from_request(&req), AmacGroupKey::Log);
845    }
846
847    #[test]
848    fn interleave_safe_for_read_and_independent_groups() {
849        assert!(AmacGroupKey::SessionRead.interleave_safe());
850        assert!(AmacGroupKey::EventRead.interleave_safe());
851        assert!(AmacGroupKey::Tool.interleave_safe());
852        assert!(AmacGroupKey::Http.interleave_safe());
853        assert!(AmacGroupKey::Log.interleave_safe());
854    }
855
856    #[test]
857    fn interleave_unsafe_for_write_and_ui_groups() {
858        assert!(!AmacGroupKey::SessionWrite.interleave_safe());
859        assert!(!AmacGroupKey::EventWrite.interleave_safe());
860        assert!(!AmacGroupKey::Ui.interleave_safe());
861        assert!(!AmacGroupKey::Exec.interleave_safe());
862    }
863
864    // ── Telemetry tests ──────────────────────────────────────────────
865
866    #[test]
867    fn telemetry_records_and_tracks_stall_ratio() {
868        let mut telemetry = AmacStallTelemetry::new(100_000);
869
870        // Record some fast calls (no stalls).
871        for _ in 0..10 {
872            telemetry.record(50_000);
873        }
874        assert_eq!(telemetry.total_calls, 10);
875        assert_eq!(telemetry.total_stalls, 0);
876        assert!(telemetry.stall_ratio() < AMAC_STALL_RATIO_THRESHOLD);
877
878        // Record some slow calls (stalls).
879        for _ in 0..20 {
880            telemetry.record(200_000);
881        }
882        assert_eq!(telemetry.total_calls, 30);
883        assert_eq!(telemetry.total_stalls, 20);
884        assert!(telemetry.stall_ratio() > 0);
885    }
886
887    #[test]
888    fn telemetry_ema_converges_to_steady_state() {
889        let mut telemetry = AmacStallTelemetry::new(100_000);
890
891        // All fast → stall ratio should converge near 0.
892        for _ in 0..100 {
893            telemetry.record(10_000);
894        }
895        assert!(telemetry.stall_ratio() < 50, "expected low stall ratio");
896
897        // All slow → stall ratio should converge near 1000.
898        for _ in 0..200 {
899            telemetry.record(500_000);
900        }
901        assert!(
902            telemetry.stall_ratio() > 900,
903            "expected high stall ratio, got {}",
904            telemetry.stall_ratio()
905        );
906    }
907
908    #[test]
909    fn telemetry_sliding_window_bounded() {
910        let mut telemetry = AmacStallTelemetry::new(100_000);
911        for i in 0..200 {
912            telemetry.record(i * 1000);
913        }
914        assert_eq!(telemetry.recent_samples.len(), TELEMETRY_WINDOW_SIZE);
915    }
916
917    #[test]
918    fn telemetry_variance_zero_for_constant_input() {
919        let mut telemetry = AmacStallTelemetry::new(100_000);
920        for _ in 0..10 {
921            telemetry.record(50_000);
922        }
923        assert_eq!(telemetry.recent_variance(), 0);
924    }
925
926    #[test]
927    fn telemetry_snapshot_captures_state() {
928        let mut telemetry = AmacStallTelemetry::new(100_000);
929        for _ in 0..5 {
930            telemetry.record(50_000);
931        }
932        let snap = telemetry.snapshot();
933        assert_eq!(snap.total_calls, 5);
934        assert_eq!(snap.total_stalls, 0);
935        assert_eq!(snap.recent_window_size, 5);
936    }
937
938    // ── Executor plan tests ──────────────────────────────────────────
939
940    #[test]
941    fn plan_empty_batch_returns_empty_plan() {
942        let mut executor = AmacBatchExecutor::new(AmacBatchExecutorConfig::new(true, 4, 16));
943        let plan = executor.plan_batch(Vec::new());
944        assert_eq!(plan.total_requests, 0);
945        assert!(plan.groups.is_empty());
946        assert!(plan.decisions.is_empty());
947    }
948
949    #[test]
950    fn plan_disabled_executor_returns_empty_groups() {
951        let mut executor = AmacBatchExecutor::new(AmacBatchExecutorConfig::new(false, 4, 16));
952        let requests = vec![tool_request(), tool_request()];
953        let plan = executor.plan_batch(requests);
954        assert_eq!(plan.total_requests, 2);
955        assert!(plan.groups.is_empty());
956    }
957
958    #[test]
959    fn plan_groups_requests_by_kind() {
960        let mut executor = AmacBatchExecutor::new(AmacBatchExecutorConfig::new(true, 2, 16));
961        let requests = vec![
962            session_read_request(),
963            tool_request(),
964            session_read_request(),
965            http_request(),
966            tool_request(),
967        ];
968        let plan = executor.plan_batch(requests);
969        assert_eq!(plan.total_requests, 5);
970        // Grouping is done by contiguous runs to preserve global ordering.
971        assert_eq!(plan.groups.len(), 5);
972        assert_eq!(plan.groups[0].key, AmacGroupKey::SessionRead);
973        assert_eq!(plan.groups[1].key, AmacGroupKey::Tool);
974        assert_eq!(plan.groups[2].key, AmacGroupKey::SessionRead);
975        assert_eq!(plan.groups[3].key, AmacGroupKey::Http);
976        assert_eq!(plan.groups[4].key, AmacGroupKey::Tool);
977    }
978
979    #[test]
980    fn plan_preserves_intra_group_order() {
981        let mut executor = AmacBatchExecutor::new(AmacBatchExecutorConfig::new(true, 2, 16));
982        let req1 = session_read_request();
983        let req2 = session_read_request();
984        let id1 = req1.call_id.clone();
985        let id2 = req2.call_id.clone();
986
987        let requests = vec![req1, req2];
988        let plan = executor.plan_batch(requests);
989        assert_eq!(plan.groups.len(), 1);
990        assert_eq!(plan.groups[0].requests[0].call_id, id1);
991        assert_eq!(plan.groups[0].requests[1].call_id, id2);
992    }
993
994    #[test]
995    fn plan_sequential_for_small_groups_without_telemetry() {
996        let mut executor = AmacBatchExecutor::new(AmacBatchExecutorConfig::new(true, 4, 16));
997        let requests = vec![tool_request(), tool_request()]; // < min_batch_size=4
998        let plan = executor.plan_batch(requests);
999        assert!(plan.decisions.iter().all(|d| !d.is_interleave()));
1000    }
1001
1002    #[test]
1003    fn plan_sequential_for_write_groups() {
1004        let mut executor = AmacBatchExecutor::new(AmacBatchExecutorConfig::new(true, 2, 16));
1005
1006        // Prime telemetry with high stall ratio.
1007        for _ in 0..100 {
1008            executor.observe_call(500_000);
1009        }
1010
1011        let requests = vec![
1012            session_write_request(),
1013            session_write_request(),
1014            session_write_request(),
1015            session_write_request(),
1016        ];
1017        let plan = executor.plan_batch(requests);
1018        assert_eq!(plan.groups.len(), 1);
1019        assert!(
1020            plan.decisions[0]
1021                == AmacToggleDecision::Sequential {
1022                    reason: "ordering_dependency"
1023                }
1024        );
1025    }
1026
1027    #[test]
1028    fn plan_interleave_with_high_stall_ratio_and_sufficient_batch() {
1029        let mut executor = AmacBatchExecutor::new(AmacBatchExecutorConfig::new(true, 4, 16));
1030
1031        // Prime telemetry with high stall ratio.
1032        for _ in 0..100 {
1033            executor.observe_call(500_000);
1034        }
1035
1036        let requests: Vec<HostcallRequest> = (0..8).map(|_| http_request()).collect();
1037        let plan = executor.plan_batch(requests);
1038        assert_eq!(plan.groups.len(), 1);
1039        assert!(plan.decisions[0].is_interleave());
1040        if let AmacToggleDecision::Interleave { width } = plan.decisions[0] {
1041            assert!(width >= 2);
1042            assert!(width <= 16);
1043        }
1044    }
1045
1046    #[test]
1047    fn plan_sequential_with_low_stall_ratio() {
1048        let mut executor = AmacBatchExecutor::new(AmacBatchExecutorConfig::new(true, 4, 16));
1049
1050        // Prime telemetry with low stall ratio.
1051        for _ in 0..100 {
1052            executor.observe_call(10_000);
1053        }
1054
1055        let requests: Vec<HostcallRequest> = (0..8).map(|_| http_request()).collect();
1056        let plan = executor.plan_batch(requests);
1057        assert_eq!(plan.groups.len(), 1);
1058        assert!(!plan.decisions[0].is_interleave());
1059    }
1060
1061    // ── Toggle decision tests ────────────────────────────────────────
1062
1063    #[test]
1064    fn toggle_interleave_width_scales_with_stall_severity() {
1065        // Higher stall ratio → wider interleave.
1066        let width_low = compute_interleave_width(300, 90, 16, 16);
1067        let width_high = compute_interleave_width(700, 90, 16, 16);
1068        assert!(
1069            width_high >= width_low,
1070            "higher stall ratio should give wider interleave: low={width_low}, high={width_high}"
1071        );
1072    }
1073
1074    #[test]
1075    fn toggle_width_capped_by_group_size() {
1076        let width = compute_interleave_width(800, 90, 3, 16);
1077        assert!(width <= 3);
1078    }
1079
1080    #[test]
1081    fn toggle_width_capped_by_max_width() {
1082        let width = compute_interleave_width(800, 90, 100, 8);
1083        assert!(width <= 8);
1084    }
1085
1086    #[test]
1087    fn toggle_width_minimum_is_two() {
1088        let width = compute_interleave_width(201, 5, 100, 16);
1089        assert!(width >= 2);
1090    }
1091
1092    // ── Session/event operation classification ───────────────────────
1093
1094    #[test]
1095    fn session_read_ops_classified_correctly() {
1096        assert!(is_session_read_op("get_state"));
1097        assert!(is_session_read_op("getState"));
1098        assert!(is_session_read_op("get_messages"));
1099        assert!(is_session_read_op("getMessages"));
1100        assert!(is_session_read_op("get_entries"));
1101        assert!(is_session_read_op("getEntries"));
1102    }
1103
1104    #[test]
1105    fn session_write_ops_classified_correctly() {
1106        assert!(!is_session_read_op("set_model"));
1107        assert!(!is_session_read_op("setModel"));
1108        assert!(!is_session_read_op("set_name"));
1109        assert!(!is_session_read_op("add_label"));
1110    }
1111
1112    #[test]
1113    fn event_read_ops_classified_correctly() {
1114        assert!(is_event_read_op("get_active_tools"));
1115        assert!(is_event_read_op("getActiveTools"));
1116        assert!(is_event_read_op("get_all_tools"));
1117        assert!(is_event_read_op("get_model"));
1118        assert!(is_event_read_op("get_flag"));
1119        assert!(is_event_read_op("list_flags"));
1120    }
1121
1122    #[test]
1123    fn event_write_ops_classified_correctly() {
1124        assert!(!is_event_read_op("set_active_tools"));
1125        assert!(!is_event_read_op("set_model"));
1126        assert!(!is_event_read_op("register_command"));
1127        assert!(!is_event_read_op("register_provider"));
1128    }
1129
1130    // ── Serialization round-trip ─────────────────────────────────────
1131
1132    #[test]
1133    fn telemetry_snapshot_serializes_deterministically() {
1134        let mut telemetry = AmacStallTelemetry::new(100_000);
1135        for i in 0..10 {
1136            telemetry.record(i * 10_000);
1137        }
1138        let snap = telemetry.snapshot();
1139        let json = serde_json::to_string(&snap).expect("serialize snapshot");
1140        let deserialized: AmacStallTelemetrySnapshot =
1141            serde_json::from_str(&json).expect("deserialize snapshot");
1142        assert_eq!(deserialized.total_calls, snap.total_calls);
1143        assert_eq!(deserialized.total_stalls, snap.total_stalls);
1144        assert_eq!(deserialized.toggle_decisions, snap.toggle_decisions);
1145    }
1146
1147    #[test]
1148    fn group_key_serializes_round_trip() {
1149        let keys = vec![
1150            AmacGroupKey::SessionRead,
1151            AmacGroupKey::SessionWrite,
1152            AmacGroupKey::EventRead,
1153            AmacGroupKey::EventWrite,
1154            AmacGroupKey::Tool,
1155            AmacGroupKey::Exec,
1156            AmacGroupKey::Http,
1157            AmacGroupKey::Ui,
1158            AmacGroupKey::Log,
1159        ];
1160        for key in keys {
1161            let json = serde_json::to_string(&key).expect("serialize key");
1162            let deserialized: AmacGroupKey = serde_json::from_str(&json).expect("deserialize key");
1163            assert_eq!(deserialized, key);
1164        }
1165    }
1166
1167    #[test]
1168    fn toggle_decision_serializes_round_trip() {
1169        let interleave = AmacToggleDecision::Interleave { width: 8 };
1170        let json = serde_json::to_string(&interleave).expect("serialize");
1171        let json: &'static str = Box::leak(json.into_boxed_str());
1172        let deserialized: AmacToggleDecision = serde_json::from_str(json).expect("deserialize");
1173        assert_eq!(deserialized, interleave);
1174
1175        let sequential = AmacToggleDecision::Sequential {
1176            reason: "batch_too_small",
1177        };
1178        let json = serde_json::to_string(&sequential).expect("serialize");
1179        let json: &'static str = Box::leak(json.into_boxed_str());
1180        let deserialized: AmacToggleDecision = serde_json::from_str(json).expect("deserialize");
1181        assert_eq!(deserialized, sequential);
1182    }
1183
1184    // ── Mixed batch scenarios ────────────────────────────────────────
1185
1186    #[test]
1187    fn mixed_batch_groups_independently() {
1188        let mut executor = AmacBatchExecutor::new(AmacBatchExecutorConfig::new(true, 2, 16));
1189        let requests = vec![
1190            session_read_request(),
1191            tool_request(),
1192            http_request(),
1193            session_write_request(),
1194            log_request(),
1195            event_read_request(),
1196            session_read_request(),
1197            tool_request(),
1198        ];
1199        let plan = executor.plan_batch(requests);
1200        assert_eq!(plan.total_requests, 8);
1201        // Contiguous-run grouping keeps ordering strict, and log calls are sunk
1202        // to their own group once a non-log key boundary is crossed.
1203        assert_eq!(plan.groups.len(), 8);
1204        assert_eq!(plan.groups[0].key, AmacGroupKey::SessionRead);
1205        assert_eq!(plan.groups[1].key, AmacGroupKey::Tool);
1206        assert_eq!(plan.groups[2].key, AmacGroupKey::Http);
1207        assert_eq!(plan.groups[3].key, AmacGroupKey::SessionWrite);
1208        assert_eq!(plan.groups[4].key, AmacGroupKey::Log);
1209        assert_eq!(plan.groups[5].key, AmacGroupKey::EventRead);
1210        assert_eq!(plan.groups[6].key, AmacGroupKey::SessionRead);
1211        assert_eq!(plan.groups[7].key, AmacGroupKey::Tool);
1212    }
1213
1214    #[test]
1215    fn executor_tracks_toggle_decision_counts() {
1216        let mut executor = AmacBatchExecutor::new(AmacBatchExecutorConfig::new(true, 2, 16));
1217
1218        // Prime with high stalls.
1219        for _ in 0..100 {
1220            executor.observe_call(500_000);
1221        }
1222
1223        let requests: Vec<HostcallRequest> = (0..6).map(|_| http_request()).collect();
1224        let plan = executor.plan_batch(requests);
1225
1226        let snap = executor.telemetry().snapshot();
1227        assert_eq!(snap.toggle_decisions, plan.groups.len() as u64);
1228        assert!(snap.interleave_selections > 0);
1229    }
1230
1231    #[test]
1232    fn single_request_batch_always_sequential() {
1233        let mut executor = AmacBatchExecutor::new(AmacBatchExecutorConfig::new(true, 2, 16));
1234
1235        // Even with high stalls.
1236        for _ in 0..100 {
1237            executor.observe_call(500_000);
1238        }
1239
1240        let requests = vec![http_request()];
1241        let plan = executor.plan_batch(requests);
1242        assert_eq!(plan.groups.len(), 1);
1243        // Single item in group < min_batch_size=2 would be edge case,
1244        // but batch has 1 item total.
1245        assert!(plan.decisions.iter().all(|d| !d.is_interleave()));
1246    }
1247
1248    // ── Clone semantics ─────────────────────────────────────────────
1249
1250    #[test]
1251    fn executor_clone_preserves_telemetry_state() {
1252        let mut original = AmacBatchExecutor::new(AmacBatchExecutorConfig::new(true, 4, 16));
1253        for _ in 0..50 {
1254            original.observe_call(200_000);
1255        }
1256        let snap_before = original.telemetry().snapshot();
1257        assert_eq!(snap_before.total_calls, 50);
1258
1259        let cloned = original.clone();
1260        let snap_cloned = cloned.telemetry().snapshot();
1261        assert_eq!(snap_cloned.total_calls, snap_before.total_calls);
1262        assert_eq!(snap_cloned.total_stalls, snap_before.total_stalls);
1263        assert_eq!(snap_cloned.stall_ratio, snap_before.stall_ratio);
1264    }
1265
1266    #[test]
1267    fn executor_clone_is_independent() {
1268        let mut original = AmacBatchExecutor::new(AmacBatchExecutorConfig::new(true, 4, 16));
1269        for _ in 0..10 {
1270            original.observe_call(50_000);
1271        }
1272
1273        let mut cloned = original.clone();
1274        // Mutate only the clone.
1275        for _ in 0..100 {
1276            cloned.observe_call(500_000);
1277        }
1278
1279        // Original should be unaffected.
1280        assert_eq!(original.telemetry().snapshot().total_calls, 10);
1281        assert_eq!(cloned.telemetry().snapshot().total_calls, 110);
1282    }
1283
1284    // ── Config from env ─────────────────────────────────────────────
1285
1286    #[test]
1287    fn config_new_matches_parameters() {
1288        let config = AmacBatchExecutorConfig::new(false, 8, 32);
1289        assert!(!config.enabled);
1290        assert_eq!(config.min_batch_size, 8);
1291        assert_eq!(config.max_interleave_width, 32);
1292        assert_eq!(config.stall_threshold_ns, AMAC_STALL_THRESHOLD_NS);
1293        assert_eq!(config.stall_ratio_threshold, AMAC_STALL_RATIO_THRESHOLD);
1294    }
1295
1296    #[test]
1297    fn default_executor_is_enabled() {
1298        // Default from_env with no env vars set → enabled.
1299        let executor = AmacBatchExecutor::default();
1300        assert!(executor.enabled());
1301    }
1302
1303    #[test]
1304    fn config_with_thresholds_applies_clamps() {
1305        let config = AmacBatchExecutorConfig::new(true, 4, 16).with_thresholds(0, 9_999);
1306        assert_eq!(config.stall_threshold_ns, 1);
1307        assert_eq!(config.stall_ratio_threshold, 1_000);
1308    }
1309
1310    // ── Batch result types ──────────────────────────────────────────
1311
1312    #[test]
1313    fn batch_telemetry_serializes() {
1314        let telem = AmacBatchTelemetry {
1315            total_requests: 10,
1316            groups_dispatched: 3,
1317            interleaved_groups: 1,
1318            sequential_groups: 2,
1319            total_elapsed_ns: 5_000_000,
1320        };
1321        let json = serde_json::to_string(&telem).expect("serialize");
1322        let deser: AmacBatchTelemetry = serde_json::from_str(&json).expect("deserialize");
1323        assert_eq!(deser.total_requests, 10);
1324        assert_eq!(deser.interleaved_groups, 1);
1325    }
1326
    // ── Property tests ──────────────────────────────────────────────
1328
    mod proptest_amac {
        //! Property-based invariants for the stall telemetry and the
        //! interleave-width computation, exercised over randomized inputs.
        use super::*;
        use proptest::prelude::*;

        proptest! {
            // stall_ratio() is fixed-point over 0..=1000; no pattern of
            // observed latencies may push it out of that range.
            #[test]
            fn stall_ratio_bounded_0_to_1000(
                observations in prop::collection::vec(0..1_000_000u64, 1..100),
            ) {
                let mut telemetry = AmacStallTelemetry::new(100_000);
                for elapsed in &observations {
                    telemetry.record(*elapsed);
                }
                let ratio = telemetry.stall_ratio();
                assert!(ratio <= 1_000, "stall_ratio was {ratio}, expected <= 1000");
            }

            // A stall is counted per call, so the stall counter can never
            // outrun the call counter.
            #[test]
            fn total_stalls_never_exceeds_total_calls(
                observations in prop::collection::vec(0..1_000_000u64, 1..100),
            ) {
                let mut telemetry = AmacStallTelemetry::new(100_000);
                for elapsed in &observations {
                    telemetry.record(*elapsed);
                }
                assert!(
                    telemetry.total_stalls <= telemetry.total_calls,
                    "stalls {} > calls {}",
                    telemetry.total_stalls,
                    telemetry.total_calls,
                );
            }

            // Every record() call increments total_calls by exactly one.
            #[test]
            fn total_calls_matches_observation_count(
                observations in prop::collection::vec(0..1_000_000u64, 1..100),
            ) {
                let mut telemetry = AmacStallTelemetry::new(100_000);
                for elapsed in &observations {
                    telemetry.record(*elapsed);
                }
                assert_eq!(
                    telemetry.total_calls,
                    observations.len() as u64,
                );
            }

            // The recent-sample window is bounded: overfilling it (up to 200
            // samples here) must never grow it past TELEMETRY_WINDOW_SIZE.
            #[test]
            fn recent_window_never_exceeds_capacity(
                observations in prop::collection::vec(0..1_000_000u64, 1..200),
            ) {
                let mut telemetry = AmacStallTelemetry::new(100_000);
                for elapsed in &observations {
                    telemetry.record(*elapsed);
                }
                let snap = telemetry.snapshot();
                assert!(
                    snap.recent_window_size <= TELEMETRY_WINDOW_SIZE,
                    "window {} > capacity {}",
                    snap.recent_window_size,
                    TELEMETRY_WINDOW_SIZE,
                );
            }

            // The chosen width stays in [2, min(group_size, max_width)] for
            // every input combination — including stall ratios beyond the
            // nominal 0..=1000 fixed-point range (up to 2000 here).
            #[test]
            fn interleave_width_bounded(
                stall_ratio in 0..2000u64,
                memory_weight in 0..100u32,
                group_size in 2..100usize,
                max_width in 2..32usize,
            ) {
                let width = compute_interleave_width(
                    stall_ratio,
                    memory_weight,
                    group_size,
                    max_width,
                );
                assert!(width >= 2, "width must be >= 2, got {width}");
                assert!(width <= max_width, "width {width} > max_width {max_width}");
                assert!(width <= group_size, "width {width} > group_size {group_size}");
            }

            // Holding all other inputs fixed, raising the stall ratio must
            // never shrink the chosen width (monotone non-decreasing).
            #[test]
            fn interleave_width_monotone_in_stall_ratio(
                base_ratio in 200..600u64,
                delta in 1..400u64,
                memory_weight in 1..100u32,
                group_size in 4..50usize,
                max_width in 4..32usize,
            ) {
                let low = compute_interleave_width(
                    base_ratio,
                    memory_weight,
                    group_size,
                    max_width,
                );
                let high = compute_interleave_width(
                    base_ratio + delta,
                    memory_weight,
                    group_size,
                    max_width,
                );
                assert!(
                    high >= low,
                    "higher stall ratio should give >= width: low={low} (ratio={base_ratio}), high={high} (ratio={})",
                    base_ratio + delta,
                );
            }

            // interleave_safe() must be deterministic: two calls on the same
            // variant always agree, for every variant.
            #[test]
            fn group_key_interleave_safe_stable(
                idx in 0..9usize,
            ) {
                let keys = [
                    AmacGroupKey::SessionRead,
                    AmacGroupKey::SessionWrite,
                    AmacGroupKey::EventRead,
                    AmacGroupKey::EventWrite,
                    AmacGroupKey::Tool,
                    AmacGroupKey::Exec,
                    AmacGroupKey::Http,
                    AmacGroupKey::Ui,
                    AmacGroupKey::Log,
                ];
                let key = &keys[idx];
                let s1 = key.interleave_safe();
                let s2 = key.interleave_safe();
                assert_eq!(s1, s2, "interleave_safe must be deterministic");
            }
        }
    }
1460}