Skip to main content

vyre_runtime/megakernel/
policy.rs

1//! Resident megakernel launch policy and queue-pressure decisions.
2
3use vyre_driver::backend::BackendError;
4
5mod cache;
6use super::planner::{MegakernelGridLimits, MegakernelGridRequest, MegakernelLaunchGeometry};
7use super::staging_reserve::try_reserve_vec_capacity;
8
/// Host-side pressure classification for one megakernel launch.
///
/// Reported on [`MegakernelLaunchRecommendation::pressure`] and echoed in
/// [`MegakernelTopologyEvidence::queue_pressure`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum MegakernelQueuePressure {
    /// No logical slots are queued.
    Empty,
    /// The queue is below the available worker lanes.
    Light,
    /// The queue is large enough to keep the submitted workers occupied.
    Balanced,
    /// The queue is several waves deep or already showing requeue pressure.
    Saturated,
}
21
/// Interpreter/JIT route selected by the launch policy.
///
/// Reported on [`MegakernelLaunchRecommendation::execution_mode`] and in
/// [`MegakernelPromotionEvidence::execution_mode`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum MegakernelExecutionMode {
    /// Use the generic opcode interpreter.
    Interpreter,
    /// Use a fused payload processor for hot windows or opcodes.
    Jit,
}
30
/// Scale-aware execution topology selected for one megakernel launch.
///
/// The value from a previous launch can be handed back to
/// [`MegakernelLaunchPolicy::recommend_with_previous_topology`] so the policy
/// can keep the topology stable inside its hysteresis band.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum MegakernelDispatchTopology {
    /// Nothing is queued.
    Empty,
    /// Low frontier density; prefer sparse frontier expansion and avoid
    /// block-wide dense scans.
    SparseFrontier,
    /// Mid-density frontier; combine sparse frontier queues with dense block
    /// tiles instead of forcing either extreme.
    HybridFrontier,
    /// High frontier density; prefer dense block propagation with coalesced
    /// scans.
    DenseFrontier,
    /// High-density graph with enough hot structure to justify fused waves.
    FusedDense,
    /// Memory pressure is high enough that bounded occupancy is more important
    /// than maximizing active waves.
    MemoryConstrained,
}
51
/// Schema version for topology evidence emitted by the megakernel launch policy.
///
/// [`MegakernelTopologyEvidence::is_complete`] only accepts evidence whose
/// `schema_version` equals this value.
pub const TOPOLOGY_EVIDENCE_SCHEMA_VERSION: u32 = 1;

