use log::{info, warn};
use serde::{Deserialize, Serialize};
use crate::pipe_log::Version;
use crate::{util::ReadableSize, Result};
/// Lower bound for `Config::recovery_read_block_size`, expressed as the raw
/// `ReadableSize` value (presumably bytes — confirm against `ReadableSize`).
/// `Config::sanitize` raises any smaller setting to this value.
const MIN_RECOVERY_READ_BLOCK_SIZE: usize = 512;
/// Lower bound for `Config::recovery_threads`. `Config::sanitize` raises any
/// smaller setting to this value.
const MIN_RECOVERY_THREADS: usize = 1;
/// Selects how recovery treats corrupted log data.
///
/// Values are (de)serialized in kebab-case, e.g. `absolute-consistency`.
///
/// NOTE(review): the behavior behind each variant is implemented outside this
/// file; the per-variant descriptions below are inferred from the variant
/// names only — confirm against the recovery code.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum RecoveryMode {
/// Presumably treats any corruption as a recovery failure.
AbsoluteConsistency,
/// Presumably tolerates corruption at the tail of the log only.
///
/// This is the value used by `Config::default()`.
///
/// Wire format: deserializes from `tolerate-tail-corruption` as well as the
/// alias `tolerate-corrupted-tail-records`, but always serializes as
/// `tolerate-corrupted-tail-records` (looks like a backward-compatibility
/// measure for the older spelling).
#[serde(
alias = "tolerate-corrupted-tail-records",
rename(serialize = "tolerate-corrupted-tail-records")
)]
TolerateTailCorruption,
/// Presumably tolerates corruption anywhere in the log.
TolerateAnyCorruption,
}
/// Engine configuration.
///
/// (De)serialized with kebab-case keys (`recovery_mode` is written as
/// `recovery-mode`). Keys missing from the input take their value from
/// `Config::default()`. `Config::sanitize` validates and normalizes a loaded
/// configuration.
///
/// NOTE(review): the code consuming most of these fields is outside this file;
/// descriptions marked "presumably" are inferred from names and need
/// confirmation.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(default)]
#[serde(rename_all = "kebab-case")]
pub struct Config {
/// Presumably the main directory holding log files — confirm.
///
/// Default: `""`.
pub dir: String,
/// Presumably an auxiliary directory for log files — confirm.
///
/// Default: `None`.
pub spill_dir: Option<String>,
/// Corruption policy applied during recovery.
///
/// Default: `RecoveryMode::TolerateTailCorruption`.
pub recovery_mode: RecoveryMode,
/// Presumably the I/O block size used when reading files during recovery.
///
/// Default: `ReadableSize::kb(16)`. `sanitize` raises values below
/// `MIN_RECOVERY_READ_BLOCK_SIZE` (512) to that minimum.
pub recovery_read_block_size: ReadableSize,
/// Presumably the number of threads used for recovery.
///
/// Default: 4. `sanitize` raises values below `MIN_RECOVERY_THREADS` (1) to
/// that minimum.
pub recovery_threads: usize,
/// Presumably the batch size above which compression is applied — confirm.
///
/// Default: `ReadableSize::kb(8)`.
pub batch_compression_threshold: ReadableSize,
/// Compression level; its meaning and valid range are defined by the
/// compression code, which is not visible here.
///
/// Default: `None`.
pub compression_level: Option<usize>,
/// Deprecated: `sanitize` logs a deprecation warning whenever this is set.
///
/// Default: `None`.
pub bytes_per_sync: Option<ReadableSize>,
/// Log file format version.
///
/// Default: `Version::V2`. Log recycling is only accepted by `sanitize` for
/// versions whose `has_log_signing()` returns true.
pub format_version: Version,
/// Presumably the size at which a log file is rotated — confirm.
///
/// Default: `ReadableSize::mb(128)`. `sanitize` fails if this exceeds
/// `purge_threshold`.
pub target_file_size: ReadableSize,
/// Presumably the total log size that triggers purging — confirm.
///
/// Default: `ReadableSize::gb(10)`. Must be at least `target_file_size`.
pub purge_threshold: ReadableSize,
/// Presumably the purge threshold for rewritten logs — confirm.
///
/// Default: `None`, which `sanitize` replaces with
/// `max(purge_threshold / 10, target_file_size)`.
pub purge_rewrite_threshold: Option<ReadableSize>,
/// Presumably the garbage ratio of rewritten logs that triggers purging —
/// confirm.
///
/// Default: 0.6.
pub purge_rewrite_garbage_ratio: f64,
/// Memory limit; `sanitize` warns that it is ignored when the `swap` feature
/// is disabled.
///
/// Default: `None` (`Some(ReadableSize(0))` in test builds).
pub memory_limit: Option<ReadableSize>,
/// Whether log files are recycled. `sanitize` rejects this when
/// `format_version` lacks log signing.
///
/// Default: `true`.
pub enable_log_recycle: bool,
/// Whether files are prefilled for recycling. `sanitize` rejects this when
/// `enable_log_recycle` is false.
///
/// Default: `false`.
pub prefill_for_recycle: bool,
/// Size budget for prefilled files.
///
/// Default: `None`. `sanitize` sets it to `purge_threshold` when prefill is
/// enabled and no limit is given, and clears it when prefill is disabled.
pub prefill_limit: Option<ReadableSize>,
}
impl Default for Config {
fn default() -> Config {
#[allow(unused_mut)]
let mut cfg = Config {
dir: "".to_owned(),
spill_dir: None,
recovery_mode: RecoveryMode::TolerateTailCorruption,
recovery_read_block_size: ReadableSize::kb(16),
recovery_threads: 4,
batch_compression_threshold: ReadableSize::kb(8),
compression_level: None,
bytes_per_sync: None,
format_version: Version::V2,
target_file_size: ReadableSize::mb(128),
purge_threshold: ReadableSize::gb(10),
purge_rewrite_threshold: None,
purge_rewrite_garbage_ratio: 0.6,
memory_limit: None,
enable_log_recycle: true,
prefill_for_recycle: false,
prefill_limit: None,
};
#[cfg(test)]
{
cfg.memory_limit = Some(ReadableSize(0));
}
cfg
}
}
impl Config {
    /// Validates the configuration and normalizes it in place.
    ///
    /// Adjustments applied:
    /// - `purge_rewrite_threshold`, when unset, becomes
    ///   `max(purge_threshold / 10, target_file_size)`.
    /// - `recovery_read_block_size` and `recovery_threads` are raised to their
    ///   minimums (with a warning).
    /// - `prefill_limit` is cleared when prefill is disabled (with a warning),
    ///   or set to `purge_threshold` when prefill is enabled without a limit.
    ///
    /// Warnings without any adjustment: `bytes_per_sync` is set (deprecated);
    /// `memory_limit` is set while the `swap` feature is disabled.
    ///
    /// # Errors
    ///
    /// Returns an error when:
    /// - `purge_threshold` is smaller than `target_file_size`;
    /// - log recycling is enabled but `format_version` lacks log signing;
    /// - prefill is enabled while log recycling is disabled.
    ///
    /// Checks run in the order written, so adjustments that precede a failing
    /// check have already been applied when the error is returned.
    pub fn sanitize(&mut self) -> Result<()> {
        if self.purge_threshold.0 < self.target_file_size.0 {
            return Err(box_err!("purge-threshold < target-file-size"));
        }
        if self.purge_rewrite_threshold.is_none() {
            self.purge_rewrite_threshold = Some(ReadableSize(std::cmp::max(
                self.purge_threshold.0 / 10,
                self.target_file_size.0,
            )));
        }
        if self.bytes_per_sync.is_some() {
            warn!("bytes-per-sync has been deprecated.");
        }
        let min_recovery_read_block_size = ReadableSize(MIN_RECOVERY_READ_BLOCK_SIZE as u64);
        if self.recovery_read_block_size < min_recovery_read_block_size {
            warn!(
                "recovery-read-block-size ({}) is too small, setting it to {min_recovery_read_block_size}",
                self.recovery_read_block_size
            );
            self.recovery_read_block_size = min_recovery_read_block_size;
        }
        if self.recovery_threads < MIN_RECOVERY_THREADS {
            warn!(
                "recovery-threads ({}) is too small, setting it to {MIN_RECOVERY_THREADS}",
                self.recovery_threads
            );
            self.recovery_threads = MIN_RECOVERY_THREADS;
        }
        if self.enable_log_recycle && !self.format_version.has_log_signing() {
            return Err(box_err!(
                "format version {} doesn't support log recycle, use 2 or above",
                self.format_version
            ));
        }
        if !self.enable_log_recycle && self.prefill_for_recycle {
            return Err(box_err!(
                "prefill is not allowed when log recycle is disabled"
            ));
        }
        if !self.prefill_for_recycle && self.prefill_limit.is_some() {
            warn!("prefill-limit will be ignored when prefill is disabled");
            self.prefill_limit = None;
        }
        if self.prefill_for_recycle && self.prefill_limit.is_none() {
            info!("prefill-limit will be calibrated to purge-threshold");
            self.prefill_limit = Some(self.purge_threshold);
        }
        // Only compiled when the `swap` feature is off.
        #[cfg(not(feature = "swap"))]
        if self.memory_limit.is_some() {
            warn!("memory-limit will be ignored because swap feature is disabled");
        }
        Ok(())
    }

