// pi/scheduler.rs
//! Deterministic event loop scheduler for PiJS runtime.
//!
//! Implements the spec from EXTENSIONS.md §1A.4.5:
//! - Queue model: microtasks (handled by JS engine), macrotasks, timers
//! - Timer heap with stable ordering guarantees
//! - Hostcall completion enqueue with stable tie-breaking
//! - Single-threaded scheduler loop reproducible under fixed inputs
//!
//! # Invariants
//!
//! - **I1 (single macrotask):** at most one macrotask executes per tick
//! - **I2 (microtask fixpoint):** after any macrotask, microtasks drain to empty
//! - **I3 (stable timers):** timers with equal deadlines fire in increasing seq order
//! - **I4 (no reentrancy):** hostcall completions enqueue macrotasks, never re-enter
//! - **I5 (total order):** all observable scheduling is ordered by seq
use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::collections::BinaryHeap;
use std::collections::VecDeque;
use std::fmt;
use std::sync::Arc;
/// Monotonically increasing sequence counter for deterministic ordering.
///
/// Wraps a `u64`; comparing `Seq` values defines the total order (invariant
/// I5) over all observable scheduling decisions.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Seq(u64);
28impl Seq {
29    /// Create the initial sequence value.
30    #[must_use]
31    pub const fn zero() -> Self {
32        Self(0)
33    }
34
35    /// Get the next sequence value, incrementing the counter.
36    #[must_use]
37    pub const fn next(self) -> Self {
38        Self(self.0.saturating_add(1))
39    }
40
41    /// Get the raw value.
42    #[must_use]
43    pub const fn value(self) -> u64 {
44        self.0
45    }
46}
47
48impl fmt::Display for Seq {
49    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
50        write!(f, "seq:{}", self.0)
51    }
52}
53
/// A timer entry in the timer heap.
#[derive(Debug, Clone)]
pub struct TimerEntry {
    /// Timer ID for cancellation.
    pub timer_id: u64,
    /// Absolute deadline in milliseconds.
    pub deadline_ms: u64,
    /// Sequence number for stable ordering; breaks ties between equal
    /// deadlines so timers fire in creation order (invariant I3).
    pub seq: Seq,
}
65impl TimerEntry {
66    /// Create a new timer entry.
67    #[must_use]
68    pub const fn new(timer_id: u64, deadline_ms: u64, seq: Seq) -> Self {
69        Self {
70            timer_id,
71            deadline_ms,
72            seq,
73        }
74    }
75}
76
77// Order by (deadline_ms, seq) ascending - min-heap needs reversed comparison.
78impl PartialEq for TimerEntry {
79    fn eq(&self, other: &Self) -> bool {
80        self.deadline_ms == other.deadline_ms && self.seq == other.seq
81    }
82}
83
84impl Eq for TimerEntry {}
85
86impl PartialOrd for TimerEntry {
87    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
88        Some(self.cmp(other))
89    }
90}
91
92impl Ord for TimerEntry {
93    fn cmp(&self, other: &Self) -> Ordering {
94        // Reverse ordering for min-heap: smaller deadline/seq = higher priority
95        match other.deadline_ms.cmp(&self.deadline_ms) {
96            Ordering::Equal => other.seq.cmp(&self.seq),
97            ord => ord,
98        }
99    }
100}
101
/// Type of macrotask in the queue.
///
/// Macrotasks are the unit of work dequeued by [`Scheduler::tick`]; exactly
/// one runs per tick (invariant I1).
#[derive(Debug, Clone)]
pub enum MacrotaskKind {
    /// A timer fired; carries the ID returned by `set_timeout`.
    TimerFired { timer_id: u64 },
    /// A hostcall completed.
    HostcallComplete {
        call_id: String,
        outcome: HostcallOutcome,
    },
    /// An inbound event from the host.
    InboundEvent {
        event_id: String,
        payload: serde_json::Value,
    },
}
/// Outcome of a hostcall.
#[derive(Debug, Clone)]
pub enum HostcallOutcome {
    /// Successful result carrying the hostcall's JSON return value.
    Success(serde_json::Value),
    /// Error result with a machine-readable code and human-readable message.
    Error { code: String, message: String },
    /// Incremental stream chunk.
    StreamChunk {
        /// Monotonically increasing sequence number per call.
        sequence: u64,
        /// Arbitrary JSON payload for this chunk.
        chunk: serde_json::Value,
        /// `true` on the final chunk.
        is_final: bool,
    },
}
/// A macrotask in the queue.
#[derive(Debug, Clone)]
pub struct Macrotask {
    /// Sequence number for deterministic ordering; assigned at enqueue time,
    /// so FIFO queue order is also globally seq-monotone (invariant I5).
    pub seq: Seq,
    /// The task kind and payload.
    pub kind: MacrotaskKind,
}
146impl Macrotask {
147    /// Create a new macrotask.
148    #[must_use]
149    pub const fn new(seq: Seq, kind: MacrotaskKind) -> Self {
150        Self { seq, kind }
151    }
152}
153
154// Order by seq ascending.
155impl PartialEq for Macrotask {
156    fn eq(&self, other: &Self) -> bool {
157        self.seq == other.seq
158    }
159}
160
161impl Eq for Macrotask {}
162
163impl PartialOrd for Macrotask {
164    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
165        Some(self.cmp(other))
166    }
167}
168
169impl Ord for Macrotask {
170    fn cmp(&self, other: &Self) -> Ordering {
171        // VecDeque FIFO order - no reordering needed, but we use seq for verification
172        self.seq.cmp(&other.seq)
173    }
174}
175
/// A monotonic clock source for the scheduler.
pub trait Clock: Send + Sync {
    /// Get the current time in milliseconds since epoch.
    fn now_ms(&self) -> u64;
}

// Allow a shared clock handle to be used wherever a `Clock` is expected
// (e.g. a test's `Arc<DeterministicClock>` driven from outside the scheduler).
impl<C: Clock> Clock for Arc<C> {
    fn now_ms(&self) -> u64 {
        self.as_ref().now_ms()
    }
}
188/// Real wall clock implementation.
189#[derive(Debug, Clone, Copy, Default)]
190pub struct WallClock;
191
192impl Clock for WallClock {
193    fn now_ms(&self) -> u64 {
194        use std::time::{SystemTime, UNIX_EPOCH};
195        let millis = SystemTime::now()
196            .duration_since(UNIX_EPOCH)
197            .unwrap_or_default()
198            .as_millis();
199        u64::try_from(millis).unwrap_or(u64::MAX)
200    }
201}
202
/// A deterministic clock for testing.
///
/// Time only moves when the test calls [`DeterministicClock::advance`] or
/// [`DeterministicClock::set`], making scheduler runs reproducible.
#[derive(Debug)]
pub struct DeterministicClock {
    // Current time in ms; atomic so a shared (`Arc`) clock can be driven
    // from test code while the scheduler reads it.
    current_ms: std::sync::atomic::AtomicU64,
}

impl DeterministicClock {
    /// Create a new deterministic clock starting at the given time.
    #[must_use]
    pub const fn new(start_ms: u64) -> Self {
        Self {
            current_ms: std::sync::atomic::AtomicU64::new(start_ms),
        }
    }

    /// Advance the clock by the given duration.
    ///
    /// Saturates at `u64::MAX` instead of wrapping (matching the saturating
    /// arithmetic used throughout this module); a wrapping add here would
    /// make time appear to jump backwards on overflow.
    pub fn advance(&self, ms: u64) {
        // The closure always returns Some, so fetch_update cannot fail.
        let _ = self.current_ms.fetch_update(
            std::sync::atomic::Ordering::SeqCst,
            std::sync::atomic::Ordering::SeqCst,
            |now| Some(now.saturating_add(ms)),
        );
    }

    /// Set the clock to a specific time.
    pub fn set(&self, ms: u64) {
        self.current_ms
            .store(ms, std::sync::atomic::Ordering::SeqCst);
    }
}
impl Clock for DeterministicClock {
    fn now_ms(&self) -> u64 {
        // SeqCst pairs with the SeqCst writes in `advance`/`set`.
        self.current_ms.load(std::sync::atomic::Ordering::SeqCst)
    }
}
/// The deterministic event loop scheduler state.
pub struct Scheduler<C: Clock = WallClock> {
    /// Monotone sequence counter (next value to hand out).
    seq: Seq,
    /// Macrotask queue (FIFO; seq-monotone by construction).
    macrotask_queue: VecDeque<Macrotask>,
    /// Timer heap (min-heap by `(deadline_ms, seq)` via reversed `Ord`).
    timer_heap: BinaryHeap<TimerEntry>,
    /// Next timer ID to hand out.
    next_timer_id: u64,
    /// Cancelled timer IDs; entries are removed lazily when the timer pops
    /// in `move_due_timers` rather than eagerly at cancellation time.
    cancelled_timers: std::collections::HashSet<u64>,
    /// All timer IDs currently in the heap (active or cancelled).
    heap_timer_ids: std::collections::HashSet<u64>,
    /// Clock source.
    clock: C,
}
255impl Scheduler<WallClock> {
256    /// Create a new scheduler with the default wall clock.
257    #[must_use]
258    pub fn new() -> Self {
259        Self::with_clock(WallClock)
260    }
261}
262
263impl Default for Scheduler<WallClock> {
264    fn default() -> Self {
265        Self::new()
266    }
267}
268
impl<C: Clock> Scheduler<C> {
    /// Create a new scheduler with a custom clock.
    #[must_use]
    pub fn with_clock(clock: C) -> Self {
        Self {
            clock,
            seq: Seq::zero(),
            macrotask_queue: VecDeque::new(),
            timer_heap: BinaryHeap::new(),
            next_timer_id: 1,
            cancelled_timers: std::collections::HashSet::new(),
            heap_timer_ids: std::collections::HashSet::new(),
        }
    }
284    /// Get the current sequence number.
285    #[must_use]
286    pub const fn current_seq(&self) -> Seq {
287        self.seq
288    }
289
290    /// Get the next sequence number and increment the counter.
291    const fn next_seq(&mut self) -> Seq {
292        let current = self.seq;
293        self.seq = self.seq.next();
294        current
295    }
296
297    /// Get the current time from the clock.
298    #[must_use]
299    pub fn now_ms(&self) -> u64 {
300        self.clock.now_ms()
301    }
302
303    /// Check if there are pending tasks.
304    #[must_use]
305    pub fn has_pending(&self) -> bool {
306        !self.macrotask_queue.is_empty()
307            || self
308                .timer_heap
309                .iter()
310                .any(|entry| !self.cancelled_timers.contains(&entry.timer_id))
311    }
312
313    /// Get the number of pending macrotasks.
314    #[must_use]
315    pub fn macrotask_count(&self) -> usize {
316        self.macrotask_queue.len()
317    }
318
319    /// Get the number of pending timers.
320    #[must_use]
321    pub fn timer_count(&self) -> usize {
322        self.timer_heap.len()
323    }
324
325    /// Schedule a timer to fire at the given deadline.
326    ///
327    /// Returns the timer ID for cancellation.
328    pub fn set_timeout(&mut self, delay_ms: u64) -> u64 {
329        let timer_id = self.allocate_timer_id();
330        let deadline_ms = self.clock.now_ms().saturating_add(delay_ms);
331        let seq = self.next_seq();
332
333        self.timer_heap
334            .push(TimerEntry::new(timer_id, deadline_ms, seq));
335        self.heap_timer_ids.insert(timer_id);
336
337        tracing::trace!(
338            event = "scheduler.timer.set",
339            timer_id,
340            delay_ms,
341            deadline_ms,
342            %seq,
343            "Timer scheduled"
344        );
345
346        timer_id
347    }
348
    // True while `timer_id` is still present in the heap (live or cancelled).
    fn timer_id_in_use(&self, timer_id: u64) -> bool {
        self.heap_timer_ids.contains(&timer_id)
    }

    // Allocate the next timer ID.
    //
    // Fast path: hand out sequentially increasing IDs. Once the counter hits
    // `u64::MAX`, fall back to scanning for an ID no longer in the heap so
    // long-lived schedulers keep working. Only if every ID is in use at once
    // (practically impossible) is `u64::MAX` reused, with an error logged.
    fn allocate_timer_id(&mut self) -> u64 {
        if self.next_timer_id < u64::MAX {
            let timer_id = self.next_timer_id;
            self.next_timer_id += 1;
            return timer_id;
        }

        // Counter exhausted: try reusing u64::MAX itself first.
        if !self.timer_id_in_use(u64::MAX) {
            self.next_timer_id = 1;
            return u64::MAX;
        }

        // Linear scan for any ID that is no longer in the heap.
        for candidate in 1..u64::MAX {
            if !self.timer_id_in_use(candidate) {
                self.next_timer_id = candidate.saturating_add(1);
                return candidate;
            }
        }

        tracing::error!(
            event = "scheduler.timer_id.exhausted",
            "Timer ID namespace exhausted; falling back to u64::MAX reuse"
        );
        u64::MAX
    }
379    /// Cancel a timer by ID.
380    ///
381    /// Returns true if the timer was found and cancelled.
382    pub fn clear_timeout(&mut self, timer_id: u64) -> bool {
383        let pending =
384            self.heap_timer_ids.contains(&timer_id) && !self.cancelled_timers.contains(&timer_id);
385
386        let cancelled = if pending {
387            self.cancelled_timers.insert(timer_id)
388        } else {
389            false
390        };
391
392        tracing::trace!(
393            event = "scheduler.timer.cancel",
394            timer_id,
395            cancelled,
396            "Timer cancelled"
397        );
398
399        cancelled
400    }
401
402    /// Enqueue a hostcall completion.
403    pub fn enqueue_hostcall_complete(&mut self, call_id: String, outcome: HostcallOutcome) {
404        let seq = self.next_seq();
405        tracing::trace!(
406            event = "scheduler.hostcall.enqueue",
407            call_id = %call_id,
408            %seq,
409            "Hostcall completion enqueued"
410        );
411        let task = Macrotask::new(seq, MacrotaskKind::HostcallComplete { call_id, outcome });
412        self.macrotask_queue.push_back(task);
413    }
414
415    /// Enqueue multiple hostcall completions in one scheduler mutation pass.
416    pub fn enqueue_hostcall_completions<I>(&mut self, completions: I)
417    where
418        I: IntoIterator<Item = (String, HostcallOutcome)>,
419    {
420        for (call_id, outcome) in completions {
421            self.enqueue_hostcall_complete(call_id, outcome);
422        }
423    }
424
425    /// Convenience: enqueue a stream chunk for a hostcall.
426    pub fn enqueue_stream_chunk(
427        &mut self,
428        call_id: String,
429        sequence: u64,
430        chunk: serde_json::Value,
431        is_final: bool,
432    ) {
433        self.enqueue_hostcall_complete(
434            call_id,
435            HostcallOutcome::StreamChunk {
436                sequence,
437                chunk,
438                is_final,
439            },
440        );
441    }
442
443    /// Enqueue an inbound event from the host.
444    pub fn enqueue_event(&mut self, event_id: String, payload: serde_json::Value) {
445        let seq = self.next_seq();
446        tracing::trace!(
447            event = "scheduler.event.enqueue",
448            event_id = %event_id,
449            %seq,
450            "Inbound event enqueued"
451        );
452        let task = Macrotask::new(seq, MacrotaskKind::InboundEvent { event_id, payload });
453        self.macrotask_queue.push_back(task);
454    }
455
    /// Move due timers from the timer heap to the macrotask queue.
    ///
    /// This is step 2 of the tick() algorithm. Timers are popped in
    /// (deadline, seq) order, so equal deadlines fire in creation order
    /// (invariant I3). Cancelled timers are dropped here, lazily.
    fn move_due_timers(&mut self) {
        let now = self.clock.now_ms();

        while let Some(entry) = self.timer_heap.peek() {
            // Heap is min-ordered: once the earliest deadline is in the
            // future, nothing else can be due.
            if entry.deadline_ms > now {
                break;
            }

            let entry = self.timer_heap.pop().expect("peeked");
            self.heap_timer_ids.remove(&entry.timer_id);

            // Skip cancelled timers, clearing their cancellation marker so
            // the set does not grow without bound.
            if self.cancelled_timers.remove(&entry.timer_id) {
                tracing::trace!(
                    event = "scheduler.timer.skip_cancelled",
                    timer_id = entry.timer_id,
                    "Skipped cancelled timer"
                );
                continue;
            }

            // Preserve (deadline, timer-seq) order while assigning a fresh
            // macrotask seq so queue ordering remains globally monotone.
            let task_seq = self.next_seq();
            let task = Macrotask::new(
                task_seq,
                MacrotaskKind::TimerFired {
                    timer_id: entry.timer_id,
                },
            );
            self.macrotask_queue.push_back(task);

            tracing::trace!(
                event = "scheduler.timer.fire",
                timer_id = entry.timer_id,
                deadline_ms = entry.deadline_ms,
                now_ms = now,
                timer_seq = %entry.seq,
                macrotask_seq = %task_seq,
                "Timer fired"
            );
        }
    }
503    /// Execute one tick of the event loop.
504    ///
505    /// Algorithm (from spec):
506    /// 1. Ingest host completions (done externally via enqueue methods)
507    /// 2. Move due timers to macrotask queue
508    /// 3. Run one macrotask (if any)
509    /// 4. Drain microtasks (done externally by JS engine)
510    ///
511    /// Returns the macrotask that was executed, if any.
512    pub fn tick(&mut self) -> Option<Macrotask> {
513        // Step 2: Move due timers
514        self.move_due_timers();
515
516        // Step 3: Run one macrotask
517        let task = self.macrotask_queue.pop_front();
518
519        if let Some(ref task) = task {
520            tracing::debug!(
521                event = "scheduler.tick.execute",
522                seq = %task.seq,
523                kind = ?std::mem::discriminant(&task.kind),
524                "Executing macrotask"
525            );
526        } else {
527            tracing::trace!(event = "scheduler.tick.idle", "No macrotask to execute");
528        }
529
530        task
531    }
532
533    /// Get the deadline of the next timer, if any.
534    #[must_use]
535    pub fn next_timer_deadline(&self) -> Option<u64> {
536        self.timer_heap
537            .iter()
538            .filter(|entry| !self.cancelled_timers.contains(&entry.timer_id))
539            .map(|entry| entry.deadline_ms)
540            .min()
541    }
542
543    /// Get the time until the next timer fires, if any.
544    #[must_use]
545    pub fn time_until_next_timer(&self) -> Option<u64> {
546        self.next_timer_deadline()
547            .map(|deadline| deadline.saturating_sub(self.clock.now_ms()))
548    }
549}
// ============================================================================
// Core-pinned reactor mesh (URPC-style SPSC lanes)
// ============================================================================
/// Configuration for [`ReactorMesh`].
#[derive(Debug, Clone)]
pub struct ReactorMeshConfig {
    /// Number of core-pinned shards (one SPSC lane per shard).
    /// A value of 0 is clamped to 1 by `ReactorMesh::new`.
    pub shard_count: usize,
    /// Maximum queued envelopes per shard lane.
    /// A value of 0 is clamped to 1 by `ReactorMesh::new`.
    pub lane_capacity: usize,
    /// Optional topology snapshot for deterministic shard placement planning.
    pub topology: Option<ReactorTopologySnapshot>,
}
566impl Default for ReactorMeshConfig {
567    fn default() -> Self {
568        Self {
569            shard_count: 4,
570            lane_capacity: 1024,
571            topology: None,
572        }
573    }
574}
575
/// Core descriptor used by topology-aware shard placement.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct ReactorTopologyCore {
    /// Logical core identifier.
    pub core_id: usize,
    /// NUMA node the core belongs to.
    pub numa_node: usize,
}

