1use std::collections::HashMap;
55use std::sync::Arc;
56
57use crate::soch::{SochRow, SochSchema, SochTable, SochType, SochValue};
58use crate::{Result, SochDBError};
59
/// Physical storage type of a column in the columnar layout.
///
/// Fixed-width scalar variants come first, followed by the variable-width
/// `String`/`Binary` types and the nested `Struct`/`List` types.
#[derive(Debug, Clone, PartialEq)]
pub enum ColumnType {
    /// Boolean value.
    Bool,
    /// Signed 8-bit integer.
    Int8,
    /// Signed 16-bit integer.
    Int16,
    /// Signed 32-bit integer.
    Int32,
    /// Signed 64-bit integer.
    Int64,
    /// Unsigned 8-bit integer.
    UInt8,
    /// Unsigned 16-bit integer.
    UInt16,
    /// Unsigned 32-bit integer.
    UInt32,
    /// Unsigned 64-bit integer.
    UInt64,
    /// 32-bit IEEE-754 float.
    Float32,
    /// 64-bit IEEE-754 float.
    Float64,
    /// Variable-length UTF-8 text.
    String,
    /// Variable-length byte string.
    Binary,
    /// Named child fields, in declaration order, each with its own type.
    Struct(Vec<(String, Box<ColumnType>)>),
    /// List whose elements all have the boxed element type.
    List(Box<ColumnType>),
}
81
82impl ColumnType {
83 pub fn from_soch_type(soch_type: &SochType) -> Self {
85 match soch_type {
86 SochType::Bool => ColumnType::Bool,
87 SochType::Int => ColumnType::Int64,
88 SochType::UInt => ColumnType::UInt64,
89 SochType::Float => ColumnType::Float64,
90 SochType::Text => ColumnType::String,
91 SochType::Binary => ColumnType::Binary,
92 SochType::Array(inner) => ColumnType::List(Box::new(Self::from_soch_type(inner))),
93 SochType::Object(fields) => {
94 let struct_fields: Vec<_> = fields
95 .iter()
96 .map(|(name, ty)| (name.clone(), Box::new(Self::from_soch_type(ty))))
97 .collect();
98 ColumnType::Struct(struct_fields)
99 }
100 SochType::Null => ColumnType::Int64, SochType::Ref(_) => ColumnType::String, SochType::Optional(inner) => Self::from_soch_type(inner), }
104 }
105
106 pub fn byte_size(&self) -> Option<usize> {
108 match self {
109 ColumnType::Bool => Some(1),
110 ColumnType::Int8 | ColumnType::UInt8 => Some(1),
111 ColumnType::Int16 | ColumnType::UInt16 => Some(2),
112 ColumnType::Int32 | ColumnType::UInt32 | ColumnType::Float32 => Some(4),
113 ColumnType::Int64 | ColumnType::UInt64 | ColumnType::Float64 => Some(8),
114 _ => None, }
116 }
117}
118
119#[derive(Debug, Clone)]
121pub struct Column {
122 pub name: String,
124 pub dtype: ColumnType,
126 pub data: ColumnData,
128 pub validity: Option<Vec<u8>>,
130 pub len: usize,
132}
133
/// Typed backing storage for a column.
///
/// Narrower integer and float column types share the widest representation
/// (`Int64`, `UInt64`, `Float64`).
#[derive(Debug, Clone)]
pub enum ColumnData {
    /// One `bool` per row.
    Bool(Vec<bool>),
    /// Storage for all signed integer column types.
    Int64(Vec<i64>),
    /// Storage for all unsigned integer column types.
    UInt64(Vec<u64>),
    /// Storage for `Float32` and `Float64` columns.
    Float64(Vec<f64>),
    /// One owned string per row.
    String(Vec<String>),
    /// One owned byte vector per row.
    Binary(Vec<Vec<u8>>),
    /// Storage allocated for nested (`Struct` / `List`) columns.
    /// NOTE(review): `Column::get` does not materialize values from this variant.
    Offsets(Vec<u32>),
}
146
147impl Column {
148 pub fn new(name: impl Into<String>, dtype: ColumnType) -> Self {
150 let data = match &dtype {
151 ColumnType::Bool => ColumnData::Bool(Vec::new()),
152 ColumnType::Int8 | ColumnType::Int16 | ColumnType::Int32 | ColumnType::Int64 => {
153 ColumnData::Int64(Vec::new())
154 }
155 ColumnType::UInt8 | ColumnType::UInt16 | ColumnType::UInt32 | ColumnType::UInt64 => {
156 ColumnData::UInt64(Vec::new())
157 }
158 ColumnType::Float32 | ColumnType::Float64 => ColumnData::Float64(Vec::new()),
159 ColumnType::String => ColumnData::String(Vec::new()),
160 ColumnType::Binary => ColumnData::Binary(Vec::new()),
161 ColumnType::Struct(_) | ColumnType::List(_) => ColumnData::Offsets(Vec::new()),
162 };
163
164 Self {
165 name: name.into(),
166 dtype,
167 data,
168 validity: None,
169 len: 0,
170 }
171 }
172
173 pub fn push(&mut self, value: &SochValue) {
175 match (&mut self.data, value) {
176 (ColumnData::Bool(v), SochValue::Bool(b)) => v.push(*b),
177 (ColumnData::Int64(v), SochValue::Int(i)) => v.push(*i),
178 (ColumnData::UInt64(v), SochValue::UInt(u)) => v.push(*u),
179 (ColumnData::Float64(v), SochValue::Float(f)) => v.push(*f),
180 (ColumnData::String(v), SochValue::Text(s)) => v.push(s.clone()),
181 (ColumnData::Binary(v), SochValue::Binary(b)) => v.push(b.clone()),
182 (ColumnData::Int64(v), SochValue::Null) => {
183 v.push(0);
184 self.set_null(self.len);
185 }
186 (ColumnData::UInt64(v), SochValue::Null) => {
187 v.push(0);
188 self.set_null(self.len);
189 }
190 (ColumnData::Float64(v), SochValue::Null) => {
191 v.push(0.0);
192 self.set_null(self.len);
193 }
194 (ColumnData::String(v), SochValue::Null) => {
195 v.push(String::new());
196 self.set_null(self.len);
197 }
198 _ => {} }
200 self.len += 1;
201 }
202
203 fn set_null(&mut self, idx: usize) {
205 if self.validity.is_none() {
206 let bytes_needed = (self.len + 8) / 8;
208 self.validity = Some(vec![0xFF; bytes_needed]);
209 }
210
211 if let Some(ref mut bitmap) = self.validity {
212 let byte_idx = idx / 8;
213 let bit_idx = idx % 8;
214
215 while bitmap.len() <= byte_idx {
217 bitmap.push(0xFF);
218 }
219
220 bitmap[byte_idx] &= !(1 << bit_idx);
222 }
223 }
224
225 pub fn is_null(&self, idx: usize) -> bool {
227 match &self.validity {
228 None => false,
229 Some(bitmap) => {
230 let byte_idx = idx / 8;
231 let bit_idx = idx % 8;
232 if byte_idx >= bitmap.len() {
233 false
234 } else {
235 (bitmap[byte_idx] & (1 << bit_idx)) == 0
236 }
237 }
238 }
239 }
240
241 pub fn get(&self, idx: usize) -> Option<SochValue> {
243 if idx >= self.len {
244 return None;
245 }
246
247 if self.is_null(idx) {
248 return Some(SochValue::Null);
249 }
250
251 match &self.data {
252 ColumnData::Bool(v) => v.get(idx).map(|b| SochValue::Bool(*b)),
253 ColumnData::Int64(v) => v.get(idx).map(|i| SochValue::Int(*i)),
254 ColumnData::UInt64(v) => v.get(idx).map(|u| SochValue::UInt(*u)),
255 ColumnData::Float64(v) => v.get(idx).map(|f| SochValue::Float(*f)),
256 ColumnData::String(v) => v.get(idx).map(|s| SochValue::Text(s.clone())),
257 ColumnData::Binary(v) => v.get(idx).map(|b| SochValue::Binary(b.clone())),
258 ColumnData::Offsets(_) => None, }
260 }
261
262 pub fn memory_usage(&self) -> usize {
264 let data_size = match &self.data {
265 ColumnData::Bool(v) => v.len(),
266 ColumnData::Int64(v) => v.len() * 8,
267 ColumnData::UInt64(v) => v.len() * 8,
268 ColumnData::Float64(v) => v.len() * 8,
269 ColumnData::String(v) => v.iter().map(|s| s.len()).sum(),
270 ColumnData::Binary(v) => v.iter().map(|b| b.len()).sum(),
271 ColumnData::Offsets(v) => v.len() * 4,
272 };
273
274 let validity_size = self.validity.as_ref().map_or(0, |v| v.len());
275 data_size + validity_size
276 }
277}
278
279#[derive(Debug, Clone)]
281pub struct ColumnarTable {
282 pub name: String,
284 pub columns: HashMap<String, Column>,
286 pub column_order: Vec<String>,
288 pub row_count: usize,
290}
291
292impl ColumnarTable {
293 pub fn from_schema(schema: &SochSchema) -> Self {
295 let mut columns = HashMap::new();
296 let mut column_order = Vec::new();
297
298 for field in &schema.fields {
299 let dtype = ColumnType::from_soch_type(&field.field_type);
300 let column = Column::new(&field.name, dtype);
301 column_order.push(field.name.clone());
302 columns.insert(field.name.clone(), column);
303 }
304
305 Self {
306 name: schema.name.clone(),
307 columns,
308 column_order,
309 row_count: 0,
310 }
311 }
312
313 pub fn push_row(&mut self, row: &SochRow) {
315 for (i, col_name) in self.column_order.iter().enumerate() {
316 if let Some(column) = self.columns.get_mut(col_name) {
317 if let Some(value) = row.values.get(i) {
318 column.push(value);
319 } else {
320 column.push(&SochValue::Null);
321 }
322 }
323 }
324 self.row_count += 1;
325 }
326
327 pub fn get_row(&self, idx: usize) -> Option<SochRow> {
329 if idx >= self.row_count {
330 return None;
331 }
332
333 let values: Vec<SochValue> = self
334 .column_order
335 .iter()
336 .filter_map(|col_name| self.columns.get(col_name)?.get(idx))
337 .collect();
338
339 Some(SochRow::new(values))
340 }
341
342 pub fn column(&self, name: &str) -> Option<&Column> {
344 self.columns.get(name)
345 }
346
347 pub fn memory_usage(&self) -> usize {
349 self.columns.values().map(|c| c.memory_usage()).sum()
350 }
351}
352
353pub struct SchemaBridge {
355 schema_cache: HashMap<String, Arc<ColumnMapping>>,
357}
358
359#[derive(Debug, Clone)]
361pub struct ColumnMapping {
362 pub source_schema: SochSchema,
364 pub column_types: Vec<(String, ColumnType)>,
366 pub nested_mappings: HashMap<String, Vec<String>>,
368}
369
370impl ColumnMapping {
371 pub fn from_schema(schema: &SochSchema) -> Self {
373 let mut column_types = Vec::new();
374 let mut nested_mappings = HashMap::new();
375
376 for field in &schema.fields {
377 let dtype = ColumnType::from_soch_type(&field.field_type);
378
379 if let ColumnType::Struct(fields) = &dtype {
381 let mut nested_cols = Vec::new();
382 for (nested_name, nested_type) in fields {
383 let full_name = format!("{}.{}", field.name, nested_name);
384 column_types.push((full_name.clone(), (**nested_type).clone()));
385 nested_cols.push(full_name);
386 }
387 nested_mappings.insert(field.name.clone(), nested_cols);
388 } else {
389 column_types.push((field.name.clone(), dtype));
390 }
391 }
392
393 Self {
394 source_schema: schema.clone(),
395 column_types,
396 nested_mappings,
397 }
398 }
399
400 pub fn column_names(&self) -> Vec<&str> {
402 self.column_types.iter().map(|(n, _)| n.as_str()).collect()
403 }
404}
405
406impl SchemaBridge {
407 pub fn new() -> Self {
409 Self {
410 schema_cache: HashMap::new(),
411 }
412 }
413
414 pub fn register_schema(&mut self, schema: &SochSchema) -> Arc<ColumnMapping> {
416 if let Some(existing) = self.schema_cache.get(&schema.name) {
417 return Arc::clone(existing);
418 }
419
420 let mapping = Arc::new(ColumnMapping::from_schema(schema));
421 self.schema_cache
422 .insert(schema.name.clone(), Arc::clone(&mapping));
423 mapping
424 }
425
426 pub fn to_columnar(&self, table: &SochTable) -> Result<ColumnarTable> {
428 let mut columnar = ColumnarTable::from_schema(&table.schema);
429
430 for row in &table.rows {
431 columnar.push_row(row);
432 }
433
434 Ok(columnar)
435 }
436
437 pub fn from_columnar(
439 &self,
440 columnar: &ColumnarTable,
441 schema: &SochSchema,
442 ) -> Result<SochTable> {
443 let mut table = SochTable::new(schema.clone());
444
445 for i in 0..columnar.row_count {
446 if let Some(row) = columnar.get_row(i) {
447 table.push(row);
448 }
449 }
450
451 Ok(table)
452 }
453
454 pub fn project(&self, columnar: &ColumnarTable, columns: &[&str]) -> Result<ColumnarTable> {
456 let mut projected = ColumnarTable {
457 name: columnar.name.clone(),
458 columns: HashMap::new(),
459 column_order: Vec::new(),
460 row_count: columnar.row_count,
461 };
462
463 for col_name in columns {
464 if let Some(column) = columnar.columns.get(*col_name) {
465 projected
466 .columns
467 .insert(col_name.to_string(), column.clone());
468 projected.column_order.push(col_name.to_string());
469 } else {
470 return Err(SochDBError::InvalidArgument(format!(
471 "Column '{}' not found",
472 col_name
473 )));
474 }
475 }
476
477 Ok(projected)
478 }
479
480 pub fn filter<F>(
482 &self,
483 columnar: &ColumnarTable,
484 column: &str,
485 predicate: F,
486 ) -> Result<Vec<usize>>
487 where
488 F: Fn(&SochValue) -> bool,
489 {
490 let col = columnar.columns.get(column).ok_or_else(|| {
491 SochDBError::InvalidArgument(format!("Column '{}' not found", column))
492 })?;
493
494 let mut matching_indices = Vec::new();
495 for i in 0..col.len {
496 if let Some(value) = col.get(i)
497 && predicate(&value)
498 {
499 matching_indices.push(i);
500 }
501 }
502
503 Ok(matching_indices)
504 }
505}
506
507impl Default for SchemaBridge {
508 fn default() -> Self {
509 Self::new()
510 }
511}
512
/// Counters describing columnar conversion work. All fields start at zero.
///
/// NOTE(review): nothing in this module updates these counters; callers are
/// presumably expected to maintain them — confirm.
#[derive(Debug, Default)]
pub struct ColumnarStats {
    /// Number of tables converted.
    pub tables_converted: u64,
    /// Number of rows processed.
    pub rows_processed: u64,
    /// Number of columns projected.
    pub columns_projected: u64,
    /// Number of filters applied.
    pub filters_applied: u64,
    /// Number of bytes processed.
    pub bytes_processed: u64,
}
522
// Unit tests for column storage, table conversion, projection, filtering and schema mapping.
#[cfg(test)]
mod tests {
    use super::*;

