1use super::staging_reserve::reserve_vec_capacity as reserve_descriptor_vec;
9use crate::PipelineError;
10
11use smallvec::SmallVec;
12
/// Maximum number of `u32` argument words one ring slot may carry.
///
/// Window payloads spend two of these words on the ticket and window-class header,
/// leaving the rest for the payload itself.
const ARGS_PER_SLOT_USIZE: usize = 12;
14
15use super::{protocol, Megakernel};
16
/// Opcodes that are defined by the megakernel protocol itself.
///
/// Each variant corresponds to one constant in `protocol::opcode`; see
/// `BuiltinOpcode::into_wire` for the mapping.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum BuiltinOpcode {
    /// Encoded as `protocol::opcode::NOP`.
    Nop,
    /// Encoded as `protocol::opcode::STORE_U32`.
    StoreU32,
    /// Encoded as `protocol::opcode::ATOMIC_ADD`.
    AtomicAdd,
    /// Encoded as `protocol::opcode::LOAD_U32`.
    LoadU32,
    /// Encoded as `protocol::opcode::COMPARE_SWAP`.
    CompareSwap,
    /// Encoded as `protocol::opcode::MEMCPY`.
    Memcpy,
    /// Encoded as `protocol::opcode::DFA_STEP`.
    DfaStep,
    /// Encoded as `protocol::opcode::BATCH_FENCE`.
    BatchFence,
    /// Encoded as `protocol::opcode::PRINTF`.
    Printf,
    /// Encoded as `protocol::opcode::SHUTDOWN`.
    Shutdown,
}
41
42impl BuiltinOpcode {
43 #[must_use]
45 pub const fn into_wire(self) -> u32 {
46 match self {
47 Self::Nop => protocol::opcode::NOP,
48 Self::StoreU32 => protocol::opcode::STORE_U32,
49 Self::AtomicAdd => protocol::opcode::ATOMIC_ADD,
50 Self::LoadU32 => protocol::opcode::LOAD_U32,
51 Self::CompareSwap => protocol::opcode::COMPARE_SWAP,
52 Self::Memcpy => protocol::opcode::MEMCPY,
53 Self::DfaStep => protocol::opcode::DFA_STEP,
54 Self::BatchFence => protocol::opcode::BATCH_FENCE,
55 Self::Printf => protocol::opcode::PRINTF,
56 Self::Shutdown => protocol::opcode::SHUTDOWN,
57 }
58 }
59}
60
61#[derive(Debug, Clone, Copy, PartialEq, Eq)]
64pub enum SlotOpcode {
65 Builtin(BuiltinOpcode),
67 Custom(u32),
69}
70
71impl SlotOpcode {
72 #[must_use]
74 pub const fn into_wire(self) -> u32 {
75 match self {
76 Self::Builtin(op) => op.into_wire(),
77 Self::Custom(op) => op,
78 }
79 }
80}
81
/// One operation inside a packed slot: an 8-bit opcode plus its argument words.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct PackedOpDescriptor {
    /// Opcode for this packed operation.
    pub opcode: u8,
    /// Argument words for this operation, in order.
    pub args: Vec<u32>,
}