    /// Converts a byte budget into a number of files of `file_size` bytes.
    ///
    /// Returns `limit / file_size + 2`, capped at `u32::MAX`. Returns 0 when
    /// `limit` is smaller than one file, or when `file_size` is zero (a
    /// degenerate setting that `sanitize` does not reject and that would
    /// otherwise divide by zero).
    fn file_capacity(limit: u64, file_size: u64) -> usize {
        if limit < file_size {
            return 0;
        }
        // `checked_div` is `None` only for a zero `file_size`.
        limit.checked_div(file_size).map_or(0, |files| {
            // Two extra files on top of the quotient (presumably spare buffer
            // files — confirm). The addition saturates and the cap is applied
            // while still in `u64`, so neither the `+ 2` nor the cast below can
            // overflow or truncate on targets with at least 32-bit pointers.
            files.saturating_add(2).min(u64::from(u32::MAX)) as usize
        })
    }

    /// Returns how many log files may be kept for recycling.
    ///
    /// The result is 0 when `format_version` lacks log signing, when recycling
    /// is disabled, when `purge_threshold` is smaller than `target_file_size`,
    /// or when `target_file_size` is zero. Otherwise it is
    /// `purge_threshold / target_file_size + 2`, capped at `u32::MAX`.
    pub(crate) fn recycle_capacity(&self) -> usize {
        if !self.format_version.has_log_signing() || !self.enable_log_recycle {
            return 0;
        }
        Self::file_capacity(self.purge_threshold.0, self.target_file_size.0)
    }

