1use crate::bloom::BloomFilter;
8use crate::decompress::maybe_decompress;
9use crate::error::{Error, Result};
10use crate::format::{
11 FileStatus, HEADER_SIZE, MAGIC, VERSION_MAJOR, VERSION_MINOR, flags, is_binary,
12};
13use crate::posting::{PostingEntry, PostingList};
14use crate::trigram::{Extractor, Trigram};
15use crate::varint;
16use ignore::WalkBuilder;
17use libc;
18use llmosafe::ResourceGuard;
19use memmap2::Mmap;
20use std::collections::{BinaryHeap, HashMap};
21use std::fs::{self, File};
22use std::io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write};
23use std::path::{Path, PathBuf};
24use std::time::{Instant, SystemTime, UNIX_EPOCH};
25
/// Index builder: walks a directory tree, extracts trigrams from each text
/// file, and serializes the result into an on-disk `shard.ix` shard.
pub struct Builder {
    // Root directory being indexed.
    root: PathBuf,
    // Directory holding the shard and temporary build artifacts (`<root>/.ix`).
    ix_dir: PathBuf,
    // Number of files indexed so far; also the next file_id to assign.
    file_count: u32,

    // Writers for the temporary section files (file table, bloom filters,
    // string pool); `None` until `init_writers` runs.
    files_writer: Option<BufWriter<File>>,
    blooms_writer: Option<BufWriter<File>>,
    strings_writer: Option<BufWriter<File>>,

    // In-memory posting lists accumulated since the last run flush.
    postings: HashMap<Trigram, Vec<PostingEntry>>,
    // Rough size estimate of `postings`, used to decide when to flush a run.
    postings_count: usize,
    // Paths of sorted run files already flushed to disk.
    temp_runs: Vec<PathBuf>,

    extractor: Extractor,
    stats: BuildStats,
    // When true, index the decompressed contents of compressed files.
    decompress: bool,
    // Optional external limiter checked periodically during the walk.
    resource_guard: Option<ResourceGuard>,
    // Paths that produced a raw OS error -7 ("backtrack") during the walk.
    dead_ends: Vec<PathBuf>,
    // Files larger than this many bytes are skipped; 0 disables the check.
    max_file_size: u64,
    // Set once the new shard has been renamed into place; `Drop` only
    // cleans up temp files while this is still false.
    committed: bool,
}
52
/// Counters accumulated over one `build` pass.
#[derive(Default, Debug)]
pub struct BuildStats {
    /// Files successfully indexed.
    pub files_scanned: u64,
    /// Files skipped because binary detection triggered.
    pub files_skipped_binary: u64,
    /// Files skipped because they exceeded the size limit.
    pub files_skipped_size: u64,
    /// Total bytes of indexed file content.
    pub bytes_scanned: u64,
    /// Number of distinct trigrams written during serialization.
    pub unique_trigrams: u64,
}
67
/// Sequential reader over one sorted on-disk run produced by
/// `Builder::flush_run` (or `Builder::merge_to_run`).
struct RunIterator {
    file: BufReader<File>,
}
71
72impl RunIterator {
73 fn new(path: &Path) -> Result<Self> {
74 let f = File::open(path)?;
75 Ok(Self {
76 file: BufReader::new(f),
77 })
78 }
79
80 fn next_trigram(&mut self) -> Result<Option<(Trigram, Vec<PostingEntry>)>> {
81 let mut tri_buf = [0u8; 4];
82 if let Err(e) = self.file.read_exact(&mut tri_buf) {
83 if e.kind() == std::io::ErrorKind::UnexpectedEof {
84 return Ok(None);
85 }
86 return Err(e.into());
87 }
88 let tri = u32::from_le_bytes(tri_buf);
89
90 let mut len_buf = [0u8; 4];
91 self.file.read_exact(&mut len_buf)?;
92 let entries_len = usize::try_from(u32::from_le_bytes(len_buf)).unwrap_or(0);
93
94 let mut entries = Vec::with_capacity(entries_len);
95 for _ in 0..entries_len {
96 self.file.read_exact(&mut len_buf)?;
97 let file_id = u32::from_le_bytes(len_buf);
98
99 self.file.read_exact(&mut len_buf)?;
100 let offsets_len = usize::try_from(u32::from_le_bytes(len_buf)).unwrap_or(0);
101
102 let mut offsets = Vec::with_capacity(offsets_len);
103 for _ in 0..offsets_len {
104 self.file.read_exact(&mut len_buf)?;
105 offsets.push(u32::from_le_bytes(len_buf));
106 }
107 entries.push(PostingEntry { file_id, offsets });
108 }
109
110 Ok(Some((tri, entries)))
111 }
112}
113
/// Heap entry for the k-way merge of sorted runs: the trigram currently at
/// the head of run number `run_idx`.
#[derive(Eq, PartialEq)]
struct MergeItem {
    tri: Trigram,
    run_idx: usize,
}
119
impl PartialOrd for MergeItem {
    // Delegate to the total order defined by `Ord`.
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
125
126impl Ord for MergeItem {
127 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
128 other.tri.cmp(&self.tri) }
130}
131
132#[allow(clippy::as_conversions)] #[allow(clippy::indexing_slicing)] impl Builder {
    /// Creates a builder for the tree rooted at `root` and ensures the
    /// `.ix` output directory exists.
    ///
    /// Defaults: decompression off, no resource guard, 100 MB per-file cap.
    ///
    /// # Errors
    /// Fails if the `.ix` directory cannot be created.
    pub fn new(root: &Path) -> Result<Self> {
        let ix_dir = root.join(".ix");
        fs::create_dir_all(&ix_dir)?;

        Ok(Self {
            root: root.to_owned(),
            ix_dir,
            file_count: 0,
            files_writer: None,
            blooms_writer: None,
            strings_writer: None,
            postings: HashMap::new(),
            postings_count: 0,
            temp_runs: Vec::new(),
            extractor: Extractor::new(),
            stats: BuildStats::default(),
            decompress: false,
            resource_guard: None,
            dead_ends: Vec::new(),
            // 100 MB default per-file size cap.
            max_file_size: 100 * 1024 * 1024,
            committed: false,
        })
    }
