use std::borrow::Cow;
use std::collections::hash_map::{Entry, HashMap};
use std::error;
use std::ffi::OsStr;
use std::fmt::{self, Debug, Display};
use std::fs::{self, File};
use std::io::{self, Write};
use std::path::{Path, PathBuf};
use crates::chrono::Utc;
use crates::inotify::EventMask;
use crates::itertools::Itertools;
use crates::rand::{self, Rng};
use crates::serde_json::{self, Map, Value};
use crates::thiserror::Error;
use handler::{Handler, HandlerResult};
use watcher::Watcher;
/// Dispatches JSON job files to registered handlers and files each processed job into one of
/// three outbox directories (`accept`, `reject` or `fail`).
///
/// The lifetime `'a` is the lifetime of the borrowed handlers registered with the director.
pub struct Director<'a> {
/// Directory that receives jobs tagged with `Outbox::Accept`.
accept: PathBuf,
/// Directory that receives jobs tagged with `Outbox::Reject`.
reject: PathBuf,
/// Directory that receives jobs tagged with `Outbox::Fail`.
fail: PathBuf,
/// Registered handlers, keyed by the job `kind` string they were added under.
handlers: HashMap<String, &'a dyn Handler>,
}
/// Return the final component of `path` for use in log messages.
///
/// The name is converted lossily, so invalid UTF-8 never causes a failure. A path without a
/// final component (for example `/` or a path ending in `..`) is rendered as `"."`.
fn log_file_name<P>(path: &P) -> Cow<str>
where
    P: AsRef<Path>,
{
    match path.as_ref().file_name() {
        Some(name) => name.to_string_lossy(),
        None => Cow::Borrowed("."),
    }
}
impl<'a> Debug for Director<'a> {
    /// Format the director for debugging.
    ///
    /// The handlers themselves are not printed; only the kinds they are registered under
    /// are listed.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let kinds: Vec<_> = self.handlers.keys().collect();

        let mut out = f.debug_struct("Director");
        out.field("accept", &self.accept);
        out.field("reject", &self.reject);
        out.field("fail", &self.fail);
        out.field("handlers", &kinds);
        out.finish()
    }
}
/// Outcome of processing one or more job files.
///
/// The variants are declared from least to most severe. `Director::run` relies on the derived
/// `Ord` to keep the most severe result seen in a batch, so the declaration order is
/// significant and must not be rearranged.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum RunResult {
    /// Keep processing jobs.
    Continue,
    /// Stop processing because a handler returned `HandlerResult::Restart`.
    Restart,
    /// Stop processing because a handler returned `HandlerResult::Done` or an error occurred
    /// while handling a job.
    Done,
}

impl RunResult {
    /// Whether the processing loop should stop; true for every variant except `Continue`.
    fn should_exit(self) -> bool {
        self != RunResult::Continue
    }

    /// Whether this result is `Done`.
    fn is_done(self) -> bool {
        self == RunResult::Done
    }
}
/// The outbox a processed job is filed into.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Outbox {
    /// Outbox for accepted jobs.
    Accept,
    /// Outbox for rejected jobs.
    Reject,
    /// Outbox for failed jobs.
    Fail,
}

impl Outbox {
    /// The lowercase name of the outbox, as used in messages.
    pub fn name(self) -> &'static str {
        match self {
            Outbox::Accept => "accept",
            Outbox::Reject => "reject",
            Outbox::Fail => "fail",
        }
    }
}

