use serde::{Deserialize, Serialize};
/// Top-level storage configuration.
///
/// Serialized by serde as an internally tagged enum: the variant name is
/// carried in a `type` field (for example `type: InMemory` or
/// `type: SlateDb`), and any variant payload fields sit next to that tag.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type")]
pub enum StorageConfig {
/// Variant without any settings; selected by `type: InMemory`.
InMemory,
/// Variant carrying a [`SlateDbStorageConfig`]; selected by `type: SlateDb`.
SlateDb(SlateDbStorageConfig),
}
impl Default for StorageConfig {
    /// Returns the `SlateDb` variant with `path` set to `"data"`, a `Local`
    /// object store whose path is `".data"`, no settings file and no block
    /// cache.
    fn default() -> Self {
        let object_store = ObjectStoreConfig::Local(LocalObjectStoreConfig {
            path: String::from(".data"),
        });
        let slatedb = SlateDbStorageConfig {
            path: String::from("data"),
            object_store,
            settings_path: None,
            block_cache: None,
        };
        Self::SlateDb(slatedb)
    }
}
/// Settings carried by [`StorageConfig::SlateDb`].
///
/// `path` and `object_store` are required when deserializing. Both optional
/// fields default to `None` when absent and are left out of serialized output
/// when they are `None`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct SlateDbStorageConfig {
    /// Path string for this store. [`StorageConfig::with_path_suffix`] derives
    /// new configurations by appending `/<suffix>` to it.
    /// NOTE(review): presumably the database root inside the object store —
    /// confirm against the code that opens the store.
    pub path: String,
    /// Object store backend selection and its settings.
    pub object_store: ObjectStoreConfig,
    /// Optional path to a settings file (the tests use `slatedb.toml`).
    // `default` is spelled out so both optional fields declare their
    // missing-field behaviour the same way instead of relying on serde's
    // implicit handling of absent `Option` fields.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub settings_path: Option<String>,
    /// Optional block cache configuration.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub block_cache: Option<BlockCacheConfig>,
}
/// Block cache selection.
///
/// Serialized as an internally tagged enum using a `type` field; the only
/// variant today is selected by `type: FoyerHybrid`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type")]
pub enum BlockCacheConfig {
/// Cache described by a [`FoyerHybridCacheConfig`].
FoyerHybrid(FoyerHybridCacheConfig),
}
/// Write policy option for [`FoyerHybridCacheConfig`].
///
/// Serialized by variant name (`WriteOnInsertion` / `WriteOnEviction`).
/// NOTE(review): the names suggest when entries are written to the disk tier
/// (on insert vs. on eviction from memory) — confirm against the cache engine.
#[derive(Default, Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub enum FoyerWritePolicy {
/// The default policy, used when `write_policy` is absent from the input.
#[default]
WriteOnInsertion,
/// Alternative policy; must be requested explicitly.
WriteOnEviction,
}
/// Settings for the [`BlockCacheConfig::FoyerHybrid`] cache.
///
/// Only `memory_capacity`, `disk_capacity` and `disk_path` are required when
/// deserializing; every other field falls back to the default noted on it.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct FoyerHybridCacheConfig {
/// Memory capacity of the cache. Also the basis for the derived buffer pool
/// size (see `effective_buffer_pool_size`).
/// NOTE(review): presumably bytes (tests use 8589934592) — confirm.
pub memory_capacity: u64,
/// Disk capacity of the cache.
/// NOTE(review): presumably bytes — confirm.
pub disk_capacity: u64,
/// Path used for the cache's on-disk data (tests use `/mnt/nvme/block-cache`).
pub disk_path: String,
/// Write policy; defaults to `FoyerWritePolicy::WriteOnInsertion` when absent.
#[serde(default)]
pub write_policy: FoyerWritePolicy,
/// Number of flushers; defaults to 4 (`default_flushers`) when absent.
#[serde(default = "default_flushers")]
pub flushers: usize,
/// Optional explicit buffer pool size. When `None`,
/// `effective_buffer_pool_size` derives it as `memory_capacity / 32`.
/// Omitted from serialized output when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub buffer_pool_size: Option<u64>,
/// Submit queue size threshold; defaults to `1024 * 1024 * 1024`
/// (`default_submit_queue_size_threshold`) when absent.
#[serde(default = "default_submit_queue_size_threshold")]
pub submit_queue_size_threshold: u64,
}
/// Serde fallback for `FoyerHybridCacheConfig::flushers` when the field is
/// missing from the input.
fn default_flushers() -> usize {
    const DEFAULT_FLUSHERS: usize = 4;
    DEFAULT_FLUSHERS
}
/// Serde fallback for `FoyerHybridCacheConfig::submit_queue_size_threshold`
/// when the field is missing from the input.
fn default_submit_queue_size_threshold() -> u64 {
    // 2^30, i.e. 1024 * 1024 * 1024.
    1 << 30
}
impl FoyerHybridCacheConfig {
    /// Returns the buffer pool size to use.
    ///
    /// An explicitly configured `buffer_pool_size` wins; otherwise the value is
    /// derived as `memory_capacity / 32` using integer division.
    pub fn effective_buffer_pool_size(&self) -> u64 {
        const MEMORY_CAPACITY_DIVISOR: u64 = 32;
        let derived = self.memory_capacity / MEMORY_CAPACITY_DIVISOR;
        self.buffer_pool_size.unwrap_or(derived)
    }
}
impl Default for SlateDbStorageConfig {
    /// Returns a configuration with `path` set to `"data"`, the default
    /// [`ObjectStoreConfig`] (its `InMemory` variant), no settings file and no
    /// block cache.
    ///
    /// Note that this differs from `StorageConfig::default()`, which builds
    /// its SlateDb settings with a `Local` object store instead.
    fn default() -> Self {
        let object_store = ObjectStoreConfig::default();
        Self {
            path: String::from("data"),
            object_store,
            settings_path: None,
            block_cache: None,
        }
    }
}
impl StorageConfig {
    /// Returns a copy of this configuration with `/<suffix>` appended to the
    /// SlateDb `path`.
    ///
    /// The `InMemory` variant has no path and is returned as `InMemory`. For
    /// `SlateDb`, the object store, settings path and block cache are cloned
    /// unchanged. The `/` separator is always inserted and neither side is
    /// normalized, so an empty `suffix` yields a trailing slash.
    pub fn with_path_suffix(&self, suffix: &str) -> Self {
        match self {
            Self::InMemory => Self::InMemory,
            Self::SlateDb(base) => {
                let path = format!("{}/{suffix}", base.path);
                Self::SlateDb(SlateDbStorageConfig {
                    path,
                    object_store: base.object_store.clone(),
                    settings_path: base.settings_path.clone(),
                    block_cache: base.block_cache.clone(),
                })
            }
        }
    }
}
/// Object store backend selection.
///
/// Serialized as an internally tagged enum using a `type` field
/// (`type: InMemory`, `type: Aws` or `type: Local`); variant payload fields
/// sit next to the tag.
#[derive(Default, Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type")]
pub enum ObjectStoreConfig {
/// Variant without any settings; this is the `Default` value.
#[default]
InMemory,
/// Variant carrying an [`AwsObjectStoreConfig`] (region and bucket).
Aws(AwsObjectStoreConfig),
/// Variant carrying a [`LocalObjectStoreConfig`] (a path).
Local(LocalObjectStoreConfig),
}
/// Settings for [`ObjectStoreConfig::Aws`]. Both fields are required.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct AwsObjectStoreConfig {
/// Region name (tests use `us-west-2`).
pub region: String,
/// Bucket name (tests use `my-bucket`).
pub bucket: String,
}
/// Settings for [`ObjectStoreConfig::Local`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct LocalObjectStoreConfig {
/// Path for the local object store (tests use `/tmp/slatedb`; the default
/// `StorageConfig` uses `.data`).
/// NOTE(review): presumably a filesystem directory — confirm.
pub path: String,
}
#[cfg(test)]
mod tests {
    //! Tests for defaults and the YAML (de)serialization of the storage
    //! configuration types.
    //!
    //! The YAML fixtures start at column 0 inside raw strings, and the
    //! children of `object_store:` / `block_cache:` are indented. That
    //! indentation is significant: without it those keys would parse as
    //! top-level siblings (with duplicate `type`/`path` keys) instead of
    //! nested mappings.

