1use std::{
2 collections::{BTreeMap, HashMap},
3 path::{Path, PathBuf},
4 sync::{
5 Arc, RwLock,
6 atomic::{AtomicU64, Ordering},
7 },
8 thread::JoinHandle,
9};
10
11use bstr::ByteSlice;
12use fst::{Automaton as _, IntoStreamer as _, Streamer, automaton::Str};
13
14use thiserror::Error;
15
16mod common;
17pub use common::Kind;
18mod entry;
19pub use entry::FilesystemEntry;
20use entry::*;
21mod segmented_index;
22pub use segmented_index::compactor::*;
23use segmented_index::*;
24mod opstamp;
25use opstamp::*;
26use wal::Wal;
27mod search;
28mod tokenizer;
29mod wal;
30pub use search::{ScoringConfig, SearchOptions, SearchResult};
31
/// A crash-safe search index over filesystem entries.
///
/// Writes go to a write-ahead log plus an in-memory map; background threads
/// flush the map into immutable on-disk segments and merge segments together.
pub struct Index {
    // Directory holding the segments, WAL files, and temp files.
    path: PathBuf,
    // The on-disk segments; shared with the flush/compaction threads.
    base: Arc<RwLock<SegmentedIndex>>,
    // Monotonic source of operation sequence numbers (opstamps).
    next_op_seq: Arc<AtomicU64>,
    // Unflushed writes: path -> (volume, entry).
    mem_idx: RwLock<BTreeMap<String, (String, IndexEntry)>>,
    // Active write-ahead log backing `mem_idx`.
    wal: RwLock<Wal>,
    // Thresholds controlling when flushes and compactions run.
    compactor_config: segmented_index::compactor::CompactorConfig,
    // Handle of the in-flight background compaction, if any.
    compactor: Arc<RwLock<Option<JoinHandle<()>>>>,
    // Handle of the in-flight background flush, if any.
    flusher: Arc<RwLock<Option<JoinHandle<()>>>>,
    // Active prefix tombstones: (lowercased prefix, sequence number).
    prefix_tombstones: Arc<RwLock<Vec<(String, u64)>>>,
}
46
47impl Index {
48 pub fn open<P: AsRef<Path>>(path: P) -> Result<Self, IndexError> {
54 Self::open_with_config(path, CompactorConfig::default())
55 }
56
57 pub fn open_with_config<P: AsRef<Path>>(
63 path: P,
64 compactor_config: CompactorConfig,
65 ) -> Result<Self, IndexError> {
66 let base = SegmentedIndex::open(&path).map_err(IndexError::SegmentedIndex)?;
67
68 let base = Arc::new(RwLock::new(base));
69 let mut max_seq = 0u64;
70 let mut mem_idx = BTreeMap::new();
71
72 let mut prefix_tombstones = Vec::new();
73
74 let mut apply_replay = |replay_data: crate::wal::ReplayData| {
75 for (path, volume, entry) in replay_data.inserts {
76 max_seq = max_seq.max(entry.opstamp.sequence());
77 mem_idx.insert(path, (volume, entry));
78 }
79 for (prefix, seq) in replay_data.tombstones {
80 max_seq = max_seq.max(seq);
81 prefix_tombstones.push((prefix, seq));
82 }
83 };
84
85 let entries = path.as_ref().read_dir().map_err(IndexError::Io)?;
86
87 let mut flushing_wals = Vec::new();
88
89 for entry in entries {
91 if let Ok(e) = entry
92 && let Ok(file_type) = e.file_type()
93 && file_type.is_file()
94 && e.file_name().to_string_lossy().ends_with(".flushing.wal")
95 {
96 flushing_wals.push(e.path());
97 }
98 }
99 flushing_wals.sort_unstable();
100
101 for wal_path in flushing_wals {
102 let partial = Wal::replay(wal_path).map_err(IndexError::Io)?;
103
104 apply_replay(partial);
105 }
106
107 let wal_path = path.as_ref().join("journal.wal");
108
109 let recovered = Wal::replay(&wal_path).map_err(IndexError::Io)?;
110 apply_replay(recovered);
111
112 let next_op_seq = Arc::new(AtomicU64::new(max_seq + 1));
113
114 let wal = Wal::open(&wal_path).map_err(IndexError::Io)?;
115
116 Ok(Self {
117 path: path.as_ref().to_path_buf(),
118 base,
119 next_op_seq,
120 mem_idx: RwLock::new(mem_idx),
121 wal: RwLock::new(wal),
122 compactor_config,
123 compactor: Arc::new(RwLock::new(None)),
124 flusher: Arc::new(RwLock::new(None)),
125 prefix_tombstones: Arc::new(RwLock::new(prefix_tombstones)),
126 })
127 }
128
129 fn next_op_seq(&self) -> u64 {
130 self.next_op_seq
131 .fetch_add(1, std::sync::atomic::Ordering::SeqCst)
132 }
133
134 pub fn insert(&self, item: FilesystemEntry) -> Result<(), IndexError> {
136 let seq = self.next_op_seq();
137 let path_str = item.path.to_string_lossy().to_string();
138 let volume = item.volume;
139 let entry = IndexEntry {
140 opstamp: Opstamp::insertion(seq),
141 kind: item.kind,
142 last_modified: item.last_modified,
143 last_accessed: item.last_accessed,
144 };
145
146 {
147 let mut wal = self.wal.write().map_err(|_| IndexError::WriteLock)?;
148 wal.append(&path_str, &volume, &entry)
149 .map_err(IndexError::Io)?;
150 }
151
152 {
153 self.mem_idx
154 .write()
155 .map_err(|_| IndexError::WriteLock)?
156 .insert(path_str, (volume, entry));
157 }
158
159 if self.should_flush() {
160 let _ = self.trigger_flush();
161 }
162
163 Ok(())
164 }
165
166 pub fn delete(&self, item: &Path) -> Result<(), IndexError> {
167 let seq = self.next_op_seq();
168
169 let path_str = item.to_string_lossy().to_string();
170 let entry = IndexEntry {
171 opstamp: Opstamp::deletion(seq),
172 kind: Kind::File,
173 last_modified: 0,
174 last_accessed: 0,
175 };
176
177 {
178 let mut wal = self.wal.write().map_err(|_| IndexError::WriteLock)?;
179 wal.append(&path_str, "", &entry).map_err(IndexError::Io)?;
180 }
181
182 {
183 self.mem_idx
184 .write()
185 .map_err(|_| IndexError::WriteLock)?
186 .insert(path_str, ("".to_owned(), entry));
187 }
188
189 if self.should_flush() {
190 let _ = self.trigger_flush();
191 }
192
193 Ok(())
194 }
195
196 pub fn delete_prefix(&self, prefix: &str) -> Result<(), IndexError> {
197 let seq = self.next_op_seq.fetch_add(1, Ordering::SeqCst);
198 let prefix_lower = prefix.to_lowercase();
199 {
200 let mut tombstones = self
201 .prefix_tombstones
202 .write()
203 .map_err(|_| IndexError::WriteLock)?;
204 tombstones.push((prefix_lower.clone(), seq));
205 }
206
207 {
208 let mut wal = self.wal.write().map_err(|_| IndexError::WriteLock)?;
209
210 wal.write_prefix_tombstone(&prefix_lower, seq)?;
211 }
212
213 Ok(())
214 }
215
216 pub fn sync(&self) -> Result<(), IndexError> {
219 let mut wal = self.wal.write().map_err(|_| IndexError::WriteLock)?;
220 wal.flush().map_err(IndexError::Io)?;
221
222 Ok(())
223 }
224
    /// Searches for paths containing every query token (byte-wise substring
    /// match), optionally restricted to one volume.
    ///
    /// Candidates come from two passes: the in-memory index (recent writes)
    /// and each on-disk segment's token map. Per path, the entry with the
    /// highest opstamp wins; deletions and prefix-tombstoned paths are
    /// filtered out. Survivors are scored, sorted, and paginated by
    /// `offset`/`limit`.
    pub fn search(
        &self,
        query: &str,
        limit: usize,
        offset: usize,
        options: SearchOptions<'_>,
    ) -> Result<Vec<SearchResult>, IndexError> {
        let mut tokens = crate::tokenizer::tokenize(query);

        if tokens.is_empty() {
            return Ok(Vec::new());
        }

        // Longest tokens first: the most selective token shrinks the
        // per-segment intersection fastest.
        tokens.sort_by_key(|b| std::cmp::Reverse(b.len()));

        let segments = self.base.read().map_err(|_| IndexError::ReadLock)?;
        let mem = self.mem_idx.read().map_err(|_| IndexError::ReadLock)?;

        // path -> (volume, entry); always keeps the newest opstamp per path.
        let mut candidates: HashMap<String, (String, IndexEntry)> = HashMap::new();

        let required_matches = limit + offset;
        // At most this many candidates are fully scored (clamped to [500, 1000]).
        let scoring_cap = std::cmp::max(500, required_matches * 3).min(1000);

        // Once a segment's match set is at or below this size, remaining
        // tokens are skipped; the exact substring re-check further down
        // removes any documents that don't actually match every token.
        let short_circuit_threshold = std::cmp::max(5000, required_matches * 10);

        let active_tombstones = self
            .prefix_tombstones
            .read()
            .map_err(|_| IndexError::ReadLock)?
            .clone();

        // Pass 1: in-memory entries (not yet flushed to a segment).
        for (path, (volume, entry)) in mem.iter() {
            let path_bytes = path.as_bytes();

            if is_tombstoned(path_bytes, entry.opstamp.sequence(), &active_tombstones) {
                continue;
            }

            if let Some(filter) = options.volume_filter {
                if volume != filter {
                    continue;
                }
            }
            // Every token must appear somewhere in the path bytes.
            let matches_all = tokens
                .iter()
                .all(|t| path_bytes.find_iter(t.as_bytes()).next().is_some());
            if matches_all {
                candidates
                    .entry(path.clone())
                    .and_modify(|(current_volume, current_entry)| {
                        if entry.opstamp.sequence() > current_entry.opstamp.sequence() {
                            *current_entry = *entry;
                            *current_volume = volume.clone();
                        }
                    })
                    .or_insert((volume.clone(), *entry));
            }
        }

        // Pass 2: each on-disk segment.
        for segment in segments.segments() {
            // With a volume filter, seed the match set from the synthesized
            // volume token; a segment with no postings for that volume is
            // skipped entirely.
            let mut segment_doc_matches: Option<Vec<DocumentId>> =
                if let Some(vol) = options.volume_filter {
                    let vol_token = crate::tokenizer::synthesize_volume_token(&vol.to_lowercase());
                    let map = segment.as_ref().as_ref();
                    match map.get(&vol_token) {
                        Some(post_offset) => {
                            let mut docs = segment.read_posting_list(post_offset);
                            docs.sort_unstable();
                            docs.dedup();
                            Some(docs)
                        }
                        None => continue,
                    }
                } else {
                    None
                };

            for token in &tokens {
                // Stop intersecting once the set is already small enough.
                if let Some(existing) = &segment_doc_matches
                    && existing.len() <= short_circuit_threshold
                {
                    break;
                }
                // Prefix automaton over the token map: every indexed token
                // starting with `token` contributes its posting list.
                let matcher = Str::new(token).starts_with();

                let mut token_docs = Vec::new();
                let map = segment.as_ref().as_ref();
                let mut stream = map.search(&matcher).into_stream();

                while let Some((_, post_offset)) = stream.next() {
                    let docs = segment.read_posting_list(post_offset);
                    token_docs.extend(docs);

                    // No prior set to intersect against — cap the scan.
                    if segment_doc_matches.is_none() && token_docs.len() > short_circuit_threshold {
                        break;
                    }
                }

                // Sorted + deduped so `binary_search` below is valid.
                token_docs.sort_unstable();
                token_docs.dedup();

                // Intersect with the running match set (or start it).
                if let Some(mut existing) = segment_doc_matches {
                    existing.retain(|doc_id| token_docs.binary_search(doc_id).is_ok());
                    segment_doc_matches = Some(existing);
                } else {
                    segment_doc_matches = Some(token_docs);
                }

                if segment_doc_matches.as_ref().is_some_and(|m| m.is_empty()) {
                    break;
                }
            }

            if let Some(valid_docs) = segment_doc_matches {
                // Pull each doc's packed 16-byte metadata word from the mmap.
                let mut enriched_docs: Vec<u128> = Vec::with_capacity(valid_docs.len());
                let meta_mmap = segment.meta_map();

                for &doc_id in &valid_docs {
                    let byte_offset = (doc_id as usize) * size_of::<u128>();
                    let packed_bytes: [u8; 16] = meta_mmap
                        [byte_offset..byte_offset + size_of::<u128>()]
                        .try_into()
                        .expect("failed to unpack");
                    let packed_val = u128::from_le_bytes(packed_bytes);

                    enriched_docs.push(packed_val);
                }

                // Cheap pre-ranking (dir flag, then depth, then recency of
                // modification) so truncation keeps the likeliest winners
                // before the expensive document reads below.
                enriched_docs.sort_unstable_by(|&a, &b| {
                    let (_, a_modified_at, a_depth, a_dir) = SegmentedIndex::unpack_u128(a);
                    let (_, b_modified_at, b_depth, b_dir) = SegmentedIndex::unpack_u128(b);

                    b_dir
                        .cmp(&a_dir)
                        .then_with(|| a_depth.cmp(&b_depth))
                        .then_with(|| b_modified_at.cmp(&a_modified_at))
                });

                enriched_docs.truncate(scoring_cap);

                for packed_val in enriched_docs {
                    let (dat_offset, _, _, _) = SegmentedIndex::unpack_u128(packed_val);

                    if let Some((path, volume, entry)) = segment.read_document(dat_offset) {
                        let path_bytes = path.as_bytes();

                        if is_tombstoned(path_bytes, entry.opstamp.sequence(), &active_tombstones) {
                            continue;
                        }

                        // Exact substring re-check: the lookup above is
                        // prefix-based and short-circuited, so it can admit
                        // documents that don't contain every token.
                        let matches_all = tokens
                            .iter()
                            .all(|t| path_bytes.find_iter(t.as_bytes()).next().is_some());

                        if !matches_all {
                            continue;
                        }
                        candidates
                            .entry(path)
                            .and_modify(|(current_volume, current_entry)| {
                                if entry.opstamp.sequence() > current_entry.opstamp.sequence() {
                                    *current_entry = entry;
                                    *current_volume = volume.clone();
                                }
                            })
                            .or_insert((volume, entry));
                    }
                }
            }
        }

        // Drop deletion records; they only existed to shadow older entries.
        let mut results: Vec<_> = candidates
            .into_iter()
            .filter(|(_, (_, entry))| !entry.opstamp.is_deletion())
            .map(|(path, (volume, entry))| (path, volume, entry))
            .collect();

        // Keep only the `scoring_cap` most recently modified candidates.
        if results.len() > scoring_cap {
            results.select_nth_unstable_by(scoring_cap, |a, b| {
                b.2.last_modified.cmp(&a.2.last_modified)
            });
            results.truncate(scoring_cap);
        }

        let now_micros = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .expect("failed to get system time")
            .as_micros() as f64;

        let config = if let Some(config) = options.scoring {
            config
        } else {
            &ScoringConfig::default()
        };

        let mut scored: Vec<_> = results
            .into_iter()
            .map(|(path, volume, entry)| {
                let score = crate::search::compute_score(
                    config,
                    &path,
                    &tokens,
                    entry.last_modified,
                    entry.kind,
                    now_micros,
                );
                SearchResult {
                    path: PathBuf::from(path),
                    volume: volume,
                    kind: entry.kind,
                    last_modified: entry.last_modified,
                    last_accessed: entry.last_accessed,
                    score,
                }
            })
            .collect();

        // SearchResult's Ord implementation drives the final ranking.
        scored.sort();

        let paginated_results = scored.into_iter().skip(offset).take(limit).collect();

        Ok(paginated_results)
    }
459
460 pub fn force_compact_all(&self) -> Result<(), IndexError> {
464 if let Ok(mut flusher) = self.flusher.write()
465 && let Some(handle) = flusher.take()
466 {
467 log::debug!("Waiting for background flush to finish...");
468 let _ = handle.join();
469 }
470
471 if let Ok(mut compactor) = self.compactor.write()
472 && let Some(handle) = compactor.take()
473 {
474 log::debug!("Waiting for background compactor to finish...");
475 let _ = handle.join();
476 }
477
478 let snapshot = {
479 let base = self.base.read().map_err(|_| IndexError::ReadLock)?;
480 let segments = base.snapshot();
481
482 if segments.len() <= 1 {
484 log::debug!("Database is already fully compacted.");
485 return Ok(());
486 }
487 segments
488 };
489
490 log::debug!("Forcing full compaction of {} segments...", snapshot.len());
491
492 let compactor_seq = self.next_op_seq.fetch_add(1, Ordering::SeqCst);
493
494 let tmp_path = self.path.join(format!("{}.tmp", compactor_seq));
495
496 let snapshot_tombstones = {
497 let guard = self.prefix_tombstones.read().expect("lock poisoned");
498 guard.clone()
499 };
500
501 compactor::merge_segments(&snapshot, snapshot_tombstones, tmp_path.clone())
502 .map_err(|e| IndexError::Io(std::io::Error::other(e)))?;
503
504 let mut base_guard = self.base.write().map_err(|_| IndexError::WriteLock)?;
505 base_guard
506 .apply_compaction(&snapshot, tmp_path)
507 .map_err(|e| IndexError::Io(std::io::Error::other(e)))?;
508
509 log::debug!("Full compaction complete");
510 Ok(())
511 }
512
513 fn should_flush(&self) -> bool {
514 self.mem_idx.read().unwrap().len() > self.compactor_config.flush_threshold
515 || self.prefix_tombstones.read().unwrap().len()
516 > self.compactor_config.tombstone_threshold
517 }
518
    /// Moves the in-memory index into a new on-disk segment on a background
    /// thread, rotating the WAL first.
    ///
    /// No-op when a previous flush is still running or the in-memory index
    /// is empty.
    fn trigger_flush(&self) -> Result<(), IndexError> {
        // At most one background flush at a time.
        if let Some(ref flusher) = *self.flusher.read().expect("failed to read flusher")
            && !flusher.is_finished()
        {
            return Ok(());
        }
        // Hold both write locks while swapping state so concurrent writers
        // cannot interleave between the memtable snapshot and the WAL
        // rotation (lock order: mem_idx, then wal — matching insert/delete).
        let mut mem = self.mem_idx.write().expect("failed to lock memory");
        let mut wal = self.wal.write().expect("failed to lock wal");

        if mem.is_empty() {
            return Ok(());
        }

        // Take the memtable; subsequent writes go into a fresh map.
        let snapshot = std::mem::take(&mut *mem);
        let path = self.path.clone();
        let next_seq = self.next_op_seq();

        // The rotated ".flushing.wal" keeps the snapshot durable until the
        // segment is fully written; `open_with_config` replays it on startup
        // if the flush is interrupted.
        let flushing_path = path.join(format!("journal.{}.flushing.wal", next_seq));
        wal.rotate(&flushing_path).map_err(IndexError::Io)?;

        // Re-seed the fresh WAL with the active prefix tombstones so they
        // survive a crash that happens after rotation.
        let tombstones = self
            .prefix_tombstones
            .read()
            .map_err(|_| IndexError::ReadLock)?;
        for (prefix, seq) in tombstones.iter() {
            wal.write_prefix_tombstone(prefix, *seq)?;
        }

        // Release locks before spawning the worker thread.
        drop(tombstones);
        drop(wal);
        drop(mem);

        let base = Arc::clone(&self.base);
        let min_merge_count = self.compactor_config.min_merge_count;
        let compactor_lock = Arc::clone(&self.compactor);
        let op_seq = Arc::clone(&self.next_op_seq);
        let prefix_tombstones = Arc::clone(&self.prefix_tombstones);

        let flusher = std::thread::Builder::new()
            .name("minidex-flush".to_owned())
            .spawn(move || {
                let final_segment_path = path.join(format!("{}", next_seq));
                let tmp_segment_path = path.join(format!("{}.tmp", next_seq));

                {
                    let mut base_guard = base.write().expect("failed to lock base");

                    // Write to a temp path first so a failure mid-write never
                    // leaves a half-built segment under the final name.
                    if let Err(e) = base_guard.write_segment(
                        &tmp_segment_path,
                        snapshot
                            .into_iter()
                            .map(|(path, (volume, entry))| (path, volume, entry)),
                    ) {
                        log::error!("flush failed to write: {}", e);
                        let tmp_paths = Segment::paths_with_additional_extension(&tmp_segment_path);
                        Segment::remove_files(&tmp_paths);
                        return;
                    }

                    let tmp_paths = Segment::paths_with_additional_extension(&tmp_segment_path);

                    let final_paths = Segment::paths_with_additional_extension(&final_segment_path);

                    let _ = Segment::rename_files(&tmp_paths, &final_paths);
                    base_guard
                        .load(&final_segment_path)
                        .expect("failed to reload segment during flush");
                }

                // The data now lives in a segment; the rotated WAL is obsolete.
                if let Err(e) = std::fs::remove_file(&flushing_path) {
                    log::error!("failed to delete rotated WAL: {}", e);
                }

                // Opportunistically kick off a compaction once enough
                // segments have accumulated.
                let snapshot = {
                    let base = base.read().expect("failed to read-lock base");
                    if base.segments().count() <= min_merge_count {
                        return;
                    }

                    base.snapshot()
                };

                let mut compactor_guard = compactor_lock
                    .write()
                    .expect("failed to acquire compactor write-lock");
                // At most one background compaction at a time.
                if let Some(handle) = compactor_guard.as_ref()
                    && !handle.is_finished()
                {
                    return;
                }

                *compactor_guard = Self::compact(base, path, snapshot, prefix_tombstones, op_seq);
            })
            .map_err(IndexError::Io)?;

        *self.flusher.write().unwrap() = Some(flusher);
        Ok(())
    }
618
    /// Spawns a background thread that merges `snapshot`'s segments into one.
    ///
    /// Returns the thread's `JoinHandle`, or `None` when the snapshot is
    /// empty or the thread could not be spawned.
    fn compact(
        base: Arc<RwLock<SegmentedIndex>>,
        path: PathBuf,
        snapshot: Vec<Arc<Segment>>,
        prefix_tombstones: Arc<RwLock<Vec<(String, u64)>>>,
        next_op_seq: Arc<AtomicU64>,
    ) -> Option<JoinHandle<()>> {
        if snapshot.is_empty() {
            return None;
        }

        std::thread::Builder::new()
            .name("minidex-compactor".to_string())
            .spawn(move || {
                let next_seq = next_op_seq.fetch_add(1, Ordering::SeqCst);
                // Merge into a temp path; it is swapped in under the write
                // lock by `apply_compaction` below.
                let tmp_path = path.join(format!("{}.tmp", next_seq));

                log::debug!("Starting compaction with {} segments", snapshot.len());
                let snapshot_tombstones = { prefix_tombstones.read().unwrap().clone() };
                match compactor::merge_segments(&snapshot, snapshot_tombstones, tmp_path.clone()) {
                    Ok(compactor_seq) => {
                        let mut base_guard = base
                            .write()
                            .expect("failed to lock base for compaction apply");
                        if let Err(e) = base_guard.apply_compaction(&snapshot, tmp_path) {
                            log::error!("Failed to apply compaction: {}", e);
                        }
                        // Tombstones older than the merge point were applied
                        // during the merge and are no longer needed.
                        let mut tombstones = prefix_tombstones.write().unwrap();
                        tombstones.retain(|(_, seq)| *seq >= compactor_seq);
                        log::debug!("Compaction finished");
                    }
                    Err(e) => log::error!("Compaction failed: {}", e),
                }
            })
            .ok()
    }
655}
656
657impl Drop for Index {
658 fn drop(&mut self) {
659 let _ = self.sync();
660
661 if let Ok(mut flusher) = self.flusher.write()
662 && let Some(flusher) = flusher.take()
663 {
664 let _ = flusher.join();
665 }
666
667 if let Ok(mut compactor) = self.compactor.write()
668 && let Some(compactor) = compactor.take()
669 {
670 let _ = compactor.join();
671 }
672 }
673}
674
/// Errors surfaced by [`Index`] operations.
#[derive(Debug, Error)]
pub enum IndexError {
    /// The on-disk index could not be opened.
    #[error("failed to open index on disk: {0}")]
    Open(std::io::Error),
    /// A read lock could not be acquired (poisoned).
    #[error("failed to read lock data")]
    ReadLock,
    /// A write lock could not be acquired (poisoned).
    #[error("failed to write lock data")]
    WriteLock,
    /// An error from the underlying segmented index.
    #[error(transparent)]
    SegmentedIndex(SegmentedIndexError),
    /// A matching pattern failed to compile.
    #[error("failed to compile matching regex: {0}")]
    Regex(String),
    /// A filesystem-level I/O failure.
    #[error("io error: {0}")]
    Io(#[from] std::io::Error),
}
690
/// Returns `true` when `path_bytes` falls under any active prefix tombstone
/// whose sequence number is strictly newer than `sequence`.
///
/// Prefix comparison is ASCII-case-insensitive; `delete_prefix` stores
/// tombstone prefixes lowercased.
#[inline]
pub(crate) fn is_tombstoned(
    path_bytes: &[u8],
    sequence: u64,
    active_tombstones: &[(String, u64)],
) -> bool {
    for (prefix, stamp) in active_tombstones {
        let prefix_bytes = prefix.as_bytes();
        if sequence < *stamp
            && path_bytes.len() >= prefix_bytes.len()
            && path_bytes[..prefix_bytes.len()].eq_ignore_ascii_case(prefix_bytes)
        {
            return true;
        }
    }
    false
}