1use vyre_driver::backend::BackendError;
4
5mod cache;
6use super::planner::{MegakernelGridLimits, MegakernelGridRequest, MegakernelLaunchGeometry};
7use super::staging_reserve::try_reserve_vec_capacity;
8
/// Coarse classification of megakernel queue load for one launch.
///
/// The variant is chosen by `classify_pressure` (defined elsewhere in this module) from
/// the queue length, the number of lanes the launch provides, and the requeue count.
/// The names suggest increasing load from `Empty` to `Saturated`; the exact cut-offs
/// live in `classify_pressure`.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum MegakernelQueuePressure {
    /// No queued work.
    Empty,
    /// Light load.
    Light,
    /// Balanced load.
    Balanced,
    /// Saturated load.
    Saturated,
}
21
/// How the megakernel should execute queued work.
///
/// `MegakernelLaunchPolicy` selects `Jit` when any of the following holds: the queue
/// reaches `jit_queue_len_threshold`, hot opcodes or hot windows are promoted, or the
/// topology is `FusedDense`. Otherwise it selects `Interpreter`.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum MegakernelExecutionMode {
    /// Run queued work through the interpreter path.
    Interpreter,
    /// Run queued work through a JIT-compiled path.
    Jit,
}
30
/// Dispatch shape chosen for a launch from queue length, frontier density, memory
/// pressure, graph size, and hot-path promotion.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum MegakernelDispatchTopology {
    /// The queue is empty.
    Empty,
    /// Frontier density is at or below the sparse threshold.
    SparseFrontier,
    /// Frontier density lies between the sparse and dense thresholds.
    HybridFrontier,
    /// Frontier density is at or above the dense threshold.
    DenseFrontier,
    /// Dense frontier on a large graph with hot opcodes or hot windows promoted.
    FusedDense,
    /// Memory pressure is at or above the configured threshold.
    MemoryConstrained,
}
51
/// Schema version stamped into every `MegakernelTopologyEvidence` record;
/// `MegakernelTopologyEvidence::is_complete` rejects any other value.
pub const TOPOLOGY_EVIDENCE_SCHEMA_VERSION: u32 = 1;

/// Schema version stamped into every `MegakernelPromotionEvidence` record;
/// `MegakernelPromotionEvidence::is_complete` rejects any other value.
pub const HOT_WINDOW_PROMOTION_EVIDENCE_SCHEMA_VERSION: u32 = 1;
57
/// GraphBLAS-style sparse/dense switch class reported in topology evidence.
///
/// `FusedDense` and `DenseFrontier` topologies both map to `Dense`.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum MegakernelGraphBlasSwitchClass {
    /// Empty queue.
    Empty,
    /// Sparse frontier.
    Sparse,
    /// Hybrid frontier.
    Hybrid,
    /// Dense (or fused dense) frontier.
    Dense,
    /// Memory pressure dominated the choice.
    MemoryConstrained,
}

impl MegakernelGraphBlasSwitchClass {
    /// Returns the snake_case label for this class.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        use MegakernelGraphBlasSwitchClass as Class;
        match self {
            Class::Empty => "empty",
            Class::Sparse => "sparse",
            Class::Hybrid => "hybrid",
            Class::Dense => "dense",
            Class::MemoryConstrained => "memory_constrained",
        }
    }
}
86
87#[derive(Debug, Clone, Copy, PartialEq, Eq)]
89pub struct MegakernelTopologyEvidence {
90 pub schema_version: u32,
92 pub queue_pressure: MegakernelQueuePressure,
94 pub frontier_density_bps: u16,
96 pub semiring_frontier_density_bps: u16,
98 pub selected_topology: MegakernelDispatchTopology,
100 pub graphblas_switch_class: MegakernelGraphBlasSwitchClass,
102 pub resident_device_bytes: u64,
104 pub estimated_peak_device_bytes: u64,
106 pub output_parity_required: bool,
108}
109
110impl MegakernelTopologyEvidence {
111 #[must_use]
115 pub fn is_complete(self) -> bool {
116 self.schema_version == TOPOLOGY_EVIDENCE_SCHEMA_VERSION
117 && self.frontier_density_bps <= 10_000
118 && self.semiring_frontier_density_bps <= 10_000
119 && self.output_parity_required
120 }
121}
122
/// Route through which a launch reaches JIT execution, or `Interpreter` when it does not.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum MegakernelPromotionRoute {
    /// The launch runs in interpreter mode.
    Interpreter,
    /// JIT without opcode or window promotion (queue length or topology triggered it).
    QueueJit,
    /// JIT with hot opcodes promoted.
    OpcodeJit,
    /// JIT with hot windows promoted.
    WindowJit,
    /// JIT with both hot opcodes and hot windows promoted.
    OpcodeAndWindowJit,
}

