Skip to main content

ix/
builder.rs

1//! Index builder — the complete pipeline from files to .ix shard.
2//!
3//! Phase 1: Discovery (walk directory, respect .gitignore)
4//! Phase 2: Scan (mmap, check binary, extract trigrams, bloom filter)
5//! Phase 3: Serialize (write sections, compute CRCs, atomic rename)
6
7use crate::bloom::BloomFilter;
8use crate::decompress::maybe_decompress;
9use crate::error::{Error, Result};
10use crate::format::{
11    FileStatus, HEADER_SIZE, MAGIC, VERSION_MAJOR, VERSION_MINOR, flags, is_binary,
12};
13use crate::posting::{PostingEntry, PostingList};
14use crate::trigram::{Extractor, Trigram};
15use crate::varint;
16use ignore::WalkBuilder;
17use libc;
18use llmosafe::ResourceGuard;
19use memmap2::Mmap;
20use std::collections::{BinaryHeap, HashMap};
21use std::fs::{self, File};
22use std::io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write};
23use std::path::{Path, PathBuf};
24use std::time::{Instant, SystemTime, UNIX_EPOCH};
25
/// Index builder — orchestrates the full pipeline from file discovery to shard
/// serialization.
pub struct Builder {
    /// Root directory being indexed.
    root: PathBuf,
    /// Output directory (`<root>/.ix`) holding the shard and all temp files.
    ix_dir: PathBuf,
    /// Number of files indexed so far; doubles as the next file id to assign.
    file_count: u32,

    // O(1) memory streaming writers for temporary file table and blooms
    files_writer: BufWriter<File>,
    blooms_writer: BufWriter<File>,
    strings_writer: BufWriter<File>,

    // Postings batching for external sort
    /// In-memory postings batch, flushed to a sorted "run" file when it grows
    /// past the threshold (see `flush_run`).
    postings: HashMap<Trigram, Vec<PostingEntry>>,
    /// Approximate memory accounting for `postings` (offset count + fixed
    /// overhead per entry), compared against the flush threshold.
    postings_count: usize,
    /// Paths of the sorted run files written so far.
    temp_runs: Vec<PathBuf>,

