
ix/builder.rs

//! Index builder — the complete pipeline from files to .ix shard.
//!
//! Phase 1: Discovery (walk directory, respect .gitignore)
//! Phase 2: Scan (mmap, check binary, extract trigrams, bloom filter)
//! Phase 3: Serialize (write sections, compute CRCs, atomic rename)

use crate::bloom::BloomFilter;
use crate::decompress::maybe_decompress;
use crate::error::Result;
use crate::format::*;
use crate::posting::{PostingEntry, PostingList};
use crate::trigram::{Extractor, Trigram};
use ignore::WalkBuilder;
use llmosafe::{ResourceGuard, WorkingMemory, sift_perceptions};
use memmap2::Mmap;
use std::collections::{BinaryHeap, HashMap};
use std::fs::{self, File};
use std::io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use std::time::{Instant, SystemTime, UNIX_EPOCH};

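// End-to-end usage (a minimal sketch; the repo path is illustrative and error
// handling is elided):
//
//     let mut builder = Builder::new(Path::new("/path/to/repo"))?;
//     let shard = builder.build()?; // atomically writes <root>/.ix/shard.ix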
pub struct Builder {
    root: PathBuf,
    ix_dir: PathBuf,
    file_count: u32,

    // O(1)-memory streaming writers for the temporary file-table, bloom, and
    // string-pool sections
    files_writer: BufWriter<File>,
    blooms_writer: BufWriter<File>,
    strings_writer: BufWriter<File>,

    // Postings batched in memory, spilled to sorted runs for an external sort
    postings: HashMap<Trigram, Vec<PostingEntry>>,
    postings_count: usize,
    temp_runs: Vec<PathBuf>,

    extractor: Extractor,
    stats: BuildStats,
    decompress: bool,
    resource_guard: Option<ResourceGuard>,
    cognitive_memory: WorkingMemory<128>,
    dead_ends: Vec<PathBuf>,
}

#[derive(Default, Debug)]
pub struct BuildStats {
    pub files_scanned: u64,
    pub files_skipped_binary: u64,
    pub files_skipped_size: u64,
    pub bytes_scanned: u64,
    pub unique_trigrams: u64,
}

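// On-disk layout of a run record (all integers little-endian), as written by
// `flush_run` / `write_run_entry` and read back by `next_trigram`:
//
//   [trigram: u32][entry_count: u32]
//     entry_count x ( [file_id: u32][offset_count: u32][offset: u32] ... )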
struct RunIterator {
    file: BufReader<File>,
}

impl RunIterator {
    fn new(path: &Path) -> Result<Self> {
        let f = File::open(path)?;
        Ok(Self {
            file: BufReader::new(f),
        })
    }

    fn next_trigram(&mut self) -> Result<Option<(Trigram, Vec<PostingEntry>)>> {
        let mut tri_buf = [0u8; 4];
        if let Err(e) = self.file.read_exact(&mut tri_buf) {
            if e.kind() == std::io::ErrorKind::UnexpectedEof {
                return Ok(None);
            }
            return Err(e.into());
        }
        let tri = u32::from_le_bytes(tri_buf);

        let mut len_buf = [0u8; 4];
        self.file.read_exact(&mut len_buf)?;
        let entries_len = u32::from_le_bytes(len_buf) as usize;

        let mut entries = Vec::with_capacity(entries_len);
        for _ in 0..entries_len {
            self.file.read_exact(&mut len_buf)?;
            let file_id = u32::from_le_bytes(len_buf);

            self.file.read_exact(&mut len_buf)?;
            let offsets_len = u32::from_le_bytes(len_buf) as usize;

            let mut offsets = Vec::with_capacity(offsets_len);
            for _ in 0..offsets_len {
                self.file.read_exact(&mut len_buf)?;
                offsets.push(u32::from_le_bytes(len_buf));
            }
            entries.push(PostingEntry { file_id, offsets });
        }

        Ok(Some((tri, entries)))
    }
}

#[derive(Eq, PartialEq)]
struct MergeItem {
    tri: Trigram,
    run_idx: usize,
}

impl PartialOrd for MergeItem {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

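// `BinaryHeap` is a max-heap; reversing the comparison below makes `pop`
// yield the smallest trigram first, i.e. the min-heap the k-way merge needs.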
impl Ord for MergeItem {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        other
            .tri
            .cmp(&self.tri)
            // Tie-break on run_idx so Ord stays consistent with the derived Eq.
            .then_with(|| other.run_idx.cmp(&self.run_idx))
    }
}

impl Builder {
    pub fn new(root: &Path) -> Result<Self> {
        let ix_dir = root.join(".ix");
        fs::create_dir_all(&ix_dir)?;

        let files_tmp = ix_dir.join("shard.ix.tmp.files");
        let blooms_tmp = ix_dir.join("shard.ix.tmp.blooms");
        let strings_tmp = ix_dir.join("shard.ix.tmp.strings");

        let files_writer = BufWriter::new(File::create(&files_tmp)?);
        let blooms_writer = BufWriter::new(File::create(&blooms_tmp)?);
        let mut strings_writer = BufWriter::new(File::create(&strings_tmp)?);

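        // 10-byte string-pool preamble: version:u32 = 1, two apparently
        // reserved u16 fields, and 2 pad bytes; path entries start at offset 10.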
        strings_writer.write_all(&1u32.to_le_bytes())?;
        strings_writer.write_all(&0u16.to_le_bytes())?;
        strings_writer.write_all(&0u16.to_le_bytes())?;
        strings_writer.write_all(&[0u8; 2])?;

        Ok(Self {
            root: root.to_owned(),
            ix_dir,
            file_count: 0,
            files_writer,
            blooms_writer,
            strings_writer,
            postings: HashMap::new(),
            postings_count: 0,
            temp_runs: Vec::new(),
            extractor: Extractor::new(),
            stats: BuildStats::default(),
            decompress: false,
            resource_guard: None,
            cognitive_memory: WorkingMemory::new(1000), // Standard surprise threshold
            dead_ends: Vec::new(),
        })
    }

    pub fn with_resource_guard(mut self, guard: ResourceGuard) -> Self {
        self.resource_guard = Some(guard);
        self
    }

    pub fn set_decompress(&mut self, decompress: bool) {
        self.decompress = decompress;
    }

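    // External-sort spill: drain the in-memory postings map into a sorted run
    // file so peak RSS stays bounded; the runs are k-way merged in `serialize`.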
    fn flush_run(&mut self) -> Result<()> {
        if self.postings.is_empty() {
            return Ok(());
        }
        let old_postings = std::mem::take(&mut self.postings);
        let mut sorted: Vec<_> = old_postings.into_iter().collect();
        sorted.sort_unstable_by_key(|(t, _)| *t);

        let run_path = self
            .ix_dir
            .join(format!("shard.ix.run.{}", self.temp_runs.len()));
        let mut f = BufWriter::new(File::create(&run_path)?);

        for (tri, entries) in sorted {
            f.write_all(&tri.to_le_bytes())?;
            f.write_all(&(entries.len() as u32).to_le_bytes())?;
            for entry in entries {
                f.write_all(&entry.file_id.to_le_bytes())?;
                f.write_all(&(entry.offsets.len() as u32).to_le_bytes())?;
                for off in entry.offsets {
                    f.write_all(&off.to_le_bytes())?;
                }
            }
        }
        f.flush()?;

        self.temp_runs.push(run_path);
        self.postings_count = 0;
        Ok(())
    }

    pub fn build(&mut self) -> Result<PathBuf> {
        // Clean up stale intermediate shard files before building
        if self.ix_dir.exists() {
            if let Ok(entries) = std::fs::read_dir(&self.ix_dir) {
                for entry in entries.flatten() {
                    let name = entry.file_name();
                    let name_str = name.to_string_lossy();
                    if name_str.starts_with("shard.ix.run.")
                        || name_str.starts_with("shard.ix.merged.")
                    {
                        let _ = std::fs::remove_file(entry.path());
                    }
                }
            }
        }

        let start = Instant::now();
        let root = self.root.clone();

        // LLMOSafe Formal Law: Sensitive filesystem traversal (Root)
        if root.to_string_lossy() == "/" {
            tracing::warn!(
                "LLMOSafe Advisory: Indexing root filesystem. Ensure adequate resource guards are in place."
            );
        }

        let walker = WalkBuilder::new(&root)
            .hidden(false)
            .git_ignore(true)
            .require_git(false)
            .add_custom_ignore_filename(".ixignore")
            .filter_entry(move |entry| {
                let path = entry.path();
                let name = path.file_name().and_then(|n| n.to_str()).unwrap_or("");

                if entry.file_type().map(|t| t.is_dir()).unwrap_or(false)
                    && (name == "lost+found"
                        || name == ".git"
                        || name == "node_modules"
                        || name == "target"
                        || name == "__pycache__"
                        || name == ".tox"
                        || name == ".venv"
                        || name == "venv"
                        || name == ".ix")
                {
                    return false;
                }

                if entry.file_type().map(|t| t.is_file()).unwrap_or(false) {
                    if let Ok(metadata) = entry.metadata()
                        && metadata.len() > 10 * 1024 * 1024
                    {
                        return false;
                    }
                    if name == "Cargo.lock"
                        || name == "package-lock.json"
                        || name == "pnpm-lock.yaml"
                        || name == "shard.ix"
                        || name == "shard.ix.tmp"
                        || name.starts_with("shard.ix.")
                    {
                        return false;
                    }
                }

                if entry.file_type().map(|t| t.is_file()).unwrap_or(false) {
                    let ext = path.extension().and_then(|e| e.to_str()).unwrap_or("");
                    match ext {
                        "so" | "o" | "dylib" | "a" | "dll" | "exe" | "pyc" | "jpg" | "png"
                        | "gif" | "mp4" | "mp3" | "pdf" | "zip" | "7z" | "rar" | "sqlite"
                        | "db" | "bin" => return false,
                        _ => {}
                    }
                    if name.ends_with(".tar.gz") {
                        return false;
                    }
                }
                true
            })
            .build();

        let mut files_processed = 0u64;
        for entry_res in walker {
            let entry = match entry_res {
                Ok(e) => e,
                Err(e) => {
                    // Handle KernelError::BacktrackSignaled (-7) during the walk
                    let backtrack_path = match &e {
                        ignore::Error::Io(io_err) if io_err.raw_os_error() == Some(-7) => {
                            Some(None)
                        }
                        ignore::Error::WithPath { path, err } => {
                            if let ignore::Error::Io(io_err) = err.as_ref() {
                                if io_err.raw_os_error() == Some(-7) {
                                    Some(Some(path.clone()))
                                } else {
                                    None
                                }
                            } else {
                                None
                            }
                        }
                        _ => None,
                    };

                    if let Some(path_opt) = backtrack_path {
                        tracing::warn!(
                            "Immune Memory Triggered: Skipping path due to backtrack signal."
                        );
                        if let Some(path) = path_opt {
                            self.dead_ends.push(path);
                        }
                    }
                    continue;
                }
            };

            if entry.file_type().map(|t| t.is_file()).unwrap_or(false) {
                self.process_file(entry.path().to_owned())?;
                files_processed += 1;

                // Resource guard check: every 250 files, to prevent OOM
                if files_processed.is_multiple_of(250) {
                    if let Some(guard) = &self.resource_guard {
                        // Check once only; a second `check()` call could succeed
                        // and make an `unwrap_err` on its result panic.
                        if guard.check().is_err() {
                            eprintln!(
                                "ixd: memory ceiling reached... flushing intermediate chunk ({} files processed)",
                                files_processed
                            );
                            self.flush_run()?;
                            continue;
                        }
                    } else {
                        // Fall back to a manual RSS limit if no formal guard was provided
                        if let Ok(rss) = Self::current_rss_bytes()
                            && rss > 512 * 1024 * 1024
                        {
                            eprintln!(
                                "ixd: RSS ceiling reached ({} MB) after {} files; flushing intermediate chunk",
                                rss / 1024 / 1024,
                                files_processed
                            );
                            self.flush_run()?;
                            continue;
                        }
                    }
                }
            }
        }

        let output_path = self.serialize()?;
        tracing::info!("Build completed in {:?}: {:?}", start.elapsed(), self.stats);
        Ok(output_path)
    }

    /// Incremental update is not implemented yet; `_changed_files` is ignored
    /// and a full rebuild is performed.
    pub fn update(&mut self, _changed_files: &[PathBuf]) -> Result<PathBuf> {
        self.build()
    }

    pub fn files_len(&self) -> usize {
        self.file_count as usize
    }

    pub fn trigrams_len(&self) -> usize {
        self.stats.unique_trigrams as usize
    }

    /// Returns current process RSS in bytes by reading /proc/self/status
    /// (Linux only; returns 0 if VmRSS is not found).
    fn current_rss_bytes() -> std::io::Result<u64> {
        let status = std::fs::read_to_string("/proc/self/status")?;
        for line in status.lines() {
            if let Some(rest) = line.strip_prefix("VmRSS:") {
                let kb: u64 = rest
                    .split_whitespace()
                    .next()
                    .and_then(|s| s.parse().ok())
                    .unwrap_or(0);
                return Ok(kb * 1024);
            }
        }
        Ok(0)
    }

    /// Returns free bytes available on the filesystem containing `path`.
    fn free_bytes_at(path: &Path) -> std::io::Result<u64> {
        use std::os::unix::ffi::OsStrExt;
        let path_c = std::ffi::CString::new(path.as_os_str().as_bytes())
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e))?;
        let mut stat: libc::statvfs = unsafe { std::mem::zeroed() };
        let ret = unsafe { libc::statvfs(path_c.as_ptr(), &mut stat) };
        if ret != 0 {
            return Err(std::io::Error::last_os_error());
        }
        Ok(stat.f_bavail * stat.f_frsize)
    }

    fn process_file(&mut self, path: PathBuf) -> Result<bool> {
        // TOCTOU guard: the file may have been deleted between walk and open
        let metadata = match fs::metadata(&path) {
            Ok(m) => m,
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(false),
            Err(e) => return Err(e.into()),
        };
        let size = metadata.len();
        let mtime = metadata
            .modified()?
            .duration_since(UNIX_EPOCH)
            .map(|d| d.as_nanos() as u64)
            .unwrap_or(0);

        // Mapping a zero-length file fails on most platforms, and there is
        // nothing to index anyway.
        if size == 0 {
            return Ok(false);
        }
        if size > 10 * 1024 * 1024 {
            self.stats.files_skipped_size += 1;
            return Ok(false);
        }

        let file = match File::open(&path) {
            Ok(f) => f,
            Err(e)
                if e.kind() == std::io::ErrorKind::NotFound
                    || e.kind() == std::io::ErrorKind::PermissionDenied =>
            {
                return Ok(false);
            }
            Err(e) => return Err(e.into()),
        };
        let mmap = unsafe { Mmap::map(&file)? };

        let raw_data = if self.decompress {
            if let Some(mut reader) = maybe_decompress(&path, &mmap)? {
                let mut buf = Vec::new();
                reader
                    .by_ref()
                    .take(10 * 1024 * 1024)
                    .read_to_end(&mut buf)?;
                std::borrow::Cow::Owned(buf)
            } else {
                std::borrow::Cow::Borrowed(&mmap[..])
            }
        } else {
            std::borrow::Cow::Borrowed(&mmap[..])
        };

        let data = &raw_data[..];
        if is_binary(data) {
            self.stats.files_skipped_binary += 1;
            return Ok(false);
        }

        // LLMOSafe Tier 3: Perceptual Sifting (Cognitive Layer)
        // Evaluate file utility and bias (Halo signal)
        let sample_len = data.len().min(2048);
        let sample = String::from_utf8_lossy(&data[..sample_len]);
        let objective = "High-signal source code for semantic indexing";
        let sifted = sift_perceptions(&[sample.as_ref()], objective);

        if let Err(e) = self.cognitive_memory.update(sifted) {
            tracing::warn!(
                "LLMOSafe Cognitive Guard rejection for {}: {:?}",
                path.display(),
                e
            );
            // Skip files that don't pass the safety/utility check (e.g., high
            // bias/halo or high surprise)
            return Ok(false);
        }

        let content_hash = xxhash_rust::xxh64::xxh64(data, 0);
        let pairs = self.extractor.extract_with_offsets(data);

        let file_id = self.file_count;
        self.file_count += 1;

        let path_str = path.to_string_lossy();
        let path_bytes = path_str.as_bytes();
        let path_off = (self.strings_writer.stream_position()?) as u32;
        let path_len = path_bytes.len() as u16;

        self.strings_writer.write_all(&0u16.to_le_bytes())?;
        self.strings_writer.write_all(&path_len.to_le_bytes())?;
        self.strings_writer.write_all(path_bytes)?;

        let mut bloom = BloomFilter::new(256, 5);
        let mut trigram_count = 0u32;

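        // Group consecutive equal trigrams into one posting entry. Caps of
        // 20k distinct trigrams per file and 10k offsets per trigram bound
        // worst-case index size. (This assumes `extract_with_offsets` returns
        // pairs grouped by trigram.)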
        let mut i = 0;
        while i < pairs.len() && trigram_count < 20_000 {
            let tri = pairs[i].0;
            let mut j = i + 1;
            while j < pairs.len() && pairs[j].0 == tri {
                j += 1;
            }

            let take_count = (j - i).min(10_000);
            let offsets: Vec<u32> = pairs[i..i + take_count].iter().map(|p| p.1).collect();

            bloom.insert(tri);
            self.postings
                .entry(tri)
                .or_default()
                .push(PostingEntry { file_id, offsets });
            self.postings_count += take_count + 8;

            trigram_count += 1;
            i = j;
        }

        bloom.serialize(&mut self.blooms_writer)?;

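        // Fixed 48-byte file-table record (little-endian):
        //   file_id:u32, path_off:u32, path_len:u16, status:u8, pad:u8,
        //   mtime:u64, size:u64, content_hash:u64, trigram_count:u32,
        //   bloom_offset:u32, pad:[u8; 4]
        // Each serialized bloom filter occupies 260 bytes, hence file_id * 260.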
        let bloom_offset = file_id * 260;
        self.files_writer.write_all(&file_id.to_le_bytes())?;
        self.files_writer.write_all(&path_off.to_le_bytes())?;
        self.files_writer.write_all(&path_len.to_le_bytes())?;
        self.files_writer.write_all(&[FileStatus::Fresh as u8])?;
        self.files_writer.write_all(&[0u8])?;
        self.files_writer.write_all(&mtime.to_le_bytes())?;
        self.files_writer.write_all(&size.to_le_bytes())?;
        self.files_writer.write_all(&content_hash.to_le_bytes())?;
        self.files_writer.write_all(&trigram_count.to_le_bytes())?;
        self.files_writer.write_all(&bloom_offset.to_le_bytes())?;
        self.files_writer.write_all(&[0u8; 4])?;

        self.stats.files_scanned += 1;
        self.stats.bytes_scanned += size;

        // Flush every 500k entries (~8 MB peak RAM) to prevent unbounded
        // HashMap growth. This was the root cause of the v0.1.1 RAM DDoS;
        // the old threshold of 5M was far too high.
        if self.postings_count >= 500_000 {
            self.flush_run()?;
        }

        Ok(true)
    }

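    // Final shard layout, in write order with each section 8-byte aligned:
    //   [header][file table][posting data][trigram table][bloom filters]
    //   [string pool][name index (currently empty)]
    // Section offsets/sizes live in the header; a CRC32C over header bytes
    // 0x00..0xF8 is stored at 0xF8.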
    fn serialize(&mut self) -> Result<PathBuf> {
        // Disk space guard: abort if < 100 MB free to avoid partial shard writes
        if let Ok(free) = Self::free_bytes_at(&self.ix_dir) {
            const MIN_FREE: u64 = 100 * 1024 * 1024; // 100 MB
            if free < MIN_FREE {
                return Err(crate::error::Error::Io(std::io::Error::other(format!(
                    "insufficient disk space: {} MB free, need ≥100 MB (path: {})",
                    free / 1024 / 1024,
                    self.ix_dir.display()
                ))));
            }
        }
        self.flush_run()?;

        self.files_writer.flush()?;
        self.blooms_writer.flush()?;
        self.strings_writer.flush()?;

        // Hierarchical merge: fold runs together 128 at a time so the number
        // of simultaneously open run files stays below typical `ulimit -n`
        // descriptor limits.
        while self.temp_runs.len() > 128 {
            let mut next_generation = Vec::new();
            for chunk in self.temp_runs.chunks(128) {
                let out_path = self.ix_dir.join(format!(
                    "shard.ix.merged.{}.{}",
                    next_generation.len(),
                    SystemTime::now()
                        .duration_since(UNIX_EPOCH)
                        .unwrap()
                        .as_micros()
                ));
                self.merge_to_run(chunk, &out_path)?;
                next_generation.push(out_path);
                for p in chunk {
                    let _ = fs::remove_file(p);
                }
            }
            self.temp_runs = next_generation;
        }

        let tmp_path = self.ix_dir.join("shard.ix.tmp");
        let final_path = self.ix_dir.join("shard.ix");
        let temp_trigrams_path = self.ix_dir.join("shard.ix.tmp.trigrams");

        let mut f = BufWriter::new(File::create(&tmp_path)?);
        f.write_all(&[0u8; HEADER_SIZE])?;

        let file_table_offset = self.align_to_8(&mut f)?;
        let mut files_reader = File::open(self.ix_dir.join("shard.ix.tmp.files"))?;
        std::io::copy(&mut files_reader, &mut f)?;
        let file_table_size = f.stream_position()? - file_table_offset;

        self.align_to_8(&mut f)?;
        let posting_data_offset = f.stream_position()?;

        let mut trigram_table_writer = BufWriter::new(File::create(&temp_trigrams_path)?);
        let mut global_trigram_count = 0u32;

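        // k-way merge of all runs: seed the min-heap with the first trigram of
        // every run, then repeatedly pop the smallest, collect its entries,
        // and refill the heap from the same run until all runs are exhausted.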
        let mut runs = Vec::new();
        for path in &self.temp_runs {
            runs.push(RunIterator::new(path)?);
        }

        let mut heap = BinaryHeap::new();
        let mut current_items = vec![None; runs.len()];

        for (i, run) in runs.iter_mut().enumerate() {
            if let Some(item) = run.next_trigram()? {
                heap.push(MergeItem {
                    tri: item.0,
                    run_idx: i,
                });
                current_items[i] = Some(item);
            }
        }

        let mut current_tri: Option<Trigram> = None;
        let mut merged_entries: Vec<PostingEntry> = Vec::new();

        while let Some(MergeItem { tri, run_idx }) = heap.pop() {
            if Some(tri) != current_tri {
                if let Some(t) = current_tri {
                    self.write_merged_posting(
                        &mut f,
                        &mut trigram_table_writer,
                        t,
                        posting_data_offset,
                        &mut merged_entries,
                    )?;
                    global_trigram_count += 1;
                    merged_entries.clear();
                }
                current_tri = Some(tri);
            }

            let item = current_items[run_idx].take().unwrap();
            merged_entries.extend(item.1);

            if let Some(next_item) = runs[run_idx].next_trigram()? {
                heap.push(MergeItem {
                    tri: next_item.0,
                    run_idx,
                });
                current_items[run_idx] = Some(next_item);
            }
        }

        if let Some(t) = current_tri {
            self.write_merged_posting(
                &mut f,
                &mut trigram_table_writer,
                t,
                posting_data_offset,
                &mut merged_entries,
            )?;
            global_trigram_count += 1;
        }

        self.stats.unique_trigrams = global_trigram_count as u64;
        let posting_data_size = f.stream_position()? - posting_data_offset;

        self.align_to_8(&mut f)?;
        let trigram_table_offset = f.stream_position()?;
        trigram_table_writer.flush()?;
        drop(trigram_table_writer);

        let mut trigram_table_file = File::open(&temp_trigrams_path)?;
        std::io::copy(&mut trigram_table_file, &mut f)?;
        let trigram_table_size = f.stream_position()? - trigram_table_offset;

        self.align_to_8(&mut f)?;
        let bloom_offset = f.stream_position()?;
        let mut blooms_reader = File::open(self.ix_dir.join("shard.ix.tmp.blooms"))?;
        std::io::copy(&mut blooms_reader, &mut f)?;
        let bloom_size = f.stream_position()? - bloom_offset;

        self.align_to_8(&mut f)?;
        let string_pool_offset = f.stream_position()?;
        let mut strings_reader = File::open(self.ix_dir.join("shard.ix.tmp.strings"))?;
        std::io::copy(&mut strings_reader, &mut f)?;
        let string_pool_size = f.stream_position()? - string_pool_offset;

        let name_index_offset = f.stream_position()?;
        let name_index_size = 0u64;

        let created_at = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_micros() as u64;
        let mut header_bytes = [0u8; HEADER_SIZE];
        header_bytes[0..4].copy_from_slice(&MAGIC);
        header_bytes[0x04..0x06].copy_from_slice(&VERSION_MAJOR.to_le_bytes());
        header_bytes[0x06..0x08].copy_from_slice(&VERSION_MINOR.to_le_bytes());
        header_bytes[0x08..0x10].copy_from_slice(
            &(flags::HAS_BLOOM_FILTERS
                | flags::HAS_CONTENT_HASHES
                | flags::POSTING_LISTS_CHECKSUMMED)
                .to_le_bytes(),
        );
        header_bytes[0x10..0x18].copy_from_slice(&created_at.to_le_bytes());
        header_bytes[0x18..0x20].copy_from_slice(&self.stats.bytes_scanned.to_le_bytes());
        header_bytes[0x20..0x24].copy_from_slice(&self.file_count.to_le_bytes());
        header_bytes[0x24..0x28].copy_from_slice(&global_trigram_count.to_le_bytes());
        header_bytes[0x28..0x30].copy_from_slice(&file_table_offset.to_le_bytes());
        header_bytes[0x30..0x38].copy_from_slice(&file_table_size.to_le_bytes());
        header_bytes[0x38..0x40].copy_from_slice(&trigram_table_offset.to_le_bytes());
        header_bytes[0x40..0x48].copy_from_slice(&trigram_table_size.to_le_bytes());
        header_bytes[0x48..0x50].copy_from_slice(&posting_data_offset.to_le_bytes());
        header_bytes[0x50..0x58].copy_from_slice(&posting_data_size.to_le_bytes());
        header_bytes[0x58..0x60].copy_from_slice(&bloom_offset.to_le_bytes());
        header_bytes[0x60..0x68].copy_from_slice(&bloom_size.to_le_bytes());
        header_bytes[0x68..0x70].copy_from_slice(&string_pool_offset.to_le_bytes());
        header_bytes[0x70..0x78].copy_from_slice(&string_pool_size.to_le_bytes());
        header_bytes[0x78..0x80].copy_from_slice(&name_index_offset.to_le_bytes());
        header_bytes[0x80..0x88].copy_from_slice(&name_index_size.to_le_bytes());

        let crc = crc32c::crc32c(&header_bytes[0..0xF8]);
        header_bytes[0xF8..0xFC].copy_from_slice(&crc.to_le_bytes());

        f.seek(SeekFrom::Start(0))?;
        f.write_all(&header_bytes)?;
        f.flush()?;
        drop(f);

        fs::rename(&tmp_path, &final_path)?;

        let _ = fs::remove_file(self.ix_dir.join("shard.ix.tmp.files"));
        let _ = fs::remove_file(self.ix_dir.join("shard.ix.tmp.blooms"));
        let _ = fs::remove_file(self.ix_dir.join("shard.ix.tmp.strings"));
        let _ = fs::remove_file(&temp_trigrams_path);
        for path in &self.temp_runs {
            let _ = fs::remove_file(path);
        }
        self.temp_runs.clear();

        Ok(final_path)
    }

    fn merge_to_run(&self, run_paths: &[PathBuf], out_path: &Path) -> Result<()> {
        let mut runs = Vec::new();
        for path in run_paths {
            runs.push(RunIterator::new(path)?);
        }
        let mut heap = BinaryHeap::new();
        let mut current_items = vec![None; runs.len()];
        for (i, run) in runs.iter_mut().enumerate() {
            if let Some(item) = run.next_trigram()? {
                heap.push(MergeItem {
                    tri: item.0,
                    run_idx: i,
                });
                current_items[i] = Some(item);
            }
        }
        let mut out = BufWriter::new(File::create(out_path)?);
        let mut current_tri: Option<Trigram> = None;
        let mut merged_entries: Vec<PostingEntry> = Vec::new();
        while let Some(MergeItem { tri, run_idx }) = heap.pop() {
            if Some(tri) != current_tri {
                if let Some(t) = current_tri {
                    self.write_run_entry(&mut out, t, &mut merged_entries)?;
                    merged_entries.clear();
                }
                current_tri = Some(tri);
            }
            let item = current_items[run_idx].take().unwrap();
            merged_entries.extend(item.1);
            if let Some(next_item) = runs[run_idx].next_trigram()? {
                heap.push(MergeItem {
                    tri: next_item.0,
                    run_idx,
                });
                current_items[run_idx] = Some(next_item);
            }
        }
        if let Some(t) = current_tri {
            self.write_run_entry(&mut out, t, &mut merged_entries)?;
        }
        out.flush()?;
        Ok(())
    }

    fn write_run_entry<W: Write>(
        &self,
        w: &mut W,
        tri: Trigram,
        entries: &mut [PostingEntry],
    ) -> Result<()> {
        entries.sort_by_key(|e| e.file_id);
        w.write_all(&tri.to_le_bytes())?;
        w.write_all(&(entries.len() as u32).to_le_bytes())?;
        for entry in entries {
            w.write_all(&entry.file_id.to_le_bytes())?;
            w.write_all(&(entry.offsets.len() as u32).to_le_bytes())?;
            for off in &entry.offsets {
                w.write_all(&off.to_le_bytes())?;
            }
        }
        Ok(())
    }

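    // Trigram-table entry (20 bytes, little-endian): trigram:u32, absolute
    // posting offset as a 48-bit integer, encoded length:u32, entry count:u32,
    // and 2 pad bytes.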
    fn write_merged_posting<W: Write + Seek>(
        &self,
        f: &mut W,
        table: &mut W,
        tri: Trigram,
        base_off: u64,
        entries: &mut [PostingEntry],
    ) -> Result<()> {
        entries.sort_by_key(|e| e.file_id);
        let count = entries.len() as u32;
        let list = PostingList {
            entries: entries.to_vec(),
        };
        let encoded = list.encode();
        let offset = f.stream_position()? - base_off;
        f.write_all(&encoded)?;
        let abs_off = base_off + offset;
        table.write_all(&tri.to_le_bytes())?;
        table.write_all(&abs_off.to_le_bytes()[..6])?;
        table.write_all(&(encoded.len() as u32).to_le_bytes())?;
        table.write_all(&count.to_le_bytes())?;
        table.write_all(&[0u8; 2])?;
        Ok(())
    }

    fn align_to_8<W: Write + Seek>(&self, mut w: W) -> std::io::Result<u64> {
        let pos = w.stream_position()?;
        let padding = (8 - (pos % 8)) % 8;
        if padding > 0 {
            w.write_all(&vec![0u8; padding as usize])?;
        }
        w.stream_position()
    }
}