use std::str::FromStr as _;
use mountpoint_s3_client::ObjectClient;
use mountpoint_s3_client::types::ETag;
use tracing::{debug, error};
use crate::fs::InodeError;
use crate::metablock::{Lookup, Metablock, NewHandle, PendingUploadHook, ReadWriteMode, S3Location};
use crate::object::ObjectId;
use crate::prefetch::{HandleId, PrefetchGetObject};
use crate::sync::{Arc, AsyncMutex};
use crate::upload::{AppendUploadRequest, UploadRequest};
use super::{Error, InodeNo, OpenFlags, S3Filesystem, ToErrno};
/// An open file handle: which inode it refers to, where that inode lives in S3, and the
/// read/write state of the handle.
#[derive(Debug)]
pub struct FileHandle<Client: ObjectClient + Clone + Send + Sync + 'static> {
    /// Inode number the handle was opened on.
    pub ino: InodeNo,
    /// S3 location (bucket and key) of the object backing the inode.
    pub location: S3Location,
    /// Read or write state of the handle, kept behind an `AsyncMutex`.
    pub state: AsyncMutex<FileHandleState<Client>>,
    /// PID recorded when the handle was opened.
    /// NOTE(review): presumably compared with the caller's PID when completing uploads; confirm.
    pub open_pid: u32,
}
impl<Client: ObjectClient + Clone + Send + Sync + 'static> FileHandle<Client> {
    /// Returns the file name of this handle, as reported by [`S3Location::name`].
    pub fn file_name(&self) -> &str {
        let location = &self.location;
        location.name()
    }
}
/// State held by an open file handle: either an in-flight read or an in-flight upload.
#[derive(Debug)]
pub enum FileHandleState<Client: ObjectClient + Clone + Send + Sync + 'static> {
    /// Handle opened for reading.
    Read {
        /// Prefetching GET request that serves reads on this handle.
        request: PrefetchGetObject<Client>,
        /// Flush flag for the handle (initialised to `false` on open).
        flushed: bool,
    },
    /// Handle opened for writing.
    Write {
        /// Progress of the upload receiving this handle's writes.
        state: UploadState<Client>,
        /// Flush flag for the handle (initialised to `false` on open).
        flushed: bool,
    },
}
impl<Client> FileHandleState<Client>
where
Client: ObjectClient + Clone + Send + Sync,
{
pub async fn new(
fh: u64,
handle: &NewHandle,
flags: OpenFlags,
fs: &S3Filesystem<Client>,
) -> Result<FileHandleState<Client>, Error> {
let ino = handle.lookup.ino();
let stat = handle.lookup.stat();
let location = handle.lookup.s3_location()?;
let full_key = location.full_key();
let bucket = location.bucket_name();
match handle.mode {
ReadWriteMode::Read => {
let object_size = stat.size as u64;
let etag = match &stat.etag {
None => return Err(err!(libc::EBADF, "no E-Tag for inode {}", ino)),
Some(etag) => ETag::from_str(etag).expect("E-Tag should be set"),
};
let object_id = ObjectId::new(full_key.into(), etag);
let request = fs
.prefetcher
.prefetch(bucket.to_string(), object_id, HandleId::new(fh), object_size);
let handle = FileHandleState::Read {
request,
flushed: false,
};
metrics::gauge!("fs.current_handles", "type" => "read").increment(1.0);
Ok(handle)
}
ReadWriteMode::Write => {
let is_truncate = flags.contains(OpenFlags::O_TRUNC);
let write_mode = fs.config.write_mode();
let upload_state = if write_mode.incremental_upload {
let initial_etag = if is_truncate {
None
} else {
stat.etag.as_ref().map(|e| e.into())
};
let current_offset = if is_truncate { 0 } else { stat.size as u64 };
let request = fs.uploader.start_incremental_upload(
bucket.to_string(),
full_key.into(),
current_offset,
initial_etag.clone(),
);
UploadState::AppendInProgress {
request,
initial_etag,
written_bytes: 0,
}
} else {
let request = fs
.uploader
.start_atomic_upload(bucket.to_string(), full_key.into())
.map_err(|e| err!(libc::EIO, source:e, "put failed to start"))?;
UploadState::MPUInProgress { request }
};
let handle = FileHandleState::Write {
state: upload_state,
flushed: false,
};
metrics::gauge!("fs.current_handles", "type" => "write").increment(1.0);
Ok(handle)
}
}
}
}
/// Progress of the upload behind a write handle.
#[derive(Debug)]
pub enum UploadState<Client>
where
    Client: ObjectClient + Send + Sync,
{
    /// An incremental (append) upload is in progress.
    AppendInProgress {
        /// Request receiving the appended data.
        request: AppendUploadRequest<Client>,
        /// Most recent known ETag of the object, if any.
        initial_etag: Option<ETag>,
        /// Number of bytes accepted by writes through this handle so far.
        written_bytes: usize,
    },
    /// An atomic (multipart) upload is in progress.
    MPUInProgress {
        /// Request receiving the written data.
        request: UploadRequest<Client>,
    },
    /// The upload finished; no further writes are accepted.
    Completed,
    /// The upload failed with the contained errno, which later operations report.
    Failed(libc::c_int),
}
impl<Client> UploadState<Client>
where
    Client: ObjectClient + Send + Sync + Clone + 'static,
{
    /// Writes `data` at `offset` into the in-progress upload of `handle`.
    ///
    /// On success, the inode's size in the metablock is increased by the number of bytes written,
    /// and that count is returned. If the upload request rejects the write, the state becomes
    /// [`UploadState::Failed`] and the inode's write status is finished without an ETag before
    /// the error is returned.
    ///
    /// # Errors
    /// * `EIO` if the upload has already completed.
    /// * The recorded errno if the upload previously failed.
    /// * The upload or metablock error raised by this write.
    pub async fn write(
        &mut self,
        fs: &S3Filesystem<Client>,
        handle: &FileHandle<Client>,
        offset: i64,
        data: &[u8],
        fh: u64,
    ) -> Result<u32, Error> {
        let result: Result<_, Error> = match self {
            UploadState::AppendInProgress {
                request, written_bytes, ..
            } => {
                // NOTE(review): assumes `offset` is non-negative; a negative value would wrap
                // when cast to `u64`.
                match request.write(offset as u64, data).await {
                    Ok(len) => {
                        *written_bytes += len;
                        Ok(len)
                    }
                    Err(e) => Err(e.into()),
                }
            }
            UploadState::MPUInProgress { request, .. } => match request.write(offset, data).await {
                Ok(len) => Ok(len),
                Err(e) => Err(e.into()),
            },
            UploadState::Completed => {
                return Err(err!(libc::EIO, "upload already completed for key {}", handle.location));
            }
            UploadState::Failed(e) => {
                return Err(err!(*e, "upload already aborted for key {}", handle.location));
            }
        };
        match result {
            Ok(len) => {
                fs.metablock.inc_file_size(handle.ino, len).await?;
                // NOTE(review): presumably `len <= data.len()`, and a single write request stays
                // well below 4 GiB, so this cast does not truncate.
                Ok(len as u32)
            }
            Err(e) => {
                // Poison the state so later calls report this errno, then release the inode's
                // write status because this upload can no longer finish.
                match std::mem::replace(self, UploadState::Failed(e.to_errno())) {
                    UploadState::MPUInProgress { .. } | UploadState::AppendInProgress { .. } => {
                        Self::finish_on_error(fs.metablock.clone(), handle.ino, &handle.location, fh).await;
                    }
                    UploadState::Failed(_) | UploadState::Completed => unreachable!("checked above"),
                }
                Err(e)
            }
        }
    }

    /// Makes the data written so far durable in S3.
    ///
    /// * Incremental (append) upload: the pending data is committed, then a fresh incremental
    ///   upload is started at the new offset. It carries over the newest known ETag and the
    ///   `written_bytes` count, so the handle stays writable.
    /// * Atomic upload: the upload is completed and the inode's write status is finished, leaving
    ///   the state [`UploadState::Completed`].
    ///
    /// If the upload is already completed this is a no-op. On failure the state becomes
    /// [`UploadState::Failed`] and the inode's write status is finished without an ETag.
    ///
    /// # Errors
    /// Returns the recorded errno if the upload previously failed, or the error from this commit.
    pub async fn commit(
        &mut self,
        fs: &S3Filesystem<Client>,
        handle: Arc<FileHandle<Client>>,
        fh: u64,
    ) -> Result<(), Error> {
        match &self {
            UploadState::Completed => return Ok(()),
            UploadState::Failed(e) => {
                return Err(err!(*e, "upload already aborted for key {}", handle.location));
            }
            _ => {}
        };
        match std::mem::replace(self, UploadState::Completed) {
            UploadState::AppendInProgress {
                request,
                initial_etag,
                written_bytes,
            } => {
                let current_offset = request.current_offset();
                let etag = match Self::commit_append(request, &handle.location).await {
                    Ok(etag) => etag,
                    Err(e) => {
                        *self = UploadState::Failed(e.to_errno());
                        // Every other upload-failure path releases the inode's write status.
                        // Without this, the inode would stay "being written" forever, because
                        // all later calls short-circuit on `Failed`.
                        Self::finish_on_error(fs.metablock.clone(), handle.ino, &handle.location, fh).await;
                        return Err(e.into());
                    }
                };
                // A commit that uploaded nothing returns no ETag; keep the previous one then.
                let initial_etag = etag.or(initial_etag);
                let request = fs.uploader.start_incremental_upload(
                    handle.location.bucket_name().to_owned(),
                    handle.location.full_key().to_string(),
                    current_offset,
                    initial_etag.clone(),
                );
                *self = UploadState::AppendInProgress {
                    request,
                    initial_etag,
                    written_bytes,
                };
            }
            UploadState::MPUInProgress { request, .. } => {
                Self::complete_upload(fs.metablock.clone(), handle.ino, &handle.location, request, fh)
                    .await
                    .inspect_err(|e| *self = UploadState::Failed(e.to_errno()))?;
            }
            UploadState::Failed(_) | UploadState::Completed => unreachable!("checked above"),
        }
        Ok(())
    }

    /// Finishes the upload when the handle is flushed or closed by caller `pid`.
    ///
    /// In these cases the upload is not finished here; instead the writer is flushed via
    /// `flush_writer`, which registers a `PendingUploadHook`:
    /// * Nothing has been written yet (`written_bytes == 0` / `size() == 0`).
    /// * `pid` belongs to a different process than `open_pid`.
    ///
    /// In both cases, append uploads are committed first. Otherwise the upload is completed, the
    /// inode's write status is finished, and the state becomes [`UploadState::Completed`].
    ///
    /// # Errors
    /// Returns the recorded errno if the upload previously failed, or the error from completing,
    /// committing, or flushing. A failed completion also moves the state to `Failed`.
    pub async fn complete(
        &mut self,
        fs: &S3Filesystem<Client>,
        handle: Arc<FileHandle<Client>>,
        pid: u32,
        open_pid: u32,
        fh: u64,
    ) -> Result<(), Error> {
        match self {
            UploadState::AppendInProgress { written_bytes, .. } => {
                if *written_bytes == 0 || !are_from_same_process(open_pid, pid) {
                    self.commit(fs, handle.clone(), fh).await?;
                    return Self::flush_writer(fs, handle.ino, handle.clone(), fh).await;
                }
            }
            UploadState::MPUInProgress { request, .. } => {
                if request.size() == 0 {
                    debug!(key=%handle.location, "not completing upload because nothing was written yet");
                    return Self::flush_writer(fs, handle.ino, handle.clone(), fh).await;
                }
                if !are_from_same_process(open_pid, pid) {
                    debug!(
                        key=%handle.location,
                        pid, open_pid, "not completing upload because current PID differs from PID at open",
                    );
                    return Self::flush_writer(fs, handle.ino, handle.clone(), fh).await;
                }
            }
            UploadState::Completed => return Ok(()),
            UploadState::Failed(e) => {
                return Err(err!(
                    *e,
                    "upload already aborted for key {:?}",
                    handle.location.full_key()
                ));
            }
        };
        match std::mem::replace(self, UploadState::Completed) {
            UploadState::AppendInProgress {
                request, initial_etag, ..
            } => Self::complete_append(
                fs.metablock.clone(),
                handle.ino,
                &handle.location,
                request,
                initial_etag,
                fh,
            )
            .await
            .inspect_err(|e| *self = UploadState::Failed(e.to_errno()))?,
            UploadState::MPUInProgress { request, .. } => {
                Self::complete_upload(fs.metablock.clone(), handle.ino, &handle.location, request, fh)
                    .await
                    .inspect_err(|e| *self = UploadState::Failed(e.to_errno()))?
            }
            UploadState::Failed(_) | UploadState::Completed => unreachable!("checked above"),
        };
        Ok(())
    }

    /// Completes an upload that was left pending, regardless of which process is calling.
    ///
    /// NOTE(review): presumably invoked through the `PendingUploadHook` registered by
    /// `flush_writer`; confirm against the metablock implementation.
    ///
    /// Returns `Ok(None)` if the upload was already completed or had failed. Otherwise it returns
    /// the inode lookup produced by finishing the write.
    ///
    /// # Errors
    /// Returns the upload/metablock error. The state then becomes [`UploadState::Failed`], so a
    /// later `commit`/`complete` on this handle reports the failure instead of `Ok(())`.
    pub async fn complete_pending_upload(
        &mut self,
        metablock: Arc<dyn Metablock>,
        ino: InodeNo,
        key: &S3Location,
        fh: u64,
    ) -> Result<Option<Lookup>, InodeError> {
        match self {
            UploadState::Completed | UploadState::Failed(_) => return Ok(None),
            _ => {}
        }
        let outcome = match std::mem::replace(self, UploadState::Completed) {
            UploadState::AppendInProgress {
                request, initial_etag, ..
            } => Self::complete_append(metablock, ino, key, request, initial_etag, fh).await,
            UploadState::MPUInProgress { request, .. } => {
                Self::complete_upload(metablock, ino, key, request, fh).await
            }
            UploadState::Failed(_) | UploadState::Completed => unreachable!("checked above"),
        };
        // Record the failure rather than leaving `Completed`, which would mask the lost upload.
        let lookup = outcome.inspect_err(|e| *self = UploadState::Failed(e.to_errno()))?;
        Ok(Some(lookup))
    }

    /// Completes an atomic upload and finishes the inode's write status.
    ///
    /// On success the status is finished with the ETag S3 returned. On failure it is finished
    /// without an ETag, and the upload error is returned.
    async fn complete_upload(
        metablock: Arc<dyn Metablock>,
        ino: InodeNo,
        key: &S3Location,
        upload: UploadRequest<Client>,
        fh: u64,
    ) -> Result<Lookup, InodeError> {
        let size = upload.size();
        match upload.complete().await {
            Ok(put_result) => {
                debug!(etag=?put_result.etag.as_str(), %key, size, "put succeeded");
                metablock.finish_writing(ino, Some(put_result.etag), fh).await
            }
            Err(e) => {
                Self::finish_on_error(metablock, ino, key, fh).await;
                Err(InodeError::upload_error(e, key.clone()))
            }
        }
    }

    /// Commits an incremental upload and finishes the inode's write status.
    ///
    /// The status is finished with the committed ETag. If nothing needed uploading, it falls back
    /// to `initial_etag`. On failure the status is finished without an ETag and the error is
    /// returned.
    async fn complete_append(
        metablock: Arc<dyn Metablock>,
        ino: InodeNo,
        key: &S3Location,
        upload: AppendUploadRequest<Client>,
        initial_etag: Option<ETag>,
        fh: u64,
    ) -> Result<Lookup, InodeError> {
        match Self::commit_append(upload, key).await {
            Ok(etag) => {
                let etag = etag.or(initial_etag);
                metablock.finish_writing(ino, etag, fh).await
            }
            Err(err) => {
                Self::finish_on_error(metablock, ino, key, fh).await;
                Err(err)
            }
        }
    }

    /// Completes the append request and returns the new ETag. Returns `None` when the request
    /// reports that no PUT was required.
    async fn commit_append(upload: AppendUploadRequest<Client>, key: &S3Location) -> Result<Option<ETag>, InodeError> {
        match upload.complete().await {
            Ok(Some(result)) => {
                debug!(%key, "put succeeded");
                Ok(Some(result.etag))
            }
            Ok(None) => {
                debug!(%key, "no put required");
                Ok(None)
            }
            Err(e) => Err(InodeError::upload_error(e, key.clone())),
        }
    }

    /// Best-effort: finishes the inode's write status without an ETag, and logs (rather than
    /// propagates) any metablock error.
    async fn finish_on_error(metablock: Arc<dyn Metablock>, ino: InodeNo, s3location: &S3Location, fh: u64) {
        if let Err(err) = metablock.finish_writing(ino, None, fh).await {
            error!(?err, key=?s3location.full_key(), "error updating the inode status");
        }
    }

    /// Flushes the inode's writer in the metablock for `fh`. It registers a `PendingUploadHook`
    /// so that the upload can be completed later.
    async fn flush_writer(
        fs: &S3Filesystem<Client>,
        ino: InodeNo,
        handle: Arc<FileHandle<Client>>,
        fh: u64,
    ) -> Result<(), Error> {
        let pending_upload_hook = PendingUploadHook::new(fs.metablock.clone(), handle, fh);
        fs.metablock.flush_writer(ino, fh, pending_upload_hook).await?;
        Ok(())
    }
}
/// Returns the thread-group id (the owning process id) of thread `pid`.
///
/// The value is read from the `Tgid:` field of `/proc/{pid}/task/{pid}/status`. Returns `None`:
/// * on macOS;
/// * when that file cannot be opened or read;
/// * when the file has no `Tgid:` line;
/// * when the value is not a valid `u32`.
fn get_tgid(pid: u32) -> Option<u32> {
    if cfg!(not(target_os = "macos")) {
        use std::fs::File;
        use std::io::{BufRead, BufReader};

        let path = format!("/proc/{pid}/task/{pid}/status");
        let file = File::open(path).ok()?;
        for line in BufReader::new(file).lines() {
            let line = line.ok()?;
            // `strip_prefix` never panics. Slicing at a fixed byte offset after matching only
            // "Tgid:" panicked on a bare "Tgid:" line or a non-ASCII separator. `trim` drops the
            // tab the kernel places before the value.
            if let Some(value) = line.strip_prefix("Tgid:") {
                return value.trim().parse::<u32>().ok();
            }
        }
    }
    None
}
/// Returns `true` when `pid1` and `pid2` belong to the same process.
///
/// They do if they are equal, or if both thread-group ids can be determined and match. The
/// second id is only looked up once the first one is known; an unknown id yields `false`.
fn are_from_same_process(pid1: u32, pid2: u32) -> bool {
    pid1 == pid2 || get_tgid(pid1).is_some_and(|group| get_tgid(pid2) == Some(group))
}