#[cfg(test)]
use crate::AppContext;
use crate::{
persistence::{
files::events::EventsService,
sql::{
entry::{EntryEntity, EntryRepository},
SqlDb, UnifiedExecutor,
},
},
shared::webdav::EntryPath,
ConfigToml,
};
use bytes::Bytes;
use futures_util::Stream;
#[cfg(test)]
use futures_util::StreamExt;
#[cfg(test)]
use opendal::Buffer;
use std::path::Path;
use super::super::{FileIoError, FileStream, OpendalService, WriteStreamError};
#[derive(Debug, Clone)]
pub struct FileService {
pub(crate) opendal: OpendalService,
pub(crate) db: SqlDb,
}
impl FileService {
pub fn new(opendal_service: OpendalService, db: SqlDb) -> Self {
Self {
opendal: opendal_service,
db,
}
}
pub fn new_from_config(
config: &ConfigToml,
data_directory: &Path,
db: SqlDb,
events_service: EventsService,
user_service: crate::services::user_service::UserService,
) -> Result<Self, FileIoError> {
let opendal_service = OpendalService::new_from_config(
&config.storage,
data_directory,
&db,
events_service,
user_service,
)?;
Ok(Self::new(opendal_service, db))
}
pub async fn get_info(
&self,
path: &EntryPath,
executor: &mut UnifiedExecutor<'_>,
) -> Result<EntryEntity, FileIoError> {
match EntryRepository::get_by_path(path, executor).await {
Ok(entry) => Ok(entry),
Err(sqlx::Error::RowNotFound) => Err(FileIoError::NotFound),
Err(e) => Err(e.into()),
}
}
pub async fn get_stream(&self, path: &EntryPath) -> Result<FileStream, FileIoError> {
let stream: FileStream = self.opendal.get_stream(path).await?;
Ok(stream)
}
pub async fn write_stream(
&self,
path: &EntryPath,
stream: impl Stream<Item = Result<Bytes, WriteStreamError>> + Unpin + Send,
) -> Result<EntryEntity, FileIoError> {
self.opendal.write_stream(path, stream).await?;
match EntryRepository::get_by_path(path, &mut self.db.pool().into()).await {
Ok(entry) => Ok(entry),
Err(sqlx::Error::RowNotFound) => Err(FileIoError::NotFound),
Err(e) => Err(e.into()),
}
}
pub async fn delete(&self, path: &EntryPath) -> Result<(), FileIoError> {
if !self.opendal.exists(path).await? {
return Err(FileIoError::NotFound);
}
self.opendal.delete(path).await?;
Ok(())
}
pub async fn admin_delete(&self, path: &EntryPath) -> Result<(), FileIoError> {
if !self.opendal.exists(path).await? {
return Err(FileIoError::NotFound);
}
self.opendal.admin_delete(path).await?;
Ok(())
}
}
#[cfg(test)]
impl FileService {
pub fn new_from_context(context: &AppContext) -> Result<Self, FileIoError> {
let opendal_service = OpendalService::new(context)?;
Ok(Self::new(opendal_service, context.sql_db.clone()))
}
pub async fn get(&self, path: &EntryPath) -> Result<Bytes, FileIoError> {
let mut stream = self.get_stream(path).await?;
let mut collected_data = Vec::new();
while let Some(chunk_result) = stream.next().await {
let chunk = chunk_result?;
collected_data.extend_from_slice(&chunk);
}
Ok(Bytes::from(collected_data))
}
pub async fn write(&self, path: &EntryPath, data: Buffer) -> Result<EntryEntity, FileIoError> {
let stream = futures_util::stream::iter(vec![Ok(Bytes::from(data.to_vec()))]);
let entry = self.write_stream(path, stream).await?;
Ok(entry)
}
}
#[cfg(test)]
mod tests {
use crate::{
persistence::sql::user::UserRepository, services::user_service::FILE_METADATA_SIZE,
shared::webdav::WebDavPath,
};
use futures_lite::StreamExt;
use super::*;
#[tokio::test]
#[pubky_test_utils::test]
async fn test_write_get_delete_db_and_opendal() {
let context = AppContext::test().await;
let file_service = FileService::new_from_context(&context).unwrap();
let db = context.sql_db.clone();
let pubkey = pubky_common::crypto::Keypair::random().public_key();
let user = UserRepository::create(&pubkey, &mut db.pool().into())
.await
.unwrap();
assert_eq!(user.used_bytes, 0);
let path = EntryPath::new(pubkey.clone(), WebDavPath::new("/test_file.txt").unwrap());
match file_service.get_stream(&path).await {
Ok(_) => panic!("Should error for non-existent file"),
Err(FileIoError::NotFound) => {}
Err(e) => panic!("Should error for non-existent file: {}", e),
};
let test_data = b"Hello, world! This is test data for the get method.";
let chunks = vec![Ok(Bytes::from(test_data.as_slice()))];
let stream = futures_util::stream::iter(chunks);
file_service.write_stream(&path, stream).await.unwrap();
let user = UserRepository::get(&pubkey, &mut db.pool().into())
.await
.unwrap();
assert_eq!(
user.used_bytes,
test_data.len() as u64 + FILE_METADATA_SIZE,
"Data usage should be the size of the file"
);
let mut stream = file_service
.get_stream(&path)
.await
.expect("File should exist");
let mut collected_data = Vec::new();
while let Some(chunk_result) = stream.next().await {
let chunk = chunk_result.unwrap();
collected_data.extend_from_slice(&chunk);
}
assert_eq!(
collected_data,
test_data.to_vec(),
"Content should match original data"
);
file_service.delete(&path).await.unwrap();
let result = file_service.get_stream(&path).await;
assert!(result.is_err(), "Should error for deleted file");
let user = UserRepository::get(&pubkey, &mut db.pool().into())
.await
.unwrap();
assert_eq!(
user.used_bytes, 0,
"Data usage should be 0 after deleting file"
);
let path = EntryPath::new(
pubkey.clone(),
WebDavPath::new("/test_opendal.txt").unwrap(),
);
let chunks = vec![Ok(Bytes::from(test_data.as_slice()))];
let stream = futures_util::stream::iter(chunks);
file_service.write_stream(&path, stream).await.unwrap();
let user = UserRepository::get(&pubkey, &mut db.pool().into())
.await
.unwrap();
assert_eq!(
user.used_bytes,
test_data.len() as u64 + FILE_METADATA_SIZE,
"Data usage should be the size of the file"
);
let mut stream = file_service
.get_stream(&path)
.await
.expect("File should exist");
let mut collected_data = Vec::new();
while let Some(chunk_result) = stream.next().await {
let chunk = chunk_result.unwrap();
collected_data.extend_from_slice(&chunk);
}
assert_eq!(
collected_data,
test_data.to_vec(),
"Content should match original data for OpenDal location"
);
file_service.delete(&path).await.unwrap();
let result = file_service.get_stream(&path).await;
assert!(result.is_err(), "Should error for deleted file");
let user = UserRepository::get(&pubkey, &mut db.pool().into())
.await
.unwrap();
assert_eq!(
user.used_bytes, 0,
"Data usage should be 0 after deleting file"
);
}
#[tokio::test]
#[pubky_test_utils::test]
async fn test_write_get_basic() {
let context = AppContext::test().await;
let file_service = FileService::new_from_context(&context).unwrap();
let db = context.sql_db.clone();
let pubkey = pubky_common::crypto::Keypair::random().public_key();
UserRepository::create(&pubkey, &mut db.pool().into())
.await
.unwrap();
let test_data = b"Hello, world!";
let buffer = Buffer::from(test_data.as_slice());
let path = EntryPath::new(pubkey.clone(), WebDavPath::new("/test_file.txt").unwrap());
file_service.write(&path, buffer.clone()).await.unwrap();
let content = file_service.get(&path).await.unwrap();
assert_eq!(content.as_ref(), test_data);
let opendal_path = EntryPath::new(pubkey, WebDavPath::new("/test_opendal.txt").unwrap());
file_service.write(&opendal_path, buffer).await.unwrap();
let content = file_service.get(&opendal_path).await.unwrap();
assert_eq!(content.as_ref(), test_data);
}
#[tokio::test]
#[pubky_test_utils::test]
async fn test_data_usage_update_basic() {
let context = AppContext::test().await;
let file_service = FileService::new_from_context(&context).unwrap();
let db = context.sql_db.clone();
let pubkey = pubky_common::crypto::Keypair::random().public_key();
UserRepository::create_with_quota_mb(&db, &pubkey, 1).await;
let path = EntryPath::new(pubkey.clone(), WebDavPath::new("/test_file.txt").unwrap());
let test_data = vec![1u8; 1024];
let buffer = Buffer::from(test_data.clone());
file_service.write(&path, buffer).await.unwrap();
let user = UserRepository::get(&pubkey, &mut db.pool().into())
.await
.unwrap();
assert_eq!(user.used_bytes, test_data.len() as u64 + FILE_METADATA_SIZE);
file_service.delete(&path).await.unwrap();
let user = UserRepository::get(&pubkey, &mut db.pool().into())
.await
.unwrap();
assert_eq!(user.used_bytes, 0);
}
#[tokio::test]
#[pubky_test_utils::test]
async fn test_data_usage_override_existing_entry() {
let context = AppContext::test().await;
let file_service = FileService::new_from_context(&context).unwrap();
let db = context.sql_db.clone();
let pubkey = pubky_common::crypto::Keypair::random().public_key();
UserRepository::create_with_quota_mb(&db, &pubkey, 1).await;
let path = EntryPath::new(pubkey.clone(), WebDavPath::new("/test_file.txt").unwrap());
let test_data = vec![1u8; 1024];
let buffer = Buffer::from(test_data.clone());
file_service.write(&path, buffer).await.unwrap();
let test_data2 = vec![2u8; 1024];
let buffer2 = Buffer::from(test_data2.clone());
let path = EntryPath::new(pubkey.clone(), WebDavPath::new("/test_file.txt").unwrap());
file_service.write(&path, buffer2).await.unwrap();
assert_eq!(
UserRepository::get(&pubkey, &mut db.pool().into())
.await
.unwrap()
.used_bytes,
test_data2.len() as u64 + FILE_METADATA_SIZE
);
}
#[tokio::test]
#[pubky_test_utils::test]
async fn test_data_usage_exactly_to_quota() {
let context = AppContext::test().await;
let file_service = FileService::new_from_context(&context).unwrap();
let db = context.sql_db.clone();
let pubkey = pubky_common::crypto::Keypair::random().public_key();
UserRepository::create_with_quota_mb(&db, &pubkey, 1).await;
let path = EntryPath::new(pubkey.clone(), WebDavPath::new("/test_file.txt").unwrap());
let test_data = vec![1u8; 1024 * 1024 - FILE_METADATA_SIZE as usize];
let buffer = Buffer::from(test_data.clone());
file_service.write(&path, buffer).await.unwrap();
assert_eq!(
UserRepository::get(&pubkey, &mut db.pool().into())
.await
.unwrap()
.used_bytes,
test_data.len() as u64 + FILE_METADATA_SIZE
);
}
#[tokio::test]
#[pubky_test_utils::test]
async fn test_data_usage_above_quota() {
let context = AppContext::test().await;
let file_service = FileService::new_from_context(&context).unwrap();
let db = context.sql_db.clone();
let pubkey = pubky_common::crypto::Keypair::random().public_key();
UserRepository::create_with_quota_mb(&db, &pubkey, 1).await;
let path = EntryPath::new(pubkey.clone(), WebDavPath::new("/test_file.txt").unwrap());
let test_data = vec![1u8; 1024 * 1024 + 1];
let buffer = Buffer::from(test_data.clone());
match file_service.write(&path, buffer).await {
Ok(_) => panic!("Should error for file above quota"),
Err(FileIoError::DiskSpaceQuotaExceeded) => {} Err(e) => {
panic!("Should error for file above quota: {:?}", e);
}
}
assert_eq!(
UserRepository::get(&pubkey, &mut db.pool().into())
.await
.unwrap()
.used_bytes,
0
);
}
#[tokio::test]
#[pubky_test_utils::test]
async fn test_data_usage_override_existing_above_quota() {
let context = AppContext::test().await;
let file_service = FileService::new_from_context(&context).unwrap();
let db = context.sql_db.clone();
let pubkey = pubky_common::crypto::Keypair::random().public_key();
UserRepository::create_with_quota_mb(&db, &pubkey, 1).await;
let path = EntryPath::new(pubkey.clone(), WebDavPath::new("/test_file.txt").unwrap());
let test_data = vec![1u8; 1024];
let buffer = Buffer::from(test_data.clone());
file_service.write(&path, buffer).await.unwrap();
let test_data2 = vec![2u8; 1024 * 1024 + 1];
let buffer2 = Buffer::from(test_data2.clone());
let path = EntryPath::new(pubkey.clone(), WebDavPath::new("/test_file.txt").unwrap());
match file_service.write(&path, buffer2).await {
Ok(_) => panic!("Should error for file above quota"),
Err(FileIoError::DiskSpaceQuotaExceeded) => {} Err(e) => {
panic!("Should error for file above quota: {:?}", e);
}
}
assert_eq!(
UserRepository::get(&pubkey, &mut db.pool().into())
.await
.unwrap()
.used_bytes,
test_data.len() as u64 + FILE_METADATA_SIZE
);
}
}