// ringkernel_core/scheduling.rs

//! Dynamic Actor Scheduling — Work Stealing Protocol
//!
//! Provides load balancing for persistent GPU actors via a work stealing protocol.
//! Without dynamic scheduling, each actor (thread block) processes only its own
//! message queue. If one actor's workload spikes while neighbors are idle,
//! the busy actor becomes a bottleneck.
//!
//! # Scheduler Warp Pattern
//!
//! Within each thread block of the persistent kernel:
//! - **Warp 0**: Scheduler warp — monitors queue depth, steals work from overloaded
//!   neighbors, redistributes messages
//! - **Warps 1-N**: Compute warps — process messages from the local work queue
//!
//! ```text
//! ┌─── Block (Actor) ───────────────────────────────────┐
//! │ Warp 0 [SCHEDULER]                                   │
//! │ ├─ Monitor local queue depth                         │
//! │ ├─ If depth < steal_threshold:                       │
//! │ │   └─ Steal from busiest neighbor via K2K           │
//! │ ├─ If depth > share_threshold:                       │
//! │ │   └─ Offer work to least-busy neighbor             │
//! │ └─ Update load metrics in shared memory              │
//! │                                                      │
//! │ Warps 1-7 [COMPUTE]                                  │
//! │ ├─ Dequeue message from local work queue             │
//! │ ├─ Process message (user handler)                    │
//! │ └─ Enqueue response to output queue                  │
//! └──────────────────────────────────────────────────────┘
//! ```
//!
//! # Work Stealing Protocol
//!
//! 1. Each block publishes its queue depth to a shared load table (global or DSMEM)
//! 2. Scheduler warp compares local depth with neighbor depths
//! 3. If local depth < `steal_threshold` and a neighbor has depth > `share_threshold`:
//!    a. Scheduler warp atomically reserves N messages from neighbor's queue
//!    b. Messages are copied via K2K channel (DSMEM for cluster, global for cross-cluster)
//!    c. Both blocks update their queue depths
//! 4. Grid sync (or cluster sync) ensures load table consistency
//!
//! # Load Table Layout (in mapped/global memory)
//!
//! ```text
//! load_table[block_id] = {
//!     queue_depth: u32,    // Current input queue depth
//!     capacity: u32,       // Queue capacity
//!     messages_processed: u64,  // Throughput indicator
//!     steal_requests: u32, // Pending steal requests
//!     offer_count: u32,    // Messages offered to steal
//! }
//! ```

use std::fmt;

/// Configuration for dynamic actor scheduling.
///
/// Holds the thresholds and strategy that drive the work-stealing protocol
/// described in the module docs. Defaults (see the `Default` impl below)
/// enable work stealing with a steal threshold of 4 and a share threshold
/// of 64.
#[derive(Debug, Clone)]
pub struct SchedulerConfig {
    /// Queue depth below which the scheduler tries to steal work.
    pub steal_threshold: u32,
    /// Queue depth above which the scheduler offers work to neighbors.
    pub share_threshold: u32,
    /// Maximum messages to steal in one operation.
    pub max_steal_batch: u32,
    /// Number of neighbor blocks to check for work stealing.
    pub steal_neighborhood: u32,
    /// Enable/disable dynamic scheduling (can be toggled at runtime).
    /// When `false`, `is_dynamic()` reports `false` regardless of `strategy`.
    pub enabled: bool,
    /// Scheduling strategy.
    pub strategy: SchedulingStrategy,
}

