1use std::cmp::Ordering;
18use std::collections::BTreeMap;
19use std::collections::BinaryHeap;
20use std::collections::VecDeque;
21use std::fmt;
22use std::sync::Arc;
23
/// Monotonically increasing sequence number used to impose a total,
/// deterministic order on scheduler work. Increments saturate at
/// `u64::MAX` rather than wrapping.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Seq(u64);

impl Seq {
    /// The first sequence number.
    #[must_use]
    pub const fn zero() -> Self {
        Self(0)
    }

    /// Successor sequence number; saturates at `u64::MAX`.
    #[must_use]
    pub const fn next(self) -> Self {
        let bumped = self.0.saturating_add(1);
        Self(bumped)
    }

    /// Raw counter value.
    #[must_use]
    pub const fn value(self) -> u64 {
        self.0
    }
}

impl fmt::Display for Seq {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "seq:{}", self.value())
    }
}

/// A pending one-shot timer: fires once `deadline_ms` is reached.
#[derive(Debug, Clone)]
pub struct TimerEntry {
    pub timer_id: u64,
    pub deadline_ms: u64,
    pub seq: Seq,
}

impl TimerEntry {
    /// Bundles a timer id with its absolute deadline and creation sequence.
    #[must_use]
    pub const fn new(timer_id: u64, deadline_ms: u64, seq: Seq) -> Self {
        Self {
            timer_id,
            deadline_ms,
            seq,
        }
    }
}

// Equality and ordering deliberately ignore `timer_id`: entries compare by
// (deadline, creation sequence) only.
impl PartialEq for TimerEntry {
    fn eq(&self, other: &Self) -> bool {
        (self.deadline_ms, self.seq) == (other.deadline_ms, other.seq)
    }
}

impl Eq for TimerEntry {}

impl PartialOrd for TimerEntry {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for TimerEntry {
    /// Ordering is inverted (earlier deadline, then earlier sequence, is
    /// "greater") so that `BinaryHeap` — a max-heap — behaves as a min-heap
    /// over deadlines.
    fn cmp(&self, other: &Self) -> Ordering {
        other
            .deadline_ms
            .cmp(&self.deadline_ms)
            .then_with(|| other.seq.cmp(&self.seq))
    }
}
101
/// Unit of work executed by the scheduler event loop.
#[derive(Debug, Clone)]
pub enum MacrotaskKind {
    /// A timer scheduled via `set_timeout` reached its deadline.
    TimerFired { timer_id: u64 },
    /// A host call finished (or produced a stream chunk).
    HostcallComplete {
        /// Correlation id of the originating host call.
        call_id: String,
        /// How the call ended; see [`HostcallOutcome`].
        outcome: HostcallOutcome,
    },
    /// An event delivered from outside the scheduler.
    InboundEvent {
        /// Identifier supplied by the event producer.
        event_id: String,
        /// Arbitrary JSON payload attached to the event.
        payload: serde_json::Value,
    },
}

/// Terminal or streaming result of a host call.
#[derive(Debug, Clone)]
pub enum HostcallOutcome {
    /// The call succeeded with a JSON result.
    Success(serde_json::Value),
    /// The call failed; `code` is machine-readable, `message` human-readable.
    Error { code: String, message: String },
    /// One chunk of a streaming response.
    StreamChunk {
        // Chunk ordinal within the stream — assigned by the producer
        // (presumably monotonically increasing; not enforced here).
        sequence: u64,
        /// JSON payload of this chunk.
        chunk: serde_json::Value,
        /// `true` on the last chunk of the stream.
        is_final: bool,
    },
}
136
137#[derive(Debug, Clone)]
139pub struct Macrotask {
140 pub seq: Seq,
142 pub kind: MacrotaskKind,
144}
145
146impl Macrotask {
147 #[must_use]
149 pub const fn new(seq: Seq, kind: MacrotaskKind) -> Self {
150 Self { seq, kind }
151 }
152}
153
154impl PartialEq for Macrotask {
156 fn eq(&self, other: &Self) -> bool {
157 self.seq == other.seq
158 }
159}
160
161impl Eq for Macrotask {}
162
163impl PartialOrd for Macrotask {
164 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
165 Some(self.cmp(other))
166 }
167}
168
169impl Ord for Macrotask {
170 fn cmp(&self, other: &Self) -> Ordering {
171 self.seq.cmp(&other.seq)
173 }
174}
175
/// Millisecond-resolution time source abstracted so the scheduler can be
/// driven by either the real wall clock or a test-controlled clock.
pub trait Clock: Send + Sync {
    /// Current time in milliseconds.
    fn now_ms(&self) -> u64;
}

/// Shared clocks delegate straight to the inner implementation.
impl<C: Clock> Clock for Arc<C> {
    fn now_ms(&self) -> u64 {
        (**self).now_ms()
    }
}

/// Real time source backed by the system clock (milliseconds since the
/// Unix epoch).
#[derive(Debug, Clone, Copy, Default)]
pub struct WallClock;

impl Clock for WallClock {
    fn now_ms(&self) -> u64 {
        use std::time::{SystemTime, UNIX_EPOCH};
        // A clock before the epoch degrades to 0; a value that no longer
        // fits in u64 milliseconds degrades to u64::MAX.
        let since_epoch = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default();
        u64::try_from(since_epoch.as_millis()).unwrap_or(u64::MAX)
    }
}

/// Manually advanced clock for deterministic tests; thread-safe via an
/// atomic counter.
#[derive(Debug)]
pub struct DeterministicClock {
    current_ms: std::sync::atomic::AtomicU64,
}

impl DeterministicClock {
    /// Creates a clock frozen at `start_ms`.
    #[must_use]
    pub const fn new(start_ms: u64) -> Self {
        Self {
            current_ms: std::sync::atomic::AtomicU64::new(start_ms),
        }
    }

    /// Moves the clock forward by `ms` milliseconds.
    pub fn advance(&self, ms: u64) {
        use std::sync::atomic::Ordering as MemOrder;
        self.current_ms.fetch_add(ms, MemOrder::SeqCst);
    }

    /// Jumps the clock to an absolute time.
    pub fn set(&self, ms: u64) {
        use std::sync::atomic::Ordering as MemOrder;
        self.current_ms.store(ms, MemOrder::SeqCst);
    }
}

impl Clock for DeterministicClock {
    fn now_ms(&self) -> u64 {
        use std::sync::atomic::Ordering as MemOrder;
        self.current_ms.load(MemOrder::SeqCst)
    }
}
236
/// Deterministic macrotask scheduler: a FIFO of ready macrotasks plus a
/// min-heap of pending timers, all ordered by a single `Seq` counter.
pub struct Scheduler<C: Clock = WallClock> {
    /// Next global sequence number handed to timers and macrotasks.
    seq: Seq,
    /// FIFO of macrotasks ready to execute.
    macrotask_queue: VecDeque<Macrotask>,
    /// Pending timers; `TimerEntry`'s reversed `Ord` makes this a min-heap.
    timer_heap: BinaryHeap<TimerEntry>,
    /// Next timer id to hand out; starts at 1.
    next_timer_id: u64,
    /// Ids cancelled via `clear_timeout` but not yet swept from the heap.
    cancelled_timers: std::collections::HashSet<u64>,
    /// Ids currently present in `timer_heap` (pruned lazily on pop).
    heap_timer_ids: std::collections::HashSet<u64>,
    /// Time source used to stamp deadlines.
    clock: C,
}
254
255impl Scheduler<WallClock> {
256 #[must_use]
258 pub fn new() -> Self {
259 Self::with_clock(WallClock)
260 }
261}
262
263impl Default for Scheduler<WallClock> {
264 fn default() -> Self {
265 Self::new()
266 }
267}
268
impl<C: Clock> Scheduler<C> {
    /// Creates an empty scheduler driven by `clock`. Timer ids start at 1
    /// and sequence numbers at zero.
    #[must_use]
    pub fn with_clock(clock: C) -> Self {
        Self {
            seq: Seq::zero(),
            macrotask_queue: VecDeque::new(),
            timer_heap: BinaryHeap::new(),
            next_timer_id: 1,
            cancelled_timers: std::collections::HashSet::new(),
            heap_timer_ids: std::collections::HashSet::new(),
            clock,
        }
    }

    /// The next sequence number that will be assigned (not yet consumed).
    #[must_use]
    pub const fn current_seq(&self) -> Seq {
        self.seq
    }

    /// Consumes and returns the current sequence number, advancing the
    /// counter (saturating at `u64::MAX`, see `Seq::next`).
    const fn next_seq(&mut self) -> Seq {
        let current = self.seq;
        self.seq = self.seq.next();
        current
    }

    /// Current time in milliseconds according to the configured clock.
    #[must_use]
    pub fn now_ms(&self) -> u64 {
        self.clock.now_ms()
    }

    /// `true` when a macrotask is queued or at least one non-cancelled
    /// timer is still pending.
    #[must_use]
    pub fn has_pending(&self) -> bool {
        !self.macrotask_queue.is_empty()
            || self
                .timer_heap
                .iter()
                .any(|entry| !self.cancelled_timers.contains(&entry.timer_id))
    }

    /// Number of queued macrotasks.
    #[must_use]
    pub fn macrotask_count(&self) -> usize {
        self.macrotask_queue.len()
    }

    /// Number of entries in the timer heap.
    ///
    /// NOTE(review): unlike `has_pending`/`next_timer_deadline`, this count
    /// still includes timers that were cancelled but not yet lazily swept
    /// from the heap.
    #[must_use]
    pub fn timer_count(&self) -> usize {
        self.timer_heap.len()
    }

    /// Schedules a one-shot timer `delay_ms` milliseconds from now and
    /// returns its id (pass to `clear_timeout` to cancel). The deadline
    /// saturates instead of overflowing.
    pub fn set_timeout(&mut self, delay_ms: u64) -> u64 {
        let timer_id = self.allocate_timer_id();
        let deadline_ms = self.clock.now_ms().saturating_add(delay_ms);
        let seq = self.next_seq();

        self.timer_heap
            .push(TimerEntry::new(timer_id, deadline_ms, seq));
        self.heap_timer_ids.insert(timer_id);

        tracing::trace!(
            event = "scheduler.timer.set",
            timer_id,
            delay_ms,
            deadline_ms,
            %seq,
            "Timer scheduled"
        );

        timer_id
    }

    /// `true` while `timer_id` is still in the heap (cancelled-but-unswept
    /// entries included).
    fn timer_id_in_use(&self, timer_id: u64) -> bool {
        self.heap_timer_ids.contains(&timer_id)
    }

    /// Hands out the next timer id — normally a plain increment from 1.
    /// Once the counter reaches `u64::MAX` it falls back to scanning for an
    /// id not currently in the heap.
    ///
    /// NOTE(review): after a wraparound the fast path does not re-check
    /// `timer_id_in_use`, so an extremely long-lived timer could in theory
    /// receive a duplicate id; this needs ~2^64 allocations, so it is
    /// unreachable in practice.
    fn allocate_timer_id(&mut self) -> u64 {
        if self.next_timer_id < u64::MAX {
            let timer_id = self.next_timer_id;
            self.next_timer_id += 1;
            return timer_id;
        }

        // Counter exhausted: try u64::MAX itself, then scan upward from 1.
        if !self.timer_id_in_use(u64::MAX) {
            self.next_timer_id = 1;
            return u64::MAX;
        }

        for candidate in 1..u64::MAX {
            if !self.timer_id_in_use(candidate) {
                self.next_timer_id = candidate.saturating_add(1);
                return candidate;
            }
        }

        tracing::error!(
            event = "scheduler.timer_id.exhausted",
            "Timer ID namespace exhausted; falling back to u64::MAX reuse"
        );
        u64::MAX
    }

    /// Cancels a pending timer, returning `true` if it was live (scheduled
    /// and not already cancelled). Removal from the heap is lazy: the entry
    /// is discarded when it surfaces in `move_due_timers`.
    pub fn clear_timeout(&mut self, timer_id: u64) -> bool {
        let pending =
            self.heap_timer_ids.contains(&timer_id) && !self.cancelled_timers.contains(&timer_id);

        let cancelled = if pending {
            self.cancelled_timers.insert(timer_id)
        } else {
            false
        };

        tracing::trace!(
            event = "scheduler.timer.cancel",
            timer_id,
            cancelled,
            "Timer cancelled"
        );

        cancelled
    }

    /// Queues a macrotask recording that host call `call_id` finished.
    pub fn enqueue_hostcall_complete(&mut self, call_id: String, outcome: HostcallOutcome) {
        let seq = self.next_seq();
        tracing::trace!(
            event = "scheduler.hostcall.enqueue",
            call_id = %call_id,
            %seq,
            "Hostcall completion enqueued"
        );
        let task = Macrotask::new(seq, MacrotaskKind::HostcallComplete { call_id, outcome });
        self.macrotask_queue.push_back(task);
    }

    /// Queues a batch of host-call completions in iteration order.
    pub fn enqueue_hostcall_completions<I>(&mut self, completions: I)
    where
        I: IntoIterator<Item = (String, HostcallOutcome)>,
    {
        for (call_id, outcome) in completions {
            self.enqueue_hostcall_complete(call_id, outcome);
        }
    }

    /// Convenience wrapper: queues one streaming chunk as a
    /// `HostcallOutcome::StreamChunk` completion.
    pub fn enqueue_stream_chunk(
        &mut self,
        call_id: String,
        sequence: u64,
        chunk: serde_json::Value,
        is_final: bool,
    ) {
        self.enqueue_hostcall_complete(
            call_id,
            HostcallOutcome::StreamChunk {
                sequence,
                chunk,
                is_final,
            },
        );
    }

    /// Queues an inbound external event.
    pub fn enqueue_event(&mut self, event_id: String, payload: serde_json::Value) {
        let seq = self.next_seq();
        tracing::trace!(
            event = "scheduler.event.enqueue",
            event_id = %event_id,
            %seq,
            "Inbound event enqueued"
        );
        let task = Macrotask::new(seq, MacrotaskKind::InboundEvent { event_id, payload });
        self.macrotask_queue.push_back(task);
    }

    /// Promotes every timer whose deadline has passed into a `TimerFired`
    /// macrotask. Cancelled timers are discarded here (lazy sweep); the
    /// heap's min-ordering guarantees due timers fire in (deadline, seq)
    /// order.
    fn move_due_timers(&mut self) {
        let now = self.clock.now_ms();

        while let Some(entry) = self.timer_heap.peek() {
            if entry.deadline_ms > now {
                break;
            }

            let entry = self.timer_heap.pop().expect("peeked");
            self.heap_timer_ids.remove(&entry.timer_id);

            if self.cancelled_timers.remove(&entry.timer_id) {
                tracing::trace!(
                    event = "scheduler.timer.skip_cancelled",
                    timer_id = entry.timer_id,
                    "Skipped cancelled timer"
                );
                continue;
            }

            // The macrotask gets a fresh sequence number; the timer's own
            // seq only ordered it inside the heap.
            let task_seq = self.next_seq();
            let task = Macrotask::new(
                task_seq,
                MacrotaskKind::TimerFired {
                    timer_id: entry.timer_id,
                },
            );
            self.macrotask_queue.push_back(task);

            tracing::trace!(
                event = "scheduler.timer.fire",
                timer_id = entry.timer_id,
                deadline_ms = entry.deadline_ms,
                now_ms = now,
                timer_seq = %entry.seq,
                macrotask_seq = %task_seq,
                "Timer fired"
            );
        }
    }

    /// Runs one scheduler step: first promotes due timers, then dequeues
    /// and returns the oldest macrotask, or `None` when idle.
    pub fn tick(&mut self) -> Option<Macrotask> {
        self.move_due_timers();

        let task = self.macrotask_queue.pop_front();

        if let Some(ref task) = task {
            tracing::debug!(
                event = "scheduler.tick.execute",
                seq = %task.seq,
                kind = ?std::mem::discriminant(&task.kind),
                "Executing macrotask"
            );
        } else {
            tracing::trace!(event = "scheduler.tick.idle", "No macrotask to execute");
        }

        task
    }

    /// Earliest deadline among non-cancelled timers, if any.
    #[must_use]
    pub fn next_timer_deadline(&self) -> Option<u64> {
        self.timer_heap
            .iter()
            .filter(|entry| !self.cancelled_timers.contains(&entry.timer_id))
            .map(|entry| entry.deadline_ms)
            .min()
    }

    /// Milliseconds until the earliest live timer (0 when already due).
    #[must_use]
    pub fn time_until_next_timer(&self) -> Option<u64> {
        self.next_timer_deadline()
            .map(|deadline| deadline.saturating_sub(self.clock.now_ms()))
    }
}
550
/// Construction parameters for [`ReactorMesh`].
#[derive(Debug, Clone)]
pub struct ReactorMeshConfig {
    /// Number of shard lanes (clamped to at least 1 by `ReactorMesh::new`).
    pub shard_count: usize,
    /// Bounded capacity of each lane (clamped to at least 1).
    pub lane_capacity: usize,
    /// Optional CPU topology used to plan shard→core placement.
    pub topology: Option<ReactorTopologySnapshot>,
}

impl Default for ReactorMeshConfig {
    /// Defaults: 4 shards, 1024-entry lanes, no topology information.
    fn default() -> Self {
        Self {
            shard_count: 4,
            lane_capacity: 1024,
            topology: None,
        }
    }
}
575
/// One logical CPU core and the NUMA node it belongs to.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct ReactorTopologyCore {
    pub core_id: usize,
    pub numa_node: usize,
}

/// A normalized (sorted, deduplicated) view of the machine's core/NUMA
/// layout.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct ReactorTopologySnapshot {
    pub cores: Vec<ReactorTopologyCore>,
}

impl ReactorTopologySnapshot {
    /// Builds a snapshot from `(core_id, numa_node)` pairs; duplicates are
    /// removed and cores are sorted by (core_id, numa_node).
    #[must_use]
    pub fn from_core_node_pairs(pairs: &[(usize, usize)]) -> Self {
        let mut cores: Vec<ReactorTopologyCore> = pairs
            .iter()
            .copied()
            .map(|(core_id, numa_node)| ReactorTopologyCore { core_id, numa_node })
            .collect();
        cores.sort_unstable();
        cores.dedup();
        Self { cores }
    }
}
605
/// Why shard placement could not (fully) exploit NUMA topology.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReactorPlacementFallbackReason {
    /// No topology snapshot was supplied.
    TopologyUnavailable,
    /// A snapshot was supplied but contained no usable cores.
    TopologyEmpty,
    /// Topology exists but every core sits on one NUMA node.
    SingleNumaNode,
}

impl ReactorPlacementFallbackReason {
    /// Stable machine-readable code for telemetry and logs.
    #[must_use]
    pub const fn as_code(self) -> &'static str {
        match self {
            Self::TopologyUnavailable => "topology_unavailable",
            Self::TopologyEmpty => "topology_empty",
            Self::SingleNumaNode => "single_numa_node",
        }
    }
}
627
/// A single shard's planned CPU placement.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ReactorShardBinding {
    pub shard_id: usize,
    pub core_id: usize,
    pub numa_node: usize,
}

