use super::{
nanos_u64, nested_output_bytes, nested_output_count_u32, output_bytes, output_count_u32,
reserve_output_shell, resident_handle_count_u32, resident_row_count_u32, Megakernel,
MegakernelBatchDispatchOutput, MegakernelDispatchOutput, MegakernelDispatchStats,
MegakernelResidentBatchScratch, MegakernelResidentHandles,
};
use crate::PipelineError;
use smallvec::SmallVec;
use std::time::Instant;
use vyre_driver::backend::{OutputBuffers, Resource};
impl Megakernel {
    /// Error text used when a single persistent-handle dispatch needs grid sync the backend lacks.
    const PERSISTENT_HANDLE_GRID_SYNC_UNSUPPORTED: &'static str = "persistent-handle dispatch cannot split GridSync barriers without backend-resident segment threading. Fix: use a backend with native grid sync or dispatch borrowed buffers through the grid-sync splitter.";

    /// Error text used when a batched persistent-handle dispatch needs grid sync the backend lacks.
    const BATCHED_PERSISTENT_HANDLE_GRID_SYNC_UNSUPPORTED: &'static str = "batched persistent-handle dispatch cannot split GridSync barriers without backend-resident segment threading. Fix: use a backend with native grid sync or dispatch borrowed buffers through the grid-sync splitter.";

    /// Dispatches against one set of resident handles and returns only the output buffers.
    ///
    /// Thin wrapper over [`Self::dispatch_persistent_handles_observed`] that drops the stats.
    ///
    /// # Errors
    ///
    /// Returns whatever [`Self::dispatch_persistent_handles_observed`] returns.
    pub fn dispatch_persistent_handles(
        &self,
        handles: MegakernelResidentHandles,
    ) -> Result<Vec<Vec<u8>>, PipelineError> {
        self.dispatch_persistent_handles_observed(handles)
            .map(|observed| observed.buffers)
    }

    /// Dispatches against one set of resident handles, returning output buffers plus stats.
    ///
    /// A fresh output vector is created and handed to `reserve_output_shell` with
    /// `MegakernelResidentHandles::ABI_RESOURCE_COUNT` slots before the dispatch fills it.
    ///
    /// # Errors
    ///
    /// Propagates failures from `reserve_output_shell` and from
    /// [`Self::dispatch_persistent_handles_into`].
    pub fn dispatch_persistent_handles_observed(
        &self,
        handles: MegakernelResidentHandles,
    ) -> Result<MegakernelDispatchOutput, PipelineError> {
        let mut slots = Vec::new();
        reserve_output_shell(
            &mut slots,
            MegakernelResidentHandles::ABI_RESOURCE_COUNT,
            "persistent-handle output slots",
        )?;
        let stats = self.dispatch_persistent_handles_into(handles, &mut slots)?;
        Ok(MegakernelDispatchOutput {
            buffers: slots,
            stats,
        })
    }

