mod get_entry;
mod query;
mod quotas;
mod read_only;
mod remove_entry;
mod remove_records;
mod rename_entry;
pub(super) mod settings;
pub(crate) mod update_records;
use crate::cfg::{Cfg, InstanceRole};
use crate::core::file_cache::FILE_CACHE;
use crate::core::sync::AsyncRwLock;
use crate::core::weak::Weak;
pub use crate::storage::block_manager::RecordRx;
pub use crate::storage::block_manager::RecordTx;
use crate::storage::bucket::query::MultiEntryQuery;
use crate::storage::bucket::settings::{
DEFAULT_MAX_BLOCK_SIZE, DEFAULT_MAX_RECORDS, SETTINGS_NAME,
};
use crate::storage::engine::ReadOnlyMode;
use crate::storage::entry::{
is_system_meta_entry, Entry, EntrySettings, META_ENTRY_MAX_BLOCK_SIZE,
};
use crate::storage::folder_keeper::FolderKeeper;
use crate::storage::proto::BucketSettings as ProtoBucketSettings;
use log::{debug, error};
use prost::bytes::Bytes;
use prost::Message;
use reduct_base::error::ReductError;
use reduct_base::io::WriteRecord;
use reduct_base::msg::bucket_api::{BucketInfo, BucketSettings, FullBucketInfo};
use reduct_base::msg::status::ResourceStatus;
use reduct_base::{conflict, internal_server_error, Labels};
use std::collections::{BTreeMap, HashMap};
use std::io::{Read, SeekFrom};
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Instant;
/// Renders `path` as an entry name that uses `/` as its only separator.
///
/// The path is converted lossily to UTF-8 and every backslash is turned into
/// a forward slash, so names look the same regardless of the platform's
/// native separator.
fn normalize_entry_name(path: &std::path::Path) -> String {
    path.to_string_lossy()
        .chars()
        .map(|ch| if ch == '\\' { '/' } else { ch })
        .collect()
}
/// Derives the settings of a single entry from the bucket-wide settings.
///
/// System meta entries (as decided by `is_system_meta_entry`) always get
/// `META_ENTRY_MAX_BLOCK_SIZE`; every other entry uses the bucket's
/// `max_block_size`, falling back to `DEFAULT_MAX_BLOCK_SIZE`. The record
/// limit comes from the bucket's `max_block_records`, falling back to
/// `DEFAULT_MAX_RECORDS`.
fn settings_for_entry(entry_name: &str, settings: &BucketSettings) -> EntrySettings {
    let max_block_size = if is_system_meta_entry(entry_name) {
        META_ENTRY_MAX_BLOCK_SIZE
    } else {
        settings.max_block_size.unwrap_or(DEFAULT_MAX_BLOCK_SIZE)
    };
    let max_block_records = settings.max_block_records.unwrap_or(DEFAULT_MAX_RECORDS);

    EntrySettings {
        max_block_size,
        max_block_records,
    }
}
/// A bucket: a named directory that groups entries under shared settings.
pub(crate) struct Bucket {
/// Bucket name; it is also the final component of `path`.
name: String,
/// Directory of the bucket on disk.
path: PathBuf,
/// Entries of the bucket, keyed by entry name.
entries: Arc<AsyncRwLock<BTreeMap<String, Arc<Entry>>>>,
/// Current bucket settings.
settings: AsyncRwLock<BucketSettings>,
/// Folder keeper created for the bucket directory; `restore` uses it to list entry folders.
folder_keeper: Arc<FolderKeeper>,
/// Instance configuration (e.g. the instance role checked in `sync_fs`).
cfg: Arc<Cfg>,
/// Initialised to the construction time.
/// NOTE(review): presumably the time of the last reload on a replica; confirm against `reload`.
last_replica_sync: AsyncRwLock<Instant>,
/// Provisioned flag; `false` after construction, changed through `set_provisioned`.
is_provisioned: AtomicBool,
/// Lifecycle status; `Ready` after construction, `Deleting` once `mark_deleting` was called.
status: AsyncRwLock<ResourceStatus>,
/// Multi-entry queries by numeric id; created empty.
/// NOTE(review): not read anywhere in this part of the file, hence the `dead_code` allowance; confirm usage in `query`.
#[allow(dead_code)]
queries: AsyncRwLock<HashMap<u64, MultiEntryQuery>>,
}
impl Bucket {
/// Creates a new bucket called `name` inside the parent directory `path`.
///
/// Missing values in `settings` are filled from the defaults, and the
/// resulting settings are persisted with `save_settings` before the bucket
/// is returned. The bucket starts without entries, not provisioned, and in
/// the `Ready` state.
///
/// `path` is taken as `&Path` so callers are not forced to own a `PathBuf`;
/// a `&PathBuf` argument still works through deref coercion.
///
/// # Errors
///
/// Returns the error of `save_settings` if the settings cannot be stored.
pub(crate) async fn try_build(
    name: &str,
    path: &std::path::Path,
    settings: BucketSettings,
    cfg: Cfg,
) -> Result<Bucket, ReductError> {
    let path = path.join(name);
    let settings = Self::fill_settings(settings, Self::defaults());
    let folder_keeper = FolderKeeper::new(path.clone(), &cfg).await;

    let bucket = Bucket {
        name: name.to_string(),
        path,
        entries: Arc::new(AsyncRwLock::new(BTreeMap::new())),
        settings: AsyncRwLock::new(settings),
        is_provisioned: AtomicBool::new(false),
        status: AsyncRwLock::new(ResourceStatus::Ready),
        cfg: Arc::new(cfg),
        last_replica_sync: AsyncRwLock::new(Instant::now()),
        folder_keeper: Arc::new(folder_keeper),
        queries: AsyncRwLock::new(HashMap::new()),
    };

    bucket.save_settings().await?;
    Ok(bucket)
}
/// Restores a bucket from its directory `path`.
///
/// Reads and decodes the settings file, fills missing settings from the
/// defaults, and restores an entry for every folder reported by the folder
/// keeper. The restore handles are created for all folders first and awaited
/// afterwards; restores that yield `None` are skipped.
///
/// # Errors
///
/// Returns an error if the settings file cannot be read or decoded, if the
/// bucket name cannot be derived from `path` (no final component, or a name
/// that is not valid UTF-8), if the entry folders cannot be listed, or if
/// restoring an entry fails.
pub async fn restore(path: PathBuf, cfg: Cfg) -> Result<Bucket, ReductError> {
    let mut file = FILE_CACHE
        .read(&path.join(SETTINGS_NAME), SeekFrom::Start(0))
        .await?;
    let mut buf = Vec::new();
    file.read_to_end(&mut buf)?;
    let cfg = Arc::new(cfg);
    let settings = ProtoBucketSettings::decode(&mut Bytes::from(buf))
        .map_err(|e| internal_server_error!("Failed to decode settings: {}", e))?;
    let settings = Self::fill_settings(settings.into(), Self::defaults());

    // The directory comes from the file system, so a malformed name must be
    // reported as an error instead of panicking.
    let bucket_name = path
        .file_name()
        .and_then(|name| name.to_str())
        .map(str::to_string)
        .ok_or_else(|| {
            internal_server_error!("Invalid bucket path '{}'", path.display())
        })?;

    let folder_keeper = FolderKeeper::new(path.clone(), cfg.as_ref()).await;
    let mut task_set = Vec::new();
    for entry_path in folder_keeper.list_folders().await? {
        // Entry names are relative to the bucket directory and always use `/`.
        let entry_name = normalize_entry_name(
            entry_path
                .strip_prefix(&path)
                .unwrap_or(entry_path.as_path()),
        );
        let handler = Entry::restore(
            entry_path,
            entry_name.clone(),
            bucket_name.clone(),
            settings_for_entry(&entry_name, &settings),
            cfg.clone(),
        );
        task_set.push(handler);
    }

    let mut entries = BTreeMap::new();
    for task in task_set {
        if let Some(entry) = task.await? {
            entries.insert(entry.name().to_string(), Arc::new(entry));
        }
    }

    Ok(Bucket {
        name: bucket_name,
        path,
        entries: Arc::new(AsyncRwLock::new(entries)),
        settings: AsyncRwLock::new(settings),
        is_provisioned: AtomicBool::new(false),
        status: AsyncRwLock::new(ResourceStatus::Ready),
        last_replica_sync: AsyncRwLock::new(Instant::now()),
        cfg,
        folder_keeper: Arc::new(folder_keeper),
        queries: AsyncRwLock::new(HashMap::new()),
    })
}
pub async fn get_entry(&self, name: &str) -> Result<Weak<Entry>, ReductError> {
self.reload().await?;
let entries = self.entries.read().await?;
let entry = entries.get(name).ok_or_else(|| {
ReductError::not_found(&format!(
"Entry '{}' not found in bucket '{}'",
name, self.name
))
})?;
entry.ensure_not_deleting().await?;
Ok(entry.clone().into())
}
/// Collects the bucket information together with its settings and entries.
///
/// The bucket is reloaded first. Size and the oldest/latest record times are
/// aggregated over all entries, while the entry list and `entry_count` only
/// cover entries for which `is_visible_in_bucket_info` is true. With no
/// entries the oldest record stays at `u64::MAX` and the latest at `0`.
///
/// # Errors
///
/// Propagates reload, lock and entry info errors.
pub async fn info(self: Arc<Self>) -> Result<FullBucketInfo, ReductError> {
    self.reload().await?;

    // Snapshot the entries so the map lock is released before they are queried.
    let snapshot: Vec<_> = {
        let guard = self.entries.read().await?;
        guard.values().cloned().collect()
    };

    let mut total_size = 0;
    let mut oldest = u64::MAX;
    let mut latest = 0u64;
    let mut visible = Vec::new();
    for entry in snapshot {
        let entry_info = entry.info().await?;
        total_size += entry_info.size;
        oldest = oldest.min(entry_info.oldest_record);
        latest = latest.max(entry_info.latest_record);
        if entry.is_visible_in_bucket_info() {
            visible.push(entry_info);
        }
    }

    let info = BucketInfo {
        name: self.name.clone(),
        size: total_size,
        entry_count: visible.len() as u64,
        oldest_record: oldest,
        latest_record: latest,
        is_provisioned: self.is_provisioned.load(Ordering::Relaxed),
        status: self.status().await?,
    };
    let settings = self.settings.read().await?.clone();

    Ok(FullBucketInfo {
        info,
        settings,
        entries: visible,
    })
}
/// Starts writing a record with timestamp `time` into the entry `name`.
///
/// The steps run in this order: the bucket mode is checked, the quota is
/// applied for `content_size` bytes, the entry is fetched or created, and
/// the write is started on the entry.
///
/// # Errors
///
/// Propagates the first error of any of these steps unchanged.
pub async fn begin_write(
    self: &Arc<Self>,
    name: &str,
    time: u64,
    content_size: u64,
    content_type: String,
    labels: Labels,
) -> Result<Box<dyn WriteRecord + Sync + Send>, ReductError> {
    self.check_mode()?;
    // The quota is applied before the entry is looked up or created.
    self.keep_quota_for(content_size).await?;
    let entry = self.get_or_create_entry(name).await?.upgrade()?;
    entry
        .begin_write(time, content_size, content_type, labels)
        .await
}
/// Test helper: starts reading the record with timestamp `time` from entry `name`.
///
/// # Errors
///
/// Propagates the errors of `get_entry`, of upgrading the weak entry handle,
/// and of the entry's `begin_read`.
#[cfg(test)]
pub async fn begin_read(
    &self,
    name: &str,
    time: u64,
) -> Result<crate::storage::entry::RecordReader, ReductError> {
    let entry = self.get_entry(name).await?.upgrade()?;
    entry.begin_read(time).await
}
pub async fn sync_fs(&self) -> Result<(), ReductError> {
if self.cfg.role == InstanceRole::Replica {
return Ok(());
}
let bucket_name = self.name.clone();
debug!("Syncing bucket '{}'", bucket_name);
let time_start = Instant::now();
self.save_settings().await?;
let entries = self.entries.clone();
let mut count = 0usize;
for entry in entries.read().await?.values() {
if let Err(err) = entry.compact().await {
error!(
"Failed to compact entry '{}' in bucket '{}': {}",
entry.name(),
bucket_name,
err
);
}
count += 1;
}
debug!(
"Bucket '{}' synced {} entries in {:?}",
bucket_name,
count,
Instant::now().duration_since(time_start)
);
Ok(())
}
/// Returns the name of the bucket.
pub fn name(&self) -> &str {
    self.name.as_str()
}
/// Returns the directory of the bucket on disk.
pub fn path(&self) -> &PathBuf {
&self.path
}
pub fn set_provisioned(&self, provisioned: bool) {
self.is_provisioned
.store(provisioned, std::sync::atomic::Ordering::Relaxed);
}
pub fn is_provisioned(&self) -> bool {
self.is_provisioned
.load(std::sync::atomic::Ordering::Relaxed)
}
/// Returns a copy of the current lifecycle status of the bucket.
///
/// # Errors
///
/// Returns an error if the status lock cannot be acquired.
pub(super) async fn status(&self) -> Result<ResourceStatus, ReductError> {
    let current = self.status.read().await?;
    Ok(*current)
}
pub(super) async fn mark_deleting(&self) -> Result<(), ReductError> {
self.ensure_not_deleting().await?;
*self.status.write().await? = ResourceStatus::Deleting;
for entry in self.entries.read().await?.values() {
entry.mark_deleting().await?;
}
Ok(())
}
/// Fails with a conflict error while the bucket is being deleted.
///
/// # Errors
///
/// Returns a conflict error if the status is `Deleting`, and propagates the
/// error of reading the status.
pub(super) async fn ensure_not_deleting(&self) -> Result<(), ReductError> {
    let current = self.status().await?;
    if current != ResourceStatus::Deleting {
        return Ok(());
    }
    Err(conflict!("Bucket '{}' is being deleted", self.name))
}
/// Returns every `/`-separated prefix of `key`, shortest first, ending with
/// the key built from all of its segments.
///
/// For example `"a/b/c"` yields `["a", "a/b", "a/b/c"]`. A separator is only
/// inserted when the accumulated prefix is non-empty, so a leading empty
/// segment (`"/a"`) yields `["", "a"]`.
fn parent_prefixes(key: &str) -> Vec<String> {
    let mut accumulated = String::new();
    key.split('/')
        .map(|segment| {
            if !accumulated.is_empty() {
                accumulated.push('/');
            }
            accumulated.push_str(segment);
            accumulated.clone()
        })
        .collect()
}
/// Tells whether `candidate` is `entry_name` itself or one of its descendants.
///
/// A descendant is a name that starts with `entry_name` immediately followed
/// by `/`; sharing only a textual prefix (`"a"` vs `"ab"`) does not count.
fn entry_with_descendants<'a>(entry_name: &'a str, candidate: &'a str) -> bool {
    match candidate.strip_prefix(entry_name) {
        // An empty remainder means the two names are equal.
        Some(rest) => rest.is_empty() || rest.starts_with('/'),
        None => false,
    }
}
}
#[cfg(test)]
impl Bucket {
    /// Test helper: sets the last replica sync time to the current instant.
    ///
    /// Panics if the lock cannot be acquired.
    pub async fn reset_last_replica_sync(&self) {
        let mut last_sync = self.last_replica_sync.write().await.unwrap();
        *last_sync = Instant::now();
    }
}
#[cfg(test)]
mod tests {
use super::*;
use reduct_base::conflict;
use reduct_base::io::ReadRecord;
use reduct_base::msg::bucket_api::QuotaType;
use reduct_base::msg::status::ResourceStatus;
use rstest::{fixture, rstest};
use std::sync::Arc;
use tempfile::tempdir;
// Tests for the lifecycle status of buckets and entries.
mod status {
use super::*;
// A freshly built bucket reports `Ready` for itself and for every listed entry.
#[rstest]
#[tokio::test]
async fn test_bucket_info_has_status(#[future] bucket: Arc<Bucket>) {
let bucket = bucket.await;
let info = bucket.info().await.unwrap();
assert_eq!(info.info.status, ResourceStatus::Ready);
assert!(info
.entries
.iter()
.all(|entry| entry.status == ResourceStatus::Ready));
}
// The `$meta` entry is left out of the entry list and the entry count,
// but its 4 bytes still add to the bucket size (4 bytes data + 4 bytes meta).
#[rstest]
#[tokio::test]
async fn test_bucket_info_hides_meta_entries_from_count(#[future] bucket: Arc<Bucket>) {
let bucket = bucket.await;
write(&bucket, "entry-1", 1, b"data").await.unwrap();
write_meta(&bucket, "entry-1/$meta", 1, b"meta")
.await
.unwrap();
let info = bucket.info().await.unwrap();
assert_eq!(info.info.entry_count, 1);
assert_eq!(info.entries.len(), 1);
assert_eq!(info.entries[0].name, "entry-1");
assert!(info.info.size >= 8);
}
// Once the bucket is marked as deleting, creating an entry is rejected with a conflict.
#[rstest]
#[tokio::test]
async fn test_bucket_deleting_rejects_operations(#[future] bucket: Arc<Bucket>) {
let bucket = bucket.await;
bucket.mark_deleting().await.unwrap();
let err = bucket.get_or_create_entry("new-entry").await.err().unwrap();
assert_eq!(err, conflict!("Bucket 'test' is being deleted"));
}
// Marking a bucket as deleting twice returns a conflict on the second call.
#[rstest]
#[tokio::test]
async fn bucket_mark_deleting_returns_conflict_when_already_deleting(
#[future] bucket: Arc<Bucket>,
) {
let bucket = bucket.await;
bucket.mark_deleting().await.unwrap();
assert_eq!(
bucket.mark_deleting().await,
Err(conflict!("Bucket 'test' is being deleted"))
);
}
// Reading from an entry that is marked as deleting is rejected with a conflict.
#[rstest]
#[tokio::test]
async fn test_entry_deleting_rejects_operations(#[future] bucket: Arc<Bucket>) {
let bucket = bucket.await;
write(&bucket, "test-1", 1, b"test").await.unwrap();
// The handle is fetched before marking, because `get_entry` itself fails afterwards.
let entry = bucket.get_entry("test-1").await.unwrap().upgrade().unwrap();
entry.mark_deleting().await.unwrap();
let err = bucket.begin_read("test-1", 1).await.err().unwrap();
assert_eq!(
err,
conflict!("Entry 'test-1' in bucket 'test' is being deleted")
);
}
// Marking an entry as deleting twice returns a conflict on the second call.
#[rstest]
#[tokio::test]
async fn entry_mark_deleting_returns_conflict_when_already_deleting(
#[future] bucket: Arc<Bucket>,
) {
let bucket = bucket.await;
write(&bucket, "test-1", 1, b"test").await.unwrap();
let entry = bucket.get_entry("test-1").await.unwrap().upgrade().unwrap();
entry.mark_deleting().await.unwrap();
assert_eq!(
entry.mark_deleting().await,
Err(conflict!(
"Entry 'test-1' in bucket 'test' is being deleted"
))
);
}
}
// Tests for restoring a bucket from disk.
mod restore {
use super::*;
// An entry with a nested name ("entry/a") is found under the same name after a restore.
#[rstest]
#[tokio::test(flavor = "multi_thread")]
async fn test_restore_nested_entry_path(#[future] bucket: Arc<Bucket>) {
let bucket = bucket.await;
write(&bucket, "entry/a", 1, b"test").await.unwrap();
// Sync to disk before restoring from the same directory.
bucket.sync_fs().await.unwrap();
let bucket = Bucket::restore(bucket.path.clone(), Cfg::default())
.await
.unwrap();
let mut reader = bucket.begin_read("entry/a", 1).await.unwrap();
assert_eq!(reader.read_chunk().unwrap().unwrap(), Bytes::from("test"));
}
// A restored `$meta` entry uses `META_ENTRY_MAX_BLOCK_SIZE` instead of the bucket's block size.
#[rstest]
#[tokio::test(flavor = "multi_thread")]
async fn test_restore_meta_entry_uses_small_block_size(#[future] bucket: Arc<Bucket>) {
let bucket = bucket.await;
write_meta(&bucket, "entry/a/$meta", 1, b"meta")
.await
.unwrap();
bucket.sync_fs().await.unwrap();
let bucket = Bucket::restore(bucket.path.clone(), Cfg::default())
.await
.unwrap();
let entry = bucket
.get_entry("entry/a/$meta")
.await
.unwrap()
.upgrade()
.unwrap();
let settings = entry.settings().await.unwrap();
assert_eq!(settings.max_block_size, META_ENTRY_MAX_BLOCK_SIZE);
}
}
// The provisioned flag set on the bucket is reported by `info`.
#[rstest]
#[tokio::test]
async fn test_provisioned_info(#[future] provisioned_bucket: Arc<Bucket>) {
    let provisioned_bucket = provisioned_bucket.await;
    let info = provisioned_bucket.info().await.unwrap().info;
    assert!(info.is_provisioned);
}
// Changing the settings of a provisioned bucket is rejected with a conflict.
#[rstest]
#[tokio::test]
async fn test_provisioned_settings(#[future] provisioned_bucket: Arc<Bucket>) {
    let bucket = provisioned_bucket.await;
    let result = bucket.set_settings(BucketSettings::default()).await;
    assert_eq!(
        result.err().unwrap(),
        conflict!("Can't change settings of provisioned bucket 'test'")
    );
}
pub async fn write(
bucket: &Arc<Bucket>,
entry_name: &str,
time: u64,
content: &'static [u8],
) -> Result<(), ReductError> {
let mut sender = bucket
.begin_write(
entry_name,
time,
content.len() as u64,
"".to_string(),
Labels::new(),
)
.await?;
sender
.send(Ok(Some(Bytes::from(content))))
.await
.map_err(|e| internal_server_error!("Failed to send data: {}", e))?;
sender
.send(Ok(None))
.await
.map_err(|e| internal_server_error!("Failed to sync channel: {}", e))?;
Ok(())
}
pub async fn write_meta(
bucket: &Arc<Bucket>,
entry_name: &str,
time: u64,
content: &'static [u8],
) -> Result<(), ReductError> {
let mut sender = bucket
.begin_write(
entry_name,
time,
content.len() as u64,
"".to_string(),
Labels::from_iter([("key".to_string(), "default".to_string())]),
)
.await?;
sender
.send(Ok(Some(Bytes::from(content))))
.await
.map_err(|e| internal_server_error!("Failed to send data: {}", e))?;
sender
.send(Ok(None))
.await
.map_err(|e| internal_server_error!("Failed to sync channel: {}", e))?;
Ok(())
}
/// Test helper: reads the first chunk of the record at `time` from `entry_name`.
///
/// Panics if the reader yields no chunk or the chunk is an error.
pub async fn read(
    bucket: &Arc<Bucket>,
    entry_name: &str,
    time: u64,
) -> Result<Vec<u8>, ReductError> {
    let mut record = bucket.begin_read(entry_name, time).await?;
    let first_chunk = record.read_chunk().unwrap().unwrap();
    Ok(first_chunk.to_vec())
}
/// Fixture: small FIFO-quota settings (quota 1000, blocks of at most 100 bytes
/// and 100 records) used by the bucket fixtures.
#[fixture]
pub fn settings() -> BucketSettings {
    BucketSettings {
        quota_type: Some(QuotaType::FIFO),
        quota_size: Some(1000),
        max_block_size: Some(100),
        max_block_records: Some(100),
    }
}
/// Fixture: creates a temporary directory and returns its path.
// NOTE(review): `keep()` presumably disables the automatic cleanup of the
// temporary directory so it outlives the fixture; confirm against `tempfile`.
#[fixture]
pub fn path() -> PathBuf {
    let dir = tempdir().unwrap();
    dir.keep()
}
/// Fixture: a bucket named "test" built inside the temporary directory.
#[fixture]
pub async fn bucket(settings: BucketSettings, path: PathBuf) -> Arc<Bucket> {
    let bucket_dir = path.join("test");
    FILE_CACHE.create_dir_all(&bucket_dir).await.unwrap();

    let built = Bucket::try_build("test", &path, settings, Cfg::default())
        .await
        .unwrap();
    Arc::new(built)
}
/// Fixture: a bucket named "test" that is flagged as provisioned.
#[fixture]
pub async fn provisioned_bucket(settings: BucketSettings, path: PathBuf) -> Arc<Bucket> {
    let bucket_dir = path.join("test");
    FILE_CACHE.create_dir_all(&bucket_dir).await.unwrap();

    let built = Bucket::try_build("test", &path, settings, Cfg::default())
        .await
        .unwrap();
    built.set_provisioned(true);
    Arc::new(built)
}
}