Skip to main content

vyre_runtime/megakernel/
mixed_work.rs

//! Runtime-owned mixed-work protocol for resident megakernel batches.
//!
//! This module is intentionally domain-neutral. Scan, graph, parser, and flow
//! callers own their manifests and payload layouts; the runtime owns only the
//! queue class, work-unit type, resident artifact id, output slab id, watchdog
//! budget, and deterministic evidence contract needed to drain one resident
//! batch without hidden host loops.
8
/// Schema version stamped into every mixed-work protocol evidence record.
pub const MIXED_WORK_PROTOCOL_SCHEMA_VERSION: u32 = 1;

/// 64-bit FNV-1a offset basis that seeds the deterministic evidence digest.
const FNV_OFFSET: u64 = 0xcbf2_9ce4_8422_2325;
/// 64-bit FNV-1a prime used by every digest mixing step.
const FNV_PRIME: u64 = 0x100_0000_01b3;
14
/// Scheduler queue class that a resident mixed-work unit is routed through.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum MixedWorkQueueClass {
    /// Scan work: byte, literal, regex, or token scanning.
    Scan,
    /// Graph work: frontier, CSR, motif, or reachability.
    Graph,
    /// Parser work: lexer, parser, VAST, or changed-range parsing.
    Parser,
    /// Flow work: relation, dataflow, IFDS, or fixed-point iteration.
    Flow,
    /// Runtime control work, such as bounded drain sentinels.
    Control,
}

impl MixedWorkQueueClass {
    /// Stable label used in evidence and diagnostics.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        self.descriptor().0
    }

    /// Numeric tag mixed into the deterministic evidence digest.
    const fn tag(self) -> u64 {
        self.descriptor().1
    }

    /// Single table pairing each class with its stable label and digest tag,
    /// so the two cannot drift apart.
    ///
    /// Tags are mixed into the evidence digest; treat them as part of the
    /// evidence format.
    const fn descriptor(self) -> (&'static str, u64) {
        match self {
            Self::Scan => ("scan", 1),
            Self::Graph => ("graph", 2),
            Self::Parser => ("parser", 3),
            Self::Flow => ("flow", 4),
            Self::Control => ("control", 5),
        }
    }
}
53
/// Kind of resident work performed by a unit; each kind belongs to one queue class.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum MixedWorkUnitType {
    /// Scan one byte chunk or one literal/regex shard.
    ScanChunk,
    /// Verify scan candidates inside a resident verifier fragment.
    ScanVerifier,
    /// Expand or compact a graph frontier.
    GraphFrontier,
    /// Compact graph output or frontier queues.
    GraphCompaction,
    /// Run one parser shard or one lexer/tokenization shard.
    ParserShard,
    /// Apply one parser changed-range shard.
    ParserChangedRange,
    /// Apply one relation delta batch.
    FlowRelationDelta,
    /// Run a single flow fixed-point step.
    FlowFixpointStep,
    /// Drain-control sentinel that bounds persistent execution.
    DrainSentinel,
}

impl MixedWorkUnitType {
    /// Stable label used in evidence and diagnostics.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        self.descriptor().0
    }

    /// Numeric tag mixed into the deterministic evidence digest.
    const fn tag(self) -> u64 {
        self.descriptor().1
    }

    /// Single table pairing each unit type with its stable label and digest tag.
    ///
    /// The tens digit of each tag mirrors the owning queue class's tag (scan = 1x,
    /// graph = 2x, parser = 3x, flow = 4x, control = 5x).
    const fn descriptor(self) -> (&'static str, u64) {
        match self {
            Self::ScanChunk => ("scan_chunk", 11),
            Self::ScanVerifier => ("scan_verifier", 12),
            Self::GraphFrontier => ("graph_frontier", 21),
            Self::GraphCompaction => ("graph_compaction", 22),
            Self::ParserShard => ("parser_shard", 31),
            Self::ParserChangedRange => ("parser_changed_range", 32),
            Self::FlowRelationDelta => ("flow_relation_delta", 41),
            Self::FlowFixpointStep => ("flow_fixpoint_step", 42),
            Self::DrainSentinel => ("drain_sentinel", 51),
        }
    }
}
108
/// Opaque handle naming an artifact that already lives in megakernel-owned buffers.
///
/// The raw value `0` is reserved as "no artifact".
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct ResidentArtifactId(pub u32);