impl Display for Outbox {
    /// Write the outbox name verbatim; width and fill flags on the formatter are not applied.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str(self.name())
    }
}
/// Errors produced while setting up a director or while processing jobs.
///
/// Most variants carry the underlying error as `source` and also embed it in their message.
#[derive(Debug, Error)]
pub enum DirectorError {
/// The root path given to `Director::new` is not a directory.
#[error("not a directory: {}", path.display())]
NotADirectory {
/// The offending path.
path: PathBuf,
},
/// A handler is already registered for this kind.
#[error("duplicate handler: {}", kind)]
DuplicateHandler {
/// The kind that was registered twice.
kind: String,
},
/// An outbox directory could not be created.
#[error("failed to create {} directory: {}", outbox, source)]
CreateDirectory {
/// The outbox whose directory could not be created.
outbox: Outbox,
/// The underlying I/O error.
#[source]
source: io::Error,
},
/// A companion file (such as a `.stamp` or `.reason` file) could not be created.
#[error("failed to create file {}: {}", filename.display(), source)]
CreateFile {
/// File name (not the full path) of the companion file.
filename: PathBuf,
/// The underlying I/O error.
#[source]
source: io::Error,
},
/// A companion file (such as a `.stamp` or `.reason` file) could not be written.
#[error("failed to write file {}: {}", filename.display(), source)]
WriteFile {
/// File name (not the full path) of the companion file.
filename: PathBuf,
/// The underlying I/O error.
#[source]
source: io::Error,
},
/// A job file could not be renamed into its outbox.
#[error("failed to move job {} into {}: {}", filepath.display(), outbox, source)]
MoveJob {
/// The outbox the job was being moved into.
outbox: Outbox,
/// Path of the job file that could not be moved.
filepath: PathBuf,
/// The underlying I/O error.
#[source]
source: io::Error,
},
/// The queue directory could not be listed.
#[error("failed to list the queue {}: {}", path.display(), source)]
ListQueue {
/// Path of the queue directory.
path: PathBuf,
/// The underlying I/O error.
#[source]
source: io::Error,
},
/// The queue directory could not be watched, or its events could not be read.
#[error("failed to watch the queue {}: {}", path.display(), source)]
WatchQueue {
/// Path of the queue directory.
path: PathBuf,
/// The underlying I/O error.
#[source]
source: io::Error,
},
/// A job file could not be opened for reading.
#[error("failed to open job {}: {}", filepath.display(), source)]
OpenJob {
/// Path of the job file.
filepath: PathBuf,
/// The underlying I/O error.
#[source]
source: io::Error,
},
/// The replacement job file for a deferred job could not be created.
#[error("failed to defer job {}: {}", filepath.display(), source)]
CreateDeferJob {
/// Path of the replacement job file.
filepath: PathBuf,
/// The underlying I/O error.
#[source]
source: io::Error,
},
/// The replacement job file for a deferred job could not be serialized.
///
/// Note that the message is identical to the one for `CreateDeferJob`; only the source error
/// distinguishes the two.
#[error("failed to defer job {}: {}", filepath.display(), source)]
WriteDeferJob {
/// Path of the replacement job file.
filepath: PathBuf,
/// The underlying serialization error.
#[source]
source: serde_json::Error,
},
/// A handler returned an error while processing a job.
#[error("handler error: {}", source)]
Handler {
/// The error returned by the handler.
#[source]
source: Box<dyn error::Error + Send + Sync>,
},
// NOTE(review): hidden variant that is never constructed in this file; presumably it exists
// to discourage exhaustive matching by downstream code — confirm before removing.
#[doc(hidden)]
#[error("unreachable...")]
_NonExhaustive,
}
/// Private constructors, one per variant, so call sites can be used directly in `map_err`.
impl DirectorError {
    /// Build a `NotADirectory` error for `path`.
    fn not_a_directory(path: &Path) -> Self {
        DirectorError::NotADirectory { path: path.to_path_buf() }
    }

    /// Build a `DuplicateHandler` error for `kind`.
    fn duplicate_handler(kind: &str) -> Self {
        DirectorError::DuplicateHandler { kind: kind.to_owned() }
    }

    /// Build a `CreateDirectory` error for the given outbox.
    fn create_directory(outbox: Outbox, source: io::Error) -> Self {
        DirectorError::CreateDirectory { outbox, source }
    }

