use std::borrow::Cow;
use std::collections::hash_map::{Entry, HashMap};
use std::ffi::OsStr;
use std::fmt::{self, Debug, Display};
use std::fs::{self, File};
use std::io::{self, Write};
use std::path::{Path, PathBuf};
use chrono::Utc;
use futures::{future, Stream, StreamExt};
use inotify::EventMask;
use itertools::Itertools;
use log::{debug, error, info, trace};
use rand::Rng;
use serde_json::{Map, Value};
use thiserror::Error;
use crate::handler::{Handler, JobError, JobResult};
use crate::watcher::Watcher;
/// Local extension trait that adds a `try_fold` combinator to plain streams.
mod stream_ext {
    use futures_core::future::{Future, TryFuture};
    use futures_core::stream::Stream;

    use crate::try_fold::TryFold;

    /// Additional combinators available on every [`Stream`].
    pub trait StreamExt2: Stream {
        /// Wraps this stream in a [`TryFold`] future seeded with `init`.
        ///
        /// `f` receives the current accumulator and the next stream item and
        /// returns a fallible future yielding the next accumulator. The
        /// resulting future resolves to `Result<T, Fut::Error>`.
        ///
        /// NOTE(review): presumably stops at the first error like
        /// `TryStreamExt::try_fold` does; confirm against `crate::try_fold`.
        fn try_fold<T, Fut, F>(self, init: T, f: F) -> TryFold<Self, Fut, T, F>
        where
            Self: Sized,
            F: FnMut(T, Self::Item) -> Fut,
            Fut: TryFuture<Ok = T>,
        {
            let fold = TryFold::new(self, f, init);
            // Compile-time check that `TryFold` has the expected output type.
            assert_future::<Result<T, Fut::Error>, _>(fold)
        }
    }

    impl<S> StreamExt2 for S where S: Stream + ?Sized {}

    /// Identity function used only to pin down a future's `Output` type.
    fn assert_future<T, F: Future<Output = T>>(future: F) -> F {
        future
    }
}
use stream_ext::StreamExt2;
/// Dispatches job files to registered handlers and sorts the processed files
/// into outbox directories.
pub struct Director<'a> {
/// Directory that receives jobs tagged with `Outbox::Accept`.
accept: PathBuf,
/// Directory that receives jobs tagged with `Outbox::Reject`.
reject: PathBuf,
/// Directory that receives jobs tagged with `Outbox::Fail`.
fail: PathBuf,
/// Registered handlers, keyed by the job `kind` they are responsible for.
handlers: HashMap<String, &'a dyn Handler>,
}
/// Returns the final component of `path` for use in log messages.
///
/// Paths without a final component (such as `""`, `"."`'s parent forms or
/// `".."`) are rendered as `"."`. Non-UTF-8 names are converted lossily.
fn log_file_name<P>(path: &P) -> Cow<str>
where
    P: AsRef<Path> + ?Sized,
{
    match path.as_ref().file_name() {
        Some(name) => name.to_string_lossy(),
        None => Cow::Borrowed("."),
    }
}
impl Debug for Director<'_> {
    /// Formats the outbox directories and the sorted list of handler kinds.
    ///
    /// Only the handler keys are printed; the handler objects themselves are
    /// not formatted.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Sort the kinds so the output does not depend on hash map ordering.
        let mut kinds: Vec<&String> = self.handlers.keys().collect();
        kinds.sort();

        let mut repr = f.debug_struct("Director");
        repr.field("accept", &self.accept);
        repr.field("reject", &self.reject);
        repr.field("fail", &self.fail);
        repr.field("handlers", &kinds);
        repr.finish()
    }
}
/// What the processing loop should do after a job has been handled.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum RunResult {
    /// Keep processing further jobs.
    Continue,
    /// Stop processing; a restart was requested.
    Restart,
    /// Stop processing; the work is finished.
    Done,
}

impl RunResult {
    /// Returns `true` for every result other than [`RunResult::Continue`].
    pub fn should_exit(self) -> bool {
        match self {
            Self::Continue => false,
            Self::Restart | Self::Done => true,
        }
    }

