use std::collections::VecDeque;
use std::ffi::OsString;
use super::{InodeKindData, LookedUpInode, RemoteLookup, SuperblockInner};
use crate::metablock::{InodeError, InodeKind, InodeNo, InodeStat};
use crate::superblock::ValidName;
use crate::sync::atomic::{AtomicI64, Ordering};
use crate::sync::{AsyncMutex, Mutex};
use mountpoint_s3_client::ObjectClient;
use mountpoint_s3_client::types::RestoreStatus;
use time::OffsetDateTime;
use tracing::{error, trace, warn};
/// Handle for one in-progress directory listing.
///
/// Merges entries discovered remotely (via ListObjects) with locally created,
/// not-yet-uploaded children, and allows exactly one entry to be pushed back
/// ([`ReaddirHandle::readd`]) so it is returned again on the next call.
#[derive(Debug)]
pub struct ReaddirHandle {
    // Inode number of the directory being listed.
    dir_ino: InodeNo,
    // Inode number of the listed directory's parent, exposed via `parent()`.
    parent_ino: InodeNo,
    // Underlying merged iterator. Async mutex because `next` awaits the
    // client while holding it.
    iter: AsyncMutex<ReaddirIter>,
    // At most one pushed-back entry; consulted before `iter` in `next`.
    readded: Mutex<Option<LookedUpInode>>,
}
impl ReaddirHandle {
    /// Create a new readdir handle for the directory `dir_ino`.
    ///
    /// Snapshots the directory's locally created ("writing") children up
    /// front, sorts them, and pairs them with a remote iterator over
    /// `full_path`. The ordered or unordered merge strategy is chosen from
    /// the S3 personality's listing guarantees.
    ///
    /// # Errors
    ///
    /// Returns [`InodeError::NotADirectory`] if `dir_ino` is a file, and
    /// propagates any error from looking up the directory or its children.
    pub(super) fn new<OC: ObjectClient + Send + Sync>(
        inner: &SuperblockInner<OC>,
        dir_ino: InodeNo,
        parent_ino: InodeNo,
        full_path: String,
        page_size: usize,
    ) -> Result<Self, InodeError> {
        let local_entries = {
            let inode = inner.get(dir_ino)?;
            let kind_data = &inode.get_inode_state()?.kind_data;
            let local_files = match kind_data {
                InodeKindData::File { .. } => return Err(InodeError::NotADirectory(inode.err())),
                // Snapshot each in-flight child's stat and write status while
                // briefly holding its state lock.
                InodeKindData::Directory { writing_children, .. } => writing_children.iter().map(|ino| {
                    let inode = inner.get(*ino)?;
                    let locked_inode = inode.get_inode_state()?;
                    let stat = locked_inode.stat.clone();
                    let write_status = locked_inode.write_status;
                    // Release the child's state lock before moving the inode
                    // into the entry.
                    drop(locked_inode);
                    Ok(ReaddirEntry::LocalInode {
                        lookup: LookedUpInode {
                            inode,
                            stat,
                            path: inner.s3_path.clone(),
                            write_status,
                        },
                    })
                }),
            };
            // Fail the whole readdir if any local child could not be looked up.
            match local_files.collect::<Result<Vec<_>, _>>() {
                Ok(mut new_results) => {
                    // Pre-sort so the ordered merge can treat this list as a
                    // sorted stream.
                    new_results.sort();
                    new_results
                }
                Err(e) => {
                    error!(error=?e, "readdir failed listing local files");
                    return Err(e);
                }
            }
        };
        let iter = if inner.config.s3_personality.is_list_ordered() {
            ReaddirIter::ordered(&inner.s3_path.bucket, &full_path, page_size, local_entries.into())
        } else {
            ReaddirIter::unordered(&inner.s3_path.bucket, &full_path, page_size, local_entries.into())
        };
        Ok(Self {
            dir_ino,
            parent_ino,
            iter: AsyncMutex::new(iter),
            readded: Default::default(),
        })
    }

