1use crate::bloom::BloomFilter;
8use crate::decompress::maybe_decompress;
9use crate::error::{Error, Result};
10use crate::format::{is_binary, FileStatus, HEADER_SIZE, MAGIC, VERSION_MAJOR, VERSION_MINOR, flags};
11use crate::posting::{PostingEntry, PostingList};
12use crate::trigram::{Extractor, Trigram};
13use ignore::WalkBuilder;
14use libc;
15use llmosafe::ResourceGuard;
16use memmap2::Mmap;
17use std::collections::{BinaryHeap, HashMap};
18use std::fs::{self, File};
19use std::io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write};
20use std::path::{Path, PathBuf};
21use std::time::{Instant, SystemTime, UNIX_EPOCH};
22
/// Streaming trigram index builder: walks a source tree, extracts
/// trigram postings per file, spills sorted runs to disk under memory
/// pressure, and finally k-way merges everything into `shard.ix`.
pub struct Builder {
    /// Directory being indexed.
    root: PathBuf,
    /// Work directory (`<root>/.ix`) holding the shard and temp files.
    ix_dir: PathBuf,
    /// Next file id to assign; also the number of files indexed so far.
    file_count: u32,

    /// Temp writer for fixed-width file-table records.
    files_writer: BufWriter<File>,
    /// Temp writer for per-file bloom filters.
    blooms_writer: BufWriter<File>,
    /// Temp writer for the path string pool.
    strings_writer: BufWriter<File>,

    /// In-memory postings accumulated since the last spill to disk.
    postings: HashMap<Trigram, Vec<PostingEntry>>,
    /// Rough size estimate of `postings`, used as the spill trigger.
    postings_count: usize,
    /// Paths of spilled run files awaiting the final merge.
    temp_runs: Vec<PathBuf>,

    /// Trigram extractor, reused across files.
    extractor: Extractor,
    /// Build counters reported at the end of `build`.
    stats: BuildStats,
    /// Whether to transparently decompress inputs (see `set_decompress`).
    decompress: bool,
    /// Optional memory guard polled periodically during `build`.
    resource_guard: Option<ResourceGuard>,
    /// Paths that triggered the walker's "backtrack" signal during `build`.
    dead_ends: Vec<PathBuf>,
}
46
/// Counters accumulated over one build, logged when `build` finishes.
#[derive(Default, Debug)]
pub struct BuildStats {
    /// Files successfully read and indexed.
    pub files_scanned: u64,
    /// Files skipped because `is_binary` detected binary content.
    pub files_skipped_binary: u64,
    /// Files skipped for exceeding the 10 MiB size cap.
    pub files_skipped_size: u64,
    /// Total bytes of indexed file content.
    pub bytes_scanned: u64,
    /// Unique trigrams written to the shard (set during `serialize`).
    pub unique_trigrams: u64,
}
61
/// Sequential reader over one on-disk posting run produced by
/// `Builder::flush_run` or `Builder::merge_to_run`.
struct RunIterator {
    file: BufReader<File>,
}
65
impl RunIterator {
    /// Opens a run file for buffered sequential reading.
    fn new(path: &Path) -> Result<Self> {
        let f = File::open(path)?;
        Ok(Self {
            file: BufReader::new(f),
        })
    }

