1use crate::bloom::BloomFilter;
8use crate::decompress::maybe_decompress;
9use crate::error::Result;
10use crate::format::*;
11use crate::posting::{PostingEntry, PostingList};
12use crate::trigram::{Extractor, Trigram};
13use ignore::WalkBuilder;
14use libc;
15use llmosafe::ResourceGuard;
16use memmap2::Mmap;
17use std::collections::{BinaryHeap, HashMap};
18use std::fs::{self, File};
19use std::io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write};
20use std::path::{Path, PathBuf};
21use std::time::{Instant, SystemTime, UNIX_EPOCH};
22
/// Streaming builder for a `shard.ix` index over one directory tree.
///
/// The file table, per-file bloom filters and the path string pool are
/// streamed to temp files as files are processed; postings are buffered in
/// memory and spilled to sorted on-disk runs, which `serialize` k-way merges
/// into the final shard.
pub struct Builder {
    // Directory tree being indexed.
    root: PathBuf,
    // `<root>/.ix` — home of the final shard and all intermediate files.
    ix_dir: PathBuf,
    // Next file id to assign (== number of files indexed so far).
    file_count: u32,

    // Temp-file writers for the file-table rows, the per-file bloom
    // filters, and the path string pool, respectively.
    files_writer: BufWriter<File>,
    blooms_writer: BufWriter<File>,
    strings_writer: BufWriter<File>,

    // Postings buffered in memory since the last spill.
    postings: HashMap<Trigram, Vec<PostingEntry>>,
    // Rough size estimate of `postings`; used as the spill trigger.
    postings_count: usize,
    // Sorted run files spilled so far (merged during `serialize`).
    temp_runs: Vec<PathBuf>,