/// Schema version for hot opcode/window promotion evidence emitted by policy.
///
/// [`MegakernelPromotionEvidence::is_complete`] only accepts evidence whose
/// `schema_version` equals this value.
pub const HOT_WINDOW_PROMOTION_EVIDENCE_SCHEMA_VERSION: u32 = 1;
57
/// GraphBLAS-style sparse/dense switch class for a selected launch topology.
///
/// Each variant has a stable report label available through
/// [`Self::as_str`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum MegakernelGraphBlasSwitchClass {
    /// Nothing is queued.
    Empty,
    /// Sparse frontier expansion is preferred.
    Sparse,
    /// Sparse and dense paths should both remain available.
    Hybrid,
    /// Dense propagation is preferred.
    Dense,
    /// Memory pressure overrides the sparse/dense frontier choice.
    MemoryConstrained,
}
72
73impl MegakernelGraphBlasSwitchClass {
74    /// Stable label for reports and bench output.
75    #[must_use]
76    pub const fn as_str(self) -> &'static str {
77        match self {
78            Self::Empty => "empty",
79            Self::Sparse => "sparse",
80            Self::Hybrid => "hybrid",
81            Self::Dense => "dense",
82            Self::MemoryConstrained => "memory_constrained",
83        }
84    }
85}
86
/// Evidence envelope that makes topology selection auditable by runtime benches.
///
/// Produced together with the recommendation by
/// [`MegakernelLaunchPolicy::recommend_with_topology_evidence`]. Call
/// [`Self::is_complete`] to check the version, density bounds, and parity flag
/// before reporting it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct MegakernelTopologyEvidence {
    /// Evidence schema version; expected to equal
    /// [`TOPOLOGY_EVIDENCE_SCHEMA_VERSION`].
    pub schema_version: u32,
    /// Queue pressure that participated in the launch recommendation.
    pub queue_pressure: MegakernelQueuePressure,
    /// Active frontier density in basis points after policy-side inference.
    /// Values above 10_000 make the evidence incomplete.
    pub frontier_density_bps: u16,
    /// Semiring frontier density input used for GraphBLAS-style switching.
    /// Values above 10_000 make the evidence incomplete.
    pub semiring_frontier_density_bps: u16,
    /// Concrete topology selected by the launch policy.
    pub selected_topology: MegakernelDispatchTopology,
    /// Sparse/dense switch class corresponding to the selected topology.
    pub graphblas_switch_class: MegakernelGraphBlasSwitchClass,
    /// Resident device bytes reported by the caller after policy-side inference.
    pub resident_device_bytes: u64,
    /// Estimated peak resident bytes for the selected launch plan.
    pub estimated_peak_device_bytes: u64,
    /// True when benches must compare output parity across topology variants.
    pub output_parity_required: bool,
}
109
110impl MegakernelTopologyEvidence {
111    /// Return true when the evidence envelope contains bounded, versioned
112    /// fields that a parity bench can report without consulting hidden policy
113    /// state.
114    #[must_use]
115    pub fn is_complete(self) -> bool {
116        self.schema_version == TOPOLOGY_EVIDENCE_SCHEMA_VERSION
117            && self.frontier_density_bps <= 10_000
118            && self.semiring_frontier_density_bps <= 10_000
119            && self.output_parity_required
120    }
121}
122
/// Interpreter/JIT promotion route selected from queue and hot-window signals.
///
/// Each variant has a stable report label available through
/// [`Self::as_str`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum MegakernelPromotionRoute {
    /// Stay on the generic interpreter path.
    Interpreter,
    /// Use JIT because the queue is large enough to amortize fused execution.
    QueueJit,
    /// Use JIT because opcode counters crossed the promotion threshold.
    OpcodeJit,
    /// Use JIT because repeated descriptor windows crossed the promotion threshold.
    WindowJit,
    /// Use JIT because both opcode and window promotion thresholds were crossed.
    OpcodeAndWindowJit,
}
137
138impl MegakernelPromotionRoute {
139    /// Stable label for reports and lowerer evidence.
140    #[must_use]
141    pub const fn as_str(self) -> &'static str {
142        match self {
143            Self::Interpreter => "interpreter",
144            Self::QueueJit => "queue_jit",
145            Self::OpcodeJit => "opcode_jit",
146            Self::WindowJit => "window_jit",
147            Self::OpcodeAndWindowJit => "opcode_and_window_jit",
148        }
149    }
150}
151
/// Evidence envelope that makes hot opcode/window promotion auditable.
///
/// Produced together with the recommendation by
/// [`MegakernelLaunchPolicy::recommend_with_promotion_evidence`]. Call
/// [`Self::is_complete`] to check that thresholds and route flags are
/// populated consistently.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct MegakernelPromotionEvidence {
    /// Evidence schema version; expected to equal
    /// [`HOT_WINDOW_PROMOTION_EVIDENCE_SCHEMA_VERSION`].
    pub schema_version: u32,
    /// Logical ring slots or work items queued for this launch.
    pub queue_len: u32,
    /// Queue length threshold that can trigger JIT without hot counters.
    /// Zero makes the evidence incomplete.
    pub jit_queue_len_threshold: u32,
    /// Hot opcode counter supplied to the policy.
    pub hot_opcode_count: u32,
    /// Hot opcode threshold configured on the policy. Zero makes the evidence
    /// incomplete.
    pub hot_opcode_threshold: u32,
    /// Hot descriptor-window counter supplied to the policy.
    pub hot_window_count: u32,
    /// Hot descriptor-window threshold configured on the policy. Zero makes
    /// the evidence incomplete.
    pub hot_window_threshold: u32,
    /// Interpreter or JIT route selected by the policy.
    pub execution_mode: MegakernelExecutionMode,
    /// True when opcode counters require fused opcode promotion.
    pub promote_hot_opcodes: bool,
    /// True when window counters require fused descriptor-window promotion.
    pub promote_hot_windows: bool,
    /// Stable promotion class for reports and lowerer input.
    pub promotion_route: MegakernelPromotionRoute,
    /// True when the lowerer should materialize fused descriptor windows.
    /// Complete evidence has this equal to `promote_hot_windows`.
    pub fused_descriptor_window_required: bool,
    /// True when benches must compare interpreter and fused-window outputs.
    pub output_parity_required: bool,
}
182
183impl MegakernelPromotionEvidence {
184    /// Return true when the promotion evidence carries all thresholds and
185    /// route fields needed by a lowerer or parity bench.
186    #[must_use]
187    pub fn is_complete(self) -> bool {
188        self.schema_version == HOT_WINDOW_PROMOTION_EVIDENCE_SCHEMA_VERSION
189            && self.jit_queue_len_threshold != 0
190            && self.hot_opcode_threshold != 0
191            && self.hot_window_threshold != 0
192            && self.fused_descriptor_window_required == self.promote_hot_windows
193            && self.output_parity_required
194    }
195}
196
/// Thread-local launch recommendation cache telemetry.
///
/// Snapshot returned by [`MegakernelLaunchPolicy::launch_cache_stats`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct MegakernelLaunchCacheStats {
    /// Live cache entries retained in the current thread.
    pub entries: usize,
    /// Cache hits served without recomputing launch geometry.
    pub hits: u64,
    /// Cache misses that required policy recomputation.
    pub misses: u64,
}
207
/// Inputs for one launch-policy recommendation.
///
/// The request is `Copy` and hashable; together with the policy it forms the
/// key of the per-thread launch recommendation cache. Use
/// [`MegakernelLaunchRequest::direct`] for a request with conservative
/// defaults and no telemetry.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct MegakernelLaunchRequest {
    /// Logical ring slots or work items queued for this launch.
    pub queue_len: u32,
    /// Caller-requested worker workgroup ceiling. Zero means derive from occupancy.
    pub requested_worker_groups: u32,
    /// Adapter maximum workgroup size in the x dimension.
    pub max_workgroup_size_x: u32,
    /// Adapter maximum compute workgroups per dimension.
    pub max_compute_workgroups_per_dimension: u32,
    /// Adapter maximum invocations per compute workgroup.
    pub max_compute_invocations_per_workgroup: u32,
    /// Caller-requested sparse-hit capacity. Zero means derive from queue shape.
    pub requested_hit_capacity: u32,
    /// Expected sparse hits per queued item when deriving hit capacity.
    pub expected_hits_per_item: u32,
    /// Count of opcodes observed hot enough for promotion.
    pub hot_opcode_count: u32,
    /// Count of ticketed route windows observed hot enough for promotion.
    pub hot_window_count: u32,
    /// Slots requeued by priority scheduling since the last recommendation.
    pub requeue_count: u64,
    /// Maximum priority age observed since the last recommendation.
    pub max_priority_age: u32,
    /// Nodes in the resident dependency graph. Zero means the caller has no
    /// graph-shape telemetry for this launch.
    pub graph_node_count: u32,
    /// Edges in the resident dependency graph. Zero means the caller has no
    /// graph-shape telemetry for this launch.
    pub graph_edge_count: u32,
    /// Active frontier density in basis points relative to graph nodes.
    pub frontier_density_bps: u16,
    /// Device-memory pressure in basis points relative to the active budget.
    pub memory_pressure_bps: u16,
    /// Device-resident bytes already required by this dispatch family.
    pub resident_device_bytes: u64,
    /// Hard device-memory budget for this launch. Zero means unbounded.
    pub device_memory_budget_bytes: u64,
}
248
249impl MegakernelLaunchRequest {
250    /// Construct a direct-dispatch request with conservative defaults.
251    #[must_use]
252    pub const fn direct(
253        queue_len: u32,
254        requested_worker_groups: u32,
255        max_workgroup_size_x: u32,
256    ) -> Self {
257        Self {
258            queue_len,
259            requested_worker_groups,
260            max_workgroup_size_x,
261            max_compute_workgroups_per_dimension: requested_worker_groups,
262            max_compute_invocations_per_workgroup: max_workgroup_size_x,
263            requested_hit_capacity: 0,
264            expected_hits_per_item: 1,
265            hot_opcode_count: 0,
266            hot_window_count: 0,
267            requeue_count: 0,
268            max_priority_age: 0,
269            graph_node_count: 0,
270            graph_edge_count: 0,
271            frontier_density_bps: 0,
272            memory_pressure_bps: 0,
273            resident_device_bytes: 0,
274            device_memory_budget_bytes: 0,
275        }
276    }
277}
278
/// Policy output consumed by runtime dispatchers and batch builders.
///
/// Returned by [`MegakernelLaunchPolicy::recommend`] and its evidence- or
/// history-aware variants.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct MegakernelLaunchRecommendation {
    /// Padded launch geometry for the ring protocol.
    pub geometry: MegakernelLaunchGeometry,
    /// Worker workgroups selected for the dispatch.
    pub worker_groups: u32,
    /// Sparse-hit capacity selected for the dispatch.
    pub hit_capacity: u32,
    /// Queue pressure classification.
    pub pressure: MegakernelQueuePressure,
    /// Interpreter or JIT route selected from telemetry.
    pub execution_mode: MegakernelExecutionMode,
    /// Scale-aware dispatch topology selected from graph shape, frontier
    /// density, and memory pressure.
    pub topology: MegakernelDispatchTopology,
    /// True when hot opcode counters justify fused opcode promotion.
    pub promote_hot_opcodes: bool,
    /// True when ticketed route windows justify fused window promotion.
    pub promote_hot_windows: bool,
    /// True when aged/requeued priority work should be lifted on the next publish.
    pub age_priority_work: bool,
    /// Estimated peak device bytes needed by the resident launch plan.
    pub estimated_peak_device_bytes: u64,
    /// Hard device-memory budget applied to this recommendation. Zero means unbounded.
    pub device_memory_budget_bytes: u64,
}
306
/// Requeue and aging counters produced by priority-aware schedulers.
///
/// `Default` yields all-zero counters. The `record_*` methods saturate at
/// `u64::MAX`, while the `try_record_*` methods report overflow as an error.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct PriorityRequeueAccounting {
    /// Number of slots requeued due to contention or quota pressure.
    pub requeue_count: u64,
    /// Number of slots promoted because their priority age crossed policy.
    pub aged_promotions: u64,
    /// Largest age observed for any queued priority slot.
    pub max_priority_age: u32,
}
317
/// Counter headroom at or below which schedulers should drain telemetry.
///
/// Headroom is measured as `u64::MAX - counter` by
/// [`PriorityRequeueAccounting::drain_recommendation`].
pub const PRIORITY_COUNTER_DRAIN_HEADROOM: u64 = 1024;

/// Stable operator fix for priority counter drain recommendations.
///
/// Copied verbatim into [`PriorityDrainRecommendation::fix`].
pub const PRIORITY_COUNTER_DRAIN_FIX: &str =
    "drain scheduler telemetry before counters reach u64::MAX";