/// Lightweight machine-provided topology snapshot.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct ReactorTopologySnapshot {
    /// Known cores; `from_core_node_pairs` produces this sorted and deduped,
    /// though direct construction is not prevented.
    pub cores: Vec<ReactorTopologyCore>,
}
589impl ReactorTopologySnapshot {
590    /// Build a normalized topology snapshot from `(core_id, numa_node)` pairs.
591    #[must_use]
592    pub fn from_core_node_pairs(pairs: &[(usize, usize)]) -> Self {
593        let mut cores = pairs
594            .iter()
595            .map(|(core_id, numa_node)| ReactorTopologyCore {
596                core_id: *core_id,
597                numa_node: *numa_node,
598            })
599            .collect::<Vec<_>>();
600        cores.sort_unstable();
601        cores.dedup();
602        Self { cores }
603    }
604}
605
/// Explicit fallback reason emitted by topology planner.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReactorPlacementFallbackReason {
    /// No topology snapshot was available at planning time.
    TopologyUnavailable,
    /// A topology snapshot was provided but had no usable cores.
    TopologyEmpty,
    /// Topology is available but only one NUMA node exists.
    SingleNumaNode,
}

impl ReactorPlacementFallbackReason {
    /// Stable machine-readable code used in manifest/telemetry JSON.
    #[must_use]
    pub const fn as_code(self) -> &'static str {
        match self {
            Self::TopologyUnavailable => "topology_unavailable",
            Self::TopologyEmpty => "topology_empty",
            Self::SingleNumaNode => "single_numa_node",
        }
    }
}
/// Deterministic shard binding produced by placement planner.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ReactorShardBinding {
    /// Shard index within the mesh.
    pub shard_id: usize,
    /// Core the shard is bound to.
    pub core_id: usize,
    /// NUMA node owning that core.
    pub numa_node: usize,
}

/// Machine-readable shard placement manifest.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReactorPlacementManifest {
    /// Number of shards the plan covers.
    pub shard_count: usize,
    /// Distinct NUMA nodes used by the plan.
    pub numa_node_count: usize,
    /// One binding per shard, indexed by shard ID.
    pub bindings: Vec<ReactorShardBinding>,
    /// Why the plan fell back to a uniform layout, if it did.
    pub fallback_reason: Option<ReactorPlacementFallbackReason>,
}
645impl ReactorPlacementManifest {
646    /// Plan deterministic shard placement from optional topology.
647    #[must_use]
648    pub fn plan(shard_count: usize, topology: Option<&ReactorTopologySnapshot>) -> Self {
649        if shard_count == 0 {
650            return Self {
651                shard_count: 0,
652                numa_node_count: 0,
653                bindings: Vec::new(),
654                fallback_reason: None,
655            };
656        }
657
658        let Some(topology) = topology else {
659            let bindings = (0..shard_count)
660                .map(|shard_id| ReactorShardBinding {
661                    shard_id,
662                    core_id: shard_id,
663                    numa_node: 0,
664                })
665                .collect::<Vec<_>>();
666            return Self {
667                shard_count,
668                numa_node_count: 1,
669                bindings,
670                fallback_reason: Some(ReactorPlacementFallbackReason::TopologyUnavailable),
671            };
672        };
673
674        if topology.cores.is_empty() {
675            let bindings = (0..shard_count)
676                .map(|shard_id| ReactorShardBinding {
677                    shard_id,
678                    core_id: shard_id,
679                    numa_node: 0,
680                })
681                .collect::<Vec<_>>();
682            return Self {
683                shard_count,
684                numa_node_count: 1,
685                bindings,
686                fallback_reason: Some(ReactorPlacementFallbackReason::TopologyEmpty),
687            };
688        }
689
690        let mut by_node = BTreeMap::<usize, Vec<usize>>::new();
691        for core in &topology.cores {
692            by_node
693                .entry(core.numa_node)
694                .or_default()
695                .push(core.core_id);
696        }
697        for cores in by_node.values_mut() {
698            cores.sort_unstable();
699            cores.dedup();
700        }
701        let nodes = by_node
702            .into_iter()
703            .filter(|(_, cores)| !cores.is_empty())
704            .collect::<Vec<_>>();
705
706        if nodes.is_empty() {
707            let bindings = (0..shard_count)
708                .map(|shard_id| ReactorShardBinding {
709                    shard_id,
710                    core_id: shard_id,
711                    numa_node: 0,
712                })
713                .collect::<Vec<_>>();
714            return Self {
715                shard_count,
716                numa_node_count: 1,
717                bindings,
718                fallback_reason: Some(ReactorPlacementFallbackReason::TopologyEmpty),
719            };
720        }
721
722        let node_count = nodes.len();
723        let fallback_reason = if node_count == 1 {
724            Some(ReactorPlacementFallbackReason::SingleNumaNode)
725        } else {
726            None
727        };
728
729        let mut bindings = Vec::with_capacity(shard_count);
730        for shard_id in 0..shard_count {
731            let node_idx = shard_id % node_count;
732            let (numa_node, cores) = &nodes[node_idx];
733            let core_idx = (shard_id / node_count) % cores.len();
734            bindings.push(ReactorShardBinding {
735                shard_id,
736                core_id: cores[core_idx],
737                numa_node: *numa_node,
738            });
739        }
740
741        Self {
742            shard_count,
743            numa_node_count: node_count,
744            bindings,
745            fallback_reason,
746        }
747    }
748
749    /// Render placement manifest as stable machine-readable JSON.
750    #[must_use]
751    pub fn as_json(&self) -> serde_json::Value {
752        serde_json::json!({
753            "shard_count": self.shard_count,
754            "numa_node_count": self.numa_node_count,
755            "fallback_reason": self.fallback_reason.map(ReactorPlacementFallbackReason::as_code),
756            "bindings": self.bindings.iter().map(|binding| {
757                serde_json::json!({
758                    "shard_id": binding.shard_id,
759                    "core_id": binding.core_id,
760                    "numa_node": binding.numa_node
761                })
762            }).collect::<Vec<_>>()
763        })
764    }
765}
766
/// Per-envelope metadata produced by [`ReactorMesh`].
#[derive(Debug, Clone)]
pub struct ReactorEnvelope {
    /// Global sequence for deterministic cross-shard ordering.
    pub global_seq: Seq,
    /// Monotone sequence scoped to the destination shard.
    pub shard_seq: u64,
    /// Destination shard that owns this envelope.
    pub shard_id: usize,
    /// Payload to execute on the shard.
    pub task: MacrotaskKind,
}

impl ReactorEnvelope {
    // Internal constructor; both sequence numbers are assigned by the mesh.
    const fn new(global_seq: Seq, shard_seq: u64, shard_id: usize, task: MacrotaskKind) -> Self {
        Self {
            global_seq,
            shard_seq,
            shard_id,
            task,
        }
    }
}
/// Backpressure signal for rejected mesh enqueue operations.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ReactorBackpressure {
    /// Shard whose lane rejected the enqueue (or was out of range).
    pub shard_id: usize,
    /// Lane depth observed at rejection time (0 for an unknown shard).
    pub depth: usize,
    /// Lane capacity (0 for an unknown shard).
    pub capacity: usize,
}

/// Lightweight telemetry snapshot for mesh queueing behavior.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReactorMeshTelemetry {
    /// Current queue depth per shard, indexed by shard ID.
    pub queue_depths: Vec<usize>,
    /// High-water-mark queue depth per shard, indexed by shard ID.
    pub max_queue_depths: Vec<usize>,
    /// Total enqueues rejected (backpressure or unknown shard).
    pub rejected_enqueues: u64,
    /// Shard-to-core bindings from the placement manifest.
    pub shard_bindings: Vec<ReactorShardBinding>,
    /// Why placement fell back to a uniform layout, if it did.
    pub fallback_reason: Option<ReactorPlacementFallbackReason>,
}
impl ReactorMeshTelemetry {
    /// Render telemetry as machine-readable JSON for diagnostics.
    ///
    /// The `shard_bindings` layout mirrors `ReactorPlacementManifest::as_json`
    /// so both can be consumed by the same tooling.
    #[must_use]
    pub fn as_json(&self) -> serde_json::Value {
        serde_json::json!({
            "queue_depths": self.queue_depths,
            "max_queue_depths": self.max_queue_depths,
            "rejected_enqueues": self.rejected_enqueues,
            "fallback_reason": self.fallback_reason.map(ReactorPlacementFallbackReason::as_code),
            "shard_bindings": self.shard_bindings.iter().map(|binding| {
                serde_json::json!({
                    "shard_id": binding.shard_id,
                    "core_id": binding.core_id,
                    "numa_node": binding.numa_node,
                })
            }).collect::<Vec<_>>()
        })
    }
}
/// Deterministic SPSC-style lane.
///
/// Models the semantics of a bounded SPSC ring without unsafe code: a plain
/// `VecDeque` with a fixed capacity plus a high-water-mark counter.
#[derive(Debug, Clone)]
struct SpscLane<T> {
    capacity: usize,
    queue: VecDeque<T>,
    max_depth: usize,
}

impl<T> SpscLane<T> {
    fn new(capacity: usize) -> Self {
        SpscLane {
            max_depth: 0,
            queue: VecDeque::with_capacity(capacity),
            capacity,
        }
    }

    fn len(&self) -> usize {
        self.queue.len()
    }

    fn is_empty(&self) -> bool {
        self.queue.is_empty()
    }

    /// Push a value; when full, returns `Err(current_depth)` and drops nothing.
    fn push(&mut self, value: T) -> Result<(), usize> {
        let depth = self.queue.len();
        if depth >= self.capacity {
            Err(depth)
        } else {
            self.queue.push_back(value);
            self.max_depth = self.max_depth.max(depth + 1);
            Ok(())
        }
    }

    fn pop(&mut self) -> Option<T> {
        self.queue.pop_front()
    }

    fn front(&self) -> Option<&T> {
        self.queue.front()
    }
}
/// Deterministic multi-shard reactor mesh using bounded per-shard SPSC lanes.
///
/// Routing policy:
/// - Hostcall completions are hash-routed by `call_id` for shard affinity.
/// - Inbound events use deterministic round-robin distribution across shards.
///
/// Drain policy:
/// - `drain_global_order()` emits envelopes in ascending global sequence
///   across all shard heads, preserving deterministic external ordering.
#[derive(Debug, Clone)]
pub struct ReactorMesh {
    /// Global sequence counter shared across all shards.
    seq: Seq,
    /// One bounded lane per shard.
    lanes: Vec<SpscLane<ReactorEnvelope>>,
    /// Per-shard monotone sequence counters, indexed by shard ID.
    shard_seq: Vec<u64>,
    /// Round-robin cursor for inbound-event distribution.
    rr_cursor: usize,
    /// Count of enqueues rejected (backpressure or unknown shard).
    rejected_enqueues: u64,
    /// Deterministic placement plan computed at construction.
    placement_manifest: ReactorPlacementManifest,
}
impl ReactorMesh {
    /// Create a mesh using the provided config.
    ///
    /// Invalid config values are clamped rather than rejected: both
    /// `shard_count == 0` and `lane_capacity == 0` are raised to 1, so the
    /// mesh always has at least one usable lane. (The previous doc claimed an
    /// empty mesh was returned; the code has always clamped.)
    #[must_use]
    #[allow(clippy::needless_pass_by_value)]
    pub fn new(config: ReactorMeshConfig) -> Self {
        let shard_count = config.shard_count.max(1);
        let lane_capacity = config.lane_capacity.max(1);
        let placement_manifest =
            ReactorPlacementManifest::plan(shard_count, config.topology.as_ref());
        let lanes = (0..shard_count)
            .map(|_| SpscLane::new(lane_capacity))
            .collect::<Vec<_>>();
        Self {
            seq: Seq::zero(),
            lanes,
            shard_seq: vec![0; shard_count],
            rr_cursor: 0,
            rejected_enqueues: 0,
            placement_manifest,
        }
    }
918    /// Number of shard lanes.
919    #[must_use]
920    pub fn shard_count(&self) -> usize {
921        self.lanes.len()
922    }
923
924    /// Total pending envelopes across all shards.
925    #[must_use]
926    pub fn total_depth(&self) -> usize {
927        self.lanes.iter().map(SpscLane::len).sum()
928    }
929
930    /// Whether any lane has pending envelopes.
931    #[must_use]
932    pub fn has_pending(&self) -> bool {
933        self.total_depth() > 0
934    }
935
936    /// Per-shard queue depth.
937    #[must_use]
938    pub fn queue_depth(&self, shard_id: usize) -> Option<usize> {
939        self.lanes.get(shard_id).map(SpscLane::len)
940    }
941
    /// Snapshot queueing telemetry for diagnostics.
    ///
    /// Clones the placement bindings; intended for periodic sampling, not
    /// per-envelope hot paths.
    #[must_use]
    pub fn telemetry(&self) -> ReactorMeshTelemetry {
        ReactorMeshTelemetry {
            queue_depths: self.lanes.iter().map(SpscLane::len).collect(),
            max_queue_depths: self.lanes.iter().map(|lane| lane.max_depth).collect(),
            rejected_enqueues: self.rejected_enqueues,
            shard_bindings: self.placement_manifest.bindings.clone(),
            fallback_reason: self.placement_manifest.fallback_reason,
        }
    }

    /// Deterministic shard placement manifest used by this mesh.
    #[must_use]
    pub const fn placement_manifest(&self) -> &ReactorPlacementManifest {
        &self.placement_manifest
    }
960    const fn next_global_seq(&mut self) -> Seq {
961        let current = self.seq;
962        self.seq = self.seq.next();
963        current
964    }
965
966    fn next_shard_seq(&mut self, shard_id: usize) -> u64 {
967        let Some(seq) = self.shard_seq.get_mut(shard_id) else {
968            return 0;
969        };
970        let current = *seq;
971        *seq = seq.saturating_add(1);
972        current
973    }
974
975    fn stable_hash(input: &str) -> u64 {
976        // FNV-1a 64-bit for deterministic process-independent routing.
977        let mut hash = 0xcbf2_9ce4_8422_2325_u64;
978        for byte in input.as_bytes() {
979            hash ^= u64::from(*byte);
980            hash = hash.wrapping_mul(0x0100_0000_01b3_u64);
981        }
982        hash
983    }
984
985    fn hash_route(&self, call_id: &str) -> usize {
986        if self.lanes.len() <= 1 {
987            return 0;
988        }
989        let lanes = u64::try_from(self.lanes.len()).unwrap_or(1);
990        let slot = Self::stable_hash(call_id) % lanes;
991        usize::try_from(slot).unwrap_or(0)
992    }
993
994    fn rr_route(&mut self) -> usize {
995        if self.lanes.len() <= 1 {
996            return 0;
997        }
998        let idx = self.rr_cursor % self.lanes.len();
999        self.rr_cursor = self.rr_cursor.wrapping_add(1);
1000        idx
1001    }
1002
1003    fn enqueue_with_route(
1004        &mut self,
1005        shard_id: usize,
1006        task: MacrotaskKind,
1007    ) -> Result<ReactorEnvelope, ReactorBackpressure> {
1008        let global_seq = self.next_global_seq();
1009        let shard_seq = self.next_shard_seq(shard_id);
1010        let envelope = ReactorEnvelope::new(global_seq, shard_seq, shard_id, task);
1011        let Some(lane) = self.lanes.get_mut(shard_id) else {
1012            self.rejected_enqueues = self.rejected_enqueues.saturating_add(1);
1013            return Err(ReactorBackpressure {
1014                shard_id,
1015                depth: 0,
1016                capacity: 0,
1017            });
1018        };
1019        match lane.push(envelope.clone()) {
1020            Ok(()) => Ok(envelope),
1021            Err(depth) => {
1022                self.rejected_enqueues = self.rejected_enqueues.saturating_add(1);
1023                Err(ReactorBackpressure {
1024                    shard_id,
1025                    depth,
1026                    capacity: lane.capacity,
1027                })
1028            }
1029        }
1030    }
1031
    /// Enqueue a hostcall completion using deterministic hash routing.
    ///
    /// The shard is chosen by stable-hashing `call_id`, so the same call id
    /// always lands on the same lane regardless of process state.
    ///
    /// # Errors
    ///
    /// Returns `ReactorBackpressure` when the target lane is missing or full.
    pub fn enqueue_hostcall_complete(
        &mut self,
        call_id: String,
        outcome: HostcallOutcome,
    ) -> Result<ReactorEnvelope, ReactorBackpressure> {
        // Route first: `call_id` is moved into the task payload below.
        let shard_id = self.hash_route(&call_id);
        self.enqueue_with_route(
            shard_id,
            MacrotaskKind::HostcallComplete { call_id, outcome },
        )
    }
1044
    /// Enqueue an inbound event using deterministic round-robin routing.
    ///
    /// Round-robin (rather than hashing) spreads unrelated events evenly
    /// across lanes; the cursor advance is itself deterministic.
    ///
    /// # Errors
    ///
    /// Returns `ReactorBackpressure` when the chosen lane is missing or full.
    pub fn enqueue_event(
        &mut self,
        event_id: String,
        payload: serde_json::Value,
    ) -> Result<ReactorEnvelope, ReactorBackpressure> {
        let shard_id = self.rr_route();
        self.enqueue_with_route(shard_id, MacrotaskKind::InboundEvent { event_id, payload })
    }
1054
1055    /// Drain one shard up to `budget` envelopes.
1056    pub fn drain_shard(&mut self, shard_id: usize, budget: usize) -> Vec<ReactorEnvelope> {
1057        let Some(lane) = self.lanes.get_mut(shard_id) else {
1058            return Vec::new();
1059        };
1060        let mut drained = Vec::with_capacity(budget.min(lane.len()));
1061        for _ in 0..budget {
1062            let Some(item) = lane.pop() else {
1063                break;
1064            };
1065            drained.push(item);
1066        }
1067        drained
1068    }
1069
1070    /// Drain across shards in deterministic global sequence order.
1071    pub fn drain_global_order(&mut self, budget: usize) -> Vec<ReactorEnvelope> {
1072        let mut drained = Vec::with_capacity(budget);
1073        for _ in 0..budget {
1074            let mut best_lane: Option<usize> = None;
1075            let mut best_seq: Option<Seq> = None;
1076            for (idx, lane) in self.lanes.iter().enumerate() {
1077                let Some(front) = lane.front() else {
1078                    continue;
1079                };
1080                if best_seq.is_none_or(|seq| front.global_seq < seq) {
1081                    best_seq = Some(front.global_seq);
1082                    best_lane = Some(idx);
1083                }
1084            }
1085            let Some(chosen_lane) = best_lane else {
1086                break;
1087            };
1088            if let Some(item) = self.lanes[chosen_lane].pop() {
1089                drained.push(item);
1090            }
1091        }
1092        drained
1093    }
1094}
1095
1096// ============================================================================
1097// NUMA-aware slab allocator for extension hot paths
1098// ============================================================================
1099
/// Configuration for hugepage-backed slab allocation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct HugepageConfig {
    /// Hugepage size in bytes (default 2 MiB = `2_097_152`).
    pub page_size_bytes: usize,
    /// Whether hugepage backing is requested (advisory — falls back gracefully).
    pub enabled: bool,
}

