use vyre::{BackendError, DispatchConfig, VyreBackend};
use vyre_driver::Resource;
use vyre_foundation::ir::Program;
use vyre_foundation::match_result::Match;
use super::dispatch_io;
use super::mega_scan::{hit_buffer_byte_len, RulePipeline};
/// Resident-buffer session for a [`RulePipeline`].
///
/// Built by `RulePipeline::prepare_resident`, which allocates four
/// backend-resident buffers and uploads the transition and epsilon tables once.
/// Each scan then uploads only the packed haystack and a 4-byte zero to the
/// start of the hit buffer before dispatching `program`.
///
/// `free` hands every resident buffer back to the backend.
pub struct ResidentRulePipeline {
/// Clone of the source pipeline's program; dispatched on every scan.
program: Program,
/// Resident buffer that receives the packed haystack at offset 0 on each scan.
haystack: Resource,
/// Resident copy of the pipeline's transition table (uploaded at prepare time).
transition: Resource,
/// Resident copy of the pipeline's epsilon table (uploaded at prepare time).
epsilon: Resource,
/// Resident hit buffer, sized by `hit_buffer_byte_len(max_matches)`.
hits: Resource,
/// Byte size the `haystack` buffer was allocated with; packed haystacks
/// longer than this are rejected before upload.
haystack_capacity: usize,
/// Hit-count cap; a scan that reports more hits than this returns an error.
max_matches: u32,
}
// Compile-time guarantee that `ResidentRulePipeline` stays `Send + Sync`.
// The closure is never called; type-checking its body is the whole point.
const _: fn() = || {
    fn assert_send_sync<T: Send + Sync>() {}
    assert_send_sync::<ResidentRulePipeline>();
};
impl RulePipeline {
pub fn prepare_resident(
&self,
backend: &dyn VyreBackend,
haystack_capacity_bytes: usize,
max_matches: u32,
) -> Result<ResidentRulePipeline, BackendError> {
let haystack_capacity = dispatch_io::haystack_padded_u32_byte_len(haystack_capacity_bytes)?;
let haystack = backend.allocate_resident(haystack_capacity)?;
let transition_bytes = dispatch_io::u32_words_as_le_bytes(&self.transition_table);
let transition = backend.allocate_resident(transition_bytes.len())?;
backend.upload_resident(&transition, transition_bytes.as_ref())?;
let epsilon_bytes = dispatch_io::u32_words_as_le_bytes(&self.epsilon_table);
let epsilon = backend.allocate_resident(epsilon_bytes.len())?;
backend.upload_resident(&epsilon, epsilon_bytes.as_ref())?;
let hit_capacity = hit_buffer_byte_len(max_matches)?;
let hits = backend.allocate_resident(hit_capacity)?;
Ok(ResidentRulePipeline {
program: self.program.clone(),
haystack,
transition,
epsilon,
hits,
haystack_capacity,
max_matches,
})
}
}
impl ResidentRulePipeline {
    /// Scans `haystack` without a caller-supplied scan bound.
    ///
    /// Equivalent to [`Self::scan_bounded_into`] with `max_scan_bytes` set to
    /// `u32::MAX`.
    ///
    /// # Errors
    ///
    /// Propagates every error of [`Self::scan_bounded_into`].
    pub fn scan_into(
        &self,
        backend: &dyn VyreBackend,
        haystack: &[u8],
        matches: &mut Vec<Match>,
        scratch: &mut Vec<u8>,
    ) -> Result<(), BackendError> {
        let unbounded = u32::MAX;
        self.scan_bounded_into(backend, haystack, unbounded, matches, scratch)
    }

