use super::slot;
use rustc_hash::FxHashMap;
/// Status of a single ring slot, decoded from its raw `u32` status word.
///
/// Every named variant corresponds to one `slot::*` constant (see `RingStatus::from_raw`);
/// a raw value matching none of them is kept verbatim in [`RingStatus::Unknown`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RingStatus {
    /// Raw value `slot::EMPTY`.
    Empty,
    /// Raw value `slot::PUBLISHED`.
    Published,
    /// Raw value `slot::CLAIMED`.
    Claimed,
    /// Raw value `slot::DONE`.
    Done,
    /// Raw value `slot::WAIT_IO`.
    WaitIo,
    /// Raw value `slot::YIELD`.
    Yield,
    /// Raw value `slot::REQUEUE`.
    Requeue,
    /// Raw value `slot::FAULT`.
    Fault,
    /// A raw status word that matches none of the known `slot::*` constants.
    Unknown(u32),
}
impl RingStatus {
    /// Decodes a raw slot status word.
    ///
    /// Each known `slot::*` constant maps to its named variant; any other value is
    /// returned as [`RingStatus::Unknown`] carrying the raw word. This is a `const fn`,
    /// like [`raw`](Self::raw) and [`is_active`](Self::is_active), so it can also be
    /// evaluated in constant contexts.
    #[must_use]
    pub(super) const fn from_raw(raw: u32) -> Self {
        match raw {
            slot::EMPTY => Self::Empty,
            slot::PUBLISHED => Self::Published,
            slot::CLAIMED => Self::Claimed,
            slot::DONE => Self::Done,
            slot::WAIT_IO => Self::WaitIo,
            slot::YIELD => Self::Yield,
            slot::REQUEUE => Self::Requeue,
            slot::FAULT => Self::Fault,
            other => Self::Unknown(other),
        }
    }

    /// Encodes the status back into its raw word.
    ///
    /// Named variants map to their `slot::*` constant; `Unknown` returns the value it carries.
    #[must_use]
    pub const fn raw(self) -> u32 {
        match self {
            Self::Empty => slot::EMPTY,
            Self::Published => slot::PUBLISHED,
            Self::Claimed => slot::CLAIMED,
            Self::Done => slot::DONE,
            Self::WaitIo => slot::WAIT_IO,
            Self::Yield => slot::YIELD,
            Self::Requeue => slot::REQUEUE,
            Self::Fault => slot::FAULT,
            Self::Unknown(raw) => raw,
        }
    }

    /// Returns `true` for `Published`, `Claimed`, `WaitIo`, `Yield` and `Requeue`.
    ///
    /// `Empty`, `Done`, `Fault` and `Unknown` are not considered active.
    #[must_use]
    pub const fn is_active(self) -> bool {
        matches!(
            self,
            Self::Published | Self::Claimed | Self::WaitIo | Self::Yield | Self::Requeue
        )
    }
}
/// Decoded view of a single ring slot.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RingSlotSnapshot {
    /// Index of the slot in the ring.
    pub slot_idx: u32,
    /// Decoded status word of the slot.
    pub status: RingStatus,
    /// Tenant recorded in the slot.
    pub tenant_id: u32,
    /// Opcode recorded in the slot.
    pub opcode: u32,
    /// Leading argument words of the slot — presumably the first three of a longer
    /// argument array; confirm against the decoder.
    pub args_prefix: [u32; 3],
}
/// Aggregated telemetry for one window of ring slots.
///
/// The per-status fields (`published` .. `fault`) presumably count the window's slots
/// in each status — confirm against the decoder that fills them.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WindowTelemetry {
    pub ticket: u32,
    pub tenant_id: u32,
    pub opcode: u32,
    pub required_slots: u32,
    pub lookahead_slots: u32,
    pub published: u32,
    pub claimed: u32,
    pub done: u32,
    pub wait_io: u32,
    pub yield_count: u32,
    pub requeue: u32,
    pub fault: u32,
}

impl WindowTelemetry {
    /// Reports whether the window still has in-flight work.
    ///
    /// A window is active when any of `published`, `claimed`, `wait_io`, `yield_count`
    /// or `requeue` is non-zero; `done` and `fault` counts never make it active.
    #[must_use]
    pub const fn is_active(&self) -> bool {
        // A bitwise OR of unsigned counters is non-zero exactly when at least one is non-zero.
        let in_flight = self.published | self.claimed | self.wait_io | self.yield_count | self.requeue;
        in_flight != 0
    }
}
/// Count of ring slots in each status bucket.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct RingOccupancy {
    pub empty: u32,
    pub published: u32,
    pub claimed: u32,
    pub done: u32,
    pub wait_io: u32,
    pub yield_count: u32,
    pub requeue: u32,
    pub fault: u32,
    pub unknown: u32,
}

