use std::fs::{self, File};
use std::io::{self, Write};
use std::path::{Path, PathBuf};
use std::thread;
use std::time::Duration;
use chrono::Utc;
use log::{error, warn};
use lzma::LzmaWriter;
use rand::Rng;
use serde::Serialize;
use serde_json::{json, Value};
use tar::Builder;
use tempfile::TempDir;
use thiserror::Error;
use crate::Outbox;
/// Errors that can occur while writing a job file into a queue directory.
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum JobError {
    /// The job file could not be created on disk.
    #[error("failed to create job {}: {}", filepath.display(), source)]
    Create {
        /// Path of the job file that could not be created.
        filepath: PathBuf,
        #[source]
        source: io::Error,
    },
    /// The job payload could not be serialized into the file.
    #[error("failed to write job file {}: {}", filepath.display(), source)]
    Write {
        /// Path of the job file that could not be written.
        filepath: PathBuf,
        #[source]
        source: serde_json::Error,
    },
}
impl JobError {
    /// Builds a [`JobError::Create`] from the failed path and its I/O error.
    fn create(filepath: PathBuf, source: io::Error) -> Self {
        Self::Create { filepath, source }
    }

    /// Builds a [`JobError::Write`] from the failed path and its
    /// serialization error.
    fn write(filepath: PathBuf, source: serde_json::Error) -> Self {
        Self::Write { filepath, source }
    }
}
/// Result alias for job-queue operations in this module.
type JobResult<T> = Result<T, JobError>;
fn write_job(queue: &Path, data: &Value) -> JobResult<()> {
let rndpart = rand::rng()
.sample_iter(&rand::distr::Alphanumeric)
.map(char::from)
.take(12)
.collect::<String>();
let filename = format!("{}-{}.json", Utc::now().to_rfc3339(), rndpart);
let job_file = queue.join(filename);
{
let mut file =
File::create(&job_file).map_err(|err| JobError::create(job_file.clone(), err))?;
serde_json::to_writer(&mut file, data)
.map_err(|err| JobError::write(job_file.clone(), err))?;
}
while job_file.exists() {
thread::sleep(Duration::from_millis(100));
}
Ok(())
}
/// Wraps `data` in a `{"kind": …, "data": …}` envelope and writes it as a
/// job file into `queue`, blocking until the job is consumed.
pub fn drop_job<Q, K, V>(queue: Q, kind: K, data: V) -> JobResult<()>
where
    Q: AsRef<Path>,
    K: AsRef<str>,
    V: Serialize,
{
    let envelope = json!({
        "kind": kind.as_ref(),
        "data": data,
    });
    write_job(queue.as_ref(), &envelope)
}
/// Drops a `watchdog:restart` job into `queue`, blocking until it is
/// consumed.
pub fn restart<Q>(queue: Q) -> JobResult<()>
where
    Q: AsRef<Path>,
{
    let payload = json!({});
    drop_job(queue, "watchdog:restart", payload)
}
/// Drops a `watchdog:exit` job into `queue`, blocking until it is consumed.
pub fn exit<Q>(queue: Q) -> JobResult<()>
where
    Q: AsRef<Path>,
{
    let payload = json!({});
    drop_job(queue, "watchdog:exit", payload)
}
/// Persists `tempdir` on disk (disabling its automatic cleanup) and logs its
/// location so in-flight job files can be recovered after a failed archival.
fn log_tempdir(tempdir: TempDir) {
    let kept = tempdir.into_path();
    error!(
        "archival failed mid-stream; in-progress archival job files may be found in {}",
        kept.display(),
    );
}
/// Errors raised while archiving the queue's outboxes.
///
/// Every variant records the [`Outbox`] being archived; retrieve it via
/// [`ArchiveQueueError::outbox`]. The `outbox` fields themselves are
/// `#[doc(hidden)]` implementation details.
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum ArchiveQueueError {
    /// Creating the output archive file failed.
    #[error("failed to create archive file {}: {}", path.display(), source)]
    CreateOutput {
        #[doc(hidden)]
        outbox: Outbox,
        /// Path of the archive file that could not be created.
        path: PathBuf,
        #[source]
        source: io::Error,
    },
    /// Constructing the LZMA compressor over the output file failed.
    #[error("failed to create compressor: {}", source)]
    CreateCompressor {
        #[doc(hidden)]
        outbox: Outbox,
        #[source]
        source: lzma::LzmaError,
    },
    /// Writing the tar stream for the outbox failed.
    #[error("failed to archive the {} outbox: {}", outbox, source)]
    Archive {
        #[doc(hidden)]
        outbox: Outbox,
        #[source]
        source: ArchiveError,
    },
    /// Finalizing the compressed stream failed.
    #[error("failed to finish the {} archive: {}", outbox, source)]
    FinishArchive {
        #[doc(hidden)]
        outbox: Outbox,
        #[source]
        source: lzma::LzmaError,
    },
    /// Deleting an archive created for an (ultimately empty) outbox failed.
    #[error("failed to remove an incomplete {} archive: {}", outbox, source)]
    RemoveIncompleteArchive {
        #[doc(hidden)]
        outbox: Outbox,
        #[source]
        source: io::Error,
    },
}
impl ArchiveQueueError {
    /// Returns the outbox whose archival produced this error.
    pub fn outbox(&self) -> Outbox {
        match self {
            Self::CreateOutput { outbox, .. }
            | Self::CreateCompressor { outbox, .. }
            | Self::Archive { outbox, .. }
            | Self::FinishArchive { outbox, .. }
            | Self::RemoveIncompleteArchive { outbox, .. } => *outbox,
        }
    }