    /// Returns `true` only for [`RunResult::Done`].
    pub fn is_done(self) -> bool {
        self == Self::Done
    }
}
/// The directories a processed job file can be moved into.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Outbox {
    /// Jobs that were accepted.
    Accept,
    /// Jobs that were rejected.
    Reject,
    /// Jobs that failed.
    Fail,
}

impl Outbox {
    /// Returns the lowercase name of the outbox.
    pub fn name(self) -> &'static str {
        match self {
            Self::Accept => "accept",
            Self::Reject => "reject",
            Self::Fail => "fail",
        }
    }
}

impl fmt::Display for Outbox {
    /// Writes the same text as [`Outbox::name`].
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str(self.name())
    }
}
/// Errors which can occur while setting up or running a director.
///
/// Every variant that wraps an underlying error exposes it as the error
/// source and also includes it in its display message.
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum DirectorError {
/// The given root path is not a directory.
#[error("not a directory: {}", path.display())]
NotADirectory {
/// The path that was expected to be a directory.
path: PathBuf,
},
/// A handler was already registered for the given kind.
#[error("duplicate handler: {}", kind)]
DuplicateHandler {
/// The kind which already has a handler.
kind: String,
},
/// An outbox directory could not be created.
#[error("failed to create {} directory: {}", outbox, source)]
CreateDirectory {
/// The outbox whose directory could not be created.
outbox: Outbox,
/// The underlying I/O error.
#[source]
source: io::Error,
},
/// A side file for a job (e.g. a stamp or reason file) could not be created.
#[error("failed to create file {}: {}", filename.display(), source)]
CreateFile {
/// The name of the file that could not be created.
filename: PathBuf,
/// The underlying I/O error.
#[source]
source: io::Error,
},
/// A side file for a job (e.g. a stamp or reason file) could not be written.
#[error("failed to write file {}: {}", filename.display(), source)]
WriteFile {
/// The name of the file that could not be written.
filename: PathBuf,
/// The underlying I/O error.
#[source]
source: io::Error,
},
/// A job file could not be moved into its outbox.
#[error("failed to move job {} into {}: {}", filepath.display(), outbox, source)]
MoveJob {
/// The outbox the job was being moved into.
outbox: Outbox,
/// The path of the job file.
filepath: PathBuf,
/// The underlying I/O error.
#[source]
source: io::Error,
},
/// The queue directory could not be listed.
#[error("failed to list the queue {}: {}", path.display(), source)]
ListQueue {
/// The path of the queue directory.
path: PathBuf,
/// The underlying I/O error.
#[source]
source: io::Error,
},
/// The queue directory could not be watched.
#[error("failed to watch the queue {}: {}", path.display(), source)]
WatchQueue {
/// The path of the queue directory.
path: PathBuf,
/// The underlying I/O error.
#[source]
source: io::Error,
},
/// A job file could not be opened.
#[error("failed to open job {}: {}", filepath.display(), source)]
OpenJob {
/// The path of the job file.
filepath: PathBuf,
/// The underlying I/O error.
#[source]
source: io::Error,
},
/// The file for a deferred job could not be created.
#[error("failed to defer job {}: {}", filepath.display(), source)]
CreateDeferJob {
/// The path of the deferred job file.
filepath: PathBuf,
/// The underlying I/O error.
#[source]
source: io::Error,
},
/// The payload of a deferred job could not be serialized to its file.
#[error("failed to defer job {}: {}", filepath.display(), source)]
WriteDeferJob {
/// The path of the deferred job file.
filepath: PathBuf,
/// The underlying serialization error.
#[source]
source: serde_json::Error,
},
/// The tokio runtime could not be created.
#[error("tokio runtime creation error: {}", source)]
RuntimeError {
/// The underlying I/O error.
#[source]
source: std::io::Error,
},
/// A handler returned an error.
#[error("handler error: {}", source)]
Handler {
/// The error reported by the handler.
#[source]
source: JobError,
},
}
/// Private constructors, one per variant, used with `map_err` at call sites.
impl DirectorError {
    /// Builds a `NotADirectory` error for `path`.
    fn not_a_directory(path: &Path) -> Self {
        let path = path.to_path_buf();
        Self::NotADirectory { path }
    }

