use vyre::{BackendError, VyreBackend};
use vyre_driver::{Resource, TimedDispatchResult};
use vyre_foundation::ir::Program;
use super::dispatch_io;
use super::literal_set::{decode_presence_words_into, GpuLiteralSet};
/// Size in bytes of one `u32`; used to convert word counts into buffer byte lengths.
const U32_BYTES: usize = std::mem::size_of::<u32>();
/// Number of resources passed to a region-presence dispatch (checked against the
/// resource array built in `scan_into_timed`).
const PRESENCE_BY_REGION_BINDINGS: usize = 12;
/// Resident buffers and metadata for repeated region-presence scans.
///
/// Built by `GpuLiteralSet::prepare_resident_presence`, which allocates every
/// buffer and uploads the matcher tables once. Each scan then re-uploads only the
/// haystack, the presence reset, and three small control buffers. The buffers are
/// released through `free`, which hands each one to `VyreBackend::free_resident`.
#[derive(Debug)]
pub struct ResidentPresencePipeline {
/// Program dispatched by every scan.
program: Program,
/// Receives the packed haystack at offset 0 on every scan.
haystack: Resource,
// The four tables below are uploaded once at prepare time and never rewritten by a scan.
transitions: Resource,
output_offsets: Resource,
output_records: Resource,
pattern_lengths: Resource,
/// Presence bitmap: `max_regions * presence_words` u32 words. Each scan zeroes the
/// prefix it will use and decodes the result from dispatch output 0.
presence: Resource,
// Candidate filter tables; uploaded once at prepare time.
candidate_end_mask: Resource,
candidate_suffix2_mask: Resource,
candidate_suffix3_bloom: Resource,
/// One-u32 control buffer holding the current haystack length (little-endian).
haystack_len_buf: Resource,
/// `max_regions` u32 slots; unused trailing slots are filled with `u32::MAX` on each scan.
region_starts_buf: Resource,
/// One-u32 control buffer holding the caller-supplied `region_base` (little-endian).
region_base_buf: Resource,
/// Byte capacity of `haystack`, as returned by `dispatch_io::haystack_padded_u32_byte_len`.
haystack_capacity: usize,
/// Largest region count a single scan may carry.
max_regions: u32,
/// Pattern count reported by the prepared tables.
pattern_count: u32,
/// Presence words per region reported by the prepared tables.
presence_words: u32,
/// Workgroup width forwarded to `dispatch_io::byte_scan_dispatch_config`.
workgroup_x: u32,
}
// Compile-time check: the build fails if `ResidentPresencePipeline` ever stops
// being `Send + Sync`. Nothing here exists at runtime.
const _: () = {
    const fn require_send_sync<T>()
    where
        T: Send + Sync,
    {
    }
    let _ = require_send_sync::<ResidentPresencePipeline>;
};
impl GpuLiteralSet {
pub fn prepare_resident_presence(
&self,
backend: &dyn VyreBackend,
haystack_capacity_bytes: usize,
max_regions: u32,
) -> Result<ResidentPresencePipeline, BackendError> {
let tables = self.resident_presence_tables(max_regions)?;
let haystack_capacity = dispatch_io::haystack_padded_u32_byte_len(haystack_capacity_bytes)?;
let haystack = backend.allocate_resident(haystack_capacity)?;
let transitions = allocate_and_upload(backend, &tables.transitions)?;
let output_offsets = allocate_and_upload(backend, &tables.output_offsets)?;
let output_records = allocate_and_upload(backend, &tables.output_records)?;
let pattern_lengths = allocate_and_upload(backend, &tables.pattern_lengths)?;
let candidate_end_mask = allocate_and_upload(backend, &tables.candidate_end_mask)?;
let candidate_suffix2_mask =
allocate_and_upload(backend, &tables.candidate_suffix2_mask)?;
let candidate_suffix3_bloom =
allocate_and_upload(backend, &tables.candidate_suffix3_bloom)?;
let presence_capacity_words = (max_regions as usize)
.checked_mul(tables.presence_words as usize)
.ok_or_else(|| {
BackendError::new(format!(
"resident region-presence capacity {max_regions} regions × {} words/region overflows host usize. Fix: lower max_regions or shard the pattern set.",
tables.presence_words
))
})?;
let presence_capacity_bytes = presence_capacity_words
.checked_mul(U32_BYTES)
.ok_or_else(|| {
BackendError::new(
"resident region-presence presence-buffer byte capacity overflows host usize. Fix: lower max_regions or shard the pattern set.".to_string(),
)
})?;
let presence = backend.allocate_resident(presence_capacity_bytes)?;
let region_starts_capacity_bytes =
(max_regions as usize).checked_mul(U32_BYTES).ok_or_else(|| {
BackendError::new(
"resident region-presence region-starts byte capacity overflows host usize. Fix: lower max_regions.".to_string(),
)
})?;
let haystack_len_buf = backend.allocate_resident(U32_BYTES)?;
let region_starts_buf = backend.allocate_resident(region_starts_capacity_bytes)?;
let region_base_buf = backend.allocate_resident(U32_BYTES)?;
Ok(ResidentPresencePipeline {
program: tables.program,
haystack,
transitions,
output_offsets,
output_records,
pattern_lengths,
presence,
candidate_end_mask,
candidate_suffix2_mask,
candidate_suffix3_bloom,
haystack_len_buf,
region_starts_buf,
region_base_buf,
haystack_capacity,
max_regions,
pattern_count: tables.pattern_count,
presence_words: tables.presence_words,
workgroup_x: tables.workgroup_x,
})
}
}
/// Allocates a resident buffer of exactly `bytes.len()` bytes and uploads `bytes` into it.
///
/// # Errors
///
/// Returns the backend error from the allocation or the upload. When the upload
/// fails, the just-allocated buffer is passed to `free_resident` (best effort)
/// before the upload error is returned, so it is not leaked.
fn allocate_and_upload(backend: &dyn VyreBackend, bytes: &[u8]) -> Result<Resource, BackendError> {
    let resource = backend.allocate_resident(bytes.len())?;
    if let Err(upload_error) = backend.upload_resident(&resource, bytes) {
        // The upload error is the root cause; a secondary free failure is dropped.
        let _ = backend.free_resident(resource);
        return Err(upload_error);
    }
    Ok(resource)
}
impl ResidentPresencePipeline {
    /// Runs one region-presence scan and discards the timing information.
    ///
    /// Delegates to `scan_into_timed` with the same arguments; the argument
    /// contract and error conditions are the ones documented there.
    pub fn scan_into(
        &self,
        backend: &dyn VyreBackend,
        haystack: &[u8],
        region_starts: &[u32],
        region_base: u32,
        out: &mut Vec<u32>,
        scratch: &mut Vec<u8>,
    ) -> Result<(), BackendError> {
        self.scan_into_timed(backend, haystack, region_starts, region_base, out, scratch)?;
        Ok(())
    }

