use super::DataResult;
use super::reader::Reader;
use super::referential::Referential;
use crate::helper::json_pointer::JsonPointer;
use crate::helper::mustache::Mustache;
use crate::step::Step;
use crate::updater::{ActionType, UpdaterType, INPUT_FIELD_KEY};
use crate::Context;
use crate::updater::Action;
use async_channel::{Receiver, Sender};
use futures::StreamExt;
use async_trait::async_trait;
use json_value_merge::Merge;
use json_value_search::Search;
use serde::Deserialize;
use serde_json::Value;
use std::collections::HashMap;
use std::io::{Error, ErrorKind};
use std::{
collections::BTreeMap,
io,
};
use uuid::Uuid;
/// Pipeline step that checks every record of a given data type against a set of
/// named rules and records the outcome as this step's result.
#[derive(Clone, Debug, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct Validator {
    /// Engine used to render the rule patterns (configured as `updater`, alias `u`).
    #[serde(rename = "updater", alias = "u")]
    pub updater_type: UpdaterType,
    /// Named readers resolved into referential data and handed to the updater
    /// while rendering the rule patterns (alias `refs`).
    #[serde(alias = "refs")]
    pub referentials: HashMap<String, Reader>,
    /// Name of the step, used as the key of its result in the context (alias `alias`).
    #[serde(alias = "alias")]
    pub name: String,
    /// Only contexts whose input matches this data type are validated; the others
    /// are forwarded untouched.
    pub data_type: String,
    /// Maximum number of records validated concurrently.
    pub concurrency_limit: usize,
    /// Rules keyed by name. The map is ordered, so failures are reported in key order.
    pub rules: BTreeMap<String, Rule>,
    /// Text inserted between two failure messages (alias `separator`).
    #[serde(alias = "separator")]
    pub error_separator: String,
    /// Channel the step reads contexts from; never deserialized.
    #[serde(skip)]
    pub receiver: Option<Receiver<Context>>,
    /// Channel the step writes contexts to; never deserialized.
    #[serde(skip)]
    pub sender: Option<Sender<Context>>,
}
impl Default for Validator {
    /// Builds a validator named after a fresh random UUID (simple format).
    ///
    /// The default validator has no rules and no referentials. It uses a
    /// concurrency limit of 1 and `"\r\n"` as the error separator, and it only
    /// handles contexts whose input is of type `DataResult::OK`.
    fn default() -> Self {
        let name = Uuid::new_v4().simple().to_string();
        Self {
            name,
            updater_type: Default::default(),
            referentials: HashMap::new(),
            data_type: DataResult::OK.to_string(),
            concurrency_limit: 1,
            rules: BTreeMap::new(),
            error_separator: String::from("\r\n"),
            receiver: None,
            sender: None,
        }
    }
}
#[async_trait]
impl Step for Validator {
    /// Sets the channel this step reads contexts from.
    fn set_receiver(&mut self, receiver: Receiver<Context>) {
        self.receiver = Some(receiver);
    }
    /// Returns the channel this step reads contexts from, if any.
    fn receiver(&self) -> Option<&Receiver<Context>> {
        self.receiver.as_ref()
    }
    /// Sets the channel this step forwards contexts to.
    fn set_sender(&mut self, sender: Sender<Context>) {
        self.sender = Some(sender);
    }
    /// Returns the channel this step forwards contexts to, if any.
    fn sender(&self) -> Option<&Sender<Context>> {
        self.sender.as_ref()
    }
    /// Validates every received context and forwards it to the next step.
    ///
    /// The referentials are resolved once up front with an empty context, to
    /// warm them up before the concurrent execution. Each rule is then turned
    /// into a `Replace` action whose field is the rule name and whose pattern
    /// is the rule pattern. Every received context is validated in its own
    /// `smol` task, with at most `concurrency_limit` tasks in flight.
    /// Per-context failures are logged as warnings and do not stop the step.
    ///
    /// # Errors
    ///
    /// Returns an error only if the initial referential warm-up fails.
    #[instrument(name = "validator::exec",
        skip(self),
        fields(name=self.name,
        data_type=self.data_type,
        concurrency_limit=self.concurrency_limit,
        error_separator=self.error_separator,
    ))]
    async fn exec(&self) -> io::Result<()> {
        info!("Start validating data...");
        let receiver_stream = self.receive().await;
        trace!("Warm up static referential before using it in the concurrent execution.");
        Referential::new(&self.referentials)
            .to_value(&Context::new(String::default(), DataResult::Ok(Value::default())))
            .await?;
        let actions: Vec<Action> = self
            .rules
            .iter()
            .map(|(rule_name, rule)| Action {
                field: rule_name.clone(),
                pattern: Some(rule.pattern.clone()),
                action_type: ActionType::Replace,
            })
            .collect();
        let results: Vec<_> = receiver_stream
            .map(|mut context_received| {
                // `smol::spawn` needs a `'static` future, so each task owns its copies;
                // the received context is already owned and is mutated in place.
                let step = self.clone();
                let actions = actions.clone();
                smol::spawn(async move { validate(&step, &mut context_received, &actions).await })
            })
            .buffer_unordered(self.concurrency_limit)
            .collect()
            .await;
        for failure in results.iter().filter(|result| result.is_err()) {
            warn!("{:?}", failure);
        }
        info!("Stops validating and sending context in the channel");
        Ok(())
    }
    /// Degree of concurrency of the step (its `concurrency_limit`).
    fn number(&self) -> usize {
        self.concurrency_limit
    }
    /// Name of the step.
    fn name(&self) -> String {
        self.name.clone()
    }
}
/// Validates a single context against the step's rules, then forwards it.
///
/// Contexts whose input is not of type `step.data_type` are forwarded
/// unchanged. Otherwise, the step's updater renders `actions` against three
/// sources:
/// - the input record,
/// - the steps history,
/// - the resolved referentials.
///
/// The rendered result must be an object, and each rule's entry in it must be
/// the boolean `true`. The input record is stored as this step's result:
/// - `DataResult::Ok(record)` when every rule passes;
/// - `DataResult::Err((record, error))` otherwise.
///
/// On failure, the error message joins one message per failing rule with
/// `step.error_separator`. Each message is rendered as a mustache template,
/// with the record under `INPUT_FIELD_KEY` and `rule.name` available.
///
/// # Errors
///
/// Returns an error if the referentials cannot be resolved for this context.
/// NOTE(review): in that case the context is not forwarded downstream.
#[instrument(name = "validator::validate", skip(step, context_received))]
async fn validate(step: &Validator, context_received: &mut Context, actions: &Vec<Action>) -> io::Result<()> {
    let data_result = context_received.input();
    if !data_result.is_type(step.data_type.as_ref()) {
        trace!("Handles only this data type");
        step.send(context_received).await;
        return Ok(());
    }
    let record = data_result.to_value();
    let validator_result = step
        .updater_type
        .updater()
        .update(
            &record,
            &context_received.steps(),
            &Referential::new(&step.referentials).to_value(context_received).await?,
            actions,
        )
        .await
        .and_then(|value| match value {
            Value::Object(_) => Ok(value),
            _ => Err(Error::new(
                ErrorKind::InvalidInput,
                format!("The validation's result must be an object and not '{:?}'", value),
            )),
        })
        .and_then(|value| {
            let mut errors = String::default();
            for (rule_name, rule) in &step.rules {
                let mut error = match value.clone().search(rule_name.to_json_pointer().as_str()) {
                    Ok(Some(Value::Bool(true))) => continue,
                    Ok(Some(Value::Bool(false))) => rule
                        .message
                        .clone()
                        .unwrap_or_else(|| format!("The rule '{}' failed", rule_name)),
                    Ok(Some(other)) => format!(
                        "The rule '{}' has invalid result pattern '{:?}', it must be a boolean",
                        rule_name, other
                    ),
                    // Report the whole validation result: the search result itself is `None`.
                    Ok(None) => format!(
                        "The rule '{}' is not found in the validation result '{:?}'",
                        rule_name, value
                    ),
                    Err(e) => e.to_string(),
                };
                if !errors.is_empty() {
                    errors.push_str(&step.error_separator);
                }
                let mut params = Value::default();
                params.merge_in(&format!("/{}", INPUT_FIELD_KEY), &record)?;
                params.merge_in("/rule/name", &Value::String(rule_name.clone()))?;
                error.replace_mustache(params);
                errors.push_str(&error);
            }
            if errors.is_empty() {
                Ok(record.clone())
            } else {
                Err(Error::new(ErrorKind::InvalidInput, errors))
            }
        });
    let new_data_result = match validator_result {
        Ok(record) => DataResult::Ok(record),
        Err(e) => DataResult::Err((record, e)),
    };
    context_received.insert_step_result(step.name(), new_data_result);
    step.send(context_received).await;
    Ok(())
}
/// A single named validation rule of a [`Validator`].
#[derive(Clone, Debug, Default, Deserialize)]
pub struct Rule {
    /// Template rendered by the validator's updater. The rendered value must be
    /// the boolean `true` for the rule to pass.
    pub pattern: String,
    /// Message reported when the pattern renders `false`. When absent, it
    /// defaults to "The rule '<name>' failed". The message is rendered as a
    /// mustache template with the input record and `rule.name` available.
    pub message: Option<String>,
}
#[cfg(test)]
mod tests {
    use super::*;
    use macro_rules_attribute::apply;
    use smol_macros::test;
    use std::thread;