    /// Runs one resident scan over `haystack` and decodes the hits into
    /// `matches`.
    ///
    /// `matches` is cleared up front, so it is empty on every error path.
    /// `scratch` is a caller-owned staging buffer that receives the packed
    /// haystack. `max_scan_bytes` is forwarded to the program as a
    /// little-endian `u32` in the sixth bound resource.
    ///
    /// # Errors
    ///
    /// Fails when the scan guard or packing rejects the haystack, when the
    /// packed haystack exceeds the resident capacity, when any backend call
    /// fails, when the hit buffer cannot be read or decoded, or when the
    /// reported hit count is above `max_matches`.
    pub fn scan_bounded_into(
        &self,
        backend: &dyn VyreBackend,
        haystack: &[u8],
        max_scan_bytes: u32,
        matches: &mut Vec<Match>,
        scratch: &mut Vec<u8>,
    ) -> Result<(), BackendError> {
        const HIT_BUFFER_LABEL: &str = "ResidentRulePipeline hit buffer";

        // Drop results of any previous scan before the first fallible step.
        matches.clear();

        let scanned_len = dispatch_io::scan_guard(
            haystack,
            "ResidentRulePipeline::scan",
            dispatch_io::DEFAULT_MAX_SCAN_BYTES,
        )?;

        dispatch_io::pack_haystack_u32_into(haystack, scratch)?;
        let packed_len = scratch.len();
        if packed_len > self.haystack_capacity {
            let message = format!(
                "ResidentRulePipeline haystack is {} packed byte(s) but the resident buffer holds {}. Fix: raise haystack_capacity_bytes in prepare_resident or shard the haystack.",
                packed_len,
                self.haystack_capacity
            );
            return Err(BackendError::new(message));
        }

        // Per-scan traffic: the packed haystack, then a zeroed u32 over the
        // first four bytes of the hit buffer (the count prefix read back below).
        backend.upload_resident_at(&self.haystack, 0, scratch.as_slice())?;
        let zeroed_count = 0u32.to_le_bytes();
        backend.upload_resident_at(&self.hits, 0, &zeroed_count)?;

        let length_word = Resource::Borrowed(scanned_len.to_le_bytes().to_vec());
        let bound_word = Resource::Borrowed(max_scan_bytes.to_le_bytes().to_vec());
        let bindings = [
            self.haystack.clone(),
            self.transition.clone(),
            self.epsilon.clone(),
            self.hits.clone(),
            length_word,
            bound_word,
        ];

        // The grid's first dimension follows the haystack length, clamped to at
        // least 1 so an empty haystack still yields a non-zero grid.
        let mut dispatch_config = DispatchConfig::default();
        dispatch_config.grid_override = Some([scanned_len.max(1), 1, 1]);

        let timed = backend.dispatch_resident_timed(&self.program, &bindings, &dispatch_config)?;
        let hit_bytes = dispatch_io::try_output_bytes(&timed.outputs, 0, HIT_BUFFER_LABEL)?;
        let count = dispatch_io::try_read_u32_prefix(hit_bytes, HIT_BUFFER_LABEL)?;

        // Refuse to truncate: a count above the cap means matches would be lost.
        if count > self.max_matches {
            let message = format!(
                "ResidentRulePipeline hit count {count} exceeds the resident cap {}. Fix: re-dispatch this batch through the per-batch-sized borrowed RulePipeline::scan (truncation would drop matches).",
                self.max_matches
            );
            return Err(BackendError::new(message));
        }

        let triples = &hit_bytes[4..];
        dispatch_io::try_unpack_match_triples_exact_prefix_into(triples, count, matches)
    }

    /// Hit-count cap this session was prepared with.
    #[must_use]
    pub fn max_matches(&self) -> u32 {
        self.max_matches
    }

    /// Byte size of the resident haystack buffer.
    #[must_use]
    pub fn haystack_capacity(&self) -> usize {
        self.haystack_capacity
    }

    /// Consumes the session and frees all four resident buffers.
    ///
    /// # Errors
    ///
    /// Every buffer is passed to `free_resident` even if an earlier free
    /// failed; the first failure encountered is the one returned.
    pub fn free(self, backend: &dyn VyreBackend) -> Result<(), BackendError> {
        let Self {
            haystack,
            transition,
            epsilon,
            hits,
            ..
        } = self;

        let mut outcome = Ok(());
        for buffer in [haystack, transition, epsilon, hits] {
            let freed = backend.free_resident(buffer);
            // Only the first failure is kept; later results are dropped once
            // an error has been recorded.
            if outcome.is_ok() {
                outcome = freed;
            }
        }
        outcome
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
    use std::sync::Mutex;
    use vyre::DispatchConfig as Config;
    use vyre_driver::TimedDispatchResult;
    use vyre_foundation::ir::Program;

    // Backend double that records allocations and upload counts and answers
    // every resident dispatch with a canned hit buffer.
    struct MockResidentBackend {
        next_id: AtomicU64,
        allocations: Mutex<Vec<(u64, usize)>>,
        full_uploads: AtomicUsize,
        ranged_uploads: AtomicUsize,
        hit_buffer: Vec<u8>,
    }

    impl MockResidentBackend {
        // Starts handle numbering at 1 with all counters at zero.
        fn new(hit_buffer: Vec<u8>) -> Self {
            Self {
                next_id: AtomicU64::new(1),
                allocations: Mutex::new(Vec::new()),
                full_uploads: AtomicUsize::new(0),
                ranged_uploads: AtomicUsize::new(0),
                hit_buffer,
            }
        }
    }

    impl vyre::backend::private::Sealed for MockResidentBackend {}

    impl VyreBackend for MockResidentBackend {
        fn id(&self) -> &'static str {
            "mock-resident"
        }

        fn dispatch(
            &self,
            _program: &Program,
            _inputs: &[Vec<u8>],
            _config: &Config,
        ) -> Result<Vec<Vec<u8>>, BackendError> {
            unreachable!("resident path does not use borrowed dispatch")
        }

        fn allocate_resident(&self, byte_len: usize) -> Result<Resource, BackendError> {
            let handle = self.next_id.fetch_add(1, Ordering::Relaxed);
            {
                let mut log = self.allocations.lock().expect("mock allocations mutex");
                log.push((handle, byte_len));
            }
            Ok(Resource::Resident(handle))
        }

        fn upload_resident(&self, _resource: &Resource, _bytes: &[u8]) -> Result<(), BackendError> {
            self.full_uploads.fetch_add(1, Ordering::Relaxed);
            Ok(())
        }

        fn upload_resident_at(
            &self,
            _resource: &Resource,
            _dst_offset_bytes: usize,
            _bytes: &[u8],
        ) -> Result<(), BackendError> {
            self.ranged_uploads.fetch_add(1, Ordering::Relaxed);
            Ok(())
        }

        fn free_resident(&self, _resource: Resource) -> Result<(), BackendError> {
            Ok(())
        }

        fn dispatch_resident_timed(
            &self,
            _program: &Program,
            resources: &[Resource],
            config: &Config,
        ) -> Result<TimedDispatchResult, BackendError> {
            assert_eq!(resources.len(), 6, "nfa_scan binds six buffers");

            let tables_are_resident = resources[1..=2]
                .iter()
                .all(|binding| matches!(binding, Resource::Resident(_)));
            assert!(
                tables_are_resident,
                "transition + epsilon tables must be resident, not re-uploaded"
            );

            assert!(
                config.grid_override.is_some(),
                "resident scan must supply candidate-start grid override"
            );

            let outputs = vec![self.hit_buffer.clone()];
            Ok(TimedDispatchResult {
                outputs,
                wall_ns: 0,
                device_ns: None,
                enqueue_ns: None,
                wait_ns: None,
            })
        }
    }