164
    /// Attaches a resource guard that is consulted periodically during the
    /// walk; when its check fails, in-memory postings are flushed early.
    #[must_use]
    pub const fn with_resource_guard(mut self, guard: ResourceGuard) -> Self {
        self.resource_guard = Some(guard);
        self
    }
171
    /// Enables or disables indexing of decompressed file contents.
    pub const fn set_decompress(&mut self, decompress: bool) {
        self.decompress = decompress;
    }
177
    /// Sets the per-file size cap in bytes; larger files are skipped.
    /// A value of 0 disables the check.
    pub const fn set_max_file_size(&mut self, max_bytes: u64) {
        self.max_file_size = max_bytes;
    }
182
183 fn init_writers(&mut self) -> Result<()> {
194 let files_tmp = self.ix_dir.join("shard.ix.tmp.files");
195 let blooms_tmp = self.ix_dir.join("shard.ix.tmp.blooms");
196 let strings_tmp = self.ix_dir.join("shard.ix.tmp.strings");
197
198 self.files_writer = Some(BufWriter::new(File::create(&files_tmp)?));
199 self.blooms_writer = Some(BufWriter::new(File::create(&blooms_tmp)?));
200 let mut strings_writer = BufWriter::new(File::create(&strings_tmp)?);
201
202 strings_writer.write_all(&1u32.to_le_bytes())?; strings_writer.write_all(&0u16.to_le_bytes())?; strings_writer.write_all(&0u16.to_le_bytes())?; strings_writer.write_all(&[0u8; 2])?; self.strings_writer = Some(strings_writer);
210 Ok(())
211 }
212
213 fn cleanup_old_temp_files(&mut self) {
220 self.files_writer = None;
222 self.blooms_writer = None;
223 self.strings_writer = None;
224
225 let patterns = [
226 "shard.ix.tmp.files",
227 "shard.ix.tmp.blooms",
228 "shard.ix.tmp.strings",
229 "shard.ix.tmp",
230 "shard.ix.bak",
231 ];
232
233 for pattern in &patterns {
234 let path = self.ix_dir.join(pattern);
235 if path.exists() {
236 let _ = fs::remove_file(&path);
237 }
238 }
239
240 if let Ok(entries) = fs::read_dir(&self.ix_dir) {
241 for entry in entries.flatten() {
242 let name = entry.file_name().to_string_lossy().into_owned();
243 if name.starts_with("shard.ix.run.") || name.starts_with("shard.ix.merged.") {
244 let _ = fs::remove_file(entry.path());
245 }
246 }
247 }
248 }
249 fn get_writer<'a, T>(writer: &'a mut Option<T>, context: &'static str) -> Result<&'a mut T> {
253 writer.as_mut().ok_or_else(|| {
254 Error::Io(std::io::Error::other(format!(
255 "Builder invariant violated: {context} not initialized (clean-before-build contract)"
256 )))
257 })
258 }
259
260 fn flush_run(&mut self) -> Result<()> {
261 if self.postings.is_empty() {
262 return Ok(());
263 }
264 let old_postings = std::mem::take(&mut self.postings);
265 let mut sorted: Vec<_> = old_postings.into_iter().collect();
266 sorted.sort_unstable_by_key(|(t, _)| *t);
267
268 let run_path = self
269 .ix_dir
270 .join(format!("shard.ix.run.{}", self.temp_runs.len()));
271 let mut f = BufWriter::new(File::create(&run_path)?);
272
273 for (tri, entries) in sorted {
274 f.write_all(&tri.to_le_bytes())?;
275 f.write_all(&u32::try_from(entries.len()).unwrap_or(0).to_le_bytes())?;
276 for entry in entries {
277 f.write_all(&entry.file_id.to_le_bytes())?;
278 f.write_all(
279 &u32::try_from(entry.offsets.len())
280 .unwrap_or(0)
281 .to_le_bytes(),
282 )?;
283 for off in entry.offsets {
284 f.write_all(&off.to_le_bytes())?;
285 }
286 }
287 }
288 f.flush()?;
289
290 self.temp_runs.push(run_path);
291 self.postings_count = 0;
292 Ok(())
293 }
294
    /// Walks the tree under `root`, indexes every eligible file, and
    /// serializes the shard. Returns the path of the finished `shard.ix`.
    ///
    /// # Errors
    /// Fails when free disk space is insufficient, on unrecoverable I/O
    /// errors while indexing a file, or if serialization fails.
    pub fn build(&mut self) -> Result<PathBuf> {
        self.cleanup_old_temp_files();
        self.init_writers()?;

        // Reset all per-build state so the builder can be reused.
        self.postings.clear();
        self.postings_count = 0;
        self.temp_runs.clear();
        self.file_count = 0;
        self.stats = BuildStats::default();
        self.dead_ends.clear();
        self.committed = false;

        // Preflight disk-space check: require 3x the existing shard size
        // (old shard + temp files + new shard coexist briefly), or 200 MB
        // when building from scratch. Skipped if statvfs itself fails.
        if let Ok(free) = Self::free_bytes_at(&self.ix_dir) {
            let existing_shard_size =
                fs::metadata(self.ix_dir.join("shard.ix")).map_or(0, |m| m.len());
            let required = if existing_shard_size > 0 {
                existing_shard_size.saturating_mul(3)
            } else {
                200 * 1024 * 1024
            };
            if free < required {
                return Err(Error::Io(std::io::Error::other(format!(
                    "insufficient disk space: {} MB free, need ≥{} MB (path: {})",
                    free / 1024 / 1024,
                    required / 1024 / 1024,
                    self.ix_dir.display(),
                ))));
            }
        }

        // Remove stale run/merge files from prior passes; failures are
        // logged but non-fatal.
        if self.ix_dir.exists()
            && let Ok(entries) = std::fs::read_dir(&self.ix_dir)
        {
            for entry in entries.flatten() {
                let name = entry.file_name();
                let name_str = name.to_string_lossy();
                if (name_str.starts_with("shard.ix.run.")
                    || name_str.starts_with("shard.ix.merged."))
                    && let Err(e) = std::fs::remove_file(entry.path())
                {
                    tracing::warn!("Failed to cleanup shard file {}: {}", name_str, e);
                }
            }
        }

        let start = Instant::now();
        let root = self.root.clone();

        if root.to_string_lossy() == "/" {
            tracing::warn!(
                "LLMOSafe Advisory: Indexing root filesystem. Ensure adequate resource guards are in place."
            );
        }

        // Gitignore-aware walker; also honors a custom `.ixignore` file.
        // The filter prunes VCS/index directories, the shard's own
        // artifacts, and common binary file extensions.
        let walker = WalkBuilder::new(&root)
            .hidden(false)
            .git_ignore(true)
            .git_global(true)
            .git_exclude(true)
            .require_git(false)
            .add_custom_ignore_filename(".ixignore")
            .filter_entry(move |entry| {
                let path = entry.path();
                let name = path.file_name().and_then(|n| n.to_str()).unwrap_or("");

                if entry.file_type().is_some_and(|t| t.is_dir())
                    && (name == "lost+found" || name == ".git" || name == ".ix")
                {
                    return false;
                }

                // Never index the shard or any of its temp/backup files.
                if entry.file_type().is_some_and(|t| t.is_file())
                    && (name == "shard.ix"
                        || name == "shard.ix.tmp"
                        || name.starts_with("shard.ix."))
                {
                    return false;
                }

                if entry.file_type().is_some_and(|t| t.is_file()) {
                    let ext = path.extension().and_then(|e| e.to_str()).unwrap_or("");
                    match ext {
                        "so" | "o" | "dylib" | "a" | "dll" | "exe" | "pyc" | "jpg" | "png"
                        | "gif" | "mp4" | "mp3" | "pdf" | "zip" | "7z" | "rar" | "sqlite"
                        | "db" | "bin" => return false,
                        _ => {}
                    }
                    if name.ends_with(".tar.gz") {
                        return false;
                    }
                }
                true
            })
            .build();

        let mut files_processed = 0u64;
        for entry_res in walker {
            let entry = match entry_res {
                Ok(e) => e,
                Err(e) => {
                    // Raw OS error -7 is treated as a "backtrack" signal:
                    // the path (when known) is remembered in `dead_ends`
                    // and the walk continues. All other walker errors are
                    // skipped silently.
                    let backtrack_path = match &e {
                        ignore::Error::Io(io_err) if io_err.raw_os_error() == Some(-7) => {
                            Some(None)
                        }
                        ignore::Error::WithPath { path, err } => {
                            if let ignore::Error::Io(io_err) = err.as_ref() {
                                if io_err.raw_os_error() == Some(-7) {
                                    Some(Some(path.clone()))
                                } else {
                                    None
                                }
                            } else {
                                None
                            }
                        }
                        _ => None,
                    };

                    if let Some(path_opt) = backtrack_path {
                        tracing::warn!(
                            "Immune Memory Triggered: Skipping path due to backtrack signal."
                        );
                        if let Some(path) = path_opt {
                            self.dead_ends.push(path);
                        }
                    }
                    continue;
                }
            };

            if entry.file_type().is_some_and(|t| t.is_file()) {
                self.process_file(entry.path())?;
                files_processed += 1;

                // Every 250 files, check memory pressure and flush the
                // in-memory postings to a run when a ceiling is hit:
                // the resource guard when configured, otherwise a fixed
                // 512 MB RSS threshold read from /proc.
                #[allow(clippy::manual_is_multiple_of)]
                if files_processed % 250 == 0 {
                    if let Some(guard) = &self.resource_guard {
                        if let Err(_err) = guard.check() {
                            eprintln!(
                                "ixd: memory ceiling reached... flushing intermediate chunk ({files_processed} files processed)"
                            );
                            self.flush_run()?;
                        }
                    } else {
                        if let Ok(rss) = Self::current_rss_bytes()
                            && rss > 512 * 1024 * 1024
                        {
                            eprintln!(
                                "ixd: RSS ceiling reached ({} MB) after {} files — flushing intermediate chunk",
                                rss / 1024 / 1024,
                                files_processed
                            );
                            self.flush_run()?;
                        }
                    }
                }
            }
        }

        let output_path = self.serialize()?;
        tracing::info!("Build completed in {:?}: {:?}", start.elapsed(), self.stats);
        Ok(output_path)
    }
