Skip to main content

ix/
builder.rs

1//! Index builder — the complete pipeline from files to .ix shard.
2//!
3//! Phase 1: Discovery (walk directory, respect .gitignore)
4//! Phase 2: Scan (mmap, check binary, extract trigrams, bloom filter)
5//! Phase 3: Serialize (write sections, compute CRCs, atomic rename)
6
7use crate::bloom::BloomFilter;
8use crate::decompress::maybe_decompress;
9use crate::error::{Error, Result};
10use crate::format::{
11    FileStatus, HEADER_SIZE, MAGIC, VERSION_MAJOR, VERSION_MINOR, flags, is_binary,
12};
13use crate::posting::{PostingEntry, PostingList};
14use crate::trigram::{Extractor, Trigram};
15use crate::varint;
16use ignore::WalkBuilder;
17use libc;
18use llmosafe::ResourceGuard;
19use memmap2::Mmap;
20use std::collections::{BinaryHeap, HashMap};
21use std::fs::{self, File};
22use std::io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write};
23use std::path::{Path, PathBuf};
24use std::time::{Instant, SystemTime, UNIX_EPOCH};
25
/// Index builder — orchestrates the full pipeline from file discovery to shard
/// serialization.
///
/// Intermediate state (file table, bloom filters, string pool) is streamed to
/// temp files under `.ix/` so memory stays bounded; only the postings batch is
/// held in RAM between flushes to on-disk runs.
pub struct Builder {
    // Root directory being indexed.
    root: PathBuf,
    // Output directory: `<root>/.ix`.
    ix_dir: PathBuf,
    // Next file id to assign; also the number of files staged so far.
    file_count: u32,

    // O(1) memory streaming writers for temporary file table and blooms
    files_writer: BufWriter<File>,   // fixed-size file-table rows
    blooms_writer: BufWriter<File>,  // serialized per-file bloom filters
    strings_writer: BufWriter<File>, // string pool holding file paths

    // Postings batching for external sort
    postings: HashMap<Trigram, Vec<PostingEntry>>, // in-memory batch
    postings_count: usize, // rough size counter driving the flush threshold
    temp_runs: Vec<PathBuf>, // sorted on-disk run files awaiting merge

