use crate::cfg::{Cfg, InstanceRole};
use crate::storage::bucket::Bucket;
use crate::storage::engine::ReadOnlyMode;
use crate::storage::entry::{Entry, EntrySettings};
use async_trait::async_trait;
use reduct_base::error::ReductError;
use std::collections::{BTreeMap, HashSet};
use std::sync::Arc;
use std::time::Instant;
fn normalize_entry_name(path: &std::path::Path) -> String {
path.to_string_lossy().replace('\\', "/")
}
#[async_trait]
impl ReadOnlyMode for Bucket {
fn cfg(&self) -> &Cfg {
&self.cfg
}
async fn reload(&self) -> Result<(), ReductError> {
let mut last_sync = self.last_replica_sync.write().await?;
if self.cfg().role != InstanceRole::Replica
|| last_sync.elapsed() < self.cfg.engine_config.replica_update_interval
{
return Ok(());
}
*last_sync = Instant::now();
let mut task_set = vec![];
let current_bucket_paths = self
.entries
.read()
.await?
.values()
.map(|b| b.path().clone())
.collect::<HashSet<_>>();
let mut entries_to_retain = vec![];
self.folder_keeper.reload().await?;
for entry_path in self.folder_keeper.list_folders().await? {
if current_bucket_paths.contains(&entry_path) {
entries_to_retain.push(entry_path);
continue;
}
let settings = self.settings.read().await?;
let entry_name = normalize_entry_name(
entry_path
.strip_prefix(self.path())
.unwrap_or(entry_path.as_path()),
);
let handler = Entry::restore(
entry_path,
entry_name,
self.name().to_string(),
EntrySettings {
max_block_size: settings.max_block_size.unwrap(),
max_block_records: settings.max_block_records.unwrap(),
},
self.cfg.clone(),
);
task_set.push(handler);
}
let mut new_entries = BTreeMap::new();
for task in task_set {
if let Some(entry) = task.await? {
new_entries.insert(entry.name().to_string(), Arc::new(entry));
}
}
let mut entries = self.entries.write().await?;
entries.retain(|_, v| entries_to_retain.contains(v.path()));
entries.extend(new_entries.into_iter());
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::cfg::storage_engine::StorageEngineConfig;
use crate::storage::bucket::tests::write;
use crate::storage::bucket::FILE_CACHE;
use reduct_base::msg::bucket_api::BucketSettings;
use rstest::{fixture, rstest};
use tempfile::tempdir;
#[rstest]
#[tokio::test]
async fn test_reload_new_entry(#[future] primary_bucket: Arc<Bucket>) {
let primary_bucket = primary_bucket.await;
let mut cfg = primary_bucket.cfg().clone();
cfg.role = InstanceRole::Replica;
let read_only_bucket = Arc::new(
Bucket::restore(primary_bucket.path().clone(), cfg.clone())
.await
.unwrap(),
);
tokio::time::sleep(cfg.engine_config.replica_update_interval).await;
read_only_bucket.reload().await.unwrap();
{
let entries = read_only_bucket.entries.read().await.unwrap();
assert_eq!(entries.len(), 1);
assert!(entries.contains_key("test-1"));
}
write(&primary_bucket, "test-2", 1, b"test data")
.await
.unwrap();
primary_bucket.sync_fs().await.unwrap();
read_only_bucket.reset_last_replica_sync().await;
read_only_bucket.reload().await.unwrap();
assert_eq!(
read_only_bucket.entries.read().await.unwrap().len(),
1,
"Should not reload before interval"
);
tokio::time::sleep(cfg.engine_config.replica_update_interval * 2).await;
read_only_bucket.reload().await.unwrap();
{
let entries = read_only_bucket.entries.read().await.unwrap();
assert_eq!(entries.len(), 2);
assert!(entries.contains_key("test-1"));
assert!(entries.contains_key("test-2"));
}
}
#[rstest]
#[tokio::test]
async fn test_remove_entry(#[future] primary_bucket: Arc<Bucket>) {
let primary_bucket = primary_bucket.await;
let mut cfg = primary_bucket.cfg().clone();
cfg.role = InstanceRole::Replica;
let read_only_bucket = Arc::new(
Bucket::restore(primary_bucket.path().clone(), cfg.clone())
.await
.unwrap(),
);
tokio::time::sleep(cfg.engine_config.replica_update_interval).await;
read_only_bucket.reload().await.unwrap();
{
let entries = read_only_bucket.entries.read().await.unwrap();
assert_eq!(entries.len(), 1);
assert!(entries.contains_key("test-1"));
}
primary_bucket.remove_entry("test-1").await.unwrap();
primary_bucket.sync_fs().await.unwrap();
read_only_bucket.reset_last_replica_sync().await;
read_only_bucket.reload().await.unwrap();
assert_eq!(
read_only_bucket.entries.read().await.unwrap().len(),
1,
"Should not reload before interval"
);
tokio::time::sleep(cfg.engine_config.replica_update_interval).await;
read_only_bucket.reload().await.unwrap();
{
let entries = read_only_bucket.entries.read().await.unwrap();
assert_eq!(entries.len(), 0);
}
}
mod forbidden {
use super::*;
use reduct_base::forbidden;
use rstest::rstest;
#[rstest]
#[tokio::test]
async fn test_prohibited_operations_on_read_only_bucket(
#[future] primary_bucket: Arc<Bucket>,
) {
let primary_bucket = primary_bucket.await;
let mut cfg = primary_bucket.cfg().clone();
cfg.role = InstanceRole::Replica;
let read_only_bucket = Arc::new(
Bucket::restore(primary_bucket.path().clone(), cfg.clone())
.await
.unwrap(),
);
let err = forbidden!("Cannot perform this operation in read-only mode");
let write_result = read_only_bucket.get_or_create_entry("new-entry").await;
assert_eq!(write_result.err().unwrap(), err);
let remove_result = read_only_bucket.remove_entry("test-1").await;
assert_eq!(remove_result.err().unwrap(), err);
let compact_result = read_only_bucket.rename_entry("test-1", "new-name").await;
assert_eq!(compact_result.err().unwrap(), err);
}
}
mod reload_before {
use super::*;
use rstest::rstest;
#[rstest]
#[tokio::test]
async fn test_reload_before_access_entries(#[future] primary_bucket: Arc<Bucket>) {
let primary_bucket = primary_bucket.await;
let mut cfg = primary_bucket.cfg().clone();
cfg.role = InstanceRole::Replica;
let read_only_bucket = Arc::new(
Bucket::restore(primary_bucket.path().clone(), cfg.clone())
.await
.unwrap(),
);
tokio::time::sleep(cfg.engine_config.replica_update_interval).await;
read_only_bucket.reload().await.unwrap();
{
let entries = read_only_bucket.entries.read().await.unwrap();
assert_eq!(entries.len(), 1);
assert!(entries.contains_key("test-1"));
}
write(&primary_bucket, "test-2", 1, b"test data")
.await
.unwrap();
primary_bucket.sync_fs().await.unwrap();
tokio::time::sleep(cfg.engine_config.replica_update_interval).await;
{
let entries = read_only_bucket.info().await.unwrap().entries;
assert_eq!(entries.len(), 2);
}
}
}
#[fixture]
pub async fn primary_bucket() -> Arc<Bucket> {
let path = tempdir().unwrap().keep();
let mut cfg = Cfg {
data_path: path,
role: InstanceRole::Primary,
engine_config: StorageEngineConfig {
replica_update_interval: std::time::Duration::from_millis(300),
..StorageEngineConfig::default()
},
..Cfg::default()
};
cfg.role = InstanceRole::Primary;
FILE_CACHE
.create_dir_all(&cfg.data_path.join("bucket"))
.await
.unwrap();
let bucket = Arc::new(
Bucket::try_build(
"bucket",
&cfg.data_path.clone(),
BucketSettings::default(),
cfg,
)
.await
.unwrap(),
);
write(&bucket, "test-1", 1, b"test data").await.unwrap();
bucket.sync_fs().await.unwrap();
bucket
}
}