impl ResidentArtifactId {
    /// Return true when this id names a concrete resident artifact (any non-zero value).
    #[must_use]
    pub const fn is_valid(self) -> bool {
        let Self(raw) = self;
        raw > 0
    }
}
120
/// Opaque handle naming a runtime-owned resident output slab.
///
/// The raw value `0` is reserved as "no slab".
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct OutputSlabId(pub u32);

impl OutputSlabId {
    /// Return true when this id names a concrete output slab (any non-zero value).
    #[must_use]
    pub const fn is_valid(self) -> bool {
        !matches!(self, Self(0))
    }
}
132
133/// One resident mixed-work unit.
134#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
135pub struct MixedWorkUnit {
136    /// Stable sequence number used for deterministic drain and output evidence.
137    pub sequence: u64,
138    /// Scheduler queue class.
139    pub queue_class: MixedWorkQueueClass,
140    /// Work-unit kind inside the queue class.
141    pub unit_type: MixedWorkUnitType,
142    /// Resident artifact consumed by this work unit.
143    pub resident_artifact_id: ResidentArtifactId,
144    /// Output slab written by this work unit.
145    pub output_slab_id: OutputSlabId,
146    /// Per-unit watchdog budget in scheduler ticks.
147    pub watchdog_budget_ticks: u32,
148    /// Caller-owned payload digest. Runtime treats payload bytes as opaque.
149    pub payload_digest: u64,
150}
151
152impl MixedWorkUnit {
153    /// Construct one mixed-work unit.
154    #[must_use]
155    pub const fn new(
156        sequence: u64,
157        queue_class: MixedWorkQueueClass,
158        unit_type: MixedWorkUnitType,
159        resident_artifact_id: ResidentArtifactId,
160        output_slab_id: OutputSlabId,
161        watchdog_budget_ticks: u32,
162        payload_digest: u64,
163    ) -> Self {
164        Self {
165            sequence,
166            queue_class,
167            unit_type,
168            resident_artifact_id,
169            output_slab_id,
170            watchdog_budget_ticks,
171            payload_digest,
172        }
173    }
174}
175
176/// Borrowed resident mixed-work plan supplied to the runtime scheduler.
177#[derive(Debug, Clone, Copy, PartialEq, Eq)]
178pub struct MixedWorkProtocolPlan<'a> {
179    /// Work units to drain in deterministic sequence order.
180    pub units: &'a [MixedWorkUnit],
181    /// Total watchdog budget for draining the plan.
182    pub drain_watchdog_budget_ticks: u64,
183}
184
185impl<'a> MixedWorkProtocolPlan<'a> {
186    /// Construct a borrowed mixed-work protocol plan.
187    #[must_use]
188    pub const fn new(units: &'a [MixedWorkUnit], drain_watchdog_budget_ticks: u64) -> Self {
189        Self {
190            units,
191            drain_watchdog_budget_ticks,
192        }
193    }
194}
195
196/// Evidence emitted after validating a mixed-work protocol plan.
197#[derive(Debug, Clone, Copy, PartialEq, Eq)]
198pub struct MixedWorkProtocolEvidence {
199    /// Evidence schema version.
200    pub schema_version: u32,
201    /// Total work units.
202    pub unit_count: u32,
203    /// Scan queue units.
204    pub scan_units: u32,
205    /// Graph queue units.
206    pub graph_units: u32,
207    /// Parser queue units.
208    pub parser_units: u32,
209    /// Flow queue units.
210    pub flow_units: u32,
211    /// Runtime control queue units.
212    pub control_units: u32,
213    /// Sum of per-unit watchdog budgets.
214    pub total_watchdog_budget_ticks: u64,
215    /// Largest per-unit watchdog budget.
216    pub max_watchdog_budget_ticks: u32,
217    /// Drain budget supplied for the full resident batch.
218    pub drain_watchdog_budget_ticks: u64,
219    /// True when the sum of per-unit watchdog budgets is bounded by the drain budget.
220    pub bounded_drain: bool,
221    /// Hidden host-loop count. Valid mixed-work plans keep this at zero.
222    pub hidden_host_loop_count: u32,
223    /// Deterministic digest of queue class, unit type, ids, budgets, and payload digests.
224    pub deterministic_output_digest: u64,
225}
226
227impl MixedWorkProtocolEvidence {
228    /// Return true when scan, graph, parser, and flow classes are all present.
229    #[must_use]
230    pub const fn covers_scan_graph_parser_flow(self) -> bool {
231        self.scan_units != 0 && self.graph_units != 0 && self.parser_units != 0 && self.flow_units != 0
232    }
233
234    /// Return true when evidence is complete enough for release benches.
235    #[must_use]
236    pub const fn is_complete(self) -> bool {
237        self.schema_version == MIXED_WORK_PROTOCOL_SCHEMA_VERSION
238            && self.unit_count != 0
239            && self.bounded_drain
240            && self.hidden_host_loop_count == 0
241            && self.deterministic_output_digest != 0
242    }
243}
244
245/// Mixed-work protocol validation error.
246#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
247#[non_exhaustive]
248pub enum MixedWorkProtocolError {
249    /// The plan has no resident work.
250    #[error("mixed-work plan is empty. Fix: publish at least one resident work unit before scheduling.")]
251    EmptyPlan,
252    /// The total drain budget is zero.
253    #[error("mixed-work drain watchdog budget is zero. Fix: provide a positive resident drain budget.")]
254    ZeroDrainWatchdogBudget,
255    /// A unit has no watchdog budget.
256    #[error("mixed-work unit {sequence} has zero watchdog budget. Fix: assign a positive per-unit watchdog budget.")]
257    ZeroUnitWatchdogBudget {
258        /// Sequence number of the invalid unit.
259        sequence: u64,
260    },
261    /// A unit references no resident artifact.
262    #[error("mixed-work unit {sequence} has resident artifact id 0. Fix: publish a resident artifact before queueing work.")]
263    ZeroResidentArtifactId {
264        /// Sequence number of the invalid unit.
265        sequence: u64,
266    },
267    /// A unit references no output slab.
268    #[error("mixed-work unit {sequence} has output slab id 0. Fix: allocate a resident output slab before queueing work.")]
269    ZeroOutputSlabId {
270        /// Sequence number of the invalid unit.
271        sequence: u64,
272    },
273    /// Queue class and unit type do not match.
274    #[error(
275        "mixed-work unit {sequence} routes {unit_type} through {queue_class}. Fix: use a unit type owned by the queue class."
276    )]
277    QueueClassMismatch {
278        /// Sequence number of the invalid unit.
279        sequence: u64,
280        /// Queue class label.
281        queue_class: &'static str,
282        /// Unit type label.
283        unit_type: &'static str,
284    },
285    /// Unit count cannot fit the evidence ABI.
286    #[error("mixed-work unit count {unit_count} overflows u32 evidence. Fix: shard the resident batch.")]
287    UnitCountOverflow {
288        /// Unit count that exceeded the evidence ABI.
289        unit_count: usize,
290    },
291    /// Class-specific count cannot fit the evidence ABI.
292    #[error("mixed-work {queue_class} unit count overflowed u32 evidence. Fix: shard that queue class.")]
293    ClassCountOverflow {
294        /// Queue class whose count overflowed.
295        queue_class: &'static str,
296    },
297    /// Watchdog sum overflowed the evidence ABI.
298    #[error("mixed-work watchdog budget sum overflowed u64. Fix: shard the resident batch.")]
299    WatchdogBudgetOverflow,
300    /// The plan cannot drain inside the supplied watchdog budget.
301    #[error(
302        "mixed-work watchdog budget {total_watchdog_budget_ticks} exceeds drain budget {drain_watchdog_budget_ticks}. Fix: increase the drain budget or shard the resident batch."
303    )]
304    WatchdogBudgetExceeded {
305        /// Sum of per-unit watchdog budgets.
306        total_watchdog_budget_ticks: u64,
307        /// Drain budget supplied by the caller.
308        drain_watchdog_budget_ticks: u64,
309    },
310}
311
312/// Validate a mixed-work protocol plan and return deterministic drain evidence.
313///
314/// # Errors
315///
316/// Returns [`MixedWorkProtocolError`] when the plan cannot be drained by the
317/// resident scheduler without invalid ids, class mismatches, hidden host loops,
318/// or an unbounded watchdog budget.
319pub fn mixed_work_protocol_evidence(
320    plan: &MixedWorkProtocolPlan<'_>,
321) -> Result<MixedWorkProtocolEvidence, MixedWorkProtocolError> {
322    validate_mixed_work_protocol(plan)
323}
324
325/// Validate a mixed-work protocol plan and return deterministic drain evidence.
326///
327/// # Errors
328///
329/// Returns [`MixedWorkProtocolError`] when any work unit is malformed or the
330/// plan exceeds its drain watchdog budget.
331pub fn validate_mixed_work_protocol(
332    plan: &MixedWorkProtocolPlan<'_>,
333) -> Result<MixedWorkProtocolEvidence, MixedWorkProtocolError> {
334    if plan.units.is_empty() {
335        return Err(MixedWorkProtocolError::EmptyPlan);
336    }
337    if plan.drain_watchdog_budget_ticks == 0 {
338        return Err(MixedWorkProtocolError::ZeroDrainWatchdogBudget);
339    }
340    if plan.units.len() > u32::MAX as usize {
341        return Err(MixedWorkProtocolError::UnitCountOverflow {
342            unit_count: plan.units.len(),
343        });
344    }
345
346    let mut counts = [0_u32; 5];
347    let mut total_watchdog_budget_ticks = 0_u64;
348    let mut max_watchdog_budget_ticks = 0_u32;
349    let mut digest = FNV_OFFSET;
350
351    for unit in plan.units {
352        validate_unit(*unit)?;
353        bump_class_count(&mut counts, unit.queue_class)?;
354        total_watchdog_budget_ticks = total_watchdog_budget_ticks
355            .checked_add(u64::from(unit.watchdog_budget_ticks))
356            .ok_or(MixedWorkProtocolError::WatchdogBudgetOverflow)?;
357        max_watchdog_budget_ticks = max_watchdog_budget_ticks.max(unit.watchdog_budget_ticks);
358        digest = mix_unit_digest(digest, *unit);
359    }
360
361    if total_watchdog_budget_ticks > plan.drain_watchdog_budget_ticks {
362        return Err(MixedWorkProtocolError::WatchdogBudgetExceeded {
363            total_watchdog_budget_ticks,
364            drain_watchdog_budget_ticks: plan.drain_watchdog_budget_ticks,
365        });
366    }
367
368    Ok(MixedWorkProtocolEvidence {
369        schema_version: MIXED_WORK_PROTOCOL_SCHEMA_VERSION,
370        unit_count: plan.units.len() as u32,
371        scan_units: counts[0],
372        graph_units: counts[1],
373        parser_units: counts[2],
374        flow_units: counts[3],
375        control_units: counts[4],
376        total_watchdog_budget_ticks,
377        max_watchdog_budget_ticks,
378        drain_watchdog_budget_ticks: plan.drain_watchdog_budget_ticks,
379        bounded_drain: true,
380        hidden_host_loop_count: 0,
381        deterministic_output_digest: digest,
382    })
383}
384
385fn validate_unit(unit: MixedWorkUnit) -> Result<(), MixedWorkProtocolError> {
386    if unit.watchdog_budget_ticks == 0 {
387        return Err(MixedWorkProtocolError::ZeroUnitWatchdogBudget {
388            sequence: unit.sequence,
389        });
390    }
391    if !unit.resident_artifact_id.is_valid() {
392        return Err(MixedWorkProtocolError::ZeroResidentArtifactId {
393            sequence: unit.sequence,
394        });
395    }
396    if !unit.output_slab_id.is_valid() {
397        return Err(MixedWorkProtocolError::ZeroOutputSlabId {
398            sequence: unit.sequence,
399        });
400    }
401    if !unit_type_matches_queue(unit.queue_class, unit.unit_type) {
402        return Err(MixedWorkProtocolError::QueueClassMismatch {
403            sequence: unit.sequence,
404            queue_class: unit.queue_class.as_str(),
405            unit_type: unit.unit_type.as_str(),
406        });
407    }
408    Ok(())
409}
410
411const fn unit_type_matches_queue(
412    queue_class: MixedWorkQueueClass,
413    unit_type: MixedWorkUnitType,
414) -> bool {
415    matches!(
416        (queue_class, unit_type),
417        (MixedWorkQueueClass::Scan, MixedWorkUnitType::ScanChunk)
418            | (MixedWorkQueueClass::Scan, MixedWorkUnitType::ScanVerifier)
419            | (MixedWorkQueueClass::Graph, MixedWorkUnitType::GraphFrontier)
420            | (MixedWorkQueueClass::Graph, MixedWorkUnitType::GraphCompaction)
421            | (MixedWorkQueueClass::Parser, MixedWorkUnitType::ParserShard)
422            | (MixedWorkQueueClass::Parser, MixedWorkUnitType::ParserChangedRange)
423            | (MixedWorkQueueClass::Flow, MixedWorkUnitType::FlowRelationDelta)
424            | (MixedWorkQueueClass::Flow, MixedWorkUnitType::FlowFixpointStep)
425            | (MixedWorkQueueClass::Control, MixedWorkUnitType::DrainSentinel)
426    )
427}
428
429fn bump_class_count(
430    counts: &mut [u32; 5],
431    queue_class: MixedWorkQueueClass,
432) -> Result<(), MixedWorkProtocolError> {
433    let index = match queue_class {
434        MixedWorkQueueClass::Scan => 0,
435        MixedWorkQueueClass::Graph => 1,
436        MixedWorkQueueClass::Parser => 2,
437        MixedWorkQueueClass::Flow => 3,
438        MixedWorkQueueClass::Control => 4,
439    };
440    counts[index] = counts[index]
441        .checked_add(1)
442        .ok_or(MixedWorkProtocolError::ClassCountOverflow {
443            queue_class: queue_class.as_str(),
444        })?;
445    Ok(())
446}
447
448fn mix_unit_digest(mut digest: u64, unit: MixedWorkUnit) -> u64 {
449    digest = fnv_mix(digest, unit.sequence);
450    digest = fnv_mix(digest, unit.queue_class.tag());
451    digest = fnv_mix(digest, unit.unit_type.tag());
452    digest = fnv_mix(digest, u64::from(unit.resident_artifact_id.0));
453    digest = fnv_mix(digest, u64::from(unit.output_slab_id.0));
454    digest = fnv_mix(digest, u64::from(unit.watchdog_budget_ticks));
455    fnv_mix(digest, unit.payload_digest)
456}
457
458fn fnv_mix(mut digest: u64, value: u64) -> u64 {
459    for byte in value.to_le_bytes() {
460        digest ^= u64::from(byte);
461        digest = digest.wrapping_mul(FNV_PRIME);
462    }
463    digest
464}
465
#[cfg(test)]
mod tests {
    use super::{
        mixed_work_protocol_evidence, validate_mixed_work_protocol, MixedWorkProtocolError,
        MixedWorkProtocolPlan, MixedWorkQueueClass, MixedWorkUnit, MixedWorkUnitType,
        OutputSlabId, ResidentArtifactId, MIXED_WORK_PROTOCOL_SCHEMA_VERSION,
    };

