Skip to main content

vyre_runtime/megakernel/
rule_catalog.rs

1//! DFA rule catalog packing for batched megakernel dispatch.
2
3use super::staging_reserve::{
4    reserve_hash_map_capacity as reserve_catalog_map, reserve_vec_capacity as reserve_catalog_vec,
5};
6use crate::PipelineError;
7use rustc_hash::FxHashMap;
8
/// Dense byte alphabet used by the DFA transition table.
pub const ALPHABET_SIZE: u32 = 256;
/// [`ALPHABET_SIZE`] as a `usize` for host-side indexing; derived from it so the
/// two values can never drift apart.
const ALPHABET_SIZE_USIZE: usize = ALPHABET_SIZE as usize;

/// Number of `u32` words per rule metadata entry.
pub const RULE_META_WORDS: usize = 3;
15
/// One compiled DFA-backed rule program consumed by the batch dispatcher.
///
/// The fields are public, so a value built with a struct literal can bypass the
/// shape checks in [`BatchRuleProgram::new`]. The packing and fingerprinting
/// functions in this module therefore re-validate every rule and record a
/// rejection for malformed ones instead of trusting the fields.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BatchRuleProgram {
    /// Stable rule-table index. When packed, it also selects the rule's dense
    /// metadata slot, which must lie in `0..rules.len()`.
    pub rule_idx: u32,
    /// Dense DFA transition table (`state * 256 + byte -> next_state`).
    pub transitions: Vec<u32>,
    /// Dense DFA accept table (`state -> non-zero match marker`).
    pub accept: Vec<u32>,
    /// DFA state count.
    pub state_count: u32,
}
28
29impl BatchRuleProgram {
30    /// Build one DFA-backed rule program.
31    ///
32    /// # Errors
33    ///
34    /// Returns [`PipelineError::Backend`] when the DFA buffers do not match
35    /// `state_count`.
36    pub fn new(
37        rule_idx: u32,
38        transitions: Vec<u32>,
39        accept: Vec<u32>,
40        state_count: u32,
41    ) -> Result<Self, PipelineError> {
42        validate_rule_shape(rule_idx, &transitions, &accept, state_count)?;
43        Ok(Self {
44            rule_idx,
45            transitions,
46            accept,
47            state_count,
48        })
49    }
50}
51
/// Packed metadata for one dense DFA rule entry.
///
/// `#[repr(C)]` together with `bytemuck::Pod` means a `[RuleMeta]` slice can be
/// reinterpreted as plain words (e.g. with `bytemuck::cast_slice`). Each entry
/// is three consecutive `u32` words in field order, matching
/// [`RULE_META_WORDS`]. The catalog packer fills rejected or missing slots with
/// the inert entry `{ transition_base: 0, accept_base: 0, state_count: 1 }`.
#[repr(C)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, bytemuck::Pod, bytemuck::Zeroable)]
pub struct RuleMeta {
    /// Word offset into the flattened transition table.
    pub transition_base: u32,
    /// Word offset into the flattened accept table.
    pub accept_base: u32,
    /// DFA state count for this rule.
    pub state_count: u32,
}
63
/// One rule rejected from a megakernel batch while other rules still ran.
///
/// Rejections are collected rather than returned as errors, so a single
/// malformed rule does not abort the whole batch. When the catalog is packed,
/// the rejected rule's dense metadata slot keeps the inert all-zero entry.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BatchRuleRejection {
    /// Caller-supplied rule index when present. For a dense slot that no rule
    /// filled, this is the slot index itself.
    pub rule_idx: Option<u32>,
    /// Human-readable rejection reason, typically including a `Fix:` hint.
    pub reason: String,
}
72
73/// Packed rule catalog uploaded to device storage buffers.
74pub struct PackedRuleCatalog {
75    /// Dense per-rule metadata table.
76    pub rule_meta: Vec<RuleMeta>,
77    /// Deduplicated flattened DFA transition storage.
78    pub transitions: Vec<u32>,
79    /// Deduplicated flattened DFA accept storage.
80    pub accept: Vec<u32>,
81    /// Rules rejected during validation or dense-slot assignment.
82    pub rejected_rules: Vec<BatchRuleRejection>,
83}
84
85/// Caller-owned storage for packing rule catalogs without rebuilding host
86/// allocations on every refresh.
87#[derive(Default)]
88pub struct RuleCatalogPackingScratch {
89    /// Dense per-rule metadata table.
90    pub rule_meta: Vec<RuleMeta>,
91    /// Deduplicated flattened DFA transition storage.
92    pub transitions: Vec<u32>,
93    /// Deduplicated flattened DFA accept storage.
94    pub accept: Vec<u32>,
95    /// Rules rejected during validation or dense-slot assignment.
96    pub rejected_rules: Vec<BatchRuleRejection>,
97    unique_storage: FxHashMap<[u8; 32], (u32, u32, u32)>,
98    occupied: Vec<bool>,
99    addressed: Vec<bool>,
100}
101
102/// Fingerprints for the valid dense catalog entries.
103#[must_use]
104pub fn accepted_rule_fingerprints(
105    rules: &[BatchRuleProgram],
106) -> (Vec<[u8; 32]>, Vec<BatchRuleRejection>) {
107    let mut fingerprints = Vec::new();
108    let mut occupied = Vec::new();
109    let mut addressed = Vec::new();
110    let rejections =
111        accepted_rule_fingerprints_into(rules, &mut fingerprints, &mut occupied, &mut addressed);
112    (fingerprints, rejections)
113}
114
115/// Fill caller-owned storage with fingerprints for valid dense catalog entries.
116///
117/// The output fingerprint order matches dense rule-table order, not input
118/// order. `fingerprints`, `occupied`, and `addressed` are cleared and reused so
119/// dispatchers can check resident catalog identity without allocating on every
120/// cache-hit dispatch.
121pub fn accepted_rule_fingerprints_into(
122    rules: &[BatchRuleProgram],
123    fingerprints: &mut Vec<[u8; 32]>,
124    occupied: &mut Vec<bool>,
125    addressed: &mut Vec<bool>,
126) -> Vec<BatchRuleRejection> {
127    let mut rejections = Vec::new();
128    accepted_rule_fingerprints_and_rejections_into(
129        rules,
130        fingerprints,
131        occupied,
132        addressed,
133        &mut rejections,
134    );
135    rejections
136}
137
138/// Fill caller-owned storage with fingerprints and rejection details for valid
139/// dense catalog entries.
140///
141/// This is the allocation-stable form used by hot dispatchers. All scratch
142/// vectors are cleared and reused; valid unchanged catalogs perform no host
143/// allocations while checking resident rule-buffer identity.
144pub fn accepted_rule_fingerprints_and_rejections_into(
145    rules: &[BatchRuleProgram],
146    fingerprints: &mut Vec<[u8; 32]>,
147    occupied: &mut Vec<bool>,
148    addressed: &mut Vec<bool>,
149    rejections: &mut Vec<BatchRuleRejection>,
150) {
151    fingerprints.clear();
152    fingerprints.resize(rules.len(), [0; 32]);
153    occupied.clear();
154    occupied.resize(rules.len(), false);
155    addressed.clear();
156    addressed.resize(rules.len(), false);
157    rejections.clear();
158
159    for rule in rules {
160        mark_addressed(addressed, rule.rule_idx);
161        match validate_rule_shape(
162            rule.rule_idx,
163            &rule.transitions,
164            &rule.accept,
165            rule.state_count,
166        ) {
167            Ok(()) => match claim_dense_index(occupied, rule.rule_idx, rules.len()) {
168                Ok(index) => fingerprints[index] = rule_fingerprint(rule),
169                Err(rejection) => rejections.push(rejection),
170            },
171            Err(error) => rejections.push(BatchRuleRejection {
172                rule_idx: Some(rule.rule_idx),
173                reason: error.to_string(),
174            }),
175        }
176    }
177
178    extend_missing_rejections(occupied, addressed, rejections);
179    let mut write = 0;
180    for read in 0..occupied.len() {
181        if occupied[read] {
182            fingerprints[write] = fingerprints[read];
183            write += 1;
184        }
185    }
186    fingerprints.truncate(write);
187}
188
189/// Pack valid DFA rules into compact shared device tables.
190///
191/// Rules with identical `(transitions, accept, state_count)` share backing
192/// transition and accept storage while retaining distinct dense metadata slots.
193pub fn pack_rule_catalog(rules: &[BatchRuleProgram]) -> Result<PackedRuleCatalog, PipelineError> {
194    let mut scratch = RuleCatalogPackingScratch::default();
195    pack_rule_catalog_into(rules, &mut scratch)?;
196    Ok(PackedRuleCatalog {
197        rule_meta: scratch.rule_meta,
198        transitions: scratch.transitions,
199        accept: scratch.accept,
200        rejected_rules: scratch.rejected_rules,
201    })
202}
203
204/// Pack valid DFA rules into caller-owned storage.
205///
206/// Existing vector and hash-map allocations in `scratch` are reused across
207/// calls. This is the hot-path form for resident megakernel dispatchers that
208/// refresh device rule buffers repeatedly.
209pub fn pack_rule_catalog_into(
210    rules: &[BatchRuleProgram],
211    scratch: &mut RuleCatalogPackingScratch,
212) -> Result<(), PipelineError> {
213    scratch.unique_storage.clear();
214    reserve_catalog_map(
215        &mut scratch.unique_storage,
216        rules.len(),
217        "unique DFA storage",
218    )?;
219    scratch.transitions.clear();
220    reserve_catalog_vec(
221        &mut scratch.transitions,
222        ALPHABET_SIZE_USIZE,
223        "inert transition row",
224    )?;
225    scratch.transitions.resize(ALPHABET_SIZE_USIZE, 0);
226    scratch.accept.clear();
227    reserve_catalog_vec(&mut scratch.accept, 1, "inert accept row")?;
228    scratch.accept.push(0);
229    scratch.rule_meta.clear();
230    reserve_catalog_vec(&mut scratch.rule_meta, rules.len(), "rule metadata")?;
231    scratch.rule_meta.resize(
232        rules.len(),
233        RuleMeta {
234            transition_base: 0,
235            accept_base: 0,
236            state_count: 1,
237        },
238    );
239    scratch.rejected_rules.clear();
240    reserve_catalog_vec(
241        &mut scratch.rejected_rules,
242        rules.len(),
243        "rule rejection rows",
244    )?;
245    scratch.occupied.clear();
246    reserve_catalog_vec(&mut scratch.occupied, rules.len(), "dense occupancy bitmap")?;
247    scratch.occupied.resize(rules.len(), false);
248    scratch.addressed.clear();
249    reserve_catalog_vec(
250        &mut scratch.addressed,
251        rules.len(),
252        "dense addressed bitmap",
253    )?;
254    scratch.addressed.resize(rules.len(), false);
255
256    for rule in rules {
257        mark_addressed(&mut scratch.addressed, rule.rule_idx);
258        if let Err(error) = validate_rule_shape(
259            rule.rule_idx,
260            &rule.transitions,
261            &rule.accept,
262            rule.state_count,
263        ) {
264            scratch.rejected_rules.push(BatchRuleRejection {
265                rule_idx: Some(rule.rule_idx),
266                reason: error.to_string(),
267            });
268            continue;
269        }
270
271        let meta_index = match claim_dense_index(
272            &mut scratch.occupied,
273            rule.rule_idx,
274            scratch.rule_meta.len(),
275        ) {
276            Ok(index) => index,
277            Err(rejection) => {
278                scratch.rejected_rules.push(rejection);
279                continue;
280            }
281        };
282
283        let storage_fingerprint = dfa_storage_fingerprint(rule);
284        let (transition_base, accept_base, state_count) = if let Some(layout) =
285            scratch.unique_storage.get(&storage_fingerprint)
286        {
287            *layout
288        } else {
289            let transition_base =
290                u32::try_from(scratch.transitions.len()).map_err(|_| PipelineError::QueueFull {
291                    queue: "submission",
292                    fix: "flattened transition table exceeds u32::MAX words; split the rule catalog into smaller groups",
293                })?;
294            let accept_base = u32::try_from(scratch.accept.len()).map_err(|_| PipelineError::QueueFull {
295                queue: "submission",
296                fix: "flattened accept table exceeds u32::MAX words; split the rule catalog into smaller groups",
297            })?;
298            let transition_target = scratch
299                .transitions
300                .len()
301                .checked_add(rule.transitions.len())
302                .ok_or(PipelineError::QueueFull {
303                    queue: "submission",
304                    fix: "flattened transition table length overflows usize; split the rule catalog into smaller groups",
305                })?;
306            reserve_catalog_vec(
307                &mut scratch.transitions,
308                transition_target,
309                "flattened transition storage",
310            )?;
311            let accept_target = scratch
312                .accept
313                .len()
314                .checked_add(rule.accept.len())
315                .ok_or(PipelineError::QueueFull {
316                    queue: "submission",
317                    fix: "flattened accept table length overflows usize; split the rule catalog into smaller groups",
318                })?;
319            reserve_catalog_vec(
320                &mut scratch.accept,
321                accept_target,
322                "flattened accept storage",
323            )?;
324            scratch.transitions.extend_from_slice(&rule.transitions);
325            scratch.accept.extend_from_slice(&rule.accept);
326            scratch.unique_storage.insert(
327                storage_fingerprint,
328                (transition_base, accept_base, rule.state_count),
329            );
330            (transition_base, accept_base, rule.state_count)
331        };
332        scratch.rule_meta[meta_index] = RuleMeta {
333            transition_base,
334            accept_base,
335            state_count,
336        };
337    }
338
339    extend_missing_rejections(
340        &scratch.occupied,
341        &scratch.addressed,
342        &mut scratch.rejected_rules,
343    );
344    Ok(())
345}
346
347fn validate_rule_shape(
348    rule_idx: u32,
349    transitions: &[u32],
350    accept: &[u32],
351    state_count: u32,
352) -> Result<(), PipelineError> {
353    let expected_transitions = usize::try_from(state_count)
354        .ok()
355        .and_then(|count| count.checked_mul(ALPHABET_SIZE_USIZE))
356        .ok_or_else(|| {
357            PipelineError::Backend("rule transition table size overflowed usize".to_string())
358        })?;
359    if transitions.len() != expected_transitions {
360        return Err(PipelineError::Backend(format!(
361            "rule {rule_idx} transition table has {} words, expected {expected_transitions}. Fix: compile a dense state_count * 256 DFA table before batch dispatch.",
362            transitions.len()
363        )));
364    }
365    let state_count_usize = usize::try_from(state_count).map_err(|source| {
366        PipelineError::Backend(format!(
367            "rule {rule_idx} state_count {state_count} cannot fit usize: {source}. Fix: shard the DFA state space before batch dispatch."
368        ))
369    })?;
370    if accept.len() != state_count_usize {
371        return Err(PipelineError::Backend(format!(
372            "rule {rule_idx} accept table has {} words, expected {state_count}. Fix: emit one accept entry per DFA state before batch dispatch.",
373            accept.len()
374        )));
375    }
376    Ok(())
377}
378
379fn rule_fingerprint(rule: &BatchRuleProgram) -> [u8; 32] {
380    let mut hasher = blake3::Hasher::new();
381    hasher.update(&rule.rule_idx.to_le_bytes());
382    hasher.update(bytemuck::cast_slice(&rule.transitions));
383    hasher.update(bytemuck::cast_slice(&rule.accept));
384    hasher.update(&rule.state_count.to_le_bytes());
385    *hasher.finalize().as_bytes()
386}
387
388fn dfa_storage_fingerprint(rule: &BatchRuleProgram) -> [u8; 32] {
389    let mut hasher = blake3::Hasher::new();
390    hasher.update(bytemuck::cast_slice(&rule.transitions));
391    hasher.update(bytemuck::cast_slice(&rule.accept));
392    hasher.update(&rule.state_count.to_le_bytes());
393    *hasher.finalize().as_bytes()
394}
395
/// Flag the dense slot named by `rule_idx` as addressed by some input rule.
///
/// Indices that do not fit `usize` or fall past the end of `addressed` are
/// ignored silently. Such rules are rejected elsewhere, by validation or by
/// dense-slot assignment.
fn mark_addressed(addressed: &mut [bool], rule_idx: u32) {
    let Ok(slot) = usize::try_from(rule_idx) else {
        return;
    };
    if let Some(flag) = addressed.get_mut(slot) {
        *flag = true;
    }
}
404
405fn claim_dense_index(
406    occupied: &mut [bool],
407    rule_idx: u32,
408    slot_count: usize,
409) -> Result<usize, BatchRuleRejection> {
410    let Some(meta_index) = usize::try_from(rule_idx).ok() else {
411        return Err(BatchRuleRejection {
412            rule_idx: Some(rule_idx),
413            reason: "rule_idx exceeds usize. Fix: rebuild the batch with a smaller rule catalog"
414                .to_string(),
415        });
416    };
417    if meta_index >= slot_count {
418        return Err(BatchRuleRejection {
419            rule_idx: Some(rule_idx),
420            reason: format!(
421                "rule_idx {rule_idx} falls outside 0..{slot_count}. Fix: keep the rule catalog dense so the batch work queue can address every rule"
422            ),
423        });
424    }
425    if occupied[meta_index] {
426        return Err(BatchRuleRejection {
427            rule_idx: Some(rule_idx),
428            reason: format!(
429                "duplicate rule_idx {rule_idx}. Fix: keep exactly one rule per dense rule-table slot"
430            ),
431        });
432    }
433    occupied[meta_index] = true;
434    Ok(meta_index)
435}
436
437fn extend_missing_rejections(
438    occupied: &[bool],
439    addressed: &[bool],
440    out: &mut Vec<BatchRuleRejection>,
441) {
442    for (rule_idx, (occupied, addressed)) in occupied
443        .iter()
444        .copied()
445        .zip(addressed.iter().copied())
446        .enumerate()
447    {
448        if !occupied && !addressed {
449            let rule_idx_u32 = u32::try_from(rule_idx).unwrap_or_else(|source| {
450                panic!(
451                    "rule catalog dense index {rule_idx} cannot fit u32: {source}. Fix: shard the rule catalog before rejection reporting."
452                )
453            });
454            out.push(BatchRuleRejection {
455                rule_idx: Some(rule_idx_u32),
456                reason: format!(
457                    "rule_idx {rule_idx} has no valid catalog entry. Fix: provide a well-formed DFA for every dense rule slot before batch dispatch"
458                ),
459            });
460        }
461    }
462}
463
#[cfg(test)]
// Unit tests for catalog packing: DFA storage sharing, allocation reuse, and
// isolation of malformed rules into inert slots.
mod tests {
    use super::*;

