1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
use std::{
    fs,
    io::{Read, Write},
    path::PathBuf,
};

use anyhow::Context;
use tracing::{debug, info};

use super::{Pipeline, Resource};

/// A single unit of work: one input file to be run through a [`Pipeline`]
/// and written to one output file.
///
/// The job only borrows its pipeline, so the pipeline must outlive the job.
pub struct Job<'a> {
    /// Path of the file whose bytes are fed into the pipeline.
    pub(super) input_path: PathBuf,
    /// Path of the file the pipeline's output bytes are written to.
    pub(super) output_path: PathBuf,
    /// Pipeline that is executed on the input bytes.
    pub(super) pipeline: &'a Pipeline,
}

impl<'a> Job<'a> {
    #[tracing::instrument(ret, skip_all, fields(pipeline = self.pipeline.name, input_path = self.input_path.to_str()))]
    pub fn execute(&self, resource: &Resource) -> anyhow::Result<()> {
        info!(
            "start pipeline {} -> {}",
            self.input_path.display(),
            self.output_path.display(),
        );

        let mut input_file = fs::File::open(&self.input_path)
            .with_context(|| format!("failed to open file \"{}\"", &self.input_path.display()))?;

        let mut input_bytes = Vec::new();
        input_file
            .read_to_end(&mut input_bytes)
            .with_context(|| format!("failed to read file \"{}\"", &self.input_path.display()))?;

        let entry_directory = self
            .input_path
            .parent()
            .map(|parent| parent.to_string_lossy().to_string());

        let output_bytes = self
            .pipeline
            .execute(input_bytes, resource, entry_directory)
            .context("pipeline failed")?;

        if let Some(parent) = self.output_path.parent() {
            debug!("create parent directory");
            fs::create_dir_all(parent)
                .with_context(|| format!("failed to create directory \"{}\"", parent.display()))?;
        }
        let mut output_file = fs::OpenOptions::new()
            .write(true)
            .create(true)
            .append(false)
            .open(&self.output_path)
            .with_context(|| {
                format!(
                    "failed to open output file \"{}\"",
                    self.output_path.display()
                )
            })?;
        output_file
            .write_all(&output_bytes)
            .with_context(|| format!("failed to write file \"{}\"", self.output_path.display()))?;

        info!("finish pipeline");
        Ok(())
    }

    pub fn pipeline(&self) -> &Pipeline {
        self.pipeline
    }

    pub fn input_path(&self) -> &PathBuf {
        &self.input_path
    }

    pub fn output_path(&self) -> &PathBuf {
        &self.output_path
    }
}