use crate::PipelineError;
use vyre_driver::backend::{OutputBuffers, Resource};
/// Statistics carried alongside megakernel dispatch outputs.
///
/// The byte counters and `latency_ns` feed the `*_per_second` accessors on this
/// type. NOTE(review): the remaining per-field descriptions are inferred from
/// the field names only — confirm against the code that fills this struct in.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct MegakernelDispatchStats {
/// Presumably the number of input bytes supplied to the dispatch — TODO confirm.
pub input_bytes: u64,
/// Output byte count; numerator of `output_bytes_per_second`.
pub output_bytes: u64,
/// Readback byte count; numerator of `readback_bytes_per_second`.
pub readback_bytes: u64,
/// Moved byte count; numerator of `bytes_moved_per_second`.
pub bytes_moved: u64,
/// Device allocation byte count; numerator of `device_allocation_bytes_per_second`.
pub device_allocation_bytes: u64,
/// Presumably the number of device allocations performed — TODO confirm.
pub device_allocation_events: u32,
/// Latency used as the denominator of every per-second rate; the rate math
/// treats it as nanoseconds (bytes are scaled by 1_000_000_000 before dividing).
pub latency_ns: u64,
/// Presumably the number of output buffers produced — TODO confirm.
pub output_buffers: u32,
/// Presumably the number of resident resource rows used — TODO confirm.
pub resident_resource_rows: u32,
/// Presumably the number of resident resource handles used — TODO confirm.
pub resident_resource_handles: u32,
/// Presumably the number of kernel launches issued — TODO confirm.
pub kernel_launches: u32,
/// Presumably the number of host/device synchronization points — TODO confirm.
pub sync_points: u32,
/// Presumably set when the dispatch completed after a device-loss recovery — TODO confirm.
pub recovered_after_device_loss: bool,
}
impl MegakernelDispatchStats {
    /// Rate derived from `output_bytes` over `latency_ns`, in bytes per second.
    ///
    /// Yields `0` when `latency_ns` is zero and saturates at `u64::MAX`.
    #[must_use]
    pub fn output_bytes_per_second(&self) -> u64 {
        let Self {
            output_bytes,
            latency_ns,
            ..
        } = *self;
        bytes_per_second_or_panic(output_bytes, latency_ns, "output bytes")
    }

    /// Rate derived from `readback_bytes` over `latency_ns`, in bytes per second.
    ///
    /// Yields `0` when `latency_ns` is zero and saturates at `u64::MAX`.
    #[must_use]
    pub fn readback_bytes_per_second(&self) -> u64 {
        let Self {
            readback_bytes,
            latency_ns,
            ..
        } = *self;
        bytes_per_second_or_panic(readback_bytes, latency_ns, "readback bytes")
    }

    /// Rate derived from `bytes_moved` over `latency_ns`, in bytes per second.
    ///
    /// Yields `0` when `latency_ns` is zero and saturates at `u64::MAX`.
    #[must_use]
    pub fn bytes_moved_per_second(&self) -> u64 {
        let Self {
            bytes_moved,
            latency_ns,
            ..
        } = *self;
        bytes_per_second_or_panic(bytes_moved, latency_ns, "moved bytes")
    }