/// Output of placement planning: one binding per shard plus whether (and
/// why) planning degraded.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReactorPlacementManifest {
    pub shard_count: usize,
    /// Number of distinct NUMA nodes used by the plan.
    pub numa_node_count: usize,
    pub bindings: Vec<ReactorShardBinding>,
    /// `Some(..)` when planning could not spread across multiple NUMA nodes.
    pub fallback_reason: Option<ReactorPlacementFallbackReason>,
}
644
645impl ReactorPlacementManifest {
646 #[must_use]
648 pub fn plan(shard_count: usize, topology: Option<&ReactorTopologySnapshot>) -> Self {
649 if shard_count == 0 {
650 return Self {
651 shard_count: 0,
652 numa_node_count: 0,
653 bindings: Vec::new(),
654 fallback_reason: None,
655 };
656 }
657
658 let Some(topology) = topology else {
659 let bindings = (0..shard_count)
660 .map(|shard_id| ReactorShardBinding {
661 shard_id,
662 core_id: shard_id,
663 numa_node: 0,
664 })
665 .collect::<Vec<_>>();
666 return Self {
667 shard_count,
668 numa_node_count: 1,
669 bindings,
670 fallback_reason: Some(ReactorPlacementFallbackReason::TopologyUnavailable),
671 };
672 };
673
674 if topology.cores.is_empty() {
675 let bindings = (0..shard_count)
676 .map(|shard_id| ReactorShardBinding {
677 shard_id,
678 core_id: shard_id,
679 numa_node: 0,
680 })
681 .collect::<Vec<_>>();
682 return Self {
683 shard_count,
684 numa_node_count: 1,
685 bindings,
686 fallback_reason: Some(ReactorPlacementFallbackReason::TopologyEmpty),
687 };
688 }
689
690 let mut by_node = BTreeMap::<usize, Vec<usize>>::new();
691 for core in &topology.cores {
692 by_node
693 .entry(core.numa_node)
694 .or_default()
695 .push(core.core_id);
696 }
697 for cores in by_node.values_mut() {
698 cores.sort_unstable();
699 cores.dedup();
700 }
701 let nodes = by_node
702 .into_iter()
703 .filter(|(_, cores)| !cores.is_empty())
704 .collect::<Vec<_>>();
705
706 if nodes.is_empty() {
707 let bindings = (0..shard_count)
708 .map(|shard_id| ReactorShardBinding {
709 shard_id,
710 core_id: shard_id,
711 numa_node: 0,
712 })
713 .collect::<Vec<_>>();
714 return Self {
715 shard_count,
716 numa_node_count: 1,
717 bindings,
718 fallback_reason: Some(ReactorPlacementFallbackReason::TopologyEmpty),
719 };
720 }
721
722 let node_count = nodes.len();
723 let fallback_reason = if node_count == 1 {
724 Some(ReactorPlacementFallbackReason::SingleNumaNode)
725 } else {
726 None
727 };
728
729 let mut bindings = Vec::with_capacity(shard_count);
730 for shard_id in 0..shard_count {
731 let node_idx = shard_id % node_count;
732 let (numa_node, cores) = &nodes[node_idx];
733 let core_idx = (shard_id / node_count) % cores.len();
734 bindings.push(ReactorShardBinding {
735 shard_id,
736 core_id: cores[core_idx],
737 numa_node: *numa_node,
738 });
739 }
740
741 Self {
742 shard_count,
743 numa_node_count: node_count,
744 bindings,
745 fallback_reason,
746 }
747 }
748
749 #[must_use]
751 pub fn as_json(&self) -> serde_json::Value {
752 serde_json::json!({
753 "shard_count": self.shard_count,
754 "numa_node_count": self.numa_node_count,
755 "fallback_reason": self.fallback_reason.map(ReactorPlacementFallbackReason::as_code),
756 "bindings": self.bindings.iter().map(|binding| {
757 serde_json::json!({
758 "shard_id": binding.shard_id,
759 "core_id": binding.core_id,
760 "numa_node": binding.numa_node
761 })
762 }).collect::<Vec<_>>()
763 })
764 }
765}
766
/// A macrotask wrapped with the ordering metadata the mesh assigns at
/// enqueue time.
#[derive(Debug, Clone)]
pub struct ReactorEnvelope {
    /// Mesh-wide ordering key (assigned before shard routing).
    pub global_seq: Seq,
    /// Per-shard ordering key (monotonic within `shard_id`).
    pub shard_seq: u64,
    /// Index of the lane this envelope was routed to.
    pub shard_id: usize,
    /// The work item itself.
    pub task: MacrotaskKind,
}

impl ReactorEnvelope {
    /// Internal constructor; sequence numbers come from the owning mesh.
    const fn new(global_seq: Seq, shard_seq: u64, shard_id: usize, task: MacrotaskKind) -> Self {
        Self {
            global_seq,
            shard_seq,
            shard_id,
            task,
        }
    }
}
790
/// Rejection report for an enqueue: which shard refused, how deep its lane
/// was, and the lane capacity (both zero when the shard id was unknown).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ReactorBackpressure {
    pub shard_id: usize,
    pub depth: usize,
    pub capacity: usize,
}

/// Point-in-time observability snapshot of a [`ReactorMesh`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReactorMeshTelemetry {
    /// Current depth of each lane, indexed by shard id.
    pub queue_depths: Vec<usize>,
    /// High-water depth of each lane since construction.
    pub max_queue_depths: Vec<usize>,
    /// Enqueues rejected due to backpressure or unknown shard ids.
    pub rejected_enqueues: u64,
    /// Planned shard→core bindings.
    pub shard_bindings: Vec<ReactorShardBinding>,
    /// Placement fallback, if planning degraded.
    pub fallback_reason: Option<ReactorPlacementFallbackReason>,
}
808
impl ReactorMeshTelemetry {
    /// Serializes the telemetry snapshot to JSON for logs/metrics export.
    #[must_use]
    pub fn as_json(&self) -> serde_json::Value {
        serde_json::json!({
            "queue_depths": self.queue_depths,
            "max_queue_depths": self.max_queue_depths,
            "rejected_enqueues": self.rejected_enqueues,
            "fallback_reason": self.fallback_reason.map(ReactorPlacementFallbackReason::as_code),
            "shard_bindings": self.shard_bindings.iter().map(|binding| {
                serde_json::json!({
                    "shard_id": binding.shard_id,
                    "core_id": binding.core_id,
                    "numa_node": binding.numa_node,
                })
            }).collect::<Vec<_>>()
        })
    }
}
828
/// A bounded FIFO lane that tracks its historical high-water depth.
#[derive(Debug, Clone)]
struct SpscLane<T> {
    capacity: usize,
    queue: VecDeque<T>,
    max_depth: usize,
}

impl<T> SpscLane<T> {
    /// Creates an empty lane that accepts at most `capacity` items.
    fn new(capacity: usize) -> Self {
        Self {
            capacity,
            queue: VecDeque::with_capacity(capacity),
            max_depth: 0,
        }
    }

    /// Current number of queued items.
    fn len(&self) -> usize {
        self.queue.len()
    }

    /// `true` when nothing is queued.
    fn is_empty(&self) -> bool {
        self.queue.is_empty()
    }

    /// Appends `value`, or returns `Err(current_depth)` when the lane is
    /// already at capacity.
    fn push(&mut self, value: T) -> Result<(), usize> {
        let depth = self.queue.len();
        if depth >= self.capacity {
            Err(depth)
        } else {
            self.queue.push_back(value);
            self.max_depth = self.max_depth.max(depth + 1);
            Ok(())
        }
    }

    /// Removes and returns the oldest item, if any.
    fn pop(&mut self) -> Option<T> {
        self.queue.pop_front()
    }

    /// Peeks at the oldest item without removing it.
    fn front(&self) -> Option<&T> {
        self.queue.front()
    }
}
873
/// Sharded macrotask mesh: one bounded FIFO lane per shard. Every envelope
/// receives both a mesh-global and a per-shard sequence number at enqueue
/// time, so consumers can replay either per shard or in global order.
#[derive(Debug, Clone)]
pub struct ReactorMesh {
    /// Next mesh-global sequence number.
    seq: Seq,
    /// One bounded lane per shard.
    lanes: Vec<SpscLane<ReactorEnvelope>>,
    /// Next per-shard sequence number, indexed by shard id.
    shard_seq: Vec<u64>,
    /// Cursor for round-robin event routing.
    rr_cursor: usize,
    /// Enqueues rejected (lane full or unknown shard).
    rejected_enqueues: u64,
    /// Shard→core placement plan computed at construction.
    placement_manifest: ReactorPlacementManifest,
}

impl ReactorMesh {
    /// Builds a mesh from `config`; shard count and lane capacity are
    /// clamped to at least 1, and a placement plan is computed up front.
    #[must_use]
    #[allow(clippy::needless_pass_by_value)]
    pub fn new(config: ReactorMeshConfig) -> Self {
        let shard_count = config.shard_count.max(1);
        let lane_capacity = config.lane_capacity.max(1);
        let placement_manifest =
            ReactorPlacementManifest::plan(shard_count, config.topology.as_ref());
        let lanes = (0..shard_count)
            .map(|_| SpscLane::new(lane_capacity))
            .collect::<Vec<_>>();
        Self {
            seq: Seq::zero(),
            lanes,
            shard_seq: vec![0; shard_count],
            rr_cursor: 0,
            rejected_enqueues: 0,
            placement_manifest,
        }
    }

    /// Number of shard lanes.
    #[must_use]
    pub fn shard_count(&self) -> usize {
        self.lanes.len()
    }

    /// Total queued envelopes across all lanes.
    #[must_use]
    pub fn total_depth(&self) -> usize {
        self.lanes.iter().map(SpscLane::len).sum()
    }

    /// `true` when any lane holds work.
    #[must_use]
    pub fn has_pending(&self) -> bool {
        self.total_depth() > 0
    }

    /// Depth of one lane, or `None` for an unknown shard id.
    #[must_use]
    pub fn queue_depth(&self, shard_id: usize) -> Option<usize> {
        self.lanes.get(shard_id).map(SpscLane::len)
    }

    /// Point-in-time counters and placement info for observability.
    #[must_use]
    pub fn telemetry(&self) -> ReactorMeshTelemetry {
        ReactorMeshTelemetry {
            queue_depths: self.lanes.iter().map(SpscLane::len).collect(),
            max_queue_depths: self.lanes.iter().map(|lane| lane.max_depth).collect(),
            rejected_enqueues: self.rejected_enqueues,
            shard_bindings: self.placement_manifest.bindings.clone(),
            fallback_reason: self.placement_manifest.fallback_reason,
        }
    }

    /// The placement plan computed at construction.
    #[must_use]
    pub const fn placement_manifest(&self) -> &ReactorPlacementManifest {
        &self.placement_manifest
    }

    /// Consumes and returns the next mesh-global sequence number.
    const fn next_global_seq(&mut self) -> Seq {
        let current = self.seq;
        self.seq = self.seq.next();
        current
    }

    /// Consumes and returns the next sequence number for `shard_id`
    /// (returns 0 for an unknown shard without advancing anything).
    fn next_shard_seq(&mut self, shard_id: usize) -> u64 {
        let Some(seq) = self.shard_seq.get_mut(shard_id) else {
            return 0;
        };
        let current = *seq;
        *seq = seq.saturating_add(1);
        current
    }

    /// FNV-1a hash of `input` (stable across runs and platforms, unlike
    /// `DefaultHasher`).
    fn stable_hash(input: &str) -> u64 {
        // FNV-1a 64-bit offset basis.
        let mut hash = 0xcbf2_9ce4_8422_2325_u64;
        for byte in input.as_bytes() {
            hash ^= u64::from(*byte);
            // FNV-1a 64-bit prime.
            hash = hash.wrapping_mul(0x0100_0000_01b3_u64);
        }
        hash
    }

    /// Routes a call id to a lane via stable hashing, so all work for the
    /// same call lands on the same shard.
    fn hash_route(&self, call_id: &str) -> usize {
        if self.lanes.len() <= 1 {
            return 0;
        }
        let lanes = u64::try_from(self.lanes.len()).unwrap_or(1);
        let slot = Self::stable_hash(call_id) % lanes;
        usize::try_from(slot).unwrap_or(0)
    }

    /// Round-robin lane selection for inbound events.
    fn rr_route(&mut self) -> usize {
        if self.lanes.len() <= 1 {
            return 0;
        }
        let idx = self.rr_cursor % self.lanes.len();
        self.rr_cursor = self.rr_cursor.wrapping_add(1);
        idx
    }

    /// Assigns sequence numbers, wraps `task`, and pushes it onto the
    /// chosen lane; reports backpressure when the lane is full (or the
    /// shard id is out of range, reported with depth/capacity 0).
    ///
    /// NOTE(review): global/shard sequence numbers are consumed even when
    /// the push is rejected, so rejected enqueues leave gaps in the
    /// sequence space.
    fn enqueue_with_route(
        &mut self,
        shard_id: usize,
        task: MacrotaskKind,
    ) -> Result<ReactorEnvelope, ReactorBackpressure> {
        let global_seq = self.next_global_seq();
        let shard_seq = self.next_shard_seq(shard_id);
        let envelope = ReactorEnvelope::new(global_seq, shard_seq, shard_id, task);
        let Some(lane) = self.lanes.get_mut(shard_id) else {
            self.rejected_enqueues = self.rejected_enqueues.saturating_add(1);
            return Err(ReactorBackpressure {
                shard_id,
                depth: 0,
                capacity: 0,
            });
        };
        match lane.push(envelope.clone()) {
            Ok(()) => Ok(envelope),
            Err(depth) => {
                self.rejected_enqueues = self.rejected_enqueues.saturating_add(1);
                Err(ReactorBackpressure {
                    shard_id,
                    depth,
                    capacity: lane.capacity,
                })
            }
        }
    }

    /// Enqueues a host-call completion on the shard owned by `call_id`.
    pub fn enqueue_hostcall_complete(
        &mut self,
        call_id: String,
        outcome: HostcallOutcome,
    ) -> Result<ReactorEnvelope, ReactorBackpressure> {
        let shard_id = self.hash_route(&call_id);
        self.enqueue_with_route(
            shard_id,
            MacrotaskKind::HostcallComplete { call_id, outcome },
        )
    }

    /// Enqueues an inbound event on the next round-robin shard.
    pub fn enqueue_event(
        &mut self,
        event_id: String,
        payload: serde_json::Value,
    ) -> Result<ReactorEnvelope, ReactorBackpressure> {
        let shard_id = self.rr_route();
        self.enqueue_with_route(shard_id, MacrotaskKind::InboundEvent { event_id, payload })
    }

    /// Pops up to `budget` envelopes from one lane, in FIFO order.
    pub fn drain_shard(&mut self, shard_id: usize, budget: usize) -> Vec<ReactorEnvelope> {
        let Some(lane) = self.lanes.get_mut(shard_id) else {
            return Vec::new();
        };
        let mut drained = Vec::with_capacity(budget.min(lane.len()));
        for _ in 0..budget {
            let Some(item) = lane.pop() else {
                break;
            };
            drained.push(item);
        }
        drained
    }

    /// Pops up to `budget` envelopes across all lanes in ascending
    /// `global_seq` order (a k-way merge over the lane fronts; each lane
    /// is itself FIFO so its front always holds its smallest seq).
    pub fn drain_global_order(&mut self, budget: usize) -> Vec<ReactorEnvelope> {
        let mut drained = Vec::with_capacity(budget);
        for _ in 0..budget {
            let mut best_lane: Option<usize> = None;
            let mut best_seq: Option<Seq> = None;
            for (idx, lane) in self.lanes.iter().enumerate() {
                let Some(front) = lane.front() else {
                    continue;
                };
                if best_seq.is_none_or(|seq| front.global_seq < seq) {
                    best_seq = Some(front.global_seq);
                    best_lane = Some(idx);
                }
            }
            let Some(chosen_lane) = best_lane else {
                break;
            };
            if let Some(item) = self.lanes[chosen_lane].pop() {
                drained.push(item);
            }
        }
        drained
    }
}
1095
/// Hugepage backing parameters for slab allocations.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct HugepageConfig {
    pub page_size_bytes: usize,
    pub enabled: bool,
}

impl Default for HugepageConfig {
    /// Defaults to 2 MiB pages with hugepage backing enabled.
    fn default() -> Self {
        Self {
            page_size_bytes: 2 * 1024 * 1024,
            enabled: true,
        }
    }
}
1117
/// Why hugepage backing is not active.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HugepageFallbackReason {
    /// Explicitly turned off in configuration.
    Disabled,
    /// Page counters could not be observed at all.
    DetectionUnavailable,
    /// Pages exist but none are currently free.
    InsufficientHugepages,
    /// Slab footprint is not a multiple of the hugepage size.
    AlignmentMismatch,
}

impl HugepageFallbackReason {
    /// Stable machine-readable code for telemetry and logs.
    #[must_use]
    pub const fn as_code(self) -> &'static str {
        match self {
            Self::Disabled => "hugepage_disabled",
            Self::DetectionUnavailable => "detection_unavailable",
            Self::InsufficientHugepages => "insufficient_hugepages",
            Self::AlignmentMismatch => "alignment_mismatch",
        }
    }
}
1142
1143#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
1145pub struct HugepageStatus {
1146 pub total_pages: u64,
1148 pub free_pages: u64,
1150 pub page_size_bytes: usize,
1152 pub active: bool,
1154 pub fallback_reason: Option<HugepageFallbackReason>,
1156}
1157
1158impl HugepageStatus {
1159 #[must_use]
1161 pub const fn evaluate(config: &HugepageConfig, total: u64, free: u64) -> Self {
1162 if !config.enabled {
1163 return Self {
1164 total_pages: total,
1165 free_pages: free,
1166 page_size_bytes: config.page_size_bytes,
1167 active: false,
1168 fallback_reason: Some(HugepageFallbackReason::Disabled),
1169 };
1170 }
1171 if total == 0 && free == 0 {
1172 return Self {
1173 total_pages: 0,
1174 free_pages: 0,
1175 page_size_bytes: config.page_size_bytes,
1176 active: false,
1177 fallback_reason: Some(HugepageFallbackReason::DetectionUnavailable),
1178 };
1179 }
1180 if free == 0 {
1181 return Self {
1182 total_pages: total,
1183 free_pages: 0,
1184 page_size_bytes: config.page_size_bytes,
1185 active: false,
1186 fallback_reason: Some(HugepageFallbackReason::InsufficientHugepages),
1187 };
1188 }
1189 Self {
1190 total_pages: total,
1191 free_pages: free,
1192 page_size_bytes: config.page_size_bytes,
1193 active: true,
1194 fallback_reason: None,
1195 }
1196 }
1197
1198 #[must_use]
1200 pub fn as_json(&self) -> serde_json::Value {
1201 serde_json::json!({
1202 "total_pages": self.total_pages,
1203 "free_pages": self.free_pages,
1204 "page_size_bytes": self.page_size_bytes,
1205 "active": self.active,
1206 "fallback_reason": self.fallback_reason.map(HugepageFallbackReason::as_code),
1207 })
1208 }
1209}
1210
/// Sizing and hugepage parameters for NUMA slab pools.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct NumaSlabConfig {
    /// Slots per node-local slab.
    pub slab_capacity: usize,
    /// Size of each slot in bytes (used for footprint/alignment math).
    pub entry_size_bytes: usize,
    /// Hugepage backing configuration.
    pub hugepage: HugepageConfig,
}

