Skip to main content

vyre_runtime/megakernel/
task.rs

1//! Resident task queue ABI for pause, resume, requeue, and priority aging.
2
3use vyre_driver::backend::BackendError;
4
5use super::planner::MegakernelWorkItem;
6use super::policy::MegakernelLaunchRequest;
7
/// Width of one continuation task slot, counted in `u32` words.
pub const TASK_SLOT_WORDS: usize = 16;

/// Width of one continuation task slot in bytes: `TASK_SLOT_WORDS` words of
/// `u32` each.
pub const TASK_SLOT_BYTES: usize = core::mem::size_of::<[u32; TASK_SLOT_WORDS]>();

/// Flag bit 0 (the lowest): the task voluntarily paused at a continuation point.
pub const TASK_FLAG_PAUSED: u32 = 0b0001;

/// Flag bit 1: the task yielded so another task can run on the same worker.
pub const TASK_FLAG_YIELDED: u32 = 0b0010;

/// Flag bit 2: the task asked the scheduler to publish it again.
pub const TASK_FLAG_REQUEUE_REQUESTED: u32 = 0b0100;

/// Flag bit 3: a paused task is eligible to resume.
pub const TASK_FLAG_RESUME_READY: u32 = 0b1000;

/// GPU-visible lifecycle state for one continuation task slot.
///
/// The explicit discriminants are the raw ABI words stored in a slot's
/// `state` word, so they must never be renumbered.
#[repr(u32)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TaskState {
    /// Empty slot; free for reuse.
    Empty = 0,
    /// Published slot that a GPU worker may claim.
    Ready = 1,
    /// Slot currently owned by a GPU worker.
    Running = 2,
    /// Slot that finished successfully.
    Done = 3,
    /// Slot parked until an external device-visible condition holds.
    Paused = 4,
    /// Slot that yielded voluntarily and stays schedulable.
    Yielded = 5,
    /// Slot that should be placed back into its priority partition.
    Requeued = 6,
    /// Slot that faulted; it must not run again until repaired.
    Faulted = 7,
}

impl TaskState {
    /// Decode a raw ABI word, returning `None` for any word outside `0..=7`.
    #[must_use]
    pub const fn from_word(word: u32) -> Option<Self> {
        // Position `n` holds the variant whose discriminant is `n`.
        const BY_WORD: [TaskState; 8] = [
            TaskState::Empty,
            TaskState::Ready,
            TaskState::Running,
            TaskState::Done,
            TaskState::Paused,
            TaskState::Yielded,
            TaskState::Requeued,
            TaskState::Faulted,
        ];
        if word <= TaskState::Faulted as u32 {
            Some(BY_WORD[word as usize])
        } else {
            None
        }
    }

    /// Raw ABI word for this state, as written by the GPU scheduler.
    #[must_use]
    pub const fn word(self) -> u32 {
        self as u32
    }

    /// Whether a slot in this state may be picked up by GPU scheduling.
    ///
    /// Only `Ready`, `Yielded`, and `Requeued` qualify.
    #[must_use]
    pub const fn is_schedulable(self) -> bool {
        match self {
            Self::Ready | Self::Yielded | Self::Requeued => true,
            Self::Empty | Self::Running | Self::Done | Self::Paused | Self::Faulted => false,
        }
    }
}

/// Priority partition for a continuation task slot.
///
/// Lower discriminants are more urgent. The derived `Ord` therefore sorts
/// `Critical` first and `Idle` last. The discriminants are ABI words.
#[repr(u32)]
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, PartialOrd, Ord)]
pub enum TaskPriority {
    /// Most urgent partition, for latency-critical work.
    Critical = 0,
    /// Partition for urgent work.
    High = 1,
    /// The default partition.
    #[default]
    Normal = 2,
    /// Partition for background work.
    Low = 3,
    /// Partition processed only once every higher priority is empty.
    Idle = 4,
}

impl TaskPriority {
    /// Decode a raw ABI word, returning `None` for any word outside `0..=4`.
    #[must_use]
    pub const fn from_word(word: u32) -> Option<Self> {
        let priority = match word {
            0 => Self::Critical,
            1 => Self::High,
            2 => Self::Normal,
            3 => Self::Low,
            4 => Self::Idle,
            _ => return None,
        };
        Some(priority)
    }

    /// Raw ABI word for this priority, as used by the priority scheduler.
    #[must_use]
    pub const fn word(self) -> u32 {
        self as u32
    }
}

