1use std::format;
2use std::time::Duration;
3
4use crate::ChunkId;
5use crate::errors::InvalidChunkFileName;
6use crate::num;
7
/// Fallback returned by `Config::flush_batch_wait()` when the
/// `flush_batch_wait` field is `None`: one millisecond.
const DEFAULT_FLUSH_BATCH_WAIT: Duration = Duration::from_millis(1);

/// Fallback used by `Config::flush_batch_max_items()` when the
/// `flush_batch_max_items` field is `None`.
const DEFAULT_FLUSH_BATCH_MAX_ITEMS: usize = 2_048;
10
/// Settings for the write-ahead-log chunk files.
///
/// Every tunable is stored as an `Option`; `None` means "use the built-in
/// default", which is resolved by the accessor method of the same name on
/// `Config`. `Config::default()` therefore yields an empty `dir` and all
/// tunables unset.
#[derive(Debug, Clone, Default)]
pub struct Config {
    /// Directory that holds the chunk files; `chunk_path()` joins it with the
    /// chunk file name using `/`.
    pub dir: String,

    /// Override for the read buffer size (presumably in bytes; confirm
    /// against the reader). Defaults to `64 * 1024 * 1024`.
    pub read_buffer_size: Option<usize>,

    /// Override for the maximum number of records per chunk.
    /// Defaults to `1024 * 1024`.
    pub chunk_max_records: Option<usize>,

    /// Override for the maximum chunk size (presumably in bytes; confirm
    /// against the writer). Defaults to `1024 * 1024 * 1024`.
    pub chunk_max_size: Option<usize>,

    /// Override for whether an incomplete record is truncated.
    /// Defaults to `true`.
    pub truncate_incomplete_record: Option<bool>,

    /// Override for the flush batching wait. Defaults to one millisecond.
    pub flush_batch_wait: Option<Duration>,

