1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
use std::{io::Cursor, sync::Arc};

use crate::ArrowBatch;
use anyhow::{Context, Result};
use arrow2::io::ipc;

/// A wire format from which a byte payload can be decoded into [`ArrowBatch`]es.
///
/// Both methods are associated functions (no `self`), so implementors are
/// used purely at the type level.
pub trait TransportFormat {
    /// Decodes `bytes` into the batches it contains.
    ///
    /// # Errors
    ///
    /// Returns an error if the payload cannot be decoded by the implementing
    /// format.
    fn read_chunks(bytes: &[u8]) -> Result<Vec<ArrowBatch>>;

    /// Returns a static string naming this format.
    ///
    /// NOTE(review): presumably used as a path segment or identifier when
    /// requesting data in this format — confirm against callers.
    fn path() -> &'static str;
}

/// Marker type selecting the Arrow IPC file format as a [`TransportFormat`].
pub struct ArrowIpc;

impl TransportFormat for ArrowIpc {
    /// Decodes an in-memory Arrow IPC file into a list of [`ArrowBatch`]es.
    ///
    /// Every returned batch shares one reference-counted copy of the schema
    /// taken from the file metadata.
    ///
    /// # Errors
    ///
    /// Fails with the context `"read metadata"` when the file metadata cannot
    /// be read, and with `"read chunk"` on the first chunk that fails to
    /// decode; no partial result is returned in either case.
    fn read_chunks(bytes: &[u8]) -> Result<Vec<ArrowBatch>> {
        let mut cursor = Cursor::new(bytes);
        let file_metadata =
            ipc::read::read_file_metadata(&mut cursor).context("read metadata")?;

        // The metadata is moved into the reader below, so copy the schema out first.
        let shared_schema = Arc::new(file_metadata.schema.clone());

        // NOTE(review): the two `None` arguments presumably mean "no column
        // projection" and "no row limit" — confirm against the arrow2 version in use.
        let chunk_reader = ipc::read::FileReader::new(cursor, file_metadata, None, None);

        let mut batches = Vec::new();
        for next in chunk_reader {
            // Bail out on the first undecodable chunk.
            let chunk = next.context("read chunk")?;
            batches.push(ArrowBatch {
                chunk,
                schema: Arc::clone(&shared_schema),
            });
        }

        Ok(batches)
    }

    /// Returns the static name of this format, `"arrow-ipc"`.
    fn path() -> &'static str {
        "arrow-ipc"
    }
}