use std::collections::HashMap;
use std::sync::Arc;
use crate::persistence::lmdb::LmDB;
use crate::shared::webdav::EntryPath;
use opendal::raw::*;
use opendal::Result;
/// Fixed per-file overhead, in bytes, charged against a user's quota for
/// every stored file in addition to the file's content length (applied on
/// first write of a file and credited back when the file is deleted).
pub(crate) const FILE_METADATA_SIZE: u64 = 256;
/// An opendal layer that enforces a per-user storage quota.
///
/// Writes and deletes performed through the wrapped accessor keep each
/// user's `used_bytes` counter in [`LmDB`] up to date, and writes that
/// would push a user past `user_quota_bytes` are rejected.
#[derive(Clone)]
pub struct UserQuotaLayer {
    // Handle to the LMDB-backed user table holding per-user usage counters.
    pub(crate) db: LmDB,
    // Maximum number of bytes a single user may store
    // (content plus FILE_METADATA_SIZE per file).
    pub(crate) user_quota_bytes: u64,
}
impl UserQuotaLayer {
pub fn new(db: LmDB, user_quota_bytes: u64) -> Self {
Self {
db,
user_quota_bytes,
}
}
}
impl<A: Access> Layer<A> for UserQuotaLayer {
    type LayeredAccess = UserQuotaAccessor<A>;

    /// Wrap `inner` in a [`UserQuotaAccessor`] that shares this layer's
    /// database handle and quota limit.
    fn layer(&self, inner: A) -> Self::LayeredAccess {
        let inner = Arc::new(inner);
        UserQuotaAccessor {
            inner,
            db: self.db.clone(),
            user_quota_bytes: self.user_quota_bytes,
        }
    }
}
fn ensure_valid_path(path: &str) -> Result<EntryPath, opendal::Error> {
let path: EntryPath = match path.parse() {
Ok(path) => path,
Err(e) => {
return Err(opendal::Error::new(
opendal::ErrorKind::PermissionDenied,
e.to_string(),
));
}
};
Ok(path)
}
/// Accessor produced by [`UserQuotaLayer`]: delegates to `inner` while
/// tracking per-user storage usage in the database.
#[derive(Debug, Clone)]
pub struct UserQuotaAccessor<A: Access> {
    // Wrapped accessor; shared with writer/deleter wrappers for stat calls.
    inner: Arc<A>,
    // Per-user usage counters live here.
    db: LmDB,
    // Hard limit per user, in bytes.
    user_quota_bytes: u64,
}
/// The quota bookkeeping happens in the wrapped writer ([`WriterWrapper`])
/// and deleter ([`DeleterWrapper`]); everything else delegates to `inner`
/// after validating the path where relevant.
impl<A: Access> LayeredAccess for UserQuotaAccessor<A> {
    type Inner = A;
    type Reader = A::Reader;
    // Writes are wrapped so byte counts can be tracked and checked on close.
    type Writer = WriterWrapper<A::Writer, A>;
    type Lister = A::Lister;
    // Deletes are wrapped so freed bytes can be credited back to the user.
    type Deleter = DeleterWrapper<A::Deleter, A>;

    fn inner(&self) -> &Self::Inner {
        &self.inner
    }

