#[cfg(test)]
use crate::AppContext;
use crate::{
persistence::lmdb::{tables::entries::Entry, LmDB},
shared::webdav::EntryPath,
ConfigToml,
};
use bytes::Bytes;
use futures_util::Stream;
#[cfg(test)]
use futures_util::StreamExt;
#[cfg(test)]
use opendal::Buffer;
use std::path::Path;
use super::{FileIoError, FileStream, OpendalService, WriteStreamError};
#[derive(Debug, Clone)]
pub struct FileService {
pub(crate) opendal: OpendalService,
pub(crate) db: LmDB,
}
impl FileService {
pub fn new(opendal_service: OpendalService, db: LmDB) -> Self {
Self {
opendal: opendal_service,
db,
}
}
pub fn new_from_config(
config: &ConfigToml,
data_directory: &Path,
db: LmDB,
) -> Result<Self, FileIoError> {
let user_quota_bytes = match config.general.user_storage_quota_mb {
0 => u64::MAX,
other => other * 1024 * 1024,
};
let opendal_service = OpendalService::new_from_config(
&config.storage,
data_directory,
&db,
user_quota_bytes,
)?;
Ok(Self::new(opendal_service, db))
}
pub async fn get_info(&self, path: &EntryPath) -> Result<Entry, FileIoError> {
self.db.get_entry(path)
}
pub async fn get_stream(&self, path: &EntryPath) -> Result<FileStream, FileIoError> {
let stream: FileStream = self.opendal.get_stream(path).await?;
Ok(stream)
}
pub async fn write_stream(
&self,
path: &EntryPath,
stream: impl Stream<Item = Result<Bytes, WriteStreamError>> + Unpin + Send,
) -> Result<Entry, FileIoError> {
self.opendal.write_stream(path, stream).await?;
self.db.get_entry(path)
}
pub async fn delete(&self, path: &EntryPath) -> Result<(), FileIoError> {
if !self.opendal.exists(path).await? {
return Err(FileIoError::NotFound);
}
self.opendal.delete(path).await?;
Ok(())
}
}
#[cfg(test)]
impl FileService {
pub fn new_from_context(context: &AppContext) -> Result<Self, FileIoError> {
let opendal_service = OpendalService::new(context)?;
Ok(Self::new(opendal_service, context.db.clone()))
}
pub async fn get(&self, path: &EntryPath) -> Result<Bytes, FileIoError> {
let mut stream = self.get_stream(path).await?;
let mut collected_data = Vec::new();
while let Some(chunk_result) = stream.next().await {
let chunk = chunk_result?;
collected_data.extend_from_slice(&chunk);
}
Ok(Bytes::from(collected_data))
}
pub async fn write(&self, path: &EntryPath, data: Buffer) -> Result<Entry, FileIoError> {
let stream = futures_util::stream::iter(vec![Ok(Bytes::from(data.to_vec()))]);
let entry = self.write_stream(path, stream).await?;
Ok(entry)
}
}
#[cfg(test)]
mod tests {
use crate::{
persistence::files::user_quota_layer::FILE_METADATA_SIZE, shared::webdav::WebDavPath,
};
use futures_lite::StreamExt;
use super::*;
#[tokio::test]
async fn test_write_get_delete_lmdb_and_opendal() {
let context = AppContext::test();
let file_service = FileService::new_from_context(&context).unwrap();
let db = context.db.clone();
let pubkey = pkarr::Keypair::random().public_key();
db.create_user(&pubkey).unwrap();
assert_eq!(db.get_user_data_usage(&pubkey).unwrap(), Some(0));
let path = EntryPath::new(pubkey.clone(), WebDavPath::new("/test_lmdb.txt").unwrap());
match file_service.get_stream(&path).await {
Ok(_) => panic!("Should error for non-existent file"),
Err(FileIoError::NotFound) => {}
Err(e) => panic!("Should error for non-existent file: {}", e),
};
let test_data = b"Hello, world! This is test data for the get method.";
let chunks = vec![Ok(Bytes::from(test_data.as_slice()))];
let stream = futures_util::stream::iter(chunks);
file_service.write_stream(&path, stream).await.unwrap();
assert_eq!(
db.get_user_data_usage(&pubkey).unwrap(),
Some(test_data.len() as u64 + FILE_METADATA_SIZE),
"Data usage should be the size of the file"
);
let mut stream = file_service
.get_stream(&path)
.await
.expect("File should exist");
let mut collected_data = Vec::new();
while let Some(chunk_result) = stream.next().await {
let chunk = chunk_result.unwrap();
collected_data.extend_from_slice(&chunk);
}
assert_eq!(
collected_data,
test_data.to_vec(),
"Content should match original data for LMDB location"
);
file_service.delete(&path).await.unwrap();
let result = file_service.get_stream(&path).await;
assert!(result.is_err(), "Should error for deleted file");
assert_eq!(
db.get_user_data_usage(&pubkey).unwrap(),
Some(0),
"Data usage should be 0 after deleting file"
);
let path = EntryPath::new(
pubkey.clone(),
WebDavPath::new("/test_opendal.txt").unwrap(),
);
let chunks = vec![Ok(Bytes::from(test_data.as_slice()))];
let stream = futures_util::stream::iter(chunks);
file_service.write_stream(&path, stream).await.unwrap();
assert_eq!(
db.get_user_data_usage(&pubkey).unwrap(),
Some(test_data.len() as u64 + FILE_METADATA_SIZE),
"Data usage should be the size of the file"
);
let mut stream = file_service
.get_stream(&path)
.await
.expect("File should exist");
let mut collected_data = Vec::new();
while let Some(chunk_result) = stream.next().await {
let chunk = chunk_result.unwrap();
collected_data.extend_from_slice(&chunk);
}
assert_eq!(
collected_data,
test_data.to_vec(),
"Content should match original data for OpenDal location"
);
file_service.delete(&path).await.unwrap();
let result = file_service.get_stream(&path).await;
assert!(result.is_err(), "Should error for deleted file");
assert_eq!(
db.get_user_data_usage(&pubkey).unwrap(),
Some(0),
"Data usage should be 0 after deleting file"
);
}
#[tokio::test]
async fn test_write_get_basic() {
let context = AppContext::test();
let file_service = FileService::new_from_context(&context).unwrap();
let db = context.db.clone();
let pubkey = pkarr::Keypair::random().public_key();
db.create_user(&pubkey).unwrap();
let test_data = b"Hello, world!";
let buffer = Buffer::from(test_data.as_slice());
let lmdb_path = EntryPath::new(pubkey.clone(), WebDavPath::new("/test_lmdb.txt").unwrap());
file_service
.write(&lmdb_path, buffer.clone())
.await
.unwrap();
let content = file_service.get(&lmdb_path).await.unwrap();
assert_eq!(content.as_ref(), test_data);
let opendal_path = EntryPath::new(pubkey, WebDavPath::new("/test_opendal.txt").unwrap());
file_service.write(&opendal_path, buffer).await.unwrap();
let content = file_service.get(&opendal_path).await.unwrap();
assert_eq!(content.as_ref(), test_data);
}
#[tokio::test]
async fn test_data_usage_update_basic() {
let mut context = AppContext::test();
context.config_toml.general.user_storage_quota_mb = 1;
let file_service = FileService::new_from_context(&context).unwrap();
let db = context.db.clone();
let pubkey = pkarr::Keypair::random().public_key();
db.create_user(&pubkey).unwrap();
let path = EntryPath::new(pubkey.clone(), WebDavPath::new("/test_lmdb.txt").unwrap());
let test_data = vec![1u8; 1024];
let buffer = Buffer::from(test_data.clone());
file_service.write(&path, buffer).await.unwrap();
assert_eq!(
db.get_user_data_usage(&pubkey).unwrap(),
Some(test_data.len() as u64 + FILE_METADATA_SIZE)
);
file_service.delete(&path).await.unwrap();
assert_eq!(db.get_user_data_usage(&pubkey).unwrap(), Some(0));
}
#[tokio::test]
async fn test_data_usage_override_existing_entry() {
let mut context = AppContext::test();
context.config_toml.general.user_storage_quota_mb = 1;
let file_service = FileService::new_from_context(&context).unwrap();
let db = context.db.clone();
let pubkey = pkarr::Keypair::random().public_key();
db.create_user(&pubkey).unwrap();
let path = EntryPath::new(pubkey.clone(), WebDavPath::new("/test_lmdb.txt").unwrap());
let test_data = vec![1u8; 1024];
let buffer = Buffer::from(test_data.clone());
file_service.write(&path, buffer).await.unwrap();
let test_data2 = vec![2u8; 1024];
let buffer2 = Buffer::from(test_data2.clone());
let path = EntryPath::new(pubkey.clone(), WebDavPath::new("/test_lmdb.txt").unwrap());
file_service.write(&path, buffer2).await.unwrap();
assert_eq!(
db.get_user_data_usage(&pubkey).unwrap(),
Some(test_data2.len() as u64 + FILE_METADATA_SIZE)
);
}
#[tokio::test]
async fn test_data_usage_exactly_to_quota() {
let mut context = AppContext::test();
context.config_toml.general.user_storage_quota_mb = 1;
let file_service = FileService::new_from_context(&context).unwrap();
let db = context.db.clone();
let pubkey = pkarr::Keypair::random().public_key();
db.create_user(&pubkey).unwrap();
let path = EntryPath::new(pubkey.clone(), WebDavPath::new("/test_lmdb.txt").unwrap());
let test_data = vec![1u8; 1024 * 1024 - FILE_METADATA_SIZE as usize];
let buffer = Buffer::from(test_data.clone());
file_service.write(&path, buffer).await.unwrap();
assert_eq!(
db.get_user_data_usage(&pubkey).unwrap(),
Some(test_data.len() as u64 + FILE_METADATA_SIZE)
);
}
#[tokio::test]
async fn test_data_usage_above_quota() {
let mut context = AppContext::test();
context.config_toml.general.user_storage_quota_mb = 1;
let file_service = FileService::new_from_context(&context).unwrap();
let db = context.db.clone();
let pubkey = pkarr::Keypair::random().public_key();
db.create_user(&pubkey).unwrap();
let path = EntryPath::new(pubkey.clone(), WebDavPath::new("/test_lmdb.txt").unwrap());
let test_data = vec![1u8; 1024 * 1024 + 1];
let buffer = Buffer::from(test_data.clone());
match file_service.write(&path, buffer).await {
Ok(_) => panic!("Should error for file above quota"),
Err(FileIoError::DiskSpaceQuotaExceeded) => {} Err(e) => {
panic!("Should error for file above quota: {:?}", e);
}
}
assert_eq!(db.get_user_data_usage(&pubkey).unwrap(), Some(0));
}
#[tokio::test]
async fn test_data_usage_override_existing_above_quota() {
let mut context = AppContext::test();
context.config_toml.general.user_storage_quota_mb = 1;
let file_service = FileService::new_from_context(&context).unwrap();
let db = context.db.clone();
let pubkey = pkarr::Keypair::random().public_key();
db.create_user(&pubkey).unwrap();
let path = EntryPath::new(pubkey.clone(), WebDavPath::new("/test_lmdb.txt").unwrap());
let test_data = vec![1u8; 1024];
let buffer = Buffer::from(test_data.clone());
file_service.write(&path, buffer).await.unwrap();
let test_data2 = vec![2u8; 1024 * 1024 + 1];
let buffer2 = Buffer::from(test_data2.clone());
let path = EntryPath::new(pubkey.clone(), WebDavPath::new("/test_lmdb.txt").unwrap());
match file_service.write(&path, buffer2).await {
Ok(_) => panic!("Should error for file above quota"),
Err(FileIoError::DiskSpaceQuotaExceeded) => {} Err(e) => {
panic!("Should error for file above quota: {:?}", e);
}
}
assert_eq!(
db.get_user_data_usage(&pubkey).unwrap(),
Some(test_data.len() as u64 + FILE_METADATA_SIZE)
);
}
}