    // Trigram extractor reused across files.
    extractor: Extractor,
    // Counters reported when the build finishes.
    stats: BuildStats,
    // Whether compressed files are transparently decompressed before indexing.
    decompress: bool,
    // Optional LLMOSafe guard polled periodically during the walk.
    resource_guard: Option<ResourceGuard>,
    // Paths that triggered the walker "backtrack" signal; recorded but not
    // consulted anywhere in this file.
    dead_ends: Vec<PathBuf>,
}
44
/// Counters accumulated over one build pass; logged when the build completes.
#[derive(Default, Debug)]
pub struct BuildStats {
    // Files whose content was trigram-indexed.
    pub files_scanned: u64,
    // Files rejected by the binary-content sniff.
    pub files_skipped_binary: u64,
    // Files rejected for exceeding the 10 MB size ceiling.
    pub files_skipped_size: u64,
    // Total bytes of content indexed.
    pub bytes_scanned: u64,
    // Distinct trigrams in the final merged index (set by `serialize`).
    pub unique_trigrams: u64,
}
53
/// Sequential reader over one on-disk posting run (written by `flush_run` or
/// an intermediate merge), yielding records in ascending trigram order.
struct RunIterator {
    file: BufReader<File>,
}
57
58impl RunIterator {
59 fn new(path: &Path) -> Result<Self> {
60 let f = File::open(path)?;
61 Ok(Self {
62 file: BufReader::new(f),
63 })
64 }
65
66 fn next_trigram(&mut self) -> Result<Option<(Trigram, Vec<PostingEntry>)>> {
67 let mut tri_buf = [0u8; 4];
68 if let Err(e) = self.file.read_exact(&mut tri_buf) {
69 if e.kind() == std::io::ErrorKind::UnexpectedEof {
70 return Ok(None);
71 }
72 return Err(e.into());
73 }
74 let tri = u32::from_le_bytes(tri_buf);
75
76 let mut len_buf = [0u8; 4];
77 self.file.read_exact(&mut len_buf)?;
78 let entries_len = u32::from_le_bytes(len_buf) as usize;
79
80 let mut entries = Vec::with_capacity(entries_len);
81 for _ in 0..entries_len {
82 self.file.read_exact(&mut len_buf)?;
83 let file_id = u32::from_le_bytes(len_buf);
84
85 self.file.read_exact(&mut len_buf)?;
86 let offsets_len = u32::from_le_bytes(len_buf) as usize;
87
88 let mut offsets = Vec::with_capacity(offsets_len);
89 for _ in 0..offsets_len {
90 self.file.read_exact(&mut len_buf)?;
91 offsets.push(u32::from_le_bytes(len_buf));
92 }
93 entries.push(PostingEntry { file_id, offsets });
94 }
95
96 Ok(Some((tri, entries)))
97 }
98}
99
/// Heap entry for the k-way merge: the next trigram available from run
/// `run_idx`. The `Ord` impl below is reversed so `BinaryHeap` (a max-heap)
/// behaves as a min-heap over trigrams.
#[derive(Eq, PartialEq)]
struct MergeItem {
    tri: Trigram,
    run_idx: usize,
}
105
impl PartialOrd for MergeItem {
    // Delegates to `Ord`, keeping the two traits consistent as required.
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
111
112impl Ord for MergeItem {
113 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
114 other.tri.cmp(&self.tri) }
116}
117
118impl Builder {
119 pub fn new(root: &Path) -> Result<Self> {
120 let ix_dir = root.join(".ix");
121 fs::create_dir_all(&ix_dir)?;
122
123 let files_tmp = ix_dir.join("shard.ix.tmp.files");
124 let blooms_tmp = ix_dir.join("shard.ix.tmp.blooms");
125 let strings_tmp = ix_dir.join("shard.ix.tmp.strings");
126
127 let files_writer = BufWriter::new(File::create(&files_tmp)?);
128 let blooms_writer = BufWriter::new(File::create(&blooms_tmp)?);
129 let mut strings_writer = BufWriter::new(File::create(&strings_tmp)?);
130
131 strings_writer.write_all(&1u32.to_le_bytes())?;
132 strings_writer.write_all(&0u16.to_le_bytes())?;
133 strings_writer.write_all(&0u16.to_le_bytes())?;
134 strings_writer.write_all(&[0u8; 2])?;
135
136 Ok(Self {
137 root: root.to_owned(),
138 ix_dir,
139 file_count: 0,
140 files_writer,
141 blooms_writer,
142 strings_writer,
143 postings: HashMap::new(),
144 postings_count: 0,
145temp_runs: Vec::new(),
146extractor: Extractor::new(),
147stats: BuildStats::default(),
148decompress: false,
149resource_guard: None,
150dead_ends: Vec::new(),
151})
152 }
153
    /// Attaches a resource guard that `build` polls every 250 files; when the
    /// guard reports pressure, buffered postings are spilled to disk early.
    pub fn with_resource_guard(mut self, guard: ResourceGuard) -> Self {
        self.resource_guard = Some(guard);
        self
    }
158
    /// Enables/disables transparent decompression of compressed files before
    /// indexing (see `maybe_decompress` in `process_file`).
    pub fn set_decompress(&mut self, decompress: bool) {
        self.decompress = decompress;
    }
162
163 fn flush_run(&mut self) -> Result<()> {
164 if self.postings.is_empty() {
165 return Ok(());
166 }
167 let old_postings = std::mem::take(&mut self.postings);
168 let mut sorted: Vec<_> = old_postings.into_iter().collect();
169 sorted.sort_unstable_by_key(|(t, _)| *t);
170
171 let run_path = self
172 .ix_dir
173 .join(format!("shard.ix.run.{}", self.temp_runs.len()));
174 let mut f = BufWriter::new(File::create(&run_path)?);
175
176 for (tri, entries) in sorted {
177 f.write_all(&tri.to_le_bytes())?;
178 f.write_all(&(entries.len() as u32).to_le_bytes())?;
179 for entry in entries {
180 f.write_all(&entry.file_id.to_le_bytes())?;
181 f.write_all(&(entry.offsets.len() as u32).to_le_bytes())?;
182 for off in entry.offsets {
183 f.write_all(&off.to_le_bytes())?;
184 }
185 }
186 }
187 f.flush()?;
188
189 self.temp_runs.push(run_path);
190 self.postings_count = 0;
191 Ok(())
192 }
193
    /// Walks the tree under `root`, indexes every eligible file, and writes
    /// the final `shard.ix`. Returns the path of the new index.
    pub fn build(&mut self) -> Result<PathBuf> {
        // Remove stale run/merge intermediates left by a previous crashed or
        // interrupted build.
        if self.ix_dir.exists()
            && let Ok(entries) = std::fs::read_dir(&self.ix_dir) {
            for entry in entries.flatten() {
                let name = entry.file_name();
                let name_str = name.to_string_lossy();
                if (name_str.starts_with("shard.ix.run.") || name_str.starts_with("shard.ix.merged."))
                    && let Err(e) = std::fs::remove_file(entry.path()) {
                    tracing::warn!("Failed to cleanup shard file {}: {}", name_str, e);
                }
            }
        }

        let start = Instant::now();
        let root = self.root.clone();

        if root.to_string_lossy() == "/" {
            tracing::warn!(
                "LLMOSafe Advisory: Indexing root filesystem. Ensure adequate resource guards are in place."
            );
        }

        // Respect .gitignore plus a custom .ixignore; the filter prunes
        // well-known VCS/build/dependency dirs and obviously non-text files.
        let walker = WalkBuilder::new(&root)
            .hidden(false)
            .git_ignore(true)
            .require_git(false)
            .add_custom_ignore_filename(".ixignore")
            .filter_entry(move |entry| {
                let path = entry.path();
                let name = path.file_name().and_then(|n| n.to_str()).unwrap_or("");

                // Prune whole directories we never want to descend into.
                if entry.file_type().map(|t| t.is_dir()).unwrap_or(false)
                    && (name == "lost+found"
                        || name == ".git"
                        || name == "node_modules"
                        || name == "target"
                        || name == "__pycache__"
                        || name == ".tox"
                        || name == ".venv"
                        || name == "venv"
                        || name == ".ix")
                {
                    return false;
                }

                if entry.file_type().map(|t| t.is_file()).unwrap_or(false) {
                    // Early size cap; process_file enforces the same 10 MB
                    // ceiling again after stat.
                    if let Ok(metadata) = entry.metadata()
                        && metadata.len() > 10 * 1024 * 1024
                    {
                        return false;
                    }
                    // Skip lockfiles and our own index artifacts.
                    if name == "Cargo.lock"
                        || name == "package-lock.json"
                        || name == "pnpm-lock.yaml"
                        || name == "shard.ix"
                        || name == "shard.ix.tmp"
                        || name.starts_with("shard.ix.")
                    {
                        return false;
                    }
                }

                if entry.file_type().map(|t| t.is_file()).unwrap_or(false) {
                    // Extension-based binary/media screen.
                    let ext = path.extension().and_then(|e| e.to_str()).unwrap_or("");
                    match ext {
                        "so" | "o" | "dylib" | "a" | "dll" | "exe" | "pyc" | "jpg" | "png"
                        | "gif" | "mp4" | "mp3" | "pdf" | "zip" | "7z" | "rar" | "sqlite"
                        | "db" | "bin" => return false,
                        _ => {}
                    }
                    if name.ends_with(".tar.gz") {
                        return false;
                    }
                }
                true
            })
            .build();

        let mut files_processed = 0u64;
        for entry_res in walker {
            let entry = match entry_res {
                Ok(e) => e,
                Err(e) => {
                    // Raw OS error -7 is treated as an in-band "backtrack"
                    // signal, bare or wrapped with a path.
                    // NOTE(review): negative raw errno values are unusual —
                    // confirm the producer of this sentinel.
                    //   Some(None)    => backtrack with no usable path
                    //   Some(Some(p)) => backtrack attributable to path p
                    //   None          => ordinary walk error (skipped)
                    let backtrack_path = match &e {
                        ignore::Error::Io(io_err) if io_err.raw_os_error() == Some(-7) => {
                            Some(None)
                        }
                        ignore::Error::WithPath { path, err } => {
                            if let ignore::Error::Io(io_err) = err.as_ref() {
                                if io_err.raw_os_error() == Some(-7) {
                                    Some(Some(path.clone()))
                                } else {
                                    None
                                }
                            } else {
                                None
                            }
                        }
                        _ => None,
                    };

                    if let Some(path_opt) = backtrack_path {
                        tracing::warn!(
                            "Immune Memory Triggered: Skipping path due to backtrack signal."
                        );
                        if let Some(path) = path_opt {
                            self.dead_ends.push(path);
                        }
                    }
                    continue;
                }
            };

            if entry.file_type().map(|t| t.is_file()).unwrap_or(false) {
                self.process_file(entry.path().to_owned())?;
                files_processed += 1;

                // Every 250 files, check memory pressure and spill buffered
                // postings early if needed.
                if files_processed.is_multiple_of(250) {
                    if let Some(guard) = &self.resource_guard {
                        // NOTE(review): guard.check() is called twice here
                        // (test + unwrap_err); the error value is unused.
                        if guard.check().map(|_s: ::llmosafe::Synapse| ()).is_err() {
                            let _err = guard.check().unwrap_err();
                            eprintln!(
                                "ixd: memory ceiling reached... flushing intermediate chunk ({} files processed)",
                                files_processed
                            );
                            self.flush_run()?;
                            continue;
                        }
                    } else {
                        // No guard attached: fall back to a 512 MB RSS ceiling
                        // read from /proc (Linux-only; errors are ignored).
                        if let Ok(rss) = Self::current_rss_bytes()
                            && rss > 512 * 1024 * 1024
                        {
                            eprintln!(
                                "ixd: RSS ceiling reached ({} MB) after {} files — flushing intermediate chunk",
                                rss / 1024 / 1024,
                                files_processed
                            );
                            self.flush_run()?;
                            continue;
                        }
                    }
                }
            }
        }

        let output_path = self.serialize()?;
        tracing::info!("Build completed in {:?}: {:?}", start.elapsed(), self.stats);
        Ok(output_path)
    }
348
    /// Re-indexes after a change notification.
    ///
    /// NOTE(review): incremental updates are not implemented — the changed
    /// file list is ignored and a full rebuild is performed.
    pub fn update(&mut self, _changed_files: &[PathBuf]) -> Result<PathBuf> {
        self.build()
    }
352
    /// Number of files indexed so far.
    pub fn files_len(&self) -> usize {
        self.file_count as usize
    }
356
    /// Number of unique trigrams in the last serialized index
    /// (0 until `build` has completed).
    pub fn trigrams_len(&self) -> usize {
        self.stats.unique_trigrams as usize
    }
360
361 fn current_rss_bytes() -> std::io::Result<u64> {
363 let status = std::fs::read_to_string("/proc/self/status")?;
364 for line in status.lines() {
365 if let Some(rest) = line.strip_prefix("VmRSS:") {
366 let kb: u64 = rest
367 .split_whitespace()
368 .next()
369 .and_then(|s| s.parse().ok())
370 .unwrap_or(0);
371 return Ok(kb * 1024);
372 }
373 }
374 Ok(0)
375 }
376
    /// Free space (in bytes) available to unprivileged users on the
    /// filesystem containing `path`, via statvfs(3). Unix-only.
    fn free_bytes_at(path: &Path) -> std::io::Result<u64> {
        use std::os::unix::ffi::OsStrExt;
        // Interior NUL bytes in the path are rejected as InvalidInput.
        let path_c = std::ffi::CString::new(path.as_os_str().as_bytes())
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e))?;
        // SAFETY: libc::statvfs is plain-old-data; an all-zero byte pattern is
        // a valid (if meaningless) value, and the kernel overwrites it below.
        let mut stat: libc::statvfs = unsafe { std::mem::zeroed() };
        // SAFETY: `path_c` is a valid NUL-terminated C string and `stat` is a
        // live, writable statvfs buffer for the duration of the call.
        let ret = unsafe { libc::statvfs(path_c.as_ptr(), &mut stat) };
        if ret != 0 {
            return Err(std::io::Error::last_os_error());
        }
        // f_bavail = blocks available to non-root, in units of f_frsize.
        Ok(stat.f_bavail * stat.f_frsize)
    }