    /// Create a directory; the path must parse as a valid [`EntryPath`].
    async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
        ensure_valid_path(path)?;
        self.inner.create_dir(path, args).await
    }

    // Reads do not change usage and pass straight through.
    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
        self.inner.read(path, args).await
    }

    /// Start a write, wrapping the inner writer so the byte delta can be
    /// checked against the user's quota when the writer is closed.
    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
        let entry_path = ensure_valid_path(path)?;
        let (rp, writer) = self.inner.write(path, args).await?;
        Ok((
            rp,
            WriterWrapper {
                inner: writer,
                db: self.db.clone(),
                bytes_count: 0,
                entry_path,
                inner_accessor: self.inner.clone(),
                user_quota_bytes: self.user_quota_bytes,
            },
        ))
    }

    // NOTE(review): copy/rename only validate the destination path; neither
    // adjusts the destination user's usage counter — confirm this is intended.
    async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
        let _ = ensure_valid_path(to)?;
        self.inner.copy(from, to, args).await
    }

    async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
        let _ = ensure_valid_path(to)?;
        self.inner.rename(from, to, args).await
    }

    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
        self.inner.stat(path, args).await
    }

    /// Start a delete batch, wrapping the inner deleter so deleted bytes
    /// are credited back to their owners when the batch is flushed.
    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
        let (rp, deleter) = self.inner.delete().await?;
        Ok((
            rp,
            DeleterWrapper {
                inner: deleter,
                db: self.db.clone(),
                inner_accessor: self.inner.clone(),
                path_queue: Vec::new(),
            },
        ))
    }

    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
        self.inner.list(path, args).await
    }

    async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
        self.inner.presign(path, args).await
    }

    type BlockingReader = A::BlockingReader;
    type BlockingWriter = A::BlockingWriter;
    type BlockingLister = A::BlockingLister;
    type BlockingDeleter = A::BlockingDeleter;

    // Blocking reads/lists pass through; they do not change usage.
    fn blocking_read(
        &self,
        path: &str,
        args: opendal::raw::OpRead,
    ) -> opendal::Result<(opendal::raw::RpRead, Self::BlockingReader)> {
        self.inner.blocking_read(path, args)
    }

    // Blocking writes are rejected outright: quota tracking is only wired
    // into the async writer wrapper above.
    fn blocking_write(
        &self,
        _path: &str,
        _args: opendal::raw::OpWrite,
    ) -> opendal::Result<(opendal::raw::RpWrite, Self::BlockingWriter)> {
        Err(opendal::Error::new(
            opendal::ErrorKind::Unsupported,
            "Writing is not supported in blocking mode",
        ))
    }

    // Blocking deletes are rejected for the same reason as blocking writes.
    fn blocking_delete(&self) -> opendal::Result<(opendal::raw::RpDelete, Self::BlockingDeleter)> {
        Err(opendal::Error::new(
            opendal::ErrorKind::Unsupported,
            "Deleting is not supported in blocking mode",
        ))
    }

    fn blocking_list(
        &self,
        path: &str,
        args: opendal::raw::OpList,
    ) -> opendal::Result<(opendal::raw::RpList, Self::BlockingLister)> {
        self.inner.blocking_list(path, args)
    }
}
/// Apply `bytes_delta` (positive or negative) to a user's `used_bytes`
/// counter inside a single LMDB write transaction.
///
/// Uses `saturating_add_signed`, so a drifting counter clamps at
/// `0`/`u64::MAX` instead of wrapping.
///
/// # Errors
/// Fails if the transaction cannot be opened/committed or the user does
/// not exist in the users table.
fn update_user_quota(
    db: &LmDB,
    user_pubkey: &pkarr::PublicKey,
    bytes_delta: i64,
) -> anyhow::Result<()> {
    let mut wtxn = db.env.write_txn()?;
    let mut user = db
        .tables
        .users
        .get(&wtxn, user_pubkey)?
        // ok_or_else: build the error lazily instead of on every call.
        .ok_or_else(|| anyhow::anyhow!("User not found"))?;
    user.used_bytes = user.used_bytes.saturating_add_signed(bytes_delta);
    db.tables.users.put(&mut wtxn, user_pubkey, &user)?;
    wtxn.commit()?;
    Ok(())
}
/// Check whether `user_pubkey` has a row in the users table.
fn does_user_exist(db: &LmDB, user_pubkey: &pkarr::PublicKey) -> anyhow::Result<bool> {
    // Read-only transaction: existence check never mutates the table.
    let rtxn = db.env.read_txn()?;
    Ok(db.tables.users.get(&rtxn, user_pubkey)?.is_some())
}
/// Writer that counts the bytes it accepts and settles the user's quota
/// when closed (see the `oio::Write` impl below).
pub struct WriterWrapper<R, A: Access> {
    // Underlying opendal writer.
    inner: R,
    // Usage counters are read/updated here on close.
    db: LmDB,
    // Running total of bytes written through this writer.
    bytes_count: u64,
    // Validated destination path; its pubkey identifies the owning user.
    entry_path: EntryPath,
    // Used to stat the existing entry so overwrites are charged as a delta.
    inner_accessor: Arc<A>,
    // Per-user limit checked on close.
    user_quota_bytes: u64,
}
impl<R, A: Access> WriterWrapper<R, A> {
    /// Stat the destination entry and return `(size_in_bytes, exists)`.
    ///
    /// A missing entry yields `(0, false)` rather than an error so the
    /// caller can tell "new file" apart from "overwrite".
    async fn get_current_file_size(&self) -> Result<(u64, bool), opendal::Error> {
        let path = self.entry_path.to_string();
        match self.inner_accessor.stat(path.as_str(), OpStat::default()).await {
            Ok(stats) => Ok((stats.into_metadata().content_length(), true)),
            Err(e) if e.kind() == opendal::ErrorKind::NotFound => Ok((0, false)),
            Err(e) => Err(e),
        }
    }
}
impl<R: oio::Write, A: Access> oio::Write for WriterWrapper<R, A> {
async fn write(&mut self, bs: opendal::Buffer) -> Result<()> {
self.bytes_count += bs.len() as u64;
self.inner.write(bs).await
}
async fn abort(&mut self) -> Result<()> {
self.inner.abort().await
}
async fn close(&mut self) -> Result<opendal::Metadata> {
let current_user_bytes = self
.db
.get_user_data_usage(self.entry_path.pubkey())
.map_err(|e| opendal::Error::new(opendal::ErrorKind::Unexpected, e.to_string()))?;
let current_user_bytes = current_user_bytes.ok_or(opendal::Error::new(
opendal::ErrorKind::Unexpected,
"User not found",
))?;
let (current_file_size, file_already_exists) = self.get_current_file_size().await?;
let bytes_delta = if file_already_exists {
self.bytes_count as i64 - current_file_size as i64
} else {
self.bytes_count as i64 - current_file_size as i64 + FILE_METADATA_SIZE as i64
};
if (current_user_bytes as i128 + bytes_delta as i128) as u64 > self.user_quota_bytes {
return Err(opendal::Error::new(
opendal::ErrorKind::RateLimited,
"User quota exceeded",
));
}
let metadata = self.inner.close().await?;
update_user_quota(&self.db, self.entry_path.pubkey(), bytes_delta)
.map_err(|e| opendal::Error::new(opendal::ErrorKind::Unexpected, e.to_string()))?;
Ok(metadata)
}
}
/// Bookkeeping record for one queued deletion: the validated path plus the
/// size/existence info captured by `pull_bytes_count` before the entry is
/// actually removed.
struct DeletePath {
    entry_path: EntryPath,
    // Content length of the entry, populated lazily; None until pulled.
    bytes_count: Option<u64>,
    // Whether the entry existed at stat time; None until pulled.
    exists: Option<bool>,
}
impl DeletePath {
    /// Validate `path` and create an empty bookkeeping record for it.
    fn new(path: &str) -> anyhow::Result<Self> {
        Ok(Self {
            entry_path: ensure_valid_path(path)?,
            bytes_count: None,
            exists: None,
        })
    }

