Skip to main content

polars_io/parquet/read/
async_impl.rs

1//! Read parquet files in parallel from the Object Store without a third party crate.
2
3use arrow::datatypes::ArrowSchemaRef;
4use object_store::path::Path as ObjectPath;
5use polars_buffer::Buffer;
6use polars_core::prelude::*;
7use polars_parquet::parquet::error::ParquetError;
8use polars_parquet::parquet::read::{deserialize_metadata, deserialize_num_rows};
9use polars_parquet::parquet::{DEFAULT_FOOTER_READ_SIZE, FOOTER_SIZE, PARQUET_MAGIC};
10use polars_parquet::write::FileMetadata;
11use polars_utils::pl_path::PlRefPath;
12
13use crate::cloud::{
14    CloudLocation, CloudOptions, PolarsObjectStore, build_object_store, object_path_from_str,
15};
16use crate::parquet::metadata::FileMetadataRef;
17
/// Handle on a single parquet object in an object store, with lazily
/// populated caches for the values derived from it.
pub struct ParquetObjectStore {
    /// Object store client used for every request against `path`.
    store: PolarsObjectStore,
    /// Location of the parquet object inside `store`.
    path: ObjectPath,
    /// Cached object size in bytes; `None` until first fetched.
    length: Option<usize>,
    /// Cached file metadata; either supplied at construction or fetched on demand.
    metadata: Option<FileMetadataRef>,
    /// Cached arrow schema inferred from the metadata; `None` until first requested.
    schema: Option<ArrowSchemaRef>,
}
25
26impl ParquetObjectStore {
27    pub async fn from_uri(
28        uri: PlRefPath,
29        options: Option<&CloudOptions>,
30        metadata: Option<FileMetadataRef>,
31    ) -> PolarsResult<Self> {
32        let (CloudLocation { prefix, .. }, store) = build_object_store(uri, options, false).await?;
33        let path = object_path_from_str(&prefix)?;
34
35        Ok(ParquetObjectStore {
36            store,
37            path,
38            length: None,
39            metadata,
40            schema: None,
41        })
42    }
43
44    /// Initialize the length property of the object, unless it has already been fetched.
45    async fn length(&mut self) -> PolarsResult<usize> {
46        if self.length.is_none() {
47            self.length = Some(self.store.head(&self.path).await?.size as usize);
48        }
49        Ok(self.length.unwrap())
50    }
51
52    /// Number of rows in the parquet file.
53    pub async fn num_rows(&mut self) -> PolarsResult<usize> {
54        let metadata = self.get_metadata().await?;
55        Ok(metadata.num_rows)
56    }
57
58    /// Fetch the metadata of the parquet file, do not memoize it.
59    async fn fetch_metadata(&mut self) -> PolarsResult<FileMetadata> {
60        let length = self.length().await?;
61        fetch_metadata(&self.store, &self.path, length).await
62    }
63
64    /// Fetch and memoize the metadata of the parquet file.
65    pub async fn get_metadata(&mut self) -> PolarsResult<&FileMetadataRef> {
66        if self.metadata.is_none() {
67            self.metadata = Some(Arc::new(self.fetch_metadata().await?));
68        }
69        Ok(self.metadata.as_ref().unwrap())
70    }
71
72    /// Decode only `FileMetaData.num_rows` from the remote footer.
73    /// Not memoized. Used by `RowCounts` resolve mode.
74    pub async fn num_rows_only(&mut self) -> PolarsResult<i64> {
75        let length = self.length().await?;
76        fetch_num_rows(&self.store, &self.path, length).await
77    }
78
79    pub async fn schema(&mut self) -> PolarsResult<ArrowSchemaRef> {
80        self.schema = Some(match self.schema.as_ref() {
81            Some(schema) => Arc::clone(schema),
82            None => {
83                let metadata = self.get_metadata().await?;
84                let arrow_schema = polars_parquet::arrow::read::infer_schema(metadata)?;
85                Arc::new(arrow_schema)
86            },
87        });
88
89        Ok(self.schema.clone().unwrap())
90    }
91}
92
/// Pop the first `N` bytes off the front of `reader` and return them as an array.
///
/// On success `reader` is advanced past the returned bytes. If fewer than `N`
/// bytes remain, `None` is returned and `reader` is left untouched.
fn read_n<const N: usize>(reader: &mut &[u8]) -> Option<[u8; N]> {
    // Copy the inner slice reference out so the remainder keeps its full lifetime.
    let bytes: &[u8] = *reader;
    // `get(..N)` is `None` exactly when fewer than `N` bytes are available.
    let chunk: [u8; N] = bytes.get(..N)?.try_into().ok()?;
    *reader = &bytes[N..];
    Some(chunk)
}
102
103fn read_i32le(reader: &mut &[u8]) -> Option<i32> {
104    read_n(reader).map(i32::from_le_bytes)
105}
106
107/// Speculatively read `DEFAULT_FOOTER_READ_SIZE` from the tail. If the
108/// footer fits in the prefetch (the common case), we're done in one range
109/// request; otherwise re-fetch the full footer. Mirrors the sync
110/// `fetch_footer_buf` strategy.
111async fn fetch_footer_bytes(
112    store: &PolarsObjectStore,
113    path: &ObjectPath,
114    file_byte_length: usize,
115) -> PolarsResult<Buffer<u8>> {
116    let out_of_spec = |msg: &str| ParquetError::OutOfSpec(msg.to_string());
117
118    let prefetch_len = std::cmp::min(DEFAULT_FOOTER_READ_SIZE as usize, file_byte_length);
119    let prefetched = store
120        .get_range(
121            path,
122            file_byte_length
123                .checked_sub(prefetch_len)
124                .ok_or_else(|| out_of_spec("not enough bytes to contain parquet footer"))?
125                ..file_byte_length,
126        )
127        .await?;
128
129    if prefetched.len() < FOOTER_SIZE as usize {
130        return Err(out_of_spec("not enough bytes to contain parquet footer").into());
131    }
132
133    // Trailing 8 bytes: footer size (i32 LE) + magic.
134    let footer_byte_length: usize = {
135        let tail_start = prefetched.len() - FOOTER_SIZE as usize;
136        let reader = &mut &prefetched.as_ref()[tail_start..];
137        let footer_byte_size = read_i32le(reader).unwrap();
138        let magic = read_n(reader).unwrap();
139        debug_assert!(reader.is_empty());
140        if magic != PARQUET_MAGIC {
141            return Err(out_of_spec("incorrect magic in parquet footer").into());
142        }
143        footer_byte_size
144            .try_into()
145            .map_err(|_| out_of_spec("negative footer byte length"))?
146    };
147
148    let footer_len = FOOTER_SIZE as usize + footer_byte_length;
149    if footer_len <= prefetched.len() {
150        // Common case: footer already in the prefetch; zero extra round trips.
151        let start = prefetched.len() - footer_len;
152        Ok(prefetched.sliced(start..))
153    } else {
154        // Fallback: footer larger than the prefetch; re-fetch the full footer.
155        store
156            .get_range(
157                path,
158                file_byte_length
159                    .checked_sub(footer_len)
160                    .ok_or_else(|| out_of_spec("not enough bytes to contain parquet footer"))?
161                    ..file_byte_length,
162            )
163            .await
164    }
165}
166
167/// Asynchronously reads the files' metadata.
168pub async fn fetch_metadata(
169    store: &PolarsObjectStore,
170    path: &ObjectPath,
171    file_byte_length: usize,
172) -> PolarsResult<FileMetadata> {
173    let footer = fetch_footer_bytes(store, path, file_byte_length).await?;
174    Ok(deserialize_metadata(footer)?)
175}
176
177/// Fetch only `FileMetaData.num_rows` from a remote parquet footer.
178pub async fn fetch_num_rows(
179    store: &PolarsObjectStore,
180    path: &ObjectPath,
181    file_byte_length: usize,
182) -> PolarsResult<i64> {
183    let footer = fetch_footer_bytes(store, path, file_byte_length).await?;
184    Ok(deserialize_num_rows(footer)?)
185}