use serde::{Deserialize, Serialize};
use std::time::Duration;
/// Number of threads the platform reports as usable in parallel.
///
/// Falls back to `1` when `std::thread::available_parallelism` returns an
/// error, so the result is always at least one.
fn get_cpu_count() -> usize {
    match std::thread::available_parallelism() {
        Ok(parallelism) => parallelism.get(),
        Err(_) => 1,
    }
}
/// Durability/throughput trade-off used when ingesting rows.
///
/// The per-variant knobs are exposed through accessor methods such as
/// `use_wal()`, `sync_on_commit()`, `batch_params()`, `async_sync_interval()`
/// and `checkpoint_interval()`. Enforcement of these knobs is up to the
/// ingestion engine, which is not defined alongside this type.
///
/// All payload fields are plain integers/booleans, so the type is `Eq` as
/// well as `PartialEq`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum IngestionSafetyLevel {
    /// WAL enabled and `sync_on_commit()` returns `true`; this is the
    /// `Default` level.
    Full,
    /// Commits are grouped; `batch_params()` returns these two values.
    Batched {
        /// Number of transactions per batch (presumably a flush threshold —
        /// confirm in the engine).
        batch_size: usize,
        /// Batch timeout in milliseconds, exposed as a `Duration` by
        /// `batch_params()`.
        batch_timeout_ms: u64,
    },
    /// `sync_on_commit()` returns `false`; `async_sync_interval()` returns the
    /// configured period.
    Async {
        /// Sync interval in milliseconds.
        sync_interval_ms: u64,
    },
    /// Reduced-durability mode used by the `bulk_load()` and
    /// `maximum_performance()` presets.
    Unsafe {
        /// When `true`, `use_wal()` returns `false`.
        disable_wal: bool,
        /// Checkpoint period in seconds; `0` makes `checkpoint_interval()`
        /// return `None`.
        checkpoint_interval_secs: u64,
    },
}
impl Default for IngestionSafetyLevel {
fn default() -> Self {
Self::Full
}
}
impl IngestionSafetyLevel {
    /// `Batched` preset: 1000 transactions per batch with a 100 ms timeout.
    pub fn batched() -> Self {
        let batch_size = 1000;
        let batch_timeout_ms = 100;
        Self::Batched {
            batch_size,
            batch_timeout_ms,
        }
    }

    /// `Async` preset with a 1000 ms sync interval.
    pub fn async_default() -> Self {
        let sync_interval_ms = 1_000;
        Self::Async { sync_interval_ms }
    }

    /// `Unsafe` preset that keeps the WAL on and checkpoints every 60 s.
    pub fn bulk_load() -> Self {
        Self::Unsafe {
            checkpoint_interval_secs: 60,
            disable_wal: false,
        }
    }

    /// `Unsafe` preset that disables the WAL and checkpoints every 300 s.
    pub fn maximum_performance() -> Self {
        Self::Unsafe {
            checkpoint_interval_secs: 5 * 60,
            disable_wal: true,
        }
    }

    /// Whether a write-ahead log should be used.
    ///
    /// Only `Unsafe { disable_wal: true, .. }` turns the WAL off.
    pub fn use_wal(&self) -> bool {
        let wal_disabled = matches!(self, Self::Unsafe { disable_wal: true, .. });
        !wal_disabled
    }

    /// Whether every commit should be synced; true only for `Full`.
    pub fn sync_on_commit(&self) -> bool {
        *self == Self::Full
    }

    /// `(batch_size, batch_timeout)` for `Batched`, `None` for every other
    /// variant.
    pub fn batch_params(&self) -> Option<(usize, Duration)> {
        if let Self::Batched {
            batch_size,
            batch_timeout_ms,
        } = *self
        {
            let timeout = Duration::from_millis(batch_timeout_ms);
            Some((batch_size, timeout))
        } else {
            None
        }
    }

    /// Sync interval for `Async`, `None` for every other variant.
    pub fn async_sync_interval(&self) -> Option<Duration> {
        match *self {
            Self::Async { sync_interval_ms: ms } => Some(Duration::from_millis(ms)),
            _ => None,
        }
    }

