use crate::error::FaucetError;
use crate::traits::{Sink, Source};
use futures_core::Stream;
use serde_json::Value;
use std::pin::Pin;
/// Summary returned by a successful pipeline run.
#[derive(Debug, Clone)]
pub struct PipelineResult {
/// Record count as reported by the sink's `write_batch` call(s); `0` when nothing was written.
pub records_written: usize,
/// Bookmark handed back by the source's `fetch_all_incremental`, if any.
/// `run_stream` always sets this to `None`.
pub bookmark: Option<Value>,
}
/// Batch pipeline pairing a borrowed source with a borrowed sink.
///
/// The `?Sized` bounds let `So` and `Si` be trait objects such as `dyn Source` / `dyn Sink`.
pub struct Pipeline<'a, So: Source + ?Sized, Si: Sink + ?Sized> {
/// Source whose `fetch_all_incremental` supplies the records for a run.
source: &'a So,
/// Sink that receives the records via `write_batch` and is then flushed.
sink: &'a Si,
}
impl<'a, So: Source + ?Sized, Si: Sink + ?Sized> Pipeline<'a, So, Si> {
pub fn new(source: &'a So, sink: &'a Si) -> Self {
Self { source, sink }
}
pub async fn run(&self) -> Result<PipelineResult, FaucetError> {
let (records, bookmark) = self.source.fetch_all_incremental().await?;
let records_written = if records.is_empty() {
0
} else {
self.sink.write_batch(&records).await?
};
self.sink.flush().await?;
tracing::info!(
records_written,
has_bookmark = bookmark.is_some(),
"pipeline batch run complete"
);
Ok(PipelineResult {
records_written,
bookmark,
})
}
}
/// Drains a stream of record pages into `sink`, one `write_batch` call per non-empty page.
///
/// Pages are pulled one at a time with `poll_next`; empty pages are skipped without
/// touching the sink. Once the stream is exhausted the sink is flushed and an
/// info-level tracing event is emitted.
///
/// The returned [`PipelineResult`] sums the counts reported by `write_batch`; its
/// `bookmark` is always `None`.
///
/// # Errors
///
/// Returns immediately with the first error yielded by the stream or returned by
/// `write_batch` / `flush`. When the stream or a write fails, `flush` is not called.
pub async fn run_stream<S, Si>(mut pages: S, sink: &Si) -> Result<PipelineResult, FaucetError>
where
    S: Stream<Item = Result<Vec<Value>, FaucetError>> + Unpin,
    Si: Sink + ?Sized,
{
    let mut total = 0usize;

    // `S: Unpin`, so the stream can be re-pinned in place for every poll.
    while let Some(page) = std::future::poll_fn(|cx| Pin::new(&mut pages).poll_next(cx)).await {
        let batch = page?;
        if batch.is_empty() {
            continue;
        }
        total += sink.write_batch(&batch).await?;
    }

    sink.flush().await?;
    tracing::info!(records_written = total, "pipeline streaming run complete");

    Ok(PipelineResult {
        records_written: total,
        bookmark: None,
    })
}
/// Unit tests for the batch `Pipeline` and the streaming `run_stream` function,
/// driven by in-memory mock sources and sinks.
#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use serde_json::json;
    use std::sync::Mutex;

    /// One item of the page stream consumed by `run_stream`.
    type Page = Result<Vec<Value>, FaucetError>;

    /// Source returning a fixed record list; implements only `fetch_all`.
    struct MockSource(Vec<Value>);

    #[async_trait]
    impl Source for MockSource {
        async fn fetch_all(&self) -> Result<Vec<Value>, FaucetError> {
            Ok(self.0.to_vec())
        }
    }

    /// Source that also overrides `fetch_all_incremental` to report a bookmark.
    struct IncrementalSource {
        records: Vec<Value>,
        bookmark: Value,
    }

    #[async_trait]
    impl Source for IncrementalSource {
        async fn fetch_all(&self) -> Result<Vec<Value>, FaucetError> {
            Ok(self.records.clone())
        }

        async fn fetch_all_incremental(&self) -> Result<(Vec<Value>, Option<Value>), FaucetError> {
            let bookmark = Some(self.bookmark.clone());
            Ok((self.records.clone(), bookmark))
        }
    }

    /// Source whose `fetch_all` always fails with an auth error.
    struct FailingSource;

    #[async_trait]
    impl Source for FailingSource {
        async fn fetch_all(&self) -> Result<Vec<Value>, FaucetError> {
            Err(FaucetError::Auth("no credentials".into()))
        }
    }

    /// Sink that stores every record it is given so tests can inspect them.
    struct MockSink {
        records: Mutex<Vec<Value>>,
    }

    impl MockSink {
        fn new() -> Self {
            Self {
                records: Mutex::new(Vec::new()),
            }
        }

        /// Snapshot of everything written so far.
        fn written(&self) -> Vec<Value> {
            self.records.lock().unwrap().clone()
        }
    }

    #[async_trait]
    impl Sink for MockSink {
        async fn write_batch(&self, records: &[Value]) -> Result<usize, FaucetError> {
            self.records.lock().unwrap().extend_from_slice(records);
            Ok(records.len())
        }
    }

    /// Sink whose `write_batch` always fails.
    struct FailingSink;

    #[async_trait]
    impl Sink for FailingSink {
        async fn write_batch(&self, _records: &[Value]) -> Result<usize, FaucetError> {
            Err(FaucetError::Sink("write failed".into()))
        }
    }

    #[tokio::test]
    async fn batch_pipeline_writes_all_records() {
        let records = vec![json!({"id": 1}), json!({"id": 2}), json!({"id": 3})];
        let source = MockSource(records);
        let sink = MockSink::new();

        let pipeline = Pipeline::new(&source, &sink);
        let outcome = pipeline.run().await.unwrap();

        assert_eq!(outcome.records_written, 3);
        assert!(outcome.bookmark.is_none());
        assert_eq!(sink.written().len(), 3);
    }

    #[tokio::test]
    async fn batch_pipeline_returns_bookmark() {
        let sink = MockSink::new();
        let source = IncrementalSource {
            records: vec![json!({"id": 1, "ts": "2024-12-01"})],
            bookmark: json!("2024-12-01"),
        };

        let pipeline = Pipeline::new(&source, &sink);
        let outcome = pipeline.run().await.unwrap();

        assert_eq!(outcome.records_written, 1);
        assert_eq!(outcome.bookmark, Some(json!("2024-12-01")));
    }

    #[tokio::test]
    async fn batch_pipeline_empty_source() {
        let source = MockSource(Vec::new());
        let sink = MockSink::new();

        let pipeline = Pipeline::new(&source, &sink);
        let outcome = pipeline.run().await.unwrap();

        assert_eq!(outcome.records_written, 0);
        assert!(sink.written().is_empty());
    }

    #[tokio::test]
    async fn batch_pipeline_source_error_propagates() {
        let sink = MockSink::new();
        let source = FailingSource;

        let outcome = Pipeline::new(&source, &sink).run().await;

        assert!(outcome.is_err());
        assert!(sink.written().is_empty());
    }

    #[tokio::test]
    async fn batch_pipeline_sink_error_propagates() {
        let sink = FailingSink;
        let source = MockSource(vec![json!({"id": 1})]);

        let outcome = Pipeline::new(&source, &sink).run().await;

        assert!(outcome.is_err());
    }

    #[tokio::test]
    async fn batch_pipeline_with_trait_objects() {
        let source: Box<dyn Source> = Box::new(MockSource(vec![json!({"id": 1})]));
        let sink: Box<dyn Sink> = Box::new(MockSink::new());

        let pipeline = Pipeline::new(&*source, &*sink);
        let outcome = pipeline.run().await.unwrap();

        assert_eq!(outcome.records_written, 1);
    }

    #[tokio::test]
    async fn stream_pipeline_writes_pages() {
        let pages: Vec<Page> = vec![
            Ok(vec![json!({"id": 1}), json!({"id": 2})]),
            Ok(vec![json!({"id": 3})]),
        ];
        let sink = MockSink::new();

        let outcome = run_stream(futures::stream::iter(pages), &sink)
            .await
            .unwrap();

        assert_eq!(outcome.records_written, 3);
        assert!(outcome.bookmark.is_none());
        assert_eq!(sink.written().len(), 3);
    }

    #[tokio::test]
    async fn stream_pipeline_empty() {
        let pages: Vec<Page> = Vec::new();
        let sink = MockSink::new();

        let outcome = run_stream(futures::stream::iter(pages), &sink)
            .await
            .unwrap();

        assert_eq!(outcome.records_written, 0);
    }

    #[tokio::test]
    async fn stream_pipeline_skips_empty_pages() {
        let pages: Vec<Page> = vec![
            Ok(vec![json!({"id": 1})]),
            Ok(vec![]),
            Ok(vec![json!({"id": 2})]),
        ];
        let sink = MockSink::new();

        let outcome = run_stream(futures::stream::iter(pages), &sink)
            .await
            .unwrap();

        assert_eq!(outcome.records_written, 2);
    }

    #[tokio::test]
    async fn stream_pipeline_error_in_page_propagates() {
        let server_error = FaucetError::HttpStatus {
            status: 500,
            url: "https://example.com".into(),
            body: "Internal Server Error".into(),
        };
        let pages: Vec<Page> = vec![Ok(vec![json!({"id": 1})]), Err(server_error)];
        let sink = MockSink::new();

        let outcome = run_stream(futures::stream::iter(pages), &sink).await;

        assert!(outcome.is_err());
        // The page that preceded the failing one has already reached the sink.
        assert_eq!(sink.written().len(), 1);
    }

    #[tokio::test]
    async fn stream_pipeline_sink_error_propagates() {
        let pages: Vec<Page> = vec![Ok(vec![json!({"id": 1})])];
        let sink = FailingSink;

        let outcome = run_stream(futures::stream::iter(pages), &sink).await;

        assert!(outcome.is_err());
    }

    #[tokio::test]
    async fn stream_pipeline_with_trait_object_sink() {
        let pages: Vec<Page> = vec![Ok(vec![json!({"id": 1})])];
        let sink: Box<dyn Sink> = Box::new(MockSink::new());

        let outcome = run_stream(futures::stream::iter(pages), &*sink)
            .await
            .unwrap();

        assert_eq!(outcome.records_written, 1);
    }
}