use super::reader::Reader;
use super::referential::Referential;
use super::DataResult;
use crate::step::Step;
use crate::updater::{Action, UpdaterType};
use crate::Context;
use async_channel::{Receiver, Sender};
use futures::StreamExt;
use async_trait::async_trait;
use serde::Deserialize;
use serde_json::Value;
use std::collections::HashMap;
use std::io;
use uuid::Uuid;
use crate::helper::string::DisplayOnlyForDebugging;
/// Pipeline step that rewrites incoming records with the configured updater.
///
/// Only contexts whose input is of type `data_type` are transformed; any other
/// context is forwarded unchanged.
#[derive(Debug, Deserialize, Clone)]
#[serde(default, deny_unknown_fields)]
pub struct Transformer {
    /// Updater engine applied to each record (configuration key `updater`, alias `u`).
    #[serde(rename = "updater", alias = "u")]
    pub updater_type: UpdaterType,
    /// Readers whose resolved values are handed to the updater as referentials (alias `refs`).
    #[serde(alias = "refs")]
    pub referentials: HashMap<String, Reader>,
    /// Step name; also the key under which this step's result is stored in the context
    /// (alias `alias`).
    #[serde(alias = "alias")]
    pub name: String,
    /// Data result type this step handles, checked with `DataResult::is_type`.
    pub data_type: String,
    /// Number of contexts transformed concurrently (given to `buffer_unordered`).
    pub concurrency_limit: usize,
    /// Actions handed to the updater for every record.
    pub actions: Vec<Action>,
    /// Input channel; runtime only, never deserialized.
    #[serde(skip)]
    pub receiver: Option<Receiver<Context>>,
    /// Output channel; runtime only, never deserialized.
    #[serde(skip)]
    pub sender: Option<Sender<Context>>,
}
impl Default for Transformer {
    /// Builds a transformer with a random (UUID-based) name that handles
    /// `DataResult::OK` data one context at a time, with no actions,
    /// no referentials and no channels attached.
    fn default() -> Self {
        let name = Uuid::new_v4().simple().to_string();
        Self {
            updater_type: Default::default(),
            referentials: Default::default(),
            name,
            data_type: DataResult::OK.to_string(),
            concurrency_limit: 1,
            actions: Default::default(),
            receiver: None,
            sender: None,
        }
    }
}
#[async_trait]
impl Step for Transformer {
fn set_receiver(&mut self, receiver: Receiver<Context>) {
self.receiver = Some(receiver);
}
fn receiver(&self) -> Option<&Receiver<Context>> {
self.receiver.as_ref()
}
fn set_sender(&mut self, sender: Sender<Context>) {
self.sender = Some(sender);
}
fn sender(&self) -> Option<&Sender<Context>> {
self.sender.as_ref()
}
fn name(&self) -> String {
self.name.clone()
}
#[instrument(name = "transformer::exec",
skip(self),
fields(name=self.name,
data_type=self.data_type,
concurrency_limit=self.concurrency_limit,
))]
async fn exec(&self) -> io::Result<()> {
info!("Starting data transformation...");
let receiver_stream = self.receive().await;
let referentials = self.referentials.clone().into_iter().filter(|(_, r)| !r.connector_type.inner().is_variable()).collect();
Referential::new(&referentials).to_value(&Context::new(String::default(), DataResult::Ok(Value::default()))).await?;
let results: Vec<_> = receiver_stream.map(|mut context| {
let step = self.clone();
async move {
transform(&step, &mut context).await
}
}).buffer_unordered(self.concurrency_limit).collect().await;
results
.into_iter()
.filter_map(Result::err)
.for_each(drop);
info!("Finished transformation and sending context in the channel");
Ok(())
}
}
/// Transforms one context with the step's updater and forwards the outcome.
///
/// * A context whose input is not of type `step.data_type` is forwarded untouched.
/// * An array result is split: each element is stored as this step's result and
///   the context is sent once per element.
/// * A `null` result is logged and nothing is sent.
/// * Any other value is stored as this step's result and the context is sent.
/// * An updater error is stored as `DataResult::Err((record, error))` and the
///   context is sent.
///
/// # Errors
///
/// Returns an error, without forwarding the context, when the context cannot be
/// converted to a value or when the referentials cannot be resolved.
#[instrument(name = "transformer::transform", skip(step, context_received))]
async fn transform(step: &Transformer, context_received: &mut Context) -> io::Result<()> {
    let data_result = context_received.input();
    if !data_result.is_type(&step.data_type) {
        trace!("Handles only this data type");
        step.send(context_received).await;
        return Ok(());
    }
    let record = data_result.to_value();

    // Resolve the updater inputs up front. The borrow of `record` then ends with
    // this statement, and `record` can be moved into an error result below
    // instead of being cloned.
    let context_value = context_received.to_value()?;
    let referential_value = Referential::new(&step.referentials)
        .to_value(context_received)
        .await?;
    let update_result = step
        .updater_type
        .updater()
        .update(&record, &context_value, &referential_value, &step.actions)
        .await;

    match update_result {
        Ok(new_record) => match new_record {
            Value::Array(ref array) => {
                info!(
                    from = record.display_only_for_debugging(),
                    to = new_record.display_only_for_debugging(),
                    "Transformed array successfully"
                );
                for array_value in array {
                    context_received
                        .insert_step_result(step.name(), DataResult::Ok(array_value.clone()));
                    step.send(context_received).await;
                }
            }
            Value::Null => {
                info!(
                    record = new_record.display_only_for_debugging(),
                    "Skipping record with null value"
                );
            }
            _ => {
                info!(
                    from = record.display_only_for_debugging(),
                    to = new_record.display_only_for_debugging(),
                    "Record transformed successfully"
                );
                // No binding borrows `new_record` in this arm, so it can be moved.
                context_received.insert_step_result(step.name(), DataResult::Ok(new_record));
                step.send(context_received).await;
            }
        },
        Err(e) => {
            warn!(
                from = record.display_only_for_debugging(),
                error = e.to_string().as_str(),
                context = context_received.display_only_for_debugging(),
                "Updater transformation failed"
            );
            context_received.insert_step_result(step.name(), DataResult::Err((record, e)));
            step.send(context_received).await;
        }
    }
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    use macro_rules_attribute::apply;
    use serde_json::Value;
    use smol_macros::test;
    use std::io::{Error, ErrorKind};
    use std::thread;

    /// Connects `step` to fresh unbounded channels and pushes `context` into its
    /// input from a background thread. Returns the receiving end of the output channel.
    fn wire(step: &mut Transformer, context: Context) -> Receiver<Context> {
        let (input_tx, input_rx) = async_channel::unbounded();
        let (output_tx, output_rx) = async_channel::unbounded();
        thread::spawn(move || {
            input_tx.try_send(context).unwrap();
        });
        step.receiver = Some(input_rx);
        step.sender = Some(output_tx);
        output_rx
    }

    /// Parses a JSON literal used as test data.
    fn json(raw: &str) -> Value {
        serde_json::from_str(raw).unwrap()
    }

    #[apply(test!)]
    async fn exec_with_different_data_result_type() {
        let failure = Error::new(ErrorKind::InvalidData, "My error");
        let incoming = Context::new(
            "before".to_string(),
            DataResult::Err((json(r#"{"field_1":"value_1"}"#), failure)),
        );
        // A non-matching data type must pass through untouched.
        let expected = incoming.clone();

        let mut step = Transformer::default();
        let output = wire(&mut step, incoming);
        step.exec().await.unwrap();

        assert_eq!(expected, output.recv().await.unwrap());
    }

    #[apply(test!)]
    async fn exec_with_same_data_result_type() {
        let incoming = Context::new(
            "before".to_string(),
            DataResult::Ok(json(r#"{"field_1":"value_1"}"#)),
        );
        let mut expected = incoming.clone();
        expected.insert_step_result(
            "my_step".to_string(),
            DataResult::Ok(json(r#"{"field_1":"value_2"}"#)),
        );

        let mut step = Transformer::default();
        let output = wire(&mut step, incoming);
        step.name = "my_step".to_string();
        step.actions =
            serde_json::from_str(r#"[{"field":"field_1","pattern": "value_2"}]"#).unwrap();
        step.exec().await.unwrap();

        assert_eq!(expected, output.recv().await.unwrap());
    }

    #[apply(test!)]
    async fn exec_with_array() {
        let incoming = Context::new(
            "before".to_string(),
            DataResult::Ok(json(r#"{"field_1":"value_1"}"#)),
        );
        // One output context is expected per element of the produced array, in order.
        let expected_contexts: Vec<Context> = [r#"{"field_1":"value_1"}"#, r#"{"field_1":"value_2"}"#]
            .iter()
            .map(|raw| {
                let mut expected = incoming.clone();
                expected.insert_step_result("my_step".to_string(), DataResult::Ok(json(raw)));
                expected
            })
            .collect();

        let mut step = Transformer::default();
        let output = wire(&mut step, incoming);
        step.name = "my_step".to_string();
        step.actions = serde_json::from_str(
            r#"[{"pattern": "[{\"field_1\":\"value_1\"},{\"field_1\":\"value_2\"}]"}]"#,
        )
        .unwrap();
        step.exec().await.unwrap();

        for expected in expected_contexts {
            assert_eq!(expected, output.recv().await.unwrap());
        }
    }
}