1use log::{info, warn};
4use serde::{Deserialize, Serialize};
5
6use crate::pipe_log::Version;
7use crate::{util::ReadableSize, Result};
8
/// Smallest accepted `recovery-read-block-size`. `Config::sanitize` wraps it
/// as `ReadableSize(512)` and raises any smaller setting to this value
/// (presumably a byte count — confirm against `ReadableSize`).
const MIN_RECOVERY_READ_BLOCK_SIZE: usize = 512;
/// Smallest accepted `recovery-threads`. `Config::sanitize` raises any
/// smaller setting to this value.
const MIN_RECOVERY_THREADS: usize = 1;
11
/// Recovery policy selected through the `recovery-mode` configuration key.
///
/// Variants are (de)serialized in kebab-case. The precise recovery semantics
/// of each variant are implemented outside this file.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum RecoveryMode {
    /// Serialized as `absolute-consistency`.
    AbsoluteConsistency,
    /// Accepts both `tolerate-tail-corruption` and the legacy spelling
    /// `tolerate-corrupted-tail-records` on input, and is always written back
    /// using the legacy spelling (presumably for backward compatibility with
    /// older configuration files).
    #[serde(
        alias = "tolerate-corrupted-tail-records",
        rename(serialize = "tolerate-corrupted-tail-records")
    )]
    TolerateTailCorruption,
    /// Serialized as `tolerate-any-corruption`.
    TolerateAnyCorruption,
}
24
/// Engine configuration.
///
/// Keys are (de)serialized in kebab-case, and any key missing from the input
/// falls back to the value produced by `Config::default()`. Call
/// `Config::sanitize` after loading to validate and normalize the values.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(default)]
#[serde(rename_all = "kebab-case")]
pub struct Config {
    /// Directory path. Defaults to an empty string.
    /// NOTE(review): presumably the main log directory — not shown in this file.
    pub dir: String,

    /// Optional secondary directory path. Defaults to `None`.
    /// NOTE(review): its exact role is defined outside this file.
    pub spill_dir: Option<String>,

    /// Recovery policy. Defaults to `RecoveryMode::TolerateTailCorruption`.
    pub recovery_mode: RecoveryMode,
    /// Defaults to `ReadableSize::kb(16)`. `sanitize` raises values below
    /// `MIN_RECOVERY_READ_BLOCK_SIZE` (512) to that minimum.
    pub recovery_read_block_size: ReadableSize,
    /// Defaults to 4. `sanitize` raises values below `MIN_RECOVERY_THREADS`
    /// (1) to that minimum.
    pub recovery_threads: usize,

    /// Defaults to `ReadableSize::kb(8)`.
    pub batch_compression_threshold: ReadableSize,
    /// Defaults to `None`.
    pub compression_level: Option<usize>,
    /// Deprecated: `sanitize` only logs a warning when this is set.
    pub bytes_per_sync: Option<ReadableSize>,

    /// Defaults to `Version::V2`. Log recycling requires a version for which
    /// `Version::has_log_signing()` returns true.
    pub format_version: Version,

    /// Defaults to `ReadableSize::mb(128)`. `sanitize` rejects configurations
    /// where this exceeds `purge_threshold`.
    pub target_file_size: ReadableSize,

    /// Defaults to `ReadableSize::gb(10)`. Must be at least `target_file_size`.
    pub purge_threshold: ReadableSize,
    /// Defaults to `None`, in which case `sanitize` fills it in with
    /// `max(purge_threshold / 10, target_file_size)`.
    pub purge_rewrite_threshold: Option<ReadableSize>,
    /// Defaults to 0.6.
    pub purge_rewrite_garbage_ratio: f64,

    /// Defaults to `None` (`Some(ReadableSize(0))` in this crate's test
    /// builds). When the `swap` feature is disabled, `sanitize` warns that a
    /// configured value is ignored.
    pub memory_limit: Option<ReadableSize>,

    /// Defaults to `true`. `sanitize` rejects enabling this together with a
    /// `format_version` that lacks log signing.
    pub enable_log_recycle: bool,

    /// Defaults to `false`. `sanitize` rejects enabling this while
    /// `enable_log_recycle` is off.
    pub prefill_for_recycle: bool,

