1use vyre_driver::backend::BackendError;
4
5use super::planner::MegakernelWorkItem;
6use super::policy::MegakernelLaunchRequest;
7
/// Number of 32-bit words that make up one task slot.
pub const TASK_SLOT_WORDS: usize = 16;

/// Size of one task slot in bytes, i.e. `TASK_SLOT_WORDS` packed `u32` words.
pub const TASK_SLOT_BYTES: usize = core::mem::size_of::<[u32; TASK_SLOT_WORDS]>();

/// Flag bit 0: set by `TaskWorkItem::try_paused`, cleared by `TaskWorkItem::try_resumed`.
pub const TASK_FLAG_PAUSED: u32 = 0b0001;

/// Flag bit 1: set by `TaskWorkItem::try_yielded`, cleared by `TaskWorkItem::try_resumed`.
pub const TASK_FLAG_YIELDED: u32 = 0b0010;

/// Flag bit 2: set by `TaskWorkItem::try_requeued`.
pub const TASK_FLAG_REQUEUE_REQUESTED: u32 = 0b0100;

/// Flag bit 3: set by `TaskWorkItem::try_resumed`, cleared by `TaskWorkItem::try_paused`.
pub const TASK_FLAG_RESUME_READY: u32 = 0b1000;
25
/// Lifecycle state of a task, stored as a single `u32` ABI word.
///
/// The discriminants are the encoded values: [`TaskState::word`] produces
/// them and [`TaskState::from_word`] parses them back.
#[repr(u32)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TaskState {
    /// ABI word `0`. Not schedulable.
    Empty = 0,
    /// ABI word `1`. Schedulable.
    Ready = 1,
    /// ABI word `2`. Not schedulable.
    Running = 2,
    /// ABI word `3`. Not schedulable.
    Done = 3,
    /// ABI word `4`. Not schedulable.
    Paused = 4,
    /// ABI word `5`. Schedulable.
    Yielded = 5,
    /// ABI word `6`. Schedulable.
    Requeued = 6,
    /// ABI word `7`. Not schedulable.
    Faulted = 7,
}

impl TaskState {
    /// Decodes a raw ABI word, returning `None` for anything outside `0..=7`.
    #[must_use]
    pub const fn from_word(word: u32) -> Option<Self> {
        let decoded = match word {
            0 => Self::Empty,
            1 => Self::Ready,
            2 => Self::Running,
            3 => Self::Done,
            4 => Self::Paused,
            5 => Self::Yielded,
            6 => Self::Requeued,
            7 => Self::Faulted,
            _ => return None,
        };
        Some(decoded)
    }

    /// Encodes this state as its `u32` ABI word (the enum discriminant).
    #[must_use]
    pub const fn word(self) -> u32 {
        self as u32
    }

    /// Returns `true` for `Ready`, `Yielded` and `Requeued`, `false` otherwise.
    #[must_use]
    pub const fn is_schedulable(self) -> bool {
        // Spelled out exhaustively so a new variant forces a decision here.
        match self {
            Self::Ready | Self::Yielded | Self::Requeued => true,
            Self::Empty | Self::Running | Self::Done | Self::Paused | Self::Faulted => false,
        }
    }
}
77
/// Scheduling priority of a task, stored as a single `u32` ABI word.
///
/// The derived ordering follows the discriminants, so `Critical` compares
/// lowest and `Idle` compares highest. The default is `Normal`.
#[repr(u32)]
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, PartialOrd, Ord)]
pub enum TaskPriority {
    /// ABI word `0`.
    Critical = 0,
    /// ABI word `1`.
    High = 1,
    /// ABI word `2`; the `Default` value.
    #[default]
    Normal = 2,
    /// ABI word `3`.
    Low = 3,
    /// ABI word `4`.
    Idle = 4,
}

impl TaskPriority {
    /// Decodes a raw ABI word, returning `None` for anything outside `0..=4`.
    #[must_use]
    pub const fn from_word(word: u32) -> Option<Self> {
        let decoded = match word {
            0 => Self::Critical,
            1 => Self::High,
            2 => Self::Normal,
            3 => Self::Low,
            4 => Self::Idle,
            _ => return None,
        };
        Some(decoded)
    }