116/// One device-visible continuation task slot.
117///
118/// The first four words match the persistent ring header:
119/// status, opcode, tenant, priority. The remaining twelve words are the slot
120/// payload. Words 4..6 preserve the legacy [`MegakernelWorkItem`] payload; words 7..15
121/// carry continuation and scheduler state.
122#[repr(C)]
123#[derive(Debug, Clone, Copy, PartialEq, Eq, bytemuck::Pod, bytemuck::Zeroable)]
124pub struct TaskWorkItem {
125    /// Raw [`TaskState`] word.
126    pub state: u32,
127    /// Stable op id index into the dialect registry.
128    pub op_handle: u32,
129    /// Tenant id checked by the runtime scheduler.
130    pub tenant_id: u32,
131    /// Raw [`TaskPriority`] word.
132    pub priority: u32,
133    /// Input-buffer handle.
134    pub input_handle: u32,
135    /// Output-buffer handle.
136    pub output_handle: u32,
137    /// Per-item parameter word.
138    pub param: u32,
139    /// Program counter or block id where the worker should resume.
140    pub continuation_pc: u32,
141    /// Opaque continuation-local scratch word.
142    pub continuation_data: u32,
143    /// Device-visible epoch at which the task may resume.
144    pub resume_epoch: u32,
145    /// Stable task id used to join yielded/requeued continuations.
146    pub task_id: u32,
147    /// Parent task id for split or fan-out work; zero when absent.
148    pub parent_task_id: u32,
149    /// Scheduler age ticks accumulated while waiting.
150    pub age_ticks: u32,
151    /// Number of times this task has been requeued.
152    pub requeue_count: u32,
153    /// Number of times this task has yielded.
154    pub yield_count: u32,
155    /// Bitset of `TASK_FLAG_*` continuation flags.
156    pub flags: u32,
157}
158
159impl TaskWorkItem {
160    /// Construct a ready continuation task from the compact legacy work item.
161    #[must_use]
162    pub const fn from_work_item(
163        task_id: u32,
164        tenant_id: u32,
165        priority: TaskPriority,
166        item: MegakernelWorkItem,
167    ) -> Self {
168        Self {
169            state: TaskState::Ready.word(),
170            op_handle: item.op_handle,
171            tenant_id,
172            priority: priority.word(),
173            input_handle: item.input_handle,
174            output_handle: item.output_handle,
175            param: item.param,
176            continuation_pc: 0,
177            continuation_data: 0,
178            resume_epoch: 0,
179            task_id,
180            parent_task_id: 0,
181            age_ticks: 0,
182            requeue_count: 0,
183            yield_count: 0,
184            flags: 0,
185        }
186    }
187
188    /// Return the compact legacy work item payload carried by this task.
189    #[must_use]
190    pub const fn work_item(&self) -> MegakernelWorkItem {
191        MegakernelWorkItem {
192            op_handle: self.op_handle,
193            input_handle: self.input_handle,
194            output_handle: self.output_handle,
195            param: self.param,
196        }
197    }
198
199    /// Decode the task state word.
200    #[must_use]
201    pub const fn task_state(&self) -> Option<TaskState> {
202        TaskState::from_word(self.state)
203    }
204
205    /// Decode the task priority word.
206    #[must_use]
207    pub const fn task_priority(&self) -> Option<TaskPriority> {
208        TaskPriority::from_word(self.priority)
209    }
210
211    /// Return true when the task is eligible to be claimed by a worker.
212    #[must_use]
213    pub const fn is_schedulable(&self) -> bool {
214        match self.task_state() {
215            Some(state) => state.is_schedulable(),
216            None => false,
217        }
218    }
219
220    /// Encode a pause at `continuation_pc` until `resume_epoch`.
221    #[must_use]
222    pub const fn paused(
223        mut self,
224        continuation_pc: u32,
225        continuation_data: u32,
226        resume_epoch: u32,
227    ) -> Self {
228        self.state = TaskState::Paused.word();
229        self.continuation_pc = continuation_pc;
230        self.continuation_data = continuation_data;
231        self.resume_epoch = resume_epoch;
232        self.flags = (self.flags | TASK_FLAG_PAUSED) & !TASK_FLAG_RESUME_READY;
233        self
234    }
235
236    /// Mark a paused task ready for GPU-side resume.
237    #[must_use]
238    pub const fn resumed(mut self) -> Self {
239        self.state = TaskState::Ready.word();
240        self.flags =
241            (self.flags | TASK_FLAG_RESUME_READY) & !(TASK_FLAG_PAUSED | TASK_FLAG_YIELDED);
242        self
243    }
244
245    /// Yield this task back to the scheduler at `continuation_pc`.
246    #[must_use]
247    pub const fn yielded(mut self, continuation_pc: u32, continuation_data: u32) -> Self {
248        self.state = TaskState::Yielded.word();
249        self.continuation_pc = continuation_pc;
250        self.continuation_data = continuation_data;
251        self.yield_count = match self.yield_count.checked_add(1) {
252            Some(value) => value,
253            None => panic!("megakernel task yield_count overflowed u32"),
254        };
255        self.flags |= TASK_FLAG_YIELDED;
256        self
257    }
258
259    /// Requeue this task, optionally changing its priority partition.
260    #[must_use]
261    pub const fn requeued(
262        mut self,
263        continuation_pc: u32,
264        continuation_data: u32,
265        priority: TaskPriority,
266    ) -> Self {
267        self.state = TaskState::Requeued.word();
268        self.priority = priority.word();
269        self.continuation_pc = continuation_pc;
270        self.continuation_data = continuation_data;
271        self.requeue_count = match self.requeue_count.checked_add(1) {
272            Some(value) => value,
273            None => panic!("megakernel task requeue_count overflowed u32"),
274        };
275        self.age_ticks = match self.age_ticks.checked_add(1) {
276            Some(value) => value,
277            None => panic!("megakernel task age_ticks overflowed u32"),
278        };
279        self.flags |= TASK_FLAG_REQUEUE_REQUESTED;
280        self
281    }
282
283    /// Mark this task completed.
284    #[must_use]
285    pub const fn completed(mut self) -> Self {
286        self.state = TaskState::Done.word();
287        self.flags = 0;
288        self
289    }
290
291    /// Mark this task faulted with a compact fault code.
292    #[must_use]
293    pub const fn faulted(mut self, fault_code: u32) -> Self {
294        self.state = TaskState::Faulted.word();
295        self.continuation_data = fault_code;
296        self
297    }
298}
299
300/// Queue telemetry derived from device-visible continuation task slots.
301#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
302pub struct TaskQueueSnapshot {
303    /// Count of ready slots.
304    pub ready_count: u32,
305    /// Count of paused slots.
306    pub paused_count: u32,
307    /// Count of yielded slots.
308    pub yielded_count: u32,
309    /// Count of requeued slots.
310    pub requeued_count: u32,
311    /// Count of running slots.
312    pub running_count: u32,
313    /// Count of faulted slots.
314    pub faulted_count: u32,
315    /// Sum of per-slot requeue counters.
316    pub total_requeues: u64,
317    /// Maximum priority-aging tick observed in the queue.
318    pub max_priority_age: u32,
319}
320
321impl TaskQueueSnapshot {
322    /// Build a queue snapshot from continuation task slots.
323    ///
324    /// # Errors
325    ///
326    /// Returns [`BackendError`] when the slice contains an unknown task state or
327    /// a count cannot fit the u32 ABI.
328    pub fn from_tasks(tasks: &[TaskWorkItem]) -> Result<Self, BackendError> {
329        let mut snapshot = Self::default();
330        for task in tasks {
331            snapshot.max_priority_age = snapshot.max_priority_age.max(task.age_ticks);
332            snapshot.total_requeues = snapshot
333                .total_requeues
334                .checked_add(u64::from(task.requeue_count))
335                .ok_or_else(|| {
336                    BackendError::new(
337                        "megakernel task total_requeues overflowed u64. Fix: drain or shard the task ring before launch.",
338                    )
339                })?;
340            match task.task_state() {
341                Some(TaskState::Empty | TaskState::Done) => {}
342                Some(TaskState::Ready) => checked_increment(&mut snapshot.ready_count)?,
343                Some(TaskState::Paused) => checked_increment(&mut snapshot.paused_count)?,
344                Some(TaskState::Yielded) => checked_increment(&mut snapshot.yielded_count)?,
345                Some(TaskState::Requeued) => checked_increment(&mut snapshot.requeued_count)?,
346                Some(TaskState::Running) => checked_increment(&mut snapshot.running_count)?,
347                Some(TaskState::Faulted) => checked_increment(&mut snapshot.faulted_count)?,
348                None => {
349                    return Err(BackendError::new(format!(
350                        "megakernel task slot has unknown state word {}. Fix: write a valid TaskState ABI word before publishing the slot.",
351                        task.state
352                    )));
353                }
354            }
355        }
356        Ok(snapshot)
357    }
358
359    /// Number of slots immediately eligible for GPU scheduling.
360    #[must_use]
361    pub fn schedulable_count(&self) -> u32 {
362        match self.try_schedulable_count() {
363            Ok(value) => value,
364            Err(error) => panic!("{error}"),
365        }
366    }
367
368    /// Checked number of slots immediately eligible for GPU scheduling.
369    ///
370    /// # Errors
371    ///
372    /// Returns [`BackendError`] when the summed schedulable count exceeds the
373    /// u32 launch ABI.
374    pub fn try_schedulable_count(&self) -> Result<u32, BackendError> {
375        self.ready_count
376            .checked_add(self.yielded_count)
377            .and_then(|value| value.checked_add(self.requeued_count))
378            .ok_or_else(|| {
379                BackendError::new(
380                    "megakernel schedulable task count overflowed u32. Fix: shard the task ring before launch.",
381                )
382            })
383    }
384
385    /// Number of slots carrying continuation pressure.
386    #[must_use]
387    pub fn continuation_pressure_count(&self) -> u64 {
388        match self.try_continuation_pressure_count() {
389            Ok(value) => value,
390            Err(error) => panic!("{error}"),
391        }
392    }
393
394    /// Checked number of slots carrying continuation pressure.
395    ///
396    /// # Errors
397    ///
398    /// Returns [`BackendError`] when continuation pressure exceeds u64.
399    pub fn try_continuation_pressure_count(&self) -> Result<u64, BackendError> {
400        u64::from(self.yielded_count)
401            .checked_add(u64::from(self.requeued_count))
402            .and_then(|value| value.checked_add(self.total_requeues))
403            .ok_or_else(|| {
404                BackendError::new(
405                    "megakernel continuation pressure overflowed u64. Fix: drain or shard the task ring before launch.",
406                )
407            })
408    }
409
410    /// Build a Program that runs a one-shot persistent fixpoint over
411    /// the queue snapshot's state-counter buffer, converging the
412    /// counts to a stable equilibrium representing the queue's
413    /// long-run distribution. Wires the self-substrate persistent
414    /// fixpoint builder for observability collectors that want stable
415    /// signals over transient queue jitter.
416    ///
417    /// `current_buffer` / `next_buffer` / `changed_buffer` are
418    /// caller-supplied buffer names for the persistent_fixpoint
419    /// ping-pong; `transfer_body` is the per-iteration body that
420    /// reads `current` and writes `next`. Returns a Program suitable
421    /// for one dispatch.
422    ///
423    /// P-RUNTIME-4: replaces a host-side jitter-smoothing loop with
424    /// a single GPU-side fixpoint dispatch.
425    #[must_use]
426    #[cfg(feature = "self-substrate-adapters")]
427    pub fn build_state_convergence_program(
428        transfer_body: Vec<vyre_foundation::ir::Node>,
429        current_buffer: &str,
430        next_buffer: &str,
431        changed_buffer: &str,
432        words: u32,
433        max_iterations: u32,
434    ) -> vyre_foundation::ir::Program {
435        vyre_self_substrate::persistent_fixpoint_program::persistent_fixpoint_program(
436            transfer_body,
437            current_buffer,
438            next_buffer,
439            changed_buffer,
440            words,
441            max_iterations,
442        )
443    }
444
445    /// Merge this queue telemetry into a launch request.
446    #[must_use]
447    pub fn apply_to_launch_request(
448        &self,
449        mut request: MegakernelLaunchRequest,
450    ) -> MegakernelLaunchRequest {
451        request = match self.try_apply_to_launch_request(request) {
452            Ok(request) => request,
453            Err(error) => panic!("{error}"),
454        };
455        request
456    }
457
458    /// Checked merge of queue telemetry into a launch request.
459    ///
460    /// # Errors
461    ///
462    /// Returns [`BackendError`] when schedulable count or continuation pressure
463    /// cannot fit the launch request ABI.
464    pub fn try_apply_to_launch_request(
465        &self,
466        mut request: MegakernelLaunchRequest,
467    ) -> Result<MegakernelLaunchRequest, BackendError> {
468        request.queue_len = self.try_schedulable_count()?;
469        request.requeue_count = request
470            .requeue_count
471            .checked_add(self.try_continuation_pressure_count()?)
472            .ok_or_else(|| {
473                BackendError::new(
474                    "megakernel launch request requeue_count overflowed u64. Fix: drain or shard the task ring before launch.",
475                )
476            })?;
477        request.max_priority_age = request.max_priority_age.max(self.max_priority_age);
478        Ok(request)
479    }
480}
481
482fn checked_increment(counter: &mut u32) -> Result<(), BackendError> {
483    *counter = counter.checked_add(1).ok_or_else(|| {
484        BackendError::new(
485            "megakernel task queue count exceeds u32::MAX. Fix: shard the task ring before launch.",
486        )
487    })?;
488    Ok(())
489}
490
491#[cfg(test)]
492mod tests;