use crate::{
compactors,
consts::{
DEFAULT_ALLOW_PREFETCH, DEFAULT_COMPACTION_FLUSH_LISTNER_INTERVAL, DEFAULT_COMPACTION_INTERVAL,
DEFAULT_ENABLE_TTL, DEFAULT_FALSE_POSITIVE_RATE, DEFAULT_MAX_WRITE_BUFFER_NUMBER,
DEFAULT_ONLINE_GC_INTERVAL, DEFAULT_PREFETCH_SIZE, DEFAULT_TOMBSTONE_COMPACTION_INTERVAL,
DEFAULT_TOMBSTONE_TTL, ENTRY_TTL, GC_CHUNK_SIZE, WRITE_BUFFER_SIZE,
},
};
use crate::{
db::{DataStore, SizeUnit},
types::Key,
};
use std::time::Duration;
#[derive(Clone, Debug)]
pub struct Config {
pub false_positive_rate: f64,
pub allow_prefetch: bool,
pub prefetch_size: usize,
pub write_buffer_size: usize,
pub max_buffer_write_number: usize,
pub enable_ttl: bool,
pub entry_ttl: std::time::Duration,
pub tombstone_ttl: std::time::Duration,
pub compactor_flush_listener_interval: std::time::Duration,
pub background_compaction_interval: std::time::Duration,
pub tombstone_compaction_interval: std::time::Duration,
pub compaction_strategy: compactors::Strategy,
pub online_gc_interval: std::time::Duration,
pub gc_chunk_size: usize,
pub open_files_limit: usize,
}
/// Returns the default open-files limit for the platform this crate was compiled for.
///
/// The choice is made at compile time from the target OS: `400` on Windows, `150` on macOS
/// and `900` on every other target.
fn get_open_file_limit() -> usize {
    if cfg!(target_os = "windows") {
        400
    } else if cfg!(target_os = "macos") {
        150
    } else {
        900
    }
}
impl Default for Config {
    /// Builds a configuration from the crate's default constants.
    ///
    /// The compaction strategy defaults to `compactors::Strategy::STCS` and the open-files
    /// limit is taken from `get_open_file_limit()`, which varies by target OS.
    fn default() -> Self {
        // Fields are listed in the same order as they are declared on `Config`.
        Self {
            false_positive_rate: DEFAULT_FALSE_POSITIVE_RATE,
            allow_prefetch: DEFAULT_ALLOW_PREFETCH,
            prefetch_size: DEFAULT_PREFETCH_SIZE,
            write_buffer_size: WRITE_BUFFER_SIZE,
            max_buffer_write_number: DEFAULT_MAX_WRITE_BUFFER_NUMBER,
            enable_ttl: DEFAULT_ENABLE_TTL,
            entry_ttl: ENTRY_TTL,
            tombstone_ttl: DEFAULT_TOMBSTONE_TTL,
            compactor_flush_listener_interval: DEFAULT_COMPACTION_FLUSH_LISTNER_INTERVAL,
            background_compaction_interval: DEFAULT_COMPACTION_INTERVAL,
            tombstone_compaction_interval: DEFAULT_TOMBSTONE_COMPACTION_INTERVAL,
            compaction_strategy: compactors::Strategy::STCS,
            online_gc_interval: DEFAULT_ONLINE_GC_INTERVAL,
            gc_chunk_size: GC_CHUNK_SIZE,
            open_files_limit: get_open_file_limit(),
        }
    }
}
/// Builder-style setters for the store's [`Config`].
///
/// Every method consumes the store, validates its argument, writes the value into
/// `self.config` and hands the store back so calls can be chained. Validation failures are
/// reported by panicking, as documented on each method.
impl DataStore<'static, Key> {
    /// Sets `config.false_positive_rate`.
    ///
    /// # Panics
    ///
    /// Panics if `rate` is not greater than `0.0` (which also rejects `NaN`).
    pub fn with_false_positive_rate(mut self, rate: f64) -> Self {
        assert!(rate > 0.0, "false_positive_rate must be greater than 0.0");
        self.config.false_positive_rate = rate;
        self
    }

    /// Enables or disables prefetching (`config.allow_prefetch`).
    pub fn with_allow_prefetch(mut self, allow: bool) -> Self {
        self.config.allow_prefetch = allow;
        self
    }

    /// Sets `config.prefetch_size`.
    ///
    /// # Panics
    ///
    /// Panics if prefetching is currently enabled and `size` is `0`. When prefetching is
    /// disabled any size is accepted, since the value is only constrained "if allow_prefetch
    /// is set to true".
    pub fn with_prefetch_size(mut self, size: usize) -> Self {
        // Implication (allow_prefetch => size > 0), not conjunction: the previous
        // `allow_prefetch && size > 0` panicked for every call made while prefetch was off.
        assert!(
            !self.config.allow_prefetch || size > 0,
            "prefetch_size should be greater than 0 if allow_prefetch is set to true"
        );
        self.config.prefetch_size = size;
        self
    }

    /// Sets `config.write_buffer_size` from a size given in kilobytes.
    ///
    /// The stored value is `SizeUnit::Kilobytes.as_bytes(size)`.
    ///
    /// # Panics
    ///
    /// Panics if `size` is less than 50 (kilobytes).
    pub fn with_write_buffer_size(mut self, size: usize) -> Self {
        assert!(
            size >= 50,
            "write_buffer_size should not be less than 50 Kilobytes"
        );
        self.config.write_buffer_size = SizeUnit::Kilobytes.as_bytes(size);
        self
    }

    /// Sets `config.max_buffer_write_number`.
    ///
    /// # Panics
    ///
    /// Panics if `number` is `0`.
    pub fn with_max_buffer_write_number(mut self, number: usize) -> Self {
        assert!(number > 0, "max_buffer_write_number should be greater zero");
        self.config.max_buffer_write_number = number;
        self
    }

    /// Enables or disables entry expiry (`config.enable_ttl`).
    pub fn with_enable_ttl(mut self, enable: bool) -> Self {
        self.config.enable_ttl = enable;
        self
    }

    /// Sets `config.entry_ttl`.
    ///
    /// # Panics
    ///
    /// Panics if TTL is currently enabled and `ttl` is shorter than 3 days. When TTL is
    /// disabled any duration is accepted, since the bound only applies "if enable_ttl is set
    /// to true".
    pub fn with_entry_ttl(mut self, ttl: Duration) -> Self {
        // Implication (enable_ttl => ttl >= 3 days); the previous conjunction panicked for
        // every call made while TTL was disabled.
        assert!(
            !self.config.enable_ttl || ttl >= Duration::from_secs(3 * 24 * 60 * 60),
            "entry_ttl_millis cannot be less than 3 days if enable_ttl is set to true"
        );
        self.config.entry_ttl = ttl;
        self
    }

    /// Sets `config.tombstone_ttl`.
    ///
    /// # Panics
    ///
    /// Panics if `ttl` is shorter than 10 days.
    pub fn with_tombstone_ttl(mut self, ttl: Duration) -> Self {
        assert!(
            ttl >= Duration::from_secs(10 * 24 * 60 * 60),
            "tombstone_ttl should not be less than 10 days to prevent resurrecting entries marked deleted"
        );
        self.config.tombstone_ttl = ttl;
        self
    }

    /// Sets `config.compactor_flush_listener_interval`.
    ///
    /// # Panics
    ///
    /// Panics if `interval` is shorter than 2 minutes.
    pub fn with_compactor_flush_listener_interval(mut self, interval: Duration) -> Self {
        assert!(
            interval >= Duration::from_secs(2 * 60),
            "compactor_flush_listener_interval should not be less than 2 minutes, to prevent overloading the system"
        );
        self.config.compactor_flush_listener_interval = interval;
        self
    }

    /// Sets `config.background_compaction_interval`.
    ///
    /// # Panics
    ///
    /// Panics if `interval` is shorter than 5 minutes.
    pub fn with_background_compaction_interval(mut self, interval: Duration) -> Self {
        assert!(
            interval >= Duration::from_secs(5 * 60),
            "background_compaction_interval should not be less than 5 minutes to prevent overloads"
        );
        self.config.background_compaction_interval = interval;
        self
    }

    /// Sets `config.tombstone_compaction_interval`.
    ///
    /// # Panics
    ///
    /// Panics if `interval` is shorter than 10 days.
    pub fn with_tombstone_compaction_interval(mut self, interval: Duration) -> Self {
        assert!(
            interval >= Duration::from_secs(10 * 24 * 60 * 60),
            "tombstone_compaction_interval should not be less than 10 days"
        );
        self.config.tombstone_compaction_interval = interval;
        self
    }

    /// Sets `config.compaction_strategy`. No validation is performed.
    pub fn with_compaction_strategy(mut self, strategy: compactors::Strategy) -> Self {
        self.config.compaction_strategy = strategy;
        self
    }

    /// Sets `config.online_gc_interval`.
    ///
    /// # Panics
    ///
    /// Panics if `interval` is shorter than 1 hour.
    pub fn with_online_gc_interval(mut self, interval: Duration) -> Self {
        assert!(
            interval >= Duration::from_secs(60 * 60),
            "online_gc_interval should not be less than 1 hour"
        );
        self.config.online_gc_interval = interval;
        self
    }

    /// Sets `config.gc_chunk_size` from a size given in kilobytes.
    ///
    /// The stored value is `SizeUnit::Kilobytes.as_bytes(size)`.
    ///
    /// # Panics
    ///
    /// Panics if `size` is less than 50 (kilobytes).
    pub fn with_gc_chunk_size(mut self, size: usize) -> Self {
        assert!(size >= 50, "gc_chunk_size should not be less than 50 Kilobyte");
        self.config.gc_chunk_size = SizeUnit::Kilobytes.as_bytes(size);
        self
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cfg::Config;
    use std::time::Duration;
    use tempfile::tempdir;

    // Second counts used to keep the durations in the tests below readable.
    const MINUTE: u64 = 60;
    const HOUR: u64 = 60 * MINUTE;
    const DAY: u64 = 24 * HOUR;

    /// Opens a store in a temporary directory and replaces its config with a fixed baseline
    /// (prefetch and TTL disabled, every interval zero) so each test starts from a known state.
    async fn create_datastore() -> DataStore<'static, Key> {
        // NOTE(review): `root` goes out of scope when this helper returns; with `tempfile`
        // that normally removes the directory. These tests only inspect `config`, so they do
        // not depend on the directory outliving the helper — confirm before reusing it for
        // tests that touch the disk.
        let root = tempdir().unwrap();
        let store_dir = root.path().join("store_test_3");
        let mut store = DataStore::open("test", store_dir).await.unwrap();
        store.config = Config {
            false_positive_rate: 0.01,
            allow_prefetch: false,
            prefetch_size: 0,
            write_buffer_size: 51200,
            max_buffer_write_number: 1,
            enable_ttl: false,
            entry_ttl: Duration::from_secs(0),
            tombstone_ttl: Duration::from_secs(0),
            compactor_flush_listener_interval: Duration::from_secs(0),
            background_compaction_interval: Duration::from_secs(0),
            tombstone_compaction_interval: Duration::from_secs(0),
            compaction_strategy: compactors::Strategy::STCS,
            online_gc_interval: Duration::from_secs(0),
            gc_chunk_size: 51200,
            open_files_limit: 150,
        };
        store
    }

    #[tokio::test]
    #[should_panic(expected = "false_positive_rate must be greater than 0.0")]
    async fn test_with_false_positive_rate_invalid() {
        let _ = create_datastore().await.with_false_positive_rate(0.0);
    }

    #[tokio::test]
    async fn test_with_false_positive_rate() {
        let store = create_datastore().await.with_false_positive_rate(0.05);
        assert_eq!(store.config.false_positive_rate, 0.05);
    }

    #[tokio::test]
    async fn test_with_allow_prefetch() {
        let store = create_datastore().await.with_allow_prefetch(true);
        assert!(store.config.allow_prefetch);
    }

    #[tokio::test]
    #[should_panic(expected = "prefetch_size should be greater than 0 if allow_prefetch is set to true")]
    async fn test_with_prefetch_size_invalid() {
        let store = create_datastore().await.with_allow_prefetch(true);
        let _ = store.with_prefetch_size(0);
    }

    #[tokio::test]
    async fn test_with_prefetch_size() {
        let store = create_datastore()
            .await
            .with_allow_prefetch(true)
            .with_prefetch_size(10);
        assert_eq!(store.config.prefetch_size, 10);
    }

    #[tokio::test]
    #[should_panic(expected = "write_buffer_size should not be less than 50 Kilobytes")]
    async fn test_with_write_buffer_size_invalid() {
        let _ = create_datastore().await.with_write_buffer_size(49);
    }

    #[tokio::test]
    async fn test_with_write_buffer_size() {
        let store = create_datastore().await.with_write_buffer_size(100);
        assert_eq!(
            store.config.write_buffer_size,
            SizeUnit::Kilobytes.as_bytes(100)
        );
    }

    #[tokio::test]
    #[should_panic(expected = "max_buffer_write_number should be greater zero")]
    async fn test_with_max_buffer_write_number_invalid() {
        let _ = create_datastore().await.with_max_buffer_write_number(0);
    }

    #[tokio::test]
    async fn test_with_max_buffer_write_number() {
        let store = create_datastore().await.with_max_buffer_write_number(5);
        assert_eq!(store.config.max_buffer_write_number, 5);
    }

    #[tokio::test]
    async fn test_with_enable_ttl() {
        let store = create_datastore().await.with_enable_ttl(true);
        assert!(store.config.enable_ttl);
    }

    #[tokio::test]
    #[should_panic(expected = "entry_ttl_millis cannot be less than 3 days if enable_ttl is set to true")]
    async fn test_with_entry_ttl_invalid() {
        let store = create_datastore().await.with_enable_ttl(true);
        // One day is below the three-day minimum.
        let _ = store.with_entry_ttl(Duration::from_secs(DAY));
    }

    #[tokio::test]
    async fn test_with_entry_ttl() {
        let four_days = Duration::from_secs(4 * DAY);
        let store = create_datastore()
            .await
            .with_enable_ttl(true)
            .with_entry_ttl(four_days);
        assert_eq!(store.config.entry_ttl, four_days);
    }

    #[tokio::test]
    #[should_panic(
        expected = "tombstone_ttl should not be less than 10 days to prevent resurrecting entries marked deleted"
    )]
    async fn test_with_tombstone_ttl_invalid() {
        // Nine days is below the ten-day minimum.
        let _ = create_datastore()
            .await
            .with_tombstone_ttl(Duration::from_secs(9 * DAY));
    }

    #[tokio::test]
    async fn test_with_tombstone_ttl() {
        let fifteen_days = Duration::from_secs(15 * DAY);
        let store = create_datastore().await.with_tombstone_ttl(fifteen_days);
        assert_eq!(store.config.tombstone_ttl, fifteen_days);
    }

    #[tokio::test]
    #[should_panic(
        expected = "compactor_flush_listener_interval should not be less than 2 minutes, to prevent overloading the system"
    )]
    async fn test_with_compactor_flush_listener_interval_invalid() {
        // One minute is below the two-minute minimum.
        let _ = create_datastore()
            .await
            .with_compactor_flush_listener_interval(Duration::from_secs(MINUTE));
    }

    #[tokio::test]
    async fn test_with_compactor_flush_listener_interval() {
        let three_minutes = Duration::from_secs(3 * MINUTE);
        let store = create_datastore()
            .await
            .with_compactor_flush_listener_interval(three_minutes);
        assert_eq!(store.config.compactor_flush_listener_interval, three_minutes);
    }

    #[tokio::test]
    #[should_panic(
        expected = "background_compaction_interval should not be less than 5 minutes to prevent overloads"
    )]
    async fn test_with_background_compaction_interval_invalid() {
        // Four minutes is below the five-minute minimum.
        let _ = create_datastore()
            .await
            .with_background_compaction_interval(Duration::from_secs(4 * MINUTE));
    }

    #[tokio::test]
    async fn test_with_background_compaction_interval() {
        let six_minutes = Duration::from_secs(6 * MINUTE);
        let store = create_datastore()
            .await
            .with_background_compaction_interval(six_minutes);
        assert_eq!(store.config.background_compaction_interval, six_minutes);
    }

    #[tokio::test]
    #[should_panic(expected = "tombstone_compaction_interval should not be less than 10 days")]
    async fn test_with_tombstone_compaction_interval_invalid() {
        let _ = create_datastore()
            .await
            .with_tombstone_compaction_interval(Duration::from_secs(9 * DAY));
    }

    #[tokio::test]
    async fn test_with_tombstone_compaction_interval() {
        let eleven_days = Duration::from_secs(11 * DAY);
        let store = create_datastore()
            .await
            .with_tombstone_compaction_interval(eleven_days);
        assert_eq!(store.config.tombstone_compaction_interval, eleven_days);
    }

    #[tokio::test]
    async fn test_with_compaction_strategy() {
        let store = create_datastore()
            .await
            .with_compaction_strategy(compactors::Strategy::STCS);
        assert_eq!(store.config.compaction_strategy, compactors::Strategy::STCS);
    }

    #[tokio::test]
    #[should_panic(expected = "online_gc_interval should not be less than 1 hour")]
    async fn test_with_online_gc_interval_invalid() {
        // Thirty minutes is below the one-hour minimum.
        let _ = create_datastore()
            .await
            .with_online_gc_interval(Duration::from_secs(30 * MINUTE));
    }

    #[tokio::test]
    async fn test_with_online_gc_interval() {
        let two_hours = Duration::from_secs(2 * HOUR);
        let store = create_datastore().await.with_online_gc_interval(two_hours);
        assert_eq!(store.config.online_gc_interval, two_hours);
    }

    #[tokio::test]
    #[should_panic(expected = "gc_chunk_size should not be less than 50 Kilobyte")]
    async fn test_with_gc_chunk_size_invalid() {
        let _ = create_datastore().await.with_gc_chunk_size(49);
    }

    #[tokio::test]
    async fn test_with_gc_chunk_size() {
        let store = create_datastore().await.with_gc_chunk_size(100);
        assert_eq!(store.config.gc_chunk_size, SizeUnit::Kilobytes.as_bytes(100));
    }
}