    /// Build a valid unit. Ids and payload digest derive from `sequence`, and
    /// every unit gets a watchdog budget of 10 ticks.
    fn unit(
        sequence: u64,
        queue_class: MixedWorkQueueClass,
        unit_type: MixedWorkUnitType,
    ) -> MixedWorkUnit {
        MixedWorkUnit::new(
            sequence,
            queue_class,
            unit_type,
            ResidentArtifactId(100 + sequence as u32),
            OutputSlabId(200 + sequence as u32),
            10,
            0xfeed_0000 + sequence,
        )
    }

    #[test]
    fn mixed_scan_graph_parser_flow_work_emits_deterministic_bounded_drain_evidence() {
        let units = [
            unit(1, MixedWorkQueueClass::Scan, MixedWorkUnitType::ScanChunk),
            unit(2, MixedWorkQueueClass::Graph, MixedWorkUnitType::GraphFrontier),
            unit(3, MixedWorkQueueClass::Parser, MixedWorkUnitType::ParserShard),
            unit(4, MixedWorkQueueClass::Flow, MixedWorkUnitType::FlowRelationDelta),
            unit(5, MixedWorkQueueClass::Control, MixedWorkUnitType::DrainSentinel),
        ];
        let plan = MixedWorkProtocolPlan::new(&units, 64);

        let first = mixed_work_protocol_evidence(&plan)
            .expect("Fix: valid mixed-work plan should emit evidence");
        let second = validate_mixed_work_protocol(&plan)
            .expect("Fix: valid mixed-work plan should emit stable evidence");

        assert_eq!(first, second);
        assert_eq!(first.schema_version, MIXED_WORK_PROTOCOL_SCHEMA_VERSION);
        assert!(first.is_complete());
        assert!(first.covers_scan_graph_parser_flow());
        assert!(first.bounded_drain);
        assert_eq!(first.hidden_host_loop_count, 0);
        assert_eq!(first.unit_count, 5);
        assert_eq!(first.scan_units, 1);
        assert_eq!(first.graph_units, 1);
        assert_eq!(first.parser_units, 1);
        assert_eq!(first.flow_units, 1);
        assert_eq!(first.control_units, 1);
        assert_eq!(first.total_watchdog_budget_ticks, 50);
        assert_eq!(first.max_watchdog_budget_ticks, 10);
        assert_eq!(first.drain_watchdog_budget_ticks, 64);
        assert_ne!(first.deterministic_output_digest, 0);
    }