    /// Builds a `DuplicateHandler` error for `kind`.
    fn duplicate_handler(kind: &str) -> Self {
        let kind = kind.to_owned();
        Self::DuplicateHandler { kind }
    }

    /// Builds a `CreateDirectory` error for `outbox`.
    fn create_directory(outbox: Outbox, source: io::Error) -> Self {
        Self::CreateDirectory { outbox, source }
    }

    /// Returns the file name of `path` with its extension replaced by `ext`.
    ///
    /// Only the final path component is kept; paths without one yield `"."`.
    fn file_with_ext(path: &Path, ext: &'static str) -> PathBuf {
        match path.file_name() {
            Some(name) => Path::new(name).with_extension(ext),
            None => PathBuf::from("."),
        }
    }

    /// Builds a `CreateFile` error for the `ext` side file of `filepath`.
    fn create_file(filepath: &Path, ext: &'static str, source: io::Error) -> Self {
        let filename = Self::file_with_ext(filepath, ext);
        Self::CreateFile { filename, source }
    }

    /// Builds a `WriteFile` error for the `ext` side file of `filepath`.
    fn write_file(filepath: &Path, ext: &'static str, source: io::Error) -> Self {
        let filename = Self::file_with_ext(filepath, ext);
        Self::WriteFile { filename, source }
    }

    /// Builds a `MoveJob` error for moving `filepath` into `outbox`.
    fn move_job(filepath: PathBuf, outbox: Outbox, source: io::Error) -> Self {
        Self::MoveJob {
            outbox,
            filepath,
            source,
        }
    }

    /// Builds a `ListQueue` error for the queue at `path`.
    fn list_queue(path: PathBuf, source: io::Error) -> Self {
        Self::ListQueue { path, source }
    }

    /// Builds a `WatchQueue` error for the queue at `path`.
    fn watch_queue(path: PathBuf, source: io::Error) -> Self {
        Self::WatchQueue { path, source }
    }

    /// Builds an `OpenJob` error for the job at `filepath`.
    fn open_job(filepath: PathBuf, source: io::Error) -> Self {
        Self::OpenJob { filepath, source }
    }

    /// Builds a `CreateDeferJob` error for the deferred job at `filepath`.
    fn create_defer_job(filepath: PathBuf, source: io::Error) -> Self {
        Self::CreateDeferJob { filepath, source }
    }

    /// Builds a `WriteDeferJob` error for the deferred job at `filepath`.
    fn write_defer_job(filepath: PathBuf, source: serde_json::Error) -> Self {
        Self::WriteDeferJob { filepath, source }
    }

    /// Builds a `RuntimeError` from a runtime construction failure.
    fn runtime_error(source: std::io::Error) -> Self {
        Self::RuntimeError { source }
    }