impl Default for HugepageConfig {
    /// Hugepages requested, with the common 2 MiB page size.
    fn default() -> Self {
        Self {
            page_size_bytes: 2 << 20, // 2 MiB
            enabled: true,
        }
    }
}
1117
/// Reason why hugepage-backed allocation was not used.
///
/// Stable string codes for each variant are exposed via `as_code` and
/// surfaced in diagnostic JSON.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HugepageFallbackReason {
    /// Hugepage support was explicitly disabled in configuration.
    Disabled,
    /// No hugepage information available from the OS (total and free both zero).
    DetectionUnavailable,
    /// System reports zero free hugepages.
    InsufficientHugepages,
    /// Requested slab size does not align to hugepage boundaries.
    AlignmentMismatch,
}
1130
impl HugepageFallbackReason {
    /// Stable, machine-readable code for telemetry and JSON output.
    ///
    /// These strings are part of the diagnostic contract; renaming them
    /// would break downstream consumers of the telemetry JSON.
    #[must_use]
    pub const fn as_code(self) -> &'static str {
        match self {
            Self::Disabled => "hugepage_disabled",
            Self::DetectionUnavailable => "detection_unavailable",
            Self::InsufficientHugepages => "insufficient_hugepages",
            Self::AlignmentMismatch => "alignment_mismatch",
        }
    }
}
1142
/// Hugepage availability snapshot from the host.
///
/// `Default` yields an all-zero, inactive status with no fallback reason;
/// construct real values via `evaluate`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct HugepageStatus {
    /// Total hugepages configured on the system.
    pub total_pages: u64,
    /// Free hugepages available for allocation.
    pub free_pages: u64,
    /// Page size in bytes (typically 2 MiB).
    pub page_size_bytes: usize,
    /// Whether hugepages are actually being used by this pool.
    pub active: bool,
    /// If not active, why.
    pub fallback_reason: Option<HugepageFallbackReason>,
}
1157
1158impl HugepageStatus {
1159    /// Determine hugepage viability from system metrics.
1160    #[must_use]
1161    pub const fn evaluate(config: &HugepageConfig, total: u64, free: u64) -> Self {
1162        if !config.enabled {
1163            return Self {
1164                total_pages: total,
1165                free_pages: free,
1166                page_size_bytes: config.page_size_bytes,
1167                active: false,
1168                fallback_reason: Some(HugepageFallbackReason::Disabled),
1169            };
1170        }
1171        if total == 0 && free == 0 {
1172            return Self {
1173                total_pages: 0,
1174                free_pages: 0,
1175                page_size_bytes: config.page_size_bytes,
1176                active: false,
1177                fallback_reason: Some(HugepageFallbackReason::DetectionUnavailable),
1178            };
1179        }
1180        if free == 0 {
1181            return Self {
1182                total_pages: total,
1183                free_pages: 0,
1184                page_size_bytes: config.page_size_bytes,
1185                active: false,
1186                fallback_reason: Some(HugepageFallbackReason::InsufficientHugepages),
1187            };
1188        }
1189        Self {
1190            total_pages: total,
1191            free_pages: free,
1192            page_size_bytes: config.page_size_bytes,
1193            active: true,
1194            fallback_reason: None,
1195        }
1196    }
1197
1198    /// Render as stable JSON for diagnostics.
1199    #[must_use]
1200    pub fn as_json(&self) -> serde_json::Value {
1201        serde_json::json!({
1202            "total_pages": self.total_pages,
1203            "free_pages": self.free_pages,
1204            "page_size_bytes": self.page_size_bytes,
1205            "active": self.active,
1206            "fallback_reason": self.fallback_reason.map(HugepageFallbackReason::as_code),
1207        })
1208    }
1209}
1210
/// Configuration for a NUMA-local slab pool.
///
/// Shared by every per-node slab created from it; see `NumaSlabPool::from_manifest`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct NumaSlabConfig {
    /// Maximum entries per NUMA-node slab.
    pub slab_capacity: usize,
    /// Logical size of each entry in bytes (for telemetry, not enforced at runtime).
    pub entry_size_bytes: usize,
    /// Hugepage configuration.
    pub hugepage: HugepageConfig,
}
1221
1222impl Default for NumaSlabConfig {
1223    fn default() -> Self {
1224        Self {
1225            slab_capacity: 256,
1226            entry_size_bytes: 512,
1227            hugepage: HugepageConfig::default(),
1228        }
1229    }
1230}
1231
1232impl NumaSlabConfig {
1233    #[must_use]
1234    pub const fn slab_footprint_bytes(&self) -> Option<usize> {
1235        self.slab_capacity.checked_mul(self.entry_size_bytes)
1236    }
1237
1238    #[must_use]
1239    pub const fn hugepage_alignment_ok(&self) -> bool {
1240        if !self.hugepage.enabled {
1241            return true;
1242        }
1243        let page = self.hugepage.page_size_bytes;
1244        if page == 0 {
1245            return false;
1246        }
1247        match self.slab_footprint_bytes() {
1248            Some(bytes) => bytes != 0 && bytes % page == 0,
1249            None => false,
1250        }
1251    }
1252
1253    #[must_use]
1254    pub const fn alignment_mismatch_status(&self) -> HugepageStatus {
1255        HugepageStatus {
1256            total_pages: 0,
1257            free_pages: 0,
1258            page_size_bytes: self.hugepage.page_size_bytes,
1259            active: false,
1260            fallback_reason: Some(HugepageFallbackReason::AlignmentMismatch),
1261        }
1262    }
1263}
1264
/// Handle returned on successful slab allocation.
///
/// Contains enough information to deallocate safely and detect use-after-free
/// via generation tracking: a handle whose generation no longer matches the
/// slot's current generation is rejected by `NumaSlab::deallocate`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct NumaSlabHandle {
    /// NUMA node that owns this allocation.
    pub node_id: usize,
    /// Slot index within the node's slab.
    pub slot_index: usize,
    /// Generation at allocation time (prevents ABA on reuse).
    pub generation: u64,
}
1278
/// Per-NUMA-node slab with free-list recycling.
///
/// Tracks occupancy and a per-slot generation counter so stale handles
/// (freed or recycled slots) are detectable on deallocation.
#[derive(Debug, Clone)]
pub struct NumaSlab {
    /// NUMA node this slab serves.
    node_id: usize,
    /// Total number of slots (always at least 1; see `new`).
    capacity: usize,
    /// Per-slot generation, bumped on every allocation.
    generations: Vec<u64>,
    /// Per-slot occupancy flag.
    allocated: Vec<bool>,
    /// LIFO list of free slot indices.
    free_list: Vec<usize>,
    // Telemetry counters.
    total_allocs: u64,
    total_frees: u64,
    high_water_mark: usize,
}
1292
1293impl NumaSlab {
1294    /// Create a new slab for the given NUMA node with the specified capacity.
1295    #[must_use]
1296    pub fn new(node_id: usize, capacity: usize) -> Self {
1297        let capacity = capacity.max(1);
1298        let mut free_list = Vec::with_capacity(capacity);
1299        // Fill free list in reverse so slot 0 is popped first (LIFO recycle).
1300        for idx in (0..capacity).rev() {
1301            free_list.push(idx);
1302        }
1303        Self {
1304            node_id,
1305            capacity,
1306            generations: vec![0; capacity],
1307            allocated: vec![false; capacity],
1308            free_list,
1309            total_allocs: 0,
1310            total_frees: 0,
1311            high_water_mark: 0,
1312        }
1313    }
1314
1315    /// Number of currently allocated slots.
1316    #[must_use]
1317    pub fn in_use(&self) -> usize {
1318        self.capacity.saturating_sub(self.free_list.len())
1319    }
1320
1321    /// Whether the slab has available capacity.
1322    #[must_use]
1323    pub fn has_capacity(&self) -> bool {
1324        !self.free_list.is_empty()
1325    }
1326
1327    /// Allocate a slot, returning a handle or `None` if exhausted.
1328    pub fn allocate(&mut self) -> Option<NumaSlabHandle> {
1329        let slot_index = self.free_list.pop()?;
1330        self.allocated[slot_index] = true;
1331        self.generations[slot_index] = self.generations[slot_index].saturating_add(1);
1332        self.total_allocs = self.total_allocs.saturating_add(1);
1333        self.high_water_mark = self.high_water_mark.max(self.in_use());
1334        Some(NumaSlabHandle {
1335            node_id: self.node_id,
1336            slot_index,
1337            generation: self.generations[slot_index],
1338        })
1339    }
1340
1341    /// Deallocate a slot identified by handle.
1342    ///
1343    /// Returns `true` if deallocation succeeded, `false` if the handle is stale
1344    /// (wrong generation) or already freed.
1345    pub fn deallocate(&mut self, handle: &NumaSlabHandle) -> bool {
1346        if handle.node_id != self.node_id {
1347            return false;
1348        }
1349        if handle.slot_index >= self.capacity {
1350            return false;
1351        }
1352        if !self.allocated[handle.slot_index] {
1353            return false;
1354        }
1355        if self.generations[handle.slot_index] != handle.generation {
1356            return false;
1357        }
1358        self.allocated[handle.slot_index] = false;
1359        self.free_list.push(handle.slot_index);
1360        self.total_frees = self.total_frees.saturating_add(1);
1361        true
1362    }
1363}
1364
/// Cross-node allocation reason for telemetry.
///
/// Currently the only cause is local exhaustion; the enum leaves room for
/// future reasons without changing the telemetry shape.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CrossNodeReason {
    /// Local node slab is exhausted; allocated from nearest neighbor.
    LocalExhausted,
}