    /// Return the next inode in the listing, or `Ok(None)` when exhausted.
    ///
    /// An entry pushed back via [`Self::readd`] takes precedence over the
    /// underlying iterator. Entries whose names are not valid filesystem
    /// names are skipped with a warning rather than failing the listing.
    pub(super) async fn next<OC: ObjectClient + Send + Sync>(
        &self,
        inner: &SuperblockInner<OC>,
    ) -> Result<Option<LookedUpInode>, InodeError> {
        // The std mutex guard is dropped at the end of this statement, before
        // any await point.
        if let Some(readded) = self.readded.lock().unwrap().take() {
            return Ok(Some(readded));
        }
        loop {
            let next = {
                let mut iter = self.iter.lock().await;
                iter.next(&inner.client).await?
            };
            let Some(next) = next else {
                return Ok(None);
            };
            // Invalid names (e.g. containing '/') cannot be represented as
            // directory entries; skip them instead of erroring the readdir.
            let Ok(name) = next.name().try_into() else {
                warn!("{} has an invalid name and will be unavailable", next.description());
                continue;
            };
            let lookup = self.lookup_from_entry(inner, &next, name)?;
            return Ok(Some(lookup));
        }
    }

    /// Push back a single entry to be returned by the next call to `next`.
    ///
    /// # Panics
    ///
    /// Panics if an entry is already pushed back and unconsumed.
    pub fn readd(&self, entry: LookedUpInode) {
        let old = self.readded.lock().unwrap().replace(entry);
        assert!(old.is_none(), "cannot readd more than one entry");
    }

    /// Inode number of the listed directory's parent.
    pub fn parent(&self) -> InodeNo {
        self.parent_ino
    }

    /// Materialize a [`ReaddirEntry`] into a [`LookedUpInode`].
    ///
    /// Local entries are returned as-is; remote entries are turned into a
    /// [`RemoteLookup`] and registered with the superblock via
    /// `update_from_remote`, which creates or refreshes the inode.
    fn lookup_from_entry<OC: ObjectClient + Send + Sync>(
        &self,
        inner: &SuperblockInner<OC>,
        entry: &ReaddirEntry,
        name: ValidName,
    ) -> Result<LookedUpInode, InodeError> {
        let remote_lookup = match entry {
            ReaddirEntry::LocalInode { lookup } => {
                return Ok(lookup.clone());
            }
            ReaddirEntry::RemotePrefix { .. } => {
                // Remote prefixes carry no metadata; synthesize a directory
                // stat stamped with the mount time.
                let stat = InodeStat::for_directory(inner.mount_time, inner.config.cache_config.dir_ttl);
                RemoteLookup {
                    stat,
                    kind: InodeKind::Directory,
                }
            }
            ReaddirEntry::RemoteObject {
                size,
                last_modified,
                etag,
                storage_class,
                restore_status,
                ..
            } => {
                let stat = InodeStat::for_file(
                    *size as usize,
                    *last_modified,
                    Some(etag.as_str().into()),
                    storage_class.as_deref(),
                    *restore_status,
                    inner.config.cache_config.file_ttl,
                );
                RemoteLookup {
                    stat,
                    kind: InodeKind::File,
                }
            }
        };
        inner.update_from_remote(self.dir_ino, name, Some(remote_lookup))
    }
}
/// A single entry in a directory listing, before it is materialized into an
/// inode.
#[derive(Debug, Clone)]
enum ReaddirEntry {
    /// A common prefix from the remote listing, i.e. a remote "directory".
    RemotePrefix {
        name: String,
    },
    /// An object from the remote listing, i.e. a remote file, with the
    /// metadata returned by ListObjects.
    RemoteObject {
        name: String,
        full_key: String,
        size: u64,
        last_modified: OffsetDateTime,
        storage_class: Option<String>,
        restore_status: Option<RestoreStatus>,
        etag: String,
    },
    /// A locally created child that may not exist remotely yet.
    LocalInode {
        lookup: LookedUpInode,
    },
}
/// Discriminant of a [`ReaddirEntry`], used as an ordering tiebreaker for
/// entries with equal names.
///
/// NOTE: the variant order is significant — the derived `Ord` makes
/// `RemotePrefix < RemoteObject < LocalInode`, so a remote directory sorts
/// before an object, which sorts before a local inode of the same name.
/// Do not reorder the variants.
#[derive(Debug, Clone, Copy, PartialOrd, Ord, PartialEq, Eq)]
enum ReaddirEntryKind {
    RemotePrefix,
    RemoteObject,
    LocalInode,
}
impl ReaddirEntry {
    /// The entry's name within the directory being listed.
    fn name(&self) -> &str {
        match self {
            Self::RemotePrefix { name } | Self::RemoteObject { name, .. } => name,
            Self::LocalInode { lookup } => lookup.inode.name(),
        }
    }