impl Default for NumaSlabConfig {
    /// Defaults: 256 slots of 512 bytes each, with the default hugepage
    /// configuration (2 MiB pages, enabled).
    fn default() -> Self {
        Self {
            slab_capacity: 256,
            entry_size_bytes: 512,
            hugepage: HugepageConfig::default(),
        }
    }
}
1231
1232impl NumaSlabConfig {
1233 #[must_use]
1234 pub const fn slab_footprint_bytes(&self) -> Option<usize> {
1235 self.slab_capacity.checked_mul(self.entry_size_bytes)
1236 }
1237
1238 #[must_use]
1239 pub const fn hugepage_alignment_ok(&self) -> bool {
1240 if !self.hugepage.enabled {
1241 return true;
1242 }
1243 let page = self.hugepage.page_size_bytes;
1244 if page == 0 {
1245 return false;
1246 }
1247 match self.slab_footprint_bytes() {
1248 Some(bytes) => bytes != 0 && bytes % page == 0,
1249 None => false,
1250 }
1251 }
1252
1253 #[must_use]
1254 pub const fn alignment_mismatch_status(&self) -> HugepageStatus {
1255 HugepageStatus {
1256 total_pages: 0,
1257 free_pages: 0,
1258 page_size_bytes: self.hugepage.page_size_bytes,
1259 active: false,
1260 fallback_reason: Some(HugepageFallbackReason::AlignmentMismatch),
1261 }
1262 }
1263}
1264
/// Stable, generation-tagged reference to a slot in a [`NumaSlab`].
///
/// The generation counter lets `deallocate` reject stale handles after a
/// slot has been reused (ABA protection).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct NumaSlabHandle {
    /// NUMA node owning the slab.
    pub node_id: usize,
    /// Slot position within the slab.
    pub slot_index: usize,
    /// Generation the slot had when this handle was issued.
    pub generation: u64,
}

/// Fixed-capacity slot allocator for a single NUMA node.
#[derive(Debug, Clone)]
pub struct NumaSlab {
    node_id: usize,
    capacity: usize,
    /// Per-slot generation counters, bumped on every allocation.
    generations: Vec<u64>,
    /// Per-slot allocated flag.
    allocated: Vec<bool>,
    /// Stack of free slot indices (top = next handed out).
    free_list: Vec<usize>,
    total_allocs: u64,
    total_frees: u64,
    high_water_mark: usize,
}

impl NumaSlab {
    /// Creates a slab for `node_id` with at least one slot.
    #[must_use]
    pub fn new(node_id: usize, capacity: usize) -> Self {
        let capacity = capacity.max(1);
        // Reversed so `free_list.pop()` hands out the lowest index first.
        let free_list: Vec<usize> = (0..capacity).rev().collect();
        Self {
            node_id,
            capacity,
            generations: vec![0; capacity],
            allocated: vec![false; capacity],
            free_list,
            total_allocs: 0,
            total_frees: 0,
            high_water_mark: 0,
        }
    }

    /// Number of slots currently allocated.
    #[must_use]
    pub fn in_use(&self) -> usize {
        self.capacity.saturating_sub(self.free_list.len())
    }

    /// `true` while at least one slot is free.
    #[must_use]
    pub fn has_capacity(&self) -> bool {
        !self.free_list.is_empty()
    }

    /// Claims a free slot, bumping its generation; `None` when full.
    pub fn allocate(&mut self) -> Option<NumaSlabHandle> {
        let slot_index = self.free_list.pop()?;
        self.allocated[slot_index] = true;
        self.generations[slot_index] = self.generations[slot_index].saturating_add(1);
        self.total_allocs = self.total_allocs.saturating_add(1);
        self.high_water_mark = self.high_water_mark.max(self.in_use());
        Some(NumaSlabHandle {
            node_id: self.node_id,
            slot_index,
            generation: self.generations[slot_index],
        })
    }

    /// Releases the slot named by `handle`.
    ///
    /// Returns `false` (and does nothing) when the handle belongs to a
    /// different node, is out of range, targets an already-free slot, or
    /// carries a stale generation — making double-frees and ABA reuse
    /// harmless.
    pub fn deallocate(&mut self, handle: &NumaSlabHandle) -> bool {
        if handle.node_id != self.node_id {
            return false;
        }
        if handle.slot_index >= self.capacity {
            return false;
        }
        if !self.allocated[handle.slot_index] {
            return false;
        }
        if self.generations[handle.slot_index] != handle.generation {
            return false;
        }
        self.allocated[handle.slot_index] = false;
        self.free_list.push(handle.slot_index);
        self.total_frees = self.total_frees.saturating_add(1);
        true
    }
}
1364
/// Why an allocation was served by a non-preferred NUMA node.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CrossNodeReason {
    /// The preferred node's slab had no free slots.
    LocalExhausted,
}

impl CrossNodeReason {
    /// Stable machine-readable code for telemetry and logs.
    #[must_use]
    pub const fn as_code(self) -> &'static str {
        match self {
            Self::LocalExhausted => "local_exhausted",
        }
    }
}
1380
/// Per-NUMA-node slab allocator pool with cross-node spill.
#[derive(Debug, Clone)]
pub struct NumaSlabPool {
    /// One slab per distinct NUMA node, in ascending node-id order.
    slabs: Vec<NumaSlab>,
    /// Sizing/hugepage config the pool was built with.
    config: NumaSlabConfig,
    /// Most recently derived hugepage status.
    hugepage_status: HugepageStatus,
    /// Allocations that had to spill to a non-preferred node.
    cross_node_allocs: u64,
    /// Allocations made while hugepage backing was reported active.
    hugepage_backed_allocs: u64,
}
1390
impl NumaSlabPool {
    /// Builds one slab per distinct NUMA node named in `manifest`
    /// (defaulting to a single node 0 when there are no bindings), and
    /// derives an initial hugepage status from `config`.
    #[must_use]
    pub fn from_manifest(manifest: &ReactorPlacementManifest, config: NumaSlabConfig) -> Self {
        let mut node_ids = manifest
            .bindings
            .iter()
            .map(|b| b.numa_node)
            .collect::<Vec<_>>();
        node_ids.sort_unstable();
        node_ids.dedup();

        if node_ids.is_empty() {
            node_ids.push(0);
        }

        let slabs = node_ids
            .iter()
            .map(|&node_id| NumaSlab::new(node_id, config.slab_capacity))
            .collect();

        // Alignment problems take precedence; otherwise start from an
        // unobserved (0/0 => detection-unavailable) evaluation.
        let hugepage_status = if config.hugepage.enabled && !config.hugepage_alignment_ok() {
            config.alignment_mismatch_status()
        } else {
            HugepageStatus::evaluate(&config.hugepage, 0, 0)
        };

        Self {
            slabs,
            config,
            hugepage_status,
            cross_node_allocs: 0,
            hugepage_backed_allocs: 0,
        }
    }

    /// Installs an externally observed hugepage status, overriding it when
    /// the config disables hugepages or the slab footprint is misaligned.
    ///
    /// NOTE(review): when hugepages are enabled and aligned, `status` is
    /// stored verbatim without re-running `HugepageStatus::evaluate`;
    /// confirm callers pass already-evaluated statuses.
    pub const fn set_hugepage_status(&mut self, status: HugepageStatus) {
        self.hugepage_status = if !self.config.hugepage.enabled {
            HugepageStatus::evaluate(&self.config.hugepage, status.total_pages, status.free_pages)
        } else if !self.config.hugepage_alignment_ok() {
            self.config.alignment_mismatch_status()
        } else {
            status
        };
    }

    /// Number of NUMA nodes with a slab in this pool.
    #[must_use]
    pub fn node_count(&self) -> usize {
        self.slabs.len()
    }

    /// Allocates a slot, preferring `preferred_node`'s slab and spilling
    /// to any other node when it is exhausted (reported as
    /// `CrossNodeReason::LocalExhausted`). Returns `None` when every slab
    /// is full. Hugepage-backed allocations are counted while the status
    /// is active.
    pub fn allocate(
        &mut self,
        preferred_node: usize,
    ) -> Option<(NumaSlabHandle, Option<CrossNodeReason>)> {
        if let Some(slab) = self.slabs.iter_mut().find(|s| s.node_id == preferred_node) {
            if let Some(handle) = slab.allocate() {
                if self.hugepage_status.active {
                    self.hugepage_backed_allocs = self.hugepage_backed_allocs.saturating_add(1);
                }
                return Some((handle, None));
            }
        }
        // Local slab missing or full: spill to the first other node with
        // free capacity, in ascending node order.
        for slab in &mut self.slabs {
            if slab.node_id == preferred_node {
                continue;
            }
            if let Some(handle) = slab.allocate() {
                self.cross_node_allocs = self.cross_node_allocs.saturating_add(1);
                if self.hugepage_status.active {
                    self.hugepage_backed_allocs = self.hugepage_backed_allocs.saturating_add(1);
                }
                return Some((handle, Some(CrossNodeReason::LocalExhausted)));
            }
        }
        None
    }

    /// Returns the slot to its owning node's slab; `false` when the handle
    /// is foreign, stale, or already freed.
    pub fn deallocate(&mut self, handle: &NumaSlabHandle) -> bool {
        for slab in &mut self.slabs {
            if slab.node_id == handle.node_id {
                return slab.deallocate(handle);
            }
        }
        false
    }

    /// Snapshot of per-node counters plus pool-wide hugepage/cross-node
    /// statistics.
    #[must_use]
    pub fn telemetry(&self) -> NumaSlabTelemetry {
        let per_node = self
            .slabs
            .iter()
            .map(|slab| NumaSlabNodeTelemetry {
                node_id: slab.node_id,
                capacity: slab.capacity,
                in_use: slab.in_use(),
                total_allocs: slab.total_allocs,
                total_frees: slab.total_frees,
                high_water_mark: slab.high_water_mark,
            })
            .collect();
        NumaSlabTelemetry {
            per_node,
            cross_node_allocs: self.cross_node_allocs,
            hugepage_backed_allocs: self.hugepage_backed_allocs,
            hugepage_status: self.hugepage_status,
            config: self.config,
        }
    }
}
1515
/// Per-node counters captured inside a [`NumaSlabTelemetry`] snapshot.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct NumaSlabNodeTelemetry {
    // NUMA node this slab serves.
    pub node_id: usize,
    // Maximum number of entries the slab can hold.
    pub capacity: usize,
    // Entries currently allocated.
    pub in_use: usize,
    // Lifetime allocation count.
    pub total_allocs: u64,
    // Lifetime free count.
    pub total_frees: u64,
    // Peak simultaneous `in_use` observed.
    pub high_water_mark: usize,
}
1526
/// Aggregated allocator telemetry: per-node rows plus global tallies.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NumaSlabTelemetry {
    // One row per NUMA node slab, in slab declaration order.
    pub per_node: Vec<NumaSlabNodeTelemetry>,
    // Allocations that spilled to a non-preferred node.
    pub cross_node_allocs: u64,
    // Allocations made while hugepage backing was active.
    pub hugepage_backed_allocs: u64,
    // Hugepage status at snapshot time.
    pub hugepage_status: HugepageStatus,
    // Allocator configuration at snapshot time.
    pub config: NumaSlabConfig,
}
1536
/// Derived metrics and JSON rendering for a [`NumaSlabTelemetry`] snapshot.
impl NumaSlabTelemetry {
    // Ratios are reported in basis points: 10_000 == 100%.
    const RATIO_SCALE_BPS: u64 = 10_000;

    /// Computes `numerator / denominator` scaled to basis points.
    ///
    /// Returns 0 for a zero denominator. Intermediate math is widened to
    /// `u128` so the scaling multiply cannot overflow; a narrowing failure
    /// (only possible if numerator vastly exceeds denominator) clamps to
    /// 100% (`RATIO_SCALE_BPS`).
    #[must_use]
    fn ratio_basis_points(numerator: u64, denominator: u64) -> u64 {
        if denominator == 0 {
            return 0;
        }
        let scaled =
            (u128::from(numerator) * u128::from(Self::RATIO_SCALE_BPS)) / u128::from(denominator);
        u64::try_from(scaled).unwrap_or(Self::RATIO_SCALE_BPS)
    }

