Skip to main content

chewdata/step/
validator.rs

//! Check the consistency of the data.
2//!
//! If a record is not valid, an error message is stored in its `_error` field and the record is tagged as an error before it is shared with the next step.
4//! Use the `data_type` field of a `step` to target which kind of data a step can handle.
5//!
6//! ### Actions
7//!
8//! 1 - Get a [`crate::Context`] from the input queue.  
9//! 2 - Extract the [`crate::DataResult`] from the [`crate::Context`].  
10//! 3 - Validate the data with a list of rules.  
11//! 4 - Create a new [`crate::Context`] and attach the [`crate::DataResult`] to it.  
12//! 5 - Push the new [`crate::Context`] into the output queue.  
//! 6 - Go back to step 1 until the input queue is empty.  
14//!
15//! ### Configuration
16//!
17//! | key             | alias   | Description                                                                                                       | Default Value | Possible Values                                 |
18//! | --------------- | ------- | ----------------------------------------------------------------------------------------------------------------- | ------------- | ----------------------------------------------- |
//! | type            | -       | Required in order to use the validator step                                                                       | `validator`   | `validator` / `validate` / `v`                  |
20//! | updater         | u       | Updater type used as a template engine for transformation                                                         | `tera`        | `tera`                                          |
//! | referentials    | refs    | List of [`crate::step::Reader`] indexed by their name. A referential can be used to map objects during the validation | `null`        | `{"alias_a": READER,"alias_b": READER, etc...}` |
22//! | name            | alias   | Name step                                                                                                         | `null`        | Auto generate alphanumeric value                |
//! | data_type       | -       | Type of data to validate. Other data types are skipped and forwarded unchanged                                    | `ok`          | `ok` / `err`                                    |
24//! | concurrency_limit   | -   | Limit of steps to run in concurrence.                                                                              | `1`           | unsigned number                                 |
25//! | rules           | -       | List of [`self::Rule`] indexed by their names                                                                     | `null`        | `{"rule_0": Rule,"rule_1": Rule}`               |
//! | error_separator | separator | Separator used to delimit two errors                                                                            | `\r\n`        | String                                          |
27//! | record_limit  | -   | Maximum number of records that this step can hold in memory at the same time.     | `100`        | unsigned number                              |
28//!
29//! ### Rule
30//!
31//! | key     | Description                                                                                                                                                                                     | Default Value | Possible Values                                   |
32//! | ------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------- | ------------------------------------------------- |
//! | pattern | Pattern in [django template language](https://docs.djangoproject.com/en/3.1/topics/templates/) format used to test a field. If the result of the pattern is not a boolean, an error will be raised | `null`        | `{% if true %} true {% else %} false {% endif %}` |
//! | message | Message to display if the rendered pattern is false. If no message is set, a default message is rendered                                                                                      | `null`        | `My error message`                                |
35//!
36//! ### Examples
37//!
38//! ```json
39//! [
40//!     {
41//!         "type": "validator",
42//!         "updater": {
43//!             "type": "tera"
44//!         },
45//!         "referentials": {
46//!             "mapping_ref": {
47//!                 "connector": {
48//!                     "type": "mem",
49//!                     "data": "[{\"mapping_code\":\"value_to_map\",\"mapping_value\":\"value mapped\"},{\"mapping_code\":\"value_to_map_2\",\"mapping_value\":\"value mapped 2\"}]"
50//!                 }
51//!             }
52//!         },
53//!         "name": "my_validator",
54//!         "data_type": "ok",
55//!         "concurrency_limit": 1,
56//!         "rules": {
57//!             "number_rule": {
58//!                 "pattern": "{%- if input.number == 10  -%} true {%- else -%} false {%- endif -%}",
59//!                 "message": "The number field value must be equal to 10"
60//!             },
61//!             "text_rule": {
62//!                 "pattern": "{%- if input.text is matching('.*hello world.*') -%} true {%- else -%} false {%- endif -%}",
//!                 "message": "The text field value doesn't contain 'Hello World'"
64//!             },
65//!             "code_rule": {
66//!                 "pattern": "{%- if mapping_ref | filter(attribute='mapping_code', value=input.code) | length > 0 -%} true {%- else -%} false {%- endif -%}",
67//!                 "message": "The code field value doesn't match with the referential dataset"
68//!             }
69//!         },
70//!         "error_separator": " & "
71//!     }
72//! ]
73//! ```
74//!
75//! input:
76//!
77//! ```json
78//! [
79//!     {"number": 100, "text": "my text", "code": "my_code"},
80//!     ...
81//! ]
82//! ```
83//!
84//! output:
85//!
86//! ```json
87//! [
88//!     {"number": 100, "text": "my text", "code": "my_code", "_error":"The number field value must be equal to 10 & The text field value doesn't contain 'Hello World' & The code field value doesn't match with the referential dataset"},
89//!     ...
90//! ]
91//! ```
use super::DataResult;
use super::reader::Reader;
use super::referential::Referential;
use crate::helper::json_pointer::JsonPointer;
use crate::helper::mustache::Mustache;
use crate::step::Step;
use crate::updater::{ActionType, UpdaterType, INPUT_FIELD_KEY};
use crate::Context;
use crate::updater::Action;
use async_channel::{Receiver, Sender};
use futures::StreamExt;
use async_trait::async_trait;
use json_value_merge::Merge;
use json_value_search::Search;
use serde::Deserialize;
use serde_json::Value;
use std::collections::HashMap;
use std::io::{Error, ErrorKind};
use std::sync::Arc;
use std::{
    collections::BTreeMap,
    io,
};
use uuid::Uuid;
115
116#[derive(Debug, Deserialize, Clone)]
117#[serde(default, deny_unknown_fields)]
118pub struct Validator {
119    #[serde(rename = "updater")]
120    #[serde(alias = "u")]
121    pub updater_type: UpdaterType,
122    #[serde(alias = "refs")]
123    pub referentials: HashMap<String, Reader>,
124    #[serde(alias = "alias")]
125    pub name: String,
126    pub data_type: String,
127    pub concurrency_limit: usize,
128    pub rules: BTreeMap<String, Rule>,
129    #[serde(alias = "separator")]
130    pub error_separator: String,
131    #[serde(skip)]
132    pub receiver: Option<Receiver<Context>>,
133    #[serde(skip)]
134    pub sender: Option<Sender<Context>>,
135}
136
137impl Default for Validator {
138    fn default() -> Self {
139        let uuid = Uuid::new_v4();
140        Validator {
141            updater_type: UpdaterType::default(),
142            referentials: HashMap::default(),
143            name: uuid.simple().to_string(),
144            data_type: DataResult::OK.to_string(),
145            concurrency_limit: 1,
146            rules: BTreeMap::default(),
147            error_separator: "\r\n".to_string(),
148            receiver: None,
149            sender: None,
150        }
151    }
152}
153
154#[async_trait]
155impl Step for Validator {
156    /// See [`Step::set_receiver`] for more details.
157    fn set_receiver(&mut self, receiver: Receiver<Context>) {
158        self.receiver = Some(receiver);
159    }
160    /// See [`Step::receiver`] for more details.
161    fn receiver(&self) -> Option<&Receiver<Context>> {
162        self.receiver.as_ref()
163    }
164    /// See [`Step::set_sender`] for more details.
165    fn set_sender(&mut self, sender: Sender<Context>) {
166        self.sender = Some(sender);
167    }
168    /// See [`Step::sender`] for more details.
169    fn sender(&self) -> Option<&Sender<Context>> {
170        self.sender.as_ref()
171    }
172    /// This step validate the values of a dataset.
173    ///
174    /// # Example: simple validations
175    /// 
176    /// ```
177    /// use std::io;
178    /// use serde_json::Value;
179    /// use json_value_search::Search;
180    /// use chewdata::DataResult;
181    /// use chewdata::Context;
182    /// use chewdata::step::Step;
183    /// use chewdata::step::validator::{Validator, Rule};
184    /// use std::thread;
185    /// use std::collections::{BTreeMap, HashMap};
186    /// use macro_rules_attribute::apply;
187    /// use smol_macros::main;
188    /// 
189    /// #[apply(main!)]
190    /// async fn main() -> io::Result<()> {
191    ///     let (sender_input, receiver_input) = async_channel::unbounded();
192    ///     let (sender_output, receiver_output) = async_channel::unbounded();
193    ///
194    ///     let mut rules = BTreeMap::default();
195    ///     rules.insert("rule_number_1".to_string(), Rule {
196    ///         pattern: "{% if input.number_1 is matching('\\d+') %} true {% else %} false {% endif %}".to_string(),
197    ///         message: Some("Err N.1".to_string())
198    ///     });
199    ///
200    ///     let validator = Validator {
201    ///         rules: rules,
202    ///         error_separator: " & ".to_string(),
203    ///         receiver: Some(receiver_input),
204    ///         sender: Some(sender_output),
205    ///         ..Default::default()
206    ///     };
207    ///
208    ///     thread::spawn(move || {
209    ///         let data = serde_json::from_str(r#"{"number_1":"my_string","number_2":100,"text":"120"}"#).unwrap();
210    ///         let context = Context::new("step_data_loading".to_string(), DataResult::Ok(data));
211    ///         sender_input.try_send(context).unwrap();
212    ///     });
213    ///
214    ///     validator.exec().await?;
215    ///
216    ///     for context in receiver_output.try_recv() {
217    ///         let error_result = context.input().to_value().search("/_error").unwrap().unwrap();
218    ///         let error_result_expected = Value::String("Err N.1".to_string());
219    ///         assert_eq!(error_result_expected, error_result);
220    ///     }
221    ///
222    ///     Ok(())
223    /// }
224    /// ```
225    #[instrument(name = "validator::exec",
226        skip(self),
227        fields(name=self.name, 
228        data_type=self.data_type,
229        concurrency_limit=self.concurrency_limit,
230        error_separator=self.error_separator,
231    ))]
232    async fn exec(&self) -> io::Result<()> {
233        info!("Start validating data...");
234    
235
236        let receiver_stream = self.receive().await;
237
238        trace!("Warm up static referential before using it in the concurrent execution.");
239        Referential::new(&self.referentials).to_value(&Context::new(String::default(), DataResult::Ok(Value::default()))).await?;
240
241        let actions: Vec<Action> = self
242            .rules
243            .iter()
244            .map(|(rule_name, rule)| Action {
245                field: rule_name.clone(),
246                pattern: Some(rule.pattern.clone()),
247                action_type: ActionType::Replace,
248            })
249            .collect();
250
251        // Validate in concurrence with parallelism.
252        let results: Vec<_> = receiver_stream.map(|context_received| {
253            let step = self.clone();
254            let actions = actions.clone();
255            smol::spawn(async move {
256            validate(&step, &mut context_received.clone(), &actions.clone()).await
257        })}).buffer_unordered(self.concurrency_limit).collect().await;
258
259        results
260            .into_iter()
261            .filter(|result| result.is_err())
262            .map(|result| warn!("{:?}", result))
263            .for_each(drop);
264
265        info!(
266            "Stops validating and sending context in the channel"
267        );
268
269        Ok(())
270    }
271    fn number(&self) -> usize {
272        self.concurrency_limit
273    }
274    fn name(&self) -> String {
275        self.name.clone()
276    }
277}
278
279#[instrument(name = "validator::validate", skip(step, context_received))]
280async fn validate(step: &Validator, context_received: &mut Context, actions: &Vec<Action>) -> io::Result<()> {
281    let data_result = context_received.input();
282
283    if !data_result.is_type(step.data_type.as_ref()) {
284        trace!("Handles only this data type");
285        step.send(context_received).await;
286        return Ok(());
287    }
288
289    let record = data_result.to_value();
290
291    let validator_result = step
292        .updater_type
293        .updater()
294        .update(
295            &record,
296            &context_received.steps(),
297            &Referential::new(&step.referentials).to_value(context_received).await?,
298            actions,
299        )
300        .await
301        .and_then(|value| match value {
302            Value::Object(_) => Ok(value),
303            _ => Err(Error::new(
304                ErrorKind::InvalidInput,
305                format!("The validation's result must be a boolean and not '{:?}'", value),
306            )),
307        })
308        .and_then(|value| {
309            let mut errors = String::default();
310
311            for (rule_name, rule) in &step.rules {
312                let value_result =
313                    value.clone().search(rule_name.to_json_pointer().as_str());
314
315                let mut error = match value_result {
316                    Ok(Some(Value::Bool(true))) => continue,
317                    Ok(Some(Value::Bool(false))) => {
318                        rule.message.clone().unwrap_or(format!("The rule '{}' failed", rule_name))
319                    }
320                    Ok(Some(_)) => format!(
321                        "The rule '{}' has invalid result pattern '{:?}', it must be a boolean",
322                        rule_name,
323                        value_result.unwrap().unwrap()
324                    ),
325                    Ok(None) => format!(
326                        "The rule '{}' is not found in the validation result '{:?}'",
327                        rule_name,
328                        value_result.unwrap()
329                    ),
330                    Err(e) => e.to_string(),
331                };
332
333                if !errors.is_empty() {
334                    errors.push_str(step.error_separator.as_str());
335                }
336
337                let mut params = Value::default();
338                params.merge_in(&format!("/{}", INPUT_FIELD_KEY), &record)?;
339                params.merge_in("/rule/name", &Value::String(rule_name.clone()))?;
340
341                error.replace_mustache(params);
342
343                errors.push_str(error.as_str());
344            }
345
346            if !errors.is_empty() {
347                Err(Error::new(ErrorKind::InvalidInput, errors))
348            } else {
349                Ok(record.clone())
350            }
351        });
352
353    let new_data_result = match validator_result {
354        Ok(record) => DataResult::Ok(record),
355        Err(e) => DataResult::Err((record.clone(), e)),
356    };
357
358    context_received.insert_step_result(step.name(), new_data_result);
359    step.send(context_received).await;
360
361    Ok(())
362}
363
364#[derive(Debug, Deserialize, Clone, Default)]
365pub struct Rule {
366    pub pattern: String,
367    pub message: Option<String>,
368}
369
#[cfg(test)]
mod tests {
    use super::*;
    use macro_rules_attribute::apply;
    use smol_macros::test;
    use std::thread;

