use std::fs::{copy, create_dir_all};
use std::path::Path;
use std::sync::Arc;
#[cfg(not(windows))]
use std::os::unix::fs::symlink;
#[cfg(windows)]
use std::os::windows::fs::symlink_file as symlink;
use crate::config::{Config, RecoveryMode};
use crate::env::FileSystem;
use crate::file_pipe_log::{FileNameExt, FilePipeLog, FilePipeLogBuilder};
use crate::pipe_log::{FileId, LogQueue};
use crate::Engine;
/// Summary of what a fork operation placed into the target directory.
///
/// Every entry is the canonicalized form of the destination path, taken
/// right after the file was created.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct CopyDetails {
    /// Canonical paths of the files that were physically copied into the
    /// target directory.
    pub copied: Vec<String>,
    /// Canonical paths recorded for the files that were symlinked into the
    /// target directory. Canonicalization resolves symbolic links, so each
    /// entry names the file the link points at rather than the link itself.
    pub symlinked: Vec<String>,
}
impl<F: FileSystem> Engine<F, FilePipeLog<F>> {
/// Forks the log files described by `source` into the `target` directory.
///
/// This is a thin wrapper around `minimum_copy`: for each log queue, every
/// file except the last one is symlinked into `target` and the last one is
/// copied. The `target` directory is created if it does not exist.
///
/// # Errors
///
/// Returns a descriptive `String` when `source.enable_log_recycle` is set,
/// when `source.recovery_mode` is `RecoveryMode::TolerateAnyCorruption`, or
/// when sanitizing the config, scanning the source files, creating the
/// target directory, or linking/copying a file fails.
pub fn fork<T: AsRef<Path>>(
source: &Config,
fs: Arc<F>,
target: T,
) -> Result<CopyDetails, String> {
minimum_copy(source, fs, target)
}
}
/// Forks the log files described by `cfg` into `target`, copying as little
/// data as possible.
///
/// The source directory is scanned through `FilePipeLogBuilder`. For each
/// queue (append first, then rewrite) every file except the last one in the
/// scanned list is symlinked into `target`; the last one is copied
/// (presumably because it is the file that may still be written to — confirm
/// against `FilePipeLogBuilder`).
///
/// # Errors
///
/// Returns a descriptive message when:
/// - `cfg.enable_log_recycle` is `true`;
/// - `cfg.recovery_mode` is `RecoveryMode::TolerateAnyCorruption`;
/// - sanitizing the config, creating `target`, or scanning the files fails;
/// - a symlink or copy fails;
/// - a created path cannot be canonicalized or is not valid UTF-8.
///
/// Files already placed in `target` before an error are left in place.
fn minimum_copy<F, P>(cfg: &Config, fs: Arc<F>, target: P) -> Result<CopyDetails, String>
where
    F: FileSystem,
    P: AsRef<Path>,
{
    if cfg.enable_log_recycle {
        return Err("enable_log_recycle should be false".to_owned());
    }
    if cfg.recovery_mode == RecoveryMode::TolerateAnyCorruption {
        return Err("recovery_mode shouldn't be TolerateAnyCorruption".to_owned());
    }

    // Work on a private, sanitized copy so the caller's config is untouched.
    let mut cfg = cfg.clone();
    cfg.sanitize()
        .map_err(|e| format!("sanitize config: {e}"))?;

    create_dir_all(&target)
        .map_err(|e| format!("create_dir_all({}): {e}", target.as_ref().display()))?;

    // The sanitized config is not needed afterwards, so hand it over by value.
    let mut builder = FilePipeLogBuilder::new(cfg, fs, vec![]);
    builder
        .scan_and_sort(false)
        .map_err(|e| format!("scan files: {e}"))?;

    let mut details = CopyDetails::default();
    for (queue, files) in [
        (LogQueue::Append, &builder.append_file_names),
        (LogQueue::Rewrite, &builder.rewrite_file_names),
    ] {
        let count = files.len();
        for (i, f) in files.iter().enumerate() {
            let src: &Path = f.path.as_ref();
            let dst = FileId::new(queue, f.seq).build_file_path(&target);
            // `i + 1 < count` selects every file but the last without a
            // subtraction on `usize`.
            if i + 1 < count {
                // NOTE(review): `src` is used verbatim as the link target; if
                // the source dir is a relative path the link would presumably
                // be resolved relative to `target` — confirm callers pass
                // absolute directories.
                symlink(src, &dst)
                    .map_err(|e| format!("symlink({}, {}): {e}", src.display(), dst.display()))?;
                details.symlinked.push(canonical_path_string(&dst)?);
            } else {
                copy(src, &dst)
                    .map_err(|e| format!("copy({}, {}): {e}", src.display(), dst.display()))?;
                details.copied.push(canonical_path_string(&dst)?);
            }
        }
    }
    Ok(details)
}

