chord-input 0.1.20

async parallel case executor
Documentation
use std::fs::File;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;

use csv::{Reader, ReaderBuilder};
use log::trace;

use chord_core::flow::Flow;
use chord_core::input::{async_trait, Error, JobLoader, StageLoader, TaskLoader};
use chord_core::task::TaskId;
use chord_core::value::{Map, Value};

/// Load strategy used when the job configuration does not set `csv.strategy`.
///
/// Only `"fix_size_repeat_last_page"` triggers special paging behaviour in the
/// stage loader; any other value, including this default, loads rows as-is.
static LOAD_STRATEGY_DEFAULT: &str = "actual";

/// Job-level loader that locates the CSV case files for each task.
pub struct CsvJobLoader {
    /// Root directory of the job, or the task directory itself when
    /// `path_is_task` is set.
    path: PathBuf,
    /// Default load strategy passed down to every task loader.
    strategy: String,
    /// When `true`, `path` is used as the task directory unchanged; otherwise
    /// the task id is appended to it.
    path_is_task: bool,
}

impl CsvJobLoader {
    /// Creates a job loader rooted at `path`.
    ///
    /// The default strategy comes from `conf["csv"]["strategy"]`. When `conf`
    /// is `None`, or that entry is missing or not a string,
    /// `LOAD_STRATEGY_DEFAULT` is used instead.
    ///
    /// If `path_is_task` is `false`, each task's directory is built later by
    /// appending the task id's `.`-separated segments to `path`.
    ///
    /// Currently always returns `Ok`.
    pub async fn new<P: AsRef<Path>>(
        conf: Option<&Value>,
        path: P,
        path_is_task: bool,
    ) -> Result<CsvJobLoader, Error> {
        let strategy = conf
            .and_then(|c| c["csv"]["strategy"].as_str())
            .unwrap_or(LOAD_STRATEGY_DEFAULT)
            .to_string();

        Ok(CsvJobLoader {
            path: path.as_ref().to_path_buf(),
            strategy,
            path_is_task,
        })
    }
}

#[async_trait]
impl JobLoader for CsvJobLoader {
    /// Builds the task loader for `task_id`.
    ///
    /// Unless `path_is_task` is set, every `.`-separated segment of the task id
    /// becomes a nested directory under the job path (`a.b` -> `<path>/a/b`).
    /// The job's strategy is handed down to the task loader.
    async fn task(
        &self,
        task_id: Arc<dyn TaskId>,
        flow: Arc<Flow>,
    ) -> Result<Box<dyn TaskLoader>, Error> {
        let mut task_dir = self.path.clone();
        if !self.path_is_task {
            // `PathBuf::extend` pushes each segment in order.
            task_dir.extend(task_id.task().split('.'));
        }

        let task_loader = CsvTaskLoader::new(flow, task_dir, self.strategy.clone()).await?;
        Ok(Box::new(task_loader))
    }
}

/// Task-level loader: maps each stage of a task to a CSV file in the task
/// directory.
pub struct CsvTaskLoader {
    /// Flow definition, queried for per-stage loader configuration.
    flow: Arc<Flow>,
    /// Directory containing this task's `<stage>.csv` files.
    path: PathBuf,
    /// Strategy used by stages that do not override it.
    strategy: String,
}

impl CsvTaskLoader {
    async fn new(flow: Arc<Flow>, path: PathBuf, strategy: String) -> Result<CsvTaskLoader, Error> {
        let loader = CsvTaskLoader {
            flow,
            path,
            strategy,
        };
        Ok(loader)
    }
}

#[async_trait]
impl TaskLoader for CsvTaskLoader {
    /// Opens the CSV file that backs `stage_id`.
    ///
    /// The flow's stage-loader config may override the file name (`name`,
    /// default `stage_id`) and the strategy (`strategy`, default: this task's
    /// strategy). The file opened is `<task path>/<name>.csv`.
    async fn stage(&self, stage_id: &str) -> Result<Box<dyn StageLoader>, Error> {
        let stage_conf = self.flow.stage_loader(stage_id);

        let file_name = format!("{}.csv", stage_conf["name"].as_str().unwrap_or(stage_id));
        let csv_path = self.path.join(file_name);

        let strategy = stage_conf["strategy"]
            .as_str()
            .unwrap_or(self.strategy.as_str())
            .to_string();

        let stage_loader = CsvStageLoader::new(csv_path, strategy).await?;
        Ok(Box::new(stage_loader))
    }
}

