polars_io/parquet/read/
async_impl.rs1use arrow::datatypes::ArrowSchemaRef;
4use object_store::path::Path as ObjectPath;
5use polars_buffer::Buffer;
6use polars_core::prelude::*;
7use polars_parquet::parquet::error::ParquetError;
8use polars_parquet::parquet::read::{deserialize_metadata, deserialize_num_rows};
9use polars_parquet::parquet::{DEFAULT_FOOTER_READ_SIZE, FOOTER_SIZE, PARQUET_MAGIC};
10use polars_parquet::write::FileMetadata;
11use polars_utils::pl_path::PlRefPath;
12
13use crate::cloud::{
14 CloudLocation, CloudOptions, PolarsObjectStore, build_object_store, object_path_from_str,
15};
16use crate::parquet::metadata::FileMetadataRef;
17
18pub struct ParquetObjectStore {
19 store: PolarsObjectStore,
20 path: ObjectPath,
21 length: Option<usize>,
22 metadata: Option<FileMetadataRef>,
23 schema: Option<ArrowSchemaRef>,
24}
25
26impl ParquetObjectStore {
27 pub async fn from_uri(
28 uri: PlRefPath,
29 options: Option<&CloudOptions>,
30 metadata: Option<FileMetadataRef>,
31 ) -> PolarsResult<Self> {
32 let (CloudLocation { prefix, .. }, store) = build_object_store(uri, options, false).await?;
33 let path = object_path_from_str(&prefix)?;
34
35 Ok(ParquetObjectStore {
36 store,
37 path,
38 length: None,
39 metadata,
40 schema: None,
41 })
42 }
43
44 async fn length(&mut self) -> PolarsResult<usize> {
46 if self.length.is_none() {
47 self.length = Some(self.store.head(&self.path).await?.size as usize);
48 }
49 Ok(self.length.unwrap())
50 }
51
52 pub async fn num_rows(&mut self) -> PolarsResult<usize> {
54 let metadata = self.get_metadata().await?;
55 Ok(metadata.num_rows)
56 }
57
58 async fn fetch_metadata(&mut self) -> PolarsResult<FileMetadata> {
60 let length = self.length().await?;
61 fetch_metadata(&self.store, &self.path, length).await
62 }
63
64 pub async fn get_metadata(&mut self) -> PolarsResult<&FileMetadataRef> {
66 if self.metadata.is_none() {
67 self.metadata = Some(Arc::new(self.fetch_metadata().await?));
68 }
69 Ok(self.metadata.as_ref().unwrap())
70 }
71
72 pub async fn num_rows_only(&mut self) -> PolarsResult<i64> {
75 let length = self.length().await?;
76 fetch_num_rows(&self.store, &self.path, length).await
77 }
78
79 pub async fn schema(&mut self) -> PolarsResult<ArrowSchemaRef> {
80 self.schema = Some(match self.schema.as_ref() {
81 Some(schema) => Arc::clone(schema),
82 None => {
83 let metadata = self.get_metadata().await?;
84 let arrow_schema = polars_parquet::arrow::read::infer_schema(metadata)?;
85 Arc::new(arrow_schema)
86 },
87 });
88
89 Ok(self.schema.clone().unwrap())
90 }
91}
92
/// Splits the first `N` bytes off the front of `reader` and returns them as an array.
///
/// On success `reader` is advanced past the returned bytes. If fewer than `N` bytes remain,
/// `None` is returned and `reader` is left untouched.
fn read_n<const N: usize>(reader: &mut &[u8]) -> Option<[u8; N]> {
    let bytes = *reader;
    if bytes.len() < N {
        return None;
    }

    let (head, tail) = bytes.split_at(N);
    *reader = tail;

    // `head` is exactly `N` bytes long, so the copy cannot panic.
    let mut out = [0u8; N];
    out.copy_from_slice(head);
    Some(out)
}
102
103fn read_i32le(reader: &mut &[u8]) -> Option<i32> {
104 read_n(reader).map(i32::from_le_bytes)
105}
106
107async fn fetch_footer_bytes(
112 store: &PolarsObjectStore,
113 path: &ObjectPath,
114 file_byte_length: usize,
115) -> PolarsResult<Buffer<u8>> {
116 let out_of_spec = |msg: &str| ParquetError::OutOfSpec(msg.to_string());
117
118 let prefetch_len = std::cmp::min(DEFAULT_FOOTER_READ_SIZE as usize, file_byte_length);
119 let prefetched = store
120 .get_range(
121 path,
122 file_byte_length
123 .checked_sub(prefetch_len)
124 .ok_or_else(|| out_of_spec("not enough bytes to contain parquet footer"))?
125 ..file_byte_length,
126 )
127 .await?;
128
129 if prefetched.len() < FOOTER_SIZE as usize {
130 return Err(out_of_spec("not enough bytes to contain parquet footer").into());
131 }
132
133 let footer_byte_length: usize = {
135 let tail_start = prefetched.len() - FOOTER_SIZE as usize;
136 let reader = &mut &prefetched.as_ref()[tail_start..];
137 let footer_byte_size = read_i32le(reader).unwrap();
138 let magic = read_n(reader).unwrap();
139 debug_assert!(reader.is_empty());
140 if magic != PARQUET_MAGIC {
141 return Err(out_of_spec("incorrect magic in parquet footer").into());
142 }
143 footer_byte_size
144 .try_into()
145 .map_err(|_| out_of_spec("negative footer byte length"))?
146 };
147
148 let footer_len = FOOTER_SIZE as usize + footer_byte_length;
149 if footer_len <= prefetched.len() {
150 let start = prefetched.len() - footer_len;
152 Ok(prefetched.sliced(start..))
153 } else {
154 store
156 .get_range(
157 path,
158 file_byte_length
159 .checked_sub(footer_len)
160 .ok_or_else(|| out_of_spec("not enough bytes to contain parquet footer"))?
161 ..file_byte_length,
162 )
163 .await
164 }
165}
166
167pub async fn fetch_metadata(
169 store: &PolarsObjectStore,
170 path: &ObjectPath,
171 file_byte_length: usize,
172) -> PolarsResult<FileMetadata> {
173 let footer = fetch_footer_bytes(store, path, file_byte_length).await?;
174 Ok(deserialize_metadata(footer)?)
175}
176
177pub async fn fetch_num_rows(
179 store: &PolarsObjectStore,
180 path: &ObjectPath,
181 file_byte_length: usize,
182) -> PolarsResult<i64> {
183 let footer = fetch_footer_bytes(store, path, file_byte_length).await?;
184 Ok(deserialize_num_rows(footer)?)
185}