use std::{fmt, fs, io, ops};
use std::borrow::{Borrow, Cow, ToOwned};
use std::collections::{HashMap, HashSet};
use std::path::{Path, PathBuf};
use std::process::{Command as StdCommand, ExitStatus, Stdio};
use std::str::FromStr;
use std::sync::{Arc, Mutex, RwLock};
use std::time::{Duration, SystemTime};
use bytes::Bytes;
use futures::TryFutureExt;
use log::{debug, error, info, warn};
use rpki::uri;
use tokio::process::Command as AsyncCommand;
use crate::config::Config;
use crate::error::Failed;
use crate::metrics::{Metrics, RsyncModuleMetrics};
use crate::utils::fatal;
use crate::utils::uri::UriExt;
/// The rsync collector: maintains local copies of rsync modules.
#[derive(Debug)]
pub struct Collector {
    /// The base directory under which modules are mirrored.
    working_dir: WorkingDir,

    /// The command used for updating modules.
    ///
    /// If this is `None`, a `Run` never updates anything (see
    /// `Run::load_module`).
    command: Option<RsyncCommand>,

    /// Whether to skip updating modules with dubious host names.
    ///
    /// Set from the inverse of `config.allow_dubious_hosts`.
    filter_dubious: bool,
}
impl Collector {
    /// Initialises the on-disk state the collector needs.
    ///
    /// Creates (and, on a fresh run, first clears) the rsync working
    /// directory without constructing a collector value.
    pub fn init(config: &Config) -> Result<(), Failed> {
        let _ = Self::create_working_dir(config)?;
        Ok(())
    }

    /// Creates the working directory and returns its path.
    ///
    /// If `config.fresh` is set, an existing directory is removed first.
    /// All errors are logged here and mapped to `Failed`.
    fn create_working_dir(config: &Config) -> Result<PathBuf, Failed> {
        let working_dir = config.cache_dir.join("rsync");
        if config.fresh {
            if let Err(err) = fs::remove_dir_all(&working_dir) {
                // The directory not existing is exactly what we want.
                if err.kind() != io::ErrorKind::NotFound {
                    error!(
                        "Failed to delete rsync working directory at {}: {}",
                        working_dir.display(), err
                    );
                    return Err(Failed)
                }
            }
        }
        if let Err(err) = fs::create_dir_all(&working_dir) {
            error!(
                "Failed to create rsync working directory {}: {}.",
                working_dir.display(), err
            );
            return Err(Failed);
        }
        Ok(working_dir)
    }

    /// Creates a new collector from the configuration.
    ///
    /// Returns `Ok(None)` if rsync has been disabled in the
    /// configuration.
    pub fn new(config: &Config) -> Result<Option<Self>, Failed> {
        if config.disable_rsync {
            Ok(None)
        }
        else {
            Ok(Some(Collector {
                working_dir: WorkingDir::new(
                    Self::create_working_dir(config)?
                ),
                command: Some(RsyncCommand::new(config)?),
                filter_dubious: !config.allow_dubious_hosts
            }))
        }
    }

    /// Prepares the collector for use. Currently a no-op.
    pub fn ignite(&mut self) -> Result<(), Failed> {
        Ok(())
    }

    /// Starts a new run against this collector.
    pub fn start(&self) -> Run {
        Run::new(self)
    }

    /// Dumps the collector's content into the directory `dir/rsync`.
    ///
    /// Any previous dump at that location is deleted first.
    pub fn dump(&self, dir: &Path) -> Result<(), Failed> {
        let target = dir.join("rsync");
        debug!("Dumping rsync collector content to {}", target.display());
        if let Err(err) = fs::remove_dir_all(&target) {
            if err.kind() != io::ErrorKind::NotFound {
                // Report the path we actually tried to delete -- the
                // original message logged the parent directory instead.
                error!(
                    "Failed to delete directory {}: {}",
                    target.display(), err
                );
                return Err(Failed)
            }
        }
        self.dump_dir(&self.working_dir.base, &target)?;
        debug!("Rsync collector dump complete.");
        Ok(())
    }

