polars_io/ipc/
ipc_reader_async.rs1use std::sync::Arc;
2
3use arrow::io::ipc::read::{FileMetadata, OutOfSpecKind, get_row_count};
4use object_store::ObjectMeta;
5use object_store::path::Path;
6use polars_core::datatypes::IDX_DTYPE;
7use polars_core::frame::DataFrame;
8use polars_core::schema::{Schema, SchemaExt};
9use polars_error::{PolarsResult, polars_bail, polars_err, to_compute_err};
10use polars_utils::mmap::MMapSemaphore;
11use polars_utils::pl_path::PlRefPath;
12use polars_utils::pl_str::PlSmallStr;
13
14use crate::RowIndex;
15use crate::cloud::{
16 CloudLocation, CloudOptions, PolarsObjectStore, build_object_store, object_path_from_str,
17};
18use crate::file_cache::{FileCacheEntry, init_entries_from_uri_list};
19use crate::predicates::PhysicalIoExpr;
20use crate::prelude::{IpcReader, materialize_projection};
21use crate::shared::SerReader;
22
23pub struct IpcReaderAsync {
25 store: PolarsObjectStore,
26 cache_entry: Arc<FileCacheEntry>,
27 path: Path,
28}
29
30#[derive(Default, Clone)]
31pub struct IpcReadOptions {
32 projection: Option<Arc<[PlSmallStr]>>,
34
35 row_limit: Option<usize>,
37
38 row_index: Option<RowIndex>,
40
41 predicate: Option<Arc<dyn PhysicalIoExpr>>,
43}
44
45impl IpcReadOptions {
46 pub fn with_projection(mut self, projection: Option<Arc<[PlSmallStr]>>) -> Self {
47 self.projection = projection;
48 self
49 }
50
51 pub fn with_row_limit(mut self, row_limit: impl Into<Option<usize>>) -> Self {
52 self.row_limit = row_limit.into();
53 self
54 }
55
56 pub fn with_row_index(mut self, row_index: impl Into<Option<RowIndex>>) -> Self {
57 self.row_index = row_index.into();
58 self
59 }
60
61 pub fn with_predicate(mut self, predicate: impl Into<Option<Arc<dyn PhysicalIoExpr>>>) -> Self {
62 self.predicate = predicate.into();
63 self
64 }
65}
66
67impl IpcReaderAsync {
68 pub async fn from_uri(
69 uri: PlRefPath,
70 cloud_options: Option<&CloudOptions>,
71 ) -> PolarsResult<IpcReaderAsync> {
72 let cache_entry =
73 init_entries_from_uri_list([uri.clone()].into_iter(), cloud_options).await?[0].clone();
74 let (CloudLocation { prefix, .. }, store) =
75 build_object_store(uri, cloud_options, false).await?;
76
77 let path = object_path_from_str(&prefix)?;
78
79 Ok(Self {
80 store,
81 cache_entry,
82 path,
83 })
84 }
85
86 async fn object_metadata(&self) -> PolarsResult<ObjectMeta> {
87 self.store.head(&self.path).await
88 }
89
90 async fn file_size(&self) -> PolarsResult<usize> {
91 Ok(self.object_metadata().await?.size as usize)
92 }
93
94 pub async fn metadata(&self) -> PolarsResult<FileMetadata> {
95 let file_size = self.file_size().await?;
96
97 let footer_metadata =
99 self.store
100 .get_range(
101 &self.path,
102 file_size.checked_sub(FOOTER_METADATA_SIZE).ok_or_else(|| {
103 to_compute_err("ipc file size is smaller than the minimum")
104 })?..file_size,
105 )
106 .await?;
107
108 let footer_size = deserialize_footer_metadata(
109 footer_metadata
110 .as_ref()
111 .try_into()
112 .map_err(to_compute_err)?,
113 )?;
114
115 let footer = self
116 .store
117 .get_range(
118 &self.path,
119 file_size
120 .checked_sub(FOOTER_METADATA_SIZE + footer_size)
121 .ok_or_else(|| {
122 to_compute_err("invalid ipc footer metadata: footer size too large")
123 })?..file_size,
124 )
125 .await?;
126
127 arrow::io::ipc::read::deserialize_footer(
128 footer.as_ref(),
129 footer_size.try_into().map_err(to_compute_err)?,
130 )
131 }
132
133 pub async fn data(
134 &self,
135 metadata: Option<&FileMetadata>,
136 options: IpcReadOptions,
137 verbose: bool,
138 ) -> PolarsResult<DataFrame> {
139 let file = tokio::task::block_in_place(|| self.cache_entry.try_open_check_latest())?;
142 let bytes = MMapSemaphore::new_from_file(&file).unwrap();
143
144 let projection = match options.projection.as_deref() {
145 Some(projection) => {
146 fn prepare_schema(mut schema: Schema, row_index: Option<&RowIndex>) -> Schema {
147 if let Some(rc) = row_index {
148 let _ = schema.insert_at_index(0, rc.name.clone(), IDX_DTYPE);
149 }
150 schema
151 }
152
153 let fetched_metadata;
155 let metadata = if let Some(metadata) = metadata {
156 metadata
157 } else {
158 fetched_metadata = self.metadata().await?;
160 &fetched_metadata
161 };
162
163 let schema = prepare_schema(
164 Schema::from_arrow_schema(metadata.schema.as_ref()),
165 options.row_index.as_ref(),
166 );
167
168 let hive_partitions = None;
169
170 materialize_projection(
171 Some(projection),
172 &schema,
173 hive_partitions,
174 options.row_index.is_some(),
175 )
176 },
177 None => None,
178 };
179
180 let reader = <IpcReader<_> as SerReader<_>>::new(std::io::Cursor::new(bytes.as_ref()))
181 .with_row_index(options.row_index)
182 .with_n_rows(options.row_limit)
183 .with_projection(projection);
184 reader.finish_with_scan_ops(options.predicate, verbose)
185 }
186
187 pub async fn count_rows(&self, _metadata: Option<&FileMetadata>) -> PolarsResult<i64> {
188 let file = tokio::task::block_in_place(|| self.cache_entry.try_open_check_latest())?;
191 let bytes = MMapSemaphore::new_from_file(&file).unwrap();
192 get_row_count(&mut std::io::Cursor::new(bytes.as_ref()))
193 }
194}
195
196const FOOTER_METADATA_SIZE: usize = 10;
197
198fn deserialize_footer_metadata(bytes: [u8; FOOTER_METADATA_SIZE]) -> PolarsResult<usize> {
201 let footer_size: usize =
202 i32::from_le_bytes(bytes[0..4].try_into().unwrap_or_else(|_| unreachable!()))
203 .try_into()
204 .map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
205
206 if &bytes[4..] != b"ARROW1" {
207 polars_bail!(oos = OutOfSpecKind::InvalidFooter);
208 }
209
210 Ok(footer_size)
211}