    /// Buckets a basis-point value into "low" / "medium" / "high"
    /// (thresholds: 25% and 75%).
    #[must_use]
    const fn pressure_band(value_bps: u64) -> &'static str {
        if value_bps >= 7_500 {
            "high"
        } else if value_bps >= 2_500 {
            "medium"
        } else {
            "low"
        }
    }

    /// Renders the snapshot as a JSON document with stable keys: raw
    /// counters, derived ratios in basis points, pressure bands, fallback
    /// reasons, a config echo, the hugepage status, and per-node rows.
    #[must_use]
    pub fn as_json(&self) -> serde_json::Value {
        let total_allocs: u64 = self.per_node.iter().map(|n| n.total_allocs).sum();
        let total_frees: u64 = self.per_node.iter().map(|n| n.total_frees).sum();
        let total_in_use: usize = self.per_node.iter().map(|n| n.in_use).sum();
        let total_capacity: usize = self.per_node.iter().map(|n| n.capacity).sum();
        let total_high_water: usize = self.per_node.iter().map(|n| n.high_water_mark).sum();
        // Clamp so derived ratios stay within [0, 100%] even if the global
        // counters ever drift from the per-node sums.
        let remote_allocs = self.cross_node_allocs.min(total_allocs);
        let local_allocs = total_allocs.saturating_sub(remote_allocs);
        let local_ratio_bps = Self::ratio_basis_points(local_allocs, total_allocs);
        let remote_ratio_bps = Self::ratio_basis_points(remote_allocs, total_allocs);
        let hugepage_backed_allocs = self.hugepage_backed_allocs.min(total_allocs);
        let hugepage_hit_rate_bps = Self::ratio_basis_points(hugepage_backed_allocs, total_allocs);
        let total_capacity_u64 = u64::try_from(total_capacity).unwrap_or(u64::MAX);
        let total_in_use_u64 = u64::try_from(total_in_use).unwrap_or(u64::MAX);
        let total_high_water_u64 = u64::try_from(total_high_water).unwrap_or(u64::MAX);
        let occupancy_pressure_bps = Self::ratio_basis_points(total_in_use_u64, total_capacity_u64);
        let cache_miss_pressure_bps =
            Self::ratio_basis_points(total_high_water_u64, total_capacity_u64);
        // The remote-allocation ratio doubles as the TLB-miss proxy; these
        // "latency proxies" are heuristics, not measured hardware counters.
        let tlb_miss_pressure_bps = remote_ratio_bps;
        let cross_node_fallback_reason = if self.cross_node_allocs > 0 {
            Some(CrossNodeReason::LocalExhausted.as_code())
        } else {
            None
        };
        serde_json::json!({
            "node_count": self.per_node.len(),
            "total_allocs": total_allocs,
            "total_frees": total_frees,
            "total_in_use": total_in_use,
            "cross_node_allocs": self.cross_node_allocs,
            "hugepage_backed_allocs": hugepage_backed_allocs,
            "local_allocs": local_allocs,
            "remote_allocs": remote_allocs,
            "allocation_ratio_bps": {
                "scale": Self::RATIO_SCALE_BPS,
                "local": local_ratio_bps,
                "remote": remote_ratio_bps,
            },
            "hugepage_hit_rate_bps": {
                "scale": Self::RATIO_SCALE_BPS,
                "value": hugepage_hit_rate_bps,
            },
            "latency_proxies_bps": {
                "scale": Self::RATIO_SCALE_BPS,
                "tlb_miss_pressure": tlb_miss_pressure_bps,
                "cache_miss_pressure": cache_miss_pressure_bps,
                "occupancy_pressure": occupancy_pressure_bps,
            },
            "pressure_bands": {
                "tlb_miss": Self::pressure_band(tlb_miss_pressure_bps),
                "cache_miss": Self::pressure_band(cache_miss_pressure_bps),
                "occupancy": Self::pressure_band(occupancy_pressure_bps),
            },
            "fallback_reasons": {
                "cross_node": cross_node_fallback_reason,
                "hugepage": self.hugepage_status.fallback_reason.map(HugepageFallbackReason::as_code),
            },
            "config": {
                "slab_capacity": self.config.slab_capacity,
                "entry_size_bytes": self.config.entry_size_bytes,
                "hugepage_enabled": self.config.hugepage.enabled,
                "hugepage_page_size_bytes": self.config.hugepage.page_size_bytes,
            },
            "hugepage": self.hugepage_status.as_json(),
            "per_node": self.per_node.iter().map(|n| serde_json::json!({
                "node_id": n.node_id,
                "capacity": n.capacity,
                "in_use": n.in_use,
                "total_allocs": n.total_allocs,
                "total_frees": n.total_frees,
                "high_water_mark": n.high_water_mark,
            })).collect::<Vec<_>>(),
        })
    }
}
1639
/// How strongly a thread-affinity recommendation should be applied.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AffinityEnforcement {
    /// Recommendation only; the runtime may ignore it.
    Advisory,
    /// Recommendation must be honored.
    Strict,
    /// Affinity management is turned off.
    Disabled,
}
1654
1655impl AffinityEnforcement {
1656 #[must_use]
1657 pub const fn as_code(self) -> &'static str {
1658 match self {
1659 Self::Advisory => "advisory",
1660 Self::Strict => "strict",
1661 Self::Disabled => "disabled",
1662 }
1663 }
1664}
1665
/// A per-shard CPU/NUMA pinning recommendation derived from the placement
/// manifest.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ThreadAffinityAdvice {
    // Shard this advice applies to.
    pub shard_id: usize,
    // Core the shard's thread should be pinned to.
    pub recommended_core: usize,
    // NUMA node hosting that core.
    pub recommended_numa_node: usize,
    // How strictly the recommendation should be applied.
    pub enforcement: AffinityEnforcement,
}
1674
1675impl ThreadAffinityAdvice {
1676 #[must_use]
1678 pub fn as_json(&self) -> serde_json::Value {
1679 serde_json::json!({
1680 "shard_id": self.shard_id,
1681 "recommended_core": self.recommended_core,
1682 "recommended_numa_node": self.recommended_numa_node,
1683 "enforcement": self.enforcement.as_code(),
1684 })
1685 }
1686}
1687
1688impl ReactorPlacementManifest {
1689 #[must_use]
1691 pub fn affinity_advice(&self, enforcement: AffinityEnforcement) -> Vec<ThreadAffinityAdvice> {
1692 self.bindings
1693 .iter()
1694 .map(|binding| ThreadAffinityAdvice {
1695 shard_id: binding.shard_id,
1696 recommended_core: binding.core_id,
1697 recommended_numa_node: binding.numa_node,
1698 enforcement,
1699 })
1700 .collect()
1701 }
1702
1703 #[must_use]
1705 pub fn numa_node_for_shard(&self, shard_id: usize) -> Option<usize> {
1706 self.bindings
1707 .iter()
1708 .find(|b| b.shard_id == shard_id)
1709 .map(|b| b.numa_node)
1710 }
1711}
1712
1713impl ReactorMesh {
1718 #[must_use]
1720 pub fn preferred_numa_node(&self, shard_id: usize) -> usize {
1721 self.placement_manifest
1722 .numa_node_for_shard(shard_id)
1723 .unwrap_or(0)
1724 }
1725
1726 #[must_use]
1728 pub fn affinity_advice(&self, enforcement: AffinityEnforcement) -> Vec<ThreadAffinityAdvice> {
1729 self.placement_manifest.affinity_advice(enforcement)
1730 }
1731}
1732
/// Manual `Debug` for `Scheduler`: reports queue/heap sizes rather than their
/// contents, and requires only `C: Clock` (no `C: Debug` bound).
impl<C: Clock> fmt::Debug for Scheduler<C> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Scheduler")
            .field("seq", &self.seq)
            .field("macrotask_count", &self.macrotask_queue.len())
            .field("timer_count", &self.timer_heap.len())
            .field("next_timer_id", &self.next_timer_id)
            .field("cancelled_timers", &self.cancelled_timers.len())
            // finish_non_exhaustive: clock and queue contents intentionally
            // omitted from the summary.
            .finish_non_exhaustive()
    }
}
1744
1745#[cfg(test)]
1746mod tests {
1747 use super::*;
1748
    // Seq values produced by next() are strictly increasing from zero.
    #[test]
    fn seq_ordering() {
        let a = Seq::zero();
        let b = a.next();
        let c = b.next();

        assert!(a < b);
        assert!(b < c);
        assert_eq!(a.value(), 0);
        assert_eq!(b.value(), 1);
        assert_eq!(c.value(), 2);
    }
1761
    // Seq::next uses saturating_add, so u64::MAX is a fixed point.
    #[test]
    fn seq_next_saturates_at_u64_max() {
        let max = Seq(u64::MAX);
        assert_eq!(max.next(), max);
    }
1767
1768 #[test]
1769 fn timer_ordering() {
1770 let t1 = TimerEntry::new(1, 100, Seq(0));
1772 let t2 = TimerEntry::new(2, 200, Seq(1));
1773
1774 assert!(t1 > t2); let t3 = TimerEntry::new(3, 100, Seq(5));
1778 let t4 = TimerEntry::new(4, 100, Seq(10));
1779
1780 assert!(t3 > t4); }
1782
    // DeterministicClock supports explicit advance and absolute set.
    #[test]
    fn deterministic_clock() {
        let clock = DeterministicClock::new(1000);
        assert_eq!(clock.now_ms(), 1000);

        clock.advance(500);
        assert_eq!(clock.now_ms(), 1500);

        clock.set(2000);
        assert_eq!(clock.now_ms(), 2000);
    }
1794
    // A timer does not fire before its deadline and fires once time passes it.
    #[test]
    fn scheduler_basic_timer() {
        let clock = DeterministicClock::new(0);
        let mut sched = Scheduler::with_clock(clock);

        let timer_id = sched.set_timeout(100);
        // Timer ids start at 1.
        assert_eq!(timer_id, 1);
        assert_eq!(sched.timer_count(), 1);
        assert!(!sched.macrotask_queue.is_empty() || sched.timer_count() > 0);

        // Deadline not reached yet: tick yields nothing.
        let task = sched.tick();
        assert!(task.is_none());

        sched.clock.advance(150);
        let task = sched.tick();
        assert!(task.is_some());
        match task.unwrap().kind {
            MacrotaskKind::TimerFired { timer_id: id } => assert_eq!(id, timer_id),
            other => unreachable!("Expected TimerFired, got {other:?}"),
        }
    }
1819
    // After handing out u64::MAX, timer ids wrap back to 1 (0 is skipped).
    #[test]
    fn scheduler_timer_id_wraps_after_u64_max() {
        let clock = DeterministicClock::new(0);
        let mut sched = Scheduler::with_clock(clock);
        sched.next_timer_id = u64::MAX;

        let first = sched.set_timeout(10);
        let second = sched.set_timeout(20);

        assert_eq!(first, u64::MAX);
        assert_eq!(second, 1);
    }
1832
    // Cancellation still works for both the pre-wrap and post-wrap timer ids.
    #[test]
    fn scheduler_timer_id_wrap_preserves_cancellation_semantics() {
        let clock = DeterministicClock::new(0);
        let mut sched = Scheduler::with_clock(clock);
        sched.next_timer_id = u64::MAX;

        let max_id = sched.set_timeout(10);
        let wrapped_id = sched.set_timeout(20);

        assert_eq!(max_id, u64::MAX);
        assert_eq!(wrapped_id, 1);
        assert!(sched.clear_timeout(max_id));
        assert!(sched.clear_timeout(wrapped_id));

        // Both timers are due but cancelled: nothing may fire.
        sched.clock.advance(25);
        assert!(sched.tick().is_none());
        assert!(sched.tick().is_none());
    }
1851
    // Due timers fire in deadline order regardless of registration order.
    #[test]
    fn scheduler_timer_ordering() {
        let clock = DeterministicClock::new(0);
        let mut sched = Scheduler::with_clock(clock);

        // Registered out of deadline order on purpose.
        let t3 = sched.set_timeout(300);
        let t1 = sched.set_timeout(100);
        let t2 = sched.set_timeout(200);

        sched.clock.advance(400);

        let task1 = sched.tick().unwrap();
        let task2 = sched.tick().unwrap();
        let task3 = sched.tick().unwrap();

        match task1.kind {
            MacrotaskKind::TimerFired { timer_id } => assert_eq!(timer_id, t1),
            other => unreachable!("Expected t1, got {other:?}"),
        }
        match task2.kind {
            MacrotaskKind::TimerFired { timer_id } => assert_eq!(timer_id, t2),
            other => unreachable!("Expected t2, got {other:?}"),
        }
        match task3.kind {
            MacrotaskKind::TimerFired { timer_id } => assert_eq!(timer_id, t3),
            other => unreachable!("Expected t3, got {other:?}"),
        }
    }
1883
    // Timers sharing a deadline fire in registration (seq) order.
    #[test]
    fn scheduler_same_deadline_seq_ordering() {
        let clock = DeterministicClock::new(0);
        let mut sched = Scheduler::with_clock(clock);

        let t1 = sched.set_timeout(100);
        let t2 = sched.set_timeout(100);
        let t3 = sched.set_timeout(100);

        sched.clock.advance(150);

        let task1 = sched.tick().unwrap();
        let task2 = sched.tick().unwrap();
        let task3 = sched.tick().unwrap();

        match task1.kind {
            MacrotaskKind::TimerFired { timer_id } => assert_eq!(timer_id, t1),
            other => unreachable!("Expected t1, got {other:?}"),
        }
        match task2.kind {
            MacrotaskKind::TimerFired { timer_id } => assert_eq!(timer_id, t2),
            other => unreachable!("Expected t2, got {other:?}"),
        }
        match task3.kind {
            MacrotaskKind::TimerFired { timer_id } => assert_eq!(timer_id, t3),
            other => unreachable!("Expected t3, got {other:?}"),
        }
    }
1914
    // A cancelled timer never fires; the surviving one still does.
    #[test]
    fn scheduler_cancel_timer() {
        let clock = DeterministicClock::new(0);
        let mut sched = Scheduler::with_clock(clock);

        let t1 = sched.set_timeout(100);
        let t2 = sched.set_timeout(200);

        assert!(sched.clear_timeout(t1));

        sched.clock.advance(250);

        let task = sched.tick().unwrap();
        match task.kind {
            MacrotaskKind::TimerFired { timer_id } => assert_eq!(timer_id, t2),
            other => unreachable!("Expected t2, got {other:?}"),
        }

        assert!(sched.tick().is_none());
    }
1939
    // A hostcall completion is dequeued with its call id and payload intact.
    #[test]
    fn scheduler_hostcall_completion() {
        let clock = DeterministicClock::new(0);
        let mut sched = Scheduler::with_clock(clock);

        sched.enqueue_hostcall_complete(
            "call-1".to_string(),
            HostcallOutcome::Success(serde_json::json!({"result": 42})),
        );

        let task = sched.tick().unwrap();
        match task.kind {
            MacrotaskKind::HostcallComplete { call_id, outcome } => {
                assert_eq!(call_id, "call-1");
                match outcome {
                    HostcallOutcome::Success(v) => assert_eq!(v["result"], 42),
                    other => unreachable!("Expected success, got {other:?}"),
                }
            }
            other => unreachable!("Expected HostcallComplete, got {other:?}"),
        }
    }
1962
    // Stream chunks for one call drain in enqueue order with monotonically
    // increasing sequence numbers and exactly one final chunk, which is last.
    #[test]
    fn scheduler_stream_chunk_sequence_and_finality_invariants() {
        let clock = DeterministicClock::new(0);
        let mut sched = Scheduler::with_clock(clock);

        sched.enqueue_stream_chunk(
            "call-stream".to_string(),
            0,
            serde_json::json!({ "part": "a" }),
            false,
        );
        sched.enqueue_stream_chunk(
            "call-stream".to_string(),
            1,
            serde_json::json!({ "part": "b" }),
            false,
        );
        sched.enqueue_stream_chunk(
            "call-stream".to_string(),
            2,
            serde_json::json!({ "part": "c" }),
            true,
        );

        let mut seen = Vec::new();
        while let Some(task) = sched.tick() {
            let MacrotaskKind::HostcallComplete { call_id, outcome } = task.kind else {
                unreachable!("expected hostcall completion task");
            };
            let HostcallOutcome::StreamChunk {
                sequence,
                chunk,
                is_final,
            } = outcome
            else {
                unreachable!("expected stream chunk outcome");
            };
            seen.push((call_id, sequence, chunk, is_final));
        }

        assert_eq!(seen.len(), 3);
        assert!(
            seen.iter()
                .all(|(call_id, _, _, _)| call_id == "call-stream")
        );
        assert_eq!(seen[0].1, 0);
        assert_eq!(seen[1].1, 1);
        assert_eq!(seen[2].1, 2);
        assert_eq!(seen[0].2, serde_json::json!({ "part": "a" }));
        assert_eq!(seen[1].2, serde_json::json!({ "part": "b" }));
        assert_eq!(seen[2].2, serde_json::json!({ "part": "c" }));

        let final_count = seen.iter().filter(|(_, _, _, is_final)| *is_final).count();
        assert_eq!(final_count, 1, "expected exactly one final chunk");
        assert!(seen[2].3, "final chunk must be last");
    }
2019
    // Chunks from two interleaved calls drain in global enqueue order (FIFO),
    // not grouped by call.
    #[test]
    fn scheduler_stream_chunks_multi_call_interleaving_is_deterministic() {
        let clock = DeterministicClock::new(0);
        let mut sched = Scheduler::with_clock(clock);

        sched.enqueue_stream_chunk("call-a".to_string(), 0, serde_json::json!("a0"), false);
        sched.enqueue_stream_chunk("call-b".to_string(), 0, serde_json::json!("b0"), false);
        sched.enqueue_stream_chunk("call-a".to_string(), 1, serde_json::json!("a1"), true);
        sched.enqueue_stream_chunk("call-b".to_string(), 1, serde_json::json!("b1"), true);

        let mut trace = Vec::new();
        while let Some(task) = sched.tick() {
            let MacrotaskKind::HostcallComplete { call_id, outcome } = task.kind else {
                unreachable!("expected hostcall completion task");
            };
            let HostcallOutcome::StreamChunk {
                sequence, is_final, ..
            } = outcome
            else {
                unreachable!("expected stream chunk outcome");
            };
            trace.push((call_id, sequence, is_final));
        }

        assert_eq!(
            trace,
            vec![
                ("call-a".to_string(), 0, false),
                ("call-b".to_string(), 0, false),
                ("call-a".to_string(), 1, true),
                ("call-b".to_string(), 1, true),
            ]
        );
    }
2054
    // Inbound events drain in FIFO enqueue order.
    #[test]
    fn scheduler_event_ordering() {
        let clock = DeterministicClock::new(0);
        let mut sched = Scheduler::with_clock(clock);

        sched.enqueue_event("evt-1".to_string(), serde_json::json!({"n": 1}));
        sched.enqueue_event("evt-2".to_string(), serde_json::json!({"n": 2}));

        let task1 = sched.tick().unwrap();
        let task2 = sched.tick().unwrap();

        match task1.kind {
            MacrotaskKind::InboundEvent { event_id, .. } => assert_eq!(event_id, "evt-1"),
            other => unreachable!("Expected evt-1, got {other:?}"),
        }
        match task2.kind {
            MacrotaskKind::InboundEvent { event_id, .. } => assert_eq!(event_id, "evt-2"),
            other => unreachable!("Expected evt-2, got {other:?}"),
        }
    }
2077
    // An already-queued macrotask runs before a timer that became due later.
    #[test]
    fn scheduler_mixed_tasks_ordering() {
        let clock = DeterministicClock::new(0);
        let mut sched = Scheduler::with_clock(clock);

        let _t1 = sched.set_timeout(50);

        sched.enqueue_event("evt-1".to_string(), serde_json::json!({}));

        // Timer is now due, but the event was enqueued first.
        sched.clock.advance(100);

        let task1 = sched.tick().unwrap();
        match task1.kind {
            MacrotaskKind::InboundEvent { event_id, .. } => assert_eq!(event_id, "evt-1"),
            other => unreachable!("Expected event first, got {other:?}"),
        }

        let task2 = sched.tick().unwrap();
        match task2.kind {
            MacrotaskKind::TimerFired { .. } => {}
            other => unreachable!("Expected timer second, got {other:?}"),
        }
    }
2106
    // Each tick drains exactly one macrotask from the queue.
    #[test]
    fn scheduler_invariant_single_macrotask_per_tick() {
        let clock = DeterministicClock::new(0);
        let mut sched = Scheduler::with_clock(clock);

        sched.enqueue_event("evt-1".to_string(), serde_json::json!({}));
        sched.enqueue_event("evt-2".to_string(), serde_json::json!({}));
        sched.enqueue_event("evt-3".to_string(), serde_json::json!({}));

        assert!(sched.tick().is_some());
        assert_eq!(sched.macrotask_count(), 2);

        assert!(sched.tick().is_some());
        assert_eq!(sched.macrotask_count(), 1);

        assert!(sched.tick().is_some());
        assert_eq!(sched.macrotask_count(), 0);

        assert!(sched.tick().is_none());
    }
2128
    // next_timer_deadline reports the earliest absolute deadline, and
    // time_until_next_timer reports the relative remainder as time advances.
    #[test]
    fn scheduler_next_timer_deadline() {
        let clock = DeterministicClock::new(0);
        let mut sched = Scheduler::with_clock(clock);

        assert!(sched.next_timer_deadline().is_none());

        sched.set_timeout(200);
        sched.set_timeout(100);
        sched.set_timeout(300);

        assert_eq!(sched.next_timer_deadline(), Some(100));
        assert_eq!(sched.time_until_next_timer(), Some(100));

        sched.clock.advance(50);
        assert_eq!(sched.time_until_next_timer(), Some(50));
    }
2146
    // Deadline queries look past cancelled timers to the next live one.
    #[test]
    fn scheduler_next_timer_skips_cancelled_timers() {
        let clock = DeterministicClock::new(0);
        let mut sched = Scheduler::with_clock(clock);

        let t1 = sched.set_timeout(100);
        let _t2 = sched.set_timeout(200);
        let _t3 = sched.set_timeout(300);

        assert!(sched.clear_timeout(t1));
        assert_eq!(sched.next_timer_deadline(), Some(200));
        assert_eq!(sched.time_until_next_timer(), Some(200));
    }
2160
    // The manual Debug impl emits the struct name and the seq field.
    #[test]
    fn scheduler_debug_format() {
        let clock = DeterministicClock::new(0);
        let sched = Scheduler::with_clock(clock);
        let debug = format!("{sched:?}");
        assert!(debug.contains("Scheduler"));
        assert!(debug.contains("seq"));
    }
2169
2170 #[derive(Debug, Clone)]
2171 struct XorShift64 {
2172 state: u64,
2173 }
2174
2175 impl XorShift64 {
2176 const fn new(seed: u64) -> Self {
2177 let seed = seed ^ 0x9E37_79B9_7F4A_7C15;
2179 Self { state: seed }
2180 }
2181
2182 fn next_u64(&mut self) -> u64 {
2183 let mut x = self.state;
2184 x ^= x << 13;
2185 x ^= x >> 7;
2186 x ^= x << 17;
2187 self.state = x;
2188 x
2189 }
2190
2191 fn next_range_u64(&mut self, upper_exclusive: u64) -> u64 {
2192 if upper_exclusive == 0 {
2193 return 0;
2194 }
2195 self.next_u64() % upper_exclusive
2196 }
2197
2198 fn next_usize(&mut self, upper_exclusive: usize) -> usize {
2199 let upper = u64::try_from(upper_exclusive).expect("usize fits in u64");
2200 let value = self.next_range_u64(upper);
2201 usize::try_from(value).expect("value < upper_exclusive")
2202 }
2203 }
2204
    // Renders a macrotask as a compact, stable trace string
    // ("seq=N:kind:..."), used to compare scheduler runs for determinism.
    fn trace_entry(task: &Macrotask) -> String {
        match &task.kind {
            MacrotaskKind::TimerFired { timer_id } => {
                format!("seq={}:timer:{timer_id}", task.seq.value())
            }
            MacrotaskKind::HostcallComplete { call_id, outcome } => {
                let outcome_tag = match outcome {
                    HostcallOutcome::Success(_) => "ok",
                    HostcallOutcome::Error { .. } => "err",
                    HostcallOutcome::StreamChunk { is_final, .. } => {
                        if *is_final {
                            "stream_final"
                        } else {
                            "chunk"
                        }
                    }
                };
                format!("seq={}:hostcall:{call_id}:{outcome_tag}", task.seq.value())
            }
            MacrotaskKind::InboundEvent { event_id, payload } => {
                format!(
                    "seq={}:event:{event_id}:payload={payload}",
                    task.seq.value()
                )
            }
        }
    }
2232
    // Drives a scheduler through 256 seeded-random operations (set/cancel
    // timers, hostcalls, events, clock advances, occasional ticks), then
    // drains everything, returning the full trace for determinism checks.
    fn run_seeded_script(seed: u64) -> Vec<String> {
        let clock = DeterministicClock::new(0);
        let mut sched = Scheduler::with_clock(clock);
        let mut rng = XorShift64::new(seed);
        let mut timers = Vec::new();
        let mut trace = Vec::new();

        for step in 0..256u64 {
            // One random operation per step; arms 5 is a deliberate no-op.
            match rng.next_range_u64(6) {
                0 => {
                    let delay_ms = rng.next_range_u64(250);
                    let timer_id = sched.set_timeout(delay_ms);
                    timers.push(timer_id);
                }
                1 => {
                    if !timers.is_empty() {
                        let idx = rng.next_usize(timers.len());
                        let _cancelled = sched.clear_timeout(timers[idx]);
                    }
                }
                2 => {
                    let call_id = format!("call-{step}-{}", rng.next_u64());
                    let outcome = HostcallOutcome::Success(serde_json::json!({ "step": step }));
                    sched.enqueue_hostcall_complete(call_id, outcome);
                }
                3 => {
                    let event_id = format!("evt-{step}");
                    let payload = serde_json::json!({ "step": step, "entropy": rng.next_u64() });
                    sched.enqueue_event(event_id, payload);
                }
                4 => {
                    let delta_ms = rng.next_range_u64(50);
                    sched.clock.advance(delta_ms);
                }
                _ => {}
            }

            // Roughly every third step, drain one task into the trace.
            if rng.next_range_u64(3) == 0 {
                if let Some(task) = sched.tick() {
                    trace.push(trace_entry(&task));
                }
            }
        }

        // Drain phase: tick until idle, jumping the clock to each next
        // deadline; the 10_000 cap guards against a non-terminating bug.
        for _ in 0..10_000 {
            if let Some(task) = sched.tick() {
                trace.push(trace_entry(&task));
                continue;
            }

            let Some(next_deadline) = sched.next_timer_deadline() else {
                break;
            };

            let now = sched.now_ms();
            assert!(
                next_deadline > now,
                "expected future timer deadline (deadline={next_deadline}, now={now})"
            );
            sched.clock.set(next_deadline);
        }

        trace
    }
2298
    // Replaying the same seeded script twice must yield identical traces.
    #[test]
    fn scheduler_seeded_trace_is_deterministic() {
        for seed in [0_u64, 1, 2, 3, 0xDEAD_BEEF] {
            let a = run_seeded_script(seed);
            let b = run_seeded_script(seed);
            assert_eq!(a, b, "trace mismatch for seed={seed}");
        }
    }
2307
    // Display renders as "seq:<value>".
    #[test]
    fn seq_display_format() {
        assert_eq!(format!("{}", Seq::zero()), "seq:0");
        assert_eq!(format!("{}", Seq::zero().next()), "seq:1");
    }
2315
    // A fresh scheduler reports no pending work of any kind.
    #[test]
    fn empty_scheduler_has_no_pending() {
        let sched = Scheduler::with_clock(DeterministicClock::new(0));
        assert!(!sched.has_pending());
        assert_eq!(sched.macrotask_count(), 0);
        assert_eq!(sched.timer_count(), 0);
    }
2325
    // A live timer alone is enough for has_pending.
    #[test]
    fn has_pending_with_timer_only() {
        let mut sched = Scheduler::with_clock(DeterministicClock::new(0));
        sched.set_timeout(100);
        assert!(sched.has_pending());
        assert_eq!(sched.macrotask_count(), 0);
        assert_eq!(sched.timer_count(), 1);
    }
2334
    // A queued macrotask alone is enough for has_pending.
    #[test]
    fn has_pending_with_macrotask_only() {
        let mut sched = Scheduler::with_clock(DeterministicClock::new(0));
        sched.enqueue_event("e".to_string(), serde_json::json!({}));
        assert!(sched.has_pending());
        assert_eq!(sched.macrotask_count(), 1);
        assert_eq!(sched.timer_count(), 0);
    }
2343
    // A timer that has been cancelled does not count as pending work.
    #[test]
    fn has_pending_ignores_cancelled_timers_without_macrotasks() {
        let mut sched = Scheduler::with_clock(DeterministicClock::new(0));
        let timer = sched.set_timeout(10_000);
        assert!(sched.clear_timeout(timer));
        assert!(!sched.has_pending());
    }
2351
    // Smoke test: the real wall clock yields a non-zero epoch timestamp.
    #[test]
    fn wall_clock_returns_positive_ms() {
        let clock = WallClock;
        let now = clock.now_ms();
        assert!(now > 0, "WallClock should return a positive timestamp");
    }
2360
    // Cancelling an unknown id fails and leaves no residue in the
    // cancelled-timer set.
    #[test]
    fn clear_timeout_nonexistent_returns_false() {
        let mut sched = Scheduler::with_clock(DeterministicClock::new(0));
        assert!(!sched.clear_timeout(999));
        assert!(
            sched.cancelled_timers.is_empty(),
            "unknown timer ids should not pollute cancelled set"
        );
    }
2372
    // The second cancellation of the same timer reports failure.
    #[test]
    fn clear_timeout_double_cancel_returns_false() {
        let mut sched = Scheduler::with_clock(DeterministicClock::new(0));
        let t = sched.set_timeout(100);
        assert!(sched.clear_timeout(t));
        assert!(!sched.clear_timeout(t));
    }
2381
    // With no timers registered there is no "next timer" interval.
    #[test]
    fn time_until_next_timer_none_when_empty() {
        let sched = Scheduler::with_clock(DeterministicClock::new(0));
        assert!(sched.time_until_next_timer().is_none());
    }
2389
2390 #[test]
2391 fn time_until_next_timer_saturates_at_zero() {
2392 let mut sched = Scheduler::with_clock(DeterministicClock::new(0));
2393 sched.set_timeout(50);
2394 sched.clock.advance(100); assert_eq!(sched.time_until_next_timer(), Some(0));
2396 }
2397
    // An error outcome round-trips its code and message through the queue.
    #[test]
    fn hostcall_error_outcome() {
        let mut sched = Scheduler::with_clock(DeterministicClock::new(0));
        sched.enqueue_hostcall_complete(
            "err-call".to_string(),
            HostcallOutcome::Error {
                code: "E_TIMEOUT".to_string(),
                message: "Request timed out".to_string(),
            },
        );

        let task = sched.tick().unwrap();
        match task.kind {
            MacrotaskKind::HostcallComplete { call_id, outcome } => {
                assert_eq!(call_id, "err-call");
                match outcome {
                    HostcallOutcome::Error { code, message } => {
                        assert_eq!(code, "E_TIMEOUT");
                        assert_eq!(message, "Request timed out");
                    }
                    other => unreachable!("Expected error, got {other:?}"),
                }
            }
            other => unreachable!("Expected HostcallComplete, got {other:?}"),
        }
    }
2426
2427 #[test]
2430 fn timer_count_decreases_after_fire() {
2431 let mut sched = Scheduler::with_clock(DeterministicClock::new(0));
2432 sched.set_timeout(50);
2433 sched.set_timeout(100);
2434 assert_eq!(sched.timer_count(), 2);
2435
2436 sched.clock.advance(75);
2437 let _task = sched.tick(); assert_eq!(sched.timer_count(), 1);
2439 }
2440
    // Ticking an empty scheduler yields nothing.
    #[test]
    fn empty_scheduler_tick_returns_none() {
        let mut sched = Scheduler::with_clock(DeterministicClock::new(0));
        assert!(sched.tick().is_none());
    }
2448
    // Scheduler::new starts the sequence counter at zero.
    #[test]
    fn default_scheduler_starts_with_seq_zero() {
        let sched = Scheduler::new();
        assert_eq!(sched.current_seq(), Seq::zero());
    }
2456
    // Clock is implemented for Arc-wrapped clocks by delegation.
    #[test]
    fn arc_clock_delegation() {
        let clock = Arc::new(DeterministicClock::new(42));
        assert_eq!(Clock::now_ms(&clock), 42);
        clock.advance(10);
        assert_eq!(Clock::now_ms(&clock), 52);
    }
2466
    // TimerEntry equality compares only (deadline, seq), not timer_id.
    #[test]
    fn timer_entry_equality_ignores_timer_id() {
        let a = TimerEntry::new(1, 100, Seq(5));
        let b = TimerEntry::new(2, 100, Seq(5));
        assert_eq!(a, b);
    }
2476
2477 #[test]
2480 fn macrotask_equality_uses_seq_only() {
2481 let a = Macrotask::new(Seq(1), MacrotaskKind::TimerFired { timer_id: 1 });
2482 let b = Macrotask::new(Seq(1), MacrotaskKind::TimerFired { timer_id: 2 });
2483 assert_eq!(a, b); }
2485
    // Ten interleaved streams each drain all chunks in order, with exactly
    // one final chunk per stream, delivered last.
    #[test]
    fn scheduler_ten_concurrent_streams_complete_independently() {
        let clock = DeterministicClock::new(0);
        let mut sched = Scheduler::with_clock(clock);
        let n_streams: usize = 10;
        let chunks_per_stream: usize = 5;

        // Round-robin enqueue: chunk 0 of every stream, then chunk 1, etc.
        for chunk_idx in 0..chunks_per_stream {
            for stream_idx in 0..n_streams {
                let is_final = chunk_idx == chunks_per_stream - 1;
                sched.enqueue_stream_chunk(
                    format!("stream-{stream_idx}"),
                    chunk_idx as u64,
                    serde_json::json!({ "s": stream_idx, "c": chunk_idx }),
                    is_final,
                );
            }
        }

        let mut per_stream: std::collections::HashMap<String, Vec<(u64, bool)>> =
            std::collections::HashMap::new();
        while let Some(task) = sched.tick() {
            let MacrotaskKind::HostcallComplete { call_id, outcome } = task.kind else {
                unreachable!("expected hostcall completion");
            };
            let HostcallOutcome::StreamChunk {
                sequence, is_final, ..
            } = outcome
            else {
                unreachable!("expected stream chunk");
            };
            per_stream
                .entry(call_id)
                .or_default()
                .push((sequence, is_final));
        }

        assert_eq!(per_stream.len(), n_streams);
        for (call_id, chunks) in &per_stream {
            assert_eq!(
                chunks.len(),
                chunks_per_stream,
                "stream {call_id} incomplete"
            );
            // Sequence numbers must be 0..chunks_per_stream in order.
            for (i, (seq, _)) in chunks.iter().enumerate() {
                assert_eq!(*seq, i as u64, "stream {call_id}: non-monotonic at {i}");
            }
            let final_count = chunks.iter().filter(|(_, f)| *f).count();
            assert_eq!(
                final_count, 1,
                "stream {call_id}: expected exactly one final"
            );
            assert!(
                chunks.last().unwrap().1,
                "stream {call_id}: final must be last"
            );
        }
    }
2549
    // Stream chunks, plain hostcalls, and events share one FIFO queue.
    #[test]
    fn scheduler_mixed_stream_nonstream_ordering() {
        let clock = DeterministicClock::new(0);
        let mut sched = Scheduler::with_clock(clock);

        sched.enqueue_event("evt-1".to_string(), serde_json::json!({"n": 1}));
        sched.enqueue_stream_chunk("stream-x".to_string(), 0, serde_json::json!("data"), false);
        sched.enqueue_hostcall_complete(
            "call-y".to_string(),
            HostcallOutcome::Success(serde_json::json!({"ok": true})),
        );
        sched.enqueue_stream_chunk("stream-x".to_string(), 1, serde_json::json!("end"), true);
        sched.enqueue_event("evt-2".to_string(), serde_json::json!({"n": 2}));

        let mut trace = Vec::new();
        while let Some(task) = sched.tick() {
            trace.push(trace_entry(&task));
        }

        assert_eq!(trace.len(), 5);
        assert!(trace[0].contains("event:evt-1"));
        assert!(trace[1].contains("stream-x") && trace[1].contains("chunk"));
        assert!(trace[2].contains("call-y") && trace[2].contains("ok"));
        assert!(trace[3].contains("stream-x") && trace[3].contains("stream_final"));
        assert!(trace[4].contains("event:evt-2"));
    }
2578
    // Two identical 10-stream runs must produce byte-identical traces.
    #[test]
    fn scheduler_concurrent_streams_deterministic_across_runs() {
        fn run_ten_streams() -> Vec<String> {
            let clock = DeterministicClock::new(0);
            let mut sched = Scheduler::with_clock(clock);

            for chunk in 0..3_u64 {
                for stream in 0..10 {
                    sched.enqueue_stream_chunk(
                        format!("s{stream}"),
                        chunk,
                        serde_json::json!(chunk),
                        chunk == 2,
                    );
                }
            }

            let mut trace = Vec::new();
            while let Some(task) = sched.tick() {
                trace.push(trace_entry(&task));
            }
            trace
        }

        let a = run_ten_streams();
        let b = run_ten_streams();
        assert_eq!(a, b, "10-stream trace must be deterministic");
        assert_eq!(a.len(), 30, "expected 10 streams x 3 chunks = 30 entries");
    }
2608
2609 #[test]
2610 fn scheduler_stream_interleaved_with_timers() {
2611 let clock = DeterministicClock::new(0);
2612 let mut sched = Scheduler::with_clock(clock);
2613
2614 let _t = sched.set_timeout(100);
2616
2617 sched.enqueue_stream_chunk("s1".to_string(), 0, serde_json::json!("a"), false);
2619
2620 sched.clock.advance(150);
2622
2623 sched.enqueue_stream_chunk("s1".to_string(), 1, serde_json::json!("b"), true); let mut trace = Vec::new();
2627 while let Some(task) = sched.tick() {
2628 trace.push(trace_entry(&task));
2629 }
2630
2631 assert_eq!(trace.len(), 3);
2633 assert!(
2634 trace[0].contains("s1") && trace[0].contains("chunk"),
2635 "first: stream chunk 0, got: {}",
2636 trace[0]
2637 );
2638 assert!(
2639 trace[1].contains("s1") && trace[1].contains("stream_final"),
2640 "second: stream final, got: {}",
2641 trace[1]
2642 );
2643 assert!(
2644 trace[2].contains("timer"),
2645 "third: timer, got: {}",
2646 trace[2]
2647 );
2648 }
2649
    #[test]
    fn scheduler_due_timers_do_not_preempt_queued_macrotasks() {
        // Invariant I5: tasks execute in strictly increasing seq order, so a
        // timer that becomes due must not jump ahead of an already-queued
        // macrotask.
        let clock = DeterministicClock::new(0);
        let mut sched = Scheduler::with_clock(clock);

        let t1_id = sched.set_timeout(100);

        sched.enqueue_event("E1".to_string(), serde_json::json!({}));

        // Advance exactly to the timer deadline so it is due on the next tick.
        sched.clock.advance(100);

        let task1 = sched.tick().expect("Should have a task");

        let task2 = sched.tick().expect("Should have a task");

        let seq1 = task1.seq.value();
        let seq2 = task2.seq.value();

        assert!(
            seq1 < seq2,
            "Invariant I5 violation: Task execution not ordered by seq. Executed {seq1} then {seq2}"
        );

        // The queued inbound event runs first...
        if let MacrotaskKind::InboundEvent { event_id, .. } = task1.kind {
            assert_eq!(event_id, "E1");
        } else {
            panic!("Expected InboundEvent first, got {:?}", task1.kind);
        }

        // ...and the due timer runs second.
        if let MacrotaskKind::TimerFired { timer_id } = task2.kind {
            assert_eq!(timer_id, t1_id);
        } else {
            panic!("Expected TimerFired second, got {:?}", task2.kind);
        }
    }
2691
    #[test]
    fn reactor_mesh_hash_routing_is_stable_for_call_id() {
        // Two completions for the same call_id must be routed to the same shard
        // (hash affinity) and receive consecutive per-shard sequence numbers.
        let mut mesh = ReactorMesh::new(ReactorMeshConfig {
            shard_count: 8,
            lane_capacity: 64,
            topology: None,
        });

        let first = mesh
            .enqueue_hostcall_complete(
                "call-affinity".to_string(),
                HostcallOutcome::Success(serde_json::json!({})),
            )
            .expect("first enqueue");
        let second = mesh
            .enqueue_hostcall_complete(
                "call-affinity".to_string(),
                HostcallOutcome::Success(serde_json::json!({})),
            )
            .expect("second enqueue");

        assert_eq!(
            first.shard_id, second.shard_id,
            "call_id hash routing must preserve shard affinity"
        );
        // Same shard, so the shard-local sequence advances by exactly one.
        assert_eq!(first.shard_seq + 1, second.shard_seq);
    }
2719
2720 #[test]
2721 fn reactor_mesh_round_robin_event_distribution_is_deterministic() {
2722 let mut mesh = ReactorMesh::new(ReactorMeshConfig {
2723 shard_count: 3,
2724 lane_capacity: 64,
2725 topology: None,
2726 });
2727
2728 let mut routed = Vec::new();
2729 for idx in 0..6 {
2730 let envelope = mesh
2731 .enqueue_event(format!("evt-{idx}"), serde_json::json!({"i": idx}))
2732 .expect("enqueue event");
2733 routed.push(envelope.shard_id);
2734 }
2735
2736 assert_eq!(routed, vec![0, 1, 2, 0, 1, 2]);
2737 }
2738
    #[test]
    fn reactor_mesh_drain_global_order_preserves_monotone_seq() {
        // Interleave events and hostcall completions (which route to different
        // shards), then verify drain_global_order replays them in the original
        // global enqueue order.
        let mut mesh = ReactorMesh::new(ReactorMeshConfig {
            shard_count: 4,
            lane_capacity: 64,
            topology: None,
        });

        // Record each envelope's global_seq in enqueue order.
        let mut expected = Vec::new();
        expected.push(
            mesh.enqueue_event("evt-1".to_string(), serde_json::json!({"v": 1}))
                .expect("event 1")
                .global_seq
                .value(),
        );
        expected.push(
            mesh.enqueue_hostcall_complete(
                "call-a".to_string(),
                HostcallOutcome::Success(serde_json::json!({"ok": true})),
            )
            .expect("call-a")
            .global_seq
            .value(),
        );
        expected.push(
            mesh.enqueue_event("evt-2".to_string(), serde_json::json!({"v": 2}))
                .expect("event 2")
                .global_seq
                .value(),
        );
        expected.push(
            mesh.enqueue_hostcall_complete(
                "call-b".to_string(),
                HostcallOutcome::Error {
                    code: "E_TEST".to_string(),
                    message: "boom".to_string(),
                },
            )
            .expect("call-b")
            .global_seq
            .value(),
        );

        // Drain more than was enqueued; the observed seqs must match exactly.
        let drained = mesh.drain_global_order(16);
        let observed = drained
            .iter()
            .map(|entry| entry.global_seq.value())
            .collect::<Vec<_>>();
        assert_eq!(observed, expected);
    }
2789
    #[test]
    fn reactor_mesh_backpressure_tracks_rejected_enqueues() {
        // Single shard with lane_capacity 2: the third enqueue overflows and the
        // rejection is visible in both the error and the telemetry counters.
        let mut mesh = ReactorMesh::new(ReactorMeshConfig {
            shard_count: 1,
            lane_capacity: 2,
            topology: None,
        });

        mesh.enqueue_event("evt-0".to_string(), serde_json::json!({}))
            .expect("enqueue evt-0");
        mesh.enqueue_event("evt-1".to_string(), serde_json::json!({}))
            .expect("enqueue evt-1");

        // The overflow error identifies the shard, its capacity, and its depth.
        let err = mesh
            .enqueue_event("evt-overflow".to_string(), serde_json::json!({}))
            .expect_err("third enqueue should overflow");
        assert_eq!(err.shard_id, 0);
        assert_eq!(err.capacity, 2);
        assert_eq!(err.depth, 2);

        let telemetry = mesh.telemetry();
        assert_eq!(telemetry.rejected_enqueues, 1);
        assert_eq!(telemetry.max_queue_depths, vec![2]);
        assert_eq!(telemetry.queue_depths, vec![2]);
    }
2815
2816 #[test]
2817 fn reactor_placement_manifest_is_deterministic_across_runs() {
2818 let topology =
2819 ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0), (1, 0), (2, 1), (3, 1)]);
2820 let first = ReactorPlacementManifest::plan(8, Some(&topology));
2821 let second = ReactorPlacementManifest::plan(8, Some(&topology));
2822 assert_eq!(first, second);
2823 assert_eq!(first.fallback_reason, None);
2824 }
2825
    #[test]
    fn reactor_topology_snapshot_normalizes_unsorted_duplicate_pairs() {
        // The snapshot constructor is expected to sort core/node pairs by core
        // id and collapse duplicates, regardless of input order.
        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[
            (7, 5),
            (2, 1),
            (4, 2),
            (2, 1),
            (1, 1),
            (4, 2),
        ]);
        let normalized = topology
            .cores
            .iter()
            .map(|core| (core.core_id, core.numa_node))
            .collect::<Vec<_>>();
        assert_eq!(normalized, vec![(1, 1), (2, 1), (4, 2), (7, 5)]);
    }
