1use crate::bloom::BloomFilter;
8use crate::decompress::maybe_decompress;
9use crate::error::{Error, Result};
10use crate::format::{
11 FileStatus, HEADER_SIZE, MAGIC, VERSION_MAJOR, VERSION_MINOR, flags, is_binary,
12};
13use crate::posting::{PostingEntry, PostingList};
14use crate::trigram::{Extractor, Trigram};
15use crate::varint;
16use ignore::WalkBuilder;
17use libc;
18use llmosafe::ResourceGuard;
19use memmap2::Mmap;
20use std::collections::{BinaryHeap, HashMap};
21use std::fs::{self, File};
22use std::io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write};
23use std::path::{Path, PathBuf};
24use std::time::{Instant, SystemTime, UNIX_EPOCH};
25
/// Streaming index builder: walks a directory tree, extracts trigrams
/// from each file, and writes a single `shard.ix` shard under `root/.ix`.
///
/// Per-file records, bloom filters, and the path string pool are spilled
/// to `shard.ix.tmp.*` files as files are processed; postings are
/// buffered in memory and flushed to sorted "run" files, which are
/// k-way merged during serialization.
pub struct Builder {
    /// Root directory being indexed.
    root: PathBuf,
    /// `root/.ix` — where the shard and all temporary files live.
    ix_dir: PathBuf,
    /// Number of files indexed so far; also the next file id to assign.
    file_count: u32,

    /// Fixed-width per-file records (`shard.ix.tmp.files`).
    files_writer: BufWriter<File>,
    /// Serialized per-file bloom filters (`shard.ix.tmp.blooms`).
    blooms_writer: BufWriter<File>,
    /// Path string pool (`shard.ix.tmp.strings`).
    strings_writer: BufWriter<File>,

    /// In-memory postings buffer, spilled to run files by `flush_run`.
    postings: HashMap<Trigram, Vec<PostingEntry>>,
    /// Rough size estimate of `postings`, used to trigger a flush.
    postings_count: usize,
    /// Paths of sorted run files awaiting the final merge.
    temp_runs: Vec<PathBuf>,

    extractor: Extractor,
    stats: BuildStats,
    /// When true, attempt to decompress files before indexing them.
    decompress: bool,
    /// Optional external resource limiter polled during the walk.
    resource_guard: Option<ResourceGuard>,
    /// Paths skipped after an I/O "backtrack" signal during the walk.
    dead_ends: Vec<PathBuf>,
    /// Files larger than this many bytes are skipped; 0 disables the cap.
    max_file_size: u64,
    /// Set once the final shard is renamed into place; suppresses
    /// temp-file cleanup in `Drop`.
    committed: bool,
}
51
/// Counters accumulated over one build, logged when the build completes.
#[derive(Default, Debug)]
pub struct BuildStats {
    /// Files successfully indexed.
    pub files_scanned: u64,
    /// Files skipped because binary content was detected.
    pub files_skipped_binary: u64,
    /// Files skipped for exceeding the size cap.
    pub files_skipped_size: u64,
    /// Total bytes of indexed file content.
    pub bytes_scanned: u64,
    /// Distinct trigrams in the final merged index.
    pub unique_trigrams: u64,
}
66
/// Sequential reader over one sorted on-disk postings "run" produced by
/// `Builder::flush_run` / `Builder::merge_to_run`.
struct RunIterator {
    file: BufReader<File>,
}
70
71impl RunIterator {
72 fn new(path: &Path) -> Result<Self> {
73 let f = File::open(path)?;
74 Ok(Self {
75 file: BufReader::new(f),
76 })
77 }
78
79 fn next_trigram(&mut self) -> Result<Option<(Trigram, Vec<PostingEntry>)>> {
80 let mut tri_buf = [0u8; 4];
81 if let Err(e) = self.file.read_exact(&mut tri_buf) {
82 if e.kind() == std::io::ErrorKind::UnexpectedEof {
83 return Ok(None);
84 }
85 return Err(e.into());
86 }
87 let tri = u32::from_le_bytes(tri_buf);
88
89 let mut len_buf = [0u8; 4];
90 self.file.read_exact(&mut len_buf)?;
91 let entries_len = usize::try_from(u32::from_le_bytes(len_buf)).unwrap_or(0);
92
93 let mut entries = Vec::with_capacity(entries_len);
94 for _ in 0..entries_len {
95 self.file.read_exact(&mut len_buf)?;
96 let file_id = u32::from_le_bytes(len_buf);
97
98 self.file.read_exact(&mut len_buf)?;
99 let offsets_len = usize::try_from(u32::from_le_bytes(len_buf)).unwrap_or(0);
100
101 let mut offsets = Vec::with_capacity(offsets_len);
102 for _ in 0..offsets_len {
103 self.file.read_exact(&mut len_buf)?;
104 offsets.push(u32::from_le_bytes(len_buf));
105 }
106 entries.push(PostingEntry { file_id, offsets });
107 }
108
109 Ok(Some((tri, entries)))
110 }
111}
112
/// Heap entry for the k-way merge: the next trigram available from run
/// `run_idx`. Ordering is reversed (see the `Ord` impl) so the std
/// max-heap `BinaryHeap` yields the smallest trigram first.
#[derive(Eq, PartialEq)]
struct MergeItem {
    tri: Trigram,
    run_idx: usize,
}
118
impl PartialOrd for MergeItem {
    // Canonical delegation to `Ord` so the two orderings cannot diverge.
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
124
125impl Ord for MergeItem {
126 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
127 other.tri.cmp(&self.tri) }
129}
130
131#[allow(clippy::as_conversions)] #[allow(clippy::indexing_slicing)] impl Builder {
    /// Creates a builder rooted at `root`; the shard and all temporary
    /// files live under `root/.ix` (created if missing).
    ///
    /// # Errors
    /// Fails if the `.ix` directory or any temp spill file cannot be
    /// created, or if the string-pool header cannot be written.
    pub fn new(root: &Path) -> Result<Self> {
        let ix_dir = root.join(".ix");
        fs::create_dir_all(&ix_dir)?;

        let files_tmp = ix_dir.join("shard.ix.tmp.files");
        let blooms_tmp = ix_dir.join("shard.ix.tmp.blooms");
        let strings_tmp = ix_dir.join("shard.ix.tmp.strings");

        let files_writer = BufWriter::new(File::create(&files_tmp)?);
        let blooms_writer = BufWriter::new(File::create(&blooms_tmp)?);
        let mut strings_writer = BufWriter::new(File::create(&strings_tmp)?);

        // String-pool header: u32 version (1), two u16 fields, and two
        // padding bytes. NOTE(review): the u16s are zeroed here —
        // presumably reserved/placeholder fields; confirm against the
        // shard reader in the format module.
        strings_writer.write_all(&1u32.to_le_bytes())?;
        strings_writer.write_all(&0u16.to_le_bytes())?;
        strings_writer.write_all(&0u16.to_le_bytes())?;
        strings_writer.write_all(&[0u8; 2])?;

        Ok(Self {
            root: root.to_owned(),
            ix_dir,
            file_count: 0,
            files_writer,
            blooms_writer,
            strings_writer,
            postings: HashMap::new(),
            postings_count: 0,
            temp_runs: Vec::new(),
            extractor: Extractor::new(),
            stats: BuildStats::default(),
            decompress: false,
            resource_guard: None,
            dead_ends: Vec::new(),
            // Default cap: skip files larger than 100 MiB.
            max_file_size: 100 * 1024 * 1024,
            committed: false,
        })
    }
