Skip to main content

vyre_runtime/megakernel/
automata_worklist.rs

//! Non-blocking automata worklist policy for resident megakernel scheduling.
//!
//! Automata traversal and graph-style frontier traversal both reduce to
//! state/index pairs that may expand irregularly. This module owns the
//! runtime policy and evidence contract for choosing a non-blocking worklist
//! path without introducing automata-specific protocol words.
7
8use vyre_driver::backend::BackendError;
9
10use super::planner::MegakernelWorkItem;
11use super::task::{TaskPriority, TaskState, TaskWorkItem, TASK_FLAG_REQUEUE_REQUESTED};
12
/// Version tag stamped into automata worklist benchmark evidence.
///
/// Evidence whose `schema_version` differs from this value is not considered
/// complete.
pub const AUTOMATA_WORKLIST_EVIDENCE_SCHEMA_VERSION: u32 = 1_u32;
15
/// A single automata frontier entry: an automata state id paired with the
/// input byte index associated with that state.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct AutomataStateIndex {
    /// Identifier of the automata state.
    pub state_id: u32,
    /// Input byte index associated with `state_id`.
    pub byte_index: u32,
}
24
25impl AutomataStateIndex {
26    /// Construct a state/index pair.
27    #[must_use]
28    pub const fn new(state_id: u32, byte_index: u32) -> Self {
29        Self {
30            state_id,
31            byte_index,
32        }
33    }
34
35    /// Encode this state/index pair into the shared continuation task ABI.
36    #[must_use]
37    pub fn to_task_work_item(
38        self,
39        task_id: u32,
40        tenant_id: u32,
41        priority: TaskPriority,
42        op_handle: u32,
43        input_handle: u32,
44        output_handle: u32,
45    ) -> TaskWorkItem {
46        let mut task = TaskWorkItem::from_work_item(
47            task_id,
48            tenant_id,
49            priority,
50            MegakernelWorkItem {
51                op_handle,
52                input_handle,
53                output_handle,
54                param: self.state_id,
55            },
56        );
57        task.state = TaskState::Ready.word();
58        task.continuation_pc = self.byte_index;
59        task.continuation_data = self.state_id;
60        task.flags |= TASK_FLAG_REQUEUE_REQUESTED;
61        task
62    }
63}
64
/// Scheduling strategy chosen for an automata worklist request.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum AutomataWorklistMode {
    /// The worklist holds no queued work.
    Empty,
    /// The frontier is small enough to run on the blocking DFA/NFA kernel baseline.
    Blocking,
    /// Drive traversal through a non-blocking state/index worklist.
    NonBlocking,
    /// The worklist must be spilled or sharded before resident execution.
    SpillRequired,
}
77
78impl AutomataWorklistMode {
79    /// Stable label for benchmark and release evidence.
80    #[must_use]
81    pub const fn as_str(self) -> &'static str {
82        match self {
83            Self::Empty => "empty",
84            Self::Blocking => "blocking",
85            Self::NonBlocking => "non_blocking",
86            Self::SpillRequired => "spill_required",
87        }
88    }
89}
90
/// Measurements and plans fed into the automata worklist policy.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct AutomataWorklistRequest {
    /// Number of state/index pairs currently queued.
    pub worklist_depth: u32,
    /// State visits measured or planned for this corpus slice.
    pub state_visit_count: u64,
    /// Active-lane or occupancy proxy, in basis points.
    pub occupancy_proxy_bps: u16,
    /// Active time of the blocking DFA/NFA path on the same corpus slice, in nanoseconds.
    pub blocking_active_time_ns: u64,
    /// Active time of the non-blocking worklist path on the same corpus slice, in nanoseconds.
    pub nonblocking_active_time_ns: u64,
}
105
106impl AutomataWorklistRequest {
107    /// Construct an empty worklist request.
108    #[must_use]
109    pub const fn empty() -> Self {
110        Self {
111            worklist_depth: 0,
112            state_visit_count: 0,
113            occupancy_proxy_bps: 0,
114            blocking_active_time_ns: 0,
115            nonblocking_active_time_ns: 0,
116        }
117    }
118}
119
/// Thresholds that decide between blocking and non-blocking automata traversal.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct AutomataWorklistPolicy {
    /// Depth (in state/index pairs) at or above which non-blocking traversal is preferred.
    pub nonblocking_depth_threshold: u32,
    /// Depth strictly above which the request must spill or shard first.
    pub spill_depth_threshold: u32,
    /// Per-pair multiplier used to derive a state-visit budget from the depth.
    pub state_visit_budget_multiplier: u32,
    /// Occupancy, in basis points, below which non-blocking traversal is preferred.
    pub low_occupancy_threshold_bps: u16,
}
132
133impl Default for AutomataWorklistPolicy {
134    fn default() -> Self {
135        Self::standard()
136    }
137}
138
139impl AutomataWorklistPolicy {
140    /// Standard policy for resident automata worklists.
141    #[must_use]
142    pub const fn standard() -> Self {
143        Self {
144            nonblocking_depth_threshold: 64,
145            spill_depth_threshold: 1_048_576,
146            state_visit_budget_multiplier: 8,
147            low_occupancy_threshold_bps: 5_000,
148        }
149    }
150
151    /// Recommend a scheduler mode for a resident automata worklist.
152    ///
153    /// # Errors
154    ///
155    /// Returns [`BackendError`] when derived state-visit budgets overflow.
156    pub fn recommend(
157        self,
158        request: AutomataWorklistRequest,
159    ) -> Result<AutomataWorklistRecommendation, BackendError> {
160        let state_visit_budget = request
161            .worklist_depth
162            .checked_mul(self.state_visit_budget_multiplier)
163            .map(u64::from)
164            .ok_or_else(|| {
165                BackendError::new(
166                    "automata worklist state-visit budget overflowed u32. Fix: shard the state-index frontier before resident scheduling.",
167                )
168            })?;
169        let mode = if request.worklist_depth == 0 {
170            AutomataWorklistMode::Empty
171        } else if request.worklist_depth > self.spill_depth_threshold {
172            AutomataWorklistMode::SpillRequired
173        } else if request.worklist_depth >= self.nonblocking_depth_threshold
174            || request.occupancy_proxy_bps < self.low_occupancy_threshold_bps
175        {
176            AutomataWorklistMode::NonBlocking
177        } else {
178            AutomataWorklistMode::Blocking
179        };
180        Ok(AutomataWorklistRecommendation {
181            mode,
182            worklist_depth: request.worklist_depth,
183            state_visit_budget,
184            state_visit_count: request.state_visit_count,
185            occupancy_proxy_bps: request.occupancy_proxy_bps.min(10_000),
186            match_parity_required: true,
187            reports_state_index_pairs: true,
188        })
189    }
190
191    /// Recommend a scheduler mode and emit benchmark evidence.
192    ///
193    /// # Errors
194    ///
195    /// Returns [`BackendError`] when derived state-visit budgets overflow.
196    pub fn recommend_with_evidence(
197        self,
198        request: AutomataWorklistRequest,
199    ) -> Result<(AutomataWorklistRecommendation, AutomataWorklistEvidence), BackendError> {
200        let recommendation = self.recommend(request)?;
201        let evidence = AutomataWorklistEvidence {
202            schema_version: AUTOMATA_WORKLIST_EVIDENCE_SCHEMA_VERSION,
203            selected_mode: recommendation.mode,
204            worklist_depth: recommendation.worklist_depth,
205            state_visit_count: recommendation.state_visit_count,
206            occupancy_proxy_bps: recommendation.occupancy_proxy_bps,
207            blocking_active_time_ns: request.blocking_active_time_ns,
208            nonblocking_active_time_ns: request.nonblocking_active_time_ns,
209            match_parity_required: recommendation.match_parity_required,
210            reports_state_index_pairs: recommendation.reports_state_index_pairs,
211        };
212        Ok((recommendation, evidence))
213    }
214}
215
216/// Policy output consumed by resident regex and graph-style schedulers.
217#[derive(Debug, Clone, Copy, PartialEq, Eq)]
218pub struct AutomataWorklistRecommendation {
219    /// Selected scheduler mode.
220    pub mode: AutomataWorklistMode,
221    /// Worklist depth in state/index pairs.
222    pub worklist_depth: u32,
223    /// Derived state-visit budget for the resident scheduler.
224    pub state_visit_budget: u64,
225    /// Observed or planned state visits.
226    pub state_visit_count: u64,
227    /// Occupancy proxy in basis points.
228    pub occupancy_proxy_bps: u16,
229    /// True when blocking and non-blocking outputs must match.
230    pub match_parity_required: bool,
231    /// True when benchmark evidence must report state/index-pair work.
232    pub reports_state_index_pairs: bool,
233}
234
235/// Benchmark evidence for blocking versus non-blocking automata worklists.
236#[derive(Debug, Clone, Copy, PartialEq, Eq)]
237pub struct AutomataWorklistEvidence {
238    /// Evidence schema version.
239    pub schema_version: u32,
240    /// Selected scheduler mode.
241    pub selected_mode: AutomataWorklistMode,
242    /// Worklist depth in state/index pairs.
243    pub worklist_depth: u32,
244    /// Number of state visits reported by the benchmark.
245    pub state_visit_count: u64,
246    /// Occupancy proxy in basis points.
247    pub occupancy_proxy_bps: u16,
248    /// Blocking DFA/NFA active time for the same corpus slice.
249    pub blocking_active_time_ns: u64,
250    /// Non-blocking worklist active time for the same corpus slice.
251    pub nonblocking_active_time_ns: u64,
252    /// True when benchmark outputs must prove match parity.
253    pub match_parity_required: bool,
254    /// True when evidence reports state/index-pair work instead of opaque jobs.
255    pub reports_state_index_pairs: bool,
256}
257
258impl AutomataWorklistEvidence {
259    /// Return true when the evidence contains the required benchmark fields.
260    #[must_use]
261    pub fn is_complete(self) -> bool {
262        self.schema_version == AUTOMATA_WORKLIST_EVIDENCE_SCHEMA_VERSION
263            && self.occupancy_proxy_bps <= 10_000
264            && self.match_parity_required
265            && self.reports_state_index_pairs
266            && (self.selected_mode == AutomataWorklistMode::Empty || self.worklist_depth != 0)
267    }
268}
269
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn state_index_pair_uses_shared_task_work_item_abi() {
        let task = AutomataStateIndex::new(17, 4096).to_task_work_item(
            5,
            3,
            TaskPriority::High,
            99,
            12,
            13,
        );

        // Task header comes straight from the call arguments.
        assert_eq!(task.task_id, 5);
        assert_eq!(task.tenant_id, 3);
        assert_eq!(task.priority, TaskPriority::High.word());
        assert_eq!(task.state, TaskState::Ready.word());

        // Handles are forwarded through the embedded megakernel work item.
        assert_eq!(task.op_handle, 99);
        assert_eq!(task.input_handle, 12);
        assert_eq!(task.output_handle, 13);

        // The state id rides in `param` and `continuation_data`; the byte
        // index becomes the continuation pc.
        assert_eq!(task.param, 17);
        assert_eq!(task.continuation_data, 17);
        assert_eq!(task.continuation_pc, 4096);
        assert_eq!(
            task.flags & TASK_FLAG_REQUEUE_REQUESTED,
            TASK_FLAG_REQUEUE_REQUESTED
        );
    }

    #[test]
    fn policy_emits_nonblocking_worklist_evidence() {
        let policy = AutomataWorklistPolicy::standard();
        let depth = policy.nonblocking_depth_threshold;
        let request = AutomataWorklistRequest {
            worklist_depth: depth,
            state_visit_count: 2048,
            occupancy_proxy_bps: 2_500,
            blocking_active_time_ns: 900,
            nonblocking_active_time_ns: 600,
        };

        let (recommendation, evidence) = policy
            .recommend_with_evidence(request)
            .expect("Fix: valid automata worklist request should emit evidence");

        // A depth exactly at the threshold selects the non-blocking path.
        assert_eq!(recommendation.mode, AutomataWorklistMode::NonBlocking);
        assert_eq!(
            recommendation.state_visit_budget,
            u64::from(depth) * u64::from(policy.state_visit_budget_multiplier)
        );

        // Evidence mirrors the recommendation and the measured timings.
        assert_eq!(evidence.schema_version, AUTOMATA_WORKLIST_EVIDENCE_SCHEMA_VERSION);
        assert_eq!(evidence.selected_mode, AutomataWorklistMode::NonBlocking);
        assert_eq!(evidence.worklist_depth, depth);
        assert_eq!(evidence.state_visit_count, 2048);
        assert_eq!(evidence.occupancy_proxy_bps, 2_500);
        assert_eq!(evidence.blocking_active_time_ns, 900);
        assert_eq!(evidence.nonblocking_active_time_ns, 600);
        assert!(evidence.match_parity_required);
        assert!(evidence.reports_state_index_pairs);
        assert!(evidence.is_complete());
    }
}