    /// Builds a `Handler` error wrapping a handler's `JobError`.
    fn handler(source: JobError) -> Self {
        Self::Handler { source }
    }
}
/// Shorthand for results whose error type is `DirectorError`.
type DirectorResult<T> = Result<T, DirectorError>;
impl<'a> Director<'a> {
/// Creates a director rooted at `root`.
///
/// The `accept`, `reject` and `fail` directories are created below `root`
/// if they do not exist yet. No handlers are registered initially.
///
/// # Errors
///
/// Fails if `root` is not a directory or if one of the outbox directories
/// cannot be created.
pub fn new(root: &Path) -> DirectorResult<Self> {
    if !root.is_dir() {
        return Err(DirectorError::not_a_directory(root));
    }
    info!(target: "director", "setting up a director in {}", root.display());

    let accept = root.join("accept");
    let reject = root.join("reject");
    let fail = root.join("fail");

    // Created in this fixed order so the first failure reported is stable.
    let outboxes = [
        (Outbox::Accept, &accept),
        (Outbox::Reject, &reject),
        (Outbox::Fail, &fail),
    ];
    for (outbox, dir) in outboxes {
        fs::create_dir_all(dir).map_err(|err| DirectorError::create_directory(outbox, err))?;
    }

    Ok(Director {
        accept,
        reject,
        fail,
        handlers: HashMap::new(),
    })
}
/// Registers `handler` for jobs of the given `kind`.
///
/// # Errors
///
/// Fails if a handler is already registered for `kind`; the existing
/// handler is kept in that case.
pub fn add_handler<K>(&mut self, kind: K, handler: &'a dyn Handler) -> DirectorResult<()>
where
    K: Into<String>,
{
    let slot = match self.handlers.entry(kind.into()) {
        Entry::Vacant(slot) => slot,
        Entry::Occupied(existing) => {
            return Err(DirectorError::duplicate_handler(existing.key()));
        },
    };

    debug!(target: "director", "adding handler '{}'", slot.key());
    slot.insert(handler);
    Ok(())
}
/// Moves `file` into the directory of `outbox` and returns its new path.
///
/// A `.stamp` file holding the current UTC time is written next to the
/// target before the job file itself is renamed into place.
///
/// # Panics
///
/// Panics if `file` has no file name component.
fn tag(&self, outbox: Outbox, file: &Path) -> DirectorResult<PathBuf> {
    let outbox_dir = match outbox {
        Outbox::Accept => &self.accept,
        Outbox::Reject => &self.reject,
        Outbox::Fail => &self.fail,
    };
    let job_name = file
        .file_name()
        .expect("expected the input file to have a file name");
    let target_path = outbox_dir.join(job_name);

    let stamp_path = target_path.with_extension("stamp");
    let mut stamp_file = File::create(stamp_path)
        .map_err(|err| DirectorError::create_file(&target_path, "stamp", err))?;
    let now = Utc::now();
    writeln!(stamp_file, "{}", now)
        .map_err(|err| DirectorError::write_file(&target_path, "stamp", err))?;

    fs::rename(file, &target_path)
        .map_err(|err| DirectorError::move_job(file.to_path_buf(), outbox, err))?;

    Ok(target_path)
}
/// Moves `file` into `outbox` and records `reason` in a `.reason` file
/// next to it.
///
/// The reason is only written if the move succeeded.
fn tag_with_reason<R>(&self, outbox: Outbox, file: &Path, reason: R) -> DirectorResult<()>
where
    R: fmt::Display,
{
    self.tag(outbox, file)
        .and_then(|target_file| self.write_reason(&target_file, reason))
}
/// Writes `reason`, followed by a newline, into the `.reason` file that
/// sits next to `target_file`.
///
/// # Errors
///
/// Fails if the reason file cannot be created or written.
fn write_reason<R>(&self, target_file: &Path, reason: R) -> DirectorResult<()>
where
    R: fmt::Display,
{
    let reason_path = target_file.with_extension("reason");
    let mut reason_file = File::create(reason_path)
        .map_err(|err| DirectorError::create_file(target_file, "reason", err))?;

    writeln!(reason_file, "{}", reason)
        .map_err(|err| DirectorError::write_file(target_file, "reason", err))
}
/// Handles the single job file at `path`.
///
/// Errors raised while dispatching are logged and reported to the caller
/// as [`RunResult::Done`].
pub async fn run<P>(&self, path: P) -> RunResult
where
    P: AsRef<Path>,
{
    let outcome = self.dispatch(path.as_ref()).await;

    outcome.unwrap_or_else(|err| {
        error!("error when handling {}: {}", log_file_name(&path), err);
        RunResult::Done
    })
}
pub fn path_stream(&self, path: &Path) -> DirectorResult<impl Stream<Item = PathBuf>> {
let watcher =
Watcher::new(path).map_err(|err| DirectorError::watch_queue(path.into(), err))?;
let paths = path
.read_dir()
.map_err(|err| DirectorError::list_queue(path.into(), err))?;
let events = watcher
.events()
.map_err(|err| DirectorError::watch_queue(path.into(), err))?;
let path_entries = paths
.filter_map(|e| e.ok())
.filter_map(|e| {
if e.path().is_dir() {
None
} else {
Some(e.path())
}
})
.sorted();
let path_owned = path.to_path_buf();
Ok(futures::stream::iter(path_entries).chain(
events
.filter_map(|e| future::ready(e.ok()))
.filter(|event| future::ready(!event.mask.contains(EventMask::ISDIR)))
.filter_map(|event| future::ready(event.name))
.map(move |name| path_owned.join(name)),
))
}
/// Watches the queue directory `path` and blocks until processing stops.
///
/// A current-thread tokio runtime with I/O enabled is built for the
/// duration of the call and drives [`Director::watch_directory_in`].
///
/// # Errors
///
/// Fails if the runtime cannot be created or the watch cannot be set up.
pub fn watch_directory(&self, path: &Path) -> DirectorResult<RunResult> {
    let mut builder = tokio::runtime::Builder::new_current_thread();
    let runtime = builder
        .enable_io()
        .build()
        .map_err(DirectorError::runtime_error)?;

    // The guard must stay alive while the future is driven.
    let _entered = runtime.enter();
    runtime.block_on(self.watch_directory_in(path))
}
/// Processes the jobs of the queue directory `path` one after another.
///
/// Each path from [`Director::path_stream`] is passed to [`Director::run`].
/// A result whose `should_exit()` is true is fed to the fold as an error so
/// that it can end the loop; the last result seen is what gets returned.
///
/// # Errors
///
/// Fails if the path stream cannot be created.
pub async fn watch_directory_in(&self, path: &Path) -> DirectorResult<RunResult> {
    let jobs = self.path_stream(path)?;

    let outcome = jobs
        .then(|job| self.run(job))
        .try_fold(RunResult::Continue, |_, step| {
            // Exit requests travel through the error channel of the fold.
            let verdict = if step.should_exit() {
                Err(step)
            } else {
                Ok(step)
            };
            future::ready(verdict)
        })
        .await;

    Ok(match outcome {
        Ok(last) | Err(last) => last,
    })
}
/// Reads the job in `file`, hands it to its handler and files the result.
///
/// Files without a `json` extension, files that do not contain valid JSON
/// and JSON documents that are not objects are left where they are and
/// yield [`RunResult::Continue`].
///
/// Depending on the handler's verdict the job file is moved into an
/// outbox. A deferred job is additionally rewritten under a fresh file
/// name next to the original, with the deferral reason recorded in its
/// `retry` object, before the original is moved to the reject outbox.
///
/// # Errors
///
/// Fails if the job cannot be opened, the handler reports an error, or any
/// of the file operations needed to file the result fail.
async fn dispatch(&self, file: &Path) -> DirectorResult<RunResult> {
    trace!(target: "director", "handling file {}", log_file_name(&file));

    // Only `.json` files are jobs; everything else is ignored.
    let ext = file.extension().map(OsStr::to_string_lossy);
    if ext.as_deref() != Some("json") {
        return Ok(RunResult::Continue);
    }

    let event_file =
        File::open(file).map_err(|err| DirectorError::open_job(file.to_path_buf(), err))?;
    let parsed: Result<Value, _> = serde_json::from_reader(event_file);
    let mut payload = match parsed {
        Ok(payload) => payload,
        Err(err) => {
            info!("failed to read JSON from {}: {}", file.display(), err);
            return Ok(RunResult::Continue);
        },
    };
    if !payload.is_object() {
        return Ok(RunResult::Continue);
    }

    let outcome = self.handle(&payload).await?;
    let ret = match &outcome {
        JobResult::Accept => {
            debug!("accepted {}", log_file_name(&file));
            self.tag(Outbox::Accept, file)?;
            RunResult::Continue
        },
        JobResult::Defer(reason) => {
            debug!("deferring {}: {}", log_file_name(&file), reason);

            // Queue a copy of the job under a new, unique name.
            let suffix: String = rand::rng()
                .sample_iter(&rand::distr::Alphanumeric)
                .map(char::from)
                .take(12)
                .collect();
            let retry_name = format!("{}-{}.json", Utc::now().to_rfc3339(), suffix);
            let defer_job_file = file.with_file_name(retry_name);

            Self::add_reason_to_object(&mut payload, file.to_string_lossy(), reason);
            let mut defer_file = File::create(&defer_job_file)
                .map_err(|err| DirectorError::create_defer_job(defer_job_file.clone(), err))?;
            serde_json::to_writer(&mut defer_file, &payload)
                .map_err(|err| DirectorError::write_defer_job(defer_job_file, err))?;

            self.tag_with_reason(Outbox::Reject, file, reason)?;
            RunResult::Continue
        },
        JobResult::Reject(reason) => {
            debug!("rejecting {}: {}", file.display(), reason);
            self.tag_with_reason(Outbox::Reject, file, reason)?;
            RunResult::Continue
        },
        JobResult::Fail(reason) => {
            debug!("failed {}: {:?}", file.display(), reason);
            self.tag_with_reason(Outbox::Fail, file, format!("{:?}", reason))?;
            RunResult::Continue
        },
        JobResult::Restart => {
            info!(target: "director", "restarting via {}", log_file_name(&file));
            self.tag(Outbox::Accept, file)?;
            RunResult::Restart
        },
        JobResult::Done => {
            info!(target: "director", "completed via {}", log_file_name(&file));
            self.tag(Outbox::Accept, file)?;
            RunResult::Done
        },
    };

    trace!(target: "director", "handled file {}", log_file_name(&file));
    Ok(ret)
}
/// Records `reason` under the key `name` in the `retry` object of `object`.
///
/// The `retry` member is created if it is missing and replaced by an empty
/// object if it holds anything other than an object. An existing entry for
/// `name` is overwritten.
///
/// # Panics
///
/// Panics if `object` is not a JSON object.
fn add_reason_to_object<N, R>(object: &mut Value, name: N, reason: R)
where
    N: Into<String>,
    R: Into<String>,
{
    let root = object
        .as_object_mut()
        .expect("expected an object; internal logic failure");

    let slot = root
        .entry("retry")
        .or_insert_with(|| Value::Object(Map::new()));
    if !matches!(slot, Value::Object(_)) {
        *slot = Value::Object(Map::new());
    }

    let retries = slot
        .as_object_mut()
        .expect("expected an object; internal logic failure");
    retries.insert(name.into(), Value::String(reason.into()));
}
/// Validates `payload` and passes it to the handler registered for its kind.
///
/// The payload needs a string at `/kind` and a value at `/data`. An
/// optional `/retry` object must map to string values only; its number of
/// entries is handed to the handler as the retry count. Any violation, as
/// well as a kind without a registered handler, produces a rejection
/// instead of an error.
///
/// # Errors
///
/// Fails only if the handler itself returns an error.
async fn handle(&self, payload: &Value) -> DirectorResult<JobResult> {
    let kind = match payload.pointer("/kind") {
        None => return Ok(JobResult::reject("no 'kind'")),
        Some(Value::String(kind)) => kind,
        Some(_) => return Ok(JobResult::reject("'kind' is not a string")),
    };

    let data = if let Some(data) = payload.pointer("/data") {
        data
    } else {
        return Ok(JobResult::reject("no 'data'"));
    };

    let retry_count = match payload.pointer("/retry") {
        None => 0,
        Some(Value::Object(reasons)) => {
            if !reasons.values().all(Value::is_string) {
                return Ok(JobResult::reject("retry reason is not a string"));
            }
            reasons.len()
        },
        Some(_) => return Ok(JobResult::reject("'retry' is not an object")),
    };

    let handler = match self.handlers.get(kind) {
        Some(handler) => handler,
        None => return Ok(JobResult::Reject(format!("no handler for kind '{}'", kind))),
    };

    handler
        .handle(kind, data, retry_count)
        .await
        .map_err(DirectorError::handler)
}
}
#[cfg(test)]
mod tests {
    use std::path::Path;