    /// Reads the next `(trigram, entries)` record, or `Ok(None)` on a
    /// clean end-of-file.
    ///
    /// Record layout (all fields little-endian u32): trigram, entry
    /// count, then per entry: file id, offset count, offsets. An EOF in
    /// the middle of a record is reported as an error (truncated run).
    fn next_trigram(&mut self) -> Result<Option<(Trigram, Vec<PostingEntry>)>> {
        // A clean EOF before the trigram field means the run is exhausted.
        let mut tri_buf = [0u8; 4];
        if let Err(e) = self.file.read_exact(&mut tri_buf) {
            if e.kind() == std::io::ErrorKind::UnexpectedEof {
                return Ok(None);
            }
            return Err(e.into());
        }
        let tri = u32::from_le_bytes(tri_buf);

        // `len_buf` is reused for every subsequent 4-byte field.
        let mut len_buf = [0u8; 4];
        self.file.read_exact(&mut len_buf)?;
        // u32 -> usize cannot fail on >=32-bit targets, so the
        // `unwrap_or(0)` fallback is effectively unreachable.
        let entries_len = usize::try_from(u32::from_le_bytes(len_buf)).unwrap_or(0);

        let mut entries = Vec::with_capacity(entries_len);
        for _ in 0..entries_len {
            self.file.read_exact(&mut len_buf)?;
            let file_id = u32::from_le_bytes(len_buf);

            self.file.read_exact(&mut len_buf)?;
            let offsets_len = usize::try_from(u32::from_le_bytes(len_buf)).unwrap_or(0);

            let mut offsets = Vec::with_capacity(offsets_len);
            for _ in 0..offsets_len {
                self.file.read_exact(&mut len_buf)?;
                offsets.push(u32::from_le_bytes(len_buf));
            }
            entries.push(PostingEntry { file_id, offsets });
        }

        Ok(Some((tri, entries)))
    }
}
107
/// Heap entry for the k-way run merge: the smallest pending trigram of
/// run `run_idx`. Ordering is reversed (see the `Ord` impl) so that
/// `BinaryHeap`, a max-heap, yields the smallest trigram first.
#[derive(Eq, PartialEq)]
struct MergeItem {
    tri: Trigram,
    run_idx: usize,
}
113
impl PartialOrd for MergeItem {
    /// Delegates to `Ord` — the canonical pattern for totally ordered types.
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
119
120impl Ord for MergeItem {
121 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
122 other.tri.cmp(&self.tri) }
124}
125
126#[allow(clippy::as_conversions)] #[allow(clippy::indexing_slicing)] impl Builder {
    /// Creates a builder rooted at `root`, preparing the `.ix` work
    /// directory and the three temp side files
    /// (`shard.ix.tmp.{files,blooms,strings}`) that accumulate the file
    /// table, bloom filters, and path string pool during the build.
    ///
    /// # Errors
    /// Fails if the work directory or any temp file cannot be created,
    /// or if writing the string-pool preamble fails.
    pub fn new(root: &Path) -> Result<Self> {
        let ix_dir = root.join(".ix");
        fs::create_dir_all(&ix_dir)?;

        let files_tmp = ix_dir.join("shard.ix.tmp.files");
        let blooms_tmp = ix_dir.join("shard.ix.tmp.blooms");
        let strings_tmp = ix_dir.join("shard.ix.tmp.strings");

        let files_writer = BufWriter::new(File::create(&files_tmp)?);
        let blooms_writer = BufWriter::new(File::create(&blooms_tmp)?);
        let mut strings_writer = BufWriter::new(File::create(&strings_tmp)?);

        // 10-byte string-pool preamble: u32 = 1, two u16 zeros, 2 pad
        // bytes. NOTE(review): presumably a sentinel/version record so
        // that no real path ever has offset 0 — confirm against the
        // shard reader.
        strings_writer.write_all(&1u32.to_le_bytes())?;
        strings_writer.write_all(&0u16.to_le_bytes())?;
        strings_writer.write_all(&0u16.to_le_bytes())?;
        strings_writer.write_all(&[0u8; 2])?;

        Ok(Self {
            root: root.to_owned(),
            ix_dir,
            file_count: 0,
            files_writer,
            blooms_writer,
            strings_writer,
            postings: HashMap::new(),
            postings_count: 0,
            temp_runs: Vec::new(),
            extractor: Extractor::new(),
            stats: BuildStats::default(),
            decompress: false,
            resource_guard: None,
            dead_ends: Vec::new(),
        })
    }
169
    /// Attaches a resource guard consulted periodically during `build`;
    /// builder-style, consumes and returns `self`.
    #[must_use]
    pub const fn with_resource_guard(mut self, guard: ResourceGuard) -> Self {
        self.resource_guard = Some(guard);
        self
    }
176
    /// Enables transparent decompression of compressed inputs during
    /// indexing (see `maybe_decompress` usage in `process_file`).
    pub const fn set_decompress(&mut self, decompress: bool) {
        self.decompress = decompress;
    }