impl CrossNodeReason {
    /// Stable, machine-readable code for telemetry output.
    #[must_use]
    pub const fn as_code(self) -> &'static str {
        match self {
            Self::LocalExhausted => "local_exhausted",
        }
    }
}
1380
/// Multi-node slab pool that routes allocations to NUMA-local slabs.
#[derive(Debug, Clone)]
pub struct NumaSlabPool {
    /// One slab per distinct NUMA node, ordered by node id (see `from_manifest`).
    slabs: Vec<NumaSlab>,
    /// Configuration shared by all slabs.
    config: NumaSlabConfig,
    /// Current hugepage backing status; refreshed via `set_hugepage_status`.
    hugepage_status: HugepageStatus,
    /// Allocations served from a non-preferred node.
    cross_node_allocs: u64,
    /// Allocations made while hugepage backing was active.
    hugepage_backed_allocs: u64,
}
1390
1391impl NumaSlabPool {
1392    /// Create a pool with one slab per NUMA node identified in the placement manifest.
1393    #[must_use]
1394    pub fn from_manifest(manifest: &ReactorPlacementManifest, config: NumaSlabConfig) -> Self {
1395        let mut node_ids = manifest
1396            .bindings
1397            .iter()
1398            .map(|b| b.numa_node)
1399            .collect::<Vec<_>>();
1400        node_ids.sort_unstable();
1401        node_ids.dedup();
1402
1403        if node_ids.is_empty() {
1404            node_ids.push(0);
1405        }
1406
1407        let slabs = node_ids
1408            .iter()
1409            .map(|&node_id| NumaSlab::new(node_id, config.slab_capacity))
1410            .collect();
1411
1412        // Hugepage evaluation with zero system data (caller can override via
1413        // `set_hugepage_status` once real data is available).
1414        let hugepage_status = if config.hugepage.enabled && !config.hugepage_alignment_ok() {
1415            config.alignment_mismatch_status()
1416        } else {
1417            HugepageStatus::evaluate(&config.hugepage, 0, 0)
1418        };
1419
1420        Self {
1421            slabs,
1422            config,
1423            hugepage_status,
1424            cross_node_allocs: 0,
1425            hugepage_backed_allocs: 0,
1426        }
1427    }
1428
1429    /// Override hugepage status after querying the host.
1430    pub const fn set_hugepage_status(&mut self, status: HugepageStatus) {
1431        self.hugepage_status = if !self.config.hugepage.enabled {
1432            HugepageStatus::evaluate(&self.config.hugepage, status.total_pages, status.free_pages)
1433        } else if !self.config.hugepage_alignment_ok() {
1434            self.config.alignment_mismatch_status()
1435        } else {
1436            status
1437        };
1438    }
1439
1440    /// Number of NUMA nodes in this pool.
1441    #[must_use]
1442    pub fn node_count(&self) -> usize {
1443        self.slabs.len()
1444    }
1445
1446    /// Allocate from the preferred NUMA node, falling back to any node with capacity.
1447    ///
1448    /// Returns `(handle, cross_node_reason)` where the reason is `Some` when
1449    /// the allocation was served from a non-preferred node.
1450    pub fn allocate(
1451        &mut self,
1452        preferred_node: usize,
1453    ) -> Option<(NumaSlabHandle, Option<CrossNodeReason>)> {
1454        // Try preferred node first.
1455        if let Some(slab) = self.slabs.iter_mut().find(|s| s.node_id == preferred_node) {
1456            if let Some(handle) = slab.allocate() {
1457                if self.hugepage_status.active {
1458                    self.hugepage_backed_allocs = self.hugepage_backed_allocs.saturating_add(1);
1459                }
1460                return Some((handle, None));
1461            }
1462        }
1463        // Fallback: scan all nodes for available capacity.
1464        for slab in &mut self.slabs {
1465            if slab.node_id == preferred_node {
1466                continue;
1467            }
1468            if let Some(handle) = slab.allocate() {
1469                self.cross_node_allocs = self.cross_node_allocs.saturating_add(1);
1470                if self.hugepage_status.active {
1471                    self.hugepage_backed_allocs = self.hugepage_backed_allocs.saturating_add(1);
1472                }
1473                return Some((handle, Some(CrossNodeReason::LocalExhausted)));
1474            }
1475        }
1476        None
1477    }
1478
1479    /// Deallocate a previously allocated handle.
1480    ///
1481    /// Returns `true` if deallocation succeeded.
1482    pub fn deallocate(&mut self, handle: &NumaSlabHandle) -> bool {
1483        for slab in &mut self.slabs {
1484            if slab.node_id == handle.node_id {
1485                return slab.deallocate(handle);
1486            }
1487        }
1488        false
1489    }
1490
1491    /// Snapshot telemetry for this pool.
1492    #[must_use]
1493    pub fn telemetry(&self) -> NumaSlabTelemetry {
1494        let per_node = self
1495            .slabs
1496            .iter()
1497            .map(|slab| NumaSlabNodeTelemetry {
1498                node_id: slab.node_id,
1499                capacity: slab.capacity,
1500                in_use: slab.in_use(),
1501                total_allocs: slab.total_allocs,
1502                total_frees: slab.total_frees,
1503                high_water_mark: slab.high_water_mark,
1504            })
1505            .collect();
1506        NumaSlabTelemetry {
1507            per_node,
1508            cross_node_allocs: self.cross_node_allocs,
1509            hugepage_backed_allocs: self.hugepage_backed_allocs,
1510            hugepage_status: self.hugepage_status,
1511            config: self.config,
1512        }
1513    }
1514}
1515
/// Per-node telemetry counters.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct NumaSlabNodeTelemetry {
    /// NUMA node this slab serves.
    pub node_id: usize,
    /// Total slots in the slab.
    pub capacity: usize,
    /// Slots currently allocated.
    pub in_use: usize,
    /// Lifetime allocation count (saturating).
    pub total_allocs: u64,
    /// Lifetime free count (saturating).
    pub total_frees: u64,
    /// Maximum simultaneous `in_use` observed.
    pub high_water_mark: usize,
}
1526
/// Aggregate slab pool telemetry.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NumaSlabTelemetry {
    /// Per-node counters, one entry per slab.
    pub per_node: Vec<NumaSlabNodeTelemetry>,
    /// Allocations served from a non-preferred node.
    pub cross_node_allocs: u64,
    /// Allocations made while hugepage backing was active.
    pub hugepage_backed_allocs: u64,
    /// Hugepage status at snapshot time.
    pub hugepage_status: HugepageStatus,
    /// Pool configuration at snapshot time.
    pub config: NumaSlabConfig,
}
1536
1537impl NumaSlabTelemetry {
1538    const RATIO_SCALE_BPS: u64 = 10_000;
1539
1540    #[must_use]
1541    fn ratio_basis_points(numerator: u64, denominator: u64) -> u64 {
1542        if denominator == 0 {
1543            return 0;
1544        }
1545        let scaled =
1546            (u128::from(numerator) * u128::from(Self::RATIO_SCALE_BPS)) / u128::from(denominator);
1547        u64::try_from(scaled).unwrap_or(Self::RATIO_SCALE_BPS)
1548    }
1549
1550    #[must_use]
1551    const fn pressure_band(value_bps: u64) -> &'static str {
1552        if value_bps >= 7_500 {
1553            "high"
1554        } else if value_bps >= 2_500 {
1555            "medium"
1556        } else {
1557            "low"
1558        }
1559    }
1560
1561    /// Render as stable machine-readable JSON for diagnostics.
1562    #[must_use]
1563    pub fn as_json(&self) -> serde_json::Value {
1564        let total_allocs: u64 = self.per_node.iter().map(|n| n.total_allocs).sum();
1565        let total_frees: u64 = self.per_node.iter().map(|n| n.total_frees).sum();
1566        let total_in_use: usize = self.per_node.iter().map(|n| n.in_use).sum();
1567        let total_capacity: usize = self.per_node.iter().map(|n| n.capacity).sum();
1568        let total_high_water: usize = self.per_node.iter().map(|n| n.high_water_mark).sum();
1569        let remote_allocs = self.cross_node_allocs.min(total_allocs);
1570        let local_allocs = total_allocs.saturating_sub(remote_allocs);
1571        let local_ratio_bps = Self::ratio_basis_points(local_allocs, total_allocs);
1572        let remote_ratio_bps = Self::ratio_basis_points(remote_allocs, total_allocs);
1573        let hugepage_backed_allocs = self.hugepage_backed_allocs.min(total_allocs);
1574        let hugepage_hit_rate_bps = Self::ratio_basis_points(hugepage_backed_allocs, total_allocs);
1575        let total_capacity_u64 = u64::try_from(total_capacity).unwrap_or(u64::MAX);
1576        let total_in_use_u64 = u64::try_from(total_in_use).unwrap_or(u64::MAX);
1577        let total_high_water_u64 = u64::try_from(total_high_water).unwrap_or(u64::MAX);
1578        let occupancy_pressure_bps = Self::ratio_basis_points(total_in_use_u64, total_capacity_u64);
1579        let cache_miss_pressure_bps =
1580            Self::ratio_basis_points(total_high_water_u64, total_capacity_u64);
1581        // Remote allocations are a practical proxy for TLB/cache pressure from cross-node traffic.
1582        let tlb_miss_pressure_bps = remote_ratio_bps;
1583        let cross_node_fallback_reason = if self.cross_node_allocs > 0 {
1584            Some(CrossNodeReason::LocalExhausted.as_code())
1585        } else {
1586            None
1587        };
1588        serde_json::json!({
1589            "node_count": self.per_node.len(),
1590            "total_allocs": total_allocs,
1591            "total_frees": total_frees,
1592            "total_in_use": total_in_use,
1593            "cross_node_allocs": self.cross_node_allocs,
1594            "hugepage_backed_allocs": hugepage_backed_allocs,
1595            "local_allocs": local_allocs,
1596            "remote_allocs": remote_allocs,
1597            "allocation_ratio_bps": {
1598                "scale": Self::RATIO_SCALE_BPS,
1599                "local": local_ratio_bps,
1600                "remote": remote_ratio_bps,
1601            },
1602            "hugepage_hit_rate_bps": {
1603                "scale": Self::RATIO_SCALE_BPS,
1604                "value": hugepage_hit_rate_bps,
1605            },
1606            "latency_proxies_bps": {
1607                "scale": Self::RATIO_SCALE_BPS,
1608                "tlb_miss_pressure": tlb_miss_pressure_bps,
1609                "cache_miss_pressure": cache_miss_pressure_bps,
1610                "occupancy_pressure": occupancy_pressure_bps,
1611            },
1612            "pressure_bands": {
1613                "tlb_miss": Self::pressure_band(tlb_miss_pressure_bps),
1614                "cache_miss": Self::pressure_band(cache_miss_pressure_bps),
1615                "occupancy": Self::pressure_band(occupancy_pressure_bps),
1616            },
1617            "fallback_reasons": {
1618                "cross_node": cross_node_fallback_reason,
1619                "hugepage": self.hugepage_status.fallback_reason.map(HugepageFallbackReason::as_code),
1620            },
1621            "config": {
1622                "slab_capacity": self.config.slab_capacity,
1623                "entry_size_bytes": self.config.entry_size_bytes,
1624                "hugepage_enabled": self.config.hugepage.enabled,
1625                "hugepage_page_size_bytes": self.config.hugepage.page_size_bytes,
1626            },
1627            "hugepage": self.hugepage_status.as_json(),
1628            "per_node": self.per_node.iter().map(|n| serde_json::json!({
1629                "node_id": n.node_id,
1630                "capacity": n.capacity,
1631                "in_use": n.in_use,
1632                "total_allocs": n.total_allocs,
1633                "total_frees": n.total_frees,
1634                "high_water_mark": n.high_water_mark,
1635            })).collect::<Vec<_>>(),
1636        })
1637    }
1638}
1639
1640// ============================================================================
1641// Thread affinity advisory
1642// ============================================================================
1643
/// Enforcement level for thread-to-core affinity.
///
/// Carried on each `ThreadAffinityAdvice`; string codes come from `as_code`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AffinityEnforcement {
    /// Affinity is advisory only; scheduler may override.
    Advisory,
    /// Strict enforcement requested (requires OS support).
    Strict,
    /// Affinity enforcement is disabled.
    Disabled,
}
1654
impl AffinityEnforcement {
    /// Stable, machine-readable code for diagnostics JSON.
    #[must_use]
    pub const fn as_code(self) -> &'static str {
        match self {
            Self::Advisory => "advisory",
            Self::Strict => "strict",
            Self::Disabled => "disabled",
        }
    }
}
1665
/// Advisory thread-to-core binding produced from placement manifest.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ThreadAffinityAdvice {
    /// Shard this advice applies to.
    pub shard_id: usize,
    /// Core the shard thread should run on (from the binding's `core_id`).
    pub recommended_core: usize,
    /// NUMA node taken from the shard's placement binding.
    pub recommended_numa_node: usize,
    /// How strongly the binding should be enforced.
    pub enforcement: AffinityEnforcement,
}
1674
impl ThreadAffinityAdvice {
    /// Render as JSON for diagnostics.
    ///
    /// Key order follows the literal below and is part of the stable output.
    #[must_use]
    pub fn as_json(&self) -> serde_json::Value {
        serde_json::json!({
            "shard_id": self.shard_id,
            "recommended_core": self.recommended_core,
            "recommended_numa_node": self.recommended_numa_node,
            "enforcement": self.enforcement.as_code(),
        })
    }
}
1687
1688impl ReactorPlacementManifest {
1689    /// Generate affinity advice for all shards in this manifest.
1690    #[must_use]
1691    pub fn affinity_advice(&self, enforcement: AffinityEnforcement) -> Vec<ThreadAffinityAdvice> {
1692        self.bindings
1693            .iter()
1694            .map(|binding| ThreadAffinityAdvice {
1695                shard_id: binding.shard_id,
1696                recommended_core: binding.core_id,
1697                recommended_numa_node: binding.numa_node,
1698                enforcement,
1699            })
1700            .collect()
1701    }
1702
1703    /// Look up the NUMA node for a specific shard.
1704    #[must_use]
1705    pub fn numa_node_for_shard(&self, shard_id: usize) -> Option<usize> {
1706        self.bindings
1707            .iter()
1708            .find(|b| b.shard_id == shard_id)
1709            .map(|b| b.numa_node)
1710    }
1711}
1712
1713// ============================================================================
1714// ReactorMesh ↔ NUMA slab integration
1715// ============================================================================
1716
1717impl ReactorMesh {
1718    /// Look up the preferred NUMA node for a shard via the placement manifest.
1719    #[must_use]
1720    pub fn preferred_numa_node(&self, shard_id: usize) -> usize {
1721        self.placement_manifest
1722            .numa_node_for_shard(shard_id)
1723            .unwrap_or(0)
1724    }
1725
1726    /// Generate thread affinity advice from the mesh's placement manifest.
1727    #[must_use]
1728    pub fn affinity_advice(&self, enforcement: AffinityEnforcement) -> Vec<ThreadAffinityAdvice> {
1729        self.placement_manifest.affinity_advice(enforcement)
1730    }
1731}
1732
impl<C: Clock> fmt::Debug for Scheduler<C> {
    /// Summarize scheduler state with counts rather than queue contents.
    ///
    /// `finish_non_exhaustive` signals that fields (e.g. the clock and the
    /// queues themselves) are intentionally omitted from the output.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Scheduler")
            .field("seq", &self.seq)
            .field("macrotask_count", &self.macrotask_queue.len())
            .field("timer_count", &self.timer_heap.len())
            .field("next_timer_id", &self.next_timer_id)
            .field("cancelled_timers", &self.cancelled_timers.len())
            .finish_non_exhaustive()
    }
}
1744
1745#[cfg(test)]
1746mod tests {
1747    use super::*;
1748
1749    #[test]
1750    fn seq_ordering() {
1751        let a = Seq::zero();
1752        let b = a.next();
1753        let c = b.next();
1754
1755        assert!(a < b);
1756        assert!(b < c);
1757        assert_eq!(a.value(), 0);
1758        assert_eq!(b.value(), 1);
1759        assert_eq!(c.value(), 2);
1760    }
1761
1762    #[test]
1763    fn seq_next_saturates_at_u64_max() {
1764        let max = Seq(u64::MAX);
1765        assert_eq!(max.next(), max);
1766    }
1767
    #[test]
    fn timer_ordering() {
        // TimerEntry's Ord is reversed so that BinaryHeap (a max-heap)
        // behaves as a min-heap: the "greater" entry is the one that
        // should fire first.
        // Earlier deadline = higher priority (lower in min-heap)
        let t1 = TimerEntry::new(1, 100, Seq(0));
        let t2 = TimerEntry::new(2, 200, Seq(1));

        assert!(t1 > t2); // Reversed for min-heap

        // Same deadline, earlier seq = higher priority
        let t3 = TimerEntry::new(3, 100, Seq(5));
        let t4 = TimerEntry::new(4, 100, Seq(10));

        assert!(t3 > t4); // Reversed for min-heap
    }
1782
    #[test]
    fn deterministic_clock() {
        let clock = DeterministicClock::new(1000);
        assert_eq!(clock.now_ms(), 1000);

        // Relative advance adds to the current time.
        clock.advance(500);
        assert_eq!(clock.now_ms(), 1500);

        // Absolute set overrides the current time.
        clock.set(2000);
        assert_eq!(clock.now_ms(), 2000);
    }
1794
    #[test]
    fn scheduler_basic_timer() {
        let clock = DeterministicClock::new(0);
        let mut sched = Scheduler::with_clock(clock);

        // Set a timer for 100ms; timer ids start at 1.
        let timer_id = sched.set_timeout(100);
        assert_eq!(timer_id, 1);
        assert_eq!(sched.timer_count(), 1);
        assert!(!sched.macrotask_queue.is_empty() || sched.timer_count() > 0);

        // Tick before deadline - nothing happens
        let task = sched.tick();
        assert!(task.is_none());

        // Advance past deadline; the next tick delivers the fired timer.
        sched.clock.advance(150);
        let task = sched.tick();
        assert!(task.is_some());
        match task.unwrap().kind {
            MacrotaskKind::TimerFired { timer_id: id } => assert_eq!(id, timer_id),
            other => unreachable!("Expected TimerFired, got {other:?}"),
        }
    }
1819
    #[test]
    fn scheduler_timer_id_wraps_after_u64_max() {
        let clock = DeterministicClock::new(0);
        let mut sched = Scheduler::with_clock(clock);
        // Force the id counter to the top of the range.
        sched.next_timer_id = u64::MAX;

        let first = sched.set_timeout(10);
        let second = sched.set_timeout(20);

        // After u64::MAX the allocator wraps to 1, skipping 0.
        assert_eq!(first, u64::MAX);
        assert_eq!(second, 1);
    }
1832
    #[test]
    fn scheduler_timer_id_wrap_preserves_cancellation_semantics() {
        let clock = DeterministicClock::new(0);
        let mut sched = Scheduler::with_clock(clock);
        // Force the id counter to the top of the range so ids wrap.
        sched.next_timer_id = u64::MAX;

        let max_id = sched.set_timeout(10);
        let wrapped_id = sched.set_timeout(20);

        assert_eq!(max_id, u64::MAX);
        assert_eq!(wrapped_id, 1);
        // Both the pre-wrap and post-wrap ids must remain cancellable.
        assert!(sched.clear_timeout(max_id));
        assert!(sched.clear_timeout(wrapped_id));

        // Past both deadlines, neither cancelled timer may fire.
        sched.clock.advance(25);
        assert!(sched.tick().is_none());
        assert!(sched.tick().is_none());
    }
1851
1852    #[test]
1853    fn scheduler_timer_ordering() {
1854        let clock = DeterministicClock::new(0);
1855        let mut sched = Scheduler::with_clock(clock);
1856
1857        // Set timers in reverse order
1858        let t3 = sched.set_timeout(300);
1859        let t1 = sched.set_timeout(100);
1860        let t2 = sched.set_timeout(200);
1861
1862        // Advance past all deadlines
1863        sched.clock.advance(400);
1864
1865        // Should fire in deadline order
1866        let task1 = sched.tick().unwrap();
1867        let task2 = sched.tick().unwrap();
1868        let task3 = sched.tick().unwrap();
1869
1870        match task1.kind {
1871            MacrotaskKind::TimerFired { timer_id } => assert_eq!(timer_id, t1),
1872            other => unreachable!("Expected t1, got {other:?}"),
1873        }
1874        match task2.kind {
1875            MacrotaskKind::TimerFired { timer_id } => assert_eq!(timer_id, t2),
1876            other => unreachable!("Expected t2, got {other:?}"),
1877        }
1878        match task3.kind {
1879            MacrotaskKind::TimerFired { timer_id } => assert_eq!(timer_id, t3),
1880            other => unreachable!("Expected t3, got {other:?}"),
1881        }
1882    }
1883
1884    #[test]
1885    fn scheduler_same_deadline_seq_ordering() {
1886        let clock = DeterministicClock::new(0);
1887        let mut sched = Scheduler::with_clock(clock);
1888
1889        // Set timers with same deadline - should fire in seq order
1890        let t1 = sched.set_timeout(100);
1891        let t2 = sched.set_timeout(100);
1892        let t3 = sched.set_timeout(100);
1893
1894        sched.clock.advance(150);
1895
1896        let task1 = sched.tick().unwrap();
1897        let task2 = sched.tick().unwrap();
1898        let task3 = sched.tick().unwrap();
1899
1900        // Must fire in order they were created (by seq)
1901        match task1.kind {
1902            MacrotaskKind::TimerFired { timer_id } => assert_eq!(timer_id, t1),
1903            other => unreachable!("Expected t1, got {other:?}"),
1904        }
1905        match task2.kind {
1906            MacrotaskKind::TimerFired { timer_id } => assert_eq!(timer_id, t2),
1907            other => unreachable!("Expected t2, got {other:?}"),
1908        }
1909        match task3.kind {
1910            MacrotaskKind::TimerFired { timer_id } => assert_eq!(timer_id, t3),
1911            other => unreachable!("Expected t3, got {other:?}"),
1912        }
1913    }
1914
1915    #[test]
1916    fn scheduler_cancel_timer() {
1917        let clock = DeterministicClock::new(0);
1918        let mut sched = Scheduler::with_clock(clock);
1919
1920        let t1 = sched.set_timeout(100);
1921        let t2 = sched.set_timeout(200);
1922
1923        // Cancel t1
1924        assert!(sched.clear_timeout(t1));
1925
1926        // Advance past both deadlines
1927        sched.clock.advance(250);
1928
1929        // Only t2 should fire
1930        let task = sched.tick().unwrap();
1931        match task.kind {
1932            MacrotaskKind::TimerFired { timer_id } => assert_eq!(timer_id, t2),
1933            other => unreachable!("Expected t2, got {other:?}"),
1934        }
1935
1936        // No more tasks
1937        assert!(sched.tick().is_none());
1938    }
1939
1940    #[test]
1941    fn scheduler_hostcall_completion() {
1942        let clock = DeterministicClock::new(0);
1943        let mut sched = Scheduler::with_clock(clock);
1944
1945        sched.enqueue_hostcall_complete(
1946            "call-1".to_string(),
1947            HostcallOutcome::Success(serde_json::json!({"result": 42})),
1948        );
1949
1950        let task = sched.tick().unwrap();
1951        match task.kind {
1952            MacrotaskKind::HostcallComplete { call_id, outcome } => {
1953                assert_eq!(call_id, "call-1");
1954                match outcome {
1955                    HostcallOutcome::Success(v) => assert_eq!(v["result"], 42),
1956                    other => unreachable!("Expected success, got {other:?}"),
1957                }
1958            }
1959            other => unreachable!("Expected HostcallComplete, got {other:?}"),
1960        }
1961    }
1962
1963    #[test]
1964    fn scheduler_stream_chunk_sequence_and_finality_invariants() {
1965        let clock = DeterministicClock::new(0);
1966        let mut sched = Scheduler::with_clock(clock);
1967
1968        sched.enqueue_stream_chunk(
1969            "call-stream".to_string(),
1970            0,
1971            serde_json::json!({ "part": "a" }),
1972            false,
1973        );
1974        sched.enqueue_stream_chunk(
1975            "call-stream".to_string(),
1976            1,
1977            serde_json::json!({ "part": "b" }),
1978            false,
1979        );
1980        sched.enqueue_stream_chunk(
1981            "call-stream".to_string(),
1982            2,
1983            serde_json::json!({ "part": "c" }),
1984            true,
1985        );
1986
1987        let mut seen = Vec::new();
1988        while let Some(task) = sched.tick() {
1989            let MacrotaskKind::HostcallComplete { call_id, outcome } = task.kind else {
1990                unreachable!("expected hostcall completion task");
1991            };
1992            let HostcallOutcome::StreamChunk {
1993                sequence,
1994                chunk,
1995                is_final,
1996            } = outcome
1997            else {
1998                unreachable!("expected stream chunk outcome");
1999            };
2000            seen.push((call_id, sequence, chunk, is_final));
2001        }
2002
2003        assert_eq!(seen.len(), 3);
2004        assert!(
2005            seen.iter()
2006                .all(|(call_id, _, _, _)| call_id == "call-stream")
2007        );
2008        assert_eq!(seen[0].1, 0);
2009        assert_eq!(seen[1].1, 1);
2010        assert_eq!(seen[2].1, 2);
2011        assert_eq!(seen[0].2, serde_json::json!({ "part": "a" }));
2012        assert_eq!(seen[1].2, serde_json::json!({ "part": "b" }));
2013        assert_eq!(seen[2].2, serde_json::json!({ "part": "c" }));
2014
2015        let final_count = seen.iter().filter(|(_, _, _, is_final)| *is_final).count();
2016        assert_eq!(final_count, 1, "expected exactly one final chunk");
2017        assert!(seen[2].3, "final chunk must be last");
2018    }
2019
2020    #[test]
2021    fn scheduler_stream_chunks_multi_call_interleaving_is_deterministic() {
2022        let clock = DeterministicClock::new(0);
2023        let mut sched = Scheduler::with_clock(clock);
2024
2025        sched.enqueue_stream_chunk("call-a".to_string(), 0, serde_json::json!("a0"), false);
2026        sched.enqueue_stream_chunk("call-b".to_string(), 0, serde_json::json!("b0"), false);
2027        sched.enqueue_stream_chunk("call-a".to_string(), 1, serde_json::json!("a1"), true);
2028        sched.enqueue_stream_chunk("call-b".to_string(), 1, serde_json::json!("b1"), true);
2029
2030        let mut trace = Vec::new();
2031        while let Some(task) = sched.tick() {
2032            let MacrotaskKind::HostcallComplete { call_id, outcome } = task.kind else {
2033                unreachable!("expected hostcall completion task");
2034            };
2035            let HostcallOutcome::StreamChunk {
2036                sequence, is_final, ..
2037            } = outcome
2038            else {
2039                unreachable!("expected stream chunk outcome");
2040            };
2041            trace.push((call_id, sequence, is_final));
2042        }
2043
2044        assert_eq!(
2045            trace,
2046            vec![
2047                ("call-a".to_string(), 0, false),
2048                ("call-b".to_string(), 0, false),
2049                ("call-a".to_string(), 1, true),
2050                ("call-b".to_string(), 1, true),
2051            ]
2052        );
2053    }
2054
2055    #[test]
2056    fn scheduler_event_ordering() {
2057        let clock = DeterministicClock::new(0);
2058        let mut sched = Scheduler::with_clock(clock);
2059
2060        // Enqueue events in order
2061        sched.enqueue_event("evt-1".to_string(), serde_json::json!({"n": 1}));
2062        sched.enqueue_event("evt-2".to_string(), serde_json::json!({"n": 2}));
2063
2064        // Should dequeue in FIFO order
2065        let task1 = sched.tick().unwrap();
2066        let task2 = sched.tick().unwrap();
2067
2068        match task1.kind {
2069            MacrotaskKind::InboundEvent { event_id, .. } => assert_eq!(event_id, "evt-1"),
2070            other => unreachable!("Expected evt-1, got {other:?}"),
2071        }
2072        match task2.kind {
2073            MacrotaskKind::InboundEvent { event_id, .. } => assert_eq!(event_id, "evt-2"),
2074            other => unreachable!("Expected evt-2, got {other:?}"),
2075        }
2076    }
2077
2078    #[test]
2079    fn scheduler_mixed_tasks_ordering() {
2080        let clock = DeterministicClock::new(0);
2081        let mut sched = Scheduler::with_clock(clock);
2082
2083        // Set a timer
2084        let _t1 = sched.set_timeout(50);
2085
2086        // Enqueue an event (gets earlier seq)
2087        sched.enqueue_event("evt-1".to_string(), serde_json::json!({}));
2088
2089        // Advance past timer
2090        sched.clock.advance(100);
2091
2092        // Event should come first (enqueued before timer moved to queue)
2093        let task1 = sched.tick().unwrap();
2094        match task1.kind {
2095            MacrotaskKind::InboundEvent { event_id, .. } => assert_eq!(event_id, "evt-1"),
2096            other => unreachable!("Expected event first, got {other:?}"),
2097        }
2098
2099        // Then timer
2100        let task2 = sched.tick().unwrap();
2101        match task2.kind {
2102            MacrotaskKind::TimerFired { .. } => {}
2103            other => unreachable!("Expected timer second, got {other:?}"),
2104        }
2105    }
2106
2107    #[test]
2108    fn scheduler_invariant_single_macrotask_per_tick() {
2109        let clock = DeterministicClock::new(0);
2110        let mut sched = Scheduler::with_clock(clock);
2111
2112        sched.enqueue_event("evt-1".to_string(), serde_json::json!({}));
2113        sched.enqueue_event("evt-2".to_string(), serde_json::json!({}));
2114        sched.enqueue_event("evt-3".to_string(), serde_json::json!({}));
2115
2116        // Each tick returns exactly one task (I1)
2117        assert!(sched.tick().is_some());
2118        assert_eq!(sched.macrotask_count(), 2);
2119
2120        assert!(sched.tick().is_some());
2121        assert_eq!(sched.macrotask_count(), 1);
2122
2123        assert!(sched.tick().is_some());
2124        assert_eq!(sched.macrotask_count(), 0);
2125
2126        assert!(sched.tick().is_none());
2127    }
2128
2129    #[test]
2130    fn scheduler_next_timer_deadline() {
2131        let clock = DeterministicClock::new(0);
2132        let mut sched = Scheduler::with_clock(clock);
2133
2134        assert!(sched.next_timer_deadline().is_none());
2135
2136        sched.set_timeout(200);
2137        sched.set_timeout(100);
2138        sched.set_timeout(300);
2139
2140        assert_eq!(sched.next_timer_deadline(), Some(100));
2141        assert_eq!(sched.time_until_next_timer(), Some(100));
2142
2143        sched.clock.advance(50);
2144        assert_eq!(sched.time_until_next_timer(), Some(50));
2145    }
2146
2147    #[test]
2148    fn scheduler_next_timer_skips_cancelled_timers() {
2149        let clock = DeterministicClock::new(0);
2150        let mut sched = Scheduler::with_clock(clock);
2151
2152        let t1 = sched.set_timeout(100);
2153        let _t2 = sched.set_timeout(200);
2154        let _t3 = sched.set_timeout(300);
2155
2156        assert!(sched.clear_timeout(t1));
2157        assert_eq!(sched.next_timer_deadline(), Some(200));
2158        assert_eq!(sched.time_until_next_timer(), Some(200));
2159    }
2160
2161    #[test]
2162    fn scheduler_debug_format() {
2163        let clock = DeterministicClock::new(0);
2164        let sched = Scheduler::with_clock(clock);
2165        let debug = format!("{sched:?}");
2166        assert!(debug.contains("Scheduler"));
2167        assert!(debug.contains("seq"));
2168    }
2169
2170    #[derive(Debug, Clone)]
2171    struct XorShift64 {
2172        state: u64,
2173    }
2174
2175    impl XorShift64 {
2176        const fn new(seed: u64) -> Self {
2177            // Avoid the all-zero state so the stream doesn't get stuck.
2178            let seed = seed ^ 0x9E37_79B9_7F4A_7C15;
2179            Self { state: seed }
2180        }
2181
2182        fn next_u64(&mut self) -> u64 {
2183            let mut x = self.state;
2184            x ^= x << 13;
2185            x ^= x >> 7;
2186            x ^= x << 17;
2187            self.state = x;
2188            x
2189        }
2190
2191        fn next_range_u64(&mut self, upper_exclusive: u64) -> u64 {
2192            if upper_exclusive == 0 {
2193                return 0;
2194            }
2195            self.next_u64() % upper_exclusive
2196        }
2197
2198        fn next_usize(&mut self, upper_exclusive: usize) -> usize {
2199            let upper = u64::try_from(upper_exclusive).expect("usize fits in u64");
2200            let value = self.next_range_u64(upper);
2201            usize::try_from(value).expect("value < upper_exclusive")
2202        }
2203    }
2204
2205    fn trace_entry(task: &Macrotask) -> String {
2206        match &task.kind {
2207            MacrotaskKind::TimerFired { timer_id } => {
2208                format!("seq={}:timer:{timer_id}", task.seq.value())
2209            }
2210            MacrotaskKind::HostcallComplete { call_id, outcome } => {
2211                let outcome_tag = match outcome {
2212                    HostcallOutcome::Success(_) => "ok",
2213                    HostcallOutcome::Error { .. } => "err",
2214                    HostcallOutcome::StreamChunk { is_final, .. } => {
2215                        if *is_final {
2216                            "stream_final"
2217                        } else {
2218                            "chunk"
2219                        }
2220                    }
2221                };
2222                format!("seq={}:hostcall:{call_id}:{outcome_tag}", task.seq.value())
2223            }
2224            MacrotaskKind::InboundEvent { event_id, payload } => {
2225                format!(
2226                    "seq={}:event:{event_id}:payload={payload}",
2227                    task.seq.value()
2228                )
2229            }
2230        }
2231    }
2232
    /// Drive one scheduler through a 256-step seeded pseudo-random script
    /// and return the externally observable execution trace.
    ///
    /// Each step draws an action from the seeded RNG (set a timer, cancel a
    /// random timer, complete a hostcall, deliver an event, advance the
    /// clock, or no-op), then ticks with probability 1/3. Afterwards all
    /// remaining work is drained, jumping the clock to each upcoming timer
    /// deadline. Equal seeds must therefore yield byte-identical traces.
    fn run_seeded_script(seed: u64) -> Vec<String> {
        let clock = DeterministicClock::new(0);
        let mut sched = Scheduler::with_clock(clock);
        let mut rng = XorShift64::new(seed);
        // Every timer id ever issued, so cancellation can pick a random one
        // (possibly one that already fired or was already cancelled).
        let mut timers = Vec::new();
        let mut trace = Vec::new();

        for step in 0..256u64 {
            match rng.next_range_u64(6) {
                0 => {
                    // Set a fresh timer with a random small delay.
                    let delay_ms = rng.next_range_u64(250);
                    let timer_id = sched.set_timeout(delay_ms);
                    timers.push(timer_id);
                }
                1 => {
                    // Cancel a randomly chosen previously-issued timer.
                    if !timers.is_empty() {
                        let idx = rng.next_usize(timers.len());
                        let _cancelled = sched.clear_timeout(timers[idx]);
                    }
                }
                2 => {
                    // Complete a hostcall under a unique call id.
                    let call_id = format!("call-{step}-{}", rng.next_u64());
                    let outcome = HostcallOutcome::Success(serde_json::json!({ "step": step }));
                    sched.enqueue_hostcall_complete(call_id, outcome);
                }
                3 => {
                    // Deliver an inbound event with random payload entropy.
                    let event_id = format!("evt-{step}");
                    let payload = serde_json::json!({ "step": step, "entropy": rng.next_u64() });
                    sched.enqueue_event(event_id, payload);
                }
                4 => {
                    // Advance the deterministic clock by a random small delta.
                    let delta_ms = rng.next_range_u64(50);
                    sched.clock.advance(delta_ms);
                }
                _ => {}
            }

            // Tick roughly every third step so execution interleaves with
            // setup rather than all happening in the drain below.
            if rng.next_range_u64(3) == 0 {
                if let Some(task) = sched.tick() {
                    trace.push(trace_entry(&task));
                }
            }
        }

        // Drain remaining tasks and timers deterministically.
        // The iteration cap guards against a scheduler bug looping forever.
        for _ in 0..10_000 {
            if let Some(task) = sched.tick() {
                trace.push(trace_entry(&task));
                continue;
            }

            let Some(next_deadline) = sched.next_timer_deadline() else {
                break;
            };

            // tick() returned None, so any remaining timer must still be in
            // the future; jump the clock straight to its deadline.
            let now = sched.now_ms();
            assert!(
                next_deadline > now,
                "expected future timer deadline (deadline={next_deadline}, now={now})"
            );
            sched.clock.set(next_deadline);
        }

        trace
    }
2298
2299    #[test]
2300    fn scheduler_seeded_trace_is_deterministic() {
2301        for seed in [0_u64, 1, 2, 3, 0xDEAD_BEEF] {
2302            let a = run_seeded_script(seed);
2303            let b = run_seeded_script(seed);
2304            assert_eq!(a, b, "trace mismatch for seed={seed}");
2305        }
2306    }
2307
2308    // ── Seq Display format ──────────────────────────────────────────
2309
2310    #[test]
2311    fn seq_display_format() {
2312        assert_eq!(format!("{}", Seq::zero()), "seq:0");
2313        assert_eq!(format!("{}", Seq::zero().next()), "seq:1");
2314    }
2315
2316    // ── has_pending / macrotask_count / timer_count ──────────────────
2317
2318    #[test]
2319    fn empty_scheduler_has_no_pending() {
2320        let sched = Scheduler::with_clock(DeterministicClock::new(0));
2321        assert!(!sched.has_pending());
2322        assert_eq!(sched.macrotask_count(), 0);
2323        assert_eq!(sched.timer_count(), 0);
2324    }
2325
2326    #[test]
2327    fn has_pending_with_timer_only() {
2328        let mut sched = Scheduler::with_clock(DeterministicClock::new(0));
2329        sched.set_timeout(100);
2330        assert!(sched.has_pending());
2331        assert_eq!(sched.macrotask_count(), 0);
2332        assert_eq!(sched.timer_count(), 1);
2333    }
2334
2335    #[test]
2336    fn has_pending_with_macrotask_only() {
2337        let mut sched = Scheduler::with_clock(DeterministicClock::new(0));
2338        sched.enqueue_event("e".to_string(), serde_json::json!({}));
2339        assert!(sched.has_pending());
2340        assert_eq!(sched.macrotask_count(), 1);
2341        assert_eq!(sched.timer_count(), 0);
2342    }
2343
2344    #[test]
2345    fn has_pending_ignores_cancelled_timers_without_macrotasks() {
2346        let mut sched = Scheduler::with_clock(DeterministicClock::new(0));
2347        let timer = sched.set_timeout(10_000);
2348        assert!(sched.clear_timeout(timer));
2349        assert!(!sched.has_pending());
2350    }
2351
2352    // ── WallClock ────────────────────────────────────────────────────
2353
2354    #[test]
2355    fn wall_clock_returns_positive_ms() {
2356        let clock = WallClock;
2357        let now = clock.now_ms();
2358        assert!(now > 0, "WallClock should return a positive timestamp");
2359    }
2360
2361    // ── clear_timeout edge cases ─────────────────────────────────────
2362
2363    #[test]
2364    fn clear_timeout_nonexistent_returns_false() {
2365        let mut sched = Scheduler::with_clock(DeterministicClock::new(0));
2366        assert!(!sched.clear_timeout(999));
2367        assert!(
2368            sched.cancelled_timers.is_empty(),
2369            "unknown timer ids should not pollute cancelled set"
2370        );
2371    }
2372
2373    #[test]
2374    fn clear_timeout_double_cancel_returns_false() {
2375        let mut sched = Scheduler::with_clock(DeterministicClock::new(0));
2376        let t = sched.set_timeout(100);
2377        assert!(sched.clear_timeout(t));
2378        // Second cancel - already in set
2379        assert!(!sched.clear_timeout(t));
2380    }
2381
2382    // ── time_until_next_timer ────────────────────────────────────────
2383
2384    #[test]
2385    fn time_until_next_timer_none_when_empty() {
2386        let sched = Scheduler::with_clock(DeterministicClock::new(0));
2387        assert!(sched.time_until_next_timer().is_none());
2388    }
2389
2390    #[test]
2391    fn time_until_next_timer_saturates_at_zero() {
2392        let mut sched = Scheduler::with_clock(DeterministicClock::new(0));
2393        sched.set_timeout(50);
2394        sched.clock.advance(100); // Past the deadline
2395        assert_eq!(sched.time_until_next_timer(), Some(0));
2396    }
2397
2398    // ── HostcallOutcome::Error path ──────────────────────────────────
2399
2400    #[test]
2401    fn hostcall_error_outcome() {
2402        let mut sched = Scheduler::with_clock(DeterministicClock::new(0));
2403        sched.enqueue_hostcall_complete(
2404            "err-call".to_string(),
2405            HostcallOutcome::Error {
2406                code: "E_TIMEOUT".to_string(),
2407                message: "Request timed out".to_string(),
2408            },
2409        );
2410
2411        let task = sched.tick().unwrap();
2412        match task.kind {
2413            MacrotaskKind::HostcallComplete { call_id, outcome } => {
2414                assert_eq!(call_id, "err-call");
2415                match outcome {
2416                    HostcallOutcome::Error { code, message } => {
2417                        assert_eq!(code, "E_TIMEOUT");
2418                        assert_eq!(message, "Request timed out");
2419                    }
2420                    other => unreachable!("Expected error, got {other:?}"),
2421                }
2422            }
2423            other => unreachable!("Expected HostcallComplete, got {other:?}"),
2424        }
2425    }
2426
2427    // ── timer_count decreases after tick ─────────────────────────────
2428
2429    #[test]
2430    fn timer_count_decreases_after_fire() {
2431        let mut sched = Scheduler::with_clock(DeterministicClock::new(0));
2432        sched.set_timeout(50);
2433        sched.set_timeout(100);
2434        assert_eq!(sched.timer_count(), 2);
2435
2436        sched.clock.advance(75);
2437        let _task = sched.tick(); // Fires first timer
2438        assert_eq!(sched.timer_count(), 1);
2439    }
2440
2441    // ── empty tick returns None ──────────────────────────────────────
2442
2443    #[test]
2444    fn empty_scheduler_tick_returns_none() {
2445        let mut sched = Scheduler::with_clock(DeterministicClock::new(0));
2446        assert!(sched.tick().is_none());
2447    }
2448
2449    // ── default constructor ──────────────────────────────────────────
2450
2451    #[test]
2452    fn default_scheduler_starts_with_seq_zero() {
2453        let sched = Scheduler::new();
2454        assert_eq!(sched.current_seq(), Seq::zero());
2455    }
2456
2457    // ── Arc<Clock> impl ──────────────────────────────────────────────
2458
2459    #[test]
2460    fn arc_clock_delegation() {
2461        let clock = Arc::new(DeterministicClock::new(42));
2462        assert_eq!(Clock::now_ms(&clock), 42);
2463        clock.advance(10);
2464        assert_eq!(Clock::now_ms(&clock), 52);
2465    }
2466
2467    // ── TimerEntry equality ──────────────────────────────────────────
2468
2469    #[test]
2470    fn timer_entry_equality_ignores_timer_id() {
2471        let a = TimerEntry::new(1, 100, Seq(5));
2472        let b = TimerEntry::new(2, 100, Seq(5));
2473        // PartialEq compares (deadline_ms, seq), not timer_id
2474        assert_eq!(a, b);
2475    }
2476
2477    // ── Macrotask PartialEq uses seq only ────────────────────────────
2478
2479    #[test]
2480    fn macrotask_equality_uses_seq_only() {
2481        let a = Macrotask::new(Seq(1), MacrotaskKind::TimerFired { timer_id: 1 });
2482        let b = Macrotask::new(Seq(1), MacrotaskKind::TimerFired { timer_id: 2 });
2483        assert_eq!(a, b); // Same seq → equal
2484    }
2485
2486    // ── bd-2tl1.5: Streaming concurrency + determinism ──────────────
2487
2488    #[test]
2489    fn scheduler_ten_concurrent_streams_complete_independently() {
2490        let clock = DeterministicClock::new(0);
2491        let mut sched = Scheduler::with_clock(clock);
2492        let n_streams: usize = 10;
2493        let chunks_per_stream: usize = 5;
2494
2495        // Enqueue N streams with M chunks each, interleaved round-robin.
2496        for chunk_idx in 0..chunks_per_stream {
2497            for stream_idx in 0..n_streams {
2498                let is_final = chunk_idx == chunks_per_stream - 1;
2499                sched.enqueue_stream_chunk(
2500                    format!("stream-{stream_idx}"),
2501                    chunk_idx as u64,
2502                    serde_json::json!({ "s": stream_idx, "c": chunk_idx }),
2503                    is_final,
2504                );
2505            }
2506        }
2507
2508        let mut per_stream: std::collections::HashMap<String, Vec<(u64, bool)>> =
2509            std::collections::HashMap::new();
2510        while let Some(task) = sched.tick() {
2511            let MacrotaskKind::HostcallComplete { call_id, outcome } = task.kind else {
2512                unreachable!("expected hostcall completion");
2513            };
2514            let HostcallOutcome::StreamChunk {
2515                sequence, is_final, ..
2516            } = outcome
2517            else {
2518                unreachable!("expected stream chunk");
2519            };
2520            per_stream
2521                .entry(call_id)
2522                .or_default()
2523                .push((sequence, is_final));
2524        }
2525
2526        assert_eq!(per_stream.len(), n_streams);
2527        for (call_id, chunks) in &per_stream {
2528            assert_eq!(
2529                chunks.len(),
2530                chunks_per_stream,
2531                "stream {call_id} incomplete"
2532            );
2533            // Sequences are monotonically increasing per stream.
2534            for (i, (seq, _)) in chunks.iter().enumerate() {
2535                assert_eq!(*seq, i as u64, "stream {call_id}: non-monotonic at {i}");
2536            }
2537            // Exactly one final chunk (the last).
2538            let final_count = chunks.iter().filter(|(_, f)| *f).count();
2539            assert_eq!(
2540                final_count, 1,
2541                "stream {call_id}: expected exactly one final"
2542            );
2543            assert!(
2544                chunks.last().unwrap().1,
2545                "stream {call_id}: final must be last"
2546            );
2547        }
2548    }
2549
2550    #[test]
2551    fn scheduler_mixed_stream_nonstream_ordering() {
2552        let clock = DeterministicClock::new(0);
2553        let mut sched = Scheduler::with_clock(clock);
2554
2555        // Enqueue: event, stream chunk, success, stream final, event.
2556        sched.enqueue_event("evt-1".to_string(), serde_json::json!({"n": 1}));
2557        sched.enqueue_stream_chunk("stream-x".to_string(), 0, serde_json::json!("data"), false);
2558        sched.enqueue_hostcall_complete(
2559            "call-y".to_string(),
2560            HostcallOutcome::Success(serde_json::json!({"ok": true})),
2561        );
2562        sched.enqueue_stream_chunk("stream-x".to_string(), 1, serde_json::json!("end"), true);
2563        sched.enqueue_event("evt-2".to_string(), serde_json::json!({"n": 2}));
2564
2565        let mut trace = Vec::new();
2566        while let Some(task) = sched.tick() {
2567            trace.push(trace_entry(&task));
2568        }
2569
2570        // FIFO ordering: all 5 items in enqueue order.
2571        assert_eq!(trace.len(), 5);
2572        assert!(trace[0].contains("event:evt-1"));
2573        assert!(trace[1].contains("stream-x") && trace[1].contains("chunk"));
2574        assert!(trace[2].contains("call-y") && trace[2].contains("ok"));
2575        assert!(trace[3].contains("stream-x") && trace[3].contains("stream_final"));
2576        assert!(trace[4].contains("event:evt-2"));
2577    }
2578
2579    #[test]
2580    fn scheduler_concurrent_streams_deterministic_across_runs() {
2581        fn run_ten_streams() -> Vec<String> {
2582            let clock = DeterministicClock::new(0);
2583            let mut sched = Scheduler::with_clock(clock);
2584
2585            for chunk in 0..3_u64 {
2586                for stream in 0..10 {
2587                    sched.enqueue_stream_chunk(
2588                        format!("s{stream}"),
2589                        chunk,
2590                        serde_json::json!(chunk),
2591                        chunk == 2,
2592                    );
2593                }
2594            }
2595
2596            let mut trace = Vec::new();
2597            while let Some(task) = sched.tick() {
2598                trace.push(trace_entry(&task));
2599            }
2600            trace
2601        }
2602
2603        let a = run_ten_streams();
2604        let b = run_ten_streams();
2605        assert_eq!(a, b, "10-stream trace must be deterministic");
2606        assert_eq!(a.len(), 30, "expected 10 streams x 3 chunks = 30 entries");
2607    }
2608
2609    #[test]
2610    fn scheduler_stream_interleaved_with_timers() {
2611        let clock = DeterministicClock::new(0);
2612        let mut sched = Scheduler::with_clock(clock);
2613
2614        // Set a timer for 100ms.
2615        let _t = sched.set_timeout(100);
2616
2617        // Enqueue first stream chunk.
2618        sched.enqueue_stream_chunk("s1".to_string(), 0, serde_json::json!("a"), false);
2619
2620        // Advance clock past timer deadline.
2621        sched.clock.advance(150);
2622
2623        // Enqueue final stream chunk after timer.
2624        sched.enqueue_stream_chunk("s1".to_string(), 1, serde_json::json!("b"), true); // seq=3
2625
2626        let mut trace = Vec::new();
2627        while let Some(task) = sched.tick() {
2628            trace.push(trace_entry(&task));
2629        }
2630
2631        // Pending macrotasks run first; due timers are enqueued after existing work.
2632        assert_eq!(trace.len(), 3);
2633        assert!(
2634            trace[0].contains("s1") && trace[0].contains("chunk"),
2635            "first: stream chunk 0, got: {}",
2636            trace[0]
2637        );
2638        assert!(
2639            trace[1].contains("s1") && trace[1].contains("stream_final"),
2640            "second: stream final, got: {}",
2641            trace[1]
2642        );
2643        assert!(
2644            trace[2].contains("timer"),
2645            "third: timer, got: {}",
2646            trace[2]
2647        );
2648    }
2649
2650    #[test]
2651    fn scheduler_due_timers_do_not_preempt_queued_macrotasks() {
2652        let clock = DeterministicClock::new(0);
2653        let mut sched = Scheduler::with_clock(clock);
2654
2655        // 1. Set a timer T1. Deadline = 100ms.
2656        let t1_id = sched.set_timeout(100);
2657
2658        // 2. Enqueue an event E1 before timer delivery.
2659        sched.enqueue_event("E1".to_string(), serde_json::json!({}));
2660
2661        // 3. Advance time so T1 is due.
2662        sched.clock.advance(100);
2663
2664        // 4. Tick 1: queued event executes first.
2665        let task1 = sched.tick().expect("Should have a task");
2666
2667        // 5. Tick 2: timer executes next.
2668        let task2 = sched.tick().expect("Should have a task");
2669
2670        let seq1 = task1.seq.value();
2671        let seq2 = task2.seq.value();
2672
2673        // Global macrotask seq is monotone for externally observed execution.
2674        assert!(
2675            seq1 < seq2,
2676            "Invariant I5 violation: Task execution not ordered by seq. Executed {seq1} then {seq2}"
2677        );
2678
2679        if let MacrotaskKind::InboundEvent { event_id, .. } = task1.kind {
2680            assert_eq!(event_id, "E1");
2681        } else {
2682            panic!("Expected InboundEvent first, got {:?}", task1.kind);
2683        }
2684
2685        if let MacrotaskKind::TimerFired { timer_id } = task2.kind {
2686            assert_eq!(timer_id, t1_id);
2687        } else {
2688            panic!("Expected TimerFired second, got {:?}", task2.kind);
2689        }
2690    }
2691
2692    #[test]
2693    fn reactor_mesh_hash_routing_is_stable_for_call_id() {
2694        let mut mesh = ReactorMesh::new(ReactorMeshConfig {
2695            shard_count: 8,
2696            lane_capacity: 64,
2697            topology: None,
2698        });
2699
2700        let first = mesh
2701            .enqueue_hostcall_complete(
2702                "call-affinity".to_string(),
2703                HostcallOutcome::Success(serde_json::json!({})),
2704            )
2705            .expect("first enqueue");
2706        let second = mesh
2707            .enqueue_hostcall_complete(
2708                "call-affinity".to_string(),
2709                HostcallOutcome::Success(serde_json::json!({})),
2710            )
2711            .expect("second enqueue");
2712
2713        assert_eq!(
2714            first.shard_id, second.shard_id,
2715            "call_id hash routing must preserve shard affinity"
2716        );
2717        assert_eq!(first.shard_seq + 1, second.shard_seq);
2718    }
2719
2720    #[test]
2721    fn reactor_mesh_round_robin_event_distribution_is_deterministic() {
2722        let mut mesh = ReactorMesh::new(ReactorMeshConfig {
2723            shard_count: 3,
2724            lane_capacity: 64,
2725            topology: None,
2726        });
2727
2728        let mut routed = Vec::new();
2729        for idx in 0..6 {
2730            let envelope = mesh
2731                .enqueue_event(format!("evt-{idx}"), serde_json::json!({"i": idx}))
2732                .expect("enqueue event");
2733            routed.push(envelope.shard_id);
2734        }
2735
2736        assert_eq!(routed, vec![0, 1, 2, 0, 1, 2]);
2737    }
2738
2739    #[test]
2740    fn reactor_mesh_drain_global_order_preserves_monotone_seq() {
2741        let mut mesh = ReactorMesh::new(ReactorMeshConfig {
2742            shard_count: 4,
2743            lane_capacity: 64,
2744            topology: None,
2745        });
2746
2747        let mut expected = Vec::new();
2748        expected.push(
2749            mesh.enqueue_event("evt-1".to_string(), serde_json::json!({"v": 1}))
2750                .expect("event 1")
2751                .global_seq
2752                .value(),
2753        );
2754        expected.push(
2755            mesh.enqueue_hostcall_complete(
2756                "call-a".to_string(),
2757                HostcallOutcome::Success(serde_json::json!({"ok": true})),
2758            )
2759            .expect("call-a")
2760            .global_seq
2761            .value(),
2762        );
2763        expected.push(
2764            mesh.enqueue_event("evt-2".to_string(), serde_json::json!({"v": 2}))
2765                .expect("event 2")
2766                .global_seq
2767                .value(),
2768        );
2769        expected.push(
2770            mesh.enqueue_hostcall_complete(
2771                "call-b".to_string(),
2772                HostcallOutcome::Error {
2773                    code: "E_TEST".to_string(),
2774                    message: "boom".to_string(),
2775                },
2776            )
2777            .expect("call-b")
2778            .global_seq
2779            .value(),
2780        );
2781
2782        let drained = mesh.drain_global_order(16);
2783        let observed = drained
2784            .iter()
2785            .map(|entry| entry.global_seq.value())
2786            .collect::<Vec<_>>();
2787        assert_eq!(observed, expected);
2788    }
2789
2790    #[test]
2791    fn reactor_mesh_backpressure_tracks_rejected_enqueues() {
2792        let mut mesh = ReactorMesh::new(ReactorMeshConfig {
2793            shard_count: 1,
2794            lane_capacity: 2,
2795            topology: None,
2796        });
2797
2798        mesh.enqueue_event("evt-0".to_string(), serde_json::json!({}))
2799            .expect("enqueue evt-0");
2800        mesh.enqueue_event("evt-1".to_string(), serde_json::json!({}))
2801            .expect("enqueue evt-1");
2802
2803        let err = mesh
2804            .enqueue_event("evt-overflow".to_string(), serde_json::json!({}))
2805            .expect_err("third enqueue should overflow");
2806        assert_eq!(err.shard_id, 0);
2807        assert_eq!(err.capacity, 2);
2808        assert_eq!(err.depth, 2);
2809
2810        let telemetry = mesh.telemetry();
2811        assert_eq!(telemetry.rejected_enqueues, 1);
2812        assert_eq!(telemetry.max_queue_depths, vec![2]);
2813        assert_eq!(telemetry.queue_depths, vec![2]);
2814    }
2815
2816    #[test]
2817    fn reactor_placement_manifest_is_deterministic_across_runs() {
2818        let topology =
2819            ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0), (1, 0), (2, 1), (3, 1)]);
2820        let first = ReactorPlacementManifest::plan(8, Some(&topology));
2821        let second = ReactorPlacementManifest::plan(8, Some(&topology));
2822        assert_eq!(first, second);
2823        assert_eq!(first.fallback_reason, None);
2824    }
2825
2826    #[test]
2827    fn reactor_topology_snapshot_normalizes_unsorted_duplicate_pairs() {
2828        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[
2829            (7, 5),
2830            (2, 1),
2831            (4, 2),
2832            (2, 1),
2833            (1, 1),
2834            (4, 2),
2835        ]);
2836        let normalized = topology
2837            .cores
2838            .iter()
2839            .map(|core| (core.core_id, core.numa_node))
2840            .collect::<Vec<_>>();
2841        assert_eq!(normalized, vec![(1, 1), (2, 1), (4, 2), (7, 5)]);
2842    }
2843
2844    #[test]
2845    fn reactor_placement_manifest_non_contiguous_numa_ids_is_stable() {
2846        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[
2847            (11, 42),
2848            (7, 5),
2849            (9, 42),
2850            (3, 5),
2851            (11, 42),
2852            (3, 5),
2853        ]);
2854        let first = ReactorPlacementManifest::plan(8, Some(&topology));
2855        let second = ReactorPlacementManifest::plan(8, Some(&topology));
2856        assert_eq!(first, second);
2857        assert_eq!(first.numa_node_count, 2);
2858        assert_eq!(first.fallback_reason, None);
2859
2860        let observed_nodes = first
2861            .bindings
2862            .iter()
2863            .map(|binding| binding.numa_node)
2864            .collect::<Vec<_>>();
2865        assert_eq!(observed_nodes, vec![5, 42, 5, 42, 5, 42, 5, 42]);
2866
2867        let observed_cores = first
2868            .bindings
2869            .iter()
2870            .map(|binding| binding.core_id)
2871            .collect::<Vec<_>>();
2872        assert_eq!(observed_cores, vec![3, 9, 7, 11, 3, 9, 7, 11]);
2873    }
2874
2875    #[test]
2876    fn reactor_placement_manifest_spreads_across_numa_nodes_round_robin() {
2877        let topology =
2878            ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0), (1, 0), (4, 1), (5, 1)]);
2879        let manifest = ReactorPlacementManifest::plan(6, Some(&topology));
2880        let observed_nodes = manifest
2881            .bindings
2882            .iter()
2883            .map(|binding| binding.numa_node)
2884            .collect::<Vec<_>>();
2885        assert_eq!(observed_nodes, vec![0, 1, 0, 1, 0, 1]);
2886
2887        let observed_cores = manifest
2888            .bindings
2889            .iter()
2890            .map(|binding| binding.core_id)
2891            .collect::<Vec<_>>();
2892        assert_eq!(observed_cores, vec![0, 4, 1, 5, 0, 4]);
2893    }
2894
2895    #[test]
2896    fn reactor_placement_manifest_records_fallback_when_topology_missing() {
2897        let manifest = ReactorPlacementManifest::plan(3, None);
2898        assert_eq!(
2899            manifest.fallback_reason,
2900            Some(ReactorPlacementFallbackReason::TopologyUnavailable)
2901        );
2902        assert_eq!(manifest.numa_node_count, 1);
2903        assert_eq!(manifest.bindings.len(), 3);
2904        assert_eq!(manifest.bindings[0].core_id, 0);
2905        assert_eq!(manifest.bindings[2].core_id, 2);
2906    }
2907
2908    #[test]
2909    fn reactor_mesh_exposes_machine_readable_placement_manifest() {
2910        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(2, 0), (3, 0)]);
2911        let mesh = ReactorMesh::new(ReactorMeshConfig {
2912            shard_count: 3,
2913            lane_capacity: 8,
2914            topology: Some(topology),
2915        });
2916        let manifest = mesh.placement_manifest();
2917        let as_json = manifest.as_json();
2918        assert_eq!(as_json["shard_count"], serde_json::json!(3));
2919        assert_eq!(as_json["numa_node_count"], serde_json::json!(1));
2920        assert_eq!(
2921            as_json["fallback_reason"],
2922            serde_json::json!(Some("single_numa_node"))
2923        );
2924        assert_eq!(
2925            as_json["bindings"].as_array().map(std::vec::Vec::len),
2926            Some(3),
2927            "expected per-shard binding rows"
2928        );
2929    }
2930
2931    #[test]
2932    fn reactor_mesh_telemetry_includes_binding_and_fallback_metadata() {
2933        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(10, 0), (11, 0)]);
2934        let mut mesh = ReactorMesh::new(ReactorMeshConfig {
2935            shard_count: 2,
2936            lane_capacity: 4,
2937            topology: Some(topology),
2938        });
2939        mesh.enqueue_event("evt-0".to_string(), serde_json::json!({}))
2940            .expect("enqueue event");
2941
2942        let telemetry = mesh.telemetry();
2943        assert_eq!(
2944            telemetry.fallback_reason,
2945            Some(ReactorPlacementFallbackReason::SingleNumaNode)
2946        );
2947        assert_eq!(telemetry.shard_bindings.len(), 2);
2948        let telemetry_json = telemetry.as_json();
2949        assert_eq!(
2950            telemetry_json["fallback_reason"],
2951            serde_json::json!(Some("single_numa_node"))
2952        );
2953        assert_eq!(
2954            telemetry_json["shard_bindings"]
2955                .as_array()
2956                .map(std::vec::Vec::len),
2957            Some(2)
2958        );
2959    }
2960
2961    // ====================================================================
2962    // NUMA slab allocator tests
2963    // ====================================================================
2964
2965    #[test]
2966    fn numa_slab_alloc_dealloc_round_trip() {
2967        let mut slab = NumaSlab::new(0, 4);
2968        let handle = slab.allocate().expect("should allocate");
2969        assert_eq!(handle.node_id, 0);
2970        assert_eq!(handle.generation, 1);
2971        assert!(slab.deallocate(&handle));
2972        assert_eq!(slab.in_use(), 0);
2973    }
2974
2975    #[test]
2976    fn numa_slab_exhaustion_returns_none() {
2977        let mut slab = NumaSlab::new(0, 2);
2978        let _a = slab.allocate().expect("first alloc");
2979        let _b = slab.allocate().expect("second alloc");
2980        assert!(slab.allocate().is_none(), "slab should be exhausted");
2981    }
2982
2983    #[test]
2984    fn numa_slab_generation_prevents_stale_dealloc() {
2985        let mut slab = NumaSlab::new(0, 2);
2986        let handle_v1 = slab.allocate().expect("first alloc");
2987        assert!(slab.deallocate(&handle_v1));
2988        let _handle_v2 = slab.allocate().expect("reuse slot");
2989        // The old handle has generation 1, the new allocation has generation 2.
2990        assert!(
2991            !slab.deallocate(&handle_v1),
2992            "stale generation should reject dealloc"
2993        );
2994    }
2995
2996    #[test]
2997    fn numa_slab_double_free_is_rejected() {
2998        let mut slab = NumaSlab::new(0, 4);
2999        let handle = slab.allocate().expect("alloc");
3000        assert!(slab.deallocate(&handle));
3001        assert!(!slab.deallocate(&handle), "double free must be rejected");
3002    }
3003
3004    #[test]
3005    fn numa_slab_wrong_node_dealloc_rejected() {
3006        let mut slab = NumaSlab::new(0, 4);
3007        let handle = slab.allocate().expect("alloc");
3008        let wrong_handle = NumaSlabHandle {
3009            node_id: 99,
3010            ..handle
3011        };
3012        assert!(
3013            !slab.deallocate(&wrong_handle),
3014            "wrong node_id should reject dealloc"
3015        );
3016    }
3017
3018    #[test]
3019    fn numa_slab_high_water_mark_tracks_peak() {
3020        let mut slab = NumaSlab::new(0, 8);
3021        let a = slab.allocate().expect("a");
3022        let b = slab.allocate().expect("b");
3023        let c = slab.allocate().expect("c");
3024        assert_eq!(slab.high_water_mark, 3);
3025        slab.deallocate(&a);
3026        slab.deallocate(&b);
3027        assert_eq!(
3028            slab.high_water_mark, 3,
3029            "high water mark should not decrease"
3030        );
3031        slab.deallocate(&c);
3032        let _d = slab.allocate().expect("d");
3033        assert_eq!(slab.high_water_mark, 3);
3034    }
3035
3036    #[test]
3037    fn numa_slab_pool_routes_to_local_node() {
3038        let topology =
3039            ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0), (1, 0), (2, 1), (3, 1)]);
3040        let manifest = ReactorPlacementManifest::plan(4, Some(&topology));
3041        let config = NumaSlabConfig {
3042            slab_capacity: 8,
3043            entry_size_bytes: 256,
3044            hugepage: HugepageConfig {
3045                enabled: false,
3046                ..HugepageConfig::default()
3047            },
3048        };
3049        let mut pool = NumaSlabPool::from_manifest(&manifest, config);
3050        assert_eq!(pool.node_count(), 2);
3051
3052        let (handle, reason) = pool.allocate(1).expect("allocate on node 1");
3053        assert_eq!(handle.node_id, 1);
3054        assert!(reason.is_none(), "should be local allocation");
3055    }
3056
3057    #[test]
3058    fn numa_slab_pool_cross_node_fallback_tracks_telemetry() {
3059        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0), (2, 1)]);
3060        let manifest = ReactorPlacementManifest::plan(2, Some(&topology));
3061        let config = NumaSlabConfig {
3062            slab_capacity: 1, // Only 1 slot per node
3063            entry_size_bytes: 64,
3064            hugepage: HugepageConfig {
3065                enabled: false,
3066                ..HugepageConfig::default()
3067            },
3068        };
3069        let mut pool = NumaSlabPool::from_manifest(&manifest, config);
3070
3071        // Fill node 0's single slot.
3072        let (h0, _) = pool.allocate(0).expect("fill node 0");
3073        assert_eq!(h0.node_id, 0);
3074
3075        // Next alloc for node 0 should fall back to node 1.
3076        let (h1, reason) = pool.allocate(0).expect("fallback to node 1");
3077        assert_eq!(h1.node_id, 1);
3078        assert_eq!(reason, Some(CrossNodeReason::LocalExhausted));
3079
3080        let telemetry = pool.telemetry();
3081        assert_eq!(telemetry.cross_node_allocs, 1);
3082        let json = telemetry.as_json();
3083        assert_eq!(json["total_allocs"], serde_json::json!(2));
3084        assert_eq!(json["hugepage_backed_allocs"], serde_json::json!(0));
3085        assert_eq!(json["local_allocs"], serde_json::json!(1));
3086        assert_eq!(json["remote_allocs"], serde_json::json!(1));
3087        assert_eq!(
3088            json["allocation_ratio_bps"]["local"],
3089            serde_json::json!(5000)
3090        );
3091        assert_eq!(
3092            json["allocation_ratio_bps"]["remote"],
3093            serde_json::json!(5000)
3094        );
3095        assert_eq!(
3096            json["allocation_ratio_bps"]["scale"],
3097            serde_json::json!(10_000)
3098        );
3099        assert_eq!(json["hugepage_hit_rate_bps"]["value"], serde_json::json!(0));
3100        assert_eq!(
3101            json["latency_proxies_bps"]["tlb_miss_pressure"],
3102            serde_json::json!(5000)
3103        );
3104        assert_eq!(
3105            json["latency_proxies_bps"]["cache_miss_pressure"],
3106            serde_json::json!(10_000)
3107        );
3108        assert_eq!(
3109            json["latency_proxies_bps"]["occupancy_pressure"],
3110            serde_json::json!(10_000)
3111        );
3112        assert_eq!(
3113            json["pressure_bands"]["tlb_miss"],
3114            serde_json::json!("medium")
3115        );
3116        assert_eq!(
3117            json["pressure_bands"]["cache_miss"],
3118            serde_json::json!("high")
3119        );
3120        assert_eq!(
3121            json["pressure_bands"]["occupancy"],
3122            serde_json::json!("high")
3123        );
3124        assert_eq!(
3125            json["fallback_reasons"]["cross_node"],
3126            serde_json::json!("local_exhausted")
3127        );
3128        assert_eq!(
3129            json["fallback_reasons"]["hugepage"],
3130            serde_json::json!("hugepage_disabled")
3131        );
3132    }
3133
3134    #[test]
3135    fn numa_slab_pool_total_exhaustion_returns_none() {
3136        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0)]);
3137        let manifest = ReactorPlacementManifest::plan(1, Some(&topology));
3138        let config = NumaSlabConfig {
3139            slab_capacity: 1,
3140            entry_size_bytes: 64,
3141            hugepage: HugepageConfig {
3142                enabled: false,
3143                ..HugepageConfig::default()
3144            },
3145        };
3146        let mut pool = NumaSlabPool::from_manifest(&manifest, config);
3147        let _ = pool.allocate(0).expect("fill the only slot");
3148        assert!(pool.allocate(0).is_none(), "pool should be exhausted");
3149    }
3150
3151    #[test]
3152    fn numa_slab_pool_deallocate_round_trip() {
3153        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0), (1, 1)]);
3154        let manifest = ReactorPlacementManifest::plan(2, Some(&topology));
3155        let config = NumaSlabConfig::default();
3156        let mut pool = NumaSlabPool::from_manifest(&manifest, config);
3157
3158        let (handle, _) = pool.allocate(1).expect("alloc");
3159        assert!(pool.deallocate(&handle));
3160        assert!(!pool.deallocate(&handle), "double free must be rejected");
3161    }
3162
3163    #[test]
3164    fn hugepage_status_disabled_reports_fallback() {
3165        let config = HugepageConfig {
3166            enabled: false,
3167            ..HugepageConfig::default()
3168        };
3169        let status = HugepageStatus::evaluate(&config, 1024, 512);
3170        assert!(!status.active);
3171        assert_eq!(
3172            status.fallback_reason,
3173            Some(HugepageFallbackReason::Disabled)
3174        );
3175    }
3176
3177    #[test]
3178    fn hugepage_status_zero_totals_means_unavailable() {
3179        let config = HugepageConfig::default();
3180        let status = HugepageStatus::evaluate(&config, 0, 0);
3181        assert!(!status.active);
3182        assert_eq!(
3183            status.fallback_reason,
3184            Some(HugepageFallbackReason::DetectionUnavailable)
3185        );
3186    }
3187
3188    #[test]
3189    fn hugepage_status_zero_free_means_insufficient() {
3190        let config = HugepageConfig::default();
3191        let status = HugepageStatus::evaluate(&config, 1024, 0);
3192        assert!(!status.active);
3193        assert_eq!(
3194            status.fallback_reason,
3195            Some(HugepageFallbackReason::InsufficientHugepages)
3196        );
3197    }
3198
3199    #[test]
3200    fn hugepage_status_available_is_active() {
3201        let config = HugepageConfig::default();
3202        let status = HugepageStatus::evaluate(&config, 1024, 512);
3203        assert!(status.active);
3204        assert!(status.fallback_reason.is_none());
3205        assert_eq!(status.free_pages, 512);
3206    }
3207
3208    #[test]
3209    fn numa_slab_pool_tracks_hugepage_hit_rate_when_active() {
3210        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0)]);
3211        let manifest = ReactorPlacementManifest::plan(1, Some(&topology));
3212        let config = NumaSlabConfig {
3213            slab_capacity: 4,
3214            entry_size_bytes: 1024,
3215            hugepage: HugepageConfig {
3216                page_size_bytes: 4096,
3217                enabled: true,
3218            },
3219        };
3220        let mut pool = NumaSlabPool::from_manifest(&manifest, config);
3221        pool.set_hugepage_status(HugepageStatus {
3222            total_pages: 128,
3223            free_pages: 64,
3224            page_size_bytes: 4096,
3225            active: true,
3226            fallback_reason: None,
3227        });
3228
3229        let _ = pool.allocate(0).expect("first hugepage-backed alloc");
3230        let _ = pool.allocate(0).expect("second hugepage-backed alloc");
3231
3232        let telemetry = pool.telemetry();
3233        let json = telemetry.as_json();
3234        assert_eq!(json["total_allocs"], serde_json::json!(2));
3235        assert_eq!(json["hugepage_backed_allocs"], serde_json::json!(2));
3236        assert_eq!(
3237            json["hugepage_hit_rate_bps"]["value"],
3238            serde_json::json!(10_000)
3239        );
3240        assert_eq!(
3241            json["hugepage_hit_rate_bps"]["scale"],
3242            serde_json::json!(10_000)
3243        );
3244        assert_eq!(json["hugepage"]["active"], serde_json::json!(true));
3245    }
3246
3247    #[test]
3248    fn numa_slab_pool_misaligned_hugepage_config_reports_alignment_mismatch() {
3249        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0)]);
3250        let manifest = ReactorPlacementManifest::plan(1, Some(&topology));
3251        let config = NumaSlabConfig {
3252            slab_capacity: 3,
3253            entry_size_bytes: 1024,
3254            hugepage: HugepageConfig {
3255                page_size_bytes: 2048,
3256                enabled: true,
3257            },
3258        };
3259
3260        let pool = NumaSlabPool::from_manifest(&manifest, config);
3261        let telemetry = pool.telemetry();
3262        assert!(!telemetry.hugepage_status.active);
3263        assert_eq!(
3264            telemetry.hugepage_status.fallback_reason,
3265            Some(HugepageFallbackReason::AlignmentMismatch)
3266        );
3267
3268        let json = telemetry.as_json();
3269        assert_eq!(
3270            json["hugepage"]["fallback_reason"],
3271            serde_json::json!("alignment_mismatch")
3272        );
3273        assert_eq!(
3274            json["fallback_reasons"]["hugepage"],
3275            serde_json::json!("alignment_mismatch")
3276        );
3277    }
3278
3279    #[test]
3280    fn numa_slab_pool_aligned_hugepage_config_defaults_to_detection_unavailable() {
3281        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0)]);
3282        let manifest = ReactorPlacementManifest::plan(1, Some(&topology));
3283        let config = NumaSlabConfig {
3284            slab_capacity: 4,
3285            entry_size_bytes: 1024,
3286            hugepage: HugepageConfig {
3287                page_size_bytes: 4096,
3288                enabled: true,
3289            },
3290        };
3291
3292        let pool = NumaSlabPool::from_manifest(&manifest, config);
3293        let telemetry = pool.telemetry();
3294        assert!(!telemetry.hugepage_status.active);
3295        assert_eq!(
3296            telemetry.hugepage_status.fallback_reason,
3297            Some(HugepageFallbackReason::DetectionUnavailable)
3298        );
3299    }
3300
3301    #[test]
3302    fn misaligned_hugepage_config_rejects_external_status_override() {
3303        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0)]);
3304        let manifest = ReactorPlacementManifest::plan(1, Some(&topology));
3305        let config = NumaSlabConfig {
3306            slab_capacity: 3,
3307            entry_size_bytes: 1024,
3308            hugepage: HugepageConfig {
3309                page_size_bytes: 2048,
3310                enabled: true,
3311            },
3312        };
3313        let mut pool = NumaSlabPool::from_manifest(&manifest, config);
3314
3315        let forced = HugepageStatus::evaluate(&config.hugepage, 256, 64);
3316        assert!(forced.active);
3317        assert!(forced.fallback_reason.is_none());
3318
3319        pool.set_hugepage_status(forced);
3320        let telemetry = pool.telemetry();
3321        assert!(!telemetry.hugepage_status.active);
3322        assert_eq!(
3323            telemetry.hugepage_status.fallback_reason,
3324            Some(HugepageFallbackReason::AlignmentMismatch)
3325        );
3326    }
3327
3328    #[test]
3329    fn disabled_hugepage_config_rejects_external_active_status_override() {
3330        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0)]);
3331        let manifest = ReactorPlacementManifest::plan(1, Some(&topology));
3332        let config = NumaSlabConfig {
3333            slab_capacity: 4,
3334            entry_size_bytes: 1024,
3335            hugepage: HugepageConfig {
3336                page_size_bytes: 4096,
3337                enabled: false,
3338            },
3339        };
3340        let mut pool = NumaSlabPool::from_manifest(&manifest, config);
3341
3342        let forced = HugepageStatus {
3343            total_pages: 512,
3344            free_pages: 256,
3345            page_size_bytes: 4096,
3346            active: true,
3347            fallback_reason: None,
3348        };
3349        pool.set_hugepage_status(forced);
3350
3351        let telemetry = pool.telemetry();
3352        assert!(!telemetry.hugepage_status.active);
3353        assert_eq!(
3354            telemetry.hugepage_status.fallback_reason,
3355            Some(HugepageFallbackReason::Disabled)
3356        );
3357        assert_eq!(telemetry.hugepage_status.total_pages, 512);
3358        assert_eq!(telemetry.hugepage_status.free_pages, 256);
3359
3360        let json = telemetry.as_json();
3361        assert_eq!(
3362            json["hugepage"]["fallback_reason"],
3363            serde_json::json!("hugepage_disabled")
3364        );
3365    }
3366
3367    #[test]
3368    fn disabled_hugepage_config_uses_disabled_reason_even_if_slab_is_misaligned() {
3369        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0)]);
3370        let manifest = ReactorPlacementManifest::plan(1, Some(&topology));
3371        let config = NumaSlabConfig {
3372            slab_capacity: 3,
3373            entry_size_bytes: 1024,
3374            hugepage: HugepageConfig {
3375                page_size_bytes: 2048,
3376                enabled: false,
3377            },
3378        };
3379
3380        let pool = NumaSlabPool::from_manifest(&manifest, config);
3381        let telemetry = pool.telemetry();
3382        assert!(!telemetry.hugepage_status.active);
3383        assert_eq!(
3384            telemetry.hugepage_status.fallback_reason,
3385            Some(HugepageFallbackReason::Disabled)
3386        );
3387    }
3388
3389    #[test]
3390    fn hugepage_alignment_rejects_zero_page_size_and_fails_closed() {
3391        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0)]);
3392        let manifest = ReactorPlacementManifest::plan(1, Some(&topology));
3393        let config = NumaSlabConfig {
3394            slab_capacity: 4,
3395            entry_size_bytes: 1024,
3396            hugepage: HugepageConfig {
3397                page_size_bytes: 0,
3398                enabled: true,
3399            },
3400        };
3401        assert!(!config.hugepage_alignment_ok());
3402
3403        let pool = NumaSlabPool::from_manifest(&manifest, config);
3404        let telemetry = pool.telemetry();
3405        assert!(!telemetry.hugepage_status.active);
3406        assert_eq!(
3407            telemetry.hugepage_status.fallback_reason,
3408            Some(HugepageFallbackReason::AlignmentMismatch)
3409        );
3410    }
3411
3412    #[test]
3413    fn hugepage_alignment_rejects_zero_footprint_and_fails_closed() {
3414        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0)]);
3415        let manifest = ReactorPlacementManifest::plan(1, Some(&topology));
3416        let config = NumaSlabConfig {
3417            slab_capacity: 0,
3418            entry_size_bytes: 1024,
3419            hugepage: HugepageConfig {
3420                page_size_bytes: 2048,
3421                enabled: true,
3422            },
3423        };
3424        assert_eq!(config.slab_footprint_bytes(), Some(0));
3425        assert!(!config.hugepage_alignment_ok());
3426
3427        let pool = NumaSlabPool::from_manifest(&manifest, config);
3428        let telemetry = pool.telemetry();
3429        assert!(!telemetry.hugepage_status.active);
3430        assert_eq!(
3431            telemetry.hugepage_status.fallback_reason,
3432            Some(HugepageFallbackReason::AlignmentMismatch)
3433        );
3434    }
3435
3436    #[test]
3437    fn hugepage_alignment_rejects_checked_mul_overflow_without_panicking() {
3438        let config = NumaSlabConfig {
3439            slab_capacity: usize::MAX,
3440            entry_size_bytes: 2,
3441            hugepage: HugepageConfig {
3442                page_size_bytes: 4096,
3443                enabled: true,
3444            },
3445        };
3446        assert!(config.slab_footprint_bytes().is_none());
3447        assert!(!config.hugepage_alignment_ok());
3448
3449        let status = config.alignment_mismatch_status();
3450        assert!(!status.active);
3451        assert_eq!(
3452            status.fallback_reason,
3453            Some(HugepageFallbackReason::AlignmentMismatch)
3454        );
3455    }
3456
    #[test]
    fn zero_page_size_config_rejects_external_status_override() {
        // A page size of zero makes alignment undefined, so even an
        // externally supplied "active" status must be rejected by the pool.
        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0)]);
        let manifest = ReactorPlacementManifest::plan(1, Some(&topology));
        let config = NumaSlabConfig {
            slab_capacity: 4,
            entry_size_bytes: 1024,
            hugepage: HugepageConfig {
                page_size_bytes: 0,
                enabled: true,
            },
        };
        let mut pool = NumaSlabPool::from_manifest(&manifest, config);

        // Forged status claims hugepages are active despite the invalid config.
        let forced = HugepageStatus {
            total_pages: 128,
            free_pages: 64,
            page_size_bytes: 0,
            active: true,
            fallback_reason: None,
        };
        pool.set_hugepage_status(forced);

        // The pool downgrades the override to an alignment-mismatch fallback.
        let telemetry = pool.telemetry();
        assert!(!telemetry.hugepage_status.active);
        assert_eq!(
            telemetry.hugepage_status.fallback_reason,
            Some(HugepageFallbackReason::AlignmentMismatch)
        );
    }
