use std::collections::HashSet;
use std::fs::File;
use std::fs::Permissions;
use std::io::ErrorKind;
use std::os::unix::fs::PermissionsExt;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::RwLock;
use std::time::Duration;
use tempfile::TempDir;
use tracing::instrument;
use tracing::Level;
use umash::Fingerprint;
use crate::chain_error;
use crate::chain_info;
use crate::chain_warn;
use crate::drop_result;
use crate::filtered_io_error;
use crate::fresh_error;
use crate::fresh_info;
use crate::instance_id;
use crate::loader::Chunk;
use crate::manifest_schema::Manifest;
use crate::ofd_lock::OfdLock;
use crate::process_id::process_id;
use crate::replication_target::ReplicationTarget;
use crate::replication_target::ReplicationTargetList;
use crate::result::Result;
/// A replication buffer is a per-database spooling directory tree on the
/// local filesystem, where chunks and manifests are staged before being
/// shipped to replication targets.
#[derive(Debug)]
pub(crate) struct ReplicationBuffer {
    // Root of this database's spooling subtree
    // (`$prefix/verneuil-$instance-$version/$mangled_path/$dev.$ino`).
    spooling_directory: PathBuf,
}
lazy_static::lazy_static! {
    // Process-wide default prefix for spooling directories; set at most
    // once via `set_default_spooling_directory`.
    static ref DEFAULT_SPOOLING_DIRECTORY: RwLock<Option<PathBuf>> = Default::default();
}
// Number of likely instance ids probed when looking for an existing
// versioned spooling directory (see `current_spooling_dir`).
const INSTANCE_ID_PROBE_RANGE: u64 = 2;
// Bumped whenever the on-disk layout changes incompatibly.
const ON_DISK_FORMAT_VERSION_SUFFIX: &str = "v2";
// Directory where fresh manifests are "tapped" for external readers.
const DOT_TAP: &str = ".tap";
// Mutable staging area, updated in place by writers.
const STAGING: &str = "staging";
// Immutable snapshot of staged data, waiting to be consumed.
const READY: &str = "ready";
// Ready buffer currently being consumed by a copier.
const CONSUMING: &str = "consuming";
// JSON file holding the serialized replication target list.
const DOT_METADATA: &str = ".metadata";
// OFD lock file guarding meta copies.
const DOT_META_COPY_LOCK: &str = ".meta_copy_lock";
// Content-addressed chunk files.
const CHUNKS: &str = "chunks";
// Manifest ("meta") files.
const META: &str = "meta";
// Temporary files and directories before publication.
const SCRATCH: &str = "scratch";
// The three subdirectories created under `staging`.
const SUBDIRS: [&str; 3] = [CHUNKS, META, SCRATCH];
// How long scratch files may live before cleanup deletes them.
const SCRATCH_FILE_GRACE_PERIOD: Duration = Duration::from_secs(10);
// How long tapped manifests for deleted databases may linger.
const ORPHAN_TAP_FILE_GRACE_PERIOD: Duration = Duration::from_secs(24 * 3600);
// Mangled file names longer than this are fingerprint-compressed
// (and thus become irreversible).
const MAX_MANGLED_NAME_LENGTH: usize = 230;
/// Sets the process-wide default spooling directory to `path`, creating
/// it with `permissions` when missing.
///
/// Idempotent for the same `path`; setting a *different* path after the
/// first call is an error.
#[instrument]
pub(crate) fn set_default_spooling_directory(
    path: &Path,
    permissions: std::fs::Permissions,
) -> Result<()> {
    let mut guard = DEFAULT_SPOOLING_DIRECTORY.write().unwrap();

    // A previously configured directory is only acceptable if identical.
    if let Some(old_path) = guard.as_ref() {
        if old_path == path {
            return Ok(());
        }

        return Err(fresh_info!(
            "default spooling directory already set",
            ?old_path,
            ?path
        ));
    }

    // On first configuration, make sure the directory exists with the
    // requested permissions.
    if !path.exists() {
        std::fs::create_dir_all(path)
            .map_err(|e| chain_error!(e, "failed to create directory", ?path))?;
        std::fs::set_permissions(path, permissions.clone())
            .map_err(|e| chain_error!(e, "failed to update permissions", ?path, ?permissions))?;
    }

    *guard = Some(path.to_path_buf());
    Ok(())
}
/// Creates a read-only named temporary file in `$spool_dir/staging/scratch`.
///
/// Returns the temporary file and the path to the scratch directory it
/// lives in.
#[instrument(level = "debug", err)]
fn create_scratch_file(spool_dir: PathBuf) -> Result<(tempfile::NamedTempFile, PathBuf)> {
    // Scratch files always live under `staging/scratch`.
    let scratch = {
        let mut dir = spool_dir;
        dir.push(STAGING);
        dir.push(SCRATCH);
        dir
    };

    let temp = tempfile::Builder::new()
        .prefix(&(process_id() + "."))
        .suffix(".tmp")
        .tempfile_in(&scratch)
        .map_err(|e| chain_error!(e, "failed to create temporary file", ?scratch))?;

    // Published files are never mutated in place, so mark the file
    // read-only up front; the already-open descriptor can still write.
    temp.as_file()
        .set_permissions(Permissions::from_mode(0o444))
        .map_err(|e| chain_error!(e, "failed to set file read-only", ?temp))?;

    Ok((temp, scratch))
}
/// Generates a file name that is almost certainly unique:
/// `$pid.$micros_since_epoch.$counter.$random128`.
fn pseudo_unique_filename() -> String {
    use rand::Rng;
    use std::sync::atomic::AtomicU64;

    // Disambiguates names generated by the same process in the same
    // microsecond.
    static COUNTER: AtomicU64 = AtomicU64::new(0);

    let timestamp = std::time::SystemTime::UNIX_EPOCH
        .elapsed()
        .map(|duration| duration.as_micros())
        .unwrap_or(0);
    let seq = COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed);

    format!(
        "{}.{}.{}.{:x}",
        process_id(),
        timestamp,
        seq,
        rand::thread_rng().gen::<u128>()
    )
}
/// Appends one child component to `base`: an arbitrary existing child
/// when the directory has one, and a fresh pseudo-unique name otherwise.
fn append_subdirectory(mut base: PathBuf) -> PathBuf {
    // Returns the name of some child of `base`, if any.
    fn find_child(base: &Path) -> std::io::Result<Option<std::ffi::OsString>> {
        let first = std::fs::read_dir(base)?.flatten().next();
        Ok(first.map(|dirent| dirent.file_name()))
    }

    // When listing fails (e.g., the directory does not exist yet) or is
    // empty, fall back to a name that will not collide.
    let component: std::ffi::OsString = match find_child(&base) {
        Ok(Some(child)) => child,
        _ => pseudo_unique_filename().into(),
    };

    base.push(component);
    base
}
/// Returns the path to the `.tap` directory under `spool_prefix`, or
/// under the process-wide default prefix when none is provided.
///
/// Returns `None` when no prefix is available at all.
#[instrument(level = "debug")]
pub(crate) fn tap_path_in_spool_prefix(spool_prefix: Option<PathBuf>) -> Option<PathBuf> {
    let mut tap = match spool_prefix {
        Some(prefix) => prefix,
        // Fall back to the configured default, if any.
        None => DEFAULT_SPOOLING_DIRECTORY.read().unwrap().clone()?,
    };

    tap.push(DOT_TAP);
    Some(tap)
}
/// Returns the path to the tapped manifest file for `source_db` in the
/// `.tap` directory under `spool_prefix` (or the default prefix).
#[instrument(level = "debug", err)]
pub(crate) fn tapped_manifest_path_in_spool_prefix(
    spool_prefix: Option<PathBuf>,
    source_db: &Path,
) -> Result<PathBuf> {
    // Without any prefix there is no tap directory to point into.
    let mut tap = tap_path_in_spool_prefix(spool_prefix)
        .ok_or_else(|| fresh_error!("no spool_prefix provided or defaulted", ?source_db))?;

    tap.push(percent_encode_local_path_uri(source_db)?);
    Ok(tap)
}
/// Builds the path to the tapped manifest file for `source_db`, given
/// that database's spool directory.
///
/// `spool_dir` ends with two per-database components (the mangled db
/// path and the `$dev.$ino` key — see `ReplicationBuffer::new`);
/// popping both recovers the prefix that owns the `.tap` directory.
#[instrument(level = "debug", err)]
pub(crate) fn construct_tapped_manifest_path(
    spool_dir: &Path,
    source_db: &Path,
) -> Result<PathBuf> {
    // Strip the two per-database components; `and_then` is the
    // idiomatic form of `map(Path::parent).flatten()`.
    let base = spool_dir
        .parent()
        .and_then(Path::parent)
        .ok_or_else(|| fresh_error!("invalid spool directory", ?spool_dir))?;

    tapped_manifest_path_in_spool_prefix(Some(base.to_path_buf()), source_db)
}
/// Copies the manifest `file` into the `.tap` directory as `name`, so
/// external consumers can read the latest manifest without racing the
/// replication machinery.
///
/// The copy goes through a scratch file and is renamed into place;
/// failure to persist is downgraded to a warning-level error.
#[instrument(level = "debug", err)]
pub(crate) fn tap_manifest_file(
    spool_dir: &Path,
    name: &std::ffi::OsStr,
    file: &mut File,
) -> Result<()> {
    use std::io::Seek;

    // Pop the two per-database components of `spool_dir` to find the
    // prefix that owns the `.tap` directory; `and_then` is the
    // idiomatic form of `map(Path::parent).flatten()`.
    let base = spool_dir
        .parent()
        .and_then(Path::parent)
        .ok_or_else(|| fresh_error!("invalid spool directory", ?spool_dir))?;

    let mut tap = base.to_path_buf();
    tap.push(DOT_TAP);
    std::fs::create_dir_all(&tap).map_err(|e| {
        filtered_io_error!(e, ErrorKind::AlreadyExists => Level::DEBUG,
                           "failed to create .tap dir", dir=?tap)
    })?;
    // The tap directory must be usable by arbitrary reader processes.
    std::fs::set_permissions(&tap, PermissionsExt::from_mode(0o777))
        .map_err(|e| chain_error!(e, "failed to set .tap permissions", dir=?tap))?;

    tap.push(name);

    // Rewind the source manifest, copy it to a scratch file, and
    // atomically rename the copy into the tap directory.
    file.seek(std::io::SeekFrom::Start(0))
        .map_err(|e| chain_error!(e, "failed to rewind manifest file", ?file, ?tap))?;

    let (mut temp, _) = create_scratch_file(spool_dir.to_path_buf())?;
    std::io::copy(file, &mut temp)
        .map_err(|e| chain_error!(e, "failed to copy .tap file", ?tap))?;

    temp.persist(&tap)
        .map_err(|e| chain_warn!(e, "failed to tap updated manifest file"))?;
    Ok(())
}
/// Attempts to acquire the meta-copy OFD lock in `spool_dir`.
///
/// Returns `Ok(None)` when the lock is currently held elsewhere.
#[instrument(level = "debug", err)]
pub(crate) fn acquire_meta_copy_lock(spool_dir: PathBuf) -> Result<Option<OfdLock>> {
    // The lock file lives directly in the spooling directory.
    let lock_path = {
        let mut path = spool_dir;
        path.push(DOT_META_COPY_LOCK);
        path
    };

    OfdLock::try_lock(&lock_path)
        .map_err(|e| chain_error!(e, "failed to acquire meta copy lock", ?lock_path))
}
/// Deletes the meta-copy lock file in `spool_dir`, forcing the next
/// acquisition to start from a fresh file.  A missing file is fine.
#[instrument(level = "debug", err)]
pub(crate) fn reset_meta_copy_lock(spool_dir: PathBuf) -> Result<()> {
    let lock_path = {
        let mut path = spool_dir;
        path.push(DOT_META_COPY_LOCK);
        path
    };

    std::fs::remove_file(&lock_path).map_err(|e| {
        filtered_io_error!(e,
                           ErrorKind::NotFound => Level::DEBUG,
                           "failed to remove lock file", ?lock_path)
    })
}
/// Makes `input` usable as a single file name by replacing each '/'
/// with '#'.
///
/// '#' itself (and '%', the percent-encoding escape character, plus
/// control characters) are percent-encoded first, so the substitution
/// is unambiguous and reversible.
fn replace_slashes(input: &str) -> String {
    const ESCAPED: percent_encoding::AsciiSet = percent_encoding::CONTROLS.add(b'#').add(b'%');

    let mut ret = String::new();
    for fragment in percent_encoding::utf8_percent_encode(input, &ESCAPED) {
        ret.push_str(&fragment.replace('/', "#"));
    }

    ret
}
/// Inverts `replace_slashes`.
///
/// Returns `Ok(None)` for names longer than `MAX_MANGLED_NAME_LENGTH`:
/// those were lossily compressed by
/// `pseudo_uniquely_fit_string_in_filename` and cannot be restored.
pub(crate) fn restore_slashes(input: &str) -> Result<Option<String>> {
    if input.as_bytes().len() > MAX_MANGLED_NAME_LENGTH {
        return Ok(None);
    }

    // Undo the '#' substitution, then the percent-encoding.
    let slashified = input.replace('#', "/");
    let decoded = percent_encoding::percent_decode_str(&slashified)
        .decode_utf8()
        .map_err(|e| chain_info!(e, "invalid utf-8 bytes"))?;

    Ok(Some(decoded.to_string()))
}
/// Fits `input` into a file-name-sized string.
///
/// Strings shorter than `MAX_MANGLED_NAME_LENGTH` bytes are returned
/// unchanged.  Longer strings are replaced by
/// `prefix || 32-hex-digit fingerprint || suffix`: the mangling is
/// lossy (irreversible), but pseudo-unique thanks to the 128-bit umash
/// fingerprint of the full input.
fn pseudo_uniquely_fit_string_in_filename(input: String) -> String {
    if input.as_bytes().len() < MAX_MANGLED_NAME_LENGTH {
        return input;
    }

    lazy_static::lazy_static! {
        // Dedicated umash parameters for path mangling.
        static ref MANGLE_PARAMS: umash::Params = umash::Params::derive(0, b"verneuil path mangling params");
    }

    let fprint = MANGLE_PARAMS
        .fingerprinter(0)
        .write(input.as_bytes())
        .digest();
    // Bytes kept from each end of `input`; the 32 hex digits go in the
    // middle (total = 2 * 104 + 32 = 240 bytes).
    let fragment_len = 5 + (MAX_MANGLED_NAME_LENGTH - 32) / 2;
    // NOTE(review): the slices are taken at byte offsets, which may
    // split a multi-byte UTF-8 character; `from_utf8_lossy` replaces
    // any torn character rather than panicking.
    let left = String::from_utf8_lossy(&input.as_bytes()[..fragment_len]);
    let right = String::from_utf8_lossy(&input.as_bytes()[input.len() - fragment_len..]);

    format!(
        "{}{:016x}{:016x}{}",
        left, fprint.hash[0], fprint.hash[1], right
    )
}
/// Mangles `string` into a file-name-safe form: slashes are replaced,
/// and overlong results are fingerprint-compressed.
fn mangle_string(string: &str) -> String {
    let flattened = replace_slashes(string);
    pseudo_uniquely_fit_string_in_filename(flattened)
}
/// Mangles the canonical form of `path` into a file name, so that every
/// spelling of the same file maps to the same directory name.
fn mangle_path(path: &Path) -> Result<String> {
    let canonical = std::fs::canonicalize(path)
        .map_err(|e| chain_error!(e, "failed to canonicalize path", ?path))?;

    match canonical.as_os_str().to_str() {
        Some(string) => Ok(mangle_string(string)),
        // Non-UTF-8 paths cannot be mangled reversibly.
        None => Err(fresh_error!(
            "unable to convert canonical path to string",
            ?canonical
        )),
    }
}
/// Returns a `$device.$inode` key for the open file `fd`: stable across
/// renames, distinct across file replacements.
fn db_file_key(fd: &File) -> Result<String> {
    use std::os::unix::fs::MetadataExt;

    let meta = fd
        .metadata()
        .map_err(|e| chain_error!(e, "failed to stat file", ?fd))?;
    let key = format!("{}.{}", meta.dev(), meta.ino());
    Ok(key)
}
/// Returns whether the file at `path` has not had an inode change for
/// at least `max_age`.
fn file_is_stale(path: &Path, max_age: Duration) -> Result<bool> {
    use std::os::unix::fs::MetadataExt;

    let meta = std::fs::metadata(path).map_err(|e| {
        filtered_io_error!(e,
                           ErrorKind::NotFound => Level::DEBUG,
                           "failed to stat file", ?path)
    })?;

    // Reconstruct the inode change time as a SystemTime.  If the
    // addition overflows, pretend the file changed just now (i.e.,
    // treat it as fresh).  NOTE(review): a negative (pre-epoch) ctime
    // wraps through `as u64` — presumably not a case that matters here.
    let changed = Duration::new(meta.ctime() as u64, meta.ctime_nsec() as u32);
    let ctime = std::time::UNIX_EPOCH
        .checked_add(changed)
        .unwrap_or_else(std::time::SystemTime::now);

    let elapsed = ctime
        .elapsed()
        .map_err(|e| chain_warn!(e, "time went backward", ?path))?;
    Ok(elapsed >= max_age)
}
/// Recursively deletes the directory at `path`, best effort.
///
/// The directory is first renamed to a `.del` name so that concurrent
/// readers do not observe a partially-deleted tree at the original
/// path; if the rename fails, deletes `path` in place instead.
fn remove_dir(path: &Path) -> std::io::Result<()> {
    // In read-validation test builds, never delete anything.
    #[cfg(feature = "test_validate_reads")]
    return Ok(());

    let mut new_path = path.to_path_buf();
    // NOTE(review): `set_extension(".del")` yields a double-dot suffix
    // (`foo..del`) because the extension argument should not include
    // the leading dot — confirm whether `"del"` was intended.  See also
    // the `.del` comparison in `cleanup_scratch_directory`, which can
    // never match what `Path::extension()` returns.
    new_path.set_extension(".del");
    match std::fs::rename(path, &new_path) {
        Ok(()) => std::fs::remove_dir_all(&new_path),
        Err(_) => std::fs::remove_dir_all(path),
    }
}
/// Deletes every sibling of `goal_path` in its parent directory, best
/// effort: each sibling is a stale `$dev.$ino` subdirectory left over
/// from a previous inode of the same database path.
#[instrument(err)]
fn delete_stale_directories(goal_path: &Path) -> Result<()> {
    let mut parent = goal_path.to_owned();
    // No parent component: nothing to scan.
    if !parent.pop() {
        return Ok(());
    }

    let goal_filename = goal_path.file_name();
    let it = match std::fs::read_dir(&parent) {
        Ok(it) => it,
        // A missing parent means there is nothing stale to delete.
        Err(e) if e.kind() == ErrorKind::NotFound => return Ok(()),
        Err(e) => {
            return Err(chain_info!(e, "failed to list parent directory", path=?goal_path, ?parent))
        }
    };

    for subdir in it.flatten() {
        // Keep the goal directory itself; delete everything else.
        if Some(subdir.file_name().as_os_str()) != goal_filename {
            // `parent` is reused as a scratch path: push the child name,
            // attempt deletion, then pop it back.
            parent.push(subdir.file_name());
            if let Err(error) = remove_dir(&parent) {
                if error.kind() != std::io::ErrorKind::NotFound {
                    tracing::info!(?parent, %error, "failed to remove sibling");
                }
            }
            parent.pop();
        }
    }

    Ok(())
}
/// Scans `parent` (the versioned spooling directory) for replication
/// subdirectories whose source database file no longer exists, and
/// deletes them, best effort.
#[instrument(err)]
fn delete_dangling_replication_directories(mut parent: PathBuf) -> Result<()> {
    let it = match std::fs::read_dir(&parent) {
        Ok(it) => it,
        Err(e) if e.kind() == ErrorKind::NotFound => return Ok(()),
        Err(e) => return Err(chain_info!(e, "failed to list parent directory", ?parent)),
    };

    for subdir in it.flatten() {
        // Only consider non-hidden directories with UTF-8 names.
        let name = match subdir.file_name().into_string() {
            Ok(name) => name,
            Err(_) => continue,
        };

        if name.starts_with('.') {
            continue;
        }

        if !matches!(subdir.file_type(), Ok(typ) if typ.is_dir()) {
            continue;
        }

        // The directory name is the mangled source db path; names that
        // cannot be restored (e.g., fingerprint-compressed) are skipped.
        let base_file = match restore_slashes(&name) {
            Ok(Some(base_file)) => base_file,
            Ok(None) | Err(_) => continue,
        };

        // Delete only when the source database is definitely gone
        // (NotFound, as opposed to any transient stat failure).
        if matches!(std::fs::symlink_metadata(&base_file),
                    Err(e) if e.kind() == ErrorKind::NotFound)
        {
            parent.push(subdir.file_name());
            tracing::info!(%base_file, victim=?parent,
                           "deleting dangling replication directory");
            drop_result!(remove_dir(&parent),
                         e => chain_info!(e, "failed to delete replication directory for missing db",
                                          %base_file, victim=?parent));
            parent.pop();
        }
    }

    Ok(())
}
/// Deletes tapped manifest files in `$spooling/.tap` that were generated
/// by this host, whose source database no longer exists, and that have
/// been stale for at least `ORPHAN_TAP_FILE_GRACE_PERIOD`.
fn delete_orphan_tapped_manifest_files(spooling: PathBuf) -> Result<()> {
    let mut tap = spooling;
    tap.push(DOT_TAP);

    let it = match std::fs::read_dir(&tap) {
        Ok(it) => it,
        Err(e) if e.kind() == ErrorKind::NotFound => return Ok(()),
        Err(e) => return Err(chain_info!(e, "failed to list .tap directory", ?tap)),
    };

    // Derive the decoded name prefix shared by all manifests generated
    // on this host, using an empty probe path.
    let local_name = manifest_name_for_hostname_path(None, Path::new(""))
        .map_err(|e| chain_error!(e, "failed to generate local manifest name"))?;
    let decoded_name = percent_encoding::percent_decode_str(&local_name)
        .decode_utf8()
        .map_err(|e| chain_info!(e, "invalid utf-8 bytes in local manifest name"))?
        .to_string();
    // Everything up to and including the last ':' is host-specific.
    let local_prefix = match decoded_name.rsplit_once(":") {
        Some((prefix, _)) => format!("{}:", prefix),
        None => {
            return Err(fresh_error!(
                "unexpected pattern in local manifest name",
                %local_name))
        }
    };

    // Deletes the tap file at `path` (decoded name `decoded_name`) if
    // it is stale and the source db path embedded in its name is gone.
    fn handle_tap_file(path: &Path, decoded_name: &str) -> Result<()> {
        if !file_is_stale(path, ORPHAN_TAP_FILE_GRACE_PERIOD)? {
            return Ok(());
        }

        // Decoded names end with `:$path_hash/$source_path`; extract
        // the source path after the first '/'.
        let suffix = match decoded_name
            .rsplit_once(":")
            .and_then(|(_, hash_path)| hash_path.split_once("/"))
        {
            Some((_hash, path)) => path,
            None => return Ok(()),
        };

        // Only delete when the source db file is definitely missing.
        if !matches!(std::fs::metadata(Path::new(suffix)), Err(e) if e.kind() == ErrorKind::NotFound)
        {
            return Ok(());
        }

        match std::fs::remove_file(&path) {
            Ok(_) => Ok(()),
            Err(e) if e.kind() == ErrorKind::NotFound => Ok(()),
            Err(e) => Err(chain_error!(e, "failed to delete orphan tap file", ?path)),
        }
    }

    for child in it.flatten() {
        // Skip entries whose names are not valid UTF-8 / percent
        // encoding.
        let name = match child.file_name().into_string() {
            Ok(name) => name,
            Err(_) => continue,
        };

        let decoded = match percent_encoding::percent_decode_str(&name).decode_utf8() {
            Ok(decoded) => decoded.to_string(),
            Err(_) => continue,
        };

        // Never touch manifests generated by another host.
        if !decoded.starts_with(&local_prefix) {
            continue;
        }

        if !matches!(child.file_type(), Ok(typ) if typ.is_file()) {
            continue;
        }

        // `tap` is reused as a scratch path for each candidate file.
        tap.push(child.file_name());
        drop_result!(handle_tap_file(&tap, &decoded),
                     e => chain_info!(e, "failed to maybe delete tapped manifest file", path=?tap));
        tap.pop();
    }

    Ok(())
}
/// Invokes `worker` to populate a fresh scratch file, then publishes it
/// at `target` without clobbering any existing file.
///
/// Returns early (without calling `worker`) when `target` already
/// exists: published files are immutable for a given name, so there is
/// nothing to do.
#[instrument(skip(worker), err)]
fn call_with_temp_file(
    spool_dir: PathBuf,
    target: &Path,
    worker: impl Fn(&mut File) -> Result<()>,
) -> Result<()> {
    // Fast path: someone already published this file.
    if target.exists() {
        return Ok(());
    }

    let (mut temp, _) = create_scratch_file(spool_dir)
        .map_err(|e| chain_error!(e, "failed to prepare for publication", ?target))?;
    let result = worker(temp.as_file_mut())?;

    match temp.persist_noclobber(target) {
        Ok(_) => Ok(result),
        // Losing the publication race is fine: contents are a pure
        // function of the target name.
        Err(tempfile::PersistError { error: e, .. })
            if e.kind() == ErrorKind::AlreadyExists || target.exists() =>
        {
            Ok(result)
        }
        Err(e) => Err(chain_error!(e, "failed to publish temporary file", ?target)),
    }
}
/// Constructs the blob name for the manifest of the database at `path`
/// on host `hostname_or` (defaulting to the local hostname).
///
/// The name has the shape
/// `$hostname_hash-verneuil:$hostname:$path_hash/$path`, fully
/// percent-encoded, then fingerprint-compressed if too long for a
/// single file name.
pub fn manifest_name_for_hostname_path(hostname_or: Option<&str>, path: &Path) -> Result<String> {
    // Escape everything except characters that are safe both in file
    // names and in URIs.
    const ESCAPED: percent_encoding::AsciiSet = percent_encoding::NON_ALPHANUMERIC
        .remove(b'!')
        .remove(b'-')
        .remove(b'_')
        .remove(b'.')
        .remove(b'*')
        .remove(b'\'')
        .remove(b'(')
        .remove(b')');

    lazy_static::lazy_static! {
        // Dedicated umash parameters for hashing the db path.
        static ref PARAMS: umash::Params = umash::Params::derive(0, b"verneuil path params");
    }

    // The explicit closure keeps `hostname()` lazy; clippy's suggested
    // eta-reduction is deliberately suppressed.
    #[allow(clippy::redundant_closure)] let hostname: &str = hostname_or.unwrap_or_else(|| instance_id::hostname());

    let string = path
        .as_os_str()
        .to_str()
        .ok_or_else(|| fresh_error!("failed to convert path to string", ?path))?;
    let path_hash = PARAMS.hasher(0).write(string.as_bytes()).digest();

    let name = format!(
        "{}-verneuil:{}:{:04x}/{}",
        &instance_id::hostname_hash(hostname),
        hostname,
        // Keep only 16 bits of the path hash (4 hex digits).
        path_hash % (1 << (4 * 4)),
        string
    );

    Ok(pseudo_uniquely_fit_string_in_filename(
        percent_encoding::utf8_percent_encode(&name, &ESCAPED).to_string(),
    ))
}
/// Returns the manifest file name for the local host's replica of
/// `path`.
fn percent_encode_local_path_uri(path: &Path) -> Result<String> {
    manifest_name_for_hostname_path(None, path)
}
lazy_static::lazy_static! {
    // Percent-encoded "/", used between the two 64-bit halves of a
    // chunk fingerprint in chunk blob names.
    static ref CHUNK_HASH_SEPARATOR: String =
        percent_encoding::utf8_percent_encode("/", percent_encoding::NON_ALPHANUMERIC).to_string();
}
/// Returns the blob name for the chunk with fingerprint `fprint`:
/// the two 16-hex-digit halves joined by a percent-encoded slash.
pub(crate) fn fingerprint_chunk_name(fprint: &Fingerprint) -> String {
    let separator: &str = &CHUNK_HASH_SEPARATOR;

    format!(
        "{:016x}{}{:016x}",
        fprint.hash(),
        separator,
        fprint.secondary()
    )
}
/// Parses a chunk blob `name` back into its fingerprint; inverse of
/// `fingerprint_chunk_name`.  Returns `None` for malformed names.
pub(crate) fn chunk_name_fingerprint(name: &str) -> Option<Fingerprint> {
    // 16 hex digits, the separator, then 16 more hex digits.
    let expected_len = (2 * 16) + CHUNK_HASH_SEPARATOR.len();
    if name.len() != expected_len {
        return None;
    }

    let hash = u64::from_str_radix(&name[..16], 16).ok()?;
    let tail = &name[16 + CHUNK_HASH_SEPARATOR.len()..];
    let secondary = u64::from_str_radix(tail, 16).ok()?;

    Some(Fingerprint::new(hash, secondary))
}
/// Reads and validates the manifest at `file_path`.
///
/// Returns `Ok(None)` when the file does not exist, or when a decode
/// failure can be blamed on the file being concurrently replaced while
/// we were reading it (a benign race).
#[instrument(level = "trace", err)]
fn read_manifest_at_path(
    file_path: &Path,
    cache_builder: kismet_cache::CacheBuilder,
    targets: &[ReplicationTarget],
) -> Result<Option<(Manifest, Option<Arc<Chunk>>)>> {
    use std::io::Read;

    let mut file = match std::fs::File::open(file_path) {
        Ok(file) => file,
        Err(e) if e.kind() == ErrorKind::NotFound => return Ok(None),
        Err(e) => return Err(chain_error!(e, "failed to open manifest", ?file_path)),
    };

    let mut contents = Vec::new();
    file.read_to_end(&mut contents)
        .map_err(|e| chain_error!(e, "failed to read manifest", ?file_path))?;

    match Manifest::decode_and_validate(&*contents, cache_builder, Some(targets), file_path) {
        Ok(ret) => Ok(Some(ret)),
        Err(e) => {
            use std::os::unix::fs::MetadataExt;

            // Decide whether the bytes we read belong to the current
            // file at `file_path`, or to a since-replaced one.
            let initial_meta = file
                .metadata()
                .map_err(|e| chain_error!(e, "failed to stat open manifest file", ?file_path))?;
            let current_meta = match std::fs::metadata(file_path) {
                Ok(meta) => meta,
                Err(e) if e.kind() == ErrorKind::NotFound => return Ok(None),
                Err(e) => return Err(chain_error!(e, "failed to stat manifest file", ?file_path)),
            };

            // Same (device, inode): we read the live manifest, so the
            // decode error is real.  Otherwise the file was swapped out
            // from under us and the torn read is expected.
            if (initial_meta.dev(), initial_meta.ino()) == (current_meta.dev(), current_meta.ino())
            {
                Err(e)
            } else {
                Ok(None)
            }
        }
    }
}
/// Returns the mutable "staging" directory under `parent`.
pub(crate) fn mutable_staging_directory(mut parent: PathBuf) -> PathBuf {
    parent.push(STAGING);
    parent
}
/// Returns the mutable "ready" directory under `parent`.
pub(crate) fn mutable_ready_directory(mut parent: PathBuf) -> PathBuf {
    parent.push(READY);
    parent
}
/// Returns the mutable "consuming" directory under `parent`: ready
/// buffers are renamed here while their contents are being consumed.
pub(crate) fn mutable_consuming_directory(mut parent: PathBuf) -> PathBuf {
    // (The local was previously misleadingly named `ready`.)
    parent.push(CONSUMING);
    parent
}
/// Attempts to move the "ready" directory of `parent` to "consuming",
/// then returns the current consuming buffer subdirectory, if any.
///
/// The rename is allowed to fail in several benign ways (nothing ready,
/// an earlier consuming directory still present); we then work with
/// whatever consuming directory already exists.
#[instrument(level = "trace", err)]
pub(crate) fn snapshot_ready_directory(parent: PathBuf) -> Result<Option<PathBuf>> {
    let ready = mutable_ready_directory(parent.clone());
    let consuming = mutable_consuming_directory(parent);

    match std::fs::rename(ready, &consuming) {
        Ok(()) => {}
        // Nothing ready, or a consuming directory already in place.
        Err(e) if matches!(e.kind(), ErrorKind::AlreadyExists | ErrorKind::NotFound) => {}
        // Renaming over a non-empty directory fails with ENOTEMPTY;
        // that existing directory still has work for us.
        Err(e) if e.raw_os_error() == Some(libc::ENOTEMPTY) => {}
        Err(e) => {
            // Any other failure only matters when it leaves us with no
            // consuming directory to read at all.
            if !consuming.is_dir() {
                return Err(chain_error!(e, "failed to acquire new consuming directory"));
            }
        }
    }

    // The buffer proper lives in a pseudo-unique subdirectory.
    let ret = append_subdirectory(consuming);
    if ret.is_dir() {
        Ok(Some(ret))
    } else {
        Ok(None)
    }
}
#[instrument(level = "trace")]
pub(crate) fn remove_consuming_directory_if_empty(parent: PathBuf) -> Result<()> {
let mut consuming = parent;
consuming.push(CONSUMING);
match std::fs::remove_dir(&consuming) {
Ok(()) => Ok(()),
Err(e) if e.kind() == ErrorKind::NotFound => Ok(()),
Err(e) if e.raw_os_error() == Some(libc::ENOTEMPTY) => Err(chain_info!(
e,
"found non-empty consuming directory",
?consuming
)),
Err(e) => Err(chain_error!(
e,
"failed to remove consuming directory",
?consuming
)),
}
}
/// Returns the path to the `.metadata` file of `buffer`.
pub(crate) fn buffer_metadata_file(mut buffer: PathBuf) -> PathBuf {
    buffer.push(DOT_METADATA);
    buffer
}
/// Returns the "chunks" subdirectory of `parent`.
pub(crate) fn directory_chunks(mut parent: PathBuf) -> PathBuf {
    parent.push(CHUNKS);
    parent
}
/// Returns the "meta" (manifests) subdirectory of `parent`.
pub(crate) fn directory_meta(mut parent: PathBuf) -> PathBuf {
    parent.push(META);
    parent
}
/// Returns the versioned, per-instance spooling directory under
/// `prefix`: an existing directory for a plausible instance id when one
/// exists, or the directory for the current instance id otherwise.
pub(crate) fn current_spooling_dir(mut prefix: PathBuf) -> PathBuf {
    // Directory name for a given instance id, tagged with the on-disk
    // format version.
    fn dir_name(instance_id: &str) -> String {
        format!(
            "verneuil-{}-{}",
            replace_slashes(instance_id),
            ON_DISK_FORMAT_VERSION_SUFFIX,
        )
    }

    // Reuse a spooling directory from a likely instance id if any is
    // already on disk...
    for probe in instance_id::likely_instance_ids(INSTANCE_ID_PROBE_RANGE) {
        let candidate = prefix.join(dir_name(&probe));
        if candidate.is_dir() {
            return candidate;
        }
    }

    // ...and otherwise fall back to the current instance id.
    prefix.push(dir_name(instance_id::instance_id()));
    prefix
}
impl ReplicationBuffer {
    /// Attempts to create a replication buffer for the database at
    /// `db_path`, open as `fd`.  Returns `Ok(None)` when no default
    /// spooling directory has been configured.
    ///
    /// On the first call in this process, also scans the spooling
    /// prefix for dangling replication directories and orphan tapped
    /// manifests, best effort.
    #[instrument(err)]
    pub fn new(db_path: &Path, fd: &File) -> Result<Option<ReplicationBuffer>> {
        use std::sync::atomic::AtomicBool;

        // True until the first buffer has been created; gates the
        // process-wide cleanup scans below.
        static CLEAN_ONCE_FLAG: AtomicBool = AtomicBool::new(true);

        let mut spooling = match DEFAULT_SPOOLING_DIRECTORY.read().unwrap().clone() {
            Some(dir) => dir,
            None => return Ok(None),
        };

        spooling = current_spooling_dir(spooling);

        if CLEAN_ONCE_FLAG.swap(false, std::sync::atomic::Ordering::Relaxed) {
            // Both scans are best effort: failures are only logged.
            drop_result!(delete_dangling_replication_directories(spooling.clone()),
                         e => chain_info!(e, "failed to scan for dangling replication directories",
                                          ?spooling));
            drop_result!(delete_orphan_tapped_manifest_files(spooling.clone()),
                         e => chain_info!(e, "failed to scan for orphan manifest files",
                                          ?spooling));
        }

        // This database's private subtree is
        // `$spooling/$mangled_db_path/$dev.$ino`.
        spooling.push(mangle_path(db_path)?);
        spooling.push(db_file_key(fd)?);

        // Siblings of `$dev.$ino` belong to stale inodes of the same
        // path; delete them, best effort.
        drop_result!(delete_stale_directories(&spooling),
                     e => chain_info!(e, "failed to delete stale directories", ?spooling));

        std::fs::create_dir_all(&spooling).map_err(|e| {
            chain_error!(
                e,
                "failed to create spooling directory",
                ?db_path,
                ?spooling
            )
        })?;
        // World-accessible: multiple processes may replicate through
        // the same tree.
        std::fs::set_permissions(&spooling, PermissionsExt::from_mode(0o777)).map_err(
            |e| chain_error!(e, "failed to set spooling dir permissions", dir=?spooling),
        )?;

        Ok(Some(ReplicationBuffer {
            spooling_directory: spooling,
        }))
    }

