1use std::sync::Arc;
10
/// Nominal batch size (presumably the target row count per [`ColumnBatch`]; nothing in this
/// module enforces it). Kept a power of two no smaller than 1024, as the tests check.
pub const BATCH_SIZE: usize = 1 << 11;
15
/// Physical value type of a column; one variant per [`ColumnVector`] variant.
///
/// The enum is fieldless, so it derives `Copy` (pass and compare by value) and `Hash`
/// (usable as a `HashMap`/`HashSet` key).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ColumnKind {
    /// Values stored as `i64`.
    Int64,
    /// Values stored as `f64`.
    Float64,
    /// Values stored as `bool`.
    Bool,
    /// Values stored as `String`.
    Text,
}
25
26#[derive(Debug, Clone)]
27pub struct Field {
28 pub name: String,
29 pub kind: ColumnKind,
30 pub nullable: bool,
33}
34
35#[derive(Debug, Clone)]
36pub struct Schema {
37 fields: Vec<Field>,
38}
39
40impl Schema {
41 pub fn new(fields: Vec<Field>) -> Self {
42 Self { fields }
43 }
44
45 pub fn fields(&self) -> &[Field] {
46 &self.fields
47 }
48
49 pub fn index_of(&self, name: &str) -> Option<usize> {
50 self.fields.iter().position(|f| f.name == name)
51 }
52
53 pub fn field(&self, idx: usize) -> Option<&Field> {
54 self.fields.get(idx)
55 }
56
57 pub fn len(&self) -> usize {
58 self.fields.len()
59 }
60
61 pub fn is_empty(&self) -> bool {
62 self.fields.is_empty()
63 }
64
65 pub fn with_subset(&self, indices: &[usize]) -> Self {
66 Self {
67 fields: indices
68 .iter()
69 .filter_map(|i| self.fields.get(*i).cloned())
70 .collect(),
71 }
72 }
73}
74
75#[derive(Debug, Clone)]
78pub enum ColumnVector {
79 Int64 {
80 data: Vec<i64>,
81 validity: Option<Vec<bool>>,
82 },
83 Float64 {
84 data: Vec<f64>,
85 validity: Option<Vec<bool>>,
86 },
87 Bool {
88 data: Vec<bool>,
89 validity: Option<Vec<bool>>,
90 },
91 Text {
92 data: Vec<String>,
93 validity: Option<Vec<bool>>,
94 },
95}
96
97impl ColumnVector {
98 pub fn len(&self) -> usize {
99 match self {
100 ColumnVector::Int64 { data, .. } => data.len(),
101 ColumnVector::Float64 { data, .. } => data.len(),
102 ColumnVector::Bool { data, .. } => data.len(),
103 ColumnVector::Text { data, .. } => data.len(),
104 }
105 }
106
107 pub fn is_empty(&self) -> bool {
108 self.len() == 0
109 }
110
111 pub fn kind(&self) -> ColumnKind {
112 match self {
113 ColumnVector::Int64 { .. } => ColumnKind::Int64,
114 ColumnVector::Float64 { .. } => ColumnKind::Float64,
115 ColumnVector::Bool { .. } => ColumnKind::Bool,
116 ColumnVector::Text { .. } => ColumnKind::Text,
117 }
118 }
119
120 pub fn is_valid(&self, idx: usize) -> bool {
121 let validity = match self {
122 ColumnVector::Int64 { validity, .. } => validity.as_ref(),
123 ColumnVector::Float64 { validity, .. } => validity.as_ref(),
124 ColumnVector::Bool { validity, .. } => validity.as_ref(),
125 ColumnVector::Text { validity, .. } => validity.as_ref(),
126 };
127 validity
128 .map(|v| v.get(idx).copied().unwrap_or(false))
129 .unwrap_or(true)
130 }
131
132 pub fn take_indices(&self, indices: &[usize]) -> ColumnVector {
133 match self {
134 ColumnVector::Int64 { data, validity } => {
135 let new_data: Vec<i64> = indices.iter().map(|i| data[*i]).collect();
136 let new_validity = validity.as_ref().map(|v| {
137 indices
138 .iter()
139 .map(|i| *v.get(*i).unwrap_or(&true))
140 .collect()
141 });
142 ColumnVector::Int64 {
143 data: new_data,
144 validity: new_validity,
145 }
146 }
147 ColumnVector::Float64 { data, validity } => {
148 let new_data: Vec<f64> = indices.iter().map(|i| data[*i]).collect();
149 let new_validity = validity.as_ref().map(|v| {
150 indices
151 .iter()
152 .map(|i| *v.get(*i).unwrap_or(&true))
153 .collect()
154 });
155 ColumnVector::Float64 {
156 data: new_data,
157 validity: new_validity,
158 }
159 }
160 ColumnVector::Bool { data, validity } => {
161 let new_data: Vec<bool> = indices.iter().map(|i| data[*i]).collect();
162 let new_validity = validity.as_ref().map(|v| {
163 indices
164 .iter()
165 .map(|i| *v.get(*i).unwrap_or(&true))
166 .collect()
167 });
168 ColumnVector::Bool {
169 data: new_data,
170 validity: new_validity,
171 }
172 }
173 ColumnVector::Text { data, validity } => {
174 let new_data: Vec<String> = indices.iter().map(|i| data[*i].clone()).collect();
175 let new_validity = validity.as_ref().map(|v| {
176 indices
177 .iter()
178 .map(|i| *v.get(*i).unwrap_or(&true))
179 .collect()
180 });
181 ColumnVector::Text {
182 data: new_data,
183 validity: new_validity,
184 }
185 }
186 }
187 }
188}
189
/// A borrowed view of one cell, as returned by `ColumnBatch::value`.
///
/// `Text` borrows from the column's storage for `'a`. Every payload is `Copy`, so the view
/// itself is `Copy`.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ValueRef<'a> {
    Int64(i64),
    Float64(f64),
    Bool(bool),
    Text(&'a str),
    /// A null slot, or an out-of-range access.
    Null,
}