    /// The discriminant of this entry, used as an ordering tiebreaker.
    fn kind(&self) -> ReaddirEntryKind {
        match self {
            Self::LocalInode { .. } => ReaddirEntryKind::LocalInode,
            Self::RemoteObject { .. } => ReaddirEntryKind::RemoteObject,
            Self::RemotePrefix { .. } => ReaddirEntryKind::RemotePrefix,
        }
    }

    /// A human-readable description of this entry, for log messages.
    fn description(&self) -> String {
        match self {
            Self::RemotePrefix { name } => format!("directory '{name}'"),
            Self::RemoteObject { name, full_key, .. } => {
                format!("file '{name}' (full key {full_key:?})")
            }
            Self::LocalInode { lookup } => {
                let kind = match lookup.inode.kind() {
                    InodeKind::File => "file",
                    InodeKind::Directory => "directory",
                };
                format!("local {kind} '{}'", lookup.inode.name())
            }
        }
    }
}
// Entries are considered equal when both name and kind match; the remaining
// payload (size, etag, timestamps, ...) is deliberately ignored.
impl PartialEq for ReaddirEntry {
    fn eq(&self, other: &Self) -> bool {
        (self.name(), self.kind()) == (other.name(), other.kind())
    }
}

impl Eq for ReaddirEntry {}
impl PartialOrd for ReaddirEntry {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

// Order lexicographically by name, breaking ties with the entry kind
// (remote prefix < remote object < local inode).
impl Ord for ReaddirEntry {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        (self.name(), self.kind()).cmp(&(other.name(), other.kind()))
    }
}
/// Dispatcher between the two listing strategies, chosen by S3 personality.
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
enum ReaddirIter {
    /// Merge-sorted stream, for S3 flavors whose listings are
    /// lexicographically ordered.
    Ordered(ordered::ReaddirIter),
    /// Remote-then-local stream, for flavors without ordered listings.
    Unordered(unordered::ReaddirIter),
}

impl ReaddirIter {
    /// Build an ordered iterator. `local_entries` must already be sorted.
    fn ordered(bucket: &str, full_path: &str, page_size: usize, local_entries: VecDeque<ReaddirEntry>) -> Self {
        Self::Ordered(ordered::ReaddirIter::new(bucket, full_path, page_size, local_entries))
    }

    /// Build an unordered iterator.
    fn unordered(bucket: &str, full_path: &str, page_size: usize, local_entries: VecDeque<ReaddirEntry>) -> Self {
        Self::Unordered(unordered::ReaddirIter::new(bucket, full_path, page_size, local_entries))
    }

    /// Return the next entry from the underlying strategy, or `Ok(None)` when
    /// the listing is exhausted.
    async fn next(&mut self, client: &impl ObjectClient) -> Result<Option<ReaddirEntry>, InodeError> {
        match self {
            Self::Ordered(iter) => iter.next(client).await,
            Self::Unordered(iter) => iter.next(client).await,
        }
    }
}
/// Paging state of a [`RemoteIter`].
#[derive(Debug, PartialEq, Eq)]
enum RemoteIterState {
    /// More pages may remain; holds the continuation token for the next
    /// ListObjects call (`None` before the first call).
    InProgress(Option<String>),
    /// The last page returned no continuation token; the listing is complete.
    Finished,
}