73impl Default for SchedulerConfig {
74    fn default() -> Self {
75        Self {
76            steal_threshold: 4,
77            share_threshold: 64,
78            max_steal_batch: 16,
79            steal_neighborhood: 4,
80            enabled: true,
81            strategy: SchedulingStrategy::WorkStealing,
82        }
83    }
84}
85
86impl SchedulerConfig {
87    /// Create a static (no scheduling) configuration.
88    ///
89    /// This is the current default behavior: each thread block processes its
90    /// own fixed work queue. No load balancing occurs.
91    pub fn static_scheduling() -> Self {
92        Self {
93            enabled: false,
94            strategy: SchedulingStrategy::Static,
95            ..Default::default()
96        }
97    }
98
99    /// Create a work-stealing configuration with the given threshold.
100    ///
101    /// When an actor's queue depth falls below `steal_threshold`, its scheduler
102    /// warp will attempt to steal messages from the busiest neighbor.
103    pub fn work_stealing(steal_threshold: u32) -> Self {
104        Self {
105            steal_threshold,
106            strategy: SchedulingStrategy::WorkStealing,
107            ..Default::default()
108        }
109    }
110
111    /// Create a round-robin configuration.
112    ///
113    /// Messages are distributed from a global work queue to blocks in
114    /// round-robin order, using a global atomic counter for index assignment.
115    pub fn round_robin() -> Self {
116        Self {
117            strategy: SchedulingStrategy::RoundRobin,
118            ..Default::default()
119        }
120    }
121
122    /// Create a priority-based configuration with the given number of levels.
123    ///
124    /// Messages are bucketed into priority sub-queues (0 = lowest, levels-1 = highest).
125    /// The scheduler warp dequeues from the highest-priority non-empty sub-queue first.
126    pub fn priority(levels: u32) -> Self {
127        let levels = levels.clamp(1, 16);
128        Self {
129            strategy: SchedulingStrategy::Priority { levels },
130            ..Default::default()
131        }
132    }
133
134    /// Set the steal threshold.
135    pub fn with_steal_threshold(mut self, threshold: u32) -> Self {
136        self.steal_threshold = threshold;
137        self
138    }
139
140    /// Set the share threshold.
141    pub fn with_share_threshold(mut self, threshold: u32) -> Self {
142        self.share_threshold = threshold;
143        self
144    }
145
146    /// Set the maximum steal batch size.
147    pub fn with_max_steal_batch(mut self, batch: u32) -> Self {
148        self.max_steal_batch = batch;
149        self
150    }
151
152    /// Set the number of neighbor blocks to check for work stealing.
153    pub fn with_steal_neighborhood(mut self, neighborhood: u32) -> Self {
154        self.steal_neighborhood = neighborhood;
155        self
156    }
157
158    /// Enable or disable the scheduler.
159    pub fn with_enabled(mut self, enabled: bool) -> Self {
160        self.enabled = enabled;
161        self
162    }
163
164    /// Check if this configuration uses dynamic scheduling (anything other than Static).
165    pub fn is_dynamic(&self) -> bool {
166        self.enabled && self.strategy != SchedulingStrategy::Static
167    }
168}
169
/// Work item for the scheduler.
///
/// A unit of work that any actor (thread block) can pick up. The scheduler
/// warp tracks pending work in the global work queue with these entries.
///
/// Layout (16 bytes): message_id (8) + actor_id (4) + priority (4).
/// The widest field comes first so `repr(C)` introduces no padding.
#[repr(C)]
#[derive(Debug, Clone, Copy, Default)]
pub struct WorkItem {
    /// Unique message identifier for tracking.
    pub message_id: u64,
    /// Actor ID that owns (or should process) this work item.
    pub actor_id: u32,
    /// Priority level (0 = lowest). Used by `Priority` strategy.
    pub priority: u32,
}

impl WorkItem {
    /// Construct a work item for `actor_id` carrying message `message_id`
    /// at the given priority level.
    pub fn new(actor_id: u32, message_id: u64, priority: u32) -> Self {
        WorkItem {
            actor_id,
            priority,
            message_id,
        }
    }
}

impl fmt::Display for WorkItem {
    /// Renders as `WorkItem(actor=A, msg=M, pri=P)`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "WorkItem(actor={}, msg={}, pri={})",
            self.actor_id, self.message_id, self.priority
        )
    }
}