    /// Creates the staging directory and its `chunks`/`meta`/`scratch`
    /// subdirectories if missing, and publishes the replication
    /// `targets` to the `.metadata` file (rewriting an existing file
    /// only when `overwrite_meta` is set).
    ///
    /// Everything here is best effort: failures are logged and dropped.
    #[instrument]
    pub fn ensure_staging_dir(&self, targets: &ReplicationTargetList, overwrite_meta: bool) {
        let mut buf = self.spooling_directory.clone();
        buf.push(STAGING);

        drop_result!(
            std::fs::create_dir_all(&buf),
            e => filtered_io_error!(e, ErrorKind::AlreadyExists => Level::DEBUG,
                                    "failed to create staging dir", dir=?buf)
        );
        drop_result!(std::fs::set_permissions(&buf, PermissionsExt::from_mode(0o777)),
                     e => chain_error!(e, "failed to set staging dir permissions", dir=?buf));

        for subdir in &SUBDIRS {
            buf.push(subdir);
            drop_result!(
                std::fs::create_dir_all(&buf),
                e => filtered_io_error!(e, ErrorKind::AlreadyExists => Level::DEBUG,
                                        "failed to create staging subdir", subdir, dir=?buf)
            );
            drop_result!(std::fs::set_permissions(&buf, PermissionsExt::from_mode(0o777)),
                         e => chain_error!(e, "failed to set staging subdir permissions", dir=?buf));
            buf.pop();
        }

        buf.pop();
        buf.push(DOT_METADATA);
        if overwrite_meta || !buf.exists() {
            use std::io::Write;

            // Serialize the targets to a scratch file, then atomically
            // rename it over `.metadata`.
            let json_bytes = serde_json::to_vec(&targets).expect("failed to serialize metadata");

            if let Ok((temp, _)) = create_scratch_file(self.spooling_directory.clone()) {
                let written = temp.as_file().write_all(&json_bytes).map_err(|e| {
                    chain_error!(
                        e,
                        "failed to populate temporary file",
                        len = json_bytes.len()
                    )
                });

                if let Ok(()) = written {
                    drop_result!(
                        temp.persist(&buf),
                        e => chain_error!(e, "failed to publish .metadata", dst=?buf)
                    );
                }
            }
        }
    }