    /// Dispatches against one set of resident handles, writing results into `outputs`.
    ///
    /// The dispatch is attempted once. If it fails and `recovery_policy` allows a retry
    /// for that error, `recover_after_device_loss` is called and the dispatch is attempted
    /// exactly one more time. The reported latency spans both attempts and the recovery.
    ///
    /// The returned stats report the output byte total as output, readback and moved bytes,
    /// one resident row, one sync point, and one kernel launch (two when the retry ran).
    ///
    /// # Errors
    ///
    /// * `PipelineError::Backend` when the kernel uses grid sync and the backend reports
    ///   no grid-sync support.
    /// * Any error from the dispatch that the recovery policy does not allow retrying,
    ///   from recovery itself, from the retried dispatch, or from the stat conversions.
    pub fn dispatch_persistent_handles_into(
        &self,
        handles: MegakernelResidentHandles,
        outputs: &mut OutputBuffers,
    ) -> Result<MegakernelDispatchStats, PipelineError> {
        self.require_grid_sync_for_persistent_handles(
            Self::PERSISTENT_HANDLE_GRID_SYNC_UNSUPPORTED,
        )?;

        let resources = handles.resources();
        let config = self.launch_geometry().dispatch_config(None);
        let started = Instant::now();

        // `true` only when the first attempt failed and the retry succeeded.
        let recovered =
            match self.dispatch_persistent_handles_once_into(&resources, &config, outputs) {
                Ok(()) => false,
                Err(error) if self.recovery_policy.allows_retry(&error) => {
                    self.recover_after_device_loss()?;
                    self.dispatch_persistent_handles_once_into(&resources, &config, outputs)?;
                    true
                }
                Err(error) => return Err(error.into()),
            };

        let latency_ns = nanos_u64(started.elapsed().as_nanos())?;
        let moved_bytes = output_bytes(outputs)?;
        let output_buffers = output_count_u32(outputs)?;
        let resident_resource_handles = resident_handle_count_u32(1)?;

        // Input and device-allocation counters keep their zero baseline values.
        Ok(MegakernelDispatchStats {
            output_bytes: moved_bytes,
            readback_bytes: moved_bytes,
            bytes_moved: moved_bytes,
            latency_ns,
            output_buffers,
            resident_resource_rows: 1,
            resident_resource_handles,
            kernel_launches: if recovered { 2 } else { 1 },
            sync_points: 1,
            recovered_after_device_loss: recovered,
            ..Self::zeroed_persistent_handle_stats()
        })
    }

    /// Dispatches a batch of resident handle rows, returning per-row outputs plus stats.
    ///
    /// A fresh batch vector is created and handed to `reserve_output_shell` with one slot
    /// per entry in `handles` before the dispatch fills it.
    ///
    /// # Errors
    ///
    /// Propagates failures from `reserve_output_shell` and from
    /// [`Self::dispatch_persistent_handles_many_into`].
    pub fn dispatch_persistent_handles_many_observed(
        &self,
        handles: &[MegakernelResidentHandles],
    ) -> Result<MegakernelBatchDispatchOutput, PipelineError> {
        let mut rows = Vec::new();
        reserve_output_shell(&mut rows, handles.len(), "persistent-handle batch rows")?;
        let stats = self.dispatch_persistent_handles_many_into(handles, &mut rows)?;
        Ok(MegakernelBatchDispatchOutput {
            batches: rows,
            stats,
        })
    }

    /// Dispatches a batch of resident handle rows, writing per-row outputs into `batches`.
    ///
    /// An empty `handles` slice clears `batches` and returns all-zero stats without
    /// checking grid-sync support or touching the backend. Otherwise the rows are
    /// dispatched once, with a single retry after `recover_after_device_loss` when
    /// `recovery_policy` allows it for the first failure.
    ///
    /// # Errors
    ///
    /// * `PipelineError::Backend` when the kernel uses grid sync and the backend reports
    ///   no grid-sync support, or when reserving the resource rows fails.
    /// * Any error from the dispatch that the recovery policy does not allow retrying,
    ///   from recovery itself, from the retried dispatch, or from the stat conversions.
    pub fn dispatch_persistent_handles_many_into(
        &self,
        handles: &[MegakernelResidentHandles],
        batches: &mut Vec<OutputBuffers>,
    ) -> Result<MegakernelDispatchStats, PipelineError> {
        if handles.is_empty() {
            batches.clear();
            return Ok(Self::zeroed_persistent_handle_stats());
        }
        self.require_grid_sync_for_persistent_handles(
            Self::BATCHED_PERSISTENT_HANDLE_GRID_SYNC_UNSUPPORTED,
        )?;

        let mut resources: SmallVec<[[Resource; 4]; 16]> = SmallVec::new();
        reserve_resource_rows_small(&mut resources, handles.len())?;
        resources.extend(handles.iter().map(|row| row.resources()));

        let config = self.launch_geometry().dispatch_config(None);
        let started = Instant::now();

        // `true` only when the first attempt failed and the retry succeeded.
        let recovered =
            match self.dispatch_persistent_handle_rows_once_into(&resources, &config, batches) {
                Ok(()) => false,
                Err(error) if self.recovery_policy.allows_retry(&error) => {
                    self.recover_after_device_loss()?;
                    self.dispatch_persistent_handle_rows_once_into(&resources, &config, batches)?;
                    true
                }
                Err(error) => return Err(error.into()),
            };

        let latency_ns = nanos_u64(started.elapsed().as_nanos())?;
        let moved_bytes = nested_output_bytes(batches)?;
        let output_buffers = nested_output_count_u32(batches)?;
        let resident_resource_rows = resident_row_count_u32(handles.len())?;
        let resident_resource_handles = resident_handle_count_u32(handles.len())?;

        // Input and device-allocation counters keep their zero baseline values.
        Ok(MegakernelDispatchStats {
            output_bytes: moved_bytes,
            readback_bytes: moved_bytes,
            bytes_moved: moved_bytes,
            latency_ns,
            output_buffers,
            resident_resource_rows,
            resident_resource_handles,
            kernel_launches: if recovered { 2 } else { 1 },
            sync_points: 1,
            recovered_after_device_loss: recovered,
            ..Self::zeroed_persistent_handle_stats()
        })
    }

