polars_io/ipc/
ipc_reader_async.rs1use std::sync::Arc;
2
3use arrow::io::ipc::read::{FileMetadata, OutOfSpecKind, get_row_count};
4use object_store::ObjectMeta;
5use object_store::path::Path;
6use polars_core::datatypes::IDX_DTYPE;
7use polars_core::frame::DataFrame;
8use polars_core::runtime::ASYNC;
9use polars_core::schema::{Schema, SchemaExt};
10use polars_error::{PolarsResult, polars_bail, polars_err, to_compute_err};
11use polars_utils::mmap::MMapSemaphore;
12use polars_utils::pl_path::PlRefPath;
13use polars_utils::pl_str::PlSmallStr;
14
15use crate::RowIndex;
16use crate::cloud::{
17 CloudLocation, CloudOptions, PolarsObjectStore, build_object_store, object_path_from_str,
18};
19use crate::file_cache::{FileCacheEntry, init_entries_from_uri_list};
20use crate::predicates::PhysicalIoExpr;
21use crate::prelude::{IpcReader, materialize_projection};
22use crate::shared::SerReader;
23
24pub struct IpcReaderAsync {
26 store: PolarsObjectStore,
27 cache_entry: Arc<FileCacheEntry>,
28 path: Path,
29}
30
31#[derive(Default, Clone)]
32pub struct IpcReadOptions {
33 projection: Option<Arc<[PlSmallStr]>>,
35
36 row_limit: Option<usize>,
38
39 row_index: Option<RowIndex>,
41
42 predicate: Option<Arc<dyn PhysicalIoExpr>>,
44}
45
46impl IpcReadOptions {
47 pub fn with_projection(mut self, projection: Option<Arc<[PlSmallStr]>>) -> Self {
48 self.projection = projection;
49 self
50 }
51
52 pub fn with_row_limit(mut self, row_limit: impl Into<Option<usize>>) -> Self {
53 self.row_limit = row_limit.into();
54 self
55 }
56
57 pub fn with_row_index(mut self, row_index: impl Into<Option<RowIndex>>) -> Self {
58 self.row_index = row_index.into();
59 self
60 }
61
62 pub fn with_predicate(mut self, predicate: impl Into<Option<Arc<dyn PhysicalIoExpr>>>) -> Self {
63 self.predicate = predicate.into();
64 self
65 }
66}
67
68impl IpcReaderAsync {
69 pub async fn from_uri(
70 uri: PlRefPath,
71 cloud_options: Option<&CloudOptions>,
72 ) -> PolarsResult<IpcReaderAsync> {
73 let cache_entry =
74 init_entries_from_uri_list([uri.clone()].into_iter(), cloud_options).await?[0].clone();
75 let (CloudLocation { prefix, .. }, store) =
76 build_object_store(uri, cloud_options, false).await?;
77
78 let path = object_path_from_str(&prefix)?;
79
80 Ok(Self {
81 store,
82 cache_entry,
83 path,
84 })
85 }
86
87 async fn object_metadata(&self) -> PolarsResult<ObjectMeta> {
88 self.store.head(&self.path).await
89 }
90
91 async fn file_size(&self) -> PolarsResult<usize> {
92 Ok(self.object_metadata().await?.size as usize)
93 }
94
95 pub async fn metadata(&self) -> PolarsResult<FileMetadata> {
96 let file_size = self.file_size().await?;
97
98 let footer_metadata =
100 self.store
101 .get_range(
102 &self.path,
103 file_size.checked_sub(FOOTER_METADATA_SIZE).ok_or_else(|| {
104 to_compute_err("ipc file size is smaller than the minimum")
105 })?..file_size,
106 )
107 .await?;
108
109 let footer_size = deserialize_footer_metadata(
110 footer_metadata
111 .as_ref()
112 .try_into()
113 .map_err(to_compute_err)?,
114 )?;
115
116 let footer = self
117 .store
118 .get_range(
119 &self.path,
120 file_size
121 .checked_sub(FOOTER_METADATA_SIZE + footer_size)
122 .ok_or_else(|| {
123 to_compute_err("invalid ipc footer metadata: footer size too large")
124 })?..file_size,
125 )
126 .await?;
127
128 arrow::io::ipc::read::deserialize_footer(
129 footer.as_ref(),
130 footer_size.try_into().map_err(to_compute_err)?,
131 )
132 }
133
134 pub async fn data(
135 &self,
136 metadata: Option<&FileMetadata>,
137 options: IpcReadOptions,
138 verbose: bool,
139 ) -> PolarsResult<DataFrame> {
140 let file = ASYNC.block_in_place(|| self.cache_entry.try_open_check_latest())?;
143 let bytes = MMapSemaphore::new_from_file(&file).unwrap();
144
145 let projection = match options.projection.as_deref() {
146 Some(projection) => {
147 fn prepare_schema(mut schema: Schema, row_index: Option<&RowIndex>) -> Schema {
148 if let Some(rc) = row_index {
149 let _ = schema.insert_at_index(0, rc.name.clone(), IDX_DTYPE);
150 }
151 schema
152 }
153
154 let fetched_metadata;
156 let metadata = if let Some(metadata) = metadata {
157 metadata
158 } else {
159 fetched_metadata = self.metadata().await?;
161 &fetched_metadata
162 };
163
164 let schema = prepare_schema(
165 Schema::from_arrow_schema(metadata.schema.as_ref()),
166 options.row_index.as_ref(),
167 );
168
169 let hive_partitions = None;
170
171 materialize_projection(
172 Some(projection),
173 &schema,
174 hive_partitions,
175 options.row_index.is_some(),
176 )
177 },
178 None => None,
179 };
180
181 let reader = <IpcReader<_> as SerReader<_>>::new(std::io::Cursor::new(bytes.as_ref()))
182 .with_row_index(options.row_index)
183 .with_n_rows(options.row_limit)
184 .with_projection(projection);
185 reader.finish_with_scan_ops(options.predicate, verbose)
186 }
187
188 pub async fn count_rows(&self, _metadata: Option<&FileMetadata>) -> PolarsResult<i64> {
189 let file = ASYNC.block_in_place(|| self.cache_entry.try_open_check_latest())?;
192 let bytes = MMapSemaphore::new_from_file(&file).unwrap();
193 get_row_count(&mut std::io::Cursor::new(bytes.as_ref()))
194 }
195}
196
/// Size in bytes of the trailer that ends an IPC file: a little-endian `i32`
/// footer length followed by the 6-byte magic `ARROW1`.
const FOOTER_METADATA_SIZE: usize = std::mem::size_of::<i32>() + b"ARROW1".len();
198
199fn deserialize_footer_metadata(bytes: [u8; FOOTER_METADATA_SIZE]) -> PolarsResult<usize> {
202 let footer_size: usize =
203 i32::from_le_bytes(bytes[0..4].try_into().unwrap_or_else(|_| unreachable!()))
204 .try_into()
205 .map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
206
207 if &bytes[4..] != b"ARROW1" {
208 polars_bail!(oos = OutOfSpecKind::InvalidFooter);
209 }
210
211 Ok(footer_size)
212}