use std::fmt;
use std::path::PathBuf;
use std::time::Duration;
/// Encryption settings attached to a [`Config`].
///
/// This type only exists when the `encryption` feature is compiled in.
#[cfg(feature = "encryption")]
#[derive(Clone)]
pub struct EncryptionConfig {
/// Shared, reference-counted handle to the key chain.
pub key_chain: std::sync::Arc<grafeo_common::encryption::KeyChain>,
}
#[cfg(feature = "encryption")]
impl fmt::Debug for EncryptionConfig {
    /// Formats the struct without exposing the key chain.
    ///
    /// The `key_chain` field is always rendered as the fixed placeholder
    /// string instead of delegating to the key chain's own formatting.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("EncryptionConfig");
        builder.field("key_chain", &"[redacted]");
        builder.finish()
    }
}
/// Graph data model a database is configured with.
///
/// Marked `#[non_exhaustive]`, so code outside this crate must include a
/// wildcard arm when matching on it.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum GraphModel {
/// LPG model; this is the default.
#[default]
Lpg,
/// RDF model. `Config::validate` rejects it when the crate is built
/// without the `triple-store` feature.
Rdf,
}
impl fmt::Display for GraphModel {
    /// Writes the upper-case model name: `LPG` or `RDF`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            Self::Lpg => "LPG",
            Self::Rdf => "RDF",
        };
        f.write_str(label)
    }
}
/// Access mode a database is opened with.
///
/// Marked `#[non_exhaustive]`, so code outside this crate must include a
/// wildcard arm when matching on it.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum AccessMode {
/// Reads and writes; this is the default.
#[default]
ReadWrite,
/// Reads only. `Config::read_only` selects this mode.
ReadOnly,
}
impl fmt::Display for AccessMode {
    /// Writes the hyphenated lower-case name: `read-write` or `read-only`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            Self::ReadWrite => "read-write",
            Self::ReadOnly => "read-only",
        };
        f.write_str(label)
    }
}
/// On-disk storage layout selection.
///
/// Marked `#[non_exhaustive]`, so code outside this crate must include a
/// wildcard arm when matching on it.
// NOTE(review): how each variant is interpreted is decided by the storage
// layer, which is not visible in this file — confirm before relying on the
// variant descriptions below.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum StorageFormat {
/// The default; presumably lets the engine pick the layout — TODO confirm.
#[default]
Auto,
/// Displayed as `wal-directory`.
WalDirectory,
/// Displayed as `single-file`.
SingleFile,
}
impl fmt::Display for StorageFormat {
    /// Writes the hyphenated lower-case name of the format:
    /// `auto`, `wal-directory` or `single-file`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            Self::Auto => "auto",
            Self::WalDirectory => "wal-directory",
            Self::SingleFile => "single-file",
        };
        f.write_str(label)
    }
}
/// Durability policy stored in `Config::wal_durability`.
///
/// The default (see the `Default` impl) is `Batch` with a 100 ms delay and
/// 1000 records.
// NOTE(review): the exact flushing behaviour of each variant lives in the WAL
// implementation, which is not visible here — the variant notes below are
// read off the names and should be confirmed there.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum DurabilityMode {
/// Presumably syncs on every write — TODO confirm.
Sync,
/// Batched syncing bounded by a delay and a record count.
Batch {
/// Upper bound on the delay, in milliseconds (per the `_ms` suffix).
max_delay_ms: u64,
/// Upper bound on the number of records in a batch.
max_records: u64,
},
/// Adaptive syncing driven by a target interval.
Adaptive {
/// Target interval, in milliseconds (per the `_ms` suffix).
target_interval_ms: u64,
},
/// Presumably never syncs explicitly — TODO confirm.
NoSync,
}
impl Default for DurabilityMode {
    /// Returns `Batch` with a 100 ms maximum delay and a 1000-record cap.
    fn default() -> Self {
        const MAX_DELAY_MS: u64 = 100;
        const MAX_RECORDS: u64 = 1000;

        Self::Batch {
            max_delay_ms: MAX_DELAY_MS,
            max_records: MAX_RECORDS,
        }
    }
}
/// Reasons `Config::validate` can reject a configuration.
///
/// Marked `#[non_exhaustive]`, so code outside this crate must include a
/// wildcard arm when matching on it.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum ConfigError {
/// `memory_limit` was `Some(0)`.
ZeroMemoryLimit,
/// `threads` was `0`.
ZeroThreads,
/// `wal_flush_interval_ms` was `0`.
ZeroWalFlushInterval,
/// `graph_model` was `GraphModel::Rdf` in a build without the
/// `triple-store` feature.
RdfFeatureRequired,
}
impl fmt::Display for ConfigError {
    /// Writes a one-line, human-readable description of the failed check.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // NOTE(review): the RDF message names the `rdf` feature, while
        // `Config::validate` gates the check on `triple-store`. The text is
        // kept as-is because the unit tests assert it verbatim — confirm
        // which feature name is current before changing either side.
        let message = match self {
            Self::ZeroMemoryLimit => "memory_limit must be greater than zero",
            Self::ZeroThreads => "threads must be greater than zero",
            Self::ZeroWalFlushInterval => "wal_flush_interval_ms must be greater than zero",
            Self::RdfFeatureRequired => {
                "RDF graph model requires the `rdf` feature flag to be enabled"
            }
        };
        f.write_str(message)
    }
}
// Marker impl: `ConfigError` wraps no underlying error, so every provided
// method of `std::error::Error` (including `source`) keeps its default.
impl std::error::Error for ConfigError {}
/// Database configuration.
///
/// Start from `Config::default`, `Config::in_memory`, `Config::persistent`
/// or `Config::read_only`, refine with the `with_*` / `without_*` builder
/// methods, and check the result with `Config::validate`.
#[derive(Debug, Clone)]
// The bools below are independent on/off switches, so the lint's suggestion
// of folding them into an enum or state machine does not apply.
#[allow(clippy::struct_excessive_bools)] pub struct Config {
/// Graph data model; defaults to `GraphModel::Lpg`.
pub graph_model: GraphModel,
/// Storage location. `None` by default and for `in_memory`; set by
/// `persistent` and `read_only`.
pub path: Option<PathBuf>,
/// Memory budget; `None` by default. `validate` rejects `Some(0)`.
// presumably measured in bytes (`with_memory_fraction` derives it from the
// detected system memory) — TODO confirm.
pub memory_limit: Option<usize>,
/// Spill location; `None` by default.
pub spill_path: Option<PathBuf>,
/// Thread count; defaults to `num_cpus::get()`. `validate` rejects `0`.
pub threads: usize,
/// Whether the WAL is enabled. `true` by default; `in_memory` and
/// `read_only` turn it off.
pub wal_enabled: bool,
/// WAL flush interval in milliseconds; defaults to `100`. `validate`
/// rejects `0`.
pub wal_flush_interval_ms: u64,
/// Backward-edge toggle; `true` by default.
pub backward_edges: bool,
/// Query-logging toggle; `false` by default.
pub query_logging: bool,
/// Adaptive execution settings.
pub adaptive: AdaptiveConfig,
/// Factorized-execution toggle; `true` by default.
pub factorized_execution: bool,
/// WAL durability policy; defaults to `DurabilityMode::default()`.
pub wal_durability: DurabilityMode,
/// Storage layout; defaults to `StorageFormat::Auto`.
pub storage_format: StorageFormat,
/// Schema-constraint toggle; `false` by default.
pub schema_constraints: bool,
/// Query timeout; defaults to 30 seconds. `without_query_timeout` sets it
/// to `None`.
pub query_timeout: Option<Duration>,
/// Maximum property size; defaults to `16 * 1024 * 1024`.
/// `without_max_property_size` sets it to `None`.
pub max_property_size: Option<usize>,
/// GC interval; defaults to `100`.
// NOTE(review): the unit tests treat `0` as "GC disabled" — confirm against
// the code that consumes this field.
pub gc_interval: usize,
/// Access mode; defaults to `AccessMode::ReadWrite`.
pub access_mode: AccessMode,
/// CDC toggle; `false` by default. The field exists whether or not the
/// `cdc` feature is compiled in.
pub cdc_enabled: bool,
/// CDC retention settings; only present with the `cdc` feature.
#[cfg(feature = "cdc")]
pub cdc_retention: crate::cdc::CdcRetentionConfig,
/// Per-section memory configuration, keyed by section type; empty by
/// default.
pub section_configs: hashbrown::HashMap<
grafeo_common::storage::SectionType,
grafeo_common::storage::SectionMemoryConfig,
>,
/// Checkpoint interval; `None` by default.
pub checkpoint_interval: Option<Duration>,
/// Encryption settings; only present with the `encryption` feature and
/// `None` by default.
#[cfg(feature = "encryption")]
pub encryption: Option<EncryptionConfig>,
}
/// Settings for adaptive execution, stored in `Config::adaptive`.
// NOTE(review): how the numeric fields are interpreted is decided by the
// execution engine, which is not visible in this file — confirm there.
#[derive(Debug, Clone)]
pub struct AdaptiveConfig {
/// Master switch; `true` by default.
pub enabled: bool,
/// Re-optimization threshold; defaults to `3.0`.
pub threshold: f64,
/// Minimum row count; defaults to `1000`.
pub min_rows: u64,
/// Maximum number of re-optimizations; defaults to `3`.
pub max_reoptimizations: usize,
}
impl Default for AdaptiveConfig {
    /// Returns the enabled configuration with a threshold of `3.0`,
    /// a minimum of `1000` rows and at most `3` re-optimizations.
    fn default() -> Self {
        const THRESHOLD: f64 = 3.0;
        const MIN_ROWS: u64 = 1000;
        const MAX_REOPTIMIZATIONS: usize = 3;

        Self {
            enabled: true,
            threshold: THRESHOLD,
            min_rows: MIN_ROWS,
            max_reoptimizations: MAX_REOPTIMIZATIONS,
        }
    }
}
impl AdaptiveConfig {
    /// Returns the default settings with `enabled` switched off.
    #[must_use]
    pub fn disabled() -> Self {
        let defaults = Self::default();
        Self {
            enabled: false,
            ..defaults
        }
    }

