1use std::collections::HashMap;
52use std::sync::Arc;
53
54use crate::soch::{SochRow, SochSchema, SochTable, SochType, SochValue};
55use crate::{Result, SochDBError};
56
/// Logical type of a column in the columnar representation.
///
/// Narrow integer/float widths are distinct logical types, although [`Column::new`]
/// stores them in the widest physical buffer of their family.
#[derive(Debug, Clone, PartialEq)]
pub enum ColumnType {
    /// Boolean values.
    Bool,
    /// Signed 8-bit integers.
    Int8,
    /// Signed 16-bit integers.
    Int16,
    /// Signed 32-bit integers.
    Int32,
    /// Signed 64-bit integers.
    Int64,
    /// Unsigned 8-bit integers.
    UInt8,
    /// Unsigned 16-bit integers.
    UInt16,
    /// Unsigned 32-bit integers.
    UInt32,
    /// Unsigned 64-bit integers.
    UInt64,
    /// 32-bit floats.
    Float32,
    /// 64-bit floats.
    Float64,
    /// Variable-length text.
    String,
    /// Variable-length byte strings.
    Binary,
    /// Named child fields, in declaration order.
    Struct(Vec<(String, Box<ColumnType>)>),
    /// Homogeneous list whose elements have the boxed type.
    List(Box<ColumnType>),
}
78
79impl ColumnType {
80 pub fn from_soch_type(soch_type: &SochType) -> Self {
82 match soch_type {
83 SochType::Bool => ColumnType::Bool,
84 SochType::Int => ColumnType::Int64,
85 SochType::UInt => ColumnType::UInt64,
86 SochType::Float => ColumnType::Float64,
87 SochType::Text => ColumnType::String,
88 SochType::Binary => ColumnType::Binary,
89 SochType::Array(inner) => ColumnType::List(Box::new(Self::from_soch_type(inner))),
90 SochType::Object(fields) => {
91 let struct_fields: Vec<_> = fields
92 .iter()
93 .map(|(name, ty)| (name.clone(), Box::new(Self::from_soch_type(ty))))
94 .collect();
95 ColumnType::Struct(struct_fields)
96 }
97 SochType::Null => ColumnType::Int64, SochType::Ref(_) => ColumnType::String, SochType::Optional(inner) => Self::from_soch_type(inner), }
101 }
102
103 pub fn byte_size(&self) -> Option<usize> {
105 match self {
106 ColumnType::Bool => Some(1),
107 ColumnType::Int8 | ColumnType::UInt8 => Some(1),
108 ColumnType::Int16 | ColumnType::UInt16 => Some(2),
109 ColumnType::Int32 | ColumnType::UInt32 | ColumnType::Float32 => Some(4),
110 ColumnType::Int64 | ColumnType::UInt64 | ColumnType::Float64 => Some(8),
111 _ => None, }
113 }
114}
115
116#[derive(Debug, Clone)]
118pub struct Column {
119 pub name: String,
121 pub dtype: ColumnType,
123 pub data: ColumnData,
125 pub validity: Option<Vec<u8>>,
127 pub len: usize,
129}
130
/// Physical storage backing a [`Column`].
#[derive(Debug, Clone)]
pub enum ColumnData {
    /// Booleans.
    Bool(Vec<bool>),
    /// Every signed integer width.
    Int64(Vec<i64>),
    /// Every unsigned integer width.
    UInt64(Vec<u64>),
    /// Both float widths.
    Float64(Vec<f64>),
    /// Text values.
    String(Vec<String>),
    /// Byte-string values.
    Binary(Vec<Vec<u8>>),
    /// Offset buffer used for `Struct` and `List` columns.
    Offsets(Vec<u32>),
}
143
144impl Column {
145 pub fn new(name: impl Into<String>, dtype: ColumnType) -> Self {
147 let data = match &dtype {
148 ColumnType::Bool => ColumnData::Bool(Vec::new()),
149 ColumnType::Int8 | ColumnType::Int16 | ColumnType::Int32 | ColumnType::Int64 => {
150 ColumnData::Int64(Vec::new())
151 }
152 ColumnType::UInt8 | ColumnType::UInt16 | ColumnType::UInt32 | ColumnType::UInt64 => {
153 ColumnData::UInt64(Vec::new())
154 }
155 ColumnType::Float32 | ColumnType::Float64 => ColumnData::Float64(Vec::new()),
156 ColumnType::String => ColumnData::String(Vec::new()),
157 ColumnType::Binary => ColumnData::Binary(Vec::new()),
158 ColumnType::Struct(_) | ColumnType::List(_) => ColumnData::Offsets(Vec::new()),
159 };
160
161 Self {
162 name: name.into(),
163 dtype,
164 data,
165 validity: None,
166 len: 0,
167 }
168 }
169
170 pub fn push(&mut self, value: &SochValue) {
172 match (&mut self.data, value) {
173 (ColumnData::Bool(v), SochValue::Bool(b)) => v.push(*b),
174 (ColumnData::Int64(v), SochValue::Int(i)) => v.push(*i),
175 (ColumnData::UInt64(v), SochValue::UInt(u)) => v.push(*u),
176 (ColumnData::Float64(v), SochValue::Float(f)) => v.push(*f),
177 (ColumnData::String(v), SochValue::Text(s)) => v.push(s.clone()),
178 (ColumnData::Binary(v), SochValue::Binary(b)) => v.push(b.clone()),
179 (ColumnData::Int64(v), SochValue::Null) => {
180 v.push(0);
181 self.set_null(self.len);
182 }
183 (ColumnData::UInt64(v), SochValue::Null) => {
184 v.push(0);
185 self.set_null(self.len);
186 }
187 (ColumnData::Float64(v), SochValue::Null) => {
188 v.push(0.0);
189 self.set_null(self.len);
190 }
191 (ColumnData::String(v), SochValue::Null) => {
192 v.push(String::new());
193 self.set_null(self.len);
194 }
195 _ => {} }
197 self.len += 1;
198 }
199
200 fn set_null(&mut self, idx: usize) {
202 if self.validity.is_none() {
203 let bytes_needed = (self.len + 8) / 8;
205 self.validity = Some(vec![0xFF; bytes_needed]);
206 }
207
208 if let Some(ref mut bitmap) = self.validity {
209 let byte_idx = idx / 8;
210 let bit_idx = idx % 8;
211
212 while bitmap.len() <= byte_idx {
214 bitmap.push(0xFF);
215 }
216
217 bitmap[byte_idx] &= !(1 << bit_idx);
219 }
220 }
221
222 pub fn is_null(&self, idx: usize) -> bool {
224 match &self.validity {
225 None => false,
226 Some(bitmap) => {
227 let byte_idx = idx / 8;
228 let bit_idx = idx % 8;
229 if byte_idx >= bitmap.len() {
230 false
231 } else {
232 (bitmap[byte_idx] & (1 << bit_idx)) == 0
233 }
234 }
235 }
236 }
237
238 pub fn get(&self, idx: usize) -> Option<SochValue> {
240 if idx >= self.len {
241 return None;
242 }
243
244 if self.is_null(idx) {
245 return Some(SochValue::Null);
246 }
247
248 match &self.data {
249 ColumnData::Bool(v) => v.get(idx).map(|b| SochValue::Bool(*b)),
250 ColumnData::Int64(v) => v.get(idx).map(|i| SochValue::Int(*i)),
251 ColumnData::UInt64(v) => v.get(idx).map(|u| SochValue::UInt(*u)),
252 ColumnData::Float64(v) => v.get(idx).map(|f| SochValue::Float(*f)),
253 ColumnData::String(v) => v.get(idx).map(|s| SochValue::Text(s.clone())),
254 ColumnData::Binary(v) => v.get(idx).map(|b| SochValue::Binary(b.clone())),
255 ColumnData::Offsets(_) => None, }
257 }
258
259 pub fn memory_usage(&self) -> usize {
261 let data_size = match &self.data {
262 ColumnData::Bool(v) => v.len(),
263 ColumnData::Int64(v) => v.len() * 8,
264 ColumnData::UInt64(v) => v.len() * 8,
265 ColumnData::Float64(v) => v.len() * 8,
266 ColumnData::String(v) => v.iter().map(|s| s.len()).sum(),
267 ColumnData::Binary(v) => v.iter().map(|b| b.len()).sum(),
268 ColumnData::Offsets(v) => v.len() * 4,
269 };
270
271 let validity_size = self.validity.as_ref().map_or(0, |v| v.len());
272 data_size + validity_size
273 }
274}
275
276#[derive(Debug, Clone)]
278pub struct ColumnarTable {
279 pub name: String,
281 pub columns: HashMap<String, Column>,
283 pub column_order: Vec<String>,
285 pub row_count: usize,
287}
288
289impl ColumnarTable {
290 pub fn from_schema(schema: &SochSchema) -> Self {
292 let mut columns = HashMap::new();
293 let mut column_order = Vec::new();
294
295 for field in &schema.fields {
296 let dtype = ColumnType::from_soch_type(&field.field_type);
297 let column = Column::new(&field.name, dtype);
298 column_order.push(field.name.clone());
299 columns.insert(field.name.clone(), column);
300 }
301
302 Self {
303 name: schema.name.clone(),
304 columns,
305 column_order,
306 row_count: 0,
307 }
308 }
309
310 pub fn push_row(&mut self, row: &SochRow) {
312 for (i, col_name) in self.column_order.iter().enumerate() {
313 if let Some(column) = self.columns.get_mut(col_name) {
314 if let Some(value) = row.values.get(i) {
315 column.push(value);
316 } else {
317 column.push(&SochValue::Null);
318 }
319 }
320 }
321 self.row_count += 1;
322 }
323
324 pub fn get_row(&self, idx: usize) -> Option<SochRow> {
326 if idx >= self.row_count {
327 return None;
328 }
329
330 let values: Vec<SochValue> = self
331 .column_order
332 .iter()
333 .filter_map(|col_name| self.columns.get(col_name)?.get(idx))
334 .collect();
335
336 Some(SochRow::new(values))
337 }
338
339 pub fn column(&self, name: &str) -> Option<&Column> {
341 self.columns.get(name)
342 }
343
344 pub fn memory_usage(&self) -> usize {
346 self.columns.values().map(|c| c.memory_usage()).sum()
347 }
348}
349
350pub struct SchemaBridge {
352 schema_cache: HashMap<String, Arc<ColumnMapping>>,
354}
355
356#[derive(Debug, Clone)]
358pub struct ColumnMapping {
359 pub source_schema: SochSchema,
361 pub column_types: Vec<(String, ColumnType)>,
363 pub nested_mappings: HashMap<String, Vec<String>>,
365}
366
367impl ColumnMapping {
368 pub fn from_schema(schema: &SochSchema) -> Self {
370 let mut column_types = Vec::new();
371 let mut nested_mappings = HashMap::new();
372
373 for field in &schema.fields {
374 let dtype = ColumnType::from_soch_type(&field.field_type);
375
376 if let ColumnType::Struct(fields) = &dtype {
378 let mut nested_cols = Vec::new();
379 for (nested_name, nested_type) in fields {
380 let full_name = format!("{}.{}", field.name, nested_name);
381 column_types.push((full_name.clone(), (**nested_type).clone()));
382 nested_cols.push(full_name);
383 }
384 nested_mappings.insert(field.name.clone(), nested_cols);
385 } else {
386 column_types.push((field.name.clone(), dtype));
387 }
388 }
389
390 Self {
391 source_schema: schema.clone(),
392 column_types,
393 nested_mappings,
394 }
395 }
396
397 pub fn column_names(&self) -> Vec<&str> {
399 self.column_types.iter().map(|(n, _)| n.as_str()).collect()
400 }
401}
402
403impl SchemaBridge {
404 pub fn new() -> Self {
406 Self {
407 schema_cache: HashMap::new(),
408 }
409 }
410
411 pub fn register_schema(&mut self, schema: &SochSchema) -> Arc<ColumnMapping> {
413 if let Some(existing) = self.schema_cache.get(&schema.name) {
414 return Arc::clone(existing);
415 }
416
417 let mapping = Arc::new(ColumnMapping::from_schema(schema));
418 self.schema_cache
419 .insert(schema.name.clone(), Arc::clone(&mapping));
420 mapping
421 }
422
423 pub fn to_columnar(&self, table: &SochTable) -> Result<ColumnarTable> {
425 let mut columnar = ColumnarTable::from_schema(&table.schema);
426
427 for row in &table.rows {
428 columnar.push_row(row);
429 }
430
431 Ok(columnar)
432 }
433
434 pub fn from_columnar(
436 &self,
437 columnar: &ColumnarTable,
438 schema: &SochSchema,
439 ) -> Result<SochTable> {
440 let mut table = SochTable::new(schema.clone());
441
442 for i in 0..columnar.row_count {
443 if let Some(row) = columnar.get_row(i) {
444 table.push(row);
445 }
446 }
447
448 Ok(table)
449 }
450
451 pub fn project(&self, columnar: &ColumnarTable, columns: &[&str]) -> Result<ColumnarTable> {
453 let mut projected = ColumnarTable {
454 name: columnar.name.clone(),
455 columns: HashMap::new(),
456 column_order: Vec::new(),
457 row_count: columnar.row_count,
458 };
459
460 for col_name in columns {
461 if let Some(column) = columnar.columns.get(*col_name) {
462 projected
463 .columns
464 .insert(col_name.to_string(), column.clone());
465 projected.column_order.push(col_name.to_string());
466 } else {
467 return Err(SochDBError::InvalidArgument(format!(
468 "Column '{}' not found",
469 col_name
470 )));
471 }
472 }
473
474 Ok(projected)
475 }
476
477 pub fn filter<F>(
479 &self,
480 columnar: &ColumnarTable,
481 column: &str,
482 predicate: F,
483 ) -> Result<Vec<usize>>
484 where
485 F: Fn(&SochValue) -> bool,
486 {
487 let col = columnar.columns.get(column).ok_or_else(|| {
488 SochDBError::InvalidArgument(format!("Column '{}' not found", column))
489 })?;
490
491 let mut matching_indices = Vec::new();
492 for i in 0..col.len {
493 if let Some(value) = col.get(i)
494 && predicate(&value)
495 {
496 matching_indices.push(i);
497 }
498 }
499
500 Ok(matching_indices)
501 }
502}
503
504impl Default for SchemaBridge {
505 fn default() -> Self {
506 Self::new()
507 }
508}
509
/// Counters describing columnar-conversion activity.
///
/// All counters start at zero via `Default`. No code in this module updates them;
/// presumably callers maintain them (TODO confirm).
#[derive(Debug, Default)]
pub struct ColumnarStats {
    /// Tables converted.
    pub tables_converted: u64,
    /// Rows processed.
    pub rows_processed: u64,
    /// Columns projected.
    pub columns_projected: u64,
    /// Filters applied.
    pub filters_applied: u64,
    /// Bytes processed.
    pub bytes_processed: u64,
}
519
#[cfg(test)]
mod tests {
    use super::*;

