summavy/directory/
managed_directory.rs1use std::collections::HashSet;
2use std::io::Write;
3use std::path::{Path, PathBuf};
4use std::sync::{Arc, RwLock, RwLockWriteGuard};
5use std::{io, result};
6
7use async_trait::async_trait;
8use crc32fast::Hasher;
9
10use crate::core::MANAGED_FILEPATH;
11use crate::directory::error::{DeleteError, LockError, OpenReadError, OpenWriteError};
12use crate::directory::footer::{Footer, FooterProxy};
13use crate::directory::{
14 DirectoryLock, FileHandle, FileSlice, GarbageCollectionResult, Lock, WatchCallback,
15 WatchHandle, WritePtr, META_LOCK,
16};
17use crate::error::DataCorruption;
18use crate::Directory;
19
/// Returns `true` when the file at `path` should be tracked as a managed file.
///
/// A path whose string form begins with `.` is treated as unmanaged
/// (NOTE(review): presumably lock files and hidden bookkeeping files — confirm).
/// A path that is not valid UTF-8 cannot be inspected and is treated as managed.
fn is_managed(path: &Path) -> bool {
    match path.to_str() {
        Some(as_text) => !as_text.starts_with('.'),
        None => true,
    }
}
30
31#[derive(Debug)]
41pub struct ManagedDirectory {
42 directory: Box<dyn Directory>,
43 meta_informations: Arc<RwLock<MetaInformation>>,
44}
45
/// In-memory copy of the managed-file list.
#[derive(Debug, Default)]
struct MetaInformation {
    /// Every path registered as managed and not yet removed by garbage collection.
    managed_paths: HashSet<PathBuf>,
}
50
51fn save_managed_paths(
54 directory: &dyn Directory,
55 wlock: &RwLockWriteGuard<'_, MetaInformation>,
56) -> io::Result<()> {
57 let mut w = serde_json::to_vec(&wlock.managed_paths)?;
58 writeln!(&mut w)?;
59 directory.atomic_write(&MANAGED_FILEPATH, &w[..])?;
60 Ok(())
61}
62
63impl ManagedDirectory {
64 pub fn wrap(directory: Box<dyn Directory>) -> crate::Result<ManagedDirectory> {
66 match directory.atomic_read(&MANAGED_FILEPATH) {
67 Ok(data) => {
68 let managed_files_json = String::from_utf8_lossy(&data);
69 let managed_files: HashSet<PathBuf> = serde_json::from_str(&managed_files_json)
70 .map_err(|e| {
71 DataCorruption::new(
72 MANAGED_FILEPATH.to_path_buf(),
73 format!("Managed file cannot be deserialized: {:?}. ", e),
74 )
75 })?;
76 Ok(ManagedDirectory {
77 directory,
78 meta_informations: Arc::new(RwLock::new(MetaInformation {
79 managed_paths: managed_files,
80 })),
81 })
82 }
83 Err(OpenReadError::FileDoesNotExist(_)) => Ok(ManagedDirectory {
84 directory,
85 meta_informations: Arc::default(),
86 }),
87 io_err @ Err(OpenReadError::IoError { .. }) => Err(io_err.err().unwrap().into()),
88 Err(OpenReadError::IncompatibleIndex(incompatibility)) => {
89 Err(crate::TantivyError::IncompatibleIndex(incompatibility))
92 }
93 }
94 }
95
96 pub fn garbage_collect<L: FnOnce() -> HashSet<PathBuf>>(
111 &mut self,
112 get_living_files: L,
113 ) -> crate::Result<GarbageCollectionResult> {
114 info!("Garbage collect");
115 let mut files_to_delete = vec![];
116
117 {
128 let meta_informations_rlock = self
129 .meta_informations
130 .read()
131 .expect("Managed directory rlock poisoned in garbage collect.");
132
133 match self.acquire_lock(&META_LOCK) {
140 Ok(_meta_lock) => {
141 let living_files = get_living_files();
142 for managed_path in &meta_informations_rlock.managed_paths {
143 if !living_files.contains(managed_path) {
144 files_to_delete.push(managed_path.clone());
145 }
146 }
147 }
148 Err(err) => {
149 error!("Failed to acquire lock for GC");
150 return Err(crate::TantivyError::from(err));
151 }
152 }
153 }
154
155 let mut failed_to_delete_files = vec![];
156 let mut deleted_files = vec![];
157
158 for file_to_delete in files_to_delete {
159 match self.delete(&file_to_delete) {
160 Ok(_) => {
161 info!("Deleted {:?}", file_to_delete);
162 deleted_files.push(file_to_delete);
163 }
164 Err(file_error) => {
165 match file_error {
166 DeleteError::FileDoesNotExist(_) => {
167 deleted_files.push(file_to_delete.clone());
168 }
169 DeleteError::IoError { .. } => {
170 failed_to_delete_files.push(file_to_delete.clone());
171 if !cfg!(target_os = "windows") {
172 error!("Failed to delete {:?}", file_to_delete);
175 }
176 }
177 }
178 }
179 }
180 }
181
182 if !deleted_files.is_empty() {
183 let mut meta_informations_wlock = self
186 .meta_informations
187 .write()
188 .expect("Managed directory wlock poisoned (2).");
189 let managed_paths_write = &mut meta_informations_wlock.managed_paths;
190 for delete_file in &deleted_files {
191 managed_paths_write.remove(delete_file);
192 }
193 self.directory.sync_directory()?;
194 save_managed_paths(self.directory.as_mut(), &meta_informations_wlock)?;
195 }
196
197 Ok(GarbageCollectionResult {
198 deleted_files,
199 failed_to_delete_files,
200 })
201 }
202
203 fn register_file_as_managed(&self, filepath: &Path) -> io::Result<()> {
215 if !is_managed(filepath) {
217 return Ok(());
218 }
219 let mut meta_wlock = self
220 .meta_informations
221 .write()
222 .expect("Managed file lock poisoned");
223 let has_changed = meta_wlock.managed_paths.insert(filepath.to_owned());
224 if !has_changed {
225 return Ok(());
226 }
227 save_managed_paths(self.directory.as_ref(), &meta_wlock)?;
228 let managed_file_definitely_already_exists = meta_wlock.managed_paths.len() > 1;
236 if managed_file_definitely_already_exists {
237 return Ok(());
238 }
239 self.directory.sync_directory()?;
240 Ok(())
241 }
242
243 pub fn validate_checksum(&self, path: &Path) -> result::Result<bool, OpenReadError> {
245 let reader = self.directory.open_read(path)?;
246 let (footer, data) = Footer::extract_footer(reader)
247 .map_err(|io_error| OpenReadError::wrap_io_error(io_error, path.to_path_buf()))?;
248 let bytes = data
249 .read_bytes()
250 .map_err(|io_error| OpenReadError::IoError {
251 io_error: Arc::new(io_error),
252 filepath: path.to_path_buf(),
253 })?;
254 let mut hasher = Hasher::new();
255 hasher.update(bytes.as_slice());
256 let crc = hasher.finalize();
257 Ok(footer.crc() == crc)
258 }
259
260 pub fn list_managed_files(&self) -> HashSet<PathBuf> {
262 let managed_paths = self
263 .meta_informations
264 .read()
265 .expect("Managed directory rlock poisoned in list damaged.")
266 .managed_paths
267 .clone();
268 managed_paths
269 }
270}
271
272#[async_trait]
273impl Directory for ManagedDirectory {
274 fn get_file_handle(&self, path: &Path) -> Result<Arc<dyn FileHandle>, OpenReadError> {
275 let file_slice = self.open_read(path)?;
276 Ok(Arc::new(file_slice))
277 }
278
279 fn open_read(&self, path: &Path) -> result::Result<FileSlice, OpenReadError> {
280 let file_slice = self.directory.open_read(path)?;
281 let (footer, reader) = Footer::extract_footer(file_slice)
282 .map_err(|io_error| OpenReadError::wrap_io_error(io_error, path.to_path_buf()))?;
283 footer.is_compatible()?;
284 Ok(reader)
285 }
286
287 fn open_write(&self, path: &Path) -> result::Result<WritePtr, OpenWriteError> {
288 self.register_file_as_managed(path)
289 .map_err(|io_error| OpenWriteError::wrap_io_error(io_error, path.to_path_buf()))?;
290 Ok(io::BufWriter::new(Box::new(FooterProxy::new(
291 self.directory
292 .open_write(path)?
293 .into_inner()
294 .map_err(|_| ())
295 .expect("buffer should be empty"),
296 ))))
297 }
298
299 fn atomic_write(&self, path: &Path, data: &[u8]) -> io::Result<()> {
300 self.register_file_as_managed(path)?;
301 self.directory.atomic_write(path, data)
302 }
303
304 fn atomic_read(&self, path: &Path) -> result::Result<Vec<u8>, OpenReadError> {
305 self.directory.atomic_read(path)
306 }
307
308 #[cfg(feature = "quickwit")]
309 async fn atomic_read_async(&self, path: &Path) -> Result<Vec<u8>, OpenReadError> {
310 self.directory.atomic_read_async(path).await
311 }
312
313 fn delete(&self, path: &Path) -> result::Result<(), DeleteError> {
314 self.directory.delete(path)
315 }
316
317 fn exists(&self, path: &Path) -> Result<bool, OpenReadError> {
318 self.directory.exists(path)
319 }
320
321 fn acquire_lock(&self, lock: &Lock) -> result::Result<DirectoryLock, LockError> {
322 self.directory.acquire_lock(lock)
323 }
324
325 fn watch(&self, watch_callback: WatchCallback) -> crate::Result<WatchHandle> {
326 self.directory.watch(watch_callback)
327 }
328
329 fn sync_directory(&self) -> io::Result<()> {
330 self.directory.sync_directory()?;
331 Ok(())
332 }
333}
334
335impl Clone for ManagedDirectory {
336 fn clone(&self) -> ManagedDirectory {
337 ManagedDirectory {
338 directory: self.directory.box_clone(),
339 meta_informations: Arc::clone(&self.meta_informations),
340 }
341 }
342}
343
#[cfg(feature = "mmap")]
#[cfg(test)]
mod tests_mmap_specific {