182
183 fn flush_run(&mut self) -> Result<()> {
184 if self.postings.is_empty() {
185 return Ok(());
186 }
187 let old_postings = std::mem::take(&mut self.postings);
188 let mut sorted: Vec<_> = old_postings.into_iter().collect();
189 sorted.sort_unstable_by_key(|(t, _)| *t);
190
191 let run_path = self
192 .ix_dir
193 .join(format!("shard.ix.run.{}", self.temp_runs.len()));
194 let mut f = BufWriter::new(File::create(&run_path)?);
195
196 for (tri, entries) in sorted {
197 f.write_all(&tri.to_le_bytes())?;
198 f.write_all(&u32::try_from(entries.len()).unwrap_or(0).to_le_bytes())?;
199 for entry in entries {
200 f.write_all(&entry.file_id.to_le_bytes())?;
201 f.write_all(&u32::try_from(entry.offsets.len()).unwrap_or(0).to_le_bytes())?;
202 for off in entry.offsets {
203 f.write_all(&off.to_le_bytes())?;
204 }
205 }
206 }
207 f.flush()?;
208
209 self.temp_runs.push(run_path);
210 self.postings_count = 0;
211 Ok(())
212 }
213
    /// Walks the tree under `root`, indexes every eligible file, and
    /// writes the final `shard.ix` shard, returning its path.
    ///
    /// Leftover `shard.ix.run.*` / `shard.ix.merged.*` files from a
    /// previously interrupted build are removed first. In-memory
    /// postings are spilled to run files whenever the resource guard
    /// (or a fallback RSS probe) reports memory pressure.
    ///
    /// # Errors
    /// Propagates I/O failures from file processing and serialization;
    /// individual walker errors are logged and skipped.
    pub fn build(&mut self) -> Result<PathBuf> {
        // Clean up stale intermediate artifacts from interrupted builds.
        if self.ix_dir.exists()
            && let Ok(entries) = std::fs::read_dir(&self.ix_dir)
        {
            for entry in entries.flatten() {
                let name = entry.file_name();
                let name_str = name.to_string_lossy();
                if (name_str.starts_with("shard.ix.run.")
                    || name_str.starts_with("shard.ix.merged."))
                    && let Err(e) = std::fs::remove_file(entry.path())
                {
                    tracing::warn!("Failed to cleanup shard file {}: {}", name_str, e);
                }
            }
        }

        let start = Instant::now();
        let root = self.root.clone();

        if root.to_string_lossy() == "/" {
            tracing::warn!(
                "LLMOSafe Advisory: Indexing root filesystem. Ensure adequate resource guards are in place."
            );
        }

        // Respect .gitignore plus a custom .ixignore; prune well-known
        // build/VCS directories and obviously non-text files up front.
        let walker = WalkBuilder::new(&root)
            .hidden(false)
            .git_ignore(true)
            .require_git(false)
            .add_custom_ignore_filename(".ixignore")
            .filter_entry(move |entry| {
                let path = entry.path();
                let name = path.file_name().and_then(|n| n.to_str()).unwrap_or("");

                // Directories that never contain indexable sources.
                if entry.file_type().is_some_and(|t| t.is_dir())
                    && (name == "lost+found"
                        || name == ".git"
                        || name == "node_modules"
                        || name == "target"
                        || name == "__pycache__"
                        || name == ".tox"
                        || name == ".venv"
                        || name == "venv"
                        || name == ".ix")
                {
                    return false;
                }

                if entry.file_type().is_some_and(|t| t.is_file()) {
                    // Size cap mirrors the 10 MiB check in process_file,
                    // avoiding the open/mmap for oversized files.
                    if let Ok(metadata) = entry.metadata()
                        && metadata.len() > 10 * 1024 * 1024
                    {
                        return false;
                    }
                    // Lock files and the index's own artifacts.
                    if name == "Cargo.lock"
                        || name == "package-lock.json"
                        || name == "pnpm-lock.yaml"
                        || name == "shard.ix"
                        || name == "shard.ix.tmp"
                        || name.starts_with("shard.ix.")
                    {
                        return false;
                    }
                }

                if entry.file_type().is_some_and(|t| t.is_file()) {
                    // Binary/media extensions: filtering here skips the
                    // read that is_binary() would otherwise have to do.
                    let ext = path.extension().and_then(|e| e.to_str()).unwrap_or("");
                    match ext {
                        "so" | "o" | "dylib" | "a" | "dll" | "exe" | "pyc" | "jpg" | "png"
                        | "gif" | "mp4" | "mp3" | "pdf" | "zip" | "7z" | "rar" | "sqlite"
                        | "db" | "bin" => return false,
                        _ => {}
                    }
                    // ".tar.gz" has extension "gz", so handle it by name.
                    if name.ends_with(".tar.gz") {
                        return false;
                    }
                }
                true
            })
            .build();

        let mut files_processed = 0u64;
        for entry_res in walker {
            let entry = match entry_res {
                Ok(e) => e,
                Err(e) => {
                    // NOTE(review): raw_os_error() == Some(-7) is treated
                    // as a "backtrack" signal here; standard errno values
                    // are positive, so this looks like a project-specific
                    // convention — confirm against the walker integration.
                    let backtrack_path = match &e {
                        ignore::Error::Io(io_err) if io_err.raw_os_error() == Some(-7) => {
                            Some(None)
                        }
                        ignore::Error::WithPath { path, err } => {
                            if let ignore::Error::Io(io_err) = err.as_ref() {
                                if io_err.raw_os_error() == Some(-7) {
                                    Some(Some(path.clone()))
                                } else {
                                    None
                                }
                            } else {
                                None
                            }
                        }
                        _ => None,
                    };

                    if let Some(path_opt) = backtrack_path {
                        tracing::warn!(
                            "Immune Memory Triggered: Skipping path due to backtrack signal."
                        );
                        if let Some(path) = path_opt {
                            self.dead_ends.push(path);
                        }
                    }
                    // Walker errors are non-fatal: skip and continue.
                    continue;
                }
            };

            if entry.file_type().is_some_and(|t| t.is_file()) {
                self.process_file(entry.path())?;
                files_processed += 1;

                // Check memory pressure every 250 files.
                #[allow(clippy::manual_is_multiple_of)]
                if files_processed % 250 == 0 {
                    if let Some(guard) = &self.resource_guard {
                        if let Err(_err) = guard.check() {
                            eprintln!(
                                "ixd: memory ceiling reached... flushing intermediate chunk ({files_processed} files processed)"
                            );
                            self.flush_run()?;
                        }
                    } else {
                        // Fallback when no guard is configured: spill once
                        // resident memory exceeds 512 MiB.
                        if let Ok(rss) = Self::current_rss_bytes()
                            && rss > 512 * 1024 * 1024
                        {
                            eprintln!(
                                "ixd: RSS ceiling reached ({} MB) after {} files — flushing intermediate chunk",
                                rss / 1024 / 1024,
                                files_processed
                            );
                            self.flush_run()?;
                        }
                    }
                }
            }
        }

        let output_path = self.serialize()?;
        tracing::info!("Build completed in {:?}: {:?}", start.elapsed(), self.stats);
        Ok(output_path)
    }