impl RingOccupancy {
    /// Sum of every bucket, computed with `checked_status_sum` (saturating at `u32::MAX`).
    #[must_use]
    pub fn total_slots(&self) -> u32 {
        // Exhaustive destructuring: adding a field to the struct forces this sum to be revisited.
        let Self {
            empty,
            published,
            claimed,
            done,
            wait_io,
            yield_count,
            requeue,
            fault,
            unknown,
        } = *self;
        checked_status_sum(
            [
                empty,
                published,
                claimed,
                done,
                wait_io,
                yield_count,
                requeue,
                fault,
                unknown,
            ],
            "total ring slots",
        )
    }

    /// Sum of every bucket except `empty` and `done`, computed with `checked_status_sum`
    /// (saturating at `u32::MAX`).
    ///
    /// Note that `fault` and `unknown` slots are included in the depth.
    #[must_use]
    pub fn queue_depth(&self) -> u32 {
        let Self {
            published,
            claimed,
            wait_io,
            yield_count,
            requeue,
            fault,
            unknown,
            ..
        } = *self;
        checked_status_sum(
            [published, claimed, wait_io, yield_count, requeue, fault, unknown],
            "ring queue depth",
        )
    }
}
/// Schema version stamped by `MegakernelRuntimeEvidence::complete` and required by
/// `MegakernelRuntimeEvidence::is_complete`.
pub const RUNTIME_IO_EVIDENCE_SCHEMA_VERSION: u32 = 1;
/// A family of runtime metrics whose presence is tracked by
/// [`RuntimeEvidenceMetricCoverage`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RuntimeEvidenceMetricFamily {
    Ring,
    Control,
    Copy,
    Residency,
}

impl RuntimeEvidenceMetricFamily {
    /// Lowercase name of the family: `"ring"`, `"control"`, `"copy"` or `"residency"`.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        match self {
            RuntimeEvidenceMetricFamily::Residency => "residency",
            RuntimeEvidenceMetricFamily::Copy => "copy",
            RuntimeEvidenceMetricFamily::Control => "control",
            RuntimeEvidenceMetricFamily::Ring => "ring",
        }
    }
}
/// Records which metric families were populated in a piece of runtime evidence.
///
/// The derived `Default` marks every family as missing; [`complete`](Self::complete)
/// marks every family as present.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct RuntimeEvidenceMetricCoverage {
    pub ring: bool,
    pub control: bool,
    pub copy: bool,
    pub residency: bool,
}

impl RuntimeEvidenceMetricCoverage {
    /// Coverage with every metric family present.
    #[must_use]
    pub const fn complete() -> Self {
        Self {
            ring: true,
            control: true,
            copy: true,
            residency: true,
        }
    }

    /// Families whose flag is `false`, in the order ring, control, copy, residency.
    #[must_use]
    pub fn missing_families(self) -> Vec<RuntimeEvidenceMetricFamily> {
        let flagged = [
            (self.ring, RuntimeEvidenceMetricFamily::Ring),
            (self.control, RuntimeEvidenceMetricFamily::Control),
            (self.copy, RuntimeEvidenceMetricFamily::Copy),
            (self.residency, RuntimeEvidenceMetricFamily::Residency),
        ];
        flagged
            .into_iter()
            .filter_map(|(covered, family)| (!covered).then_some(family))
            .collect()
    }
}
/// Runtime I/O evidence collected for a megakernel run.
///
/// `coverage` records which metric families were actually populated. The evidence is
/// [`is_complete`](Self::is_complete) only when every family is covered and
/// `schema_version` equals [`RUNTIME_IO_EVIDENCE_SCHEMA_VERSION`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MegakernelRuntimeEvidence {
    pub schema_version: u32,
    pub resident_device_bytes: u64,
    pub host_copy_bytes: u64,
    pub host_copy_avoided_bytes: u64,
    pub ring_occupancy: RingOccupancy,
    pub control_decode_ns: u64,
    pub ring_decode_ns: u64,
    pub coverage: RuntimeEvidenceMetricCoverage,
}

