// pi/hostcall_amac.rs
1//! AMAC-style interleaved hostcall batch executor with stall-aware toggling.
2//!
3//! AMAC (Asynchronous Memory Access Chaining) interleaves multiple independent
4//! hostcall state machines per scheduler tick to hide memory-access latency.
5//! When the working set exceeds the LLC, sequential dispatch stalls on cache
6//! misses; interleaving lets one request's computation overlap another's
7//! memory fetch, improving throughput by up to the memory-level parallelism
8//! ratio.
9//!
10//! The executor dynamically toggles between batched-interleaved and sequential
11//! dispatch based on observed per-call timing telemetry as a proxy for LLC miss
12//! rates and stall cycles.
13
14use crate::extensions_js::HostcallKind;
15use crate::extensions_js::HostcallRequest;
16use crate::scheduler::HostcallOutcome;
17use serde::{Deserialize, Serialize};
18
// ── Configuration constants ──────────────────────────────────────────────

/// Minimum batch size to consider AMAC interleaving (below this, sequential
/// dispatch has less overhead).
const AMAC_MIN_BATCH_SIZE: usize = 4;

/// Maximum number of in-flight state machines per interleave round.
const AMAC_MAX_INTERLEAVE_WIDTH: usize = 16;

/// Stall-detection threshold: if a request takes longer than this many
/// nanoseconds, it's treated as a "stall" (likely LLC miss or IO wait).
const AMAC_STALL_THRESHOLD_NS: u64 = 100_000; // 100us

/// Exponential moving average decay factor (fixed-point, 0..256 maps to 0..1).
/// EMA_ALPHA=51 ≈ 0.2, giving 80% weight to history.
const EMA_ALPHA: u64 = 51;
/// Fixed-point denominator for the EMA weights above (must stay nonzero).
const EMA_SCALE: u64 = 256;

/// Minimum stall ratio (fixed-point, 0..1000) to enable AMAC interleaving.
/// 200 = 20% stall rate.
const AMAC_STALL_RATIO_THRESHOLD: u64 = 200;

/// Maximum ratio (fixed-point, 0..1000) above which we assume all calls are
/// memory-bound and interleaving provides maximum benefit.
const AMAC_STALL_RATIO_SATURATED: u64 = 800;

