use crate::{BatchableTransformer, ChunkerTransformer, Loader, NodeCache, Persist, Transformer};
use anyhow::Result;
use futures_util::{StreamExt, TryFutureExt, TryStreamExt};
use std::sync::Arc;
use super::IngestionStream;
/// A lazily composed ingestion pipeline: a stream of nodes plus the storages it writes to.
///
/// The builder methods only chain adapters onto the stream. No node is loaded, transformed
/// or stored until [`IngestionPipeline::run`] is awaited. Dropping a pipeline (for example by
/// ignoring the value returned from a builder method) silently discards all configured work.
#[must_use = "an IngestionPipeline does nothing until `run` is awaited"]
pub struct IngestionPipeline {
    /// The node stream onto which every stage added so far has been chained.
    stream: IngestionStream,
    /// Storages registered with `then_store_with`; `run` calls `setup` on each before draining.
    storage: Vec<Arc<dyn Persist>>,
    /// Default per-stage concurrency, used when a stage's component reports none of its own.
    concurrency: usize,
}
impl Default for IngestionPipeline {
    /// Builds an empty pipeline.
    ///
    /// The pipeline has an input stream that yields nothing, no storages, and a default
    /// concurrency of `num_cpus::get()`.
    fn default() -> Self {
        let concurrency = num_cpus::get();
        Self {
            concurrency,
            storage: Vec::new(),
            stream: Box::pin(futures_util::stream::empty()),
        }
    }
}
impl IngestionPipeline {
pub fn from_loader(loader: impl Loader + 'static) -> Self {
let stream = loader.into_stream();
Self {
stream: stream.boxed(),
..Default::default()
}
}
pub fn with_concurrency(mut self, concurrency: usize) -> Self {
self.concurrency = concurrency;
self
}
pub fn filter_cached(mut self, cache: impl NodeCache + 'static) -> Self {
let cache = Arc::new(cache);
self.stream = self
.stream
.try_filter_map(move |node| {
let cache = Arc::clone(&cache);
let current_span = tracing::Span::current();
tokio::spawn(current_span.in_scope(|| async move {
if !cache.get(&node).await {
cache.set(&node).await;
tracing::debug!("Node not in cache, passing through");
Some(node)
} else {
tracing::debug!("Node in cache, skipping");
None
}
}))
.map_err(anyhow::Error::from)
})
.boxed();
self
}
pub fn then(mut self, transformer: impl Transformer + 'static) -> Self {
let transformer = Arc::new(transformer);
let concurrency = transformer.concurrency().unwrap_or(self.concurrency);
self.stream = self
.stream
.map_ok(move |node| {
let transformer = Arc::clone(&transformer);
let current_span = tracing::Span::current();
tokio::spawn(
current_span.in_scope(|| async move { transformer.transform_node(node).await }),
)
.map_err(anyhow::Error::from)
})
.try_buffer_unordered(concurrency)
.map(|x| x.and_then(|x| x))
.boxed();
self
}
pub fn then_in_batch(
mut self,
batch_size: usize,
transformer: impl BatchableTransformer + 'static,
) -> Self {
let transformer = Arc::new(transformer);
let concurrency = transformer.concurrency().unwrap_or(self.concurrency);
self.stream = self
.stream
.try_chunks(batch_size)
.map_ok(move |chunks| {
let transformer = Arc::clone(&transformer);
let current_span = tracing::Span::current();
tokio::spawn(
current_span
.in_scope(|| async move { transformer.batch_transform(chunks).await }),
)
.map_err(anyhow::Error::from)
})
.err_into::<anyhow::Error>()
.try_buffer_unordered(concurrency)
.try_flatten_unordered(concurrency)
.boxed();
self
}
pub fn then_chunk(mut self, chunker: impl ChunkerTransformer + 'static) -> Self {
let chunker = Arc::new(chunker);
let concurrency = chunker.concurrency().unwrap_or(self.concurrency);
self.stream = self
.stream
.map_ok(move |node| {
let chunker = Arc::clone(&chunker);
let current_span = tracing::Span::current();
tokio::spawn(
current_span.in_scope(|| async move { chunker.transform_node(node).await }),
)
.map_err(anyhow::Error::from)
})
.try_buffer_unordered(concurrency)
.try_flatten_unordered(concurrency)
.boxed();
self
}
pub fn then_store_with(mut self, storage: impl Persist + 'static) -> Self {
let storage = Arc::new(storage);
self.storage.push(storage.clone());
if storage.batch_size().is_some() {
self.stream = self
.stream
.try_chunks(storage.batch_size().unwrap())
.map_ok(move |nodes| {
let storage = Arc::clone(&storage);
let current_span = tracing::Span::current();
tokio::spawn(
current_span.in_scope(|| async move { storage.batch_store(nodes).await }),
)
.map_err(anyhow::Error::from)
})
.err_into::<anyhow::Error>()
.try_buffer_unordered(self.concurrency)
.try_flatten_unordered(self.concurrency)
.boxed();
} else {
self.stream = self
.stream
.map_ok(move |node| {
let storage = Arc::clone(&storage);
let current_span = tracing::Span::current();
tokio::spawn(current_span.in_scope(|| async move { storage.store(node).await }))
.map_err(anyhow::Error::from)
})
.err_into::<anyhow::Error>()
.try_buffer_unordered(self.concurrency)
.map(|x| x.and_then(|x| x))
.boxed();
}
self
}
#[tracing::instrument(skip_all, fields(total_nodes), name = "ingestion_pipeline.run")]
pub async fn run(mut self) -> Result<()> {
tracing::info!(
"Starting ingestion pipeline with {} concurrency",
self.concurrency
);
if self.storage.is_empty() {
anyhow::bail!("No storage configured for ingestion pipeline");
}
let setup_futures = self
.storage
.into_iter()
.map(|storage| tokio::spawn(async move { storage.setup().await }))
.collect::<Vec<_>>();
futures_util::future::try_join_all(setup_futures).await?;
let mut total_nodes = 0;
while self.stream.try_next().await?.is_some() {
total_nodes += 1;
}
tracing::warn!("Processed {} nodes", total_nodes);
tracing::Span::current().record("total_nodes", total_nodes);
Ok(())
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ingestion::IngestionNode;
    // Presumably provides the mockall-generated `Mock*` types used below — verify.
    use crate::traits::*;
    use futures_util::stream;
    use mockall::Sequence;

    /// End-to-end run through every stage kind: loader -> transformer -> batch transformer
    /// (batch size 1) -> chunker -> per-node storage.
    ///
    /// The single loaded node is fanned out by the chunker into three nodes. Storage expects
    /// exactly three `store` calls, each with a chunk starting with "transformed_chunk_".
    #[test_log::test(tokio::test)]
    async fn test_simple_run() {
        let mut loader = MockLoader::new();
        let mut transformer = MockTransformer::new();
        let mut batch_transformer = MockBatchableTransformer::new();
        let mut chunker = MockChunkerTransformer::new();
        let mut storage = MockPersist::new();
        // Expectations registered with `in_sequence` must be hit in registration order, so
        // the order of the statements below is significant.
        let mut seq = Sequence::new();
        // The loader yields a single default node.
        loader
            .expect_into_stream()
            .times(1)
            .in_sequence(&mut seq)
            .returning(|| Box::pin(stream::iter(vec![Ok(IngestionNode::default())])));
        // Not part of the sequence and not call-counted; it only rewrites the chunk text.
        transformer.expect_transform_node().returning(|mut node| {
            node.chunk = "transformed".to_string();
            Ok(node)
        });
        // `None` makes each stage fall back to the pipeline's default concurrency.
        transformer.expect_concurrency().returning(|| None);
        // Pass-through batch transform.
        batch_transformer
            .expect_batch_transform()
            .times(1)
            .in_sequence(&mut seq)
            .returning(|nodes| Box::pin(stream::iter(nodes.into_iter().map(Ok))));
        batch_transformer.expect_concurrency().returning(|| None);
        // Fans one node out into three nodes with distinct chunk texts.
        chunker
            .expect_transform_node()
            .times(1)
            .in_sequence(&mut seq)
            .returning(|node| {
                let mut nodes = vec![];
                for i in 0..3 {
                    let mut node = node.clone();
                    node.chunk = format!("transformed_chunk_{}", i);
                    nodes.push(Ok(node));
                }
                Box::pin(stream::iter(nodes))
            });
        chunker.expect_concurrency().returning(|| None);
        storage.expect_setup().returning(|| Ok(()));
        // No batch size: `then_store_with` takes the per-node `store` path.
        storage.expect_batch_size().returning(|| None);
        storage
            .expect_store()
            .times(3)
            .in_sequence(&mut seq)
            .withf(|node| node.chunk.starts_with("transformed_chunk_"))
            .returning(Ok);
        let pipeline = IngestionPipeline::from_loader(loader)
            .then(transformer)
            .then_in_batch(1, batch_transformer)
            .then_chunk(chunker)
            .then_store_with(storage);
        pipeline.run().await.unwrap();
    }
}