
ix/builder.rs

//! Index builder — the complete pipeline from files to .ix shard.
//!
//! Phase 1: Discovery (walk directory, respect .gitignore)
//! Phase 2: Scan (mmap, check binary, extract trigrams, bloom filter)
//! Phase 3: Serialize (write sections, compute CRCs, atomic rename)
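//!
//! A minimal usage sketch (hypothetical caller; the enclosing crate path is not
//! shown here, so the example is marked `ignore`):
//!
//! ```ignore
//! use std::path::Path;
//!
//! let mut builder = Builder::new(Path::new("/path/to/repo"))?;
//! let shard_path = builder.build()?;
//! println!("index written to {}", shard_path.display());
//! ```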

use crate::bloom::BloomFilter;
use crate::decompress::maybe_decompress;
use crate::error::{Error, Result};
use crate::format::{is_binary, FileStatus, HEADER_SIZE, MAGIC, VERSION_MAJOR, VERSION_MINOR, flags};
use crate::posting::{PostingEntry, PostingList};
use crate::trigram::{Extractor, Trigram};
use ignore::WalkBuilder;
use libc;
use llmosafe::ResourceGuard;
use memmap2::Mmap;
use std::collections::{BinaryHeap, HashMap};
use std::fs::{self, File};
use std::io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use std::time::{Instant, SystemTime, UNIX_EPOCH};

/// Index builder — orchestrates the full pipeline from file discovery to shard
/// serialization.
pub struct Builder {
    root: PathBuf,
    ix_dir: PathBuf,
    file_count: u32,

    // O(1) memory streaming writers for temporary file table and blooms
    files_writer: BufWriter<File>,
    blooms_writer: BufWriter<File>,
    strings_writer: BufWriter<File>,

    // Postings batching for external sort
    postings: HashMap<Trigram, Vec<PostingEntry>>,
    postings_count: usize,
    temp_runs: Vec<PathBuf>,

    extractor: Extractor,
    stats: BuildStats,
    decompress: bool,
    resource_guard: Option<ResourceGuard>,
    dead_ends: Vec<PathBuf>,
}

/// Accumulated metrics collected during a build run.
#[derive(Default, Debug)]
pub struct BuildStats {
    /// Number of files processed (text only, after filtering).
    pub files_scanned: u64,
    /// Number of files skipped because they were detected as binary.
    pub files_skipped_binary: u64,
    /// Number of files skipped because they exceeded the size limit.
    pub files_skipped_size: u64,
    /// Total number of raw bytes scanned across all files.
    pub bytes_scanned: u64,
    /// Number of distinct trigrams discovered across all files.
    pub unique_trigrams: u64,
}

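/// Streaming reader over one on-disk run produced by `flush_run`.
///
/// Record layout (all fields little-endian u32): trigram, entry_count, then for
/// each entry: file_id, offset_count, followed by `offset_count` offsets.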
struct RunIterator {
    file: BufReader<File>,
}

impl RunIterator {
    fn new(path: &Path) -> Result<Self> {
        let f = File::open(path)?;
        Ok(Self {
            file: BufReader::new(f),
        })
    }

    fn next_trigram(&mut self) -> Result<Option<(Trigram, Vec<PostingEntry>)>> {
        let mut tri_buf = [0u8; 4];
        if let Err(e) = self.file.read_exact(&mut tri_buf) {
            if e.kind() == std::io::ErrorKind::UnexpectedEof {
                return Ok(None);
            }
            return Err(e.into());
        }
        let tri = u32::from_le_bytes(tri_buf);

        let mut len_buf = [0u8; 4];
        self.file.read_exact(&mut len_buf)?;
        let entries_len = usize::try_from(u32::from_le_bytes(len_buf)).unwrap_or(0);

        let mut entries = Vec::with_capacity(entries_len);
        for _ in 0..entries_len {
            self.file.read_exact(&mut len_buf)?;
            let file_id = u32::from_le_bytes(len_buf);

            self.file.read_exact(&mut len_buf)?;
            let offsets_len = usize::try_from(u32::from_le_bytes(len_buf)).unwrap_or(0);

            let mut offsets = Vec::with_capacity(offsets_len);
            for _ in 0..offsets_len {
                self.file.read_exact(&mut len_buf)?;
                offsets.push(u32::from_le_bytes(len_buf));
            }
            entries.push(PostingEntry { file_id, offsets });
        }

        Ok(Some((tri, entries)))
    }
}

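/// Heap entry for the k-way merge of sorted runs. `Ord` is reversed so the
/// standard `BinaryHeap` (a max-heap) pops the smallest trigram first.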
#[derive(Eq, PartialEq)]
struct MergeItem {
    tri: Trigram,
    run_idx: usize,
}

impl PartialOrd for MergeItem {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for MergeItem {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        other.tri.cmp(&self.tri) // Min-heap
    }
}

#[allow(clippy::as_conversions)] // binary format: usize→u32/u16 for index encoding
#[allow(clippy::indexing_slicing)] // binary format: fixed-size buffer ops
impl Builder {
    /// Creates a new builder for the given root directory.
    ///
    /// # Errors
    ///
    /// Returns an error if the `.ix` directory cannot be created or temp files
    /// cannot be opened.
    pub fn new(root: &Path) -> Result<Self> {
        let ix_dir = root.join(".ix");
        fs::create_dir_all(&ix_dir)?;

        let files_tmp = ix_dir.join("shard.ix.tmp.files");
        let blooms_tmp = ix_dir.join("shard.ix.tmp.blooms");
        let strings_tmp = ix_dir.join("shard.ix.tmp.strings");

        let files_writer = BufWriter::new(File::create(&files_tmp)?);
        let blooms_writer = BufWriter::new(File::create(&blooms_tmp)?);
        let mut strings_writer = BufWriter::new(File::create(&strings_tmp)?);

        strings_writer.write_all(&1u32.to_le_bytes())?;
        strings_writer.write_all(&0u16.to_le_bytes())?;
        strings_writer.write_all(&0u16.to_le_bytes())?;
        strings_writer.write_all(&[0u8; 2])?;

        Ok(Self {
            root: root.to_owned(),
            ix_dir,
            file_count: 0,
            files_writer,
            blooms_writer,
            strings_writer,
            postings: HashMap::new(),
            postings_count: 0,
            temp_runs: Vec::new(),
            extractor: Extractor::new(),
            stats: BuildStats::default(),
            decompress: false,
            resource_guard: None,
            dead_ends: Vec::new(),
        })
    }

    /// Attach a [`ResourceGuard`] for memory-pressure-driven flush.
    #[must_use]
    pub const fn with_resource_guard(mut self, guard: ResourceGuard) -> Self {
        self.resource_guard = Some(guard);
        self
    }

    /// Enable or disable transparent decompression of archives during
    /// scanning.
    pub const fn set_decompress(&mut self, decompress: bool) {
        self.decompress = decompress;
    }

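    /// Sorts the in-memory postings batch by trigram and writes it out as a
    /// temporary run file; the runs are k-way merged later in `serialize`.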
    fn flush_run(&mut self) -> Result<()> {
        if self.postings.is_empty() {
            return Ok(());
        }
        let old_postings = std::mem::take(&mut self.postings);
        let mut sorted: Vec<_> = old_postings.into_iter().collect();
        sorted.sort_unstable_by_key(|(t, _)| *t);

        let run_path = self
            .ix_dir
            .join(format!("shard.ix.run.{}", self.temp_runs.len()));
        let mut f = BufWriter::new(File::create(&run_path)?);

        for (tri, entries) in sorted {
            f.write_all(&tri.to_le_bytes())?;
            f.write_all(&u32::try_from(entries.len()).unwrap_or(0).to_le_bytes())?;
            for entry in entries {
                f.write_all(&entry.file_id.to_le_bytes())?;
                f.write_all(&u32::try_from(entry.offsets.len()).unwrap_or(0).to_le_bytes())?;
                for off in entry.offsets {
                    f.write_all(&off.to_le_bytes())?;
                }
            }
        }
        f.flush()?;

        self.temp_runs.push(run_path);
        self.postings_count = 0;
        Ok(())
    }

    /// Build or rebuild the index from scratch.
    ///
    /// # Errors
    ///
    /// Returns an error if the build process fails (I/O, disk space, etc.).
    pub fn build(&mut self) -> Result<PathBuf> {
        // Cleanup old intermediate shard files before building
        if self.ix_dir.exists()
            && let Ok(entries) = std::fs::read_dir(&self.ix_dir)
        {
            for entry in entries.flatten() {
                let name = entry.file_name();
                let name_str = name.to_string_lossy();
                if (name_str.starts_with("shard.ix.run.")
                    || name_str.starts_with("shard.ix.merged."))
                    && let Err(e) = std::fs::remove_file(entry.path())
                {
                    tracing::warn!("Failed to cleanup shard file {}: {}", name_str, e);
                }
            }
        }

        let start = Instant::now();
        let root = self.root.clone();

        // LLMOSafe Formal Law: Sensitive filesystem traversal (Root)
        if root.to_string_lossy() == "/" {
            tracing::warn!(
                "LLMOSafe Advisory: Indexing root filesystem. Ensure adequate resource guards are in place."
            );
        }

        let walker = WalkBuilder::new(&root)
            .hidden(false)
            .git_ignore(true)
            .require_git(false)
            .add_custom_ignore_filename(".ixignore")
            .filter_entry(move |entry| {
                let path = entry.path();
                let name = path.file_name().and_then(|n| n.to_str()).unwrap_or("");

                if entry.file_type().is_some_and(|t| t.is_dir())
                    && (name == "lost+found"
                        || name == ".git"
                        || name == "node_modules"
                        || name == "target"
                        || name == "__pycache__"
                        || name == ".tox"
                        || name == ".venv"
                        || name == "venv"
                        || name == ".ix")
                {
                    return false;
                }

                if entry.file_type().is_some_and(|t| t.is_file()) {
                    if let Ok(metadata) = entry.metadata()
                        && metadata.len() > 10 * 1024 * 1024
                    {
                        return false;
                    }
                    if name == "Cargo.lock"
                        || name == "package-lock.json"
                        || name == "pnpm-lock.yaml"
                        || name == "shard.ix"
                        || name == "shard.ix.tmp"
                        || name.starts_with("shard.ix.")
                    {
                        return false;
                    }
                }

                if entry.file_type().is_some_and(|t| t.is_file()) {
                    let ext = path.extension().and_then(|e| e.to_str()).unwrap_or("");
                    match ext {
                        "so" | "o" | "dylib" | "a" | "dll" | "exe" | "pyc" | "jpg" | "png"
                        | "gif" | "mp4" | "mp3" | "pdf" | "zip" | "7z" | "rar" | "sqlite"
                        | "db" | "bin" => return false,
                        _ => {}
                    }
                    if name.ends_with(".tar.gz") {
                        return false;
                    }
                }
                true
            })
            .build();

        let mut files_processed = 0u64;
        for entry_res in walker {
            let entry = match entry_res {
                Ok(e) => e,
                Err(e) => {
                    // Handle KernelError::BacktrackSignaled (-7) during the walk
                    let backtrack_path = match &e {
                        ignore::Error::Io(io_err) if io_err.raw_os_error() == Some(-7) => {
                            Some(None)
                        }
                        ignore::Error::WithPath { path, err } => {
                            if let ignore::Error::Io(io_err) = err.as_ref() {
                                if io_err.raw_os_error() == Some(-7) {
                                    Some(Some(path.clone()))
                                } else {
                                    None
                                }
                            } else {
                                None
                            }
                        }
                        _ => None,
                    };

                    if let Some(path_opt) = backtrack_path {
                        tracing::warn!(
                            "Immune Memory Triggered: Skipping path due to backtrack signal."
                        );
                        if let Some(path) = path_opt {
                            self.dead_ends.push(path);
                        }
                    }
                    continue;
                }
            };

            if entry.file_type().is_some_and(|t| t.is_file()) {
                self.process_file(entry.path())?;
                files_processed += 1;

                // Resource Guard Check: check every 250 files to prevent OOM
                // allow: manual_is_multiple_of — MSRV 1.85 compat; is_multiple_of not available
                #[allow(clippy::manual_is_multiple_of)]
                if files_processed % 250 == 0 {
                    if let Some(guard) = &self.resource_guard {
                        if let Err(_err) = guard.check() {
                            eprintln!(
                                "ixd: memory ceiling reached... flushing intermediate chunk ({files_processed} files processed)"
                            );
                            self.flush_run()?;
                        }
                    } else {
                        // Fallback to manual RSS limit if no formal guard provided
                        if let Ok(rss) = Self::current_rss_bytes()
                            && rss > 512 * 1024 * 1024
                        {
                            eprintln!(
                                "ixd: RSS ceiling reached ({} MB) after {} files — flushing intermediate chunk",
                                rss / 1024 / 1024,
                                files_processed
                            );
                            self.flush_run()?;
                        }
                    }
                }
            }
        }

        let output_path = self.serialize()?;
        tracing::info!("Build completed in {:?}: {:?}", start.elapsed(), self.stats);
        Ok(output_path)
    }

    /// Update the index for changed files (currently does a full rebuild).
    ///
    /// # Errors
    ///
    /// Returns an error if the build process fails.
    pub fn update(&mut self, _changed_files: &[PathBuf]) -> Result<PathBuf> {
        self.build()
    }

    /// Number of files indexed so far (snapshot for progress reporting).
    #[must_use]
    pub const fn files_len(&self) -> usize {
        self.file_count as usize
    }

    /// Number of unique trigrams accumulated so far.
    #[must_use]
    pub const fn trigrams_len(&self) -> usize {
        self.stats.unique_trigrams as usize
    }

    /// Returns current process RSS in bytes by reading /proc/self/status.
    fn current_rss_bytes() -> std::io::Result<u64> {
        let status = std::fs::read_to_string("/proc/self/status")?;
        for line in status.lines() {
            if let Some(rest) = line.strip_prefix("VmRSS:") {
                let kb: u64 = rest
                    .split_whitespace()
                    .next()
                    .and_then(|s| s.parse().ok())
                    .unwrap_or(0);
                return Ok(kb * 1024);
            }
        }
        Ok(0)
    }

    /// Returns free bytes available on the filesystem containing `path`.
    fn free_bytes_at(path: &Path) -> std::io::Result<u64> {
        use std::os::unix::ffi::OsStrExt;
        let path_c = std::ffi::CString::new(path.as_os_str().as_bytes())
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e))?;
        let mut stat: libc::statvfs = unsafe { std::mem::zeroed() };
        let ret = unsafe { libc::statvfs(path_c.as_ptr(), &raw mut stat) };
        if ret != 0 {
            return Err(std::io::Error::last_os_error());
        }
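        // Bytes available to unprivileged processes: free blocks (f_bavail)
        // times the filesystem fragment size (f_frsize).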
        #[allow(clippy::unnecessary_cast)]
        Ok(stat.f_bavail as u64 * stat.f_frsize as u64)
    }

    fn process_file(&mut self, path: &Path) -> Result<bool> {
        // TOCTOU guard: file may have been deleted between walk and open
        let metadata = match fs::metadata(path) {
            Ok(m) => m,
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(false),
            Err(e) => return Err(e.into()),
        };
        let size = metadata.len();
        let mtime = metadata
            .modified()?
            .duration_since(UNIX_EPOCH)
            .map_or(0, |d| u64::try_from(d.as_nanos()).unwrap_or(0));

        if size > 10 * 1024 * 1024 {
            self.stats.files_skipped_size += 1;
            return Ok(false);
        }

        let file = match File::open(path) {
            Ok(f) => f,
            Err(e)
                if e.kind() == std::io::ErrorKind::NotFound
                    || e.kind() == std::io::ErrorKind::PermissionDenied =>
            {
                return Ok(false);
            }
            Err(e) => return Err(e.into()),
        };
        let mmap = unsafe { Mmap::map(&file)? };

        let raw_data = if self.decompress {
            if let Some(mut reader) = maybe_decompress(path, &mmap)? {
                let mut buf = Vec::new();
                reader
                    .by_ref()
                    .take(10 * 1024 * 1024)
                    .read_to_end(&mut buf)?;
                std::borrow::Cow::Owned(buf)
            } else {
                std::borrow::Cow::Borrowed(&mmap[..])
            }
        } else {
            std::borrow::Cow::Borrowed(&mmap[..])
        };

        let data = &raw_data[..];
        if is_binary(data) {
            self.stats.files_skipped_binary += 1;
            return Ok(false);
        }

        let content_hash = xxhash_rust::xxh64::xxh64(data, 0);
        let pairs = self.extractor.extract_with_offsets(data);

        let file_id = self.file_count;
        self.file_count += 1;

        let path_str = path.to_string_lossy();
        let path_bytes = path_str.as_bytes();
        let path_off = u32::try_from(self.strings_writer.stream_position()?).unwrap_or(0);
        let path_len = u16::try_from(path_bytes.len()).unwrap_or(0);

        self.strings_writer.write_all(&0u16.to_le_bytes())?;
        self.strings_writer.write_all(&path_len.to_le_bytes())?;
        self.strings_writer.write_all(path_bytes)?;

        let mut bloom = BloomFilter::new(256, 5);
        let mut trigram_count = 0u32;

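        // Group consecutive pairs that share a trigram; cap at 20_000 distinct
        // trigrams per file and 10_000 offsets per trigram to bound memory.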
        let mut i = 0;
        while i < pairs.len() && trigram_count < 20_000 {
            let tri = pairs[i].0;
            let mut j = i + 1;
            while j < pairs.len() && pairs[j].0 == tri {
                j += 1;
            }

            let take_count = (j - i).min(10_000);
            let offsets: Vec<u32> = pairs[i..i + take_count].iter().map(|p| p.1).collect();

            bloom.insert(tri);
            self.postings
                .entry(tri)
                .or_default()
                .push(PostingEntry { file_id, offsets });
            self.postings_count += take_count + 8;

            trigram_count += 1;
            i = j;
        }

        bloom.serialize(&mut self.blooms_writer)?;

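        // Fixed-size 48-byte file-table record: id, path offset/len, status,
        // padding, mtime, size, content hash, trigram count, bloom offset, reserved.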
        let bloom_offset = file_id * 260;
        self.files_writer.write_all(&file_id.to_le_bytes())?;
        self.files_writer.write_all(&path_off.to_le_bytes())?;
        self.files_writer.write_all(&path_len.to_le_bytes())?;
        self.files_writer.write_all(&[FileStatus::Fresh as u8])?;
        self.files_writer.write_all(&[0u8])?;
        self.files_writer.write_all(&mtime.to_le_bytes())?;
        self.files_writer.write_all(&size.to_le_bytes())?;
        self.files_writer.write_all(&content_hash.to_le_bytes())?;
        self.files_writer.write_all(&trigram_count.to_le_bytes())?;
        self.files_writer.write_all(&bloom_offset.to_le_bytes())?;
        self.files_writer.write_all(&[0u8; 4])?;

        self.stats.files_scanned += 1;
        self.stats.bytes_scanned += size;

        // Flush every 500k entries (~8MB peak RAM) to prevent unbounded HashMap growth.
        // This was the RAM DDOS root cause in v0.1.1 — threshold was 5M (far too high).
        if self.postings_count >= 500_000 {
            self.flush_run()?;
        }

        Ok(true)
    }

    fn serialize(&mut self) -> Result<PathBuf> {
        // Disk space guard: abort if < 100MB free to avoid partial shard writes
        if let Ok(free) = Self::free_bytes_at(&self.ix_dir) {
            const MIN_FREE: u64 = 100 * 1024 * 1024; // 100 MB
            if free < MIN_FREE {
                return Err(crate::error::Error::Io(std::io::Error::other(format!(
                    "insufficient disk space: {} MB free, need ≥100 MB (path: {})",
                    free / 1024 / 1024,
                    self.ix_dir.display()
                ))));
            }
        }
        self.flush_run()?;

        self.files_writer.flush()?;
        self.blooms_writer.flush()?;
        self.strings_writer.flush()?;

        // Hierarchical Merge to stay under ulimit
        while self.temp_runs.len() > 128 {
            let mut next_generation = Vec::new();
            for chunk in self.temp_runs.chunks(128) {
                let out_path = self.ix_dir.join(format!(
                    "shard.ix.merged.{}.{}",
                    next_generation.len(),
                    SystemTime::now()
                        .duration_since(UNIX_EPOCH)
                        .map_err(|e| Error::Config(format!("time went backwards: {e}")))?
                        .as_micros()
                ));
                Self::merge_to_run(chunk, &out_path)?;
                next_generation.push(out_path);
                for p in chunk {
                    if let Err(e) = fs::remove_file(p) {
                        tracing::warn!("Failed to cleanup temp run {}: {}", p.display(), e);
                    }
                }
            }
            self.temp_runs = next_generation;
        }

        let tmp_path = self.ix_dir.join("shard.ix.tmp");
        let final_path = self.ix_dir.join("shard.ix");
        let temp_trigrams_path = self.ix_dir.join("shard.ix.tmp.trigrams");

        let mut f = BufWriter::new(File::create(&tmp_path)?);
        f.write_all(&[0u8; HEADER_SIZE])?;

        let file_table_offset = Self::align_to_8(&mut f)?;
        let mut files_reader = File::open(self.ix_dir.join("shard.ix.tmp.files"))?;
        std::io::copy(&mut files_reader, &mut f)?;
        let file_table_size = f.stream_position()? - file_table_offset;

        Self::align_to_8(&mut f)?;
        let posting_data_offset = f.stream_position()?;

        let mut trigram_table_writer = BufWriter::new(File::create(&temp_trigrams_path)?);
        let mut global_trigram_count = 0u32;

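        // K-way merge of the sorted runs: pop the smallest trigram from the heap,
        // gather its entries from every run that currently holds it, and emit one
        // posting list plus one trigram-table row per distinct trigram.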
        let mut runs = Vec::new();
        for path in &self.temp_runs {
            runs.push(RunIterator::new(path)?);
        }

        let mut heap = BinaryHeap::new();
        let mut current_items = vec![None; runs.len()];

        for (i, run) in runs.iter_mut().enumerate() {
            if let Some(item) = run.next_trigram()? {
                heap.push(MergeItem {
                    tri: item.0,
                    run_idx: i,
                });
                current_items[i] = Some(item);
            }
        }

        let mut current_tri: Option<Trigram> = None;
        let mut merged_entries: Vec<PostingEntry> = Vec::new();

        while let Some(MergeItem { tri, run_idx }) = heap.pop() {
            if Some(tri) != current_tri {
                if let Some(t) = current_tri {
                    Self::write_merged_posting(
                        &mut f,
                        &mut trigram_table_writer,
                        t,
                        posting_data_offset,
                        &mut merged_entries,
                    )?;
                    global_trigram_count += 1;
                    merged_entries.clear();
                }
                current_tri = Some(tri);
            }

            let item = current_items
                .get_mut(run_idx)
                .ok_or(Error::Config("run_idx out of bounds".into()))?
                .take()
                .ok_or(Error::Config("current item is None".into()))?;
            merged_entries.extend(item.1);

            if let Some(next_item) = runs
                .get_mut(run_idx)
                .ok_or(Error::Config("run_idx out of bounds".into()))?
                .next_trigram()?
            {
                heap.push(MergeItem {
                    tri: next_item.0,
                    run_idx,
                });
                *current_items
                    .get_mut(run_idx)
                    .ok_or(Error::Config("run_idx out of bounds".into()))? =
                    Some(next_item);
            }
        }

        if let Some(t) = current_tri {
            Self::write_merged_posting(
                &mut f,
                &mut trigram_table_writer,
                t,
                posting_data_offset,
                &mut merged_entries,
            )?;
            global_trigram_count += 1;
        }

        self.stats.unique_trigrams = u64::from(global_trigram_count);
        let posting_data_size = f.stream_position()? - posting_data_offset;

        Self::align_to_8(&mut f)?;
        let trigram_table_offset = f.stream_position()?;
        trigram_table_writer.flush()?;
        drop(trigram_table_writer);

        let mut trigram_table_file = File::open(&temp_trigrams_path)?;
        std::io::copy(&mut trigram_table_file, &mut f)?;
        let trigram_table_size = f.stream_position()? - trigram_table_offset;

        Self::align_to_8(&mut f)?;
        let bloom_offset = f.stream_position()?;
        let mut blooms_reader = File::open(self.ix_dir.join("shard.ix.tmp.blooms"))?;
        std::io::copy(&mut blooms_reader, &mut f)?;
        let bloom_size = f.stream_position()? - bloom_offset;

        Self::align_to_8(&mut f)?;
        let string_pool_offset = f.stream_position()?;
        let mut strings_reader = File::open(self.ix_dir.join("shard.ix.tmp.strings"))?;
        std::io::copy(&mut strings_reader, &mut f)?;
        let string_pool_size = f.stream_position()? - string_pool_offset;

        let name_index_offset = f.stream_position()?;
        let name_index_size = 0u64;

        let created_at = u64::try_from(
            SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .map_err(|e| Error::Config(format!("time went backwards: {e}")))?
                .as_micros(),
        )
        .unwrap_or(0);
        let mut header_bytes = [0u8; HEADER_SIZE];
        header_bytes[0..4].copy_from_slice(&MAGIC);
        header_bytes[0x04..0x06].copy_from_slice(&VERSION_MAJOR.to_le_bytes());
        header_bytes[0x06..0x08].copy_from_slice(&VERSION_MINOR.to_le_bytes());
        header_bytes[0x08..0x10].copy_from_slice(
            &(flags::HAS_BLOOM_FILTERS
                | flags::HAS_CONTENT_HASHES
                | flags::POSTING_LISTS_CHECKSUMMED)
                .to_le_bytes(),
        );
        header_bytes[0x10..0x18].copy_from_slice(&created_at.to_le_bytes());
        header_bytes[0x18..0x20].copy_from_slice(&self.stats.bytes_scanned.to_le_bytes());
        header_bytes[0x20..0x24].copy_from_slice(&self.file_count.to_le_bytes());
        header_bytes[0x24..0x28].copy_from_slice(&(global_trigram_count).to_le_bytes());
        header_bytes[0x28..0x30].copy_from_slice(&file_table_offset.to_le_bytes());
        header_bytes[0x30..0x38].copy_from_slice(&file_table_size.to_le_bytes());
        header_bytes[0x38..0x40].copy_from_slice(&trigram_table_offset.to_le_bytes());
        header_bytes[0x40..0x48].copy_from_slice(&trigram_table_size.to_le_bytes());
        header_bytes[0x48..0x50].copy_from_slice(&posting_data_offset.to_le_bytes());
        header_bytes[0x50..0x58].copy_from_slice(&posting_data_size.to_le_bytes());
        header_bytes[0x58..0x60].copy_from_slice(&bloom_offset.to_le_bytes());
        header_bytes[0x60..0x68].copy_from_slice(&bloom_size.to_le_bytes());
        header_bytes[0x68..0x70].copy_from_slice(&string_pool_offset.to_le_bytes());
        header_bytes[0x70..0x78].copy_from_slice(&string_pool_size.to_le_bytes());
        header_bytes[0x78..0x80].copy_from_slice(&name_index_offset.to_le_bytes());
        header_bytes[0x80..0x88].copy_from_slice(&name_index_size.to_le_bytes());

        let crc = crc32c::crc32c(&header_bytes[0..0xF8]);
        header_bytes[0xF8..0xFC].copy_from_slice(&crc.to_le_bytes());

        f.seek(SeekFrom::Start(0))?;
        f.write_all(&header_bytes)?;
        f.flush()?;
        drop(f);

        // Atomic swap: backup old index, rename new to final
        let backup_path = self.ix_dir.join("shard.ix.bak");
        let old_index_exists = final_path.exists();
        if old_index_exists {
            // Remove old backup if exists
            let _ = fs::remove_file(&backup_path);
            // Backup current index
            if let Err(e) = fs::rename(&final_path, &backup_path) {
                tracing::warn!("Failed to backup existing index: {}", e);
            }
        }
        fs::rename(&tmp_path, &final_path)?;
        // Remove backup after successful rename
        if old_index_exists {
            let _ = fs::remove_file(&backup_path);
        }

        let _ = fs::remove_file(self.ix_dir.join("shard.ix.tmp.files"));
        let _ = fs::remove_file(self.ix_dir.join("shard.ix.tmp.blooms"));
        let _ = fs::remove_file(self.ix_dir.join("shard.ix.tmp.strings"));
        let _ = fs::remove_file(&temp_trigrams_path);
        for path in &self.temp_runs {
            if let Err(e) = fs::remove_file(path) {
                tracing::warn!("Failed to cleanup temp run {}: {}", path.display(), e);
            }
        }
        self.temp_runs.clear();

        Ok(final_path)
    }

    fn merge_to_run(run_paths: &[PathBuf], out_path: &Path) -> Result<()> {
        let mut runs = Vec::new();
        for path in run_paths {
            runs.push(RunIterator::new(path)?);
        }
        let mut heap = BinaryHeap::new();
        let mut current_items = vec![None; runs.len()];
        for (i, run) in runs.iter_mut().enumerate() {
            if let Some(item) = run.next_trigram()? {
                heap.push(MergeItem {
                    tri: item.0,
                    run_idx: i,
                });
                current_items[i] = Some(item);
            }
        }
        let mut out = BufWriter::new(File::create(out_path)?);
        let mut current_tri: Option<Trigram> = None;
        let mut merged_entries: Vec<PostingEntry> = Vec::new();
        while let Some(MergeItem { tri, run_idx }) = heap.pop() {
            if Some(tri) != current_tri {
                if let Some(t) = current_tri {
                    Self::write_run_entry(&mut out, t, &mut merged_entries)?;
                    merged_entries.clear();
                }
                current_tri = Some(tri);
            }
            let item = current_items[run_idx].take().unwrap_or_else(|| {
                unreachable!("run iterator invariant: item must exist at index {}", run_idx);
            });
            merged_entries.extend(item.1);
            if let Some(next_item) = runs[run_idx].next_trigram()? {
                heap.push(MergeItem {
                    tri: next_item.0,
                    run_idx,
                });
                current_items[run_idx] = Some(next_item);
            }
        }
        if let Some(t) = current_tri {
            Self::write_run_entry(&mut out, t, &mut merged_entries)?;
        }
        out.flush()?;
        Ok(())
    }

    fn write_run_entry<W: Write>(
        w: &mut W,
        tri: Trigram,
        entries: &mut [PostingEntry],
    ) -> Result<()> {
        entries.sort_by_key(|e| e.file_id);
        w.write_all(&tri.to_le_bytes())?;
        w.write_all(&(entries.len() as u32).to_le_bytes())?;
        for entry in entries {
            w.write_all(&entry.file_id.to_le_bytes())?;
            w.write_all(&(entry.offsets.len() as u32).to_le_bytes())?;
            for off in &entry.offsets {
                w.write_all(&off.to_le_bytes())?;
            }
        }
        Ok(())
    }

    fn write_merged_posting<W: Write + Seek>(
        f: &mut W,
        table: &mut W,
        tri: Trigram,
        base_off: u64,
        entries: &mut [PostingEntry],
    ) -> Result<()> {
        entries.sort_by_key(|e| e.file_id);
        let count = entries.len() as u32;
        let list = PostingList {
            entries: entries.to_vec(),
        };
        let encoded = list.encode();
        let offset = f.stream_position()? - base_off;
        f.write_all(&encoded)?;
        let abs_off = base_off + offset;
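        // The trigram table stores the posting list's absolute file offset as a
        // 48-bit little-endian value (low six bytes of the u64).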
        table.write_all(&tri.to_le_bytes())?;
        table.write_all(&abs_off.to_le_bytes()[..6])?;
        table.write_all(&(encoded.len() as u32).to_le_bytes())?;
        table.write_all(&count.to_le_bytes())?;
        table.write_all(&[0u8; 2])?;
        Ok(())
    }

    fn align_to_8<W: Write + Seek>(mut w: W) -> std::io::Result<u64> {
        let pos = w.stream_position()?;
        let padding = (8 - (pos % 8)) % 8;
        if padding > 0 {
            w.write_all(&vec![0u8; padding as usize])?;
        }
        w.stream_position()
    }
}