chunked-wal 0.2.0

Chunked write-ahead log implementation
Documentation
use std::format;
use std::time::Duration;

use crate::ChunkId;
use crate::errors::InvalidChunkFileName;
use crate::num;

/// Default upper bound on how long the flush worker waits for more write
/// requests before syncing a batch (one millisecond).
const DEFAULT_FLUSH_BATCH_WAIT: Duration = Duration::from_micros(1_000);
/// Default cap on the number of write requests synced in one flush batch.
const DEFAULT_FLUSH_BATCH_MAX_ITEMS: usize = 2 * 1024;

/// Configuration for chunked WAL.
///
/// This struct holds directory, chunk, recovery, and flush batching settings.
///
/// Optional parameters are stored as `Option<T>`. `None` means "use the
/// default", and the default value is applied when the corresponding getter
/// method is called (for example, `Config::read_buffer_size()`).
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct Config {
    /// Base directory for storing WAL files.
    pub dir: String,

    /// Size of the read buffer in bytes.
    ///
    /// Defaults to 64 MiB.
    pub read_buffer_size: Option<usize>,

    /// Maximum number of records in a chunk.
    ///
    /// Defaults to 1024 * 1024 records.
    pub chunk_max_records: Option<usize>,

    /// Maximum size of a chunk in bytes.
    ///
    /// Defaults to 1 GiB.
    pub chunk_max_size: Option<usize>,

    /// Whether to truncate the last record of a chunk if it was only
    /// partially synced.
    ///
    /// If `true`, the incomplete record is truncated and the chunk is
    /// considered successfully opened. If `false`, an `io::Error` is returned
    /// instead.
    ///
    /// Defaults to `true`.
    pub truncate_incomplete_record: Option<bool>,

    /// Maximum time the flush worker waits for more write requests before
    /// starting a sync batch.
    ///
    /// Defaults to 1 millisecond.
    pub flush_batch_wait: Option<Duration>,

    /// Maximum number of write requests to include in one flush batch.
    ///
    /// Defaults to 2048. Values smaller than 1 are treated as 1.
    pub flush_batch_max_items: Option<usize>,
}

impl Config {
    /// Creates a new `Config` for `dir`.
    ///
    /// Every optional setting is left as `None`, so each getter returns its
    /// built-in default.
    pub fn new(dir: impl ToString) -> Self {
        Self {
            dir: dir.to_string(),
            ..Default::default()
        }
    }

    /// Creates a new `Config` with the directory, read buffer size, and
    /// chunk limits given explicitly.
    ///
    /// Despite the name, this does not set every field.
    /// `truncate_incomplete_record`, `flush_batch_wait` and
    /// `flush_batch_max_items` are left as `None` and fall back to their
    /// defaults. Assign those public fields directly to override them.
    pub fn new_full(
        dir: impl ToString,
        read_buffer_size: Option<usize>,
        chunk_max_records: Option<usize>,
        chunk_max_size: Option<usize>,
    ) -> Self {
        // Fields are listed explicitly (no `..Default::default()`) so that
        // adding a field to `Config` forces this constructor to be revisited.
        Self {
            dir: dir.to_string(),
            read_buffer_size,
            chunk_max_records,
            chunk_max_size,
            truncate_incomplete_record: None,
            flush_batch_wait: None,
            flush_batch_max_items: None,
        }
    }

    /// Returns the size of the read buffer in bytes (defaults to 64 MiB).
    pub fn read_buffer_size(&self) -> usize {
        self.read_buffer_size.unwrap_or(64 * 1024 * 1024)
    }

    /// Returns the maximum number of records per chunk (defaults to
    /// 1024 * 1024 records).
    pub fn chunk_max_records(&self) -> usize {
        self.chunk_max_records.unwrap_or(1024 * 1024)
    }

    /// Returns the maximum size of a chunk in bytes (defaults to 1 GiB).
    pub fn chunk_max_size(&self) -> usize {
        self.chunk_max_size.unwrap_or(1024 * 1024 * 1024)
    }

