Skip to main content

polars_io/utils/
other.rs

1use std::io::Read;
2#[cfg(target_os = "emscripten")]
3use std::io::{Seek, SeekFrom};
4
5use polars_buffer::Buffer;
6use polars_core::prelude::*;
7use polars_utils::mmap::MMapSemaphore;
8
9use crate::mmap::{MmapBytesReader, ReaderBytes};
10
11pub fn get_reader_bytes<R: Read + MmapBytesReader + ?Sized>(
12    reader: &mut R,
13) -> PolarsResult<ReaderBytes<'_>> {
14    // we have a file so we can mmap
15    // only seekable files are mmap-able
16    if let Some((file, offset)) = reader
17        .stream_position()
18        .ok()
19        .and_then(|offset| Some((reader.to_file()?, offset)))
20    {
21        let mut options = memmap::MmapOptions::new();
22        options.offset(offset);
23
24        // Set mmap size based on seek to end when running under Emscripten
25        #[cfg(target_os = "emscripten")]
26        {
27            let mut file = file;
28            let size = file.seek(SeekFrom::End(0)).unwrap();
29            options.len((size - offset) as usize);
30        }
31
32        let mmap = MMapSemaphore::new_from_file_with_options(file, options)?;
33        Ok(ReaderBytes::Owned(Buffer::from_owner(mmap)))
34    } else {
35        // we can get the bytes for free
36        if reader.to_bytes().is_some() {
37            // duplicate .to_bytes() is necessary to satisfy the borrow checker
38            Ok(ReaderBytes::Borrowed((*reader).to_bytes().unwrap()))
39        } else {
40            // we have to read to an owned buffer to get the bytes.
41            let mut bytes = Vec::with_capacity(1024 * 128);
42            reader.read_to_end(&mut bytes)?;
43            Ok(ReaderBytes::Owned(bytes.into()))
44        }
45    }
46}
47
/// Build a new [`ArrowSchema`] holding only the fields at the `projection` indices,
/// visited in the order given by `projection`.
///
/// # Panics
/// Panics if any index in `projection` is out of bounds for `schema`.
#[cfg(any(
    feature = "ipc",
    feature = "ipc_streaming",
    feature = "parquet",
    feature = "avro"
))]
pub fn apply_projection(schema: &ArrowSchema, projection: &[usize]) -> ArrowSchema {
    let pick = |&index: &usize| {
        let (name, field) = schema.get_at_index(index).unwrap();
        (name.clone(), field.clone())
    };
    projection.iter().map(pick).collect()
}
61
/// Resolve each column name in `columns` to its positional index in `schema`,
/// preserving the order of `columns`.
///
/// # Errors
/// Returns the error produced by `schema.try_index_of` for the first name it rejects
/// (presumably a name that is absent from `schema`).
#[cfg(any(
    feature = "ipc",
    feature = "ipc_streaming",
    feature = "avro",
    feature = "parquet"
))]
pub fn columns_to_projection<T: AsRef<str>>(
    columns: &[T],
    schema: &ArrowSchema,
) -> PolarsResult<Vec<usize>> {
    // Collecting into a `Result` stops at the first failed lookup.
    columns
        .iter()
        .map(|name| schema.try_index_of(name.as_ref()))
        .collect()
}
81
82#[cfg(debug_assertions)]
83fn check_offsets(dfs: &[DataFrame]) {
84    dfs.windows(2).for_each(|s| {
85        let a = &s[0].columns()[0];
86        let b = &s[1].columns()[0];
87
88        let prev = a.get(a.len() - 1).unwrap().extract::<usize>().unwrap();
89        let next = b.get(0).unwrap().extract::<usize>().unwrap();
90        assert_eq!(prev + 1, next);
91    })
92}
93
/// Shift the first column of each frame so that row counts increase monotonically across `dfs`.
///
/// Because of threading, every frame's counter starts from `0` or from `offset` instead of
/// continuing from the previous frame. The frames are walked in order with a running start
/// value, which begins at `offset` and grows by each non-empty frame's height. If a frame's
/// first value differs from the running start, the running start is added to its whole
/// first column.
///
/// Frames with zero rows or zero columns are left untouched and do not advance the running
/// start. In debug builds the result is verified with `check_offsets`.
///
/// # Panics
/// Panics if a frame's first value cannot be extracted as `usize`.
#[cfg(any(feature = "csv", feature = "json"))]
pub(crate) fn update_row_counts2(dfs: &mut [DataFrame], offset: IdxSize) {
    let mut expected_start = offset;
    for df in dfs.iter_mut().filter(|df| !df.shape_has_zero()) {
        let height = df.height() as IdxSize;
        // SAFETY (review): only the values of the first column are replaced below. Adding an
        // `IdxSize` scalar is assumed to preserve that column's name and dtype, so the retained
        // schema stays valid — confirm against `columns_mut_retain_schema`'s contract.
        let columns = unsafe { df.columns_mut_retain_schema() };
        if let Some(first) = columns.get_mut(0) {
            let needs_shift = first
                .get(0)
                .is_ok_and(|start| start.extract::<usize>().unwrap() != expected_start as usize);
            if needs_shift {
                *first = &*first + expected_start;
            }
        }
        expected_start += height;
    }
    #[cfg(debug_assertions)]
    {
        check_offsets(dfs)
    }
}
120
/// Shift the first column of each frame so that row counts increase monotonically across `dfs`.
///
/// Because of threading, every frame's counter starts from `0` or from `offset` instead of
/// continuing from the previous frame. Unlike `update_row_counts2`, the running start is
/// advanced by `heights[i]` rather than by each frame's current height. Presumably
/// `heights[i]` is the number of rows chunk `i` accounts for in the source, which may differ
/// from `dfs[i].height()` — confirm with callers.
///
/// The running start is advanced for every frame, including frames with zero rows or zero
/// columns. A chunk that produced an empty frame may still have consumed rows, and skipping
/// it would misnumber all later frames. Empty frames themselves are not modified.
///
/// # Panics
/// Panics if `dfs.len() != heights.len()`, or if a frame's first value cannot be extracted
/// as `usize`.
#[cfg(feature = "json")]
pub(crate) fn update_row_counts3(dfs: &mut [DataFrame], heights: &[IdxSize], offset: IdxSize) {
    assert_eq!(dfs.len(), heights.len());
    let mut previous = offset;
    for (df, &n_read) in dfs.iter_mut().zip(heights) {
        if !df.shape_has_zero() {
            // SAFETY (review): only the values of the first column are replaced. Adding an
            // `IdxSize` scalar is assumed to preserve its name and dtype, so the retained
            // schema stays valid — confirm against `columns_mut_retain_schema`'s contract.
            if let Some(s) = unsafe { df.columns_mut_retain_schema() }.get_mut(0) {
                if let Ok(v) = s.get(0) {
                    if v.extract::<usize>().unwrap() != previous as usize {
                        *s = &*s + previous;
                    }
                }
            }
        }
        // Advance even past empty frames. When `n_read` equals the empty frame's height
        // (0), this is a no-op.
        previous += n_read;
    }
}
146
/// Overwrite, in place, the entry of every column in `schema` that also appears in
/// `overwriting_schema`, using the value from `overwriting_schema`.
///
/// # Errors
/// Returns the error from `schema.try_get_mut` for the first name it rejects (presumably a
/// column missing from `schema`). Entries processed before that point have already been
/// overwritten.
#[cfg(feature = "json")]
pub fn overwrite_schema(schema: &mut Schema, overwriting_schema: &Schema) -> PolarsResult<()> {
    overwriting_schema
        .iter()
        .try_for_each(|(name, replacement)| -> PolarsResult<()> {
            let slot = schema.try_get_mut(name)?;
            *slot = replacement.clone();
            Ok(())
        })
}
154
// Lazily compiled, cached regexes that classify a whole string (each pattern is anchored
// with `^...$`). Presumably used for dtype inference of text values — confirm with callers.
polars_utils::regex_cache::cached_regex! {
    // Float with `.` as the decimal separator. It accepts an optional leading `-`/`+`
    // followed by one of:
    // - a fractional number (integer part optional, e.g. `.5`), with an optional `e`/`E`
    //   exponent;
    // - `inf` or `NaN` (case-sensitive);
    // - an integer with a mandatory exponent (e.g. `7e-05`);
    // - an integer with a trailing dot (e.g. `3.`).
    // Plain integers such as `42` do not match.
    pub static FLOAT_RE = r"^[-+]?((\d*\.\d+)([eE][-+]?\d+)?|inf|NaN|(\d+)[eE][-+]?\d+|\d+\.)$";
    // Same as `FLOAT_RE`, but with `,` as the decimal separator (e.g. `0,5`).
    pub static FLOAT_RE_DECIMAL = r"^[-+]?((\d*,\d+)([eE][-+]?\d+)?|inf|NaN|(\d+)[eE][-+]?\d+|\d+,)$";
    // Integer: an optional leading `-` (a leading `+` is not accepted) followed by digits.
    pub static INTEGER_RE = r"^-?(\d+)$";
    // Boolean: `true` or `false`, matched case-insensitively.
    pub static BOOLEAN_RE = r"^(?i:true|false)$";
}
161
162pub fn materialize_projection(
163    with_columns: Option<&[PlSmallStr]>,
164    schema: &Schema,
165    hive_partitions: Option<&[Series]>,
166    has_row_index: bool,
167) -> Option<Vec<usize>> {
168    match hive_partitions {
169        None => with_columns.map(|with_columns| {
170            with_columns
171                .iter()
172                .map(|name| schema.index_of(name).unwrap() - has_row_index as usize)
173                .collect()
174        }),
175        Some(part_cols) => {
176            with_columns.map(|with_columns| {
177                with_columns
178                    .iter()
179                    .flat_map(|name| {
180                        // the hive partitions are added at the end of the schema, but we don't want to project
181                        // them from the file
182                        if part_cols.iter().any(|s| s.name() == name.as_str()) {
183                            None
184                        } else {
185                            Some(schema.index_of(name).unwrap() - has_row_index as usize)
186                        }
187                    })
188                    .collect()
189            })
190        },
191    }
192}
193
/// Deserialize a JSON payload into `T`.
///
/// If decoding fails, the error message is extended with the (truncated, lossily
/// UTF-8-decoded) response body. This makes failures when parsing network responses much
/// easier to debug.
///
/// # Errors
/// Returns a compute error wrapping the `serde_json` failure, with the response value
/// appended.
#[cfg(feature = "cloud")]
pub fn decode_json_response<T>(bytes: &[u8]) -> PolarsResult<T>
where
    T: for<'de> serde::de::Deserialize<'de>,
{
    use polars_error::to_compute_err;
    use polars_utils::error::TruncateErrorDetail;

    let attempt = serde_json::from_slice::<T>(bytes).map_err(to_compute_err);
    attempt.map_err(|err| {
        err.wrap_msg(|msg| {
            let body = String::from_utf8_lossy(bytes);
            format!(
                "error decoding response: {}, response value: {}",
                msg,
                TruncateErrorDetail(&body)
            )
        })
    })
}
216
#[cfg(test)]
mod tests {
    use super::{BOOLEAN_RE, FLOAT_RE, FLOAT_RE_DECIMAL, INTEGER_RE};