176
    /// Attaches a resource guard that is polled periodically during the
    /// walk; when it reports pressure, buffered postings are flushed.
    #[must_use]
    pub const fn with_resource_guard(mut self, guard: ResourceGuard) -> Self {
        self.resource_guard = Some(guard);
        self
    }

    /// Enables or disables transparent decompression of input files.
    pub const fn set_decompress(&mut self, decompress: bool) {
        self.decompress = decompress;
    }

    /// Sets the per-file size cap in bytes; `0` disables the cap.
    pub const fn set_max_file_size(&mut self, max_bytes: u64) {
        self.max_file_size = max_bytes;
    }
194
195 fn flush_run(&mut self) -> Result<()> {
196 if self.postings.is_empty() {
197 return Ok(());
198 }
199 let old_postings = std::mem::take(&mut self.postings);
200 let mut sorted: Vec<_> = old_postings.into_iter().collect();
201 sorted.sort_unstable_by_key(|(t, _)| *t);
202
203 let run_path = self
204 .ix_dir
205 .join(format!("shard.ix.run.{}", self.temp_runs.len()));
206 let mut f = BufWriter::new(File::create(&run_path)?);
207
208 for (tri, entries) in sorted {
209 f.write_all(&tri.to_le_bytes())?;
210 f.write_all(&u32::try_from(entries.len()).unwrap_or(0).to_le_bytes())?;
211 for entry in entries {
212 f.write_all(&entry.file_id.to_le_bytes())?;
213 f.write_all(
214 &u32::try_from(entry.offsets.len())
215 .unwrap_or(0)
216 .to_le_bytes(),
217 )?;
218 for off in entry.offsets {
219 f.write_all(&off.to_le_bytes())?;
220 }
221 }
222 }
223 f.flush()?;
224
225 self.temp_runs.push(run_path);
226 self.postings_count = 0;
227 Ok(())
228 }
229
    /// Walks the tree under `root`, indexes every eligible file, and
    /// writes the final shard, returning its path.
    ///
    /// Steps: pre-flight disk-space check, cleanup of stale run/merge
    /// temp files, gitignore-aware walk with extension-based binary
    /// filtering, periodic memory-pressure flushes, then `serialize`.
    ///
    /// # Errors
    /// Fails when free disk space is below the safety margin, or on any
    /// I/O error while indexing or serializing.
    pub fn build(&mut self) -> Result<PathBuf> {
        // Pre-flight: require ~3x the existing shard size (or 200 MB when
        // building fresh) of free space. Best-effort — skipped entirely
        // if statvfs fails.
        if let Ok(free) = Self::free_bytes_at(&self.ix_dir) {
            let existing_shard_size = fs::metadata(self.ix_dir.join("shard.ix"))
                .map_or(0, |m| m.len());
            let required = if existing_shard_size > 0 {
                existing_shard_size.saturating_mul(3)
            } else {
                200 * 1024 * 1024
            };
            if free < required {
                return Err(Error::Io(std::io::Error::other(format!(
                    "insufficient disk space: {} MB free, need ≥{} MB (path: {})",
                    free / 1024 / 1024,
                    required / 1024 / 1024,
                    self.ix_dir.display(),
                ))));
            }
        }

        // Remove run/merge files left over from a previous crashed build.
        if self.ix_dir.exists()
            && let Ok(entries) = std::fs::read_dir(&self.ix_dir)
        {
            for entry in entries.flatten() {
                let name = entry.file_name();
                let name_str = name.to_string_lossy();
                if (name_str.starts_with("shard.ix.run.")
                    || name_str.starts_with("shard.ix.merged."))
                    && let Err(e) = std::fs::remove_file(entry.path())
                {
                    tracing::warn!("Failed to cleanup shard file {}: {}", name_str, e);
                }
            }
        }

        let start = Instant::now();
        let root = self.root.clone();

        if root.to_string_lossy() == "/" {
            tracing::warn!(
                "LLMOSafe Advisory: Indexing root filesystem. Ensure adequate resource guards are in place."
            );
        }

        // gitignore-aware walker; `.ixignore` adds project-local excludes.
        let walker = WalkBuilder::new(&root)
            .hidden(false)
            .git_ignore(true)
            .git_global(true)
            .git_exclude(true)
            .require_git(false)
            .add_custom_ignore_filename(".ixignore")
            .filter_entry(move |entry| {
                let path = entry.path();
                let name = path.file_name().and_then(|n| n.to_str()).unwrap_or("");

                // Never descend into VCS metadata or our own index dir.
                if entry.file_type().is_some_and(|t| t.is_dir())
                    && (name == "lost+found" || name == ".git" || name == ".ix")
                {
                    return false;
                }

                // Skip our own shard/temp files wherever they appear.
                if entry.file_type().is_some_and(|t| t.is_file())
                    && (name == "shard.ix"
                        || name == "shard.ix.tmp"
                        || name.starts_with("shard.ix."))
                {
                    return false;
                }

                // Cheap extension filter for common binary formats;
                // content-based binary detection still runs later in
                // `process_file`.
                if entry.file_type().is_some_and(|t| t.is_file()) {
                    let ext = path.extension().and_then(|e| e.to_str()).unwrap_or("");
                    match ext {
                        "so" | "o" | "dylib" | "a" | "dll" | "exe" | "pyc" | "jpg" | "png"
                        | "gif" | "mp4" | "mp3" | "pdf" | "zip" | "7z" | "rar" | "sqlite"
                        | "db" | "bin" => return false,
                        _ => {}
                    }
                    if name.ends_with(".tar.gz") {
                        return false;
                    }
                }
                true
            })
            .build();

        let mut files_processed = 0u64;
        for entry_res in walker {
            let entry = match entry_res {
                Ok(e) => e,
                Err(e) => {
                    // Walk errors are non-fatal. A raw OS error of -7 is
                    // treated as a "backtrack" signal and the offending
                    // path (when known) is recorded in `dead_ends`.
                    // NOTE(review): -7 is not a standard errno value —
                    // confirm which platform/driver produces it.
                    let backtrack_path = match &e {
                        ignore::Error::Io(io_err) if io_err.raw_os_error() == Some(-7) => {
                            Some(None)
                        }
                        ignore::Error::WithPath { path, err } => {
                            if let ignore::Error::Io(io_err) = err.as_ref() {
                                if io_err.raw_os_error() == Some(-7) {
                                    Some(Some(path.clone()))
                                } else {
                                    None
                                }
                            } else {
                                None
                            }
                        }
                        _ => None,
                    };

                    if let Some(path_opt) = backtrack_path {
                        tracing::warn!(
                            "Immune Memory Triggered: Skipping path due to backtrack signal."
                        );
                        if let Some(path) = path_opt {
                            self.dead_ends.push(path);
                        }
                    }
                    continue;
                }
            };

            if entry.file_type().is_some_and(|t| t.is_file()) {
                self.process_file(entry.path())?;
                files_processed += 1;

                // Every 250 files, check memory pressure — via the
                // configured guard if present, otherwise via our own RSS
                // — and spill buffered postings when over the ceiling.
                #[allow(clippy::manual_is_multiple_of)]
                if files_processed % 250 == 0 {
                    if let Some(guard) = &self.resource_guard {
                        if let Err(_err) = guard.check() {
                            eprintln!(
                                "ixd: memory ceiling reached... flushing intermediate chunk ({files_processed} files processed)"
                            );
                            self.flush_run()?;
                        }
                    } else {
                        // Fallback ceiling: 512 MiB resident set size.
                        if let Ok(rss) = Self::current_rss_bytes()
                            && rss > 512 * 1024 * 1024
                        {
                            eprintln!(
                                "ixd: RSS ceiling reached ({} MB) after {} files — flushing intermediate chunk",
                                rss / 1024 / 1024,
                                files_processed
                            );
                            self.flush_run()?;
                        }
                    }
                }
            }
        }

        let output_path = self.serialize()?;
        tracing::info!("Build completed in {:?}: {:?}", start.elapsed(), self.stats);
        Ok(output_path)
    }