impl PackedOpDescriptor {
    /// Builds a packed operation from its opcode and argument words.
    #[must_use]
    pub fn new(opcode: u8, args: Vec<u32>) -> Self {
        PackedOpDescriptor { args, opcode }
    }
}
98
99#[derive(Debug, Clone, PartialEq, Eq)]
101pub enum SlotDescriptor {
102 Single {
104 tenant_id: u32,
106 opcode: SlotOpcode,
108 args: Vec<u32>,
110 },
111 Packed {
113 tenant_id: u32,
115 ops: Vec<PackedOpDescriptor>,
117 },
118}
119
120impl SlotDescriptor {
121 #[must_use]
123 pub fn single(tenant_id: u32, opcode: SlotOpcode, args: Vec<u32>) -> Self {
124 Self::Single {
125 tenant_id,
126 opcode,
127 args,
128 }
129 }
130
131 #[must_use]
133 pub fn packed(tenant_id: u32, ops: Vec<PackedOpDescriptor>) -> Self {
134 Self::Packed { tenant_id, ops }
135 }
136
137 pub fn publish_into(&self, ring_bytes: &mut [u8], slot_idx: u32) -> Result<(), PipelineError> {
144 match self {
145 Self::Single {
146 tenant_id,
147 opcode,
148 args,
149 } => {
150 Megakernel::publish_slot(ring_bytes, slot_idx, *tenant_id, opcode.into_wire(), args)
151 }
152 Self::Packed { tenant_id, ops } => {
153 Megakernel::publish_packed_descriptors(ring_bytes, slot_idx, *tenant_id, ops)
154 }
155 }
156 }
157}
158
159#[derive(Debug, Clone, PartialEq, Eq)]
161pub struct BatchDescriptor {
162 pub start_slot: u32,
164 pub items: Vec<SlotDescriptor>,
166}
167
168impl BatchDescriptor {
169 #[must_use]
171 pub fn new(start_slot: u32, items: Vec<SlotDescriptor>) -> Self {
172 Self { start_slot, items }
173 }
174
175 pub fn publish_into(&self, ring_bytes: &mut [u8]) -> Result<u32, PipelineError> {
181 let item_count = u32::try_from(self.items.len()).map_err(|_| PipelineError::QueueFull {
182 queue: "submission",
183 fix: "batch size exceeds u32::MAX slots",
184 })?;
185 if item_count > 0 {
186 self.start_slot
187 .checked_add(item_count - 1)
188 .ok_or(PipelineError::QueueFull {
189 queue: "submission",
190 fix: "batch start plus item count overflows u32; split the descriptor batch before publishing",
191 })?;
192 }
193 for (slot_offset, item) in (0..item_count).zip(self.items.iter()) {
194 let slot_idx = self
195 .start_slot
196 .checked_add(slot_offset)
197 .ok_or(PipelineError::QueueFull {
198 queue: "submission",
199 fix:
200 "batch slot index overflowed u32; split the descriptor batch before publishing",
201 })?;
202 item.publish_into(ring_bytes, slot_idx)?;
203 }
204 Ok(item_count)
205 }
206}
207
/// Which list of a window a payload came from.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum WindowClass {
    /// Payload from a window's `required` list; encoded on the wire as `0`.
    Required,
    /// Payload from a window's `lookahead` list; encoded on the wire as `1`.
    Lookahead,
}