/// Configuration for the scheduler warp pattern in CUDA codegen.
///
/// This controls how the scheduler warp (warp 0 by default) is generated
/// within each persistent kernel thread block. When enabled, the codegen
/// produces a split: warp 0 handles work distribution, remaining warps
/// do message processing. Whether the pattern is generated at all is
/// derived from `scheduler.is_dynamic()` — see `is_enabled()`.
#[derive(Debug, Clone)]
pub struct SchedulerWarpConfig {
    /// Which warp handles scheduling (default: 0).
    pub scheduler_warp_id: u32,
    /// The scheduling parameters.
    pub scheduler: SchedulerConfig,
    /// Size of the global work queue (number of WorkItem slots).
    /// Must be power of 2. Used for round-robin and priority strategies.
    pub work_queue_capacity: usize,
    /// Polling interval in nanoseconds for the scheduler warp
    /// when no work is available (default: 1000ns).
    pub poll_interval_ns: u32,
}

229impl Default for SchedulerWarpConfig {
230    fn default() -> Self {
231        Self {
232            scheduler_warp_id: 0,
233            scheduler: SchedulerConfig::default(),
234            work_queue_capacity: 1024,
235            poll_interval_ns: 1000,
236        }
237    }
238}
239
240impl SchedulerWarpConfig {
241    /// Create a new scheduler warp config with the given strategy.
242    pub fn new(scheduler: SchedulerConfig) -> Self {
243        Self {
244            scheduler,
245            ..Default::default()
246        }
247    }
248
249    /// Create a static (disabled) scheduler warp config.
250    /// Codegen will produce the original non-split kernel.
251    pub fn disabled() -> Self {
252        Self {
253            scheduler: SchedulerConfig::static_scheduling(),
254            ..Default::default()
255        }
256    }
257
258    /// Set the scheduler warp ID.
259    pub fn with_scheduler_warp(mut self, warp_id: u32) -> Self {
260        self.scheduler_warp_id = warp_id;
261        self
262    }
263
264    /// Set the global work queue capacity.
265    pub fn with_work_queue_capacity(mut self, capacity: usize) -> Self {
266        debug_assert!(
267            capacity.is_power_of_two(),
268            "Work queue capacity must be power of 2"
269        );
270        self.work_queue_capacity = capacity;
271        self
272    }
273
274    /// Set the poll interval for the scheduler warp.
275    pub fn with_poll_interval_ns(mut self, ns: u32) -> Self {
276        self.poll_interval_ns = ns;
277        self
278    }
279
280    /// Check if the scheduler warp pattern should be generated.
281    pub fn is_enabled(&self) -> bool {
282        self.scheduler.is_dynamic()
283    }
284}
285
/// Scheduling strategy for persistent actors.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SchedulingStrategy {
    /// No dynamic scheduling — each actor processes its own queue only.
    Static,
    /// Work stealing — idle actors steal from busy neighbors.
    WorkStealing,
    /// Work sharing — busy actors proactively share with idle neighbors.
    WorkSharing,
    /// Hybrid — combines stealing and sharing based on load imbalance.
    Hybrid,
    /// Round-robin: a central work queue distributes to blocks in round-robin order.
    /// The scheduler warp in each block pulls from a global atomic counter.
    RoundRobin,
    /// Priority-based: actors have priority levels, higher priority served first.
    /// Messages are dequeued from the highest-priority non-empty sub-queue.
    Priority {
        /// Number of priority levels (1-16). Messages are bucketed into sub-queues.
        levels: u32,
    },
}

impl fmt::Display for SchedulingStrategy {
    /// Renders a short lowercase label, e.g. `work-stealing` or `priority(4)`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            Self::Priority { levels } => return write!(f, "priority({})", levels),
            Self::Static => "static",
            Self::WorkStealing => "work-stealing",
            Self::WorkSharing => "work-sharing",
            Self::Hybrid => "hybrid",
            Self::RoundRobin => "round-robin",
        };
        f.write_str(label)
    }
}

/// Per-actor load entry in the shared load table.
///
/// Stored in mapped memory (or global memory on GPU) so that both the
/// scheduler warp and the host can read and write it.
#[repr(C, align(32))]
#[derive(Debug, Clone, Copy, Default)]
pub struct LoadEntry {
    /// Current input queue depth.
    pub queue_depth: u32,
    /// Queue capacity.
    pub capacity: u32,
    /// Total messages processed (monotonic counter for throughput).
    pub messages_processed: u64,
    /// Pending steal requests (atomically incremented by thieves).
    pub steal_requests: u32,
    /// Messages available for stealing (set by owner when depth > share_threshold).
    pub offer_count: u32,
    /// Load score: queue_depth * 255 / capacity (0-255, for fast comparison).
    pub load_score: u32,
    /// Padding to 32 bytes.
    pub _pad: u32,
}