396
    /// Rebuilds the entire index. Incremental updates are not implemented
    /// yet: the changed-file list is ignored and a full `build` runs.
    ///
    /// # Errors
    /// Same failure modes as [`Self::build`].
    pub fn update(&mut self, _changed_files: &[PathBuf]) -> Result<PathBuf> {
        self.build()
    }
405
    /// Number of files indexed so far.
    #[must_use]
    pub const fn files_len(&self) -> usize {
        self.file_count as usize
    }

    /// Number of unique trigrams in the merged index. Only meaningful
    /// after a build has completed (it is set during `serialize`).
    #[must_use]
    pub const fn trigrams_len(&self) -> usize {
        self.stats.unique_trigrams as usize
    }
417
418 fn current_rss_bytes() -> std::io::Result<u64> {
420 let status = std::fs::read_to_string("/proc/self/status")?;
421 for line in status.lines() {
422 if let Some(rest) = line.strip_prefix("VmRSS:") {
423 let kb: u64 = rest
424 .split_whitespace()
425 .next()
426 .and_then(|s| s.parse().ok())
427 .unwrap_or(0);
428 return Ok(kb * 1024);
429 }
430 }
431 Ok(0)
432 }
433
    /// Returns the bytes available to unprivileged users on the
    /// filesystem containing `path`, via `statvfs(3)` (Unix-only).
    fn free_bytes_at(path: &Path) -> std::io::Result<u64> {
        use std::os::unix::ffi::OsStrExt;
        // A path containing interior NUL bytes cannot become a C string.
        let path_c = std::ffi::CString::new(path.as_os_str().as_bytes())
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e))?;
        // SAFETY: `libc::statvfs` is plain-old-data for which all-zero
        // bytes are a valid (if meaningless) value; it is fully
        // overwritten by a successful statvfs() call below.
        let mut stat: libc::statvfs = unsafe { std::mem::zeroed() };
        // SAFETY: `path_c` is a valid NUL-terminated C string and `stat`
        // is a live, writable statvfs struct for the call's duration.
        let ret = unsafe { libc::statvfs(path_c.as_ptr(), &raw mut stat) };
        if ret != 0 {
            return Err(std::io::Error::last_os_error());
        }
        // f_bavail = blocks available to unprivileged users; f_frsize =
        // fragment size in bytes. The field widths vary by platform,
        // hence the casts (allowed below).
        #[allow(clippy::unnecessary_cast)]
        Ok(stat.f_bavail as u64 * stat.f_frsize as u64)
    }
