1use crate::bloom::BloomFilter;
8use crate::decompress::maybe_decompress;
9use crate::error::{Error, Result};
10use crate::format::{
11 FileStatus, HEADER_SIZE, MAGIC, VERSION_MAJOR, VERSION_MINOR, flags, is_binary,
12};
13use crate::posting::{PostingEntry, PostingList};
14use crate::trigram::{Extractor, Trigram};
15use crate::varint;
16use ignore::WalkBuilder;
17use libc;
18use llmosafe::ResourceGuard;
19use memmap2::Mmap;
20use std::collections::{BinaryHeap, HashMap};
21use std::fs::{self, File};
22use std::io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write};
23use std::path::{Path, PathBuf};
24use std::time::{Instant, SystemTime, UNIX_EPOCH};
25
/// Streaming index builder: walks a directory tree, extracts trigrams from
/// each file, spills sorted posting "runs" to disk under memory pressure,
/// and finally merges everything into a single `shard.ix` shard.
pub struct Builder {
    /// Root directory being indexed.
    root: PathBuf,
    /// `<root>/.ix` — working/output directory for the shard and temp files.
    ix_dir: PathBuf,
    /// Number of files indexed so far; also the next file_id to assign.
    file_count: u32,

    /// Temp writer for the fixed-size per-file metadata records.
    files_writer: BufWriter<File>,
    /// Temp writer for serialized per-file bloom filters.
    blooms_writer: BufWriter<File>,
    /// Temp writer for the path string pool.
    strings_writer: BufWriter<File>,

    /// In-memory posting accumulator, spilled to run files when large.
    postings: HashMap<Trigram, Vec<PostingEntry>>,
    /// Rough size proxy for `postings`; triggers spills (see `process_file`).
    postings_count: usize,
    /// Paths of on-disk sorted runs awaiting the final merge.
    temp_runs: Vec<PathBuf>,

    /// Trigram extractor, reused across files.
    extractor: Extractor,
    /// Counters logged when the build completes.
    stats: BuildStats,
    /// When true, index the decompressed contents of compressed files.
    decompress: bool,
    /// Optional external memory ceiling, consulted periodically in `build`.
    resource_guard: Option<ResourceGuard>,
    /// Paths that raised a walker "backtrack" signal; recorded, not retried.
    dead_ends: Vec<PathBuf>,
    /// Per-file size cap in bytes; 0 disables the cap. Defaults to 100 MB.
    max_file_size: u64,
}
50
/// Counters accumulated during a build; logged via `tracing` on completion.
#[derive(Default, Debug)]
pub struct BuildStats {
    /// Files successfully indexed.
    pub files_scanned: u64,
    /// Files skipped by content-based binary detection.
    pub files_skipped_binary: u64,
    /// Files skipped for exceeding the per-file size cap.
    pub files_skipped_size: u64,
    /// Total bytes of indexed file content.
    pub bytes_scanned: u64,
    /// Unique trigrams written to the final shard (set during serialize).
    pub unique_trigrams: u64,
}
65
/// Sequential reader over one on-disk spill-run file
/// (record layout is documented at `Builder::flush_run`).
struct RunIterator {
    file: BufReader<File>,
}
69
70impl RunIterator {
71 fn new(path: &Path) -> Result<Self> {
72 let f = File::open(path)?;
73 Ok(Self {
74 file: BufReader::new(f),
75 })
76 }
77
78 fn next_trigram(&mut self) -> Result<Option<(Trigram, Vec<PostingEntry>)>> {
79 let mut tri_buf = [0u8; 4];
80 if let Err(e) = self.file.read_exact(&mut tri_buf) {
81 if e.kind() == std::io::ErrorKind::UnexpectedEof {
82 return Ok(None);
83 }
84 return Err(e.into());
85 }
86 let tri = u32::from_le_bytes(tri_buf);
87
88 let mut len_buf = [0u8; 4];
89 self.file.read_exact(&mut len_buf)?;
90 let entries_len = usize::try_from(u32::from_le_bytes(len_buf)).unwrap_or(0);
91
92 let mut entries = Vec::with_capacity(entries_len);
93 for _ in 0..entries_len {
94 self.file.read_exact(&mut len_buf)?;
95 let file_id = u32::from_le_bytes(len_buf);
96
97 self.file.read_exact(&mut len_buf)?;
98 let offsets_len = usize::try_from(u32::from_le_bytes(len_buf)).unwrap_or(0);
99
100 let mut offsets = Vec::with_capacity(offsets_len);
101 for _ in 0..offsets_len {
102 self.file.read_exact(&mut len_buf)?;
103 offsets.push(u32::from_le_bytes(len_buf));
104 }
105 entries.push(PostingEntry { file_id, offsets });
106 }
107
108 Ok(Some((tri, entries)))
109 }
110}
111
/// Heap entry for the k-way run merge: the trigram currently at the head
/// of run `run_idx`.
#[derive(Eq, PartialEq)]
struct MergeItem {
    tri: Trigram,
    run_idx: usize,
}
117
118impl PartialOrd for MergeItem {
119 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
120 Some(self.cmp(other))
121 }
122}
123
124impl Ord for MergeItem {
125 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
126 other.tri.cmp(&self.tri) }
128}
129
130#[allow(clippy::as_conversions)] #[allow(clippy::indexing_slicing)] impl Builder {
133 pub fn new(root: &Path) -> Result<Self> {
140 let ix_dir = root.join(".ix");
141 fs::create_dir_all(&ix_dir)?;
142
143 let files_tmp = ix_dir.join("shard.ix.tmp.files");
144 let blooms_tmp = ix_dir.join("shard.ix.tmp.blooms");
145 let strings_tmp = ix_dir.join("shard.ix.tmp.strings");
146
147 let files_writer = BufWriter::new(File::create(&files_tmp)?);
148 let blooms_writer = BufWriter::new(File::create(&blooms_tmp)?);
149 let mut strings_writer = BufWriter::new(File::create(&strings_tmp)?);
150
151 strings_writer.write_all(&1u32.to_le_bytes())?;
152 strings_writer.write_all(&0u16.to_le_bytes())?;
153 strings_writer.write_all(&0u16.to_le_bytes())?;
154 strings_writer.write_all(&[0u8; 2])?;
155
156 Ok(Self {
157 root: root.to_owned(),
158 ix_dir,
159 file_count: 0,
160 files_writer,
161 blooms_writer,
162 strings_writer,
163 postings: HashMap::new(),
164 postings_count: 0,
165 temp_runs: Vec::new(),
166 extractor: Extractor::new(),
167 stats: BuildStats::default(),
168 decompress: false,
169 resource_guard: None,
170 dead_ends: Vec::new(),
171 max_file_size: 100 * 1024 * 1024,
172 })
173 }
174
    /// Attaches an external resource guard; when present, `build` consults
    /// it (instead of the RSS fallback) to decide when to spill postings.
    #[must_use]
    pub const fn with_resource_guard(mut self, guard: ResourceGuard) -> Self {
        self.resource_guard = Some(guard);
        self
    }