    use super::*;

    #[test]
    fn should_default_to_slatedb_with_local_data_dir() {
        let config = StorageConfig::default();
        match config {
            StorageConfig::SlateDb(slate_config) => {
                assert_eq!(slate_config.path, "data");
                assert_eq!(
                    slate_config.object_store,
                    ObjectStoreConfig::Local(LocalObjectStoreConfig {
                        path: ".data".to_string()
                    })
                );
            }
            _ => panic!("Expected SlateDb config as default"),
        }
    }

    #[test]
    fn should_deserialize_in_memory_config() {
        let yaml = r#"type: InMemory"#;
        let config: StorageConfig = serde_yaml::from_str(yaml).unwrap();
        assert_eq!(config, StorageConfig::InMemory);
    }

    #[test]
    fn should_deserialize_slatedb_config_with_local_object_store() {
        let yaml = r#"
type: SlateDb
path: my-data
object_store:
  type: Local
  path: /tmp/slatedb
"#;
        let config: StorageConfig = serde_yaml::from_str(yaml).unwrap();
        match config {
            StorageConfig::SlateDb(slate_config) => {
                assert_eq!(slate_config.path, "my-data");
                assert_eq!(
                    slate_config.object_store,
                    ObjectStoreConfig::Local(LocalObjectStoreConfig {
                        path: "/tmp/slatedb".to_string()
                    })
                );
                // `settings_path` is absent from the YAML, so it must be None.
                assert!(slate_config.settings_path.is_none());
            }
            _ => panic!("Expected SlateDb config"),
        }
    }

