use crate::{shared::webdav::EntryPath, storage_config::StorageConfigToml};
use bytes::Bytes;
use futures_util::{stream::StreamExt, Stream};
#[cfg(test)]
use opendal::Buffer;
use opendal::Operator;
use std::path::Path;
use super::{FileIoError, FileMetadata, FileMetadataBuilder, FileStream, WriteStreamError};
/// Build an OpenDAL [`Operator`] from the storage configuration.
///
/// For the filesystem backend, files live under `<data_directory>/data/files`.
/// Other backends (GCS, in-memory) are compiled in behind feature flags.
///
/// # Errors
///
/// Returns [`FileIoError::OpenDAL`] when the data directory path is not valid
/// UTF-8 (the `Fs` builder requires a `&str` root) or when the backend
/// operator cannot be constructed.
pub fn build_storage_operator_from_config(
    config: &StorageConfigToml,
    data_directory: &Path,
) -> Result<Operator, FileIoError> {
    // Each arm yields a fully-finished `Operator`, not a builder.
    let operator = match config.clone() {
        StorageConfigToml::FileSystem => {
            let files_dir = match data_directory.join("data/files").to_str() {
                Some(path) => path.to_string(),
                None => {
                    return Err(FileIoError::OpenDAL(opendal::Error::new(
                        opendal::ErrorKind::Unexpected,
                        "Invalid path",
                    )))
                }
            };
            let builder = opendal::services::Fs::default().root(files_dir.as_str());
            opendal::Operator::new(builder)?.finish()
        }
        #[cfg(feature = "storage-gcs")]
        StorageConfigToml::GoogleBucket(config) => {
            tracing::info!(
                "Store files in a Google Cloud Storage bucket: {}",
                config.bucket_name
            );
            let builder = config.to_builder()?;
            opendal::Operator::new(builder)?.finish()
        }
        #[cfg(any(feature = "storage-memory", test))]
        StorageConfigToml::InMemory => {
            tracing::info!("Store files in memory");
            let builder = opendal::services::Memory::default();
            opendal::Operator::new(builder)?.finish()
        }
    };
    Ok(operator)
}
/// Chunk size (16 KiB) used when reading entries back as a byte stream.
const CHUNK_SIZE: usize = 16 * 1024;
/// Thin wrapper around an OpenDAL [`Operator`] providing read/write/delete
/// access to file entries addressed by [`EntryPath`].
#[derive(Debug, Clone)]
pub struct OpendalService {
    // The configured storage backend (filesystem, GCS, or in-memory).
    operator: Operator,
}
impl OpendalService {
    /// Create a service backed by the operator described in `config`.
    ///
    /// # Errors
    ///
    /// Propagates [`FileIoError`] from operator construction (invalid data
    /// directory path, backend setup failure).
    pub fn new_from_config(
        config: &StorageConfigToml,
        data_directory: &Path,
    ) -> Result<Self, FileIoError> {
        let operator = build_storage_operator_from_config(config, data_directory)?;
        Ok(Self { operator })
    }

    /// Delete the entry at `path`, mapping backend errors into [`FileIoError`].
    pub async fn delete(&self, path: &EntryPath) -> Result<(), FileIoError> {
        self.operator
            .delete(path.as_str())
            .await
            .map_err(FileIoError::OpenDAL)
    }

    /// Test helper: write an in-memory buffer by funnelling it through
    /// [`Self::write_stream`] as a single-chunk stream with no size limit.
    #[cfg(test)]
    pub async fn write(
        &self,
        path: &EntryPath,
        buffer: impl Into<Buffer>,
    ) -> Result<FileMetadata, FileIoError> {
        let buffer: Buffer = buffer.into();
        let bytes = Bytes::from(buffer.to_vec());
        let stream = Box::pin(futures_util::stream::once(async move { Ok(bytes) }));
        self.write_stream(path, stream, u64::MAX).await
    }

