use std::env;
use std::fs::{self, File, Permissions};
use std::io::Write;
use std::os::unix::fs::PermissionsExt;
use std::path::Path;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use async_trait::async_trait;
use itertools::Itertools;
use log::{Level, LevelFilter, Log, Metadata, Record};
use serde_json::{json, Value};
use tempfile::TempDir;
use thiserror::Error;
use crate::director::{Director, DirectorError, Outbox, RunResult};
use crate::handler::{Handler, HandlerCore, JobError, JobResult};
pub struct FlaggingWatcher {
mutex: Arc<Mutex<bool>>,
condvar: Option<Arc<Condvar>>,
}
impl FlaggingWatcher {
pub fn new(mutex: Arc<Mutex<bool>>) -> Self {
Self {
mutex,
condvar: None,
}
}
pub fn new_with_condvar(mutex: Arc<Mutex<bool>>, condvar: Arc<Condvar>) -> Self {
Self {
mutex,
condvar: Some(condvar),
}
}
}
impl HandlerCore for FlaggingWatcher {
fn add_to_director<'a>(&'a self, director: &mut Director<'a>) -> Result<(), JobError> {
director.add_handler("flagging:set", self)?;
Ok(())
}
}
#[async_trait]
impl Handler for FlaggingWatcher {
async fn handle(&self, kind: &str, _: &Value, _: usize) -> Result<JobResult, JobError> {
Ok(match kind {
"flagging:set" => {
*self.mutex.lock().unwrap() = true;
if let Some(cv) = self.condvar.as_ref() {
cv.notify_one();
}
JobResult::Accept
},
_ => JobResult::Reject(format!("signal received an unhandled {} job", kind)),
})
}
}
fn setup_logging() {
struct SimpleLogger;
impl Log for SimpleLogger {
fn enabled(&self, metadata: &Metadata) -> bool {
metadata.level() <= Level::Debug
}
fn log(&self, record: &Record) {
if self.enabled(record.metadata()) {
println!("[{}] {}", record.level(), record.args());
}
}
fn flush(&self) {}
}
static LOGGER: SimpleLogger = SimpleLogger;
let _ = log::set_logger(&LOGGER);
log::set_max_level(LevelFilter::Debug);
}
pub fn test_workspace_dir() -> TempDir {
setup_logging();
let mut working_dir = env::current_exe().unwrap();
working_dir.pop();
TempDir::new_in(working_dir).unwrap()
}
fn files_in_path(path: &Path) -> Vec<String> {
path.read_dir()
.unwrap()
.filter_map(|e| e.ok())
.filter(|e| !e.path().is_dir())
.map(|e| e.path().file_name().unwrap().to_string_lossy().into_owned())
.sorted()
.collect()
}
fn names_in_path(path: &Path) -> Vec<String> {
path.read_dir()
.unwrap()
.filter_map(|e| e.ok())
.filter(|e| {
let path = e.path();
let filename = path.file_name().unwrap();
let ignored_paths = [Path::new("accept"), Path::new("reject"), Path::new("fail")];
!path.is_dir() || !ignored_paths.iter().any(|i| i == filename)
})
.map(|e| e.path().file_name().unwrap().to_string_lossy().into_owned())
.sorted()
.collect()
}
fn drop_job_data(path: &Path, label: &str, object: Value) {
let nfiles = files_in_path(path).len();
let mut fout = File::create(path.join(format!("{}-{}.json", nfiles, label))).unwrap();
serde_json::to_writer(&mut fout, &object).unwrap();
}
fn drop_job(path: &Path, kind: &str) {
drop_job_data(
path,
kind,
json!({
"kind": kind,
"data": {},
}),
);
}
fn check_queues(
path: &Path,
accept: &[&str],
reject: &[&str],
reject_reasons: &[&str],
fail: &[&str],
fail_reasons: &[&str],
ignored: &[&str],
) {
let accept_files = files_in_path(&path.join("accept"));
let reject_files = files_in_path(&path.join("reject"));
let fail_files = files_in_path(&path.join("fail"));
let ignored_files = names_in_path(path);
assert_eq!(accept_files, accept, "accepted paths");
assert_eq!(reject_files, reject, "rejected paths");
assert_eq!(fail_files, fail, "fail paths");
assert_eq!(ignored_files, ignored, "ignored paths");
let reasons = reject_files
.into_iter()
.filter(|reject| reject.ends_with(".reason"))
.map(|file_name| {
let reason_fname = path.join("reject").join(file_name);
fs::read_to_string(reason_fname).unwrap()
})
.collect::<Vec<_>>();
assert_eq!(reasons, reject_reasons);
let reasons = fail_files
.into_iter()
.filter(|fail| fail.ends_with(".reason"))
.map(|file_name| {
let reason_fname = path.join("fail").join(file_name);
fs::read_to_string(reason_fname).unwrap()
})
.collect::<Vec<_>>();
assert_eq!(reasons, fail_reasons);
}
const LOCK_POISONED: &str = "TestHandler mutex poisoned";
#[derive(Default)]
struct TestHandler {
jobs: Mutex<Vec<String>>,
}
impl TestHandler {
fn jobs(self) -> Vec<String> {
self.jobs.into_inner().unwrap()
}
}
#[derive(Debug, Error)]
#[error("test error")]
struct TestError;
impl HandlerCore for TestHandler {
fn add_to_director<'a>(&'a self, director: &mut Director<'a>) -> Result<(), JobError> {
director.add_handler("accept", self)?;
director.add_handler("defer", self)?;
director.add_handler("reject", self)?;
director.add_handler("fail", self)?;
director.add_handler("restart", self)?;
director.add_handler("done", self)?;
director.add_handler("error", self)?;
Ok(())
}
}
#[async_trait]
impl Handler for TestHandler {
async fn handle(&self, kind: &str, _: &Value, count: usize) -> Result<JobResult, JobError> {
{
let mut jobs = self.jobs.lock().expect(LOCK_POISONED);
jobs.push(kind.into());
}
if count > 0 {
return Ok(JobResult::Done);
}
match kind {
"accept" => Ok(JobResult::Accept),
"defer" => Ok(JobResult::defer("deferring")),
"reject" => Ok(JobResult::reject("rejecting")),
"restart" => Ok(JobResult::Restart),
"fail" => Ok(JobResult::fail(TestError)),
"done" => Ok(JobResult::Done),
"error" => Err(TestError.into()),
_ => unreachable!(),
}
}
}
#[test]
fn test_not_a_directory() {
let path = env::current_exe().unwrap();
let err = Director::new(&path).unwrap_err();
match err {
DirectorError::NotADirectory {
path: err_path,
} => {
assert_eq!(err_path, path);
},
_ => panic!("unexpected error: {:?}", err),
}
}
#[test]
fn test_duplicate_handler() {
let tempdir = test_workspace_dir();
let handler1 = TestHandler::default();
let handler2 = TestHandler::default();
let mut director = Director::new(tempdir.path()).unwrap();
handler1.add_to_director(&mut director).unwrap();
let err = handler2.add_to_director(&mut director).unwrap_err();
match err.downcast::<DirectorError>() {
Ok(err) => {
if let DirectorError::DuplicateHandler {
ref kind,
} = err.as_ref()
{
assert_eq!(kind, "accept");
} else {
panic!("unexpected director error: {:?}", err);
}
},
Err(err) => panic!("unexpected error: {:?}", err),
}
}
#[test]
fn test_no_permissions() {
let tempdir = test_workspace_dir();
fs::set_permissions(tempdir.path(), Permissions::from_mode(0o555)).unwrap();
let res = Director::new(tempdir.path());
let is_root = if cfg!(unix) {
let uid = unsafe { libc::getuid() };
uid == 0
} else {
false
};
if is_root {
println!("test_no_permissions is not effective as `root`");
let _ = res.unwrap();
} else {
let err = res.unwrap_err();
match err {
DirectorError::CreateDirectory {
outbox, ..
} => {
assert_eq!(outbox, Outbox::Accept);
},
_ => panic!("unexpected error: {:?}", err),
}
}
}
#[test]
fn test_ignore_directories() {
let tempdir = test_workspace_dir();
let handler = TestHandler::default();
{
let mut director = Director::new(tempdir.path()).unwrap();
handler.add_to_director(&mut director).unwrap();
drop_job(tempdir.path(), "done");
let res = director.watch_directory(tempdir.path()).unwrap();
assert_eq!(res, RunResult::Done);
}
assert_eq!(handler.jobs(), ["done"]);
check_queues(
tempdir.path(),
&["0-done.json", "0-done.stamp"],
&[],
&[],
&[],
&[],
&[],
);
}
#[test]
fn test_ignore_wrong_extension() {
let tempdir = test_workspace_dir();
let handler = TestHandler::default();
{
let mut director = Director::new(tempdir.path()).unwrap();
handler.add_to_director(&mut director).unwrap();
File::create(tempdir.path().join("0-ignored.txt")).unwrap();
drop_job(tempdir.path(), "done");
let res = director.watch_directory(tempdir.path()).unwrap();
assert_eq!(res, RunResult::Done);
}
assert_eq!(handler.jobs(), ["done"]);
check_queues(
tempdir.path(),
&["1-done.json", "1-done.stamp"],
&[],
&[],
&[],
&[],
&["0-ignored.txt"],
);
}
#[test]
fn test_ignore_invalid_json() {
let tempdir = test_workspace_dir();
let handler = TestHandler::default();
{
let mut director = Director::new(tempdir.path()).unwrap();
handler.add_to_director(&mut director).unwrap();
{
let mut fout = File::create(tempdir.path().join("0-invalid.json")).unwrap();
write!(&mut fout, "invalid json").unwrap();
}
drop_job(tempdir.path(), "done");
let res = director.watch_directory(tempdir.path()).unwrap();
assert_eq!(res, RunResult::Done);
}
assert_eq!(handler.jobs(), ["done"]);
check_queues(
tempdir.path(),
&["1-done.json", "1-done.stamp"],
&[],
&[],
&[],
&[],
&["0-invalid.json"],
);
}
#[test]
fn test_ignore_wrong_json_type() {
let tempdir = test_workspace_dir();
let handler = TestHandler::default();
{
let mut director = Director::new(tempdir.path()).unwrap();
handler.add_to_director(&mut director).unwrap();
{
let mut fout = File::create(tempdir.path().join("0-array.json")).unwrap();
write!(&mut fout, "[]").unwrap();
let mut fout = File::create(tempdir.path().join("1-bool.json")).unwrap();
write!(&mut fout, "true").unwrap();
let mut fout = File::create(tempdir.path().join("2-null.json")).unwrap();
write!(&mut fout, "null").unwrap();
let mut fout = File::create(tempdir.path().join("3-number.json")).unwrap();
write!(&mut fout, "0").unwrap();
let mut fout = File::create(tempdir.path().join("4-string.json")).unwrap();
write!(&mut fout, r#""""#).unwrap();
}
drop_job(tempdir.path(), "done");
let res = director.watch_directory(tempdir.path()).unwrap();
assert_eq!(res, RunResult::Done);
}
assert_eq!(handler.jobs(), ["done"]);
check_queues(
tempdir.path(),
&["5-done.json", "5-done.stamp"],
&[],
&[],
&[],
&[],
&[
"0-array.json",
"1-bool.json",
"2-null.json",
"3-number.json",
"4-string.json",
],
);
}
#[test]
fn test_reject_missing_kind() {
let tempdir = test_workspace_dir();
let handler = TestHandler::default();
{
let mut director = Director::new(tempdir.path()).unwrap();
handler.add_to_director(&mut director).unwrap();
{
let mut fout = File::create(tempdir.path().join("0-missing-kind.json")).unwrap();
fout.write_all(br#"{"data":{}}"#).unwrap();
}
drop_job(tempdir.path(), "done");
let res = director.watch_directory(tempdir.path()).unwrap();
assert_eq!(res, RunResult::Done);
}
assert_eq!(handler.jobs(), ["done"]);
check_queues(
tempdir.path(),
&["1-done.json", "1-done.stamp"],
&[
"0-missing-kind.json",
"0-missing-kind.reason",
"0-missing-kind.stamp",
],
&["no 'kind'\n"],
&[],
&[],
&[],
);
}
#[test]
fn test_reject_invalid_kind() {
let tempdir = test_workspace_dir();
let handler = TestHandler::default();
{
let mut director = Director::new(tempdir.path()).unwrap();
handler.add_to_director(&mut director).unwrap();
{
let mut fout = File::create(tempdir.path().join("0-invalid-kind-array.json")).unwrap();
fout.write_all(br#"{"kind":[],"data":{}}"#).unwrap();
let mut fout = File::create(tempdir.path().join("1-invalid-kind-bool.json")).unwrap();
fout.write_all(br#"{"kind":true,"data":{}}"#).unwrap();
let mut fout = File::create(tempdir.path().join("2-invalid-kind-null.json")).unwrap();
fout.write_all(br#"{"kind":null,"data":{}}"#).unwrap();
let mut fout = File::create(tempdir.path().join("3-invalid-kind-number.json")).unwrap();
fout.write_all(br#"{"kind":0,"data":{}}"#).unwrap();
let mut fout = File::create(tempdir.path().join("4-invalid-kind-object.json")).unwrap();
fout.write_all(br#"{"kind":{},"data":{}}"#).unwrap();
}
drop_job(tempdir.path(), "done");
let res = director.watch_directory(tempdir.path()).unwrap();
assert_eq!(res, RunResult::Done);
}
assert_eq!(handler.jobs(), ["done"]);
check_queues(
tempdir.path(),
&["5-done.json", "5-done.stamp"],
&[
"0-invalid-kind-array.json",
"0-invalid-kind-array.reason",
"0-invalid-kind-array.stamp",
"1-invalid-kind-bool.json",
"1-invalid-kind-bool.reason",
"1-invalid-kind-bool.stamp",
"2-invalid-kind-null.json",
"2-invalid-kind-null.reason",
"2-invalid-kind-null.stamp",
"3-invalid-kind-number.json",
"3-invalid-kind-number.reason",
"3-invalid-kind-number.stamp",
"4-invalid-kind-object.json",
"4-invalid-kind-object.reason",
"4-invalid-kind-object.stamp",
],
&[
"'kind' is not a string\n",
"'kind' is not a string\n",
"'kind' is not a string\n",
"'kind' is not a string\n",
"'kind' is not a string\n",
],
&[],
&[],
&[],
);
}
#[test]
fn test_reject_missing_data() {
let tempdir = test_workspace_dir();
let handler = TestHandler::default();
{
let mut director = Director::new(tempdir.path()).unwrap();
handler.add_to_director(&mut director).unwrap();
{
let mut fout = File::create(tempdir.path().join("0-missing-data.json")).unwrap();
fout.write_all(br#"{"kind":""}"#).unwrap();
}
drop_job(tempdir.path(), "done");
let res = director.watch_directory(tempdir.path()).unwrap();
assert_eq!(res, RunResult::Done);
}
assert_eq!(handler.jobs(), ["done"]);
check_queues(
tempdir.path(),
&["1-done.json", "1-done.stamp"],
&[
"0-missing-data.json",
"0-missing-data.reason",
"0-missing-data.stamp",
],
&["no 'data'\n"],
&[],
&[],
&[],
);
}
#[test]
fn test_reject_invalid_retry() {
let tempdir = test_workspace_dir();
let handler = TestHandler::default();
{
let mut director = Director::new(tempdir.path()).unwrap();
handler.add_to_director(&mut director).unwrap();
{
let mut fout = File::create(tempdir.path().join("0-invalid-retry-array.json")).unwrap();
fout.write_all(br#"{"kind":"retry","data":{},"retry":[]}"#)
.unwrap();
let mut fout = File::create(tempdir.path().join("1-invalid-retry-bool.json")).unwrap();
fout.write_all(br#"{"kind":"retry","data":{},"retry":true}"#)
.unwrap();
let mut fout = File::create(tempdir.path().join("2-invalid-retry-null.json")).unwrap();
fout.write_all(br#"{"kind":"retry","data":{},"retry":null}"#)
.unwrap();
let mut fout =
File::create(tempdir.path().join("3-invalid-retry-number.json")).unwrap();
fout.write_all(br#"{"kind":"retry","data":{},"retry":0}"#)
.unwrap();
let mut fout =
File::create(tempdir.path().join("4-invalid-retry-string.json")).unwrap();
fout.write_all(br#"{"kind":"retry","data":{},"retry":""}"#)
.unwrap();
}
drop_job(tempdir.path(), "done");
let res = director.watch_directory(tempdir.path()).unwrap();
assert_eq!(res, RunResult::Done);
}
assert_eq!(handler.jobs(), ["done"]);
check_queues(
tempdir.path(),
&["5-done.json", "5-done.stamp"],
&[
"0-invalid-retry-array.json",
"0-invalid-retry-array.reason",
"0-invalid-retry-array.stamp",
"1-invalid-retry-bool.json",
"1-invalid-retry-bool.reason",
"1-invalid-retry-bool.stamp",
"2-invalid-retry-null.json",
"2-invalid-retry-null.reason",
"2-invalid-retry-null.stamp",
"3-invalid-retry-number.json",
"3-invalid-retry-number.reason",
"3-invalid-retry-number.stamp",
"4-invalid-retry-string.json",
"4-invalid-retry-string.reason",
"4-invalid-retry-string.stamp",
],
&[
"'retry' is not an object\n",
"'retry' is not an object\n",
"'retry' is not an object\n",
"'retry' is not an object\n",
"'retry' is not an object\n",
],
&[],
&[],
&[],
);
}
#[test]
fn test_reject_invalid_retry_object() {
let tempdir = test_workspace_dir();
let handler = TestHandler::default();
{
let mut director = Director::new(tempdir.path()).unwrap();
handler.add_to_director(&mut director).unwrap();
{
let mut fout =
File::create(tempdir.path().join("0-invalid-retry-object-array.json")).unwrap();
fout.write_all(br#"{"kind":"retry","data":{},"retry":{"reason":[]}}"#)
.unwrap();
let mut fout =
File::create(tempdir.path().join("1-invalid-retry-object-bool.json")).unwrap();
fout.write_all(br#"{"kind":"retry","data":{},"retry":{"reason":true}}"#)
.unwrap();
let mut fout =
File::create(tempdir.path().join("2-invalid-retry-object-null.json")).unwrap();
fout.write_all(br#"{"kind":"retry","data":{},"retry":{"reason":null}}"#)
.unwrap();
let mut fout =
File::create(tempdir.path().join("3-invalid-retry-object-number.json")).unwrap();
fout.write_all(br#"{"kind":"retry","data":{},"retry":{"reason":0}}"#)
.unwrap();
let mut fout =
File::create(tempdir.path().join("4-invalid-retry-object-object.json")).unwrap();
fout.write_all(br#"{"kind":"retry","data":{},"retry":{"reason":{}}}"#)
.unwrap();
}
drop_job(tempdir.path(), "done");
let res = director.watch_directory(tempdir.path()).unwrap();
assert_eq!(res, RunResult::Done);
}
assert_eq!(handler.jobs(), ["done"]);
check_queues(
tempdir.path(),
&["5-done.json", "5-done.stamp"],
&[
"0-invalid-retry-object-array.json",
"0-invalid-retry-object-array.reason",
"0-invalid-retry-object-array.stamp",
"1-invalid-retry-object-bool.json",
"1-invalid-retry-object-bool.reason",
"1-invalid-retry-object-bool.stamp",
"2-invalid-retry-object-null.json",
"2-invalid-retry-object-null.reason",
"2-invalid-retry-object-null.stamp",
"3-invalid-retry-object-number.json",
"3-invalid-retry-object-number.reason",
"3-invalid-retry-object-number.stamp",
"4-invalid-retry-object-object.json",
"4-invalid-retry-object-object.reason",
"4-invalid-retry-object-object.stamp",
],
&[
"retry reason is not a string\n",
"retry reason is not a string\n",
"retry reason is not a string\n",
"retry reason is not a string\n",
"retry reason is not a string\n",
],
&[],
&[],
&[],
);
}
#[test]
fn test_reject_no_handler() {
let tempdir = test_workspace_dir();
let handler = TestHandler::default();
{
let mut director = Director::new(tempdir.path()).unwrap();
handler.add_to_director(&mut director).unwrap();
drop_job(tempdir.path(), "no-handler");
drop_job(tempdir.path(), "done");
let res = director.watch_directory(tempdir.path()).unwrap();
assert_eq!(res, RunResult::Done);
}
assert_eq!(handler.jobs(), ["done"]);
check_queues(
tempdir.path(),
&["1-done.json", "1-done.stamp"],
&[
"0-no-handler.json",
"0-no-handler.reason",
"0-no-handler.stamp",
],
&["no handler for kind 'no-handler'\n"],
&[],
&[],
&[],
);
}
#[test]
fn test_receive_handler_error() {
let tempdir = test_workspace_dir();
let handler = TestHandler::default();
{
let mut director = Director::new(tempdir.path()).unwrap();
handler.add_to_director(&mut director).unwrap();
drop_job(tempdir.path(), "error");
let res = director.watch_directory(tempdir.path()).unwrap();
assert_eq!(res, RunResult::Done);
}
assert_eq!(handler.jobs(), ["error"]);
check_queues(tempdir.path(), &[], &[], &[], &[], &[], &["0-error.json"]);
}
#[test]
fn test_accept() {
let tempdir = test_workspace_dir();
let handler = TestHandler::default();
{
let mut director = Director::new(tempdir.path()).unwrap();
handler.add_to_director(&mut director).unwrap();
drop_job(tempdir.path(), "accept");
drop_job(tempdir.path(), "done");
let res = director.watch_directory(tempdir.path()).unwrap();
assert_eq!(res, RunResult::Done);
}
assert_eq!(handler.jobs(), ["accept", "done"]);
check_queues(
tempdir.path(),
&[
"0-accept.json",
"0-accept.stamp",
"1-done.json",
"1-done.stamp",
],
&[],
&[],
&[],
&[],
&[],
);
}
#[test]
fn test_defer() {
let tempdir = test_workspace_dir();
let handler = TestHandler::default();
{
let mut director = Director::new(tempdir.path()).unwrap();
handler.add_to_director(&mut director).unwrap();
drop_job(tempdir.path(), "defer");
let res = director.watch_directory(tempdir.path()).unwrap();
assert_eq!(res, RunResult::Done);
}
assert_eq!(handler.jobs(), ["defer", "defer"]);
let accept_files = files_in_path(&tempdir.path().join("accept"));
assert_eq!(accept_files.len(), 2);
let accept_files_ref = accept_files.iter().map(|s| s.as_ref()).collect::<Vec<_>>();
check_queues(
tempdir.path(),
&accept_files_ref,
&["0-defer.json", "0-defer.reason", "0-defer.stamp"],
&["deferring\n"],
&[],
&[],
&[],
);
}
#[test]
fn test_reject() {
let tempdir = test_workspace_dir();
let handler = TestHandler::default();
{
let mut director = Director::new(tempdir.path()).unwrap();
handler.add_to_director(&mut director).unwrap();
drop_job(tempdir.path(), "reject");
drop_job(tempdir.path(), "done");
let res = director.watch_directory(tempdir.path()).unwrap();
assert_eq!(res, RunResult::Done);
}
assert_eq!(handler.jobs(), ["reject", "done"]);
check_queues(
tempdir.path(),
&["1-done.json", "1-done.stamp"],
&["0-reject.json", "0-reject.reason", "0-reject.stamp"],
&["rejecting\n"],
&[],
&[],
&[],
);
}
#[test]
fn test_fail() {
let tempdir = test_workspace_dir();
let handler = TestHandler::default();
{
let mut director = Director::new(tempdir.path()).unwrap();
handler.add_to_director(&mut director).unwrap();
drop_job(tempdir.path(), "fail");
drop_job(tempdir.path(), "done");
let res = director.watch_directory(tempdir.path()).unwrap();
assert_eq!(res, RunResult::Done);
}
assert_eq!(handler.jobs(), ["fail", "done"]);
check_queues(
tempdir.path(),
&["1-done.json", "1-done.stamp"],
&[],
&[],
&["0-fail.json", "0-fail.reason", "0-fail.stamp"],
&["TestError\n"],
&[],
);
}
#[test]
fn test_restart() {
let tempdir = test_workspace_dir();
let handler = TestHandler::default();
{
let mut director = Director::new(tempdir.path()).unwrap();
handler.add_to_director(&mut director).unwrap();
drop_job(tempdir.path(), "restart");
let res = director.watch_directory(tempdir.path()).unwrap();
assert_eq!(res, RunResult::Restart);
}
assert_eq!(handler.jobs(), ["restart"]);
check_queues(
tempdir.path(),
&["0-restart.json", "0-restart.stamp"],
&[],
&[],
&[],
&[],
&[],
);
}
#[test]
fn test_done() {
let tempdir = test_workspace_dir();
let handler = TestHandler::default();
{
let mut director = Director::new(tempdir.path()).unwrap();
handler.add_to_director(&mut director).unwrap();
drop_job(tempdir.path(), "done");
drop_job(tempdir.path(), "after");
let res = director.watch_directory(tempdir.path()).unwrap();
assert_eq!(res, RunResult::Done);
}
assert_eq!(handler.jobs(), ["done"]);
check_queues(
tempdir.path(),
&["0-done.json", "0-done.stamp"],
&[],
&[],
&[],
&[],
&["1-after.json"],
);
}
#[test]
fn test_ignore_non_object_payload() {
let tempdir = test_workspace_dir();
let handler = TestHandler::default();
{
let mut director = Director::new(tempdir.path()).unwrap();
handler.add_to_director(&mut director).unwrap();
drop_job_data(tempdir.path(), "null-payload", json!(null));
drop_job_data(tempdir.path(), "int-payload", json!(1));
drop_job_data(tempdir.path(), "bool-payload", json!(true));
drop_job_data(tempdir.path(), "string-payload", json!(""));
drop_job_data(tempdir.path(), "array-payload", json!([]));
drop_job(tempdir.path(), "done");
let res = director.watch_directory(tempdir.path()).unwrap();
assert_eq!(res, RunResult::Done);
}
assert_eq!(handler.jobs(), ["done"]);
check_queues(
tempdir.path(),
&["5-done.json", "5-done.stamp"],
&[],
&[],
&[],
&[],
&[
"0-null-payload.json",
"1-int-payload.json",
"2-bool-payload.json",
"3-string-payload.json",
"4-array-payload.json",
],
);
}
#[test]
fn test_ignore_new_directories() {
let tempdir = test_workspace_dir();
let mut director = Director::new(tempdir.path()).unwrap();
let mutex = Arc::new(Mutex::new(false));
let condvar = Arc::new(Condvar::new());
let signal = FlaggingWatcher::new_with_condvar(mutex.clone(), condvar.clone());
signal.add_to_director(&mut director).unwrap();
let handler = TestHandler::default();
handler.add_to_director(&mut director).unwrap();
drop_job(tempdir.path(), "flagging:set");
let job_path = tempdir.path().to_path_buf();
let dir_thread = thread::spawn(move || {
{
let mut lock = mutex.lock().unwrap();
lock = condvar.wait_while(lock, |b| !*b).unwrap();
assert!(*lock, "the flag was not set");
}
while !files_in_path(&job_path).is_empty() {}
fs::create_dir_all(job_path.join("dir.json")).unwrap();
drop_job(&job_path, "done");
});
let res = director.watch_directory(tempdir.path()).unwrap();
assert_eq!(res, RunResult::Done);
dir_thread.join().unwrap();
assert_eq!(handler.jobs(), ["done"]);
check_queues(
tempdir.path(),
&[
"0-done.json",
"0-done.stamp",
"0-flagging:set.json",
"0-flagging:set.stamp",
],
&[],
&[],
&[],
&[],
&["dir.json"],
);
}