vyre-libs 0.6.4

vyre Category A library ecosystem - pure-IR compositions over vyre-ops hardware primitives
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
//! Resident-buffer dispatch for [`RulePipeline`] (the regex/NFA mega-scan path).
//!
//! # Why this exists
//!
//! [`RulePipeline::scan`](super::mega_scan::RulePipeline::scan) issues every
//! dispatch through `dispatch_borrowed`, which re-creates GPU buffers and
//! **re-uploads the lane-major NFA transition table on every call**. That table
//! is `num_states × 256 × LANES_PER_SUBGROUP` u32s — tens of MiB for a large
//! detector set — and it is *immutable* across scans of the same pipeline. A
//! consumer that scans many buffers (a directory walk coalesced into batches)
//! pays that multi-MiB host→device transfer once per batch even though only the
//! haystack and the hit buffer actually change.
//!
//! [`ResidentRulePipeline`] uploads the transition and epsilon tables **once**
//! into backend-resident resources and keeps them resident for the lifetime of
//! the session. Each [`scan`](ResidentRulePipeline::scan_into) then transfers
//! only the haystack (a ranged upload into the resident haystack buffer), a
//! 4-byte hit-counter reset, and the two 4-byte control values (haystack length
//! and the per-workgroup scan bound), dispatches against the resident tables, and
//! decodes the hit buffer — the per-scan transfer drops from `O(tables + haystack)`
//! to `O(haystack)`. This is the regex-path counterpart of
//! [`GpuLiteralSet::prepare_scan_dispatch`](super::literal_set::GpuLiteralSet::prepare_scan_dispatch).
//!
//! The match wire format is byte-identical to [`RulePipeline::scan`] (slot 0 =
//! atomic counter, then `(pattern_id, start, end)` triples), so a consumer can
//! swap the borrowed path for a resident session without changing any
//! post-processing — proven by the host-orchestration unit test below and the
//! CUDA backend parity test
//! (`vyre-driver-cuda/tests/resident_rule_pipeline_cuda_parity.rs`).
//!
//! # Backend support
//!
//! Resident dispatch requires a backend that implements the resident half of
//! the [`VyreBackend`] contract (`allocate_resident`, `upload_resident*`,
//! `dispatch_resident_timed`). The wgpu and CUDA backends do; the CPU reference
//! does not. [`RulePipeline::prepare_resident`] returns the backend's
//! `UnsupportedFeature` error **loudly** — the caller must handle it explicitly
//! (fail closed, or a loud/recorded fallback), never degrade silently.

use vyre::{BackendError, DispatchConfig, VyreBackend};
use vyre_driver::Resource;
use vyre_foundation::ir::Program;
use vyre_foundation::match_result::Match;

use super::dispatch_io;
use super::mega_scan::{hit_buffer_byte_len, RulePipeline};

/// A [`RulePipeline`] with its immutable NFA tables uploaded into
/// backend-resident resources, ready for repeated low-overhead scans.
///
/// Construct with [`RulePipeline::prepare_resident`]. The session owns six
/// resident resources (haystack, transition table, epsilon table, hit buffer,
/// and the two 1-u32 control buffers holding the haystack length and the
/// max-scan-bytes bound); call [`free`](Self::free) to release them, or drop
/// the session and let the backend reclaim them when its device context is
/// torn down.
///
/// The session is `Send + Sync`: the resident handles are opaque ids and all
/// mutation happens through the borrowed `backend`, so a single session can be
/// shared across scan threads (each thread supplies its own packing scratch).
///
/// NOTE(review): every scan stages its haystack, hit-counter reset and control
/// values into the *same* resident buffers through `&self`, and this type holds
/// no lock. Concurrent scans on one session therefore appear to rely on the
/// caller (or the backend) serializing the whole upload → dispatch → readback
/// sequence — confirm before scanning from several threads at once.
#[derive(Debug)]
pub struct ResidentRulePipeline {
    /// The pipeline's compiled GPU program (cheap to hold; the heavy tables are
    /// resident, not in this clone).
    program: Program,
    /// Resident haystack buffer, sized to `haystack_capacity` padded bytes.
    haystack: Resource,
    /// Resident lane-major transition table (immutable, uploaded once).
    transition: Resource,
    /// Resident lane-major epsilon table (immutable, uploaded once).
    epsilon: Resource,
    /// Resident hit buffer (`max_matches × 3 + 1` u32s); only the counter in
    /// slot 0 is reset per scan.
    hits: Resource,
    /// Resident haystack-length control buffer (1 u32; re-uploaded per scan).
    haystack_len_buf: Resource,
    /// Resident max-scan-bytes control buffer (1 u32; re-uploaded per scan).
    max_scan_bytes_buf: Resource,
    /// Padded byte capacity of the resident haystack buffer; scans whose packed
    /// haystack exceeds it are rejected.
    haystack_capacity: usize,
    /// Match cap this session's hit buffer was sized for.
    max_matches: u32,
}