3487
    #[test]
    fn zero_footprint_config_rejects_external_status_override() {
        // slab_capacity of 0 gives a footprint of exactly 0 bytes; a
        // zero-sized slab can never be hugepage-backed, so an external
        // "active" status must be rejected and its counters zeroed.
        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0)]);
        let manifest = ReactorPlacementManifest::plan(1, Some(&topology));
        let config = NumaSlabConfig {
            slab_capacity: 0,
            entry_size_bytes: 1024,
            hugepage: HugepageConfig {
                page_size_bytes: 2048,
                enabled: true,
            },
        };
        assert_eq!(config.slab_footprint_bytes(), Some(0));

        let mut pool = NumaSlabPool::from_manifest(&manifest, config);
        // Forged status claiming active hugepage backing.
        let forced = HugepageStatus {
            total_pages: 128,
            free_pages: 64,
            page_size_bytes: 2048,
            active: true,
            fallback_reason: None,
        };
        pool.set_hugepage_status(forced);

        // Override is rejected: inactive, alignment-mismatch reason, and the
        // page counters are reset to zero rather than echoing the forgery.
        let telemetry = pool.telemetry();
        assert!(!telemetry.hugepage_status.active);
        assert_eq!(
            telemetry.hugepage_status.fallback_reason,
            Some(HugepageFallbackReason::AlignmentMismatch)
        );
        assert_eq!(telemetry.hugepage_status.total_pages, 0);
        assert_eq!(telemetry.hugepage_status.free_pages, 0);
    }