447
448 fn cleanup_temp_files(&self) {
449 let paths = [
450 self.ix_dir.join("shard.ix.tmp.files"),
451 self.ix_dir.join("shard.ix.tmp.blooms"),
452 self.ix_dir.join("shard.ix.tmp.strings"),
453 self.ix_dir.join("shard.ix.tmp"),
454 self.ix_dir.join("shard.ix.bak"),
455 ];
456 for p in &paths {
457 if let Err(e) = fs::remove_file(p)
458 && e.kind() != std::io::ErrorKind::NotFound
459 {
460 tracing::warn!("Failed to cleanup temp file {}: {}", p.display(), e);
461 }
462 }
463 for run_path in &self.temp_runs {
464 if let Err(e) = fs::remove_file(run_path)
465 && e.kind() != std::io::ErrorKind::NotFound
466 {
467 tracing::warn!(
468 "Failed to cleanup temp run {}: {}",
469 run_path.display(),
470 e
471 );
472 }
473 }
474 }
475
    /// Indexes a single file: extracts trigrams, appends the per-file
    /// record, bloom filter, and path-pool entry to the temp spill
    /// files, and buffers postings in memory.
    ///
    /// Returns `Ok(false)` when the file was skipped (vanished,
    /// permission denied, too large, or detected as binary) and
    /// `Ok(true)` when it was indexed.
    ///
    /// # Errors
    /// Propagates I/O errors other than not-found / permission-denied.
    fn process_file(&mut self, path: &Path) -> Result<bool> {
        let path_str = path.display().to_string();
        // Files can vanish between the walk and here; that is a skip,
        // not an error.
        let metadata = match fs::metadata(path) {
            Ok(m) => m,
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(false),
            Err(e) => return Err(e.into()),
        };
        let size = metadata.len();
        // mtime in nanoseconds since the epoch; 0 if unavailable,
        // pre-epoch, or out of u64 range.
        let mtime = metadata
            .modified()?
            .duration_since(UNIX_EPOCH)
            .map_or(0, |d| u64::try_from(d.as_nanos()).unwrap_or(0));

        if self.max_file_size > 0 && size > self.max_file_size {
            tracing::debug!("SKIP: file too large ({size} bytes): {path_str}");
            self.stats.files_skipped_size += 1;
            return Ok(false);
        }

        let file = match File::open(path) {
            Ok(f) => f,
            Err(e)
                if e.kind() == std::io::ErrorKind::NotFound
                    || e.kind() == std::io::ErrorKind::PermissionDenied =>
            {
                return Ok(false);
            }
            Err(e) => return Err(e.into()),
        };
        // SAFETY(review): mapping a file another process may truncate
        // concurrently can raise SIGBUS; the builder assumes the tree is
        // not mutated mid-index.
        let mmap = unsafe { Mmap::map(&file)? };

        // Optionally decompress. The decompressed stream is capped at
        // max_file_size (or 100 MiB when the cap is disabled) to bound
        // memory against decompression bombs.
        let raw_data = if self.decompress {
            if let Some(mut reader) = maybe_decompress(path, &mmap)? {
                let mut buf = Vec::new();
                let take_limit = if self.max_file_size > 0 {
                    self.max_file_size
                } else {
                    100 * 1024 * 1024
                };
                reader.by_ref().take(take_limit).read_to_end(&mut buf)?;
                std::borrow::Cow::Owned(buf)
            } else {
                std::borrow::Cow::Borrowed(&mmap[..])
            }
        } else {
            std::borrow::Cow::Borrowed(&mmap[..])
        };

        let data = &raw_data[..];
        if is_binary(data) {
            tracing::debug!("SKIP: binary detection ({size} bytes): {path_str}");
            self.stats.files_skipped_binary += 1;
            return Ok(false);
        }

        let content_hash = xxhash_rust::xxh64::xxh64(data, 0);
        let pairs = self.extractor.extract_with_offsets(data);

        if pairs.is_empty() && !data.is_empty() {
            tracing::debug!("SKIP: no trigrams extracted ({size} bytes): {path_str}");
        }

        // Assign the next file id. Note the file record is still written
        // even when no trigrams were extracted.
        let file_id = self.file_count;
        self.file_count += 1;

        // Append the path to the string pool. The leading u16 is written
        // as 0 — presumably a flags/reserved field; confirm against the
        // shard reader.
        let path_str = path.to_string_lossy();
        let path_bytes = path_str.as_bytes();
        let path_off = u32::try_from(self.strings_writer.stream_position()?).unwrap_or(0);
        let path_len = u16::try_from(path_bytes.len()).unwrap_or(0);

        self.strings_writer.write_all(&0u16.to_le_bytes())?;
        self.strings_writer.write_all(&path_len.to_le_bytes())?;
        self.strings_writer.write_all(path_bytes)?;

        let mut bloom = BloomFilter::new(256, 5);
        let mut trigram_count = 0u32;

        // `pairs` groups equal trigrams consecutively (assumed grouped /
        // sorted by trigram — TODO confirm the extractor's contract).
        // Collapse each group into one posting entry, capping offsets
        // per trigram per file at 10 000.
        let mut i = 0;
        while i < pairs.len() {
            let tri = pairs[i].0;
            let mut j = i + 1;
            while j < pairs.len() && pairs[j].0 == tri {
                j += 1;
            }

            let take_count = (j - i).min(10_000);
            let offsets: Vec<u32> = pairs[i..i + take_count].iter().map(|p| p.1).collect();

            bloom.insert(tri);
            self.postings
                .entry(tri)
                .or_default()
                .push(PostingEntry { file_id, offsets });
            // Rough memory estimate: offsets plus a fixed per-entry cost.
            self.postings_count += take_count + 8;

            trigram_count += 1;
            i = j;
        }

        bloom.serialize(&mut self.blooms_writer)?;

        // Bloom records are assumed to be fixed 260-byte entries
        // (presumably the 256-byte filter plus a small header — confirm
        // against BloomFilter::serialize), so offset = file_id * 260.
        let bloom_offset = u64::from(file_id)
            .checked_mul(260)
            .ok_or_else(|| Error::Config("file_id * 260 overflow".into()))?;
        let bloom_offset_u32 = u32::try_from(bloom_offset)
            .map_err(|_| Error::Config(format!("bloom_offset {bloom_offset} exceeds u32::MAX")))?;
        // Fixed-width per-file record: id, path offset/len, status byte,
        // pad, mtime, size, content hash, trigram count, bloom offset,
        // and 4 reserved bytes.
        self.files_writer.write_all(&file_id.to_le_bytes())?;
        self.files_writer.write_all(&path_off.to_le_bytes())?;
        self.files_writer.write_all(&path_len.to_le_bytes())?;
        self.files_writer.write_all(&[FileStatus::Fresh as u8])?;
        self.files_writer.write_all(&[0u8])?;
        self.files_writer.write_all(&mtime.to_le_bytes())?;
        self.files_writer.write_all(&size.to_le_bytes())?;
        self.files_writer.write_all(&content_hash.to_le_bytes())?;
        self.files_writer.write_all(&trigram_count.to_le_bytes())?;
        self.files_writer
            .write_all(&bloom_offset_u32.to_le_bytes())?;
        self.files_writer.write_all(&[0u8; 4])?;

        self.stats.files_scanned += 1;
        self.stats.bytes_scanned += size;

        if std::env::var("IX_DEBUG_BUILD").is_ok() {
            eprintln!(
                "IX-INDEXED: file_id={file_id} unique_trigrams={trigram_count} size={size}: {path_str}"
            );
        }

        // Spill to a sorted run once the in-memory estimate crosses the
        // threshold, bounding peak memory use.
        if self.postings_count >= 500_000 {
            if std::env::var("IX_DEBUG_BUILD").is_ok() {
                eprintln!(
                    "IX-FLUSH: postings_count={} after file_id={file_id}",
                    self.postings_count
                );
            }
            self.flush_run()?;
        }

        Ok(true)
    }