    #[test]
    fn empty_plan_is_rejected() {
        let plan = MixedWorkProtocolPlan::new(&[], 64);

        assert!(matches!(
            validate_mixed_work_protocol(&plan),
            Err(MixedWorkProtocolError::EmptyPlan)
        ));
    }

    #[test]
    fn zero_drain_budget_is_rejected() {
        let units = [unit(1, MixedWorkQueueClass::Scan, MixedWorkUnitType::ScanChunk)];
        let plan = MixedWorkProtocolPlan::new(&units, 0);

        assert!(matches!(
            validate_mixed_work_protocol(&plan),
            Err(MixedWorkProtocolError::ZeroDrainWatchdogBudget)
        ));
    }

    #[test]
    fn zero_watchdog_budget_is_rejected() {
        let units = [MixedWorkUnit::new(
            7,
            MixedWorkQueueClass::Scan,
            MixedWorkUnitType::ScanChunk,
            ResidentArtifactId(1),
            OutputSlabId(1),
            0,
            9,
        )];
        let plan = MixedWorkProtocolPlan::new(&units, 1);

        assert!(matches!(
            validate_mixed_work_protocol(&plan),
            Err(MixedWorkProtocolError::ZeroUnitWatchdogBudget { sequence: 7 })
        ));
    }

