vyre_runtime/megakernel/execution/
types.rs1use crate::PipelineError;
2use vyre_driver::backend::{OutputBuffers, Resource};
3
4#[derive(Debug, Clone, Copy, PartialEq, Eq)]
6pub struct MegakernelDispatchStats {
7 pub input_bytes: u64,
9 pub output_bytes: u64,
11 pub readback_bytes: u64,
13 pub bytes_moved: u64,
15 pub device_allocation_bytes: u64,
17 pub device_allocation_events: u32,
19 pub latency_ns: u64,
21 pub output_buffers: u32,
23 pub resident_resource_rows: u32,
25 pub resident_resource_handles: u32,
27 pub kernel_launches: u32,
29 pub sync_points: u32,
31 pub recovered_after_device_loss: bool,
34}
35
36impl MegakernelDispatchStats {
37 #[must_use]
39 pub fn output_bytes_per_second(&self) -> u64 {
40 bytes_per_second_or_panic(self.output_bytes, self.latency_ns, "output bytes")
41 }
42
43 #[must_use]
45 pub fn readback_bytes_per_second(&self) -> u64 {
46 bytes_per_second_or_panic(self.readback_bytes, self.latency_ns, "readback bytes")
47 }
48
49 #[must_use]
51 pub fn bytes_moved_per_second(&self) -> u64 {
52 bytes_per_second_or_panic(self.bytes_moved, self.latency_ns, "moved bytes")
53 }
54
55 #[must_use]
57 pub fn device_allocation_bytes_per_second(&self) -> u64 {
58 bytes_per_second_or_panic(
59 self.device_allocation_bytes,
60 self.latency_ns,
61 "device allocation bytes",
62 )
63 }
64}
65
/// Converts a byte count and an elapsed time in nanoseconds into whole bytes
/// per second.
///
/// Despite the `_or_panic` suffix, nothing in this function panics: a zero
/// `latency_ns` yields 0, and a quotient above `u64::MAX` saturates to
/// `u64::MAX`. The division truncates toward zero. `_label` is accepted but
/// not used.
fn bytes_per_second_or_panic(bytes: u64, latency_ns: u64, _label: &'static str) -> u64 {
    const NANOS_PER_SECOND: u128 = 1_000_000_000;

    match latency_ns {
        0 => 0,
        nanos => {
            // Widen to u128 first so the multiplication cannot overflow.
            let per_second = u128::from(bytes) * NANOS_PER_SECOND / u128::from(nanos);
            if per_second > u128::from(u64::MAX) {
                u64::MAX
            } else {
                per_second as u64
            }
        }
    }
}
74
75#[derive(Debug, Clone, PartialEq, Eq)]
77pub struct MegakernelDispatchOutput {
78 pub buffers: Vec<Vec<u8>>,
80 pub stats: MegakernelDispatchStats,
82}
83
84#[derive(Debug, Clone, PartialEq, Eq)]
87pub struct MegakernelBatchDispatchOutput {
88 pub batches: Vec<Vec<Vec<u8>>>,
90 pub stats: MegakernelDispatchStats,
92}
93
94#[derive(Debug, Default)]
101pub struct MegakernelResidentBatchScratch {
102 pub(super) resources: Vec<[Resource; 4]>,
103 pub(super) batches: Vec<OutputBuffers>,
104 pub(super) active_batches: usize,
105}
106
107impl MegakernelResidentBatchScratch {
108 #[must_use]
110 pub fn new() -> Self {
111 Self::default()
112 }
113
114 #[must_use]
116 pub fn with_capacity(batch_count: usize, output_slots_per_batch: usize) -> Self {
117 match Self::try_with_capacity(batch_count, output_slots_per_batch) {
118 Ok(scratch) => scratch,
119 Err(_error) => Self::default(),
120 }
121 }
122
123 pub fn try_with_capacity(
126 batch_count: usize,
127 output_slots_per_batch: usize,
128 ) -> Result<Self, PipelineError> {
129 let mut resources = Vec::new();
130 vyre_foundation::allocation::try_reserve_vec_to_capacity(&mut resources, batch_count)
131 .map_err(|error| {
132 PipelineError::Backend(format!(
133 "megakernel resident batch scratch could not reserve {batch_count} resource row(s): {error}. Fix: split persistent-handle batches before dispatch."
134 ))
135 })?;
136 let mut batches = Vec::new();
137 vyre_foundation::allocation::try_reserve_vec_to_capacity(&mut batches, batch_count)
138 .map_err(|error| {
139 PipelineError::Backend(format!(
140 "megakernel resident batch scratch could not reserve {batch_count} batch row(s): {error}. Fix: split persistent-handle batches before dispatch."
141 ))
142 })?;
143 for _ in 0..batch_count {
144 let mut outputs = Vec::new();
145 vyre_foundation::allocation::try_reserve_vec_to_capacity(
146 &mut outputs,
147 output_slots_per_batch,
148 )
149 .map_err(|error| {
150 PipelineError::Backend(format!(
151 "megakernel resident batch scratch could not reserve {output_slots_per_batch} output slot(s): {error}. Fix: reduce resident output fanout or split persistent-handle batches."
152 ))
153 })?;
154 outputs.resize_with(output_slots_per_batch, Vec::new);
155 batches.push(outputs);
156 }
157 Ok(Self {
158 resources,
159 batches,
160 active_batches: 0,
161 })
162 }
163
164 #[must_use]
166 pub fn batches(&self) -> &[OutputBuffers] {
167 &self.batches[..self.active_batches.min(self.batches.len())]
168 }
169
170 pub fn batches_mut(&mut self) -> &mut Vec<OutputBuffers> {
173 &mut self.batches
174 }
175
176 pub fn clear(&mut self) {
178 self.resources.clear();
179 self.active_batches = 0;
180 for batch in &mut self.batches {
181 for output in batch {
182 output.clear();
183 }
184 }
185 }
186
187 #[must_use]
189 pub fn resource_capacity(&self) -> usize {
190 self.resources.capacity()
191 }
192
193 #[must_use]
195 pub fn batch_capacity(&self) -> usize {
196 self.batches.capacity()
197 }
198}
199
200#[derive(Debug, Clone, Copy, PartialEq, Eq)]
206pub struct MegakernelResidentHandles {
207 pub control: u64,
209 pub ring: u64,
211 pub debug_log: u64,
213 pub io_queue: u64,
215}
216
217impl MegakernelResidentHandles {
218 pub const ABI_RESOURCE_COUNT: usize = 4;
220
221 #[must_use]
223 pub const fn new(control: u64, ring: u64, debug_log: u64, io_queue: u64) -> Self {
224 Self {
225 control,
226 ring,
227 debug_log,
228 io_queue,
229 }
230 }
231
232 pub(super) fn resources(self) -> [Resource; Self::ABI_RESOURCE_COUNT] {
233 [
234 Resource::Resident(self.control),
235 Resource::Resident(self.ring),
236 Resource::Resident(self.debug_log),
237 Resource::Resident(self.io_queue),
238 ]
239 }
240}