3521
    #[test]
    fn checked_mul_overflow_config_rejects_external_status_override() {
        // 2 * usize::MAX overflows, so the footprint is unrepresentable;
        // an external "active" status must be rejected just like the
        // zero-page-size and zero-footprint cases above.
        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0)]);
        let manifest = ReactorPlacementManifest::plan(1, Some(&topology));
        let config = NumaSlabConfig {
            slab_capacity: 2,
            entry_size_bytes: usize::MAX,
            hugepage: HugepageConfig {
                page_size_bytes: 4096,
                enabled: true,
            },
        };
        assert!(config.slab_footprint_bytes().is_none());

        let mut pool = NumaSlabPool::from_manifest(&manifest, config);
        // Forged status claiming active hugepage backing.
        let forced = HugepageStatus {
            total_pages: 512,
            free_pages: 256,
            page_size_bytes: 4096,
            active: true,
            fallback_reason: None,
        };
        pool.set_hugepage_status(forced);

        // Override rejected: inactive, mismatch reason, counters zeroed.
        let telemetry = pool.telemetry();
        assert!(!telemetry.hugepage_status.active);
        assert_eq!(
            telemetry.hugepage_status.fallback_reason,
            Some(HugepageFallbackReason::AlignmentMismatch)
        );
        assert_eq!(telemetry.hugepage_status.total_pages, 0);
        assert_eq!(telemetry.hugepage_status.free_pages, 0);
    }
