json_job_dispatch/utils.rs

// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.

//! Utilities for director-based tools.
//!
//! Tools written with this crate usually have additional tasks related to managing the job
//! files. These functions build those tasks into the tool itself rather than leaving them to
//! external scripts.
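//!
//! For example, a tool might ask a running director to restart itself and then archive the
//! processed jobs. A minimal sketch, assuming this module is exposed as
//! `json_job_dispatch::utils` and using a hypothetical `/var/lib/mytool` layout:
//!
//! ```no_run
//! use json_job_dispatch::utils;
//!
//! fn main() -> Result<(), Box<dyn std::error::Error>> {
//!     // Blocks until the director has picked up the restart job.
//!     utils::restart("/var/lib/mytool/queue")?;
//!     // Archive the accepted, failed, and rejected jobs into dated tarballs.
//!     utils::archive_queue("/var/lib/mytool/queue", "/var/lib/mytool/archive")?;
//!     Ok(())
//! }
//! ```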

use std::fs::{self, File};
use std::io::{self, Write};
use std::path::{Path, PathBuf};
use std::thread;
use std::time::Duration;

use chrono::Utc;
use log::{error, warn};
use lzma::LzmaWriter;
use rand::Rng;
use serde::Serialize;
use serde_json::{json, Value};
use tar::Builder;
use tempfile::TempDir;
use thiserror::Error;

use crate::Outbox;

/// An error creating a utility job.
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum JobError {
    /// Failure to create a job.
    #[error("failed to create job {}: {}", filepath.display(), source)]
    Create {
        /// The path to the job.
        filepath: PathBuf,
        /// The source of the error.
        #[source]
        source: io::Error,
    },
    /// Failure to serialize a job.
    #[error("failed to write job file {}: {}", filepath.display(), source)]
    Write {
        /// The path to the job.
        filepath: PathBuf,
        /// The source of the error.
        #[source]
        source: serde_json::Error,
    },
}

impl JobError {
    fn create(filepath: PathBuf, source: io::Error) -> Self {
        JobError::Create {
            filepath,
            source,
        }
    }

    fn write(filepath: PathBuf, source: serde_json::Error) -> Self {
        JobError::Write {
            filepath,
            source,
        }
    }
}

type JobResult<T> = Result<T, JobError>;

/// Write a job object to a new file in a directory.
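///
/// The file name combines an RFC 3339 timestamp with a random alphanumeric suffix to avoid
/// collisions between concurrent writers. The call blocks until the director has moved the
/// job file out of the queue.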
fn write_job(queue: &Path, data: &Value) -> JobResult<()> {
    let rndpart = rand::rng()
        .sample_iter(&rand::distr::Alphanumeric)
        .map(char::from)
        .take(12)
        .collect::<String>();
    let filename = format!("{}-{}.json", Utc::now().to_rfc3339(), rndpart);
    let job_file = queue.join(filename);

    // Write and close the job file.
    {
        let mut file =
            File::create(&job_file).map_err(|err| JobError::create(job_file.clone(), err))?;
        serde_json::to_writer(&mut file, data)
            .map_err(|err| JobError::write(job_file.clone(), err))?;
    }

    // Wait for the job to have been processed; the director removes the file from the queue
    // once it has handled the job.
    while job_file.exists() {
        thread::sleep(Duration::from_millis(100));
    }

    Ok(())
}

/// Write a job to the given queue.
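///
/// # Examples
///
/// A minimal sketch, assuming this module is exposed as `json_job_dispatch::utils` and that a
/// director is already watching the hypothetical `queue/` directory; the `ci:report` kind is
/// illustrative only:
///
/// ```no_run
/// use serde_json::json;
///
/// # fn main() -> Result<(), json_job_dispatch::utils::JobError> {
/// // Blocks until the director has taken the job file out of the queue.
/// json_job_dispatch::utils::drop_job("queue", "ci:report", json!({ "passed": 10, "failed": 0 }))?;
/// # Ok(())
/// # }
/// ```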
pub fn drop_job<Q, K, V>(queue: Q, kind: K, data: V) -> JobResult<()>
where
    Q: AsRef<Path>,
    K: AsRef<str>,
    V: Serialize,
{
    let job = json!({
        "kind": kind.as_ref(),
        "data": data,
    });
    write_job(queue.as_ref(), &job)
}

/// Write a restart job to the given queue.
pub fn restart<Q>(queue: Q) -> JobResult<()>
where
    Q: AsRef<Path>,
{
    drop_job(queue, "watchdog:restart", json!({}))
}

/// Write an exit job to the given queue.
pub fn exit<Q>(queue: Q) -> JobResult<()>
where
    Q: AsRef<Path>,
{
    drop_job(queue, "watchdog:exit", json!({}))
}

/// Log the location of a temporary directory holding job files that were staged for archiving
/// but left behind because of an error.
fn log_tempdir(tempdir: TempDir) {
    error!(
        "archival failed mid-stream; in-progress archival job files may be found in {}",
        tempdir.into_path().display(),
    );
}

/// An error archiving a queue.
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum ArchiveQueueError {
    /// Failure to create an output file.
    #[error("failed to create archive file {}: {}", path.display(), source)]
    CreateOutput {
        #[doc(hidden)]
        outbox: Outbox,
        /// The path to the archive file.
        path: PathBuf,
        /// The source of the error.
        #[source]
        source: io::Error,
    },
    /// Failure to create a compressor for the archive.
    #[error("failed to create compressor: {}", source)]
    CreateCompressor {
        #[doc(hidden)]
        outbox: Outbox,
        /// The source of the error.
        #[source]
        source: lzma::LzmaError,
    },
    /// Failure when archiving jobs.
    #[error("failed to archive the {} outbox: {}", outbox, source)]
    Archive {
        #[doc(hidden)]
        outbox: Outbox,
        /// The source of the error.
        #[source]
        source: ArchiveError,
    },
    /// Failure when finishing the archive.
    #[error("failed to finish the {} archive: {}", outbox, source)]
    FinishArchive {
        #[doc(hidden)]
        outbox: Outbox,
        /// The source of the error.
        #[source]
        source: lzma::LzmaError,
    },
    /// Failure to remove an incomplete archive.
    #[error("failed to remove an incomplete {} archive: {}", outbox, source)]
    RemoveIncompleteArchive {
        #[doc(hidden)]
        outbox: Outbox,
        /// The source of the error.
        #[source]
        source: io::Error,
    },
}

impl ArchiveQueueError {
    /// The outbox being archived when the error occurred.
    pub fn outbox(&self) -> Outbox {
        match self {
            ArchiveQueueError::CreateOutput {
                outbox, ..
            }
            | ArchiveQueueError::CreateCompressor {
                outbox, ..
            }
            | ArchiveQueueError::Archive {
                outbox, ..
            }
            | ArchiveQueueError::FinishArchive {
                outbox, ..
            }
            | ArchiveQueueError::RemoveIncompleteArchive {
                outbox, ..
            } => *outbox,
        }
    }

    fn create_output(outbox: Outbox, path: PathBuf, source: io::Error) -> Self {
        ArchiveQueueError::CreateOutput {
            outbox,
            path,
            source,
        }
    }

    fn create_compressor(outbox: Outbox, source: lzma::LzmaError) -> Self {
        ArchiveQueueError::CreateCompressor {
            outbox,
            source,
        }
    }

    fn archive(outbox: Outbox, source: ArchiveError) -> Self {
        ArchiveQueueError::Archive {
            outbox,
            source,
        }
    }

    fn finish_archive(tempdir: TempDir, outbox: Outbox, source: lzma::LzmaError) -> Self {
        log_tempdir(tempdir);
        ArchiveQueueError::FinishArchive {
            outbox,
            source,
        }
    }

    fn remove_incomplete_archive(outbox: Outbox, source: io::Error) -> Self {
        ArchiveQueueError::RemoveIncompleteArchive {
            outbox,
            source,
        }
    }
}

/// The LZMA compression level to use for archiving.
const LZMA_COMPRESSION: u32 = 6;

/// Archive the jobs in the given queue into per-outbox tarballs in the output directory.
///
/// Each subdirectory (`accept`, `fail`, and `reject`) is archived separately.
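///
/// # Examples
///
/// A minimal sketch, assuming this module is exposed as `json_job_dispatch::utils` and that the
/// hypothetical `queue/` directory (with its `accept`, `fail`, and `reject` outboxes) and
/// `archives/` directory already exist:
///
/// ```no_run
/// # fn main() -> Result<(), json_job_dispatch::utils::ArchiveQueueError> {
/// // Produces up to three dated `.tar.xz` files, one per non-empty outbox.
/// json_job_dispatch::utils::archive_queue("queue", "archives")?;
/// # Ok(())
/// # }
/// ```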
pub fn archive_queue<Q, O>(queue: Q, output: O) -> Result<(), ArchiveQueueError>
where
    Q: AsRef<Path>,
    O: AsRef<Path>,
{
    archive_queue_impl(queue.as_ref(), output.as_ref())
}

fn archive_queue_impl(queue: &Path, output: &Path) -> Result<(), ArchiveQueueError> {
    for outbox in [Outbox::Accept, Outbox::Fail, Outbox::Reject]
        .iter()
        .cloned()
    {
        let (filename, file) = archive_file(output, outbox)?;
        let opt_writer = archive_directory(queue, output, outbox, file)
            .map_err(|err| ArchiveQueueError::archive(outbox, err))?;
        if let Some((tempdir, writer)) = opt_writer {
            writer
                .finish()
                .map_err(|err| ArchiveQueueError::finish_archive(tempdir, outbox, err))?;
        } else {
            fs::remove_file(filename)
                .map_err(|err| ArchiveQueueError::remove_incomplete_archive(outbox, err))?;
        }
    }

    Ok(())
}

/// An error when writing to an archive.
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum ArchiveError {
    /// Failure to create a temporary directory for working.
    #[error("failed to create temporary directory: {}", source)]
    CreateTempdir {
        /// The source of the error.
        #[source]
        source: io::Error,
    },
    /// Failure to add the root directory to the archive.
    #[error("failed to add directory: {}", source)]
    AddDirectory {
        /// The source of the error.
        #[source]
        source: io::Error,
    },
    /// Failure to read the directory for processed job files.
    #[error("failed to read directory: {}", source)]
    ReadDirectory {
        /// The source of the error.
        #[source]
        source: io::Error,
    },
    /// Failure to append a job to the archive.
    #[error("failed to append job: {}", source)]
    AppendJob {
        /// The file that could not be appended.
        path: PathBuf,
        /// The source of the error.
        #[source]
        source: io::Error,
    },
    /// Failure to finish the tar stream.
    #[error("failed to finish tar stream: {}", source)]
    FinishTar {
        /// The source of the error.
        #[source]
        source: io::Error,
    },
}

impl ArchiveError {
    fn create_tempdir(source: io::Error) -> Self {
        ArchiveError::CreateTempdir {
            source,
        }
    }

    fn add_directory(source: io::Error) -> Self {
        ArchiveError::AddDirectory {
            source,
        }
    }

    fn read_directory(source: io::Error) -> Self {
        ArchiveError::ReadDirectory {
            source,
        }
    }

    fn append_job(tempdir: TempDir, path: PathBuf, source: io::Error) -> Self {
        log_tempdir(tempdir);
        ArchiveError::AppendJob {
            path,
            source,
        }
    }

    fn finish_tar(tempdir: TempDir, source: io::Error) -> Self {
        log_tempdir(tempdir);
        ArchiveError::FinishTar {
            source,
        }
    }
}

/// Create an archive file stream in the given path for the `outbox` files.
fn archive_file(
    path: &Path,
    outbox: Outbox,
) -> Result<(PathBuf, LzmaWriter<File>), ArchiveQueueError> {
    let now = Utc::now();
    let filepath = path.join(format!("{}-{}.tar.xz", now.to_rfc3339(), outbox));
    let file = File::create(&filepath)
        .map_err(|err| ArchiveQueueError::create_output(outbox, filepath.clone(), err))?;
    let writer = LzmaWriter::new_compressor(file, LZMA_COMPRESSION)
        .map_err(|err| ArchiveQueueError::create_compressor(outbox, err))?;

    Ok((filepath, writer))
}

/// Archive a directory into an output stream.
fn archive_directory<O>(
    path: &Path,
    workdir: &Path,
    outbox: Outbox,
    output: O,
) -> Result<Option<(TempDir, O)>, ArchiveError>
where
    O: Write,
{
    let tempdir = TempDir::new_in(workdir).map_err(ArchiveError::create_tempdir)?;
    let outbox_name = outbox.name();
    let outbox_path = PathBuf::from(outbox_name);
    let mut archive = Builder::new(output);
    archive
        .append_dir(outbox_name, &tempdir)
        .map_err(ArchiveError::add_directory)?;

    // Any error after this point must call `tempdir.into_path()` so that the temporary directory
    // is not cleaned up on drop: job files staged there have already been added to an archive
    // that will never be completed, so they must be left on disk for recovery.

    let entries = fs::read_dir(path.join(outbox_name)).map_err(ArchiveError::read_directory)?;
    let mut is_empty = true;
    for entry in entries {
        let entry = match entry {
            Ok(entry) => entry,
            Err(err) => {
                warn!("failed to read directory entry; skipping: {:?}", err);
                continue;
            },
        };
        let path = entry.path();
        let file_name = entry.file_name();
        if let Err(err) = archive.append_path_with_name(&path, outbox_path.join(&file_name)) {
            // The state of `archive` is unknown if an error occurs when adding data to it; it
            // cannot be trusted anymore, so bail out.
            //
            // See https://github.com/alexcrichton/tar-rs/issues/213
            return Err(ArchiveError::append_job(tempdir, path, err));
        }
        // The file has been added to the archive; move it to the staging directory.
        let target_path = tempdir.path().join(&file_name);
        match fs::rename(&path, &target_path) {
            Ok(()) => is_empty = false,
            Err(err) => {
                warn!(
                    "failed to rename {} to {}: {:?}",
                    path.display(),
                    target_path.display(),
                    err,
                )
            },
        }
    }

    if is_empty {
        // It is fine to let `tempdir` be dropped here; nothing was moved into the staging
        // directory anyway.
        return Ok(None);
    }

    // Both branches here need to pass on `tempdir`. The `Ok` path might still have an error before
    // everything is good to go.
    match archive.into_inner() {
        Ok(output) => Ok(Some((tempdir, output))),
        Err(err) => Err(ArchiveError::finish_tar(tempdir, err)),
    }
}

#[cfg(test)]
mod tests {
    use std::fs;
    use std::sync::{Arc, Mutex};
    use std::thread;
    use std::time::Duration;

    use serde_json::json;

    use crate::{test, utils};
    use crate::{Director, DirectorWatchdog, HandlerCore, RunResult};

    #[test]
    fn test_log_tempdir_leaves_directory() {
        let tempdir = test::test_workspace_dir();
        let path = tempdir.path().to_path_buf();

        super::log_tempdir(tempdir);
        assert!(path.exists(), "{} should still exist", path.display());
        fs::remove_dir_all(path).unwrap();
    }

    #[test]
    fn test_write_job_waits_for_consumption() {
        let tempdir = test::test_workspace_dir();
        let mut director = Director::new(tempdir.path()).unwrap();
        let mutex = Arc::new(Mutex::new(false));
        let signal = test::FlaggingWatcher::new(mutex.clone());
        signal.add_to_director(&mut director).unwrap();
        DirectorWatchdog.add_to_director(&mut director).unwrap();

        let thread_mutex = mutex.clone();
        let job_path = tempdir.path().to_path_buf();
        let write_thread = thread::spawn(move || {
            let data = json!({
                "kind": "flagging:set",
                "data": {},
            });
            utils::write_job(&job_path, &data).unwrap();
            assert!(
                *thread_mutex.lock().unwrap(),
                "`write_job` returned before the handler set the flag",
            );

            utils::exit(job_path).unwrap();
        });

        // Wait for a file to appear.
        loop {
            let entries = fs::read_dir(tempdir.path()).unwrap();
            let nfiles = entries
                .filter(|entry| entry.as_ref().unwrap().file_type().unwrap().is_file())
                .count();
            if nfiles == 1 {
                break;
            }
        }
        // Let the other thread make progress.
        thread::sleep(Duration::from_millis(100));

        let res = director.watch_directory(tempdir.path()).unwrap();
        assert_eq!(res, RunResult::Done);

        write_thread.join().unwrap();
    }

    #[test]
    fn test_archive_queue() {
        let tempdir = test::test_workspace_dir();
        let archive = tempdir.path().join("archive");
        let output = tempdir.path().join("output");
        fs::create_dir_all(&archive).unwrap();
        let _director = Director::new(&archive).unwrap(); // Create structure.
        fs::create_dir_all(&output).unwrap();

        let accept = archive.join("accept");
        let reject = archive.join("reject");
        let fail = archive.join("fail");

        assert_eq!(
            accept.read_dir().unwrap().count(),
            0,
            "`accept` is not an empty directory",
        );
        assert_eq!(
            reject.read_dir().unwrap().count(),
            0,
            "`reject` is not an empty directory",
        );
        assert_eq!(
            fail.read_dir().unwrap().count(),
            0,
            "`fail` is not an empty directory",
        );
        assert_eq!(
            output.read_dir().unwrap().count(),
            0,
            "`output` is not an empty directory",
        );
        utils::archive_queue(&archive, &output).unwrap();
        assert_eq!(
            output.read_dir().unwrap().count(),
            0,
            "archived empty queues",
        );

        fs::write(accept.join("accept.json"), b"{}").unwrap();
        fs::write(reject.join("reject.json"), b"{}").unwrap();
        fs::write(fail.join("fail.json"), b"{}").unwrap();
        utils::archive_queue(archive, &output).unwrap();
        assert_eq!(
            output.read_dir().unwrap().count(),
            3,
            "not all archives archived",
        );
    }

    #[test]
    fn test_drop_restart() {
        let tempdir = test::test_workspace_dir();
        let mut director = Director::new(tempdir.path()).unwrap();
        DirectorWatchdog.add_to_director(&mut director).unwrap();

        let job_path = tempdir.path().to_path_buf();
        let write_thread = thread::spawn(move || {
            utils::restart(job_path).unwrap();
        });

        let res = director.watch_directory(tempdir.path()).unwrap();
        assert_eq!(res, RunResult::Restart);

        write_thread.join().unwrap();
    }
}