    /// Stream chunks into the entry at `path`, enforcing a byte quota.
    ///
    /// File metadata (mime type guessed from the path, plus whatever
    /// `FileMetadataBuilder` accumulates per chunk) is collected while writing.
    ///
    /// # Errors
    ///
    /// * [`FileIoError::DiskSpaceQuotaExceeded`] once the running byte count
    ///   exceeds `max_bytes`.
    /// * Any error produced by the input stream or the underlying writer.
    ///
    /// On failure the partially-written object is aborted best-effort; an
    /// abort failure is logged but never masks the original error.
    pub async fn write_stream(
        &self,
        path: &EntryPath,
        mut stream: impl Stream<Item = Result<Bytes, WriteStreamError>> + Unpin + Send,
        max_bytes: u64,
    ) -> Result<FileMetadata, FileIoError> {
        let mut writer = self.operator.writer(path.as_str()).await?;
        let mut metadata_builder = FileMetadataBuilder::default();
        metadata_builder.guess_mime_type_from_path(path.path().as_str());
        // Run the copy loop inside an async block so any `?` failure falls
        // through to the abort/cleanup handling below.
        let write_result: Result<(), FileIoError> = async {
            let mut bytes_counter = 0;
            while let Some(chunk_result) = stream.next().await {
                let chunk = chunk_result?;
                bytes_counter += chunk.len() as u64;
                if bytes_counter > max_bytes {
                    return Err(FileIoError::DiskSpaceQuotaExceeded);
                }
                metadata_builder.update(&chunk);
                writer.write(chunk).await?;
            }
            Ok(())
        }
        .await;
        match write_result {
            Ok(()) => {
                writer.close().await?;
                Ok(metadata_builder.finalize())
            }
            Err(e) => {
                // Best-effort cleanup: previously `writer.abort().await?`
                // could replace the meaningful error (e.g. quota exceeded)
                // with an abort failure. Log and return the original error.
                if let Err(abort_err) = writer.abort().await {
                    tracing::warn!("Failed to abort writer for {}: {}", path, abort_err);
                }
                Err(e)
            }
        }
    }

    /// Open a chunked byte stream over the entry at `path`.
    ///
    /// # Errors
    ///
    /// * [`FileIoError::NotFound`] when the entry is missing — permission
    ///   denied is deliberately mapped to not-found as well (and logged) so
    ///   callers cannot distinguish the two.
    /// * [`FileIoError::OpenDAL`] for any other backend error.
    pub async fn get_stream(&self, path: &EntryPath) -> Result<FileStream, FileIoError> {
        match self
            .operator
            .reader_with(path.as_str())
            .chunk(CHUNK_SIZE)
            .await
        {
            Ok(reader) => Ok(Box::new(reader.into_bytes_stream(0..).await?)),
            Err(e) => match e.kind() {
                opendal::ErrorKind::NotFound => Err(FileIoError::NotFound),
                opendal::ErrorKind::PermissionDenied => {
                    tracing::warn!(
                        "Permission denied for path: {}. Treating as not found.",
                        path
                    );
                    Err(FileIoError::NotFound)
                }
                _ => {
                    tracing::error!("OpenDAL error for path {}: {}", path, e);
                    Err(FileIoError::OpenDAL(e))
                }
            },
        }
    }

    /// Test helper: read a whole entry into memory via [`Self::get_stream`].
    #[cfg(test)]
    pub async fn get(&self, path: &EntryPath) -> Result<Bytes, FileIoError> {
        let mut stream = self.get_stream(path).await?;
        let mut content = Vec::new();
        while let Some(chunk_result) = stream.next().await {
            let chunk = chunk_result?;
            content.extend_from_slice(&chunk);
        }
        Ok(Bytes::from(content))
    }

    /// Test helper: check whether an entry exists in the backend.
    #[cfg(test)]
    pub async fn exists(&self, path: &EntryPath) -> Result<bool, opendal::Error> {
        self.operator.exists(path.as_str()).await
    }
}
#[cfg(test)]
mod tests {
    use crate::{
        shared::webdav::WebDavPath,
        storage_config::{GoogleBucketConfig, GoogleServiceAccountKeyConfig},
    };
    use super::*;

