tantivy/directory/managed_directory.rs

1use std::collections::HashSet;
2use std::io::Write;
3use std::path::{Path, PathBuf};
4use std::sync::{Arc, RwLock, RwLockWriteGuard};
5use std::{io, result};
6
7use async_trait::async_trait;
8use crc32fast::Hasher;
9
10use crate::core::MANAGED_FILEPATH;
11use crate::directory::error::{DeleteError, LockError, OpenReadError, OpenWriteError};
12use crate::directory::footer::{Footer, FooterProxy};
13use crate::directory::{
14    DirectoryLock, FileHandle, FileSlice, GarbageCollectionResult, Lock, WatchCallback,
15    WatchHandle, WritePtr, META_LOCK,
16};
17use crate::error::DataCorruption;
18use crate::Directory;
19
/// Returns true if the file is "managed".
///
/// Non-managed files are not subject to garbage collection.
/// Filenames starting with a "." — typically lock files —
/// are not managed. Paths that are not valid UTF-8 are
/// conservatively considered managed.
fn is_managed(path: &Path) -> bool {
    !matches!(path.to_str(), Some(name) if name.starts_with('.'))
}
30
/// Wrapper of directories that keeps track of files created by Tantivy.
///
/// A managed directory is just a wrapper of a directory
/// that keeps a (persisted) list of the files that
/// have been created (and not deleted) by tantivy so far.
///
/// Thanks to this list, it implements a `garbage_collect` method
/// that removes the files that were created by tantivy and are not
/// useful anymore.
#[derive(Debug)]
pub struct ManagedDirectory {
    // Underlying directory; all reads/writes/deletes are delegated to it.
    directory: Box<dyn Directory>,
    // Lock-protected set of managed paths, shared across clones (see the
    // `Clone` impl below) and persisted via `save_managed_paths`.
    meta_informations: Arc<RwLock<MetaInformation>>,
}
45
/// In-memory view of the persisted list of managed files
/// (the JSON content stored at `MANAGED_FILEPATH`).
#[derive(Debug, Default)]
struct MetaInformation {
    // Paths created by tantivy that are candidates for garbage collection.
    managed_paths: HashSet<PathBuf>,
}
50
51/// Saves the file containing the list of existing files
52/// that were created by tantivy.
53fn save_managed_paths(
54    directory: &dyn Directory,
55    wlock: &RwLockWriteGuard<'_, MetaInformation>,
56) -> io::Result<()> {
57    let mut w = serde_json::to_vec(&wlock.managed_paths)?;
58    writeln!(&mut w)?;
59    directory.atomic_write(&MANAGED_FILEPATH, &w[..])?;
60    Ok(())
61}
62
63impl ManagedDirectory {
64    /// Wraps a directory as managed directory.
65    pub fn wrap(directory: Box<dyn Directory>) -> crate::Result<ManagedDirectory> {
66        match directory.atomic_read(&MANAGED_FILEPATH) {
67            Ok(data) => {
68                let managed_files_json = String::from_utf8_lossy(&data);
69                let managed_files: HashSet<PathBuf> = serde_json::from_str(&managed_files_json)
70                    .map_err(|e| {
71                        DataCorruption::new(
72                            MANAGED_FILEPATH.to_path_buf(),
73                            format!("Managed file cannot be deserialized: {:?}. ", e),
74                        )
75                    })?;
76                Ok(ManagedDirectory {
77                    directory,
78                    meta_informations: Arc::new(RwLock::new(MetaInformation {
79                        managed_paths: managed_files,
80                    })),
81                })
82            }
83            Err(OpenReadError::FileDoesNotExist(_)) => Ok(ManagedDirectory {
84                directory,
85                meta_informations: Arc::default(),
86            }),
87            io_err @ Err(OpenReadError::IoError { .. }) => Err(io_err.err().unwrap().into()),
88            Err(OpenReadError::IncompatibleIndex(incompatibility)) => {
89                // For the moment, this should never happen  `meta.json`
90                // do not have any footer and cannot detect incompatibility.
91                Err(crate::TantivyError::IncompatibleIndex(incompatibility))
92            }
93        }
94    }
95
    /// Garbage collect unused files.
    ///
    /// Removes the files that were created by `tantivy` and are not
    /// used by any segment anymore.
    ///
    /// * `get_living_files` - Callback producing the set of files that are
    ///   still used by the index.
    ///
    /// Using a callback ensures that the list of living files is computed
    /// while we hold the lock on meta.
    ///
    /// Individual deletion failures (for permission reasons for instance)
    /// are not returned as errors: the failure is simply logged, and the
    /// file remains in the list of managed files. The method only returns
    /// `Err` if the meta lock cannot be acquired, or if persisting the
    /// updated managed-file list fails.
    pub fn garbage_collect<L: FnOnce() -> HashSet<PathBuf>>(
        &mut self,
        get_living_files: L,
    ) -> crate::Result<GarbageCollectionResult> {
        info!("Garbage collect");
        let mut files_to_delete = vec![];

        // It is crucial to get the living files after acquiring the
        // read lock of meta information. That way, we
        // avoid the following scenario:
        //
        // 1) we get the list of living files.
        // 2) someone creates a new file.
        // 3) we start garbage collection and remove this file
        //    even though it is a living file.
        //
        // The rlock is scoped to this block and released before the
        // deletion loop, as `.delete()` will need it too.
        {
            let meta_informations_rlock = self
                .meta_informations
                .read()
                .expect("Managed directory rlock poisoned in garbage collect.");

            // The point of this second "file" lock is to prevent the following scenario:
            // 1) process B tries to load a new set of searchers.
            //    The list of segments is loaded.
            // 2) the writer changes meta.json (for instance after a merge or a commit).
            // 3) gc kicks in.
            // 4) gc removes a file that was useful for process B, before process B opened it.
            match self.acquire_lock(&META_LOCK) {
                Ok(_meta_lock) => {
                    let living_files = get_living_files();
                    // Any managed file that is not in the living set is a
                    // deletion candidate.
                    for managed_path in &meta_informations_rlock.managed_paths {
                        if !living_files.contains(managed_path) {
                            files_to_delete.push(managed_path.clone());
                        }
                    }
                }
                Err(err) => {
                    error!("Failed to acquire lock for GC");
                    return Err(crate::TantivyError::from(err));
                }
            }
        }

        let mut failed_to_delete_files = vec![];
        let mut deleted_files = vec![];

        for file_to_delete in files_to_delete {
            match self.delete(&file_to_delete) {
                Ok(_) => {
                    info!("Deleted {:?}", file_to_delete);
                    deleted_files.push(file_to_delete);
                }
                Err(file_error) => {
                    match file_error {
                        // Already gone: treat it as successfully deleted so it
                        // gets dropped from the managed list below.
                        DeleteError::FileDoesNotExist(_) => {
                            deleted_files.push(file_to_delete.clone());
                        }
                        DeleteError::IoError { .. } => {
                            failed_to_delete_files.push(file_to_delete.clone());
                            if !cfg!(target_os = "windows") {
                                // On Windows, delete is expected to fail if the file
                                // is mmapped, so we do not log it as an error there.
                                error!("Failed to delete {:?}", file_to_delete);
                            }
                        }
                    }
                }
            }
        }

        if !deleted_files.is_empty() {
            // Update the list of managed files by removing
            // the files that were deleted, and persist it.
            let mut meta_informations_wlock = self
                .meta_informations
                .write()
                .expect("Managed directory wlock poisoned (2).");
            let managed_paths_write = &mut meta_informations_wlock.managed_paths;
            for delete_file in &deleted_files {
                managed_paths_write.remove(delete_file);
            }
            self.directory.sync_directory()?;
            save_managed_paths(self.directory.as_mut(), &meta_informations_wlock)?;
        }

        Ok(GarbageCollectionResult {
            deleted_files,
            failed_to_delete_files,
        })
    }
202
203    /// Registers a file as managed
204    ///
205    /// This method must be called before the file is
206    /// actually created to ensure that a failure between
207    /// registering the filepath and creating the file
208    /// will not lead to garbage files that will
209    /// never get removed.
210    ///
211    /// File starting by "." are reserved to locks.
212    /// They are not managed and cannot be subjected
213    /// to garbage collection.
214    fn register_file_as_managed(&self, filepath: &Path) -> io::Result<()> {
215        // Files starting by "." (e.g. lock files) are not managed.
216        if !is_managed(filepath) {
217            return Ok(());
218        }
219        let mut meta_wlock = self
220            .meta_informations
221            .write()
222            .expect("Managed file lock poisoned");
223        let has_changed = meta_wlock.managed_paths.insert(filepath.to_owned());
224        if !has_changed {
225            return Ok(());
226        }
227        save_managed_paths(self.directory.as_ref(), &meta_wlock)?;
228        // This is not the first file we add.
229        // Therefore, we are sure that `.managed.json` has been already
230        // properly created and we do not need to sync its parent directory.
231        //
232        // (It might seem like a nicer solution to create the managed_json on the
233        // creation of the ManagedDirectory instance but it would actually
234        // prevent the use of read-only directories..)
235        let managed_file_definitely_already_exists = meta_wlock.managed_paths.len() > 1;
236        if managed_file_definitely_already_exists {
237            return Ok(());
238        }
239        self.directory.sync_directory()?;
240        Ok(())
241    }
242
243    /// Verify checksum of a managed file
244    pub fn validate_checksum(&self, path: &Path) -> result::Result<bool, OpenReadError> {
245        let reader = self.directory.open_read(path)?;
246        let (footer, data) = Footer::extract_footer(reader)
247            .map_err(|io_error| OpenReadError::wrap_io_error(io_error, path.to_path_buf()))?;
248        let bytes = data
249            .read_bytes()
250            .map_err(|io_error| OpenReadError::IoError {
251                io_error: Arc::new(io_error),
252                filepath: path.to_path_buf(),
253            })?;
254        let mut hasher = Hasher::new();
255        hasher.update(bytes.as_slice());
256        let crc = hasher.finalize();
257        Ok(footer.crc() == crc)
258    }
259
260    /// List all managed files
261    pub fn list_managed_files(&self) -> HashSet<PathBuf> {
262        let managed_paths = self
263            .meta_informations
264            .read()
265            .expect("Managed directory rlock poisoned in list damaged.")
266            .managed_paths
267            .clone();
268        managed_paths
269    }
270}
271
#[async_trait]
impl Directory for ManagedDirectory {
    // Routes through `open_read` so the footer is extracted and the
    // compatibility check runs before handing out the handle.
    fn get_file_handle(&self, path: &Path) -> Result<Arc<dyn FileHandle>, OpenReadError> {
        let file_slice = self.open_read(path)?;
        Ok(Arc::new(file_slice))
    }

