use std::format;
use std::time::Duration;
use crate::ChunkId;
use crate::errors::InvalidChunkFileName;
use crate::num;
/// Fallback returned by `Config::flush_batch_wait` when no wait is configured.
const DEFAULT_FLUSH_BATCH_WAIT: Duration = Duration::from_millis(1);
/// Fallback used by `Config::flush_batch_max_items` when no limit is configured.
const DEFAULT_FLUSH_BATCH_MAX_ITEMS: usize = 2048;
/// Settings for the chunk files kept under `dir`.
///
/// Every `Option` field may be left as `None`; the accessor method of the
/// same name on `Config` then supplies the default value.
#[derive(Clone, Debug, Default)]
pub struct Config {
/// Directory prefix that `chunk_path` puts in front of a chunk file name.
pub dir: String,
/// Read buffer size; the accessor falls back to `64 * 1024 * 1024`.
/// NOTE(review): presumably a byte count — confirm against the reader.
pub read_buffer_size: Option<usize>,
/// Maximum number of records per chunk; the accessor falls back to
/// `1024 * 1024`.
pub chunk_max_records: Option<usize>,
/// Maximum chunk size; the accessor falls back to `1024 * 1024 * 1024`.
/// NOTE(review): presumably a byte count — confirm against the writer.
pub chunk_max_size: Option<usize>,
/// Incomplete-record truncation flag; the accessor falls back to `true`.
/// NOTE(review): the exact truncation behavior lives outside this file.
pub truncate_incomplete_record: Option<bool>,
/// Flush batching wait; the accessor falls back to 1 millisecond.
pub flush_batch_wait: Option<Duration>,
/// Flush batching item limit; the accessor falls back to 2048 and never
/// returns less than 1.
pub flush_batch_max_items: Option<usize>,
}
impl Config {
    /// Creates a config for `dir` with every optional setting left unset,
    /// so each accessor returns its default.
    pub fn new(dir: impl ToString) -> Self {
        Self {
            dir: dir.to_string(),
            ..Default::default()
        }
    }

    /// Creates a config for `dir` with explicit read-buffer and chunk limits.
    ///
    /// `truncate_incomplete_record`, `flush_batch_wait` and
    /// `flush_batch_max_items` are left unset and can be assigned on the
    /// returned value.
    pub fn new_full(
        dir: impl ToString,
        read_buffer_size: Option<usize>,
        chunk_max_records: Option<usize>,
        chunk_max_size: Option<usize>,
    ) -> Self {
        Self {
            dir: dir.to_string(),
            read_buffer_size,
            chunk_max_records,
            chunk_max_size,
            truncate_incomplete_record: None,
            flush_batch_wait: None,
            flush_batch_max_items: None,
        }
    }

    /// Returns the configured read buffer size, or `64 * 1024 * 1024` if unset.
    pub fn read_buffer_size(&self) -> usize {
        self.read_buffer_size.unwrap_or(64 * 1024 * 1024)
    }

    /// Returns the configured per-chunk record limit, or `1024 * 1024` if unset.
    pub fn chunk_max_records(&self) -> usize {
        self.chunk_max_records.unwrap_or(1024 * 1024)
    }

    /// Returns the configured chunk size limit, or `1024 * 1024 * 1024` if unset.
    pub fn chunk_max_size(&self) -> usize {
        self.chunk_max_size.unwrap_or(1024 * 1024 * 1024)
    }

    /// Returns the configured incomplete-record truncation flag, or `true`
    /// if unset.
    pub fn truncate_incomplete_record(&self) -> bool {
        self.truncate_incomplete_record.unwrap_or(true)
    }

    /// Returns the configured flush batch wait, or
    /// `DEFAULT_FLUSH_BATCH_WAIT` if unset.
    pub fn flush_batch_wait(&self) -> Duration {
        self.flush_batch_wait.unwrap_or(DEFAULT_FLUSH_BATCH_WAIT)
    }

    /// Returns the configured flush batch item limit, or
    /// `DEFAULT_FLUSH_BATCH_MAX_ITEMS` if unset.
    ///
    /// The result is never smaller than 1, even when 0 was configured.
    pub fn flush_batch_max_items(&self) -> usize {
        self.flush_batch_max_items
            .unwrap_or(DEFAULT_FLUSH_BATCH_MAX_ITEMS)
            .max(1)
    }

    /// Returns the path of the chunk file for `chunk_id`: `dir`, a `/`, and
    /// the name produced by [`Config::chunk_file_name`].
    pub fn chunk_path(&self, chunk_id: ChunkId) -> String {
        let file_name = Self::chunk_file_name(chunk_id);
        format!("{}/{}", self.dir, file_name)
    }

    /// Returns the file name for `chunk_id`: `r-`, the id formatted by
    /// `num::format_pad_u64`, and the `.wal` suffix.
    pub fn chunk_file_name(chunk_id: ChunkId) -> String {
        let file_name = num::format_pad_u64(*chunk_id);
        format!("r-{}.wal", file_name)
    }

