use vyre_driver::backend::BackendError;
use super::planner::MegakernelWorkItem;
use super::policy::MegakernelLaunchRequest;
/// Number of 32-bit words occupied by one task slot in the megakernel task ring.
pub const TASK_SLOT_WORDS: usize = 16;
/// Size of one task slot in bytes (`TASK_SLOT_WORDS` words of `u32`).
pub const TASK_SLOT_BYTES: usize = core::mem::size_of::<u32>() * TASK_SLOT_WORDS;
/// `flags` bit set by `TaskWorkItem::try_paused` and cleared by `try_resumed`.
pub const TASK_FLAG_PAUSED: u32 = 0x1;
/// `flags` bit set by `TaskWorkItem::try_yielded` and cleared by `try_resumed`.
pub const TASK_FLAG_YIELDED: u32 = 0x2;
/// `flags` bit set by `TaskWorkItem::try_requeued`.
pub const TASK_FLAG_REQUEUE_REQUESTED: u32 = 0x4;
/// `flags` bit set by `TaskWorkItem::try_resumed` and cleared by `try_paused`.
pub const TASK_FLAG_RESUME_READY: u32 = 0x8;
/// Lifecycle state of a task slot, stored as a raw `u32` word in `TaskWorkItem::state`.
///
/// The discriminants are the on-slot ABI words: `word` emits them and
/// `from_word` accepts exactly these values, so they must not be renumbered.
#[repr(u32)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TaskState {
    /// Unoccupied slot; every `TaskWorkItem` transition rejects it.
    Empty = 0,
    /// Runnable; produced by `TaskWorkItem::from_work_item` and `try_resumed`.
    Ready = 1,
    /// Executing (presumably written by the device-side scheduler — confirm).
    Running = 2,
    /// Finished; produced by `TaskWorkItem::try_completed`.
    Done = 3,
    /// Parked with a continuation; produced by `TaskWorkItem::try_paused`.
    Paused = 4,
    /// Voluntarily yielded; produced by `TaskWorkItem::try_yielded`.
    Yielded = 5,
    /// Sent back to the queue; produced by `TaskWorkItem::try_requeued`.
    Requeued = 6,
    /// Failed; produced by `TaskWorkItem::try_faulted`.
    Faulted = 7,
}

impl TaskState {
    /// Decodes an ABI word, yielding `None` for any value outside `0..=7`.
    #[must_use]
    pub const fn from_word(word: u32) -> Option<Self> {
        // Indexed by ABI word; order must match the discriminants above.
        const BY_WORD: [TaskState; 8] = [
            TaskState::Empty,
            TaskState::Ready,
            TaskState::Running,
            TaskState::Done,
            TaskState::Paused,
            TaskState::Yielded,
            TaskState::Requeued,
            TaskState::Faulted,
        ];
        if word < BY_WORD.len() as u32 {
            Some(BY_WORD[word as usize])
        } else {
            None
        }
    }

    /// Encodes this state as its ABI word.
    #[must_use]
    pub const fn word(self) -> u32 {
        self as u32
    }

    /// Reports whether the scheduler may pick up a task in this state
    /// (Ready, Yielded or Requeued).
    #[must_use]
    pub const fn is_schedulable(self) -> bool {
        // Exhaustive on purpose: a new variant must be classified explicitly.
        match self {
            Self::Ready | Self::Yielded | Self::Requeued => true,
            Self::Empty | Self::Running | Self::Done | Self::Paused | Self::Faulted => false,
        }
    }
}
/// Scheduling priority of a task, stored as a raw `u32` word in `TaskWorkItem::priority`.
///
/// The discriminants are the on-slot ABI words. The derived `Ord` follows
/// declaration order (`Critical < High < Normal < Low < Idle`), and `Normal`
/// is the `Default`.
#[repr(u32)]
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, PartialOrd, Ord)]
pub enum TaskPriority {
    Critical = 0,
    High = 1,
    #[default]
    Normal = 2,
    Low = 3,
    Idle = 4,
}