    /// Publishes `data` as the content-addressed staged chunk for
    /// `fprint`.  No-op when the chunk file already exists: chunks are
    /// immutable for a given fingerprint.
    #[instrument(level = "trace", err)]
    pub fn stage_chunk(&self, fprint: Fingerprint, data: &[u8]) -> Result<()> {
        use std::io::Write;

        let chunk_name = fingerprint_chunk_name(&fprint);

        let mut target = self.spooling_directory.clone();
        target.push(STAGING);
        target.push(CHUNKS);
        target.push(&chunk_name);

        call_with_temp_file(self.spooling_directory.clone(), &target, |dst| {
            dst.write_all(data)
                .map_err(|e| chain_error!(e, "failed to write chunk", ?target, len = data.len()))
        })
    }

    /// Serializes `manifest` and atomically publishes it under
    /// `staging/meta`, named after `db_path`.  Unlike chunks, manifests
    /// are overwritten in place (`persist`, not `persist_noclobber`).
    #[instrument(err)]
    pub fn publish_manifest(&self, db_path: &Path, manifest: &Manifest) -> Result<()> {
        use prost::Message;
        use std::io::Write;

        let mut encoded = Vec::<u8>::new();
        manifest
            .encode(&mut encoded)
            .map_err(|e| chain_error!(e, "failed to serialize manifest proto", ?manifest))?;

        let (temp, mut target) = create_scratch_file(self.spooling_directory.clone())?;
        temp.as_file().write_all(&encoded).map_err(|e| {
            chain_error!(
                e,
                "failed to write to temporary file",
                ?temp,
                len = encoded.len()
            )
        })?;

        // `target` currently points at `staging/scratch`; rewrite it to
        // `staging/meta/$manifest_name`.
        target.pop();
        target.push(META);
        target.push(
            &percent_encode_local_path_uri(db_path)
                .map_err(|e| chain_error!(e, "invalid manifest name", ?db_path))?,
        );

        temp.persist(&target)
            .map_err(|e| chain_error!(e, "failed to publish manifest", ?target))?;
        Ok(())
    }

