1use vyre_driver::backend::BackendError;
4
5use super::planner::MegakernelWorkItem;
6use super::policy::MegakernelLaunchRequest;
7
8pub const TASK_SLOT_WORDS: usize = 16;
10
11pub const TASK_SLOT_BYTES: usize = TASK_SLOT_WORDS * core::mem::size_of::<u32>();
13
14pub const TASK_FLAG_PAUSED: u32 = 1 << 0;
16
17pub const TASK_FLAG_YIELDED: u32 = 1 << 1;
19
20pub const TASK_FLAG_REQUEUE_REQUESTED: u32 = 1 << 2;
22
23pub const TASK_FLAG_RESUME_READY: u32 = 1 << 3;
25
26#[repr(u32)]
28#[derive(Debug, Clone, Copy, PartialEq, Eq)]
29pub enum TaskState {
30 Empty = 0,
32 Ready = 1,
34 Running = 2,
36 Done = 3,
38 Paused = 4,
40 Yielded = 5,
42 Requeued = 6,
44 Faulted = 7,
46}
47
48impl TaskState {
49 #[must_use]
51 pub const fn from_word(word: u32) -> Option<Self> {
52 match word {
53 0 => Some(Self::Empty),
54 1 => Some(Self::Ready),
55 2 => Some(Self::Running),
56 3 => Some(Self::Done),
57 4 => Some(Self::Paused),
58 5 => Some(Self::Yielded),
59 6 => Some(Self::Requeued),
60 7 => Some(Self::Faulted),
61 _ => None,
62 }
63 }
64
65 #[must_use]
67 pub const fn word(self) -> u32 {
68 self as u32
69 }
70
71 #[must_use]
73 pub const fn is_schedulable(self) -> bool {
74 matches!(self, Self::Ready | Self::Yielded | Self::Requeued)
75 }
76}
77
78#[repr(u32)]
80#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, PartialOrd, Ord)]
81pub enum TaskPriority {
82 Critical = 0,
84 High = 1,
86 #[default]
88 Normal = 2,
89 Low = 3,
91 Idle = 4,
93}
94
95impl TaskPriority {
96 #[must_use]
98 pub const fn from_word(word: u32) -> Option<Self> {
99 match word {
100 0 => Some(Self::Critical),
101 1 => Some(Self::High),
102 2 => Some(Self::Normal),
103 3 => Some(Self::Low),
104 4 => Some(Self::Idle),
105 _ => None,
106 }
107 }
108
109 #[must_use]
111 pub const fn word(self) -> u32 {
112 self as u32
113 }
114}
115
116#[repr(C)]
123#[derive(Debug, Clone, Copy, PartialEq, Eq, bytemuck::Pod, bytemuck::Zeroable)]
124pub struct TaskWorkItem {
125 pub state: u32,
127 pub op_handle: u32,
129 pub tenant_id: u32,
131 pub priority: u32,
133 pub input_handle: u32,
135 pub output_handle: u32,
137 pub param: u32,
139 pub continuation_pc: u32,
141 pub continuation_data: u32,
143 pub resume_epoch: u32,
145 pub task_id: u32,
147 pub parent_task_id: u32,
149 pub age_ticks: u32,
151 pub requeue_count: u32,
153 pub yield_count: u32,
155 pub flags: u32,
157}
158
159impl TaskWorkItem {
160 #[must_use]
162 pub const fn from_work_item(
163 task_id: u32,
164 tenant_id: u32,
165 priority: TaskPriority,
166 item: MegakernelWorkItem,
167 ) -> Self {
168 Self {
169 state: TaskState::Ready.word(),
170 op_handle: item.op_handle,
171 tenant_id,
172 priority: priority.word(),
173 input_handle: item.input_handle,
174 output_handle: item.output_handle,
175 param: item.param,
176 continuation_pc: 0,
177 continuation_data: 0,
178 resume_epoch: 0,
179 task_id,
180 parent_task_id: 0,
181 age_ticks: 0,
182 requeue_count: 0,
183 yield_count: 0,
184 flags: 0,
185 }
186 }
187
188 #[must_use]
190 pub const fn work_item(&self) -> MegakernelWorkItem {
191 MegakernelWorkItem {
192 op_handle: self.op_handle,
193 input_handle: self.input_handle,
194 output_handle: self.output_handle,
195 param: self.param,
196 }
197 }
198
199 #[must_use]
201 pub const fn task_state(&self) -> Option<TaskState> {
202 TaskState::from_word(self.state)
203 }
204
205 #[must_use]
207 pub const fn task_priority(&self) -> Option<TaskPriority> {
208 TaskPriority::from_word(self.priority)
209 }
210
211 #[must_use]
213 pub const fn is_schedulable(&self) -> bool {
214 match self.task_state() {
215 Some(state) => state.is_schedulable(),
216 None => false,
217 }
218 }
219
220 #[must_use]
222 pub const fn paused(
223 mut self,
224 continuation_pc: u32,
225 continuation_data: u32,
226 resume_epoch: u32,
227 ) -> Self {
228 self.state = TaskState::Paused.word();
229 self.continuation_pc = continuation_pc;
230 self.continuation_data = continuation_data;
231 self.resume_epoch = resume_epoch;
232 self.flags = (self.flags | TASK_FLAG_PAUSED) & !TASK_FLAG_RESUME_READY;
233 self
234 }
235
236 #[must_use]
238 pub const fn resumed(mut self) -> Self {
239 self.state = TaskState::Ready.word();
240 self.flags =
241 (self.flags | TASK_FLAG_RESUME_READY) & !(TASK_FLAG_PAUSED | TASK_FLAG_YIELDED);
242 self
243 }
244
245 #[must_use]
247 pub const fn yielded(mut self, continuation_pc: u32, continuation_data: u32) -> Self {
248 self.state = TaskState::Yielded.word();
249 self.continuation_pc = continuation_pc;
250 self.continuation_data = continuation_data;
251 self.yield_count = match self.yield_count.checked_add(1) {
252 Some(value) => value,
253 None => panic!("megakernel task yield_count overflowed u32"),
254 };
255 self.flags |= TASK_FLAG_YIELDED;
256 self
257 }
258
259 #[must_use]
261 pub const fn requeued(
262 mut self,
263 continuation_pc: u32,
264 continuation_data: u32,
265 priority: TaskPriority,
266 ) -> Self {
267 self.state = TaskState::Requeued.word();
268 self.priority = priority.word();
269 self.continuation_pc = continuation_pc;
270 self.continuation_data = continuation_data;
271 self.requeue_count = match self.requeue_count.checked_add(1) {
272 Some(value) => value,
273 None => panic!("megakernel task requeue_count overflowed u32"),
274 };
275 self.age_ticks = match self.age_ticks.checked_add(1) {
276 Some(value) => value,
277 None => panic!("megakernel task age_ticks overflowed u32"),
278 };
279 self.flags |= TASK_FLAG_REQUEUE_REQUESTED;
280 self
281 }
282
283 #[must_use]
285 pub const fn completed(mut self) -> Self {
286 self.state = TaskState::Done.word();
287 self.flags = 0;
288 self
289 }
290
291 #[must_use]
293 pub const fn faulted(mut self, fault_code: u32) -> Self {
294 self.state = TaskState::Faulted.word();
295 self.continuation_data = fault_code;
296 self
297 }
298}
299
300#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
302pub struct TaskQueueSnapshot {
303 pub ready_count: u32,
305 pub paused_count: u32,
307 pub yielded_count: u32,
309 pub requeued_count: u32,
311 pub running_count: u32,
313 pub faulted_count: u32,
315 pub total_requeues: u64,
317 pub max_priority_age: u32,
319}
320
321impl TaskQueueSnapshot {
322 pub fn from_tasks(tasks: &[TaskWorkItem]) -> Result<Self, BackendError> {
329 let mut snapshot = Self::default();
330 for task in tasks {
331 snapshot.max_priority_age = snapshot.max_priority_age.max(task.age_ticks);
332 snapshot.total_requeues = snapshot
333 .total_requeues
334 .checked_add(u64::from(task.requeue_count))
335 .ok_or_else(|| {
336 BackendError::new(
337 "megakernel task total_requeues overflowed u64. Fix: drain or shard the task ring before launch.",
338 )
339 })?;
340 match task.task_state() {
341 Some(TaskState::Empty | TaskState::Done) => {}
342 Some(TaskState::Ready) => checked_increment(&mut snapshot.ready_count)?,
343 Some(TaskState::Paused) => checked_increment(&mut snapshot.paused_count)?,
344 Some(TaskState::Yielded) => checked_increment(&mut snapshot.yielded_count)?,
345 Some(TaskState::Requeued) => checked_increment(&mut snapshot.requeued_count)?,
346 Some(TaskState::Running) => checked_increment(&mut snapshot.running_count)?,
347 Some(TaskState::Faulted) => checked_increment(&mut snapshot.faulted_count)?,
348 None => {
349 return Err(BackendError::new(format!(
350 "megakernel task slot has unknown state word {}. Fix: write a valid TaskState ABI word before publishing the slot.",
351 task.state
352 )));
353 }
354 }
355 }
356 Ok(snapshot)
357 }
358
359 #[must_use]
361 pub fn schedulable_count(&self) -> u32 {
362 match self.try_schedulable_count() {
363 Ok(value) => value,
364 Err(error) => panic!("{error}"),
365 }
366 }
367
368 pub fn try_schedulable_count(&self) -> Result<u32, BackendError> {
375 self.ready_count
376 .checked_add(self.yielded_count)
377 .and_then(|value| value.checked_add(self.requeued_count))
378 .ok_or_else(|| {
379 BackendError::new(
380 "megakernel schedulable task count overflowed u32. Fix: shard the task ring before launch.",
381 )
382 })
383 }
384
385 #[must_use]
387 pub fn continuation_pressure_count(&self) -> u64 {
388 match self.try_continuation_pressure_count() {
389 Ok(value) => value,
390 Err(error) => panic!("{error}"),
391 }
392 }
393
394 pub fn try_continuation_pressure_count(&self) -> Result<u64, BackendError> {
400 u64::from(self.yielded_count)
401 .checked_add(u64::from(self.requeued_count))
402 .and_then(|value| value.checked_add(self.total_requeues))
403 .ok_or_else(|| {
404 BackendError::new(
405 "megakernel continuation pressure overflowed u64. Fix: drain or shard the task ring before launch.",
406 )
407 })
408 }
409
410 #[must_use]
426 #[cfg(feature = "self-substrate-adapters")]
427 pub fn build_state_convergence_program(
428 transfer_body: Vec<vyre_foundation::ir::Node>,
429 current_buffer: &str,
430 next_buffer: &str,
431 changed_buffer: &str,
432 words: u32,
433 max_iterations: u32,
434 ) -> vyre_foundation::ir::Program {
435 vyre_self_substrate::persistent_fixpoint_program::persistent_fixpoint_program(
436 transfer_body,
437 current_buffer,
438 next_buffer,
439 changed_buffer,
440 words,
441 max_iterations,
442 )
443 }
444
445 #[must_use]
447 pub fn apply_to_launch_request(
448 &self,
449 mut request: MegakernelLaunchRequest,
450 ) -> MegakernelLaunchRequest {
451 request = match self.try_apply_to_launch_request(request) {
452 Ok(request) => request,
453 Err(error) => panic!("{error}"),
454 };
455 request
456 }
457
458 pub fn try_apply_to_launch_request(
465 &self,
466 mut request: MegakernelLaunchRequest,
467 ) -> Result<MegakernelLaunchRequest, BackendError> {
468 request.queue_len = self.try_schedulable_count()?;
469 request.requeue_count = request
470 .requeue_count
471 .checked_add(self.try_continuation_pressure_count()?)
472 .ok_or_else(|| {
473 BackendError::new(
474 "megakernel launch request requeue_count overflowed u64. Fix: drain or shard the task ring before launch.",
475 )
476 })?;
477 request.max_priority_age = request.max_priority_age.max(self.max_priority_age);
478 Ok(request)
479 }
480}
481
482fn checked_increment(counter: &mut u32) -> Result<(), BackendError> {
483 *counter = counter.checked_add(1).ok_or_else(|| {
484 BackendError::new(
485 "megakernel task queue count exceeds u32::MAX. Fix: shard the task ring before launch.",
486 )
487 })?;
488 Ok(())
489}
490
491#[cfg(test)]
492mod tests;