impl<'a> ValueRef<'a> {
    /// The integer, if this is `Int64`.
    pub fn as_i64(&self) -> Option<i64> {
        match *self {
            ValueRef::Int64(v) => Some(v),
            _ => None,
        }
    }

    /// The value as `f64`, for `Float64`, or for `Int64` widened with `as`.
    ///
    /// The widening is lossy for magnitudes above 2^53.
    pub fn as_f64(&self) -> Option<f64> {
        match *self {
            ValueRef::Float64(v) => Some(v),
            ValueRef::Int64(v) => Some(v as f64),
            _ => None,
        }
    }

    /// The boolean, if this is `Bool`.
    pub fn as_bool(&self) -> Option<bool> {
        match *self {
            ValueRef::Bool(v) => Some(v),
            _ => None,
        }
    }

    /// The string, if this is `Text`.
    ///
    /// The result borrows for `'a` (the underlying column storage), not for the lifetime of
    /// this `ValueRef`. It therefore outlives a temporary view, as in
    /// `let s = batch.value(r, c).as_str();`.
    pub fn as_str(&self) -> Option<&'a str> {
        match *self {
            ValueRef::Text(s) => Some(s),
            _ => None,
        }
    }

    /// `true` only for [`ValueRef::Null`].
    pub fn is_null(&self) -> bool {
        matches!(self, ValueRef::Null)
    }
}
235
236#[derive(Debug, Clone)]
237pub struct ColumnBatch {
238 pub schema: Arc<Schema>,
239 pub columns: Vec<ColumnVector>,
240 pub len: usize,
241}
242
243impl ColumnBatch {
244 pub fn new(schema: Arc<Schema>, columns: Vec<ColumnVector>) -> Self {
245 let len = columns.first().map(|c| c.len()).unwrap_or(0);
246 debug_assert!(
247 columns.iter().all(|c| c.len() == len),
248 "column lengths diverge in batch construction"
249 );
250 debug_assert_eq!(
251 schema.len(),
252 columns.len(),
253 "schema / column count mismatch"
254 );
255 Self {
256 schema,
257 columns,
258 len,
259 }
260 }
261
262 pub fn empty(schema: Arc<Schema>) -> Self {
263 let columns = schema
264 .fields()
265 .iter()
266 .map(|f| match f.kind {
267 ColumnKind::Int64 => ColumnVector::Int64 {
268 data: Vec::new(),
269 validity: None,
270 },
271 ColumnKind::Float64 => ColumnVector::Float64 {
272 data: Vec::new(),
273 validity: None,
274 },
275 ColumnKind::Bool => ColumnVector::Bool {
276 data: Vec::new(),
277 validity: None,
278 },
279 ColumnKind::Text => ColumnVector::Text {
280 data: Vec::new(),
281 validity: None,
282 },
283 })
284 .collect();
285 Self {
286 schema,
287 columns,
288 len: 0,
289 }
290 }
291
292 pub fn len(&self) -> usize {
293 self.len
294 }
295
296 pub fn is_empty(&self) -> bool {
297 self.len == 0
298 }
299
300 pub fn value(&self, row: usize, column: usize) -> ValueRef<'_> {
302 if row >= self.len || column >= self.columns.len() {
303 return ValueRef::Null;
304 }
305 let col = &self.columns[column];
306 if !col.is_valid(row) {
307 return ValueRef::Null;
308 }
309 match col {
310 ColumnVector::Int64 { data, .. } => ValueRef::Int64(data[row]),
311 ColumnVector::Float64 { data, .. } => ValueRef::Float64(data[row]),
312 ColumnVector::Bool { data, .. } => ValueRef::Bool(data[row]),
313 ColumnVector::Text { data, .. } => ValueRef::Text(data[row].as_str()),
314 }
315 }
316
317 pub fn take(&self, indices: &[usize]) -> ColumnBatch {
319 let columns = self
320 .columns
321 .iter()
322 .map(|c| c.take_indices(indices))
323 .collect();
324 ColumnBatch {
325 schema: Arc::clone(&self.schema),
326 columns,
327 len: indices.len(),
328 }
329 }
330
331 pub fn project(&self, indices: &[usize]) -> ColumnBatch {
333 let new_schema = Arc::new(self.schema.with_subset(indices));
334 let columns = indices
335 .iter()
336 .filter_map(|i| self.columns.get(*i).cloned())
337 .collect();
338 ColumnBatch {
339 schema: new_schema,
340 columns,
341 len: self.len,
342 }
343 }
344}
345
#[cfg(test)]
mod tests {
    use super::*;