2843
    #[test]
    fn reactor_placement_manifest_non_contiguous_numa_ids_is_stable() {
        // NUMA node ids 5 and 42 are deliberately non-contiguous (and pairs are
        // unsorted with duplicates); planning must still be deterministic and
        // alternate nodes in sorted-id order.
        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[
            (11, 42),
            (7, 5),
            (9, 42),
            (3, 5),
            (11, 42),
            (3, 5),
        ]);
        let first = ReactorPlacementManifest::plan(8, Some(&topology));
        let second = ReactorPlacementManifest::plan(8, Some(&topology));
        assert_eq!(first, second);
        assert_eq!(first.numa_node_count, 2);
        assert_eq!(first.fallback_reason, None);

        // Bindings alternate between the two nodes (5, 42, 5, 42, ...).
        let observed_nodes = first
            .bindings
            .iter()
            .map(|binding| binding.numa_node)
            .collect::<Vec<_>>();
        assert_eq!(observed_nodes, vec![5, 42, 5, 42, 5, 42, 5, 42]);

        // Cores cycle through each node's sorted core list.
        let observed_cores = first
            .bindings
            .iter()
            .map(|binding| binding.core_id)
            .collect::<Vec<_>>();
        assert_eq!(observed_cores, vec![3, 9, 7, 11, 3, 9, 7, 11]);
    }
