// chord_input/load/csv.rs

1use std::fs::File;
2use std::path::Path;
3use std::path::PathBuf;
4use std::sync::Arc;
5
6use csv::{Reader, ReaderBuilder};
7use log::trace;
8
9use chord_core::flow::Flow;
10use chord_core::input::{async_trait, Error, JobLoader, StageLoader, TaskLoader};
11use chord_core::task::TaskId;
12use chord_core::value::{Map, Value};
13
14static LOAD_STRATEGY_DEFAULT: &str = "actual";
15
16pub struct CsvJobLoader {
17    path: PathBuf,
18    strategy: String,
19    path_is_task: bool,
20}
21
22impl CsvJobLoader {
23    pub async fn new<P: AsRef<Path>>(
24        conf: Option<&Value>,
25        path: P,
26        path_is_task: bool,
27    ) -> Result<CsvJobLoader, Error> {
28        let ls = conf
29            .map(|c| {
30                c["csv"]["strategy"]
31                    .as_str()
32                    .unwrap_or(LOAD_STRATEGY_DEFAULT)
33            })
34            .unwrap_or(LOAD_STRATEGY_DEFAULT);
35
36        Ok(CsvJobLoader {
37            path: path.as_ref().to_path_buf(),
38            strategy: ls.to_string(),
39            path_is_task,
40        })
41    }
42}
43
44#[async_trait]
45impl JobLoader for CsvJobLoader {
46    async fn task(
47        &self,
48        task_id: Arc<dyn TaskId>,
49        flow: Arc<Flow>,
50    ) -> Result<Box<dyn TaskLoader>, Error> {
51        let mut buf = self.path.clone();
52        if !self.path_is_task {
53            for p in task_id.task().split(".") {
54                buf.push(p);
55            }
56        }
57
58        let loader = CsvTaskLoader::new(flow, buf, self.strategy.clone()).await?;
59        Ok(Box::new(loader))
60    }
61}
62
63pub struct CsvTaskLoader {
64    flow: Arc<Flow>,
65    path: PathBuf,
66    strategy: String,
67}
68
69impl CsvTaskLoader {
70    async fn new(flow: Arc<Flow>, path: PathBuf, strategy: String) -> Result<CsvTaskLoader, Error> {
71        let loader = CsvTaskLoader {
72            flow,
73            path,
74            strategy,
75        };
76        Ok(loader)
77    }
78}
79
80#[async_trait]
81impl TaskLoader for CsvTaskLoader {
82    async fn stage(&self, stage_id: &str) -> Result<Box<dyn StageLoader>, Error> {
83        let conf = self.flow.stage_loader(stage_id);
84        let name = conf["name"].as_str().unwrap_or(stage_id);
85        let strategy = conf["strategy"].as_str().unwrap_or(self.strategy.as_str());
86        let path = self.path.join(format!("{}.csv", name));
87        let loader = CsvStageLoader::new(path, strategy.to_string()).await?;
88        Ok(Box::new(loader))
89    }
90}
91
92struct CsvStageLoader {
93    row_num: usize,
94    reader: Reader<File>,
95    strategy: String,
96}
97
98impl CsvStageLoader {
99    async fn new<P: AsRef<Path>>(path: P, strategy: String) -> Result<CsvStageLoader, Error> {
100        trace!("new CsvStageLoader {}", path.as_ref().to_str().unwrap());
101        let loader = CsvStageLoader {
102            row_num: 1,
103            reader: from_path(path.as_ref()).await?,
104            strategy,
105        };
106        Ok(loader)
107    }
108}
109
110#[async_trait]
111impl StageLoader for CsvStageLoader {
112    async fn load(&mut self, size: usize) -> Result<Vec<(String, Value)>, Error> {
113        let dv = load(&mut self.reader, size).await?;
114        let mut result: Vec<(String, Value)> = vec![];
115        for d in dv {
116            self.row_num = self.row_num + 1;
117            result.push((self.row_num.to_string(), d));
118        }
119
120        if (!result.is_empty()) && result.len() < size {
121            match self.strategy.as_str() {
122                "fix_size_repeat_last_page" => {
123                    let last_page = result.clone();
124                    for i in 0..size - result.len() {
125                        let offset = i % last_page.len();
126                        let fake_row = (
127                            (self.row_num + 1 + i).to_string(),
128                            last_page[offset].1.clone(),
129                        );
130                        result.push(fake_row);
131                    }
132                }
133                _ => {}
134            }
135        }
136        Ok(result)
137    }
138}
139
140async fn load<R: std::io::Read>(reader: &mut Reader<R>, size: usize) -> Result<Vec<Value>, Error> {
141    let mut hashmap_vec = Vec::new();
142    let mut curr_size = 0;
143    for result in reader.deserialize() {
144        let result: Map = result?;
145
146        let mut record: Map = Map::new();
147        //data fields must all be string
148        for (k, v) in result {
149            if v.is_string() {
150                record.insert(k, v);
151            } else {
152                record.insert(k, Value::String(v.to_string()));
153            }
154        }
155
156        hashmap_vec.push(Value::Object(record));
157
158        curr_size += 1;
159        if curr_size == size {
160            break;
161        }
162    }
163    Ok(hashmap_vec)
164}
165
166async fn from_path<P: AsRef<Path>>(path: P) -> Result<Reader<File>, Error> {
167    Ok(ReaderBuilder::new().from_path(path)?)
168}