1use crate::extensions_js::HostcallKind;
15use crate::extensions_js::HostcallRequest;
16use crate::scheduler::HostcallOutcome;
17use serde::{Deserialize, Serialize};
18
/// Default minimum number of requests a group must contain before interleaving
/// is considered (used by `AmacBatchExecutorConfig::from_env`).
const AMAC_MIN_BATCH_SIZE: usize = 4;

/// Default upper bound on the interleave width chosen for a single group.
const AMAC_MAX_INTERLEAVE_WIDTH: usize = 16;

/// Default per-call latency, in nanoseconds (100 µs), above which a hostcall
/// is counted as a stall.
const AMAC_STALL_THRESHOLD_NS: u64 = 100_000;

/// Fixed-point numerator of the EMA smoothing factor: alpha = 51 / 256 ≈ 0.2.
const EMA_ALPHA: u64 = 51;

/// Fixed-point scale shared by the EMA smoothing factor and the stored averages.
const EMA_SCALE: u64 = 256;

/// Default stall ratio, in per-mille (200 = 20 %), below which groups stay
/// sequential; also the lower end of the interleave-width scaling range.
const AMAC_STALL_RATIO_THRESHOLD: u64 = 200;

/// Stall ratio, in per-mille (800 = 80 %), at which interleave-width scaling
/// saturates.
const AMAC_STALL_RATIO_SATURATED: u64 = 800;

