Skip to main content

vyre_runtime/megakernel/io/
queue.rs

//! [`MegakernelIoQueue`]: high-level wrapper around the raw IO queue buffer.
2
3use std::sync::atomic::{fence, Ordering};
4
5use crate::PipelineError;
6
7use super::super::protocol::slot;
8use super::helpers::queue_word_index;
9use super::{io_op, io_status, io_word, IoCompletion, IO_SLOT_COUNT, IO_SLOT_WORDS};
10
/// Host-owned handle to the megakernel IO queue.
///
/// The slot ring lives in a flat `Vec<u32>`; typed methods on this type
/// publish, submit, and read back individual slots.
#[derive(Clone, Debug)]
pub struct MegakernelIoQueue {
    /// Flat word storage for every slot, addressed via `queue_word_index`.
    words: Vec<u32>,
    /// Capacity of the ring in slots; every public slot index is checked against it.
    slot_count: u32,
}
18
19impl MegakernelIoQueue {
20    /// Allocate an empty queue with `slot_count` entries.
21    ///
22    /// # Errors
23    ///
24    /// Returns [`PipelineError::QueueFull`] when `slot_count` is zero or
25    /// exceeds the IR/program's fixed poll window of [`IO_SLOT_COUNT`].
26    pub fn new(slot_count: u32) -> Result<Self, PipelineError> {
27        if slot_count == 0 {
28            return Err(PipelineError::QueueFull {
29                queue: "submission",
30                fix: "MegakernelIoQueue requires at least one slot",
31            });
32        }
33        if slot_count > IO_SLOT_COUNT {
34            return Err(PipelineError::QueueFull {
35                queue: "submission",
36                fix: "MegakernelIoQueue exceeds the compiled IO poll window of 64 slots; enlarge IO_SLOT_COUNT and rebuild the megakernel before publishing more than 64 completions",
37            });
38        }
39        let word_count = slot_count
40            .checked_mul(IO_SLOT_WORDS)
41            .ok_or(PipelineError::QueueFull {
42                queue: "submission",
43                fix: "io_queue word count overflows u32; shard the queue before allocating",
44            })?;
45        let word_count = usize::try_from(word_count).map_err(|error| {
46            PipelineError::Backend(format!(
47                "io_queue word count cannot fit host usize: {error}. Fix: shard the queue before allocating."
48            ))
49        })?;
50        Ok(Self {
51            words: vec![0; word_count],
52            slot_count,
53        })
54    }
55
56    /// Borrow the raw bytes for backend upload / readback.
57    #[must_use]
58    pub fn as_bytes(&self) -> &[u8] {
59        bytemuck::cast_slice(&self.words)
60    }
61
62    /// Mutably borrow the raw bytes for backend upload / host updates.
63    #[must_use]
64    pub fn as_mut_bytes(&mut self) -> &mut [u8] {
65        bytemuck::cast_slice_mut(&mut self.words)
66    }
67
68    /// Queue capacity in slots.
69    #[must_use]
70    pub fn slot_count(&self) -> u32 {
71        self.slot_count
72    }
73
74    /// Publish a completed DMA slot so the megakernel can consume it.
75    ///
76    /// The host writes the metadata first, then flips `STATUS` to
77    /// `slot::PUBLISHED` as the publication barrier.
78    ///
79    /// # Errors
80    ///
81    /// Returns [`PipelineError::QueueFull`] when the slot is out of bounds or
82    /// still owned by the GPU/host from a prior ingest.
83    pub fn publish_slot(
84        &mut self,
85        queue_slot: u32,
86        mapped_slot: u32,
87        byte_count: u32,
88        tag: u32,
89    ) -> Result<(), PipelineError> {
90        if queue_slot >= self.slot_count {
91            return Err(PipelineError::QueueFull {
92                queue: "submission",
93                fix: "io_queue slot exceeds MegakernelIoQueue::slot_count; enlarge the queue or publish into a valid slot id",
94            });
95        }
96        let current_status = self.read_word(queue_slot, io_word::STATUS);
97        if current_status != slot::EMPTY {
98            return Err(PipelineError::QueueFull {
99                queue: "submission",
100                fix: "io_queue slot still in flight; wait for the GPU to recycle it before publishing again",
101            });
102        }
103        self.write_word_unfenced(queue_slot, io_word::OP_TYPE, io_op::READ);
104        self.write_word_unfenced(queue_slot, io_word::SRC_HANDLE, 0);
105        self.write_word_unfenced(queue_slot, io_word::DST_HANDLE, mapped_slot);
106        self.write_word_unfenced(queue_slot, io_word::OFFSET_LO, 0);
107        self.write_word_unfenced(queue_slot, io_word::OFFSET_HI, 0);
108        self.write_word_unfenced(queue_slot, io_word::BYTE_COUNT, byte_count);
109        self.write_word_unfenced(queue_slot, io_word::TAG, tag);
110        fence(Ordering::Release);
111        self.write_word_unfenced(queue_slot, io_word::STATUS, slot::PUBLISHED);
112        fence(Ordering::Release);
113        Ok(())
114    }
115
116    /// Submit a DMA-read request to the IO queue.
117    ///
118    /// This is the GPU-initiated path: the caller writes the request metadata,
119    /// then flips `STATUS` to `slot::PUBLISHED` so the host/runtime can claim
120    /// and service it.
121    ///
122    /// # Errors
123    ///
124    /// Returns [`PipelineError::QueueFull`] when the slot is out of bounds or
125    /// not empty.
126    pub fn submit_dma_read(
127        &mut self,
128        queue_slot: u32,
129        src_handle: u32,
130        dst_handle: u32,
131        byte_count: u32,
132        tag: u32,
133    ) -> Result<(), PipelineError> {
134        if queue_slot >= self.slot_count {
135            return Err(PipelineError::QueueFull {
136                queue: "submission",
137                fix: "io_queue slot exceeds MegakernelIoQueue::slot_count; enlarge the queue or submit into a valid slot id",
138            });
139        }
140        let current_status = self.read_word(queue_slot, io_word::STATUS);
141        if current_status != slot::EMPTY {
142            return Err(PipelineError::QueueFull {
143                queue: "submission",
144                fix: "io_queue slot still in flight; wait for completion before submitting a new request",
145            });
146        }
147        self.write_word_unfenced(queue_slot, io_word::OP_TYPE, io_op::READ);
148        self.write_word_unfenced(queue_slot, io_word::SRC_HANDLE, src_handle);
149        self.write_word_unfenced(queue_slot, io_word::DST_HANDLE, dst_handle);
150        self.write_word_unfenced(queue_slot, io_word::OFFSET_LO, 0);
151        self.write_word_unfenced(queue_slot, io_word::OFFSET_HI, 0);
152        self.write_word_unfenced(queue_slot, io_word::BYTE_COUNT, byte_count);
153        self.write_word_unfenced(queue_slot, io_word::TAG, tag);
154        fence(Ordering::Release);
155        self.write_word_unfenced(queue_slot, io_word::STATUS, slot::PUBLISHED);
156        fence(Ordering::Release);
157        Ok(())
158    }
159
160    /// Read the queue slot back as a completion record.
161    #[must_use]
162    pub fn completion(&self, queue_slot: u32) -> Option<IoCompletion> {
163        if queue_slot >= self.slot_count {
164            return None;
165        }
166        let status = self.read_word(queue_slot, io_word::STATUS);
167        if status == slot::EMPTY {
168            return None;
169        }
170        Some(IoCompletion {
171            slot_idx: queue_slot,
172            mapped_slot: self.read_word_unfenced(queue_slot, io_word::DST_HANDLE),
173            byte_count: self.read_word_unfenced(queue_slot, io_word::BYTE_COUNT),
174            tag: self.read_word_unfenced(queue_slot, io_word::TAG),
175        })
176    }
177
178    /// Return true when the GPU has recycled the slot to `EMPTY`.
179    #[must_use]
180    pub fn is_recycled(&self, queue_slot: u32) -> bool {
181        if queue_slot >= self.slot_count {
182            return false;
183        }
184        let status = self.read_word(queue_slot, io_word::STATUS);
185        match status {
186            slot::EMPTY => true,
187            slot::PUBLISHED | slot::CLAIMED | io_status::OK | io_status::ERROR | slot::DONE => {
188                false
189            }
190            _ => false,
191        }
192    }
193
194    fn read_word(&self, slot_idx: u32, word: u32) -> u32 {
195        let idx = queue_word_index(slot_idx, word);
196        fence(Ordering::Acquire);
197        self.words[idx]
198    }
199
200    fn read_word_unfenced(&self, slot_idx: u32, word: u32) -> u32 {
201        let idx = queue_word_index(slot_idx, word);
202        self.words[idx]
203    }
204
205    fn write_word_unfenced(&mut self, slot_idx: u32, word: u32, value: u32) {
206        let idx = queue_word_index(slot_idx, word);
207        self.words[idx] = value;
208    }
209}