1use crate::error::{ClientError, Result};
27use byteorder::{ByteOrder, LittleEndian};
28use std::marker::PhantomData;
29
/// Tag identifying the element type stored in a column.
///
/// In this file, [`ColumnView::column_i64`] accepts `Int64` and `UInt64`
/// columns and [`ColumnView::column_f64`] accepts `Float64` columns; the
/// remaining variants have no typed accessor here.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum FieldType {
    /// Readable through [`ColumnView::column_i64`].
    Int64,
    /// Also accepted by [`ColumnView::column_i64`].
    UInt64,
    /// Readable through [`ColumnView::column_f64`].
    Float64,
    /// Presumably text data; no typed accessor is visible in this file.
    String,
    /// Presumably raw byte data; no typed accessor is visible in this file.
    Bytes,
    /// Presumably boolean data; no typed accessor is visible in this file.
    Bool,
}
40
41#[derive(Debug, Clone)]
43pub struct ColumnRef {
44 pub id: usize,
45 pub name: String,
46 pub field_type: FieldType,
47}
48
49#[derive(Debug, Clone)]
51pub struct ArraySchema {
52 pub name: String,
53 pub fields: Vec<String>,
54 pub types: Vec<FieldType>,
55}
56
/// Borrowed, typed view over a raw byte buffer.
///
/// The view owns nothing: it stores a byte slice, the declared number of
/// elements and a zero-sized marker for the element type `T`. It is therefore
/// cheap to duplicate and derives `Clone`/`Copy` (available whenever `T`
/// itself is `Copy`, which holds for the `i64` and `f64` views in this file).
#[derive(Debug, Clone, Copy)]
pub struct TypedColumn<'a, T> {
    /// Raw bytes the elements are decoded from.
    data: &'a [u8],
    /// Declared number of elements.
    count: usize,
    /// Ties the view to its element type without storing a `T`.
    _marker: PhantomData<T>,
}
64
65impl<'a> TypedColumn<'a, i64> {
66 pub fn from_bytes(data: &'a [u8], count: usize) -> Self {
68 Self {
69 data,
70 count,
71 _marker: PhantomData,
72 }
73 }
74
75 #[inline]
77 pub fn get(&self, index: usize) -> Option<i64> {
78 if index >= self.count {
79 return None;
80 }
81 let offset = index * 8;
82 if offset + 8 > self.data.len() {
83 return None;
84 }
85 Some(LittleEndian::read_i64(&self.data[offset..offset + 8]))
86 }
87
88 pub fn to_vec(&self) -> Vec<i64> {
90 (0..self.count).filter_map(|i| self.get(i)).collect()
91 }
92
93 pub fn sum(&self) -> i64 {
95 (0..self.count).filter_map(|i| self.get(i)).sum()
96 }
97
98 pub fn min(&self) -> Option<i64> {
100 (0..self.count).filter_map(|i| self.get(i)).min()
101 }
102
103 pub fn max(&self) -> Option<i64> {
105 (0..self.count).filter_map(|i| self.get(i)).max()
106 }
107
108 pub fn avg(&self) -> Option<f64> {
110 if self.count == 0 {
111 return None;
112 }
113 let sum: i64 = self.sum();
114 Some(sum as f64 / self.count as f64)
115 }
116
117 pub fn len(&self) -> usize {
119 self.count
120 }
121
122 pub fn is_empty(&self) -> bool {
124 self.count == 0
125 }
126
127 pub fn iter(&self) -> impl Iterator<Item = i64> + '_ {
129 (0..self.count).filter_map(move |i| self.get(i))
130 }
131}
132
133impl<'a> TypedColumn<'a, f64> {
134 pub fn from_bytes(data: &'a [u8], count: usize) -> Self {
136 Self {
137 data,
138 count,
139 _marker: PhantomData,
140 }
141 }
142
143 #[inline]
145 pub fn get(&self, index: usize) -> Option<f64> {
146 if index >= self.count {
147 return None;
148 }
149 let offset = index * 8;
150 if offset + 8 > self.data.len() {
151 return None;
152 }
153 Some(LittleEndian::read_f64(&self.data[offset..offset + 8]))
154 }
155
156 pub fn to_vec(&self) -> Vec<f64> {
158 (0..self.count).filter_map(|i| self.get(i)).collect()
159 }
160
161 pub fn sum(&self) -> f64 {
163 (0..self.count).filter_map(|i| self.get(i)).sum()
164 }
165
166 pub fn min(&self) -> Option<f64> {
168 (0..self.count)
169 .filter_map(|i| self.get(i))
170 .fold(None, |acc, x| {
171 Some(match acc {
172 None => x,
173 Some(min) => {
174 if x < min {
175 x
176 } else {
177 min
178 }
179 }
180 })
181 })
182 }
183
184 pub fn max(&self) -> Option<f64> {
186 (0..self.count)
187 .filter_map(|i| self.get(i))
188 .fold(None, |acc, x| {
189 Some(match acc {
190 None => x,
191 Some(max) => {
192 if x > max {
193 x
194 } else {
195 max
196 }
197 }
198 })
199 })
200 }
201
202 pub fn avg(&self) -> Option<f64> {
204 if self.count == 0 {
205 return None;
206 }
207 let sum: f64 = self.sum();
208 Some(sum / self.count as f64)
209 }
210
211 pub fn std_dev(&self) -> Option<f64> {
213 let avg = self.avg()?;
214 let variance: f64 = (0..self.count)
215 .filter_map(|i| self.get(i))
216 .map(|x| (x - avg).powi(2))
217 .sum::<f64>()
218 / self.count as f64;
219 Some(variance.sqrt())
220 }
221
222 pub fn len(&self) -> usize {
224 self.count
225 }
226
227 pub fn is_empty(&self) -> bool {
229 self.count == 0
230 }
231
232 pub fn iter(&self) -> impl Iterator<Item = f64> + '_ {
234 (0..self.count).filter_map(move |i| self.get(i))
235 }
236}
237
238pub struct ColumnView<'a> {
240 schema: &'a ArraySchema,
241 columns: &'a [ColumnRef],
242 _data: PhantomData<&'a [u8]>,
243}
244
245impl<'a> ColumnView<'a> {
246 pub fn new(schema: &'a ArraySchema, columns: &'a [ColumnRef]) -> Self {
248 Self {
249 schema,
250 columns,
251 _data: PhantomData,
252 }
253 }
254
255 #[allow(dead_code)]
257 pub fn schema_name(&self) -> &str {
258 &self.schema.name
259 }
260
261 pub fn column_count(&self) -> usize {
263 self.columns.len()
264 }
265
266 #[allow(dead_code)]
268 pub fn column_names(&self) -> Vec<&str> {
269 self.columns.iter().map(|c| c.name.as_str()).collect()
270 }
271
272 pub fn get_column(&self, name: &str) -> Option<&ColumnRef> {
274 self.columns.iter().find(|c| c.name == name)
275 }
276
277 pub fn simd_groups(&self) -> Vec<Vec<&str>> {
279 use std::collections::HashMap;
280
281 let mut groups: HashMap<FieldType, Vec<&str>> = HashMap::new();
282 for col in self.columns {
283 groups.entry(col.field_type).or_default().push(&col.name);
284 }
285 groups.into_values().collect()
286 }
287
288 pub fn column_i64(
290 &self,
291 name: &str,
292 data: &'a [u8],
293 count: usize,
294 ) -> Result<TypedColumn<'a, i64>> {
295 let col = self
296 .get_column(name)
297 .ok_or_else(|| ClientError::NotFound(format!("Column '{}' not found", name)))?;
298
299 if col.field_type != FieldType::Int64 && col.field_type != FieldType::UInt64 {
300 return Err(ClientError::TypeMismatch {
301 expected: "Int64".to_string(),
302 actual: format!("{:?}", col.field_type),
303 });
304 }
305
306 Ok(TypedColumn::<i64>::from_bytes(data, count))
307 }
308
309 pub fn column_f64(
311 &self,
312 name: &str,
313 data: &'a [u8],
314 count: usize,
315 ) -> Result<TypedColumn<'a, f64>> {
316 let col = self
317 .get_column(name)
318 .ok_or_else(|| ClientError::NotFound(format!("Column '{}' not found", name)))?;
319
320 if col.field_type != FieldType::Float64 {
321 return Err(ClientError::TypeMismatch {
322 expected: "Float64".to_string(),
323 actual: format!("{:?}", col.field_type),
324 });
325 }
326
327 Ok(TypedColumn::<f64>::from_bytes(data, count))
328 }
329}
330
/// Shape and naming accessors for column-oriented data.
///
/// No implementor is visible in this part of the file, so the descriptions
/// below are inferred from the method names -- confirm against implementors.
pub trait ColumnAccess {
    /// Presumably the number of rows.
    fn row_count(&self) -> usize;