impl LoadEntry {
    /// Recompute `load_score` as `queue_depth * 255 / capacity` — a 0-255
    /// fill ratio. A zero capacity yields a score of 0.
    pub fn compute_load_score(&mut self) {
        self.load_score = match self.capacity {
            0 => 0,
            cap => (u64::from(self.queue_depth) * 255 / u64::from(cap)) as u32,
        };
    }

    /// True when the queue depth exceeds `threshold` (actor should offer work).
    pub fn is_overloaded(&self, threshold: u32) -> bool {
        threshold < self.queue_depth
    }

    /// True when the queue depth is under `threshold` (actor should steal work).
    pub fn is_underloaded(&self, threshold: u32) -> bool {
        threshold > self.queue_depth
    }
}

365/// The load table containing entries for all actors.
366///
367/// This lives in mapped memory so both host and GPU can access it.
368pub struct LoadTable {
369    entries: Vec<LoadEntry>,
370}
371
372impl LoadTable {
373    /// Create a new load table for `num_actors` actors.
374    pub fn new(num_actors: usize) -> Self {
375        Self {
376            entries: vec![LoadEntry::default(); num_actors],
377        }
378    }
379
380    /// Get a reference to an entry.
381    pub fn get(&self, actor_id: u32) -> Option<&LoadEntry> {
382        self.entries.get(actor_id as usize)
383    }
384
385    /// Get a mutable reference to an entry.
386    pub fn get_mut(&mut self, actor_id: u32) -> Option<&mut LoadEntry> {
387        self.entries.get_mut(actor_id as usize)
388    }
389
390    /// Find the most loaded actor (best target for work stealing FROM).
391    pub fn most_loaded(&self) -> Option<(u32, &LoadEntry)> {
392        self.entries
393            .iter()
394            .enumerate()
395            .filter(|(_, e)| e.queue_depth > 0)
396            .max_by_key(|(_, e)| e.queue_depth)
397            .map(|(i, e)| (i as u32, e))
398    }
399
400    /// Find the least loaded actor (best target for work sharing TO).
401    pub fn least_loaded(&self) -> Option<(u32, &LoadEntry)> {
402        self.entries
403            .iter()
404            .enumerate()
405            .filter(|(_, e)| e.capacity > 0)
406            .min_by_key(|(_, e)| e.queue_depth)
407            .map(|(i, e)| (i as u32, e))
408    }
409
410    /// Compute load imbalance ratio (max_depth / min_depth).
411    /// Returns 1.0 for perfectly balanced, higher = more imbalanced.
412    pub fn imbalance_ratio(&self) -> f64 {
413        let active: Vec<&LoadEntry> = self.entries.iter().filter(|e| e.capacity > 0).collect();
414        if active.is_empty() {
415            return 1.0;
416        }
417
418        let max = active.iter().map(|e| e.queue_depth).max().unwrap_or(0);
419        let min = active.iter().map(|e| e.queue_depth).min().unwrap_or(0);
420
421        if min == 0 {
422            if max == 0 {
423                1.0
424            } else {
425                f64::INFINITY
426            }
427        } else {
428            max as f64 / min as f64
429        }
430    }
431
432    /// Compute a work stealing plan: which actors should steal from which.
433    ///
434    /// Returns a list of (thief_id, victim_id, count) tuples.
435    /// For `RoundRobin` and `Priority` strategies, this returns an empty plan
436    /// since those use a central work queue rather than peer-to-peer stealing.
437    pub fn compute_steal_plan(&self, config: &SchedulerConfig) -> Vec<StealOp> {
438        if !config.enabled || config.strategy == SchedulingStrategy::Static {
439            return Vec::new();
440        }
441
442        // Round-robin and priority use a central queue, not peer stealing.
443        if matches!(
444            config.strategy,
445            SchedulingStrategy::RoundRobin | SchedulingStrategy::Priority { .. }
446        ) {
447            return Vec::new();
448        }
449
450        let mut ops = Vec::new();
451
452        // Find underloaded actors (potential thieves)
453        let thieves: Vec<u32> = self
454            .entries
455            .iter()
456            .enumerate()
457            .filter(|(_, e)| e.is_underloaded(config.steal_threshold) && e.capacity > 0)
458            .map(|(i, _)| i as u32)
459            .collect();
460
461        // Find overloaded actors (potential victims)
462        let mut victims: Vec<(u32, u32)> = self
463            .entries
464            .iter()
465            .enumerate()
466            .filter(|(_, e)| e.is_overloaded(config.share_threshold))
467            .map(|(i, e)| (i as u32, e.queue_depth - config.share_threshold))
468            .collect();
469
470        // Sort victims by excess load (descending)
471        victims.sort_by_key(|v| std::cmp::Reverse(v.1));
472
473        // Match thieves to victims
474        let mut victim_idx = 0;
475        for thief in &thieves {
476            if victim_idx >= victims.len() {
477                break;
478            }
479
480            let (victim_id, available) = &mut victims[victim_idx];
481            if *available == 0 {
482                victim_idx += 1;
483                continue;
484            }
485
486            let steal_count = (*available).min(config.max_steal_batch);
487            ops.push(StealOp {
488                thief: *thief,
489                victim: *victim_id,
490                count: steal_count,
491            });
492
493            *available -= steal_count;
494            if *available == 0 {
495                victim_idx += 1;
496            }
497        }
498
499        ops
500    }
501
502    /// Get all entries as a slice.
503    pub fn entries(&self) -> &[LoadEntry] {
504        &self.entries
505    }
506
507    /// Number of actors in the table.
508    pub fn len(&self) -> usize {
509        self.entries.len()
510    }
511
512    /// Check if empty.
513    pub fn is_empty(&self) -> bool {
514        self.entries.is_empty()
515    }
516}
517
/// A single work-stealing operation.
#[derive(Debug, Clone, Copy)]
pub struct StealOp {
    /// Actor that will receive stolen work.
    pub thief: u32,
    /// Actor that work will be stolen from.
    pub victim: u32,
    /// Number of messages to transfer.
    pub count: u32,
}