    /// Stat the entry once and cache its size and existence.
    ///
    /// Idempotent: a record that already holds a size is left untouched.
    /// A missing entry records `exists = false` and is not an error.
    pub async fn pull_bytes_count<A: Access>(&mut self, operator: &A) -> Result<()> {
        if self.bytes_count.is_some() {
            return Ok(());
        }
        match operator
            .stat(self.entry_path.as_str(), OpStat::default())
            .await
        {
            Ok(stats) => {
                self.bytes_count = Some(stats.into_metadata().content_length());
                self.exists = Some(true);
                Ok(())
            }
            Err(e) if e.kind() == opendal::ErrorKind::NotFound => {
                self.exists = Some(false);
                Ok(())
            }
            Err(e) => Err(e),
        }
    }
}
/// Deleter that queues bookkeeping records for each accepted path and, on
/// flush, credits freed bytes back to the owning users.
pub struct DeleterWrapper<R, A: Access> {
    // Underlying opendal deleter.
    inner: R,
    // Usage counters are updated here after a flush.
    db: LmDB,
    // Used to stat entries (for their sizes) before they are deleted.
    inner_accessor: Arc<A>,
    // One record per accepted delete, in submission order.
    path_queue: Vec<DeletePath>,
}
impl<R, A: Access> DeleterWrapper<R, A> {
    /// Subtract the sizes of `deleted_paths` from each affected user's
    /// usage counter.
    ///
    /// Paths are grouped by owner so every user gets a single counter
    /// update; users that no longer exist are skipped silently.
    fn update_user_quota(&self, deleted_paths: Vec<DeletePath>) -> Result<()> {
        let mut by_user: HashMap<pkarr::PublicKey, Vec<DeletePath>> = HashMap::new();
        for record in deleted_paths {
            by_user
                .entry(record.entry_path.pubkey().clone())
                .or_default()
                .push(record);
        }
        for (user_pubkey, records) in by_user {
            let user_exists = does_user_exist(&self.db, &user_pubkey)
                .map_err(|e| opendal::Error::new(opendal::ErrorKind::Unexpected, e.to_string()))?;
            if !user_exists {
                continue;
            }
            // Content bytes freed, plus the fixed metadata overhead for each
            // entry that actually existed.
            let content_bytes: u64 = records.iter().filter_map(|r| r.bytes_count).sum();
            let existing_count =
                records.iter().filter(|r| r.exists.unwrap_or(false)).count() as u64;
            let bytes_delta = (content_bytes + existing_count * FILE_METADATA_SIZE) as i64;
            update_user_quota(&self.db, &user_pubkey, -bytes_delta)
                .map_err(|e| opendal::Error::new(opendal::ErrorKind::Unexpected, e.to_string()))?;
        }
        Ok(())
    }
}
impl<R: oio::Delete, A: Access> oio::Delete for DeleterWrapper<R, A> {
async fn flush(&mut self) -> Result<usize> {
for path in self.path_queue.iter_mut() {
path.pull_bytes_count(&self.inner_accessor).await?;
}
let deleted_files_count = self.inner.flush().await?;
let deleted_paths = self
.path_queue
.drain(0..deleted_files_count)
.collect::<Vec<_>>();
self.update_user_quota(deleted_paths)?;
Ok(deleted_files_count)
}
fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
let helper = match DeletePath::new(path) {
Ok(helper) => helper,
Err(e) => {
return Err(opendal::Error::new(
opendal::ErrorKind::PermissionDenied,
e.to_string(),
));
}
};
self.inner.delete(path, args)?;
self.path_queue.push(helper);
Ok(())
}
}
#[cfg(test)]
mod tests {
    use crate::persistence::files::opendal_test_operators::{
        get_memory_operator, OpendalTestOperators,
    };
    use super::*;