374
    /// Rebuilds the entire index. The `_changed_files` hint is currently
    /// ignored — incremental updates are not implemented yet, so this
    /// performs a full `build` pass.
    pub fn update(&mut self, _changed_files: &[PathBuf]) -> Result<PathBuf> {
        self.build()
    }
383
    /// Number of files indexed so far.
    #[must_use]
    pub const fn files_len(&self) -> usize {
        self.file_count as usize
    }
389
    /// Number of unique trigrams written to the shard; zero until a
    /// `build` has completed serialization.
    #[must_use]
    pub const fn trigrams_len(&self) -> usize {
        self.stats.unique_trigrams as usize
    }
395
396 fn current_rss_bytes() -> std::io::Result<u64> {
398 let status = std::fs::read_to_string("/proc/self/status")?;
399 for line in status.lines() {
400 if let Some(rest) = line.strip_prefix("VmRSS:") {
401 let kb: u64 = rest
402 .split_whitespace()
403 .next()
404 .and_then(|s| s.parse().ok())
405 .unwrap_or(0);
406 return Ok(kb * 1024);
407 }
408 }
409 Ok(0)
410 }
411
    /// Free space, in bytes, available to unprivileged users on the
    /// filesystem containing `path`, via `statvfs(3)`.
    fn free_bytes_at(path: &Path) -> std::io::Result<u64> {
        use std::os::unix::ffi::OsStrExt;
        // A path containing NUL bytes cannot be a valid C string.
        let path_c = std::ffi::CString::new(path.as_os_str().as_bytes())
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e))?;
        // SAFETY: `libc::statvfs` is plain-old-data, so an all-zero value
        // is a valid instance; the syscall below overwrites it on success.
        let mut stat: libc::statvfs = unsafe { std::mem::zeroed() };
        // SAFETY: `path_c` is a valid NUL-terminated string and `stat` is
        // a live, writable buffer for the duration of the call.
        let ret = unsafe { libc::statvfs(path_c.as_ptr(), &raw mut stat) };
        if ret != 0 {
            return Err(std::io::Error::last_os_error());
        }
        // f_bavail = blocks available to non-root; f_frsize = fragment
        // size. Their field types vary by platform, hence the casts.
        #[allow(clippy::unnecessary_cast)]
        Ok(stat.f_bavail as u64 * stat.f_frsize as u64)
    }