impl fmt::Display for StealOp {
    /// Renders e.g. `steal 7 msgs: actor 2 ← actor 5`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "steal {} msgs: actor {} ← actor {}",
            self.count, self.thief, self.victim
        )
    }
}

#[cfg(test)]
mod tests {
    //! Unit tests for scheduler configuration, the load table, and
    //! host-side steal-plan computation.

    use super::*;

    #[test]
    fn test_scheduler_config_defaults() {
        let config = SchedulerConfig::default();
        assert_eq!(config.steal_threshold, 4);
        assert_eq!(config.share_threshold, 64);
        assert!(config.enabled);
    }

    #[test]
    fn test_load_entry_score() {
        let mut entry = LoadEntry {
            queue_depth: 50,
            capacity: 100,
            ..Default::default()
        };
        entry.compute_load_score();
        assert_eq!(entry.load_score, 127); // 50/100 * 255 ≈ 127
    }

    #[test]
    fn test_load_table_most_least_loaded() {
        let mut table = LoadTable::new(4);
        table.entries[0] = LoadEntry {
            queue_depth: 10,
            capacity: 100,
            ..Default::default()
        };
        table.entries[1] = LoadEntry {
            queue_depth: 90,
            capacity: 100,
            ..Default::default()
        };
        table.entries[2] = LoadEntry {
            queue_depth: 50,
            capacity: 100,
            ..Default::default()
        };
        table.entries[3] = LoadEntry {
            queue_depth: 5,
            capacity: 100,
            ..Default::default()
        };

        let (most_id, most) = table.most_loaded().unwrap();
        assert_eq!(most_id, 1);
        assert_eq!(most.queue_depth, 90);

        let (least_id, least) = table.least_loaded().unwrap();
        assert_eq!(least_id, 3);
        assert_eq!(least.queue_depth, 5);
    }