478
    /// Rebuilds the entire index. The changed-file list is currently
    /// ignored — incremental updates are not implemented, so this is a
    /// full `build`.
    pub fn update(&mut self, _changed_files: &[PathBuf]) -> Result<PathBuf> {
        self.build()
    }
487
    /// Number of files indexed so far.
    #[must_use]
    pub const fn files_len(&self) -> usize {
        self.file_count as usize
    }
493
    /// Number of distinct trigrams written during the last serialization
    /// (0 before a build completes).
    #[must_use]
    pub const fn trigrams_len(&self) -> usize {
        self.stats.unique_trigrams as usize
    }
499
500 fn current_rss_bytes() -> std::io::Result<u64> {
502 let status = std::fs::read_to_string("/proc/self/status")?;
503 for line in status.lines() {
504 if let Some(rest) = line.strip_prefix("VmRSS:") {
505 let kb: u64 = rest
506 .split_whitespace()
507 .next()
508 .and_then(|s| s.parse().ok())
509 .unwrap_or(0);
510 return Ok(kb * 1024);
511 }
512 }
513 Ok(0)
514 }
515
    /// Returns the number of bytes available to unprivileged users on the
    /// filesystem containing `path`, via `statvfs(3)`.
    fn free_bytes_at(path: &Path) -> std::io::Result<u64> {
        use std::os::unix::ffi::OsStrExt;
        // A path containing an interior NUL cannot be passed to libc.
        let path_c = std::ffi::CString::new(path.as_os_str().as_bytes())
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e))?;
        // SAFETY: libc::statvfs is plain-old-data; an all-zero value is a
        // valid (if meaningless) instance that statvfs will overwrite.
        let mut stat: libc::statvfs = unsafe { std::mem::zeroed() };
        // SAFETY: path_c is a valid NUL-terminated C string and `stat`
        // points to writable memory of the correct type for the call.
        let ret = unsafe { libc::statvfs(path_c.as_ptr(), &raw mut stat) };
        if ret != 0 {
            return Err(std::io::Error::last_os_error());
        }
        // f_bavail (blocks available to non-root) times the fragment size.
        #[allow(clippy::unnecessary_cast)]
        Ok(stat.f_bavail as u64 * stat.f_frsize as u64)
    }
