use crate::core::error::{Result, StorageError};
use std::time::Duration;
/// Selects how a write is made durable.
///
/// The predicates on this type summarise each mode:
/// `needs_background_thread` is true for every variant except `Synchronous`,
/// while `waits_for_durability` and `is_acid_durable` are true only for
/// `Synchronous` and `GroupCommit`.
///
/// NOTE(review): variant fields can be filled in directly (struct-literal
/// syntax, or deserialization when the `config-toml` feature is enabled);
/// values built that way do not pass through the range checks of the
/// `*_validated` constructors.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "config-toml", derive(serde::Serialize, serde::Deserialize))]
pub enum DurabilityMode {
/// The default mode. Has no flush interval and needs no background thread.
Synchronous,
/// Mode driven by a flush interval; `waits_for_durability` reports `false`.
Async {
/// Flush interval in milliseconds. `async_mode_validated` accepts 1..=60000.
flush_interval_ms: u64,
},
/// Batching mode for which `waits_for_durability` and `is_acid_durable`
/// report `true`.
GroupCommit {
/// Delay in milliseconds, also returned by `flush_interval`.
/// `group_commit_validated` accepts 1..=1000.
max_delay_ms: u64,
/// Batch size limit. `group_commit_validated` accepts 1..=10000.
max_batch_size: usize,
},
/// Batching mode for which `waits_for_durability` and `is_acid_durable`
/// report `false`.
AsyncBatched {
/// Delay in milliseconds, also returned by `flush_interval`.
/// `async_batched_validated` accepts 1..=1000.
max_delay_ms: u64,
/// Batch size limit. `async_batched_validated` accepts 1..=10000.
max_batch_size: usize,
},
}
impl Default for DurabilityMode {
fn default() -> Self {
DurabilityMode::Synchronous
}
}
impl DurabilityMode {
#[allow(dead_code)] pub(crate) const fn async_mode(flush_interval_ms: u64) -> Self {
assert!(
flush_interval_ms > 0,
"flush_interval_ms must be greater than 0"
);
assert!(
flush_interval_ms <= 60_000,
"flush_interval_ms must be <= 60000ms (1 minute)"
);
DurabilityMode::Async { flush_interval_ms }
}
pub fn async_mode_validated(flush_interval_ms: u64) -> Result<Self> {
if flush_interval_ms == 0 {
return Err(StorageError::WalError {
reason: "flush_interval_ms must be greater than 0".to_string(),
}
.into());
}
if flush_interval_ms > 60_000 {
return Err(StorageError::WalError {
reason: "flush_interval_ms must be <= 60000ms (1 minute)".to_string(),
}
.into());
}
Ok(DurabilityMode::Async { flush_interval_ms })
}
#[allow(dead_code)] pub(crate) const fn group_commit(max_delay_ms: u64, max_batch_size: usize) -> Self {
assert!(max_delay_ms > 0, "max_delay_ms must be greater than 0");
assert!(
max_delay_ms <= 1000,
"max_delay_ms must be <= 1000ms (1 second)"
);
assert!(max_batch_size > 0, "max_batch_size must be greater than 0");
assert!(max_batch_size <= 10_000, "max_batch_size must be <= 10000");
DurabilityMode::GroupCommit {
max_delay_ms,
max_batch_size,
}
}
pub fn group_commit_validated(max_delay_ms: u64, max_batch_size: usize) -> Result<Self> {
if max_delay_ms == 0 {
return Err(StorageError::WalError {
reason: "max_delay_ms must be greater than 0".to_string(),
}
.into());
}
if max_delay_ms > 1000 {
return Err(StorageError::WalError {
reason: "max_delay_ms must be <= 1000ms (1 second)".to_string(),
}
.into());
}
if max_batch_size == 0 {
return Err(StorageError::WalError {
reason: "max_batch_size must be greater than 0".to_string(),
}
.into());
}
if max_batch_size > 10_000 {
return Err(StorageError::WalError {
reason: "max_batch_size must be <= 10000".to_string(),
}
.into());
}
Ok(DurabilityMode::GroupCommit {
max_delay_ms,
max_batch_size,
})
}
pub const fn group_commit_default() -> Self {
DurabilityMode::GroupCommit {
max_delay_ms: 2,
max_batch_size: 200,
}
}
pub fn async_batched_validated(max_delay_ms: u64, max_batch_size: usize) -> Result<Self> {
match max_delay_ms {
0 => {
return Err(StorageError::WalError {
reason: "max_delay_ms must be greater than 0".to_string(),
}
.into());
}
1..=1000 => {} _ => {
return Err(StorageError::WalError {
reason: "max_delay_ms must be <= 1000ms (1 second)".to_string(),
}
.into());
}
}
match max_batch_size {
0 => {
return Err(StorageError::WalError {
reason: "max_batch_size must be greater than 0".to_string(),
}
.into());
}
1..=10_000 => {} _ => {
return Err(StorageError::WalError {
reason: "max_batch_size must be <= 10000".to_string(),
}
.into());
}
}
Ok(DurabilityMode::AsyncBatched {
max_delay_ms,
max_batch_size,
})
}
pub const fn async_batched_default() -> Self {
DurabilityMode::AsyncBatched {
max_delay_ms: 10,
max_batch_size: 100,
}
}
pub const fn fast() -> Self {
DurabilityMode::GroupCommit {
max_delay_ms: 1,
max_batch_size: 500,
}
}
pub const fn needs_background_thread(&self) -> bool {
matches!(
self,
DurabilityMode::Async { .. }
| DurabilityMode::GroupCommit { .. }
| DurabilityMode::AsyncBatched { .. }
)
}
pub const fn flush_interval(&self) -> Option<Duration> {
match self {
DurabilityMode::Synchronous => None,
DurabilityMode::Async { flush_interval_ms } => {
Some(Duration::from_millis(*flush_interval_ms))
}
DurabilityMode::GroupCommit { max_delay_ms, .. } => {
Some(Duration::from_millis(*max_delay_ms))
}
DurabilityMode::AsyncBatched { max_delay_ms, .. } => {
Some(Duration::from_millis(*max_delay_ms))
}
}
}
pub const fn waits_for_durability(&self) -> bool {
matches!(
self,
DurabilityMode::Synchronous | DurabilityMode::GroupCommit { .. }
)
}
pub const fn is_acid_durable(&self) -> bool {
matches!(
self,
DurabilityMode::Synchronous | DurabilityMode::GroupCommit { .. }
)
}
}
/// Options attached to a write.
///
/// The derived `Default` leaves `durability_mode` as `None`.
#[derive(Debug, Clone, Default)]
pub struct WriteOptions {
/// Durability override. When `None`, `effective_durability` returns the
/// default mode passed in by the caller.
pub durability_mode: Option<DurabilityMode>,
}
impl WriteOptions {
    /// Flush interval, in milliseconds, used by [`WriteOptions::bulk_import`].
    const BULK_IMPORT_FLUSH_INTERVAL_MS: u64 = 100;

