//! Local repository copy synchronized with rsync.
//!
//! The rsync collector works as follows:
//!
//! Data is kept in the directory given via the cache_dir attribute using the
//! rsync URI without the scheme as the path. We assume that data is published
//! in rsync modules identified by the first two components of this path. This
//! corresponds to the way the rsync daemon works.
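//!
//! For example, assuming a hypothetical cache_dir of `/var/lib/routinator`,
//! a file would live at:
//!
//! ```text
//! rsync://rpki.example.net/repo/ta/root.cer
//!     -> /var/lib/routinator/rsync/rpki.example.net/repo/ta/root.cer
//! ```
//!
//! with `rpki.example.net/repo` being the module.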
//!
//! During a validation run, we keep track of the modules we have already
//! updated. When access to a module that has not yet been updated is
//! requested, we spawn rsync and block until it returns. If during that time
//! another thread requests access to the same module, that thread is blocked,
//! too.
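//!
//! A rough sketch of how a validation run might drive this module – the
//! `config`, `uri`, and `metrics` values are assumed to exist:
//!
//! ```ignore
//! Collector::init(&config)?;                    // create the working dir
//! let collector = Collector::new(&config)?
//!     .expect("rsync is disabled in the config");
//! let run = collector.start();                  // start a validation run
//! run.load_module(&uri);                        // sync a module
//! let data = run.load_file(&uri);               // read a file from it
//! run.done(&mut metrics);                       // hand over the metrics
//! ```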
use std::{fmt, fs, io, ops};
use std::borrow::{Borrow, Cow, ToOwned};
use std::collections::{HashMap, HashSet};
use std::path::{Path, PathBuf};
use std::process::{Command as StdCommand, ExitStatus, Stdio};
use std::str::FromStr;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use bytes::Bytes;
use futures::{FutureExt, TryFutureExt};
use futures::future::Either;
use log::{debug, error, info, warn};
use rpki::uri;
use tokio::process::Command as AsyncCommand;
use crate::config::Config;
use crate::error::Failed;
use crate::metrics::{Metrics, RsyncModuleMetrics};
use crate::utils::fatal;
use crate::utils::sync::{Mutex, RwLock};
use crate::utils::uri::UriExt;
//------------ Collector -----------------------------------------------------
/// A local copy of repositories synchronized via rsync.
#[derive(Debug)]
pub struct Collector {
/// The base directory of the collector.
working_dir: WorkingDir,
/// The command for running rsync.
///
/// If this is `None` actual rsyncing has been disabled and data
/// present will be used as is.
command: Option<RsyncCommand>,
/// Whether to filter dubious authorities in rsync URIs.
filter_dubious: bool,
}
impl Collector {
/// Initializes the rsync collector without creating a value.
///
    /// This function is called implicitly by [`new`][Collector::new].
pub fn init(config: &Config) -> Result<(), Failed> {
let _ = Self::create_working_dir(config)?;
Ok(())
}
/// Creates the working dir and returns its path.
fn create_working_dir(config: &Config) -> Result<PathBuf, Failed> {
let working_dir = config.cache_dir.join("rsync");
if config.fresh {
if let Err(err) = fs::remove_dir_all(&working_dir) {
if err.kind() != io::ErrorKind::NotFound {
error!(
"Failed to delete rsync working directory at {}: {}",
working_dir.display(), err
);
return Err(Failed)
}
}
}
if let Err(err) = fs::create_dir_all(&working_dir) {
error!(
"Failed to create rsync working directory {}: {}.",
working_dir.display(), err
);
return Err(Failed);
}
Ok(working_dir)
}
/// Creates a new rsync collector.
///
/// If use of rsync is disabled via the config, returns `Ok(None)`.
pub fn new(config: &Config) -> Result<Option<Self>, Failed> {
if config.disable_rsync {
Ok(None)
}
else {
Ok(Some(Collector {
working_dir: WorkingDir::new(
Self::create_working_dir(config)?
),
command: Some(RsyncCommand::new(config)?),
filter_dubious: !config.allow_dubious_hosts
}))
}
}
/// Prepares the collector for use in a validation run.
pub fn ignite(&mut self) -> Result<(), Failed> {
        // Currently, there’s nothing to do. But just in case there will be
        // something later, let’s keep the method around.
Ok(())
}
    /// Starts a validation run on the collector.
pub fn start(&self) -> Run {
Run::new(self)
}
/// Dumps the content of the rsync collector.
pub fn dump(&self, dir: &Path) -> Result<(), Failed> {
let target = dir.join("rsync");
debug!("Dumping rsync collector content to {}", target.display());
if let Err(err) = fs::remove_dir_all(&target) {
if err.kind() != io::ErrorKind::NotFound {
                error!(
                    "Failed to delete directory {}: {}",
                    target.display(), err
                );
return Err(Failed)
}
}
Self::dump_dir(&self.working_dir.base, &target)?;
debug!("Rsync collector dump complete.");
Ok(())
}
/// Recursively copies the content of `source` to `target`.
fn dump_dir(source: &Path, target: &Path) -> Result<(), Failed> {
let read_dir = match fs::read_dir(source) {
Ok(read_dir) => read_dir,
Err(err) => {
error!(
"Failed to open directory {}: {}", source.display(), err
);
return Err(Failed)
}
};
for item in read_dir {
let item = match item {
Ok(item) => item,
Err(err) => {
error!(
"Failed to read directory {}: {}",
source.display(), err
);
return Err(Failed)
}
};
let file_type = match item.file_type() {
Ok(file_type) => file_type,
Err(err) => {
error!(
"Failed to file type for {}: {}",
item.path().display(), err
);
return Err(Failed)
}
};
if file_type.is_dir() {
let target = target.join(item.file_name());
if let Err(err) = fs::create_dir_all(&target) {
error!(
"Failed to create directory {}: {}",
target.display(), err
);
return Err(Failed);
}
Self::dump_dir(&item.path(), &target)?;
}
else if file_type.is_file() {
let target = target.join(item.file_name());
if let Err(err) = fs::copy(item.path(), &target) {
error!(
"Failed to copy {} to {}: {}",
item.path().display(), target.display(), err
);
return Err(Failed)
}
}
}
Ok(())
}
}
//------------ Run -----------------------------------------------------------
/// Using the rsync collector during a validation run.
#[derive(Debug)]
pub struct Run<'a> {
/// A reference to the underlying collector.
collector: &'a Collector,
/// The set of modules that have been updated already.
updated: RwLock<HashSet<OwnedModule>>,
/// The modules that are currently being updated.
///
/// The value in the map is a mutex that is used to synchronize competing
/// attempts to update the module. Only the thread that has the mutex is
/// allowed to actually run rsync.
running: RwLock<HashMap<OwnedModule, Arc<Mutex<()>>>>,
/// The metrics for updated rsync modules.
metrics: Mutex<Vec<RsyncModuleMetrics>>,
}
impl<'a> Run<'a> {
/// Creates a new runner from a collector.
fn new(collector: &'a Collector) -> Self {
Run {
collector,
updated: Default::default(),
running: Default::default(),
metrics: Default::default(),
}
}
    /// Returns whether the module for the given URI has been updated yet.
    ///
    /// This does not mean the module is actually up-to-date or even
    /// available, as an update may have failed.
pub fn was_updated(&self, uri: &uri::Rsync) -> bool {
self.updated.read().contains(Module::from_uri(uri).as_ref())
}
    /// Tries to update the module for the given URI.
    ///
    /// If the module has not yet been updated, this method may block until
    /// an update has finished. The update may not have been successful, and
    /// files in the module may still be outdated or missing entirely.
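    ///
    /// A minimal usage sketch – the URI is hypothetical and `run` is a
    /// [`Run`] obtained from [`Collector::start`]:
    ///
    /// ```ignore
    /// let uri = uri::Rsync::from_str(
    ///     "rsync://rpki.example.net/repo/ta.cer"
    /// ).unwrap();
    /// run.load_module(&uri);           // may block while rsync runs
    /// let data = run.load_file(&uri);  // read the local copy, if present
    /// ```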
pub fn load_module(&self, uri: &uri::Rsync) {
let command = match self.collector.command.as_ref() {
Some(command) => command,
None => return,
};
let module = Module::from_uri(uri);
        // If the module has already been updated, return.
if self.updated.read().contains(module.as_ref()) {
return
}
        // Get a clone of the (arc-ed) mutex. Make a new one if there isn’t
        // one yet.
let mutex = {
self.running.write()
.entry(module.clone().into_owned()).or_default()
.clone()
};
// Acquire the mutex. Once we have it, see if the module is up-to-date
// which happens if someone else had it first.
let _lock = mutex.lock();
if self.updated.read().contains(module.as_ref()) {
return
}
// Check if the module name is dubious. If so, skip updating.
if self.collector.filter_dubious && uri.has_dubious_authority() {
warn!(
"{}: Dubious host name. Skipping update.",
module
)
}
else {
// Run the actual update.
let metrics = command.update(
module.as_ref(),
&self.collector.working_dir.module_path(module.as_ref())
);
            // Insert the metrics.
self.metrics.lock().push(metrics);
}
// Remove from running.
self.running.write().remove(module.as_ref());
// Insert into updated map no matter what.
self.updated.write().insert(module.into_owned());
}
/// Loads the file for the given URI.
///
    /// Does _not_ attempt to update the corresponding module first. You need
    /// to explicitly call [`load_module`][Run::load_module] for that.
///
/// If the file is missing, returns `None`.
pub fn load_file(
&self,
uri: &uri::Rsync,
) -> Option<Bytes> {
let path = self.collector.working_dir.uri_path(uri);
match fs::File::open(&path) {
Ok(mut file) => {
let mut data = Vec::new();
if let Err(err) = io::Read::read_to_end(&mut file, &mut data) {
error!(
"Failed to read file '{}': {}",
path.display(),
err
);
None
}
else {
Some(data.into())
}
}
Err(err) => {
if err.kind() == io::ErrorKind::NotFound {
info!("{}: not found in local repository", uri);
} else {
error!(
"Failed to open file '{}': {}",
path.display(), err
);
}
None
}
}
}
    /// Cleans the collector, keeping only the modules included in `retain`.
    //
    // This currently is super aggressive, deleting everything that it
    // doesn’t like.
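    //
    // A minimal sketch of a typical call – `run` is this runner, and the
    // URI is hypothetical:
    //
    // ```ignore
    // let mut retain = ModuleSet::default();
    // retain.add_from_uri(
    //     &uri::Rsync::from_str("rsync://rpki.example.net/repo/").unwrap()
    // );
    // run.cleanup(&mut retain)?;
    // ```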
pub fn cleanup(&self, retain: &mut ModuleSet) -> Result<(), Failed> {
if self.collector.command.is_none() {
return Ok(())
}
// Add all modules we’ve used during this run to retain.
for module in self.updated.read().iter() {
retain.add_from_uri(&module.to_uri());
}
for entry in fatal::read_dir(
&self.collector.working_dir.base
)? {
let entry = entry?;
let keep = match entry.file_name().to_str() {
Some(name) => {
match retain.authorities.get(name) {
Some(modules) => self.cleanup_host(&entry, modules)?,
None => false,
}
}
None => false
};
if !keep {
fatal::remove_all(entry.path())?;
}
}
Ok(())
}
/// Removes all modules from the directory that are not in `retain`.
///
/// Returns whether the host directory should be kept or can be deleted,
/// too.
fn cleanup_host(
&self, entry: &fatal::DirEntry, retain: &HashSet<String>
) -> Result<bool, Failed> {
if entry.is_file() {
fatal::remove_file(entry.path())?;
return Ok(false)
}
else if !entry.is_dir() {
return Ok(false)
}
let mut keep_host = false;
for entry in fatal::read_dir(entry.path())? {
let entry = entry?;
let keep = match entry.file_name().to_str() {
Some(name) => retain.contains(name),
None => false
};
if !keep {
fatal::remove_all(entry.path())?;
}
else {
keep_host = true;
}
}
Ok(keep_host)
}
/// Finishes the validation run.
///
/// Updates `metrics` with the collector run’s metrics.
///
    /// If you are not interested in the metrics, you can simply drop the
    /// value instead.
pub fn done(self, metrics: &mut Metrics) {
metrics.rsync = self.metrics.into_inner();
}
}
//------------ RsyncCommand --------------------------------------------------
/// The command to run rsync.
#[derive(Debug)]
struct RsyncCommand {
/// The actual command.
command: String,
/// The list of additional arguments.
///
/// We will always add a few more when actually running.
args: Vec<String>,
/// The rsync timeout.
timeout: Option<Duration>,
}
impl RsyncCommand {
/// Creates a new rsync command from the config.
pub fn new(config: &Config) -> Result<Self, Failed> {
let command = config.rsync_command.clone();
let output = match StdCommand::new(&command).arg("-h").output() {
Ok(output) => output,
Err(err) => {
error!(
"Failed to run rsync: {}",
err
);
return Err(Failed)
}
};
if !output.status.success() {
error!(
"Running rsync failed with output: \n{}",
String::from_utf8_lossy(&output.stderr)
);
return Err(Failed);
}
let args = match config.rsync_args {
Some(ref args) => args.clone(),
None => {
let mut args = Vec::new();
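                // Only pass `--contimeout` if the help output mentions it.
                // The option name is twelve bytes long, hence `windows(12)`.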
let has_contimeout =
output.stdout.windows(12)
.any(|window| window == b"--contimeout");
if has_contimeout {
args.push("--contimeout=10".into());
}
if let Some(max_size) = config.max_object_size {
args.push(format!("--max-size={}", max_size));
}
args
}
};
Ok(RsyncCommand {
command,
args,
timeout: config.rsync_timeout,
})
}
/// Updates a module by running rsync.
pub fn update(
&self,
source: &Module,
destination: &Path
) -> RsyncModuleMetrics {
let start = SystemTime::now();
let status = self.command(
source, destination
).and_then(|cmd| self.run(source, cmd));
RsyncModuleMetrics {
module: source.to_uri(),
status,
duration: SystemTime::now().duration_since(start),
}
}
/// Actually runs the rsync command.
fn run(
&self,
source: &Module,
mut command: AsyncCommand
) -> Result<ExitStatus, io::Error> {
// Because we can’t have a timeout on a child process with just std,
// we resort to Tokio here: We fire up a current-thread runtime and
// use Tokio’s async process handling.
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_io()
.enable_time()
.build()?;
runtime.block_on(async {
command.stdout(Stdio::piped());
command.stderr(Stdio::piped());
command.kill_on_drop(true);
let mut child = command.spawn()?;
let stdout_pipe = child.stdout.take();
let stderr_pipe = child.stderr.take();
let mut stdout = Vec::new();
let mut stderr = Vec::new();
let res = tokio::try_join!(
match self.timeout {
None => Either::Left(child.wait().map(Ok)),
Some(timeout) => {
Either::Right(
tokio::time::timeout(
timeout, child.wait()
).map_err(|_| {
io::Error::new(
io::ErrorKind::TimedOut,
"rsync process reached time out"
)
})
)
}
},
async {
if let Some(mut pipe) = stdout_pipe {
tokio::io::copy(&mut pipe, &mut stdout).await?;
}
Ok(())
},
async {
if let Some(mut pipe) = stderr_pipe {
tokio::io::copy(&mut pipe, &mut stderr).await?;
}
Ok(())
},
);
let status = match res {
                Ok((Ok(status), _, _)) => {
                    // The child has exited and we have its exit status.
                    Ok(status)
                }
                Ok((Err(err), _, _)) => {
                    // Waiting for the child has failed.
                    Err(err)
                }
                Err(err) => {
                    // One of the futures failed, most likely the timeout.
                    // Make sure the child process is gone before returning.
                    if let Err(kill_err) = child.kill().await {
warn!(
"{}: Failed to kill rsync process: {}",
source, kill_err
);
}
Err(err)
}
};
if !stderr.is_empty() {
String::from_utf8_lossy(&stderr).lines().for_each(|l| {
warn!("{}: {}", source, l);
})
}
if !stdout.is_empty() {
String::from_utf8_lossy(&stdout).lines().for_each(|l| {
info!("{}: {}", source, l);
})
}
if let Err(ref err) = status {
warn!("{}: {}", source, err);
}
status
})
}
/// Creates the rsync command.
fn command(
&self,
source: &Module,
destination: &Path
) -> Result<AsyncCommand, io::Error> {
info!("rsyncing from {}.", source);
fs::create_dir_all(destination)?;
let destination = match Self::format_destination(destination) {
Ok(some) => some,
Err(_) => {
error!(
"rsync: illegal destination path {}.",
destination.display()
);
return Err(io::Error::new(
io::ErrorKind::Other,
"illegal destination path"
));
}
};
let mut cmd = AsyncCommand::new(&self.command);
for item in &self.args {
cmd.arg(item);
}
cmd.arg("-rltz")
.arg("--delete")
.arg(source.to_string())
.arg(destination);
debug!("{}: Running command {:?}", source, cmd);
Ok(cmd)
}
/// Formats the destination path for inclusion in the command.
#[cfg(not(windows))]
#[allow(clippy::unnecessary_wraps)]
fn format_destination(path: &Path) -> Result<String, Failed> {
// Make sure the path ends in a slash or strange things happen.
let mut destination = format!("{}", path.display());
if !destination.ends_with('/') {
destination.push('/')
}
Ok(destination)
}
/// Formats the destination path for inclusion in the command.
#[cfg(windows)]
fn format_destination(path: &Path) -> Result<String, Failed> {
// On Windows we are using Cygwin rsync which requires Unix-style
// paths. In particular, the drive parameter needs to be turned
// from e.g. `C:` into `/cygdrive/c` and all backslashes should
// become slashes.
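        // For example, a hypothetical `C:\rpki-cache\rsync` becomes
        // `/cygdrive/c/rpki-cache/rsync/`.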
use std::path::{Component, Prefix};
let mut destination = String::new();
for component in path.components() {
match component {
Component::Prefix(prefix) => {
// We only accept UNC and Disk prefixes. Everything else
// causes an error.
match prefix.kind() {
Prefix::UNC(server, share) => {
let (server, share) = match (server.to_str(),
share.to_str()) {
(Some(srv), Some(shr)) => (srv, shr),
_ => return Err(Failed)
};
destination.push_str(server);
destination.push('/');
destination.push_str(share);
}
Prefix::Disk(disk) => {
let disk = if disk.is_ascii() {
(disk as char).to_ascii_lowercase()
}
else {
return Err(Failed)
};
destination.push_str("/cygdrive/");
destination.push(disk);
}
_ => return Err(Failed)
}
}
Component::CurDir | Component::RootDir => {
continue
}
Component::ParentDir => {
destination.push_str("..");
}
Component::Normal(s) => {
match s.to_str() {
Some(s) => destination.push_str(s),
None => return Err(Failed)
}
}
}
destination.push('/');
}
Ok(destination)
}
}
//------------ WorkingDir ----------------------------------------------------
/// The working directory of the rsync collector.
#[derive(Clone, Debug)]
struct WorkingDir {
/// The base path.
base: PathBuf
}
impl WorkingDir {
/// Creates a new value.
///
/// Does not actually create the directory on disk.
pub fn new(base: PathBuf) -> Self {
WorkingDir { base }
}
/// Returns the absolute path for the given module.
pub fn module_path(&self, module: &Module) -> PathBuf {
        let mut res = self.base.clone();
        // Skip the `rsync://` scheme prefix (eight bytes), leaving the
        // authority and module name as the relative path.
        res.push(&module.0[8..]);
res
}
/// Returns the absolute path for the given URI.
fn uri_path(&self, uri: &uri::Rsync) -> PathBuf {
let mut res = self.base.clone();
res.push(uri.canonical_authority().as_ref());
res.push(uri.module_name());
res.push(uri.path());
res
}
}
//------------ Module --------------------------------------------------------
/// The module portion of an rsync URI.
///
/// This is an unsized object – essentially a wrapped `str`.
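///
/// A module comprises the scheme, the authority, and the first path
/// component of an rsync URI, e.g. `rsync://example.com/module`.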
#[derive(Debug, Eq, Hash, PartialEq)]
pub struct Module(str);
impl Module {
    /// Creates a new module without checking the underlying string.
    ///
    /// The caller needs to ensure that `s` contains a correctly formatted,
    /// canonical rsync module URI.
    unsafe fn from_str(s: &str) -> &Module {
&*(s as *const str as *const Module)
}
/// Returns a module reference for a reference to an rsync URI.
///
    /// Because the authority portion of a URI is case insensitive, the
    /// function may have to convert upper-case ASCII letters into lower
    /// case to create a canonical value. If this has to happen, an
    /// [`OwnedModule`] is returned via the cow.
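    ///
    /// For example, a hypothetical URI
    /// `rsync://Example.COM/module/some/file.cer` would yield an owned
    /// module `rsync://example.com/module`.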
pub fn from_uri(uri: &uri::Rsync) -> Cow<Module> {
match uri.canonical_module() {
Cow::Borrowed(s) => {
Cow::Borrowed(unsafe { Module::from_str(s) })
}
Cow::Owned(s) => Cow::Owned(OwnedModule(s))
}
}
/// Converts a module reference into its rsync URI.
pub fn to_uri(&self) -> uri::Rsync {
uri::Rsync::from_str(&self.0).unwrap()
}
}
//--- ToOwned
impl ToOwned for Module {
type Owned = OwnedModule;
fn to_owned(&self) -> Self::Owned {
OwnedModule(self.0.to_owned())
}
}
//--- Display
impl fmt::Display for Module {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
self.0.fmt(f)
}
}
//------------ OwnedModule ---------------------------------------------------
/// An owned version of the module portion of an rsync URI.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct OwnedModule(String);
//--- Deref, AsRef, Borrow
impl ops::Deref for OwnedModule {
type Target = Module;
fn deref(&self) -> &Module {
self.as_ref()
}
}
impl AsRef<Module> for OwnedModule {
fn as_ref(&self) -> &Module {
unsafe { Module::from_str(self.0.as_str()) }
}
}
impl Borrow<Module> for OwnedModule {
fn borrow(&self) -> &Module {
self.as_ref()
}
}
//--- Display
impl fmt::Display for OwnedModule {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
self.0.fmt(f)
}
}
//------------ ModuleSet -----------------------------------------------------
/// A set of rsync modules.
///
/// This is used in cleanup.
#[derive(Clone, Debug, Default)]
pub struct ModuleSet {
/// The modules under each authority.
authorities: HashMap<String, HashSet<String>>,
}
impl ModuleSet {
    /// Adds the module of a URI to the set.
///
/// Returns whether the module was new to the set.
pub fn add_from_uri(&mut self, uri: &uri::Rsync) -> bool {
self.with_authority(uri, |auth| {
let module_name = uri.module_name();
if auth.contains(module_name) {
false
}
else {
auth.insert(module_name.to_string());
true
}
})
}
fn with_authority<F: FnOnce(&mut HashSet<String>) -> R, R>(
&mut self, uri: &uri::Rsync, op: F,
) -> R {
// If uri.canonical_authority returns a borrowed str, we avoid an
// allocation at the price of a double lookup for a missing
// authority. Given that the map should be relatively small, this
// should be faster.
let auth = uri.canonical_authority();
if let Cow::Borrowed(auth) = auth {
if let Some(value) = self.authorities.get_mut(auth) {
return op(value)
}
}
op(self.authorities.entry(auth.into_owned()).or_default())
}
}