    /// Returns whether a ready manifest exists for `db_path` in the
    /// "ready" buffer.
    pub fn has_ready_manifest(&self, db_path: &Path) -> bool {
        let src = || -> Result<PathBuf> {
            let mut src = self.spooling_directory.clone();
            src.push(READY);
            src = append_subdirectory(src);
            src.push(META);
            src.push(&percent_encode_local_path_uri(db_path)?);

            Ok(src)
        };

        match src() {
            Ok(src) => std::fs::symlink_metadata(&src).is_ok(),
            Err(_) => false,
        }
    }

    /// Reads the manifest for `db_path` from the "consuming" buffer.
    #[instrument(level = "trace", err)]
    pub fn read_consuming_manifest(
        &self,
        db_path: &Path,
        cache_builder: kismet_cache::CacheBuilder,
        targets: &[ReplicationTarget],
    ) -> Result<Option<(Manifest, Option<Arc<Chunk>>)>> {
        let mut src = self.spooling_directory.clone();
        src.push(CONSUMING);
        src = append_subdirectory(src);
        src.push(META);
        src.push(&percent_encode_local_path_uri(db_path)?);
        read_manifest_at_path(&src, cache_builder, targets)
    }

    /// Reads the manifest for `db_path` from the "ready" buffer.
    #[instrument(level = "trace", err)]
    pub fn read_ready_manifest(
        &self,
        db_path: &Path,
        cache_builder: kismet_cache::CacheBuilder,
        targets: &[ReplicationTarget],
    ) -> Result<Option<(Manifest, Option<Arc<Chunk>>)>> {
        let mut src = self.spooling_directory.clone();
        src.push(READY);
        src = append_subdirectory(src);
        src.push(META);
        src.push(&percent_encode_local_path_uri(db_path)?);
        read_manifest_at_path(&src, cache_builder, targets)
    }

