Skip to main content

nexus_core/
module_e.rs

1use std::fs::{self, File};
2use std::io::BufWriter;
3use std::path::PathBuf;
4use std::time::Instant;
5
6use anyhow::{Context, Result};
7use fst::{IntoStreamer, Streamer};
8use memmap2::MmapOptions;
9use trie_rs::{Trie, TrieBuilder};
10
/// Resident-set-size ceiling in bytes: a run meets its memory target when the sampled
/// VmRSS is strictly below this value.
const MEMORY_TARGET_BYTES: u64 = 1_000_000_000;
/// Mean-latency ceiling in nanoseconds: both the lookup mean and the list-seek mean must be
/// strictly below it for the latency target to count as met.
const LATENCY_TARGET_MEAN_NS: f64 = 1_000.0;
/// Multiplier that turns a key index into the value stored in the FST (`index * OFFSET_STEP`).
const OFFSET_STEP: u64 = 4096;
/// Number of consecutive key indices that share one tenant prefix.
const KEYS_PER_TENANT: u64 = 1000;
/// Maximum number of keys per trie shard when the `dev-mode` feature is enabled.
#[cfg(feature = "dev-mode")]
const TRIE_SHARD_KEYS: u64 = 500_000;
/// Maximum number of keys per trie shard when the `dev-mode` feature is not enabled.
#[cfg(not(feature = "dev-mode"))]
const TRIE_SHARD_KEYS: u64 = 2_000_000;
19
/// Workload description for the FST benchmark.
#[derive(Debug, Clone)]
pub struct ModuleEFstConfig {
    /// Number of keys to insert; key indices run over `0..keys`. Must be non-zero.
    pub keys: u64,
    /// Number of random exact-match lookups to time. Must be non-zero.
    pub lookups: u64,
    /// Number of random prefix range seeks to time. Must be non-zero.
    pub list_queries: u64,
    /// File the FST index is written to and then memory-mapped from; missing parent
    /// directories are created before the build.
    pub index_path: PathBuf,
}
27
/// Workload description for the in-memory trie benchmark.
#[derive(Debug, Clone)]
pub struct ModuleETrieConfig {
    /// Total number of keys across all shards; key indices run over `0..keys`. Must be non-zero.
    pub keys: u64,
    /// Total number of random exact-match lookups, divided among the shards. Must be non-zero.
    pub lookups: u64,
    /// Total number of random prefix searches, divided among the shards. Must be non-zero.
    pub list_queries: u64,
}
34
/// Workload description for a back-to-back FST and trie comparison; the same counts are
/// forwarded to both benchmarks.
#[derive(Debug, Clone)]
pub struct ModuleECompareConfig {
    /// Number of keys used by both benchmarks.
    pub keys: u64,
    /// Number of exact-match lookups used by both benchmarks.
    pub lookups: u64,
    /// Number of prefix queries used by both benchmarks.
    pub list_queries: u64,
    /// File path handed to the FST benchmark for its on-disk index.
    pub index_path: PathBuf,
}
42
/// Results of one FST benchmark run.
#[derive(Debug, Clone)]
pub struct ModuleEFstStats {
    /// Number of keys inserted into the index.
    pub keys: u64,
    /// Number of exact-match lookups that were timed.
    pub lookups: u64,
    /// Number of prefix range seeks that were timed.
    pub list_queries: u64,
    /// Wall-clock time spent building the index, in milliseconds.
    pub build_elapsed_ms: f64,
    /// Wall-clock time of the whole query phase, in milliseconds.
    pub query_elapsed_ms: f64,
    /// Size of the index file on disk, in bytes.
    pub index_size_bytes: u64,
    /// Resident set size sampled after the query phase, in bytes (0 when unavailable).
    pub vm_rss_bytes: u64,
    /// Mean exact-lookup latency in nanoseconds.
    pub lookup_mean_ns: f64,
    /// Median exact-lookup latency in nanoseconds.
    pub lookup_p50_ns: u64,
    /// 99th-percentile exact-lookup latency in nanoseconds.
    pub lookup_p99_ns: u64,
    /// Mean prefix-seek latency in nanoseconds.
    pub list_seek_mean_ns: f64,
    /// Median prefix-seek latency in nanoseconds.
    pub list_seek_p50_ns: u64,
    /// 99th-percentile prefix-seek latency in nanoseconds.
    pub list_seek_p99_ns: u64,
    /// Whether the sampled resident set size stayed below the memory target.
    pub memory_target_met: bool,
    /// Whether both mean latencies stayed below the latency target.
    pub latency_target_met: bool,
    /// Path of the index file that was built and queried.
    pub index_path: PathBuf,
}