/// Canonicalizes `path` and returns it as an owned UTF-8 string, reporting
/// I/O failures and non-UTF-8 paths as errors instead of panicking.
fn canonical_path_string(path: &Path) -> Result<String, String> {
    let canonical = path
        .canonicalize()
        .map_err(|e| format!("canonicalize({}): {e}", path.display()))?;
    canonical
        .to_str()
        .map(str::to_owned)
        .ok_or_else(|| format!("path is not valid UTF-8: {}", canonical.display()))
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::engine::tests::RaftLogEngine;
    use crate::env::DefaultFileSystem;
    use crate::{LogBatch, ReadableSize};
    use std::path::PathBuf;

    // Writes one entry through `$engine`: id 1, a 16-byte key filled with
    // `$tag`, and a 1024-byte value filled with `b'v'`.
    macro_rules! write_entry {
        ($engine:expr, $tag:expr) => {{
            let mut batch = LogBatch::default();
            batch.put(1, vec![$tag; 16], vec![b'v'; 1024]).unwrap();
            $engine.write(&mut batch, false).unwrap();
        }};
    }

    #[test]
    fn test_fork() {
        let dir = tempfile::Builder::new()
            .prefix("test_engine_fork")
            .tempdir()
            .unwrap();

        // Populate the source engine.
        let source_dir = PathBuf::from(dir.as_ref()).join("source");
        let mut cfg = Config {
            dir: source_dir.to_str().unwrap().to_owned(),
            target_file_size: ReadableSize::kb(1),
            enable_log_recycle: false,
            ..Default::default()
        };
        let origin = RaftLogEngine::open(cfg.clone()).unwrap();

        // Interleave writes with forced rewrites (presumably so that both
        // queues end up with files on disk — confirm against the purge manager).
        write_entry!(origin, b'1');
        origin.purge_manager().must_rewrite_append_queue(None, None);
        write_entry!(origin, b'2');
        origin.purge_manager().must_rewrite_append_queue(None, None);
        write_entry!(origin, b'3');
        write_entry!(origin, b'4');

        // Fork, then open the fork and check that all earlier keys are visible.
        let fork_dir = PathBuf::from(dir.as_ref()).join("target");
        Engine::<_, _>::fork(&cfg, Arc::new(DefaultFileSystem), &fork_dir).unwrap();
        cfg.dir = fork_dir.to_str().unwrap().to_owned();
        let forked = RaftLogEngine::open(cfg.clone()).unwrap();
        for tag in [b'1', b'2', b'3', b'4'] {
            assert!(forked.get(1, vec![tag; 16].as_ref()).is_some());
        }

        // Both engines must remain writable after the fork.
        write_entry!(origin, b'5');
        write_entry!(forked, b'6');
        assert!(origin.get(1, vec![b'5'; 16].as_ref()).is_some());
        assert!(forked.get(1, vec![b'6'; 16].as_ref()).is_some());

        // Unsupported configurations are rejected.
        let rejected_dir = PathBuf::from(dir.as_ref()).join("target-1");
        let mut recycling = cfg.clone();
        recycling.enable_log_recycle = true;
        assert!(
            Engine::<_, _>::fork(&recycling, Arc::new(DefaultFileSystem), &rejected_dir).is_err()
        );
        let mut tolerant = cfg;
        tolerant.recovery_mode = RecoveryMode::TolerateAnyCorruption;
        assert!(
            Engine::<_, _>::fork(&tolerant, Arc::new(DefaultFileSystem), &rejected_dir).is_err()
        );
    }
}