1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
use super::array::*;
use super::datatypes::*;
use std::sync::Arc;
pub struct RecordBatch {
schema: Arc<Schema>,
columns: Vec<Arc<Array>>,
}
impl RecordBatch {
    /// Creates a record batch from a schema and its column arrays.
    ///
    /// Only the columns are validated here; they are not checked against the
    /// fields of `schema`.
    ///
    /// # Panics
    ///
    /// Panics if `columns` is empty, or if the columns do not all have the
    /// same length.
    pub fn new(schema: Arc<Schema>, columns: Vec<ArrayRef>) -> Self {
        assert!(
            !columns.is_empty(),
            "at least one column must be defined to create a record batch"
        );
        // Measure every column with `data().len()`, the same value `num_rows`
        // reports, so the invariant checked here is exactly the one
        // `num_rows` relies on.
        let len = columns[0].data().len();
        for column in &columns[1..] {
            assert_eq!(
                len,
                column.data().len(),
                "all columns in a record batch must have the same length"
            );
        }
        RecordBatch { schema, columns }
    }

    /// Returns the schema of this batch.
    pub fn schema(&self) -> &Arc<Schema> {
        &self.schema
    }

    /// Returns the number of columns in this batch.
    pub fn num_columns(&self) -> usize {
        self.columns.len()
    }

    /// Returns the number of rows in this batch.
    ///
    /// This is the length of the first column. `new` guarantees that at least
    /// one column exists and that all columns share this length.
    pub fn num_rows(&self) -> i64 {
        self.columns[0].data().len()
    }

    /// Returns a reference to the `i`-th column.
    ///
    /// # Panics
    ///
    /// Panics if `i >= self.num_columns()`.
    pub fn column(&self, i: usize) -> &ArrayRef {
        &self.columns[i]
    }
}
// SAFETY: NOTE(review) — these manual impls assert that a `RecordBatch` (an
// `Arc<Schema>` plus a `Vec` of `Arc`ed `Array` trait objects) may be sent to
// and shared between threads. `Arc<T>` is itself only `Send`/`Sync` when
// `T: Send + Sync`. These impls are therefore sound only if `Schema` and every
// concrete `Array` implementation stored in a batch are thread-safe. That
// cannot be verified from this file; confirm against the `Array` trait
// definition. If possible, express it as `Send + Sync` bounds the compiler
// can check.
unsafe impl Send for RecordBatch {}
unsafe impl Sync for RecordBatch {}
#[cfg(test)]
mod tests {
    use super::*;
    use array_data::*;
    use buffer::*;

    #[test]
    fn create_record_batch() {
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Utf8, false),
        ]);

        // Column "a": five Int32 values.
        let v: Vec<i32> = vec![1, 2, 3, 4, 5];
        let array_data = ArrayData::builder(DataType::Int32)
            .len(5)
            .add_buffer(Buffer::from(v.to_byte_slice()))
            .build();
        let a = PrimitiveArray::<i32>::from(array_data);

        // Column "b": five one-byte strings. A variable-size array of `len`
        // elements needs `len + 1` i32 offsets. The final offset (5) equals
        // the length of the value buffer, so no offset points past the data.
        let v = vec![b'a', b'b', b'c', b'd', b'e'];
        let offset_data: Vec<i32> = vec![0, 1, 2, 3, 4, 5];
        let array_data = ArrayData::builder(DataType::Utf8)
            .len(5)
            .add_buffer(Buffer::from(offset_data.to_byte_slice()))
            .add_buffer(Buffer::from(v.to_byte_slice()))
            .build();
        let b = BinaryArray::from(array_data);

        let record_batch = RecordBatch::new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]);

        assert_eq!(5, record_batch.num_rows());
        assert_eq!(2, record_batch.num_columns());
        assert_eq!(&DataType::Int32, record_batch.schema().field(0).data_type());
        assert_eq!(&DataType::Utf8, record_batch.schema().field(1).data_type());
        assert_eq!(5, record_batch.column(0).data().len());
        assert_eq!(5, record_batch.column(1).data().len());
    }
}