impl MegakernelPromotionRoute {
    /// Returns the snake_case label for this route.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        use MegakernelPromotionRoute as Route;
        match self {
            Route::Interpreter => "interpreter",
            Route::QueueJit => "queue_jit",
            Route::OpcodeJit => "opcode_jit",
            Route::WindowJit => "window_jit",
            Route::OpcodeAndWindowJit => "opcode_and_window_jit",
        }
    }
}
151
152#[derive(Debug, Clone, Copy, PartialEq, Eq)]
154pub struct MegakernelPromotionEvidence {
155 pub schema_version: u32,
157 pub queue_len: u32,
159 pub jit_queue_len_threshold: u32,
161 pub hot_opcode_count: u32,
163 pub hot_opcode_threshold: u32,
165 pub hot_window_count: u32,
167 pub hot_window_threshold: u32,
169 pub execution_mode: MegakernelExecutionMode,
171 pub promote_hot_opcodes: bool,
173 pub promote_hot_windows: bool,
175 pub promotion_route: MegakernelPromotionRoute,
177 pub fused_descriptor_window_required: bool,
179 pub output_parity_required: bool,
181}
182
183impl MegakernelPromotionEvidence {
184 #[must_use]
187 pub fn is_complete(self) -> bool {
188 self.schema_version == HOT_WINDOW_PROMOTION_EVIDENCE_SCHEMA_VERSION
189 && self.jit_queue_len_threshold != 0
190 && self.hot_opcode_threshold != 0
191 && self.hot_window_threshold != 0
192 && self.fused_descriptor_window_required == self.promote_hot_windows
193 && self.output_parity_required
194 }
195}
196
/// Snapshot of the thread-local launch-recommendation cache counters.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct MegakernelLaunchCacheStats {
    /// Number of cached recommendations.
    pub entries: usize,
    /// Cache lookups that returned a recommendation.
    pub hits: u64,
    /// Cache lookups that found nothing.
    pub misses: u64,
}
207
/// Inputs to `MegakernelLaunchPolicy::recommend`.
///
/// Several fields use `0` as "unknown / not supplied". The policy then either infers
/// the value or skips the related rule.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct MegakernelLaunchRequest {
    /// Number of queued work items; `0` yields the `Empty` topology.
    pub queue_len: u32,
    /// Worker groups the caller would like; topology rules may shrink this.
    pub requested_worker_groups: u32,
    /// Device limit for workgroup size along x, passed to the grid planner.
    pub max_workgroup_size_x: u32,
    /// Device limit on workgroups per dispatch dimension, passed to the grid planner.
    pub max_compute_workgroups_per_dimension: u32,
    /// Device limit on invocations per workgroup, passed to the grid planner.
    pub max_compute_invocations_per_workgroup: u32,
    /// Explicit sparse-hit capacity; nonzero overrides the derived capacity.
    pub requested_hit_capacity: u32,
    /// Expected hits per queue item; values below 1 are treated as 1.
    pub expected_hits_per_item: u32,
    /// Hot opcode count, compared against the policy's promotion threshold.
    pub hot_opcode_count: u32,
    /// Hot window count, compared against the policy's promotion threshold.
    pub hot_window_count: u32,
    /// Requeues observed; any nonzero value enables age-priority work.
    pub requeue_count: u64,
    /// Oldest priority age observed, compared against the policy's age threshold.
    pub max_priority_age: u32,
    /// Resident graph node count; `0` means unknown.
    pub graph_node_count: u32,
    /// Resident graph edge count, used for fused-dense eligibility.
    pub graph_edge_count: u32,
    /// Frontier density in basis points; `0` may be inferred from the queue and node count.
    pub frontier_density_bps: u16,
    /// Memory pressure in basis points; `0` may be inferred from resident bytes and budget.
    pub memory_pressure_bps: u16,
    /// Device bytes already resident before the launch.
    pub resident_device_bytes: u64,
    /// Device memory budget; `0` disables the out-of-memory check.
    pub device_memory_budget_bytes: u64,
}

impl MegakernelLaunchRequest {
    /// Builds a minimal request from queue length, worker groups, and workgroup width.
    ///
    /// The per-dimension workgroup limit is set to `requested_worker_groups` and the
    /// per-workgroup invocation limit to `max_workgroup_size_x`.
    /// `expected_hits_per_item` is 1; every other signal is zero (unknown).
    #[must_use]
    pub const fn direct(
        queue_len: u32,
        requested_worker_groups: u32,
        max_workgroup_size_x: u32,
    ) -> Self {
        Self {
            // Caller-supplied shape; device limits mirror the request.
            queue_len,
            requested_worker_groups,
            max_workgroup_size_x,
            max_compute_workgroups_per_dimension: requested_worker_groups,
            max_compute_invocations_per_workgroup: max_workgroup_size_x,
            // Hit sizing: derived capacity, one hit per item.
            requested_hit_capacity: 0,
            expected_hits_per_item: 1,
            // No promotion, priority, graph, or memory telemetry.
            hot_opcode_count: 0,
            hot_window_count: 0,
            requeue_count: 0,
            max_priority_age: 0,
            graph_node_count: 0,
            graph_edge_count: 0,
            frontier_density_bps: 0,
            memory_pressure_bps: 0,
            resident_device_bytes: 0,
            device_memory_budget_bytes: 0,
        }
    }
}
278
279#[derive(Debug, Clone, Copy, PartialEq, Eq)]
281pub struct MegakernelLaunchRecommendation {
282 pub geometry: MegakernelLaunchGeometry,
284 pub worker_groups: u32,
286 pub hit_capacity: u32,
288 pub pressure: MegakernelQueuePressure,
290 pub execution_mode: MegakernelExecutionMode,
292 pub topology: MegakernelDispatchTopology,
295 pub promote_hot_opcodes: bool,
297 pub promote_hot_windows: bool,
299 pub age_priority_work: bool,
301 pub estimated_peak_device_bytes: u64,
303 pub device_memory_budget_bytes: u64,
305}
306
/// Running priority-scheduler counters; all zero by default.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct PriorityRequeueAccounting {
    /// Number of requeues recorded.
    pub requeue_count: u64,
    /// Number of aged promotions recorded.
    pub aged_promotions: u64,
    /// Largest age (in ticks) seen by any recorded event.
    pub max_priority_age: u32,
}
317
/// Remaining counter headroom (distance to `u64::MAX`) at or below which a drain is
/// recommended as "near limit".
pub const PRIORITY_COUNTER_DRAIN_HEADROOM: u64 = 1024;

/// Remediation hint attached to every `PriorityDrainRecommendation`.
pub const PRIORITY_COUNTER_DRAIN_FIX: &str =
    "drain scheduler telemetry before counters reach u64::MAX";
324
/// Why `PriorityRequeueAccounting::drain_recommendation` does or does not request a drain.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum PriorityDrainReason {
    /// Every counter is zero.
    None,
    /// Some counter is nonzero but none is near `u64::MAX`.
    PendingTelemetry,
    /// `requeue_count` is within `PRIORITY_COUNTER_DRAIN_HEADROOM` of `u64::MAX`.
    RequeueCounterNearLimit,
    /// `aged_promotions` is within `PRIORITY_COUNTER_DRAIN_HEADROOM` of `u64::MAX`.
    AgedPromotionCounterNearLimit,
    /// `requeue_count` equals `u64::MAX`.
    RequeueCounterExhausted,
    /// `aged_promotions` equals `u64::MAX`.
    AgedPromotionCounterExhausted,
}

