1use std::{
2 collections::BTreeMap,
3 path::{Path, PathBuf},
4 sync::{
5 Arc, RwLock,
6 atomic::{AtomicU64, Ordering},
7 },
8 thread::JoinHandle,
9};
10
11use common::is_tombstoned;
12use fst::{Automaton as _, IntoStreamer as _, Streamer, automaton::Str};
13
14use thiserror::Error;
15
16mod collector;
17mod common;
18mod leb128;
19use collector::*;
20pub use common::{Kind, VolumeType, category};
21mod entry;
22pub use entry::FilesystemEntry;
23use entry::*;
24mod segmented_index;
25pub use segmented_index::compactor::*;
26use segmented_index::*;
27mod opstamp;
28use opstamp::*;
29use wal::Wal;
30mod search;
31mod tokenizer;
32pub use tokenizer::tokenize;
33mod wal;
34pub use search::{ScoringConfig, ScoringInputs, ScoringWeights, SearchOptions, SearchResult};
35
36pub type Tombstone = (Option<String>, String, u64);
37
/// LSM-style filesystem index: an in-memory write buffer backed by a WAL,
/// flushed to immutable on-disk segments that are merged by a background
/// compactor.
pub struct Index {
    // Directory holding the WAL, segment files and temporary merge output.
    path: PathBuf,
    // On-disk segments (the persistent "base" of the LSM tree).
    base: Arc<RwLock<SegmentedIndex>>,
    // Monotonic sequence used to stamp every operation (insert/delete/tombstone).
    next_op_seq: Arc<AtomicU64>,
    // Write buffer: path -> (volume name, entry). Later ops overwrite earlier
    // ones per path.
    mem_idx: RwLock<BTreeMap<String, (String, IndexEntry)>>,
    // Write-ahead log backing `mem_idx`; replayed on open for crash recovery.
    wal: RwLock<Wal>,
    compactor_config: segmented_index::compactor::CompactorConfig,
    // Handle of the in-flight background compaction thread, if any.
    compactor: Arc<RwLock<Option<JoinHandle<()>>>>,
    // Handle of the in-flight background flush thread, if any.
    flusher: Arc<RwLock<Option<JoinHandle<()>>>>,
    // Copy-on-write list of (volume, normalized prefix, seq) deletions applied
    // at read time; entries are pruned once compaction folds them in.
    prefix_tombstones: Arc<RwLock<Arc<Vec<Tombstone>>>>,
}
52
53impl Index {
    /// Opens (or creates) an index at `path` with the default
    /// [`CompactorConfig`].
    ///
    /// See [`Index::open_with_config`] for the recovery procedure and errors.
    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self, IndexError> {
        Self::open_with_config(path, CompactorConfig::default())
    }
62
    /// Opens (or creates) an index at `path` with an explicit compactor
    /// configuration.
    ///
    /// Recovery order matters: segments are loaded first, then any orphaned
    /// `*.flushing.wal` files (rotated WALs whose flush never completed) are
    /// replayed in sorted filename order, and finally the live `journal.wal`.
    /// Replays apply per path in order, so the newest operation wins.
    ///
    /// # Errors
    /// `IndexError::SegmentedIndex` if segment loading fails, `IndexError::Io`
    /// for directory or WAL I/O failures.
    pub fn open_with_config<P: AsRef<Path>>(
        path: P,
        compactor_config: CompactorConfig,
    ) -> Result<Self, IndexError> {
        let base = SegmentedIndex::open(&path).map_err(IndexError::SegmentedIndex)?;

        let base = Arc::new(RwLock::new(base));
        // Track the highest sequence seen so new operations continue after it.
        let mut max_seq = 0u64;
        let mut mem_idx = BTreeMap::new();

        let mut prefix_tombstones = Vec::new();

        let mut apply_replay = |replay_data: crate::wal::ReplayData| {
            for (path, volume, entry) in replay_data.inserts {
                max_seq = max_seq.max(entry.opstamp.sequence());
                mem_idx.insert(path, (volume, entry));
            }
            for (volume, prefix, seq) in replay_data.tombstones {
                max_seq = max_seq.max(seq);
                prefix_tombstones.push((volume, prefix, seq));
            }
        };

        let entries = path.as_ref().read_dir().map_err(IndexError::Io)?;

        let mut flushing_wals = Vec::new();

        for entry in entries {
            // Unreadable directory entries are skipped rather than failing open.
            if let Ok(e) = entry
                && let Ok(file_type) = e.file_type()
                && file_type.is_file()
                && e.file_name().to_string_lossy().ends_with(".flushing.wal")
            {
                flushing_wals.push(e.path());
            }
        }
        // Sort so rotated WALs replay in rotation order (filenames embed the
        // sequence number — see `trigger_flush`).
        flushing_wals.sort_unstable();

        for wal_path in flushing_wals {
            let partial = Wal::replay(wal_path).map_err(IndexError::Io)?;

            apply_replay(partial);
        }

        let wal_path = path.as_ref().join("journal.wal");

        let recovered = Wal::replay(&wal_path).map_err(IndexError::Io)?;
        apply_replay(recovered);

        let next_op_seq = Arc::new(AtomicU64::new(max_seq + 1));

        let wal = Wal::open(&wal_path).map_err(IndexError::Io)?;

        Ok(Self {
            path: path.as_ref().to_path_buf(),
            base,
            next_op_seq,
            mem_idx: RwLock::new(mem_idx),
            wal: RwLock::new(wal),
            compactor_config,
            compactor: Arc::new(RwLock::new(None)),
            flusher: Arc::new(RwLock::new(None)),
            prefix_tombstones: Arc::new(RwLock::new(Arc::new(prefix_tombstones))),
        })
    }
134
135 fn next_op_seq(&self) -> u64 {
136 self.next_op_seq
137 .fetch_add(1, std::sync::atomic::Ordering::SeqCst)
138 }
139
    /// Inserts (or updates) a filesystem entry.
    ///
    /// The entry is appended to the WAL before it becomes visible in the
    /// in-memory index, so an acknowledged insert survives a crash. May
    /// trigger a background flush when thresholds are exceeded.
    ///
    /// # Errors
    /// `IndexError::ReadLock`/`WriteLock` on lock poisoning, `IndexError::Io`
    /// if the WAL append fails.
    pub fn insert(&self, item: FilesystemEntry) -> Result<(), IndexError> {
        // Backpressure: if the memtable reaches 3x the flush threshold, wait
        // out any in-flight flush and start a new one before accepting more.
        let threshold = self.compactor_config.flush_threshold * 3;

        if self.mem_idx.read().map_err(|_| IndexError::ReadLock)?.len() > threshold {
            // Take the handle out of its slot first so the lock is not held
            // while joining.
            let flusher = {
                self.flusher
                    .write()
                    .map_err(|_| IndexError::WriteLock)?
                    .take()
            };

            if let Some(handle) = flusher {
                let _ = handle.join();
            }

            // Best-effort: a failed flush trigger must not fail the insert.
            let _ = self.trigger_flush();
        }

        let seq = self.next_op_seq();
        let path_str = item.path.to_string_lossy().to_string();
        let volume = item.volume;
        let entry = IndexEntry {
            opstamp: Opstamp::insertion(seq),
            kind: item.kind,
            last_modified: item.last_modified,
            last_accessed: item.last_accessed,
            category: item.category,
            volume_type: item.volume_type,
        };

        // WAL first (durability), then the memtable (visibility).
        {
            let mut wal = self.wal.write().map_err(|_| IndexError::WriteLock)?;
            wal.append(&path_str, &volume, &entry)
                .map_err(IndexError::Io)?;
        }

        {
            self.mem_idx
                .write()
                .map_err(|_| IndexError::WriteLock)?
                .insert(path_str, (volume, entry));
        }

        if self.should_flush() {
            let _ = self.trigger_flush();
        }

        Ok(())
    }
192
    /// Marks a single path as deleted.
    ///
    /// Deletion is modeled as writing an entry whose opstamp is a deletion
    /// marker; the remaining fields are placeholders and the volume is the
    /// empty string. (NOTE(review): how the empty volume interacts with
    /// per-volume entries at merge time is decided by the collector — confirm
    /// against `LsmCollector`.)
    ///
    /// # Errors
    /// `IndexError::WriteLock` on lock poisoning, `IndexError::Io` if the WAL
    /// append fails.
    pub fn delete(&self, item: &Path) -> Result<(), IndexError> {
        let seq = self.next_op_seq();

        let path_str = item.to_string_lossy().to_string();
        // Placeholder metadata; only the deletion opstamp is meaningful.
        let entry = IndexEntry {
            opstamp: Opstamp::deletion(seq),
            kind: Kind::File,
            last_modified: 0,
            last_accessed: 0,
            category: 0,
            volume_type: common::VolumeType::Local,
        };

        // WAL first, then the memtable — same ordering as `insert`.
        {
            let mut wal = self.wal.write().map_err(|_| IndexError::WriteLock)?;
            wal.append(&path_str, "", &entry).map_err(IndexError::Io)?;
        }

        {
            self.mem_idx
                .write()
                .map_err(|_| IndexError::WriteLock)?
                .insert(path_str, ("".to_owned(), entry));
        }

        if self.should_flush() {
            let _ = self.trigger_flush();
        }

        Ok(())
    }
224
    /// Tombstones every entry (on any volume) whose path starts with `prefix`.
    ///
    /// Convenience wrapper around [`Index::delete_by_volume_name`] with no
    /// volume filter.
    pub fn delete_prefix(&self, prefix: &str) -> Result<(), IndexError> {
        self.delete_by_volume_name(None, prefix)
    }
229
230 pub fn delete_by_volume_name(
234 &self,
235 volume: Option<&str>,
236 prefix: &str,
237 ) -> Result<(), IndexError> {
238 let seq = self.next_op_seq.fetch_add(1, Ordering::SeqCst);
239 let normalized_prefix = prefix
240 .replace(['/', '\\'], std::path::MAIN_SEPARATOR_STR)
241 .to_lowercase();
242 {
243 let mut tombstones = self
244 .prefix_tombstones
245 .write()
246 .map_err(|_| IndexError::WriteLock)?;
247
248 Arc::make_mut(&mut tombstones).push((
249 volume.map(|s| s.to_string()),
250 normalized_prefix.clone(),
251 seq,
252 ));
253 }
254
255 {
256 let mut wal = self.wal.write().map_err(|_| IndexError::WriteLock)?;
257
258 wal.write_prefix_tombstone(volume, &normalized_prefix, seq)?;
259 }
260
261 Ok(())
262 }
263
264 pub fn sync(&self) -> Result<(), IndexError> {
267 let mut wal = self.wal.write().map_err(|_| IndexError::WriteLock)?;
268 wal.flush().map_err(IndexError::Io)?;
269
270 Ok(())
271 }
272
273 pub fn search(
276 &self,
277 query: &str,
278 limit: usize,
279 offset: usize,
280 options: SearchOptions<'_>,
281 ) -> Result<Vec<SearchResult>, IndexError> {
282 let mut tokens = crate::tokenizer::tokenize(query);
283
284 if tokens.is_empty() {
285 return Ok(Vec::new());
286 }
287
288 let query_lower = query.to_lowercase();
289 let raw_query_tokens: Vec<&str> = query_lower
290 .split(|c: char| !c.is_alphanumeric())
291 .filter(|s| !s.is_empty())
292 .collect();
293
294 tokens.sort_by_key(|b| std::cmp::Reverse(b.len()));
295
296 let segments = self.base.read().map_err(|_| IndexError::ReadLock)?;
297 let mem = self.mem_idx.read().map_err(|_| IndexError::ReadLock)?;
298
299 let required_matches = limit + offset;
300 let scoring_cap = std::cmp::max(500, required_matches * 3).min(1000);
301
302 let active_tombstones = self
303 .prefix_tombstones
304 .read()
305 .map_err(|_| IndexError::ReadLock)?
306 .clone();
307
308 let mut collector = LsmCollector::new(&active_tombstones);
309
310 let volume_type_mask = Self::compile_allowed_volume_mask(options.volume_type);
311
312 for (path, (volume, entry)) in mem.iter() {
314 let path_bytes = path.as_bytes();
315
316 if let Some(filter) = options.volume_name
317 && volume != filter
318 {
319 continue;
320 }
321
322 if let Some(category) = options.category
323 && entry.category & category == 0
324 {
325 continue;
326 }
327
328 if let Some(kind) = options.kind
329 && entry.kind != kind
330 {
331 continue;
332 }
333
334 if (volume_type_mask & (1 << entry.volume_type as u8)) == 0 {
335 continue;
336 }
337
338 let matches_all = if path.is_ascii() {
339 tokens.iter().all(|t| {
340 let token_bytes = t.as_bytes();
341 if path_bytes.len() < token_bytes.len() {
342 return false;
343 }
344 path_bytes
345 .windows(token_bytes.len())
346 .enumerate()
347 .any(|(idx, window)| {
348 if window.eq_ignore_ascii_case(token_bytes) {
349 if idx == 0 {
350 true
351 } else {
352 !path_bytes[idx - 1].is_ascii_alphanumeric()
353 }
354 } else {
355 false
356 }
357 })
358 })
359 } else {
360 let folded_path = crate::tokenizer::fold_path(path);
361 tokens.iter().all(|t| {
362 folded_path.match_indices(t.as_str()).any(|(idx, _)| {
363 if idx == 0 {
364 true
365 } else {
366 !folded_path[..idx].chars().last().unwrap().is_alphanumeric()
367 }
368 })
369 })
370 };
371
372 if matches_all {
373 collector.insert(path.to_string(), volume.clone(), *entry);
374 }
375 }
376
377 let mut token_docs = Vec::new();
379 let mut current_matches = Vec::new();
380 let mut swap_buffer = Vec::new();
381
382 let vol_token = options
383 .volume_name
384 .map(|vol| crate::tokenizer::synthesize_volume_token(&vol.to_lowercase()));
385
386 for segment in segments.segments() {
387 current_matches.clear();
388 let mut first_token = true;
389 let mut valid_matches = true;
390
391 if let Some(ref vol_token) = vol_token {
392 let map = segment.as_ref().as_ref();
393 if let Some(post_offset) = map.get(vol_token) {
394 segment.append_posting_list(post_offset, &mut current_matches);
395 current_matches.sort_unstable();
396 current_matches.dedup();
397 first_token = false;
398 } else {
399 continue;
400 }
401 }
402
403 for token in &tokens {
404 if !first_token && current_matches.is_empty() {
406 valid_matches = false;
407 break;
408 }
409
410 let matcher = Str::new(token).starts_with();
411
412 token_docs.clear();
413 let map = segment.as_ref().as_ref();
414 let mut stream = map.search(&matcher).into_stream();
415
416 while let Some((_, post_offset)) = stream.next() {
417 segment.append_posting_list(post_offset, &mut token_docs);
418 }
419
420 token_docs.sort_unstable();
421 token_docs.dedup();
422
423 if first_token {
424 std::mem::swap(&mut current_matches, &mut token_docs);
425 first_token = false;
426 } else {
427 let t_len = token_docs.len();
428 let c_len = current_matches.len();
429
430 if c_len * 10 < t_len || t_len * 10 < c_len {
431 if c_len > t_len {
432 swap_buffer.clear();
434
435 for &doc_id in &token_docs {
436 if current_matches.binary_search(&doc_id).is_ok() {
437 swap_buffer.push(doc_id);
438 }
439 }
440 std::mem::swap(&mut current_matches, &mut swap_buffer);
441 } else {
442 current_matches
443 .retain(|doc_id| token_docs.binary_search(doc_id).is_ok());
444 }
445 } else {
446 let mut j = 0;
448 current_matches.retain(|&doc_id| {
449 while j < t_len && token_docs[j] < doc_id {
450 j += 1;
451 }
452 j < t_len && token_docs[j] == doc_id
453 })
454 }
455 }
456 }
457
458 if valid_matches && !current_matches.is_empty() {
459 let valid_docs = ¤t_matches;
460 let mut enriched_docs: Vec<u128> = Vec::with_capacity(valid_docs.len());
461 let meta_mmap = segment.meta_map();
462
463 for &doc_id in valid_docs {
464 let byte_offset = (doc_id as usize) * size_of::<u128>();
465 let packed_bytes: [u8; 16] = meta_mmap
466 [byte_offset..byte_offset + size_of::<u128>()]
467 .try_into()
468 .expect("failed to unpack");
469 let packed_val = u128::from_le_bytes(packed_bytes);
470
471 let (_, _, _, _, is_dir, doc_category, vol_type) =
472 SegmentedIndex::unpack_u128(packed_val);
473
474 if let Some(kind) = options.kind {
476 let is_target_dir = kind == Kind::Directory;
477 if is_dir != is_target_dir {
478 continue;
479 }
480 }
481
482 if let Some(category) = options.category
484 && doc_category & category == 0
485 {
486 continue;
487 }
488
489 if (volume_type_mask & (1 << vol_type)) == 0 {
491 continue;
492 }
493
494 enriched_docs.push(packed_val);
495 }
496
497 if enriched_docs.len() > scoring_cap {
498 enriched_docs.select_nth_unstable_by(scoring_cap, |&a, &b| {
500 let (a_off, a_modified_at, a_accessed_at, a_depth, a_dir, _, _) =
501 SegmentedIndex::unpack_u128(a);
502 let (b_off, b_modified_at, b_accessed_at, b_depth, b_dir, _, _) =
503 SegmentedIndex::unpack_u128(b);
504
505 let a_recent = a_modified_at.max(a_accessed_at);
506 let b_recent = b_modified_at.max(b_accessed_at);
507
508 b_dir
509 .cmp(&a_dir)
510 .then_with(|| a_depth.cmp(&b_depth))
511 .then_with(|| b_recent.cmp(&a_recent))
512 .then_with(|| a_off.cmp(&b_off))
513 });
514
515 enriched_docs.truncate(scoring_cap);
516 }
517
518 enriched_docs.sort_unstable_by_key(|&packed| {
520 let (dat_offset, _, _, _, _, _, _) = SegmentedIndex::unpack_u128(packed);
521 dat_offset
522 });
523
524 for packed_val in enriched_docs {
525 let (dat_offset, _, _, _, _, _, _) = SegmentedIndex::unpack_u128(packed_val);
526
527 if let Some((path, volume, entry)) = segment.read_document(dat_offset) {
528 collector.insert(path, volume, entry);
529 }
530 }
531 }
532 }
533
534 let mut results: Vec<_> = collector.finish().collect();
535
536 if results.len() > scoring_cap {
538 results.select_nth_unstable_by(scoring_cap, |a, b| {
539 let a_recent = a.2.last_modified.max(a.2.last_accessed);
540 let b_recent = b.2.last_modified.max(b.2.last_accessed);
541
542 b_recent.cmp(&a_recent).then_with(|| a.0.cmp(&b.0))
543 });
544 results.truncate(scoring_cap);
545 }
546
547 let now_micros = std::time::SystemTime::now()
548 .duration_since(std::time::UNIX_EPOCH)
549 .expect("failed to get system time")
550 .as_micros() as f64;
551
552 let config = if let Some(config) = options.scoring {
553 config
554 } else {
555 &ScoringConfig::default()
556 };
557
558 let weights = config.weights.unwrap_or_default();
559 let mut scored: Vec<_> = results
560 .into_iter()
561 .map(|(path, volume, entry)| {
562 let inputs = ScoringInputs {
563 path: &path,
564 query_tokens: &tokens,
565 raw_query_tokens: &raw_query_tokens,
566 last_modified: entry.last_modified,
567 last_accessed: entry.last_accessed,
568 kind: entry.kind,
569 now_micros,
570 };
571
572 let score = (config.scoring_fn)(&weights, &inputs);
573
574 SearchResult {
575 path: PathBuf::from(path),
576 volume,
577 volume_type: entry.volume_type,
578 kind: entry.kind,
579 last_modified: entry.last_modified,
580 last_accessed: entry.last_accessed,
581 category: entry.category,
582 score,
583 }
584 })
585 .collect();
586
587 scored.sort();
588
589 let paginated_results = scored.into_iter().skip(offset).take(limit).collect();
590
591 Ok(paginated_results)
592 }
593
594 pub fn recent_files(
596 &self,
597 since: u64, limit: usize,
599 offset: usize,
600 options: SearchOptions<'_>,
601 ) -> Result<Vec<SearchResult>, IndexError> {
602 let segments = self.base.read().unwrap();
603 let mem = self.mem_idx.read().unwrap();
604
605 let active_tombstones = self
606 .prefix_tombstones
607 .read()
608 .map_err(|_| IndexError::ReadLock)?
609 .clone();
610
611 let mut collector = LsmCollector::new(&active_tombstones);
612
613 let volume_type_mask = Self::compile_allowed_volume_mask(options.volume_type);
614
615 for (path, (volume, entry)) in mem.iter() {
616 if entry.last_accessed >= since {
617 if let Some(filter) = options.volume_name
618 && volume != filter
619 {
620 continue;
621 }
622 if let Some(category) = options.category
623 && entry.category & category == 0
624 {
625 continue;
626 }
627 if let Some(kind) = options.kind
628 && entry.kind != kind
629 {
630 continue;
631 }
632 if (volume_type_mask & (1 << entry.volume_type as u8)) == 0 {
633 continue;
634 }
635 collector.insert(path.clone(), volume.clone(), *entry);
636 }
637 }
638
639 let required_matches = offset + limit;
640 let disk_cap = required_matches + 500;
642
643 let mut disk_candidates: Vec<(&std::sync::Arc<Segment>, u128)> = Vec::new();
644
645 for segment in segments.segments() {
646 let meta_mmap = segment.meta_map();
647
648 for chunk in meta_mmap.chunks_exact(16) {
649 let packed = u128::from_le_bytes(chunk.try_into().unwrap());
650 let (_, _, accessed, _, is_dir, doc_category, doc_vol_type) =
651 SegmentedIndex::unpack_u128(packed);
652
653 if accessed >= since {
654 if let Some(target_kind) = options.kind {
655 let is_target_dir = target_kind == Kind::Directory;
656 if is_dir != is_target_dir {
657 continue;
658 }
659 }
660
661 if let Some(category) = options.category
662 && doc_category & category == 0
663 {
664 continue;
665 }
666
667 if (volume_type_mask & (1 << doc_vol_type)) == 0 {
668 continue;
669 }
670
671 disk_candidates.push((segment, packed));
673 }
674 }
675 }
676
677 if disk_candidates.len() > disk_cap {
678 disk_candidates.select_nth_unstable_by(disk_cap, |a, b| {
679 let (a_off, a_mod, a_acc, _, _, _, _) = SegmentedIndex::unpack_u128(a.1);
680 let (b_off, b_mod, b_acc, _, _, _, _) = SegmentedIndex::unpack_u128(b.1);
681 b_acc
682 .cmp(&a_acc) .then_with(|| b_mod.cmp(&a_mod)) .then_with(|| a_off.cmp(&b_off)) });
686 disk_candidates.truncate(disk_cap);
687 }
688
689 for (segment, packed) in disk_candidates {
690 let (dat_offset, _, _, _, _, _, _) = SegmentedIndex::unpack_u128(packed);
691
692 if let Some((path, volume, entry)) = segment.read_document(dat_offset) {
693 if let Some(filter) = options.volume_name
694 && volume != filter
695 {
696 continue;
697 }
698 collector.insert(path, volume, entry);
699 }
700 }
701
702 let mut results: Vec<_> = collector.finish().collect();
703
704 if results.len() > required_matches {
705 results.select_nth_unstable_by(required_matches, |a, b| {
706 b.2.last_accessed
707 .cmp(&a.2.last_accessed)
708 .then_with(|| b.2.last_modified.cmp(&a.2.last_modified))
709 .then_with(|| a.0.cmp(&b.0))
710 });
711 results.truncate(required_matches);
712 }
713
714 results.sort_unstable_by(|a, b| {
715 b.2.last_accessed
716 .cmp(&a.2.last_accessed)
717 .then_with(|| b.2.last_modified.cmp(&a.2.last_modified))
718 .then_with(|| a.0.cmp(&b.0))
719 });
720
721 let paginated_results = results
722 .into_iter()
723 .skip(offset)
724 .take(limit)
725 .map(|(path, volume, entry)| SearchResult {
726 path: PathBuf::from(path),
727 volume,
728 volume_type: entry.volume_type,
729 kind: entry.kind,
730 last_modified: entry.last_modified,
731 last_accessed: entry.last_accessed,
732 category: entry.category,
733 score: 0.0,
734 })
735 .collect();
736
737 Ok(paginated_results)
738 }
739
    /// Blocks until the whole index is flushed and merged into a single
    /// segment.
    ///
    /// Loops flush+join until the memtable is empty, waits out any running
    /// background compactor, then performs one synchronous full merge. A
    /// no-op when at most one segment exists.
    ///
    /// # Errors
    /// Lock poisoning maps to `ReadLock`/`WriteLock`; merge failures surface
    /// as `IndexError::Io`.
    pub fn force_compact_all(&self) -> Result<(), IndexError> {
        loop {
            // Wait for any in-flight background flush before checking state.
            if let Ok(mut flusher) = self.flusher.write()
                && let Some(handle) = flusher.take()
            {
                log::debug!("Waiting for background flush to finish...");
                let _ = handle.join();
            }

            if self
                .mem_idx
                .read()
                .map_err(|_| IndexError::ReadLock)?
                .is_empty()
            {
                break;
            }

            self.trigger_flush()?;
        }

        if let Ok(mut compactor) = self.compactor.write()
            && let Some(handle) = compactor.take()
        {
            log::debug!("Waiting for background compactor to finish...");
            let _ = handle.join();
        }

        let snapshot = {
            let base = self.base.read().map_err(|_| IndexError::ReadLock)?;
            let segments = base.snapshot();

            if segments.len() <= 1 {
                log::debug!("Database is already fully compacted.");
                return Ok(());
            }
            segments
        };

        log::debug!("Forcing full compaction of {} segments...", snapshot.len());

        let compactor_seq = self.next_op_seq.fetch_add(1, Ordering::SeqCst);

        // Merge into a temp path first, then swap into the base.
        let tmp_path = self.path.join(format!("{}.tmp", compactor_seq));

        let snapshot_tombstones = {
            let guard = self.prefix_tombstones.read().expect("lock poisoned");
            guard.clone()
        };

        compactor::merge_segments(&snapshot, snapshot_tombstones, tmp_path.clone())
            .map_err(|e| IndexError::Io(std::io::Error::other(e)))?;

        let mut base_guard = self.base.write().map_err(|_| IndexError::WriteLock)?;
        base_guard
            .apply_compaction(&snapshot, tmp_path)
            .map_err(|e| IndexError::Io(std::io::Error::other(e)))?;

        log::debug!("Full compaction complete");
        Ok(())
    }
806
807 fn should_flush(&self) -> bool {
808 self.mem_idx.read().unwrap().len() > self.compactor_config.flush_threshold
809 || self.prefix_tombstones.read().unwrap().len()
810 > self.compactor_config.tombstone_threshold
811 }
812
    /// Flushes the in-memory buffer to a new on-disk segment on a background
    /// thread, rotating the WAL, and possibly kicking off a compaction.
    ///
    /// Returns immediately (Ok) if a flush is already in flight or the buffer
    /// is empty.
    ///
    /// # Errors
    /// `IndexError::Io` if the WAL rotation/rewrite or thread spawn fails.
    fn trigger_flush(&self) -> Result<(), IndexError> {
        // Already flushing? Let the running thread finish its work.
        if let Some(ref flusher) = *self.flusher.read().expect("failed to read flusher")
            && !flusher.is_finished()
        {
            return Ok(());
        }
        // Lock order: mem_idx before wal — keep consistent across callers.
        let mut mem = self.mem_idx.write().expect("failed to lock memory");
        let mut wal = self.wal.write().expect("failed to lock wal");

        if mem.is_empty() {
            return Ok(());
        }

        // Steal the whole memtable; writers start filling a fresh one.
        let snapshot = std::mem::take(&mut *mem);
        let path = self.path.clone();
        let next_seq = self.next_op_seq();

        // Rotate the live WAL aside; it is deleted once the segment lands.
        let flushing_path = path.join(format!("journal.{}.flushing.wal", next_seq));
        wal.rotate(&flushing_path).map_err(IndexError::Io)?;

        // Re-seed the fresh WAL with the still-active prefix tombstones so
        // they survive a crash that happens after rotation.
        let tombstones_cow = { self.prefix_tombstones.read().unwrap().clone() };
        for (volume, prefix, seq) in tombstones_cow.iter() {
            wal.write_prefix_tombstone(volume.as_deref(), prefix, *seq)?;
        }

        // Release both locks before spawning so reads/writes proceed during
        // the flush.
        drop(wal);
        drop(mem);

        let base = Arc::clone(&self.base);
        let min_merge_count = self.compactor_config.min_merge_count;
        let compactor_lock = Arc::clone(&self.compactor);
        let op_seq = Arc::clone(&self.next_op_seq);
        let prefix_tombstones = Arc::clone(&self.prefix_tombstones);

        let flusher = std::thread::Builder::new()
            .name("minidex-flush".to_owned())
            .spawn(move || {
                let final_segment_path = path.join(format!("{}", next_seq));
                let tmp_segment_path = path.join(format!("{}.tmp", next_seq));

                // Build into a temp path first so a crash never leaves a
                // half-written segment under its final name.
                if let Err(e) = SegmentedIndex::build_segment_files(
                    &tmp_segment_path,
                    snapshot
                        .into_iter()
                        .map(|(path, (volume, entry))| (path, volume, entry)),
                    false,
                ) {
                    log::error!("flush failed to write: {}", e);
                    let tmp_paths = Segment::paths_with_additional_extension(&tmp_segment_path);
                    Segment::remove_files(&tmp_paths);
                    return;
                }

                let tmp_paths = Segment::paths_with_additional_extension(&tmp_segment_path);

                let final_paths = Segment::paths_with_additional_extension(&final_segment_path);

                let _ = Segment::rename_files(&tmp_paths, &final_paths);

                let new_segment =
                    Arc::new(Segment::load(final_segment_path).expect("failed to load"));
                {
                    let mut base_guard = base.write().expect("failed to lock base");
                    base_guard.add_segment(new_segment);
                }

                // The segment is durable now; the rotated WAL is redundant.
                if let Err(e) = std::fs::remove_file(&flushing_path) {
                    log::error!("failed to delete rotated WAL: {}", e);
                }

                // Opportunistic compaction: only when enough segments piled up.
                let snapshot = {
                    let base = base.read().expect("failed to read-lock base");
                    if base.segments().count() <= min_merge_count {
                        return;
                    }

                    base.snapshot()
                };

                let mut compactor_guard = compactor_lock
                    .write()
                    .expect("failed to acquire compactor write-lock");
                // Never run two compactions concurrently.
                if let Some(handle) = compactor_guard.as_ref()
                    && !handle.is_finished()
                {
                    return;
                }

                *compactor_guard = Self::compact(base, path, snapshot, prefix_tombstones, op_seq);
            })
            .map_err(IndexError::Io)?;

        *self.flusher.write().unwrap() = Some(flusher);
        Ok(())
    }
909
    /// Spawns a background thread that merges `snapshot`'s segments into one,
    /// folding in prefix tombstones, then swaps the result into `base`.
    ///
    /// Returns `None` when there is nothing to merge or the thread could not
    /// be spawned.
    fn compact(
        base: Arc<RwLock<SegmentedIndex>>,
        path: PathBuf,
        snapshot: Vec<Arc<Segment>>,
        prefix_tombstones: Arc<RwLock<Arc<Vec<Tombstone>>>>,
        next_op_seq: Arc<AtomicU64>,
    ) -> Option<JoinHandle<()>> {
        if snapshot.is_empty() {
            return None;
        }

        std::thread::Builder::new()
            .name("minidex-compactor".to_string())
            .spawn(move || {
                let next_seq = next_op_seq.fetch_add(1, Ordering::SeqCst);
                let tmp_path = path.join(format!("{}.tmp", next_seq));

                log::debug!("Starting compaction with {} segments", snapshot.len());
                let snapshot_tombstones = { prefix_tombstones.read().unwrap().clone() };
                match compactor::merge_segments(&snapshot, snapshot_tombstones, tmp_path.clone()) {
                    Ok(compactor_seq) => {
                        let mut base_guard = base
                            .write()
                            .expect("failed to lock base for compaction apply");
                        if let Err(e) = base_guard.apply_compaction(&snapshot, tmp_path) {
                            log::error!("Failed to apply compaction: {}", e);
                        }
                        // Tombstones older than the merge point are now baked
                        // into the merged segment and can be dropped.
                        let mut tombstones = prefix_tombstones.write().unwrap();
                        Arc::make_mut(&mut tombstones).retain(|(_, _, seq)| *seq >= compactor_seq);

                        log::debug!("Compaction finished");
                    }
                    Err(e) => log::error!("Compaction failed: {}", e),
                }
            })
            .ok()
    }
947
948 fn compile_allowed_volume_mask(allowed_volume_types: Option<&[VolumeType]>) -> u8 {
949 match allowed_volume_types {
950 Some(allowed) => allowed.iter().fold(0, |acc, &vt| acc | (1 << (vt as u8))),
951 None => 0b0000_1111,
952 }
953 }
954}
955
956impl Drop for Index {
957 fn drop(&mut self) {
958 let _ = self.sync();
959
960 if let Ok(mut flusher) = self.flusher.write()
961 && let Some(flusher) = flusher.take()
962 {
963 let _ = flusher.join();
964 }
965
966 if let Ok(mut compactor) = self.compactor.write()
967 && let Some(compactor) = compactor.take()
968 {
969 let _ = compactor.join();
970 }
971 }
972}
973
/// Errors returned by [`Index`] operations.
#[derive(Debug, Error)]
pub enum IndexError {
    /// The on-disk index could not be opened.
    #[error("failed to open index on disk: {0}")]
    Open(std::io::Error),
    /// A shared read lock was poisoned by a panicking holder.
    #[error("failed to read lock data")]
    ReadLock,
    /// A write lock was poisoned by a panicking holder.
    #[error("failed to write lock data")]
    WriteLock,
    /// Error bubbled up from the segment store.
    #[error(transparent)]
    SegmentedIndex(SegmentedIndexError),
    /// A match pattern failed to compile.
    #[error("failed to compile matching regex: {0}")]
    Regex(String),
    /// Underlying filesystem or WAL I/O failure.
    #[error("io error: {0}")]
    Io(#[from] std::io::Error),
}
989
990#[cfg(test)]
991mod tests {
992 use super::*;
993 use crate::common::{VolumeType, category};
994
995 #[test]
996 fn test_index_basic_lifecycle() -> Result<(), IndexError> {
997 let temp_dir = std::env::temp_dir().join(format!("minidex_test_lib_{}", rand_id()));
998 std::fs::create_dir_all(&temp_dir)?;
999
1000 let sep = std::path::MAIN_SEPARATOR_STR;
1001 let path1 = format!("{}foo{}bar.txt", sep, sep);
1002
1003 {
1004 let index = Index::open(&temp_dir)?;
1005 index.insert(FilesystemEntry {
1006 path: PathBuf::from(&path1),
1007 volume: "vol1".to_string(),
1008 kind: Kind::File,
1009 last_modified: 100,
1010 last_accessed: 100,
1011 category: category::TEXT,
1012 volume_type: VolumeType::Local,
1013 })?;
1014
1015 let results = index.search("bar", 10, 0, SearchOptions::default())?;
1016 assert_eq!(results.len(), 1);
1017 assert_eq!(results[0].path, PathBuf::from(&path1));
1018
1019 index.sync()?;
1020 }
1021
1022 {
1024 let index = Index::open(&temp_dir)?;
1025 let results = index.search("bar", 10, 0, SearchOptions::default())?;
1026 assert_eq!(results.len(), 1);
1027 assert_eq!(results[0].path, PathBuf::from(&path1));
1028 }
1029
1030 std::fs::remove_dir_all(temp_dir)?;
1031 Ok(())
1032 }
1033
    /// With `flush_threshold: 1`, inserts push entries through background
    /// flushes; both entries must still be found afterwards, wherever they
    /// ended up (memtable or segment).
    #[test]
    fn test_index_flush_and_search() -> Result<(), IndexError> {
        let temp_dir = std::env::temp_dir().join(format!("minidex_test_lib_flush_{}", rand_id()));
        std::fs::create_dir_all(&temp_dir)?;

        let config = CompactorConfig {
            flush_threshold: 1,
            ..Default::default()
        };

        let sep = std::path::MAIN_SEPARATOR_STR;

        let index = Index::open_with_config(&temp_dir, config)?;
        index.insert(FilesystemEntry {
            path: PathBuf::from(format!("{}foo{}a.txt", sep, sep)),
            volume: "vol1".to_string(),
            kind: Kind::File,
            last_modified: 100,
            last_accessed: 100,
            category: category::TEXT,
            volume_type: VolumeType::Local,
        })?;

        index.insert(FilesystemEntry {
            path: PathBuf::from(format!("{}foo{}b.txt", sep, sep)),
            volume: "vol1".to_string(),
            kind: Kind::File,
            last_modified: 100,
            last_accessed: 100,
            category: category::TEXT,
            volume_type: VolumeType::Local,
        })?;

        // Give the background flush thread time to write the segment.
        // NOTE(review): sleep-based synchronization can be flaky on slow CI.
        std::thread::sleep(std::time::Duration::from_millis(500));

        let results = index.search("foo", 10, 0, SearchOptions::default())?;
        assert_eq!(results.len(), 2);

        std::fs::remove_dir_all(temp_dir)?;
        Ok(())
    }
1077
1078 #[test]
1079 fn test_index_prefix_delete() -> Result<(), IndexError> {
1080 let temp_dir = std::env::temp_dir().join(format!("minidex_test_lib_del_{}", rand_id()));
1081 std::fs::create_dir_all(&temp_dir)?;
1082
1083 let sep = std::path::MAIN_SEPARATOR_STR;
1084
1085 let index = Index::open(&temp_dir)?;
1086 index.insert(FilesystemEntry {
1087 path: PathBuf::from(format!("{}foo{}bar{}a.txt", sep, sep, sep)),
1088 volume: "vol1".to_string(),
1089 kind: Kind::File,
1090 last_modified: 100,
1091 last_accessed: 100,
1092 category: 0,
1093 volume_type: VolumeType::Local,
1094 })?;
1095 let other_path = format!("{}other{}b.txt", sep, sep);
1096 index.insert(FilesystemEntry {
1097 path: PathBuf::from(&other_path),
1098 volume: "vol1".to_string(),
1099 kind: Kind::File,
1100 last_modified: 100,
1101 last_accessed: 100,
1102 category: 0,
1103 volume_type: VolumeType::Local,
1104 })?;
1105
1106 index.delete_prefix(&format!("{}foo", sep))?;
1108
1109 let results = index.search("txt", 10, 0, SearchOptions::default())?;
1110 assert_eq!(results.len(), 1);
1111 assert_eq!(results[0].path, PathBuf::from(&other_path));
1112
1113 std::fs::remove_dir_all(temp_dir)?;
1114 Ok(())
1115 }
1116
1117 #[test]
1118 fn test_index_volume_prefix_delete() -> Result<(), IndexError> {
1119 let temp_dir = std::env::temp_dir().join(format!("minidex_test_lib_vol_del_{}", rand_id()));
1120 std::fs::create_dir_all(&temp_dir)?;
1121
1122 let sep = std::path::MAIN_SEPARATOR_STR;
1123
1124 let index = Index::open(&temp_dir)?;
1125 index.insert(FilesystemEntry {
1126 path: PathBuf::from(format!("{}foo{}bar{}a.txt", sep, sep, sep)),
1127 volume: "vol1".to_string(),
1128 kind: Kind::File,
1129 last_modified: 100,
1130 last_accessed: 100,
1131 category: 0,
1132 volume_type: VolumeType::Local,
1133 })?;
1134 index.insert(FilesystemEntry {
1135 path: PathBuf::from(format!("{}foo{}bar{}b.txt", sep, sep, sep)),
1136 volume: "vol2".to_string(),
1137 kind: Kind::File,
1138 last_modified: 100,
1139 last_accessed: 100,
1140 category: 0,
1141 volume_type: VolumeType::Local,
1142 })?;
1143
1144 index.delete_by_volume_name(Some("vol1"), &format!("{}foo", sep))?;
1146
1147 let results = index.search("txt", 10, 0, SearchOptions::default())?;
1148 assert_eq!(results.len(), 1);
1149 assert_eq!(results[0].volume, "vol2");
1150
1151 std::fs::remove_dir_all(temp_dir)?;
1152 Ok(())
1153 }
1154
    /// Forces several flushes (threshold 1 + sleeps) so multiple segments
    /// accumulate, then verifies `force_compact_all` merges them into exactly
    /// one segment without losing documents.
    #[test]
    fn test_index_compaction() -> Result<(), IndexError> {
        let temp_dir = std::env::temp_dir().join(format!("minidex_test_lib_comp_{}", rand_id()));
        std::fs::create_dir_all(&temp_dir)?;

        let config = CompactorConfig {
            flush_threshold: 1,
            ..Default::default()
        };

        let sep = std::path::MAIN_SEPARATOR_STR;

        let index = Index::open_with_config(&temp_dir, config)?;

        for i in 0..4 {
            index.insert(FilesystemEntry {
                path: PathBuf::from(format!("{}foo{}{}.txt", sep, sep, i)),
                volume: "vol1".to_string(),
                kind: Kind::File,
                last_modified: 100,
                last_accessed: 100,
                category: 0,
                volume_type: VolumeType::Local,
            })?;
            // Let each background flush land so segments accumulate.
            // NOTE(review): timing-based; may produce fewer segments on slow
            // machines.
            std::thread::sleep(std::time::Duration::from_millis(200));
        }

        // Drain any still-running flush before inspecting segment counts.
        if let Ok(mut flusher) = index.flusher.write()
            && let Some(h) = flusher.take()
        {
            let _ = h.join();
        }

        {
            let base = index.base.read().unwrap();
            assert!(
                base.segments().count() >= 2,
                "Should have at least 2 segments, got {}",
                base.segments().count()
            );
        }

        index.force_compact_all()?;

        {
            let base = index.base.read().unwrap();
            assert_eq!(base.segments().count(), 1);
        }

        // No documents may be lost by the merge.
        let results = index.search("foo", 10, 0, SearchOptions::default())?;
        assert_eq!(results.len(), 4);

        std::fs::remove_dir_all(temp_dir)?;
        Ok(())
    }
1213
1214 #[test]
1215 fn test_index_recent_files() -> Result<(), IndexError> {
1216 let temp_dir = std::env::temp_dir().join(format!("minidex_test_lib_recent_{}", rand_id()));
1217 std::fs::create_dir_all(&temp_dir)?;
1218
1219 let sep = std::path::MAIN_SEPARATOR_STR;
1220
1221 let index = Index::open(&temp_dir)?;
1222 index.insert(FilesystemEntry {
1223 path: PathBuf::from(format!("{}foo{}old.txt", sep, sep)),
1224 volume: "vol1".to_string(),
1225 kind: Kind::File,
1226 last_modified: 100,
1227 last_accessed: 100, category: 0,
1229 volume_type: VolumeType::Local,
1230 })?;
1231 let new_path = format!("{}foo{}new.txt", sep, sep);
1232 index.insert(FilesystemEntry {
1233 path: PathBuf::from(&new_path),
1234 volume: "vol1".to_string(),
1235 kind: Kind::File,
1236 last_modified: 1000,
1237 last_accessed: 1000, category: 0,
1239 volume_type: VolumeType::Local,
1240 })?;
1241
1242 let results = index.recent_files(500, 10, 0, SearchOptions::default())?;
1243 assert_eq!(results.len(), 1);
1244 assert_eq!(results[0].path, PathBuf::from(&new_path));
1245
1246 std::fs::remove_dir_all(temp_dir)?;
1247 Ok(())
1248 }
1249
1250 #[test]
1251 fn test_index_search_filters() -> Result<(), IndexError> {
1252 let temp_dir = std::env::temp_dir().join(format!("minidex_test_lib_filter_{}", rand_id()));
1253 std::fs::create_dir_all(&temp_dir)?;
1254
1255 let sep = std::path::MAIN_SEPARATOR_STR;
1256
1257 let index = Index::open(&temp_dir)?;
1258 index.insert(FilesystemEntry {
1259 path: PathBuf::from(format!("{}vol1{}a.txt", sep, sep)),
1260 volume: "vol1".to_string(),
1261 kind: Kind::File,
1262 last_modified: 100,
1263 last_accessed: 100,
1264 category: category::TEXT,
1265 volume_type: VolumeType::Local,
1266 })?;
1267 index.insert(FilesystemEntry {
1268 path: PathBuf::from(format!("{}vol2{}b.txt", sep, sep)),
1269 volume: "vol2".to_string(),
1270 kind: Kind::File,
1271 last_modified: 100,
1272 last_accessed: 100,
1273 category: category::IMAGE,
1274 volume_type: VolumeType::Local,
1275 })?;
1276
1277 let opts_vol1 = SearchOptions {
1279 volume_name: Some("vol1"),
1280 ..Default::default()
1281 };
1282 let res_vol1 = index.search("txt", 10, 0, opts_vol1)?;
1283 assert_eq!(res_vol1.len(), 1);
1284 assert_eq!(res_vol1[0].volume, "vol1");
1285
1286 let opts_img = SearchOptions {
1288 category: Some(category::IMAGE),
1289 ..Default::default()
1290 };
1291 let res_img = index.search("txt", 10, 0, opts_img)?;
1292 assert_eq!(res_img.len(), 1);
1293 assert_eq!(res_img[0].category, category::IMAGE);
1294
1295 let opts_dir = SearchOptions {
1297 kind: Some(Kind::Directory),
1298 ..Default::default()
1299 };
1300 let res_dir = index.search("txt", 10, 0, opts_dir)?;
1301 assert_eq!(res_dir.len(), 0);
1302
1303 std::fs::remove_dir_all(temp_dir)?;
1304 Ok(())
1305 }
1306
1307 fn rand_id() -> u64 {
1308 std::time::SystemTime::now()
1309 .duration_since(std::time::UNIX_EPOCH)
1310 .unwrap()
1311 .as_nanos() as u64
1312 }
1313}