    /// Recursively copies the contents of `source` into `target`.
    ///
    /// Only regular files and directories are copied; anything else
    /// (e.g., symlinks) is silently skipped. All errors are logged and
    /// mapped to `Failed`.
    fn dump_dir(&self, source: &Path, target: &Path) -> Result<(), Failed> {
        let read_dir = match fs::read_dir(source) {
            Ok(read_dir) => read_dir,
            Err(err) => {
                error!(
                    "Failed to open directory {}: {}", source.display(), err
                );
                return Err(Failed)
            }
        };
        for item in read_dir {
            let item = match item {
                Ok(item) => item,
                Err(err) => {
                    error!(
                        "Failed to read directory {}: {}",
                        source.display(), err
                    );
                    return Err(Failed)
                }
            };
            let file_type = match item.file_type() {
                Ok(file_type) => file_type,
                Err(err) => {
                    // Message fixed: was "Failed to file type for".
                    error!(
                        "Failed to get file type for {}: {}",
                        item.path().display(), err
                    );
                    return Err(Failed)
                }
            };
            if file_type.is_dir() {
                let target = target.join(item.file_name());
                if let Err(err) = fs::create_dir_all(&target) {
                    error!(
                        "Failed to create directory {}: {}",
                        target.display(), err
                    );
                    return Err(Failed);
                }
                self.dump_dir(&item.path(), &target)?;
            }
            else if file_type.is_file() {
                let target = target.join(item.file_name());
                if let Err(err) = fs::copy(&item.path(), &target) {
                    error!(
                        "Failed to copy {} to {}: {}",
                        item.path().display(), target.display(), err
                    );
                    return Err(Failed)
                }
            }
        }
        Ok(())
    }
}
/// A single run on top of the rsync collector.
#[derive(Debug)]
pub struct Run<'a> {
    /// A reference back to the collector we run on.
    collector: &'a Collector,

    /// The modules that have already been updated during this run.
    updated: RwLock<HashSet<OwnedModule>>,

    /// Per-module mutexes for modules currently being updated.
    ///
    /// Ensures only one thread actually runs rsync for a given module
    /// (see `load_module`).
    running: RwLock<HashMap<OwnedModule, Arc<Mutex<()>>>>,

    /// The metrics collected for each module update attempt.
    metrics: Mutex<Vec<RsyncModuleMetrics>>,
}
impl<'a> Run<'a> {
    /// Creates a new run for the given collector.
    fn new(collector: &'a Collector) -> Self {
        Run {
            collector,
            updated: Default::default(),
            running: Default::default(),
            metrics: Default::default(),
        }
    }

    /// Returns whether the module of `uri` was updated during this run.
    pub fn was_updated(&self, uri: &uri::Rsync) -> bool {
        self.updated.read().unwrap().contains(Module::from_uri(uri).as_ref())
    }

    /// Updates the module of `uri` if it hasn't been updated yet.
    ///
    /// Safe to call from multiple threads: a per-module mutex ensures
    /// rsync runs at most once per module and run. Does nothing if the
    /// collector has no command.
    pub fn load_module(&self, uri: &uri::Rsync) {
        // No command means updating is disabled for this collector.
        let command = match self.collector.command.as_ref() {
            Some(command) => command,
            None => return,
        };
        let module = Module::from_uri(uri);
        // Fast path: module already updated by someone else.
        if self.updated.read().unwrap().contains(module.as_ref()) {
            return
        }
        // Get (or create) the per-module mutex inside its own scope so
        // the write lock on `running` is dropped before we block on the
        // mutex itself.
        let mutex = {
            self.running.write().unwrap()
                .entry(module.clone().into_owned()).or_default()
                .clone()
        };
        let _lock = mutex.lock().unwrap();
        // Re-check after acquiring the mutex: another thread may have
        // completed the update while we were waiting.
        if self.updated.read().unwrap().contains(module.as_ref()) {
            return
        }
        if self.collector.filter_dubious && uri.has_dubious_authority() {
            warn!(
                "{}: Dubious host name. Skipping update.",
                module
            )
        }
        else {
            let metrics = command.update(
                module.as_ref(),
                &self.collector.working_dir.module_path(module.as_ref())
            );
            self.metrics.lock().unwrap().push(metrics);
        }
        // Mark the module done -- also for skipped dubious modules, so
        // we don't retry (and re-warn) within the same run.
        self.running.write().unwrap().remove(module.as_ref());
        self.updated.write().unwrap().insert(module.into_owned());
    }

