1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
use std::fs::File;
use std::string::String;
use std::sync::{Arc, Mutex};
use arrow::csv;
use arrow::datatypes::{Field, Schema};
use arrow::record_batch::RecordBatch;
use crate::datasource::{RecordBatchIterator, ScanResult, TableProvider};
use crate::error::Result;
/// A table backed by a single CSV file that is read with a caller-supplied schema.
///
/// The struct only stores the description of the file; the file itself is
/// opened when a scan constructs a `CsvBatchIterator`.
pub struct CsvFile {
/// Path of the CSV file, handed to `File::open` when the table is scanned.
filename: String,
/// Schema describing every column of the file (before any projection).
schema: Arc<Schema>,
/// Forwarded unchanged to the Arrow CSV reader; presumably indicates that
/// the first row of the file is a header row.
has_header: bool,
}
impl CsvFile {
#[allow(missing_docs)]
pub fn new(filename: &str, schema: &Schema, has_header: bool) -> Self {
Self {
filename: String::from(filename),
schema: Arc::new(schema.clone()),
has_header,
}
}
}
impl TableProvider for CsvFile {
    /// Returns the full (unprojected) schema this table was created with.
    fn schema(&self) -> &Arc<Schema> {
        &self.schema
    }

    /// Produces the scan results for this table: always exactly one entry,
    /// wrapping a `CsvBatchIterator` over the whole file.
    ///
    /// `projection` and `batch_size` are passed straight through to
    /// `CsvBatchIterator::new`, which opens the file immediately.
    fn scan(
        &self,
        projection: &Option<Vec<usize>>,
        batch_size: usize,
    ) -> Result<Vec<ScanResult>> {
        let iterator = CsvBatchIterator::new(
            &self.filename,
            Arc::clone(&self.schema),
            self.has_header,
            projection,
            batch_size,
        );
        // A single file yields a single scan result.
        let partition: ScanResult = Arc::new(Mutex::new(iterator));
        Ok(vec![partition])
    }
}
/// Iterator over the record batches of one CSV file.
pub struct CsvBatchIterator {
/// Schema reported by this iterator: the projected schema when a projection
/// was supplied at construction, otherwise the full file schema.
schema: Arc<Schema>,
/// Arrow CSV reader over the opened file; each `next` call delegates to it.
reader: csv::Reader<File>,
}
impl CsvBatchIterator {
    /// Opens `filename` and builds an iterator over its record batches.
    ///
    /// * `filename` - path of the CSV file to open.
    /// * `schema` - schema of the complete file, before projection.
    /// * `has_header` - forwarded to the Arrow CSV reader.
    /// * `projection` - optional list of column indices into `schema`; when
    ///   present, the iterator's own schema contains only those fields, in
    ///   the order given.
    /// * `batch_size` - forwarded to the Arrow CSV reader.
    ///
    /// # Panics
    ///
    /// Panics if the file cannot be opened, because this constructor has no
    /// way to return an error. Also panics if `projection` contains an index
    /// that is out of bounds for the fields of `schema`.
    #[allow(missing_docs)]
    pub fn new(
        filename: &str,
        schema: Arc<Schema>,
        has_header: bool,
        projection: &Option<Vec<usize>>,
        batch_size: usize,
    ) -> Self {
        // The signature cannot surface I/O errors, so make the unavoidable
        // panic name the offending path and the underlying OS error.
        let file = File::open(filename).unwrap_or_else(|e| {
            panic!("failed to open CSV file '{}': {}", filename, e)
        });

        let reader = csv::Reader::new(
            file,
            Arc::clone(&schema),
            has_header,
            batch_size,
            projection.clone(),
        );

        // Expose only the projected columns through `schema()` so that it
        // matches the projection handed to the reader.
        let projected_schema = match projection {
            Some(indices) => {
                let fields: Vec<Field> = indices
                    .iter()
                    .map(|&i| schema.fields()[i].clone())
                    .collect();
                Arc::new(Schema::new(fields))
            }
            None => schema,
        };

        Self {
            schema: projected_schema,
            reader,
        }
    }
}
impl RecordBatchIterator for CsvBatchIterator {
    /// Returns the schema of the batches this iterator yields (the projected
    /// schema if a projection was supplied at construction).
    fn schema(&self) -> &Arc<Schema> {
        &self.schema
    }

    /// Fetches the next value from the underlying CSV reader.
    ///
    /// A reader error is converted into this crate's error type and
    /// returned; otherwise the reader's result is passed through unchanged
    /// (`None` presumably marks the end of the file).
    fn next(&mut self) -> Result<Option<RecordBatch>> {
        let batch = self.reader.next()?;
        Ok(batch)
    }
}