/// Streams directory entries from S3, one ListObjects page at a time.
#[derive(Debug)]
struct RemoteIter {
    // Entries from the most recent page that have not yet been handed out.
    entries: VecDeque<ReaddirEntry>,
    bucket: String,
    // Key prefix of the directory being listed. Entry names are produced by
    // stripping this prefix from returned keys, so it is presumably
    // '/'-terminated for non-root directories — verify against callers.
    full_path: String,
    // Maximum number of keys requested per ListObjects call.
    page_size: usize,
    state: RemoteIterState,
    // Whether to sort each page before yielding (required by the ordered
    // merge strategy).
    ordered: bool,
}
impl RemoteIter {
    fn new(bucket: &str, full_path: &str, page_size: usize, ordered: bool) -> Self {
        Self {
            entries: VecDeque::new(),
            bucket: bucket.to_owned(),
            full_path: full_path.to_owned(),
            page_size,
            state: RemoteIterState::InProgress(None),
            ordered,
        }
    }

    /// Return the next remote entry, fetching a new ListObjects page when the
    /// buffer is empty. Returns `Ok(None)` once the listing is finished.
    async fn next(&mut self, client: &impl ObjectClient) -> Result<Option<ReaddirEntry>, InodeError> {
        if self.entries.is_empty() {
            let continuation_token = match &mut self.state {
                RemoteIterState::Finished => {
                    // `self as *const _` gives a stable id for correlating
                    // trace events from the same iterator.
                    trace!(self=?self as *const _, prefix=?self.full_path, "remote iter finished");
                    return Ok(None);
                }
                RemoteIterState::InProgress(token) => token.take(),
            };
            trace!(self=?self as *const _, prefix=?self.full_path, ?continuation_token, "continuing remote iter");
            // List with the "/" delimiter so immediate children come back as
            // objects plus common prefixes (subdirectories).
            let result = client
                .list_objects(
                    &self.bucket,
                    continuation_token.as_deref(),
                    "/",
                    self.page_size,
                    self.full_path.as_str(),
                )
                .await
                .map_err(|e| InodeError::client_error(e, "ListObjectsV2 failed", &self.bucket, &self.full_path))?;
            // No continuation token means this was the final page.
            self.state = match result.next_continuation_token {
                Some(token) => RemoteIterState::InProgress(Some(token)),
                None => RemoteIterState::Finished,
            };
            let prefixes = result
                .common_prefixes
                .into_iter()
                .map(|prefix| ReaddirEntry::RemotePrefix {
                    // Strip the listing prefix and the trailing '/' delimiter.
                    // NOTE(review): slicing assumes every common prefix starts
                    // with `full_path` and ends with '/', as guaranteed by the
                    // ListObjects contract — would panic otherwise.
                    name: prefix[self.full_path.len()..prefix.len() - 1].to_owned(),
                });
            let objects = result
                .objects
                .into_iter()
                .map(|object_info| ReaddirEntry::RemoteObject {
                    // Strip the listing prefix to get the bare file name.
                    name: object_info.key[self.full_path.len()..].to_owned(),
                    full_key: object_info.key,
                    size: object_info.size,
                    last_modified: object_info.last_modified,
                    storage_class: object_info.storage_class,
                    restore_status: object_info.restore_status,
                    etag: object_info.etag,
                });
            if self.ordered {
                // S3 returns prefixes and objects as two separate sorted
                // lists; merge them into a single sorted page.
                let mut new_entries = prefixes.chain(objects).collect::<Vec<_>>();
                new_entries.sort();
                self.entries.extend(new_entries);
            } else {
                self.entries.extend(prefixes.chain(objects));
            }
        }
        Ok(self.entries.pop_front())
    }
}
mod ordered {
    use super::*;

    /// Readdir iterator that merge-sorts the remote (S3) listing with the
    /// pre-sorted local entries, suppressing entries whose name duplicates
    /// the previously yielded one. On a name tie between the two streams,
    /// the remote entry wins (the `<=` in the merge below).
    #[derive(Debug)]
    pub struct ReaddirIter {
        remote: RemoteIter,
        local: LocalIter,
        // Peeked-but-not-yet-yielded head of the remote stream.
        next_remote: Option<ReaddirEntry>,
        // Peeked-but-not-yet-yielded head of the local stream.
        next_local: Option<ReaddirEntry>,
        // Most recently yielded entry, used to suppress duplicate names.
        last_entry: Option<ReaddirEntry>,
    }