389
390 fn process_file(&mut self, path: PathBuf) -> Result<bool> {
391 let metadata = match fs::metadata(&path) {
393 Ok(m) => m,
394 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(false),
395 Err(e) => return Err(e.into()),
396 };
397 let size = metadata.len();
398 let mtime = metadata
399 .modified()?
400 .duration_since(UNIX_EPOCH)
401 .map(|d| d.as_nanos() as u64)
402 .unwrap_or(0);
403
404 if size > 10 * 1024 * 1024 {
405 self.stats.files_skipped_size += 1;
406 return Ok(false);
407 }
408
409 let file = match File::open(&path) {
410 Ok(f) => f,
411 Err(e)
412 if e.kind() == std::io::ErrorKind::NotFound
413 || e.kind() == std::io::ErrorKind::PermissionDenied =>
414 {
415 return Ok(false);
416 }
417 Err(e) => return Err(e.into()),
418 };
419 let mmap = unsafe { Mmap::map(&file)? };
420
421 let raw_data = if self.decompress {
422 if let Some(mut reader) = maybe_decompress(&path, &mmap)? {
423 let mut buf = Vec::new();
424 use std::io::Read;
425 reader
426 .by_ref()
427 .take(10 * 1024 * 1024)
428 .read_to_end(&mut buf)?;
429 std::borrow::Cow::Owned(buf)
430 } else {
431 std::borrow::Cow::Borrowed(&mmap[..])
432 }
433 } else {
434 std::borrow::Cow::Borrowed(&mmap[..])
435 };
436
437 let data = &raw_data[..];
438if is_binary(data) {
439 self.stats.files_skipped_binary += 1;
440 return Ok(false);
441}
442
443let content_hash = xxhash_rust::xxh64::xxh64(data, 0);
444 let pairs = self.extractor.extract_with_offsets(data);
445
446 let file_id = self.file_count;
447 self.file_count += 1;
448
449 let path_str = path.to_string_lossy();
450 let path_bytes = path_str.as_bytes();
451 let path_off = (self.strings_writer.stream_position()?) as u32;
452 let path_len = path_bytes.len() as u16;
453
454 self.strings_writer.write_all(&0u16.to_le_bytes())?;
455 self.strings_writer.write_all(&path_len.to_le_bytes())?;
456 self.strings_writer.write_all(path_bytes)?;
457
458 let mut bloom = BloomFilter::new(256, 5);
459 let mut trigram_count = 0u32;
460
461 let mut i = 0;
462 while i < pairs.len() && trigram_count < 20_000 {
463 let tri = pairs[i].0;
464 let mut j = i + 1;
465 while j < pairs.len() && pairs[j].0 == tri {
466 j += 1;
467 }
468
469 let take_count = (j - i).min(10_000);
470 let offsets: Vec<u32> = pairs[i..i + take_count].iter().map(|p| p.1).collect();
471
472 bloom.insert(tri);
473 self.postings
474 .entry(tri)
475 .or_default()
476 .push(PostingEntry { file_id, offsets });
477 self.postings_count += take_count + 8;
478
479 trigram_count += 1;
480 i = j;
481 }
482
483 bloom.serialize(&mut self.blooms_writer)?;
484
485 let bloom_offset = file_id * 260;
486 self.files_writer.write_all(&file_id.to_le_bytes())?;
487 self.files_writer.write_all(&path_off.to_le_bytes())?;
488 self.files_writer.write_all(&path_len.to_le_bytes())?;
489 self.files_writer.write_all(&[FileStatus::Fresh as u8])?;
490 self.files_writer.write_all(&[0u8])?;
491 self.files_writer.write_all(&mtime.to_le_bytes())?;
492 self.files_writer.write_all(&size.to_le_bytes())?;
493 self.files_writer.write_all(&content_hash.to_le_bytes())?;
494 self.files_writer.write_all(&trigram_count.to_le_bytes())?;
495 self.files_writer.write_all(&bloom_offset.to_le_bytes())?;
496 self.files_writer.write_all(&[0u8; 4])?;
497
498 self.stats.files_scanned += 1;
499 self.stats.bytes_scanned += size;
500
501 if self.postings_count >= 500_000 {
504 self.flush_run()?;
505 }
506
507 Ok(true)
508 }
509
    /// K-way merges all temp runs and assembles the final shard:
    /// header | file table | posting data | trigram table | blooms |
    /// string pool | (empty) name index — then atomically swaps it into
    /// place and cleans up intermediates.
    fn serialize(&mut self) -> Result<PathBuf> {
        // Refuse to start if the target filesystem is nearly full; failing
        // mid-write would leave large partial temp files behind.
        if let Ok(free) = Self::free_bytes_at(&self.ix_dir) {
            const MIN_FREE: u64 = 100 * 1024 * 1024;
            if free < MIN_FREE {
                return Err(crate::error::Error::Io(std::io::Error::other(format!(
                    "insufficient disk space: {} MB free, need ≥100 MB (path: {})",
                    free / 1024 / 1024,
                    self.ix_dir.display()
                ))));
            }
        }
        // Spill anything still buffered so every posting lives in some run.
        self.flush_run()?;

        self.files_writer.flush()?;
        self.blooms_writer.flush()?;
        self.strings_writer.flush()?;

        // Bound the final merge fan-in: repeatedly merge groups of up to 128
        // runs into a new generation until at most 128 remain.
        while self.temp_runs.len() > 128 {
            let mut next_generation = Vec::new();
            for chunk in self.temp_runs.chunks(128) {
                let out_path = self.ix_dir.join(format!(
                    "shard.ix.merged.{}.{}",
                    next_generation.len(),
                    SystemTime::now()
                        .duration_since(UNIX_EPOCH)
                        .unwrap()
                        .as_micros()
                ));
                self.merge_to_run(chunk, &out_path)?;
                next_generation.push(out_path);
                for p in chunk {
                    if let Err(e) = fs::remove_file(p) {
                        tracing::warn!("Failed to cleanup temp run {}: {}", p.display(), e);
                    }
                }
            }
            self.temp_runs = next_generation;
        }

        let tmp_path = self.ix_dir.join("shard.ix.tmp");
        let final_path = self.ix_dir.join("shard.ix");
        let temp_trigrams_path = self.ix_dir.join("shard.ix.tmp.trigrams");

        let mut f = BufWriter::new(File::create(&tmp_path)?);
        // Reserve header space; it is back-filled once section offsets and
        // sizes are known.
        f.write_all(&[0u8; HEADER_SIZE])?;

        // --- File table section (copied from the temp file) ---
        let file_table_offset = self.align_to_8(&mut f)?;
        let mut files_reader = File::open(self.ix_dir.join("shard.ix.tmp.files"))?;
        std::io::copy(&mut files_reader, &mut f)?;
        let file_table_size = f.stream_position()? - file_table_offset;

        // --- Posting data section (filled by the merge below) ---
        self.align_to_8(&mut f)?;
        let posting_data_offset = f.stream_position()?;

        // Trigram-table rows are buffered to a side file while posting data
        // is written, then appended as their own section afterwards.
        let mut trigram_table_writer = BufWriter::new(File::create(&temp_trigrams_path)?);
        let mut global_trigram_count = 0u32;

        let mut runs = Vec::new();
        for path in &self.temp_runs {
            runs.push(RunIterator::new(path)?);
        }

        // Seed the min-heap (MergeItem reverses Ord) with each run's first
        // record; `current_items` holds the record bodies, indexed by run.
        let mut heap = BinaryHeap::new();
        let mut current_items = vec![None; runs.len()];

        for (i, run) in runs.iter_mut().enumerate() {
            if let Some(item) = run.next_trigram()? {
                heap.push(MergeItem {
                    tri: item.0,
                    run_idx: i,
                });
                current_items[i] = Some(item);
            }
        }

        // Standard k-way merge: accumulate entries while the popped trigram
        // matches `current_tri`; emit the combined posting list whenever the
        // trigram changes.
        let mut current_tri: Option<Trigram> = None;
        let mut merged_entries: Vec<PostingEntry> = Vec::new();

        while let Some(MergeItem { tri, run_idx }) = heap.pop() {
            if Some(tri) != current_tri {
                if let Some(t) = current_tri {
                    self.write_merged_posting(
                        &mut f,
                        &mut trigram_table_writer,
                        t,
                        posting_data_offset,
                        &mut merged_entries,
                    )?;
                    global_trigram_count += 1;
                    merged_entries.clear();
                }
                current_tri = Some(tri);
            }

            let item = current_items[run_idx].take().unwrap();
            merged_entries.extend(item.1);

            // Refill the heap from the run that was just consumed.
            if let Some(next_item) = runs[run_idx].next_trigram()? {
                heap.push(MergeItem {
                    tri: next_item.0,
                    run_idx,
                });
                current_items[run_idx] = Some(next_item);
            }
        }

        // Emit the final accumulated trigram, if any.
        if let Some(t) = current_tri {
            self.write_merged_posting(
                &mut f,
                &mut trigram_table_writer,
                t,
                posting_data_offset,
                &mut merged_entries,
            )?;
            global_trigram_count += 1;
        }

        self.stats.unique_trigrams = global_trigram_count as u64;
        let posting_data_size = f.stream_position()? - posting_data_offset;

        // --- Trigram table section (copied from the side file) ---
        self.align_to_8(&mut f)?;
        let trigram_table_offset = f.stream_position()?;
        trigram_table_writer.flush()?;
        drop(trigram_table_writer);

        let mut trigram_table_file = File::open(&temp_trigrams_path)?;
        std::io::copy(&mut trigram_table_file, &mut f)?;
        let trigram_table_size = f.stream_position()? - trigram_table_offset;

        // --- Bloom filter section ---
        self.align_to_8(&mut f)?;
        let bloom_offset = f.stream_position()?;
        let mut blooms_reader = File::open(self.ix_dir.join("shard.ix.tmp.blooms"))?;
        std::io::copy(&mut blooms_reader, &mut f)?;
        let bloom_size = f.stream_position()? - bloom_offset;

        // --- String pool section ---
        self.align_to_8(&mut f)?;
        let string_pool_offset = f.stream_position()?;
        let mut strings_reader = File::open(self.ix_dir.join("shard.ix.tmp.strings"))?;
        std::io::copy(&mut strings_reader, &mut f)?;
        let string_pool_size = f.stream_position()? - string_pool_offset;

        // Name index is not emitted yet; record an empty section.
        let name_index_offset = f.stream_position()?;
        let name_index_size = 0u64;

        // --- Header (fixed layout; offsets are absolute byte positions) ---
        let created_at = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_micros() as u64;
        let mut header_bytes = [0u8; HEADER_SIZE];
        header_bytes[0..4].copy_from_slice(&MAGIC);
        header_bytes[0x04..0x06].copy_from_slice(&VERSION_MAJOR.to_le_bytes());
        header_bytes[0x06..0x08].copy_from_slice(&VERSION_MINOR.to_le_bytes());
        header_bytes[0x08..0x10].copy_from_slice(
            &(flags::HAS_BLOOM_FILTERS
                | flags::HAS_CONTENT_HASHES
                | flags::POSTING_LISTS_CHECKSUMMED)
                .to_le_bytes(),
        );
        header_bytes[0x10..0x18].copy_from_slice(&created_at.to_le_bytes());
        header_bytes[0x18..0x20].copy_from_slice(&self.stats.bytes_scanned.to_le_bytes());
        header_bytes[0x20..0x24].copy_from_slice(&self.file_count.to_le_bytes());
        header_bytes[0x24..0x28].copy_from_slice(&(global_trigram_count).to_le_bytes());
        header_bytes[0x28..0x30].copy_from_slice(&file_table_offset.to_le_bytes());
        header_bytes[0x30..0x38].copy_from_slice(&file_table_size.to_le_bytes());
        header_bytes[0x38..0x40].copy_from_slice(&trigram_table_offset.to_le_bytes());
        header_bytes[0x40..0x48].copy_from_slice(&trigram_table_size.to_le_bytes());
        header_bytes[0x48..0x50].copy_from_slice(&posting_data_offset.to_le_bytes());
        header_bytes[0x50..0x58].copy_from_slice(&posting_data_size.to_le_bytes());
        header_bytes[0x58..0x60].copy_from_slice(&bloom_offset.to_le_bytes());
        header_bytes[0x60..0x68].copy_from_slice(&bloom_size.to_le_bytes());
        header_bytes[0x68..0x70].copy_from_slice(&string_pool_offset.to_le_bytes());
        header_bytes[0x70..0x78].copy_from_slice(&string_pool_size.to_le_bytes());
        header_bytes[0x78..0x80].copy_from_slice(&name_index_offset.to_le_bytes());
        header_bytes[0x80..0x88].copy_from_slice(&name_index_size.to_le_bytes());

        // CRC over everything that precedes the checksum field.
        let crc = crc32c::crc32c(&header_bytes[0..0xF8]);
        header_bytes[0xF8..0xFC].copy_from_slice(&crc.to_le_bytes());

        f.seek(SeekFrom::Start(0))?;
        f.write_all(&header_bytes)?;
        f.flush()?;
        drop(f);

        // Atomic-ish swap: back up any existing index, rename the new one
        // into place, then drop the backup on success.
        let backup_path = self.ix_dir.join("shard.ix.bak");
        let old_index_exists = final_path.exists();
        if old_index_exists {
            let _ = fs::remove_file(&backup_path);
            if let Err(e) = fs::rename(&final_path, &backup_path) {
                tracing::warn!("Failed to backup existing index: {}", e);
            }
        }
        fs::rename(&tmp_path, &final_path)?;
        if old_index_exists {
            let _ = fs::remove_file(&backup_path);
        }

        // Best-effort cleanup of all intermediates.
        let _ = fs::remove_file(self.ix_dir.join("shard.ix.tmp.files"));
        let _ = fs::remove_file(self.ix_dir.join("shard.ix.tmp.blooms"));
        let _ = fs::remove_file(self.ix_dir.join("shard.ix.tmp.strings"));
        let _ = fs::remove_file(&temp_trigrams_path);
        for path in &self.temp_runs {
            if let Err(e) = fs::remove_file(path) {
                tracing::warn!("Failed to cleanup temp run {}: {}", path.display(), e);
            }
        }
        self.temp_runs.clear();

        Ok(final_path)
    }