    use std::collections::HashSet;
    use std::io::Write;
    use std::path::{Path, PathBuf};

    use tempfile::TempDir;

    use crate::directory::{Directory, ManagedDirectory, MmapDirectory, TerminatingWrite};

    /// Files written through a `ManagedDirectory` are deleted by GC once they are
    /// no longer reported as living, and the managed list survives reopening.
    #[test]
    fn test_managed_directory() {
        let tempdir = TempDir::new().unwrap();
        let tempdir_path = PathBuf::from(tempdir.path());

        let test_path1: &'static Path = Path::new("some_path_for_test");
        let test_path2: &'static Path = Path::new("some_path_for_test_2");

        // First session: create both files, keep only `test_path1` alive.
        {
            let mut managed_directory =
                ManagedDirectory::wrap(Box::new(MmapDirectory::open(&tempdir_path).unwrap()))
                    .unwrap();
            managed_directory
                .open_write(test_path1)
                .unwrap()
                .terminate()
                .unwrap();
            managed_directory
                .atomic_write(test_path2, &[0u8, 1u8])
                .unwrap();
            assert!(managed_directory.exists(test_path1).unwrap());
            assert!(managed_directory.exists(test_path2).unwrap());

            let living_files: HashSet<PathBuf> =
                std::iter::once(test_path1.to_path_buf()).collect();
            assert!(managed_directory.garbage_collect(|| living_files).is_ok());
            assert!(managed_directory.exists(test_path1).unwrap());
            assert!(!managed_directory.exists(test_path2).unwrap());
        }

        // Second session: the reopened wrapper still manages `test_path1`, so an
        // empty living set makes GC delete it.
        {
            let mut managed_directory =
                ManagedDirectory::wrap(Box::new(MmapDirectory::open(&tempdir_path).unwrap()))
                    .unwrap();
            assert!(managed_directory.exists(test_path1).unwrap());
            assert!(!managed_directory.exists(test_path2).unwrap());

            let no_living_files: HashSet<PathBuf> = HashSet::new();
            assert!(managed_directory.garbage_collect(|| no_living_files).is_ok());
            assert!(!managed_directory.exists(test_path1).unwrap());
            assert!(!managed_directory.exists(test_path2).unwrap());
        }
    }

