Skip to main content

chunked_wal/
config.rs

1use std::format;
2use std::time::Duration;
3
4use crate::ChunkId;
5use crate::errors::InvalidChunkFileName;
6use crate::num;
7
/// Fallback for `Config::flush_batch_wait` when the field is unset.
const DEFAULT_FLUSH_BATCH_WAIT: Duration = Duration::from_millis(1);

/// Fallback for `Config::flush_batch_max_items` when the field is unset.
const DEFAULT_FLUSH_BATCH_MAX_ITEMS: usize = 2 * 1024;
10
/// Settings for a chunked write-ahead log (WAL).
///
/// Groups the storage directory with the chunk limits, the recovery policy
/// and the flush-batching knobs.
///
/// Every optional setting is stored as an `Option<T>`. The default value is
/// not stored here; it is resolved when the getter method of the same name
/// is called.
#[derive(Debug, Clone, Default)]
pub struct Config {
    /// Base directory in which the WAL files are stored.
    pub dir: String,

    /// Size of the read buffer, in bytes.
    pub read_buffer_size: Option<usize>,

    /// Maximum number of records a single chunk may hold.
    pub chunk_max_records: Option<usize>,

    /// Maximum size of a single chunk, in bytes.
    pub chunk_max_size: Option<usize>,

    /// Whether to truncate a trailing record that was only partially synced.
    ///
    /// If the record is truncated, the chunk is considered successfully
    /// opened. Otherwise an `io::Error` is returned.
    pub truncate_incomplete_record: Option<bool>,

    /// Longest time the flush worker waits for further write requests
    /// before it starts a sync batch.
    ///
    /// Defaults to 1 millisecond.
    pub flush_batch_wait: Option<Duration>,

