use mountpoint_s3_client::ObjectClient;
use std::pin::Pin;
use crate::fs::{FileHandle, FileHandleState};
use crate::sync::{Arc, AsyncMutex};
use super::{InodeError, Lookup, Metablock};
/// Handle for waiting on a pending upload.
///
/// The hook only wraps a shared pointer to its lock-protected state, so every clone refers to the
/// same underlying future and cached result.
#[derive(Debug, Clone)]
pub struct PendingUploadHook {
    /// Shared state: the upload-completion future plus its cached outcome.
    state: Arc<AsyncMutex<PendingUploadHookState>>,
}
/// Outcome of running the upload-completion future built by `PendingUploadHook::new`.
///
/// That future yields `Ok(None)` when the file handle is not in the `Write` state and otherwise
/// returns whatever `complete_pending_upload` produces.
type UploadCompletionResult = Result<Option<Lookup>, InodeError>;
/// Lock-protected contents of a `PendingUploadHook`.
struct PendingUploadHookState {
    /// Boxed future that completes the upload. It does no work until
    /// `PendingUploadHook::wait_for_completion` awaits it.
    future: Pin<Box<dyn Future<Output = UploadCompletionResult> + Send>>,
    /// Outcome of the upload, stored after the future has finished so that later waiters can
    /// return it without running anything again. `None` until then.
    result: Option<UploadCompletionResult>,
}
/// Manual `Debug` implementation: the boxed future is a trait object with no `Debug` bound, so
/// only the cached result can be printed.
impl std::fmt::Debug for PendingUploadHookState {
    /// Formats the state as `PendingUploadHookState { result: <result>, .. }`.
    ///
    /// The trailing `..` makes it explicit that the `future` field is deliberately left out.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("PendingUploadHookState")
            .field("result", &self.result)
            .finish_non_exhaustive()
    }
}
impl PendingUploadHook {
    /// Creates a hook that completes the pending upload of `handle` on demand.
    ///
    /// Nothing is executed here: the constructor only builds a future and stores it. Once
    /// [`wait_for_completion`](Self::wait_for_completion) drives that future, it locks the
    /// handle's state and, if the handle is in the `Write` state, delegates to
    /// `complete_pending_upload`; for any other state it resolves to `Ok(None)`.
    ///
    /// * `metablock` - passed through to `complete_pending_upload`.
    /// * `handle` - file handle whose state is locked and inspected when the future runs.
    /// * `fh` - passed through to `complete_pending_upload` unchanged.
    pub(crate) fn new<Client>(metablock: Arc<dyn Metablock>, handle: Arc<FileHandle<Client>>, fh: u64) -> Self
    where
        Client: ObjectClient + Clone + Send + Sync + 'static,
    {
        // Read these before `handle` is moved into the future below.
        let ino = handle.ino;
        let location = handle.location.clone();
        let state = Arc::new(AsyncMutex::new(PendingUploadHookState {
            future: Box::pin(async move {
                let mut fh_state = handle.state.lock().await;
                let FileHandleState::Write { state, .. } = &mut *fh_state else {
                    // Not a write handle: there is no upload to complete.
                    return Ok(None);
                };
                state.complete_pending_upload(metablock, ino, &location, fh).await
            }),
            result: None,
        }));
        Self { state }
    }

    /// Waits for the pending upload to complete and returns its outcome.
    ///
    /// The first call that runs to completion drives the stored future and caches its result;
    /// every later call returns a clone of that cached result. The hook's lock is held for the
    /// whole wait, so concurrent callers queue behind the one driving the upload.
    ///
    /// The future is polled in place rather than being moved out of the shared state. If this
    /// call is dropped before the upload finishes, the partially driven future stays in the hook
    /// and the next caller resumes it, instead of finding a placeholder and caching a spurious
    /// `Ok(None)`.
    pub async fn wait_for_completion(&self) -> UploadCompletionResult {
        let mut state = self.state.lock().await;
        if let Some(result) = &state.result {
            return result.clone();
        }
        let result = state.future.as_mut().await;
        // Cache the outcome so the finished future is never polled again.
        state.result = Some(result.clone());
        result
    }
}