impl TaskPriority {
    /// Decodes an ABI word, yielding `None` for any value outside `0..=4`.
    #[must_use]
    pub const fn from_word(word: u32) -> Option<Self> {
        // Indexed by ABI word; order must match the discriminants above.
        const BY_WORD: [TaskPriority; 5] = [
            TaskPriority::Critical,
            TaskPriority::High,
            TaskPriority::Normal,
            TaskPriority::Low,
            TaskPriority::Idle,
        ];
        if word < BY_WORD.len() as u32 {
            Some(BY_WORD[word as usize])
        } else {
            None
        }
    }

    /// Encodes this priority as its ABI word.
    #[must_use]
    pub const fn word(self) -> u32 {
        self as u32
    }
}
#[repr(C)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, bytemuck::Pod, bytemuck::Zeroable)]
pub struct TaskWorkItem {
pub state: u32,
pub op_handle: u32,
pub tenant_id: u32,
pub priority: u32,
pub input_handle: u32,
pub output_handle: u32,
pub param: u32,
pub continuation_pc: u32,
pub continuation_data: u32,
pub resume_epoch: u32,
pub task_id: u32,
pub parent_task_id: u32,
pub age_ticks: u32,
pub requeue_count: u32,
pub yield_count: u32,
pub flags: u32,
}
// The `try_*` transitions intentionally carry no `#[must_use]`: they return
// `Result`, which is already `#[must_use]` (clippy::double_must_use).
impl TaskWorkItem {
    /// Builds a fresh slot in `TaskState::Ready` from a planner work item.
    ///
    /// The op/input/output handles and `param` are copied from `item`. The
    /// continuation fields, counters, `parent_task_id` and `flags` start at zero.
    #[must_use]
    pub const fn from_work_item(
        task_id: u32,
        tenant_id: u32,
        priority: TaskPriority,
        item: MegakernelWorkItem,
    ) -> Self {
        Self {
            state: TaskState::Ready.word(),
            op_handle: item.op_handle,
            tenant_id,
            priority: priority.word(),
            input_handle: item.input_handle,
            output_handle: item.output_handle,
            param: item.param,
            continuation_pc: 0,
            continuation_data: 0,
            resume_epoch: 0,
            task_id,
            parent_task_id: 0,
            age_ticks: 0,
            requeue_count: 0,
            yield_count: 0,
            flags: 0,
        }
    }

    /// Reconstructs the planner-level work item carried by this slot.
    #[must_use]
    pub const fn work_item(&self) -> MegakernelWorkItem {
        MegakernelWorkItem {
            op_handle: self.op_handle,
            input_handle: self.input_handle,
            output_handle: self.output_handle,
            param: self.param,
        }
    }

    /// Decodes the raw `state` word, returning `None` for an unknown ABI word.
    #[must_use]
    pub const fn task_state(&self) -> Option<TaskState> {
        TaskState::from_word(self.state)
    }

    /// Decodes the raw `priority` word, returning `None` for an unknown ABI word.
    #[must_use]
    pub const fn task_priority(&self) -> Option<TaskPriority> {
        TaskPriority::from_word(self.priority)
    }

    /// Returns `true` when the slot is Ready, Yielded or Requeued.
    ///
    /// A slot whose state word does not decode is never schedulable.
    #[must_use]
    pub const fn is_schedulable(&self) -> bool {
        match self.task_state() {
            Some(state) => state.is_schedulable(),
            None => false,
        }
    }

    /// Parks the task, recording its continuation and resume epoch.
    ///
    /// Sets the state to Paused, sets `TASK_FLAG_PAUSED` and clears
    /// `TASK_FLAG_RESUME_READY`.
    ///
    /// # Errors
    ///
    /// Returns a [`BackendError`] if the slot is Empty, Done, Faulted or holds an
    /// unknown state word.
    pub fn try_paused(
        mut self,
        continuation_pc: u32,
        continuation_data: u32,
        resume_epoch: u32,
    ) -> Result<Self, BackendError> {
        self.ensure_transitionable("pause")?;
        self.state = TaskState::Paused.word();
        self.continuation_pc = continuation_pc;
        self.continuation_data = continuation_data;
        self.resume_epoch = resume_epoch;
        self.flags = (self.flags | TASK_FLAG_PAUSED) & !TASK_FLAG_RESUME_READY;
        Ok(self)
    }

