1use super::execution::{Megakernel, MegakernelDispatchStats, MegakernelResidentHandles};
4use super::io;
5use super::planner::MegakernelWorkItem;
6use super::protocol;
7use super::protocol_api::{validate_control_bytes, validate_debug_log_bytes};
8use super::readback::MegakernelReadback;
9use super::scheduler::write_default_priority_offsets;
10use crate::PipelineError;
11use vyre_driver::backend::OutputBuffers;
12
13#[derive(Debug, Clone, PartialEq, Eq)]
15pub struct MegakernelResidentDispatchScratch {
16 readback: MegakernelReadback,
17 outputs: OutputBuffers,
18}
19
20impl Default for MegakernelResidentDispatchScratch {
21 fn default() -> Self {
22 Self {
23 readback: MegakernelReadback::default(),
24 outputs: (0..MegakernelResidentHandles::ABI_RESOURCE_COUNT)
25 .map(|_| Vec::new())
26 .collect(),
27 }
28 }
29}
30
31impl MegakernelResidentDispatchScratch {
32 #[must_use]
36 pub fn new() -> Self {
37 Self::default()
38 }
39
40 #[must_use]
42 pub fn retained_output_slots(&self) -> usize {
43 self.outputs.len()
44 }
45
46 #[must_use]
48 pub fn retained_output_bytes(&self) -> usize {
49 self.outputs.iter().map(Vec::capacity).sum()
50 }
51}
52
53#[derive(Debug)]
56pub struct MegakernelResidentBuffers {
57 control_bytes: Vec<u8>,
58 ring_bytes: Vec<u8>,
59 debug_log_bytes: Vec<u8>,
60 io_queue_bytes: Vec<u8>,
61 slot_count: u32,
62 scratch: MegakernelResidentDispatchScratch,
63}
64
65impl Clone for MegakernelResidentBuffers {
66 fn clone(&self) -> Self {
67 Self {
68 control_bytes: self.control_bytes.clone(),
69 ring_bytes: self.ring_bytes.clone(),
70 debug_log_bytes: self.debug_log_bytes.clone(),
71 io_queue_bytes: self.io_queue_bytes.clone(),
72 slot_count: self.slot_count,
73 scratch: MegakernelResidentDispatchScratch::new(),
74 }
75 }
76}
77
78impl PartialEq for MegakernelResidentBuffers {
79 fn eq(&self, other: &Self) -> bool {
80 self.control_bytes == other.control_bytes
81 && self.ring_bytes == other.ring_bytes
82 && self.debug_log_bytes == other.debug_log_bytes
83 && self.io_queue_bytes == other.io_queue_bytes
84 && self.slot_count == other.slot_count
85 }
86}
87
88impl Eq for MegakernelResidentBuffers {}
89
90impl MegakernelResidentBuffers {
91 pub fn new(
97 slot_count: u32,
98 tenant_count: u32,
99 observable_slots: u32,
100 ) -> Result<Self, PipelineError> {
101 let control_capacity = protocol::control_byte_len(observable_slots).ok_or_else(|| {
102 PipelineError::Backend(
103 "megakernel resident control byte length overflowed usize. Fix: shard observable resident buffers before allocation."
104 .to_string(),
105 )
106 })?;
107 let ring_capacity = protocol::ring_byte_len(slot_count).ok_or_else(|| {
108 PipelineError::Backend(
109 "megakernel resident ring byte length overflowed usize. Fix: shard resident rings before allocation."
110 .to_string(),
111 )
112 })?;
113 let debug_log_capacity =
114 protocol::debug_log_byte_len(protocol::debug::RECORD_CAPACITY).ok_or_else(|| {
115 PipelineError::Backend(
116 "megakernel resident debug-log byte length overflowed usize. Fix: reduce debug record capacity before allocation."
117 .to_string(),
118 )
119 })?;
120 let io_queue_capacity = io::empty_io_queue_byte_len(io::IO_SLOT_COUNT)?;
121 let mut control_bytes = Vec::new();
122 reserve_resident_bytes(
123 &mut control_bytes,
124 control_capacity,
125 "control",
126 "shard observable resident buffers before allocation",
127 )?;
128 let mut ring_bytes = Vec::new();
129 reserve_resident_bytes(
130 &mut ring_bytes,
131 ring_capacity,
132 "ring",
133 "shard resident rings before allocation",
134 )?;
135 let mut debug_log_bytes = Vec::new();
136 reserve_resident_bytes(
137 &mut debug_log_bytes,
138 debug_log_capacity,
139 "debug-log",
140 "reduce debug record capacity before allocation",
141 )?;
142 let mut io_queue_bytes = Vec::new();
143 reserve_resident_bytes(
144 &mut io_queue_bytes,
145 io_queue_capacity,
146 "io-queue",
147 "reduce resident IO queue capacity before allocation",
148 )?;
149 let mut buffers = Self {
150 control_bytes,
151 ring_bytes,
152 debug_log_bytes,
153 io_queue_bytes,
154 slot_count,
155 scratch: MegakernelResidentDispatchScratch::new(),
156 };
157 buffers.reset(tenant_count, observable_slots)?;
158 Ok(buffers)
159 }
160
161 pub fn reset(&mut self, tenant_count: u32, observable_slots: u32) -> Result<(), PipelineError> {
167 Megakernel::try_encode_control_into(
168 false,
169 tenant_count,
170 observable_slots,
171 &mut self.control_bytes,
172 )?;
173 write_default_priority_offsets(&mut self.control_bytes, self.slot_count)?;
174 Megakernel::try_encode_empty_ring_into(self.slot_count, &mut self.ring_bytes)?;
175 Megakernel::try_encode_empty_debug_log_into(
176 protocol::debug::RECORD_CAPACITY,
177 &mut self.debug_log_bytes,
178 )?;
179 io::try_encode_empty_io_queue_into(io::IO_SLOT_COUNT, &mut self.io_queue_bytes)?;
180 Ok(())
181 }
182
183 pub fn from_parts(
189 slot_count: u32,
190 control_bytes: Vec<u8>,
191 ring_bytes: Vec<u8>,
192 debug_log_bytes: Vec<u8>,
193 io_queue_bytes: Vec<u8>,
194 ) -> Result<Self, PipelineError> {
195 validate_control_bytes(&control_bytes)?;
196 validate_debug_log_bytes(&debug_log_bytes)?;
197 io::validate_io_queue_bytes(&io_queue_bytes)?;
198 let expected_ring_bytes = protocol::ring_byte_len(slot_count).ok_or_else(|| {
199 PipelineError::Backend(
200 "megakernel resident ring byte length overflowed usize. Fix: shard resident rings before allocation."
201 .to_string(),
202 )
203 })?;
204 if ring_bytes.len() != expected_ring_bytes {
205 return Err(PipelineError::Backend(format!(
206 "megakernel resident ring has {} bytes, expected {expected_ring_bytes}. Fix: build resident rings with the same slot_count as the Megakernel handle.",
207 ring_bytes.len()
208 )));
209 }
210 Ok(Self {
211 control_bytes,
212 ring_bytes,
213 debug_log_bytes,
214 io_queue_bytes,
215 slot_count,
216 scratch: MegakernelResidentDispatchScratch::new(),
217 })
218 }
219
220 pub fn publish_slot(
227 &mut self,
228 slot_idx: u32,
229 tenant_id: u32,
230 opcode: u32,
231 args: &[u32],
232 ) -> Result<(), PipelineError> {
233 Megakernel::publish_slot(&mut self.ring_bytes, slot_idx, tenant_id, opcode, args)
234 }
235
236 pub fn publish_work_items(
244 &mut self,
245 start_slot: u32,
246 tenant_id: u32,
247 items: &[MegakernelWorkItem],
248 ) -> Result<u32, PipelineError> {
249 Megakernel::publish_work_items(&mut self.ring_bytes, start_slot, tenant_id, items)
250 }
251
252 pub fn apply_readback(&mut self, readback: MegakernelReadback) {
254 self.control_bytes = readback.control_bytes;
255 self.ring_bytes = readback.ring_bytes;
256 self.debug_log_bytes = readback.debug_log_bytes;
257 self.io_queue_bytes = readback.io_queue_bytes;
258 }
259
260 pub fn dispatch(
266 &mut self,
267 megakernel: &Megakernel,
268 ) -> Result<MegakernelReadback, PipelineError> {
269 self.dispatch_update(megakernel)?;
270 Ok(self.snapshot_readback())
271 }
272
273 pub fn dispatch_update(&mut self, megakernel: &Megakernel) -> Result<(), PipelineError> {
280 self.dispatch_update_observed(megakernel)?;
281 Ok(())
282 }
283
284 pub fn dispatch_update_observed(
291 &mut self,
292 megakernel: &Megakernel,
293 ) -> Result<MegakernelDispatchStats, PipelineError> {
294 if megakernel.slot_count() != self.slot_count {
295 return Err(PipelineError::Backend(format!(
296 "resident buffer slot_count {} does not match megakernel slot_count {}. Fix: allocate resident buffers from the same Megakernel geometry.",
297 self.slot_count,
298 megakernel.slot_count()
299 )));
300 }
301 let stats = megakernel.dispatch_with_io_queue_readback_borrowed_into(
302 &self.control_bytes,
303 &self.ring_bytes,
304 &self.debug_log_bytes,
305 &self.io_queue_bytes,
306 &mut self.scratch.readback,
307 &mut self.scratch.outputs,
308 )?;
309 std::mem::swap(
310 &mut self.control_bytes,
311 &mut self.scratch.readback.control_bytes,
312 );
313 std::mem::swap(&mut self.ring_bytes, &mut self.scratch.readback.ring_bytes);
314 std::mem::swap(
315 &mut self.debug_log_bytes,
316 &mut self.scratch.readback.debug_log_bytes,
317 );
318 std::mem::swap(
319 &mut self.io_queue_bytes,
320 &mut self.scratch.readback.io_queue_bytes,
321 );
322 Ok(stats)
323 }
324
325 pub fn dispatch_update_observed_with_scratch(
332 &mut self,
333 megakernel: &Megakernel,
334 scratch: &mut MegakernelResidentDispatchScratch,
335 ) -> Result<MegakernelDispatchStats, PipelineError> {
336 if megakernel.slot_count() != self.slot_count {
337 return Err(PipelineError::Backend(format!(
338 "resident buffer slot_count {} does not match megakernel slot_count {}. Fix: allocate resident buffers from the same Megakernel geometry.",
339 self.slot_count,
340 megakernel.slot_count()
341 )));
342 }
343 let stats = megakernel.dispatch_with_io_queue_readback_borrowed_into(
344 &self.control_bytes,
345 &self.ring_bytes,
346 &self.debug_log_bytes,
347 &self.io_queue_bytes,
348 &mut scratch.readback,
349 &mut scratch.outputs,
350 )?;
351 self.swap_readback_from(&mut scratch.readback);
352 Ok(stats)
353 }
354
355 fn swap_readback_from(&mut self, readback: &mut MegakernelReadback) {
356 std::mem::swap(&mut self.control_bytes, &mut readback.control_bytes);
357 std::mem::swap(&mut self.ring_bytes, &mut readback.ring_bytes);
358 std::mem::swap(&mut self.debug_log_bytes, &mut readback.debug_log_bytes);
359 std::mem::swap(&mut self.io_queue_bytes, &mut readback.io_queue_bytes);
360 }
361
362 #[must_use]
364 pub fn snapshot_readback(&self) -> MegakernelReadback {
365 MegakernelReadback {
366 control_bytes: self.control_bytes.clone(),
367 ring_bytes: self.ring_bytes.clone(),
368 debug_log_bytes: self.debug_log_bytes.clone(),
369 io_queue_bytes: self.io_queue_bytes.clone(),
370 }
371 }
372
373 pub fn snapshot_readback_into(&self, out: &mut MegakernelReadback) {
375 out.control_bytes.clone_from(&self.control_bytes);
376 out.ring_bytes.clone_from(&self.ring_bytes);
377 out.debug_log_bytes.clone_from(&self.debug_log_bytes);
378 out.io_queue_bytes.clone_from(&self.io_queue_bytes);
379 }
380
381 #[must_use]
383 pub fn control_bytes(&self) -> &[u8] {
384 &self.control_bytes
385 }
386
387 #[must_use]
389 pub fn ring_bytes(&self) -> &[u8] {
390 &self.ring_bytes
391 }
392
393 #[must_use]
395 pub fn ring_bytes_mut(&mut self) -> &mut [u8] {
396 &mut self.ring_bytes
397 }
398
399 #[must_use]
401 pub fn debug_log_bytes(&self) -> &[u8] {
402 &self.debug_log_bytes
403 }
404
405 #[must_use]
407 pub fn io_queue_bytes(&self) -> &[u8] {
408 &self.io_queue_bytes
409 }
410
411 #[must_use]
413 pub const fn slot_count(&self) -> u32 {
414 self.slot_count
415 }
416
417 #[must_use]
420 pub fn retained_default_output_slots(&self) -> usize {
421 self.scratch.retained_output_slots()
422 }
423
424 #[must_use]
426 pub fn retained_default_output_bytes(&self) -> usize {
427 self.scratch.retained_output_bytes()
428 }
429}
430
431fn reserve_resident_bytes(
432 bytes: &mut Vec<u8>,
433 capacity: usize,
434 label: &'static str,
435 fix: &'static str,
436) -> Result<(), PipelineError> {
437 vyre_foundation::allocation::try_reserve_vec_to_capacity(bytes, capacity).map_err(|error| {
438 PipelineError::Backend(format!(
439 "megakernel resident {label} byte reservation failed for {capacity} bytes: {error}. Fix: {fix}."
440 ))
441 })
442}
443
/// Unit tests for resident buffer construction, reset, publishing, readback swapping, and
/// dispatch-scratch reuse. Dispatch is exercised against an in-process echo backend.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::megakernel::protocol::opcode;
    use std::sync::Arc;
    use vyre_driver::backend::{CompiledPipeline, DispatchConfig, VyreBackend};

    /// Compiled pipeline whose outputs are byte-for-byte copies of its inputs.
    struct ResidentEchoPipeline;

    impl vyre_driver::backend::private::Sealed for ResidentEchoPipeline {}

    impl CompiledPipeline for ResidentEchoPipeline {
        fn id(&self) -> &str {
            "resident-echo:pipeline"
        }

        fn dispatch(
            &self,
            inputs: &[Vec<u8>],
            _config: &DispatchConfig,
        ) -> Result<OutputBuffers, vyre_driver::BackendError> {
            Ok(inputs.to_vec())
        }

        /// Echoes into the caller's `outputs`, resizing the shell only when its length
        /// differs from the input count and reusing each slot's existing allocation.
        fn dispatch_borrowed_into(
            &self,
            inputs: &[&[u8]],
            _config: &DispatchConfig,
            outputs: &mut OutputBuffers,
        ) -> Result<(), vyre_driver::BackendError> {
            if outputs.len() != inputs.len() {
                outputs.resize_with(inputs.len(), Vec::new);
            }
            for (slot, input) in outputs.iter_mut().zip(inputs.iter().copied()) {
                slot.clear();
                slot.extend_from_slice(input);
            }
            Ok(())
        }
    }

    /// Backend that compiles every program to [`ResidentEchoPipeline`].
    struct ResidentEchoBackend;

    impl vyre_driver::backend::private::Sealed for ResidentEchoBackend {}

    impl VyreBackend for ResidentEchoBackend {
        fn id(&self) -> &'static str {
            "resident-echo"
        }

        fn dispatch(
            &self,
            _program: &vyre_foundation::ir::Program,
            inputs: &[Vec<u8>],
            _config: &DispatchConfig,
        ) -> Result<OutputBuffers, vyre_driver::BackendError> {
            Ok(inputs.to_vec())
        }

        fn compile_native(
            &self,
            _program: &vyre_foundation::ir::Program,
            _config: &DispatchConfig,
        ) -> Result<Option<Arc<dyn CompiledPipeline>>, vyre_driver::BackendError> {
            Ok(Some(Arc::new(ResidentEchoPipeline)))
        }
    }

    // Publishing a slot must leave the ring at the protocol length for the declared
    // slot_count.
    #[test]
    fn resident_buffers_keep_runtime_abi_separate_from_publish_logic() {
        let mut buffers = MegakernelResidentBuffers::new(4, 2, 8).unwrap();
        buffers
            .publish_slot(2, 1, opcode::STORE_U32, &[7, 9])
            .unwrap();
        assert_eq!(buffers.slot_count(), 4);
        assert_eq!(
            buffers.ring_bytes().len(),
            protocol::ring_byte_len(4).unwrap()
        );
    }

    // Publishing at slot 1 must write that slot's words and leave an unrelated slot
    // (slot 3, pre-seeded with a sentinel) untouched.
    #[test]
    fn resident_buffers_publish_work_items_without_ring_reset() {
        let mut buffers = MegakernelResidentBuffers::new(4, 2, 8).unwrap();
        let sentinel = 0xCAFE_BABEu32;
        // Byte offset of slot 3's ARG0 word: words are 4 bytes, slots are SLOT_WORDS wide.
        let sentinel_offset =
            (3 * protocol::SLOT_WORDS as usize + protocol::ARG0_WORD as usize) * 4;
        buffers.ring_bytes_mut()[sentinel_offset..sentinel_offset + 4]
            .copy_from_slice(&sentinel.to_le_bytes());
        let items = [MegakernelWorkItem {
            op_handle: opcode::STORE_U32,
            input_handle: 10,
            output_handle: 20,
            param: 30,
        }];

        let published = buffers.publish_work_items(1, 2, &items).unwrap();

        assert_eq!(published, 1);
        // Reads the little-endian u32 at (`slot`, `word`) in the ring.
        let read = |slot: usize, word: u32| {
            let start = (slot * protocol::SLOT_WORDS as usize + word as usize) * 4;
            u32::from_le_bytes(buffers.ring_bytes()[start..start + 4].try_into().unwrap())
        };
        assert_eq!(read(1, protocol::STATUS_WORD), protocol::slot::PUBLISHED);
        assert_eq!(read(1, protocol::OPCODE_WORD), opcode::STORE_U32);
        assert_eq!(read(1, protocol::TENANT_WORD), 2);
        // Input handle, output handle, and param land in consecutive ARG words.
        assert_eq!(read(1, protocol::ARG0_WORD), 10);
        assert_eq!(read(1, protocol::ARG0_WORD + 1), 20);
        assert_eq!(read(1, protocol::ARG0_WORD + 2), 30);
        assert_eq!(read(3, protocol::ARG0_WORD), sentinel);
    }

    // Construction seeds the priority-offset table in the control block: the first entry
    // is 0 and the entry PRIORITY_LEVELS words past the base equals slot_count (10 here).
    // NOTE(review): presumably that entry is the end offset of the last level. Confirm
    // against `write_default_priority_offsets`.
    #[test]
    fn resident_buffers_seed_priority_offsets_for_priority_scheduler() {
        let buffers = MegakernelResidentBuffers::new(10, 2, 0).unwrap();
        // Reads the little-endian u32 control word at index `word`.
        let read = |word: u32| {
            let start = word as usize * 4;
            u32::from_le_bytes(
                buffers.control_bytes()[start..start + 4]
                    .try_into()
                    .unwrap(),
            )
        };
        assert_eq!(read(protocol::control::PRIORITY_OFFSETS_BASE), 0);
        assert_eq!(
            read(
                protocol::control::PRIORITY_OFFSETS_BASE + super::super::scheduler::PRIORITY_LEVELS
            ),
            10
        );
    }

    // `reset` must encode into the existing allocations (same heap pointers) and leave the
    // ring zeroed.
    #[test]
    fn resident_buffers_reset_reuses_encoded_storage() {
        let mut buffers = MegakernelResidentBuffers::new(8, 2, 4).unwrap();
        let control_ptr = buffers.control_bytes.as_ptr();
        let ring_ptr = buffers.ring_bytes.as_ptr();
        let debug_ptr = buffers.debug_log_bytes.as_ptr();
        let io_ptr = buffers.io_queue_bytes.as_ptr();

        buffers.reset(2, 4).unwrap();

        assert_eq!(buffers.control_bytes.as_ptr(), control_ptr);
        assert_eq!(buffers.ring_bytes.as_ptr(), ring_ptr);
        assert_eq!(buffers.debug_log_bytes.as_ptr(), debug_ptr);
        assert_eq!(buffers.io_queue_bytes.as_ptr(), io_ptr);
        assert!(buffers.ring_bytes.iter().all(|byte| *byte == 0));
    }

    // `new` reserves exactly the encoded length of every buffer, so no slack capacity
    // remains.
    #[test]
    fn resident_buffers_preallocate_exact_runtime_buffer_capacities() {
        let buffers = MegakernelResidentBuffers::new(8, 2, 4).unwrap();
        assert_eq!(
            buffers.control_bytes.capacity(),
            buffers.control_bytes.len()
        );
        assert_eq!(buffers.ring_bytes.capacity(), buffers.ring_bytes.len());
        assert_eq!(
            buffers.debug_log_bytes.capacity(),
            buffers.debug_log_bytes.len()
        );
        assert_eq!(
            buffers.io_queue_bytes.capacity(),
            buffers.io_queue_bytes.len()
        );
    }

    // `from_parts` must reject a ring encoded for 2 slots when 4 slots are declared.
    #[test]
    fn resident_buffers_reject_mismatched_ring_shape() {
        let control = Megakernel::try_encode_control(false, 1, 0).unwrap();
        let ring = Megakernel::try_encode_empty_ring(2).unwrap();
        let debug =
            Megakernel::try_encode_empty_debug_log(protocol::debug::RECORD_CAPACITY).unwrap();
        let io = io::try_encode_empty_io_queue(io::IO_SLOT_COUNT).unwrap();
        let error = MegakernelResidentBuffers::from_parts(4, control, ring, debug, io)
            .expect_err("resident ring shape must match declared slot count");
        assert!(error.to_string().contains("resident ring"));
    }

    // Snapshotting into an existing readback of the same shape must not change its
    // capacities, and must copy the current ring.
    #[test]
    fn snapshot_readback_into_reuses_buffers() {
        let buffers = MegakernelResidentBuffers::new(4, 2, 8).unwrap();
        let mut readback = buffers.snapshot_readback();
        let control_capacity = readback.control_bytes.capacity();
        let ring_capacity = readback.ring_bytes.capacity();
        let debug_capacity = readback.debug_log_bytes.capacity();
        let io_capacity = readback.io_queue_bytes.capacity();

        buffers.snapshot_readback_into(&mut readback);
        assert_eq!(readback.control_bytes.capacity(), control_capacity);
        assert_eq!(readback.ring_bytes.capacity(), ring_capacity);
        assert_eq!(readback.debug_log_bytes.capacity(), debug_capacity);
        assert_eq!(readback.io_queue_bytes.capacity(), io_capacity);
        assert_eq!(readback.ring_bytes, buffers.ring_bytes());
    }

    // After a swap, the readback must hold the previous mirrors, so those allocations can
    // be reused as scratch by the next dispatch.
    #[test]
    fn resident_readback_swap_preserves_previous_mirror_for_scratch_reuse() {
        let mut buffers = MegakernelResidentBuffers::new(4, 2, 8).unwrap();
        let previous_control = buffers.control_bytes.clone();
        let previous_ring = buffers.ring_bytes.clone();
        let previous_debug = buffers.debug_log_bytes.clone();
        let previous_io = buffers.io_queue_bytes.clone();
        // Control encoded with tenant_count 3 (vs. 2 above), so the final assert_ne can
        // tell the swapped-in control block apart from the old one.
        let mut readback = MegakernelReadback {
            control_bytes: Megakernel::try_encode_control(false, 3, 8).unwrap(),
            ring_bytes: Megakernel::try_encode_empty_ring(4).unwrap(),
            debug_log_bytes: Megakernel::try_encode_empty_debug_log(
                protocol::debug::RECORD_CAPACITY,
            )
            .unwrap(),
            io_queue_bytes: io::try_encode_empty_io_queue(io::IO_SLOT_COUNT).unwrap(),
        };

        buffers.swap_readback_from(&mut readback);

        assert_eq!(readback.control_bytes, previous_control);
        assert_eq!(readback.ring_bytes, previous_ring);
        assert_eq!(readback.debug_log_bytes, previous_debug);
        assert_eq!(readback.io_queue_bytes, previous_io);
        assert_ne!(buffers.control_bytes(), readback.control_bytes.as_slice());
    }

    // The internal scratch is pre-seeded with ABI_RESOURCE_COUNT output slots. Repeated
    // default dispatches must keep the same output-shell allocation (pointer identity) and
    // retain per-slot byte capacity.
    #[test]
    fn default_dispatch_update_reuses_internal_output_scratch() {
        let kernel = Megakernel::bootstrap_sharded(Arc::new(ResidentEchoBackend), 1, 1, Vec::new())
            .expect("Fix: resident echo backend must compile megakernel");
        let mut buffers = MegakernelResidentBuffers::new(1, 1, 0).unwrap();
        assert_eq!(
            buffers.retained_default_output_slots(),
            MegakernelResidentHandles::ABI_RESOURCE_COUNT,
            "Fix: resident dispatch scratch must pre-seed ABI output slots before the first dispatch."
        );
        let initial_output_slots_ptr = buffers.scratch.outputs.as_ptr();

        buffers
            .dispatch_update_observed(&kernel)
            .expect("Fix: default resident dispatch update must use reusable scratch");
        // The echo pipeline sizes the shell to its input count, which this test expects
        // to be 4.
        assert_eq!(buffers.retained_default_output_slots(), 4);
        let output_slots_ptr = buffers.scratch.outputs.as_ptr();
        assert_eq!(
            output_slots_ptr, initial_output_slots_ptr,
            "Fix: first resident dispatch update must not grow the output shell."
        );

        buffers
            .dispatch_update_observed(&kernel)
            .expect("Fix: repeated default resident dispatch update must reuse scratch");

        assert_eq!(buffers.retained_default_output_slots(), 4);
        assert_eq!(buffers.scratch.outputs.as_ptr(), output_slots_ptr);
        assert!(buffers.retained_default_output_bytes() > 0);
    }
}