1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
use anyhow::{Context, Result};
use std::fs::File;
use std::io::BufReader;
use std::fs;
use log::{info, warn};
use rayon::prelude::*;
use indicatif::ParallelProgressIterator;
use std::iter::zip;

use crate::{process_trait::ProcessingCore, items::Item};
pub use super::JsonProcess;

/// Implementation of the ProcessingCore trait for JsonProcess.
///
impl ProcessingCore for JsonProcess {
    /// Set the Items by parsing the JSON file.
    /// This method also remove input paths that do not exist.
    /// 
    fn set_items(&mut self) -> Result<()> {

        // Open the file in read-only mode with buffer.
        let file = File::open(&self.json_items)?;
        let reader = BufReader::new(file);

        let mut items: Vec<Item> = serde_json::from_reader(reader)
        .expect("error while reading or parsing the json_items file");

        for f in items.iter_mut() {
            f.input_item_paths.retain(|x| x.exists());
            if f.input_item_paths.is_empty() {
                warn!("Item {} has no valid input paths.", f.name);
            }
        }

        self.items = items;

        Ok(())
    }

    fn check_tmp_dir_exist(&self) -> Result<bool> {
        Ok(self.tmp_dir_path.is_some())
    }

    fn check_all_inputs_exist(&self) -> Result<bool> {
        let mut test = true;
        for f in self.items.iter() {
            for i in f.input_item_paths.iter() {
                test = test && i.exists();
            }
        }
        Ok(test)
    }

    fn create_tmp_directory(&self) -> Result<()> {
        fs::create_dir_all(self.tmp_dir_path.as_ref().context("as_ref() failed")?).with_context(
            || {
                format!(
                    "could not create temporary directory `{}`",
                    &self.tmp_dir_path.as_ref().unwrap().display()
                )
            },
        )?;
        info! {"Created tmp directory at {}",  &self.tmp_dir_path.as_ref().context("as_ref() failed")?.display()}
        Ok(())
    }

    /// Method that process all the Items.
    /// 
    fn process_items<F>(&self, f: F) -> Result<bool>
    where
        F: Fn(&Item) -> Result<bool> + Send + Sync,
    {
        info!("Process name {}", self.name);

        (self.items)
            .par_iter()
            .progress_count(self.items.len() as u64)
            .for_each(|i| {
                let fl = f(i)
                    .with_context(|| format!("could not process item `{}`", i.name))
                    .unwrap();

                if !fl {
                    let warn_description = format!("Process for {} not succesfull.", i.name);
                    warn!("Warning! {}!", warn_description);
                }
            });

        Ok(true)
    }

    fn move_files(&self) -> Result<bool> {
        for i in self.items.iter() {

            let tmp_paths = i.tmp_item_paths.as_ref().unwrap();

            let iter = zip(tmp_paths, &i.output_item_paths);

            for (p1, p2 )in iter {
                fs::rename(p1, p2)?;
            }
            }
        Ok(true)
    }
}