181
    /// Enables or disables indexing the decompressed contents of compressed
    /// files (applied per file in `process_file`).
    pub const fn set_decompress(&mut self, decompress: bool) {
        self.decompress = decompress;
    }
187
    /// Sets the per-file size cap in bytes; larger files are skipped.
    /// Pass 0 to disable the cap entirely.
    pub const fn set_max_file_size(&mut self, max_bytes: u64) {
        self.max_file_size = max_bytes;
    }
192
193 fn flush_run(&mut self) -> Result<()> {
194 if self.postings.is_empty() {
195 return Ok(());
196 }
197 let old_postings = std::mem::take(&mut self.postings);
198 let mut sorted: Vec<_> = old_postings.into_iter().collect();
199 sorted.sort_unstable_by_key(|(t, _)| *t);
200
201 let run_path = self
202 .ix_dir
203 .join(format!("shard.ix.run.{}", self.temp_runs.len()));
204 let mut f = BufWriter::new(File::create(&run_path)?);
205
206 for (tri, entries) in sorted {
207 f.write_all(&tri.to_le_bytes())?;
208 f.write_all(&u32::try_from(entries.len()).unwrap_or(0).to_le_bytes())?;
209 for entry in entries {
210 f.write_all(&entry.file_id.to_le_bytes())?;
211 f.write_all(
212 &u32::try_from(entry.offsets.len())
213 .unwrap_or(0)
214 .to_le_bytes(),
215 )?;
216 for off in entry.offsets {
217 f.write_all(&off.to_le_bytes())?;
218 }
219 }
220 }
221 f.flush()?;
222
223 self.temp_runs.push(run_path);
224 self.postings_count = 0;
225 Ok(())
226 }
227
    /// Walks the tree under `root`, indexes every eligible file, and
    /// serializes the final `shard.ix` shard, returning its path.
    ///
    /// Stale intermediates (`shard.ix.run.*`, `shard.ix.merged.*`) from a
    /// previous interrupted build are removed first. Walker errors are
    /// logged and skipped, never fatal; indexing or serialization errors
    /// are propagated.
    ///
    /// # Errors
    /// Returns an error if `process_file` or the final `serialize` fails.
    pub fn build(&mut self) -> Result<PathBuf> {
        // Remove leftovers from a prior crashed/interrupted build so run
        // numbering restarts cleanly.
        if self.ix_dir.exists()
            && let Ok(entries) = std::fs::read_dir(&self.ix_dir)
        {
            for entry in entries.flatten() {
                let name = entry.file_name();
                let name_str = name.to_string_lossy();
                if (name_str.starts_with("shard.ix.run.")
                    || name_str.starts_with("shard.ix.merged."))
                    && let Err(e) = std::fs::remove_file(entry.path())
                {
                    tracing::warn!("Failed to cleanup shard file {}: {}", name_str, e);
                }
            }
        }

        let start = Instant::now();
        let root = self.root.clone();

        if root.to_string_lossy() == "/" {
            tracing::warn!(
                "LLMOSafe Advisory: Indexing root filesystem. Ensure adequate resource guards are in place."
            );
        }

        // Honor .gitignore/.ixignore without requiring a git repo; descend
        // into hidden entries (hidden(false) disables the hidden filter).
        let walker = WalkBuilder::new(&root)
            .hidden(false)
            .git_ignore(true)
            .git_global(true)
            .git_exclude(true)
            .require_git(false)
            .add_custom_ignore_filename(".ixignore")
            .filter_entry(move |entry| {
                let path = entry.path();
                let name = path.file_name().and_then(|n| n.to_str()).unwrap_or("");

                // Never descend into VCS metadata, our own index dir, or
                // lost+found.
                if entry.file_type().is_some_and(|t| t.is_dir())
                    && (name == "lost+found" || name == ".git" || name == ".ix")
                {
                    return false;
                }

                // Skip our own shard artifacts wherever they appear.
                if entry.file_type().is_some_and(|t| t.is_file())
                    && (name == "shard.ix"
                        || name == "shard.ix.tmp"
                        || name.starts_with("shard.ix."))
                {
                    return false;
                }

                // Cheap extension-based binary/media filter; content-based
                // binary detection still runs later in process_file.
                if entry.file_type().is_some_and(|t| t.is_file()) {
                    let ext = path.extension().and_then(|e| e.to_str()).unwrap_or("");
                    match ext {
                        "so" | "o" | "dylib" | "a" | "dll" | "exe" | "pyc" | "jpg" | "png"
                        | "gif" | "mp4" | "mp3" | "pdf" | "zip" | "7z" | "rar" | "sqlite"
                        | "db" | "bin" => return false,
                        _ => {}
                    }
                    if name.ends_with(".tar.gz") {
                        return false;
                    }
                }
                true
            })
            .build();

        let mut files_processed = 0u64;
        for entry_res in walker {
            let entry = match entry_res {
                Ok(e) => e,
                Err(e) => {
                    // NOTE(review): raw_os_error() == Some(-7) is treated as
                    // a project-specific "backtrack" signal (genuine OS errno
                    // values are positive) — confirm where this value is
                    // injected lower in the stack. Outer Some = backtrack
                    // detected; inner Option = offending path, when known.
                    let backtrack_path = match &e {
                        ignore::Error::Io(io_err) if io_err.raw_os_error() == Some(-7) => {
                            Some(None)
                        }
                        ignore::Error::WithPath { path, err } => {
                            if let ignore::Error::Io(io_err) = err.as_ref() {
                                if io_err.raw_os_error() == Some(-7) {
                                    Some(Some(path.clone()))
                                } else {
                                    None
                                }
                            } else {
                                None
                            }
                        }
                        _ => None,
                    };

                    if let Some(path_opt) = backtrack_path {
                        tracing::warn!(
                            "Immune Memory Triggered: Skipping path due to backtrack signal."
                        );
                        if let Some(path) = path_opt {
                            self.dead_ends.push(path);
                        }
                    }
                    // All walker errors are skipped, never fatal.
                    continue;
                }
            };

            if entry.file_type().is_some_and(|t| t.is_file()) {
                self.process_file(entry.path())?;
                files_processed += 1;

                // Every 250 files, check memory pressure and spill the
                // in-memory postings to a run file if a ceiling was hit.
                #[allow(clippy::manual_is_multiple_of)]
                if files_processed % 250 == 0 {
                    if let Some(guard) = &self.resource_guard {
                        if let Err(_err) = guard.check() {
                            eprintln!(
                                "ixd: memory ceiling reached... flushing intermediate chunk ({files_processed} files processed)"
                            );
                            self.flush_run()?;
                        }
                    } else {
                        // No guard configured: fall back to a 512 MB RSS cap.
                        if let Ok(rss) = Self::current_rss_bytes()
                            && rss > 512 * 1024 * 1024
                        {
                            eprintln!(
                                "ixd: RSS ceiling reached ({} MB) after {} files — flushing intermediate chunk",
                                rss / 1024 / 1024,
                                files_processed
                            );
                            self.flush_run()?;
                        }
                    }
                }
            }
        }

        let output_path = self.serialize()?;
        tracing::info!("Build completed in {:?}: {:?}", start.elapsed(), self.stats);
        Ok(output_path)
    }