    extractor: Extractor,
    stats: BuildStats,
    // When true, archives are transparently decompressed before scanning.
    decompress: bool,
    // Optional memory-pressure guard consulted periodically during the walk.
    resource_guard: Option<ResourceGuard>,
    // Paths skipped after a backtrack signal during directory walking.
    dead_ends: Vec<PathBuf>,
    // Maximum file size to index in bytes (0 = unlimited).
    max_file_size: u64,
    // Set once the new shard has been renamed into place.
    committed: bool,
}
51
/// Accumulated metrics collected during a build run.
///
/// All counters start at zero (via `Default`) and are only incremented during
/// a build; `unique_trigrams` is assigned at serialization time, so it reads
/// as 0 until the shard has been written.
#[derive(Default, Debug)]
pub struct BuildStats {
    /// Number of files processed (text only, after filtering).
    pub files_scanned: u64,
    /// Number of files skipped because they were detected as binary.
    pub files_skipped_binary: u64,
    /// Number of files skipped because they exceeded the size limit.
    pub files_skipped_size: u64,
    /// Total number of raw bytes scanned across all files.
    pub bytes_scanned: u64,
    /// Number of distinct trigrams discovered across all files.
    pub unique_trigrams: u64,
}
66
/// Sequential reader over one sorted on-disk postings run produced by
/// `Builder::flush_run`.
///
/// Record layout (all little-endian u32): trigram, entry count, then per
/// entry: file id, offset count, offsets.
struct RunIterator {
    // Buffered handle positioned at the next unread record.
    file: BufReader<File>,
}
70
71impl RunIterator {
72    fn new(path: &Path) -> Result<Self> {
73        let f = File::open(path)?;
74        Ok(Self {
75            file: BufReader::new(f),
76        })
77    }
78
79    fn next_trigram(&mut self) -> Result<Option<(Trigram, Vec<PostingEntry>)>> {
80        let mut tri_buf = [0u8; 4];
81        if let Err(e) = self.file.read_exact(&mut tri_buf) {
82            if e.kind() == std::io::ErrorKind::UnexpectedEof {
83                return Ok(None);
84            }
85            return Err(e.into());
86        }
87        let tri = u32::from_le_bytes(tri_buf);
88
89        let mut len_buf = [0u8; 4];
90        self.file.read_exact(&mut len_buf)?;
91        let entries_len = usize::try_from(u32::from_le_bytes(len_buf)).unwrap_or(0);
92
93        let mut entries = Vec::with_capacity(entries_len);
94        for _ in 0..entries_len {
95            self.file.read_exact(&mut len_buf)?;
96            let file_id = u32::from_le_bytes(len_buf);
97
98            self.file.read_exact(&mut len_buf)?;
99            let offsets_len = usize::try_from(u32::from_le_bytes(len_buf)).unwrap_or(0);
100
101            let mut offsets = Vec::with_capacity(offsets_len);
102            for _ in 0..offsets_len {
103                self.file.read_exact(&mut len_buf)?;
104                offsets.push(u32::from_le_bytes(len_buf));
105            }
106            entries.push(PostingEntry { file_id, offsets });
107        }
108
109        Ok(Some((tri, entries)))
110    }
111}
112
/// Heap entry for the k-way merge of sorted runs: identifies which run
/// (`run_idx`) currently offers trigram `tri` as its smallest pending key.
#[derive(Eq, PartialEq)]
struct MergeItem {
    tri: Trigram,
    run_idx: usize,
}
118
119impl PartialOrd for MergeItem {
120    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
121        Some(self.cmp(other))
122    }
123}
124
125impl Ord for MergeItem {
126    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
127        other.tri.cmp(&self.tri) // Min-heap
128    }
129}
130
131#[allow(clippy::as_conversions)] // binary format: usize→u32/u16 for index encoding
132#[allow(clippy::indexing_slicing)] // binary format: fixed-size buffer ops
133impl Builder {
134    /// Creates a new builder for the given root directory.
135    ///
136    /// # Errors
137    ///
138    /// Returns an error if the `.ix` directory cannot be created or temp files
139    /// cannot be opened.
140    pub fn new(root: &Path) -> Result<Self> {
141        let ix_dir = root.join(".ix");
142        fs::create_dir_all(&ix_dir)?;
143
144        let files_tmp = ix_dir.join("shard.ix.tmp.files");
145        let blooms_tmp = ix_dir.join("shard.ix.tmp.blooms");
146        let strings_tmp = ix_dir.join("shard.ix.tmp.strings");
147
148        let files_writer = BufWriter::new(File::create(&files_tmp)?);
149        let blooms_writer = BufWriter::new(File::create(&blooms_tmp)?);
150        let mut strings_writer = BufWriter::new(File::create(&strings_tmp)?);
151
152        strings_writer.write_all(&1u32.to_le_bytes())?;
153        strings_writer.write_all(&0u16.to_le_bytes())?;
154        strings_writer.write_all(&0u16.to_le_bytes())?;
155        strings_writer.write_all(&[0u8; 2])?;
156
157        Ok(Self {
158            root: root.to_owned(),
159            ix_dir,
160            file_count: 0,
161            files_writer,
162            blooms_writer,
163            strings_writer,
164            postings: HashMap::new(),
165            postings_count: 0,
166            temp_runs: Vec::new(),
167            extractor: Extractor::new(),
168            stats: BuildStats::default(),
169            decompress: false,
170            resource_guard: None,
171            dead_ends: Vec::new(),
172            max_file_size: 100 * 1024 * 1024,
173            committed: false,
174        })
175    }
176
    /// Attach a [`ResourceGuard`] for memory-pressure-driven flush.
    ///
    /// Builder-style: consumes `self` and returns it with the guard attached.
    /// When set, `build()` consults the guard periodically (every 250 files)
    /// and flushes the in-memory postings batch to a temp run if the guard
    /// reports pressure.
    #[must_use]
    pub const fn with_resource_guard(mut self, guard: ResourceGuard) -> Self {
        self.resource_guard = Some(guard);
        self
    }
183
    /// Enable or disable transparent decompression of archives during
    /// scanning.
    ///
    /// When enabled, `process_file` runs each file through `maybe_decompress`
    /// and indexes the decompressed bytes (capped at the max file size)
    /// instead of the raw on-disk bytes. Disabled by default.
    pub const fn set_decompress(&mut self, decompress: bool) {
        self.decompress = decompress;
    }
189
    /// Set the maximum file size to index (0 = unlimited). Default: 100 MB.
    ///
    /// Files larger than this are skipped and counted in
    /// `BuildStats::files_skipped_size`; the same limit caps how many bytes
    /// are read from a decompressed archive.
    pub const fn set_max_file_size(&mut self, max_bytes: u64) {
        self.max_file_size = max_bytes;
    }
194
195    fn flush_run(&mut self) -> Result<()> {
196        if self.postings.is_empty() {
197            return Ok(());
198        }
199        let old_postings = std::mem::take(&mut self.postings);
200        let mut sorted: Vec<_> = old_postings.into_iter().collect();
201        sorted.sort_unstable_by_key(|(t, _)| *t);
202
203        let run_path = self
204            .ix_dir
205            .join(format!("shard.ix.run.{}", self.temp_runs.len()));
206        let mut f = BufWriter::new(File::create(&run_path)?);
207
208        for (tri, entries) in sorted {
209            f.write_all(&tri.to_le_bytes())?;
210            f.write_all(&u32::try_from(entries.len()).unwrap_or(0).to_le_bytes())?;
211            for entry in entries {
212                f.write_all(&entry.file_id.to_le_bytes())?;
213                f.write_all(
214                    &u32::try_from(entry.offsets.len())
215                        .unwrap_or(0)
216                        .to_le_bytes(),
217                )?;
218                for off in entry.offsets {
219                    f.write_all(&off.to_le_bytes())?;
220                }
221            }
222        }
223        f.flush()?;
224
225        self.temp_runs.push(run_path);
226        self.postings_count = 0;
227        Ok(())
228    }
229
    /// Build or rebuild the index from scratch.
    ///
    /// Pipeline: check disk space, clear stale intermediate files, walk the
    /// tree (respecting gitignore and `.ixignore`), scan each regular file
    /// via `process_file`, then hand off to `serialize` for the final shard.
    ///
    /// # Errors
    ///
    /// Returns an error if the build process fails (I/O, disk space, etc.).
    pub fn build(&mut self) -> Result<PathBuf> {
        // Upfront disk space check: estimate required bytes before any I/O.
        // Peak overhead is ~3× existing shard (old + tmp + bak + runs).
        // For first builds, use a 200 MB floor.
        if let Ok(free) = Self::free_bytes_at(&self.ix_dir) {
            let existing_shard_size = fs::metadata(self.ix_dir.join("shard.ix"))
                .map_or(0, |m| m.len());
            let required = if existing_shard_size > 0 {
                existing_shard_size.saturating_mul(3)
            } else {
                200 * 1024 * 1024 // 200 MB floor for first builds
            };
            if free < required {
                return Err(Error::Io(std::io::Error::other(format!(
                    "insufficient disk space: {} MB free, need ≥{} MB (path: {})",
                    free / 1024 / 1024,
                    required / 1024 / 1024,
                    self.ix_dir.display(),
                ))));
            }
        }

        // Cleanup old intermediate shard files before building
        // (runs and merged runs from a previous interrupted build).
        if self.ix_dir.exists()
            && let Ok(entries) = std::fs::read_dir(&self.ix_dir)
        {
            for entry in entries.flatten() {
                let name = entry.file_name();
                let name_str = name.to_string_lossy();
                if (name_str.starts_with("shard.ix.run.")
                    || name_str.starts_with("shard.ix.merged."))
                    && let Err(e) = std::fs::remove_file(entry.path())
                {
                    tracing::warn!("Failed to cleanup shard file {}: {}", name_str, e);
                }
            }
        }

        let start = Instant::now();
        let root = self.root.clone();

        // LLMOSafe Formal Law: Sensitive filesystem traversal (Root)
        if root.to_string_lossy() == "/" {
            tracing::warn!(
                "LLMOSafe Advisory: Indexing root filesystem. Ensure adequate resource guards are in place."
            );
        }

        // hidden(false) disables the walker's hidden-file filter, so dotfiles
        // ARE walked; gitignore rules and .ixignore still apply.
        let walker = WalkBuilder::new(&root)
            .hidden(false)
            .git_ignore(true)
            .git_global(true)
            .git_exclude(true)
            .require_git(false)
            .add_custom_ignore_filename(".ixignore")
            .filter_entry(move |entry| {
                let path = entry.path();
                let name = path.file_name().and_then(|n| n.to_str()).unwrap_or("");

                // Never descend into VCS metadata or our own output dir.
                if entry.file_type().is_some_and(|t| t.is_dir())
                    && (name == "lost+found" || name == ".git" || name == ".ix")
                {
                    return false;
                }

                // Skip our own shard artifacts wherever they appear.
                // NOTE(review): the "shard.ix." prefix check already covers
                // every name except "shard.ix" itself; the two exact-name
                // checks are partly redundant but harmless.
                if entry.file_type().is_some_and(|t| t.is_file())
                    && (name == "shard.ix"
                        || name == "shard.ix.tmp"
                        || name.starts_with("shard.ix."))
                {
                    return false;
                }

                // Cheap extension-based skip of obviously-binary files;
                // content-based binary detection still runs in process_file.
                if entry.file_type().is_some_and(|t| t.is_file()) {
                    let ext = path.extension().and_then(|e| e.to_str()).unwrap_or("");
                    match ext {
                        "so" | "o" | "dylib" | "a" | "dll" | "exe" | "pyc" | "jpg" | "png"
                        | "gif" | "mp4" | "mp3" | "pdf" | "zip" | "7z" | "rar" | "sqlite"
                        | "db" | "bin" => return false,
                        _ => {}
                    }
                    if name.ends_with(".tar.gz") {
                        return false;
                    }
                }
                true
            })
            .build();

        let mut files_processed = 0u64;
        for entry_res in walker {
            let entry = match entry_res {
                Ok(e) => e,
                Err(e) => {
                    // Handle KernelError::BacktrackSignaled (-7) during the walk
                    // NOTE(review): -7 is a project-specific signal, not a
                    // standard errno — confirm against the kernel shim.
                    let backtrack_path = match &e {
                        ignore::Error::Io(io_err) if io_err.raw_os_error() == Some(-7) => {
                            Some(None)
                        }
                        ignore::Error::WithPath { path, err } => {
                            if let ignore::Error::Io(io_err) = err.as_ref() {
                                if io_err.raw_os_error() == Some(-7) {
                                    Some(Some(path.clone()))
                                } else {
                                    None
                                }
                            } else {
                                None
                            }
                        }
                        _ => None,
                    };

                    if let Some(path_opt) = backtrack_path {
                        tracing::warn!(
                            "Immune Memory Triggered: Skipping path due to backtrack signal."
                        );
                        if let Some(path) = path_opt {
                            self.dead_ends.push(path);
                        }
                    }
                    // All other walk errors are silently skipped so one bad
                    // entry cannot abort the whole build.
                    continue;
                }
            };

            if entry.file_type().is_some_and(|t| t.is_file()) {
                self.process_file(entry.path())?;
                files_processed += 1;

                // Resource Guard Check: check every 250 files to prevent OOM
                // allow: manual_is_multiple_of — MSRV 1.85 compat; is_multiple_of not available
                #[allow(clippy::manual_is_multiple_of)]
                if files_processed % 250 == 0 {
                    if let Some(guard) = &self.resource_guard {
                        if let Err(_err) = guard.check() {
                            eprintln!(
                                "ixd: memory ceiling reached... flushing intermediate chunk ({files_processed} files processed)"
                            );
                            self.flush_run()?;
                        }
                    } else {
                        // Fallback to manual RSS limit if no formal guard provided
                        if let Ok(rss) = Self::current_rss_bytes()
                            && rss > 512 * 1024 * 1024
                        {
                            eprintln!(
                                "ixd: RSS ceiling reached ({} MB) after {} files — flushing intermediate chunk",
                                rss / 1024 / 1024,
                                files_processed
                            );
                            self.flush_run()?;
                        }
                    }
                }
            }
        }

        let output_path = self.serialize()?;
        tracing::info!("Build completed in {:?}: {:?}", start.elapsed(), self.stats);
        Ok(output_path)
    }
396
    /// Update the index for changed files (currently does a full rebuild).
    ///
    /// The `_changed_files` list is accepted for forward compatibility but is
    /// ignored: incremental updates are not implemented, so this delegates
    /// straight to [`Self::build`].
    ///
    /// # Errors
    ///
    /// Returns an error if the build process fails.
    pub fn update(&mut self, _changed_files: &[PathBuf]) -> Result<PathBuf> {
        self.build()
    }
405
    /// Number of files indexed so far (snapshot for progress reporting).
    ///
    /// Counts every file-table row staged, including files that yielded no
    /// trigrams.
    #[must_use]
    pub const fn files_len(&self) -> usize {
        self.file_count as usize
    }
411
    /// Number of unique trigrams accumulated so far.
    ///
    /// `stats.unique_trigrams` is only assigned during serialization, so
    /// this reports 0 until the shard has been written.
    #[must_use]
    pub const fn trigrams_len(&self) -> usize {
        self.stats.unique_trigrams as usize
    }
417
418    /// Returns current process RSS in bytes by reading /proc/self/status.
419    fn current_rss_bytes() -> std::io::Result<u64> {
420        let status = std::fs::read_to_string("/proc/self/status")?;
421        for line in status.lines() {
422            if let Some(rest) = line.strip_prefix("VmRSS:") {
423                let kb: u64 = rest
424                    .split_whitespace()
425                    .next()
426                    .and_then(|s| s.parse().ok())
427                    .unwrap_or(0);
428                return Ok(kb * 1024);
429            }
430        }
431        Ok(0)
432    }
433
    /// Returns free bytes available on the filesystem containing `path`.
    ///
    /// Uses `statvfs(3)`: the result is `f_bavail * f_frsize` (blocks
    /// available to unprivileged users × fragment size).
    fn free_bytes_at(path: &Path) -> std::io::Result<u64> {
        use std::os::unix::ffi::OsStrExt;
        // A NUL byte in the path would make an invalid C string; surface it
        // as InvalidInput instead of panicking.
        let path_c = std::ffi::CString::new(path.as_os_str().as_bytes())
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e))?;
        // SAFETY: `statvfs` is a plain-old-data C struct, so the all-zero
        // bit pattern is a valid value; it is fully overwritten by the
        // successful statvfs call below.
        let mut stat: libc::statvfs = unsafe { std::mem::zeroed() };
        // SAFETY: `path_c` is a valid NUL-terminated string and `stat` is a
        // live, writable out-pointer for the duration of the call.
        let ret = unsafe { libc::statvfs(path_c.as_ptr(), &raw mut stat) };
        if ret != 0 {
            return Err(std::io::Error::last_os_error());
        }
        // f_bavail/f_frsize have platform-dependent widths; widen to u64.
        #[allow(clippy::unnecessary_cast)]
        Ok(stat.f_bavail as u64 * stat.f_frsize as u64)
    }