    #[test]
    fn class_unit_mismatch_is_rejected() {
        let units = [MixedWorkUnit::new(
            9,
            MixedWorkQueueClass::Parser,
            MixedWorkUnitType::FlowFixpointStep,
            ResidentArtifactId(1),
            OutputSlabId(1),
            1,
            9,
        )];
        let plan = MixedWorkProtocolPlan::new(&units, 1);

        assert!(matches!(
            validate_mixed_work_protocol(&plan),
            Err(MixedWorkProtocolError::QueueClassMismatch {
                sequence: 9,
                queue_class: "parser",
                unit_type: "flow_fixpoint_step"
            })
        ));
    }

    #[test]
    fn drain_budget_must_bound_all_units() {
        let units = [
            unit(1, MixedWorkQueueClass::Scan, MixedWorkUnitType::ScanChunk),
            unit(2, MixedWorkQueueClass::Flow, MixedWorkUnitType::FlowRelationDelta),
        ];
        let plan = MixedWorkProtocolPlan::new(&units, 19);

        assert!(matches!(
            validate_mixed_work_protocol(&plan),
            Err(MixedWorkProtocolError::WatchdogBudgetExceeded {
                total_watchdog_budget_ticks: 20,
                drain_watchdog_budget_ticks: 19
            })
        ));
    }