529
530 fn cleanup_temp_files(&self) {
531 let paths = [
532 self.ix_dir.join("shard.ix.tmp.files"),
533 self.ix_dir.join("shard.ix.tmp.blooms"),
534 self.ix_dir.join("shard.ix.tmp.strings"),
535 self.ix_dir.join("shard.ix.tmp"),
536 self.ix_dir.join("shard.ix.bak"),
537 ];
538 for p in &paths {
539 if let Err(e) = fs::remove_file(p)
540 && e.kind() != std::io::ErrorKind::NotFound
541 {
542 tracing::warn!("Failed to cleanup temp file {}: {}", p.display(), e);
543 }
544 }
545 for run_path in &self.temp_runs {
546 if let Err(e) = fs::remove_file(run_path)
547 && e.kind() != std::io::ErrorKind::NotFound
548 {
549 tracing::warn!("Failed to cleanup temp run {}: {}", run_path.display(), e);
550 }
551 }
552 }
553
554 fn process_file(&mut self, path: &Path) -> Result<bool> {
555 let path_str = path.display().to_string();
556 let metadata = match fs::metadata(path) {
558 Ok(m) => m,
559 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(false),
560 Err(e) => return Err(e.into()),
561 };
562 let size = metadata.len();
563 let mtime = metadata
564 .modified()?
565 .duration_since(UNIX_EPOCH)
566 .map_or(0, |d| u64::try_from(d.as_nanos()).unwrap_or(0));
567
568 if self.max_file_size > 0 && size > self.max_file_size {
569 tracing::debug!("SKIP: file too large ({size} bytes): {path_str}");
570 self.stats.files_skipped_size += 1;
571 return Ok(false);
572 }
573
574 let file = match File::open(path) {
575 Ok(f) => f,
576 Err(e)
577 if e.kind() == std::io::ErrorKind::NotFound
578 || e.kind() == std::io::ErrorKind::PermissionDenied =>
579 {
580 return Ok(false);
581 }
582 Err(e) => return Err(e.into()),
583 };
584 let mmap = unsafe { Mmap::map(&file)? };
585
586 let raw_data = if self.decompress {
587 if let Some(mut reader) = maybe_decompress(path, &mmap)? {
588 let mut buf = Vec::new();
589 let take_limit = if self.max_file_size > 0 {
590 self.max_file_size
591 } else {
592 100 * 1024 * 1024
593 };
594 reader.by_ref().take(take_limit).read_to_end(&mut buf)?;
595 std::borrow::Cow::Owned(buf)
596 } else {
597 std::borrow::Cow::Borrowed(&mmap[..])
598 }
599 } else {
600 std::borrow::Cow::Borrowed(&mmap[..])
601 };
602
603 let data = &raw_data[..];
604 if is_binary(data) {
605 tracing::debug!("SKIP: binary detection ({size} bytes): {path_str}");
606 self.stats.files_skipped_binary += 1;
607 return Ok(false);
608 }
609
610 let content_hash = xxhash_rust::xxh64::xxh64(data, 0);
611 let pairs = self.extractor.extract_with_offsets(data);
612
613 if pairs.is_empty() && !data.is_empty() {
614 tracing::debug!("SKIP: no trigrams extracted ({size} bytes): {path_str}");
615 }
616
617 let file_id = self.file_count;
618 self.file_count += 1;
619
620 let path_str = path.to_string_lossy();
621 let path_bytes = path_str.as_bytes();
622 let path_off = u32::try_from(
623 Self::get_writer(&mut self.strings_writer, "strings_writer")?.stream_position()?,
624 )
625 .unwrap_or(0);
626 let path_len = u16::try_from(path_bytes.len()).unwrap_or(0);
627
628 self.strings_writer
629 .as_mut()
630 .ok_or_else(|| Error::Io(std::io::Error::other("strings_writer not initialized")))?
631 .write_all(&0u16.to_le_bytes())?;
632 self.strings_writer
633 .as_mut()
634 .ok_or_else(|| Error::Io(std::io::Error::other("strings_writer not initialized")))?
635 .write_all(&path_len.to_le_bytes())?;
636 self.strings_writer
637 .as_mut()
638 .ok_or_else(|| Error::Io(std::io::Error::other("strings_writer not initialized")))?
639 .write_all(path_bytes)?;
640
641 let mut bloom = BloomFilter::new(256, 5);
642 let mut trigram_count = 0u32;
643
644 let mut i = 0;
645 while i < pairs.len() {
646 let tri = pairs[i].0;
647 let mut j = i + 1;
648 while j < pairs.len() && pairs[j].0 == tri {
649 j += 1;
650 }
651
652 let take_count = (j - i).min(10_000);
653 let offsets: Vec<u32> = pairs[i..i + take_count].iter().map(|p| p.1).collect();
654
655 bloom.insert(tri);
656 self.postings
657 .entry(tri)
658 .or_default()
659 .push(PostingEntry { file_id, offsets });
660 self.postings_count += take_count + 8;
661
662 trigram_count += 1;
663 i = j;
664 }
665
666 bloom.serialize(
667 self.blooms_writer
668 .as_mut()
669 .ok_or_else(|| Error::Io(std::io::Error::other("blooms_writer not initialized")))?,
670 )?;
671
672 let bloom_offset = u64::from(file_id)
673 .checked_mul(260)
674 .ok_or_else(|| Error::Config("file_id * 260 overflow".into()))?;
675 let bloom_offset_u32 = u32::try_from(bloom_offset)
676 .map_err(|_| Error::Config(format!("bloom_offset {bloom_offset} exceeds u32::MAX")))?;
677 self.files_writer
678 .as_mut()
679 .ok_or_else(|| Error::Io(std::io::Error::other("files_writer not initialized")))?
680 .write_all(&file_id.to_le_bytes())?;
681 self.files_writer
682 .as_mut()
683 .ok_or_else(|| Error::Io(std::io::Error::other("files_writer not initialized")))?
684 .write_all(&path_off.to_le_bytes())?;
685 self.files_writer
686 .as_mut()
687 .ok_or_else(|| Error::Io(std::io::Error::other("files_writer not initialized")))?
688 .write_all(&path_len.to_le_bytes())?;
689 self.files_writer
690 .as_mut()
691 .ok_or_else(|| Error::Io(std::io::Error::other("files_writer not initialized")))?
692 .write_all(&[FileStatus::Fresh as u8])?;
693 self.files_writer
694 .as_mut()
695 .ok_or_else(|| Error::Io(std::io::Error::other("files_writer not initialized")))?
696 .write_all(&[0u8])?;
697 self.files_writer
698 .as_mut()
699 .ok_or_else(|| Error::Io(std::io::Error::other("files_writer not initialized")))?
700 .write_all(&mtime.to_le_bytes())?;
701 self.files_writer
702 .as_mut()
703 .ok_or_else(|| Error::Io(std::io::Error::other("files_writer not initialized")))?
704 .write_all(&size.to_le_bytes())?;
705 self.files_writer
706 .as_mut()
707 .ok_or_else(|| Error::Io(std::io::Error::other("files_writer not initialized")))?
708 .write_all(&content_hash.to_le_bytes())?;
709 self.files_writer
710 .as_mut()
711 .ok_or_else(|| Error::Io(std::io::Error::other("files_writer not initialized")))?
712 .write_all(&trigram_count.to_le_bytes())?;
713 self.files_writer
714 .as_mut()
715 .ok_or_else(|| Error::Io(std::io::Error::other("files_writer not initialized")))?
716 .write_all(&bloom_offset_u32.to_le_bytes())?;
717 self.files_writer
718 .as_mut()
719 .ok_or_else(|| Error::Io(std::io::Error::other("files_writer not initialized")))?
720 .write_all(&[0u8; 4])?;
721
722 self.stats.files_scanned += 1;
723 self.stats.bytes_scanned += size;
724
725 if std::env::var("IX_DEBUG_BUILD").is_ok() {
726 eprintln!(
727 "IX-INDEXED: file_id={file_id} unique_trigrams={trigram_count} size={size}: {path_str}"
728 );
729 }
730
731 if self.postings_count >= 500_000 {
734 if std::env::var("IX_DEBUG_BUILD").is_ok() {
735 eprintln!(
736 "IX-FLUSH: postings_count={} after file_id={file_id}",
737 self.postings_count
738 );
739 }
740 self.flush_run()?;
741 }
742
743 Ok(true)
744 }
745
    /// Assembles the final shard from the temporary section files and run
    /// files, then atomically replaces `shard.ix`.
    ///
    /// Sections written, each 8-byte aligned: header, file table, posting
    /// data (k-way merged from the runs), CDX trigram directory, CDX block
    /// index, bloom filters, string pool.
    fn serialize(&mut self) -> Result<PathBuf> {
        // Persist any postings still held in memory as a final run.
        self.flush_run()?;

        self.files_writer
            .as_mut()
            .ok_or_else(|| Error::Io(std::io::Error::other("files_writer not initialized")))?
            .flush()?;
        self.blooms_writer
            .as_mut()
            .ok_or_else(|| Error::Io(std::io::Error::other("blooms_writer not initialized")))?
            .flush()?;
        self.strings_writer
            .as_mut()
            .ok_or_else(|| Error::Io(std::io::Error::other("strings_writer not initialized")))?
            .flush()?;

        // Cap the merge fan-in at 128: while more runs exist, merge them
        // generation by generation in chunks of 128, deleting consumed runs.
        while self.temp_runs.len() > 128 {
            let mut next_generation = Vec::new();
            for chunk in self.temp_runs.chunks(128) {
                let out_path = self.ix_dir.join(format!(
                    "shard.ix.merged.{}.{}",
                    next_generation.len(),
                    SystemTime::now()
                        .duration_since(UNIX_EPOCH)
                        .map_err(|e| Error::Config(format!("time went backwards: {e}")))?
                        .as_micros()
                ));
                Self::merge_to_run(chunk, &out_path)?;
                next_generation.push(out_path);
                for p in chunk {
                    if let Err(e) = fs::remove_file(p) {
                        tracing::warn!("Failed to cleanup temp run {}: {}", p.display(), e);
                    }
                }
            }
            self.temp_runs = next_generation;
        }

        let tmp_path = self.ix_dir.join("shard.ix.tmp");
        let final_path = self.ix_dir.join("shard.ix");

        let mut f = BufWriter::new(File::create(&tmp_path)?);
        // Reserve space for the header; it is rewritten at the end once all
        // section offsets and sizes are known.
        f.write_all(&[0u8; HEADER_SIZE])?;

        // File table: raw copy of the temporary section file.
        let file_table_offset = Self::align_to_8(&mut f)?;
        let mut files_reader = File::open(self.ix_dir.join("shard.ix.tmp.files"))?;
        std::io::copy(&mut files_reader, &mut f)?;
        let file_table_size = f.stream_position()? - file_table_offset;

        Self::align_to_8(&mut f)?;
        let posting_data_offset = f.stream_position()?;

        // (trigram, absolute offset, encoded length, doc frequency) per
        // posting list, collected for the CDX directory.
        let mut cdx_entries: Vec<(Trigram, u64, u32, u32)> = Vec::new();
        let mut global_trigram_count = 0u32;

        let mut runs = Vec::new();
        for path in &self.temp_runs {
            runs.push(RunIterator::new(path)?);
        }

        // Seed the k-way merge heap with each run's head record.
        let mut heap = BinaryHeap::new();
        let mut current_items = vec![None; runs.len()];

        for (i, run) in runs.iter_mut().enumerate() {
            if let Some(item) = run.next_trigram()? {
                heap.push(MergeItem {
                    tri: item.0,
                    run_idx: i,
                });
                current_items[i] = Some(item);
            }
        }

        let mut current_tri: Option<Trigram> = None;
        let mut merged_entries: Vec<PostingEntry> = Vec::new();

        // Pop trigrams in ascending order (MergeItem's Ord is reversed for
        // the max-heap); whenever the key changes, encode and write the
        // accumulated posting list for the previous key.
        while let Some(MergeItem { tri, run_idx }) = heap.pop() {
            if Some(tri) != current_tri {
                if let Some(t) = current_tri {
                    let cdx_entry = Self::write_merged_posting(
                        &mut f,
                        t,
                        posting_data_offset,
                        &mut merged_entries,
                    )?;
                    cdx_entries.push(cdx_entry);
                    global_trigram_count += 1;
                    merged_entries.clear();
                }
                current_tri = Some(tri);
            }

            let item = current_items
                .get_mut(run_idx)
                .ok_or(Error::Config("run_idx out of bounds".into()))?
                .take()
                .ok_or(Error::Config("current item is None".into()))?;
            merged_entries.extend(item.1);

            // Advance the run that supplied this record.
            if let Some(next_item) = runs
                .get_mut(run_idx)
                .ok_or(Error::Config("run_idx out of bounds".into()))?
                .next_trigram()?
            {
                heap.push(MergeItem {
                    tri: next_item.0,
                    run_idx,
                });
                *current_items
                    .get_mut(run_idx)
                    .ok_or(Error::Config("run_idx out of bounds".into()))? = Some(next_item);
            }
        }

        // Flush the posting list for the last trigram.
        if let Some(t) = current_tri {
            let cdx_entry =
                Self::write_merged_posting(&mut f, t, posting_data_offset, &mut merged_entries)?;
            cdx_entries.push(cdx_entry);
            global_trigram_count += 1;
        }

        self.stats.unique_trigrams = u64::from(global_trigram_count);
        let posting_data_size = f.stream_position()? - posting_data_offset;

        // CDX trigram directory (compressed blocks).
        Self::align_to_8(&mut f)?;
        let trigram_table_offset = f.stream_position()?;

        let cdx_block_index = Self::write_cdx_blocks(&mut f, &cdx_entries)?;
        let trigram_table_size = f.stream_position()? - trigram_table_offset;

        // Block index: (first trigram u32, block offset u64) pairs followed
        // by a u32::MAX sentinel and the end offset of the directory.
        Self::align_to_8(&mut f)?;
        let cdx_block_index_offset = f.stream_position()?;
        for entry in &cdx_block_index {
            f.write_all(&entry.0.to_le_bytes())?;
            f.write_all(&entry.1.to_le_bytes())?;
        }
        f.write_all(&u32::MAX.to_le_bytes())?;
        f.write_all(&(trigram_table_offset + trigram_table_size).to_le_bytes())?;
        let cdx_block_index_size = f.stream_position()? - cdx_block_index_offset;

        // Bloom filter section: raw copy.
        Self::align_to_8(&mut f)?;
        let bloom_offset = f.stream_position()?;
        let mut blooms_reader = File::open(self.ix_dir.join("shard.ix.tmp.blooms"))?;
        std::io::copy(&mut blooms_reader, &mut f)?;
        let bloom_size = f.stream_position()? - bloom_offset;

        // String pool: raw copy.
        Self::align_to_8(&mut f)?;
        let string_pool_offset = f.stream_position()?;
        let mut strings_reader = File::open(self.ix_dir.join("shard.ix.tmp.strings"))?;
        std::io::copy(&mut strings_reader, &mut f)?;
        let string_pool_size = f.stream_position()? - string_pool_offset;

        // No name index is produced by this builder; record it as empty.
        let name_index_offset = f.stream_position()?;
        let name_index_size = 0u64;

        let created_at = u64::try_from(
            SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .map_err(|e| Error::Config(format!("time went backwards: {e}")))?
                .as_micros(),
        )
        .unwrap_or(0);
        // Fill in the real header now that every section is placed.
        let mut header_bytes = [0u8; HEADER_SIZE];
        header_bytes[0..4].copy_from_slice(&MAGIC);
        header_bytes[0x04..0x06].copy_from_slice(&VERSION_MAJOR.to_le_bytes());
        header_bytes[0x06..0x08].copy_from_slice(&VERSION_MINOR.to_le_bytes());
        header_bytes[0x08..0x10].copy_from_slice(
            &(flags::HAS_BLOOM_FILTERS
                | flags::HAS_CONTENT_HASHES
                | flags::POSTING_LISTS_CHECKSUMMED
                | flags::HAS_CDX_INDEX)
                .to_le_bytes(),
        );
        header_bytes[0x10..0x18].copy_from_slice(&created_at.to_le_bytes());
        header_bytes[0x18..0x20].copy_from_slice(&self.stats.bytes_scanned.to_le_bytes());
        header_bytes[0x20..0x24].copy_from_slice(&self.file_count.to_le_bytes());
        header_bytes[0x24..0x28].copy_from_slice(&(global_trigram_count).to_le_bytes());
        header_bytes[0x28..0x30].copy_from_slice(&file_table_offset.to_le_bytes());
        header_bytes[0x30..0x38].copy_from_slice(&file_table_size.to_le_bytes());
        header_bytes[0x38..0x40].copy_from_slice(&trigram_table_offset.to_le_bytes());
        header_bytes[0x40..0x48].copy_from_slice(&trigram_table_size.to_le_bytes());
        header_bytes[0x48..0x50].copy_from_slice(&posting_data_offset.to_le_bytes());
        header_bytes[0x50..0x58].copy_from_slice(&posting_data_size.to_le_bytes());
        header_bytes[0x58..0x60].copy_from_slice(&bloom_offset.to_le_bytes());
        header_bytes[0x60..0x68].copy_from_slice(&bloom_size.to_le_bytes());
        header_bytes[0x68..0x70].copy_from_slice(&string_pool_offset.to_le_bytes());
        header_bytes[0x70..0x78].copy_from_slice(&string_pool_size.to_le_bytes());
        header_bytes[0x78..0x80].copy_from_slice(&name_index_offset.to_le_bytes());
        header_bytes[0x80..0x88].copy_from_slice(&name_index_size.to_le_bytes());
        header_bytes[0x88..0x90].copy_from_slice(&cdx_block_index_offset.to_le_bytes());
        header_bytes[0x90..0x98].copy_from_slice(&cdx_block_index_size.to_le_bytes());

        // Header checksum covers bytes 0..0xF8.
        let crc = crc32c::crc32c(&header_bytes[0..0xF8]);
        header_bytes[0xF8..0xFC].copy_from_slice(&crc.to_le_bytes());

        f.seek(SeekFrom::Start(0))?;
        f.write_all(&header_bytes)?;
        f.flush()?;
        drop(f);

        // Swap the new shard in: back up any existing shard, rename the
        // temp file into place, then discard the backup on success.
        let backup_path = self.ix_dir.join("shard.ix.bak");
        let old_index_exists = final_path.exists();
        if old_index_exists {
            let _ = fs::remove_file(&backup_path);
            if let Err(e) = fs::rename(&final_path, &backup_path) {
                tracing::warn!("Failed to backup existing index: {}", e);
            }
        }
        fs::rename(&tmp_path, &final_path)?;
        self.committed = true;
        if old_index_exists {
            let _ = fs::remove_file(&backup_path);
        }

        // Remove the temporary section files and consumed runs.
        let _ = fs::remove_file(self.ix_dir.join("shard.ix.tmp.files"));
        let _ = fs::remove_file(self.ix_dir.join("shard.ix.tmp.blooms"));
        let _ = fs::remove_file(self.ix_dir.join("shard.ix.tmp.strings"));
        for path in &self.temp_runs {
            if let Err(e) = fs::remove_file(path) {
                tracing::warn!("Failed to cleanup temp run {}: {}", path.display(), e);
            }
        }
        self.temp_runs.clear();

        Ok(final_path)
    }