    /// Test helper: read a user's `used_bytes` counter directly from the db.
    fn get_user_data_usage(db: &LmDB, user_pubkey: &pkarr::PublicKey) -> anyhow::Result<u64> {
        // Read-only transaction (was misleadingly named `wtxn`).
        let rtxn = db.env.read_txn()?;
        let user = db.get_user(user_pubkey, &rtxn)?.ok_or(opendal::Error::new(
            opendal::ErrorKind::Unexpected,
            "User not found",
        ))?;
        Ok(user.used_bytes)
    }

    /// Paths must start with a valid user pubkey; anything else is rejected.
    #[tokio::test]
    async fn test_ensure_valid_path() {
        for (_scheme, operator) in OpendalTestOperators::new().operators() {
            let db = LmDB::test();
            let layer = UserQuotaLayer::new(db.clone(), 1024 * 1024);
            let operator = operator.layer(layer);
            operator
                .write("1234567890/test.txt", vec![0; 10])
                .await
                .expect_err("Should fail because the path doesn't start with a pubkey");
            let pubkey = pkarr::Keypair::random().public_key();
            db.create_user(&pubkey).unwrap();
            operator
                .write(format!("{}/test.txt", pubkey).as_str(), vec![0; 10])
                .await
                .expect("Should succeed because the path starts with a pubkey");
            operator
                .write("test.txt", vec![0; 10])
                .await
                .expect_err("Should fail because the path doesn't start with a pubkey");
        }
    }