    /// Build a Google bucket config when a local service-account key file is
    /// present; otherwise skip GCS coverage with a notice.
    fn google_bucket_config() -> Option<StorageConfigToml> {
        let service_account_path = Path::new("../gcs-service-account.json").to_path_buf();
        if !service_account_path.exists() {
            println!("Google Bucket config not tested because no service account file is set.");
            return None;
        }
        Some(StorageConfigToml::GoogleBucket(GoogleBucketConfig {
            bucket_name: "homeserver-test".to_string(),
            credential: GoogleServiceAccountKeyConfig::Path(service_account_path),
        }))
    }

    /// All storage backends each test should run against.
    fn get_configs() -> Vec<StorageConfigToml> {
        let mut configs = vec![StorageConfigToml::FileSystem, StorageConfigToml::InMemory];
        if let Some(google_config) = google_bucket_config() {
            configs.push(google_config);
        }
        configs
    }

    #[tokio::test]
    async fn test_get_content_chunked() {
        let configs = get_configs();
        for config in configs {
            let file_service = OpendalService::new_from_config(&config, Path::new("/tmp/test"))
                .expect("Failed to create OpenDAL service for testing");
            let pubkey = pkarr::Keypair::random().public_key();
            let path = EntryPath::new(pubkey, WebDavPath::new("/test.txt").unwrap());
            let should_chunk_count = 5;
            let test_data = vec![42u8; should_chunk_count * CHUNK_SIZE];
            file_service.write(&path, test_data.clone()).await.unwrap();

            let mut stream = file_service.get_stream(&path).await.unwrap();
            let mut collected_data = Vec::new();
            let mut count = 0;
            while let Some(chunk_result) = stream.next().await {
                count += 1;
                let chunk = chunk_result.unwrap();
                collected_data.extend_from_slice(&chunk);
            }

            // Fixed: the messages previously claimed "10KB" although the
            // payload is 5 * CHUNK_SIZE = 80 KiB, and the size/content
            // assertions were duplicated verbatim.
            assert_eq!(
                collected_data.len(),
                should_chunk_count * CHUNK_SIZE,
                "Total size should be 5 * CHUNK_SIZE (80 KiB)"
            );
            assert_eq!(
                collected_data, test_data,
                "Content should match original data"
            );
            assert!(
                count >= should_chunk_count,
                "Should have received at least {should_chunk_count} chunks, got {count}"
            );

            file_service
                .delete(&path)
                .await
                .expect("Should delete file");
            assert!(
                !file_service.exists(&path).await.unwrap(),
                "File should not exist after deletion"
            );
        }
    }

    #[tokio::test]
    async fn test_write_content_stream() {
        let configs = get_configs();
        for config in configs {
            let file_service = OpendalService::new_from_config(&config, Path::new("/tmp/test"))
                .expect("Failed to create OpenDAL service for testing");
            let pubkey = pkarr::Keypair::random().public_key();
            let path = EntryPath::new(pubkey, WebDavPath::new("/test_stream.txt").unwrap());

            // Build a multi-chunk stream with distinguishable chunk contents.
            let chunk_count = 3;
            let mut test_data = Vec::new();
            let mut chunks = Vec::new();
            for i in 0..chunk_count {
                let chunk_data = vec![i as u8; CHUNK_SIZE];
                test_data.extend_from_slice(&chunk_data);
                chunks.push(Ok(Bytes::from(chunk_data)));
            }
            let stream = futures_util::stream::iter(chunks);
            file_service
                .write_stream(&path, stream, u64::MAX)
                .await
                .unwrap();

            let read_content = file_service.get(&path).await.unwrap();
            assert_eq!(
                read_content.len(),
                test_data.len(),
                "Content length should match"
            );
            assert_eq!(
                read_content.to_vec(),
                test_data,
                "Content should match original data"
            );

            file_service
                .delete(&path)
                .await
                .expect("Should delete file");
            assert!(
                !file_service.exists(&path).await.unwrap(),
                "File should not exist after deletion"
            );
        }
    }
}