use crate::bloom::BloomFilter;
use crate::decompress::maybe_decompress;
use crate::error::Result;
use crate::format::*;
use crate::posting::{PostingEntry, PostingList};
use crate::trigram::{Extractor, Trigram};
use ignore::WalkBuilder;
use memmap2::Mmap;
use std::collections::{BinaryHeap, HashMap};
use std::fs::{self, File};
use std::io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use std::time::{Instant, SystemTime, UNIX_EPOCH};
use libc;
use llmosafe::{ResourceGuard, sift_perceptions, WorkingMemory};

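/// Streaming index builder.
///
/// Walks a source tree, extracts trigrams from each text file, buffers
/// postings in memory, and spills sorted runs to disk under memory pressure;
/// `serialize` then k-way merges the runs into the final `shard.ix`.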
pub struct Builder {
    root: PathBuf,
    ix_dir: PathBuf,
    file_count: u32,

    files_writer: BufWriter<File>,
    blooms_writer: BufWriter<File>,
    strings_writer: BufWriter<File>,

    postings: HashMap<Trigram, Vec<PostingEntry>>,
    postings_count: usize,
    temp_runs: Vec<PathBuf>,

    extractor: Extractor,
    stats: BuildStats,
    decompress: bool,
    resource_guard: Option<ResourceGuard>,
    cognitive_memory: WorkingMemory<128>,
    dead_ends: Vec<PathBuf>,
}

#[derive(Default, Debug)]
pub struct BuildStats {
    pub files_scanned: u64,
    pub files_skipped_binary: u64,
    pub files_skipped_size: u64,
    pub bytes_scanned: u64,
    pub unique_trigrams: u64,
}

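/// Sequential reader over a single sorted run file. Each record is laid out
/// as `trigram (u32 LE) | entry_count (u32 LE)` followed by `entry_count`
/// entries of `file_id (u32 LE) | offset_count (u32 LE) | offsets (u32 LE)`.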
struct RunIterator {
    file: BufReader<File>,
}

impl RunIterator {
    fn new(path: &Path) -> Result<Self> {
        let f = File::open(path)?;
        Ok(Self {
            file: BufReader::new(f),
        })
    }

    fn next_trigram(&mut self) -> Result<Option<(Trigram, Vec<PostingEntry>)>> {
        let mut tri_buf = [0u8; 4];
        if let Err(e) = self.file.read_exact(&mut tri_buf) {
            if e.kind() == std::io::ErrorKind::UnexpectedEof {
                return Ok(None);
            }
            return Err(e.into());
        }
        let tri = u32::from_le_bytes(tri_buf);

        let mut len_buf = [0u8; 4];
        self.file.read_exact(&mut len_buf)?;
        let entries_len = u32::from_le_bytes(len_buf) as usize;

        let mut entries = Vec::with_capacity(entries_len);
        for _ in 0..entries_len {
            self.file.read_exact(&mut len_buf)?;
            let file_id = u32::from_le_bytes(len_buf);

            self.file.read_exact(&mut len_buf)?;
            let offsets_len = u32::from_le_bytes(len_buf) as usize;

            let mut offsets = Vec::with_capacity(offsets_len);
            for _ in 0..offsets_len {
                self.file.read_exact(&mut len_buf)?;
                offsets.push(u32::from_le_bytes(len_buf));
            }
            entries.push(PostingEntry { file_id, offsets });
        }

        Ok(Some((tri, entries)))
    }
}

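/// Heap entry for the k-way merge of sorted runs. `Ord` is reversed so that
/// `BinaryHeap`, a max-heap, pops the smallest trigram first and the merge
/// proceeds in ascending trigram order.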
#[derive(Eq, PartialEq)]
struct MergeItem {
    tri: Trigram,
    run_idx: usize,
}

impl PartialOrd for MergeItem {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for MergeItem {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        other.tri.cmp(&self.tri)
    }
}

impl Builder {
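    /// Creates the `.ix` directory under `root` and opens the temporary
    /// section files (file table, bloom filters, string pool) that are later
    /// concatenated into the final shard.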
    pub fn new(root: &Path) -> Result<Self> {
        let ix_dir = root.join(".ix");
        fs::create_dir_all(&ix_dir)?;

        let files_tmp = ix_dir.join("shard.ix.tmp.files");
        let blooms_tmp = ix_dir.join("shard.ix.tmp.blooms");
        let strings_tmp = ix_dir.join("shard.ix.tmp.strings");

        let files_writer = BufWriter::new(File::create(&files_tmp)?);
        let blooms_writer = BufWriter::new(File::create(&blooms_tmp)?);
        let mut strings_writer = BufWriter::new(File::create(&strings_tmp)?);

        strings_writer.write_all(&1u32.to_le_bytes())?;
        strings_writer.write_all(&0u16.to_le_bytes())?;
        strings_writer.write_all(&0u16.to_le_bytes())?;
        strings_writer.write_all(&[0u8; 2])?;

        Ok(Self {
            root: root.to_owned(),
            ix_dir,
            file_count: 0,
            files_writer,
            blooms_writer,
            strings_writer,
            postings: HashMap::new(),
            postings_count: 0,
            temp_runs: Vec::new(),
            extractor: Extractor::new(),
            stats: BuildStats::default(),
            decompress: false,
            resource_guard: None,
            cognitive_memory: WorkingMemory::new(1000),
            dead_ends: Vec::new(),
        })
    }

    pub fn with_resource_guard(mut self, guard: ResourceGuard) -> Self {
        self.resource_guard = Some(guard);
        self
    }

    pub fn set_decompress(&mut self, decompress: bool) {
        self.decompress = decompress;
    }

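    /// Spills the in-memory posting map to a new sorted run file
    /// (`shard.ix.run.N`) and resets the in-memory size counter. Runs are
    /// merged back together during `serialize`.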
    fn flush_run(&mut self) -> Result<()> {
        if self.postings.is_empty() {
            return Ok(());
        }
        let old_postings = std::mem::take(&mut self.postings);
        let mut sorted: Vec<_> = old_postings.into_iter().collect();
        sorted.sort_unstable_by_key(|(t, _)| *t);

        let run_path = self.ix_dir.join(format!("shard.ix.run.{}", self.temp_runs.len()));
        let mut f = BufWriter::new(File::create(&run_path)?);

        for (tri, entries) in sorted {
            f.write_all(&tri.to_le_bytes())?;
            f.write_all(&(entries.len() as u32).to_le_bytes())?;
            for entry in entries {
                f.write_all(&entry.file_id.to_le_bytes())?;
                f.write_all(&(entry.offsets.len() as u32).to_le_bytes())?;
                for off in entry.offsets {
                    f.write_all(&off.to_le_bytes())?;
                }
            }
        }
        f.flush()?;

        self.temp_runs.push(run_path);
        self.postings_count = 0;
        Ok(())
    }

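    /// Walks the tree rooted at `self.root`, filtering out VCS/build/binary
    /// artifacts, indexes every remaining file, and flushes intermediate runs
    /// whenever the resource guard (or a 512 MB RSS fallback) reports memory
    /// pressure. Returns the path of the finished `shard.ix`.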
    pub fn build(&mut self) -> Result<PathBuf> {
        let start = Instant::now();
        let root = self.root.clone();

        if root.to_string_lossy() == "/" {
            tracing::warn!("LLMOSafe Advisory: Indexing root filesystem. Ensure adequate resource guards are in place.");
        }

        let walker = WalkBuilder::new(&root)
            .hidden(false)
            .git_ignore(true)
            .require_git(false)
            .add_custom_ignore_filename(".ixignore")
            .filter_entry(move |entry| {
                let path = entry.path();
                let name = path.file_name().and_then(|n| n.to_str()).unwrap_or("");

                if entry.file_type().map(|t| t.is_dir()).unwrap_or(false)
                    && (name == "lost+found" || name == ".git" || name == "node_modules" ||
                        name == "target" || name == "__pycache__" || name == ".tox" ||
                        name == ".venv" || name == "venv" || name == ".ix")
                {
                    return false;
                }

                if entry.file_type().map(|t| t.is_file()).unwrap_or(false) {
                    if let Ok(metadata) = entry.metadata()
                        && metadata.len() > 10 * 1024 * 1024
                    {
                        return false;
                    }
                    if name == "Cargo.lock" || name == "package-lock.json" || name == "pnpm-lock.yaml" ||
                        name == "shard.ix" || name == "shard.ix.tmp" || name.starts_with("shard.ix.")
                    {
                        return false;
                    }
                }

                if entry.file_type().map(|t| t.is_file()).unwrap_or(false) {
                    let ext = path.extension().and_then(|e| e.to_str()).unwrap_or("");
                    match ext {
                        "so" | "o" | "dylib" | "a" | "dll" | "exe" | "pyc" |
                        "jpg" | "png" | "gif" | "mp4" | "mp3" | "pdf" |
                        "zip" | "7z" | "rar" |
                        "sqlite" | "db" | "bin" => return false,
                        _ => {}
                    }
                    if name.ends_with(".tar.gz") {
                        return false;
                    }
                }
                true
            })
            .build();

        let mut files_processed = 0u64;
        for entry_res in walker {
            let entry = match entry_res {
                Ok(e) => e,
                Err(e) => {
                    let backtrack_path = match &e {
                        ignore::Error::Io(io_err) if io_err.raw_os_error() == Some(-7) => Some(None),
                        ignore::Error::WithPath { path, err } => {
                            if let ignore::Error::Io(io_err) = err.as_ref() {
                                if io_err.raw_os_error() == Some(-7) {
                                    Some(Some(path.clone()))
                                } else {
                                    None
                                }
                            } else {
                                None
                            }
                        }
                        _ => None,
                    };

                    if let Some(path_opt) = backtrack_path {
                        tracing::warn!("Immune Memory Triggered: Skipping path due to backtrack signal.");
                        if let Some(path) = path_opt {
                            self.dead_ends.push(path);
                        }
                    }
                    continue;
                }
            };

            if entry.file_type().map(|t| t.is_file()).unwrap_or(false) {
                self.process_file(entry.path().to_owned())?;
                files_processed += 1;

                if files_processed.is_multiple_of(250) {
                    if let Some(guard) = &self.resource_guard {
                        // A failed check means the guard reported memory
                        // pressure: flush the buffered postings and move on.
                        if guard.check().map(|_s: ::llmosafe::Synapse| ()).is_err() {
                            eprintln!("ixd: memory ceiling reached... flushing intermediate chunk ({} files processed)", files_processed);
                            self.flush_run()?;
                            continue;
                        }
                    } else {
                        if let Ok(rss) = Self::current_rss_bytes()
                            && rss > 512 * 1024 * 1024
                        {
                            eprintln!("ixd: RSS ceiling reached ({} MB) after {} files — flushing intermediate chunk",
                                rss / 1024 / 1024, files_processed);
                            self.flush_run()?;
                            continue;
                        }
                    }
                }
            }
        }

        let output_path = self.serialize()?;
        tracing::info!("Build completed in {:?}: {:?}", start.elapsed(), self.stats);
        Ok(output_path)
    }

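    /// Incremental updates are not implemented yet; the changed-file list is
    /// ignored and the whole tree is re-indexed.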
    pub fn update(&mut self, _changed_files: &[PathBuf]) -> Result<PathBuf> {
        self.build()
    }

    pub fn files_len(&self) -> usize {
        self.file_count as usize
    }

    pub fn trigrams_len(&self) -> usize {
        self.stats.unique_trigrams as usize
    }

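    /// Reads the process's resident set size from `/proc/self/status`
    /// (`VmRSS`, reported in kB). Linux-only; returns 0 if the field is
    /// missing.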
    fn current_rss_bytes() -> std::io::Result<u64> {
        let status = std::fs::read_to_string("/proc/self/status")?;
        for line in status.lines() {
            if let Some(rest) = line.strip_prefix("VmRSS:") {
                let kb: u64 = rest.split_whitespace().next()
                    .and_then(|s| s.parse().ok())
                    .unwrap_or(0);
                return Ok(kb * 1024);
            }
        }
        Ok(0)
    }

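    /// Returns the free space (in bytes) on the filesystem containing `path`,
    /// via `statvfs` (`f_bavail * f_frsize`). Unix-only.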
    fn free_bytes_at(path: &Path) -> std::io::Result<u64> {
        use std::os::unix::ffi::OsStrExt;
        let path_c = std::ffi::CString::new(path.as_os_str().as_bytes())
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e))?;
        let mut stat: libc::statvfs = unsafe { std::mem::zeroed() };
        let ret = unsafe { libc::statvfs(path_c.as_ptr(), &mut stat) };
        if ret != 0 {
            return Err(std::io::Error::last_os_error());
        }
        Ok(stat.f_bavail * stat.f_frsize)
    }

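    /// Indexes a single file: memory-maps it (optionally decompressing),
    /// skips binary or oversized content, runs the LLMOSafe perception sift,
    /// records its path in the string pool, builds its bloom filter, and
    /// appends its trigram postings to the in-memory map.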
    fn process_file(&mut self, path: PathBuf) -> Result<bool> {
        let metadata = match fs::metadata(&path) {
            Ok(m) => m,
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(false),
            Err(e) => return Err(e.into()),
        };
        let size = metadata.len();
        let mtime = metadata.modified()?.duration_since(UNIX_EPOCH).map(|d| d.as_nanos() as u64).unwrap_or(0);

        if size > 10 * 1024 * 1024 {
            self.stats.files_skipped_size += 1;
            return Ok(false);
        }

        let file = match File::open(&path) {
            Ok(f) => f,
            Err(e) if e.kind() == std::io::ErrorKind::NotFound
                || e.kind() == std::io::ErrorKind::PermissionDenied => return Ok(false),
            Err(e) => return Err(e.into()),
        };
        let mmap = unsafe { Mmap::map(&file)? };

        let raw_data = if self.decompress {
            if let Some(mut reader) = maybe_decompress(&path, &mmap)? {
                let mut buf = Vec::new();
                use std::io::Read;
                reader.by_ref().take(10 * 1024 * 1024).read_to_end(&mut buf)?;
                std::borrow::Cow::Owned(buf)
            } else {
                std::borrow::Cow::Borrowed(&mmap[..])
            }
        } else {
            std::borrow::Cow::Borrowed(&mmap[..])
        };

        let data = &raw_data[..];
        if is_binary(data) {
            self.stats.files_skipped_binary += 1;
            return Ok(false);
        }

        let sample_len = data.len().min(2048);
        let sample = String::from_utf8_lossy(&data[..sample_len]);
        let objective = "High-signal source code for semantic indexing";
        let sifted = sift_perceptions(&[sample.as_ref()], objective);

        if let Err(e) = self.cognitive_memory.update(sifted) {
            tracing::warn!("LLMOSafe Cognitive Guard rejection for {}: {:?}", path.display(), e);
            return Ok(false);
        }

        let content_hash = xxhash_rust::xxh64::xxh64(data, 0);
        let pairs = self.extractor.extract_with_offsets(data);

        let file_id = self.file_count;
        self.file_count += 1;

        let path_str = path.to_string_lossy();
        let path_bytes = path_str.as_bytes();
        let path_off = (self.strings_writer.stream_position()?) as u32;
        let path_len = path_bytes.len() as u16;

        self.strings_writer.write_all(&0u16.to_le_bytes())?;
        self.strings_writer.write_all(&path_len.to_le_bytes())?;
        self.strings_writer.write_all(path_bytes)?;

        let mut bloom = BloomFilter::new(256, 5);
        let mut trigram_count = 0u32;

        let mut i = 0;
        while i < pairs.len() && trigram_count < 20_000 {
            let tri = pairs[i].0;
            let mut j = i + 1;
            while j < pairs.len() && pairs[j].0 == tri {
                j += 1;
            }

            let take_count = (j - i).min(10_000);
            let offsets: Vec<u32> = pairs[i..i + take_count].iter().map(|p| p.1).collect();

            bloom.insert(tri);
            self.postings.entry(tri).or_default().push(PostingEntry {
                file_id,
                offsets,
            });
            self.postings_count += take_count + 8;

            trigram_count += 1;
            i = j;
        }

        bloom.serialize(&mut self.blooms_writer)?;

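        // The file-table record below locates this file's bloom filter by a
        // fixed 260-byte stride, which assumes `BloomFilter::serialize` always
        // emits exactly 260 bytes per filter.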
        let bloom_offset = file_id * 260;
        self.files_writer.write_all(&file_id.to_le_bytes())?;
        self.files_writer.write_all(&path_off.to_le_bytes())?;
        self.files_writer.write_all(&path_len.to_le_bytes())?;
        self.files_writer.write_all(&[FileStatus::Fresh as u8])?;
        self.files_writer.write_all(&[0u8])?;
        self.files_writer.write_all(&mtime.to_le_bytes())?;
        self.files_writer.write_all(&size.to_le_bytes())?;
        self.files_writer.write_all(&content_hash.to_le_bytes())?;
        self.files_writer.write_all(&trigram_count.to_le_bytes())?;
        self.files_writer.write_all(&bloom_offset.to_le_bytes())?;
        self.files_writer.write_all(&[0u8; 4])?;

        self.stats.files_scanned += 1;
        self.stats.bytes_scanned += size;

        if self.postings_count >= 500_000 {
            self.flush_run()?;
        }

        Ok(true)
    }

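    /// Assembles the final shard: caps the number of runs via intermediate
    /// merges, k-way merges the postings, concatenates the file table, trigram
    /// table, bloom filters, and string pool, writes the checksummed header,
    /// and atomically renames `shard.ix.tmp` to `shard.ix`.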
    fn serialize(&mut self) -> Result<PathBuf> {
        if let Ok(free) = Self::free_bytes_at(&self.ix_dir) {
            const MIN_FREE: u64 = 100 * 1024 * 1024;
            if free < MIN_FREE {
                return Err(crate::error::Error::Io(std::io::Error::other(
                    format!(
                        "insufficient disk space: {} MB free, need ≥100 MB (path: {})",
                        free / 1024 / 1024,
                        self.ix_dir.display()
                    ),
                )));
            }
        }
        self.flush_run()?;

        self.files_writer.flush()?;
        self.blooms_writer.flush()?;
        self.strings_writer.flush()?;

        while self.temp_runs.len() > 128 {
            let mut next_generation = Vec::new();
            for chunk in self.temp_runs.chunks(128) {
                let out_path = self.ix_dir.join(format!(
                    "shard.ix.merged.{}.{}",
                    next_generation.len(),
                    SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_micros()
                ));
                self.merge_to_run(chunk, &out_path)?;
                next_generation.push(out_path);
                for p in chunk { let _ = fs::remove_file(p); }
            }
            self.temp_runs = next_generation;
        }

        let tmp_path = self.ix_dir.join("shard.ix.tmp");
        let final_path = self.ix_dir.join("shard.ix");
        let temp_trigrams_path = self.ix_dir.join("shard.ix.tmp.trigrams");

        let mut f = BufWriter::new(File::create(&tmp_path)?);
        f.write_all(&[0u8; HEADER_SIZE])?;

        let file_table_offset = self.align_to_8(&mut f)?;
        let mut files_reader = File::open(self.ix_dir.join("shard.ix.tmp.files"))?;
        std::io::copy(&mut files_reader, &mut f)?;
        let file_table_size = f.stream_position()? - file_table_offset;

        self.align_to_8(&mut f)?;
        let posting_data_offset = f.stream_position()?;

        let mut trigram_table_writer = BufWriter::new(File::create(&temp_trigrams_path)?);
        let mut global_trigram_count = 0u32;

        let mut runs = Vec::new();
        for path in &self.temp_runs {
            runs.push(RunIterator::new(path)?);
        }

        let mut heap = BinaryHeap::new();
        let mut current_items = vec![None; runs.len()];

        for (i, run) in runs.iter_mut().enumerate() {
            if let Some(item) = run.next_trigram()? {
                heap.push(MergeItem { tri: item.0, run_idx: i });
                current_items[i] = Some(item);
            }
        }

        let mut current_tri: Option<Trigram> = None;
        let mut merged_entries: Vec<PostingEntry> = Vec::new();

        while let Some(MergeItem { tri, run_idx }) = heap.pop() {
            if Some(tri) != current_tri {
                if let Some(t) = current_tri {
                    self.write_merged_posting(&mut f, &mut trigram_table_writer, t, posting_data_offset, &mut merged_entries)?;
                    global_trigram_count += 1;
                    merged_entries.clear();
                }
                current_tri = Some(tri);
            }

            let item = current_items[run_idx].take().unwrap();
            merged_entries.extend(item.1);

            if let Some(next_item) = runs[run_idx].next_trigram()? {
                heap.push(MergeItem { tri: next_item.0, run_idx });
                current_items[run_idx] = Some(next_item);
            }
        }

        if let Some(t) = current_tri {
            self.write_merged_posting(&mut f, &mut trigram_table_writer, t, posting_data_offset, &mut merged_entries)?;
            global_trigram_count += 1;
        }

        self.stats.unique_trigrams = global_trigram_count as u64;
        let posting_data_size = f.stream_position()? - posting_data_offset;

        self.align_to_8(&mut f)?;
        let trigram_table_offset = f.stream_position()?;
        trigram_table_writer.flush()?;
        drop(trigram_table_writer);

        let mut trigram_table_file = File::open(&temp_trigrams_path)?;
        std::io::copy(&mut trigram_table_file, &mut f)?;
        let trigram_table_size = f.stream_position()? - trigram_table_offset;

        self.align_to_8(&mut f)?;
        let bloom_offset = f.stream_position()?;
        let mut blooms_reader = File::open(self.ix_dir.join("shard.ix.tmp.blooms"))?;
        std::io::copy(&mut blooms_reader, &mut f)?;
        let bloom_size = f.stream_position()? - bloom_offset;

        self.align_to_8(&mut f)?;
        let string_pool_offset = f.stream_position()?;
        let mut strings_reader = File::open(self.ix_dir.join("shard.ix.tmp.strings"))?;
        std::io::copy(&mut strings_reader, &mut f)?;
        let string_pool_size = f.stream_position()? - string_pool_offset;

        let name_index_offset = f.stream_position()?;
        let name_index_size = 0u64;

        let created_at = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_micros() as u64;
        let mut header_bytes = [0u8; HEADER_SIZE];
        header_bytes[0..4].copy_from_slice(&MAGIC);
        header_bytes[0x04..0x06].copy_from_slice(&VERSION_MAJOR.to_le_bytes());
        header_bytes[0x06..0x08].copy_from_slice(&VERSION_MINOR.to_le_bytes());
        header_bytes[0x08..0x10].copy_from_slice(&(flags::HAS_BLOOM_FILTERS | flags::HAS_CONTENT_HASHES | flags::POSTING_LISTS_CHECKSUMMED).to_le_bytes());
        header_bytes[0x10..0x18].copy_from_slice(&created_at.to_le_bytes());
        header_bytes[0x18..0x20].copy_from_slice(&self.stats.bytes_scanned.to_le_bytes());
        header_bytes[0x20..0x24].copy_from_slice(&self.file_count.to_le_bytes());
        header_bytes[0x24..0x28].copy_from_slice(&global_trigram_count.to_le_bytes());
        header_bytes[0x28..0x30].copy_from_slice(&file_table_offset.to_le_bytes());
        header_bytes[0x30..0x38].copy_from_slice(&file_table_size.to_le_bytes());
        header_bytes[0x38..0x40].copy_from_slice(&trigram_table_offset.to_le_bytes());
        header_bytes[0x40..0x48].copy_from_slice(&trigram_table_size.to_le_bytes());
        header_bytes[0x48..0x50].copy_from_slice(&posting_data_offset.to_le_bytes());
        header_bytes[0x50..0x58].copy_from_slice(&posting_data_size.to_le_bytes());
        header_bytes[0x58..0x60].copy_from_slice(&bloom_offset.to_le_bytes());
        header_bytes[0x60..0x68].copy_from_slice(&bloom_size.to_le_bytes());
        header_bytes[0x68..0x70].copy_from_slice(&string_pool_offset.to_le_bytes());
        header_bytes[0x70..0x78].copy_from_slice(&string_pool_size.to_le_bytes());
        header_bytes[0x78..0x80].copy_from_slice(&name_index_offset.to_le_bytes());
        header_bytes[0x80..0x88].copy_from_slice(&name_index_size.to_le_bytes());

        let crc = crc32c::crc32c(&header_bytes[0..0xF8]);
        header_bytes[0xF8..0xFC].copy_from_slice(&crc.to_le_bytes());

        f.seek(SeekFrom::Start(0))?;
        f.write_all(&header_bytes)?;
        f.flush()?;
        drop(f);

        fs::rename(&tmp_path, &final_path)?;

        let _ = fs::remove_file(self.ix_dir.join("shard.ix.tmp.files"));
        let _ = fs::remove_file(self.ix_dir.join("shard.ix.tmp.blooms"));
        let _ = fs::remove_file(self.ix_dir.join("shard.ix.tmp.strings"));
        let _ = fs::remove_file(&temp_trigrams_path);
        for path in &self.temp_runs { let _ = fs::remove_file(path); }
        self.temp_runs.clear();

        Ok(final_path)
    }

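    /// Merges a batch of run files into a single new run. Used to cap the
    /// fan-in of the final merge when more than 128 runs have accumulated.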
    fn merge_to_run(&self, run_paths: &[PathBuf], out_path: &Path) -> Result<()> {
        let mut runs = Vec::new();
        for path in run_paths { runs.push(RunIterator::new(path)?); }
        let mut heap = BinaryHeap::new();
        let mut current_items = vec![None; runs.len()];
        for (i, run) in runs.iter_mut().enumerate() {
            if let Some(item) = run.next_trigram()? {
                heap.push(MergeItem { tri: item.0, run_idx: i });
                current_items[i] = Some(item);
            }
        }
        let mut out = BufWriter::new(File::create(out_path)?);
        let mut current_tri: Option<Trigram> = None;
        let mut merged_entries: Vec<PostingEntry> = Vec::new();
        while let Some(MergeItem { tri, run_idx }) = heap.pop() {
            if Some(tri) != current_tri {
                if let Some(t) = current_tri {
                    self.write_run_entry(&mut out, t, &mut merged_entries)?;
                    merged_entries.clear();
                }
                current_tri = Some(tri);
            }
            let item = current_items[run_idx].take().unwrap();
            merged_entries.extend(item.1);
            if let Some(next_item) = runs[run_idx].next_trigram()? {
                heap.push(MergeItem { tri: next_item.0, run_idx });
                current_items[run_idx] = Some(next_item);
            }
        }
        if let Some(t) = current_tri { self.write_run_entry(&mut out, t, &mut merged_entries)?; }
        out.flush()?;
        Ok(())
    }

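    /// Writes one merged trigram record to an intermediate run file, using the
    /// same on-disk layout that `RunIterator` reads back.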
    fn write_run_entry<W: Write>(&self, w: &mut W, tri: Trigram, entries: &mut [PostingEntry]) -> Result<()> {
        entries.sort_by_key(|e| e.file_id);
        w.write_all(&tri.to_le_bytes())?;
        w.write_all(&(entries.len() as u32).to_le_bytes())?;
        for entry in entries {
            w.write_all(&entry.file_id.to_le_bytes())?;
            w.write_all(&(entry.offsets.len() as u32).to_le_bytes())?;
            for off in &entry.offsets { w.write_all(&off.to_le_bytes())?; }
        }
        Ok(())
    }

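    /// Encodes one merged posting list into the final shard at the current
    /// write position and appends its trigram-table entry: trigram (u32),
    /// 48-bit absolute offset, encoded length (u32), entry count (u32), and
    /// two padding bytes.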
    fn write_merged_posting<W: Write + Seek>(&self, f: &mut W, table: &mut W, tri: Trigram, base_off: u64, entries: &mut [PostingEntry]) -> Result<()> {
        entries.sort_by_key(|e| e.file_id);
        let count = entries.len() as u32;
        let list = PostingList { entries: entries.to_vec() };
        let encoded = list.encode();
        let offset = f.stream_position()? - base_off;
        f.write_all(&encoded)?;
        let abs_off = base_off + offset;
        table.write_all(&tri.to_le_bytes())?;
        table.write_all(&abs_off.to_le_bytes()[..6])?;
        table.write_all(&(encoded.len() as u32).to_le_bytes())?;
        table.write_all(&count.to_le_bytes())?;
        table.write_all(&[0u8; 2])?;
        Ok(())
    }

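    /// Pads the writer with zero bytes up to the next 8-byte boundary and
    /// returns the aligned stream position.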
    fn align_to_8<W: Write + Seek>(&self, mut w: W) -> std::io::Result<u64> {
        let pos = w.stream_position()?;
        let padding = (8 - (pos % 8)) % 8;
        if padding > 0 { w.write_all(&vec![0u8; padding as usize])?; }
        w.stream_position()
    }
}