//! minidex — lib.rs
use std::{
    collections::BTreeMap,
    path::{Path, PathBuf},
    sync::{
        Arc, RwLock,
        atomic::{AtomicU64, Ordering},
    },
    thread::JoinHandle,
};

use common::is_tombstoned;
use fst::{Automaton as _, IntoStreamer as _, Streamer, automaton::Str};

use thiserror::Error;

mod collector;
mod common;
mod leb128;
use collector::*;
pub use common::{Kind, VolumeType, category};
mod entry;
pub use entry::FilesystemEntry;
use entry::*;
mod segmented_index;
pub use segmented_index::compactor::*;
use segmented_index::*;
mod opstamp;
use opstamp::*;
use wal::Wal;
mod search;
mod tokenizer;
pub use tokenizer::tokenize;
mod wal;
pub use search::{ScoringConfig, ScoringInputs, ScoringWeights, SearchOptions, SearchResult};

/// A prefix tombstone: `(volume filter, normalized path prefix, opstamp sequence)`.
/// A `None` volume means the tombstone applies to all volumes.
pub type Tombstone = (Option<String>, String, u64);
37
/// A Minidex Index, managing both the in-memory and disk data.
/// Insertions and deletions auto-commit to the Write-Ahead Log
/// and may trigger compaction.
pub struct Index {
    // Directory containing the segment files and WAL journals.
    path: PathBuf,
    // On-disk segments; shared with the background flush/compaction threads.
    base: Arc<RwLock<SegmentedIndex>>,
    // Monotonic opstamp sequence, shared with background threads.
    next_op_seq: Arc<AtomicU64>,
    // In-memory write buffer: path -> (volume, entry); flushed to a segment.
    mem_idx: RwLock<BTreeMap<String, (String, IndexEntry)>>,
    // The active write-ahead log (journal.wal).
    wal: RwLock<Wal>,
    compactor_config: segmented_index::compactor::CompactorConfig,
    // Join handles of the background threads, when running.
    compactor: Arc<RwLock<Option<JoinHandle<()>>>>,
    flusher: Arc<RwLock<Option<JoinHandle<()>>>>,
    // Copy-on-write tombstone list (readers clone the inner Arc cheaply;
    // writers mutate via Arc::make_mut).
    prefix_tombstones: Arc<RwLock<Arc<Vec<Tombstone>>>>,
}
52
53impl Index {
54    /// Open the index on disk with a default compactor configuration.
55    /// This function will:
56    /// 1. Create (if it doesn't exist) the directory at `path`
57    /// 2. Try to obtain a lock on the directory
58    /// 3. Load the discovered segments, data and posting
59    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self, IndexError> {
60        Self::open_with_config(path, CompactorConfig::default())
61    }
62
63    /// Open the index on disk with a custom compactor configuration.
64    /// This function will:
65    /// 1. Create (if it doesn't exist) the directory at `path`
66    /// 2. Try to obtain a lock on the directory
67    /// 3. Load the discovered segments, data and posting
68    pub fn open_with_config<P: AsRef<Path>>(
69        path: P,
70        compactor_config: CompactorConfig,
71    ) -> Result<Self, IndexError> {
72        let base = SegmentedIndex::open(&path).map_err(IndexError::SegmentedIndex)?;
73
74        let base = Arc::new(RwLock::new(base));
75        let mut max_seq = 0u64;
76        let mut mem_idx = BTreeMap::new();
77
78        let mut prefix_tombstones = Vec::new();
79
80        let mut apply_replay = |replay_data: crate::wal::ReplayData| {
81            for (path, volume, entry) in replay_data.inserts {
82                max_seq = max_seq.max(entry.opstamp.sequence());
83                mem_idx.insert(path, (volume, entry));
84            }
85            for (volume, prefix, seq) in replay_data.tombstones {
86                max_seq = max_seq.max(seq);
87                prefix_tombstones.push((volume, prefix, seq));
88            }
89        };
90
91        let entries = path.as_ref().read_dir().map_err(IndexError::Io)?;
92
93        let mut flushing_wals = Vec::new();
94
95        // Recover partial, flushing WAL files
96        for entry in entries {
97            if let Ok(e) = entry
98                && let Ok(file_type) = e.file_type()
99                && file_type.is_file()
100                && e.file_name().to_string_lossy().ends_with(".flushing.wal")
101            {
102                flushing_wals.push(e.path());
103            }
104        }
105        flushing_wals.sort_unstable();
106
107        for wal_path in flushing_wals {
108            let partial = Wal::replay(wal_path).map_err(IndexError::Io)?;
109
110            apply_replay(partial);
111        }
112
113        let wal_path = path.as_ref().join("journal.wal");
114
115        let recovered = Wal::replay(&wal_path).map_err(IndexError::Io)?;
116        apply_replay(recovered);
117
118        let next_op_seq = Arc::new(AtomicU64::new(max_seq + 1));
119
120        let wal = Wal::open(&wal_path).map_err(IndexError::Io)?;
121
122        Ok(Self {
123            path: path.as_ref().to_path_buf(),
124            base,
125            next_op_seq,
126            mem_idx: RwLock::new(mem_idx),
127            wal: RwLock::new(wal),
128            compactor_config,
129            compactor: Arc::new(RwLock::new(None)),
130            flusher: Arc::new(RwLock::new(None)),
131            prefix_tombstones: Arc::new(RwLock::new(Arc::new(prefix_tombstones))),
132        })
133    }
134
135    fn next_op_seq(&self) -> u64 {
136        self.next_op_seq
137            .fetch_add(1, std::sync::atomic::Ordering::SeqCst)
138    }
139
    /// Insert a filesystem entry into the index.
    ///
    /// The entry is stamped with a fresh insertion opstamp, appended to the
    /// write-ahead log, then made visible in the in-memory index. Crossing
    /// the flush threshold triggers a background flush; blowing well past it
    /// (3x) blocks the caller until the in-flight flush completes.
    pub fn insert(&self, item: FilesystemEntry) -> Result<(), IndexError> {
        let threshold = self.compactor_config.flush_threshold * 3;

        // Backpressure mechanism - block inserts if we're blowing through
        // the flushing threshold.
        if self.mem_idx.read().map_err(|_| IndexError::ReadLock)?.len() > threshold {
            // Take the flusher handle inside a short scope so the lock is
            // released before the (potentially long) join below.
            let flusher = {
                self.flusher
                    .write()
                    .map_err(|_| IndexError::WriteLock)?
                    .take()
            };

            if let Some(handle) = flusher {
                let _ = handle.join();
            }

            // Best-effort: a failed flush should not fail the insert itself.
            let _ = self.trigger_flush();
        }

        let seq = self.next_op_seq();
        let path_str = item.path.to_string_lossy().to_string();
        let volume = item.volume;
        let entry = IndexEntry {
            opstamp: Opstamp::insertion(seq),
            kind: item.kind,
            last_modified: item.last_modified,
            last_accessed: item.last_accessed,
            category: item.category,
            volume_type: item.volume_type,
        };

        // Durability first: persist the operation to the WAL before it
        // becomes visible in memory.
        {
            let mut wal = self.wal.write().map_err(|_| IndexError::WriteLock)?;
            wal.append(&path_str, &volume, &entry)
                .map_err(IndexError::Io)?;
        }

        // Separate scope: the WAL lock is released before the mem_idx lock
        // is taken, so the two are never held simultaneously here.
        {
            self.mem_idx
                .write()
                .map_err(|_| IndexError::WriteLock)?
                .insert(path_str, (volume, entry));
        }

        if self.should_flush() {
            let _ = self.trigger_flush();
        }

        Ok(())
    }
192
193    pub fn delete(&self, item: &Path) -> Result<(), IndexError> {
194        let seq = self.next_op_seq();
195
196        let path_str = item.to_string_lossy().to_string();
197        let entry = IndexEntry {
198            opstamp: Opstamp::deletion(seq),
199            kind: Kind::File,
200            last_modified: 0,
201            last_accessed: 0,
202            category: 0,
203            volume_type: common::VolumeType::Local,
204        };
205
206        {
207            let mut wal = self.wal.write().map_err(|_| IndexError::WriteLock)?;
208            wal.append(&path_str, "", &entry).map_err(IndexError::Io)?;
209        }
210
211        {
212            self.mem_idx
213                .write()
214                .map_err(|_| IndexError::WriteLock)?
215                .insert(path_str, ("".to_owned(), entry));
216        }
217
218        if self.should_flush() {
219            let _ = self.trigger_flush();
220        }
221
222        Ok(())
223    }
224
225    /// Deletes all index entries under the given prefix, across all volumes
226    pub fn delete_prefix(&self, prefix: &str) -> Result<(), IndexError> {
227        self.delete_by_volume_name(None, prefix)
228    }
229
230    /// Deletes all index items under the given prefix,
231    /// belonging to the given volume. If volume is `None`, we delete
232    /// all entries for the prefix across all volumes.
233    pub fn delete_by_volume_name(
234        &self,
235        volume: Option<&str>,
236        prefix: &str,
237    ) -> Result<(), IndexError> {
238        let seq = self.next_op_seq.fetch_add(1, Ordering::SeqCst);
239        let normalized_prefix = prefix
240            .replace(['/', '\\'], std::path::MAIN_SEPARATOR_STR)
241            .to_lowercase();
242        {
243            let mut tombstones = self
244                .prefix_tombstones
245                .write()
246                .map_err(|_| IndexError::WriteLock)?;
247
248            Arc::make_mut(&mut tombstones).push((
249                volume.map(|s| s.to_string()),
250                normalized_prefix.clone(),
251                seq,
252            ));
253        }
254
255        {
256            let mut wal = self.wal.write().map_err(|_| IndexError::WriteLock)?;
257
258            wal.write_prefix_tombstone(volume, &normalized_prefix, seq)?;
259        }
260
261        Ok(())
262    }
263
264    /// Writes the in-memory index to disk.
265    /// This method can fail if the disk is not writable.
266    pub fn sync(&self) -> Result<(), IndexError> {
267        let mut wal = self.wal.write().map_err(|_| IndexError::WriteLock)?;
268        wal.flush().map_err(IndexError::Io)?;
269
270        Ok(())
271    }
272
    /// Search the index for the given search term (usually a path or
    /// file name), bound by limit and offset.
    pub fn search(
        &self,
        query: &str,
        limit: usize,
        offset: usize,
        options: SearchOptions<'_>,
    ) -> Result<Vec<SearchResult>, IndexError> {
        let mut tokens = crate::tokenizer::tokenize(query);

        if tokens.is_empty() {
            return Ok(Vec::new());
        }

        // Raw (non-tokenizer) lowercase tokens are kept separately for the
        // scoring function.
        let query_lower = query.to_lowercase();
        let raw_query_tokens: Vec<&str> = query_lower
            .split(|c: char| !c.is_alphanumeric())
            .filter(|s| !s.is_empty())
            .collect();

        // Longest token first: the most selective token prunes candidates
        // earliest in the per-segment intersection below.
        tokens.sort_by_key(|b| std::cmp::Reverse(b.len()));

        let segments = self.base.read().map_err(|_| IndexError::ReadLock)?;
        let mem = self.mem_idx.read().map_err(|_| IndexError::ReadLock)?;

        // Cap how many candidates receive full scoring, clamped to [500, 1000].
        let required_matches = limit + offset;
        let scoring_cap = std::cmp::max(500, required_matches * 3).min(1000);

        // Snapshot the copy-on-write tombstone list (cheap Arc clone).
        let active_tombstones = self
            .prefix_tombstones
            .read()
            .map_err(|_| IndexError::ReadLock)?
            .clone();

        let mut collector = LsmCollector::new(&active_tombstones);

        let volume_type_mask = Self::compile_allowed_volume_mask(options.volume_type);

        // In-memory searches
        for (path, (volume, entry)) in mem.iter() {
            let path_bytes = path.as_bytes();

            if let Some(filter) = options.volume_name
                && volume != filter
            {
                continue;
            }

            if let Some(category) = options.category
                && entry.category & category == 0
            {
                continue;
            }

            if let Some(kind) = options.kind
                && entry.kind != kind
            {
                continue;
            }

            if (volume_type_mask & (1 << entry.volume_type as u8)) == 0 {
                continue;
            }

            // Every token must match at a word boundary: either at the start
            // of the path, or preceded by a non-alphanumeric byte/char.
            let matches_all = if path.is_ascii() {
                // ASCII fast path: case-insensitive byte-window scan, no
                // allocation.
                tokens.iter().all(|t| {
                    let token_bytes = t.as_bytes();
                    if path_bytes.len() < token_bytes.len() {
                        return false;
                    }
                    path_bytes
                        .windows(token_bytes.len())
                        .enumerate()
                        .any(|(idx, window)| {
                            if window.eq_ignore_ascii_case(token_bytes) {
                                if idx == 0 {
                                    true
                                } else {
                                    !path_bytes[idx - 1].is_ascii_alphanumeric()
                                }
                            } else {
                                false
                            }
                        })
                })
            } else {
                // Non-ASCII: fold the path first, then substring-match.
                // `match_indices` offsets are char boundaries, so the slice
                // below is valid and non-empty whenever idx > 0.
                let folded_path = crate::tokenizer::fold_path(path);
                tokens.iter().all(|t| {
                    folded_path.match_indices(t.as_str()).any(|(idx, _)| {
                        if idx == 0 {
                            true
                        } else {
                            !folded_path[..idx].chars().last().unwrap().is_alphanumeric()
                        }
                    })
                })
            };

            if matches_all {
                collector.insert(path.to_string(), volume.clone(), *entry);
            }
        }

        // Disk searches
        let mut token_docs = Vec::new();
        let mut current_matches = Vec::new();
        let mut swap_buffer = Vec::new();

        // When filtering by volume, a synthetic token seeds the candidate
        // set with that volume's documents before any query token is applied.
        let vol_token = options
            .volume_name
            .map(|vol| crate::tokenizer::synthesize_volume_token(&vol.to_lowercase()));

        for segment in segments.segments() {
            current_matches.clear();
            let mut first_token = true;
            let mut valid_matches = true;

            if let Some(ref vol_token) = vol_token {
                let map = segment.as_ref().as_ref();
                if let Some(post_offset) = map.get(vol_token) {
                    segment.append_posting_list(post_offset, &mut current_matches);
                    current_matches.sort_unstable();
                    current_matches.dedup();
                    first_token = false;
                } else {
                    // Volume absent from this segment: nothing can match.
                    continue;
                }
            }

            for token in &tokens {
                // Skip on 0 matches
                if !first_token && current_matches.is_empty() {
                    valid_matches = false;
                    break;
                }

                // Prefix query over the segment's FST term dictionary.
                let matcher = Str::new(token).starts_with();

                token_docs.clear();
                let map = segment.as_ref().as_ref();
                let mut stream = map.search(&matcher).into_stream();

                while let Some((_, post_offset)) = stream.next() {
                    segment.append_posting_list(post_offset, &mut token_docs);
                }

                token_docs.sort_unstable();
                token_docs.dedup();

                if first_token {
                    std::mem::swap(&mut current_matches, &mut token_docs);
                    first_token = false;
                } else {
                    let t_len = token_docs.len();
                    let c_len = current_matches.len();

                    // Heavily skewed sizes (>10x): iterate the small sorted
                    // list and binary-search the large one.
                    if c_len * 10 < t_len || t_len * 10 < c_len {
                        if c_len > t_len {
                            // current_matches is massive, so we iterate on token docs instead
                            swap_buffer.clear();

                            for &doc_id in &token_docs {
                                if current_matches.binary_search(&doc_id).is_ok() {
                                    swap_buffer.push(doc_id);
                                }
                            }
                            std::mem::swap(&mut current_matches, &mut swap_buffer);
                        } else {
                            current_matches
                                .retain(|doc_id| token_docs.binary_search(doc_id).is_ok());
                        }
                    } else {
                        // O(N+M) Two-Pointer traversal otherwise
                        let mut j = 0;
                        current_matches.retain(|&doc_id| {
                            while j < t_len && token_docs[j] < doc_id {
                                j += 1;
                            }
                            j < t_len && token_docs[j] == doc_id
                        })
                    }
                }
            }

            if valid_matches && !current_matches.is_empty() {
                let valid_docs = &current_matches;
                let mut enriched_docs: Vec<u128> = Vec::with_capacity(valid_docs.len());
                let meta_mmap = segment.meta_map();

                // Enrich surviving doc ids with their fixed-width packed
                // metadata records, applying the cheap filters before any
                // document data is read.
                for &doc_id in valid_docs {
                    let byte_offset = (doc_id as usize) * size_of::<u128>();
                    let packed_bytes: [u8; 16] = meta_mmap
                        [byte_offset..byte_offset + size_of::<u128>()]
                        .try_into()
                        .expect("failed to unpack");
                    let packed_val = u128::from_le_bytes(packed_bytes);

                    let (_, _, _, _, is_dir, doc_category, vol_type) =
                        SegmentedIndex::unpack_u128(packed_val);

                    // Kind filter
                    if let Some(kind) = options.kind {
                        let is_target_dir = kind == Kind::Directory;
                        if is_dir != is_target_dir {
                            continue;
                        }
                    }

                    // Filter categories
                    if let Some(category) = options.category
                        && doc_category & category == 0
                    {
                        continue;
                    }

                    // Filter volume type
                    if (volume_type_mask & (1 << vol_type)) == 0 {
                        continue;
                    }

                    enriched_docs.push(packed_val);
                }

                if enriched_docs.len() > scoring_cap {
                    // O(N) quickselect: directories first, then shallower
                    // paths, then most recently touched, offset as tiebreak.
                    enriched_docs.select_nth_unstable_by(scoring_cap, |&a, &b| {
                        let (a_off, a_modified_at, a_accessed_at, a_depth, a_dir, _, _) =
                            SegmentedIndex::unpack_u128(a);
                        let (b_off, b_modified_at, b_accessed_at, b_depth, b_dir, _, _) =
                            SegmentedIndex::unpack_u128(b);

                        let a_recent = a_modified_at.max(a_accessed_at);
                        let b_recent = b_modified_at.max(b_accessed_at);

                        b_dir
                            .cmp(&a_dir)
                            .then_with(|| a_depth.cmp(&b_depth))
                            .then_with(|| b_recent.cmp(&a_recent))
                            .then_with(|| a_off.cmp(&b_off))
                    });

                    enriched_docs.truncate(scoring_cap);
                }

                // Re-sort by dat_offset ascending to align with in-disk layout
                enriched_docs.sort_unstable_by_key(|&packed| {
                    let (dat_offset, _, _, _, _, _, _) = SegmentedIndex::unpack_u128(packed);
                    dat_offset
                });

                for packed_val in enriched_docs {
                    let (dat_offset, _, _, _, _, _, _) = SegmentedIndex::unpack_u128(packed_val);

                    if let Some((path, volume, entry)) = segment.read_document(dat_offset) {
                        collector.insert(path, volume, entry);
                    }
                }
            }
        }

        let mut results: Vec<_> = collector.finish().collect();

        // Rough top-k
        if results.len() > scoring_cap {
            results.select_nth_unstable_by(scoring_cap, |a, b| {
                let a_recent = a.2.last_modified.max(a.2.last_accessed);
                let b_recent = b.2.last_modified.max(b.2.last_accessed);

                b_recent.cmp(&a_recent).then_with(|| a.0.cmp(&b.0))
            });
            results.truncate(scoring_cap);
        }

        let now_micros = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .expect("failed to get system time")
            .as_micros() as f64;

        let config = if let Some(config) = options.scoring {
            config
        } else {
            &ScoringConfig::default()
        };

        let weights = config.weights.unwrap_or_default();
        let mut scored: Vec<_> = results
            .into_iter()
            .map(|(path, volume, entry)| {
                let inputs = ScoringInputs {
                    path: &path,
                    query_tokens: &tokens,
                    raw_query_tokens: &raw_query_tokens,
                    last_modified: entry.last_modified,
                    last_accessed: entry.last_accessed,
                    kind: entry.kind,
                    now_micros,
                };

                let score = (config.scoring_fn)(&weights, &inputs);

                SearchResult {
                    path: PathBuf::from(path),
                    volume,
                    volume_type: entry.volume_type,
                    kind: entry.kind,
                    last_modified: entry.last_modified,
                    last_accessed: entry.last_accessed,
                    category: entry.category,
                    score,
                }
            })
            .collect();

        // Final ranking comes from SearchResult's Ord implementation.
        scored.sort();

        let paginated_results = scored.into_iter().skip(offset).take(limit).collect();

        Ok(paginated_results)
    }
593
594    /// Retrieve all indexed files last accessed until the given timestamp (in seconds).
595    pub fn recent_files(
596        &self,
597        since: u64, // Renamed 'until' to 'since' for clarity
598        limit: usize,
599        offset: usize,
600        options: SearchOptions<'_>,
601    ) -> Result<Vec<SearchResult>, IndexError> {
602        let segments = self.base.read().unwrap();
603        let mem = self.mem_idx.read().unwrap();
604
605        let active_tombstones = self
606            .prefix_tombstones
607            .read()
608            .map_err(|_| IndexError::ReadLock)?
609            .clone();
610
611        let mut collector = LsmCollector::new(&active_tombstones);
612
613        let volume_type_mask = Self::compile_allowed_volume_mask(options.volume_type);
614
615        for (path, (volume, entry)) in mem.iter() {
616            if entry.last_accessed >= since {
617                if let Some(filter) = options.volume_name
618                    && volume != filter
619                {
620                    continue;
621                }
622                if let Some(category) = options.category
623                    && entry.category & category == 0
624                {
625                    continue;
626                }
627                if let Some(kind) = options.kind
628                    && entry.kind != kind
629                {
630                    continue;
631                }
632                if (volume_type_mask & (1 << entry.volume_type as u8)) == 0 {
633                    continue;
634                }
635                collector.insert(path.clone(), volume.clone(), *entry);
636            }
637        }
638
639        let required_matches = offset + limit;
640        // Buffer to account for items that might be filtered out by volume or tombstones
641        let disk_cap = required_matches + 500;
642
643        let mut disk_candidates: Vec<(&std::sync::Arc<Segment>, u128)> = Vec::new();
644
645        for segment in segments.segments() {
646            let meta_mmap = segment.meta_map();
647
648            for chunk in meta_mmap.chunks_exact(16) {
649                let packed = u128::from_le_bytes(chunk.try_into().unwrap());
650                let (_, _, accessed, _, is_dir, doc_category, doc_vol_type) =
651                    SegmentedIndex::unpack_u128(packed);
652
653                if accessed >= since {
654                    if let Some(target_kind) = options.kind {
655                        let is_target_dir = target_kind == Kind::Directory;
656                        if is_dir != is_target_dir {
657                            continue;
658                        }
659                    }
660
661                    if let Some(category) = options.category
662                        && doc_category & category == 0
663                    {
664                        continue;
665                    }
666
667                    if (volume_type_mask & (1 << doc_vol_type)) == 0 {
668                        continue;
669                    }
670
671                    // DO NOT read the document yet! Just save the integer.
672                    disk_candidates.push((segment, packed));
673                }
674            }
675        }
676
677        if disk_candidates.len() > disk_cap {
678            disk_candidates.select_nth_unstable_by(disk_cap, |a, b| {
679                let (a_off, a_mod, a_acc, _, _, _, _) = SegmentedIndex::unpack_u128(a.1);
680                let (b_off, b_mod, b_acc, _, _, _, _) = SegmentedIndex::unpack_u128(b.1);
681                b_acc
682                    .cmp(&a_acc) // Sort descending by access time
683                    .then_with(|| b_mod.cmp(&a_mod)) // Then by modified time
684                    .then_with(|| a_off.cmp(&b_off)) // Then by on-disk offset (ascending)
685            });
686            disk_candidates.truncate(disk_cap);
687        }
688
689        for (segment, packed) in disk_candidates {
690            let (dat_offset, _, _, _, _, _, _) = SegmentedIndex::unpack_u128(packed);
691
692            if let Some((path, volume, entry)) = segment.read_document(dat_offset) {
693                if let Some(filter) = options.volume_name
694                    && volume != filter
695                {
696                    continue;
697                }
698                collector.insert(path, volume, entry);
699            }
700        }
701
702        let mut results: Vec<_> = collector.finish().collect();
703
704        if results.len() > required_matches {
705            results.select_nth_unstable_by(required_matches, |a, b| {
706                b.2.last_accessed
707                    .cmp(&a.2.last_accessed)
708                    .then_with(|| b.2.last_modified.cmp(&a.2.last_modified))
709                    .then_with(|| a.0.cmp(&b.0))
710            });
711            results.truncate(required_matches);
712        }
713
714        results.sort_unstable_by(|a, b| {
715            b.2.last_accessed
716                .cmp(&a.2.last_accessed)
717                .then_with(|| b.2.last_modified.cmp(&a.2.last_modified))
718                .then_with(|| a.0.cmp(&b.0))
719        });
720
721        let paginated_results = results
722            .into_iter()
723            .skip(offset)
724            .take(limit)
725            .map(|(path, volume, entry)| SearchResult {
726                path: PathBuf::from(path),
727                volume,
728                volume_type: entry.volume_type,
729                kind: entry.kind,
730                last_modified: entry.last_modified,
731                last_accessed: entry.last_accessed,
732                category: entry.category,
733                score: 0.0,
734            })
735            .collect();
736
737        Ok(paginated_results)
738    }
739
740    /// Force index compaction, minimizing the amount of disk space
741    /// utilized by the index.
742    /// NOTE: this operation is very IO intensive and can take some time
743    pub fn force_compact_all(&self) -> Result<(), IndexError> {
744        // Force all data to be flushed before proceeding
745        loop {
746            if let Ok(mut flusher) = self.flusher.write()
747                && let Some(handle) = flusher.take()
748            {
749                log::debug!("Waiting for background flush to finish...");
750                let _ = handle.join();
751            }
752
753            if self
754                .mem_idx
755                .read()
756                .map_err(|_| IndexError::ReadLock)?
757                .is_empty()
758            {
759                break;
760            }
761
762            self.trigger_flush()?;
763        }
764
765        if let Ok(mut compactor) = self.compactor.write()
766            && let Some(handle) = compactor.take()
767        {
768            log::debug!("Waiting for background compactor to finish...");
769            let _ = handle.join();
770        }
771
772        let snapshot = {
773            let base = self.base.read().map_err(|_| IndexError::ReadLock)?;
774            let segments = base.snapshot();
775
776            // If we have 1 or 0 segments, the database is already perfectly compacted!
777            if segments.len() <= 1 {
778                log::debug!("Database is already fully compacted.");
779                return Ok(());
780            }
781            segments
782        };
783
784        log::debug!("Forcing full compaction of {} segments...", snapshot.len());
785
786        let compactor_seq = self.next_op_seq.fetch_add(1, Ordering::SeqCst);
787
788        let tmp_path = self.path.join(format!("{}.tmp", compactor_seq));
789
790        let snapshot_tombstones = {
791            let guard = self.prefix_tombstones.read().expect("lock poisoned");
792            guard.clone()
793        };
794
795        compactor::merge_segments(&snapshot, snapshot_tombstones, tmp_path.clone())
796            .map_err(|e| IndexError::Io(std::io::Error::other(e)))?;
797
798        let mut base_guard = self.base.write().map_err(|_| IndexError::WriteLock)?;
799        base_guard
800            .apply_compaction(&snapshot, tmp_path)
801            .map_err(|e| IndexError::Io(std::io::Error::other(e)))?;
802
803        log::debug!("Full compaction complete");
804        Ok(())
805    }
806
807    fn should_flush(&self) -> bool {
808        self.mem_idx.read().unwrap().len() > self.compactor_config.flush_threshold
809            || self.prefix_tombstones.read().unwrap().len()
810                > self.compactor_config.tombstone_threshold
811    }
812
    /// Flush the in-memory index to a new on-disk segment on a background
    /// thread, rotating the WAL so that new writes keep a durable journal
    /// while the snapshot is being written.
    ///
    /// Returns `Ok(())` without doing anything when a flush is already in
    /// flight or there is nothing to flush. After the segment lands, the
    /// flush thread may also kick off a background compaction if enough
    /// segments have accumulated.
    fn trigger_flush(&self) -> Result<(), IndexError> {
        // If a previous flush thread is still running, this call is a no-op;
        // the running flush already owns a snapshot of the memory index.
        if let Some(ref flusher) = *self.flusher.read().expect("failed to read flusher")
            && !flusher.is_finished()
        {
            return Ok(());
        }
        // Lock order: mem_idx before wal. Both are held together so the
        // snapshot and the WAL rotation observe a consistent state.
        let mut mem = self.mem_idx.write().expect("failed to lock memory");
        let mut wal = self.wal.write().expect("failed to lock wal");

        if mem.is_empty() {
            return Ok(());
        }

        // Take ownership of the current in-memory entries; subsequent
        // inserts go into the fresh, empty map while we write the snapshot.
        let snapshot = std::mem::take(&mut *mem);
        let path = self.path.clone();
        let next_seq = self.next_op_seq();

        // Move the active WAL aside under a ".flushing" name; it is only
        // deleted after the segment has been written and registered, so a
        // crash mid-flush can still be recovered from the rotated journal.
        let flushing_path = path.join(format!("journal.{}.flushing.wal", next_seq));
        wal.rotate(&flushing_path).map_err(IndexError::Io)?;

        // Re-write tombstones to the WAL until a full compaction runs.
        let tombstones_cow = { self.prefix_tombstones.read().unwrap().clone() };
        for (volume, prefix, seq) in tombstones_cow.iter() {
            wal.write_prefix_tombstone(volume.as_deref(), prefix, *seq)?;
        }

        // Release both locks before spawning so the flush thread and other
        // writers can make progress.
        drop(wal);
        drop(mem);

        // Clone the shared handles the background thread needs to own.
        let base = Arc::clone(&self.base);
        let min_merge_count = self.compactor_config.min_merge_count;
        let compactor_lock = Arc::clone(&self.compactor);
        let op_seq = Arc::clone(&self.next_op_seq);
        let prefix_tombstones = Arc::clone(&self.prefix_tombstones);

        let flusher = std::thread::Builder::new()
            .name("minidex-flush".to_owned())
            .spawn(move || {
                // Write to "<seq>.tmp" first, then rename to "<seq>" so a
                // partially written segment is never loaded.
                let final_segment_path = path.join(format!("{}", next_seq));
                let tmp_segment_path = path.join(format!("{}.tmp", next_seq));

                if let Err(e) = SegmentedIndex::build_segment_files(
                    &tmp_segment_path,
                    snapshot
                        .into_iter()
                        .map(|(path, (volume, entry))| (path, volume, entry)),
                    false,
                ) {
                    // On failure, clean up the partial tmp files and bail;
                    // the rotated WAL still holds the data for recovery.
                    log::error!("flush failed to write: {}", e);
                    let tmp_paths = Segment::paths_with_additional_extension(&tmp_segment_path);
                    Segment::remove_files(&tmp_paths);
                    return;
                }

                let tmp_paths = Segment::paths_with_additional_extension(&tmp_segment_path);

                let final_paths = Segment::paths_with_additional_extension(&final_segment_path);

                // Promote the tmp files to their final names.
                let _ = Segment::rename_files(&tmp_paths, &final_paths);

                let new_segment =
                    Arc::new(Segment::load(final_segment_path).expect("failed to load"));
                {
                    // Register the new segment so searches can see it.
                    let mut base_guard = base.write().expect("failed to lock base");
                    base_guard.add_segment(new_segment);
                }

                // The segment is now durable and visible; the rotated WAL
                // is no longer needed.
                if let Err(e) = std::fs::remove_file(&flushing_path) {
                    log::error!("failed to delete rotated WAL: {}", e);
                }

                // Decide whether to trigger a background compaction: only
                // when the segment count exceeds the configured minimum.
                let snapshot = {
                    let base = base.read().expect("failed to read-lock base");
                    if base.segments().count() <= min_merge_count {
                        return;
                    }

                    base.snapshot()
                };

                // Skip if a compaction is already running.
                let mut compactor_guard = compactor_lock
                    .write()
                    .expect("failed to acquire compactor write-lock");
                if let Some(handle) = compactor_guard.as_ref()
                    && !handle.is_finished()
                {
                    return;
                }

                *compactor_guard = Self::compact(base, path, snapshot, prefix_tombstones, op_seq);
            })
            .map_err(IndexError::Io)?;

        // Publish the handle so later calls (and Drop) can join it.
        *self.flusher.write().unwrap() = Some(flusher);
        Ok(())
    }
909
    /// Spawn a background thread that merges `snapshot`'s segments into a
    /// single new segment, applying any prefix tombstones.
    ///
    /// Takes owned/`Arc` handles (rather than `&self`) so the thread can
    /// outlive the caller's borrow. Returns the thread's `JoinHandle`, or
    /// `None` when the snapshot is empty or the thread could not be spawned.
    fn compact(
        base: Arc<RwLock<SegmentedIndex>>,
        path: PathBuf,
        snapshot: Vec<Arc<Segment>>,
        prefix_tombstones: Arc<RwLock<Arc<Vec<Tombstone>>>>,
        next_op_seq: Arc<AtomicU64>,
    ) -> Option<JoinHandle<()>> {
        if snapshot.is_empty() {
            return None;
        }

        std::thread::Builder::new()
            .name("minidex-compactor".to_string())
            .spawn(move || {
                // Reserve a fresh sequence number for the merged segment and
                // write it under a ".tmp" name until the merge succeeds.
                let next_seq = next_op_seq.fetch_add(1, Ordering::SeqCst);
                let tmp_path = path.join(format!("{}.tmp", next_seq));

                log::debug!("Starting compaction with {} segments", snapshot.len());
                // Clone the tombstone list (cheap Arc clone) so the merge
                // works on a stable view while new tombstones may arrive.
                let snapshot_tombstones = { prefix_tombstones.read().unwrap().clone() };
                match compactor::merge_segments(&snapshot, snapshot_tombstones, tmp_path.clone()) {
                    Ok(compactor_seq) => {
                        // Swap the merged segment in for the old ones before
                        // pruning tombstones, so no window exists where a
                        // tombstone is dropped but not yet applied.
                        let mut base_guard = base
                            .write()
                            .expect("failed to lock base for compaction apply");
                        if let Err(e) = base_guard.apply_compaction(&snapshot, tmp_path) {
                            log::error!("Failed to apply compaction: {}", e);
                        }
                        // Drop tombstones the merge has fully applied; keep
                        // any at or after `compactor_seq` (copy-on-write via
                        // Arc::make_mut, so concurrent readers are unaffected).
                        let mut tombstones = prefix_tombstones.write().unwrap();
                        Arc::make_mut(&mut tombstones).retain(|(_, _, seq)| *seq >= compactor_seq);

                        log::debug!("Compaction finished");
                    }
                    Err(e) => log::error!("Compaction failed: {}", e),
                }
            })
            .ok()
    }
947
948    fn compile_allowed_volume_mask(allowed_volume_types: Option<&[VolumeType]>) -> u8 {
949        match allowed_volume_types {
950            Some(allowed) => allowed.iter().fold(0, |acc, &vt| acc | (1 << (vt as u8))),
951            None => 0b0000_1111,
952        }
953    }
954}
955
956impl Drop for Index {
957    fn drop(&mut self) {
958        let _ = self.sync();
959
960        if let Ok(mut flusher) = self.flusher.write()
961            && let Some(flusher) = flusher.take()
962        {
963            let _ = flusher.join();
964        }
965
966        if let Ok(mut compactor) = self.compactor.write()
967            && let Some(compactor) = compactor.take()
968        {
969            let _ = compactor.join();
970        }
971    }
972}
973
/// Errors returned by [`Index`] operations.
#[derive(Debug, Error)]
pub enum IndexError {
    /// The index directory could not be created, locked, or loaded.
    #[error("failed to open index on disk: {0}")]
    Open(std::io::Error),
    /// A read lock on internal state could not be acquired (poisoned).
    #[error("failed to read lock data")]
    ReadLock,
    /// A write lock on internal state could not be acquired (poisoned).
    #[error("failed to write lock data")]
    WriteLock,
    /// An error bubbled up from the underlying segmented index.
    #[error(transparent)]
    SegmentedIndex(SegmentedIndexError),
    /// A caller-supplied pattern failed to compile into a matching regex.
    #[error("failed to compile matching regex: {0}")]
    Regex(String),
    /// General I/O failure (WAL, segment files, directory operations).
    #[error("io error: {0}")]
    Io(#[from] std::io::Error),
}
989
990#[cfg(test)]
991mod tests {
992    use super::*;
993    use crate::common::{VolumeType, category};
994
995    #[test]
996    fn test_index_basic_lifecycle() -> Result<(), IndexError> {
997        let temp_dir = std::env::temp_dir().join(format!("minidex_test_lib_{}", rand_id()));
998        std::fs::create_dir_all(&temp_dir)?;
999
1000        let sep = std::path::MAIN_SEPARATOR_STR;
1001        let path1 = format!("{}foo{}bar.txt", sep, sep);
1002
1003        {
1004            let index = Index::open(&temp_dir)?;
1005            index.insert(FilesystemEntry {
1006                path: PathBuf::from(&path1),
1007                volume: "vol1".to_string(),
1008                kind: Kind::File,
1009                last_modified: 100,
1010                last_accessed: 100,
1011                category: category::TEXT,
1012                volume_type: VolumeType::Local,
1013            })?;
1014
1015            let results = index.search("bar", 10, 0, SearchOptions::default())?;
1016            assert_eq!(results.len(), 1);
1017            assert_eq!(results[0].path, PathBuf::from(&path1));
1018
1019            index.sync()?;
1020        }
1021
1022        // Reopen index and verify data is still there
1023        {
1024            let index = Index::open(&temp_dir)?;
1025            let results = index.search("bar", 10, 0, SearchOptions::default())?;
1026            assert_eq!(results.len(), 1);
1027            assert_eq!(results[0].path, PathBuf::from(&path1));
1028        }
1029
1030        std::fs::remove_dir_all(temp_dir)?;
1031        Ok(())
1032    }
1033
1034    #[test]
1035    fn test_index_flush_and_search() -> Result<(), IndexError> {
1036        let temp_dir = std::env::temp_dir().join(format!("minidex_test_lib_flush_{}", rand_id()));
1037        std::fs::create_dir_all(&temp_dir)?;
1038
1039        let config = CompactorConfig {
1040            flush_threshold: 1,
1041            ..Default::default()
1042        };
1043
1044        let sep = std::path::MAIN_SEPARATOR_STR;
1045
1046        let index = Index::open_with_config(&temp_dir, config)?;
1047        index.insert(FilesystemEntry {
1048            path: PathBuf::from(format!("{}foo{}a.txt", sep, sep)),
1049            volume: "vol1".to_string(),
1050            kind: Kind::File,
1051            last_modified: 100,
1052            last_accessed: 100,
1053            category: category::TEXT,
1054            volume_type: VolumeType::Local,
1055        })?;
1056
1057        // This insert should trigger a flush in the background
1058        index.insert(FilesystemEntry {
1059            path: PathBuf::from(format!("{}foo{}b.txt", sep, sep)),
1060            volume: "vol1".to_string(),
1061            kind: Kind::File,
1062            last_modified: 100,
1063            last_accessed: 100,
1064            category: category::TEXT,
1065            volume_type: VolumeType::Local,
1066        })?;
1067
1068        // Wait a bit for background flush
1069        std::thread::sleep(std::time::Duration::from_millis(500));
1070
1071        let results = index.search("foo", 10, 0, SearchOptions::default())?;
1072        assert_eq!(results.len(), 2);
1073
1074        std::fs::remove_dir_all(temp_dir)?;
1075        Ok(())
1076    }
1077
1078    #[test]
1079    fn test_index_prefix_delete() -> Result<(), IndexError> {
1080        let temp_dir = std::env::temp_dir().join(format!("minidex_test_lib_del_{}", rand_id()));
1081        std::fs::create_dir_all(&temp_dir)?;
1082
1083        let sep = std::path::MAIN_SEPARATOR_STR;
1084
1085        let index = Index::open(&temp_dir)?;
1086        index.insert(FilesystemEntry {
1087            path: PathBuf::from(format!("{}foo{}bar{}a.txt", sep, sep, sep)),
1088            volume: "vol1".to_string(),
1089            kind: Kind::File,
1090            last_modified: 100,
1091            last_accessed: 100,
1092            category: 0,
1093            volume_type: VolumeType::Local,
1094        })?;
1095        let other_path = format!("{}other{}b.txt", sep, sep);
1096        index.insert(FilesystemEntry {
1097            path: PathBuf::from(&other_path),
1098            volume: "vol1".to_string(),
1099            kind: Kind::File,
1100            last_modified: 100,
1101            last_accessed: 100,
1102            category: 0,
1103            volume_type: VolumeType::Local,
1104        })?;
1105
1106        // Delete everything under /foo
1107        index.delete_prefix(&format!("{}foo", sep))?;
1108
1109        let results = index.search("txt", 10, 0, SearchOptions::default())?;
1110        assert_eq!(results.len(), 1);
1111        assert_eq!(results[0].path, PathBuf::from(&other_path));
1112
1113        std::fs::remove_dir_all(temp_dir)?;
1114        Ok(())
1115    }
1116
1117    #[test]
1118    fn test_index_volume_prefix_delete() -> Result<(), IndexError> {
1119        let temp_dir = std::env::temp_dir().join(format!("minidex_test_lib_vol_del_{}", rand_id()));
1120        std::fs::create_dir_all(&temp_dir)?;
1121
1122        let sep = std::path::MAIN_SEPARATOR_STR;
1123
1124        let index = Index::open(&temp_dir)?;
1125        index.insert(FilesystemEntry {
1126            path: PathBuf::from(format!("{}foo{}bar{}a.txt", sep, sep, sep)),
1127            volume: "vol1".to_string(),
1128            kind: Kind::File,
1129            last_modified: 100,
1130            last_accessed: 100,
1131            category: 0,
1132            volume_type: VolumeType::Local,
1133        })?;
1134        index.insert(FilesystemEntry {
1135            path: PathBuf::from(format!("{}foo{}bar{}b.txt", sep, sep, sep)),
1136            volume: "vol2".to_string(),
1137            kind: Kind::File,
1138            last_modified: 100,
1139            last_accessed: 100,
1140            category: 0,
1141            volume_type: VolumeType::Local,
1142        })?;
1143
1144        // Delete /foo on vol1 only
1145        index.delete_by_volume_name(Some("vol1"), &format!("{}foo", sep))?;
1146
1147        let results = index.search("txt", 10, 0, SearchOptions::default())?;
1148        assert_eq!(results.len(), 1);
1149        assert_eq!(results[0].volume, "vol2");
1150
1151        std::fs::remove_dir_all(temp_dir)?;
1152        Ok(())
1153    }
1154
1155    #[test]
1156    fn test_index_compaction() -> Result<(), IndexError> {
1157        let temp_dir = std::env::temp_dir().join(format!("minidex_test_lib_comp_{}", rand_id()));
1158        std::fs::create_dir_all(&temp_dir)?;
1159
1160        let config = CompactorConfig {
1161            flush_threshold: 1,
1162            ..Default::default()
1163        };
1164
1165        let sep = std::path::MAIN_SEPARATOR_STR;
1166
1167        let index = Index::open_with_config(&temp_dir, config)?;
1168
1169        // Create 4 items to trigger 2 flushes (with flush_threshold=1)
1170        for i in 0..4 {
1171            index.insert(FilesystemEntry {
1172                path: PathBuf::from(format!("{}foo{}{}.txt", sep, sep, i)),
1173                volume: "vol1".to_string(),
1174                kind: Kind::File,
1175                last_modified: 100,
1176                last_accessed: 100,
1177                category: 0,
1178                volume_type: VolumeType::Local,
1179            })?;
1180            // Force wait for each flush
1181            std::thread::sleep(std::time::Duration::from_millis(200));
1182        }
1183
1184        // Wait for final flush to finish
1185        if let Ok(mut flusher) = index.flusher.write()
1186            && let Some(h) = flusher.take()
1187        {
1188            let _ = h.join();
1189        }
1190
1191        {
1192            let base = index.base.read().unwrap();
1193            assert!(
1194                base.segments().count() >= 2,
1195                "Should have at least 2 segments, got {}",
1196                base.segments().count()
1197            );
1198        }
1199
1200        index.force_compact_all()?;
1201
1202        {
1203            let base = index.base.read().unwrap();
1204            assert_eq!(base.segments().count(), 1);
1205        }
1206
1207        let results = index.search("foo", 10, 0, SearchOptions::default())?;
1208        assert_eq!(results.len(), 4);
1209
1210        std::fs::remove_dir_all(temp_dir)?;
1211        Ok(())
1212    }
1213
1214    #[test]
1215    fn test_index_recent_files() -> Result<(), IndexError> {
1216        let temp_dir = std::env::temp_dir().join(format!("minidex_test_lib_recent_{}", rand_id()));
1217        std::fs::create_dir_all(&temp_dir)?;
1218
1219        let sep = std::path::MAIN_SEPARATOR_STR;
1220
1221        let index = Index::open(&temp_dir)?;
1222        index.insert(FilesystemEntry {
1223            path: PathBuf::from(format!("{}foo{}old.txt", sep, sep)),
1224            volume: "vol1".to_string(),
1225            kind: Kind::File,
1226            last_modified: 100,
1227            last_accessed: 100, // Very old
1228            category: 0,
1229            volume_type: VolumeType::Local,
1230        })?;
1231        let new_path = format!("{}foo{}new.txt", sep, sep);
1232        index.insert(FilesystemEntry {
1233            path: PathBuf::from(&new_path),
1234            volume: "vol1".to_string(),
1235            kind: Kind::File,
1236            last_modified: 1000,
1237            last_accessed: 1000, // Newer
1238            category: 0,
1239            volume_type: VolumeType::Local,
1240        })?;
1241
1242        let results = index.recent_files(500, 10, 0, SearchOptions::default())?;
1243        assert_eq!(results.len(), 1);
1244        assert_eq!(results[0].path, PathBuf::from(&new_path));
1245
1246        std::fs::remove_dir_all(temp_dir)?;
1247        Ok(())
1248    }
1249
1250    #[test]
1251    fn test_index_search_filters() -> Result<(), IndexError> {
1252        let temp_dir = std::env::temp_dir().join(format!("minidex_test_lib_filter_{}", rand_id()));
1253        std::fs::create_dir_all(&temp_dir)?;
1254
1255        let sep = std::path::MAIN_SEPARATOR_STR;
1256
1257        let index = Index::open(&temp_dir)?;
1258        index.insert(FilesystemEntry {
1259            path: PathBuf::from(format!("{}vol1{}a.txt", sep, sep)),
1260            volume: "vol1".to_string(),
1261            kind: Kind::File,
1262            last_modified: 100,
1263            last_accessed: 100,
1264            category: category::TEXT,
1265            volume_type: VolumeType::Local,
1266        })?;
1267        index.insert(FilesystemEntry {
1268            path: PathBuf::from(format!("{}vol2{}b.txt", sep, sep)),
1269            volume: "vol2".to_string(),
1270            kind: Kind::File,
1271            last_modified: 100,
1272            last_accessed: 100,
1273            category: category::IMAGE,
1274            volume_type: VolumeType::Local,
1275        })?;
1276
1277        // Filter by volume
1278        let opts_vol1 = SearchOptions {
1279            volume_name: Some("vol1"),
1280            ..Default::default()
1281        };
1282        let res_vol1 = index.search("txt", 10, 0, opts_vol1)?;
1283        assert_eq!(res_vol1.len(), 1);
1284        assert_eq!(res_vol1[0].volume, "vol1");
1285
1286        // Filter by category
1287        let opts_img = SearchOptions {
1288            category: Some(category::IMAGE),
1289            ..Default::default()
1290        };
1291        let res_img = index.search("txt", 10, 0, opts_img)?;
1292        assert_eq!(res_img.len(), 1);
1293        assert_eq!(res_img[0].category, category::IMAGE);
1294
1295        // Filter by kind
1296        let opts_dir = SearchOptions {
1297            kind: Some(Kind::Directory),
1298            ..Default::default()
1299        };
1300        let res_dir = index.search("txt", 10, 0, opts_dir)?;
1301        assert_eq!(res_dir.len(), 0);
1302
1303        std::fs::remove_dir_all(temp_dir)?;
1304        Ok(())
1305    }
1306
1307    fn rand_id() -> u64 {
1308        std::time::SystemTime::now()
1309            .duration_since(std::time::UNIX_EPOCH)
1310            .unwrap()
1311            .as_nanos() as u64
1312    }
1313}