    /// Dispatches a batch of resident handle rows using caller-owned `scratch` storage.
    ///
    /// Resource rows are rebuilt into `scratch.resources` and outputs land in
    /// `scratch.batches`. `scratch.active_batches` is reset to zero before the dispatch
    /// and set to `handles.len()` only once the dispatch has succeeded, so an early error
    /// return leaves it at zero.
    ///
    /// An empty `handles` slice calls `scratch.clear()` and returns all-zero stats without
    /// checking grid-sync support or touching the backend.
    ///
    /// # Errors
    ///
    /// * `PipelineError::Backend` when the kernel uses grid sync and the backend reports
    ///   no grid-sync support, or when reserving the resource rows fails.
    /// * Any error from the dispatch that the recovery policy does not allow retrying,
    ///   from recovery itself, from the retried dispatch, or from the stat conversions.
    pub fn dispatch_persistent_handles_many_with_scratch(
        &self,
        handles: &[MegakernelResidentHandles],
        scratch: &mut MegakernelResidentBatchScratch,
    ) -> Result<MegakernelDispatchStats, PipelineError> {
        if handles.is_empty() {
            scratch.clear();
            return Ok(Self::zeroed_persistent_handle_stats());
        }
        self.require_grid_sync_for_persistent_handles(
            Self::BATCHED_PERSISTENT_HANDLE_GRID_SYNC_UNSUPPORTED,
        )?;

        prepare_resource_rows_into(handles, &mut scratch.resources)?;
        scratch.active_batches = 0;

        let config = self.launch_geometry().dispatch_config(None);
        let started = Instant::now();

        // `true` only when the first attempt failed and the retry succeeded.
        let recovered = match self.dispatch_persistent_handle_rows_once_into(
            &scratch.resources,
            &config,
            &mut scratch.batches,
        ) {
            Ok(()) => false,
            Err(error) if self.recovery_policy.allows_retry(&error) => {
                self.recover_after_device_loss()?;
                self.dispatch_persistent_handle_rows_once_into(
                    &scratch.resources,
                    &config,
                    &mut scratch.batches,
                )?;
                true
            }
            Err(error) => return Err(error.into()),
        };
        scratch.active_batches = handles.len();

        let latency_ns = nanos_u64(started.elapsed().as_nanos())?;
        let moved_bytes = nested_output_bytes(&scratch.batches)?;
        let output_buffers = nested_output_count_u32(&scratch.batches)?;
        let resident_resource_rows = resident_row_count_u32(handles.len())?;
        let resident_resource_handles = resident_handle_count_u32(handles.len())?;

        // Input and device-allocation counters keep their zero baseline values.
        Ok(MegakernelDispatchStats {
            output_bytes: moved_bytes,
            readback_bytes: moved_bytes,
            bytes_moved: moved_bytes,
            latency_ns,
            output_buffers,
            resident_resource_rows,
            resident_resource_handles,
            kernel_launches: if recovered { 2 } else { 1 },
            sync_points: 1,
            recovered_after_device_loss: recovered,
            ..Self::zeroed_persistent_handle_stats()
        })
    }

