1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
pub mod decode;
pub mod decompress;
pub mod metadata;
pub mod schema;

use std::io::{Read, Seek};
use std::sync::Arc;

use tokio::io::{AsyncRead, AsyncSeek};

use self::metadata::{read_metadata, FileMetadata};
use self::schema::{create_schema, TypeDescription};
use crate::arrow_reader::Cursor;
use crate::error::Result;
use crate::proto::{StripeFooter, StripeInformation};
use crate::reader::metadata::read_metadata_async;

pub struct Reader<R> {
    pub(crate) inner: R,
    metadata: Box<FileMetadata>,
    pub(crate) schema: Arc<TypeDescription>,
}

impl<R: Read + Seek> Reader<R> {
    pub fn new(mut r: R) -> Result<Self> {
        let metadata = Box::new(read_metadata(&mut r)?);
        let schema = create_schema(&metadata.footer.types, 0)?;

        Ok(Self {
            inner: r,
            metadata,
            schema,
        })
    }
}

impl<R: Read> Reader<R> {
    pub fn new_with_metadata(r: R, metadata: FileMetadata) -> Result<Self> {
        let schema = create_schema(&metadata.footer.types, 0)?;

        Ok(Self {
            inner: r,
            metadata: Box::new(metadata),
            schema,
        })
    }

    pub fn select(self, fields: &[&str]) -> Result<Cursor<R>> {
        Cursor::new(self, fields)
    }
}

impl<R> Reader<R> {
    pub fn metadata(&self) -> &FileMetadata {
        &self.metadata
    }

    pub fn schema(&self) -> &TypeDescription {
        &self.schema
    }

    pub fn stripe(&self, index: usize) -> Option<StripeInformation> {
        self.metadata.footer.stripes.get(index).cloned()
    }

    pub fn stripe_footer(&mut self, stripe: usize) -> &StripeFooter {
        &self.metadata.stripe_footers[stripe]
    }
}

impl<R: AsyncRead + AsyncSeek + Unpin + Send> Reader<R> {
    pub async fn new_async(mut r: R) -> Result<Self> {
        let metadata = Box::new(read_metadata_async(&mut r).await?);
        let schema = create_schema(&metadata.footer.types, 0)?;

        Ok(Self {
            inner: r,
            metadata,
            schema,
        })
    }
}