Skip to main content

vyre_runtime/megakernel/execution/
persistent_handles.rs

1use super::{
2    nanos_u64, nested_output_bytes, nested_output_count_u32, output_bytes, output_count_u32,
3    reserve_output_shell, resident_handle_count_u32, resident_row_count_u32, Megakernel,
4    MegakernelBatchDispatchOutput, MegakernelDispatchOutput, MegakernelDispatchStats,
5    MegakernelResidentBatchScratch, MegakernelResidentHandles,
6};
7use crate::PipelineError;
8use smallvec::SmallVec;
9use std::time::Instant;
10use vyre_driver::backend::{OutputBuffers, Resource};
11
12impl Megakernel {
13    /// Dispatch using backend-resident handles for all megakernel ABI buffers.
14    ///
15    /// This path never falls back to host byte buffers. If the compiled backend
16    /// pipeline does not implement resident handles, the backend's structured
17    /// unsupported-feature error is returned.
18    ///
19    /// # Errors
20    ///
21    /// Returns [`PipelineError`] when the backend rejects persistent handles,
22    /// dispatch fails, or device-loss recovery cannot rebuild the pipeline.
23    pub fn dispatch_persistent_handles(
24        &self,
25        handles: MegakernelResidentHandles,
26    ) -> Result<Vec<Vec<u8>>, PipelineError> {
27        Ok(self.dispatch_persistent_handles_observed(handles)?.buffers)
28    }
29
30    /// Dispatch using backend-resident handles and return instrumentation.
31    ///
32    /// # Errors
33    ///
34    /// See [`Megakernel::dispatch_persistent_handles`].
35    pub fn dispatch_persistent_handles_observed(
36        &self,
37        handles: MegakernelResidentHandles,
38    ) -> Result<MegakernelDispatchOutput, PipelineError> {
39        let mut buffers = Vec::new();
40        reserve_output_shell(
41            &mut buffers,
42            MegakernelResidentHandles::ABI_RESOURCE_COUNT,
43            "persistent-handle output slots",
44        )?;
45        let stats = self.dispatch_persistent_handles_into(handles, &mut buffers)?;
46        Ok(MegakernelDispatchOutput { buffers, stats })
47    }
48
49    /// Dispatch using backend-resident handles into caller-owned output storage.
50    ///
51    /// This keeps the persistent ABI buffers resident and lets callers retain
52    /// host readback allocation across repeated megakernel launches.
53    ///
54    /// # Errors
55    ///
56    /// See [`Megakernel::dispatch_persistent_handles`].
57    pub fn dispatch_persistent_handles_into(
58        &self,
59        handles: MegakernelResidentHandles,
60        outputs: &mut OutputBuffers,
61    ) -> Result<MegakernelDispatchStats, PipelineError> {
62        if self.has_grid_sync && !self.backend.supports_grid_sync() {
63            return Err(PipelineError::Backend(
64                "persistent-handle dispatch cannot split GridSync barriers without backend-resident segment threading. Fix: use a backend with native grid sync or dispatch borrowed buffers through the grid-sync splitter."
65                    .to_string(),
66            ));
67        }
68        let resources = handles.resources();
69        let config = self.launch_geometry().dispatch_config(None);
70        let started = Instant::now();
71        let mut recovered = false;
72        match self.dispatch_persistent_handles_once_into(&resources, &config, outputs) {
73            Ok(()) => {}
74            Err(error) if self.recovery_policy.allows_retry(&error) => {
75                self.recover_after_device_loss()?;
76                recovered = true;
77                self.dispatch_persistent_handles_once_into(&resources, &config, outputs)?
78            }
79            Err(error) => return Err(error.into()),
80        }
81        let latency_ns = nanos_u64(started.elapsed().as_nanos())?;
82        let output_bytes = output_bytes(outputs)?;
83        let output_buffers = output_count_u32(outputs)?;
84        Ok(MegakernelDispatchStats {
85            input_bytes: 0,
86            output_bytes,
87            readback_bytes: output_bytes,
88            bytes_moved: output_bytes,
89            device_allocation_bytes: 0,
90            device_allocation_events: 0,
91            latency_ns,
92            output_buffers,
93            resident_resource_rows: 1,
94            resident_resource_handles: resident_handle_count_u32(1)?,
95            kernel_launches: if recovered { 2 } else { 1 },
96            sync_points: 1,
97            recovered_after_device_loss: recovered,
98        })
99    }
100
101    /// Dispatch several resident megakernel submissions through the compiled
102    /// backend batch contract.
103    ///
104    /// This is the many-small-launch path: callers keep every ABI buffer
105    /// resident, then submit a slice of handle tuples so native backends can
106    /// record one command buffer or replay one graph batch instead of paying a
107    /// host submission per item.
108    ///
109    /// # Errors
110    ///
111    /// Returns [`PipelineError`] when the backend rejects persistent handles,
112    /// any item fails, or device-loss recovery cannot rebuild the pipeline.
113    pub fn dispatch_persistent_handles_many_observed(
114        &self,
115        handles: &[MegakernelResidentHandles],
116    ) -> Result<MegakernelBatchDispatchOutput, PipelineError> {
117        let mut batches = Vec::new();
118        reserve_output_shell(&mut batches, handles.len(), "persistent-handle batch rows")?;
119        let stats = self.dispatch_persistent_handles_many_into(handles, &mut batches)?;
120        Ok(MegakernelBatchDispatchOutput { batches, stats })
121    }
122
123    /// Dispatch several resident megakernel submissions into caller-owned
124    /// nested output storage.
125    ///
126    /// Existing batch rows and output slots are reused when the backend returns
127    /// the same shape, avoiding nested result-vector churn in many-small-launch
128    /// hot paths.
129    ///
130    /// # Errors
131    ///
132    /// See [`Megakernel::dispatch_persistent_handles_many_observed`].
133    pub fn dispatch_persistent_handles_many_into(
134        &self,
135        handles: &[MegakernelResidentHandles],
136        batches: &mut Vec<OutputBuffers>,
137    ) -> Result<MegakernelDispatchStats, PipelineError> {
138        if handles.is_empty() {
139            batches.clear();
140            return Ok(MegakernelDispatchStats {
141                input_bytes: 0,
142                output_bytes: 0,
143                readback_bytes: 0,
144                bytes_moved: 0,
145                device_allocation_bytes: 0,
146                device_allocation_events: 0,
147                latency_ns: 0,
148                output_buffers: 0,
149                resident_resource_rows: 0,
150                resident_resource_handles: 0,
151                kernel_launches: 0,
152                sync_points: 0,
153                recovered_after_device_loss: false,
154            });
155        }
156        if self.has_grid_sync && !self.backend.supports_grid_sync() {
157            return Err(PipelineError::Backend(
158                "batched persistent-handle dispatch cannot split GridSync barriers without backend-resident segment threading. Fix: use a backend with native grid sync or dispatch borrowed buffers through the grid-sync splitter."
159                    .to_string(),
160            ));
161        }
162
163        let mut resources: SmallVec<[[Resource; 4]; 16]> = SmallVec::new();
164        reserve_resource_rows_small(&mut resources, handles.len())?;
165        resources.extend(handles.iter().map(|handles| handles.resources()));
166        let config = self.launch_geometry().dispatch_config(None);
167        let started = Instant::now();
168        let mut recovered = false;
169        match self.dispatch_persistent_handle_rows_once_into(&resources, &config, batches) {
170            Ok(()) => {}
171            Err(error) if self.recovery_policy.allows_retry(&error) => {
172                self.recover_after_device_loss()?;
173                recovered = true;
174                self.dispatch_persistent_handle_rows_once_into(&resources, &config, batches)?
175            }
176            Err(error) => return Err(error.into()),
177        }
178        let latency_ns = nanos_u64(started.elapsed().as_nanos())?;
179        let output_bytes = nested_output_bytes(batches)?;
180        let output_buffers = nested_output_count_u32(batches)?;
181        let resident_resource_rows = resident_row_count_u32(handles.len())?;
182        let resident_resource_handles = resident_handle_count_u32(handles.len())?;
183        Ok(MegakernelDispatchStats {
184            input_bytes: 0,
185            output_bytes,
186            readback_bytes: output_bytes,
187            bytes_moved: output_bytes,
188            device_allocation_bytes: 0,
189            device_allocation_events: 0,
190            latency_ns,
191            output_buffers,
192            resident_resource_rows,
193            resident_resource_handles,
194            kernel_launches: if recovered { 2 } else { 1 },
195            sync_points: 1,
196            recovered_after_device_loss: recovered,
197        })
198    }
199
200    /// Dispatch several resident megakernel submissions through reusable
201    /// resident-batch scratch.
202    ///
203    /// This is the allocation-stable many-small-launch path: resource rows and
204    /// returned output batches stay owned by `scratch` across calls.
205    ///
206    /// # Errors
207    ///
208    /// See [`Megakernel::dispatch_persistent_handles_many_observed`].
209    pub fn dispatch_persistent_handles_many_with_scratch(
210        &self,
211        handles: &[MegakernelResidentHandles],
212        scratch: &mut MegakernelResidentBatchScratch,
213    ) -> Result<MegakernelDispatchStats, PipelineError> {
214        if handles.is_empty() {
215            scratch.clear();
216            return Ok(MegakernelDispatchStats {
217                input_bytes: 0,
218                output_bytes: 0,
219                readback_bytes: 0,
220                bytes_moved: 0,
221                device_allocation_bytes: 0,
222                device_allocation_events: 0,
223                latency_ns: 0,
224                output_buffers: 0,
225                resident_resource_rows: 0,
226                resident_resource_handles: 0,
227                kernel_launches: 0,
228                sync_points: 0,
229                recovered_after_device_loss: false,
230            });
231        }
232        if self.has_grid_sync && !self.backend.supports_grid_sync() {
233            return Err(PipelineError::Backend(
234                "batched persistent-handle dispatch cannot split GridSync barriers without backend-resident segment threading. Fix: use a backend with native grid sync or dispatch borrowed buffers through the grid-sync splitter."
235                    .to_string(),
236            ));
237        }
238
239        prepare_resource_rows_into(handles, &mut scratch.resources)?;
240        scratch.active_batches = 0;
241        let config = self.launch_geometry().dispatch_config(None);
242        let started = Instant::now();
243        let mut recovered = false;
244        match self.dispatch_persistent_handle_rows_once_into(
245            &scratch.resources,
246            &config,
247            &mut scratch.batches,
248        ) {
249            Ok(()) => {}
250            Err(error) if self.recovery_policy.allows_retry(&error) => {
251                self.recover_after_device_loss()?;
252                recovered = true;
253                self.dispatch_persistent_handle_rows_once_into(
254                    &scratch.resources,
255                    &config,
256                    &mut scratch.batches,
257                )?
258            }
259            Err(error) => return Err(error.into()),
260        }
261        scratch.active_batches = handles.len();
262        let latency_ns = nanos_u64(started.elapsed().as_nanos())?;
263        let output_bytes = nested_output_bytes(&scratch.batches)?;
264        let output_buffers = nested_output_count_u32(&scratch.batches)?;
265        let resident_resource_rows = resident_row_count_u32(handles.len())?;
266        let resident_resource_handles = resident_handle_count_u32(handles.len())?;
267        Ok(MegakernelDispatchStats {
268            input_bytes: 0,
269            output_bytes,
270            readback_bytes: output_bytes,
271            bytes_moved: output_bytes,
272            device_allocation_bytes: 0,
273            device_allocation_events: 0,
274            latency_ns,
275            output_buffers,
276            resident_resource_rows,
277            resident_resource_handles,
278            kernel_launches: if recovered { 2 } else { 1 },
279            sync_points: 1,
280            recovered_after_device_loss: recovered,
281        })
282    }
283}
284
285fn prepare_resource_rows_into(
286    handles: &[MegakernelResidentHandles],
287    resources: &mut Vec<[Resource; 4]>,
288) -> Result<(), PipelineError> {
289    resources.clear();
290    reserve_resource_rows(resources, handles.len())?;
291    resources.extend(handles.iter().map(|handles| handles.resources()));
292    Ok(())
293}
294
295fn reserve_resource_rows(
296    rows: &mut Vec<[Resource; 4]>,
297    capacity: usize,
298) -> Result<(), PipelineError> {
299    vyre_foundation::allocation::try_reserve_vec_to_capacity(rows, capacity).map_err(|error| {
300        PipelineError::Backend(format!(
301            "megakernel resident resource-row reservation failed for {capacity} row(s): {error}. Fix: split persistent-handle dispatch batches before launch."
302        ))
303    })
304}
305
306fn reserve_resource_rows_small(
307    rows: &mut SmallVec<[[Resource; 4]; 16]>,
308    capacity: usize,
309) -> Result<(), PipelineError> {
310    vyre_foundation::allocation::try_reserve_smallvec_to_capacity(rows, capacity).map_err(
311        |error| {
312            PipelineError::Backend(format!(
313                "megakernel resident inline resource-row reservation failed for {capacity} row(s): {error}. Fix: split persistent-handle dispatch batches before launch."
314            ))
315        },
316    )
317}