impl ModuleEFstStats {
    /// Serializes the stats as a single-line JSON object tagged `"module":"E_FST"`.
    ///
    /// Millisecond fields are printed with three decimals and mean latencies with two.
    /// The index path is rendered via `Path::display` (lossy for non-UTF-8 paths) and then
    /// JSON-escaped, so paths containing `"`, `\` or control characters (for example
    /// Windows-style separators) still produce valid JSON.
    pub fn to_json(&self) -> String {
        /// Escapes `raw` for embedding inside a JSON string literal.
        fn escape_json(raw: &str) -> String {
            let mut out = String::with_capacity(raw.len());
            for ch in raw.chars() {
                match ch {
                    '"' => out.push_str("\\\""),
                    '\\' => out.push_str("\\\\"),
                    '\n' => out.push_str("\\n"),
                    '\r' => out.push_str("\\r"),
                    '\t' => out.push_str("\\t"),
                    // Remaining C0 control characters must use the \uXXXX form in JSON.
                    c if (c as u32) < 0x20 => out.push_str(&format!("\\u{:04x}", c as u32)),
                    c => out.push(c),
                }
            }
            out
        }

        format!(
            "{{\"module\":\"E_FST\",\"keys\":{},\"lookups\":{},\"list_queries\":{},\"build_elapsed_ms\":{:.3},\"query_elapsed_ms\":{:.3},\"index_size_bytes\":{},\"vm_rss_bytes\":{},\"lookup_mean_ns\":{:.2},\"lookup_p50_ns\":{},\"lookup_p99_ns\":{},\"list_seek_mean_ns\":{:.2},\"list_seek_p50_ns\":{},\"list_seek_p99_ns\":{},\"memory_target_met\":{},\"latency_target_met\":{},\"index_path\":\"{}\"}}",
            self.keys,
            self.lookups,
            self.list_queries,
            self.build_elapsed_ms,
            self.query_elapsed_ms,
            self.index_size_bytes,
            self.vm_rss_bytes,
            self.lookup_mean_ns,
            self.lookup_p50_ns,
            self.lookup_p99_ns,
            self.list_seek_mean_ns,
            self.list_seek_p50_ns,
            self.list_seek_p99_ns,
            self.memory_target_met,
            self.latency_target_met,
            escape_json(&self.index_path.display().to_string())
        )
    }
}
86
/// Results of one sharded trie benchmark run.
#[derive(Debug, Clone)]
pub struct ModuleETrieStats {
    /// Total number of keys across all shards.
    pub keys: u64,
    /// Total number of exact-match lookups that were timed.
    pub lookups: u64,
    /// Total number of prefix searches that were timed.
    pub list_queries: u64,
    /// Build time summed over all shards, in milliseconds.
    pub build_elapsed_ms: f64,
    /// Query-phase time summed over all shards, in milliseconds.
    pub query_elapsed_ms: f64,
    /// Largest resident set size sampled after a shard build, in bytes (0 when unavailable).
    pub vm_rss_bytes: u64,
    /// Mean exact-lookup latency in nanoseconds.
    pub lookup_mean_ns: f64,
    /// Median exact-lookup latency in nanoseconds.
    pub lookup_p50_ns: u64,
    /// 99th-percentile exact-lookup latency in nanoseconds.
    pub lookup_p99_ns: u64,
    /// Mean prefix-search latency in nanoseconds.
    pub list_seek_mean_ns: f64,
    /// Median prefix-search latency in nanoseconds.
    pub list_seek_p50_ns: u64,
    /// 99th-percentile prefix-search latency in nanoseconds.
    pub list_seek_p99_ns: u64,
    /// Whether the peak resident set size stayed below the memory target.
    pub memory_target_met: bool,
    /// Whether both mean latencies stayed below the latency target.
    pub latency_target_met: bool,
}

