vyre_runtime/megakernel/io/
queue.rs1use std::sync::atomic::{fence, Ordering};
4
5use crate::PipelineError;
6
7use super::super::protocol::slot;
8use super::helpers::queue_word_index;
9use super::{io_op, io_status, io_word, IoCompletion, IO_SLOT_COUNT, IO_SLOT_WORDS};
10
/// Host-side word image of the megakernel IO queue.
///
/// The queue is a flat `Vec<u32>` holding `slot_count` slots of
/// `IO_SLOT_WORDS` words each (see `MegakernelIoQueue::new`, which allocates
/// the storage zero-filled). Individual words are addressed through
/// `queue_word_index(slot, word)`.
#[derive(Debug, Clone)]
pub struct MegakernelIoQueue {
    /// Backing storage: `slot_count * IO_SLOT_WORDS` words.
    words: Vec<u32>,
    /// Number of slots this queue was created with; every slot id passed to
    /// the public methods is validated against it.
    slot_count: u32,
}
18
19impl MegakernelIoQueue {
20 pub fn new(slot_count: u32) -> Result<Self, PipelineError> {
27 if slot_count == 0 {
28 return Err(PipelineError::QueueFull {
29 queue: "submission",
30 fix: "MegakernelIoQueue requires at least one slot",
31 });
32 }
33 if slot_count > IO_SLOT_COUNT {
34 return Err(PipelineError::QueueFull {
35 queue: "submission",
36 fix: "MegakernelIoQueue exceeds the compiled IO poll window of 64 slots; enlarge IO_SLOT_COUNT and rebuild the megakernel before publishing more than 64 completions",
37 });
38 }
39 let word_count = slot_count
40 .checked_mul(IO_SLOT_WORDS)
41 .ok_or(PipelineError::QueueFull {
42 queue: "submission",
43 fix: "io_queue word count overflows u32; shard the queue before allocating",
44 })?;
45 let word_count = usize::try_from(word_count).map_err(|error| {
46 PipelineError::Backend(format!(
47 "io_queue word count cannot fit host usize: {error}. Fix: shard the queue before allocating."
48 ))
49 })?;
50 Ok(Self {
51 words: vec![0; word_count],
52 slot_count,
53 })
54 }
55
56 #[must_use]
58 pub fn as_bytes(&self) -> &[u8] {
59 bytemuck::cast_slice(&self.words)
60 }
61
62 #[must_use]
64 pub fn as_mut_bytes(&mut self) -> &mut [u8] {
65 bytemuck::cast_slice_mut(&mut self.words)
66 }
67
68 #[must_use]
70 pub fn slot_count(&self) -> u32 {
71 self.slot_count
72 }
73
74 pub fn publish_slot(
84 &mut self,
85 queue_slot: u32,
86 mapped_slot: u32,
87 byte_count: u32,
88 tag: u32,
89 ) -> Result<(), PipelineError> {
90 if queue_slot >= self.slot_count {
91 return Err(PipelineError::QueueFull {
92 queue: "submission",
93 fix: "io_queue slot exceeds MegakernelIoQueue::slot_count; enlarge the queue or publish into a valid slot id",
94 });
95 }
96 let current_status = self.read_word(queue_slot, io_word::STATUS);
97 if current_status != slot::EMPTY {
98 return Err(PipelineError::QueueFull {
99 queue: "submission",
100 fix: "io_queue slot still in flight; wait for the GPU to recycle it before publishing again",
101 });
102 }
103 self.write_word_unfenced(queue_slot, io_word::OP_TYPE, io_op::READ);
104 self.write_word_unfenced(queue_slot, io_word::SRC_HANDLE, 0);
105 self.write_word_unfenced(queue_slot, io_word::DST_HANDLE, mapped_slot);
106 self.write_word_unfenced(queue_slot, io_word::OFFSET_LO, 0);
107 self.write_word_unfenced(queue_slot, io_word::OFFSET_HI, 0);
108 self.write_word_unfenced(queue_slot, io_word::BYTE_COUNT, byte_count);
109 self.write_word_unfenced(queue_slot, io_word::TAG, tag);
110 fence(Ordering::Release);
111 self.write_word_unfenced(queue_slot, io_word::STATUS, slot::PUBLISHED);
112 fence(Ordering::Release);
113 Ok(())
114 }
115
116 pub fn submit_dma_read(
127 &mut self,
128 queue_slot: u32,
129 src_handle: u32,
130 dst_handle: u32,
131 byte_count: u32,
132 tag: u32,
133 ) -> Result<(), PipelineError> {
134 if queue_slot >= self.slot_count {
135 return Err(PipelineError::QueueFull {
136 queue: "submission",
137 fix: "io_queue slot exceeds MegakernelIoQueue::slot_count; enlarge the queue or submit into a valid slot id",
138 });
139 }
140 let current_status = self.read_word(queue_slot, io_word::STATUS);
141 if current_status != slot::EMPTY {
142 return Err(PipelineError::QueueFull {
143 queue: "submission",
144 fix: "io_queue slot still in flight; wait for completion before submitting a new request",
145 });
146 }
147 self.write_word_unfenced(queue_slot, io_word::OP_TYPE, io_op::READ);
148 self.write_word_unfenced(queue_slot, io_word::SRC_HANDLE, src_handle);
149 self.write_word_unfenced(queue_slot, io_word::DST_HANDLE, dst_handle);
150 self.write_word_unfenced(queue_slot, io_word::OFFSET_LO, 0);
151 self.write_word_unfenced(queue_slot, io_word::OFFSET_HI, 0);
152 self.write_word_unfenced(queue_slot, io_word::BYTE_COUNT, byte_count);
153 self.write_word_unfenced(queue_slot, io_word::TAG, tag);
154 fence(Ordering::Release);
155 self.write_word_unfenced(queue_slot, io_word::STATUS, slot::PUBLISHED);
156 fence(Ordering::Release);
157 Ok(())
158 }
159
160 #[must_use]
162 pub fn completion(&self, queue_slot: u32) -> Option<IoCompletion> {
163 if queue_slot >= self.slot_count {
164 return None;
165 }
166 let status = self.read_word(queue_slot, io_word::STATUS);
167 if status == slot::EMPTY {
168 return None;
169 }
170 Some(IoCompletion {
171 slot_idx: queue_slot,
172 mapped_slot: self.read_word_unfenced(queue_slot, io_word::DST_HANDLE),
173 byte_count: self.read_word_unfenced(queue_slot, io_word::BYTE_COUNT),
174 tag: self.read_word_unfenced(queue_slot, io_word::TAG),
175 })
176 }
177
178 #[must_use]
180 pub fn is_recycled(&self, queue_slot: u32) -> bool {
181 if queue_slot >= self.slot_count {
182 return false;
183 }
184 let status = self.read_word(queue_slot, io_word::STATUS);
185 match status {
186 slot::EMPTY => true,
187 slot::PUBLISHED | slot::CLAIMED | io_status::OK | io_status::ERROR | slot::DONE => {
188 false
189 }
190 _ => false,
191 }
192 }
193
194 fn read_word(&self, slot_idx: u32, word: u32) -> u32 {
195 let idx = queue_word_index(slot_idx, word);
196 fence(Ordering::Acquire);
197 self.words[idx]
198 }
199
200 fn read_word_unfenced(&self, slot_idx: u32, word: u32) -> u32 {
201 let idx = queue_word_index(slot_idx, word);
202 self.words[idx]
203 }
204
205 fn write_word_unfenced(&mut self, slot_idx: u32, word: u32, value: u32) {
206 let idx = queue_word_index(slot_idx, word);
207 self.words[idx] = value;
208 }
209}