mountpoint_s3_fs/
superblock.rs

1//! [Inode] management
2//!
3//! # Superblock
4//!
5//! The [Superblock] is the entry point for a file system. It manages a set of [Inode]s and their
6//! caches. The [Superblock] is a partial view of the actual file system, which is stored remotely
7//! in an ObjectClient, and the [Superblock] coordinates sending requests to the ObjectClient to (a)
8//! discover [Inode]s it doesn't already know about and (b) refresh the stats of [Inode]s when they
9//! are expired.
10//!
11//! # [Inode] management and assumptions
12//!
13//! We allocate a new [Inode] the first time we find out about a new file/directory. Each [Inode]
14//! has an [InodeNo], and knows its parent [InodeNo], its own name, and its kind (currently only
15//! file or directory). [Inode]s always refer to a unique object; if the object changes (either
16//! because the object itself was mutated, or it changed types between file and directory), the
17//! inode must be recreated.
18//!
19//! In addition to this "permanent" state, an [Inode] also has some cached state called [InodeStat].
20//! Cached state is subject to an expiry time, and must be refreshed before use if it has expired.
21//! Some cached state is dependent on the inode kind; that state is hidden behind a [InodeStatKind]
22//! enum.
23
24use std::collections::{HashMap, HashSet};
25use std::default::Default;
26use std::ffi::OsStr;
27use std::fmt::Debug;
28use std::sync::OnceLock;
29use std::sync::atomic::{AtomicU64, Ordering};
30use std::time::Duration;
31
32use async_trait::async_trait;
33use futures::{FutureExt, select_biased};
34use mountpoint_s3_client::ObjectClient;
35use mountpoint_s3_client::error::{HeadObjectError, ObjectClientError, RenameObjectError};
36use mountpoint_s3_client::types::{
37    ETag, HeadObjectParams, HeadObjectResult, RenameObjectParams, RenamePreconditionTypes,
38};
39use thiserror::Error;
40use time::OffsetDateTime;
41use tracing::{debug, error, trace, warn};
42
43use crate::fs::{CacheConfig, FUSE_ROOT_INODE, OpenFlags};
44use crate::logging;
45use crate::metablock::{
46    AddDirEntry, AddDirEntryResult, InodeError, InodeInformation, InodeKind, InodeNo, InodeStat, Lookup, Metablock,
47    NewHandle, PendingUploadHook, ReadWriteMode, S3Location, ValidKey, ValidName, WriteMode,
48};
49use crate::s3::{S3Path, S3Personality};
50use crate::sync::{Arc, RwLock};
51
52mod handles_map;
53use handles_map::{InodeHandleMap, SetWriterError};
54
55mod inode;
56pub use inode::{Inode, InodeKindData, InodeLockedForWriting, InodeState, WriteStatus};
57
58mod negative_cache;
59use negative_cache::NegativeCache;
60mod readdir;
61pub use readdir::ReaddirHandle;
62use readdir::{DirHandle, DirectoryEntryReaddir};
63
64/// Superblock is the root object of the file system
65#[derive(Debug)]
66pub struct Superblock<OC: ObjectClient + Send + Sync> {
67    inner: Arc<SuperblockInner<OC>>,
68}
69
70/// A [RenameCache] is used by the superblock to keep track
71/// of whether the S3 backend supports `RenameObject`.
72/// The state can only transition from trying renames to caching
73/// either a success or failure once.
74/// Having cached a failure will turn off all renames in the future.
75#[derive(Debug)]
76struct RenameCache {
77    /// Cached value of if `RenameObject` was supported.
78    rename_supported: std::sync::OnceLock<bool>,
79}
80
81impl RenameCache {
82    fn new() -> Self {
83        RenameCache {
84            rename_supported: OnceLock::new(),
85        }
86    }
87
88    /// Indicates if a `RenameObject` should be attempted.
89    ///
90    /// Returns [false] if we cached a failure.
91    fn should_try_rename(&self) -> bool {
92        self.rename_supported.get().is_none_or(|&cached| cached)
93    }
94
95    /// Caches a failure, if [rename_supported] is uninitialized.
96    fn cache_failure(&self) {
97        if let Ok(()) = self.rename_supported.set(false) {
98            debug!("cached rename failure for the first time, disabling future renames");
99        }
100    }
101
102    /// Caches a success, if [rename_supported] is uninitialized.
103    fn cache_success(&self) {
104        let _ = self.rename_supported.set(true);
105    }
106}
107
108#[derive(Debug)]
109struct SuperblockInner<OC: ObjectClient + Send + Sync> {
110    s3_path: Arc<S3Path>,
111    inodes: RwLock<InodeMap>,
112    open_handles: InodeHandleMap,
113    negative_cache: NegativeCache,
114    cached_rename_support: RenameCache,
115    next_ino: AtomicU64,
116    mount_time: OffsetDateTime,
117    config: SuperblockConfig,
118    client: OC,
119    dir_handles: RwLock<HashMap<u64, Arc<DirHandle>>>,
120    next_dir_handle_id: AtomicU64,
121}
122
123/// Configuration for superblock operations
124#[derive(Debug, Clone, Default)]
125pub struct SuperblockConfig {
126    pub cache_config: CacheConfig,
127    pub s3_personality: S3Personality,
128}
129
130/// A manager for automatically setting and removing the `PendingRename` write status on an inode.
131pub struct PendingRenameGuard<'a> {
132    pub inode: Option<&'a Inode>,
133}
134
135impl<'a> PendingRenameGuard<'a> {
136    /// Take an inode and, if (and only if) currently in Remote state, transition into `PendingRename`.
137    fn try_transition(inode: &'a Inode) -> Result<Self, InodeError> {
138        let mut locked = inode.get_mut_inode_state()?;
139        match locked.write_status {
140            WriteStatus::LocalUnopened | WriteStatus::LocalOpenForWriting | WriteStatus::PendingRename => {
141                return Err(InodeError::RenameNotPermittedWhileWriting(inode.err()));
142            }
143            WriteStatus::Remote => {} // All OK.
144        }
145
146        locked.write_status = WriteStatus::PendingRename;
147        drop(locked);
148        Ok(PendingRenameGuard { inode: Some(inode) })
149    }
150
151    /// Used to mark that this inode will be replaced by rename and we do not need to reset the write status
152    fn confirm(&mut self) {
153        self.inode = None;
154    }
155}
156
157/// On drop, we make sure to transition back to Remote
158impl Drop for PendingRenameGuard<'_> {
159    fn drop(&mut self) {
160        if let Some(inode) = self.inode {
161            let mut inode_locked = inode.get_mut_inode_state().unwrap();
162            if inode_locked.write_status == WriteStatus::PendingRename {
163                inode_locked.write_status = WriteStatus::Remote;
164            }
165        }
166    }
167}
168/// A manager for automatically locking two inodes in filesysyem locking order.
169pub struct RenameLockGuard<'a> {
170    src_parent_lock: InodeLockedForWriting<'a>,
171    dst_parent_lock: Option<InodeLockedForWriting<'a>>,
172}
173
174/// A manager for automatically locking two inodes in filesysyem locking order.
175impl<'a> RenameLockGuard<'a> {
176    fn new<OC: ObjectClient + Send + Sync>(
177        source_parent: &'a Inode,
178        dest_parent: &'a Inode,
179        superblock_inner: &'a SuperblockInner<OC>,
180    ) -> Result<Self, InodeError> {
181        let src_ino = source_parent.ino();
182        let dst_ino = dest_parent.ino();
183
184        if src_ino == dst_ino {
185            return Ok(RenameLockGuard {
186                src_parent_lock: source_parent.get_mut_inode_state()?,
187                dst_parent_lock: None,
188            });
189        }
190
191        let dst_parent_lock: Option<InodeLockedForWriting<'a>>;
192        let src_parent_lock: InodeLockedForWriting<'a>;
193        // Take read lock
194        let ancestor_check = Self::dst_is_ancestor(&superblock_inner.inodes.read().unwrap(), source_parent, dst_ino);
195        if ancestor_check || src_ino > dst_ino {
196            dst_parent_lock = Some(dest_parent.get_mut_inode_state()?);
197            src_parent_lock = source_parent.get_mut_inode_state()?;
198        } else {
199            src_parent_lock = source_parent.get_mut_inode_state()?;
200            dst_parent_lock = Some(dest_parent.get_mut_inode_state()?);
201        }
202        Ok(RenameLockGuard {
203            src_parent_lock,
204            dst_parent_lock,
205        })
206    }
207
208    fn dst_is_ancestor(inodes: &InodeMap, start: &Inode, dst: InodeNo) -> bool {
209        std::iter::successors(Some(start), |current| inodes.get_inode(&current.parent()))
210            .take_while(|&current| current.ino() != FUSE_ROOT_INODE)
211            .any(|current| current.ino() == dst)
212    }
213
214    fn source_parent_mut(&mut self) -> &mut InodeState {
215        &mut self.src_parent_lock
216    }
217
218    fn destination_parent_mut(&mut self) -> &mut InodeState {
219        self.dst_parent_lock.as_mut().unwrap_or(&mut self.src_parent_lock)
220    }
221}
222
223impl<OC: ObjectClient + Send + Sync> Superblock<OC> {
224    /// Create a new Superblock that targets the given bucket/prefix
225    pub fn new(client: OC, s3_path: S3Path, config: SuperblockConfig) -> Self {
226        let mount_time = OffsetDateTime::now_utc();
227        let root = Inode::new_root(&s3_path.prefix, mount_time);
228
229        let mut inodes = InodeMap::default();
230        inodes.insert(root.ino(), root, 1);
231
232        let negative_cache = NegativeCache::new(
233            config.cache_config.negative_cache_size,
234            config.cache_config.negative_cache_ttl,
235        );
236
237        let inner = SuperblockInner {
238            s3_path: Arc::new(s3_path),
239            inodes: RwLock::new(inodes),
240            open_handles: Default::default(),
241            negative_cache,
242            next_ino: AtomicU64::new(2),
243            mount_time,
244            config,
245            cached_rename_support: RenameCache::new(),
246            client,
247            next_dir_handle_id: AtomicU64::new(1),
248            dir_handles: Default::default(),
249        };
250        Self { inner: Arc::new(inner) }
251    }
252
253    #[cfg(test)]
254    fn get_lookup_count(&self, ino: InodeNo) -> u64 {
255        let inode_read = self.inner.inodes.read().unwrap();
256        inode_read.get_count(&ino).unwrap_or(0)
257    }
258
259    #[cfg(test)]
260    fn get_write_status(&self, ino: InodeNo) -> Option<WriteStatus> {
261        let inode_read = self.inner.inodes.read().unwrap();
262
263        inode_read
264            .get_inode(&ino)
265            .and_then(|inode| inode.get_inode_state().ok())
266            .map(|state| state.write_status)
267    }
268
269    /// Lookup the inode by ino number and name, in the filesystem's local state or the remote state
270    /// in S3 if needed.
271    /// Note that forced-revalidation is only applicable if the inode is remote, otherwise the local
272    /// state is returned.
273    async fn getattr_with_inode(
274        &self,
275        ino: InodeNo,
276        force_revalidate_if_remote: bool,
277    ) -> Result<LookedUpInode, InodeError> {
278        let inode = self.inner.get(ino)?;
279        logging::record_name(inode.name());
280
281        {
282            let mut sync = inode.get_mut_inode_state()?;
283            let is_remote = sync.write_status == WriteStatus::Remote;
284
285            if !is_remote || !force_revalidate_if_remote {
286                // If the inode is local (open/unopened), extend its stat's validity before returning
287                if !is_remote {
288                    let validity = match inode.kind() {
289                        InodeKind::File => self.inner.config.cache_config.file_ttl,
290                        InodeKind::Directory => self.inner.config.cache_config.dir_ttl,
291                    };
292                    sync.stat.update_validity(validity);
293                }
294                if sync.stat.is_valid() {
295                    let stat = sync.stat.clone();
296                    let write_status = sync.write_status;
297                    drop(sync);
298                    return Ok(LookedUpInode {
299                        inode,
300                        stat,
301                        path: self.inner.s3_path.clone(),
302                        write_status,
303                    });
304                }
305            }
306        };
307
308        let lookup = self
309            .inner
310            .lookup_by_name(inode.parent(), inode.name().as_ref(), false)
311            .await?;
312        if lookup.inode.ino() != ino {
313            Err(InodeError::StaleInode {
314                remote_key: self.inner.full_key_for_inode(&lookup.inode).into(),
315                old_inode: inode.err(),
316                new_inode: lookup.inode.err(),
317            })
318        } else {
319            Ok(lookup)
320        }
321    }
322
323    async fn new_readdir_handle_with_pagesize(&self, dir_ino: InodeNo, page_size: usize) -> Result<u64, InodeError> {
324        trace!(dir=?dir_ino, "readdir");
325
326        let dir = self.inner.get(dir_ino)?;
327        logging::record_name(dir.name());
328        if dir.kind() != InodeKind::Directory {
329            return Err(InodeError::NotADirectory(dir.err()));
330        }
331        let parent_ino = dir.parent();
332
333        let dir_key = self.inner.full_key_for_inode(&dir);
334        assert_eq!(dir_key.kind(), InodeKind::Directory);
335        let handle = ReaddirHandle::new(&self.inner, dir_ino, parent_ino, dir_key.into(), page_size)?;
336        let handle_id = self.inner.next_dir_handle_id.fetch_add(1, Ordering::SeqCst);
337        let dirhandle = DirHandle::new(handle.parent(), handle);
338        self.inner
339            .dir_handles
340            .write()
341            .unwrap()
342            .insert(handle_id, Arc::new(dirhandle));
343        trace!("Added handle with id: {}", handle_id);
344        Ok(handle_id)
345    }
346
347    /// Prepare for an inode to be read.
348    /// Configure the inode state to be read-ready, track the new reader in the inode's handles_map,
349    /// and return the reference to any pending upload of data to S3 for the inode.
350    fn start_reading(
351        &self,
352        locked_inode: &mut InodeLockedForWriting,
353        inode: Inode,
354        fh: u64,
355    ) -> Result<Option<PendingUploadHook>, InodeError> {
356        match locked_inode.write_status {
357            WriteStatus::LocalUnopened => Err(InodeError::InodeNotReadableWhileWriting(inode.err())),
358            WriteStatus::LocalOpenForWriting | WriteStatus::PendingRename | WriteStatus::Remote => {
359                if !self.inner.open_handles.try_add_reader(locked_inode, fh) {
360                    return Err(InodeError::InodeNotReadableWhileWriting(inode.err()));
361                }
362                if matches!(locked_inode.write_status, WriteStatus::LocalOpenForWriting) {
363                    locked_inode.write_status = WriteStatus::Remote;
364                }
365                Ok(locked_inode.pending_upload_hook.clone())
366            }
367        }
368    }
369
370    /// Prepare for an inode to be written to.
371    /// Configure the inode state to be write-ready, track the new writer in the inode's handles_map,
372    /// and return the reference to any pending upload of data to S3 for the inode.
373    fn start_writing(
374        &self,
375        locked_inode: &mut InodeLockedForWriting,
376        inode: Inode,
377        mode: &WriteMode,
378        is_truncate: bool,
379        handle_id: u64,
380    ) -> Result<Option<PendingUploadHook>, InodeError> {
381        let setup_inode_for_writing = |locked_inode: &mut InodeLockedForWriting| -> Result<(), InodeError> {
382            self.inner
383                .open_handles
384                .set_writer(locked_inode, handle_id)
385                .map_err(|e| match e {
386                    SetWriterError::ActiveWriter => InodeError::InodeAlreadyWriting(inode.err()),
387                    SetWriterError::ActiveReaders => InodeError::InodeNotWritableWhileReading(inode.err()),
388                })?;
389            locked_inode.write_status = WriteStatus::LocalOpenForWriting;
390            Ok(())
391        };
392        match locked_inode.write_status {
393            WriteStatus::LocalUnopened => {
394                setup_inode_for_writing(locked_inode)?;
395                locked_inode.stat.size = 0;
396                Ok(None)
397            }
398            WriteStatus::PendingRename => Err(InodeError::InodeAlreadyWriting(inode.err())),
399            WriteStatus::LocalOpenForWriting | WriteStatus::Remote => {
400                if !mode.is_inode_writable(is_truncate) {
401                    return Err(InodeError::InodeNotWritable(inode.err()));
402                }
403                setup_inode_for_writing(locked_inode)?;
404                if is_truncate {
405                    locked_inode.stat.size = 0;
406                }
407                Ok(locked_inode.pending_upload_hook.clone())
408            }
409        }
410    }
411}
412
413#[async_trait]
414impl<OC: ObjectClient + Send + Sync + Clone> Metablock for Superblock<OC> {
415    /// Lookups inode and increments its lookup count.
416    async fn lookup(&self, parent_ino: InodeNo, name: &OsStr) -> Result<Lookup, InodeError> {
417        trace!(parent=?parent_ino, ?name, "lookup");
418        let lookup = self
419            .inner
420            .lookup_by_name(parent_ino, name, self.inner.config.cache_config.serve_lookup_from_cache)
421            .await?;
422        self.inner.remember(&lookup.inode);
423        Ok(lookup.into())
424    }
425
426    async fn getattr(&self, ino: InodeNo, force_revalidate_if_remote: bool) -> Result<Lookup, InodeError> {
427        self.getattr_with_inode(ino, force_revalidate_if_remote)
428            .await
429            .map(|lookup| lookup.into())
430    }
431
432    /// Rename is only supported on Amazon S3 directory buckets supporting the RenameObject operation.
433    /// File systems against other buckets will reject `rename` file system operations within the same file system.
434    ///
435    /// As part of this operation, we update the local file system state.
436    async fn rename(
437        &self,
438        src_parent_ino: InodeNo,
439        src_name: &OsStr,
440        dst_parent_ino: InodeNo,
441        dst_name: &OsStr,
442        allow_overwrite: bool,
443    ) -> Result<(), InodeError> {
444        // If we have cached a failed rename, we will directly fail
445        if !self.inner.cached_rename_support.should_try_rename() {
446            trace!("Cached rename failure, returning NotSupported");
447            return Err(InodeError::RenameNotSupported());
448        }
449        let src_parent = self.inner.get(src_parent_ino)?;
450        let dst_parent = self.inner.get(dst_parent_ino)?;
451        let src_inode = self
452            .inner
453            .lookup_by_name(
454                src_parent_ino,
455                src_name,
456                self.inner.config.cache_config.serve_lookup_from_cache,
457            )
458            .await?
459            .inode;
460        if src_inode.kind() == InodeKind::Directory {
461            return Err(InodeError::CannotRenameDirectory(src_inode.err()));
462        }
463        // Check write status from source and set to PendingRename
464        let mut src_status_guard = PendingRenameGuard::try_transition(&src_inode)?;
465
466        let dest_inode = self
467            .inner
468            .lookup_by_name(
469                dst_parent_ino,
470                dst_name,
471                self.inner.config.cache_config.serve_lookup_from_cache,
472            )
473            .await
474            .ok()
475            .map(|looked_up| looked_up.inode);
476        // Check if destination exists and transition to `PendingRename` state if possible.
477        let dest_status_guard = dest_inode
478            .as_ref()
479            .map(|inode| {
480                if !allow_overwrite {
481                    return Err(InodeError::RenameDestinationExists {
482                        dest_key: self.inner.full_key_for_inode(inode).to_string(),
483                        src_inode: src_inode.err(),
484                    });
485                }
486
487                PendingRenameGuard::try_transition(inode)
488            })
489            .transpose()?;
490
491        let src_key = self.inner.full_key_for_inode(&src_inode);
492        let dest_name = ValidName::parse_os_str(dst_name)?;
493        let dest_full_valid_name = dst_parent
494            .valid_key()
495            .new_child(dest_name, InodeKind::File)
496            .map_err(|_| InodeError::NotADirectory(dst_parent.err()))?;
497        let dest_key: String = format!("{}{}", self.inner.s3_path.prefix, dest_full_valid_name.as_ref());
498        debug!(?src_key, ?dest_key, "rename on remote file will now be actioned");
499        // TODO-RENAME: Consider adding ETag-matching here
500        let rename_params = if allow_overwrite {
501            RenameObjectParams::new()
502        } else {
503            RenameObjectParams::new().if_none_match(Some("*".to_string()))
504        };
505
506        let rename_object_result = self
507            .inner
508            .client
509            .rename_object(&self.inner.s3_path.bucket, src_key.as_ref(), &dest_key, &rename_params)
510            .await;
511
512        match rename_object_result {
513            Ok(_res) => {
514                debug!(?src_key, ?dest_key, "RenameObject succeeded");
515            }
516            Err(error) => {
517                debug!(?src_key, ?dest_key, ?error, "RenameObject failed");
518
519                return match error {
520                    ObjectClientError::ServiceError(RenameObjectError::PreConditionFailed(
521                        RenamePreconditionTypes::IfNoneMatch,
522                    )) => Err(InodeError::RenameDestinationExists {
523                        dest_key,
524                        src_inode: src_inode.err(),
525                    }),
526                    ObjectClientError::ServiceError(RenameObjectError::KeyNotFound) => {
527                        Err(InodeError::InodeDoesNotExist(src_inode.ino()))
528                    }
529                    ObjectClientError::ServiceError(RenameObjectError::KeyTooLong) => {
530                        Err(InodeError::NameTooLong(dest_key))
531                    }
532                    ObjectClientError::ServiceError(RenameObjectError::NotImplementedError) => {
533                        // We only cache `NotImplemented` responses negatively,
534                        // as other failures do not imply that the bucket does not support rename
535                        self.inner.cached_rename_support.cache_failure();
536                        Err(InodeError::RenameNotSupported())
537                    }
538                    _ => Err(InodeError::client_error(
539                        error,
540                        "RenameObject failed",
541                        &self.inner.s3_path.bucket,
542                        src_key.as_ref(),
543                    )),
544                };
545            }
546        };
547
548        self.inner.cached_rename_support.cache_success();
549        // Invalidate destination from negative cache, as it is now in S3
550        self.inner.negative_cache.remove(
551            dst_parent.ino(),
552            dst_name
553                .to_str()
554                .ok_or_else(|| InodeError::InvalidFileName(dst_name.to_owned()))?,
555        );
556        // Acquire locks using RenameLockGuard
557        let mut rename_guard = RenameLockGuard::new(&src_parent, &dst_parent, &self.inner)?;
558        // Handle source parent
559        {
560            let source_state = rename_guard.source_parent_mut();
561
562            match &mut source_state.kind_data {
563                InodeKindData::File { .. } => {
564                    debug_assert!(false, "inodes never change kind");
565                    return Err(InodeError::NotADirectory(src_parent.err()));
566                }
567                InodeKindData::Directory { children, .. } => {
568                    if let Some((_name, child)) = children.remove_entry(src_inode.name()) {
569                        // VFS-locking should guarantee that these are identical
570                        debug_assert_eq!(src_inode.ino(), child.ino(), "inode should have stayed identical");
571                    }
572                }
573            }
574        }
575        // Handle destination parent
576        {
577            let dst_state = rename_guard.destination_parent_mut();
578            match &mut dst_state.kind_data {
579                InodeKindData::File { .. } => {
580                    debug_assert!(false, "inodes never change kind");
581                    return Err(InodeError::NotADirectory(src_parent.err()));
582                }
583                InodeKindData::Directory { children, .. } => {
584                    let dst_name_as_str: Box<str> = dest_name.as_ref().into();
585                    let new_inode = src_inode.try_clone_with_new_key(
586                        dest_full_valid_name,
587                        &self.inner.s3_path.prefix,
588                        self.inner.config.cache_config.file_ttl,
589                        dst_parent_ino,
590                    )?;
591
592                    // Try to remove the inode from the parent, and notice if it was in an unexpected state.
593                    let concurrent_modification_detected =
594                        if let Some((_name, old_inode)) = children.remove_entry(dst_name_as_str.as_ref()) {
595                            dest_inode
596                                .as_ref()
597                                .map(|inode| inode.ino() != old_inode.ino())
598                                .unwrap_or(true)
599                        } else {
600                            dest_inode.is_some()
601                        };
602
603                    if concurrent_modification_detected {
604                        warn!(
605                            src_ino = src_inode.ino(),
606                            "concurrent modification detected during rename of inode {}, dest parent state changed unexpectedly which may cause unexpected behavior",
607                            src_inode.err(),
608                        );
609                    }
610
611                    children.insert(dst_name_as_str, new_inode.clone());
612                    let mut inodes_write = self.inner.inodes.write().unwrap();
613                    inodes_write.replace_or_insert(new_inode.ino(), &new_inode);
614                }
615            }
616        }
617        debug!("Rename completed in superblock");
618        src_status_guard.confirm();
619        if let Some(mut guard) = dest_status_guard {
620            guard.confirm()
621        }
622        Ok(())
623    }
624
625    /// Remove local-only empty directory, i.e., the ones created by mkdir.
626    /// It does not affect empty directories represented remotely with directory markers.
627    async fn rmdir(&self, parent_ino: InodeNo, name: &OsStr) -> Result<(), InodeError> {
628        let LookedUpInode { inode, .. } = self
629            .inner
630            .lookup_by_name(parent_ino, name, self.inner.config.cache_config.serve_lookup_from_cache)
631            .await?;
632
633        if inode.kind() == InodeKind::File {
634            return Err(InodeError::NotADirectory(inode.err()));
635        }
636
637        let parent = self.inner.get(parent_ino)?;
638        let mut parent_state = parent.get_mut_inode_state()?;
639        let mut inode_state = inode.get_mut_inode_state()?;
640
641        match &inode_state.write_status {
642            WriteStatus::LocalOpenForWriting => unreachable!("A directory cannot be in LocalOpenForWriting state"),
643            WriteStatus::Remote => {
644                return Err(InodeError::CannotRemoveRemoteDirectory(inode.err()));
645            }
646            WriteStatus::LocalUnopened => match &mut inode_state.kind_data {
647                InodeKindData::File {} => unreachable!("Already checked that inode is a directory"),
648                InodeKindData::Directory {
649                    writing_children,
650                    deleted,
651                    ..
652                } => {
653                    if !writing_children.is_empty() {
654                        return Err(InodeError::DirectoryNotEmpty(inode.err()));
655                    }
656                    *deleted = true;
657                }
658            },
659            WriteStatus::PendingRename => unreachable!("Only files can be in PendingRename"),
660        }
661
662        match &mut parent_state.kind_data {
663            InodeKindData::File {} => {
664                debug_assert!(false, "inodes never change kind");
665                return Err(InodeError::NotADirectory(parent.err()));
666            }
667            InodeKindData::Directory {
668                children,
669                writing_children,
670                ..
671            } => {
672                let removed = writing_children.remove(&inode.ino());
673                debug_assert!(
674                    removed,
675                    "should be able to remove the directory from its parents writing children as it was local"
676                );
677                children.remove(inode.name());
678            }
679        }
680
681        Ok(())
682    }
683
684    /// Unlink the entry described by `parent_ino` and `name`.
685    ///
686    /// If the entry exists, delete it from S3 and the superblock.
687    async fn unlink(&self, parent_ino: InodeNo, name: &OsStr) -> Result<(), InodeError> {
688        let parent = self.inner.get(parent_ino)?;
689        let LookedUpInode { inode, .. } = self
690            .inner
691            .lookup_by_name(parent_ino, name, self.inner.config.cache_config.serve_lookup_from_cache)
692            .await?;
693
694        if inode.kind() == InodeKind::Directory {
695            return Err(InodeError::IsDirectory(inode.err()));
696        }
697
698        let write_status = {
699            let inode_state = inode.get_inode_state()?;
700            inode_state.write_status
701        };
702
703        match write_status {
704            WriteStatus::LocalUnopened | WriteStatus::LocalOpenForWriting | WriteStatus::PendingRename => {
705                // In the future, we may permit `unlink` and cancel any in-flight uploads.
706                warn!(
707                    parent = parent_ino,
708                    ?name,
709                    "unlink on local file not allowed until write is complete",
710                );
711                return Err(InodeError::UnlinkNotPermittedWhileWriting(inode.err()));
712            }
713            WriteStatus::Remote => {
714                let bucket = &self.inner.s3_path.bucket;
715                let s3_key = self.inner.full_key_for_inode(&inode);
716                debug!(parent=?parent_ino, ?name, "unlink on remote file will delete key {}", s3_key);
717                let delete_obj_result = self.inner.client.delete_object(bucket, &s3_key).await;
718
719                match delete_obj_result {
720                    Ok(_res) => (),
721                    Err(e) => {
722                        error!(
723                            inode=%inode.err(),
724                            error=?e,
725                            "DeleteObject failed for unlink",
726                        );
727                        Err(InodeError::client_error(e, "DeleteObject failed", bucket, &s3_key))?;
728                    }
729                };
730            }
731        }
732
733        // Now that the entry was deleted from S3, we need to delete it from the superblock.
734        // The Linux Kernel's VFS should lock both the parent and child (https://www.kernel.org/doc/html/next/filesystems/directory-locking.html).
735        // However, the superblock is still subject to concurrent changes as we don't hold this lock over remote calls.
736        let mut parent_state = parent.get_mut_inode_state()?;
737        match &mut parent_state.kind_data {
738            InodeKindData::File { .. } => {
739                debug_assert!(false, "inodes never change kind");
740                return Err(InodeError::NotADirectory(parent.err()));
741            }
742            InodeKindData::Directory { children, .. } => {
743                // We want to remove the original child.
744                // If there is a mismatch in inode number between the existing inode and the deleted inode, we invalidate the existing inode's stat.
745                // If for some reason the child with the specified name was already removed, we do nothing as this is the desired state.
746                if let Some(existing_inode) = children.get(inode.name()) {
747                    if existing_inode.ino() == inode.ino() {
748                        children.remove(inode.name());
749                    } else {
750                        // Mismatch in inode number, thus the existing inode might not be the same one we deleted
751                        let mut state = existing_inode.get_mut_inode_state_no_check();
752
753                        // Invalidate the inode's stats so we refresh them from S3 when next queried
754                        state.stat.update_validity(Duration::from_secs(0));
755                    }
756                } else {
757                    debug!("parent did not contain child after deletion during unlink");
758                }
759            }
760        };
761
762        Ok(())
763    }
764
765    async fn setattr(
766        &self,
767        ino: InodeNo,
768        atime: Option<OffsetDateTime>,
769        mtime: Option<OffsetDateTime>,
770    ) -> Result<Lookup, InodeError> {
771        let inode = self.inner.get(ino)?;
772        logging::record_name(inode.name());
773        let mut sync = inode.get_mut_inode_state()?;
774
775        if sync.write_status == WriteStatus::Remote {
776            return Err(InodeError::SetAttrNotPermittedOnRemoteInode(inode.err()));
777        }
778
779        let validity = match inode.kind() {
780            InodeKind::File => self.inner.config.cache_config.file_ttl,
781            InodeKind::Directory => self.inner.config.cache_config.dir_ttl,
782        };
783
784        // Resetting the InodeStat expiry because the new InodeStat should have new validity
785        sync.stat.update_validity(validity);
786
787        if let Some(t) = atime {
788            sync.stat.atime = t;
789        }
790        if let Some(t) = mtime {
791            sync.stat.mtime = t;
792        };
793
794        let stat = sync.stat.clone();
795        drop(sync);
796        Ok(Lookup::new(
797            inode.ino(),
798            stat,
799            inode.kind(),
800            Some(S3Location::new(self.inner.s3_path.clone(), inode.valid_key().clone())),
801        ))
802    }
803
804    /// Opens a file handle for the specified inode.
805    ///
806    /// Prepares the inode for reading or writing based on the provided flags and write mode.
807    /// Returns a new handle that can be used for subsequent file operations.
808    ///
809    /// Errors out if request/Mountpoint setup is incorrect, or if the inode is not in a state to
810    /// allow opening the new handle.
811    async fn open_handle(
812        &self,
813        ino: InodeNo,
814        fh: u64,
815        write_mode: &WriteMode,
816        flags: OpenFlags,
817    ) -> Result<NewHandle, InodeError> {
818        let force_revalidate_if_remote = !self.inner.config.cache_config.serve_lookup_from_cache || flags.direct_io();
819        let looked_up_inode = self.getattr_with_inode(ino, force_revalidate_if_remote).await?;
820        match looked_up_inode.inode.kind() {
821            InodeKind::Directory => return Err(InodeError::IsDirectory(looked_up_inode.inode.err())),
822            InodeKind::File => (),
823        }
824
825        let mode = if flags.contains(OpenFlags::O_RDWR) {
826            if looked_up_inode.write_status == WriteStatus::LocalUnopened
827                || (write_mode.allow_overwrite && flags.contains(OpenFlags::O_TRUNC))
828                || (write_mode.incremental_upload && flags.contains(OpenFlags::O_APPEND))
829            {
830                // If the file is new, or if it was opened in truncate or in append mode,
831                // we know it should be a write handle.
832                debug!("open choosing write handle for O_RDWR");
833                ReadWriteMode::Write
834            } else {
835                // Otherwise, it should be a read handle.
836                debug!("open choosing read handle for O_RDWR");
837                ReadWriteMode::Read
838            }
839        } else if flags.contains(OpenFlags::O_WRONLY) {
840            ReadWriteMode::Write
841        } else {
842            ReadWriteMode::Read
843        };
844
845        if matches!(mode, ReadWriteMode::Read) && !looked_up_inode.stat.is_readable {
846            return Err(InodeError::FlexibleRetrievalObjectNotAccessible(
847                looked_up_inode.inode.err(),
848            ));
849        }
850
851        let (pending_upload_hook, inode_lookup) = {
852            let inode = looked_up_inode.inode.clone();
853            let mut locked_inode = looked_up_inode.inode.get_mut_inode_state()?;
854
855            let pending_upload_hook = match mode {
856                ReadWriteMode::Read => self.start_reading(&mut locked_inode, inode.clone(), fh)?,
857                ReadWriteMode::Write => {
858                    let is_truncate = flags.contains(OpenFlags::O_TRUNC);
859                    self.start_writing(&mut locked_inode, inode.clone(), write_mode, is_truncate, fh)?
860                }
861            };
862
863            let inode_lookup = Lookup::new(
864                inode.ino(),
865                locked_inode.stat.clone(),
866                inode.kind(),
867                Some(S3Location::new(self.inner.s3_path.clone(), inode.valid_key().clone())),
868            );
869
870            (pending_upload_hook, inode_lookup)
871        };
872
873        let lookup = if let Some(upload_hook) = pending_upload_hook
874            && let Some(lookup_after_upload) = upload_hook.wait_for_completion().await?
875        {
876            lookup_after_upload
877        } else {
878            inode_lookup
879        };
880
881        Ok(NewHandle { lookup, mode })
882    }
883
884    async fn inc_file_size(&self, ino: InodeNo, len: usize) -> Result<usize, InodeError> {
885        let inode = self.inner.get(ino)?;
886        let mut state = inode.get_mut_inode_state()?;
887        if !matches!(state.write_status, WriteStatus::LocalOpenForWriting) {
888            debug!(?inode, "Error trying to increase file size on write");
889            return Err(InodeError::InodeInvalidWriteStatus(inode.err()));
890        }
891        state.stat.size += len;
892        Ok(state.stat.size)
893    }
894
895    /// Concludes a writing operation for a file handle and marks the inode closed for writing anymore.
896    ///
897    /// Transitions the inode and all its ancestor directories from local writing state to remote
898    /// state as needed.
899    /// Updates the inode with the latest state in S3 and invalidates the closed writer-handle.
900    async fn finish_writing(&self, ino: InodeNo, etag: Option<ETag>, fh: u64) -> Result<Lookup, InodeError> {
901        let inode = self.inner.get(ino)?;
902        // Collect ancestor inodes that may need updating,
903        // from parent to first remote ancestor.
904        let ancestors = {
905            let mut ancestors = Vec::new();
906            let mut ancestor_ino = inode.parent();
907            let mut visited = HashSet::new();
908            loop {
909                assert!(visited.insert(ancestor_ino), "cycle detected in inode ancestors");
910                let ancestor = self.inner.get(ancestor_ino)?;
911                ancestors.push(ancestor.clone());
912                if ancestor.ino() == FUSE_ROOT_INODE || ancestor.get_inode_state()?.write_status == WriteStatus::Remote
913                {
914                    break;
915                }
916                ancestor_ino = ancestor.parent();
917            }
918            ancestors
919        };
920
921        // Acquire locks on ancestors in descending order to avoid deadlocks.
922        let mut ancestors_states = ancestors
923            .iter()
924            .rev()
925            .map(|inode| inode.get_mut_inode_state())
926            .collect::<Result<Vec<_>, _>>()?;
927
928        let mut locked_inode = inode.get_mut_inode_state()?;
929        match locked_inode.write_status {
930            WriteStatus::LocalOpenForWriting | WriteStatus::Remote => {
931                locked_inode.pending_upload_hook = None;
932                // The completion of the pending upload results in the object etag being updated,
933                // and hence we want to convey the latest inode stat values (with updated validity)
934                // and LookedUpInode to the caller awaiting the hook completion, before it proceeds
935                // with the rest of the request.
936                // A failed hook completion returns the stale information it has (same as the
937                // existing lookup) but invalidates the inode's stats instead, so that we refresh
938                // them from S3 when next queried.
939
940                if let Some(etag) = etag {
941                    // Upload succeeded (with the new `etag`)
942                    //
943                    // Only go to Remote if finishing writing the current open writer.
944                    // Otherwise, the start_reading/writing methods take care of inode status changes.
945                    if self.inner.open_handles.try_remove_writer(&locked_inode, fh) {
946                        locked_inode.write_status = WriteStatus::Remote;
947                    }
948                    locked_inode.stat.etag = Some(etag.into_inner().into_boxed_str());
949                    locked_inode
950                        .stat
951                        .update_validity(self.inner.config.cache_config.file_ttl);
952                } else {
953                    // Upload failed
954                    locked_inode.write_status = WriteStatus::Remote;
955                    self.inner.open_handles.remove_inode(ino); // Equivalent of removing all handles
956                    locked_inode.stat.update_validity(Duration::from_secs(0));
957                }
958
959                // Walk up the ancestors from parent to first remote ancestor to transition
960                // the inode and all "local" containing directories to "remote".
961                let children_inos = std::iter::once(inode.ino()).chain(ancestors.iter().map(|ancestor| ancestor.ino()));
962                for (ancestor_state, child_ino) in ancestors_states.iter_mut().rev().zip(children_inos) {
963                    match &mut ancestor_state.kind_data {
964                        InodeKindData::File { .. } => unreachable!("we know the ancestor is a directory"),
965                        InodeKindData::Directory { writing_children, .. } => {
966                            writing_children.remove(&child_ino);
967                        }
968                    }
969                    ancestor_state.write_status = WriteStatus::Remote;
970                }
971
972                let stat = locked_inode.stat.clone();
973                drop(locked_inode);
974
975                Ok(Lookup::new(
976                    inode.ino(),
977                    stat,
978                    inode.kind(),
979                    Some(S3Location::new(self.inner.s3_path.clone(), inode.valid_key().clone())),
980                ))
981            }
982            _ => Err(InodeError::InodeInvalidWriteStatus(inode.err())),
983        }
984    }
985
986    /// Marks a reader handle as deactivated in the open handles map.
987    async fn flush_reader(&self, ino: InodeNo, fh: u64) -> Result<(), InodeError> {
988        let inode = self.inner.get(ino)?;
989        let locked_inode = inode.get_mut_inode_state()?;
990        self.inner.open_handles.deactivate_reader(&locked_inode, fh);
991        Ok(())
992    }
993
994    /// Marks a writer handle as deactivated in the open handles map, and
995    /// sets up a pending upload hook if the inode is in the local writing state.
996    ///
997    /// Returns the upload hook if one was created or already exists, which the caller may choose to
998    /// await the completion of.
999    async fn flush_writer(
1000        &self,
1001        ino: InodeNo,
1002        fh: u64,
1003        hook: PendingUploadHook,
1004    ) -> Result<Option<PendingUploadHook>, InodeError> {
1005        let inode = self.inner.get(ino)?;
1006        let pending_upload_hook = {
1007            let mut locked_inode = inode.get_mut_inode_state()?;
1008            match locked_inode.write_status {
1009                WriteStatus::LocalOpenForWriting => {
1010                    if self.inner.open_handles.try_deactivate_writer(&locked_inode, fh) {
1011                        Some(locked_inode.pending_upload_hook.get_or_insert(hook).clone())
1012                    } else {
1013                        None
1014                    }
1015                }
1016                _ => None,
1017            }
1018        };
1019        Ok(pending_upload_hook)
1020    }
1021
1022    /// Concludes a read operation for a file handle.
1023    ///
1024    /// Cleans up the reader handle from the inode's open handles map.
1025    async fn finish_reading(&self, ino: InodeNo, fh: u64) -> Result<(), InodeError> {
1026        let inode = self.inner.get(ino)?;
1027        let state = inode.get_mut_inode_state()?;
1028        self.inner.open_handles.remove_reader(&state, fh);
1029        Ok(())
1030    }
1031
1032    /// Releases a writer handle and waits for any pending upload to complete.
1033    ///
1034    /// Flushes the writer handle if it's not already flushed at the time of receiving a `Release`.
1035    /// Waits for the upload hook to complete (if one exists or is newly created by release).
1036    /// This ensures all data is uploaded to S3 before the handle is fully released.
1037    async fn release_writer(
1038        &self,
1039        ino: InodeNo,
1040        fh: u64,
1041        pending_upload_hook: PendingUploadHook,
1042        location: &S3Location,
1043    ) -> Result<(), InodeError> {
1044        let pending_upload_hook = self.flush_writer(ino, fh, pending_upload_hook).await;
1045        match pending_upload_hook {
1046            Ok(Some(upload_hook)) => {
1047                let completion_result = upload_hook.wait_for_completion().await?;
1048                if completion_result.is_some() {
1049                    debug!(key = %location, "upload completed async after file was closed");
1050                }
1051            }
1052            Ok(None) => {}
1053            Err(e) => {
1054                debug!(key = %location, "failed to flush open file handle during release: {e}");
1055            }
1056        }
1057        Ok(())
1058    }
1059
1060    async fn new_readdir_handle(&self, dir_ino: InodeNo) -> Result<u64, InodeError> {
1061        self.new_readdir_handle_with_pagesize(dir_ino, 1000).await
1062    }
1063
1064    async fn readdir<'a>(
1065        &self,
1066        parent: InodeNo,
1067        fh: u64,
1068        offset: i64,
1069        is_readdirplus: bool,
1070        mut add: AddDirEntry<'a>,
1071    ) -> Result<(), InodeError> {
1072        let dir_handle = {
1073            let dir_handles = self.inner.dir_handles.read().unwrap();
1074            dir_handles.get(&fh).cloned().ok_or(InodeError::NoSuchDirHandle { fh })
1075        }?;
1076        trace!("readdir in superblock");
1077        // special case where we need to rewind and restart the streaming but only when it is not the first time we see offset 0
1078        if offset == 0 && dir_handle.offset() != 0 {
1079            trace!("new handle");
1080            let dir = self.inner.get(parent)?;
1081            let new_handle = ReaddirHandle::new(
1082                &self.inner,
1083                dir.ino(),
1084                dir.parent(),
1085                self.inner.full_key_for_inode(&dir).to_string(),
1086                1000,
1087            )?;
1088            *dir_handle.handle.lock().await = new_handle;
1089            dir_handle.rewind_offset();
1090            // drop any cached entries, as new response may be unordered and cache would be stale
1091            *dir_handle.last_response.lock().await = None;
1092        }
1093        let readdir_handle = dir_handle.handle.lock().await;
1094
1095        // If offset is 0 we've already restarted the request and do not use cache, otherwise we're using the same request
1096        // and it is safe to repeat the response. We do not repeat the response if negative offset was provided.
1097        if offset != dir_handle.offset() && offset > 0 {
1098            // POSIX allows seeking an open directory. That's a pain for us since we are streaming
1099            // the directory entries and don't want to keep them all in memory. But one common case
1100            // we've seen (https://github.com/awslabs/mountpoint-s3/issues/477) is applications that
1101            // request offset 0 twice in a row. So we remember the last response and, if repeated,
1102            // we return it again. Last response may also be used partially, if an interrupt occured
1103            // (https://github.com/awslabs/mountpoint-s3/issues/955), which caused entries from it to
1104            // be only partially fetched by kernel.
1105
1106            let last_response = dir_handle.last_response.lock().await;
1107            if let Some((last_offset, entries)) = last_response.as_ref() {
1108                let offset = offset as usize;
1109                let last_offset = *last_offset as usize;
1110                if (last_offset..last_offset + entries.len()).contains(&offset) {
1111                    trace!(offset, "repeating readdir response");
1112                    for entry in entries[offset - last_offset..].iter() {
1113                        if add(
1114                            entry.lookup.clone().into(),
1115                            entry.name.clone(),
1116                            entry.offset,
1117                            entry.generation,
1118                        ) == AddDirEntryResult::ReplyBufferFull
1119                        {
1120                            break;
1121                        }
1122                        // We are returning this result a second time, so the contract is that we
1123                        // must remember it again, except that readdirplus specifies that . and ..
1124                        // are never incremented.
1125                        if is_readdirplus && entry.name != "." && entry.name != ".." {
1126                            self.inner.remember(&entry.lookup.inode)
1127                        }
1128                    }
1129                    return Ok(());
1130                }
1131            }
1132            return Err(InodeError::OutOfOrderReadDir {
1133                expected: dir_handle.offset(),
1134                actual: offset,
1135                fh,
1136            });
1137        }
1138
1139        /// Wrap a replier to duplicate the entries and store them in `dir_handle.last_response` so
1140        /// we can re-use them if the directory handle rewinds
1141        struct Reply<'a> {
1142            add: AddDirEntry<'a>,
1143            entries: Vec<DirectoryEntryReaddir>,
1144        }
1145
1146        impl Reply<'_> {
1147            async fn finish(self, offset: i64, dir_handle: &DirHandle) {
1148                *dir_handle.last_response.lock().await = Some((offset, self.entries));
1149            }
1150            fn add(&mut self, entry: DirectoryEntryReaddir) -> AddDirEntryResult {
1151                let result = (self.add)(
1152                    entry.lookup.clone().into(),
1153                    entry.name.clone(),
1154                    entry.offset,
1155                    entry.generation,
1156                );
1157                if result == AddDirEntryResult::EntryAdded {
1158                    self.entries.push(entry);
1159                }
1160                result
1161            }
1162        }
1163
1164        let mut reply = Reply { add, entries: vec![] };
1165
1166        if dir_handle.offset() < 1 {
1167            let lookup = self.getattr_with_inode(parent, false).await?;
1168            let entry = DirectoryEntryReaddir {
1169                offset: dir_handle.offset() + 1,
1170                name: ".".into(),
1171                generation: 0,
1172                lookup,
1173            };
1174            if reply.add(entry) == AddDirEntryResult::ReplyBufferFull {
1175                reply.finish(offset, &dir_handle).await;
1176                return Ok(());
1177            }
1178            dir_handle.next_offset();
1179        }
1180        if dir_handle.offset() < 2 {
1181            let lookup = self.getattr_with_inode(readdir_handle.parent(), false).await?;
1182            let entry = DirectoryEntryReaddir {
1183                offset: dir_handle.offset() + 1,
1184                name: "..".into(),
1185                generation: 0,
1186                lookup,
1187            };
1188            if reply.add(entry) == AddDirEntryResult::ReplyBufferFull {
1189                reply.finish(offset, &dir_handle).await;
1190                return Ok(());
1191            }
1192            dir_handle.next_offset();
1193        }
1194
1195        loop {
1196            let next = match readdir_handle.next(&self.inner).await? {
1197                None => {
1198                    reply.finish(offset, &dir_handle).await;
1199                    return Ok(());
1200                }
1201                Some(next) => next,
1202            };
1203            trace!(next_inode = ?next.inode, "new inode yielded by readdir handle");
1204            let entry = DirectoryEntryReaddir {
1205                offset: dir_handle.offset() + 1,
1206                name: next.inode.name().into(),
1207                generation: 0,
1208                lookup: next.clone(),
1209            };
1210
1211            if reply.add(entry) == AddDirEntryResult::ReplyBufferFull {
1212                readdir_handle.readd(next);
1213                reply.finish(offset, &dir_handle).await;
1214                return Ok(());
1215            }
1216            if is_readdirplus {
1217                self.inner.remember(&next.inode)
1218            }
1219            dir_handle.next_offset();
1220        }
1221    }
1222
1223    async fn releasedir(&self, fh: u64) -> Result<(), InodeError> {
1224        let mut dir_handles = self.inner.dir_handles.write().unwrap();
1225        dir_handles
1226            .remove(&fh)
1227            .map(|_| ())
1228            .ok_or(InodeError::NoSuchDirHandle { fh })
1229    }
1230
1231    async fn create(&self, dir: InodeNo, name: &OsStr, kind: InodeKind) -> Result<Lookup, InodeError> {
1232        trace!(parent=?dir, ?name, "create");
1233
1234        let existing = self
1235            .inner
1236            .lookup_by_name(dir, name, self.inner.config.cache_config.serve_lookup_from_cache)
1237            .await;
1238        match existing {
1239            Ok(lookup) => return Err(InodeError::FileAlreadyExists(lookup.inode.err())),
1240            Err(InodeError::FileDoesNotExist(_, _)) => (),
1241            Err(e) => return Err(e),
1242        }
1243
1244        // Should be impossible to fail since [lookup] does this check, but let's be sure
1245        let name: ValidName = name.try_into()?;
1246
1247        // Put inode creation in a block so we don't hold the lock on the parent state longer than needed.
1248        let (lookup, inode) = {
1249            let parent_inode = self.inner.get(dir)?;
1250            let mut parent_state = parent_inode.get_mut_inode_state()?;
1251
1252            // Check again for the child now that the parent is locked, since we might have lost to a
1253            // racing lookup. (It would be nice to lock the parent and *then* lookup, but we'd have to
1254            // hold that lock across the remote API calls).
1255            let InodeKindData::Directory { children, .. } = &mut parent_state.kind_data else {
1256                return Err(InodeError::NotADirectory(parent_inode.err()));
1257            };
1258            if let Some(inode) = children.get(name.as_ref()) {
1259                return Err(InodeError::FileAlreadyExists(inode.err()));
1260            }
1261
1262            let stat = match kind {
1263                // Objects don't have an ETag until they are uploaded to S3
1264                InodeKind::File => InodeStat::for_file(
1265                    0,
1266                    OffsetDateTime::now_utc(),
1267                    None,
1268                    None,
1269                    None,
1270                    self.inner.config.cache_config.file_ttl,
1271                ),
1272                InodeKind::Directory => {
1273                    InodeStat::for_directory(self.inner.mount_time, self.inner.config.cache_config.dir_ttl)
1274                }
1275            };
1276
1277            let write_status = WriteStatus::LocalUnopened;
1278            let state = InodeState::new(&stat, kind, write_status);
1279            let inode = self
1280                .inner
1281                .create_inode_locked(&parent_inode, &mut parent_state, name, kind, state, true)?;
1282            let lookup = Lookup::new(
1283                inode.ino(),
1284                stat,
1285                inode.kind(),
1286                Some(S3Location::new(self.inner.s3_path.clone(), inode.valid_key().clone())),
1287            );
1288            (lookup, inode)
1289        };
1290
1291        self.inner.remember(&inode);
1292        Ok(lookup)
1293    }
1294
1295    /// Reacts to the kernel notifying us that the lookup count of an Inode has decreased.
1296    /// If the lookup count reaches zero, it is safe for the [Superblock] to delete the [Inode].
1297    async fn forget(&self, ino: InodeNo, n: u64) {
1298        let mut inodes = self.inner.inodes.write().unwrap();
1299
1300        match inodes.decrease_lookup_count(ino, n) {
1301            Ok(Some(removed_inode)) => {
1302                trace!(ino, "removing inode from superblock");
1303                let parent_ino = removed_inode.parent();
1304
1305                // Remove from the parent. Nothing to do if the parent has already been removed,
1306                // e.g. when FORGETs are handled out-of-order.
1307                if let Some((parent, _)) = inodes.get_mut(&parent_ino) {
1308                    let mut parent_state = parent.get_mut_inode_state_no_check();
1309                    let InodeKindData::Directory {
1310                        children,
1311                        writing_children,
1312                        ..
1313                    } = &mut parent_state.kind_data
1314                    else {
1315                        unreachable!("parent is always a directory");
1316                    };
1317                    if let Some(child) = children.get(removed_inode.name()) {
1318                        // Don't accidentally remove a newer inode (e.g. remote shadowing local)
1319                        if child.ino() == ino {
1320                            children.remove(removed_inode.name());
1321                        }
1322                    }
1323                    writing_children.remove(&ino);
1324                }
1325
1326                drop(inodes);
1327
1328                if let Ok(state) = removed_inode.get_inode_state() {
1329                    metrics::counter!("metadata_cache.inode_forgotten_before_expiry")
1330                        .increment(state.stat.is_valid().into());
1331                };
1332                if self.inner.open_handles.remove_inode(ino) {
1333                    // This should never happen, but it is good to have this visibility to detect any
1334                    // discrepancies in our inode handles' tracking logic or tests involving `forget`
1335                    // TODO: Fix this condition, as it can lead to potential data loss
1336                    // The Kernel should issue the `release` on the last open write handle before it
1337                    // issues the `forget` on the inode, but due to potential out-of-order processing,
1338                    // the `release` may not be able to complete upload to S3 before the inode is
1339                    // forgotten from Mountpoint's internal state.
1340                    debug!("Open file handle(s) found for forgotten inode {}", ino);
1341                }
1342            }
1343            Ok(None) => {}
1344            Err(_) => {
1345                debug_assert!(
1346                    false,
1347                    "forget should not be called on inode already removed from superblock"
1348                );
1349                error!("forget called on inode {ino} already removed from the superblock");
1350            }
1351        }
1352    }
1353
1354    /// Attempts to re-activate a file handle for the specified inode when a read/write arrives
1355    /// for a `flushed` handle.
1356    ///
1357    /// Tries to activate either a reader or writer handle based on the mode. If the handle is still
1358    /// open (not overridden by another open), it gets marked as "Active" in the open handles map.
1359    /// Returns true if the handle was successfully activated, false otherwise.
1360    async fn try_reactivate_handle(&self, ino: InodeNo, fh: u64, mode: ReadWriteMode) -> Result<bool, InodeError> {
1361        let inode = self.inner.get(ino)?;
1362        let mut locked_inode = inode.get_mut_inode_state()?;
1363        match mode {
1364            ReadWriteMode::Read => {
1365                if self.inner.open_handles.try_activate_reader(&locked_inode, fh) {
1366                    return Ok(true);
1367                }
1368            }
1369            ReadWriteMode::Write => {
1370                if self.inner.open_handles.try_activate_writer(&locked_inode, fh) {
1371                    debug_assert!(locked_inode.write_status == WriteStatus::LocalOpenForWriting);
1372                    locked_inode.pending_upload_hook = None;
1373                    return Ok(true);
1374                }
1375            }
1376        }
1377        Ok(false)
1378    }
1379}
1380
1381impl<OC: ObjectClient + Send + Sync> SuperblockInner<OC> {
1382    /// Retrieve the inode for the given number if it exists.
1383    ///
1384    /// The expiry of its stat field is not checked.
1385    /// This may return error on no entry existing or if the Inode is corrupted.
1386    pub fn get(&self, ino: InodeNo) -> Result<Inode, InodeError> {
1387        let inode = self
1388            .inodes
1389            .read()
1390            .unwrap()
1391            .get_inode(&ino)
1392            .cloned()
1393            .ok_or(InodeError::InodeDoesNotExist(ino))?;
1394        inode.verify_inode(ino, &self.s3_path.prefix)?;
1395        Ok(inode)
1396    }
1397
1398    fn full_key_for_inode(&self, inode: &Inode) -> ValidKey {
1399        inode.valid_key().full_key(&self.s3_path.prefix)
1400    }
1401
1402    /// Increase the lookup count of the given inode and
1403    /// ensure it is registered with this superblock.
1404    ///
1405    /// This should be called whenever we pass a `fuse_reply_entry` or `fuse_reply_create` struct to the FUSE driver.
1406    pub fn remember(&self, inode: &Inode) {
1407        let mut inodes_write = self.inodes.write().unwrap();
1408        inodes_write.increase_lookup_count(inode);
1409    }
1410
1411    /// Lookup an inode in the parent directory with the given name.
1412    ///
1413    /// Updates the parent inode to be in sync with the client, but does
1414    /// not add new inodes to the superblock. The caller is responsible
1415    /// for calling [`remember()`] if that is required.
1416    async fn lookup_by_name(
1417        &self,
1418        parent_ino: InodeNo,
1419        name: &OsStr,
1420        allow_cache: bool,
1421    ) -> Result<LookedUpInode, InodeError> {
1422        let name: ValidName = name.try_into()?;
1423
1424        let lookup = if allow_cache {
1425            self.cache_lookup(parent_ino, &name)
1426        } else {
1427            None
1428        };
1429
1430        let lookup = match lookup {
1431            Some(lookup) => lookup?,
1432            None => {
1433                let remote = self.remote_lookup(parent_ino, name).await?;
1434                self.update_from_remote(parent_ino, name, remote)?
1435            }
1436        };
1437
1438        lookup
1439            .inode
1440            .verify_child(parent_ino, name.as_ref(), &self.s3_path.prefix)?;
1441        Ok(lookup)
1442    }
1443
1444    /// Lookup an [Inode] against known directory entries in the parent,
1445    /// verifying any returned entry has not expired.
1446    /// If no record for the given `name` is found, returns [None].
1447    /// If an entry is found in the negative cache, returns [Some(Err(InodeError::FileDoesNotExist))].
1448    fn cache_lookup(&self, parent_ino: InodeNo, name: &str) -> Option<Result<LookedUpInode, InodeError>> {
1449        fn do_cache_lookup<O: ObjectClient + Send + Sync>(
1450            superblock: &SuperblockInner<O>,
1451            parent: Inode,
1452            name: &str,
1453        ) -> Option<Result<LookedUpInode, InodeError>> {
1454            match &parent.get_inode_state().ok()?.kind_data {
1455                InodeKindData::File { .. } => unreachable!("parent should be a directory!"),
1456                InodeKindData::Directory { children, .. } => {
1457                    if let Some(inode) = children.get(name) {
1458                        let locked = inode.get_inode_state().ok()?;
1459                        if locked.stat.is_valid() {
1460                            let lookup = LookedUpInode {
1461                                inode: inode.clone(),
1462                                stat: locked.stat.clone(),
1463                                path: superblock.s3_path.clone(),
1464                                write_status: locked.write_status,
1465                            };
1466                            return Some(Ok(lookup));
1467                        }
1468                    }
1469                }
1470            };
1471
1472            if superblock.negative_cache.contains(parent.ino(), name) {
1473                return Some(Err(InodeError::FileDoesNotExist(name.to_owned(), parent.err())));
1474            }
1475
1476            None
1477        }
1478
1479        let lookup = self
1480            .get(parent_ino)
1481            .ok()
1482            .and_then(|parent| do_cache_lookup(self, parent, name));
1483
1484        match &lookup {
1485            Some(lookup) => trace!("lookup returned from cache: {:?}", lookup),
1486            None => trace!("no lookup available from cache"),
1487        }
1488        metrics::counter!("metadata_cache.cache_hit").increment(lookup.is_some().into());
1489
1490        lookup
1491    }
1492
1493    /// Lookup an inode in the parent directory with the given name
1494    /// on the remote client.
1495    async fn remote_lookup(
1496        &self,
1497        parent_ino: InodeNo,
1498        name: ValidName<'_>,
1499    ) -> Result<Option<RemoteLookup>, InodeError> {
1500        let parent = self.get(parent_ino)?;
1501        let full_path: String = self
1502            .full_key_for_inode(&parent)
1503            .new_child(name, InodeKind::Directory)
1504            .map_err(|_| InodeError::NotADirectory(parent.err()))?
1505            .into();
1506
1507        let object_key = &full_path[..(full_path.len() - 1)];
1508        let directory_prefix = &full_path[..];
1509
1510        // We need to try two requests here, one to find an object with the given name, and one to
1511        // discover a possible shadowing (implicit) directory with the same name. There's a few
1512        // different cases we need to consider here:
1513        //   (1) Consider this namespace with two keys:
1514        //           a
1515        //           a/b
1516        //       Here we need to make a choice about whether to make `a` visible as a file or as a
1517        //       directory. We choose to make it a directory. If we lookup("a") and only do a
1518        //       HeadObject for `a`, we'd see the object `a`, but we need to shadow that object with
1519        //       a directory. Doing the concurrent ListObjects lets us find out that `a` needs to be
1520        //       a directory and so we should suppress the file lookup. Note that this means we
1521        //       can't respond to the `lookup` call until both the Head and List calls complete.
1522        //   (2) Consider this namespace with two keys, similar to (1):
1523        //           a
1524        //           a/
1525        //       This has the same problem as (1), except that we also need to warn the user that
1526        //       the key `a/` will be inaccessible.
1527        //   (3) Consider this namespace with two keys:
1528        //           dir-1/foo
1529        //           dir/ bar
1530        //       Here we need to be careful how we issue the ListObjects call. If we don't append a
1531        //       "/" to the prefix in the request, the first common prefix we'll get back will be
1532        //       "dir-1/", because that precedes "dir/" in lexicographic order. Doing the
1533        //       ListObjects with "/" appended makes sure we always observe the correct prefix.
1534        let head_object_params = HeadObjectParams::new();
1535        let mut file_lookup = self
1536            .client
1537            .head_object(&self.s3_path.bucket, object_key, &head_object_params)
1538            .fuse();
1539        let mut dir_lookup = self
1540            .client
1541            .list_objects(&self.s3_path.bucket, None, "/", 1, directory_prefix)
1542            .fuse();
1543
1544        let mut file_state = None;
1545
1546        for _ in 0..2 {
1547            select_biased! {
1548                result = file_lookup => {
1549                    match result {
1550                        Ok(HeadObjectResult { size, last_modified, restore_status, etag, storage_class, .. }) => {
1551                            let stat = InodeStat::for_file(size as usize, last_modified, Some(etag.into_inner().into_boxed_str()), storage_class.as_deref(), restore_status, self.config.cache_config.file_ttl);
1552                            file_state = Some(stat);
1553                        }
1554                        // If the object is not found, might be a directory, so keep going
1555                        Err(ObjectClientError::ServiceError(HeadObjectError::NotFound)) => {},
1556                        Err(e) => return Err(InodeError::client_error(e, "HeadObject failed", &self.s3_path.bucket, object_key)),
1557                    }
1558                }
1559
1560                result = dir_lookup => {
1561                    let result = result.map_err(|e| InodeError::client_error(e, "ListObjectsV2 failed", &self.s3_path.bucket, object_key))?;
1562
1563                    let found_directory = if result
1564                        .common_prefixes
1565                        .first()
1566                        .map(|prefix| prefix.starts_with(directory_prefix))
1567                        .unwrap_or(false)
1568                    {
1569                        true
1570                    } else if result
1571                        .objects
1572                        .first()
1573                        .map(|object| object.key.starts_with(directory_prefix))
1574                        .unwrap_or(false)
1575                    {
1576                        if result.objects[0].key == directory_prefix {
1577                            trace!(
1578                                parent = ?parent_ino,
1579                                ?name,
1580                                size = result.objects[0].size,
1581                                "found a directory that shadows this name"
1582                            );
1583                            // The S3 Console creates zero-sized keys for explicit directories, so
1584                            // let's not warn about those cases.
1585                            if result.objects[0].size > 0 {
1586                                warn!(
1587                                    "key {:?} is not a valid filename (ends in `/`); will be hidden and unavailable",
1588                                    directory_prefix
1589                                );
1590                            }
1591                        }
1592                        true
1593                    } else {
1594                        false
1595                    };
1596
1597                    // We don't have to wait for the HeadObject to complete because in our
1598                    // semantics, directories always shadow files.
1599                    if found_directory {
1600                        trace!(parent = ?parent_ino, ?name, "lookup ListObjects found a directory");
1601                        let stat = InodeStat::for_directory(self.mount_time, self.config.cache_config.dir_ttl);
1602                        return Ok(Some(RemoteLookup { kind: InodeKind::Directory, stat }));
1603                    }
1604                }
1605            }
1606        }
1607
1608        // If we reach here, the ListObjects didn't find a shadowing directory, so we know we either
1609        // have a valid file, or both requests failed to find the object so the file must not exist remotely
1610        if let Some(mut stat) = file_state {
1611            trace!(parent = ?parent_ino, ?name, etag =? stat.etag, "found a regular file in S3");
1612            // Update the validity of the stat in case the racing ListObjects took a long time
1613            stat.update_validity(self.config.cache_config.file_ttl);
1614            Ok(Some(RemoteLookup {
1615                kind: InodeKind::File,
1616                stat,
1617            }))
1618        } else {
1619            trace!(parent = ?parent_ino, ?name, "not found");
1620            Ok(None)
1621        }
1622    }
1623
1624    /// Update the inode with the given name in a parent directory with the remote data.
1625    /// It may update or delete an existing inode, or insert a new one.
1626    fn update_from_remote(
1627        &self,
1628        parent_ino: InodeNo,
1629        name: ValidName,
1630        remote: Option<RemoteLookup>,
1631    ) -> Result<LookedUpInode, InodeError> {
1632        let parent = self.get(parent_ino)?;
1633
1634        // Should be impossible since all callers check this already, but let's be safe
1635        if parent.kind() != InodeKind::Directory {
1636            return Err(InodeError::NotADirectory(parent.err()));
1637        }
1638
1639        if self.config.cache_config.use_negative_cache {
1640            match &remote {
1641                // Remove negative cache entry.
1642                Some(_) => self.negative_cache.remove(parent_ino, &name),
1643                // Insert or update TTL of negative cache entry.
1644                None => self.negative_cache.insert(parent_ino, &name),
1645            }
1646        }
1647
1648        // Fast path: try with only a read lock on the directory first.
1649        if let Some(looked_up) = Self::try_update_fast_path(&parent, &name, &remote, self.s3_path.clone())? {
1650            return Ok(looked_up);
1651        }
1652
1653        self.update_slow_path(parent, name, remote)
1654    }
1655
1656    /// Try to update the inode for the given name in the parent directory with only a read lock on
1657    /// the parent.
1658    fn try_update_fast_path(
1659        parent: &Inode,
1660        name: &str,
1661        remote: &Option<RemoteLookup>,
1662        s3_path: Arc<S3Path>,
1663    ) -> Result<Option<LookedUpInode>, InodeError> {
1664        let parent_state = parent.get_inode_state()?;
1665        let inode = match &parent_state.kind_data {
1666            InodeKindData::File { .. } => unreachable!("we know parent is a directory"),
1667            InodeKindData::Directory { children, .. } => children.get(name),
1668        };
1669        match (remote, inode) {
1670            (None, None) => Err(InodeError::FileDoesNotExist(name.to_owned(), parent.err())),
1671            (Some(remote), Some(existing_inode)) => {
1672                let mut existing_state = existing_inode.get_mut_inode_state()?;
1673                if remote.kind == existing_inode.kind()
1674                    && existing_state.write_status == WriteStatus::Remote
1675                    && existing_state.stat.etag == remote.stat.etag
1676                {
1677                    trace!(parent=?existing_inode.parent(), name=?existing_inode.name(), ino=?existing_inode.ino(), "updating inode in place");
1678                    existing_state.stat = remote.stat.clone();
1679                    Ok(Some(LookedUpInode {
1680                        inode: existing_inode.clone(),
1681                        stat: remote.stat.clone(),
1682                        path: s3_path.clone(),
1683                        write_status: existing_state.write_status,
1684                    }))
1685                } else {
1686                    Ok(None)
1687                }
1688            }
1689            _ => Ok(None),
1690        }
1691    }
1692
1693    /// Update or create the inode for the given name in the parent directory with a write lock on
1694    /// the parent. This method still needs to handle the cases handled by [try_update_fast_path]
1695    /// because an intervening writer might have modified the inode we're updating.
1696    fn update_slow_path(
1697        &self,
1698        parent: Inode,
1699        name: ValidName,
1700        remote: Option<RemoteLookup>,
1701    ) -> Result<LookedUpInode, InodeError> {
1702        let mut parent_state = parent.get_mut_inode_state()?;
1703        let inode = match &parent_state.kind_data {
1704            InodeKindData::File { .. } => unreachable!("we know parent is a directory"),
1705            InodeKindData::Directory { children, .. } => children.get(name.as_ref()).cloned(),
1706        };
1707        match (remote, inode) {
1708            (None, None) => Err(InodeError::FileDoesNotExist(name.to_string(), parent.err())),
1709            (None, Some(existing_inode)) => {
1710                let InodeKindData::Directory {
1711                    children,
1712                    writing_children,
1713                    ..
1714                } = &mut parent_state.kind_data
1715                else {
1716                    unreachable!("we know parent is a directory");
1717                };
1718                if writing_children.contains(&existing_inode.ino()) {
1719                    let mut sync = existing_inode.get_mut_inode_state()?;
1720
1721                    let validity = match existing_inode.kind() {
1722                        InodeKind::File => self.config.cache_config.file_ttl,
1723                        InodeKind::Directory => self.config.cache_config.dir_ttl,
1724                    };
1725                    sync.stat.update_validity(validity);
1726                    let stat = sync.stat.clone();
1727                    let write_status = sync.write_status;
1728                    drop(sync);
1729
1730                    Ok(LookedUpInode {
1731                        inode: existing_inode,
1732                        stat,
1733                        path: self.s3_path.clone(),
1734                        write_status,
1735                    })
1736                } else {
1737                    // This existing inode is local-only (because `remote` is None), but is not
1738                    // being written. It must have previously existed but been removed on the remote
1739                    // side.
1740                    children.remove(name.as_ref());
1741                    Err(InodeError::FileDoesNotExist(name.to_string(), parent.err()))
1742                }
1743            }
1744            (Some(remote), None) => {
1745                let write_status = WriteStatus::Remote;
1746                let state = InodeState::new(&remote.stat, remote.kind, write_status);
1747                self.create_inode_locked(&parent, &mut parent_state, name, remote.kind, state, false)
1748                    .map(|inode| LookedUpInode {
1749                        inode,
1750                        stat: remote.stat,
1751                        path: self.s3_path.clone(),
1752                        write_status,
1753                    })
1754            }
1755            (Some(remote), Some(existing_inode)) => {
1756                // We need to reconcile the existing state with the state we just got from the
1757                // remote. Our goal here is for the behavior to be as unsurprising as possible while
1758                // being consistent with our stated semantics about implicit directories and how
1759                // directories shadow files.
1760                let mut existing_state = existing_inode.get_mut_inode_state()?;
1761                let existing_is_remote = existing_state.write_status == WriteStatus::Remote;
1762
1763                // Remote files are always shadowed by existing local files/directories, so do
1764                // nothing and return the existing inode.
1765                if remote.kind == InodeKind::File && !existing_is_remote {
1766                    return Ok(LookedUpInode {
1767                        inode: existing_inode.clone(),
1768                        stat: existing_state.stat.clone(),
1769                        path: self.s3_path.clone(),
1770                        write_status: existing_state.write_status,
1771                    });
1772                }
1773
1774                // Try to update in place if we can. The fast path does this too, but here we can
1775                // also handle the case of a local directory becoming remote, which requires
1776                // updating the parent.
1777                let same_kind = remote.kind == existing_inode.kind();
1778                let same_etag = existing_state.stat.etag == remote.stat.etag;
1779                if same_kind && same_etag && (existing_is_remote || remote.kind == InodeKind::Directory) {
1780                    trace!(parent=?existing_inode.parent(), name=?existing_inode.name(), ino=?existing_inode.ino(), "updating inode in place (slow path)");
1781                    existing_state.stat = remote.stat.clone();
1782                    if remote.kind == InodeKind::Directory && !existing_is_remote {
1783                        trace!(parent=?existing_inode.parent(), name=?existing_inode.name(), ino=?existing_inode.ino(), "local directory has become remote");
1784                        existing_state.write_status = WriteStatus::Remote;
1785                        let InodeKindData::Directory { writing_children, .. } = &mut parent_state.kind_data else {
1786                            unreachable!("we know parent is a directory");
1787                        };
1788                        writing_children.remove(&existing_inode.ino());
1789                    }
1790                    return Ok(LookedUpInode {
1791                        inode: existing_inode.clone(),
1792                        stat: remote.stat,
1793                        path: self.s3_path.clone(),
1794                        write_status: existing_state.write_status,
1795                    });
1796                }
1797
1798                trace!(
1799                    ino=?existing_inode.ino(),
1800                    same_kind,
1801                    same_etag,
1802                    existing_is_remote,
1803                    remote_is_dir = remote.kind == InodeKind::Directory,
1804                    "inode could not be updated in place",
1805                );
1806
1807                // Otherwise, create a fresh inode, possibly merging the existing contents. Note
1808                // that [create_inode_locked] takes care of unlinking the existing inode from its
1809                // parent if necessary.
1810                debug!(
1811                    parent=?existing_inode.parent(),
1812                    name=?existing_inode.name(),
1813                    ino=?existing_inode.ino(),
1814                    "inode needs to be recreated",
1815                );
1816                let write_status = WriteStatus::Remote;
1817                let state = InodeState::new(&remote.stat, remote.kind, write_status);
1818                let new_inode =
1819                    self.create_inode_locked(&parent, &mut parent_state, name, remote.kind, state, false)?;
1820                Ok(LookedUpInode {
1821                    inode: new_inode,
1822                    stat: remote.stat,
1823                    path: self.s3_path.clone(),
1824                    write_status,
1825                })
1826            }
1827        }
1828    }
1829
1830    /// Create a new inode in the parent directory, which is already write-locked.
1831    ///
1832    /// Don't use this directly unless you need to do inode creation without re-acquiring the parent
1833    /// write lock. Prefer [SuperblockInner::update_from_remote] instead.
1834    fn create_inode_locked(
1835        &self,
1836        parent: &Inode,
1837        parent_locked: &mut InodeState,
1838        name: ValidName,
1839        kind: InodeKind,
1840        state: InodeState,
1841        is_new_file: bool,
1842    ) -> Result<Inode, InodeError> {
1843        let key = parent
1844            .valid_key()
1845            .new_child(name, kind)
1846            .map_err(|_| InodeError::NotADirectory(parent.err()))?;
1847        let next_ino = self.next_ino.fetch_add(1, Ordering::SeqCst);
1848        let inode = Inode::new(next_ino, parent.ino(), key, &self.s3_path.prefix, state);
1849        trace!(parent=?inode.parent(), name=?inode.name(), kind=?inode.kind(), new_ino=?inode.ino(), key=?inode.key(), "created new inode");
1850
1851        match &mut parent_locked.kind_data {
1852            InodeKindData::File {} => {
1853                debug_assert!(false, "inodes never change kind");
1854                return Err(InodeError::NotADirectory(parent.err()));
1855            }
1856            InodeKindData::Directory {
1857                children,
1858                writing_children,
1859                ..
1860            } => {
1861                let existing_inode = children.insert(name.as_ref().into(), inode.clone());
1862                if is_new_file {
1863                    writing_children.insert(next_ino);
1864                }
1865                if let Some(existing_inode) = existing_inode {
1866                    writing_children.remove(&existing_inode.ino());
1867                }
1868            }
1869        }
1870
1871        Ok(inode)
1872    }
1873}
1874
1875/// Data from a remote object.
1876#[derive(Debug, Clone)]
1877pub struct RemoteLookup {
1878    kind: InodeKind,
1879    stat: InodeStat,
1880}
1881
1882/// Result of an internal "lookup" for an [Inode], containing a "snapshot" of its internal state.
1883/// `stat` is a copy of the inode's `stat` field that has already had its expiry checked and
1884/// so is guaranteed to be valid until `stat.expiry`.
1885#[derive(Debug, Clone)]
1886pub struct LookedUpInode {
1887    pub inode: Inode,
1888    pub stat: InodeStat,
1889    pub path: Arc<S3Path>,
1890    pub write_status: WriteStatus,
1891}
1892
1893impl LookedUpInode {
1894    /// How much longer this lookup will be valid for
1895    pub fn validity(&self) -> Duration {
1896        self.stat.expiry.remaining_ttl()
1897    }
1898}
1899
1900impl From<LookedUpInode> for InodeInformation {
1901    fn from(val: LookedUpInode) -> Self {
1902        InodeInformation::new(val.inode.ino(), val.stat, val.inode.kind())
1903    }
1904}
1905
1906impl From<LookedUpInode> for Lookup {
1907    fn from(val: LookedUpInode) -> Self {
1908        let location = Some(S3Location::new(val.path.clone(), val.inode.valid_key().clone()));
1909
1910        Lookup::new_from_info_and_loc(val.into(), location)
1911    }
1912}
1913
1914/// Maps inode number to inode and lookup count.
1915///
1916/// Additionally reports metrics when inodes are added or removed.
1917/// It is implemented as a wrapper around `HashMap<InodeNo, (Inode, u64)>`.
1918#[derive(Debug, Default)]
1919struct InodeMap {
1920    map: HashMap<InodeNo, (Inode, u64)>,
1921}
1922
1923impl InodeMap {
1924    fn get_inode(&self, ino: &InodeNo) -> Option<&Inode> {
1925        self.map.get(ino).map(|(node, _lookup_count)| node)
1926    }
1927
1928    fn get_mut(&mut self, ino: &InodeNo) -> Option<&mut (Inode, u64)> {
1929        self.map.get_mut(ino)
1930    }
1931
1932    #[cfg(test)]
1933    fn get_count(&self, ino: &InodeNo) -> Option<u64> {
1934        self.map.get(ino).map(|(_, lookup_count)| *lookup_count)
1935    }
1936
1937    fn insert(&mut self, ino: InodeNo, inode: Inode, lookup_count: u64) -> Option<Inode> {
1938        Self::add_metrics(&inode);
1939        trace!(ino, lookup_count, "inserting inode");
1940        self.map
1941            .insert(ino, (inode, lookup_count))
1942            .inspect(|(inode, _count)| {
1943                Self::remove_metrics(inode);
1944            })
1945            .map(|(node, _lookup_count)| node)
1946    }
1947
1948    fn replace_or_insert(&mut self, ino: InodeNo, new_inode: &Inode) {
1949        self.map
1950            .entry(ino)
1951            .and_modify(|(inode, count)| {
1952                Self::remove_metrics(inode);
1953                Self::add_metrics(new_inode);
1954                trace!(ino = inode.ino(), lookup_count = count, "replaced inode");
1955                *inode = new_inode.clone();
1956            })
1957            .or_insert_with(|| {
1958                trace!(ino, lookup_count = 1, "inserting inode");
1959                Self::add_metrics(new_inode);
1960                (new_inode.clone(), 1)
1961            });
1962    }
1963
1964    /// Increases the lookup count of an inode
1965    fn increase_lookup_count(&mut self, inode: &Inode) {
1966        self.map
1967            .entry(inode.ino())
1968            .and_modify(|(_, count)| {
1969                *count += 1;
1970                trace!(ino = inode.ino(), new_lookup_count = *count, "incremented lookup count");
1971            })
1972            .or_insert_with(|| {
1973                Self::add_metrics(inode);
1974                trace!(ino = inode.ino(), lookup_count = 1, "inserting inode");
1975                (inode.clone(), 1)
1976            });
1977    }
1978
1979    /// Decreases lookup count, removes if is reduced to 0 and returns inode if it has been removed
1980    fn decrease_lookup_count(&mut self, ino: InodeNo, n: u64) -> Result<Option<Inode>, InodeMapError> {
1981        match self.map.get_mut(&ino) {
1982            Some((_, count)) => {
1983                // Decrease lookup count
1984                *count = count.saturating_sub(n);
1985                trace!(ino = ino, new_lookup_count = *count, "decremented lookup count");
1986
1987                if *count == 0 {
1988                    trace!(ino, "removing inode from superblock");
1989                    let (inode, _) = self.map.remove(&ino).unwrap();
1990                    Ok(Some(inode))
1991                } else {
1992                    Ok(None)
1993                }
1994            }
1995            None => Err(InodeMapError::InodeNotFound(ino)),
1996        }
1997    }
1998
1999    fn remove_metrics(inode: &Inode) {
2000        metrics::gauge!("fs.inodes").decrement(1.0);
2001        metrics::gauge!("fs.inode_kinds", "kind" => inode.kind().as_str()).decrement(1.0);
2002    }
2003
2004    fn add_metrics(inode: &Inode) {
2005        metrics::gauge!("fs.inodes").increment(1.0);
2006        metrics::gauge!("fs.inode_kinds", "kind" => inode.kind().as_str()).increment(1.0);
2007    }
2008}
2009
2010#[derive(Debug, Error)]
2011pub enum InodeMapError {
2012    #[error("inode {0} not found in InodeMap")]
2013    InodeNotFound(InodeNo),
2014}
2015
2016#[cfg(test)]
2017mod tests {
2018    use mountpoint_s3_client::{
2019        mock_client::{MockClient, MockObject, Operation},
2020        types::ETag,
2021    };
2022    use std::ffi::OsString;
2023    use std::str::FromStr;
2024    use test_case::test_case;
2025    use time::{Duration, OffsetDateTime};
2026
2027    use crate::fs::{FUSE_ROOT_INODE, TimeToLive, ToErrno};
2028    use crate::metablock::AddDirEntryResult;
2029    use crate::s3::{Bucket, Prefix};
2030
2031    use super::*;
2032
2033    /// Check an Inode's stat matches a series of fields.
2034    macro_rules! assert_inode_stat {
2035        ($lookup:expr, $kind:expr, $datetime:expr, $size:expr) => {
2036            assert_eq!($lookup.kind(), $kind);
2037            assert!($lookup.stat().atime >= $datetime && $lookup.stat().atime < $datetime + Duration::seconds(5));
2038            assert!($lookup.stat().ctime >= $datetime && $lookup.stat().ctime < $datetime + Duration::seconds(5));
2039            assert!($lookup.stat().mtime >= $datetime && $lookup.stat().mtime < $datetime + Duration::seconds(5));
2040            assert_eq!($lookup.stat().size, $size);
2041        };
2042    }
2043
2044    #[test_case(""; "unprefixed")]
2045    #[test_case("test_prefix/"; "prefixed")]
2046    #[tokio::test]
2047    async fn test_lookup(prefix: &str) {
2048        let bucket = Bucket::new("test_bucket").unwrap();
2049        let client = Arc::new(
2050            MockClient::config()
2051                .bucket(bucket.to_string())
2052                .part_size(1024 * 1024)
2053                .build(),
2054        );
2055
2056        let keys = &[
2057            format!("{prefix}dir0/file0.txt"),
2058            format!("{prefix}dir0/sdir0/file0.txt"),
2059            format!("{prefix}dir0/sdir0/file1.txt"),
2060            format!("{prefix}dir0/sdir0/file2.txt"),
2061            format!("{prefix}dir0/sdir1/file0.txt"),
2062            format!("{prefix}dir0/sdir1/file1.txt"),
2063            format!("{prefix}dir1/sdir2/file0.txt"),
2064            format!("{prefix}dir1/sdir2/file1.txt"),
2065            format!("{prefix}dir1/sdir2/file2.txt"),
2066            format!("{prefix}dir1/sdir3/file0.txt"),
2067            format!("{prefix}dir1/sdir3/file1.txt"),
2068        ];
2069
2070        let object_size = 30;
2071        let mut last_modified = OffsetDateTime::UNIX_EPOCH;
2072        for key in keys {
2073            let mut obj = MockObject::constant(0xaa, object_size, ETag::for_tests());
2074            last_modified += Duration::days(1);
2075            obj.set_last_modified(last_modified);
2076            client.add_object(key, obj);
2077        }
2078
2079        let prefix = Prefix::new(prefix).expect("valid prefix");
2080        let ts = OffsetDateTime::now_utc();
2081        let superblock = Superblock::new(
2082            client.clone(),
2083            S3Path::new(bucket.clone(), prefix.clone()),
2084            Default::default(),
2085        );
2086
2087        // Try it twice to test the inode reuse path too
2088        for _ in 0..2 {
2089            let dir0 = superblock
2090                .lookup(FUSE_ROOT_INODE, &OsString::from("dir0"))
2091                .await
2092                .expect("should exist");
2093            assert_inode_stat!(dir0, InodeKind::Directory, ts, 0);
2094            assert_eq!(
2095                dir0.s3_location().expect("should have location").full_key().as_ref(),
2096                format!("{prefix}dir0/")
2097            );
2098
2099            let dir1 = superblock
2100                .lookup(FUSE_ROOT_INODE, &OsString::from("dir1"))
2101                .await
2102                .expect("should exist");
2103            assert_inode_stat!(dir1, InodeKind::Directory, ts, 0);
2104            assert_eq!(
2105                dir1.s3_location().expect("should have location").full_key().as_ref(),
2106                format!("{prefix}dir1/")
2107            );
2108
2109            let sdir0 = superblock
2110                .lookup(dir0.ino(), &OsString::from("sdir0"))
2111                .await
2112                .expect("should exist");
2113            assert_inode_stat!(sdir0, InodeKind::Directory, ts, 0);
2114            assert_eq!(
2115                sdir0.s3_location().expect("should have location").full_key().as_ref(),
2116                format!("{prefix}dir0/sdir0/")
2117            );
2118
2119            let sdir1 = superblock
2120                .lookup(dir0.ino(), &OsString::from("sdir1"))
2121                .await
2122                .expect("should exist");
2123            assert_inode_stat!(sdir1, InodeKind::Directory, ts, 0);
2124            assert_eq!(
2125                sdir1.s3_location().expect("should have location").full_key().as_ref(),
2126                format!("{prefix}dir0/sdir1/")
2127            );
2128
2129            let sdir2 = superblock
2130                .lookup(dir1.ino(), &OsString::from("sdir2"))
2131                .await
2132                .expect("should exist");
2133            assert_inode_stat!(sdir2, InodeKind::Directory, ts, 0);
2134            assert_eq!(
2135                sdir2.s3_location().expect("should have location").full_key().as_ref(),
2136                format!("{prefix}dir1/sdir2/")
2137            );
2138
2139            let sdir3 = superblock
2140                .lookup(dir1.ino(), &OsString::from("sdir3"))
2141                .await
2142                .expect("should exist");
2143            assert_inode_stat!(sdir3, InodeKind::Directory, ts, 0);
2144            assert_eq!(
2145                sdir3.s3_location().expect("should have location").full_key().as_ref(),
2146                format!("{prefix}dir1/sdir3/")
2147            );
2148
2149            for (dir, sdir, ino, n) in &[
2150                (0, 0, sdir0.ino(), 3),
2151                (0, 1, sdir1.ino(), 2),
2152                (1, 2, sdir2.ino(), 3),
2153                (1, 3, sdir3.ino(), 2),
2154            ] {
2155                for i in 0..*n {
2156                    let file = superblock
2157                        .lookup(*ino, &OsString::from(format!("file{i}.txt")))
2158                        .await
2159                        .expect("inode should exist");
2160                    // Grab last modified time according to mock S3
2161                    let full_key = file.s3_location().expect("should have location").full_key();
2162                    let modified_time = client
2163                        .head_object(&bucket, full_key.as_ref(), &HeadObjectParams::new())
2164                        .await
2165                        .expect("object should exist")
2166                        .last_modified;
2167                    assert_inode_stat!(file, InodeKind::File, modified_time, object_size);
2168                    assert_eq!(full_key.as_ref(), format!("{prefix}dir{dir}/sdir{sdir}/file{i}.txt"));
2169                }
2170            }
2171        }
2172    }
2173
2174    #[test_case(true; "cached")]
2175    #[test_case(false; "not cached")]
2176    #[tokio::test]
2177    async fn test_lookup_with_caching(cached: bool) {
2178        let bucket = Bucket::new("test_bucket").unwrap();
2179        let prefix = "prefix/";
2180        let client = Arc::new(
2181            MockClient::config()
2182                .bucket(bucket.to_string())
2183                .part_size(1024 * 1024)
2184                .build(),
2185        );
2186
2187        let keys = &[
2188            format!("{prefix}file0.txt"),
2189            format!("{prefix}sdir0/file0.txt"),
2190            format!("{prefix}sdir0/file1.txt"),
2191        ];
2192
2193        let object_size = 30;
2194        let mut last_modified = OffsetDateTime::UNIX_EPOCH;
2195        for key in keys {
2196            let mut obj = MockObject::constant(0xaa, object_size, ETag::for_tests());
2197            last_modified += Duration::days(1);
2198            obj.set_last_modified(last_modified);
2199            client.add_object(key, obj);
2200        }
2201
2202        let prefix = Prefix::new(prefix).expect("valid prefix");
2203        let ttl = if cached {
2204            std::time::Duration::from_secs(60 * 60 * 24 * 7) // 7 days should be enough
2205        } else {
2206            std::time::Duration::ZERO
2207        };
2208        let superblock = Superblock::new(
2209            client.clone(),
2210            S3Path::new(bucket, prefix.clone()),
2211            SuperblockConfig {
2212                cache_config: CacheConfig::new(TimeToLive::Duration(ttl)),
2213                s3_personality: S3Personality::Standard,
2214            },
2215        );
2216
2217        let entries = ["file0.txt", "sdir0"];
2218        for entry in entries {
2219            _ = superblock
2220                .lookup(FUSE_ROOT_INODE, entry.as_ref())
2221                .await
2222                .expect("should exist");
2223        }
2224
2225        for key in keys {
2226            client.remove_object(key);
2227        }
2228
2229        for entry in entries {
2230            let lookup = superblock.lookup(FUSE_ROOT_INODE, entry.as_ref()).await;
2231            if cached {
2232                lookup.expect("inode should still be served from cache");
2233            } else {
2234                lookup.expect_err("entry should have expired, and not be found in S3");
2235            }
2236        }
2237    }
2238
2239    #[test_case(true; "cached")]
2240    #[test_case(false; "not cached")]
2241    #[tokio::test]
2242    async fn test_negative_lookup_with_caching(cached: bool) {
2243        let bucket = Bucket::new("test_bucket").unwrap();
2244        let prefix = "prefix/";
2245        let client = Arc::new(MockClient::config().bucket(bucket.to_string()).part_size(32).build());
2246
2247        let prefix = Prefix::new(prefix).expect("valid prefix");
2248        let ttl = if cached {
2249            std::time::Duration::from_secs(60 * 60 * 24 * 7) // 7 days should be enough
2250        } else {
2251            std::time::Duration::ZERO
2252        };
2253        let superblock = Superblock::new(
2254            client.clone(),
2255            S3Path::new(bucket, prefix.clone()),
2256            SuperblockConfig {
2257                cache_config: CacheConfig::new(TimeToLive::Duration(ttl)),
2258                s3_personality: S3Personality::Standard,
2259            },
2260        );
2261
2262        let entries = ["file0.txt", "sdir0"];
2263        for entry in entries {
2264            _ = superblock
2265                .lookup(FUSE_ROOT_INODE, entry.as_ref())
2266                .await
2267                .expect_err("should not exist");
2268        }
2269
2270        let keys = &[
2271            format!("{prefix}file0.txt"),
2272            format!("{prefix}sdir0/file0.txt"),
2273            format!("{prefix}sdir0/file1.txt"),
2274        ];
2275
2276        let object_size = 30;
2277        let mut last_modified = OffsetDateTime::UNIX_EPOCH;
2278        for key in keys {
2279            let mut obj = MockObject::constant(0xaa, object_size, ETag::for_tests());
2280            last_modified += Duration::days(1);
2281            obj.set_last_modified(last_modified);
2282            client.add_object(key, obj);
2283        }
2284
2285        for entry in entries {
2286            let lookup = superblock.lookup(FUSE_ROOT_INODE, entry.as_ref()).await;
2287            if cached {
2288                lookup.expect_err("negative entry should still be valid in the cache, so the new key should not have been looked up in S3");
2289            } else {
2290                lookup.expect("new object should have been looked up in S3");
2291            }
2292        }
2293    }
2294
2295    #[tokio::test]
2296    async fn test_getattr_with_inode_local_invalid_stat_force_revalidate() {
2297        let (superblock, client) = setup_test_superblock();
2298        let head_object_counter = client.new_counter(Operation::HeadObject);
2299        let inode = create_test_inode(&superblock, WriteStatus::LocalUnopened, true, None, true).unwrap();
2300
2301        let result = superblock.getattr_with_inode(inode.ino(), true).await;
2302        assert!(result.is_ok());
2303        let looked_up = result.unwrap();
2304        assert_eq!(looked_up.write_status, WriteStatus::LocalUnopened);
2305        assert_eq!(looked_up.inode.ino(), inode.ino());
2306        assert_eq!(looked_up.stat.etag, None);
2307        assert_eq!(head_object_counter.count(), 0);
2308    }
2309
2310    #[tokio::test]
2311    async fn test_getattr_with_inode_remote_valid_stat_no_force_revalidate() {
2312        let (superblock, client) = setup_test_superblock();
2313        let test_etag = ETag::for_tests();
2314        let head_object_counter = client.new_counter(Operation::HeadObject);
2315        let inode = create_test_inode(&superblock, WriteStatus::Remote, false, Some(test_etag.clone()), false).unwrap();
2316
2317        let result = superblock.getattr_with_inode(inode.ino(), false).await;
2318        assert!(result.is_ok());
2319        let looked_up = result.unwrap();
2320        assert_eq!(looked_up.write_status, WriteStatus::Remote);
2321        assert_eq!(looked_up.inode.ino(), inode.ino());
2322        assert_eq!(looked_up.stat.etag, Some(test_etag.into_inner().into_boxed_str()));
2323        assert_eq!(head_object_counter.count(), 0);
2324    }
2325
2326    #[tokio::test]
2327    async fn test_getattr_with_inode_remote_force_revalidate() {
2328        let (superblock, client) = setup_test_superblock();
2329        let test_etag = ETag::for_tests();
2330        let head_object_counter = client.new_counter(Operation::HeadObject);
2331        client.add_object("prefix/test inode", MockObject::constant(0xaa, 1024, test_etag.clone()));
2332        let inode = create_test_inode(&superblock, WriteStatus::Remote, false, Some(test_etag.clone()), false).unwrap();
2333
2334        let result = superblock.getattr_with_inode(inode.ino(), true).await;
2335        assert!(result.is_ok());
2336        let looked_up = result.unwrap();
2337        assert_eq!(looked_up.write_status, WriteStatus::Remote);
2338        assert_eq!(looked_up.inode.ino(), inode.ino());
2339        assert_eq!(looked_up.stat.etag, Some(test_etag.into_inner().into_boxed_str()));
2340        assert!(head_object_counter.count() > 0);
2341    }
2342
2343    #[tokio::test]
2344    async fn test_getattr_with_stale_inode_remote_no_force_revalidate() {
2345        let (superblock, client) = setup_test_superblock();
2346        let head_object_counter = client.new_counter(Operation::HeadObject);
2347        // Create the object with a different e-tag than which the inode is created with
2348        client.add_object("prefix/test inode", MockObject::constant(0xaa, 1024, ETag::for_tests()));
2349        let local_etag = ETag::from_str("Stale local etag").unwrap();
2350        let inode = create_test_inode(&superblock, WriteStatus::Remote, false, Some(local_etag), true).unwrap();
2351
2352        let result = superblock.getattr_with_inode(inode.ino(), false).await;
2353        if let Err(InodeError::StaleInode {
2354            remote_key,
2355            old_inode,
2356            new_inode,
2357        }) = result
2358        {
2359            assert_eq!(remote_key, "prefix/test inode");
2360            assert_ne!(old_inode.ino, new_inode.ino);
2361            assert!(head_object_counter.count() > 0);
2362        } else {
2363            panic!("Expected StaleInode error");
2364        }
2365    }
2366
2367    fn setup_test_superblock() -> (Superblock<Arc<MockClient>>, Arc<MockClient>) {
2368        let bucket = Bucket::new("test_bucket").unwrap();
2369        let prefix = "prefix/";
2370        let client = Arc::new(MockClient::config().bucket(bucket.to_string()).part_size(32).build());
2371        let prefix = Prefix::new(prefix).expect("valid prefix");
2372        let superblock = Superblock::new(
2373            client.clone(),
2374            S3Path::new(bucket, prefix.clone()),
2375            SuperblockConfig {
2376                cache_config: CacheConfig::new(TimeToLive::Duration(std::time::Duration::from_secs(24 * 60 * 60))),
2377                s3_personality: S3Personality::Standard,
2378            },
2379        );
2380        (superblock, client)
2381    }
2382
2383    fn create_test_inode(
2384        superblock: &Superblock<Arc<MockClient>>,
2385        write_status: WriteStatus,
2386        is_new_file: bool,
2387        etag: Option<ETag>,
2388        invalidate_stat: bool,
2389    ) -> Result<Inode, InodeError> {
2390        let parent_inode = superblock.inner.get(FUSE_ROOT_INODE).unwrap();
2391        let etag = etag.map(|etag| etag.into_inner().into_boxed_str());
2392
2393        let mut stat = InodeStat::for_file(
2394            1024,
2395            OffsetDateTime::now_utc(),
2396            etag,
2397            None,
2398            None,
2399            std::time::Duration::from_secs(24 * 60 * 60),
2400        );
2401        if invalidate_stat {
2402            // For testing the expired stat cases
2403            stat.update_validity(std::time::Duration::from_secs(0));
2404        }
2405        let state = InodeState::new(&stat, InodeKind::File, write_status);
2406
2407        let mut parent_state = parent_inode.get_mut_inode_state()?;
2408
2409        let inode = superblock.inner.create_inode_locked(
2410            &parent_inode,
2411            &mut parent_state,
2412            ValidName::parse_str("test inode")?,
2413            InodeKind::File,
2414            state,
2415            is_new_file,
2416        )?;
2417        // Manually add the inode to the Superblock's inode map, done by lookup/create in reality
2418        superblock.inner.remember(&inode);
2419        Ok(inode)
2420    }
2421
2422    #[test_case(""; "unprefixed")]
2423    #[test_case("test_prefix/"; "prefixed")]
2424    #[tokio::test]
2425    async fn test_readdir(prefix: &str) {
2426        let bucket = Bucket::new("test_bucket").unwrap();
2427        let client = Arc::new(
2428            MockClient::config()
2429                .bucket(bucket.to_string())
2430                .part_size(1024 * 1024)
2431                .build(),
2432        );
2433
2434        let keys = &[
2435            format!("{prefix}dir0/file0.txt"),
2436            format!("{prefix}dir0/sdir0/file0.txt"),
2437            format!("{prefix}dir0/sdir0/file1.txt"),
2438            format!("{prefix}dir0/sdir0/file2.txt"),
2439            format!("{prefix}dir0/sdir1/file0.txt"),
2440            format!("{prefix}dir0/sdir1/file1.txt"),
2441            format!("{prefix}dir1/sdir2/file0.txt"),
2442            format!("{prefix}dir1/sdir2/file1.txt"),
2443            format!("{prefix}dir1/sdir2/file2.txt"),
2444            format!("{prefix}dir1/sdir3/file0.txt"),
2445            format!("{prefix}dir1/sdir3/file1.txt"),
2446        ];
2447
2448        let last_modified = OffsetDateTime::UNIX_EPOCH + Duration::days(30);
2449        for key in keys {
2450            let mut obj = MockObject::constant(0xaa, 30, ETag::for_tests());
2451            obj.set_last_modified(last_modified);
2452            client.add_object(key, obj);
2453        }
2454
2455        let prefix = Prefix::new(prefix).expect("valid prefix");
2456        let ts = OffsetDateTime::now_utc();
2457        let superblock = Superblock::new(client.clone(), S3Path::new(bucket, prefix.clone()), Default::default());
2458
2459        // Try it all twice to test inode reuse
2460        for _ in 0..2 {
2461            // Step 1: Read root directory
2462            let root_entries = collect_dir_entries_with_info(&superblock, FUSE_ROOT_INODE, true, 100).await;
2463
2464            assert_eq!(root_entries.len(), 2);
2465            assert_eq!(
2466                root_entries.iter().map(|(_, name)| name).collect::<Vec<_>>(),
2467                &["dir0", "dir1"]
2468            );
2469
2470            let (dir0_entry, _) = &root_entries[0];
2471            let (dir1_entry, _) = &root_entries[1];
2472
2473            // Check stats for root directory entries
2474            assert_eq!(dir0_entry.kind(), InodeKind::Directory);
2475            assert_eq!(dir0_entry.stat().mtime.date(), ts.date());
2476            assert_eq!(dir0_entry.stat().size, 0);
2477
2478            assert_eq!(dir1_entry.kind(), InodeKind::Directory);
2479            assert_eq!(dir1_entry.stat().mtime.date(), ts.date());
2480            assert_eq!(dir1_entry.stat().size, 0);
2481
2482            // Step 2: Read dir0 directory
2483            let dir0_ino = dir0_entry.ino();
2484            let dir0_entries = collect_dir_entries_with_info(&superblock, dir0_ino, true, 100).await;
2485
2486            assert_eq!(dir0_entries.len(), 3);
2487            assert_eq!(
2488                dir0_entries.iter().map(|(_, name)| name).collect::<Vec<_>>(),
2489                &["file0.txt", "sdir0", "sdir1"]
2490            );
2491
2492            let (file0_entry, _) = &dir0_entries[0];
2493            let (sdir0_entry, _) = &dir0_entries[1];
2494            let (sdir1_entry, _) = &dir0_entries[2];
2495
2496            // Check stats for dir0 entries
2497            assert_eq!(file0_entry.kind(), InodeKind::File);
2498            assert_eq!(file0_entry.stat().mtime.date(), last_modified.date());
2499            assert_eq!(file0_entry.stat().size, 30);
2500
2501            assert_eq!(sdir0_entry.kind(), InodeKind::Directory);
2502            assert_eq!(sdir0_entry.stat().mtime.date(), ts.date());
2503            assert_eq!(sdir0_entry.stat().size, 0);
2504
2505            assert_eq!(sdir1_entry.kind(), InodeKind::Directory);
2506            assert_eq!(sdir1_entry.stat().mtime.date(), ts.date());
2507            assert_eq!(sdir1_entry.stat().size, 0);
2508
2509            // Step 3: Read sdir0 directory
2510            let sdir0_ino = sdir0_entry.ino();
2511            let sdir0_entries = collect_dir_entries_with_info(&superblock, sdir0_ino, true, 100).await;
2512
2513            assert_eq!(sdir0_entries.len(), 3);
2514            assert_eq!(
2515                sdir0_entries.iter().map(|(_, name)| name).collect::<Vec<_>>(),
2516                &["file0.txt", "file1.txt", "file2.txt"]
2517            );
2518
2519            // Check stats for all files in sdir0
2520            for (entry, _) in &sdir0_entries {
2521                assert_eq!(entry.kind(), InodeKind::File);
2522                assert_eq!(entry.stat().mtime.date(), last_modified.date());
2523                assert_eq!(entry.stat().size, 30);
2524            }
2525        }
2526    }
2527
2528    /// Helper function to collect directory entries with their inode information
2529    ///
2530    /// This function skips the "." and ".." entries, and returns both the
2531    /// InodeInformation and name for each entry.
2532    async fn collect_dir_entries_with_info(
2533        superblock: &Superblock<Arc<MockClient>>,
2534        dir_ino: InodeNo,
2535        is_readdir_plus: bool,
2536        page_size: usize,
2537    ) -> Vec<(InodeInformation, OsString)> {
2538        let fh = superblock
2539            .new_readdir_handle_with_pagesize(dir_ino, page_size)
2540            .await
2541            .unwrap();
2542        let mut entries = Vec::new();
2543
2544        superblock
2545            .readdir(
2546                dir_ino,
2547                fh,
2548                0,
2549                is_readdir_plus,
2550                Box::new(|entry, name, _offset, _generation| {
2551                    // Skip . and .. entries
2552                    if name != OsStr::new(".") && name != OsStr::new("..") {
2553                        entries.push((entry, name.to_owned()));
2554                    }
2555                    AddDirEntryResult::EntryAdded // Continue collecting entries
2556                }),
2557            )
2558            .await
2559            .unwrap();
2560
2561        entries
2562    }
2563
2564    // Helper function to collect directory entries
2565    async fn collect_dir_entries(
2566        superblock: &Superblock<Arc<MockClient>>,
2567        dir_ino: InodeNo,
2568        is_readdir_plus: bool,
2569        page_size: usize,
2570    ) -> Vec<OsString> {
2571        collect_dir_entries_with_info(superblock, dir_ino, is_readdir_plus, page_size)
2572            .await
2573            .into_iter()
2574            .map(|(_, name)| name)
2575            .collect()
2576    }
2577
2578    #[test_case(""; "unprefixed")]
2579    #[test_case("test_prefix/"; "prefixed")]
2580    #[tokio::test]
2581    async fn test_readdir_no_remote_keys(prefix: &str) {
2582        let bucket = Bucket::new("test_bucket").unwrap();
2583        let client = Arc::new(MockClient::config().bucket(bucket.to_string()).part_size(32).build());
2584
2585        let prefix = Prefix::new(prefix).expect("valid prefix");
2586        let superblock = Superblock::new(client.clone(), S3Path::new(bucket, prefix.clone()), Default::default());
2587
2588        let mut expected_list: Vec<OsString> = Vec::new();
2589
2590        // Create local keys
2591        for i in 0..5 {
2592            let filename = format!("file{i}.txt");
2593            let new_inode = superblock
2594                .create(FUSE_ROOT_INODE, filename.as_ref(), InodeKind::File)
2595                .await
2596                .unwrap();
2597            superblock
2598                .open_handle(new_inode.ino(), 0, &Default::default(), OpenFlags::O_WRONLY)
2599                .await
2600                .expect("should be able to start writing");
2601            expected_list.push(filename.into());
2602        }
2603
2604        // Try it all twice to test inode reuse
2605        for _ in 0..2 {
2606            let entries = collect_dir_entries(&superblock, FUSE_ROOT_INODE, false, 2).await;
2607            assert_eq!(entries, expected_list);
2608        }
2609    }
2610
2611    #[test_case(""; "unprefixed")]
2612    #[test_case("test_prefix/"; "prefixed")]
2613    #[tokio::test]
2614    async fn test_readdir_local_keys_after_remote_keys(prefix: &str) {
2615        let bucket = Bucket::new("test_bucket").unwrap();
2616        let client = Arc::new(
2617            MockClient::config()
2618                .bucket(bucket.to_string())
2619                .part_size(1024 * 1024)
2620                .build(),
2621        );
2622
2623        let prefix = Prefix::new(prefix).expect("valid prefix");
2624        let superblock = Superblock::new(client.clone(), S3Path::new(bucket, prefix.clone()), Default::default());
2625
2626        let mut expected_list: Vec<OsString> = Vec::new();
2627
2628        let remote_filenames = ["file0.txt", "file1.txt", "file2.txt"];
2629
2630        let last_modified = OffsetDateTime::UNIX_EPOCH + Duration::days(30);
2631        for filename in remote_filenames {
2632            let mut obj = MockObject::constant(0xaa, 30, ETag::for_tests());
2633            obj.set_last_modified(last_modified);
2634            let key = format!("{prefix}{filename}");
2635            client.add_object(&key, obj);
2636            expected_list.push(filename.to_owned().into());
2637        }
2638
2639        // Create local keys
2640        for i in 0..5 {
2641            let filename = format!("newfile{i}.txt");
2642            let new_inode = superblock
2643                .create(FUSE_ROOT_INODE, filename.as_ref(), InodeKind::File)
2644                .await
2645                .unwrap();
2646            superblock
2647                .open_handle(new_inode.ino(), 0, &Default::default(), OpenFlags::O_WRONLY)
2648                .await
2649                .expect("should be able to start writing");
2650            expected_list.push(filename.to_owned().into());
2651        }
2652
2653        // Try it all twice to test inode reuse
2654        for _ in 0..2 {
2655            let entries = collect_dir_entries(&superblock, FUSE_ROOT_INODE, true, 10).await;
2656            assert_eq!(expected_list, entries);
2657        }
2658    }
2659
2660    #[test_case(""; "unprefixed")]
2661    #[test_case("test_prefix/"; "prefixed")]
2662    #[tokio::test]
2663    async fn test_create_local_dir(prefix: &str) {
2664        let bucket = Bucket::new("test_bucket").unwrap();
2665        let client = Arc::new(
2666            MockClient::config()
2667                .bucket(bucket.to_string())
2668                .part_size(1024 * 1024)
2669                .build(),
2670        );
2671        let prefix = Prefix::new(prefix).expect("valid prefix");
2672        let superblock = Superblock::new(client.clone(), S3Path::new(bucket, prefix.clone()), Default::default());
2673
2674        // Create local directory
2675        let dirname = "local_dir";
2676        superblock
2677            .create(FUSE_ROOT_INODE, dirname.as_ref(), InodeKind::Directory)
2678            .await
2679            .unwrap();
2680
2681        let lookedup = superblock
2682            .lookup(FUSE_ROOT_INODE, dirname.as_ref())
2683            .await
2684            .expect("lookup should succeed on local dirs");
2685        assert_eq!(
2686            superblock.get_write_status(lookedup.ino()),
2687            Some(WriteStatus::LocalUnopened)
2688        );
2689
2690        let entries = collect_dir_entries(&superblock, FUSE_ROOT_INODE, true, 2).await;
2691        assert_eq!(entries.iter().collect::<Vec<_>>(), vec![dirname]);
2692
2693        // Check that local directories are not present in the client
2694        let prefix = format!("{prefix}{dirname}");
2695        assert!(!client.contains_prefix(&prefix));
2696    }
2697
2698    #[test_case(""; "unprefixed")]
2699    #[test_case("test_prefix/"; "prefixed")]
2700    #[tokio::test]
2701    async fn test_readdir_lookup_after_rmdir(prefix: &str) {
2702        let bucket = Bucket::new("test_bucket").unwrap();
2703        let client = Arc::new(
2704            MockClient::config()
2705                .bucket(bucket.to_string())
2706                .part_size(1024 * 1024)
2707                .build(),
2708        );
2709        let prefix = Prefix::new(prefix).expect("valid prefix");
2710        let superblock = Superblock::new(client.clone(), S3Path::new(bucket, prefix.clone()), Default::default());
2711
2712        // Create local directory
2713        let dirname = "local_dir";
2714        let lookedup = superblock
2715            .create(FUSE_ROOT_INODE, dirname.as_ref(), InodeKind::Directory)
2716            .await
2717            .expect("Should be able to create directory");
2718
2719        let fh = superblock
2720            .new_readdir_handle_with_pagesize(lookedup.ino(), 1024)
2721            .await
2722            .expect("should get new readhhandle");
2723
2724        superblock
2725            .rmdir(FUSE_ROOT_INODE, dirname.as_ref())
2726            .await
2727            .expect("rmdir on empty local directory should succeed");
2728
2729        superblock
2730            .lookup(FUSE_ROOT_INODE, dirname.as_ref())
2731            .await
2732            .expect_err("should not do lookup on removed directory");
2733
2734        superblock
2735            .readdir(
2736                lookedup.ino(),
2737                fh,
2738                0,
2739                true,
2740                Box::new(|_entry, _name, _offset, _generation| {
2741                    AddDirEntryResult::EntryAdded // Continue collecting entries
2742                }),
2743            )
2744            .await
2745            .expect_err("should not do readdir on removed directory");
2746
2747        superblock
2748            .getattr(lookedup.ino(), false)
2749            .await
2750            .expect_err("should not do getattr on removed directory");
2751    }
2752
2753    #[test_case("", true; "unprefixed ordered")]
2754    #[test_case("test_prefix/", true; "prefixed ordered")]
2755    #[test_case("", false; "unprefixed unordered")]
2756    #[test_case("test_prefix/", false; "prefixed unordered")]
2757    #[tokio::test]
2758    async fn test_readdir_unordered(prefix: &str, ordered: bool) {
2759        let bucket = Bucket::new("test_bucket").unwrap();
2760        let mut config = MockClient::config().bucket(bucket.to_string()).part_size(1024 * 1024);
2761        if !ordered {
2762            config = config.unordered_list_seed(Some(123456));
2763        }
2764        let client = Arc::new(config.build());
2765
2766        let prefix = Prefix::new(prefix).expect("valid prefix");
2767        let s3_personality = if ordered {
2768            S3Personality::Standard
2769        } else {
2770            S3Personality::ExpressOneZone
2771        };
2772        let superblock = Superblock::new(
2773            client.clone(),
2774            S3Path::new(bucket, prefix.clone()),
2775            SuperblockConfig {
2776                s3_personality,
2777                ..Default::default()
2778            },
2779        );
2780
2781        // Here are the remote/local cases we want to test:
2782        // - `dir1`: directory in the remote bucket, no conflicting local node
2783        // - `dir2`: directory in the remote bucket, conflicting local directory
2784        // - `dir3`: directory in the remote bucket, conflicting local file
2785        // - `dir4`: directory in the remote bucket, conflicting remote file
2786        // - `dm1`: directory marker in the remote bucket, no conflicting local node
2787        // - `dm2`: directory marker in the remote bucket, conflicting local directory
2788        // - `dm3`: directory marker in the remote bucket, conflicting local file
2789        // - `dm4`: directory marker in the remote bucket, conflicting remote file
2790        // - `file1`: file in the remote bucket, no conflicting local node
2791        // - `file2`: file in the remote bucket, conflicting local directory
2792        // - `file3`: file in the remote bucket, conflicting local file
2793        // Then to check ordering:
2794        // - `aaa` and `zzz`: local files only
2795
2796        // Open some local keys
2797        for filename in ["aaa", "dir3", "dm3", "file3", "zzz"] {
2798            let new_inode = superblock
2799                .create(FUSE_ROOT_INODE, filename.as_ref(), InodeKind::File)
2800                .await
2801                .unwrap();
2802            superblock
2803                .open_handle(new_inode.ino(), 0, &Default::default(), OpenFlags::O_WRONLY)
2804                .await
2805                .expect("should be able to start writing");
2806        }
2807
2808        // Create some local directories
2809        for dirname in ["dir2", "dm2", "file2"] {
2810            let _new_inode = superblock
2811                .create(FUSE_ROOT_INODE, dirname.as_ref(), InodeKind::Directory)
2812                .await
2813                .unwrap();
2814        }
2815
2816        // Now create remote state so that we can test shadowing
2817        let keys = &[
2818            format!("{prefix}dir1/file.txt"),
2819            format!("{prefix}dir2/file.txt"),
2820            format!("{prefix}dir3/file.txt"),
2821            format!("{prefix}dir4/file.txt"),
2822            format!("{prefix}dir4"),
2823            format!("{prefix}dm1/"),
2824            format!("{prefix}dm2/"),
2825            format!("{prefix}dm3/"),
2826            format!("{prefix}dm4/"),
2827            format!("{prefix}dm4"),
2828            format!("{prefix}file1"),
2829            format!("{prefix}file2"),
2830            format!("{prefix}file3"),
2831        ];
2832
2833        let last_modified = OffsetDateTime::UNIX_EPOCH + Duration::days(30);
2834        for key in keys {
2835            let mut obj = MockObject::constant(0xaa, 30, ETag::for_tests());
2836            obj.set_last_modified(last_modified);
2837            client.add_object(key, obj);
2838        }
2839
2840        // And now walk the root directory to check it contains the right stuff
2841        let readdir_entries = collect_dir_entries_with_info(&superblock, FUSE_ROOT_INODE, true, 20).await;
2842        let entries: Vec<(&str, InodeKind)> = readdir_entries
2843            .iter()
2844            .map(|(info, name)| (name.to_str().unwrap(), info.kind()))
2845            .collect();
2846
2847        let expected_entries = [
2848            ("aaa", InodeKind::File),
2849            ("dir1", InodeKind::Directory),
2850            ("dir2", InodeKind::Directory),
2851            ("dir3", InodeKind::Directory),
2852            ("dir4", InodeKind::Directory),
2853            ("dm1", InodeKind::Directory),
2854            ("dm2", InodeKind::Directory),
2855            ("dm3", InodeKind::Directory),
2856            ("dm4", InodeKind::Directory),
2857            ("file1", InodeKind::File),
2858            ("file2", InodeKind::Directory), // local directory shadows remote file
2859            ("file3", InodeKind::File),
2860            ("zzz", InodeKind::File),
2861        ];
2862
2863        for entry in expected_entries {
2864            assert!(entries.contains(&entry), "missing entry {entry:?}");
2865        }
2866
2867        if ordered {
2868            assert_eq!(entries, expected_entries);
2869        }
2870    }
2871
2872    #[test_case(""; "unprefixed")]
2873    #[test_case("test_prefix/"; "prefixed")]
2874    #[tokio::test]
2875    async fn test_rmdir_delete_status(prefix: &str) {
2876        let bucket = Bucket::new("test_bucket").unwrap();
2877        let client = Arc::new(
2878            MockClient::config()
2879                .bucket(bucket.to_string())
2880                .part_size(1024 * 1024)
2881                .build(),
2882        );
2883        let prefix = Prefix::new(prefix).expect("valid prefix");
2884        let superblock = Superblock::new(client.clone(), S3Path::new(bucket, prefix.clone()), Default::default());
2885
2886        // Create local directory
2887        let dirname = "local_dir";
2888        let lookedup = superblock
2889            .create(FUSE_ROOT_INODE, dirname.as_ref(), InodeKind::Directory)
2890            .await
2891            .expect("Should be able to create directory");
2892
2893        superblock
2894            .rmdir(FUSE_ROOT_INODE, dirname.as_ref())
2895            .await
2896            .expect("rmdir on empty local directory should succeed");
2897
2898        let parent = superblock.inner.get(FUSE_ROOT_INODE).unwrap();
2899        let parent_state = parent
2900            .get_inode_state()
2901            .expect("should get parent state with read lock");
2902        match &parent_state.kind_data {
2903            InodeKindData::File {} => unreachable!("Parent can only be a Directory"),
2904            InodeKindData::Directory {
2905                children,
2906                writing_children,
2907                ..
2908            } => {
2909                assert!(writing_children.get(&lookedup.ino()).is_none());
2910                assert!(
2911                    children
2912                        .get(lookedup.clone().try_into_s3_location().unwrap().name())
2913                        .is_none()
2914                );
2915            }
2916        }
2917
2918        assert_eq!(
2919            superblock.get_write_status(lookedup.ino()),
2920            None,
2921            "Should not be able to get llokup count of deleted Inode"
2922        );
2923    }
2924
2925    #[test_case(""; "unprefixed")]
2926    #[test_case("test_prefix/"; "prefixed")]
2927    #[tokio::test]
2928    async fn test_parent_readdir_after_rmdir(prefix: &str) {
2929        let bucket = Bucket::new("test_bucket").unwrap();
2930        let client = Arc::new(
2931            MockClient::config()
2932                .bucket(bucket.to_string())
2933                .part_size(1024 * 1024)
2934                .build(),
2935        );
2936        let prefix = Prefix::new(prefix).expect("valid prefix");
2937        let superblock = Superblock::new(client.clone(), S3Path::new(bucket, prefix.clone()), Default::default());
2938
2939        // Create local directory
2940        let dirname = "local_dir";
2941        superblock
2942            .create(FUSE_ROOT_INODE, dirname.as_ref(), InodeKind::Directory)
2943            .await
2944            .expect("Should be able to create directory");
2945
2946        let dirname_to_stay = "staying_local_dir";
2947        superblock
2948            .create(FUSE_ROOT_INODE, dirname_to_stay.as_ref(), InodeKind::Directory)
2949            .await
2950            .expect("Should be able to create directory");
2951
2952        superblock
2953            .rmdir(FUSE_ROOT_INODE, dirname.as_ref())
2954            .await
2955            .expect("rmdir on empty local directory should succeed");
2956
2957        // removed directory should not appear in readdir of parent
2958
2959        let entries = collect_dir_entries(&superblock, FUSE_ROOT_INODE, false, 2).await;
2960        assert_eq!(entries.iter().collect::<Vec<_>>(), &[dirname_to_stay]);
2961    }
2962
2963    #[test_case(""; "unprefixed")]
2964    #[test_case("test_prefix/"; "prefixed")]
2965    #[tokio::test]
2966    async fn test_lookup_after_unlink(prefix: &str) {
2967        let bucket = Bucket::new("test_bucket").unwrap();
2968        let client = Arc::new(
2969            MockClient::config()
2970                .bucket(bucket.to_string())
2971                .part_size(1024 * 1024)
2972                .build(),
2973        );
2974        let prefix = Prefix::new(prefix).expect("valid prefix");
2975        let superblock = Superblock::new(client.clone(), S3Path::new(bucket, prefix.clone()), Default::default());
2976
2977        let file_name = "file.txt";
2978        let file_key = format!("{prefix}{file_name}");
2979        client.add_object(file_key.as_ref(), MockObject::constant(0xaa, 30, ETag::for_tests()));
2980        let parent_ino = FUSE_ROOT_INODE;
2981
2982        superblock
2983            .lookup(parent_ino, file_name.as_ref())
2984            .await
2985            .expect("file should exist");
2986
2987        superblock
2988            .unlink(parent_ino, file_name.as_ref())
2989            .await
2990            .expect("file delete should succeed as it exists");
2991
2992        let err: i32 = superblock
2993            .lookup(parent_ino, file_name.as_ref())
2994            .await
2995            .expect_err("lookup should no longer find deleted file")
2996            .to_errno();
2997        assert_eq!(libc::ENOENT, err, "lookup should return no existing entry error");
2998    }
2999
3000    #[tokio::test]
3001    async fn test_finish_writing_convert_parent_local_dirs_to_remote() {
3002        let bucket = Bucket::new("test_bucket").unwrap();
3003        let client = Arc::new(
3004            MockClient::config()
3005                .bucket(bucket.to_string())
3006                .part_size(1024 * 1024)
3007                .build(),
3008        );
3009        let superblock = Superblock::new(
3010            client.clone(),
3011            S3Path::new(bucket, Default::default()),
3012            Default::default(),
3013        );
3014
3015        let nested_dirs = (0..5).map(|i| format!("level{i}")).collect::<Vec<_>>();
3016        let leaf_dir_ino = {
3017            let mut parent_dir_ino = FUSE_ROOT_INODE;
3018            for dirname in &nested_dirs {
3019                let dir_lookedup = superblock
3020                    .create(parent_dir_ino, dirname.as_ref(), InodeKind::Directory)
3021                    .await
3022                    .unwrap();
3023
3024                assert_eq!(
3025                    superblock
3026                        .get_write_status(dir_lookedup.ino())
3027                        .expect("should get write status with read lock"),
3028                    WriteStatus::LocalUnopened
3029                );
3030
3031                parent_dir_ino = dir_lookedup.ino();
3032            }
3033            parent_dir_ino
3034        };
3035
3036        // Create object under leaf dir
3037        let filename = "newfile.txt";
3038        let new_inode = superblock
3039            .create(leaf_dir_ino, filename.as_ref(), InodeKind::File)
3040            .await
3041            .unwrap();
3042
3043        superblock
3044            .open_handle(new_inode.ino(), 0, &Default::default(), OpenFlags::O_WRONLY)
3045            .await
3046            .expect("should be able to start writing");
3047
3048        // Invoke [finish_writing], without actually adding the
3049        // object to the client
3050        superblock.finish_writing(new_inode.ino(), None, 0).await.unwrap();
3051
3052        // All nested dirs disappear
3053        let dirname = nested_dirs.first().unwrap();
3054        let lookedup = superblock.lookup(FUSE_ROOT_INODE, dirname.as_ref()).await;
3055        assert!(matches!(lookedup, Err(InodeError::FileDoesNotExist(_, _))));
3056    }
3057
3058    #[tokio::test]
3059    async fn test_inode_reuse() {
3060        let bucket = Bucket::new("test_bucket").unwrap();
3061        let client = Arc::new(
3062            MockClient::config()
3063                .bucket(bucket.to_string())
3064                .part_size(1024 * 1024)
3065                .build(),
3066        );
3067        client.add_object("dir1/file1.txt", MockObject::constant(0xaa, 30, ETag::for_tests()));
3068
3069        let superblock = Superblock::new(
3070            client.clone(),
3071            S3Path::new(bucket, Default::default()),
3072            Default::default(),
3073        );
3074
3075        for _ in 0..2 {
3076            let dir1_1 = superblock.lookup(FUSE_ROOT_INODE, "dir1".as_ref()).await.unwrap();
3077            let dir1_2 = superblock.lookup(FUSE_ROOT_INODE, "dir1".as_ref()).await.unwrap();
3078            assert_eq!(dir1_1.ino(), dir1_2.ino());
3079
3080            let file1_1 = superblock.lookup(dir1_1.ino(), "file1.txt".as_ref()).await.unwrap();
3081            let file1_2 = superblock.lookup(dir1_1.ino(), "file1.txt".as_ref()).await.unwrap();
3082            assert_eq!(file1_1.ino(), file1_2.ino());
3083        }
3084    }
3085
3086    #[test_case(""; "no subdirectory")]
3087    #[test_case("subdir/"; "with subdirectory")]
3088    #[tokio::test]
3089    async fn test_lookup_directory_overlap(subdir: &str) {
3090        let bucket = Bucket::new("test_bucket").unwrap();
3091        let client = Arc::new(
3092            MockClient::config()
3093                .bucket(bucket.to_string())
3094                .part_size(1024 * 1024)
3095                .build(),
3096        );
3097        // In this test the `/` delimiter comes back to bite us. `dir-1/` comes before `dir/` in
3098        // lexicographical order (- is ASCII 0x2d, / is ASCII 0x2f), so `dir-1` will be the first
3099        // common prefix when we do ListObjects with prefix = 'dir'. But `dir` comes before `dir-1`
3100        // in lexicographical order, so `dir` will be the first common prefix when we do ListObjects
3101        // with prefix = ''.
3102        client.add_object(
3103            &format!("dir/{subdir}file1.txt"),
3104            MockObject::constant(0xaa, 30, ETag::for_tests()),
3105        );
3106        client.add_object(
3107            &format!("dir-1/{subdir}file1.txt"),
3108            MockObject::constant(0xaa, 30, ETag::for_tests()),
3109        );
3110
3111        let superblock = Superblock::new(
3112            client.clone(),
3113            S3Path::new(bucket, Default::default()),
3114            Default::default(),
3115        );
3116
3117        let entries = collect_dir_entries(&superblock, FUSE_ROOT_INODE, false, 2).await;
3118        assert_eq!(entries, &["dir", "dir-1"]);
3119
3120        let dir = superblock.lookup(FUSE_ROOT_INODE, "dir".as_ref()).await.unwrap();
3121        assert_eq!(
3122            dir.s3_location().expect("should have location").full_key().as_ref(),
3123            "dir/"
3124        );
3125    }
3126
3127    #[tokio::test]
3128    async fn test_invalid_names() {
3129        let bucket = Bucket::new("test_bucket").unwrap();
3130        let client = Arc::new(
3131            MockClient::config()
3132                .bucket(bucket.to_string())
3133                .part_size(1024 * 1024)
3134                .build(),
3135        );
3136
3137        // The only valid key here is "dir1/a", so we should see a directory called "dir1" and a
3138        // file inside it called "a".
3139        client.add_object(
3140            "dir1/",
3141            MockObject::constant(0xaa, 30, ETag::from_str("test_etag_1").unwrap()),
3142        );
3143        client.add_object(
3144            "dir1//",
3145            MockObject::constant(0xaa, 30, ETag::from_str("test_etag_2").unwrap()),
3146        );
3147        client.add_object(
3148            "dir1/a",
3149            MockObject::constant(0xaa, 30, ETag::from_str("test_etag_3").unwrap()),
3150        );
3151        client.add_object(
3152            "dir1/.",
3153            MockObject::constant(0xaa, 30, ETag::from_str("test_etag_4").unwrap()),
3154        );
3155        client.add_object(
3156            "dir1/./a",
3157            MockObject::constant(0xaa, 30, ETag::from_str("test_etag_5").unwrap()),
3158        );
3159
3160        let superblock = Superblock::new(
3161            client.clone(),
3162            S3Path::new(bucket, Default::default()),
3163            Default::default(),
3164        );
3165        let entries = collect_dir_entries_with_info(&superblock, FUSE_ROOT_INODE, true, 2).await;
3166        assert_eq!(entries.iter().map(|(_info, name)| name).collect::<Vec<_>>(), &["dir1"]);
3167
3168        let dir1_ino = entries[0].0.ino();
3169        let entries = collect_dir_entries(&superblock, dir1_ino, false, 2).await;
3170        assert_eq!(entries, &["a"]);
3171
3172        // Neither of these keys should exist in the directory
3173        for key in ["/", "."] {
3174            let lookup = superblock.lookup(dir1_ino, key.as_ref()).await;
3175            assert!(matches!(lookup, Err(InodeError::InvalidFileName(_))));
3176        }
3177    }
3178
3179    #[test_case(""; "unprefixed")]
3180    #[test_case("test_prefix/"; "prefixed")]
3181    #[tokio::test]
3182    async fn test_setattr(prefix: &str) {
3183        let bucket = Bucket::new("test_bucket").unwrap();
3184        let client = Arc::new(
3185            MockClient::config()
3186                .bucket(bucket.to_string())
3187                .part_size(1024 * 1024)
3188                .build(),
3189        );
3190        let prefix = Prefix::new(prefix).expect("valid prefix");
3191        let superblock = Superblock::new(client.clone(), S3Path::new(bucket, prefix.clone()), Default::default());
3192
3193        // Create a new file
3194        let filename = "newfile.txt";
3195        let new_inode = superblock
3196            .create(FUSE_ROOT_INODE, filename.as_ref(), InodeKind::File)
3197            .await
3198            .unwrap();
3199
3200        superblock
3201            .open_handle(new_inode.ino(), 0, &Default::default(), OpenFlags::O_WRONLY)
3202            .await
3203            .expect("should be able to start writing");
3204
3205        let atime = OffsetDateTime::UNIX_EPOCH + Duration::days(90);
3206        let mtime = OffsetDateTime::UNIX_EPOCH + Duration::days(60);
3207
3208        // Call setattr and verify the stat
3209        let lookup = superblock
3210            .setattr(new_inode.ino(), Some(atime), Some(mtime))
3211            .await
3212            .expect("setattr should be successful");
3213        let stat = lookup.stat();
3214        assert_eq!(stat.atime, atime);
3215        assert_eq!(stat.mtime, mtime);
3216
3217        let lookup = superblock
3218            .getattr(new_inode.ino(), false)
3219            .await
3220            .expect("getattr should be successful");
3221        let stat = lookup.stat();
3222        assert_eq!(stat.atime, atime);
3223        assert_eq!(stat.mtime, mtime);
3224
3225        // Invoke [finish_writing] to make the file remote
3226        superblock
3227            .finish_writing(new_inode.ino(), Some(ETag::for_tests()), 0)
3228            .await
3229            .unwrap();
3230
3231        // Should get an error back when calling setattr
3232        let result = superblock.setattr(new_inode.ino(), Some(atime), Some(mtime)).await;
3233        assert!(matches!(result, Err(InodeError::SetAttrNotPermittedOnRemoteInode(_))));
3234    }
3235
3236    #[test]
3237    fn test_inodestat_constructors() {
3238        let ts = OffsetDateTime::UNIX_EPOCH + Duration::days(90);
3239        let file_inodestat = InodeStat::for_file(128, ts, None, None, None, Default::default());
3240        assert_eq!(file_inodestat.size, 128);
3241        assert_eq!(file_inodestat.atime, ts);
3242        assert_eq!(file_inodestat.ctime, ts);
3243        assert_eq!(file_inodestat.mtime, ts);
3244
3245        let ts = OffsetDateTime::UNIX_EPOCH + Duration::days(180);
3246        let file_inodestat = InodeStat::for_directory(ts, Default::default());
3247        assert_eq!(file_inodestat.size, 0);
3248        assert_eq!(file_inodestat.atime, ts);
3249        assert_eq!(file_inodestat.ctime, ts);
3250        assert_eq!(file_inodestat.mtime, ts);
3251    }
3252
3253    #[test]
3254    fn test_rename_cache_positive() {
3255        let cache = RenameCache::new();
3256        assert!(
3257            cache.should_try_rename(),
3258            "Without a registered failure, should try a rename"
3259        );
3260        cache.cache_success();
3261        assert!(cache.should_try_rename(), "After a success, should still try renames");
3262        // Try to cache a failure. This should be rejected now, since a success has been cached.
3263        cache.cache_failure();
3264        assert!(cache.should_try_rename(), "Failure should not change cache state");
3265    }
3266
3267    #[test]
3268    fn test_rename_cache_negative() {
3269        let cache = RenameCache::new();
3270        assert!(
3271            cache.should_try_rename(),
3272            "Without a registered failure, should try a rename"
3273        );
3274        cache.cache_failure();
3275        assert!(!cache.should_try_rename(), "After a failure, should not try renames");
3276        // Try to cache a failure. This should be rejected now, since a success has been cached.
3277        cache.cache_success();
3278        assert!(
3279            !cache.should_try_rename(),
3280            "Success after failure should not modify cache state"
3281        );
3282    }
3283}