    #[test]
    fn should_deserialize_slatedb_config_with_aws_object_store() {
        // `settings_path` belongs to the SlateDb config, not the object store,
        // so it sits back at the top level.
        let yaml = r#"
type: SlateDb
path: my-data
object_store:
  type: Aws
  region: us-west-2
  bucket: my-bucket
settings_path: slatedb.toml
"#;
        let config: StorageConfig = serde_yaml::from_str(yaml).unwrap();
        match config {
            StorageConfig::SlateDb(slate_config) => {
                assert_eq!(slate_config.path, "my-data");
                assert_eq!(
                    slate_config.object_store,
                    ObjectStoreConfig::Aws(AwsObjectStoreConfig {
                        region: "us-west-2".to_string(),
                        bucket: "my-bucket".to_string()
                    })
                );
                assert_eq!(slate_config.settings_path, Some("slatedb.toml".to_string()));
            }
            _ => panic!("Expected SlateDb config"),
        }
    }

    #[test]
    fn should_deserialize_slatedb_config_with_in_memory_object_store() {
        let yaml = r#"
type: SlateDb
path: test-data
object_store:
  type: InMemory
"#;
        let config: StorageConfig = serde_yaml::from_str(yaml).unwrap();
        match config {
            StorageConfig::SlateDb(slate_config) => {
                assert_eq!(slate_config.path, "test-data");
                assert_eq!(slate_config.object_store, ObjectStoreConfig::InMemory);
            }
            _ => panic!("Expected SlateDb config"),
        }
    }

    #[test]
    fn should_serialize_slatedb_config() {
        let config = StorageConfig::SlateDb(SlateDbStorageConfig {
            path: "my-data".to_string(),
            object_store: ObjectStoreConfig::Local(LocalObjectStoreConfig {
                path: "/tmp/slatedb".to_string(),
            }),
            settings_path: None,
            block_cache: None,
        });
        let yaml = serde_yaml::to_string(&config).unwrap();
        assert!(yaml.contains("type: SlateDb"));
        assert!(yaml.contains("path: my-data"));
        assert!(yaml.contains("type: Local"));
        // Optional fields that are None must be skipped entirely.
        assert!(!yaml.contains("settings_path"));
        assert!(!yaml.contains("block_cache"));
    }

