use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::collections::BinaryHeap;
use std::collections::VecDeque;
use std::fmt;
use std::sync::Arc;

/// Monotonically increasing sequence number used to totally order macrotasks.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Seq(u64);

impl Seq {
    /// The initial sequence number.
    #[must_use]
    pub const fn zero() -> Self {
        Self(0)
    }

    /// The successor, saturating at `u64::MAX` instead of wrapping.
    #[must_use]
    pub const fn next(self) -> Self {
        Self(self.0.saturating_add(1))
    }

    /// The raw `u64` value.
    #[must_use]
    pub const fn value(self) -> u64 {
        self.0
    }
}

impl fmt::Display for Seq {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "seq:{}", self.0)
    }
}

/// An entry in the scheduler's timer heap.
#[derive(Debug, Clone)]
pub struct TimerEntry {
    /// Identifier handed back by `set_timeout`.
    pub timer_id: u64,
    /// Absolute deadline in milliseconds.
    pub deadline_ms: u64,
    /// Creation sequence, used as a FIFO tie-breaker for equal deadlines.
    pub seq: Seq,
}

impl TimerEntry {
    #[must_use]
    pub const fn new(timer_id: u64, deadline_ms: u64, seq: Seq) -> Self {
        Self {
            timer_id,
            deadline_ms,
            seq,
        }
    }
}

impl PartialEq for TimerEntry {
    fn eq(&self, other: &Self) -> bool {
        self.deadline_ms == other.deadline_ms && self.seq == other.seq
    }
}

impl Eq for TimerEntry {}

impl PartialOrd for TimerEntry {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for TimerEntry {
    fn cmp(&self, other: &Self) -> Ordering {
        // Reversed comparison: `BinaryHeap` is a max-heap, so inverting both
        // keys makes it pop the earliest deadline first, then the lowest seq.
        match other.deadline_ms.cmp(&self.deadline_ms) {
            Ordering::Equal => other.seq.cmp(&self.seq),
            ord => ord,
        }
    }
}

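// A minimal sketch (illustrative only; `demo_timer_heap_order` is our name, not
// part of the API) of why the comparison above is reversed: pushed into a
// `BinaryHeap`, the entry with the earliest deadline pops first, with `seq`
// breaking ties FIFO.
#[allow(dead_code)]
fn demo_timer_heap_order() {
    let mut heap = BinaryHeap::new();
    heap.push(TimerEntry::new(1, 200, Seq::zero()));
    heap.push(TimerEntry::new(2, 100, Seq::zero().next()));
    // The entry with the *smaller* deadline surfaces first.
    assert_eq!(heap.pop().map(|entry| entry.timer_id), Some(2));
}
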
/// The kinds of macrotask the scheduler can deliver.
#[derive(Debug, Clone)]
pub enum MacrotaskKind {
    /// A timer armed via `set_timeout` reached its deadline.
    TimerFired { timer_id: u64 },
    /// A host call finished.
    HostcallComplete {
        call_id: String,
        outcome: HostcallOutcome,
    },
    /// An event arrived from outside the scheduler.
    InboundEvent {
        event_id: String,
        payload: serde_json::Value,
    },
}

/// The result carried by a `HostcallComplete` macrotask.
#[derive(Debug, Clone)]
pub enum HostcallOutcome {
    /// The call succeeded with a JSON payload.
    Success(serde_json::Value),
    /// The call failed with a machine-readable code and a human-readable message.
    Error { code: String, message: String },
    /// One chunk of a streaming response.
    StreamChunk {
        /// Zero-based position of this chunk within the stream.
        sequence: u64,
        /// The chunk payload.
        chunk: serde_json::Value,
        /// `true` on the last chunk of the stream.
        is_final: bool,
    },
}

/// A unit of work tagged with a globally unique sequence number.
#[derive(Debug, Clone)]
pub struct Macrotask {
    /// Global ordering key assigned at enqueue time.
    pub seq: Seq,
    /// What to execute.
    pub kind: MacrotaskKind,
}

impl Macrotask {
    #[must_use]
    pub const fn new(seq: Seq, kind: MacrotaskKind) -> Self {
        Self { seq, kind }
    }
}

impl PartialEq for Macrotask {
    fn eq(&self, other: &Self) -> bool {
        self.seq == other.seq
    }
}

impl Eq for Macrotask {}

impl PartialOrd for Macrotask {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for Macrotask {
    fn cmp(&self, other: &Self) -> Ordering {
        self.seq.cmp(&other.seq)
    }
}

/// A source of milliseconds since some epoch; injected so tests can be deterministic.
pub trait Clock: Send + Sync {
    /// Current time in milliseconds.
    fn now_ms(&self) -> u64;
}

impl<C: Clock> Clock for Arc<C> {
    fn now_ms(&self) -> u64 {
        self.as_ref().now_ms()
    }
}

/// A `Clock` backed by the system wall clock (milliseconds since the Unix epoch).
#[derive(Debug, Clone, Copy, Default)]
pub struct WallClock;

impl Clock for WallClock {
    fn now_ms(&self) -> u64 {
        use std::time::{SystemTime, UNIX_EPOCH};
        let millis = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_millis();
        u64::try_from(millis).unwrap_or(u64::MAX)
    }
}

/// A manually advanced `Clock` for deterministic tests and replay.
#[derive(Debug)]
pub struct DeterministicClock {
    current_ms: std::sync::atomic::AtomicU64,
}

impl DeterministicClock {
    #[must_use]
    pub const fn new(start_ms: u64) -> Self {
        Self {
            current_ms: std::sync::atomic::AtomicU64::new(start_ms),
        }
    }

    /// Advance the clock by `ms` milliseconds.
    pub fn advance(&self, ms: u64) {
        self.current_ms
            .fetch_add(ms, std::sync::atomic::Ordering::SeqCst);
    }

    /// Set the clock to an absolute time in milliseconds.
    pub fn set(&self, ms: u64) {
        self.current_ms
            .store(ms, std::sync::atomic::Ordering::SeqCst);
    }
}

impl Clock for DeterministicClock {
    fn now_ms(&self) -> u64 {
        self.current_ms.load(std::sync::atomic::Ordering::SeqCst)
    }
}

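// A minimal sketch (illustrative only; `demo_shared_clock` is our name): since
// `Clock` is also implemented for `Arc<C>`, one deterministic clock can drive a
// scheduler while a harness advances it from the outside.
#[allow(dead_code)]
fn demo_shared_clock() {
    let clock = Arc::new(DeterministicClock::new(0));
    let sched = Scheduler::with_clock(Arc::clone(&clock));
    clock.advance(250);
    assert_eq!(sched.now_ms(), 250);
}
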
/// A deterministic macrotask scheduler: a FIFO macrotask queue plus a timer
/// min-heap, both ordered by `Seq`.
pub struct Scheduler<C: Clock = WallClock> {
    /// Next sequence number to assign.
    seq: Seq,
    /// Ready-to-run macrotasks in FIFO (sequence) order.
    macrotask_queue: VecDeque<Macrotask>,
    /// Pending timers, earliest deadline first.
    timer_heap: BinaryHeap<TimerEntry>,
    /// Next candidate timer id.
    next_timer_id: u64,
    /// Ids cancelled via `clear_timeout`, reaped lazily when they surface.
    cancelled_timers: std::collections::HashSet<u64>,
    /// Ids currently present in `timer_heap`.
    heap_timer_ids: std::collections::HashSet<u64>,
    /// Time source.
    clock: C,
}

impl Scheduler<WallClock> {
    #[must_use]
    pub fn new() -> Self {
        Self::with_clock(WallClock)
    }
}

impl Default for Scheduler<WallClock> {
    fn default() -> Self {
        Self::new()
    }
}

impl<C: Clock> Scheduler<C> {
    #[must_use]
    pub fn with_clock(clock: C) -> Self {
        Self {
            seq: Seq::zero(),
            macrotask_queue: VecDeque::new(),
            timer_heap: BinaryHeap::new(),
            next_timer_id: 1,
            cancelled_timers: std::collections::HashSet::new(),
            heap_timer_ids: std::collections::HashSet::new(),
            clock,
        }
    }

    /// The next sequence number that will be assigned.
    #[must_use]
    pub const fn current_seq(&self) -> Seq {
        self.seq
    }

    /// Return the current sequence number and advance the counter.
    const fn next_seq(&mut self) -> Seq {
        let current = self.seq;
        self.seq = self.seq.next();
        current
    }

    /// Current time according to the injected clock.
    #[must_use]
    pub fn now_ms(&self) -> u64 {
        self.clock.now_ms()
    }

    /// `true` while any macrotask or live (non-cancelled) timer remains.
    #[must_use]
    pub fn has_pending(&self) -> bool {
        !self.macrotask_queue.is_empty()
            || self
                .timer_heap
                .iter()
                .any(|entry| !self.cancelled_timers.contains(&entry.timer_id))
    }

    /// Number of queued macrotasks.
    #[must_use]
    pub fn macrotask_count(&self) -> usize {
        self.macrotask_queue.len()
    }

    /// Number of live (non-cancelled) timers.
    #[must_use]
    pub fn timer_count(&self) -> usize {
        self.timer_heap
            .iter()
            .filter(|entry| !self.cancelled_timers.contains(&entry.timer_id))
            .count()
    }

    /// Arm a timer that fires `delay_ms` from now; returns its id.
    pub fn set_timeout(&mut self, delay_ms: u64) -> u64 {
        let timer_id = self.allocate_timer_id();
        let deadline_ms = self.clock.now_ms().saturating_add(delay_ms);
        let seq = self.next_seq();

        self.timer_heap
            .push(TimerEntry::new(timer_id, deadline_ms, seq));
        self.heap_timer_ids.insert(timer_id);

        tracing::trace!(
            event = "scheduler.timer.set",
            timer_id,
            delay_ms,
            deadline_ms,
            %seq,
            "Timer scheduled"
        );

        timer_id
    }

    fn timer_id_in_use(&self, timer_id: u64) -> bool {
        self.heap_timer_ids.contains(&timer_id)
    }

    /// Allocate the next free timer id, wrapping past `u64::MAX` back to 1 and
    /// skipping ids still present in the heap.
    fn allocate_timer_id(&mut self) -> u64 {
        let start = self.next_timer_id;
        let mut candidate = start;

        loop {
            self.next_timer_id = if candidate == u64::MAX {
                1
            } else {
                candidate + 1
            };

            if !self.timer_id_in_use(candidate) {
                return candidate;
            }

            candidate = self.next_timer_id;

            if candidate == start {
                break;
            }
        }

        tracing::error!(
            event = "scheduler.timer_id.exhausted",
            "Timer ID namespace exhausted; falling back to u64::MAX reuse"
        );
        u64::MAX
    }

    /// Cancel a pending timer; returns `true` if the timer was live.
    pub fn clear_timeout(&mut self, timer_id: u64) -> bool {
        let pending =
            self.heap_timer_ids.contains(&timer_id) && !self.cancelled_timers.contains(&timer_id);

        let cancelled = if pending {
            self.cancelled_timers.insert(timer_id)
        } else {
            false
        };

        tracing::trace!(
            event = "scheduler.timer.cancel",
            timer_id,
            cancelled,
            "Timer cancelled"
        );

        cancelled
    }

    /// Queue a completed hostcall for delivery on a future tick.
    pub fn enqueue_hostcall_complete(&mut self, call_id: String, outcome: HostcallOutcome) {
        let seq = self.next_seq();
        tracing::trace!(
            event = "scheduler.hostcall.enqueue",
            call_id = %call_id,
            %seq,
            "Hostcall completion enqueued"
        );
        let task = Macrotask::new(seq, MacrotaskKind::HostcallComplete { call_id, outcome });
        self.macrotask_queue.push_back(task);
    }

    /// Queue a batch of hostcall completions, preserving iteration order.
    pub fn enqueue_hostcall_completions<I>(&mut self, completions: I)
    where
        I: IntoIterator<Item = (String, HostcallOutcome)>,
    {
        for (call_id, outcome) in completions {
            self.enqueue_hostcall_complete(call_id, outcome);
        }
    }

    /// Queue one chunk of a streaming hostcall response.
    pub fn enqueue_stream_chunk(
        &mut self,
        call_id: String,
        sequence: u64,
        chunk: serde_json::Value,
        is_final: bool,
    ) {
        self.enqueue_hostcall_complete(
            call_id,
            HostcallOutcome::StreamChunk {
                sequence,
                chunk,
                is_final,
            },
        );
    }

    /// Queue an inbound event for delivery on a future tick.
    pub fn enqueue_event(&mut self, event_id: String, payload: serde_json::Value) {
        let seq = self.next_seq();
        tracing::trace!(
            event = "scheduler.event.enqueue",
            event_id = %event_id,
            %seq,
            "Inbound event enqueued"
        );
        let task = Macrotask::new(seq, MacrotaskKind::InboundEvent { event_id, payload });
        self.macrotask_queue.push_back(task);
    }

    /// Move every timer whose deadline has passed into the macrotask queue,
    /// dropping cancelled timers as they surface.
    fn move_due_timers(&mut self) {
        let now = self.clock.now_ms();

        while let Some(entry) = self.timer_heap.peek() {
            if entry.deadline_ms > now {
                break;
            }

            let entry = self.timer_heap.pop().expect("peeked");
            self.heap_timer_ids.remove(&entry.timer_id);

            if self.cancelled_timers.remove(&entry.timer_id) {
                tracing::trace!(
                    event = "scheduler.timer.skip_cancelled",
                    timer_id = entry.timer_id,
                    "Skipped cancelled timer"
                );
                continue;
            }

            let task_seq = self.next_seq();
            let task = Macrotask::new(
                task_seq,
                MacrotaskKind::TimerFired {
                    timer_id: entry.timer_id,
                },
            );
            self.macrotask_queue.push_back(task);

            tracing::trace!(
                event = "scheduler.timer.fire",
                timer_id = entry.timer_id,
                deadline_ms = entry.deadline_ms,
                now_ms = now,
                timer_seq = %entry.seq,
                macrotask_seq = %task_seq,
                "Timer fired"
            );
        }
    }

    /// Run one scheduler step: promote due timers, then pop and return at most
    /// one macrotask. Returns `None` when nothing is runnable right now.
    pub fn tick(&mut self) -> Option<Macrotask> {
        self.move_due_timers();

        let task = self.macrotask_queue.pop_front();

        if let Some(ref task) = task {
            tracing::debug!(
                event = "scheduler.tick.execute",
                seq = %task.seq,
                kind = ?std::mem::discriminant(&task.kind),
                "Executing macrotask"
            );
        } else {
            tracing::trace!(event = "scheduler.tick.idle", "No macrotask to execute");
        }

        task
    }

    /// Earliest live timer deadline, if any.
    #[must_use]
    pub fn next_timer_deadline(&self) -> Option<u64> {
        self.timer_heap
            .iter()
            .filter(|entry| !self.cancelled_timers.contains(&entry.timer_id))
            .map(|entry| entry.deadline_ms)
            .min()
    }

    /// Milliseconds until the next live timer fires (saturating at zero).
    #[must_use]
    pub fn time_until_next_timer(&self) -> Option<u64> {
        self.next_timer_deadline()
            .map(|deadline| deadline.saturating_sub(self.clock.now_ms()))
    }
}

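// A minimal host-loop sketch (illustrative only; `demo_drive_scheduler` is our
// name): run macrotasks until idle, and when only timers remain, jump virtual
// time to the next deadline instead of busy-waiting.
#[allow(dead_code)]
fn demo_drive_scheduler() {
    let mut sched = Scheduler::with_clock(DeterministicClock::new(0));
    sched.enqueue_event("boot".to_string(), serde_json::json!({}));
    let _timer = sched.set_timeout(100);
    while sched.has_pending() {
        if sched.tick().is_some() {
            continue; // a macrotask ran; poll again
        }
        // Queue drained but live timers remain: advance to the next deadline.
        if let Some(deadline) = sched.next_timer_deadline() {
            sched.clock.set(deadline);
        }
    }
}
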
/// Configuration for a `ReactorMesh` of sharded macrotask lanes.
#[derive(Debug, Clone)]
pub struct ReactorMeshConfig {
    /// Number of shards (lanes).
    pub shard_count: usize,
    /// Bounded capacity of each lane.
    pub lane_capacity: usize,
    /// Optional CPU topology used for NUMA-aware shard placement.
    pub topology: Option<ReactorTopologySnapshot>,
}

impl Default for ReactorMeshConfig {
    fn default() -> Self {
        Self {
            shard_count: 4,
            lane_capacity: 1024,
            topology: None,
        }
    }
}

/// One logical core and the NUMA node it belongs to.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct ReactorTopologyCore {
    pub core_id: usize,
    pub numa_node: usize,
}

/// A point-in-time view of the machine's core-to-NUMA-node layout.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct ReactorTopologySnapshot {
    pub cores: Vec<ReactorTopologyCore>,
}

impl ReactorTopologySnapshot {
    /// Build a snapshot from `(core_id, numa_node)` pairs, sorted and deduplicated.
    #[must_use]
    pub fn from_core_node_pairs(pairs: &[(usize, usize)]) -> Self {
        let mut cores = pairs
            .iter()
            .map(|(core_id, numa_node)| ReactorTopologyCore {
                core_id: *core_id,
                numa_node: *numa_node,
            })
            .collect::<Vec<_>>();
        cores.sort_unstable();
        cores.dedup();
        Self { cores }
    }
}

/// Why shard placement fell back to a trivial (non-NUMA-aware) layout.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReactorPlacementFallbackReason {
    /// No topology snapshot was provided.
    TopologyUnavailable,
    /// The snapshot contained no usable cores.
    TopologyEmpty,
    /// All cores sit on a single NUMA node, so spreading is a no-op.
    SingleNumaNode,
}

impl ReactorPlacementFallbackReason {
    #[must_use]
    pub const fn as_code(self) -> &'static str {
        match self {
            Self::TopologyUnavailable => "topology_unavailable",
            Self::TopologyEmpty => "topology_empty",
            Self::SingleNumaNode => "single_numa_node",
        }
    }
}

/// The core and NUMA node assigned to one shard.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ReactorShardBinding {
    pub shard_id: usize,
    pub core_id: usize,
    pub numa_node: usize,
}

/// The full shard-to-core placement plan, plus why a fallback applied (if any).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReactorPlacementManifest {
    pub shard_count: usize,
    pub numa_node_count: usize,
    pub bindings: Vec<ReactorShardBinding>,
    pub fallback_reason: Option<ReactorPlacementFallbackReason>,
}

impl ReactorPlacementManifest {
    /// Plan shard placement: shards round-robin across NUMA nodes, then across
    /// the cores within each node. Degenerate topologies fall back to an
    /// identity layout on a single synthetic node.
    #[must_use]
    pub fn plan(shard_count: usize, topology: Option<&ReactorTopologySnapshot>) -> Self {
        if shard_count == 0 {
            return Self {
                shard_count: 0,
                numa_node_count: 0,
                bindings: Vec::new(),
                fallback_reason: None,
            };
        }

        let Some(topology) = topology else {
            let bindings = (0..shard_count)
                .map(|shard_id| ReactorShardBinding {
                    shard_id,
                    core_id: shard_id,
                    numa_node: 0,
                })
                .collect::<Vec<_>>();
            return Self {
                shard_count,
                numa_node_count: 1,
                bindings,
                fallback_reason: Some(ReactorPlacementFallbackReason::TopologyUnavailable),
            };
        };

        if topology.cores.is_empty() {
            let bindings = (0..shard_count)
                .map(|shard_id| ReactorShardBinding {
                    shard_id,
                    core_id: shard_id,
                    numa_node: 0,
                })
                .collect::<Vec<_>>();
            return Self {
                shard_count,
                numa_node_count: 1,
                bindings,
                fallback_reason: Some(ReactorPlacementFallbackReason::TopologyEmpty),
            };
        }

        let mut by_node = BTreeMap::<usize, Vec<usize>>::new();
        for core in &topology.cores {
            by_node
                .entry(core.numa_node)
                .or_default()
                .push(core.core_id);
        }
        for cores in by_node.values_mut() {
            cores.sort_unstable();
            cores.dedup();
        }
        let nodes = by_node
            .into_iter()
            .filter(|(_, cores)| !cores.is_empty())
            .collect::<Vec<_>>();

        if nodes.is_empty() {
            let bindings = (0..shard_count)
                .map(|shard_id| ReactorShardBinding {
                    shard_id,
                    core_id: shard_id,
                    numa_node: 0,
                })
                .collect::<Vec<_>>();
            return Self {
                shard_count,
                numa_node_count: 1,
                bindings,
                fallback_reason: Some(ReactorPlacementFallbackReason::TopologyEmpty),
            };
        }

        let node_count = nodes.len();
        let fallback_reason = if node_count == 1 {
            Some(ReactorPlacementFallbackReason::SingleNumaNode)
        } else {
            None
        };

        let mut bindings = Vec::with_capacity(shard_count);
        for shard_id in 0..shard_count {
            let node_idx = shard_id % node_count;
            let (numa_node, cores) = &nodes[node_idx];
            let core_idx = (shard_id / node_count) % cores.len();
            bindings.push(ReactorShardBinding {
                shard_id,
                core_id: cores[core_idx],
                numa_node: *numa_node,
            });
        }

        Self {
            shard_count,
            numa_node_count: node_count,
            bindings,
            fallback_reason,
        }
    }

    /// Serialize the manifest for logs and telemetry sinks.
    #[must_use]
    pub fn as_json(&self) -> serde_json::Value {
        serde_json::json!({
            "shard_count": self.shard_count,
            "numa_node_count": self.numa_node_count,
            "fallback_reason": self.fallback_reason.map(ReactorPlacementFallbackReason::as_code),
            "bindings": self.bindings.iter().map(|binding| {
                serde_json::json!({
                    "shard_id": binding.shard_id,
                    "core_id": binding.core_id,
                    "numa_node": binding.numa_node
                })
            }).collect::<Vec<_>>()
        })
    }
}

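// A minimal placement sketch (illustrative only; `demo_plan_placement` is our
// name): two NUMA nodes with two cores each. Shards round-robin across nodes
// first, then across the cores within each node.
#[allow(dead_code)]
fn demo_plan_placement() {
    let topo = ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0), (1, 0), (2, 1), (3, 1)]);
    let manifest = ReactorPlacementManifest::plan(4, Some(&topo));
    assert_eq!(manifest.numa_node_count, 2);
    assert!(manifest.fallback_reason.is_none());
    // Shard 0 -> node 0 core 0, shard 1 -> node 1 core 2, shard 2 -> node 0 core 1.
    assert_eq!(manifest.bindings[1].core_id, 2);
    assert_eq!(manifest.bindings[2].core_id, 1);
}
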
/// A macrotask wrapped with its global and per-shard ordering metadata.
#[derive(Debug, Clone)]
pub struct ReactorEnvelope {
    /// Mesh-wide enqueue order.
    pub global_seq: Seq,
    /// Enqueue order within the owning shard.
    pub shard_seq: u64,
    /// The shard this envelope was routed to.
    pub shard_id: usize,
    /// The wrapped task.
    pub task: MacrotaskKind,
}

impl ReactorEnvelope {
    const fn new(global_seq: Seq, shard_seq: u64, shard_id: usize, task: MacrotaskKind) -> Self {
        Self {
            global_seq,
            shard_seq,
            shard_id,
            task,
        }
    }
}

/// Returned when a lane rejects an enqueue because it is full (or missing).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ReactorBackpressure {
    pub shard_id: usize,
    pub depth: usize,
    pub capacity: usize,
}

/// A snapshot of mesh queue depths, rejections, and placement.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReactorMeshTelemetry {
    pub queue_depths: Vec<usize>,
    pub max_queue_depths: Vec<usize>,
    pub rejected_enqueues: u64,
    pub shard_bindings: Vec<ReactorShardBinding>,
    pub fallback_reason: Option<ReactorPlacementFallbackReason>,
}

impl ReactorMeshTelemetry {
    /// Serialize the snapshot for telemetry sinks.
    #[must_use]
    pub fn as_json(&self) -> serde_json::Value {
        serde_json::json!({
            "queue_depths": self.queue_depths,
            "max_queue_depths": self.max_queue_depths,
            "rejected_enqueues": self.rejected_enqueues,
            "fallback_reason": self.fallback_reason.map(ReactorPlacementFallbackReason::as_code),
            "shard_bindings": self.shard_bindings.iter().map(|binding| {
                serde_json::json!({
                    "shard_id": binding.shard_id,
                    "core_id": binding.core_id,
                    "numa_node": binding.numa_node,
                })
            }).collect::<Vec<_>>()
        })
    }
}

/// A bounded lane modelling a single-producer/single-consumer queue, tracking
/// its depth high-water mark. (Backed by a plain `VecDeque`; the SPSC
/// discipline is a usage convention here, not enforced by the type.)
#[derive(Debug, Clone)]
struct SpscLane<T> {
    capacity: usize,
    queue: VecDeque<T>,
    max_depth: usize,
}

impl<T> SpscLane<T> {
    fn new(capacity: usize) -> Self {
        Self {
            capacity,
            queue: VecDeque::with_capacity(capacity),
            max_depth: 0,
        }
    }

    fn len(&self) -> usize {
        self.queue.len()
    }

    fn is_empty(&self) -> bool {
        self.queue.is_empty()
    }

    /// Push a value, or return the current depth if the lane is full.
    fn push(&mut self, value: T) -> Result<(), usize> {
        if self.queue.len() >= self.capacity {
            return Err(self.queue.len());
        }
        self.queue.push_back(value);
        self.max_depth = self.max_depth.max(self.queue.len());
        Ok(())
    }

    fn pop(&mut self) -> Option<T> {
        self.queue.pop_front()
    }

    fn front(&self) -> Option<&T> {
        self.queue.front()
    }
}

/// A sharded macrotask mesh: one bounded lane per shard, a global sequence for
/// cross-lane ordering, and a NUMA-aware placement manifest.
#[derive(Debug, Clone)]
pub struct ReactorMesh {
    seq: Seq,
    lanes: Vec<SpscLane<ReactorEnvelope>>,
    shard_seq: Vec<u64>,
    rr_cursor: usize,
    rejected_enqueues: u64,
    placement_manifest: ReactorPlacementManifest,
}

impl ReactorMesh {
    /// Build a mesh from `config`; a zero shard count or lane capacity yields
    /// an empty mesh that rejects every enqueue.
    #[must_use]
    #[allow(clippy::needless_pass_by_value)]
    pub fn new(config: ReactorMeshConfig) -> Self {
        if config.shard_count == 0 || config.lane_capacity == 0 {
            return Self {
                seq: Seq::zero(),
                lanes: Vec::new(),
                shard_seq: Vec::new(),
                rr_cursor: 0,
                rejected_enqueues: 0,
                placement_manifest: ReactorPlacementManifest::plan(0, config.topology.as_ref()),
            };
        }

        let shard_count = config.shard_count;
        let lane_capacity = config.lane_capacity;
        let placement_manifest =
            ReactorPlacementManifest::plan(shard_count, config.topology.as_ref());
        let lanes = (0..shard_count)
            .map(|_| SpscLane::new(lane_capacity))
            .collect::<Vec<_>>();
        Self {
            seq: Seq::zero(),
            lanes,
            shard_seq: vec![0; shard_count],
            rr_cursor: 0,
            rejected_enqueues: 0,
            placement_manifest,
        }
    }

    /// Number of lanes in the mesh.
    #[must_use]
    pub fn shard_count(&self) -> usize {
        self.lanes.len()
    }

    /// Total number of queued envelopes across all lanes.
    #[must_use]
    pub fn total_depth(&self) -> usize {
        self.lanes.iter().map(SpscLane::len).sum()
    }

    /// `true` while any lane holds work.
    #[must_use]
    pub fn has_pending(&self) -> bool {
        self.total_depth() > 0
    }

    /// Depth of one lane, or `None` for an unknown shard id.
    #[must_use]
    pub fn queue_depth(&self, shard_id: usize) -> Option<usize> {
        self.lanes.get(shard_id).map(SpscLane::len)
    }

    /// Snapshot current depths, high-water marks, rejections, and placement.
    #[must_use]
    pub fn telemetry(&self) -> ReactorMeshTelemetry {
        ReactorMeshTelemetry {
            queue_depths: self.lanes.iter().map(SpscLane::len).collect(),
            max_queue_depths: self.lanes.iter().map(|lane| lane.max_depth).collect(),
            rejected_enqueues: self.rejected_enqueues,
            shard_bindings: self.placement_manifest.bindings.clone(),
            fallback_reason: self.placement_manifest.fallback_reason,
        }
    }

    /// The placement plan this mesh was built with.
    #[must_use]
    pub const fn placement_manifest(&self) -> &ReactorPlacementManifest {
        &self.placement_manifest
    }

    const fn next_global_seq(&mut self) -> Seq {
        let current = self.seq;
        self.seq = self.seq.next();
        current
    }

    fn next_shard_seq(&mut self, shard_id: usize) -> u64 {
        let Some(seq) = self.shard_seq.get_mut(shard_id) else {
            return 0;
        };
        let current = *seq;
        *seq = seq.saturating_add(1);
        current
    }

    /// FNV-1a over the input bytes: deterministic across runs and platforms.
    fn stable_hash(input: &str) -> u64 {
        let mut hash = 0xcbf2_9ce4_8422_2325_u64;
        for byte in input.as_bytes() {
            hash ^= u64::from(*byte);
            hash = hash.wrapping_mul(0x0100_0000_01b3_u64);
        }
        hash
    }

    /// Route a call id to a lane by stable hash, so completions for the same
    /// call always land on the same shard.
    fn hash_route(&self, call_id: &str) -> usize {
        if self.lanes.len() <= 1 {
            return 0;
        }
        let lanes = u64::try_from(self.lanes.len()).unwrap_or(1);
        let slot = Self::stable_hash(call_id) % lanes;
        usize::try_from(slot).unwrap_or(0)
    }

    /// Route round-robin across lanes for work without an affinity key.
    fn rr_route(&mut self) -> usize {
        if self.lanes.len() <= 1 {
            return 0;
        }
        let idx = self.rr_cursor % self.lanes.len();
        self.rr_cursor = self.rr_cursor.wrapping_add(1);
        idx
    }

    fn enqueue_with_route(
        &mut self,
        shard_id: usize,
        task: MacrotaskKind,
    ) -> Result<ReactorEnvelope, ReactorBackpressure> {
        let global_seq = self.next_global_seq();
        let shard_seq = self.next_shard_seq(shard_id);
        let envelope = ReactorEnvelope::new(global_seq, shard_seq, shard_id, task);
        let Some(lane) = self.lanes.get_mut(shard_id) else {
            self.rejected_enqueues = self.rejected_enqueues.saturating_add(1);
            return Err(ReactorBackpressure {
                shard_id,
                depth: 0,
                capacity: 0,
            });
        };
        match lane.push(envelope.clone()) {
            Ok(()) => Ok(envelope),
            Err(depth) => {
                self.rejected_enqueues = self.rejected_enqueues.saturating_add(1);
                Err(ReactorBackpressure {
                    shard_id,
                    depth,
                    capacity: lane.capacity,
                })
            }
        }
    }

    /// Enqueue a hostcall completion on the lane chosen by hashing `call_id`.
    pub fn enqueue_hostcall_complete(
        &mut self,
        call_id: String,
        outcome: HostcallOutcome,
    ) -> Result<ReactorEnvelope, ReactorBackpressure> {
        let shard_id = self.hash_route(&call_id);
        self.enqueue_with_route(
            shard_id,
            MacrotaskKind::HostcallComplete { call_id, outcome },
        )
    }

    /// Enqueue an inbound event on the next round-robin lane.
    pub fn enqueue_event(
        &mut self,
        event_id: String,
        payload: serde_json::Value,
    ) -> Result<ReactorEnvelope, ReactorBackpressure> {
        let shard_id = self.rr_route();
        self.enqueue_with_route(shard_id, MacrotaskKind::InboundEvent { event_id, payload })
    }

    /// Drain up to `budget` envelopes from one lane, in shard order.
    pub fn drain_shard(&mut self, shard_id: usize, budget: usize) -> Vec<ReactorEnvelope> {
        let Some(lane) = self.lanes.get_mut(shard_id) else {
            return Vec::new();
        };
        let mut drained = Vec::with_capacity(budget.min(lane.len()));
        for _ in 0..budget {
            let Some(item) = lane.pop() else {
                break;
            };
            drained.push(item);
        }
        drained
    }

    /// Drain up to `budget` envelopes across all lanes in global sequence order
    /// (an N-way merge on each lane's front element).
    pub fn drain_global_order(&mut self, budget: usize) -> Vec<ReactorEnvelope> {
        let mut drained = Vec::with_capacity(budget);
        for _ in 0..budget {
            let mut best_lane: Option<usize> = None;
            let mut best_seq: Option<Seq> = None;
            for (idx, lane) in self.lanes.iter().enumerate() {
                let Some(front) = lane.front() else {
                    continue;
                };
                if best_seq.is_none_or(|seq| front.global_seq < seq) {
                    best_seq = Some(front.global_seq);
                    best_lane = Some(idx);
                }
            }
            let Some(chosen_lane) = best_lane else {
                break;
            };
            if let Some(item) = self.lanes[chosen_lane].pop() {
                drained.push(item);
            }
        }
        drained
    }
}

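// A minimal mesh sketch (illustrative only; `demo_mesh_round_trip` is our
// name): a hash-routed hostcall completion and a round-robin event land on
// lanes, then drain back in global enqueue order.
#[allow(dead_code)]
fn demo_mesh_round_trip() {
    let mut mesh = ReactorMesh::new(ReactorMeshConfig::default());
    let _ = mesh.enqueue_event("evt-1".to_string(), serde_json::json!({}));
    let _ = mesh.enqueue_hostcall_complete(
        "call-1".to_string(),
        HostcallOutcome::Success(serde_json::json!(1)),
    );
    let drained = mesh.drain_global_order(16);
    assert_eq!(drained.len(), 2);
    assert!(drained[0].global_seq < drained[1].global_seq);
}
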
/// Hugepage backing preferences for slab allocations.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct HugepageConfig {
    /// Hugepage size in bytes.
    pub page_size_bytes: usize,
    /// Whether hugepage backing is requested at all.
    pub enabled: bool,
}

impl Default for HugepageConfig {
    fn default() -> Self {
        Self {
            page_size_bytes: 2 * 1024 * 1024, // 2 MiB
            enabled: true,
        }
    }
}

/// Why hugepage backing is not active.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HugepageFallbackReason {
    /// Hugepages were disabled in configuration.
    Disabled,
    /// The host exposed no hugepage counters at all.
    DetectionUnavailable,
    /// The host has hugepages, but none are currently free.
    InsufficientHugepages,
    /// The slab footprint is not a multiple of the hugepage size.
    AlignmentMismatch,
}

impl HugepageFallbackReason {
    #[must_use]
    pub const fn as_code(self) -> &'static str {
        match self {
            Self::Disabled => "hugepage_disabled",
            Self::DetectionUnavailable => "detection_unavailable",
            Self::InsufficientHugepages => "insufficient_hugepages",
            Self::AlignmentMismatch => "alignment_mismatch",
        }
    }
}

/// Observed hugepage availability and whether backing is active.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct HugepageStatus {
    /// Total hugepages reported by the host.
    pub total_pages: u64,
    /// Free hugepages reported by the host.
    pub free_pages: u64,
    /// Page size the counters refer to.
    pub page_size_bytes: usize,
    /// `true` when allocations are considered hugepage-backed.
    pub active: bool,
    /// Why backing is inactive, if it is.
    pub fallback_reason: Option<HugepageFallbackReason>,
}

impl HugepageStatus {
    /// Derive a status from the configuration and host-reported page counters.
    #[must_use]
    pub const fn evaluate(config: &HugepageConfig, total: u64, free: u64) -> Self {
        if !config.enabled {
            return Self {
                total_pages: total,
                free_pages: free,
                page_size_bytes: config.page_size_bytes,
                active: false,
                fallback_reason: Some(HugepageFallbackReason::Disabled),
            };
        }
        if total == 0 && free == 0 {
            return Self {
                total_pages: 0,
                free_pages: 0,
                page_size_bytes: config.page_size_bytes,
                active: false,
                fallback_reason: Some(HugepageFallbackReason::DetectionUnavailable),
            };
        }
        if free == 0 {
            return Self {
                total_pages: total,
                free_pages: 0,
                page_size_bytes: config.page_size_bytes,
                active: false,
                fallback_reason: Some(HugepageFallbackReason::InsufficientHugepages),
            };
        }
        Self {
            total_pages: total,
            free_pages: free,
            page_size_bytes: config.page_size_bytes,
            active: true,
            fallback_reason: None,
        }
    }

    /// Serialize the status for logs and telemetry sinks.
    #[must_use]
    pub fn as_json(&self) -> serde_json::Value {
        serde_json::json!({
            "total_pages": self.total_pages,
            "free_pages": self.free_pages,
            "page_size_bytes": self.page_size_bytes,
            "active": self.active,
            "fallback_reason": self.fallback_reason.map(HugepageFallbackReason::as_code),
        })
    }
}

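// A minimal sketch (illustrative only; `demo_hugepage_status_ladder` is our
// name) of the evaluation ladder above: disabled wins, then missing counters,
// then zero free pages; otherwise backing is active.
#[allow(dead_code)]
fn demo_hugepage_status_ladder() {
    let config = HugepageConfig::default();
    assert!(HugepageStatus::evaluate(&config, 1024, 512).active);
    assert_eq!(
        HugepageStatus::evaluate(&config, 0, 0).fallback_reason,
        Some(HugepageFallbackReason::DetectionUnavailable)
    );
    assert_eq!(
        HugepageStatus::evaluate(&config, 1024, 0).fallback_reason,
        Some(HugepageFallbackReason::InsufficientHugepages)
    );
}
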
/// Sizing and hugepage preferences for per-node slabs.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct NumaSlabConfig {
    /// Number of slots per node slab.
    pub slab_capacity: usize,
    /// Size of each slot in bytes.
    pub entry_size_bytes: usize,
    /// Hugepage backing preferences.
    pub hugepage: HugepageConfig,
}

impl Default for NumaSlabConfig {
    fn default() -> Self {
        Self {
            slab_capacity: 256,
            entry_size_bytes: 512,
            hugepage: HugepageConfig::default(),
        }
    }
}

impl NumaSlabConfig {
    /// Total bytes one slab occupies, or `None` on overflow.
    #[must_use]
    pub const fn slab_footprint_bytes(&self) -> Option<usize> {
        self.slab_capacity.checked_mul(self.entry_size_bytes)
    }

    /// `true` when the slab footprint is a non-zero multiple of the hugepage
    /// size (trivially `true` when hugepages are disabled).
    #[must_use]
    pub const fn hugepage_alignment_ok(&self) -> bool {
        if !self.hugepage.enabled {
            return true;
        }
        let page = self.hugepage.page_size_bytes;
        if page == 0 {
            return false;
        }
        match self.slab_footprint_bytes() {
            Some(bytes) => bytes != 0 && bytes % page == 0,
            None => false,
        }
    }

    /// The status to report when the footprint cannot align to hugepages.
    #[must_use]
    pub const fn alignment_mismatch_status(&self) -> HugepageStatus {
        HugepageStatus {
            total_pages: 0,
            free_pages: 0,
            page_size_bytes: self.hugepage.page_size_bytes,
            active: false,
            fallback_reason: Some(HugepageFallbackReason::AlignmentMismatch),
        }
    }
}

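// A minimal alignment sketch (illustrative only; `demo_hugepage_alignment` is
// our name): 4096 entries of 512 bytes is exactly one default 2 MiB hugepage,
// so the footprint passes the alignment check.
#[allow(dead_code)]
fn demo_hugepage_alignment() {
    let config = NumaSlabConfig {
        slab_capacity: 4096,
        entry_size_bytes: 512,
        hugepage: HugepageConfig::default(),
    };
    assert_eq!(config.slab_footprint_bytes(), Some(2 * 1024 * 1024));
    assert!(config.hugepage_alignment_ok());
}
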
/// A generation-tagged handle to one slot in a node's slab.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct NumaSlabHandle {
    /// NUMA node owning the slot.
    pub node_id: usize,
    /// Slot index within the slab.
    pub slot_index: usize,
    /// Generation at allocation time; stale handles are rejected on free.
    pub generation: u64,
}

/// A fixed-capacity slab of slots for one NUMA node, with a free list and
/// per-slot generation counters to catch double frees and stale handles.
#[derive(Debug, Clone)]
pub struct NumaSlab {
    node_id: usize,
    capacity: usize,
    generations: Vec<u64>,
    allocated: Vec<bool>,
    free_list: Vec<usize>,
    total_allocs: u64,
    total_frees: u64,
    high_water_mark: usize,
}

impl NumaSlab {
    /// Build a slab with at least one slot; lower indices are handed out first.
    #[must_use]
    pub fn new(node_id: usize, capacity: usize) -> Self {
        let capacity = capacity.max(1);
        let mut free_list = Vec::with_capacity(capacity);
        for idx in (0..capacity).rev() {
            free_list.push(idx);
        }
        Self {
            node_id,
            capacity,
            generations: vec![0; capacity],
            allocated: vec![false; capacity],
            free_list,
            total_allocs: 0,
            total_frees: 0,
            high_water_mark: 0,
        }
    }

    /// Number of slots currently allocated.
    #[must_use]
    pub fn in_use(&self) -> usize {
        self.capacity.saturating_sub(self.free_list.len())
    }

    /// `true` while at least one slot is free.
    #[must_use]
    pub fn has_capacity(&self) -> bool {
        !self.free_list.is_empty()
    }

    /// Take a free slot, bumping its generation; `None` when the slab is full.
    pub fn allocate(&mut self) -> Option<NumaSlabHandle> {
        let slot_index = self.free_list.pop()?;
        self.allocated[slot_index] = true;
        self.generations[slot_index] = self.generations[slot_index].saturating_add(1);
        self.total_allocs = self.total_allocs.saturating_add(1);
        self.high_water_mark = self.high_water_mark.max(self.in_use());
        Some(NumaSlabHandle {
            node_id: self.node_id,
            slot_index,
            generation: self.generations[slot_index],
        })
    }

    /// Release a slot. Returns `false` for wrong-node, out-of-range,
    /// unallocated, or stale-generation handles, leaving the slab untouched.
    pub fn deallocate(&mut self, handle: &NumaSlabHandle) -> bool {
        if handle.node_id != self.node_id {
            return false;
        }
        if handle.slot_index >= self.capacity {
            return false;
        }
        if !self.allocated[handle.slot_index] {
            return false;
        }
        if self.generations[handle.slot_index] != handle.generation {
            return false;
        }
        self.allocated[handle.slot_index] = false;
        self.free_list.push(handle.slot_index);
        self.total_frees = self.total_frees.saturating_add(1);
        true
    }
}

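// A minimal sketch (illustrative only; `demo_slab_generation_guard` is our
// name) of the generation check: once freed, a handle cannot free the slot
// again, even after the slot has been recycled.
#[allow(dead_code)]
fn demo_slab_generation_guard() {
    let mut slab = NumaSlab::new(0, 1);
    let first = slab.allocate().expect("slot available");
    assert!(slab.deallocate(&first));
    let second = slab.allocate().expect("slot recycled");
    assert!(!slab.deallocate(&first), "stale generation must be rejected");
    assert!(slab.deallocate(&second));
}
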
/// Why an allocation landed on a node other than the preferred one.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CrossNodeReason {
    /// The preferred node's slab had no free slots.
    LocalExhausted,
}

impl CrossNodeReason {
    #[must_use]
    pub const fn as_code(self) -> &'static str {
        match self {
            Self::LocalExhausted => "local_exhausted",
        }
    }
}

/// One slab per NUMA node, with local-first allocation and cross-node spill.
#[derive(Debug, Clone)]
pub struct NumaSlabPool {
    slabs: Vec<NumaSlab>,
    config: NumaSlabConfig,
    hugepage_status: HugepageStatus,
    cross_node_allocs: u64,
    hugepage_backed_allocs: u64,
}

impl NumaSlabPool {
    /// Build one slab per distinct NUMA node in the manifest (node 0 when the
    /// manifest has no bindings).
    #[must_use]
    pub fn from_manifest(manifest: &ReactorPlacementManifest, config: NumaSlabConfig) -> Self {
        let mut node_ids = manifest
            .bindings
            .iter()
            .map(|b| b.numa_node)
            .collect::<Vec<_>>();
        node_ids.sort_unstable();
        node_ids.dedup();

        if node_ids.is_empty() {
            node_ids.push(0);
        }

        let slabs = node_ids
            .iter()
            .map(|&node_id| NumaSlab::new(node_id, config.slab_capacity))
            .collect();

        let hugepage_status = if config.hugepage.enabled && !config.hugepage_alignment_ok() {
            config.alignment_mismatch_status()
        } else {
            HugepageStatus::evaluate(&config.hugepage, 0, 0)
        };

        Self {
            slabs,
            config,
            hugepage_status,
            cross_node_allocs: 0,
            hugepage_backed_allocs: 0,
        }
    }

    /// Record host-reported hugepage counters, preserving disabled and
    /// misaligned fallbacks over the raw status.
    pub const fn set_hugepage_status(&mut self, status: HugepageStatus) {
        self.hugepage_status = if !self.config.hugepage.enabled {
            HugepageStatus::evaluate(&self.config.hugepage, status.total_pages, status.free_pages)
        } else if !self.config.hugepage_alignment_ok() {
            self.config.alignment_mismatch_status()
        } else {
            status
        };
    }

    /// Number of per-node slabs.
    #[must_use]
    pub fn node_count(&self) -> usize {
        self.slabs.len()
    }

    /// Allocate from `preferred_node` if possible, otherwise spill to any other
    /// node and report `CrossNodeReason::LocalExhausted`. Returns `None` when
    /// every slab is full.
    pub fn allocate(
        &mut self,
        preferred_node: usize,
    ) -> Option<(NumaSlabHandle, Option<CrossNodeReason>)> {
        if let Some(slab) = self.slabs.iter_mut().find(|s| s.node_id == preferred_node) {
            if let Some(handle) = slab.allocate() {
                if self.hugepage_status.active {
                    self.hugepage_backed_allocs = self.hugepage_backed_allocs.saturating_add(1);
                }
                return Some((handle, None));
            }
        }
        for slab in &mut self.slabs {
            if slab.node_id == preferred_node {
                continue;
            }
            if let Some(handle) = slab.allocate() {
                self.cross_node_allocs = self.cross_node_allocs.saturating_add(1);
                if self.hugepage_status.active {
                    self.hugepage_backed_allocs = self.hugepage_backed_allocs.saturating_add(1);
                }
                return Some((handle, Some(CrossNodeReason::LocalExhausted)));
            }
        }
        None
    }

    /// Release a handle on its owning node's slab.
    pub fn deallocate(&mut self, handle: &NumaSlabHandle) -> bool {
        for slab in &mut self.slabs {
            if slab.node_id == handle.node_id {
                return slab.deallocate(handle);
            }
        }
        false
    }

    /// Snapshot per-node and pool-wide allocation counters.
    #[must_use]
    pub fn telemetry(&self) -> NumaSlabTelemetry {
        let per_node = self
            .slabs
            .iter()
            .map(|slab| NumaSlabNodeTelemetry {
                node_id: slab.node_id,
                capacity: slab.capacity,
                in_use: slab.in_use(),
                total_allocs: slab.total_allocs,
                total_frees: slab.total_frees,
                high_water_mark: slab.high_water_mark,
            })
            .collect();
        NumaSlabTelemetry {
            per_node,
            cross_node_allocs: self.cross_node_allocs,
            hugepage_backed_allocs: self.hugepage_backed_allocs,
            hugepage_status: self.hugepage_status,
            config: self.config,
        }
    }
}

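// A minimal pool sketch (illustrative only; `demo_cross_node_fallback` is our
// name): with single-slot slabs on two nodes, the second allocation preferring
// node 0 spills to node 1 and reports `LocalExhausted`.
#[allow(dead_code)]
fn demo_cross_node_fallback() {
    let topo = ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0), (1, 1)]);
    let manifest = ReactorPlacementManifest::plan(2, Some(&topo));
    let config = NumaSlabConfig {
        slab_capacity: 1,
        ..NumaSlabConfig::default()
    };
    let mut pool = NumaSlabPool::from_manifest(&manifest, config);
    let (_local, reason) = pool.allocate(0).expect("local slot");
    assert!(reason.is_none());
    let (remote, reason) = pool.allocate(0).expect("remote spill");
    assert_eq!(reason, Some(CrossNodeReason::LocalExhausted));
    assert_eq!(remote.node_id, 1);
}
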
/// Allocation counters for one node's slab.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct NumaSlabNodeTelemetry {
    pub node_id: usize,
    pub capacity: usize,
    pub in_use: usize,
    pub total_allocs: u64,
    pub total_frees: u64,
    pub high_water_mark: usize,
}

/// Pool-wide allocation telemetry, including hugepage status and configuration.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NumaSlabTelemetry {
    pub per_node: Vec<NumaSlabNodeTelemetry>,
    pub cross_node_allocs: u64,
    pub hugepage_backed_allocs: u64,
    pub hugepage_status: HugepageStatus,
    pub config: NumaSlabConfig,
}

impl NumaSlabTelemetry {
    /// All ratios are reported in basis points (1/100 of a percent).
    const RATIO_SCALE_BPS: u64 = 10_000;

    #[must_use]
    fn ratio_basis_points(numerator: u64, denominator: u64) -> u64 {
        if denominator == 0 {
            return 0;
        }
        let scaled =
            (u128::from(numerator) * u128::from(Self::RATIO_SCALE_BPS)) / u128::from(denominator);
        u64::try_from(scaled).unwrap_or(Self::RATIO_SCALE_BPS)
    }

    /// Bucket a basis-point value into "low" (< 25%), "medium", or "high" (>= 75%).
    #[must_use]
    const fn pressure_band(value_bps: u64) -> &'static str {
        if value_bps >= 7_500 {
            "high"
        } else if value_bps >= 2_500 {
            "medium"
        } else {
            "low"
        }
    }

    /// Serialize the telemetry, deriving ratios, pressure proxies, and bands.
    #[must_use]
    pub fn as_json(&self) -> serde_json::Value {
        let total_allocs: u64 = self.per_node.iter().map(|n| n.total_allocs).sum();
        let total_frees: u64 = self.per_node.iter().map(|n| n.total_frees).sum();
        let total_in_use: usize = self.per_node.iter().map(|n| n.in_use).sum();
        let total_capacity: usize = self.per_node.iter().map(|n| n.capacity).sum();
        let total_high_water: usize = self.per_node.iter().map(|n| n.high_water_mark).sum();
        let remote_allocs = self.cross_node_allocs.min(total_allocs);
        let local_allocs = total_allocs.saturating_sub(remote_allocs);
        let local_ratio_bps = Self::ratio_basis_points(local_allocs, total_allocs);
        let remote_ratio_bps = Self::ratio_basis_points(remote_allocs, total_allocs);
        let hugepage_backed_allocs = self.hugepage_backed_allocs.min(total_allocs);
        let hugepage_hit_rate_bps = Self::ratio_basis_points(hugepage_backed_allocs, total_allocs);
        let total_capacity_u64 = u64::try_from(total_capacity).unwrap_or(u64::MAX);
        let total_in_use_u64 = u64::try_from(total_in_use).unwrap_or(u64::MAX);
        let total_high_water_u64 = u64::try_from(total_high_water).unwrap_or(u64::MAX);
        let occupancy_pressure_bps = Self::ratio_basis_points(total_in_use_u64, total_capacity_u64);
        let cache_miss_pressure_bps =
            Self::ratio_basis_points(total_high_water_u64, total_capacity_u64);
        // The remote-allocation share stands in as a proxy for TLB-miss pressure.
        let tlb_miss_pressure_bps = remote_ratio_bps;
        let cross_node_fallback_reason = if self.cross_node_allocs > 0 {
            Some(CrossNodeReason::LocalExhausted.as_code())
        } else {
            None
        };
        serde_json::json!({
            "node_count": self.per_node.len(),
            "total_allocs": total_allocs,
            "total_frees": total_frees,
            "total_in_use": total_in_use,
            "cross_node_allocs": self.cross_node_allocs,
            "hugepage_backed_allocs": hugepage_backed_allocs,
            "local_allocs": local_allocs,
            "remote_allocs": remote_allocs,
            "allocation_ratio_bps": {
                "scale": Self::RATIO_SCALE_BPS,
                "local": local_ratio_bps,
                "remote": remote_ratio_bps,
            },
            "hugepage_hit_rate_bps": {
                "scale": Self::RATIO_SCALE_BPS,
                "value": hugepage_hit_rate_bps,
            },
            "latency_proxies_bps": {
                "scale": Self::RATIO_SCALE_BPS,
                "tlb_miss_pressure": tlb_miss_pressure_bps,
                "cache_miss_pressure": cache_miss_pressure_bps,
                "occupancy_pressure": occupancy_pressure_bps,
            },
            "pressure_bands": {
                "tlb_miss": Self::pressure_band(tlb_miss_pressure_bps),
                "cache_miss": Self::pressure_band(cache_miss_pressure_bps),
                "occupancy": Self::pressure_band(occupancy_pressure_bps),
            },
            "fallback_reasons": {
                "cross_node": cross_node_fallback_reason,
                "hugepage": self.hugepage_status.fallback_reason.map(HugepageFallbackReason::as_code),
            },
            "config": {
                "slab_capacity": self.config.slab_capacity,
                "entry_size_bytes": self.config.entry_size_bytes,
                "hugepage_enabled": self.config.hugepage.enabled,
                "hugepage_page_size_bytes": self.config.hugepage.page_size_bytes,
            },
            "hugepage": self.hugepage_status.as_json(),
            "per_node": self.per_node.iter().map(|n| serde_json::json!({
                "node_id": n.node_id,
                "capacity": n.capacity,
                "in_use": n.in_use,
                "total_allocs": n.total_allocs,
                "total_frees": n.total_frees,
                "high_water_mark": n.high_water_mark,
            })).collect::<Vec<_>>(),
        })
    }
}

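// A minimal telemetry sketch (illustrative only; `demo_telemetry_bands` is our
// name): a fresh pool reports zero ratios and "low" pressure bands.
#[allow(dead_code)]
fn demo_telemetry_bands() {
    let manifest = ReactorPlacementManifest::plan(1, None);
    let pool = NumaSlabPool::from_manifest(&manifest, NumaSlabConfig::default());
    let json = pool.telemetry().as_json();
    assert_eq!(json["pressure_bands"]["occupancy"], "low");
    assert_eq!(json["allocation_ratio_bps"]["local"], 0);
}
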
/// How strongly affinity advice should be applied by the host.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AffinityEnforcement {
    Advisory,
    Strict,
    Disabled,
}

impl AffinityEnforcement {
    #[must_use]
    pub const fn as_code(self) -> &'static str {
        match self {
            Self::Advisory => "advisory",
            Self::Strict => "strict",
            Self::Disabled => "disabled",
        }
    }
}

/// Per-shard pinning advice derived from the placement manifest.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ThreadAffinityAdvice {
    pub shard_id: usize,
    pub recommended_core: usize,
    pub recommended_numa_node: usize,
    pub enforcement: AffinityEnforcement,
}

impl ThreadAffinityAdvice {
    /// Serialize the advice for logs and external pinning agents.
    #[must_use]
    pub fn as_json(&self) -> serde_json::Value {
        serde_json::json!({
            "shard_id": self.shard_id,
            "recommended_core": self.recommended_core,
            "recommended_numa_node": self.recommended_numa_node,
            "enforcement": self.enforcement.as_code(),
        })
    }
}

impl ReactorPlacementManifest {
    /// Derive one piece of affinity advice per shard binding.
    #[must_use]
    pub fn affinity_advice(&self, enforcement: AffinityEnforcement) -> Vec<ThreadAffinityAdvice> {
        self.bindings
            .iter()
            .map(|binding| ThreadAffinityAdvice {
                shard_id: binding.shard_id,
                recommended_core: binding.core_id,
                recommended_numa_node: binding.numa_node,
                enforcement,
            })
            .collect()
    }

    /// The NUMA node a shard is bound to, if it exists in the manifest.
    #[must_use]
    pub fn numa_node_for_shard(&self, shard_id: usize) -> Option<usize> {
        self.bindings
            .iter()
            .find(|b| b.shard_id == shard_id)
            .map(|b| b.numa_node)
    }
}

impl ReactorMesh {
    /// Preferred NUMA node for a shard (node 0 for unknown shards).
    #[must_use]
    pub fn preferred_numa_node(&self, shard_id: usize) -> usize {
        self.placement_manifest
            .numa_node_for_shard(shard_id)
            .unwrap_or(0)
    }

    /// Affinity advice for every shard in this mesh.
    #[must_use]
    pub fn affinity_advice(&self, enforcement: AffinityEnforcement) -> Vec<ThreadAffinityAdvice> {
        self.placement_manifest.affinity_advice(enforcement)
    }
}

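// A minimal affinity sketch (illustrative only; `demo_affinity_advice` is our
// name): advisory pinning hints derived from the mesh's placement manifest,
// serialized for an external pinning agent.
#[allow(dead_code)]
fn demo_affinity_advice() {
    let mesh = ReactorMesh::new(ReactorMeshConfig::default());
    for advice in mesh.affinity_advice(AffinityEnforcement::Advisory) {
        assert_eq!(advice.as_json()["enforcement"], "advisory");
    }
}
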
impl<C: Clock> fmt::Debug for Scheduler<C> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Scheduler")
            .field("seq", &self.seq)
            .field("macrotask_count", &self.macrotask_queue.len())
            .field("timer_count", &self.timer_count())
            .field("next_timer_id", &self.next_timer_id)
            .field("cancelled_timers", &self.cancelled_timers.len())
            .finish_non_exhaustive()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn seq_ordering() {
        let a = Seq::zero();
        let b = a.next();
        let c = b.next();

        assert!(a < b);
        assert!(b < c);
        assert_eq!(a.value(), 0);
        assert_eq!(b.value(), 1);
        assert_eq!(c.value(), 2);
    }

    #[test]
    fn seq_next_saturates_at_u64_max() {
        let max = Seq(u64::MAX);
        assert_eq!(max.next(), max);
    }

    #[test]
    fn timer_ordering() {
        let t1 = TimerEntry::new(1, 100, Seq(0));
        let t2 = TimerEntry::new(2, 200, Seq(1));

        // Reversed ordering: the earlier deadline compares greater.
        assert!(t1 > t2);

        let t3 = TimerEntry::new(3, 100, Seq(5));
        let t4 = TimerEntry::new(4, 100, Seq(10));

        // Equal deadlines: the lower seq compares greater (FIFO tie-break).
        assert!(t3 > t4);
    }

    #[test]
    fn deterministic_clock() {
        let clock = DeterministicClock::new(1000);
        assert_eq!(clock.now_ms(), 1000);

        clock.advance(500);
        assert_eq!(clock.now_ms(), 1500);

        clock.set(2000);
        assert_eq!(clock.now_ms(), 2000);
    }

    #[test]
    fn scheduler_basic_timer() {
        let clock = DeterministicClock::new(0);
        let mut sched = Scheduler::with_clock(clock);

        let timer_id = sched.set_timeout(100);
        assert_eq!(timer_id, 1);
        assert_eq!(sched.timer_count(), 1);
        assert!(!sched.macrotask_queue.is_empty() || sched.timer_count() > 0);

        let task = sched.tick();
        assert!(task.is_none());

        sched.clock.advance(150);
        let task = sched.tick();
        assert!(task.is_some());
        match task.unwrap().kind {
            MacrotaskKind::TimerFired { timer_id: id } => assert_eq!(id, timer_id),
            other => unreachable!("Expected TimerFired, got {other:?}"),
        }
    }

    #[test]
    fn scheduler_timer_id_wraps_after_u64_max() {
        let clock = DeterministicClock::new(0);
        let mut sched = Scheduler::with_clock(clock);
        sched.next_timer_id = u64::MAX;

        let first = sched.set_timeout(10);
        let second = sched.set_timeout(20);

        assert_eq!(first, u64::MAX);
        assert_eq!(second, 1);
    }

    #[test]
    fn scheduler_timer_id_wrap_preserves_cancellation_semantics() {
        let clock = DeterministicClock::new(0);
        let mut sched = Scheduler::with_clock(clock);
        sched.next_timer_id = u64::MAX;

        let max_id = sched.set_timeout(10);
        let wrapped_id = sched.set_timeout(20);

        assert_eq!(max_id, u64::MAX);
        assert_eq!(wrapped_id, 1);
        assert!(sched.clear_timeout(max_id));
        assert!(sched.clear_timeout(wrapped_id));

        sched.clock.advance(25);
        assert!(sched.tick().is_none());
        assert!(sched.tick().is_none());
    }

    #[test]
    fn scheduler_timer_ordering() {
        let clock = DeterministicClock::new(0);
        let mut sched = Scheduler::with_clock(clock);

        let t3 = sched.set_timeout(300);
        let t1 = sched.set_timeout(100);
        let t2 = sched.set_timeout(200);

        sched.clock.advance(400);

        let task1 = sched.tick().unwrap();
        let task2 = sched.tick().unwrap();
        let task3 = sched.tick().unwrap();

        match task1.kind {
            MacrotaskKind::TimerFired { timer_id } => assert_eq!(timer_id, t1),
            other => unreachable!("Expected t1, got {other:?}"),
        }
        match task2.kind {
            MacrotaskKind::TimerFired { timer_id } => assert_eq!(timer_id, t2),
            other => unreachable!("Expected t2, got {other:?}"),
        }
        match task3.kind {
            MacrotaskKind::TimerFired { timer_id } => assert_eq!(timer_id, t3),
            other => unreachable!("Expected t3, got {other:?}"),
        }
    }

    #[test]
    fn scheduler_same_deadline_seq_ordering() {
        let clock = DeterministicClock::new(0);
        let mut sched = Scheduler::with_clock(clock);

        let t1 = sched.set_timeout(100);
        let t2 = sched.set_timeout(100);
        let t3 = sched.set_timeout(100);

        sched.clock.advance(150);

        let task1 = sched.tick().unwrap();
        let task2 = sched.tick().unwrap();
        let task3 = sched.tick().unwrap();

        match task1.kind {
            MacrotaskKind::TimerFired { timer_id } => assert_eq!(timer_id, t1),
            other => unreachable!("Expected t1, got {other:?}"),
        }
        match task2.kind {
            MacrotaskKind::TimerFired { timer_id } => assert_eq!(timer_id, t2),
            other => unreachable!("Expected t2, got {other:?}"),
        }
        match task3.kind {
            MacrotaskKind::TimerFired { timer_id } => assert_eq!(timer_id, t3),
            other => unreachable!("Expected t3, got {other:?}"),
        }
    }

    #[test]
    fn scheduler_cancel_timer() {
        let clock = DeterministicClock::new(0);
        let mut sched = Scheduler::with_clock(clock);

        let t1 = sched.set_timeout(100);
        let t2 = sched.set_timeout(200);

        assert!(sched.clear_timeout(t1));

        sched.clock.advance(250);

        let task = sched.tick().unwrap();
        match task.kind {
            MacrotaskKind::TimerFired { timer_id } => assert_eq!(timer_id, t2),
            other => unreachable!("Expected t2, got {other:?}"),
        }

        assert!(sched.tick().is_none());
    }

    #[test]
    fn scheduler_hostcall_completion() {
        let clock = DeterministicClock::new(0);
        let mut sched = Scheduler::with_clock(clock);

        sched.enqueue_hostcall_complete(
            "call-1".to_string(),
            HostcallOutcome::Success(serde_json::json!({"result": 42})),
        );

        let task = sched.tick().unwrap();
        match task.kind {
            MacrotaskKind::HostcallComplete { call_id, outcome } => {
                assert_eq!(call_id, "call-1");
                match outcome {
                    HostcallOutcome::Success(v) => assert_eq!(v["result"], 42),
                    other => unreachable!("Expected success, got {other:?}"),
                }
            }
            other => unreachable!("Expected HostcallComplete, got {other:?}"),
        }
    }

    #[test]
    fn scheduler_stream_chunk_sequence_and_finality_invariants() {
        let clock = DeterministicClock::new(0);
        let mut sched = Scheduler::with_clock(clock);

        sched.enqueue_stream_chunk(
            "call-stream".to_string(),
            0,
            serde_json::json!({ "part": "a" }),
            false,
        );
        sched.enqueue_stream_chunk(
            "call-stream".to_string(),
            1,
            serde_json::json!({ "part": "b" }),
            false,
        );
        sched.enqueue_stream_chunk(
            "call-stream".to_string(),
            2,
            serde_json::json!({ "part": "c" }),
            true,
        );

        let mut seen = Vec::new();
        while let Some(task) = sched.tick() {
            let MacrotaskKind::HostcallComplete { call_id, outcome } = task.kind else {
                unreachable!("expected hostcall completion task");
            };
            let HostcallOutcome::StreamChunk {
                sequence,
                chunk,
                is_final,
            } = outcome
            else {
                unreachable!("expected stream chunk outcome");
            };
            seen.push((call_id, sequence, chunk, is_final));
        }

        assert_eq!(seen.len(), 3);
        assert!(
            seen.iter()
                .all(|(call_id, _, _, _)| call_id == "call-stream")
        );
        assert_eq!(seen[0].1, 0);
        assert_eq!(seen[1].1, 1);
        assert_eq!(seen[2].1, 2);
        assert_eq!(seen[0].2, serde_json::json!({ "part": "a" }));
        assert_eq!(seen[1].2, serde_json::json!({ "part": "b" }));
        assert_eq!(seen[2].2, serde_json::json!({ "part": "c" }));

        let final_count = seen.iter().filter(|(_, _, _, is_final)| *is_final).count();
        assert_eq!(final_count, 1, "expected exactly one final chunk");
        assert!(seen[2].3, "final chunk must be last");
    }

    #[test]
    fn scheduler_stream_chunks_multi_call_interleaving_is_deterministic() {
        let clock = DeterministicClock::new(0);
        let mut sched = Scheduler::with_clock(clock);

        sched.enqueue_stream_chunk("call-a".to_string(), 0, serde_json::json!("a0"), false);
        sched.enqueue_stream_chunk("call-b".to_string(), 0, serde_json::json!("b0"), false);
        sched.enqueue_stream_chunk("call-a".to_string(), 1, serde_json::json!("a1"), true);
        sched.enqueue_stream_chunk("call-b".to_string(), 1, serde_json::json!("b1"), true);

        let mut trace = Vec::new();
        while let Some(task) = sched.tick() {
            let MacrotaskKind::HostcallComplete { call_id, outcome } = task.kind else {
                unreachable!("expected hostcall completion task");
            };
            let HostcallOutcome::StreamChunk {
                sequence, is_final, ..
            } = outcome
            else {
                unreachable!("expected stream chunk outcome");
            };
            trace.push((call_id, sequence, is_final));
        }

        assert_eq!(
            trace,
            vec![
                ("call-a".to_string(), 0, false),
                ("call-b".to_string(), 0, false),
                ("call-a".to_string(), 1, true),
                ("call-b".to_string(), 1, true),
            ]
        );
    }

    #[test]
    fn scheduler_event_ordering() {
        let clock = DeterministicClock::new(0);
        let mut sched = Scheduler::with_clock(clock);

        sched.enqueue_event("evt-1".to_string(), serde_json::json!({"n": 1}));
        sched.enqueue_event("evt-2".to_string(), serde_json::json!({"n": 2}));

        let task1 = sched.tick().unwrap();
        let task2 = sched.tick().unwrap();

        match task1.kind {
            MacrotaskKind::InboundEvent { event_id, .. } => assert_eq!(event_id, "evt-1"),
            other => unreachable!("Expected evt-1, got {other:?}"),
        }
        match task2.kind {
            MacrotaskKind::InboundEvent { event_id, .. } => assert_eq!(event_id, "evt-2"),
            other => unreachable!("Expected evt-2, got {other:?}"),
        }
    }

    #[test]
    fn scheduler_mixed_tasks_ordering() {
        let clock = DeterministicClock::new(0);
        let mut sched = Scheduler::with_clock(clock);

        let _t1 = sched.set_timeout(50);

        sched.enqueue_event("evt-1".to_string(), serde_json::json!({}));

        sched.clock.advance(100);

        let task1 = sched.tick().unwrap();
        match task1.kind {
            MacrotaskKind::InboundEvent { event_id, .. } => assert_eq!(event_id, "evt-1"),
            other => unreachable!("Expected event first, got {other:?}"),
        }

        let task2 = sched.tick().unwrap();
        match task2.kind {
            MacrotaskKind::TimerFired { .. } => {}
            other => unreachable!("Expected timer second, got {other:?}"),
        }
    }

    #[test]
    fn scheduler_invariant_single_macrotask_per_tick() {
        let clock = DeterministicClock::new(0);
        let mut sched = Scheduler::with_clock(clock);

        sched.enqueue_event("evt-1".to_string(), serde_json::json!({}));
        sched.enqueue_event("evt-2".to_string(), serde_json::json!({}));
        sched.enqueue_event("evt-3".to_string(), serde_json::json!({}));

        assert!(sched.tick().is_some());
        assert_eq!(sched.macrotask_count(), 2);

        assert!(sched.tick().is_some());
        assert_eq!(sched.macrotask_count(), 1);

        assert!(sched.tick().is_some());
        assert_eq!(sched.macrotask_count(), 0);

        assert!(sched.tick().is_none());
    }

    #[test]
    fn scheduler_next_timer_deadline() {
        let clock = DeterministicClock::new(0);
        let mut sched = Scheduler::with_clock(clock);

        assert!(sched.next_timer_deadline().is_none());

        sched.set_timeout(200);
        sched.set_timeout(100);
        sched.set_timeout(300);

        assert_eq!(sched.next_timer_deadline(), Some(100));
        assert_eq!(sched.time_until_next_timer(), Some(100));

        sched.clock.advance(50);
        assert_eq!(sched.time_until_next_timer(), Some(50));
    }

    #[test]
    fn scheduler_next_timer_skips_cancelled_timers() {
        let clock = DeterministicClock::new(0);
        let mut sched = Scheduler::with_clock(clock);

        let t1 = sched.set_timeout(100);
        let _t2 = sched.set_timeout(200);
        let _t3 = sched.set_timeout(300);

        assert!(sched.clear_timeout(t1));
        assert_eq!(sched.next_timer_deadline(), Some(200));
        assert_eq!(sched.time_until_next_timer(), Some(200));
    }

    #[test]
    fn scheduler_debug_format() {
        let clock = DeterministicClock::new(0);
        let sched = Scheduler::with_clock(clock);
        let debug = format!("{sched:?}");
        assert!(debug.contains("Scheduler"));
        assert!(debug.contains("seq"));
    }

    #[test]
    fn scheduler_debug_reports_live_timer_count() {
        let mut sched = Scheduler::with_clock(DeterministicClock::new(0));
        let cancelled = sched.set_timeout(10);
        sched.set_timeout(20);

        assert!(sched.clear_timeout(cancelled));

        let debug = format!("{sched:?}");
        assert!(
            debug.contains("timer_count: 1"),
            "unexpected debug: {debug}"
        );
        assert!(
            debug.contains("cancelled_timers: 1"),
            "unexpected debug: {debug}"
        );
    }

    #[derive(Debug, Clone)]
    struct XorShift64 {
        state: u64,
    }

    impl XorShift64 {
        const fn new(seed: u64) -> Self {
            // Mix the seed so a seed of zero avoids xorshift's all-zero fixed point.
            let seed = seed ^ 0x9E37_79B9_7F4A_7C15;
            Self { state: seed }
        }

        fn next_u64(&mut self) -> u64 {
            let mut x = self.state;
            x ^= x << 13;
            x ^= x >> 7;
            x ^= x << 17;
            self.state = x;
            x
        }

        fn next_range_u64(&mut self, upper_exclusive: u64) -> u64 {
            if upper_exclusive == 0 {
                return 0;
            }
            self.next_u64() % upper_exclusive
        }

        fn next_usize(&mut self, upper_exclusive: usize) -> usize {
            let upper = u64::try_from(upper_exclusive).expect("usize fits in u64");
            let value = self.next_range_u64(upper);
            usize::try_from(value).expect("value < upper_exclusive")
        }
    }

    fn trace_entry(task: &Macrotask) -> String {
        match &task.kind {
            MacrotaskKind::TimerFired { timer_id } => {
                format!("seq={}:timer:{timer_id}", task.seq.value())
            }
            MacrotaskKind::HostcallComplete { call_id, outcome } => {
                let outcome_tag = match outcome {
                    HostcallOutcome::Success(_) => "ok",
                    HostcallOutcome::Error { .. } => "err",
                    HostcallOutcome::StreamChunk { is_final, .. } => {
                        if *is_final {
                            "stream_final"
                        } else {
                            "chunk"
                        }
                    }
                };
                format!("seq={}:hostcall:{call_id}:{outcome_tag}", task.seq.value())
            }
            MacrotaskKind::InboundEvent { event_id, payload } => {
                format!(
                    "seq={}:event:{event_id}:payload={payload}",
                    task.seq.value()
                )
            }
        }
    }

    fn run_seeded_script(seed: u64) -> Vec<String> {
        let clock = DeterministicClock::new(0);
        let mut sched = Scheduler::with_clock(clock);
        let mut rng = XorShift64::new(seed);
        let mut timers = Vec::new();
        let mut trace = Vec::new();

        for step in 0..256u64 {
            match rng.next_range_u64(6) {
                0 => {
                    let delay_ms = rng.next_range_u64(250);
                    let timer_id = sched.set_timeout(delay_ms);
                    timers.push(timer_id);
                }
                1 if !timers.is_empty() => {
                    let idx = rng.next_usize(timers.len());
                    let _cancelled = sched.clear_timeout(timers[idx]);
                }
                2 => {
                    let call_id = format!("call-{step}-{}", rng.next_u64());
                    let outcome = HostcallOutcome::Success(serde_json::json!({ "step": step }));
                    sched.enqueue_hostcall_complete(call_id, outcome);
                }
                3 => {
                    let event_id = format!("evt-{step}");
                    let payload = serde_json::json!({ "step": step, "entropy": rng.next_u64() });
                    sched.enqueue_event(event_id, payload);
                }
                4 => {
                    let delta_ms = rng.next_range_u64(50);
                    sched.clock.advance(delta_ms);
                }
                _ => {}
            }

            if rng.next_range_u64(3) == 0 {
                if let Some(task) = sched.tick() {
                    trace.push(trace_entry(&task));
                }
            }
        }

        // Drain the remaining backlog, jumping virtual time forward to each
        // next timer deadline.
        for _ in 0..10_000 {
            if let Some(task) = sched.tick() {
                trace.push(trace_entry(&task));
                continue;
            }

            let Some(next_deadline) = sched.next_timer_deadline() else {
                break;
            };

            let now = sched.now_ms();
            assert!(
                next_deadline > now,
                "expected future timer deadline (deadline={next_deadline}, now={now})"
            );
            sched.clock.set(next_deadline);
        }

        trace
    }

2334
2335 #[test]
2336 fn scheduler_seeded_trace_is_deterministic() {
2337 for seed in [0_u64, 1, 2, 3, 0xDEAD_BEEF] {
2338 let a = run_seeded_script(seed);
2339 let b = run_seeded_script(seed);
2340 assert_eq!(a, b, "trace mismatch for seed={seed}");
2341 }
2342 }

    #[test]
    fn seq_display_format() {
        assert_eq!(format!("{}", Seq::zero()), "seq:0");
        assert_eq!(format!("{}", Seq::zero().next()), "seq:1");
    }

    #[test]
    fn empty_scheduler_has_no_pending() {
        let sched = Scheduler::with_clock(DeterministicClock::new(0));
        assert!(!sched.has_pending());
        assert_eq!(sched.macrotask_count(), 0);
        assert_eq!(sched.timer_count(), 0);
    }

    #[test]
    fn has_pending_with_timer_only() {
        let mut sched = Scheduler::with_clock(DeterministicClock::new(0));
        sched.set_timeout(100);
        assert!(sched.has_pending());
        assert_eq!(sched.macrotask_count(), 0);
        assert_eq!(sched.timer_count(), 1);
    }

    #[test]
    fn has_pending_with_macrotask_only() {
        let mut sched = Scheduler::with_clock(DeterministicClock::new(0));
        sched.enqueue_event("e".to_string(), serde_json::json!({}));
        assert!(sched.has_pending());
        assert_eq!(sched.macrotask_count(), 1);
        assert_eq!(sched.timer_count(), 0);
    }

    #[test]
    fn has_pending_ignores_cancelled_timers_without_macrotasks() {
        let mut sched = Scheduler::with_clock(DeterministicClock::new(0));
        let timer = sched.set_timeout(10_000);
        assert!(sched.clear_timeout(timer));
        assert!(!sched.has_pending());
        assert_eq!(sched.timer_count(), 0);
    }

    #[test]
    fn timer_count_ignores_cancelled_timers_before_they_are_reaped() {
        let mut sched = Scheduler::with_clock(DeterministicClock::new(0));
        let live = sched.set_timeout(50);
        let cancelled = sched.set_timeout(100);

        assert!(sched.clear_timeout(cancelled));
        assert_eq!(sched.timer_count(), 1);
        assert_eq!(sched.next_timer_deadline(), Some(50));

        sched.clock.advance(60);
        let task = sched.tick().expect("live timer should fire");
        match task.kind {
            MacrotaskKind::TimerFired { timer_id } => assert_eq!(timer_id, live),
            other => unreachable!("Expected live timer, got {other:?}"),
        }
        assert_eq!(sched.timer_count(), 0);
    }

    #[test]
    fn wall_clock_returns_positive_ms() {
        let clock = WallClock;
        let now = clock.now_ms();
        assert!(now > 0, "WallClock should return a positive timestamp");
    }

    #[test]
    fn clear_timeout_nonexistent_returns_false() {
        let mut sched = Scheduler::with_clock(DeterministicClock::new(0));
        assert!(!sched.clear_timeout(999));
        assert!(
            sched.cancelled_timers.is_empty(),
            "unknown timer ids should not pollute cancelled set"
        );
    }

    #[test]
    fn clear_timeout_double_cancel_returns_false() {
        let mut sched = Scheduler::with_clock(DeterministicClock::new(0));
        let t = sched.set_timeout(100);
        assert!(sched.clear_timeout(t));
        assert!(!sched.clear_timeout(t));
    }
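
    // A minimal sketch, assuming cancellation semantics consistent with the
    // tests above: once cleared, a timer must not surface as a `TimerFired`
    // macrotask even after its deadline has passed.
    #[test]
    fn cancelled_timer_never_fires_after_deadline() {
        let mut sched = Scheduler::with_clock(DeterministicClock::new(0));
        let t = sched.set_timeout(10);
        assert!(sched.clear_timeout(t));
        sched.clock.advance(50);
        assert!(sched.tick().is_none(), "cancelled timer must not fire");
    }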

    #[test]
    fn time_until_next_timer_none_when_empty() {
        let sched = Scheduler::with_clock(DeterministicClock::new(0));
        assert!(sched.time_until_next_timer().is_none());
    }

    #[test]
    fn time_until_next_timer_saturates_at_zero() {
        let mut sched = Scheduler::with_clock(DeterministicClock::new(0));
        sched.set_timeout(50);
        sched.clock.advance(100);
        assert_eq!(sched.time_until_next_timer(), Some(0));
    }

    #[test]
    fn hostcall_error_outcome() {
        let mut sched = Scheduler::with_clock(DeterministicClock::new(0));
        sched.enqueue_hostcall_complete(
            "err-call".to_string(),
            HostcallOutcome::Error {
                code: "E_TIMEOUT".to_string(),
                message: "Request timed out".to_string(),
            },
        );

        let task = sched.tick().unwrap();
        match task.kind {
            MacrotaskKind::HostcallComplete { call_id, outcome } => {
                assert_eq!(call_id, "err-call");
                match outcome {
                    HostcallOutcome::Error { code, message } => {
                        assert_eq!(code, "E_TIMEOUT");
                        assert_eq!(message, "Request timed out");
                    }
                    other => unreachable!("Expected error, got {other:?}"),
                }
            }
            other => unreachable!("Expected HostcallComplete, got {other:?}"),
        }
    }

    #[test]
    fn timer_count_decreases_after_fire() {
        let mut sched = Scheduler::with_clock(DeterministicClock::new(0));
        sched.set_timeout(50);
        sched.set_timeout(100);
        assert_eq!(sched.timer_count(), 2);

        sched.clock.advance(75);
        let _task = sched.tick();
        assert_eq!(sched.timer_count(), 1);
    }
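
    // Sketch of the deadline-order guarantee implied by the timer min-heap:
    // with both timers due, the one with the earlier deadline should fire
    // first. Assumes due timers are promoted in deadline order.
    #[test]
    fn timers_fire_in_deadline_order() {
        let mut sched = Scheduler::with_clock(DeterministicClock::new(0));
        let t_late = sched.set_timeout(100);
        let t_early = sched.set_timeout(50);
        sched.clock.advance(150);

        let first = sched.tick().expect("first due timer");
        let second = sched.tick().expect("second due timer");
        match (first.kind, second.kind) {
            (
                MacrotaskKind::TimerFired { timer_id: a },
                MacrotaskKind::TimerFired { timer_id: b },
            ) => {
                assert_eq!(a, t_early, "earlier deadline must fire first");
                assert_eq!(b, t_late);
            }
            other => unreachable!("expected two timer firings, got {other:?}"),
        }
    }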

    #[test]
    fn empty_scheduler_tick_returns_none() {
        let mut sched = Scheduler::with_clock(DeterministicClock::new(0));
        assert!(sched.tick().is_none());
    }

    #[test]
    fn default_scheduler_starts_with_seq_zero() {
        let sched = Scheduler::new();
        assert_eq!(sched.current_seq(), Seq::zero());
    }

    #[test]
    fn arc_clock_delegation() {
        let clock = Arc::new(DeterministicClock::new(42));
        assert_eq!(Clock::now_ms(&clock), 42);
        clock.advance(10);
        assert_eq!(Clock::now_ms(&clock), 52);
    }

    #[test]
    fn timer_entry_equality_ignores_timer_id() {
        let a = TimerEntry::new(1, 100, Seq(5));
        let b = TimerEntry::new(2, 100, Seq(5));
        assert_eq!(a, b);
    }

    #[test]
    fn macrotask_equality_uses_seq_only() {
        let a = Macrotask::new(Seq(1), MacrotaskKind::TimerFired { timer_id: 1 });
        let b = Macrotask::new(Seq(1), MacrotaskKind::TimerFired { timer_id: 2 });
        assert_eq!(a, b);
    }
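
    // Sketch: `Ord` for `Macrotask` compares only `seq`, so sorting should
    // order tasks purely by sequence number regardless of their kind.
    #[test]
    fn macrotask_ordering_sorts_by_seq() {
        let mut tasks = vec![
            Macrotask::new(Seq(3), MacrotaskKind::TimerFired { timer_id: 1 }),
            Macrotask::new(Seq(1), MacrotaskKind::TimerFired { timer_id: 2 }),
            Macrotask::new(Seq(2), MacrotaskKind::TimerFired { timer_id: 3 }),
        ];
        tasks.sort();
        let seqs: Vec<u64> = tasks.iter().map(|t| t.seq.value()).collect();
        assert_eq!(seqs, vec![1, 2, 3]);
    }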

    #[test]
    fn scheduler_ten_concurrent_streams_complete_independently() {
        let clock = DeterministicClock::new(0);
        let mut sched = Scheduler::with_clock(clock);
        let n_streams: usize = 10;
        let chunks_per_stream: usize = 5;

        for chunk_idx in 0..chunks_per_stream {
            // Interleave: every stream receives chunk k before any stream
            // sees chunk k + 1.
            for stream_idx in 0..n_streams {
                let is_final = chunk_idx == chunks_per_stream - 1;
                sched.enqueue_stream_chunk(
                    format!("stream-{stream_idx}"),
                    chunk_idx as u64,
                    serde_json::json!({ "s": stream_idx, "c": chunk_idx }),
                    is_final,
                );
            }
        }

        let mut per_stream: std::collections::HashMap<String, Vec<(u64, bool)>> =
            std::collections::HashMap::new();
        while let Some(task) = sched.tick() {
            let MacrotaskKind::HostcallComplete { call_id, outcome } = task.kind else {
                unreachable!("expected hostcall completion");
            };
            let HostcallOutcome::StreamChunk {
                sequence, is_final, ..
            } = outcome
            else {
                unreachable!("expected stream chunk");
            };
            per_stream
                .entry(call_id)
                .or_default()
                .push((sequence, is_final));
        }

        assert_eq!(per_stream.len(), n_streams);
        for (call_id, chunks) in &per_stream {
            assert_eq!(
                chunks.len(),
                chunks_per_stream,
                "stream {call_id} incomplete"
            );
            // Chunks must arrive in enqueue order within each stream.
            for (i, (seq, _)) in chunks.iter().enumerate() {
                assert_eq!(*seq, i as u64, "stream {call_id}: non-monotonic at {i}");
            }
            // Exactly one final chunk, and it must come last.
            let final_count = chunks.iter().filter(|(_, f)| *f).count();
            assert_eq!(
                final_count, 1,
                "stream {call_id}: expected exactly one final"
            );
            assert!(
                chunks.last().unwrap().1,
                "stream {call_id}: final must be last"
            );
        }
    }

    #[test]
    fn scheduler_mixed_stream_nonstream_ordering() {
        let clock = DeterministicClock::new(0);
        let mut sched = Scheduler::with_clock(clock);

        // Interleave stream chunks with unrelated completions and events.
        sched.enqueue_event("evt-1".to_string(), serde_json::json!({"n": 1}));
        sched.enqueue_stream_chunk("stream-x".to_string(), 0, serde_json::json!("data"), false);
        sched.enqueue_hostcall_complete(
            "call-y".to_string(),
            HostcallOutcome::Success(serde_json::json!({"ok": true})),
        );
        sched.enqueue_stream_chunk("stream-x".to_string(), 1, serde_json::json!("end"), true);
        sched.enqueue_event("evt-2".to_string(), serde_json::json!({"n": 2}));

        let mut trace = Vec::new();
        while let Some(task) = sched.tick() {
            trace.push(trace_entry(&task));
        }

        assert_eq!(trace.len(), 5);
        // Strict FIFO by enqueue order:
        assert!(trace[0].contains("event:evt-1"));
        assert!(trace[1].contains("stream-x") && trace[1].contains("chunk"));
        assert!(trace[2].contains("call-y") && trace[2].contains("ok"));
        assert!(trace[3].contains("stream-x") && trace[3].contains("stream_final"));
        assert!(trace[4].contains("event:evt-2"));
    }

    #[test]
    fn scheduler_concurrent_streams_deterministic_across_runs() {
        fn run_ten_streams() -> Vec<String> {
            let clock = DeterministicClock::new(0);
            let mut sched = Scheduler::with_clock(clock);

            for chunk in 0..3_u64 {
                for stream in 0..10 {
                    sched.enqueue_stream_chunk(
                        format!("s{stream}"),
                        chunk,
                        serde_json::json!(chunk),
                        chunk == 2,
                    );
                }
            }

            let mut trace = Vec::new();
            while let Some(task) = sched.tick() {
                trace.push(trace_entry(&task));
            }
            trace
        }

        let a = run_ten_streams();
        let b = run_ten_streams();
        assert_eq!(a, b, "10-stream trace must be deterministic");
        assert_eq!(a.len(), 30, "expected 10 streams x 3 chunks = 30 entries");
    }

    #[test]
    fn scheduler_stream_interleaved_with_timers() {
        let clock = DeterministicClock::new(0);
        let mut sched = Scheduler::with_clock(clock);

        // Timer due at t=100.
        let _t = sched.set_timeout(100);

        // First chunk arrives before the timer deadline.
        sched.enqueue_stream_chunk("s1".to_string(), 0, serde_json::json!("a"), false);

        // Advance past the deadline.
        sched.clock.advance(150);

        // Final chunk arrives after the timer became due.
        sched.enqueue_stream_chunk("s1".to_string(), 1, serde_json::json!("b"), true);

        let mut trace = Vec::new();
        while let Some(task) = sched.tick() {
            trace.push(trace_entry(&task));
        }

        assert_eq!(trace.len(), 3);
        // Both stream chunks were queued before the timer was promoted, so
        // they run ahead of it:
        assert!(
            trace[0].contains("s1") && trace[0].contains("chunk"),
            "first: stream chunk 0, got: {}",
            trace[0]
        );
        assert!(
            trace[1].contains("s1") && trace[1].contains("stream_final"),
            "second: stream final, got: {}",
            trace[1]
        );
        assert!(
            trace[2].contains("timer"),
            "third: timer, got: {}",
            trace[2]
        );
    }

    #[test]
    fn scheduler_due_timers_do_not_preempt_queued_macrotasks() {
        let clock = DeterministicClock::new(0);
        let mut sched = Scheduler::with_clock(clock);

        // Timer registered first, due at t=100.
        let t1_id = sched.set_timeout(100);

        // Event enqueued after the timer was registered.
        sched.enqueue_event("E1".to_string(), serde_json::json!({}));

        // Make the timer due.
        sched.clock.advance(100);

        let task1 = sched.tick().expect("Should have a task");

        let task2 = sched.tick().expect("Should have a task");

        let seq1 = task1.seq.value();
        let seq2 = task2.seq.value();

        assert!(
            seq1 < seq2,
            "Invariant I5 violation: Task execution not ordered by seq. Executed {seq1} then {seq2}"
        );

        if let MacrotaskKind::InboundEvent { event_id, .. } = task1.kind {
            assert_eq!(event_id, "E1");
        } else {
            unreachable!();
        }

        if let MacrotaskKind::TimerFired { timer_id } = task2.kind {
            assert_eq!(timer_id, t1_id);
        } else {
            unreachable!();
        }
    }

    #[test]
    fn reactor_mesh_hash_routing_is_stable_for_call_id() {
        let mut mesh = ReactorMesh::new(ReactorMeshConfig {
            shard_count: 8,
            lane_capacity: 64,
            topology: None,
        });

        let first = mesh
            .enqueue_hostcall_complete(
                "call-affinity".to_string(),
                HostcallOutcome::Success(serde_json::json!({})),
            )
            .expect("first enqueue");
        let second = mesh
            .enqueue_hostcall_complete(
                "call-affinity".to_string(),
                HostcallOutcome::Success(serde_json::json!({})),
            )
            .expect("second enqueue");

        assert_eq!(
            first.shard_id, second.shard_id,
            "call_id hash routing must preserve shard affinity"
        );
        assert_eq!(first.shard_seq + 1, second.shard_seq);
    }
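
    // Sketch, assuming `hash_route` depends only on the key and the shard
    // count (as the property tests below exercise): two independently built
    // meshes with identical shard counts should agree on routing.
    #[test]
    fn reactor_mesh_hash_routing_agrees_across_instances() {
        let mesh_a = ReactorMesh::new(ReactorMeshConfig {
            shard_count: 8,
            lane_capacity: 4,
            topology: None,
        });
        let mesh_b = ReactorMesh::new(ReactorMeshConfig {
            shard_count: 8,
            lane_capacity: 4,
            topology: None,
        });
        let key = "call-affinity".to_string();
        assert_eq!(mesh_a.hash_route(&key), mesh_b.hash_route(&key));
    }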

    #[test]
    fn reactor_mesh_round_robin_event_distribution_is_deterministic() {
        let mut mesh = ReactorMesh::new(ReactorMeshConfig {
            shard_count: 3,
            lane_capacity: 64,
            topology: None,
        });

        let mut routed = Vec::new();
        for idx in 0..6 {
            let envelope = mesh
                .enqueue_event(format!("evt-{idx}"), serde_json::json!({"i": idx}))
                .expect("enqueue event");
            routed.push(envelope.shard_id);
        }

        assert_eq!(routed, vec![0, 1, 2, 0, 1, 2]);
    }

    #[test]
    fn reactor_mesh_drain_global_order_preserves_monotone_seq() {
        let mut mesh = ReactorMesh::new(ReactorMeshConfig {
            shard_count: 4,
            lane_capacity: 64,
            topology: None,
        });

        let mut expected = Vec::new();
        expected.push(
            mesh.enqueue_event("evt-1".to_string(), serde_json::json!({"v": 1}))
                .expect("event 1")
                .global_seq
                .value(),
        );
        expected.push(
            mesh.enqueue_hostcall_complete(
                "call-a".to_string(),
                HostcallOutcome::Success(serde_json::json!({"ok": true})),
            )
            .expect("call-a")
            .global_seq
            .value(),
        );
        expected.push(
            mesh.enqueue_event("evt-2".to_string(), serde_json::json!({"v": 2}))
                .expect("event 2")
                .global_seq
                .value(),
        );
        expected.push(
            mesh.enqueue_hostcall_complete(
                "call-b".to_string(),
                HostcallOutcome::Error {
                    code: "E_TEST".to_string(),
                    message: "boom".to_string(),
                },
            )
            .expect("call-b")
            .global_seq
            .value(),
        );

        let drained = mesh.drain_global_order(16);
        let observed = drained
            .iter()
            .map(|entry| entry.global_seq.value())
            .collect::<Vec<_>>();
        assert_eq!(observed, expected);
    }

    #[test]
    fn reactor_mesh_backpressure_tracks_rejected_enqueues() {
        let mut mesh = ReactorMesh::new(ReactorMeshConfig {
            shard_count: 1,
            lane_capacity: 2,
            topology: None,
        });

        mesh.enqueue_event("evt-0".to_string(), serde_json::json!({}))
            .expect("enqueue evt-0");
        mesh.enqueue_event("evt-1".to_string(), serde_json::json!({}))
            .expect("enqueue evt-1");

        let err = mesh
            .enqueue_event("evt-overflow".to_string(), serde_json::json!({}))
            .expect_err("third enqueue should overflow");
        assert_eq!(err.shard_id, 0);
        assert_eq!(err.capacity, 2);
        assert_eq!(err.depth, 2);

        let telemetry = mesh.telemetry();
        assert_eq!(telemetry.rejected_enqueues, 1);
        assert_eq!(telemetry.max_queue_depths, vec![2]);
        assert_eq!(telemetry.queue_depths, vec![2]);
    }
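
    // A hedged sketch: this assumes `drain_shard` frees lane capacity, so a
    // full lane accepts new work again after being drained. If draining does
    // not reclaim capacity, this test does not apply.
    #[test]
    fn reactor_mesh_drain_relieves_backpressure() {
        let mut mesh = ReactorMesh::new(ReactorMeshConfig {
            shard_count: 1,
            lane_capacity: 1,
            topology: None,
        });
        mesh.enqueue_event("evt-a".to_string(), serde_json::json!({}))
            .expect("first enqueue fills the lane");
        assert!(mesh
            .enqueue_event("evt-b".to_string(), serde_json::json!({}))
            .is_err());

        let drained = mesh.drain_shard(0, 1);
        assert_eq!(drained.len(), 1);
        mesh.enqueue_event("evt-c".to_string(), serde_json::json!({}))
            .expect("drained lane should accept new work");
    }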

    #[test]
    fn reactor_placement_manifest_is_deterministic_across_runs() {
        let topology =
            ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0), (1, 0), (2, 1), (3, 1)]);
        let first = ReactorPlacementManifest::plan(8, Some(&topology));
        let second = ReactorPlacementManifest::plan(8, Some(&topology));
        assert_eq!(first, second);
        assert_eq!(first.fallback_reason, None);
    }

    #[test]
    fn reactor_topology_snapshot_normalizes_unsorted_duplicate_pairs() {
        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[
            (7, 5),
            (2, 1),
            (4, 2),
            (2, 1),
            (1, 1),
            (4, 2),
        ]);
        let normalized = topology
            .cores
            .iter()
            .map(|core| (core.core_id, core.numa_node))
            .collect::<Vec<_>>();
        assert_eq!(normalized, vec![(1, 1), (2, 1), (4, 2), (7, 5)]);
    }

    #[test]
    fn reactor_placement_manifest_non_contiguous_numa_ids_is_stable() {
        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[
            (11, 42),
            (7, 5),
            (9, 42),
            (3, 5),
            (11, 42),
            (3, 5),
        ]);
        let first = ReactorPlacementManifest::plan(8, Some(&topology));
        let second = ReactorPlacementManifest::plan(8, Some(&topology));
        assert_eq!(first, second);
        assert_eq!(first.numa_node_count, 2);
        assert_eq!(first.fallback_reason, None);

        let observed_nodes = first
            .bindings
            .iter()
            .map(|binding| binding.numa_node)
            .collect::<Vec<_>>();
        assert_eq!(observed_nodes, vec![5, 42, 5, 42, 5, 42, 5, 42]);

        let observed_cores = first
            .bindings
            .iter()
            .map(|binding| binding.core_id)
            .collect::<Vec<_>>();
        assert_eq!(observed_cores, vec![3, 9, 7, 11, 3, 9, 7, 11]);
    }

    #[test]
    fn reactor_placement_manifest_spreads_across_numa_nodes_round_robin() {
        let topology =
            ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0), (1, 0), (4, 1), (5, 1)]);
        let manifest = ReactorPlacementManifest::plan(6, Some(&topology));
        let observed_nodes = manifest
            .bindings
            .iter()
            .map(|binding| binding.numa_node)
            .collect::<Vec<_>>();
        assert_eq!(observed_nodes, vec![0, 1, 0, 1, 0, 1]);

        let observed_cores = manifest
            .bindings
            .iter()
            .map(|binding| binding.core_id)
            .collect::<Vec<_>>();
        assert_eq!(observed_cores, vec![0, 4, 1, 5, 0, 4]);
    }

    #[test]
    fn reactor_placement_manifest_records_fallback_when_topology_missing() {
        let manifest = ReactorPlacementManifest::plan(3, None);
        assert_eq!(
            manifest.fallback_reason,
            Some(ReactorPlacementFallbackReason::TopologyUnavailable)
        );
        assert_eq!(manifest.numa_node_count, 1);
        assert_eq!(manifest.bindings.len(), 3);
        assert_eq!(manifest.bindings[0].core_id, 0);
        assert_eq!(manifest.bindings[2].core_id, 2);
    }

    #[test]
    fn reactor_mesh_exposes_machine_readable_placement_manifest() {
        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(2, 0), (3, 0)]);
        let mesh = ReactorMesh::new(ReactorMeshConfig {
            shard_count: 3,
            lane_capacity: 8,
            topology: Some(topology),
        });
        let manifest = mesh.placement_manifest();
        let as_json = manifest.as_json();
        assert_eq!(as_json["shard_count"], serde_json::json!(3));
        assert_eq!(as_json["numa_node_count"], serde_json::json!(1));
        assert_eq!(
            as_json["fallback_reason"],
            serde_json::json!(Some("single_numa_node"))
        );
        assert_eq!(
            as_json["bindings"].as_array().map(std::vec::Vec::len),
            Some(3),
            "expected per-shard binding rows"
        );
    }

    #[test]
    fn reactor_mesh_telemetry_includes_binding_and_fallback_metadata() {
        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(10, 0), (11, 0)]);
        let mut mesh = ReactorMesh::new(ReactorMeshConfig {
            shard_count: 2,
            lane_capacity: 4,
            topology: Some(topology),
        });
        mesh.enqueue_event("evt-0".to_string(), serde_json::json!({}))
            .expect("enqueue event");

        let telemetry = mesh.telemetry();
        assert_eq!(
            telemetry.fallback_reason,
            Some(ReactorPlacementFallbackReason::SingleNumaNode)
        );
        assert_eq!(telemetry.shard_bindings.len(), 2);
        let telemetry_json = telemetry.as_json();
        assert_eq!(
            telemetry_json["fallback_reason"],
            serde_json::json!(Some("single_numa_node"))
        );
        assert_eq!(
            telemetry_json["shard_bindings"]
                .as_array()
                .map(std::vec::Vec::len),
            Some(2)
        );
    }

    // ----- NUMA slab allocator -----

    #[test]
    fn numa_slab_alloc_dealloc_round_trip() {
        let mut slab = NumaSlab::new(0, 4);
        let handle = slab.allocate().expect("should allocate");
        assert_eq!(handle.node_id, 0);
        assert_eq!(handle.generation, 1);
        assert!(slab.deallocate(&handle));
        assert_eq!(slab.in_use(), 0);
    }

    #[test]
    fn numa_slab_exhaustion_returns_none() {
        let mut slab = NumaSlab::new(0, 2);
        let _a = slab.allocate().expect("first alloc");
        let _b = slab.allocate().expect("second alloc");
        assert!(slab.allocate().is_none(), "slab should be exhausted");
    }

    #[test]
    fn numa_slab_generation_prevents_stale_dealloc() {
        let mut slab = NumaSlab::new(0, 2);
        let handle_v1 = slab.allocate().expect("first alloc");
        assert!(slab.deallocate(&handle_v1));
        let _handle_v2 = slab.allocate().expect("reuse slot");
        assert!(
            !slab.deallocate(&handle_v1),
            "stale generation should reject dealloc"
        );
    }
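
    // A sketch of the mechanism behind the stale-dealloc rejection above,
    // under the assumption that reusing a freed slot bumps the handle
    // generation monotonically. With capacity 1, the second allocation must
    // reuse the first slot.
    #[test]
    fn numa_slab_reused_slot_has_higher_generation() {
        let mut slab = NumaSlab::new(0, 1);
        let first = slab.allocate().expect("first alloc");
        assert!(slab.deallocate(&first));
        let second = slab.allocate().expect("reuse slot");
        assert!(second.generation > first.generation);
    }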

    #[test]
    fn numa_slab_double_free_is_rejected() {
        let mut slab = NumaSlab::new(0, 4);
        let handle = slab.allocate().expect("alloc");
        assert!(slab.deallocate(&handle));
        assert!(!slab.deallocate(&handle), "double free must be rejected");
    }

    #[test]
    fn numa_slab_wrong_node_dealloc_rejected() {
        let mut slab = NumaSlab::new(0, 4);
        let handle = slab.allocate().expect("alloc");
        let wrong_handle = NumaSlabHandle {
            node_id: 99,
            ..handle
        };
        assert!(
            !slab.deallocate(&wrong_handle),
            "wrong node_id should reject dealloc"
        );
    }

    #[test]
    fn numa_slab_high_water_mark_tracks_peak() {
        let mut slab = NumaSlab::new(0, 8);
        let a = slab.allocate().expect("a");
        let b = slab.allocate().expect("b");
        let c = slab.allocate().expect("c");
        assert_eq!(slab.high_water_mark, 3);
        slab.deallocate(&a);
        slab.deallocate(&b);
        assert_eq!(
            slab.high_water_mark, 3,
            "high water mark should not decrease"
        );
        slab.deallocate(&c);
        let _d = slab.allocate().expect("d");
        assert_eq!(slab.high_water_mark, 3);
    }

    #[test]
    fn numa_slab_pool_routes_to_local_node() {
        let topology =
            ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0), (1, 0), (2, 1), (3, 1)]);
        let manifest = ReactorPlacementManifest::plan(4, Some(&topology));
        let config = NumaSlabConfig {
            slab_capacity: 8,
            entry_size_bytes: 256,
            hugepage: HugepageConfig {
                enabled: false,
                ..HugepageConfig::default()
            },
        };
        let mut pool = NumaSlabPool::from_manifest(&manifest, config);
        assert_eq!(pool.node_count(), 2);

        let (handle, reason) = pool.allocate(1).expect("allocate on node 1");
        assert_eq!(handle.node_id, 1);
        assert!(reason.is_none(), "should be local allocation");
    }

    #[test]
    fn numa_slab_pool_cross_node_fallback_tracks_telemetry() {
        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0), (2, 1)]);
        let manifest = ReactorPlacementManifest::plan(2, Some(&topology));
        let config = NumaSlabConfig {
            slab_capacity: 1,
            entry_size_bytes: 64,
            hugepage: HugepageConfig {
                enabled: false,
                ..HugepageConfig::default()
            },
        };
        let mut pool = NumaSlabPool::from_manifest(&manifest, config);

        let (h0, _) = pool.allocate(0).expect("fill node 0");
        assert_eq!(h0.node_id, 0);

        let (h1, reason) = pool.allocate(0).expect("fallback to node 1");
        assert_eq!(h1.node_id, 1);
        assert_eq!(reason, Some(CrossNodeReason::LocalExhausted));

        let telemetry = pool.telemetry();
        assert_eq!(telemetry.cross_node_allocs, 1);
        let json = telemetry.as_json();
        assert_eq!(json["total_allocs"], serde_json::json!(2));
        assert_eq!(json["hugepage_backed_allocs"], serde_json::json!(0));
        assert_eq!(json["local_allocs"], serde_json::json!(1));
        assert_eq!(json["remote_allocs"], serde_json::json!(1));
        assert_eq!(
            json["allocation_ratio_bps"]["local"],
            serde_json::json!(5000)
        );
        assert_eq!(
            json["allocation_ratio_bps"]["remote"],
            serde_json::json!(5000)
        );
        assert_eq!(
            json["allocation_ratio_bps"]["scale"],
            serde_json::json!(10_000)
        );
        assert_eq!(json["hugepage_hit_rate_bps"]["value"], serde_json::json!(0));
        assert_eq!(
            json["latency_proxies_bps"]["tlb_miss_pressure"],
            serde_json::json!(5000)
        );
        assert_eq!(
            json["latency_proxies_bps"]["cache_miss_pressure"],
            serde_json::json!(10_000)
        );
        assert_eq!(
            json["latency_proxies_bps"]["occupancy_pressure"],
            serde_json::json!(10_000)
        );
        assert_eq!(
            json["pressure_bands"]["tlb_miss"],
            serde_json::json!("medium")
        );
        assert_eq!(
            json["pressure_bands"]["cache_miss"],
            serde_json::json!("high")
        );
        assert_eq!(
            json["pressure_bands"]["occupancy"],
            serde_json::json!("high")
        );
        assert_eq!(
            json["fallback_reasons"]["cross_node"],
            serde_json::json!("local_exhausted")
        );
        assert_eq!(
            json["fallback_reasons"]["hugepage"],
            serde_json::json!("hugepage_disabled")
        );
    }

    #[test]
    fn numa_slab_pool_total_exhaustion_returns_none() {
        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0)]);
        let manifest = ReactorPlacementManifest::plan(1, Some(&topology));
        let config = NumaSlabConfig {
            slab_capacity: 1,
            entry_size_bytes: 64,
            hugepage: HugepageConfig {
                enabled: false,
                ..HugepageConfig::default()
            },
        };
        let mut pool = NumaSlabPool::from_manifest(&manifest, config);
        let _ = pool.allocate(0).expect("fill the only slot");
        assert!(pool.allocate(0).is_none(), "pool should be exhausted");
    }

    #[test]
    fn numa_slab_pool_deallocate_round_trip() {
        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0), (1, 1)]);
        let manifest = ReactorPlacementManifest::plan(2, Some(&topology));
        let config = NumaSlabConfig::default();
        let mut pool = NumaSlabPool::from_manifest(&manifest, config);

        let (handle, _) = pool.allocate(1).expect("alloc");
        assert!(pool.deallocate(&handle));
        assert!(!pool.deallocate(&handle), "double free must be rejected");
    }

    #[test]
    fn hugepage_status_disabled_reports_fallback() {
        let config = HugepageConfig {
            enabled: false,
            ..HugepageConfig::default()
        };
        let status = HugepageStatus::evaluate(&config, 1024, 512);
        assert!(!status.active);
        assert_eq!(
            status.fallback_reason,
            Some(HugepageFallbackReason::Disabled)
        );
    }

    #[test]
    fn hugepage_status_zero_totals_means_unavailable() {
        let config = HugepageConfig::default();
        let status = HugepageStatus::evaluate(&config, 0, 0);
        assert!(!status.active);
        assert_eq!(
            status.fallback_reason,
            Some(HugepageFallbackReason::DetectionUnavailable)
        );
    }

    #[test]
    fn hugepage_status_zero_free_means_insufficient() {
        let config = HugepageConfig::default();
        let status = HugepageStatus::evaluate(&config, 1024, 0);
        assert!(!status.active);
        assert_eq!(
            status.fallback_reason,
            Some(HugepageFallbackReason::InsufficientHugepages)
        );
    }

    #[test]
    fn hugepage_status_available_is_active() {
        let config = HugepageConfig::default();
        let status = HugepageStatus::evaluate(&config, 1024, 512);
        assert!(status.active);
        assert!(status.fallback_reason.is_none());
        assert_eq!(status.free_pages, 512);
    }

    #[test]
    fn numa_slab_pool_tracks_hugepage_hit_rate_when_active() {
        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0)]);
        let manifest = ReactorPlacementManifest::plan(1, Some(&topology));
        let config = NumaSlabConfig {
            slab_capacity: 4,
            entry_size_bytes: 1024,
            hugepage: HugepageConfig {
                page_size_bytes: 4096,
                enabled: true,
            },
        };
        let mut pool = NumaSlabPool::from_manifest(&manifest, config);
        pool.set_hugepage_status(HugepageStatus {
            total_pages: 128,
            free_pages: 64,
            page_size_bytes: 4096,
            active: true,
            fallback_reason: None,
        });

        let _ = pool.allocate(0).expect("first hugepage-backed alloc");
        let _ = pool.allocate(0).expect("second hugepage-backed alloc");

        let telemetry = pool.telemetry();
        let json = telemetry.as_json();
        assert_eq!(json["total_allocs"], serde_json::json!(2));
        assert_eq!(json["hugepage_backed_allocs"], serde_json::json!(2));
        assert_eq!(
            json["hugepage_hit_rate_bps"]["value"],
            serde_json::json!(10_000)
        );
        assert_eq!(
            json["hugepage_hit_rate_bps"]["scale"],
            serde_json::json!(10_000)
        );
        assert_eq!(json["hugepage"]["active"], serde_json::json!(true));
    }

    #[test]
    fn numa_slab_pool_misaligned_hugepage_config_reports_alignment_mismatch() {
        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0)]);
        let manifest = ReactorPlacementManifest::plan(1, Some(&topology));
        let config = NumaSlabConfig {
            slab_capacity: 3,
            entry_size_bytes: 1024,
            hugepage: HugepageConfig {
                page_size_bytes: 2048,
                enabled: true,
            },
        };

        let pool = NumaSlabPool::from_manifest(&manifest, config);
        let telemetry = pool.telemetry();
        assert!(!telemetry.hugepage_status.active);
        assert_eq!(
            telemetry.hugepage_status.fallback_reason,
            Some(HugepageFallbackReason::AlignmentMismatch)
        );

        let json = telemetry.as_json();
        assert_eq!(
            json["hugepage"]["fallback_reason"],
            serde_json::json!("alignment_mismatch")
        );
        assert_eq!(
            json["fallback_reasons"]["hugepage"],
            serde_json::json!("alignment_mismatch")
        );
    }

    #[test]
    fn numa_slab_pool_aligned_hugepage_config_defaults_to_detection_unavailable() {
        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0)]);
        let manifest = ReactorPlacementManifest::plan(1, Some(&topology));
        let config = NumaSlabConfig {
            slab_capacity: 4,
            entry_size_bytes: 1024,
            hugepage: HugepageConfig {
                page_size_bytes: 4096,
                enabled: true,
            },
        };

        let pool = NumaSlabPool::from_manifest(&manifest, config);
        let telemetry = pool.telemetry();
        assert!(!telemetry.hugepage_status.active);
        assert_eq!(
            telemetry.hugepage_status.fallback_reason,
            Some(HugepageFallbackReason::DetectionUnavailable)
        );
    }
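
    // A worked example of the alignment rule the misaligned cases above imply:
    // the slab footprint is `slab_capacity * entry_size_bytes`, and alignment
    // holds when that footprint is a positive exact multiple of the hugepage
    // size (4 * 1024 = 4096 here).
    #[test]
    fn hugepage_alignment_accepts_exact_multiple_footprint() {
        let config = NumaSlabConfig {
            slab_capacity: 4,
            entry_size_bytes: 1024,
            hugepage: HugepageConfig {
                page_size_bytes: 4096,
                enabled: true,
            },
        };
        assert_eq!(config.slab_footprint_bytes(), Some(4096));
        assert!(config.hugepage_alignment_ok());
    }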

    #[test]
    fn misaligned_hugepage_config_rejects_external_status_override() {
        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0)]);
        let manifest = ReactorPlacementManifest::plan(1, Some(&topology));
        let config = NumaSlabConfig {
            slab_capacity: 3,
            entry_size_bytes: 1024,
            hugepage: HugepageConfig {
                page_size_bytes: 2048,
                enabled: true,
            },
        };
        let mut pool = NumaSlabPool::from_manifest(&manifest, config);

        let forced = HugepageStatus::evaluate(&config.hugepage, 256, 64);
        assert!(forced.active);
        assert!(forced.fallback_reason.is_none());

        pool.set_hugepage_status(forced);
        let telemetry = pool.telemetry();
        assert!(!telemetry.hugepage_status.active);
        assert_eq!(
            telemetry.hugepage_status.fallback_reason,
            Some(HugepageFallbackReason::AlignmentMismatch)
        );
    }

    #[test]
    fn disabled_hugepage_config_rejects_external_active_status_override() {
        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0)]);
        let manifest = ReactorPlacementManifest::plan(1, Some(&topology));
        let config = NumaSlabConfig {
            slab_capacity: 4,
            entry_size_bytes: 1024,
            hugepage: HugepageConfig {
                page_size_bytes: 4096,
                enabled: false,
            },
        };
        let mut pool = NumaSlabPool::from_manifest(&manifest, config);

        let forced = HugepageStatus {
            total_pages: 512,
            free_pages: 256,
            page_size_bytes: 4096,
            active: true,
            fallback_reason: None,
        };
        pool.set_hugepage_status(forced);

        let telemetry = pool.telemetry();
        assert!(!telemetry.hugepage_status.active);
        assert_eq!(
            telemetry.hugepage_status.fallback_reason,
            Some(HugepageFallbackReason::Disabled)
        );
        assert_eq!(telemetry.hugepage_status.total_pages, 512);
        assert_eq!(telemetry.hugepage_status.free_pages, 256);

        let json = telemetry.as_json();
        assert_eq!(
            json["hugepage"]["fallback_reason"],
            serde_json::json!("hugepage_disabled")
        );
    }

    #[test]
    fn disabled_hugepage_config_uses_disabled_reason_even_if_slab_is_misaligned() {
        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0)]);
        let manifest = ReactorPlacementManifest::plan(1, Some(&topology));
        let config = NumaSlabConfig {
            slab_capacity: 3,
            entry_size_bytes: 1024,
            hugepage: HugepageConfig {
                page_size_bytes: 2048,
                enabled: false,
            },
        };

        let pool = NumaSlabPool::from_manifest(&manifest, config);
        let telemetry = pool.telemetry();
        assert!(!telemetry.hugepage_status.active);
        assert_eq!(
            telemetry.hugepage_status.fallback_reason,
            Some(HugepageFallbackReason::Disabled)
        );
    }

    #[test]
    fn hugepage_alignment_rejects_zero_page_size_and_fails_closed() {
        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0)]);
        let manifest = ReactorPlacementManifest::plan(1, Some(&topology));
        let config = NumaSlabConfig {
            slab_capacity: 4,
            entry_size_bytes: 1024,
            hugepage: HugepageConfig {
                page_size_bytes: 0,
                enabled: true,
            },
        };
        assert!(!config.hugepage_alignment_ok());

        let pool = NumaSlabPool::from_manifest(&manifest, config);
        let telemetry = pool.telemetry();
        assert!(!telemetry.hugepage_status.active);
        assert_eq!(
            telemetry.hugepage_status.fallback_reason,
            Some(HugepageFallbackReason::AlignmentMismatch)
        );
    }

    #[test]
    fn hugepage_alignment_rejects_zero_footprint_and_fails_closed() {
        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0)]);
        let manifest = ReactorPlacementManifest::plan(1, Some(&topology));
        let config = NumaSlabConfig {
            slab_capacity: 0,
            entry_size_bytes: 1024,
            hugepage: HugepageConfig {
                page_size_bytes: 2048,
                enabled: true,
            },
        };
        assert_eq!(config.slab_footprint_bytes(), Some(0));
        assert!(!config.hugepage_alignment_ok());

        let pool = NumaSlabPool::from_manifest(&manifest, config);
        let telemetry = pool.telemetry();
        assert!(!telemetry.hugepage_status.active);
        assert_eq!(
            telemetry.hugepage_status.fallback_reason,
            Some(HugepageFallbackReason::AlignmentMismatch)
        );
    }

    #[test]
    fn hugepage_alignment_rejects_checked_mul_overflow_without_panicking() {
        let config = NumaSlabConfig {
            slab_capacity: usize::MAX,
            entry_size_bytes: 2,
            hugepage: HugepageConfig {
                page_size_bytes: 4096,
                enabled: true,
            },
        };
        assert!(config.slab_footprint_bytes().is_none());
        assert!(!config.hugepage_alignment_ok());

        let status = config.alignment_mismatch_status();
        assert!(!status.active);
        assert_eq!(
            status.fallback_reason,
            Some(HugepageFallbackReason::AlignmentMismatch)
        );
    }

    #[test]
    fn zero_page_size_config_rejects_external_status_override() {
        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0)]);
        let manifest = ReactorPlacementManifest::plan(1, Some(&topology));
        let config = NumaSlabConfig {
            slab_capacity: 4,
            entry_size_bytes: 1024,
            hugepage: HugepageConfig {
                page_size_bytes: 0,
                enabled: true,
            },
        };
        let mut pool = NumaSlabPool::from_manifest(&manifest, config);

        let forced = HugepageStatus {
            total_pages: 128,
            free_pages: 64,
            page_size_bytes: 0,
            active: true,
            fallback_reason: None,
        };
        pool.set_hugepage_status(forced);

        let telemetry = pool.telemetry();
        assert!(!telemetry.hugepage_status.active);
        assert_eq!(
            telemetry.hugepage_status.fallback_reason,
            Some(HugepageFallbackReason::AlignmentMismatch)
        );
    }

    #[test]
    fn zero_footprint_config_rejects_external_status_override() {
        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0)]);
        let manifest = ReactorPlacementManifest::plan(1, Some(&topology));
        let config = NumaSlabConfig {
            slab_capacity: 0,
            entry_size_bytes: 1024,
            hugepage: HugepageConfig {
                page_size_bytes: 2048,
                enabled: true,
            },
        };
        assert_eq!(config.slab_footprint_bytes(), Some(0));

        let mut pool = NumaSlabPool::from_manifest(&manifest, config);
        let forced = HugepageStatus {
            total_pages: 128,
            free_pages: 64,
            page_size_bytes: 2048,
            active: true,
            fallback_reason: None,
        };
        pool.set_hugepage_status(forced);

        let telemetry = pool.telemetry();
        assert!(!telemetry.hugepage_status.active);
        assert_eq!(
            telemetry.hugepage_status.fallback_reason,
            Some(HugepageFallbackReason::AlignmentMismatch)
        );
        assert_eq!(telemetry.hugepage_status.total_pages, 0);
        assert_eq!(telemetry.hugepage_status.free_pages, 0);
    }

    #[test]
    fn checked_mul_overflow_config_rejects_external_status_override() {
        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0)]);
        let manifest = ReactorPlacementManifest::plan(1, Some(&topology));
        let config = NumaSlabConfig {
            slab_capacity: 2,
            entry_size_bytes: usize::MAX,
            hugepage: HugepageConfig {
                page_size_bytes: 4096,
                enabled: true,
            },
        };
        assert!(config.slab_footprint_bytes().is_none());

        let mut pool = NumaSlabPool::from_manifest(&manifest, config);
        let forced = HugepageStatus {
            total_pages: 512,
            free_pages: 256,
            page_size_bytes: 4096,
            active: true,
            fallback_reason: None,
        };
        pool.set_hugepage_status(forced);

        let telemetry = pool.telemetry();
        assert!(!telemetry.hugepage_status.active);
        assert_eq!(
            telemetry.hugepage_status.fallback_reason,
            Some(HugepageFallbackReason::AlignmentMismatch)
        );
        assert_eq!(telemetry.hugepage_status.total_pages, 0);
        assert_eq!(telemetry.hugepage_status.free_pages, 0);
    }

    #[test]
    fn hugepage_status_json_is_stable() {
        let config = HugepageConfig::default();
        let status = HugepageStatus::evaluate(&config, 1024, 128);
        let json = status.as_json();
        assert_eq!(json["total_pages"], serde_json::json!(1024));
        assert_eq!(json["free_pages"], serde_json::json!(128));
        assert_eq!(json["active"], serde_json::json!(true));
        assert!(json["fallback_reason"].is_null());
    }

    #[test]
    fn numa_slab_telemetry_json_has_expected_shape() {
        let topology =
            ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0), (1, 0), (4, 1), (5, 1)]);
        let manifest = ReactorPlacementManifest::plan(4, Some(&topology));
        let config = NumaSlabConfig {
            slab_capacity: 16,
            entry_size_bytes: 128,
            hugepage: HugepageConfig {
                enabled: false,
                ..HugepageConfig::default()
            },
        };
        let mut pool = NumaSlabPool::from_manifest(&manifest, config);
        let _ = pool.allocate(0);
        let _ = pool.allocate(1);
        let _ = pool.allocate(0);

        let telemetry = pool.telemetry();
        let json = telemetry.as_json();
        assert_eq!(json["node_count"], serde_json::json!(2));
        assert_eq!(json["total_allocs"], serde_json::json!(3));
        assert_eq!(json["total_in_use"], serde_json::json!(3));
        assert_eq!(json["cross_node_allocs"], serde_json::json!(0));
        assert_eq!(json["hugepage_backed_allocs"], serde_json::json!(0));
        assert_eq!(json["local_allocs"], serde_json::json!(3));
        assert_eq!(json["remote_allocs"], serde_json::json!(0));
        assert_eq!(
            json["allocation_ratio_bps"]["local"],
            serde_json::json!(10_000)
        );
        assert_eq!(json["allocation_ratio_bps"]["remote"], serde_json::json!(0));
        assert_eq!(
            json["allocation_ratio_bps"]["scale"],
            serde_json::json!(10_000)
        );
        assert_eq!(json["hugepage_hit_rate_bps"]["value"], serde_json::json!(0));
        assert_eq!(
            json["latency_proxies_bps"]["tlb_miss_pressure"],
            serde_json::json!(0)
        );
        assert_eq!(
            json["latency_proxies_bps"]["cache_miss_pressure"],
            serde_json::json!(937)
        );
        assert_eq!(
            json["latency_proxies_bps"]["occupancy_pressure"],
            serde_json::json!(937)
        );
        assert_eq!(
            json["latency_proxies_bps"]["scale"],
            serde_json::json!(10_000)
        );
        assert_eq!(json["pressure_bands"]["tlb_miss"], serde_json::json!("low"));
        assert_eq!(
            json["pressure_bands"]["cache_miss"],
            serde_json::json!("low")
        );
        assert_eq!(
            json["pressure_bands"]["occupancy"],
            serde_json::json!("low")
        );
        assert_eq!(
            json["fallback_reasons"]["cross_node"],
            serde_json::Value::Null
        );
        assert_eq!(
            json["fallback_reasons"]["hugepage"],
            serde_json::json!("hugepage_disabled")
        );
        assert_eq!(json["config"]["slab_capacity"], serde_json::json!(16));
        assert_eq!(json["per_node"].as_array().map(std::vec::Vec::len), Some(2));
    }

    #[test]
    fn thread_affinity_advice_matches_placement_manifest() {
        let topology =
            ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0), (1, 0), (4, 1), (5, 1)]);
        let manifest = ReactorPlacementManifest::plan(4, Some(&topology));
        let advice = manifest.affinity_advice(AffinityEnforcement::Advisory);
        assert_eq!(advice.len(), 4);
        assert_eq!(advice[0].shard_id, 0);
        assert_eq!(advice[0].recommended_core, 0);
        assert_eq!(advice[0].recommended_numa_node, 0);
        assert_eq!(advice[0].enforcement, AffinityEnforcement::Advisory);
        assert_eq!(advice[1].recommended_numa_node, 1);
        assert_eq!(advice[3].recommended_numa_node, 1);
    }

    #[test]
    fn thread_affinity_advice_json_is_stable() {
        let advice = ThreadAffinityAdvice {
            shard_id: 0,
            recommended_core: 3,
            recommended_numa_node: 1,
            enforcement: AffinityEnforcement::Strict,
        };
        let json = advice.as_json();
        assert_eq!(json["shard_id"], serde_json::json!(0));
        assert_eq!(json["recommended_core"], serde_json::json!(3));
        assert_eq!(json["enforcement"], serde_json::json!("strict"));
    }

    #[test]
    fn reactor_mesh_preferred_numa_node_uses_manifest() {
        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0), (4, 1), (8, 2)]);
        let mesh = ReactorMesh::new(ReactorMeshConfig {
            shard_count: 3,
            lane_capacity: 8,
            topology: Some(topology),
        });
        assert_eq!(mesh.preferred_numa_node(0), 0);
        assert_eq!(mesh.preferred_numa_node(1), 1);
        assert_eq!(mesh.preferred_numa_node(2), 2);
        // Out-of-range shard ids fall back to node 0.
        assert_eq!(mesh.preferred_numa_node(99), 0);
    }

    #[test]
    fn reactor_mesh_affinity_advice_covers_all_shards() {
        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0), (1, 1)]);
        let mesh = ReactorMesh::new(ReactorMeshConfig {
            shard_count: 2,
            lane_capacity: 8,
            topology: Some(topology),
        });
        let advice = mesh.affinity_advice(AffinityEnforcement::Disabled);
        assert_eq!(advice.len(), 2);
        assert_eq!(advice[0].enforcement, AffinityEnforcement::Disabled);
        assert_eq!(advice[1].enforcement, AffinityEnforcement::Disabled);
    }

    #[test]
    fn numa_slab_pool_from_manifest_with_no_topology_creates_single_node() {
        let manifest = ReactorPlacementManifest::plan(4, None);
        let pool = NumaSlabPool::from_manifest(&manifest, NumaSlabConfig::default());
        assert_eq!(pool.node_count(), 1);
    }

    #[test]
    fn numa_node_for_shard_returns_none_for_unknown() {
        let manifest = ReactorPlacementManifest::plan(2, None);
        assert!(manifest.numa_node_for_shard(0).is_some());
        assert!(manifest.numa_node_for_shard(99).is_none());
    }

    #[test]
    fn numa_slab_capacity_clamp_to_at_least_one() {
        let slab = NumaSlab::new(0, 0);
        assert_eq!(slab.capacity, 1);
    }

    #[test]
    fn cross_node_reason_code_matches() {
        assert_eq!(CrossNodeReason::LocalExhausted.as_code(), "local_exhausted");
    }

    #[test]
    fn affinity_enforcement_code_coverage() {
        assert_eq!(AffinityEnforcement::Advisory.as_code(), "advisory");
        assert_eq!(AffinityEnforcement::Strict.as_code(), "strict");
        assert_eq!(AffinityEnforcement::Disabled.as_code(), "disabled");
    }

    #[test]
    fn enqueue_hostcall_completions_batch_preserves_order() {
        let mut sched = Scheduler::with_clock(DeterministicClock::new(0));
        let completions = vec![
            (
                "c-1".to_string(),
                HostcallOutcome::Success(serde_json::json!(1)),
            ),
            (
                "c-2".to_string(),
                HostcallOutcome::Success(serde_json::json!(2)),
            ),
            (
                "c-3".to_string(),
                HostcallOutcome::Success(serde_json::json!(3)),
            ),
        ];
        sched.enqueue_hostcall_completions(completions);
        assert_eq!(sched.macrotask_count(), 3);

        for expected in ["c-1", "c-2", "c-3"] {
            let task = sched.tick().expect("should have macrotask");
            match task.kind {
                MacrotaskKind::HostcallComplete { ref call_id, .. } => {
                    assert_eq!(call_id, expected);
                }
                _ => unreachable!(),
            }
        }
        assert!(sched.tick().is_none());
    }

    #[test]
    fn time_until_next_timer_positive_case() {
        let mut sched = Scheduler::with_clock(DeterministicClock::new(100));
        sched.set_timeout(50);
        assert_eq!(sched.time_until_next_timer(), Some(50));

        sched.clock.advance(20);
        assert_eq!(sched.time_until_next_timer(), Some(30));
    }

    #[test]
    fn deterministic_clock_set_overrides_current_time() {
        let clock = DeterministicClock::new(0);
        assert_eq!(clock.now_ms(), 0);
        clock.advance(50);
        assert_eq!(clock.now_ms(), 50);
        clock.set(1000);
        assert_eq!(clock.now_ms(), 1000);
        clock.advance(5);
        assert_eq!(clock.now_ms(), 1005);
    }

    #[test]
    fn reactor_mesh_queue_depth_per_shard() {
        let config = ReactorMeshConfig {
            shard_count: 4,
            lane_capacity: 64,
            topology: None,
        };
        let mut mesh = ReactorMesh::new(config);

        for shard in 0..4 {
            assert_eq!(mesh.queue_depth(shard), Some(0));
        }
        assert_eq!(mesh.queue_depth(99), None);

        for i in 0..4 {
            mesh.enqueue_event(format!("evt-{i}"), serde_json::json!(null))
                .expect("enqueue should succeed");
        }
        for shard in 0..4 {
            assert_eq!(mesh.queue_depth(shard), Some(1), "shard {shard} depth");
        }
    }

    #[test]
    fn reactor_mesh_shard_count_and_total_depth() {
        let config = ReactorMeshConfig {
            shard_count: 3,
            lane_capacity: 16,
            topology: None,
        };
        let mut mesh = ReactorMesh::new(config);
        assert_eq!(mesh.shard_count(), 3);
        assert_eq!(mesh.total_depth(), 0);
        assert!(!mesh.has_pending());

        mesh.enqueue_event("e1".to_string(), serde_json::json!(null))
            .unwrap();
        mesh.enqueue_event("e2".to_string(), serde_json::json!(null))
            .unwrap();
        assert_eq!(mesh.total_depth(), 2);
        assert!(mesh.has_pending());
    }

    #[test]
    fn reactor_mesh_drain_shard_out_of_range_returns_empty() {
        let config = ReactorMeshConfig {
            shard_count: 2,
            lane_capacity: 16,
            topology: None,
        };
        let mut mesh = ReactorMesh::new(config);
        mesh.enqueue_event("e1".to_string(), serde_json::json!(null))
            .unwrap();
        let drained = mesh.drain_shard(99, 10);
        assert!(drained.is_empty());
    }

    #[test]
    fn reactor_mesh_zero_shards_is_empty_and_rejects_enqueues() {
        let mut mesh = ReactorMesh::new(ReactorMeshConfig {
            shard_count: 0,
            lane_capacity: 16,
            topology: None,
        });

        assert_eq!(mesh.shard_count(), 0);
        assert_eq!(mesh.total_depth(), 0);
        assert!(!mesh.has_pending());
        assert_eq!(mesh.queue_depth(0), None);
        assert!(mesh.telemetry().queue_depths.is_empty());

        let err = mesh
            .enqueue_event("evt".to_string(), serde_json::json!(null))
            .expect_err("empty mesh should reject enqueues");
        assert_eq!(err.shard_id, 0);
        assert_eq!(err.depth, 0);
        assert_eq!(err.capacity, 0);
        assert_eq!(mesh.telemetry().rejected_enqueues, 1);
    }

    #[test]
    fn reactor_mesh_zero_capacity_is_empty_and_rejects_enqueues() {
        let mut mesh = ReactorMesh::new(ReactorMeshConfig {
            shard_count: 4,
            lane_capacity: 0,
            topology: None,
        });

        assert_eq!(mesh.shard_count(), 0);
        assert_eq!(mesh.total_depth(), 0);
        assert!(!mesh.has_pending());
        assert_eq!(mesh.telemetry().queue_depths, Vec::<usize>::new());

        let err = mesh
            .enqueue_hostcall_complete(
                "call".to_string(),
                HostcallOutcome::Success(serde_json::json!({"ok": true})),
            )
            .expect_err("empty mesh should reject hostcall completions");
        assert_eq!(err.shard_id, 0);
        assert_eq!(err.depth, 0);
        assert_eq!(err.capacity, 0);
        assert_eq!(mesh.telemetry().rejected_enqueues, 1);
    }

    #[test]
    fn reactor_placement_manifest_zero_shards() {
        let manifest = ReactorPlacementManifest::plan(0, None);
        assert_eq!(manifest.shard_count, 0);
        assert!(manifest.bindings.is_empty());
        assert!(manifest.fallback_reason.is_none());
    }

    #[test]
    fn reactor_placement_manifest_as_json_has_expected_fields() {
        let topology =
            ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0), (1, 0), (2, 1), (3, 1)]);
        let manifest = ReactorPlacementManifest::plan(4, Some(&topology));
        let json = manifest.as_json();

        assert_eq!(json["shard_count"], 4);
        assert_eq!(json["numa_node_count"], 2);
        assert!(json["fallback_reason"].is_null());
        let bindings = json["bindings"].as_array().expect("bindings array");
        assert_eq!(bindings.len(), 4);
        for binding in bindings {
            assert!(binding.get("shard_id").is_some());
            assert!(binding.get("core_id").is_some());
            assert!(binding.get("numa_node").is_some());
        }
    }

    #[test]
    fn reactor_placement_manifest_single_node_fallback() {
        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0), (1, 0)]);
        let manifest = ReactorPlacementManifest::plan(2, Some(&topology));
        assert_eq!(
            manifest.fallback_reason,
            Some(ReactorPlacementFallbackReason::SingleNumaNode)
        );
    }

    #[test]
    fn reactor_placement_manifest_empty_topology_fallback() {
        let topology = ReactorTopologySnapshot { cores: vec![] };
        let manifest = ReactorPlacementManifest::plan(2, Some(&topology));
        assert_eq!(
            manifest.fallback_reason,
            Some(ReactorPlacementFallbackReason::TopologyEmpty)
        );
    }

    #[test]
    fn reactor_placement_fallback_reason_as_code_all_variants() {
        assert_eq!(
            ReactorPlacementFallbackReason::TopologyUnavailable.as_code(),
            "topology_unavailable"
        );
        assert_eq!(
            ReactorPlacementFallbackReason::TopologyEmpty.as_code(),
            "topology_empty"
        );
        assert_eq!(
            ReactorPlacementFallbackReason::SingleNumaNode.as_code(),
            "single_numa_node"
        );
    }

    #[test]
    fn hugepage_fallback_reason_as_code_all_variants() {
        assert_eq!(
            HugepageFallbackReason::Disabled.as_code(),
            "hugepage_disabled"
        );
        assert_eq!(
            HugepageFallbackReason::DetectionUnavailable.as_code(),
            "detection_unavailable"
        );
        assert_eq!(
            HugepageFallbackReason::InsufficientHugepages.as_code(),
            "insufficient_hugepages"
        );
        assert_eq!(
            HugepageFallbackReason::AlignmentMismatch.as_code(),
            "alignment_mismatch"
        );
    }

    #[test]
    fn numa_slab_pool_set_hugepage_status_and_node_count() {
        let manifest = ReactorPlacementManifest::plan(4, None);
        let config = NumaSlabConfig {
            slab_capacity: 4096,
            entry_size_bytes: 512,
            hugepage: HugepageConfig {
                page_size_bytes: 2 * 1024 * 1024,
                enabled: true,
            },
        };
        let mut pool = NumaSlabPool::from_manifest(&manifest, config);
        assert_eq!(pool.node_count(), 1);

        let status = HugepageStatus::evaluate(&config.hugepage, 512, 256);
        assert!(status.active);
        pool.set_hugepage_status(status);

        let telem = pool.telemetry();
        assert!(telem.hugepage_status.active);
        assert_eq!(telem.hugepage_status.free_pages, 256);
    }

    #[test]
    fn numa_slab_pool_multi_node_node_count() {
        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0), (1, 1), (2, 2)]);
        let manifest = ReactorPlacementManifest::plan(3, Some(&topology));
        let pool = NumaSlabPool::from_manifest(&manifest, NumaSlabConfig::default());
        assert_eq!(pool.node_count(), 3);
    }

    #[test]
    fn reactor_mesh_telemetry_as_json_has_expected_shape() {
        let config = ReactorMeshConfig {
            shard_count: 2,
            lane_capacity: 8,
            topology: None,
        };
        let mesh = ReactorMesh::new(config);
        let telem = mesh.telemetry();
        let json = telem.as_json();

        let depths = json["queue_depths"].as_array().expect("queue_depths");
        assert_eq!(depths.len(), 2);
        assert_eq!(json["rejected_enqueues"], 0);
        let bindings = json["shard_bindings"].as_array().expect("shard_bindings");
        assert_eq!(bindings.len(), 2);
        assert!(json.get("fallback_reason").is_some());
    }

    #[test]
    fn numa_slab_in_use_and_has_capacity() {
        let mut slab = NumaSlab::new(0, 3);
        assert_eq!(slab.in_use(), 0);
        assert!(slab.has_capacity());

        let h1 = slab.allocate().expect("alloc 1");
        assert_eq!(slab.in_use(), 1);
        assert!(slab.has_capacity());

        let h2 = slab.allocate().expect("alloc 2");
        let _h3 = slab.allocate().expect("alloc 3");
        assert_eq!(slab.in_use(), 3);
        assert!(!slab.has_capacity());
        assert!(slab.allocate().is_none());

        slab.deallocate(&h1);
        assert_eq!(slab.in_use(), 2);
        assert!(slab.has_capacity());

        slab.deallocate(&h2);
        assert_eq!(slab.in_use(), 1);
    }

    #[test]
    fn scheduler_macrotask_count_tracks_queue_size() {
        let mut sched = Scheduler::with_clock(DeterministicClock::new(0));
        assert_eq!(sched.macrotask_count(), 0);

        sched.enqueue_event("e1".to_string(), serde_json::json!(null));
        sched.enqueue_event("e2".to_string(), serde_json::json!(null));
        assert_eq!(sched.macrotask_count(), 2);

        sched.tick();
        assert_eq!(sched.macrotask_count(), 1);

        sched.tick();
        assert_eq!(sched.macrotask_count(), 0);
    }

    #[test]
    fn scheduler_timer_count_reflects_pending_timers() {
        let mut sched = Scheduler::with_clock(DeterministicClock::new(0));
        assert_eq!(sched.timer_count(), 0);

        sched.set_timeout(100);
        sched.set_timeout(200);
        assert_eq!(sched.timer_count(), 2);

        sched.clock.advance(150);
        sched.tick();
        assert_eq!(sched.timer_count(), 1);
    }

    #[test]
    fn scheduler_current_seq_advances_with_operations() {
        let mut sched = Scheduler::with_clock(DeterministicClock::new(0));
        let initial = sched.current_seq();
        assert_eq!(initial.value(), 0);

        sched.set_timeout(100);
        assert!(sched.current_seq().value() > initial.value());

        let after_timer = sched.current_seq();
        sched.enqueue_event("evt".to_string(), serde_json::json!(null));
        assert!(sched.current_seq().value() > after_timer.value());
    }

    #[test]
    fn thread_affinity_advice_as_json_structure() {
        let advice = ThreadAffinityAdvice {
            shard_id: 2,
            recommended_core: 5,
            recommended_numa_node: 1,
            enforcement: AffinityEnforcement::Strict,
        };
        let json = advice.as_json();
        assert_eq!(json["shard_id"], 2);
        assert_eq!(json["recommended_core"], 5);
        assert_eq!(json["recommended_numa_node"], 1);
        assert_eq!(json["enforcement"], "strict");
    }

    // Property-based coverage for the ordering and routing invariants above.
    mod proptest_scheduler {
        use super::*;
        use proptest::prelude::*;

        proptest! {
            #[test]
            fn seq_next_is_monotonic(start in 0..u64::MAX - 100) {
                let s = Seq(start);
                let n = s.next();
                assert!(n >= s, "Seq::next must be monotonically non-decreasing");
                assert!(
                    n.value() == start + 1 || start == u64::MAX,
                    "Seq::next must increment by 1 unless saturated"
                );
            }

            #[test]
            fn seq_next_saturates(start in u64::MAX - 5..=u64::MAX) {
                let s = Seq(start);
                let n = s.next();
                // Reading value() at the saturation boundary must not panic.
                let _ = n.value();
                assert!(n >= s, "must be monotonic even at saturation boundary");
            }

            #[test]
            fn timer_entry_ordering_consistent_with_min_heap(
                id_a in 0..1000u64,
                id_b in 0..1000u64,
                deadline_a in 0..10000u64,
                deadline_b in 0..10000u64,
                seq_a in 0..1000u64,
                seq_b in 0..1000u64,
            ) {
                let ta = TimerEntry::new(id_a, deadline_a, Seq(seq_a));
                let tb = TimerEntry::new(id_b, deadline_b, Seq(seq_b));
                // Comparisons are reversed because BinaryHeap is a max-heap:
                // "greater" means "pops first".
                if deadline_a < deadline_b {
                    assert!(ta > tb, "earlier deadline must sort greater (min-heap)");
                } else if deadline_a > deadline_b {
                    assert!(ta < tb, "later deadline must sort less (min-heap)");
                } else if seq_a < seq_b {
                    assert!(ta > tb, "same deadline, earlier seq must sort greater");
                } else if seq_a > seq_b {
                    assert!(ta < tb, "same deadline, later seq must sort less");
                } else {
                    assert!(ta == tb, "same deadline+seq must be equal");
                }
            }

            #[test]
            fn stable_hash_is_deterministic(input in "[a-z0-9_.-]{1,64}") {
                let h1 = ReactorMesh::stable_hash(&input);
                let h2 = ReactorMesh::stable_hash(&input);
                assert!(h1 == h2, "stable_hash must be deterministic");
            }

            #[test]
            fn hash_route_returns_valid_shard(
                shard_count in 1..32usize,
                call_id in "[a-z0-9]{1,20}",
            ) {
                let config = ReactorMeshConfig {
                    shard_count,
                    lane_capacity: 16,
                    topology: None,
                };
                let mesh = ReactorMesh::new(config);
                let shard = mesh.hash_route(&call_id);
                assert!(
                    shard < mesh.shard_count(),
                    "hash_route returned {shard} >= shard_count {}",
                    mesh.shard_count(),
                );
            }

            #[test]
            fn rr_route_returns_valid_shard(
                shard_count in 1..32usize,
                iterations in 1..100usize,
            ) {
                let config = ReactorMeshConfig {
                    shard_count,
                    lane_capacity: 16,
                    topology: None,
                };
                let mut mesh = ReactorMesh::new(config);
                for _ in 0..iterations {
                    let shard = mesh.rr_route();
                    assert!(
                        shard < mesh.shard_count(),
                        "rr_route returned {shard} >= shard_count {}",
                        mesh.shard_count(),
                    );
                }
            }

            #[test]
            fn drain_global_order_is_sorted(
                shard_count in 1..8usize,
                lane_capacity in 2..16usize,
                enqueues in 1..30usize,
            ) {
                let config = ReactorMeshConfig {
                    shard_count,
                    lane_capacity,
                    topology: None,
                };
                let mut mesh = ReactorMesh::new(config);
                let mut success_count = 0usize;
                for i in 0..enqueues {
                    let call_id = format!("call_{i}");
                    let outcome = HostcallOutcome::Success(serde_json::Value::Null);
                    if mesh.enqueue_hostcall_complete(call_id, outcome).is_ok() {
                        success_count += 1;
                    }
                }
                let drained = mesh.drain_global_order(success_count);
                for pair in drained.windows(2) {
                    assert!(
                        pair[0].global_seq < pair[1].global_seq,
                        "drain_global_order must emit ascending seq: {:?} vs {:?}",
                        pair[0].global_seq,
                        pair[1].global_seq,
                    );
                }
            }

            #[test]
            fn mesh_total_depth_bounded_by_capacity(
                shard_count in 1..8usize,
                lane_capacity in 1..16usize,
                enqueues in 0..100usize,
            ) {
                let config = ReactorMeshConfig {
                    shard_count,
                    lane_capacity,
                    topology: None,
                };
                let mut mesh = ReactorMesh::new(config);
                for i in 0..enqueues {
                    let call_id = format!("call_{i}");
                    let outcome = HostcallOutcome::Success(serde_json::Value::Null);
                    let _ = mesh.enqueue_hostcall_complete(call_id, outcome);
                }
                let max_total = shard_count * lane_capacity;
                assert!(
                    mesh.total_depth() <= max_total,
                    "total_depth {} exceeds max possible {}",
                    mesh.total_depth(),
                    max_total,
                );
            }

            #[test]
            fn scheduler_timer_cancel_idempotent(
                timer_count in 1..10usize,
                cancel_idx in 0..10usize,
            ) {
                let clock = DeterministicClock::new(0);
                let mut sched = Scheduler::with_clock(clock);
                let mut timer_ids = Vec::new();
                for i in 0..timer_count {
                    timer_ids.push(sched.set_timeout(u64::try_from(i + 1).unwrap() * 100));
                }
                if cancel_idx < timer_ids.len() {
                    let tid = timer_ids[cancel_idx];
                    let first = sched.clear_timeout(tid);
                    let second = sched.clear_timeout(tid);
                    assert!(first, "first cancel should succeed");
                    assert!(!second, "second cancel should return false");
                }
            }
        }
    }
}