Skip to main content

pi/
scheduler.rs

1//! Deterministic event loop scheduler for PiJS runtime.
2//!
3//! Implements the spec from EXTENSIONS.md §1A.4.5:
4//! - Queue model: microtasks (handled by JS engine), macrotasks, timers
5//! - Timer heap with stable ordering guarantees
6//! - Hostcall completion enqueue with stable tie-breaking
7//! - Single-threaded scheduler loop reproducible under fixed inputs
8//!
9//! # Invariants
10//!
11//! - **I1 (single macrotask):** at most one macrotask executes per tick
12//! - **I2 (microtask fixpoint):** after any macrotask, microtasks drain to empty
13//! - **I3 (stable timers):** timers with equal deadlines fire in increasing seq order
14//! - **I4 (no reentrancy):** hostcall completions enqueue macrotasks, never re-enter
15//! - **I5 (total order):** all observable scheduling is ordered by seq
16
17use std::cmp::Ordering;
18use std::collections::BTreeMap;
19use std::collections::BinaryHeap;
20use std::collections::VecDeque;
21use std::fmt;
22use std::sync::Arc;
23
/// Monotonically increasing sequence counter for deterministic ordering.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Seq(u64);

impl Seq {
    /// Create the initial sequence value.
    #[must_use]
    pub const fn zero() -> Self {
        Self(0)
    }

    /// Get the next sequence value, incrementing the counter.
    ///
    /// Saturates at `u64::MAX` instead of wrapping so the ordering never
    /// resets, even under counter exhaustion.
    #[must_use]
    pub const fn next(self) -> Self {
        // Equivalent to `saturating_add(1)`, spelled out for clarity.
        match self.0.checked_add(1) {
            Some(raw) => Self(raw),
            None => Self(u64::MAX),
        }
    }

    /// Get the raw value.
    #[must_use]
    pub const fn value(self) -> u64 {
        self.0
    }
}

impl fmt::Display for Seq {
    /// Renders as `seq:<n>` for structured log fields.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "seq:{}", self.0)
    }
}
53
/// A timer entry in the timer heap.
#[derive(Debug, Clone)]
pub struct TimerEntry {
    /// Timer ID for cancellation.
    pub timer_id: u64,
    /// Absolute deadline in milliseconds.
    pub deadline_ms: u64,
    /// Sequence number for stable ordering.
    pub seq: Seq,
}

impl TimerEntry {
    /// Create a new timer entry.
    ///
    /// `seq` is issued by the scheduler and is unique per entry, so entries
    /// with equal deadlines still have a total order (invariant I3).
    #[must_use]
    pub const fn new(timer_id: u64, deadline_ms: u64, seq: Seq) -> Self {
        Self {
            timer_id,
            deadline_ms,
            seq,
        }
    }
}
76
77// Order by (deadline_ms, seq) ascending - min-heap needs reversed comparison.
78impl PartialEq for TimerEntry {
79    fn eq(&self, other: &Self) -> bool {
80        self.deadline_ms == other.deadline_ms && self.seq == other.seq
81    }
82}
83
84impl Eq for TimerEntry {}
85
86impl PartialOrd for TimerEntry {
87    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
88        Some(self.cmp(other))
89    }
90}
91
92impl Ord for TimerEntry {
93    fn cmp(&self, other: &Self) -> Ordering {
94        // Reverse ordering for min-heap: smaller deadline/seq = higher priority
95        match other.deadline_ms.cmp(&self.deadline_ms) {
96            Ordering::Equal => other.seq.cmp(&self.seq),
97            ord => ord,
98        }
99    }
100}
101
/// Type of macrotask in the queue.
#[derive(Debug, Clone)]
pub enum MacrotaskKind {
    /// A timer fired.
    TimerFired {
        /// ID of the timer that reached its deadline.
        timer_id: u64,
    },
    /// A hostcall completed.
    HostcallComplete {
        /// Correlates the completion with the originating call.
        call_id: String,
        /// Success, error, or stream-chunk payload.
        outcome: HostcallOutcome,
    },
    /// An inbound event from the host.
    InboundEvent {
        /// Host-assigned event identifier.
        event_id: String,
        /// Arbitrary JSON payload delivered with the event.
        payload: serde_json::Value,
    },
}
118
/// Outcome of a hostcall.
#[derive(Debug, Clone)]
pub enum HostcallOutcome {
    /// Successful result.
    Success(serde_json::Value),
    /// Error result.
    Error {
        /// Machine-readable error code.
        code: String,
        /// Human-readable error description.
        message: String,
    },
    /// Incremental stream chunk.
    StreamChunk {
        /// Monotonically increasing sequence number per call.
        sequence: u64,
        /// Arbitrary JSON payload for this chunk.
        chunk: serde_json::Value,
        /// `true` on the final chunk.
        is_final: bool,
    },
}
136
/// A macrotask in the queue.
///
/// `seq` is assigned at enqueue time, so queue order and seq order coincide
/// (invariant I5).
#[derive(Debug, Clone)]
pub struct Macrotask {
    /// Sequence number for deterministic ordering.
    pub seq: Seq,
    /// The task kind and payload.
    pub kind: MacrotaskKind,
}
145
146impl Macrotask {
147    /// Create a new macrotask.
148    #[must_use]
149    pub const fn new(seq: Seq, kind: MacrotaskKind) -> Self {
150        Self { seq, kind }
151    }
152}
153
154// Order by seq ascending.
155impl PartialEq for Macrotask {
156    fn eq(&self, other: &Self) -> bool {
157        self.seq == other.seq
158    }
159}
160
161impl Eq for Macrotask {}
162
163impl PartialOrd for Macrotask {
164    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
165        Some(self.cmp(other))
166    }
167}
168
169impl Ord for Macrotask {
170    fn cmp(&self, other: &Self) -> Ordering {
171        // VecDeque FIFO order - no reordering needed, but we use seq for verification
172        self.seq.cmp(&other.seq)
173    }
174}
175
/// A monotonic clock source for the scheduler.
///
/// `Send + Sync` so a single clock instance can be shared (e.g. a test
/// clock advanced from outside the scheduler via `Arc`).
pub trait Clock: Send + Sync {
    /// Get the current time in milliseconds since epoch.
    fn now_ms(&self) -> u64;
}
181
182impl<C: Clock> Clock for Arc<C> {
183    fn now_ms(&self) -> u64 {
184        self.as_ref().now_ms()
185    }
186}
187
/// Real wall clock implementation.
///
/// Zero-sized; the default clock for production schedulers.
#[derive(Debug, Clone, Copy, Default)]
pub struct WallClock;
191
192impl Clock for WallClock {
193    fn now_ms(&self) -> u64 {
194        use std::time::{SystemTime, UNIX_EPOCH};
195        let millis = SystemTime::now()
196            .duration_since(UNIX_EPOCH)
197            .unwrap_or_default()
198            .as_millis();
199        u64::try_from(millis).unwrap_or(u64::MAX)
200    }
201}
202
/// A deterministic clock for testing.
#[derive(Debug)]
pub struct DeterministicClock {
    // Atomic so tests can advance time through a shared `&self` reference.
    current_ms: std::sync::atomic::AtomicU64,
}

impl DeterministicClock {
    /// Create a new deterministic clock starting at the given time.
    #[must_use]
    pub const fn new(start_ms: u64) -> Self {
        Self {
            current_ms: std::sync::atomic::AtomicU64::new(start_ms),
        }
    }

    /// Advance the clock by the given duration.
    pub fn advance(&self, ms: u64) {
        use std::sync::atomic::Ordering;
        self.current_ms.fetch_add(ms, Ordering::SeqCst);
    }

    /// Set the clock to a specific time.
    pub fn set(&self, ms: u64) {
        use std::sync::atomic::Ordering;
        self.current_ms.store(ms, Ordering::SeqCst);
    }
}
230
231impl Clock for DeterministicClock {
232    fn now_ms(&self) -> u64 {
233        self.current_ms.load(std::sync::atomic::Ordering::SeqCst)
234    }
235}
236
/// The deterministic event loop scheduler state.
///
/// Single-threaded by construction: all mutation goes through `&mut self`,
/// so determinism follows from the seq counter and the clock alone.
pub struct Scheduler<C: Clock = WallClock> {
    /// Monotone sequence counter.
    seq: Seq,
    /// Macrotask queue (FIFO, ordered by seq).
    macrotask_queue: VecDeque<Macrotask>,
    /// Timer heap (min-heap by deadline_ms, seq).
    timer_heap: BinaryHeap<TimerEntry>,
    /// Next timer ID.
    next_timer_id: u64,
    /// Cancelled timer IDs. Cancellation is lazy: entries stay in the heap
    /// until their deadline and are skipped when popped.
    cancelled_timers: std::collections::HashSet<u64>,
    /// All timer IDs currently in the heap (active or cancelled).
    heap_timer_ids: std::collections::HashSet<u64>,
    /// Clock source.
    clock: C,
}
254
impl Scheduler<WallClock> {
    /// Create a new scheduler with the default wall clock.
    #[must_use]
    pub fn new() -> Self {
        Self::with_clock(WallClock)
    }
}

/// `Default` mirrors [`Scheduler::new`] so the scheduler works with
/// `..Default::default()` struct updates and derived defaults.
impl Default for Scheduler<WallClock> {
    fn default() -> Self {
        Self::new()
    }
}
268
impl<C: Clock> Scheduler<C> {
    /// Create a new scheduler with a custom clock.
    #[must_use]
    pub fn with_clock(clock: C) -> Self {
        Self {
            seq: Seq::zero(),
            macrotask_queue: VecDeque::new(),
            timer_heap: BinaryHeap::new(),
            // Timer IDs start at 1; see `allocate_timer_id` for reuse rules.
            next_timer_id: 1,
            cancelled_timers: std::collections::HashSet::new(),
            heap_timer_ids: std::collections::HashSet::new(),
            clock,
        }
    }

    /// Get the current sequence number.
    #[must_use]
    pub const fn current_seq(&self) -> Seq {
        self.seq
    }

    /// Get the next sequence number and increment the counter.
    ///
    /// Returns the pre-increment value, so issued seqs start at zero and are
    /// strictly increasing (invariant I5).
    const fn next_seq(&mut self) -> Seq {
        let current = self.seq;
        self.seq = self.seq.next();
        current
    }

    /// Get the current time from the clock.
    #[must_use]
    pub fn now_ms(&self) -> u64 {
        self.clock.now_ms()
    }

    /// Check if there are pending tasks.
    ///
    /// Cancelled-but-unexpired timers do not count as pending.
    #[must_use]
    pub fn has_pending(&self) -> bool {
        !self.macrotask_queue.is_empty()
            || self
                .timer_heap
                .iter()
                .any(|entry| !self.cancelled_timers.contains(&entry.timer_id))
    }

    /// Get the number of pending macrotasks.
    #[must_use]
    pub fn macrotask_count(&self) -> usize {
        self.macrotask_queue.len()
    }

    /// Get the number of pending timers.
    ///
    /// O(n) over the heap: cancelled entries remain in the heap until their
    /// deadline, so they must be filtered out here.
    #[must_use]
    pub fn timer_count(&self) -> usize {
        self.timer_heap
            .iter()
            .filter(|entry| !self.cancelled_timers.contains(&entry.timer_id))
            .count()
    }

    /// Schedule a timer to fire at the given deadline.
    ///
    /// Returns the timer ID for cancellation.
    pub fn set_timeout(&mut self, delay_ms: u64) -> u64 {
        let timer_id = self.allocate_timer_id();
        // Saturating add: a huge delay clamps to u64::MAX instead of wrapping.
        let deadline_ms = self.clock.now_ms().saturating_add(delay_ms);
        let seq = self.next_seq();

        self.timer_heap
            .push(TimerEntry::new(timer_id, deadline_ms, seq));
        self.heap_timer_ids.insert(timer_id);

        tracing::trace!(
            event = "scheduler.timer.set",
            timer_id,
            delay_ms,
            deadline_ms,
            %seq,
            "Timer scheduled"
        );

        timer_id
    }

    /// Whether a timer ID is currently present in the heap
    /// (active or cancelled-but-unexpired).
    fn timer_id_in_use(&self, timer_id: u64) -> bool {
        self.heap_timer_ids.contains(&timer_id)
    }

    /// Allocate the next free timer ID, wrapping around and skipping 0.
    ///
    /// Searches forward from `next_timer_id`; if every ID in the namespace
    /// is in use (practically unreachable), logs and reuses `u64::MAX`.
    fn allocate_timer_id(&mut self) -> u64 {
        let start = self.next_timer_id;
        let mut candidate = start;

        loop {
            // Calculate the next ID to try after this one
            self.next_timer_id = if candidate == u64::MAX {
                1
            } else {
                candidate + 1
            };

            if !self.timer_id_in_use(candidate) {
                return candidate;
            }

            candidate = self.next_timer_id;

            // If we've looped all the way around back to where we started, we're exhausted.
            if candidate == start {
                break;
            }
        }

        tracing::error!(
            event = "scheduler.timer_id.exhausted",
            "Timer ID namespace exhausted; falling back to u64::MAX reuse"
        );
        u64::MAX
    }

    /// Cancel a timer by ID.
    ///
    /// Returns true if the timer was found and cancelled.
    ///
    /// Cancellation is lazy: the heap entry stays in place and is skipped by
    /// `move_due_timers` once its deadline passes.
    pub fn clear_timeout(&mut self, timer_id: u64) -> bool {
        let pending =
            self.heap_timer_ids.contains(&timer_id) && !self.cancelled_timers.contains(&timer_id);

        let cancelled = if pending {
            self.cancelled_timers.insert(timer_id)
        } else {
            false
        };

        // Emitted for no-op cancellations too; `cancelled` distinguishes them.
        tracing::trace!(
            event = "scheduler.timer.cancel",
            timer_id,
            cancelled,
            "Timer cancelled"
        );

        cancelled
    }

    /// Enqueue a hostcall completion.
    ///
    /// Completions become macrotasks rather than re-entering the engine
    /// directly (invariant I4).
    pub fn enqueue_hostcall_complete(&mut self, call_id: String, outcome: HostcallOutcome) {
        let seq = self.next_seq();
        tracing::trace!(
            event = "scheduler.hostcall.enqueue",
            call_id = %call_id,
            %seq,
            "Hostcall completion enqueued"
        );
        let task = Macrotask::new(seq, MacrotaskKind::HostcallComplete { call_id, outcome });
        self.macrotask_queue.push_back(task);
    }

    /// Enqueue multiple hostcall completions in one scheduler mutation pass.
    ///
    /// Iterator order is preserved; each completion gets its own seq, which
    /// provides the stable tie-breaking for simultaneous completions.
    pub fn enqueue_hostcall_completions<I>(&mut self, completions: I)
    where
        I: IntoIterator<Item = (String, HostcallOutcome)>,
    {
        for (call_id, outcome) in completions {
            self.enqueue_hostcall_complete(call_id, outcome);
        }
    }

    /// Convenience: enqueue a stream chunk for a hostcall.
    pub fn enqueue_stream_chunk(
        &mut self,
        call_id: String,
        sequence: u64,
        chunk: serde_json::Value,
        is_final: bool,
    ) {
        self.enqueue_hostcall_complete(
            call_id,
            HostcallOutcome::StreamChunk {
                sequence,
                chunk,
                is_final,
            },
        );
    }

    /// Enqueue an inbound event from the host.
    pub fn enqueue_event(&mut self, event_id: String, payload: serde_json::Value) {
        let seq = self.next_seq();
        tracing::trace!(
            event = "scheduler.event.enqueue",
            event_id = %event_id,
            %seq,
            "Inbound event enqueued"
        );
        let task = Macrotask::new(seq, MacrotaskKind::InboundEvent { event_id, payload });
        self.macrotask_queue.push_back(task);
    }

    /// Move due timers from the timer heap to the macrotask queue.
    ///
    /// This is step 2 of the tick() algorithm.
    ///
    /// The heap pops entries in (deadline_ms, seq) ascending order, so equal
    /// deadlines fire in increasing seq order (invariant I3). A deadline
    /// exactly equal to `now` is considered due (`>` comparison below).
    fn move_due_timers(&mut self) {
        let now = self.clock.now_ms();

        while let Some(entry) = self.timer_heap.peek() {
            if entry.deadline_ms > now {
                break;
            }

            let entry = self.timer_heap.pop().expect("peeked");
            self.heap_timer_ids.remove(&entry.timer_id);

            // Skip cancelled timers
            if self.cancelled_timers.remove(&entry.timer_id) {
                tracing::trace!(
                    event = "scheduler.timer.skip_cancelled",
                    timer_id = entry.timer_id,
                    "Skipped cancelled timer"
                );
                continue;
            }

            // Preserve (deadline, timer-seq) order while assigning a fresh
            // macrotask seq so queue ordering remains globally monotone.
            let task_seq = self.next_seq();
            let task = Macrotask::new(
                task_seq,
                MacrotaskKind::TimerFired {
                    timer_id: entry.timer_id,
                },
            );
            self.macrotask_queue.push_back(task);

            tracing::trace!(
                event = "scheduler.timer.fire",
                timer_id = entry.timer_id,
                deadline_ms = entry.deadline_ms,
                now_ms = now,
                timer_seq = %entry.seq,
                macrotask_seq = %task_seq,
                "Timer fired"
            );
        }
    }

    /// Execute one tick of the event loop.
    ///
    /// Algorithm (from spec):
    /// 1. Ingest host completions (done externally via enqueue methods)
    /// 2. Move due timers to macrotask queue
    /// 3. Run one macrotask (if any)
    /// 4. Drain microtasks (done externally by JS engine)
    ///
    /// Returns the macrotask that was executed, if any. At most one task is
    /// dequeued per call (invariant I1); the caller dispatches it and then
    /// drains microtasks to a fixpoint (invariant I2).
    pub fn tick(&mut self) -> Option<Macrotask> {
        // Step 2: Move due timers
        self.move_due_timers();

        // Step 3: Run one macrotask
        let task = self.macrotask_queue.pop_front();

        if let Some(ref task) = task {
            tracing::debug!(
                event = "scheduler.tick.execute",
                seq = %task.seq,
                kind = ?std::mem::discriminant(&task.kind),
                "Executing macrotask"
            );
        } else {
            tracing::trace!(event = "scheduler.tick.idle", "No macrotask to execute");
        }

        task
    }

    /// Get the deadline of the next timer, if any.
    ///
    /// O(n) over the heap because cancelled entries must be excluded.
    #[must_use]
    pub fn next_timer_deadline(&self) -> Option<u64> {
        self.timer_heap
            .iter()
            .filter(|entry| !self.cancelled_timers.contains(&entry.timer_id))
            .map(|entry| entry.deadline_ms)
            .min()
    }

    /// Get the time until the next timer fires, if any.
    ///
    /// Saturates to 0 for already-due timers.
    #[must_use]
    pub fn time_until_next_timer(&self) -> Option<u64> {
        self.next_timer_deadline()
            .map(|deadline| deadline.saturating_sub(self.clock.now_ms()))
    }
}
558
559// ============================================================================
560// Core-pinned reactor mesh (URPC-style SPSC lanes)
561// ============================================================================
562
/// Configuration for [`ReactorMesh`].
///
/// A `shard_count` or `lane_capacity` of zero yields an empty, fail-closed
/// mesh (see `ReactorMesh::new`).
#[derive(Debug, Clone)]
pub struct ReactorMeshConfig {
    /// Number of core-pinned shards (one SPSC lane per shard).
    pub shard_count: usize,
    /// Maximum queued envelopes per shard lane.
    pub lane_capacity: usize,
    /// Optional topology snapshot for deterministic shard placement planning.
    pub topology: Option<ReactorTopologySnapshot>,
}
573
574impl Default for ReactorMeshConfig {
575    fn default() -> Self {
576        Self {
577            shard_count: 4,
578            lane_capacity: 1024,
579            topology: None,
580        }
581    }
582}
583
/// Core descriptor used by topology-aware shard placement.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct ReactorTopologyCore {
    pub core_id: usize,
    pub numa_node: usize,
}

/// Lightweight machine-provided topology snapshot.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct ReactorTopologySnapshot {
    pub cores: Vec<ReactorTopologyCore>,
}