impl WindowClass {
    /// Returns the class word written as the second argument of each window slot.
    #[must_use]
    pub const fn into_wire(self) -> u32 {
        match self {
            WindowClass::Lookahead => 1,
            WindowClass::Required => 0,
        }
    }
}
227
228#[derive(Debug, Clone, PartialEq, Eq)]
234pub struct WindowDescriptor {
235 pub start_slot: u32,
237 pub tenant_id: u32,
239 pub opcode: SlotOpcode,
241 pub ticket: u32,
243 pub required: Vec<Vec<u32>>,
245 pub lookahead: Vec<Vec<u32>>,
247}
248
249impl WindowDescriptor {
250 #[must_use]
252 pub fn new(
253 start_slot: u32,
254 tenant_id: u32,
255 opcode: SlotOpcode,
256 ticket: u32,
257 required: Vec<Vec<u32>>,
258 lookahead: Vec<Vec<u32>>,
259 ) -> Self {
260 Self {
261 start_slot,
262 tenant_id,
263 opcode,
264 ticket,
265 required,
266 lookahead,
267 }
268 }
269
270 #[must_use]
272 pub fn into_batch(&self) -> BatchDescriptor {
273 match self.try_into_batch() {
274 Ok(batch) => batch,
275 Err(_) => BatchDescriptor::new(self.start_slot, Vec::new()),
276 }
277 }
278
279 pub fn try_into_batch(&self) -> Result<BatchDescriptor, PipelineError> {
282 let item_count = self
283 .required
284 .len()
285 .checked_add(self.lookahead.len())
286 .ok_or(PipelineError::QueueFull {
287 queue: "submission",
288 fix:
289 "window item count overflowed usize; split the window before materializing a batch",
290 })?;
291 let mut items = Vec::new();
292 reserve_descriptor_vec(&mut items, item_count, "window batch item")?;
293 for payload in &self.required {
294 let mut args = window_payload_args(self.ticket, WindowClass::Required, payload)?;
295 args.push(self.ticket);
296 args.push(WindowClass::Required.into_wire());
297 args.extend(payload.iter().copied());
298 items.push(SlotDescriptor::single(self.tenant_id, self.opcode, args));
299 }
300 for payload in &self.lookahead {
301 let mut args = window_payload_args(self.ticket, WindowClass::Lookahead, payload)?;
302 args.push(self.ticket);
303 args.push(WindowClass::Lookahead.into_wire());
304 args.extend(payload.iter().copied());
305 items.push(SlotDescriptor::single(self.tenant_id, self.opcode, args));
306 }
307 Ok(BatchDescriptor::new(self.start_slot, items))
308 }
309
310 pub fn publish_into(&self, ring_bytes: &mut [u8]) -> Result<u32, PipelineError> {
312 let consumed = self
313 .required
314 .len()
315 .checked_add(self.lookahead.len())
316 .ok_or(PipelineError::QueueFull {
317 queue: "submission",
318 fix: "window item count overflowed usize; split the window before publishing",
319 })?;
320 let consumed_u32 = u32::try_from(consumed).map_err(|_| PipelineError::QueueFull {
321 queue: "submission",
322 fix: "window size exceeds u32::MAX slots; split the window before publishing",
323 })?;
324 if consumed_u32 == 0 {
325 return Ok(0);
326 }
327 self.start_slot
328 .checked_add(consumed_u32 - 1)
329 .ok_or(PipelineError::QueueFull {
330 queue: "submission",
331 fix: "window start plus item count overflows u32; split the window before publishing",
332 })?;
333
334 let mut slot_offset = 0u32;
335 let mut args = SmallVec::<[u32; ARGS_PER_SLOT_USIZE]>::new();
336 for payload in &self.required {
337 publish_window_payload(
338 ring_bytes,
339 self.start_slot,
340 &mut slot_offset,
341 self.tenant_id,
342 self.opcode,
343 self.ticket,
344 WindowClass::Required,
345 payload,
346 &mut args,
347 )?;
348 }
349 for payload in &self.lookahead {
350 publish_window_payload(
351 ring_bytes,
352 self.start_slot,
353 &mut slot_offset,
354 self.tenant_id,
355 self.opcode,
356 self.ticket,
357 WindowClass::Lookahead,
358 payload,
359 &mut args,
360 )?;
361 }
362 Ok(slot_offset)
363 }
364}
365
366fn window_payload_args(
367 _ticket: u32,
368 _class: WindowClass,
369 payload: &[u32],
370) -> Result<Vec<u32>, PipelineError> {
371 let required_args = payload
372 .len()
373 .checked_add(2)
374 .ok_or(PipelineError::QueueFull {
375 queue: "submission",
376 fix: "window payload argument count overflowed usize; split the payload before materializing a batch",
377 })?;
378 if required_args > ARGS_PER_SLOT_USIZE {
379 return Err(PipelineError::QueueFull {
380 queue: "submission",
381 fix: "too many args for one window payload; ticket plus class plus payload must fit in 12 u32 args",
382 });
383 }
384 let mut args = Vec::new();
385 reserve_descriptor_vec(&mut args, required_args, "window payload arg")?;
386 Ok(args)
387}
388
389fn publish_window_payload(
390 ring_bytes: &mut [u8],
391 start_slot: u32,
392 slot_offset: &mut u32,
393 tenant_id: u32,
394 opcode: SlotOpcode,
395 ticket: u32,
396 class: WindowClass,
397 payload: &[u32],
398 args: &mut SmallVec<[u32; ARGS_PER_SLOT_USIZE]>,
399) -> Result<(), PipelineError> {
400 let slot_idx = start_slot
401 .checked_add(*slot_offset)
402 .ok_or(PipelineError::QueueFull {
403 queue: "submission",
404 fix: "window slot index overflowed u32; split the window before publishing",
405 })?;
406 args.clear();
407 let required_args = payload
408 .len()
409 .checked_add(2)
410 .ok_or(PipelineError::QueueFull {
411 queue: "submission",
412 fix: "window payload argument count overflowed usize; split the payload before publishing",
413 })?;
414 if required_args > ARGS_PER_SLOT_USIZE {
415 return Err(PipelineError::QueueFull {
416 queue: "submission",
417 fix: "too many args for one window payload; ticket plus class plus payload must fit in 12 u32 args",
418 });
419 }
420 args.push(ticket);
421 args.push(class.into_wire());
422 args.extend_from_slice(payload);
423 Megakernel::publish_slot(ring_bytes, slot_idx, tenant_id, opcode.into_wire(), args)?;
424 *slot_offset = slot_offset.checked_add(1).ok_or(PipelineError::QueueFull {
425 queue: "submission",
426 fix: "window slot count overflowed u32; split the window before publishing",
427 })?;
428 Ok(())
429}
430
431#[cfg(test)]
432mod tests;