use std::path::Path;
#[cfg(test)]
use crate::AppContext;
use crate::{
persistence::{
files::{entry_layer::EntryLayer, user_quota_layer::UserQuotaLayer},
lmdb::LmDB,
},
shared::webdav::EntryPath,
storage_config::StorageConfigToml,
};
use bytes::Bytes;
use futures_util::{stream::StreamExt, Stream};
#[cfg(test)]
use opendal::Buffer;
use opendal::Operator;
use super::{FileIoError, FileMetadata, FileMetadataBuilder, FileStream, WriteStreamError};
pub fn build_storage_operator(
storage_config: &StorageConfigToml,
data_directory: &Path,
db: &LmDB,
user_quota_bytes: u64,
) -> Result<Operator, FileIoError> {
let user_quota_layer = UserQuotaLayer::new(db.clone(), user_quota_bytes);
let entry_layer = EntryLayer::new(db.clone());
let builder = match storage_config {
StorageConfigToml::FileSystem => {
let files_dir = match data_directory.join("data/files").to_str() {
Some(path) => path.to_string(),
None => {
return Err(FileIoError::OpenDAL(opendal::Error::new(
opendal::ErrorKind::Unexpected,
"Invalid path",
)))
}
};
let builder = opendal::services::Fs::default().root(files_dir.as_str());
opendal::Operator::new(builder)?
.layer(user_quota_layer)
.layer(entry_layer)
.finish()
}
#[cfg(feature = "storage-gcs")]
StorageConfigToml::GoogleBucket(config) => {
tracing::info!(
"Store files in a Google Cloud Storage bucket: {}",
config.bucket_name
);
let builder = config.to_builder()?;
opendal::Operator::new(builder)?
.layer(user_quota_layer)
.layer(entry_layer)
.finish()
}
#[cfg(any(feature = "storage-memory", test))]
StorageConfigToml::InMemory => {
tracing::info!("Store files in memory");
let builder = opendal::services::Memory::default();
opendal::Operator::new(builder)?
.layer(user_quota_layer)
.layer(entry_layer)
.finish()
}
};
Ok(builder)
}
/// Build a storage [`Operator`] from a test [`AppContext`].
///
/// A configured quota of `0` MB means "unlimited" (`u64::MAX`); any other
/// value is converted from megabytes to bytes. The conversion saturates
/// rather than wrapping, so an absurdly large configured value cannot
/// silently overflow into a tiny quota in release builds.
#[cfg(test)]
pub fn build_storage_operator_from_context(context: &AppContext) -> Result<Operator, FileIoError> {
    let quota_bytes = match context.config_toml.general.user_storage_quota_mb {
        0 => u64::MAX,
        mb => mb.saturating_mul(1024 * 1024),
    };
    build_storage_operator(
        &context.config_toml.storage,
        context.data_dir.path(),
        &context.db,
        quota_bytes,
    )
}
/// Size in bytes of each chunk requested from storage when streaming a file.
const CHUNK_SIZE: usize = 16 * 1024;
/// Thin wrapper around an OpenDAL [`Operator`] that provides streaming
/// read/write/delete helpers for entry files.
#[derive(Debug, Clone)]
pub struct OpendalService {
    // Crate-visible so sibling modules can reach the raw operator directly.
    pub(crate) operator: Operator,
}
impl OpendalService {
    /// Create a service from the TOML storage configuration.
    ///
    /// # Errors
    /// Propagates any [`FileIoError`] from building the underlying operator.
    pub fn new_from_config(
        config: &StorageConfigToml,
        data_directory: &Path,
        db: &LmDB,
        user_quota_bytes: u64,
    ) -> Result<Self, FileIoError> {
        let operator = build_storage_operator(config, data_directory, db, user_quota_bytes)?;
        Ok(Self { operator })
    }

    /// Delete the file at `path`.
    pub async fn delete(&self, path: &EntryPath) -> Result<(), FileIoError> {
        self.operator
            .delete(path.as_str())
            .await
            .map_err(FileIoError::OpenDAL)
    }