425
    /// Indexes a single file: mmaps (or decompresses) its content,
    /// extracts trigrams with byte offsets, records its bloom filter,
    /// path, and file-table row, and stages postings in memory.
    ///
    /// Returns `Ok(false)` when the file was skipped (vanished, no
    /// permission, too large, or binary) and `Ok(true)` when indexed.
    ///
    /// # Errors
    /// Propagates unexpected I/O errors; expected races (file deleted
    /// between walk and open) are treated as skips.
    fn process_file(&mut self, path: &Path) -> Result<bool> {
        let metadata = match fs::metadata(path) {
            Ok(m) => m,
            // The walker may race with deletions; skip quietly.
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(false),
            Err(e) => return Err(e.into()),
        };
        let size = metadata.len();
        // mtime in nanoseconds since the epoch; 0 when unavailable.
        let mtime = metadata
            .modified()?
            .duration_since(UNIX_EPOCH)
            .map_or(0, |d| u64::try_from(d.as_nanos()).unwrap_or(0));

        // Hard cap, mirroring the walker's 10 MiB metadata filter.
        if size > 10 * 1024 * 1024 {
            self.stats.files_skipped_size += 1;
            return Ok(false);
        }

        let file = match File::open(path) {
            Ok(f) => f,
            Err(e)
                if e.kind() == std::io::ErrorKind::NotFound
                    || e.kind() == std::io::ErrorKind::PermissionDenied =>
            {
                return Ok(false);
            }
            Err(e) => return Err(e.into()),
        };
        // NOTE(review): per memmap2's contract, concurrent truncation of a
        // mapped file is UB; the mapping is short-lived here, but confirm
        // this risk is acceptable for the indexer.
        let mmap = unsafe { Mmap::map(&file)? };

        // Optionally decompress; output is capped at 10 MiB to bound
        // decompression-bomb expansion.
        let raw_data = if self.decompress {
            if let Some(mut reader) = maybe_decompress(path, &mmap)? {
                let mut buf = Vec::new();
                reader
                    .by_ref()
                    .take(10 * 1024 * 1024)
                    .read_to_end(&mut buf)?;
                std::borrow::Cow::Owned(buf)
            } else {
                std::borrow::Cow::Borrowed(&mmap[..])
            }
        } else {
            std::borrow::Cow::Borrowed(&mmap[..])
        };

        let data = &raw_data[..];
        if is_binary(data) {
            self.stats.files_skipped_binary += 1;
            return Ok(false);
        }

        let content_hash = xxhash_rust::xxh64::xxh64(data, 0);
        // (trigram, byte offset) pairs. The grouping loop below relies on
        // equal trigrams being adjacent — presumably the extractor returns
        // them sorted/grouped; confirm in `Extractor::extract_with_offsets`.
        let pairs = self.extractor.extract_with_offsets(data);

        let file_id = self.file_count;
        self.file_count += 1;

        // Append the path to the string pool: u16 tag/pad, u16 length,
        // then the raw bytes. Paths longer than u16::MAX bytes record a
        // length of 0 via unwrap_or — NOTE(review): silent truncation;
        // confirm readers tolerate a zero length.
        let path_str = path.to_string_lossy();
        let path_bytes = path_str.as_bytes();
        let path_off = u32::try_from(self.strings_writer.stream_position()?).unwrap_or(0);
        let path_len = u16::try_from(path_bytes.len()).unwrap_or(0);

        self.strings_writer.write_all(&0u16.to_le_bytes())?;
        self.strings_writer.write_all(&path_len.to_le_bytes())?;
        self.strings_writer.write_all(path_bytes)?;

        let mut bloom = BloomFilter::new(256, 5);
        let mut trigram_count = 0u32;

        // Walk runs of equal trigrams (caps: 20k distinct trigrams per
        // file, 10k offsets per trigram) and stage their postings.
        let mut i = 0;
        while i < pairs.len() && trigram_count < 20_000 {
            let tri = pairs[i].0;
            let mut j = i + 1;
            while j < pairs.len() && pairs[j].0 == tri {
                j += 1;
            }

            let take_count = (j - i).min(10_000);
            let offsets: Vec<u32> = pairs[i..i + take_count].iter().map(|p| p.1).collect();

            bloom.insert(tri);
            self.postings
                .entry(tri)
                .or_default()
                .push(PostingEntry { file_id, offsets });
            // Rough memory estimate: offsets kept plus fixed overhead.
            self.postings_count += take_count + 8;

            trigram_count += 1;
            i = j;
        }

        bloom.serialize(&mut self.blooms_writer)?;

        // Fixed-width file-table row. The bloom offset is derived as
        // file_id * 260 — assumes each serialized bloom is exactly 260
        // bytes; confirm against `BloomFilter::serialize`.
        let bloom_offset = file_id * 260;
        self.files_writer.write_all(&file_id.to_le_bytes())?;
        self.files_writer.write_all(&path_off.to_le_bytes())?;
        self.files_writer.write_all(&path_len.to_le_bytes())?;
        self.files_writer.write_all(&[FileStatus::Fresh as u8])?;
        self.files_writer.write_all(&[0u8])?;
        self.files_writer.write_all(&mtime.to_le_bytes())?;
        self.files_writer.write_all(&size.to_le_bytes())?;
        self.files_writer.write_all(&content_hash.to_le_bytes())?;
        self.files_writer.write_all(&trigram_count.to_le_bytes())?;
        self.files_writer.write_all(&bloom_offset.to_le_bytes())?;
        self.files_writer.write_all(&[0u8; 4])?;

        self.stats.files_scanned += 1;
        self.stats.bytes_scanned += size;

        // Spill once the staged-postings estimate crosses the threshold.
        if self.postings_count >= 500_000 {
            self.flush_run()?;
        }

        Ok(true)
    }