    #[test]
    fn test_imbalance_ratio() {
        let mut table = LoadTable::new(4);
        // Balanced: all depth 50
        for e in &mut table.entries {
            e.queue_depth = 50;
            e.capacity = 100;
        }
        assert!((table.imbalance_ratio() - 1.0).abs() < 0.01);

        // Imbalanced: 10 vs 100
        table.entries[0].queue_depth = 10;
        table.entries[1].queue_depth = 100;
        assert!((table.imbalance_ratio() - 10.0).abs() < 0.01);
    }

    #[test]
    fn test_steal_plan_static_disabled() {
        // Static strategy must short-circuit to an empty plan.
        let table = LoadTable::new(4);
        let config = SchedulerConfig {
            strategy: SchedulingStrategy::Static,
            ..Default::default()
        };
        let plan = table.compute_steal_plan(&config);
        assert!(plan.is_empty());
    }

    #[test]
    fn test_steal_plan_work_stealing() {
        let mut table = LoadTable::new(4);
        // Actor 0: idle (depth 2, below steal threshold 4)
        table.entries[0] = LoadEntry {
            queue_depth: 2,
            capacity: 100,
            ..Default::default()
        };
        // Actor 1: overloaded (depth 80, above share threshold 64)
        table.entries[1] = LoadEntry {
            queue_depth: 80,
            capacity: 100,
            ..Default::default()
        };
        // Actor 2: normal
        table.entries[2] = LoadEntry {
            queue_depth: 30,
            capacity: 100,
            ..Default::default()
        };
        // Actor 3: idle
        table.entries[3] = LoadEntry {
            queue_depth: 1,
            capacity: 100,
            ..Default::default()
        };

        let config = SchedulerConfig::default();
        let plan = table.compute_steal_plan(&config);

        assert!(!plan.is_empty(), "Should produce steal operations");
        // Actor 0 and 3 should steal from actor 1
        assert!(plan.iter().all(|op| op.victim == 1));
        assert!(plan.iter().any(|op| op.thief == 0 || op.thief == 3));
    }

    #[test]
    fn test_steal_plan_respects_max_batch() {
        let mut table = LoadTable::new(2);
        table.entries[0] = LoadEntry {
            queue_depth: 0,
            capacity: 100,
            ..Default::default()
        };
        table.entries[1] = LoadEntry {
            queue_depth: 100,
            capacity: 100,
            ..Default::default()
        };

        let config = SchedulerConfig {
            max_steal_batch: 8,
            ..Default::default()
        };
        let plan = table.compute_steal_plan(&config);

        assert!(!plan.is_empty());
        for op in &plan {
            assert!(
                op.count <= 8,
                "Steal count {} exceeds max batch 8",
                op.count
            );
        }
    }

    #[test]
    fn test_load_entry_size() {
        // 32-byte entries align with the #[repr(C, align(32))] layout.
        assert_eq!(
            std::mem::size_of::<LoadEntry>(),
            32,
            "LoadEntry must be 32 bytes for GPU cache efficiency"
        );
    }

    #[test]
    fn test_work_item_size() {
        // 16-byte items align with the documented repr(C) layout.
        assert_eq!(
            std::mem::size_of::<WorkItem>(),
            16,
            "WorkItem must be 16 bytes for GPU cache efficiency"
        );
    }

    #[test]
    fn test_work_item_display() {
        let item = WorkItem::new(3, 42, 2);
        let s = format!("{}", item);
        assert!(s.contains("actor=3"));
        assert!(s.contains("msg=42"));
        assert!(s.contains("pri=2"));
    }

    #[test]
    fn test_scheduler_config_static() {
        let config = SchedulerConfig::static_scheduling();
        assert!(!config.enabled);
        assert_eq!(config.strategy, SchedulingStrategy::Static);
        assert!(!config.is_dynamic());
    }

    #[test]
    fn test_scheduler_config_work_stealing() {
        let config = SchedulerConfig::work_stealing(8);
        assert_eq!(config.steal_threshold, 8);
        assert_eq!(config.strategy, SchedulingStrategy::WorkStealing);
        assert!(config.is_dynamic());
    }