    /// Runs one region-presence scan against the resident buffers and returns the
    /// backend's timed dispatch result.
    ///
    /// * `haystack` - bytes to scan; packed into `scratch` and uploaded at offset 0.
    /// * `region_starts` - one start offset per region; must be non-empty, begin
    ///   with `0`, and hold at most `max_regions` entries.
    /// * `region_base` - uploaded verbatim (little-endian) to the region-base buffer.
    /// * `out` - cleared on entry; on success holds
    ///   `region_starts.len() * presence_words` decoded u32 words.
    /// * `scratch` - reusable staging buffer; its contents are overwritten.
    ///
    /// # Errors
    ///
    /// Fails before any upload when the region list is empty, does not start at 0,
    /// or exceeds `max_regions`, and when `dispatch_io::scan_guard` rejects the
    /// haystack. Also fails when the packed haystack exceeds the resident capacity,
    /// when a size computation overflows `usize`, when any backend call fails, or
    /// when the readback decodes to fewer words than the scan needs. `out` is left
    /// empty on every error path.
    pub fn scan_into_timed(
        &self,
        backend: &dyn VyreBackend,
        haystack: &[u8],
        region_starts: &[u32],
        region_base: u32,
        out: &mut Vec<u32>,
        scratch: &mut Vec<u8>,
    ) -> Result<TimedDispatchResult, BackendError> {
        // Clear first so no error path can expose a previous scan's bitmap.
        out.clear();

        let region_count = u32::try_from(region_starts.len()).map_err(|_| {
            BackendError::new(
                "resident region-presence: region count exceeds u32 GPU ABI".to_string(),
            )
        })?;
        if region_count == 0 {
            return Err(BackendError::new(
                "resident region-presence: region_starts must be non-empty. Fix: pass one start offset per coalesced file, beginning with 0.".to_string(),
            ));
        }
        if region_starts[0] != 0 {
            return Err(BackendError::new(
                "resident region-presence: region_starts[0] must be 0 (the kernel binary-search lower bound). Fix: the first coalesced file must start at offset 0.".to_string(),
            ));
        }
        if region_count > self.max_regions {
            return Err(BackendError::new(format!(
                "resident region-presence batch has {region_count} regions but the session was prepared for at most {}. Fix: raise max_regions in prepare_resident_presence, or dispatch this batch through the per-batch-sized borrowed GpuLiteralSet::scan_presence_by_region (a larger cap would index past the resident presence buffer).",
                self.max_regions
            )));
        }

        let haystack_len = dispatch_io::scan_guard(
            haystack,
            "ResidentPresencePipeline::scan",
            dispatch_io::DEFAULT_MAX_SCAN_BYTES,
        )?;

        // Stage the packed haystack and make sure it fits the resident buffer.
        dispatch_io::pack_haystack_u32_into(haystack, scratch)?;
        if scratch.len() > self.haystack_capacity {
            return Err(BackendError::new(format!(
                "ResidentPresencePipeline haystack is {} packed byte(s) but the resident buffer holds {}. Fix: raise haystack_capacity_bytes in prepare_resident_presence or shard the haystack.",
                scratch.len(),
                self.haystack_capacity
            )));
        }
        backend.upload_resident_at(&self.haystack, 0, scratch)?;

        // Only the prefix of the presence buffer that this batch uses is zeroed.
        let used_words = (region_count as usize)
            .checked_mul(self.presence_words as usize)
            .ok_or_else(|| {
                BackendError::new(
                    "resident region-presence used-word count overflows host usize. Fix: lower the region count or shard the pattern set.".to_string(),
                )
            })?;
        let reset_bytes = used_words.checked_mul(U32_BYTES).ok_or_else(|| {
            BackendError::new(
                "resident region-presence presence-reset byte count overflows host usize. Fix: lower the region count or shard the pattern set.".to_string(),
            )
        })?;
        scratch.clear();
        scratch.resize(reset_bytes, 0);
        backend.upload_resident_at(&self.presence, 0, scratch)?;

        // Single-u32 control buffers.
        backend.upload_resident_at(&self.haystack_len_buf, 0, &haystack_len.to_le_bytes())?;
        backend.upload_resident_at(&self.region_base_buf, 0, &region_base.to_le_bytes())?;

        // The region-starts buffer is always written at its full `max_regions`
        // width; slots past the batch's regions are filled with `u32::MAX`.
        scratch.clear();
        let region_starts_words = self.max_regions as usize;
        scratch.reserve(region_starts_words.saturating_mul(U32_BYTES));
        for &start in region_starts {
            scratch.extend_from_slice(&start.to_le_bytes());
        }
        for _ in (region_count as usize)..region_starts_words {
            scratch.extend_from_slice(&u32::MAX.to_le_bytes());
        }
        backend.upload_resident_at(&self.region_starts_buf, 0, scratch)?;

        // Resource order is the dispatch binding order; presence sits at index 6.
        let resources = [
            self.haystack.clone(),
            self.transitions.clone(),
            self.output_offsets.clone(),
            self.output_records.clone(),
            self.pattern_lengths.clone(),
            self.haystack_len_buf.clone(),
            self.presence.clone(),
            self.candidate_end_mask.clone(),
            self.candidate_suffix2_mask.clone(),
            self.candidate_suffix3_bloom.clone(),
            self.region_starts_buf.clone(),
            self.region_base_buf.clone(),
        ];
        debug_assert_eq!(resources.len(), PRESENCE_BY_REGION_BINDINGS);

        let config = dispatch_io::byte_scan_dispatch_config(haystack_len, self.workgroup_x);
        let timed = backend.dispatch_resident_timed(&self.program, &resources, &config)?;

        let presence_bytes = dispatch_io::try_output_bytes(
            &timed.outputs,
            0,
            "ResidentPresencePipeline presence buffer",
        )?;
        decode_presence_words_into(presence_bytes, used_words, out);
        if out.len() != used_words {
            // Never hand back a partially decoded bitmap.
            let returned = out.len();
            out.clear();
            return Err(BackendError::new(format!(
                "ResidentPresencePipeline presence readback returned {returned} u32 word(s) but the {region_count}-region scan needs {used_words}. Fix: ensure the backend reads back the full binding-6 presence resource."
            )));
        }
        Ok(timed)
    }