    /// Reads the manifest for `db_path` from the mutable staging area.
    #[instrument(level = "trace", err)]
    pub fn read_staged_manifest(
        &self,
        db_path: &Path,
        cache_builder: kismet_cache::CacheBuilder,
        targets: &[ReplicationTarget],
    ) -> Result<Option<(Manifest, Option<Arc<Chunk>>)>> {
        let mut src = self.spooling_directory.clone();
        src.push(STAGING);
        src.push(META);
        src.push(&percent_encode_local_path_uri(db_path)?);
        read_manifest_at_path(&src, cache_builder, targets)
    }

    /// Returns the path to the staged chunks directory.
    #[allow(dead_code)]
    pub fn staged_chunk_directory(&self) -> PathBuf {
        let mut dir = self.spooling_directory.clone();
        dir.push(STAGING);
        dir.push(CHUNKS);

        dir
    }

    /// Returns the path to the current ready buffer's chunks directory.
    #[allow(dead_code)]
    pub fn ready_chunk_directory(&self) -> PathBuf {
        let mut dir = self.spooling_directory.clone();
        dir.push(READY);
        dir = append_subdirectory(dir);
        dir.push(CHUNKS);

        dir
    }

    /// Returns the path to the current consuming buffer's chunks
    /// directory.
    #[allow(dead_code)]
    pub fn consuming_chunk_directory(&self) -> PathBuf {
        let mut dir = self.spooling_directory.clone();
        dir.push(CONSUMING);
        dir = append_subdirectory(dir);
        dir.push(CHUNKS);

        dir
    }

