use crate::storage::bucket::Bucket;
use crate::storage::entry::Entry;
use log::debug;
use reduct_base::error::ReductError;
use reduct_base::msg::bucket_api::QuotaType;
use reduct_base::{bad_request, internal_server_error};
use std::sync::Arc;
impl Bucket {
    /// Makes sure the bucket may accept `content_size` more bytes under its quota policy.
    ///
    /// The policy is read from the bucket settings. A missing quota type is treated as
    /// `QuotaType::NONE` and a missing quota size as `0`:
    ///
    /// * `NONE` - nothing is checked.
    /// * `FIFO` - delegates to `remove_oldest_block`, which removes old blocks until the
    ///   new content fits.
    /// * `HARD` - nothing is removed; the request is rejected when the summed size of all
    ///   entries plus `content_size` is greater than the quota.
    ///
    /// # Errors
    ///
    /// * A bad-request error (`Quota of '<bucket>' exceeded`) when a `HARD` quota would be
    ///   exceeded.
    /// * An internal-server error when a `FIFO` quota cannot free enough space.
    /// * Any error propagated from acquiring the locks or from querying the entries.
    pub(super) async fn keep_quota_for(
        self: &Arc<Self>,
        content_size: u64,
    ) -> Result<(), ReductError> {
        let settings = self.settings.read().await?;
        let quota_size = settings.quota_size.unwrap_or(0);
        match settings.quota_type.clone().unwrap_or(QuotaType::NONE) {
            QuotaType::NONE => Ok(()),
            QuotaType::FIFO => self.remove_oldest_block(content_size, quota_size).await,
            QuotaType::HARD => {
                // Snapshot the entries and release the lock before awaiting their sizes.
                let entries = self.entries.read().await?;
                let entry_list: Vec<_> = entries.values().cloned().collect();
                drop(entries);

                let mut total_size = 0u64;
                for entry in entry_list {
                    total_size += entry.size().await?;
                }

                // Saturate instead of wrapping: a huge `content_size` must not overflow
                // the sum and slip under the quota.
                if total_size.saturating_add(content_size) > quota_size {
                    Err(bad_request!("Quota of '{}' exceeded", self.name()))
                } else {
                    Ok(())
                }
            }
        }
    }