    /// `users(id: UInt, name: Text, age: Int)`, the schema shared by the fixtures below.
    fn users_schema() -> SochSchema {
        SochSchema::new("users")
            .field("id", SochType::UInt)
            .field("name", SochType::Text)
            .field("age", SochType::Int)
    }

    /// Three `users` rows: (1, Alice, 30), (2, Bob, 25), (3, Carol, 35).
    fn users_table() -> SochTable {
        let mut table = SochTable::new(users_schema());
        let people = [(1, "Alice", 30), (2, "Bob", 25), (3, "Carol", 35)];
        for (id, name, age) in people {
            table.push(SochRow::new(vec![
                SochValue::UInt(id),
                SochValue::Text(name.into()),
                SochValue::Int(age),
            ]));
        }
        table
    }

    /// The `users` fixture converted through a fresh bridge.
    fn users_columnar() -> ColumnarTable {
        SchemaBridge::new()
            .to_columnar(&users_table())
            .expect("fixture table converts")
    }

    #[test]
    fn test_column_type_conversion() {
        let cases = [
            (SochType::Int, ColumnType::Int64),
            (SochType::Text, ColumnType::String),
            (SochType::Bool, ColumnType::Bool),
        ];
        for (soch, expected) in cases {
            assert_eq!(ColumnType::from_soch_type(&soch), expected);
        }
    }