    use serde_json::json;

    use crate::test;
    use crate::{Director, Outbox, RunResult};

    /// Returns the `retry` object of `object`, panicking if it is missing.
    fn retry_map(object: &serde_json::Value) -> &serde_json::Map<String, serde_json::Value> {
        object
            .as_object()
            .unwrap()
            .get("retry")
            .unwrap()
            .as_object()
            .unwrap()
    }

    #[test]
    fn test_run_result_should_exit() {
        let cases = [
            (RunResult::Continue, false),
            (RunResult::Restart, true),
            (RunResult::Done, true),
        ];

        for &(result, expected) in cases.iter() {
            assert_eq!(result.should_exit(), expected);
        }
    }

    #[test]
    fn test_run_result_is_done() {
        let cases = [
            (RunResult::Continue, false),
            (RunResult::Restart, false),
            (RunResult::Done, true),
        ];

        for &(result, expected) in cases.iter() {
            assert_eq!(result.is_done(), expected);
        }
    }

    #[test]
    fn test_outbox_name() {
        let cases = [
            (Outbox::Accept, "accept"),
            (Outbox::Reject, "reject"),
            (Outbox::Fail, "fail"),
        ];

        for &(outbox, expected) in cases.iter() {
            assert_eq!(outbox.name(), expected);
        }
    }