    /// Encodes this priority as its `u32` ABI word (the enum discriminant).
    #[must_use]
    pub const fn word(self) -> u32 {
        self as u32
    }
}
115
116#[repr(C)]
123#[derive(Debug, Clone, Copy, PartialEq, Eq, bytemuck::Pod, bytemuck::Zeroable)]
124pub struct TaskWorkItem {
125 pub state: u32,
127 pub op_handle: u32,
129 pub tenant_id: u32,
131 pub priority: u32,
133 pub input_handle: u32,
135 pub output_handle: u32,
137 pub param: u32,
139 pub continuation_pc: u32,
141 pub continuation_data: u32,
143 pub resume_epoch: u32,
145 pub task_id: u32,
147 pub parent_task_id: u32,
149 pub age_ticks: u32,
151 pub requeue_count: u32,
153 pub yield_count: u32,
155 pub flags: u32,
157}
158
159impl TaskWorkItem {
160 #[must_use]
162 pub const fn from_work_item(
163 task_id: u32,
164 tenant_id: u32,
165 priority: TaskPriority,
166 item: MegakernelWorkItem,
167 ) -> Self {
168 Self {
169 state: TaskState::Ready.word(),
170 op_handle: item.op_handle,
171 tenant_id,
172 priority: priority.word(),
173 input_handle: item.input_handle,
174 output_handle: item.output_handle,
175 param: item.param,
176 continuation_pc: 0,
177 continuation_data: 0,
178 resume_epoch: 0,
179 task_id,
180 parent_task_id: 0,
181 age_ticks: 0,
182 requeue_count: 0,
183 yield_count: 0,
184 flags: 0,
185 }
186 }
187
188 #[must_use]
190 pub const fn work_item(&self) -> MegakernelWorkItem {
191 MegakernelWorkItem {
192 op_handle: self.op_handle,
193 input_handle: self.input_handle,
194 output_handle: self.output_handle,
195 param: self.param,
196 }
197 }
198
199 #[must_use]
201 pub const fn task_state(&self) -> Option<TaskState> {
202 TaskState::from_word(self.state)
203 }
204
205 #[must_use]
207 pub const fn task_priority(&self) -> Option<TaskPriority> {
208 TaskPriority::from_word(self.priority)
209 }
210
211 #[must_use]
213 pub const fn is_schedulable(&self) -> bool {
214 match self.task_state() {
215 Some(state) => state.is_schedulable(),
216 None => false,
217 }
218 }
219
220 #[must_use]
222 pub fn try_paused(
223 mut self,
224 continuation_pc: u32,
225 continuation_data: u32,
226 resume_epoch: u32,
227 ) -> Result<Self, BackendError> {
228 self.ensure_transitionable("pause")?;
229 self.state = TaskState::Paused.word();
230 self.continuation_pc = continuation_pc;
231 self.continuation_data = continuation_data;
232 self.resume_epoch = resume_epoch;
233 self.flags = (self.flags | TASK_FLAG_PAUSED) & !TASK_FLAG_RESUME_READY;
234 Ok(self)
235 }
236
237 #[must_use]
239 #[cfg(any(test, feature = "legacy-infallible"))]
240 pub fn paused(self, continuation_pc: u32, continuation_data: u32, resume_epoch: u32) -> Self {
241 self.try_paused(continuation_pc, continuation_data, resume_epoch)
242 .unwrap_or_else(|error| panic!("{error}"))
243 }
244
245 #[must_use]
247 pub fn try_resumed(mut self) -> Result<Self, BackendError> {
248 if self.task_state() != Some(TaskState::Paused) {
249 return Err(invalid_task_transition("resume", self.state));
250 }
251 self.state = TaskState::Ready.word();
252 self.flags =
253 (self.flags | TASK_FLAG_RESUME_READY) & !(TASK_FLAG_PAUSED | TASK_FLAG_YIELDED);
254 Ok(self)
255 }
256
257 #[must_use]
259 #[cfg(any(test, feature = "legacy-infallible"))]
260 pub fn resumed(self) -> Self {
261 self.try_resumed().unwrap_or_else(|error| panic!("{error}"))
262 }
263
264 #[must_use]
266 pub fn try_yielded(
267 mut self,
268 continuation_pc: u32,
269 continuation_data: u32,
270 ) -> Result<Self, BackendError> {
271 self.ensure_transitionable("yield")?;
272 self.state = TaskState::Yielded.word();
273 self.continuation_pc = continuation_pc;
274 self.continuation_data = continuation_data;
275 self.yield_count = checked_task_counter_increment(self.yield_count, "yield_count")?;
276 self.flags |= TASK_FLAG_YIELDED;
277 Ok(self)
278 }
279
280 #[must_use]
282 #[cfg(any(test, feature = "legacy-infallible"))]
283 pub fn yielded(self, continuation_pc: u32, continuation_data: u32) -> Self {
284 self.try_yielded(continuation_pc, continuation_data)
285 .unwrap_or_else(|error| panic!("{error}"))
286 }
287
288 #[must_use]
290 pub fn try_requeued(
291 mut self,
292 continuation_pc: u32,
293 continuation_data: u32,
294 priority: TaskPriority,
295 ) -> Result<Self, BackendError> {
296 self.ensure_transitionable("requeue")?;
297 self.state = TaskState::Requeued.word();
298 self.priority = priority.word();
299 self.continuation_pc = continuation_pc;
300 self.continuation_data = continuation_data;
301 self.requeue_count = checked_task_counter_increment(self.requeue_count, "requeue_count")?;
302 self.age_ticks = checked_task_counter_increment(self.age_ticks, "age_ticks")?;
303 self.flags |= TASK_FLAG_REQUEUE_REQUESTED;
304 Ok(self)
305 }
306
307 #[must_use]
309 #[cfg(any(test, feature = "legacy-infallible"))]
310 pub fn requeued(
311 self,
312 continuation_pc: u32,
313 continuation_data: u32,
314 priority: TaskPriority,
315 ) -> Self {
316 self.try_requeued(continuation_pc, continuation_data, priority)
317 .unwrap_or_else(|error| panic!("{error}"))
318 }
319
320 #[must_use]
322 pub fn try_completed(mut self) -> Result<Self, BackendError> {
323 self.ensure_transitionable("complete")?;
324 self.state = TaskState::Done.word();
325 self.flags = 0;
326 Ok(self)
327 }
328
329 #[must_use]
331 #[cfg(any(test, feature = "legacy-infallible"))]
332 pub fn completed(self) -> Self {
333 self.try_completed()
334 .unwrap_or_else(|error| panic!("{error}"))
335 }
336
337 #[must_use]
339 pub fn try_faulted(mut self, fault_code: u32) -> Result<Self, BackendError> {
340 self.ensure_transitionable("fault")?;
341 self.state = TaskState::Faulted.word();
342 self.continuation_data = fault_code;
343 Ok(self)
344 }
345
346 #[must_use]
348 #[cfg(any(test, feature = "legacy-infallible"))]
349 pub fn faulted(self, fault_code: u32) -> Self {
350 self.try_faulted(fault_code)
351 .unwrap_or_else(|error| panic!("{error}"))
352 }
353
354 fn ensure_transitionable(&self, action: &'static str) -> Result<(), BackendError> {
355 match self.task_state() {
356 Some(TaskState::Empty | TaskState::Done | TaskState::Faulted) | None => {
357 Err(invalid_task_transition(action, self.state))
358 }
359 Some(
360 TaskState::Ready
361 | TaskState::Running
362 | TaskState::Paused
363 | TaskState::Yielded
364 | TaskState::Requeued,
365 ) => Ok(()),
366 }
367 }
368}
369
370#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
372pub struct TaskQueueSnapshot {
373 pub ready_count: u32,
375 pub paused_count: u32,
377 pub yielded_count: u32,
379 pub requeued_count: u32,
381 pub running_count: u32,
383 pub faulted_count: u32,
385 pub total_requeues: u64,
387 pub max_priority_age: u32,
389}
390
391impl TaskQueueSnapshot {
392 pub fn from_tasks(tasks: &[TaskWorkItem]) -> Result<Self, BackendError> {
399 let mut snapshot = Self::default();
400 for task in tasks {
401 snapshot.max_priority_age = snapshot.max_priority_age.max(task.age_ticks);
402 snapshot.total_requeues = snapshot
403 .total_requeues
404 .checked_add(u64::from(task.requeue_count))
405 .ok_or_else(|| {
406 BackendError::new(
407 "megakernel task total_requeues overflowed u64. Fix: drain or shard the task ring before launch.",
408 )
409 })?;
410 match task.task_state() {
411 Some(TaskState::Empty | TaskState::Done) => {}
412 Some(TaskState::Ready) => checked_increment(&mut snapshot.ready_count)?,
413 Some(TaskState::Paused) => checked_increment(&mut snapshot.paused_count)?,
414 Some(TaskState::Yielded) => checked_increment(&mut snapshot.yielded_count)?,
415 Some(TaskState::Requeued) => checked_increment(&mut snapshot.requeued_count)?,
416 Some(TaskState::Running) => checked_increment(&mut snapshot.running_count)?,
417 Some(TaskState::Faulted) => checked_increment(&mut snapshot.faulted_count)?,
418 None => {
419 return Err(BackendError::new(format!(
420 "megakernel task slot has unknown state word {}. Fix: write a valid TaskState ABI word before publishing the slot.",
421 task.state
422 )));
423 }
424 }
425 }
426 Ok(snapshot)
427 }
428
429 #[must_use]
431 #[cfg(any(test, feature = "legacy-infallible"))]
432 pub fn schedulable_count(&self) -> u32 {
433 match self.try_schedulable_count() {
434 Ok(value) => value,
435 Err(error) => panic!("{error}"),
436 }
437 }
438
439 pub fn try_schedulable_count(&self) -> Result<u32, BackendError> {
446 self.ready_count
447 .checked_add(self.yielded_count)
448 .and_then(|value| value.checked_add(self.requeued_count))
449 .ok_or_else(|| {
450 BackendError::new(
451 "megakernel schedulable task count overflowed u32. Fix: shard the task ring before launch.",
452 )
453 })
454 }
455
456 #[must_use]
458 #[cfg(any(test, feature = "legacy-infallible"))]
459 pub fn continuation_pressure_count(&self) -> u64 {
460 match self.try_continuation_pressure_count() {
461 Ok(value) => value,
462 Err(error) => panic!("{error}"),
463 }
464 }
465
466 pub fn try_continuation_pressure_count(&self) -> Result<u64, BackendError> {
472 u64::from(self.yielded_count)
473 .checked_add(u64::from(self.requeued_count))
474 .and_then(|value| value.checked_add(self.total_requeues))
475 .ok_or_else(|| {
476 BackendError::new(
477 "megakernel continuation pressure overflowed u64. Fix: drain or shard the task ring before launch.",
478 )
479 })
480 }
481
482 #[must_use]
498 #[cfg(feature = "self-substrate-adapters")]
499 pub fn build_state_convergence_program(
500 transfer_body: Vec<vyre_foundation::ir::Node>,
501 current_buffer: &str,
502 next_buffer: &str,
503 changed_buffer: &str,
504 words: u32,
505 max_iterations: u32,
506 ) -> vyre_foundation::ir::Program {
507 vyre_self_substrate::persistent_fixpoint_program::persistent_fixpoint_program(
508 transfer_body,
509 current_buffer,
510 next_buffer,
511 changed_buffer,
512 words,
513 max_iterations,
514 )
515 }
516
517 #[must_use]
519 #[cfg(any(test, feature = "legacy-infallible"))]
520 pub fn apply_to_launch_request(
521 &self,
522 mut request: MegakernelLaunchRequest,
523 ) -> MegakernelLaunchRequest {
524 request = match self.try_apply_to_launch_request(request) {
525 Ok(request) => request,
526 Err(error) => panic!("{error}"),
527 };
528 request
529 }
530
531 pub fn try_apply_to_launch_request(
538 &self,
539 mut request: MegakernelLaunchRequest,
540 ) -> Result<MegakernelLaunchRequest, BackendError> {
541 request.queue_len = self.try_schedulable_count()?;
542 request.requeue_count = request
543 .requeue_count
544 .checked_add(self.try_continuation_pressure_count()?)
545 .ok_or_else(|| {
546 BackendError::new(
547 "megakernel launch request requeue_count overflowed u64. Fix: drain or shard the task ring before launch.",
548 )
549 })?;
550 request.max_priority_age = request.max_priority_age.max(self.max_priority_age);
551 Ok(request)
552 }
553}
554
555fn checked_increment(counter: &mut u32) -> Result<(), BackendError> {
556 *counter = counter.checked_add(1).ok_or_else(|| {
557 BackendError::new(
558 "megakernel task queue count exceeds u32::MAX. Fix: shard the task ring before launch.",
559 )
560 })?;
561 Ok(())
562}
563
564fn checked_task_counter_increment(value: u32, label: &'static str) -> Result<u32, BackendError> {
565 value.checked_add(1).ok_or_else(|| {
566 BackendError::new(format!(
567 "megakernel task {label} overflowed u32. Fix: drain or shard the task ring before mutating continuation counters."
568 ))
569 })
570}
571
572fn invalid_task_transition(action: &'static str, state_word: u32) -> BackendError {
573 let state = TaskState::from_word(state_word)
574 .map(|state| format!("{state:?}"))
575 .unwrap_or_else(|| format!("unknown({state_word})"));
576 BackendError::new(format!(
577 "megakernel task cannot {action} from state {state}. Fix: publish only legal task lifecycle transitions before mutating the task slot."
578 ))
579}
580
581#[cfg(test)]
582mod tests;