619
    /// Merges all runs and temp spill files into the final `shard.ix`.
    ///
    /// Section layout (each 8-byte aligned): header, file table, posting
    /// data, trigram (CDX) table, CDX block index, bloom filters, string
    /// pool. The shard is first written to `shard.ix.tmp` and then
    /// renamed into place; an existing shard is kept as `.bak` until the
    /// rename succeeds.
    ///
    /// # Errors
    /// Fails on I/O errors, malformed run files, or clock errors.
    fn serialize(&mut self) -> Result<PathBuf> {
        // Spill anything still buffered so the merge sees everything.
        self.flush_run()?;

        self.files_writer.flush()?;
        self.blooms_writer.flush()?;
        self.strings_writer.flush()?;

        // Cap merge fan-in at 128 open runs: repeatedly merge runs in
        // groups of 128 until few enough remain for the final pass.
        while self.temp_runs.len() > 128 {
            let mut next_generation = Vec::new();
            for chunk in self.temp_runs.chunks(128) {
                let out_path = self.ix_dir.join(format!(
                    "shard.ix.merged.{}.{}",
                    next_generation.len(),
                    SystemTime::now()
                        .duration_since(UNIX_EPOCH)
                        .map_err(|e| Error::Config(format!("time went backwards: {e}")))?
                        .as_micros()
                ));
                Self::merge_to_run(chunk, &out_path)?;
                next_generation.push(out_path);
                for p in chunk {
                    if let Err(e) = fs::remove_file(p) {
                        tracing::warn!("Failed to cleanup temp run {}: {}", p.display(), e);
                    }
                }
            }
            self.temp_runs = next_generation;
        }

        let tmp_path = self.ix_dir.join("shard.ix.tmp");
        let final_path = self.ix_dir.join("shard.ix");

        let mut f = BufWriter::new(File::create(&tmp_path)?);
        // Reserve space for the header; it is rewritten at the end once
        // every section offset/size is known.
        f.write_all(&[0u8; HEADER_SIZE])?;

        // --- File table: copy the fixed-width records spilled earlier.
        let file_table_offset = Self::align_to_8(&mut f)?;
        let mut files_reader = File::open(self.ix_dir.join("shard.ix.tmp.files"))?;
        std::io::copy(&mut files_reader, &mut f)?;
        let file_table_size = f.stream_position()? - file_table_offset;

        // --- Posting data: k-way merge of all runs via a min-heap
        // (MergeItem's reversed Ord) keyed on trigram.
        Self::align_to_8(&mut f)?;
        let posting_data_offset = f.stream_position()?;

        // (trigram, absolute offset, encoded length, doc frequency).
        let mut cdx_entries: Vec<(Trigram, u64, u32, u32)> = Vec::new();
        let mut global_trigram_count = 0u32;

        let mut runs = Vec::new();
        for path in &self.temp_runs {
            runs.push(RunIterator::new(path)?);
        }

        let mut heap = BinaryHeap::new();
        // current_items[i] holds run i's pending record while its key
        // sits in the heap.
        let mut current_items = vec![None; runs.len()];

        for (i, run) in runs.iter_mut().enumerate() {
            if let Some(item) = run.next_trigram()? {
                heap.push(MergeItem {
                    tri: item.0,
                    run_idx: i,
                });
                current_items[i] = Some(item);
            }
        }

        let mut current_tri: Option<Trigram> = None;
        let mut merged_entries: Vec<PostingEntry> = Vec::new();

        while let Some(MergeItem { tri, run_idx }) = heap.pop() {
            // Trigram changed: flush the accumulated posting list for
            // the previous trigram before starting the next.
            if Some(tri) != current_tri {
                if let Some(t) = current_tri {
                    let cdx_entry = Self::write_merged_posting(
                        &mut f,
                        t,
                        posting_data_offset,
                        &mut merged_entries,
                    )?;
                    cdx_entries.push(cdx_entry);
                    global_trigram_count += 1;
                    merged_entries.clear();
                }
                current_tri = Some(tri);
            }

            let item = current_items
                .get_mut(run_idx)
                .ok_or(Error::Config("run_idx out of bounds".into()))?
                .take()
                .ok_or(Error::Config("current item is None".into()))?;
            merged_entries.extend(item.1);

            // Advance the run that just yielded and re-seed the heap.
            if let Some(next_item) = runs
                .get_mut(run_idx)
                .ok_or(Error::Config("run_idx out of bounds".into()))?
                .next_trigram()?
            {
                heap.push(MergeItem {
                    tri: next_item.0,
                    run_idx,
                });
                *current_items
                    .get_mut(run_idx)
                    .ok_or(Error::Config("run_idx out of bounds".into()))? = Some(next_item);
            }
        }

        // Flush the final trigram's postings.
        if let Some(t) = current_tri {
            let cdx_entry =
                Self::write_merged_posting(&mut f, t, posting_data_offset, &mut merged_entries)?;
            cdx_entries.push(cdx_entry);
            global_trigram_count += 1;
        }

        self.stats.unique_trigrams = u64::from(global_trigram_count);
        let posting_data_size = f.stream_position()? - posting_data_offset;

        // --- Trigram (CDX) table.
        Self::align_to_8(&mut f)?;
        let trigram_table_offset = f.stream_position()?;

        let cdx_block_index = Self::write_cdx_blocks(&mut f, &cdx_entries)?;
        let trigram_table_size = f.stream_position()? - trigram_table_offset;

        // --- CDX block index: (first key, block offset) pairs followed
        // by a (u32::MAX, end-of-table-offset) sentinel.
        Self::align_to_8(&mut f)?;
        let cdx_block_index_offset = f.stream_position()?;
        for entry in &cdx_block_index {
            f.write_all(&entry.0.to_le_bytes())?;
            f.write_all(&entry.1.to_le_bytes())?;
        }
        f.write_all(&u32::MAX.to_le_bytes())?;
        f.write_all(&(trigram_table_offset + trigram_table_size).to_le_bytes())?;
        let cdx_block_index_size = f.stream_position()? - cdx_block_index_offset;

        // --- Bloom filters.
        Self::align_to_8(&mut f)?;
        let bloom_offset = f.stream_position()?;
        let mut blooms_reader = File::open(self.ix_dir.join("shard.ix.tmp.blooms"))?;
        std::io::copy(&mut blooms_reader, &mut f)?;
        let bloom_size = f.stream_position()? - bloom_offset;

        // --- String pool.
        Self::align_to_8(&mut f)?;
        let string_pool_offset = f.stream_position()?;
        let mut strings_reader = File::open(self.ix_dir.join("shard.ix.tmp.strings"))?;
        std::io::copy(&mut strings_reader, &mut f)?;
        let string_pool_size = f.stream_position()? - string_pool_offset;

        // Name index: not yet emitted (size 0). NOTE(review): unlike the
        // other sections this one is not 8-byte aligned — presumably
        // harmless while it stays empty; confirm when implementing it.
        let name_index_offset = f.stream_position()?;
        let name_index_size = 0u64;

        // --- Header: magic, version, feature flags, stats, and every
        // section's offset/size, then CRC32C over the first 0xF8 bytes.
        let created_at = u64::try_from(
            SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .map_err(|e| Error::Config(format!("time went backwards: {e}")))?
                .as_micros(),
        )
        .unwrap_or(0);
        let mut header_bytes = [0u8; HEADER_SIZE];
        header_bytes[0..4].copy_from_slice(&MAGIC);
        header_bytes[0x04..0x06].copy_from_slice(&VERSION_MAJOR.to_le_bytes());
        header_bytes[0x06..0x08].copy_from_slice(&VERSION_MINOR.to_le_bytes());
        header_bytes[0x08..0x10].copy_from_slice(
            &(flags::HAS_BLOOM_FILTERS
                | flags::HAS_CONTENT_HASHES
                | flags::POSTING_LISTS_CHECKSUMMED
                | flags::HAS_CDX_INDEX)
                .to_le_bytes(),
        );
        header_bytes[0x10..0x18].copy_from_slice(&created_at.to_le_bytes());
        header_bytes[0x18..0x20].copy_from_slice(&self.stats.bytes_scanned.to_le_bytes());
        header_bytes[0x20..0x24].copy_from_slice(&self.file_count.to_le_bytes());
        header_bytes[0x24..0x28].copy_from_slice(&(global_trigram_count).to_le_bytes());
        header_bytes[0x28..0x30].copy_from_slice(&file_table_offset.to_le_bytes());
        header_bytes[0x30..0x38].copy_from_slice(&file_table_size.to_le_bytes());
        header_bytes[0x38..0x40].copy_from_slice(&trigram_table_offset.to_le_bytes());
        header_bytes[0x40..0x48].copy_from_slice(&trigram_table_size.to_le_bytes());
        header_bytes[0x48..0x50].copy_from_slice(&posting_data_offset.to_le_bytes());
        header_bytes[0x50..0x58].copy_from_slice(&posting_data_size.to_le_bytes());
        header_bytes[0x58..0x60].copy_from_slice(&bloom_offset.to_le_bytes());
        header_bytes[0x60..0x68].copy_from_slice(&bloom_size.to_le_bytes());
        header_bytes[0x68..0x70].copy_from_slice(&string_pool_offset.to_le_bytes());
        header_bytes[0x70..0x78].copy_from_slice(&string_pool_size.to_le_bytes());
        header_bytes[0x78..0x80].copy_from_slice(&name_index_offset.to_le_bytes());
        header_bytes[0x80..0x88].copy_from_slice(&name_index_size.to_le_bytes());
        header_bytes[0x88..0x90].copy_from_slice(&cdx_block_index_offset.to_le_bytes());
        header_bytes[0x90..0x98].copy_from_slice(&cdx_block_index_size.to_le_bytes());

        let crc = crc32c::crc32c(&header_bytes[0..0xF8]);
        header_bytes[0xF8..0xFC].copy_from_slice(&crc.to_le_bytes());

        // Seek back and overwrite the reserved header region.
        f.seek(SeekFrom::Start(0))?;
        f.write_all(&header_bytes)?;
        f.flush()?;
        drop(f);

        // --- Commit: back up any existing shard, then rename the new
        // shard into place (atomic on the same filesystem).
        let backup_path = self.ix_dir.join("shard.ix.bak");
        let old_index_exists = final_path.exists();
        if old_index_exists {
            let _ = fs::remove_file(&backup_path);
            if let Err(e) = fs::rename(&final_path, &backup_path) {
                tracing::warn!("Failed to backup existing index: {}", e);
            }
        }
        fs::rename(&tmp_path, &final_path)?;
        self.committed = true;
        if old_index_exists {
            let _ = fs::remove_file(&backup_path);
        }

        // Best-effort cleanup of spill files and runs.
        let _ = fs::remove_file(self.ix_dir.join("shard.ix.tmp.files"));
        let _ = fs::remove_file(self.ix_dir.join("shard.ix.tmp.blooms"));
        let _ = fs::remove_file(self.ix_dir.join("shard.ix.tmp.strings"));
        for path in &self.temp_runs {
            if let Err(e) = fs::remove_file(path) {
                tracing::warn!("Failed to cleanup temp run {}: {}", path.display(), e);
            }
        }
        self.temp_runs.clear();

        Ok(final_path)
    }
