/// Gives access to a reader's backing file handle or in-memory bytes without performing a read.
///
/// Both methods are provided methods; their default bodies are elided (`{ ... }`) in this listing.
pub trait MmapBytesReader: Read + Seek + Send + Sync {
    /// Returns the backing `File`, or `None` when none is available.
    fn to_file(&self) -> Option<&File> { ... }
    /// Returns the underlying byte slice, or `None` when none is available.
    fn to_bytes(&self) -> Option<&[u8]> { ... }
}
Expand description

Trait used to get hold of the file handle or of the underlying bytes without performing a read.

Provided Methods§

Examples found in repository?
src/mmap.rs (line 39)
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
    /// Forwards the file lookup to the wrapped reader's `T::to_file`.
    // NOTE(review): the enclosing `impl` header is outside this excerpt — confirm the wrapper type.
    fn to_file(&self) -> Option<&File> {
        T::to_file(self)
    }

    /// Forwards the byte-slice lookup to the wrapped reader's `T::to_bytes`.
    // NOTE(review): the enclosing `impl` header is outside this excerpt — confirm the wrapper type.
    fn to_bytes(&self) -> Option<&[u8]> {
        T::to_bytes(self)
    }
}

/// Lets a mutable reference to a reader stand in for the reader itself.
impl<T: MmapBytesReader> MmapBytesReader for &mut T {
    /// Reports whatever file handle the referenced reader exposes.
    fn to_file(&self) -> Option<&File> {
        (**self).to_file()
    }

    /// Reports whatever byte slice the referenced reader exposes.
    fn to_bytes(&self) -> Option<&[u8]> {
        (**self).to_bytes()
    }
}

/// The various forms in which input bytes can be held.
pub enum ReaderBytes<'a> {
    /// A byte slice borrowed for the lifetime `'a`.
    Borrowed(&'a [u8]),
    /// A byte buffer owned by this value.
    Owned(Vec<u8>),
    /// A `memmap::Mmap` memory map owned by this value.
    Mapped(memmap::Mmap),
}

impl std::ops::Deref for ReaderBytes<'_> {
    type Target = [u8];
    fn deref(&self) -> &[u8] {
        match self {
            Self::Borrowed(ref_bytes) => ref_bytes,
            Self::Owned(vec) => vec,
            Self::Mapped(mmap) => mmap,
        }
    }
}

impl<'a, T: 'a + MmapBytesReader> From<&'a T> for ReaderBytes<'a> {
    /// Builds a `ReaderBytes` from a reader reference.
    ///
    /// Borrows the reader's bytes when `to_bytes` yields them; otherwise memory-maps the
    /// file returned by `to_file`.
    ///
    /// # Panics
    ///
    /// Panics when the reader exposes neither bytes nor a file, or when mapping the file fails.
    fn from(m: &'a T) -> Self {
        if let Some(bytes) = m.to_bytes() {
            return ReaderBytes::Borrowed(bytes);
        }

        // No in-memory bytes available, so the reader has to be backed by a file.
        let file = m.to_file().unwrap();
        // NOTE(review): nothing visible here guards against the file changing while mapped.
        let mapped = unsafe { memmap::Mmap::map(file) };
        ReaderBytes::Mapped(mapped.unwrap())
    }
More examples
Hide additional examples
src/csv/utils.rs (line 63)
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
/// Obtains the bytes behind `reader` as cheaply as possible.
///
/// Tries, in order:
/// 1. memory-mapping the backing file when `to_file` returns one,
/// 2. borrowing the in-memory bytes when `to_bytes` returns them,
/// 3. reading everything into an owned buffer.
///
/// In the owned case a `\n` is appended when the buffer is non-empty and does not already
/// end in `\n` or `\r`.
///
/// # Errors
///
/// Returns an error when memory-mapping the file or reading from `reader` fails.
pub fn get_reader_bytes<R: Read + MmapBytesReader + ?Sized>(
    reader: &mut R,
) -> PolarsResult<ReaderBytes<'_>> {
    // we have a file so we can mmap
    if let Some(file) = reader.to_file() {
        let mmap = unsafe { memmap::Mmap::map(file)? };
        Ok(ReaderBytes::Mapped(mmap))
    } else {
        // we can get the bytes for free
        if reader.to_bytes().is_some() {
            // duplicate .to_bytes() is necessary to satisfy the borrow checker
            Ok(ReaderBytes::Borrowed((*reader).to_bytes().unwrap()))
        } else {
            // we have to read to an owned buffer to get the bytes.
            let mut bytes = Vec::with_capacity(1024 * 128);
            reader.read_to_end(&mut bytes)?;
            // Only terminate the data when it is not already terminated. The previous
            // `!= b'\n' || != b'\r'` test was always true, so a newline was appended
            // unconditionally to every non-empty buffer.
            let needs_newline = bytes
                .last()
                .map_or(false, |&last| last != b'\n' && last != b'\r');
            if needs_newline {
                bytes.push(b'\n');
            }
            Ok(ReaderBytes::Owned(bytes))
        }
    }
}
src/ipc/mmap.rs (line 75)
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
    /// Reads the IPC file through a memory map and collects it into a `DataFrame`.
    ///
    /// When `self.columns` is set, it is resolved against the file schema and stored in
    /// `self.projection` before the schema for the result is derived.
    ///
    /// # Errors
    ///
    /// Returns `PolarsError::ComputeError` when the reader is not backed by a file, and
    /// propagates failures from mapping the file, reading the metadata, resolving the
    /// projection, or building/consuming the chunk iterator.
    pub(super) fn finish_memmapped(
        &mut self,
        predicate: Option<Arc<dyn PhysicalIoExpr>>,
    ) -> PolarsResult<DataFrame> {
        match self.reader.to_file() {
            Some(file) => {
                // Propagate a failed mapping as an error instead of panicking.
                // NOTE(review): nothing visible here guards against the file changing while mapped.
                let mmap = unsafe { memmap::Mmap::map(file)? };
                let metadata = read::read_file_metadata(&mut std::io::Cursor::new(mmap.as_ref()))?;

                if let Some(columns) = &self.columns {
                    let schema = &metadata.schema;
                    let prj = columns_to_projection(columns, schema)?;
                    self.projection = Some(prj);
                }

                let schema = if let Some(projection) = &self.projection {
                    apply_projection(&metadata.schema, projection)
                } else {
                    metadata.schema.clone()
                };

                let reader = MMapChunkIter::new(mmap, metadata, &self.projection)?;

                finish_reader(
                    reader,
                    // don't rechunk, that would trigger a read.
                    false,
                    self.n_rows,
                    predicate,
                    &schema,
                    self.row_count.clone(),
                )
            }
            None => Err(PolarsError::ComputeError(
                "Cannot memory map, you must provide a file".into(),
            )),
        }
    }