447
448    fn cleanup_temp_files(&self) {
449        let paths = [
450            self.ix_dir.join("shard.ix.tmp.files"),
451            self.ix_dir.join("shard.ix.tmp.blooms"),
452            self.ix_dir.join("shard.ix.tmp.strings"),
453            self.ix_dir.join("shard.ix.tmp"),
454            self.ix_dir.join("shard.ix.bak"),
455        ];
456        for p in &paths {
457            if let Err(e) = fs::remove_file(p)
458                && e.kind() != std::io::ErrorKind::NotFound
459            {
460                tracing::warn!("Failed to cleanup temp file {}: {}", p.display(), e);
461            }
462        }
463        for run_path in &self.temp_runs {
464            if let Err(e) = fs::remove_file(run_path)
465                && e.kind() != std::io::ErrorKind::NotFound
466            {
467                tracing::warn!(
468                    "Failed to cleanup temp run {}: {}",
469                    run_path.display(),
470                    e
471                );
472            }
473        }
474    }
475
    /// Scans a single file and stages its data into the temp writers: a path
    /// record in the string pool, a bloom filter, a file-table row, and
    /// per-trigram postings in the in-memory batch.
    ///
    /// Returns `Ok(false)` when the file was skipped (vanished, too large,
    /// unreadable, or detected as binary) and `Ok(true)` when it was staged.
    ///
    /// # Errors
    ///
    /// Returns an error on unexpected I/O failures or format overflows.
    fn process_file(&mut self, path: &Path) -> Result<bool> {
        let path_str = path.display().to_string();
        // TOCTOU guard: file may have been deleted between walk and open
        let metadata = match fs::metadata(path) {
            Ok(m) => m,
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(false),
            Err(e) => return Err(e.into()),
        };
        let size = metadata.len();
        // mtime is stored as nanoseconds since the epoch; pre-epoch or
        // overflowing timestamps collapse to 0.
        let mtime = metadata
            .modified()?
            .duration_since(UNIX_EPOCH)
            .map_or(0, |d| u64::try_from(d.as_nanos()).unwrap_or(0));

        if self.max_file_size > 0 && size > self.max_file_size {
            tracing::debug!("SKIP: file too large ({size} bytes): {path_str}");
            self.stats.files_skipped_size += 1;
            return Ok(false);
        }

        let file = match File::open(path) {
            Ok(f) => f,
            Err(e)
                if e.kind() == std::io::ErrorKind::NotFound
                    || e.kind() == std::io::ErrorKind::PermissionDenied =>
            {
                return Ok(false);
            }
            Err(e) => return Err(e.into()),
        };
        // SAFETY: the map is read-only. NOTE(review): if the file is
        // truncated concurrently a read through the map can fault — an
        // accepted risk inherent to mmap-based scanning.
        let mmap = unsafe { Mmap::map(&file)? };

        // When decompression is on, index the decompressed bytes instead of
        // the raw archive, capped at max_file_size (or 100 MB if unlimited).
        let raw_data = if self.decompress {
            if let Some(mut reader) = maybe_decompress(path, &mmap)? {
                let mut buf = Vec::new();
                let take_limit = if self.max_file_size > 0 {
                    self.max_file_size
                } else {
                    100 * 1024 * 1024
                };
                reader.by_ref().take(take_limit).read_to_end(&mut buf)?;
                std::borrow::Cow::Owned(buf)
            } else {
                std::borrow::Cow::Borrowed(&mmap[..])
            }
        } else {
            std::borrow::Cow::Borrowed(&mmap[..])
        };

        let data = &raw_data[..];
        if is_binary(data) {
            tracing::debug!("SKIP: binary detection ({size} bytes): {path_str}");
            self.stats.files_skipped_binary += 1;
            return Ok(false);
        }

        let content_hash = xxhash_rust::xxh64::xxh64(data, 0);
        let pairs = self.extractor.extract_with_offsets(data);

        // Trigram-less text files (e.g. very short) are still recorded in
        // the file table below; only a debug line is emitted here.
        if pairs.is_empty() && !data.is_empty() {
            tracing::debug!("SKIP: no trigrams extracted ({size} bytes): {path_str}");
        }

        let file_id = self.file_count;
        self.file_count += 1;

        // String-pool record: u16 field (zero — presumably reserved/flags,
        // confirm against the reader), u16 path length, then the path bytes.
        // NOTE(review): `unwrap_or(0)` silently truncates the pool offset if
        // it exceeds u32::MAX and the path length if it exceeds u16::MAX.
        let path_str = path.to_string_lossy();
        let path_bytes = path_str.as_bytes();
        let path_off = u32::try_from(self.strings_writer.stream_position()?).unwrap_or(0);
        let path_len = u16::try_from(path_bytes.len()).unwrap_or(0);

        self.strings_writer.write_all(&0u16.to_le_bytes())?;
        self.strings_writer.write_all(&path_len.to_le_bytes())?;
        self.strings_writer.write_all(path_bytes)?;

        let mut bloom = BloomFilter::new(256, 5);
        let mut trigram_count = 0u32;

        // Collapse each run of consecutive equal trigrams into one posting
        // entry. NOTE(review): assumes extract_with_offsets groups equal
        // trigrams contiguously — confirm; otherwise duplicates per file
        // would be emitted.
        let mut i = 0;
        while i < pairs.len() {
            let tri = pairs[i].0;
            let mut j = i + 1;
            while j < pairs.len() && pairs[j].0 == tri {
                j += 1;
            }

            // Cap offsets at 10k per trigram per file to bound entry size.
            let take_count = (j - i).min(10_000);
            let offsets: Vec<u32> = pairs[i..i + take_count].iter().map(|p| p.1).collect();

            bloom.insert(tri);
            self.postings
                .entry(tri)
                .or_default()
                .push(PostingEntry { file_id, offsets });
            // Rough memory estimate driving the flush threshold below.
            self.postings_count += take_count + 8;

            trigram_count += 1;
            i = j;
        }

        bloom.serialize(&mut self.blooms_writer)?;

        // Blooms are addressed as fixed 260-byte records (file_id * 260).
        // NOTE(review): 260 must equal BloomFilter::serialize's output size
        // for new(256, 5) — confirm.
        let bloom_offset = u64::from(file_id)
            .checked_mul(260)
            .ok_or_else(|| Error::Config("file_id * 260 overflow".into()))?;
        let bloom_offset_u32 = u32::try_from(bloom_offset)
            .map_err(|_| Error::Config(format!("bloom_offset {bloom_offset} exceeds u32::MAX")))?;
        // File-table row: id(4) path_off(4) path_len(2) status(1) pad(1)
        // mtime(8) size(8) hash(8) trigram_count(4) bloom_off(4) pad(4).
        self.files_writer.write_all(&file_id.to_le_bytes())?;
        self.files_writer.write_all(&path_off.to_le_bytes())?;
        self.files_writer.write_all(&path_len.to_le_bytes())?;
        self.files_writer.write_all(&[FileStatus::Fresh as u8])?;
        self.files_writer.write_all(&[0u8])?;
        self.files_writer.write_all(&mtime.to_le_bytes())?;
        self.files_writer.write_all(&size.to_le_bytes())?;
        self.files_writer.write_all(&content_hash.to_le_bytes())?;
        self.files_writer.write_all(&trigram_count.to_le_bytes())?;
        self.files_writer
            .write_all(&bloom_offset_u32.to_le_bytes())?;
        self.files_writer.write_all(&[0u8; 4])?;

        self.stats.files_scanned += 1;
        self.stats.bytes_scanned += size;

        if std::env::var("IX_DEBUG_BUILD").is_ok() {
            eprintln!(
                "IX-INDEXED: file_id={file_id} unique_trigrams={trigram_count} size={size}: {path_str}"
            );
        }

        // Flush every 500k entries (~8MB peak RAM) to prevent unbounded HashMap growth.
        // This was the RAM DDOS root cause in v0.1.1 — threshold was 5M (far too high).
        if self.postings_count >= 500_000 {
            if std::env::var("IX_DEBUG_BUILD").is_ok() {
                eprintln!(
                    "IX-FLUSH: postings_count={} after file_id={file_id}",
                    self.postings_count
                );
            }
            self.flush_run()?;
        }

        Ok(true)
    }