    /// Panicking form of `try_paused`, only compiled for tests or the
    /// `legacy-infallible` feature.
    ///
    /// # Panics
    ///
    /// Panics with the error message when `try_paused` fails.
    #[must_use]
    #[cfg(any(test, feature = "legacy-infallible"))]
    pub fn paused(self, continuation_pc: u32, continuation_data: u32, resume_epoch: u32) -> Self {
        self.try_paused(continuation_pc, continuation_data, resume_epoch)
            .unwrap_or_else(|error| panic!("{error}"))
    }

    /// Moves a Paused task back to Ready.
    ///
    /// Sets `TASK_FLAG_RESUME_READY` and clears `TASK_FLAG_PAUSED` and
    /// `TASK_FLAG_YIELDED`. Continuation fields are left in place.
    ///
    /// # Errors
    ///
    /// Returns a [`BackendError`] unless the slot is currently Paused.
    pub fn try_resumed(mut self) -> Result<Self, BackendError> {
        if self.task_state() != Some(TaskState::Paused) {
            return Err(invalid_task_transition("resume", self.state));
        }
        self.state = TaskState::Ready.word();
        self.flags =
            (self.flags | TASK_FLAG_RESUME_READY) & !(TASK_FLAG_PAUSED | TASK_FLAG_YIELDED);
        Ok(self)
    }

    /// Panicking form of `try_resumed`, only compiled for tests or the
    /// `legacy-infallible` feature.
    ///
    /// # Panics
    ///
    /// Panics with the error message when `try_resumed` fails.
    #[must_use]
    #[cfg(any(test, feature = "legacy-infallible"))]
    pub fn resumed(self) -> Self {
        self.try_resumed().unwrap_or_else(|error| panic!("{error}"))
    }

    /// Marks the task Yielded, recording its continuation.
    ///
    /// Increments `yield_count` and sets `TASK_FLAG_YIELDED`.
    ///
    /// # Errors
    ///
    /// Returns a [`BackendError`] if the slot cannot transition (Empty, Done,
    /// Faulted, unknown word) or `yield_count` would overflow.
    pub fn try_yielded(
        mut self,
        continuation_pc: u32,
        continuation_data: u32,
    ) -> Result<Self, BackendError> {
        self.ensure_transitionable("yield")?;
        self.state = TaskState::Yielded.word();
        self.continuation_pc = continuation_pc;
        self.continuation_data = continuation_data;
        self.yield_count = checked_task_counter_increment(self.yield_count, "yield_count")?;
        self.flags |= TASK_FLAG_YIELDED;
        Ok(self)
    }

    /// Panicking form of `try_yielded`, only compiled for tests or the
    /// `legacy-infallible` feature.
    ///
    /// # Panics
    ///
    /// Panics with the error message when `try_yielded` fails.
    #[must_use]
    #[cfg(any(test, feature = "legacy-infallible"))]
    pub fn yielded(self, continuation_pc: u32, continuation_data: u32) -> Self {
        self.try_yielded(continuation_pc, continuation_data)
            .unwrap_or_else(|error| panic!("{error}"))
    }

    /// Marks the task Requeued at `priority`, recording its continuation.
    ///
    /// Increments `requeue_count` and `age_ticks` and sets
    /// `TASK_FLAG_REQUEUE_REQUESTED`.
    ///
    /// # Errors
    ///
    /// Returns a [`BackendError`] if the slot cannot transition, or if
    /// `requeue_count` or `age_ticks` would overflow. `requeue_count` is
    /// checked first.
    pub fn try_requeued(
        mut self,
        continuation_pc: u32,
        continuation_data: u32,
        priority: TaskPriority,
    ) -> Result<Self, BackendError> {
        self.ensure_transitionable("requeue")?;
        self.state = TaskState::Requeued.word();
        self.priority = priority.word();
        self.continuation_pc = continuation_pc;
        self.continuation_data = continuation_data;
        self.requeue_count = checked_task_counter_increment(self.requeue_count, "requeue_count")?;
        self.age_ticks = checked_task_counter_increment(self.age_ticks, "age_ticks")?;
        self.flags |= TASK_FLAG_REQUEUE_REQUESTED;
        Ok(self)
    }