impl ReactorTopologySnapshot {
    /// Build a normalized topology snapshot from `(core_id, numa_node)` pairs.
    ///
    /// The result is sorted and deduplicated so equal inputs, in any order,
    /// produce identical snapshots.
    #[must_use]
    pub fn from_core_node_pairs(pairs: &[(usize, usize)]) -> Self {
        let mut cores: Vec<ReactorTopologyCore> = pairs
            .iter()
            .map(|&(core_id, numa_node)| ReactorTopologyCore { core_id, numa_node })
            .collect();
        cores.sort_unstable();
        cores.dedup();
        Self { cores }
    }
}
613
/// Explicit fallback reason emitted by topology planner.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReactorPlacementFallbackReason {
    /// No topology snapshot was available at planning time.
    TopologyUnavailable,
    /// A topology snapshot was provided but had no usable cores.
    TopologyEmpty,
    /// Topology is available but only one NUMA node exists.
    SingleNumaNode,
}

impl ReactorPlacementFallbackReason {
    /// Stable machine-readable code for manifests and telemetry.
    #[must_use]
    pub const fn as_code(self) -> &'static str {
        match self {
            ReactorPlacementFallbackReason::TopologyUnavailable => "topology_unavailable",
            ReactorPlacementFallbackReason::TopologyEmpty => "topology_empty",
            ReactorPlacementFallbackReason::SingleNumaNode => "single_numa_node",
        }
    }
}
635
/// Deterministic shard binding produced by placement planner.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ReactorShardBinding {
    /// Shard index within the mesh.
    pub shard_id: usize,
    /// Core the shard is planned onto.
    pub core_id: usize,
    /// NUMA node owning that core.
    pub numa_node: usize,
}
643
/// Machine-readable shard placement manifest.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReactorPlacementManifest {
    /// Number of shards this plan covers.
    pub shard_count: usize,
    /// Number of NUMA nodes used by the plan.
    pub numa_node_count: usize,
    /// One binding per shard, indexed by shard id.
    pub bindings: Vec<ReactorShardBinding>,
    /// Set when the planner had to fall back to an identity layout
    /// (or when only a single NUMA node exists).
    pub fallback_reason: Option<ReactorPlacementFallbackReason>,
}
652
653impl ReactorPlacementManifest {
654    /// Plan deterministic shard placement from optional topology.
655    #[must_use]
656    pub fn plan(shard_count: usize, topology: Option<&ReactorTopologySnapshot>) -> Self {
657        if shard_count == 0 {
658            return Self {
659                shard_count: 0,
660                numa_node_count: 0,
661                bindings: Vec::new(),
662                fallback_reason: None,
663            };
664        }
665
666        let Some(topology) = topology else {
667            let bindings = (0..shard_count)
668                .map(|shard_id| ReactorShardBinding {
669                    shard_id,
670                    core_id: shard_id,
671                    numa_node: 0,
672                })
673                .collect::<Vec<_>>();
674            return Self {
675                shard_count,
676                numa_node_count: 1,
677                bindings,
678                fallback_reason: Some(ReactorPlacementFallbackReason::TopologyUnavailable),
679            };
680        };
681
682        if topology.cores.is_empty() {
683            let bindings = (0..shard_count)
684                .map(|shard_id| ReactorShardBinding {
685                    shard_id,
686                    core_id: shard_id,
687                    numa_node: 0,
688                })
689                .collect::<Vec<_>>();
690            return Self {
691                shard_count,
692                numa_node_count: 1,
693                bindings,
694                fallback_reason: Some(ReactorPlacementFallbackReason::TopologyEmpty),
695            };
696        }
697
698        let mut by_node = BTreeMap::<usize, Vec<usize>>::new();
699        for core in &topology.cores {
700            by_node
701                .entry(core.numa_node)
702                .or_default()
703                .push(core.core_id);
704        }
705        for cores in by_node.values_mut() {
706            cores.sort_unstable();
707            cores.dedup();
708        }
709        let nodes = by_node
710            .into_iter()
711            .filter(|(_, cores)| !cores.is_empty())
712            .collect::<Vec<_>>();
713
714        if nodes.is_empty() {
715            let bindings = (0..shard_count)
716                .map(|shard_id| ReactorShardBinding {
717                    shard_id,
718                    core_id: shard_id,
719                    numa_node: 0,
720                })
721                .collect::<Vec<_>>();
722            return Self {
723                shard_count,
724                numa_node_count: 1,
725                bindings,
726                fallback_reason: Some(ReactorPlacementFallbackReason::TopologyEmpty),
727            };
728        }
729
730        let node_count = nodes.len();
731        let fallback_reason = if node_count == 1 {
732            Some(ReactorPlacementFallbackReason::SingleNumaNode)
733        } else {
734            None
735        };
736
737        let mut bindings = Vec::with_capacity(shard_count);
738        for shard_id in 0..shard_count {
739            let node_idx = shard_id % node_count;
740            let (numa_node, cores) = &nodes[node_idx];
741            let core_idx = (shard_id / node_count) % cores.len();
742            bindings.push(ReactorShardBinding {
743                shard_id,
744                core_id: cores[core_idx],
745                numa_node: *numa_node,
746            });
747        }
748
749        Self {
750            shard_count,
751            numa_node_count: node_count,
752            bindings,
753            fallback_reason,
754        }
755    }
756
757    /// Render placement manifest as stable machine-readable JSON.
758    #[must_use]
759    pub fn as_json(&self) -> serde_json::Value {
760        serde_json::json!({
761            "shard_count": self.shard_count,
762            "numa_node_count": self.numa_node_count,
763            "fallback_reason": self.fallback_reason.map(ReactorPlacementFallbackReason::as_code),
764            "bindings": self.bindings.iter().map(|binding| {
765                serde_json::json!({
766                    "shard_id": binding.shard_id,
767                    "core_id": binding.core_id,
768                    "numa_node": binding.numa_node
769                })
770            }).collect::<Vec<_>>()
771        })
772    }
773}
774
/// Per-envelope metadata produced by [`ReactorMesh`].
#[derive(Debug, Clone)]
pub struct ReactorEnvelope {
    /// Global sequence for deterministic cross-shard ordering.
    pub global_seq: Seq,
    /// Monotone sequence scoped to the destination shard.
    pub shard_seq: u64,
    /// Destination shard that owns this envelope.
    pub shard_id: usize,
    /// Payload to execute on the shard.
    pub task: MacrotaskKind,
}

impl ReactorEnvelope {
    /// Internal constructor: both sequence values are issued by the mesh,
    /// never supplied by callers.
    const fn new(global_seq: Seq, shard_seq: u64, shard_id: usize, task: MacrotaskKind) -> Self {
        Self {
            global_seq,
            shard_seq,
            shard_id,
            task,
        }
    }
}
798
/// Backpressure signal for rejected mesh enqueue operations.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ReactorBackpressure {
    /// Shard whose lane rejected the enqueue.
    pub shard_id: usize,
    /// Lane depth observed at rejection time.
    pub depth: usize,
    /// Configured lane capacity.
    pub capacity: usize,
}
806
/// Lightweight telemetry snapshot for mesh queueing behavior.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReactorMeshTelemetry {
    /// Current depth of each shard lane, indexed by shard id.
    pub queue_depths: Vec<usize>,
    /// High-water mark of each shard lane, indexed by shard id.
    pub max_queue_depths: Vec<usize>,
    /// Total enqueues rejected since mesh construction.
    pub rejected_enqueues: u64,
    /// Placement bindings copied from the mesh's manifest.
    pub shard_bindings: Vec<ReactorShardBinding>,
    /// Placement fallback reason, if any.
    pub fallback_reason: Option<ReactorPlacementFallbackReason>,
}
816
impl ReactorMeshTelemetry {
    /// Render telemetry as machine-readable JSON for diagnostics.
    ///
    /// Key names are stable; `fallback_reason` serializes to `null` when no
    /// fallback occurred.
    #[must_use]
    pub fn as_json(&self) -> serde_json::Value {
        serde_json::json!({
            "queue_depths": self.queue_depths,
            "max_queue_depths": self.max_queue_depths,
            "rejected_enqueues": self.rejected_enqueues,
            "fallback_reason": self.fallback_reason.map(ReactorPlacementFallbackReason::as_code),
            "shard_bindings": self.shard_bindings.iter().map(|binding| {
                serde_json::json!({
                    "shard_id": binding.shard_id,
                    "core_id": binding.core_id,
                    "numa_node": binding.numa_node,
                })
            }).collect::<Vec<_>>()
        })
    }
}
836
/// Deterministic SPSC-style lane.
///
/// This models the semantics of a bounded SPSC ring without unsafe code.
#[derive(Debug, Clone)]
struct SpscLane<T> {
    /// Hard bound on queued entries; pushes beyond this are rejected.
    capacity: usize,
    /// FIFO backing store.
    queue: VecDeque<T>,
    /// High-water mark of `queue.len()`, for telemetry.
    max_depth: usize,
}

impl<T> SpscLane<T> {
    /// Create an empty lane with a fixed capacity.
    fn new(capacity: usize) -> Self {
        SpscLane {
            capacity,
            queue: VecDeque::with_capacity(capacity),
            max_depth: 0,
        }
    }

    /// Current queue depth.
    fn len(&self) -> usize {
        self.queue.len()
    }

    /// Whether the lane holds no entries.
    fn is_empty(&self) -> bool {
        self.queue.is_empty()
    }

    /// Append an entry, or return the current depth if the lane is full.
    fn push(&mut self, item: T) -> Result<(), usize> {
        let depth = self.queue.len();
        if depth >= self.capacity {
            return Err(depth);
        }
        self.queue.push_back(item);
        self.max_depth = self.max_depth.max(self.queue.len());
        Ok(())
    }

    /// Remove and return the oldest entry, if any.
    fn pop(&mut self) -> Option<T> {
        self.queue.pop_front()
    }