impl MegakernelRuntimeEvidence {
    /// Builds evidence stamped with the current schema version and full metric coverage.
    #[must_use]
    pub const fn complete(
        resident_device_bytes: u64,
        host_copy_bytes: u64,
        host_copy_avoided_bytes: u64,
        ring_occupancy: RingOccupancy,
        control_decode_ns: u64,
        ring_decode_ns: u64,
    ) -> Self {
        let coverage = RuntimeEvidenceMetricCoverage::complete();
        Self {
            schema_version: RUNTIME_IO_EVIDENCE_SCHEMA_VERSION,
            coverage,
            resident_device_bytes,
            host_copy_bytes,
            host_copy_avoided_bytes,
            ring_occupancy,
            control_decode_ns,
            ring_decode_ns,
        }
    }

    /// Metric families not marked as covered (see
    /// [`RuntimeEvidenceMetricCoverage::missing_families`]).
    #[must_use]
    pub fn missing_metric_families(&self) -> Vec<RuntimeEvidenceMetricFamily> {
        self.coverage.missing_families()
    }

    /// `true` when the schema version is current and no metric family is missing.
    #[must_use]
    pub fn is_complete(&self) -> bool {
        if self.schema_version != RUNTIME_IO_EVIDENCE_SCHEMA_VERSION {
            return false;
        }
        self.missing_metric_families().is_empty()
    }

    /// Share of host-copy traffic that was avoided, in basis points (0..=10_000).
    ///
    /// Computed as `avoided * 10_000 / (copied + avoided)` with integer division.
    /// Returns 0 when both counters are zero.
    #[must_use]
    pub fn host_copy_avoidance_bps(&self) -> u16 {
        let avoided = u128::from(self.host_copy_avoided_bytes);
        // Two u64 values widened to u128 cannot overflow when added, and
        // `avoided * 10_000` stays far below u128::MAX.
        let total = u128::from(self.host_copy_bytes) + avoided;
        match total {
            0 => 0,
            _ => {
                let bps = (avoided * 10_000 / total).min(10_000);
                // The clamp above guarantees the value fits in u16.
                u16::try_from(bps).unwrap_or(u16::MAX)
            }
        }
    }
}
/// Sums status counters with saturating addition.
///
/// The result clamps at `u32::MAX` instead of wrapping or panicking. `label` names the
/// quantity being summed but is currently unused.
fn checked_status_sum<const N: usize>(values: [u32; N], label: &'static str) -> u32 {
    let _ = label;
    let mut running = 0_u32;
    for count in values {
        running = running.saturating_add(count);
    }
    running
}
#[cfg(test)]
mod evidence_tests {
    use super::*;

    /// Fully covered evidence with the given copy counters and every other metric zeroed.
    fn copy_evidence(host_copy_bytes: u64, host_copy_avoided_bytes: u64) -> MegakernelRuntimeEvidence {
        MegakernelRuntimeEvidence::complete(
            0,
            host_copy_bytes,
            host_copy_avoided_bytes,
            RingOccupancy::default(),
            0,
            0,
        )
    }

    #[test]
    fn runtime_evidence_reports_missing_metric_families() {
        let evidence = MegakernelRuntimeEvidence {
            schema_version: RUNTIME_IO_EVIDENCE_SCHEMA_VERSION,
            resident_device_bytes: 0,
            host_copy_bytes: 0,
            host_copy_avoided_bytes: 0,
            ring_occupancy: RingOccupancy::default(),
            control_decode_ns: 0,
            ring_decode_ns: 0,
            coverage: RuntimeEvidenceMetricCoverage {
                ring: true,
                control: false,
                copy: true,
                residency: false,
            },
        };
        let missing = evidence
            .missing_metric_families()
            .into_iter()
            .map(RuntimeEvidenceMetricFamily::as_str)
            .collect::<Vec<_>>();
        assert_eq!(missing, vec!["control", "residency"]);
        assert!(!evidence.is_complete());
    }

    #[test]
    fn runtime_evidence_records_copy_avoidance_and_occupancy() {
        let evidence = MegakernelRuntimeEvidence::complete(
            4096,
            1024,
            3072,
            RingOccupancy {
                empty: 1,
                published: 2,
                claimed: 3,
                done: 4,
                wait_io: 5,
                yield_count: 6,
                requeue: 7,
                fault: 8,
                unknown: 9,
            },
            11,
            13,
        );
        assert!(evidence.is_complete());
        assert_eq!(evidence.resident_device_bytes, 4096);
        assert_eq!(evidence.ring_occupancy.total_slots(), 45);
        assert_eq!(evidence.ring_occupancy.queue_depth(), 40);
        assert_eq!(evidence.host_copy_avoidance_bps(), 7500);
    }