2874
    #[test]
    fn reactor_placement_manifest_spreads_across_numa_nodes_round_robin() {
        // 6 shards over 2 nodes with 2 cores each: nodes alternate per shard,
        // and cores wrap around once each node's core list is exhausted.
        let topology =
            ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0), (1, 0), (4, 1), (5, 1)]);
        let manifest = ReactorPlacementManifest::plan(6, Some(&topology));
        let observed_nodes = manifest
            .bindings
            .iter()
            .map(|binding| binding.numa_node)
            .collect::<Vec<_>>();
        assert_eq!(observed_nodes, vec![0, 1, 0, 1, 0, 1]);

        let observed_cores = manifest
            .bindings
            .iter()
            .map(|binding| binding.core_id)
            .collect::<Vec<_>>();
        assert_eq!(observed_cores, vec![0, 4, 1, 5, 0, 4]);
    }
2894
2895 #[test]
2896 fn reactor_placement_manifest_records_fallback_when_topology_missing() {
2897 let manifest = ReactorPlacementManifest::plan(3, None);
2898 assert_eq!(
2899 manifest.fallback_reason,
2900 Some(ReactorPlacementFallbackReason::TopologyUnavailable)
2901 );
2902 assert_eq!(manifest.numa_node_count, 1);
2903 assert_eq!(manifest.bindings.len(), 3);
2904 assert_eq!(manifest.bindings[0].core_id, 0);
2905 assert_eq!(manifest.bindings[2].core_id, 2);
2906 }
2907
    #[test]
    fn reactor_mesh_exposes_machine_readable_placement_manifest() {
        // The mesh exposes its placement manifest as JSON. A topology with only
        // one NUMA node is reported via the "single_numa_node" fallback reason.
        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(2, 0), (3, 0)]);
        let mesh = ReactorMesh::new(ReactorMeshConfig {
            shard_count: 3,
            lane_capacity: 8,
            topology: Some(topology),
        });
        let manifest = mesh.placement_manifest();
        let as_json = manifest.as_json();
        assert_eq!(as_json["shard_count"], serde_json::json!(3));
        assert_eq!(as_json["numa_node_count"], serde_json::json!(1));
        assert_eq!(
            as_json["fallback_reason"],
            serde_json::json!(Some("single_numa_node"))
        );
        // One binding row per shard.
        assert_eq!(
            as_json["bindings"].as_array().map(std::vec::Vec::len),
            Some(3),
            "expected per-shard binding rows"
        );
    }
2930
    #[test]
    fn reactor_mesh_telemetry_includes_binding_and_fallback_metadata() {
        // Telemetry carries the placement fallback reason and one binding entry
        // per shard, both as struct fields and in the JSON projection.
        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(10, 0), (11, 0)]);
        let mut mesh = ReactorMesh::new(ReactorMeshConfig {
            shard_count: 2,
            lane_capacity: 4,
            topology: Some(topology),
        });
        mesh.enqueue_event("evt-0".to_string(), serde_json::json!({}))
            .expect("enqueue event");

        let telemetry = mesh.telemetry();
        // Both cores map to node 0, so placement reports SingleNumaNode.
        assert_eq!(
            telemetry.fallback_reason,
            Some(ReactorPlacementFallbackReason::SingleNumaNode)
        );
        assert_eq!(telemetry.shard_bindings.len(), 2);
        let telemetry_json = telemetry.as_json();
        assert_eq!(
            telemetry_json["fallback_reason"],
            serde_json::json!(Some("single_numa_node"))
        );
        assert_eq!(
            telemetry_json["shard_bindings"]
                .as_array()
                .map(std::vec::Vec::len),
            Some(2)
        );
    }
2960
2961 #[test]
2966 fn numa_slab_alloc_dealloc_round_trip() {
2967 let mut slab = NumaSlab::new(0, 4);
2968 let handle = slab.allocate().expect("should allocate");
2969 assert_eq!(handle.node_id, 0);
2970 assert_eq!(handle.generation, 1);
2971 assert!(slab.deallocate(&handle));
2972 assert_eq!(slab.in_use(), 0);
2973 }
2974
2975 #[test]
2976 fn numa_slab_exhaustion_returns_none() {
2977 let mut slab = NumaSlab::new(0, 2);
2978 let _a = slab.allocate().expect("first alloc");
2979 let _b = slab.allocate().expect("second alloc");
2980 assert!(slab.allocate().is_none(), "slab should be exhausted");
2981 }
2982
2983 #[test]
2984 fn numa_slab_generation_prevents_stale_dealloc() {
2985 let mut slab = NumaSlab::new(0, 2);
2986 let handle_v1 = slab.allocate().expect("first alloc");
2987 assert!(slab.deallocate(&handle_v1));
2988 let _handle_v2 = slab.allocate().expect("reuse slot");
2989 assert!(
2991 !slab.deallocate(&handle_v1),
2992 "stale generation should reject dealloc"
2993 );
2994 }
2995
2996 #[test]
2997 fn numa_slab_double_free_is_rejected() {
2998 let mut slab = NumaSlab::new(0, 4);
2999 let handle = slab.allocate().expect("alloc");
3000 assert!(slab.deallocate(&handle));
3001 assert!(!slab.deallocate(&handle), "double free must be rejected");
3002 }
3003
3004 #[test]
3005 fn numa_slab_wrong_node_dealloc_rejected() {
3006 let mut slab = NumaSlab::new(0, 4);
3007 let handle = slab.allocate().expect("alloc");
3008 let wrong_handle = NumaSlabHandle {
3009 node_id: 99,
3010 ..handle
3011 };
3012 assert!(
3013 !slab.deallocate(&wrong_handle),
3014 "wrong node_id should reject dealloc"
3015 );
3016 }
3017
    #[test]
    fn numa_slab_high_water_mark_tracks_peak() {
        // high_water_mark records peak concurrent occupancy: it never decreases
        // on deallocation, and a later allocation below the peak leaves it.
        let mut slab = NumaSlab::new(0, 8);
        let a = slab.allocate().expect("a");
        let b = slab.allocate().expect("b");
        let c = slab.allocate().expect("c");
        assert_eq!(slab.high_water_mark, 3);
        slab.deallocate(&a);
        slab.deallocate(&b);
        assert_eq!(
            slab.high_water_mark, 3,
            "high water mark should not decrease"
        );
        slab.deallocate(&c);
        // Re-allocating after a full drain still does not move the peak.
        let _d = slab.allocate().expect("d");
        assert_eq!(slab.high_water_mark, 3);
    }
3035
    #[test]
    fn numa_slab_pool_routes_to_local_node() {
        // With capacity available, an allocation requested for node 1 lands on
        // node 1 and reports no cross-node fallback reason.
        let topology =
            ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0), (1, 0), (2, 1), (3, 1)]);
        let manifest = ReactorPlacementManifest::plan(4, Some(&topology));
        let config = NumaSlabConfig {
            slab_capacity: 8,
            entry_size_bytes: 256,
            hugepage: HugepageConfig {
                enabled: false,
                ..HugepageConfig::default()
            },
        };
        let mut pool = NumaSlabPool::from_manifest(&manifest, config);
        // Two distinct NUMA nodes in the topology -> two slabs in the pool.
        assert_eq!(pool.node_count(), 2);

        let (handle, reason) = pool.allocate(1).expect("allocate on node 1");
        assert_eq!(handle.node_id, 1);
        assert!(reason.is_none(), "should be local allocation");
    }
3056
    #[test]
    fn numa_slab_pool_cross_node_fallback_tracks_telemetry() {
        // Each node holds exactly one slot. Filling node 0 forces the second
        // allocation to spill to node 1, and the telemetry JSON must reflect
        // the 50/50 local/remote split and the fallback reasons.
        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0), (2, 1)]);
        let manifest = ReactorPlacementManifest::plan(2, Some(&topology));
        let config = NumaSlabConfig {
            slab_capacity: 1,
            entry_size_bytes: 64,
            hugepage: HugepageConfig {
                enabled: false,
                ..HugepageConfig::default()
            },
        };
        let mut pool = NumaSlabPool::from_manifest(&manifest, config);

        let (h0, _) = pool.allocate(0).expect("fill node 0");
        assert_eq!(h0.node_id, 0);

        // Node 0 is exhausted; the pool spills to node 1 and says why.
        let (h1, reason) = pool.allocate(0).expect("fallback to node 1");
        assert_eq!(h1.node_id, 1);
        assert_eq!(reason, Some(CrossNodeReason::LocalExhausted));

        let telemetry = pool.telemetry();
        assert_eq!(telemetry.cross_node_allocs, 1);
        let json = telemetry.as_json();
        assert_eq!(json["total_allocs"], serde_json::json!(2));
        assert_eq!(json["hugepage_backed_allocs"], serde_json::json!(0));
        assert_eq!(json["local_allocs"], serde_json::json!(1));
        assert_eq!(json["remote_allocs"], serde_json::json!(1));
        // Ratios are reported in basis points (scale 10_000): 1/2 = 5000 bps.
        assert_eq!(
            json["allocation_ratio_bps"]["local"],
            serde_json::json!(5000)
        );
        assert_eq!(
            json["allocation_ratio_bps"]["remote"],
            serde_json::json!(5000)
        );
        assert_eq!(
            json["allocation_ratio_bps"]["scale"],
            serde_json::json!(10_000)
        );
        assert_eq!(json["hugepage_hit_rate_bps"]["value"], serde_json::json!(0));
        assert_eq!(
            json["latency_proxies_bps"]["tlb_miss_pressure"],
            serde_json::json!(5000)
        );
        assert_eq!(
            json["latency_proxies_bps"]["cache_miss_pressure"],
            serde_json::json!(10_000)
        );
        assert_eq!(
            json["latency_proxies_bps"]["occupancy_pressure"],
            serde_json::json!(10_000)
        );
        // Proxies above are mapped into coarse bands for alerting.
        assert_eq!(
            json["pressure_bands"]["tlb_miss"],
            serde_json::json!("medium")
        );
        assert_eq!(
            json["pressure_bands"]["cache_miss"],
            serde_json::json!("high")
        );
        assert_eq!(
            json["pressure_bands"]["occupancy"],
            serde_json::json!("high")
        );
        assert_eq!(
            json["fallback_reasons"]["cross_node"],
            serde_json::json!("local_exhausted")
        );
        assert_eq!(
            json["fallback_reasons"]["hugepage"],
            serde_json::json!("hugepage_disabled")
        );
    }
3133
    #[test]
    fn numa_slab_pool_total_exhaustion_returns_none() {
        // Single node, single slot: once it is taken there is nowhere to spill,
        // so allocate returns None rather than panicking.
        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0)]);
        let manifest = ReactorPlacementManifest::plan(1, Some(&topology));
        let config = NumaSlabConfig {
            slab_capacity: 1,
            entry_size_bytes: 64,
            hugepage: HugepageConfig {
                enabled: false,
                ..HugepageConfig::default()
            },
        };
        let mut pool = NumaSlabPool::from_manifest(&manifest, config);
        let _ = pool.allocate(0).expect("fill the only slot");
        assert!(pool.allocate(0).is_none(), "pool should be exhausted");
    }
3150
3151 #[test]
3152 fn numa_slab_pool_deallocate_round_trip() {
3153 let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0), (1, 1)]);
3154 let manifest = ReactorPlacementManifest::plan(2, Some(&topology));
3155 let config = NumaSlabConfig::default();
3156 let mut pool = NumaSlabPool::from_manifest(&manifest, config);
3157
3158 let (handle, _) = pool.allocate(1).expect("alloc");
3159 assert!(pool.deallocate(&handle));
3160 assert!(!pool.deallocate(&handle), "double free must be rejected");
3161 }
3162
3163 #[test]
3164 fn hugepage_status_disabled_reports_fallback() {
3165 let config = HugepageConfig {
3166 enabled: false,
3167 ..HugepageConfig::default()
3168 };
3169 let status = HugepageStatus::evaluate(&config, 1024, 512);
3170 assert!(!status.active);
3171 assert_eq!(
3172 status.fallback_reason,
3173 Some(HugepageFallbackReason::Disabled)
3174 );
3175 }
3176
3177 #[test]
3178 fn hugepage_status_zero_totals_means_unavailable() {
3179 let config = HugepageConfig::default();
3180 let status = HugepageStatus::evaluate(&config, 0, 0);
3181 assert!(!status.active);
3182 assert_eq!(
3183 status.fallback_reason,
3184 Some(HugepageFallbackReason::DetectionUnavailable)
3185 );
3186 }
3187
3188 #[test]
3189 fn hugepage_status_zero_free_means_insufficient() {
3190 let config = HugepageConfig::default();
3191 let status = HugepageStatus::evaluate(&config, 1024, 0);
3192 assert!(!status.active);
3193 assert_eq!(
3194 status.fallback_reason,
3195 Some(HugepageFallbackReason::InsufficientHugepages)
3196 );
3197 }
3198
3199 #[test]
3200 fn hugepage_status_available_is_active() {
3201 let config = HugepageConfig::default();
3202 let status = HugepageStatus::evaluate(&config, 1024, 512);
3203 assert!(status.active);
3204 assert!(status.fallback_reason.is_none());
3205 assert_eq!(status.free_pages, 512);
3206 }
3207
    #[test]
    fn numa_slab_pool_tracks_hugepage_hit_rate_when_active() {
        // With an externally injected active hugepage status, every allocation
        // counts as hugepage-backed and the hit rate reports 10_000 bps (100%).
        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0)]);
        let manifest = ReactorPlacementManifest::plan(1, Some(&topology));
        // 4 * 1024 bytes divides the 4096-byte page size, so alignment is ok.
        let config = NumaSlabConfig {
            slab_capacity: 4,
            entry_size_bytes: 1024,
            hugepage: HugepageConfig {
                page_size_bytes: 4096,
                enabled: true,
            },
        };
        let mut pool = NumaSlabPool::from_manifest(&manifest, config);
        pool.set_hugepage_status(HugepageStatus {
            total_pages: 128,
            free_pages: 64,
            page_size_bytes: 4096,
            active: true,
            fallback_reason: None,
        });

        let _ = pool.allocate(0).expect("first hugepage-backed alloc");
        let _ = pool.allocate(0).expect("second hugepage-backed alloc");

        let telemetry = pool.telemetry();
        let json = telemetry.as_json();
        assert_eq!(json["total_allocs"], serde_json::json!(2));
        assert_eq!(json["hugepage_backed_allocs"], serde_json::json!(2));
        assert_eq!(
            json["hugepage_hit_rate_bps"]["value"],
            serde_json::json!(10_000)
        );
        assert_eq!(
            json["hugepage_hit_rate_bps"]["scale"],
            serde_json::json!(10_000)
        );
        assert_eq!(json["hugepage"]["active"], serde_json::json!(true));
    }
3246
    #[test]
    fn numa_slab_pool_misaligned_hugepage_config_reports_alignment_mismatch() {
        // Slab footprint 3 * 1024 = 3072 bytes is not a multiple of the
        // 2048-byte page size, so the pool reports AlignmentMismatch in both
        // the status struct and the telemetry JSON.
        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0)]);
        let manifest = ReactorPlacementManifest::plan(1, Some(&topology));
        let config = NumaSlabConfig {
            slab_capacity: 3,
            entry_size_bytes: 1024,
            hugepage: HugepageConfig {
                page_size_bytes: 2048,
                enabled: true,
            },
        };

        let pool = NumaSlabPool::from_manifest(&manifest, config);
        let telemetry = pool.telemetry();
        assert!(!telemetry.hugepage_status.active);
        assert_eq!(
            telemetry.hugepage_status.fallback_reason,
            Some(HugepageFallbackReason::AlignmentMismatch)
        );

        let json = telemetry.as_json();
        assert_eq!(
            json["hugepage"]["fallback_reason"],
            serde_json::json!("alignment_mismatch")
        );
        assert_eq!(
            json["fallback_reasons"]["hugepage"],
            serde_json::json!("alignment_mismatch")
        );
    }