    // Opens the underlying file, strips its footer, verifies version
    // compatibility, and returns the body slice only.
    fn open_read(&self, path: &Path) -> result::Result<FileSlice, OpenReadError> {
        let file_slice = self.directory.open_read(path)?;
        let (footer, reader) = Footer::extract_footer(file_slice)
            .map_err(|io_error| OpenReadError::wrap_io_error(io_error, path.to_path_buf()))?;
        footer.is_compatible()?;
        Ok(reader)
    }

    // Registers the path as managed BEFORE creating the file, so a crash in
    // between cannot leave an untracked garbage file behind. The writer is
    // wrapped in a `FooterProxy` so a footer gets appended on terminate.
    fn open_write(&self, path: &Path) -> result::Result<WritePtr, OpenWriteError> {
        self.register_file_as_managed(path)
            .map_err(|io_error| OpenWriteError::wrap_io_error(io_error, path.to_path_buf()))?;
        Ok(io::BufWriter::new(Box::new(FooterProxy::new(
            self.directory
                .open_write(path)?
                .into_inner()
                // `into_inner` cannot fail here: the writer was just opened,
                // so its buffer is still empty. `map_err(|_| ())` drops the
                // error because the inner writer type is presumably not
                // `Debug` — NOTE(review): confirm against `WritePtr`.
                .map_err(|_| ())
                .expect("buffer should be empty"),
        ))))
    }