// Compile-time check mirroring the `RulePipeline`/`GpuLiteralSet` contract:
// `Resource` handles are plain ids and `Program` is `Send + Sync`, so the
// session must be too. The generic below only instantiates when both bounds
// hold, so adding a non-thread-safe field fails the build right here.
const _: () = {
    const fn require_send_sync<T: Send + Sync>() {}
    require_send_sync::<ResidentRulePipeline>();
};

impl RulePipeline {
    /// Upload this pipeline's immutable NFA tables into backend-resident
    /// resources and return a [`ResidentRulePipeline`] for repeated scans.
    ///
    /// `haystack_capacity_bytes` is the largest haystack the session will scan
    /// (e.g. the consumer's coalesced-batch cap); the resident haystack buffer
    /// is allocated once at that padded size and every scan uploads only its
    /// real bytes. `max_matches` sizes the resident hit buffer and caps decoded
    /// matches, exactly as in [`RulePipeline::scan`].
    ///
    /// # Errors
    ///
    /// Returns [`BackendError`] when the backend does not support resident
    /// resources, or when allocation / upload of the resident tables fails. The
    /// caller must handle this loudly (fail closed or a recorded fallback) —
    /// never degrade silently.
    pub fn prepare_resident(
        &self,
        backend: &dyn VyreBackend,
        haystack_capacity_bytes: usize,
        max_matches: u32,
    ) -> Result<ResidentRulePipeline, BackendError> {
        let haystack_capacity = dispatch_io::haystack_padded_u32_byte_len(haystack_capacity_bytes)?;

        // Fail closed early if the resident haystack would be SMALLER than the NFA
        // program's static input-buffer decl. A resident dispatch binds this buffer
        // to the program input (binding 0); the CUDA backend rejects a resident
        // buffer whose byte length is below the binding's `static_byte_len`
        // (`vyre-driver-cuda/.../resident_dispatch/batch.rs`), but wgpu tolerates an
        // oversized/undersized buffer. Without this guard an undersized
        // `haystack_capacity_bytes` PASSES on wgpu (the dev backend) and fails only
        // at dispatch on CUDA (a downstream consumer's backend) with a confusing late
        // `InvalidProgram` — a dev-passes / prod-fails backend divergence. Surface
        // it here, portably, at prepare time. Reuses `BufferDecl::static_byte_len`,
        // the exact source CUDA checks against, so a runtime-sized input
        // (`count == 0` → `Ok(None)`) correctly imposes no requirement.
        if let Some(input_decl) = self.program.buffers().iter().find(|decl| decl.binding == 0) {
            if let Some(required_bytes) = input_decl.static_byte_len().map_err(BackendError::new)? {
                if haystack_capacity < required_bytes {
                    return Err(BackendError::new(format!(
                        "ResidentRulePipeline::prepare_resident: the NFA program's input buffer (binding 0) statically declares {required_bytes} byte(s) (input_len = {input_len}), but the requested resident haystack capacity is only {haystack_capacity} padded byte(s). A resident dispatch binds this buffer to the program input; the CUDA backend requires the resident buffer to be at least the static decl, so this would fail at dispatch with a confusing late error (wgpu would silently tolerate it). Fix: pass haystack_capacity_bytes >= {input_len} to prepare_resident, or rebuild the RulePipeline (build / build_rule_pipeline) with a smaller input_len.",
                        input_len = self.plan.input_len,
                    )));
                }
            }
        }

        let haystack = backend.allocate_resident(haystack_capacity)?;

        let transition_bytes = dispatch_io::u32_words_as_le_bytes(&self.transition_table);
        let transition = backend.allocate_resident(transition_bytes.len())?;
        backend.upload_resident(&transition, transition_bytes.as_ref())?;

        let epsilon_bytes = dispatch_io::u32_words_as_le_bytes(&self.epsilon_table);
        let epsilon = backend.allocate_resident(epsilon_bytes.len())?;
        backend.upload_resident(&epsilon, epsilon_bytes.as_ref())?;

        let hit_capacity = hit_buffer_byte_len(max_matches)?;
        let hits = backend.allocate_resident(hit_capacity)?;

        // The two 1-u32 control buffers (haystack_len, max_scan_bytes) are ALSO
        // resident. The CUDA backend's resident dispatch rejects any borrowed
        // resource (it resolves every binding to a resident handle), so a resident
        // dispatch must be ALL-resident — a borrowed-control mix works on wgpu but
        // fails closed on CUDA. They are allocated once here and re-uploaded per scan.
        let control_byte_len = std::mem::size_of::<u32>();
        let haystack_len_buf = backend.allocate_resident(control_byte_len)?;
        let max_scan_bytes_buf = backend.allocate_resident(control_byte_len)?;

        Ok(ResidentRulePipeline {
            program: self.program.clone(),
            haystack,
            transition,
            epsilon,
            hits,
            haystack_len_buf,
            max_scan_bytes_buf,
            haystack_capacity,
            max_matches,
        })
    }
}