    /// Usage grows on writes (content + metadata overhead), shrinks on
    /// overwrites and deletes, and returns to zero when everything is gone.
    #[tokio::test]
    async fn test_quota_updated_write_delete() {
        let db = LmDB::test();
        let layer = UserQuotaLayer::new(db.clone(), 1024 * 1024);
        let operator = get_memory_operator().layer(layer);
        let user_pubkey1 = pkarr::Keypair::random().public_key();
        db.create_user(&user_pubkey1).unwrap();
        operator
            .write(format!("{}/test.txt1", user_pubkey1).as_str(), vec![0; 10])
            .await
            .unwrap();
        let user_usage = get_user_data_usage(&db, &user_pubkey1).unwrap();
        assert_eq!(user_usage, 10 + FILE_METADATA_SIZE);
        // Overwriting the same file charges only the size delta.
        operator
            .write(format!("{}/test.txt1", user_pubkey1).as_str(), vec![0; 12])
            .await
            .unwrap();
        let user_usage = get_user_data_usage(&db, &user_pubkey1).unwrap();
        assert_eq!(user_usage, 12 + FILE_METADATA_SIZE);
        operator
            .write(format!("{}/test.txt2", user_pubkey1).as_str(), vec![0; 5])
            .await
            .unwrap();
        let user_usage = get_user_data_usage(&db, &user_pubkey1).unwrap();
        assert_eq!(user_usage, 17 + 2 * FILE_METADATA_SIZE);
        // Deleting credits back content bytes plus the metadata overhead.
        operator
            .delete(format!("{}/test.txt1", user_pubkey1).as_str())
            .await
            .unwrap();
        let user_usage = get_user_data_usage(&db, &user_pubkey1).unwrap();
        assert_eq!(user_usage, 5 + FILE_METADATA_SIZE);
        operator
            .delete(format!("{}/test.txt2", user_pubkey1).as_str())
            .await
            .unwrap();
        let user_usage = get_user_data_usage(&db, &user_pubkey1).unwrap();
        assert_eq!(user_usage, 0);
    }

    /// Writes that would exceed the quota are rejected without changing the
    /// usage counter; a write landing exactly on the limit succeeds.
    /// (Renamed from the typo `test_quota_rechead`.)
    #[tokio::test]
    async fn test_quota_reached() {
        let db = LmDB::test();
        let layer = UserQuotaLayer::new(db.clone(), 20 + FILE_METADATA_SIZE);
        let operator = get_memory_operator().layer(layer);
        let user_pubkey1 = pkarr::Keypair::random().public_key();
        db.create_user(&user_pubkey1).unwrap();
        let file_name1 = format!("{}/test1.txt", user_pubkey1);
        operator
            .write(file_name1.as_str(), vec![0; 21])
            .await
            .expect_err("Should fail because the user quota is exceeded");
        // The rejected write must not leave a file behind...
        operator
            .read(file_name1.as_str())
            .await
            .expect_err("Should fail because the file doesn't exist");
        // ...nor any usage on the counter.
        let user_usage = get_user_data_usage(&db, &user_pubkey1).unwrap();
        assert_eq!(user_usage, 0);
        operator
            .write(file_name1.as_str(), vec![0; 20])
            .await
            .expect("Should succeed because the user quota is exactly the limit");
        operator
            .read(file_name1.as_str())
            .await
            .expect("Should succeed because the file exists");
        let user_usage = get_user_data_usage(&db, &user_pubkey1).unwrap();
        assert_eq!(user_usage, 20 + FILE_METADATA_SIZE);
        let file_name2 = format!("{}/test2.txt", user_pubkey1);
        operator
            .write(file_name2.as_str(), vec![0; 1])
            .await
            .expect_err("Should fail because the user quota is exceeded");
    }
}