    #[test]
    fn test_float_parse() {
        let floats = [
            "0.1",
            "3.0",
            "3.00001",
            "5.",
            "-9.9990e-003",
            "9.9990e+003",
            "9.9990E+003",
            ".5",
            "2.5E-10",
            "2.5e10",
            "NaN",
            "-NaN",
            "-inf",
            "inf",
            "-7e-05",
            "7e-05",
            "+7e+05",
        ];
        for s in floats {
            assert!(FLOAT_RE.is_match(s), "expected {s:?} to match FLOAT_RE");
        }

        // Plain integers and malformed numbers are not floats.
        for s in ["1", "-42", "", "abc", "1.2.3", "1e"] {
            assert!(!FLOAT_RE.is_match(s), "expected {s:?} not to match FLOAT_RE");
        }
    }

    #[test]
    fn test_float_decimal_comma_parse() {
        assert!(FLOAT_RE_DECIMAL.is_match("0,1"));
        assert!(FLOAT_RE_DECIMAL.is_match(",5"));
        assert!(FLOAT_RE_DECIMAL.is_match("3,"));
        assert!(FLOAT_RE_DECIMAL.is_match("-9,9990e-003"));
        assert!(!FLOAT_RE_DECIMAL.is_match("0.1"));
    }

    #[test]
    fn test_integer_parse() {
        assert!(INTEGER_RE.is_match("0"));
        assert!(INTEGER_RE.is_match("-123"));
        assert!(!INTEGER_RE.is_match("1.0"));
        assert!(!INTEGER_RE.is_match(""));
    }

    #[test]
    fn test_boolean_parse() {
        assert!(BOOLEAN_RE.is_match("true"));
        assert!(BOOLEAN_RE.is_match("True"));
        assert!(BOOLEAN_RE.is_match("FALSE"));
        assert!(!BOOLEAN_RE.is_match("yes"));
        assert!(!BOOLEAN_RE.is_match("truex"));
    }
}