impl ResidentRulePipeline {
    /// Scan `haystack` against the resident pipeline, decoding matches into
    /// caller-owned `matches`. Equivalent to [`RulePipeline::scan`] but with the
    /// NFA tables already resident (no per-scan table transfer).
    ///
    /// `scratch` reuses the packed-haystack staging buffer across calls; pass a
    /// per-thread `Vec` that lives as long as the scan loop.
    ///
    /// Walks every workgroup to end-of-haystack (`max_scan_bytes = u32::MAX`),
    /// matching [`RulePipeline::scan`]. Use [`scan_bounded_into`](Self::scan_bounded_into)
    /// to cap per-workgroup work to the longest possible match length.
    ///
    /// # Errors
    /// Returns [`BackendError`] on upload, dispatch, or readback failure, or
    /// when `haystack` exceeds the session's configured capacity.
    pub fn scan_into(
        &self,
        backend: &dyn VyreBackend,
        haystack: &[u8],
        matches: &mut Vec<Match>,
        scratch: &mut Vec<u8>,
    ) -> Result<(), BackendError> {
        self.scan_bounded_into(backend, haystack, u32::MAX, matches, scratch)
    }

    /// Per-workgroup-bounded resident scan. See [`RulePipeline::scan_bounded`]
    /// for the bound's semantics (O(N × max_scan_bytes) instead of O(N²)).
    ///
    /// # Errors
    /// Same as [`scan_into`](Self::scan_into).
    pub fn scan_bounded_into(
        &self,
        backend: &dyn VyreBackend,
        haystack: &[u8],
        max_scan_bytes: u32,
        matches: &mut Vec<Match>,
        scratch: &mut Vec<u8>,
    ) -> Result<(), BackendError> {
        matches.clear();
        let haystack_len = dispatch_io::scan_guard(
            haystack,
            "ResidentRulePipeline::scan",
            dispatch_io::DEFAULT_MAX_SCAN_BYTES,
        )?;

        // Stage the haystack into the resident buffer (real bytes only; the
        // kernel bounds its cursor with nfa_haystack_len so the stale tail of
        // the resident buffer is never read).
        dispatch_io::pack_haystack_u32_into(haystack, scratch)?;
        if scratch.len() > self.haystack_capacity {
            return Err(BackendError::new(format!(
                "ResidentRulePipeline haystack is {} packed byte(s) but the resident buffer holds {}. Fix: raise haystack_capacity_bytes in prepare_resident or shard the haystack.",
                scratch.len(),
                self.haystack_capacity
            )));
        }
        backend.upload_resident_at(&self.haystack, 0, scratch)?;

        // Reset only the atomic hit counter (slot 0). Triples are written from
        // slot 0 upward and only `count` of them are read back, so stale triples
        // beyond the new count are never observed — a 4-byte reset, not a full
        // hit-buffer clear.
        backend.upload_resident_at(&self.hits, 0, &0u32.to_le_bytes())?;

        // Stage the two 1-u32 control buffers into their resident slots. They MUST
        // be resident, not borrowed: the CUDA resident dispatch resolves every
        // binding to a resident handle and rejects a borrowed mix, so an all-resident
        // dispatch is the only form portable across wgpu AND CUDA. The upload copies
        // the source synchronously, so reusing the just-staged `scratch` above is
        // safe.
        backend.upload_resident_at(&self.haystack_len_buf, 0, &haystack_len.to_le_bytes())?;
        backend.upload_resident_at(&self.max_scan_bytes_buf, 0, &max_scan_bytes.to_le_bytes())?;

        // Buffer binding order MUST match `nfa::nfa_scan`'s BufferDecl order:
        // input(0), nfa_transition(1), nfa_epsilon(2), hits(3),
        // nfa_haystack_len(4), nfa_max_scan_bytes(5) — every binding resident.
        let resources = [
            self.haystack.clone(),
            self.transition.clone(),
            self.epsilon.clone(),
            self.hits.clone(),
            self.haystack_len_buf.clone(),
            self.max_scan_bytes_buf.clone(),
        ];

        let mut config = DispatchConfig::default();
        // Candidate-start parallelism: one workgroup per haystack byte, matching
        // `dispatch_io::candidate_start_dispatch_config`.
        config.grid_override = Some([haystack_len.max(1), 1, 1]);

        let timed = backend.dispatch_resident_timed(&self.program, &resources, &config)?;

        // The hit buffer is the program's only ReadWrite storage, returned at
        // output index 0 — identical decode to `RulePipeline::scan`.
        let hit_bytes =
            dispatch_io::try_output_bytes(&timed.outputs, 0, "ResidentRulePipeline hit buffer")?;
        let count = dispatch_io::try_read_u32_prefix(hit_bytes, "ResidentRulePipeline hit buffer")?;
        // Truncation guard: the resident hit buffer is fixed-size, so a batch
        // that overflows `max_matches` would silently drop matches (a false
        // negative). The shared capped decode surfaces it as an error so the
        // consumer degrades to a per-batch-sized borrowed dispatch instead.
        // Never decode a truncated set.
        dispatch_io::try_unpack_match_triples_capped_into(
            &hit_bytes[4..],
            count,
            self.max_matches,
            "ResidentRulePipeline hit buffer",
            matches,
        )
    }

