1use std::fs::File;
2use std::path::Path;
3use std::path::PathBuf;
4use std::sync::Arc;
5
6use csv::{Reader, ReaderBuilder};
7use log::trace;
8
9use chord_core::flow::Flow;
10use chord_core::input::{async_trait, Error, JobLoader, StageLoader, TaskLoader};
11use chord_core::task::TaskId;
12use chord_core::value::{Map, Value};
13
/// Load strategy used when neither the job config (`csv.strategy`) nor a stage config
/// (`strategy`) names one.
const LOAD_STRATEGY_DEFAULT: &str = "actual";
15
16pub struct CsvJobLoader {
17 path: PathBuf,
18 strategy: String,
19 path_is_task: bool,
20}
21
22impl CsvJobLoader {
23 pub async fn new<P: AsRef<Path>>(
24 conf: Option<&Value>,
25 path: P,
26 path_is_task: bool,
27 ) -> Result<CsvJobLoader, Error> {
28 let ls = conf
29 .map(|c| {
30 c["csv"]["strategy"]
31 .as_str()
32 .unwrap_or(LOAD_STRATEGY_DEFAULT)
33 })
34 .unwrap_or(LOAD_STRATEGY_DEFAULT);
35
36 Ok(CsvJobLoader {
37 path: path.as_ref().to_path_buf(),
38 strategy: ls.to_string(),
39 path_is_task,
40 })
41 }
42}
43
44#[async_trait]
45impl JobLoader for CsvJobLoader {
46 async fn task(
47 &self,
48 task_id: Arc<dyn TaskId>,
49 flow: Arc<Flow>,
50 ) -> Result<Box<dyn TaskLoader>, Error> {
51 let mut buf = self.path.clone();
52 if !self.path_is_task {
53 for p in task_id.task().split(".") {
54 buf.push(p);
55 }
56 }
57
58 let loader = CsvTaskLoader::new(flow, buf, self.strategy.clone()).await?;
59 Ok(Box::new(loader))
60 }
61}
62
63pub struct CsvTaskLoader {
64 flow: Arc<Flow>,
65 path: PathBuf,
66 strategy: String,
67}
68
69impl CsvTaskLoader {
70 async fn new(flow: Arc<Flow>, path: PathBuf, strategy: String) -> Result<CsvTaskLoader, Error> {
71 let loader = CsvTaskLoader {
72 flow,
73 path,
74 strategy,
75 };
76 Ok(loader)
77 }
78}
79
80#[async_trait]
81impl TaskLoader for CsvTaskLoader {
82 async fn stage(&self, stage_id: &str) -> Result<Box<dyn StageLoader>, Error> {
83 let conf = self.flow.stage_loader(stage_id);
84 let name = conf["name"].as_str().unwrap_or(stage_id);
85 let strategy = conf["strategy"].as_str().unwrap_or(self.strategy.as_str());
86 let path = self.path.join(format!("{}.csv", name));
87 let loader = CsvStageLoader::new(path, strategy.to_string()).await?;
88 Ok(Box::new(loader))
89 }
90}
91
92struct CsvStageLoader {
93 row_num: usize,
94 reader: Reader<File>,
95 strategy: String,
96}
97
98impl CsvStageLoader {
99 async fn new<P: AsRef<Path>>(path: P, strategy: String) -> Result<CsvStageLoader, Error> {
100 trace!("new CsvStageLoader {}", path.as_ref().to_str().unwrap());
101 let loader = CsvStageLoader {
102 row_num: 1,
103 reader: from_path(path.as_ref()).await?,
104 strategy,
105 };
106 Ok(loader)
107 }
108}
109
110#[async_trait]
111impl StageLoader for CsvStageLoader {
112 async fn load(&mut self, size: usize) -> Result<Vec<(String, Value)>, Error> {
113 let dv = load(&mut self.reader, size).await?;
114 let mut result: Vec<(String, Value)> = vec![];
115 for d in dv {
116 self.row_num = self.row_num + 1;
117 result.push((self.row_num.to_string(), d));
118 }
119
120 if (!result.is_empty()) && result.len() < size {
121 match self.strategy.as_str() {
122 "fix_size_repeat_last_page" => {
123 let last_page = result.clone();
124 for i in 0..size - result.len() {
125 let offset = i % last_page.len();
126 let fake_row = (
127 (self.row_num + 1 + i).to_string(),
128 last_page[offset].1.clone(),
129 );
130 result.push(fake_row);
131 }
132 }
133 _ => {}
134 }
135 }
136 Ok(result)
137 }
138}
139
140async fn load<R: std::io::Read>(reader: &mut Reader<R>, size: usize) -> Result<Vec<Value>, Error> {
141 let mut hashmap_vec = Vec::new();
142 let mut curr_size = 0;
143 for result in reader.deserialize() {
144 let result: Map = result?;
145
146 let mut record: Map = Map::new();
147 for (k, v) in result {
149 if v.is_string() {
150 record.insert(k, v);
151 } else {
152 record.insert(k, Value::String(v.to_string()));
153 }
154 }
155
156 hashmap_vec.push(Value::Object(record));
157
158 curr_size += 1;
159 if curr_size == size {
160 break;
161 }
162 }
163 Ok(hashmap_vec)
164}
165
166async fn from_path<P: AsRef<Path>>(path: P) -> Result<Reader<File>, Error> {
167 Ok(ReaderBuilder::new().from_path(path)?)
168}