    /// Schema `users(id: UInt, name: Text, age: Int)` shared by most tests.
    fn create_test_schema() -> SochSchema {
        SochSchema::new("users")
            .field("id", SochType::UInt)
            .field("name", SochType::Text)
            .field("age", SochType::Int)
    }

    /// Three-row `users` table: Alice (30), Bob (25), Carol (35).
    fn create_test_table() -> SochTable {
        let schema = create_test_schema();
        let mut table = SochTable::new(schema);

        table.push(SochRow::new(vec![
            SochValue::UInt(1),
            SochValue::Text("Alice".into()),
            SochValue::Int(30),
        ]));
        table.push(SochRow::new(vec![
            SochValue::UInt(2),
            SochValue::Text("Bob".into()),
            SochValue::Int(25),
        ]));
        table.push(SochRow::new(vec![
            SochValue::UInt(3),
            SochValue::Text("Carol".into()),
            SochValue::Int(35),
        ]));

        table
    }

    // Logical scalar types map to their physical storage types.
    #[test]
    fn test_column_type_conversion() {
        assert_eq!(
            ColumnType::from_soch_type(&SochType::Int),
            ColumnType::Int64
        );
        assert_eq!(
            ColumnType::from_soch_type(&SochType::Text),
            ColumnType::String
        );
        assert_eq!(
            ColumnType::from_soch_type(&SochType::Bool),
            ColumnType::Bool
        );
    }

