transferred_core/
transfer.rs1use arrow::record_batch::RecordBatch;
2use async_trait::async_trait;
3use futures::stream::BoxStream;
4
5use crate::{RunReport, TransferredError};
6
7pub type BatchStream = BoxStream<'static, Result<RecordBatch, TransferredError>>;
9
10#[async_trait]
12pub trait Source: Send {
13 async fn stream_partitions(self: Box<Self>) -> Result<Vec<BatchStream>, TransferredError>;
16}
17
18#[async_trait]
20pub trait Destination: Send {
21 async fn write_partitions(
24 self: Box<Self>,
25 partitions: Vec<BatchStream>,
26 ) -> Result<RunReport, TransferredError>;
27}
28
29pub struct Transfer {
31 source: Box<dyn Source>,
32 destination: Box<dyn Destination>,
33}
34
35impl Transfer {
36 #[must_use]
38 pub fn new(source: Box<dyn Source>, destination: Box<dyn Destination>) -> Self {
39 Self {
40 source,
41 destination,
42 }
43 }
44
45 pub async fn run(self) -> Result<RunReport, TransferredError> {
50 let partitions = self.source.stream_partitions().await?;
51 self.destination.write_partitions(partitions).await
52 }
53}