raft_engine/
config.rs

1// Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0.
2
3use log::{info, warn};
4use serde::{Deserialize, Serialize};
5
6use crate::pipe_log::Version;
7use crate::{util::ReadableSize, Result};
8
/// Lower bound, in bytes, for `Config::recovery_read_block_size`;
/// `Config::sanitize` raises smaller values to this.
const MIN_RECOVERY_READ_BLOCK_SIZE: usize = 512;
/// Lower bound for `Config::recovery_threads`; `Config::sanitize` raises smaller
/// values to this.
const MIN_RECOVERY_THREADS: usize = 1;
11
12#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
13#[serde(rename_all = "kebab-case")]
14pub enum RecoveryMode {
15    AbsoluteConsistency,
16    // For backward compatibility.
17    #[serde(
18        alias = "tolerate-corrupted-tail-records",
19        rename(serialize = "tolerate-corrupted-tail-records")
20    )]
21    TolerateTailCorruption,
22    TolerateAnyCorruption,
23}
24
25#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
26#[serde(default)]
27#[serde(rename_all = "kebab-case")]
28pub struct Config {
29    /// Main directory to store log files. Will create on startup if not exists.
30    ///
31    /// Default: ""
32    pub dir: String,
33
34    /// Auxiliary directory to store log files. Will create on startup if
35    /// set but not exists.
36    ///
37    /// Newly logs will be put into this dir when the main `dir` is full
38    /// and no spare space for new logs.
39    ///
40    /// Default: None
41    pub spill_dir: Option<String>,
42
43    /// How to deal with file corruption during recovery.
44    ///
45    /// Default: "tolerate-tail-corruption".
46    pub recovery_mode: RecoveryMode,
47    /// Minimum I/O size for reading log files during recovery.
48    ///
49    /// Default: "16KB". Minimum: "512B".
50    pub recovery_read_block_size: ReadableSize,
51    /// The number of threads used to scan and recovery log files.
52    ///
53    /// Default: 4. Minimum: 1.
54    pub recovery_threads: usize,
55
56    /// Compress a log batch if its size exceeds this value. Setting it to zero
57    /// disables compression.
58    ///
59    /// Default: "8KB"
60    pub batch_compression_threshold: ReadableSize,
61    /// Acceleration factor for LZ4 compression. It can be fine tuned, with each
62    /// successive value providing roughly +~3% to speed. The value will be
63    /// capped within [1, 65537] by LZ4.
64    ///
65    /// Default: 1.
66    pub compression_level: Option<usize>,
67    /// Deprecated.
68    /// Incrementally sync log files after specified bytes have been written.
69    /// Setting it to zero disables incremental sync.
70    ///
71    /// Default: "4MB"
72    pub bytes_per_sync: Option<ReadableSize>,
73
74    /// Version of the log file.
75    ///
76    /// Default: 2
77    pub format_version: Version,
78
79    /// Target file size for rotating log files.
80    ///
81    /// Default: "128MB"
82    pub target_file_size: ReadableSize,
83
84    /// Purge append log queue if its size exceeds this value.
85    ///
86    /// Default: "10GB"
87    pub purge_threshold: ReadableSize,
88    /// Purge rewrite log queue if its size exceeds this value.
89    ///
90    /// Default: MAX(`purge_threshold` / 10, `target_file_size`)
91    pub purge_rewrite_threshold: Option<ReadableSize>,
92    /// Purge rewrite log queue if its garbage ratio exceeds this value.
93    ///
94    /// Default: "0.6"
95    pub purge_rewrite_garbage_ratio: f64,
96
97    /// Maximum memory bytes allowed for the in-memory index.
98    /// Effective under the `swap` feature only.
99    ///
100    /// Default: None
101    pub memory_limit: Option<ReadableSize>,
102
103    /// Whether to recycle stale log files.
104    /// If `true`, logically purged log files will be reserved for recycling.
105    /// Only available for `format_version` 2 and above.
106    ///
107    /// Default: true
108    pub enable_log_recycle: bool,
109
110    /// Whether to prepare log files for recycling when start.
111    /// If `true`, batch empty log files will be prepared for recycling when
112    /// starting engine.
113    /// Only available for `enable-log-reycle` is true.
114    ///
115    /// Default: false
116    pub prefill_for_recycle: bool,
117
118    /// Maximum capacity for preparing log files for recycling when start.
119    /// If `None`, its size is equal to `purge-threshold`*1.5.
120    /// Only available for `prefill-for-recycle` is true.
121    ///
122    /// Default: None
123    pub prefill_limit: Option<ReadableSize>,
124}
125
126impl Default for Config {
127    fn default() -> Config {
128        #[allow(unused_mut)]
129        let mut cfg = Config {
130            dir: "".to_owned(),
131            spill_dir: None,
132            recovery_mode: RecoveryMode::TolerateTailCorruption,
133            recovery_read_block_size: ReadableSize::kb(16),
134            recovery_threads: 4,
135            batch_compression_threshold: ReadableSize::kb(8),
136            compression_level: None,
137            bytes_per_sync: None,
138            format_version: Version::V2,
139            target_file_size: ReadableSize::mb(128),
140            purge_threshold: ReadableSize::gb(10),
141            purge_rewrite_threshold: None,
142            purge_rewrite_garbage_ratio: 0.6,
143            memory_limit: None,
144            enable_log_recycle: true,
145            prefill_for_recycle: false,
146            prefill_limit: None,
147        };
148        // Test-specific configurations.
149        #[cfg(test)]
150        {
151            cfg.memory_limit = Some(ReadableSize(0));
152        }
153        cfg
154    }
155}
156
157impl Config {
158    pub fn sanitize(&mut self) -> Result<()> {
159        if self.purge_threshold.0 < self.target_file_size.0 {
160            return Err(box_err!("purge-threshold < target-file-size"));
161        }
162        if self.purge_rewrite_threshold.is_none() {
163            self.purge_rewrite_threshold = Some(ReadableSize(std::cmp::max(
164                self.purge_threshold.0 / 10,
165                self.target_file_size.0,
166            )));
167        }
168        if self.bytes_per_sync.is_some() {
169            warn!("bytes-per-sync has been deprecated.");
170        }
171        let min_recovery_read_block_size = ReadableSize(MIN_RECOVERY_READ_BLOCK_SIZE as u64);
172        if self.recovery_read_block_size < min_recovery_read_block_size {
173            warn!(
174                "recovery-read-block-size ({}) is too small, setting it to {min_recovery_read_block_size}",
175                self.recovery_read_block_size
176            );
177            self.recovery_read_block_size = min_recovery_read_block_size;
178        }
179        if self.recovery_threads < MIN_RECOVERY_THREADS {
180            warn!(
181                "recovery-threads ({}) is too small, setting it to {MIN_RECOVERY_THREADS}",
182                self.recovery_threads
183            );
184            self.recovery_threads = MIN_RECOVERY_THREADS;
185        }
186        if self.enable_log_recycle && !self.format_version.has_log_signing() {
187            return Err(box_err!(
188                "format version {} doesn't support log recycle, use 2 or above",
189                self.format_version
190            ));
191        }
192        if !self.enable_log_recycle && self.prefill_for_recycle {
193            return Err(box_err!(
194                "prefill is not allowed when log recycle is disabled"
195            ));
196        }
197        if !self.prefill_for_recycle && self.prefill_limit.is_some() {
198            warn!("prefill-limit will be ignored when prefill is disabled");
199            self.prefill_limit = None;
200        }
201        if self.prefill_for_recycle && self.prefill_limit.is_none() {
202            info!("prefill-limit will be calibrated to purge-threshold");
203            self.prefill_limit = Some(self.purge_threshold);
204        }
205        #[cfg(not(feature = "swap"))]
206        if self.memory_limit.is_some() {
207            warn!("memory-limit will be ignored because swap feature is disabled");
208        }
209        Ok(())
210    }
211
212    /// Returns the capacity for recycling log files.
213    pub(crate) fn recycle_capacity(&self) -> usize {
214        // Attention please, log files with Version::V1 could not be recycled, it might
215        // cause LogBatchs in a mess in the recycled file, where the reader might get
216        // an obsolete entries (unexpected) from the recycled file.
217        if !self.format_version.has_log_signing() {
218            return 0;
219        }
220        if self.enable_log_recycle && self.purge_threshold.0 >= self.target_file_size.0 {
221            // (1) At most u32::MAX so that the file number can be capped into an u32
222            // without colliding. (2) Increase the threshold by 50% to add some more file
223            // as an additional buffer to avoid jitters.
224            std::cmp::min(
225                (self.purge_threshold.0 / self.target_file_size.0) as usize * 3 / 2,
226                u32::MAX as usize,
227            )
228        } else {
229            0
230        }
231    }
232
233    /// Returns the capacity for preparing log files for recycling when start.
234    pub(crate) fn prefill_capacity(&self) -> usize {
235        // Attention please, log files with Version::V1 could not be recycled, so it's
236        // useless for prefill.
237        if !self.enable_log_recycle || !self.format_version.has_log_signing() {
238            return 0;
239        }
240        let prefill_limit = self.prefill_limit.unwrap_or(ReadableSize(0)).0;
241        if self.prefill_for_recycle && prefill_limit >= self.target_file_size.0 {
242            // Keep same with the maximum setting of `recycle_capacity`.
243            std::cmp::min(
244                (prefill_limit / self.target_file_size.0) as usize * 3 / 2,
245                u32::MAX as usize,
246            )
247        } else {
248            0
249        }
250    }
251}
252
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration survives a TOML round trip unchanged.
    #[test]
    fn test_serde() {
        let value = Config::default();
        let dump = toml::to_string_pretty(&value).unwrap();
        let load: Config = toml::from_str(&dump).unwrap();
        assert_eq!(value, load);
        assert!(load.spill_dir.is_none());
    }

    /// Explicitly configured keys are parsed and pass sanitization.
    #[test]
    fn test_custom() {
        let custom = r#"
            dir = "custom_dir"
            spill-dir = "custom_spill_dir"
            recovery-mode = "tolerate-tail-corruption"
            bytes-per-sync = "2KB"
            target-file-size = "1MB"
            purge-threshold = "3MB"
            format-version = 1
            enable-log-recycle = false
            prefill-for-recycle = false
        "#;
        let mut load: Config = toml::from_str(custom).unwrap();
        assert_eq!(load.dir, "custom_dir");
        assert_eq!(load.spill_dir, Some("custom_spill_dir".to_owned()));
        assert_eq!(load.recovery_mode, RecoveryMode::TolerateTailCorruption);
        assert_eq!(load.bytes_per_sync, Some(ReadableSize::kb(2)));
        assert_eq!(load.target_file_size, ReadableSize::mb(1));
        assert_eq!(load.purge_threshold, ReadableSize::mb(3));
        assert_eq!(load.format_version, Version::V1);
        assert!(!load.enable_log_recycle);
        assert!(!load.prefill_for_recycle);
        load.sanitize().unwrap();
    }

    /// Hard errors are rejected by `sanitize`; soft errors are corrected in place.
    #[test]
    fn test_invalid() {
        // The purge threshold must not be smaller than the target file size.
        let hard_error = r#"
            target-file-size = "5MB"
            purge-threshold = "3MB"
        "#;
        let mut hard_load: Config = toml::from_str(hard_error).unwrap();
        assert!(hard_load.sanitize().is_err());

        // Values below their minimums are raised instead of being rejected.
        let soft_error = r#"
            recovery-read-block-size = 1
            recovery-threads = 0
            target-file-size = "5000MB"
            format-version = 2
            enable-log-recycle = true
            prefill-for-recycle = true
        "#;
        let soft_load: Config = toml::from_str(soft_error).unwrap();
        assert!(soft_load.recovery_read_block_size.0 < MIN_RECOVERY_READ_BLOCK_SIZE as u64);
        assert!(soft_load.recovery_threads < MIN_RECOVERY_THREADS);
        let mut soft_sanitized = soft_load;
        soft_sanitized.sanitize().unwrap();
        assert!(soft_sanitized.recovery_read_block_size.0 >= MIN_RECOVERY_READ_BLOCK_SIZE as u64);
        assert!(soft_sanitized.recovery_threads >= MIN_RECOVERY_THREADS);
        assert_eq!(
            soft_sanitized.purge_rewrite_threshold.unwrap(),
            soft_sanitized.target_file_size
        );

        // Log recycling is not supported by format version 1.
        let recycle_error = r#"
            enable-log-recycle = true
            format-version = 1
        "#;
        let mut cfg_load: Config = toml::from_str(recycle_error).unwrap();
        assert!(cfg_load.sanitize().is_err());

        // Prefill requires log recycling to be enabled.
        let prefill_error = r#"
            enable-log-recycle = false
            prefill-for-recycle = true
            format-version = 2
        "#;
        let mut cfg_load: Config = toml::from_str(prefill_error).unwrap();
        assert!(cfg_load.sanitize().is_err());
    }

    /// The legacy recovery-mode name is accepted on load and emitted on dump.
    #[test]
    fn test_backward_compatibility() {
        // Upgrade from older version.
        let old = r#"
            recovery-mode = "tolerate-corrupted-tail-records"
        "#;
        let mut load: Config = toml::from_str(old).unwrap();
        load.sanitize().unwrap();
        // Downgrade to older version.
        assert!(toml::to_string(&load)
            .unwrap()
            .contains("tolerate-corrupted-tail-records"));
    }

    /// `sanitize` calibrates or clears `prefill_limit` depending on whether
    /// prefill is enabled.
    #[test]
    fn test_prefill_for_recycle() {
        let prefill_enabled = r#"
            enable-log-recycle = true
            prefill-for-recycle = true
        "#;
        let mut cfg_load: Config = toml::from_str(prefill_enabled).unwrap();
        assert!(cfg_load.sanitize().is_ok());
        assert_eq!(cfg_load.prefill_limit.unwrap(), cfg_load.purge_threshold);

        let prefill_disabled = r#"
            enable-log-recycle = true
            prefill-for-recycle = false
            prefill-limit = "20GB"
        "#;
        let mut cfg_load: Config = toml::from_str(prefill_disabled).unwrap();
        assert!(cfg_load.sanitize().is_ok());
        assert!(cfg_load.prefill_limit.is_none());
    }

    /// Recycle and prefill capacities are the file count plus a 50% buffer, and
    /// drop to zero when the corresponding feature is off.
    #[test]
    fn test_capacity() {
        let mut cfg = Config {
            target_file_size: ReadableSize(10),
            purge_threshold: ReadableSize(100),
            ..Default::default()
        };
        // 100 / 10 = 10 files, plus the 50% buffer.
        assert_eq!(cfg.recycle_capacity(), 15);
        // Prefill is disabled by default.
        assert_eq!(cfg.prefill_capacity(), 0);

        // Once enabled, the prefill limit is calibrated to the purge threshold.
        cfg.prefill_for_recycle = true;
        cfg.sanitize().unwrap();
        assert_eq!(cfg.prefill_limit, Some(cfg.purge_threshold));
        assert_eq!(cfg.prefill_capacity(), 15);

        // A limit that cannot hold a single file yields no capacity.
        cfg.prefill_limit = Some(ReadableSize(5));
        assert_eq!(cfg.prefill_capacity(), 0);

        // Disabling log recycling zeroes both capacities.
        cfg.enable_log_recycle = false;
        assert_eq!(cfg.recycle_capacity(), 0);
        assert_eq!(cfg.prefill_capacity(), 0);
    }
}