1#![forbid(unsafe_code)]
7
8extern crate glob;
9extern crate json_value_merge;
10extern crate json_value_resolve;
11extern crate serde;
12extern crate serde_json;
13#[macro_use]
14extern crate tracing;
15
16pub mod connector;
17pub mod document;
18pub mod helper;
19pub mod step;
20pub mod updater;
21
22use self::step::StepType;
23use async_channel::{Receiver, Sender};
24use connector::Connector;
25use futures::stream::{self, Stream};
26use futures::StreamExt;
27use json_value_merge::Merge;
28use serde::{Deserialize, Serialize};
29use serde_json::{Map, Value};
30use std::io::Result;
31use std::pin::Pin;
32use std::{collections::HashMap, io};
33
34#[cfg(feature = "curl")]
35use async_lock::OnceCell;
36
/// Name of the project.
pub const PROJECT_NAME: &str = "chewdata";

/// Pinned, boxed, `Send` stream that yields [`DataResult`] items.
pub type DataStream = Pin<Box<dyn Stream<Item = DataResult> + Send>>;
/// Pinned, boxed, `Send` stream that yields boxed [`Connector`] trait objects,
/// each wrapped in an `io::Result`.
pub type ConnectorStream = Pin<Box<dyn Stream<Item = Result<Box<dyn Connector>>> + Send>>;
/// In-memory collection of [`DataResult`] items.
pub type DataSet = Vec<DataResult>;