543
    /// Merges all runs and temp side files into the final `shard.ix`.
    ///
    /// Sections are written 8-byte aligned in this order: placeholder
    /// header, file table, posting data (k-way merge of all runs),
    /// trigram table, bloom filters, string pool. The real header —
    /// including a CRC32C over its first 0xF8 bytes — is patched in at
    /// offset 0 once all section offsets/sizes are known, then the temp
    /// file is renamed over `shard.ix` (keeping a transient `.bak`).
    ///
    /// # Errors
    /// Fails early when less than 100 MB of disk is free, and propagates
    /// any I/O or merge error.
    fn serialize(&mut self) -> Result<PathBuf> {
        // Refuse to start the merge when the target filesystem is nearly
        // full; a partially written shard would be useless.
        if let Ok(free) = Self::free_bytes_at(&self.ix_dir) {
            const MIN_FREE: u64 = 100 * 1024 * 1024;
            if free < MIN_FREE {
                return Err(crate::error::Error::Io(std::io::Error::other(format!(
                    "insufficient disk space: {} MB free, need ≥100 MB (path: {})",
                    free / 1024 / 1024,
                    self.ix_dir.display()
                ))));
            }
        }
        // Spill whatever postings are still in memory as the last run.
        self.flush_run()?;

        self.files_writer.flush()?;
        self.blooms_writer.flush()?;
        self.strings_writer.flush()?;

        // Cap merge fan-in at 128: repeatedly merge generations of runs
        // until few enough remain for the final single pass.
        while self.temp_runs.len() > 128 {
            let mut next_generation = Vec::new();
            for chunk in self.temp_runs.chunks(128) {
                let out_path = self.ix_dir.join(format!(
                    "shard.ix.merged.{}.{}",
                    next_generation.len(),
                    SystemTime::now()
                        .duration_since(UNIX_EPOCH)
                        .map_err(|e| Error::Config(format!("time went backwards: {e}")))?
                        .as_micros()
                ));
                Self::merge_to_run(chunk, &out_path)?;
                next_generation.push(out_path);
                for p in chunk {
                    if let Err(e) = fs::remove_file(p) {
                        tracing::warn!("Failed to cleanup temp run {}: {}", p.display(), e);
                    }
                }
            }
            self.temp_runs = next_generation;
        }

        let tmp_path = self.ix_dir.join("shard.ix.tmp");
        let final_path = self.ix_dir.join("shard.ix");
        let temp_trigrams_path = self.ix_dir.join("shard.ix.tmp.trigrams");

        let mut f = BufWriter::new(File::create(&tmp_path)?);
        // Placeholder header; overwritten once section offsets are known.
        f.write_all(&[0u8; HEADER_SIZE])?;

        // --- File table section ---
        let file_table_offset = Self::align_to_8(&mut f)?;
        let mut files_reader = File::open(self.ix_dir.join("shard.ix.tmp.files"))?;
        std::io::copy(&mut files_reader, &mut f)?;
        let file_table_size = f.stream_position()? - file_table_offset;

        // --- Posting data section (k-way merge of all runs) ---
        Self::align_to_8(&mut f)?;
        let posting_data_offset = f.stream_position()?;

        let mut trigram_table_writer = BufWriter::new(File::create(&temp_trigrams_path)?);
        let mut global_trigram_count = 0u32;

        let mut runs = Vec::new();
        for path in &self.temp_runs {
            runs.push(RunIterator::new(path)?);
        }

        // Min-heap keyed on trigram (MergeItem's Ord is reversed);
        // current_items[i] holds run i's pending (trigram, entries).
        let mut heap = BinaryHeap::new();
        let mut current_items = vec![None; runs.len()];

        for (i, run) in runs.iter_mut().enumerate() {
            if let Some(item) = run.next_trigram()? {
                heap.push(MergeItem {
                    tri: item.0,
                    run_idx: i,
                });
                current_items[i] = Some(item);
            }
        }

        let mut current_tri: Option<Trigram> = None;
        let mut merged_entries: Vec<PostingEntry> = Vec::new();

        while let Some(MergeItem { tri, run_idx }) = heap.pop() {
            // A new trigram means the previous one's entries are complete:
            // write them out before starting the next accumulation.
            if Some(tri) != current_tri {
                if let Some(t) = current_tri {
                    Self::write_merged_posting(
                        &mut f,
                        &mut trigram_table_writer,
                        t,
                        posting_data_offset,
                        &mut merged_entries,
                    )?;
                    global_trigram_count += 1;
                    merged_entries.clear();
                }
                current_tri = Some(tri);
            }

            let item = current_items
                .get_mut(run_idx)
                .ok_or(Error::Config("run_idx out of bounds".into()))?
                .take()
                .ok_or(Error::Config("current item is None".into()))?;
            merged_entries.extend(item.1);

            // Advance the run that just yielded and re-seed the heap.
            if let Some(next_item) = runs
                .get_mut(run_idx)
                .ok_or(Error::Config("run_idx out of bounds".into()))?
                .next_trigram()?
            {
                heap.push(MergeItem {
                    tri: next_item.0,
                    run_idx,
                });
                *current_items
                    .get_mut(run_idx)
                    .ok_or(Error::Config("run_idx out of bounds".into()))? =
                    Some(next_item);
            }
        }

        // Flush the final trigram's accumulated entries.
        if let Some(t) = current_tri {
            Self::write_merged_posting(
                &mut f,
                &mut trigram_table_writer,
                t,
                posting_data_offset,
                &mut merged_entries,
            )?;
            global_trigram_count += 1;
        }

        self.stats.unique_trigrams = u64::from(global_trigram_count);
        let posting_data_size = f.stream_position()? - posting_data_offset;

        // --- Trigram table section ---
        Self::align_to_8(&mut f)?;
        let trigram_table_offset = f.stream_position()?;
        trigram_table_writer.flush()?;
        drop(trigram_table_writer);

        let mut trigram_table_file = File::open(&temp_trigrams_path)?;
        std::io::copy(&mut trigram_table_file, &mut f)?;
        let trigram_table_size = f.stream_position()? - trigram_table_offset;

        // --- Bloom filter section ---
        Self::align_to_8(&mut f)?;
        let bloom_offset = f.stream_position()?;
        let mut blooms_reader = File::open(self.ix_dir.join("shard.ix.tmp.blooms"))?;
        std::io::copy(&mut blooms_reader, &mut f)?;
        let bloom_size = f.stream_position()? - bloom_offset;

        // --- String pool section ---
        Self::align_to_8(&mut f)?;
        let string_pool_offset = f.stream_position()?;
        let mut strings_reader = File::open(self.ix_dir.join("shard.ix.tmp.strings"))?;
        std::io::copy(&mut strings_reader, &mut f)?;
        let string_pool_size = f.stream_position()? - string_pool_offset;

        // Name index is reserved in the header but not yet emitted.
        let name_index_offset = f.stream_position()?;
        let name_index_size = 0u64;

        // --- Header, at the fixed offsets of the shard format ---
        let created_at = u64::try_from(
            SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .map_err(|e| Error::Config(format!("time went backwards: {e}")))?
                .as_micros(),
        )
        .unwrap_or(0);
        let mut header_bytes = [0u8; HEADER_SIZE];
        header_bytes[0..4].copy_from_slice(&MAGIC);
        header_bytes[0x04..0x06].copy_from_slice(&VERSION_MAJOR.to_le_bytes());
        header_bytes[0x06..0x08].copy_from_slice(&VERSION_MINOR.to_le_bytes());
        header_bytes[0x08..0x10].copy_from_slice(
            &(flags::HAS_BLOOM_FILTERS
                | flags::HAS_CONTENT_HASHES
                | flags::POSTING_LISTS_CHECKSUMMED)
                .to_le_bytes(),
        );
        header_bytes[0x10..0x18].copy_from_slice(&created_at.to_le_bytes());
        header_bytes[0x18..0x20].copy_from_slice(&self.stats.bytes_scanned.to_le_bytes());
        header_bytes[0x20..0x24].copy_from_slice(&self.file_count.to_le_bytes());
        header_bytes[0x24..0x28].copy_from_slice(&(global_trigram_count).to_le_bytes());
        header_bytes[0x28..0x30].copy_from_slice(&file_table_offset.to_le_bytes());
        header_bytes[0x30..0x38].copy_from_slice(&file_table_size.to_le_bytes());
        header_bytes[0x38..0x40].copy_from_slice(&trigram_table_offset.to_le_bytes());
        header_bytes[0x40..0x48].copy_from_slice(&trigram_table_size.to_le_bytes());
        header_bytes[0x48..0x50].copy_from_slice(&posting_data_offset.to_le_bytes());
        header_bytes[0x50..0x58].copy_from_slice(&posting_data_size.to_le_bytes());
        header_bytes[0x58..0x60].copy_from_slice(&bloom_offset.to_le_bytes());
        header_bytes[0x60..0x68].copy_from_slice(&bloom_size.to_le_bytes());
        header_bytes[0x68..0x70].copy_from_slice(&string_pool_offset.to_le_bytes());
        header_bytes[0x70..0x78].copy_from_slice(&string_pool_size.to_le_bytes());
        header_bytes[0x78..0x80].copy_from_slice(&name_index_offset.to_le_bytes());
        header_bytes[0x80..0x88].copy_from_slice(&name_index_size.to_le_bytes());

        // CRC32C guards everything before the checksum field itself.
        let crc = crc32c::crc32c(&header_bytes[0..0xF8]);
        header_bytes[0xF8..0xFC].copy_from_slice(&crc.to_le_bytes());

        f.seek(SeekFrom::Start(0))?;
        f.write_all(&header_bytes)?;
        f.flush()?;
        drop(f);

        // Swap the new shard in via rename, keeping the previous index as
        // a transient .bak fallback in case the rename chain fails midway.
        let backup_path = self.ix_dir.join("shard.ix.bak");
        let old_index_exists = final_path.exists();
        if old_index_exists {
            let _ = fs::remove_file(&backup_path);
            if let Err(e) = fs::rename(&final_path, &backup_path) {
                tracing::warn!("Failed to backup existing index: {}", e);
            }
        }
        fs::rename(&tmp_path, &final_path)?;
        if old_index_exists {
            let _ = fs::remove_file(&backup_path);
        }

        // Best-effort cleanup of every intermediate file.
        let _ = fs::remove_file(self.ix_dir.join("shard.ix.tmp.files"));
        let _ = fs::remove_file(self.ix_dir.join("shard.ix.tmp.blooms"));
        let _ = fs::remove_file(self.ix_dir.join("shard.ix.tmp.strings"));
        let _ = fs::remove_file(&temp_trigrams_path);
        for path in &self.temp_runs {
            if let Err(e) = fs::remove_file(path) {
                tracing::warn!("Failed to cleanup temp run {}: {}", path.display(), e);
            }
        }
        self.temp_runs.clear();

        Ok(final_path)
    }