    /// Stream `stream` into the file at `path`, returning the metadata
    /// accumulated while writing (mime type guessed from the path, plus
    /// whatever `FileMetadataBuilder::update` derives from the chunks).
    ///
    /// On a stream or write error the partial upload is aborted (best
    /// effort) and the original error is returned. A quota violation
    /// surfaced by the quota layer when closing the writer is mapped to
    /// [`FileIoError::DiskSpaceQuotaExceeded`].
    pub async fn write_stream(
        &self,
        path: &EntryPath,
        mut stream: impl Stream<Item = Result<Bytes, WriteStreamError>> + Unpin + Send,
    ) -> Result<FileMetadata, FileIoError> {
        let mut writer = self.operator.writer(path.as_str()).await?;
        let mut metadata_builder = FileMetadataBuilder::default();
        metadata_builder.guess_mime_type_from_path(path.path().as_str());
        let write_result: Result<(), FileIoError> = async {
            while let Some(chunk_result) = stream.next().await {
                let chunk = chunk_result?;
                metadata_builder.update(&chunk);
                writer.write(chunk).await?;
            }
            Ok(())
        }
        .await;
        match write_result {
            Ok(()) => {
                writer.close().await.map_err(|e| {
                    // The quota layer reports an exceeded quota as a
                    // RateLimited error; detect it by message so callers get
                    // a typed error. NOTE(review): string matching is fragile
                    // — confirm the layer cannot attach a typed source.
                    if e.kind() == opendal::ErrorKind::RateLimited
                        && e.to_string().contains("User quota exceeded")
                    {
                        FileIoError::DiskSpaceQuotaExceeded
                    } else {
                        FileIoError::OpenDAL(e)
                    }
                })?;
                Ok(metadata_builder.finalize())
            }
            Err(e) => {
                // Best-effort cleanup. Previously `writer.abort().await?`
                // replaced the original stream error with the abort error
                // when abort itself failed, masking the root cause; log the
                // abort failure instead and keep the original error.
                if let Err(abort_err) = writer.abort().await {
                    tracing::warn!("Failed to abort writer for {}: {}", path, abort_err);
                }
                Err(e)
            }
        }
    }

    /// Open a chunked byte stream over the whole file at `path`.
    async fn get_stream_inner(&self, path: &EntryPath) -> Result<FileStream, opendal::Error> {
        let reader = self
            .operator
            .reader_with(path.as_str())
            .chunk(CHUNK_SIZE)
            .await?;
        let stream = reader.into_bytes_stream(0..).await?;
        Ok(Box::new(stream))
    }

    /// Read the file at `path` as a stream of `Bytes` chunks.
    ///
    /// Both `NotFound` and `PermissionDenied` are mapped to
    /// [`FileIoError::NotFound`], so backends that deny access to missing
    /// prefixes behave like a plain miss; other errors are logged and
    /// wrapped in [`FileIoError::OpenDAL`].
    pub async fn get_stream(&self, path: &EntryPath) -> Result<FileStream, FileIoError> {
        match self.get_stream_inner(path).await {
            Ok(stream) => Ok(stream),
            Err(e) => match e.kind() {
                opendal::ErrorKind::NotFound => Err(FileIoError::NotFound),
                opendal::ErrorKind::PermissionDenied => {
                    tracing::warn!(
                        "Permission denied for path: {}. Treating as not found.",
                        path
                    );
                    Err(FileIoError::NotFound)
                }
                _ => {
                    tracing::error!("OpenDAL error for path {}: {}", path, e);
                    Err(FileIoError::OpenDAL(e))
                }
            },
        }
    }

    /// Whether a file exists at `path`.
    pub async fn exists(&self, path: &EntryPath) -> Result<bool, opendal::Error> {
        self.operator.exists(path.as_str()).await
    }
}
/// Test-only helpers. Note: the methods inside need no individual
/// `#[cfg(test)]` attributes — the impl block is already gated (the
/// originals on `get` and `write` were redundant and have been removed).
#[cfg(test)]
impl OpendalService {
    /// Build the service from a test [`AppContext`].
    pub fn new(context: &AppContext) -> Result<Self, FileIoError> {
        let operator = build_storage_operator_from_context(context)?;
        Ok(Self { operator })
    }

    /// Wrap an already-built operator.
    pub fn new_from_operator(operator: Operator) -> Self {
        Self { operator }
    }

    /// Read the whole file at `path` into memory (test convenience).
    pub async fn get(&self, path: &EntryPath) -> Result<Bytes, FileIoError> {
        let mut stream = self.get_stream(path).await?;
        let mut content = Vec::new();
        while let Some(chunk_result) = stream.next().await {
            let chunk = chunk_result?;
            content.extend_from_slice(&chunk);
        }
        Ok(Bytes::from(content))
    }