    /// The match cap this session's resident hit buffer was sized for.
    #[must_use]
    pub fn max_matches(&self) -> u32 {
        self.max_matches
    }

    /// Padded byte capacity of the resident haystack buffer.
    #[must_use]
    pub fn haystack_capacity(&self) -> usize {
        self.haystack_capacity
    }

    /// Release every resident resource this session owns.
    ///
    /// Call this before the backend's device context is dropped to reclaim the
    /// resident allocations eagerly; otherwise they are reclaimed when the
    /// backend tears down. The session is consumed.
    ///
    /// # Errors
    /// Returns the first [`BackendError`] from freeing a resource; remaining
    /// resources are still attempted.
    pub fn free(self, backend: &dyn VyreBackend) -> Result<(), BackendError> {
        let mut first_err = None;
        for resource in [
            self.haystack,
            self.transition,
            self.epsilon,
            self.hits,
            self.haystack_len_buf,
            self.max_scan_bytes_buf,
        ] {
            if let Err(error) = backend.free_resident(resource) {
                first_err.get_or_insert(error);
            }
        }
        first_err.map_or(Ok(()), Err)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
    use std::sync::Mutex;
    use vyre::DispatchConfig as Config;
    use vyre_driver::TimedDispatchResult;
    use vyre_foundation::ir::Program;

    /// Mock backend that records resident traffic and returns a canned hit
    /// buffer, so the host orchestration (table-upload-once, per-scan haystack
    /// staging, counter reset, control-value uploads, decode) is validated
    /// without a GPU. Real
    /// GPU resident-vs-borrowed parity is asserted in the downstream scanner crate
    /// where a live wgpu/CUDA backend is available. `VyreBackend` requires
    /// `Send + Sync`, so the counters use atomics/`Mutex`, not `RefCell`.
    struct MockResidentBackend {
        /// Next handle id to hand out; starts at 1 and increments per allocation.
        next_id: AtomicU64,
        /// (handle_id, byte_len) for every allocate_resident call.
        allocations: Mutex<Vec<(u64, usize)>>,
        /// Number of full uploads (table uploads) seen.
        full_uploads: AtomicUsize,
        /// Number of ranged uploads seen: haystack staging, hit-counter resets,
        /// and the two control-value uploads (haystack_len, max_scan_bytes).
        ranged_uploads: AtomicUsize,
        /// Canned hit-buffer bytes returned at output index 0.
        hit_buffer: Vec<u8>,
    }

    impl MockResidentBackend {
        /// Build a mock with zeroed traffic counters, an empty allocation log,
        /// handle ids starting at 1, and `hit_buffer` as the canned dispatch
        /// output.
        fn new(hit_buffer: Vec<u8>) -> Self {
            let next_id = AtomicU64::new(1);
            let allocations = Mutex::new(Vec::new());
            let full_uploads = AtomicUsize::new(0);
            let ranged_uploads = AtomicUsize::new(0);
            Self {
                next_id,
                allocations,
                full_uploads,
                ranged_uploads,
                hit_buffer,
            }
        }
    }

    impl vyre::backend::private::Sealed for MockResidentBackend {}

    impl VyreBackend for MockResidentBackend {
        fn id(&self) -> &'static str {
            "mock-resident"
        }

        // The resident session must never fall back to a borrowed dispatch, so
        // reaching this is a test failure.
        fn dispatch(
            &self,
            _program: &Program,
            _inputs: &[Vec<u8>],
            _config: &Config,
        ) -> Result<Vec<Vec<u8>>, BackendError> {
            unreachable!("resident path does not use borrowed dispatch")
        }

        // Logs the requested size under a fresh handle id and returns that
        // handle as a resident resource.
        fn allocate_resident(&self, byte_len: usize) -> Result<Resource, BackendError> {
            let handle = self.next_id.fetch_add(1, Ordering::Relaxed);
            let mut log = self.allocations.lock().expect("mock allocations mutex");
            log.push((handle, byte_len));
            Ok(Resource::Resident(handle))
        }

        // Full uploads are only counted; the bytes are discarded.
        fn upload_resident(&self, _resource: &Resource, _bytes: &[u8]) -> Result<(), BackendError> {
            self.full_uploads.fetch_add(1, Ordering::Relaxed);
            Ok(())
        }

        // Ranged uploads are only counted; offset and bytes are discarded.
        fn upload_resident_at(
            &self,
            _resource: &Resource,
            _dst_offset_bytes: usize,
            _bytes: &[u8],
        ) -> Result<(), BackendError> {
            self.ranged_uploads.fetch_add(1, Ordering::Relaxed);
            Ok(())
        }

        fn free_resident(&self, _resource: Resource) -> Result<(), BackendError> {
            Ok(())
        }

        // Checks the dispatch contract the consumer relies on, then returns the
        // canned hit buffer as output 0 with zeroed timing.
        fn dispatch_resident_timed(
            &self,
            _program: &Program,
            resources: &[Resource],
            config: &Config,
        ) -> Result<TimedDispatchResult, BackendError> {
            assert_eq!(resources.len(), 6, "nfa_scan binds six buffers");

            // No binding may be Borrowed — not even the two 1-u32 control
            // buffers — because the CUDA resident dispatch rejects a
            // borrowed-resource mix.
            for (idx, resource) in resources.iter().enumerate() {
                let is_resident = matches!(resource, Resource::Resident(_));
                assert!(
                    is_resident,
                    "binding {idx} must be resident (no borrowed mix in a resident dispatch)"
                );
            }

            let has_grid_override = config.grid_override.is_some();
            assert!(
                has_grid_override,
                "resident scan must supply candidate-start grid override"
            );

            let outputs = vec![self.hit_buffer.clone()];
            Ok(TimedDispatchResult {
                outputs,
                wall_ns: 0,
                device_ns: None,
                enqueue_ns: None,
                wait_ns: None,
            })
        }
    }