773
    /// K-way merges several run files into a single combined run at
    /// `out_path`, concatenating the entries of equal trigrams. Same
    /// heap-based algorithm as the final merge in `serialize`, but the
    /// output stays in the intermediate run format (`write_run_entry`).
    fn merge_to_run(run_paths: &[PathBuf], out_path: &Path) -> Result<()> {
        let mut runs = Vec::new();
        for path in run_paths {
            runs.push(RunIterator::new(path)?);
        }
        // Min-heap of each run's current trigram (MergeItem reverses Ord);
        // current_items[i] holds run i's pending (trigram, entries).
        let mut heap = BinaryHeap::new();
        let mut current_items = vec![None; runs.len()];
        for (i, run) in runs.iter_mut().enumerate() {
            if let Some(item) = run.next_trigram()? {
                heap.push(MergeItem {
                    tri: item.0,
                    run_idx: i,
                });
                current_items[i] = Some(item);
            }
        }
        let mut out = BufWriter::new(File::create(out_path)?);
        let mut current_tri: Option<Trigram> = None;
        let mut merged_entries: Vec<PostingEntry> = Vec::new();
        while let Some(MergeItem { tri, run_idx }) = heap.pop() {
            // Trigram changed: the previous group is complete, emit it.
            if Some(tri) != current_tri {
                if let Some(t) = current_tri {
                    Self::write_run_entry(&mut out, t, &mut merged_entries)?;
                    merged_entries.clear();
                }
                current_tri = Some(tri);
            }
            // Invariant: every popped MergeItem has a staged item for its run.
            let item = current_items[run_idx].take().unwrap_or_else(|| {
                unreachable!("run iterator invariant: item must exist at index {}", run_idx);
            });
            merged_entries.extend(item.1);
            // Advance the run that just yielded and re-seed the heap.
            if let Some(next_item) = runs[run_idx].next_trigram()? {
                heap.push(MergeItem {
                    tri: next_item.0,
                    run_idx,
                });
                current_items[run_idx] = Some(next_item);
            }
        }
        // Emit the final group.
        if let Some(t) = current_tri {
            Self::write_run_entry(&mut out, t, &mut merged_entries)?;
        }
        out.flush()?;
        Ok(())
    }