    #[test]
    fn test_scheduler_config_round_robin() {
        let config = SchedulerConfig::round_robin();
        assert_eq!(config.strategy, SchedulingStrategy::RoundRobin);
        assert!(config.is_dynamic());
    }

    #[test]
    fn test_scheduler_config_priority() {
        let config = SchedulerConfig::priority(4);
        assert_eq!(config.strategy, SchedulingStrategy::Priority { levels: 4 });
        assert!(config.is_dynamic());
    }

    #[test]
    fn test_scheduler_config_priority_clamped() {
        // Requests above the 1..=16 range are clamped to 16.
        let config = SchedulerConfig::priority(100);
        assert_eq!(config.strategy, SchedulingStrategy::Priority { levels: 16 });
    }

    #[test]
    fn test_scheduler_config_builder_chain() {
        let config = SchedulerConfig::work_stealing(10)
            .with_share_threshold(80)
            .with_max_steal_batch(32)
            .with_steal_neighborhood(6);

        assert_eq!(config.steal_threshold, 10);
        assert_eq!(config.share_threshold, 80);
        assert_eq!(config.max_steal_batch, 32);
        assert_eq!(config.steal_neighborhood, 6);
    }

    #[test]
    fn test_scheduler_warp_config_default() {
        let config = SchedulerWarpConfig::default();
        assert_eq!(config.scheduler_warp_id, 0);
        assert_eq!(config.work_queue_capacity, 1024);
        assert_eq!(config.poll_interval_ns, 1000);
        assert!(config.is_enabled());
    }

    #[test]
    fn test_scheduler_warp_config_disabled() {
        let config = SchedulerWarpConfig::disabled();
        assert!(!config.is_enabled());
    }

    #[test]
    fn test_scheduler_warp_config_builder() {
        let config = SchedulerWarpConfig::new(SchedulerConfig::round_robin())
            .with_scheduler_warp(1)
            .with_work_queue_capacity(2048)
            .with_poll_interval_ns(500);

        assert_eq!(config.scheduler_warp_id, 1);
        assert_eq!(config.work_queue_capacity, 2048);
        assert_eq!(config.poll_interval_ns, 500);
        assert!(config.is_enabled());
    }

    #[test]
    fn test_strategy_display() {
        assert_eq!(format!("{}", SchedulingStrategy::Static), "static");
        assert_eq!(
            format!("{}", SchedulingStrategy::WorkStealing),
            "work-stealing"
        );
        assert_eq!(
            format!("{}", SchedulingStrategy::WorkSharing),
            "work-sharing"
        );
        assert_eq!(format!("{}", SchedulingStrategy::Hybrid), "hybrid");
        assert_eq!(format!("{}", SchedulingStrategy::RoundRobin), "round-robin");
        assert_eq!(
            format!("{}", SchedulingStrategy::Priority { levels: 4 }),
            "priority(4)"
        );
    }

    #[test]
    fn test_steal_plan_round_robin_empty() {
        // Round-robin uses a central queue, so no peer-to-peer steal ops.
        let mut table = LoadTable::new(4);
        table.entries[0] = LoadEntry {
            queue_depth: 2,
            capacity: 100,
            ..Default::default()
        };
        table.entries[1] = LoadEntry {
            queue_depth: 80,
            capacity: 100,
            ..Default::default()
        };

        let config = SchedulerConfig::round_robin();
        let plan = table.compute_steal_plan(&config);
        assert!(plan.is_empty(), "Round-robin should not produce steal ops");
    }

    #[test]
    fn test_steal_plan_priority_empty() {
        // Priority scheduling also uses a central queue — no steal ops.
        let mut table = LoadTable::new(4);
        table.entries[0] = LoadEntry {
            queue_depth: 2,
            capacity: 100,
            ..Default::default()
        };
        table.entries[1] = LoadEntry {
            queue_depth: 80,
            capacity: 100,
            ..Default::default()
        };

        let config = SchedulerConfig::priority(4);
        let plan = table.compute_steal_plan(&config);
        assert!(plan.is_empty(), "Priority should not produce steal ops");
    }
}