    /// Peek at the oldest entry without removing it.
    fn front(&self) -> Option<&T> {
        self.queue.front()
    }
}
881
/// Deterministic multi-shard reactor mesh using bounded per-shard SPSC lanes.
///
/// Routing policy:
/// - Hostcall completions are hash-routed by `call_id` for shard affinity.
/// - Inbound events use deterministic round-robin distribution across shards.
///
/// Drain policy:
/// - `drain_global_order()` emits envelopes in ascending global sequence
///   across all shard heads, preserving deterministic external ordering.
#[derive(Debug, Clone)]
pub struct ReactorMesh {
    /// Global envelope sequence counter (shared by every shard).
    seq: Seq,
    /// One bounded FIFO lane per shard.
    lanes: Vec<SpscLane<ReactorEnvelope>>,
    /// Per-shard monotone sequence counters, indexed by shard id.
    shard_seq: Vec<u64>,
    /// Round-robin cursor for inbound-event distribution.
    rr_cursor: usize,
    /// Count of enqueues rejected (backpressure or invalid shard id).
    rejected_enqueues: u64,
    /// Deterministic placement plan computed at construction.
    placement_manifest: ReactorPlacementManifest,
}
900
901impl ReactorMesh {
902    /// Create a mesh using the provided config.
903    ///
904    /// The mesh is fail-closed for invalid config values:
905    /// `shard_count == 0` or `lane_capacity == 0` returns an empty mesh.
906    #[must_use]
907    #[allow(clippy::needless_pass_by_value)]
908    pub fn new(config: ReactorMeshConfig) -> Self {
909        if config.shard_count == 0 || config.lane_capacity == 0 {
910            return Self {
911                seq: Seq::zero(),
912                lanes: Vec::new(),
913                shard_seq: Vec::new(),
914                rr_cursor: 0,
915                rejected_enqueues: 0,
916                placement_manifest: ReactorPlacementManifest::plan(0, config.topology.as_ref()),
917            };
918        }
919
920        let shard_count = config.shard_count;
921        let lane_capacity = config.lane_capacity;
922        let placement_manifest =
923            ReactorPlacementManifest::plan(shard_count, config.topology.as_ref());
924        let lanes = (0..shard_count)
925            .map(|_| SpscLane::new(lane_capacity))
926            .collect::<Vec<_>>();
927        Self {
928            seq: Seq::zero(),
929            lanes,
930            shard_seq: vec![0; shard_count],
931            rr_cursor: 0,
932            rejected_enqueues: 0,
933            placement_manifest,
934        }
935    }
936
937    /// Number of shard lanes.
938    #[must_use]
939    pub fn shard_count(&self) -> usize {
940        self.lanes.len()
941    }
942
943    /// Total pending envelopes across all shards.
944    #[must_use]
945    pub fn total_depth(&self) -> usize {
946        self.lanes.iter().map(SpscLane::len).sum()
947    }
948
949    /// Whether any lane has pending envelopes.
950    #[must_use]
951    pub fn has_pending(&self) -> bool {
952        self.total_depth() > 0
953    }
954
955    /// Per-shard queue depth.
956    #[must_use]
957    pub fn queue_depth(&self, shard_id: usize) -> Option<usize> {
958        self.lanes.get(shard_id).map(SpscLane::len)
959    }
960
961    /// Snapshot queueing telemetry for diagnostics.
962    #[must_use]
963    pub fn telemetry(&self) -> ReactorMeshTelemetry {
964        ReactorMeshTelemetry {
965            queue_depths: self.lanes.iter().map(SpscLane::len).collect(),
966            max_queue_depths: self.lanes.iter().map(|lane| lane.max_depth).collect(),
967            rejected_enqueues: self.rejected_enqueues,
968            shard_bindings: self.placement_manifest.bindings.clone(),
969            fallback_reason: self.placement_manifest.fallback_reason,
970        }
971    }
972
973    /// Deterministic shard placement manifest used by this mesh.
974    #[must_use]
975    pub const fn placement_manifest(&self) -> &ReactorPlacementManifest {
976        &self.placement_manifest
977    }
978
979    const fn next_global_seq(&mut self) -> Seq {
980        let current = self.seq;
981        self.seq = self.seq.next();
982        current
983    }
984
985    fn next_shard_seq(&mut self, shard_id: usize) -> u64 {
986        let Some(seq) = self.shard_seq.get_mut(shard_id) else {
987            return 0;
988        };
989        let current = *seq;
990        *seq = seq.saturating_add(1);
991        current
992    }
993
994    fn stable_hash(input: &str) -> u64 {
995        // FNV-1a 64-bit for deterministic process-independent routing.
996        let mut hash = 0xcbf2_9ce4_8422_2325_u64;
997        for byte in input.as_bytes() {
998            hash ^= u64::from(*byte);
999            hash = hash.wrapping_mul(0x0100_0000_01b3_u64);
1000        }
1001        hash
1002    }
1003
1004    fn hash_route(&self, call_id: &str) -> usize {
1005        if self.lanes.len() <= 1 {
1006            return 0;
1007        }
1008        let lanes = u64::try_from(self.lanes.len()).unwrap_or(1);
1009        let slot = Self::stable_hash(call_id) % lanes;
1010        usize::try_from(slot).unwrap_or(0)
1011    }
1012
1013    fn rr_route(&mut self) -> usize {
1014        if self.lanes.len() <= 1 {
1015            return 0;
1016        }
1017        let idx = self.rr_cursor % self.lanes.len();
1018        self.rr_cursor = self.rr_cursor.wrapping_add(1);
1019        idx
1020    }
1021
1022    fn enqueue_with_route(
1023        &mut self,
1024        shard_id: usize,
1025        task: MacrotaskKind,
1026    ) -> Result<ReactorEnvelope, ReactorBackpressure> {
1027        let global_seq = self.next_global_seq();
1028        let shard_seq = self.next_shard_seq(shard_id);
1029        let envelope = ReactorEnvelope::new(global_seq, shard_seq, shard_id, task);
1030        let Some(lane) = self.lanes.get_mut(shard_id) else {
1031            self.rejected_enqueues = self.rejected_enqueues.saturating_add(1);
1032            return Err(ReactorBackpressure {
1033                shard_id,
1034                depth: 0,
1035                capacity: 0,
1036            });
1037        };
1038        match lane.push(envelope.clone()) {
1039            Ok(()) => Ok(envelope),
1040            Err(depth) => {
1041                self.rejected_enqueues = self.rejected_enqueues.saturating_add(1);
1042                Err(ReactorBackpressure {
1043                    shard_id,
1044                    depth,
1045                    capacity: lane.capacity,
1046                })
1047            }
1048        }
1049    }
1050
1051    /// Enqueue a hostcall completion using deterministic hash routing.
1052    pub fn enqueue_hostcall_complete(
1053        &mut self,
1054        call_id: String,
1055        outcome: HostcallOutcome,
1056    ) -> Result<ReactorEnvelope, ReactorBackpressure> {
1057        let shard_id = self.hash_route(&call_id);
1058        self.enqueue_with_route(
1059            shard_id,
1060            MacrotaskKind::HostcallComplete { call_id, outcome },
1061        )
1062    }
1063
1064    /// Enqueue an inbound event using deterministic round-robin routing.
1065    pub fn enqueue_event(
1066        &mut self,
1067        event_id: String,
1068        payload: serde_json::Value,
1069    ) -> Result<ReactorEnvelope, ReactorBackpressure> {
1070        let shard_id = self.rr_route();
1071        self.enqueue_with_route(shard_id, MacrotaskKind::InboundEvent { event_id, payload })
1072    }
1073
1074    /// Drain one shard up to `budget` envelopes.
1075    pub fn drain_shard(&mut self, shard_id: usize, budget: usize) -> Vec<ReactorEnvelope> {
1076        let Some(lane) = self.lanes.get_mut(shard_id) else {
1077            return Vec::new();
1078        };
1079        let mut drained = Vec::with_capacity(budget.min(lane.len()));
1080        for _ in 0..budget {
1081            let Some(item) = lane.pop() else {
1082                break;
1083            };
1084            drained.push(item);
1085        }
1086        drained
1087    }
1088
1089    /// Drain across shards in deterministic global sequence order.
1090    pub fn drain_global_order(&mut self, budget: usize) -> Vec<ReactorEnvelope> {
1091        let mut drained = Vec::with_capacity(budget);
1092        for _ in 0..budget {
1093            let mut best_lane: Option<usize> = None;
1094            let mut best_seq: Option<Seq> = None;
1095            for (idx, lane) in self.lanes.iter().enumerate() {
1096                let Some(front) = lane.front() else {
1097                    continue;
1098                };
1099                if best_seq.is_none_or(|seq| front.global_seq < seq) {
1100                    best_seq = Some(front.global_seq);
1101                    best_lane = Some(idx);
1102                }
1103            }
1104            let Some(chosen_lane) = best_lane else {
1105                break;
1106            };
1107            if let Some(item) = self.lanes[chosen_lane].pop() {
1108                drained.push(item);
1109            }
1110        }
1111        drained
1112    }
1113}
1114
1115// ============================================================================
1116// NUMA-aware slab allocator for extension hot paths
1117// ============================================================================
1118
/// Configuration for hugepage-backed slab allocation.
///
/// Consumed by [`HugepageStatus::evaluate`] and [`NumaSlabConfig`] to decide
/// whether a slab pool reports hugepage backing as active.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct HugepageConfig {
    /// Hugepage size in bytes (default 2 MiB = `2_097_152`).
    pub page_size_bytes: usize,
    /// Whether hugepage backing is requested (advisory — falls back gracefully).
    pub enabled: bool,
}
1127
1128impl Default for HugepageConfig {
1129    fn default() -> Self {
1130        Self {
1131            page_size_bytes: 2 * 1024 * 1024, // 2 MiB
1132            enabled: true,
1133        }
1134    }
1135}
1136
/// Reason why hugepage-backed allocation was not used.
///
/// Serialized to stable string codes via [`HugepageFallbackReason::as_code`]
/// for telemetry output.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HugepageFallbackReason {
    /// Hugepage support was explicitly disabled in configuration.
    Disabled,
    /// No hugepage information available from the OS.
    DetectionUnavailable,
    /// System reports zero free hugepages.
    InsufficientHugepages,
    /// Requested slab size does not align to hugepage boundaries.
    AlignmentMismatch,
}
1149
impl HugepageFallbackReason {
    /// Stable machine-readable code for this reason, used in JSON telemetry.
    #[must_use]
    pub const fn as_code(self) -> &'static str {
        match self {
            Self::Disabled => "hugepage_disabled",
            Self::DetectionUnavailable => "detection_unavailable",
            Self::InsufficientHugepages => "insufficient_hugepages",
            Self::AlignmentMismatch => "alignment_mismatch",
        }
    }
}
1161
/// Hugepage availability snapshot from the host.
///
/// Produced by [`HugepageStatus::evaluate`] (or
/// `NumaSlabConfig::alignment_mismatch_status`) and carried by a slab pool
/// for telemetry.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct HugepageStatus {
    /// Total hugepages configured on the system.
    pub total_pages: u64,
    /// Free hugepages available for allocation.
    pub free_pages: u64,
    /// Page size in bytes (typically 2 MiB).
    pub page_size_bytes: usize,
    /// Whether hugepages are actually being used by this pool.
    pub active: bool,
    /// If not active, why.
    pub fallback_reason: Option<HugepageFallbackReason>,
}
1176
1177impl HugepageStatus {
1178    /// Determine hugepage viability from system metrics.
1179    #[must_use]
1180    pub const fn evaluate(config: &HugepageConfig, total: u64, free: u64) -> Self {
1181        if !config.enabled {
1182            return Self {
1183                total_pages: total,
1184                free_pages: free,
1185                page_size_bytes: config.page_size_bytes,
1186                active: false,
1187                fallback_reason: Some(HugepageFallbackReason::Disabled),
1188            };
1189        }
1190        if total == 0 && free == 0 {
1191            return Self {
1192                total_pages: 0,
1193                free_pages: 0,
1194                page_size_bytes: config.page_size_bytes,
1195                active: false,
1196                fallback_reason: Some(HugepageFallbackReason::DetectionUnavailable),
1197            };
1198        }
1199        if free == 0 {
1200            return Self {
1201                total_pages: total,
1202                free_pages: 0,
1203                page_size_bytes: config.page_size_bytes,
1204                active: false,
1205                fallback_reason: Some(HugepageFallbackReason::InsufficientHugepages),
1206            };
1207        }
1208        Self {
1209            total_pages: total,
1210            free_pages: free,
1211            page_size_bytes: config.page_size_bytes,
1212            active: true,
1213            fallback_reason: None,
1214        }
1215    }
1216
1217    /// Render as stable JSON for diagnostics.
1218    #[must_use]
1219    pub fn as_json(&self) -> serde_json::Value {
1220        serde_json::json!({
1221            "total_pages": self.total_pages,
1222            "free_pages": self.free_pages,
1223            "page_size_bytes": self.page_size_bytes,
1224            "active": self.active,
1225            "fallback_reason": self.fallback_reason.map(HugepageFallbackReason::as_code),
1226        })
1227    }
1228}
1229
/// Configuration for a NUMA-local slab pool.
///
/// The capacity/entry-size product (`slab_footprint_bytes`) is checked
/// against hugepage alignment when hugepages are enabled.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct NumaSlabConfig {
    /// Maximum entries per NUMA-node slab.
    pub slab_capacity: usize,
    /// Logical size of each entry in bytes (for telemetry, not enforced at runtime).
    pub entry_size_bytes: usize,
    /// Hugepage configuration.
    pub hugepage: HugepageConfig,
}
1240
1241impl Default for NumaSlabConfig {
1242    fn default() -> Self {
1243        Self {
1244            slab_capacity: 256,
1245            entry_size_bytes: 512,
1246            hugepage: HugepageConfig::default(),
1247        }
1248    }
1249}
1250
1251impl NumaSlabConfig {
1252    #[must_use]
1253    pub const fn slab_footprint_bytes(&self) -> Option<usize> {
1254        self.slab_capacity.checked_mul(self.entry_size_bytes)
1255    }
1256
1257    #[must_use]
1258    pub const fn hugepage_alignment_ok(&self) -> bool {
1259        if !self.hugepage.enabled {
1260            return true;
1261        }
1262        let page = self.hugepage.page_size_bytes;
1263        if page == 0 {
1264            return false;
1265        }
1266        match self.slab_footprint_bytes() {
1267            Some(bytes) => bytes != 0 && bytes % page == 0,
1268            None => false,
1269        }
1270    }
1271
1272    #[must_use]
1273    pub const fn alignment_mismatch_status(&self) -> HugepageStatus {
1274        HugepageStatus {
1275            total_pages: 0,
1276            free_pages: 0,
1277            page_size_bytes: self.hugepage.page_size_bytes,
1278            active: false,
1279            fallback_reason: Some(HugepageFallbackReason::AlignmentMismatch),
1280        }
1281    }
1282}
1283
/// Handle returned on successful slab allocation.
///
/// Contains enough information to deallocate safely and detect use-after-free
/// via generation tracking. Handles are `Copy`; a stale copy is rejected by
/// the generation check in `NumaSlab::deallocate`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct NumaSlabHandle {
    /// NUMA node that owns this allocation.
    pub node_id: usize,
    /// Slot index within the node's slab.
    pub slot_index: usize,
    /// Generation at allocation time (prevents ABA on reuse).
    pub generation: u64,
}
1297
/// Per-NUMA-node slab with free-list recycling.
#[derive(Debug, Clone)]
pub struct NumaSlab {
    /// NUMA node this slab is bound to.
    node_id: usize,
    /// Total slot count (fixed at construction; clamped to at least 1).
    capacity: usize,
    /// Per-slot generation counters, bumped on every allocation (ABA guard).
    generations: Vec<u64>,
    /// Per-slot occupancy flags.
    allocated: Vec<bool>,
    /// LIFO stack of free slot indices.
    free_list: Vec<usize>,
    // Telemetry counters.
    /// Lifetime count of successful allocations (saturating).
    total_allocs: u64,
    /// Lifetime count of successful deallocations (saturating).
    total_frees: u64,
    /// Highest simultaneous `in_use` count observed.
    high_water_mark: usize,
}
1311
1312impl NumaSlab {
1313    /// Create a new slab for the given NUMA node with the specified capacity.
1314    #[must_use]
1315    pub fn new(node_id: usize, capacity: usize) -> Self {
1316        let capacity = capacity.max(1);
1317        let mut free_list = Vec::with_capacity(capacity);
1318        // Fill free list in reverse so slot 0 is popped first (LIFO recycle).
1319        for idx in (0..capacity).rev() {
1320            free_list.push(idx);
1321        }
1322        Self {
1323            node_id,
1324            capacity,
1325            generations: vec![0; capacity],
1326            allocated: vec![false; capacity],
1327            free_list,
1328            total_allocs: 0,
1329            total_frees: 0,
1330            high_water_mark: 0,
1331        }
1332    }
1333
1334    /// Number of currently allocated slots.
1335    #[must_use]
1336    pub fn in_use(&self) -> usize {
1337        self.capacity.saturating_sub(self.free_list.len())
1338    }
1339
1340    /// Whether the slab has available capacity.
1341    #[must_use]
1342    pub fn has_capacity(&self) -> bool {
1343        !self.free_list.is_empty()
1344    }
1345
1346    /// Allocate a slot, returning a handle or `None` if exhausted.
1347    pub fn allocate(&mut self) -> Option<NumaSlabHandle> {
1348        let slot_index = self.free_list.pop()?;
1349        self.allocated[slot_index] = true;
1350        self.generations[slot_index] = self.generations[slot_index].saturating_add(1);
1351        self.total_allocs = self.total_allocs.saturating_add(1);
1352        self.high_water_mark = self.high_water_mark.max(self.in_use());
1353        Some(NumaSlabHandle {
1354            node_id: self.node_id,
1355            slot_index,
1356            generation: self.generations[slot_index],
1357        })
1358    }
1359
1360    /// Deallocate a slot identified by handle.
1361    ///
1362    /// Returns `true` if deallocation succeeded, `false` if the handle is stale
1363    /// (wrong generation) or already freed.
1364    pub fn deallocate(&mut self, handle: &NumaSlabHandle) -> bool {
1365        if handle.node_id != self.node_id {
1366            return false;
1367        }
1368        if handle.slot_index >= self.capacity {
1369            return false;
1370        }
1371        if !self.allocated[handle.slot_index] {
1372            return false;
1373        }
1374        if self.generations[handle.slot_index] != handle.generation {
1375            return false;
1376        }
1377        self.allocated[handle.slot_index] = false;
1378        self.free_list.push(handle.slot_index);
1379        self.total_frees = self.total_frees.saturating_add(1);
1380        true
1381    }
1382}
1383
/// Cross-node allocation reason for telemetry.
///
/// Currently single-variant; kept as an enum so future reasons extend the
/// telemetry codes without changing call sites.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CrossNodeReason {
    /// Local node slab is exhausted; allocated from nearest neighbor.
    LocalExhausted,
}
1390
impl CrossNodeReason {
    /// Stable machine-readable code for this reason, used in JSON telemetry.
    #[must_use]
    pub const fn as_code(self) -> &'static str {
        match self {
            Self::LocalExhausted => "local_exhausted",
        }
    }
}
1399
/// Multi-node slab pool that routes allocations to NUMA-local slabs.
#[derive(Debug, Clone)]
pub struct NumaSlabPool {
    /// One slab per distinct NUMA node, ordered by node id at construction.
    slabs: Vec<NumaSlab>,
    /// Pool-wide configuration (slab capacity, entry size, hugepage settings).
    config: NumaSlabConfig,
    /// Current hugepage viability snapshot (overridable via `set_hugepage_status`).
    hugepage_status: HugepageStatus,
    /// Count of allocations served from a non-preferred node (saturating).
    cross_node_allocs: u64,
    /// Count of allocations made while hugepage backing was active (saturating).
    hugepage_backed_allocs: u64,
}
1409
1410impl NumaSlabPool {
1411    /// Create a pool with one slab per NUMA node identified in the placement manifest.
1412    #[must_use]
1413    pub fn from_manifest(manifest: &ReactorPlacementManifest, config: NumaSlabConfig) -> Self {
1414        let mut node_ids = manifest
1415            .bindings
1416            .iter()
1417            .map(|b| b.numa_node)
1418            .collect::<Vec<_>>();
1419        node_ids.sort_unstable();
1420        node_ids.dedup();
1421
1422        if node_ids.is_empty() {
1423            node_ids.push(0);
1424        }
1425
1426        let slabs = node_ids
1427            .iter()
1428            .map(|&node_id| NumaSlab::new(node_id, config.slab_capacity))
1429            .collect();
1430
1431        // Hugepage evaluation with zero system data (caller can override via
1432        // `set_hugepage_status` once real data is available).
1433        let hugepage_status = if config.hugepage.enabled && !config.hugepage_alignment_ok() {
1434            config.alignment_mismatch_status()
1435        } else {
1436            HugepageStatus::evaluate(&config.hugepage, 0, 0)
1437        };
1438
1439        Self {
1440            slabs,
1441            config,
1442            hugepage_status,
1443            cross_node_allocs: 0,
1444            hugepage_backed_allocs: 0,
1445        }
1446    }
1447
1448    /// Override hugepage status after querying the host.
1449    pub const fn set_hugepage_status(&mut self, status: HugepageStatus) {
1450        self.hugepage_status = if !self.config.hugepage.enabled {
1451            HugepageStatus::evaluate(&self.config.hugepage, status.total_pages, status.free_pages)
1452        } else if !self.config.hugepage_alignment_ok() {
1453            self.config.alignment_mismatch_status()
1454        } else {
1455            status
1456        };
1457    }
1458
1459    /// Number of NUMA nodes in this pool.
1460    #[must_use]
1461    pub fn node_count(&self) -> usize {
1462        self.slabs.len()
1463    }
1464
1465    /// Allocate from the preferred NUMA node, falling back to any node with capacity.
1466    ///
1467    /// Returns `(handle, cross_node_reason)` where the reason is `Some` when
1468    /// the allocation was served from a non-preferred node.
1469    pub fn allocate(
1470        &mut self,
1471        preferred_node: usize,
1472    ) -> Option<(NumaSlabHandle, Option<CrossNodeReason>)> {
1473        // Try preferred node first.
1474        if let Some(slab) = self.slabs.iter_mut().find(|s| s.node_id == preferred_node) {
1475            if let Some(handle) = slab.allocate() {
1476                if self.hugepage_status.active {
1477                    self.hugepage_backed_allocs = self.hugepage_backed_allocs.saturating_add(1);
1478                }
1479                return Some((handle, None));
1480            }
1481        }
1482        // Fallback: scan all nodes for available capacity.
1483        for slab in &mut self.slabs {
1484            if slab.node_id == preferred_node {
1485                continue;
1486            }
1487            if let Some(handle) = slab.allocate() {
1488                self.cross_node_allocs = self.cross_node_allocs.saturating_add(1);
1489                if self.hugepage_status.active {
1490                    self.hugepage_backed_allocs = self.hugepage_backed_allocs.saturating_add(1);
1491                }
1492                return Some((handle, Some(CrossNodeReason::LocalExhausted)));
1493            }
1494        }
1495        None
1496    }
1497
1498    /// Deallocate a previously allocated handle.
1499    ///
1500    /// Returns `true` if deallocation succeeded.
1501    pub fn deallocate(&mut self, handle: &NumaSlabHandle) -> bool {
1502        for slab in &mut self.slabs {
1503            if slab.node_id == handle.node_id {
1504                return slab.deallocate(handle);
1505            }
1506        }
1507        false
1508    }
1509
1510    /// Snapshot telemetry for this pool.
1511    #[must_use]
1512    pub fn telemetry(&self) -> NumaSlabTelemetry {
1513        let per_node = self
1514            .slabs
1515            .iter()
1516            .map(|slab| NumaSlabNodeTelemetry {
1517                node_id: slab.node_id,
1518                capacity: slab.capacity,
1519                in_use: slab.in_use(),
1520                total_allocs: slab.total_allocs,
1521                total_frees: slab.total_frees,
1522                high_water_mark: slab.high_water_mark,
1523            })
1524            .collect();
1525        NumaSlabTelemetry {
1526            per_node,
1527            cross_node_allocs: self.cross_node_allocs,
1528            hugepage_backed_allocs: self.hugepage_backed_allocs,
1529            hugepage_status: self.hugepage_status,
1530            config: self.config,
1531        }
1532    }
1533}
1534
/// Per-node telemetry counters.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct NumaSlabNodeTelemetry {
    /// NUMA node these counters describe.
    pub node_id: usize,
    /// Total slot count of the node's slab.
    pub capacity: usize,
    /// Slots currently allocated.
    pub in_use: usize,
    /// Lifetime count of successful allocations (saturating).
    pub total_allocs: u64,
    /// Lifetime count of successful deallocations (saturating).
    pub total_frees: u64,
    /// Highest simultaneous `in_use` count observed.
    pub high_water_mark: usize,
}
1545
/// Aggregate slab pool telemetry.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NumaSlabTelemetry {
    /// One entry per NUMA node in the pool.
    pub per_node: Vec<NumaSlabNodeTelemetry>,
    /// Pool-wide count of allocations served from a non-preferred node.
    pub cross_node_allocs: u64,
    /// Pool-wide count of allocations made while hugepage backing was active.
    pub hugepage_backed_allocs: u64,
    /// Hugepage viability snapshot at the time of the telemetry capture.
    pub hugepage_status: HugepageStatus,
    /// Pool configuration at the time of the telemetry capture.
    pub config: NumaSlabConfig,
}
1555
1556impl NumaSlabTelemetry {
1557    const RATIO_SCALE_BPS: u64 = 10_000;
1558
1559    #[must_use]
1560    fn ratio_basis_points(numerator: u64, denominator: u64) -> u64 {
1561        if denominator == 0 {
1562            return 0;
1563        }
1564        let scaled =
1565            (u128::from(numerator) * u128::from(Self::RATIO_SCALE_BPS)) / u128::from(denominator);
1566        u64::try_from(scaled).unwrap_or(Self::RATIO_SCALE_BPS)
1567    }
1568
1569    #[must_use]
1570    const fn pressure_band(value_bps: u64) -> &'static str {
1571        if value_bps >= 7_500 {
1572            "high"
1573        } else if value_bps >= 2_500 {
1574            "medium"
1575        } else {
1576            "low"
1577        }
1578    }
1579
1580    /// Render as stable machine-readable JSON for diagnostics.
1581    #[must_use]
1582    pub fn as_json(&self) -> serde_json::Value {
1583        let total_allocs: u64 = self.per_node.iter().map(|n| n.total_allocs).sum();
1584        let total_frees: u64 = self.per_node.iter().map(|n| n.total_frees).sum();
1585        let total_in_use: usize = self.per_node.iter().map(|n| n.in_use).sum();
1586        let total_capacity: usize = self.per_node.iter().map(|n| n.capacity).sum();
1587        let total_high_water: usize = self.per_node.iter().map(|n| n.high_water_mark).sum();
1588        let remote_allocs = self.cross_node_allocs.min(total_allocs);
1589        let local_allocs = total_allocs.saturating_sub(remote_allocs);
1590        let local_ratio_bps = Self::ratio_basis_points(local_allocs, total_allocs);
1591        let remote_ratio_bps = Self::ratio_basis_points(remote_allocs, total_allocs);
1592        let hugepage_backed_allocs = self.hugepage_backed_allocs.min(total_allocs);
1593        let hugepage_hit_rate_bps = Self::ratio_basis_points(hugepage_backed_allocs, total_allocs);
1594        let total_capacity_u64 = u64::try_from(total_capacity).unwrap_or(u64::MAX);
1595        let total_in_use_u64 = u64::try_from(total_in_use).unwrap_or(u64::MAX);
1596        let total_high_water_u64 = u64::try_from(total_high_water).unwrap_or(u64::MAX);
1597        let occupancy_pressure_bps = Self::ratio_basis_points(total_in_use_u64, total_capacity_u64);
1598        let cache_miss_pressure_bps =
1599            Self::ratio_basis_points(total_high_water_u64, total_capacity_u64);
1600        // Remote allocations are a practical proxy for TLB/cache pressure from cross-node traffic.
1601        let tlb_miss_pressure_bps = remote_ratio_bps;
1602        let cross_node_fallback_reason = if self.cross_node_allocs > 0 {
1603            Some(CrossNodeReason::LocalExhausted.as_code())
1604        } else {
1605            None
1606        };
1607        serde_json::json!({
1608            "node_count": self.per_node.len(),
1609            "total_allocs": total_allocs,
1610            "total_frees": total_frees,
1611            "total_in_use": total_in_use,
1612            "cross_node_allocs": self.cross_node_allocs,
1613            "hugepage_backed_allocs": hugepage_backed_allocs,
1614            "local_allocs": local_allocs,
1615            "remote_allocs": remote_allocs,
1616            "allocation_ratio_bps": {
1617                "scale": Self::RATIO_SCALE_BPS,
1618                "local": local_ratio_bps,
1619                "remote": remote_ratio_bps,
1620            },
1621            "hugepage_hit_rate_bps": {
1622                "scale": Self::RATIO_SCALE_BPS,
1623                "value": hugepage_hit_rate_bps,
1624            },
1625            "latency_proxies_bps": {
1626                "scale": Self::RATIO_SCALE_BPS,
1627                "tlb_miss_pressure": tlb_miss_pressure_bps,
1628                "cache_miss_pressure": cache_miss_pressure_bps,
1629                "occupancy_pressure": occupancy_pressure_bps,
1630            },
1631            "pressure_bands": {
1632                "tlb_miss": Self::pressure_band(tlb_miss_pressure_bps),
1633                "cache_miss": Self::pressure_band(cache_miss_pressure_bps),
1634                "occupancy": Self::pressure_band(occupancy_pressure_bps),
1635            },
1636            "fallback_reasons": {
1637                "cross_node": cross_node_fallback_reason,
1638                "hugepage": self.hugepage_status.fallback_reason.map(HugepageFallbackReason::as_code),
1639            },
1640            "config": {
1641                "slab_capacity": self.config.slab_capacity,
1642                "entry_size_bytes": self.config.entry_size_bytes,
1643                "hugepage_enabled": self.config.hugepage.enabled,
1644                "hugepage_page_size_bytes": self.config.hugepage.page_size_bytes,
1645            },
1646            "hugepage": self.hugepage_status.as_json(),
1647            "per_node": self.per_node.iter().map(|n| serde_json::json!({
1648                "node_id": n.node_id,
1649                "capacity": n.capacity,
1650                "in_use": n.in_use,
1651                "total_allocs": n.total_allocs,
1652                "total_frees": n.total_frees,
1653                "high_water_mark": n.high_water_mark,
1654            })).collect::<Vec<_>>(),
1655        })
1656    }
1657}
1658
1659// ============================================================================
1660// Thread affinity advisory
1661// ============================================================================
1662
/// Enforcement level for thread-to-core affinity.
///
/// Serialized to stable string codes via [`AffinityEnforcement::as_code`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AffinityEnforcement {
    /// Affinity is advisory only; scheduler may override.
    Advisory,
    /// Strict enforcement requested (requires OS support).
    Strict,
    /// Affinity enforcement is disabled.
    Disabled,
}
1673
impl AffinityEnforcement {
    /// Stable machine-readable code for this level, used in JSON output.
    #[must_use]
    pub const fn as_code(self) -> &'static str {
        match self {
            Self::Advisory => "advisory",
            Self::Strict => "strict",
            Self::Disabled => "disabled",
        }
    }
}
1684
/// Advisory thread-to-core binding produced from placement manifest.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ThreadAffinityAdvice {
    /// Shard this advice applies to.
    pub shard_id: usize,
    /// Core the shard's thread should be bound to.
    pub recommended_core: usize,
    /// NUMA node the shard should stay local to.
    pub recommended_numa_node: usize,
    /// How strongly the binding should be enforced.
    pub enforcement: AffinityEnforcement,
}
1693
1694impl ThreadAffinityAdvice {
1695    /// Render as JSON for diagnostics.
1696    #[must_use]
1697    pub fn as_json(&self) -> serde_json::Value {
1698        serde_json::json!({
1699            "shard_id": self.shard_id,
1700            "recommended_core": self.recommended_core,
1701            "recommended_numa_node": self.recommended_numa_node,
1702            "enforcement": self.enforcement.as_code(),
1703        })
1704    }
1705}
1706
1707impl ReactorPlacementManifest {
1708    /// Generate affinity advice for all shards in this manifest.
1709    #[must_use]
1710    pub fn affinity_advice(&self, enforcement: AffinityEnforcement) -> Vec<ThreadAffinityAdvice> {
1711        self.bindings
1712            .iter()
1713            .map(|binding| ThreadAffinityAdvice {
1714                shard_id: binding.shard_id,
1715                recommended_core: binding.core_id,
1716                recommended_numa_node: binding.numa_node,
1717                enforcement,
1718            })
1719            .collect()
1720    }
1721
1722    /// Look up the NUMA node for a specific shard.
1723    #[must_use]
1724    pub fn numa_node_for_shard(&self, shard_id: usize) -> Option<usize> {
1725        self.bindings
1726            .iter()
1727            .find(|b| b.shard_id == shard_id)
1728            .map(|b| b.numa_node)
1729    }
1730}
1731
1732// ============================================================================
1733// ReactorMesh ↔ NUMA slab integration
1734// ============================================================================
1735
1736impl ReactorMesh {
1737    /// Look up the preferred NUMA node for a shard via the placement manifest.
1738    #[must_use]
1739    pub fn preferred_numa_node(&self, shard_id: usize) -> usize {
1740        self.placement_manifest
1741            .numa_node_for_shard(shard_id)
1742            .unwrap_or(0)
1743    }
1744
1745    /// Generate thread affinity advice from the mesh's placement manifest.
1746    #[must_use]
1747    pub fn affinity_advice(&self, enforcement: AffinityEnforcement) -> Vec<ThreadAffinityAdvice> {
1748        self.placement_manifest.affinity_advice(enforcement)
1749    }
1750}
1751
impl<C: Clock> fmt::Debug for Scheduler<C> {
    /// Summarize scheduler state (counts and counters) without dumping queue
    /// or timer contents. `finish_non_exhaustive` signals that remaining
    /// fields (e.g. the clock) are intentionally omitted.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Scheduler")
            .field("seq", &self.seq)
            .field("macrotask_count", &self.macrotask_queue.len())
            .field("timer_count", &self.timer_count())
            .field("next_timer_id", &self.next_timer_id)
            .field("cancelled_timers", &self.cancelled_timers.len())
            .finish_non_exhaustive()
    }
}
1763
1764#[cfg(test)]
1765mod tests {
1766    use super::*;
1767
1768    #[test]
1769    fn seq_ordering() {
1770        let a = Seq::zero();
1771        let b = a.next();
1772        let c = b.next();
1773
1774        assert!(a < b);
1775        assert!(b < c);
1776        assert_eq!(a.value(), 0);
1777        assert_eq!(b.value(), 1);
1778        assert_eq!(c.value(), 2);
1779    }
1780
1781    #[test]
1782    fn seq_next_saturates_at_u64_max() {
1783        let max = Seq(u64::MAX);
1784        assert_eq!(max.next(), max);
1785    }
1786
1787    #[test]
1788    fn timer_ordering() {
1789        // Earlier deadline = higher priority (lower in min-heap)
1790        let t1 = TimerEntry::new(1, 100, Seq(0));
1791        let t2 = TimerEntry::new(2, 200, Seq(1));
1792
1793        assert!(t1 > t2); // Reversed for min-heap
1794
1795        // Same deadline, earlier seq = higher priority
1796        let t3 = TimerEntry::new(3, 100, Seq(5));
1797        let t4 = TimerEntry::new(4, 100, Seq(10));
1798
1799        assert!(t3 > t4); // Reversed for min-heap
1800    }
1801
1802    #[test]
1803    fn deterministic_clock() {
1804        let clock = DeterministicClock::new(1000);
1805        assert_eq!(clock.now_ms(), 1000);
1806
1807        clock.advance(500);
1808        assert_eq!(clock.now_ms(), 1500);
1809
1810        clock.set(2000);
1811        assert_eq!(clock.now_ms(), 2000);
1812    }
1813
1814    #[test]
1815    fn scheduler_basic_timer() {
1816        let clock = DeterministicClock::new(0);
1817        let mut sched = Scheduler::with_clock(clock);
1818
1819        // Set a timer for 100ms
1820        let timer_id = sched.set_timeout(100);
1821        assert_eq!(timer_id, 1);
1822        assert_eq!(sched.timer_count(), 1);
1823        assert!(!sched.macrotask_queue.is_empty() || sched.timer_count() > 0);
1824
1825        // Tick before deadline - nothing happens
1826        let task = sched.tick();
1827        assert!(task.is_none());
1828
1829        // Advance past deadline
1830        sched.clock.advance(150);
1831        let task = sched.tick();
1832        assert!(task.is_some());
1833        match task.unwrap().kind {
1834            MacrotaskKind::TimerFired { timer_id: id } => assert_eq!(id, timer_id),
1835            other => unreachable!("Expected TimerFired, got {other:?}"),
1836        }
1837    }
1838
1839    #[test]
1840    fn scheduler_timer_id_wraps_after_u64_max() {
1841        let clock = DeterministicClock::new(0);
1842        let mut sched = Scheduler::with_clock(clock);
1843        sched.next_timer_id = u64::MAX;
1844
1845        let first = sched.set_timeout(10);
1846        let second = sched.set_timeout(20);
1847
1848        assert_eq!(first, u64::MAX);
1849        assert_eq!(second, 1);
1850    }
1851
1852    #[test]
1853    fn scheduler_timer_id_wrap_preserves_cancellation_semantics() {
1854        let clock = DeterministicClock::new(0);
1855        let mut sched = Scheduler::with_clock(clock);
1856        sched.next_timer_id = u64::MAX;
1857
1858        let max_id = sched.set_timeout(10);
1859        let wrapped_id = sched.set_timeout(20);
1860
1861        assert_eq!(max_id, u64::MAX);
1862        assert_eq!(wrapped_id, 1);
1863        assert!(sched.clear_timeout(max_id));
1864        assert!(sched.clear_timeout(wrapped_id));
1865
1866        sched.clock.advance(25);
1867        assert!(sched.tick().is_none());
1868        assert!(sched.tick().is_none());
1869    }
1870
1871    #[test]
1872    fn scheduler_timer_ordering() {
1873        let clock = DeterministicClock::new(0);
1874        let mut sched = Scheduler::with_clock(clock);
1875
1876        // Set timers in reverse order
1877        let t3 = sched.set_timeout(300);
1878        let t1 = sched.set_timeout(100);
1879        let t2 = sched.set_timeout(200);
1880
1881        // Advance past all deadlines
1882        sched.clock.advance(400);
1883
1884        // Should fire in deadline order
1885        let task1 = sched.tick().unwrap();
1886        let task2 = sched.tick().unwrap();
1887        let task3 = sched.tick().unwrap();
1888
1889        match task1.kind {
1890            MacrotaskKind::TimerFired { timer_id } => assert_eq!(timer_id, t1),
1891            other => unreachable!("Expected t1, got {other:?}"),
1892        }
1893        match task2.kind {
1894            MacrotaskKind::TimerFired { timer_id } => assert_eq!(timer_id, t2),
1895            other => unreachable!("Expected t2, got {other:?}"),
1896        }
1897        match task3.kind {
1898            MacrotaskKind::TimerFired { timer_id } => assert_eq!(timer_id, t3),
1899            other => unreachable!("Expected t3, got {other:?}"),
1900        }
1901    }
1902
1903    #[test]
1904    fn scheduler_same_deadline_seq_ordering() {
1905        let clock = DeterministicClock::new(0);
1906        let mut sched = Scheduler::with_clock(clock);
1907
1908        // Set timers with same deadline - should fire in seq order
1909        let t1 = sched.set_timeout(100);
1910        let t2 = sched.set_timeout(100);
1911        let t3 = sched.set_timeout(100);
1912
1913        sched.clock.advance(150);
1914
1915        let task1 = sched.tick().unwrap();
1916        let task2 = sched.tick().unwrap();
1917        let task3 = sched.tick().unwrap();
1918
1919        // Must fire in order they were created (by seq)
1920        match task1.kind {
1921            MacrotaskKind::TimerFired { timer_id } => assert_eq!(timer_id, t1),
1922            other => unreachable!("Expected t1, got {other:?}"),
1923        }
1924        match task2.kind {
1925            MacrotaskKind::TimerFired { timer_id } => assert_eq!(timer_id, t2),
1926            other => unreachable!("Expected t2, got {other:?}"),
1927        }
1928        match task3.kind {
1929            MacrotaskKind::TimerFired { timer_id } => assert_eq!(timer_id, t3),
1930            other => unreachable!("Expected t3, got {other:?}"),
1931        }
1932    }
1933
1934    #[test]
1935    fn scheduler_cancel_timer() {
1936        let clock = DeterministicClock::new(0);
1937        let mut sched = Scheduler::with_clock(clock);
1938
1939        let t1 = sched.set_timeout(100);
1940        let t2 = sched.set_timeout(200);
1941
1942        // Cancel t1
1943        assert!(sched.clear_timeout(t1));
1944
1945        // Advance past both deadlines
1946        sched.clock.advance(250);
1947
1948        // Only t2 should fire
1949        let task = sched.tick().unwrap();
1950        match task.kind {
1951            MacrotaskKind::TimerFired { timer_id } => assert_eq!(timer_id, t2),
1952            other => unreachable!("Expected t2, got {other:?}"),
1953        }
1954
1955        // No more tasks
1956        assert!(sched.tick().is_none());
1957    }
1958
1959    #[test]
1960    fn scheduler_hostcall_completion() {
1961        let clock = DeterministicClock::new(0);
1962        let mut sched = Scheduler::with_clock(clock);
1963
1964        sched.enqueue_hostcall_complete(
1965            "call-1".to_string(),
1966            HostcallOutcome::Success(serde_json::json!({"result": 42})),
1967        );
1968
1969        let task = sched.tick().unwrap();
1970        match task.kind {
1971            MacrotaskKind::HostcallComplete { call_id, outcome } => {
1972                assert_eq!(call_id, "call-1");
1973                match outcome {
1974                    HostcallOutcome::Success(v) => assert_eq!(v["result"], 42),
1975                    other => unreachable!("Expected success, got {other:?}"),
1976                }
1977            }
1978            other => unreachable!("Expected HostcallComplete, got {other:?}"),
1979        }
1980    }
1981
1982    #[test]
1983    fn scheduler_stream_chunk_sequence_and_finality_invariants() {
1984        let clock = DeterministicClock::new(0);
1985        let mut sched = Scheduler::with_clock(clock);
1986
1987        sched.enqueue_stream_chunk(
1988            "call-stream".to_string(),
1989            0,
1990            serde_json::json!({ "part": "a" }),
1991            false,
1992        );
1993        sched.enqueue_stream_chunk(
1994            "call-stream".to_string(),
1995            1,
1996            serde_json::json!({ "part": "b" }),
1997            false,
1998        );
1999        sched.enqueue_stream_chunk(
2000            "call-stream".to_string(),
2001            2,
2002            serde_json::json!({ "part": "c" }),
2003            true,
2004        );
2005
2006        let mut seen = Vec::new();
2007        while let Some(task) = sched.tick() {
2008            let MacrotaskKind::HostcallComplete { call_id, outcome } = task.kind else {
2009                unreachable!("expected hostcall completion task");
2010            };
2011            let HostcallOutcome::StreamChunk {
2012                sequence,
2013                chunk,
2014                is_final,
2015            } = outcome
2016            else {
2017                unreachable!("expected stream chunk outcome");
2018            };
2019            seen.push((call_id, sequence, chunk, is_final));
2020        }
2021
2022        assert_eq!(seen.len(), 3);
2023        assert!(
2024            seen.iter()
2025                .all(|(call_id, _, _, _)| call_id == "call-stream")
2026        );
2027        assert_eq!(seen[0].1, 0);
2028        assert_eq!(seen[1].1, 1);
2029        assert_eq!(seen[2].1, 2);
2030        assert_eq!(seen[0].2, serde_json::json!({ "part": "a" }));
2031        assert_eq!(seen[1].2, serde_json::json!({ "part": "b" }));
2032        assert_eq!(seen[2].2, serde_json::json!({ "part": "c" }));
2033
2034        let final_count = seen.iter().filter(|(_, _, _, is_final)| *is_final).count();
2035        assert_eq!(final_count, 1, "expected exactly one final chunk");
2036        assert!(seen[2].3, "final chunk must be last");
2037    }
2038
2039    #[test]
2040    fn scheduler_stream_chunks_multi_call_interleaving_is_deterministic() {
2041        let clock = DeterministicClock::new(0);
2042        let mut sched = Scheduler::with_clock(clock);
2043
2044        sched.enqueue_stream_chunk("call-a".to_string(), 0, serde_json::json!("a0"), false);
2045        sched.enqueue_stream_chunk("call-b".to_string(), 0, serde_json::json!("b0"), false);
2046        sched.enqueue_stream_chunk("call-a".to_string(), 1, serde_json::json!("a1"), true);
2047        sched.enqueue_stream_chunk("call-b".to_string(), 1, serde_json::json!("b1"), true);
2048
2049        let mut trace = Vec::new();
2050        while let Some(task) = sched.tick() {
2051            let MacrotaskKind::HostcallComplete { call_id, outcome } = task.kind else {
2052                unreachable!("expected hostcall completion task");
2053            };
2054            let HostcallOutcome::StreamChunk {
2055                sequence, is_final, ..
2056            } = outcome
2057            else {
2058                unreachable!("expected stream chunk outcome");
2059            };
2060            trace.push((call_id, sequence, is_final));
2061        }
2062
2063        assert_eq!(
2064            trace,
2065            vec![
2066                ("call-a".to_string(), 0, false),
2067                ("call-b".to_string(), 0, false),
2068                ("call-a".to_string(), 1, true),
2069                ("call-b".to_string(), 1, true),
2070            ]
2071        );
2072    }
2073
2074    #[test]
2075    fn scheduler_event_ordering() {
2076        let clock = DeterministicClock::new(0);
2077        let mut sched = Scheduler::with_clock(clock);
2078
2079        // Enqueue events in order
2080        sched.enqueue_event("evt-1".to_string(), serde_json::json!({"n": 1}));
2081        sched.enqueue_event("evt-2".to_string(), serde_json::json!({"n": 2}));
2082
2083        // Should dequeue in FIFO order
2084        let task1 = sched.tick().unwrap();
2085        let task2 = sched.tick().unwrap();
2086
2087        match task1.kind {
2088            MacrotaskKind::InboundEvent { event_id, .. } => assert_eq!(event_id, "evt-1"),
2089            other => unreachable!("Expected evt-1, got {other:?}"),
2090        }
2091        match task2.kind {
2092            MacrotaskKind::InboundEvent { event_id, .. } => assert_eq!(event_id, "evt-2"),
2093            other => unreachable!("Expected evt-2, got {other:?}"),
2094        }
2095    }
2096
2097    #[test]
2098    fn scheduler_mixed_tasks_ordering() {
2099        let clock = DeterministicClock::new(0);
2100        let mut sched = Scheduler::with_clock(clock);
2101
2102        // Set a timer
2103        let _t1 = sched.set_timeout(50);
2104
2105        // Enqueue an event (gets earlier seq)
2106        sched.enqueue_event("evt-1".to_string(), serde_json::json!({}));
2107
2108        // Advance past timer
2109        sched.clock.advance(100);
2110
2111        // Event should come first (enqueued before timer moved to queue)
2112        let task1 = sched.tick().unwrap();
2113        match task1.kind {
2114            MacrotaskKind::InboundEvent { event_id, .. } => assert_eq!(event_id, "evt-1"),
2115            other => unreachable!("Expected event first, got {other:?}"),
2116        }
2117
2118        // Then timer
2119        let task2 = sched.tick().unwrap();
2120        match task2.kind {
2121            MacrotaskKind::TimerFired { .. } => {}
2122            other => unreachable!("Expected timer second, got {other:?}"),
2123        }
2124    }
2125
2126    #[test]
2127    fn scheduler_invariant_single_macrotask_per_tick() {
2128        let clock = DeterministicClock::new(0);
2129        let mut sched = Scheduler::with_clock(clock);
2130
2131        sched.enqueue_event("evt-1".to_string(), serde_json::json!({}));
2132        sched.enqueue_event("evt-2".to_string(), serde_json::json!({}));
2133        sched.enqueue_event("evt-3".to_string(), serde_json::json!({}));
2134
2135        // Each tick returns exactly one task (I1)
2136        assert!(sched.tick().is_some());
2137        assert_eq!(sched.macrotask_count(), 2);
2138
2139        assert!(sched.tick().is_some());
2140        assert_eq!(sched.macrotask_count(), 1);
2141
2142        assert!(sched.tick().is_some());
2143        assert_eq!(sched.macrotask_count(), 0);
2144
2145        assert!(sched.tick().is_none());
2146    }
2147
2148    #[test]
2149    fn scheduler_next_timer_deadline() {
2150        let clock = DeterministicClock::new(0);
2151        let mut sched = Scheduler::with_clock(clock);
2152
2153        assert!(sched.next_timer_deadline().is_none());
2154
2155        sched.set_timeout(200);
2156        sched.set_timeout(100);
2157        sched.set_timeout(300);
2158
2159        assert_eq!(sched.next_timer_deadline(), Some(100));
2160        assert_eq!(sched.time_until_next_timer(), Some(100));
2161
2162        sched.clock.advance(50);
2163        assert_eq!(sched.time_until_next_timer(), Some(50));
2164    }
2165
2166    #[test]
2167    fn scheduler_next_timer_skips_cancelled_timers() {
2168        let clock = DeterministicClock::new(0);
2169        let mut sched = Scheduler::with_clock(clock);
2170
2171        let t1 = sched.set_timeout(100);
2172        let _t2 = sched.set_timeout(200);
2173        let _t3 = sched.set_timeout(300);
2174
2175        assert!(sched.clear_timeout(t1));
2176        assert_eq!(sched.next_timer_deadline(), Some(200));
2177        assert_eq!(sched.time_until_next_timer(), Some(200));
2178    }
2179
2180    #[test]
2181    fn scheduler_debug_format() {
2182        let clock = DeterministicClock::new(0);
2183        let sched = Scheduler::with_clock(clock);
2184        let debug = format!("{sched:?}");
2185        assert!(debug.contains("Scheduler"));
2186        assert!(debug.contains("seq"));
2187    }
2188
2189    #[test]
2190    fn scheduler_debug_reports_live_timer_count() {
2191        let mut sched = Scheduler::with_clock(DeterministicClock::new(0));
2192        let cancelled = sched.set_timeout(10);
2193        sched.set_timeout(20);
2194
2195        assert!(sched.clear_timeout(cancelled));
2196
2197        let debug = format!("{sched:?}");
2198        assert!(
2199            debug.contains("timer_count: 1"),
2200            "unexpected debug: {debug}"
2201        );
2202        assert!(
2203            debug.contains("cancelled_timers: 1"),
2204            "unexpected debug: {debug}"
2205        );
2206    }
2207
2208    #[derive(Debug, Clone)]
2209    struct XorShift64 {
2210        state: u64,
2211    }
2212
2213    impl XorShift64 {
2214        const fn new(seed: u64) -> Self {
2215            // Avoid the all-zero state so the stream doesn't get stuck.
2216            let seed = seed ^ 0x9E37_79B9_7F4A_7C15;
2217            Self { state: seed }
2218        }
2219
2220        fn next_u64(&mut self) -> u64 {
2221            let mut x = self.state;
2222            x ^= x << 13;
2223            x ^= x >> 7;
2224            x ^= x << 17;
2225            self.state = x;
2226            x
2227        }
2228
2229        fn next_range_u64(&mut self, upper_exclusive: u64) -> u64 {
2230            if upper_exclusive == 0 {
2231                return 0;
2232            }
2233            self.next_u64() % upper_exclusive
2234        }
2235
2236        fn next_usize(&mut self, upper_exclusive: usize) -> usize {
2237            let upper = u64::try_from(upper_exclusive).expect("usize fits in u64");
2238            let value = self.next_range_u64(upper);
2239            usize::try_from(value).expect("value < upper_exclusive")
2240        }
2241    }
2242
2243    fn trace_entry(task: &Macrotask) -> String {
2244        match &task.kind {
2245            MacrotaskKind::TimerFired { timer_id } => {
2246                format!("seq={}:timer:{timer_id}", task.seq.value())
2247            }
2248            MacrotaskKind::HostcallComplete { call_id, outcome } => {
2249                let outcome_tag = match outcome {
2250                    HostcallOutcome::Success(_) => "ok",
2251                    HostcallOutcome::Error { .. } => "err",
2252                    HostcallOutcome::StreamChunk { is_final, .. } => {
2253                        if *is_final {
2254                            "stream_final"
2255                        } else {
2256                            "chunk"
2257                        }
2258                    }
2259                };
2260                format!("seq={}:hostcall:{call_id}:{outcome_tag}", task.seq.value())
2261            }
2262            MacrotaskKind::InboundEvent { event_id, payload } => {
2263                format!(
2264                    "seq={}:event:{event_id}:payload={payload}",
2265                    task.seq.value()
2266                )
2267            }
2268        }
2269    }
2270
    /// Drive one scheduler through a 256-step seeded script and return the
    /// trace of delivered macrotasks.
    ///
    /// The script is a pure function of `seed`: every random choice comes
    /// from the `XorShift64` stream and time only moves via the
    /// deterministic clock, so two runs with the same seed must produce
    /// byte-identical traces.
    fn run_seeded_script(seed: u64) -> Vec<String> {
        let clock = DeterministicClock::new(0);
        let mut sched = Scheduler::with_clock(clock);
        let mut rng = XorShift64::new(seed);
        let mut timers = Vec::new();
        let mut trace = Vec::new();

        for step in 0..256u64 {
            // Pick one of six actions. Value 5 (and value 1 when no timers
            // exist) falls through to the `_` no-op arm.
            match rng.next_range_u64(6) {
                0 => {
                    // Arm a timer with a random delay; remember its id.
                    let delay_ms = rng.next_range_u64(250);
                    let timer_id = sched.set_timeout(delay_ms);
                    timers.push(timer_id);
                }
                1 if !timers.is_empty() => {
                    // Cancel a random previously-armed timer. It may already
                    // have fired or been cancelled; the result is ignored.
                    let idx = rng.next_usize(timers.len());
                    let _cancelled = sched.clear_timeout(timers[idx]);
                }
                2 => {
                    // Complete a synthetic hostcall.
                    let call_id = format!("call-{step}-{}", rng.next_u64());
                    let outcome = HostcallOutcome::Success(serde_json::json!({ "step": step }));
                    sched.enqueue_hostcall_complete(call_id, outcome);
                }
                3 => {
                    // Inject an inbound event carrying random entropy.
                    let event_id = format!("evt-{step}");
                    let payload = serde_json::json!({ "step": step, "entropy": rng.next_u64() });
                    sched.enqueue_event(event_id, payload);
                }
                4 => {
                    // Advance the deterministic clock a little.
                    let delta_ms = rng.next_range_u64(50);
                    sched.clock.advance(delta_ms);
                }
                _ => {}
            }

            // Roughly every third step, deliver one macrotask and record it.
            if rng.next_range_u64(3) == 0 {
                if let Some(task) = sched.tick() {
                    trace.push(trace_entry(&task));
                }
            }
        }

        // Drain remaining tasks and timers deterministically.
        // The 10_000-iteration cap bounds the loop if draining ever stalls.
        for _ in 0..10_000 {
            if let Some(task) = sched.tick() {
                trace.push(trace_entry(&task));
                continue;
            }

            // Queue empty: jump the clock to the next timer deadline, if any.
            let Some(next_deadline) = sched.next_timer_deadline() else {
                break;
            };

            let now = sched.now_ms();
            assert!(
                next_deadline > now,
                "expected future timer deadline (deadline={next_deadline}, now={now})"
            );
            sched.clock.set(next_deadline);
        }

        trace
    }
2334
2335    #[test]
2336    fn scheduler_seeded_trace_is_deterministic() {
2337        for seed in [0_u64, 1, 2, 3, 0xDEAD_BEEF] {
2338            let a = run_seeded_script(seed);
2339            let b = run_seeded_script(seed);
2340            assert_eq!(a, b, "trace mismatch for seed={seed}");
2341        }
2342    }
2343
2344    // ── Seq Display format ──────────────────────────────────────────
2345
2346    #[test]
2347    fn seq_display_format() {
2348        assert_eq!(format!("{}", Seq::zero()), "seq:0");
2349        assert_eq!(format!("{}", Seq::zero().next()), "seq:1");
2350    }
2351
2352    // ── has_pending / macrotask_count / timer_count ──────────────────
2353
2354    #[test]
2355    fn empty_scheduler_has_no_pending() {
2356        let sched = Scheduler::with_clock(DeterministicClock::new(0));
2357        assert!(!sched.has_pending());
2358        assert_eq!(sched.macrotask_count(), 0);
2359        assert_eq!(sched.timer_count(), 0);
2360    }
2361
2362    #[test]
2363    fn has_pending_with_timer_only() {
2364        let mut sched = Scheduler::with_clock(DeterministicClock::new(0));
2365        sched.set_timeout(100);
2366        assert!(sched.has_pending());
2367        assert_eq!(sched.macrotask_count(), 0);
2368        assert_eq!(sched.timer_count(), 1);
2369    }
2370
2371    #[test]
2372    fn has_pending_with_macrotask_only() {
2373        let mut sched = Scheduler::with_clock(DeterministicClock::new(0));
2374        sched.enqueue_event("e".to_string(), serde_json::json!({}));
2375        assert!(sched.has_pending());
2376        assert_eq!(sched.macrotask_count(), 1);
2377        assert_eq!(sched.timer_count(), 0);
2378    }
2379
2380    #[test]
2381    fn has_pending_ignores_cancelled_timers_without_macrotasks() {
2382        let mut sched = Scheduler::with_clock(DeterministicClock::new(0));
2383        let timer = sched.set_timeout(10_000);
2384        assert!(sched.clear_timeout(timer));
2385        assert!(!sched.has_pending());
2386        assert_eq!(sched.timer_count(), 0);
2387    }
2388
2389    #[test]
2390    fn timer_count_ignores_cancelled_timers_before_they_are_reaped() {
2391        let mut sched = Scheduler::with_clock(DeterministicClock::new(0));
2392        let live = sched.set_timeout(50);
2393        let cancelled = sched.set_timeout(100);
2394
2395        assert!(sched.clear_timeout(cancelled));
2396        assert_eq!(sched.timer_count(), 1);
2397        assert_eq!(sched.next_timer_deadline(), Some(50));
2398
2399        sched.clock.advance(60);
2400        let task = sched.tick().expect("live timer should fire");
2401        match task.kind {
2402            MacrotaskKind::TimerFired { timer_id } => assert_eq!(timer_id, live),
2403            other => unreachable!("Expected live timer, got {other:?}"),
2404        }
2405        assert_eq!(sched.timer_count(), 0);
2406    }
2407
2408    // ── WallClock ────────────────────────────────────────────────────
2409
2410    #[test]
2411    fn wall_clock_returns_positive_ms() {
2412        let clock = WallClock;
2413        let now = clock.now_ms();
2414        assert!(now > 0, "WallClock should return a positive timestamp");
2415    }
2416
2417    // ── clear_timeout edge cases ─────────────────────────────────────
2418
2419    #[test]
2420    fn clear_timeout_nonexistent_returns_false() {
2421        let mut sched = Scheduler::with_clock(DeterministicClock::new(0));
2422        assert!(!sched.clear_timeout(999));
2423        assert!(
2424            sched.cancelled_timers.is_empty(),
2425            "unknown timer ids should not pollute cancelled set"
2426        );
2427    }
2428
2429    #[test]
2430    fn clear_timeout_double_cancel_returns_false() {
2431        let mut sched = Scheduler::with_clock(DeterministicClock::new(0));
2432        let t = sched.set_timeout(100);
2433        assert!(sched.clear_timeout(t));
2434        // Second cancel - already in set
2435        assert!(!sched.clear_timeout(t));
2436    }
2437
2438    // ── time_until_next_timer ────────────────────────────────────────
2439
2440    #[test]
2441    fn time_until_next_timer_none_when_empty() {
2442        let sched = Scheduler::with_clock(DeterministicClock::new(0));
2443        assert!(sched.time_until_next_timer().is_none());
2444    }
2445
2446    #[test]
2447    fn time_until_next_timer_saturates_at_zero() {
2448        let mut sched = Scheduler::with_clock(DeterministicClock::new(0));
2449        sched.set_timeout(50);
2450        sched.clock.advance(100); // Past the deadline
2451        assert_eq!(sched.time_until_next_timer(), Some(0));
2452    }
2453
2454    // ── HostcallOutcome::Error path ──────────────────────────────────
2455
2456    #[test]
2457    fn hostcall_error_outcome() {
2458        let mut sched = Scheduler::with_clock(DeterministicClock::new(0));
2459        sched.enqueue_hostcall_complete(
2460            "err-call".to_string(),
2461            HostcallOutcome::Error {
2462                code: "E_TIMEOUT".to_string(),
2463                message: "Request timed out".to_string(),
2464            },
2465        );
2466
2467        let task = sched.tick().unwrap();
2468        match task.kind {
2469            MacrotaskKind::HostcallComplete { call_id, outcome } => {
2470                assert_eq!(call_id, "err-call");
2471                match outcome {
2472                    HostcallOutcome::Error { code, message } => {
2473                        assert_eq!(code, "E_TIMEOUT");
2474                        assert_eq!(message, "Request timed out");
2475                    }
2476                    other => unreachable!("Expected error, got {other:?}"),
2477                }
2478            }
2479            other => unreachable!("Expected HostcallComplete, got {other:?}"),
2480        }
2481    }
2482
2483    // ── timer_count decreases after tick ─────────────────────────────
2484
2485    #[test]
2486    fn timer_count_decreases_after_fire() {
2487        let mut sched = Scheduler::with_clock(DeterministicClock::new(0));
2488        sched.set_timeout(50);
2489        sched.set_timeout(100);
2490        assert_eq!(sched.timer_count(), 2);
2491
2492        sched.clock.advance(75);
2493        let _task = sched.tick(); // Fires first timer
2494        assert_eq!(sched.timer_count(), 1);
2495    }
2496
2497    // ── empty tick returns None ──────────────────────────────────────
2498
2499    #[test]
2500    fn empty_scheduler_tick_returns_none() {
2501        let mut sched = Scheduler::with_clock(DeterministicClock::new(0));
2502        assert!(sched.tick().is_none());
2503    }
2504
2505    // ── default constructor ──────────────────────────────────────────
2506
2507    #[test]
2508    fn default_scheduler_starts_with_seq_zero() {
2509        let sched = Scheduler::new();
2510        assert_eq!(sched.current_seq(), Seq::zero());
2511    }
2512
2513    // ── Arc<Clock> impl ──────────────────────────────────────────────
2514
2515    #[test]
2516    fn arc_clock_delegation() {
2517        let clock = Arc::new(DeterministicClock::new(42));
2518        assert_eq!(Clock::now_ms(&clock), 42);
2519        clock.advance(10);
2520        assert_eq!(Clock::now_ms(&clock), 52);
2521    }
2522
2523    // ── TimerEntry equality ──────────────────────────────────────────
2524
2525    #[test]
2526    fn timer_entry_equality_ignores_timer_id() {
2527        let a = TimerEntry::new(1, 100, Seq(5));
2528        let b = TimerEntry::new(2, 100, Seq(5));
2529        // PartialEq compares (deadline_ms, seq), not timer_id
2530        assert_eq!(a, b);
2531    }
2532
2533    // ── Macrotask PartialEq uses seq only ────────────────────────────
2534
2535    #[test]
2536    fn macrotask_equality_uses_seq_only() {
2537        let a = Macrotask::new(Seq(1), MacrotaskKind::TimerFired { timer_id: 1 });
2538        let b = Macrotask::new(Seq(1), MacrotaskKind::TimerFired { timer_id: 2 });
2539        assert_eq!(a, b); // Same seq → equal
2540    }
2541
2542    // ── bd-2tl1.5: Streaming concurrency + determinism ──────────────
2543
2544    #[test]
2545    fn scheduler_ten_concurrent_streams_complete_independently() {
2546        let clock = DeterministicClock::new(0);
2547        let mut sched = Scheduler::with_clock(clock);
2548        let n_streams: usize = 10;
2549        let chunks_per_stream: usize = 5;
2550
2551        // Enqueue N streams with M chunks each, interleaved round-robin.
2552        for chunk_idx in 0..chunks_per_stream {
2553            for stream_idx in 0..n_streams {
2554                let is_final = chunk_idx == chunks_per_stream - 1;
2555                sched.enqueue_stream_chunk(
2556                    format!("stream-{stream_idx}"),
2557                    chunk_idx as u64,
2558                    serde_json::json!({ "s": stream_idx, "c": chunk_idx }),
2559                    is_final,
2560                );
2561            }
2562        }
2563
2564        let mut per_stream: std::collections::HashMap<String, Vec<(u64, bool)>> =
2565            std::collections::HashMap::new();
2566        while let Some(task) = sched.tick() {
2567            let MacrotaskKind::HostcallComplete { call_id, outcome } = task.kind else {
2568                unreachable!("expected hostcall completion");
2569            };
2570            let HostcallOutcome::StreamChunk {
2571                sequence, is_final, ..
2572            } = outcome
2573            else {
2574                unreachable!("expected stream chunk");
2575            };
2576            per_stream
2577                .entry(call_id)
2578                .or_default()
2579                .push((sequence, is_final));
2580        }
2581
2582        assert_eq!(per_stream.len(), n_streams);
2583        for (call_id, chunks) in &per_stream {
2584            assert_eq!(
2585                chunks.len(),
2586                chunks_per_stream,
2587                "stream {call_id} incomplete"
2588            );
2589            // Sequences are monotonically increasing per stream.
2590            for (i, (seq, _)) in chunks.iter().enumerate() {
2591                assert_eq!(*seq, i as u64, "stream {call_id}: non-monotonic at {i}");
2592            }
2593            // Exactly one final chunk (the last).
2594            let final_count = chunks.iter().filter(|(_, f)| *f).count();
2595            assert_eq!(
2596                final_count, 1,
2597                "stream {call_id}: expected exactly one final"
2598            );
2599            assert!(
2600                chunks.last().unwrap().1,
2601                "stream {call_id}: final must be last"
2602            );
2603        }
2604    }
2605
2606    #[test]
2607    fn scheduler_mixed_stream_nonstream_ordering() {
2608        let clock = DeterministicClock::new(0);
2609        let mut sched = Scheduler::with_clock(clock);
2610
2611        // Enqueue: event, stream chunk, success, stream final, event.
2612        sched.enqueue_event("evt-1".to_string(), serde_json::json!({"n": 1}));
2613        sched.enqueue_stream_chunk("stream-x".to_string(), 0, serde_json::json!("data"), false);
2614        sched.enqueue_hostcall_complete(
2615            "call-y".to_string(),
2616            HostcallOutcome::Success(serde_json::json!({"ok": true})),
2617        );
2618        sched.enqueue_stream_chunk("stream-x".to_string(), 1, serde_json::json!("end"), true);
2619        sched.enqueue_event("evt-2".to_string(), serde_json::json!({"n": 2}));
2620
2621        let mut trace = Vec::new();
2622        while let Some(task) = sched.tick() {
2623            trace.push(trace_entry(&task));
2624        }
2625
2626        // FIFO ordering: all 5 items in enqueue order.
2627        assert_eq!(trace.len(), 5);
2628        assert!(trace[0].contains("event:evt-1"));
2629        assert!(trace[1].contains("stream-x") && trace[1].contains("chunk"));
2630        assert!(trace[2].contains("call-y") && trace[2].contains("ok"));
2631        assert!(trace[3].contains("stream-x") && trace[3].contains("stream_final"));
2632        assert!(trace[4].contains("event:evt-2"));
2633    }
2634
2635    #[test]
2636    fn scheduler_concurrent_streams_deterministic_across_runs() {
2637        fn run_ten_streams() -> Vec<String> {
2638            let clock = DeterministicClock::new(0);
2639            let mut sched = Scheduler::with_clock(clock);
2640
2641            for chunk in 0..3_u64 {
2642                for stream in 0..10 {
2643                    sched.enqueue_stream_chunk(
2644                        format!("s{stream}"),
2645                        chunk,
2646                        serde_json::json!(chunk),
2647                        chunk == 2,
2648                    );
2649                }
2650            }
2651
2652            let mut trace = Vec::new();
2653            while let Some(task) = sched.tick() {
2654                trace.push(trace_entry(&task));
2655            }
2656            trace
2657        }
2658
2659        let a = run_ten_streams();
2660        let b = run_ten_streams();
2661        assert_eq!(a, b, "10-stream trace must be deterministic");
2662        assert_eq!(a.len(), 30, "expected 10 streams x 3 chunks = 30 entries");
2663    }
2664
2665    #[test]
2666    fn scheduler_stream_interleaved_with_timers() {
2667        let clock = DeterministicClock::new(0);
2668        let mut sched = Scheduler::with_clock(clock);
2669
2670        // Set a timer for 100ms.
2671        let _t = sched.set_timeout(100);
2672
2673        // Enqueue first stream chunk.
2674        sched.enqueue_stream_chunk("s1".to_string(), 0, serde_json::json!("a"), false);
2675
2676        // Advance clock past timer deadline.
2677        sched.clock.advance(150);
2678
2679        // Enqueue final stream chunk after timer.
2680        sched.enqueue_stream_chunk("s1".to_string(), 1, serde_json::json!("b"), true); // seq=3
2681
2682        let mut trace = Vec::new();
2683        while let Some(task) = sched.tick() {
2684            trace.push(trace_entry(&task));
2685        }
2686
2687        // Pending macrotasks run first; due timers are enqueued after existing work.
2688        assert_eq!(trace.len(), 3);
2689        assert!(
2690            trace[0].contains("s1") && trace[0].contains("chunk"),
2691            "first: stream chunk 0, got: {}",
2692            trace[0]
2693        );
2694        assert!(
2695            trace[1].contains("s1") && trace[1].contains("stream_final"),
2696            "second: stream final, got: {}",
2697            trace[1]
2698        );
2699        assert!(
2700            trace[2].contains("timer"),
2701            "third: timer, got: {}",
2702            trace[2]
2703        );
2704    }
2705
2706    #[test]
2707    fn scheduler_due_timers_do_not_preempt_queued_macrotasks() {
2708        let clock = DeterministicClock::new(0);
2709        let mut sched = Scheduler::with_clock(clock);
2710
2711        // 1. Set a timer T1. Deadline = 100ms.
2712        let t1_id = sched.set_timeout(100);
2713
2714        // 2. Enqueue an event E1 before timer delivery.
2715        sched.enqueue_event("E1".to_string(), serde_json::json!({}));
2716
2717        // 3. Advance time so T1 is due.
2718        sched.clock.advance(100);
2719
2720        // 4. Tick 1: queued event executes first.
2721        let task1 = sched.tick().expect("Should have a task");
2722
2723        // 5. Tick 2: timer executes next.
2724        let task2 = sched.tick().expect("Should have a task");
2725
2726        let seq1 = task1.seq.value();
2727        let seq2 = task2.seq.value();
2728
2729        // Global macrotask seq is monotone for externally observed execution.
2730        assert!(
2731            seq1 < seq2,
2732            "Invariant I5 violation: Task execution not ordered by seq. Executed {seq1} then {seq2}"
2733        );
2734
2735        if let MacrotaskKind::InboundEvent { event_id, .. } = task1.kind {
2736            assert_eq!(event_id, "E1");
2737        } else {
2738            unreachable!();
2739        }
2740
2741        if let MacrotaskKind::TimerFired { timer_id } = task2.kind {
2742            assert_eq!(timer_id, t1_id);
2743        } else {
2744            unreachable!();
2745        }
2746    }
2747
2748    #[test]
2749    fn reactor_mesh_hash_routing_is_stable_for_call_id() {
2750        let mut mesh = ReactorMesh::new(ReactorMeshConfig {
2751            shard_count: 8,
2752            lane_capacity: 64,
2753            topology: None,
2754        });
2755
2756        let first = mesh
2757            .enqueue_hostcall_complete(
2758                "call-affinity".to_string(),
2759                HostcallOutcome::Success(serde_json::json!({})),
2760            )
2761            .expect("first enqueue");
2762        let second = mesh
2763            .enqueue_hostcall_complete(
2764                "call-affinity".to_string(),
2765                HostcallOutcome::Success(serde_json::json!({})),
2766            )
2767            .expect("second enqueue");
2768
2769        assert_eq!(
2770            first.shard_id, second.shard_id,
2771            "call_id hash routing must preserve shard affinity"
2772        );
2773        assert_eq!(first.shard_seq + 1, second.shard_seq);
2774    }
2775
2776    #[test]
2777    fn reactor_mesh_round_robin_event_distribution_is_deterministic() {
2778        let mut mesh = ReactorMesh::new(ReactorMeshConfig {
2779            shard_count: 3,
2780            lane_capacity: 64,
2781            topology: None,
2782        });
2783
2784        let mut routed = Vec::new();
2785        for idx in 0..6 {
2786            let envelope = mesh
2787                .enqueue_event(format!("evt-{idx}"), serde_json::json!({"i": idx}))
2788                .expect("enqueue event");
2789            routed.push(envelope.shard_id);
2790        }
2791
2792        assert_eq!(routed, vec![0, 1, 2, 0, 1, 2]);
2793    }
2794
2795    #[test]
2796    fn reactor_mesh_drain_global_order_preserves_monotone_seq() {
2797        let mut mesh = ReactorMesh::new(ReactorMeshConfig {
2798            shard_count: 4,
2799            lane_capacity: 64,
2800            topology: None,
2801        });
2802
2803        let mut expected = Vec::new();
2804        expected.push(
2805            mesh.enqueue_event("evt-1".to_string(), serde_json::json!({"v": 1}))
2806                .expect("event 1")
2807                .global_seq
2808                .value(),
2809        );
2810        expected.push(
2811            mesh.enqueue_hostcall_complete(
2812                "call-a".to_string(),
2813                HostcallOutcome::Success(serde_json::json!({"ok": true})),
2814            )
2815            .expect("call-a")
2816            .global_seq
2817            .value(),
2818        );
2819        expected.push(
2820            mesh.enqueue_event("evt-2".to_string(), serde_json::json!({"v": 2}))
2821                .expect("event 2")
2822                .global_seq
2823                .value(),
2824        );
2825        expected.push(
2826            mesh.enqueue_hostcall_complete(
2827                "call-b".to_string(),
2828                HostcallOutcome::Error {
2829                    code: "E_TEST".to_string(),
2830                    message: "boom".to_string(),
2831                },
2832            )
2833            .expect("call-b")
2834            .global_seq
2835            .value(),
2836        );
2837
2838        let drained = mesh.drain_global_order(16);
2839        let observed = drained
2840            .iter()
2841            .map(|entry| entry.global_seq.value())
2842            .collect::<Vec<_>>();
2843        assert_eq!(observed, expected);
2844    }
2845
2846    #[test]
2847    fn reactor_mesh_backpressure_tracks_rejected_enqueues() {
2848        let mut mesh = ReactorMesh::new(ReactorMeshConfig {
2849            shard_count: 1,
2850            lane_capacity: 2,
2851            topology: None,
2852        });
2853
2854        mesh.enqueue_event("evt-0".to_string(), serde_json::json!({}))
2855            .expect("enqueue evt-0");
2856        mesh.enqueue_event("evt-1".to_string(), serde_json::json!({}))
2857            .expect("enqueue evt-1");
2858
2859        let err = mesh
2860            .enqueue_event("evt-overflow".to_string(), serde_json::json!({}))
2861            .expect_err("third enqueue should overflow");
2862        assert_eq!(err.shard_id, 0);
2863        assert_eq!(err.capacity, 2);
2864        assert_eq!(err.depth, 2);
2865
2866        let telemetry = mesh.telemetry();
2867        assert_eq!(telemetry.rejected_enqueues, 1);
2868        assert_eq!(telemetry.max_queue_depths, vec![2]);
2869        assert_eq!(telemetry.queue_depths, vec![2]);
2870    }
2871
2872    #[test]
2873    fn reactor_placement_manifest_is_deterministic_across_runs() {
2874        let topology =
2875            ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0), (1, 0), (2, 1), (3, 1)]);
2876        let first = ReactorPlacementManifest::plan(8, Some(&topology));
2877        let second = ReactorPlacementManifest::plan(8, Some(&topology));
2878        assert_eq!(first, second);
2879        assert_eq!(first.fallback_reason, None);
2880    }
2881
2882    #[test]
2883    fn reactor_topology_snapshot_normalizes_unsorted_duplicate_pairs() {
2884        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[
2885            (7, 5),
2886            (2, 1),
2887            (4, 2),
2888            (2, 1),
2889            (1, 1),
2890            (4, 2),
2891        ]);
2892        let normalized = topology
2893            .cores
2894            .iter()
2895            .map(|core| (core.core_id, core.numa_node))
2896            .collect::<Vec<_>>();
2897        assert_eq!(normalized, vec![(1, 1), (2, 1), (4, 2), (7, 5)]);
2898    }
2899
2900    #[test]
2901    fn reactor_placement_manifest_non_contiguous_numa_ids_is_stable() {
2902        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[
2903            (11, 42),
2904            (7, 5),
2905            (9, 42),
2906            (3, 5),
2907            (11, 42),
2908            (3, 5),
2909        ]);
2910        let first = ReactorPlacementManifest::plan(8, Some(&topology));
2911        let second = ReactorPlacementManifest::plan(8, Some(&topology));
2912        assert_eq!(first, second);
2913        assert_eq!(first.numa_node_count, 2);
2914        assert_eq!(first.fallback_reason, None);
2915
2916        let observed_nodes = first
2917            .bindings
2918            .iter()
2919            .map(|binding| binding.numa_node)
2920            .collect::<Vec<_>>();
2921        assert_eq!(observed_nodes, vec![5, 42, 5, 42, 5, 42, 5, 42]);
2922
2923        let observed_cores = first
2924            .bindings
2925            .iter()
2926            .map(|binding| binding.core_id)
2927            .collect::<Vec<_>>();
2928        assert_eq!(observed_cores, vec![3, 9, 7, 11, 3, 9, 7, 11]);
2929    }
2930
2931    #[test]
2932    fn reactor_placement_manifest_spreads_across_numa_nodes_round_robin() {
2933        let topology =
2934            ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0), (1, 0), (4, 1), (5, 1)]);
2935        let manifest = ReactorPlacementManifest::plan(6, Some(&topology));
2936        let observed_nodes = manifest
2937            .bindings
2938            .iter()
2939            .map(|binding| binding.numa_node)
2940            .collect::<Vec<_>>();
2941        assert_eq!(observed_nodes, vec![0, 1, 0, 1, 0, 1]);
2942
2943        let observed_cores = manifest
2944            .bindings
2945            .iter()
2946            .map(|binding| binding.core_id)
2947            .collect::<Vec<_>>();
2948        assert_eq!(observed_cores, vec![0, 4, 1, 5, 0, 4]);
2949    }
2950
2951    #[test]
2952    fn reactor_placement_manifest_records_fallback_when_topology_missing() {
2953        let manifest = ReactorPlacementManifest::plan(3, None);
2954        assert_eq!(
2955            manifest.fallback_reason,
2956            Some(ReactorPlacementFallbackReason::TopologyUnavailable)
2957        );
2958        assert_eq!(manifest.numa_node_count, 1);
2959        assert_eq!(manifest.bindings.len(), 3);
2960        assert_eq!(manifest.bindings[0].core_id, 0);
2961        assert_eq!(manifest.bindings[2].core_id, 2);
2962    }
2963
2964    #[test]
2965    fn reactor_mesh_exposes_machine_readable_placement_manifest() {
2966        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(2, 0), (3, 0)]);
2967        let mesh = ReactorMesh::new(ReactorMeshConfig {
2968            shard_count: 3,
2969            lane_capacity: 8,
2970            topology: Some(topology),
2971        });
2972        let manifest = mesh.placement_manifest();
2973        let as_json = manifest.as_json();
2974        assert_eq!(as_json["shard_count"], serde_json::json!(3));
2975        assert_eq!(as_json["numa_node_count"], serde_json::json!(1));
2976        assert_eq!(
2977            as_json["fallback_reason"],
2978            serde_json::json!(Some("single_numa_node"))
2979        );
2980        assert_eq!(
2981            as_json["bindings"].as_array().map(std::vec::Vec::len),
2982            Some(3),
2983            "expected per-shard binding rows"
2984        );
2985    }
2986
2987    #[test]
2988    fn reactor_mesh_telemetry_includes_binding_and_fallback_metadata() {
2989        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(10, 0), (11, 0)]);
2990        let mut mesh = ReactorMesh::new(ReactorMeshConfig {
2991            shard_count: 2,
2992            lane_capacity: 4,
2993            topology: Some(topology),
2994        });
2995        mesh.enqueue_event("evt-0".to_string(), serde_json::json!({}))
2996            .expect("enqueue event");
2997
2998        let telemetry = mesh.telemetry();
2999        assert_eq!(
3000            telemetry.fallback_reason,
3001            Some(ReactorPlacementFallbackReason::SingleNumaNode)
3002        );
3003        assert_eq!(telemetry.shard_bindings.len(), 2);
3004        let telemetry_json = telemetry.as_json();
3005        assert_eq!(
3006            telemetry_json["fallback_reason"],
3007            serde_json::json!(Some("single_numa_node"))
3008        );
3009        assert_eq!(
3010            telemetry_json["shard_bindings"]
3011                .as_array()
3012                .map(std::vec::Vec::len),
3013            Some(2)
3014        );
3015    }
3016
3017    // ====================================================================
3018    // NUMA slab allocator tests
3019    // ====================================================================
3020
3021    #[test]
3022    fn numa_slab_alloc_dealloc_round_trip() {
3023        let mut slab = NumaSlab::new(0, 4);
3024        let handle = slab.allocate().expect("should allocate");
3025        assert_eq!(handle.node_id, 0);
3026        assert_eq!(handle.generation, 1);
3027        assert!(slab.deallocate(&handle));
3028        assert_eq!(slab.in_use(), 0);
3029    }
3030
3031    #[test]
3032    fn numa_slab_exhaustion_returns_none() {
3033        let mut slab = NumaSlab::new(0, 2);
3034        let _a = slab.allocate().expect("first alloc");
3035        let _b = slab.allocate().expect("second alloc");
3036        assert!(slab.allocate().is_none(), "slab should be exhausted");
3037    }
3038
3039    #[test]
3040    fn numa_slab_generation_prevents_stale_dealloc() {
3041        let mut slab = NumaSlab::new(0, 2);
3042        let handle_v1 = slab.allocate().expect("first alloc");
3043        assert!(slab.deallocate(&handle_v1));
3044        let _handle_v2 = slab.allocate().expect("reuse slot");
3045        // The old handle has generation 1, the new allocation has generation 2.
3046        assert!(
3047            !slab.deallocate(&handle_v1),
3048            "stale generation should reject dealloc"
3049        );
3050    }
3051
3052    #[test]
3053    fn numa_slab_double_free_is_rejected() {
3054        let mut slab = NumaSlab::new(0, 4);
3055        let handle = slab.allocate().expect("alloc");
3056        assert!(slab.deallocate(&handle));
3057        assert!(!slab.deallocate(&handle), "double free must be rejected");
3058    }
3059
3060    #[test]
3061    fn numa_slab_wrong_node_dealloc_rejected() {
3062        let mut slab = NumaSlab::new(0, 4);
3063        let handle = slab.allocate().expect("alloc");
3064        let wrong_handle = NumaSlabHandle {
3065            node_id: 99,
3066            ..handle
3067        };
3068        assert!(
3069            !slab.deallocate(&wrong_handle),
3070            "wrong node_id should reject dealloc"
3071        );
3072    }
3073
3074    #[test]
3075    fn numa_slab_high_water_mark_tracks_peak() {
3076        let mut slab = NumaSlab::new(0, 8);
3077        let a = slab.allocate().expect("a");
3078        let b = slab.allocate().expect("b");
3079        let c = slab.allocate().expect("c");
3080        assert_eq!(slab.high_water_mark, 3);
3081        slab.deallocate(&a);
3082        slab.deallocate(&b);
3083        assert_eq!(
3084            slab.high_water_mark, 3,
3085            "high water mark should not decrease"
3086        );
3087        slab.deallocate(&c);
3088        let _d = slab.allocate().expect("d");
3089        assert_eq!(slab.high_water_mark, 3);
3090    }
3091
3092    #[test]
3093    fn numa_slab_pool_routes_to_local_node() {
3094        let topology =
3095            ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0), (1, 0), (2, 1), (3, 1)]);
3096        let manifest = ReactorPlacementManifest::plan(4, Some(&topology));
3097        let config = NumaSlabConfig {
3098            slab_capacity: 8,
3099            entry_size_bytes: 256,
3100            hugepage: HugepageConfig {
3101                enabled: false,
3102                ..HugepageConfig::default()
3103            },
3104        };
3105        let mut pool = NumaSlabPool::from_manifest(&manifest, config);
3106        assert_eq!(pool.node_count(), 2);
3107
3108        let (handle, reason) = pool.allocate(1).expect("allocate on node 1");
3109        assert_eq!(handle.node_id, 1);
3110        assert!(reason.is_none(), "should be local allocation");
3111    }
3112
    #[test]
    fn numa_slab_pool_cross_node_fallback_tracks_telemetry() {
        // Two nodes with one slot each: exhausting node 0 forces the second
        // node-0 request onto node 1, and that fallback must show up in both
        // the typed telemetry and its JSON projection.
        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0), (2, 1)]);
        let manifest = ReactorPlacementManifest::plan(2, Some(&topology));
        let config = NumaSlabConfig {
            slab_capacity: 1, // Only 1 slot per node
            entry_size_bytes: 64,
            hugepage: HugepageConfig {
                enabled: false,
                ..HugepageConfig::default()
            },
        };
        let mut pool = NumaSlabPool::from_manifest(&manifest, config);

        // Fill node 0's single slot.
        let (h0, _) = pool.allocate(0).expect("fill node 0");
        assert_eq!(h0.node_id, 0);

        // Next alloc for node 0 should fall back to node 1.
        let (h1, reason) = pool.allocate(0).expect("fallback to node 1");
        assert_eq!(h1.node_id, 1);
        assert_eq!(reason, Some(CrossNodeReason::LocalExhausted));

        let telemetry = pool.telemetry();
        assert_eq!(telemetry.cross_node_allocs, 1);
        let json = telemetry.as_json();
        // Headline counters: one local and one remote allocation, none of
        // them hugepage-backed (hugepages are disabled in this config).
        assert_eq!(json["total_allocs"], serde_json::json!(2));
        assert_eq!(json["hugepage_backed_allocs"], serde_json::json!(0));
        assert_eq!(json["local_allocs"], serde_json::json!(1));
        assert_eq!(json["remote_allocs"], serde_json::json!(1));
        // Local/remote split is reported in basis points: 1 of 2 => 5000/10000.
        assert_eq!(
            json["allocation_ratio_bps"]["local"],
            serde_json::json!(5000)
        );
        assert_eq!(
            json["allocation_ratio_bps"]["remote"],
            serde_json::json!(5000)
        );
        assert_eq!(
            json["allocation_ratio_bps"]["scale"],
            serde_json::json!(10_000)
        );
        assert_eq!(json["hugepage_hit_rate_bps"]["value"], serde_json::json!(0));
        // Derived latency proxies (bps) and their band classification — the
        // exact values are pinned here; see the telemetry implementation for
        // the formulas that produce them.
        assert_eq!(
            json["latency_proxies_bps"]["tlb_miss_pressure"],
            serde_json::json!(5000)
        );
        assert_eq!(
            json["latency_proxies_bps"]["cache_miss_pressure"],
            serde_json::json!(10_000)
        );
        assert_eq!(
            json["latency_proxies_bps"]["occupancy_pressure"],
            serde_json::json!(10_000)
        );
        assert_eq!(
            json["pressure_bands"]["tlb_miss"],
            serde_json::json!("medium")
        );
        assert_eq!(
            json["pressure_bands"]["cache_miss"],
            serde_json::json!("high")
        );
        assert_eq!(
            json["pressure_bands"]["occupancy"],
            serde_json::json!("high")
        );
        // Fallback reasons: cross-node from the exhausted local slab, and the
        // hugepage reason reflecting `enabled: false` above.
        assert_eq!(
            json["fallback_reasons"]["cross_node"],
            serde_json::json!("local_exhausted")
        );
        assert_eq!(
            json["fallback_reasons"]["hugepage"],
            serde_json::json!("hugepage_disabled")
        );
    }
