use std::fs::{self, File};
use std::io::{self, Write};
use std::path::{Path, PathBuf};
use std::thread;
use std::time::Duration;
use chrono::Utc;
use log::{error, warn};
use lzma::LzmaWriter;
use rand::Rng;
use serde::Serialize;
use serde_json::{json, Value};
use tar::Builder;
use tempfile::TempDir;
use thiserror::Error;
use crate::Outbox;
/// Errors that can occur while writing a job file into a queue directory.
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum JobError {
/// The job file could not be created.
#[error("failed to create job {}: {}", filepath.display(), source)]
Create {
/// Path of the job file that could not be created.
filepath: PathBuf,
/// The underlying I/O error.
#[source]
source: io::Error,
},
/// The job payload could not be written to the already-created job file.
#[error("failed to write job file {}: {}", filepath.display(), source)]
Write {
/// Path of the job file that was being written.
filepath: PathBuf,
/// The error reported by `serde_json` while writing.
#[source]
source: serde_json::Error,
},
}
impl JobError {
    /// Builds a [`JobError::Create`] for the job file at `filepath`.
    fn create(filepath: PathBuf, source: io::Error) -> Self {
        Self::Create { filepath, source }
    }

    /// Builds a [`JobError::Write`] for the job file at `filepath`.
    fn write(filepath: PathBuf, source: serde_json::Error) -> Self {
        Self::Write { filepath, source }
    }
}
/// Result alias for job-writing operations; the error type is [`JobError`].
type JobResult<T> = Result<T, JobError>;
/// Writes `data` as a JSON job file into `queue` and waits for it to disappear.
///
/// The file is named `<RFC 3339 timestamp>-<12 random alphanumerics>.json`.
/// After the file has been written and closed, this function polls every
/// 100 ms and only returns once the file no longer exists at that path.
/// There is no timeout: if nothing ever removes the file, the call blocks
/// forever. (Presumably the queue consumer removes the file once it has
/// picked the job up — verify against the consumer.)
///
/// # Errors
///
/// * [`JobError::Create`] if the job file cannot be created.
/// * [`JobError::Write`] if serializing `data` into the file fails.
fn write_job(queue: &Path, data: &Value) -> JobResult<()> {
    let rndpart = rand::thread_rng()
        .sample_iter(&rand::distributions::Alphanumeric)
        .map(char::from)
        .take(12)
        .collect::<String>();
    let filename = format!("{}-{}.json", Utc::now().to_rfc3339(), rndpart);
    let job_file = queue.join(filename);

    // Scope the handle so the file is closed before we start polling.
    {
        let mut file =
            File::create(&job_file).map_err(|err| JobError::create(job_file.clone(), err))?;
        serde_json::to_writer(&mut file, data)
            .map_err(|err| JobError::write(job_file.clone(), err))?;
    }

    // Wait until the job file has been removed. A single loop is enough; the
    // previous second, identical loop could never observe anything new.
    while job_file.exists() {
        thread::sleep(Duration::from_millis(100));
    }

    Ok(())
}
/// Drops a job of the given `kind` carrying `data` into the `queue` directory.
///
/// The job is written as a JSON object with two keys, `"kind"` and `"data"`,
/// and handed to `write_job`, which blocks until the job file is gone again.
///
/// # Errors
///
/// Returns whatever `write_job` returns for the assembled job document.
///
/// NOTE(review): the `json!` interpolation of `data` presumably panics if
/// `data` fails to serialize — confirm against the `serde_json` docs.
pub fn drop_job<Q, K, V>(queue: Q, kind: K, data: V) -> JobResult<()>
where
    Q: AsRef<Path>,
    K: AsRef<str>,
    V: Serialize,
{
    let queue_dir = queue.as_ref();
    let document = json!({
        "kind": kind.as_ref(),
        "data": data,
    });
    write_job(queue_dir, &document)
}
/// Drops a `watchdog:restart` job with an empty JSON object payload into `queue`.
///
/// # Errors
///
/// Propagates any [`JobError`] reported by [`drop_job`].
pub fn restart<Q>(queue: Q) -> JobResult<()>
where
    Q: AsRef<Path>,
{
    let empty_payload = json!({});
    drop_job(queue, "watchdog:restart", empty_payload)
}
/// Drops a `watchdog:exit` job with an empty JSON object payload into `queue`.
///
/// # Errors
///
/// Propagates any [`JobError`] reported by [`drop_job`].
pub fn exit<Q>(queue: Q) -> JobResult<()>
where
    Q: AsRef<Path>,
{
    let empty_payload = json!({});
    drop_job(queue, "watchdog:exit", empty_payload)
}
/// Persists `tempdir` on disk and logs where its contents can be found.
///
/// Used on archival failure paths so that job files already moved into the
/// temporary directory are not deleted when the `TempDir` guard goes away.
fn log_tempdir(tempdir: TempDir) {
    // Persist the directory *before* logging. `log` macros only evaluate
    // their arguments when the level is enabled, so calling `into_path()`
    // inside `error!` would let the `TempDir` be dropped (and the directory
    // deleted) whenever error logging is disabled or no logger is installed.
    let kept = tempdir.into_path();
    error!(
        "archival failed mid-stream; in-progress archival job files may be found in {}",
        kept.display(),
    );
}
/// Errors that can occur while archiving the outboxes of a queue.
///
/// Every variant records the outbox that was being processed; it can be
/// retrieved through the `outbox` accessor.
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum ArchiveQueueError {
/// The output archive file could not be created.
#[error("failed to create archive file {}: {}", path.display(), source)]
CreateOutput {
/// Outbox that was being archived.
#[doc(hidden)]
outbox: Outbox,
/// Path of the archive file that could not be created.
path: PathBuf,
/// The underlying I/O error.
#[source]
source: io::Error,
},
/// The LZMA compressor wrapping the archive file could not be created.
#[error("failed to create compressor: {}", source)]
CreateCompressor {
/// Outbox that was being archived.
#[doc(hidden)]
outbox: Outbox,
/// The error reported by the LZMA library.
#[source]
source: lzma::LzmaError,
},
/// Writing the outbox contents into the tar stream failed.
#[error("failed to archive the {} outbox: {}", outbox, source)]
Archive {
/// Outbox that was being archived.
#[doc(hidden)]
outbox: Outbox,
/// The detailed archival error.
#[source]
source: ArchiveError,
},
/// Finishing the compressed stream failed.
#[error("failed to finish the {} archive: {}", outbox, source)]
FinishArchive {
/// Outbox that was being archived.
#[doc(hidden)]
outbox: Outbox,
/// The error reported by the LZMA library.
#[source]
source: lzma::LzmaError,
},
/// Removing the archive file failed; removal is attempted when no job
/// was moved out of the outbox.
#[error("failed to remove an incomplete {} archive: {}", outbox, source)]
RemoveIncompleteArchive {
/// Outbox that was being archived.
#[doc(hidden)]
outbox: Outbox,
/// The underlying I/O error.
#[source]
source: io::Error,
},
}
impl ArchiveQueueError {
    /// Returns the outbox that was being archived when the error occurred.
    pub fn outbox(&self) -> Outbox {
        match *self {
            Self::CreateOutput { outbox, .. }
            | Self::CreateCompressor { outbox, .. }
            | Self::Archive { outbox, .. }
            | Self::FinishArchive { outbox, .. }
            | Self::RemoveIncompleteArchive { outbox, .. } => outbox,
        }
    }