impl ModuleETrieStats {
    /// Serializes the stats as a single-line JSON object tagged `"module":"E_TRIE"`.
    ///
    /// Members are emitted in declaration order; millisecond fields carry three decimals
    /// and mean latencies two.
    pub fn to_json(&self) -> String {
        // One `"name":value` fragment per member, joined with commas below.
        let members = [
            String::from("\"module\":\"E_TRIE\""),
            format!("\"keys\":{}", self.keys),
            format!("\"lookups\":{}", self.lookups),
            format!("\"list_queries\":{}", self.list_queries),
            format!("\"build_elapsed_ms\":{:.3}", self.build_elapsed_ms),
            format!("\"query_elapsed_ms\":{:.3}", self.query_elapsed_ms),
            format!("\"vm_rss_bytes\":{}", self.vm_rss_bytes),
            format!("\"lookup_mean_ns\":{:.2}", self.lookup_mean_ns),
            format!("\"lookup_p50_ns\":{}", self.lookup_p50_ns),
            format!("\"lookup_p99_ns\":{}", self.lookup_p99_ns),
            format!("\"list_seek_mean_ns\":{:.2}", self.list_seek_mean_ns),
            format!("\"list_seek_p50_ns\":{}", self.list_seek_p50_ns),
            format!("\"list_seek_p99_ns\":{}", self.list_seek_p99_ns),
            format!("\"memory_target_met\":{}", self.memory_target_met),
            format!("\"latency_target_met\":{}", self.latency_target_met),
        ];
        format!("{{{}}}", members.join(","))
    }
}
126
127#[derive(Debug, Clone)]
128pub struct ModuleECompareStats {
129    pub fst: ModuleEFstStats,
130    pub trie: ModuleETrieStats,
131    pub recommendation: String,
132}
133
134impl ModuleECompareStats {
135    pub fn to_json(&self) -> String {
136        format!(
137            "{{\"module\":\"E_COMPARE\",\"recommendation\":\"{}\",\"fst\":{},\"trie\":{}}}",
138            self.recommendation,
139            self.fst.to_json(),
140            self.trie.to_json()
141        )
142    }
143}
144
/// Builds an FST map on disk, memory-maps it, and times random lookups and prefix seeks.
///
/// Phases:
/// 1. Build: keys `key_for(0..keys)` are inserted in index order with value
///    `index * OFFSET_STEP`, through an 8 MiB buffered writer.
/// 2. Query: the file is reopened and memory-mapped, then `lookups` random exact lookups
///    and `list_queries` random prefix range seeks are timed individually.
/// 3. Summary: RSS is sampled once after the query phase and latencies are summarized.
///
/// # Errors
///
/// Fails if any workload count is zero, on any filesystem/mmap/FST error, on offset
/// overflow, or when a lookup or prefix scan returns a missing or unexpected result.
pub fn run_fst(config: ModuleEFstConfig) -> Result<ModuleEFstStats> {
    validate_workload(config.keys, config.lookups, config.list_queries)?;

    // A bare file name has an empty parent; only create directories when there is one.
    if let Some(parent) = config.index_path.parent() {
        if !parent.as_os_str().is_empty() {
            fs::create_dir_all(parent)
                .with_context(|| format!("failed creating index parent {}", parent.display()))?;
        }
    }

    let build_start = Instant::now();
    {
        let file = File::create(&config.index_path)
            .with_context(|| format!("failed creating index {}", config.index_path.display()))?;
        let writer = BufWriter::with_capacity(8 * 1024 * 1024, file);
        let mut builder =
            fst::MapBuilder::new(writer).context("failed creating fst map builder")?;

        // Keys are inserted in ascending index order; the unit test
        // `keys_are_sorted_and_unique` checks that this is also ascending key order.
        for i in 0..config.keys {
            let key = key_for(i);
            let value = i
                .checked_mul(OFFSET_STEP)
                .context("offset overflow while building fst")?;
            builder
                .insert(key, value)
                .with_context(|| format!("failed inserting fst key at index {}", i))?;
        }

        // NOTE(review): presumably `finish` also flushes the buffered writer — confirm
        // against the fst crate documentation.
        builder.finish().context("failed finishing fst map")?;
    }
    let build_elapsed_ms = build_start.elapsed().as_secs_f64() * 1_000.0;

    let index_size_bytes = fs::metadata(&config.index_path)
        .with_context(|| format!("failed to stat fst index {}", config.index_path.display()))?
        .len();

    // The query clock starts before the file is opened and mapped, so `query_elapsed_ms`
    // includes open/mmap/load as well as key formatting and result validation.
    let query_start = Instant::now();
    let file = File::open(&config.index_path)
        .with_context(|| format!("failed opening fst index {}", config.index_path.display()))?;
    // SAFETY: NOTE(review) — a file-backed mapping is only sound while the file is not
    // truncated or modified underneath it. This function wrote the file just above and does
    // not touch it again while `mapped` is alive; confirm no external writer shares the path.
    let mapped = unsafe { MmapOptions::new().map(&file) }
        .with_context(|| format!("failed mmap fst index {}", config.index_path.display()))?;
    let map = fst::Map::new(&mapped[..]).context("failed loading mmap-backed fst map")?;

    let mut lookup_latencies = Vec::with_capacity(config.lookups as usize);
    // Fixed seed keeps the sequence of probed keys reproducible across runs.
    let mut lookup_rng = fastrand::Rng::with_seed(0xC001_D00D_0001);
    for _ in 0..config.lookups {
        let idx = lookup_rng.u64(..config.keys);
        let key = key_for(idx);
        // Only the `get` call is timed; key formatting and validation are outside.
        let start = Instant::now();
        let value = map
            .get(key.as_bytes())
            .ok_or_else(|| anyhow::anyhow!("fst lookup missing key {}", key))?;
        let elapsed_ns = start.elapsed().as_nanos() as u64;
        let expected = idx
            .checked_mul(OFFSET_STEP)
            .context("offset overflow while validating fst lookup")?;
        if value != expected {
            anyhow::bail!(
                "fst lookup value mismatch: expected {}, got {} for key {}",
                expected,
                value,
                key
            );
        }
        lookup_latencies.push(elapsed_ns);
    }

    let mut list_seek_latencies = Vec::with_capacity(config.list_queries as usize);
    let mut list_rng = fastrand::Rng::with_seed(0xC001_D00D_0002);
    for _ in 0..config.list_queries {
        let idx = list_rng.u64(..config.keys);
        let prefix = prefix_for_index(idx);
        let start = Instant::now();

        // Prefix scan expressed as the half-open key range [prefix, upper_bound(prefix)).
        let mut range = map.range().ge(prefix.as_bytes());
        if let Some(upper) = prefix_upper_bound(prefix.as_bytes()) {
            range = range.lt(upper);
        }
        let mut stream = range.into_stream();
        // The timed region ends here: it covers building the range and creating the
        // stream, but not fetching the first entry with `next()` below.
        let elapsed_ns = start.elapsed().as_nanos() as u64;

        let Some((key, _value)) = stream.next() else {
            anyhow::bail!("fst prefix scan returned no result for prefix {}", prefix);
        };
        if !key.starts_with(prefix.as_bytes()) {
            anyhow::bail!(
                "fst prefix scan returned key outside prefix: prefix={}, key={}",
                prefix,
                String::from_utf8_lossy(key)
            );
        }

        list_seek_latencies.push(elapsed_ns);
    }

    let query_elapsed_ms = query_start.elapsed().as_secs_f64() * 1_000.0;
    // Sampled while the mapping and both latency vectors are still alive.
    let vm_rss_bytes = read_vm_rss_bytes();

    let lookup = summarize_latencies(&mut lookup_latencies);
    let list_seek = summarize_latencies(&mut list_seek_latencies);

    let memory_target_met = vm_rss_bytes < MEMORY_TARGET_BYTES;
    let latency_target_met =
        lookup.mean_ns < LATENCY_TARGET_MEAN_NS && list_seek.mean_ns < LATENCY_TARGET_MEAN_NS;

    Ok(ModuleEFstStats {
        keys: config.keys,
        lookups: config.lookups,
        list_queries: config.list_queries,
        build_elapsed_ms,
        query_elapsed_ms,
        index_size_bytes,
        vm_rss_bytes,
        lookup_mean_ns: lookup.mean_ns,
        lookup_p50_ns: lookup.p50_ns,
        lookup_p99_ns: lookup.p99_ns,
        list_seek_mean_ns: list_seek.mean_ns,
        list_seek_p50_ns: list_seek.p50_ns,
        list_seek_p99_ns: list_seek.p99_ns,
        memory_target_met,
        latency_target_met,
        index_path: config.index_path,
    })
}
269
/// Benchmarks an in-memory trie, built and queried one shard at a time.
///
/// The key space `0..keys` is cut into consecutive shards of at most `TRIE_SHARD_KEYS`
/// keys. For each shard a fresh trie is built, RSS is sampled, the shard's share of the
/// lookups and prefix searches is timed, and the trie goes out of scope before the next
/// shard starts. Build and query times are summed over shards; the reported RSS is the
/// largest per-shard sample.
///
/// # Errors
///
/// Fails if any workload count is zero, if a lookup or prefix search misses or returns a
/// key outside the prefix, or if the number of collected samples differs from the request.
pub fn run_trie(config: ModuleETrieConfig) -> Result<ModuleETrieStats> {
    validate_workload(config.keys, config.lookups, config.list_queries)?;

    let shard_count = config.keys.div_ceil(TRIE_SHARD_KEYS) as usize;
    let mut build_elapsed_ms = 0.0_f64;
    let mut query_elapsed_ms = 0.0_f64;
    let mut vm_rss_peak_bytes = 0_u64;
    let mut lookup_latencies = Vec::with_capacity(config.lookups as usize);
    let mut list_seek_latencies = Vec::with_capacity(config.list_queries as usize);
    // Fixed seeds keep the probed keys reproducible; the generators are shared across
    // shards so each shard continues the same sequence.
    let mut lookup_rng = fastrand::Rng::with_seed(0xC001_D00D_0003);
    let mut list_rng = fastrand::Rng::with_seed(0xC001_D00D_0004);

    for shard_idx in 0..shard_count {
        // Half-open index range [shard_start, shard_end) covered by this shard.
        let shard_start = shard_idx as u64 * TRIE_SHARD_KEYS;
        let shard_end = (shard_start + TRIE_SHARD_KEYS).min(config.keys);
        let shard_keys = shard_end.saturating_sub(shard_start);
        if shard_keys == 0 {
            continue;
        }

        let build_start = Instant::now();
        let mut builder = TrieBuilder::<u8>::new();
        for i in shard_start..shard_end {
            let key = key_for(i);
            builder.push(key.as_bytes());
        }
        let trie: Trie<u8> = builder.build();
        build_elapsed_ms += build_start.elapsed().as_secs_f64() * 1_000.0;
        // Sample RSS right after the build, while this shard's trie is alive.
        vm_rss_peak_bytes = vm_rss_peak_bytes.max(read_vm_rss_bytes());

        // Per-shard shares add up to the configured totals (verified after the loop).
        let shard_lookup_queries = split_queries(config.lookups, shard_idx, shard_count);
        let shard_list_queries = split_queries(config.list_queries, shard_idx, shard_count);

        let query_start = Instant::now();
        for _ in 0..shard_lookup_queries {
            // Probe only keys that live in the current shard.
            let idx = shard_start + lookup_rng.u64(..shard_keys);
            let key = key_for(idx);

            // Only the `exact_match` call is timed.
            let start = Instant::now();
            let found = trie.exact_match(key.as_bytes());
            let elapsed_ns = start.elapsed().as_nanos() as u64;
            if !found {
                anyhow::bail!("trie lookup missing key {}", key);
            }
            lookup_latencies.push(elapsed_ns);
        }
        for _ in 0..shard_list_queries {
            let idx = shard_start + list_rng.u64(..shard_keys);
            let prefix = prefix_for_index(idx);

            // The timed region covers starting the predictive search and fetching its
            // first result (including materializing that key as a `Vec<u8>`).
            let start = Instant::now();
            let first = trie
                .predictive_search::<Vec<u8>, _>(prefix.as_bytes())
                .next();
            let elapsed_ns = start.elapsed().as_nanos() as u64;

            let Some(key) = first else {
                anyhow::bail!("trie prefix scan returned no result for prefix {}", prefix);
            };
            if !key.starts_with(prefix.as_bytes()) {
                anyhow::bail!(
                    "trie prefix scan returned key outside prefix: prefix={}, key={}",
                    prefix,
                    String::from_utf8_lossy(&key)
                );
            }
            list_seek_latencies.push(elapsed_ns);
        }
        query_elapsed_ms += query_start.elapsed().as_secs_f64() * 1_000.0;
    }

    // Guard against the per-shard split dropping or duplicating queries.
    if lookup_latencies.len() as u64 != config.lookups {
        anyhow::bail!(
            "trie lookup sample mismatch: expected {}, got {}",
            config.lookups,
            lookup_latencies.len()
        );
    }
    if list_seek_latencies.len() as u64 != config.list_queries {
        anyhow::bail!(
            "trie list sample mismatch: expected {}, got {}",
            config.list_queries,
            list_seek_latencies.len()
        );
    }

    let lookup = summarize_latencies(&mut lookup_latencies);
    let list_seek = summarize_latencies(&mut list_seek_latencies);

    let memory_target_met = vm_rss_peak_bytes < MEMORY_TARGET_BYTES;
    let latency_target_met =
        lookup.mean_ns < LATENCY_TARGET_MEAN_NS && list_seek.mean_ns < LATENCY_TARGET_MEAN_NS;

    Ok(ModuleETrieStats {
        keys: config.keys,
        lookups: config.lookups,
        list_queries: config.list_queries,
        build_elapsed_ms,
        query_elapsed_ms,
        vm_rss_bytes: vm_rss_peak_bytes,
        lookup_mean_ns: lookup.mean_ns,
        lookup_p50_ns: lookup.p50_ns,
        lookup_p99_ns: lookup.p99_ns,
        list_seek_mean_ns: list_seek.mean_ns,
        list_seek_p50_ns: list_seek.p50_ns,
        list_seek_p99_ns: list_seek.p99_ns,
        memory_target_met,
        latency_target_met,
    })
}
380
/// Returns how many of `total_queries` belong to shard `shard_idx` out of `shard_count`.
///
/// Every shard receives the integer quotient; the first `total_queries % shard_count`
/// shards receive one extra so the shares add up to the total. Zero shards yield 0.
fn split_queries(total_queries: u64, shard_idx: usize, shard_count: usize) -> u64 {
    let shards = shard_count as u64;
    if shards == 0 {
        return 0;
    }
    let gets_extra = (shard_idx as u64) < total_queries % shards;
    total_queries / shards + u64::from(gets_extra)
}
393
394pub fn run_compare(config: ModuleECompareConfig) -> Result<ModuleECompareStats> {
395    let fst = run_fst(ModuleEFstConfig {
396        keys: config.keys,
397        lookups: config.lookups,
398        list_queries: config.list_queries,
399        index_path: config.index_path,
400    })?;
401
402    let trie = run_trie(ModuleETrieConfig {
403        keys: config.keys,
404        lookups: config.lookups,
405        list_queries: config.list_queries,
406    })?;
407
408    let recommendation = pick_recommendation(&fst, &trie);
409
410    Ok(ModuleECompareStats {
411        fst,
412        trie,
413        recommendation,
414    })
415}
416
417fn validate_workload(keys: u64, lookups: u64, list_queries: u64) -> Result<()> {
418    if keys == 0 {
419        anyhow::bail!("keys must be > 0");
420    }
421    if lookups == 0 {
422        anyhow::bail!("lookups must be > 0");
423    }
424    if list_queries == 0 {
425        anyhow::bail!("list_queries must be > 0");
426    }
427
428    Ok(())
429}
430
431fn key_for(index: u64) -> String {
432    let tenant = index / KEYS_PER_TENANT;
433    format!("tenant-{tenant:08}/production/images/2026/02/25/image-{index:012}.jpg")
434}
435
436fn prefix_for_index(index: u64) -> String {
437    let tenant = index / KEYS_PER_TENANT;
438    format!("tenant-{tenant:08}/production/images/2026/02/25/image-")
439}
440
/// Returns the smallest byte string that sorts after every string starting with `prefix`.
///
/// The last byte that is not `0xFF` is incremented and everything after it is dropped.
/// Returns `None` when no such byte exists (empty prefix or all bytes `0xFF`), meaning
/// the prefix range has no finite upper bound.
fn prefix_upper_bound(prefix: &[u8]) -> Option<Vec<u8>> {
    let pivot = prefix.iter().rposition(|&byte| byte != 0xFF)?;
    let mut upper = prefix[..=pivot].to_vec();
    // Cannot overflow: the pivot byte is known not to be 0xFF.
    upper[pivot] += 1;
    Some(upper)
}
452
/// Reads the process's resident set size from `/proc/self/status`, in bytes.
///
/// Takes the first line starting with `VmRSS:`, parses its second whitespace-separated
/// token as an integer and multiplies it by 1024 (saturating). Returns 0 when the file
/// cannot be read, the line is absent, or the value does not parse.
fn read_vm_rss_bytes() -> u64 {
    fs::read_to_string("/proc/self/status")
        .ok()
        .and_then(|status| {
            status
                .lines()
                .find(|line| line.starts_with("VmRSS:"))
                // Token 0 is the `VmRSS:` label, token 1 the numeric value.
                .and_then(|line| line.split_whitespace().nth(1))
                .and_then(|value_kib| value_kib.parse::<u64>().ok())
        })
        .map_or(0, |kib| kib.saturating_mul(1024))
}
477
/// Mean and selected percentiles of a set of latency samples, all in nanoseconds.
#[derive(Debug, Clone, Copy)]
struct LatencySummary {
    /// Arithmetic mean of the samples.
    mean_ns: f64,
    /// Median (50th percentile) sample.
    p50_ns: u64,
    /// 99th-percentile sample.
    p99_ns: u64,
}
484
485fn summarize_latencies(samples: &mut [u64]) -> LatencySummary {
486    if samples.is_empty() {
487        return LatencySummary {
488            mean_ns: 0.0,
489            p50_ns: 0,
490            p99_ns: 0,
491        };
492    }
493
494    let sum = samples.iter().copied().map(|v| v as f64).sum::<f64>();
495    let mean_ns = sum / samples.len() as f64;
496
497    samples.sort_unstable();
498    let p50_idx = percentile_index(samples.len(), 50);
499    let p99_idx = percentile_index(samples.len(), 99);
500
501    LatencySummary {
502        mean_ns,
503        p50_ns: samples[p50_idx],
504        p99_ns: samples[p99_idx],
505    }
506}
507
/// Returns the zero-based index of the nearest-rank `percentile` in a sorted slice of
/// length `len`.
///
/// The one-based rank is `ceil(len * percentile / 100)`, computed in integer arithmetic
/// so that binary floating-point rounding cannot push an exact rank up by one (for
/// example `100.0 * (7.0 / 100.0)` evaluates to `7.000000000000001`). The result is
/// clamped to `0..len`, so percentile 0 maps to the first element and percentiles of 100
/// or more map to the last. Returns 0 when `len` is 0.
fn percentile_index(len: usize, percentile: usize) -> usize {
    if len == 0 {
        return 0;
    }
    let last = len - 1;
    // Widen to u128 so `len * percentile` cannot overflow.
    let rank = (len as u128 * percentile as u128).div_ceil(100);
    usize::try_from(rank.saturating_sub(1)).map_or(last, |idx| idx.min(last))
}
515
516fn pick_recommendation(fst: &ModuleEFstStats, trie: &ModuleETrieStats) -> String {
517    match (fst.memory_target_met, trie.memory_target_met) {
518        (true, false) => return "fst".to_string(),
519        (false, true) => return "trie-rs".to_string(),
520        _ => {}
521    }
522
523    if fst.lookup_p50_ns < trie.lookup_p50_ns {
524        return "fst".to_string();
525    }
526    if trie.lookup_p50_ns < fst.lookup_p50_ns {
527        return "trie-rs".to_string();
528    }
529
530    if fst.build_elapsed_ms <= trie.build_elapsed_ms {
531        "fst".to_string()
532    } else {
533        "trie-rs".to_string()
534    }
535}
536
#[cfg(test)]
mod tests {
    use std::time::{SystemTime, UNIX_EPOCH};