    /// Pops the single context the validator forwarded and returns its `_error` field.
    ///
    /// Panics if nothing was forwarded, so a validator that silently drops
    /// contexts fails the test instead of passing vacuously.
    fn forwarded_error(receiver_output: &Receiver<Context>) -> Value {
        receiver_output
            .try_recv()
            .expect("the validator must forward the received context")
            .input()
            .to_value()
            .search("/_error")
            .unwrap()
            .unwrap()
    }

    /// A context whose data type differs from `data_type` is forwarded unchanged.
    #[apply(test!)]
    async fn exec_with_different_data_result_type() {
        let mut step = Validator::default();
        let (sender_input, receiver_input) = async_channel::unbounded();
        let (sender_output, receiver_output) = async_channel::unbounded();
        let data = serde_json::from_str(r#"{"field_1":"value_1"}"#).unwrap();
        let error = Error::new(ErrorKind::InvalidData, "My error");
        let context = Context::new("before".to_string(), DataResult::Err((data, error)));
        let expected_context = context.clone();
        thread::spawn(move || {
            sender_input.try_send(context).unwrap();
        });
        step.receiver = Some(receiver_input);
        step.sender = Some(sender_output);
        step.exec().await.unwrap();
        assert_eq!(expected_context, receiver_output.recv().await.unwrap());
    }

    /// A record satisfying every rule is stored as an `Ok` step result.
    #[apply(test!)]
    async fn exec_with_same_data_result_type() {
        let mut step = Validator::default();
        let (sender_input, receiver_input) = async_channel::unbounded();
        let (sender_output, receiver_output) = async_channel::unbounded();
        let data: Value = serde_json::from_str(r#"{"field_1":"value_1"}"#).unwrap();
        let context = Context::new("before".to_string(), DataResult::Ok(data.clone()));
        let mut expected_context = context.clone();
        expected_context.insert_step_result("my_step".to_string(), DataResult::Ok(data));
        thread::spawn(move || {
            sender_input.try_send(context).unwrap();
        });
        step.receiver = Some(receiver_input);
        step.sender = Some(sender_output);
        step.name = "my_step".to_string();
        step.rules = serde_json::from_str(r#"{"rule_1": {"pattern": "true"}}"#).unwrap();
        step.exec().await.unwrap();
        assert_eq!(expected_context, receiver_output.recv().await.unwrap());
    }

    /// Every failing rule contributes its message, joined by the separator in rule-name order.
    #[apply(test!)]
    async fn exec() {
        let (sender_input, receiver_input) = async_channel::unbounded();
        let (sender_output, receiver_output) = async_channel::unbounded();
        let mut rules = BTreeMap::default();
        rules.insert(
            "rule_number_1".to_string(),
            Rule {
                pattern:
                    "{% if input.number_1 is matching('\\d+') %} true {% else %} false {% endif %}"
                        .to_string(),
                message: Some("Err N.1".to_string()),
            },
        );
        rules.insert(
            "rule_number_2".to_string(),
            Rule {
                pattern: "{% if input.number_2 < 100 %} true {% else %} false {% endif %}"
                    .to_string(),
                message: Some("Err N.2".to_string()),
            },
        );
        rules.insert(
            "rule_text".to_string(),
            Rule {
                pattern:
                    "{% if input.text is matching('[^\\d]+') %} true {% else %} false {% endif %}"
                        .to_string(),
                message: Some("Err T.1".to_string()),
            },
        );
        let validator = Validator {
            rules,
            error_separator: " & ".to_string(),
            receiver: Some(receiver_input),
            sender: Some(sender_output),
            ..Default::default()
        };
        thread::spawn(move || {
            let data =
                serde_json::from_str(r#"{"number_1":"my_string","number_2":100,"text":"120"}"#)
                    .unwrap();
            let context = Context::new("step_data_loading".to_string(), DataResult::Ok(data));
            sender_input.try_send(context).unwrap();
        });
        validator.exec().await.unwrap();
        let error_result_expected = Value::String("Err N.1 & Err N.2 & Err T.1".to_string());
        assert_eq!(error_result_expected, forwarded_error(&receiver_output));
    }

    /// A pattern that fails to render reports the updater's error.
    #[apply(test!)]
    async fn exec_with_validation_error() {
        let (sender_input, receiver_input) = async_channel::unbounded();
        let (sender_output, receiver_output) = async_channel::unbounded();
        let mut rules = BTreeMap::default();
        rules.insert(
            "rule_exception".to_string(),
            Rule {
                pattern:
                    "{% if input.number_1 is matching('\\d+') %} true {% else %} false {% endif %}"
                        .to_string(),
                message: Some("Err N.1".to_string()),
            },
        );
        let validator = Validator {
            rules,
            error_separator: " & ".to_string(),
            receiver: Some(receiver_input),
            sender: Some(sender_output),
            ..Default::default()
        };
        thread::spawn(move || {
            let data = serde_json::from_str(r#"{"number":100}"#).unwrap();
            let context = Context::new("step_data_loading".to_string(), DataResult::Ok(data));
            sender_input.try_send(context).unwrap();
        });
        validator.exec().await.unwrap();
        let error_result_expected = Value::String("Failed to render the field 'rule_exception'. Tester `matching` was called on an undefined variable.".to_string());
        assert_eq!(error_result_expected, forwarded_error(&receiver_output));
    }
}