    impl ReaddirIter {
        /// Create a merge iterator. `local_entries` must already be sorted.
        pub(super) fn new(
            bucket: &str,
            full_path: &str,
            page_size: usize,
            local_entries: VecDeque<ReaddirEntry>,
        ) -> Self {
            Self {
                remote: RemoteIter::new(bucket, full_path, page_size, true),
                local: LocalIter::new(local_entries),
                next_remote: None,
                next_local: None,
                last_entry: None,
            }
        }

        /// Return the next entry in name order, or `Ok(None)` when both
        /// streams are exhausted.
        pub(super) async fn next(&mut self, client: &impl ObjectClient) -> Result<Option<ReaddirEntry>, InodeError> {
            loop {
                // Refill the peeked heads of both streams if consumed.
                if self.next_remote.is_none() {
                    self.next_remote = self.remote.next(client).await?;
                }
                if self.next_local.is_none() {
                    self.next_local = self.local.next();
                }
                // Take the smaller of the two heads; remote wins ties.
                let next = match (&self.next_remote, &self.next_local) {
                    (Some(remote), Some(local)) => {
                        if remote <= local {
                            self.next_remote.take()
                        } else {
                            self.next_local.take()
                        }
                    }
                    (Some(_), None) => self.next_remote.take(),
                    (None, _) => self.next_local.take(),
                };
                // Suppress entries that duplicate the previously yielded name
                // (the first occurrence — already yielded — wins).
                match (next, &self.last_entry) {
                    (Some(entry), Some(last_entry)) => {
                        if last_entry.name() == entry.name() {
                            // Fixed grammar in the log message ("exist" -> "exists").
                            warn!(
                                "{} is omitted because another {} exists with the same name",
                                entry.description(),
                                last_entry.description(),
                            );
                        } else {
                            self.last_entry = Some(entry.clone());
                            return Ok(Some(entry));
                        }
                    }
                    (Some(entry), None) => {
                        self.last_entry = Some(entry.clone());
                        return Ok(Some(entry));
                    }
                    _ => return Ok(None),
                }
            }
        }
    }

    /// FIFO over the pre-sorted local (not-yet-uploaded) entries.
    #[derive(Debug)]
    struct LocalIter {
        entries: VecDeque<ReaddirEntry>,
    }

    impl LocalIter {
        fn new(entries: VecDeque<ReaddirEntry>) -> Self {
            Self { entries }
        }

        fn next(&mut self) -> Option<ReaddirEntry> {
            self.entries.pop_front()
        }
    }
}
mod unordered {
    use std::collections::HashMap;

    use super::*;

    /// Readdir iterator for S3 flavors without ordered listings: yields every
    /// remote entry first (in whatever order the server returns them), then
    /// any local entries whose names never appeared remotely.
    #[derive(Debug)]
    pub struct ReaddirIter {
        remote: RemoteIter,
        // Local entries keyed by name; shrinks as remote entries shadow them.
        local: HashMap<String, ReaddirEntry>,
        // Queue of surviving local entries, filled once the remote stream ends.
        local_iter: VecDeque<ReaddirEntry>,
    }

    impl ReaddirIter {
        pub(super) fn new(
            bucket: &str,
            full_path: &str,
            page_size: usize,
            local_entries: VecDeque<ReaddirEntry>,
        ) -> Self {
            let mut local_map = HashMap::with_capacity(local_entries.len());
            for entry in local_entries {
                let ReaddirEntry::LocalInode { lookup } = &entry else {
                    unreachable!("local entries are always LocalInode");
                };
                local_map.insert(lookup.inode.name().to_owned(), entry);
            }
            Self {
                remote: RemoteIter::new(bucket, full_path, page_size, false),
                local: local_map,
                local_iter: VecDeque::new(),
            }
        }