844
    /// Merges several sorted run files into a single sorted run at
    /// `out_path`, concatenating the posting entries of equal trigrams.
    /// Same min-heap k-way merge as `serialize`, but the output stays in
    /// the intermediate run format (used to cap merge fan-in).
    fn merge_to_run(run_paths: &[PathBuf], out_path: &Path) -> Result<()> {
        let mut runs = Vec::new();
        for path in run_paths {
            runs.push(RunIterator::new(path)?);
        }
        let mut heap = BinaryHeap::new();
        // current_items[i] holds run i's pending record while its key
        // sits in the heap.
        let mut current_items = vec![None; runs.len()];
        for (i, run) in runs.iter_mut().enumerate() {
            if let Some(item) = run.next_trigram()? {
                heap.push(MergeItem {
                    tri: item.0,
                    run_idx: i,
                });
                current_items[i] = Some(item);
            }
        }
        let mut out = BufWriter::new(File::create(out_path)?);
        let mut current_tri: Option<Trigram> = None;
        let mut merged_entries: Vec<PostingEntry> = Vec::new();
        while let Some(MergeItem { tri, run_idx }) = heap.pop() {
            // Trigram changed: emit the previous trigram's merged record.
            if Some(tri) != current_tri {
                if let Some(t) = current_tri {
                    Self::write_run_entry(&mut out, t, &mut merged_entries)?;
                    merged_entries.clear();
                }
                current_tri = Some(tri);
            }
            let item = current_items[run_idx].take().ok_or_else(|| {
                Error::Config(format!(
                    "merge invariant broken: no item at run index {run_idx}"
                ))
            })?;
            merged_entries.extend(item.1);
            // Advance the run that just yielded and re-seed the heap.
            if let Some(next_item) = runs[run_idx].next_trigram()? {
                heap.push(MergeItem {
                    tri: next_item.0,
                    run_idx,
                });
                current_items[run_idx] = Some(next_item);
            }
        }
        // Emit the final trigram's record.
        if let Some(t) = current_tri {
            Self::write_run_entry(&mut out, t, &mut merged_entries)?;
        }
        out.flush()?;
        Ok(())
    }
