vyre_runtime/megakernel/execution/
persistent_handles.rs1use super::{
2 nanos_u64, nested_output_bytes, nested_output_count_u32, output_bytes, output_count_u32,
3 reserve_output_shell, resident_handle_count_u32, resident_row_count_u32, Megakernel,
4 MegakernelBatchDispatchOutput, MegakernelDispatchOutput, MegakernelDispatchStats,
5 MegakernelResidentBatchScratch, MegakernelResidentHandles,
6};
7use crate::PipelineError;
8use smallvec::SmallVec;
9use std::time::Instant;
10use vyre_driver::backend::{OutputBuffers, Resource};
11
12impl Megakernel {
13 pub fn dispatch_persistent_handles(
24 &self,
25 handles: MegakernelResidentHandles,
26 ) -> Result<Vec<Vec<u8>>, PipelineError> {
27 Ok(self.dispatch_persistent_handles_observed(handles)?.buffers)
28 }
29
30 pub fn dispatch_persistent_handles_observed(
36 &self,
37 handles: MegakernelResidentHandles,
38 ) -> Result<MegakernelDispatchOutput, PipelineError> {
39 let mut buffers = Vec::new();
40 reserve_output_shell(
41 &mut buffers,
42 MegakernelResidentHandles::ABI_RESOURCE_COUNT,
43 "persistent-handle output slots",
44 )?;
45 let stats = self.dispatch_persistent_handles_into(handles, &mut buffers)?;
46 Ok(MegakernelDispatchOutput { buffers, stats })
47 }
48
49 pub fn dispatch_persistent_handles_into(
58 &self,
59 handles: MegakernelResidentHandles,
60 outputs: &mut OutputBuffers,
61 ) -> Result<MegakernelDispatchStats, PipelineError> {
62 if self.has_grid_sync && !self.backend.supports_grid_sync() {
63 return Err(PipelineError::Backend(
64 "persistent-handle dispatch cannot split GridSync barriers without backend-resident segment threading. Fix: use a backend with native grid sync or dispatch borrowed buffers through the grid-sync splitter."
65 .to_string(),
66 ));
67 }
68 let resources = handles.resources();
69 let config = self.launch_geometry().dispatch_config(None);
70 let started = Instant::now();
71 let mut recovered = false;
72 match self.dispatch_persistent_handles_once_into(&resources, &config, outputs) {
73 Ok(()) => {}
74 Err(error) if self.recovery_policy.allows_retry(&error) => {
75 self.recover_after_device_loss()?;
76 recovered = true;
77 self.dispatch_persistent_handles_once_into(&resources, &config, outputs)?
78 }
79 Err(error) => return Err(error.into()),
80 }
81 let latency_ns = nanos_u64(started.elapsed().as_nanos())?;
82 let output_bytes = output_bytes(outputs)?;
83 let output_buffers = output_count_u32(outputs)?;
84 Ok(MegakernelDispatchStats {
85 input_bytes: 0,
86 output_bytes,
87 readback_bytes: output_bytes,
88 bytes_moved: output_bytes,
89 device_allocation_bytes: 0,
90 device_allocation_events: 0,
91 latency_ns,
92 output_buffers,
93 resident_resource_rows: 1,
94 resident_resource_handles: resident_handle_count_u32(1)?,
95 kernel_launches: if recovered { 2 } else { 1 },
96 sync_points: 1,
97 recovered_after_device_loss: recovered,
98 })
99 }
100
101 pub fn dispatch_persistent_handles_many_observed(
114 &self,
115 handles: &[MegakernelResidentHandles],
116 ) -> Result<MegakernelBatchDispatchOutput, PipelineError> {
117 let mut batches = Vec::new();
118 reserve_output_shell(&mut batches, handles.len(), "persistent-handle batch rows")?;
119 let stats = self.dispatch_persistent_handles_many_into(handles, &mut batches)?;
120 Ok(MegakernelBatchDispatchOutput { batches, stats })
121 }
122
123 pub fn dispatch_persistent_handles_many_into(
134 &self,
135 handles: &[MegakernelResidentHandles],
136 batches: &mut Vec<OutputBuffers>,
137 ) -> Result<MegakernelDispatchStats, PipelineError> {
138 if handles.is_empty() {
139 batches.clear();
140 return Ok(MegakernelDispatchStats {
141 input_bytes: 0,
142 output_bytes: 0,
143 readback_bytes: 0,
144 bytes_moved: 0,
145 device_allocation_bytes: 0,
146 device_allocation_events: 0,
147 latency_ns: 0,
148 output_buffers: 0,
149 resident_resource_rows: 0,
150 resident_resource_handles: 0,
151 kernel_launches: 0,
152 sync_points: 0,
153 recovered_after_device_loss: false,
154 });
155 }
156 if self.has_grid_sync && !self.backend.supports_grid_sync() {
157 return Err(PipelineError::Backend(
158 "batched persistent-handle dispatch cannot split GridSync barriers without backend-resident segment threading. Fix: use a backend with native grid sync or dispatch borrowed buffers through the grid-sync splitter."
159 .to_string(),
160 ));
161 }
162
163 let mut resources: SmallVec<[[Resource; 4]; 16]> = SmallVec::new();
164 reserve_resource_rows_small(&mut resources, handles.len())?;
165 resources.extend(handles.iter().map(|handles| handles.resources()));
166 let config = self.launch_geometry().dispatch_config(None);
167 let started = Instant::now();
168 let mut recovered = false;
169 match self.dispatch_persistent_handle_rows_once_into(&resources, &config, batches) {
170 Ok(()) => {}
171 Err(error) if self.recovery_policy.allows_retry(&error) => {
172 self.recover_after_device_loss()?;
173 recovered = true;
174 self.dispatch_persistent_handle_rows_once_into(&resources, &config, batches)?
175 }
176 Err(error) => return Err(error.into()),
177 }
178 let latency_ns = nanos_u64(started.elapsed().as_nanos())?;
179 let output_bytes = nested_output_bytes(batches)?;
180 let output_buffers = nested_output_count_u32(batches)?;
181 let resident_resource_rows = resident_row_count_u32(handles.len())?;
182 let resident_resource_handles = resident_handle_count_u32(handles.len())?;
183 Ok(MegakernelDispatchStats {
184 input_bytes: 0,
185 output_bytes,
186 readback_bytes: output_bytes,
187 bytes_moved: output_bytes,
188 device_allocation_bytes: 0,
189 device_allocation_events: 0,
190 latency_ns,
191 output_buffers,
192 resident_resource_rows,
193 resident_resource_handles,
194 kernel_launches: if recovered { 2 } else { 1 },
195 sync_points: 1,
196 recovered_after_device_loss: recovered,
197 })
198 }
199
200 pub fn dispatch_persistent_handles_many_with_scratch(
210 &self,
211 handles: &[MegakernelResidentHandles],
212 scratch: &mut MegakernelResidentBatchScratch,
213 ) -> Result<MegakernelDispatchStats, PipelineError> {
214 if handles.is_empty() {
215 scratch.clear();
216 return Ok(MegakernelDispatchStats {
217 input_bytes: 0,
218 output_bytes: 0,
219 readback_bytes: 0,
220 bytes_moved: 0,
221 device_allocation_bytes: 0,
222 device_allocation_events: 0,
223 latency_ns: 0,
224 output_buffers: 0,
225 resident_resource_rows: 0,
226 resident_resource_handles: 0,
227 kernel_launches: 0,
228 sync_points: 0,
229 recovered_after_device_loss: false,
230 });
231 }
232 if self.has_grid_sync && !self.backend.supports_grid_sync() {
233 return Err(PipelineError::Backend(
234 "batched persistent-handle dispatch cannot split GridSync barriers without backend-resident segment threading. Fix: use a backend with native grid sync or dispatch borrowed buffers through the grid-sync splitter."
235 .to_string(),
236 ));
237 }
238
239 prepare_resource_rows_into(handles, &mut scratch.resources)?;
240 scratch.active_batches = 0;
241 let config = self.launch_geometry().dispatch_config(None);
242 let started = Instant::now();
243 let mut recovered = false;
244 match self.dispatch_persistent_handle_rows_once_into(
245 &scratch.resources,
246 &config,
247 &mut scratch.batches,
248 ) {
249 Ok(()) => {}
250 Err(error) if self.recovery_policy.allows_retry(&error) => {
251 self.recover_after_device_loss()?;
252 recovered = true;
253 self.dispatch_persistent_handle_rows_once_into(
254 &scratch.resources,
255 &config,
256 &mut scratch.batches,
257 )?
258 }
259 Err(error) => return Err(error.into()),
260 }
261 scratch.active_batches = handles.len();
262 let latency_ns = nanos_u64(started.elapsed().as_nanos())?;
263 let output_bytes = nested_output_bytes(&scratch.batches)?;
264 let output_buffers = nested_output_count_u32(&scratch.batches)?;
265 let resident_resource_rows = resident_row_count_u32(handles.len())?;
266 let resident_resource_handles = resident_handle_count_u32(handles.len())?;
267 Ok(MegakernelDispatchStats {
268 input_bytes: 0,
269 output_bytes,
270 readback_bytes: output_bytes,
271 bytes_moved: output_bytes,
272 device_allocation_bytes: 0,
273 device_allocation_events: 0,
274 latency_ns,
275 output_buffers,
276 resident_resource_rows,
277 resident_resource_handles,
278 kernel_launches: if recovered { 2 } else { 1 },
279 sync_points: 1,
280 recovered_after_device_loss: recovered,
281 })
282 }
283}
284
285fn prepare_resource_rows_into(
286 handles: &[MegakernelResidentHandles],
287 resources: &mut Vec<[Resource; 4]>,
288) -> Result<(), PipelineError> {
289 resources.clear();
290 reserve_resource_rows(resources, handles.len())?;
291 resources.extend(handles.iter().map(|handles| handles.resources()));
292 Ok(())
293}
294
295fn reserve_resource_rows(
296 rows: &mut Vec<[Resource; 4]>,
297 capacity: usize,
298) -> Result<(), PipelineError> {
299 vyre_foundation::allocation::try_reserve_vec_to_capacity(rows, capacity).map_err(|error| {
300 PipelineError::Backend(format!(
301 "megakernel resident resource-row reservation failed for {capacity} row(s): {error}. Fix: split persistent-handle dispatch batches before launch."
302 ))
303 })
304}
305
306fn reserve_resource_rows_small(
307 rows: &mut SmallVec<[[Resource; 4]; 16]>,
308 capacity: usize,
309) -> Result<(), PipelineError> {
310 vyre_foundation::allocation::try_reserve_smallvec_to_capacity(rows, capacity).map_err(
311 |error| {
312 PipelineError::Backend(format!(
313 "megakernel resident inline resource-row reservation failed for {capacity} row(s): {error}. Fix: split persistent-handle dispatch batches before launch."
314 ))
315 },
316 )
317}