//! minidex — `lib.rs`: the top-level index type tying together the WAL,
//! the in-memory overlay, on-disk segments, search, and compaction.
1use std::{
2    collections::{BTreeMap, HashMap},
3    path::{Path, PathBuf},
4    sync::{
5        Arc, RwLock,
6        atomic::{AtomicU64, Ordering},
7    },
8    thread::JoinHandle,
9};
10
11use bstr::ByteSlice;
12use fst::{Automaton as _, IntoStreamer as _, Streamer, automaton::Str};
13
14use thiserror::Error;
15
16mod common;
17pub use common::Kind;
18mod entry;
19pub use entry::FilesystemEntry;
20use entry::*;
21mod segmented_index;
22pub use segmented_index::compactor::*;
23use segmented_index::*;
24mod opstamp;
25use opstamp::*;
26use wal::Wal;
27mod search;
28mod tokenizer;
29mod wal;
30pub use search::{ScoringConfig, SearchOptions, SearchResult};
31
/// A Minidex Index, managing both the in-memory and disk data.
/// Insertions and deletions auto-commit to the Write-Ahead Log
/// and may trigger compaction.
pub struct Index {
    /// Directory containing segments, WAL files and temporary compaction output.
    path: PathBuf,
    /// On-disk segmented index; shared with the background flusher/compactor threads.
    base: Arc<RwLock<SegmentedIndex>>,
    /// Monotonic operation-sequence counter; every insert/delete/compaction claims one.
    next_op_seq: Arc<AtomicU64>,
    /// In-memory overlay: path -> (volume, entry); flushed to a segment in the background.
    mem_idx: RwLock<BTreeMap<String, (String, IndexEntry)>>,
    /// Write-Ahead Log backing `mem_idx` for crash recovery.
    wal: RwLock<Wal>,
    /// Thresholds and merge policy for background flush/compaction.
    compactor_config: segmented_index::compactor::CompactorConfig,
    /// Handle of the background compaction thread, if one is running.
    compactor: Arc<RwLock<Option<JoinHandle<()>>>>,
    /// Handle of the background flush thread, if one is running.
    flusher: Arc<RwLock<Option<JoinHandle<()>>>>,
    /// (lowercased prefix, opstamp) pairs hiding older entries under each prefix.
    prefix_tombstones: Arc<RwLock<Vec<(String, u64)>>>,
}
46
47impl Index {
48    /// Open the index on disk with a default compactor configuration.
49    /// This function will:
50    /// 1. Create (if it doesn't exist) the directory at `path`
51    /// 2. Try to obtain a lock on the directory
52    /// 3. Load the discovered segments, data and posting
53    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self, IndexError> {
54        Self::open_with_config(path, CompactorConfig::default())
55    }
56
    /// Open the index on disk with a custom compactor configuration.
    /// This function will:
    /// 1. Create (if it doesn't exist) the directory at `path`
    /// 2. Try to obtain a lock on the directory
    /// 3. Load the discovered segments, data and posting
    pub fn open_with_config<P: AsRef<Path>>(
        path: P,
        compactor_config: CompactorConfig,
    ) -> Result<Self, IndexError> {
        let base = SegmentedIndex::open(&path).map_err(IndexError::SegmentedIndex)?;

        let base = Arc::new(RwLock::new(base));
        // Highest opstamp seen during WAL replay; seeds the sequence counter
        // so new operations never collide with replayed ones.
        let mut max_seq = 0u64;
        let mut mem_idx = BTreeMap::new();

        let mut prefix_tombstones = Vec::new();

        // Fold one replayed WAL into the in-memory state.
        let mut apply_replay = |replay_data: crate::wal::ReplayData| {
            for (path, volume, entry) in replay_data.inserts {
                max_seq = max_seq.max(entry.opstamp.sequence());
                mem_idx.insert(path, (volume, entry));
            }
            for (prefix, seq) in replay_data.tombstones {
                max_seq = max_seq.max(seq);
                prefix_tombstones.push((prefix, seq));
            }
        };

        let entries = path.as_ref().read_dir().map_err(IndexError::Io)?;

        let mut flushing_wals = Vec::new();

        // Recover partial, flushing WAL files
        // (a crash mid-flush leaves rotated `*.flushing.wal` files behind).
        for entry in entries {
            if let Ok(e) = entry
                && let Ok(file_type) = e.file_type()
                && file_type.is_file()
                && e.file_name().to_string_lossy().ends_with(".flushing.wal")
            {
                flushing_wals.push(e.path());
            }
        }
        // Replay in a deterministic (path-sorted) order.
        flushing_wals.sort_unstable();

        for wal_path in flushing_wals {
            let partial = Wal::replay(wal_path).map_err(IndexError::Io)?;

            apply_replay(partial);
        }

        let wal_path = path.as_ref().join("journal.wal");

        // The live journal replays last, so its entries win over rotated WALs.
        let recovered = Wal::replay(&wal_path).map_err(IndexError::Io)?;
        apply_replay(recovered);

        let next_op_seq = Arc::new(AtomicU64::new(max_seq + 1));

        let wal = Wal::open(&wal_path).map_err(IndexError::Io)?;

        Ok(Self {
            path: path.as_ref().to_path_buf(),
            base,
            next_op_seq,
            mem_idx: RwLock::new(mem_idx),
            wal: RwLock::new(wal),
            compactor_config,
            compactor: Arc::new(RwLock::new(None)),
            flusher: Arc::new(RwLock::new(None)),
            prefix_tombstones: Arc::new(RwLock::new(prefix_tombstones)),
        })
    }
128
129    fn next_op_seq(&self) -> u64 {
130        self.next_op_seq
131            .fetch_add(1, std::sync::atomic::Ordering::SeqCst)
132    }
133
134    /// Insert a filesystem entry into the index.
135    pub fn insert(&self, item: FilesystemEntry) -> Result<(), IndexError> {
136        let seq = self.next_op_seq();
137        let path_str = item.path.to_string_lossy().to_string();
138        let volume = item.volume;
139        let entry = IndexEntry {
140            opstamp: Opstamp::insertion(seq),
141            kind: item.kind,
142            last_modified: item.last_modified,
143            last_accessed: item.last_accessed,
144        };
145
146        {
147            let mut wal = self.wal.write().map_err(|_| IndexError::WriteLock)?;
148            wal.append(&path_str, &volume, &entry)
149                .map_err(IndexError::Io)?;
150        }
151
152        {
153            self.mem_idx
154                .write()
155                .map_err(|_| IndexError::WriteLock)?
156                .insert(path_str, (volume, entry));
157        }
158
159        if self.should_flush() {
160            let _ = self.trigger_flush();
161        }
162
163        Ok(())
164    }
165
166    pub fn delete(&self, item: &Path) -> Result<(), IndexError> {
167        let seq = self.next_op_seq();
168
169        let path_str = item.to_string_lossy().to_string();
170        let entry = IndexEntry {
171            opstamp: Opstamp::deletion(seq),
172            kind: Kind::File,
173            last_modified: 0,
174            last_accessed: 0,
175        };
176
177        {
178            let mut wal = self.wal.write().map_err(|_| IndexError::WriteLock)?;
179            wal.append(&path_str, "", &entry).map_err(IndexError::Io)?;
180        }
181
182        {
183            self.mem_idx
184                .write()
185                .map_err(|_| IndexError::WriteLock)?
186                .insert(path_str, ("".to_owned(), entry));
187        }
188
189        if self.should_flush() {
190            let _ = self.trigger_flush();
191        }
192
193        Ok(())
194    }
195
196    pub fn delete_prefix(&self, prefix: &str) -> Result<(), IndexError> {
197        let seq = self.next_op_seq.fetch_add(1, Ordering::SeqCst);
198        let prefix_lower = prefix.to_lowercase();
199        {
200            let mut tombstones = self
201                .prefix_tombstones
202                .write()
203                .map_err(|_| IndexError::WriteLock)?;
204            tombstones.push((prefix_lower.clone(), seq));
205        }
206
207        {
208            let mut wal = self.wal.write().map_err(|_| IndexError::WriteLock)?;
209
210            wal.write_prefix_tombstone(&prefix_lower, seq)?;
211        }
212
213        Ok(())
214    }
215
216    /// Writes the in-memory index to disk.
217    /// This method can fail if the disk is not writable.
218    pub fn sync(&self) -> Result<(), IndexError> {
219        let mut wal = self.wal.write().map_err(|_| IndexError::WriteLock)?;
220        wal.flush().map_err(IndexError::Io)?;
221
222        Ok(())
223    }
224
225    /// Search the index for the given search term (usually a path or
226    /// file name), bound by limit and offset.
227    pub fn search(
228        &self,
229        query: &str,
230        limit: usize,
231        offset: usize,
232        options: SearchOptions<'_>,
233    ) -> Result<Vec<SearchResult>, IndexError> {
234        let mut tokens = crate::tokenizer::tokenize(query);
235
236        if tokens.is_empty() {
237            return Ok(Vec::new());
238        }
239
240        tokens.sort_by_key(|b| std::cmp::Reverse(b.len()));
241
242        let segments = self.base.read().map_err(|_| IndexError::ReadLock)?;
243        let mem = self.mem_idx.read().map_err(|_| IndexError::ReadLock)?;
244
245        let mut candidates: HashMap<String, (String, IndexEntry)> = HashMap::new();
246
247        let required_matches = limit + offset;
248        let scoring_cap = std::cmp::max(500, required_matches * 3).min(1000);
249
250        let short_circuit_threshold = std::cmp::max(5000, required_matches * 10);
251
252        let active_tombstones = self
253            .prefix_tombstones
254            .read()
255            .map_err(|_| IndexError::ReadLock)?
256            .clone();
257
258        for (path, (volume, entry)) in mem.iter() {
259            let path_bytes = path.as_bytes();
260
261            if is_tombstoned(path_bytes, entry.opstamp.sequence(), &active_tombstones) {
262                continue;
263            }
264
265            if let Some(filter) = options.volume_filter {
266                if volume != filter {
267                    continue;
268                }
269            }
270            let matches_all = tokens
271                .iter()
272                .all(|t| path_bytes.find_iter(t.as_bytes()).next().is_some());
273            if matches_all {
274                candidates
275                    .entry(path.clone())
276                    .and_modify(|(current_volume, current_entry)| {
277                        if entry.opstamp.sequence() > current_entry.opstamp.sequence() {
278                            *current_entry = *entry;
279                            *current_volume = volume.clone();
280                        }
281                    })
282                    .or_insert((volume.clone(), *entry));
283            }
284        }
285
286        for segment in segments.segments() {
287            let mut segment_doc_matches: Option<Vec<DocumentId>> =
288                if let Some(vol) = options.volume_filter {
289                    let vol_token = crate::tokenizer::synthesize_volume_token(&vol.to_lowercase());
290                    let map = segment.as_ref().as_ref();
291                    match map.get(&vol_token) {
292                        Some(post_offset) => {
293                            let mut docs = segment.read_posting_list(post_offset);
294                            docs.sort_unstable();
295                            docs.dedup();
296                            Some(docs)
297                        }
298                        None => continue, // We can skip this segment since it has no entries for this volume
299                    }
300                } else {
301                    None
302                };
303
304            for token in &tokens {
305                if let Some(existing) = &segment_doc_matches
306                    && existing.len() <= short_circuit_threshold
307                {
308                    break;
309                }
310                let matcher = Str::new(token).starts_with();
311
312                let mut token_docs = Vec::new();
313                let map = segment.as_ref().as_ref();
314                let mut stream = map.search(&matcher).into_stream();
315
316                while let Some((_, post_offset)) = stream.next() {
317                    let docs = segment.read_posting_list(post_offset);
318                    token_docs.extend(docs);
319
320                    if segment_doc_matches.is_none() && token_docs.len() > short_circuit_threshold {
321                        break;
322                    }
323                }
324
325                token_docs.sort_unstable();
326                token_docs.dedup();
327
328                if let Some(mut existing) = segment_doc_matches {
329                    existing.retain(|doc_id| token_docs.binary_search(doc_id).is_ok());
330                    segment_doc_matches = Some(existing);
331                } else {
332                    segment_doc_matches = Some(token_docs);
333                }
334
335                if segment_doc_matches.as_ref().is_some_and(|m| m.is_empty()) {
336                    break;
337                }
338            }
339
340            if let Some(valid_docs) = segment_doc_matches {
341                let mut enriched_docs: Vec<u128> = Vec::with_capacity(valid_docs.len());
342                let meta_mmap = segment.meta_map();
343
344                for &doc_id in &valid_docs {
345                    let byte_offset = (doc_id as usize) * size_of::<u128>();
346                    let packed_bytes: [u8; 16] = meta_mmap
347                        [byte_offset..byte_offset + size_of::<u128>()]
348                        .try_into()
349                        .expect("failed to unpack");
350                    let packed_val = u128::from_le_bytes(packed_bytes);
351
352                    // Filter categories - TODO
353                    /*if let Some(category) = options.category {
354                        let (_, _, _, _, doc_category) = SegmentedIndex::unpack_u128(packed_val);
355                        if doc_category != category as u16 {
356                            continue;
357                        }
358                    }*/
359
360                    enriched_docs.push(packed_val);
361                }
362
363                enriched_docs.sort_unstable_by(|&a, &b| {
364                    let (_, a_modified_at, a_depth, a_dir) = SegmentedIndex::unpack_u128(a);
365                    let (_, b_modified_at, b_depth, b_dir) = SegmentedIndex::unpack_u128(b);
366
367                    b_dir
368                        .cmp(&a_dir)
369                        .then_with(|| a_depth.cmp(&b_depth))
370                        .then_with(|| b_modified_at.cmp(&a_modified_at))
371                });
372
373                enriched_docs.truncate(scoring_cap);
374
375                for packed_val in enriched_docs {
376                    let (dat_offset, _, _, _) = SegmentedIndex::unpack_u128(packed_val);
377
378                    if let Some((path, volume, entry)) = segment.read_document(dat_offset) {
379                        let path_bytes = path.as_bytes();
380
381                        if is_tombstoned(path_bytes, entry.opstamp.sequence(), &active_tombstones) {
382                            continue;
383                        }
384
385                        let matches_all = tokens
386                            .iter()
387                            .all(|t| path_bytes.find_iter(t.as_bytes()).next().is_some());
388
389                        if !matches_all {
390                            continue;
391                        }
392                        candidates
393                            .entry(path)
394                            .and_modify(|(current_volume, current_entry)| {
395                                if entry.opstamp.sequence() > current_entry.opstamp.sequence() {
396                                    *current_entry = entry;
397                                    *current_volume = volume.clone();
398                                }
399                            })
400                            .or_insert((volume, entry));
401                    }
402                }
403            }
404        }
405
406        let mut results: Vec<_> = candidates
407            .into_iter()
408            .filter(|(_, (_, entry))| !entry.opstamp.is_deletion())
409            .map(|(path, (volume, entry))| (path, volume, entry))
410            .collect();
411
412        // Rough top-k
413        if results.len() > scoring_cap {
414            results.select_nth_unstable_by(scoring_cap, |a, b| {
415                b.2.last_modified.cmp(&a.2.last_modified)
416            });
417            results.truncate(scoring_cap);
418        }
419
420        let now_micros = std::time::SystemTime::now()
421            .duration_since(std::time::UNIX_EPOCH)
422            .expect("failed to get system time")
423            .as_micros() as f64;
424
425        let config = if let Some(config) = options.scoring {
426            config
427        } else {
428            &ScoringConfig::default()
429        };
430
431        let mut scored: Vec<_> = results
432            .into_iter()
433            .map(|(path, volume, entry)| {
434                let score = crate::search::compute_score(
435                    config,
436                    &path,
437                    &tokens,
438                    entry.last_modified,
439                    entry.kind,
440                    now_micros,
441                );
442                SearchResult {
443                    path: PathBuf::from(path),
444                    volume: volume,
445                    kind: entry.kind,
446                    last_modified: entry.last_modified,
447                    last_accessed: entry.last_accessed,
448                    score,
449                }
450            })
451            .collect();
452
453        scored.sort();
454
455        let paginated_results = scored.into_iter().skip(offset).take(limit).collect();
456
457        Ok(paginated_results)
458    }
459
460    /// Force index compaction, minimizing the amount of disk space
461    /// utilized by the index.
462    /// NOTE: this operation is very IO intensive and can take some time
463    pub fn force_compact_all(&self) -> Result<(), IndexError> {
464        if let Ok(mut flusher) = self.flusher.write()
465            && let Some(handle) = flusher.take()
466        {
467            log::debug!("Waiting for background flush to finish...");
468            let _ = handle.join();
469        }
470
471        if let Ok(mut compactor) = self.compactor.write()
472            && let Some(handle) = compactor.take()
473        {
474            log::debug!("Waiting for background compactor to finish...");
475            let _ = handle.join();
476        }
477
478        let snapshot = {
479            let base = self.base.read().map_err(|_| IndexError::ReadLock)?;
480            let segments = base.snapshot();
481
482            // If we have 1 or 0 segments, the database is already perfectly compacted!
483            if segments.len() <= 1 {
484                log::debug!("Database is already fully compacted.");
485                return Ok(());
486            }
487            segments
488        };
489
490        log::debug!("Forcing full compaction of {} segments...", snapshot.len());
491
492        let compactor_seq = self.next_op_seq.fetch_add(1, Ordering::SeqCst);
493
494        let tmp_path = self.path.join(format!("{}.tmp", compactor_seq));
495
496        let snapshot_tombstones = {
497            let guard = self.prefix_tombstones.read().expect("lock poisoned");
498            guard.clone()
499        };
500
501        compactor::merge_segments(&snapshot, snapshot_tombstones, tmp_path.clone())
502            .map_err(|e| IndexError::Io(std::io::Error::other(e)))?;
503
504        let mut base_guard = self.base.write().map_err(|_| IndexError::WriteLock)?;
505        base_guard
506            .apply_compaction(&snapshot, tmp_path)
507            .map_err(|e| IndexError::Io(std::io::Error::other(e)))?;
508
509        log::debug!("Full compaction complete");
510        Ok(())
511    }
512
513    fn should_flush(&self) -> bool {
514        self.mem_idx.read().unwrap().len() > self.compactor_config.flush_threshold
515            || self.prefix_tombstones.read().unwrap().len()
516                > self.compactor_config.tombstone_threshold
517    }
518
    /// Rotate the WAL and flush the in-memory index to a new on-disk segment
    /// on a background thread; may then chain into a background compaction.
    /// No-op when a flush is already running or there is nothing to flush.
    fn trigger_flush(&self) -> Result<(), IndexError> {
        // Bail out early if a previous flush thread is still running.
        if let Some(ref flusher) = *self.flusher.read().expect("failed to read flusher")
            && !flusher.is_finished()
        {
            return Ok(());
        }
        // Hold both locks so the snapshot and the WAL rotation are atomic
        // with respect to concurrent insert/delete calls.
        let mut mem = self.mem_idx.write().expect("failed to lock memory");
        let mut wal = self.wal.write().expect("failed to lock wal");

        if mem.is_empty() {
            return Ok(());
        }

        // Take the overlay; new writes land in a fresh, empty map.
        let snapshot = std::mem::take(&mut *mem);
        let path = self.path.clone();
        let next_seq = self.next_op_seq();

        // The rotated `*.flushing.wal` file preserves recovery data until
        // the segment lands; `open_with_config` replays it after a crash.
        let flushing_path = path.join(format!("journal.{}.flushing.wal", next_seq));
        wal.rotate(&flushing_path).map_err(IndexError::Io)?;

        // Re-write tombstones to the WAL until a full compaction runs.
        let tombstones = self
            .prefix_tombstones
            .read()
            .map_err(|_| IndexError::ReadLock)?;
        for (prefix, seq) in tombstones.iter() {
            wal.write_prefix_tombstone(prefix, *seq)?;
        }

        // Release all locks before spawning so the flush thread can take them.
        drop(tombstones);
        drop(wal);
        drop(mem);

        let base = Arc::clone(&self.base);
        let min_merge_count = self.compactor_config.min_merge_count;
        let compactor_lock = Arc::clone(&self.compactor);
        let op_seq = Arc::clone(&self.next_op_seq);
        let prefix_tombstones = Arc::clone(&self.prefix_tombstones);

        let flusher = std::thread::Builder::new()
            .name("minidex-flush".to_owned())
            .spawn(move || {
                // Write to `<seq>.tmp` first, then rename into place, so a
                // crash never leaves a half-written live segment.
                let final_segment_path = path.join(format!("{}", next_seq));
                let tmp_segment_path = path.join(format!("{}.tmp", next_seq));

                {
                    let mut base_guard = base.write().expect("failed to lock base");

                    if let Err(e) = base_guard.write_segment(
                        &tmp_segment_path,
                        snapshot
                            .into_iter()
                            .map(|(path, (volume, entry))| (path, volume, entry)),
                    ) {
                        log::error!("flush failed to write: {}", e);
                        // Best-effort cleanup of partial tmp files; the rotated
                        // WAL is kept so the data can still be replayed.
                        let tmp_paths = Segment::paths_with_additional_extension(&tmp_segment_path);
                        Segment::remove_files(&tmp_paths);
                        return;
                    }

                    let tmp_paths = Segment::paths_with_additional_extension(&tmp_segment_path);

                    let final_paths = Segment::paths_with_additional_extension(&final_segment_path);

                    let _ = Segment::rename_files(&tmp_paths, &final_paths);
                    base_guard
                        .load(&final_segment_path)
                        .expect("failed to reload segment during flush");
                }

                // The segment is durable; the rotated WAL is now redundant.
                if let Err(e) = std::fs::remove_file(&flushing_path) {
                    log::error!("failed to delete rotated WAL: {}", e);
                }

                // Chain into a compaction when enough segments accumulated.
                let snapshot = {
                    let base = base.read().expect("failed to read-lock base");
                    if base.segments().count() <= min_merge_count {
                        return;
                    }

                    base.snapshot()
                };

                let mut compactor_guard = compactor_lock
                    .write()
                    .expect("failed to acquire compactor write-lock");
                // Never run two compactors at once.
                if let Some(handle) = compactor_guard.as_ref()
                    && !handle.is_finished()
                {
                    return;
                }

                *compactor_guard = Self::compact(base, path, snapshot, prefix_tombstones, op_seq);
            })
            .map_err(IndexError::Io)?;

        *self.flusher.write().unwrap() = Some(flusher);
        Ok(())
    }
618
    /// Spawn a background thread that merges `snapshot`'s segments into one,
    /// applies the result to `base`, and prunes fully-applied tombstones.
    /// Returns `None` when there is nothing to compact or the spawn fails.
    fn compact(
        base: Arc<RwLock<SegmentedIndex>>,
        path: PathBuf,
        snapshot: Vec<Arc<Segment>>,
        prefix_tombstones: Arc<RwLock<Vec<(String, u64)>>>,
        next_op_seq: Arc<AtomicU64>,
    ) -> Option<JoinHandle<()>> {
        if snapshot.is_empty() {
            return None;
        }

        std::thread::Builder::new()
            .name("minidex-compactor".to_string())
            .spawn(move || {
                // Merge into `<seq>.tmp`; `apply_compaction` swaps it in.
                let next_seq = next_op_seq.fetch_add(1, Ordering::SeqCst);
                let tmp_path = path.join(format!("{}.tmp", next_seq));

                log::debug!("Starting compaction with {} segments", snapshot.len());
                let snapshot_tombstones = { prefix_tombstones.read().unwrap().clone() };
                match compactor::merge_segments(&snapshot, snapshot_tombstones, tmp_path.clone()) {
                    Ok(compactor_seq) => {
                        let mut base_guard = base
                            .write()
                            .expect("failed to lock base for compaction apply");
                        if let Err(e) = base_guard.apply_compaction(&snapshot, tmp_path) {
                            log::error!("Failed to apply compaction: {}", e);
                        }
                        // Tombstones older than the merge are baked into the
                        // new segment and can be dropped.
                        let mut tombstones = prefix_tombstones.write().unwrap();
                        tombstones.retain(|(_, seq)| *seq >= compactor_seq);
                        log::debug!("Compaction finished");
                    }
                    Err(e) => log::error!("Compaction failed: {}", e),
                }
            })
            .ok()
    }
655}
656
657impl Drop for Index {
658    fn drop(&mut self) {
659        let _ = self.sync();
660
661        if let Ok(mut flusher) = self.flusher.write()
662            && let Some(flusher) = flusher.take()
663        {
664            let _ = flusher.join();
665        }
666
667        if let Ok(mut compactor) = self.compactor.write()
668            && let Some(compactor) = compactor.take()
669        {
670            let _ = compactor.join();
671        }
672    }
673}
674
/// Errors surfaced by [`Index`] operations.
#[derive(Debug, Error)]
pub enum IndexError {
    /// The on-disk index directory could not be opened.
    #[error("failed to open index on disk: {0}")]
    Open(std::io::Error),
    /// A read lock could not be acquired (e.g. it was poisoned).
    #[error("failed to read lock data")]
    ReadLock,
    /// A write lock could not be acquired (e.g. it was poisoned).
    #[error("failed to write lock data")]
    WriteLock,
    /// Error bubbled up unchanged from the segment layer.
    #[error(transparent)]
    SegmentedIndex(SegmentedIndexError),
    /// A matching pattern failed to compile.
    #[error("failed to compile matching regex: {0}")]
    Regex(String),
    /// Underlying filesystem / I/O error.
    #[error("io error: {0}")]
    Io(#[from] std::io::Error),
}
690
/// Returns `true` when `path_bytes` falls under an active prefix tombstone
/// whose stamp is strictly newer than `sequence`. Prefix comparison is
/// ASCII-case-insensitive (stored prefixes are lowercased).
#[inline]
pub(crate) fn is_tombstoned(
    path_bytes: &[u8],
    sequence: u64,
    active_tombstones: &[(String, u64)],
) -> bool {
    active_tombstones.iter().any(|(prefix, stamp)| {
        // Cheap sequence check first; `get(..len)` is None for short paths.
        sequence < *stamp
            && path_bytes
                .get(..prefix.len())
                .is_some_and(|head| head.eq_ignore_ascii_case(prefix.as_bytes()))
    })
}