725
    /// Merges several run files into one larger run at `out_path`, combining
    /// the entries of equal trigrams. Used by `serialize` to bound the
    /// fan-in of the final merge.
    fn merge_to_run(&self, run_paths: &[PathBuf], out_path: &Path) -> Result<()> {
        let mut runs = Vec::new();
        for path in run_paths {
            runs.push(RunIterator::new(path)?);
        }
        // Seed the min-heap (MergeItem reverses Ord) with each run's first
        // record; `current_items` holds the record bodies, indexed by run.
        let mut heap = BinaryHeap::new();
        let mut current_items = vec![None; runs.len()];
        for (i, run) in runs.iter_mut().enumerate() {
            if let Some(item) = run.next_trigram()? {
                heap.push(MergeItem {
                    tri: item.0,
                    run_idx: i,
                });
                current_items[i] = Some(item);
            }
        }
        let mut out = BufWriter::new(File::create(out_path)?);
        // Accumulate entries while the popped trigram repeats; emit one
        // combined record each time the trigram changes.
        let mut current_tri: Option<Trigram> = None;
        let mut merged_entries: Vec<PostingEntry> = Vec::new();
        while let Some(MergeItem { tri, run_idx }) = heap.pop() {
            if Some(tri) != current_tri {
                if let Some(t) = current_tri {
                    self.write_run_entry(&mut out, t, &mut merged_entries)?;
                    merged_entries.clear();
                }
                current_tri = Some(tri);
            }
            let item = current_items[run_idx].take().unwrap();
            merged_entries.extend(item.1);
            // Refill the heap from the run that was just consumed.
            if let Some(next_item) = runs[run_idx].next_trigram()? {
                heap.push(MergeItem {
                    tri: next_item.0,
                    run_idx,
                });
                current_items[run_idx] = Some(next_item);
            }
        }
        // Emit the final accumulated trigram, if any.
        if let Some(t) = current_tri {
            self.write_run_entry(&mut out, t, &mut merged_entries)?;
        }
        out.flush()?;
        Ok(())
    }