    /// GC of a file that is still open for reading: it is deleted immediately
    /// except on Windows, where it is only deleted once the read handle is dropped.
    #[test]
    fn test_managed_directory_gc_while_mmapped() {
        let test_path1: &'static Path = Path::new("some_path_for_test");

        let tempdir = TempDir::new().unwrap();
        let tempdir_path = PathBuf::from(tempdir.path());
        let living_files: HashSet<PathBuf> = HashSet::new();

        let mut managed_directory =
            ManagedDirectory::wrap(Box::new(MmapDirectory::open(tempdir_path).unwrap())).unwrap();
        let mut writer = managed_directory.open_write(test_path1).unwrap();
        writer.write_all(&[0u8, 1u8]).unwrap();
        writer.terminate().unwrap();
        assert!(managed_directory.exists(test_path1).unwrap());

        // Hold a read handle on the file while GC runs.
        let mmap_read = managed_directory.open_read(test_path1).unwrap();
        assert!(managed_directory
            .garbage_collect(|| living_files.clone())
            .is_ok());
        if cfg!(target_os = "windows") {
            // The delete failed because the file is still open; it must survive.
            assert!(managed_directory.exists(test_path1).unwrap());
            drop(mmap_read);
            // With the read handle released, a second GC removes it.
            assert!(managed_directory.garbage_collect(|| living_files).is_ok());
        }
        assert!(!managed_directory.exists(test_path1).unwrap());
    }
}
420}