    // Registers the path as managed, then delegates. Note: no footer is
    // added for atomic writes.
    fn atomic_write(&self, path: &Path, data: &[u8]) -> io::Result<()> {
        self.register_file_as_managed(path)?;
        self.directory.atomic_write(path, data)
    }

    // Pure delegation; atomically-written files carry no footer to strip.
    fn atomic_read(&self, path: &Path) -> result::Result<Vec<u8>, OpenReadError> {
        self.directory.atomic_read(path)
    }

    // Async counterpart of `atomic_read`, only available with the
    // `quickwit` feature.
    #[cfg(feature = "quickwit")]
    async fn atomic_read_async(&self, path: &Path) -> Result<Vec<u8>, OpenReadError> {
        self.directory.atomic_read_async(path).await
    }

    // Delegates deletion; the managed list itself is updated by
    // `garbage_collect`, not here.
    fn delete(&self, path: &Path) -> result::Result<(), DeleteError> {
        self.directory.delete(path)
    }

    fn exists(&self, path: &Path) -> Result<bool, OpenReadError> {
        self.directory.exists(path)
    }

    fn acquire_lock(&self, lock: &Lock) -> result::Result<DirectoryLock, LockError> {
        self.directory.acquire_lock(lock)
    }

    fn watch(&self, watch_callback: WatchCallback) -> crate::Result<WatchHandle> {
        self.directory.watch(watch_callback)
    }