892
893 fn write_run_entry<W: Write>(
894 w: &mut W,
895 tri: Trigram,
896 entries: &mut [PostingEntry],
897 ) -> Result<()> {
898 entries.sort_by_key(|e| e.file_id);
899 w.write_all(&tri.to_le_bytes())?;
900 w.write_all(&(entries.len() as u32).to_le_bytes())?;
901 for entry in entries {
902 w.write_all(&entry.file_id.to_le_bytes())?;
903 w.write_all(&(entry.offsets.len() as u32).to_le_bytes())?;
904 for off in &entry.offsets {
905 w.write_all(&off.to_le_bytes())?;
906 }
907 }
908 Ok(())
909 }
910
911 fn write_merged_posting<W: Write + Seek>(
912 f: &mut W,
913 tri: Trigram,
914 base_off: u64,
915 entries: &mut [PostingEntry],
916 ) -> Result<(Trigram, u64, u32, u32)> {
917 entries.sort_by_key(|e| e.file_id);
918 let count = entries.len() as u32;
919 let list = PostingList {
920 entries: entries.to_vec(),
921 };
922 let encoded = list.encode()?;
923 let offset = f.stream_position()? - base_off;
924 f.write_all(&encoded)?;
925 let abs_off = base_off + offset;
926 Ok((tri, abs_off, encoded.len() as u32, count))
927 }
928
    /// Writes the trigram table as zstd-compressed CDX blocks of up to
    /// `CDX_BLOCK_SIZE` entries each, returning the block index as
    /// `(first trigram in block, absolute block offset)` pairs.
    ///
    /// Block payload before compression: varint entry count, then per
    /// entry a delta-encoded trigram key, posting offset, posting
    /// length, and doc frequency (all varints).
    fn write_cdx_blocks<W: Write + Seek>(
        f: &mut W,
        cdx_entries: &[(Trigram, u64, u32, u32)],
    ) -> Result<Vec<(u32, u64)>> {
        let mut block_index = Vec::new();
        for chunk in cdx_entries.chunks(crate::format::CDX_BLOCK_SIZE) {
            // chunks() never yields an empty slice, so [0] cannot panic.
            let first_key = chunk[0].0;
            let block_offset = f.stream_position()?;
            block_index.push((first_key, block_offset));

            let mut buf = Vec::new();
            varint::encode(u64::try_from(chunk.len()).unwrap_or(0), &mut buf);
            let mut last_key = 0u32;
            for &(tri, posting_offset, posting_length, doc_frequency) in chunk {
                // Delta against the previous key; requires `cdx_entries`
                // sorted ascending by trigram (underflows otherwise).
                varint::encode(u64::from(tri - last_key), &mut buf);
                last_key = tri;
                varint::encode(posting_offset, &mut buf);
                varint::encode(u64::from(posting_length), &mut buf);
                varint::encode(u64::from(doc_frequency), &mut buf);
            }

            let compressed = zstd::encode_all(&buf[..], PostingList::ZSTD_COMPRESSION_LEVEL)
                .map_err(|e| Error::Config(format!("cdx zstd encode: {e}")))?;
            f.write_all(&compressed)?;
        }
        Ok(block_index)
    }
956
957 fn align_to_8<W: Write + Seek>(mut w: W) -> std::io::Result<u64> {
958 let pos = w.stream_position()?;
959 let padding = (8 - (pos % 8)) % 8;
960 if padding > 0 {
961 w.write_all(&vec![0u8; padding as usize])?;
962 }
963 w.stream_position()
964 }
965}
966
impl Drop for Builder {
    /// If the shard was never committed (build failed or was abandoned),
    /// remove all temp artifacts so a crashed build leaves no residue.
    fn drop(&mut self) {
        if !self.committed {
            self.cleanup_temp_files();
        }
    }
}