    /// Error for a failed creation of the archive output file.
    fn create_output(outbox: Outbox, path: PathBuf, source: io::Error) -> Self {
        Self::CreateOutput { outbox, path, source }
    }

    /// Error for a failed construction of the compressor.
    fn create_compressor(outbox: Outbox, source: lzma::LzmaError) -> Self {
        Self::CreateCompressor { outbox, source }
    }

    /// Error for a failure while writing the tar stream.
    fn archive(outbox: Outbox, source: ArchiveError) -> Self {
        Self::Archive { outbox, source }
    }

    /// Error for a failure while finalizing the compressed stream; keeps
    /// `tempdir` on disk and logs its location before constructing the error.
    fn finish_archive(tempdir: TempDir, outbox: Outbox, source: lzma::LzmaError) -> Self {
        log_tempdir(tempdir);
        Self::FinishArchive { outbox, source }
    }

    /// Error for a failed removal of an incomplete archive file.
    fn remove_incomplete_archive(outbox: Outbox, source: io::Error) -> Self {
        Self::RemoveIncompleteArchive { outbox, source }
    }
}
/// LZMA compression preset used for queue archives.
const LZMA_COMPRESSION: u32 = 6;
/// Archives the job files in every outbox under `queue` into timestamped
/// `.tar.xz` files in `output`.
pub fn archive_queue<Q, O>(queue: Q, output: O) -> Result<(), ArchiveQueueError>
where
    Q: AsRef<Path>,
    O: AsRef<Path>,
{
    let queue = queue.as_ref();
    let output = output.as_ref();
    archive_queue_impl(queue, output)
}
/// Archives each outbox (accept, fail, reject) under `queue` into its own
/// timestamped `.tar.xz` file in `output`.
///
/// An archive whose outbox turned out to be empty is removed again rather
/// than left behind as a zero-entry file.
fn archive_queue_impl(queue: &Path, output: &Path) -> Result<(), ArchiveQueueError> {
    // `Outbox` is `Copy`, so the array can be iterated by value directly.
    for outbox in [Outbox::Accept, Outbox::Fail, Outbox::Reject] {
        let (filename, file) = archive_file(output, outbox)?;
        let opt_writer = archive_directory(queue, output, outbox, file)
            .map_err(|err| ArchiveQueueError::archive(outbox, err))?;
        match opt_writer {
            Some((tempdir, writer)) => {
                writer
                    .finish()
                    .map_err(|err| ArchiveQueueError::finish_archive(tempdir, outbox, err))?;
            },
            None => {
                // Nothing was archived; don't leave an empty archive behind.
                fs::remove_file(filename)
                    .map_err(|err| ArchiveQueueError::remove_incomplete_archive(outbox, err))?;
            },
        }
    }
    Ok(())
}
/// Errors raised while streaming a single outbox into a tar archive.
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum ArchiveError {
    /// The staging temporary directory could not be created.
    #[error("failed to create temporary directory: {}", source)]
    CreateTempdir {
        #[source]
        source: io::Error,
    },
    /// The outbox directory entry could not be added to the archive.
    #[error("failed to add directory: {}", source)]
    AddDirectory {
        #[source]
        source: io::Error,
    },
    /// The outbox directory could not be read.
    #[error("failed to read directory: {}", source)]
    ReadDirectory {
        #[source]
        source: io::Error,
    },
    /// A job file could not be appended to the archive.
    #[error("failed to append job: {}", source)]
    AppendJob {
        /// Path of the job file that could not be appended.
        path: PathBuf,
        #[source]
        source: io::Error,
    },
    /// The tar stream could not be finalized.
    #[error("failed to finish tar stream: {}", source)]
    FinishTar {
        #[source]
        source: io::Error,
    },
}
impl ArchiveError {
    /// Error for a failed creation of the staging temporary directory.
    fn create_tempdir(source: io::Error) -> Self {
        Self::CreateTempdir { source }
    }