src/ipc/ipc_file.rs (line 146)
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
    /// Consumes the reader and produces a `DataFrame`, applying `predicate` while reading.
    ///
    /// When memory mapping is enabled and the reader is file-backed, the memory-mapped path
    /// is tried first. If it fails with the specific "uncompressed IPC files only" Arrow
    /// error, a notice is written to stderr and the regular read path is used; every other
    /// error is returned as-is. With `verbose` set, a notice is also written before the
    /// memory-mapped attempt.
    ///
    /// # Errors
    ///
    /// Propagates errors from the memory-mapped attempt (other than the fallback case),
    /// from reading the file metadata, and from collecting the batches.
    pub fn finish_with_scan_ops(
        mut self,
        predicate: Option<Arc<dyn PhysicalIoExpr>>,
        verbose: bool,
    ) -> PolarsResult<DataFrame> {
        if self.memmap && self.reader.to_file().is_some() {
            if verbose {
                eprintln!("memory map ipc file")
            }
            match self.finish_memmapped(predicate.clone()) {
                Ok(df) => return Ok(df),
                Err(PolarsError::ArrowError(e)) => match e.as_ref() {
                    arrow::error::Error::NotYetImplemented(s)
                        if s == "mmap can only be done on uncompressed IPC files" =>
                    {
                        // `eprintln!` rather than `eprint!`, so the notice ends its own line
                        // instead of running into whatever is written to stderr next.
                        eprintln!("could not mmap compressed IPC file, defaulting to normal read")
                    }
                    _ => return Err(PolarsError::ArrowError(e)),
                },
                Err(err) => return Err(err),
            }
        }
        let rechunk = self.rechunk;
        let metadata = read::read_file_metadata(&mut self.reader)?;

        let schema = if let Some(projection) = &self.projection {
            apply_projection(&metadata.schema, projection)
        } else {
            metadata.schema.clone()
        };

        let reader = read::FileReader::new(self.reader, metadata, self.projection, self.n_rows);

        finish_reader(reader, rechunk, None, predicate, &schema, self.row_count)
    }
}

/// Adapts the IPC file reader's iterator interface to `ArrowReader`.
impl<R: MmapBytesReader> ArrowReader for read::FileReader<R>
where
    R: Read + Seek,
{
    /// Pulls the next item from the underlying iterator: `Ok(None)` once it is exhausted,
    /// `Ok(Some(chunk))` for a successfully read batch, and the error otherwise.
    fn next_record_batch(&mut self) -> ArrowResult<Option<ArrowChunk>> {
        self.next().transpose()
    }
}