    #[test]
    fn test_outbox_display() {
        let cases = [
            (Outbox::Accept, "accept"),
            (Outbox::Reject, "reject"),
            (Outbox::Fail, "fail"),
        ];

        for &(outbox, expected) in cases.iter() {
            assert_eq!(format!("{}", outbox), expected);
        }
    }

    #[test]
    fn test_log_file_name() {
        let cases = [
            ("dir/file", "file"),
            ("dir/", "dir"),
            ("dir", "dir"),
            (".", "."),
            ("..", "."),
            ("", "."),
        ];

        for &(input, expected) in cases.iter() {
            assert_eq!(super::log_file_name(input), expected);
        }
    }

    #[test]
    fn test_director_debug() {
        let workspace = test::test_workspace_dir();
        let director = Director::new(workspace.path()).unwrap();

        let expected = format!("Director {{ accept: \"{path}/accept\", reject: \"{path}/reject\", fail: \"{path}/fail\", handlers: [] }}", path=workspace.path().display());
        assert_eq!(format!("{:?}", director), expected);
    }

    #[test]
    fn test_error_file_with_ext() {
        use super::DirectorError;

        let cases = [
            ("dir/file", "ext", "file.ext"),
            ("dir/file.ext", "ext2", "file.ext2"),
            ("dir/.dotfile", "ext", ".dotfile.ext"),
            ("dir/", "ext", "dir.ext"),
            ("", "ext", "."),
            (".", "ext", "."),
            ("..", "ext", "."),
        ];

        for &(path, ext, expected) in cases.iter() {
            assert_eq!(
                DirectorError::file_with_ext(Path::new(path), ext),
                Path::new(expected),
            );
        }
    }

    #[test]
    fn test_add_reason_to_object() {
        let mut object = json!({});

        // The first reason creates the `retry` object.
        Director::add_reason_to_object(&mut object, "name", "value");
        {
            let retry = retry_map(&object);
            assert_eq!(retry.len(), 1);
            assert_eq!(retry.get("name").and_then(|v| v.as_str()), Some("value"));
        }

        // Reusing a name replaces the stored reason.
        Director::add_reason_to_object(&mut object, "name", "overwrite");
        {
            let retry = retry_map(&object);
            assert_eq!(retry.len(), 1);
            assert_eq!(
                retry.get("name").and_then(|v| v.as_str()),
                Some("overwrite"),
            );
        }

        // A new name is added alongside the existing one.
        Director::add_reason_to_object(&mut object, "another", "append");
        {
            let retry = retry_map(&object);
            assert_eq!(retry.len(), 2);
            assert_eq!(
                retry.get("name").and_then(|v| v.as_str()),
                Some("overwrite"),
            );
            assert_eq!(
                retry.get("another").and_then(|v| v.as_str()),
                Some("append"),
            );
        }
    }
}