1use std::fs::{self, File};
14use std::io::{self, Write};
15use std::path::{Path, PathBuf};
16use std::thread;
17use std::time::Duration;
18
19use chrono::Utc;
20use log::{error, warn};
21use lzma::LzmaWriter;
22use rand::Rng;
23use serde::Serialize;
24use serde_json::{json, Value};
25use tar::Builder;
26use tempfile::TempDir;
27use thiserror::Error;
28
29use crate::Outbox;
30
31#[derive(Debug, Error)]
33#[non_exhaustive]
34pub enum JobError {
35 #[error("failed to create job {}: {}", filepath.display(), source)]
37 Create {
38 filepath: PathBuf,
40 #[source]
42 source: io::Error,
43 },
44 #[error("failed to write job file {}: {}", filepath.display(), source)]
46 Write {
47 filepath: PathBuf,
49 #[source]
51 source: serde_json::Error,
52 },
53}
54
55impl JobError {
56 fn create(filepath: PathBuf, source: io::Error) -> Self {
57 JobError::Create {
58 filepath,
59 source,
60 }
61 }
62
63 fn write(filepath: PathBuf, source: serde_json::Error) -> Self {
64 JobError::Write {
65 filepath,
66 source,
67 }
68 }
69}
70
71type JobResult<T> = Result<T, JobError>;
72
73fn write_job(queue: &Path, data: &Value) -> JobResult<()> {
75 let rndpart = rand::rng()
76 .sample_iter(&rand::distr::Alphanumeric)
77 .map(char::from)
78 .take(12)
79 .collect::<String>();
80 let filename = format!("{}-{}.json", Utc::now().to_rfc3339(), rndpart);
81 let job_file = queue.join(filename);
82
83 {
85 let mut file =
86 File::create(&job_file).map_err(|err| JobError::create(job_file.clone(), err))?;
87 serde_json::to_writer(&mut file, data)
88 .map_err(|err| JobError::write(job_file.clone(), err))?;
89 }
90
91 while job_file.exists() {
93 thread::sleep(Duration::from_millis(100));
94 }
95
96 Ok(())
97}
98
99pub fn drop_job<Q, K, V>(queue: Q, kind: K, data: V) -> JobResult<()>
101where
102 Q: AsRef<Path>,
103 K: AsRef<str>,
104 V: Serialize,
105{
106 let job = json!({
107 "kind": kind.as_ref(),
108 "data": data,
109 });
110 write_job(queue.as_ref(), &job)
111}
112
113pub fn restart<Q>(queue: Q) -> JobResult<()>
115where
116 Q: AsRef<Path>,
117{
118 drop_job(queue, "watchdog:restart", json!({}))
119}
120
121pub fn exit<Q>(queue: Q) -> JobResult<()>
123where
124 Q: AsRef<Path>,
125{
126 drop_job(queue, "watchdog:exit", json!({}))
127}
128
129fn log_tempdir(tempdir: TempDir) {
132 error!(
133 "archival failed mid-stream; in-progress archival job files may be found in {}",
134 tempdir.into_path().display(),
135 );
136}
137
138#[derive(Debug, Error)]
140#[non_exhaustive]
141pub enum ArchiveQueueError {
142 #[error("failed to create archive file {}: {}", path.display(), source)]
144 CreateOutput {
145 #[doc(hidden)]
146 outbox: Outbox,
147 path: PathBuf,
149 #[source]
151 source: io::Error,
152 },
153 #[error("failed to create compressor: {}", source)]
155 CreateCompressor {
156 #[doc(hidden)]
157 outbox: Outbox,
158 #[source]
160 source: lzma::LzmaError,
161 },
162 #[error("failed to archive the {} outbox: {}", outbox, source)]
164 Archive {
165 #[doc(hidden)]
166 outbox: Outbox,
167 #[source]
169 source: ArchiveError,
170 },
171 #[error("failed to finish the {} archive: {}", outbox, source)]
173 FinishArchive {
174 #[doc(hidden)]
175 outbox: Outbox,
176 #[source]
178 source: lzma::LzmaError,
179 },
180 #[error("failed to remove an incomplete {} archive: {}", outbox, source)]
182 RemoveIncompleteArchive {
183 #[doc(hidden)]
184 outbox: Outbox,
185 #[source]
187 source: io::Error,
188 },
189}
190
191impl ArchiveQueueError {
192 pub fn outbox(&self) -> Outbox {
194 match self {
195 ArchiveQueueError::CreateOutput {
196 outbox, ..
197 }
198 | ArchiveQueueError::CreateCompressor {
199 outbox, ..
200 }
201 | ArchiveQueueError::Archive {
202 outbox, ..
203 }
204 | ArchiveQueueError::FinishArchive {
205 outbox, ..
206 }
207 | ArchiveQueueError::RemoveIncompleteArchive {
208 outbox, ..
209 } => *outbox,
210 }
211 }
212
213 fn create_output(outbox: Outbox, path: PathBuf, source: io::Error) -> Self {
214 ArchiveQueueError::CreateOutput {
215 outbox,
216 path,
217 source,
218 }
219 }
220
221 fn create_compressor(outbox: Outbox, source: lzma::LzmaError) -> Self {
222 ArchiveQueueError::CreateCompressor {
223 outbox,
224 source,
225 }
226 }
227
228 fn archive(outbox: Outbox, source: ArchiveError) -> Self {
229 ArchiveQueueError::Archive {
230 outbox,
231 source,
232 }
233 }
234
235 fn finish_archive(tempdir: TempDir, outbox: Outbox, source: lzma::LzmaError) -> Self {
236 log_tempdir(tempdir);
237 ArchiveQueueError::FinishArchive {
238 outbox,
239 source,
240 }
241 }
242
243 fn remove_incomplete_archive(outbox: Outbox, source: io::Error) -> Self {
244 ArchiveQueueError::RemoveIncompleteArchive {
245 outbox,
246 source,
247 }
248 }
249}
250
251const LZMA_COMPRESSION: u32 = 6;
253
254pub fn archive_queue<Q, O>(queue: Q, output: O) -> Result<(), ArchiveQueueError>
258where
259 Q: AsRef<Path>,
260 O: AsRef<Path>,
261{
262 archive_queue_impl(queue.as_ref(), output.as_ref())
263}
264
265fn archive_queue_impl(queue: &Path, output: &Path) -> Result<(), ArchiveQueueError> {
266 for outbox in [Outbox::Accept, Outbox::Fail, Outbox::Reject]
267 .iter()
268 .cloned()
269 {
270 let (filename, file) = archive_file(output, outbox)?;
271 let opt_writer = archive_directory(queue, output, outbox, file)
272 .map_err(|err| ArchiveQueueError::archive(outbox, err))?;
273 if let Some((tempdir, writer)) = opt_writer {
274 writer
275 .finish()
276 .map_err(|err| ArchiveQueueError::finish_archive(tempdir, outbox, err))?;
277 } else {
278 fs::remove_file(filename)
279 .map_err(|err| ArchiveQueueError::remove_incomplete_archive(outbox, err))?;
280 }
281 }
282
283 Ok(())
284}
285
286#[derive(Debug, Error)]
288#[non_exhaustive]
289pub enum ArchiveError {
290 #[error("failed to create temporary directory: {}", source)]
292 CreateTempdir {
293 #[source]
295 source: io::Error,
296 },
297 #[error("failed to add directory: {}", source)]
299 AddDirectory {
300 #[source]
302 source: io::Error,
303 },
304 #[error("failed to read directory: {}", source)]
306 ReadDirectory {
307 #[source]
309 source: io::Error,
310 },
311 #[error("failed to append job: {}", source)]
313 AppendJob {
314 path: PathBuf,
316 #[source]
318 source: io::Error,
319 },
320 #[error("failed to finish tar stream: {}", source)]
322 FinishTar {
323 #[source]
325 source: io::Error,
326 },
327}
328
329impl ArchiveError {
330 fn create_tempdir(source: io::Error) -> Self {
331 ArchiveError::CreateTempdir {
332 source,
333 }
334 }
335
336 fn add_directory(source: io::Error) -> Self {
337 ArchiveError::AddDirectory {
338 source,
339 }
340 }
341
342 fn read_directory(source: io::Error) -> Self {
343 ArchiveError::ReadDirectory {
344 source,
345 }
346 }
347
348 fn append_job(tempdir: TempDir, path: PathBuf, source: io::Error) -> Self {
349 log_tempdir(tempdir);
350 ArchiveError::AppendJob {
351 path,
352 source,
353 }
354 }
355
356 fn finish_tar(tempdir: TempDir, source: io::Error) -> Self {
357 log_tempdir(tempdir);
358 ArchiveError::FinishTar {
359 source,
360 }
361 }
362}
363
364fn archive_file(
366 path: &Path,
367 outbox: Outbox,
368) -> Result<(PathBuf, LzmaWriter<File>), ArchiveQueueError> {
369 let now = Utc::now();
370 let filepath = path.join(format!("{}-{}.tar.xz", now.to_rfc3339(), outbox));
371 let file = File::create(&filepath)
372 .map_err(|err| ArchiveQueueError::create_output(outbox, filepath.clone(), err))?;
373 let writer = LzmaWriter::new_compressor(file, LZMA_COMPRESSION)
374 .map_err(|err| ArchiveQueueError::create_compressor(outbox, err))?;
375
376 Ok((filepath, writer))
377}
378
379fn archive_directory<O>(
381 path: &Path,
382 workdir: &Path,
383 outbox: Outbox,
384 output: O,
385) -> Result<Option<(TempDir, O)>, ArchiveError>
386where
387 O: Write,
388{
389 let tempdir = TempDir::new_in(workdir).map_err(ArchiveError::create_tempdir)?;
390 let outbox_name = outbox.name();
391 let outbox_path = PathBuf::from(outbox_name);
392 let mut archive = Builder::new(output);
393 archive
394 .append_dir(outbox_name, &tempdir)
395 .map_err(ArchiveError::add_directory)?;
396
397 let entries = fs::read_dir(path.join(outbox_name)).map_err(ArchiveError::read_directory)?;
402 let mut is_empty = true;
403 for entry in entries {
404 let entry = match entry {
405 Ok(entry) => entry,
406 Err(err) => {
407 warn!("failed to read directory entry; skipping: {:?}", err);
408 continue;
409 },
410 };
411 let path = entry.path();
412 let file_name = entry.file_name();
413 if let Err(err) = archive.append_path_with_name(&path, outbox_path.join(&file_name)) {
414 return Err(ArchiveError::append_job(tempdir, path, err));
419 }
420 let target_path = tempdir.path().join(&file_name);
422 match fs::rename(&path, &target_path) {
423 Ok(()) => is_empty = false,
424 Err(err) => {
425 warn!(
426 "failed to rename {} to {}: {:?}",
427 path.display(),
428 target_path.display(),
429 err,
430 )
431 },
432 }
433 }
434
435 if is_empty {
436 return Ok(None);
439 }
440
441 match archive.into_inner() {
444 Ok(output) => Ok(Some((tempdir, output))),
445 Err(err) => Err(ArchiveError::finish_tar(tempdir, err)),
446 }
447}
448
449#[cfg(test)]
450mod tests {
451 use std::fs;
452 use std::sync::{Arc, Mutex};
453 use std::thread;
454 use std::time::Duration;
455
456 use serde_json::json;
457
458 use crate::{test, utils};
459 use crate::{Director, DirectorWatchdog, HandlerCore, RunResult};
460
461 #[test]
462 fn test_log_tempdir_leaves_directory() {
463 let tempdir = test::test_workspace_dir();
464 let path = tempdir.path().to_path_buf();
465
466 super::log_tempdir(tempdir);
467 assert!(path.exists(), "{} should still exist", path.display());
468 fs::remove_dir_all(path).unwrap();
469 }
470
471 #[test]
472 fn test_write_job_waits_for_consumption() {
473 let tempdir = test::test_workspace_dir();
474 let mut director = Director::new(tempdir.path()).unwrap();
475 let mutex = Arc::new(Mutex::new(false));
476 let signal = test::FlaggingWatcher::new(mutex.clone());
477 signal.add_to_director(&mut director).unwrap();
478 DirectorWatchdog.add_to_director(&mut director).unwrap();
479
480 let thread_mutex = mutex.clone();
481 let job_path = tempdir.path().to_path_buf();
482 let write_thread = thread::spawn(move || {
483 let data = json!({
484 "kind": "flagging:set",
485 "data": {},
486 });
487 utils::write_job(&job_path, &data).unwrap();
488 assert!(
489 *thread_mutex.lock().unwrap(),
490 "`write_job` returned before the handler set the flag",
491 );
492
493 utils::exit(job_path).unwrap();
494 });
495
496 loop {
498 let entries = fs::read_dir(tempdir.path()).unwrap();
499 let nfiles = entries
500 .filter(|entry| entry.as_ref().unwrap().file_type().unwrap().is_file())
501 .count();
502 if nfiles == 1 {
503 break;
504 }
505 }
506 thread::sleep(Duration::from_millis(100));
508
509 let res = director.watch_directory(tempdir.path()).unwrap();
510 assert_eq!(res, RunResult::Done);
511
512 write_thread.join().unwrap();
513 }
514
515 #[test]
516 fn test_archive_queue() {
517 let tempdir = test::test_workspace_dir();
518 let archive = tempdir.path().join("archive");
519 let output = tempdir.path().join("output");
520 fs::create_dir_all(&archive).unwrap();
521 let _director = Director::new(&archive).unwrap(); fs::create_dir_all(&output).unwrap();
523
524 let accept = archive.join("accept");
525 let reject = archive.join("reject");
526 let fail = archive.join("fail");
527
528 assert_eq!(
529 accept.read_dir().unwrap().count(),
530 0,
531 "`accept` is not an empty directory",
532 );
533 assert_eq!(
534 reject.read_dir().unwrap().count(),
535 0,
536 "`reject` is not an empty directory",
537 );
538 assert_eq!(
539 fail.read_dir().unwrap().count(),
540 0,
541 "`fail` is not an empty directory",
542 );
543 assert_eq!(
544 output.read_dir().unwrap().count(),
545 0,
546 "`output` is not an empty directory",
547 );
548 utils::archive_queue(&archive, &output).unwrap();
549 assert_eq!(
550 output.read_dir().unwrap().count(),
551 0,
552 "archived empty queues",
553 );
554
555 fs::write(accept.join("accept.json"), b"{}").unwrap();
556 fs::write(reject.join("reject.json"), b"{}").unwrap();
557 fs::write(fail.join("fail.json"), b"{}").unwrap();
558 utils::archive_queue(archive, &output).unwrap();
559 assert_eq!(
560 output.read_dir().unwrap().count(),
561 3,
562 "not all archives archived",
563 );
564 }
565
566 #[test]
567 fn test_drop_restart() {
568 let tempdir = test::test_workspace_dir();
569 let mut director = Director::new(tempdir.path()).unwrap();
570 DirectorWatchdog.add_to_director(&mut director).unwrap();
571
572 let job_path = tempdir.path().to_path_buf();
573 let write_thread = thread::spawn(move || {
574 utils::restart(job_path).unwrap();
575 });
576
577 let res = director.watch_directory(tempdir.path()).unwrap();
578 assert_eq!(res, RunResult::Restart);
579
580 write_thread.join().unwrap();
581 }
582}