    /// Panicking form of `try_requeued`, only compiled for tests or the
    /// `legacy-infallible` feature.
    ///
    /// # Panics
    ///
    /// Panics with the error message when `try_requeued` fails.
    #[must_use]
    #[cfg(any(test, feature = "legacy-infallible"))]
    pub fn requeued(
        self,
        continuation_pc: u32,
        continuation_data: u32,
        priority: TaskPriority,
    ) -> Self {
        self.try_requeued(continuation_pc, continuation_data, priority)
            .unwrap_or_else(|error| panic!("{error}"))
    }

    /// Marks the task Done and clears every flag.
    ///
    /// Continuation fields and counters are left as they were.
    ///
    /// # Errors
    ///
    /// Returns a [`BackendError`] if the slot is Empty, Done, Faulted or holds an
    /// unknown state word.
    pub fn try_completed(mut self) -> Result<Self, BackendError> {
        self.ensure_transitionable("complete")?;
        self.state = TaskState::Done.word();
        self.flags = 0;
        Ok(self)
    }

    /// Panicking form of `try_completed`, only compiled for tests or the
    /// `legacy-infallible` feature.
    ///
    /// # Panics
    ///
    /// Panics with the error message when `try_completed` fails.
    #[must_use]
    #[cfg(any(test, feature = "legacy-infallible"))]
    pub fn completed(self) -> Self {
        self.try_completed()
            .unwrap_or_else(|error| panic!("{error}"))
    }

    /// Marks the task Faulted and stores `fault_code` in `continuation_data`.
    ///
    /// All other fields, including `flags`, are left unchanged.
    ///
    /// # Errors
    ///
    /// Returns a [`BackendError`] if the slot is Empty, Done, Faulted or holds an
    /// unknown state word.
    pub fn try_faulted(mut self, fault_code: u32) -> Result<Self, BackendError> {
        self.ensure_transitionable("fault")?;
        self.state = TaskState::Faulted.word();
        self.continuation_data = fault_code;
        Ok(self)
    }

    /// Panicking form of `try_faulted`, only compiled for tests or the
    /// `legacy-infallible` feature.
    ///
    /// # Panics
    ///
    /// Panics with the error message when `try_faulted` fails.
    #[must_use]
    #[cfg(any(test, feature = "legacy-infallible"))]
    pub fn faulted(self, fault_code: u32) -> Self {
        self.try_faulted(fault_code)
            .unwrap_or_else(|error| panic!("{error}"))
    }