    fn sync_directory(&self) -> io::Result<()> {
        self.directory.sync_directory()?;
        Ok(())
    }
}
334
335impl Clone for ManagedDirectory {
336    fn clone(&self) -> ManagedDirectory {
337        ManagedDirectory {
338            directory: self.directory.box_clone(),
339            meta_informations: Arc::clone(&self.meta_informations),
340        }
341    }
342}
343
#[cfg(feature = "mmap")]
#[cfg(test)]
mod tests_mmap_specific {

    use std::collections::HashSet;
    use std::io::Write;
    use std::path::{Path, PathBuf};

    use tempfile::TempDir;

    use crate::directory::{Directory, ManagedDirectory, MmapDirectory, TerminatingWrite};

    // End-to-end check: files created through the managed directory are
    // tracked, gc removes the ones not in the living set, and the managed
    // list survives reopening the directory.
    #[test]
    fn test_managed_directory() {
        let tempdir = TempDir::new().unwrap();
        let tempdir_path = PathBuf::from(tempdir.path());

        let test_path1: &'static Path = Path::new("some_path_for_test");
        let test_path2: &'static Path = Path::new("some_path_for_test_2");
        {
            let mmap_directory = MmapDirectory::open(&tempdir_path).unwrap();
            let mut managed_directory = ManagedDirectory::wrap(Box::new(mmap_directory)).unwrap();
            // Create one file via open_write and one via atomic_write:
            // both paths must end up managed.
            let write_file = managed_directory.open_write(test_path1).unwrap();
            write_file.terminate().unwrap();
            managed_directory
                .atomic_write(test_path2, &[0u8, 1u8])
                .unwrap();
            assert!(managed_directory.exists(test_path1).unwrap());
            assert!(managed_directory.exists(test_path2).unwrap());
            // Only test_path1 is declared living: gc must delete test_path2.
            let living_files: HashSet<PathBuf> = [test_path1.to_owned()].iter().cloned().collect();
            assert!(managed_directory.garbage_collect(|| living_files).is_ok());
            assert!(managed_directory.exists(test_path1).unwrap());
            assert!(!managed_directory.exists(test_path2).unwrap());
        }
        {
            // Reopen: the managed list was persisted, so gc with an empty
            // living set must now delete the remaining file.
            let mmap_directory = MmapDirectory::open(&tempdir_path).unwrap();
            let mut managed_directory = ManagedDirectory::wrap(Box::new(mmap_directory)).unwrap();
            assert!(managed_directory.exists(test_path1).unwrap());
            assert!(!managed_directory.exists(test_path2).unwrap());
            let living_files: HashSet<PathBuf> = HashSet::new();
            assert!(managed_directory.garbage_collect(|| living_files).is_ok());
            assert!(!managed_directory.exists(test_path1).unwrap());
            assert!(!managed_directory.exists(test_path2).unwrap());
        }
    }

    // Windows cannot delete a file while it is mmapped; gc must record the
    // failure, keep the path managed, and succeed on a later pass once the
    // mapping is dropped. On other OSes the first gc pass deletes it.
    #[test]
    fn test_managed_directory_gc_while_mmapped() {
        let test_path1: &'static Path = Path::new("some_path_for_test");

        let tempdir = TempDir::new().unwrap();
        let tempdir_path = PathBuf::from(tempdir.path());
        let living_files = HashSet::new();

        let mmap_directory = MmapDirectory::open(tempdir_path).unwrap();
        let mut managed_directory = ManagedDirectory::wrap(Box::new(mmap_directory)).unwrap();
        let mut write = managed_directory.open_write(test_path1).unwrap();
        write.write_all(&[0u8, 1u8]).unwrap();
        write.terminate().unwrap();
        assert!(managed_directory.exists(test_path1).unwrap());

        // Keep an mmap alive across the first gc pass.
        let _mmap_read = managed_directory.open_read(test_path1).unwrap();
        assert!(managed_directory
            .garbage_collect(|| living_files.clone())
            .is_ok());
        if cfg!(target_os = "windows") {
            // On Windows, gc should try and fail to delete the file as it is mmapped.
            assert!(managed_directory.exists(test_path1).unwrap());
            // unmap should happen here.
            drop(_mmap_read);
            // The file should still be in the list of managed files and
            // eventually be deleted once the mmap is released.
            assert!(managed_directory.garbage_collect(|| living_files).is_ok());
        }
        assert!(!managed_directory.exists(test_path1).unwrap());
    }
}