/// Cell used by `init_tls` so that the TLS crypto provider installation runs
/// at most once. Only compiled with the `curl` feature.
#[cfg(feature = "curl")]
static TLS_INIT: OnceCell<()> = OnceCell::new();
45
46pub async fn exec(
47 step_types: Vec<StepType>,
48 input_receiver: Option<Receiver<Context>>,
49 output_sender: Option<Sender<Context>>,
50) -> Result<()> {
51 #[cfg(feature = "curl")]
52 init_tls().await?;
53
54 let mut steps = Vec::default();
55 let step_types_len = step_types.len();
56 let mut previous_step_receiver = input_receiver;
57
58 for (pos, step_type) in step_types.into_iter().enumerate() {
59 let mut step = step_type.step_inner();
60 let step_number = step.number();
61 let record_limit = step.record_limit();
62 let (sender, receiver) = async_channel::bounded(record_limit);
63
64 let mut sender_option = None;
65 if pos != step_types_len - 1 {
66 sender_option = Some(sender.clone());
67 } else if let Some(external_sender) = &output_sender {
68 sender_option = Some(external_sender.clone());
69 }
70
71 if let Some(receiver) = previous_step_receiver {
72 step.set_receiver(receiver.clone());
73 }
74
75 if let Some(sender) = sender_option {
76 step.set_sender(sender.clone());
77 }
78
79 for _pos in 0..step_number {
80 steps.push(step.clone());
81 }
82 previous_step_receiver = Some(receiver);
83 }
84
85 let results: Vec<Result<_>> = stream::iter(steps)
86 .map(|step| smol::spawn(async move { step.exec().await }))
87 .buffer_unordered(usize::MAX)
88 .collect()
89 .await;
90
91 results
92 .into_iter()
93 .filter(|result| result.is_err())
94 .map(|result| warn!("{:?}", result))
95 .for_each(drop);
96
97 Ok(())
98}
99
100#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, Default)]
101#[serde(default, deny_unknown_fields)]
102pub struct Metadata {
103 pub has_headers: Option<bool>,
104 pub delimiter: Option<String>,
105 pub quote: Option<String>,
106 pub escape: Option<String>,
107 pub comment: Option<String>,
108 pub terminator: Option<String>,
109 pub mime_type: Option<String>,
110 pub mime_subtype: Option<String>,
111 pub charset: Option<String>,
112 pub compression: Option<String>,
113 pub language: Option<String>,
114}
115
116#[allow(dead_code)]
117impl Metadata {
118 fn merge(self, metadata: &Metadata) -> Metadata {
119 Metadata {
120 has_headers: metadata.has_headers.or(self.has_headers),
121 delimiter: metadata.delimiter.clone().or(self.delimiter),
122 quote: metadata.quote.clone().or(self.quote),
123 escape: metadata.escape.clone().or(self.escape),
124 comment: metadata.comment.clone().or(self.comment),
125 terminator: metadata.terminator.clone().or(self.terminator),
126 mime_type: metadata.mime_type.clone().or(self.mime_type),
127 mime_subtype: metadata.mime_subtype.clone().or(self.mime_subtype),
128 charset: metadata.charset.clone().or(self.charset),
129 compression: metadata.compression.clone().or(self.compression),
130 language: metadata.language.clone().or(self.language),
131 }
132 }
133 fn content_type(&self) -> String {
134 let mut content_type = String::default();
135
136 if let (Some(mime_type), Some(mime_subtype)) = (&self.mime_type, &self.mime_subtype) {
137 content_type = format!("{}/{}", mime_type, mime_subtype);
138
139 if let Some(charset) = &self.charset {
140 content_type += &format!("; charset={}", charset);
141 }
142 }
143
144 content_type
145 }
146 fn content_language(&self) -> String {
147 self.language.clone().unwrap_or_default()
148 }
149 fn to_hashmap(&self) -> HashMap<String, String> {
150 let mut hashmap: HashMap<String, String> = HashMap::default();
151 if let Some(has_headers) = self.has_headers {
152 hashmap.insert("has_headers".to_string(), has_headers.to_string());
153 }
154 if let Some(delimiter) = &self.delimiter {
155 hashmap.insert("delimiter".to_string(), delimiter.clone());
156 }
157 if let Some(quote) = &self.quote {
158 hashmap.insert("quote".to_string(), quote.clone());
159 }
160 if let Some(escape) = &self.escape {
161 hashmap.insert("escape".to_string(), escape.clone());
162 }
163 if let Some(comment) = &self.comment {
164 hashmap.insert("comment".to_string(), comment.clone());
165 }
166 if let Some(terminator) = &self.terminator {
167 hashmap.insert("terminator".to_string(), terminator.clone());
168 }
169 if let (Some(_), Some(_)) = (&self.mime_type, &self.mime_subtype) {
170 hashmap.insert("content_type".to_string(), self.content_type());
171 }
172 if let Some(compression) = &self.compression {
173 hashmap.insert("compression".to_string(), compression.clone());
174 }
175 if let Some(language) = &self.language {
176 hashmap.insert("Content-Language".to_string(), language.clone());
177 }
178 hashmap
179 }
180}
181
182impl From<Metadata> for Value {
183 fn from(metadata: Metadata) -> Value {
184 let mut options = Map::default();
185 if let Some(has_headers) = metadata.has_headers {
186 options.insert("has_headers".to_string(), Value::Bool(has_headers));
187 }
188 if let Some(delimiter) = &metadata.delimiter {
189 options.insert("delimiter".to_string(), Value::String(delimiter.clone()));
190 }
191 if let Some(quote) = &metadata.quote {
192 options.insert("quote".to_string(), Value::String(quote.clone()));
193 }
194 if let Some(escape) = &metadata.escape {
195 options.insert("escape".to_string(), Value::String(escape.clone()));
196 }
197 if let Some(comment) = &metadata.comment {
198 options.insert("comment".to_string(), Value::String(comment.clone()));
199 }
200 if let Some(compression) = &metadata.compression {
201 options.insert(
202 "compression".to_string(),
203 Value::String(compression.clone()),
204 );
205 }
206 if let Some(mime_type) = &metadata.mime_type {
207 options.insert("mime_type".to_string(), Value::String(mime_type.clone()));
208 }
209 if let Some(mime_subtype) = &metadata.mime_subtype {
210 options.insert(
211 "mime_subtype".to_string(),
212 Value::String(mime_subtype.clone()),
213 );
214 }
215 if let Some(charset) = &metadata.charset {
216 options.insert("charset".to_string(), Value::String(charset.clone()));
217 }
218 if let Some(language) = metadata.language {
219 options.insert("language".to_string(), Value::String(language));
220 }
221
222 Value::Object(options)
223 }
224}
225
/// Outcome attached to a piece of data: either a JSON value, or a JSON value
/// paired with the I/O error associated with it.
#[derive(Debug)]
pub enum DataResult {
    /// Value without an error.
    Ok(Value),
    /// Value together with the error associated with it.
    Err((Value, io::Error)),
}
231
232impl Clone for DataResult {
233 fn clone(&self) -> Self {
234 match self {
235 DataResult::Ok(value) => DataResult::Ok(value.clone()),
236 DataResult::Err((value, e)) => {
237 DataResult::Err((value.clone(), io::Error::new(e.kind(), e.to_string())))
238 }
239 }
240 }
241}
242
243impl PartialEq for DataResult {
244 fn eq(&self, other: &Self) -> bool {
245 match (self, other) {
246 (DataResult::Ok(value1), DataResult::Ok(value2)) => value1 == value2,
247 (DataResult::Err((value1, e1)), DataResult::Err((value2, e2))) => {
248 value1 == value2 && e1.to_string() == e2.to_string()
249 }
250 (_, _) => false,
251 }
252 }
253}
254
255impl DataResult {
256 pub const OK: &'static str = "ok";
257 pub const ERR: &'static str = "err";
258 const FIELD_ERROR: &'static str = "_error";
259
260 pub fn to_value(&self) -> Value {
261 match self {
262 DataResult::Ok(value) => value.to_owned(),
263 DataResult::Err((value, error)) => {
264 let mut json_value = value.to_owned();
265 match json_value {
266 Value::Array(_) => json_value
267 .merge_in(
268 format!("/*/{}", DataResult::FIELD_ERROR).as_ref(),
269 &Value::String(format!("{}", error)),
270 )
271 .unwrap(),
272 _ => json_value
273 .merge_in(
274 format!("/{}", DataResult::FIELD_ERROR).as_ref(),
275 &Value::String(format!("{}", error)),
276 )
277 .unwrap(),
278 }
279
280 json_value
281 }
282 }
283 }
284 pub fn is_type(&self, data_type: &str) -> bool {
285 matches!(
286 (self, data_type),
287 (DataResult::Ok(_), DataResult::OK) | (DataResult::Err(_), DataResult::ERR)
288 )
289 }
290 pub fn merge(&mut self, data_result: DataResult) {
291 let new_json_value = data_result.to_value();
292
293 match self {
294 DataResult::Ok(value) => {
295 value.merge(&new_json_value);
296 }
297 DataResult::Err((value, _e)) => {
298 value.merge(&new_json_value);
299 }
300 };
301 }
302}
303
304#[derive(Debug, Clone, PartialEq)]
305pub struct Context {
306 steps: Value,
308 input: DataResult,
309}
310
311impl Context {
312 pub fn new(step_name: String, data_result: DataResult) -> Self {
313 let mut map = Map::default();
314 map.insert(step_name, data_result.to_value());
315
316 Context {
317 steps: Value::Object(map),
318 input: data_result,
319 }
320 }
321 pub fn insert_step_result(&mut self, step_name: String, data_result: DataResult) {
322 let mut map = Map::default();
323 map.insert(step_name, data_result.to_value());
324
325 self.steps.merge(&Value::Object(map));
326 self.input = data_result;
327 }
328 pub fn input(&self) -> DataResult {
329 self.input.clone()
330 }
331 pub fn steps(&self) -> Value {
332 self.steps.clone()
333 }
334 pub fn to_value(&self) -> Result<Value> {
335 let mut value = Value::default();
336 value.merge_in("/input", &self.input.to_value())?;
337 value.merge_in("/steps", &self.steps)?;
338 Ok(value)
339 }
340}
341
/// Installs the `ring` crypto provider as the rustls default, at most once
/// per process thanks to `TLS_INIT`.
///
/// Always returns `Ok(())`: the outcome of the installation is deliberately
/// ignored.
#[cfg(feature = "curl")]
async fn init_tls() -> io::Result<()> {
    let install_provider = || async {
        let provider = rustls::crypto::ring::default_provider();
        // The result is discarded on purpose (presumably an error only means
        // that a default provider is already installed).
        let _ = rustls::crypto::CryptoProvider::install_default(provider);
    };

    TLS_INIT.get_or_init(install_provider).await;
    Ok(())
}