    // Two rules with byte-identical DFAs must point at the same storage bases,
    // and that DFA is stored exactly once after the inert row.
    #[test]
    fn duplicate_dfas_share_catalog_storage() {
        let first = BatchRuleProgram::new(0, vec![0; 256], vec![0], 1).unwrap();
        let second = BatchRuleProgram::new(1, vec![0; 256], vec![0], 1).unwrap();
        let packed = pack_rule_catalog(&[first, second]).unwrap();
        assert_eq!(
            packed.rule_meta[0].transition_base,
            packed.rule_meta[1].transition_base
        );
        assert_eq!(
            packed.rule_meta[0].accept_base,
            packed.rule_meta[1].accept_base
        );
        assert_eq!(
            packed.transitions.len(),
            packed.rule_meta[0].transition_base as usize + ALPHABET_SIZE as usize
        );
        assert_eq!(
            packed.accept.len(),
            packed.rule_meta[0].accept_base as usize + 1
        );
        assert!(packed.rejected_rules.is_empty());
    }

    // 32 identical rules: the tables hold only the inert row plus one shared
    // DFA, and capacity must not be sized as if each rule had its own copy.
    #[test]
    fn duplicate_dfas_do_not_reserve_raw_duplicate_storage() {
        let rules = (0..32)
            .map(|rule_idx| BatchRuleProgram::new(rule_idx, vec![0; 256], vec![0], 1).unwrap())
            .collect::<Vec<_>>();

        let packed = pack_rule_catalog(&rules).unwrap();

        assert_eq!(packed.transitions.len(), ALPHABET_SIZE as usize * 2);
        assert!(
            packed.transitions.capacity() < ALPHABET_SIZE as usize * rules.len(),
            "Fix: duplicate DFA catalogs must not reserve memory as if every rule had unique transition storage."
        );
        assert_eq!(packed.accept.len(), 2);
        assert!(
            packed.accept.capacity() < rules.len(),
            "Fix: duplicate DFA catalogs must not reserve accept storage for every duplicate rule."
        );
    }