    /// Replaces `threshold`, leaving every other field untouched.
    #[must_use]
    pub fn with_threshold(self, threshold: f64) -> Self {
        Self { threshold, ..self }
    }

    /// Replaces `min_rows`, leaving every other field untouched.
    #[must_use]
    pub fn with_min_rows(self, min_rows: u64) -> Self {
        Self { min_rows, ..self }
    }

    /// Replaces `max_reoptimizations`, leaving every other field untouched.
    #[must_use]
    pub fn with_max_reoptimizations(self, max: usize) -> Self {
        Self {
            max_reoptimizations: max,
            ..self
        }
    }
}
impl Default for Config {
    /// Builds the baseline configuration that every constructor starts from.
    ///
    /// Notable defaults: no path, no memory limit, WAL enabled with a 100 ms
    /// flush interval, a 30-second query timeout, a 16 MiB maximum property
    /// size, a GC interval of 100 and read-write access.
    fn default() -> Self {
        const WAL_FLUSH_INTERVAL_MS: u64 = 100;
        const QUERY_TIMEOUT: Duration = Duration::from_secs(30);
        // 16 MiB.
        const MAX_PROPERTY_SIZE: usize = 16 * 1024 * 1024;
        const GC_INTERVAL: usize = 100;

        // One thread per unit of available parallelism (see `num_cpus`).
        let worker_threads = num_cpus::get();

        Self {
            graph_model: GraphModel::default(),
            path: None,
            memory_limit: None,
            spill_path: None,
            threads: worker_threads,
            wal_enabled: true,
            wal_flush_interval_ms: WAL_FLUSH_INTERVAL_MS,
            backward_edges: true,
            query_logging: false,
            adaptive: AdaptiveConfig::default(),
            factorized_execution: true,
            wal_durability: DurabilityMode::default(),
            storage_format: StorageFormat::default(),
            schema_constraints: false,
            query_timeout: Some(QUERY_TIMEOUT),
            max_property_size: Some(MAX_PROPERTY_SIZE),
            gc_interval: GC_INTERVAL,
            access_mode: AccessMode::default(),
            cdc_enabled: false,
            #[cfg(feature = "cdc")]
            cdc_retention: crate::cdc::CdcRetentionConfig::default(),
            section_configs: hashbrown::HashMap::new(),
            checkpoint_interval: None,
            #[cfg(feature = "encryption")]
            encryption: None,
        }
    }
}
impl Config {
#[must_use]
pub fn in_memory() -> Self {
Self {
path: None,
wal_enabled: false,
..Default::default()
}
}
#[must_use]
pub fn persistent(path: impl Into<PathBuf>) -> Self {
Self {
path: Some(path.into()),
wal_enabled: true,
..Default::default()
}
}
#[must_use]
pub fn with_memory_limit(mut self, limit: usize) -> Self {
self.memory_limit = Some(limit);
self
}
#[must_use]
pub fn with_threads(mut self, threads: usize) -> Self {
self.threads = threads;
self
}
#[must_use]
pub fn without_backward_edges(mut self) -> Self {
self.backward_edges = false;
self
}
#[must_use]
pub fn with_query_logging(mut self) -> Self {
self.query_logging = true;
self
}
#[must_use]
pub fn with_memory_fraction(mut self, fraction: f64) -> Self {
use grafeo_common::memory::buffer::BufferManagerConfig;
let system_memory = BufferManagerConfig::detect_system_memory();
#[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
let budget = (system_memory as f64 * fraction) as usize;
self.memory_limit = Some(budget);
self
}
#[must_use]
pub fn with_spill_path(mut self, path: impl Into<PathBuf>) -> Self {
self.spill_path = Some(path.into());
self
}
#[must_use]
pub fn with_adaptive(mut self, adaptive: AdaptiveConfig) -> Self {
self.adaptive = adaptive;
self
}
#[must_use]
pub fn without_adaptive(mut self) -> Self {
self.adaptive.enabled = false;
self
}
#[must_use]
pub fn without_factorized_execution(mut self) -> Self {
self.factorized_execution = false;
self
}
#[must_use]
pub fn with_graph_model(mut self, model: GraphModel) -> Self {
self.graph_model = model;
self
}
#[must_use]
pub fn with_wal_durability(mut self, mode: DurabilityMode) -> Self {
self.wal_durability = mode;
self
}
#[must_use]
pub fn with_storage_format(mut self, format: StorageFormat) -> Self {
self.storage_format = format;
self
}
#[must_use]
pub fn with_schema_constraints(mut self) -> Self {
self.schema_constraints = true;
self
}
#[must_use]
pub fn with_query_timeout(mut self, timeout: Duration) -> Self {
self.query_timeout = Some(timeout);
self
}
#[must_use]
pub fn without_query_timeout(mut self) -> Self {
self.query_timeout = None;
self
}
#[must_use]
pub fn with_max_property_size(mut self, size: usize) -> Self {
self.max_property_size = Some(size);
self
}
#[must_use]
pub fn without_max_property_size(mut self) -> Self {
self.max_property_size = None;
self
}
#[must_use]
pub fn with_gc_interval(mut self, interval: usize) -> Self {
self.gc_interval = interval;
self
}
#[must_use]
pub fn with_access_mode(mut self, mode: AccessMode) -> Self {
self.access_mode = mode;
self
}
#[must_use]
pub fn read_only(path: impl Into<PathBuf>) -> Self {
Self {
path: Some(path.into()),
wal_enabled: false,
access_mode: AccessMode::ReadOnly,
..Default::default()
}
}
#[must_use]
pub fn with_cdc(mut self) -> Self {
self.cdc_enabled = true;
self
}
#[must_use]
pub fn with_section_config(
mut self,
section_type: grafeo_common::storage::SectionType,
config: grafeo_common::storage::SectionMemoryConfig,
) -> Self {
self.section_configs.insert(section_type, config);
self
}
#[must_use]
pub fn with_checkpoint_interval(mut self, interval: Duration) -> Self {
self.checkpoint_interval = Some(interval);
self
}
pub fn validate(&self) -> std::result::Result<(), ConfigError> {
if let Some(limit) = self.memory_limit
&& limit == 0
{
return Err(ConfigError::ZeroMemoryLimit);
}
if self.threads == 0 {
return Err(ConfigError::ZeroThreads);
}
if self.wal_flush_interval_ms == 0 {
return Err(ConfigError::ZeroWalFlushInterval);
}
#[cfg(not(feature = "triple-store"))]
if self.graph_model == GraphModel::Rdf {
return Err(ConfigError::RdfFeatureRequired);
}
Ok(())
}
}
mod num_cpus {
    /// Returns the parallelism reported by the standard library, or `4`
    /// when that query fails.
    #[cfg(not(target_arch = "wasm32"))]
    pub fn get() -> usize {
        match std::thread::available_parallelism() {
            Ok(count) => count.get(),
            Err(_) => 4,
        }
    }