    /// Shared guard for every transition except resume.
    ///
    /// Rejects Empty, Done, Faulted and undecodable state words, and accepts
    /// all other states.
    fn ensure_transitionable(&self, action: &'static str) -> Result<(), BackendError> {
        // Exhaustive so a new TaskState variant must be classified here.
        match self.task_state() {
            Some(TaskState::Empty | TaskState::Done | TaskState::Faulted) | None => {
                Err(invalid_task_transition(action, self.state))
            }
            Some(
                TaskState::Ready
                | TaskState::Running
                | TaskState::Paused
                | TaskState::Yielded
                | TaskState::Requeued,
            ) => Ok(()),
        }
    }
}
/// Aggregate view of a task ring, built by `TaskQueueSnapshot::from_tasks`.
///
/// Empty and Done slots are not counted in any `*_count` field. Every slot,
/// whatever its state, still contributes to `total_requeues` and
/// `max_priority_age`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct TaskQueueSnapshot {
/// Slots in `TaskState::Ready`.
pub ready_count: u32,
/// Slots in `TaskState::Paused`.
pub paused_count: u32,
/// Slots in `TaskState::Yielded`.
pub yielded_count: u32,
/// Slots in `TaskState::Requeued`.
pub requeued_count: u32,
/// Slots in `TaskState::Running`.
pub running_count: u32,
/// Slots in `TaskState::Faulted`.
pub faulted_count: u32,
/// Sum of `requeue_count` over every slot, widened to `u64`.
pub total_requeues: u64,
/// Largest `age_ticks` seen in any slot.
pub max_priority_age: u32,
}
impl TaskQueueSnapshot {
    /// Tallies a task ring into a snapshot.
    ///
    /// Every slot feeds `max_priority_age` (via `age_ticks`) and
    /// `total_requeues` (via `requeue_count`), whatever its state. Ready,
    /// Paused, Yielded, Requeued, Running and Faulted slots bump their
    /// per-state counter. Empty and Done slots are not counted.
    ///
    /// # Errors
    ///
    /// Returns a [`BackendError`] if `total_requeues` overflows `u64`, if a
    /// per-state counter overflows `u32`, or if a slot holds an unknown state word.
    // NOTE(review): Empty/Done slots still contribute to `max_priority_age` and
    // `total_requeues`; confirm stale completed slots are meant to count.
    pub fn from_tasks(tasks: &[TaskWorkItem]) -> Result<Self, BackendError> {
        let mut snapshot = Self::default();
        for task in tasks {
            snapshot.max_priority_age = snapshot.max_priority_age.max(task.age_ticks);
            snapshot.total_requeues = snapshot
                .total_requeues
                .checked_add(u64::from(task.requeue_count))
                .ok_or_else(|| {
                    BackendError::new(
                        "megakernel task total_requeues overflowed u64. Fix: drain or shard the task ring before launch.",
                    )
                })?;
            match task.task_state() {
                Some(TaskState::Empty | TaskState::Done) => {}
                Some(TaskState::Ready) => checked_increment(&mut snapshot.ready_count)?,
                Some(TaskState::Paused) => checked_increment(&mut snapshot.paused_count)?,
                Some(TaskState::Yielded) => checked_increment(&mut snapshot.yielded_count)?,
                Some(TaskState::Requeued) => checked_increment(&mut snapshot.requeued_count)?,
                Some(TaskState::Running) => checked_increment(&mut snapshot.running_count)?,
                Some(TaskState::Faulted) => checked_increment(&mut snapshot.faulted_count)?,
                None => {
                    return Err(BackendError::new(format!(
                        "megakernel task slot has unknown state word {}. Fix: write a valid TaskState ABI word before publishing the slot.",
                        task.state
                    )));
                }
            }
        }
        Ok(snapshot)
    }

    /// Panicking form of `try_schedulable_count`, only compiled for tests or the
    /// `legacy-infallible` feature.
    ///
    /// # Panics
    ///
    /// Panics with the error message when the count overflows `u32`.
    #[must_use]
    #[cfg(any(test, feature = "legacy-infallible"))]
    pub fn schedulable_count(&self) -> u32 {
        self.try_schedulable_count()
            .unwrap_or_else(|error| panic!("{error}"))
    }

    /// Number of slots the scheduler may pick up: Ready + Yielded + Requeued.
    ///
    /// # Errors
    ///
    /// Returns a [`BackendError`] if the sum overflows `u32`.
    pub fn try_schedulable_count(&self) -> Result<u32, BackendError> {
        self.ready_count
            .checked_add(self.yielded_count)
            .and_then(|value| value.checked_add(self.requeued_count))
            .ok_or_else(|| {
                BackendError::new(
                    "megakernel schedulable task count overflowed u32. Fix: shard the task ring before launch.",
                )
            })
    }

    /// Panicking form of `try_continuation_pressure_count`, only compiled for
    /// tests or the `legacy-infallible` feature.
    ///
    /// # Panics
    ///
    /// Panics with the error message when the sum overflows `u64`.
    #[must_use]
    #[cfg(any(test, feature = "legacy-infallible"))]
    pub fn continuation_pressure_count(&self) -> u64 {
        self.try_continuation_pressure_count()
            .unwrap_or_else(|error| panic!("{error}"))
    }

    /// Continuation pressure: `yielded_count + requeued_count + total_requeues`,
    /// computed in `u64`.
    ///
    /// # Errors
    ///
    /// Returns a [`BackendError`] if the sum overflows `u64`.
    pub fn try_continuation_pressure_count(&self) -> Result<u64, BackendError> {
        u64::from(self.yielded_count)
            .checked_add(u64::from(self.requeued_count))
            .and_then(|value| value.checked_add(self.total_requeues))
            .ok_or_else(|| {
                BackendError::new(
                    "megakernel continuation pressure overflowed u64. Fix: drain or shard the task ring before launch.",
                )
            })
    }