    /// Take the file name of `path` and swap its extension for `ext`.
    ///
    /// Falls back to `"."` when `path` has no file name.
    fn file_with_ext(path: &Path, ext: &'static str) -> PathBuf {
        match path.file_name() {
            Some(name) => Path::new(name).with_extension(ext),
            None => PathBuf::from("."),
        }
    }

    /// Build a `CreateFile` error for the companion file of `filepath` with extension `ext`.
    fn create_file(filepath: &Path, ext: &'static str, source: io::Error) -> Self {
        let filename = Self::file_with_ext(filepath, ext);
        DirectorError::CreateFile { filename, source }
    }

    /// Build a `WriteFile` error for the companion file of `filepath` with extension `ext`.
    fn write_file(filepath: &Path, ext: &'static str, source: io::Error) -> Self {
        let filename = Self::file_with_ext(filepath, ext);
        DirectorError::WriteFile { filename, source }
    }

    /// Build a `MoveJob` error for moving `filepath` into `outbox`.
    fn move_job(filepath: PathBuf, outbox: Outbox, source: io::Error) -> Self {
        DirectorError::MoveJob { outbox, filepath, source }
    }

    /// Build a `ListQueue` error for the queue at `path`.
    fn list_queue(path: PathBuf, source: io::Error) -> Self {
        DirectorError::ListQueue { path, source }
    }

    /// Build a `WatchQueue` error for the queue at `path`.
    fn watch_queue(path: PathBuf, source: io::Error) -> Self {
        DirectorError::WatchQueue { path, source }
    }

    /// Build an `OpenJob` error for the job at `filepath`.
    fn open_job(filepath: PathBuf, source: io::Error) -> Self {
        DirectorError::OpenJob { filepath, source }
    }

    /// Build a `CreateDeferJob` error for the replacement job at `filepath`.
    fn create_defer_job(filepath: PathBuf, source: io::Error) -> Self {
        DirectorError::CreateDeferJob { filepath, source }
    }

    /// Build a `WriteDeferJob` error for the replacement job at `filepath`.
    fn write_defer_job(filepath: PathBuf, source: serde_json::Error) -> Self {
        DirectorError::WriteDeferJob { filepath, source }
    }