    /// Rate derived from `device_allocation_bytes` over `latency_ns`, in bytes
    /// per second.
    ///
    /// Yields `0` when `latency_ns` is zero and saturates at `u64::MAX`.
    #[must_use]
    pub fn device_allocation_bytes_per_second(&self) -> u64 {
        let Self {
            device_allocation_bytes,
            latency_ns,
            ..
        } = *self;
        bytes_per_second_or_panic(device_allocation_bytes, latency_ns, "device allocation bytes")
    }
}
/// Converts a byte count observed over `latency_ns` nanoseconds into a whole
/// bytes-per-second rate.
///
/// Despite the name, this never panics: a zero latency yields `0`, and a rate
/// that does not fit in `u64` saturates at `u64::MAX`. The quotient is
/// truncated toward zero. `_label` is accepted but not used.
fn bytes_per_second_or_panic(bytes: u64, latency_ns: u64, _label: &'static str) -> u64 {
    const NANOS_PER_SECOND: u128 = 1_000_000_000;
    match latency_ns {
        0 => 0,
        nanos => {
            // Widen to u128 first: u64::MAX * 1e9 still fits, so the product cannot overflow.
            let per_second = u128::from(bytes) * NANOS_PER_SECOND / u128::from(nanos);
            if per_second > u128::from(u64::MAX) {
                u64::MAX
            } else {
                per_second as u64
            }
        }
    }
}
/// Byte buffers paired with the statistics of the dispatch that produced them.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MegakernelDispatchOutput {
/// Raw byte buffers, one `Vec<u8>` per buffer. Presumably the dispatch outputs
/// read back to the host — TODO confirm against the producer.
pub buffers: Vec<Vec<u8>>,
/// Statistics attached to this output.
pub stats: MegakernelDispatchStats,
}
/// Batched byte buffers paired with a single set of dispatch statistics.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MegakernelBatchDispatchOutput {
/// Nested byte buffers: a list of batches, each a list of `Vec<u8>` buffers.
/// Presumably one outer entry per dispatched batch — TODO confirm.
pub batches: Vec<Vec<Vec<u8>>>,
/// Statistics attached to this output; one value covers all batches.
pub stats: MegakernelDispatchStats,
}
/// Scratch storage for resident batch dispatches: resource rows plus per-batch
/// output buffers.
///
/// `clear` empties the contents without dropping the vectors themselves, so the
/// storage can be reused. The fields are `pub(super)` and are filled in by the
/// parent module.
#[derive(Debug, Default)]
pub struct MegakernelResidentBatchScratch {
/// Resource rows, four `Resource`s each. Emptied by `clear`.
pub(super) resources: Vec<[Resource; 4]>,
/// Per-batch output buffers. `clear` empties each inner buffer but keeps the rows.
pub(super) batches: Vec<OutputBuffers>,
/// Number of leading entries of `batches` that `batches()` exposes
/// (clamped there to `batches.len()`). Reset to zero by `clear`.
pub(super) active_batches: usize,
}
impl MegakernelResidentBatchScratch {
#[must_use]
pub fn new() -> Self {
Self::default()
}
#[must_use]
pub fn with_capacity(batch_count: usize, output_slots_per_batch: usize) -> Self {
match Self::try_with_capacity(batch_count, output_slots_per_batch) {
Ok(scratch) => scratch,
Err(_error) => Self::default(),
}
}
pub fn try_with_capacity(
batch_count: usize,
output_slots_per_batch: usize,
) -> Result<Self, PipelineError> {
let mut resources = Vec::new();
vyre_foundation::allocation::try_reserve_vec_to_capacity(&mut resources, batch_count)
.map_err(|error| {
PipelineError::Backend(format!(
"megakernel resident batch scratch could not reserve {batch_count} resource row(s): {error}. Fix: split persistent-handle batches before dispatch."
))
})?;
let mut batches = Vec::new();
vyre_foundation::allocation::try_reserve_vec_to_capacity(&mut batches, batch_count)
.map_err(|error| {
PipelineError::Backend(format!(
"megakernel resident batch scratch could not reserve {batch_count} batch row(s): {error}. Fix: split persistent-handle batches before dispatch."
))
})?;
for _ in 0..batch_count {
let mut outputs = Vec::new();
vyre_foundation::allocation::try_reserve_vec_to_capacity(
&mut outputs,
output_slots_per_batch,
)
.map_err(|error| {
PipelineError::Backend(format!(
"megakernel resident batch scratch could not reserve {output_slots_per_batch} output slot(s): {error}. Fix: reduce resident output fanout or split persistent-handle batches."
))
})?;
outputs.resize_with(output_slots_per_batch, Vec::new);
batches.push(outputs);
}
Ok(Self {
resources,
batches,
active_batches: 0,
})
}
#[must_use]
pub fn batches(&self) -> &[OutputBuffers] {
&self.batches[..self.active_batches.min(self.batches.len())]
}
pub fn batches_mut(&mut self) -> &mut Vec<OutputBuffers> {
&mut self.batches
}
pub fn clear(&mut self) {
self.resources.clear();
self.active_batches = 0;
for batch in &mut self.batches {
for output in batch {
output.clear();
}
}
}
#[must_use]
pub fn resource_capacity(&self) -> usize {
self.resources.capacity()
}
#[must_use]
pub fn batch_capacity(&self) -> usize {
self.batches.capacity()
}
}
/// The four `u64` handles that `resources()` wraps as `Resource::Resident`
/// values, in the order control, ring, debug log, I/O queue.
///
/// NOTE(review): the handles look like opaque backend identifiers for
/// already-resident buffers — confirm against the backend that issues them.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct MegakernelResidentHandles {
/// Handle placed first in the resource array.
pub control: u64,
/// Handle placed second in the resource array.
pub ring: u64,
/// Handle placed third in the resource array.
pub debug_log: u64,
/// Handle placed fourth in the resource array.
pub io_queue: u64,
}
impl MegakernelResidentHandles {
pub const ABI_RESOURCE_COUNT: usize = 4;
#[must_use]
pub const fn new(control: u64, ring: u64, debug_log: u64, io_queue: u64) -> Self {
Self {
control,
ring,
debug_log,
io_queue,
}
}
pub(super) fn resources(self) -> [Resource; Self::ABI_RESOURCE_COUNT] {
[
Resource::Resident(self.control),
Resource::Resident(self.ring),
Resource::Resident(self.debug_log),
Resource::Resident(self.io_queue),
]
}
}