raft_engine/
fork.rs

1// Copyright (c) 2023-present, PingCAP, Inc. Licensed under Apache-2.0.
2
3use std::fs::{copy, create_dir_all};
4use std::path::Path;
5use std::sync::Arc;
6
7#[cfg(not(windows))]
8use std::os::unix::fs::symlink;
9#[cfg(windows)]
10use std::os::windows::fs::symlink_file as symlink;
11
12use crate::config::{Config, RecoveryMode};
13use crate::env::FileSystem;
14use crate::file_pipe_log::{FileNameExt, FilePipeLog, FilePipeLogBuilder};
15use crate::pipe_log::{FileId, LogQueue};
16use crate::Engine;
17
/// Summary of the files created in the target directory by `Engine::fork`.
///
/// Each entry is the canonicalized path of one log file, stored as a UTF-8
/// string.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct CopyDetails {
    /// Paths of copied log files.
    pub copied: Vec<String>,
    /// Paths of symlinked log files.
    pub symlinked: Vec<String>,
}
26
27impl<F: FileSystem> Engine<F, FilePipeLog<F>> {
28    /// Make a copy from `source` to `target`. `source` should exists but
29    /// `target` shouldn't. And `source` shouldn't be opened, otherwise
30    /// data corruption can happen.
31    ///
32    /// *symlink* will be used if possbile, otherwise *copy* will be used
33    /// instead. Generally all inactive log files will be symlinked, but the
34    /// last active one will be copied.
35    ///
36    /// After the copy is made both of 2 engines can be started and run at the
37    /// same time.
38    ///
39    /// It reports errors if the source instance
40    ///   * is specified with `enable_log_recycle = true`. `source` and `target`
41    ///     can share log files, so log file reusing can cause data corruption.
42    ///   * is specified with `recovery_mode = TolerateAnyCorruption`, in which
43    ///     case *symlink* can't be use. Users should consider to copy the
44    ///     instance directly.
45    pub fn fork<T: AsRef<Path>>(
46        source: &Config,
47        fs: Arc<F>,
48        target: T,
49    ) -> Result<CopyDetails, String> {
50        minimum_copy(source, fs, target)
51    }
52}
53
54fn minimum_copy<F, P>(cfg: &Config, fs: Arc<F>, target: P) -> Result<CopyDetails, String>
55where
56    F: FileSystem,
57    P: AsRef<Path>,
58{
59    if cfg.enable_log_recycle {
60        return Err("enable_log_recycle should be false".to_owned());
61    }
62    if cfg.recovery_mode == RecoveryMode::TolerateAnyCorruption {
63        return Err("recovery_mode shouldn't be TolerateAnyCorruption".to_owned());
64    }
65
66    let mut cfg = cfg.clone();
67    cfg.sanitize()
68        .map_err(|e| format!("sanitize config: {e}"))?;
69
70    create_dir_all(&target)
71        .map_err(|e| format!("create_dir_all({}): {e}", target.as_ref().display()))?;
72
73    let mut builder = FilePipeLogBuilder::new(cfg.clone(), fs, vec![]);
74    builder
75        .scan_and_sort(false)
76        .map_err(|e| format!("scan files: {e}"))?;
77
78    // Iterate all log files and rewrite files.
79    let mut details = CopyDetails::default();
80    for (queue, files) in [
81        (LogQueue::Append, &builder.append_file_names),
82        (LogQueue::Rewrite, &builder.rewrite_file_names),
83    ] {
84        let count = files.len();
85        for (i, f) in files.iter().enumerate() {
86            let src: &Path = f.path.as_ref();
87            let dst = FileId::new(queue, f.seq).build_file_path(&target);
88            if i < count - 1 {
89                symlink(src, &dst)
90                    .map_err(|e| format!("symlink({}, {}): {e}", src.display(), dst.display()))?;
91                let path = dst.canonicalize().unwrap().to_str().unwrap().to_owned();
92                details.symlinked.push(path);
93            } else {
94                copy(src, &dst)
95                    .map(|_| ())
96                    .map_err(|e| format!("copy({}, {}): {e}", src.display(), dst.display()))?;
97                let path = dst.canonicalize().unwrap().to_str().unwrap().to_owned();
98                details.copied.push(path);
99            };
100        }
101    }
102
103    Ok(details)
104}
105
#[cfg(test)]
mod tests {
    use super::*;
    use crate::engine::tests::RaftLogEngine;
    use crate::env::DefaultFileSystem;
    use crate::{LogBatch, ReadableSize};
    use std::path::PathBuf;

    /// Forks a populated engine, checks that the fork sees every entry
    /// written before the fork, that both engines accept writes afterwards,
    /// and that unsupported configurations are rejected.
    #[test]
    fn test_fork() {
        let dir = tempfile::Builder::new()
            .prefix("test_engine_fork")
            .tempdir()
            .unwrap();
        let root = PathBuf::from(dir.as_ref());

        let source = root.join("source");
        let mut cfg = Config {
            dir: source.to_str().unwrap().to_owned(),
            target_file_size: ReadableSize::kb(1),
            enable_log_recycle: false,
            ..Default::default()
        };
        let engine = RaftLogEngine::open(cfg.clone()).unwrap();

        // Four entries; the append queue is rewritten after each of the
        // first two.
        for (tag, rewrite) in [(b'1', true), (b'2', true), (b'3', false), (b'4', false)] {
            let mut batch = LogBatch::default();
            batch.put(1, vec![tag; 16], vec![b'v'; 1024]).unwrap();
            engine.write(&mut batch, false).unwrap();
            if rewrite {
                engine.purge_manager().must_rewrite_append_queue(None, None);
            }
        }

        let target = root.join("target");
        Engine::<_, _>::fork(&cfg, Arc::new(DefaultFileSystem), &target).unwrap();
        cfg.dir = target.to_str().unwrap().to_owned();
        let engine1 = RaftLogEngine::open(cfg.clone()).unwrap();

        // Everything written before the fork is readable from the fork.
        for tag in [b'1', b'2', b'3', b'4'] {
            assert!(engine1.get(1, vec![tag; 16].as_ref()).is_some());
        }

        // Both engines keep accepting writes independently.
        let mut batch = LogBatch::default();
        batch.put(1, vec![b'5'; 16], vec![b'v'; 1024]).unwrap();
        engine.write(&mut batch, false).unwrap();

        let mut batch = LogBatch::default();
        batch.put(1, vec![b'6'; 16], vec![b'v'; 1024]).unwrap();
        engine1.write(&mut batch, false).unwrap();

        assert!(engine.get(1, vec![b'5'; 16].as_ref()).is_some());
        assert!(engine1.get(1, vec![b'6'; 16].as_ref()).is_some());

        // Unsupported configurations must be rejected.
        let target = root.join("target-1");
        let recycle_cfg = Config {
            enable_log_recycle: true,
            ..cfg.clone()
        };
        assert!(Engine::<_, _>::fork(&recycle_cfg, Arc::new(DefaultFileSystem), &target).is_err());
        let tolerant_cfg = Config {
            recovery_mode: RecoveryMode::TolerateAnyCorruption,
            ..cfg
        };
        assert!(Engine::<_, _>::fork(&tolerant_cfg, Arc::new(DefaultFileSystem), &target).is_err());
    }
}