chewdata/step/validator.rs
//! Checks the consistency of the data.
2//!
//! If the data is not valid, an error message is stored in the `_error` field and the data is tagged as an error before it is sent to the next step.
4//! Use the `data_type` field of a `step` to target which kind of data a step can handle.
5//!
6//! ### Actions
7//!
8//! 1 - Get a [`crate::Context`] from the input queue.
9//! 2 - Extract the [`crate::DataResult`] from the [`crate::Context`].
10//! 3 - Validate the data with a list of rules.
11//! 4 - Create a new [`crate::Context`] and attach the [`crate::DataResult`] to it.
12//! 5 - Push the new [`crate::Context`] into the output queue.
//! 6 - Repeat from step 1 until the input queue is empty.
14//!
15//! ### Configuration
16//!
17//! | key | alias | Description | Default Value | Possible Values |
18//! | --------------- | ------- | ----------------------------------------------------------------------------------------------------------------- | ------------- | ----------------------------------------------- |
//! | type | - | Required in order to use the validator step | `validator` | `validator` / `validate` / `v` |
20//! | updater | u | Updater type used as a template engine for transformation | `tera` | `tera` |
//! | referentials | refs | List of [`crate::step::Reader`] indexed by their name. A referential can be used to map objects during the validation | `null` | `{"alias_a": READER,"alias_b": READER, etc...}` |
22//! | name | alias | Name step | `null` | Auto generate alphanumeric value |
//! | data_type | - | Type of data to validate. Data of another type is forwarded without validation | `ok` | `ok` / `err` |
24//! | concurrency_limit | - | Limit of steps to run in concurrence. | `1` | unsigned number |
25//! | rules | - | List of [`self::Rule`] indexed by their names | `null` | `{"rule_0": Rule,"rule_1": Rule}` |
//! | error_separator | separator | Separator used to delimit two errors | `\r\n` | String |
27//! | record_limit | - | Maximum number of records that this step can hold in memory at the same time. | `100` | unsigned number |
28//!
29//! ### Rule
30//!
31//! | key | Description | Default Value | Possible Values |
32//! | ------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------- | ------------------------------------------------- |
//! | pattern | Pattern in [django template language](https://docs.djangoproject.com/en/3.1/topics/templates/) format used to test a field. If the result of the pattern is not a boolean, an error will be raised | `null` | `{% if true %} true {% else %} false {% endif %}` |
//! | message | Message to display if the rendered pattern is false. If no message is set, a default message is used | `null` | `My error message` |
35//!
36//! ### Examples
37//!
38//! ```json
39//! [
40//! {
41//! "type": "validator",
42//! "updater": {
43//! "type": "tera"
44//! },
45//! "referentials": {
46//! "mapping_ref": {
47//! "connector": {
48//! "type": "mem",
49//! "data": "[{\"mapping_code\":\"value_to_map\",\"mapping_value\":\"value mapped\"},{\"mapping_code\":\"value_to_map_2\",\"mapping_value\":\"value mapped 2\"}]"
50//! }
51//! }
52//! },
53//! "name": "my_validator",
54//! "data_type": "ok",
55//! "concurrency_limit": 1,
56//! "rules": {
57//! "number_rule": {
58//! "pattern": "{%- if input.number == 10 -%} true {%- else -%} false {%- endif -%}",
59//! "message": "The number field value must be equal to 10"
60//! },
61//! "text_rule": {
62//! "pattern": "{%- if input.text is matching('.*hello world.*') -%} true {%- else -%} false {%- endif -%}",
//! "message": "The text field value doesn't contain 'Hello World'"
64//! },
65//! "code_rule": {
66//! "pattern": "{%- if mapping_ref | filter(attribute='mapping_code', value=input.code) | length > 0 -%} true {%- else -%} false {%- endif -%}",
67//! "message": "The code field value doesn't match with the referential dataset"
68//! }
69//! },
70//! "error_separator": " & "
71//! }
72//! ]
73//! ```
74//!
75//! input:
76//!
77//! ```json
78//! [
79//! {"number": 100, "text": "my text", "code": "my_code"},
80//! ...
81//! ]
82//! ```
83//!
84//! output:
85//!
86//! ```json
87//! [
88//! {"number": 100, "text": "my text", "code": "my_code", "_error":"The number field value must be equal to 10 & The text field value doesn't contain 'Hello World' & The code field value doesn't match with the referential dataset"},
89//! ...
90//! ]
91//! ```
use super::reader::Reader;
use super::referential::Referential;
use super::DataResult;
use crate::helper::json_pointer::JsonPointer;
use crate::helper::mustache::Mustache;
use crate::step::Step;
use crate::updater::{Action, ActionType, UpdaterType, INPUT_FIELD_KEY};
use crate::Context;
use async_channel::{Receiver, Sender};
use async_trait::async_trait;
use futures::StreamExt;
use json_value_merge::Merge;
use json_value_search::Search;
use serde::Deserialize;
use serde_json::Value;
use std::collections::{BTreeMap, HashMap};
use std::io::{self, Error, ErrorKind};
use std::sync::Arc;
use uuid::Uuid;
115
116#[derive(Debug, Deserialize, Clone)]
117#[serde(default, deny_unknown_fields)]
118pub struct Validator {
119 #[serde(rename = "updater")]
120 #[serde(alias = "u")]
121 pub updater_type: UpdaterType,
122 #[serde(alias = "refs")]
123 pub referentials: HashMap<String, Reader>,
124 #[serde(alias = "alias")]
125 pub name: String,
126 pub data_type: String,
127 pub concurrency_limit: usize,
128 pub rules: BTreeMap<String, Rule>,
129 #[serde(alias = "separator")]
130 pub error_separator: String,
131 #[serde(skip)]
132 pub receiver: Option<Receiver<Context>>,
133 #[serde(skip)]
134 pub sender: Option<Sender<Context>>,
135}
136
137impl Default for Validator {
138 fn default() -> Self {
139 let uuid = Uuid::new_v4();
140 Validator {
141 updater_type: UpdaterType::default(),
142 referentials: HashMap::default(),
143 name: uuid.simple().to_string(),
144 data_type: DataResult::OK.to_string(),
145 concurrency_limit: 1,
146 rules: BTreeMap::default(),
147 error_separator: "\r\n".to_string(),
148 receiver: None,
149 sender: None,
150 }
151 }
152}
153
154#[async_trait]
155impl Step for Validator {
156 /// See [`Step::set_receiver`] for more details.
157 fn set_receiver(&mut self, receiver: Receiver<Context>) {
158 self.receiver = Some(receiver);
159 }
160 /// See [`Step::receiver`] for more details.
161 fn receiver(&self) -> Option<&Receiver<Context>> {
162 self.receiver.as_ref()
163 }
164 /// See [`Step::set_sender`] for more details.
165 fn set_sender(&mut self, sender: Sender<Context>) {
166 self.sender = Some(sender);
167 }
168 /// See [`Step::sender`] for more details.
169 fn sender(&self) -> Option<&Sender<Context>> {
170 self.sender.as_ref()
171 }
172 /// This step validate the values of a dataset.
173 ///
174 /// # Example: simple validations
175 ///
176 /// ```
177 /// use std::io;
178 /// use serde_json::Value;
179 /// use json_value_search::Search;
180 /// use chewdata::DataResult;
181 /// use chewdata::Context;
182 /// use chewdata::step::Step;
183 /// use chewdata::step::validator::{Validator, Rule};
184 /// use std::thread;
185 /// use std::collections::{BTreeMap, HashMap};
186 /// use macro_rules_attribute::apply;
187 /// use smol_macros::main;
188 ///
189 /// #[apply(main!)]
190 /// async fn main() -> io::Result<()> {
191 /// let (sender_input, receiver_input) = async_channel::unbounded();
192 /// let (sender_output, receiver_output) = async_channel::unbounded();
193 ///
194 /// let mut rules = BTreeMap::default();
195 /// rules.insert("rule_number_1".to_string(), Rule {
196 /// pattern: "{% if input.number_1 is matching('\\d+') %} true {% else %} false {% endif %}".to_string(),
197 /// message: Some("Err N.1".to_string())
198 /// });
199 ///
200 /// let validator = Validator {
201 /// rules: rules,
202 /// error_separator: " & ".to_string(),
203 /// receiver: Some(receiver_input),
204 /// sender: Some(sender_output),
205 /// ..Default::default()
206 /// };
207 ///
208 /// thread::spawn(move || {
209 /// let data = serde_json::from_str(r#"{"number_1":"my_string","number_2":100,"text":"120"}"#).unwrap();
210 /// let context = Context::new("step_data_loading".to_string(), DataResult::Ok(data));
211 /// sender_input.try_send(context).unwrap();
212 /// });
213 ///
214 /// validator.exec().await?;
215 ///
216 /// for context in receiver_output.try_recv() {
217 /// let error_result = context.input().to_value().search("/_error").unwrap().unwrap();
218 /// let error_result_expected = Value::String("Err N.1".to_string());
219 /// assert_eq!(error_result_expected, error_result);
220 /// }
221 ///
222 /// Ok(())
223 /// }
224 /// ```
225 #[instrument(name = "validator::exec",
226 skip(self),
227 fields(name=self.name,
228 data_type=self.data_type,
229 concurrency_limit=self.concurrency_limit,
230 error_separator=self.error_separator,
231 ))]
232 async fn exec(&self) -> io::Result<()> {
233 info!("Start validating data...");
234
235
236 let receiver_stream = self.receive().await;
237
238 trace!("Warm up static referential before using it in the concurrent execution.");
239 Referential::new(&self.referentials).to_value(&Context::new(String::default(), DataResult::Ok(Value::default()))).await?;
240
241 let actions: Vec<Action> = self
242 .rules
243 .iter()
244 .map(|(rule_name, rule)| Action {
245 field: rule_name.clone(),
246 pattern: Some(rule.pattern.clone()),
247 action_type: ActionType::Replace,
248 })
249 .collect();
250
251 // Validate in concurrence with parallelism.
252 let results: Vec<_> = receiver_stream.map(|context_received| {
253 let step = self.clone();
254 let actions = actions.clone();
255 smol::spawn(async move {
256 validate(&step, &mut context_received.clone(), &actions.clone()).await
257 })}).buffer_unordered(self.concurrency_limit).collect().await;
258
259 results
260 .into_iter()
261 .filter(|result| result.is_err())
262 .map(|result| warn!("{:?}", result))
263 .for_each(drop);
264
265 info!(
266 "Stops validating and sending context in the channel"
267 );
268
269 Ok(())
270 }
271 fn number(&self) -> usize {
272 self.concurrency_limit
273 }
274 fn name(&self) -> String {
275 self.name.clone()
276 }
277}
278
279#[instrument(name = "validator::validate", skip(step, context_received))]
280async fn validate(step: &Validator, context_received: &mut Context, actions: &Vec<Action>) -> io::Result<()> {
281 let data_result = context_received.input();
282
283 if !data_result.is_type(step.data_type.as_ref()) {
284 trace!("Handles only this data type");
285 step.send(context_received).await;
286 return Ok(());
287 }
288
289 let record = data_result.to_value();
290
291 let validator_result = step
292 .updater_type
293 .updater()
294 .update(
295 &record,
296 &context_received.steps(),
297 &Referential::new(&step.referentials).to_value(context_received).await?,
298 actions,
299 )
300 .await
301 .and_then(|value| match value {
302 Value::Object(_) => Ok(value),
303 _ => Err(Error::new(
304 ErrorKind::InvalidInput,
305 format!("The validation's result must be a boolean and not '{:?}'", value),
306 )),
307 })
308 .and_then(|value| {
309 let mut errors = String::default();
310
311 for (rule_name, rule) in &step.rules {
312 let value_result =
313 value.clone().search(rule_name.to_json_pointer().as_str());
314
315 let mut error = match value_result {
316 Ok(Some(Value::Bool(true))) => continue,
317 Ok(Some(Value::Bool(false))) => {
318 rule.message.clone().unwrap_or(format!("The rule '{}' failed", rule_name))
319 }
320 Ok(Some(_)) => format!(
321 "The rule '{}' has invalid result pattern '{:?}', it must be a boolean",
322 rule_name,
323 value_result.unwrap().unwrap()
324 ),
325 Ok(None) => format!(
326 "The rule '{}' is not found in the validation result '{:?}'",
327 rule_name,
328 value_result.unwrap()
329 ),
330 Err(e) => e.to_string(),
331 };
332
333 if !errors.is_empty() {
334 errors.push_str(step.error_separator.as_str());
335 }
336
337 let mut params = Value::default();
338 params.merge_in(&format!("/{}", INPUT_FIELD_KEY), &record)?;
339 params.merge_in("/rule/name", &Value::String(rule_name.clone()))?;
340
341 error.replace_mustache(params);
342
343 errors.push_str(error.as_str());
344 }
345
346 if !errors.is_empty() {
347 Err(Error::new(ErrorKind::InvalidInput, errors))
348 } else {
349 Ok(record.clone())
350 }
351 });
352
353 let new_data_result = match validator_result {
354 Ok(record) => DataResult::Ok(record),
355 Err(e) => DataResult::Err((record.clone(), e)),
356 };
357
358 context_received.insert_step_result(step.name(), new_data_result);
359 step.send(context_received).await;
360
361 Ok(())
362}
363
364#[derive(Debug, Deserialize, Clone, Default)]
365pub struct Rule {
366 pub pattern: String,
367 pub message: Option<String>,
368}
369
#[cfg(test)]
mod tests {
    use super::*;
    use macro_rules_attribute::apply;
    use smol_macros::test;
    use std::thread;