    /// Largest region count a single scan may carry.
    #[must_use]
    pub fn max_regions(&self) -> u32 {
        self.max_regions
    }

    /// Pattern count recorded when the pipeline was prepared.
    #[must_use]
    pub fn pattern_count(&self) -> u32 {
        self.pattern_count
    }

    /// Presence words per region recorded when the pipeline was prepared.
    #[must_use]
    pub fn presence_words(&self) -> u32 {
        self.presence_words
    }

    /// Byte capacity of the resident haystack buffer.
    #[must_use]
    pub fn haystack_capacity(&self) -> usize {
        self.haystack_capacity
    }

    /// Releases all twelve resident buffers through `backend.free_resident`.
    ///
    /// Every buffer is attempted even if an earlier free fails.
    ///
    /// # Errors
    ///
    /// Returns the first error reported by `free_resident`, if any.
    pub fn free(self, backend: &dyn VyreBackend) -> Result<(), BackendError> {
        let mut first_err = None;
        for resource in [
            self.haystack,
            self.transitions,
            self.output_offsets,
            self.output_records,
            self.pattern_lengths,
            self.presence,
            self.candidate_end_mask,
            self.candidate_suffix2_mask,
            self.candidate_suffix3_bloom,
            self.haystack_len_buf,
            self.region_starts_buf,
            self.region_base_buf,
        ] {
            if let Err(error) = backend.free_resident(resource) {
                first_err.get_or_insert(error);
            }
        }
        first_err.map_or(Ok(()), Err)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::BTreeSet;
    use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
    use std::sync::Mutex;
    use vyre::DispatchConfig as Config;
    use vyre_driver::TimedDispatchResult;

    /// Eight literals, so the compiled set needs exactly one presence word per region.
    const LITERALS: &[&[u8]] = &[
        b"key", b"token", b"secret", b"AKIA", b"ghp_", b"sk_live_", b"password", b"api",
    ];

    /// Backend double that records allocations and uploads and answers every
    /// resident dispatch with a canned presence buffer.
    struct MockResidentBackend {
        next_id: AtomicU64,
        /// `(handle, byte_len)` for each allocation, in call order.
        allocations: Mutex<Vec<(u64, usize)>>,
        full_uploads: AtomicUsize,
        ranged_uploads: AtomicUsize,
        /// Byte length of each ranged upload, in call order.
        ranged_upload_lens: Mutex<Vec<usize>>,
        /// Bytes returned as output 0 of every dispatch.
        presence_buffer: Vec<u8>,
    }

    impl MockResidentBackend {
        fn new(presence_buffer: Vec<u8>) -> Self {
            Self {
                next_id: AtomicU64::new(1),
                allocations: Mutex::new(Vec::new()),
                full_uploads: AtomicUsize::new(0),
                ranged_uploads: AtomicUsize::new(0),
                ranged_upload_lens: Mutex::new(Vec::new()),
                presence_buffer,
            }
        }
    }

    impl vyre::backend::private::Sealed for MockResidentBackend {}

    impl VyreBackend for MockResidentBackend {
        fn id(&self) -> &'static str {
            "mock-resident-presence"
        }

        fn dispatch(
            &self,
            _program: &Program,
            _inputs: &[Vec<u8>],
            _config: &Config,
        ) -> Result<Vec<Vec<u8>>, BackendError> {
            unreachable!("resident path does not use borrowed dispatch")
        }

        fn allocate_resident(&self, byte_len: usize) -> Result<Resource, BackendError> {
            let handle = self.next_id.fetch_add(1, Ordering::Relaxed);
            self.allocations
                .lock()
                .expect("mock allocations mutex")
                .push((handle, byte_len));
            Ok(Resource::Resident(handle))
        }

        fn upload_resident(&self, _resource: &Resource, _bytes: &[u8]) -> Result<(), BackendError> {
            self.full_uploads.fetch_add(1, Ordering::Relaxed);
            Ok(())
        }

        fn upload_resident_at(
            &self,
            _resource: &Resource,
            _dst_offset_bytes: usize,
            bytes: &[u8],
        ) -> Result<(), BackendError> {
            self.ranged_uploads.fetch_add(1, Ordering::Relaxed);
            self.ranged_upload_lens
                .lock()
                .expect("mock ranged-upload mutex")
                .push(bytes.len());
            Ok(())
        }

        fn free_resident(&self, _resource: Resource) -> Result<(), BackendError> {
            Ok(())
        }

        fn dispatch_resident_timed(
            &self,
            _program: &Program,
            resources: &[Resource],
            config: &Config,
        ) -> Result<TimedDispatchResult, BackendError> {
            assert_eq!(
                resources.len(),
                PRESENCE_BY_REGION_BINDINGS,
                "region-presence binds twelve buffers"
            );
            for (idx, resource) in resources.iter().enumerate() {
                assert!(
                    matches!(resource, Resource::Resident(_)),
                    "binding {idx} must be resident (no borrowed mix in a resident dispatch)"
                );
            }
            assert!(
                config.grid_override.is_some(),
                "resident region-presence scan must supply a byte-scan grid override"
            );
            Ok(TimedDispatchResult {
                outputs: vec![self.presence_buffer.clone()],
                wall_ns: 0,
                device_ns: None,
                enqueue_ns: None,
                wait_ns: None,
            })
        }
    }