3555
3556    #[test]
3557    fn hugepage_status_json_is_stable() {
3558        let config = HugepageConfig::default();
3559        let status = HugepageStatus::evaluate(&config, 1024, 128);
3560        let json = status.as_json();
3561        assert_eq!(json["total_pages"], serde_json::json!(1024));
3562        assert_eq!(json["free_pages"], serde_json::json!(128));
3563        assert_eq!(json["active"], serde_json::json!(true));
3564        assert!(json["fallback_reason"].is_null());
3565    }
3566
    #[test]
    fn numa_slab_telemetry_json_has_expected_shape() {
        // Two NUMA nodes (cores 0/1 → node 0, cores 4/5 → node 1), hugepages
        // disabled, three allocations on local nodes. Pins the full telemetry
        // JSON shape: counters, basis-point ratios, bands, fallback reasons.
        let topology =
            ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0), (1, 0), (4, 1), (5, 1)]);
        let manifest = ReactorPlacementManifest::plan(4, Some(&topology));
        let config = NumaSlabConfig {
            slab_capacity: 16,
            entry_size_bytes: 128,
            hugepage: HugepageConfig {
                enabled: false,
                ..HugepageConfig::default()
            },
        };
        let mut pool = NumaSlabPool::from_manifest(&manifest, config);
        let _ = pool.allocate(0);
        let _ = pool.allocate(1);
        let _ = pool.allocate(0);

        let telemetry = pool.telemetry();
        let json = telemetry.as_json();
        assert_eq!(json["node_count"], serde_json::json!(2));
        assert_eq!(json["total_allocs"], serde_json::json!(3));
        assert_eq!(json["total_in_use"], serde_json::json!(3));
        assert_eq!(json["cross_node_allocs"], serde_json::json!(0));
        assert_eq!(json["hugepage_backed_allocs"], serde_json::json!(0));
        assert_eq!(json["local_allocs"], serde_json::json!(3));
        assert_eq!(json["remote_allocs"], serde_json::json!(0));
        // All three allocations were local → 100% = 10_000 bps.
        assert_eq!(
            json["allocation_ratio_bps"]["local"],
            serde_json::json!(10_000)
        );
        assert_eq!(json["allocation_ratio_bps"]["remote"], serde_json::json!(0));
        assert_eq!(
            json["allocation_ratio_bps"]["scale"],
            serde_json::json!(10_000)
        );
        assert_eq!(json["hugepage_hit_rate_bps"]["value"], serde_json::json!(0));
        assert_eq!(
            json["latency_proxies_bps"]["tlb_miss_pressure"],
            serde_json::json!(0)
        );
        // 937 bps — presumably 3 in-use / 32 total slots (2 nodes × 16)
        // ≈ 9.37%; TODO confirm against the pressure-proxy formula.
        assert_eq!(
            json["latency_proxies_bps"]["cache_miss_pressure"],
            serde_json::json!(937)
        );
        assert_eq!(
            json["latency_proxies_bps"]["occupancy_pressure"],
            serde_json::json!(937)
        );
        assert_eq!(
            json["latency_proxies_bps"]["scale"],
            serde_json::json!(10_000)
        );
        assert_eq!(json["pressure_bands"]["tlb_miss"], serde_json::json!("low"));
        assert_eq!(
            json["pressure_bands"]["cache_miss"],
            serde_json::json!("low")
        );
        assert_eq!(
            json["pressure_bands"]["occupancy"],
            serde_json::json!("low")
        );
        // No cross-node fallback occurred; hugepage fallback reflects config.
        assert_eq!(
            json["fallback_reasons"]["cross_node"],
            serde_json::Value::Null
        );
        assert_eq!(
            json["fallback_reasons"]["hugepage"],
            serde_json::json!("hugepage_disabled")
        );
        assert_eq!(json["config"]["slab_capacity"], serde_json::json!(16));
        assert_eq!(json["per_node"].as_array().map(std::vec::Vec::len), Some(2));
    }
3640
3641    #[test]
3642    fn thread_affinity_advice_matches_placement_manifest() {
3643        let topology =
3644            ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0), (1, 0), (4, 1), (5, 1)]);
3645        let manifest = ReactorPlacementManifest::plan(4, Some(&topology));
3646        let advice = manifest.affinity_advice(AffinityEnforcement::Advisory);
3647        assert_eq!(advice.len(), 4);
3648        assert_eq!(advice[0].shard_id, 0);
3649        assert_eq!(advice[0].recommended_core, 0);
3650        assert_eq!(advice[0].recommended_numa_node, 0);
3651        assert_eq!(advice[0].enforcement, AffinityEnforcement::Advisory);
3652        assert_eq!(advice[1].recommended_numa_node, 1);
3653        assert_eq!(advice[3].recommended_numa_node, 1);
3654    }
3655
3656    #[test]
3657    fn thread_affinity_advice_json_is_stable() {
3658        let advice = ThreadAffinityAdvice {
3659            shard_id: 0,
3660            recommended_core: 3,
3661            recommended_numa_node: 1,
3662            enforcement: AffinityEnforcement::Strict,
3663        };
3664        let json = advice.as_json();
3665        assert_eq!(json["shard_id"], serde_json::json!(0));
3666        assert_eq!(json["recommended_core"], serde_json::json!(3));
3667        assert_eq!(json["enforcement"], serde_json::json!("strict"));
3668    }
3669
3670    #[test]
3671    fn reactor_mesh_preferred_numa_node_uses_manifest() {
3672        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0), (4, 1), (8, 2)]);
3673        let mesh = ReactorMesh::new(ReactorMeshConfig {
3674            shard_count: 3,
3675            lane_capacity: 8,
3676            topology: Some(topology),
3677        });
3678        assert_eq!(mesh.preferred_numa_node(0), 0);
3679        assert_eq!(mesh.preferred_numa_node(1), 1);
3680        assert_eq!(mesh.preferred_numa_node(2), 2);
3681        assert_eq!(mesh.preferred_numa_node(99), 0); // fallback
3682    }
3683
3684    #[test]
3685    fn reactor_mesh_affinity_advice_covers_all_shards() {
3686        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0), (1, 1)]);
3687        let mesh = ReactorMesh::new(ReactorMeshConfig {
3688            shard_count: 2,
3689            lane_capacity: 8,
3690            topology: Some(topology),
3691        });
3692        let advice = mesh.affinity_advice(AffinityEnforcement::Disabled);
3693        assert_eq!(advice.len(), 2);
3694        assert_eq!(advice[0].enforcement, AffinityEnforcement::Disabled);
3695        assert_eq!(advice[1].enforcement, AffinityEnforcement::Disabled);
3696    }
3697
3698    #[test]
3699    fn numa_slab_pool_from_manifest_with_no_topology_creates_single_node() {
3700        let manifest = ReactorPlacementManifest::plan(4, None);
3701        let pool = NumaSlabPool::from_manifest(&manifest, NumaSlabConfig::default());
3702        assert_eq!(pool.node_count(), 1);
3703    }
3704
3705    #[test]
3706    fn numa_node_for_shard_returns_none_for_unknown() {
3707        let manifest = ReactorPlacementManifest::plan(2, None);
3708        assert!(manifest.numa_node_for_shard(0).is_some());
3709        assert!(manifest.numa_node_for_shard(99).is_none());
3710    }
3711
3712    #[test]
3713    fn numa_slab_capacity_clamp_to_at_least_one() {
3714        let slab = NumaSlab::new(0, 0);
3715        assert_eq!(slab.capacity, 1);
3716    }
3717
3718    #[test]
3719    fn cross_node_reason_code_matches() {
3720        assert_eq!(CrossNodeReason::LocalExhausted.as_code(), "local_exhausted");
3721    }
3722
3723    #[test]
3724    fn affinity_enforcement_code_coverage() {
3725        assert_eq!(AffinityEnforcement::Advisory.as_code(), "advisory");
3726        assert_eq!(AffinityEnforcement::Strict.as_code(), "strict");
3727        assert_eq!(AffinityEnforcement::Disabled.as_code(), "disabled");
3728    }
3729
3730    // ── Additional coverage for untested public APIs ──
3731
    #[test]
    fn enqueue_hostcall_completions_batch_preserves_order() {
        // Batch-enqueued completions must drain in submission order,
        // matching invariant I5 (all observable scheduling ordered by seq).
        let mut sched = Scheduler::with_clock(DeterministicClock::new(0));
        let completions = vec![
            (
                "c-1".to_string(),
                HostcallOutcome::Success(serde_json::json!(1)),
            ),
            (
                "c-2".to_string(),
                HostcallOutcome::Success(serde_json::json!(2)),
            ),
            (
                "c-3".to_string(),
                HostcallOutcome::Success(serde_json::json!(3)),
            ),
        ];
        sched.enqueue_hostcall_completions(completions);
        assert_eq!(sched.macrotask_count(), 3);

        // Verify FIFO order: c-1, c-2, c-3
        for expected in ["c-1", "c-2", "c-3"] {
            let task = sched.tick().expect("should have macrotask");
            match task.kind {
                MacrotaskKind::HostcallComplete { ref call_id, .. } => {
                    assert_eq!(call_id, expected);
                }
                _ => panic!("expected HostcallComplete"),
            }
        }
        // Queue is fully drained after three ticks (invariant I1: one
        // macrotask per tick).
        assert!(sched.tick().is_none());
    }