    /// Checkpoint period for `Unsafe`.
    ///
    /// A zero interval is treated as "no periodic checkpoint" and yields
    /// `None`, as does every other variant.
    pub fn checkpoint_interval(&self) -> Option<Duration> {
        let secs = match *self {
            Self::Unsafe {
                checkpoint_interval_secs,
                ..
            } => checkpoint_interval_secs,
            _ => return None,
        };
        Some(secs).filter(|&s| s != 0).map(Duration::from_secs)
    }

    /// Short human-readable summary of the data-loss risk of this level.
    pub fn description(&self) -> &'static str {
        match *self {
            Self::Full => "Full ACID - zero data loss, fsync every commit",
            Self::Batched { .. } => "Batched ACID - up to N transactions may be lost",
            Self::Async { .. } => "Async durability - recent transactions may be lost",
            Self::Unsafe { disable_wal, .. } => {
                if disable_wal {
                    "UNSAFE - all data since checkpoint may be lost"
                } else {
                    "UNSAFE - WAL enabled but no fsync"
                }
            }
        }
    }
}
/// Ingestion tuning parameters bundled with a durability level.
///
/// Construct via `Default` or one of the `for_*` presets
/// (`for_bulk_load`, `for_maximum_performance`, `for_oltp`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LockFreeIngestionConfig {
    /// Durability/throughput trade-off; see [`IngestionSafetyLevel`].
    pub safety_level: IngestionSafetyLevel,
    /// Number of partitions; most presets derive it from available CPU
    /// parallelism (`for_oltp` fixes it at 4).
    pub partition_count: usize,
    /// Write buffer size — presumably in bytes (presets use multiples of
    /// 1024); TODO confirm against the engine.
    pub write_buffer_size: usize,
    /// Enables hierarchical row-id allocation. NOTE(review): the allocation
    /// scheme itself is not defined here.
    pub hierarchical_row_ids: bool,
    /// Limit on pending writes. NOTE(review): how the limit is enforced is
    /// not visible here.
    pub max_pending_writes: usize,
    /// Whether compression runs in parallel.
    pub parallel_compression: bool,
    /// Number of compression workers; presumably only meaningful when
    /// `parallel_compression` is true — verify.
    pub compression_workers: usize,
    /// Row-id batch size (presumably ids reserved per allocation; verify
    /// against the allocator).
    pub row_id_batch_size: u64,
}
impl Default for LockFreeIngestionConfig {
    /// General-purpose configuration.
    ///
    /// Uses `IngestionSafetyLevel::Full`, one partition per available CPU, a
    /// 64 * 1024 write buffer, flat row ids, parallel compression with at most
    /// four workers, and a row-id batch of 10 000.
    fn default() -> Self {
        let cpus = get_cpu_count();
        let compression_workers = std::cmp::min(cpus, 4);
        Self {
            safety_level: IngestionSafetyLevel::Full,
            partition_count: cpus,
            write_buffer_size: 64 << 10,
            hierarchical_row_ids: false,
            max_pending_writes: 100_000,
            parallel_compression: true,
            compression_workers,
            row_id_batch_size: 10_000,
        }
    }
}
impl LockFreeIngestionConfig {
    /// Bulk-load preset.
    ///
    /// Uses `IngestionSafetyLevel::bulk_load()` durability, hierarchical row
    /// ids, one partition and one compression worker per available CPU, a
    /// 1024 * 1024 write buffer, and larger pending/row-id batches than
    /// `Default`.
    pub fn for_bulk_load() -> Self {
        let cpus = get_cpu_count();
        Self {
            safety_level: IngestionSafetyLevel::bulk_load(),
            partition_count: cpus,
            write_buffer_size: 1 << 20,
            hierarchical_row_ids: true,
            max_pending_writes: 1_000_000,
            parallel_compression: true,
            compression_workers: cpus,
            row_id_batch_size: 100_000,
        }
    }

