1use std::collections::{HashMap, HashSet};
25use std::default::Default;
26use std::ffi::OsStr;
27use std::fmt::Debug;
28use std::sync::OnceLock;
29use std::sync::atomic::{AtomicU64, Ordering};
30use std::time::Duration;
31
32use async_trait::async_trait;
33use futures::{FutureExt, select_biased};
34use mountpoint_s3_client::ObjectClient;
35use mountpoint_s3_client::error::{HeadObjectError, ObjectClientError, RenameObjectError};
36use mountpoint_s3_client::types::{
37 ETag, HeadObjectParams, HeadObjectResult, RenameObjectParams, RenamePreconditionTypes,
38};
39use thiserror::Error;
40use time::OffsetDateTime;
41use tracing::{debug, error, trace, warn};
42
43use crate::fs::{CacheConfig, FUSE_ROOT_INODE, OpenFlags};
44use crate::logging;
45use crate::metablock::{
46 AddDirEntry, AddDirEntryResult, InodeError, InodeInformation, InodeKind, InodeNo, InodeStat, Lookup, Metablock,
47 NewHandle, PendingUploadHook, ReadWriteMode, S3Location, ValidKey, ValidName, WriteMode,
48};
49use crate::s3::{S3Path, S3Personality};
50use crate::sync::{Arc, RwLock};
51
52mod handles_map;
53use handles_map::{InodeHandleMap, SetWriterError};
54
55mod inode;
56pub use inode::{Inode, InodeKindData, InodeLockedForWriting, InodeState, WriteStatus};
57
58mod negative_cache;
59use negative_cache::NegativeCache;
60mod readdir;
61pub use readdir::ReaddirHandle;
62use readdir::{DirHandle, DirectoryEntryReaddir};
63
64#[derive(Debug)]
66pub struct Superblock<OC: ObjectClient + Send + Sync> {
67 inner: Arc<SuperblockInner<OC>>,
68}
69
70#[derive(Debug)]
76struct RenameCache {
77 rename_supported: std::sync::OnceLock<bool>,
79}
80
81impl RenameCache {
82 fn new() -> Self {
83 RenameCache {
84 rename_supported: OnceLock::new(),
85 }
86 }
87
88 fn should_try_rename(&self) -> bool {
92 self.rename_supported.get().is_none_or(|&cached| cached)
93 }
94
95 fn cache_failure(&self) {
97 if let Ok(()) = self.rename_supported.set(false) {
98 debug!("cached rename failure for the first time, disabling future renames");
99 }
100 }
101
102 fn cache_success(&self) {
104 let _ = self.rename_supported.set(true);
105 }
106}
107
108#[derive(Debug)]
109struct SuperblockInner<OC: ObjectClient + Send + Sync> {
110 s3_path: Arc<S3Path>,
111 inodes: RwLock<InodeMap>,
112 open_handles: InodeHandleMap,
113 negative_cache: NegativeCache,
114 cached_rename_support: RenameCache,
115 next_ino: AtomicU64,
116 mount_time: OffsetDateTime,
117 config: SuperblockConfig,
118 client: OC,
119 dir_handles: RwLock<HashMap<u64, Arc<DirHandle>>>,
120 next_dir_handle_id: AtomicU64,
121}
122
123#[derive(Debug, Clone, Default)]
125pub struct SuperblockConfig {
126 pub cache_config: CacheConfig,
127 pub s3_personality: S3Personality,
128}
129
130pub struct PendingRenameGuard<'a> {
132 pub inode: Option<&'a Inode>,
133}
134
135impl<'a> PendingRenameGuard<'a> {
136 fn try_transition(inode: &'a Inode) -> Result<Self, InodeError> {
138 let mut locked = inode.get_mut_inode_state()?;
139 match locked.write_status {
140 WriteStatus::LocalUnopened | WriteStatus::LocalOpenForWriting | WriteStatus::PendingRename => {
141 return Err(InodeError::RenameNotPermittedWhileWriting(inode.err()));
142 }
143 WriteStatus::Remote => {} }
145
146 locked.write_status = WriteStatus::PendingRename;
147 drop(locked);
148 Ok(PendingRenameGuard { inode: Some(inode) })
149 }
150
151 fn confirm(&mut self) {
153 self.inode = None;
154 }
155}
156
157impl Drop for PendingRenameGuard<'_> {
159 fn drop(&mut self) {
160 if let Some(inode) = self.inode {
161 let mut inode_locked = inode.get_mut_inode_state().unwrap();
162 if inode_locked.write_status == WriteStatus::PendingRename {
163 inode_locked.write_status = WriteStatus::Remote;
164 }
165 }
166 }
167}
168pub struct RenameLockGuard<'a> {
170 src_parent_lock: InodeLockedForWriting<'a>,
171 dst_parent_lock: Option<InodeLockedForWriting<'a>>,
172}
173
174impl<'a> RenameLockGuard<'a> {
176 fn new<OC: ObjectClient + Send + Sync>(
177 source_parent: &'a Inode,
178 dest_parent: &'a Inode,
179 superblock_inner: &'a SuperblockInner<OC>,
180 ) -> Result<Self, InodeError> {
181 let src_ino = source_parent.ino();
182 let dst_ino = dest_parent.ino();
183
184 if src_ino == dst_ino {
185 return Ok(RenameLockGuard {
186 src_parent_lock: source_parent.get_mut_inode_state()?,
187 dst_parent_lock: None,
188 });
189 }
190
191 let dst_parent_lock: Option<InodeLockedForWriting<'a>>;
192 let src_parent_lock: InodeLockedForWriting<'a>;
193 let ancestor_check = Self::dst_is_ancestor(&superblock_inner.inodes.read().unwrap(), source_parent, dst_ino);
195 if ancestor_check || src_ino > dst_ino {
196 dst_parent_lock = Some(dest_parent.get_mut_inode_state()?);
197 src_parent_lock = source_parent.get_mut_inode_state()?;
198 } else {
199 src_parent_lock = source_parent.get_mut_inode_state()?;
200 dst_parent_lock = Some(dest_parent.get_mut_inode_state()?);
201 }
202 Ok(RenameLockGuard {
203 src_parent_lock,
204 dst_parent_lock,
205 })
206 }
207
208 fn dst_is_ancestor(inodes: &InodeMap, start: &Inode, dst: InodeNo) -> bool {
209 std::iter::successors(Some(start), |current| inodes.get_inode(¤t.parent()))
210 .take_while(|¤t| current.ino() != FUSE_ROOT_INODE)
211 .any(|current| current.ino() == dst)
212 }
213
214 fn source_parent_mut(&mut self) -> &mut InodeState {
215 &mut self.src_parent_lock
216 }
217
218 fn destination_parent_mut(&mut self) -> &mut InodeState {
219 self.dst_parent_lock.as_mut().unwrap_or(&mut self.src_parent_lock)
220 }
221}
222
223impl<OC: ObjectClient + Send + Sync> Superblock<OC> {
224 pub fn new(client: OC, s3_path: S3Path, config: SuperblockConfig) -> Self {
226 let mount_time = OffsetDateTime::now_utc();
227 let root = Inode::new_root(&s3_path.prefix, mount_time);
228
229 let mut inodes = InodeMap::default();
230 inodes.insert(root.ino(), root, 1);
231
232 let negative_cache = NegativeCache::new(
233 config.cache_config.negative_cache_size,
234 config.cache_config.negative_cache_ttl,
235 );
236
237 let inner = SuperblockInner {
238 s3_path: Arc::new(s3_path),
239 inodes: RwLock::new(inodes),
240 open_handles: Default::default(),
241 negative_cache,
242 next_ino: AtomicU64::new(2),
243 mount_time,
244 config,
245 cached_rename_support: RenameCache::new(),
246 client,
247 next_dir_handle_id: AtomicU64::new(1),
248 dir_handles: Default::default(),
249 };
250 Self { inner: Arc::new(inner) }
251 }
252
/// Test helper: the count stored for `ino` in the inode map, or 0 when there is none.
#[cfg(test)]
fn get_lookup_count(&self, ino: InodeNo) -> u64 {
    let inodes = self.inner.inodes.read().unwrap();
    let count = inodes.get_count(&ino);
    count.unwrap_or(0)
}
258
/// Test helper: the write status of `ino`.
///
/// Returns `None` when the inode is not in the map or its state cannot be read.
#[cfg(test)]
fn get_write_status(&self, ino: InodeNo) -> Option<WriteStatus> {
    let inodes = self.inner.inodes.read().unwrap();
    let inode = inodes.get_inode(&ino)?;
    let state = inode.get_inode_state().ok()?;
    Some(state.write_status)
}
268
269 async fn getattr_with_inode(
274 &self,
275 ino: InodeNo,
276 force_revalidate_if_remote: bool,
277 ) -> Result<LookedUpInode, InodeError> {
278 let inode = self.inner.get(ino)?;
279 logging::record_name(inode.name());
280
281 {
282 let mut sync = inode.get_mut_inode_state()?;
283 let is_remote = sync.write_status == WriteStatus::Remote;
284
285 if !is_remote || !force_revalidate_if_remote {
286 if !is_remote {
288 let validity = match inode.kind() {
289 InodeKind::File => self.inner.config.cache_config.file_ttl,
290 InodeKind::Directory => self.inner.config.cache_config.dir_ttl,
291 };
292 sync.stat.update_validity(validity);
293 }
294 if sync.stat.is_valid() {
295 let stat = sync.stat.clone();
296 let write_status = sync.write_status;
297 drop(sync);
298 return Ok(LookedUpInode {
299 inode,
300 stat,
301 path: self.inner.s3_path.clone(),
302 write_status,
303 });
304 }
305 }
306 };
307
308 let lookup = self
309 .inner
310 .lookup_by_name(inode.parent(), inode.name().as_ref(), false)
311 .await?;
312 if lookup.inode.ino() != ino {
313 Err(InodeError::StaleInode {
314 remote_key: self.inner.full_key_for_inode(&lookup.inode).into(),
315 old_inode: inode.err(),
316 new_inode: lookup.inode.err(),
317 })
318 } else {
319 Ok(lookup)
320 }
321 }
322
323 async fn new_readdir_handle_with_pagesize(&self, dir_ino: InodeNo, page_size: usize) -> Result<u64, InodeError> {
324 trace!(dir=?dir_ino, "readdir");
325
326 let dir = self.inner.get(dir_ino)?;
327 logging::record_name(dir.name());
328 if dir.kind() != InodeKind::Directory {
329 return Err(InodeError::NotADirectory(dir.err()));
330 }
331 let parent_ino = dir.parent();
332
333 let dir_key = self.inner.full_key_for_inode(&dir);
334 assert_eq!(dir_key.kind(), InodeKind::Directory);
335 let handle = ReaddirHandle::new(&self.inner, dir_ino, parent_ino, dir_key.into(), page_size)?;
336 let handle_id = self.inner.next_dir_handle_id.fetch_add(1, Ordering::SeqCst);
337 let dirhandle = DirHandle::new(handle.parent(), handle);
338 self.inner
339 .dir_handles
340 .write()
341 .unwrap()
342 .insert(handle_id, Arc::new(dirhandle));
343 trace!("Added handle with id: {}", handle_id);
344 Ok(handle_id)
345 }
346
347 fn start_reading(
351 &self,
352 locked_inode: &mut InodeLockedForWriting,
353 inode: Inode,
354 fh: u64,
355 ) -> Result<Option<PendingUploadHook>, InodeError> {
356 match locked_inode.write_status {
357 WriteStatus::LocalUnopened => Err(InodeError::InodeNotReadableWhileWriting(inode.err())),
358 WriteStatus::LocalOpenForWriting | WriteStatus::PendingRename | WriteStatus::Remote => {
359 if !self.inner.open_handles.try_add_reader(locked_inode, fh) {
360 return Err(InodeError::InodeNotReadableWhileWriting(inode.err()));
361 }
362 if matches!(locked_inode.write_status, WriteStatus::LocalOpenForWriting) {
363 locked_inode.write_status = WriteStatus::Remote;
364 }
365 Ok(locked_inode.pending_upload_hook.clone())
366 }
367 }
368 }
369
370 fn start_writing(
374 &self,
375 locked_inode: &mut InodeLockedForWriting,
376 inode: Inode,
377 mode: &WriteMode,
378 is_truncate: bool,
379 handle_id: u64,
380 ) -> Result<Option<PendingUploadHook>, InodeError> {
381 let setup_inode_for_writing = |locked_inode: &mut InodeLockedForWriting| -> Result<(), InodeError> {
382 self.inner
383 .open_handles
384 .set_writer(locked_inode, handle_id)
385 .map_err(|e| match e {
386 SetWriterError::ActiveWriter => InodeError::InodeAlreadyWriting(inode.err()),
387 SetWriterError::ActiveReaders => InodeError::InodeNotWritableWhileReading(inode.err()),
388 })?;
389 locked_inode.write_status = WriteStatus::LocalOpenForWriting;
390 Ok(())
391 };
392 match locked_inode.write_status {
393 WriteStatus::LocalUnopened => {
394 setup_inode_for_writing(locked_inode)?;
395 locked_inode.stat.size = 0;
396 Ok(None)
397 }
398 WriteStatus::PendingRename => Err(InodeError::InodeAlreadyWriting(inode.err())),
399 WriteStatus::LocalOpenForWriting | WriteStatus::Remote => {
400 if !mode.is_inode_writable(is_truncate) {
401 return Err(InodeError::InodeNotWritable(inode.err()));
402 }
403 setup_inode_for_writing(locked_inode)?;
404 if is_truncate {
405 locked_inode.stat.size = 0;
406 }
407 Ok(locked_inode.pending_upload_hook.clone())
408 }
409 }
410 }
411}
412
413#[async_trait]
414impl<OC: ObjectClient + Send + Sync + Clone> Metablock for Superblock<OC> {
415 async fn lookup(&self, parent_ino: InodeNo, name: &OsStr) -> Result<Lookup, InodeError> {
417 trace!(parent=?parent_ino, ?name, "lookup");
418 let lookup = self
419 .inner
420 .lookup_by_name(parent_ino, name, self.inner.config.cache_config.serve_lookup_from_cache)
421 .await?;
422 self.inner.remember(&lookup.inode);
423 Ok(lookup.into())
424 }
425
426 async fn getattr(&self, ino: InodeNo, force_revalidate_if_remote: bool) -> Result<Lookup, InodeError> {
427 self.getattr_with_inode(ino, force_revalidate_if_remote)
428 .await
429 .map(|lookup| lookup.into())
430 }
431
432 async fn rename(
437 &self,
438 src_parent_ino: InodeNo,
439 src_name: &OsStr,
440 dst_parent_ino: InodeNo,
441 dst_name: &OsStr,
442 allow_overwrite: bool,
443 ) -> Result<(), InodeError> {
444 if !self.inner.cached_rename_support.should_try_rename() {
446 trace!("Cached rename failure, returning NotSupported");
447 return Err(InodeError::RenameNotSupported());
448 }
449 let src_parent = self.inner.get(src_parent_ino)?;
450 let dst_parent = self.inner.get(dst_parent_ino)?;
451 let src_inode = self
452 .inner
453 .lookup_by_name(
454 src_parent_ino,
455 src_name,
456 self.inner.config.cache_config.serve_lookup_from_cache,
457 )
458 .await?
459 .inode;
460 if src_inode.kind() == InodeKind::Directory {
461 return Err(InodeError::CannotRenameDirectory(src_inode.err()));
462 }
463 let mut src_status_guard = PendingRenameGuard::try_transition(&src_inode)?;
465
466 let dest_inode = self
467 .inner
468 .lookup_by_name(
469 dst_parent_ino,
470 dst_name,
471 self.inner.config.cache_config.serve_lookup_from_cache,
472 )
473 .await
474 .ok()
475 .map(|looked_up| looked_up.inode);
476 let dest_status_guard = dest_inode
478 .as_ref()
479 .map(|inode| {
480 if !allow_overwrite {
481 return Err(InodeError::RenameDestinationExists {
482 dest_key: self.inner.full_key_for_inode(inode).to_string(),
483 src_inode: src_inode.err(),
484 });
485 }
486
487 PendingRenameGuard::try_transition(inode)
488 })
489 .transpose()?;
490
491 let src_key = self.inner.full_key_for_inode(&src_inode);
492 let dest_name = ValidName::parse_os_str(dst_name)?;
493 let dest_full_valid_name = dst_parent
494 .valid_key()
495 .new_child(dest_name, InodeKind::File)
496 .map_err(|_| InodeError::NotADirectory(dst_parent.err()))?;
497 let dest_key: String = format!("{}{}", self.inner.s3_path.prefix, dest_full_valid_name.as_ref());
498 debug!(?src_key, ?dest_key, "rename on remote file will now be actioned");
499 let rename_params = if allow_overwrite {
501 RenameObjectParams::new()
502 } else {
503 RenameObjectParams::new().if_none_match(Some("*".to_string()))
504 };
505
506 let rename_object_result = self
507 .inner
508 .client
509 .rename_object(&self.inner.s3_path.bucket, src_key.as_ref(), &dest_key, &rename_params)
510 .await;
511
512 match rename_object_result {
513 Ok(_res) => {
514 debug!(?src_key, ?dest_key, "RenameObject succeeded");
515 }
516 Err(error) => {
517 debug!(?src_key, ?dest_key, ?error, "RenameObject failed");
518
519 return match error {
520 ObjectClientError::ServiceError(RenameObjectError::PreConditionFailed(
521 RenamePreconditionTypes::IfNoneMatch,
522 )) => Err(InodeError::RenameDestinationExists {
523 dest_key,
524 src_inode: src_inode.err(),
525 }),
526 ObjectClientError::ServiceError(RenameObjectError::KeyNotFound) => {
527 Err(InodeError::InodeDoesNotExist(src_inode.ino()))
528 }
529 ObjectClientError::ServiceError(RenameObjectError::KeyTooLong) => {
530 Err(InodeError::NameTooLong(dest_key))
531 }
532 ObjectClientError::ServiceError(RenameObjectError::NotImplementedError) => {
533 self.inner.cached_rename_support.cache_failure();
536 Err(InodeError::RenameNotSupported())
537 }
538 _ => Err(InodeError::client_error(
539 error,
540 "RenameObject failed",
541 &self.inner.s3_path.bucket,
542 src_key.as_ref(),
543 )),
544 };
545 }
546 };
547
548 self.inner.cached_rename_support.cache_success();
549 self.inner.negative_cache.remove(
551 dst_parent.ino(),
552 dst_name
553 .to_str()
554 .ok_or_else(|| InodeError::InvalidFileName(dst_name.to_owned()))?,
555 );
556 let mut rename_guard = RenameLockGuard::new(&src_parent, &dst_parent, &self.inner)?;
558 {
560 let source_state = rename_guard.source_parent_mut();
561
562 match &mut source_state.kind_data {
563 InodeKindData::File { .. } => {
564 debug_assert!(false, "inodes never change kind");
565 return Err(InodeError::NotADirectory(src_parent.err()));
566 }
567 InodeKindData::Directory { children, .. } => {
568 if let Some((_name, child)) = children.remove_entry(src_inode.name()) {
569 debug_assert_eq!(src_inode.ino(), child.ino(), "inode should have stayed identical");
571 }
572 }
573 }
574 }
575 {
577 let dst_state = rename_guard.destination_parent_mut();
578 match &mut dst_state.kind_data {
579 InodeKindData::File { .. } => {
580 debug_assert!(false, "inodes never change kind");
581 return Err(InodeError::NotADirectory(src_parent.err()));
582 }
583 InodeKindData::Directory { children, .. } => {
584 let dst_name_as_str: Box<str> = dest_name.as_ref().into();
585 let new_inode = src_inode.try_clone_with_new_key(
586 dest_full_valid_name,
587 &self.inner.s3_path.prefix,
588 self.inner.config.cache_config.file_ttl,
589 dst_parent_ino,
590 )?;
591
592 let concurrent_modification_detected =
594 if let Some((_name, old_inode)) = children.remove_entry(dst_name_as_str.as_ref()) {
595 dest_inode
596 .as_ref()
597 .map(|inode| inode.ino() != old_inode.ino())
598 .unwrap_or(true)
599 } else {
600 dest_inode.is_some()
601 };
602
603 if concurrent_modification_detected {
604 warn!(
605 src_ino = src_inode.ino(),
606 "concurrent modification detected during rename of inode {}, dest parent state changed unexpectedly which may cause unexpected behavior",
607 src_inode.err(),
608 );
609 }
610
611 children.insert(dst_name_as_str, new_inode.clone());
612 let mut inodes_write = self.inner.inodes.write().unwrap();
613 inodes_write.replace_or_insert(new_inode.ino(), &new_inode);
614 }
615 }
616 }
617 debug!("Rename completed in superblock");
618 src_status_guard.confirm();
619 if let Some(mut guard) = dest_status_guard {
620 guard.confirm()
621 }
622 Ok(())
623 }
624
625 async fn rmdir(&self, parent_ino: InodeNo, name: &OsStr) -> Result<(), InodeError> {
628 let LookedUpInode { inode, .. } = self
629 .inner
630 .lookup_by_name(parent_ino, name, self.inner.config.cache_config.serve_lookup_from_cache)
631 .await?;
632
633 if inode.kind() == InodeKind::File {
634 return Err(InodeError::NotADirectory(inode.err()));
635 }
636
637 let parent = self.inner.get(parent_ino)?;
638 let mut parent_state = parent.get_mut_inode_state()?;
639 let mut inode_state = inode.get_mut_inode_state()?;
640
641 match &inode_state.write_status {
642 WriteStatus::LocalOpenForWriting => unreachable!("A directory cannot be in LocalOpenForWriting state"),
643 WriteStatus::Remote => {
644 return Err(InodeError::CannotRemoveRemoteDirectory(inode.err()));
645 }
646 WriteStatus::LocalUnopened => match &mut inode_state.kind_data {
647 InodeKindData::File {} => unreachable!("Already checked that inode is a directory"),
648 InodeKindData::Directory {
649 writing_children,
650 deleted,
651 ..
652 } => {
653 if !writing_children.is_empty() {
654 return Err(InodeError::DirectoryNotEmpty(inode.err()));
655 }
656 *deleted = true;
657 }
658 },
659 WriteStatus::PendingRename => unreachable!("Only files can be in PendingRename"),
660 }
661
662 match &mut parent_state.kind_data {
663 InodeKindData::File {} => {
664 debug_assert!(false, "inodes never change kind");
665 return Err(InodeError::NotADirectory(parent.err()));
666 }
667 InodeKindData::Directory {
668 children,
669 writing_children,
670 ..
671 } => {
672 let removed = writing_children.remove(&inode.ino());
673 debug_assert!(
674 removed,
675 "should be able to remove the directory from its parents writing children as it was local"
676 );
677 children.remove(inode.name());
678 }
679 }
680
681 Ok(())
682 }
683
684 async fn unlink(&self, parent_ino: InodeNo, name: &OsStr) -> Result<(), InodeError> {
688 let parent = self.inner.get(parent_ino)?;
689 let LookedUpInode { inode, .. } = self
690 .inner
691 .lookup_by_name(parent_ino, name, self.inner.config.cache_config.serve_lookup_from_cache)
692 .await?;
693
694 if inode.kind() == InodeKind::Directory {
695 return Err(InodeError::IsDirectory(inode.err()));
696 }
697
698 let write_status = {
699 let inode_state = inode.get_inode_state()?;
700 inode_state.write_status
701 };
702
703 match write_status {
704 WriteStatus::LocalUnopened | WriteStatus::LocalOpenForWriting | WriteStatus::PendingRename => {
705 warn!(
707 parent = parent_ino,
708 ?name,
709 "unlink on local file not allowed until write is complete",
710 );
711 return Err(InodeError::UnlinkNotPermittedWhileWriting(inode.err()));
712 }
713 WriteStatus::Remote => {
714 let bucket = &self.inner.s3_path.bucket;
715 let s3_key = self.inner.full_key_for_inode(&inode);
716 debug!(parent=?parent_ino, ?name, "unlink on remote file will delete key {}", s3_key);
717 let delete_obj_result = self.inner.client.delete_object(bucket, &s3_key).await;
718
719 match delete_obj_result {
720 Ok(_res) => (),
721 Err(e) => {
722 error!(
723 inode=%inode.err(),
724 error=?e,
725 "DeleteObject failed for unlink",
726 );
727 Err(InodeError::client_error(e, "DeleteObject failed", bucket, &s3_key))?;
728 }
729 };
730 }
731 }
732
733 let mut parent_state = parent.get_mut_inode_state()?;
737 match &mut parent_state.kind_data {
738 InodeKindData::File { .. } => {
739 debug_assert!(false, "inodes never change kind");
740 return Err(InodeError::NotADirectory(parent.err()));
741 }
742 InodeKindData::Directory { children, .. } => {
743 if let Some(existing_inode) = children.get(inode.name()) {
747 if existing_inode.ino() == inode.ino() {
748 children.remove(inode.name());
749 } else {
750 let mut state = existing_inode.get_mut_inode_state_no_check();
752
753 state.stat.update_validity(Duration::from_secs(0));
755 }
756 } else {
757 debug!("parent did not contain child after deletion during unlink");
758 }
759 }
760 };
761
762 Ok(())
763 }
764
765 async fn setattr(
766 &self,
767 ino: InodeNo,
768 atime: Option<OffsetDateTime>,
769 mtime: Option<OffsetDateTime>,
770 ) -> Result<Lookup, InodeError> {
771 let inode = self.inner.get(ino)?;
772 logging::record_name(inode.name());
773 let mut sync = inode.get_mut_inode_state()?;
774
775 if sync.write_status == WriteStatus::Remote {
776 return Err(InodeError::SetAttrNotPermittedOnRemoteInode(inode.err()));
777 }
778
779 let validity = match inode.kind() {
780 InodeKind::File => self.inner.config.cache_config.file_ttl,
781 InodeKind::Directory => self.inner.config.cache_config.dir_ttl,
782 };
783
784 sync.stat.update_validity(validity);
786
787 if let Some(t) = atime {
788 sync.stat.atime = t;
789 }
790 if let Some(t) = mtime {
791 sync.stat.mtime = t;
792 };
793
794 let stat = sync.stat.clone();
795 drop(sync);
796 Ok(Lookup::new(
797 inode.ino(),
798 stat,
799 inode.kind(),
800 Some(S3Location::new(self.inner.s3_path.clone(), inode.valid_key().clone())),
801 ))
802 }
803
804 async fn open_handle(
812 &self,
813 ino: InodeNo,
814 fh: u64,
815 write_mode: &WriteMode,
816 flags: OpenFlags,
817 ) -> Result<NewHandle, InodeError> {
818 let force_revalidate_if_remote = !self.inner.config.cache_config.serve_lookup_from_cache || flags.direct_io();
819 let looked_up_inode = self.getattr_with_inode(ino, force_revalidate_if_remote).await?;
820 match looked_up_inode.inode.kind() {
821 InodeKind::Directory => return Err(InodeError::IsDirectory(looked_up_inode.inode.err())),
822 InodeKind::File => (),
823 }
824
825 let mode = if flags.contains(OpenFlags::O_RDWR) {
826 if looked_up_inode.write_status == WriteStatus::LocalUnopened
827 || (write_mode.allow_overwrite && flags.contains(OpenFlags::O_TRUNC))
828 || (write_mode.incremental_upload && flags.contains(OpenFlags::O_APPEND))
829 {
830 debug!("open choosing write handle for O_RDWR");
833 ReadWriteMode::Write
834 } else {
835 debug!("open choosing read handle for O_RDWR");
837 ReadWriteMode::Read
838 }
839 } else if flags.contains(OpenFlags::O_WRONLY) {
840 ReadWriteMode::Write
841 } else {
842 ReadWriteMode::Read
843 };
844
845 if matches!(mode, ReadWriteMode::Read) && !looked_up_inode.stat.is_readable {
846 return Err(InodeError::FlexibleRetrievalObjectNotAccessible(
847 looked_up_inode.inode.err(),
848 ));
849 }
850
851 let (pending_upload_hook, inode_lookup) = {
852 let inode = looked_up_inode.inode.clone();
853 let mut locked_inode = looked_up_inode.inode.get_mut_inode_state()?;
854
855 let pending_upload_hook = match mode {
856 ReadWriteMode::Read => self.start_reading(&mut locked_inode, inode.clone(), fh)?,
857 ReadWriteMode::Write => {
858 let is_truncate = flags.contains(OpenFlags::O_TRUNC);
859 self.start_writing(&mut locked_inode, inode.clone(), write_mode, is_truncate, fh)?
860 }
861 };
862
863 let inode_lookup = Lookup::new(
864 inode.ino(),
865 locked_inode.stat.clone(),
866 inode.kind(),
867 Some(S3Location::new(self.inner.s3_path.clone(), inode.valid_key().clone())),
868 );
869
870 (pending_upload_hook, inode_lookup)
871 };
872
873 let lookup = if let Some(upload_hook) = pending_upload_hook
874 && let Some(lookup_after_upload) = upload_hook.wait_for_completion().await?
875 {
876 lookup_after_upload
877 } else {
878 inode_lookup
879 };
880
881 Ok(NewHandle { lookup, mode })
882 }
883
884 async fn inc_file_size(&self, ino: InodeNo, len: usize) -> Result<usize, InodeError> {
885 let inode = self.inner.get(ino)?;
886 let mut state = inode.get_mut_inode_state()?;
887 if !matches!(state.write_status, WriteStatus::LocalOpenForWriting) {
888 debug!(?inode, "Error trying to increase file size on write");
889 return Err(InodeError::InodeInvalidWriteStatus(inode.err()));
890 }
891 state.stat.size += len;
892 Ok(state.stat.size)
893 }
894
895 async fn finish_writing(&self, ino: InodeNo, etag: Option<ETag>, fh: u64) -> Result<Lookup, InodeError> {
901 let inode = self.inner.get(ino)?;
902 let ancestors = {
905 let mut ancestors = Vec::new();
906 let mut ancestor_ino = inode.parent();
907 let mut visited = HashSet::new();
908 loop {
909 assert!(visited.insert(ancestor_ino), "cycle detected in inode ancestors");
910 let ancestor = self.inner.get(ancestor_ino)?;
911 ancestors.push(ancestor.clone());
912 if ancestor.ino() == FUSE_ROOT_INODE || ancestor.get_inode_state()?.write_status == WriteStatus::Remote
913 {
914 break;
915 }
916 ancestor_ino = ancestor.parent();
917 }
918 ancestors
919 };
920
921 let mut ancestors_states = ancestors
923 .iter()
924 .rev()
925 .map(|inode| inode.get_mut_inode_state())
926 .collect::<Result<Vec<_>, _>>()?;
927
928 let mut locked_inode = inode.get_mut_inode_state()?;
929 match locked_inode.write_status {
930 WriteStatus::LocalOpenForWriting | WriteStatus::Remote => {
931 locked_inode.pending_upload_hook = None;
932 if let Some(etag) = etag {
941 if self.inner.open_handles.try_remove_writer(&locked_inode, fh) {
946 locked_inode.write_status = WriteStatus::Remote;
947 }
948 locked_inode.stat.etag = Some(etag.into_inner().into_boxed_str());
949 locked_inode
950 .stat
951 .update_validity(self.inner.config.cache_config.file_ttl);
952 } else {
953 locked_inode.write_status = WriteStatus::Remote;
955 self.inner.open_handles.remove_inode(ino); locked_inode.stat.update_validity(Duration::from_secs(0));
957 }
958
959 let children_inos = std::iter::once(inode.ino()).chain(ancestors.iter().map(|ancestor| ancestor.ino()));
962 for (ancestor_state, child_ino) in ancestors_states.iter_mut().rev().zip(children_inos) {
963 match &mut ancestor_state.kind_data {
964 InodeKindData::File { .. } => unreachable!("we know the ancestor is a directory"),
965 InodeKindData::Directory { writing_children, .. } => {
966 writing_children.remove(&child_ino);
967 }
968 }
969 ancestor_state.write_status = WriteStatus::Remote;
970 }
971
972 let stat = locked_inode.stat.clone();
973 drop(locked_inode);
974
975 Ok(Lookup::new(
976 inode.ino(),
977 stat,
978 inode.kind(),
979 Some(S3Location::new(self.inner.s3_path.clone(), inode.valid_key().clone())),
980 ))
981 }
982 _ => Err(InodeError::InodeInvalidWriteStatus(inode.err())),
983 }
984 }
985
986 async fn flush_reader(&self, ino: InodeNo, fh: u64) -> Result<(), InodeError> {
988 let inode = self.inner.get(ino)?;
989 let locked_inode = inode.get_mut_inode_state()?;
990 self.inner.open_handles.deactivate_reader(&locked_inode, fh);
991 Ok(())
992 }
993
994 async fn flush_writer(
1000 &self,
1001 ino: InodeNo,
1002 fh: u64,
1003 hook: PendingUploadHook,
1004 ) -> Result<Option<PendingUploadHook>, InodeError> {
1005 let inode = self.inner.get(ino)?;
1006 let pending_upload_hook = {
1007 let mut locked_inode = inode.get_mut_inode_state()?;
1008 match locked_inode.write_status {
1009 WriteStatus::LocalOpenForWriting => {
1010 if self.inner.open_handles.try_deactivate_writer(&locked_inode, fh) {
1011 Some(locked_inode.pending_upload_hook.get_or_insert(hook).clone())
1012 } else {
1013 None
1014 }
1015 }
1016 _ => None,
1017 }
1018 };
1019 Ok(pending_upload_hook)
1020 }
1021
1022 async fn finish_reading(&self, ino: InodeNo, fh: u64) -> Result<(), InodeError> {
1026 let inode = self.inner.get(ino)?;
1027 let state = inode.get_mut_inode_state()?;
1028 self.inner.open_handles.remove_reader(&state, fh);
1029 Ok(())
1030 }
1031
1032 async fn release_writer(
1038 &self,
1039 ino: InodeNo,
1040 fh: u64,
1041 pending_upload_hook: PendingUploadHook,
1042 location: &S3Location,
1043 ) -> Result<(), InodeError> {
1044 let pending_upload_hook = self.flush_writer(ino, fh, pending_upload_hook).await;
1045 match pending_upload_hook {
1046 Ok(Some(upload_hook)) => {
1047 let completion_result = upload_hook.wait_for_completion().await?;
1048 if completion_result.is_some() {
1049 debug!(key = %location, "upload completed async after file was closed");
1050 }
1051 }
1052 Ok(None) => {}
1053 Err(e) => {
1054 debug!(key = %location, "failed to flush open file handle during release: {e}");
1055 }
1056 }
1057 Ok(())
1058 }
1059
1060 async fn new_readdir_handle(&self, dir_ino: InodeNo) -> Result<u64, InodeError> {
1061 self.new_readdir_handle_with_pagesize(dir_ino, 1000).await
1062 }
1063
1064 async fn readdir<'a>(
1065 &self,
1066 parent: InodeNo,
1067 fh: u64,
1068 offset: i64,
1069 is_readdirplus: bool,
1070 mut add: AddDirEntry<'a>,
1071 ) -> Result<(), InodeError> {
1072 let dir_handle = {
1073 let dir_handles = self.inner.dir_handles.read().unwrap();
1074 dir_handles.get(&fh).cloned().ok_or(InodeError::NoSuchDirHandle { fh })
1075 }?;
1076 trace!("readdir in superblock");
1077 if offset == 0 && dir_handle.offset() != 0 {
1079 trace!("new handle");
1080 let dir = self.inner.get(parent)?;
1081 let new_handle = ReaddirHandle::new(
1082 &self.inner,
1083 dir.ino(),
1084 dir.parent(),
1085 self.inner.full_key_for_inode(&dir).to_string(),
1086 1000,
1087 )?;
1088 *dir_handle.handle.lock().await = new_handle;
1089 dir_handle.rewind_offset();
1090 *dir_handle.last_response.lock().await = None;
1092 }
1093 let readdir_handle = dir_handle.handle.lock().await;
1094
1095 if offset != dir_handle.offset() && offset > 0 {
1098 let last_response = dir_handle.last_response.lock().await;
1107 if let Some((last_offset, entries)) = last_response.as_ref() {
1108 let offset = offset as usize;
1109 let last_offset = *last_offset as usize;
1110 if (last_offset..last_offset + entries.len()).contains(&offset) {
1111 trace!(offset, "repeating readdir response");
1112 for entry in entries[offset - last_offset..].iter() {
1113 if add(
1114 entry.lookup.clone().into(),
1115 entry.name.clone(),
1116 entry.offset,
1117 entry.generation,
1118 ) == AddDirEntryResult::ReplyBufferFull
1119 {
1120 break;
1121 }
1122 if is_readdirplus && entry.name != "." && entry.name != ".." {
1126 self.inner.remember(&entry.lookup.inode)
1127 }
1128 }
1129 return Ok(());
1130 }
1131 }
1132 return Err(InodeError::OutOfOrderReadDir {
1133 expected: dir_handle.offset(),
1134 actual: offset,
1135 fh,
1136 });
1137 }
1138
1139 struct Reply<'a> {
1142 add: AddDirEntry<'a>,
1143 entries: Vec<DirectoryEntryReaddir>,
1144 }
1145
1146 impl Reply<'_> {
1147 async fn finish(self, offset: i64, dir_handle: &DirHandle) {
1148 *dir_handle.last_response.lock().await = Some((offset, self.entries));
1149 }
1150 fn add(&mut self, entry: DirectoryEntryReaddir) -> AddDirEntryResult {
1151 let result = (self.add)(
1152 entry.lookup.clone().into(),
1153 entry.name.clone(),
1154 entry.offset,
1155 entry.generation,
1156 );
1157 if result == AddDirEntryResult::EntryAdded {
1158 self.entries.push(entry);
1159 }
1160 result
1161 }
1162 }
1163
1164 let mut reply = Reply { add, entries: vec![] };
1165
1166 if dir_handle.offset() < 1 {
1167 let lookup = self.getattr_with_inode(parent, false).await?;
1168 let entry = DirectoryEntryReaddir {
1169 offset: dir_handle.offset() + 1,
1170 name: ".".into(),
1171 generation: 0,
1172 lookup,
1173 };
1174 if reply.add(entry) == AddDirEntryResult::ReplyBufferFull {
1175 reply.finish(offset, &dir_handle).await;
1176 return Ok(());
1177 }
1178 dir_handle.next_offset();
1179 }
1180 if dir_handle.offset() < 2 {
1181 let lookup = self.getattr_with_inode(readdir_handle.parent(), false).await?;
1182 let entry = DirectoryEntryReaddir {
1183 offset: dir_handle.offset() + 1,
1184 name: "..".into(),
1185 generation: 0,
1186 lookup,
1187 };
1188 if reply.add(entry) == AddDirEntryResult::ReplyBufferFull {
1189 reply.finish(offset, &dir_handle).await;
1190 return Ok(());
1191 }
1192 dir_handle.next_offset();
1193 }
1194
1195 loop {
1196 let next = match readdir_handle.next(&self.inner).await? {
1197 None => {
1198 reply.finish(offset, &dir_handle).await;
1199 return Ok(());
1200 }
1201 Some(next) => next,
1202 };
1203 trace!(next_inode = ?next.inode, "new inode yielded by readdir handle");
1204 let entry = DirectoryEntryReaddir {
1205 offset: dir_handle.offset() + 1,
1206 name: next.inode.name().into(),
1207 generation: 0,
1208 lookup: next.clone(),
1209 };
1210
1211 if reply.add(entry) == AddDirEntryResult::ReplyBufferFull {
1212 readdir_handle.readd(next);
1213 reply.finish(offset, &dir_handle).await;
1214 return Ok(());
1215 }
1216 if is_readdirplus {
1217 self.inner.remember(&next.inode)
1218 }
1219 dir_handle.next_offset();
1220 }
1221 }
1222
1223 async fn releasedir(&self, fh: u64) -> Result<(), InodeError> {
1224 let mut dir_handles = self.inner.dir_handles.write().unwrap();
1225 dir_handles
1226 .remove(&fh)
1227 .map(|_| ())
1228 .ok_or(InodeError::NoSuchDirHandle { fh })
1229 }
1230
1231 async fn create(&self, dir: InodeNo, name: &OsStr, kind: InodeKind) -> Result<Lookup, InodeError> {
1232 trace!(parent=?dir, ?name, "create");
1233
1234 let existing = self
1235 .inner
1236 .lookup_by_name(dir, name, self.inner.config.cache_config.serve_lookup_from_cache)
1237 .await;
1238 match existing {
1239 Ok(lookup) => return Err(InodeError::FileAlreadyExists(lookup.inode.err())),
1240 Err(InodeError::FileDoesNotExist(_, _)) => (),
1241 Err(e) => return Err(e),
1242 }
1243
1244 let name: ValidName = name.try_into()?;
1246
1247 let (lookup, inode) = {
1249 let parent_inode = self.inner.get(dir)?;
1250 let mut parent_state = parent_inode.get_mut_inode_state()?;
1251
1252 let InodeKindData::Directory { children, .. } = &mut parent_state.kind_data else {
1256 return Err(InodeError::NotADirectory(parent_inode.err()));
1257 };
1258 if let Some(inode) = children.get(name.as_ref()) {
1259 return Err(InodeError::FileAlreadyExists(inode.err()));
1260 }
1261
1262 let stat = match kind {
1263 InodeKind::File => InodeStat::for_file(
1265 0,
1266 OffsetDateTime::now_utc(),
1267 None,
1268 None,
1269 None,
1270 self.inner.config.cache_config.file_ttl,
1271 ),
1272 InodeKind::Directory => {
1273 InodeStat::for_directory(self.inner.mount_time, self.inner.config.cache_config.dir_ttl)
1274 }
1275 };
1276
1277 let write_status = WriteStatus::LocalUnopened;
1278 let state = InodeState::new(&stat, kind, write_status);
1279 let inode = self
1280 .inner
1281 .create_inode_locked(&parent_inode, &mut parent_state, name, kind, state, true)?;
1282 let lookup = Lookup::new(
1283 inode.ino(),
1284 stat,
1285 inode.kind(),
1286 Some(S3Location::new(self.inner.s3_path.clone(), inode.valid_key().clone())),
1287 );
1288 (lookup, inode)
1289 };
1290
1291 self.inner.remember(&inode);
1292 Ok(lookup)
1293 }
1294
1295 async fn forget(&self, ino: InodeNo, n: u64) {
1298 let mut inodes = self.inner.inodes.write().unwrap();
1299
1300 match inodes.decrease_lookup_count(ino, n) {
1301 Ok(Some(removed_inode)) => {
1302 trace!(ino, "removing inode from superblock");
1303 let parent_ino = removed_inode.parent();
1304
1305 if let Some((parent, _)) = inodes.get_mut(&parent_ino) {
1308 let mut parent_state = parent.get_mut_inode_state_no_check();
1309 let InodeKindData::Directory {
1310 children,
1311 writing_children,
1312 ..
1313 } = &mut parent_state.kind_data
1314 else {
1315 unreachable!("parent is always a directory");
1316 };
1317 if let Some(child) = children.get(removed_inode.name()) {
1318 if child.ino() == ino {
1320 children.remove(removed_inode.name());
1321 }
1322 }
1323 writing_children.remove(&ino);
1324 }
1325
1326 drop(inodes);
1327
1328 if let Ok(state) = removed_inode.get_inode_state() {
1329 metrics::counter!("metadata_cache.inode_forgotten_before_expiry")
1330 .increment(state.stat.is_valid().into());
1331 };
1332 if self.inner.open_handles.remove_inode(ino) {
1333 debug!("Open file handle(s) found for forgotten inode {}", ino);
1341 }
1342 }
1343 Ok(None) => {}
1344 Err(_) => {
1345 debug_assert!(
1346 false,
1347 "forget should not be called on inode already removed from superblock"
1348 );
1349 error!("forget called on inode {ino} already removed from the superblock");
1350 }
1351 }
1352 }
1353
1354 async fn try_reactivate_handle(&self, ino: InodeNo, fh: u64, mode: ReadWriteMode) -> Result<bool, InodeError> {
1361 let inode = self.inner.get(ino)?;
1362 let mut locked_inode = inode.get_mut_inode_state()?;
1363 match mode {
1364 ReadWriteMode::Read => {
1365 if self.inner.open_handles.try_activate_reader(&locked_inode, fh) {
1366 return Ok(true);
1367 }
1368 }
1369 ReadWriteMode::Write => {
1370 if self.inner.open_handles.try_activate_writer(&locked_inode, fh) {
1371 debug_assert!(locked_inode.write_status == WriteStatus::LocalOpenForWriting);
1372 locked_inode.pending_upload_hook = None;
1373 return Ok(true);
1374 }
1375 }
1376 }
1377 Ok(false)
1378 }
1379}
1380
1381impl<OC: ObjectClient + Send + Sync> SuperblockInner<OC> {
1382 pub fn get(&self, ino: InodeNo) -> Result<Inode, InodeError> {
1387 let inode = self
1388 .inodes
1389 .read()
1390 .unwrap()
1391 .get_inode(&ino)
1392 .cloned()
1393 .ok_or(InodeError::InodeDoesNotExist(ino))?;
1394 inode.verify_inode(ino, &self.s3_path.prefix)?;
1395 Ok(inode)
1396 }
1397
1398 fn full_key_for_inode(&self, inode: &Inode) -> ValidKey {
1399 inode.valid_key().full_key(&self.s3_path.prefix)
1400 }
1401
1402 pub fn remember(&self, inode: &Inode) {
1407 let mut inodes_write = self.inodes.write().unwrap();
1408 inodes_write.increase_lookup_count(inode);
1409 }
1410
1411 async fn lookup_by_name(
1417 &self,
1418 parent_ino: InodeNo,
1419 name: &OsStr,
1420 allow_cache: bool,
1421 ) -> Result<LookedUpInode, InodeError> {
1422 let name: ValidName = name.try_into()?;
1423
1424 let lookup = if allow_cache {
1425 self.cache_lookup(parent_ino, &name)
1426 } else {
1427 None
1428 };
1429
1430 let lookup = match lookup {
1431 Some(lookup) => lookup?,
1432 None => {
1433 let remote = self.remote_lookup(parent_ino, name).await?;
1434 self.update_from_remote(parent_ino, name, remote)?
1435 }
1436 };
1437
1438 lookup
1439 .inode
1440 .verify_child(parent_ino, name.as_ref(), &self.s3_path.prefix)?;
1441 Ok(lookup)
1442 }
1443
1444 fn cache_lookup(&self, parent_ino: InodeNo, name: &str) -> Option<Result<LookedUpInode, InodeError>> {
1449 fn do_cache_lookup<O: ObjectClient + Send + Sync>(
1450 superblock: &SuperblockInner<O>,
1451 parent: Inode,
1452 name: &str,
1453 ) -> Option<Result<LookedUpInode, InodeError>> {
1454 match &parent.get_inode_state().ok()?.kind_data {
1455 InodeKindData::File { .. } => unreachable!("parent should be a directory!"),
1456 InodeKindData::Directory { children, .. } => {
1457 if let Some(inode) = children.get(name) {
1458 let locked = inode.get_inode_state().ok()?;
1459 if locked.stat.is_valid() {
1460 let lookup = LookedUpInode {
1461 inode: inode.clone(),
1462 stat: locked.stat.clone(),
1463 path: superblock.s3_path.clone(),
1464 write_status: locked.write_status,
1465 };
1466 return Some(Ok(lookup));
1467 }
1468 }
1469 }
1470 };
1471
1472 if superblock.negative_cache.contains(parent.ino(), name) {
1473 return Some(Err(InodeError::FileDoesNotExist(name.to_owned(), parent.err())));
1474 }
1475
1476 None
1477 }
1478
1479 let lookup = self
1480 .get(parent_ino)
1481 .ok()
1482 .and_then(|parent| do_cache_lookup(self, parent, name));
1483
1484 match &lookup {
1485 Some(lookup) => trace!("lookup returned from cache: {:?}", lookup),
1486 None => trace!("no lookup available from cache"),
1487 }
1488 metrics::counter!("metadata_cache.cache_hit").increment(lookup.is_some().into());
1489
1490 lookup
1491 }
1492
    /// Looks up `name` under the directory `parent_ino` by querying the object client.
    ///
    /// A `head_object` request (for a file) and a `list_objects` request (for a directory) are
    /// started together and their results are consumed as they complete.
    ///
    /// Returns:
    /// * `Ok(Some(_))` with kind `Directory` as soon as the listing shows something under the
    ///   directory prefix, regardless of the outcome of the `head_object` request,
    /// * `Ok(Some(_))` with kind `File` when no directory was found and `head_object` succeeded,
    /// * `Ok(None)` when neither request found anything,
    /// * `Err(_)` when the parent cannot be resolved, the key cannot be built, or either request
    ///   fails with an error other than `HeadObjectError::NotFound`.
    async fn remote_lookup(
        &self,
        parent_ino: InodeNo,
        name: ValidName<'_>,
    ) -> Result<Option<RemoteLookup>, InodeError> {
        let parent = self.get(parent_ino)?;
        // Full key of `name` built as a *directory* child of the parent.
        let full_path: String = self
            .full_key_for_inode(&parent)
            .new_child(name, InodeKind::Directory)
            .map_err(|_| InodeError::NotADirectory(parent.err()))?
            .into();

        // NOTE(review): dropping the last byte presumably strips the trailing `/` that a
        // directory key ends with, yielding the key of a file with the same name - confirm
        // against `ValidKey::new_child`.
        let object_key = &full_path[..(full_path.len() - 1)];
        let directory_prefix = &full_path[..];

        let head_object_params = HeadObjectParams::new();
        // Both futures are fused so that `select_biased!` can be polled again after one of them
        // has completed.
        let mut file_lookup = self
            .client
            .head_object(&self.s3_path.bucket, object_key, &head_object_params)
            .fuse();
        let mut dir_lookup = self
            .client
            .list_objects(&self.s3_path.bucket, None, "/", 1, directory_prefix)
            .fuse();

        // Stat of the file found by `head_object`, if any; only used when no directory is found.
        let mut file_state = None;

        // Two iterations: one per request, in whichever order they complete (`file_lookup` is
        // listed first, so it is preferred when both are ready).
        for _ in 0..2 {
            select_biased! {
                result = file_lookup => {
                    match result {
                        Ok(HeadObjectResult { size, last_modified, restore_status, etag, storage_class, .. }) => {
                            let stat = InodeStat::for_file(size as usize, last_modified, Some(etag.into_inner().into_boxed_str()), storage_class.as_deref(), restore_status, self.config.cache_config.file_ttl);
                            file_state = Some(stat);
                        }
                        // A missing object is not an error: the name may still be a directory.
                        Err(ObjectClientError::ServiceError(HeadObjectError::NotFound)) => {},
                        Err(e) => return Err(InodeError::client_error(e, "HeadObject failed", &self.s3_path.bucket, object_key)),
                    }
                }

                result = dir_lookup => {
                    let result = result.map_err(|e| InodeError::client_error(e, "ListObjectsV2 failed", &self.s3_path.bucket, object_key))?;

                    // A directory exists if the first common prefix or the first object returned
                    // by the listing starts with the directory prefix.
                    let found_directory = if result
                        .common_prefixes
                        .first()
                        .map(|prefix| prefix.starts_with(directory_prefix))
                        .unwrap_or(false)
                    {
                        true
                    } else if result
                        .objects
                        .first()
                        .map(|object| object.key.starts_with(directory_prefix))
                        .unwrap_or(false)
                    {
                        // The first object is exactly the directory key itself (a key ending in `/`).
                        if result.objects[0].key == directory_prefix {
                            trace!(
                                parent = ?parent_ino,
                                ?name,
                                size = result.objects[0].size,
                                "found a directory that shadows this name"
                            );
                            if result.objects[0].size > 0 {
                                warn!(
                                    "key {:?} is not a valid filename (ends in `/`); will be hidden and unavailable",
                                    directory_prefix
                                );
                            }
                        }
                        true
                    } else {
                        false
                    };

                    // A directory takes precedence: return immediately without waiting for (or
                    // using) the `head_object` result.
                    if found_directory {
                        trace!(parent = ?parent_ino, ?name, "lookup ListObjects found a directory");
                        let stat = InodeStat::for_directory(self.mount_time, self.config.cache_config.dir_ttl);
                        return Ok(Some(RemoteLookup { kind: InodeKind::Directory, stat }));
                    }
                }
            }
        }

        // Both requests completed and no directory was found: the name is either a file or absent.
        if let Some(mut stat) = file_state {
            trace!(parent = ?parent_ino, ?name, etag =? stat.etag, "found a regular file in S3");
            // Refresh the validity now, since the stat was built before the listing finished.
            stat.update_validity(self.config.cache_config.file_ttl);
            Ok(Some(RemoteLookup {
                kind: InodeKind::File,
                stat,
            }))
        } else {
            trace!(parent = ?parent_ino, ?name, "not found");
            Ok(None)
        }
    }
1623
1624 fn update_from_remote(
1627 &self,
1628 parent_ino: InodeNo,
1629 name: ValidName,
1630 remote: Option<RemoteLookup>,
1631 ) -> Result<LookedUpInode, InodeError> {
1632 let parent = self.get(parent_ino)?;
1633
1634 if parent.kind() != InodeKind::Directory {
1636 return Err(InodeError::NotADirectory(parent.err()));
1637 }
1638
1639 if self.config.cache_config.use_negative_cache {
1640 match &remote {
1641 Some(_) => self.negative_cache.remove(parent_ino, &name),
1643 None => self.negative_cache.insert(parent_ino, &name),
1645 }
1646 }
1647
1648 if let Some(looked_up) = Self::try_update_fast_path(&parent, &name, &remote, self.s3_path.clone())? {
1650 return Ok(looked_up);
1651 }
1652
1653 self.update_slow_path(parent, name, remote)
1654 }
1655
1656 fn try_update_fast_path(
1659 parent: &Inode,
1660 name: &str,
1661 remote: &Option<RemoteLookup>,
1662 s3_path: Arc<S3Path>,
1663 ) -> Result<Option<LookedUpInode>, InodeError> {
1664 let parent_state = parent.get_inode_state()?;
1665 let inode = match &parent_state.kind_data {
1666 InodeKindData::File { .. } => unreachable!("we know parent is a directory"),
1667 InodeKindData::Directory { children, .. } => children.get(name),
1668 };
1669 match (remote, inode) {
1670 (None, None) => Err(InodeError::FileDoesNotExist(name.to_owned(), parent.err())),
1671 (Some(remote), Some(existing_inode)) => {
1672 let mut existing_state = existing_inode.get_mut_inode_state()?;
1673 if remote.kind == existing_inode.kind()
1674 && existing_state.write_status == WriteStatus::Remote
1675 && existing_state.stat.etag == remote.stat.etag
1676 {
1677 trace!(parent=?existing_inode.parent(), name=?existing_inode.name(), ino=?existing_inode.ino(), "updating inode in place");
1678 existing_state.stat = remote.stat.clone();
1679 Ok(Some(LookedUpInode {
1680 inode: existing_inode.clone(),
1681 stat: remote.stat.clone(),
1682 path: s3_path.clone(),
1683 write_status: existing_state.write_status,
1684 }))
1685 } else {
1686 Ok(None)
1687 }
1688 }
1689 _ => Ok(None),
1690 }
1691 }
1692
    /// Applies a remote lookup result for child `name` while holding the parent's state for
    /// writing, so that the parent's children can be modified.
    ///
    /// The four combinations of remote result and existing local inode are handled as follows:
    /// * neither exists: `Err(FileDoesNotExist)`;
    /// * only the local inode exists: it is kept (with refreshed validity) if it is listed in
    ///   the parent's `writing_children`, otherwise it is removed from the parent's children
    ///   and `Err(FileDoesNotExist)` is returned;
    /// * only the remote exists: a new inode with write status `Remote` is created;
    /// * both exist: the local inode is returned unchanged, updated in place, or replaced by a
    ///   newly created inode (see the inline comments).
    fn update_slow_path(
        &self,
        parent: Inode,
        name: ValidName,
        remote: Option<RemoteLookup>,
    ) -> Result<LookedUpInode, InodeError> {
        let mut parent_state = parent.get_mut_inode_state()?;
        // Clone the existing child (if any) so the parent's state can be mutably borrowed below.
        let inode = match &parent_state.kind_data {
            InodeKindData::File { .. } => unreachable!("we know parent is a directory"),
            InodeKindData::Directory { children, .. } => children.get(name.as_ref()).cloned(),
        };
        match (remote, inode) {
            (None, None) => Err(InodeError::FileDoesNotExist(name.to_string(), parent.err())),
            (None, Some(existing_inode)) => {
                let InodeKindData::Directory {
                    children,
                    writing_children,
                    ..
                } = &mut parent_state.kind_data
                else {
                    unreachable!("we know parent is a directory");
                };
                if writing_children.contains(&existing_inode.ino()) {
                    // The inode is tracked in `writing_children`, so it is kept even though the
                    // remote lookup found nothing; only its validity is refreshed.
                    let mut sync = existing_inode.get_mut_inode_state()?;

                    let validity = match existing_inode.kind() {
                        InodeKind::File => self.config.cache_config.file_ttl,
                        InodeKind::Directory => self.config.cache_config.dir_ttl,
                    };
                    sync.stat.update_validity(validity);
                    let stat = sync.stat.clone();
                    let write_status = sync.write_status;
                    // Release the child's state before `existing_inode` is moved into the result.
                    drop(sync);

                    Ok(LookedUpInode {
                        inode: existing_inode,
                        stat,
                        path: self.s3_path.clone(),
                        write_status,
                    })
                } else {
                    // Not tracked in `writing_children` and not found remotely: drop the entry.
                    children.remove(name.as_ref());
                    Err(InodeError::FileDoesNotExist(name.to_string(), parent.err()))
                }
            }
            (Some(remote), None) => {
                // Nothing local yet: create a fresh inode that mirrors the remote result.
                let write_status = WriteStatus::Remote;
                let state = InodeState::new(&remote.stat, remote.kind, write_status);
                self.create_inode_locked(&parent, &mut parent_state, name, remote.kind, state, false)
                    .map(|inode| LookedUpInode {
                        inode,
                        stat: remote.stat,
                        path: self.s3_path.clone(),
                        write_status,
                    })
            }
            (Some(remote), Some(existing_inode)) => {
                let mut existing_state = existing_inode.get_mut_inode_state()?;
                let existing_is_remote = existing_state.write_status == WriteStatus::Remote;

                // A remote file never replaces a local inode whose write status is not `Remote`:
                // the local inode is returned as is.
                if remote.kind == InodeKind::File && !existing_is_remote {
                    return Ok(LookedUpInode {
                        inode: existing_inode.clone(),
                        stat: existing_state.stat.clone(),
                        path: self.s3_path.clone(),
                        write_status: existing_state.write_status,
                    });
                }

                let same_kind = remote.kind == existing_inode.kind();
                let same_etag = existing_state.stat.etag == remote.stat.etag;
                // Update in place when kind and etag match and either the local inode is already
                // remote or the remote result is a directory.
                if same_kind && same_etag && (existing_is_remote || remote.kind == InodeKind::Directory) {
                    trace!(parent=?existing_inode.parent(), name=?existing_inode.name(), ino=?existing_inode.ino(), "updating inode in place (slow path)");
                    existing_state.stat = remote.stat.clone();
                    if remote.kind == InodeKind::Directory && !existing_is_remote {
                        // The directory now exists remotely: mark it `Remote` and stop tracking
                        // it in the parent's `writing_children`.
                        trace!(parent=?existing_inode.parent(), name=?existing_inode.name(), ino=?existing_inode.ino(), "local directory has become remote");
                        existing_state.write_status = WriteStatus::Remote;
                        let InodeKindData::Directory { writing_children, .. } = &mut parent_state.kind_data else {
                            unreachable!("we know parent is a directory");
                        };
                        writing_children.remove(&existing_inode.ino());
                    }
                    return Ok(LookedUpInode {
                        inode: existing_inode.clone(),
                        stat: remote.stat,
                        path: self.s3_path.clone(),
                        write_status: existing_state.write_status,
                    });
                }

                trace!(
                    ino=?existing_inode.ino(),
                    same_kind,
                    same_etag,
                    existing_is_remote,
                    remote_is_dir = remote.kind == InodeKind::Directory,
                    "inode could not be updated in place",
                );

                // Otherwise the existing inode is replaced: a new inode is created under the
                // same name, taking over the parent's entry for it.
                debug!(
                    parent=?existing_inode.parent(),
                    name=?existing_inode.name(),
                    ino=?existing_inode.ino(),
                    "inode needs to be recreated",
                );
                let write_status = WriteStatus::Remote;
                let state = InodeState::new(&remote.stat, remote.kind, write_status);
                let new_inode =
                    self.create_inode_locked(&parent, &mut parent_state, name, remote.kind, state, false)?;
                Ok(LookedUpInode {
                    inode: new_inode,
                    stat: remote.stat,
                    path: self.s3_path.clone(),
                    write_status,
                })
            }
        }
    }
1829
1830 fn create_inode_locked(
1835 &self,
1836 parent: &Inode,
1837 parent_locked: &mut InodeState,
1838 name: ValidName,
1839 kind: InodeKind,
1840 state: InodeState,
1841 is_new_file: bool,
1842 ) -> Result<Inode, InodeError> {
1843 let key = parent
1844 .valid_key()
1845 .new_child(name, kind)
1846 .map_err(|_| InodeError::NotADirectory(parent.err()))?;
1847 let next_ino = self.next_ino.fetch_add(1, Ordering::SeqCst);
1848 let inode = Inode::new(next_ino, parent.ino(), key, &self.s3_path.prefix, state);
1849 trace!(parent=?inode.parent(), name=?inode.name(), kind=?inode.kind(), new_ino=?inode.ino(), key=?inode.key(), "created new inode");
1850
1851 match &mut parent_locked.kind_data {
1852 InodeKindData::File {} => {
1853 debug_assert!(false, "inodes never change kind");
1854 return Err(InodeError::NotADirectory(parent.err()));
1855 }
1856 InodeKindData::Directory {
1857 children,
1858 writing_children,
1859 ..
1860 } => {
1861 let existing_inode = children.insert(name.as_ref().into(), inode.clone());
1862 if is_new_file {
1863 writing_children.insert(next_ino);
1864 }
1865 if let Some(existing_inode) = existing_inode {
1866 writing_children.remove(&existing_inode.ino());
1867 }
1868 }
1869 }
1870
1871 Ok(inode)
1872 }
1873}
1874
/// Result of a remote lookup that found something: the kind of the entry and its stat.
#[derive(Debug, Clone)]
pub struct RemoteLookup {
    /// Whether the remote entry is a file or a directory.
    kind: InodeKind,
    /// Stat built from the remote response.
    stat: InodeStat,
}
1881
/// Result of a lookup: an inode together with the stat and write status captured at lookup time.
#[derive(Debug, Clone)]
pub struct LookedUpInode {
    /// The inode that was looked up.
    pub inode: Inode,
    /// The inode's stat as captured by the lookup.
    pub stat: InodeStat,
    /// The S3 path (bucket and prefix) of the superblock that produced this lookup.
    pub path: Arc<S3Path>,
    /// The inode's write status as captured by the lookup.
    pub write_status: WriteStatus,
}
1892
1893impl LookedUpInode {
1894 pub fn validity(&self) -> Duration {
1896 self.stat.expiry.remaining_ttl()
1897 }
1898}
1899
1900impl From<LookedUpInode> for InodeInformation {
1901 fn from(val: LookedUpInode) -> Self {
1902 InodeInformation::new(val.inode.ino(), val.stat, val.inode.kind())
1903 }
1904}
1905
1906impl From<LookedUpInode> for Lookup {
1907 fn from(val: LookedUpInode) -> Self {
1908 let location = Some(S3Location::new(val.path.clone(), val.inode.valid_key().clone()));
1909
1910 Lookup::new_from_info_and_loc(val.into(), location)
1911 }
1912}
1913
1914#[derive(Debug, Default)]
1919struct InodeMap {
1920 map: HashMap<InodeNo, (Inode, u64)>,
1921}
1922
1923impl InodeMap {
1924 fn get_inode(&self, ino: &InodeNo) -> Option<&Inode> {
1925 self.map.get(ino).map(|(node, _lookup_count)| node)
1926 }
1927
1928 fn get_mut(&mut self, ino: &InodeNo) -> Option<&mut (Inode, u64)> {
1929 self.map.get_mut(ino)
1930 }
1931
1932 #[cfg(test)]
1933 fn get_count(&self, ino: &InodeNo) -> Option<u64> {
1934 self.map.get(ino).map(|(_, lookup_count)| *lookup_count)
1935 }
1936
1937 fn insert(&mut self, ino: InodeNo, inode: Inode, lookup_count: u64) -> Option<Inode> {
1938 Self::add_metrics(&inode);
1939 trace!(ino, lookup_count, "inserting inode");
1940 self.map
1941 .insert(ino, (inode, lookup_count))
1942 .inspect(|(inode, _count)| {
1943 Self::remove_metrics(inode);
1944 })
1945 .map(|(node, _lookup_count)| node)
1946 }
1947
1948 fn replace_or_insert(&mut self, ino: InodeNo, new_inode: &Inode) {
1949 self.map
1950 .entry(ino)
1951 .and_modify(|(inode, count)| {
1952 Self::remove_metrics(inode);
1953 Self::add_metrics(new_inode);
1954 trace!(ino = inode.ino(), lookup_count = count, "replaced inode");
1955 *inode = new_inode.clone();
1956 })
1957 .or_insert_with(|| {
1958 trace!(ino, lookup_count = 1, "inserting inode");
1959 Self::add_metrics(new_inode);
1960 (new_inode.clone(), 1)
1961 });
1962 }
1963
1964 fn increase_lookup_count(&mut self, inode: &Inode) {
1966 self.map
1967 .entry(inode.ino())
1968 .and_modify(|(_, count)| {
1969 *count += 1;
1970 trace!(ino = inode.ino(), new_lookup_count = *count, "incremented lookup count");
1971 })
1972 .or_insert_with(|| {
1973 Self::add_metrics(inode);
1974 trace!(ino = inode.ino(), lookup_count = 1, "inserting inode");
1975 (inode.clone(), 1)
1976 });
1977 }
1978
1979 fn decrease_lookup_count(&mut self, ino: InodeNo, n: u64) -> Result<Option<Inode>, InodeMapError> {
1981 match self.map.get_mut(&ino) {
1982 Some((_, count)) => {
1983 *count = count.saturating_sub(n);
1985 trace!(ino = ino, new_lookup_count = *count, "decremented lookup count");
1986
1987 if *count == 0 {
1988 trace!(ino, "removing inode from superblock");
1989 let (inode, _) = self.map.remove(&ino).unwrap();
1990 Ok(Some(inode))
1991 } else {
1992 Ok(None)
1993 }
1994 }
1995 None => Err(InodeMapError::InodeNotFound(ino)),
1996 }
1997 }
1998
1999 fn remove_metrics(inode: &Inode) {
2000 metrics::gauge!("fs.inodes").decrement(1.0);
2001 metrics::gauge!("fs.inode_kinds", "kind" => inode.kind().as_str()).decrement(1.0);
2002 }
2003
2004 fn add_metrics(inode: &Inode) {
2005 metrics::gauge!("fs.inodes").increment(1.0);
2006 metrics::gauge!("fs.inode_kinds", "kind" => inode.kind().as_str()).increment(1.0);
2007 }
2008}
2009
/// Errors returned by [`InodeMap`] operations.
#[derive(Debug, Error)]
pub enum InodeMapError {
    /// No entry exists in the map for the given inode number.
    #[error("inode {0} not found in InodeMap")]
    InodeNotFound(InodeNo),
}
2015
2016#[cfg(test)]
2017mod tests {
2018 use mountpoint_s3_client::{
2019 mock_client::{MockClient, MockObject, Operation},
2020 types::ETag,
2021 };
2022 use std::ffi::OsString;
2023 use std::str::FromStr;
2024 use test_case::test_case;
2025 use time::{Duration, OffsetDateTime};
2026
2027 use crate::fs::{FUSE_ROOT_INODE, TimeToLive, ToErrno};
2028 use crate::metablock::AddDirEntryResult;
2029 use crate::s3::{Bucket, Prefix};
2030
2031 use super::*;
2032
    // Asserts that `$lookup` has kind `$kind` and size `$size`, and that its atime, ctime and
    // mtime all fall within the five seconds starting at `$datetime`.
    macro_rules! assert_inode_stat {
        ($lookup:expr, $kind:expr, $datetime:expr, $size:expr) => {
            assert_eq!($lookup.kind(), $kind);
            assert!($lookup.stat().atime >= $datetime && $lookup.stat().atime < $datetime + Duration::seconds(5));
            assert!($lookup.stat().ctime >= $datetime && $lookup.stat().ctime < $datetime + Duration::seconds(5));
            assert!($lookup.stat().mtime >= $datetime && $lookup.stat().mtime < $datetime + Duration::seconds(5));
            assert_eq!($lookup.stat().size, $size);
        };
    }
2043
    // Looks up every directory and file of a small two-level tree, with and without a prefix,
    // and checks kind, timestamps, size and full key of each result.
    #[test_case(""; "unprefixed")]
    #[test_case("test_prefix/"; "prefixed")]
    #[tokio::test]
    async fn test_lookup(prefix: &str) {
        let bucket = Bucket::new("test_bucket").unwrap();
        let client = Arc::new(
            MockClient::config()
                .bucket(bucket.to_string())
                .part_size(1024 * 1024)
                .build(),
        );

        let keys = &[
            format!("{prefix}dir0/file0.txt"),
            format!("{prefix}dir0/sdir0/file0.txt"),
            format!("{prefix}dir0/sdir0/file1.txt"),
            format!("{prefix}dir0/sdir0/file2.txt"),
            format!("{prefix}dir0/sdir1/file0.txt"),
            format!("{prefix}dir0/sdir1/file1.txt"),
            format!("{prefix}dir1/sdir2/file0.txt"),
            format!("{prefix}dir1/sdir2/file1.txt"),
            format!("{prefix}dir1/sdir2/file2.txt"),
            format!("{prefix}dir1/sdir3/file0.txt"),
            format!("{prefix}dir1/sdir3/file1.txt"),
        ];

        // Every object gets a distinct last-modified time, one day apart.
        let object_size = 30;
        let mut last_modified = OffsetDateTime::UNIX_EPOCH;
        for key in keys {
            let mut obj = MockObject::constant(0xaa, object_size, ETag::for_tests());
            last_modified += Duration::days(1);
            obj.set_last_modified(last_modified);
            client.add_object(key, obj);
        }

        let prefix = Prefix::new(prefix).expect("valid prefix");
        // Captured before the superblock is created; directory timestamps are compared to it.
        let ts = OffsetDateTime::now_utc();
        let superblock = Superblock::new(
            client.clone(),
            S3Path::new(bucket.clone(), prefix.clone()),
            Default::default(),
        );

        // Run the whole sequence twice, so the second round sees already known inodes.
        for _ in 0..2 {
            let dir0 = superblock
                .lookup(FUSE_ROOT_INODE, &OsString::from("dir0"))
                .await
                .expect("should exist");
            assert_inode_stat!(dir0, InodeKind::Directory, ts, 0);
            assert_eq!(
                dir0.s3_location().expect("should have location").full_key().as_ref(),
                format!("{prefix}dir0/")
            );

            let dir1 = superblock
                .lookup(FUSE_ROOT_INODE, &OsString::from("dir1"))
                .await
                .expect("should exist");
            assert_inode_stat!(dir1, InodeKind::Directory, ts, 0);
            assert_eq!(
                dir1.s3_location().expect("should have location").full_key().as_ref(),
                format!("{prefix}dir1/")
            );

            let sdir0 = superblock
                .lookup(dir0.ino(), &OsString::from("sdir0"))
                .await
                .expect("should exist");
            assert_inode_stat!(sdir0, InodeKind::Directory, ts, 0);
            assert_eq!(
                sdir0.s3_location().expect("should have location").full_key().as_ref(),
                format!("{prefix}dir0/sdir0/")
            );

            let sdir1 = superblock
                .lookup(dir0.ino(), &OsString::from("sdir1"))
                .await
                .expect("should exist");
            assert_inode_stat!(sdir1, InodeKind::Directory, ts, 0);
            assert_eq!(
                sdir1.s3_location().expect("should have location").full_key().as_ref(),
                format!("{prefix}dir0/sdir1/")
            );

            let sdir2 = superblock
                .lookup(dir1.ino(), &OsString::from("sdir2"))
                .await
                .expect("should exist");
            assert_inode_stat!(sdir2, InodeKind::Directory, ts, 0);
            assert_eq!(
                sdir2.s3_location().expect("should have location").full_key().as_ref(),
                format!("{prefix}dir1/sdir2/")
            );

            let sdir3 = superblock
                .lookup(dir1.ino(), &OsString::from("sdir3"))
                .await
                .expect("should exist");
            assert_inode_stat!(sdir3, InodeKind::Directory, ts, 0);
            assert_eq!(
                sdir3.s3_location().expect("should have location").full_key().as_ref(),
                format!("{prefix}dir1/sdir3/")
            );

            // (parent dir index, subdir index, subdir inode, number of files in the subdir)
            for (dir, sdir, ino, n) in &[
                (0, 0, sdir0.ino(), 3),
                (0, 1, sdir1.ino(), 2),
                (1, 2, sdir2.ino(), 3),
                (1, 3, sdir3.ino(), 2),
            ] {
                for i in 0..*n {
                    let file = superblock
                        .lookup(*ino, &OsString::from(format!("file{i}.txt")))
                        .await
                        .expect("inode should exist");
                    let full_key = file.s3_location().expect("should have location").full_key();
                    // The file's timestamps are compared to the object's last-modified time.
                    let modified_time = client
                        .head_object(&bucket, full_key.as_ref(), &HeadObjectParams::new())
                        .await
                        .expect("object should exist")
                        .last_modified;
                    assert_inode_stat!(file, InodeKind::File, modified_time, object_size);
                    assert_eq!(full_key.as_ref(), format!("{prefix}dir{dir}/sdir{sdir}/file{i}.txt"));
                }
            }
        }
    }
2173
2174 #[test_case(true; "cached")]
2175 #[test_case(false; "not cached")]
2176 #[tokio::test]
2177 async fn test_lookup_with_caching(cached: bool) {
2178 let bucket = Bucket::new("test_bucket").unwrap();
2179 let prefix = "prefix/";
2180 let client = Arc::new(
2181 MockClient::config()
2182 .bucket(bucket.to_string())
2183 .part_size(1024 * 1024)
2184 .build(),
2185 );
2186
2187 let keys = &[
2188 format!("{prefix}file0.txt"),
2189 format!("{prefix}sdir0/file0.txt"),
2190 format!("{prefix}sdir0/file1.txt"),
2191 ];
2192
2193 let object_size = 30;
2194 let mut last_modified = OffsetDateTime::UNIX_EPOCH;
2195 for key in keys {
2196 let mut obj = MockObject::constant(0xaa, object_size, ETag::for_tests());
2197 last_modified += Duration::days(1);
2198 obj.set_last_modified(last_modified);
2199 client.add_object(key, obj);
2200 }
2201
2202 let prefix = Prefix::new(prefix).expect("valid prefix");
2203 let ttl = if cached {
2204 std::time::Duration::from_secs(60 * 60 * 24 * 7) } else {
2206 std::time::Duration::ZERO
2207 };
2208 let superblock = Superblock::new(
2209 client.clone(),
2210 S3Path::new(bucket, prefix.clone()),
2211 SuperblockConfig {
2212 cache_config: CacheConfig::new(TimeToLive::Duration(ttl)),
2213 s3_personality: S3Personality::Standard,
2214 },
2215 );
2216
2217 let entries = ["file0.txt", "sdir0"];
2218 for entry in entries {
2219 _ = superblock
2220 .lookup(FUSE_ROOT_INODE, entry.as_ref())
2221 .await
2222 .expect("should exist");
2223 }
2224
2225 for key in keys {
2226 client.remove_object(key);
2227 }
2228
2229 for entry in entries {
2230 let lookup = superblock.lookup(FUSE_ROOT_INODE, entry.as_ref()).await;
2231 if cached {
2232 lookup.expect("inode should still be served from cache");
2233 } else {
2234 lookup.expect_err("entry should have expired, and not be found in S3");
2235 }
2236 }
2237 }
2238
2239 #[test_case(true; "cached")]
2240 #[test_case(false; "not cached")]
2241 #[tokio::test]
2242 async fn test_negative_lookup_with_caching(cached: bool) {
2243 let bucket = Bucket::new("test_bucket").unwrap();
2244 let prefix = "prefix/";
2245 let client = Arc::new(MockClient::config().bucket(bucket.to_string()).part_size(32).build());
2246
2247 let prefix = Prefix::new(prefix).expect("valid prefix");
2248 let ttl = if cached {
2249 std::time::Duration::from_secs(60 * 60 * 24 * 7) } else {
2251 std::time::Duration::ZERO
2252 };
2253 let superblock = Superblock::new(
2254 client.clone(),
2255 S3Path::new(bucket, prefix.clone()),
2256 SuperblockConfig {
2257 cache_config: CacheConfig::new(TimeToLive::Duration(ttl)),
2258 s3_personality: S3Personality::Standard,
2259 },
2260 );
2261
2262 let entries = ["file0.txt", "sdir0"];
2263 for entry in entries {
2264 _ = superblock
2265 .lookup(FUSE_ROOT_INODE, entry.as_ref())
2266 .await
2267 .expect_err("should not exist");
2268 }
2269
2270 let keys = &[
2271 format!("{prefix}file0.txt"),
2272 format!("{prefix}sdir0/file0.txt"),
2273 format!("{prefix}sdir0/file1.txt"),
2274 ];
2275
2276 let object_size = 30;
2277 let mut last_modified = OffsetDateTime::UNIX_EPOCH;
2278 for key in keys {
2279 let mut obj = MockObject::constant(0xaa, object_size, ETag::for_tests());
2280 last_modified += Duration::days(1);
2281 obj.set_last_modified(last_modified);
2282 client.add_object(key, obj);
2283 }
2284
2285 for entry in entries {
2286 let lookup = superblock.lookup(FUSE_ROOT_INODE, entry.as_ref()).await;
2287 if cached {
2288 lookup.expect_err("negative entry should still be valid in the cache, so the new key should not have been looked up in S3");
2289 } else {
2290 lookup.expect("new object should have been looked up in S3");
2291 }
2292 }
2293 }
2294
2295 #[tokio::test]
2296 async fn test_getattr_with_inode_local_invalid_stat_force_revalidate() {
2297 let (superblock, client) = setup_test_superblock();
2298 let head_object_counter = client.new_counter(Operation::HeadObject);
2299 let inode = create_test_inode(&superblock, WriteStatus::LocalUnopened, true, None, true).unwrap();
2300
2301 let result = superblock.getattr_with_inode(inode.ino(), true).await;
2302 assert!(result.is_ok());
2303 let looked_up = result.unwrap();
2304 assert_eq!(looked_up.write_status, WriteStatus::LocalUnopened);
2305 assert_eq!(looked_up.inode.ino(), inode.ino());
2306 assert_eq!(looked_up.stat.etag, None);
2307 assert_eq!(head_object_counter.count(), 0);
2308 }
2309
2310 #[tokio::test]
2311 async fn test_getattr_with_inode_remote_valid_stat_no_force_revalidate() {
2312 let (superblock, client) = setup_test_superblock();
2313 let test_etag = ETag::for_tests();
2314 let head_object_counter = client.new_counter(Operation::HeadObject);
2315 let inode = create_test_inode(&superblock, WriteStatus::Remote, false, Some(test_etag.clone()), false).unwrap();
2316
2317 let result = superblock.getattr_with_inode(inode.ino(), false).await;
2318 assert!(result.is_ok());
2319 let looked_up = result.unwrap();
2320 assert_eq!(looked_up.write_status, WriteStatus::Remote);
2321 assert_eq!(looked_up.inode.ino(), inode.ino());
2322 assert_eq!(looked_up.stat.etag, Some(test_etag.into_inner().into_boxed_str()));
2323 assert_eq!(head_object_counter.count(), 0);
2324 }
2325
2326 #[tokio::test]
2327 async fn test_getattr_with_inode_remote_force_revalidate() {
2328 let (superblock, client) = setup_test_superblock();
2329 let test_etag = ETag::for_tests();
2330 let head_object_counter = client.new_counter(Operation::HeadObject);
2331 client.add_object("prefix/test inode", MockObject::constant(0xaa, 1024, test_etag.clone()));
2332 let inode = create_test_inode(&superblock, WriteStatus::Remote, false, Some(test_etag.clone()), false).unwrap();
2333
2334 let result = superblock.getattr_with_inode(inode.ino(), true).await;
2335 assert!(result.is_ok());
2336 let looked_up = result.unwrap();
2337 assert_eq!(looked_up.write_status, WriteStatus::Remote);
2338 assert_eq!(looked_up.inode.ino(), inode.ino());
2339 assert_eq!(looked_up.stat.etag, Some(test_etag.into_inner().into_boxed_str()));
2340 assert!(head_object_counter.count() > 0);
2341 }
2342
2343 #[tokio::test]
2344 async fn test_getattr_with_stale_inode_remote_no_force_revalidate() {
2345 let (superblock, client) = setup_test_superblock();
2346 let head_object_counter = client.new_counter(Operation::HeadObject);
2347 client.add_object("prefix/test inode", MockObject::constant(0xaa, 1024, ETag::for_tests()));
2349 let local_etag = ETag::from_str("Stale local etag").unwrap();
2350 let inode = create_test_inode(&superblock, WriteStatus::Remote, false, Some(local_etag), true).unwrap();
2351
2352 let result = superblock.getattr_with_inode(inode.ino(), false).await;
2353 if let Err(InodeError::StaleInode {
2354 remote_key,
2355 old_inode,
2356 new_inode,
2357 }) = result
2358 {
2359 assert_eq!(remote_key, "prefix/test inode");
2360 assert_ne!(old_inode.ino, new_inode.ino);
2361 assert!(head_object_counter.count() > 0);
2362 } else {
2363 panic!("Expected StaleInode error");
2364 }
2365 }
2366
2367 fn setup_test_superblock() -> (Superblock<Arc<MockClient>>, Arc<MockClient>) {
2368 let bucket = Bucket::new("test_bucket").unwrap();
2369 let prefix = "prefix/";
2370 let client = Arc::new(MockClient::config().bucket(bucket.to_string()).part_size(32).build());
2371 let prefix = Prefix::new(prefix).expect("valid prefix");
2372 let superblock = Superblock::new(
2373 client.clone(),
2374 S3Path::new(bucket, prefix.clone()),
2375 SuperblockConfig {
2376 cache_config: CacheConfig::new(TimeToLive::Duration(std::time::Duration::from_secs(24 * 60 * 60))),
2377 s3_personality: S3Personality::Standard,
2378 },
2379 );
2380 (superblock, client)
2381 }
2382
2383 fn create_test_inode(
2384 superblock: &Superblock<Arc<MockClient>>,
2385 write_status: WriteStatus,
2386 is_new_file: bool,
2387 etag: Option<ETag>,
2388 invalidate_stat: bool,
2389 ) -> Result<Inode, InodeError> {
2390 let parent_inode = superblock.inner.get(FUSE_ROOT_INODE).unwrap();
2391 let etag = etag.map(|etag| etag.into_inner().into_boxed_str());
2392
2393 let mut stat = InodeStat::for_file(
2394 1024,
2395 OffsetDateTime::now_utc(),
2396 etag,
2397 None,
2398 None,
2399 std::time::Duration::from_secs(24 * 60 * 60),
2400 );
2401 if invalidate_stat {
2402 stat.update_validity(std::time::Duration::from_secs(0));
2404 }
2405 let state = InodeState::new(&stat, InodeKind::File, write_status);
2406
2407 let mut parent_state = parent_inode.get_mut_inode_state()?;
2408
2409 let inode = superblock.inner.create_inode_locked(
2410 &parent_inode,
2411 &mut parent_state,
2412 ValidName::parse_str("test inode")?,
2413 InodeKind::File,
2414 state,
2415 is_new_file,
2416 )?;
2417 superblock.inner.remember(&inode);
2419 Ok(inode)
2420 }
2421
2422 #[test_case(""; "unprefixed")]
2423 #[test_case("test_prefix/"; "prefixed")]
2424 #[tokio::test]
2425 async fn test_readdir(prefix: &str) {
2426 let bucket = Bucket::new("test_bucket").unwrap();
2427 let client = Arc::new(
2428 MockClient::config()
2429 .bucket(bucket.to_string())
2430 .part_size(1024 * 1024)
2431 .build(),
2432 );
2433
2434 let keys = &[
2435 format!("{prefix}dir0/file0.txt"),
2436 format!("{prefix}dir0/sdir0/file0.txt"),
2437 format!("{prefix}dir0/sdir0/file1.txt"),
2438 format!("{prefix}dir0/sdir0/file2.txt"),
2439 format!("{prefix}dir0/sdir1/file0.txt"),
2440 format!("{prefix}dir0/sdir1/file1.txt"),
2441 format!("{prefix}dir1/sdir2/file0.txt"),
2442 format!("{prefix}dir1/sdir2/file1.txt"),
2443 format!("{prefix}dir1/sdir2/file2.txt"),
2444 format!("{prefix}dir1/sdir3/file0.txt"),
2445 format!("{prefix}dir1/sdir3/file1.txt"),
2446 ];
2447
2448 let last_modified = OffsetDateTime::UNIX_EPOCH + Duration::days(30);
2449 for key in keys {
2450 let mut obj = MockObject::constant(0xaa, 30, ETag::for_tests());
2451 obj.set_last_modified(last_modified);
2452 client.add_object(key, obj);
2453 }
2454
2455 let prefix = Prefix::new(prefix).expect("valid prefix");
2456 let ts = OffsetDateTime::now_utc();
2457 let superblock = Superblock::new(client.clone(), S3Path::new(bucket, prefix.clone()), Default::default());
2458
2459 for _ in 0..2 {
2461 let root_entries = collect_dir_entries_with_info(&superblock, FUSE_ROOT_INODE, true, 100).await;
2463
2464 assert_eq!(root_entries.len(), 2);
2465 assert_eq!(
2466 root_entries.iter().map(|(_, name)| name).collect::<Vec<_>>(),
2467 &["dir0", "dir1"]
2468 );
2469
2470 let (dir0_entry, _) = &root_entries[0];
2471 let (dir1_entry, _) = &root_entries[1];
2472
2473 assert_eq!(dir0_entry.kind(), InodeKind::Directory);
2475 assert_eq!(dir0_entry.stat().mtime.date(), ts.date());
2476 assert_eq!(dir0_entry.stat().size, 0);
2477
2478 assert_eq!(dir1_entry.kind(), InodeKind::Directory);
2479 assert_eq!(dir1_entry.stat().mtime.date(), ts.date());
2480 assert_eq!(dir1_entry.stat().size, 0);
2481
2482 let dir0_ino = dir0_entry.ino();
2484 let dir0_entries = collect_dir_entries_with_info(&superblock, dir0_ino, true, 100).await;
2485
2486 assert_eq!(dir0_entries.len(), 3);
2487 assert_eq!(
2488 dir0_entries.iter().map(|(_, name)| name).collect::<Vec<_>>(),
2489 &["file0.txt", "sdir0", "sdir1"]
2490 );
2491
2492 let (file0_entry, _) = &dir0_entries[0];
2493 let (sdir0_entry, _) = &dir0_entries[1];
2494 let (sdir1_entry, _) = &dir0_entries[2];
2495
2496 assert_eq!(file0_entry.kind(), InodeKind::File);
2498 assert_eq!(file0_entry.stat().mtime.date(), last_modified.date());
2499 assert_eq!(file0_entry.stat().size, 30);
2500
2501 assert_eq!(sdir0_entry.kind(), InodeKind::Directory);
2502 assert_eq!(sdir0_entry.stat().mtime.date(), ts.date());
2503 assert_eq!(sdir0_entry.stat().size, 0);
2504
2505 assert_eq!(sdir1_entry.kind(), InodeKind::Directory);
2506 assert_eq!(sdir1_entry.stat().mtime.date(), ts.date());
2507 assert_eq!(sdir1_entry.stat().size, 0);
2508
2509 let sdir0_ino = sdir0_entry.ino();
2511 let sdir0_entries = collect_dir_entries_with_info(&superblock, sdir0_ino, true, 100).await;
2512
2513 assert_eq!(sdir0_entries.len(), 3);
2514 assert_eq!(
2515 sdir0_entries.iter().map(|(_, name)| name).collect::<Vec<_>>(),
2516 &["file0.txt", "file1.txt", "file2.txt"]
2517 );
2518
2519 for (entry, _) in &sdir0_entries {
2521 assert_eq!(entry.kind(), InodeKind::File);
2522 assert_eq!(entry.stat().mtime.date(), last_modified.date());
2523 assert_eq!(entry.stat().size, 30);
2524 }
2525 }
2526 }
2527
2528 async fn collect_dir_entries_with_info(
2533 superblock: &Superblock<Arc<MockClient>>,
2534 dir_ino: InodeNo,
2535 is_readdir_plus: bool,
2536 page_size: usize,
2537 ) -> Vec<(InodeInformation, OsString)> {
2538 let fh = superblock
2539 .new_readdir_handle_with_pagesize(dir_ino, page_size)
2540 .await
2541 .unwrap();
2542 let mut entries = Vec::new();
2543
2544 superblock
2545 .readdir(
2546 dir_ino,
2547 fh,
2548 0,
2549 is_readdir_plus,
2550 Box::new(|entry, name, _offset, _generation| {
2551 if name != OsStr::new(".") && name != OsStr::new("..") {
2553 entries.push((entry, name.to_owned()));
2554 }
2555 AddDirEntryResult::EntryAdded }),
2557 )
2558 .await
2559 .unwrap();
2560
2561 entries
2562 }
2563
2564 async fn collect_dir_entries(
2566 superblock: &Superblock<Arc<MockClient>>,
2567 dir_ino: InodeNo,
2568 is_readdir_plus: bool,
2569 page_size: usize,
2570 ) -> Vec<OsString> {
2571 collect_dir_entries_with_info(superblock, dir_ino, is_readdir_plus, page_size)
2572 .await
2573 .into_iter()
2574 .map(|(_, name)| name)
2575 .collect()
2576 }
2577
2578 #[test_case(""; "unprefixed")]
2579 #[test_case("test_prefix/"; "prefixed")]
2580 #[tokio::test]
2581 async fn test_readdir_no_remote_keys(prefix: &str) {
2582 let bucket = Bucket::new("test_bucket").unwrap();
2583 let client = Arc::new(MockClient::config().bucket(bucket.to_string()).part_size(32).build());
2584
2585 let prefix = Prefix::new(prefix).expect("valid prefix");
2586 let superblock = Superblock::new(client.clone(), S3Path::new(bucket, prefix.clone()), Default::default());
2587
2588 let mut expected_list: Vec<OsString> = Vec::new();
2589
2590 for i in 0..5 {
2592 let filename = format!("file{i}.txt");
2593 let new_inode = superblock
2594 .create(FUSE_ROOT_INODE, filename.as_ref(), InodeKind::File)
2595 .await
2596 .unwrap();
2597 superblock
2598 .open_handle(new_inode.ino(), 0, &Default::default(), OpenFlags::O_WRONLY)
2599 .await
2600 .expect("should be able to start writing");
2601 expected_list.push(filename.into());
2602 }
2603
2604 for _ in 0..2 {
2606 let entries = collect_dir_entries(&superblock, FUSE_ROOT_INODE, false, 2).await;
2607 assert_eq!(entries, expected_list);
2608 }
2609 }
2610
2611 #[test_case(""; "unprefixed")]
2612 #[test_case("test_prefix/"; "prefixed")]
2613 #[tokio::test]
2614 async fn test_readdir_local_keys_after_remote_keys(prefix: &str) {
2615 let bucket = Bucket::new("test_bucket").unwrap();
2616 let client = Arc::new(
2617 MockClient::config()
2618 .bucket(bucket.to_string())
2619 .part_size(1024 * 1024)
2620 .build(),
2621 );
2622
2623 let prefix = Prefix::new(prefix).expect("valid prefix");
2624 let superblock = Superblock::new(client.clone(), S3Path::new(bucket, prefix.clone()), Default::default());
2625
2626 let mut expected_list: Vec<OsString> = Vec::new();
2627
2628 let remote_filenames = ["file0.txt", "file1.txt", "file2.txt"];
2629
2630 let last_modified = OffsetDateTime::UNIX_EPOCH + Duration::days(30);
2631 for filename in remote_filenames {
2632 let mut obj = MockObject::constant(0xaa, 30, ETag::for_tests());
2633 obj.set_last_modified(last_modified);
2634 let key = format!("{prefix}{filename}");
2635 client.add_object(&key, obj);
2636 expected_list.push(filename.to_owned().into());
2637 }
2638
2639 for i in 0..5 {
2641 let filename = format!("newfile{i}.txt");
2642 let new_inode = superblock
2643 .create(FUSE_ROOT_INODE, filename.as_ref(), InodeKind::File)
2644 .await
2645 .unwrap();
2646 superblock
2647 .open_handle(new_inode.ino(), 0, &Default::default(), OpenFlags::O_WRONLY)
2648 .await
2649 .expect("should be able to start writing");
2650 expected_list.push(filename.to_owned().into());
2651 }
2652
2653 for _ in 0..2 {
2655 let entries = collect_dir_entries(&superblock, FUSE_ROOT_INODE, true, 10).await;
2656 assert_eq!(expected_list, entries);
2657 }
2658 }
2659
2660 #[test_case(""; "unprefixed")]
2661 #[test_case("test_prefix/"; "prefixed")]
2662 #[tokio::test]
2663 async fn test_create_local_dir(prefix: &str) {
2664 let bucket = Bucket::new("test_bucket").unwrap();
2665 let client = Arc::new(
2666 MockClient::config()
2667 .bucket(bucket.to_string())
2668 .part_size(1024 * 1024)
2669 .build(),
2670 );
2671 let prefix = Prefix::new(prefix).expect("valid prefix");
2672 let superblock = Superblock::new(client.clone(), S3Path::new(bucket, prefix.clone()), Default::default());
2673
2674 let dirname = "local_dir";
2676 superblock
2677 .create(FUSE_ROOT_INODE, dirname.as_ref(), InodeKind::Directory)
2678 .await
2679 .unwrap();
2680
2681 let lookedup = superblock
2682 .lookup(FUSE_ROOT_INODE, dirname.as_ref())
2683 .await
2684 .expect("lookup should succeed on local dirs");
2685 assert_eq!(
2686 superblock.get_write_status(lookedup.ino()),
2687 Some(WriteStatus::LocalUnopened)
2688 );
2689
2690 let entries = collect_dir_entries(&superblock, FUSE_ROOT_INODE, true, 2).await;
2691 assert_eq!(entries.iter().collect::<Vec<_>>(), vec![dirname]);
2692
2693 let prefix = format!("{prefix}{dirname}");
2695 assert!(!client.contains_prefix(&prefix));
2696 }
2697
2698 #[test_case(""; "unprefixed")]
2699 #[test_case("test_prefix/"; "prefixed")]
2700 #[tokio::test]
2701 async fn test_readdir_lookup_after_rmdir(prefix: &str) {
2702 let bucket = Bucket::new("test_bucket").unwrap();
2703 let client = Arc::new(
2704 MockClient::config()
2705 .bucket(bucket.to_string())
2706 .part_size(1024 * 1024)
2707 .build(),
2708 );
2709 let prefix = Prefix::new(prefix).expect("valid prefix");
2710 let superblock = Superblock::new(client.clone(), S3Path::new(bucket, prefix.clone()), Default::default());
2711
2712 let dirname = "local_dir";
2714 let lookedup = superblock
2715 .create(FUSE_ROOT_INODE, dirname.as_ref(), InodeKind::Directory)
2716 .await
2717 .expect("Should be able to create directory");
2718
2719 let fh = superblock
2720 .new_readdir_handle_with_pagesize(lookedup.ino(), 1024)
2721 .await
2722 .expect("should get new readhhandle");
2723
2724 superblock
2725 .rmdir(FUSE_ROOT_INODE, dirname.as_ref())
2726 .await
2727 .expect("rmdir on empty local directory should succeed");
2728
2729 superblock
2730 .lookup(FUSE_ROOT_INODE, dirname.as_ref())
2731 .await
2732 .expect_err("should not do lookup on removed directory");
2733
2734 superblock
2735 .readdir(
2736 lookedup.ino(),
2737 fh,
2738 0,
2739 true,
2740 Box::new(|_entry, _name, _offset, _generation| {
2741 AddDirEntryResult::EntryAdded }),
2743 )
2744 .await
2745 .expect_err("should not do readdir on removed directory");
2746
2747 superblock
2748 .getattr(lookedup.ino(), false)
2749 .await
2750 .expect_err("should not do getattr on removed directory");
2751 }
2752
2753 #[test_case("", true; "unprefixed ordered")]
2754 #[test_case("test_prefix/", true; "prefixed ordered")]
2755 #[test_case("", false; "unprefixed unordered")]
2756 #[test_case("test_prefix/", false; "prefixed unordered")]
2757 #[tokio::test]
2758 async fn test_readdir_unordered(prefix: &str, ordered: bool) {
2759 let bucket = Bucket::new("test_bucket").unwrap();
2760 let mut config = MockClient::config().bucket(bucket.to_string()).part_size(1024 * 1024);
2761 if !ordered {
2762 config = config.unordered_list_seed(Some(123456));
2763 }
2764 let client = Arc::new(config.build());
2765
2766 let prefix = Prefix::new(prefix).expect("valid prefix");
2767 let s3_personality = if ordered {
2768 S3Personality::Standard
2769 } else {
2770 S3Personality::ExpressOneZone
2771 };
2772 let superblock = Superblock::new(
2773 client.clone(),
2774 S3Path::new(bucket, prefix.clone()),
2775 SuperblockConfig {
2776 s3_personality,
2777 ..Default::default()
2778 },
2779 );
2780
2781 for filename in ["aaa", "dir3", "dm3", "file3", "zzz"] {
2798 let new_inode = superblock
2799 .create(FUSE_ROOT_INODE, filename.as_ref(), InodeKind::File)
2800 .await
2801 .unwrap();
2802 superblock
2803 .open_handle(new_inode.ino(), 0, &Default::default(), OpenFlags::O_WRONLY)
2804 .await
2805 .expect("should be able to start writing");
2806 }
2807
2808 for dirname in ["dir2", "dm2", "file2"] {
2810 let _new_inode = superblock
2811 .create(FUSE_ROOT_INODE, dirname.as_ref(), InodeKind::Directory)
2812 .await
2813 .unwrap();
2814 }
2815
2816 let keys = &[
2818 format!("{prefix}dir1/file.txt"),
2819 format!("{prefix}dir2/file.txt"),
2820 format!("{prefix}dir3/file.txt"),
2821 format!("{prefix}dir4/file.txt"),
2822 format!("{prefix}dir4"),
2823 format!("{prefix}dm1/"),
2824 format!("{prefix}dm2/"),
2825 format!("{prefix}dm3/"),
2826 format!("{prefix}dm4/"),
2827 format!("{prefix}dm4"),
2828 format!("{prefix}file1"),
2829 format!("{prefix}file2"),
2830 format!("{prefix}file3"),
2831 ];
2832
2833 let last_modified = OffsetDateTime::UNIX_EPOCH + Duration::days(30);
2834 for key in keys {
2835 let mut obj = MockObject::constant(0xaa, 30, ETag::for_tests());
2836 obj.set_last_modified(last_modified);
2837 client.add_object(key, obj);
2838 }
2839
2840 let readdir_entries = collect_dir_entries_with_info(&superblock, FUSE_ROOT_INODE, true, 20).await;
2842 let entries: Vec<(&str, InodeKind)> = readdir_entries
2843 .iter()
2844 .map(|(info, name)| (name.to_str().unwrap(), info.kind()))
2845 .collect();
2846
2847 let expected_entries = [
2848 ("aaa", InodeKind::File),
2849 ("dir1", InodeKind::Directory),
2850 ("dir2", InodeKind::Directory),
2851 ("dir3", InodeKind::Directory),
2852 ("dir4", InodeKind::Directory),
2853 ("dm1", InodeKind::Directory),
2854 ("dm2", InodeKind::Directory),
2855 ("dm3", InodeKind::Directory),
2856 ("dm4", InodeKind::Directory),
2857 ("file1", InodeKind::File),
2858 ("file2", InodeKind::Directory), ("file3", InodeKind::File),
2860 ("zzz", InodeKind::File),
2861 ];
2862
2863 for entry in expected_entries {
2864 assert!(entries.contains(&entry), "missing entry {entry:?}");
2865 }
2866
2867 if ordered {
2868 assert_eq!(entries, expected_entries);
2869 }
2870 }
2871
2872 #[test_case(""; "unprefixed")]
2873 #[test_case("test_prefix/"; "prefixed")]
2874 #[tokio::test]
2875 async fn test_rmdir_delete_status(prefix: &str) {
2876 let bucket = Bucket::new("test_bucket").unwrap();
2877 let client = Arc::new(
2878 MockClient::config()
2879 .bucket(bucket.to_string())
2880 .part_size(1024 * 1024)
2881 .build(),
2882 );
2883 let prefix = Prefix::new(prefix).expect("valid prefix");
2884 let superblock = Superblock::new(client.clone(), S3Path::new(bucket, prefix.clone()), Default::default());
2885
2886 let dirname = "local_dir";
2888 let lookedup = superblock
2889 .create(FUSE_ROOT_INODE, dirname.as_ref(), InodeKind::Directory)
2890 .await
2891 .expect("Should be able to create directory");
2892
2893 superblock
2894 .rmdir(FUSE_ROOT_INODE, dirname.as_ref())
2895 .await
2896 .expect("rmdir on empty local directory should succeed");
2897
2898 let parent = superblock.inner.get(FUSE_ROOT_INODE).unwrap();
2899 let parent_state = parent
2900 .get_inode_state()
2901 .expect("should get parent state with read lock");
2902 match &parent_state.kind_data {
2903 InodeKindData::File {} => unreachable!("Parent can only be a Directory"),
2904 InodeKindData::Directory {
2905 children,
2906 writing_children,
2907 ..
2908 } => {
2909 assert!(writing_children.get(&lookedup.ino()).is_none());
2910 assert!(
2911 children
2912 .get(lookedup.clone().try_into_s3_location().unwrap().name())
2913 .is_none()
2914 );
2915 }
2916 }
2917
2918 assert_eq!(
2919 superblock.get_write_status(lookedup.ino()),
2920 None,
2921 "Should not be able to get llokup count of deleted Inode"
2922 );
2923 }
2924
2925 #[test_case(""; "unprefixed")]
2926 #[test_case("test_prefix/"; "prefixed")]
2927 #[tokio::test]
2928 async fn test_parent_readdir_after_rmdir(prefix: &str) {
2929 let bucket = Bucket::new("test_bucket").unwrap();
2930 let client = Arc::new(
2931 MockClient::config()
2932 .bucket(bucket.to_string())
2933 .part_size(1024 * 1024)
2934 .build(),
2935 );
2936 let prefix = Prefix::new(prefix).expect("valid prefix");
2937 let superblock = Superblock::new(client.clone(), S3Path::new(bucket, prefix.clone()), Default::default());
2938
2939 let dirname = "local_dir";
2941 superblock
2942 .create(FUSE_ROOT_INODE, dirname.as_ref(), InodeKind::Directory)
2943 .await
2944 .expect("Should be able to create directory");
2945
2946 let dirname_to_stay = "staying_local_dir";
2947 superblock
2948 .create(FUSE_ROOT_INODE, dirname_to_stay.as_ref(), InodeKind::Directory)
2949 .await
2950 .expect("Should be able to create directory");
2951
2952 superblock
2953 .rmdir(FUSE_ROOT_INODE, dirname.as_ref())
2954 .await
2955 .expect("rmdir on empty local directory should succeed");
2956
2957 let entries = collect_dir_entries(&superblock, FUSE_ROOT_INODE, false, 2).await;
2960 assert_eq!(entries.iter().collect::<Vec<_>>(), &[dirname_to_stay]);
2961 }
2962
2963 #[test_case(""; "unprefixed")]
2964 #[test_case("test_prefix/"; "prefixed")]
2965 #[tokio::test]
2966 async fn test_lookup_after_unlink(prefix: &str) {
2967 let bucket = Bucket::new("test_bucket").unwrap();
2968 let client = Arc::new(
2969 MockClient::config()
2970 .bucket(bucket.to_string())
2971 .part_size(1024 * 1024)
2972 .build(),
2973 );
2974 let prefix = Prefix::new(prefix).expect("valid prefix");
2975 let superblock = Superblock::new(client.clone(), S3Path::new(bucket, prefix.clone()), Default::default());
2976
2977 let file_name = "file.txt";
2978 let file_key = format!("{prefix}{file_name}");
2979 client.add_object(file_key.as_ref(), MockObject::constant(0xaa, 30, ETag::for_tests()));
2980 let parent_ino = FUSE_ROOT_INODE;
2981
2982 superblock
2983 .lookup(parent_ino, file_name.as_ref())
2984 .await
2985 .expect("file should exist");
2986
2987 superblock
2988 .unlink(parent_ino, file_name.as_ref())
2989 .await
2990 .expect("file delete should succeed as it exists");
2991
2992 let err: i32 = superblock
2993 .lookup(parent_ino, file_name.as_ref())
2994 .await
2995 .expect_err("lookup should no longer find deleted file")
2996 .to_errno();
2997 assert_eq!(libc::ENOENT, err, "lookup should return no existing entry error");
2998 }
2999
3000 #[tokio::test]
3001 async fn test_finish_writing_convert_parent_local_dirs_to_remote() {
3002 let bucket = Bucket::new("test_bucket").unwrap();
3003 let client = Arc::new(
3004 MockClient::config()
3005 .bucket(bucket.to_string())
3006 .part_size(1024 * 1024)
3007 .build(),
3008 );
3009 let superblock = Superblock::new(
3010 client.clone(),
3011 S3Path::new(bucket, Default::default()),
3012 Default::default(),
3013 );
3014
3015 let nested_dirs = (0..5).map(|i| format!("level{i}")).collect::<Vec<_>>();
3016 let leaf_dir_ino = {
3017 let mut parent_dir_ino = FUSE_ROOT_INODE;
3018 for dirname in &nested_dirs {
3019 let dir_lookedup = superblock
3020 .create(parent_dir_ino, dirname.as_ref(), InodeKind::Directory)
3021 .await
3022 .unwrap();
3023
3024 assert_eq!(
3025 superblock
3026 .get_write_status(dir_lookedup.ino())
3027 .expect("should get write status with read lock"),
3028 WriteStatus::LocalUnopened
3029 );
3030
3031 parent_dir_ino = dir_lookedup.ino();
3032 }
3033 parent_dir_ino
3034 };
3035
3036 let filename = "newfile.txt";
3038 let new_inode = superblock
3039 .create(leaf_dir_ino, filename.as_ref(), InodeKind::File)
3040 .await
3041 .unwrap();
3042
3043 superblock
3044 .open_handle(new_inode.ino(), 0, &Default::default(), OpenFlags::O_WRONLY)
3045 .await
3046 .expect("should be able to start writing");
3047
3048 superblock.finish_writing(new_inode.ino(), None, 0).await.unwrap();
3051
3052 let dirname = nested_dirs.first().unwrap();
3054 let lookedup = superblock.lookup(FUSE_ROOT_INODE, dirname.as_ref()).await;
3055 assert!(matches!(lookedup, Err(InodeError::FileDoesNotExist(_, _))));
3056 }
3057
3058 #[tokio::test]
3059 async fn test_inode_reuse() {
3060 let bucket = Bucket::new("test_bucket").unwrap();
3061 let client = Arc::new(
3062 MockClient::config()
3063 .bucket(bucket.to_string())
3064 .part_size(1024 * 1024)
3065 .build(),
3066 );
3067 client.add_object("dir1/file1.txt", MockObject::constant(0xaa, 30, ETag::for_tests()));
3068
3069 let superblock = Superblock::new(
3070 client.clone(),
3071 S3Path::new(bucket, Default::default()),
3072 Default::default(),
3073 );
3074
3075 for _ in 0..2 {
3076 let dir1_1 = superblock.lookup(FUSE_ROOT_INODE, "dir1".as_ref()).await.unwrap();
3077 let dir1_2 = superblock.lookup(FUSE_ROOT_INODE, "dir1".as_ref()).await.unwrap();
3078 assert_eq!(dir1_1.ino(), dir1_2.ino());
3079
3080 let file1_1 = superblock.lookup(dir1_1.ino(), "file1.txt".as_ref()).await.unwrap();
3081 let file1_2 = superblock.lookup(dir1_1.ino(), "file1.txt".as_ref()).await.unwrap();
3082 assert_eq!(file1_1.ino(), file1_2.ino());
3083 }
3084 }
3085
3086 #[test_case(""; "no subdirectory")]
3087 #[test_case("subdir/"; "with subdirectory")]
3088 #[tokio::test]
3089 async fn test_lookup_directory_overlap(subdir: &str) {
3090 let bucket = Bucket::new("test_bucket").unwrap();
3091 let client = Arc::new(
3092 MockClient::config()
3093 .bucket(bucket.to_string())
3094 .part_size(1024 * 1024)
3095 .build(),
3096 );
3097 client.add_object(
3103 &format!("dir/{subdir}file1.txt"),
3104 MockObject::constant(0xaa, 30, ETag::for_tests()),
3105 );
3106 client.add_object(
3107 &format!("dir-1/{subdir}file1.txt"),
3108 MockObject::constant(0xaa, 30, ETag::for_tests()),
3109 );
3110
3111 let superblock = Superblock::new(
3112 client.clone(),
3113 S3Path::new(bucket, Default::default()),
3114 Default::default(),
3115 );
3116
3117 let entries = collect_dir_entries(&superblock, FUSE_ROOT_INODE, false, 2).await;
3118 assert_eq!(entries, &["dir", "dir-1"]);
3119
3120 let dir = superblock.lookup(FUSE_ROOT_INODE, "dir".as_ref()).await.unwrap();
3121 assert_eq!(
3122 dir.s3_location().expect("should have location").full_key().as_ref(),
3123 "dir/"
3124 );
3125 }
3126
3127 #[tokio::test]
3128 async fn test_invalid_names() {
3129 let bucket = Bucket::new("test_bucket").unwrap();
3130 let client = Arc::new(
3131 MockClient::config()
3132 .bucket(bucket.to_string())
3133 .part_size(1024 * 1024)
3134 .build(),
3135 );
3136
3137 client.add_object(
3140 "dir1/",
3141 MockObject::constant(0xaa, 30, ETag::from_str("test_etag_1").unwrap()),
3142 );
3143 client.add_object(
3144 "dir1//",
3145 MockObject::constant(0xaa, 30, ETag::from_str("test_etag_2").unwrap()),
3146 );
3147 client.add_object(
3148 "dir1/a",
3149 MockObject::constant(0xaa, 30, ETag::from_str("test_etag_3").unwrap()),
3150 );
3151 client.add_object(
3152 "dir1/.",
3153 MockObject::constant(0xaa, 30, ETag::from_str("test_etag_4").unwrap()),
3154 );
3155 client.add_object(
3156 "dir1/./a",
3157 MockObject::constant(0xaa, 30, ETag::from_str("test_etag_5").unwrap()),
3158 );
3159
3160 let superblock = Superblock::new(
3161 client.clone(),
3162 S3Path::new(bucket, Default::default()),
3163 Default::default(),
3164 );
3165 let entries = collect_dir_entries_with_info(&superblock, FUSE_ROOT_INODE, true, 2).await;
3166 assert_eq!(entries.iter().map(|(_info, name)| name).collect::<Vec<_>>(), &["dir1"]);
3167
3168 let dir1_ino = entries[0].0.ino();
3169 let entries = collect_dir_entries(&superblock, dir1_ino, false, 2).await;
3170 assert_eq!(entries, &["a"]);
3171
3172 for key in ["/", "."] {
3174 let lookup = superblock.lookup(dir1_ino, key.as_ref()).await;
3175 assert!(matches!(lookup, Err(InodeError::InvalidFileName(_))));
3176 }
3177 }
3178
3179 #[test_case(""; "unprefixed")]
3180 #[test_case("test_prefix/"; "prefixed")]
3181 #[tokio::test]
3182 async fn test_setattr(prefix: &str) {
3183 let bucket = Bucket::new("test_bucket").unwrap();
3184 let client = Arc::new(
3185 MockClient::config()
3186 .bucket(bucket.to_string())
3187 .part_size(1024 * 1024)
3188 .build(),
3189 );
3190 let prefix = Prefix::new(prefix).expect("valid prefix");
3191 let superblock = Superblock::new(client.clone(), S3Path::new(bucket, prefix.clone()), Default::default());
3192
3193 let filename = "newfile.txt";
3195 let new_inode = superblock
3196 .create(FUSE_ROOT_INODE, filename.as_ref(), InodeKind::File)
3197 .await
3198 .unwrap();
3199
3200 superblock
3201 .open_handle(new_inode.ino(), 0, &Default::default(), OpenFlags::O_WRONLY)
3202 .await
3203 .expect("should be able to start writing");
3204
3205 let atime = OffsetDateTime::UNIX_EPOCH + Duration::days(90);
3206 let mtime = OffsetDateTime::UNIX_EPOCH + Duration::days(60);
3207
3208 let lookup = superblock
3210 .setattr(new_inode.ino(), Some(atime), Some(mtime))
3211 .await
3212 .expect("setattr should be successful");
3213 let stat = lookup.stat();
3214 assert_eq!(stat.atime, atime);
3215 assert_eq!(stat.mtime, mtime);
3216
3217 let lookup = superblock
3218 .getattr(new_inode.ino(), false)
3219 .await
3220 .expect("getattr should be successful");
3221 let stat = lookup.stat();
3222 assert_eq!(stat.atime, atime);
3223 assert_eq!(stat.mtime, mtime);
3224
3225 superblock
3227 .finish_writing(new_inode.ino(), Some(ETag::for_tests()), 0)
3228 .await
3229 .unwrap();
3230
3231 let result = superblock.setattr(new_inode.ino(), Some(atime), Some(mtime)).await;
3233 assert!(matches!(result, Err(InodeError::SetAttrNotPermittedOnRemoteInode(_))));
3234 }
3235
// `InodeStat::for_file` and `InodeStat::for_directory` must set atime, ctime
// and mtime to the supplied timestamp. A file keeps the given size and a
// directory has size 0.
#[test]
fn test_inodestat_constructors() {
    let file_ts = OffsetDateTime::UNIX_EPOCH + Duration::days(90);
    let file_stat = InodeStat::for_file(128, file_ts, None, None, None, Default::default());
    assert_eq!(file_stat.size, 128);
    for timestamp in [file_stat.atime, file_stat.ctime, file_stat.mtime] {
        assert_eq!(timestamp, file_ts);
    }

    let dir_ts = OffsetDateTime::UNIX_EPOCH + Duration::days(180);
    let dir_stat = InodeStat::for_directory(dir_ts, Default::default());
    assert_eq!(dir_stat.size, 0);
    for timestamp in [dir_stat.atime, dir_stat.ctime, dir_stat.mtime] {
        assert_eq!(timestamp, dir_ts);
    }
}
3252
// Once a success has been cached, a later failure must not flip the cache:
// `should_try_rename` stays true throughout.
#[test]
fn test_rename_cache_positive() {
    let rename_cache = RenameCache::new();

    let initially = rename_cache.should_try_rename();
    assert!(initially, "Without a registered failure, should try a rename");

    rename_cache.cache_success();
    let after_success = rename_cache.should_try_rename();
    assert!(after_success, "After a success, should still try renames");

    rename_cache.cache_failure();
    let after_failure = rename_cache.should_try_rename();
    assert!(after_failure, "Failure should not change cache state");
}
3266
// Once a failure has been cached, a later success must not flip the cache:
// `should_try_rename` becomes false and stays false.
#[test]
fn test_rename_cache_negative() {
    let rename_cache = RenameCache::new();

    let initially = rename_cache.should_try_rename();
    assert!(initially, "Without a registered failure, should try a rename");

    rename_cache.cache_failure();
    let after_failure = rename_cache.should_try_rename();
    assert!(!after_failure, "After a failure, should not try renames");

    rename_cache.cache_success();
    let after_success = rename_cache.should_try_rename();
    assert!(!after_success, "Success after failure should not modify cache state");
}
3283}