    /// Loads the content of the file for `uri` from the local copy.
    ///
    /// Returns `None` after logging if the file is missing or can't be
    /// read. A missing file is only logged at info level since it may
    /// legitimately not exist in the repository.
    pub fn load_file(
        &self,
        uri: &uri::Rsync,
    ) -> Option<Bytes> {
        let path = self.collector.working_dir.uri_path(uri);
        match fs::File::open(&path) {
            Ok(mut file) => {
                let mut data = Vec::new();
                if let Err(err) = io::Read::read_to_end(&mut file, &mut data) {
                    error!(
                        "Failed to read file '{}': {}",
                        path.display(),
                        err
                    );
                    None
                }
                else {
                    Some(data.into())
                }
            }
            Err(err) => {
                if err.kind() == io::ErrorKind::NotFound {
                    info!("{}: not found in local repository", uri);
                } else {
                    error!(
                        "Failed to open file '{}': {}",
                        path.display(), err
                    );
                }
                None
            }
        }
    }

    /// Cleans up the working directory after a run.
    ///
    /// First adds every module updated during this run to `retain`, then
    /// removes everything under the working directory whose authority or
    /// module is not in `retain`. Does nothing if updating is disabled.
    pub fn cleanup(&self, retain: &mut ModuleSet) -> Result<(), Failed> {
        if self.collector.command.is_none() {
            return Ok(())
        }
        for module in self.updated.read().unwrap().iter() {
            retain.add_from_uri(&module.to_uri());
        }
        for entry in fatal::read_dir(
            &self.collector.working_dir.base
        )? {
            let entry = entry?;
            // Entries with non-UTF-8 names can't match any authority and
            // are therefore dropped.
            let keep = match entry.file_name().to_str() {
                Some(name) => {
                    match retain.authorities.get(name) {
                        Some(modules) => self.cleanup_host(&entry, modules)?,
                        None => false,
                    }
                }
                None => false
            };
            if !keep {
                fatal::remove_dir_all(entry.path())?;
            }
        }
        Ok(())
    }

    /// Cleans up a single authority directory.
    ///
    /// Removes all module directories not listed in `retain`. Returns
    /// whether at least one module was kept, i.e., whether the authority
    /// directory itself should be kept.
    fn cleanup_host(
        &self, entry: &fatal::DirEntry, retain: &HashSet<String>
    ) -> Result<bool, Failed> {
        // A stray file where a directory should be is removed right here.
        // NOTE(review): the caller will still call `remove_dir_all` on
        // this path afterwards -- assumes `fatal::remove_dir_all`
        // tolerates a missing path; verify.
        if entry.is_file() {
            fatal::remove_file(entry.path())?;
            return Ok(false)
        }
        else if !entry.is_dir() {
            return Ok(false)
        }
        let mut keep_host = false;
        for entry in fatal::read_dir(entry.path())? {
            let entry = entry?;
            let keep = match entry.file_name().to_str() {
                Some(name) => retain.contains(name),
                None => false
            };
            if !keep {
                fatal::remove_dir_all(entry.path())?;
            }
            else {
                keep_host = true;
            }
        }
        Ok(keep_host)
    }

    /// Finishes the run, moving the collected metrics into `metrics`.
    pub fn done(self, metrics: &mut Metrics) {
        metrics.rsync = self.metrics.into_inner().unwrap();
    }
}
/// The rsync command to run for updating modules.
#[derive(Debug)]
struct RsyncCommand {
    /// The rsync binary to invoke.
    command: String,

    /// Additional arguments, placed before the fixed ones on the
    /// command line.
    args: Vec<String>,