    /// A context whose data type is not handled by the step is forwarded unchanged.
    #[apply(test!)]
    async fn exec_with_different_data_result_type() {
        let mut step = Validator::default();
        let (sender_input, receiver_input) = async_channel::unbounded();
        let (sender_output, receiver_output) = async_channel::unbounded();
        let data = serde_json::from_str(r#"{"field_1":"value_1"}"#).unwrap();
        let error = Error::new(ErrorKind::InvalidData, "My error");
        let context = Context::new("before".to_string(), DataResult::Err((data, error)));
        let expected_context = context.clone();

        thread::spawn(move || {
            sender_input.try_send(context).unwrap();
        });

        step.receiver = Some(receiver_input);
        step.sender = Some(sender_output);
        step.exec().await.unwrap();

        assert_eq!(expected_context, receiver_output.recv().await.unwrap());
    }

    /// A record passing every rule is stored unchanged as an `Ok` step result.
    #[apply(test!)]
    async fn exec_with_same_data_result_type() {
        let mut step = Validator::default();
        let (sender_input, receiver_input) = async_channel::unbounded();
        let (sender_output, receiver_output) = async_channel::unbounded();
        let data: Value = serde_json::from_str(r#"{"field_1":"value_1"}"#).unwrap();
        let context = Context::new("before".to_string(), DataResult::Ok(data.clone()));

        let mut expected_context = context.clone();
        expected_context.insert_step_result("my_step".to_string(), DataResult::Ok(data));

        thread::spawn(move || {
            sender_input.try_send(context).unwrap();
        });

        step.receiver = Some(receiver_input);
        step.sender = Some(sender_output);
        step.name = "my_step".to_string();
        step.rules = serde_json::from_str(r#"{"rule_1": {"pattern": "true"}}"#).unwrap();
        step.exec().await.unwrap();

        assert_eq!(expected_context, receiver_output.recv().await.unwrap());
    }

    /// Every failing rule contributes its message, joined in rule-name order.
    #[apply(test!)]
    async fn exec() {
        let (sender_input, receiver_input) = async_channel::unbounded();
        let (sender_output, receiver_output) = async_channel::unbounded();
        let mut rules = BTreeMap::default();
        rules.insert(
            "rule_number_1".to_string(),
            Rule {
                pattern:
                    "{% if input.number_1 is matching('\\d+') %} true {% else %} false {% endif %}"
                        .to_string(),
                message: Some("Err N.1".to_string()),
            },
        );
        rules.insert(
            "rule_number_2".to_string(),
            Rule {
                pattern: "{% if input.number_2 < 100 %} true {% else %} false {% endif %}"
                    .to_string(),
                message: Some("Err N.2".to_string()),
            },
        );
        rules.insert(
            "rule_text".to_string(),
            Rule {
                pattern:
                    "{% if input.text is matching('[^\\d]+') %} true {% else %} false {% endif %}"
                        .to_string(),
                message: Some("Err T.1".to_string()),
            },
        );
        let validator = Validator {
            rules,
            error_separator: " & ".to_string(),
            receiver: Some(receiver_input),
            sender: Some(sender_output),
            ..Default::default()
        };
        thread::spawn(move || {
            let data =
                serde_json::from_str(r#"{"number_1":"my_string","number_2":100,"text":"120"}"#)
                    .unwrap();
            let context = Context::new("step_data_loading".to_string(), DataResult::Ok(data));
            sender_input.try_send(context).unwrap();
        });
        validator.exec().await.unwrap();

        // `exec` only returns once every context has been sent, so exactly one is waiting;
        // requiring it prevents the assertion from being skipped silently.
        let context = receiver_output
            .try_recv()
            .expect("the validator should send one context");
        let error_result = context
            .input()
            .to_value()
            .search("/_error")
            .unwrap()
            .unwrap();
        let error_result_expected = Value::String("Err N.1 & Err N.2 & Err T.1".to_string());
        assert_eq!(error_result_expected, error_result);
        assert!(receiver_output.try_recv().is_err());
    }

    /// A pattern that fails to render stores the rendering error as the record error.
    #[apply(test!)]
    async fn exec_with_validation_error() {
        let (sender_input, receiver_input) = async_channel::unbounded();
        let (sender_output, receiver_output) = async_channel::unbounded();
        let mut rules = BTreeMap::default();
        rules.insert(
            "rule_exception".to_string(),
            Rule {
                pattern:
                    "{% if input.number_1 is matching('\\d+') %} true {% else %} false {% endif %}"
                        .to_string(),
                message: Some("Err N.1".to_string()),
            },
        );
        let validator = Validator {
            rules,
            error_separator: " & ".to_string(),
            receiver: Some(receiver_input),
            sender: Some(sender_output),
            ..Default::default()
        };
        thread::spawn(move || {
            let data = serde_json::from_str(r#"{"number":100}"#).unwrap();
            let context = Context::new("step_data_loading".to_string(), DataResult::Ok(data));
            sender_input.try_send(context).unwrap();
        });
        validator.exec().await.unwrap();

        let context = receiver_output
            .try_recv()
            .expect("the validator should send one context");
        let error_result = context
            .input()
            .to_value()
            .search("/_error")
            .unwrap()
            .unwrap();
        let error_result_expected = Value::String("Failed to render the field 'rule_exception'. Tester `matching` was called on an undefined variable.".to_string());
        assert_eq!(error_result_expected, error_result);
        assert!(receiver_output.try_recv().is_err());
    }
}