    /// Three-column schema: non-null `id` (Int64), non-null `value` (Float64),
    /// nullable `name` (Text).
    fn simple_schema() -> Arc<Schema> {
        Arc::new(Schema::new(vec![
            Field {
                name: "id".into(),
                kind: ColumnKind::Int64,
                nullable: false,
            },
            Field {
                name: "value".into(),
                kind: ColumnKind::Float64,
                nullable: false,
            },
            Field {
                name: "name".into(),
                kind: ColumnKind::Text,
                nullable: true,
            },
        ]))
    }

    /// `n` rows over `simple_schema`: id = i, value = 1.5 * i, name = "row-{i}"; no nulls.
    fn batch_of(n: usize) -> ColumnBatch {
        let schema = simple_schema();
        let ids: Vec<i64> = (0..n as i64).collect();
        let values: Vec<f64> = (0..n).map(|i| i as f64 * 1.5).collect();
        let names: Vec<String> = (0..n).map(|i| format!("row-{i}")).collect();
        ColumnBatch::new(
            schema,
            vec![
                ColumnVector::Int64 {
                    data: ids,
                    validity: None,
                },
                ColumnVector::Float64 {
                    data: values,
                    validity: None,
                },
                ColumnVector::Text {
                    data: names,
                    validity: None,
                },
            ],
        )
    }