    #[test]
    fn should_deserialize_block_cache_config() {
        // Only the required cache fields are given; the rest must take their
        // serde defaults.
        let yaml = r#"
type: SlateDb
path: data
object_store:
  type: InMemory
block_cache:
  type: FoyerHybrid
  memory_capacity: 8589934592
  disk_capacity: 150323855360
  disk_path: /mnt/nvme/block-cache
"#;
        let config: StorageConfig = serde_yaml::from_str(yaml).unwrap();
        match config {
            StorageConfig::SlateDb(slate_config) => {
                let cache = slate_config.block_cache.expect("block_cache should be set");
                match cache {
                    BlockCacheConfig::FoyerHybrid(foyer) => {
                        assert_eq!(foyer.memory_capacity, 8589934592);
                        assert_eq!(foyer.disk_capacity, 150323855360);
                        assert_eq!(foyer.disk_path, "/mnt/nvme/block-cache");
                        assert_eq!(foyer.write_policy, FoyerWritePolicy::WriteOnInsertion);
                        assert_eq!(foyer.flushers, 4);
                        assert!(foyer.buffer_pool_size.is_none());
                        assert_eq!(foyer.submit_queue_size_threshold, 1024 * 1024 * 1024);
                        assert_eq!(foyer.effective_buffer_pool_size(), 8589934592 / 32);
                    }
                }
            }
            _ => panic!("Expected SlateDb config"),
        }
    }

    #[test]
    fn should_deserialize_block_cache_with_explicit_engine_options() {
        let yaml = r#"
type: SlateDb
path: data
object_store:
  type: InMemory
block_cache:
  type: FoyerHybrid
  memory_capacity: 4294967296
  disk_capacity: 10737418240
  disk_path: /mnt/nvme/cache
  write_policy: WriteOnEviction
  flushers: 2
  buffer_pool_size: 134217728
  submit_queue_size_threshold: 536870912
"#;
        let config: StorageConfig = serde_yaml::from_str(yaml).unwrap();
        match config {
            StorageConfig::SlateDb(slate_config) => {
                let cache = slate_config.block_cache.expect("block_cache should be set");
                match cache {
                    BlockCacheConfig::FoyerHybrid(foyer) => {
                        assert_eq!(foyer.write_policy, FoyerWritePolicy::WriteOnEviction);
                        assert_eq!(foyer.flushers, 2);
                        assert_eq!(foyer.buffer_pool_size, Some(134217728));
                        assert_eq!(foyer.submit_queue_size_threshold, 536870912);
                        // An explicit buffer pool size overrides the derived one.
                        assert_eq!(foyer.effective_buffer_pool_size(), 134217728);
                    }
                }
            }
            _ => panic!("Expected SlateDb config"),
        }
    }

    #[test]
    fn should_derive_buffer_pool_size_from_memory_capacity() {
        let config = FoyerHybridCacheConfig {
            memory_capacity: 8 * 1024 * 1024 * 1024,
            disk_capacity: 100 * 1024 * 1024 * 1024,
            disk_path: "/tmp/cache".to_string(),
            write_policy: FoyerWritePolicy::default(),
            flushers: 4,
            buffer_pool_size: None,
            submit_queue_size_threshold: 1024 * 1024 * 1024,
        };
        // 8 * 1024^3 / 32 == 256 * 1024^2.
        assert_eq!(config.effective_buffer_pool_size(), 256 * 1024 * 1024);
    }

    #[test]
    fn should_default_block_cache_to_none() {
        let yaml = r#"
type: SlateDb
path: data
object_store:
  type: InMemory
"#;
        let config: StorageConfig = serde_yaml::from_str(yaml).unwrap();
        match config {
            StorageConfig::SlateDb(slate_config) => {
                assert!(slate_config.block_cache.is_none());
            }
            _ => panic!("Expected SlateDb config"),
        }
    }
}