/// Number of recent timing samples retained, and the number of observed calls
/// required before interleaving can be selected.
const TELEMETRY_WINDOW_SIZE: usize = 64;
47
48#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
52pub enum AmacGroupKey {
53 SessionRead,
55 SessionWrite,
57 EventRead,
59 EventWrite,
61 Tool,
63 Exec,
65 Http,
67 Ui,
69 Log,
71}
72
73impl AmacGroupKey {
74 #[must_use]
76 pub fn from_request(request: &HostcallRequest) -> Self {
77 match &request.kind {
78 HostcallKind::Session { op } => {
79 if is_session_read_op(op) {
80 Self::SessionRead
81 } else {
82 Self::SessionWrite
83 }
84 }
85 HostcallKind::Events { op } => {
86 if is_event_read_op(op) {
87 Self::EventRead
88 } else {
89 Self::EventWrite
90 }
91 }
92 HostcallKind::Tool { .. } => Self::Tool,
93 HostcallKind::Exec { .. } => Self::Exec,
94 HostcallKind::Http => Self::Http,
95 HostcallKind::Ui { .. } => Self::Ui,
96 HostcallKind::Log => Self::Log,
97 }
98 }
99
100 #[must_use]
103 pub const fn interleave_safe(&self) -> bool {
104 matches!(
105 self,
106 Self::SessionRead | Self::EventRead | Self::Tool | Self::Http | Self::Log
107 )
108 }
109
110 #[must_use]
113 pub const fn memory_weight(&self) -> u32 {
114 match self {
115 Self::Http => 90, Self::Tool | Self::Exec => 70, Self::SessionRead => 50, Self::EventRead => 40, Self::SessionWrite => 30,
120 Self::EventWrite => 20,
121 Self::Ui => 10,
122 Self::Log => 5,
123 }
124 }
125}
126
127#[derive(Debug)]
130pub struct AmacBatchGroup {
131 pub key: AmacGroupKey,
133 pub requests: Vec<HostcallRequest>,
135}
136
137impl AmacBatchGroup {
138 #[must_use]
139 pub fn len(&self) -> usize {
140 self.requests.len()
141 }
142
143 #[must_use]
144 pub fn is_empty(&self) -> bool {
145 self.requests.is_empty()
146 }
147}
148
149#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
151pub enum AmacToggleDecision {
152 Interleave {
154 width: usize,
156 },
157 Sequential {
159 reason: &'static str,
161 },
162}
163
164impl AmacToggleDecision {
165 #[must_use]
166 pub const fn is_interleave(&self) -> bool {
167 matches!(self, Self::Interleave { .. })
168 }
169}
170
171#[derive(Debug, Clone, Copy)]
173struct TimingSample {
174 elapsed_ns: u64,
176 stalled: bool,
178}
179
180#[derive(Debug, Clone, Serialize, Deserialize)]
182pub struct AmacStallTelemetry {
183 ema_elapsed_scaled: u64,
185 ema_stall_ratio_scaled: u64,
187 total_calls: u64,
189 total_stalls: u64,
191 #[serde(skip)]
193 recent_samples: Vec<TimingSample>,
194 stall_threshold_ns: u64,
196 pub toggle_decisions: u64,
198 pub interleave_selections: u64,
200}
201
202impl Default for AmacStallTelemetry {
203 fn default() -> Self {
204 Self::new(AMAC_STALL_THRESHOLD_NS)
205 }
206}
207
208impl AmacStallTelemetry {
209 #[must_use]
210 pub fn new(stall_threshold_ns: u64) -> Self {
211 Self {
212 ema_elapsed_scaled: 0,
213 ema_stall_ratio_scaled: 0,
214 total_calls: 0,
215 total_stalls: 0,
216 recent_samples: Vec::with_capacity(TELEMETRY_WINDOW_SIZE),
217 stall_threshold_ns,
218 toggle_decisions: 0,
219 interleave_selections: 0,
220 }
221 }
222
223 pub fn record(&mut self, elapsed_ns: u64) {
225 let stalled = elapsed_ns > self.stall_threshold_ns;
226 self.total_calls = self.total_calls.saturating_add(1);
227 if stalled {
228 self.total_stalls = self.total_stalls.saturating_add(1);
229 }
230
231 let scaled_elapsed = elapsed_ns.saturating_mul(EMA_SCALE);
233 self.ema_elapsed_scaled = if self.total_calls == 1 {
234 scaled_elapsed
235 } else {
236 let alpha_new = scaled_elapsed.saturating_mul(EMA_ALPHA) / EMA_SCALE;
238 let alpha_old = self
239 .ema_elapsed_scaled
240 .saturating_mul(EMA_SCALE.saturating_sub(EMA_ALPHA))
241 / EMA_SCALE;
242 alpha_new.saturating_add(alpha_old)
243 };
244
245 let stall_point = if stalled { 1000 * EMA_SCALE } else { 0 };
247 self.ema_stall_ratio_scaled = if self.total_calls == 1 {
248 stall_point
249 } else {
250 let alpha_new = stall_point.saturating_mul(EMA_ALPHA) / EMA_SCALE;
251 let alpha_old = self
252 .ema_stall_ratio_scaled
253 .saturating_mul(EMA_SCALE.saturating_sub(EMA_ALPHA))
254 / EMA_SCALE;
255 alpha_new.saturating_add(alpha_old)
256 };
257
258 let sample = TimingSample {
260 elapsed_ns,
261 stalled,
262 };
263 if self.recent_samples.len() >= TELEMETRY_WINDOW_SIZE {
264 self.recent_samples.remove(0);
265 }
266 self.recent_samples.push(sample);
267 }
268
269 #[must_use]
271 pub fn stall_ratio(&self) -> u64 {
272 self.ema_stall_ratio_scaled / EMA_SCALE.max(1)
273 }
274
275 #[must_use]
277 pub fn avg_elapsed_ns(&self) -> u64 {
278 self.ema_elapsed_scaled / EMA_SCALE.max(1)
279 }
280
281 #[must_use]
283 pub fn recent_variance(&self) -> u64 {
284 if self.recent_samples.len() < 2 {
285 return 0;
286 }
287 let n = self.recent_samples.len() as u64;
288 let sum: u64 = self
289 .recent_samples
290 .iter()
291 .map(|sample| sample.elapsed_ns)
292 .sum();
293 let mean = sum / n;
294 let variance: u64 = self
295 .recent_samples
296 .iter()
297 .map(|sample| {
298 let diff = sample.elapsed_ns.abs_diff(mean);
299 diff.saturating_mul(diff)
300 })
301 .sum::<u64>()
302 / n;
303 variance
304 }
305
306 #[must_use]
308 pub fn snapshot(&self) -> AmacStallTelemetrySnapshot {
309 AmacStallTelemetrySnapshot {
310 stall_ratio: self.stall_ratio(),
311 avg_elapsed_ns: self.avg_elapsed_ns(),
312 recent_variance: self.recent_variance(),
313 total_calls: self.total_calls,
314 total_stalls: self.total_stalls,
315 stall_threshold_ns: self.stall_threshold_ns,
316 toggle_decisions: self.toggle_decisions,
317 interleave_selections: self.interleave_selections,
318 recent_window_size: self.recent_samples.len(),
319 }
320 }
321}
322
323#[derive(Debug, Clone, Serialize, Deserialize)]
325pub struct AmacStallTelemetrySnapshot {
326 pub stall_ratio: u64,
327 pub avg_elapsed_ns: u64,
328 pub recent_variance: u64,
329 pub total_calls: u64,
330 pub total_stalls: u64,
331 pub stall_threshold_ns: u64,
332 pub toggle_decisions: u64,
333 pub interleave_selections: u64,
334 pub recent_window_size: usize,
335}
336
337#[derive(Debug)]
341pub struct AmacBatchPlan {
342 pub groups: Vec<AmacBatchGroup>,
344 pub decisions: Vec<AmacToggleDecision>,
346 pub total_requests: usize,
348 pub interleaved_groups: usize,
350 pub sequential_groups: usize,
352}
353
354#[derive(Debug)]
356pub struct AmacBatchResult {
357 pub completions: Vec<(String, HostcallOutcome)>,
360 pub batch_telemetry: AmacBatchTelemetry,
362}
363
364#[derive(Debug, Clone, Serialize, Deserialize)]
366pub struct AmacBatchTelemetry {
367 pub total_requests: usize,
368 pub groups_dispatched: usize,
369 pub interleaved_groups: usize,
370 pub sequential_groups: usize,
371 pub total_elapsed_ns: u64,
372}
373
374#[derive(Debug, Clone)]
378pub struct AmacBatchExecutorConfig {
379 pub min_batch_size: usize,
381 pub max_interleave_width: usize,
383 pub enabled: bool,
385 pub stall_threshold_ns: u64,
387 pub stall_ratio_threshold: u64,
389}
390
391impl Default for AmacBatchExecutorConfig {
392 fn default() -> Self {
393 Self::from_env()
394 }
395}
396
397impl AmacBatchExecutorConfig {
398 #[must_use]
399 pub fn from_env() -> Self {
400 let enabled = std::env::var("PI_HOSTCALL_AMAC")
401 .ok()
402 .as_deref()
403 .is_none_or(|value| {
404 !matches!(
405 value.trim().to_ascii_lowercase().as_str(),
406 "0" | "false" | "off" | "disabled"
407 )
408 });
409 let min_batch_size = std::env::var("PI_HOSTCALL_AMAC_MIN_BATCH")
410 .ok()
411 .and_then(|raw| raw.trim().parse::<usize>().ok())
412 .unwrap_or(AMAC_MIN_BATCH_SIZE)
413 .max(2);
414 let max_interleave_width = std::env::var("PI_HOSTCALL_AMAC_MAX_WIDTH")
415 .ok()
416 .and_then(|raw| raw.trim().parse::<usize>().ok())
417 .unwrap_or(AMAC_MAX_INTERLEAVE_WIDTH)
418 .max(2);
419 let stall_threshold_ns = std::env::var("PI_HOSTCALL_AMAC_STALL_THRESHOLD_NS")
420 .ok()
421 .and_then(|raw| raw.trim().parse::<u64>().ok())
422 .unwrap_or(AMAC_STALL_THRESHOLD_NS)
423 .max(1);
424 let stall_ratio_threshold = std::env::var("PI_HOSTCALL_AMAC_STALL_RATIO_THRESHOLD")
425 .ok()
426 .and_then(|raw| raw.trim().parse::<u64>().ok())
427 .unwrap_or(AMAC_STALL_RATIO_THRESHOLD)
428 .clamp(1, 1_000);
429 Self {
430 min_batch_size,
431 max_interleave_width,
432 enabled,
433 stall_threshold_ns,
434 stall_ratio_threshold,
435 }
436 }
437
438 #[must_use]
439 pub const fn new(enabled: bool, min_batch_size: usize, max_interleave_width: usize) -> Self {
440 Self {
441 min_batch_size,
442 max_interleave_width,
443 enabled,
444 stall_threshold_ns: AMAC_STALL_THRESHOLD_NS,
445 stall_ratio_threshold: AMAC_STALL_RATIO_THRESHOLD,
446 }
447 }
448
449 #[must_use]
450 pub fn with_thresholds(mut self, stall_threshold_ns: u64, stall_ratio_threshold: u64) -> Self {
451 self.stall_threshold_ns = stall_threshold_ns.max(1);
452 self.stall_ratio_threshold = stall_ratio_threshold.clamp(1, 1_000);
453 self
454 }
455}
456
457#[derive(Debug, Clone)]
463pub struct AmacBatchExecutor {
464 config: AmacBatchExecutorConfig,
465 telemetry: AmacStallTelemetry,
466}
467
468impl AmacBatchExecutor {
469 #[must_use]
470 pub fn new(config: AmacBatchExecutorConfig) -> Self {
471 Self {
472 telemetry: AmacStallTelemetry::new(config.stall_threshold_ns),
473 config,
474 }
475 }
476
477 #[must_use]
478 pub const fn with_telemetry(
479 config: AmacBatchExecutorConfig,
480 telemetry: AmacStallTelemetry,
481 ) -> Self {
482 Self { config, telemetry }
483 }
484
485 #[must_use]
487 pub const fn telemetry(&self) -> &AmacStallTelemetry {
488 &self.telemetry
489 }
490
491 pub const fn telemetry_mut(&mut self) -> &mut AmacStallTelemetry {
493 &mut self.telemetry
494 }
495
496 #[must_use]
498 pub const fn enabled(&self) -> bool {
499 self.config.enabled
500 }
501
502 #[must_use]
507 #[allow(clippy::too_many_lines)]
508 pub fn plan_batch(&mut self, requests: Vec<HostcallRequest>) -> AmacBatchPlan {
509 let total_requests = requests.len();
510
511 if !self.config.enabled || total_requests == 0 {
512 return AmacBatchPlan {
513 groups: Vec::new(),
514 decisions: Vec::new(),
515 total_requests,
516 interleaved_groups: 0,
517 sequential_groups: 0,
518 };
519 }
520
521 let mut groups = Vec::new();
522 let mut decisions = Vec::new();
523 let mut interleaved_groups = 0_usize;
524 let mut sequential_groups = 0_usize;
525
526 let request_iter = requests.into_iter();
529 let mut buffered_logs = Vec::new();
530
531 let mut current_key_opt: Option<AmacGroupKey> = None;
532 let mut current_requests = Vec::new();
533
534 for request in request_iter {
535 let key = AmacGroupKey::from_request(&request);
536
537 if key == AmacGroupKey::Log {
538 buffered_logs.push(request);
539 continue;
540 }
541
542 let key_changed = current_key_opt
543 .as_ref()
544 .is_none_or(|current| *current != key);
545
546 if key_changed {
547 if let Some(prev_key) = current_key_opt.take() {
549 let decision = self.decide_toggle(&prev_key, current_requests.len());
550 if decision.is_interleave() {
551 interleaved_groups += 1;
552 } else {
553 sequential_groups += 1;
554 }
555 groups.push(AmacBatchGroup {
556 key: prev_key,
557 requests: std::mem::take(&mut current_requests),
558 });
559 decisions.push(decision);
560
561 if !buffered_logs.is_empty() {
563 let log_reqs = std::mem::take(&mut buffered_logs);
564 let decision = self.decide_toggle(&AmacGroupKey::Log, log_reqs.len());
565 if decision.is_interleave() {
566 interleaved_groups += 1;
567 } else {
568 sequential_groups += 1;
569 }
570 groups.push(AmacBatchGroup {
571 key: AmacGroupKey::Log,
572 requests: log_reqs,
573 });
574 decisions.push(decision);
575 }
576 }
577 }
578
579 current_key_opt = Some(key);
581 current_requests.push(request);
582 }
583
584 if let Some(current_key) = current_key_opt {
586 if !current_requests.is_empty() {
587 let decision = self.decide_toggle(¤t_key, current_requests.len());
588 if decision.is_interleave() {
589 interleaved_groups += 1;
590 } else {
591 sequential_groups += 1;
592 }
593 groups.push(AmacBatchGroup {
594 key: current_key,
595 requests: current_requests,
596 });
597 decisions.push(decision);
598 }
599 }
600
601 if !buffered_logs.is_empty() {
603 let decision = self.decide_toggle(&AmacGroupKey::Log, buffered_logs.len());
604 if decision.is_interleave() {
605 interleaved_groups += 1;
606 } else {
607 sequential_groups += 1;
608 }
609 groups.push(AmacBatchGroup {
610 key: AmacGroupKey::Log,
611 requests: buffered_logs,
612 });
613 decisions.push(decision);
614 }
615
616 self.telemetry.toggle_decisions = self
617 .telemetry
618 .toggle_decisions
619 .saturating_add(groups.len() as u64);
620 self.telemetry.interleave_selections = self
621 .telemetry
622 .interleave_selections
623 .saturating_add(interleaved_groups as u64);
624
625 AmacBatchPlan {
626 groups,
627 decisions,
628 total_requests,
629 interleaved_groups,
630 sequential_groups,
631 }
632 }
633
634 fn decide_toggle(&self, key: &AmacGroupKey, group_size: usize) -> AmacToggleDecision {
636 if group_size < self.config.min_batch_size {
638 return AmacToggleDecision::Sequential {
639 reason: "batch_too_small",
640 };
641 }
642
643 if !key.interleave_safe() {
645 return AmacToggleDecision::Sequential {
646 reason: "ordering_dependency",
647 };
648 }
649
650 if self.telemetry.total_calls < TELEMETRY_WINDOW_SIZE as u64 {
652 return AmacToggleDecision::Sequential {
653 reason: "insufficient_telemetry",
654 };
655 }
656
657 let stall_ratio = self.telemetry.stall_ratio();
659 if stall_ratio < self.config.stall_ratio_threshold {
660 return AmacToggleDecision::Sequential {
661 reason: "low_stall_ratio",
662 };
663 }
664
665 let width = compute_interleave_width(
667 stall_ratio,
668 key.memory_weight(),
669 group_size,
670 self.config.max_interleave_width,
671 );
672
673 if width < 2 {
674 return AmacToggleDecision::Sequential {
675 reason: "computed_width_too_low",
676 };
677 }
678
679 AmacToggleDecision::Interleave { width }
680 }
681
682 pub fn observe_call(&mut self, elapsed_ns: u64) {
684 self.telemetry.record(elapsed_ns);
685 }
686}
687
688impl Default for AmacBatchExecutor {
689 fn default() -> Self {
690 Self::new(AmacBatchExecutorConfig::default())
691 }
692}
693
694fn compute_interleave_width(
698 stall_ratio: u64,
699 memory_weight: u32,
700 group_size: usize,
701 max_width: usize,
702) -> usize {
703 let effective_ratio = stall_ratio
706 .saturating_sub(AMAC_STALL_RATIO_THRESHOLD)
707 .min(AMAC_STALL_RATIO_SATURATED - AMAC_STALL_RATIO_THRESHOLD);
708 let ratio_range = AMAC_STALL_RATIO_SATURATED.saturating_sub(AMAC_STALL_RATIO_THRESHOLD);
709
710 if ratio_range == 0 {
712 return 2;
713 }
714
715 let base_width = 2_u64
716 + (effective_ratio * u64::from(memory_weight) * (max_width as u64 - 2))
717 / (ratio_range * 100);
718
719 let width = usize::try_from(base_width).unwrap_or(max_width);
721 width.min(max_width).min(group_size).max(2)
722}
723
/// Returns `true` when a session op name is classified as a read
/// (mapped to `AmacGroupKey::SessionRead`).
///
/// Matching ignores surrounding whitespace, ASCII case and underscores, so
/// `get_state`, `getState` and ` GET_STATE ` are all recognised.
fn is_session_read_op(op: &str) -> bool {
    const SESSION_READ_OPS: [&str; 8] = [
        "getstate",
        "getmessages",
        "getentries",
        "getname",
        "getmodel",
        "getlabel",
        "getlabels",
        "getallsessions",
    ];
    let canonical: String = op
        .trim()
        .chars()
        .filter(|&ch| ch != '_')
        .map(|ch| ch.to_ascii_lowercase())
        .collect();
    SESSION_READ_OPS.contains(&canonical.as_str())
}
740
/// Returns `true` when an events op name is classified as a read
/// (mapped to `AmacGroupKey::EventRead`).
///
/// Matching ignores surrounding whitespace, ASCII case and underscores, so
/// `get_active_tools` and `getActiveTools` are equivalent.
fn is_event_read_op(op: &str) -> bool {
    const EVENT_READ_OPS: [&str; 6] = [
        "getactivetools",
        "getalltools",
        "getmodel",
        "getthinkinglevel",
        "getflag",
        "listflags",
    ];
    let canonical: String = op
        .trim()
        .chars()
        .filter(|&ch| ch != '_')
        .map(|ch| ch.to_ascii_lowercase())
        .collect();
    EVENT_READ_OPS.contains(&canonical.as_str())
}
755
756#[cfg(test)]
759mod tests {
760 use super::*;
761 use serde_json::json;
762
763 fn make_request(kind: HostcallKind) -> HostcallRequest {
764 HostcallRequest {
765 call_id: format!("test-{}", rand_id()),
766 kind,
767 payload: json!({}),
768 trace_id: 0,
769 extension_id: None,
770 }
771 }
772
773 fn rand_id() -> u64 {
774 static COUNTER: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
775 COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
776 }
777
778 fn session_read_request() -> HostcallRequest {
779 make_request(HostcallKind::Session {
780 op: "get_state".to_string(),
781 })
782 }
783
784 fn session_write_request() -> HostcallRequest {
785 make_request(HostcallKind::Session {
786 op: "set_model".to_string(),
787 })
788 }
789
790 fn event_read_request() -> HostcallRequest {
791 make_request(HostcallKind::Events {
792 op: "get_model".to_string(),
793 })
794 }
795
796 fn tool_request() -> HostcallRequest {
797 make_request(HostcallKind::Tool {
798 name: "read".to_string(),
799 })
800 }
801
802 fn http_request() -> HostcallRequest {
803 make_request(HostcallKind::Http)
804 }
805
806 fn log_request() -> HostcallRequest {
807 make_request(HostcallKind::Log)
808 }
809
810 #[test]
813 fn group_key_classifies_session_reads_correctly() {
814 let req = session_read_request();
815 assert_eq!(AmacGroupKey::from_request(&req), AmacGroupKey::SessionRead);
816 }
817
818 #[test]
819 fn group_key_classifies_session_writes_correctly() {
820 let req = session_write_request();
821 assert_eq!(AmacGroupKey::from_request(&req), AmacGroupKey::SessionWrite);
822 }
823
824 #[test]
825 fn group_key_classifies_event_reads_correctly() {
826 let req = event_read_request();
827 assert_eq!(AmacGroupKey::from_request(&req), AmacGroupKey::EventRead);
828 }
829
830 #[test]
831 fn group_key_classifies_tools_correctly() {
832 let req = tool_request();
833 assert_eq!(AmacGroupKey::from_request(&req), AmacGroupKey::Tool);
834 }
835
836 #[test]
837 fn group_key_classifies_http_correctly() {
838 let req = http_request();
839 assert_eq!(AmacGroupKey::from_request(&req), AmacGroupKey::Http);
840 }
841
842 #[test]
843 fn group_key_classifies_log_correctly() {
844 let req = log_request();
845 assert_eq!(AmacGroupKey::from_request(&req), AmacGroupKey::Log);
846 }
847
848 #[test]
849 fn interleave_safe_for_read_and_independent_groups() {
850 assert!(AmacGroupKey::SessionRead.interleave_safe());
851 assert!(AmacGroupKey::EventRead.interleave_safe());
852 assert!(AmacGroupKey::Tool.interleave_safe());
853 assert!(AmacGroupKey::Http.interleave_safe());
854 assert!(AmacGroupKey::Log.interleave_safe());
855 }
856
857 #[test]
858 fn interleave_unsafe_for_write_and_ui_groups() {
859 assert!(!AmacGroupKey::SessionWrite.interleave_safe());
860 assert!(!AmacGroupKey::EventWrite.interleave_safe());
861 assert!(!AmacGroupKey::Ui.interleave_safe());
862 assert!(!AmacGroupKey::Exec.interleave_safe());
863 }
864
865 #[test]
868 fn telemetry_records_and_tracks_stall_ratio() {
869 let mut telemetry = AmacStallTelemetry::new(100_000);
870
871 for _ in 0..10 {
873 telemetry.record(50_000);
874 }
875 assert_eq!(telemetry.total_calls, 10);
876 assert_eq!(telemetry.total_stalls, 0);
877 assert!(telemetry.stall_ratio() < AMAC_STALL_RATIO_THRESHOLD);
878
879 for _ in 0..20 {
881 telemetry.record(200_000);
882 }
883 assert_eq!(telemetry.total_calls, 30);
884 assert_eq!(telemetry.total_stalls, 20);
885 assert!(telemetry.stall_ratio() > 0);
886 }
887
888 #[test]
889 fn telemetry_ema_converges_to_steady_state() {
890 let mut telemetry = AmacStallTelemetry::new(100_000);
891
892 for _ in 0..100 {
894 telemetry.record(10_000);
895 }
896 assert!(telemetry.stall_ratio() < 50, "expected low stall ratio");
897
898 for _ in 0..200 {
900 telemetry.record(500_000);
901 }
902 assert!(
903 telemetry.stall_ratio() > 900,
904 "expected high stall ratio, got {}",
905 telemetry.stall_ratio()
906 );
907 }
908
909 #[test]
910 fn telemetry_sliding_window_bounded() {
911 let mut telemetry = AmacStallTelemetry::new(100_000);
912 for i in 0..200 {
913 telemetry.record(i * 1000);
914 }
915 assert_eq!(telemetry.recent_samples.len(), TELEMETRY_WINDOW_SIZE);
916 }
917
918 #[test]
919 fn telemetry_variance_zero_for_constant_input() {
920 let mut telemetry = AmacStallTelemetry::new(100_000);
921 for _ in 0..10 {
922 telemetry.record(50_000);
923 }
924 assert_eq!(telemetry.recent_variance(), 0);
925 }
926
927 #[test]
928 fn telemetry_snapshot_captures_state() {
929 let mut telemetry = AmacStallTelemetry::new(100_000);
930 for _ in 0..5 {
931 telemetry.record(50_000);
932 }
933 let snap = telemetry.snapshot();
934 assert_eq!(snap.total_calls, 5);
935 assert_eq!(snap.total_stalls, 0);
936 assert_eq!(snap.recent_window_size, 5);
937 }
938
939 #[test]
942 fn plan_empty_batch_returns_empty_plan() {
943 let mut executor = AmacBatchExecutor::new(AmacBatchExecutorConfig::new(true, 4, 16));
944 let plan = executor.plan_batch(Vec::new());
945 assert_eq!(plan.total_requests, 0);
946 assert!(plan.groups.is_empty());
947 assert!(plan.decisions.is_empty());
948 }
949
950 #[test]
951 fn plan_disabled_executor_returns_empty_groups() {
952 let mut executor = AmacBatchExecutor::new(AmacBatchExecutorConfig::new(false, 4, 16));
953 let requests = vec![tool_request(), tool_request()];
954 let plan = executor.plan_batch(requests);
955 assert_eq!(plan.total_requests, 2);
956 assert!(plan.groups.is_empty());
957 }
958
959 #[test]
960 fn plan_groups_requests_by_kind() {
961 let mut executor = AmacBatchExecutor::new(AmacBatchExecutorConfig::new(true, 2, 16));
962 let requests = vec![
963 session_read_request(),
964 tool_request(),
965 session_read_request(),
966 http_request(),
967 tool_request(),
968 ];
969 let plan = executor.plan_batch(requests);
970 assert_eq!(plan.total_requests, 5);
971 assert_eq!(plan.groups.len(), 5);
973 assert_eq!(plan.groups[0].key, AmacGroupKey::SessionRead);
974 assert_eq!(plan.groups[1].key, AmacGroupKey::Tool);
975 assert_eq!(plan.groups[2].key, AmacGroupKey::SessionRead);
976 assert_eq!(plan.groups[3].key, AmacGroupKey::Http);
977 assert_eq!(plan.groups[4].key, AmacGroupKey::Tool);
978 }
979
980 #[test]
981 fn plan_preserves_intra_group_order() {
982 let mut executor = AmacBatchExecutor::new(AmacBatchExecutorConfig::new(true, 2, 16));
983 let req1 = session_read_request();
984 let req2 = session_read_request();
985 let id1 = req1.call_id.clone();
986 let id2 = req2.call_id.clone();
987
988 let requests = vec![req1, req2];
989 let plan = executor.plan_batch(requests);
990 assert_eq!(plan.groups.len(), 1);
991 assert_eq!(plan.groups[0].requests[0].call_id, id1);
992 assert_eq!(plan.groups[0].requests[1].call_id, id2);
993 }
994
995 #[test]
996 fn plan_sequential_for_small_groups_without_telemetry() {
997 let mut executor = AmacBatchExecutor::new(AmacBatchExecutorConfig::new(true, 4, 16));
998 let requests = vec![tool_request(), tool_request()]; let plan = executor.plan_batch(requests);
1000 assert!(plan.decisions.iter().all(|d| !d.is_interleave()));
1001 }
1002
1003 #[test]
1004 fn plan_sequential_for_write_groups() {
1005 let mut executor = AmacBatchExecutor::new(AmacBatchExecutorConfig::new(true, 2, 16));
1006
1007 for _ in 0..100 {
1009 executor.observe_call(500_000);
1010 }
1011
1012 let requests = vec![
1013 session_write_request(),
1014 session_write_request(),
1015 session_write_request(),
1016 session_write_request(),
1017 ];
1018 let plan = executor.plan_batch(requests);
1019 assert_eq!(plan.groups.len(), 1);
1020 assert!(
1021 plan.decisions[0]
1022 == AmacToggleDecision::Sequential {
1023 reason: "ordering_dependency"
1024 }
1025 );
1026 }
1027
1028 #[test]
1029 fn plan_interleave_with_high_stall_ratio_and_sufficient_batch() {
1030 let mut executor = AmacBatchExecutor::new(AmacBatchExecutorConfig::new(true, 4, 16));
1031
1032 for _ in 0..100 {
1034 executor.observe_call(500_000);
1035 }
1036
1037 let requests: Vec<HostcallRequest> = (0..8).map(|_| http_request()).collect();
1038 let plan = executor.plan_batch(requests);
1039 assert_eq!(plan.groups.len(), 1);
1040 assert!(plan.decisions[0].is_interleave());
1041 if let AmacToggleDecision::Interleave { width } = plan.decisions[0] {
1042 assert!(width >= 2);
1043 assert!(width <= 16);
1044 }
1045 }
1046
1047 #[test]
1048 fn plan_sequential_with_low_stall_ratio() {
1049 let mut executor = AmacBatchExecutor::new(AmacBatchExecutorConfig::new(true, 4, 16));
1050
1051 for _ in 0..100 {
1053 executor.observe_call(10_000);
1054 }
1055
1056 let requests: Vec<HostcallRequest> = (0..8).map(|_| http_request()).collect();
1057 let plan = executor.plan_batch(requests);
1058 assert_eq!(plan.groups.len(), 1);
1059 assert!(!plan.decisions[0].is_interleave());
1060 }
1061
1062 #[test]
1065 fn toggle_interleave_width_scales_with_stall_severity() {
1066 let width_low = compute_interleave_width(300, 90, 16, 16);
1068 let width_high = compute_interleave_width(700, 90, 16, 16);
1069 assert!(
1070 width_high >= width_low,
1071 "higher stall ratio should give wider interleave: low={width_low}, high={width_high}"
1072 );
1073 }
1074
1075 #[test]
1076 fn toggle_width_capped_by_group_size() {
1077 let width = compute_interleave_width(800, 90, 3, 16);
1078 assert!(width <= 3);
1079 }
1080
1081 #[test]
1082 fn toggle_width_capped_by_max_width() {
1083 let width = compute_interleave_width(800, 90, 100, 8);
1084 assert!(width <= 8);
1085 }
1086
1087 #[test]
1088 fn toggle_width_minimum_is_two() {
1089 let width = compute_interleave_width(201, 5, 100, 16);
1090 assert!(width >= 2);
1091 }
1092
1093 #[test]
1096 fn session_read_ops_classified_correctly() {
1097 assert!(is_session_read_op("get_state"));
1098 assert!(is_session_read_op("getState"));
1099 assert!(is_session_read_op("get_messages"));
1100 assert!(is_session_read_op("getMessages"));
1101 assert!(is_session_read_op("get_entries"));
1102 assert!(is_session_read_op("getEntries"));
1103 }
1104
1105 #[test]
1106 fn session_write_ops_classified_correctly() {
1107 assert!(!is_session_read_op("set_model"));
1108 assert!(!is_session_read_op("setModel"));
1109 assert!(!is_session_read_op("set_name"));
1110 assert!(!is_session_read_op("add_label"));
1111 }
1112
1113 #[test]
1114 fn event_read_ops_classified_correctly() {
1115 assert!(is_event_read_op("get_active_tools"));
1116 assert!(is_event_read_op("getActiveTools"));
1117 assert!(is_event_read_op("get_all_tools"));
1118 assert!(is_event_read_op("get_model"));
1119 assert!(is_event_read_op("get_flag"));
1120 assert!(is_event_read_op("list_flags"));
1121 }
1122
1123 #[test]
1124 fn event_write_ops_classified_correctly() {
1125 assert!(!is_event_read_op("set_active_tools"));
1126 assert!(!is_event_read_op("set_model"));
1127 assert!(!is_event_read_op("register_command"));
1128 assert!(!is_event_read_op("register_provider"));
1129 }
1130
1131 #[test]
1134 fn telemetry_snapshot_serializes_deterministically() {
1135 let mut telemetry = AmacStallTelemetry::new(100_000);
1136 for i in 0..10 {
1137 telemetry.record(i * 10_000);
1138 }
1139 let snap = telemetry.snapshot();
1140 let json = serde_json::to_string(&snap).expect("serialize snapshot");
1141 let deserialized: AmacStallTelemetrySnapshot =
1142 serde_json::from_str(&json).expect("deserialize snapshot");
1143 assert_eq!(deserialized.total_calls, snap.total_calls);
1144 assert_eq!(deserialized.total_stalls, snap.total_stalls);
1145 assert_eq!(deserialized.toggle_decisions, snap.toggle_decisions);
1146 }
1147
1148 #[test]
1149 fn group_key_serializes_round_trip() {
1150 let keys = vec![
1151 AmacGroupKey::SessionRead,
1152 AmacGroupKey::SessionWrite,
1153 AmacGroupKey::EventRead,
1154 AmacGroupKey::EventWrite,
1155 AmacGroupKey::Tool,
1156 AmacGroupKey::Exec,
1157 AmacGroupKey::Http,
1158 AmacGroupKey::Ui,
1159 AmacGroupKey::Log,
1160 ];
1161 for key in keys {
1162 let json = serde_json::to_string(&key).expect("serialize key");
1163 let deserialized: AmacGroupKey = serde_json::from_str(&json).expect("deserialize key");
1164 assert_eq!(deserialized, key);
1165 }
1166 }
1167
1168 #[test]
1169 fn toggle_decision_serializes_round_trip() {
1170 let interleave = AmacToggleDecision::Interleave { width: 8 };
1171 let json = serde_json::to_string(&interleave).expect("serialize");
1172 let json: &'static str = Box::leak(json.into_boxed_str());
1173 let deserialized: AmacToggleDecision = serde_json::from_str(json).expect("deserialize");
1174 assert_eq!(deserialized, interleave);
1175
1176 let sequential = AmacToggleDecision::Sequential {
1177 reason: "batch_too_small",
1178 };
1179 let json = serde_json::to_string(&sequential).expect("serialize");
1180 let json: &'static str = Box::leak(json.into_boxed_str());
1181 let deserialized: AmacToggleDecision = serde_json::from_str(json).expect("deserialize");
1182 assert_eq!(deserialized, sequential);
1183 }
1184
1185 #[test]
1188 fn mixed_batch_groups_independently() {
1189 let mut executor = AmacBatchExecutor::new(AmacBatchExecutorConfig::new(true, 2, 16));
1190 let requests = vec![
1191 session_read_request(),
1192 tool_request(),
1193 http_request(),
1194 session_write_request(),
1195 log_request(),
1196 event_read_request(),
1197 session_read_request(),
1198 tool_request(),
1199 ];
1200 let plan = executor.plan_batch(requests);
1201 assert_eq!(plan.total_requests, 8);
1202 assert_eq!(plan.groups.len(), 8);
1205 assert_eq!(plan.groups[0].key, AmacGroupKey::SessionRead);
1206 assert_eq!(plan.groups[1].key, AmacGroupKey::Tool);
1207 assert_eq!(plan.groups[2].key, AmacGroupKey::Http);
1208 assert_eq!(plan.groups[3].key, AmacGroupKey::SessionWrite);
1209 assert_eq!(plan.groups[4].key, AmacGroupKey::Log);
1210 assert_eq!(plan.groups[5].key, AmacGroupKey::EventRead);
1211 assert_eq!(plan.groups[6].key, AmacGroupKey::SessionRead);
1212 assert_eq!(plan.groups[7].key, AmacGroupKey::Tool);
1213 }
1214
1215 #[test]
1216 fn executor_tracks_toggle_decision_counts() {
1217 let mut executor = AmacBatchExecutor::new(AmacBatchExecutorConfig::new(true, 2, 16));
1218
1219 for _ in 0..100 {
1221 executor.observe_call(500_000);
1222 }
1223
1224 let requests: Vec<HostcallRequest> = (0..6).map(|_| http_request()).collect();
1225 let plan = executor.plan_batch(requests);
1226
1227 let snap = executor.telemetry().snapshot();
1228 assert_eq!(snap.toggle_decisions, plan.groups.len() as u64);
1229 assert!(snap.interleave_selections > 0);
1230 }
1231
1232 #[test]
1233 fn single_request_batch_always_sequential() {
1234 let mut executor = AmacBatchExecutor::new(AmacBatchExecutorConfig::new(true, 2, 16));
1235
1236 for _ in 0..100 {
1238 executor.observe_call(500_000);
1239 }
1240
1241 let requests = vec![http_request()];
1242 let plan = executor.plan_batch(requests);
1243 assert_eq!(plan.groups.len(), 1);
1244 assert!(plan.decisions.iter().all(|d| !d.is_interleave()));
1247 }
1248
1249 #[test]
1252 fn executor_clone_preserves_telemetry_state() {
1253 let mut original = AmacBatchExecutor::new(AmacBatchExecutorConfig::new(true, 4, 16));
1254 for _ in 0..50 {
1255 original.observe_call(200_000);
1256 }
1257 let snap_before = original.telemetry().snapshot();
1258 assert_eq!(snap_before.total_calls, 50);
1259
1260 let cloned = original.clone();
1261 let snap_cloned = cloned.telemetry().snapshot();
1262 assert_eq!(snap_cloned.total_calls, snap_before.total_calls);
1263 assert_eq!(snap_cloned.total_stalls, snap_before.total_stalls);
1264 assert_eq!(snap_cloned.stall_ratio, snap_before.stall_ratio);
1265 }
1266
1267 #[test]
1268 fn executor_clone_is_independent() {
1269 let mut original = AmacBatchExecutor::new(AmacBatchExecutorConfig::new(true, 4, 16));
1270 for _ in 0..10 {
1271 original.observe_call(50_000);
1272 }
1273
1274 let mut cloned = original.clone();
1275 for _ in 0..100 {
1277 cloned.observe_call(500_000);
1278 }
1279
1280 assert_eq!(original.telemetry().snapshot().total_calls, 10);
1282 assert_eq!(cloned.telemetry().snapshot().total_calls, 110);
1283 }
1284
#[test]
fn config_new_matches_parameters() {
    let cfg = AmacBatchExecutorConfig::new(false, 8, 32);

    // Explicit constructor arguments are stored as given...
    assert!(!cfg.enabled);
    assert_eq!(cfg.min_batch_size, 8);
    assert_eq!(cfg.max_interleave_width, 32);

    // ...while both thresholds fall back to the module-level defaults.
    assert_eq!(cfg.stall_threshold_ns, AMAC_STALL_THRESHOLD_NS);
    assert_eq!(cfg.stall_ratio_threshold, AMAC_STALL_RATIO_THRESHOLD);
}
1296
#[test]
fn default_executor_is_enabled() {
    // `Default` must produce an executor with AMAC turned on.
    assert!(AmacBatchExecutor::default().enabled());
}
1303
#[test]
fn config_with_thresholds_applies_clamps() {
    // Out-of-range inputs: a zero stall threshold is raised to 1, and a
    // ratio threshold above 1_000 is lowered to 1_000.
    let clamped = AmacBatchExecutorConfig::new(true, 4, 16).with_thresholds(0, 9_999);
    assert_eq!(clamped.stall_threshold_ns, 1);
    assert_eq!(clamped.stall_ratio_threshold, 1_000);
}
1310
#[test]
fn batch_telemetry_serializes() {
    // Round-trip every field through JSON. Checking only a subset would let a
    // field that is dropped from serialization (e.g. via `#[serde(skip)]`,
    // which deserializes back to its default) go unnoticed.
    let telem = AmacBatchTelemetry {
        total_requests: 10,
        groups_dispatched: 3,
        interleaved_groups: 1,
        sequential_groups: 2,
        total_elapsed_ns: 5_000_000,
    };
    let json = serde_json::to_string(&telem).expect("serialize");
    let deser: AmacBatchTelemetry = serde_json::from_str(&json).expect("deserialize");

    assert_eq!(deser.total_requests, 10);
    assert_eq!(deser.groups_dispatched, 3);
    assert_eq!(deser.interleaved_groups, 1);
    assert_eq!(deser.sequential_groups, 2);
    assert_eq!(deser.total_elapsed_ns, 5_000_000);
}
1327
1328 mod proptest_amac {
1331 use super::*;
1332 use proptest::prelude::*;
1333
1334 proptest! {
1335 #[test]
1336 fn stall_ratio_bounded_0_to_1000(
1337 observations in prop::collection::vec(0..1_000_000u64, 1..100),
1338 ) {
1339 let mut telemetry = AmacStallTelemetry::new(100_000);
1340 for elapsed in &observations {
1341 telemetry.record(*elapsed);
1342 }
1343 let ratio = telemetry.stall_ratio();
1344 assert!(ratio <= 1_000, "stall_ratio was {ratio}, expected <= 1000");
1345 }
1346
1347 #[test]
1348 fn total_stalls_never_exceeds_total_calls(
1349 observations in prop::collection::vec(0..1_000_000u64, 1..100),
1350 ) {
1351 let mut telemetry = AmacStallTelemetry::new(100_000);
1352 for elapsed in &observations {
1353 telemetry.record(*elapsed);
1354 }
1355 assert!(
1356 telemetry.total_stalls <= telemetry.total_calls,
1357 "stalls {} > calls {}",
1358 telemetry.total_stalls,
1359 telemetry.total_calls,
1360 );
1361 }
1362
1363 #[test]
1364 fn total_calls_matches_observation_count(
1365 observations in prop::collection::vec(0..1_000_000u64, 1..100),
1366 ) {
1367 let mut telemetry = AmacStallTelemetry::new(100_000);
1368 for elapsed in &observations {
1369 telemetry.record(*elapsed);
1370 }
1371 assert_eq!(
1372 telemetry.total_calls,
1373 observations.len() as u64,
1374 );
1375 }
1376
1377 #[test]
1378 fn recent_window_never_exceeds_capacity(
1379 observations in prop::collection::vec(0..1_000_000u64, 1..200),
1380 ) {
1381 let mut telemetry = AmacStallTelemetry::new(100_000);
1382 for elapsed in &observations {
1383 telemetry.record(*elapsed);
1384 }
1385 let snap = telemetry.snapshot();
1386 assert!(
1387 snap.recent_window_size <= TELEMETRY_WINDOW_SIZE,
1388 "window {} > capacity {}",
1389 snap.recent_window_size,
1390 TELEMETRY_WINDOW_SIZE,
1391 );
1392 }
1393
1394 #[test]
1395 fn interleave_width_bounded(
1396 stall_ratio in 0..2000u64,
1397 memory_weight in 0..100u32,
1398 group_size in 2..100usize,
1399 max_width in 2..32usize,
1400 ) {
1401 let width = compute_interleave_width(
1402 stall_ratio,
1403 memory_weight,
1404 group_size,
1405 max_width,
1406 );
1407 assert!(width >= 2, "width must be >= 2, got {width}");
1408 assert!(width <= max_width, "width {width} > max_width {max_width}");
1409 assert!(width <= group_size, "width {width} > group_size {group_size}");
1410 }
1411
1412 #[test]
1413 fn interleave_width_monotone_in_stall_ratio(
1414 base_ratio in 200..600u64,
1415 delta in 1..400u64,
1416 memory_weight in 1..100u32,
1417 group_size in 4..50usize,
1418 max_width in 4..32usize,
1419 ) {
1420 let low = compute_interleave_width(
1421 base_ratio,
1422 memory_weight,
1423 group_size,
1424 max_width,
1425 );
1426 let high = compute_interleave_width(
1427 base_ratio + delta,
1428 memory_weight,
1429 group_size,
1430 max_width,
1431 );
1432 assert!(
1433 high >= low,
1434 "higher stall ratio should give >= width: low={low} (ratio={base_ratio}), high={high} (ratio={})",
1435 base_ratio + delta,
1436 );
1437 }
1438
1439 #[test]
1440 fn group_key_interleave_safe_stable(
1441 idx in 0..9usize,
1442 ) {
1443 let keys = [
1444 AmacGroupKey::SessionRead,
1445 AmacGroupKey::SessionWrite,
1446 AmacGroupKey::EventRead,
1447 AmacGroupKey::EventWrite,
1448 AmacGroupKey::Tool,
1449 AmacGroupKey::Exec,
1450 AmacGroupKey::Http,
1451 AmacGroupKey::Ui,
1452 AmacGroupKey::Log,
1453 ];
1454 let key = &keys[idx];
1455 let s1 = key.interleave_safe();
1456 let s2 = key.interleave_safe();
1457 assert_eq!(s1, s2, "interleave_safe must be deterministic");
1458 }
1459 }
1460 }
1461}