    /// Parses a chunk file name of the form `r-<26 characters>.wal` back
    /// into the numeric chunk id.
    ///
    /// # Errors
    ///
    /// Returns [`InvalidChunkFileName`] when the name lacks the `.wal`
    /// suffix or the `r-` prefix, when the part in between is not exactly
    /// 26 bytes long, when it contains anything other than ASCII digits and
    /// `_` separators, or when its digits do not fit into a `u64`.
    pub fn parse_chunk_file_name(
        file_name: &str,
    ) -> Result<u64, InvalidChunkFileName> {
        let without_suffix =
            file_name.strip_suffix(".wal").ok_or_else(|| {
                InvalidChunkFileName::new(file_name, "has no '.wal' suffix")
            })?;

        let without_prefix =
            without_suffix.strip_prefix("r-").ok_or_else(|| {
                InvalidChunkFileName::new(file_name, "has no 'r-' prefix")
            })?;

        if without_prefix.len() != 26 {
            return Err(InvalidChunkFileName::new(
                file_name,
                "does not have 26 digit after 'r-' prefix",
            ));
        }

        // Without this check the digit filter below would silently discard
        // arbitrary characters and accept a foreign file as a chunk.
        let unexpected = without_prefix
            .chars()
            .find(|c| !(c.is_ascii_digit() || *c == '_'));
        if let Some(bad_char) = unexpected {
            return Err(InvalidChunkFileName::new(
                file_name,
                format!("unexpected character {:?} in chunk id", bad_char),
            ));
        }

        // Drop the `_` group separators before handing the text to `parse`.
        let digits = without_prefix
            .chars()
            .filter(|c| c.is_ascii_digit())
            .collect::<String>();

        digits.parse::<u64>().map_err(|e| {
            InvalidChunkFileName::new(
                file_name,
                format!("cannot parse as u64: {}", e),
            )
        })
    }
}
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::Config;
    use crate::ChunkId;

    /// An otherwise empty config reports the built-in default for every
    /// accessor.
    #[test]
    fn test_config_defaults() {
        let cfg = Config::new("wal-dir");

        assert_eq!("wal-dir", cfg.dir);
        assert_eq!(64 * 1024 * 1024, cfg.read_buffer_size());
        assert_eq!(1024 * 1024, cfg.chunk_max_records());
        assert_eq!(1024 * 1024 * 1024, cfg.chunk_max_size());
        assert!(cfg.truncate_incomplete_record());
        assert_eq!(Duration::from_millis(1), cfg.flush_batch_wait());
        assert_eq!(2048, cfg.flush_batch_max_items());
    }

    /// Explicit values win over defaults, and a zero batch limit is raised
    /// to one.
    #[test]
    fn test_config_overrides() {
        let cfg = Config {
            truncate_incomplete_record: Some(false),
            flush_batch_wait: Some(Duration::from_millis(9)),
            flush_batch_max_items: Some(0),
            ..Config::new_full("wal-dir", Some(1), Some(2), Some(3))
        };

        assert_eq!(1, cfg.read_buffer_size());
        assert_eq!(2, cfg.chunk_max_records());
        assert_eq!(3, cfg.chunk_max_size());
        assert!(!cfg.truncate_incomplete_record());
        assert_eq!(Duration::from_millis(9), cfg.flush_batch_wait());
        assert_eq!(1, cfg.flush_batch_max_items());
    }

    /// The file name is the padded chunk id wrapped in `r-` and `.wal`, and
    /// the path puts the directory in front of it.
    #[test]
    fn test_chunk_path_and_file_name() {
        let chunk_id = ChunkId(1_200_000);
        let name = Config::chunk_file_name(chunk_id);
        assert_eq!("r-00_000_000_000_001_200_000.wal", name);

        let cfg = Config::new("wal-dir");
        let path = cfg.chunk_path(ChunkId(1_200_000));
        assert_eq!("wal-dir/r-00_000_000_000_001_200_000.wal", path);
    }

    /// A well-formed name parses to its id; malformed names are rejected.
    #[test]
    fn test_parse_chunk_file_name() {
        let parsed =
            Config::parse_chunk_file_name("r-10_100_000_000_001_200_000.wal");
        assert_eq!(parsed, Ok(10_100_000_000_001_200_000));

        // Too long, too short, wrong suffix, wrong prefix.
        let rejected = [
            "r-10_100_000_000_001_200_000_1.wal",
            "r-1000000000.wal",
            "r-10_100_000_000_001_200_000.wall",
            "rrr-10_100_000_000_001_200_000.wal",
        ];
        for name in rejected.iter() {
            assert!(Config::parse_chunk_file_name(name).is_err());
        }

        // Correct length but no digits at all: the numeric parse must fail.
        let only_separators = format!("r-{}.wal", "_".repeat(26));
        let failure =
            Config::parse_chunk_file_name(&only_separators).unwrap_err();
        assert_eq!(only_separators, failure.bad_file_name);
        assert!(failure.reason.contains("cannot parse as u64"));
    }
}