impl<R: MmapBytesReader> SerReader<R> for IpcReader<R> {
    fn new(reader: R) -> Self {
        IpcReader {
            reader,
            rechunk: true,
            n_rows: None,
            columns: None,
            projection: None,
            row_count: None,
            memmap: true,
            metadata: None,
        }
    }

    /// Sets the `rechunk` flag and hands the reader back, builder-style.
    fn set_rechunk(mut self, rechunk: bool) -> Self {
        self.rechunk = rechunk;
        self
    }

    /// Consumes the reader and produces a `DataFrame`.
    ///
    /// When memory mapping is enabled and the reader is file-backed, the memory-mapped path
    /// is tried first. If it fails with the specific "uncompressed IPC files only" Arrow
    /// error, a notice is written to stderr and the regular read path is used; every other
    /// error is returned as-is. On the regular path, `self.columns` (when set) is resolved
    /// into a projection against the file schema.
    ///
    /// # Errors
    ///
    /// Propagates errors from the memory-mapped attempt (other than the fallback case),
    /// from reading the file metadata, from resolving the column projection, and from
    /// collecting the batches.
    fn finish(mut self) -> PolarsResult<DataFrame> {
        if self.memmap && self.reader.to_file().is_some() {
            match self.finish_memmapped(None) {
                Ok(df) => return Ok(df),
                Err(PolarsError::ArrowError(e)) => match e.as_ref() {
                    arrow::error::Error::NotYetImplemented(s)
                        if s == "mmap can only be done on uncompressed IPC files" =>
                    {
                        // `eprintln!` rather than `eprint!`, so the notice ends its own line
                        // instead of running into whatever is written to stderr next.
                        eprintln!("could not mmap compressed IPC file, defaulting to normal read")
                    }
                    _ => return Err(PolarsError::ArrowError(e)),
                },
                Err(err) => return Err(err),
            }
        }
        let rechunk = self.rechunk;
        let metadata = read::read_file_metadata(&mut self.reader)?;

        if let Some(columns) = &self.columns {
            let prj = columns_to_projection(columns, &metadata.schema)?;
            self.projection = Some(prj);
        }

        let schema = if let Some(projection) = &self.projection {
            apply_projection(&metadata.schema, projection)
        } else {
            metadata.schema.clone()
        };

        // `schema` is an owned value at this point, so the metadata can be moved into the
        // reader directly; no clone is needed.
        let ipc_reader =
            read::FileReader::new(self.reader, metadata, self.projection, self.n_rows);
        finish_reader(ipc_reader, rechunk, None, None, &schema, self.row_count)
    }
src/parquet/read_impl.rs (line 345)
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
    /// Builds a batched parquet reader over a file-backed `reader`.
    ///
    /// Reads the parquet metadata, infers the schema, defaults `projection` to every
    /// column index when none is given, and picks a parallelisation strategy before
    /// obtaining the reader's bytes.
    ///
    /// # Errors
    ///
    /// Propagates failures from reading the metadata, inferring the schema, or obtaining
    /// the reader bytes.
    ///
    /// # Panics
    ///
    /// Panics when `reader.to_file()` returns `None`, i.e. the reader is not backed by a file.
    pub fn new(
        mut reader: Box<dyn MmapBytesReader>,
        limit: usize,
        projection: Option<Vec<usize>>,
        row_count: Option<RowCount>,
        chunk_size: usize,
    ) -> PolarsResult<Self> {
        let metadata = read::read_metadata(&mut reader)?;
        let schema = read::schema::infer_schema(&metadata)?;
        let n_row_groups = metadata.row_groups.len();
        // No explicit projection means "all columns": indices 0..number of schema fields.
        let projection =
            projection.unwrap_or_else(|| (0usize..schema.fields.len()).collect::<Vec<_>>());

        // Parallelise over row groups when there are more row groups than projected
        // columns or than threads in the pool; otherwise parallelise over columns.
        let parallel =
            if n_row_groups > projection.len() || n_row_groups > POOL.current_num_threads() {
                ParallelStrategy::RowGroups
            } else {
                ParallelStrategy::Columns
            };

        // safety we will keep ownership on the struct and reference the bytes on the heap.
        // this should not work with passed bytes so we check if it is a file
        assert!(reader.to_file().is_some());
        // Extend the borrow of the boxed reader to `'static` so that `reader_bytes` can be
        // stored in the same struct as `reader` itself.
        // NOTE(review): soundness depends on `reader_bytes` never being used after `reader`
        // is dropped; the struct definition is not visible here — confirm.
        let reader_ptr = unsafe {
            std::mem::transmute::<&mut dyn MmapBytesReader, &'static mut dyn MmapBytesReader>(
                reader.as_mut(),
            )
        };
        let reader_bytes = get_reader_bytes(reader_ptr)?;
        Ok(BatchedParquetReader {
            reader,
            reader_bytes,
            limit,
            projection,
            schema,
            metadata,
            row_count,
            rows_read: 0,
            row_group_offset: 0,
            n_row_groups,
            // One queue slot reserved per pool thread.
            chunks_fifo: VecDeque::with_capacity(POOL.current_num_threads()),
            parallel,
            chunk_size,
        })
    }