    /// Trigram extractor reused across files.
    extractor: Extractor,
    /// Metrics accumulated over the current build.
    stats: BuildStats,
    /// Whether archives are transparently decompressed while scanning.
    decompress: bool,
    /// Optional memory-pressure guard; triggers early run flushes in `build`.
    resource_guard: Option<ResourceGuard>,
    /// Paths that raised a backtrack signal (raw OS error -7) during the walk.
    dead_ends: Vec<PathBuf>,
    /// Maximum file size to index in bytes (0 = unlimited).
    max_file_size: u64,
}
50
/// Accumulated metrics collected during a build run.
#[derive(Debug, Default)]
pub struct BuildStats {
    /// Count of text files actually indexed (after all filtering).
    pub files_scanned: u64,
    /// Count of files rejected by the binary-content check.
    pub files_skipped_binary: u64,
    /// Count of files rejected for exceeding the configured size limit.
    pub files_skipped_size: u64,
    /// Sum of raw bytes read across every scanned file.
    pub bytes_scanned: u64,
    /// Count of distinct trigrams seen across the whole build.
    pub unique_trigrams: u64,
}
65
/// Sequential reader over one sorted run file produced by `flush_run`,
/// yielding `(trigram, posting entries)` records in on-disk order.
struct RunIterator {
    /// Buffered handle positioned at the start of the next trigram record.
    file: BufReader<File>,
}
69
70impl RunIterator {
71    fn new(path: &Path) -> Result<Self> {
72        let f = File::open(path)?;
73        Ok(Self {
74            file: BufReader::new(f),
75        })
76    }
77
78    fn next_trigram(&mut self) -> Result<Option<(Trigram, Vec<PostingEntry>)>> {
79        let mut tri_buf = [0u8; 4];
80        if let Err(e) = self.file.read_exact(&mut tri_buf) {
81            if e.kind() == std::io::ErrorKind::UnexpectedEof {
82                return Ok(None);
83            }
84            return Err(e.into());
85        }
86        let tri = u32::from_le_bytes(tri_buf);
87
88        let mut len_buf = [0u8; 4];
89        self.file.read_exact(&mut len_buf)?;
90        let entries_len = usize::try_from(u32::from_le_bytes(len_buf)).unwrap_or(0);
91
92        let mut entries = Vec::with_capacity(entries_len);
93        for _ in 0..entries_len {
94            self.file.read_exact(&mut len_buf)?;
95            let file_id = u32::from_le_bytes(len_buf);
96
97            self.file.read_exact(&mut len_buf)?;
98            let offsets_len = usize::try_from(u32::from_le_bytes(len_buf)).unwrap_or(0);
99
100            let mut offsets = Vec::with_capacity(offsets_len);
101            for _ in 0..offsets_len {
102                self.file.read_exact(&mut len_buf)?;
103                offsets.push(u32::from_le_bytes(len_buf));
104            }
105            entries.push(PostingEntry { file_id, offsets });
106        }
107
108        Ok(Some((tri, entries)))
109    }
110}
111
/// Entry in the k-way merge heap: a trigram together with the index of the
/// run file it was read from.
#[derive(Eq, PartialEq)]
struct MergeItem {
    tri: Trigram,
    run_idx: usize,
}
117
impl PartialOrd for MergeItem {
    /// Delegates to [`Ord::cmp`], the canonical pattern for totally ordered
    /// types.
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
123
124impl Ord for MergeItem {
125    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
126        other.tri.cmp(&self.tri) // Min-heap
127    }
128}
129
130#[allow(clippy::as_conversions)] // binary format: usize→u32/u16 for index encoding
131#[allow(clippy::indexing_slicing)] // binary format: fixed-size buffer ops
132impl Builder {
    /// Creates a new builder for the given root directory.
    ///
    /// Creates `<root>/.ix` and opens three streaming temp files there
    /// (`shard.ix.tmp.files`, `.blooms`, `.strings`) so per-file metadata is
    /// written incrementally with O(1) memory. Defaults: decompression off,
    /// 100 MB max file size, no resource guard.
    ///
    /// # Errors
    ///
    /// Returns an error if the `.ix` directory cannot be created or temp files
    /// cannot be opened.
    pub fn new(root: &Path) -> Result<Self> {
        let ix_dir = root.join(".ix");
        fs::create_dir_all(&ix_dir)?;

        let files_tmp = ix_dir.join("shard.ix.tmp.files");
        let blooms_tmp = ix_dir.join("shard.ix.tmp.blooms");
        let strings_tmp = ix_dir.join("shard.ix.tmp.strings");

        let files_writer = BufWriter::new(File::create(&files_tmp)?);
        let blooms_writer = BufWriter::new(File::create(&blooms_tmp)?);
        let mut strings_writer = BufWriter::new(File::create(&strings_tmp)?);

        // 10-byte string-pool preamble: u32 + u16 + u16 + 2 padding bytes.
        // NOTE(review): presumably a version/reserved header consumed by the
        // string-pool reader — confirm against the format definition.
        strings_writer.write_all(&1u32.to_le_bytes())?;
        strings_writer.write_all(&0u16.to_le_bytes())?;
        strings_writer.write_all(&0u16.to_le_bytes())?;
        strings_writer.write_all(&[0u8; 2])?;

        Ok(Self {
            root: root.to_owned(),
            ix_dir,
            file_count: 0,
            files_writer,
            blooms_writer,
            strings_writer,
            postings: HashMap::new(),
            postings_count: 0,
            temp_runs: Vec::new(),
            extractor: Extractor::new(),
            stats: BuildStats::default(),
            decompress: false,
            resource_guard: None,
            dead_ends: Vec::new(),
            max_file_size: 100 * 1024 * 1024, // 100 MB default cap
        })
    }
174
    /// Attach a [`ResourceGuard`] for memory-pressure-driven flush.
    ///
    /// During `build`, the guard is polled every 250 files; when it reports
    /// pressure, the in-memory postings batch is flushed to a run file early.
    #[must_use]
    pub const fn with_resource_guard(mut self, guard: ResourceGuard) -> Self {
        self.resource_guard = Some(guard);
        self
    }
181
    /// Enable or disable transparent decompression of archives during
    /// scanning. Disabled by default; decompressed output is capped at the
    /// configured max file size (100 MB when unlimited).
    pub const fn set_decompress(&mut self, decompress: bool) {
        self.decompress = decompress;
    }
187
    /// Set the maximum file size to index (0 = unlimited). Default: 100 MB.
    ///
    /// Larger files are counted in `BuildStats::files_skipped_size` and
    /// skipped without being opened.
    pub const fn set_max_file_size(&mut self, max_bytes: u64) {
        self.max_file_size = max_bytes;
    }
192
193    fn flush_run(&mut self) -> Result<()> {
194        if self.postings.is_empty() {
195            return Ok(());
196        }
197        let old_postings = std::mem::take(&mut self.postings);
198        let mut sorted: Vec<_> = old_postings.into_iter().collect();
199        sorted.sort_unstable_by_key(|(t, _)| *t);
200
201        let run_path = self
202            .ix_dir
203            .join(format!("shard.ix.run.{}", self.temp_runs.len()));
204        let mut f = BufWriter::new(File::create(&run_path)?);
205
206        for (tri, entries) in sorted {
207            f.write_all(&tri.to_le_bytes())?;
208            f.write_all(&u32::try_from(entries.len()).unwrap_or(0).to_le_bytes())?;
209            for entry in entries {
210                f.write_all(&entry.file_id.to_le_bytes())?;
211                f.write_all(
212                    &u32::try_from(entry.offsets.len())
213                        .unwrap_or(0)
214                        .to_le_bytes(),
215                )?;
216                for off in entry.offsets {
217                    f.write_all(&off.to_le_bytes())?;
218                }
219            }
220        }
221        f.flush()?;
222
223        self.temp_runs.push(run_path);
224        self.postings_count = 0;
225        Ok(())
226    }
227
    /// Build or rebuild the index from scratch.
    ///
    /// Phase 1 (discovery): walks `root` with gitignore/`.ixignore` support,
    /// filtering out VCS metadata, our own output files, and obviously-binary
    /// extensions. Phase 2 (scan): each file goes through
    /// [`Self::process_file`]. Phase 3: [`Self::serialize`] writes the final
    /// shard and returns its path.
    ///
    /// # Errors
    ///
    /// Returns an error if the build process fails (I/O, disk space, etc.).
    pub fn build(&mut self) -> Result<PathBuf> {
        // Cleanup old intermediate shard files before building
        if self.ix_dir.exists()
            && let Ok(entries) = std::fs::read_dir(&self.ix_dir)
        {
            for entry in entries.flatten() {
                let name = entry.file_name();
                let name_str = name.to_string_lossy();
                // Only run/merged leftovers are removed; the shard itself and
                // the streaming temp files are left alone.
                if (name_str.starts_with("shard.ix.run.")
                    || name_str.starts_with("shard.ix.merged."))
                    && let Err(e) = std::fs::remove_file(entry.path())
                {
                    tracing::warn!("Failed to cleanup shard file {}: {}", name_str, e);
                }
            }
        }

        let start = Instant::now();
        let root = self.root.clone();

        // LLMOSafe Formal Law: Sensitive filesystem traversal (Root)
        if root.to_string_lossy() == "/" {
            tracing::warn!(
                "LLMOSafe Advisory: Indexing root filesystem. Ensure adequate resource guards are in place."
            );
        }

        let walker = WalkBuilder::new(&root)
            .hidden(false)
            .git_ignore(true)
            .git_global(true)
            .git_exclude(true)
            .require_git(false)
            .add_custom_ignore_filename(".ixignore")
            .filter_entry(move |entry| {
                let path = entry.path();
                let name = path.file_name().and_then(|n| n.to_str()).unwrap_or("");

                // Never descend into VCS metadata or our own output dir.
                if entry.file_type().is_some_and(|t| t.is_dir())
                    && (name == "lost+found" || name == ".git" || name == ".ix")
                {
                    return false;
                }

                // Never index our own shard or intermediate files.
                if entry.file_type().is_some_and(|t| t.is_file())
                    && (name == "shard.ix"
                        || name == "shard.ix.tmp"
                        || name.starts_with("shard.ix."))
                {
                    return false;
                }

                if entry.file_type().is_some_and(|t| t.is_file()) {
                    let ext = path.extension().and_then(|e| e.to_str()).unwrap_or("");
                    // Cheap extension-based filter; content-based binary
                    // detection in process_file catches whatever slips past.
                    match ext {
                        "so" | "o" | "dylib" | "a" | "dll" | "exe" | "pyc" | "jpg" | "png"
                        | "gif" | "mp4" | "mp3" | "pdf" | "zip" | "7z" | "rar" | "sqlite"
                        | "db" | "bin" => return false,
                        _ => {}
                    }
                    // `.tar.gz` has extension "gz"; match the compound suffix.
                    if name.ends_with(".tar.gz") {
                        return false;
                    }
                }
                true
            })
            .build();

        let mut files_processed = 0u64;
        for entry_res in walker {
            let entry = match entry_res {
                Ok(e) => e,
                Err(e) => {
                    // Handle KernelError::BacktrackSignaled (-7) during the walk
                    // `Some(None)` = signal without a path, `Some(Some(p))` =
                    // signal attributed to path `p`, `None` = other error.
                    let backtrack_path = match &e {
                        ignore::Error::Io(io_err) if io_err.raw_os_error() == Some(-7) => {
                            Some(None)
                        }
                        ignore::Error::WithPath { path, err } => {
                            if let ignore::Error::Io(io_err) = err.as_ref() {
                                if io_err.raw_os_error() == Some(-7) {
                                    Some(Some(path.clone()))
                                } else {
                                    None
                                }
                            } else {
                                None
                            }
                        }
                        _ => None,
                    };

                    // Record attributed dead ends; every walk error (backtrack
                    // or otherwise) is skipped rather than aborting the build.
                    if let Some(path_opt) = backtrack_path {
                        tracing::warn!(
                            "Immune Memory Triggered: Skipping path due to backtrack signal."
                        );
                        if let Some(path) = path_opt {
                            self.dead_ends.push(path);
                        }
                    }
                    continue;
                }
            };

            if entry.file_type().is_some_and(|t| t.is_file()) {
                self.process_file(entry.path())?;
                files_processed += 1;

                // Resource Guard Check: check every 250 files to prevent OOM
                // allow: manual_is_multiple_of — MSRV 1.85 compat; is_multiple_of not available
                #[allow(clippy::manual_is_multiple_of)]
                if files_processed % 250 == 0 {
                    if let Some(guard) = &self.resource_guard {
                        if let Err(_err) = guard.check() {
                            eprintln!(
                                "ixd: memory ceiling reached... flushing intermediate chunk ({files_processed} files processed)"
                            );
                            self.flush_run()?;
                        }
                    } else {
                        // Fallback to manual RSS limit if no formal guard provided
                        if let Ok(rss) = Self::current_rss_bytes()
                            && rss > 512 * 1024 * 1024
                        {
                            eprintln!(
                                "ixd: RSS ceiling reached ({} MB) after {} files — flushing intermediate chunk",
                                rss / 1024 / 1024,
                                files_processed
                            );
                            self.flush_run()?;
                        }
                    }
                }
            }
        }

        let output_path = self.serialize()?;
        tracing::info!("Build completed in {:?}: {:?}", start.elapsed(), self.stats);
        Ok(output_path)
    }
373
    /// Update the index for changed files (currently does a full rebuild).
    ///
    /// `_changed_files` is accepted for API stability but ignored until
    /// incremental updates are implemented.
    ///
    /// # Errors
    ///
    /// Returns an error if the build process fails.
    pub fn update(&mut self, _changed_files: &[PathBuf]) -> Result<PathBuf> {
        self.build()
    }
382
    /// Number of files indexed so far (snapshot for progress reporting).
    #[must_use]
    pub const fn files_len(&self) -> usize {
        // u32 → usize is lossless on all supported (≥32-bit) targets.
        self.file_count as usize
    }
388
    /// Number of unique trigrams accumulated so far.
    ///
    /// Populated during serialization (the merge phase counts distinct
    /// trigrams); reads as 0 before the first `serialize` completes.
    #[must_use]
    pub const fn trigrams_len(&self) -> usize {
        self.stats.unique_trigrams as usize
    }
394
395    /// Returns current process RSS in bytes by reading /proc/self/status.
396    fn current_rss_bytes() -> std::io::Result<u64> {
397        let status = std::fs::read_to_string("/proc/self/status")?;
398        for line in status.lines() {
399            if let Some(rest) = line.strip_prefix("VmRSS:") {
400                let kb: u64 = rest
401                    .split_whitespace()
402                    .next()
403                    .and_then(|s| s.parse().ok())
404                    .unwrap_or(0);
405                return Ok(kb * 1024);
406            }
407        }
408        Ok(0)
409    }
410
    /// Returns free bytes available on the filesystem containing `path`,
    /// computed as blocks available to unprivileged users × fragment size
    /// (POSIX `statvfs`; Unix-only).
    fn free_bytes_at(path: &Path) -> std::io::Result<u64> {
        use std::os::unix::ffi::OsStrExt;
        // Interior NUL in the path would make an invalid C string.
        let path_c = std::ffi::CString::new(path.as_os_str().as_bytes())
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e))?;
        // SAFETY: statvfs is plain-old-data; an all-zero bit pattern is a
        // valid value, and the struct is fully overwritten on success below.
        let mut stat: libc::statvfs = unsafe { std::mem::zeroed() };
        // SAFETY: `path_c` is a valid NUL-terminated C string and `stat` is a
        // live, writable statvfs that the callee fills in.
        let ret = unsafe { libc::statvfs(path_c.as_ptr(), &raw mut stat) };
        if ret != 0 {
            return Err(std::io::Error::last_os_error());
        }
        // f_bavail/f_frsize have platform-dependent widths; widen to u64.
        #[allow(clippy::unnecessary_cast)]
        Ok(stat.f_bavail as u64 * stat.f_frsize as u64)
    }