    /// Hardlinks the live staged chunks (those listed in `chunks`) and
    /// all staged manifests into a fresh temporary directory tree
    /// shaped like a ready buffer.  The caller publishes the returned
    /// `TempDir` with `publish_ready_buffer`.
    #[instrument(skip(chunks), err)]
    pub fn prepare_ready_buffer(&self, chunks: &[Fingerprint]) -> Result<TempDir> {
        use tempfile::Builder;

        // Names of the chunk files we must link into the ready buffer.
        let live: HashSet<String> = chunks.iter().map(fingerprint_chunk_name).collect();

        let mut staging = self.spooling_directory.clone();
        staging.push(STAGING);

        let mut target = self.spooling_directory.clone();
        target.push(STAGING);
        target.push(SCRATCH);

        let temp = Builder::new()
            .prefix(&(process_id() + "."))
            .suffix(".tmp")
            .tempdir_in(&target)
            .map_err(|e| chain_error!(e, "failed to create temporary directory", ?target))?;

        let mut temp_path = temp.path().to_owned();
        std::fs::set_permissions(&temp_path, PermissionsExt::from_mode(0o777)).map_err(
            |e| chain_error!(e, "failed to set temporary dir permissions", dir=?temp_path),
        )?;

        // The buffer proper lives in a pseudo-unique subdirectory, as
        // expected by `append_subdirectory`.
        temp_path.push(pseudo_unique_filename());
        std::fs::create_dir(&temp_path).map_err(|e| {
            filtered_io_error!(
                e,
                ErrorKind::NotFound => Level::DEBUG,
                "failed to create temporary pseudo-unique directory",
                ?temp_path
            )
        })?;
        std::fs::set_permissions(&temp_path, PermissionsExt::from_mode(0o777)).map_err(
            |e| chain_error!(e, "failed to set temporary pseudo-unique dir permissions", dir=?temp_path),
        )?;

        // Link every live staged chunk into `$temp/.../chunks`.
        staging.push(CHUNKS);
        temp_path.push(CHUNKS);
        std::fs::create_dir(&temp_path).map_err(|e| {
            filtered_io_error!(
                e,
                ErrorKind::NotFound => Level::DEBUG,
                "failed to create temporary chunks directory",
                ?temp_path
            )
        })?;
        std::fs::set_permissions(&temp_path, PermissionsExt::from_mode(0o777)).map_err(
            |e| chain_error!(e, "failed to set temporary chunks dir permissions", dir=?temp_path),
        )?;

        for file in std::fs::read_dir(&staging)
            .map_err(|e| chain_error!(e, "failed to list staged chunks", ?staging))?
        {
            if let Some(name) = file
                .map_err(|e| chain_error!(e, "failed to read chunk direntry"))?
                .file_name()
                .to_str()
            {
                if live.contains(name) {
                    staging.push(name);
                    temp_path.push(name);

                    // A chunk may have been GCed since `chunks` was
                    // computed; a missing source file is skipped.
                    match std::fs::hard_link(&staging, &temp_path) {
                        Ok(_) => {}
                        Err(error) if error.kind() == ErrorKind::NotFound => {}
                        Err(err) => return Err(chain_error!(err, "failed to link ready chunk")),
                    }

                    temp_path.pop();
                    staging.pop();
                }
            }
        }

        temp_path.pop();
        staging.pop();

        // Link all staged manifests into `$temp/.../meta`.
        staging.push(META);
        temp_path.push(META);
        std::fs::create_dir(&temp_path).map_err(|e| {
            filtered_io_error!(
                e,
                ErrorKind::NotFound => Level::DEBUG,
                "failed to create temporary meta directory",
                ?temp_path
            )
        })?;
        std::fs::set_permissions(&temp_path, PermissionsExt::from_mode(0o777)).map_err(
            |e| chain_error!(e, "failed to set temporary meta dir permissions", dir=?temp_path),
        )?;

        for file_or in std::fs::read_dir(&staging)
            .map_err(|e| chain_error!(e, "failed to list staged meta", ?staging))?
        {
            let file = file_or.map_err(|e| chain_error!(e, "failed to read meta direntry"))?;

            staging.push(file.file_name());
            temp_path.push(file.file_name());

            std::fs::hard_link(&staging, &temp_path).map_err(|e| {
                filtered_io_error!(
                    e,
                    ErrorKind::NotFound => Level::DEBUG,
                    "failed to link ready manifest",
                    ?staging,
                    ?temp_path
                )
            })?;

            temp_path.pop();
            staging.pop();
        }

        Ok(temp)
    }