    /// Returns whether to truncate an incomplete trailing record (defaults
    /// to `true`).
    pub fn truncate_incomplete_record(&self) -> bool {
        self.truncate_incomplete_record.unwrap_or(true)
    }

    /// Returns the bounded wait before syncing a flush batch (defaults to
    /// 1 millisecond).
    pub fn flush_batch_wait(&self) -> Duration {
        self.flush_batch_wait.unwrap_or(DEFAULT_FLUSH_BATCH_WAIT)
    }

    /// Returns the maximum number of write requests in one flush batch.
    ///
    /// Defaults to 2048. A configured value of 0 is raised to 1, so a batch
    /// always holds at least one request.
    pub fn flush_batch_max_items(&self) -> usize {
        self.flush_batch_max_items
            .unwrap_or(DEFAULT_FLUSH_BATCH_MAX_ITEMS)
            .max(1)
    }

    /// Returns the full path for a given chunk ID.
    ///
    /// The path is `"{dir}/{file_name}"`, where `file_name` comes from
    /// [`Config::chunk_file_name`].
    pub fn chunk_path(&self, chunk_id: ChunkId) -> String {
        let file_name = Self::chunk_file_name(chunk_id);
        format!("{}/{}", self.dir, file_name)
    }

    /// Generates the file name for a given chunk ID.
    ///
    /// The file name format is "r-{padded_chunk_id}.wal". For example, chunk
    /// `1_200_000` becomes `"r-00_000_000_000_001_200_000.wal"`.
    pub fn chunk_file_name(chunk_id: ChunkId) -> String {
        let file_name = num::format_pad_u64(*chunk_id);
        format!("r-{}.wal", file_name)
    }

    /// Parses a chunk file name and returns the chunk ID.
    ///
    /// This is the inverse of [`Config::chunk_file_name`]. The part between
    /// `r-` and `.wal` must be exactly 26 characters, and every character
    /// must be an ASCII digit or a `'_'` separator. Separators are ignored
    /// when decoding.
    ///
    /// # Arguments
    /// * `file_name` - Name of the chunk file (format:
    ///   "r-{padded_chunk_id}.wal")
    ///
    /// # Returns
    /// * `Ok(u64)` - The chunk ID if parsing succeeds
    ///
    /// # Errors
    /// Returns [`InvalidChunkFileName`] if any of these hold:
    /// * the name lacks the `.wal` suffix or the `r-` prefix;
    /// * the padded part is not 26 bytes long;
    /// * the padded part contains a character other than an ASCII digit or
    ///   `'_'`;
    /// * the digits do not form a valid `u64` (for example, there are none,
    ///   or the value overflows).
    pub fn parse_chunk_file_name(
        file_name: &str,
    ) -> Result<u64, InvalidChunkFileName> {
        // 1. Strip the ".wal" suffix or return an error if it's not there
        let without_suffix =
            file_name.strip_suffix(".wal").ok_or_else(|| {
                InvalidChunkFileName::new(file_name, "has no '.wal' suffix")
            })?;

        // 2. Strip the "r-" prefix or return an error if it's not there
        let without_prefix =
            without_suffix.strip_prefix("r-").ok_or_else(|| {
                InvalidChunkFileName::new(file_name, "has no 'r-' prefix")
            })?;

        // 3. The padded ID has a fixed width, e.g. "00_000_000_000_001_200_000".
        //    `len()` counts bytes; non-ASCII input is rejected in step 4.
        if without_prefix.len() != 26 {
            return Err(InvalidChunkFileName::new(
                file_name,
                "does not have 26 digit after 'r-' prefix",
            ));
        }

        // 4. Only digits and '_' separators are allowed. Dropping any other
        //    character would decode a malformed name into a different,
        //    valid-looking chunk ID.
        if let Some(bad) = without_prefix
            .chars()
            .find(|c| !c.is_ascii_digit() && *c != '_')
        {
            return Err(InvalidChunkFileName::new(
                file_name,
                format!("contains invalid character {:?}", bad),
            ));
        }

        // 5. Remove the separators and parse the remaining digits as a u64.
        //    An all-separator name leaves an empty string, which fails here.
        let digits = without_prefix
            .chars()
            .filter(|c| c.is_ascii_digit())
            .collect::<String>();

        digits.parse::<u64>().map_err(|e| {
            InvalidChunkFileName::new(
                file_name,
                format!("cannot parse as u64: {}", e),
            )
        })
    }
}