/// Stage-level loader that pages rows out of a single CSV file.
struct CsvStageLoader {
    /// Id of the last row handed out; incremented before each new row.
    row_num: usize,
    /// Open CSV reader positioned after the rows already returned.
    reader: Reader<File>,
    /// Paging strategy (`"fix_size_repeat_last_page"` pads short pages).
    strategy: String,
}

impl CsvStageLoader {
    /// Opens `path` as a CSV file for one stage.
    ///
    /// `row_num` starts at 1 and is incremented before each row is handed out,
    /// so the first data row gets id `2`. This presumably mirrors the file
    /// line number, with the header on line 1 (NOTE(review): confirm intent).
    ///
    /// # Errors
    /// Returns an error if the file cannot be opened as a CSV reader.
    async fn new<P: AsRef<Path>>(path: P, strategy: String) -> Result<CsvStageLoader, Error> {
        let path = path.as_ref();
        // `display()` never fails; the previous `to_str().unwrap()` panicked on
        // non-UTF-8 paths whenever trace logging was enabled.
        trace!("new CsvStageLoader {}", path.display());
        Ok(CsvStageLoader {
            row_num: 1,
            reader: from_path(path).await?,
            strategy,
        })
    }
}

#[async_trait]
impl StageLoader for CsvStageLoader {
    /// Reads up to `size` rows, each paired with its row id as a string.
    ///
    /// Under the `"fix_size_repeat_last_page"` strategy, a non-empty page that
    /// is shorter than `size` is padded up to `size` by cycling through its own
    /// rows. Padded rows get ids following the last real row, but `row_num` is
    /// not advanced for them.
    async fn load(&mut self, size: usize) -> Result<Vec<(String, Value)>, Error> {
        let rows = load(&mut self.reader, size).await?;

        let mut page: Vec<(String, Value)> = Vec::with_capacity(rows.len());
        for row in rows {
            self.row_num += 1;
            page.push((self.row_num.to_string(), row));
        }

        let real = page.len();
        let repeat_last_page = self.strategy == "fix_size_repeat_last_page";
        if repeat_last_page && real > 0 && real < size {
            for i in 0..size - real {
                // `i % real` always points at one of the real rows read above.
                let value = page[i % real].1.clone();
                page.push(((self.row_num + 1 + i).to_string(), value));
            }
        }

        Ok(page)
    }
}

/// Reads at most `size` data rows from `reader`.
///
/// Each row becomes a `Value::Object` that maps header name to the cell's raw
/// text as a `Value::String`. Cells are copied verbatim. The previous version
/// went through serde type inference and re-stringified the result, which
/// rewrote text such as `"007"` -> `"7"`, `"1.50"` -> `"1.5"` or
/// `"NaN"` -> `"null"`.
///
/// `size == 0` now returns an empty page; previously it read the whole file.
///
/// NOTE(review): the reader is a blocking `std::io::Read`, so this blocks the
/// executor thread while reading despite being `async`.
///
/// # Errors
/// Propagates any CSV error, e.g. malformed input or a row whose field count
/// differs from the header.
async fn load<R: std::io::Read>(reader: &mut Reader<R>, size: usize) -> Result<Vec<Value>, Error> {
    // Cloned so the reader can be mutably borrowed again by `records()`.
    let headers = reader.headers()?.clone();

    let mut rows = Vec::new();
    for record in reader.records().take(size) {
        let record = record?;

        // Data fields must all be strings: keep each cell's original text.
        let mut row: Map = Map::new();
        for (key, cell) in headers.iter().zip(record.iter()) {
            row.insert(key.to_string(), Value::String(cell.to_string()));
        }

        rows.push(Value::Object(row));
    }
    Ok(rows)
}

/// Opens `path` as a CSV reader with the `csv` crate's default settings
/// (first row treated as headers).
///
/// The file is opened synchronously even though this function is `async`.
async fn from_path<P: AsRef<Path>>(path: P) -> Result<Reader<File>, Error> {
    let reader = ReaderBuilder::new().from_path(path)?;
    Ok(reader)
}