3189
3190    #[test]
3191    fn numa_slab_pool_total_exhaustion_returns_none() {
3192        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0)]);
3193        let manifest = ReactorPlacementManifest::plan(1, Some(&topology));
3194        let config = NumaSlabConfig {
3195            slab_capacity: 1,
3196            entry_size_bytes: 64,
3197            hugepage: HugepageConfig {
3198                enabled: false,
3199                ..HugepageConfig::default()
3200            },
3201        };
3202        let mut pool = NumaSlabPool::from_manifest(&manifest, config);
3203        let _ = pool.allocate(0).expect("fill the only slot");
3204        assert!(pool.allocate(0).is_none(), "pool should be exhausted");
3205    }
3206
3207    #[test]
3208    fn numa_slab_pool_deallocate_round_trip() {
3209        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0), (1, 1)]);
3210        let manifest = ReactorPlacementManifest::plan(2, Some(&topology));
3211        let config = NumaSlabConfig::default();
3212        let mut pool = NumaSlabPool::from_manifest(&manifest, config);
3213
3214        let (handle, _) = pool.allocate(1).expect("alloc");
3215        assert!(pool.deallocate(&handle));
3216        assert!(!pool.deallocate(&handle), "double free must be rejected");
3217    }
3218
3219    #[test]
3220    fn hugepage_status_disabled_reports_fallback() {
3221        let config = HugepageConfig {
3222            enabled: false,
3223            ..HugepageConfig::default()
3224        };
3225        let status = HugepageStatus::evaluate(&config, 1024, 512);
3226        assert!(!status.active);
3227        assert_eq!(
3228            status.fallback_reason,
3229            Some(HugepageFallbackReason::Disabled)
3230        );
3231    }
3232
3233    #[test]
3234    fn hugepage_status_zero_totals_means_unavailable() {
3235        let config = HugepageConfig::default();
3236        let status = HugepageStatus::evaluate(&config, 0, 0);
3237        assert!(!status.active);
3238        assert_eq!(
3239            status.fallback_reason,
3240            Some(HugepageFallbackReason::DetectionUnavailable)
3241        );
3242    }
3243
3244    #[test]
3245    fn hugepage_status_zero_free_means_insufficient() {
3246        let config = HugepageConfig::default();
3247        let status = HugepageStatus::evaluate(&config, 1024, 0);
3248        assert!(!status.active);
3249        assert_eq!(
3250            status.fallback_reason,
3251            Some(HugepageFallbackReason::InsufficientHugepages)
3252        );
3253    }
3254
3255    #[test]
3256    fn hugepage_status_available_is_active() {
3257        let config = HugepageConfig::default();
3258        let status = HugepageStatus::evaluate(&config, 1024, 512);
3259        assert!(status.active);
3260        assert!(status.fallback_reason.is_none());
3261        assert_eq!(status.free_pages, 512);
3262    }
3263
3264    #[test]
3265    fn numa_slab_pool_tracks_hugepage_hit_rate_when_active() {
3266        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0)]);
3267        let manifest = ReactorPlacementManifest::plan(1, Some(&topology));
3268        let config = NumaSlabConfig {
3269            slab_capacity: 4,
3270            entry_size_bytes: 1024,
3271            hugepage: HugepageConfig {
3272                page_size_bytes: 4096,
3273                enabled: true,
3274            },
3275        };
3276        let mut pool = NumaSlabPool::from_manifest(&manifest, config);
3277        pool.set_hugepage_status(HugepageStatus {
3278            total_pages: 128,
3279            free_pages: 64,
3280            page_size_bytes: 4096,
3281            active: true,
3282            fallback_reason: None,
3283        });
3284
3285        let _ = pool.allocate(0).expect("first hugepage-backed alloc");
3286        let _ = pool.allocate(0).expect("second hugepage-backed alloc");
3287
3288        let telemetry = pool.telemetry();
3289        let json = telemetry.as_json();
3290        assert_eq!(json["total_allocs"], serde_json::json!(2));
3291        assert_eq!(json["hugepage_backed_allocs"], serde_json::json!(2));
3292        assert_eq!(
3293            json["hugepage_hit_rate_bps"]["value"],
3294            serde_json::json!(10_000)
3295        );
3296        assert_eq!(
3297            json["hugepage_hit_rate_bps"]["scale"],
3298            serde_json::json!(10_000)
3299        );
3300        assert_eq!(json["hugepage"]["active"], serde_json::json!(true));
3301    }
3302
3303    #[test]
3304    fn numa_slab_pool_misaligned_hugepage_config_reports_alignment_mismatch() {
3305        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0)]);
3306        let manifest = ReactorPlacementManifest::plan(1, Some(&topology));
3307        let config = NumaSlabConfig {
3308            slab_capacity: 3,
3309            entry_size_bytes: 1024,
3310            hugepage: HugepageConfig {
3311                page_size_bytes: 2048,
3312                enabled: true,
3313            },
3314        };
3315
3316        let pool = NumaSlabPool::from_manifest(&manifest, config);
3317        let telemetry = pool.telemetry();
3318        assert!(!telemetry.hugepage_status.active);
3319        assert_eq!(
3320            telemetry.hugepage_status.fallback_reason,
3321            Some(HugepageFallbackReason::AlignmentMismatch)
3322        );
3323
3324        let json = telemetry.as_json();
3325        assert_eq!(
3326            json["hugepage"]["fallback_reason"],
3327            serde_json::json!("alignment_mismatch")
3328        );
3329        assert_eq!(
3330            json["fallback_reasons"]["hugepage"],
3331            serde_json::json!("alignment_mismatch")
3332        );
3333    }
3334
3335    #[test]
3336    fn numa_slab_pool_aligned_hugepage_config_defaults_to_detection_unavailable() {
3337        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0)]);
3338        let manifest = ReactorPlacementManifest::plan(1, Some(&topology));
3339        let config = NumaSlabConfig {
3340            slab_capacity: 4,
3341            entry_size_bytes: 1024,
3342            hugepage: HugepageConfig {
3343                page_size_bytes: 4096,
3344                enabled: true,
3345            },
3346        };
3347
3348        let pool = NumaSlabPool::from_manifest(&manifest, config);
3349        let telemetry = pool.telemetry();
3350        assert!(!telemetry.hugepage_status.active);
3351        assert_eq!(
3352            telemetry.hugepage_status.fallback_reason,
3353            Some(HugepageFallbackReason::DetectionUnavailable)
3354        );
3355    }
3356
3357    #[test]
3358    fn misaligned_hugepage_config_rejects_external_status_override() {
3359        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0)]);
3360        let manifest = ReactorPlacementManifest::plan(1, Some(&topology));
3361        let config = NumaSlabConfig {
3362            slab_capacity: 3,
3363            entry_size_bytes: 1024,
3364            hugepage: HugepageConfig {
3365                page_size_bytes: 2048,
3366                enabled: true,
3367            },
3368        };
3369        let mut pool = NumaSlabPool::from_manifest(&manifest, config);
3370
3371        let forced = HugepageStatus::evaluate(&config.hugepage, 256, 64);
3372        assert!(forced.active);
3373        assert!(forced.fallback_reason.is_none());
3374
3375        pool.set_hugepage_status(forced);
3376        let telemetry = pool.telemetry();
3377        assert!(!telemetry.hugepage_status.active);
3378        assert_eq!(
3379            telemetry.hugepage_status.fallback_reason,
3380            Some(HugepageFallbackReason::AlignmentMismatch)
3381        );
3382    }
3383
3384    #[test]
3385    fn disabled_hugepage_config_rejects_external_active_status_override() {
3386        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0)]);
3387        let manifest = ReactorPlacementManifest::plan(1, Some(&topology));
3388        let config = NumaSlabConfig {
3389            slab_capacity: 4,
3390            entry_size_bytes: 1024,
3391            hugepage: HugepageConfig {
3392                page_size_bytes: 4096,
3393                enabled: false,
3394            },
3395        };
3396        let mut pool = NumaSlabPool::from_manifest(&manifest, config);
3397
3398        let forced = HugepageStatus {
3399            total_pages: 512,
3400            free_pages: 256,
3401            page_size_bytes: 4096,
3402            active: true,
3403            fallback_reason: None,
3404        };
3405        pool.set_hugepage_status(forced);
3406
3407        let telemetry = pool.telemetry();
3408        assert!(!telemetry.hugepage_status.active);
3409        assert_eq!(
3410            telemetry.hugepage_status.fallback_reason,
3411            Some(HugepageFallbackReason::Disabled)
3412        );
3413        assert_eq!(telemetry.hugepage_status.total_pages, 512);
3414        assert_eq!(telemetry.hugepage_status.free_pages, 256);
3415
3416        let json = telemetry.as_json();
3417        assert_eq!(
3418            json["hugepage"]["fallback_reason"],
3419            serde_json::json!("hugepage_disabled")
3420        );
3421    }
3422
3423    #[test]
3424    fn disabled_hugepage_config_uses_disabled_reason_even_if_slab_is_misaligned() {
3425        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0)]);
3426        let manifest = ReactorPlacementManifest::plan(1, Some(&topology));
3427        let config = NumaSlabConfig {
3428            slab_capacity: 3,
3429            entry_size_bytes: 1024,
3430            hugepage: HugepageConfig {
3431                page_size_bytes: 2048,
3432                enabled: false,
3433            },
3434        };
3435
3436        let pool = NumaSlabPool::from_manifest(&manifest, config);
3437        let telemetry = pool.telemetry();
3438        assert!(!telemetry.hugepage_status.active);
3439        assert_eq!(
3440            telemetry.hugepage_status.fallback_reason,
3441            Some(HugepageFallbackReason::Disabled)
3442        );
3443    }
3444
3445    #[test]
3446    fn hugepage_alignment_rejects_zero_page_size_and_fails_closed() {
3447        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0)]);
3448        let manifest = ReactorPlacementManifest::plan(1, Some(&topology));
3449        let config = NumaSlabConfig {
3450            slab_capacity: 4,
3451            entry_size_bytes: 1024,
3452            hugepage: HugepageConfig {
3453                page_size_bytes: 0,
3454                enabled: true,
3455            },
3456        };
3457        assert!(!config.hugepage_alignment_ok());
3458
3459        let pool = NumaSlabPool::from_manifest(&manifest, config);
3460        let telemetry = pool.telemetry();
3461        assert!(!telemetry.hugepage_status.active);
3462        assert_eq!(
3463            telemetry.hugepage_status.fallback_reason,
3464            Some(HugepageFallbackReason::AlignmentMismatch)
3465        );
3466    }
3467
3468    #[test]
3469    fn hugepage_alignment_rejects_zero_footprint_and_fails_closed() {
3470        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0)]);
3471        let manifest = ReactorPlacementManifest::plan(1, Some(&topology));
3472        let config = NumaSlabConfig {
3473            slab_capacity: 0,
3474            entry_size_bytes: 1024,
3475            hugepage: HugepageConfig {
3476                page_size_bytes: 2048,
3477                enabled: true,
3478            },
3479        };
3480        assert_eq!(config.slab_footprint_bytes(), Some(0));
3481        assert!(!config.hugepage_alignment_ok());
3482
3483        let pool = NumaSlabPool::from_manifest(&manifest, config);
3484        let telemetry = pool.telemetry();
3485        assert!(!telemetry.hugepage_status.active);
3486        assert_eq!(
3487            telemetry.hugepage_status.fallback_reason,
3488            Some(HugepageFallbackReason::AlignmentMismatch)
3489        );
3490    }
3491
3492    #[test]
3493    fn hugepage_alignment_rejects_checked_mul_overflow_without_panicking() {
3494        let config = NumaSlabConfig {
3495            slab_capacity: usize::MAX,
3496            entry_size_bytes: 2,
3497            hugepage: HugepageConfig {
3498                page_size_bytes: 4096,
3499                enabled: true,
3500            },
3501        };
3502        assert!(config.slab_footprint_bytes().is_none());
3503        assert!(!config.hugepage_alignment_ok());
3504
3505        let status = config.alignment_mismatch_status();
3506        assert!(!status.active);
3507        assert_eq!(
3508            status.fallback_reason,
3509            Some(HugepageFallbackReason::AlignmentMismatch)
3510        );
3511    }
3512
3513    #[test]
3514    fn zero_page_size_config_rejects_external_status_override() {
3515        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0)]);
3516        let manifest = ReactorPlacementManifest::plan(1, Some(&topology));
3517        let config = NumaSlabConfig {
3518            slab_capacity: 4,
3519            entry_size_bytes: 1024,
3520            hugepage: HugepageConfig {
3521                page_size_bytes: 0,
3522                enabled: true,
3523            },
3524        };
3525        let mut pool = NumaSlabPool::from_manifest(&manifest, config);
3526
3527        let forced = HugepageStatus {
3528            total_pages: 128,
3529            free_pages: 64,
3530            page_size_bytes: 0,
3531            active: true,
3532            fallback_reason: None,
3533        };
3534        pool.set_hugepage_status(forced);
3535
3536        let telemetry = pool.telemetry();
3537        assert!(!telemetry.hugepage_status.active);
3538        assert_eq!(
3539            telemetry.hugepage_status.fallback_reason,
3540            Some(HugepageFallbackReason::AlignmentMismatch)
3541        );
3542    }
3543
3544    #[test]
3545    fn zero_footprint_config_rejects_external_status_override() {
3546        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0)]);
3547        let manifest = ReactorPlacementManifest::plan(1, Some(&topology));
3548        let config = NumaSlabConfig {
3549            slab_capacity: 0,
3550            entry_size_bytes: 1024,
3551            hugepage: HugepageConfig {
3552                page_size_bytes: 2048,
3553                enabled: true,
3554            },
3555        };
3556        assert_eq!(config.slab_footprint_bytes(), Some(0));
3557
3558        let mut pool = NumaSlabPool::from_manifest(&manifest, config);
3559        let forced = HugepageStatus {
3560            total_pages: 128,
3561            free_pages: 64,
3562            page_size_bytes: 2048,
3563            active: true,
3564            fallback_reason: None,
3565        };
3566        pool.set_hugepage_status(forced);
3567
3568        let telemetry = pool.telemetry();
3569        assert!(!telemetry.hugepage_status.active);
3570        assert_eq!(
3571            telemetry.hugepage_status.fallback_reason,
3572            Some(HugepageFallbackReason::AlignmentMismatch)
3573        );
3574        assert_eq!(telemetry.hugepage_status.total_pages, 0);
3575        assert_eq!(telemetry.hugepage_status.free_pages, 0);
3576    }
3577
3578    #[test]
3579    fn checked_mul_overflow_config_rejects_external_status_override() {
3580        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0)]);
3581        let manifest = ReactorPlacementManifest::plan(1, Some(&topology));
3582        let config = NumaSlabConfig {
3583            slab_capacity: 2,
3584            entry_size_bytes: usize::MAX,
3585            hugepage: HugepageConfig {
3586                page_size_bytes: 4096,
3587                enabled: true,
3588            },
3589        };
3590        assert!(config.slab_footprint_bytes().is_none());
3591
3592        let mut pool = NumaSlabPool::from_manifest(&manifest, config);
3593        let forced = HugepageStatus {
3594            total_pages: 512,
3595            free_pages: 256,
3596            page_size_bytes: 4096,
3597            active: true,
3598            fallback_reason: None,
3599        };
3600        pool.set_hugepage_status(forced);
3601
3602        let telemetry = pool.telemetry();
3603        assert!(!telemetry.hugepage_status.active);
3604        assert_eq!(
3605            telemetry.hugepage_status.fallback_reason,
3606            Some(HugepageFallbackReason::AlignmentMismatch)
3607        );
3608        assert_eq!(telemetry.hugepage_status.total_pages, 0);
3609        assert_eq!(telemetry.hugepage_status.free_pages, 0);
3610    }
3611
3612    #[test]
3613    fn hugepage_status_json_is_stable() {
3614        let config = HugepageConfig::default();
3615        let status = HugepageStatus::evaluate(&config, 1024, 128);
3616        let json = status.as_json();
3617        assert_eq!(json["total_pages"], serde_json::json!(1024));
3618        assert_eq!(json["free_pages"], serde_json::json!(128));
3619        assert_eq!(json["active"], serde_json::json!(true));
3620        assert!(json["fallback_reason"].is_null());
3621    }
3622
3623    #[test]
3624    fn numa_slab_telemetry_json_has_expected_shape() {
3625        let topology =
3626            ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0), (1, 0), (4, 1), (5, 1)]);
3627        let manifest = ReactorPlacementManifest::plan(4, Some(&topology));
3628        let config = NumaSlabConfig {
3629            slab_capacity: 16,
3630            entry_size_bytes: 128,
3631            hugepage: HugepageConfig {
3632                enabled: false,
3633                ..HugepageConfig::default()
3634            },
3635        };
3636        let mut pool = NumaSlabPool::from_manifest(&manifest, config);
3637        let _ = pool.allocate(0);
3638        let _ = pool.allocate(1);
3639        let _ = pool.allocate(0);
3640
3641        let telemetry = pool.telemetry();
3642        let json = telemetry.as_json();
3643        assert_eq!(json["node_count"], serde_json::json!(2));
3644        assert_eq!(json["total_allocs"], serde_json::json!(3));
3645        assert_eq!(json["total_in_use"], serde_json::json!(3));
3646        assert_eq!(json["cross_node_allocs"], serde_json::json!(0));
3647        assert_eq!(json["hugepage_backed_allocs"], serde_json::json!(0));
3648        assert_eq!(json["local_allocs"], serde_json::json!(3));
3649        assert_eq!(json["remote_allocs"], serde_json::json!(0));
3650        assert_eq!(
3651            json["allocation_ratio_bps"]["local"],
3652            serde_json::json!(10_000)
3653        );
3654        assert_eq!(json["allocation_ratio_bps"]["remote"], serde_json::json!(0));
3655        assert_eq!(
3656            json["allocation_ratio_bps"]["scale"],
3657            serde_json::json!(10_000)
3658        );
3659        assert_eq!(json["hugepage_hit_rate_bps"]["value"], serde_json::json!(0));
3660        assert_eq!(
3661            json["latency_proxies_bps"]["tlb_miss_pressure"],
3662            serde_json::json!(0)
3663        );
3664        assert_eq!(
3665            json["latency_proxies_bps"]["cache_miss_pressure"],
3666            serde_json::json!(937)
3667        );
3668        assert_eq!(
3669            json["latency_proxies_bps"]["occupancy_pressure"],
3670            serde_json::json!(937)
3671        );
3672        assert_eq!(
3673            json["latency_proxies_bps"]["scale"],
3674            serde_json::json!(10_000)
3675        );
3676        assert_eq!(json["pressure_bands"]["tlb_miss"], serde_json::json!("low"));
3677        assert_eq!(
3678            json["pressure_bands"]["cache_miss"],
3679            serde_json::json!("low")
3680        );
3681        assert_eq!(
3682            json["pressure_bands"]["occupancy"],
3683            serde_json::json!("low")
3684        );
3685        assert_eq!(
3686            json["fallback_reasons"]["cross_node"],
3687            serde_json::Value::Null
3688        );
3689        assert_eq!(
3690            json["fallback_reasons"]["hugepage"],
3691            serde_json::json!("hugepage_disabled")
3692        );
3693        assert_eq!(json["config"]["slab_capacity"], serde_json::json!(16));
3694        assert_eq!(json["per_node"].as_array().map(std::vec::Vec::len), Some(2));
3695    }
3696
3697    #[test]
3698    fn thread_affinity_advice_matches_placement_manifest() {
3699        let topology =
3700            ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0), (1, 0), (4, 1), (5, 1)]);
3701        let manifest = ReactorPlacementManifest::plan(4, Some(&topology));
3702        let advice = manifest.affinity_advice(AffinityEnforcement::Advisory);
3703        assert_eq!(advice.len(), 4);
3704        assert_eq!(advice[0].shard_id, 0);
3705        assert_eq!(advice[0].recommended_core, 0);
3706        assert_eq!(advice[0].recommended_numa_node, 0);
3707        assert_eq!(advice[0].enforcement, AffinityEnforcement::Advisory);
3708        assert_eq!(advice[1].recommended_numa_node, 1);
3709        assert_eq!(advice[3].recommended_numa_node, 1);
3710    }
3711
3712    #[test]
3713    fn thread_affinity_advice_json_is_stable() {
3714        let advice = ThreadAffinityAdvice {
3715            shard_id: 0,
3716            recommended_core: 3,
3717            recommended_numa_node: 1,
3718            enforcement: AffinityEnforcement::Strict,
3719        };
3720        let json = advice.as_json();
3721        assert_eq!(json["shard_id"], serde_json::json!(0));
3722        assert_eq!(json["recommended_core"], serde_json::json!(3));
3723        assert_eq!(json["enforcement"], serde_json::json!("strict"));
3724    }
3725
3726    #[test]
3727    fn reactor_mesh_preferred_numa_node_uses_manifest() {
3728        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0), (4, 1), (8, 2)]);
3729        let mesh = ReactorMesh::new(ReactorMeshConfig {
3730            shard_count: 3,
3731            lane_capacity: 8,
3732            topology: Some(topology),
3733        });
3734        assert_eq!(mesh.preferred_numa_node(0), 0);
3735        assert_eq!(mesh.preferred_numa_node(1), 1);
3736        assert_eq!(mesh.preferred_numa_node(2), 2);
3737        assert_eq!(mesh.preferred_numa_node(99), 0); // fallback
3738    }
3739
3740    #[test]
3741    fn reactor_mesh_affinity_advice_covers_all_shards() {
3742        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0), (1, 1)]);
3743        let mesh = ReactorMesh::new(ReactorMeshConfig {
3744            shard_count: 2,
3745            lane_capacity: 8,
3746            topology: Some(topology),
3747        });
3748        let advice = mesh.affinity_advice(AffinityEnforcement::Disabled);
3749        assert_eq!(advice.len(), 2);
3750        assert_eq!(advice[0].enforcement, AffinityEnforcement::Disabled);
3751        assert_eq!(advice[1].enforcement, AffinityEnforcement::Disabled);
3752    }
3753
3754    #[test]
3755    fn numa_slab_pool_from_manifest_with_no_topology_creates_single_node() {
3756        let manifest = ReactorPlacementManifest::plan(4, None);
3757        let pool = NumaSlabPool::from_manifest(&manifest, NumaSlabConfig::default());
3758        assert_eq!(pool.node_count(), 1);
3759    }
3760
3761    #[test]
3762    fn numa_node_for_shard_returns_none_for_unknown() {
3763        let manifest = ReactorPlacementManifest::plan(2, None);
3764        assert!(manifest.numa_node_for_shard(0).is_some());
3765        assert!(manifest.numa_node_for_shard(99).is_none());
3766    }
3767
3768    #[test]
3769    fn numa_slab_capacity_clamp_to_at_least_one() {
3770        let slab = NumaSlab::new(0, 0);
3771        assert_eq!(slab.capacity, 1);
3772    }
3773
3774    #[test]
3775    fn cross_node_reason_code_matches() {
3776        assert_eq!(CrossNodeReason::LocalExhausted.as_code(), "local_exhausted");
3777    }
3778
3779    #[test]
3780    fn affinity_enforcement_code_coverage() {
3781        assert_eq!(AffinityEnforcement::Advisory.as_code(), "advisory");
3782        assert_eq!(AffinityEnforcement::Strict.as_code(), "strict");
3783        assert_eq!(AffinityEnforcement::Disabled.as_code(), "disabled");
3784    }
3785
3786    // ── Additional coverage for untested public APIs ──
3787
3788    #[test]
3789    fn enqueue_hostcall_completions_batch_preserves_order() {
3790        let mut sched = Scheduler::with_clock(DeterministicClock::new(0));
3791        let completions = vec![
3792            (
3793                "c-1".to_string(),
3794                HostcallOutcome::Success(serde_json::json!(1)),
3795            ),
3796            (
3797                "c-2".to_string(),
3798                HostcallOutcome::Success(serde_json::json!(2)),
3799            ),
3800            (
3801                "c-3".to_string(),
3802                HostcallOutcome::Success(serde_json::json!(3)),
3803            ),
3804        ];
3805        sched.enqueue_hostcall_completions(completions);
3806        assert_eq!(sched.macrotask_count(), 3);
3807
3808        // Verify FIFO order: c-1, c-2, c-3
3809        for expected in ["c-1", "c-2", "c-3"] {
3810            let task = sched.tick().expect("should have macrotask");
3811            match task.kind {
3812                MacrotaskKind::HostcallComplete { ref call_id, .. } => {
3813                    assert_eq!(call_id, expected);
3814                }
3815                _ => unreachable!(),
3816            }
3817        }
3818        assert!(sched.tick().is_none());
3819    }
3820
3821    #[test]
3822    fn time_until_next_timer_positive_case() {
3823        let mut sched = Scheduler::with_clock(DeterministicClock::new(100));
3824        sched.set_timeout(50); // deadline = 150
3825        assert_eq!(sched.time_until_next_timer(), Some(50));
3826
3827        sched.clock.advance(20); // now = 120, remaining = 30
3828        assert_eq!(sched.time_until_next_timer(), Some(30));
3829    }
3830
3831    #[test]
3832    fn deterministic_clock_set_overrides_current_time() {
3833        let clock = DeterministicClock::new(0);
3834        assert_eq!(clock.now_ms(), 0);
3835        clock.advance(50);
3836        assert_eq!(clock.now_ms(), 50);
3837        clock.set(1000);
3838        assert_eq!(clock.now_ms(), 1000);
3839        clock.advance(5);
3840        assert_eq!(clock.now_ms(), 1005);
3841    }
3842
3843    #[test]
3844    fn reactor_mesh_queue_depth_per_shard() {
3845        let config = ReactorMeshConfig {
3846            shard_count: 4,
3847            lane_capacity: 64,
3848            topology: None,
3849        };
3850        let mut mesh = ReactorMesh::new(config);
3851
3852        // All shards start empty
3853        for shard in 0..4 {
3854            assert_eq!(mesh.queue_depth(shard), Some(0));
3855        }
3856        // Out of range returns None
3857        assert_eq!(mesh.queue_depth(99), None);
3858
3859        // Enqueue events via round-robin (hits shards 0, 1, 2, 3 in order)
3860        for i in 0..4 {
3861            mesh.enqueue_event(format!("evt-{i}"), serde_json::json!(null))
3862                .expect("enqueue should succeed");
3863        }
3864        // Each shard should have exactly 1
3865        for shard in 0..4 {
3866            assert_eq!(mesh.queue_depth(shard), Some(1), "shard {shard} depth");
3867        }
3868    }
3869
3870    #[test]
3871    fn reactor_mesh_shard_count_and_total_depth() {
3872        let config = ReactorMeshConfig {
3873            shard_count: 3,
3874            lane_capacity: 16,
3875            topology: None,
3876        };
3877        let mut mesh = ReactorMesh::new(config);
3878        assert_eq!(mesh.shard_count(), 3);
3879        assert_eq!(mesh.total_depth(), 0);
3880        assert!(!mesh.has_pending());
3881
3882        mesh.enqueue_event("e1".to_string(), serde_json::json!(null))
3883            .unwrap();
3884        mesh.enqueue_event("e2".to_string(), serde_json::json!(null))
3885            .unwrap();
3886        assert_eq!(mesh.total_depth(), 2);
3887        assert!(mesh.has_pending());
3888    }
3889
3890    #[test]
3891    fn reactor_mesh_drain_shard_out_of_range_returns_empty() {
3892        let config = ReactorMeshConfig {
3893            shard_count: 2,
3894            lane_capacity: 16,
3895            topology: None,
3896        };
3897        let mut mesh = ReactorMesh::new(config);
3898        mesh.enqueue_event("e1".to_string(), serde_json::json!(null))
3899            .unwrap();
3900        let drained = mesh.drain_shard(99, 10);
3901        assert!(drained.is_empty());
3902    }
3903
3904    #[test]
3905    fn reactor_mesh_zero_shards_is_empty_and_rejects_enqueues() {
3906        let mut mesh = ReactorMesh::new(ReactorMeshConfig {
3907            shard_count: 0,
3908            lane_capacity: 16,
3909            topology: None,
3910        });
3911
3912        assert_eq!(mesh.shard_count(), 0);
3913        assert_eq!(mesh.total_depth(), 0);
3914        assert!(!mesh.has_pending());
3915        assert_eq!(mesh.queue_depth(0), None);
3916        assert!(mesh.telemetry().queue_depths.is_empty());
3917
3918        let err = mesh
3919            .enqueue_event("evt".to_string(), serde_json::json!(null))
3920            .expect_err("empty mesh should reject enqueues");
3921        assert_eq!(err.shard_id, 0);
3922        assert_eq!(err.depth, 0);
3923        assert_eq!(err.capacity, 0);
3924        assert_eq!(mesh.telemetry().rejected_enqueues, 1);
3925    }
3926
3927    #[test]
3928    fn reactor_mesh_zero_capacity_is_empty_and_rejects_enqueues() {
3929        let mut mesh = ReactorMesh::new(ReactorMeshConfig {
3930            shard_count: 4,
3931            lane_capacity: 0,
3932            topology: None,
3933        });
3934
3935        assert_eq!(mesh.shard_count(), 0);
3936        assert_eq!(mesh.total_depth(), 0);
3937        assert!(!mesh.has_pending());
3938        assert_eq!(mesh.telemetry().queue_depths, Vec::<usize>::new());
3939
3940        let err = mesh
3941            .enqueue_hostcall_complete(
3942                "call".to_string(),
3943                HostcallOutcome::Success(serde_json::json!({"ok": true})),
3944            )
3945            .expect_err("empty mesh should reject hostcall completions");
3946        assert_eq!(err.shard_id, 0);
3947        assert_eq!(err.depth, 0);
3948        assert_eq!(err.capacity, 0);
3949        assert_eq!(mesh.telemetry().rejected_enqueues, 1);
3950    }
3951
3952    #[test]
3953    fn reactor_placement_manifest_zero_shards() {
3954        let manifest = ReactorPlacementManifest::plan(0, None);
3955        assert_eq!(manifest.shard_count, 0);
3956        assert!(manifest.bindings.is_empty());
3957        assert!(manifest.fallback_reason.is_none());
3958    }
3959
3960    #[test]
3961    fn reactor_placement_manifest_as_json_has_expected_fields() {
3962        let topology =
3963            ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0), (1, 0), (2, 1), (3, 1)]);
3964        let manifest = ReactorPlacementManifest::plan(4, Some(&topology));
3965        let json = manifest.as_json();
3966
3967        assert_eq!(json["shard_count"], 4);
3968        assert_eq!(json["numa_node_count"], 2);
3969        assert!(json["fallback_reason"].is_null());
3970        let bindings = json["bindings"].as_array().expect("bindings array");
3971        assert_eq!(bindings.len(), 4);
3972        for binding in bindings {
3973            assert!(binding.get("shard_id").is_some());
3974            assert!(binding.get("core_id").is_some());
3975            assert!(binding.get("numa_node").is_some());
3976        }
3977    }
3978
3979    #[test]
3980    fn reactor_placement_manifest_single_node_fallback() {
3981        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0), (1, 0)]);
3982        let manifest = ReactorPlacementManifest::plan(2, Some(&topology));
3983        assert_eq!(
3984            manifest.fallback_reason,
3985            Some(ReactorPlacementFallbackReason::SingleNumaNode)
3986        );
3987    }
3988
3989    #[test]
3990    fn reactor_placement_manifest_empty_topology_fallback() {
3991        let topology = ReactorTopologySnapshot { cores: vec![] };
3992        let manifest = ReactorPlacementManifest::plan(2, Some(&topology));
3993        assert_eq!(
3994            manifest.fallback_reason,
3995            Some(ReactorPlacementFallbackReason::TopologyEmpty)
3996        );
3997    }
3998
3999    #[test]
4000    fn reactor_placement_fallback_reason_as_code_all_variants() {
4001        assert_eq!(
4002            ReactorPlacementFallbackReason::TopologyUnavailable.as_code(),
4003            "topology_unavailable"
4004        );
4005        assert_eq!(
4006            ReactorPlacementFallbackReason::TopologyEmpty.as_code(),
4007            "topology_empty"
4008        );
4009        assert_eq!(
4010            ReactorPlacementFallbackReason::SingleNumaNode.as_code(),
4011            "single_numa_node"
4012        );
4013    }
4014
4015    #[test]
4016    fn hugepage_fallback_reason_as_code_all_variants() {
4017        assert_eq!(
4018            HugepageFallbackReason::Disabled.as_code(),
4019            "hugepage_disabled"
4020        );
4021        assert_eq!(
4022            HugepageFallbackReason::DetectionUnavailable.as_code(),
4023            "detection_unavailable"
4024        );
4025        assert_eq!(
4026            HugepageFallbackReason::InsufficientHugepages.as_code(),
4027            "insufficient_hugepages"
4028        );
4029        assert_eq!(
4030            HugepageFallbackReason::AlignmentMismatch.as_code(),
4031            "alignment_mismatch"
4032        );
4033    }
4034
4035    #[test]
4036    fn numa_slab_pool_set_hugepage_status_and_node_count() {
4037        let manifest = ReactorPlacementManifest::plan(4, None);
4038        let config = NumaSlabConfig {
4039            slab_capacity: 4096,
4040            entry_size_bytes: 512,
4041            hugepage: HugepageConfig {
4042                page_size_bytes: 2 * 1024 * 1024,
4043                enabled: true,
4044            },
4045        };
4046        let mut pool = NumaSlabPool::from_manifest(&manifest, config);
4047        assert_eq!(pool.node_count(), 1);
4048
4049        let status = HugepageStatus::evaluate(&config.hugepage, 512, 256);
4050        assert!(status.active);
4051        pool.set_hugepage_status(status);
4052
4053        let telem = pool.telemetry();
4054        assert!(telem.hugepage_status.active);
4055        assert_eq!(telem.hugepage_status.free_pages, 256);
4056    }
4057
4058    #[test]
4059    fn numa_slab_pool_multi_node_node_count() {
4060        let topology = ReactorTopologySnapshot::from_core_node_pairs(&[(0, 0), (1, 1), (2, 2)]);
4061        let manifest = ReactorPlacementManifest::plan(3, Some(&topology));
4062        let pool = NumaSlabPool::from_manifest(&manifest, NumaSlabConfig::default());
4063        assert_eq!(pool.node_count(), 3);
4064    }
4065
4066    #[test]
4067    fn reactor_mesh_telemetry_as_json_has_expected_shape() {
4068        let config = ReactorMeshConfig {
4069            shard_count: 2,
4070            lane_capacity: 8,
4071            topology: None,
4072        };
4073        let mesh = ReactorMesh::new(config);
4074        let telem = mesh.telemetry();
4075        let json = telem.as_json();
4076
4077        let depths = json["queue_depths"].as_array().expect("queue_depths");
4078        assert_eq!(depths.len(), 2);
4079        assert_eq!(json["rejected_enqueues"], 0);
4080        let bindings = json["shard_bindings"].as_array().expect("shard_bindings");
4081        assert_eq!(bindings.len(), 2);
4082        assert!(json.get("fallback_reason").is_some());
4083    }
4084
4085    #[test]
4086    fn numa_slab_in_use_and_has_capacity() {
4087        let mut slab = NumaSlab::new(0, 3);
4088        assert_eq!(slab.in_use(), 0);
4089        assert!(slab.has_capacity());
4090
4091        let h1 = slab.allocate().expect("alloc 1");
4092        assert_eq!(slab.in_use(), 1);
4093        assert!(slab.has_capacity());
4094
4095        let h2 = slab.allocate().expect("alloc 2");
4096        let _h3 = slab.allocate().expect("alloc 3");
4097        assert_eq!(slab.in_use(), 3);
4098        assert!(!slab.has_capacity());
4099        assert!(slab.allocate().is_none());
4100
4101        slab.deallocate(&h1);
4102        assert_eq!(slab.in_use(), 2);
4103        assert!(slab.has_capacity());
4104
4105        slab.deallocate(&h2);
4106        assert_eq!(slab.in_use(), 1);
4107    }
4108
4109    #[test]
4110    fn scheduler_macrotask_count_tracks_queue_size() {
4111        let mut sched = Scheduler::with_clock(DeterministicClock::new(0));
4112        assert_eq!(sched.macrotask_count(), 0);
4113
4114        sched.enqueue_event("e1".to_string(), serde_json::json!(null));
4115        sched.enqueue_event("e2".to_string(), serde_json::json!(null));
4116        assert_eq!(sched.macrotask_count(), 2);
4117
4118        sched.tick();
4119        assert_eq!(sched.macrotask_count(), 1);
4120
4121        sched.tick();
4122        assert_eq!(sched.macrotask_count(), 0);
4123    }
4124
4125    #[test]
4126    fn scheduler_timer_count_reflects_pending_timers() {
4127        let mut sched = Scheduler::with_clock(DeterministicClock::new(0));
4128        assert_eq!(sched.timer_count(), 0);
4129
4130        sched.set_timeout(100);
4131        sched.set_timeout(200);
4132        assert_eq!(sched.timer_count(), 2);
4133
4134        // Advance past first timer and tick to move it
4135        sched.clock.advance(150);
4136        sched.tick();
4137        assert_eq!(sched.timer_count(), 1);
4138    }
4139
4140    #[test]
4141    fn scheduler_current_seq_advances_with_operations() {
4142        let mut sched = Scheduler::with_clock(DeterministicClock::new(0));
4143        let initial = sched.current_seq();
4144        assert_eq!(initial.value(), 0);
4145
4146        sched.set_timeout(100); // uses one seq
4147        assert!(sched.current_seq().value() > initial.value());
4148
4149        let after_timer = sched.current_seq();
4150        sched.enqueue_event("evt".to_string(), serde_json::json!(null)); // uses another seq
4151        assert!(sched.current_seq().value() > after_timer.value());
4152    }
4153
4154    #[test]
4155    fn thread_affinity_advice_as_json_structure() {
4156        let advice = ThreadAffinityAdvice {
4157            shard_id: 2,
4158            recommended_core: 5,
4159            recommended_numa_node: 1,
4160            enforcement: AffinityEnforcement::Strict,
4161        };
4162        let json = advice.as_json();
4163        assert_eq!(json["shard_id"], 2);
4164        assert_eq!(json["recommended_core"], 5);
4165        assert_eq!(json["recommended_numa_node"], 1);
4166        assert_eq!(json["enforcement"], "strict");
4167    }
4168
4169    // ── Property tests ──
4170
4171    mod proptest_scheduler {
4172        use super::*;
4173        use proptest::prelude::*;
4174
4175        proptest! {
4176            #[test]
4177            fn seq_next_is_monotonic(start in 0..u64::MAX - 100) {
4178                let s = Seq(start);
4179                let n = s.next();
4180                assert!(n >= s, "Seq::next must be monotonically non-decreasing");
4181                assert!(
4182                    n.value() == start + 1 || start == u64::MAX,
4183                    "Seq::next must increment by 1 unless saturated"
4184                );
4185            }
4186
4187            #[test]
4188            fn seq_next_saturates(start in u64::MAX - 5..=u64::MAX) {
4189                let s = Seq(start);
4190                let n = s.next();
4191                // Verify next() does not panic and produces a valid value.
4192                let _ = n.value();
4193                assert!(n >= s, "must be monotonic even at saturation boundary");
4194            }
4195
4196            #[test]
4197            fn timer_entry_ordering_consistent_with_min_heap(
4198                id_a in 0..1000u64,
4199                id_b in 0..1000u64,
4200                deadline_a in 0..10000u64,
4201                deadline_b in 0..10000u64,
4202                seq_a in 0..1000u64,
4203                seq_b in 0..1000u64,
4204            ) {
4205                let ta = TimerEntry::new(id_a, deadline_a, Seq(seq_a));
4206                let tb = TimerEntry::new(id_b, deadline_b, Seq(seq_b));
4207                // Reversed ordering: earlier deadline = GREATER (for BinaryHeap min-heap)
4208                if deadline_a < deadline_b {
4209                    assert!(ta > tb, "earlier deadline must sort greater (min-heap)");
4210                } else if deadline_a > deadline_b {
4211                    assert!(ta < tb, "later deadline must sort less (min-heap)");
4212                } else if seq_a < seq_b {
4213                    assert!(ta > tb, "same deadline, earlier seq must sort greater");
4214                } else if seq_a > seq_b {
4215                    assert!(ta < tb, "same deadline, later seq must sort less");
4216                } else {
4217                    assert!(ta == tb, "same deadline+seq must be equal");
4218                }
4219            }
4220
4221            #[test]
4222            fn stable_hash_is_deterministic(input in "[a-z0-9_.-]{1,64}") {
4223                let h1 = ReactorMesh::stable_hash(&input);
4224                let h2 = ReactorMesh::stable_hash(&input);
4225                assert!(h1 == h2, "stable_hash must be deterministic");
4226            }
4227
4228            #[test]
4229            fn hash_route_returns_valid_shard(
4230                shard_count in 1..32usize,
4231                call_id in "[a-z0-9]{1,20}",
4232            ) {
4233                let config = ReactorMeshConfig {
4234                    shard_count,
4235                    lane_capacity: 16,
4236                    topology: None,
4237                };
4238                let mesh = ReactorMesh::new(config);
4239                let shard = mesh.hash_route(&call_id);
4240                assert!(
4241                    shard < mesh.shard_count(),
4242                    "hash_route returned {shard} >= shard_count {}",
4243                    mesh.shard_count(),
4244                );
4245            }
4246
4247            #[test]
4248            fn rr_route_returns_valid_shard(
4249                shard_count in 1..32usize,
4250                iterations in 1..100usize,
4251            ) {
4252                let config = ReactorMeshConfig {
4253                    shard_count,
4254                    lane_capacity: 16,
4255                    topology: None,
4256                };
4257                let mut mesh = ReactorMesh::new(config);
4258                for _ in 0..iterations {
4259                    let shard = mesh.rr_route();
4260                    assert!(
4261                        shard < mesh.shard_count(),
4262                        "rr_route returned {shard} >= shard_count {}",
4263                        mesh.shard_count(),
4264                    );
4265                }
4266            }
4267
4268            #[test]
4269            fn drain_global_order_is_sorted(
4270                shard_count in 1..8usize,
4271                lane_capacity in 2..16usize,
4272                enqueues in 1..30usize,
4273            ) {
4274                let config = ReactorMeshConfig {
4275                    shard_count,
4276                    lane_capacity,
4277                    topology: None,
4278                };
4279                let mut mesh = ReactorMesh::new(config);
4280                let mut success_count = 0usize;
4281                for i in 0..enqueues {
4282                    let call_id = format!("call_{i}");
4283                    let outcome = HostcallOutcome::Success(serde_json::Value::Null);
4284                    if mesh.enqueue_hostcall_complete(call_id, outcome).is_ok() {
4285                        success_count += 1;
4286                    }
4287                }
4288                let drained = mesh.drain_global_order(success_count);
4289                // Verify ascending global_seq
4290                for pair in drained.windows(2) {
4291                    assert!(
4292                        pair[0].global_seq < pair[1].global_seq,
4293                        "drain_global_order must emit ascending seq: {:?} vs {:?}",
4294                        pair[0].global_seq,
4295                        pair[1].global_seq,
4296                    );
4297                }
4298            }
4299
4300            #[test]
4301            fn mesh_total_depth_bounded_by_capacity(
4302                shard_count in 1..8usize,
4303                lane_capacity in 1..16usize,
4304                enqueues in 0..100usize,
4305            ) {
4306                let config = ReactorMeshConfig {
4307                    shard_count,
4308                    lane_capacity,
4309                    topology: None,
4310                };
4311                let mut mesh = ReactorMesh::new(config);
4312                for i in 0..enqueues {
4313                    let call_id = format!("call_{i}");
4314                    let outcome = HostcallOutcome::Success(serde_json::Value::Null);
4315                    let _ = mesh.enqueue_hostcall_complete(call_id, outcome);
4316                }
4317                let max_total = shard_count * lane_capacity;
4318                assert!(
4319                    mesh.total_depth() <= max_total,
4320                    "total_depth {} exceeds max possible {}",
4321                    mesh.total_depth(),
4322                    max_total,
4323                );
4324            }
4325
4326            #[test]
4327            fn scheduler_timer_cancel_idempotent(
4328                timer_count in 1..10usize,
4329                cancel_idx in 0..10usize,
4330            ) {
4331                let clock = DeterministicClock::new(0);
4332                let mut sched = Scheduler::with_clock(clock);
4333                let mut timer_ids = Vec::new();
4334                for i in 0..timer_count {
4335                    timer_ids.push(sched.set_timeout(u64::try_from(i + 1).unwrap() * 100));
4336                }
4337                if cancel_idx < timer_ids.len() {
4338                    let tid = timer_ids[cancel_idx];
4339                    let first = sched.clear_timeout(tid);
4340                    let second = sched.clear_timeout(tid);
4341                    assert!(first, "first cancel should succeed");
4342                    assert!(!second, "second cancel should return false");
4343                }
4344            }
4345        }
4346    }
4347}