#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::Config;
    use crate::ChunkId;

    /// Getters return the documented defaults when nothing is configured.
    #[test]
    fn test_config_defaults() {
        let config = Config::new("wal-dir");

        assert_eq!("wal-dir", config.dir);
        assert_eq!(64 * 1024 * 1024, config.read_buffer_size());
        assert_eq!(1024 * 1024, config.chunk_max_records());
        assert_eq!(1024 * 1024 * 1024, config.chunk_max_size());
        assert!(config.truncate_incomplete_record());
        assert_eq!(Duration::from_millis(1), config.flush_batch_wait());
        assert_eq!(2048, config.flush_batch_max_items());
    }

    /// Explicit values win over defaults; a batch size of 0 is clamped to 1.
    #[test]
    fn test_config_overrides() {
        let mut config = Config::new_full("wal-dir", Some(1), Some(2), Some(3));
        config.truncate_incomplete_record = Some(false);
        config.flush_batch_wait = Some(Duration::from_millis(9));
        config.flush_batch_max_items = Some(0);

        assert_eq!(1, config.read_buffer_size());
        assert_eq!(2, config.chunk_max_records());
        assert_eq!(3, config.chunk_max_size());
        assert!(!config.truncate_incomplete_record());
        assert_eq!(Duration::from_millis(9), config.flush_batch_wait());
        assert_eq!(1, config.flush_batch_max_items());
    }

    #[test]
    fn test_chunk_path_and_file_name() {
        let config = Config::new("wal-dir");

        assert_eq!(
            "r-00_000_000_000_001_200_000.wal",
            Config::chunk_file_name(ChunkId(1_200_000))
        );
        assert_eq!(
            "wal-dir/r-00_000_000_000_001_200_000.wal",
            config.chunk_path(ChunkId(1_200_000))
        );
    }

    #[test]
    fn test_parse_chunk_file_name() {
        assert_eq!(
            Config::parse_chunk_file_name("r-10_100_000_000_001_200_000.wal"),
            Ok(10_100_000_000_001_200_000)
        );

        assert!(
            Config::parse_chunk_file_name("r-10_100_000_000_001_200_000_1.wal")
                .is_err()
        );
        assert!(Config::parse_chunk_file_name("r-1000000000.wal").is_err());
        assert!(
            Config::parse_chunk_file_name("r-10_100_000_000_001_200_000.wall")
                .is_err()
        );
        assert!(
            Config::parse_chunk_file_name("rrr-10_100_000_000_001_200_000.wal")
                .is_err()
        );

        // Right length, but no digits at all: must fail in the u64 parse.
        let bad_file_name = format!("r-{}.wal", "_".repeat(26));
        let err = Config::parse_chunk_file_name(&bad_file_name).unwrap_err();
        assert_eq!(bad_file_name, err.bad_file_name);
        assert!(err.reason.contains("cannot parse as u64"));
    }

    /// Every generated file name must parse back to the ID it was built
    /// from, including the extremes of the u64 range.
    #[test]
    fn test_chunk_file_name_round_trip() {
        for id in [0, 1, 1_200_000, u64::MAX] {
            let file_name = Config::chunk_file_name(ChunkId(id));
            assert_eq!(Ok(id), Config::parse_chunk_file_name(&file_name));
        }
    }
}