use std::collections::HashMap;
use std::ffi::{OsStr, OsString};
use async_trait::async_trait;
use mountpoint_s3_client::types::ETag;
use time::OffsetDateTime;
use crate::fs::OpenFlags;
use crate::metablock::{
AddDirEntry, AddDirEntryResult, InodeError, InodeErrorInfo, InodeInformation, InodeKind, InodeNo, InodeStat,
Lookup, Metablock, NEVER_EXPIRE_TTL, NewHandle, PendingUploadHook, ROOT_INODE_NO, ReadWriteMode, S3Location,
ValidName, WriteMode,
};
use crate::s3::S3Path;
use crate::sync::atomic::{AtomicU64, Ordering};
use crate::sync::{Arc, Mutex, RwLock};
use super::core::{Manifest, ManifestDirIter, ManifestError};
#[derive(Debug)]
pub struct ManifestMetablock {
channels: Vec<Arc<S3Path>>,
mount_time: OffsetDateTime,
manifest: Manifest,
next_dir_handle_id: AtomicU64,
readdir_handles: RwLock<HashMap<u64, Arc<Mutex<ManifestDirIter>>>>,
}
impl ManifestMetablock {
pub fn new(manifest: Manifest) -> Result<Self, ManifestError> {
let channels = manifest.load_channels()?.into_iter().map(Arc::new).collect();
Ok(Self {
channels,
mount_time: OffsetDateTime::now_utc(),
manifest,
next_dir_handle_id: Default::default(),
readdir_handles: Default::default(),
})
}
fn get_parent_id(&self, ino: InodeNo) -> Result<InodeNo, InodeError> {
if ino == ROOT_INODE_NO {
return Ok(ROOT_INODE_NO);
};
let Some(manifest_entry) = self.manifest.manifest_lookup_by_id(ino)? else {
return Err(InodeError::InodeDoesNotExist(ino));
};
Ok(manifest_entry.parent_id())
}
fn stat_for_directory(&self) -> InodeStat {
InodeStat::for_directory(self.mount_time, NEVER_EXPIRE_TTL)
}
}
#[async_trait]
impl Metablock for ManifestMetablock {
async fn lookup(&self, parent_ino: InodeNo, name: &OsStr) -> Result<Lookup, InodeError> {
let name: ValidName = name.try_into()?;
let Some(manifest_entry) = self.manifest.manifest_lookup(parent_ino, &name)? else {
return Err(InodeError::FileDoesNotExist(
name.to_string(),
InodeErrorInfo {
ino: parent_ino,
key: "".into(), bucket: None,
},
));
};
let lookup = manifest_entry.into_lookup(&self.channels, self.mount_time)?;
Ok(lookup)
}
async fn getattr(&self, ino: InodeNo, _force_revalidate_if_remote: bool) -> Result<Lookup, InodeError> {
if ino == ROOT_INODE_NO {
return Ok(Lookup::new(ino, self.stat_for_directory(), InodeKind::Directory, None));
}
let Some(manifest_entry) = self.manifest.manifest_lookup_by_id(ino)? else {
return Err(InodeError::InodeDoesNotExist(ino));
};
let lookup = manifest_entry.into_lookup(&self.channels, self.mount_time)?;
Ok(lookup)
}
async fn new_readdir_handle(&self, dir_ino: InodeNo) -> Result<u64, InodeError> {
let readdir_handle_id = self.next_dir_handle_id.fetch_add(1, Ordering::SeqCst);
let readdir_handle = self.manifest.dir_iter(dir_ino);
self.readdir_handles
.write()
.expect("lock must succeed")
.insert(readdir_handle_id, Arc::new(Mutex::new(readdir_handle)));
Ok(readdir_handle_id)
}
async fn readdir<'a>(
&self,
parent: InodeNo,
fh: u64,
mut offset: i64,
_is_readdirplus: bool,
mut add: AddDirEntry<'a>,
) -> Result<(), InodeError> {
let Some(readdir_handle) = self
.readdir_handles
.read()
.expect("lock must succeed")
.get(&fh)
.cloned()
else {
return Err(InodeError::NoSuchDirHandle { fh });
};
if offset < 1 {
if add(
InodeInformation::new(parent, self.stat_for_directory(), InodeKind::Directory),
".".into(),
offset + 1,
0,
) == AddDirEntryResult::ReplyBufferFull
{
return Ok(());
}
offset += 1;
}
if offset < 2 {
let grandparent_ino = self.get_parent_id(parent)?;
if add(
InodeInformation::new(grandparent_ino, self.stat_for_directory(), InodeKind::Directory),
"..".into(),
offset + 1,
0,
) == AddDirEntryResult::ReplyBufferFull
{
return Ok(());
}
offset += 1;
}
let mut readdir_handle = readdir_handle.lock().expect("lock must succeed");
let shifted_offset = (offset - 2) as usize; readdir_handle.seek(shifted_offset)?; while let Some(manifest_entry) = readdir_handle.next_entry()? {
let (inode_info, name) = manifest_entry
.clone()
.into_inode_information(&self.channels, self.mount_time)?;
if add(inode_info, OsString::from(name), offset + 1, 0) == AddDirEntryResult::ReplyBufferFull {
readdir_handle.readd(manifest_entry);
break;
}
offset += 1;
}
Ok(())
}
async fn releasedir(&self, fh: u64) -> Result<(), InodeError> {
self.readdir_handles.write().expect("lock must succeed").remove(&fh);
Ok(())
}
async fn open_handle(
&self,
ino: InodeNo,
_fh: u64,
_write_mode: &WriteMode,
flags: OpenFlags,
) -> Result<NewHandle, InodeError> {
let lookup = self.getattr(ino, false).await?;
if flags.contains(OpenFlags::O_WRONLY) {
return Err(InodeError::InodeNotWritable(lookup.inode_err()));
}
Ok(NewHandle::read(lookup))
}
async fn finish_reading(&self, _ino: InodeNo, _file_handle: u64) -> Result<(), InodeError> {
Ok(())
}
async fn forget(&self, _ino: InodeNo, _n: u64) {
}
async fn create(&self, dir: InodeNo, _name: &OsStr, _kind: InodeKind) -> Result<Lookup, InodeError> {
Err(InodeError::InodeNotWritable(InodeErrorInfo {
ino: dir,
key: "".into(),
bucket: None,
}))
}
async fn inc_file_size(&self, ino: InodeNo, _len: usize) -> Result<usize, InodeError> {
Err(InodeError::InodeNotWritable(InodeErrorInfo {
ino,
key: "".into(),
bucket: None,
}))
}
async fn finish_writing(&self, ino: InodeNo, _etag: Option<ETag>, _fh: u64) -> Result<Lookup, InodeError> {
Err(InodeError::InodeNotWritable(InodeErrorInfo {
ino,
key: "".into(),
bucket: None,
}))
}
async fn rename(
&self,
src_parent_ino: InodeNo,
_src_name: &OsStr,
_dst_parent_ino: InodeNo,
_dst_name: &OsStr,
_allow_overwrite: bool,
) -> Result<(), InodeError> {
Err(InodeError::InodeNotWritable(InodeErrorInfo {
ino: src_parent_ino,
key: "".into(),
bucket: None,
}))
}
async fn rmdir(&self, parent_ino: InodeNo, _name: &OsStr) -> Result<(), InodeError> {
Err(InodeError::InodeNotWritable(InodeErrorInfo {
ino: parent_ino,
key: "".into(),
bucket: None,
}))
}
async fn unlink(&self, parent_ino: InodeNo, _name: &OsStr) -> Result<(), InodeError> {
Err(InodeError::InodeNotWritable(InodeErrorInfo {
ino: parent_ino,
key: "".into(),
bucket: None,
}))
}
async fn setattr(
&self,
ino: InodeNo,
_atime: Option<OffsetDateTime>,
_mtime: Option<OffsetDateTime>,
) -> Result<Lookup, InodeError> {
Err(InodeError::InodeNotWritable(InodeErrorInfo {
ino,
key: "".into(),
bucket: None,
}))
}
async fn try_reactivate_handle(&self, ino: InodeNo, _fh: u64, mode: ReadWriteMode) -> Result<bool, InodeError> {
match mode {
ReadWriteMode::Read => Ok(true),
ReadWriteMode::Write => Err(InodeError::InodeNotWritable(InodeErrorInfo {
ino,
key: "".into(),
bucket: None,
})),
}
}
async fn flush_reader(&self, _ino: InodeNo, _fh: u64) -> Result<(), InodeError> {
Ok(())
}
async fn flush_writer(
&self,
ino: InodeNo,
_fh: u64,
_pending_upload_hook: PendingUploadHook,
) -> Result<Option<PendingUploadHook>, InodeError> {
Err(InodeError::InodeNotWritable(InodeErrorInfo {
ino,
key: "".into(),
bucket: None,
}))
}
async fn release_writer(
&self,
ino: InodeNo,
_fh: u64,
_pending_upload_hook: PendingUploadHook,
_location: &S3Location,
) -> Result<(), InodeError> {
Err(InodeError::InodeNotWritable(InodeErrorInfo {
ino,
key: "".into(),
bucket: None,
}))
}
}