    /// Defaults to `None`. `sanitize` clears it (with a warning) when
    /// `prefill_for_recycle` is off, and sets it to `purge_threshold` when
    /// prefill is on but no limit was given.
    pub prefill_limit: Option<ReadableSize>,
}
125
126impl Default for Config {
127 fn default() -> Config {
128 #[allow(unused_mut)]
129 let mut cfg = Config {
130 dir: "".to_owned(),
131 spill_dir: None,
132 recovery_mode: RecoveryMode::TolerateTailCorruption,
133 recovery_read_block_size: ReadableSize::kb(16),
134 recovery_threads: 4,
135 batch_compression_threshold: ReadableSize::kb(8),
136 compression_level: None,
137 bytes_per_sync: None,
138 format_version: Version::V2,
139 target_file_size: ReadableSize::mb(128),
140 purge_threshold: ReadableSize::gb(10),
141 purge_rewrite_threshold: None,
142 purge_rewrite_garbage_ratio: 0.6,
143 memory_limit: None,
144 enable_log_recycle: true,
145 prefill_for_recycle: false,
146 prefill_limit: None,
147 };
148 #[cfg(test)]
150 {
151 cfg.memory_limit = Some(ReadableSize(0));
152 }
153 cfg
154 }
155}
156
157impl Config {
158 pub fn sanitize(&mut self) -> Result<()> {
159 if self.purge_threshold.0 < self.target_file_size.0 {
160 return Err(box_err!("purge-threshold < target-file-size"));
161 }
162 if self.purge_rewrite_threshold.is_none() {
163 self.purge_rewrite_threshold = Some(ReadableSize(std::cmp::max(
164 self.purge_threshold.0 / 10,
165 self.target_file_size.0,
166 )));
167 }
168 if self.bytes_per_sync.is_some() {
169 warn!("bytes-per-sync has been deprecated.");
170 }
171 let min_recovery_read_block_size = ReadableSize(MIN_RECOVERY_READ_BLOCK_SIZE as u64);
172 if self.recovery_read_block_size < min_recovery_read_block_size {
173 warn!(
174 "recovery-read-block-size ({}) is too small, setting it to {min_recovery_read_block_size}",
175 self.recovery_read_block_size
176 );
177 self.recovery_read_block_size = min_recovery_read_block_size;
178 }
179 if self.recovery_threads < MIN_RECOVERY_THREADS {
180 warn!(
181 "recovery-threads ({}) is too small, setting it to {MIN_RECOVERY_THREADS}",
182 self.recovery_threads
183 );
184 self.recovery_threads = MIN_RECOVERY_THREADS;
185 }
186 if self.enable_log_recycle && !self.format_version.has_log_signing() {
187 return Err(box_err!(
188 "format version {} doesn't support log recycle, use 2 or above",
189 self.format_version
190 ));
191 }
192 if !self.enable_log_recycle && self.prefill_for_recycle {
193 return Err(box_err!(
194 "prefill is not allowed when log recycle is disabled"
195 ));
196 }
197 if !self.prefill_for_recycle && self.prefill_limit.is_some() {
198 warn!("prefill-limit will be ignored when prefill is disabled");
199 self.prefill_limit = None;
200 }
201 if self.prefill_for_recycle && self.prefill_limit.is_none() {
202 info!("prefill-limit will be calibrated to purge-threshold");
203 self.prefill_limit = Some(self.purge_threshold);
204 }
205 #[cfg(not(feature = "swap"))]
206 if self.memory_limit.is_some() {
207 warn!("memory-limit will be ignored because swap feature is disabled");
208 }
209 Ok(())
210 }
211
212 pub(crate) fn recycle_capacity(&self) -> usize {
214 if !self.format_version.has_log_signing() {
218 return 0;
219 }
220 if self.enable_log_recycle && self.purge_threshold.0 >= self.target_file_size.0 {
221 std::cmp::min(
225 (self.purge_threshold.0 / self.target_file_size.0) as usize * 3 / 2,
226 u32::MAX as usize,
227 )
228 } else {
229 0
230 }
231 }
232
233 pub(crate) fn prefill_capacity(&self) -> usize {
235 if !self.enable_log_recycle || !self.format_version.has_log_signing() {
238 return 0;
239 }
240 let prefill_limit = self.prefill_limit.unwrap_or(ReadableSize(0)).0;
241 if self.prefill_for_recycle && prefill_limit >= self.target_file_size.0 {
242 std::cmp::min(
244 (prefill_limit / self.target_file_size.0) as usize * 3 / 2,
245 u32::MAX as usize,
246 )
247 } else {
248 0
249 }
250 }
251}
252
#[cfg(test)]
mod tests {
    use super::*;

    /// A default config must survive a TOML round trip unchanged.
    #[test]
    fn test_serde() {
        let value = Config::default();
        let dump = toml::to_string_pretty(&value).unwrap();
        let load: Config = toml::from_str(&dump).unwrap();
        assert_eq!(value, load);
        assert!(load.spill_dir.is_none());
    }