/// How many recent timing samples to retain for decision-making. Also used
/// as the minimum observed-call count before interleaving is considered.
const TELEMETRY_WINDOW_SIZE: usize = 64;
47
48// ── Core types ───────────────────────────────────────────────────────────
49
/// Grouping key for batching compatible hostcall requests together.
///
/// The key partitions requests into runs sharing ordering/interleaving
/// constraints; `interleave_safe` and `memory_weight` on the impl derive
/// scheduling policy from it.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub enum AmacGroupKey {
    /// Session read operations (get_state, get_messages, etc.) - highly batchable.
    SessionRead,
    /// Session write operations (set_model, set_name, etc.) - must preserve order.
    SessionWrite,
    /// Event queries (get_model, get_flag, list_flags) - batchable.
    EventRead,
    /// Event mutations (set_model, register_*) - preserve order.
    EventWrite,
    /// Tool invocations - independent, can interleave.
    Tool,
    /// Exec invocations - independent but may have side effects.
    Exec,
    /// HTTP requests - independent, high latency → good AMAC candidates.
    Http,
    /// UI operations - typically sequential.
    Ui,
    /// Log operations - fire-and-forget, trivially batchable.
    Log,
}
72
73impl AmacGroupKey {
74    /// Classify a hostcall request into its batch group.
75    #[must_use]
76    pub fn from_request(request: &HostcallRequest) -> Self {
77        match &request.kind {
78            HostcallKind::Session { op } => {
79                if is_session_read_op(op) {
80                    Self::SessionRead
81                } else {
82                    Self::SessionWrite
83                }
84            }
85            HostcallKind::Events { op } => {
86                if is_event_read_op(op) {
87                    Self::EventRead
88                } else {
89                    Self::EventWrite
90                }
91            }
92            HostcallKind::Tool { .. } => Self::Tool,
93            HostcallKind::Exec { .. } => Self::Exec,
94            HostcallKind::Http => Self::Http,
95            HostcallKind::Ui { .. } => Self::Ui,
96            HostcallKind::Log => Self::Log,
97        }
98    }
99
100    /// Whether requests in this group are safe to interleave (no ordering
101    /// dependencies within the group).
102    #[must_use]
103    pub const fn interleave_safe(&self) -> bool {
104        matches!(
105            self,
106            Self::SessionRead | Self::EventRead | Self::Tool | Self::Http | Self::Log
107        )
108    }
109
110    /// Estimated memory-boundedness weight for this group (0..100).
111    /// Higher means more likely to benefit from interleaving.
112    #[must_use]
113    pub const fn memory_weight(&self) -> u32 {
114        match self {
115            Self::Http => 90,              // Network IO = high stall
116            Self::Tool | Self::Exec => 70, // File IO or subprocess
117            Self::SessionRead => 50,       // In-memory but large working set
118            Self::EventRead => 40,         // Small working set, fast
119            Self::SessionWrite => 30,
120            Self::EventWrite => 20,
121            Self::Ui => 10,
122            Self::Log => 5,
123        }
124    }
125}
126
/// A group of hostcall requests that share a batch key and can be
/// dispatched together.
#[derive(Debug)]
pub struct AmacBatchGroup {
    /// The batch key for this group.
    pub key: AmacGroupKey,
    /// Requests in this group, in original drain order.
    // NOTE(review): groups produced by `plan_batch` appear to always be
    // non-empty — confirm before relying on it.
    pub requests: Vec<HostcallRequest>,
}
136
137impl AmacBatchGroup {
138    #[must_use]
139    pub fn len(&self) -> usize {
140        self.requests.len()
141    }
142
143    #[must_use]
144    pub fn is_empty(&self) -> bool {
145        self.requests.is_empty()
146    }
147}
148
/// The decision of whether to use AMAC interleaving or sequential dispatch.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum AmacToggleDecision {
    /// Use interleaved dispatch for this batch.
    Interleave {
        /// Number of concurrent state machines per round (always >= 2).
        width: usize,
    },
    /// Use sequential dispatch (AMAC overhead not justified).
    Sequential {
        /// Reason for falling back (static diagnostic tag, e.g. "batch_too_small").
        reason: &'static str,
    },
}
163
164impl AmacToggleDecision {
165    #[must_use]
166    pub const fn is_interleave(&self) -> bool {
167        matches!(self, Self::Interleave { .. })
168    }
169}
170
/// Per-call timing observation for stall detection.
#[derive(Debug, Clone, Copy)]
struct TimingSample {
    /// Wall-clock nanoseconds for this dispatch.
    elapsed_ns: u64,
    /// Whether this was classified as a stall (elapsed > threshold).
    // NOTE(review): `stalled` is recorded but not read by any computation
    // visible in this file — presumably kept for future use; confirm.
    stalled: bool,
}
179
/// Stall telemetry tracker using exponential moving averages.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AmacStallTelemetry {
    /// EMA of per-call dispatch time (nanoseconds, fixed-point ×256).
    ema_elapsed_scaled: u64,
    /// EMA of stall ratio (fixed-point, 0..1000 ×256).
    ema_stall_ratio_scaled: u64,
    /// Total calls observed.
    total_calls: u64,
    /// Total stalls observed.
    total_stalls: u64,
    /// Recent timing window for variance estimation.
    /// Skipped by serde: a deserialized tracker starts with an empty window.
    #[serde(skip)]
    recent_samples: Vec<TimingSample>,
    /// Stall threshold in nanoseconds (calls slower than this count as stalls).
    stall_threshold_ns: u64,
    /// Number of AMAC toggle decisions made.
    pub toggle_decisions: u64,
    /// Number of times AMAC was selected over sequential.
    pub interleave_selections: u64,
}
201
impl Default for AmacStallTelemetry {
    /// Tracker with the default 100µs stall threshold.
    fn default() -> Self {
        Self::new(AMAC_STALL_THRESHOLD_NS)
    }
}
207
208impl AmacStallTelemetry {
209    #[must_use]
210    pub fn new(stall_threshold_ns: u64) -> Self {
211        Self {
212            ema_elapsed_scaled: 0,
213            ema_stall_ratio_scaled: 0,
214            total_calls: 0,
215            total_stalls: 0,
216            recent_samples: Vec::with_capacity(TELEMETRY_WINDOW_SIZE),
217            stall_threshold_ns,
218            toggle_decisions: 0,
219            interleave_selections: 0,
220        }
221    }
222
223    /// Record a timing observation.
224    pub fn record(&mut self, elapsed_ns: u64) {
225        let stalled = elapsed_ns > self.stall_threshold_ns;
226        self.total_calls = self.total_calls.saturating_add(1);
227        if stalled {
228            self.total_stalls = self.total_stalls.saturating_add(1);
229        }
230
231        // Update EMA for elapsed time.
232        let scaled_elapsed = elapsed_ns.saturating_mul(EMA_SCALE);
233        self.ema_elapsed_scaled = if self.total_calls == 1 {
234            scaled_elapsed
235        } else {
236            // EMA = alpha * new + (1 - alpha) * old
237            let alpha_new = scaled_elapsed.saturating_mul(EMA_ALPHA) / EMA_SCALE;
238            let alpha_old = self
239                .ema_elapsed_scaled
240                .saturating_mul(EMA_SCALE.saturating_sub(EMA_ALPHA))
241                / EMA_SCALE;
242            alpha_new.saturating_add(alpha_old)
243        };
244
245        // Update EMA for stall ratio.
246        let stall_point = if stalled { 1000 * EMA_SCALE } else { 0 };
247        self.ema_stall_ratio_scaled = if self.total_calls == 1 {
248            stall_point
249        } else {
250            let alpha_new = stall_point.saturating_mul(EMA_ALPHA) / EMA_SCALE;
251            let alpha_old = self
252                .ema_stall_ratio_scaled
253                .saturating_mul(EMA_SCALE.saturating_sub(EMA_ALPHA))
254                / EMA_SCALE;
255            alpha_new.saturating_add(alpha_old)
256        };
257
258        // Maintain sliding window.
259        let sample = TimingSample {
260            elapsed_ns,
261            stalled,
262        };
263        if self.recent_samples.len() >= TELEMETRY_WINDOW_SIZE {
264            self.recent_samples.remove(0);
265        }
266        self.recent_samples.push(sample);
267    }
268
269    /// Current smoothed stall ratio (0..1000).
270    #[must_use]
271    pub fn stall_ratio(&self) -> u64 {
272        self.ema_stall_ratio_scaled / EMA_SCALE.max(1)
273    }
274
275    /// Current smoothed average elapsed nanoseconds.
276    #[must_use]
277    pub fn avg_elapsed_ns(&self) -> u64 {
278        self.ema_elapsed_scaled / EMA_SCALE.max(1)
279    }
280
281    /// Variance of recent timing samples (nanoseconds squared).
282    #[must_use]
283    pub fn recent_variance(&self) -> u64 {
284        if self.recent_samples.len() < 2 {
285            return 0;
286        }
287        let n = self.recent_samples.len() as u64;
288        let sum: u64 = self
289            .recent_samples
290            .iter()
291            .map(|sample| sample.elapsed_ns)
292            .sum();
293        let mean = sum / n;
294        let variance: u64 = self
295            .recent_samples
296            .iter()
297            .map(|sample| {
298                let diff = sample.elapsed_ns.abs_diff(mean);
299                diff.saturating_mul(diff)
300            })
301            .sum::<u64>()
302            / n;
303        variance
304    }
305
306    /// Snapshot of current telemetry state.
307    #[must_use]
308    pub fn snapshot(&self) -> AmacStallTelemetrySnapshot {
309        AmacStallTelemetrySnapshot {
310            stall_ratio: self.stall_ratio(),
311            avg_elapsed_ns: self.avg_elapsed_ns(),
312            recent_variance: self.recent_variance(),
313            total_calls: self.total_calls,
314            total_stalls: self.total_stalls,
315            stall_threshold_ns: self.stall_threshold_ns,
316            toggle_decisions: self.toggle_decisions,
317            interleave_selections: self.interleave_selections,
318            recent_window_size: self.recent_samples.len(),
319        }
320    }
321}
322
/// Immutable snapshot of stall telemetry for reporting.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AmacStallTelemetrySnapshot {
    /// Smoothed stall ratio (0..1000).
    pub stall_ratio: u64,
    /// Smoothed average dispatch time, nanoseconds.
    pub avg_elapsed_ns: u64,
    /// Variance of the recent sample window (ns²).
    pub recent_variance: u64,
    /// Total calls observed.
    pub total_calls: u64,
    /// Total calls classified as stalls.
    pub total_stalls: u64,
    /// Threshold (ns) above which a call counts as a stall.
    pub stall_threshold_ns: u64,
    /// Number of toggle decisions made so far.
    pub toggle_decisions: u64,
    /// Number of decisions that selected interleaving.
    pub interleave_selections: u64,
    /// Number of samples currently in the sliding window.
    pub recent_window_size: usize,
}
336
337// ── Batch plan ───────────────────────────────────────────────────────────
338
/// Execution plan for a batch of hostcall requests.
#[derive(Debug)]
pub struct AmacBatchPlan {
    /// Groups to dispatch, in priority order.
    pub groups: Vec<AmacBatchGroup>,
    /// Per-group toggle decisions; `decisions[i]` applies to `groups[i]`.
    pub decisions: Vec<AmacToggleDecision>,
    /// Total requests in the batch (as drained, before grouping).
    pub total_requests: usize,
    /// How many groups will use interleaving.
    pub interleaved_groups: usize,
    /// How many groups will use sequential dispatch.
    pub sequential_groups: usize,
}
353
/// Result of executing a batch plan.
#[derive(Debug)]
pub struct AmacBatchResult {
    /// Completed hostcall outcomes, in call_id order for deterministic
    /// scheduler enqueuing.
    pub completions: Vec<(String, HostcallOutcome)>,
    /// Telemetry from this batch execution.
    pub batch_telemetry: AmacBatchTelemetry,
}
363
/// Per-batch execution telemetry.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AmacBatchTelemetry {
    /// Total requests dispatched in this batch.
    pub total_requests: usize,
    /// Number of groups dispatched.
    pub groups_dispatched: usize,
    /// Groups that used interleaved dispatch.
    pub interleaved_groups: usize,
    /// Groups that used sequential dispatch.
    pub sequential_groups: usize,
    /// Wall-clock nanoseconds spent executing the whole batch.
    pub total_elapsed_ns: u64,
}
373
374// ── Executor ─────────────────────────────────────────────────────────────
375
/// AMAC batch executor configuration.
#[derive(Debug, Clone)]
pub struct AmacBatchExecutorConfig {
    /// Minimum batch size to consider interleaving.
    pub min_batch_size: usize,
    /// Maximum interleave width (concurrent state machines).
    pub max_interleave_width: usize,
    /// Enable/disable AMAC globally.
    pub enabled: bool,
    /// Stall classification threshold in nanoseconds.
    pub stall_threshold_ns: u64,
    /// Stall-ratio threshold (0..1000) required before AMAC interleaving.
    pub stall_ratio_threshold: u64,
}
390
impl Default for AmacBatchExecutorConfig {
    /// NOTE: the default configuration reads `PI_HOSTCALL_AMAC*` environment
    /// variables (via `from_env`), so `Default` is environment-dependent
    /// rather than a fixed compile-time value.
    fn default() -> Self {
        Self::from_env()
    }
}
396
397impl AmacBatchExecutorConfig {
398    #[must_use]
399    pub fn from_env() -> Self {
400        let enabled = std::env::var("PI_HOSTCALL_AMAC")
401            .ok()
402            .as_deref()
403            .is_none_or(|value| {
404                !matches!(
405                    value.trim().to_ascii_lowercase().as_str(),
406                    "0" | "false" | "off" | "disabled"
407                )
408            });
409        let min_batch_size = std::env::var("PI_HOSTCALL_AMAC_MIN_BATCH")
410            .ok()
411            .and_then(|raw| raw.trim().parse::<usize>().ok())
412            .unwrap_or(AMAC_MIN_BATCH_SIZE)
413            .max(2);
414        let max_interleave_width = std::env::var("PI_HOSTCALL_AMAC_MAX_WIDTH")
415            .ok()
416            .and_then(|raw| raw.trim().parse::<usize>().ok())
417            .unwrap_or(AMAC_MAX_INTERLEAVE_WIDTH)
418            .max(2);
419        let stall_threshold_ns = std::env::var("PI_HOSTCALL_AMAC_STALL_THRESHOLD_NS")
420            .ok()
421            .and_then(|raw| raw.trim().parse::<u64>().ok())
422            .unwrap_or(AMAC_STALL_THRESHOLD_NS)
423            .max(1);
424        let stall_ratio_threshold = std::env::var("PI_HOSTCALL_AMAC_STALL_RATIO_THRESHOLD")
425            .ok()
426            .and_then(|raw| raw.trim().parse::<u64>().ok())
427            .unwrap_or(AMAC_STALL_RATIO_THRESHOLD)
428            .clamp(1, 1_000);
429        Self {
430            min_batch_size,
431            max_interleave_width,
432            enabled,
433            stall_threshold_ns,
434            stall_ratio_threshold,
435        }
436    }
437
438    #[must_use]
439    pub const fn new(enabled: bool, min_batch_size: usize, max_interleave_width: usize) -> Self {
440        Self {
441            min_batch_size,
442            max_interleave_width,
443            enabled,
444            stall_threshold_ns: AMAC_STALL_THRESHOLD_NS,
445            stall_ratio_threshold: AMAC_STALL_RATIO_THRESHOLD,
446        }
447    }
448
449    #[must_use]
450    pub fn with_thresholds(mut self, stall_threshold_ns: u64, stall_ratio_threshold: u64) -> Self {
451        self.stall_threshold_ns = stall_threshold_ns.max(1);
452        self.stall_ratio_threshold = stall_ratio_threshold.clamp(1, 1_000);
453        self
454    }
455}
456
/// The AMAC batch executor.
///
/// Groups incoming hostcall requests by kind, decides per-group whether
/// interleaving or sequential dispatch is optimal based on stall telemetry,
/// and produces an execution plan.
#[derive(Debug, Clone)]
pub struct AmacBatchExecutor {
    /// Sizing thresholds and the global enable flag.
    config: AmacBatchExecutorConfig,
    /// Running stall statistics driving the toggle decisions.
    telemetry: AmacStallTelemetry,
}
467
468impl AmacBatchExecutor {
469    #[must_use]
470    pub fn new(config: AmacBatchExecutorConfig) -> Self {
471        Self {
472            telemetry: AmacStallTelemetry::new(config.stall_threshold_ns),
473            config,
474        }
475    }
476
477    #[must_use]
478    pub const fn with_telemetry(
479        config: AmacBatchExecutorConfig,
480        telemetry: AmacStallTelemetry,
481    ) -> Self {
482        Self { config, telemetry }
483    }
484
485    /// Access the current stall telemetry.
486    #[must_use]
487    pub const fn telemetry(&self) -> &AmacStallTelemetry {
488        &self.telemetry
489    }
490
491    /// Mutable access to telemetry for recording observations.
492    pub const fn telemetry_mut(&mut self) -> &mut AmacStallTelemetry {
493        &mut self.telemetry
494    }
495
496    /// Whether AMAC is enabled.
497    #[must_use]
498    pub const fn enabled(&self) -> bool {
499        self.config.enabled
500    }
501
502    /// Group a batch of drained hostcall requests and produce an execution plan.
503    ///
504    /// The plan preserves original request ordering within each group and
505    /// chooses interleave vs. sequential per group based on telemetry.
506    #[must_use]
507    #[allow(clippy::too_many_lines)]
508    pub fn plan_batch(&mut self, requests: Vec<HostcallRequest>) -> AmacBatchPlan {
509        let total_requests = requests.len();
510
511        if !self.config.enabled || total_requests == 0 {
512            return AmacBatchPlan {
513                groups: Vec::new(),
514                decisions: Vec::new(),
515                total_requests,
516                interleaved_groups: 0,
517                sequential_groups: 0,
518            };
519        }
520
521        let mut groups = Vec::new();
522        let mut decisions = Vec::new();
523        let mut interleaved_groups = 0_usize;
524        let mut sequential_groups = 0_usize;
525
526        // Group by contiguous runs of the same key to preserve global ordering,
527        // with "Log Sinking" optimization.
528        let request_iter = requests.into_iter();
529        let mut buffered_logs = Vec::new();
530
531        let mut current_key_opt: Option<AmacGroupKey> = None;
532        let mut current_requests = Vec::new();
533
534        for request in request_iter {
535            let key = AmacGroupKey::from_request(&request);
536
537            if key == AmacGroupKey::Log {
538                buffered_logs.push(request);
539                continue;
540            }
541
542            let key_changed = current_key_opt
543                .as_ref()
544                .is_none_or(|current| *current != key);
545
546            if key_changed {
547                // Flush previous batch if it existed
548                if let Some(prev_key) = current_key_opt.take() {
549                    let decision = self.decide_toggle(&prev_key, current_requests.len());
550                    if decision.is_interleave() {
551                        interleaved_groups += 1;
552                    } else {
553                        sequential_groups += 1;
554                    }
555                    groups.push(AmacBatchGroup {
556                        key: prev_key,
557                        requests: std::mem::take(&mut current_requests),
558                    });
559                    decisions.push(decision);
560
561                    // Flush logs sunk during the previous batch
562                    if !buffered_logs.is_empty() {
563                        let log_reqs = std::mem::take(&mut buffered_logs);
564                        let decision = self.decide_toggle(&AmacGroupKey::Log, log_reqs.len());
565                        if decision.is_interleave() {
566                            interleaved_groups += 1;
567                        } else {
568                            sequential_groups += 1;
569                        }
570                        groups.push(AmacBatchGroup {
571                            key: AmacGroupKey::Log,
572                            requests: log_reqs,
573                        });
574                        decisions.push(decision);
575                    }
576                }
577            }
578
579            // First non-log request, or first request after a flushed run.
580            current_key_opt = Some(key);
581            current_requests.push(request);
582        }
583
584        // Flush final run
585        if let Some(current_key) = current_key_opt {
586            if !current_requests.is_empty() {
587                let decision = self.decide_toggle(&current_key, current_requests.len());
588                if decision.is_interleave() {
589                    interleaved_groups += 1;
590                } else {
591                    sequential_groups += 1;
592                }
593                groups.push(AmacBatchGroup {
594                    key: current_key,
595                    requests: current_requests,
596                });
597                decisions.push(decision);
598            }
599        }
600
601        // Flush trailing logs (or if input was only logs).
602        if !buffered_logs.is_empty() {
603            let decision = self.decide_toggle(&AmacGroupKey::Log, buffered_logs.len());
604            if decision.is_interleave() {
605                interleaved_groups += 1;
606            } else {
607                sequential_groups += 1;
608            }
609            groups.push(AmacBatchGroup {
610                key: AmacGroupKey::Log,
611                requests: buffered_logs,
612            });
613            decisions.push(decision);
614        }
615
616        self.telemetry.toggle_decisions = self
617            .telemetry
618            .toggle_decisions
619            .saturating_add(groups.len() as u64);
620        self.telemetry.interleave_selections = self
621            .telemetry
622            .interleave_selections
623            .saturating_add(interleaved_groups as u64);
624
625        AmacBatchPlan {
626            groups,
627            decisions,
628            total_requests,
629            interleaved_groups,
630            sequential_groups,
631        }
632    }
633
634    /// Decide whether a group should use interleaved or sequential dispatch.
635    fn decide_toggle(&self, key: &AmacGroupKey, group_size: usize) -> AmacToggleDecision {
636        // Rule 1: Too small to benefit from interleaving.
637        if group_size < self.config.min_batch_size {
638            return AmacToggleDecision::Sequential {
639                reason: "batch_too_small",
640            };
641        }
642
643        // Rule 2: Group is not safe to interleave (ordering dependencies).
644        if !key.interleave_safe() {
645            return AmacToggleDecision::Sequential {
646                reason: "ordering_dependency",
647            };
648        }
649
650        // Rule 3: Insufficient telemetry history → conservative sequential.
651        if self.telemetry.total_calls < TELEMETRY_WINDOW_SIZE as u64 {
652            return AmacToggleDecision::Sequential {
653                reason: "insufficient_telemetry",
654            };
655        }
656
657        // Rule 4: Stall ratio below threshold → sequential is fine.
658        let stall_ratio = self.telemetry.stall_ratio();
659        if stall_ratio < self.config.stall_ratio_threshold {
660            return AmacToggleDecision::Sequential {
661                reason: "low_stall_ratio",
662            };
663        }
664
665        // Rule 5: Compute interleave width based on stall severity.
666        let width = compute_interleave_width(
667            stall_ratio,
668            key.memory_weight(),
669            group_size,
670            self.config.max_interleave_width,
671        );
672
673        if width < 2 {
674            return AmacToggleDecision::Sequential {
675                reason: "computed_width_too_low",
676            };
677        }
678
679        AmacToggleDecision::Interleave { width }
680    }
681
682    /// Record a per-call timing observation for stall detection.
683    pub fn observe_call(&mut self, elapsed_ns: u64) {
684        self.telemetry.record(elapsed_ns);
685    }
686}
687
impl Default for AmacBatchExecutor {
    /// Executor with environment-derived configuration
    /// (see `AmacBatchExecutorConfig::from_env`).
    fn default() -> Self {
        Self::new(AmacBatchExecutorConfig::default())
    }
}
693
694// ── Helper functions ─────────────────────────────────────────────────────
695
696/// Compute optimal interleave width from stall ratio and group characteristics.
697fn compute_interleave_width(
698    stall_ratio: u64,
699    memory_weight: u32,
700    group_size: usize,
701    max_width: usize,
702) -> usize {
703    // Scale width proportionally to stall severity × memory weight.
704    // At AMAC_STALL_RATIO_SATURATED, we use max_width.
705    let effective_ratio = stall_ratio
706        .saturating_sub(AMAC_STALL_RATIO_THRESHOLD)
707        .min(AMAC_STALL_RATIO_SATURATED - AMAC_STALL_RATIO_THRESHOLD);
708    let ratio_range = AMAC_STALL_RATIO_SATURATED.saturating_sub(AMAC_STALL_RATIO_THRESHOLD);
709
710    // Avoid division by zero.
711    if ratio_range == 0 {
712        return 2;
713    }
714
715    let base_width = 2_u64
716        + (effective_ratio * u64::from(memory_weight) * (max_width as u64 - 2))
717            / (ratio_range * 100);
718
719    // Safe: base_width is bounded by max_width (which fits in usize).
720    let width = usize::try_from(base_width).unwrap_or(max_width);
721    width.min(max_width).min(group_size).max(2)
722}
723
/// Check if a session operation is read-only.
///
/// Matching is tolerant of surrounding whitespace, ASCII case, and
/// underscores, so "get_state", "GET_STATE", and " getstate " all match.
fn is_session_read_op(op: &str) -> bool {
    const READ_OPS: [&str; 8] = [
        "getstate",
        "getmessages",
        "getentries",
        "getname",
        "getmodel",
        "getlabel",
        "getlabels",
        "getallsessions",
    ];
    let normalized: String = op
        .trim()
        .chars()
        .filter(|ch| *ch != '_')
        .map(|ch| ch.to_ascii_lowercase())
        .collect();
    READ_OPS.contains(&normalized.as_str())
}
740
/// Check if an event operation is read-only.
///
/// Comparison ignores surrounding whitespace, ASCII case, and underscores.
fn is_event_read_op(op: &str) -> bool {
    // Normalize in place: lowercase first, then strip underscores without a
    // second allocation.
    let mut normalized = op.trim().to_ascii_lowercase();
    normalized.retain(|ch| ch != '_');
    matches!(
        normalized.as_str(),
        "getactivetools"
            | "getalltools"
            | "getmodel"
            | "getthinkinglevel"
            | "getflag"
            | "listflags"
    )
}
755
756// ── Tests ────────────────────────────────────────────────────────────────
757
758#[cfg(test)]
759mod tests {
760    use super::*;
761    use serde_json::json;
762
    /// Build a minimal `HostcallRequest` of the given kind with a unique
    /// `call_id` and empty payload.
    fn make_request(kind: HostcallKind) -> HostcallRequest {
        HostcallRequest {
            call_id: format!("test-{}", rand_id()),
            kind,
            payload: json!({}),
            trace_id: 0,
            extension_id: None,
        }
    }
772
773    fn rand_id() -> u64 {
774        static COUNTER: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
775        COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
776    }
777
    /// Request that classifies as `AmacGroupKey::SessionRead`.
    fn session_read_request() -> HostcallRequest {
        make_request(HostcallKind::Session {
            op: "get_state".to_string(),
        })
    }

    /// Request that classifies as `AmacGroupKey::SessionWrite`.
    fn session_write_request() -> HostcallRequest {
        make_request(HostcallKind::Session {
            op: "set_model".to_string(),
        })
    }

    /// Request that classifies as `AmacGroupKey::EventRead`.
    fn event_read_request() -> HostcallRequest {
        make_request(HostcallKind::Events {
            op: "get_model".to_string(),
        })
    }

    /// Request that classifies as `AmacGroupKey::Tool`.
    fn tool_request() -> HostcallRequest {
        make_request(HostcallKind::Tool {
            name: "read".to_string(),
        })
    }

    /// Request that classifies as `AmacGroupKey::Http`.
    fn http_request() -> HostcallRequest {
        make_request(HostcallKind::Http)
    }

    /// Request that classifies as `AmacGroupKey::Log`.
    fn log_request() -> HostcallRequest {
        make_request(HostcallKind::Log)
    }
809
810    // ── AmacGroupKey tests ───────────────────────────────────────────
811
    // Each fixture must map to its expected AmacGroupKey via from_request.

    #[test]
    fn group_key_classifies_session_reads_correctly() {
        let req = session_read_request();
        assert_eq!(AmacGroupKey::from_request(&req), AmacGroupKey::SessionRead);
    }

    #[test]
    fn group_key_classifies_session_writes_correctly() {
        let req = session_write_request();
        assert_eq!(AmacGroupKey::from_request(&req), AmacGroupKey::SessionWrite);
    }

    #[test]
    fn group_key_classifies_event_reads_correctly() {
        let req = event_read_request();
        assert_eq!(AmacGroupKey::from_request(&req), AmacGroupKey::EventRead);
    }

    #[test]
    fn group_key_classifies_tools_correctly() {
        let req = tool_request();
        assert_eq!(AmacGroupKey::from_request(&req), AmacGroupKey::Tool);
    }

    #[test]
    fn group_key_classifies_http_correctly() {
        let req = http_request();
        assert_eq!(AmacGroupKey::from_request(&req), AmacGroupKey::Http);
    }

    #[test]
    fn group_key_classifies_log_correctly() {
        let req = log_request();
        assert_eq!(AmacGroupKey::from_request(&req), AmacGroupKey::Log);
    }
847
    // interleave_safe must partition the keys exactly: reads and independent
    // kinds are safe; writes, UI, and exec are not.

    #[test]
    fn interleave_safe_for_read_and_independent_groups() {
        assert!(AmacGroupKey::SessionRead.interleave_safe());
        assert!(AmacGroupKey::EventRead.interleave_safe());
        assert!(AmacGroupKey::Tool.interleave_safe());
        assert!(AmacGroupKey::Http.interleave_safe());
        assert!(AmacGroupKey::Log.interleave_safe());
    }

    #[test]
    fn interleave_unsafe_for_write_and_ui_groups() {
        assert!(!AmacGroupKey::SessionWrite.interleave_safe());
        assert!(!AmacGroupKey::EventWrite.interleave_safe());
        assert!(!AmacGroupKey::Ui.interleave_safe());
        assert!(!AmacGroupKey::Exec.interleave_safe());
    }
864
865    // ── Telemetry tests ──────────────────────────────────────────────
866
    #[test]
    fn telemetry_records_and_tracks_stall_ratio() {
        // Threshold 100µs: 50µs calls are fast, 200µs calls are stalls.
        let mut telemetry = AmacStallTelemetry::new(100_000);

        // Record some fast calls (no stalls).
        for _ in 0..10 {
            telemetry.record(50_000);
        }
        assert_eq!(telemetry.total_calls, 10);
        assert_eq!(telemetry.total_stalls, 0);
        assert!(telemetry.stall_ratio() < AMAC_STALL_RATIO_THRESHOLD);

        // Record some slow calls (stalls).
        for _ in 0..20 {
            telemetry.record(200_000);
        }
        assert_eq!(telemetry.total_calls, 30);
        assert_eq!(telemetry.total_stalls, 20);
        assert!(telemetry.stall_ratio() > 0);
    }

    #[test]
    fn telemetry_ema_converges_to_steady_state() {
        let mut telemetry = AmacStallTelemetry::new(100_000);

        // All fast → stall ratio should converge near 0.
        for _ in 0..100 {
            telemetry.record(10_000);
        }
        assert!(telemetry.stall_ratio() < 50, "expected low stall ratio");

        // All slow → stall ratio should converge near 1000.
        for _ in 0..200 {
            telemetry.record(500_000);
        }
        assert!(
            telemetry.stall_ratio() > 900,
            "expected high stall ratio, got {}",
            telemetry.stall_ratio()
        );
    }

    #[test]
    fn telemetry_sliding_window_bounded() {
        // The recent-sample window must never grow past its fixed size.
        let mut telemetry = AmacStallTelemetry::new(100_000);
        for i in 0..200 {
            telemetry.record(i * 1000);
        }
        assert_eq!(telemetry.recent_samples.len(), TELEMETRY_WINDOW_SIZE);
    }

    #[test]
    fn telemetry_variance_zero_for_constant_input() {
        let mut telemetry = AmacStallTelemetry::new(100_000);
        for _ in 0..10 {
            telemetry.record(50_000);
        }
        assert_eq!(telemetry.recent_variance(), 0);
    }

    #[test]
    fn telemetry_snapshot_captures_state() {
        let mut telemetry = AmacStallTelemetry::new(100_000);
        for _ in 0..5 {
            telemetry.record(50_000);
        }
        let snap = telemetry.snapshot();
        assert_eq!(snap.total_calls, 5);
        assert_eq!(snap.total_stalls, 0);
        assert_eq!(snap.recent_window_size, 5);
    }
938
939    // ── Executor plan tests ──────────────────────────────────────────
940
941    #[test]
942    fn plan_empty_batch_returns_empty_plan() {
943        let mut executor = AmacBatchExecutor::new(AmacBatchExecutorConfig::new(true, 4, 16));
944        let plan = executor.plan_batch(Vec::new());
945        assert_eq!(plan.total_requests, 0);
946        assert!(plan.groups.is_empty());
947        assert!(plan.decisions.is_empty());
948    }
949
950    #[test]
951    fn plan_disabled_executor_returns_empty_groups() {
952        let mut executor = AmacBatchExecutor::new(AmacBatchExecutorConfig::new(false, 4, 16));
953        let requests = vec![tool_request(), tool_request()];
954        let plan = executor.plan_batch(requests);
955        assert_eq!(plan.total_requests, 2);
956        assert!(plan.groups.is_empty());
957    }
958
959    #[test]
960    fn plan_groups_requests_by_kind() {
961        let mut executor = AmacBatchExecutor::new(AmacBatchExecutorConfig::new(true, 2, 16));
962        let requests = vec![
963            session_read_request(),
964            tool_request(),
965            session_read_request(),
966            http_request(),
967            tool_request(),
968        ];
969        let plan = executor.plan_batch(requests);
970        assert_eq!(plan.total_requests, 5);
971        // Grouping is done by contiguous runs to preserve global ordering.
972        assert_eq!(plan.groups.len(), 5);
973        assert_eq!(plan.groups[0].key, AmacGroupKey::SessionRead);
974        assert_eq!(plan.groups[1].key, AmacGroupKey::Tool);
975        assert_eq!(plan.groups[2].key, AmacGroupKey::SessionRead);
976        assert_eq!(plan.groups[3].key, AmacGroupKey::Http);
977        assert_eq!(plan.groups[4].key, AmacGroupKey::Tool);
978    }
979
980    #[test]
981    fn plan_preserves_intra_group_order() {
982        let mut executor = AmacBatchExecutor::new(AmacBatchExecutorConfig::new(true, 2, 16));
983        let req1 = session_read_request();
984        let req2 = session_read_request();
985        let id1 = req1.call_id.clone();
986        let id2 = req2.call_id.clone();
987
988        let requests = vec![req1, req2];
989        let plan = executor.plan_batch(requests);
990        assert_eq!(plan.groups.len(), 1);
991        assert_eq!(plan.groups[0].requests[0].call_id, id1);
992        assert_eq!(plan.groups[0].requests[1].call_id, id2);
993    }
994
995    #[test]
996    fn plan_sequential_for_small_groups_without_telemetry() {
997        let mut executor = AmacBatchExecutor::new(AmacBatchExecutorConfig::new(true, 4, 16));
998        let requests = vec![tool_request(), tool_request()]; // < min_batch_size=4
999        let plan = executor.plan_batch(requests);
1000        assert!(plan.decisions.iter().all(|d| !d.is_interleave()));
1001    }
1002
1003    #[test]
1004    fn plan_sequential_for_write_groups() {
1005        let mut executor = AmacBatchExecutor::new(AmacBatchExecutorConfig::new(true, 2, 16));
1006
1007        // Prime telemetry with high stall ratio.
1008        for _ in 0..100 {
1009            executor.observe_call(500_000);
1010        }
1011
1012        let requests = vec![
1013            session_write_request(),
1014            session_write_request(),
1015            session_write_request(),
1016            session_write_request(),
1017        ];
1018        let plan = executor.plan_batch(requests);
1019        assert_eq!(plan.groups.len(), 1);
1020        assert!(
1021            plan.decisions[0]
1022                == AmacToggleDecision::Sequential {
1023                    reason: "ordering_dependency"
1024                }
1025        );
1026    }
1027
1028    #[test]
1029    fn plan_interleave_with_high_stall_ratio_and_sufficient_batch() {
1030        let mut executor = AmacBatchExecutor::new(AmacBatchExecutorConfig::new(true, 4, 16));
1031
1032        // Prime telemetry with high stall ratio.
1033        for _ in 0..100 {
1034            executor.observe_call(500_000);
1035        }
1036
1037        let requests: Vec<HostcallRequest> = (0..8).map(|_| http_request()).collect();
1038        let plan = executor.plan_batch(requests);
1039        assert_eq!(plan.groups.len(), 1);
1040        assert!(plan.decisions[0].is_interleave());
1041        if let AmacToggleDecision::Interleave { width } = plan.decisions[0] {
1042            assert!(width >= 2);
1043            assert!(width <= 16);
1044        }
1045    }
1046
1047    #[test]
1048    fn plan_sequential_with_low_stall_ratio() {
1049        let mut executor = AmacBatchExecutor::new(AmacBatchExecutorConfig::new(true, 4, 16));
1050
1051        // Prime telemetry with low stall ratio.
1052        for _ in 0..100 {
1053            executor.observe_call(10_000);
1054        }
1055
1056        let requests: Vec<HostcallRequest> = (0..8).map(|_| http_request()).collect();
1057        let plan = executor.plan_batch(requests);
1058        assert_eq!(plan.groups.len(), 1);
1059        assert!(!plan.decisions[0].is_interleave());
1060    }
1061
1062    // ── Toggle decision tests ────────────────────────────────────────
1063
1064    #[test]
1065    fn toggle_interleave_width_scales_with_stall_severity() {
1066        // Higher stall ratio → wider interleave.
1067        let width_low = compute_interleave_width(300, 90, 16, 16);
1068        let width_high = compute_interleave_width(700, 90, 16, 16);
1069        assert!(
1070            width_high >= width_low,
1071            "higher stall ratio should give wider interleave: low={width_low}, high={width_high}"
1072        );
1073    }
1074
1075    #[test]
1076    fn toggle_width_capped_by_group_size() {
1077        let width = compute_interleave_width(800, 90, 3, 16);
1078        assert!(width <= 3);
1079    }
1080
1081    #[test]
1082    fn toggle_width_capped_by_max_width() {
1083        let width = compute_interleave_width(800, 90, 100, 8);
1084        assert!(width <= 8);
1085    }
1086
1087    #[test]
1088    fn toggle_width_minimum_is_two() {
1089        let width = compute_interleave_width(201, 5, 100, 16);
1090        assert!(width >= 2);
1091    }
1092
1093    // ── Session/event operation classification ───────────────────────
1094
1095    #[test]
1096    fn session_read_ops_classified_correctly() {
1097        assert!(is_session_read_op("get_state"));
1098        assert!(is_session_read_op("getState"));
1099        assert!(is_session_read_op("get_messages"));
1100        assert!(is_session_read_op("getMessages"));
1101        assert!(is_session_read_op("get_entries"));
1102        assert!(is_session_read_op("getEntries"));
1103    }
1104
1105    #[test]
1106    fn session_write_ops_classified_correctly() {
1107        assert!(!is_session_read_op("set_model"));
1108        assert!(!is_session_read_op("setModel"));
1109        assert!(!is_session_read_op("set_name"));
1110        assert!(!is_session_read_op("add_label"));
1111    }
1112
1113    #[test]
1114    fn event_read_ops_classified_correctly() {
1115        assert!(is_event_read_op("get_active_tools"));
1116        assert!(is_event_read_op("getActiveTools"));
1117        assert!(is_event_read_op("get_all_tools"));
1118        assert!(is_event_read_op("get_model"));
1119        assert!(is_event_read_op("get_flag"));
1120        assert!(is_event_read_op("list_flags"));
1121    }
1122
1123    #[test]
1124    fn event_write_ops_classified_correctly() {
1125        assert!(!is_event_read_op("set_active_tools"));
1126        assert!(!is_event_read_op("set_model"));
1127        assert!(!is_event_read_op("register_command"));
1128        assert!(!is_event_read_op("register_provider"));
1129    }
1130
1131    // ── Serialization round-trip ─────────────────────────────────────
1132
1133    #[test]
1134    fn telemetry_snapshot_serializes_deterministically() {
1135        let mut telemetry = AmacStallTelemetry::new(100_000);
1136        for i in 0..10 {
1137            telemetry.record(i * 10_000);
1138        }
1139        let snap = telemetry.snapshot();
1140        let json = serde_json::to_string(&snap).expect("serialize snapshot");
1141        let deserialized: AmacStallTelemetrySnapshot =
1142            serde_json::from_str(&json).expect("deserialize snapshot");
1143        assert_eq!(deserialized.total_calls, snap.total_calls);
1144        assert_eq!(deserialized.total_stalls, snap.total_stalls);
1145        assert_eq!(deserialized.toggle_decisions, snap.toggle_decisions);
1146    }
1147
1148    #[test]
1149    fn group_key_serializes_round_trip() {
1150        let keys = vec![
1151            AmacGroupKey::SessionRead,
1152            AmacGroupKey::SessionWrite,
1153            AmacGroupKey::EventRead,
1154            AmacGroupKey::EventWrite,
1155            AmacGroupKey::Tool,
1156            AmacGroupKey::Exec,
1157            AmacGroupKey::Http,
1158            AmacGroupKey::Ui,
1159            AmacGroupKey::Log,
1160        ];
1161        for key in keys {
1162            let json = serde_json::to_string(&key).expect("serialize key");
1163            let deserialized: AmacGroupKey = serde_json::from_str(&json).expect("deserialize key");
1164            assert_eq!(deserialized, key);
1165        }
1166    }
1167
1168    #[test]
1169    fn toggle_decision_serializes_round_trip() {
1170        let interleave = AmacToggleDecision::Interleave { width: 8 };
1171        let json = serde_json::to_string(&interleave).expect("serialize");
1172        let json: &'static str = Box::leak(json.into_boxed_str());
1173        let deserialized: AmacToggleDecision = serde_json::from_str(json).expect("deserialize");
1174        assert_eq!(deserialized, interleave);
1175
1176        let sequential = AmacToggleDecision::Sequential {
1177            reason: "batch_too_small",
1178        };
1179        let json = serde_json::to_string(&sequential).expect("serialize");
1180        let json: &'static str = Box::leak(json.into_boxed_str());
1181        let deserialized: AmacToggleDecision = serde_json::from_str(json).expect("deserialize");
1182        assert_eq!(deserialized, sequential);
1183    }
1184
1185    // ── Mixed batch scenarios ────────────────────────────────────────
1186
1187    #[test]
1188    fn mixed_batch_groups_independently() {
1189        let mut executor = AmacBatchExecutor::new(AmacBatchExecutorConfig::new(true, 2, 16));
1190        let requests = vec![
1191            session_read_request(),
1192            tool_request(),
1193            http_request(),
1194            session_write_request(),
1195            log_request(),
1196            event_read_request(),
1197            session_read_request(),
1198            tool_request(),
1199        ];
1200        let plan = executor.plan_batch(requests);
1201        assert_eq!(plan.total_requests, 8);
1202        // Contiguous-run grouping keeps ordering strict, and log calls are sunk
1203        // to their own group once a non-log key boundary is crossed.
1204        assert_eq!(plan.groups.len(), 8);
1205        assert_eq!(plan.groups[0].key, AmacGroupKey::SessionRead);
1206        assert_eq!(plan.groups[1].key, AmacGroupKey::Tool);
1207        assert_eq!(plan.groups[2].key, AmacGroupKey::Http);
1208        assert_eq!(plan.groups[3].key, AmacGroupKey::SessionWrite);
1209        assert_eq!(plan.groups[4].key, AmacGroupKey::Log);
1210        assert_eq!(plan.groups[5].key, AmacGroupKey::EventRead);
1211        assert_eq!(plan.groups[6].key, AmacGroupKey::SessionRead);
1212        assert_eq!(plan.groups[7].key, AmacGroupKey::Tool);
1213    }
1214
1215    #[test]
1216    fn executor_tracks_toggle_decision_counts() {
1217        let mut executor = AmacBatchExecutor::new(AmacBatchExecutorConfig::new(true, 2, 16));
1218
1219        // Prime with high stalls.
1220        for _ in 0..100 {
1221            executor.observe_call(500_000);
1222        }
1223
1224        let requests: Vec<HostcallRequest> = (0..6).map(|_| http_request()).collect();
1225        let plan = executor.plan_batch(requests);
1226
1227        let snap = executor.telemetry().snapshot();
1228        assert_eq!(snap.toggle_decisions, plan.groups.len() as u64);
1229        assert!(snap.interleave_selections > 0);
1230    }
1231
1232    #[test]
1233    fn single_request_batch_always_sequential() {
1234        let mut executor = AmacBatchExecutor::new(AmacBatchExecutorConfig::new(true, 2, 16));
1235
1236        // Even with high stalls.
1237        for _ in 0..100 {
1238            executor.observe_call(500_000);
1239        }
1240
1241        let requests = vec![http_request()];
1242        let plan = executor.plan_batch(requests);
1243        assert_eq!(plan.groups.len(), 1);
1244        // Single item in group < min_batch_size=2 would be edge case,
1245        // but batch has 1 item total.
1246        assert!(plan.decisions.iter().all(|d| !d.is_interleave()));
1247    }
1248
1249    // ── Clone semantics ─────────────────────────────────────────────
1250
1251    #[test]
1252    fn executor_clone_preserves_telemetry_state() {
1253        let mut original = AmacBatchExecutor::new(AmacBatchExecutorConfig::new(true, 4, 16));
1254        for _ in 0..50 {
1255            original.observe_call(200_000);
1256        }
1257        let snap_before = original.telemetry().snapshot();
1258        assert_eq!(snap_before.total_calls, 50);
1259
1260        let cloned = original.clone();
1261        let snap_cloned = cloned.telemetry().snapshot();
1262        assert_eq!(snap_cloned.total_calls, snap_before.total_calls);
1263        assert_eq!(snap_cloned.total_stalls, snap_before.total_stalls);
1264        assert_eq!(snap_cloned.stall_ratio, snap_before.stall_ratio);
1265    }
1266
1267    #[test]
1268    fn executor_clone_is_independent() {
1269        let mut original = AmacBatchExecutor::new(AmacBatchExecutorConfig::new(true, 4, 16));
1270        for _ in 0..10 {
1271            original.observe_call(50_000);
1272        }
1273
1274        let mut cloned = original.clone();
1275        // Mutate only the clone.
1276        for _ in 0..100 {
1277            cloned.observe_call(500_000);
1278        }
1279
1280        // Original should be unaffected.
1281        assert_eq!(original.telemetry().snapshot().total_calls, 10);
1282        assert_eq!(cloned.telemetry().snapshot().total_calls, 110);
1283    }
1284
1285    // ── Config from env ─────────────────────────────────────────────
1286
1287    #[test]
1288    fn config_new_matches_parameters() {
1289        let config = AmacBatchExecutorConfig::new(false, 8, 32);
1290        assert!(!config.enabled);
1291        assert_eq!(config.min_batch_size, 8);
1292        assert_eq!(config.max_interleave_width, 32);
1293        assert_eq!(config.stall_threshold_ns, AMAC_STALL_THRESHOLD_NS);
1294        assert_eq!(config.stall_ratio_threshold, AMAC_STALL_RATIO_THRESHOLD);
1295    }
1296
1297    #[test]
1298    fn default_executor_is_enabled() {
1299        // Default from_env with no env vars set → enabled.
1300        let executor = AmacBatchExecutor::default();
1301        assert!(executor.enabled());
1302    }
1303
1304    #[test]
1305    fn config_with_thresholds_applies_clamps() {
1306        let config = AmacBatchExecutorConfig::new(true, 4, 16).with_thresholds(0, 9_999);
1307        assert_eq!(config.stall_threshold_ns, 1);
1308        assert_eq!(config.stall_ratio_threshold, 1_000);
1309    }
1310
1311    // ── Batch result types ──────────────────────────────────────────
1312
1313    #[test]
1314    fn batch_telemetry_serializes() {
1315        let telem = AmacBatchTelemetry {
1316            total_requests: 10,
1317            groups_dispatched: 3,
1318            interleaved_groups: 1,
1319            sequential_groups: 2,
1320            total_elapsed_ns: 5_000_000,
1321        };
1322        let json = serde_json::to_string(&telem).expect("serialize");
1323        let deser: AmacBatchTelemetry = serde_json::from_str(&json).expect("deserialize");
1324        assert_eq!(deser.total_requests, 10);
1325        assert_eq!(deser.interleaved_groups, 1);
1326    }
1327
1328    // ── Property tests ──
1329
1330    mod proptest_amac {
1331        use super::*;
1332        use proptest::prelude::*;
1333
1334        proptest! {
1335            #[test]
1336            fn stall_ratio_bounded_0_to_1000(
1337                observations in prop::collection::vec(0..1_000_000u64, 1..100),
1338            ) {
1339                let mut telemetry = AmacStallTelemetry::new(100_000);
1340                for elapsed in &observations {
1341                    telemetry.record(*elapsed);
1342                }
1343                let ratio = telemetry.stall_ratio();
1344                assert!(ratio <= 1_000, "stall_ratio was {ratio}, expected <= 1000");
1345            }
1346
1347            #[test]
1348            fn total_stalls_never_exceeds_total_calls(
1349                observations in prop::collection::vec(0..1_000_000u64, 1..100),
1350            ) {
1351                let mut telemetry = AmacStallTelemetry::new(100_000);
1352                for elapsed in &observations {
1353                    telemetry.record(*elapsed);
1354                }
1355                assert!(
1356                    telemetry.total_stalls <= telemetry.total_calls,
1357                    "stalls {} > calls {}",
1358                    telemetry.total_stalls,
1359                    telemetry.total_calls,
1360                );
1361            }
1362
1363            #[test]
1364            fn total_calls_matches_observation_count(
1365                observations in prop::collection::vec(0..1_000_000u64, 1..100),
1366            ) {
1367                let mut telemetry = AmacStallTelemetry::new(100_000);
1368                for elapsed in &observations {
1369                    telemetry.record(*elapsed);
1370                }
1371                assert_eq!(
1372                    telemetry.total_calls,
1373                    observations.len() as u64,
1374                );
1375            }
1376
1377            #[test]
1378            fn recent_window_never_exceeds_capacity(
1379                observations in prop::collection::vec(0..1_000_000u64, 1..200),
1380            ) {
1381                let mut telemetry = AmacStallTelemetry::new(100_000);
1382                for elapsed in &observations {
1383                    telemetry.record(*elapsed);
1384                }
1385                let snap = telemetry.snapshot();
1386                assert!(
1387                    snap.recent_window_size <= TELEMETRY_WINDOW_SIZE,
1388                    "window {} > capacity {}",
1389                    snap.recent_window_size,
1390                    TELEMETRY_WINDOW_SIZE,
1391                );
1392            }
1393
1394            #[test]
1395            fn interleave_width_bounded(
1396                stall_ratio in 0..2000u64,
1397                memory_weight in 0..100u32,
1398                group_size in 2..100usize,
1399                max_width in 2..32usize,
1400            ) {
1401                let width = compute_interleave_width(
1402                    stall_ratio,
1403                    memory_weight,
1404                    group_size,
1405                    max_width,
1406                );
1407                assert!(width >= 2, "width must be >= 2, got {width}");
1408                assert!(width <= max_width, "width {width} > max_width {max_width}");
1409                assert!(width <= group_size, "width {width} > group_size {group_size}");
1410            }
1411
1412            #[test]
1413            fn interleave_width_monotone_in_stall_ratio(
1414                base_ratio in 200..600u64,
1415                delta in 1..400u64,
1416                memory_weight in 1..100u32,
1417                group_size in 4..50usize,
1418                max_width in 4..32usize,
1419            ) {
1420                let low = compute_interleave_width(
1421                    base_ratio,
1422                    memory_weight,
1423                    group_size,
1424                    max_width,
1425                );
1426                let high = compute_interleave_width(
1427                    base_ratio + delta,
1428                    memory_weight,
1429                    group_size,
1430                    max_width,
1431                );
1432                assert!(
1433                    high >= low,
1434                    "higher stall ratio should give >= width: low={low} (ratio={base_ratio}), high={high} (ratio={})",
1435                    base_ratio + delta,
1436                );
1437            }
1438
1439            #[test]
1440            fn group_key_interleave_safe_stable(
1441                idx in 0..9usize,
1442            ) {
1443                let keys = [
1444                    AmacGroupKey::SessionRead,
1445                    AmacGroupKey::SessionWrite,
1446                    AmacGroupKey::EventRead,
1447                    AmacGroupKey::EventWrite,
1448                    AmacGroupKey::Tool,
1449                    AmacGroupKey::Exec,
1450                    AmacGroupKey::Http,
1451                    AmacGroupKey::Ui,
1452                    AmacGroupKey::Log,
1453                ];
1454                let key = &keys[idx];
1455                let s1 = key.interleave_safe();
1456                let s2 = key.interleave_safe();
1457                assert_eq!(s1, s2, "interleave_safe must be deterministic");
1458            }
1459        }
1460    }
1461}