Skip to main content

difflib_fast/
rationer.rs

1//! `Rationer` — stateful similarity / clustering handle with explicit backend control.
2//!
3//! The free-function API (`ratio`, `cluster_canonicals`, …) is stateless and rebuilds every per-call
4//! resource. That's fine for one-shot calls, but for callers that issue many clustering jobs in a
5//! tight loop (e.g. `find-dup-defs` runs `cluster_canonicals` once per same-name group, often a few
6//! thousand times per repo) the per-call setup — Metal device handshake, IOPM assertion, thread QoS,
7//! GPU warmup dispatch — adds up to seconds. A `Rationer` constructs that state once at `new()` and
8//! reuses it across every `.cluster_canonicals(…)` call.
9//!
10//! Whether any given call routes through the GPU or stays on CPU is an internal decision; callers
11//! don't pass GPU flags, don't import the `gpu` module, and don't change with feature gates. On
12//! systems without Metal (`cfg(not(target_os = "macos"))` or the `gpu` feature off), the GPU side is
13//! simply absent and every call goes to CPU — same behaviour, same output.
14//!
15//! ## Usage
16//!
17//! ```ignore
18//! let r = difflib_fast::Rationer::new();
19//! for group in groups {
20//!     let clusters = r.cluster_canonicals(&group.canonicals, 0.5);
21//!     // ...
22//! }
23//! ```
24//!
25//! Pass `&Rationer` across rayon threads — it's `Send + Sync`, designed for shared use from
26//! `par_iter()` chains.
27
28// GPU-glue module: string/pair counts are bounded well within u32 (the same invariant the `gpu`
29// module relies on), index arithmetic uses intentional `as` casts (incl. the `-1` non-ASCII
30// sentinel round-tripped through i32), local `use`/`const` sit next to the code they serve, and the
31// doc prose names internal types (`CorpusGpu`, `QoS`, …) without backticks — mirror `gpu.rs`'s
32// module-level allows.
33#![allow(
34    clippy::cast_possible_truncation,
35    clippy::cast_possible_wrap,
36    clippy::cast_sign_loss,
37    clippy::doc_markdown,
38    clippy::many_single_char_names,
39    clippy::items_after_statements,
40    clippy::iter_cloned_collect,
41    clippy::redundant_closure_for_method_calls,
42    clippy::type_complexity
43)]
44
45use crate::cluster_canonicals_chars;
46
47#[cfg(all(feature = "gpu", target_os = "macos"))]
48use crate::gpu::{BoostGuard, CorpusGpu, Gpu};
49
50/// Backend selection for `Rationer` similarity work.
51///
52/// Picked at construction; can't change for a given handle (the resource set is wired up at
53/// `build()` time). Re-create the handle if you need a different concurrency mode.
54#[derive(Debug, Clone, Copy, PartialEq, Eq)]
55pub enum Concurrency {
56    /// Pure CPU — single-threaded if `threads(1)`, multi-threaded rayon otherwise. Works on
57    /// every platform; the only mode available with `feature = "gpu"` disabled.
58    Cpu,
59    /// Allow the Metal GPU where it measured a net win. In practice that is **only**
60    /// `cluster_canonicals` on a single group above `DFGPU_CLUSTER_THRESHOLD` (~1.1–1.4× CPU,
61    /// peaking on mid-size canonical-Python corpora). `ratio_many` and `cluster_canonicals_multi`
62    /// stay on CPU under this mode too — the GPU `matching_stats` offload lost on both at every
63    /// size benchmarked (their GPU paths are env-opt-in only). On non-Metal platforms: `Cpu`.
64    Gpu,
65    /// Default. Same routing as `Gpu` (GPU only for big single-group `cluster_canonicals`,
66    /// everything else CPU), plus rayon-parallel CPU for the per-pair `longest_in` recursion on the
67    /// GPU output. Behaves identically to `Gpu` for the currently GPU-enabled path; kept as a
68    /// distinct mode so heterogeneous overlap can be re-enabled per-path via the env thresholds
69    /// without an API change.
70    GpuPlusCpu,
71}
72
73impl Default for Concurrency {
74    // Not `#[derive(Default)]`: the default is `GpuPlusCpu`, not the first variant.
75    #[allow(clippy::derivable_impls)]
76    fn default() -> Self {
77        Self::GpuPlusCpu
78    }
79}
80
81/// Builder for [`Rationer`]. Configure once, then `.build()`.
82pub struct RationerBuilder {
83    concurrency: Concurrency,
84    threads: Option<usize>,
85    delta: f64,
86}
87
88impl Default for RationerBuilder {
89    fn default() -> Self {
90        Self { concurrency: Concurrency::default(), threads: None, delta: 0.0 }
91    }
92}
93
94impl RationerBuilder {
95    /// Pick a backend. See [`Concurrency`].
96    #[must_use]
97    pub fn concurrency(mut self, c: Concurrency) -> Self {
98        self.concurrency = c;
99        self
100    }
101
102    /// Set the rayon worker count for CPU-side work (filtering, per-pair recursion, assemble).
103    /// `None` (default) uses the global rayon pool. `Some(1)` forces single-threaded execution.
104    #[must_use]
105    pub fn threads(mut self, n: usize) -> Self {
106        self.threads = Some(n);
107        self
108    }
109
110    /// Approximate-RO knob (Phase 3). `delta = 0.0` (default) means exact Ratcliff-Obershelp
111    /// — bit-identical to Python difflib. `delta ∈ (0, 1]` trades a bounded amount of accuracy
112    /// for fewer chain walks inside `longest_in`: the suffix-link chain is capped at roughly
113    /// `1/√delta` ascensions, which empirically only fires on the long-tail (p99+) cases.
114    /// Property-tested in `tests/approx_ro.rs`: actual worst-case absolute RO deviation
115    /// stays below `delta` on canonical-Python corpora (typically several times tighter).
116    #[must_use]
117    pub fn delta(mut self, d: f64) -> Self {
118        self.delta = d.clamp(0.0, 1.0);
119        self
120    }
121
122    /// Materialize the handle. Acquires Metal device + IOPM boost assertion (when GPU is in
123    /// the requested concurrency mode); allocates a dedicated rayon pool if `threads` is set.
124    #[must_use]
125    pub fn build(self) -> Rationer {
126        Rationer::new_with(self.concurrency, self.threads, self.delta)
127    }
128}
129
130/// Handle that owns long-lived clustering resources (Metal device, IOPM power assertion, thread QoS
131/// boost). Construct once per process; share `&Rationer` across rayon workers.
132///
133/// On macOS with `feature = "gpu"`, `Rationer::new()` lazily acquires a Metal device + power
134/// assertion. On platforms without Metal (or with the `gpu` feature disabled at compile time) the
135/// struct is essentially empty — methods still work and produce the same answers, just always on CPU.
136pub struct Rationer {
137    concurrency: Concurrency,
138    /// Local rayon pool when the builder pinned a thread count; `None` ⇒ use rayon's global pool.
139    pool: Option<rayon::ThreadPool>,
140    /// Phase 3 approximate-RO knob (see `RationerBuilder::delta`). 0.0 = exact (default).
141    delta: f64,
142    #[cfg(all(feature = "gpu", target_os = "macos"))]
143    gpu: Option<Gpu>,
144    #[cfg(all(feature = "gpu", target_os = "macos"))]
145    _boost: Option<BoostGuard>,
146}
147
148impl Rationer {
149    /// Start with sensible defaults: GPU+CPU concurrency, rayon's global thread pool.
150    #[must_use]
151    pub fn builder() -> RationerBuilder {
152        RationerBuilder::default()
153    }
154
155    /// Create a `Rationer` with default settings (`Concurrency::GpuPlusCpu`, global rayon pool,
156    /// `delta = 0.0` = exact RO). Never fails — if Metal is unavailable, the handle quietly
157    /// degrades to CPU-only.
158    ///
159    /// Equivalent to `Rationer::builder().build()`.
160    #[must_use]
161    pub fn new() -> Self {
162        Self::new_with(Concurrency::default(), None, 0.0)
163    }
164
165    /// Active `delta` for the approximate-RO knob — 0.0 means exact.
166    #[must_use]
167    pub fn delta(&self) -> f64 {
168        self.delta
169    }
170
171    fn new_with(concurrency: Concurrency, threads: Option<usize>, delta: f64) -> Self {
172        let pool = threads.and_then(|n| rayon::ThreadPoolBuilder::new().num_threads(n).build().ok());
173
174        #[cfg(all(feature = "gpu", target_os = "macos"))]
175        {
176            let want_gpu = matches!(concurrency, Concurrency::Gpu | Concurrency::GpuPlusCpu);
177            let (gpu, boost) = if want_gpu {
178                // Acquire the boost guard FIRST so the GPU warmup dispatch inside Gpu::new()
179                // runs at USER_INTERACTIVE QoS and with IOPM holding boost clocks — that gets
180                // first-call latency out of the way at construction time, not on first
181                // `.cluster_*()`.
182                let b = BoostGuard::acquire();
183                let g = Gpu::new();
184                (g, Some(b))
185            } else {
186                (None, None)
187            };
188            Self { concurrency, pool, delta, gpu, _boost: boost }
189        }
190        #[cfg(not(all(feature = "gpu", target_os = "macos")))]
191        {
192            // On non-macOS (or with `feature = "gpu"` off) GPU is impossible — degrade to CPU.
193            let _ = concurrency;
194            Self { concurrency: Concurrency::Cpu, pool, delta }
195        }
196    }
197
198    /// The active backend (after construction-time fallback). A `Rationer` built with
199    /// `Concurrency::Gpu` on a non-Metal platform reports `Cpu` here.
200    #[must_use]
201    pub fn concurrency(&self) -> Concurrency {
202        self.concurrency
203    }
204
205    /// Run a closure inside the configured rayon pool (the local pool when `threads(n)` was
206    /// set; rayon's global pool otherwise). All public methods that do rayon work funnel
207    /// through this so the `threads` setting actually takes effect.
208    fn with_pool<F, R>(&self, f: F) -> R
209    where
210        F: FnOnce() -> R + Send,
211        R: Send,
212    {
213        if let Some(pool) = &self.pool {
214            pool.install(f)
215        } else {
216            f()
217        }
218    }
219
220    /// Single-pair Ratcliff–Obershelp ratio. Always CPU (one pair offers no GPU win). Same
221    /// output as the free function [`crate::gestalt_ratio`].
222    #[must_use]
223    pub fn ratio(&self, a: &str, b: &str) -> f64 {
224        crate::gestalt_ratio(a, b)
225    }
226
227    /// Batched ratio over a list of `(a, b)` string pairs, in parallel across cores.
228    ///
229    /// **CPU by default.** Benchmarking (mypy/django/sympy/ha/transformers, 61k–404k pairs) showed
230    /// the GPU `matching_stats` offload *loses* here — 0.82–0.93× CPU across every size measured —
231    /// because the CPU rayon path (intern uniques, prebuild each SAM once, `gestalt_ratio_prebuilt`
232    /// per pair) is already efficient and the GPU's corpus-build + dispatch overhead isn't amortized
233    /// by the relatively light per-pair `matching_stats` walk. So `ratio_many` stays on CPU even
234    /// under `Concurrency::Gpu`. The GPU path is retained behind `DFGPU_RATIO_MANY_THRESHOLD=<n>`
235    /// (default off) for experimentation / other hardware: set it to engage GPU at `pairs.len() >= n`.
236    ///
237    /// Output is the bit-identical Ratcliff–Obershelp ratio for each pair, in input order. On
238    /// non-ASCII pairs the GPU path routes the affected pairs to the CPU per-pair fallback.
239    #[must_use]
240    pub fn ratio_many<S1, S2>(&self, pairs: &[(S1, S2)]) -> Vec<f64>
241    where
242        S1: AsRef<str> + Sync,
243        S2: AsRef<str> + Sync,
244    {
245        #[cfg(all(feature = "gpu", target_os = "macos"))]
246        {
247            let want_gpu =
248                matches!(self.concurrency, Concurrency::Gpu | Concurrency::GpuPlusCpu);
249            // Default off (usize::MAX): GPU ratio_many measured slower than CPU at every tested
250            // size. Opt in via env for experimentation.
251            let gpu_threshold: usize = std::env::var("DFGPU_RATIO_MANY_THRESHOLD")
252                .ok()
253                .and_then(|s| s.parse().ok())
254                .unwrap_or(usize::MAX);
255            if want_gpu && pairs.len() >= gpu_threshold {
256                if let Some(gpu) = &self.gpu {
257                    let delta = self.delta;
258                    return self.with_pool(|| ratio_many_via_gpu(gpu, pairs, delta));
259                }
260            }
261        }
262        self.with_pool(|| ratio_many_cpu(pairs))
263    }
264
265    /// Build a reusable corpus over `strings`: parses chars, builds SAMs, and (when GPU is
266    /// active) uploads the [`CorpusGpu`] arena. The returned [`PreparedRationer`] borrows the
267    /// `Rationer` and lets you issue multiple `ratio_many_idx(pairs)` calls that amortize the
268    /// SAM-build + GPU-upload cost over an arbitrary number of pair queries.
269    ///
270    /// Use this when the same string set is queried repeatedly (e.g. iterative refinement,
271    /// dedup pipelines, find-dup-defs over a fixed file list). For one-shot queries the regular
272    /// [`Rationer::ratio_many`] does the same work internally and is fine.
273    ///
274    /// All strings should be ASCII for the GPU path to engage; non-ASCII strings are kept in the
275    /// SAM pool but their pair queries automatically fall back to a CPU per-pair compute on the
276    /// host (same semantics as `ratio_many`).
277    #[must_use]
278    pub fn prepare<S: AsRef<str>>(&self, strings: &[S]) -> PreparedRationer<'_> {
279        use rayon::prelude::*;
280        let owned: Vec<String> = strings.iter().map(|s| s.as_ref().to_owned()).collect();
281        let chars_pool: Vec<Vec<char>> =
282            self.with_pool(|| owned.par_iter().map(|s| s.chars().collect()).collect());
283        let sams: Vec<crate::gestalt::Sam> =
284            self.with_pool(|| chars_pool.par_iter().map(|c| crate::gestalt::build_sam(c)).collect());
285
286        #[cfg(all(feature = "gpu", target_os = "macos"))]
287        {
288            // Identify ASCII-only strings; only those are uploaded to the GPU arena. Non-ASCII
289            // strings stay in the SAM pool for the CPU fallback path.
290            let mut gpu_idx_for: Vec<i32> = vec![-1; owned.len()];
291            let mut ascii_strings: Vec<&str> = Vec::new();
292            let mut ascii_sams: Vec<crate::gestalt::Sam> = Vec::new();
293            // SAFETY: ascii_sams takes ownership of a CLONE of ascii SAMs — borrowing
294            // would tangle lifetimes with `sams` (which we keep around for CPU fallback).
295            // Sam is cheap to clone (Vec<u32> data); only done once at prepare time.
296            for (i, s) in owned.iter().enumerate() {
297                if s.bytes().all(|b| b < 128) {
298                    gpu_idx_for[i] = ascii_strings.len() as i32;
299                    ascii_strings.push(s.as_str());
300                    ascii_sams.push(sams[i].clone());
301                }
302            }
303
304            let corpus = if let Some(ref gpu) = self.gpu {
305                if ascii_strings.is_empty() {
306                    None
307                } else {
308                    let byte_refs: Vec<&[u8]> = ascii_strings.iter().map(|s| s.as_bytes()).collect();
309                    Some(CorpusGpu::build(gpu, &byte_refs, &ascii_sams))
310                }
311            } else {
312                None
313            };
314
315            PreparedRationer {
316                rationer: self,
317                strings: owned,
318                chars_pool,
319                sams,
320                corpus,
321                gpu_idx_for,
322            }
323        }
324        #[cfg(not(all(feature = "gpu", target_os = "macos")))]
325        {
326            PreparedRationer { rationer: self, strings: owned, chars_pool, sams }
327        }
328    }
329
330    /// Exact single-linkage clustering at `threshold`, identical to the free-standing
331    /// [`crate::cluster_canonicals_chars`] in behaviour and output. Routes through the GPU on
332    /// macOS when the group is large enough to amortize dispatch overhead; small groups stay on
333    /// CPU because the GPU dispatch fixed cost (~5–50 ms) exceeds CPU rayon's full run for them.
334    ///
335    /// Routing: if the handle's [`Concurrency`] includes GPU and the group is big + all-ASCII,
336    /// dispatch through Metal; else stay on the CPU. The size cutoff is set so the GPU path's
337    /// `CorpusGpu` build + dispatch (~10–30 ms) is amortized over enough verified pairs to win.
338    /// Override with `DFGPU_CLUSTER_THRESHOLD=<n>` env var; default 300.
339    #[must_use]
340    pub fn cluster_canonicals_chars(
341        &self,
342        chars: &[Vec<char>],
343        threshold: f64,
344    ) -> Vec<(Vec<usize>, f64)> {
345        #[cfg(all(feature = "gpu", target_os = "macos"))]
346        {
347            let want_gpu =
348                matches!(self.concurrency, Concurrency::Gpu | Concurrency::GpuPlusCpu);
349            if want_gpu {
350                if let Some(gpu) = &self.gpu {
351                    let gpu_threshold: usize = std::env::var("DFGPU_CLUSTER_THRESHOLD")
352                        .ok()
353                        .and_then(|s| s.parse().ok())
354                        .unwrap_or(300);
355                    if chars.len() >= gpu_threshold
356                        && chars.iter().all(|c| c.iter().all(|&ch| (ch as u32) < 128))
357                    {
358                        return self.with_pool(|| {
359                            cluster_canonicals_chars_via_gpu(gpu, chars, threshold, self.delta)
360                        });
361                    }
362                }
363            }
364        }
365        self.with_pool(|| cluster_canonicals_chars(chars, threshold))
366    }
367
368    /// String-input convenience, mirrors [`crate::cluster_canonicals`].
369    #[must_use]
370    pub fn cluster_canonicals(
371        &self,
372        canonicals: &[String],
373        threshold: f64,
374    ) -> Vec<(Vec<usize>, f64)> {
375        let chars: Vec<Vec<char>> = canonicals.iter().map(|s| s.chars().collect()).collect();
376        self.cluster_canonicals_chars(&chars, threshold)
377    }
378
379    /// Batched-across-groups cluster_canonicals.
380    ///
381    /// Run K independent clustering jobs as ONE GPU dispatch — concatenate every group's strings
382    /// into a single Metal corpus arena, run filters per-group on the CPU, submit ALL surviving
383    /// candidate pairs (from every group) in one batched `matching_stats` kernel call, then split
384    /// results back to per-group `gestalt_edge_with_ms` + `assemble` on the CPU.
385    ///
386    /// The single-dispatch idea was meant to be find-dup-defs's win condition (thousands of
387    /// same-name groups, each too small for per-call GPU overhead, batched into one dispatch).
388    /// **CPU by default**, though: benchmarking showed the batched GPU path loses on that very shape
389    /// — 0.70× CPU at 44 groups of 50, only reaching break-even (~0.99×) at a handful of large
390    /// groups — the corpus-build + dispatch overhead isn't amortized when each group's surviving-pair
391    /// count is small. So `cluster_canonicals_multi` runs per-group on CPU (rayon across groups)
392    /// unless `DFGPU_MULTI_THRESHOLD=<total strings>` is set to opt the batched GPU path back in.
393    ///
394    /// Returns one cluster list per input group in the input order. Each list has identical
395    /// semantics to [`cluster_canonicals`] called on that group alone.
396    #[must_use]
397    #[allow(clippy::too_many_lines)]
398    pub fn cluster_canonicals_multi(
399        &self,
400        groups: &[Vec<String>],
401        threshold: f64,
402    ) -> Vec<Vec<(Vec<usize>, f64)>> {
403        #[cfg(all(feature = "gpu", target_os = "macos"))]
404        {
405            let want_gpu =
406                matches!(self.concurrency, Concurrency::Gpu | Concurrency::GpuPlusCpu);
407            // Default off (usize::MAX): the batched GPU path measured at best break-even vs the
408            // per-group CPU path. Opt in via env for experimentation / other hardware.
409            let gpu_threshold: usize = std::env::var("DFGPU_MULTI_THRESHOLD")
410                .ok()
411                .and_then(|s| s.parse().ok())
412                .unwrap_or(usize::MAX);
413            let total: usize = groups.iter().map(Vec::len).sum();
414            if want_gpu && total >= gpu_threshold {
415                if let Some(gpu) = &self.gpu {
416                    return self
417                        .with_pool(|| cluster_canonicals_multi_via_gpu(gpu, groups, threshold, self.delta));
418                }
419            }
420        }
421        // CPU path (default): iterate per group across rayon.
422        use rayon::prelude::*;
423        self.with_pool(|| {
424            groups
425                .par_iter()
426                .map(|g| {
427                    let chars: Vec<Vec<char>> =
428                        g.iter().map(|s| s.chars().collect()).collect();
429                    cluster_canonicals_chars(&chars, threshold)
430                })
431                .collect()
432        })
433    }
434}
435
436impl Default for Rationer {
437    fn default() -> Self {
438        Self::new()
439    }
440}
441
442/// Reusable corpus + SAMs built by [`Rationer::prepare`]. Hold this across many
443/// `ratio_many_idx` calls to amortize SAM building and the GPU upload over the lifetime of the
444/// string set. Borrowing the `Rationer` keeps the Metal device and thread pool wiring alive.
445pub struct PreparedRationer<'r> {
446    rationer: &'r Rationer,
447    strings: Vec<String>,
448    chars_pool: Vec<Vec<char>>,
449    sams: Vec<crate::gestalt::Sam>,
450    #[cfg(all(feature = "gpu", target_os = "macos"))]
451    corpus: Option<CorpusGpu>,
452    /// Maps original string index → index into `corpus` (ascii-only). `-1` for non-ASCII strings.
453    /// Used to translate pair indices the user gives in `ratio_many_idx` into GPU-corpus indices.
454    #[cfg(all(feature = "gpu", target_os = "macos"))]
455    gpu_idx_for: Vec<i32>,
456}
457
458impl PreparedRationer<'_> {
459    /// Number of strings in the prepared corpus.
460    #[must_use]
461    pub fn len(&self) -> usize {
462        self.strings.len()
463    }
464
465    /// True iff `len() == 0`.
466    #[must_use]
467    pub fn is_empty(&self) -> bool {
468        self.strings.is_empty()
469    }
470
471    /// Batched Ratcliff-Obershelp ratio over a pair list referencing the prepared strings by
472    /// index (`(i, j)` ⇒ `ratio(strings[i], strings[j])`). All SAM / GPU-corpus state is reused
473    /// from `prepare()`, so a long-lived `PreparedRationer` paying its build cost once can serve
474    /// many queries at the kernel-pure GPU throughput.
475    ///
476    /// # Panics
477    /// Panics if any pair index is out of bounds.
478    #[must_use]
479    pub fn ratio_many_idx(&self, pairs: &[(u32, u32)]) -> Vec<f64> {
480        let n = pairs.len();
481        if n == 0 {
482            return Vec::new();
483        }
484        // Bounds-check eagerly so the rest of the code can use unchecked indexing in the kernel.
485        let n_strings = self.strings.len() as u32;
486        for &(i, j) in pairs {
487            assert!(i < n_strings && j < n_strings, "ratio_many_idx: index out of bounds");
488        }
489
490        #[cfg(all(feature = "gpu", target_os = "macos"))]
491        {
492            if let (Some(corpus), Some(gpu)) = (self.corpus.as_ref(), self.rationer.gpu.as_ref()) {
493                let delta = self.rationer.delta;
494                return self.rationer.with_pool(|| {
495                    ratio_many_via_prepared_gpu(gpu, corpus, &self.sams, &self.chars_pool, &self.gpu_idx_for, pairs, delta)
496                });
497            }
498        }
499        // CPU fallback — reuse prebuilt SAMs.
500        self.rationer.with_pool(|| ratio_many_via_prepared_cpu(&self.chars_pool, &self.sams, pairs))
501    }
502}
503
504#[cfg(all(feature = "gpu", target_os = "macos"))]
505fn ratio_many_via_prepared_gpu(
506    gpu: &Gpu,
507    corpus: &CorpusGpu,
508    sams: &[crate::gestalt::Sam],
509    chars_pool: &[Vec<char>],
510    gpu_idx_for: &[i32],
511    pairs: &[(u32, u32)],
512    delta: f64,
513) -> Vec<f64> {
514    use rayon::prelude::*;
515
516    let n = pairs.len();
517    let mut out = vec![0.0f64; n];
518
519    // Split into GPU-eligible (both strings ASCII) and CPU-fallback lanes.
520    let mut gpu_pairs: Vec<(u32, u32)> = Vec::with_capacity(n);
521    let mut gpu_slot_for: Vec<usize> = Vec::with_capacity(n);
522    let mut cpu_slot_for: Vec<usize> = Vec::new();
523    for (slot, &(a, b)) in pairs.iter().enumerate() {
524        let ga = gpu_idx_for[a as usize];
525        let gb = gpu_idx_for[b as usize];
526        if ga >= 0 && gb >= 0 {
527            gpu_pairs.push((ga as u32, gb as u32));
528            gpu_slot_for.push(slot);
529        } else {
530            cpu_slot_for.push(slot);
531        }
532    }
533
534    // GPU lane: matching_stats kernel + CPU `gestalt_edge_with_ms` per pair. This is the only
535    // GPU path that wins on real workloads; Stage 4d/4g full_ratio kernels lose 3× because
536    // longest_in's seg-tree queries are bandwidth-bound on the 50-200 MB seg_data buffer with
537    // no GPU cache locality. See `src/new/PERF_MAP.md` for measurements.
538    if !gpu_pairs.is_empty() {
539        // gpu_idx → original_idx reverse map for fetching chars/sams by ORIGINAL string index.
540        let mut orig_for: Vec<u32> = vec![u32::MAX; corpus.n_sams()];
541        for (orig, &gi) in gpu_idx_for.iter().enumerate() {
542            if gi >= 0 {
543                orig_for[gi as usize] = orig as u32;
544            }
545        }
546        let flat = gpu.matching_stats_by_b_partial_flat_with_timings(corpus, &gpu_pairs).0;
547        let fstate_all = flat.fstate_all();
548        let fmatch_all = flat.fmatch_all();
549        let results: Vec<(usize, f64)> = (0..gpu_pairs.len())
550            .into_par_iter()
551            .map(|slot| {
552                let orig_pair_idx = flat.pair_orig_idx[slot] as usize;
553                let (ga, gb) = gpu_pairs[orig_pair_idx];
554                let lo = flat.out_offsets[slot] as usize;
555                let hi = flat.out_offsets[slot + 1] as usize;
556                let fstate = &fstate_all[lo..hi];
557                let fmatch = &fmatch_all[lo..hi];
558                let oa = orig_for[ga as usize] as usize;
559                let ob = orig_for[gb as usize] as usize;
560                let r = crate::gestalt::gestalt_edge_with_ms_delta(
561                    &chars_pool[oa],
562                    &chars_pool[ob],
563                    &sams[ob],
564                    fstate,
565                    fmatch,
566                    0.0,
567                    delta,
568                )
569                .unwrap_or(0.0);
570                (gpu_slot_for[orig_pair_idx], r)
571            })
572            .collect();
573        for (slot, r) in results {
574            out[slot] = r;
575        }
576    }
577
578    // CPU lane (non-ASCII pairs).
579    if !cpu_slot_for.is_empty() {
580        let cpu_results: Vec<(usize, f64)> = cpu_slot_for
581            .par_iter()
582            .map(|&slot| {
583                let (i, j) = pairs[slot];
584                let a = &chars_pool[i as usize];
585                let b = &chars_pool[j as usize];
586                let r = crate::gestalt::gestalt_ratio_prebuilt(a, b, &sams[j as usize]);
587                (slot, r)
588            })
589            .collect();
590        for (slot, r) in cpu_results {
591            out[slot] = r;
592        }
593    }
594
595    out
596}
597
598fn ratio_many_via_prepared_cpu(
599    chars_pool: &[Vec<char>],
600    sams: &[crate::gestalt::Sam],
601    pairs: &[(u32, u32)],
602) -> Vec<f64> {
603    use rayon::prelude::*;
604    let n = pairs.len();
605    // OPTIMIZATION C: sort pair indices by b (then a) so adjacent rayon-chunk work hits the
606    // same SAM → b's node + edges + epmeta + epos stay in L2 across calls. Without this each
607    // worker thread bounces between SAMs in input order (random) and pays L2 eviction per pair.
608    let mut perm: Vec<u32> = (0..n as u32).collect();
609    perm.sort_unstable_by_key(|&i| {
610        let (a, b) = pairs[i as usize];
611        (b, a)
612    });
613    let mut out = vec![0.0f64; n];
614    let results: Vec<(usize, f64)> = perm
615        .par_iter()
616        .map(|&pi| {
617            let pi_us = pi as usize;
618            let (i, j) = pairs[pi_us];
619            let a = &chars_pool[i as usize];
620            let b = &chars_pool[j as usize];
621            let r = crate::gestalt::gestalt_ratio_prebuilt(a, b, &sams[j as usize]);
622            (pi_us, r)
623        })
624        .collect();
625    for (pi, r) in results {
626        out[pi] = r;
627    }
628    out
629}
630
631/// CPU `ratio_many`: intern unique strings, prebuild each one's SAM once, then call
632/// `gestalt_ratio_prebuilt` per pair. Without this we'd pay a SAM build per pair — for
633/// 50 k pairs over 1 k unique strings that's 50 k rebuilds vs 1 k. Matches what the GPU
634/// path's CorpusGpu does, so the cpu-vs-gpu speedup measurement is apples to apples.
635fn ratio_many_cpu<S1, S2>(pairs: &[(S1, S2)]) -> Vec<f64>
636where
637    S1: AsRef<str> + Sync,
638    S2: AsRef<str> + Sync,
639{
640    use std::collections::HashMap;
641
642    use rayon::prelude::*;
643
644    if pairs.is_empty() {
645        return Vec::new();
646    }
647
648    let mut pool: Vec<String> = Vec::new();
649    let mut by_str: HashMap<String, u32> = HashMap::new();
650    let mut pair_idx: Vec<(u32, u32)> = Vec::with_capacity(pairs.len());
651    for p in pairs {
652        let a: &str = p.0.as_ref();
653        let b: &str = p.1.as_ref();
654        let mut intern = |s: &str| -> u32 {
655            if let Some(&id) = by_str.get(s) {
656                return id;
657            }
658            let idx = pool.len() as u32;
659            pool.push(s.to_owned());
660            by_str.insert(s.to_owned(), idx);
661            idx
662        };
663        let ai = intern(a);
664        let bi = intern(b);
665        pair_idx.push((ai, bi));
666    }
667    drop(by_str);
668
669    let chars_pool: Vec<Vec<char>> =
670        pool.par_iter().map(|s| s.chars().collect()).collect();
671    let sams: Vec<crate::gestalt::Sam> =
672        chars_pool.par_iter().map(|c| crate::gestalt::build_sam(c)).collect();
673
674    // OPTIMIZATION C: sort indices by b (then a) so each rayon worker gets a chunk of consecutive
675    // pairs sharing the same SAM-b → b's node + edges + epmeta + epos stay in L2 across calls.
676    let n = pair_idx.len();
677    let mut perm: Vec<u32> = (0..n as u32).collect();
678    perm.sort_unstable_by_key(|&i| {
679        let (a, b) = pair_idx[i as usize];
680        (b, a)
681    });
682    let mut out = vec![0.0f64; n];
683    let results: Vec<(usize, f64)> = perm
684        .par_iter()
685        .map(|&pi| {
686            let pi_us = pi as usize;
687            let (ai, bi) = pair_idx[pi_us];
688            let a = &chars_pool[ai as usize];
689            let b = &chars_pool[bi as usize];
690            let sam_b = &sams[bi as usize];
691            let r = crate::gestalt::gestalt_ratio_prebuilt(a, b, sam_b);
692            (pi_us, r)
693        })
694        .collect();
695    for (pi, r) in results {
696        out[pi] = r;
697    }
698    out
699}
700
701/// Stage-4c: GPU port of `ratio_many`. Pipeline is:
702///
703/// 1. **Intern** every unique `&str` from the input pairs into a flat pool — most workloads
704///    have lots of repeated strings (find-dup-defs's same-name groups, cluster_canonicals
705///    inside the same group, etc.), so deduplication shrinks the corpus we build SAMs over.
706/// 2. **Split** the pair list into ASCII-routed and non-ASCII routed lanes. Non-ASCII pairs
707///    fall through to CPU `gestalt_ratio` (the GPU kernel reads `u8`).
708/// 3. **Build SAMs + CorpusGpu** over the ASCII pool. Rayon-parallel.
709/// 4. **GPU dispatch**: matching_stats kernel batched over the surviving ASCII pairs.
710/// 5. **Per-pair recursion on CPU** using the GPU-filled `(fstate, fmatch)` — same code path
711///    as `gestalt_edge_with_ms` with threshold 0, so the return value is the unconditional
712///    ratio (no early-exit drop).
713/// 6. **Merge** ASCII and non-ASCII results back in input order.
714///
715/// Same byte-for-byte correctness guarantee as `gestalt_ratio`. Wins on big batches because
716/// the heavy SAM walk runs on the bandwidth-bound GPU at ~3× CPU rayon throughput, and the
717/// CorpusGpu build cost (~10–50 ms) is amortized across all pairs.
718#[cfg(all(feature = "gpu", target_os = "macos"))]
719#[allow(clippy::too_many_lines)]
720fn ratio_many_via_gpu<S1, S2>(gpu: &Gpu, pairs: &[(S1, S2)], delta: f64) -> Vec<f64>
721where
722    S1: AsRef<str> + Sync,
723    S2: AsRef<str> + Sync,
724{
725    use std::collections::HashMap;
726
727    use rayon::prelude::*;
728
729    use crate::gpu::CorpusGpu;
730
731    let n_pairs = pairs.len();
732    if n_pairs == 0 {
733        return Vec::new();
734    }
735
736    // Step 1 — intern unique strings into a pool. Routing: ASCII strings get an index in the
737    // GPU corpus; non-ASCII strings get the `NON_ASCII` sentinel and the pair is handled by
738    // the CPU fallback below. We use `String`-keyed dedup so the pool owns its data — the
739    // input slice's lifetime isn't carried into the HashMap.
740    const NON_ASCII: u32 = u32::MAX;
741    let mut pool: Vec<String> = Vec::new();
742    let mut by_str: HashMap<String, u32> = HashMap::new();
743    let mut pair_idx: Vec<(u32, u32)> = Vec::with_capacity(n_pairs);
744    for p in pairs {
745        let a: &str = p.0.as_ref();
746        let b: &str = p.1.as_ref();
747        let mut intern = |s: &str| -> u32 {
748            if !s.chars().all(|c| (c as u32) < 128) {
749                return NON_ASCII;
750            }
751            if let Some(&id) = by_str.get(s) {
752                return id;
753            }
754            let idx = pool.len() as u32;
755            pool.push(s.to_owned());
756            by_str.insert(s.to_owned(), idx);
757            idx
758        };
759        let ai = intern(a);
760        let bi = intern(b);
761        pair_idx.push((ai, bi));
762    }
763    let n_unique = pool.len();
764    drop(by_str);
765
766    // Step 2 — split pairs into GPU and CPU lanes. The GPU lane has all `(a, b)` where both
767    // strings are ASCII; the CPU lane has the rest.
768    let mut gpu_pairs: Vec<(u32, u32)> = Vec::with_capacity(n_pairs);
769    let mut gpu_slot_for: Vec<usize> = Vec::with_capacity(n_pairs);
770    let mut cpu_slot_for: Vec<usize> = Vec::new();
771    for (i, &(a, b)) in pair_idx.iter().enumerate() {
772        if a == NON_ASCII || b == NON_ASCII {
773            cpu_slot_for.push(i);
774        } else {
775            gpu_pairs.push((a, b));
776            gpu_slot_for.push(i);
777        }
778    }
779
780    // Step 3 — build SAMs + char view in parallel.
781    let chars_pool: Vec<Vec<char>> =
782        pool.par_iter().map(|s| s.chars().collect()).collect();
783    let sams: Vec<crate::gestalt::Sam> =
784        chars_pool.par_iter().map(|c| crate::gestalt::build_sam(c)).collect();
785    let byte_refs: Vec<&[u8]> = pool.iter().map(|s| s.as_bytes()).collect();
786    let corpus = CorpusGpu::build(gpu, &byte_refs, &sams);
787    let _ = n_unique;
788
789    // Allocate output, fill from both lanes.
790    let mut out = vec![0.0f64; n_pairs];
791
792    // Step 4–5 — GPU lane. `matching_stats_by_b_partial_flat` does the SAM walk on the GPU;
793    // CPU runs `gestalt_edge_with_ms` (longest_in + RO recursion) per pair on rayon. This is
794    // the only GPU path that wins on real workloads (see `src/new/PERF_MAP.md`).
795    if !gpu_pairs.is_empty() {
796        let flat = gpu.matching_stats_by_b_partial_flat_with_timings(&corpus, &gpu_pairs).0;
797        let fstate_all = flat.fstate_all();
798        let fmatch_all = flat.fmatch_all();
799        let results: Vec<(usize, f64)> = (0..gpu_pairs.len())
800            .into_par_iter()
801            .map(|slot| {
802                let orig = flat.pair_orig_idx[slot] as usize;
803                let (a_idx, b_idx) = gpu_pairs[orig];
804                let lo = flat.out_offsets[slot] as usize;
805                let hi = flat.out_offsets[slot + 1] as usize;
806                let fstate = &fstate_all[lo..hi];
807                let fmatch = &fmatch_all[lo..hi];
808                let r = crate::gestalt::gestalt_edge_with_ms_delta(
809                    &chars_pool[a_idx as usize],
810                    &chars_pool[b_idx as usize],
811                    &sams[b_idx as usize],
812                    fstate,
813                    fmatch,
814                    0.0,
815                    delta,
816                )
817                .unwrap_or(0.0);
818                (gpu_slot_for[orig], r)
819            })
820            .collect();
821        for (slot, r) in results {
822            out[slot] = r;
823        }
824    }
825
826    // Step 6 — CPU lane for non-ASCII (rare) pairs.
827    let cpu_results: Vec<(usize, f64)> = cpu_slot_for
828        .par_iter()
829        .map(|&i| {
830            let (a, b) = &pairs[i];
831            (i, crate::gestalt_ratio(a.as_ref(), b.as_ref()))
832        })
833        .collect();
834    for (slot, r) in cpu_results {
835        out[slot] = r;
836    }
837    out
838}
839
840/// Cross-group batched GPU path — one CorpusGpu, one dispatch, K group results.
841/// See [`Rationer::cluster_canonicals_multi`] for the contract.
842#[cfg(all(feature = "gpu", target_os = "macos"))]
843#[allow(clippy::too_many_lines)]
844fn cluster_canonicals_multi_via_gpu(
845    gpu: &Gpu,
846    groups: &[Vec<String>],
847    threshold: f64,
848    delta: f64,
849) -> Vec<Vec<(Vec<usize>, f64)>> {
850    use rayon::prelude::*;
851
852    use crate::gpu::CorpusGpu;
853    use crate::{assemble, char_counts, quick_ratio_counts, real_quick_ratio};
854
855    if groups.is_empty() {
856        return Vec::new();
857    }
858
859    // Flatten: every (group_idx, in_group_idx) → global_idx. Per-group offset for unflattening.
860    let mut group_offsets: Vec<usize> = Vec::with_capacity(groups.len() + 1);
861    group_offsets.push(0);
862    for g in groups {
863        let end = group_offsets[group_offsets.len() - 1] + g.len();
864        group_offsets.push(end);
865    }
866
867    // OPTIMIZATION E: dedup strings across groups before building SAMs. find-dup-defs's actual
868    // shape is "the same function name appears in many groups" — the synthetic `ratoner-groups`
869    // bench partitions strings by `len % 50` so each string is in exactly one group (no dedup
870    // possible there), but real workloads dedupe heavily. We build SAMs once per UNIQUE string
871    // and remap flat (input-order) indices via `flat_to_unique` whenever we touch SAM-side data.
872    // For pair (a_flat, b_flat): both sides are mapped to unique indices for the GPU corpus and
873    // for SAM lookups; the FLAT indices are kept for per-group demux (group_offsets[] uses flat
874    // space).
875    let total: usize = groups.iter().map(|g| g.len()).sum();
876    let mut unique_idx: std::collections::HashMap<&str, u32> = std::collections::HashMap::new();
877    let mut unique_strings: Vec<&str> = Vec::new();
878    let mut flat_to_unique: Vec<u32> = Vec::with_capacity(total);
879    for g in groups {
880        for s in g {
881            let s_ref = s.as_str();
882            let u = if let Some(&u) = unique_idx.get(s_ref) {
883                u
884            } else {
885                let u = unique_strings.len() as u32;
886                unique_strings.push(s_ref);
887                unique_idx.insert(s_ref, u);
888                u
889            };
890            flat_to_unique.push(u);
891        }
892    }
893    let unique_chars: Vec<Vec<char>> =
894        unique_strings.par_iter().map(|s| s.chars().collect()).collect();
895    let unique_sams: Vec<crate::gestalt::Sam> =
896        unique_chars.par_iter().map(|c| crate::gestalt::build_sam(c)).collect();
897    let unique_ascii: Vec<bool> =
898        unique_chars.iter().map(|c| c.iter().all(|&ch| (ch as u32) < 128)).collect();
899
900    // group_ascii: all strings in a group are ASCII → ok for GPU. Resolve via unique_ascii.
901    let group_ascii: Vec<bool> = (0..groups.len())
902        .map(|gi| {
903            let lo = group_offsets[gi];
904            let hi = group_offsets[gi + 1];
905            (lo..hi).all(|i| unique_ascii[flat_to_unique[i] as usize])
906        })
907        .collect();
908
909    // CorpusGpu is built over UNIQUE strings; the GPU kernel indexes by unique_idx. Pair (a, b)
910    // fed to GPU must be the unique pair, not the flat one.
911    let unique_bytes: Vec<Vec<u8>> = unique_chars
912        .iter()
913        .map(|c| {
914            if c.iter().all(|&ch| (ch as u32) < 128) {
915                c.iter().map(|&ch| ch as u8).collect()
916            } else {
917                Vec::new()
918            }
919        })
920        .collect();
921    let byte_refs: Vec<&[u8]> = unique_bytes.iter().map(Vec::as_slice).collect();
922    let corpus = CorpusGpu::build(gpu, &byte_refs, &unique_sams);
923
924    // Below, indexing flat → unique uses `flat_to_unique[flat_i] as usize` whenever we need
925    // SAM/chars data. Pair tuples carry BOTH flat (for demux) and unique (for GPU) indices.
926    let flat_chars = |flat_i: u32| -> &Vec<char> { &unique_chars[flat_to_unique[flat_i as usize] as usize] };
927
928    // Per-group filtering on CPU (length + quick_ratio). Survivors are concatenated into one
929    // global candidate list. Each candidate carries:
930    //   (a_unique, b_unique)  → for the GPU CorpusGpu lookup (unique-string indexed)
931    //   (a_flat, b_flat)      → for per-group demux: local = flat - group_offsets[gi]
932    //   group_idx             → so we can route the edge back to the right group
933    let unique_counts: Vec<Vec<(char, u32)>> =
934        unique_chars.par_iter().map(|c| char_counts(c)).collect();
935    let per_group_candidates: Vec<Vec<(u32, u32, u32, u32, u32)>> = (0..groups.len())
936        .into_par_iter()
937        .map(|gi| {
938            if !group_ascii[gi] {
939                return Vec::new();
940            }
941            let lo = group_offsets[gi];
942            let hi = group_offsets[gi + 1];
943            let n = hi - lo;
944            let mut order: Vec<usize> = (lo..hi).collect();
945            order.sort_by_key(|&i| flat_chars(i as u32).len());
946            let mut out: Vec<(u32, u32, u32, u32, u32)> = Vec::new();
947            #[allow(clippy::cast_possible_truncation)]
948            for p in 0..n {
949                let i = order[p];
950                let i_u = flat_to_unique[i] as usize;
951                for &j in &order[p + 1..] {
952                    let j_u = flat_to_unique[j] as usize;
953                    let ci = &unique_chars[i_u];
954                    let cj = &unique_chars[j_u];
955                    if real_quick_ratio(ci, cj) < threshold {
956                        break;
957                    }
958                    if quick_ratio_counts(&unique_counts[i_u], &unique_counts[j_u], ci.len() + cj.len()) < threshold {
959                        continue;
960                    }
961                    let (loi_flat, hii_flat) = if i < j { (i, j) } else { (j, i) };
962                    let loi_u = flat_to_unique[loi_flat];
963                    let hii_u = flat_to_unique[hii_flat];
964                    out.push((loi_u, hii_u, loi_flat as u32, hii_flat as u32, gi as u32));
965                }
966            }
967            out
968        })
969        .collect();
970
971    // Concatenate all candidates into one global list. `pairs_for_gpu` is in UNIQUE space (fed to
972    // GPU); `pair_flat` parallel-stores flat indices for per-group demux + gestalt_edge_with_ms.
973    let mut pairs_for_gpu: Vec<(u32, u32)> = Vec::new();
974    let mut pair_flat: Vec<(u32, u32)> = Vec::new();
975    let mut pair_group: Vec<u32> = Vec::new();
976    for group_pairs in &per_group_candidates {
977        for &(au, bu, af, bf, gi) in group_pairs {
978            pairs_for_gpu.push((au, bu));
979            pair_flat.push((af, bf));
980            pair_group.push(gi);
981        }
982    }
983
984    // Per-group `pairs` arrays (for `assemble` later) — populate from GPU results.
985    let mut per_group_edges: Vec<Vec<(usize, usize, f64)>> = vec![Vec::new(); groups.len()];
986
987    if !pairs_for_gpu.is_empty() {
988        // Chunk the GPU dispatch to keep the output buffer (fmatch + fstate) under ~1 GB. Each
989        // pair contributes `a_len * 8 B`; for canonical Python the avg is ~1 KB/pair, so we cap
990        // at 250 k pairs per dispatch. Without this cap, multi-million-pair runs OOM Metal's
991        // single-buffer limit and the kernel silently returns zero output.
992        let max_pairs_per_dispatch: usize = std::env::var("DFGPU_MAX_PAIRS")
993            .ok()
994            .and_then(|s| s.parse().ok())
995            .unwrap_or(250_000);
996        let mut edges_acc: Vec<(u32, u32, u32, f64)> = Vec::new(); // (a_flat, b_flat, gi, ratio)
997        for chunk_start in (0..pairs_for_gpu.len()).step_by(max_pairs_per_dispatch) {
998            let chunk_end = (chunk_start + max_pairs_per_dispatch).min(pairs_for_gpu.len());
999            let chunk_pairs = &pairs_for_gpu[chunk_start..chunk_end]; // unique-indexed
1000            let chunk_pair_flat = &pair_flat[chunk_start..chunk_end];
1001            let chunk_pair_group = &pair_group[chunk_start..chunk_end];
1002            let flat = gpu.matching_stats_batched_flat(&corpus, chunk_pairs);
1003            let fstate_all = flat.fstate_all();
1004            let fmatch_all = flat.fmatch_all();
1005            let chunk_edges: Vec<(u32, u32, u32, f64)> = (0..chunk_pairs.len())
1006                .into_par_iter()
1007                .filter_map(|slot| {
1008                    let orig = flat.pair_orig_idx[slot] as usize;
1009                    let (au, bu) = chunk_pairs[orig]; // unique indices for SAM/chars
1010                    let (af, bf) = chunk_pair_flat[orig]; // flat indices for demux
1011                    let gi = chunk_pair_group[orig];
1012                    let lo_st = flat.out_offsets[slot] as usize;
1013                    let hi_st = flat.out_offsets[slot + 1] as usize;
1014                    let fstate = &fstate_all[lo_st..hi_st];
1015                    let fmatch = &fmatch_all[lo_st..hi_st];
1016                    let ratio = crate::gestalt::gestalt_edge_with_ms_delta(
1017                        &unique_chars[au as usize],
1018                        &unique_chars[bu as usize],
1019                        &unique_sams[bu as usize],
1020                        fstate,
1021                        fmatch,
1022                        threshold,
1023                        delta,
1024                    )?;
1025                    Some((af, bf, gi, ratio))
1026                })
1027                .collect();
1028            edges_acc.extend(chunk_edges);
1029        }
1030
1031        for (af, bf, gi, ratio) in edges_acc {
1032            let base = group_offsets[gi as usize];
1033            per_group_edges[gi as usize].push((
1034                af as usize - base,
1035                bf as usize - base,
1036                ratio,
1037            ));
1038        }
1039    }
1040
1041    // Non-ASCII groups: handle with the pure CPU path (rare).
1042    for (gi, ascii) in group_ascii.iter().enumerate() {
1043        if !ascii {
1044            let lo = group_offsets[gi];
1045            let hi = group_offsets[gi + 1];
1046            // Materialise the group's per-flat-position SAMs + chars via flat_to_unique (cloning
1047            // — see comment on assemble below: SAM-clone is much cheaper than SAM-build, so on
1048            // dedupe-heavy real workloads the E saving still dominates).
1049            let local_chars: Vec<Vec<char>> = (lo..hi)
1050                .map(|i| unique_chars[flat_to_unique[i] as usize].clone())
1051                .collect();
1052            let clusters = cluster_canonicals_chars(&local_chars, threshold);
1053            per_group_edges[gi].clear();
1054            let mut out: Vec<Vec<(Vec<usize>, f64)>> = Vec::with_capacity(groups.len());
1055            for (k, edges) in per_group_edges.into_iter().enumerate() {
1056                if k == gi {
1057                    out.push(clusters.clone());
1058                } else {
1059                    let n_k = group_offsets[k + 1] - group_offsets[k];
1060                    let lo_k = group_offsets[k];
1061                    let chars_k: Vec<Vec<char>> = (lo_k..lo_k + n_k)
1062                        .map(|i| unique_chars[flat_to_unique[i] as usize].clone())
1063                        .collect();
1064                    let sams_k: Vec<crate::gestalt::Sam> = (lo_k..lo_k + n_k)
1065                        .map(|i| unique_sams[flat_to_unique[i] as usize].clone())
1066                        .collect();
1067                    out.push(assemble(n_k, edges, &chars_k, &sams_k));
1068                }
1069            }
1070            return out;
1071        }
1072    }
1073
1074    // Assemble per-group on CPU. Each group is independent; parallelize across groups. We
1075    // materialise per-group chars/sams from unique storage; SAM-clone is a Vec memcpy of the
1076    // already-built tables — much cheaper than re-building SAMs from scratch.
1077    (0..groups.len())
1078        .into_par_iter()
1079        .map(|gi| {
1080            let lo = group_offsets[gi];
1081            let hi = group_offsets[gi + 1];
1082            let n = hi - lo;
1083            let chars_k: Vec<Vec<char>> = (lo..hi)
1084                .map(|i| unique_chars[flat_to_unique[i] as usize].clone())
1085                .collect();
1086            let sams_k: Vec<crate::gestalt::Sam> = (lo..hi)
1087                .map(|i| unique_sams[flat_to_unique[i] as usize].clone())
1088                .collect();
1089            assemble(n, std::mem::take(&mut per_group_edges[gi].clone()), &chars_k, &sams_k)
1090        })
1091        .collect()
1092}
1093
1094/// Stage-4b: do the per-pair `matching_stats` walk in ONE GPU dispatch instead of CPU rayon's
1095/// per-pair scan. The CPU side keeps doing the cheap stuff — length blocking, `quick_ratio`
1096/// filter (these kill 70–90% of pairs without ever computing `matching_stats`), and the small
1097/// `longest_in` stack walk per surviving pair (data-dependent depth, poor GPU fit).
1098///
1099/// Bit-identical to [`crate::cluster_canonicals_chars`] under any threshold ≥ 0; the kernel
1100/// computes the same `(fstate, fmatch)` byte-for-byte, and `gestalt::gestalt_edge_with_ms`
1101/// runs the same recursion + early-exit on the host with those arrays.
1102///
1103/// Cost model: per-call GPU setup ≈ 5 ms (sort+offsets+upload) + dispatch ≈ proportional to
1104/// the number of surviving-pair bytes. For groups smaller than `DFGPU_CLUSTER_THRESHOLD` the
1105/// caller stays on the pure-CPU path so we don't pay that fixed cost for nothing.
1106#[cfg(all(feature = "gpu", target_os = "macos"))]
1107#[allow(clippy::too_many_lines)]
1108fn cluster_canonicals_chars_via_gpu(
1109    gpu: &Gpu,
1110    chars: &[Vec<char>],
1111    threshold: f64,
1112    delta: f64,
1113) -> Vec<(Vec<usize>, f64)> {
1114    use rayon::prelude::*;
1115
1116    use crate::gpu::CorpusGpu;
1117    use crate::{assemble, char_counts, quick_ratio_counts, real_quick_ratio};
1118
1119    let n = chars.len();
1120    if n < 2 {
1121        return Vec::new();
1122    }
1123
1124    // Prebuild every string's SAM (rayon-parallel).
1125    let sams: Vec<crate::gestalt::Sam> =
1126        chars.par_iter().map(|c| crate::gestalt::build_sam(c)).collect();
1127
1128    // ASCII byte view of each string for the GPU corpus arena. We already enforced ASCII at
1129    // the call site (`Rationer::cluster_canonicals_chars`), so this cast is exact.
1130    let bytes: Vec<Vec<u8>> = chars.iter().map(|c| c.iter().map(|&ch| ch as u8).collect()).collect();
1131    let byte_refs: Vec<&[u8]> = bytes.iter().map(Vec::as_slice).collect();
1132    let corpus = CorpusGpu::build(gpu, &byte_refs, &sams);
1133
1134    // CPU-side filter: length blocking + quick_ratio. Produces candidate (i, j) pairs with i < j.
1135    // Length-sorted outer loop + break-on-length-bound is the same shape as the canonical
1136    // qualifying_pairs path, so we don't visit any pair we'd skip there.
1137    let mut order: Vec<usize> = (0..n).collect();
1138    order.sort_by_key(|&i| chars[i].len());
1139    let counts: Vec<Vec<(char, u32)>> = chars.par_iter().map(|c| char_counts(c)).collect();
1140    let candidates: Vec<(u32, u32)> = (0..n)
1141        .into_par_iter()
1142        .flat_map_iter(|p| {
1143            let i = order[p];
1144            let mut local: Vec<(u32, u32)> = Vec::new();
1145            for &j in &order[p + 1..] {
1146                if real_quick_ratio(&chars[i], &chars[j]) < threshold {
1147                    break;
1148                }
1149                if quick_ratio_counts(&counts[i], &counts[j], chars[i].len() + chars[j].len())
1150                    < threshold
1151                {
1152                    continue;
1153                }
1154                // Production gestalt_edge(lo, hi, sam_b=sam_hi) — replicate ordering.
1155                let (lo, hi) = if i < j { (i, j) } else { (j, i) };
1156                #[allow(clippy::cast_possible_truncation)]
1157                local.push((lo as u32, hi as u32));
1158            }
1159            local
1160        })
1161        .collect();
1162
1163    if candidates.is_empty() {
1164        return assemble(n, Vec::new(), chars, &sams);
1165    }
1166
1167    // GPU pass: matching_stats kernel + CPU `gestalt_edge_with_ms` per pair. (Stage 4d/4f
1168    // full_ratio kernel removed — lost 3× against this path on every canonical-Python corpus;
1169    // see `src/new/PERF_MAP.md`.)
1170    let pairs_for_gpu: Vec<(u32, u32)> = candidates.iter().copied().collect();
1171    let flat = gpu.matching_stats_batched_flat(&corpus, &pairs_for_gpu);
1172    let fstate_all = flat.fstate_all();
1173    let fmatch_all = flat.fmatch_all();
1174    let edges: Vec<(usize, usize, f64)> = (0..pairs_for_gpu.len())
1175        .into_par_iter()
1176        .filter_map(|slot| {
1177            let orig = flat.pair_orig_idx[slot] as usize;
1178            let (a_idx, b_idx) = candidates[orig];
1179            let lo_state = flat.out_offsets[slot] as usize;
1180            let hi_state = flat.out_offsets[slot + 1] as usize;
1181            let fstate = &fstate_all[lo_state..hi_state];
1182            let fmatch = &fmatch_all[lo_state..hi_state];
1183            let ratio = crate::gestalt::gestalt_edge_with_ms_delta(
1184                &chars[a_idx as usize],
1185                &chars[b_idx as usize],
1186                &sams[b_idx as usize],
1187                fstate,
1188                fmatch,
1189                threshold,
1190                delta,
1191            )?;
1192            Some((a_idx as usize, b_idx as usize, ratio))
1193        })
1194        .collect();
1195
1196    assemble(n, edges, chars, &sams)
1197}
1198
1199// SAFETY: All owned fields (Gpu, BoostGuard) are Send+Sync (Gpu wraps Metal handles documented
1200// thread-safe; BoostGuard owns a kernel object referenced by id). No interior mutability.
1201#[cfg(all(feature = "gpu", target_os = "macos"))]
1202unsafe impl Send for Rationer {}
1203#[cfg(all(feature = "gpu", target_os = "macos"))]
1204unsafe impl Sync for Rationer {}