use std::cmp::{min, max};
use std::collections::{HashMap, HashSet};
use std::fmt;
use std::net::SocketAddr;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use std::time::Instant;
use abstract_ns::Name;
use proto::{ReceivedImage, AbortedImage};
use cluster::addr::AddrCell;
use cluster::future::UploadOk;
use cluster::error::ErrorKind;
use cluster::config::Config;
use VPath;
use machine_id::MachineId;
#[derive(Debug)]
struct Bookkeeping {
accepted_ips: HashSet<SocketAddr>,
discovered_servers: HashMap<MachineId, String>,
done_ips: HashSet<SocketAddr>,
done_servers: HashMap<MachineId, String>,
aborted_ips: HashMap<SocketAddr, String>,
aborted_ids: HashMap<MachineId, String>,
aborted_hostnames: HashMap<String, String>,
rejected_no_config: HashSet<SocketAddr>,
rejected_ips: HashMap<SocketAddr, String>,
}
#[derive(Debug)]
pub struct Stats {
pub(crate) cluster_name: Vec<Name>,
pub(crate) started: Instant,
pub(crate) path: VPath,
weak_errors: bool,
book: RwLock<Bookkeeping>,
total_responses: AtomicUsize,
}
#[derive(Debug)]
pub struct ProgressOneLiner<'a>(&'a Stats);
#[derive(Debug)]
pub struct UploadName<'a>(pub(crate) &'a Stats);
impl Stats {
pub(crate) fn new(cluster_name: &Vec<Name>, path: &VPath, weak: bool)
-> Stats
{
Stats {
cluster_name: cluster_name.clone(),
started: Instant::now(),
path: path.clone(),
weak_errors: weak,
book: RwLock::new(Bookkeeping {
discovered_servers: HashMap::new(),
accepted_ips: HashSet::new(),
done_ips: HashSet::new(),
done_servers: HashMap::new(),
aborted_ips: HashMap::new(),
aborted_ids: HashMap::new(),
aborted_hostnames: HashMap::new(),
rejected_ips: HashMap::new(),
rejected_no_config: HashSet::new(),
}),
total_responses: AtomicUsize::new(0),
}
}
pub(crate) fn received_image(&self, addr: SocketAddr, info: &ReceivedImage)
{
let mut book = self.book.write()
.expect("bookkeeping is not poisoned");
if !info.forwarded {
book.done_ips.insert(addr);
}
book.done_servers.insert(
info.machine_id.clone(), info.hostname.clone());
book.discovered_servers.insert(
info.machine_id.clone(), info.hostname.clone());
}
pub(crate) fn aborted_image(&self, addr: SocketAddr, info: &AbortedImage) {
let mut book = self.book.write()
.expect("bookkeeping is not poisoned");
if !info.forwarded {
book.aborted_ips.insert(addr, info.reason.clone());
}
book.aborted_ids.insert(
info.machine_id.clone(), info.reason.clone());
book.aborted_hostnames.insert(
info.hostname.clone(), info.reason.clone());
}
pub(crate) fn add_response(&self, source: SocketAddr,
mut accepted: bool, reject_reason: Option<String>,
hosts: HashMap<MachineId, String>)
-> bool
{
let mut book = self.book.write()
.expect("bookkeeping is not poisoned");
match (accepted, reject_reason.as_ref().map(|x| &x[..])) {
(false, Some("already_exists"))
| (false, Some("already_uploading_different_version"))
if self.weak_errors => {
warn!("Rejected because of {:?}, but that's fine",
reject_reason);
let res = book.accepted_ips.insert(source);
if res {
self.total_responses.fetch_add(1, Ordering::Relaxed);
}
book.done_ips.insert(source);
accepted = true;
}
(false, Some("no_config")) => {
info!("Connection {} rejects directory", source);
if book.rejected_no_config.insert(source) {
self.total_responses.fetch_add(1, Ordering::Relaxed);
}
}
(false, _) => {
warn!("Rejected because of {:?} try {:?}",
reject_reason, hosts);
let res = book.rejected_ips.insert(source,
reject_reason.unwrap_or_else(|| String::from("unknown")));
if res.is_none() {
self.total_responses.fetch_add(1, Ordering::Relaxed);
}
}
(true, _) => {
debug!("Accepted from {}", source);
let res = book.accepted_ips.insert(source);
if res {
self.total_responses.fetch_add(1, Ordering::Relaxed);
}
}
}
for (id, val) in hosts {
book.discovered_servers.insert(id, val);
}
return accepted;
}
pub(crate) fn total_responses(&self) -> u32 {
self.total_responses.load(Ordering::Relaxed) as u32
}
pub(crate) fn is_rejected(&self, addr: SocketAddr) -> bool {
let book = self.book.read()
.expect("bookkeeping is not poisoned");
return book.rejected_ips.contains_key(&addr) ||
book.rejected_no_config.contains(&addr);
}
pub(crate) fn fmt_downloaded(&self, f: &mut fmt::Formatter) -> fmt::Result
{
let book = self.book.read()
.expect("bookkeeping is not poisoned");
f.write_str("fetched from ")?;
if book.done_servers.len() > 3 {
write!(f, "{} hosts", book.done_servers.len())?;
} else {
fmt_iter(f, book.done_servers.values())?;
}
if book.discovered_servers.len() > book.done_servers.len() {
write!(f, " (out of {} hosts)", book.discovered_servers.len())?;
}
if !book.rejected_ips.is_empty() {
f.write_str(", rejected by ")?;
fmt_iter(f,
book.rejected_ips.iter()
.map(|(k, v)| format!("{}: {}", k, v)))?;
}
if !book.aborted_ids.is_empty() {
f.write_str(", aborted by ")?;
fmt_iter(f,
book.aborted_hostnames.iter()
.map(|(k, v)| format!("{}: {}", k, v)))?;
}
Ok(())
}
pub fn one_line_progress(&self) -> ProgressOneLiner {
ProgressOneLiner(&self)
}
}
fn fmt_iter<I: IntoIterator>(f: &mut fmt::Formatter, iter: I) -> fmt::Result
where I::Item: fmt::Display,
{
let mut iter = iter.into_iter();
match iter.next() {
Some(x) => write!(f, "{}", x)?,
None => return Ok(()),
}
for item in iter {
write!(f, ", {}", item)?;
}
Ok(())
}
fn is_superset(done: &HashMap<MachineId, String>,
discovered: &HashMap<MachineId, String>)
-> bool
{
discovered.keys().all(|x| done.contains_key(x))
}
pub(in cluster) fn check(stats: &Arc<Stats>, config: &Config,
initial_addr: &AddrCell, early_timeout_timed_out: bool,
no_candidates: bool)
-> Option<Result<UploadOk, ErrorKind>>
{
let book = stats.book.read()
.expect("bookkeeping is not poisoned");
trace!("Current state {:?}{}", book,
if early_timeout_timed_out { "" } else { " early" });
if book.accepted_ips.is_empty() &&
initial_addr.is_done() && no_candidates &&
(!book.rejected_ips.is_empty() || !book.rejected_no_config.is_empty())
{
let all_rejected = initial_addr.get().addresses_at(0)
.all(|a| book.rejected_ips.contains_key(&a) ||
book.aborted_ips.contains_key(&a) ||
book.rejected_no_config.contains(&a));
if all_rejected {
return Some(Err(ErrorKind::Rejected));
}
}
if early_timeout_timed_out {
let fract_hosts = (
book.discovered_num() as f32 *
config.early_fraction).ceil() as usize;
let hosts = min(book.discovered_num() as usize,
max(config.early_hosts as usize, fract_hosts));
if hosts > 0 && book.done_servers.len() >= hosts {
if book.rejected_ips.len() > 0 || book.aborted_ids.len() > 0 {
return Some(Err(ErrorKind::Rejected));
} else {
return Some(Ok(UploadOk::new(stats)));
}
}
} else if
book.done_ips.is_superset(&book.accepted_ips) &&
is_superset(&book.done_servers, &book.discovered_servers)
{
if book.done_servers.len() > 0 {
if book.rejected_ips.len() > 0 || book.aborted_ips.len() > 0 {
return Some(Err(ErrorKind::Rejected));
} else {
return Some(Ok(UploadOk::new(stats)));
}
}
}
return None;
}
impl<'a> fmt::Display for UploadName<'a> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
if self.0.cluster_name.len() == 1 {
f.write_str(self.0.cluster_name[0].as_ref())?;
} else {
write!(f, "({} hosts)", self.0.cluster_name.len())?;
}
write!(f, ":{}", self.0.path)
}
}
impl<'a> fmt::Display for ProgressOneLiner<'a> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let book = self.0.book.read()
.expect("bookkeeping is not poisoned");
let done = book.done_servers.len() as u32;
let disc = book.discovered_num();
let percent = if disc > 0 {
(done as f32 / disc as f32) * 100.
} else {
0.
};
write!(f, "Progress of {}: {:.0}%", UploadName(self.0), percent)?;
if !book.rejected_ips.is_empty() {
f.write_str(", rejected by ")?;
fmt_iter(f,
book.rejected_ips.iter()
.map(|(k, v)| format!("{}: {}", k, v)))?;
}
if !book.aborted_ids.is_empty() {
f.write_str(", aborted by ")?;
fmt_iter(f,
book.aborted_hostnames.iter()
.map(|(k, v)| format!("{}: {}", k, v)))?;
}
if disc > done {
write!(f, ", {} servers left: ", disc - done)?;
let mut count = 0;
for (ref id, ref name) in &book.discovered_servers {
if !book.done_servers.contains_key(id) {
if count > 0 {
f.write_str(", ")?;
}
name.fmt(f)?;
count += 1;
if count >= 3 {
break;
}
}
}
if count < disc - done {
f.write_str("...")?;
}
}
Ok(())
}
}
impl Bookkeeping {
fn discovered_num(&self) -> u32 {
let explicit = self.discovered_servers.len() as u32;
if self.accepted_ips.len() == 1 &&
self.rejected_ips.is_empty() && self.rejected_no_config.is_empty()
{
return explicit + 1;
} else {
return explicit;
}
}
}