Skip to main content

chewdata/
lib.rs

//! This crate is a Rust ETL to manipulate data everywhere. You can run it as a program or use it as a library in your own code.
//!
//! # How and why to use this ETL?
//!
//! You can find the details of this project in the [repository](https://github.com/jmfiaschi/chewdata).
6#![forbid(unsafe_code)]
7
8extern crate glob;
9extern crate json_value_merge;
10extern crate json_value_resolve;
11extern crate serde;
12extern crate serde_json;
13#[macro_use]
14extern crate tracing;
15
16pub mod connector;
17pub mod document;
18pub mod helper;
19pub mod step;
20pub mod updater;
21
22use self::step::StepType;
23use async_channel::{Receiver, Sender};
24use connector::Connector;
25use futures::stream::{self, Stream};
26use futures::StreamExt;
27use json_value_merge::Merge;
28use serde::{Deserialize, Serialize};
29use serde_json::{Map, Value};
30use std::io::Result;
31use std::pin::Pin;
32use std::{collections::HashMap, io};
33
34#[cfg(feature = "curl")]
35use async_lock::OnceCell;
36
37pub const PROJECT_NAME: &str = "chewdata";
38
39pub type DataStream = Pin<Box<dyn Stream<Item = DataResult> + Send>>;
40pub type ConnectorStream = Pin<Box<dyn Stream<Item = Result<Box<dyn Connector>>> + Send>>;
41pub type DataSet = Vec<DataResult>;
42
43#[cfg(feature = "curl")]
44static TLS_INIT: OnceCell<()> = OnceCell::new();
45
46pub async fn exec(
47    step_types: Vec<StepType>,
48    input_receiver: Option<Receiver<Context>>,
49    output_sender: Option<Sender<Context>>,
50) -> Result<()> {
51    #[cfg(feature = "curl")]
52    init_tls().await?;
53
54    let mut steps = Vec::default();
55    let step_types_len = step_types.len();
56    let mut previous_step_receiver = input_receiver;
57
58    for (pos, step_type) in step_types.into_iter().enumerate() {
59        let mut step = step_type.step_inner();
60        let step_number = step.number();
61        let record_limit = step.record_limit();
62        let (sender, receiver) = async_channel::bounded(record_limit);
63
64        let mut sender_option = None;
65        if pos != step_types_len - 1 {
66            sender_option = Some(sender.clone());
67        } else if let Some(external_sender) = &output_sender {
68            sender_option = Some(external_sender.clone());
69        }
70
71        if let Some(receiver) = previous_step_receiver {
72            step.set_receiver(receiver.clone());
73        }
74
75        if let Some(sender) = sender_option {
76            step.set_sender(sender.clone());
77        }
78
79        for _pos in 0..step_number {
80            steps.push(step.clone());
81        }
82        previous_step_receiver = Some(receiver);
83    }
84
85    let results: Vec<Result<_>> = stream::iter(steps)
86        .map(|step| smol::spawn(async move { step.exec().await }))
87        .buffer_unordered(usize::MAX)
88        .collect()
89        .await;
90
91    results
92        .into_iter()
93        .filter(|result| result.is_err())
94        .map(|result| warn!("{:?}", result))
95        .for_each(drop);
96
97    Ok(())
98}
99
100#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, Default)]
101#[serde(default, deny_unknown_fields)]
102pub struct Metadata {
103    pub has_headers: Option<bool>,
104    pub delimiter: Option<String>,
105    pub quote: Option<String>,
106    pub escape: Option<String>,
107    pub comment: Option<String>,
108    pub terminator: Option<String>,
109    pub mime_type: Option<String>,
110    pub mime_subtype: Option<String>,
111    pub charset: Option<String>,
112    pub compression: Option<String>,
113    pub language: Option<String>,
114}
115
116#[allow(dead_code)]
117impl Metadata {
118    fn merge(self, metadata: &Metadata) -> Metadata {
119        Metadata {
120            has_headers: metadata.has_headers.or(self.has_headers),
121            delimiter: metadata.delimiter.clone().or(self.delimiter),
122            quote: metadata.quote.clone().or(self.quote),
123            escape: metadata.escape.clone().or(self.escape),
124            comment: metadata.comment.clone().or(self.comment),
125            terminator: metadata.terminator.clone().or(self.terminator),
126            mime_type: metadata.mime_type.clone().or(self.mime_type),
127            mime_subtype: metadata.mime_subtype.clone().or(self.mime_subtype),
128            charset: metadata.charset.clone().or(self.charset),
129            compression: metadata.compression.clone().or(self.compression),
130            language: metadata.language.clone().or(self.language),
131        }
132    }
133    fn content_type(&self) -> String {
134        let mut content_type = String::default();
135
136        if let (Some(mime_type), Some(mime_subtype)) = (&self.mime_type, &self.mime_subtype) {
137            content_type = format!("{}/{}", mime_type, mime_subtype);
138
139            if let Some(charset) = &self.charset {
140                content_type += &format!("; charset={}", charset);
141            }
142        }
143
144        content_type
145    }
146    fn content_language(&self) -> String {
147        self.language.clone().unwrap_or_default()
148    }
149    fn to_hashmap(&self) -> HashMap<String, String> {
150        let mut hashmap: HashMap<String, String> = HashMap::default();
151        if let Some(has_headers) = self.has_headers {
152            hashmap.insert("has_headers".to_string(), has_headers.to_string());
153        }
154        if let Some(delimiter) = &self.delimiter {
155            hashmap.insert("delimiter".to_string(), delimiter.clone());
156        }
157        if let Some(quote) = &self.quote {
158            hashmap.insert("quote".to_string(), quote.clone());
159        }
160        if let Some(escape) = &self.escape {
161            hashmap.insert("escape".to_string(), escape.clone());
162        }
163        if let Some(comment) = &self.comment {
164            hashmap.insert("comment".to_string(), comment.clone());
165        }
166        if let Some(terminator) = &self.terminator {
167            hashmap.insert("terminator".to_string(), terminator.clone());
168        }
169        if let (Some(_), Some(_)) = (&self.mime_type, &self.mime_subtype) {
170            hashmap.insert("content_type".to_string(), self.content_type());
171        }
172        if let Some(compression) = &self.compression {
173            hashmap.insert("compression".to_string(), compression.clone());
174        }
175        if let Some(language) = &self.language {
176            hashmap.insert("Content-Language".to_string(), language.clone());
177        }
178        hashmap
179    }
180}
181
182impl From<Metadata> for Value {
183    fn from(metadata: Metadata) -> Value {
184        let mut options = Map::default();
185        if let Some(has_headers) = metadata.has_headers {
186            options.insert("has_headers".to_string(), Value::Bool(has_headers));
187        }
188        if let Some(delimiter) = &metadata.delimiter {
189            options.insert("delimiter".to_string(), Value::String(delimiter.clone()));
190        }
191        if let Some(quote) = &metadata.quote {
192            options.insert("quote".to_string(), Value::String(quote.clone()));
193        }
194        if let Some(escape) = &metadata.escape {
195            options.insert("escape".to_string(), Value::String(escape.clone()));
196        }
197        if let Some(comment) = &metadata.comment {
198            options.insert("comment".to_string(), Value::String(comment.clone()));
199        }
200        if let Some(compression) = &metadata.compression {
201            options.insert(
202                "compression".to_string(),
203                Value::String(compression.clone()),
204            );
205        }
206        if let Some(mime_type) = &metadata.mime_type {
207            options.insert("mime_type".to_string(), Value::String(mime_type.clone()));
208        }
209        if let Some(mime_subtype) = &metadata.mime_subtype {
210            options.insert(
211                "mime_subtype".to_string(),
212                Value::String(mime_subtype.clone()),
213            );
214        }
215        if let Some(charset) = &metadata.charset {
216            options.insert("charset".to_string(), Value::String(charset.clone()));
217        }
218        if let Some(language) = metadata.language {
219            options.insert("language".to_string(), Value::String(language));
220        }
221
222        Value::Object(options)
223    }
224}
225
226#[derive(Debug)]
227pub enum DataResult {
228    Ok(Value),
229    Err((Value, io::Error)),
230}
231
232impl Clone for DataResult {
233    fn clone(&self) -> Self {
234        match self {
235            DataResult::Ok(value) => DataResult::Ok(value.clone()),
236            DataResult::Err((value, e)) => {
237                DataResult::Err((value.clone(), io::Error::new(e.kind(), e.to_string())))
238            }
239        }
240    }
241}
242
243impl PartialEq for DataResult {
244    fn eq(&self, other: &Self) -> bool {
245        match (self, other) {
246            (DataResult::Ok(value1), DataResult::Ok(value2)) => value1 == value2,
247            (DataResult::Err((value1, e1)), DataResult::Err((value2, e2))) => {
248                value1 == value2 && e1.to_string() == e2.to_string()
249            }
250            (_, _) => false,
251        }
252    }
253}
254
255impl DataResult {
256    pub const OK: &'static str = "ok";
257    pub const ERR: &'static str = "err";
258    const FIELD_ERROR: &'static str = "_error";
259
260    pub fn to_value(&self) -> Value {
261        match self {
262            DataResult::Ok(value) => value.to_owned(),
263            DataResult::Err((value, error)) => {
264                let mut json_value = value.to_owned();
265                match json_value {
266                    Value::Array(_) => json_value
267                        .merge_in(
268                            format!("/*/{}", DataResult::FIELD_ERROR).as_ref(),
269                            &Value::String(format!("{}", error)),
270                        )
271                        .unwrap(),
272                    _ => json_value
273                        .merge_in(
274                            format!("/{}", DataResult::FIELD_ERROR).as_ref(),
275                            &Value::String(format!("{}", error)),
276                        )
277                        .unwrap(),
278                }
279
280                json_value
281            }
282        }
283    }
284    pub fn is_type(&self, data_type: &str) -> bool {
285        matches!(
286            (self, data_type),
287            (DataResult::Ok(_), DataResult::OK) | (DataResult::Err(_), DataResult::ERR)
288        )
289    }
290    pub fn merge(&mut self, data_result: DataResult) {
291        let new_json_value = data_result.to_value();
292
293        match self {
294            DataResult::Ok(value) => {
295                value.merge(&new_json_value);
296            }
297            DataResult::Err((value, _e)) => {
298                value.merge(&new_json_value);
299            }
300        };
301    }
302}
303
304#[derive(Debug, Clone, PartialEq)]
305pub struct Context {
306    // Previous steps history
307    steps: Value,
308    input: DataResult,
309}
310
311impl Context {
312    pub fn new(step_name: String, data_result: DataResult) -> Self {
313        let mut map = Map::default();
314        map.insert(step_name, data_result.to_value());
315
316        Context {
317            steps: Value::Object(map),
318            input: data_result,
319        }
320    }
321    pub fn insert_step_result(&mut self, step_name: String, data_result: DataResult) {
322        let mut map = Map::default();
323        map.insert(step_name, data_result.to_value());
324
325        self.steps.merge(&Value::Object(map));
326        self.input = data_result;
327    }
328    pub fn input(&self) -> DataResult {
329        self.input.clone()
330    }
331    pub fn steps(&self) -> Value {
332        self.steps.clone()
333    }
334    pub fn to_value(&self) -> Result<Value> {
335        let mut value = Value::default();
336        value.merge_in("/input", &self.input.to_value())?;
337        value.merge_in("/steps", &self.steps)?;
338        Ok(value)
339    }
340}
341
/// Install the `ring` crypto provider as the rustls default, going through
/// the `TLS_INIT` cell.
///
/// The outcome of the installation is deliberately discarded, so this
/// function always returns `Ok(())`.
#[cfg(feature = "curl")]
async fn init_tls() -> io::Result<()> {
    let install_provider = || async {
        let provider = rustls::crypto::ring::default_provider();
        let _ = rustls::crypto::CryptoProvider::install_default(provider);
    };

    TLS_INIT.get_or_init(install_provider).await;

    Ok(())
}