    /// Pattern ids whose bit is set in a single presence word.
    fn present_ids(word: u32, pattern_count: u32) -> BTreeSet<u32> {
        (0..pattern_count).filter(|&p| (word >> p) & 1 == 1).collect()
    }

    #[test]
    fn prepare_uploads_tables_once_then_scans_transfer_only_haystack_and_reset() {
        let matcher = GpuLiteralSet::compile(LITERALS);
        let pattern_count = LITERALS.len() as u32;
        assert_eq!(pattern_count, 8);

        // Canned bitmap: three real rows plus a stale fourth word that a
        // three-region scan must ignore.
        let row0 = (1 << 0) | (1 << 1) | (1 << 2) | (1 << 3) | (1 << 7);
        let row1 = (1 << 4) | (1 << 5) | (1 << 6);
        let row2 = 0u32;
        let stale = 0xDEAD_BEEFu32;
        let mut canned = Vec::new();
        for w in [row0, row1, row2, stale] {
            canned.extend_from_slice(&w.to_le_bytes());
        }

        let backend = MockResidentBackend::new(canned);
        let session = matcher
            .prepare_resident_presence(&backend, 4096, 4)
            .expect("mock backend supports resident allocation");
        {
            let allocs = backend.allocations.lock().unwrap();
            assert_eq!(allocs.len(), 12, "haystack + 7 tables + presence + 3 controls");
            // 4 regions × 1 presence word per region.
            assert_eq!(allocs[8].1, 4 * U32_BYTES, "presence sized for max_regions");
            assert_eq!(allocs[9].1, U32_BYTES, "haystack_len control is one u32");
            assert_eq!(allocs[10].1, 4 * U32_BYTES, "region_starts sized for max_regions");
            assert_eq!(allocs[11].1, U32_BYTES, "region_base control is one u32");
        }
        assert_eq!(
            backend.full_uploads.load(Ordering::Relaxed),
            7,
            "seven immutable tables uploaded once each"
        );
        assert_eq!(backend.ranged_uploads.load(Ordering::Relaxed), 0);

        let haystack = b"aaa\nbbbb\nccc\n";
        let region_starts = [0u32, 4, 9];
        let mut out = Vec::new();
        let mut scratch = Vec::new();
        for _ in 0..3 {
            session
                .scan_into(&backend, haystack, &region_starts, 0, &mut out, &mut scratch)
                .expect("resident region-presence scan decodes canned bitmap");
        }
        assert_eq!(out, vec![row0, row1, row2], "3 regions × 1 word, stale tail ignored");
        assert_eq!(present_ids(out[0], pattern_count), BTreeSet::from([0, 1, 2, 3, 7]));
        assert_eq!(present_ids(out[1], pattern_count), BTreeSet::from([4, 5, 6]));
        assert_eq!(present_ids(out[2], pattern_count), BTreeSet::new());
        assert_eq!(
            backend.full_uploads.load(Ordering::Relaxed),
            7,
            "immutable tables re-uploaded mid-loop"
        );
        assert_eq!(
            backend.ranged_uploads.load(Ordering::Relaxed),
            15,
            "3 scans × 5 ranged uploads (haystack, presence reset, haystack_len, region_base, region_starts)"
        );

        // Each scan issues its five ranged uploads in a fixed order, so the n-th
        // upload of every scan sits at stride 5 in the recorded lengths.
        let lens = backend.ranged_upload_lens.lock().unwrap();
        let nth_of_each_scan = |offset: usize| -> Vec<usize> {
            lens.iter().skip(offset).step_by(5).copied().collect()
        };
        assert_eq!(
            nth_of_each_scan(1),
            vec![12, 12, 12],
            "each presence reset zeroes only the 3-region used prefix"
        );
        assert_eq!(
            nth_of_each_scan(2),
            vec![U32_BYTES, U32_BYTES, U32_BYTES],
            "haystack_len control is one u32 per scan"
        );
        assert_eq!(
            nth_of_each_scan(3),
            vec![U32_BYTES, U32_BYTES, U32_BYTES],
            "region_base control is one u32 per scan"
        );
        assert_eq!(
            nth_of_each_scan(4),
            vec![4 * U32_BYTES, 4 * U32_BYTES, 4 * U32_BYTES],
            "region_starts is uploaded padded to the full max_regions width every scan"
        );
    }