373
    /// Rebuilds the whole index. The `_changed_files` hint is currently
    /// ignored — incremental updates are not implemented yet, so this
    /// simply delegates to [`Self::build`].
    pub fn update(&mut self, _changed_files: &[PathBuf]) -> Result<PathBuf> {
        self.build()
    }
382
    /// Number of files indexed so far.
    #[must_use]
    pub const fn files_len(&self) -> usize {
        self.file_count as usize
    }
388
    /// Number of unique trigrams in the last completed build
    /// (0 until `serialize` has run).
    #[must_use]
    pub const fn trigrams_len(&self) -> usize {
        self.stats.unique_trigrams as usize
    }
394
395 fn current_rss_bytes() -> std::io::Result<u64> {
397 let status = std::fs::read_to_string("/proc/self/status")?;
398 for line in status.lines() {
399 if let Some(rest) = line.strip_prefix("VmRSS:") {
400 let kb: u64 = rest
401 .split_whitespace()
402 .next()
403 .and_then(|s| s.parse().ok())
404 .unwrap_or(0);
405 return Ok(kb * 1024);
406 }
407 }
408 Ok(0)
409 }
410
    /// Returns the bytes available to unprivileged users on the filesystem
    /// containing `path` (`f_bavail * f_frsize`), via `statvfs`. Unix-only.
    fn free_bytes_at(path: &Path) -> std::io::Result<u64> {
        use std::os::unix::ffi::OsStrExt;
        // An interior NUL byte in the path would make an invalid C string.
        let path_c = std::ffi::CString::new(path.as_os_str().as_bytes())
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e))?;
        // SAFETY: libc::statvfs is plain-old-data, so an all-zero value is a
        // valid initial state for the out-parameter.
        let mut stat: libc::statvfs = unsafe { std::mem::zeroed() };
        // SAFETY: `path_c` is a valid NUL-terminated C string and `stat` is
        // a live, writable statvfs the call only writes through.
        let ret = unsafe { libc::statvfs(path_c.as_ptr(), &raw mut stat) };
        if ret != 0 {
            return Err(std::io::Error::last_os_error());
        }
        // The cast is a no-op on some targets, needed on others.
        #[allow(clippy::unnecessary_cast)]
        Ok(stat.f_bavail as u64 * stat.f_frsize as u64)
    }