    /// Removes old blocks until the bucket size plus `content_size` fits into `quota_size`.
    ///
    /// On every pass the entries that report `is_eligible_for_fifo_eviction()` are sorted
    /// ascending by their `oldest_record`, and the oldest block is removed from the first
    /// entry for which `try_remove_oldest_block()` succeeds. The bucket size is then
    /// measured again.
    ///
    /// After the loop (also when no eviction was needed) every entry whose info reports
    /// `record_count == 0` is dropped from the entry map.
    ///
    /// # Errors
    ///
    /// * An internal-server error (`Failed to keep quota of '<bucket>'`) when no candidate
    ///   entry could give up a block during a pass.
    /// * Any error propagated from acquiring the locks or from querying the entries.
    async fn remove_oldest_block(
        &self,
        content_size: u64,
        quota_size: u64,
    ) -> Result<(), ReductError> {
        // Size the bucket would have after the write. The addition saturates so that an
        // oversized `content_size` cannot wrap around and skip the eviction loop.
        let required_size = async || {
            let entries = self.entries.read().await?;
            let mut total_size = 0u64;
            for entry in entries.values() {
                total_size += entry.size().await?;
            }
            Ok::<u64, ReductError>(total_size.saturating_add(content_size))
        };

        let mut size = required_size().await?;
        while size > quota_size {
            let mut success = false;
            {
                debug!(
                    "Need more space. Remove an oldest block from bucket '{}'",
                    self.name()
                );

                // The read guard is held for the whole pass, so the set of entries cannot
                // change between ranking the candidates and removing a block.
                let entries = self.entries.read().await?;
                let mut candidates: Vec<(u64, &Entry)> = vec![];
                for entry in entries.values() {
                    if !entry.is_eligible_for_fifo_eviction() {
                        continue;
                    }
                    let info = entry.info().await?;
                    candidates.push((info.oldest_record, entry));
                }
                candidates.sort_by_key(|(oldest_record, _)| *oldest_record);

                let names: Vec<String> = candidates
                    .iter()
                    .map(|(_, entry)| entry.name().to_string())
                    .collect();

                for name in names {
                    debug!("Remove an oldest block from entry '{}'", name);
                    // The name was taken from the map under the same guard, so the lookup
                    // is expected to succeed; skip instead of panicking if it ever fails.
                    let Some(entry) = entries.get(&name) else {
                        continue;
                    };
                    match entry.try_remove_oldest_block().await {
                        Ok(_) => {
                            success = true;
                            break;
                        }
                        Err(e) => {
                            debug!("Failed to remove oldest block from entry '{}': {}", name, e);
                        }
                    }
                }
            }

            if !success {
                return Err(internal_server_error!(
                    "Failed to keep quota of '{}'",
                    self.name()
                ));
            }

            size = required_size().await?;
        }

        // Drop entries that no longer hold any records.
        let mut entries = self.entries.write().await?;
        let mut names_to_remove = vec![];
        for (name, entry) in entries.iter() {
            if entry.info().await?.record_count == 0 {
                names_to_remove.push(name.clone());
            }
        }
        for name in names_to_remove {
            entries.remove(&name);
        }
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use crate::storage::bucket::tests::{bucket, path, read, write, write_meta};
    use reduct_base::error::ReductError;
    use reduct_base::msg::bucket_api::{BucketSettings, QuotaType};
    use rstest::rstest;
    use std::path::PathBuf;

    /// 40-byte zero-filled record body shared by the quota tests.
    const BLOB: &[u8] = &[0u8; 40];

    /// FIFO settings shared by the eviction tests: 120-byte quota, 20-byte blocks.
    fn fifo_settings() -> BucketSettings {
        BucketSettings {
            max_block_size: Some(20),
            quota_type: Some(QuotaType::FIFO),
            quota_size: Some(120),
            max_block_records: Some(100),
        }
    }

    /// Writing past a FIFO quota evicts the oldest entry instead of failing.
    #[rstest]
    #[tokio::test]
    async fn test_fifo_quota_keeping(path: PathBuf) {
        let fifo_bucket = bucket(fifo_settings(), path).await;

        write(&fifo_bucket, "test-1", 0, BLOB).await.unwrap();
        let size_after_first = fifo_bucket.clone().info().await.unwrap().info.size;
        assert_eq!(size_after_first, 44);

        write(&fifo_bucket, "test-2", 1, BLOB).await.unwrap();
        let size_after_second = fifo_bucket.clone().info().await.unwrap().info.size;
        assert_eq!(size_after_second, 91);

        // The third write does not fit, so the oldest data has to go.
        write(&fifo_bucket, "test-3", 2, BLOB).await.unwrap();
        let size_after_third = fifo_bucket.clone().info().await.unwrap().info.size;
        assert_eq!(size_after_third, 94);

        let evicted = read(&fifo_bucket, "test-1", 0).await.err();
        assert_eq!(
            evicted,
            Some(ReductError::not_found(
                "Entry 'test-1' not found in bucket 'test'"
            ))
        );
    }

    /// Meta entries stay readable while data entries are evicted to keep the quota.
    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn test_fifo_quota_ignores_meta_entries_for_eviction(path: PathBuf) {
        let fifo_bucket = bucket(fifo_settings(), path).await;

        write_meta(&fifo_bucket, "data-1/$meta", 0, BLOB)
            .await
            .unwrap();
        write(&fifo_bucket, "data-1", 1, BLOB).await.unwrap();
        write(&fifo_bucket, "data-2", 2, BLOB).await.unwrap();

        let evicted_data = read(&fifo_bucket, "data-1", 1).await;
        assert!(evicted_data.is_err());

        let kept_meta = read(&fifo_bucket, "data-1/$meta", 0).await;
        assert!(kept_meta.is_ok());

        let latest_data = read(&fifo_bucket, "data-2", 2).await;
        assert!(latest_data.is_ok());
    }

    /// A HARD quota rejects the write that would exceed it.
    #[rstest]
    #[tokio::test]
    async fn test_hard_quota_keeping(path: PathBuf) {
        let hard_settings = BucketSettings {
            quota_type: Some(QuotaType::HARD),
            quota_size: Some(100),
            ..BucketSettings::default()
        };
        let hard_bucket = bucket(hard_settings, path).await;

        write(&hard_bucket, "test-1", 0, BLOB).await.unwrap();
        write(&hard_bucket, "test-2", 1, BLOB).await.unwrap();

        let rejected = write(&hard_bucket, "test-3", 2, BLOB).await.err();
        assert_eq!(
            rejected,
            Some(ReductError::bad_request("Quota of 'test' exceeded"))
        );
    }

    /// A record larger than the whole FIFO quota can never be stored.
    #[rstest]
    #[tokio::test]
    async fn test_blob_bigger_than_quota(path: PathBuf) {
        let tiny_settings = BucketSettings {
            max_block_size: Some(5),
            quota_type: Some(QuotaType::FIFO),
            quota_size: Some(10),
            max_block_records: Some(100),
        };
        let tiny_bucket = bucket(tiny_settings, path).await;

        write(&tiny_bucket, "test-1", 0, b"test").await.unwrap();
        tiny_bucket.sync_fs().await.unwrap();
        let size_on_disk = tiny_bucket.clone().info().await.unwrap().info.size;
        assert_eq!(size_on_disk, 22);

        let oversized = write(&tiny_bucket, "test-2", 1, b"0123456789___").await;
        assert_eq!(
            oversized.err(),
            Some(ReductError::internal_server_error(
                "Failed to keep quota of 'test'"
            ))
        );
    }
}