    /// The derived `Default` coverage marks every family missing, in declaration order.
    #[test]
    fn default_coverage_reports_every_family_missing() {
        let missing = RuntimeEvidenceMetricCoverage::default()
            .missing_families()
            .into_iter()
            .map(RuntimeEvidenceMetricFamily::as_str)
            .collect::<Vec<_>>();
        assert_eq!(missing, vec!["ring", "control", "copy", "residency"]);
        assert!(RuntimeEvidenceMetricCoverage::complete()
            .missing_families()
            .is_empty());
    }

    /// Full coverage alone is not enough: a stale schema version must fail `is_complete`.
    #[test]
    fn runtime_evidence_rejects_schema_version_mismatch() {
        let evidence = MegakernelRuntimeEvidence {
            schema_version: RUNTIME_IO_EVIDENCE_SCHEMA_VERSION.wrapping_add(1),
            ..copy_evidence(0, 0)
        };
        assert!(evidence.missing_metric_families().is_empty());
        assert!(!evidence.is_complete());
    }

    /// The zero-denominator branch must return 0 rather than divide by zero.
    #[test]
    fn copy_avoidance_is_zero_without_traffic() {
        assert_eq!(copy_evidence(0, 0).host_copy_avoidance_bps(), 0);
    }

    /// Extreme counters must not overflow the intermediate arithmetic.
    #[test]
    fn copy_avoidance_handles_u64_extremes() {
        assert_eq!(copy_evidence(0, u64::MAX).host_copy_avoidance_bps(), 10_000);
        assert_eq!(copy_evidence(u64::MAX, 0).host_copy_avoidance_bps(), 0);
        assert_eq!(copy_evidence(u64::MAX, u64::MAX).host_copy_avoidance_bps(), 5_000);
    }

    /// Occupancy sums saturate instead of wrapping or panicking on overflow.
    #[test]
    fn occupancy_sums_saturate_instead_of_overflowing() {
        let occupancy = RingOccupancy {
            empty: u32::MAX,
            published: 1,
            done: 1,
            ..RingOccupancy::default()
        };
        assert_eq!(occupancy.total_slots(), u32::MAX);
        assert_eq!(occupancy.queue_depth(), 1);
    }
}
/// Decoded snapshot of the ring's control state.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct ControlSnapshot {
    /// Shutdown flag as read from the control state.
    pub shutdown: bool,
    pub done_count: u32,
    pub epoch: u32,
    /// NOTE(review): presumably (metric id, value) pairs — confirm against the decoder.
    pub metrics: Vec<(u32, u32)>,
    /// NOTE(review): presumably one counter per tenant — confirm against the decoder.
    pub tenant_fairness: Vec<u32>,
    /// NOTE(review): presumably one counter per priority level — confirm against the decoder.
    pub priority_fairness: Vec<u32>,
}
/// Derived runtime counters for a megakernel ring.
///
/// NOTE(review): fields suffixed `_ppm` are presumably parts-per-million, and `_bps`
/// presumably basis points (out of 10_000, like `host_copy_avoidance_bps`). Confirm
/// against the code that computes them.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct MegakernelRuntimeCounters {
    pub total_slots: u32,
    pub queue_depth: u32,
    pub gpu_idle_slots: u32,
    pub gpu_idle_ppm: u32,
    pub frontier_density_bps: u16,
    pub occupancy_proxy_bps: u16,
    pub drained_slots: u32,
    pub unreclaimed_done_slots: u32,
    /// Wider than the per-tenant counters (`u64`) — presumably a sum over them.
    pub tenant_fairness_total: u64,
    pub tenant_fairness_skew: u32,
    /// Wider than the per-priority counters (`u64`) — presumably a sum over them.
    pub priority_fairness_total: u64,
    pub requeue_slots: u32,
    pub fault_slots: u32,
}
/// Snapshot of the signals a watchdog inspects when checking ring progress.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct MegakernelWatchdogSnapshot {
    /// NOTE(review): presumably the change in done count since the previous observation — confirm.
    pub done_delta: u32,
    pub queue_depth: u32,
    pub fault_slots: u32,
    pub requeue_slots: u32,
    pub gpu_idle_ppm: u32,
    /// Whether a stall is suspected; the criteria are decided by whoever builds this value.
    pub suspected_stall: bool,
}
/// Full decoded telemetry for one ring: control state, occupancy histogram,
/// per-slot snapshots and per-window aggregates.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct RingTelemetry {
    pub control: ControlSnapshot,
    pub occupancy: RingOccupancy,
    /// One entry per decoded slot.
    pub slots: Vec<RingSlotSnapshot>,
    /// One entry per decoded window.
    pub windows: Vec<WindowTelemetry>,
}
/// Schema version required by [`TelemetryDecodeCapacityEvidence::is_complete`].
pub const TELEMETRY_DECODE_CAPACITY_SCHEMA_VERSION: u32 = 1;