424
    /// Indexes a single file: extracts trigrams into the in-memory posting
    /// map, and appends the file's metadata record, bloom filter, and path
    /// to the temp section files.
    ///
    /// Returns `Ok(true)` when the file was indexed, `Ok(false)` when it
    /// was skipped (vanished, unreadable, too large, or detected binary).
    fn process_file(&mut self, path: &Path) -> Result<bool> {
        let path_str = path.display().to_string();
        // Files can vanish between the walk and here; treat that as a skip.
        let metadata = match fs::metadata(path) {
            Ok(m) => m,
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(false),
            Err(e) => return Err(e.into()),
        };
        let size = metadata.len();
        // mtime as nanoseconds since epoch; 0 on pre-epoch clocks/overflow.
        let mtime = metadata
            .modified()?
            .duration_since(UNIX_EPOCH)
            .map_or(0, |d| u64::try_from(d.as_nanos()).unwrap_or(0));

        // A cap of 0 disables the size check.
        if self.max_file_size > 0 && size > self.max_file_size {
            tracing::debug!("SKIP: file too large ({size} bytes): {path_str}");
            self.stats.files_skipped_size += 1;
            return Ok(false);
        }

        let file = match File::open(path) {
            Ok(f) => f,
            Err(e)
                if e.kind() == std::io::ErrorKind::NotFound
                    || e.kind() == std::io::ErrorKind::PermissionDenied =>
            {
                return Ok(false);
            }
            Err(e) => return Err(e.into()),
        };
        // SAFETY: Mmap::map is unsafe because the mapped file can be
        // truncated/mutated by other processes while the slice is live
        // (memmap2's documented contract); that risk is accepted here.
        let mmap = unsafe { Mmap::map(&file)? };

        // Optionally index decompressed contents, capping the expansion at
        // max_file_size (or 100 MB when the cap is disabled) so a
        // decompression bomb cannot exhaust memory.
        let raw_data = if self.decompress {
            if let Some(mut reader) = maybe_decompress(path, &mmap)? {
                let mut buf = Vec::new();
                let take_limit = if self.max_file_size > 0 {
                    self.max_file_size
                } else {
                    100 * 1024 * 1024
                };
                reader.by_ref().take(take_limit).read_to_end(&mut buf)?;
                std::borrow::Cow::Owned(buf)
            } else {
                std::borrow::Cow::Borrowed(&mmap[..])
            }
        } else {
            std::borrow::Cow::Borrowed(&mmap[..])
        };

        let data = &raw_data[..];
        if is_binary(data) {
            tracing::debug!("SKIP: binary detection ({size} bytes): {path_str}");
            self.stats.files_skipped_binary += 1;
            return Ok(false);
        }

        let content_hash = xxhash_rust::xxh64::xxh64(data, 0);
        // (trigram, byte-offset) pairs; the run-length walk below assumes
        // pairs for equal trigrams are contiguous — TODO confirm the
        // Extractor's output ordering guarantees this.
        let pairs = self.extractor.extract_with_offsets(data);

        if pairs.is_empty() && !data.is_empty() {
            tracing::debug!("SKIP: no trigrams extracted ({size} bytes): {path_str}");
        }

        let file_id = self.file_count;
        self.file_count += 1;

        // Append the path to the string pool: u16 reserved field, u16
        // length, then the raw (lossy UTF-8) path bytes. Note this shadows
        // the earlier Display-based `path_str`.
        let path_str = path.to_string_lossy();
        let path_bytes = path_str.as_bytes();
        let path_off = u32::try_from(self.strings_writer.stream_position()?).unwrap_or(0);
        let path_len = u16::try_from(path_bytes.len()).unwrap_or(0);

        self.strings_writer.write_all(&0u16.to_le_bytes())?;
        self.strings_writer.write_all(&path_len.to_le_bytes())?;
        self.strings_writer.write_all(path_bytes)?;

        let mut bloom = BloomFilter::new(256, 5);
        let mut trigram_count = 0u32;

        // Walk runs of identical trigrams in `pairs`, keeping at most
        // 10_000 offsets per trigram per file.
        let mut i = 0;
        while i < pairs.len() {
            let tri = pairs[i].0;
            let mut j = i + 1;
            while j < pairs.len() && pairs[j].0 == tri {
                j += 1;
            }

            let take_count = (j - i).min(10_000);
            let offsets: Vec<u32> = pairs[i..i + take_count].iter().map(|p| p.1).collect();

            bloom.insert(tri);
            self.postings
                .entry(tri)
                .or_default()
                .push(PostingEntry { file_id, offsets });
            // Rough memory-usage proxy: offsets kept plus fixed overhead;
            // drives the 500_000 spill threshold below.
            self.postings_count += take_count + 8;

            trigram_count += 1;
            i = j;
        }

        bloom.serialize(&mut self.blooms_writer)?;

        // NOTE(review): assumes every serialized bloom occupies exactly
        // 260 bytes (the stride used to compute bloom_offset) — confirm
        // against BloomFilter::serialize.
        let bloom_offset = u64::from(file_id)
            .checked_mul(260)
            .ok_or_else(|| Error::Config("file_id * 260 overflow".into()))?;
        let bloom_offset_u32 = u32::try_from(bloom_offset)
            .map_err(|_| Error::Config(format!("bloom_offset {bloom_offset} exceeds u32::MAX")))?;
        // Fixed-size file-table record: id, path offset+len, status byte,
        // pad, mtime, size, content hash, trigram count, bloom offset,
        // 4 reserved bytes.
        self.files_writer.write_all(&file_id.to_le_bytes())?;
        self.files_writer.write_all(&path_off.to_le_bytes())?;
        self.files_writer.write_all(&path_len.to_le_bytes())?;
        self.files_writer.write_all(&[FileStatus::Fresh as u8])?;
        self.files_writer.write_all(&[0u8])?;
        self.files_writer.write_all(&mtime.to_le_bytes())?;
        self.files_writer.write_all(&size.to_le_bytes())?;
        self.files_writer.write_all(&content_hash.to_le_bytes())?;
        self.files_writer.write_all(&trigram_count.to_le_bytes())?;
        self.files_writer
            .write_all(&bloom_offset_u32.to_le_bytes())?;
        self.files_writer.write_all(&[0u8; 4])?;

        self.stats.files_scanned += 1;
        self.stats.bytes_scanned += size;

        if std::env::var("IX_DEBUG_BUILD").is_ok() {
            eprintln!(
                "IX-INDEXED: file_id={file_id} unique_trigrams={trigram_count} size={size}: {path_str}"
            );
        }

        // Spill once the accumulator's size proxy crosses the threshold.
        if self.postings_count >= 500_000 {
            if std::env::var("IX_DEBUG_BUILD").is_ok() {
                eprintln!(
                    "IX-FLUSH: postings_count={} after file_id={file_id}",
                    self.postings_count
                );
            }
            self.flush_run()?;
        }

        Ok(true)
    }
