1use crate::extensions_js::HostcallKind;
15use crate::extensions_js::HostcallRequest;
16use crate::scheduler::HostcallOutcome;
17use serde::{Deserialize, Serialize};
18
/// Default minimum group size; smaller groups are always planned as sequential.
const AMAC_MIN_BATCH_SIZE: usize = 4;

/// Default upper bound on the interleave width chosen for a group.
const AMAC_MAX_INTERLEAVE_WIDTH: usize = 16;

/// Default latency, in nanoseconds, above which a recorded call counts as stalled.
const AMAC_STALL_THRESHOLD_NS: u64 = 100_000;
/// Numerator of the EMA smoothing factor; with `EMA_SCALE` as the denominator
/// the weight given to each new sample is 51/256 (roughly 0.2).
const EMA_ALPHA: u64 = 51;
/// Fixed-point scale used for the EMA accumulators and as the denominator of
/// the smoothing factor.
const EMA_SCALE: u64 = 256;

/// Default stall ratio (per-mille, 0..=1000) below which groups stay sequential.
/// Also the lower end of the range over which the interleave width is scaled.
const AMAC_STALL_RATIO_THRESHOLD: u64 = 200;

/// Stall ratio (per-mille) at which the interleave width stops growing.
const AMAC_STALL_RATIO_SATURATED: u64 = 800;