/// Lengths and capacities of the output and scratch buffers after a telemetry decode.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TelemetryDecodeCapacityEvidence {
    pub schema_version: u32,
    pub decoded_slot_count: usize,
    pub slot_output_capacity: usize,
    pub decoded_window_count: usize,
    pub window_output_capacity: usize,
    pub window_opcode_scratch_capacity: usize,
    pub window_accumulator_scratch_capacity: usize,
    pub uses_caller_owned_scratch: bool,
}

impl TelemetryDecodeCapacityEvidence {
    /// `true` when all of the following hold:
    /// - the schema version is current and caller-owned scratch was used;
    /// - the slot and window output capacities cover their decoded counts;
    /// - the window-accumulator scratch capacity covers the decoded window count.
    ///
    /// NOTE(review): `window_opcode_scratch_capacity` is reported but never checked here.
    #[must_use]
    pub fn is_complete(self) -> bool {
        let current_schema = self.schema_version == TELEMETRY_DECODE_CAPACITY_SCHEMA_VERSION;
        if !current_schema || !self.uses_caller_owned_scratch {
            return false;
        }
        let slots_fit = self.slot_output_capacity >= self.decoded_slot_count;
        let windows_fit = self.window_output_capacity >= self.decoded_window_count;
        let accumulators_fit = self.window_accumulator_scratch_capacity >= self.decoded_window_count;
        slots_fit && windows_fit && accumulators_fit
    }
}
impl RingTelemetry {
    /// Reports the lengths and capacities of this telemetry's output vectors and of the
    /// borrowed decode `scratch`.
    ///
    /// `uses_caller_owned_scratch` is always `true`, because the scratch is supplied by
    /// the caller through the `scratch` argument.
    #[must_use]
    pub fn decode_capacity_evidence(
        &self,
        scratch: &TelemetryDecodeScratch,
    ) -> TelemetryDecodeCapacityEvidence {
        let (decoded_slot_count, slot_output_capacity) = (self.slots.len(), self.slots.capacity());
        let (decoded_window_count, window_output_capacity) =
            (self.windows.len(), self.windows.capacity());
        let window_opcode_scratch_capacity = scratch.window_opcodes.capacity();
        let window_accumulator_scratch_capacity = scratch.windows.capacity();
        TelemetryDecodeCapacityEvidence {
            schema_version: TELEMETRY_DECODE_CAPACITY_SCHEMA_VERSION,
            decoded_slot_count,
            slot_output_capacity,
            decoded_window_count,
            window_output_capacity,
            window_opcode_scratch_capacity,
            window_accumulator_scratch_capacity,
            uses_caller_owned_scratch: true,
        }
    }
}
/// Scratch buffers for telemetry decoding, owned by the caller and passed in by reference.
#[derive(Debug, Default)]
pub struct TelemetryDecodeScratch {
    pub(super) window_opcodes: Vec<u32>,
    /// NOTE(review): the `(u32, u32)` key is presumably `(tenant_id, opcode)` — confirm.
    pub(super) windows: FxHashMap<(u32, u32), WindowAccumulator>,
}

impl TelemetryDecodeScratch {
    /// Creates empty scratch buffers.
    #[must_use]
    pub fn new() -> Self {
        // The derived `Default` gives an empty `Vec` and an empty map, exactly what a
        // fresh scratch needs.
        Self::default()
    }

    /// Empties both buffers.
    ///
    /// `Vec::clear` and `HashMap::clear` keep their allocations, so the capacity is
    /// available to the next decode.
    pub fn clear(&mut self) {
        let Self {
            window_opcodes,
            windows,
        } = self;
        window_opcodes.clear();
        windows.clear();
    }
}
/// Tally for one window, accumulated during decoding.
///
/// Its fields mirror those of `WindowTelemetry`, except that it has no `ticket`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub(super) struct WindowAccumulator {
    pub(super) tenant_id: u32,
    pub(super) opcode: u32,
    pub(super) required_slots: u32,
    pub(super) lookahead_slots: u32,
    pub(super) published: u32,
    pub(super) claimed: u32,
    pub(super) done: u32,
    pub(super) wait_io: u32,
    pub(super) yield_count: u32,
    pub(super) requeue: u32,
    pub(super) fault: u32,
}