    /// Builds a [`ArchiveQueueError::CreateOutput`] error.
    fn create_output(outbox: Outbox, path: PathBuf, source: io::Error) -> Self {
        Self::CreateOutput { outbox, path, source }
    }

    /// Builds a [`ArchiveQueueError::CreateCompressor`] error.
    fn create_compressor(outbox: Outbox, source: lzma::LzmaError) -> Self {
        Self::CreateCompressor { outbox, source }
    }

    /// Builds a [`ArchiveQueueError::Archive`] error.
    fn archive(outbox: Outbox, source: ArchiveError) -> Self {
        Self::Archive { outbox, source }
    }

    /// Builds a [`ArchiveQueueError::FinishArchive`] error.
    ///
    /// The temporary directory is handed to `log_tempdir` first, which logs
    /// where the in-progress job files may be found.
    fn finish_archive(tempdir: TempDir, outbox: Outbox, source: lzma::LzmaError) -> Self {
        log_tempdir(tempdir);
        Self::FinishArchive { outbox, source }
    }

    /// Builds a [`ArchiveQueueError::RemoveIncompleteArchive`] error.
    fn remove_incomplete_archive(outbox: Outbox, source: io::Error) -> Self {
        Self::RemoveIncompleteArchive { outbox, source }
    }
}
/// Compression preset passed to `LzmaWriter::new_compressor` for archive files.
const LZMA_COMPRESSION: u32 = 6;
/// Archives the accept, fail and reject outboxes of `queue` into `output`.
///
/// This is a thin generic front for `archive_queue_impl`, which works on
/// plain `&Path` values.
///
/// # Errors
///
/// Returns the first [`ArchiveQueueError`] encountered; outboxes after the
/// failing one are not processed.
pub fn archive_queue<Q, O>(queue: Q, output: O) -> Result<(), ArchiveQueueError>
where
    Q: AsRef<Path>,
    O: AsRef<Path>,
{
    let queue_dir: &Path = queue.as_ref();
    let output_dir: &Path = output.as_ref();
    archive_queue_impl(queue_dir, output_dir)
}
/// Archives each outbox (accept, fail, reject — in that order) of `queue`.
///
/// For every outbox a fresh `.tar.xz` file is created in `output` and filled
/// by `archive_directory`. If that reports that nothing was moved out of the
/// outbox, the archive file is removed again; otherwise the compressed stream
/// is finished.
///
/// # Errors
///
/// Stops at and returns the first [`ArchiveQueueError`].
fn archive_queue_impl(queue: &Path, output: &Path) -> Result<(), ArchiveQueueError> {
    for &outbox in &[Outbox::Accept, Outbox::Fail, Outbox::Reject] {
        let (archive_path, compressor) = archive_file(output, outbox)?;
        let archived = archive_directory(queue, output, outbox, compressor)
            .map_err(|err| ArchiveQueueError::archive(outbox, err))?;

        match archived {
            // Jobs were archived: flush and close the compressed stream.
            Some((tempdir, writer)) => {
                writer
                    .finish()
                    .map_err(|err| ArchiveQueueError::finish_archive(tempdir, outbox, err))?;
            },
            // Nothing was moved out of the outbox: discard the archive file.
            None => {
                fs::remove_file(archive_path)
                    .map_err(|err| ArchiveQueueError::remove_incomplete_archive(outbox, err))?;
            },
        }
    }

    Ok(())
}
/// Errors that can occur while archiving a single outbox directory.
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum ArchiveError {
/// The temporary directory could not be created.
#[error("failed to create temporary directory: {}", source)]
CreateTempdir {
/// The underlying I/O error.
#[source]
source: io::Error,
},
/// The outbox directory entry could not be added to the tar stream.
#[error("failed to add directory: {}", source)]
AddDirectory {
/// The underlying I/O error.
#[source]
source: io::Error,
},
/// The outbox directory could not be read.
#[error("failed to read directory: {}", source)]
ReadDirectory {
/// The underlying I/O error.
#[source]
source: io::Error,
},
/// A directory entry could not be appended to the tar stream.
#[error("failed to append job: {}", source)]
AppendJob {
/// Path of the entry that could not be appended. Note that it is not
/// included in the display message above.
path: PathBuf,
/// The underlying I/O error.
#[source]
source: io::Error,
},
/// The tar stream could not be finished.
#[error("failed to finish tar stream: {}", source)]
FinishTar {
/// The underlying I/O error.
#[source]
source: io::Error,
},
}
impl ArchiveError {
    /// Builds a [`ArchiveError::CreateTempdir`] error.
    fn create_tempdir(source: io::Error) -> Self {
        Self::CreateTempdir { source }
    }