    /// Write an in-memory buffer to `path` as a single-chunk stream
    /// (test convenience around [`Self::write_stream`]).
    pub async fn write(
        &self,
        path: &EntryPath,
        buffer: impl Into<Buffer>,
    ) -> Result<FileMetadata, FileIoError> {
        let buffer: Buffer = buffer.into();
        let bytes = Bytes::from(buffer.to_vec());
        let stream = Box::pin(futures_util::stream::once(async move { Ok(bytes) }));
        self.write_stream(path, stream).await
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::persistence::files::opendal_test_operators::OpendalTestOperators;
    use crate::shared::webdav::WebDavPath;

    /// Building a filesystem-backed service succeeds and a never-written
    /// path does not exist.
    #[tokio::test]
    async fn test_build_storage_operator_from_config_file_system() {
        let mut context = AppContext::test();
        context.config_toml.storage = StorageConfigToml::FileSystem;
        let service =
            OpendalService::new(&context).expect("Failed to create OpenDAL service for testing");
        let pubky = pkarr::Keypair::random().public_key();
        context
            .db
            .create_user(&pubky)
            .expect("Failed to create user");
        let path = EntryPath::new(pubky, WebDavPath::new("/test.txt").unwrap());
        assert!(!service.exists(&path).await.unwrap());
    }

    /// Writing at/above the configured quota surfaces the typed
    /// `DiskSpaceQuotaExceeded` error.
    #[tokio::test]
    async fn test_quota_exceeded_error() {
        let mut context = AppContext::test();
        context.config_toml.general.user_storage_quota_mb = 1;
        let service =
            OpendalService::new(&context).expect("Failed to create OpenDAL service for testing");
        let pubky = pkarr::Keypair::random().public_key();
        context
            .db
            .create_user(&pubky)
            .expect("Failed to create user");
        let path = EntryPath::new(pubky, WebDavPath::new("/test.txt").unwrap());
        let write_result = service.write(&path, vec![42u8; 1024 * 1024]).await;
        assert!(write_result.is_err());
        assert!(matches!(
            write_result,
            Err(FileIoError::DiskSpaceQuotaExceeded)
        ));
    }

    /// Reading back a file of several CHUNK_SIZE multiples yields the exact
    /// original bytes and arrives in multiple chunks.
    ///
    /// (The original test duplicated both size/content assertions and
    /// claimed "10KB" when the payload is 5 * CHUNK_SIZE = 80 KiB; the
    /// duplicates were removed and the messages corrected.)
    #[tokio::test]
    async fn test_get_content_chunked() {
        let operators = OpendalTestOperators::new();
        for (_scheme, operator) in operators.operators() {
            let file_service = OpendalService::new_from_operator(operator);
            let pubkey = pkarr::Keypair::random().public_key();
            let path = EntryPath::new(pubkey, WebDavPath::new("/test.txt").unwrap());
            let should_chunk_count = 5;
            let test_data = vec![42u8; should_chunk_count * CHUNK_SIZE];
            file_service.write(&path, test_data.clone()).await.unwrap();
            let mut stream = file_service.get_stream(&path).await.unwrap();
            let mut collected_data = Vec::new();
            let mut count = 0;
            while let Some(chunk_result) = stream.next().await {
                count += 1;
                let chunk = chunk_result.unwrap();
                collected_data.extend_from_slice(&chunk);
            }
            assert_eq!(
                collected_data.len(),
                should_chunk_count * CHUNK_SIZE,
                "Total size should be {} bytes",
                should_chunk_count * CHUNK_SIZE
            );
            assert_eq!(
                collected_data, test_data,
                "Content should match original data"
            );
            assert!(
                count >= should_chunk_count,
                "Should have received at least {} chunks, got {}",
                should_chunk_count,
                count
            );
            file_service
                .delete(&path)
                .await
                .expect("Should delete file");
            assert!(
                !file_service.exists(&path).await.unwrap(),
                "File should not exist after deletion"
            );
        }
    }

    /// write_stream persists a multi-chunk stream byte-for-byte.
    #[tokio::test]
    async fn test_write_content_stream() {
        let operators = OpendalTestOperators::new();
        for (_scheme, operator) in operators.operators() {
            let file_service = OpendalService::new_from_operator(operator);
            let pubkey = pkarr::Keypair::random().public_key();
            let path = EntryPath::new(pubkey, WebDavPath::new("/test_stream.txt").unwrap());
            let chunk_count = 3;
            let mut test_data = Vec::new();
            let mut chunks = Vec::new();
            for i in 0..chunk_count {
                let chunk_data = vec![i as u8; CHUNK_SIZE];
                test_data.extend_from_slice(&chunk_data);
                chunks.push(Ok(Bytes::from(chunk_data)));
            }
            let stream = futures_util::stream::iter(chunks);
            file_service.write_stream(&path, stream).await.unwrap();
            let read_content = file_service.get(&path).await.unwrap();
            assert_eq!(
                read_content.len(),
                test_data.len(),
                "Content length should match"
            );
            assert_eq!(
                read_content.to_vec(),
                test_data,
                "Content should match original data"
            );
            file_service
                .delete(&path)
                .await
                .expect("Should delete file");
            assert!(
                !file_service.exists(&path).await.unwrap(),
                "File should not exist after deletion"
            );
        }
    }
}