    /// Presumably the number of columns.
    fn col_count(&self) -> usize;

    /// Presumably the field (column) names, borrowed from `self`.
    fn field_names(&self) -> Vec<&str>;
}
337
#[cfg(test)]
mod tests {
    use super::*;
    use byteorder::WriteBytesExt;

    /// Encodes `values` as consecutive little-endian 8-byte integers.
    fn i64_to_bytes(values: &[i64]) -> Vec<u8> {
        let mut out = Vec::with_capacity(values.len() * 8);
        values
            .iter()
            .for_each(|&v| out.write_i64::<LittleEndian>(v).unwrap());
        out
    }

    /// Encodes `values` as consecutive little-endian 8-byte floats.
    fn f64_to_bytes(values: &[f64]) -> Vec<u8> {
        let mut out = Vec::with_capacity(values.len() * 8);
        values
            .iter()
            .for_each(|&v| out.write_f64::<LittleEndian>(v).unwrap());
        out
    }

    /// Builds one column descriptor.
    fn column(id: usize, name: &str, field_type: FieldType) -> ColumnRef {
        ColumnRef {
            id,
            name: name.to_string(),
            field_type,
        }
    }

    /// Builds a schema named "test" whose fields and types mirror `columns`.
    fn schema_for(columns: &[ColumnRef]) -> ArraySchema {
        ArraySchema {
            name: "test".to_string(),
            fields: columns.iter().map(|c| c.name.clone()).collect(),
            types: columns.iter().map(|c| c.field_type).collect(),
        }
    }

