// transferred_core/transfer.rs

1use arrow::record_batch::RecordBatch;
2use async_trait::async_trait;
3use futures::stream::BoxStream;
4
5use crate::{RunReport, TransferredError};
6
7/// Boxed `Stream` of Arrow batches — one partition's data.
8pub type BatchStream = BoxStream<'static, Result<RecordBatch, TransferredError>>;
9
10/// A data source. Yields one or more partitions of Arrow batches.
11#[async_trait]
12pub trait Source: Send {
13    /// Consume the source and produce its partitions. Single-shot.
14    /// Non-partitionable sources return a single-element `Vec`.
15    async fn stream_partitions(self: Box<Self>) -> Result<Vec<BatchStream>, TransferredError>;
16}
17
18/// A destination. Writes batch partitions atomically and reports stats.
19#[async_trait]
20pub trait Destination: Send {
21    /// Consume the destination and write the partitions. Single-shot.
22    /// Schema is taken from the first batch each partition emits.
23    async fn write_partitions(
24        self: Box<Self>,
25        partitions: Vec<BatchStream>,
26    ) -> Result<RunReport, TransferredError>;
27}
28
29/// Orchestrates a single end-to-end run from a `Source` to a `Destination`.
30pub struct Transfer {
31    source: Box<dyn Source>,
32    destination: Box<dyn Destination>,
33}
34
35impl Transfer {
36    /// Build a transfer.
37    #[must_use]
38    pub fn new(source: Box<dyn Source>, destination: Box<dyn Destination>) -> Self {
39        Self {
40            source,
41            destination,
42        }
43    }
44
45    /// Fetch partitions, hand them to the destination.
46    ///
47    /// # Errors
48    /// Propagates any error from partition setup or write.
49    pub async fn run(self) -> Result<RunReport, TransferredError> {
50        let partitions = self.source.stream_partitions().await?;
51        self.destination.write_partitions(partitions).await
52    }
53}