    /// Creates options with no durability override (same as `Default`).
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Returns these options with the durability override set to `mode`,
    /// replacing any previous override.
    #[must_use]
    pub fn with_durability(mut self, mode: DurabilityMode) -> Self {
        self.durability_mode = Some(mode);
        self
    }

    /// Resolves the mode to use: the override if one is set, otherwise `default`.
    #[must_use]
    pub fn effective_durability(&self, default: DurabilityMode) -> DurabilityMode {
        self.durability_mode.unwrap_or(default)
    }

    /// Preset that overrides durability with `DurabilityMode::Async`, using
    /// `BULK_IMPORT_FLUSH_INTERVAL_MS` as the flush interval.
    #[must_use]
    pub fn bulk_import() -> Self {
        Self::new().with_durability(DurabilityMode::Async {
            flush_interval_ms: Self::BULK_IMPORT_FLUSH_INTERVAL_MS,
        })
    }

    /// Preset that overrides durability with `DurabilityMode::Synchronous`.
    #[must_use]
    pub fn critical() -> Self {
        Self::new().with_durability(DurabilityMode::Synchronous)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Unwraps the error of `outcome` and renders it through `Display`.
    ///
    /// Panics, and so fails the calling test, when `outcome` is `Ok`.
    fn error_text<T, E>(outcome: std::result::Result<T, E>) -> String
    where
        T: std::fmt::Debug,
        E: std::fmt::Display,
    {
        outcome.unwrap_err().to_string()
    }

    // ---- DurabilityMode: constructors and presets ----

    #[test]
    fn test_default_is_synchronous() {
        let default_mode: DurabilityMode = Default::default();
        assert_eq!(default_mode, DurabilityMode::Synchronous);
    }

    #[test]
    fn test_async_mode_constructor() {
        let expected = DurabilityMode::Async {
            flush_interval_ms: 100,
        };
        assert_eq!(DurabilityMode::async_mode(100), expected);
    }

    #[test]
    fn test_group_commit_constructor() {
        let expected = DurabilityMode::GroupCommit {
            max_delay_ms: 10,
            max_batch_size: 200,
        };
        assert_eq!(DurabilityMode::group_commit(10, 200), expected);
    }

    #[test]
    fn test_group_commit_default() {
        let expected = DurabilityMode::GroupCommit {
            max_delay_ms: 2,
            max_batch_size: 200,
        };
        assert_eq!(DurabilityMode::group_commit_default(), expected);
    }

    #[test]
    fn test_fast() {
        let expected = DurabilityMode::GroupCommit {
            max_delay_ms: 1,
            max_batch_size: 500,
        };
        assert_eq!(DurabilityMode::fast(), expected);
    }

    // ---- DurabilityMode: predicates ----

    #[test]
    fn test_needs_background_thread() {
        let synchronous = DurabilityMode::Synchronous;
        let asynchronous = DurabilityMode::async_mode(100);
        let grouped = DurabilityMode::group_commit_default();

        assert!(!synchronous.needs_background_thread());
        assert!(asynchronous.needs_background_thread());
        assert!(grouped.needs_background_thread());
    }

    #[test]
    fn test_flush_interval() {
        let millis = |ms: u64| Some(Duration::from_millis(ms));

        assert_eq!(DurabilityMode::Synchronous.flush_interval(), None);
        assert_eq!(DurabilityMode::async_mode(100).flush_interval(), millis(100));
        assert_eq!(
            DurabilityMode::group_commit(10, 200).flush_interval(),
            millis(10)
        );
    }

    #[test]
    fn test_waits_for_durability() {
        let synchronous = DurabilityMode::Synchronous;
        let asynchronous = DurabilityMode::async_mode(100);
        let grouped = DurabilityMode::group_commit_default();

        assert!(synchronous.waits_for_durability());
        assert!(!asynchronous.waits_for_durability());
        assert!(grouped.waits_for_durability());
    }

    #[test]
    fn test_is_acid_durable() {
        let synchronous = DurabilityMode::Synchronous;
        let asynchronous = DurabilityMode::async_mode(100);
        let grouped = DurabilityMode::group_commit_default();

        assert!(synchronous.is_acid_durable());
        assert!(!asynchronous.is_acid_durable());
        assert!(grouped.is_acid_durable());
    }

    // ---- WriteOptions ----

    #[test]
    fn test_write_options_default() {
        assert_eq!(WriteOptions::default().durability_mode, None);
    }

    #[test]
    fn test_write_options_with_durability() {
        let configured = WriteOptions::new().with_durability(DurabilityMode::async_mode(50));
        let expected = DurabilityMode::Async {
            flush_interval_ms: 50,
        };
        assert_eq!(configured.durability_mode, Some(expected));
    }

    #[test]
    fn test_effective_durability_uses_override() {
        let configured = WriteOptions::new().with_durability(DurabilityMode::async_mode(50));
        let expected = DurabilityMode::Async {
            flush_interval_ms: 50,
        };
        assert_eq!(
            configured.effective_durability(DurabilityMode::Synchronous),
            expected
        );
    }

    #[test]
    fn test_effective_durability_uses_default_when_none() {
        let untouched = WriteOptions::new();
        assert_eq!(
            untouched.effective_durability(DurabilityMode::Synchronous),
            DurabilityMode::Synchronous
        );
    }

    #[test]
    fn test_write_options_bulk_import_preset() {
        let preset = WriteOptions::bulk_import().durability_mode;
        assert!(preset.is_some());
        let mode = preset.unwrap();
        if let DurabilityMode::Async { flush_interval_ms } = mode {
            assert_eq!(
                flush_interval_ms,
                WriteOptions::BULK_IMPORT_FLUSH_INTERVAL_MS,
                "bulk_import preset should use the defined flush interval constant"
            );
        } else {
            panic!("bulk_import should return Async mode, got {:?}", mode);
        }
    }

    #[test]
    fn test_write_options_critical_preset() {
        let preset = WriteOptions::critical().durability_mode;
        assert!(preset.is_some());
        assert_eq!(
            preset.unwrap(),
            DurabilityMode::Synchronous,
            "critical should return Synchronous mode"
        );
    }

    #[test]
    fn test_preset_methods_can_chain_with_other_methods() {
        let preset = WriteOptions::bulk_import();
        assert!(preset.durability_mode.is_some());
        let resolved = preset.effective_durability(DurabilityMode::Synchronous);
        assert!(matches!(resolved, DurabilityMode::Async { .. }));
    }

    // ---- async_mode_validated ----

    #[test]
    fn test_async_mode_validated_success() {
        let expected = DurabilityMode::Async {
            flush_interval_ms: 100,
        };
        assert_eq!(DurabilityMode::async_mode_validated(100).unwrap(), expected);
    }

    #[test]
    fn test_async_mode_validated_zero_fails() {
        let message = error_text(DurabilityMode::async_mode_validated(0));
        assert!(message.contains("flush_interval_ms"));
        assert!(message.contains("greater than 0"));
    }

    #[test]
    fn test_async_mode_validated_too_large_fails() {
        let message = error_text(DurabilityMode::async_mode_validated(60_001));
        assert!(message.contains("flush_interval_ms"));
        assert!(message.contains("60000ms"));
    }

    #[test]
    fn test_async_mode_validated_boundary_values() {
        let lowest = DurabilityMode::async_mode_validated(1);
        let highest = DurabilityMode::async_mode_validated(60_000);
        assert!(lowest.is_ok());
        assert!(highest.is_ok());
    }

    // ---- group_commit_validated ----

    #[test]
    fn test_group_commit_validated_success() {
        let expected = DurabilityMode::GroupCommit {
            max_delay_ms: 10,
            max_batch_size: 200,
        };
        assert_eq!(
            DurabilityMode::group_commit_validated(10, 200).unwrap(),
            expected
        );
    }

    #[test]
    fn test_group_commit_validated_zero_delay_fails() {
        let message = error_text(DurabilityMode::group_commit_validated(0, 200));
        assert!(message.contains("max_delay_ms"));
        assert!(message.contains("greater than 0"));
    }

    #[test]
    fn test_group_commit_validated_delay_too_large_fails() {
        let message = error_text(DurabilityMode::group_commit_validated(1001, 200));
        assert!(message.contains("max_delay_ms"));
        assert!(message.contains("1000ms"));
    }

    #[test]
    fn test_group_commit_validated_zero_batch_fails() {
        let message = error_text(DurabilityMode::group_commit_validated(10, 0));
        assert!(message.contains("max_batch_size"));
        assert!(message.contains("greater than 0"));
    }

    #[test]
    fn test_group_commit_validated_batch_too_large_fails() {
        let message = error_text(DurabilityMode::group_commit_validated(10, 10_001));
        assert!(message.contains("max_batch_size"));
        assert!(message.contains("10000"));
    }

    #[test]
    fn test_group_commit_validated_boundary_values() {
        let lowest = DurabilityMode::group_commit_validated(1, 1);
        let highest = DurabilityMode::group_commit_validated(1000, 10_000);
        assert!(lowest.is_ok());
        assert!(highest.is_ok());
    }

    // ---- AsyncBatched ----

    #[test]
    fn test_async_batched_constructor() {
        let expected = DurabilityMode::AsyncBatched {
            max_delay_ms: 10,
            max_batch_size: 100,
        };
        assert_eq!(
            DurabilityMode::async_batched_validated(10, 100).unwrap(),
            expected
        );
    }

    #[test]
    fn test_async_batched_validation_zero_delay_fails() {
        let message = error_text(DurabilityMode::async_batched_validated(0, 200));
        assert!(message.contains("max_delay_ms"));
        assert!(message.contains("greater than 0"));
    }

    #[test]
    fn test_async_batched_validation_zero_batch_fails() {
        let message = error_text(DurabilityMode::async_batched_validated(10, 0));
        assert!(message.contains("max_batch_size"));
        assert!(message.contains("greater than 0"));
    }

    #[test]
    fn test_async_batched_validation_boundary_values() {
        let lowest = DurabilityMode::async_batched_validated(1, 1);
        let highest = DurabilityMode::async_batched_validated(1000, 10_000);
        let delay_over = DurabilityMode::async_batched_validated(1001, 10);
        let batch_over = DurabilityMode::async_batched_validated(10, 10_001);

        assert!(lowest.is_ok());
        assert!(highest.is_ok());
        assert!(delay_over.is_err());
        assert!(batch_over.is_err());
    }

    #[test]
    fn test_async_batched_default() {
        let expected = DurabilityMode::AsyncBatched {
            max_delay_ms: 10,
            max_batch_size: 100,
        };
        assert_eq!(DurabilityMode::async_batched_default(), expected);
    }

    #[test]
    fn test_async_batched_needs_background_thread() {
        let batched = DurabilityMode::async_batched_validated(10, 100).unwrap();
        assert!(batched.needs_background_thread());
    }

    #[test]
    fn test_async_batched_does_not_wait_for_durability() {
        let batched = DurabilityMode::async_batched_validated(10, 100).unwrap();
        assert!(!batched.waits_for_durability());
    }

    #[test]
    fn test_async_batched_not_acid_durable() {
        let batched = DurabilityMode::async_batched_validated(10, 100).unwrap();
        assert!(!batched.is_acid_durable());
    }

    #[test]
    fn test_async_batched_flush_interval() {
        let batched = DurabilityMode::async_batched_validated(42, 100).unwrap();
        let expected = Duration::from_millis(42);
        assert_eq!(batched.flush_interval(), Some(expected));
    }
}