/// Maximum number of recent timing samples retained. The same value is used as
/// the minimum number of recorded calls before interleaving is considered.
const TELEMETRY_WINDOW_SIZE: usize = 64;
47
/// Category used to group hostcall requests when planning a batch.
///
/// Consecutive requests with the same key end up in the same group.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub enum AmacGroupKey {
    /// Session hostcall whose operation is recognized as a read.
    SessionRead,
    /// Session hostcall whose operation is not recognized as a read.
    SessionWrite,
    /// Events hostcall whose operation is recognized as a read.
    EventRead,
    /// Events hostcall whose operation is not recognized as a read.
    EventWrite,
    /// Tool hostcall.
    Tool,
    /// Exec hostcall.
    Exec,
    /// HTTP hostcall.
    Http,
    /// UI hostcall.
    Ui,
    /// Log hostcall.
    Log,
}
72
73impl AmacGroupKey {
74 #[must_use]
76 pub fn from_request(request: &HostcallRequest) -> Self {
77 match &request.kind {
78 HostcallKind::Session { op } => {
79 if is_session_read_op(op) {
80 Self::SessionRead
81 } else {
82 Self::SessionWrite
83 }
84 }
85 HostcallKind::Events { op } => {
86 if is_event_read_op(op) {
87 Self::EventRead
88 } else {
89 Self::EventWrite
90 }
91 }
92 HostcallKind::Tool { .. } => Self::Tool,
93 HostcallKind::Exec { .. } => Self::Exec,
94 HostcallKind::Http => Self::Http,
95 HostcallKind::Ui { .. } => Self::Ui,
96 HostcallKind::Log => Self::Log,
97 }
98 }
99
100 #[must_use]
103 pub const fn interleave_safe(&self) -> bool {
104 matches!(
105 self,
106 Self::SessionRead | Self::EventRead | Self::Tool | Self::Http | Self::Log
107 )
108 }
109
110 #[must_use]
113 pub const fn memory_weight(&self) -> u32 {
114 match self {
115 Self::Http => 90, Self::Tool | Self::Exec => 70, Self::SessionRead => 50, Self::EventRead => 40, Self::SessionWrite => 30,
120 Self::EventWrite => 20,
121 Self::Ui => 10,
122 Self::Log => 5,
123 }
124 }
125}
126
/// A run of hostcall requests that share one [`AmacGroupKey`].
#[derive(Debug)]
pub struct AmacBatchGroup {
    /// Category shared by every request in this group.
    pub key: AmacGroupKey,
    /// The grouped requests, in the order they were added.
    pub requests: Vec<HostcallRequest>,
}
136
137impl AmacBatchGroup {
138 #[must_use]
139 pub fn len(&self) -> usize {
140 self.requests.len()
141 }
142
143 #[must_use]
144 pub fn is_empty(&self) -> bool {
145 self.requests.is_empty()
146 }
147}
148
/// Execution strategy chosen for a single batch group.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum AmacToggleDecision {
    /// Run the group interleaved.
    Interleave {
        /// Chosen interleave width.
        width: usize,
    },
    /// Run the group sequentially.
    Sequential {
        /// Short machine-readable tag explaining why interleaving was rejected
        /// (for example `"batch_too_small"`).
        reason: &'static str,
    },
}
163
164impl AmacToggleDecision {
165 #[must_use]
166 pub const fn is_interleave(&self) -> bool {
167 matches!(self, Self::Interleave { .. })
168 }
169}
170
/// One recorded call timing kept in the recent-samples window.
#[derive(Debug, Clone, Copy)]
struct TimingSample {
    /// Observed call duration in nanoseconds.
    elapsed_ns: u64,
    /// Whether the duration exceeded the stall threshold when it was recorded.
    stalled: bool,
}
179
/// Running latency and stall statistics used to decide whether to interleave.
///
/// The averages are exponential moving averages kept in fixed point (scaled by
/// `EMA_SCALE`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AmacStallTelemetry {
    /// EMA of call latency in nanoseconds, multiplied by `EMA_SCALE`.
    ema_elapsed_scaled: u64,
    /// EMA of the stall indicator in per-mille (0 or 1000 per sample),
    /// multiplied by `EMA_SCALE`.
    ema_stall_ratio_scaled: u64,
    /// Number of calls recorded so far (saturating).
    total_calls: u64,
    /// Number of recorded calls that exceeded the stall threshold (saturating).
    total_stalls: u64,
    /// Most recent samples, oldest first, capped at `TELEMETRY_WINDOW_SIZE`.
    /// Skipped by serde, so it is empty after deserialization.
    #[serde(skip)]
    recent_samples: Vec<TimingSample>,
    /// Latency in nanoseconds above which a call counts as stalled.
    stall_threshold_ns: u64,
    /// Total number of per-group toggle decisions made by the executor.
    pub toggle_decisions: u64,
    /// How many of those decisions selected interleaving.
    pub interleave_selections: u64,
}
201
202impl Default for AmacStallTelemetry {
203 fn default() -> Self {
204 Self::new(AMAC_STALL_THRESHOLD_NS)
205 }
206}
207
208impl AmacStallTelemetry {
209 #[must_use]
210 pub fn new(stall_threshold_ns: u64) -> Self {
211 Self {
212 ema_elapsed_scaled: 0,
213 ema_stall_ratio_scaled: 0,
214 total_calls: 0,
215 total_stalls: 0,
216 recent_samples: Vec::with_capacity(TELEMETRY_WINDOW_SIZE),
217 stall_threshold_ns,
218 toggle_decisions: 0,
219 interleave_selections: 0,
220 }
221 }
222
223 pub fn record(&mut self, elapsed_ns: u64) {
225 let stalled = elapsed_ns > self.stall_threshold_ns;
226 self.total_calls = self.total_calls.saturating_add(1);
227 if stalled {
228 self.total_stalls = self.total_stalls.saturating_add(1);
229 }
230
231 let scaled_elapsed = elapsed_ns.saturating_mul(EMA_SCALE);
233 self.ema_elapsed_scaled = if self.total_calls == 1 {
234 scaled_elapsed
235 } else {
236 let alpha_new = scaled_elapsed.saturating_mul(EMA_ALPHA) / EMA_SCALE;
238 let alpha_old = self
239 .ema_elapsed_scaled
240 .saturating_mul(EMA_SCALE.saturating_sub(EMA_ALPHA))
241 / EMA_SCALE;
242 alpha_new.saturating_add(alpha_old)
243 };
244
245 let stall_point = if stalled { 1000 * EMA_SCALE } else { 0 };
247 self.ema_stall_ratio_scaled = if self.total_calls == 1 {
248 stall_point
249 } else {
250 let alpha_new = stall_point.saturating_mul(EMA_ALPHA) / EMA_SCALE;
251 let alpha_old = self
252 .ema_stall_ratio_scaled
253 .saturating_mul(EMA_SCALE.saturating_sub(EMA_ALPHA))
254 / EMA_SCALE;
255 alpha_new.saturating_add(alpha_old)
256 };
257
258 let sample = TimingSample {
260 elapsed_ns,
261 stalled,
262 };
263 if self.recent_samples.len() >= TELEMETRY_WINDOW_SIZE {
264 self.recent_samples.remove(0);
265 }
266 self.recent_samples.push(sample);
267 }
268
269 #[must_use]
271 pub fn stall_ratio(&self) -> u64 {
272 self.ema_stall_ratio_scaled / EMA_SCALE.max(1)
273 }
274
275 #[must_use]
277 pub fn avg_elapsed_ns(&self) -> u64 {
278 self.ema_elapsed_scaled / EMA_SCALE.max(1)
279 }
280
281 #[must_use]
283 pub fn recent_variance(&self) -> u64 {
284 if self.recent_samples.len() < 2 {
285 return 0;
286 }
287 let n = self.recent_samples.len() as u64;
288 let sum: u64 = self
289 .recent_samples
290 .iter()
291 .fold(0u64, |acc, sample| acc.saturating_add(sample.elapsed_ns));
292 let mean = sum / n;
293 let variance_u128 = self
294 .recent_samples
295 .iter()
296 .map(|sample| {
297 let diff = u128::from(sample.elapsed_ns.abs_diff(mean));
298 diff * diff
299 })
300 .fold(0u128, u128::saturating_add)
301 / u128::from(n);
302 u64::try_from(variance_u128).unwrap_or(u64::MAX)
303 }
304
305 #[must_use]
307 pub fn snapshot(&self) -> AmacStallTelemetrySnapshot {
308 AmacStallTelemetrySnapshot {
309 stall_ratio: self.stall_ratio(),
310 avg_elapsed_ns: self.avg_elapsed_ns(),
311 recent_variance: self.recent_variance(),
312 total_calls: self.total_calls,
313 total_stalls: self.total_stalls,
314 stall_threshold_ns: self.stall_threshold_ns,
315 toggle_decisions: self.toggle_decisions,
316 interleave_selections: self.interleave_selections,
317 recent_window_size: self.recent_samples.len(),
318 }
319 }
320}
321
/// Point-in-time, serializable copy of the values exposed by
/// [`AmacStallTelemetry`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AmacStallTelemetrySnapshot {
    /// Smoothed stall ratio in per-mille.
    pub stall_ratio: u64,
    /// Smoothed call latency in nanoseconds.
    pub avg_elapsed_ns: u64,
    /// Variance of the latencies held in the recent-samples window.
    pub recent_variance: u64,
    /// Number of calls recorded.
    pub total_calls: u64,
    /// Number of recorded calls that exceeded the stall threshold.
    pub total_stalls: u64,
    /// Stall threshold in nanoseconds.
    pub stall_threshold_ns: u64,
    /// Total number of toggle decisions made.
    pub toggle_decisions: u64,
    /// Number of toggle decisions that selected interleaving.
    pub interleave_selections: u64,
    /// Number of samples currently held in the recent window.
    pub recent_window_size: usize,
}
335
/// Result of planning a batch: the groups to run and the strategy for each.
#[derive(Debug)]
pub struct AmacBatchPlan {
    /// Request groups in dispatch order.
    pub groups: Vec<AmacBatchGroup>,
    /// One decision per group; `decisions[i]` applies to `groups[i]`.
    pub decisions: Vec<AmacToggleDecision>,
    /// Number of requests that were handed to the planner, even when no groups
    /// were produced.
    pub total_requests: usize,
    /// Number of groups planned as interleaved.
    pub interleaved_groups: usize,
    /// Number of groups planned as sequential.
    pub sequential_groups: usize,
}
352
/// Outcome of executing a planned batch.
#[derive(Debug)]
pub struct AmacBatchResult {
    /// Outcomes paired with a string key — presumably the originating
    /// request's `call_id`; TODO confirm against the code that builds this.
    pub completions: Vec<(String, HostcallOutcome)>,
    /// Aggregate counters for the batch.
    pub batch_telemetry: AmacBatchTelemetry,
}
362
/// Serializable summary counters for one executed batch.
// NOTE(review): nothing in the visible non-test code constructs this type, so
// the field meanings below are read from the field names; confirm against the
// producer.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AmacBatchTelemetry {
    /// Number of requests in the batch.
    pub total_requests: usize,
    /// Number of groups dispatched.
    pub groups_dispatched: usize,
    /// Number of groups run interleaved.
    pub interleaved_groups: usize,
    /// Number of groups run sequentially.
    pub sequential_groups: usize,
    /// Elapsed time for the batch in nanoseconds.
    pub total_elapsed_ns: u64,
}
372
/// Tunables that control when and how widely the executor interleaves.
#[derive(Debug, Clone)]
pub struct AmacBatchExecutorConfig {
    /// Groups with fewer requests than this are always sequential.
    pub min_batch_size: usize,
    /// Upper bound on the interleave width chosen for a group.
    pub max_interleave_width: usize,
    /// When `false`, planning produces no groups at all.
    pub enabled: bool,
    /// Latency in nanoseconds above which a call counts as stalled.
    pub stall_threshold_ns: u64,
    /// Stall ratio (per-mille) below which groups stay sequential.
    pub stall_ratio_threshold: u64,
}
389
390impl Default for AmacBatchExecutorConfig {
391 fn default() -> Self {
392 Self::from_env()
393 }
394}
395
396impl AmacBatchExecutorConfig {
397 #[must_use]
398 pub fn from_env() -> Self {
399 let enabled = std::env::var("PI_HOSTCALL_AMAC")
400 .ok()
401 .as_deref()
402 .is_none_or(|value| {
403 !matches!(
404 value.trim().to_ascii_lowercase().as_str(),
405 "0" | "false" | "off" | "disabled"
406 )
407 });
408 let min_batch_size = std::env::var("PI_HOSTCALL_AMAC_MIN_BATCH")
409 .ok()
410 .and_then(|raw| raw.trim().parse::<usize>().ok())
411 .unwrap_or(AMAC_MIN_BATCH_SIZE)
412 .max(2);
413 let max_interleave_width = std::env::var("PI_HOSTCALL_AMAC_MAX_WIDTH")
414 .ok()
415 .and_then(|raw| raw.trim().parse::<usize>().ok())
416 .unwrap_or(AMAC_MAX_INTERLEAVE_WIDTH)
417 .max(2);
418 let stall_threshold_ns = std::env::var("PI_HOSTCALL_AMAC_STALL_THRESHOLD_NS")
419 .ok()
420 .and_then(|raw| raw.trim().parse::<u64>().ok())
421 .unwrap_or(AMAC_STALL_THRESHOLD_NS)
422 .max(1);
423 let stall_ratio_threshold = std::env::var("PI_HOSTCALL_AMAC_STALL_RATIO_THRESHOLD")
424 .ok()
425 .and_then(|raw| raw.trim().parse::<u64>().ok())
426 .unwrap_or(AMAC_STALL_RATIO_THRESHOLD)
427 .clamp(1, 1_000);
428 Self {
429 min_batch_size,
430 max_interleave_width,
431 enabled,
432 stall_threshold_ns,
433 stall_ratio_threshold,
434 }
435 }
436
437 #[must_use]
438 pub const fn new(enabled: bool, min_batch_size: usize, max_interleave_width: usize) -> Self {
439 Self {
440 min_batch_size,
441 max_interleave_width,
442 enabled,
443 stall_threshold_ns: AMAC_STALL_THRESHOLD_NS,
444 stall_ratio_threshold: AMAC_STALL_RATIO_THRESHOLD,
445 }
446 }
447
448 #[must_use]
449 pub fn with_thresholds(mut self, stall_threshold_ns: u64, stall_ratio_threshold: u64) -> Self {
450 self.stall_threshold_ns = stall_threshold_ns.max(1);
451 self.stall_ratio_threshold = stall_ratio_threshold.clamp(1, 1_000);
452 self
453 }
454}
455
/// Plans hostcall batches, choosing per group between interleaved and
/// sequential execution based on its configuration and observed stall
/// telemetry.
#[derive(Debug, Clone)]
pub struct AmacBatchExecutor {
    /// Tunables consulted for every toggle decision.
    config: AmacBatchExecutorConfig,
    /// Stall statistics fed by `observe_call` and updated by `plan_batch`.
    telemetry: AmacStallTelemetry,
}
466
467impl AmacBatchExecutor {
468 #[must_use]
469 pub fn new(config: AmacBatchExecutorConfig) -> Self {
470 Self {
471 telemetry: AmacStallTelemetry::new(config.stall_threshold_ns),
472 config,
473 }
474 }
475
476 #[must_use]
477 pub const fn with_telemetry(
478 config: AmacBatchExecutorConfig,
479 telemetry: AmacStallTelemetry,
480 ) -> Self {
481 Self { config, telemetry }
482 }
483
484 #[must_use]
486 pub const fn telemetry(&self) -> &AmacStallTelemetry {
487 &self.telemetry
488 }
489
490 pub const fn telemetry_mut(&mut self) -> &mut AmacStallTelemetry {
492 &mut self.telemetry
493 }
494
495 #[must_use]
497 pub const fn enabled(&self) -> bool {
498 self.config.enabled
499 }
500
501 #[must_use]
506 #[allow(clippy::too_many_lines)]
507 pub fn plan_batch(&mut self, requests: Vec<HostcallRequest>) -> AmacBatchPlan {
508 let total_requests = requests.len();
509
510 if !self.config.enabled || total_requests == 0 {
511 return AmacBatchPlan {
512 groups: Vec::new(),
513 decisions: Vec::new(),
514 total_requests,
515 interleaved_groups: 0,
516 sequential_groups: 0,
517 };
518 }
519
520 let mut groups = Vec::new();
521 let mut decisions = Vec::new();
522 let mut interleaved_groups = 0_usize;
523 let mut sequential_groups = 0_usize;
524
525 let request_iter = requests.into_iter();
528 let mut buffered_logs = Vec::new();
529
530 let mut current_key_opt: Option<AmacGroupKey> = None;
531 let mut current_requests = Vec::new();
532
533 for request in request_iter {
534 let key = AmacGroupKey::from_request(&request);
535
536 if key == AmacGroupKey::Log {
537 buffered_logs.push(request);
538 continue;
539 }
540
541 let key_changed = current_key_opt
542 .as_ref()
543 .is_none_or(|current| *current != key);
544
545 if key_changed {
546 if let Some(prev_key) = current_key_opt.take() {
548 let decision = self.decide_toggle(&prev_key, current_requests.len());
549 if decision.is_interleave() {
550 interleaved_groups += 1;
551 } else {
552 sequential_groups += 1;
553 }
554 groups.push(AmacBatchGroup {
555 key: prev_key,
556 requests: std::mem::take(&mut current_requests),
557 });
558 decisions.push(decision);
559
560 if !buffered_logs.is_empty() {
562 let log_reqs = std::mem::take(&mut buffered_logs);
563 let decision = self.decide_toggle(&AmacGroupKey::Log, log_reqs.len());
564 if decision.is_interleave() {
565 interleaved_groups += 1;
566 } else {
567 sequential_groups += 1;
568 }
569 groups.push(AmacBatchGroup {
570 key: AmacGroupKey::Log,
571 requests: log_reqs,
572 });
573 decisions.push(decision);
574 }
575 }
576 }
577
578 current_key_opt = Some(key);
580 current_requests.push(request);
581 }
582
583 if let Some(current_key) = current_key_opt {
585 if !current_requests.is_empty() {
586 let decision = self.decide_toggle(¤t_key, current_requests.len());
587 if decision.is_interleave() {
588 interleaved_groups += 1;
589 } else {
590 sequential_groups += 1;
591 }
592 groups.push(AmacBatchGroup {
593 key: current_key,
594 requests: current_requests,
595 });
596 decisions.push(decision);
597 }
598 }
599
600 if !buffered_logs.is_empty() {
602 let decision = self.decide_toggle(&AmacGroupKey::Log, buffered_logs.len());
603 if decision.is_interleave() {
604 interleaved_groups += 1;
605 } else {
606 sequential_groups += 1;
607 }
608 groups.push(AmacBatchGroup {
609 key: AmacGroupKey::Log,
610 requests: buffered_logs,
611 });
612 decisions.push(decision);
613 }
614
615 self.telemetry.toggle_decisions = self
616 .telemetry
617 .toggle_decisions
618 .saturating_add(groups.len() as u64);
619 self.telemetry.interleave_selections = self
620 .telemetry
621 .interleave_selections
622 .saturating_add(interleaved_groups as u64);
623
624 AmacBatchPlan {
625 groups,
626 decisions,
627 total_requests,
628 interleaved_groups,
629 sequential_groups,
630 }
631 }
632
633 fn decide_toggle(&self, key: &AmacGroupKey, group_size: usize) -> AmacToggleDecision {
635 if group_size < self.config.min_batch_size {
637 return AmacToggleDecision::Sequential {
638 reason: "batch_too_small",
639 };
640 }
641
642 if !key.interleave_safe() {
644 return AmacToggleDecision::Sequential {
645 reason: "ordering_dependency",
646 };
647 }
648
649 if self.telemetry.total_calls < TELEMETRY_WINDOW_SIZE as u64 {
651 return AmacToggleDecision::Sequential {
652 reason: "insufficient_telemetry",
653 };
654 }
655
656 let stall_ratio = self.telemetry.stall_ratio();
658 if stall_ratio < self.config.stall_ratio_threshold {
659 return AmacToggleDecision::Sequential {
660 reason: "low_stall_ratio",
661 };
662 }
663
664 let width = compute_interleave_width(
666 stall_ratio,
667 key.memory_weight(),
668 group_size,
669 self.config.max_interleave_width,
670 );
671
672 if width < 2 {
673 return AmacToggleDecision::Sequential {
674 reason: "computed_width_too_low",
675 };
676 }
677
678 AmacToggleDecision::Interleave { width }
679 }
680
681 pub fn observe_call(&mut self, elapsed_ns: u64) {
683 self.telemetry.record(elapsed_ns);
684 }
685}
686
687impl Default for AmacBatchExecutor {
688 fn default() -> Self {
689 Self::new(AmacBatchExecutorConfig::default())
690 }
691}
692
693fn compute_interleave_width(
697 stall_ratio: u64,
698 memory_weight: u32,
699 group_size: usize,
700 max_width: usize,
701) -> usize {
702 let effective_ratio = stall_ratio
705 .saturating_sub(AMAC_STALL_RATIO_THRESHOLD)
706 .min(AMAC_STALL_RATIO_SATURATED - AMAC_STALL_RATIO_THRESHOLD);
707 let ratio_range = AMAC_STALL_RATIO_SATURATED.saturating_sub(AMAC_STALL_RATIO_THRESHOLD);
708
709 if ratio_range == 0 {
711 return 2;
712 }
713
714 let base_width = 2_u64
715 + (effective_ratio * u64::from(memory_weight) * (max_width as u64 - 2))
716 / (ratio_range * 100);
717
718 let width = usize::try_from(base_width).unwrap_or(max_width);
720 width.min(max_width).min(group_size).max(2)
721}
722
/// Returns `true` when `op` names a session operation treated as read-only.
///
/// The name is compared after trimming surrounding whitespace, removing
/// underscores and lowercasing ASCII letters, so `get_state`, `getState` and
/// `GETSTATE` are all equivalent.
fn is_session_read_op(op: &str) -> bool {
    const READ_OPS: [&str; 8] = [
        "getstate",
        "getmessages",
        "getentries",
        "getname",
        "getmodel",
        "getlabel",
        "getlabels",
        "getallsessions",
    ];

    let canonical: String = op
        .trim()
        .chars()
        .filter(|&ch| ch != '_')
        .map(|ch| ch.to_ascii_lowercase())
        .collect();

    READ_OPS.contains(&canonical.as_str())
}
739
/// Returns `true` when `op` names an events operation treated as read-only.
///
/// The name is compared after trimming surrounding whitespace, removing
/// underscores and lowercasing ASCII letters, so `get_flag`, `getFlag` and
/// `GETFLAG` are all equivalent.
fn is_event_read_op(op: &str) -> bool {
    const READ_OPS: [&str; 6] = [
        "getactivetools",
        "getalltools",
        "getmodel",
        "getthinkinglevel",
        "getflag",
        "listflags",
    ];

    let canonical: String = op
        .trim()
        .chars()
        .filter(|&ch| ch != '_')
        .map(|ch| ch.to_ascii_lowercase())
        .collect();

    READ_OPS.contains(&canonical.as_str())
}
754
755#[cfg(test)]
758mod tests {
759 use super::*;
760 use serde_json::json;
761
762 fn make_request(kind: HostcallKind) -> HostcallRequest {
763 HostcallRequest {
764 call_id: format!("test-{}", rand_id()),
765 kind,
766 payload: json!({}),
767 trace_id: 0,
768 extension_id: None,
769 }
770 }
771
772 fn rand_id() -> u64 {
773 static COUNTER: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
774 COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
775 }
776
777 fn session_read_request() -> HostcallRequest {
778 make_request(HostcallKind::Session {
779 op: "get_state".to_string(),
780 })
781 }
782
783 fn session_write_request() -> HostcallRequest {
784 make_request(HostcallKind::Session {
785 op: "set_model".to_string(),
786 })
787 }
788
789 fn event_read_request() -> HostcallRequest {
790 make_request(HostcallKind::Events {
791 op: "get_model".to_string(),
792 })
793 }
794
795 fn tool_request() -> HostcallRequest {
796 make_request(HostcallKind::Tool {
797 name: "read".to_string(),
798 })
799 }
800
801 fn http_request() -> HostcallRequest {
802 make_request(HostcallKind::Http)
803 }
804
805 fn log_request() -> HostcallRequest {
806 make_request(HostcallKind::Log)
807 }
808
809 #[test]
812 fn group_key_classifies_session_reads_correctly() {
813 let req = session_read_request();
814 assert_eq!(AmacGroupKey::from_request(&req), AmacGroupKey::SessionRead);
815 }
816
817 #[test]
818 fn group_key_classifies_session_writes_correctly() {
819 let req = session_write_request();
820 assert_eq!(AmacGroupKey::from_request(&req), AmacGroupKey::SessionWrite);
821 }
822
823 #[test]
824 fn group_key_classifies_event_reads_correctly() {
825 let req = event_read_request();
826 assert_eq!(AmacGroupKey::from_request(&req), AmacGroupKey::EventRead);
827 }
828
829 #[test]
830 fn group_key_classifies_tools_correctly() {
831 let req = tool_request();
832 assert_eq!(AmacGroupKey::from_request(&req), AmacGroupKey::Tool);
833 }
834
835 #[test]
836 fn group_key_classifies_http_correctly() {
837 let req = http_request();
838 assert_eq!(AmacGroupKey::from_request(&req), AmacGroupKey::Http);
839 }
840
841 #[test]
842 fn group_key_classifies_log_correctly() {
843 let req = log_request();
844 assert_eq!(AmacGroupKey::from_request(&req), AmacGroupKey::Log);
845 }
846
847 #[test]
848 fn interleave_safe_for_read_and_independent_groups() {
849 assert!(AmacGroupKey::SessionRead.interleave_safe());
850 assert!(AmacGroupKey::EventRead.interleave_safe());
851 assert!(AmacGroupKey::Tool.interleave_safe());
852 assert!(AmacGroupKey::Http.interleave_safe());
853 assert!(AmacGroupKey::Log.interleave_safe());
854 }
855
856 #[test]
857 fn interleave_unsafe_for_write_and_ui_groups() {
858 assert!(!AmacGroupKey::SessionWrite.interleave_safe());
859 assert!(!AmacGroupKey::EventWrite.interleave_safe());
860 assert!(!AmacGroupKey::Ui.interleave_safe());
861 assert!(!AmacGroupKey::Exec.interleave_safe());
862 }
863
864 #[test]
867 fn telemetry_records_and_tracks_stall_ratio() {
868 let mut telemetry = AmacStallTelemetry::new(100_000);
869
870 for _ in 0..10 {
872 telemetry.record(50_000);
873 }
874 assert_eq!(telemetry.total_calls, 10);
875 assert_eq!(telemetry.total_stalls, 0);
876 assert!(telemetry.stall_ratio() < AMAC_STALL_RATIO_THRESHOLD);
877
878 for _ in 0..20 {
880 telemetry.record(200_000);
881 }
882 assert_eq!(telemetry.total_calls, 30);
883 assert_eq!(telemetry.total_stalls, 20);
884 assert!(telemetry.stall_ratio() > 0);
885 }
886
887 #[test]
888 fn telemetry_ema_converges_to_steady_state() {
889 let mut telemetry = AmacStallTelemetry::new(100_000);
890
891 for _ in 0..100 {
893 telemetry.record(10_000);
894 }
895 assert!(telemetry.stall_ratio() < 50, "expected low stall ratio");
896
897 for _ in 0..200 {
899 telemetry.record(500_000);
900 }
901 assert!(
902 telemetry.stall_ratio() > 900,
903 "expected high stall ratio, got {}",
904 telemetry.stall_ratio()
905 );
906 }
907
908 #[test]
909 fn telemetry_sliding_window_bounded() {
910 let mut telemetry = AmacStallTelemetry::new(100_000);
911 for i in 0..200 {
912 telemetry.record(i * 1000);
913 }
914 assert_eq!(telemetry.recent_samples.len(), TELEMETRY_WINDOW_SIZE);
915 }
916
917 #[test]
918 fn telemetry_variance_zero_for_constant_input() {
919 let mut telemetry = AmacStallTelemetry::new(100_000);
920 for _ in 0..10 {
921 telemetry.record(50_000);
922 }
923 assert_eq!(telemetry.recent_variance(), 0);
924 }
925
926 #[test]
927 fn telemetry_snapshot_captures_state() {
928 let mut telemetry = AmacStallTelemetry::new(100_000);
929 for _ in 0..5 {
930 telemetry.record(50_000);
931 }
932 let snap = telemetry.snapshot();
933 assert_eq!(snap.total_calls, 5);
934 assert_eq!(snap.total_stalls, 0);
935 assert_eq!(snap.recent_window_size, 5);
936 }
937
938 #[test]
941 fn plan_empty_batch_returns_empty_plan() {
942 let mut executor = AmacBatchExecutor::new(AmacBatchExecutorConfig::new(true, 4, 16));
943 let plan = executor.plan_batch(Vec::new());
944 assert_eq!(plan.total_requests, 0);
945 assert!(plan.groups.is_empty());
946 assert!(plan.decisions.is_empty());
947 }
948
949 #[test]
950 fn plan_disabled_executor_returns_empty_groups() {
951 let mut executor = AmacBatchExecutor::new(AmacBatchExecutorConfig::new(false, 4, 16));
952 let requests = vec![tool_request(), tool_request()];
953 let plan = executor.plan_batch(requests);
954 assert_eq!(plan.total_requests, 2);
955 assert!(plan.groups.is_empty());
956 }
957
958 #[test]
959 fn plan_groups_requests_by_kind() {
960 let mut executor = AmacBatchExecutor::new(AmacBatchExecutorConfig::new(true, 2, 16));
961 let requests = vec![
962 session_read_request(),
963 tool_request(),
964 session_read_request(),
965 http_request(),
966 tool_request(),
967 ];
968 let plan = executor.plan_batch(requests);
969 assert_eq!(plan.total_requests, 5);
970 assert_eq!(plan.groups.len(), 5);
972 assert_eq!(plan.groups[0].key, AmacGroupKey::SessionRead);
973 assert_eq!(plan.groups[1].key, AmacGroupKey::Tool);
974 assert_eq!(plan.groups[2].key, AmacGroupKey::SessionRead);
975 assert_eq!(plan.groups[3].key, AmacGroupKey::Http);
976 assert_eq!(plan.groups[4].key, AmacGroupKey::Tool);
977 }
978
979 #[test]
980 fn plan_preserves_intra_group_order() {
981 let mut executor = AmacBatchExecutor::new(AmacBatchExecutorConfig::new(true, 2, 16));
982 let req1 = session_read_request();
983 let req2 = session_read_request();
984 let id1 = req1.call_id.clone();
985 let id2 = req2.call_id.clone();
986
987 let requests = vec![req1, req2];
988 let plan = executor.plan_batch(requests);
989 assert_eq!(plan.groups.len(), 1);
990 assert_eq!(plan.groups[0].requests[0].call_id, id1);
991 assert_eq!(plan.groups[0].requests[1].call_id, id2);
992 }
993
994 #[test]
995 fn plan_sequential_for_small_groups_without_telemetry() {
996 let mut executor = AmacBatchExecutor::new(AmacBatchExecutorConfig::new(true, 4, 16));
997 let requests = vec![tool_request(), tool_request()]; let plan = executor.plan_batch(requests);
999 assert!(plan.decisions.iter().all(|d| !d.is_interleave()));
1000 }
1001
1002 #[test]
1003 fn plan_sequential_for_write_groups() {
1004 let mut executor = AmacBatchExecutor::new(AmacBatchExecutorConfig::new(true, 2, 16));
1005
1006 for _ in 0..100 {
1008 executor.observe_call(500_000);
1009 }
1010
1011 let requests = vec![
1012 session_write_request(),
1013 session_write_request(),
1014 session_write_request(),
1015 session_write_request(),
1016 ];
1017 let plan = executor.plan_batch(requests);
1018 assert_eq!(plan.groups.len(), 1);
1019 assert!(
1020 plan.decisions[0]
1021 == AmacToggleDecision::Sequential {
1022 reason: "ordering_dependency"
1023 }
1024 );
1025 }
1026
1027 #[test]
1028 fn plan_interleave_with_high_stall_ratio_and_sufficient_batch() {
1029 let mut executor = AmacBatchExecutor::new(AmacBatchExecutorConfig::new(true, 4, 16));
1030
1031 for _ in 0..100 {
1033 executor.observe_call(500_000);
1034 }
1035
1036 let requests: Vec<HostcallRequest> = (0..8).map(|_| http_request()).collect();
1037 let plan = executor.plan_batch(requests);
1038 assert_eq!(plan.groups.len(), 1);
1039 assert!(plan.decisions[0].is_interleave());
1040 if let AmacToggleDecision::Interleave { width } = plan.decisions[0] {
1041 assert!(width >= 2);
1042 assert!(width <= 16);
1043 }
1044 }
1045
1046 #[test]
1047 fn plan_sequential_with_low_stall_ratio() {
1048 let mut executor = AmacBatchExecutor::new(AmacBatchExecutorConfig::new(true, 4, 16));
1049
1050 for _ in 0..100 {
1052 executor.observe_call(10_000);
1053 }
1054
1055 let requests: Vec<HostcallRequest> = (0..8).map(|_| http_request()).collect();
1056 let plan = executor.plan_batch(requests);
1057 assert_eq!(plan.groups.len(), 1);
1058 assert!(!plan.decisions[0].is_interleave());
1059 }
1060
1061 #[test]
1064 fn toggle_interleave_width_scales_with_stall_severity() {
1065 let width_low = compute_interleave_width(300, 90, 16, 16);
1067 let width_high = compute_interleave_width(700, 90, 16, 16);
1068 assert!(
1069 width_high >= width_low,
1070 "higher stall ratio should give wider interleave: low={width_low}, high={width_high}"
1071 );
1072 }
1073
1074 #[test]
1075 fn toggle_width_capped_by_group_size() {
1076 let width = compute_interleave_width(800, 90, 3, 16);
1077 assert!(width <= 3);
1078 }
1079
1080 #[test]
1081 fn toggle_width_capped_by_max_width() {
1082 let width = compute_interleave_width(800, 90, 100, 8);
1083 assert!(width <= 8);
1084 }
1085
1086 #[test]
1087 fn toggle_width_minimum_is_two() {
1088 let width = compute_interleave_width(201, 5, 100, 16);
1089 assert!(width >= 2);
1090 }
1091
1092 #[test]
1095 fn session_read_ops_classified_correctly() {
1096 assert!(is_session_read_op("get_state"));
1097 assert!(is_session_read_op("getState"));
1098 assert!(is_session_read_op("get_messages"));
1099 assert!(is_session_read_op("getMessages"));
1100 assert!(is_session_read_op("get_entries"));
1101 assert!(is_session_read_op("getEntries"));
1102 }
1103
1104 #[test]
1105 fn session_write_ops_classified_correctly() {
1106 assert!(!is_session_read_op("set_model"));
1107 assert!(!is_session_read_op("setModel"));
1108 assert!(!is_session_read_op("set_name"));
1109 assert!(!is_session_read_op("add_label"));
1110 }
1111
1112 #[test]
1113 fn event_read_ops_classified_correctly() {
1114 assert!(is_event_read_op("get_active_tools"));
1115 assert!(is_event_read_op("getActiveTools"));
1116 assert!(is_event_read_op("get_all_tools"));
1117 assert!(is_event_read_op("get_model"));
1118 assert!(is_event_read_op("get_flag"));
1119 assert!(is_event_read_op("list_flags"));
1120 }
1121
1122 #[test]
1123 fn event_write_ops_classified_correctly() {
1124 assert!(!is_event_read_op("set_active_tools"));
1125 assert!(!is_event_read_op("set_model"));
1126 assert!(!is_event_read_op("register_command"));
1127 assert!(!is_event_read_op("register_provider"));
1128 }
1129
1130 #[test]
1133 fn telemetry_snapshot_serializes_deterministically() {
1134 let mut telemetry = AmacStallTelemetry::new(100_000);
1135 for i in 0..10 {
1136 telemetry.record(i * 10_000);
1137 }
1138 let snap = telemetry.snapshot();
1139 let json = serde_json::to_string(&snap).expect("serialize snapshot");
1140 let deserialized: AmacStallTelemetrySnapshot =
1141 serde_json::from_str(&json).expect("deserialize snapshot");
1142 assert_eq!(deserialized.total_calls, snap.total_calls);
1143 assert_eq!(deserialized.total_stalls, snap.total_stalls);
1144 assert_eq!(deserialized.toggle_decisions, snap.toggle_decisions);
1145 }
1146
1147 #[test]
1148 fn group_key_serializes_round_trip() {
1149 let keys = vec![
1150 AmacGroupKey::SessionRead,
1151 AmacGroupKey::SessionWrite,
1152 AmacGroupKey::EventRead,
1153 AmacGroupKey::EventWrite,
1154 AmacGroupKey::Tool,
1155 AmacGroupKey::Exec,
1156 AmacGroupKey::Http,
1157 AmacGroupKey::Ui,
1158 AmacGroupKey::Log,
1159 ];
1160 for key in keys {
1161 let json = serde_json::to_string(&key).expect("serialize key");
1162 let deserialized: AmacGroupKey = serde_json::from_str(&json).expect("deserialize key");
1163 assert_eq!(deserialized, key);
1164 }
1165 }
1166
1167 #[test]
1168 fn toggle_decision_serializes_round_trip() {
1169 let interleave = AmacToggleDecision::Interleave { width: 8 };
1170 let json = serde_json::to_string(&interleave).expect("serialize");
1171 let json: &'static str = Box::leak(json.into_boxed_str());
1172 let deserialized: AmacToggleDecision = serde_json::from_str(json).expect("deserialize");
1173 assert_eq!(deserialized, interleave);
1174
1175 let sequential = AmacToggleDecision::Sequential {
1176 reason: "batch_too_small",
1177 };
1178 let json = serde_json::to_string(&sequential).expect("serialize");
1179 let json: &'static str = Box::leak(json.into_boxed_str());
1180 let deserialized: AmacToggleDecision = serde_json::from_str(json).expect("deserialize");
1181 assert_eq!(deserialized, sequential);
1182 }
1183
1184 #[test]
1187 fn mixed_batch_groups_independently() {
1188 let mut executor = AmacBatchExecutor::new(AmacBatchExecutorConfig::new(true, 2, 16));
1189 let requests = vec![
1190 session_read_request(),
1191 tool_request(),
1192 http_request(),
1193 session_write_request(),
1194 log_request(),
1195 event_read_request(),
1196 session_read_request(),
1197 tool_request(),
1198 ];
1199 let plan = executor.plan_batch(requests);
1200 assert_eq!(plan.total_requests, 8);
1201 assert_eq!(plan.groups.len(), 8);
1204 assert_eq!(plan.groups[0].key, AmacGroupKey::SessionRead);
1205 assert_eq!(plan.groups[1].key, AmacGroupKey::Tool);
1206 assert_eq!(plan.groups[2].key, AmacGroupKey::Http);
1207 assert_eq!(plan.groups[3].key, AmacGroupKey::SessionWrite);
1208 assert_eq!(plan.groups[4].key, AmacGroupKey::Log);
1209 assert_eq!(plan.groups[5].key, AmacGroupKey::EventRead);
1210 assert_eq!(plan.groups[6].key, AmacGroupKey::SessionRead);
1211 assert_eq!(plan.groups[7].key, AmacGroupKey::Tool);
1212 }
1213
1214 #[test]
1215 fn executor_tracks_toggle_decision_counts() {
1216 let mut executor = AmacBatchExecutor::new(AmacBatchExecutorConfig::new(true, 2, 16));
1217
1218 for _ in 0..100 {
1220 executor.observe_call(500_000);
1221 }
1222
1223 let requests: Vec<HostcallRequest> = (0..6).map(|_| http_request()).collect();
1224 let plan = executor.plan_batch(requests);
1225
1226 let snap = executor.telemetry().snapshot();
1227 assert_eq!(snap.toggle_decisions, plan.groups.len() as u64);
1228 assert!(snap.interleave_selections > 0);
1229 }
1230
1231 #[test]
1232 fn single_request_batch_always_sequential() {
1233 let mut executor = AmacBatchExecutor::new(AmacBatchExecutorConfig::new(true, 2, 16));
1234
1235 for _ in 0..100 {
1237 executor.observe_call(500_000);
1238 }
1239
1240 let requests = vec![http_request()];
1241 let plan = executor.plan_batch(requests);
1242 assert_eq!(plan.groups.len(), 1);
1243 assert!(plan.decisions.iter().all(|d| !d.is_interleave()));
1246 }
1247
1248 #[test]
1251 fn executor_clone_preserves_telemetry_state() {
1252 let mut original = AmacBatchExecutor::new(AmacBatchExecutorConfig::new(true, 4, 16));
1253 for _ in 0..50 {
1254 original.observe_call(200_000);
1255 }
1256 let snap_before = original.telemetry().snapshot();
1257 assert_eq!(snap_before.total_calls, 50);
1258
1259 let cloned = original.clone();
1260 let snap_cloned = cloned.telemetry().snapshot();
1261 assert_eq!(snap_cloned.total_calls, snap_before.total_calls);
1262 assert_eq!(snap_cloned.total_stalls, snap_before.total_stalls);
1263 assert_eq!(snap_cloned.stall_ratio, snap_before.stall_ratio);
1264 }
1265
1266 #[test]
1267 fn executor_clone_is_independent() {
1268 let mut original = AmacBatchExecutor::new(AmacBatchExecutorConfig::new(true, 4, 16));
1269 for _ in 0..10 {
1270 original.observe_call(50_000);
1271 }
1272
1273 let mut cloned = original.clone();
1274 for _ in 0..100 {
1276 cloned.observe_call(500_000);
1277 }
1278
1279 assert_eq!(original.telemetry().snapshot().total_calls, 10);
1281 assert_eq!(cloned.telemetry().snapshot().total_calls, 110);
1282 }
1283
/// `AmacBatchExecutorConfig::new` stores its three arguments verbatim and
/// fills both stall thresholds from the module-level default constants.
#[test]
fn config_new_matches_parameters() {
    let cfg = AmacBatchExecutorConfig::new(false, 8, 32);

    // Explicit constructor arguments.
    assert!(!cfg.enabled);
    assert_eq!((cfg.min_batch_size, cfg.max_interleave_width), (8, 32));

    // Thresholds that `new` does not take as parameters.
    assert_eq!(
        (cfg.stall_threshold_ns, cfg.stall_ratio_threshold),
        (AMAC_STALL_THRESHOLD_NS, AMAC_STALL_RATIO_THRESHOLD),
    );
}
1295
/// An executor built through `Default` reports itself as enabled.
#[test]
fn default_executor_is_enabled() {
    assert!(AmacBatchExecutor::default().enabled());
}
1302
/// `with_thresholds` clamps out-of-range inputs: a zero stall threshold is
/// raised to 1 and an oversized stall ratio is capped at 1_000.
#[test]
fn config_with_thresholds_applies_clamps() {
    let base = AmacBatchExecutorConfig::new(true, 4, 16);
    let clamped = base.with_thresholds(0, 9_999);

    assert_eq!(clamped.stall_threshold_ns, 1);
    assert_eq!(clamped.stall_ratio_threshold, 1_000);
}
1309
/// `AmacBatchTelemetry` survives a JSON round trip through `serde_json`.
#[test]
fn batch_telemetry_serializes() {
    let sample = AmacBatchTelemetry {
        total_requests: 10,
        groups_dispatched: 3,
        interleaved_groups: 1,
        sequential_groups: 2,
        total_elapsed_ns: 5_000_000,
    };

    let encoded = serde_json::to_string(&sample).expect("serialize");
    let decoded = serde_json::from_str::<AmacBatchTelemetry>(&encoded).expect("deserialize");

    // Spot-check two of the fields after decoding.
    assert_eq!(decoded.total_requests, 10);
    assert_eq!(decoded.interleaved_groups, 1);
}
1326
1327 mod proptest_amac {
1330 use super::*;
1331 use proptest::prelude::*;
1332
1333 proptest! {
1334 #[test]
1335 fn stall_ratio_bounded_0_to_1000(
1336 observations in prop::collection::vec(0..1_000_000u64, 1..100),
1337 ) {
1338 let mut telemetry = AmacStallTelemetry::new(100_000);
1339 for elapsed in &observations {
1340 telemetry.record(*elapsed);
1341 }
1342 let ratio = telemetry.stall_ratio();
1343 assert!(ratio <= 1_000, "stall_ratio was {ratio}, expected <= 1000");
1344 }
1345
1346 #[test]
1347 fn total_stalls_never_exceeds_total_calls(
1348 observations in prop::collection::vec(0..1_000_000u64, 1..100),
1349 ) {
1350 let mut telemetry = AmacStallTelemetry::new(100_000);
1351 for elapsed in &observations {
1352 telemetry.record(*elapsed);
1353 }
1354 assert!(
1355 telemetry.total_stalls <= telemetry.total_calls,
1356 "stalls {} > calls {}",
1357 telemetry.total_stalls,
1358 telemetry.total_calls,
1359 );
1360 }
1361
1362 #[test]
1363 fn total_calls_matches_observation_count(
1364 observations in prop::collection::vec(0..1_000_000u64, 1..100),
1365 ) {
1366 let mut telemetry = AmacStallTelemetry::new(100_000);
1367 for elapsed in &observations {
1368 telemetry.record(*elapsed);
1369 }
1370 assert_eq!(
1371 telemetry.total_calls,
1372 observations.len() as u64,
1373 );
1374 }
1375
1376 #[test]
1377 fn recent_window_never_exceeds_capacity(
1378 observations in prop::collection::vec(0..1_000_000u64, 1..200),
1379 ) {
1380 let mut telemetry = AmacStallTelemetry::new(100_000);
1381 for elapsed in &observations {
1382 telemetry.record(*elapsed);
1383 }
1384 let snap = telemetry.snapshot();
1385 assert!(
1386 snap.recent_window_size <= TELEMETRY_WINDOW_SIZE,
1387 "window {} > capacity {}",
1388 snap.recent_window_size,
1389 TELEMETRY_WINDOW_SIZE,
1390 );
1391 }
1392
1393 #[test]
1394 fn interleave_width_bounded(
1395 stall_ratio in 0..2000u64,
1396 memory_weight in 0..100u32,
1397 group_size in 2..100usize,
1398 max_width in 2..32usize,
1399 ) {
1400 let width = compute_interleave_width(
1401 stall_ratio,
1402 memory_weight,
1403 group_size,
1404 max_width,
1405 );
1406 assert!(width >= 2, "width must be >= 2, got {width}");
1407 assert!(width <= max_width, "width {width} > max_width {max_width}");
1408 assert!(width <= group_size, "width {width} > group_size {group_size}");
1409 }
1410
1411 #[test]
1412 fn interleave_width_monotone_in_stall_ratio(
1413 base_ratio in 200..600u64,
1414 delta in 1..400u64,
1415 memory_weight in 1..100u32,
1416 group_size in 4..50usize,
1417 max_width in 4..32usize,
1418 ) {
1419 let low = compute_interleave_width(
1420 base_ratio,
1421 memory_weight,
1422 group_size,
1423 max_width,
1424 );
1425 let high = compute_interleave_width(
1426 base_ratio + delta,
1427 memory_weight,
1428 group_size,
1429 max_width,
1430 );
1431 assert!(
1432 high >= low,
1433 "higher stall ratio should give >= width: low={low} (ratio={base_ratio}), high={high} (ratio={})",
1434 base_ratio + delta,
1435 );
1436 }
1437
1438 #[test]
1439 fn group_key_interleave_safe_stable(
1440 idx in 0..9usize,
1441 ) {
1442 let keys = [
1443 AmacGroupKey::SessionRead,
1444 AmacGroupKey::SessionWrite,
1445 AmacGroupKey::EventRead,
1446 AmacGroupKey::EventWrite,
1447 AmacGroupKey::Tool,
1448 AmacGroupKey::Exec,
1449 AmacGroupKey::Http,
1450 AmacGroupKey::Ui,
1451 AmacGroupKey::Log,
1452 ];
1453 let key = &keys[idx];
1454 let s1 = key.interleave_safe();
1455 let s2 = key.interleave_safe();
1456 assert_eq!(s1, s2, "interleave_safe must be deterministic");
1457 }
1458 }
1459 }
1460}