    #[test]
    fn drain_budget_equal_to_unit_budget_sum_is_accepted() {
        let units = [
            unit(1, MixedWorkQueueClass::Scan, MixedWorkUnitType::ScanChunk),
            unit(2, MixedWorkQueueClass::Flow, MixedWorkUnitType::FlowRelationDelta),
        ];
        let evidence = validate_mixed_work_protocol(&MixedWorkProtocolPlan::new(&units, 20))
            .expect("Fix: a drain budget equal to the unit budget sum should drain");

        assert_eq!(evidence.total_watchdog_budget_ticks, 20);
        assert_eq!(evidence.drain_watchdog_budget_ticks, 20);
        assert!(evidence.bounded_drain);
        assert!(evidence.is_complete());
    }

    #[test]
    fn coverage_requires_every_domain_class() {
        // No graph unit: evidence is still complete, but coverage is not.
        let units = [
            unit(1, MixedWorkQueueClass::Scan, MixedWorkUnitType::ScanVerifier),
            unit(2, MixedWorkQueueClass::Parser, MixedWorkUnitType::ParserChangedRange),
            unit(3, MixedWorkQueueClass::Flow, MixedWorkUnitType::FlowFixpointStep),
            unit(4, MixedWorkQueueClass::Control, MixedWorkUnitType::DrainSentinel),
        ];
        let evidence = validate_mixed_work_protocol(&MixedWorkProtocolPlan::new(&units, 64))
            .expect("Fix: valid plan without graph work should still emit evidence");

        assert!(evidence.is_complete());
        assert!(!evidence.covers_scan_graph_parser_flow());
        assert_eq!(evidence.graph_units, 0);
        assert_eq!(evidence.control_units, 1);
        assert_eq!(evidence.unit_count, 4);
    }