    /// True when `a` and `b` differ by less than the tolerance used
    /// throughout these tests.
    fn close(a: f64, b: f64) -> bool {
        (a - b).abs() < 0.001
    }

    #[test]
    fn test_typed_column_i64() {
        let bytes = i64_to_bytes(&[1, 2, 3, 4, 5]);
        let col = TypedColumn::<i64>::from_bytes(&bytes, 5);

        assert_eq!(col.sum(), 15);
        assert_eq!(col.min(), Some(1));
        assert_eq!(col.max(), Some(5));
        assert!(close(col.avg().unwrap(), 3.0));
    }

    #[test]
    fn test_typed_column_f64() {
        let bytes = f64_to_bytes(&[1.0, 2.0, 3.0, 4.0, 5.0]);
        let col = TypedColumn::<f64>::from_bytes(&bytes, 5);

        assert!(close(col.sum(), 15.0));
        assert!(close(col.avg().unwrap(), 3.0));
        assert!(col.std_dev().is_some());
    }

    #[test]
    fn test_safe_column_access_i64() {
        let values = vec![100i64, 200, 300, -400, 500];
        let bytes = i64_to_bytes(&values);
        let col = TypedColumn::<i64>::from_bytes(&bytes, 5);

        // Every in-range index decodes to the value that was written.
        for (index, &expected) in values.iter().enumerate() {
            assert_eq!(col.get(index), Some(expected));
        }
        // One past the end is rejected.
        assert_eq!(col.get(5), None);

        assert_eq!(col.to_vec(), values);

        let collected: Vec<i64> = col.iter().collect();
        assert_eq!(collected, values);
    }