    // Encodes a hit buffer: little-endian u32 count followed by one
    // (pid, start, end) triple of little-endian u32 words per match.
    fn hit_buffer_with(matches: &[(u32, u32, u32)]) -> Vec<u8> {
        let count_prefix = (matches.len() as u32).to_le_bytes();
        let mut encoded = Vec::with_capacity(count_prefix.len() + matches.len() * 12);
        encoded.extend_from_slice(&count_prefix);
        for &(pid, start, end) in matches {
            for word in [pid, start, end] {
                encoded.extend_from_slice(&word.to_le_bytes());
            }
        }
        encoded
    }

    #[test]
    fn prepare_resident_uploads_tables_once_then_scans_transfer_only_haystack() {
        let pipeline = super::super::mega_scan::build(&["ab", "cd"], "input", "hits", 4096);
        let backend = MockResidentBackend::new(hit_buffer_with(&[(0, 1, 3), (1, 5, 7)]));
        let session = pipeline
            .prepare_resident(&backend, 4096, 64)
            .expect("mock backend supports resident allocation");

        let full_uploads = || backend.full_uploads.load(Ordering::Relaxed);
        let ranged_uploads = || backend.ranged_uploads.load(Ordering::Relaxed);

        // Prepare: four buffers, two whole-buffer table uploads, no ranged ones.
        assert_eq!(backend.allocations.lock().unwrap().len(), 4);
        assert_eq!(full_uploads(), 2);
        assert_eq!(ranged_uploads(), 0);

        let mut scratch = Vec::new();
        let mut matches = Vec::new();
        for _pass in 0..3 {
            session
                .scan_into(&backend, b"zabqcd", &mut matches, &mut scratch)
                .expect("resident scan decodes canned hits");
        }

        let expected = vec![Match::new(0, 1, 3), Match::new(1, 5, 7)];
        assert_eq!(matches, expected);
        assert_eq!(full_uploads(), 2, "tables re-uploaded mid-loop");
        assert_eq!(
            ranged_uploads(),
            6,
            "3 scans × (haystack + counter reset)"
        );
    }

    #[test]
    fn scan_rejects_truncating_hit_count_instead_of_dropping_matches() {
        let pipeline = super::super::mega_scan::build(&["ab"], "input", "hits", 64);

        // Count prefix claims 9 hits; the payload only has room for 4 triples.
        let mut canned = 9u32.to_le_bytes().to_vec();
        canned.resize(canned.len() + 4 * 12, 0u8);
        let backend = MockResidentBackend::new(canned);

        let session = pipeline
            .prepare_resident(&backend, 64, 4)
            .expect("prepare with a 4-match cap");

        let mut scratch = Vec::new();
        // Pre-seeded entry proves the scan clears stale results before failing.
        let mut matches = vec![Match::new(7, 7, 7)];
        let err = session
            .scan_into(&backend, b"ab", &mut matches, &mut scratch)
            .expect_err("hit count over the resident cap must error, not truncate");

        assert!(
            err.to_string().contains("exceeds the resident cap") && matches.is_empty(),
            "truncation guard must name the cap and expose no partial matches: {err}"
        );
    }

    #[test]
    fn scan_rejects_haystack_larger_than_resident_capacity() {
        let pipeline = super::super::mega_scan::build(&["ab"], "input", "hits", 64);
        let backend = MockResidentBackend::new(hit_buffer_with(&[]));
        let session = pipeline
            .prepare_resident(&backend, 16, 8)
            .expect("prepare with a 16-byte haystack capacity");

        let oversized = [b'a'; 64];
        let mut scratch = Vec::new();
        let mut matches = Vec::new();
        let err = session
            .scan_into(&backend, &oversized, &mut matches, &mut scratch)
            .expect_err("64-byte haystack must not fit a 16-byte resident buffer");

        assert!(
            err.to_string().contains("resident buffer holds") && matches.is_empty(),
            "capacity error must name the limit and expose no stale matches: {err}"
        );
    }
}