        /// Return the next entry: remote entries until the listing ends, then
        /// the remaining (unshadowed) local entries.
        pub(super) async fn next(&mut self, client: &impl ObjectClient) -> Result<Option<ReaddirEntry>, InodeError> {
            match self.remote.next(client).await? {
                Some(remote) => {
                    // A remote entry shadows any local entry of the same name.
                    self.local.remove(remote.name());
                    Ok(Some(remote))
                }
                None => {
                    // Remote stream exhausted: drain survivors into the queue.
                    if !self.local.is_empty() {
                        self.local_iter.extend(self.local.drain().map(|(_, entry)| entry));
                    }
                    Ok(self.local_iter.pop_front())
                }
            }
        }
    }
}
/// A single directory entry as produced for a readdir reply.
#[derive(Debug, Clone)]
pub struct DirectoryEntryReaddir {
    pub lookup: LookedUpInode,
    // Offset of this entry within the directory stream.
    pub offset: i64,
    pub name: OsString,
    pub generation: u64,
}

/// State for one open directory handle.
#[derive(Debug)]
pub struct DirHandle {
    #[allow(unused)]
    ino: InodeNo,
    pub handle: AsyncMutex<ReaddirHandle>,
    // Current position in the directory stream; see `offset`/`next_offset`/
    // `rewind_offset`.
    offset: AtomicI64,
    // The (offset, entries) of the last readdir reply — presumably cached to
    // replay repeated requests at the same offset; verify against callers.
    pub last_response: AsyncMutex<Option<(i64, Vec<DirectoryEntryReaddir>)>>,
}
impl DirHandle {
    /// Wrap a [`ReaddirHandle`] in a new handle positioned at offset 0 with
    /// no cached response.
    pub fn new(ino: InodeNo, readdir_handle: ReaddirHandle) -> Self {
        Self {
            ino,
            offset: AtomicI64::default(),
            handle: AsyncMutex::new(readdir_handle),
            last_response: AsyncMutex::new(None),
        }
    }

    /// Current position in the directory stream.
    pub fn offset(&self) -> i64 {
        self.offset.load(Ordering::SeqCst)
    }

    /// Advance the position by one entry.
    pub fn next_offset(&self) {
        self.offset.fetch_add(1, Ordering::SeqCst);
    }

    /// Reset the position back to the start of the stream.
    pub fn rewind_offset(&self) {
        self.offset.store(0, Ordering::SeqCst);
    }
}
#[cfg(test)]
mod tests {
    use crate::fs::{FUSE_ROOT_INODE, OpenFlags};
    use crate::metablock::{AddDirEntryResult, InodeKind, Metablock};
    use crate::s3::{Bucket, S3Path};
    use crate::superblock::Superblock;
    use crate::sync::Arc;
    use mountpoint_s3_client::mock_client::MockClient;

    /// Regression test for a readdir race: a readdir handle is created while
    /// a file is still being written, then the write finishes before the
    /// readdir runs. The listing must still complete without error.
    #[tokio::test]
    async fn test_readdir_race_condition() {
        let bucket = Bucket::new("test-bucket").unwrap();
        let client = Arc::new(MockClient::config().bucket(bucket.to_string()).build());
        let superblock = Superblock::new(
            client.clone(),
            S3Path::new(bucket, Default::default()),
            Default::default(),
        );
        let filename = "test_file.txt";
        let write_file_handle = 1;
        // Create a local file and open it for writing so it becomes a
        // "writing child" of the root directory.
        let lookup = superblock
            .create(FUSE_ROOT_INODE, filename.as_ref(), InodeKind::File)
            .await
            .expect("Create failed");
        superblock
            .open_handle(
                lookup.ino(),
                write_file_handle,
                &Default::default(),
                OpenFlags::O_WRONLY,
            )
            .await
            .expect("Start writing failed");
        // Snapshot the readdir handle while the write is still in flight...
        let readdir_handle = superblock
            .new_readdir_handle(FUSE_ROOT_INODE)
            .await
            .expect("Failed to create readdir handle");
        // ...then finish the write before the listing is consumed.
        superblock
            .finish_writing(lookup.ino(), None, write_file_handle)
            .await
            .expect("Finish writing failed");
        // The listing must succeed even though the child's write state
        // changed between handle creation and iteration.
        superblock
            .readdir(
                FUSE_ROOT_INODE,
                readdir_handle,
                0,
                false,
                Box::new(|_, _, _, _| AddDirEntryResult::EntryAdded),
            )
            .await
            .expect("Readdir failed");
    }
}