use std::path::Path;
use aws_sdk_s3::primitives::ByteStream;
use aws_sdk_s3::types::Object;
use chrono::DateTime;
use chrono::Utc;
use tempfile::TempDir;
use tokio::fs;
use tokio::io::AsyncWriteExt;
use crate::checksum::calculate_sha256_chunked_checksum;
use crate::io::remote::RemoteObjectStream;
use crate::io::remote::S3Attributes;
use crate::uri::S3Uri;
use crate::Res;
use super::Storage;
/// Filesystem-backed storage rooted in a temporary directory.
///
/// The `Storage` methods implemented in this file resolve their path
/// arguments underneath `temp_dir` (see `relative_to_temp_dir`).
/// NOTE(review): presumably used as a test double for the real storage — confirm.
pub(crate) struct MockStorage {
/// Directory under which the path-taking `Storage` methods operate.
pub(crate) temp_dir: TempDir,
}
impl Clone for MockStorage {
    /// Builds a new `MockStorage` whose temporary directory is freshly
    /// created *inside* this storage's directory.
    ///
    /// The clone therefore has its own root: paths are resolved against the
    /// nested directory, not against the original's `temp_dir`.
    ///
    /// # Panics
    ///
    /// Panics if the nested temporary directory cannot be created.
    fn clone(&self) -> Self {
        let parent_dir = self.temp_dir.path();
        let temp_dir =
            TempDir::new_in(parent_dir).expect("Failed to create temporary directory");
        Self { temp_dir }
    }
}
impl Default for MockStorage {
    /// Creates a `MockStorage` backed by a newly created temporary directory
    /// (via `TempDir::new`).
    ///
    /// # Panics
    ///
    /// Panics if the temporary directory cannot be created.
    fn default() -> Self {
        let temp_dir = TempDir::new().expect("Failed to create temporary directory");
        Self { temp_dir }
    }
}
// No inherent methods are defined; the functionality of `MockStorage` lives in
// the `Clone`, `Default` and `Storage` impls in this file.
impl MockStorage {}
/// Re-roots `path` so that it always lives underneath `temp_dir`.
///
/// * A path already inside `temp_dir` is returned unchanged (component-wise).
/// * Any other path starting with `/` has that leading root removed and the
///   remainder is appended to `temp_dir`.
/// * A relative path is appended to `temp_dir` as is.
///
/// The path is not normalised: `..` components are kept verbatim.
pub fn relative_to_temp_dir(
    temp_dir: &impl AsRef<Path>,
    path: impl AsRef<Path>,
) -> impl AsRef<Path> {
    let base = temp_dir.as_ref();
    let requested = path.as_ref();
    // `strip_prefix` succeeds exactly when `starts_with` holds, so trying the
    // prefixes in order and falling back to the untouched path covers the
    // three cases above.
    let suffix = requested
        .strip_prefix(base)
        .or_else(|_| requested.strip_prefix("/"))
        .unwrap_or(requested);
    base.join(suffix)
}
/// Creates the parent directory of `path`, including any missing ancestors.
///
/// A path without a parent (e.g. a filesystem root or an empty path) has
/// nothing to create, so the call succeeds without touching the filesystem
/// instead of panicking.
///
/// # Errors
///
/// Propagates the I/O error reported by `tokio::fs::create_dir_all`.
async fn create_parent(path: impl AsRef<Path>) -> Res {
    if let Some(parent) = path.as_ref().parent() {
        fs::create_dir_all(parent).await?;
    }
    Ok(())
}
impl Storage for MockStorage {
async fn copy(&self, from: impl AsRef<Path>, to: impl AsRef<Path>) -> Res<u64> {
let from_path = relative_to_temp_dir(&self.temp_dir, &from);
let to_path = relative_to_temp_dir(&self.temp_dir, &to);
create_parent(&to_path).await?;
Ok(fs::copy(from_path, to_path).await?)
}
async fn rename(&self, from: impl AsRef<Path>, to: impl AsRef<Path>) -> Res {
let from_path = relative_to_temp_dir(&self.temp_dir, &from);
let to_path = relative_to_temp_dir(&self.temp_dir, &to);
create_parent(&to_path).await?;
Ok(fs::rename(from_path, to_path).await?)
}
async fn create_dir_all(&self, path: impl AsRef<Path>) -> Res {
let rel_path = relative_to_temp_dir(&self.temp_dir, &path);
Ok(fs::create_dir_all(rel_path).await?)
}
async fn remove_dir_all(&self, path: impl AsRef<Path>) -> Res {
let rel_path = relative_to_temp_dir(&self.temp_dir, &path);
Ok(fs::remove_dir_all(rel_path).await?)
}
async fn remove_file(&self, path: impl AsRef<Path>) -> Result<(), std::io::Error> {
let rel_path = relative_to_temp_dir(&self.temp_dir, &path);
fs::remove_file(rel_path).await
}
async fn exists(&self, path: impl AsRef<std::path::Path>) -> bool {
let rel_path = relative_to_temp_dir(&self.temp_dir, &path);
fs::metadata(rel_path).await.is_ok()
}
async fn modified_timestamp(
&self,
path: impl AsRef<Path>,
) -> Res<chrono::DateTime<chrono::Utc>> {
let rel_path = relative_to_temp_dir(&self.temp_dir, &path);
create_parent(&rel_path).await?;
let modified = fs::metadata(rel_path).await.map(|m| m.modified())??;
Ok(DateTime::<Utc>::from(modified))
}
async fn write_file(&self, path: impl AsRef<Path>, bytes: &[u8]) -> Res {
let rel_path = relative_to_temp_dir(&self.temp_dir, &path);
create_parent(&rel_path).await?;
Ok(fs::write(rel_path, bytes).await?)
}
async fn open_file(&self, path: impl AsRef<Path>) -> Res<fs::File> {
let rel_path = relative_to_temp_dir(&self.temp_dir, &path);
Ok(fs::File::open(rel_path).await?)
}
async fn create_file(&self, path: impl AsRef<Path>) -> Res<fs::File> {
let rel_path = relative_to_temp_dir(&self.temp_dir, &path);
create_parent(&rel_path).await?;
Ok(fs::File::create(rel_path).await?)
}
async fn read_dir(&self, path: impl AsRef<Path>) -> Res<fs::ReadDir> {
let rel_path = relative_to_temp_dir(&self.temp_dir, &path);
Ok(fs::read_dir(&rel_path).await?)
}
async fn read_file(&self, path: impl AsRef<Path>) -> Res<Vec<u8>> {
let rel_path = relative_to_temp_dir(&self.temp_dir, &path);
Ok(fs::read(&rel_path).await?)
}
async fn read_byte_stream(&self, path: impl AsRef<Path> + Send + Sync) -> Res<ByteStream> {
let rel_path = relative_to_temp_dir(&self.temp_dir, &path);
Ok(ByteStream::from_path(rel_path).await?)
}
async fn write_byte_stream(
&self,
path: impl AsRef<Path> + Send + Sync,
mut body: ByteStream,
) -> Res {
let rel_path = relative_to_temp_dir(&self.temp_dir, &path);
let mut file = fs::File::create(&rel_path).await?;
while let Some(bytes) = body.try_next().await? {
file.write_all(&bytes).await?;
}
file.flush().await?;
Ok(())
}
async fn get_object_attributes(
&self,
stream: RemoteObjectStream,
listing_uri: &S3Uri,
object: &Object,
) -> Res<S3Attributes> {
let reader = stream.body.into_async_read();
let size: u64 = object.size.unwrap_or(0).try_into()?;
let hash = calculate_sha256_chunked_checksum(reader, size).await?;
Ok(S3Attributes {
listing_uri: listing_uri.clone(),
object_uri: stream.uri,
hash,
size,
})
}
}