979
    /// Merges several sorted run files into a single sorted run at
    /// `out_path` using a k-way heap merge. Entries for the same trigram
    /// are concatenated and re-sorted by file_id when written.
    fn merge_to_run(run_paths: &[PathBuf], out_path: &Path) -> Result<()> {
        let mut runs = Vec::new();
        for path in run_paths {
            runs.push(RunIterator::new(path)?);
        }
        // Seed the heap with the head record of every non-empty run.
        let mut heap = BinaryHeap::new();
        let mut current_items = vec![None; runs.len()];
        for (i, run) in runs.iter_mut().enumerate() {
            if let Some(item) = run.next_trigram()? {
                heap.push(MergeItem {
                    tri: item.0,
                    run_idx: i,
                });
                current_items[i] = Some(item);
            }
        }
        let mut out = BufWriter::new(File::create(out_path)?);
        let mut current_tri: Option<Trigram> = None;
        let mut merged_entries: Vec<PostingEntry> = Vec::new();
        // Pop trigrams in ascending order (MergeItem's Ord is reversed for
        // the max-heap); flush the accumulated group when the key changes.
        while let Some(MergeItem { tri, run_idx }) = heap.pop() {
            if Some(tri) != current_tri {
                if let Some(t) = current_tri {
                    Self::write_run_entry(&mut out, t, &mut merged_entries)?;
                    merged_entries.clear();
                }
                current_tri = Some(tri);
            }
            let item = current_items[run_idx].take().ok_or_else(|| {
                Error::Config(format!(
                    "merge invariant broken: no item at run index {run_idx}"
                ))
            })?;
            merged_entries.extend(item.1);
            // Advance the run that supplied this record.
            if let Some(next_item) = runs[run_idx].next_trigram()? {
                heap.push(MergeItem {
                    tri: next_item.0,
                    run_idx,
                });
                current_items[run_idx] = Some(next_item);
            }
        }
        // Flush the final group.
        if let Some(t) = current_tri {
            Self::write_run_entry(&mut out, t, &mut merged_entries)?;
        }
        out.flush()?;
        Ok(())
    }
