1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
use arrow::chunk::Chunk;
use arrow::io::ipc::read;
use arrow::io::ipc::read::{Dictionaries, FileMetadata};
use arrow::mmap::{mmap_dictionaries_unchecked, mmap_unchecked};
use memmap::Mmap;
use super::*;
use crate::mmap::MmapBytesReader;
use crate::utils::{apply_projection, columns_to_projection};
/// Reader over the record batches of a memory-mapped Arrow IPC file.
///
/// Record-batch blocks are decoded one at a time, in file order, starting at
/// `idx` and stopping before `end`.
struct MMapChunkIter<'a> {
/// Dictionary batches, mapped once in `new` and passed to every record-batch decode.
dictionaries: Dictionaries,
/// IPC file metadata: the schema and the list of record-batch blocks.
metadata: FileMetadata,
/// The mapped file; a clone of this `Arc` is handed to each `mmap_unchecked` call.
mmap: Arc<Mmap>,
/// Index of the next record-batch block to decode.
idx: usize,
/// Number of record-batch blocks in the file (exclusive upper bound for `idx`).
end: usize,
/// Optional column indices to keep, in output order; `None` keeps every column.
projection: &'a Option<Vec<usize>>,
}
impl<'a> MMapChunkIter<'a> {
    /// Builds a batch reader over an already-mapped IPC file.
    ///
    /// The mapping is moved into a shared `Arc`, and all dictionary batches
    /// listed in `metadata` are mapped immediately so that every later
    /// record-batch decode can reuse them. The reader starts at the first
    /// record-batch block.
    ///
    /// # Errors
    /// Propagates any error raised while mapping the dictionary batches.
    fn new(
        mmap: Mmap,
        metadata: FileMetadata,
        projection: &'a Option<Vec<usize>>,
    ) -> PolarsResult<Self> {
        let shared_mmap = Arc::new(mmap);

        // SAFETY: NOTE(review) — the `_unchecked` arrow2 API does not validate
        // the mapped buffers; soundness relies on the file containing valid
        // Arrow IPC data (and not being modified while mapped). Confirm
        // against the arrow2 `mmap` module docs.
        let dictionaries =
            unsafe { mmap_dictionaries_unchecked(&metadata, Arc::clone(&shared_mmap)) }?;

        Ok(Self {
            end: metadata.blocks.len(),
            idx: 0,
            dictionaries,
            metadata,
            mmap: shared_mmap,
            projection,
        })
    }
}
impl ArrowReader for MMapChunkIter<'_> {
fn next_record_batch(&mut self) -> ArrowResult<Option<ArrowChunk>> {
if self.idx < self.end {
let chunk = unsafe {
mmap_unchecked(
&self.metadata,
&self.dictionaries,
self.mmap.clone(),
self.idx,
)
}?;
self.idx += 1;
let chunk = match &self.projection {
None => chunk,
Some(proj) => {
let cols = chunk.into_arrays();
let arrays = proj.iter().map(|i| cols[*i].clone()).collect();
Chunk::new(arrays)
}
};
Ok(Some(chunk))
} else {
Ok(None)
}
}
}
impl<R: MmapBytesReader> IpcReader<R> {
pub(super) fn finish_memmapped(
&mut self,
predicate: Option<Arc<dyn PhysicalIoExpr>>,
) -> PolarsResult<DataFrame> {
match self.reader.to_file() {
Some(file) => {
let mmap = unsafe { memmap::Mmap::map(file).unwrap() };
let metadata = read::read_file_metadata(&mut std::io::Cursor::new(mmap.as_ref()))?;
if let Some(columns) = &self.columns {
let schema = &metadata.schema;
let prj = columns_to_projection(columns, schema)?;
self.projection = Some(prj);
}
let schema = if let Some(projection) = &self.projection {
apply_projection(&metadata.schema, projection)
} else {
metadata.schema.clone()
};
let reader = MMapChunkIter::new(mmap, metadata, &self.projection)?;
finish_reader(
reader,
false,
self.n_rows,
predicate,
&schema,
self.row_count.clone(),
)
}
None => Err(PolarsError::ComputeError(
"Cannot memory map, you must provide a file".into(),
)),
}
}
}