use crate::connector::ConnectorType;
use crate::document::DocumentType;
use crate::step::{DataResult, Step};
use crate::Context;
use async_channel::{Receiver, Sender};
use async_trait::async_trait;
use smol::stream::StreamExt;
use serde::{Deserialize, Serialize};
use std::io;
use uuid::Uuid;
use crate::helper::string::DisplayOnlyForDebugging;
#[derive(Debug, Deserialize, Serialize, Clone)]
#[serde(default, deny_unknown_fields)]
pub struct Writer {
#[serde(rename = "connector")]
#[serde(alias = "conn")]
connector_type: ConnectorType,
#[serde(rename = "document")]
#[serde(alias = "doc")]
document_type: DocumentType,
#[serde(alias = "alias")]
pub name: String,
#[serde(alias = "data")]
pub data_type: String,
#[serde(alias = "batch")]
pub record_limit: usize,
pub concurrency_limit: usize,
#[serde(skip)]
pub receiver: Option<Receiver<Context>>,
#[serde(skip)]
pub sender: Option<Sender<Context>>,
}
impl Default for Writer {
fn default() -> Self {
let uuid = Uuid::new_v4();
Writer {
connector_type: ConnectorType::default(),
document_type: DocumentType::default(),
name: uuid.simple().to_string(),
data_type: DataResult::OK.to_string(),
record_limit: 100,
concurrency_limit: 1,
receiver: None,
sender: None,
}
}
}
#[async_trait]
impl Step for Writer {
fn set_receiver(&mut self, receiver: Receiver<Context>) {
self.receiver = Some(receiver);
}
fn receiver(&self) -> Option<&Receiver<Context>> {
self.receiver.as_ref()
}
fn set_sender(&mut self, sender: Sender<Context>) {
self.sender = Some(sender);
}
fn sender(&self) -> Option<&Sender<Context>> {
self.sender.as_ref()
}
#[instrument(name = "writer::exec",
skip(self),
fields(name=self.name,
data_type=self.data_type,
concurrency_limit=self.concurrency_limit,
record_limit=self.record_limit,
))]
async fn exec(&self) -> io::Result<()> {
info!("Start writing data...");
let mut total_written: usize = 0;
let mut connector = self.connector_type.clone().boxed_inner();
let document = self.document_type.clone().boxed_inner();
connector.set_document(document.clone())?;
let mut dataset = Vec::default();
let mut receiver_stream = self.receive().await;
let default_connector = connector.clone();
let mut last_context_received = None;
while let Some(context_received) = receiver_stream.next().await {
if !context_received.input().is_type(self.data_type.as_ref()) {
trace!("Handles only this data type");
self.send(&context_received).await;
continue;
}
last_context_received = Some(context_received.clone());
{
if connector.is_resource_will_change(context_received.to_value()?)?
&& !dataset.is_empty()
{
info!(dataset_length = dataset.len(), "Next write");
match connector.send(&dataset).await {
Ok(_) => {
total_written+=dataset.len();
info!(dataset_length = dataset.len(), total = &total_written, "Write with success");
for data in dataset {
let mut context = context_received.clone();
context.insert_step_result(self.name(), data);
self.send(&context).await;
}
}
Err(e) => {
warn!(
error = format!("{:?}", &e).as_str(),
dataset = &dataset.display_only_for_debugging(),
"Can't write data"
);
for data in dataset {
let mut context = context_received.clone();
context.insert_step_result(
self.name(),
DataResult::Err((
data.to_value(),
io::Error::new(e.kind(), e.to_string()),
)),
);
self.send(&context).await;
}
}
};
dataset = Vec::default();
connector = default_connector.clone();
}
}
connector.set_parameters(context_received.to_value()?);
dataset.push(context_received.input());
if self.record_limit <= dataset.len() && document.can_append() {
info!(dataset_length = dataset.len(), "Next write");
match connector.send(&dataset).await {
Ok(_) => {
total_written+=dataset.len();
info!(dataset_length = dataset.len(), total = total_written, "Write with success");
for data in dataset {
let mut context = context_received.clone();
context.insert_step_result(self.name(), data);
self.send(&context).await;
}
}
Err(e) => {
warn!(
error = format!("{:?}", &e).as_str(),
dataset = &dataset.display_only_for_debugging(),
"Can't write data"
);
for data in dataset {
let mut context = context_received.clone();
context.insert_step_result(
self.name(),
DataResult::Err((
data.to_value(),
io::Error::new(e.kind(), e.to_string()),
)),
);
self.send(&context).await;
}
}
};
dataset = Vec::default();
}
}
if !dataset.is_empty() {
info!(dataset_length = dataset.len(), "Last write");
match connector.send(&dataset).await {
Ok(_) => {
total_written+=dataset.len();
info!(dataset_length = dataset.len(), total = total_written, "Write with success");
for data in dataset {
let context = match &last_context_received {
Some(context_received) => {
let mut context = context_received.clone();
context.insert_step_result(self.name(), data);
context
}
None => Context::new(self.name(), data),
};
self.send(&context).await;
}
}
Err(e) => {
warn!(
error = format!("{:?}", &e).as_str(),
dataset = &dataset.display_only_for_debugging(),
"Can't write data"
);
for data in dataset {
let context = match &last_context_received {
Some(context_received) => {
let mut context = context_received.clone();
context.insert_step_result(
self.name(),
DataResult::Err((
data.to_value(),
io::Error::new(e.kind(), e.to_string()),
)),
);
context
}
None => Context::new(
self.name(),
DataResult::Err((
data.to_value(),
io::Error::new(e.kind(), e.to_string()),
)),
),
};
self.send(&context).await;
}
}
};
}
info!(
total = total_written,
"Stops writing data and sending context in the channel"
);
Ok(())
}
fn number(&self) -> usize {
self.concurrency_limit
}
fn name(&self) -> String {
self.name.clone()
}
}
#[cfg(test)]
mod tests {
use super::*;
use macro_rules_attribute::apply;
use smol_macros::test;
use serde_json::Value;
use std::io::{Error, ErrorKind};
use std::thread;
use crate::connector::in_memory::InMemory;
#[apply(test!)]
async fn exec_with_different_data_result_type() {
let mut step = Writer::default();
let (sender_input, receiver_input) = async_channel::unbounded();
let (sender_output, receiver_output) = async_channel::unbounded();
let data = serde_json::from_str(r#"{"field_1":"value_1"}"#).unwrap();
let error = Error::new(ErrorKind::InvalidData, "My error");
let context = Context::new("before".to_string(), DataResult::Err((data, error)));
let expected_context = context.clone();
thread::spawn(move || {
sender_input.try_send(context).unwrap();
});
step.receiver = Some(receiver_input);
step.sender = Some(sender_output);
step.exec().await.unwrap();
assert_eq!(expected_context, receiver_output.recv().await.unwrap());
}
#[apply(test!)]
async fn exec_with_same_data_result_type() {
let mut step = Writer::default();
let (sender_input, receiver_input) = async_channel::unbounded();
let (sender_output, receiver_output) = async_channel::unbounded();
let data: Value = serde_json::from_str(r#"{"field_1":"value_1"}"#).unwrap();
let context = Context::new("before".to_string(), DataResult::Ok(data.clone()));
let mut expected_context = context.clone();
expected_context.insert_step_result("my_step".to_string(), DataResult::Ok(data.clone()));
thread::spawn(move || {
sender_input.try_send(context).unwrap();
});
step.receiver = Some(receiver_input);
step.sender = Some(sender_output);
step.name = "my_step".to_string();
step.connector_type = ConnectorType::InMemory(InMemory::default());
step.exec().await.unwrap();
assert_eq!(expected_context, receiver_output.recv().await.unwrap());
}
}