    /// Encode `matches` in the hit-buffer wire format: a little-endian u32
    /// count followed by one `(pattern_id, start, end)` triple of
    /// little-endian u32s per match.
    fn hit_buffer_with(matches: &[(u32, u32, u32)]) -> Vec<u8> {
        let count = matches.len() as u32;
        let mut wire = Vec::with_capacity(4 + matches.len() * 12);
        wire.extend_from_slice(&count.to_le_bytes());
        for triple in matches {
            let (pid, start, end) = *triple;
            for word in [pid, start, end] {
                wire.extend_from_slice(&word.to_le_bytes());
            }
        }
        wire
    }

    #[test]
    fn prepare_resident_uploads_tables_once_then_scans_transfer_only_haystack() {
        let backend = MockResidentBackend::new(hit_buffer_with(&[(0, 1, 3), (1, 5, 7)]));
        let pipeline = super::super::mega_scan::build(&["ab", "cd"], "input", "hits", 4096);

        let session = pipeline
            .prepare_resident(&backend, 4096, 64)
            .expect("mock backend supports resident allocation");

        // Prepare allocates six resident buffers (haystack, transition, epsilon,
        // hits, haystack_len, max_scan_bytes) and fully uploads only the two
        // immutable tables; nothing is staged with a ranged upload yet.
        let full_uploads = || backend.full_uploads.load(Ordering::Relaxed);
        let ranged_uploads = || backend.ranged_uploads.load(Ordering::Relaxed);
        assert_eq!(backend.allocations.lock().unwrap().len(), 6);
        assert_eq!(full_uploads(), 2);
        assert_eq!(ranged_uploads(), 0);

        let mut matches = Vec::new();
        let mut scratch = Vec::new();
        for _ in 0..3 {
            session
                .scan_into(&backend, b"zabqcd", &mut matches, &mut scratch)
                .expect("resident scan decodes canned hits");
        }

        // The canned triples decode to the same `Match` values the borrowed
        // path would produce.
        let expected = vec![Match::new(0, 1, 3), Match::new(1, 5, 7)];
        assert_eq!(matches, expected);

        // The tables never move again after prepare; every scan costs exactly
        // four ranged uploads (haystack, counter reset, haystack_len,
        // max_scan_bytes).
        assert_eq!(full_uploads(), 2, "tables re-uploaded mid-loop");
        assert_eq!(
            ranged_uploads(),
            12,
            "3 scans × (haystack + counter reset + haystack_len + max_scan_bytes)"
        );
    }