619
620    fn serialize(&mut self) -> Result<PathBuf> {
621        self.flush_run()?;
622
623        self.files_writer.flush()?;
624        self.blooms_writer.flush()?;
625        self.strings_writer.flush()?;
626
627        // Hierarchical Merge to stay under ulimit
628        while self.temp_runs.len() > 128 {
629            let mut next_generation = Vec::new();
630            for chunk in self.temp_runs.chunks(128) {
631                let out_path = self.ix_dir.join(format!(
632                    "shard.ix.merged.{}.{}",
633                    next_generation.len(),
634                    SystemTime::now()
635                        .duration_since(UNIX_EPOCH)
636                        .map_err(|e| Error::Config(format!("time went backwards: {e}")))?
637                        .as_micros()
638                ));
639                Self::merge_to_run(chunk, &out_path)?;
640                next_generation.push(out_path);
641                for p in chunk {
642                    if let Err(e) = fs::remove_file(p) {
643                        tracing::warn!("Failed to cleanup temp run {}: {}", p.display(), e);
644                    }
645                }
646            }
647            self.temp_runs = next_generation;
648        }
649
650        let tmp_path = self.ix_dir.join("shard.ix.tmp");
651        let final_path = self.ix_dir.join("shard.ix");
652
653        let mut f = BufWriter::new(File::create(&tmp_path)?);
654        f.write_all(&[0u8; HEADER_SIZE])?;
655
656        let file_table_offset = Self::align_to_8(&mut f)?;
657        let mut files_reader = File::open(self.ix_dir.join("shard.ix.tmp.files"))?;
658        std::io::copy(&mut files_reader, &mut f)?;
659        let file_table_size = f.stream_position()? - file_table_offset;
660
661        Self::align_to_8(&mut f)?;
662        let posting_data_offset = f.stream_position()?;
663
664        let mut cdx_entries: Vec<(Trigram, u64, u32, u32)> = Vec::new();
665        let mut global_trigram_count = 0u32;
666
667        let mut runs = Vec::new();
668        for path in &self.temp_runs {
669            runs.push(RunIterator::new(path)?);
670        }
671
672        let mut heap = BinaryHeap::new();
673        let mut current_items = vec![None; runs.len()];
674
675        for (i, run) in runs.iter_mut().enumerate() {
676            if let Some(item) = run.next_trigram()? {
677                heap.push(MergeItem {
678                    tri: item.0,
679                    run_idx: i,
680                });
681                current_items[i] = Some(item);
682            }
683        }
684
685        let mut current_tri: Option<Trigram> = None;
686        let mut merged_entries: Vec<PostingEntry> = Vec::new();
687
688        while let Some(MergeItem { tri, run_idx }) = heap.pop() {
689            if Some(tri) != current_tri {
690                if let Some(t) = current_tri {
691                    let cdx_entry = Self::write_merged_posting(
692                        &mut f,
693                        t,
694                        posting_data_offset,
695                        &mut merged_entries,
696                    )?;
697                    cdx_entries.push(cdx_entry);
698                    global_trigram_count += 1;
699                    merged_entries.clear();
700                }
701                current_tri = Some(tri);
702            }
703
704            let item = current_items
705                .get_mut(run_idx)
706                .ok_or(Error::Config("run_idx out of bounds".into()))?
707                .take()
708                .ok_or(Error::Config("current item is None".into()))?;
709            merged_entries.extend(item.1);
710
711            if let Some(next_item) = runs
712                .get_mut(run_idx)
713                .ok_or(Error::Config("run_idx out of bounds".into()))?
714                .next_trigram()?
715            {
716                heap.push(MergeItem {
717                    tri: next_item.0,
718                    run_idx,
719                });
720                *current_items
721                    .get_mut(run_idx)
722                    .ok_or(Error::Config("run_idx out of bounds".into()))? = Some(next_item);
723            }
724        }
725
726        if let Some(t) = current_tri {
727            let cdx_entry =
728                Self::write_merged_posting(&mut f, t, posting_data_offset, &mut merged_entries)?;
729            cdx_entries.push(cdx_entry);
730            global_trigram_count += 1;
731        }
732
733        self.stats.unique_trigrams = u64::from(global_trigram_count);
734        let posting_data_size = f.stream_position()? - posting_data_offset;
735
736        Self::align_to_8(&mut f)?;
737        let trigram_table_offset = f.stream_position()?;
738
739        // CDX encode: delta + varint + ZSTD per block
740        let cdx_block_index = Self::write_cdx_blocks(&mut f, &cdx_entries)?;
741        let trigram_table_size = f.stream_position()? - trigram_table_offset;
742
743        Self::align_to_8(&mut f)?;
744        let cdx_block_index_offset = f.stream_position()?;
745        for entry in &cdx_block_index {
746            f.write_all(&entry.0.to_le_bytes())?;
747            f.write_all(&entry.1.to_le_bytes())?;
748        }
749        // Sentinel entry
750        f.write_all(&u32::MAX.to_le_bytes())?;
751        f.write_all(&(trigram_table_offset + trigram_table_size).to_le_bytes())?;
752        let cdx_block_index_size = f.stream_position()? - cdx_block_index_offset;
753
754        Self::align_to_8(&mut f)?;
755        let bloom_offset = f.stream_position()?;
756        let mut blooms_reader = File::open(self.ix_dir.join("shard.ix.tmp.blooms"))?;
757        std::io::copy(&mut blooms_reader, &mut f)?;
758        let bloom_size = f.stream_position()? - bloom_offset;
759
760        Self::align_to_8(&mut f)?;
761        let string_pool_offset = f.stream_position()?;
762        let mut strings_reader = File::open(self.ix_dir.join("shard.ix.tmp.strings"))?;
763        std::io::copy(&mut strings_reader, &mut f)?;
764        let string_pool_size = f.stream_position()? - string_pool_offset;
765
766        let name_index_offset = f.stream_position()?;
767        let name_index_size = 0u64;
768
769        let created_at = u64::try_from(
770            SystemTime::now()
771                .duration_since(UNIX_EPOCH)
772                .map_err(|e| Error::Config(format!("time went backwards: {e}")))?
773                .as_micros(),
774        )
775        .unwrap_or(0);
776        let mut header_bytes = [0u8; HEADER_SIZE];
777        header_bytes[0..4].copy_from_slice(&MAGIC);
778        header_bytes[0x04..0x06].copy_from_slice(&VERSION_MAJOR.to_le_bytes());
779        header_bytes[0x06..0x08].copy_from_slice(&VERSION_MINOR.to_le_bytes());
780        header_bytes[0x08..0x10].copy_from_slice(
781            &(flags::HAS_BLOOM_FILTERS
782                | flags::HAS_CONTENT_HASHES
783                | flags::POSTING_LISTS_CHECKSUMMED
784                | flags::HAS_CDX_INDEX)
785                .to_le_bytes(),
786        );
787        header_bytes[0x10..0x18].copy_from_slice(&created_at.to_le_bytes());
788        header_bytes[0x18..0x20].copy_from_slice(&self.stats.bytes_scanned.to_le_bytes());
789        header_bytes[0x20..0x24].copy_from_slice(&self.file_count.to_le_bytes());
790        header_bytes[0x24..0x28].copy_from_slice(&(global_trigram_count).to_le_bytes());
791        header_bytes[0x28..0x30].copy_from_slice(&file_table_offset.to_le_bytes());
792        header_bytes[0x30..0x38].copy_from_slice(&file_table_size.to_le_bytes());
793        header_bytes[0x38..0x40].copy_from_slice(&trigram_table_offset.to_le_bytes());
794        header_bytes[0x40..0x48].copy_from_slice(&trigram_table_size.to_le_bytes());
795        header_bytes[0x48..0x50].copy_from_slice(&posting_data_offset.to_le_bytes());
796        header_bytes[0x50..0x58].copy_from_slice(&posting_data_size.to_le_bytes());
797        header_bytes[0x58..0x60].copy_from_slice(&bloom_offset.to_le_bytes());
798        header_bytes[0x60..0x68].copy_from_slice(&bloom_size.to_le_bytes());
799        header_bytes[0x68..0x70].copy_from_slice(&string_pool_offset.to_le_bytes());
800        header_bytes[0x70..0x78].copy_from_slice(&string_pool_size.to_le_bytes());
801        header_bytes[0x78..0x80].copy_from_slice(&name_index_offset.to_le_bytes());
802        header_bytes[0x80..0x88].copy_from_slice(&name_index_size.to_le_bytes());
803        header_bytes[0x88..0x90].copy_from_slice(&cdx_block_index_offset.to_le_bytes());
804        header_bytes[0x90..0x98].copy_from_slice(&cdx_block_index_size.to_le_bytes());
805
806        let crc = crc32c::crc32c(&header_bytes[0..0xF8]);
807        header_bytes[0xF8..0xFC].copy_from_slice(&crc.to_le_bytes());
808
809        f.seek(SeekFrom::Start(0))?;
810        f.write_all(&header_bytes)?;
811        f.flush()?;
812        drop(f);
813
814        // Atomic swap: backup old index, rename new to final
815        let backup_path = self.ix_dir.join("shard.ix.bak");
816        let old_index_exists = final_path.exists();
817        if old_index_exists {
818            // Remove old backup if exists
819            let _ = fs::remove_file(&backup_path);
820            // Backup current index
821            if let Err(e) = fs::rename(&final_path, &backup_path) {
822                tracing::warn!("Failed to backup existing index: {}", e);
823            }
824        }
825        fs::rename(&tmp_path, &final_path)?;
826        self.committed = true;
827        // Remove backup after successful rename
828        if old_index_exists {
829            let _ = fs::remove_file(&backup_path);
830        }
831
832        let _ = fs::remove_file(self.ix_dir.join("shard.ix.tmp.files"));
833        let _ = fs::remove_file(self.ix_dir.join("shard.ix.tmp.blooms"));
834        let _ = fs::remove_file(self.ix_dir.join("shard.ix.tmp.strings"));
835        for path in &self.temp_runs {
836            if let Err(e) = fs::remove_file(path) {
837                tracing::warn!("Failed to cleanup temp run {}: {}", path.display(), e);
838            }
839        }
840        self.temp_runs.clear();
841
842        Ok(final_path)
843    }
844
    /// K-way merge of sorted run files into a single sorted run at `out_path`.
    ///
    /// Each input run is a sequence of (trigram, entries) records. A heap of
    /// `MergeItem`s tracks the next pending trigram per run; entries popped
    /// for the same trigram (possibly from several runs) are concatenated and
    /// flushed as one record via `write_run_entry`.
    ///
    /// NOTE(review): `BinaryHeap` is a max-heap, so this relies on
    /// `MergeItem`'s `Ord` impl (defined elsewhere in this file) being
    /// reversed to yield ascending trigram order — confirm, since the CDX
    /// delta encoding downstream assumes ascending keys.
    fn merge_to_run(run_paths: &[PathBuf], out_path: &Path) -> Result<()> {
        let mut runs = Vec::new();
        for path in run_paths {
            runs.push(RunIterator::new(path)?);
        }
        // Prime the heap with the first record of every non-empty run;
        // `current_items[i]` holds the not-yet-consumed record for run i.
        let mut heap = BinaryHeap::new();
        let mut current_items = vec![None; runs.len()];
        for (i, run) in runs.iter_mut().enumerate() {
            if let Some(item) = run.next_trigram()? {
                heap.push(MergeItem {
                    tri: item.0,
                    run_idx: i,
                });
                current_items[i] = Some(item);
            }
        }
        let mut out = BufWriter::new(File::create(out_path)?);
        // Entries accumulated for the trigram currently being merged; they
        // are flushed whenever the popped trigram changes.
        let mut current_tri: Option<Trigram> = None;
        let mut merged_entries: Vec<PostingEntry> = Vec::new();
        while let Some(MergeItem { tri, run_idx }) = heap.pop() {
            if Some(tri) != current_tri {
                // Trigram boundary: write out the previous group first.
                if let Some(t) = current_tri {
                    Self::write_run_entry(&mut out, t, &mut merged_entries)?;
                    merged_entries.clear();
                }
                current_tri = Some(tri);
            }
            // A heap entry must always have a matching buffered record;
            // anything else indicates the priming/refill logic broke.
            let item = current_items[run_idx].take().ok_or_else(|| {
                Error::Config(format!(
                    "merge invariant broken: no item at run index {run_idx}"
                ))
            })?;
            merged_entries.extend(item.1);
            // Refill from the run we just consumed so it stays represented
            // in the heap until it is exhausted.
            if let Some(next_item) = runs[run_idx].next_trigram()? {
                heap.push(MergeItem {
                    tri: next_item.0,
                    run_idx,
                });
                current_items[run_idx] = Some(next_item);
            }
        }
        // The loop only flushes on a key change, so the final group is
        // written here.
        if let Some(t) = current_tri {
            Self::write_run_entry(&mut out, t, &mut merged_entries)?;
        }
        out.flush()?;
        Ok(())
    }