424
    /// Phase 2 for a single file: scan, filter, extract trigrams, and record
    /// metadata.
    ///
    /// On success appends to the temp file-table/bloom/string writers and the
    /// in-memory postings batch, flushing a run file when the batch exceeds
    /// its threshold. Returns `Ok(true)` if the file was indexed and
    /// `Ok(false)` if it was skipped (vanished, unreadable, too large, or
    /// binary).
    fn process_file(&mut self, path: &Path) -> Result<bool> {
        let path_str = path.display().to_string();
        // TOCTOU guard: file may have been deleted between walk and open
        let metadata = match fs::metadata(path) {
            Ok(m) => m,
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(false),
            Err(e) => return Err(e.into()),
        };
        let size = metadata.len();
        // Modification time in nanoseconds since the epoch; 0 on any failure.
        let mtime = metadata
            .modified()?
            .duration_since(UNIX_EPOCH)
            .map_or(0, |d| u64::try_from(d.as_nanos()).unwrap_or(0));

        if self.max_file_size > 0 && size > self.max_file_size {
            tracing::debug!("SKIP: file too large ({size} bytes): {path_str}");
            self.stats.files_skipped_size += 1;
            return Ok(false);
        }

        let file = match File::open(path) {
            Ok(f) => f,
            Err(e)
                if e.kind() == std::io::ErrorKind::NotFound
                    || e.kind() == std::io::ErrorKind::PermissionDenied =>
            {
                return Ok(false);
            }
            Err(e) => return Err(e.into()),
        };
        // SAFETY: read-only mapping of a file we just opened. NOTE(review):
        // the mapped contents can still change if the file is modified or
        // truncated concurrently — presumably acceptable for a best-effort
        // indexer; confirm no stronger guarantee is relied upon.
        let mmap = unsafe { Mmap::map(&file)? };

        // Optionally decompress (capped at max_file_size, or 100 MB when
        // unlimited) so archive contents can be indexed as text.
        let raw_data = if self.decompress {
            if let Some(mut reader) = maybe_decompress(path, &mmap)? {
                let mut buf = Vec::new();
                let take_limit = if self.max_file_size > 0 {
                    self.max_file_size
                } else {
                    100 * 1024 * 1024
                };
                reader.by_ref().take(take_limit).read_to_end(&mut buf)?;
                std::borrow::Cow::Owned(buf)
            } else {
                std::borrow::Cow::Borrowed(&mmap[..])
            }
        } else {
            std::borrow::Cow::Borrowed(&mmap[..])
        };

        let data = &raw_data[..];
        if is_binary(data) {
            tracing::debug!("SKIP: binary detection ({size} bytes): {path_str}");
            self.stats.files_skipped_binary += 1;
            return Ok(false);
        }

        let content_hash = xxhash_rust::xxh64::xxh64(data, 0);
        // (trigram, byte offset) pairs. NOTE(review): the run-length loop
        // below relies on equal trigrams being adjacent — confirm that
        // extract_with_offsets groups/sorts its output by trigram.
        let pairs = self.extractor.extract_with_offsets(data);

        if pairs.is_empty() && !data.is_empty() {
            tracing::debug!("SKIP: no trigrams extracted ({size} bytes): {path_str}");
        }

        // Assign the next sequential file id.
        let file_id = self.file_count;
        self.file_count += 1;

        // Append the path to the string pool: u16 reserved + u16 length + bytes.
        let path_str = path.to_string_lossy();
        let path_bytes = path_str.as_bytes();
        let path_off = u32::try_from(self.strings_writer.stream_position()?).unwrap_or(0);
        let path_len = u16::try_from(path_bytes.len()).unwrap_or(0);

        self.strings_writer.write_all(&0u16.to_le_bytes())?;
        self.strings_writer.write_all(&path_len.to_le_bytes())?;
        self.strings_writer.write_all(path_bytes)?;

        let mut bloom = BloomFilter::new(256, 5);
        let mut trigram_count = 0u32;

        // Run-length walk over pairs: each maximal run of one trigram becomes
        // a posting entry carrying (at most 10 000 of) its offsets.
        let mut i = 0;
        while i < pairs.len() {
            let tri = pairs[i].0;
            let mut j = i + 1;
            while j < pairs.len() && pairs[j].0 == tri {
                j += 1;
            }

            let take_count = (j - i).min(10_000);
            let offsets: Vec<u32> = pairs[i..i + take_count].iter().map(|p| p.1).collect();

            bloom.insert(tri);
            self.postings
                .entry(tri)
                .or_default()
                .push(PostingEntry { file_id, offsets });
            // Rough memory accounting: kept offsets plus a fixed overhead.
            self.postings_count += take_count + 8;

            trigram_count += 1;
            i = j;
        }

        bloom.serialize(&mut self.blooms_writer)?;

        // 260 = serialized size of one bloom filter. NOTE(review): presumably
        // a 256-byte bit array plus a 4-byte header — confirm against
        // BloomFilter::serialize before changing either constant.
        let bloom_offset = file_id * 260;
        self.files_writer.write_all(&file_id.to_le_bytes())?;
        self.files_writer.write_all(&path_off.to_le_bytes())?;
        self.files_writer.write_all(&path_len.to_le_bytes())?;
        self.files_writer.write_all(&[FileStatus::Fresh as u8])?;
        self.files_writer.write_all(&[0u8])?; // padding byte
        self.files_writer.write_all(&mtime.to_le_bytes())?;
        self.files_writer.write_all(&size.to_le_bytes())?;
        self.files_writer.write_all(&content_hash.to_le_bytes())?;
        self.files_writer.write_all(&trigram_count.to_le_bytes())?;
        self.files_writer.write_all(&bloom_offset.to_le_bytes())?;
        self.files_writer.write_all(&[0u8; 4])?; // reserved

        self.stats.files_scanned += 1;
        self.stats.bytes_scanned += size;

        if std::env::var("IX_DEBUG_BUILD").is_ok() {
            eprintln!(
                "IX-INDEXED: file_id={file_id} unique_trigrams={trigram_count} size={size}: {path_str}"
            );
        }

        // Flush every 500k entries (~8MB peak RAM) to prevent unbounded HashMap growth.
        // This was the RAM DDOS root cause in v0.1.1 — threshold was 5M (far too high).
        if self.postings_count >= 500_000 {
            if std::env::var("IX_DEBUG_BUILD").is_ok() {
                eprintln!(
                    "IX-FLUSH: postings_count={} after file_id={file_id}",
                    self.postings_count
                );
            }
            self.flush_run()?;
        }

        Ok(true)
    }