    #[test]
    fn scan_rejects_truncating_hit_count_instead_of_dropping_matches() {
        // The canned counter claims 9 hits while the session is sized for 4 and
        // only 4 (zeroed) triples are present. Decoding would silently lose 5
        // matches, so the scan has to fail and let the caller degrade.
        let mut canned = 9u32.to_le_bytes().to_vec();
        canned.extend_from_slice(&[0u8; 4 * 12]);
        let backend = MockResidentBackend::new(canned);

        let pipeline = super::super::mega_scan::build(&["ab"], "input", "hits", 64);
        let session = pipeline
            .prepare_resident(&backend, 64, 4)
            .expect("prepare with a 4-match cap");

        // Seed `matches` with a sentinel to prove the scan clears it.
        let mut matches = vec![Match::new(7, 7, 7)];
        let mut scratch = Vec::new();
        let err = session
            .scan_into(&backend, b"ab", &mut matches, &mut scratch)
            .expect_err("hit count over the resident cap must error, not truncate");

        let msg = err.to_string();
        let names_cap = msg.contains("exceeds the output-buffer cap 4");
        let names_dropped = msg.contains("drop 5 match(es)");
        assert!(
            names_cap && names_dropped && matches.is_empty(),
            "truncation guard must name the cap, the dropped count, and expose no partial matches: {err}"
        );
    }