324
/// Reason a priority scheduler should drain telemetry into a launch request.
///
/// [`PriorityRequeueAccounting::drain_recommendation`] reports exactly one
/// reason, checking exhaustion first, then near-limit headroom, then pending
/// telemetry; within each tier the requeue counter is checked before the
/// aged-promotion counter.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum PriorityDrainReason {
    /// No priority telemetry is pending.
    None,
    /// Non-empty priority telemetry should be propagated to the policy.
    PendingTelemetry,
    /// The requeue counter is inside the configured drain headroom.
    RequeueCounterNearLimit,
    /// The aged-promotion counter is inside the configured drain headroom.
    AgedPromotionCounterNearLimit,
    /// The requeue counter is exhausted.
    RequeueCounterExhausted,
    /// The aged-promotion counter is exhausted.
    AgedPromotionCounterExhausted,
}
341
342impl PriorityDrainReason {
343    /// Stable label for tests, reports, and scheduler diagnostics.
344    #[must_use]
345    pub const fn as_str(self) -> &'static str {
346        match self {
347            Self::None => "none",
348            Self::PendingTelemetry => "pending_telemetry",
349            Self::RequeueCounterNearLimit => "requeue_counter_near_limit",
350            Self::AgedPromotionCounterNearLimit => "aged_promotion_counter_near_limit",
351            Self::RequeueCounterExhausted => "requeue_counter_exhausted",
352            Self::AgedPromotionCounterExhausted => "aged_promotion_counter_exhausted",
353        }
354    }
355}
356
/// Structured drain recommendation for priority scheduler counters.
///
/// Built by [`PriorityRequeueAccounting::drain_recommendation`], which copies
/// the counters and derives the headroom and reason fields from them.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct PriorityDrainRecommendation {
    /// True when the scheduler should drain telemetry before accepting more work.
    /// Set whenever `reason` is anything other than [`PriorityDrainReason::None`].
    pub should_drain: bool,
    /// Concrete reason for the recommendation.
    pub reason: PriorityDrainReason,
    /// Requeue counter value included for propagation into launch telemetry.
    pub requeue_count: u64,
    /// Aged-promotion counter value included for propagation into launch telemetry.
    pub aged_promotions: u64,
    /// Largest priority age observed for any queued slot.
    pub max_priority_age: u32,
    /// Remaining requeue counter increments before exact overflow.
    pub requeue_counter_headroom: u64,
    /// Remaining aged-promotion counter increments before exact overflow.
    pub aged_promotion_counter_headroom: u64,
    /// Stable operator fix string to surface with drain diagnostics.
    pub fix: &'static str,
}
377
378impl PriorityRequeueAccounting {
379    /// Return a structured drain recommendation for scheduler telemetry.
380    #[must_use]
381    pub fn drain_recommendation(self) -> PriorityDrainRecommendation {
382        let requeue_counter_headroom = u64::MAX.saturating_sub(self.requeue_count);
383        let aged_promotion_counter_headroom = u64::MAX.saturating_sub(self.aged_promotions);
384        let reason = if self.requeue_count == u64::MAX {
385            PriorityDrainReason::RequeueCounterExhausted
386        } else if self.aged_promotions == u64::MAX {
387            PriorityDrainReason::AgedPromotionCounterExhausted
388        } else if requeue_counter_headroom <= PRIORITY_COUNTER_DRAIN_HEADROOM {
389            PriorityDrainReason::RequeueCounterNearLimit
390        } else if aged_promotion_counter_headroom <= PRIORITY_COUNTER_DRAIN_HEADROOM {
391            PriorityDrainReason::AgedPromotionCounterNearLimit
392        } else if self.requeue_count != 0 || self.aged_promotions != 0 || self.max_priority_age != 0
393        {
394            PriorityDrainReason::PendingTelemetry
395        } else {
396            PriorityDrainReason::None
397        };
398        PriorityDrainRecommendation {
399            should_drain: reason != PriorityDrainReason::None,
400            reason,
401            requeue_count: self.requeue_count,
402            aged_promotions: self.aged_promotions,
403            max_priority_age: self.max_priority_age,
404            requeue_counter_headroom,
405            aged_promotion_counter_headroom,
406            fix: PRIORITY_COUNTER_DRAIN_FIX,
407        }
408    }
409
410    /// Record one requeue event.
411    pub fn record_requeue(&mut self, age_ticks: u32) {
412        self.requeue_count = self.requeue_count.saturating_add(1);
413        self.max_priority_age = self.max_priority_age.max(age_ticks);
414    }
415
416    /// Record one requeue event with exact overflow reporting.
417    ///
418    /// # Errors
419    ///
420    /// Returns [`BackendError`] when the requeue counter would overflow.
421    pub fn try_record_requeue(&mut self, age_ticks: u32) -> Result<(), BackendError> {
422        self.requeue_count = self.requeue_count.checked_add(1).ok_or_else(|| {
423            BackendError::new(
424                "megakernel priority requeue_count overflowed u64. Fix: drain scheduler telemetry before counters reach u64::MAX.",
425            )
426        })?;
427        self.max_priority_age = self.max_priority_age.max(age_ticks);
428        Ok(())
429    }
430
431    /// Record one priority-aging promotion.
432    pub fn record_aged_promotion(&mut self, age_ticks: u32) {
433        self.aged_promotions = self.aged_promotions.saturating_add(1);
434        self.max_priority_age = self.max_priority_age.max(age_ticks);
435    }
436
437    /// Record one priority-aging promotion with exact overflow reporting.
438    ///
439    /// # Errors
440    ///
441    /// Returns [`BackendError`] when the aged-promotion counter would overflow.
442    pub fn try_record_aged_promotion(&mut self, age_ticks: u32) -> Result<(), BackendError> {
443        self.aged_promotions = self.aged_promotions.checked_add(1).ok_or_else(|| {
444            BackendError::new(
445                "megakernel aged_promotions overflowed u64. Fix: drain scheduler telemetry before counters reach u64::MAX.",
446            )
447        })?;
448        self.max_priority_age = self.max_priority_age.max(age_ticks);
449        Ok(())
450    }
451}
452
/// Diffuse priority signals across a set of priority-class siblings via
/// sheaf diffusion (P-RUNTIME-3), panicking if staging cannot be reserved.
///
/// Higher-priority siblings pull their neighbors toward higher priority and
/// lower-priority siblings drag them down. After a few diffusion steps each
/// item's priority reflects both its own age and its neighborhood pressure,
/// which lets requeue decisions be group-aware without a hand-rolled
/// propagation pass.
///
/// * `priority_stalks`: per-item priority value (caller's choice of scale;
///   higher = more urgent).
/// * `restriction_diag`: per-item transmission coefficient (1.0 = freely
///   shares priority, 0.0 = isolated).
/// * `damping`: diffusion rate in [0, 1].
/// * `iterations`: number of diffusion steps to run.
///
/// Returns the post-diffusion priority vector, same shape as input. This is
/// the infallible wrapper over [`try_diffuse_priority_across_siblings`].
///
/// # Panics
///
/// Panics when the fallible variant returns an error.
#[must_use]
#[cfg(any(test, feature = "legacy-infallible"))]
pub fn diffuse_priority_across_siblings(
    priority_stalks: &[f64],
    restriction_diag: &[f64],
    damping: f64,
    iterations: u32,
) -> Vec<f64> {
    match try_diffuse_priority_across_siblings(
        priority_stalks,
        restriction_diag,
        damping,
        iterations,
    ) {
        Ok(diffused) => diffused,
        Err(source) => panic!(
            "megakernel priority diffusion allocation failed: {source}. Fix: shard the priority sibling set before diffusion."
        ),
    }
}
481
482/// Diffuse priority signals across priority-class siblings with fallible
483/// output staging.
484///
485/// # Errors
486///
487/// Returns [`BackendError`] when host staging cannot be reserved for the
488/// priority vector.
489pub fn try_diffuse_priority_across_siblings(
490    priority_stalks: &[f64],
491    restriction_diag: &[f64],
492    damping: f64,
493    iterations: u32,
494) -> Result<Vec<f64>, BackendError> {
495    let mut current = Vec::new();
496    let mut next = Vec::new();
497    try_diffuse_priority_across_siblings_into(
498        priority_stalks,
499        restriction_diag,
500        damping,
501        iterations,
502        &mut current,
503        &mut next,
504    )?;
505    Ok(current)
506}
507
/// Diffuse priority signals into caller-owned storage, panicking if staging
/// cannot be reserved.
///
/// Infallible wrapper over [`try_diffuse_priority_across_siblings_into`];
/// `out` receives the result and `scratch` is working storage.
///
/// # Panics
///
/// Panics when the fallible variant returns an error.
#[cfg(any(test, feature = "legacy-infallible"))]
pub fn diffuse_priority_across_siblings_into(
    priority_stalks: &[f64],
    restriction_diag: &[f64],
    damping: f64,
    iterations: u32,
    out: &mut Vec<f64>,
    scratch: &mut Vec<f64>,
) {
    if let Err(source) = try_diffuse_priority_across_siblings_into(
        priority_stalks,
        restriction_diag,
        damping,
        iterations,
        out,
        scratch,
    ) {
        panic!(
            "megakernel priority diffusion allocation failed: {source}. Fix: shard the priority sibling set before diffusion."
        );
    }
}
532
533/// Diffuse priority signals into caller-owned storage with fallible staging.
534///
535/// # Errors
536///
537/// Returns [`BackendError`] when host staging cannot be reserved for the
538/// priority vector.
539pub fn try_diffuse_priority_across_siblings_into(
540    priority_stalks: &[f64],
541    restriction_diag: &[f64],
542    damping: f64,
543    iterations: u32,
544    out: &mut Vec<f64>,
545    scratch: &mut Vec<f64>,
546) -> Result<(), BackendError> {
547    out.clear();
548    reserve_target_capacity(out, priority_stalks.len(), "priority diffusion output")?;
549    out.extend_from_slice(priority_stalks);
550    scratch.clear();
551    if priority_stalks.len() != restriction_diag.len() {
552        return Ok(());
553    }
554    for _ in 0..iterations {
555        diffuse_step_into(out, restriction_diag, damping, scratch)?;
556        std::mem::swap(out, scratch);
557    }
558    Ok(())
559}
560
/// Single policy surface for megakernel launch sizing and telemetry-driven routing.
///
/// The policy is `Copy` and hashable; together with the request it forms the
/// key of the per-thread launch recommendation cache. `Default` is
/// [`MegakernelLaunchPolicy::standard`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct MegakernelLaunchPolicy {
    /// Sizing policy for worker counts and grid geometry.
    pub sizing: super::planner::MegakernelSizingPolicy,
    /// Minimum capacity for sparse-hit results.
    pub min_hit_capacity: u32,
    /// Multiplier for expected hits to determine capacity.
    pub hit_capacity_multiplier: u32,
    /// Number of waves that define a saturated queue.
    pub saturated_waves: u32,
    /// Threshold for promoting hot opcodes to JIT. Promotion applies when the
    /// request's hot opcode count is at or above this value.
    pub hot_opcode_threshold: u32,
    /// Threshold for promoting hot windows to JIT. Promotion applies when the
    /// request's hot window count is at or above this value.
    pub hot_window_threshold: u32,
    /// Queue length threshold to prefer JIT over interpreter. JIT is selected
    /// when the queue length is at or above this value.
    pub jit_queue_len_threshold: u32,
    /// Priority age threshold to trigger aging promotions. Aging applies when
    /// the maximum observed priority age is at or above this value.
    pub priority_age_threshold: u32,
    /// Frontier density at or below this value uses sparse expansion.
    pub sparse_frontier_threshold_bps: u16,
    /// Frontier density at or above this value uses dense propagation.
    pub dense_frontier_threshold_bps: u16,
    /// Memory pressure at or above this value uses the memory-constrained path.
    pub memory_pressure_threshold_bps: u16,
    /// Minimum graph edge count before dense hot work is eligible for fusion.
    pub fusion_edge_threshold: u32,
    /// Conservative resident scratch bytes needed per sparse-hit entry.
    pub scratch_bytes_per_hit: u32,
}
591
592impl Default for MegakernelLaunchPolicy {
593    fn default() -> Self {
594        Self::standard()
595    }
596}
597
// Hysteresis width, in basis points, for frontier-density topology decisions.
// NOTE(review): presumably consumed by the topology stabilization step used by
// `recommend_with_previous_topology`; confirm against that implementation.
const FRONTIER_TOPOLOGY_HYSTERESIS_BPS: u16 = 250;
// Hysteresis width, in basis points, for memory-pressure topology decisions.
// NOTE(review): same assumption as above; confirm against the stabilization code.
const MEMORY_TOPOLOGY_HYSTERESIS_BPS: u16 = 250;
600
601impl MegakernelLaunchPolicy {
602    /// Standard launch policy used by VYRE megakernel dispatchers.
603    #[must_use]
604    pub const fn standard() -> Self {
605        Self {
606            sizing: super::planner::MegakernelSizingPolicy::standard(),
607            min_hit_capacity: 1024,
608            hit_capacity_multiplier: 2,
609            saturated_waves: 4,
610            hot_opcode_threshold: 8,
611            hot_window_threshold: 4,
612            jit_queue_len_threshold: 4096,
613            priority_age_threshold: 32,
614            sparse_frontier_threshold_bps: 500,
615            dense_frontier_threshold_bps: 4_000,
616            memory_pressure_threshold_bps: 8_500,
617            fusion_edge_threshold: 65_536,
618            scratch_bytes_per_hit: 16,
619        }
620    }
621
622    /// Return launch recommendation cache telemetry for the current thread.
623    #[must_use]
624    pub fn launch_cache_stats() -> MegakernelLaunchCacheStats {
625        cache::LAUNCH_RECOMMENDATION_CACHE.with(|cache| cache.borrow().stats())
626    }
627
628    /// Clear launch recommendation cache entries and counters for this thread.
629    pub fn reset_launch_cache_for_thread() {
630        cache::LAUNCH_RECOMMENDATION_CACHE.with(|cache| cache.borrow_mut().clear());
631    }
632
633    /// Recommend geometry, hit capacity, and interpreter/JIT route.
634    ///
635    /// # Errors
636    ///
637    /// Returns [`BackendError`] when required adapter limits are zero or derived
638    /// launch values cannot fit the u32 ring protocol.
639    pub fn recommend(
640        &self,
641        request: MegakernelLaunchRequest,
642    ) -> Result<MegakernelLaunchRecommendation, BackendError> {
643        self.recommend_inner(request, None)
644    }
645
646    /// Recommend a launch and emit topology evidence for parity benches.
647    ///
648    /// # Errors
649    ///
650    /// Returns [`BackendError`] when the underlying recommendation cannot be
651    /// built from the request or adapter limits.
652    pub fn recommend_with_topology_evidence(
653        &self,
654        request: MegakernelLaunchRequest,
655    ) -> Result<(MegakernelLaunchRecommendation, MegakernelTopologyEvidence), BackendError> {
656        let (effective_request, recommendation) = self.recommend_with_effective_request(request)?;
657        let evidence = self.topology_evidence_for(effective_request, recommendation);
658        Ok((recommendation, evidence))
659    }
660
661    /// Recommend a launch and emit hot opcode/window promotion evidence.
662    ///
663    /// # Errors
664    ///
665    /// Returns [`BackendError`] when the underlying recommendation cannot be
666    /// built from the request or adapter limits.
667    pub fn recommend_with_promotion_evidence(
668        &self,
669        request: MegakernelLaunchRequest,
670    ) -> Result<(MegakernelLaunchRecommendation, MegakernelPromotionEvidence), BackendError> {
671        let (effective_request, recommendation) = self.recommend_with_effective_request(request)?;
672        let evidence = self.promotion_evidence_for(effective_request, recommendation);
673        Ok((recommendation, evidence))
674    }
675
676    /// Recommend a launch while preserving the previous topology inside a
677    /// narrow hysteresis band.
678    ///
679    /// CUDA resident graphs and long-running dataflow streams should use this
680    /// entry point when they can track the last successful topology. It prevents
681    /// borderline frontier-density or memory-pressure telemetry from repeatedly
682    /// switching kernel variants, invalidating launch plans, and disturbing
683    /// cache locality at scale.
684    ///
685    /// # Errors
686    ///
687    /// Returns [`BackendError`] when required adapter limits are zero or derived
688    /// launch values cannot fit the u32 ring protocol.
689    pub fn recommend_with_previous_topology(
690        &self,
691        request: MegakernelLaunchRequest,
692        previous_topology: MegakernelDispatchTopology,
693    ) -> Result<MegakernelLaunchRecommendation, BackendError> {
694        self.recommend_inner(request, Some(previous_topology))
695    }
696
697    fn recommend_inner(
698        &self,
699        request: MegakernelLaunchRequest,
700        previous_topology: Option<MegakernelDispatchTopology>,
701    ) -> Result<MegakernelLaunchRecommendation, BackendError> {
702        let cache_key = cache::LaunchRecommendationCacheKey {
703            policy: *self,
704            request,
705        };
706        if previous_topology.is_none() {
707            if let Some(cached) =
708                cache::LAUNCH_RECOMMENDATION_CACHE.with(|cache| cache.borrow_mut().get(&cache_key))
709            {
710                return Ok(cached);
711            }
712        }
713
714        let effective_request = self.infer_missing_scale_signals(request)?;
715        let promote_hot_opcodes = effective_request.hot_opcode_count >= self.hot_opcode_threshold;
716        let promote_hot_windows = effective_request.hot_window_count >= self.hot_window_threshold;
717        let raw_topology =
718            self.dispatch_topology_for(effective_request, promote_hot_opcodes, promote_hot_windows);
719        let topology = self.stabilize_topology(
720            raw_topology,
721            effective_request,
722            previous_topology,
723            promote_hot_opcodes,
724            promote_hot_windows,
725        );
726        let scheduled_request = self.apply_topology_worker_policy(effective_request, topology)?;
727        let grid = self.sizing.calculate_optimal_grid(
728            MegakernelGridRequest::new(
729                scheduled_request.queue_len,
730                scheduled_request.requested_worker_groups,
731            ),
732            MegakernelGridLimits::new(
733                scheduled_request.max_workgroup_size_x,
734                scheduled_request.max_compute_workgroups_per_dimension,
735                scheduled_request.max_compute_invocations_per_workgroup,
736            ),
737        )?;
738        let geometry = grid.geometry;
739        let worker_groups = grid.worker_groups;
740        let lanes = u64::from(geometry.dispatch_grid[0])
741            .checked_mul(u64::from(geometry.workgroup_size_x))
742            .ok_or_else(|| {
743                BackendError::new(
744                    "megakernel launch lane count overflowed u64. Fix: reduce dispatch grid or workgroup size.",
745                )
746            })?;
747        let pressure = classify_pressure(
748            effective_request.queue_len,
749            lanes,
750            effective_request.requeue_count,
751            self,
752        )?;
753        let hit_capacity = self.hit_capacity_for(effective_request)?;
754        let estimated_peak_device_bytes =
755            self.estimated_peak_device_bytes(effective_request, hit_capacity)?;
756        if effective_request.device_memory_budget_bytes != 0
757            && estimated_peak_device_bytes > effective_request.device_memory_budget_bytes
758        {
759            return Err(BackendError::DeviceOutOfMemory {
760                requested: estimated_peak_device_bytes,
761                available: effective_request.device_memory_budget_bytes,
762            });
763        }
764        let execution_mode = if effective_request.queue_len >= self.jit_queue_len_threshold
765            || promote_hot_opcodes
766            || promote_hot_windows
767            || topology == MegakernelDispatchTopology::FusedDense
768        {
769            MegakernelExecutionMode::Jit
770        } else {
771            MegakernelExecutionMode::Interpreter
772        };
773        let age_priority_work = effective_request.requeue_count > 0
774            || effective_request.max_priority_age >= self.priority_age_threshold;
775
776        let recommendation = MegakernelLaunchRecommendation {
777            geometry,
778            worker_groups,
779            hit_capacity,
780            pressure,
781            execution_mode,
782            topology,
783            promote_hot_opcodes,
784            promote_hot_windows,
785            age_priority_work,
786            estimated_peak_device_bytes,
787            device_memory_budget_bytes: effective_request.device_memory_budget_bytes,
788        };
789        if previous_topology.is_none() {
790            cache::LAUNCH_RECOMMENDATION_CACHE.with(|cache| {
791                cache.borrow_mut().insert(cache_key, recommendation);
792            });
793        }
794        Ok(recommendation)
795    }
796
797    fn recommend_with_effective_request(
798        &self,
799        request: MegakernelLaunchRequest,
800    ) -> Result<(MegakernelLaunchRequest, MegakernelLaunchRecommendation), BackendError> {
801        let effective_request = self.infer_missing_scale_signals(request)?;
802        let recommendation = self.recommend(effective_request)?;
803        Ok((effective_request, recommendation))
804    }
805
806    fn topology_evidence_for(
807        &self,
808        request: MegakernelLaunchRequest,
809        recommendation: MegakernelLaunchRecommendation,
810    ) -> MegakernelTopologyEvidence {
811        MegakernelTopologyEvidence {
812            schema_version: TOPOLOGY_EVIDENCE_SCHEMA_VERSION,
813            queue_pressure: recommendation.pressure,
814            frontier_density_bps: request.frontier_density_bps,
815            semiring_frontier_density_bps: request.frontier_density_bps,
816            selected_topology: recommendation.topology,
817            graphblas_switch_class: Self::graphblas_switch_class_for(recommendation.topology),
818            resident_device_bytes: request.resident_device_bytes,
819            estimated_peak_device_bytes: recommendation.estimated_peak_device_bytes,
820            output_parity_required: true,
821        }
822    }
823
824    fn promotion_evidence_for(
825        &self,
826        request: MegakernelLaunchRequest,
827        recommendation: MegakernelLaunchRecommendation,
828    ) -> MegakernelPromotionEvidence {
829        MegakernelPromotionEvidence {
830            schema_version: HOT_WINDOW_PROMOTION_EVIDENCE_SCHEMA_VERSION,
831            queue_len: request.queue_len,
832            jit_queue_len_threshold: self.jit_queue_len_threshold,
833            hot_opcode_count: request.hot_opcode_count,
834            hot_opcode_threshold: self.hot_opcode_threshold,
835            hot_window_count: request.hot_window_count,
836            hot_window_threshold: self.hot_window_threshold,
837            execution_mode: recommendation.execution_mode,
838            promote_hot_opcodes: recommendation.promote_hot_opcodes,
839            promote_hot_windows: recommendation.promote_hot_windows,
840            promotion_route: Self::promotion_route_for(recommendation),
841            fused_descriptor_window_required: recommendation.promote_hot_windows,
842            output_parity_required: true,
843        }
844    }
845
846    fn promotion_route_for(
847        recommendation: MegakernelLaunchRecommendation,
848    ) -> MegakernelPromotionRoute {
849        if recommendation.execution_mode == MegakernelExecutionMode::Interpreter {
850            return MegakernelPromotionRoute::Interpreter;
851        }
852        match (
853            recommendation.promote_hot_opcodes,
854            recommendation.promote_hot_windows,
855        ) {
856            (true, true) => MegakernelPromotionRoute::OpcodeAndWindowJit,
857            (true, false) => MegakernelPromotionRoute::OpcodeJit,
858            (false, true) => MegakernelPromotionRoute::WindowJit,
859            (false, false) => MegakernelPromotionRoute::QueueJit,
860        }
861    }
862
863    fn graphblas_switch_class_for(
864        topology: MegakernelDispatchTopology,
865    ) -> MegakernelGraphBlasSwitchClass {
866        match topology {
867            MegakernelDispatchTopology::Empty => MegakernelGraphBlasSwitchClass::Empty,
868            MegakernelDispatchTopology::SparseFrontier => MegakernelGraphBlasSwitchClass::Sparse,
869            MegakernelDispatchTopology::HybridFrontier => MegakernelGraphBlasSwitchClass::Hybrid,
870            MegakernelDispatchTopology::DenseFrontier
871            | MegakernelDispatchTopology::FusedDense => MegakernelGraphBlasSwitchClass::Dense,
872            MegakernelDispatchTopology::MemoryConstrained => {
873                MegakernelGraphBlasSwitchClass::MemoryConstrained
874            }
875        }
876    }
877
878    fn hit_capacity_for(&self, request: MegakernelLaunchRequest) -> Result<u32, BackendError> {
879        if request.requested_hit_capacity != 0 {
880            return Ok(request.requested_hit_capacity);
881        }
882        let expected_hits = request.expected_hits_per_item.max(1);
883        let multiplier = if request.memory_pressure_bps >= self.memory_pressure_threshold_bps {
884            1
885        } else {
886            self.hit_capacity_multiplier
887        };
888        let derived = request
889            .queue_len
890            .checked_mul(expected_hits)
891            .and_then(|value| value.checked_mul(multiplier))
892            .ok_or_else(|| {
893                BackendError::new(
894                    "megakernel sparse-hit capacity overflowed u32. Fix: lower queue length, expected_hits_per_item, or hit_capacity_multiplier.",
895                )
896            })?;
897        Ok(derived.max(self.min_hit_capacity))
898    }
899
900    fn estimated_peak_device_bytes(
901        &self,
902        request: MegakernelLaunchRequest,
903        hit_capacity: u32,
904    ) -> Result<u64, BackendError> {
905        let scratch_bytes = u64::from(hit_capacity)
906            .checked_mul(u64::from(self.scratch_bytes_per_hit))
907            .ok_or_else(|| {
908                BackendError::new(
909                    "megakernel scratch byte estimate overflowed u64. Fix: lower hit capacity or scratch_bytes_per_hit.",
910                )
911            })?;
912        request
913            .resident_device_bytes
914            .checked_add(scratch_bytes)
915            .ok_or_else(|| {
916                BackendError::new(
917                    "megakernel peak resident byte estimate overflowed u64. Fix: reduce resident buffers or scratch capacity.",
918                )
919            })
920    }
921
922    fn infer_missing_scale_signals(
923        &self,
924        mut request: MegakernelLaunchRequest,
925    ) -> Result<MegakernelLaunchRequest, BackendError> {
926        if request.frontier_density_bps == 0
927            && request.queue_len != 0
928            && request.graph_node_count != 0
929        {
930            let active_nodes = u64::from(request.queue_len.min(request.graph_node_count));
931            let density = active_nodes
932                .checked_mul(10_000)
933                .ok_or_else(|| {
934                    BackendError::new(
935                        "megakernel frontier-density numerator overflowed u64. Fix: shard the resident graph before launch.",
936                    )
937                })?
938                .checked_div(u64::from(request.graph_node_count))
939                .unwrap_or(0)
940                .clamp(1, 10_000);
941            request.frontier_density_bps = u16::try_from(density).map_err(|error| {
942                BackendError::new(format!(
943                    "megakernel frontier density cannot fit u16: {error}. Fix: clamp density before ABI encoding."
944                ))
945            })?;
946        }
947        if request.memory_pressure_bps == 0
948            && request.device_memory_budget_bytes != 0
949            && request.resident_device_bytes != 0
950        {
951            let pressure = (u128::from(request.resident_device_bytes)
952                .checked_mul(10_000)
953                .ok_or_else(|| {
954                    BackendError::new(
955                        "megakernel memory-pressure numerator overflowed u128. Fix: reduce resident device bytes before launch.",
956                    )
957                })?
958                / u128::from(request.device_memory_budget_bytes))
959            .min(10_000);
960            request.memory_pressure_bps = u16::try_from(pressure).map_err(|error| {
961                BackendError::new(format!(
962                    "megakernel memory pressure cannot fit u16: {error}. Fix: clamp pressure before ABI encoding."
963                ))
964            })?;
965        }
966        Ok(request)
967    }
968
969    fn apply_topology_worker_policy(
970        &self,
971        mut request: MegakernelLaunchRequest,
972        topology: MegakernelDispatchTopology,
973    ) -> Result<MegakernelLaunchRequest, BackendError> {
974        if topology == MegakernelDispatchTopology::MemoryConstrained
975            && request.memory_pressure_bps != 0
976            && request.requested_worker_groups > 1
977        {
978            let pressure_span = u32::from(
979                10_000_u16
980                    .checked_sub(self.memory_pressure_threshold_bps)
981                    .ok_or_else(|| {
982                        BackendError::new(
983                            "megakernel memory-pressure threshold exceeds 10000 bps. Fix: configure threshold in basis points.",
984                        )
985                    })?,
986            )
987            .max(1);
988            let over_threshold = u32::from(
989                match request
990                    .memory_pressure_bps
991                    .checked_sub(self.memory_pressure_threshold_bps)
992                {
993                    Some(value) => value,
994                    None => 0,
995                },
996            )
997            .min(pressure_span);
998            let shed_bps = 2_500_u32
999                .checked_add(
1000                    over_threshold
1001                        .checked_mul(2_500)
1002                        .ok_or_else(|| {
1003                            BackendError::new(
1004                                "megakernel memory-pressure worker shed overflowed u32. Fix: lower pressure telemetry before launch.",
1005                            )
1006                        })?
1007                        / pressure_span,
1008                )
1009                .ok_or_else(|| {
1010                    BackendError::new(
1011                        "megakernel memory-pressure worker shed overflowed u32. Fix: lower pressure telemetry before launch.",
1012                    )
1013                })?;
1014            let keep_bps = 10_000_u32.checked_sub(shed_bps).ok_or_else(|| {
1015                BackendError::new(
1016                    "megakernel memory-pressure worker keep ratio underflowed. Fix: keep shed_bps within 0..=10000.",
1017                )
1018            })?;
1019            let scaled = u64::from(request.requested_worker_groups)
1020                .checked_mul(u64::from(keep_bps))
1021                .ok_or_else(|| {
1022                    BackendError::new(
1023                        "megakernel memory-constrained worker count overflowed u64. Fix: reduce requested worker groups.",
1024                    )
1025                })?
1026                / 10_000;
1027            request.requested_worker_groups = u32::try_from(scaled)
1028                .map_err(|error| {
1029                    BackendError::new(format!(
1030                        "megakernel memory-constrained worker count cannot fit u32: {error}. Fix: reduce requested worker groups."
1031                    ))
1032                })?
1033                .max(1);
1034        }
1035        if topology == MegakernelDispatchTopology::SparseFrontier
1036            && request.graph_node_count != 0
1037            && request.frontier_density_bps != 0
1038            && request.requested_worker_groups > 1
1039        {
1040            let sparse_span = u32::from(self.sparse_frontier_threshold_bps).max(1);
1041            let density = u32::from(request.frontier_density_bps).clamp(1, sparse_span);
1042            let scaled = u64::from(request.requested_worker_groups)
1043                .checked_mul(u64::from(density))
1044                .ok_or_else(|| {
1045                    BackendError::new(
1046                        "megakernel sparse-frontier worker count overflowed u64. Fix: reduce requested worker groups.",
1047                    )
1048                })?
1049                / u64::from(sparse_span);
1050            let warp_floor = request.requested_worker_groups.min(32);
1051            request.requested_worker_groups = u32::try_from(scaled)
1052                .map_err(|error| {
1053                    BackendError::new(format!(
1054                        "megakernel sparse-frontier worker count cannot fit u32: {error}. Fix: reduce requested worker groups."
1055                    ))
1056                })?
1057                .max(warp_floor)
1058                .min(request.requested_worker_groups);
1059        }
1060        Ok(request)
1061    }
1062
1063    fn dispatch_topology_for(
1064        &self,
1065        request: MegakernelLaunchRequest,
1066        promote_hot_opcodes: bool,
1067        promote_hot_windows: bool,
1068    ) -> MegakernelDispatchTopology {
1069        if request.queue_len == 0 {
1070            return MegakernelDispatchTopology::Empty;
1071        }
1072        if request.memory_pressure_bps >= self.memory_pressure_threshold_bps {
1073            return MegakernelDispatchTopology::MemoryConstrained;
1074        }
1075        if request.frontier_density_bps <= self.sparse_frontier_threshold_bps {
1076            return MegakernelDispatchTopology::SparseFrontier;
1077        }
1078        let dense = request.frontier_density_bps >= self.dense_frontier_threshold_bps;
1079        let graph_is_large =
1080            request.graph_node_count > 0 && request.graph_edge_count >= self.fusion_edge_threshold;
1081        if dense && graph_is_large && (promote_hot_opcodes || promote_hot_windows) {
1082            return MegakernelDispatchTopology::FusedDense;
1083        }
1084        if dense {
1085            return MegakernelDispatchTopology::DenseFrontier;
1086        }
1087        MegakernelDispatchTopology::HybridFrontier
1088    }
1089
1090    fn stabilize_topology(
1091        &self,
1092        raw_topology: MegakernelDispatchTopology,
1093        request: MegakernelLaunchRequest,
1094        previous_topology: Option<MegakernelDispatchTopology>,
1095        promote_hot_opcodes: bool,
1096        promote_hot_windows: bool,
1097    ) -> MegakernelDispatchTopology {
1098        if raw_topology == MegakernelDispatchTopology::Empty {
1099            return raw_topology;
1100        }
1101        if raw_topology == MegakernelDispatchTopology::MemoryConstrained {
1102            return raw_topology;
1103        }
1104        let Some(previous_topology) = previous_topology else {
1105            return raw_topology;
1106        };
1107        if previous_topology == MegakernelDispatchTopology::MemoryConstrained
1108            && request.memory_pressure_bps
1109                >= hysteresis_sub(
1110                    self.memory_pressure_threshold_bps,
1111                    MEMORY_TOPOLOGY_HYSTERESIS_BPS,
1112                )
1113        {
1114            return MegakernelDispatchTopology::MemoryConstrained;
1115        }
1116
1117        match previous_topology {
1118            MegakernelDispatchTopology::SparseFrontier
1119                if raw_topology != MegakernelDispatchTopology::SparseFrontier
1120                    && request.frontier_density_bps
1121                        <= hysteresis_add(
1122                            self.sparse_frontier_threshold_bps,
1123                            FRONTIER_TOPOLOGY_HYSTERESIS_BPS,
1124                        ) =>
1125            {
1126                MegakernelDispatchTopology::SparseFrontier
1127            }
1128            MegakernelDispatchTopology::HybridFrontier
1129                if raw_topology == MegakernelDispatchTopology::SparseFrontier
1130                    && request.frontier_density_bps
1131                        >= hysteresis_sub(
1132                            self.sparse_frontier_threshold_bps,
1133                            FRONTIER_TOPOLOGY_HYSTERESIS_BPS,
1134                        ) =>
1135            {
1136                MegakernelDispatchTopology::HybridFrontier
1137            }
1138            MegakernelDispatchTopology::HybridFrontier
1139                if matches!(
1140                    raw_topology,
1141                    MegakernelDispatchTopology::DenseFrontier
1142                        | MegakernelDispatchTopology::FusedDense
1143                ) && request.frontier_density_bps
1144                    <= hysteresis_add(
1145                        self.dense_frontier_threshold_bps,
1146                        FRONTIER_TOPOLOGY_HYSTERESIS_BPS,
1147                    ) =>
1148            {
1149                MegakernelDispatchTopology::HybridFrontier
1150            }
1151            MegakernelDispatchTopology::DenseFrontier
1152                if raw_topology == MegakernelDispatchTopology::HybridFrontier
1153                    && request.frontier_density_bps
1154                        >= hysteresis_sub(
1155                            self.dense_frontier_threshold_bps,
1156                            FRONTIER_TOPOLOGY_HYSTERESIS_BPS,
1157                        ) =>
1158            {
1159                MegakernelDispatchTopology::DenseFrontier
1160            }
1161            MegakernelDispatchTopology::FusedDense
1162                if raw_topology == MegakernelDispatchTopology::HybridFrontier
1163                    && request.frontier_density_bps
1164                        >= hysteresis_sub(
1165                            self.dense_frontier_threshold_bps,
1166                            FRONTIER_TOPOLOGY_HYSTERESIS_BPS,
1167                        )
1168                    && request.graph_edge_count >= self.fusion_edge_threshold
1169                    && (promote_hot_opcodes || promote_hot_windows) =>
1170            {
1171                MegakernelDispatchTopology::FusedDense
1172            }
1173            _ => raw_topology,
1174        }
1175    }
1176
1177    /// Select the best `hit_capacity_multiplier` from a candidate set.
1178    ///
1179    /// `candidate_multipliers` are the multipliers to try; `costs[i]`
1180    /// is the observed dispatch latency (or any minimization metric)
1181    /// when `candidate_multipliers[i]` was used. Lower cost wins; the
1182    /// minimum observed cost selects the multiplier.
1183    ///
1184    /// Returns the chosen multiplier. If `candidate_multipliers` is
1185    /// empty, returns the policy's existing `hit_capacity_multiplier`.
1186    ///
1187    #[must_use]
1188    pub fn autotune_hit_capacity_multiplier(
1189        &self,
1190        candidate_multipliers: &[u32],
1191        costs: &[f64],
1192    ) -> u32 {
1193        if candidate_multipliers.is_empty() || costs.is_empty() {
1194            return self.hit_capacity_multiplier;
1195        }
1196        let n = candidate_multipliers.len().min(costs.len());
1197        let chosen = best_cost_index(&costs[..n]);
1198        candidate_multipliers
1199            .get(chosen)
1200            .copied()
1201            .unwrap_or(self.hit_capacity_multiplier)
1202    }
1203
1204    /// Select the best workgroup-size from a candidate set.
1205    ///
1206    /// `candidate_sizes[i]` is paired
1207    /// with `costs[i]` (lower is better). Returns the chosen size or
1208    /// the policy's `sizing.default_workgroup_size_x()` fallback.
1209    #[must_use]
1210    pub fn autotune_workgroup_size(
1211        &self,
1212        candidate_sizes: &[u32],
1213        costs: &[f64],
1214        current_size: u32,
1215    ) -> u32 {
1216        if candidate_sizes.is_empty() || costs.is_empty() {
1217            return current_size;
1218        }
1219        let n = candidate_sizes.len().min(costs.len());
1220        let chosen = best_cost_index(&costs[..n]);
1221        candidate_sizes.get(chosen).copied().unwrap_or(current_size)
1222    }
1223
/// Compute the parameter delta for a continuous autotune knob using a
/// Fisher-preconditioned natural-gradient step.
///
/// * `m_inv_sqrt`: inverse square root of the Fisher block, stored as an
///   n×n row-major matrix. An identity matrix reduces the step to plain
///   gradient descent.
/// * `grad`: plain gradient ∂latency/∂param, of length n.
///
/// Returns `-lr · M_inv_sqrt · grad`.
///
/// P-DRIVER-8: continuous autotune knobs (workgroup size, hit-capacity,
/// fixpoint iteration count, …) are expected to follow the natural-gradient
/// direction by default.
///
/// # Panics
///
/// Panics when the fallible variant reports an error.
#[must_use]
#[cfg(any(test, feature = "legacy-infallible"))]
pub fn natural_gradient_autotune_step(
    m_inv_sqrt: &[f64],
    grad: &[f64],
    n: u32,
    learning_rate: f64,
) -> Vec<f64> {
    match Self::try_natural_gradient_autotune_step(m_inv_sqrt, grad, n, learning_rate) {
        Ok(delta) => delta,
        Err(source) => panic!(
            "megakernel natural-gradient autotune allocation failed: {source}. Fix: shard the autotune surface."
        ),
    }
}
1255
1256    /// Compute the next-step parameter delta with fallible output staging.
1257    ///
1258    /// # Errors
1259    ///
1260    /// Returns [`BackendError`] when host staging cannot be reserved for the
1261    /// natural-gradient vector.
1262    pub fn try_natural_gradient_autotune_step(
1263        m_inv_sqrt: &[f64],
1264        grad: &[f64],
1265        n: u32,
1266        learning_rate: f64,
1267    ) -> Result<Vec<f64>, BackendError> {
1268        let mut out = Vec::new();
1269        Self::try_natural_gradient_autotune_step_into(
1270            m_inv_sqrt,
1271            grad,
1272            n,
1273            learning_rate,
1274            &mut out,
1275        )?;
1276        Ok(out)
1277    }
1278
/// Write the natural-gradient autotune step into caller-owned storage.
///
/// # Panics
///
/// Panics when the fallible variant reports an error.
#[cfg(any(test, feature = "legacy-infallible"))]
pub fn natural_gradient_autotune_step_into(
    m_inv_sqrt: &[f64],
    grad: &[f64],
    n: u32,
    learning_rate: f64,
    out: &mut Vec<f64>,
) {
    if let Err(source) =
        Self::try_natural_gradient_autotune_step_into(m_inv_sqrt, grad, n, learning_rate, out)
    {
        panic!(
            "megakernel natural-gradient autotune allocation failed: {source}. Fix: shard the autotune surface."
        );
    }
}
1295
1296    /// Compute the natural-gradient autotune step into caller-owned storage
1297    /// with fallible host staging.
1298    ///
1299    /// # Errors
1300    ///
1301    /// Returns [`BackendError`] when host staging cannot be reserved for the
1302    /// natural-gradient vector.
1303    pub fn try_natural_gradient_autotune_step_into(
1304        m_inv_sqrt: &[f64],
1305        grad: &[f64],
1306        n: u32,
1307        learning_rate: f64,
1308        out: &mut Vec<f64>,
1309    ) -> Result<(), BackendError> {
1310        let n = u32_to_usize_checked(n, "natural-gradient dimension")?;
1311        out.clear();
1312        let Some(required_matrix_len) = n.checked_mul(n) else {
1313            return Ok(());
1314        };
1315        if m_inv_sqrt.len() < required_matrix_len || grad.len() < n {
1316            return Ok(());
1317        }
1318        reserve_target_capacity(out, n, "natural-gradient output")?;
1319        out.resize(n, 0.0);
1320        for row in 0..n {
1321            let mut acc = 0.0;
1322            for col in 0..n {
1323                acc += m_inv_sqrt[row * n + col] * grad[col];
1324            }
1325            out[row] = -learning_rate * acc;
1326        }
1327        Ok(())
1328    }
1329}
1330
1331fn diffuse_step_into(
1332    stalks: &[f64],
1333    restriction_diag: &[f64],
1334    damping: f64,
1335    out: &mut Vec<f64>,
1336) -> Result<(), BackendError> {
1337    out.clear();
1338    reserve_target_capacity(out, stalks.len(), "priority diffusion scratch")?;
1339    out.resize(stalks.len(), 0.0);
1340    for ((slot, &stalk), &restriction) in out
1341        .iter_mut()
1342        .zip(stalks.iter())
1343        .zip(restriction_diag.iter())
1344    {
1345        *slot = stalk - damping * restriction * stalk;
1346    }
1347    Ok(())
1348}
1349
1350fn reserve_target_capacity<T>(
1351    out: &mut Vec<T>,
1352    target_capacity: usize,
1353    label: &'static str,
1354) -> Result<(), BackendError> {
1355    try_reserve_vec_capacity(out, target_capacity).map_err(|source| {
1356        BackendError::new(format!(
1357            "megakernel {label} reservation failed for {target_capacity} element(s): {source}. Fix: shard the policy input before launch-policy math."
1358        ))
1359    })
1360}
1361
/// Return the index of the smallest cost under `f64::total_cmp` ordering.
///
/// The earliest index wins ties. `costs` must be non-empty; an empty slice
/// trips the debug assertion (or the index panic in release builds).
fn best_cost_index(costs: &[f64]) -> usize {
    debug_assert!(!costs.is_empty());
    let seed = (0_usize, costs[0]);
    let (winner, _) = costs
        .iter()
        .copied()
        .enumerate()
        .skip(1)
        .fold(seed, |leader, challenger| {
            // Strictly-less only, so an equal later cost never displaces the leader.
            if matches!(
                challenger.1.total_cmp(&leader.1),
                std::cmp::Ordering::Less
            ) {
                challenger
            } else {
                leader
            }
        });
    winner
}
1374
1375fn u32_to_usize_checked(value: u32, label: &'static str) -> Result<usize, BackendError> {
1376    usize::try_from(value).map_err(|error| {
1377        BackendError::new(format!(
1378            "{label} cannot fit usize: {error}. Fix: shard the autotune surface."
1379        ))
1380    })
1381}
1382
/// Raise `value` by `hysteresis`, saturating at `u16::MAX` instead of wrapping.
fn hysteresis_add(value: u16, hysteresis: u16) -> u16 {
    u16::saturating_add(value, hysteresis)
}
1386
/// Lower `value` by `hysteresis`, saturating at zero instead of wrapping.
fn hysteresis_sub(value: u16, hysteresis: u16) -> u16 {
    u16::saturating_sub(value, hysteresis)
}
1390
1391fn classify_pressure(
1392    queue_len: u32,
1393    lanes: u64,
1394    requeue_count: u64,
1395    policy: &MegakernelLaunchPolicy,
1396) -> Result<MegakernelQueuePressure, BackendError> {
1397    if queue_len == 0 {
1398        return Ok(MegakernelQueuePressure::Empty);
1399    }
1400    let lanes = lanes.max(1);
1401    let queue_len = u64::from(queue_len);
1402    let saturated_lanes = lanes
1403        .checked_mul(u64::from(policy.saturated_waves))
1404        .ok_or_else(|| {
1405            BackendError::new(
1406                "megakernel pressure wave threshold overflowed u64. Fix: reduce worker lanes or saturated_waves.",
1407            )
1408        })?;
1409    if requeue_count > 0 || queue_len >= saturated_lanes {
1410        Ok(MegakernelQueuePressure::Saturated)
1411    } else if queue_len >= lanes {
1412        Ok(MegakernelQueuePressure::Balanced)
1413    } else {
1414        Ok(MegakernelQueuePressure::Light)
1415    }
1416}
1417
1418#[cfg(test)]
1419mod tests;