    use super::*;

    /// Builds a temp-directory path whose name combines `prefix`, the process id and the
    /// current time in nanoseconds.
    fn unique_index_path(prefix: &str) -> PathBuf {
        let since_epoch = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("clock should be after unix epoch");
        let file_name = format!(
            "{}-{}-{}.fst",
            prefix,
            std::process::id(),
            since_epoch.as_nanos()
        );
        std::env::temp_dir().join(file_name)
    }

    /// Generated keys must be strictly ascending in index order.
    #[test]
    fn keys_are_sorted_and_unique() {
        let keys: Vec<String> = (0..20_000).map(key_for).collect();
        for pair in keys.windows(2) {
            assert!(pair[1] > pair[0], "keys must be strictly increasing");
        }
    }

    /// Incrementing the last byte yields the exclusive upper bound of a prefix range.
    #[test]
    fn prefix_upper_bound_increments_suffix() {
        let upper = prefix_upper_bound(b"abc").expect("upper bound should exist");
        assert_eq!(upper, b"abd");
    }

    /// End-to-end FST run on a small workload, cleaning up the index file afterwards.
    #[test]
    fn fst_small_round_trip() {
        let path = unique_index_path("module-e-fst-small");
        let config = ModuleEFstConfig {
            keys: 10_000,
            lookups: 2_000,
            list_queries: 2_000,
            index_path: path.clone(),
        };

        let stats = run_fst(config).expect("small fst run should succeed");

        assert_eq!(stats.keys, 10_000);
        assert!(stats.index_size_bytes > 0);
        assert!(stats.lookup_p99_ns >= stats.lookup_p50_ns);

        std::fs::remove_file(path).expect("temporary fst file should be removable");
    }

    /// End-to-end trie run on a small workload.
    #[test]
    fn trie_small_round_trip() {
        let config = ModuleETrieConfig {
            keys: 10_000,
            lookups: 2_000,
            list_queries: 2_000,
        };

        let stats = run_trie(config).expect("small trie run should succeed");

        assert_eq!(stats.keys, 10_000);
        assert!(stats.lookup_p99_ns >= stats.lookup_p50_ns);
    }
}