    /// Upper bound on the number of write requests in one flush batch.
    ///
    /// Defaults to 2048. Values smaller than 1 are treated as 1.
    pub flush_batch_max_items: Option<usize>,
}
48
49impl Config {
50    /// Creates a new Config with the specified directory and defaults.
51    pub fn new(dir: impl ToString) -> Self {
52        Self {
53            dir: dir.to_string(),
54            ..Default::default()
55        }
56    }
57
58    /// Creates a new Config with all configurable parameters
59    pub fn new_full(
60        dir: impl ToString,
61        read_buffer_size: Option<usize>,
62        chunk_max_records: Option<usize>,
63        chunk_max_size: Option<usize>,
64    ) -> Self {
65        Self {
66            dir: dir.to_string(),
67            read_buffer_size,
68            chunk_max_records,
69            chunk_max_size,
70            truncate_incomplete_record: None,
71            flush_batch_wait: None,
72            flush_batch_max_items: None,
73        }
74    }
75
76    /// Returns the size of read buffer in bytes (defaults to 64MB)
77    pub fn read_buffer_size(&self) -> usize {
78        self.read_buffer_size.unwrap_or(64 * 1024 * 1024)
79    }
80
81    /// Returns the maximum number of records per chunk (defaults to 1M records)
82    pub fn chunk_max_records(&self) -> usize {
83        self.chunk_max_records.unwrap_or(1024 * 1024)
84    }
85
86    /// Returns the maximum size of a chunk in bytes (defaults to 1GB)
87    pub fn chunk_max_size(&self) -> usize {
88        self.chunk_max_size.unwrap_or(1024 * 1024 * 1024)
89    }
90
91    /// Returns whether to truncate incomplete records (defaults to true)
92    pub fn truncate_incomplete_record(&self) -> bool {
93        self.truncate_incomplete_record.unwrap_or(true)
94    }
95
96    /// Returns the bounded wait before syncing a flush batch.
97    pub fn flush_batch_wait(&self) -> Duration {
98        self.flush_batch_wait.unwrap_or(DEFAULT_FLUSH_BATCH_WAIT)
99    }
100
101    /// Returns the maximum number of write requests in one flush batch.
102    pub fn flush_batch_max_items(&self) -> usize {
103        self.flush_batch_max_items
104            .unwrap_or(DEFAULT_FLUSH_BATCH_MAX_ITEMS)
105            .max(1)
106    }
107
108    /// Returns the full path for a given chunk ID
109    pub fn chunk_path(&self, chunk_id: ChunkId) -> String {
110        let file_name = Self::chunk_file_name(chunk_id);
111        format!("{}/{}", self.dir, file_name)
112    }
113
114    /// Generates the file name for a given chunk ID
115    ///
116    /// The file name format is "r-{padded_chunk_id}.wal"
117    pub fn chunk_file_name(chunk_id: ChunkId) -> String {
118        let file_name = num::format_pad_u64(*chunk_id);
119        format!("r-{}.wal", file_name)
120    }
121
122    /// Parses a chunk file name and returns the chunk ID
123    ///
124    /// # Arguments
125    /// * `file_name` - Name of the chunk file (format:
126    ///   "r-{padded_chunk_id}.wal")
127    ///
128    /// # Returns
129    /// * `Ok(u64)` - The chunk ID if parsing succeeds
130    /// * `Err(InvalidChunkFileName)` - If the file name format is invalid
131    pub fn parse_chunk_file_name(
132        file_name: &str,
133    ) -> Result<u64, InvalidChunkFileName> {
134        // 1. Strip the ".wal" suffix or return an error if it's not there
135        let without_suffix =
136            file_name.strip_suffix(".wal").ok_or_else(|| {
137                InvalidChunkFileName::new(file_name, "has no '.wal' suffix")
138            })?;
139
140        // 2. Strip the "r-" prefix or return an error if it's not there
141        let without_prefix =
142            without_suffix.strip_prefix("r-").ok_or_else(|| {
143                InvalidChunkFileName::new(file_name, "has no 'r-' prefix")
144            })?;
145
146        if without_prefix.len() != 26 {
147            return Err(InvalidChunkFileName::new(
148                file_name,
149                "does not have 26 digit after 'r-' prefix",
150            ));
151        }
152
153        let digits = without_prefix
154            .chars()
155            .filter(|c| c.is_ascii_digit())
156            .collect::<String>();
157
158        // 3. Parse the remaining string as an u64
159        digits.parse::<u64>().map_err(|e| {
160            InvalidChunkFileName::new(
161                file_name,
162                format!("cannot parse as u64: {}", e),
163            )
164        })
165    }
166}
167
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::Config;
    use crate::ChunkId;

    /// A config built with `Config::new` resolves every getter to its
    /// default value.
    #[test]
    fn test_config_defaults() {
        let cfg = Config::new("wal-dir");

        assert_eq!(cfg.dir, "wal-dir");
        assert_eq!(cfg.read_buffer_size(), 64 * 1024 * 1024);
        assert_eq!(cfg.chunk_max_records(), 1024 * 1024);
        assert_eq!(cfg.chunk_max_size(), 1024 * 1024 * 1024);
        assert!(cfg.truncate_incomplete_record());
        assert_eq!(cfg.flush_batch_wait(), Duration::from_millis(1));
        assert_eq!(cfg.flush_batch_max_items(), 2048);
    }

    /// Explicitly set fields win over the defaults, and a batch size of 0 is
    /// raised to 1.
    #[test]
    fn test_config_overrides() {
        let mut cfg = Config::new_full("wal-dir", Some(1), Some(2), Some(3));
        cfg.truncate_incomplete_record = Some(false);
        cfg.flush_batch_wait = Some(Duration::from_millis(9));
        cfg.flush_batch_max_items = Some(0);

        assert_eq!(cfg.read_buffer_size(), 1);
        assert_eq!(cfg.chunk_max_records(), 2);
        assert_eq!(cfg.chunk_max_size(), 3);
        assert!(!cfg.truncate_incomplete_record());
        assert_eq!(cfg.flush_batch_wait(), Duration::from_millis(9));
        assert_eq!(cfg.flush_batch_max_items(), 1);
    }

    /// The chunk file name embeds the padded chunk ID, and the chunk path
    /// prefixes it with the configured directory.
    #[test]
    fn test_chunk_path_and_file_name() {
        let cfg = Config::new("wal-dir");

        let file_name = Config::chunk_file_name(ChunkId(1_200_000));
        assert_eq!(file_name, "r-00_000_000_000_001_200_000.wal");

        let path = cfg.chunk_path(ChunkId(1_200_000));
        assert_eq!(path, "wal-dir/r-00_000_000_000_001_200_000.wal");
    }

    /// Well-formed names parse to their chunk ID; malformed names are
    /// rejected.
    #[test]
    fn test_parse_chunk_file_name() {
        let parsed =
            Config::parse_chunk_file_name("r-10_100_000_000_001_200_000.wal");
        assert_eq!(parsed, Ok(10_100_000_000_001_200_000));

        // Too long, too short, wrong suffix, wrong prefix.
        let rejected = [
            "r-10_100_000_000_001_200_000_1.wal",
            "r-1000000000.wal",
            "r-10_100_000_000_001_200_000.wall",
            "rrr-10_100_000_000_001_200_000.wal",
        ];
        for name in rejected.iter() {
            assert!(
                Config::parse_chunk_file_name(name).is_err(),
                "{} should be rejected",
                name
            );
        }

        // Right length but no digits at all: fails at the integer parse.
        let bad_file_name = format!("r-{}.wal", "_".repeat(26));
        let err = Config::parse_chunk_file_name(&bad_file_name).unwrap_err();
        assert_eq!(err.bad_file_name, bad_file_name);
        assert!(err.reason.contains("cannot parse as u64"));
    }
}