568
    /// Merges all spill runs and temp sections into the final `shard.ix`
    /// shard and atomically swaps it into place (keeping the old shard as
    /// `.bak` only for the duration of the swap). Returns the shard path.
    ///
    /// Output layout: reserved header, file table, posting data (k-way
    /// merged), CDX trigram table, CDX block index, bloom filters, string
    /// pool — each section 8-byte aligned — then the header is rewritten
    /// with final offsets/sizes and a CRC.
    fn serialize(&mut self) -> Result<PathBuf> {
        // Preflight: refuse to start the merge with < 100 MB free disk.
        if let Ok(free) = Self::free_bytes_at(&self.ix_dir) {
            const MIN_FREE: u64 = 100 * 1024 * 1024;
            if free < MIN_FREE {
                return Err(crate::error::Error::Io(std::io::Error::other(format!(
                    "insufficient disk space: {} MB free, need ≥100 MB (path: {})",
                    free / 1024 / 1024,
                    self.ix_dir.display()
                ))));
            }
        }
        // Spill whatever is still buffered so all postings live in runs.
        self.flush_run()?;

        self.files_writer.flush()?;
        self.blooms_writer.flush()?;
        self.strings_writer.flush()?;

        // Cap merge fan-in at 128 open runs: repeatedly merge generations
        // until at most 128 remain for the final streaming merge.
        while self.temp_runs.len() > 128 {
            let mut next_generation = Vec::new();
            for chunk in self.temp_runs.chunks(128) {
                // Timestamp suffix keeps names unique across generations.
                let out_path = self.ix_dir.join(format!(
                    "shard.ix.merged.{}.{}",
                    next_generation.len(),
                    SystemTime::now()
                        .duration_since(UNIX_EPOCH)
                        .map_err(|e| Error::Config(format!("time went backwards: {e}")))?
                        .as_micros()
                ));
                Self::merge_to_run(chunk, &out_path)?;
                next_generation.push(out_path);
                for p in chunk {
                    if let Err(e) = fs::remove_file(p) {
                        tracing::warn!("Failed to cleanup temp run {}: {}", p.display(), e);
                    }
                }
            }
            self.temp_runs = next_generation;
        }

        let tmp_path = self.ix_dir.join("shard.ix.tmp");
        let final_path = self.ix_dir.join("shard.ix");

        let mut f = BufWriter::new(File::create(&tmp_path)?);
        // Reserve the header; it is rewritten at the end once all section
        // offsets and sizes are known.
        f.write_all(&[0u8; HEADER_SIZE])?;

        // --- File table section ---
        let file_table_offset = Self::align_to_8(&mut f)?;
        let mut files_reader = File::open(self.ix_dir.join("shard.ix.tmp.files"))?;
        std::io::copy(&mut files_reader, &mut f)?;
        let file_table_size = f.stream_position()? - file_table_offset;

        // --- Posting data section (k-way merge of all runs) ---
        Self::align_to_8(&mut f)?;
        let posting_data_offset = f.stream_position()?;

        // (trigram, absolute offset, encoded length, doc frequency)
        let mut cdx_entries: Vec<(Trigram, u64, u32, u32)> = Vec::new();
        let mut global_trigram_count = 0u32;

        let mut runs = Vec::new();
        for path in &self.temp_runs {
            runs.push(RunIterator::new(path)?);
        }

        // Min-heap by trigram (MergeItem's Ord is reversed for BinaryHeap);
        // current_items[i] holds run i's pending record.
        let mut heap = BinaryHeap::new();
        let mut current_items = vec![None; runs.len()];

        for (i, run) in runs.iter_mut().enumerate() {
            if let Some(item) = run.next_trigram()? {
                heap.push(MergeItem {
                    tri: item.0,
                    run_idx: i,
                });
                current_items[i] = Some(item);
            }
        }

        let mut current_tri: Option<Trigram> = None;
        let mut merged_entries: Vec<PostingEntry> = Vec::new();

        while let Some(MergeItem { tri, run_idx }) = heap.pop() {
            // Trigram changed: emit the accumulated posting list first.
            if Some(tri) != current_tri {
                if let Some(t) = current_tri {
                    let cdx_entry = Self::write_merged_posting(
                        &mut f,
                        t,
                        posting_data_offset,
                        &mut merged_entries,
                    )?;
                    cdx_entries.push(cdx_entry);
                    global_trigram_count += 1;
                    merged_entries.clear();
                }
                current_tri = Some(tri);
            }

            // Consume the pending record for this run, then refill the slot
            // and re-arm the heap so the run stays in the merge.
            let item = current_items
                .get_mut(run_idx)
                .ok_or(Error::Config("run_idx out of bounds".into()))?
                .take()
                .ok_or(Error::Config("current item is None".into()))?;
            merged_entries.extend(item.1);

            if let Some(next_item) = runs
                .get_mut(run_idx)
                .ok_or(Error::Config("run_idx out of bounds".into()))?
                .next_trigram()?
            {
                heap.push(MergeItem {
                    tri: next_item.0,
                    run_idx,
                });
                *current_items
                    .get_mut(run_idx)
                    .ok_or(Error::Config("run_idx out of bounds".into()))? = Some(next_item);
            }
        }

        // Flush the final accumulated trigram.
        if let Some(t) = current_tri {
            let cdx_entry =
                Self::write_merged_posting(&mut f, t, posting_data_offset, &mut merged_entries)?;
            cdx_entries.push(cdx_entry);
            global_trigram_count += 1;
        }

        self.stats.unique_trigrams = u64::from(global_trigram_count);
        let posting_data_size = f.stream_position()? - posting_data_offset;

        // --- Trigram (CDX) table section ---
        Self::align_to_8(&mut f)?;
        let trigram_table_offset = f.stream_position()?;

        let cdx_block_index = Self::write_cdx_blocks(&mut f, &cdx_entries)?;
        let trigram_table_size = f.stream_position()? - trigram_table_offset;

        // --- CDX block index: (first trigram, block offset) pairs plus a
        // (u32::MAX, end-of-table) sentinel so readers can size the last
        // block. ---
        Self::align_to_8(&mut f)?;
        let cdx_block_index_offset = f.stream_position()?;
        for entry in &cdx_block_index {
            f.write_all(&entry.0.to_le_bytes())?;
            f.write_all(&entry.1.to_le_bytes())?;
        }
        f.write_all(&u32::MAX.to_le_bytes())?;
        f.write_all(&(trigram_table_offset + trigram_table_size).to_le_bytes())?;
        let cdx_block_index_size = f.stream_position()? - cdx_block_index_offset;

        // --- Bloom filter section ---
        Self::align_to_8(&mut f)?;
        let bloom_offset = f.stream_position()?;
        let mut blooms_reader = File::open(self.ix_dir.join("shard.ix.tmp.blooms"))?;
        std::io::copy(&mut blooms_reader, &mut f)?;
        let bloom_size = f.stream_position()? - bloom_offset;

        // --- String pool section ---
        Self::align_to_8(&mut f)?;
        let string_pool_offset = f.stream_position()?;
        let mut strings_reader = File::open(self.ix_dir.join("shard.ix.tmp.strings"))?;
        std::io::copy(&mut strings_reader, &mut f)?;
        let string_pool_size = f.stream_position()? - string_pool_offset;

        // Name index not emitted yet; record an empty section.
        let name_index_offset = f.stream_position()?;
        let name_index_size = 0u64;

        // Creation time in microseconds since epoch; 0 on overflow.
        let created_at = u64::try_from(
            SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .map_err(|e| Error::Config(format!("time went backwards: {e}")))?
                .as_micros(),
        )
        .unwrap_or(0);
        // Assemble the fixed-layout header (byte offsets per field below).
        let mut header_bytes = [0u8; HEADER_SIZE];
        header_bytes[0..4].copy_from_slice(&MAGIC);
        header_bytes[0x04..0x06].copy_from_slice(&VERSION_MAJOR.to_le_bytes());
        header_bytes[0x06..0x08].copy_from_slice(&VERSION_MINOR.to_le_bytes());
        header_bytes[0x08..0x10].copy_from_slice(
            &(flags::HAS_BLOOM_FILTERS
                | flags::HAS_CONTENT_HASHES
                | flags::POSTING_LISTS_CHECKSUMMED
                | flags::HAS_CDX_INDEX)
                .to_le_bytes(),
        );
        header_bytes[0x10..0x18].copy_from_slice(&created_at.to_le_bytes());
        header_bytes[0x18..0x20].copy_from_slice(&self.stats.bytes_scanned.to_le_bytes());
        header_bytes[0x20..0x24].copy_from_slice(&self.file_count.to_le_bytes());
        header_bytes[0x24..0x28].copy_from_slice(&(global_trigram_count).to_le_bytes());
        header_bytes[0x28..0x30].copy_from_slice(&file_table_offset.to_le_bytes());
        header_bytes[0x30..0x38].copy_from_slice(&file_table_size.to_le_bytes());
        header_bytes[0x38..0x40].copy_from_slice(&trigram_table_offset.to_le_bytes());
        header_bytes[0x40..0x48].copy_from_slice(&trigram_table_size.to_le_bytes());
        header_bytes[0x48..0x50].copy_from_slice(&posting_data_offset.to_le_bytes());
        header_bytes[0x50..0x58].copy_from_slice(&posting_data_size.to_le_bytes());
        header_bytes[0x58..0x60].copy_from_slice(&bloom_offset.to_le_bytes());
        header_bytes[0x60..0x68].copy_from_slice(&bloom_size.to_le_bytes());
        header_bytes[0x68..0x70].copy_from_slice(&string_pool_offset.to_le_bytes());
        header_bytes[0x70..0x78].copy_from_slice(&string_pool_size.to_le_bytes());
        header_bytes[0x78..0x80].copy_from_slice(&name_index_offset.to_le_bytes());
        header_bytes[0x80..0x88].copy_from_slice(&name_index_size.to_le_bytes());
        header_bytes[0x88..0x90].copy_from_slice(&cdx_block_index_offset.to_le_bytes());
        header_bytes[0x90..0x98].copy_from_slice(&cdx_block_index_size.to_le_bytes());

        // CRC covers everything before the checksum field itself.
        let crc = crc32c::crc32c(&header_bytes[0..0xF8]);
        header_bytes[0xF8..0xFC].copy_from_slice(&crc.to_le_bytes());

        // Rewrite the reserved header at the start of the file.
        f.seek(SeekFrom::Start(0))?;
        f.write_all(&header_bytes)?;
        f.flush()?;
        drop(f);

        // Swap the new shard into place; the old shard survives as .bak
        // only during the swap window.
        let backup_path = self.ix_dir.join("shard.ix.bak");
        let old_index_exists = final_path.exists();
        if old_index_exists {
            let _ = fs::remove_file(&backup_path);
            if let Err(e) = fs::rename(&final_path, &backup_path) {
                tracing::warn!("Failed to backup existing index: {}", e);
            }
        }
        fs::rename(&tmp_path, &final_path)?;
        if old_index_exists {
            let _ = fs::remove_file(&backup_path);
        }

        // Remove all temp inputs; failures are non-fatal.
        let _ = fs::remove_file(self.ix_dir.join("shard.ix.tmp.files"));
        let _ = fs::remove_file(self.ix_dir.join("shard.ix.tmp.blooms"));
        let _ = fs::remove_file(self.ix_dir.join("shard.ix.tmp.strings"));
        for path in &self.temp_runs {
            if let Err(e) = fs::remove_file(path) {
                tracing::warn!("Failed to cleanup temp run {}: {}", path.display(), e);
            }
        }
        self.temp_runs.clear();

        Ok(final_path)
    }