    // Values read back in push order; reads past the end return None.
    #[test]
    fn test_column_push_and_get() {
        let mut col = Column::new("test", ColumnType::Int64);

        col.push(&SochValue::Int(10));
        col.push(&SochValue::Int(20));
        col.push(&SochValue::Int(30));

        assert_eq!(col.len, 3);
        assert_eq!(col.get(0), Some(SochValue::Int(10)));
        assert_eq!(col.get(1), Some(SochValue::Int(20)));
        assert_eq!(col.get(2), Some(SochValue::Int(30)));
        assert_eq!(col.get(3), None);
    }

    // A NULL in the middle is tracked by the validity bitmap without disturbing neighbours.
    #[test]
    fn test_column_null_handling() {
        let mut col = Column::new("test", ColumnType::Int64);

        col.push(&SochValue::Int(10));
        col.push(&SochValue::Null);
        col.push(&SochValue::Int(30));

        assert!(!col.is_null(0));
        assert!(col.is_null(1));
        assert!(!col.is_null(2));

        assert_eq!(col.get(0), Some(SochValue::Int(10)));
        assert_eq!(col.get(1), Some(SochValue::Null));
        assert_eq!(col.get(2), Some(SochValue::Int(30)));
    }

    // One column is created per schema field.
    #[test]
    fn test_columnar_table_from_schema() {
        let schema = create_test_schema();
        let columnar = ColumnarTable::from_schema(&schema);

        assert_eq!(columnar.name, "users");
        assert_eq!(columnar.columns.len(), 3);
        assert!(columnar.columns.contains_key("id"));
        assert!(columnar.columns.contains_key("name"));
        assert!(columnar.columns.contains_key("age"));
    }