3278
    #[test]
    fn numa_slab_pool_aligned_hugepage_config_defaults_to_detection_unavailable() {
        // Alignment is fine (4 * 1024 divides 4096), but no hugepage status was
        // ever injected, so the pool defaults to DetectionUnavailable.
        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0)]);
        let manifest = ReactorPlacementManifest::plan(1, Some(&topology));
        let config = NumaSlabConfig {
            slab_capacity: 4,
            entry_size_bytes: 1024,
            hugepage: HugepageConfig {
                page_size_bytes: 4096,
                enabled: true,
            },
        };

        let pool = NumaSlabPool::from_manifest(&manifest, config);
        let telemetry = pool.telemetry();
        assert!(!telemetry.hugepage_status.active);
        assert_eq!(
            telemetry.hugepage_status.fallback_reason,
            Some(HugepageFallbackReason::DetectionUnavailable)
        );
    }
3300
    #[test]
    fn misaligned_hugepage_config_rejects_external_status_override() {
        // Even if an externally evaluated status says hugepages are active, a
        // misaligned slab footprint (3 * 1024 vs 2048-byte pages) must win:
        // the pool fails closed to AlignmentMismatch.
        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0)]);
        let manifest = ReactorPlacementManifest::plan(1, Some(&topology));
        let config = NumaSlabConfig {
            slab_capacity: 3,
            entry_size_bytes: 1024,
            hugepage: HugepageConfig {
                page_size_bytes: 2048,
                enabled: true,
            },
        };
        // NOTE: `config` is used again below; this relies on NumaSlabConfig
        // being Copy (it is moved into from_manifest here).
        let mut pool = NumaSlabPool::from_manifest(&manifest, config);

        // The standalone evaluation knows nothing about slab alignment.
        let forced = HugepageStatus::evaluate(&config.hugepage, 256, 64);
        assert!(forced.active);
        assert!(forced.fallback_reason.is_none());

        pool.set_hugepage_status(forced);
        let telemetry = pool.telemetry();
        assert!(!telemetry.hugepage_status.active);
        assert_eq!(
            telemetry.hugepage_status.fallback_reason,
            Some(HugepageFallbackReason::AlignmentMismatch)
        );
    }
3327
    #[test]
    fn disabled_hugepage_config_rejects_external_active_status_override() {
        // A disabled config must override a forced "active" status: the pool
        // reports Disabled while still retaining the injected page counts.
        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0)]);
        let manifest = ReactorPlacementManifest::plan(1, Some(&topology));
        let config = NumaSlabConfig {
            slab_capacity: 4,
            entry_size_bytes: 1024,
            hugepage: HugepageConfig {
                page_size_bytes: 4096,
                enabled: false,
            },
        };
        let mut pool = NumaSlabPool::from_manifest(&manifest, config);

        let forced = HugepageStatus {
            total_pages: 512,
            free_pages: 256,
            page_size_bytes: 4096,
            active: true,
            fallback_reason: None,
        };
        pool.set_hugepage_status(forced);

        let telemetry = pool.telemetry();
        assert!(!telemetry.hugepage_status.active);
        assert_eq!(
            telemetry.hugepage_status.fallback_reason,
            Some(HugepageFallbackReason::Disabled)
        );
        // Page counts from the injected status are preserved for diagnostics.
        assert_eq!(telemetry.hugepage_status.total_pages, 512);
        assert_eq!(telemetry.hugepage_status.free_pages, 256);

        let json = telemetry.as_json();
        assert_eq!(
            json["hugepage"]["fallback_reason"],
            serde_json::json!("hugepage_disabled")
        );
    }
3366
    #[test]
    fn disabled_hugepage_config_uses_disabled_reason_even_if_slab_is_misaligned() {
        // When the config is both disabled AND misaligned, Disabled takes
        // precedence over AlignmentMismatch as the reported reason.
        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0)]);
        let manifest = ReactorPlacementManifest::plan(1, Some(&topology));
        let config = NumaSlabConfig {
            slab_capacity: 3,
            entry_size_bytes: 1024,
            hugepage: HugepageConfig {
                page_size_bytes: 2048,
                enabled: false,
            },
        };

        let pool = NumaSlabPool::from_manifest(&manifest, config);
        let telemetry = pool.telemetry();
        assert!(!telemetry.hugepage_status.active);
        assert_eq!(
            telemetry.hugepage_status.fallback_reason,
            Some(HugepageFallbackReason::Disabled)
        );
    }
3388
    #[test]
    fn hugepage_alignment_rejects_zero_page_size_and_fails_closed() {
        // page_size_bytes == 0 would make any modulus check divide by zero; the
        // alignment predicate must reject it and the pool must fail closed.
        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0)]);
        let manifest = ReactorPlacementManifest::plan(1, Some(&topology));
        let config = NumaSlabConfig {
            slab_capacity: 4,
            entry_size_bytes: 1024,
            hugepage: HugepageConfig {
                page_size_bytes: 0,
                enabled: true,
            },
        };
        assert!(!config.hugepage_alignment_ok());

        let pool = NumaSlabPool::from_manifest(&manifest, config);
        let telemetry = pool.telemetry();
        assert!(!telemetry.hugepage_status.active);
        assert_eq!(
            telemetry.hugepage_status.fallback_reason,
            Some(HugepageFallbackReason::AlignmentMismatch)
        );
    }
3411
    #[test]
    fn hugepage_alignment_rejects_zero_footprint_and_fails_closed() {
        // slab_capacity == 0 gives a zero-byte footprint; a degenerate slab
        // must not count as "aligned", so the pool fails closed.
        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0)]);
        let manifest = ReactorPlacementManifest::plan(1, Some(&topology));
        let config = NumaSlabConfig {
            slab_capacity: 0,
            entry_size_bytes: 1024,
            hugepage: HugepageConfig {
                page_size_bytes: 2048,
                enabled: true,
            },
        };
        assert_eq!(config.slab_footprint_bytes(), Some(0));
        assert!(!config.hugepage_alignment_ok());

        let pool = NumaSlabPool::from_manifest(&manifest, config);
        let telemetry = pool.telemetry();
        assert!(!telemetry.hugepage_status.active);
        assert_eq!(
            telemetry.hugepage_status.fallback_reason,
            Some(HugepageFallbackReason::AlignmentMismatch)
        );
    }
3435
    #[test]
    fn hugepage_alignment_rejects_checked_mul_overflow_without_panicking() {
        // usize::MAX * 2 overflows; slab_footprint_bytes must report None
        // (checked arithmetic) and the alignment check must fail closed
        // instead of panicking.
        let config = NumaSlabConfig {
            slab_capacity: usize::MAX,
            entry_size_bytes: 2,
            hugepage: HugepageConfig {
                page_size_bytes: 4096,
                enabled: true,
            },
        };
        assert!(config.slab_footprint_bytes().is_none());
        assert!(!config.hugepage_alignment_ok());

        let status = config.alignment_mismatch_status();
        assert!(!status.active);
        assert_eq!(
            status.fallback_reason,
            Some(HugepageFallbackReason::AlignmentMismatch)
        );
    }
3456
    #[test]
    fn zero_page_size_config_rejects_external_status_override() {
        // Injecting an "active" status cannot bypass the zero-page-size
        // alignment guard; the pool stays inactive with AlignmentMismatch.
        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0)]);
        let manifest = ReactorPlacementManifest::plan(1, Some(&topology));
        let config = NumaSlabConfig {
            slab_capacity: 4,
            entry_size_bytes: 1024,
            hugepage: HugepageConfig {
                page_size_bytes: 0,
                enabled: true,
            },
        };
        let mut pool = NumaSlabPool::from_manifest(&manifest, config);

        let forced = HugepageStatus {
            total_pages: 128,
            free_pages: 64,
            page_size_bytes: 0,
            active: true,
            fallback_reason: None,
        };
        pool.set_hugepage_status(forced);

        let telemetry = pool.telemetry();
        assert!(!telemetry.hugepage_status.active);
        assert_eq!(
            telemetry.hugepage_status.fallback_reason,
            Some(HugepageFallbackReason::AlignmentMismatch)
        );
    }
3487
    #[test]
    fn zero_footprint_config_rejects_external_status_override() {
        // A zero-footprint slab (capacity 0) rejects a forced active status;
        // unlike the disabled-config case, the injected page counts are NOT
        // retained here (they reset to 0).
        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0)]);
        let manifest = ReactorPlacementManifest::plan(1, Some(&topology));
        let config = NumaSlabConfig {
            slab_capacity: 0,
            entry_size_bytes: 1024,
            hugepage: HugepageConfig {
                page_size_bytes: 2048,
                enabled: true,
            },
        };
        assert_eq!(config.slab_footprint_bytes(), Some(0));

        let mut pool = NumaSlabPool::from_manifest(&manifest, config);
        let forced = HugepageStatus {
            total_pages: 128,
            free_pages: 64,
            page_size_bytes: 2048,
            active: true,
            fallback_reason: None,
        };
        pool.set_hugepage_status(forced);

        let telemetry = pool.telemetry();
        assert!(!telemetry.hugepage_status.active);
        assert_eq!(
            telemetry.hugepage_status.fallback_reason,
            Some(HugepageFallbackReason::AlignmentMismatch)
        );
        assert_eq!(telemetry.hugepage_status.total_pages, 0);
        assert_eq!(telemetry.hugepage_status.free_pages, 0);
    }
3521
    #[test]
    fn checked_mul_overflow_config_rejects_external_status_override() {
        // Footprint overflow (2 * usize::MAX) also rejects a forced active
        // status, failing closed with AlignmentMismatch and zeroed counts.
        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0)]);
        let manifest = ReactorPlacementManifest::plan(1, Some(&topology));
        let config = NumaSlabConfig {
            slab_capacity: 2,
            entry_size_bytes: usize::MAX,
            hugepage: HugepageConfig {
                page_size_bytes: 4096,
                enabled: true,
            },
        };
        assert!(config.slab_footprint_bytes().is_none());

        let mut pool = NumaSlabPool::from_manifest(&manifest, config);
        let forced = HugepageStatus {
            total_pages: 512,
            free_pages: 256,
            page_size_bytes: 4096,
            active: true,
            fallback_reason: None,
        };
        pool.set_hugepage_status(forced);

        let telemetry = pool.telemetry();
        assert!(!telemetry.hugepage_status.active);
        assert_eq!(
            telemetry.hugepage_status.fallback_reason,
            Some(HugepageFallbackReason::AlignmentMismatch)
        );
        assert_eq!(telemetry.hugepage_status.total_pages, 0);
        assert_eq!(telemetry.hugepage_status.free_pages, 0);
    }
3555
3556 #[test]
3557 fn hugepage_status_json_is_stable() {
3558 let config = HugepageConfig::default();
3559 let status = HugepageStatus::evaluate(&config, 1024, 128);
3560 let json = status.as_json();
3561 assert_eq!(json["total_pages"], serde_json::json!(1024));
3562 assert_eq!(json["free_pages"], serde_json::json!(128));
3563 assert_eq!(json["active"], serde_json::json!(true));
3564 assert!(json["fallback_reason"].is_null());
3565 }
3566
    #[test]
    fn numa_slab_telemetry_json_has_expected_shape() {
        // Pin the full telemetry JSON schema for a pool with 2 nodes and three
        // all-local allocations (2 on node 0, 1 on node 1).
        let topology =
            ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0), (1, 0), (4, 1), (5, 1)]);
        let manifest = ReactorPlacementManifest::plan(4, Some(&topology));
        let config = NumaSlabConfig {
            slab_capacity: 16,
            entry_size_bytes: 128,
            hugepage: HugepageConfig {
                enabled: false,
                ..HugepageConfig::default()
            },
        };
        let mut pool = NumaSlabPool::from_manifest(&manifest, config);
        let _ = pool.allocate(0);
        let _ = pool.allocate(1);
        let _ = pool.allocate(0);

        let telemetry = pool.telemetry();
        let json = telemetry.as_json();
        assert_eq!(json["node_count"], serde_json::json!(2));
        assert_eq!(json["total_allocs"], serde_json::json!(3));
        assert_eq!(json["total_in_use"], serde_json::json!(3));
        assert_eq!(json["cross_node_allocs"], serde_json::json!(0));
        assert_eq!(json["hugepage_backed_allocs"], serde_json::json!(0));
        assert_eq!(json["local_allocs"], serde_json::json!(3));
        assert_eq!(json["remote_allocs"], serde_json::json!(0));
        // All-local allocation -> 10_000 bps local, 0 bps remote.
        assert_eq!(
            json["allocation_ratio_bps"]["local"],
            serde_json::json!(10_000)
        );
        assert_eq!(json["allocation_ratio_bps"]["remote"], serde_json::json!(0));
        assert_eq!(
            json["allocation_ratio_bps"]["scale"],
            serde_json::json!(10_000)
        );
        assert_eq!(json["hugepage_hit_rate_bps"]["value"], serde_json::json!(0));
        assert_eq!(
            json["latency_proxies_bps"]["tlb_miss_pressure"],
            serde_json::json!(0)
        );
        // 3 in-use of 32 total slots (2 nodes x 16) -> 937 bps occupancy.
        assert_eq!(
            json["latency_proxies_bps"]["cache_miss_pressure"],
            serde_json::json!(937)
        );
        assert_eq!(
            json["latency_proxies_bps"]["occupancy_pressure"],
            serde_json::json!(937)
        );
        assert_eq!(
            json["latency_proxies_bps"]["scale"],
            serde_json::json!(10_000)
        );
        assert_eq!(json["pressure_bands"]["tlb_miss"], serde_json::json!("low"));
        assert_eq!(
            json["pressure_bands"]["cache_miss"],
            serde_json::json!("low")
        );
        assert_eq!(
            json["pressure_bands"]["occupancy"],
            serde_json::json!("low")
        );
        // No cross-node spill happened, so that reason is null; hugepages were
        // explicitly disabled in the config.
        assert_eq!(
            json["fallback_reasons"]["cross_node"],
            serde_json::Value::Null
        );
        assert_eq!(
            json["fallback_reasons"]["hugepage"],
            serde_json::json!("hugepage_disabled")
        );
        assert_eq!(json["config"]["slab_capacity"], serde_json::json!(16));
        assert_eq!(json["per_node"].as_array().map(std::vec::Vec::len), Some(2));
    }