    /// Error for a failure adding the outbox directory entry to the tar.
    fn add_directory(source: io::Error) -> Self {
        Self::AddDirectory { source }
    }

    /// Error for a failed read of the outbox directory.
    fn read_directory(source: io::Error) -> Self {
        Self::ReadDirectory { source }
    }

    /// Error for a failure appending a job file to the tar; keeps `tempdir`
    /// on disk and logs its location first.
    fn append_job(tempdir: TempDir, path: PathBuf, source: io::Error) -> Self {
        log_tempdir(tempdir);
        Self::AppendJob { path, source }
    }

    /// Error for a failure finishing the tar stream; keeps `tempdir` on disk
    /// and logs its location first.
    fn finish_tar(tempdir: TempDir, source: io::Error) -> Self {
        log_tempdir(tempdir);
        Self::FinishTar { source }
    }
}
/// Creates a timestamped `<rfc3339>-<outbox>.tar.xz` file in `path` and
/// wraps it in an LZMA compressor, returning both the path and the writer.
fn archive_file(
    path: &Path,
    outbox: Outbox,
) -> Result<(PathBuf, LzmaWriter<File>), ArchiveQueueError> {
    let stamp = Utc::now().to_rfc3339();
    let filepath = path.join(format!("{}-{}.tar.xz", stamp, outbox));
    let file = File::create(&filepath)
        .map_err(|err| ArchiveQueueError::create_output(outbox, filepath.clone(), err))?;
    let writer = LzmaWriter::new_compressor(file, LZMA_COMPRESSION)
        .map_err(|err| ArchiveQueueError::create_compressor(outbox, err))?;
    Ok((filepath, writer))
}
/// Streams every job file in `path/<outbox>` into a tar archive written to
/// `output`, moving each archived file into a fresh temporary directory
/// under `workdir` so it disappears from the live queue.
///
/// Returns `Ok(None)` when no file was successfully moved (the outbox was
/// effectively empty); dropping the tempdir then cleans up the staging
/// directory. On success returns the tempdir (kept alive so a later
/// compressor failure can report where the moved files went) together with
/// the tar's inner writer.
fn archive_directory<O>(
    path: &Path,
    workdir: &Path,
    outbox: Outbox,
    output: O,
) -> Result<Option<(TempDir, O)>, ArchiveError>
where
    O: Write,
{
    // Staging area for files already written to the archive; created under
    // `workdir` — NOTE(review): presumably so the rename below stays on one
    // filesystem; confirm deployment layout.
    let tempdir = TempDir::new_in(workdir).map_err(ArchiveError::create_tempdir)?;
    let outbox_name = outbox.name();
    let outbox_path = PathBuf::from(outbox_name);
    let mut archive = Builder::new(output);
    // Record the outbox directory itself in the tar, using the (empty)
    // tempdir as the metadata source for the entry.
    archive
        .append_dir(outbox_name, &tempdir)
        .map_err(ArchiveError::add_directory)?;
    let entries = fs::read_dir(path.join(outbox_name)).map_err(ArchiveError::read_directory)?;
    let mut is_empty = true;
    for entry in entries {
        let entry = match entry {
            Ok(entry) => entry,
            Err(err) => {
                // Best effort: a single unreadable entry shouldn't abort the
                // whole archival run.
                warn!("failed to read directory entry; skipping: {:?}", err);
                continue;
            },
        };
        let path = entry.path();
        let file_name = entry.file_name();
        // Store the file under `<outbox>/<file_name>` inside the archive.
        if let Err(err) = archive.append_path_with_name(&path, outbox_path.join(&file_name)) {
            return Err(ArchiveError::append_job(tempdir, path, err));
        }
        // Move the archived file out of the queue into the staging tempdir.
        // Only a successful move counts as having archived something.
        let target_path = tempdir.path().join(&file_name);
        match fs::rename(&path, &target_path) {
            Ok(()) => is_empty = false,
            Err(err) => {
                // The file stays in the queue and may be archived again on a
                // later run.
                warn!(
                    "failed to rename {} to {}: {:?}",
                    path.display(),
                    target_path.display(),
                    err,
                )
            },
        }
    }
    if is_empty {
        // Dropping `tempdir` here deletes the staging directory.
        return Ok(None);
    }
    match archive.into_inner() {
        Ok(output) => Ok(Some((tempdir, output))),
        Err(err) => Err(ArchiveError::finish_tar(tempdir, err)),
    }
}
#[cfg(test)]
mod tests {
    use std::fs;
    use std::sync::{Arc, Mutex};
    use std::thread;
    use std::time::Duration;
    use serde_json::json;
    use crate::{test, utils};
    use crate::{Director, DirectorWatchdog, HandlerCore, RunResult};

