use crate::{
AbstractTree, CheckpointInfo,
file::{BLOBS_FOLDER, CURRENT_VERSION_FILE, TABLES_FOLDER, fsync_directory},
fs::{Fs, FsFile, FsOpenOptions, SyncMode},
version::Version,
vlog::BlobFile,
};
#[cfg(not(feature = "std"))]
use alloc::{boxed::Box, string::String, vec::Vec};
use alloc::{sync::Arc, vec};
use std::{
io::{Read, Write},
path::Path,
};
/// File name given to a table inside the checkpoint's tables folder: the
/// table id's `to_string()` form.
fn table_link_name(id: crate::TableId) -> String {
    ToString::to_string(&id)
}
/// File name given to a blob file inside the checkpoint's blobs folder: the
/// blob file id's `to_string()` form.
fn blob_link_name(id: crate::vlog::BlobFileId) -> String {
    ToString::to_string(&id)
}
/// Creates an empty checkpoint directory layout at `target`.
///
/// The root directory itself is created first. If it already exists, the
/// `AlreadyExists` error is replaced by one with a descriptive message, and
/// nothing on disk is touched. After the root exists, the tables folder (and,
/// when `include_blobs` is set, the blobs folder) is created inside it.
///
/// If creating a sub-folder fails, the freshly created root is removed again
/// (best effort, see `RootCleanup`) so no half-built target is left behind.
///
/// # Errors
///
/// Returns `AlreadyExists` when `target` already exists, or any error reported
/// by `target_fs` while creating the directories.
pub fn prepare_target(target: &Path, include_blobs: bool, target_fs: &dyn Fs) -> crate::Result<()> {
    // Create the root directly instead of probing for it first; an existing
    // target surfaces as `AlreadyExists` from `create_dir`.
    if let Err(err) = target_fs.create_dir(target) {
        let err = if err.kind() == crate::io::ErrorKind::AlreadyExists {
            crate::io::Error::new(
                crate::io::ErrorKind::AlreadyExists,
                format!(
                    "checkpoint target {} already exists; refusing to overwrite",
                    target.display(),
                ),
            )
        } else {
            err
        };
        return Err(err.into());
    }

    // From here on we own `target`: undo its creation if a sub-folder fails.
    let mut root_guard = RootCleanup {
        target,
        fs: target_fs,
        armed: true,
    };

    let tables_dir = target.join(TABLES_FOLDER);
    target_fs.create_dir(&tables_dir)?;

    if include_blobs {
        let blobs_dir = target.join(BLOBS_FOLDER);
        target_fs.create_dir(&blobs_dir)?;
    }

    // Layout complete: keep the directory.
    root_guard.armed = false;
    Ok(())
}
/// Drop guard used by `prepare_target`: while `armed`, dropping it removes
/// `target` recursively from `fs`.
///
/// `prepare_target` arms it right after creating the root directory and
/// disarms it once all sub-folders exist, so only a failed preparation
/// deletes the root.
struct RootCleanup<'a> {
    /// Directory removed on drop while the guard is armed.
    target: &'a Path,
    /// Filesystem the directory lives on.
    fs: &'a dyn Fs,
    /// When `false`, dropping the guard does nothing.
    armed: bool,
}