Examples found in repository?
src/mmap.rs (line 43)
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
    /// Forwards the byte-slice lookup to the wrapped reader's `T::to_bytes`.
    // NOTE(review): the enclosing `impl` header is outside this excerpt — confirm the wrapper type.
    fn to_bytes(&self) -> Option<&[u8]> {
        T::to_bytes(self)
    }
}

/// Lets a mutable reference to a reader stand in for the reader itself.
impl<T: MmapBytesReader> MmapBytesReader for &mut T {
    /// Reports whatever file handle the referenced reader exposes.
    fn to_file(&self) -> Option<&File> {
        (**self).to_file()
    }

    /// Reports whatever byte slice the referenced reader exposes.
    fn to_bytes(&self) -> Option<&[u8]> {
        (**self).to_bytes()
    }
}

/// The various forms in which input bytes can be held.
pub enum ReaderBytes<'a> {
    /// A byte slice borrowed for the lifetime `'a`.
    Borrowed(&'a [u8]),
    /// A byte buffer owned by this value.
    Owned(Vec<u8>),
    /// A `memmap::Mmap` memory map owned by this value.
    Mapped(memmap::Mmap),
}

impl std::ops::Deref for ReaderBytes<'_> {
    type Target = [u8];
    fn deref(&self) -> &[u8] {
        match self {
            Self::Borrowed(ref_bytes) => ref_bytes,
            Self::Owned(vec) => vec,
            Self::Mapped(mmap) => mmap,
        }
    }
}

impl<'a, T: 'a + MmapBytesReader> From<&'a T> for ReaderBytes<'a> {
    /// Builds a `ReaderBytes` from a reader reference.
    ///
    /// Borrows the reader's bytes when `to_bytes` yields them; otherwise memory-maps the
    /// file returned by `to_file`.
    ///
    /// # Panics
    ///
    /// Panics when the reader exposes neither bytes nor a file, or when mapping the file fails.
    fn from(m: &'a T) -> Self {
        if let Some(bytes) = m.to_bytes() {
            return ReaderBytes::Borrowed(bytes);
        }

        // No in-memory bytes available, so the reader has to be backed by a file.
        let file = m.to_file().unwrap();
        // NOTE(review): nothing visible here guards against the file changing while mapped.
        let mapped = unsafe { memmap::Mmap::map(file) };
        ReaderBytes::Mapped(mapped.unwrap())
    }
More examples
Hide additional examples
src/csv/utils.rs (line 68)
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
/// Obtains the bytes behind `reader` as cheaply as possible.
///
/// Tries, in order:
/// 1. memory-mapping the backing file when `to_file` returns one,
/// 2. borrowing the in-memory bytes when `to_bytes` returns them,
/// 3. reading everything into an owned buffer.
///
/// In the owned case a `\n` is appended when the buffer is non-empty and does not already
/// end in `\n` or `\r`.
///
/// # Errors
///
/// Returns an error when memory-mapping the file or reading from `reader` fails.
pub fn get_reader_bytes<R: Read + MmapBytesReader + ?Sized>(
    reader: &mut R,
) -> PolarsResult<ReaderBytes<'_>> {
    // we have a file so we can mmap
    if let Some(file) = reader.to_file() {
        let mmap = unsafe { memmap::Mmap::map(file)? };
        Ok(ReaderBytes::Mapped(mmap))
    } else {
        // we can get the bytes for free
        if reader.to_bytes().is_some() {
            // duplicate .to_bytes() is necessary to satisfy the borrow checker
            Ok(ReaderBytes::Borrowed((*reader).to_bytes().unwrap()))
        } else {
            // we have to read to an owned buffer to get the bytes.
            let mut bytes = Vec::with_capacity(1024 * 128);
            reader.read_to_end(&mut bytes)?;
            // Only terminate the data when it is not already terminated. The previous
            // `!= b'\n' || != b'\r'` test was always true, so a newline was appended
            // unconditionally to every non-empty buffer.
            let needs_newline = bytes
                .last()
                .map_or(false, |&last| last != b'\n' && last != b'\r');
            if needs_newline {
                bytes.push(b'\n');
            }
            Ok(ReaderBytes::Owned(bytes))
        }
    }
}

Implementations on Foreign Types§

Implementors§