// vyre_runtime/megakernel/task.rs

//! Resident task queue ABI for pause, resume, requeue, and priority aging.
2
3use vyre_driver::backend::BackendError;
4
5use super::planner::MegakernelWorkItem;
6use super::policy::MegakernelLaunchRequest;
7
/// Number of `u32` words in one continuation task slot.
pub const TASK_SLOT_WORDS: usize = 16;

/// Number of bytes in one continuation task slot.
pub const TASK_SLOT_BYTES: usize = TASK_SLOT_WORDS * core::mem::size_of::<u32>();

/// Flag bit 0: set when a task voluntarily paused at a continuation point.
pub const TASK_FLAG_PAUSED: u32 = 1 << 0;

/// Flag bit 1: set when a task yielded so another task can run on the same worker.
pub const TASK_FLAG_YIELDED: u32 = 1 << 1;

/// Flag bit 2: set when a task asked the scheduler to publish it again.
pub const TASK_FLAG_REQUEUE_REQUESTED: u32 = 1 << 2;

/// Flag bit 3: set when a paused task is eligible to resume.
pub const TASK_FLAG_RESUME_READY: u32 = 1 << 3;
25
/// GPU-visible lifecycle state for one continuation task slot.
///
/// The discriminants are the raw ABI words stored in the slot's state word;
/// [`TaskState::from_word`] and [`TaskState::word`] convert between the two.
#[repr(u32)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TaskState {
    /// Slot is empty and may be reused.
    Empty = 0,
    /// Slot is published and may be claimed by a GPU worker.
    Ready = 1,
    /// Slot is currently owned by a GPU worker.
    Running = 2,
    /// Slot finished successfully.
    Done = 3,
    /// Slot is paused until an external device-visible condition is met.
    Paused = 4,
    /// Slot yielded voluntarily and should remain schedulable.
    Yielded = 5,
    /// Slot should be placed back into its priority partition.
    Requeued = 6,
    /// Slot faulted and must not be executed again without repair.
    Faulted = 7,
}

impl TaskState {
    /// Decode a raw ABI word into a task state.
    ///
    /// Returns `None` for any word that is not one of the declared
    /// discriminants (`0..=7`).
    #[must_use]
    pub const fn from_word(word: u32) -> Option<Self> {
        // Spelled out arm by arm so the accepted ABI words are visible here
        // and stay in lock-step with the discriminants above.
        match word {
            0 => Some(Self::Empty),
            1 => Some(Self::Ready),
            2 => Some(Self::Running),
            3 => Some(Self::Done),
            4 => Some(Self::Paused),
            5 => Some(Self::Yielded),
            6 => Some(Self::Requeued),
            7 => Some(Self::Faulted),
            _ => None,
        }
    }

    /// Encode this state as the raw ABI word written by the GPU scheduler.
    #[must_use]
    pub const fn word(self) -> u32 {
        self as u32
    }

    /// Return true when this state is eligible for GPU scheduling.
    ///
    /// Only `Ready`, `Yielded`, and `Requeued` are schedulable.
    #[must_use]
    pub const fn is_schedulable(self) -> bool {
        matches!(self, Self::Ready | Self::Yielded | Self::Requeued)
    }
}
77
/// Priority partition for a continuation task slot.
///
/// Lower discriminants are more urgent, so the derived ordering sorts
/// `Critical` first and `Idle` last.
#[repr(u32)]
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, PartialOrd, Ord)]
pub enum TaskPriority {
    /// Highest priority partition for latency-critical work.
    Critical = 0,
    /// High priority partition for urgent work.
    High = 1,
    /// Default priority partition.
    #[default]
    Normal = 2,
    /// Low priority partition for background work.
    Low = 3,
    /// Idle partition processed only when higher priorities are empty.
    Idle = 4,
}

impl TaskPriority {
    /// Decode a raw ABI word into a task priority.
    ///
    /// Returns `None` for any word that is not one of the declared
    /// discriminants (`0..=4`).
    #[must_use]
    pub const fn from_word(word: u32) -> Option<Self> {
        match word {
            0 => Some(Self::Critical),
            1 => Some(Self::High),
            2 => Some(Self::Normal),
            3 => Some(Self::Low),
            4 => Some(Self::Idle),
            _ => None,
        }
    }