803
    /// Merges several sorted run files into one sorted run at `out_path`
    /// (same raw run format), combining entries for equal trigrams. Used to
    /// cap fan-in before the final merge in `serialize`.
    fn merge_to_run(run_paths: &[PathBuf], out_path: &Path) -> Result<()> {
        let mut runs = Vec::new();
        for path in run_paths {
            runs.push(RunIterator::new(path)?);
        }
        // Min-heap by trigram (MergeItem's Ord is reversed for BinaryHeap);
        // current_items[i] holds run i's pending record.
        let mut heap = BinaryHeap::new();
        let mut current_items = vec![None; runs.len()];
        for (i, run) in runs.iter_mut().enumerate() {
            if let Some(item) = run.next_trigram()? {
                heap.push(MergeItem {
                    tri: item.0,
                    run_idx: i,
                });
                current_items[i] = Some(item);
            }
        }
        let mut out = BufWriter::new(File::create(out_path)?);
        let mut current_tri: Option<Trigram> = None;
        let mut merged_entries: Vec<PostingEntry> = Vec::new();
        while let Some(MergeItem { tri, run_idx }) = heap.pop() {
            // Trigram changed: write out the accumulated record first.
            if Some(tri) != current_tri {
                if let Some(t) = current_tri {
                    Self::write_run_entry(&mut out, t, &mut merged_entries)?;
                    merged_entries.clear();
                }
                current_tri = Some(tri);
            }
            let item = current_items[run_idx].take().ok_or_else(|| {
                Error::Config(format!(
                    "merge invariant broken: no item at run index {run_idx}"
                ))
            })?;
            merged_entries.extend(item.1);
            // Refill this run's slot so it stays in the merge.
            if let Some(next_item) = runs[run_idx].next_trigram()? {
                heap.push(MergeItem {
                    tri: next_item.0,
                    run_idx,
                });
                current_items[run_idx] = Some(next_item);
            }
        }
        // Flush the final trigram.
        if let Some(t) = current_tri {
            Self::write_run_entry(&mut out, t, &mut merged_entries)?;
        }
        out.flush()?;
        Ok(())
    }