    /// Rejects the dispatch with `PipelineError::Backend(message)` when this kernel uses
    /// grid sync and the backend reports that it does not support it.
    fn require_grid_sync_for_persistent_handles(
        &self,
        message: &str,
    ) -> Result<(), PipelineError> {
        let unsupported = self.has_grid_sync && !self.backend.supports_grid_sync();
        if unsupported {
            return Err(PipelineError::Backend(message.to_string()));
        }
        Ok(())
    }

    /// Stats with every counter at zero and no recovery recorded.
    ///
    /// Returned as-is for empty batches and used as the baseline for real dispatches.
    fn zeroed_persistent_handle_stats() -> MegakernelDispatchStats {
        MegakernelDispatchStats {
            input_bytes: 0,
            output_bytes: 0,
            readback_bytes: 0,
            bytes_moved: 0,
            device_allocation_bytes: 0,
            device_allocation_events: 0,
            latency_ns: 0,
            output_buffers: 0,
            resident_resource_rows: 0,
            resident_resource_handles: 0,
            kernel_launches: 0,
            sync_points: 0,
            recovered_after_device_loss: false,
        }
    }
}
/// Rebuilds `resources` so it holds exactly one resource row per entry in `handles`.
///
/// Existing rows are cleared first, then room is reserved through
/// `reserve_resource_rows` before the new rows are appended in input order.
///
/// # Errors
///
/// Propagates the reservation failure from `reserve_resource_rows`.
fn prepare_resource_rows_into(
    handles: &[MegakernelResidentHandles],
    resources: &mut Vec<[Resource; 4]>,
) -> Result<(), PipelineError> {
    resources.clear();
    reserve_resource_rows(resources, handles.len())?;
    let rows = handles.iter().map(|row| row.resources());
    resources.extend(rows);
    Ok(())
}
/// Forwards a reservation request for `capacity` rows to
/// `vyre_foundation::allocation::try_reserve_vec_to_capacity`.
///
/// # Errors
///
/// Returns `PipelineError::Backend` carrying the requested row count and the
/// underlying allocation error when the reservation fails.
fn reserve_resource_rows(
    rows: &mut Vec<[Resource; 4]>,
    capacity: usize,
) -> Result<(), PipelineError> {
    let reserved = vyre_foundation::allocation::try_reserve_vec_to_capacity(rows, capacity);
    reserved.map_err(|error| {
        let message = format!(
            "megakernel resident resource-row reservation failed for {capacity} row(s): {error}. Fix: split persistent-handle dispatch batches before launch."
        );
        PipelineError::Backend(message)
    })
}
/// Forwards a reservation request for `capacity` rows on the inline-capable row buffer to
/// `vyre_foundation::allocation::try_reserve_smallvec_to_capacity`.
///
/// # Errors
///
/// Returns `PipelineError::Backend` carrying the requested row count and the
/// underlying allocation error when the reservation fails.
fn reserve_resource_rows_small(
    rows: &mut SmallVec<[[Resource; 4]; 16]>,
    capacity: usize,
) -> Result<(), PipelineError> {
    if let Err(error) =
        vyre_foundation::allocation::try_reserve_smallvec_to_capacity(rows, capacity)
    {
        return Err(PipelineError::Backend(format!(
            "megakernel resident inline resource-row reservation failed for {capacity} row(s): {error}. Fix: split persistent-handle dispatch batches before launch."
        )));
    }
    Ok(())
}