impl Drop for RootCleanup<'_> {
    fn drop(&mut self) {
        if !self.armed {
            return;
        }
        // Cleanup is best effort: a failure is logged, never propagated out
        // of `drop`.
        if let Err(e) = self.fs.remove_dir_all(self.target) {
            log::warn!(
                "Failed to clean up partial checkpoint target {}: {e:?}",
                self.target.display(),
            );
        }
    }
}
/// Materializes `src` (on `src_fs`) at `dst` (on `dst_fs`), as cheaply as the
/// two filesystems allow.
///
/// Strategies, tried in this order:
///
/// 1. **Reflink**: when `use_reflink` is set, both filesystems report the same
///    `backend_id`, and `dst_fs` advertises reflink support for `dst`'s
///    parent. Any reflink error is returned directly (no fallback).
/// 2. **Hard link**: when both filesystems report the same `backend_id`.
///    Cross-device, `Unsupported` and `NotFound` failures fall through to a
///    streamed copy. Any other error is returned.
/// 3. **Streamed copy**: `src` is read in 64 KiB chunks into a newly created
///    `dst` (`create_new`, so an existing `dst` is an error). The copy is then
///    flushed and synced according to `sync_mode`. On failure the partial
///    `dst` is removed (best effort) before the error is returned.
///
/// Filesystems that do not report a backend id are never treated as sharing
/// a namespace, so they always take the streamed-copy path.
///
/// Returns the byte size of `dst`: the metadata length after a reflink or
/// hard link, or the number of bytes copied for the streamed path.
pub fn link_or_copy_cross_fs(
    src_fs: &Arc<dyn Fs>,
    src: &Path,
    dst_fs: &Arc<dyn Fs>,
    dst: &Path,
    sync_mode: SyncMode,
    use_reflink: bool,
) -> std::io::Result<u64> {
    let same_backend = src_fs
        .backend_id()
        .zip(dst_fs.backend_id())
        .is_some_and(|(lhs, rhs)| lhs == rhs);

    let can_reflink = use_reflink
        && same_backend
        && dst
            .parent()
            .is_some_and(|parent| dst_fs.capabilities(parent).reflink);
    if can_reflink {
        dst_fs.reflink_file(src, dst)?;
        return Ok(dst_fs.metadata(dst)?.len);
    }

    if same_backend {
        let link_err = match dst_fs.hard_link(src, dst) {
            Ok(()) => return Ok(dst_fs.metadata(dst)?.len),
            Err(e) => e,
        };
        // Only failures that a plain copy can work around are recoverable.
        let recoverable = crate::fs::is_cross_device(&link_err)
            || link_err.kind() == crate::io::ErrorKind::Unsupported
            || link_err.kind() == crate::io::ErrorKind::NotFound;
        if !recoverable {
            return Err(link_err.into());
        }
        log::debug!(
            "link_or_copy_cross_fs({}, {}) falling back to streamed copy ({})",
            src.display(),
            dst.display(),
            link_err.kind(),
        );
    } else {
        log::debug!(
            "link_or_copy_cross_fs({}, {}) crossing namespaces — streaming copy",
            src.display(),
            dst.display(),
        );
    }

    let mut reader = src_fs.open(src, &FsOpenOptions::new().read(true))?;
    let mut writer = dst_fs.open(dst, &FsOpenOptions::new().write(true).create_new(true))?;

    let outcome: std::io::Result<u64> = (|| {
        let mut chunk = vec![0u8; 64 * 1024].into_boxed_slice();
        let mut copied: u64 = 0;
        loop {
            let filled = match reader.read(&mut chunk) {
                Ok(0) => break,
                Ok(filled) => filled,
                // A signal interrupted the read before any data arrived: retry.
                Err(e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
                Err(e) => return Err(e),
            };
            #[expect(
                clippy::indexing_slicing,
                reason = "filled was just produced by read() and is bounded by chunk.len()"
            )]
            writer.write_all(&chunk[..filled])?;
            copied += filled as u64;
        }
        writer.flush()?;
        FsFile::sync_all_with(&*writer, sync_mode)?;
        Ok(copied)
    })();

    outcome.map_err(|e| {
        // Close the destination handle first, then drop the partial copy.
        // Removal is best effort; the original error is what gets reported.
        drop(writer);
        let _ = dst_fs.remove_file(dst);
        e
    })
}
/// Reflinks, hard-links or copies every table of `version` into
/// `<target_root>/<TABLES_FOLDER>/`, naming each file by its table id.
///
/// Each table is handled by `link_or_copy_cross_fs`, reading from the table's
/// own filesystem and path and writing to `target_fs`.
///
/// Returns `(files, bytes)`: the number of tables materialized and the sum of
/// the sizes reported for them.
///
/// # Errors
///
/// Stops at and returns the first I/O error.
pub fn link_tables(
    version: &Version,
    target_root: &Path,
    target_fs: &Arc<dyn Fs>,
    sync_mode: SyncMode,
    use_reflink: bool,
) -> crate::Result<(usize, u64)> {
    let tables_dir = target_root.join(TABLES_FOLDER);
    let mut totals: (usize, u64) = (0, 0);

    for table in version.iter_tables() {
        let link_path = tables_dir.join(table_link_name(table.id()));
        let size = link_or_copy_cross_fs(
            &table.fs,
            &table.path,
            target_fs,
            &link_path,
            sync_mode,
            use_reflink,
        )?;
        totals.1 += size;
        totals.0 += 1;
    }

    Ok(totals)
}
/// Reflinks, hard-links or copies each of `blob_files` into
/// `<target_root>/<BLOBS_FOLDER>/`, naming each file by its blob file id.
///
/// Each blob file is handled by `link_or_copy_cross_fs`, reading from the blob
/// file's own filesystem and path and writing to `target_fs`.
///
/// Returns `(files, bytes)`: the number of blob files materialized and the
/// sum of the sizes reported for them.
///
/// # Errors
///
/// Stops at and returns the first I/O error.
pub fn link_blob_files(
    blob_files: impl IntoIterator<Item = BlobFile>,
    target_root: &Path,
    target_fs: &Arc<dyn Fs>,
    sync_mode: SyncMode,
    use_reflink: bool,
) -> crate::Result<(usize, u64)> {
    let blobs_dir = target_root.join(BLOBS_FOLDER);

    blob_files.into_iter().try_fold(
        (0usize, 0u64),
        |(count, bytes), blob| -> crate::Result<(usize, u64)> {
            let dst = blobs_dir.join(blob_link_name(blob.id()));
            let written = link_or_copy_cross_fs(
                &blob.0.fs,
                &blob.0.path,
                target_fs,
                &dst,
                sync_mode,
                use_reflink,
            )?;
            Ok((count + 1, bytes + written))
        },
    )
}
/// Copies `src_root/file_name` on `src_fs` to `target_root/file_name` on
/// `target_fs`, if the source file exists.
///
/// A missing source (`NotFound` on open) is not an error: the function returns
/// `Ok(())` without touching the target. The destination is opened with
/// `create_new`, so an already existing destination file is an error. After
/// copying, the destination is flushed and synced according to `sync_mode`.
///
/// If copying, flushing or syncing fails after the destination was created,
/// the partially written destination is removed (best effort) before the
/// error is returned. This is the same cleanup `link_or_copy_cross_fs`
/// performs for streamed copies.
///
/// # Errors
///
/// Any error opening the source other than `NotFound`, and any error creating,
/// writing, flushing or syncing the destination.
fn copy_metadata_file_optional(
    src_fs: &dyn Fs,
    src_root: &Path,
    target_fs: &dyn Fs,
    target_root: &Path,
    file_name: &str,
    sync_mode: SyncMode,
) -> crate::Result<()> {
    let src = src_root.join(file_name);
    let mut src_file = match src_fs.open(&src, &FsOpenOptions::new().read(true)) {
        Ok(f) => f,
        // Optional file: its absence in the source is not an error.
        Err(e) if e.kind() == crate::io::ErrorKind::NotFound => return Ok(()),
        Err(e) => return Err(e.into()),
    };

    let dst = target_root.join(file_name);
    let mut dst_file = target_fs.open(&dst, &FsOpenOptions::new().write(true).create_new(true))?;

    let copied: std::io::Result<()> = (|| {
        std::io::copy(&mut src_file, &mut dst_file)?;
        dst_file.flush()?;
        FsFile::sync_all_with(&*dst_file, sync_mode)?;
        Ok(())
    })();

    if let Err(e) = copied {
        // Close the destination handle, then remove the partial file so a
        // failed copy does not leave truncated metadata behind. Removal is
        // best effort; the original error is what gets reported.
        drop(dst_file);
        let _ = target_fs.remove_file(&dst);
        return Err(e.into());
    }
    Ok(())
}
/// Writes the current-version pointer file (`CURRENT_VERSION_FILE`) in
/// `target_root` so that it points at version `version_id`.
///
/// The manifest archive `v{version_id}` is opened from `target_fs` (it is
/// presumably written just before by `persist_version`; confirm at call
/// sites). A digest is computed from its footer with
/// `current_digest::compute`. The pointer record is then written through
/// `rewrite_atomic`, with this layout:
///
/// | field           | encoding               |
/// |-----------------|------------------------|
/// | version id      | `u64`, little-endian   |
/// | digest          | `u128`, little-endian  |
/// | checksum type   | `u8` (`Xxh3`)          |
///
/// # Errors
///
/// Any error opening the manifest archive, computing the digest, encoding the
/// record, or rewriting the pointer file.
fn write_current_for_version(
    target_fs: &dyn Fs,
    target_root: &Path,
    version_id: u64,
    runtime: Arc<crate::runtime_config::RuntimeConfig>,
    encryption: Option<Arc<dyn crate::encryption::EncryptionProvider>>,
    sync_mode: SyncMode,
) -> crate::Result<()> {
    use crate::checksum::ChecksumType;
    use crate::file::rewrite_atomic;
    use crate::io::{LittleEndian, WriteBytesExt};
    use crate::manifest_blocks::{current_digest, reader::ManifestArchiveReader};

    let manifest_path = target_root.join(format!("v{version_id}"));
    let manifest = ManifestArchiveReader::open(&manifest_path, target_fs, runtime, encryption)?;
    let digest = current_digest::compute(version_id, manifest.footer())?;

    // 8 bytes version id + 16 bytes digest + 1 byte checksum type.
    let mut record = Vec::<u8>::with_capacity(8 + 16 + 1);
    record.write_u64::<LittleEndian>(version_id)?;
    record.write_u128::<LittleEndian>(digest)?;
    record.write_u8(u8::from(ChecksumType::Xxh3))?;

    let current_path = target_root.join(CURRENT_VERSION_FILE);
    rewrite_atomic(&current_path, &record, target_fs, sync_mode)?;
    Ok(())
}
#[expect(
    clippy::too_many_arguments,
    reason = "checkpoint metadata copy threads the source fs+root, target fs+root, version, \
              comparator name, runtime config, encryption provider and sync mode through \
              to three helpers; bundling them into a struct would only move the count \
              into the struct literal"
)]
/// Writes all tree metadata for `version` into the checkpoint at `target_root`.
///
/// Three steps, in order; the first failure is returned:
///
/// 1. Copy the source's `manifest` file, if it exists (skipped when absent).
/// 2. Persist `version` (with `comparator_name`) into `target_root` via
///    `crate::version::persist_version`.
/// 3. Write the current-version pointer for `version.id()` via
///    `write_current_for_version`.
///
/// `runtime` and `encryption` are handed to both step 2 and step 3.
///
/// # Errors
///
/// Any I/O or encoding error from the three steps.
pub fn copy_metadata(
    src_fs: &dyn Fs,
    src_root: &Path,
    target_fs: &dyn Fs,
    target_root: &Path,
    version: &crate::version::Version,
    comparator_name: &str,
    runtime: std::sync::Arc<crate::runtime_config::RuntimeConfig>,
    encryption: Option<std::sync::Arc<dyn crate::encryption::EncryptionProvider>>,
    sync_mode: SyncMode,
) -> crate::Result<()> {
    copy_metadata_file_optional(src_fs, src_root, target_fs, target_root, "manifest", sync_mode)?;

    // Step 2 borrows clones of the shared handles; step 3 consumes the originals.
    let runtime_for_persist = Arc::clone(&runtime);
    let encryption_for_persist = encryption.clone();
    crate::version::persist_version(
        target_root,
        version,
        comparator_name,
        target_fs,
        runtime_for_persist,
        encryption_for_persist,
        sync_mode,
    )?;

    write_current_for_version(target_fs, target_root, version.id(), runtime, encryption, sync_mode)
}
/// Inputs for [`run_checkpoint`].
pub struct CheckpointParams<'a> {
    /// Directory to create for the checkpoint. It must not exist yet, and a
    /// path made only of `.` components is rejected.
    pub target_root: &'a Path,
    /// Filesystem the checkpoint is written to.
    pub target_fs: &'a Arc<dyn Fs>,
    /// Root directory of the source tree; its `manifest` file (if any) is
    /// copied from here.
    pub src_root: &'a Path,
    /// Filesystem holding `src_root`.
    pub src_fs: &'a Arc<dyn Fs>,
    /// Acquired for the whole checkpoint via `acquire()`. Presumably it keeps
    /// referenced files from being deleted while they are linked/copied;
    /// confirm against `DeletionPause`.
    pub deletion_pause: &'a Arc<crate::deletion_pause::DeletionPause>,
    /// Read once at the start of the checkpoint; the value is reported back as
    /// `CheckpointInfo::seqno`.
    pub visible_seqno: &'a crate::seqno::SharedSequenceNumberGenerator,
    /// Whether blob files are linked/copied into the checkpoint's blobs folder.
    pub include_blobs: bool,
    /// Runtime configuration. `use_reflink_for_checkpoint` enables reflink
    /// attempts, and the config is passed on to metadata writing.
    pub runtime_config: Arc<crate::runtime_config::RuntimeConfig>,
    /// Optional encryption provider, passed on to metadata writing.
    pub encryption: Option<Arc<dyn crate::encryption::EncryptionProvider>>,
}
struct PartialCheckpointGuard<'a> {
target_root: &'a Path,
target_fs: &'a Arc<dyn Fs>,
armed: bool,
}
impl<'a> PartialCheckpointGuard<'a> {
fn new(target_root: &'a Path, target_fs: &'a Arc<dyn Fs>) -> Self {
Self {
target_root,
target_fs,
armed: true,
}
}
fn commit(mut self) {
self.armed = false;
}
}
impl Drop for PartialCheckpointGuard<'_> {
fn drop(&mut self) {
if !self.armed {
return;
}
if let Err(e) = self.target_fs.remove_dir_all(self.target_root) {
log::warn!(
"Failed to clean up partial checkpoint at {}: {e:?}",
self.target_root.display(),
);
}
}
}
/// Writes a standalone checkpoint of `tree` into `params.target_root`.
///
/// The checkpoint is built in this order:
///
/// 1. `.` components are stripped from the target path. A target that becomes
///    empty (e.g. `.`) is rejected.
/// 2. [`prepare_target`] creates the target root plus its tables folder (and
///    blobs folder when `include_blobs` is set). An already existing target is
///    an error and is never modified or removed.
/// 3. The deletion pause is acquired and held until this function returns.
///    The visible sequence number is read, and the active memtable is flushed.
/// 4. Every table of the current version is reflinked, hard-linked, or copied
///    via [`link_or_copy_cross_fs`]. The same is done for every blob file when
///    `include_blobs` is set.
/// 5. [`copy_metadata`] writes the manifest data and current-version pointer.
/// 6. The sub-folders, the target root, and the target's parent directory (if
///    the path has a non-empty parent) are fsynced according to the tree's
///    `sync_mode`.
///
/// If any step after (2) fails, a drop guard removes the whole target
/// directory before the error is returned.
///
/// # Errors
///
/// * `InvalidInput` if `target_root` has no component other than `.`.
/// * `AlreadyExists` if the target directory already exists.
/// * Any error from flushing, linking/copying files, writing metadata, or
///   syncing directories.
pub fn run_checkpoint<T: AbstractTree>(
    tree: &T,
    params: &CheckpointParams<'_>,
) -> crate::Result<CheckpointInfo> {
    let target_fs = params.target_fs;
    let src_root = params.src_root;
    let src_fs = params.src_fs;
    let deletion_pause = params.deletion_pause;
    let visible_seqno = params.visible_seqno;
    let include_blobs = params.include_blobs;

    // Strip `.` components; a target made only of them normalizes to an empty
    // path and is rejected instead of resolving to the current directory.
    let normalized_target: std::path::PathBuf = params
        .target_root
        .components()
        .filter(|c| !matches!(c, std::path::Component::CurDir))
        .collect();
    if normalized_target.as_os_str().is_empty() {
        return Err(crate::Error::from(std::io::Error::new(
            std::io::ErrorKind::InvalidInput,
            "checkpoint target_root must name at least one path component",
        )));
    }
    let target_root = normalized_target.as_path();

    // The cleanup guard is only armed after `prepare_target` succeeded, so a
    // pre-existing target directory is never deleted.
    prepare_target(target_root, include_blobs, &**target_fs)?;
    let cleanup = PartialCheckpointGuard::new(target_root, target_fs);

    // Held (named binding, not `_`) until the function returns.
    let _pause = deletion_pause.acquire();
    let captured_seqno = visible_seqno.get();
    tree.flush_active_memtable(0)?;

    let version = tree.current_version();
    let sync_mode = tree.tree_config().sync_mode;
    let use_reflink = params.runtime_config.use_reflink_for_checkpoint;

    let (sst_files, sst_bytes) =
        link_tables(&version, target_root, target_fs, sync_mode, use_reflink)?;
    let (blob_files, blob_bytes) = if include_blobs {
        link_blob_files(
            version.blob_files.iter().cloned(),
            target_root,
            target_fs,
            sync_mode,
            use_reflink,
        )?
    } else {
        (0, 0)
    };

    copy_metadata(
        &**src_fs,
        src_root,
        &**target_fs,
        target_root,
        &version,
        tree.tree_config().comparator.name(),
        Arc::clone(&params.runtime_config),
        params.encryption.clone(),
        sync_mode,
    )?;

    // Persist the directory entries created above, innermost first.
    fsync_directory(&target_root.join(TABLES_FOLDER), &**target_fs, sync_mode)?;
    if include_blobs {
        fsync_directory(&target_root.join(BLOBS_FOLDER), &**target_fs, sync_mode)?;
    }
    fsync_directory(target_root, &**target_fs, sync_mode)?;
    // A relative single-component target has an empty parent; skip it then.
    if let Some(parent) = target_root.parent()
        && !parent.as_os_str().is_empty()
    {
        fsync_directory(parent, &**target_fs, sync_mode)?;
    }

    cleanup.commit();
    Ok(CheckpointInfo {
        sst_files,
        blob_files,
        total_bytes: sst_bytes + blob_bytes,
        version_id: version.id(),
        seqno: captured_seqno,
    })
}
#[cfg(test)]
#[expect(clippy::unwrap_used, reason = "test code")]
mod tests;