563
    /// Phase 3: merges all run files and writes the final `.ix` shard.
    ///
    /// Section order on disk: header (back-patched last), file table, posting
    /// data (k-way merged from the run files), CDX trigram table, CDX block
    /// index, bloom filters, string pool. Each section is 8-byte aligned and
    /// the header's fixed prefix is protected by a CRC32C. The shard is
    /// written to `shard.ix.tmp` and atomically renamed to `shard.ix`, with
    /// any pre-existing shard kept as `shard.ix.bak` during the swap.
    fn serialize(&mut self) -> Result<PathBuf> {
        // Disk space guard: abort if < 100MB free to avoid partial shard writes
        if let Ok(free) = Self::free_bytes_at(&self.ix_dir) {
            const MIN_FREE: u64 = 100 * 1024 * 1024; // 100 MB
            if free < MIN_FREE {
                return Err(crate::error::Error::Io(std::io::Error::other(format!(
                    "insufficient disk space: {} MB free, need ≥100 MB (path: {})",
                    free / 1024 / 1024,
                    self.ix_dir.display()
                ))));
            }
        }
        // Flush whatever postings remain in memory as the final run.
        self.flush_run()?;

        self.files_writer.flush()?;
        self.blooms_writer.flush()?;
        self.strings_writer.flush()?;

        // Hierarchical Merge to stay under ulimit
        // (merge 128 runs at a time until at most 128 remain open at once).
        while self.temp_runs.len() > 128 {
            let mut next_generation = Vec::new();
            for chunk in self.temp_runs.chunks(128) {
                let out_path = self.ix_dir.join(format!(
                    "shard.ix.merged.{}.{}",
                    next_generation.len(),
                    SystemTime::now()
                        .duration_since(UNIX_EPOCH)
                        .map_err(|e| Error::Config(format!("time went backwards: {e}")))?
                        .as_micros()
                ));
                Self::merge_to_run(chunk, &out_path)?;
                next_generation.push(out_path);
                for p in chunk {
                    if let Err(e) = fs::remove_file(p) {
                        tracing::warn!("Failed to cleanup temp run {}: {}", p.display(), e);
                    }
                }
            }
            self.temp_runs = next_generation;
        }

        let tmp_path = self.ix_dir.join("shard.ix.tmp");
        let final_path = self.ix_dir.join("shard.ix");

        let mut f = BufWriter::new(File::create(&tmp_path)?);
        // Placeholder header; the real one is seeked back and written at the end.
        f.write_all(&[0u8; HEADER_SIZE])?;

        let file_table_offset = Self::align_to_8(&mut f)?;
        let mut files_reader = File::open(self.ix_dir.join("shard.ix.tmp.files"))?;
        std::io::copy(&mut files_reader, &mut f)?;
        let file_table_size = f.stream_position()? - file_table_offset;

        Self::align_to_8(&mut f)?;
        let posting_data_offset = f.stream_position()?;

        let mut cdx_entries: Vec<(Trigram, u64, u32, u32)> = Vec::new();
        let mut global_trigram_count = 0u32;

        let mut runs = Vec::new();
        for path in &self.temp_runs {
            runs.push(RunIterator::new(path)?);
        }

        // Seed the min-heap with the first trigram from every run; the
        // pending record for each run lives in current_items.
        let mut heap = BinaryHeap::new();
        let mut current_items = vec![None; runs.len()];

        for (i, run) in runs.iter_mut().enumerate() {
            if let Some(item) = run.next_trigram()? {
                heap.push(MergeItem {
                    tri: item.0,
                    run_idx: i,
                });
                current_items[i] = Some(item);
            }
        }

        // K-way merge: pop trigrams in ascending order, accumulating all
        // entries for the current trigram and emitting a merged posting list
        // whenever the trigram changes.
        let mut current_tri: Option<Trigram> = None;
        let mut merged_entries: Vec<PostingEntry> = Vec::new();

        while let Some(MergeItem { tri, run_idx }) = heap.pop() {
            if Some(tri) != current_tri {
                if let Some(t) = current_tri {
                    let cdx_entry = Self::write_merged_posting(
                        &mut f,
                        t,
                        posting_data_offset,
                        &mut merged_entries,
                    )?;
                    cdx_entries.push(cdx_entry);
                    global_trigram_count += 1;
                    merged_entries.clear();
                }
                current_tri = Some(tri);
            }

            let item = current_items
                .get_mut(run_idx)
                .ok_or(Error::Config("run_idx out of bounds".into()))?
                .take()
                .ok_or(Error::Config("current item is None".into()))?;
            merged_entries.extend(item.1);

            // Advance the run we just consumed and re-seed the heap from it.
            if let Some(next_item) = runs
                .get_mut(run_idx)
                .ok_or(Error::Config("run_idx out of bounds".into()))?
                .next_trigram()?
            {
                heap.push(MergeItem {
                    tri: next_item.0,
                    run_idx,
                });
                *current_items
                    .get_mut(run_idx)
                    .ok_or(Error::Config("run_idx out of bounds".into()))? = Some(next_item);
            }
        }

        // Emit the final pending trigram left after the heap drains.
        if let Some(t) = current_tri {
            let cdx_entry =
                Self::write_merged_posting(&mut f, t, posting_data_offset, &mut merged_entries)?;
            cdx_entries.push(cdx_entry);
            global_trigram_count += 1;
        }

        self.stats.unique_trigrams = u64::from(global_trigram_count);
        let posting_data_size = f.stream_position()? - posting_data_offset;

        Self::align_to_8(&mut f)?;
        let trigram_table_offset = f.stream_position()?;

        // CDX encode: delta + varint + ZSTD per block
        let cdx_block_index = Self::write_cdx_blocks(&mut f, &cdx_entries)?;
        let trigram_table_size = f.stream_position()? - trigram_table_offset;

        Self::align_to_8(&mut f)?;
        let cdx_block_index_offset = f.stream_position()?;
        for entry in &cdx_block_index {
            f.write_all(&entry.0.to_le_bytes())?;
            f.write_all(&entry.1.to_le_bytes())?;
        }
        // Sentinel entry
        f.write_all(&u32::MAX.to_le_bytes())?;
        f.write_all(&(trigram_table_offset + trigram_table_size).to_le_bytes())?;
        let cdx_block_index_size = f.stream_position()? - cdx_block_index_offset;

        Self::align_to_8(&mut f)?;
        let bloom_offset = f.stream_position()?;
        let mut blooms_reader = File::open(self.ix_dir.join("shard.ix.tmp.blooms"))?;
        std::io::copy(&mut blooms_reader, &mut f)?;
        let bloom_size = f.stream_position()? - bloom_offset;

        Self::align_to_8(&mut f)?;
        let string_pool_offset = f.stream_position()?;
        let mut strings_reader = File::open(self.ix_dir.join("shard.ix.tmp.strings"))?;
        std::io::copy(&mut strings_reader, &mut f)?;
        let string_pool_size = f.stream_position()? - string_pool_offset;

        // Name index is not yet emitted; recorded as an empty section.
        let name_index_offset = f.stream_position()?;
        let name_index_size = 0u64;

        let created_at = u64::try_from(
            SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .map_err(|e| Error::Config(format!("time went backwards: {e}")))?
                .as_micros(),
        )
        .unwrap_or(0);
        // Fixed-layout header: magic, version, flags, global counts, then
        // (offset, size) pairs for each section at the byte positions below.
        let mut header_bytes = [0u8; HEADER_SIZE];
        header_bytes[0..4].copy_from_slice(&MAGIC);
        header_bytes[0x04..0x06].copy_from_slice(&VERSION_MAJOR.to_le_bytes());
        header_bytes[0x06..0x08].copy_from_slice(&VERSION_MINOR.to_le_bytes());
        header_bytes[0x08..0x10].copy_from_slice(
            &(flags::HAS_BLOOM_FILTERS
                | flags::HAS_CONTENT_HASHES
                | flags::POSTING_LISTS_CHECKSUMMED
                | flags::HAS_CDX_INDEX)
                .to_le_bytes(),
        );
        header_bytes[0x10..0x18].copy_from_slice(&created_at.to_le_bytes());
        header_bytes[0x18..0x20].copy_from_slice(&self.stats.bytes_scanned.to_le_bytes());
        header_bytes[0x20..0x24].copy_from_slice(&self.file_count.to_le_bytes());
        header_bytes[0x24..0x28].copy_from_slice(&(global_trigram_count).to_le_bytes());
        header_bytes[0x28..0x30].copy_from_slice(&file_table_offset.to_le_bytes());
        header_bytes[0x30..0x38].copy_from_slice(&file_table_size.to_le_bytes());
        header_bytes[0x38..0x40].copy_from_slice(&trigram_table_offset.to_le_bytes());
        header_bytes[0x40..0x48].copy_from_slice(&trigram_table_size.to_le_bytes());
        header_bytes[0x48..0x50].copy_from_slice(&posting_data_offset.to_le_bytes());
        header_bytes[0x50..0x58].copy_from_slice(&posting_data_size.to_le_bytes());
        header_bytes[0x58..0x60].copy_from_slice(&bloom_offset.to_le_bytes());
        header_bytes[0x60..0x68].copy_from_slice(&bloom_size.to_le_bytes());
        header_bytes[0x68..0x70].copy_from_slice(&string_pool_offset.to_le_bytes());
        header_bytes[0x70..0x78].copy_from_slice(&string_pool_size.to_le_bytes());
        header_bytes[0x78..0x80].copy_from_slice(&name_index_offset.to_le_bytes());
        header_bytes[0x80..0x88].copy_from_slice(&name_index_size.to_le_bytes());
        header_bytes[0x88..0x90].copy_from_slice(&cdx_block_index_offset.to_le_bytes());
        header_bytes[0x90..0x98].copy_from_slice(&cdx_block_index_size.to_le_bytes());

        // CRC32C over the fixed prefix, stored just before the header's end.
        let crc = crc32c::crc32c(&header_bytes[0..0xF8]);
        header_bytes[0xF8..0xFC].copy_from_slice(&crc.to_le_bytes());

        // Back-patch the header now that every section offset is known.
        f.seek(SeekFrom::Start(0))?;
        f.write_all(&header_bytes)?;
        f.flush()?;
        drop(f);

        // Atomic swap: backup old index, rename new to final
        let backup_path = self.ix_dir.join("shard.ix.bak");
        let old_index_exists = final_path.exists();
        if old_index_exists {
            // Remove old backup if exists
            let _ = fs::remove_file(&backup_path);
            // Backup current index
            if let Err(e) = fs::rename(&final_path, &backup_path) {
                tracing::warn!("Failed to backup existing index: {}", e);
            }
        }
        fs::rename(&tmp_path, &final_path)?;
        // Remove backup after successful rename
        if old_index_exists {
            let _ = fs::remove_file(&backup_path);
        }

        // Best-effort cleanup of every intermediate file.
        let _ = fs::remove_file(self.ix_dir.join("shard.ix.tmp.files"));
        let _ = fs::remove_file(self.ix_dir.join("shard.ix.tmp.blooms"));
        let _ = fs::remove_file(self.ix_dir.join("shard.ix.tmp.strings"));
        for path in &self.temp_runs {
            if let Err(e) = fs::remove_file(path) {
                tracing::warn!("Failed to cleanup temp run {}: {}", path.display(), e);
            }
        }
        self.temp_runs.clear();

        Ok(final_path)
    }