851
852 fn write_run_entry<W: Write>(
853 w: &mut W,
854 tri: Trigram,
855 entries: &mut [PostingEntry],
856 ) -> Result<()> {
857 entries.sort_by_key(|e| e.file_id);
858 w.write_all(&tri.to_le_bytes())?;
859 w.write_all(&(entries.len() as u32).to_le_bytes())?;
860 for entry in entries {
861 w.write_all(&entry.file_id.to_le_bytes())?;
862 w.write_all(&(entry.offsets.len() as u32).to_le_bytes())?;
863 for off in &entry.offsets {
864 w.write_all(&off.to_le_bytes())?;
865 }
866 }
867 Ok(())
868 }
869
870 fn write_merged_posting<W: Write + Seek>(
871 f: &mut W,
872 tri: Trigram,
873 base_off: u64,
874 entries: &mut [PostingEntry],
875 ) -> Result<(Trigram, u64, u32, u32)> {
876 entries.sort_by_key(|e| e.file_id);
877 let count = entries.len() as u32;
878 let list = PostingList {
879 entries: entries.to_vec(),
880 };
881 let encoded = list.encode()?;
882 let offset = f.stream_position()? - base_off;
883 f.write_all(&encoded)?;
884 let abs_off = base_off + offset;
885 Ok((tri, abs_off, encoded.len() as u32, count))
886 }
887
    /// Writes the CDX trigram directory as zstd-compressed blocks of up to
    /// `CDX_BLOCK_SIZE` entries each, returning the `(first trigram, block
    /// file offset)` index used to locate blocks.
    ///
    /// Uncompressed block payload: varint entry count, then per entry:
    /// delta-encoded trigram key, posting offset, posting length, and
    /// document frequency — all varints.
    fn write_cdx_blocks<W: Write + Seek>(
        f: &mut W,
        cdx_entries: &[(Trigram, u64, u32, u32)],
    ) -> Result<Vec<(u32, u64)>> {
        let mut block_index = Vec::new();
        for chunk in cdx_entries.chunks(crate::format::CDX_BLOCK_SIZE) {
            // chunks() never yields an empty slice, so chunk[0] is safe.
            let first_key = chunk[0].0;
            let block_offset = f.stream_position()?;
            block_index.push((first_key, block_offset));

            let mut buf = Vec::new();
            varint::encode(u64::try_from(chunk.len()).unwrap_or(0), &mut buf);
            // Keys arrive sorted ascending (merge output), so the deltas
            // never underflow.
            let mut last_key = 0u32;
            for &(tri, posting_offset, posting_length, doc_frequency) in chunk {
                varint::encode(u64::from(tri - last_key), &mut buf);
                last_key = tri;
                varint::encode(posting_offset, &mut buf);
                varint::encode(u64::from(posting_length), &mut buf);
                varint::encode(u64::from(doc_frequency), &mut buf);
            }

            let compressed = zstd::encode_all(&buf[..], PostingList::ZSTD_COMPRESSION_LEVEL)
                .map_err(|e| Error::Config(format!("cdx zstd encode: {e}")))?;
            f.write_all(&compressed)?;
        }
        Ok(block_index)
    }
915
916 fn align_to_8<W: Write + Seek>(mut w: W) -> std::io::Result<u64> {
917 let pos = w.stream_position()?;
918 let padding = (8 - (pos % 8)) % 8;
919 if padding > 0 {
920 w.write_all(&vec![0u8; padding as usize])?;
921 }
922 w.stream_position()
923 }
924}