    #[apply(test!)]
    async fn exec_with_different_data_result_type() {
        let mut step = Validator::default();
        let (sender_input, receiver_input) = async_channel::unbounded();
        let (sender_output, receiver_output) = async_channel::unbounded();
        let data = serde_json::from_str(r#"{"field_1":"value_1"}"#).unwrap();
        let error = Error::new(ErrorKind::InvalidData, "My error");
        let context = Context::new("before".to_string(), DataResult::Err((data, error)));
        let expected_context = context.clone();

        thread::spawn(move || {
            sender_input.try_send(context).unwrap();
        });

        step.receiver = Some(receiver_input);
        step.sender = Some(sender_output);
        step.exec().await.unwrap();

        assert_eq!(expected_context, receiver_output.recv().await.unwrap());
    }
    #[apply(test!)]
    async fn exec_with_same_data_result_type() {
        let mut step = Validator::default();
        let (sender_input, receiver_input) = async_channel::unbounded();
        let (sender_output, receiver_output) = async_channel::unbounded();
        let data: Value = serde_json::from_str(r#"{"field_1":"value_1"}"#).unwrap();
        let context = Context::new("before".to_string(), DataResult::Ok(data.clone()));

        let mut expected_context = context.clone();
        expected_context.insert_step_result("my_step".to_string(), DataResult::Ok(data));

        thread::spawn(move || {
            sender_input.try_send(context).unwrap();
        });

        step.receiver = Some(receiver_input);
        step.sender = Some(sender_output);
        step.name = "my_step".to_string();
        step.rules = serde_json::from_str(r#"{"rule_1": {"pattern": "true"}}"#).unwrap();
        step.exec().await.unwrap();

        assert_eq!(expected_context, receiver_output.recv().await.unwrap());
    }
    #[apply(test!)]
    async fn exec() {
        let (sender_input, receiver_input) = async_channel::unbounded();
        let (sender_output, receiver_output) = async_channel::unbounded();
        let mut rules = BTreeMap::default();
        rules.insert(
            "rule_number_1".to_string(),
            Rule {
                pattern:
                    "{% if input.number_1 is matching('\\d+') %} true {% else %} false {% endif %}"
                        .to_string(),
                message: Some("Err N.1".to_string()),
            },
        );
        rules.insert(
            "rule_number_2".to_string(),
            Rule {
                pattern: "{% if input.number_2 < 100 %} true {% else %} false {% endif %}"
                    .to_string(),
                message: Some("Err N.2".to_string()),
            },
        );
        rules.insert(
            "rule_text".to_string(),
            Rule {
                pattern:
                    "{% if input.text is matching('[^\\d]+') %} true {% else %} false {% endif %}"
                        .to_string(),
                message: Some("Err T.1".to_string()),
            },
        );
        let validator = Validator {
            rules,
            error_separator: " & ".to_string(),
            receiver: Some(receiver_input),
            sender: Some(sender_output),
            ..Default::default()
        };
        thread::spawn(move || {
            let data =
                serde_json::from_str(r#"{"number_1":"my_string","number_2":100,"text":"120"}"#)
                    .unwrap();
            let context =
                Context::new("step_data_loading".to_string(), DataResult::Ok(data));
            sender_input.try_send(context).unwrap();
        });
        validator.exec().await.unwrap();

        // `try_recv` fails when nothing was forwarded, so the test cannot pass vacuously.
        let context = receiver_output
            .try_recv()
            .expect("the validator must forward the context");
        let error_result = context
            .input()
            .to_value()
            .search("/_error")
            .unwrap()
            .unwrap();
        let error_result_expected = Value::String("Err N.1 & Err N.2 & Err T.1".to_string());
        assert_eq!(error_result_expected, error_result);
    }
    #[apply(test!)]
    async fn exec_with_validation_error() {
        let (sender_input, receiver_input) = async_channel::unbounded();
        let (sender_output, receiver_output) = async_channel::unbounded();
        let mut rules = BTreeMap::default();
        rules.insert(
            "rule_exception".to_string(),
            Rule {
                pattern:
                    "{% if input.number_1 is matching('\\d+') %} true {% else %} false {% endif %}"
                        .to_string(),
                message: Some("Err N.1".to_string()),
            },
        );
        let validator = Validator {
            rules,
            error_separator: " & ".to_string(),
            receiver: Some(receiver_input),
            sender: Some(sender_output),
            ..Default::default()
        };
        thread::spawn(move || {
            let data = serde_json::from_str(r#"{"number":100}"#).unwrap();
            let context =
                Context::new("step_data_loading".to_string(), DataResult::Ok(data));
            sender_input.try_send(context).unwrap();
        });
        validator.exec().await.unwrap();

        // `try_recv` fails when nothing was forwarded, so the test cannot pass vacuously.
        let context = receiver_output
            .try_recv()
            .expect("the validator must forward the context");
        let error_result = context
            .input()
            .to_value()
            .search("/_error")
            .unwrap()
            .unwrap();
        let error_result_expected = Value::String("Failed to render the field 'rule_exception'. Tester `matching` was called on an undefined variable.".to_string());
        assert_eq!(error_result_expected, error_result);
    }
}