    /// Returns how many log files may be prefilled for recycling.
    ///
    /// The result is 0 when recycling or prefill is disabled, when
    /// `format_version` lacks log signing, when `prefill_limit` is unset or
    /// smaller than `target_file_size`, or when `target_file_size` is zero.
    /// Otherwise it is `prefill_limit / target_file_size + 2`, capped at
    /// `u32::MAX` (the same formula as `recycle_capacity`).
    pub(crate) fn prefill_capacity(&self) -> usize {
        if !self.enable_log_recycle || !self.format_version.has_log_signing() {
            return 0;
        }
        if !self.prefill_for_recycle {
            return 0;
        }
        self.prefill_limit.map_or(0, |limit| {
            Self::file_capacity(limit.0, self.target_file_size.0)
        })
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration survives a TOML round trip unchanged.
    #[test]
    fn test_serde() {
        let value = Config::default();
        let dump = toml::to_string_pretty(&value).unwrap();
        let load: Config = toml::from_str(&dump).unwrap();
        assert_eq!(value, load);
        assert!(load.spill_dir.is_none());
    }

    /// Explicitly configured values are parsed as written and pass `sanitize`.
    #[test]
    fn test_custom() {
        let custom = r#"
dir = "custom_dir"
spill-dir = "custom_spill_dir"
recovery-mode = "tolerate-tail-corruption"
bytes-per-sync = "2KB"
target-file-size = "1MB"
purge-threshold = "3MB"
format-version = 1
enable-log-recycle = false
prefill-for-recycle = false
"#;
        let mut load: Config = toml::from_str(custom).unwrap();
        assert_eq!(load.dir, "custom_dir");
        assert_eq!(load.spill_dir, Some("custom_spill_dir".to_owned()));
        assert_eq!(load.recovery_mode, RecoveryMode::TolerateTailCorruption);
        assert_eq!(load.bytes_per_sync, Some(ReadableSize::kb(2)));
        assert_eq!(load.target_file_size, ReadableSize::mb(1));
        assert_eq!(load.purge_threshold, ReadableSize::mb(3));
        assert_eq!(load.format_version, Version::V1);
        assert!(!load.enable_log_recycle);
        assert!(!load.prefill_for_recycle);
        load.sanitize().unwrap();
    }

    /// `sanitize` rejects hard errors and silently repairs soft ones.
    #[test]
    fn test_invalid() {
        // A target file larger than the purge threshold is a hard error.
        let hard_error = r#"
target-file-size = "5MB"
purge-threshold = "3MB"
"#;
        let mut hard_load: Config = toml::from_str(hard_error).unwrap();
        assert!(hard_load.sanitize().is_err());

        // Too-small recovery settings are accepted by the parser and then
        // raised to their minimums by `sanitize`.
        let soft_error = r#"
recovery-read-block-size = 1
recovery-threads = 0
target-file-size = "5000MB"
format-version = 2
enable-log-recycle = true
prefill-for-recycle = true
"#;
        let soft_load: Config = toml::from_str(soft_error).unwrap();
        assert!(soft_load.recovery_read_block_size.0 < MIN_RECOVERY_READ_BLOCK_SIZE as u64);
        assert!(soft_load.recovery_threads < MIN_RECOVERY_THREADS);
        let mut soft_sanitized = soft_load;
        soft_sanitized.sanitize().unwrap();
        assert!(soft_sanitized.recovery_read_block_size.0 >= MIN_RECOVERY_READ_BLOCK_SIZE as u64);
        assert!(soft_sanitized.recovery_threads >= MIN_RECOVERY_THREADS);
        // A tenth of the default purge threshold is below the 5000MB target,
        // so the target file size is chosen as the rewrite threshold.
        assert_eq!(
            soft_sanitized.purge_rewrite_threshold.unwrap(),
            soft_sanitized.target_file_size
        );

        // Recycling needs a format version with log signing; version 1 is
        // rejected.
        let recycle_error = r#"
enable-log-recycle = true
format-version = 1
"#;
        let mut cfg_load: Config = toml::from_str(recycle_error).unwrap();
        assert!(cfg_load.sanitize().is_err());

        // Prefill without recycling is rejected.
        let prefill_error = r#"
enable-log-recycle = false
prefill-for-recycle = true
format-version = 2
"#;
        let mut cfg_load: Config = toml::from_str(prefill_error).unwrap();
        assert!(cfg_load.sanitize().is_err());
    }

    /// The legacy recovery-mode spelling is accepted and written back as is.
    #[test]
    fn test_backward_compatibility() {
        let old = r#"
recovery-mode = "tolerate-corrupted-tail-records"
"#;
        let mut load: Config = toml::from_str(old).unwrap();
        load.sanitize().unwrap();
        assert!(toml::to_string(&load)
            .unwrap()
            .contains("tolerate-corrupted-tail-records"));
    }

    /// `sanitize` derives or clears `prefill-limit` depending on whether
    /// prefill is enabled.
    #[test]
    fn test_prefill_for_recycle() {
        // Prefill enabled without a limit: the limit follows purge-threshold.
        let prefill_without_limit = r#"
enable-log-recycle = true
prefill-for-recycle = true
"#;
        let mut cfg_load: Config = toml::from_str(prefill_without_limit).unwrap();
        cfg_load.sanitize().unwrap();
        assert_eq!(cfg_load.prefill_limit.unwrap(), cfg_load.purge_threshold);

        // Prefill disabled with a limit: the limit is dropped.
        let limit_without_prefill = r#"
enable-log-recycle = true
prefill-for-recycle = false
prefill-limit = "20GB"
"#;
        let mut cfg_load: Config = toml::from_str(limit_without_prefill).unwrap();
        cfg_load.sanitize().unwrap();
        assert!(cfg_load.prefill_limit.is_none());
    }
}