impl PriorityDrainReason {
    /// Returns the snake_case label for this reason.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        use PriorityDrainReason as Reason;
        match self {
            Reason::None => "none",
            Reason::PendingTelemetry => "pending_telemetry",
            Reason::RequeueCounterNearLimit => "requeue_counter_near_limit",
            Reason::AgedPromotionCounterNearLimit => "aged_promotion_counter_near_limit",
            Reason::RequeueCounterExhausted => "requeue_counter_exhausted",
            Reason::AgedPromotionCounterExhausted => "aged_promotion_counter_exhausted",
        }
    }
}
356
357#[derive(Debug, Clone, Copy, PartialEq, Eq)]
359pub struct PriorityDrainRecommendation {
360 pub should_drain: bool,
362 pub reason: PriorityDrainReason,
364 pub requeue_count: u64,
366 pub aged_promotions: u64,
368 pub max_priority_age: u32,
370 pub requeue_counter_headroom: u64,
372 pub aged_promotion_counter_headroom: u64,
374 pub fix: &'static str,
376}
377
378impl PriorityRequeueAccounting {
379 #[must_use]
381 pub fn drain_recommendation(self) -> PriorityDrainRecommendation {
382 let requeue_counter_headroom = u64::MAX.saturating_sub(self.requeue_count);
383 let aged_promotion_counter_headroom = u64::MAX.saturating_sub(self.aged_promotions);
384 let reason = if self.requeue_count == u64::MAX {
385 PriorityDrainReason::RequeueCounterExhausted
386 } else if self.aged_promotions == u64::MAX {
387 PriorityDrainReason::AgedPromotionCounterExhausted
388 } else if requeue_counter_headroom <= PRIORITY_COUNTER_DRAIN_HEADROOM {
389 PriorityDrainReason::RequeueCounterNearLimit
390 } else if aged_promotion_counter_headroom <= PRIORITY_COUNTER_DRAIN_HEADROOM {
391 PriorityDrainReason::AgedPromotionCounterNearLimit
392 } else if self.requeue_count != 0 || self.aged_promotions != 0 || self.max_priority_age != 0
393 {
394 PriorityDrainReason::PendingTelemetry
395 } else {
396 PriorityDrainReason::None
397 };
398 PriorityDrainRecommendation {
399 should_drain: reason != PriorityDrainReason::None,
400 reason,
401 requeue_count: self.requeue_count,
402 aged_promotions: self.aged_promotions,
403 max_priority_age: self.max_priority_age,
404 requeue_counter_headroom,
405 aged_promotion_counter_headroom,
406 fix: PRIORITY_COUNTER_DRAIN_FIX,
407 }
408 }
409
410 pub fn record_requeue(&mut self, age_ticks: u32) {
412 self.requeue_count = self.requeue_count.saturating_add(1);
413 self.max_priority_age = self.max_priority_age.max(age_ticks);
414 }
415
416 pub fn try_record_requeue(&mut self, age_ticks: u32) -> Result<(), BackendError> {
422 self.requeue_count = self.requeue_count.checked_add(1).ok_or_else(|| {
423 BackendError::new(
424 "megakernel priority requeue_count overflowed u64. Fix: drain scheduler telemetry before counters reach u64::MAX.",
425 )
426 })?;
427 self.max_priority_age = self.max_priority_age.max(age_ticks);
428 Ok(())
429 }
430
431 pub fn record_aged_promotion(&mut self, age_ticks: u32) {
433 self.aged_promotions = self.aged_promotions.saturating_add(1);
434 self.max_priority_age = self.max_priority_age.max(age_ticks);
435 }
436
437 pub fn try_record_aged_promotion(&mut self, age_ticks: u32) -> Result<(), BackendError> {
443 self.aged_promotions = self.aged_promotions.checked_add(1).ok_or_else(|| {
444 BackendError::new(
445 "megakernel aged_promotions overflowed u64. Fix: drain scheduler telemetry before counters reach u64::MAX.",
446 )
447 })?;
448 self.max_priority_age = self.max_priority_age.max(age_ticks);
449 Ok(())
450 }
451}
452
/// Panicking convenience wrapper around [`try_diffuse_priority_across_siblings`].
///
/// Only compiled for tests or with the `legacy-infallible` feature.
///
/// # Panics
///
/// Panics with an actionable message if the fallible variant returns an error.
#[must_use]
#[cfg(any(test, feature = "legacy-infallible"))]
pub fn diffuse_priority_across_siblings(
    priority_stalks: &[f64],
    restriction_diag: &[f64],
    damping: f64,
    iterations: u32,
) -> Vec<f64> {
    match try_diffuse_priority_across_siblings(priority_stalks, restriction_diag, damping, iterations)
    {
        Ok(diffused) => diffused,
        Err(source) => panic!(
            "megakernel priority diffusion allocation failed: {source}. Fix: shard the priority sibling set before diffusion."
        ),
    }
}
481
482pub fn try_diffuse_priority_across_siblings(
490 priority_stalks: &[f64],
491 restriction_diag: &[f64],
492 damping: f64,
493 iterations: u32,
494) -> Result<Vec<f64>, BackendError> {
495 let mut current = Vec::new();
496 let mut next = Vec::new();
497 try_diffuse_priority_across_siblings_into(
498 priority_stalks,
499 restriction_diag,
500 damping,
501 iterations,
502 &mut current,
503 &mut next,
504 )?;
505 Ok(current)
506}
507
/// Panicking convenience wrapper around [`try_diffuse_priority_across_siblings_into`].
///
/// Only compiled for tests or with the `legacy-infallible` feature.
///
/// # Panics
///
/// Panics with an actionable message if the fallible variant returns an error.
#[cfg(any(test, feature = "legacy-infallible"))]
pub fn diffuse_priority_across_siblings_into(
    priority_stalks: &[f64],
    restriction_diag: &[f64],
    damping: f64,
    iterations: u32,
    out: &mut Vec<f64>,
    scratch: &mut Vec<f64>,
) {
    let outcome = try_diffuse_priority_across_siblings_into(
        priority_stalks,
        restriction_diag,
        damping,
        iterations,
        out,
        scratch,
    );
    if let Err(source) = outcome {
        panic!(
            "megakernel priority diffusion allocation failed: {source}. Fix: shard the priority sibling set before diffusion."
        );
    }
}
532
533pub fn try_diffuse_priority_across_siblings_into(
540 priority_stalks: &[f64],
541 restriction_diag: &[f64],
542 damping: f64,
543 iterations: u32,
544 out: &mut Vec<f64>,
545 scratch: &mut Vec<f64>,
546) -> Result<(), BackendError> {
547 out.clear();
548 reserve_target_capacity(out, priority_stalks.len(), "priority diffusion output")?;
549 out.extend_from_slice(priority_stalks);
550 scratch.clear();
551 if priority_stalks.len() != restriction_diag.len() {
552 return Ok(());
553 }
554 for _ in 0..iterations {
555 diffuse_step_into(out, restriction_diag, damping, scratch)?;
556 std::mem::swap(out, scratch);
557 }
558 Ok(())
559}
560
561#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
563pub struct MegakernelLaunchPolicy {
564 pub sizing: super::planner::MegakernelSizingPolicy,
566 pub min_hit_capacity: u32,
568 pub hit_capacity_multiplier: u32,
570 pub saturated_waves: u32,
572 pub hot_opcode_threshold: u32,
574 pub hot_window_threshold: u32,
576 pub jit_queue_len_threshold: u32,
578 pub priority_age_threshold: u32,
580 pub sparse_frontier_threshold_bps: u16,
582 pub dense_frontier_threshold_bps: u16,
584 pub memory_pressure_threshold_bps: u16,
586 pub fusion_edge_threshold: u32,
588 pub scratch_bytes_per_hit: u32,
590}
591
592impl Default for MegakernelLaunchPolicy {
593 fn default() -> Self {
594 Self::standard()
595 }
596}
597
/// Extra frontier-density band (bps) a previous topology is kept across before switching.
const FRONTIER_TOPOLOGY_HYSTERESIS_BPS: u16 = 250;
/// Memory-pressure band (bps) below the threshold in which `MemoryConstrained` is kept.
const MEMORY_TOPOLOGY_HYSTERESIS_BPS: u16 = 250;
600
601impl MegakernelLaunchPolicy {
602 #[must_use]
604 pub const fn standard() -> Self {
605 Self {
606 sizing: super::planner::MegakernelSizingPolicy::standard(),
607 min_hit_capacity: 1024,
608 hit_capacity_multiplier: 2,
609 saturated_waves: 4,
610 hot_opcode_threshold: 8,
611 hot_window_threshold: 4,
612 jit_queue_len_threshold: 4096,
613 priority_age_threshold: 32,
614 sparse_frontier_threshold_bps: 500,
615 dense_frontier_threshold_bps: 4_000,
616 memory_pressure_threshold_bps: 8_500,
617 fusion_edge_threshold: 65_536,
618 scratch_bytes_per_hit: 16,
619 }
620 }
621
622 #[must_use]
624 pub fn launch_cache_stats() -> MegakernelLaunchCacheStats {
625 cache::LAUNCH_RECOMMENDATION_CACHE.with(|cache| cache.borrow().stats())
626 }
627
628 pub fn reset_launch_cache_for_thread() {
630 cache::LAUNCH_RECOMMENDATION_CACHE.with(|cache| cache.borrow_mut().clear());
631 }
632
633 pub fn recommend(
640 &self,
641 request: MegakernelLaunchRequest,
642 ) -> Result<MegakernelLaunchRecommendation, BackendError> {
643 self.recommend_inner(request, None)
644 }
645
646 pub fn recommend_with_topology_evidence(
653 &self,
654 request: MegakernelLaunchRequest,
655 ) -> Result<(MegakernelLaunchRecommendation, MegakernelTopologyEvidence), BackendError> {
656 let (effective_request, recommendation) = self.recommend_with_effective_request(request)?;
657 let evidence = self.topology_evidence_for(effective_request, recommendation);
658 Ok((recommendation, evidence))
659 }
660
661 pub fn recommend_with_promotion_evidence(
668 &self,
669 request: MegakernelLaunchRequest,
670 ) -> Result<(MegakernelLaunchRecommendation, MegakernelPromotionEvidence), BackendError> {
671 let (effective_request, recommendation) = self.recommend_with_effective_request(request)?;
672 let evidence = self.promotion_evidence_for(effective_request, recommendation);
673 Ok((recommendation, evidence))
674 }
675
676 pub fn recommend_with_previous_topology(
690 &self,
691 request: MegakernelLaunchRequest,
692 previous_topology: MegakernelDispatchTopology,
693 ) -> Result<MegakernelLaunchRecommendation, BackendError> {
694 self.recommend_inner(request, Some(previous_topology))
695 }
696
697 fn recommend_inner(
698 &self,
699 request: MegakernelLaunchRequest,
700 previous_topology: Option<MegakernelDispatchTopology>,
701 ) -> Result<MegakernelLaunchRecommendation, BackendError> {
702 let cache_key = cache::LaunchRecommendationCacheKey {
703 policy: *self,
704 request,
705 };
706 if previous_topology.is_none() {
707 if let Some(cached) =
708 cache::LAUNCH_RECOMMENDATION_CACHE.with(|cache| cache.borrow_mut().get(&cache_key))
709 {
710 return Ok(cached);
711 }
712 }
713
714 let effective_request = self.infer_missing_scale_signals(request)?;
715 let promote_hot_opcodes = effective_request.hot_opcode_count >= self.hot_opcode_threshold;
716 let promote_hot_windows = effective_request.hot_window_count >= self.hot_window_threshold;
717 let raw_topology =
718 self.dispatch_topology_for(effective_request, promote_hot_opcodes, promote_hot_windows);
719 let topology = self.stabilize_topology(
720 raw_topology,
721 effective_request,
722 previous_topology,
723 promote_hot_opcodes,
724 promote_hot_windows,
725 );
726 let scheduled_request = self.apply_topology_worker_policy(effective_request, topology)?;
727 let grid = self.sizing.calculate_optimal_grid(
728 MegakernelGridRequest::new(
729 scheduled_request.queue_len,
730 scheduled_request.requested_worker_groups,
731 ),
732 MegakernelGridLimits::new(
733 scheduled_request.max_workgroup_size_x,
734 scheduled_request.max_compute_workgroups_per_dimension,
735 scheduled_request.max_compute_invocations_per_workgroup,
736 ),
737 )?;
738 let geometry = grid.geometry;
739 let worker_groups = grid.worker_groups;
740 let lanes = u64::from(geometry.dispatch_grid[0])
741 .checked_mul(u64::from(geometry.workgroup_size_x))
742 .ok_or_else(|| {
743 BackendError::new(
744 "megakernel launch lane count overflowed u64. Fix: reduce dispatch grid or workgroup size.",
745 )
746 })?;
747 let pressure = classify_pressure(
748 effective_request.queue_len,
749 lanes,
750 effective_request.requeue_count,
751 self,
752 )?;
753 let hit_capacity = self.hit_capacity_for(effective_request)?;
754 let estimated_peak_device_bytes =
755 self.estimated_peak_device_bytes(effective_request, hit_capacity)?;
756 if effective_request.device_memory_budget_bytes != 0
757 && estimated_peak_device_bytes > effective_request.device_memory_budget_bytes
758 {
759 return Err(BackendError::DeviceOutOfMemory {
760 requested: estimated_peak_device_bytes,
761 available: effective_request.device_memory_budget_bytes,
762 });
763 }
764 let execution_mode = if effective_request.queue_len >= self.jit_queue_len_threshold
765 || promote_hot_opcodes
766 || promote_hot_windows
767 || topology == MegakernelDispatchTopology::FusedDense
768 {
769 MegakernelExecutionMode::Jit
770 } else {
771 MegakernelExecutionMode::Interpreter
772 };
773 let age_priority_work = effective_request.requeue_count > 0
774 || effective_request.max_priority_age >= self.priority_age_threshold;
775
776 let recommendation = MegakernelLaunchRecommendation {
777 geometry,
778 worker_groups,
779 hit_capacity,
780 pressure,
781 execution_mode,
782 topology,
783 promote_hot_opcodes,
784 promote_hot_windows,
785 age_priority_work,
786 estimated_peak_device_bytes,
787 device_memory_budget_bytes: effective_request.device_memory_budget_bytes,
788 };
789 if previous_topology.is_none() {
790 cache::LAUNCH_RECOMMENDATION_CACHE.with(|cache| {
791 cache.borrow_mut().insert(cache_key, recommendation);
792 });
793 }
794 Ok(recommendation)
795 }
796
797 fn recommend_with_effective_request(
798 &self,
799 request: MegakernelLaunchRequest,
800 ) -> Result<(MegakernelLaunchRequest, MegakernelLaunchRecommendation), BackendError> {
801 let effective_request = self.infer_missing_scale_signals(request)?;
802 let recommendation = self.recommend(effective_request)?;
803 Ok((effective_request, recommendation))
804 }
805
806 fn topology_evidence_for(
807 &self,
808 request: MegakernelLaunchRequest,
809 recommendation: MegakernelLaunchRecommendation,
810 ) -> MegakernelTopologyEvidence {
811 MegakernelTopologyEvidence {
812 schema_version: TOPOLOGY_EVIDENCE_SCHEMA_VERSION,
813 queue_pressure: recommendation.pressure,
814 frontier_density_bps: request.frontier_density_bps,
815 semiring_frontier_density_bps: request.frontier_density_bps,
816 selected_topology: recommendation.topology,
817 graphblas_switch_class: Self::graphblas_switch_class_for(recommendation.topology),
818 resident_device_bytes: request.resident_device_bytes,
819 estimated_peak_device_bytes: recommendation.estimated_peak_device_bytes,
820 output_parity_required: true,
821 }
822 }
823
824 fn promotion_evidence_for(
825 &self,
826 request: MegakernelLaunchRequest,
827 recommendation: MegakernelLaunchRecommendation,
828 ) -> MegakernelPromotionEvidence {
829 MegakernelPromotionEvidence {
830 schema_version: HOT_WINDOW_PROMOTION_EVIDENCE_SCHEMA_VERSION,
831 queue_len: request.queue_len,
832 jit_queue_len_threshold: self.jit_queue_len_threshold,
833 hot_opcode_count: request.hot_opcode_count,
834 hot_opcode_threshold: self.hot_opcode_threshold,
835 hot_window_count: request.hot_window_count,
836 hot_window_threshold: self.hot_window_threshold,
837 execution_mode: recommendation.execution_mode,
838 promote_hot_opcodes: recommendation.promote_hot_opcodes,
839 promote_hot_windows: recommendation.promote_hot_windows,
840 promotion_route: Self::promotion_route_for(recommendation),
841 fused_descriptor_window_required: recommendation.promote_hot_windows,
842 output_parity_required: true,
843 }
844 }
845
846 fn promotion_route_for(
847 recommendation: MegakernelLaunchRecommendation,
848 ) -> MegakernelPromotionRoute {
849 if recommendation.execution_mode == MegakernelExecutionMode::Interpreter {
850 return MegakernelPromotionRoute::Interpreter;
851 }
852 match (
853 recommendation.promote_hot_opcodes,
854 recommendation.promote_hot_windows,
855 ) {
856 (true, true) => MegakernelPromotionRoute::OpcodeAndWindowJit,
857 (true, false) => MegakernelPromotionRoute::OpcodeJit,
858 (false, true) => MegakernelPromotionRoute::WindowJit,
859 (false, false) => MegakernelPromotionRoute::QueueJit,
860 }
861 }
862
863 fn graphblas_switch_class_for(
864 topology: MegakernelDispatchTopology,
865 ) -> MegakernelGraphBlasSwitchClass {
866 match topology {
867 MegakernelDispatchTopology::Empty => MegakernelGraphBlasSwitchClass::Empty,
868 MegakernelDispatchTopology::SparseFrontier => MegakernelGraphBlasSwitchClass::Sparse,
869 MegakernelDispatchTopology::HybridFrontier => MegakernelGraphBlasSwitchClass::Hybrid,
870 MegakernelDispatchTopology::DenseFrontier
871 | MegakernelDispatchTopology::FusedDense => MegakernelGraphBlasSwitchClass::Dense,
872 MegakernelDispatchTopology::MemoryConstrained => {
873 MegakernelGraphBlasSwitchClass::MemoryConstrained
874 }
875 }
876 }
877
878 fn hit_capacity_for(&self, request: MegakernelLaunchRequest) -> Result<u32, BackendError> {
879 if request.requested_hit_capacity != 0 {
880 return Ok(request.requested_hit_capacity);
881 }
882 let expected_hits = request.expected_hits_per_item.max(1);
883 let multiplier = if request.memory_pressure_bps >= self.memory_pressure_threshold_bps {
884 1
885 } else {
886 self.hit_capacity_multiplier
887 };
888 let derived = request
889 .queue_len
890 .checked_mul(expected_hits)
891 .and_then(|value| value.checked_mul(multiplier))
892 .ok_or_else(|| {
893 BackendError::new(
894 "megakernel sparse-hit capacity overflowed u32. Fix: lower queue length, expected_hits_per_item, or hit_capacity_multiplier.",
895 )
896 })?;
897 Ok(derived.max(self.min_hit_capacity))
898 }
899
900 fn estimated_peak_device_bytes(
901 &self,
902 request: MegakernelLaunchRequest,
903 hit_capacity: u32,
904 ) -> Result<u64, BackendError> {
905 let scratch_bytes = u64::from(hit_capacity)
906 .checked_mul(u64::from(self.scratch_bytes_per_hit))
907 .ok_or_else(|| {
908 BackendError::new(
909 "megakernel scratch byte estimate overflowed u64. Fix: lower hit capacity or scratch_bytes_per_hit.",
910 )
911 })?;
912 request
913 .resident_device_bytes
914 .checked_add(scratch_bytes)
915 .ok_or_else(|| {
916 BackendError::new(
917 "megakernel peak resident byte estimate overflowed u64. Fix: reduce resident buffers or scratch capacity.",
918 )
919 })
920 }
921
922 fn infer_missing_scale_signals(
923 &self,
924 mut request: MegakernelLaunchRequest,
925 ) -> Result<MegakernelLaunchRequest, BackendError> {
926 if request.frontier_density_bps == 0
927 && request.queue_len != 0
928 && request.graph_node_count != 0
929 {
930 let active_nodes = u64::from(request.queue_len.min(request.graph_node_count));
931 let density = active_nodes
932 .checked_mul(10_000)
933 .ok_or_else(|| {
934 BackendError::new(
935 "megakernel frontier-density numerator overflowed u64. Fix: shard the resident graph before launch.",
936 )
937 })?
938 .checked_div(u64::from(request.graph_node_count))
939 .unwrap_or(0)
940 .clamp(1, 10_000);
941 request.frontier_density_bps = u16::try_from(density).map_err(|error| {
942 BackendError::new(format!(
943 "megakernel frontier density cannot fit u16: {error}. Fix: clamp density before ABI encoding."
944 ))
945 })?;
946 }
947 if request.memory_pressure_bps == 0
948 && request.device_memory_budget_bytes != 0
949 && request.resident_device_bytes != 0
950 {
951 let pressure = (u128::from(request.resident_device_bytes)
952 .checked_mul(10_000)
953 .ok_or_else(|| {
954 BackendError::new(
955 "megakernel memory-pressure numerator overflowed u128. Fix: reduce resident device bytes before launch.",
956 )
957 })?
958 / u128::from(request.device_memory_budget_bytes))
959 .min(10_000);
960 request.memory_pressure_bps = u16::try_from(pressure).map_err(|error| {
961 BackendError::new(format!(
962 "megakernel memory pressure cannot fit u16: {error}. Fix: clamp pressure before ABI encoding."
963 ))
964 })?;
965 }
966 Ok(request)
967 }
968
969 fn apply_topology_worker_policy(
970 &self,
971 mut request: MegakernelLaunchRequest,
972 topology: MegakernelDispatchTopology,
973 ) -> Result<MegakernelLaunchRequest, BackendError> {
974 if topology == MegakernelDispatchTopology::MemoryConstrained
975 && request.memory_pressure_bps != 0
976 && request.requested_worker_groups > 1
977 {
978 let pressure_span = u32::from(
979 10_000_u16
980 .checked_sub(self.memory_pressure_threshold_bps)
981 .ok_or_else(|| {
982 BackendError::new(
983 "megakernel memory-pressure threshold exceeds 10000 bps. Fix: configure threshold in basis points.",
984 )
985 })?,
986 )
987 .max(1);
988 let over_threshold = u32::from(
989 match request
990 .memory_pressure_bps
991 .checked_sub(self.memory_pressure_threshold_bps)
992 {
993 Some(value) => value,
994 None => 0,
995 },
996 )
997 .min(pressure_span);
998 let shed_bps = 2_500_u32
999 .checked_add(
1000 over_threshold
1001 .checked_mul(2_500)
1002 .ok_or_else(|| {
1003 BackendError::new(
1004 "megakernel memory-pressure worker shed overflowed u32. Fix: lower pressure telemetry before launch.",
1005 )
1006 })?
1007 / pressure_span,
1008 )
1009 .ok_or_else(|| {
1010 BackendError::new(
1011 "megakernel memory-pressure worker shed overflowed u32. Fix: lower pressure telemetry before launch.",
1012 )
1013 })?;
1014 let keep_bps = 10_000_u32.checked_sub(shed_bps).ok_or_else(|| {
1015 BackendError::new(
1016 "megakernel memory-pressure worker keep ratio underflowed. Fix: keep shed_bps within 0..=10000.",
1017 )
1018 })?;
1019 let scaled = u64::from(request.requested_worker_groups)
1020 .checked_mul(u64::from(keep_bps))
1021 .ok_or_else(|| {
1022 BackendError::new(
1023 "megakernel memory-constrained worker count overflowed u64. Fix: reduce requested worker groups.",
1024 )
1025 })?
1026 / 10_000;
1027 request.requested_worker_groups = u32::try_from(scaled)
1028 .map_err(|error| {
1029 BackendError::new(format!(
1030 "megakernel memory-constrained worker count cannot fit u32: {error}. Fix: reduce requested worker groups."
1031 ))
1032 })?
1033 .max(1);
1034 }
1035 if topology == MegakernelDispatchTopology::SparseFrontier
1036 && request.graph_node_count != 0
1037 && request.frontier_density_bps != 0
1038 && request.requested_worker_groups > 1
1039 {
1040 let sparse_span = u32::from(self.sparse_frontier_threshold_bps).max(1);
1041 let density = u32::from(request.frontier_density_bps).clamp(1, sparse_span);
1042 let scaled = u64::from(request.requested_worker_groups)
1043 .checked_mul(u64::from(density))
1044 .ok_or_else(|| {
1045 BackendError::new(
1046 "megakernel sparse-frontier worker count overflowed u64. Fix: reduce requested worker groups.",
1047 )
1048 })?
1049 / u64::from(sparse_span);
1050 let warp_floor = request.requested_worker_groups.min(32);
1051 request.requested_worker_groups = u32::try_from(scaled)
1052 .map_err(|error| {
1053 BackendError::new(format!(
1054 "megakernel sparse-frontier worker count cannot fit u32: {error}. Fix: reduce requested worker groups."
1055 ))
1056 })?
1057 .max(warp_floor)
1058 .min(request.requested_worker_groups);
1059 }
1060 Ok(request)
1061 }
1062
1063 fn dispatch_topology_for(
1064 &self,
1065 request: MegakernelLaunchRequest,
1066 promote_hot_opcodes: bool,
1067 promote_hot_windows: bool,
1068 ) -> MegakernelDispatchTopology {
1069 if request.queue_len == 0 {
1070 return MegakernelDispatchTopology::Empty;
1071 }
1072 if request.memory_pressure_bps >= self.memory_pressure_threshold_bps {
1073 return MegakernelDispatchTopology::MemoryConstrained;
1074 }
1075 if request.frontier_density_bps <= self.sparse_frontier_threshold_bps {
1076 return MegakernelDispatchTopology::SparseFrontier;
1077 }
1078 let dense = request.frontier_density_bps >= self.dense_frontier_threshold_bps;
1079 let graph_is_large =
1080 request.graph_node_count > 0 && request.graph_edge_count >= self.fusion_edge_threshold;
1081 if dense && graph_is_large && (promote_hot_opcodes || promote_hot_windows) {
1082 return MegakernelDispatchTopology::FusedDense;
1083 }
1084 if dense {
1085 return MegakernelDispatchTopology::DenseFrontier;
1086 }
1087 MegakernelDispatchTopology::HybridFrontier
1088 }
1089
1090 fn stabilize_topology(
1091 &self,
1092 raw_topology: MegakernelDispatchTopology,
1093 request: MegakernelLaunchRequest,
1094 previous_topology: Option<MegakernelDispatchTopology>,
1095 promote_hot_opcodes: bool,
1096 promote_hot_windows: bool,
1097 ) -> MegakernelDispatchTopology {
1098 if raw_topology == MegakernelDispatchTopology::Empty {
1099 return raw_topology;
1100 }
1101 if raw_topology == MegakernelDispatchTopology::MemoryConstrained {
1102 return raw_topology;
1103 }
1104 let Some(previous_topology) = previous_topology else {
1105 return raw_topology;
1106 };
1107 if previous_topology == MegakernelDispatchTopology::MemoryConstrained
1108 && request.memory_pressure_bps
1109 >= hysteresis_sub(
1110 self.memory_pressure_threshold_bps,
1111 MEMORY_TOPOLOGY_HYSTERESIS_BPS,
1112 )
1113 {
1114 return MegakernelDispatchTopology::MemoryConstrained;
1115 }
1116
1117 match previous_topology {
1118 MegakernelDispatchTopology::SparseFrontier
1119 if raw_topology != MegakernelDispatchTopology::SparseFrontier
1120 && request.frontier_density_bps
1121 <= hysteresis_add(
1122 self.sparse_frontier_threshold_bps,
1123 FRONTIER_TOPOLOGY_HYSTERESIS_BPS,
1124 ) =>
1125 {
1126 MegakernelDispatchTopology::SparseFrontier
1127 }
1128 MegakernelDispatchTopology::HybridFrontier
1129 if raw_topology == MegakernelDispatchTopology::SparseFrontier
1130 && request.frontier_density_bps
1131 >= hysteresis_sub(
1132 self.sparse_frontier_threshold_bps,
1133 FRONTIER_TOPOLOGY_HYSTERESIS_BPS,
1134 ) =>
1135 {
1136 MegakernelDispatchTopology::HybridFrontier
1137 }
1138 MegakernelDispatchTopology::HybridFrontier
1139 if matches!(
1140 raw_topology,
1141 MegakernelDispatchTopology::DenseFrontier
1142 | MegakernelDispatchTopology::FusedDense
1143 ) && request.frontier_density_bps
1144 <= hysteresis_add(
1145 self.dense_frontier_threshold_bps,
1146 FRONTIER_TOPOLOGY_HYSTERESIS_BPS,
1147 ) =>
1148 {
1149 MegakernelDispatchTopology::HybridFrontier
1150 }
1151 MegakernelDispatchTopology::DenseFrontier
1152 if raw_topology == MegakernelDispatchTopology::HybridFrontier
1153 && request.frontier_density_bps
1154 >= hysteresis_sub(
1155 self.dense_frontier_threshold_bps,
1156 FRONTIER_TOPOLOGY_HYSTERESIS_BPS,
1157 ) =>
1158 {
1159 MegakernelDispatchTopology::DenseFrontier
1160 }
1161 MegakernelDispatchTopology::FusedDense
1162 if raw_topology == MegakernelDispatchTopology::HybridFrontier
1163 && request.frontier_density_bps
1164 >= hysteresis_sub(
1165 self.dense_frontier_threshold_bps,
1166 FRONTIER_TOPOLOGY_HYSTERESIS_BPS,
1167 )
1168 && request.graph_edge_count >= self.fusion_edge_threshold
1169 && (promote_hot_opcodes || promote_hot_windows) =>
1170 {
1171 MegakernelDispatchTopology::FusedDense
1172 }
1173 _ => raw_topology,
1174 }
1175 }
1176
1177 #[must_use]
1188 pub fn autotune_hit_capacity_multiplier(
1189 &self,
1190 candidate_multipliers: &[u32],
1191 costs: &[f64],
1192 ) -> u32 {
1193 if candidate_multipliers.is_empty() || costs.is_empty() {
1194 return self.hit_capacity_multiplier;
1195 }
1196 let n = candidate_multipliers.len().min(costs.len());
1197 let chosen = best_cost_index(&costs[..n]);
1198 candidate_multipliers
1199 .get(chosen)
1200 .copied()
1201 .unwrap_or(self.hit_capacity_multiplier)
1202 }
1203
1204 #[must_use]
1210 pub fn autotune_workgroup_size(
1211 &self,
1212 candidate_sizes: &[u32],
1213 costs: &[f64],
1214 current_size: u32,
1215 ) -> u32 {
1216 if candidate_sizes.is_empty() || costs.is_empty() {
1217 return current_size;
1218 }
1219 let n = candidate_sizes.len().min(costs.len());
1220 let chosen = best_cost_index(&costs[..n]);
1221 candidate_sizes.get(chosen).copied().unwrap_or(current_size)
1222 }
1223
1224 #[must_use]
1241 #[cfg(any(test, feature = "legacy-infallible"))]
1242 pub fn natural_gradient_autotune_step(
1243 m_inv_sqrt: &[f64],
1244 grad: &[f64],
1245 n: u32,
1246 learning_rate: f64,
1247 ) -> Vec<f64> {
1248 Self::try_natural_gradient_autotune_step(m_inv_sqrt, grad, n, learning_rate)
1249 .unwrap_or_else(|source| {
1250 panic!(
1251 "megakernel natural-gradient autotune allocation failed: {source}. Fix: shard the autotune surface."
1252 )
1253 })
1254 }
1255
1256 pub fn try_natural_gradient_autotune_step(
1263 m_inv_sqrt: &[f64],
1264 grad: &[f64],
1265 n: u32,
1266 learning_rate: f64,
1267 ) -> Result<Vec<f64>, BackendError> {
1268 let mut out = Vec::new();
1269 Self::try_natural_gradient_autotune_step_into(
1270 m_inv_sqrt,
1271 grad,
1272 n,
1273 learning_rate,
1274 &mut out,
1275 )?;
1276 Ok(out)
1277 }
1278
1279 #[cfg(any(test, feature = "legacy-infallible"))]
1281 pub fn natural_gradient_autotune_step_into(
1282 m_inv_sqrt: &[f64],
1283 grad: &[f64],
1284 n: u32,
1285 learning_rate: f64,
1286 out: &mut Vec<f64>,
1287 ) {
1288 Self::try_natural_gradient_autotune_step_into(m_inv_sqrt, grad, n, learning_rate, out)
1289 .unwrap_or_else(|source| {
1290 panic!(
1291 "megakernel natural-gradient autotune allocation failed: {source}. Fix: shard the autotune surface."
1292 )
1293 });
1294 }
1295
1296 pub fn try_natural_gradient_autotune_step_into(
1304 m_inv_sqrt: &[f64],
1305 grad: &[f64],
1306 n: u32,
1307 learning_rate: f64,
1308 out: &mut Vec<f64>,
1309 ) -> Result<(), BackendError> {
1310 let n = u32_to_usize_checked(n, "natural-gradient dimension")?;
1311 out.clear();
1312 let Some(required_matrix_len) = n.checked_mul(n) else {
1313 return Ok(());
1314 };
1315 if m_inv_sqrt.len() < required_matrix_len || grad.len() < n {
1316 return Ok(());
1317 }
1318 reserve_target_capacity(out, n, "natural-gradient output")?;
1319 out.resize(n, 0.0);
1320 for row in 0..n {
1321 let mut acc = 0.0;
1322 for col in 0..n {
1323 acc += m_inv_sqrt[row * n + col] * grad[col];
1324 }
1325 out[row] = -learning_rate * acc;
1326 }
1327 Ok(())
1328 }
1329}
1330
1331fn diffuse_step_into(
1332 stalks: &[f64],
1333 restriction_diag: &[f64],
1334 damping: f64,
1335 out: &mut Vec<f64>,
1336) -> Result<(), BackendError> {
1337 out.clear();
1338 reserve_target_capacity(out, stalks.len(), "priority diffusion scratch")?;
1339 out.resize(stalks.len(), 0.0);
1340 for ((slot, &stalk), &restriction) in out
1341 .iter_mut()
1342 .zip(stalks.iter())
1343 .zip(restriction_diag.iter())
1344 {
1345 *slot = stalk - damping * restriction * stalk;
1346 }
1347 Ok(())
1348}
1349
1350fn reserve_target_capacity<T>(
1351 out: &mut Vec<T>,
1352 target_capacity: usize,
1353 label: &'static str,
1354) -> Result<(), BackendError> {
1355 try_reserve_vec_capacity(out, target_capacity).map_err(|source| {
1356 BackendError::new(format!(
1357 "megakernel {label} reservation failed for {target_capacity} element(s): {source}. Fix: shard the policy input before launch-policy math."
1358 ))
1359 })
1360}
1361
/// Returns the index of the lowest non-NaN cost in `costs`.
///
/// Ties resolve to the earliest index. Ordering uses `f64::total_cmp`, so
/// `-0.0` ranks below `+0.0`.
///
/// NaN costs (e.g. a failed or garbage measurement) are skipped. Under
/// `total_cmp` a NaN with its sign bit set orders below `-inf`. Invalid float
/// operations such as `0.0 / 0.0` can produce such a NaN on common hardware,
/// so it would otherwise be selected as the "best" candidate.
///
/// Returns `0` when `costs` is empty or every entry is NaN. This keeps callers'
/// `get(index)` lookups well-defined instead of panicking.
fn best_cost_index(costs: &[f64]) -> usize {
    costs
        .iter()
        .enumerate()
        .filter(|(_, cost)| !cost.is_nan())
        // `min_by` returns the first of several equal minima, preserving earliest-wins ties.
        .min_by(|(_, a), (_, b)| a.total_cmp(b))
        .map_or(0, |(index, _)| index)
}
1374
1375fn u32_to_usize_checked(value: u32, label: &'static str) -> Result<usize, BackendError> {
1376 usize::try_from(value).map_err(|error| {
1377 BackendError::new(format!(
1378 "{label} cannot fit usize: {error}. Fix: shard the autotune surface."
1379 ))
1380 })
1381}
1382
/// Widens a basis-point threshold upward by `hysteresis`, clamping at `u16::MAX`.
fn hysteresis_add(value: u16, hysteresis: u16) -> u16 {
    value.checked_add(hysteresis).unwrap_or(u16::MAX)
}
1386
/// Lowers a basis-point threshold by `hysteresis`, clamping at zero.
fn hysteresis_sub(value: u16, hysteresis: u16) -> u16 {
    value.checked_sub(hysteresis).unwrap_or(0)
}
1390
1391fn classify_pressure(
1392 queue_len: u32,
1393 lanes: u64,
1394 requeue_count: u64,
1395 policy: &MegakernelLaunchPolicy,
1396) -> Result<MegakernelQueuePressure, BackendError> {
1397 if queue_len == 0 {
1398 return Ok(MegakernelQueuePressure::Empty);
1399 }
1400 let lanes = lanes.max(1);
1401 let queue_len = u64::from(queue_len);
1402 let saturated_lanes = lanes
1403 .checked_mul(u64::from(policy.saturated_waves))
1404 .ok_or_else(|| {
1405 BackendError::new(
1406 "megakernel pressure wave threshold overflowed u64. Fix: reduce worker lanes or saturated_waves.",
1407 )
1408 })?;
1409 if requeue_count > 0 || queue_len >= saturated_lanes {
1410 Ok(MegakernelQueuePressure::Saturated)
1411 } else if queue_len >= lanes {
1412 Ok(MegakernelQueuePressure::Balanced)
1413 } else {
1414 Ok(MegakernelQueuePressure::Light)
1415 }
1416}
1417
1418#[cfg(test)]
1419mod tests;