819
820 fn write_run_entry<W: Write>(
821 w: &mut W,
822 tri: Trigram,
823 entries: &mut [PostingEntry],
824 ) -> Result<()> {
825 entries.sort_by_key(|e| e.file_id);
826 w.write_all(&tri.to_le_bytes())?;
827 w.write_all(&(entries.len() as u32).to_le_bytes())?;
828 for entry in entries {
829 w.write_all(&entry.file_id.to_le_bytes())?;
830 w.write_all(&(entry.offsets.len() as u32).to_le_bytes())?;
831 for off in &entry.offsets {
832 w.write_all(&off.to_le_bytes())?;
833 }
834 }
835 Ok(())
836 }
837
838 fn write_merged_posting<W: Write + Seek>(
839 f: &mut W,
840 table: &mut W,
841 tri: Trigram,
842 base_off: u64,
843 entries: &mut [PostingEntry],
844 ) -> Result<()> {
845 entries.sort_by_key(|e| e.file_id);
846 let count = entries.len() as u32;
847 let list = PostingList {
848 entries: entries.to_vec(),
849 };
850 let encoded = list.encode();
851 let offset = f.stream_position()? - base_off;
852 f.write_all(&encoded)?;
853 let abs_off = base_off + offset;
854 table.write_all(&tri.to_le_bytes())?;
855 table.write_all(&abs_off.to_le_bytes()[..6])?;
856 table.write_all(&(encoded.len() as u32).to_le_bytes())?;
857 table.write_all(&count.to_le_bytes())?;
858 table.write_all(&[0u8; 2])?;
859 Ok(())
860 }
861
862 fn align_to_8<W: Write + Seek>(mut w: W) -> std::io::Result<u64> {
863 let pos = w.stream_position()?;
864 let padding = (8 - (pos % 8)) % 8;
865 if padding > 0 {
866 w.write_all(&vec![0u8; padding as usize])?;
867 }
868 w.stream_position()
869 }
870}