    /// Always returns `1` on `wasm32` targets.
    #[cfg(target_arch = "wasm32")]
    pub fn get() -> usize {
        1
    }
}
// Unit tests for the configuration types: defaults, constructors, builder
// methods, `Display` output and `Config::validate`.
#[cfg(test)]
mod tests {
use super::*;
// --- `Config` defaults and constructors ---
// Each assertion mirrors a value assigned in `Config::default`.
#[test]
fn test_config_default() {
let config = Config::default();
assert_eq!(config.graph_model, GraphModel::Lpg);
assert!(config.path.is_none());
assert!(config.memory_limit.is_none());
assert!(config.spill_path.is_none());
assert!(config.threads > 0);
assert!(config.wal_enabled);
assert_eq!(config.wal_flush_interval_ms, 100);
assert!(config.backward_edges);
assert!(!config.query_logging);
assert!(config.factorized_execution);
assert_eq!(config.wal_durability, DurabilityMode::default());
assert!(!config.schema_constraints);
assert_eq!(config.query_timeout, Some(Duration::from_secs(30)));
assert_eq!(config.gc_interval, 100);
}
#[test]
fn test_config_in_memory() {
let config = Config::in_memory();
assert!(config.path.is_none());
assert!(!config.wal_enabled);
assert!(config.backward_edges);
}
#[test]
fn test_config_persistent() {
let config = Config::persistent("/tmp/test_db");
assert_eq!(
config.path.as_deref(),
Some(std::path::Path::new("/tmp/test_db"))
);
assert!(config.wal_enabled);
}
// --- `Config` builder methods ---
#[test]
fn test_config_with_memory_limit() {
let config = Config::in_memory().with_memory_limit(1024 * 1024);
assert_eq!(config.memory_limit, Some(1024 * 1024));
}
#[test]
fn test_config_with_threads() {
let config = Config::in_memory().with_threads(8);
assert_eq!(config.threads, 8);
}
#[test]
fn test_config_without_backward_edges() {
let config = Config::in_memory().without_backward_edges();
assert!(!config.backward_edges);
}
#[test]
fn test_config_with_query_logging() {
let config = Config::in_memory().with_query_logging();
assert!(config.query_logging);
}
#[test]
fn test_config_with_spill_path() {
let config = Config::in_memory().with_spill_path("/tmp/spill");
assert_eq!(
config.spill_path.as_deref(),
Some(std::path::Path::new("/tmp/spill"))
);
}
// The resulting limit depends on the memory detected on the machine running
// the test, so only "set and non-zero" is asserted.
#[test]
fn test_config_with_memory_fraction() {
let config = Config::in_memory().with_memory_fraction(0.5);
assert!(config.memory_limit.is_some());
assert!(config.memory_limit.unwrap() > 0);
}
#[test]
fn test_config_with_adaptive() {
let adaptive = AdaptiveConfig::default().with_threshold(5.0);
let config = Config::in_memory().with_adaptive(adaptive);
assert!((config.adaptive.threshold - 5.0).abs() < f64::EPSILON);
}
#[test]
fn test_config_without_adaptive() {
let config = Config::in_memory().without_adaptive();
assert!(!config.adaptive.enabled);
}
#[test]
fn test_config_without_factorized_execution() {
let config = Config::in_memory().without_factorized_execution();
assert!(!config.factorized_execution);
}
#[test]
fn test_config_builder_chaining() {
let config = Config::persistent("/tmp/db")
.with_memory_limit(512 * 1024 * 1024)
.with_threads(4)
.with_query_logging()
.without_backward_edges()
.with_spill_path("/tmp/spill");
assert!(config.path.is_some());
assert_eq!(config.memory_limit, Some(512 * 1024 * 1024));
assert_eq!(config.threads, 4);
assert!(config.query_logging);
assert!(!config.backward_edges);
assert!(config.spill_path.is_some());
}
// --- `AdaptiveConfig` ---
// Float fields are compared with an epsilon tolerance rather than `==`.
#[test]
fn test_adaptive_config_default() {
let config = AdaptiveConfig::default();
assert!(config.enabled);
assert!((config.threshold - 3.0).abs() < f64::EPSILON);
assert_eq!(config.min_rows, 1000);
assert_eq!(config.max_reoptimizations, 3);
}
#[test]
fn test_adaptive_config_disabled() {
let config = AdaptiveConfig::disabled();
assert!(!config.enabled);
}
#[test]
fn test_adaptive_config_with_threshold() {
let config = AdaptiveConfig::default().with_threshold(10.0);
assert!((config.threshold - 10.0).abs() < f64::EPSILON);
}
#[test]
fn test_adaptive_config_with_min_rows() {
let config = AdaptiveConfig::default().with_min_rows(500);
assert_eq!(config.min_rows, 500);
}
#[test]
fn test_adaptive_config_with_max_reoptimizations() {
let config = AdaptiveConfig::default().with_max_reoptimizations(5);
assert_eq!(config.max_reoptimizations, 5);
}
#[test]
fn test_adaptive_config_builder_chaining() {
let config = AdaptiveConfig::default()
.with_threshold(2.0)
.with_min_rows(100)
.with_max_reoptimizations(10);
assert!((config.threshold - 2.0).abs() < f64::EPSILON);
assert_eq!(config.min_rows, 100);
assert_eq!(config.max_reoptimizations, 10);
}
// --- `GraphModel` ---
#[test]
fn test_graph_model_default_is_lpg() {
assert_eq!(GraphModel::default(), GraphModel::Lpg);
}
#[test]
fn test_graph_model_display() {
assert_eq!(GraphModel::Lpg.to_string(), "LPG");
assert_eq!(GraphModel::Rdf.to_string(), "RDF");
}
#[test]
fn test_config_with_graph_model() {
let config = Config::in_memory().with_graph_model(GraphModel::Rdf);
assert_eq!(config.graph_model, GraphModel::Rdf);
}
// --- `DurabilityMode` ---
#[test]
fn test_durability_mode_default_is_batch() {
let mode = DurabilityMode::default();
assert_eq!(
mode,
DurabilityMode::Batch {
max_delay_ms: 100,
max_records: 1000
}
);
}
#[test]
fn test_config_with_wal_durability() {
let config = Config::persistent("/tmp/db").with_wal_durability(DurabilityMode::Sync);
assert_eq!(config.wal_durability, DurabilityMode::Sync);
}
#[test]
fn test_config_with_wal_durability_nosync() {
let config = Config::persistent("/tmp/db").with_wal_durability(DurabilityMode::NoSync);
assert_eq!(config.wal_durability, DurabilityMode::NoSync);
}
#[test]
fn test_config_with_wal_durability_adaptive() {
let config = Config::persistent("/tmp/db").with_wal_durability(DurabilityMode::Adaptive {
target_interval_ms: 50,
});
assert_eq!(
config.wal_durability,
DurabilityMode::Adaptive {
target_interval_ms: 50
}
);
}
// --- Property size, schema constraints, query timeout, GC interval ---
#[test]
fn test_config_default_max_property_size() {
let config = Config::in_memory();
assert_eq!(config.max_property_size, Some(16 * 1024 * 1024));
}
#[test]
fn test_config_with_max_property_size() {
let config = Config::in_memory().with_max_property_size(1024);
assert_eq!(config.max_property_size, Some(1024));
}
#[test]
fn test_config_without_max_property_size() {
let config = Config::in_memory().without_max_property_size();
assert!(config.max_property_size.is_none());
}
#[test]
fn test_config_with_schema_constraints() {
let config = Config::in_memory().with_schema_constraints();
assert!(config.schema_constraints);
}
#[test]
fn test_config_with_query_timeout() {
let config = Config::in_memory().with_query_timeout(Duration::from_mins(1));
assert_eq!(config.query_timeout, Some(Duration::from_mins(1)));
}
#[test]
fn test_config_without_query_timeout() {
let config = Config::in_memory().without_query_timeout();
assert!(config.query_timeout.is_none());
}
#[test]
fn test_config_default_query_timeout() {
let config = Config::in_memory();
assert_eq!(config.query_timeout, Some(Duration::from_secs(30)));
}
#[test]
fn test_config_with_gc_interval() {
let config = Config::in_memory().with_gc_interval(50);
assert_eq!(config.gc_interval, 50);
}
// Only checks that `0` is stored as given; the builder does not reject it.
#[test]
fn test_config_gc_disabled() {
let config = Config::in_memory().with_gc_interval(0);
assert_eq!(config.gc_interval, 0);
}
// --- `Config::validate` and `ConfigError` ---
#[test]
fn test_validate_default_config() {
assert!(Config::default().validate().is_ok());
}
#[test]
fn test_validate_in_memory_config() {
assert!(Config::in_memory().validate().is_ok());
}
#[test]
fn test_validate_rejects_zero_memory_limit() {
let config = Config::in_memory().with_memory_limit(0);
assert_eq!(config.validate(), Err(ConfigError::ZeroMemoryLimit));
}
#[test]
fn test_validate_rejects_zero_threads() {
let config = Config::in_memory().with_threads(0);
assert_eq!(config.validate(), Err(ConfigError::ZeroThreads));
}
// No builder exists for `wal_flush_interval_ms`, so the field is set directly.
#[test]
fn test_validate_rejects_zero_wal_flush_interval() {
let mut config = Config::in_memory();
config.wal_flush_interval_ms = 0;
assert_eq!(config.validate(), Err(ConfigError::ZeroWalFlushInterval));
}
// Only compiled when the `triple-store` feature is off, matching the gate
// inside `Config::validate`.
#[cfg(not(feature = "triple-store"))]
#[test]
fn test_validate_rejects_rdf_without_feature() {
let config = Config::in_memory().with_graph_model(GraphModel::Rdf);
assert_eq!(config.validate(), Err(ConfigError::RdfFeatureRequired));
}
// Pins the exact user-facing text of every `ConfigError` variant.
#[test]
fn test_config_error_display() {
assert_eq!(
ConfigError::ZeroMemoryLimit.to_string(),
"memory_limit must be greater than zero"
);
assert_eq!(
ConfigError::ZeroThreads.to_string(),
"threads must be greater than zero"
);
assert_eq!(
ConfigError::ZeroWalFlushInterval.to_string(),
"wal_flush_interval_ms must be greater than zero"
);
assert_eq!(
ConfigError::RdfFeatureRequired.to_string(),
"RDF graph model requires the `rdf` feature flag to be enabled"
);
}
#[test]
fn test_config_full_builder_chaining() {
let config = Config::persistent("/tmp/db")
.with_graph_model(GraphModel::Lpg)
.with_memory_limit(512 * 1024 * 1024)
.with_threads(4)
.with_query_logging()
.with_wal_durability(DurabilityMode::Sync)
.with_schema_constraints()
.without_backward_edges()
.with_spill_path("/tmp/spill")
.with_query_timeout(Duration::from_mins(1));
assert_eq!(config.graph_model, GraphModel::Lpg);
assert!(config.path.is_some());
assert_eq!(config.memory_limit, Some(512 * 1024 * 1024));
assert_eq!(config.threads, 4);
assert!(config.query_logging);
assert_eq!(config.wal_durability, DurabilityMode::Sync);
assert!(config.schema_constraints);
assert!(!config.backward_edges);
assert!(config.spill_path.is_some());
assert_eq!(config.query_timeout, Some(Duration::from_mins(1)));
assert!(config.validate().is_ok());
}
// --- `AccessMode` ---
#[test]
fn test_access_mode_default_is_read_write() {
assert_eq!(AccessMode::default(), AccessMode::ReadWrite);
}
#[test]
fn test_access_mode_display() {
assert_eq!(AccessMode::ReadWrite.to_string(), "read-write");
assert_eq!(AccessMode::ReadOnly.to_string(), "read-only");
}
#[test]
fn test_config_with_access_mode() {
let config = Config::persistent("/tmp/db").with_access_mode(AccessMode::ReadOnly);
assert_eq!(config.access_mode, AccessMode::ReadOnly);
}
#[test]
fn test_config_read_only() {
let config = Config::read_only("/tmp/db.grafeo");
assert_eq!(config.access_mode, AccessMode::ReadOnly);
assert!(config.path.is_some());
assert!(!config.wal_enabled);
}
#[test]
fn test_config_default_is_read_write() {
let config = Config::default();
assert_eq!(config.access_mode, AccessMode::ReadWrite);
}
// --- `StorageFormat` ---
#[test]
fn test_storage_format_default_is_auto() {
assert_eq!(StorageFormat::default(), StorageFormat::Auto);
}
#[test]
fn test_storage_format_display() {
assert_eq!(StorageFormat::Auto.to_string(), "auto");
assert_eq!(StorageFormat::WalDirectory.to_string(), "wal-directory");
assert_eq!(StorageFormat::SingleFile.to_string(), "single-file");
}
#[test]
fn test_config_with_storage_format() {
let config = Config::in_memory().with_storage_format(StorageFormat::SingleFile);
assert_eq!(config.storage_format, StorageFormat::SingleFile);
let config2 = Config::in_memory().with_storage_format(StorageFormat::WalDirectory);
assert_eq!(config2.storage_format, StorageFormat::WalDirectory);
}
// --- CDC toggle ---
#[test]
fn test_config_with_cdc() {
let config = Config::in_memory().with_cdc();
assert!(config.cdc_enabled);
}
#[test]
fn test_config_cdc_default_false() {
let config = Config::default();
assert!(!config.cdc_enabled);
}
// --- Remaining edge cases ---
// `ConfigError` must be usable as a trait object and carries no source error.
#[test]
fn test_config_error_is_std_error() {
let err = ConfigError::ZeroMemoryLimit;
let dyn_err: &dyn std::error::Error = &err;
assert!(dyn_err.source().is_none());
assert!(!dyn_err.to_string().is_empty());
}
#[test]
fn test_validate_accepts_nonzero_memory_limit() {
let config = Config::in_memory().with_memory_limit(1);
assert!(config.validate().is_ok());
}
#[test]
fn test_validate_accepts_none_memory_limit() {
let config = Config::in_memory();
assert!(config.memory_limit.is_none());
assert!(config.validate().is_ok());
}
#[test]
fn test_durability_mode_debug() {
let sync = DurabilityMode::Sync;
let debug = format!("{sync:?}");
assert_eq!(debug, "Sync");
let no_sync = DurabilityMode::NoSync;
let debug = format!("{no_sync:?}");
assert_eq!(debug, "NoSync");
}
#[test]
fn test_read_only_config_full() {
let config = Config::read_only("/tmp/data.grafeo");
assert_eq!(config.access_mode, AccessMode::ReadOnly);
assert!(!config.wal_enabled);
assert!(config.path.is_some());
assert!(config.backward_edges);
assert_eq!(config.graph_model, GraphModel::Lpg);
}
}