    #[test]
    fn test_column_push_and_get() {
        let mut col = Column::new("test", ColumnType::Int64);
        let inputs = [10, 20, 30];
        for n in inputs {
            col.push(&SochValue::Int(n));
        }

        assert_eq!(col.len, inputs.len());
        for (idx, n) in inputs.into_iter().enumerate() {
            assert_eq!(col.get(idx), Some(SochValue::Int(n)));
        }
        assert_eq!(col.get(inputs.len()), None);
    }

    #[test]
    fn test_column_null_handling() {
        let mut col = Column::new("test", ColumnType::Int64);
        let values = [SochValue::Int(10), SochValue::Null, SochValue::Int(30)];
        for value in &values {
            col.push(value);
        }

        for (idx, value) in values.iter().enumerate() {
            assert_eq!(col.is_null(idx), matches!(value, SochValue::Null));
            assert_eq!(col.get(idx).as_ref(), Some(value));
        }
    }

    #[test]
    fn test_columnar_table_from_schema() {
        let columnar = ColumnarTable::from_schema(&users_schema());

        assert_eq!(columnar.name, "users");
        assert_eq!(columnar.columns.len(), 3);
        for key in ["id", "name", "age"] {
            assert!(columnar.columns.contains_key(key));
        }
    }

    #[test]
    fn test_soch_to_columnar_conversion() {
        let columnar = users_columnar();
        assert_eq!(columnar.row_count, 3);

        let ids = columnar.column("id").unwrap();
        for (idx, id) in [1u64, 2, 3].into_iter().enumerate() {
            assert_eq!(ids.get(idx), Some(SochValue::UInt(id)));
        }

        let names = columnar.column("name").unwrap();
        assert_eq!(names.get(0), Some(SochValue::Text("Alice".into())));
    }

