1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
use arrow::chunk::Chunk;
use arrow::io::ipc::read;
use arrow::io::ipc::read::{Dictionaries, FileMetadata};
use arrow::mmap::{mmap_dictionaries_unchecked, mmap_unchecked};
use memmap::Mmap;

use super::*;
use crate::mmap::MmapBytesReader;
use crate::utils::{apply_projection, columns_to_projection};

/// Reader that yields the record batches of a memory-mapped Arrow IPC file
/// one block at a time, optionally keeping only a projected subset of columns.
struct MMapChunkIter<'a> {
    /// Dictionaries obtained from `mmap_dictionaries_unchecked` at construction time.
    dictionaries: Dictionaries,
    /// File metadata; `metadata.blocks.len()` determines how many batches are yielded.
    metadata: FileMetadata,
    /// Shared handle to the memory map; a clone is handed to every mmap call.
    mmap: Arc<Mmap>,
    /// Index of the next block to map; starts at 0 and is bumped after each successful read.
    idx: usize,
    /// One past the last block index (the number of blocks in `metadata`).
    end: usize,
    /// Column indices to keep from each mapped chunk; `None` keeps all columns.
    projection: &'a Option<Vec<usize>>,
}

impl<'a> MMapChunkIter<'a> {
    fn new(
        mmap: Mmap,
        metadata: FileMetadata,
        projection: &'a Option<Vec<usize>>,
    ) -> PolarsResult<Self> {
        let mmap = Arc::new(mmap);

        let end = metadata.blocks.len();
        // mmap the dictionaries
        let dictionaries = unsafe { mmap_dictionaries_unchecked(&metadata, mmap.clone())? };

        Ok(Self {
            dictionaries,
            metadata,
            mmap,
            idx: 0,
            end,
            projection,
        })
    }
}

impl ArrowReader for MMapChunkIter<'_> {
    /// Maps the next block and returns it as a chunk, or `Ok(None)` once all
    /// blocks have been handed out.
    ///
    /// When a projection is set, only the arrays at the projected indices are
    /// kept, in projection order.
    ///
    /// # Errors
    /// Propagates any error from mapping the block; in that case the block
    /// index is not advanced.
    fn next_record_batch(&mut self) -> ArrowResult<Option<ArrowChunk>> {
        // Exhausted: every block has already been yielded.
        if self.idx >= self.end {
            return Ok(None);
        }

        let block = self.idx;
        let batch = unsafe {
            mmap_unchecked(
                &self.metadata,
                &self.dictionaries,
                self.mmap.clone(),
                block,
            )
        }?;
        // Advance only after the block was mapped successfully.
        self.idx = block + 1;

        let batch = if let Some(indices) = self.projection {
            // Keep just the requested columns.
            let all_columns = batch.into_arrays();
            let arrays = indices.iter().map(|&i| all_columns[i].clone()).collect();
            Chunk::new(arrays)
        } else {
            batch
        };

        Ok(Some(batch))
    }
}

impl<R: MmapBytesReader> IpcReader<R> {
    /// Reads the IPC file into a `DataFrame` by memory mapping it instead of
    /// copying its contents.
    ///
    /// If `self.columns` is set, it is resolved against the file schema and
    /// stored in `self.projection` before reading. The result is not
    /// rechunked, because that would trigger a read of the mapped data.
    ///
    /// # Errors
    /// - `PolarsError::ComputeError` if the underlying reader is not backed by a file.
    /// - The I/O error from creating the memory map (previously this panicked).
    /// - Any error from reading the file metadata, resolving the projection,
    ///   mapping the dictionaries, or materializing the batches.
    pub(super) fn finish_memmapped(
        &mut self,
        predicate: Option<Arc<dyn PhysicalIoExpr>>,
    ) -> PolarsResult<DataFrame> {
        let file = self.reader.to_file().ok_or_else(|| {
            PolarsError::ComputeError("Cannot memory map, you must provide a file".into())
        })?;

        // Mapping can fail for ordinary reasons (e.g. empty file, permissions,
        // exhausted address space), so report it as an error rather than panic.
        // NOTE(review): soundness relies on the file not being modified or
        // truncated while it is mapped; that cannot be enforced from here.
        let mmap = unsafe { memmap::Mmap::map(file) }?;
        let metadata = read::read_file_metadata(&mut std::io::Cursor::new(mmap.as_ref()))?;

        // Translate column names into positional indices, overriding any
        // previously set projection.
        if let Some(columns) = &self.columns {
            let prj = columns_to_projection(columns, &metadata.schema)?;
            self.projection = Some(prj);
        }

        let schema = match &self.projection {
            Some(projection) => apply_projection(&metadata.schema, projection),
            None => metadata.schema.clone(),
        };

        let reader = MMapChunkIter::new(mmap, metadata, &self.projection)?;

        finish_reader(
            reader,
            // don't rechunk, that would trigger a read.
            false,
            self.n_rows,
            predicate,
            &schema,
            self.row_count.clone(),
        )
    }
}