    /// Atomically renames a prepared `ready` buffer into place as the
    /// "ready" directory.  Losing the race (AlreadyExists / NotFound)
    /// is reported at debug level only.
    #[instrument(err)]
    pub fn publish_ready_buffer(&self, ready: TempDir) -> Result<()> {
        let mut target = self.spooling_directory.clone();
        target.push(READY);

        std::fs::rename(ready.path(), &target).map_err(|e| {
            filtered_io_error!(
                e,
                ErrorKind::AlreadyExists | ErrorKind::NotFound => Level::DEBUG,
                "failed to publish ready buffer",
                ?target,
                ?ready
            )
        })
    }

    /// Returns the path to this buffer's spooling directory.
    pub fn spooling_directory(&self) -> &Path {
        &self.spooling_directory
    }

    /// Deletes every staged chunk file whose name is not in `chunks`,
    /// best effort: deletion failures are only logged.
    #[instrument(skip(chunks), err)]
    pub fn gc_chunks(&self, chunks: &[Fingerprint]) -> Result<()> {
        // Names of the chunks to keep.
        let live: HashSet<String> = chunks.iter().map(fingerprint_chunk_name).collect();

        let mut chunks = self.spooling_directory.clone();
        chunks.push(STAGING);
        chunks.push(CHUNKS);

        for file in std::fs::read_dir(&chunks)
            .map_err(|e| chain_error!(e, "failed to list staged chunks", ?chunks))?
            .flatten()
        {
            if let Some(name) = file.file_name().to_str() {
                if live.contains(name) {
                    continue;
                }
            }

            // `chunks` is reused as a scratch path for each victim.
            chunks.push(file.file_name());
            match std::fs::remove_file(&chunks) {
                Ok(_) => {}
                Err(e) if e.kind() == ErrorKind::NotFound => {}
                Err(error) => {
                    tracing::error!(%error, ?chunks, "failed to clean up stale chunk file")
                }
            }
            chunks.pop();
        }

        Ok(())
    }