    /// Override for the maximum number of items in one flush batch.
    /// Defaults to 2048; the accessor never reports less than 1.
    pub flush_batch_max_items: Option<usize>,
}
48
49impl Config {
50 pub fn new(dir: impl ToString) -> Self {
52 Self {
53 dir: dir.to_string(),
54 ..Default::default()
55 }
56 }
57
58 pub fn new_full(
60 dir: impl ToString,
61 read_buffer_size: Option<usize>,
62 chunk_max_records: Option<usize>,
63 chunk_max_size: Option<usize>,
64 ) -> Self {
65 Self {
66 dir: dir.to_string(),
67 read_buffer_size,
68 chunk_max_records,
69 chunk_max_size,
70 truncate_incomplete_record: None,
71 flush_batch_wait: None,
72 flush_batch_max_items: None,
73 }
74 }
75
76 pub fn read_buffer_size(&self) -> usize {
78 self.read_buffer_size.unwrap_or(64 * 1024 * 1024)
79 }
80
81 pub fn chunk_max_records(&self) -> usize {
83 self.chunk_max_records.unwrap_or(1024 * 1024)
84 }
85
86 pub fn chunk_max_size(&self) -> usize {
88 self.chunk_max_size.unwrap_or(1024 * 1024 * 1024)
89 }
90
91 pub fn truncate_incomplete_record(&self) -> bool {
93 self.truncate_incomplete_record.unwrap_or(true)
94 }
95
96 pub fn flush_batch_wait(&self) -> Duration {
98 self.flush_batch_wait.unwrap_or(DEFAULT_FLUSH_BATCH_WAIT)
99 }
100
101 pub fn flush_batch_max_items(&self) -> usize {
103 self.flush_batch_max_items
104 .unwrap_or(DEFAULT_FLUSH_BATCH_MAX_ITEMS)
105 .max(1)
106 }
107
108 pub fn chunk_path(&self, chunk_id: ChunkId) -> String {
110 let file_name = Self::chunk_file_name(chunk_id);
111 format!("{}/{}", self.dir, file_name)
112 }
113
114 pub fn chunk_file_name(chunk_id: ChunkId) -> String {
118 let file_name = num::format_pad_u64(*chunk_id);
119 format!("r-{}.wal", file_name)
120 }
121
122 pub fn parse_chunk_file_name(
132 file_name: &str,
133 ) -> Result<u64, InvalidChunkFileName> {
134 let without_suffix =
136 file_name.strip_suffix(".wal").ok_or_else(|| {
137 InvalidChunkFileName::new(file_name, "has no '.wal' suffix")
138 })?;
139
140 let without_prefix =
142 without_suffix.strip_prefix("r-").ok_or_else(|| {
143 InvalidChunkFileName::new(file_name, "has no 'r-' prefix")
144 })?;
145
146 if without_prefix.len() != 26 {
147 return Err(InvalidChunkFileName::new(
148 file_name,
149 "does not have 26 digit after 'r-' prefix",
150 ));
151 }
152
153 let digits = without_prefix
154 .chars()
155 .filter(|c| c.is_ascii_digit())
156 .collect::<String>();
157
158 digits.parse::<u64>().map_err(|e| {
160 InvalidChunkFileName::new(
161 file_name,
162 format!("cannot parse as u64: {}", e),
163 )
164 })
165 }
166}
167
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::Config;
    use crate::ChunkId;

    /// A freshly created config reports the built-in default for every
    /// tunable.
    #[test]
    fn test_config_defaults() {
        let cfg = Config::new("wal-dir");

        assert_eq!("wal-dir", cfg.dir);
        assert_eq!(64 * 1024 * 1024, cfg.read_buffer_size());
        assert_eq!(1024 * 1024, cfg.chunk_max_records());
        assert_eq!(1024 * 1024 * 1024, cfg.chunk_max_size());
        assert!(cfg.truncate_incomplete_record());
        assert_eq!(Duration::from_millis(1), cfg.flush_batch_wait());
        assert_eq!(2048, cfg.flush_batch_max_items());
    }

    /// Explicitly set values win over the defaults, and a zero batch size is
    /// clamped up to one.
    #[test]
    fn test_config_overrides() {
        let mut cfg = Config::new_full("wal-dir", Some(1), Some(2), Some(3));
        cfg.truncate_incomplete_record = Some(false);
        cfg.flush_batch_wait = Some(Duration::from_millis(9));
        cfg.flush_batch_max_items = Some(0);

        assert_eq!(1, cfg.read_buffer_size());
        assert_eq!(2, cfg.chunk_max_records());
        assert_eq!(3, cfg.chunk_max_size());
        assert!(!cfg.truncate_incomplete_record());
        assert_eq!(Duration::from_millis(9), cfg.flush_batch_wait());
        assert_eq!(1, cfg.flush_batch_max_items());
    }

    /// The file name embeds the padded chunk id, and the path prepends the
    /// configured directory.
    #[test]
    fn test_chunk_path_and_file_name() {
        let cfg = Config::new("wal-dir");

        let name = Config::chunk_file_name(ChunkId(1_200_000));
        assert_eq!("r-00_000_000_000_001_200_000.wal", name);

        let path = cfg.chunk_path(ChunkId(1_200_000));
        assert_eq!("wal-dir/r-00_000_000_000_001_200_000.wal", path);
    }

    /// A well-formed name round-trips to its id; malformed names are
    /// rejected with a descriptive error.
    #[test]
    fn test_parse_chunk_file_name() {
        assert_eq!(
            Config::parse_chunk_file_name("r-10_100_000_000_001_200_000.wal"),
            Ok(10_100_000_000_001_200_000)
        );

        // Too long, too short, wrong suffix, wrong prefix.
        let rejected = [
            "r-10_100_000_000_001_200_000_1.wal",
            "r-1000000000.wal",
            "r-10_100_000_000_001_200_000.wall",
            "rrr-10_100_000_000_001_200_000.wal",
        ];
        for name in rejected {
            assert!(
                Config::parse_chunk_file_name(name).is_err(),
                "{} should be rejected",
                name
            );
        }

        // Right length but no digits at all: fails at the integer parse.
        let only_separators = format!("r-{}.wal", "_".repeat(26));
        let failure =
            Config::parse_chunk_file_name(&only_separators).unwrap_err();
        assert_eq!(only_separators, failure.bad_file_name);
        assert!(failure.reason.contains("cannot parse as u64"));
    }
}