    #[test]
    fn test_columnar_to_soch_roundtrip() {
        let original = users_table();
        let bridge = SchemaBridge::new();

        let columnar = bridge.to_columnar(&original).unwrap();
        let restored = bridge.from_columnar(&columnar, &original.schema).unwrap();

        assert_eq!(restored.rows.len(), original.rows.len());
        for (got, want) in restored.rows.iter().zip(&original.rows) {
            assert_eq!(got.values, want.values);
        }
    }

    #[test]
    fn test_column_projection() {
        let projected = SchemaBridge::new()
            .project(&users_columnar(), &["id", "name"])
            .unwrap();

        assert_eq!(projected.columns.len(), 2);
        assert!(projected.columns.contains_key("id"));
        assert!(projected.columns.contains_key("name"));
        assert!(!projected.columns.contains_key("age"));
    }

    #[test]
    fn test_column_filter() {
        let matches = SchemaBridge::new()
            .filter(&users_columnar(), "age", |v| {
                matches!(v, SochValue::Int(age) if *age > 28)
            })
            .unwrap();

        assert_eq!(matches, vec![0, 2]);
    }

    #[test]
    fn test_schema_mapping() {
        let mapping = ColumnMapping::from_schema(&users_schema());

        assert_eq!(mapping.column_types.len(), 3);
        assert_eq!(mapping.column_names(), vec!["id", "name", "age"]);
    }

