use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
use futures::stream::BoxStream;
use crate::{RunReport, TransferredError};
/// A boxed, `'static` stream whose items are either a [`RecordBatch`] or a
/// [`TransferredError`].
///
/// This is the unit exchanged between [`Source`] and [`Destination`].
pub type BatchStream = BoxStream<'static, Result<RecordBatch, TransferredError>>;
/// The reading side of a [`Transfer`].
///
/// Implementors must be `Send`. The single method takes `self: Box<Self>`,
/// so calling it consumes the boxed source.
#[async_trait]
pub trait Source: Send {
/// Consumes the source and asynchronously produces a vector of
/// [`BatchStream`]s.
///
/// NOTE(review): the method name suggests one stream per partition, but the
/// trait itself does not enforce any particular mapping; confirm with the
/// implementors.
///
/// # Errors
///
/// Returns a [`TransferredError`] if the implementor cannot produce the
/// streams.
async fn stream_partitions(self: Box<Self>) -> Result<Vec<BatchStream>, TransferredError>;
}
/// The writing side of a [`Transfer`].
///
/// Implementors must be `Send`. The single method takes `self: Box<Self>`,
/// so calling it consumes the boxed destination.
#[async_trait]
pub trait Destination: Send {
/// Consumes the destination together with the given [`BatchStream`]s and
/// asynchronously resolves to a [`RunReport`].
///
/// # Errors
///
/// Returns a [`TransferredError`] if the implementor reports a failure.
async fn write_partitions(
self: Box<Self>,
partitions: Vec<BatchStream>,
) -> Result<RunReport, TransferredError>;
}
/// Pairs one boxed [`Source`] with one boxed [`Destination`].
///
/// Both fields are private; construct a value with [`Transfer::new`] and
/// execute it with [`Transfer::run`].
pub struct Transfer {
// Handed to `Source::stream_partitions` when the transfer runs.
source: Box<dyn Source>,
// Receives the streams produced by `source` via `Destination::write_partitions`.
destination: Box<dyn Destination>,
}
impl Transfer {
#[must_use]
pub fn new(source: Box<dyn Source>, destination: Box<dyn Destination>) -> Self {
Self {
source,
destination,
}
}
pub async fn run(self) -> Result<RunReport, TransferredError> {
let partitions = self.source.stream_partitions().await?;
self.destination.write_partitions(partitions).await
}
}