    /// Forwards its arguments unchanged to
    /// `vyre_self_substrate::persistent_fixpoint_program::persistent_fixpoint_program`.
    ///
    /// This is an associated function and reads no snapshot state.
    #[must_use]
    #[cfg(feature = "self-substrate-adapters")]
    pub fn build_state_convergence_program(
        transfer_body: Vec<vyre_foundation::ir::Node>,
        current_buffer: &str,
        next_buffer: &str,
        changed_buffer: &str,
        words: u32,
        max_iterations: u32,
    ) -> vyre_foundation::ir::Program {
        vyre_self_substrate::persistent_fixpoint_program::persistent_fixpoint_program(
            transfer_body,
            current_buffer,
            next_buffer,
            changed_buffer,
            words,
            max_iterations,
        )
    }

    /// Panicking form of `try_apply_to_launch_request`, only compiled for tests
    /// or the `legacy-infallible` feature.
    ///
    /// # Panics
    ///
    /// Panics with the error message when `try_apply_to_launch_request` fails.
    #[must_use]
    #[cfg(any(test, feature = "legacy-infallible"))]
    pub fn apply_to_launch_request(
        &self,
        request: MegakernelLaunchRequest,
    ) -> MegakernelLaunchRequest {
        self.try_apply_to_launch_request(request)
            .unwrap_or_else(|error| panic!("{error}"))
    }

    /// Folds this snapshot into a launch request.
    ///
    /// Overwrites `queue_len` with the schedulable count, adds the continuation
    /// pressure to `requeue_count`, and raises `max_priority_age` to at least
    /// this snapshot's value.
    ///
    /// # Errors
    ///
    /// Returns a [`BackendError`] if any of the counts overflow.
    pub fn try_apply_to_launch_request(
        &self,
        mut request: MegakernelLaunchRequest,
    ) -> Result<MegakernelLaunchRequest, BackendError> {
        request.queue_len = self.try_schedulable_count()?;
        request.requeue_count = request
            .requeue_count
            .checked_add(self.try_continuation_pressure_count()?)
            .ok_or_else(|| {
                BackendError::new(
                    "megakernel launch request requeue_count overflowed u64. Fix: drain or shard the task ring before launch.",
                )
            })?;
        request.max_priority_age = request.max_priority_age.max(self.max_priority_age);
        Ok(request)
    }
}
/// Bumps a snapshot counter by one in place.
///
/// On overflow the counter is left untouched and an actionable error is returned.
fn checked_increment(counter: &mut u32) -> Result<(), BackendError> {
    match counter.checked_add(1) {
        Some(next) => {
            *counter = next;
            Ok(())
        }
        None => Err(BackendError::new(
            "megakernel task queue count exceeds u32::MAX. Fix: shard the task ring before launch.",
        )),
    }
}
/// Returns `value + 1`, or an error naming the overflowing per-task counter `label`.
fn checked_task_counter_increment(value: u32, label: &'static str) -> Result<u32, BackendError> {
    match value.checked_add(1) {
        Some(next) => Ok(next),
        None => Err(BackendError::new(format!(
            "megakernel task {label} overflowed u32. Fix: drain or shard the task ring before mutating continuation counters."
        ))),
    }
}
/// Builds the error for an illegal lifecycle transition `action` attempted
/// from raw state word `state_word`.
///
/// Known states are rendered with their `Debug` name and unknown words as
/// `unknown(<word>)`.
fn invalid_task_transition(action: &'static str, state_word: u32) -> BackendError {
    let state = match TaskState::from_word(state_word) {
        Some(known) => format!("{known:?}"),
        None => format!("unknown({state_word})"),
    };
    BackendError::new(format!(
        "megakernel task cannot {action} from state {state}. Fix: publish only legal task lifecycle transitions before mutating the task slot."
    ))
}
#[cfg(test)]
mod tests;