3640
3641 #[test]
3642 fn thread_affinity_advice_matches_placement_manifest() {
3643 let topology =
3644 ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0), (1, 0), (4, 1), (5, 1)]);
3645 let manifest = ReactorPlacementManifest::plan(4, Some(&topology));
3646 let advice = manifest.affinity_advice(AffinityEnforcement::Advisory);
3647 assert_eq!(advice.len(), 4);
3648 assert_eq!(advice[0].shard_id, 0);
3649 assert_eq!(advice[0].recommended_core, 0);
3650 assert_eq!(advice[0].recommended_numa_node, 0);
3651 assert_eq!(advice[0].enforcement, AffinityEnforcement::Advisory);
3652 assert_eq!(advice[1].recommended_numa_node, 1);
3653 assert_eq!(advice[3].recommended_numa_node, 1);
3654 }
3655
3656 #[test]
3657 fn thread_affinity_advice_json_is_stable() {
3658 let advice = ThreadAffinityAdvice {
3659 shard_id: 0,
3660 recommended_core: 3,
3661 recommended_numa_node: 1,
3662 enforcement: AffinityEnforcement::Strict,
3663 };
3664 let json = advice.as_json();
3665 assert_eq!(json["shard_id"], serde_json::json!(0));
3666 assert_eq!(json["recommended_core"], serde_json::json!(3));
3667 assert_eq!(json["enforcement"], serde_json::json!("strict"));
3668 }
3669
3670 #[test]
3671 fn reactor_mesh_preferred_numa_node_uses_manifest() {
3672 let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0), (4, 1), (8, 2)]);
3673 let mesh = ReactorMesh::new(ReactorMeshConfig {
3674 shard_count: 3,
3675 lane_capacity: 8,
3676 topology: Some(topology),
3677 });
3678 assert_eq!(mesh.preferred_numa_node(0), 0);
3679 assert_eq!(mesh.preferred_numa_node(1), 1);
3680 assert_eq!(mesh.preferred_numa_node(2), 2);
3681 assert_eq!(mesh.preferred_numa_node(99), 0); }
3683
3684 #[test]
3685 fn reactor_mesh_affinity_advice_covers_all_shards() {
3686 let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0), (1, 1)]);
3687 let mesh = ReactorMesh::new(ReactorMeshConfig {
3688 shard_count: 2,
3689 lane_capacity: 8,
3690 topology: Some(topology),
3691 });
3692 let advice = mesh.affinity_advice(AffinityEnforcement::Disabled);
3693 assert_eq!(advice.len(), 2);
3694 assert_eq!(advice[0].enforcement, AffinityEnforcement::Disabled);
3695 assert_eq!(advice[1].enforcement, AffinityEnforcement::Disabled);
3696 }
3697
3698 #[test]
3699 fn numa_slab_pool_from_manifest_with_no_topology_creates_single_node() {
3700 let manifest = ReactorPlacementManifest::plan(4, None);
3701 let pool = NumaSlabPool::from_manifest(&manifest, NumaSlabConfig::default());
3702 assert_eq!(pool.node_count(), 1);
3703 }
3704
3705 #[test]
3706 fn numa_node_for_shard_returns_none_for_unknown() {
3707 let manifest = ReactorPlacementManifest::plan(2, None);
3708 assert!(manifest.numa_node_for_shard(0).is_some());
3709 assert!(manifest.numa_node_for_shard(99).is_none());
3710 }
3711
3712 #[test]
3713 fn numa_slab_capacity_clamp_to_at_least_one() {
3714 let slab = NumaSlab::new(0, 0);
3715 assert_eq!(slab.capacity, 1);
3716 }
3717
3718 #[test]
3719 fn cross_node_reason_code_matches() {
3720 assert_eq!(CrossNodeReason::LocalExhausted.as_code(), "local_exhausted");
3721 }
3722
3723 #[test]
3724 fn affinity_enforcement_code_coverage() {
3725 assert_eq!(AffinityEnforcement::Advisory.as_code(), "advisory");
3726 assert_eq!(AffinityEnforcement::Strict.as_code(), "strict");
3727 assert_eq!(AffinityEnforcement::Disabled.as_code(), "disabled");
3728 }
3729
3730 #[test]
3733 fn enqueue_hostcall_completions_batch_preserves_order() {
3734 let mut sched = Scheduler::with_clock(DeterministicClock::new(0));
3735 let completions = vec![
3736 (
3737 "c-1".to_string(),
3738 HostcallOutcome::Success(serde_json::json!(1)),
3739 ),
3740 (
3741 "c-2".to_string(),
3742 HostcallOutcome::Success(serde_json::json!(2)),
3743 ),
3744 (
3745 "c-3".to_string(),
3746 HostcallOutcome::Success(serde_json::json!(3)),
3747 ),
3748 ];
3749 sched.enqueue_hostcall_completions(completions);
3750 assert_eq!(sched.macrotask_count(), 3);
3751
3752 for expected in ["c-1", "c-2", "c-3"] {
3754 let task = sched.tick().expect("should have macrotask");
3755 match task.kind {
3756 MacrotaskKind::HostcallComplete { ref call_id, .. } => {
3757 assert_eq!(call_id, expected);
3758 }
3759 _ => panic!("expected HostcallComplete"),
3760 }
3761 }
3762 assert!(sched.tick().is_none());
3763 }
3764
3765 #[test]
3766 fn time_until_next_timer_positive_case() {
3767 let mut sched = Scheduler::with_clock(DeterministicClock::new(100));
3768 sched.set_timeout(50); assert_eq!(sched.time_until_next_timer(), Some(50));
3770
3771 sched.clock.advance(20); assert_eq!(sched.time_until_next_timer(), Some(30));
3773 }
3774
3775 #[test]
3776 fn deterministic_clock_set_overrides_current_time() {
3777 let clock = DeterministicClock::new(0);
3778 assert_eq!(clock.now_ms(), 0);
3779 clock.advance(50);
3780 assert_eq!(clock.now_ms(), 50);
3781 clock.set(1000);
3782 assert_eq!(clock.now_ms(), 1000);
3783 clock.advance(5);
3784 assert_eq!(clock.now_ms(), 1005);
3785 }
3786
3787 #[test]
3788 fn reactor_mesh_queue_depth_per_shard() {
3789 let config = ReactorMeshConfig {
3790 shard_count: 4,
3791 lane_capacity: 64,
3792 topology: None,
3793 };
3794 let mut mesh = ReactorMesh::new(config);
3795
3796 for shard in 0..4 {
3798 assert_eq!(mesh.queue_depth(shard), Some(0));
3799 }
3800 assert_eq!(mesh.queue_depth(99), None);
3802
3803 for i in 0..4 {
3805 mesh.enqueue_event(format!("evt-{i}"), serde_json::json!(null))
3806 .expect("enqueue should succeed");
3807 }
3808 for shard in 0..4 {
3810 assert_eq!(mesh.queue_depth(shard), Some(1), "shard {shard} depth");
3811 }
3812 }
3813
3814 #[test]
3815 fn reactor_mesh_shard_count_and_total_depth() {
3816 let config = ReactorMeshConfig {
3817 shard_count: 3,
3818 lane_capacity: 16,
3819 topology: None,
3820 };
3821 let mut mesh = ReactorMesh::new(config);
3822 assert_eq!(mesh.shard_count(), 3);
3823 assert_eq!(mesh.total_depth(), 0);
3824 assert!(!mesh.has_pending());
3825
3826 mesh.enqueue_event("e1".to_string(), serde_json::json!(null))
3827 .unwrap();
3828 mesh.enqueue_event("e2".to_string(), serde_json::json!(null))
3829 .unwrap();
3830 assert_eq!(mesh.total_depth(), 2);
3831 assert!(mesh.has_pending());
3832 }
3833
3834 #[test]
3835 fn reactor_mesh_drain_shard_out_of_range_returns_empty() {
3836 let config = ReactorMeshConfig {
3837 shard_count: 2,
3838 lane_capacity: 16,
3839 topology: None,
3840 };
3841 let mut mesh = ReactorMesh::new(config);
3842 mesh.enqueue_event("e1".to_string(), serde_json::json!(null))
3843 .unwrap();
3844 let drained = mesh.drain_shard(99, 10);
3845 assert!(drained.is_empty());
3846 }
3847
3848 #[test]
3849 fn reactor_placement_manifest_zero_shards() {
3850 let manifest = ReactorPlacementManifest::plan(0, None);
3851 assert_eq!(manifest.shard_count, 0);
3852 assert!(manifest.bindings.is_empty());
3853 assert!(manifest.fallback_reason.is_none());
3854 }
3855
3856 #[test]
3857 fn reactor_placement_manifest_as_json_has_expected_fields() {
3858 let topology =
3859 ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0), (1, 0), (2, 1), (3, 1)]);
3860 let manifest = ReactorPlacementManifest::plan(4, Some(&topology));
3861 let json = manifest.as_json();
3862
3863 assert_eq!(json["shard_count"], 4);
3864 assert_eq!(json["numa_node_count"], 2);
3865 assert!(json["fallback_reason"].is_null());
3866 let bindings = json["bindings"].as_array().expect("bindings array");
3867 assert_eq!(bindings.len(), 4);
3868 for binding in bindings {
3869 assert!(binding.get("shard_id").is_some());
3870 assert!(binding.get("core_id").is_some());
3871 assert!(binding.get("numa_node").is_some());
3872 }
3873 }
3874
3875 #[test]
3876 fn reactor_placement_manifest_single_node_fallback() {
3877 let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0), (1, 0)]);
3878 let manifest = ReactorPlacementManifest::plan(2, Some(&topology));
3879 assert_eq!(
3880 manifest.fallback_reason,
3881 Some(ReactorPlacementFallbackReason::SingleNumaNode)
3882 );
3883 }
3884
3885 #[test]
3886 fn reactor_placement_manifest_empty_topology_fallback() {
3887 let topology = ReactorTopologySnapshot { cores: vec![] };
3888 let manifest = ReactorPlacementManifest::plan(2, Some(&topology));
3889 assert_eq!(
3890 manifest.fallback_reason,
3891 Some(ReactorPlacementFallbackReason::TopologyEmpty)
3892 );
3893 }
3894
3895 #[test]
3896 fn reactor_placement_fallback_reason_as_code_all_variants() {
3897 assert_eq!(
3898 ReactorPlacementFallbackReason::TopologyUnavailable.as_code(),
3899 "topology_unavailable"
3900 );
3901 assert_eq!(
3902 ReactorPlacementFallbackReason::TopologyEmpty.as_code(),
3903 "topology_empty"
3904 );
3905 assert_eq!(
3906 ReactorPlacementFallbackReason::SingleNumaNode.as_code(),
3907 "single_numa_node"
3908 );
3909 }
3910
3911 #[test]
3912 fn hugepage_fallback_reason_as_code_all_variants() {
3913 assert_eq!(
3914 HugepageFallbackReason::Disabled.as_code(),
3915 "hugepage_disabled"
3916 );
3917 assert_eq!(
3918 HugepageFallbackReason::DetectionUnavailable.as_code(),
3919 "detection_unavailable"
3920 );
3921 assert_eq!(
3922 HugepageFallbackReason::InsufficientHugepages.as_code(),
3923 "insufficient_hugepages"
3924 );
3925 assert_eq!(
3926 HugepageFallbackReason::AlignmentMismatch.as_code(),
3927 "alignment_mismatch"
3928 );
3929 }
3930
3931 #[test]
3932 fn numa_slab_pool_set_hugepage_status_and_node_count() {
3933 let manifest = ReactorPlacementManifest::plan(4, None);
3934 let config = NumaSlabConfig {
3935 slab_capacity: 4096,
3936 entry_size_bytes: 512,
3937 hugepage: HugepageConfig {
3938 page_size_bytes: 2 * 1024 * 1024,
3939 enabled: true,
3940 },
3941 };
3942 let mut pool = NumaSlabPool::from_manifest(&manifest, config);
3943 assert_eq!(pool.node_count(), 1);
3944
3945 let status = HugepageStatus::evaluate(&config.hugepage, 512, 256);
3946 assert!(status.active);
3947 pool.set_hugepage_status(status);
3948
3949 let telem = pool.telemetry();
3950 assert!(telem.hugepage_status.active);
3951 assert_eq!(telem.hugepage_status.free_pages, 256);
3952 }
3953
3954 #[test]
3955 fn numa_slab_pool_multi_node_node_count() {
3956 let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0), (1, 1), (2, 2)]);
3957 let manifest = ReactorPlacementManifest::plan(3, Some(&topology));
3958 let pool = NumaSlabPool::from_manifest(&manifest, NumaSlabConfig::default());
3959 assert_eq!(pool.node_count(), 3);
3960 }
3961
3962 #[test]
3963 fn reactor_mesh_telemetry_as_json_has_expected_shape() {
3964 let config = ReactorMeshConfig {
3965 shard_count: 2,
3966 lane_capacity: 8,
3967 topology: None,
3968 };
3969 let mesh = ReactorMesh::new(config);
3970 let telem = mesh.telemetry();
3971 let json = telem.as_json();
3972
3973 let depths = json["queue_depths"].as_array().expect("queue_depths");
3974 assert_eq!(depths.len(), 2);
3975 assert_eq!(json["rejected_enqueues"], 0);
3976 let bindings = json["shard_bindings"].as_array().expect("shard_bindings");
3977 assert_eq!(bindings.len(), 2);
3978 assert!(json.get("fallback_reason").is_some());
3979 }
3980
3981 #[test]
3982 fn numa_slab_in_use_and_has_capacity() {
3983 let mut slab = NumaSlab::new(0, 3);
3984 assert_eq!(slab.in_use(), 0);
3985 assert!(slab.has_capacity());
3986
3987 let h1 = slab.allocate().expect("alloc 1");
3988 assert_eq!(slab.in_use(), 1);
3989 assert!(slab.has_capacity());
3990
3991 let h2 = slab.allocate().expect("alloc 2");
3992 let _h3 = slab.allocate().expect("alloc 3");
3993 assert_eq!(slab.in_use(), 3);
3994 assert!(!slab.has_capacity());
3995 assert!(slab.allocate().is_none());
3996
3997 slab.deallocate(&h1);
3998 assert_eq!(slab.in_use(), 2);
3999 assert!(slab.has_capacity());
4000
4001 slab.deallocate(&h2);
4002 assert_eq!(slab.in_use(), 1);
4003 }
4004
4005 #[test]
4006 fn scheduler_macrotask_count_tracks_queue_size() {
4007 let mut sched = Scheduler::with_clock(DeterministicClock::new(0));
4008 assert_eq!(sched.macrotask_count(), 0);
4009
4010 sched.enqueue_event("e1".to_string(), serde_json::json!(null));
4011 sched.enqueue_event("e2".to_string(), serde_json::json!(null));
4012 assert_eq!(sched.macrotask_count(), 2);
4013
4014 sched.tick();
4015 assert_eq!(sched.macrotask_count(), 1);
4016
4017 sched.tick();
4018 assert_eq!(sched.macrotask_count(), 0);
4019 }
4020
4021 #[test]
4022 fn scheduler_timer_count_reflects_pending_timers() {
4023 let mut sched = Scheduler::with_clock(DeterministicClock::new(0));
4024 assert_eq!(sched.timer_count(), 0);
4025
4026 sched.set_timeout(100);
4027 sched.set_timeout(200);
4028 assert_eq!(sched.timer_count(), 2);
4029
4030 sched.clock.advance(150);
4032 sched.tick();
4033 assert_eq!(sched.timer_count(), 1);
4034 }
4035
4036 #[test]
4037 fn scheduler_current_seq_advances_with_operations() {
4038 let mut sched = Scheduler::with_clock(DeterministicClock::new(0));
4039 let initial = sched.current_seq();
4040 assert_eq!(initial.value(), 0);
4041
4042 sched.set_timeout(100); assert!(sched.current_seq().value() > initial.value());
4044
4045 let after_timer = sched.current_seq();
4046 sched.enqueue_event("evt".to_string(), serde_json::json!(null)); assert!(sched.current_seq().value() > after_timer.value());
4048 }
4049
4050 #[test]
4051 fn thread_affinity_advice_as_json_structure() {
4052 let advice = ThreadAffinityAdvice {
4053 shard_id: 2,
4054 recommended_core: 5,
4055 recommended_numa_node: 1,
4056 enforcement: AffinityEnforcement::Strict,
4057 };
4058 let json = advice.as_json();
4059 assert_eq!(json["shard_id"], 2);
4060 assert_eq!(json["recommended_core"], 5);
4061 assert_eq!(json["recommended_numa_node"], 1);
4062 assert_eq!(json["enforcement"], "strict");
4063 }
4064
    /// Property-based tests for the scheduler and mesh primitives.
    ///
    /// These check invariants (monotonicity, min-heap ordering, routing
    /// bounds, drain ordering) across randomized inputs generated by
    /// `proptest`, rather than fixed examples.
    mod proptest_scheduler {
        use super::*;
        use proptest::prelude::*;

        proptest! {
            // Seq::next never goes backwards and increments by exactly one
            // below the saturation point (the generator stays well below
            // u64::MAX, so the `|| start == u64::MAX` arm never fires here).
            #[test]
            fn seq_next_is_monotonic(start in 0..u64::MAX - 100) {
                let s = Seq(start);
                let n = s.next();
                assert!(n >= s, "Seq::next must be monotonically non-decreasing");
                assert!(
                    n.value() == start + 1 || start == u64::MAX,
                    "Seq::next must increment by 1 unless saturated"
                );
            }

            // Near u64::MAX, Seq::next must not wrap around to 0.
            // NOTE(review): only monotonicity is asserted; the exact
            // saturated value (u64::MAX) is not pinned here.
            #[test]
            fn seq_next_saturates(start in u64::MAX - 5..=u64::MAX) {
                let s = Seq(start);
                let n = s.next();
                let _ = n.value();
                assert!(n >= s, "must be monotonic even at saturation boundary");
            }

            // TimerEntry's Ord is reversed so that BinaryHeap (a max-heap)
            // behaves as a min-heap: earlier deadline sorts GREATER, with
            // earlier seq as the tie-breaker. timer_id never affects order.
            #[test]
            fn timer_entry_ordering_consistent_with_min_heap(
                id_a in 0..1000u64,
                id_b in 0..1000u64,
                deadline_a in 0..10000u64,
                deadline_b in 0..10000u64,
                seq_a in 0..1000u64,
                seq_b in 0..1000u64,
            ) {
                let ta = TimerEntry::new(id_a, deadline_a, Seq(seq_a));
                let tb = TimerEntry::new(id_b, deadline_b, Seq(seq_b));
                if deadline_a < deadline_b {
                    assert!(ta > tb, "earlier deadline must sort greater (min-heap)");
                } else if deadline_a > deadline_b {
                    assert!(ta < tb, "later deadline must sort less (min-heap)");
                } else if seq_a < seq_b {
                    assert!(ta > tb, "same deadline, earlier seq must sort greater");
                } else if seq_a > seq_b {
                    assert!(ta < tb, "same deadline, later seq must sort less");
                } else {
                    assert!(ta == tb, "same deadline+seq must be equal");
                }
            }

            // Hashing the same routing key twice must give the same value.
            #[test]
            fn stable_hash_is_deterministic(input in "[a-z0-9_.-]{1,64}") {
                let h1 = ReactorMesh::stable_hash(&input);
                let h2 = ReactorMesh::stable_hash(&input);
                assert!(h1 == h2, "stable_hash must be deterministic");
            }

            // Hash routing must always land inside [0, shard_count).
            #[test]
            fn hash_route_returns_valid_shard(
                shard_count in 1..32usize,
                call_id in "[a-z0-9]{1,20}",
            ) {
                let config = ReactorMeshConfig {
                    shard_count,
                    lane_capacity: 16,
                    topology: None,
                };
                let mesh = ReactorMesh::new(config);
                let shard = mesh.hash_route(&call_id);
                assert!(
                    shard < mesh.shard_count(),
                    "hash_route returned {shard} >= shard_count {}",
                    mesh.shard_count(),
                );
            }

            // Round-robin routing must stay in-bounds no matter how many
            // times it is advanced.
            #[test]
            fn rr_route_returns_valid_shard(
                shard_count in 1..32usize,
                iterations in 1..100usize,
            ) {
                let config = ReactorMeshConfig {
                    shard_count,
                    lane_capacity: 16,
                    topology: None,
                };
                let mut mesh = ReactorMesh::new(config);
                for _ in 0..iterations {
                    let shard = mesh.rr_route();
                    assert!(
                        shard < mesh.shard_count(),
                        "rr_route returned {shard} >= shard_count {}",
                        mesh.shard_count(),
                    );
                }
            }

            // Draining across all shards in global order must emit strictly
            // ascending global sequence numbers. Enqueues can be rejected
            // when a lane is full, so only successful ones are counted.
            #[test]
            fn drain_global_order_is_sorted(
                shard_count in 1..8usize,
                lane_capacity in 2..16usize,
                enqueues in 1..30usize,
            ) {
                let config = ReactorMeshConfig {
                    shard_count,
                    lane_capacity,
                    topology: None,
                };
                let mut mesh = ReactorMesh::new(config);
                let mut success_count = 0usize;
                for i in 0..enqueues {
                    let call_id = format!("call_{i}");
                    let outcome = HostcallOutcome::Success(serde_json::Value::Null);
                    if mesh.enqueue_hostcall_complete(call_id, outcome).is_ok() {
                        success_count += 1;
                    }
                }
                let drained = mesh.drain_global_order(success_count);
                // windows(2) yields every adjacent pair; strict `<` also
                // rules out duplicate sequence numbers.
                for pair in drained.windows(2) {
                    assert!(
                        pair[0].global_seq < pair[1].global_seq,
                        "drain_global_order must emit ascending seq: {:?} vs {:?}",
                        pair[0].global_seq,
                        pair[1].global_seq,
                    );
                }
            }

            // Total queued depth can never exceed shard_count * lane_capacity,
            // even when more enqueues are attempted than fit (extras must be
            // rejected, not silently over-filled).
            #[test]
            fn mesh_total_depth_bounded_by_capacity(
                shard_count in 1..8usize,
                lane_capacity in 1..16usize,
                enqueues in 0..100usize,
            ) {
                let config = ReactorMeshConfig {
                    shard_count,
                    lane_capacity,
                    topology: None,
                };
                let mut mesh = ReactorMesh::new(config);
                for i in 0..enqueues {
                    let call_id = format!("call_{i}");
                    let outcome = HostcallOutcome::Success(serde_json::Value::Null);
                    let _ = mesh.enqueue_hostcall_complete(call_id, outcome);
                }
                let max_total = shard_count * lane_capacity;
                assert!(
                    mesh.total_depth() <= max_total,
                    "total_depth {} exceeds max possible {}",
                    mesh.total_depth(),
                    max_total,
                );
            }

            // Cancelling a timer twice: the first clear_timeout reports
            // success, the second must report false (idempotent cancel).
            // cancel_idx can exceed timer_count, hence the bounds guard.
            #[test]
            fn scheduler_timer_cancel_idempotent(
                timer_count in 1..10usize,
                cancel_idx in 0..10usize,
            ) {
                let clock = DeterministicClock::new(0);
                let mut sched = Scheduler::with_clock(clock);
                let mut timer_ids = Vec::new();
                for i in 0..timer_count {
                    timer_ids.push(sched.set_timeout(u64::try_from(i + 1).unwrap() * 100));
                }
                if cancel_idx < timer_ids.len() {
                    let tid = timer_ids[cancel_idx];
                    let first = sched.clear_timeout(tid);
                    let second = sched.clear_timeout(tid);
                    assert!(first, "first cancel should succeed");
                    assert!(!second, "second cancel should return false");
                }
            }
        }
    }
4243}