1027
    /// Writes one `(trigram, entries)` record in the raw run format read by
    /// `RunIterator::next_trigram`: trigram, entry count, then per entry
    /// the file_id, offset count, and offsets — all little-endian u32.
    /// Entries are sorted by file_id before writing.
    fn write_run_entry<W: Write>(
        w: &mut W,
        tri: Trigram,
        entries: &mut [PostingEntry],
    ) -> Result<()> {
        entries.sort_by_key(|e| e.file_id);
        w.write_all(&tri.to_le_bytes())?;
        w.write_all(&(entries.len() as u32).to_le_bytes())?;
        for entry in entries {
            w.write_all(&entry.file_id.to_le_bytes())?;
            w.write_all(&(entry.offsets.len() as u32).to_le_bytes())?;
            for off in &entry.offsets {
                w.write_all(&off.to_le_bytes())?;
            }
        }
        Ok(())
    }
1045
1046 fn write_merged_posting<W: Write + Seek>(
1047 f: &mut W,
1048 tri: Trigram,
1049 base_off: u64,
1050 entries: &mut [PostingEntry],
1051 ) -> Result<(Trigram, u64, u32, u32)> {
1052 entries.sort_by_key(|e| e.file_id);
1053 let count = entries.len() as u32;
1054 let list = PostingList {
1055 entries: entries.to_vec(),
1056 };
1057 let encoded = list.encode()?;
1058 let offset = f.stream_position()? - base_off;
1059 f.write_all(&encoded)?;
1060 let abs_off = base_off + offset;
1061 Ok((tri, abs_off, encoded.len() as u32, count))
1062 }
1063
    /// Writes the trigram directory as zstd-compressed CDX blocks of up to
    /// `CDX_BLOCK_SIZE` entries each, returning `(first trigram, absolute
    /// block offset)` pairs for the top-level block index.
    ///
    /// Block payload (before compression): varint entry count, then per
    /// entry a delta-encoded trigram key followed by varint posting
    /// offset, posting length, and document frequency.
    fn write_cdx_blocks<W: Write + Seek>(
        f: &mut W,
        cdx_entries: &[(Trigram, u64, u32, u32)],
    ) -> Result<Vec<(u32, u64)>> {
        let mut block_index = Vec::new();
        for chunk in cdx_entries.chunks(crate::format::CDX_BLOCK_SIZE) {
            // `chunks` never yields an empty slice, so chunk[0] cannot panic.
            let first_key = chunk[0].0;
            let block_offset = f.stream_position()?;
            block_index.push((first_key, block_offset));

            let mut buf = Vec::new();
            varint::encode(u64::try_from(chunk.len()).unwrap_or(0), &mut buf);
            // Keys are ascending, so each key is stored as a delta from the
            // previous one.
            let mut last_key = 0u32;
            for &(tri, posting_offset, posting_length, doc_frequency) in chunk {
                varint::encode(u64::from(tri - last_key), &mut buf);
                last_key = tri;
                varint::encode(posting_offset, &mut buf);
                varint::encode(u64::from(posting_length), &mut buf);
                varint::encode(u64::from(doc_frequency), &mut buf);
            }

            let compressed = zstd::encode_all(&buf[..], PostingList::ZSTD_COMPRESSION_LEVEL)
                .map_err(|e| Error::Config(format!("cdx zstd encode: {e}")))?;
            f.write_all(&compressed)?;
        }
        Ok(block_index)
    }
1091
1092 fn align_to_8<W: Write + Seek>(mut w: W) -> std::io::Result<u64> {
1093 let pos = w.stream_position()?;
1094 let padding = (8 - (pos % 8)) % 8;
1095 if padding > 0 {
1096 w.write_all(&vec![0u8; padding as usize])?;
1097 }
1098 w.stream_position()
1099 }
1100}
1101
1102impl Drop for Builder {
1103 fn drop(&mut self) {
1104 if !self.committed {
1105 self.cleanup_temp_files();
1106 }
1107 }
1108}