    /// Builds a [`ArchiveError::AddDirectory`] error.
    fn add_directory(source: io::Error) -> Self {
        Self::AddDirectory { source }
    }

    /// Builds a [`ArchiveError::ReadDirectory`] error.
    fn read_directory(source: io::Error) -> Self {
        Self::ReadDirectory { source }
    }

    /// Builds a [`ArchiveError::AppendJob`] error for the entry at `path`.
    ///
    /// The temporary directory is handed to `log_tempdir` first, which logs
    /// where the in-progress job files may be found.
    fn append_job(tempdir: TempDir, path: PathBuf, source: io::Error) -> Self {
        log_tempdir(tempdir);
        Self::AppendJob { path, source }
    }

    /// Builds a [`ArchiveError::FinishTar`] error.
    ///
    /// The temporary directory is handed to `log_tempdir` first, which logs
    /// where the in-progress job files may be found.
    fn finish_tar(tempdir: TempDir, source: io::Error) -> Self {
        log_tempdir(tempdir);
        Self::FinishTar { source }
    }
}
/// Creates the compressed archive file for `outbox` inside the directory `path`.
///
/// The file is named `<RFC 3339 timestamp>-<outbox>.tar.xz`. On success the
/// path of the new file is returned together with an LZMA compressor that
/// writes into it.
///
/// # Errors
///
/// * [`ArchiveQueueError::CreateOutput`] if the file cannot be created.
/// * [`ArchiveQueueError::CreateCompressor`] if the compressor cannot be set up.
fn archive_file(
    path: &Path,
    outbox: Outbox,
) -> Result<(PathBuf, LzmaWriter<File>), ArchiveQueueError> {
    let timestamp = Utc::now().to_rfc3339();
    let filepath = path.join(format!("{}-{}.tar.xz", timestamp, outbox));

    let file = match File::create(&filepath) {
        Ok(file) => file,
        Err(err) => return Err(ArchiveQueueError::create_output(outbox, filepath, err)),
    };

    match LzmaWriter::new_compressor(file, LZMA_COMPRESSION) {
        Ok(writer) => Ok((filepath, writer)),
        Err(err) => Err(ArchiveQueueError::create_compressor(outbox, err)),
    }
}
/// Streams the contents of one outbox directory into a tar archive.
///
/// `path` is the queue directory; the outbox lives in the sub-directory named
/// by `outbox.name()`. A temporary directory is created inside `workdir`.
/// Each entry of the outbox is appended to the tar stream written to `output`
/// and then renamed into the temporary directory.
///
/// Unreadable directory entries and failed renames are logged as warnings and
/// skipped.
///
/// # Returns
///
/// * `Ok(Some((tempdir, output)))` when at least one entry was renamed into
///   the temporary directory; `output` is the writer recovered from the
///   finished tar builder.
/// * `Ok(None)` when no entry was renamed.
///
/// # Errors
///
/// Returns an [`ArchiveError`] if the temporary directory cannot be created,
/// the directory header cannot be added, the outbox cannot be read, an entry
/// cannot be appended, or the tar stream cannot be finished.
fn archive_directory<O>(
    path: &Path,
    workdir: &Path,
    outbox: Outbox,
    output: O,
) -> Result<Option<(TempDir, O)>, ArchiveError>
where
    O: Write,
{
    let tempdir = TempDir::new_in(workdir).map_err(ArchiveError::create_tempdir)?;
    let outbox_name = outbox.name();
    let outbox_path = PathBuf::from(outbox_name);
    let mut archive = Builder::new(output);

    // The outbox directory entry itself takes its metadata from the tempdir.
    archive
        .append_dir(outbox_name, &tempdir)
        .map_err(ArchiveError::add_directory)?;

    let entries = fs::read_dir(path.join(outbox_name)).map_err(ArchiveError::read_directory)?;

    let mut moved_any = false;
    for candidate in entries {
        let entry = match candidate {
            Ok(found) => found,
            Err(err) => {
                warn!("failed to read directory entry; skipping: {:?}", err);
                continue;
            },
        };

        let source_path = entry.path();
        let entry_name = entry.file_name();

        let appended =
            archive.append_path_with_name(&source_path, outbox_path.join(&entry_name));
        if let Err(err) = appended {
            return Err(ArchiveError::append_job(tempdir, source_path, err));
        }

        // Only entries that were actually moved count towards a non-empty archive.
        let target_path = tempdir.path().join(&entry_name);
        if let Err(err) = fs::rename(&source_path, &target_path) {
            warn!(
                "failed to rename {} to {}: {:?}",
                source_path.display(),
                target_path.display(),
                err,
            );
        } else {
            moved_any = true;
        }
    }

    if !moved_any {
        return Ok(None);
    }

    let output = match archive.into_inner() {
        Ok(output) => output,
        Err(err) => return Err(ArchiveError::finish_tar(tempdir, err)),
    };
    Ok(Some((tempdir, output)))
}