    #[test]
    fn test_memory_usage() {
        assert!(users_columnar().memory_usage() > 0);
    }

    #[test]
    fn test_get_row() {
        let columnar = users_columnar();

        let first = columnar.get_row(0).unwrap();
        let expected = [
            SochValue::UInt(1),
            SochValue::Text("Alice".into()),
            SochValue::Int(30),
        ];
        for (idx, want) in expected.iter().enumerate() {
            assert_eq!(&first.values[idx], want);
        }

        assert!(columnar.get_row(100).is_none());
    }

    #[test]
    fn test_column_type_byte_size() {
        let cases = [
            (ColumnType::Bool, Some(1)),
            (ColumnType::Int64, Some(8)),
            (ColumnType::Float64, Some(8)),
            (ColumnType::String, None),
        ];
        for (ty, size) in cases {
            assert_eq!(ty.byte_size(), size);
        }
    }

    #[test]
    fn test_schema_bridge_caching() {
        let schema = users_schema();
        let mut bridge = SchemaBridge::new();

        let first = bridge.register_schema(&schema);
        let second = bridge.register_schema(&schema);

        assert!(Arc::ptr_eq(&first, &second));
    }

    #[test]
    fn test_invalid_column_projection() {
        let result = SchemaBridge::new().project(&users_columnar(), &["nonexistent"]);
        assert!(result.is_err());
    }
}