    /// The maximum time a single rsync process is allowed to run.
    timeout: Duration,
}
impl RsyncCommand {
    /// Creates a new rsync command from the configuration.
    ///
    /// Runs `rsync -h` once to verify the binary works and, unless
    /// explicit arguments are configured, to detect whether the
    /// `--contimeout` option is supported.
    pub fn new(config: &Config) -> Result<Self, Failed> {
        let command = config.rsync_command.clone();
        let output = match StdCommand::new(&command).arg("-h").output() {
            Ok(output) => output,
            Err(err) => {
                error!(
                    "Failed to run rsync: {}",
                    err
                );
                return Err(Failed)
            }
        };
        if !output.status.success() {
            error!(
                "Running rsync failed with output: \n{}",
                String::from_utf8_lossy(&output.stderr)
            );
            return Err(Failed);
        }
        let args = match config.rsync_args {
            Some(ref args) => args.clone(),
            None => {
                let mut args = Vec::new();
                // Scan the help output for the literal "--contimeout"
                // (12 bytes, hence `windows(12)`).
                let has_contimeout =
                    output.stdout.windows(12)
                    .any(|window| window == b"--contimeout");
                if has_contimeout {
                    args.push("--contimeout=10".into());
                }
                if let Some(max_size) = config.max_object_size {
                    args.push(format!("--max-size={}", max_size));
                }
                args
            }
        };
        Ok(RsyncCommand {
            command,
            args,
            timeout: config.rsync_timeout,
        })
    }

    /// Updates the module at `source` into the directory `destination`.
    ///
    /// Returns metrics describing the attempt: the module URI, the exit
    /// status (or error) of the rsync process, and the wall-clock
    /// duration.
    pub fn update(
        &self,
        source: &Module,
        destination: &Path
    ) -> RsyncModuleMetrics {
        let start = SystemTime::now();
        let status = self.command(
            source, destination
        ).and_then(|cmd| self.run(source, cmd));
        RsyncModuleMetrics {
            module: source.to_uri(),
            status,
            duration: SystemTime::now().duration_since(start),
        }
    }

    /// Runs the prepared command and returns the process exit status.
    ///
    /// Spins up a small current-thread Tokio runtime so the process,
    /// both output pipes, and the timeout can be driven concurrently.
    fn run(
        &self,
        source: &Module,
        mut command: AsyncCommand
    ) -> Result<ExitStatus, io::Error> {
        let runtime = tokio::runtime::Builder::new_current_thread()
            .enable_io()
            .enable_time()
            .build()?;
        runtime.block_on(async {
            command.stdout(Stdio::piped());
            command.stderr(Stdio::piped());
            // Make sure the child is killed if we bail out early, e.g.,
            // when the timeout fires.
            command.kill_on_drop(true);
            let mut child = command.spawn()?;
            let stdout = child.stdout.take();
            let stderr = child.stderr.take();
            // Wait for the process (bounded by the timeout) while
            // concurrently draining stdout and stderr so the child can't
            // stall on a full pipe. The timeout error is converted into
            // an `io::Error` so all three futures share an error type.
            let (status, stdout, stderr) = tokio::try_join!(
                tokio::time::timeout(
                    self.timeout, child.wait()
                ).map_err(|_| {
                    warn!("{}: timed out.", source);
                    io::Error::new(
                        io::ErrorKind::TimedOut,
                        "rsync process reached time out"
                    )
                }),
                async {
                    let mut target = Vec::new();
                    if let Some(mut stdout) = stdout {
                        tokio::io::copy(&mut stdout, &mut target).await?;
                    }
                    Ok(target)
                },
                async {
                    let mut target = Vec::new();
                    if let Some(mut stderr) = stderr {
                        tokio::io::copy(&mut stderr, &mut target).await?;
                    }
                    Ok(target)
                },
            )?;
            // Relay the child's captured output to our own log, line by
            // line: stderr as warnings, stdout as info.
            if !stderr.is_empty() {
                String::from_utf8_lossy(&stderr).lines().for_each(|l| {
                    warn!("{}: {}", source, l);
                })
            }
            if !stdout.is_empty() {
                String::from_utf8_lossy(&stdout).lines().for_each(|l| {
                    info!("{}: {}", source, l);
                })
            }
            if let Err(ref err) = status {
                warn!("{}: {}", source, err);
            }
            status
        })
    }

    /// Assembles the rsync invocation for updating `source` into
    /// `destination`, creating the destination directory on the way.
    fn command(
        &self,
        source: &Module,
        destination: &Path
    ) -> Result<AsyncCommand, io::Error> {
        info!("rsyncing from {}.", source);
        fs::create_dir_all(destination)?;
        let destination = match Self::format_destination(destination) {
            Ok(some) => some,
            Err(_) => {
                error!(
                    "rsync: illegal destination path {}.",
                    destination.display()
                );
                return Err(io::Error::new(
                    io::ErrorKind::Other,
                    "illegal destination path"
                ));
            }
        };
        let mut cmd = AsyncCommand::new(&self.command);
        // Configured extra arguments go first ...
        for item in &self.args {
            cmd.arg(item);
        }
        // ... followed by the fixed set: recursive, links, times,
        // compression, and deletion of files gone upstream.
        cmd.arg("-rltz")
            .arg("--delete")
            .arg(source.to_string())
            .arg(destination);
        debug!("{}: Running command {:?}", source, cmd);
        Ok(cmd)
    }