    #[test]
    fn test_safe_column_access_f64() {
        let values = vec![1.5f64, 2.5, 3.5, -4.5, 5.5];
        let bytes = f64_to_bytes(&values);
        let col = TypedColumn::<f64>::from_bytes(&bytes, 5);

        for (index, &expected) in values.iter().enumerate() {
            assert!(close(col.get(index).unwrap(), expected));
        }
        assert_eq!(col.get(5), None);

        for (a, b) in col.to_vec().iter().zip(values.iter()) {
            assert!(close(*a, *b));
        }
    }

    #[test]
    fn test_safe_column_misaligned_data() {
        let aligned_bytes = i64_to_bytes(&[42i64, 84, 126]);

        // Prefix one byte so the payload starts at an odd offset.
        let mut misaligned = vec![0u8];
        misaligned.extend_from_slice(&aligned_bytes);

        let col = TypedColumn::<i64>::from_bytes(&misaligned[1..], 3);

        assert_eq!(col.get(0), Some(42));
        assert_eq!(col.get(1), Some(84));
        assert_eq!(col.get(2), Some(126));
        assert_eq!(col.sum(), 252);
    }

    #[test]
    fn test_empty_column() {
        let bytes: Vec<u8> = Vec::new();

        let ints = TypedColumn::<i64>::from_bytes(&bytes, 0);
        assert!(ints.is_empty());
        assert_eq!(ints.len(), 0);
        assert_eq!(ints.sum(), 0);
        assert_eq!(ints.min(), None);
        assert_eq!(ints.max(), None);
        assert_eq!(ints.avg(), None);

        let floats = TypedColumn::<f64>::from_bytes(&bytes, 0);
        assert!(floats.is_empty());
        assert_eq!(floats.len(), 0);
        assert!(close(floats.sum(), 0.0));
        assert_eq!(floats.min(), None);
        assert_eq!(floats.max(), None);
        assert_eq!(floats.avg(), None);
    }

    #[test]
    fn test_column_view() {
        let columns = vec![
            column(0, "a", FieldType::Int64),
            column(1, "b", FieldType::Float64),
        ];
        let schema = schema_for(&columns);
        let view = ColumnView::new(&schema, &columns);

        assert_eq!(view.column_count(), 2);
        assert!(view.get_column("a").is_some());
        assert!(view.get_column("c").is_none());
    }

    #[test]
    fn test_simd_groups() {
        let columns = vec![
            column(0, "a", FieldType::Int64),
            column(1, "b", FieldType::Int64),
            column(2, "c", FieldType::Float64),
        ];
        let schema = schema_for(&columns);
        let view = ColumnView::new(&schema, &columns);

        // Two distinct field types, hence two groups.
        let groups = view.simd_groups();
        assert_eq!(groups.len(), 2);
    }

    #[test]
    fn test_column_view_type_checking() {
        let columns = vec![
            column(0, "int_col", FieldType::Int64),
            column(1, "float_col", FieldType::Float64),
        ];
        let schema = schema_for(&columns);
        let view = ColumnView::new(&schema, &columns);

        let int_bytes = i64_to_bytes(&[1, 2, 3]);
        let float_bytes = f64_to_bytes(&[1.0, 2.0, 3.0]);

        // Matching accessor and column type.
        assert!(view.column_i64("int_col", &int_bytes, 3).is_ok());
        assert!(view.column_f64("float_col", &float_bytes, 3).is_ok());

        // Mismatched accessor and column type.
        assert!(view.column_f64("int_col", &int_bytes, 3).is_err());
        assert!(view.column_i64("float_col", &float_bytes, 3).is_err());

        // Unknown column.
        assert!(view.column_i64("nonexistent", &int_bytes, 3).is_err());
    }

    #[test]
    fn test_large_column_performance() {
        let size = 10000;
        let values: Vec<i64> = (0..size).collect();
        let bytes = i64_to_bytes(&values);

        let col = TypedColumn::<i64>::from_bytes(&bytes, size as usize);

        assert_eq!(col.sum(), 49995000);
        assert_eq!(col.min(), Some(0));
        assert_eq!(col.max(), Some(9999));
        assert_eq!(col.len(), 10000);
    }
}