    /// Cleans the scratch directory, best effort: stale files are
    /// deleted once past the grace period, and subdirectories are
    /// renamed to a `.del` name before recursive deletion.
    #[instrument(err)]
    pub fn cleanup_scratch_directory(&self) -> Result<()> {
        // Deletes the file at `path` unless it is demonstrably fresh.
        fn remove_file_if_stale(path: &Path) {
            if matches!(file_is_stale(path, SCRATCH_FILE_GRACE_PERIOD), Ok(false)) {
                return;
            }

            match std::fs::remove_file(path) {
                Ok(_) => {}
                Err(e) if e.kind() == ErrorKind::NotFound => {}
                Err(error) => {
                    tracing::error!(%error, ?path, "failed to remove stale scratch file")
                }
            }
        }

        let mut scratch = self.spooling_directory.clone();
        scratch.push(STAGING);
        scratch.push(SCRATCH);

        for entry in std::fs::read_dir(&scratch)
            .map_err(|e| chain_error!(e, "failed to list scratch directory", ?scratch))?
            .flatten()
        {
            let is_dir = match entry.file_type() {
                Ok(file_type) => file_type.is_dir(),
                Err(error) => {
                    tracing::info!(%error, ?entry, "failed to read scratch dentry");
                    false
                }
            };

            if !is_dir {
                // Regular file: scratch files only need to survive long
                // enough for their creator to publish them.
                scratch.push(entry.file_name());
                remove_file_if_stale(&scratch);
                scratch.pop();
                continue;
            }

            // Directory: rename to a `.del` name first, so a crash
            // mid-delete leaves an obviously dead tree.
            // NOTE(review): `Path::extension()` never includes the
            // leading dot, so this comparison with ".del" can never be
            // true; already-renamed directories are renamed again
            // (harmlessly) — confirm whether "del" was intended.
            scratch.push(entry.file_name());
            if scratch.extension() != Some(std::ffi::OsStr::new(".del")) {
                let old_path = scratch.clone();

                scratch.set_extension(".del");
                match std::fs::rename(&old_path, &scratch) {
                    Ok(_) => {}
                    Err(e) if e.kind() == ErrorKind::NotFound => {}
                    Err(error) => {
                        tracing::error!(%error, ?old_path, ?scratch, "failed to rename scratch subdirectory")
                    }
                };
            }

            match std::fs::remove_dir_all(&scratch) {
                Ok(_) => {}
                Err(e) if e.kind() == ErrorKind::NotFound => {}
                Err(error) => {
                    tracing::error!(%error, ?scratch, "failed to remove scratch subdirectory")
                }
            };
            scratch.pop();
        }

        Ok(())
    }
}
// Golden test: the encoded manifest name for a hairy path, with the
// hostname-dependent pieces interpolated.
#[test]
fn percent_encode_sample() {
    const SAMPLE_PATH: &str = "/a@<<sd!%-_/.asd/*'fdg(g/)\\~).db";
    assert_eq!(
        percent_encode_local_path_uri(Path::new(SAMPLE_PATH)).expect("should convert"),
        format!(
            "{}-verneuil%3A{}%3A634c%2F%2Fa%40%3C%3Csd!%25-_%2F.asd%2F*\'fdg(g%2F)%5C%7E).db",
            instance_id::hostname_hash(instance_id::hostname()),
            instance_id::hostname()
        )
    );
}
// `replace_slashes` must be lossless even for inputs that contain its
// own escape characters ('#', '%') next to slashes.
#[test]
fn replace_slashes_invertible() {
    const HARD_STRING: &str = "/%asd#//%60";

    let converted = replace_slashes(HARD_STRING);
    // The converted name must not contain any path separator.
    assert_eq!(converted.find('/'), None);

    assert_eq!(
        HARD_STRING,
        &restore_slashes(&converted)
            .expect("must decode")
            .expect("must be reversible")
    );
}
// Mangled names that stay under the length limit round-trip exactly.
#[test]
fn mangle_path_short() {
    let path = format!("/foo/bar/{}", String::from_utf8(vec![b'X'; 220]).unwrap());

    let mangled = mangle_string(&path);
    assert!(mangled.as_bytes().len() < MAX_MANGLED_NAME_LENGTH);
    assert_eq!(
        path,
        restore_slashes(&mangled)
            .expect("must decode")
            .expect("must be reversible")
    );
}
// One byte past the limit: mangling becomes lossy, so restoring yields
// `None`.
#[test]
fn mangle_path_long() {
    let path = format!("/foo/bar/{}", String::from_utf8(vec![b'X'; 221]).unwrap());

    let mangled = mangle_string(&path);
    assert!(mangled.as_bytes().len() > MAX_MANGLED_NAME_LENGTH);
    assert_eq!(None, restore_slashes(&mangled).expect("must decode"));
}
// Very long inputs are compressed to a fixed-size mangled name.
#[test]
fn mangle_path_over_name_max() {
    let path = format!("/foo/bar/{}", String::from_utf8(vec![b'X'; 1000]).unwrap());

    let mangled = mangle_string(&path);
    assert!(mangled.as_bytes().len() > MAX_MANGLED_NAME_LENGTH);
    // 104-byte prefix + 32 hex digits + 104-byte suffix.
    assert_eq!(240, mangled.as_bytes().len());
    assert_eq!(None, restore_slashes(&mangled).expect("must decode"));
}
// Golden test for the fully deterministic case (explicit hostname).
#[test]
fn test_manifest_name_for_hostname_path() {
    assert_eq!(
        manifest_name_for_hostname_path(Some("test.com"), Path::new("/tmp/test.db")).unwrap(),
        "7cb1-verneuil%3Atest.com%3A80ad%2F%2Ftmp%2Ftest.db"
    );
}
// Chunk names must parse back to the fingerprints that produced them.
#[test]
fn test_chunk_name_roundtrip() {
    let fprint1 = Fingerprint::new(1, 2);
    assert_eq!(
        fprint1,
        chunk_name_fingerprint(&fingerprint_chunk_name(&fprint1)).expect("should parse")
    );

    // Extremal values exercise the full 16 hex digits of each half.
    let fprint2 = Fingerprint::new(u64::MAX, u64::MAX - 1);
    assert_eq!(
        fprint2,
        chunk_name_fingerprint(&fingerprint_chunk_name(&fprint2)).expect("should parse")
    );
}