    /// Formats the destination path for the command line (non-Windows).
    ///
    /// Just ensures a trailing slash so rsync treats the destination as
    /// a directory.
    #[cfg(not(windows))]
    #[allow(clippy::unnecessary_wraps)]
    fn format_destination(path: &Path) -> Result<String, Failed> {
        let mut destination = format!("{}", path.display());
        if !destination.ends_with('/') {
            destination.push('/')
        }
        Ok(destination)
    }

    /// Formats the destination path for the command line (Windows).
    ///
    /// Rewrites the path into a forward-slash form: drive letters become
    /// `/cygdrive/<drive>/` (presumably for a Cygwin-based rsync --
    /// TODO confirm) and UNC prefixes become `server/share`. Fails on
    /// prefixes or components that can't be represented in UTF-8.
    #[cfg(windows)]
    fn format_destination(path: &Path) -> Result<String, Failed> {
        use std::path::{Component, Prefix};
        let mut destination = String::new();
        for component in path.components() {
            match component {
                Component::Prefix(prefix) => {
                    match prefix.kind() {
                        Prefix::UNC(server, share) => {
                            let (server, share) = match (server.to_str(),
                                share.to_str()) {
                                (Some(srv), Some(shr)) => (srv, shr),
                                _ => return Err(Failed)
                            };
                            destination.push_str(server);
                            destination.push('/');
                            destination.push_str(share);
                        }
                        Prefix::Disk(disk) => {
                            // Drive letters must be ASCII; lowercase them
                            // for the cygdrive-style path.
                            let disk = if disk.is_ascii() {
                                (disk as char).to_ascii_lowercase()
                            }
                            else {
                                return Err(Failed)
                            };
                            destination.push_str("/cygdrive/");
                            destination.push(disk);
                        }
                        _ => return Err(Failed)
                    }
                }
                Component::CurDir | Component::RootDir => {
                    // `continue` also skips the trailing slash push below.
                    continue
                }
                Component::ParentDir => {
                    destination.push_str("..");
                }
                Component::Normal(s) => {
                    match s.to_str() {
                        Some(s) => destination.push_str(s),
                        None => return Err(Failed)
                    }
                }
            }
            destination.push('/');
        }
        Ok(destination)
    }
}
/// The working directory of the rsync collector.
#[derive(Clone, Debug)]
struct WorkingDir {
    /// The base path; modules live under `<authority>/<module>` below it
    /// (see `uri_path`).
    base: PathBuf
}
impl WorkingDir {
    /// Creates a working directory rooted at `base`.
    pub fn new(base: PathBuf) -> Self {
        Self { base }
    }

    /// Returns the path where the given module is stored locally.
    ///
    /// The module string starts with the `rsync://` scheme prefix
    /// (eight bytes), so everything after it -- authority and module
    /// name -- becomes the relative path under the base directory.
    pub fn module_path(&self, module: &Module) -> PathBuf {
        self.base.join(&module.0[8..])
    }

    /// Returns the local path the given rsync URI maps to.
    fn uri_path(&self, uri: &uri::Rsync) -> PathBuf {
        let mut path = self.base.join(uri.canonical_authority().as_ref());
        path.push(uri.module_name());
        path.push(uri.path());
        path
    }
}
/// An unsized wrapper around the string form of an rsync module URI.
///
/// `#[repr(transparent)]` guarantees this type has exactly the layout of
/// its single `str` field. The unsafe `&str` -> `&Module` reference casts
/// in `Module::from_str` (and `AsRef<Module> for OwnedModule`) depend on
/// that layout; without the attribute it is merely likely, not
/// guaranteed.
#[derive(Debug, Eq, Hash, PartialEq)]
#[repr(transparent)]
pub struct Module(str);
impl Module {
    /// Reinterprets a string slice as a module reference.
    ///
    /// # Safety
    ///
    /// Relies on `Module` being a newtype with `str` as its only field so
    /// that `&str` and `&Module` share the same layout.
    unsafe fn from_str(s: &str) -> &Module {
        // SAFETY: see above -- this is a layout-compatible pointer cast.
        &*(s as *const str as *const Module)
    }