798
799    fn merge_to_run(run_paths: &[PathBuf], out_path: &Path) -> Result<()> {
800        let mut runs = Vec::new();
801        for path in run_paths {
802            runs.push(RunIterator::new(path)?);
803        }
804        let mut heap = BinaryHeap::new();
805        let mut current_items = vec![None; runs.len()];
806        for (i, run) in runs.iter_mut().enumerate() {
807            if let Some(item) = run.next_trigram()? {
808                heap.push(MergeItem {
809                    tri: item.0,
810                    run_idx: i,
811                });
812                current_items[i] = Some(item);
813            }
814        }
815        let mut out = BufWriter::new(File::create(out_path)?);
816        let mut current_tri: Option<Trigram> = None;
817        let mut merged_entries: Vec<PostingEntry> = Vec::new();
818        while let Some(MergeItem { tri, run_idx }) = heap.pop() {
819            if Some(tri) != current_tri {
820                if let Some(t) = current_tri {
821                    Self::write_run_entry(&mut out, t, &mut merged_entries)?;
822                    merged_entries.clear();
823                }
824                current_tri = Some(tri);
825            }
826        let item = current_items[run_idx]
827            .take()
828            .ok_or_else(|| Error::Config(format!("merge invariant broken: no item at run index {run_idx}")))?;
829            merged_entries.extend(item.1);
830            if let Some(next_item) = runs[run_idx].next_trigram()? {
831                heap.push(MergeItem {
832                    tri: next_item.0,
833                    run_idx,
834                });
835                current_items[run_idx] = Some(next_item);
836            }
837        }
838        if let Some(t) = current_tri {
839            Self::write_run_entry(&mut out, t, &mut merged_entries)?;
840        }
841        out.flush()?;
842        Ok(())
843    }
844
845    fn write_run_entry<W: Write>(
846        w: &mut W,
847        tri: Trigram,
848        entries: &mut [PostingEntry],
849    ) -> Result<()> {
850        entries.sort_by_key(|e| e.file_id);
851        w.write_all(&tri.to_le_bytes())?;
852        w.write_all(&(entries.len() as u32).to_le_bytes())?;
853        for entry in entries {
854            w.write_all(&entry.file_id.to_le_bytes())?;
855            w.write_all(&(entry.offsets.len() as u32).to_le_bytes())?;
856            for off in &entry.offsets {
857                w.write_all(&off.to_le_bytes())?;
858            }
859        }
860        Ok(())
861    }
862
863    fn write_merged_posting<W: Write + Seek>(
864        f: &mut W,
865        tri: Trigram,
866        base_off: u64,
867        entries: &mut [PostingEntry],
868    ) -> Result<(Trigram, u64, u32, u32)> {
869        entries.sort_by_key(|e| e.file_id);
870        let count = entries.len() as u32;
871        let list = PostingList {
872            entries: entries.to_vec(),
873        };
874        let encoded = list.encode()?;
875        let offset = f.stream_position()? - base_off;
876        f.write_all(&encoded)?;
877        let abs_off = base_off + offset;
878        Ok((tri, abs_off, encoded.len() as u32, count))
879    }
880
881    fn write_cdx_blocks<W: Write + Seek>(
882        f: &mut W,
883        cdx_entries: &[(Trigram, u64, u32, u32)],
884    ) -> Result<Vec<(u32, u64)>> {
885        let mut block_index = Vec::new();
886        for chunk in cdx_entries.chunks(crate::format::CDX_BLOCK_SIZE) {
887            let first_key = chunk[0].0;
888            let block_offset = f.stream_position()?;
889            block_index.push((first_key, block_offset));
890
891            let mut buf = Vec::new();
892            varint::encode(u64::try_from(chunk.len()).unwrap_or(0), &mut buf);
893            let mut last_key = 0u32;
894            for &(tri, posting_offset, posting_length, doc_frequency) in chunk {
895                varint::encode(u64::from(tri - last_key), &mut buf);
896                last_key = tri;
897                varint::encode(posting_offset, &mut buf);
898                varint::encode(u64::from(posting_length), &mut buf);
899                varint::encode(u64::from(doc_frequency), &mut buf);
900            }
901
902            let compressed = zstd::encode_all(&buf[..], PostingList::ZSTD_COMPRESSION_LEVEL)
903                .map_err(|e| Error::Config(format!("cdx zstd encode: {e}")))?;
904            f.write_all(&compressed)?;
905        }
906        Ok(block_index)
907    }
908
909    fn align_to_8<W: Write + Seek>(mut w: W) -> std::io::Result<u64> {
910        let pos = w.stream_position()?;
911        let padding = (8 - (pos % 8)) % 8;
912        if padding > 0 {
913            w.write_all(&vec![0u8; padding as usize])?;
914        }
915        w.stream_position()
916    }
917}