    #[test]
    fn scan_rejects_haystack_larger_than_resident_capacity() {
        // The program's static input decl is built at 16 bytes, equal to the
        // resident capacity, so prepare succeeds. What is under test is the
        // scan-time guard: a 64-byte haystack offered to a 16-byte buffer.
        let backend = MockResidentBackend::new(hit_buffer_with(&[]));
        let pipeline = super::super::mega_scan::build(&["ab"], "input", "hits", 16);
        let session = pipeline
            .prepare_resident(&backend, 16, 8)
            .expect("prepare with a 16-byte haystack capacity");

        let oversized = [b'a'; 64];
        let mut matches = Vec::new();
        let mut scratch = Vec::new();
        let err = session
            .scan_into(&backend, &oversized, &mut matches, &mut scratch)
            .expect_err("64-byte haystack must not fit a 16-byte resident buffer");

        let names_limit = err.to_string().contains("resident buffer holds");
        assert!(
            names_limit && matches.is_empty(),
            "capacity error must name the limit and expose no stale matches: {err}"
        );
    }

    #[test]
    fn prepare_resident_rejects_capacity_below_static_input_decl() {
        // With input_len=64 the NFA program declares a STATIC input buffer of
        // 16 u32 words = 64 bytes on binding 0. CUDA rejects a smaller resident
        // input buffer while wgpu silently tolerates it; the prepare-time guard
        // replaces that dev-passes / prod-fails split with one portable error
        // raised BEFORE any allocation.
        let backend = MockResidentBackend::new(hit_buffer_with(&[]));
        let pipeline = super::super::mega_scan::build(&["ab"], "input", "hits", 64);
        let allocation_count = || backend.allocations.lock().unwrap().len();

        // 16 < 64: rejected at prepare time. The message must carry the declared
        // bytes, the input_len, the requested capacity and the fix, and nothing
        // may have been allocated.
        let err = pipeline
            .prepare_resident(&backend, 16, 8)
            .expect_err("capacity 16 below the 64-byte static input decl must be rejected");
        let msg = err.to_string();
        let required_fragments = [
            "statically declares 64 byte(s)",
            "input_len = 64",
            "only 16 padded byte(s)",
            "Fix: pass haystack_capacity_bytes >= 64",
        ];
        assert!(
            required_fragments.iter().all(|fragment| msg.contains(fragment)),
            "prepare guard must name decl bytes, input_len, capacity, and the fix: {msg}"
        );
        assert_eq!(
            allocation_count(),
            0,
            "guard must fail BEFORE allocating any resident resource"
        );

        // capacity == decl == 64: accepted, since CUDA only needs byte_len >= decl.
        pipeline
            .prepare_resident(&backend, 64, 8)
            .expect("capacity exactly equal to the static input decl must prepare");
        assert_eq!(
            allocation_count(),
            6,
            "the boundary-equal prepare allocates all six resident buffers"
        );

        // 128 > 64: accepted as well — the contract is `>= decl`, not equality,
        // so a larger resident haystack buffer is valid.
        pipeline
            .prepare_resident(&backend, 128, 8)
            .expect("capacity above the static input decl must prepare (CUDA allows >= decl)");
    }
}