    /// Returns the module of the given URI.
    ///
    /// Borrows from the URI when its module portion is already canonical,
    /// otherwise returns an owned canonicalized copy.
    pub fn from_uri(uri: &uri::Rsync) -> Cow<Module> {
        match uri.canonical_module() {
            Cow::Borrowed(s) => {
                Cow::Borrowed(unsafe { Module::from_str(s) })
            }
            Cow::Owned(s) => Cow::Owned(OwnedModule(s))
        }
    }

    /// Converts the module back into an rsync URI.
    ///
    /// The `unwrap` is fine by construction: the stored string came from
    /// a valid `uri::Rsync` in `from_uri`.
    pub fn to_uri(&self) -> uri::Rsync {
        uri::Rsync::from_str(&self.0).unwrap()
    }
}
impl ToOwned for Module {
    type Owned = OwnedModule;

    /// Copies the underlying string into an owned module.
    fn to_owned(&self) -> Self::Owned {
        OwnedModule(String::from(&self.0))
    }
}
impl fmt::Display for Module {
    /// Formats the module as its underlying URI string, delegating to
    /// `str`'s `Display` so width/precision flags keep working.
    fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
        fmt::Display::fmt(&self.0, f)
    }
}
/// The owned companion of [`Module`], wrapping the URI string.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct OwnedModule(String);
impl ops::Deref for OwnedModule {
    type Target = Module;

    // Deref to `Module` so its methods are directly available on the
    // owned type.
    fn deref(&self) -> &Module {
        self.as_ref()
    }
}
impl AsRef<Module> for OwnedModule {
    fn as_ref(&self) -> &Module {
        // SAFETY: relies on `Module` being a newtype with `str` as its
        // only field so that `&str` and `&Module` share the same layout;
        // see `Module::from_str`.
        unsafe { Module::from_str(self.0.as_str()) }
    }
}
impl Borrow<Module> for OwnedModule {
    /// Borrows the owned module as a [`Module`].
    ///
    /// The `Borrow` contract (equal `Hash` and `Eq` between owned and
    /// borrowed forms) holds because both types derive those traits on
    /// the same underlying string data. This is what allows `Module`
    /// lookups in the `HashSet<OwnedModule>` / `HashMap<OwnedModule, _>`
    /// used by `Run`.
    fn borrow(&self) -> &Module {
        self.as_ref()
    }
}
impl fmt::Display for OwnedModule {
    /// Formats the module as its underlying URI string, delegating to
    /// `str`'s `Display` so width/precision flags keep working.
    fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
        fmt::Display::fmt(self.0.as_str(), f)
    }
}
/// A set of rsync modules, grouped by authority.
#[derive(Clone, Debug, Default)]
pub struct ModuleSet {
    /// Maps an authority (host) to the set of its module names.
    authorities: HashMap<String, HashSet<String>>,
}
impl ModuleSet {
    /// Adds the module of the given URI to the set.
    ///
    /// Returns whether the module was newly added.
    pub fn add_from_uri(&mut self, uri: &uri::Rsync) -> bool {
        self.with_authority(uri, |modules| {
            let name = uri.module_name();
            // `HashSet::insert` returns `true` on a fresh insert;
            // checking `contains` first avoids allocating a `String`
            // for names already present.
            !modules.contains(name) && modules.insert(name.to_string())
        })
    }

    /// Runs `op` on the module set of the URI's authority, creating an
    /// empty set first if necessary.
    fn with_authority<F: FnOnce(&mut HashSet<String>) -> R, R>(
        &mut self, uri: &uri::Rsync, op: F,
    ) -> R {
        let authority = uri.canonical_authority();
        // Fast path: a borrowed authority that is already a key lets us
        // skip allocating an owned `String`. The early return keeps the
        // borrow checker happy about the two mutable accesses.
        if let Cow::Borrowed(key) = authority {
            if let Some(modules) = self.authorities.get_mut(key) {
                return op(modules)
            }
        }
        op(self.authorities.entry(authority.into_owned()).or_default())
    }
}