3764
    #[test]
    fn time_until_next_timer_positive_case() {
        // Remaining time to the earliest deadline shrinks as the clock moves.
        let mut sched = Scheduler::with_clock(DeterministicClock::new(100));
        sched.set_timeout(50); // deadline = 150
        assert_eq!(sched.time_until_next_timer(), Some(50));

        sched.clock.advance(20); // now = 120, remaining = 30
        assert_eq!(sched.time_until_next_timer(), Some(30));
    }
3774
    #[test]
    fn deterministic_clock_set_overrides_current_time() {
        // `advance` is relative; `set` is an absolute override, and later
        // advances accumulate on top of the newly set base.
        let clock = DeterministicClock::new(0);
        assert_eq!(clock.now_ms(), 0);
        clock.advance(50);
        assert_eq!(clock.now_ms(), 50);
        clock.set(1000);
        assert_eq!(clock.now_ms(), 1000);
        clock.advance(5);
        assert_eq!(clock.now_ms(), 1005);
    }
3786
    #[test]
    fn reactor_mesh_queue_depth_per_shard() {
        // queue_depth reports per-shard occupancy; None marks an invalid
        // shard id rather than panicking.
        let config = ReactorMeshConfig {
            shard_count: 4,
            lane_capacity: 64,
            topology: None,
        };
        let mut mesh = ReactorMesh::new(config);

        // All shards start empty
        for shard in 0..4 {
            assert_eq!(mesh.queue_depth(shard), Some(0));
        }
        // Out of range returns None
        assert_eq!(mesh.queue_depth(99), None);

        // Enqueue events via round-robin (hits shards 0, 1, 2, 3 in order)
        for i in 0..4 {
            mesh.enqueue_event(format!("evt-{i}"), serde_json::json!(null))
                .expect("enqueue should succeed");
        }
        // Each shard should have exactly 1
        for shard in 0..4 {
            assert_eq!(mesh.queue_depth(shard), Some(1), "shard {shard} depth");
        }
    }
3813
3814    #[test]
3815    fn reactor_mesh_shard_count_and_total_depth() {
3816        let config = ReactorMeshConfig {
3817            shard_count: 3,
3818            lane_capacity: 16,
3819            topology: None,
3820        };
3821        let mut mesh = ReactorMesh::new(config);
3822        assert_eq!(mesh.shard_count(), 3);
3823        assert_eq!(mesh.total_depth(), 0);
3824        assert!(!mesh.has_pending());
3825
3826        mesh.enqueue_event("e1".to_string(), serde_json::json!(null))
3827            .unwrap();
3828        mesh.enqueue_event("e2".to_string(), serde_json::json!(null))
3829            .unwrap();
3830        assert_eq!(mesh.total_depth(), 2);
3831        assert!(mesh.has_pending());
3832    }
3833
3834    #[test]
3835    fn reactor_mesh_drain_shard_out_of_range_returns_empty() {
3836        let config = ReactorMeshConfig {
3837            shard_count: 2,
3838            lane_capacity: 16,
3839            topology: None,
3840        };
3841        let mut mesh = ReactorMesh::new(config);
3842        mesh.enqueue_event("e1".to_string(), serde_json::json!(null))
3843            .unwrap();
3844        let drained = mesh.drain_shard(99, 10);
3845        assert!(drained.is_empty());
3846    }
3847
3848    #[test]
3849    fn reactor_placement_manifest_zero_shards() {
3850        let manifest = ReactorPlacementManifest::plan(0, None);
3851        assert_eq!(manifest.shard_count, 0);
3852        assert!(manifest.bindings.is_empty());
3853        assert!(manifest.fallback_reason.is_none());
3854    }
3855
3856    #[test]
3857    fn reactor_placement_manifest_as_json_has_expected_fields() {
3858        let topology =
3859            ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0), (1, 0), (2, 1), (3, 1)]);
3860        let manifest = ReactorPlacementManifest::plan(4, Some(&topology));
3861        let json = manifest.as_json();
3862
3863        assert_eq!(json["shard_count"], 4);
3864        assert_eq!(json["numa_node_count"], 2);
3865        assert!(json["fallback_reason"].is_null());
3866        let bindings = json["bindings"].as_array().expect("bindings array");
3867        assert_eq!(bindings.len(), 4);
3868        for binding in bindings {
3869            assert!(binding.get("shard_id").is_some());
3870            assert!(binding.get("core_id").is_some());
3871            assert!(binding.get("numa_node").is_some());
3872        }
3873    }
3874
3875    #[test]
3876    fn reactor_placement_manifest_single_node_fallback() {
3877        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0), (1, 0)]);
3878        let manifest = ReactorPlacementManifest::plan(2, Some(&topology));
3879        assert_eq!(
3880            manifest.fallback_reason,
3881            Some(ReactorPlacementFallbackReason::SingleNumaNode)
3882        );
3883    }
3884
3885    #[test]
3886    fn reactor_placement_manifest_empty_topology_fallback() {
3887        let topology = ReactorTopologySnapshot { cores: vec![] };
3888        let manifest = ReactorPlacementManifest::plan(2, Some(&topology));
3889        assert_eq!(
3890            manifest.fallback_reason,
3891            Some(ReactorPlacementFallbackReason::TopologyEmpty)
3892        );
3893    }
3894
3895    #[test]
3896    fn reactor_placement_fallback_reason_as_code_all_variants() {
3897        assert_eq!(
3898            ReactorPlacementFallbackReason::TopologyUnavailable.as_code(),
3899            "topology_unavailable"
3900        );
3901        assert_eq!(
3902            ReactorPlacementFallbackReason::TopologyEmpty.as_code(),
3903            "topology_empty"
3904        );
3905        assert_eq!(
3906            ReactorPlacementFallbackReason::SingleNumaNode.as_code(),
3907            "single_numa_node"
3908        );
3909    }
3910
3911    #[test]
3912    fn hugepage_fallback_reason_as_code_all_variants() {
3913        assert_eq!(
3914            HugepageFallbackReason::Disabled.as_code(),
3915            "hugepage_disabled"
3916        );
3917        assert_eq!(
3918            HugepageFallbackReason::DetectionUnavailable.as_code(),
3919            "detection_unavailable"
3920        );
3921        assert_eq!(
3922            HugepageFallbackReason::InsufficientHugepages.as_code(),
3923            "insufficient_hugepages"
3924        );
3925        assert_eq!(
3926            HugepageFallbackReason::AlignmentMismatch.as_code(),
3927            "alignment_mismatch"
3928        );
3929    }
3930
    #[test]
    fn numa_slab_pool_set_hugepage_status_and_node_count() {
        // With a valid 2 MiB hugepage config, an externally evaluated active
        // status is accepted verbatim (contrast with the override-rejection
        // tests for degenerate configs above).
        let manifest = ReactorPlacementManifest::plan(4, None);
        let config = NumaSlabConfig {
            slab_capacity: 4096,
            entry_size_bytes: 512,
            hugepage: HugepageConfig {
                page_size_bytes: 2 * 1024 * 1024,
                enabled: true,
            },
        };
        let mut pool = NumaSlabPool::from_manifest(&manifest, config);
        assert_eq!(pool.node_count(), 1);

        // NOTE(review): `config` is read again here after being passed by
        // value above — NumaSlabConfig is presumably Copy; confirm.
        let status = HugepageStatus::evaluate(&config.hugepage, 512, 256);
        assert!(status.active);
        pool.set_hugepage_status(status);

        let telem = pool.telemetry();
        assert!(telem.hugepage_status.active);
        assert_eq!(telem.hugepage_status.free_pages, 256);
    }
3953
3954    #[test]
3955    fn numa_slab_pool_multi_node_node_count() {
3956        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0), (1, 1), (2, 2)]);
3957        let manifest = ReactorPlacementManifest::plan(3, Some(&topology));
3958        let pool = NumaSlabPool::from_manifest(&manifest, NumaSlabConfig::default());
3959        assert_eq!(pool.node_count(), 3);
3960    }
3961
3962    #[test]
3963    fn reactor_mesh_telemetry_as_json_has_expected_shape() {
3964        let config = ReactorMeshConfig {
3965            shard_count: 2,
3966            lane_capacity: 8,
3967            topology: None,
3968        };
3969        let mesh = ReactorMesh::new(config);
3970        let telem = mesh.telemetry();
3971        let json = telem.as_json();
3972
3973        let depths = json["queue_depths"].as_array().expect("queue_depths");
3974        assert_eq!(depths.len(), 2);
3975        assert_eq!(json["rejected_enqueues"], 0);
3976        let bindings = json["shard_bindings"].as_array().expect("shard_bindings");
3977        assert_eq!(bindings.len(), 2);
3978        assert!(json.get("fallback_reason").is_some());
3979    }
3980
    #[test]
    fn numa_slab_in_use_and_has_capacity() {
        // Slab occupancy lifecycle: fill to capacity (3), observe exhaustion,
        // then free handles and watch capacity come back.
        let mut slab = NumaSlab::new(0, 3);
        assert_eq!(slab.in_use(), 0);
        assert!(slab.has_capacity());

        let h1 = slab.allocate().expect("alloc 1");
        assert_eq!(slab.in_use(), 1);
        assert!(slab.has_capacity());

        let h2 = slab.allocate().expect("alloc 2");
        let _h3 = slab.allocate().expect("alloc 3");
        assert_eq!(slab.in_use(), 3);
        assert!(!slab.has_capacity());
        // A full slab must refuse further allocations with None, not panic.
        assert!(slab.allocate().is_none());

        slab.deallocate(&h1);
        assert_eq!(slab.in_use(), 2);
        assert!(slab.has_capacity());

        slab.deallocate(&h2);
        assert_eq!(slab.in_use(), 1);
    }
4004
4005    #[test]
4006    fn scheduler_macrotask_count_tracks_queue_size() {
4007        let mut sched = Scheduler::with_clock(DeterministicClock::new(0));
4008        assert_eq!(sched.macrotask_count(), 0);
4009
4010        sched.enqueue_event("e1".to_string(), serde_json::json!(null));
4011        sched.enqueue_event("e2".to_string(), serde_json::json!(null));
4012        assert_eq!(sched.macrotask_count(), 2);
4013
4014        sched.tick();
4015        assert_eq!(sched.macrotask_count(), 1);
4016
4017        sched.tick();
4018        assert_eq!(sched.macrotask_count(), 0);
4019    }
4020
4021    #[test]
4022    fn scheduler_timer_count_reflects_pending_timers() {
4023        let mut sched = Scheduler::with_clock(DeterministicClock::new(0));
4024        assert_eq!(sched.timer_count(), 0);
4025
4026        sched.set_timeout(100);
4027        sched.set_timeout(200);
4028        assert_eq!(sched.timer_count(), 2);
4029
4030        // Advance past first timer and tick to move it
4031        sched.clock.advance(150);
4032        sched.tick();
4033        assert_eq!(sched.timer_count(), 1);
4034    }
4035
    #[test]
    fn scheduler_current_seq_advances_with_operations() {
        // Each scheduling operation consumes at least one seq value, so
        // current_seq strictly increases across timer and event enqueues
        // (invariant I5: total order by seq).
        let mut sched = Scheduler::with_clock(DeterministicClock::new(0));
        let initial = sched.current_seq();
        assert_eq!(initial.value(), 0);

        sched.set_timeout(100); // uses one seq
        assert!(sched.current_seq().value() > initial.value());

        let after_timer = sched.current_seq();
        sched.enqueue_event("evt".to_string(), serde_json::json!(null)); // uses another seq
        assert!(sched.current_seq().value() > after_timer.value());
    }
4049
4050    #[test]
4051    fn thread_affinity_advice_as_json_structure() {
4052        let advice = ThreadAffinityAdvice {
4053            shard_id: 2,
4054            recommended_core: 5,
4055            recommended_numa_node: 1,
4056            enforcement: AffinityEnforcement::Strict,
4057        };
4058        let json = advice.as_json();
4059        assert_eq!(json["shard_id"], 2);
4060        assert_eq!(json["recommended_core"], 5);
4061        assert_eq!(json["recommended_numa_node"], 1);
4062        assert_eq!(json["enforcement"], "strict");
4063    }
4064
4065    // ── Property tests ──
4066
    // Property-based tests for the scheduler's ordering and routing
    // invariants (seq monotonicity, timer min-heap ordering, stable hashing,
    // shard routing bounds, drain ordering, and cancel idempotence).
    mod proptest_scheduler {
        use super::*;
        use proptest::prelude::*;

        proptest! {
            #[test]
            fn seq_next_is_monotonic(start in 0..u64::MAX - 100) {
                let s = Seq(start);
                let n = s.next();
                assert!(n >= s, "Seq::next must be monotonically non-decreasing");
                // NOTE(review): the strategy range excludes u64::MAX, so the
                // `start == u64::MAX` disjunct below is unreachable here; it
                // mirrors the saturating_add contract rather than these inputs.
                assert!(
                    n.value() == start + 1 || start == u64::MAX,
                    "Seq::next must increment by 1 unless saturated"
                );
            }

            // Near the u64 ceiling, next() must saturate instead of wrapping.
            #[test]
            fn seq_next_saturates(start in u64::MAX - 5..=u64::MAX) {
                let s = Seq(start);
                let n = s.next();
                // Verify next() does not panic and produces a valid value.
                let _ = n.value();
                assert!(n >= s, "must be monotonic even at saturation boundary");
            }

            // TimerEntry's Ord is deliberately reversed so that BinaryHeap
            // (a max-heap) pops the earliest (deadline, seq) first.
            #[test]
            fn timer_entry_ordering_consistent_with_min_heap(
                id_a in 0..1000u64,
                id_b in 0..1000u64,
                deadline_a in 0..10000u64,
                deadline_b in 0..10000u64,
                seq_a in 0..1000u64,
                seq_b in 0..1000u64,
            ) {
                let ta = TimerEntry::new(id_a, deadline_a, Seq(seq_a));
                let tb = TimerEntry::new(id_b, deadline_b, Seq(seq_b));
                // Reversed ordering: earlier deadline = GREATER (for BinaryHeap min-heap)
                if deadline_a < deadline_b {
                    assert!(ta > tb, "earlier deadline must sort greater (min-heap)");
                } else if deadline_a > deadline_b {
                    assert!(ta < tb, "later deadline must sort less (min-heap)");
                } else if seq_a < seq_b {
                    assert!(ta > tb, "same deadline, earlier seq must sort greater");
                } else if seq_a > seq_b {
                    assert!(ta < tb, "same deadline, later seq must sort less");
                } else {
                    assert!(ta == tb, "same deadline+seq must be equal");
                }
            }

            #[test]
            fn stable_hash_is_deterministic(input in "[a-z0-9_.-]{1,64}") {
                let h1 = ReactorMesh::stable_hash(&input);
                let h2 = ReactorMesh::stable_hash(&input);
                assert!(h1 == h2, "stable_hash must be deterministic");
            }

            // Hash routing must always land inside the configured shard range.
            #[test]
            fn hash_route_returns_valid_shard(
                shard_count in 1..32usize,
                call_id in "[a-z0-9]{1,20}",
            ) {
                let config = ReactorMeshConfig {
                    shard_count,
                    lane_capacity: 16,
                    topology: None,
                };
                let mesh = ReactorMesh::new(config);
                let shard = mesh.hash_route(&call_id);
                assert!(
                    shard < mesh.shard_count(),
                    "hash_route returned {shard} >= shard_count {}",
                    mesh.shard_count(),
                );
            }

            // Round-robin routing stays in range over repeated calls.
            #[test]
            fn rr_route_returns_valid_shard(
                shard_count in 1..32usize,
                iterations in 1..100usize,
            ) {
                let config = ReactorMeshConfig {
                    shard_count,
                    lane_capacity: 16,
                    topology: None,
                };
                let mut mesh = ReactorMesh::new(config);
                for _ in 0..iterations {
                    let shard = mesh.rr_route();
                    assert!(
                        shard < mesh.shard_count(),
                        "rr_route returned {shard} >= shard_count {}",
                        mesh.shard_count(),
                    );
                }
            }

            // Global drain must emit entries in strictly ascending global_seq,
            // regardless of which shards accepted the enqueues.
            #[test]
            fn drain_global_order_is_sorted(
                shard_count in 1..8usize,
                lane_capacity in 2..16usize,
                enqueues in 1..30usize,
            ) {
                let config = ReactorMeshConfig {
                    shard_count,
                    lane_capacity,
                    topology: None,
                };
                let mut mesh = ReactorMesh::new(config);
                let mut success_count = 0usize;
                for i in 0..enqueues {
                    let call_id = format!("call_{i}");
                    let outcome = HostcallOutcome::Success(serde_json::Value::Null);
                    if mesh.enqueue_hostcall_complete(call_id, outcome).is_ok() {
                        success_count += 1;
                    }
                }
                let drained = mesh.drain_global_order(success_count);
                // Verify ascending global_seq
                for pair in drained.windows(2) {
                    assert!(
                        pair[0].global_seq < pair[1].global_seq,
                        "drain_global_order must emit ascending seq: {:?} vs {:?}",
                        pair[0].global_seq,
                        pair[1].global_seq,
                    );
                }
            }

            // Lane capacity bounds total mesh depth even under over-enqueue.
            #[test]
            fn mesh_total_depth_bounded_by_capacity(
                shard_count in 1..8usize,
                lane_capacity in 1..16usize,
                enqueues in 0..100usize,
            ) {
                let config = ReactorMeshConfig {
                    shard_count,
                    lane_capacity,
                    topology: None,
                };
                let mut mesh = ReactorMesh::new(config);
                for i in 0..enqueues {
                    let call_id = format!("call_{i}");
                    let outcome = HostcallOutcome::Success(serde_json::Value::Null);
                    let _ = mesh.enqueue_hostcall_complete(call_id, outcome);
                }
                let max_total = shard_count * lane_capacity;
                assert!(
                    mesh.total_depth() <= max_total,
                    "total_depth {} exceeds max possible {}",
                    mesh.total_depth(),
                    max_total,
                );
            }

            // clear_timeout succeeds exactly once per timer id.
            #[test]
            fn scheduler_timer_cancel_idempotent(
                timer_count in 1..10usize,
                cancel_idx in 0..10usize,
            ) {
                let clock = DeterministicClock::new(0);
                let mut sched = Scheduler::with_clock(clock);
                let mut timer_ids = Vec::new();
                for i in 0..timer_count {
                    timer_ids.push(sched.set_timeout(u64::try_from(i + 1).unwrap() * 100));
                }
                if cancel_idx < timer_ids.len() {
                    let tid = timer_ids[cancel_idx];
                    let first = sched.clear_timeout(tid);
                    let second = sched.clear_timeout(tid);
                    assert!(first, "first cancel should succeed");
                    assert!(!second, "second cancel should return false");
                }
            }
        }
    }
4243}