    /// Explicitly configured keys are parsed verbatim and pass sanitization.
    #[test]
    fn test_custom() {
        let custom = r#"
            dir = "custom_dir"
            spill-dir = "custom_spill_dir"
            recovery-mode = "tolerate-tail-corruption"
            bytes-per-sync = "2KB"
            target-file-size = "1MB"
            purge-threshold = "3MB"
            format-version = 1
            enable-log-recycle = false
            prefill-for-recycle = false
        "#;
        let mut load: Config = toml::from_str(custom).unwrap();
        assert_eq!(load.dir, "custom_dir");
        assert_eq!(load.spill_dir, Some("custom_spill_dir".to_owned()));
        assert_eq!(load.recovery_mode, RecoveryMode::TolerateTailCorruption);
        assert_eq!(load.bytes_per_sync, Some(ReadableSize::kb(2)));
        assert_eq!(load.target_file_size, ReadableSize::mb(1));
        assert_eq!(load.purge_threshold, ReadableSize::mb(3));
        assert_eq!(load.format_version, Version::V1);
        assert!(!load.enable_log_recycle);
        assert!(!load.prefill_for_recycle);
        load.sanitize().unwrap();
    }

    /// Hard errors are rejected; soft errors are corrected in place.
    #[test]
    fn test_invalid() {
        // purge-threshold below target-file-size is a hard error.
        let hard_error = r#"
            target-file-size = "5MB"
            purge-threshold = "3MB"
        "#;
        let mut hard_load: Config = toml::from_str(hard_error).unwrap();
        assert!(hard_load.sanitize().is_err());

        // Undersized recovery settings are raised to their minimums, and the
        // rewrite threshold falls back to target-file-size when that is the
        // larger candidate.
        let soft_error = r#"
            recovery-read-block-size = 1
            recovery-threads = 0
            target-file-size = "5000MB"
            format-version = 2
            enable-log-recycle = true
            prefill-for-recycle = true
        "#;
        let soft_load: Config = toml::from_str(soft_error).unwrap();
        assert!(soft_load.recovery_read_block_size.0 < MIN_RECOVERY_READ_BLOCK_SIZE as u64);
        assert!(soft_load.recovery_threads < MIN_RECOVERY_THREADS);
        let mut soft_sanitized = soft_load;
        soft_sanitized.sanitize().unwrap();
        assert!(soft_sanitized.recovery_read_block_size.0 >= MIN_RECOVERY_READ_BLOCK_SIZE as u64);
        assert!(soft_sanitized.recovery_threads >= MIN_RECOVERY_THREADS);
        assert_eq!(
            soft_sanitized.purge_rewrite_threshold.unwrap(),
            soft_sanitized.target_file_size
        );

        // Log recycling needs a format version with log signing.
        let recycle_error = r#"
            enable-log-recycle = true
            format-version = 1
        "#;
        let mut cfg_load: Config = toml::from_str(recycle_error).unwrap();
        assert!(cfg_load.sanitize().is_err());

        // Prefill needs log recycling to be enabled.
        let prefill_error = r#"
            enable-log-recycle = false
            prefill-for-recycle = true
            format-version = 2
        "#;
        let mut cfg_load: Config = toml::from_str(prefill_error).unwrap();
        assert!(cfg_load.sanitize().is_err());
    }

    /// The legacy recovery-mode spelling is accepted and written back as-is.
    #[test]
    fn test_backward_compatibility() {
        let old = r#"
            recovery-mode = "tolerate-corrupted-tail-records"
        "#;
        let mut load: Config = toml::from_str(old).unwrap();
        load.sanitize().unwrap();
        assert!(toml::to_string(&load)
            .unwrap()
            .contains("tolerate-corrupted-tail-records"));
    }

    /// `prefill-limit` is derived or cleared depending on `prefill-for-recycle`.
    #[test]
    fn test_prefill_for_recycle() {
        // Prefill enabled without a limit: the limit follows purge-threshold.
        let prefill_enabled = r#"
            enable-log-recycle = true
            prefill-for-recycle = true
        "#;
        let mut cfg_load: Config = toml::from_str(prefill_enabled).unwrap();
        assert!(cfg_load.sanitize().is_ok());
        assert_eq!(cfg_load.prefill_limit.unwrap(), cfg_load.purge_threshold);

        // Prefill disabled with a limit: the limit is dropped.
        let prefill_disabled = r#"
            enable-log-recycle = true
            prefill-for-recycle = false
            prefill-limit = "20GB"
        "#;
        let mut cfg_load: Config = toml::from_str(prefill_disabled).unwrap();
        assert!(cfg_load.sanitize().is_ok());
        assert!(cfg_load.prefill_limit.is_none());
    }
}