use crate::connector::Connector;
use crate::document::DocumentType;
use crate::helper::string::DisplayOnlyForDebugging;
use crate::step::Step;
use crate::DataResult;
use crate::{connector::ConnectorType, Context};
use async_channel::{Receiver, Sender};
use async_trait::async_trait;
use futures::StreamExt;
use serde::Deserialize;
use std::io;
use uuid::Uuid;
#[derive(Debug, Deserialize, Clone)]
#[serde(default, deny_unknown_fields)]
pub struct Reader {
#[serde(rename = "connector")]
#[serde(alias = "conn")]
pub connector_type: ConnectorType,
#[serde(rename = "document")]
#[serde(alias = "doc")]
pub document_type: DocumentType,
#[serde(alias = "alias")]
pub name: String,
#[serde(alias = "data")]
pub data_type: String,
#[serde(skip)]
pub receiver: Option<Receiver<Context>>,
#[serde(skip)]
pub sender: Option<Sender<Context>>,
pub concurrency_limit: usize,
}
impl Default for Reader {
fn default() -> Self {
let uuid = Uuid::new_v4();
Reader {
connector_type: ConnectorType::default(),
document_type: DocumentType::default(),
name: uuid.simple().to_string(),
data_type: DataResult::OK.to_string(),
receiver: None,
sender: None,
concurrency_limit: 1,
}
}
}
#[async_trait]
impl Step for Reader {
fn set_receiver(&mut self, receiver: Receiver<Context>) {
self.receiver = Some(receiver);
}
fn receiver(&self) -> Option<&Receiver<Context>> {
self.receiver.as_ref()
}
fn set_sender(&mut self, sender: Sender<Context>) {
self.sender = Some(sender);
}
fn sender(&self) -> Option<&Sender<Context>> {
self.sender.as_ref()
}
#[instrument(name = "reader::exec",
skip(self),
fields(name=self.name,
data_type=self.data_type,
concurrency_limit=self.concurrency_limit))]
async fn exec(&self) -> io::Result<()> {
info!("Start reading data...");
let mut connector = self.connector_type.clone().boxed_inner();
let document = self.document_type.clone().boxed_inner();
connector.set_document(document)?;
let mut receiver_stream = self.receive().await;
let mut has_data_been_received = false;
while let Some(context_received) = receiver_stream.next().await {
if !has_data_been_received {
has_data_been_received = true;
}
if !context_received.input().is_type(self.data_type.as_ref()) {
trace!("handles only this data type");
self.send(&context_received).await;
continue;
}
connector.set_parameters(context_received.to_value()?);
connector.paginate().await?
.filter_map(|connector_result| async { match connector_result {
Ok(connector) => Some(connector),
Err(e) => {
warn!(
error = %e,
"pagination through the paginator failed"
);
None
}
}})
.for_each_concurrent(None, |connector| {
let step = self.clone();
let context = Some(context_received.clone());
async move {
read(&step, &mut connector.clone(), &context).await;
}}).await;
}
if !has_data_been_received {
connector.paginate().await?
.filter_map(|connector_result| async { match connector_result {
Ok(connector) => Some(connector),
Err(e) => {
warn!(
error = %e,
"pagination through the paginator failed"
);
None
}
}})
.for_each_concurrent(None, |connector| {
let step = self.clone();
async move {
read(&step, &mut connector.clone(), &None).await;
}}).await;
}
info!("stops reading data and sending context in the channel");
Ok(())
}
fn name(&self) -> String {
self.name.clone()
}
}
async fn read<'step>(
step: &'step Reader,
connector: &'step mut Box<dyn Connector>,
context: &'step Option<Context>,
) {
let dataset = match connector.fetch().await {
Ok(Some(dataset)) => {
info!("read and forward data");
dataset
},
Ok(None) => {
info!(document = connector.document().display_only_for_debugging(), "No data found through the connector. If it's not normal, check the document configuration.");
return
},
Err(e) => {
warn!(
error = %e,
"fetch data failed"
);
if let Some(context) = context {
let mut context_in_err = context.clone();
context_in_err.insert_step_result(
step.name(),
DataResult::Err((
context.input().to_value(),
io::Error::new(e.kind(), e.to_string()),
)),
);
step.send(&context_in_err).await;
}
return;
}
};
let step = step.clone();
let context = context.clone();
let max_concurrency = step.concurrency_limit;
dataset.map(|data_result| async {
let context = match context.clone() {
Some(ref mut context) => {
context.insert_step_result(step.name(), data_result);
context.clone()
},
None => Context::new(step.name(), data_result),
};
step.send(&context).await;
}).buffer_unordered(max_concurrency)
.collect::<Vec<_>>()
.await;
}
#[cfg(test)]
mod tests {
use super::*;
use macro_rules_attribute::apply;
use smol_macros::test;
use serde_json::Value;
use std::io::{Error, ErrorKind};
use std::thread;
use crate::connector::in_memory::InMemory;
#[apply(test!)]
async fn exec_with_different_data_result_type() {
let mut step = Reader::default();
let (sender_input, receiver_input) = async_channel::unbounded();
let (sender_output, receiver_output) = async_channel::unbounded();
let data = serde_json::from_str(r#"{"field_1":"value_1"}"#).unwrap();
let error = Error::new(ErrorKind::InvalidData, "My error");
let context = Context::new("before".to_string(), DataResult::Err((data, error)));
let expected_context = context.clone();
thread::spawn(move || {
sender_input.try_send(context).unwrap();
});
step.receiver = Some(receiver_input);
step.sender = Some(sender_output);
step.exec().await.unwrap();
assert_eq!(expected_context, receiver_output.recv().await.unwrap());
}
#[apply(test!)]
async fn exec_with_same_data_result_type() {
let mut step = Reader::default();
let (sender_input, receiver_input) = async_channel::unbounded();
let (sender_output, receiver_output) = async_channel::unbounded();
let data: Value = serde_json::from_str(r#"{"field_1":"value_1"}"#).unwrap();
let context = Context::new("before".to_string(), DataResult::Ok(data.clone()));
let mut expected_context = context.clone();
let data2: Value = serde_json::from_str(r#"{"field_1":"value_2"}"#).unwrap();
expected_context.insert_step_result("my_step".to_string(), DataResult::Ok(data2));
thread::spawn(move || {
sender_input.try_send(context).unwrap();
});
step.receiver = Some(receiver_input);
step.sender = Some(sender_output);
step.name = "my_step".to_string();
step.connector_type = ConnectorType::InMemory(InMemory::new(r#"{"field_1":"value_2"}"#));
step.exec().await.unwrap();
assert_eq!(expected_context, receiver_output.recv().await.unwrap());
}
}