892
893    fn write_run_entry<W: Write>(
894        w: &mut W,
895        tri: Trigram,
896        entries: &mut [PostingEntry],
897    ) -> Result<()> {
898        entries.sort_by_key(|e| e.file_id);
899        w.write_all(&tri.to_le_bytes())?;
900        w.write_all(&(entries.len() as u32).to_le_bytes())?;
901        for entry in entries {
902            w.write_all(&entry.file_id.to_le_bytes())?;
903            w.write_all(&(entry.offsets.len() as u32).to_le_bytes())?;
904            for off in &entry.offsets {
905                w.write_all(&off.to_le_bytes())?;
906            }
907        }
908        Ok(())
909    }
910
911    fn write_merged_posting<W: Write + Seek>(
912        f: &mut W,
913        tri: Trigram,
914        base_off: u64,
915        entries: &mut [PostingEntry],
916    ) -> Result<(Trigram, u64, u32, u32)> {
917        entries.sort_by_key(|e| e.file_id);
918        let count = entries.len() as u32;
919        let list = PostingList {
920            entries: entries.to_vec(),
921        };
922        let encoded = list.encode()?;
923        let offset = f.stream_position()? - base_off;
924        f.write_all(&encoded)?;
925        let abs_off = base_off + offset;
926        Ok((tri, abs_off, encoded.len() as u32, count))
927    }
928
929    fn write_cdx_blocks<W: Write + Seek>(
930        f: &mut W,
931        cdx_entries: &[(Trigram, u64, u32, u32)],
932    ) -> Result<Vec<(u32, u64)>> {
933        let mut block_index = Vec::new();
934        for chunk in cdx_entries.chunks(crate::format::CDX_BLOCK_SIZE) {
935            let first_key = chunk[0].0;
936            let block_offset = f.stream_position()?;
937            block_index.push((first_key, block_offset));
938
939            let mut buf = Vec::new();
940            varint::encode(u64::try_from(chunk.len()).unwrap_or(0), &mut buf);
941            let mut last_key = 0u32;
942            for &(tri, posting_offset, posting_length, doc_frequency) in chunk {
943                varint::encode(u64::from(tri - last_key), &mut buf);
944                last_key = tri;
945                varint::encode(posting_offset, &mut buf);
946                varint::encode(u64::from(posting_length), &mut buf);
947                varint::encode(u64::from(doc_frequency), &mut buf);
948            }
949
950            let compressed = zstd::encode_all(&buf[..], PostingList::ZSTD_COMPRESSION_LEVEL)
951                .map_err(|e| Error::Config(format!("cdx zstd encode: {e}")))?;
952            f.write_all(&compressed)?;
953        }
954        Ok(block_index)
955    }
956
957    fn align_to_8<W: Write + Seek>(mut w: W) -> std::io::Result<u64> {
958        let pos = w.stream_position()?;
959        let padding = (8 - (pos % 8)) % 8;
960        if padding > 0 {
961            w.write_all(&vec![0u8; padding as usize])?;
962        }
963        w.stream_position()
964    }
965}
966
967impl Drop for Builder {
968    fn drop(&mut self) {
969        if !self.committed {
970            self.cleanup_temp_files();
971        }
972    }
973}