    /// Wrap an error returned by a handler.
    fn handler(source: Box<dyn error::Error + Send + Sync>) -> Self {
        DirectorError::Handler { source }
    }
}
/// Shorthand for a `Result` whose error type is `DirectorError`.
type DirectorResult<T> = Result<T, DirectorError>;
impl<'a> Director<'a> {
pub fn new(root: &Path) -> DirectorResult<Self> {
if !root.is_dir() {
return Err(DirectorError::not_a_directory(root));
}
info!(target: "director", "setting up a director in {}", root.display());
let accept_dir = root.join("accept");
let reject_dir = root.join("reject");
let fail_dir = root.join("fail");
fs::create_dir_all(&accept_dir)
.map_err(|err| DirectorError::create_directory(Outbox::Accept, err))?;
fs::create_dir_all(&reject_dir)
.map_err(|err| DirectorError::create_directory(Outbox::Reject, err))?;
fs::create_dir_all(&fail_dir)
.map_err(|err| DirectorError::create_directory(Outbox::Fail, err))?;
Ok(Director {
accept: accept_dir,
reject: reject_dir,
fail: fail_dir,
handlers: HashMap::new(),
})
}
/// Register `handler` for jobs of the given `kind`.
///
/// # Errors
///
/// Returns `DirectorError::DuplicateHandler` if a handler is already registered for `kind`;
/// the existing handler is left in place.
pub fn add_handler<K>(&mut self, kind: K, handler: &'a dyn Handler) -> DirectorResult<()>
where
    K: Into<String>,
{
    let slot = match self.handlers.entry(kind.into()) {
        Entry::Vacant(slot) => slot,
        Entry::Occupied(taken) => {
            return Err(DirectorError::duplicate_handler(taken.key()));
        },
    };

    debug!(target: "director", "adding handler '{}'", slot.key());
    slot.insert(handler);
    Ok(())
}
/// Move `file` into the directory of `outbox` and return its new path.
///
/// Before the job is moved, a sibling file with the `stamp` extension is written into the
/// outbox containing the current UTC time.
///
/// # Errors
///
/// Returns `CreateFile`/`WriteFile` if the stamp file cannot be produced and `MoveJob` if the
/// rename fails.
///
/// # Panics
///
/// Panics if `file` has no file name.
fn tag(&self, outbox: Outbox, file: &Path) -> DirectorResult<PathBuf> {
    let outbox_dir = match outbox {
        Outbox::Accept => &self.accept,
        Outbox::Reject => &self.reject,
        Outbox::Fail => &self.fail,
    };
    let job_name = file
        .file_name()
        .expect("expected the input file to have a file name");
    let target_path = outbox_dir.join(job_name);

    let stamp_path = target_path.with_extension("stamp");
    let mut stamp_file = File::create(stamp_path)
        .map_err(|err| DirectorError::create_file(&target_path, "stamp", err))?;
    writeln!(stamp_file, "{}", Utc::now())
        .map_err(|err| DirectorError::write_file(&target_path, "stamp", err))?;

    fs::rename(file, &target_path)
        .map_err(|err| DirectorError::move_job(file.into(), outbox, err))?;

    Ok(target_path)
}
/// Move `file` into `outbox` and record `reason` in a sibling `reason` file.
///
/// The reason is only written if the job was moved successfully.
fn tag_with_reason<R>(&self, outbox: Outbox, file: &Path, reason: R) -> DirectorResult<()>
where
    R: fmt::Display,
{
    self.tag(outbox, file)
        .and_then(|moved| self.write_reason(&moved, reason))
}
/// Write `reason`, followed by a newline, to the file next to `target_file` that carries the
/// `reason` extension.
///
/// # Errors
///
/// Returns `CreateFile` if the reason file cannot be created and `WriteFile` if writing to it
/// fails.
fn write_reason<R>(&self, target_file: &Path, reason: R) -> DirectorResult<()>
where
    R: fmt::Display,
{
    let reason_path = target_file.with_extension("reason");
    let mut out = File::create(reason_path)
        .map_err(|err| DirectorError::create_file(target_file, "reason", err))?;

    writeln!(out, "{}", reason)
        .map_err(|err| DirectorError::write_file(target_file, "reason", err))
}
/// Process a single job file.
///
/// Any error raised while handling the file is logged and reported as `RunResult::Done`.
pub fn run_one<P>(&self, path: P) -> RunResult
where
    P: AsRef<Path>,
{
    self.dispatch(path.as_ref()).unwrap_or_else(|err| {
        error!("error when handling {}: {}", log_file_name(&path), err);
        RunResult::Done
    })
}
/// Process a batch of job files in iteration order.
///
/// Returns the most severe result produced by any file in the batch. Processing stops early
/// as soon as the accumulated result is `RunResult::Done`; a `RunResult::Restart` does not
/// interrupt the batch.
pub fn run<I, P>(&self, paths: I) -> RunResult
where
    I: IntoIterator<Item = P>,
    P: AsRef<Path>,
{
    let mut result = RunResult::Continue;
    for path in paths {
        // `RunResult` is ordered by severity, so the maximum is the result to report.
        result = result.max(self.run_one(path));
        if result.is_done() {
            break;
        }
    }
    result
}
/// Process the jobs already present in `path`, then keep processing jobs reported by the
/// watcher until a result other than `RunResult::Continue` is produced.
///
/// Directories are skipped, directory entries that cannot be read are ignored, and each batch
/// is handled in sorted path order.
///
/// # Errors
///
/// Returns `ListQueue` if the directory cannot be listed and `WatchQueue` if the watcher
/// cannot be created or its events cannot be read.
pub fn watch_directory(&self, path: &Path) -> DirectorResult<RunResult> {
    let listing = path
        .read_dir()
        .map_err(|err| DirectorError::list_queue(path.into(), err))?;
    let mut watcher =
        Watcher::new(path).map_err(|err| DirectorError::watch_queue(path.into(), err))?;

    // Work through the existing queue contents before waiting for events.
    let backlog = listing
        .filter_map(|entry| entry.ok())
        .map(|entry| entry.path())
        .filter(|entry_path| !entry_path.is_dir())
        .sorted();
    let backlog_result = self.run(backlog);
    if backlog_result.should_exit() {
        return Ok(backlog_result);
    }

    loop {
        let events = watcher
            .events()
            .map_err(|err| DirectorError::watch_queue(path.into(), err))?;
        let arrivals = events
            .filter_map(|event| {
                if event.mask.contains(EventMask::ISDIR) {
                    return None;
                }
                event.name.map(|name| path.join(name))
            })
            .sorted();

        let result = self.run(arrivals);
        if result.should_exit() {
            return Ok(result);
        }
    }
}
/// Process one job file and file it into the outbox matching the handler's verdict.
///
/// The file is left untouched, and `RunResult::Continue` is returned, when it does not have a
/// `json` extension, when it cannot be parsed as JSON, or when its payload is not a JSON
/// object.
///
/// Otherwise the payload is passed to `handle` and the verdict is applied:
///
/// * `Accept`: the job is moved to the accept outbox.
/// * `Defer`: a copy of the job, with the reason recorded under its `retry` object, is written
///   next to the original under a timestamped random name, and the original is moved to the
///   reject outbox with the reason.
/// * `Reject` / `Fail`: the job is moved to the reject / fail outbox with the reason.
/// * `Restart` / `Done`: the job is moved to the accept outbox and `RunResult::Restart` /
///   `RunResult::Done` is returned.
///
/// # Errors
///
/// Returns an error if the job cannot be opened, if the handler fails, if the deferred copy
/// cannot be created or written, or if the job cannot be tagged.
fn dispatch(&self, file: &Path) -> DirectorResult<RunResult> {
    trace!(target: "director", "handling file {}", log_file_name(&file));

    // Only `.json` files are jobs; everything else in the queue is ignored.
    if file.extension() != Some(OsStr::new("json")) {
        return Ok(RunResult::Continue);
    }

    let event_file =
        File::open(file).map_err(|err| DirectorError::open_job(file.into(), err))?;
    // `serde_json::from_reader` does not buffer its input, so reading straight from a `File`
    // is inefficient; buffer the reads.
    let reader = io::BufReader::new(event_file);
    let mut payload: Value = match serde_json::from_reader(reader) {
        Ok(payload) => payload,
        Err(err) => {
            info!("failed to read JSON from {}: {}", file.display(), err);
            return Ok(RunResult::Continue);
        },
    };
    if !payload.is_object() {
        return Ok(RunResult::Continue);
    }

    let ret = match self.handle(&payload)? {
        HandlerResult::Accept => {
            debug!("accepted {}", log_file_name(&file));
            self.tag(Outbox::Accept, file)?;
            RunResult::Continue
        },
        HandlerResult::Defer(ref reason) => {
            debug!("deferring {}: {}", log_file_name(&file), reason);
            // The random suffix keeps the names of deferred copies from colliding.
            let rndpart = rand::thread_rng()
                .sample_iter(&rand::distributions::Alphanumeric)
                .take(12)
                .collect::<String>();
            let defer_job_file =
                file.with_file_name(format!("{}-{}.json", Utc::now().to_rfc3339(), rndpart));
            // Record why the original job was deferred, keyed by the original job's path.
            Self::add_reason_to_object(&mut payload, file.to_string_lossy(), reason);
            let mut defer_file = File::create(&defer_job_file).map_err(|err| {
                DirectorError::create_defer_job(defer_job_file.clone(), err)
            })?;
            serde_json::to_writer(&mut defer_file, &payload)
                .map_err(|err| DirectorError::write_defer_job(defer_job_file, err))?;
            self.tag_with_reason(Outbox::Reject, file, reason)?;
            RunResult::Continue
        },
        HandlerResult::Reject(ref reason) => {
            debug!("rejecting {}: {}", file.display(), reason);
            self.tag_with_reason(Outbox::Reject, file, reason)?;
            RunResult::Continue
        },
        HandlerResult::Fail(ref reason) => {
            debug!("failed {}: {:?}", file.display(), reason);
            self.tag_with_reason(Outbox::Fail, file, format!("{:?}", reason))?;
            RunResult::Continue
        },
        HandlerResult::Restart => {
            info!(target: "director", "restarting via {}", log_file_name(&file));
            self.tag(Outbox::Accept, file)?;
            RunResult::Restart
        },
        HandlerResult::Done => {
            info!(target: "director", "completed via {}", log_file_name(&file));
            self.tag(Outbox::Accept, file)?;
            RunResult::Done
        },
    };

    trace!(target: "director", "handled file {}", log_file_name(&file));
    Ok(ret)
}
/// Record `reason` under key `name` inside the `retry` object of `object`.
///
/// The `retry` member is created if it is missing and replaced with an empty object if it
/// holds anything other than an object. An existing entry for `name` is overwritten.
///
/// # Panics
///
/// Panics if `object` is not a JSON object.
fn add_reason_to_object<N, R>(object: &mut Value, name: N, reason: R)
where
    N: Into<String>,
    R: Into<String>,
{
    let root = object
        .as_object_mut()
        .expect("expected an object; internal logic failure");
    let retry = root
        .entry("retry")
        .or_insert_with(|| Value::Object(Map::new()));
    if !retry.is_object() {
        *retry = Value::Object(Map::new());
    }

    let reasons = retry
        .as_object_mut()
        .expect("expected an object; internal logic failure");
    reasons.insert(name.into(), Value::String(reason.into()));
}
/// Validate the job envelope in `payload` and pass it to the handler registered for its kind.
///
/// A rejection is returned, without calling any handler, when `kind` is missing or not a
/// string, when `data` is missing, when `retry` is present but not an object, when any retry
/// reason is not a string, or when no handler is registered for the kind.
///
/// # Errors
///
/// Returns `DirectorError::Handler` if the handler itself fails.
fn handle(&self, payload: &Value) -> DirectorResult<HandlerResult> {
    let kind = match payload.pointer("/kind") {
        Some(Value::String(kind)) => kind,
        Some(_) => return Ok(HandlerResult::reject("'kind' is not a string")),
        None => return Ok(HandlerResult::reject("no 'kind'")),
    };

    let data = if let Some(data) = payload.pointer("/data") {
        data
    } else {
        return Ok(HandlerResult::reject("no 'data'"));
    };

    let retry_reasons = match payload.pointer("/retry") {
        None => Vec::new(),
        Some(Value::Object(reasons)) => {
            let mut collected = Vec::with_capacity(reasons.len());
            for reason in reasons.values() {
                match reason.as_str() {
                    Some(text) => collected.push(text.into()),
                    None => {
                        return Ok(HandlerResult::reject("retry reason is not a string"));
                    },
                }
            }
            collected
        },
        Some(_) => return Ok(HandlerResult::reject("'retry' is not an object")),
    };

    let handler = match self.handlers.get(kind) {
        Some(handler) => handler,
        None => {
            return Ok(HandlerResult::Reject(format!(
                "no handler for kind '{}'",
                kind,
            )));
        },
    };

    handler
        .handle_retry(kind, data, retry_reasons)
        .map_err(DirectorError::handler)
}
}