// processing_chain 0.2.2
//
// Rust library to set up processing chains of large amounts of data.
// Documentation
use anyhow::{Context, Result};
use std::ffi::OsStr;
use std::path::PathBuf;
use std::{fs, io};
use log::{info, warn};
use rayon::prelude::*;
use indicatif::ParallelProgressIterator;
use std::iter::zip;

use crate::{process_trait::ProcessingCore, items::Item};
pub use super::Process;


/// File-based implementation of [`ProcessingCore`] for [`Process`]: each
/// matching entry of the inputs directory becomes one [`Item`].
impl ProcessingCore for Process {
    /// Scans `inputs_dir_path` and rebuilds `self.items`.
    ///
    /// An entry is kept when its extension equals `inputs_extenion` (an entry
    /// without extension is treated as having the empty extension). Entries
    /// whose output path already exists are skipped unless `overwrite` is set.
    /// Items are named `file_<n>`, numbered in the order the kept entries are
    /// yielded by `fs::read_dir`.
    ///
    /// # Errors
    ///
    /// Fails if the inputs directory or one of its entries cannot be read, or
    /// if a matching path has no final file-name component.
    fn set_items(&mut self) -> Result<()> {
        let entries = fs::read_dir(&self.inputs_dir_path)
            .context("could not read inputs directory")?
            .map(|res| res.map(|e| e.path()))
            .collect::<Result<Vec<_>, io::Error>>()
            .context("could not read an entry of the inputs directory")?;

        // Loop-invariant: build the comparison value once.
        let wanted_extension = OsStr::new(&self.inputs_extenion);
        let mut items: Vec<Item> = Vec::new();

        for path in entries {
            if path.extension().unwrap_or_else(|| OsStr::new("")) != wanted_extension {
                continue;
            }

            let file_name = path.file_name().context("file_name() failed")?;

            // The output location does not depend on whether a tmp dir is used.
            let output_item_path = self.outputs_dir_path.join(file_name);
            if !self.overwrite && output_item_path.exists() {
                continue;
            }

            // Without a tmp dir the tmp path is left empty, and it is still
            // wrapped in `Some(..)` below, exactly as before.
            let tmp_item_path: PathBuf = match &self.tmp_dir_path {
                Some(tmp_dir) => tmp_dir.join(file_name),
                None => PathBuf::new(),
            };

            // Only kept entries are numbered, so the index is the current length.
            let name = format!("file_{}", items.len());
            items.push(Item {
                name,
                input_item_paths: vec![path],
                output_item_paths: vec![output_item_path],
                tmp_item_paths: Some(vec![tmp_item_path]),
            });
        }

        self.items = items;
        Ok(())
    }

    /// Returns `Ok(true)` when every input path of every item exists on disk
    /// (trivially true when there are no items), `Ok(false)` otherwise.
    fn check_all_inputs_exist(&self) -> Result<bool> {
        let all_exist = self
            .items
            .iter()
            .flat_map(|item| item.input_item_paths.iter())
            .all(|input| input.exists());
        Ok(all_exist)
    }

    /// Returns `Ok(true)` when a temporary directory is configured
    /// (`tmp_dir_path` is `Some`). The file system is not consulted.
    fn check_tmp_dir_exist(&self) -> Result<bool> {
        Ok(self.tmp_dir_path.is_some())
    }

    /// Creates the configured temporary directory, including missing parents.
    ///
    /// # Errors
    ///
    /// Fails if no temporary directory is configured or if it cannot be
    /// created.
    fn create_tmp_directory(&self) -> Result<()> {
        // Resolve the option once instead of re-checking it at every use.
        let tmp_dir = self
            .tmp_dir_path
            .as_ref()
            .context("no temporary directory configured (tmp_dir_path is None)")?;

        fs::create_dir_all(tmp_dir).with_context(|| {
            format!(
                "could not create temporary directory `{}`",
                tmp_dir.display()
            )
        })?;
        info!("Created tmp directory at {}", tmp_dir.display());
        Ok(())
    }

    /// Runs `f` on every item in parallel, with a progress bar.
    ///
    /// When `f` returns `Ok(false)` for an item a warning is logged and
    /// processing continues.
    ///
    /// # Errors
    ///
    /// If `f` fails for an item, that error is returned with the item name as
    /// context instead of panicking inside the worker pool; remaining items
    /// may be left unprocessed.
    fn process_items<F>(&self, f: F) -> Result<bool>
    where
        F: Fn(&Item) -> Result<bool> + Send + Sync,
    {
        info!("Process name {}", self.name);

        self.items
            .par_iter()
            .progress_count(self.items.len() as u64)
            .try_for_each(|item| -> Result<()> {
                let succeeded = f(item)
                    .with_context(|| format!("could not process item `{}`", item.name))?;

                if !succeeded {
                    let warn_description = format!("Process for {} not succesfull.", item.name);
                    warn!("Warning! {}!", warn_description);
                }
                Ok(())
            })?;

        Ok(true)
    }

    /// Moves each item's temporary files to their output locations, pairing
    /// temporary and output paths by position.
    ///
    /// # Errors
    ///
    /// Fails if an item has no temporary paths or if a rename fails; the
    /// error names the paths involved.
    fn move_files(&self) -> Result<bool> {
        for item in &self.items {
            let tmp_paths = item.tmp_item_paths.as_ref().with_context(|| {
                format!("item `{}` has no temporary paths to move", item.name)
            })?;

            for (tmp_path, output_path) in zip(tmp_paths, &item.output_item_paths) {
                fs::rename(tmp_path, output_path).with_context(|| {
                    format!(
                        "could not move `{}` to `{}`",
                        tmp_path.display(),
                        output_path.display()
                    )
                })?;
            }
        }
        Ok(true)
    }
}