    #[test]
    fn payload_digest_changes_output_digest() {
        let base = [unit(1, MixedWorkQueueClass::Scan, MixedWorkUnitType::ScanChunk)];
        let mut changed = base;
        changed[0].payload_digest ^= 1;

        let base_digest = validate_mixed_work_protocol(&MixedWorkProtocolPlan::new(&base, 10))
            .expect("Fix: base plan should validate")
            .deterministic_output_digest;
        let changed_digest =
            validate_mixed_work_protocol(&MixedWorkProtocolPlan::new(&changed, 10))
                .expect("Fix: changed plan should validate")
                .deterministic_output_digest;

        // The payload digest is the last word mixed. Each FNV-1a step is a
        // bijection of the state, so a single differing byte cannot collide.
        assert_ne!(base_digest, changed_digest);
    }

    #[test]
    fn resident_artifact_and_output_slab_ids_are_required() {
        let bad_artifact = [MixedWorkUnit::new(
            1,
            MixedWorkQueueClass::Scan,
            MixedWorkUnitType::ScanChunk,
            ResidentArtifactId(0),
            OutputSlabId(1),
            1,
            1,
        )];
        assert!(matches!(
            validate_mixed_work_protocol(&MixedWorkProtocolPlan::new(&bad_artifact, 1)),
            Err(MixedWorkProtocolError::ZeroResidentArtifactId { sequence: 1 })
        ));

        let bad_slab = [MixedWorkUnit::new(
            2,
            MixedWorkQueueClass::Scan,
            MixedWorkUnitType::ScanChunk,
            ResidentArtifactId(1),
            OutputSlabId(0),
            1,
            1,
        )];
        assert!(matches!(
            validate_mixed_work_protocol(&MixedWorkProtocolPlan::new(&bad_slab, 1)),
            Err(MixedWorkProtocolError::ZeroOutputSlabId { sequence: 2 })
        ));
    }
}