    #[test]
    fn scan_rejects_region_count_over_the_max_regions_cap() {
        let matcher = GpuLiteralSet::compile(LITERALS);
        let backend = MockResidentBackend::new(vec![0u8; 4]);
        let session = matcher
            .prepare_resident_presence(&backend, 4096, 2)
            .expect("prepare with a 2-region cap");
        let haystack = b"a\nb\nc\n";
        let region_starts = [0u32, 2, 4];
        // Pre-filled so the test can observe that the scan clears `out` on error.
        let mut out = vec![999];
        let mut scratch = Vec::new();
        let err = session
            .scan_into(&backend, haystack, &region_starts, 0, &mut out, &mut scratch)
            .expect_err("a batch over the resident region cap must error, not truncate");
        assert!(
            err.to_string().contains("session was prepared for at most 2") && out.is_empty(),
            "cap error must name the limit and expose no partial bitmap: {err}"
        );
        assert_eq!(
            backend.ranged_uploads.load(Ordering::Relaxed),
            0,
            "rejected batch must not stage any resident upload"
        );
    }

    #[test]
    fn scan_rejects_haystack_larger_than_resident_capacity() {
        let matcher = GpuLiteralSet::compile(LITERALS);
        let backend = MockResidentBackend::new(vec![0u8; 4]);
        let session = matcher
            .prepare_resident_presence(&backend, 8, 4)
            .expect("prepare with an 8-byte haystack capacity");
        let mut out = Vec::new();
        let mut scratch = Vec::new();
        let region_starts = [0u32];
        let err = session
            .scan_into(&backend, &[b'a'; 64], &region_starts, 0, &mut out, &mut scratch)
            .expect_err("64-byte haystack must not fit an 8-byte resident buffer");
        assert!(
            err.to_string().contains("resident buffer holds") && out.is_empty(),
            "capacity error must name the limit and expose no stale bitmap: {err}"
        );
    }

    #[test]
    fn prepare_rejects_zero_max_regions() {
        let matcher = GpuLiteralSet::compile(LITERALS);
        let backend = MockResidentBackend::new(vec![0u8; 4]);
        let err = matcher
            .prepare_resident_presence(&backend, 4096, 0)
            .expect_err("max_regions = 0 cannot size the presence buffer");
        assert!(
            err.to_string().contains("max_regions must be >= 1"),
            "zero-cap error must explain the cause: {err}"
        );
    }
}