    #[test]
    fn schema_lookup_by_name_returns_index() {
        let s = simple_schema();
        assert_eq!(s.index_of("id"), Some(0));
        assert_eq!(s.index_of("value"), Some(1));
        assert_eq!(s.index_of("missing"), None);
    }

    #[test]
    fn value_access_by_row_and_column() {
        let b = batch_of(5);
        assert_eq!(b.value(0, 0), ValueRef::Int64(0));
        assert_eq!(b.value(3, 1), ValueRef::Float64(4.5));
        assert_eq!(b.value(4, 2), ValueRef::Text("row-4"));
    }

    #[test]
    fn value_out_of_range_yields_null() {
        let b = batch_of(3);
        assert!(b.value(99, 0).is_null());
        assert!(b.value(0, 99).is_null());
    }

    #[test]
    fn value_of_invalid_slot_is_null() {
        let b = ColumnBatch::new(
            simple_schema(),
            vec![
                ColumnVector::Int64 {
                    data: vec![1, 2],
                    validity: None,
                },
                ColumnVector::Float64 {
                    data: vec![0.5, 1.5],
                    validity: None,
                },
                ColumnVector::Text {
                    data: vec!["a".into(), "b".into()],
                    validity: Some(vec![true, false]),
                },
            ],
        );
        assert_eq!(b.value(0, 2), ValueRef::Text("a"));
        assert!(b.value(1, 2).is_null());
    }

    #[test]
    fn empty_batch_has_one_empty_column_per_field() {
        let b = ColumnBatch::empty(simple_schema());
        assert!(b.is_empty());
        assert_eq!(b.columns.len(), 3);
        assert_eq!(b.columns[2].kind(), ColumnKind::Text);
        assert!(b.columns.iter().all(|c| c.is_empty()));
        assert!(b.value(0, 0).is_null());
    }

    #[test]
    fn take_produces_reduced_batch_preserving_schema() {
        let b = batch_of(10);
        let taken = b.take(&[0, 2, 4]);
        assert_eq!(taken.len(), 3);
        assert_eq!(taken.value(1, 0), ValueRef::Int64(2));
        assert_eq!(taken.value(2, 1), ValueRef::Float64(6.0));
    }

    #[test]
    fn take_indices_carries_validity_with_rows() {
        let col = ColumnVector::Int64 {
            data: vec![1, 2, 3],
            validity: Some(vec![true, false, true]),
        };
        let taken = col.take_indices(&[2, 1]);
        assert_eq!(taken.len(), 2);
        assert!(taken.is_valid(0));
        assert!(!taken.is_valid(1));
    }

    #[test]
    fn project_drops_unwanted_columns() {
        let b = batch_of(4);
        let p = b.project(&[0, 2]);
        assert_eq!(p.schema.len(), 2);
        assert_eq!(p.schema.index_of("value"), None);
        assert_eq!(p.value(2, 0), ValueRef::Int64(2));
        assert_eq!(p.value(2, 1), ValueRef::Text("row-2"));
    }

    #[test]
    fn validity_bits_mask_nulls() {
        let col = ColumnVector::Int64 {
            data: vec![1, 2, 3],
            validity: Some(vec![true, false, true]),
        };
        assert!(col.is_valid(0));
        assert!(!col.is_valid(1));
        assert!(col.is_valid(2));
    }

    #[test]
    fn int_values_widen_to_f64() {
        assert_eq!(ValueRef::Int64(4).as_f64(), Some(4.0));
        assert_eq!(ValueRef::Text("x").as_f64(), None);
    }

    #[test]
    fn batch_size_constant_is_power_of_two() {
        assert!(BATCH_SIZE.is_power_of_two());
        assert!(BATCH_SIZE >= 1024);
    }
}