769
770 fn write_run_entry<W: Write>(
771 &self,
772 w: &mut W,
773 tri: Trigram,
774 entries: &mut [PostingEntry],
775 ) -> Result<()> {
776 entries.sort_by_key(|e| e.file_id);
777 w.write_all(&tri.to_le_bytes())?;
778 w.write_all(&(entries.len() as u32).to_le_bytes())?;
779 for entry in entries {
780 w.write_all(&entry.file_id.to_le_bytes())?;
781 w.write_all(&(entry.offsets.len() as u32).to_le_bytes())?;
782 for off in &entry.offsets {
783 w.write_all(&off.to_le_bytes())?;
784 }
785 }
786 Ok(())
787 }
788
789 fn write_merged_posting<W: Write + Seek>(
790 &self,
791 f: &mut W,
792 table: &mut W,
793 tri: Trigram,
794 base_off: u64,
795 entries: &mut [PostingEntry],
796 ) -> Result<()> {
797 entries.sort_by_key(|e| e.file_id);
798 let count = entries.len() as u32;
799 let list = PostingList {
800 entries: entries.to_vec(),
801 };
802 let encoded = list.encode();
803 let offset = f.stream_position()? - base_off;
804 f.write_all(&encoded)?;
805 let abs_off = base_off + offset;
806 table.write_all(&tri.to_le_bytes())?;
807 table.write_all(&abs_off.to_le_bytes()[..6])?;
808 table.write_all(&(encoded.len() as u32).to_le_bytes())?;
809 table.write_all(&count.to_le_bytes())?;
810 table.write_all(&[0u8; 2])?;
811 Ok(())
812 }
813
814 fn align_to_8<W: Write + Seek>(&self, mut w: W) -> std::io::Result<u64> {
815 let pos = w.stream_position()?;
816 let padding = (8 - (pos % 8)) % 8;
817 if padding > 0 {
818 w.write_all(&vec![0u8; padding as usize])?;
819 }
820 w.stream_position()
821 }
822}