    /// Encode this priority as the raw ABI word used by the priority scheduler.
    #[must_use]
    pub const fn word(self) -> u32 {
        self as u32
    }
}
115
116/// One device-visible continuation task slot.
117///
118/// The first four words match the persistent ring header:
119/// status, opcode, tenant, priority. The remaining twelve words are the slot
120/// payload. Words 4..6 preserve the legacy [`MegakernelWorkItem`] payload; words 7..15
121/// carry continuation and scheduler state.
122#[repr(C)]
123#[derive(Debug, Clone, Copy, PartialEq, Eq, bytemuck::Pod, bytemuck::Zeroable)]
124pub struct TaskWorkItem {
125    /// Raw [`TaskState`] word.
126    pub state: u32,
127    /// Stable op id index into the dialect registry.
128    pub op_handle: u32,
129    /// Tenant id checked by the runtime scheduler.
130    pub tenant_id: u32,
131    /// Raw [`TaskPriority`] word.
132    pub priority: u32,
133    /// Input-buffer handle.
134    pub input_handle: u32,
135    /// Output-buffer handle.
136    pub output_handle: u32,
137    /// Per-item parameter word.
138    pub param: u32,
139    /// Program counter or block id where the worker should resume.
140    pub continuation_pc: u32,
141    /// Opaque continuation-local scratch word.
142    pub continuation_data: u32,
143    /// Device-visible epoch at which the task may resume.
144    pub resume_epoch: u32,
145    /// Stable task id used to join yielded/requeued continuations.
146    pub task_id: u32,
147    /// Parent task id for split or fan-out work; zero when absent.
148    pub parent_task_id: u32,
149    /// Scheduler age ticks accumulated while waiting.
150    pub age_ticks: u32,
151    /// Number of times this task has been requeued.
152    pub requeue_count: u32,
153    /// Number of times this task has yielded.
154    pub yield_count: u32,
155    /// Bitset of `TASK_FLAG_*` continuation flags.
156    pub flags: u32,
157}
158
159impl TaskWorkItem {
160    /// Construct a ready continuation task from the compact legacy work item.
161    #[must_use]
162    pub const fn from_work_item(
163        task_id: u32,
164        tenant_id: u32,
165        priority: TaskPriority,
166        item: MegakernelWorkItem,
167    ) -> Self {
168        Self {
169            state: TaskState::Ready.word(),
170            op_handle: item.op_handle,
171            tenant_id,
172            priority: priority.word(),
173            input_handle: item.input_handle,
174            output_handle: item.output_handle,
175            param: item.param,
176            continuation_pc: 0,
177            continuation_data: 0,
178            resume_epoch: 0,
179            task_id,
180            parent_task_id: 0,
181            age_ticks: 0,
182            requeue_count: 0,
183            yield_count: 0,
184            flags: 0,
185        }
186    }
187
188    /// Return the compact legacy work item payload carried by this task.
189    #[must_use]
190    pub const fn work_item(&self) -> MegakernelWorkItem {
191        MegakernelWorkItem {
192            op_handle: self.op_handle,
193            input_handle: self.input_handle,
194            output_handle: self.output_handle,
195            param: self.param,
196        }
197    }
198
199    /// Decode the task state word.
200    #[must_use]
201    pub const fn task_state(&self) -> Option<TaskState> {
202        TaskState::from_word(self.state)
203    }
204
205    /// Decode the task priority word.
206    #[must_use]
207    pub const fn task_priority(&self) -> Option<TaskPriority> {
208        TaskPriority::from_word(self.priority)
209    }
210
211    /// Return true when the task is eligible to be claimed by a worker.
212    #[must_use]
213    pub const fn is_schedulable(&self) -> bool {
214        match self.task_state() {
215            Some(state) => state.is_schedulable(),
216            None => false,
217        }
218    }
219
220    /// Encode a pause at `continuation_pc` until `resume_epoch`.
221    #[must_use]
222    pub fn try_paused(
223        mut self,
224        continuation_pc: u32,
225        continuation_data: u32,
226        resume_epoch: u32,
227    ) -> Result<Self, BackendError> {
228        self.ensure_transitionable("pause")?;
229        self.state = TaskState::Paused.word();
230        self.continuation_pc = continuation_pc;
231        self.continuation_data = continuation_data;
232        self.resume_epoch = resume_epoch;
233        self.flags = (self.flags | TASK_FLAG_PAUSED) & !TASK_FLAG_RESUME_READY;
234        Ok(self)
235    }
236
237    /// Encode a pause at `continuation_pc` until `resume_epoch`.
238    #[must_use]
239    #[cfg(any(test, feature = "legacy-infallible"))]
240    pub fn paused(self, continuation_pc: u32, continuation_data: u32, resume_epoch: u32) -> Self {
241        self.try_paused(continuation_pc, continuation_data, resume_epoch)
242            .unwrap_or_else(|error| panic!("{error}"))
243    }
244
245    /// Mark a paused task ready for GPU-side resume.
246    #[must_use]
247    pub fn try_resumed(mut self) -> Result<Self, BackendError> {
248        if self.task_state() != Some(TaskState::Paused) {
249            return Err(invalid_task_transition("resume", self.state));
250        }
251        self.state = TaskState::Ready.word();
252        self.flags =
253            (self.flags | TASK_FLAG_RESUME_READY) & !(TASK_FLAG_PAUSED | TASK_FLAG_YIELDED);
254        Ok(self)
255    }
256
257    /// Mark a paused task ready for GPU-side resume.
258    #[must_use]
259    #[cfg(any(test, feature = "legacy-infallible"))]
260    pub fn resumed(self) -> Self {
261        self.try_resumed().unwrap_or_else(|error| panic!("{error}"))
262    }
263
264    /// Yield this task back to the scheduler at `continuation_pc`.
265    #[must_use]
266    pub fn try_yielded(
267        mut self,
268        continuation_pc: u32,
269        continuation_data: u32,
270    ) -> Result<Self, BackendError> {
271        self.ensure_transitionable("yield")?;
272        self.state = TaskState::Yielded.word();
273        self.continuation_pc = continuation_pc;
274        self.continuation_data = continuation_data;
275        self.yield_count = checked_task_counter_increment(self.yield_count, "yield_count")?;
276        self.flags |= TASK_FLAG_YIELDED;
277        Ok(self)
278    }
279
280    /// Yield this task back to the scheduler at `continuation_pc`.
281    #[must_use]
282    #[cfg(any(test, feature = "legacy-infallible"))]
283    pub fn yielded(self, continuation_pc: u32, continuation_data: u32) -> Self {
284        self.try_yielded(continuation_pc, continuation_data)
285            .unwrap_or_else(|error| panic!("{error}"))
286    }
287
288    /// Requeue this task, optionally changing its priority partition.
289    #[must_use]
290    pub fn try_requeued(
291        mut self,
292        continuation_pc: u32,
293        continuation_data: u32,
294        priority: TaskPriority,
295    ) -> Result<Self, BackendError> {
296        self.ensure_transitionable("requeue")?;
297        self.state = TaskState::Requeued.word();
298        self.priority = priority.word();
299        self.continuation_pc = continuation_pc;
300        self.continuation_data = continuation_data;
301        self.requeue_count = checked_task_counter_increment(self.requeue_count, "requeue_count")?;
302        self.age_ticks = checked_task_counter_increment(self.age_ticks, "age_ticks")?;
303        self.flags |= TASK_FLAG_REQUEUE_REQUESTED;
304        Ok(self)
305    }
306
307    /// Requeue this task, optionally changing its priority partition.
308    #[must_use]
309    #[cfg(any(test, feature = "legacy-infallible"))]
310    pub fn requeued(
311        self,
312        continuation_pc: u32,
313        continuation_data: u32,
314        priority: TaskPriority,
315    ) -> Self {
316        self.try_requeued(continuation_pc, continuation_data, priority)
317            .unwrap_or_else(|error| panic!("{error}"))
318    }
319
320    /// Mark this task completed.
321    #[must_use]
322    pub fn try_completed(mut self) -> Result<Self, BackendError> {
323        self.ensure_transitionable("complete")?;
324        self.state = TaskState::Done.word();
325        self.flags = 0;
326        Ok(self)
327    }
328
329    /// Mark this task completed.
330    #[must_use]
331    #[cfg(any(test, feature = "legacy-infallible"))]
332    pub fn completed(self) -> Self {
333        self.try_completed()
334            .unwrap_or_else(|error| panic!("{error}"))
335    }
336
337    /// Mark this task faulted with a compact fault code.
338    #[must_use]
339    pub fn try_faulted(mut self, fault_code: u32) -> Result<Self, BackendError> {
340        self.ensure_transitionable("fault")?;
341        self.state = TaskState::Faulted.word();
342        self.continuation_data = fault_code;
343        Ok(self)
344    }
345
346    /// Mark this task faulted with a compact fault code.
347    #[must_use]
348    #[cfg(any(test, feature = "legacy-infallible"))]
349    pub fn faulted(self, fault_code: u32) -> Self {
350        self.try_faulted(fault_code)
351            .unwrap_or_else(|error| panic!("{error}"))
352    }
353
354    fn ensure_transitionable(&self, action: &'static str) -> Result<(), BackendError> {
355        match self.task_state() {
356            Some(TaskState::Empty | TaskState::Done | TaskState::Faulted) | None => {
357                Err(invalid_task_transition(action, self.state))
358            }
359            Some(
360                TaskState::Ready
361                | TaskState::Running
362                | TaskState::Paused
363                | TaskState::Yielded
364                | TaskState::Requeued,
365            ) => Ok(()),
366        }
367    }
368}
369
/// Queue telemetry derived from device-visible continuation task slots.
///
/// The default value is an all-zero snapshot.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct TaskQueueSnapshot {
    /// Count of ready slots.
    pub ready_count: u32,
    /// Count of paused slots.
    pub paused_count: u32,
    /// Count of yielded slots.
    pub yielded_count: u32,
    /// Count of requeued slots.
    pub requeued_count: u32,
    /// Count of running slots.
    pub running_count: u32,
    /// Count of faulted slots.
    pub faulted_count: u32,
    /// Sum of per-slot requeue counters.
    pub total_requeues: u64,
    /// Maximum priority-aging tick observed in the queue.
    pub max_priority_age: u32,
}
390
391impl TaskQueueSnapshot {
392    /// Build a queue snapshot from continuation task slots.
393    ///
394    /// # Errors
395    ///
396    /// Returns [`BackendError`] when the slice contains an unknown task state or
397    /// a count cannot fit the u32 ABI.
398    pub fn from_tasks(tasks: &[TaskWorkItem]) -> Result<Self, BackendError> {
399        let mut snapshot = Self::default();
400        for task in tasks {
401            snapshot.max_priority_age = snapshot.max_priority_age.max(task.age_ticks);
402            snapshot.total_requeues = snapshot
403                .total_requeues
404                .checked_add(u64::from(task.requeue_count))
405                .ok_or_else(|| {
406                    BackendError::new(
407                        "megakernel task total_requeues overflowed u64. Fix: drain or shard the task ring before launch.",
408                    )
409                })?;
410            match task.task_state() {
411                Some(TaskState::Empty | TaskState::Done) => {}
412                Some(TaskState::Ready) => checked_increment(&mut snapshot.ready_count)?,
413                Some(TaskState::Paused) => checked_increment(&mut snapshot.paused_count)?,
414                Some(TaskState::Yielded) => checked_increment(&mut snapshot.yielded_count)?,
415                Some(TaskState::Requeued) => checked_increment(&mut snapshot.requeued_count)?,
416                Some(TaskState::Running) => checked_increment(&mut snapshot.running_count)?,
417                Some(TaskState::Faulted) => checked_increment(&mut snapshot.faulted_count)?,
418                None => {
419                    return Err(BackendError::new(format!(
420                        "megakernel task slot has unknown state word {}. Fix: write a valid TaskState ABI word before publishing the slot.",
421                        task.state
422                    )));
423                }
424            }
425        }
426        Ok(snapshot)
427    }
428
429    /// Number of slots immediately eligible for GPU scheduling.
430    #[must_use]
431    #[cfg(any(test, feature = "legacy-infallible"))]
432    pub fn schedulable_count(&self) -> u32 {
433        match self.try_schedulable_count() {
434            Ok(value) => value,
435            Err(error) => panic!("{error}"),
436        }
437    }
438
439    /// Checked number of slots immediately eligible for GPU scheduling.
440    ///
441    /// # Errors
442    ///
443    /// Returns [`BackendError`] when the summed schedulable count exceeds the
444    /// u32 launch ABI.
445    pub fn try_schedulable_count(&self) -> Result<u32, BackendError> {
446        self.ready_count
447            .checked_add(self.yielded_count)
448            .and_then(|value| value.checked_add(self.requeued_count))
449            .ok_or_else(|| {
450                BackendError::new(
451                    "megakernel schedulable task count overflowed u32. Fix: shard the task ring before launch.",
452                )
453            })
454    }
455
456    /// Number of slots carrying continuation pressure.
457    #[must_use]
458    #[cfg(any(test, feature = "legacy-infallible"))]
459    pub fn continuation_pressure_count(&self) -> u64 {
460        match self.try_continuation_pressure_count() {
461            Ok(value) => value,
462            Err(error) => panic!("{error}"),
463        }
464    }
465
466    /// Checked number of slots carrying continuation pressure.
467    ///
468    /// # Errors
469    ///
470    /// Returns [`BackendError`] when continuation pressure exceeds u64.
471    pub fn try_continuation_pressure_count(&self) -> Result<u64, BackendError> {
472        u64::from(self.yielded_count)
473            .checked_add(u64::from(self.requeued_count))
474            .and_then(|value| value.checked_add(self.total_requeues))
475            .ok_or_else(|| {
476                BackendError::new(
477                    "megakernel continuation pressure overflowed u64. Fix: drain or shard the task ring before launch.",
478                )
479            })
480    }
481
482    /// Build a Program that runs a one-shot persistent fixpoint over
483    /// the queue snapshot's state-counter buffer, converging the
484    /// counts to a stable equilibrium representing the queue's
485    /// long-run distribution. Wires the self-substrate persistent
486    /// fixpoint builder for observability collectors that want stable
487    /// signals over transient queue jitter.
488    ///
489    /// `current_buffer` / `next_buffer` / `changed_buffer` are
490    /// caller-supplied buffer names for the persistent_fixpoint
491    /// ping-pong; `transfer_body` is the per-iteration body that
492    /// reads `current` and writes `next`. Returns a Program suitable
493    /// for one dispatch.
494    ///
495    /// P-RUNTIME-4: replaces a host-side jitter-smoothing loop with
496    /// a single GPU-side fixpoint dispatch.
497    #[must_use]
498    #[cfg(feature = "self-substrate-adapters")]
499    pub fn build_state_convergence_program(
500        transfer_body: Vec<vyre_foundation::ir::Node>,
501        current_buffer: &str,
502        next_buffer: &str,
503        changed_buffer: &str,
504        words: u32,
505        max_iterations: u32,
506    ) -> vyre_foundation::ir::Program {
507        vyre_self_substrate::persistent_fixpoint_program::persistent_fixpoint_program(
508            transfer_body,
509            current_buffer,
510            next_buffer,
511            changed_buffer,
512            words,
513            max_iterations,
514        )
515    }
516
517    /// Merge this queue telemetry into a launch request.
518    #[must_use]
519    #[cfg(any(test, feature = "legacy-infallible"))]
520    pub fn apply_to_launch_request(
521        &self,
522        mut request: MegakernelLaunchRequest,
523    ) -> MegakernelLaunchRequest {
524        request = match self.try_apply_to_launch_request(request) {
525            Ok(request) => request,
526            Err(error) => panic!("{error}"),
527        };
528        request
529    }
530
531    /// Checked merge of queue telemetry into a launch request.
532    ///
533    /// # Errors
534    ///
535    /// Returns [`BackendError`] when schedulable count or continuation pressure
536    /// cannot fit the launch request ABI.
537    pub fn try_apply_to_launch_request(
538        &self,
539        mut request: MegakernelLaunchRequest,
540    ) -> Result<MegakernelLaunchRequest, BackendError> {
541        request.queue_len = self.try_schedulable_count()?;
542        request.requeue_count = request
543            .requeue_count
544            .checked_add(self.try_continuation_pressure_count()?)
545            .ok_or_else(|| {
546                BackendError::new(
547                    "megakernel launch request requeue_count overflowed u64. Fix: drain or shard the task ring before launch.",
548                )
549            })?;
550        request.max_priority_age = request.max_priority_age.max(self.max_priority_age);
551        Ok(request)
552    }
553}
554
555fn checked_increment(counter: &mut u32) -> Result<(), BackendError> {
556    *counter = counter.checked_add(1).ok_or_else(|| {
557        BackendError::new(
558            "megakernel task queue count exceeds u32::MAX. Fix: shard the task ring before launch.",
559        )
560    })?;
561    Ok(())
562}
563
564fn checked_task_counter_increment(value: u32, label: &'static str) -> Result<u32, BackendError> {
565    value.checked_add(1).ok_or_else(|| {
566        BackendError::new(format!(
567            "megakernel task {label} overflowed u32. Fix: drain or shard the task ring before mutating continuation counters."
568        ))
569    })
570}
571
572fn invalid_task_transition(action: &'static str, state_word: u32) -> BackendError {
573    let state = TaskState::from_word(state_word)
574        .map(|state| format!("{state:?}"))
575        .unwrap_or_else(|| format!("unknown({state_word})"));
576    BackendError::new(format!(
577        "megakernel task cannot {action} from state {state}. Fix: publish only legal task lifecycle transitions before mutating the task slot."
578    ))
579}
580
581#[cfg(test)]
582mod tests;