Skip to main content

ix/
builder.rs

1//! Index builder — the complete pipeline from files to .ix shard.
2//!
3//! Phase 1: Discovery (walk directory, respect .gitignore)
4//! Phase 2: Scan (mmap, check binary, extract trigrams, bloom filter)
5//! Phase 3: Serialize (write sections, compute CRCs, atomic rename)
6
7use crate::bloom::BloomFilter;
8use crate::decompress::maybe_decompress;
9use crate::error::{Error, Result};
10use crate::format::{
11    FileStatus, HEADER_SIZE, MAGIC, VERSION_MAJOR, VERSION_MINOR, flags, is_binary,
12};
13use crate::posting::{PostingEntry, PostingList};
14use crate::trigram::{Extractor, Trigram};
15use crate::varint;
16use ignore::WalkBuilder;
17use libc;
18use llmosafe::ResourceGuard;
19use memmap2::Mmap;
20use std::collections::{BinaryHeap, HashMap};
21use std::fs::{self, File};
22use std::io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write};
23use std::path::{Path, PathBuf};
24use std::time::{Instant, SystemTime, UNIX_EPOCH};
25
26/// Index builder — orchestrates the full pipeline from file discovery to shard
27/// serialization.
28pub struct Builder {
29    root: PathBuf,
30    ix_dir: PathBuf,
31    file_count: u32,
32
33    // O(1) memory streaming writers for temporary file table and blooms.
34    // Wrapped in Option so they can be recreated per-build (clean-before-build pattern).
35    files_writer: Option<BufWriter<File>>,
36    blooms_writer: Option<BufWriter<File>>,
37    strings_writer: Option<BufWriter<File>>,
38
39    // Postings batching for external sort
40    postings: HashMap<Trigram, Vec<PostingEntry>>,
41    postings_count: usize,
42    temp_runs: Vec<PathBuf>,
43
44    extractor: Extractor,
45    stats: BuildStats,
46    decompress: bool,
47    resource_guard: Option<ResourceGuard>,
48    dead_ends: Vec<PathBuf>,
49    max_file_size: u64,
50    committed: bool,
51}
52
53/// Accumulated metrics collected during a build run.
54#[derive(Default, Debug)]
55pub struct BuildStats {
56    /// Number of files processed (text only, after filtering).
57    pub files_scanned: u64,
58    /// Number of files skipped because they were detected as binary.
59    pub files_skipped_binary: u64,
60    /// Number of files skipped because they exceeded the size limit.
61    pub files_skipped_size: u64,
62    /// Total number of raw bytes scanned across all files.
63    pub bytes_scanned: u64,
64    /// Number of distinct trigrams discovered across all files.
65    pub unique_trigrams: u64,
66}
67
68struct RunIterator {
69    file: BufReader<File>,
70}
71
72impl RunIterator {
73    fn new(path: &Path) -> Result<Self> {
74        let f = File::open(path)?;
75        Ok(Self {
76            file: BufReader::new(f),
77        })
78    }
79
80    fn next_trigram(&mut self) -> Result<Option<(Trigram, Vec<PostingEntry>)>> {
81        let mut tri_buf = [0u8; 4];
82        if let Err(e) = self.file.read_exact(&mut tri_buf) {
83            if e.kind() == std::io::ErrorKind::UnexpectedEof {
84                return Ok(None);
85            }
86            return Err(e.into());
87        }
88        let tri = u32::from_le_bytes(tri_buf);
89
90        let mut len_buf = [0u8; 4];
91        self.file.read_exact(&mut len_buf)?;
92        let entries_len = usize::try_from(u32::from_le_bytes(len_buf)).unwrap_or(0);
93
94        let mut entries = Vec::with_capacity(entries_len);
95        for _ in 0..entries_len {
96            self.file.read_exact(&mut len_buf)?;
97            let file_id = u32::from_le_bytes(len_buf);
98
99            self.file.read_exact(&mut len_buf)?;
100            let offsets_len = usize::try_from(u32::from_le_bytes(len_buf)).unwrap_or(0);
101
102            let mut offsets = Vec::with_capacity(offsets_len);
103            for _ in 0..offsets_len {
104                self.file.read_exact(&mut len_buf)?;
105                offsets.push(u32::from_le_bytes(len_buf));
106            }
107            entries.push(PostingEntry { file_id, offsets });
108        }
109
110        Ok(Some((tri, entries)))
111    }
112}
113
114#[derive(Eq, PartialEq)]
115struct MergeItem {
116    tri: Trigram,
117    run_idx: usize,
118}
119
120impl PartialOrd for MergeItem {
121    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
122        Some(self.cmp(other))
123    }
124}
125
126impl Ord for MergeItem {
127    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
128        other.tri.cmp(&self.tri) // Min-heap
129    }
130}
131
132#[allow(clippy::as_conversions)] // binary format: usize→u32/u16 for index encoding
133#[allow(clippy::indexing_slicing)] // binary format: fixed-size buffer ops
134impl Builder {
135    /// Creates a new builder for the given root directory.
136    ///
137    /// # Errors
138    ///
139    /// Returns an error if the `.ix` directory cannot be created or temp files
140    /// cannot be opened.
141    pub fn new(root: &Path) -> Result<Self> {
142        let ix_dir = root.join(".ix");
143        fs::create_dir_all(&ix_dir)?;
144
145        Ok(Self {
146            root: root.to_owned(),
147            ix_dir,
148            file_count: 0,
149            files_writer: None,
150            blooms_writer: None,
151            strings_writer: None,
152            postings: HashMap::new(),
153            postings_count: 0,
154            temp_runs: Vec::new(),
155            extractor: Extractor::new(),
156            stats: BuildStats::default(),
157            decompress: false,
158            resource_guard: None,
159            dead_ends: Vec::new(),
160            max_file_size: 100 * 1024 * 1024,
161            committed: false,
162        })
163    }
164
165    /// Attach a [`ResourceGuard`] for memory-pressure-driven flush.
166    #[must_use]
167    pub const fn with_resource_guard(mut self, guard: ResourceGuard) -> Self {
168        self.resource_guard = Some(guard);
169        self
170    }
171
172    /// Enable or disable transparent decompression of archives during
173    /// scanning.
174    pub const fn set_decompress(&mut self, decompress: bool) {
175        self.decompress = decompress;
176    }
177
178    /// Set the maximum file size to index (0 = unlimited). Default: 100 MB.
179    pub const fn set_max_file_size(&mut self, max_bytes: u64) {
180        self.max_file_size = max_bytes;
181    }
182
183    /// Initialize temporary files and writers for building.
184    ///
185    /// Called at start of each `build()` to ensure fresh file handles.
186    /// This is the core of the "clean-before-build" pattern: temp files
187    /// are BUILD ARTIFACTS, not struct state — created fresh each build,
188    /// cleaned at NEXT build start, never deleted mid-lifecycle.
189    ///
190    /// # Errors
191    ///
192    /// Returns an error if temp files cannot be created.
193    fn init_writers(&mut self) -> Result<()> {
194        let files_tmp = self.ix_dir.join("shard.ix.tmp.files");
195        let blooms_tmp = self.ix_dir.join("shard.ix.tmp.blooms");
196        let strings_tmp = self.ix_dir.join("shard.ix.tmp.strings");
197
198        self.files_writer = Some(BufWriter::new(File::create(&files_tmp)?));
199        self.blooms_writer = Some(BufWriter::new(File::create(&blooms_tmp)?));
200        let mut strings_writer = BufWriter::new(File::create(&strings_tmp)?);
201
202        // Initialize strings table header (10 bytes total):
203        // version(u32) + size(u16) + count(u16) + padding(2)
204        strings_writer.write_all(&1u32.to_le_bytes())?; // version
205        strings_writer.write_all(&0u16.to_le_bytes())?; // size placeholder
206        strings_writer.write_all(&0u16.to_le_bytes())?; // count
207        strings_writer.write_all(&[0u8; 2])?; // padding
208
209        self.strings_writer = Some(strings_writer);
210        Ok(())
211    }
212
213    /// Clean up old temporary files from a previous build.
214    ///
215    /// Called at start of `build()` to release disk space before
216    /// allocating new temp files. On Linux, deleting a file while a
217    /// process holds an open FD keeps the inode alive; dropping the
218    /// old writers first ensures inodes are released.
219    fn cleanup_old_temp_files(&mut self) {
220        // Drop writers first to release inode references on Linux
221        self.files_writer = None;
222        self.blooms_writer = None;
223        self.strings_writer = None;
224
225        let patterns = [
226            "shard.ix.tmp.files",
227            "shard.ix.tmp.blooms",
228            "shard.ix.tmp.strings",
229            "shard.ix.tmp",
230            "shard.ix.bak",
231        ];
232
233        for pattern in &patterns {
234            let path = self.ix_dir.join(pattern);
235            if path.exists() {
236                let _ = fs::remove_file(&path);
237            }
238        }
239
240        if let Ok(entries) = fs::read_dir(&self.ix_dir) {
241            for entry in entries.flatten() {
242                let name = entry.file_name().to_string_lossy().into_owned();
243                if name.starts_with("shard.ix.run.") || name.starts_with("shard.ix.merged.") {
244                    let _ = fs::remove_file(entry.path());
245                }
246            }
247        }
248    }
249    /// Helper to get mutable writer reference with proper error handling.
250    ///
251    /// Returns an error if the writer is not initialized (violates clean-before-build contract).
252    fn get_writer<'a, T>(writer: &'a mut Option<T>, context: &'static str) -> Result<&'a mut T> {
253        writer.as_mut().ok_or_else(|| {
254        Error::Io(std::io::Error::other(format!(
255            "Builder invariant violated: {context} not initialized (clean-before-build contract)"
256        )))
257        })
258    }
259
260    fn flush_run(&mut self) -> Result<()> {
261        if self.postings.is_empty() {
262            return Ok(());
263        }
264        let old_postings = std::mem::take(&mut self.postings);
265        let mut sorted: Vec<_> = old_postings.into_iter().collect();
266        sorted.sort_unstable_by_key(|(t, _)| *t);
267
268        let run_path = self
269            .ix_dir
270            .join(format!("shard.ix.run.{}", self.temp_runs.len()));
271        let mut f = BufWriter::new(File::create(&run_path)?);
272
273        for (tri, entries) in sorted {
274            f.write_all(&tri.to_le_bytes())?;
275            f.write_all(&u32::try_from(entries.len()).unwrap_or(0).to_le_bytes())?;
276            for entry in entries {
277                f.write_all(&entry.file_id.to_le_bytes())?;
278                f.write_all(
279                    &u32::try_from(entry.offsets.len())
280                        .unwrap_or(0)
281                        .to_le_bytes(),
282                )?;
283                for off in entry.offsets {
284                    f.write_all(&off.to_le_bytes())?;
285                }
286            }
287        }
288        f.flush()?;
289
290        self.temp_runs.push(run_path);
291        self.postings_count = 0;
292        Ok(())
293    }
294
295    /// Build or rebuild the index from scratch.
296    ///
297    /// # Errors
298    ///
299    /// Returns an error if the build process fails (I/O, disk space, etc.).
300    pub fn build(&mut self) -> Result<PathBuf> {
301        // === Clean-before-build: release old temp files + create fresh writers ===
302        // This fixes the stale FD bug where serialize() deletes temp files but
303        // Builder keeps BufWriter handles pointing to deleted inodes.
304        // On Linux, dropping the writers releases the inode references first.
305        self.cleanup_old_temp_files();
306        self.init_writers()?;
307
308        // Reset state for fresh build — clear any residual data from previous builds
309        self.postings.clear();
310        self.postings_count = 0;
311        self.temp_runs.clear();
312        self.file_count = 0;
313        self.stats = BuildStats::default();
314        self.dead_ends.clear();
315        self.committed = false;
316
317        // Upfront disk space check: estimate required bytes before any I/O.
318
319        // Peak overhead is ~3× existing shard (old + tmp + bak + runs).
320        // For first builds, use a 200 MB floor.
321        if let Ok(free) = Self::free_bytes_at(&self.ix_dir) {
322            let existing_shard_size =
323                fs::metadata(self.ix_dir.join("shard.ix")).map_or(0, |m| m.len());
324            let required = if existing_shard_size > 0 {
325                existing_shard_size.saturating_mul(3)
326            } else {
327                200 * 1024 * 1024 // 200 MB floor for first builds
328            };
329            if free < required {
330                return Err(Error::Io(std::io::Error::other(format!(
331                    "insufficient disk space: {} MB free, need ≥{} MB (path: {})",
332                    free / 1024 / 1024,
333                    required / 1024 / 1024,
334                    self.ix_dir.display(),
335                ))));
336            }
337        }
338
339        // Cleanup old intermediate shard files before building
340        if self.ix_dir.exists()
341            && let Ok(entries) = std::fs::read_dir(&self.ix_dir)
342        {
343            for entry in entries.flatten() {
344                let name = entry.file_name();
345                let name_str = name.to_string_lossy();
346                if (name_str.starts_with("shard.ix.run.")
347                    || name_str.starts_with("shard.ix.merged."))
348                    && let Err(e) = std::fs::remove_file(entry.path())
349                {
350                    tracing::warn!("Failed to cleanup shard file {}: {}", name_str, e);
351                }
352            }
353        }
354
355        let start = Instant::now();
356        let root = self.root.clone();
357
358        // LLMOSafe Formal Law: Sensitive filesystem traversal (Root)
359        if root.to_string_lossy() == "/" {
360            tracing::warn!(
361                "LLMOSafe Advisory: Indexing root filesystem. Ensure adequate resource guards are in place."
362            );
363        }
364
365        let walker = WalkBuilder::new(&root)
366            .hidden(false)
367            .git_ignore(true)
368            .git_global(true)
369            .git_exclude(true)
370            .require_git(false)
371            .add_custom_ignore_filename(".ixignore")
372            .filter_entry(move |entry| {
373                let path = entry.path();
374                let name = path.file_name().and_then(|n| n.to_str()).unwrap_or("");
375
376                if entry.file_type().is_some_and(|t| t.is_dir())
377                    && (name == "lost+found" || name == ".git" || name == ".ix")
378                {
379                    return false;
380                }
381
382                if entry.file_type().is_some_and(|t| t.is_file())
383                    && (name == "shard.ix"
384                        || name == "shard.ix.tmp"
385                        || name.starts_with("shard.ix."))
386                {
387                    return false;
388                }
389
390                if entry.file_type().is_some_and(|t| t.is_file()) {
391                    let ext = path.extension().and_then(|e| e.to_str()).unwrap_or("");
392                    match ext {
393                        "so" | "o" | "dylib" | "a" | "dll" | "exe" | "pyc" | "jpg" | "png"
394                        | "gif" | "mp4" | "mp3" | "pdf" | "zip" | "7z" | "rar" | "sqlite"
395                        | "db" | "bin" => return false,
396                        _ => {}
397                    }
398                    if name.ends_with(".tar.gz") {
399                        return false;
400                    }
401                }
402                true
403            })
404            .build();
405
406        let mut files_processed = 0u64;
407        for entry_res in walker {
408            let entry = match entry_res {
409                Ok(e) => e,
410                Err(e) => {
411                    // Handle KernelError::BacktrackSignaled (-7) during the walk
412                    let backtrack_path = match &e {
413                        ignore::Error::Io(io_err) if io_err.raw_os_error() == Some(-7) => {
414                            Some(None)
415                        }
416                        ignore::Error::WithPath { path, err } => {
417                            if let ignore::Error::Io(io_err) = err.as_ref() {
418                                if io_err.raw_os_error() == Some(-7) {
419                                    Some(Some(path.clone()))
420                                } else {
421                                    None
422                                }
423                            } else {
424                                None
425                            }
426                        }
427                        _ => None,
428                    };
429
430                    if let Some(path_opt) = backtrack_path {
431                        tracing::warn!(
432                            "Immune Memory Triggered: Skipping path due to backtrack signal."
433                        );
434                        if let Some(path) = path_opt {
435                            self.dead_ends.push(path);
436                        }
437                    }
438                    continue;
439                }
440            };
441
442            if entry.file_type().is_some_and(|t| t.is_file()) {
443                self.process_file(entry.path())?;
444                files_processed += 1;
445
446                // Resource Guard Check: check every 250 files to prevent OOM
447                // allow: manual_is_multiple_of — MSRV 1.85 compat; is_multiple_of not available
448                #[allow(clippy::manual_is_multiple_of)]
449                if files_processed % 250 == 0 {
450                    if let Some(guard) = &self.resource_guard {
451                        if let Err(_err) = guard.check() {
452                            eprintln!(
453                                "ixd: memory ceiling reached... flushing intermediate chunk ({files_processed} files processed)"
454                            );
455                            self.flush_run()?;
456                        }
457                    } else {
458                        // Fallback to manual RSS limit if no formal guard provided
459                        if let Ok(rss) = Self::current_rss_bytes()
460                            && rss > 512 * 1024 * 1024
461                        {
462                            eprintln!(
463                                "ixd: RSS ceiling reached ({} MB) after {} files — flushing intermediate chunk",
464                                rss / 1024 / 1024,
465                                files_processed
466                            );
467                            self.flush_run()?;
468                        }
469                    }
470                }
471            }
472        }
473
474        let output_path = self.serialize()?;
475        tracing::info!("Build completed in {:?}: {:?}", start.elapsed(), self.stats);
476        Ok(output_path)
477    }
478
479    /// Update the index for changed files (currently does a full rebuild).
480    ///
481    /// # Errors
482    ///
483    /// Returns an error if the build process fails.
484    pub fn update(&mut self, _changed_files: &[PathBuf]) -> Result<PathBuf> {
485        self.build()
486    }
487
488    /// Number of files indexed so far (snapshot for progress reporting).
489    #[must_use]
490    pub const fn files_len(&self) -> usize {
491        self.file_count as usize
492    }
493
494    /// Number of unique trigrams accumulated so far.
495    #[must_use]
496    pub const fn trigrams_len(&self) -> usize {
497        self.stats.unique_trigrams as usize
498    }
499
500    /// Returns current process RSS in bytes by reading /proc/self/status.
501    fn current_rss_bytes() -> std::io::Result<u64> {
502        let status = std::fs::read_to_string("/proc/self/status")?;
503        for line in status.lines() {
504            if let Some(rest) = line.strip_prefix("VmRSS:") {
505                let kb: u64 = rest
506                    .split_whitespace()
507                    .next()
508                    .and_then(|s| s.parse().ok())
509                    .unwrap_or(0);
510                return Ok(kb * 1024);
511            }
512        }
513        Ok(0)
514    }
515
516    /// Returns free bytes available on the filesystem containing `path`.
517    fn free_bytes_at(path: &Path) -> std::io::Result<u64> {
518        use std::os::unix::ffi::OsStrExt;
519        let path_c = std::ffi::CString::new(path.as_os_str().as_bytes())
520            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e))?;
521        let mut stat: libc::statvfs = unsafe { std::mem::zeroed() };
522        let ret = unsafe { libc::statvfs(path_c.as_ptr(), &raw mut stat) };
523        if ret != 0 {
524            return Err(std::io::Error::last_os_error());
525        }
526        #[allow(clippy::unnecessary_cast)]
527        Ok(stat.f_bavail as u64 * stat.f_frsize as u64)
528    }
529
530    fn cleanup_temp_files(&self) {
531        let paths = [
532            self.ix_dir.join("shard.ix.tmp.files"),
533            self.ix_dir.join("shard.ix.tmp.blooms"),
534            self.ix_dir.join("shard.ix.tmp.strings"),
535            self.ix_dir.join("shard.ix.tmp"),
536            self.ix_dir.join("shard.ix.bak"),
537        ];
538        for p in &paths {
539            if let Err(e) = fs::remove_file(p)
540                && e.kind() != std::io::ErrorKind::NotFound
541            {
542                tracing::warn!("Failed to cleanup temp file {}: {}", p.display(), e);
543            }
544        }
545        for run_path in &self.temp_runs {
546            if let Err(e) = fs::remove_file(run_path)
547                && e.kind() != std::io::ErrorKind::NotFound
548            {
549                tracing::warn!("Failed to cleanup temp run {}: {}", run_path.display(), e);
550            }
551        }
552    }
553
554    fn process_file(&mut self, path: &Path) -> Result<bool> {
555        let path_str = path.display().to_string();
556        // TOCTOU guard: file may have been deleted between walk and open
557        let metadata = match fs::metadata(path) {
558            Ok(m) => m,
559            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(false),
560            Err(e) => return Err(e.into()),
561        };
562        let size = metadata.len();
563        let mtime = metadata
564            .modified()?
565            .duration_since(UNIX_EPOCH)
566            .map_or(0, |d| u64::try_from(d.as_nanos()).unwrap_or(0));
567
568        if self.max_file_size > 0 && size > self.max_file_size {
569            tracing::debug!("SKIP: file too large ({size} bytes): {path_str}");
570            self.stats.files_skipped_size += 1;
571            return Ok(false);
572        }
573
574        let file = match File::open(path) {
575            Ok(f) => f,
576            Err(e)
577                if e.kind() == std::io::ErrorKind::NotFound
578                    || e.kind() == std::io::ErrorKind::PermissionDenied =>
579            {
580                return Ok(false);
581            }
582            Err(e) => return Err(e.into()),
583        };
584        let mmap = unsafe { Mmap::map(&file)? };
585
586        let raw_data = if self.decompress {
587            if let Some(mut reader) = maybe_decompress(path, &mmap)? {
588                let mut buf = Vec::new();
589                let take_limit = if self.max_file_size > 0 {
590                    self.max_file_size
591                } else {
592                    100 * 1024 * 1024
593                };
594                reader.by_ref().take(take_limit).read_to_end(&mut buf)?;
595                std::borrow::Cow::Owned(buf)
596            } else {
597                std::borrow::Cow::Borrowed(&mmap[..])
598            }
599        } else {
600            std::borrow::Cow::Borrowed(&mmap[..])
601        };
602
603        let data = &raw_data[..];
604        if is_binary(data) {
605            tracing::debug!("SKIP: binary detection ({size} bytes): {path_str}");
606            self.stats.files_skipped_binary += 1;
607            return Ok(false);
608        }
609
610        let content_hash = xxhash_rust::xxh64::xxh64(data, 0);
611        let pairs = self.extractor.extract_with_offsets(data);
612
613        if pairs.is_empty() && !data.is_empty() {
614            tracing::debug!("SKIP: no trigrams extracted ({size} bytes): {path_str}");
615        }
616
617        let file_id = self.file_count;
618        self.file_count += 1;
619
620        let path_str = path.to_string_lossy();
621        let path_bytes = path_str.as_bytes();
622        let path_off = u32::try_from(
623            Self::get_writer(&mut self.strings_writer, "strings_writer")?.stream_position()?,
624        )
625        .unwrap_or(0);
626        let path_len = u16::try_from(path_bytes.len()).unwrap_or(0);
627
628        self.strings_writer
629            .as_mut()
630            .ok_or_else(|| Error::Io(std::io::Error::other("strings_writer not initialized")))?
631            .write_all(&0u16.to_le_bytes())?;
632        self.strings_writer
633            .as_mut()
634            .ok_or_else(|| Error::Io(std::io::Error::other("strings_writer not initialized")))?
635            .write_all(&path_len.to_le_bytes())?;
636        self.strings_writer
637            .as_mut()
638            .ok_or_else(|| Error::Io(std::io::Error::other("strings_writer not initialized")))?
639            .write_all(path_bytes)?;
640
641        let mut bloom = BloomFilter::new(256, 5);
642        let mut trigram_count = 0u32;
643
644        let mut i = 0;
645        while i < pairs.len() {
646            let tri = pairs[i].0;
647            let mut j = i + 1;
648            while j < pairs.len() && pairs[j].0 == tri {
649                j += 1;
650            }
651
652            let take_count = (j - i).min(10_000);
653            let offsets: Vec<u32> = pairs[i..i + take_count].iter().map(|p| p.1).collect();
654
655            bloom.insert(tri);
656            self.postings
657                .entry(tri)
658                .or_default()
659                .push(PostingEntry { file_id, offsets });
660            self.postings_count += take_count + 8;
661
662            trigram_count += 1;
663            i = j;
664        }
665
666        bloom.serialize(
667            self.blooms_writer
668                .as_mut()
669                .ok_or_else(|| Error::Io(std::io::Error::other("blooms_writer not initialized")))?,
670        )?;
671
672        let bloom_offset = u64::from(file_id)
673            .checked_mul(260)
674            .ok_or_else(|| Error::Config("file_id * 260 overflow".into()))?;
675        let bloom_offset_u32 = u32::try_from(bloom_offset)
676            .map_err(|_| Error::Config(format!("bloom_offset {bloom_offset} exceeds u32::MAX")))?;
677        self.files_writer
678            .as_mut()
679            .ok_or_else(|| Error::Io(std::io::Error::other("files_writer not initialized")))?
680            .write_all(&file_id.to_le_bytes())?;
681        self.files_writer
682            .as_mut()
683            .ok_or_else(|| Error::Io(std::io::Error::other("files_writer not initialized")))?
684            .write_all(&path_off.to_le_bytes())?;
685        self.files_writer
686            .as_mut()
687            .ok_or_else(|| Error::Io(std::io::Error::other("files_writer not initialized")))?
688            .write_all(&path_len.to_le_bytes())?;
689        self.files_writer
690            .as_mut()
691            .ok_or_else(|| Error::Io(std::io::Error::other("files_writer not initialized")))?
692            .write_all(&[FileStatus::Fresh as u8])?;
693        self.files_writer
694            .as_mut()
695            .ok_or_else(|| Error::Io(std::io::Error::other("files_writer not initialized")))?
696            .write_all(&[0u8])?;
697        self.files_writer
698            .as_mut()
699            .ok_or_else(|| Error::Io(std::io::Error::other("files_writer not initialized")))?
700            .write_all(&mtime.to_le_bytes())?;
701        self.files_writer
702            .as_mut()
703            .ok_or_else(|| Error::Io(std::io::Error::other("files_writer not initialized")))?
704            .write_all(&size.to_le_bytes())?;
705        self.files_writer
706            .as_mut()
707            .ok_or_else(|| Error::Io(std::io::Error::other("files_writer not initialized")))?
708            .write_all(&content_hash.to_le_bytes())?;
709        self.files_writer
710            .as_mut()
711            .ok_or_else(|| Error::Io(std::io::Error::other("files_writer not initialized")))?
712            .write_all(&trigram_count.to_le_bytes())?;
713        self.files_writer
714            .as_mut()
715            .ok_or_else(|| Error::Io(std::io::Error::other("files_writer not initialized")))?
716            .write_all(&bloom_offset_u32.to_le_bytes())?;
717        self.files_writer
718            .as_mut()
719            .ok_or_else(|| Error::Io(std::io::Error::other("files_writer not initialized")))?
720            .write_all(&[0u8; 4])?;
721
722        self.stats.files_scanned += 1;
723        self.stats.bytes_scanned += size;
724
725        if std::env::var("IX_DEBUG_BUILD").is_ok() {
726            eprintln!(
727                "IX-INDEXED: file_id={file_id} unique_trigrams={trigram_count} size={size}: {path_str}"
728            );
729        }
730
731        // Flush every 500k entries (~8MB peak RAM) to prevent unbounded HashMap growth.
732        // This was the RAM DDOS root cause in v0.1.1 — threshold was 5M (far too high).
733        if self.postings_count >= 500_000 {
734            if std::env::var("IX_DEBUG_BUILD").is_ok() {
735                eprintln!(
736                    "IX-FLUSH: postings_count={} after file_id={file_id}",
737                    self.postings_count
738                );
739            }
740            self.flush_run()?;
741        }
742
743        Ok(true)
744    }
745
746    fn serialize(&mut self) -> Result<PathBuf> {
747        self.flush_run()?;
748
749        self.files_writer
750            .as_mut()
751            .ok_or_else(|| Error::Io(std::io::Error::other("files_writer not initialized")))?
752            .flush()?;
753        self.blooms_writer
754            .as_mut()
755            .ok_or_else(|| Error::Io(std::io::Error::other("blooms_writer not initialized")))?
756            .flush()?;
757        self.strings_writer
758            .as_mut()
759            .ok_or_else(|| Error::Io(std::io::Error::other("strings_writer not initialized")))?
760            .flush()?;
761
762        // Hierarchical Merge to stay under ulimit
763        while self.temp_runs.len() > 128 {
764            let mut next_generation = Vec::new();
765            for chunk in self.temp_runs.chunks(128) {
766                let out_path = self.ix_dir.join(format!(
767                    "shard.ix.merged.{}.{}",
768                    next_generation.len(),
769                    SystemTime::now()
770                        .duration_since(UNIX_EPOCH)
771                        .map_err(|e| Error::Config(format!("time went backwards: {e}")))?
772                        .as_micros()
773                ));
774                Self::merge_to_run(chunk, &out_path)?;
775                next_generation.push(out_path);
776                for p in chunk {
777                    if let Err(e) = fs::remove_file(p) {
778                        tracing::warn!("Failed to cleanup temp run {}: {}", p.display(), e);
779                    }
780                }
781            }
782            self.temp_runs = next_generation;
783        }
784
785        let tmp_path = self.ix_dir.join("shard.ix.tmp");
786        let final_path = self.ix_dir.join("shard.ix");
787
788        let mut f = BufWriter::new(File::create(&tmp_path)?);
789        f.write_all(&[0u8; HEADER_SIZE])?;
790
791        let file_table_offset = Self::align_to_8(&mut f)?;
792        let mut files_reader = File::open(self.ix_dir.join("shard.ix.tmp.files"))?;
793        std::io::copy(&mut files_reader, &mut f)?;
794        let file_table_size = f.stream_position()? - file_table_offset;
795
796        Self::align_to_8(&mut f)?;
797        let posting_data_offset = f.stream_position()?;
798
799        let mut cdx_entries: Vec<(Trigram, u64, u32, u32)> = Vec::new();
800        let mut global_trigram_count = 0u32;
801
802        let mut runs = Vec::new();
803        for path in &self.temp_runs {
804            runs.push(RunIterator::new(path)?);
805        }
806
807        let mut heap = BinaryHeap::new();
808        let mut current_items = vec![None; runs.len()];
809
810        for (i, run) in runs.iter_mut().enumerate() {
811            if let Some(item) = run.next_trigram()? {
812                heap.push(MergeItem {
813                    tri: item.0,
814                    run_idx: i,
815                });
816                current_items[i] = Some(item);
817            }
818        }
819
820        let mut current_tri: Option<Trigram> = None;
821        let mut merged_entries: Vec<PostingEntry> = Vec::new();
822
823        while let Some(MergeItem { tri, run_idx }) = heap.pop() {
824            if Some(tri) != current_tri {
825                if let Some(t) = current_tri {
826                    let cdx_entry = Self::write_merged_posting(
827                        &mut f,
828                        t,
829                        posting_data_offset,
830                        &mut merged_entries,
831                    )?;
832                    cdx_entries.push(cdx_entry);
833                    global_trigram_count += 1;
834                    merged_entries.clear();
835                }
836                current_tri = Some(tri);
837            }
838
839            let item = current_items
840                .get_mut(run_idx)
841                .ok_or(Error::Config("run_idx out of bounds".into()))?
842                .take()
843                .ok_or(Error::Config("current item is None".into()))?;
844            merged_entries.extend(item.1);
845
846            if let Some(next_item) = runs
847                .get_mut(run_idx)
848                .ok_or(Error::Config("run_idx out of bounds".into()))?
849                .next_trigram()?
850            {
851                heap.push(MergeItem {
852                    tri: next_item.0,
853                    run_idx,
854                });
855                *current_items
856                    .get_mut(run_idx)
857                    .ok_or(Error::Config("run_idx out of bounds".into()))? = Some(next_item);
858            }
859        }
860
861        if let Some(t) = current_tri {
862            let cdx_entry =
863                Self::write_merged_posting(&mut f, t, posting_data_offset, &mut merged_entries)?;
864            cdx_entries.push(cdx_entry);
865            global_trigram_count += 1;
866        }
867
868        self.stats.unique_trigrams = u64::from(global_trigram_count);
869        let posting_data_size = f.stream_position()? - posting_data_offset;
870
871        Self::align_to_8(&mut f)?;
872        let trigram_table_offset = f.stream_position()?;
873
874        // CDX encode: delta + varint + ZSTD per block
875        let cdx_block_index = Self::write_cdx_blocks(&mut f, &cdx_entries)?;
876        let trigram_table_size = f.stream_position()? - trigram_table_offset;
877
878        Self::align_to_8(&mut f)?;
879        let cdx_block_index_offset = f.stream_position()?;
880        for entry in &cdx_block_index {
881            f.write_all(&entry.0.to_le_bytes())?;
882            f.write_all(&entry.1.to_le_bytes())?;
883        }
884        // Sentinel entry
885        f.write_all(&u32::MAX.to_le_bytes())?;
886        f.write_all(&(trigram_table_offset + trigram_table_size).to_le_bytes())?;
887        let cdx_block_index_size = f.stream_position()? - cdx_block_index_offset;
888
889        Self::align_to_8(&mut f)?;
890        let bloom_offset = f.stream_position()?;
891        let mut blooms_reader = File::open(self.ix_dir.join("shard.ix.tmp.blooms"))?;
892        std::io::copy(&mut blooms_reader, &mut f)?;
893        let bloom_size = f.stream_position()? - bloom_offset;
894
895        Self::align_to_8(&mut f)?;
896        let string_pool_offset = f.stream_position()?;
897        let mut strings_reader = File::open(self.ix_dir.join("shard.ix.tmp.strings"))?;
898        std::io::copy(&mut strings_reader, &mut f)?;
899        let string_pool_size = f.stream_position()? - string_pool_offset;
900
901        let name_index_offset = f.stream_position()?;
902        let name_index_size = 0u64;
903
904        let created_at = u64::try_from(
905            SystemTime::now()
906                .duration_since(UNIX_EPOCH)
907                .map_err(|e| Error::Config(format!("time went backwards: {e}")))?
908                .as_micros(),
909        )
910        .unwrap_or(0);
911        let mut header_bytes = [0u8; HEADER_SIZE];
912        header_bytes[0..4].copy_from_slice(&MAGIC);
913        header_bytes[0x04..0x06].copy_from_slice(&VERSION_MAJOR.to_le_bytes());
914        header_bytes[0x06..0x08].copy_from_slice(&VERSION_MINOR.to_le_bytes());
915        header_bytes[0x08..0x10].copy_from_slice(
916            &(flags::HAS_BLOOM_FILTERS
917                | flags::HAS_CONTENT_HASHES
918                | flags::POSTING_LISTS_CHECKSUMMED
919                | flags::HAS_CDX_INDEX)
920                .to_le_bytes(),
921        );
922        header_bytes[0x10..0x18].copy_from_slice(&created_at.to_le_bytes());
923        header_bytes[0x18..0x20].copy_from_slice(&self.stats.bytes_scanned.to_le_bytes());
924        header_bytes[0x20..0x24].copy_from_slice(&self.file_count.to_le_bytes());
925        header_bytes[0x24..0x28].copy_from_slice(&(global_trigram_count).to_le_bytes());
926        header_bytes[0x28..0x30].copy_from_slice(&file_table_offset.to_le_bytes());
927        header_bytes[0x30..0x38].copy_from_slice(&file_table_size.to_le_bytes());
928        header_bytes[0x38..0x40].copy_from_slice(&trigram_table_offset.to_le_bytes());
929        header_bytes[0x40..0x48].copy_from_slice(&trigram_table_size.to_le_bytes());
930        header_bytes[0x48..0x50].copy_from_slice(&posting_data_offset.to_le_bytes());
931        header_bytes[0x50..0x58].copy_from_slice(&posting_data_size.to_le_bytes());
932        header_bytes[0x58..0x60].copy_from_slice(&bloom_offset.to_le_bytes());
933        header_bytes[0x60..0x68].copy_from_slice(&bloom_size.to_le_bytes());
934        header_bytes[0x68..0x70].copy_from_slice(&string_pool_offset.to_le_bytes());
935        header_bytes[0x70..0x78].copy_from_slice(&string_pool_size.to_le_bytes());
936        header_bytes[0x78..0x80].copy_from_slice(&name_index_offset.to_le_bytes());
937        header_bytes[0x80..0x88].copy_from_slice(&name_index_size.to_le_bytes());
938        header_bytes[0x88..0x90].copy_from_slice(&cdx_block_index_offset.to_le_bytes());
939        header_bytes[0x90..0x98].copy_from_slice(&cdx_block_index_size.to_le_bytes());
940
941        let crc = crc32c::crc32c(&header_bytes[0..0xF8]);
942        header_bytes[0xF8..0xFC].copy_from_slice(&crc.to_le_bytes());
943
944        f.seek(SeekFrom::Start(0))?;
945        f.write_all(&header_bytes)?;
946        f.flush()?;
947        drop(f);
948
949        // Atomic swap: backup old index, rename new to final
950        let backup_path = self.ix_dir.join("shard.ix.bak");
951        let old_index_exists = final_path.exists();
952        if old_index_exists {
953            // Remove old backup if exists
954            let _ = fs::remove_file(&backup_path);
955            // Backup current index
956            if let Err(e) = fs::rename(&final_path, &backup_path) {
957                tracing::warn!("Failed to backup existing index: {}", e);
958            }
959        }
960        fs::rename(&tmp_path, &final_path)?;
961        self.committed = true;
962        // Remove backup after successful rename
963        if old_index_exists {
964            let _ = fs::remove_file(&backup_path);
965        }
966
967        let _ = fs::remove_file(self.ix_dir.join("shard.ix.tmp.files"));
968        let _ = fs::remove_file(self.ix_dir.join("shard.ix.tmp.blooms"));
969        let _ = fs::remove_file(self.ix_dir.join("shard.ix.tmp.strings"));
970        for path in &self.temp_runs {
971            if let Err(e) = fs::remove_file(path) {
972                tracing::warn!("Failed to cleanup temp run {}: {}", path.display(), e);
973            }
974        }
975        self.temp_runs.clear();
976
977        Ok(final_path)
978    }
979
980    fn merge_to_run(run_paths: &[PathBuf], out_path: &Path) -> Result<()> {
981        let mut runs = Vec::new();
982        for path in run_paths {
983            runs.push(RunIterator::new(path)?);
984        }
985        let mut heap = BinaryHeap::new();
986        let mut current_items = vec![None; runs.len()];
987        for (i, run) in runs.iter_mut().enumerate() {
988            if let Some(item) = run.next_trigram()? {
989                heap.push(MergeItem {
990                    tri: item.0,
991                    run_idx: i,
992                });
993                current_items[i] = Some(item);
994            }
995        }
996        let mut out = BufWriter::new(File::create(out_path)?);
997        let mut current_tri: Option<Trigram> = None;
998        let mut merged_entries: Vec<PostingEntry> = Vec::new();
999        while let Some(MergeItem { tri, run_idx }) = heap.pop() {
1000            if Some(tri) != current_tri {
1001                if let Some(t) = current_tri {
1002                    Self::write_run_entry(&mut out, t, &mut merged_entries)?;
1003                    merged_entries.clear();
1004                }
1005                current_tri = Some(tri);
1006            }
1007            let item = current_items[run_idx].take().ok_or_else(|| {
1008                Error::Config(format!(
1009                    "merge invariant broken: no item at run index {run_idx}"
1010                ))
1011            })?;
1012            merged_entries.extend(item.1);
1013            if let Some(next_item) = runs[run_idx].next_trigram()? {
1014                heap.push(MergeItem {
1015                    tri: next_item.0,
1016                    run_idx,
1017                });
1018                current_items[run_idx] = Some(next_item);
1019            }
1020        }
1021        if let Some(t) = current_tri {
1022            Self::write_run_entry(&mut out, t, &mut merged_entries)?;
1023        }
1024        out.flush()?;
1025        Ok(())
1026    }
1027
1028    fn write_run_entry<W: Write>(
1029        w: &mut W,
1030        tri: Trigram,
1031        entries: &mut [PostingEntry],
1032    ) -> Result<()> {
1033        entries.sort_by_key(|e| e.file_id);
1034        w.write_all(&tri.to_le_bytes())?;
1035        w.write_all(&(entries.len() as u32).to_le_bytes())?;
1036        for entry in entries {
1037            w.write_all(&entry.file_id.to_le_bytes())?;
1038            w.write_all(&(entry.offsets.len() as u32).to_le_bytes())?;
1039            for off in &entry.offsets {
1040                w.write_all(&off.to_le_bytes())?;
1041            }
1042        }
1043        Ok(())
1044    }
1045
1046    fn write_merged_posting<W: Write + Seek>(
1047        f: &mut W,
1048        tri: Trigram,
1049        base_off: u64,
1050        entries: &mut [PostingEntry],
1051    ) -> Result<(Trigram, u64, u32, u32)> {
1052        entries.sort_by_key(|e| e.file_id);
1053        let count = entries.len() as u32;
1054        let list = PostingList {
1055            entries: entries.to_vec(),
1056        };
1057        let encoded = list.encode()?;
1058        let offset = f.stream_position()? - base_off;
1059        f.write_all(&encoded)?;
1060        let abs_off = base_off + offset;
1061        Ok((tri, abs_off, encoded.len() as u32, count))
1062    }
1063
1064    fn write_cdx_blocks<W: Write + Seek>(
1065        f: &mut W,
1066        cdx_entries: &[(Trigram, u64, u32, u32)],
1067    ) -> Result<Vec<(u32, u64)>> {
1068        let mut block_index = Vec::new();
1069        for chunk in cdx_entries.chunks(crate::format::CDX_BLOCK_SIZE) {
1070            let first_key = chunk[0].0;
1071            let block_offset = f.stream_position()?;
1072            block_index.push((first_key, block_offset));
1073
1074            let mut buf = Vec::new();
1075            varint::encode(u64::try_from(chunk.len()).unwrap_or(0), &mut buf);
1076            let mut last_key = 0u32;
1077            for &(tri, posting_offset, posting_length, doc_frequency) in chunk {
1078                varint::encode(u64::from(tri - last_key), &mut buf);
1079                last_key = tri;
1080                varint::encode(posting_offset, &mut buf);
1081                varint::encode(u64::from(posting_length), &mut buf);
1082                varint::encode(u64::from(doc_frequency), &mut buf);
1083            }
1084
1085            let compressed = zstd::encode_all(&buf[..], PostingList::ZSTD_COMPRESSION_LEVEL)
1086                .map_err(|e| Error::Config(format!("cdx zstd encode: {e}")))?;
1087            f.write_all(&compressed)?;
1088        }
1089        Ok(block_index)
1090    }
1091
1092    fn align_to_8<W: Write + Seek>(mut w: W) -> std::io::Result<u64> {
1093        let pos = w.stream_position()?;
1094        let padding = (8 - (pos % 8)) % 8;
1095        if padding > 0 {
1096            w.write_all(&vec![0u8; padding as usize])?;
1097        }
1098        w.stream_position()
1099    }
1100}
1101
1102impl Drop for Builder {
1103    fn drop(&mut self) {
1104        if !self.committed {
1105            self.cleanup_temp_files();
1106        }
1107    }
1108}