    // Row values land in the right columns at the right indices.
    #[test]
    fn test_soch_to_columnar_conversion() {
        let table = create_test_table();
        let bridge = SchemaBridge::new();

        let columnar = bridge.to_columnar(&table).unwrap();

        assert_eq!(columnar.row_count, 3);

        let id_col = columnar.column("id").unwrap();
        assert_eq!(id_col.get(0), Some(SochValue::UInt(1)));
        assert_eq!(id_col.get(1), Some(SochValue::UInt(2)));
        assert_eq!(id_col.get(2), Some(SochValue::UInt(3)));

        let name_col = columnar.column("name").unwrap();
        assert_eq!(name_col.get(0), Some(SochValue::Text("Alice".into())));
    }

    // Row -> columnar -> row reproduces the original values exactly.
    #[test]
    fn test_columnar_to_soch_roundtrip() {
        let original = create_test_table();
        let bridge = SchemaBridge::new();

        let columnar = bridge.to_columnar(&original).unwrap();
        let restored = bridge.from_columnar(&columnar, &original.schema).unwrap();

        assert_eq!(restored.rows.len(), original.rows.len());

        for (i, row) in restored.rows.iter().enumerate() {
            assert_eq!(row.values, original.rows[i].values);
        }
    }

    // Projection keeps only the requested columns.
    #[test]
    fn test_column_projection() {
        let table = create_test_table();
        let bridge = SchemaBridge::new();

        let columnar = bridge.to_columnar(&table).unwrap();
        let projected = bridge.project(&columnar, &["id", "name"]).unwrap();

        assert_eq!(projected.columns.len(), 2);
        assert!(projected.columns.contains_key("id"));
        assert!(projected.columns.contains_key("name"));
        assert!(!projected.columns.contains_key("age"));
    }