    /// Fastest preset.
    ///
    /// Uses `IngestionSafetyLevel::maximum_performance()` (WAL disabled), two
    /// partitions per available CPU, a 4 * 1024 * 1024 write buffer, and the
    /// largest pending/row-id batches.
    pub fn for_maximum_performance() -> Self {
        let cpus = get_cpu_count();
        let partitions = cpus * 2;
        Self {
            safety_level: IngestionSafetyLevel::maximum_performance(),
            partition_count: partitions,
            write_buffer_size: 4 << 20,
            hierarchical_row_ids: true,
            max_pending_writes: 10_000_000,
            parallel_compression: true,
            compression_workers: cpus,
            row_id_batch_size: 1_000_000,
        }
    }

    /// Small-transaction preset.
    ///
    /// Fully synchronous durability, a fixed four partitions, a 16 * 1024
    /// write buffer, and a single compression worker with parallel
    /// compression off. It does not depend on the CPU count.
    pub fn for_oltp() -> Self {
        let compression_workers = 1;
        Self {
            safety_level: IngestionSafetyLevel::Full,
            partition_count: 4,
            write_buffer_size: 16 << 10,
            hierarchical_row_ids: false,
            max_pending_writes: 10_000,
            parallel_compression: false,
            compression_workers,
            row_id_batch_size: 1_000,
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    // Explicit import so the tests do not rely on the parent's private `use`.
    use std::time::Duration;

    #[test]
    fn test_safety_level_defaults() {
        assert!(IngestionSafetyLevel::default().sync_on_commit());
        assert!(IngestionSafetyLevel::default().use_wal());
    }

    #[test]
    fn test_bulk_load_config() {
        let config = LockFreeIngestionConfig::for_bulk_load();
        assert!(config.hierarchical_row_ids);
        assert!(!config.safety_level.sync_on_commit());
    }

    #[test]
    fn test_unsafe_no_wal() {
        let level = IngestionSafetyLevel::maximum_performance();
        assert!(!level.use_wal());
    }

    #[test]
    fn test_batched_params() {
        let level = IngestionSafetyLevel::batched();
        assert_eq!(
            level.batch_params(),
            Some((1000, Duration::from_millis(100)))
        );
        assert!(level.use_wal());
        assert!(!level.sync_on_commit());
        assert_eq!(IngestionSafetyLevel::Full.batch_params(), None);
    }

    #[test]
    fn test_async_interval() {
        let level = IngestionSafetyLevel::async_default();
        assert_eq!(level.async_sync_interval(), Some(Duration::from_secs(1)));
        assert_eq!(level.checkpoint_interval(), None);
        assert!(!level.sync_on_commit());
    }

    #[test]
    fn test_checkpoint_interval_zero_is_none() {
        // A zero interval is documented as "no periodic checkpoint".
        let level = IngestionSafetyLevel::Unsafe {
            disable_wal: true,
            checkpoint_interval_secs: 0,
        };
        assert_eq!(level.checkpoint_interval(), None);
        assert_eq!(
            IngestionSafetyLevel::bulk_load().checkpoint_interval(),
            Some(Duration::from_secs(60))
        );
    }

    #[test]
    fn test_bulk_load_keeps_wal() {
        assert!(IngestionSafetyLevel::bulk_load().use_wal());
    }

    #[test]
    fn test_descriptions_distinguish_unsafe_modes() {
        assert_ne!(
            IngestionSafetyLevel::bulk_load().description(),
            IngestionSafetyLevel::maximum_performance().description()
        );
    }

    #[test]
    fn test_oltp_config() {
        let config = LockFreeIngestionConfig::for_oltp();
        assert_eq!(config.safety_level, IngestionSafetyLevel::Full);
        assert_eq!(config.partition_count, 4);
        assert!(!config.parallel_compression);
        assert_eq!(config.compression_workers, 1);
    }

    #[test]
    fn test_default_config_workers_capped() {
        let config = LockFreeIngestionConfig::default();
        assert!((1..=4).contains(&config.compression_workers));
        assert!(config.partition_count >= 1);
        assert!(config.safety_level.sync_on_commit());
    }
}