    /// `log_tempdir` must keep the temporary directory on disk after
    /// logging its location.
    #[test]
    fn test_log_tempdir_leaves_directory() {
        let tempdir = test::test_workspace_dir();
        let path = tempdir.path().to_path_buf();
        super::log_tempdir(tempdir);
        assert!(path.exists(), "{} should still exist", path.display());
        fs::remove_dir_all(path).unwrap();
    }

    /// `write_job` must not return until the director has consumed the job
    /// file (and hence run the handler that sets the flag).
    #[test]
    fn test_write_job_waits_for_consumption() {
        let tempdir = test::test_workspace_dir();
        let mut director = Director::new(tempdir.path()).unwrap();
        let mutex = Arc::new(Mutex::new(false));
        let signal = test::FlaggingWatcher::new(mutex.clone());
        signal.add_to_director(&mut director).unwrap();
        DirectorWatchdog.add_to_director(&mut director).unwrap();
        let thread_mutex = mutex.clone();
        let job_path = tempdir.path().to_path_buf();
        let write_thread = thread::spawn(move || {
            let data = json!({
                "kind": "flagging:set",
                "data": {},
            });
            utils::write_job(&job_path, &data).unwrap();
            // If `write_job` returned early, the handler can't have run yet.
            assert!(
                *thread_mutex.lock().unwrap(),
                "`write_job` returned before the handler set the flag",
            );
            utils::exit(job_path).unwrap();
        });
        // Wait until the job file is visible before starting the director.
        loop {
            let entries = fs::read_dir(tempdir.path()).unwrap();
            let nfiles = entries
                .filter(|entry| entry.as_ref().unwrap().file_type().unwrap().is_file())
                .count();
            if nfiles == 1 {
                break;
            }
        }
        thread::sleep(Duration::from_millis(100));
        let res = director.watch_directory(tempdir.path()).unwrap();
        assert_eq!(res, RunResult::Done);
        write_thread.join().unwrap();
    }

    /// Empty outboxes must produce no archives; populated outboxes must
    /// produce one archive each.
    #[test]
    fn test_archive_queue() {
        let tempdir = test::test_workspace_dir();
        let archive = tempdir.path().join("archive");
        let output = tempdir.path().join("output");
        fs::create_dir_all(&archive).unwrap();
        // Constructing the director is expected to create the outbox
        // subdirectories; the assertions below verify they exist and are
        // empty.
        let _director = Director::new(&archive).unwrap();
        fs::create_dir_all(&output).unwrap();
        let accept = archive.join("accept");
        let reject = archive.join("reject");
        let fail = archive.join("fail");
        assert_eq!(
            accept.read_dir().unwrap().count(),
            0,
            "`accept` is not an empty directory",
        );
        assert_eq!(
            reject.read_dir().unwrap().count(),
            0,
            "`reject` is not an empty directory",
        );
        assert_eq!(
            fail.read_dir().unwrap().count(),
            0,
            "`fail` is not an empty directory",
        );
        assert_eq!(
            output.read_dir().unwrap().count(),
            0,
            "`output` is not an empty directory",
        );
        utils::archive_queue(&archive, &output).unwrap();
        assert_eq!(
            output.read_dir().unwrap().count(),
            0,
            "archived empty queues",
        );
        fs::write(accept.join("accept.json"), b"{}").unwrap();
        fs::write(reject.join("reject.json"), b"{}").unwrap();
        fs::write(fail.join("fail.json"), b"{}").unwrap();
        utils::archive_queue(archive, &output).unwrap();
        assert_eq!(
            output.read_dir().unwrap().count(),
            3,
            "not all archives archived",
        );
    }

    /// A `watchdog:restart` job must make the director report `Restart`.
    #[test]
    fn test_drop_restart() {
        let tempdir = test::test_workspace_dir();
        let mut director = Director::new(tempdir.path()).unwrap();
        DirectorWatchdog.add_to_director(&mut director).unwrap();
        let job_path = tempdir.path().to_path_buf();
        let write_thread = thread::spawn(move || {
            utils::restart(job_path).unwrap();
        });
        let res = director.watch_directory(tempdir.path()).unwrap();
        assert_eq!(res, RunResult::Restart);
        write_thread.join().unwrap();
    }
}