1use std::fs::{copy, create_dir_all};
4use std::path::Path;
5use std::sync::Arc;
6
7#[cfg(not(windows))]
8use std::os::unix::fs::symlink;
9#[cfg(windows)]
10use std::os::windows::fs::symlink_file as symlink;
11
12use crate::config::{Config, RecoveryMode};
13use crate::env::FileSystem;
14use crate::file_pipe_log::{FileNameExt, FilePipeLog, FilePipeLogBuilder};
15use crate::pipe_log::{FileId, LogQueue};
16use crate::Engine;
17
18#[derive(Default)]
20pub struct CopyDetails {
21 pub copied: Vec<String>,
23 pub symlinked: Vec<String>,
25}
26
27impl<F: FileSystem> Engine<F, FilePipeLog<F>> {
28 pub fn fork<T: AsRef<Path>>(
46 source: &Config,
47 fs: Arc<F>,
48 target: T,
49 ) -> Result<CopyDetails, String> {
50 minimum_copy(source, fs, target)
51 }
52}
53
54fn minimum_copy<F, P>(cfg: &Config, fs: Arc<F>, target: P) -> Result<CopyDetails, String>
55where
56 F: FileSystem,
57 P: AsRef<Path>,
58{
59 if cfg.enable_log_recycle {
60 return Err("enable_log_recycle should be false".to_owned());
61 }
62 if cfg.recovery_mode == RecoveryMode::TolerateAnyCorruption {
63 return Err("recovery_mode shouldn't be TolerateAnyCorruption".to_owned());
64 }
65
66 let mut cfg = cfg.clone();
67 cfg.sanitize()
68 .map_err(|e| format!("sanitize config: {e}"))?;
69
70 create_dir_all(&target)
71 .map_err(|e| format!("create_dir_all({}): {e}", target.as_ref().display()))?;
72
73 let mut builder = FilePipeLogBuilder::new(cfg.clone(), fs, vec![]);
74 builder
75 .scan_and_sort(false)
76 .map_err(|e| format!("scan files: {e}"))?;
77
78 let mut details = CopyDetails::default();
80 for (queue, files) in [
81 (LogQueue::Append, &builder.append_file_names),
82 (LogQueue::Rewrite, &builder.rewrite_file_names),
83 ] {
84 let count = files.len();
85 for (i, f) in files.iter().enumerate() {
86 let src: &Path = f.path.as_ref();
87 let dst = FileId::new(queue, f.seq).build_file_path(&target);
88 if i < count - 1 {
89 symlink(src, &dst)
90 .map_err(|e| format!("symlink({}, {}): {e}", src.display(), dst.display()))?;
91 let path = dst.canonicalize().unwrap().to_str().unwrap().to_owned();
92 details.symlinked.push(path);
93 } else {
94 copy(src, &dst)
95 .map(|_| ())
96 .map_err(|e| format!("copy({}, {}): {e}", src.display(), dst.display()))?;
97 let path = dst.canonicalize().unwrap().to_str().unwrap().to_owned();
98 details.copied.push(path);
99 };
100 }
101 }
102
103 Ok(details)
104}
105
106#[cfg(test)]
107mod tests {
108 use super::*;
109 use crate::engine::tests::RaftLogEngine;
110 use crate::env::DefaultFileSystem;
111 use crate::{LogBatch, ReadableSize};
112 use std::path::PathBuf;
113
114 #[test]
115 fn test_fork() {
116 let dir = tempfile::Builder::new()
117 .prefix("test_engine_fork")
118 .tempdir()
119 .unwrap();
120
121 let mut source = PathBuf::from(dir.as_ref());
122 source.push("source");
123 let mut cfg = Config {
124 dir: source.to_str().unwrap().to_owned(),
125 target_file_size: ReadableSize::kb(1),
126 enable_log_recycle: false,
127 ..Default::default()
128 };
129 let engine = RaftLogEngine::open(cfg.clone()).unwrap();
130
131 let mut log_batch = LogBatch::default();
132 log_batch.put(1, vec![b'1'; 16], vec![b'v'; 1024]).unwrap();
133 engine.write(&mut log_batch, false).unwrap();
134 engine.purge_manager().must_rewrite_append_queue(None, None);
135
136 let mut log_batch = LogBatch::default();
137 log_batch.put(1, vec![b'2'; 16], vec![b'v'; 1024]).unwrap();
138 engine.write(&mut log_batch, false).unwrap();
139 engine.purge_manager().must_rewrite_append_queue(None, None);
140
141 let mut log_batch = LogBatch::default();
142 log_batch.put(1, vec![b'3'; 16], vec![b'v'; 1024]).unwrap();
143 engine.write(&mut log_batch, false).unwrap();
144
145 let mut log_batch = LogBatch::default();
146 log_batch.put(1, vec![b'4'; 16], vec![b'v'; 1024]).unwrap();
147 engine.write(&mut log_batch, false).unwrap();
148
149 let mut target = PathBuf::from(dir.as_ref());
150 target.push("target");
151 Engine::<_, _>::fork(&cfg, Arc::new(DefaultFileSystem), &target).unwrap();
152 cfg.dir = target.to_str().unwrap().to_owned();
153 let engine1 = RaftLogEngine::open(cfg.clone()).unwrap();
154
155 assert!(engine1.get(1, vec![b'1'; 16].as_ref()).is_some());
156 assert!(engine1.get(1, vec![b'2'; 16].as_ref()).is_some());
157 assert!(engine1.get(1, vec![b'3'; 16].as_ref()).is_some());
158 assert!(engine1.get(1, vec![b'4'; 16].as_ref()).is_some());
159
160 let mut log_batch = LogBatch::default();
161 log_batch.put(1, vec![b'5'; 16], vec![b'v'; 1024]).unwrap();
162 engine.write(&mut log_batch, false).unwrap();
163
164 let mut log_batch = LogBatch::default();
165 log_batch.put(1, vec![b'6'; 16], vec![b'v'; 1024]).unwrap();
166 engine1.write(&mut log_batch, false).unwrap();
167
168 assert!(engine.get(1, vec![b'5'; 16].as_ref()).is_some());
169 assert!(engine1.get(1, vec![b'6'; 16].as_ref()).is_some());
170
171 let mut target = PathBuf::from(dir.as_ref());
172 target.push("target-1");
173 let mut cfg1 = cfg.clone();
174 cfg1.enable_log_recycle = true;
175 assert!(Engine::<_, _>::fork(&cfg1, Arc::new(DefaultFileSystem), &target).is_err());
176 let mut cfg1 = cfg;
177 cfg1.recovery_mode = RecoveryMode::TolerateAnyCorruption;
178 assert!(Engine::<_, _>::fork(&cfg1, Arc::new(DefaultFileSystem), &target).is_err());
179 }
180}