    // Filter returns the indices of rows whose `age` exceeds 28.
    #[test]
    fn test_column_filter() {
        let table = create_test_table();
        let bridge = SchemaBridge::new();

        let columnar = bridge.to_columnar(&table).unwrap();

        let matches = bridge
            .filter(&columnar, "age", |v| match v {
                SochValue::Int(age) => *age > 28,
                _ => false,
            })
            .unwrap();

        // Alice (30) and Carol (35).
        assert_eq!(matches, vec![0, 2]);
    }

    // A flat schema maps one-to-one onto columns, preserving field order.
    #[test]
    fn test_schema_mapping() {
        let schema = create_test_schema();
        let mapping = ColumnMapping::from_schema(&schema);

        assert_eq!(mapping.column_types.len(), 3);
        assert_eq!(mapping.column_names(), vec!["id", "name", "age"]);
    }

    // Smoke test: a non-empty table reports non-zero memory usage.
    #[test]
    fn test_memory_usage() {
        let table = create_test_table();
        let bridge = SchemaBridge::new();

        let columnar = bridge.to_columnar(&table).unwrap();
        let usage = columnar.memory_usage();

        assert!(usage > 0);
    }

    // Row reassembly follows schema column order; out-of-range rows return None.
    #[test]
    fn test_get_row() {
        let table = create_test_table();
        let bridge = SchemaBridge::new();

        let columnar = bridge.to_columnar(&table).unwrap();

        let row0 = columnar.get_row(0).unwrap();
        assert_eq!(row0.values[0], SochValue::UInt(1));
        assert_eq!(row0.values[1], SochValue::Text("Alice".into()));
        assert_eq!(row0.values[2], SochValue::Int(30));

        assert!(columnar.get_row(100).is_none());
    }

    // Fixed-width types report their size; variable-width types report None.
    #[test]
    fn test_column_type_byte_size() {
        assert_eq!(ColumnType::Bool.byte_size(), Some(1));
        assert_eq!(ColumnType::Int64.byte_size(), Some(8));
        assert_eq!(ColumnType::Float64.byte_size(), Some(8));
        assert_eq!(ColumnType::String.byte_size(), None);
    }

    // Registering the same schema twice returns the same cached Arc.
    #[test]
    fn test_schema_bridge_caching() {
        let schema = create_test_schema();
        let mut bridge = SchemaBridge::new();

        let mapping1 = bridge.register_schema(&schema);
        let mapping2 = bridge.register_schema(&schema);

        assert!(Arc::ptr_eq(&mapping1, &mapping2));
    }

    // Projecting an unknown column is an error, not an empty result.
    #[test]
    fn test_invalid_column_projection() {
        let table = create_test_table();
        let bridge = SchemaBridge::new();

        let columnar = bridge.to_columnar(&table).unwrap();
        let result = bridge.project(&columnar, &["nonexistent"]);

        assert!(result.is_err());
    }
}