    // Pre-sized caller buffers must be reused in place: their heap pointers
    // stay the same across the fingerprint pass.
    #[test]
    fn accepted_rule_fingerprints_into_reuses_caller_storage() {
        let rules = (0..8)
            .map(|rule_idx| BatchRuleProgram::new(rule_idx, vec![0; 256], vec![0], 1).unwrap())
            .collect::<Vec<_>>();
        let mut fingerprints = Vec::with_capacity(16);
        let mut occupied = Vec::with_capacity(16);
        let mut addressed = Vec::with_capacity(16);
        let fingerprint_ptr = fingerprints.as_ptr();
        let occupied_ptr = occupied.as_ptr();
        let addressed_ptr = addressed.as_ptr();

        let rejections = accepted_rule_fingerprints_into(
            &rules,
            &mut fingerprints,
            &mut occupied,
            &mut addressed,
        );

        assert!(rejections.is_empty());
        assert_eq!(fingerprints.len(), rules.len());
        assert_eq!(fingerprints.as_ptr(), fingerprint_ptr);
        assert_eq!(occupied.as_ptr(), occupied_ptr);
        assert_eq!(addressed.as_ptr(), addressed_ptr);
    }

    // A malformed rule (built with a struct literal to bypass `new`) is
    // rejected. Its slot keeps the inert entry (bases 0, one state), whose row
    // is all zeros, while the valid rule is stored after the inert row.
    #[test]
    fn invalid_rules_are_isolated_to_inert_catalog_entries() {
        let valid = BatchRuleProgram::new(0, vec![0; 256], vec![1], 1).unwrap();
        let invalid = BatchRuleProgram {
            rule_idx: 1,
            transitions: vec![0; 8],
            accept: vec![0],
            state_count: 1,
        };

        let packed = pack_rule_catalog(&[valid, invalid]).unwrap();
        assert_eq!(packed.rejected_rules.len(), 1);
        assert_eq!(packed.rejected_rules[0].rule_idx, Some(1));
        assert_eq!(packed.rule_meta[0].state_count, 1);
        assert!(packed.rule_meta[0].transition_base >= ALPHABET_SIZE);
        assert_eq!(packed.rule_meta[1].transition_base, 0);
        assert_eq!(packed.rule_meta[1].accept_base, 0);
        assert_eq!(packed.rule_meta[1].state_count, 1);
        assert_eq!(
            &packed.transitions[..ALPHABET_SIZE as usize],
            &vec![0; ALPHABET_SIZE as usize]
        );
        assert_eq!(packed.accept[0], 0);
    }
}
564