1use crate::error::{IoError, Result};
9
10use super::types::{Column, ColumnData, ColumnarTable};
11
/// Summary statistics describing the values of a single column.
///
/// Numeric aggregates are carried as `f64` regardless of the column's
/// physical type. A field is `None` when that statistic is not recorded for
/// the column (for example `min`/`max`/`sum` on a string column).
#[derive(Debug, Clone, PartialEq)]
pub struct ColumnStats {
    /// Name of the column these statistics describe.
    pub name: String,
    /// Number of values in the column.
    pub count: usize,
    /// Number of null values; `ColumnStats::from_column` always records 0.
    pub null_count: usize,
    /// Smallest value, converted to `f64`, when one is recorded.
    pub min: Option<f64>,
    /// Largest value, converted to `f64`, when one is recorded.
    pub max: Option<f64>,
    /// Sum of the values, converted to `f64`, when one is recorded.
    pub sum: Option<f64>,
    /// Number of distinct values, when it is tracked for the column type.
    pub distinct_count: Option<usize>,
}
34
35impl ColumnStats {
36 pub fn from_column(col: &Column) -> Self {
38 let count = col.len();
39 let null_count = 0; match &col.data {
42 ColumnData::Float64(v) => {
43 let (min, max, sum) = if v.is_empty() {
44 (None, None, None)
45 } else {
46 let mut mn = f64::INFINITY;
47 let mut mx = f64::NEG_INFINITY;
48 let mut s = 0.0;
49 for &val in v {
50 if val < mn {
51 mn = val;
52 }
53 if val > mx {
54 mx = val;
55 }
56 s += val;
57 }
58 (Some(mn), Some(mx), Some(s))
59 };
60 ColumnStats {
61 name: col.name.clone(),
62 count,
63 null_count,
64 min,
65 max,
66 sum,
67 distinct_count: None,
68 }
69 }
70 ColumnData::Int64(v) => {
71 let (min, max, sum) = if v.is_empty() {
72 (None, None, None)
73 } else {
74 let mut mn = i64::MAX;
75 let mut mx = i64::MIN;
76 let mut s: i64 = 0;
77 for &val in v {
78 if val < mn {
79 mn = val;
80 }
81 if val > mx {
82 mx = val;
83 }
84 s = s.wrapping_add(val);
85 }
86 (Some(mn as f64), Some(mx as f64), Some(s as f64))
87 };
88 ColumnStats {
89 name: col.name.clone(),
90 count,
91 null_count,
92 min,
93 max,
94 sum,
95 distinct_count: None,
96 }
97 }
98 ColumnData::Str(v) => {
99 let distinct = {
100 let mut set = std::collections::HashSet::new();
101 for s in v {
102 set.insert(s.as_str());
103 }
104 set.len()
105 };
106 ColumnStats {
107 name: col.name.clone(),
108 count,
109 null_count,
110 min: None,
111 max: None,
112 sum: None,
113 distinct_count: Some(distinct),
114 }
115 }
116 ColumnData::Bool(v) => {
117 let true_count = v.iter().filter(|&&b| b).count();
118 ColumnStats {
119 name: col.name.clone(),
120 count,
121 null_count,
122 min: Some(0.0),
123 max: Some(1.0),
124 sum: Some(true_count as f64),
125 distinct_count: Some(if v.is_empty() {
126 0
127 } else if true_count == 0 || true_count == count {
128 1
129 } else {
130 2
131 }),
132 }
133 }
134 }
135 }
136
137 pub fn could_contain_value(&self, value: f64) -> bool {
139 match (self.min, self.max) {
140 (Some(mn), Some(mx)) => value >= mn && value <= mx,
141 _ => true, }
143 }
144}
145
146#[derive(Debug, Clone)]
148pub struct TableStats {
149 pub columns: Vec<ColumnStats>,
151 pub num_rows: usize,
153}
154
155impl TableStats {
156 pub fn from_table(table: &ColumnarTable) -> Self {
158 let columns = table
159 .columns()
160 .iter()
161 .map(|col| ColumnStats::from_column(col))
162 .collect();
163 TableStats {
164 columns,
165 num_rows: table.num_rows(),
166 }
167 }
168
169 pub fn column_stats(&self, name: &str) -> Option<&ColumnStats> {
171 self.columns.iter().find(|cs| cs.name == name)
172 }
173}
174
175#[derive(Debug, Clone)]
182pub struct RowGroup {
183 pub start_row: usize,
185 pub num_rows: usize,
187 pub stats: Vec<ColumnStats>,
189}
190
/// Settings that control how a table is partitioned into row groups.
#[derive(Debug, Clone)]
pub struct RowGroupConfig {
    /// Upper bound on the number of rows placed in a single row group.
    pub max_rows_per_group: usize,
}

impl Default for RowGroupConfig {
    /// Uses a limit of 65 536 rows per group.
    fn default() -> Self {
        const DEFAULT_MAX_ROWS_PER_GROUP: usize = 1 << 16;
        Self {
            max_rows_per_group: DEFAULT_MAX_ROWS_PER_GROUP,
        }
    }
}
205
206pub fn split_into_row_groups(
208 table: &ColumnarTable,
209 config: &RowGroupConfig,
210) -> Result<Vec<RowGroup>> {
211 let total_rows = table.num_rows();
212 if total_rows == 0 {
213 return Ok(Vec::new());
214 }
215
216 let max_per = config.max_rows_per_group.max(1);
217 let num_groups = (total_rows + max_per - 1) / max_per;
218 let mut groups = Vec::with_capacity(num_groups);
219
220 for g in 0..num_groups {
221 let start = g * max_per;
222 let end = (start + max_per).min(total_rows);
223 let group_rows = end - start;
224
225 let stats: Vec<ColumnStats> = table
227 .columns()
228 .iter()
229 .map(|col| {
230 let slice_col = slice_column(col, start, end);
231 ColumnStats::from_column(&slice_col)
232 })
233 .collect();
234
235 groups.push(RowGroup {
236 start_row: start,
237 num_rows: group_rows,
238 stats,
239 });
240 }
241
242 Ok(groups)
243}
244
245fn slice_column(col: &Column, start: usize, end: usize) -> Column {
247 let data = match &col.data {
248 ColumnData::Float64(v) => ColumnData::Float64(v[start..end].to_vec()),
249 ColumnData::Int64(v) => ColumnData::Int64(v[start..end].to_vec()),
250 ColumnData::Str(v) => ColumnData::Str(v[start..end].to_vec()),
251 ColumnData::Bool(v) => ColumnData::Bool(v[start..end].to_vec()),
252 };
253 Column {
254 name: col.name.clone(),
255 data,
256 }
257}
258
259pub fn extract_row_group(table: &ColumnarTable, group: &RowGroup) -> Result<ColumnarTable> {
261 let start = group.start_row;
262 let end = start + group.num_rows;
263
264 let columns: Vec<Column> = table
265 .columns()
266 .iter()
267 .map(|col| slice_column(col, start, end))
268 .collect();
269
270 ColumnarTable::from_columns(columns)
271}
272
/// A filter condition over the named columns of a table.
///
/// In every variant the first `String` is the name of the column the
/// condition applies to.
#[derive(Debug, Clone)]
pub enum Predicate {
    /// The float column equals the value; `evaluate` compares with an
    /// absolute tolerance of `f64::EPSILON`.
    FloatEquals(String, f64),
    /// The float column lies in the inclusive range `[low, high]`.
    FloatRange(String, f64, f64),
    /// The integer column equals the value.
    IntEquals(String, i64),
    /// The integer column lies in the inclusive range `[low, high]`.
    IntRange(String, i64, i64),
    /// The string column equals the value.
    StrEquals(String, String),
    /// The boolean column equals the value.
    BoolEquals(String, bool),
    /// All of the nested predicates hold.
    And(Vec<Predicate>),
    /// At least one of the nested predicates holds.
    Or(Vec<Predicate>),
}
297
298impl Predicate {
299 pub fn could_match_row_group(&self, group: &RowGroup) -> bool {
303 match self {
304 Predicate::FloatEquals(col_name, val) => {
305 if let Some(stats) = group.stats.iter().find(|s| s.name == *col_name) {
306 stats.could_contain_value(*val)
307 } else {
308 true }
310 }
311 Predicate::FloatRange(col_name, lo, hi) => {
312 if let Some(stats) = group.stats.iter().find(|s| s.name == *col_name) {
313 match (stats.min, stats.max) {
314 (Some(mn), Some(mx)) => mx >= *lo && mn <= *hi,
315 _ => true,
316 }
317 } else {
318 true
319 }
320 }
321 Predicate::IntEquals(col_name, val) => {
322 if let Some(stats) = group.stats.iter().find(|s| s.name == *col_name) {
323 stats.could_contain_value(*val as f64)
324 } else {
325 true
326 }
327 }
328 Predicate::IntRange(col_name, lo, hi) => {
329 if let Some(stats) = group.stats.iter().find(|s| s.name == *col_name) {
330 match (stats.min, stats.max) {
331 (Some(mn), Some(mx)) => mx >= *lo as f64 && mn <= *hi as f64,
332 _ => true,
333 }
334 } else {
335 true
336 }
337 }
338 Predicate::StrEquals(_col_name, _val) => {
339 true
341 }
342 Predicate::BoolEquals(col_name, val) => {
343 if let Some(stats) = group.stats.iter().find(|s| s.name == *col_name) {
344 if let Some(sum) = stats.sum {
345 if *val {
346 sum > 0.0 } else {
348 sum < stats.count as f64 }
350 } else {
351 true
352 }
353 } else {
354 true
355 }
356 }
357 Predicate::And(preds) => preds.iter().all(|p| p.could_match_row_group(group)),
358 Predicate::Or(preds) => preds.iter().any(|p| p.could_match_row_group(group)),
359 }
360 }
361
362 pub fn evaluate(&self, table: &ColumnarTable) -> Result<Vec<bool>> {
365 let n = table.num_rows();
366 match self {
367 Predicate::FloatEquals(col_name, val) => {
368 let data = table.get_f64(col_name)?;
369 Ok(data
370 .iter()
371 .map(|&v| (v - val).abs() < f64::EPSILON)
372 .collect())
373 }
374 Predicate::FloatRange(col_name, lo, hi) => {
375 let data = table.get_f64(col_name)?;
376 Ok(data.iter().map(|&v| v >= *lo && v <= *hi).collect())
377 }
378 Predicate::IntEquals(col_name, val) => {
379 let data = table.get_i64(col_name)?;
380 Ok(data.iter().map(|&v| v == *val).collect())
381 }
382 Predicate::IntRange(col_name, lo, hi) => {
383 let data = table.get_i64(col_name)?;
384 Ok(data.iter().map(|&v| v >= *lo && v <= *hi).collect())
385 }
386 Predicate::StrEquals(col_name, val) => {
387 let data = table.get_str(col_name)?;
388 Ok(data.iter().map(|v| v == val).collect())
389 }
390 Predicate::BoolEquals(col_name, val) => {
391 let data = table.get_bool(col_name)?;
392 Ok(data.iter().map(|&v| v == *val).collect())
393 }
394 Predicate::And(preds) => {
395 let mut result = vec![true; n];
396 for p in preds {
397 let mask = p.evaluate(table)?;
398 for (r, m) in result.iter_mut().zip(mask.iter()) {
399 *r = *r && *m;
400 }
401 }
402 Ok(result)
403 }
404 Predicate::Or(preds) => {
405 let mut result = vec![false; n];
406 for p in preds {
407 let mask = p.evaluate(table)?;
408 for (r, m) in result.iter_mut().zip(mask.iter()) {
409 *r = *r || *m;
410 }
411 }
412 Ok(result)
413 }
414 }
415 }
416}
417
418pub fn read_columnar_with_columns<P: AsRef<std::path::Path>>(
420 path: P,
421 columns: &[&str],
422) -> Result<ColumnarTable> {
423 let full = super::reader::read_columnar(path)?;
424 select_columns(&full, columns)
425}
426
427pub fn select_columns(table: &ColumnarTable, columns: &[&str]) -> Result<ColumnarTable> {
429 let mut selected = Vec::with_capacity(columns.len());
430 for &name in columns {
431 let col = table.column(name)?;
432 selected.push(col.clone());
433 }
434 ColumnarTable::from_columns(selected)
435}
436
437pub fn filter_table(table: &ColumnarTable, predicate: &Predicate) -> Result<ColumnarTable> {
439 let mask = predicate.evaluate(table)?;
440 let mut columns = Vec::with_capacity(table.num_columns());
441
442 for col in table.columns() {
443 let filtered_data = match &col.data {
444 ColumnData::Float64(v) => {
445 let filtered: Vec<f64> = v
446 .iter()
447 .zip(mask.iter())
448 .filter(|(_, &m)| m)
449 .map(|(&val, _)| val)
450 .collect();
451 ColumnData::Float64(filtered)
452 }
453 ColumnData::Int64(v) => {
454 let filtered: Vec<i64> = v
455 .iter()
456 .zip(mask.iter())
457 .filter(|(_, &m)| m)
458 .map(|(&val, _)| val)
459 .collect();
460 ColumnData::Int64(filtered)
461 }
462 ColumnData::Str(v) => {
463 let filtered: Vec<String> = v
464 .iter()
465 .zip(mask.iter())
466 .filter(|(_, &m)| m)
467 .map(|(val, _)| val.clone())
468 .collect();
469 ColumnData::Str(filtered)
470 }
471 ColumnData::Bool(v) => {
472 let filtered: Vec<bool> = v
473 .iter()
474 .zip(mask.iter())
475 .filter(|(_, &m)| m)
476 .map(|(&val, _)| val)
477 .collect();
478 ColumnData::Bool(filtered)
479 }
480 };
481 columns.push(Column {
482 name: col.name.clone(),
483 data: filtered_data,
484 });
485 }
486
487 ColumnarTable::from_columns(columns)
488}
489
#[cfg(test)]
mod tests {
    use super::*;

    /// Absolute tolerance used when comparing floating-point results.
    const TOLERANCE: f64 = 1e-10;

    /// Returns `true` when `actual` lies within [`TOLERANCE`] of `expected`.
    fn close(actual: f64, expected: f64) -> bool {
        (actual - expected).abs() < TOLERANCE
    }

    /// Builds the shared five-row fixture with one column of each type.
    fn make_test_table() -> ColumnarTable {
        let cities: Vec<String> = ["Tokyo", "Osaka", "Tokyo", "Kyoto", "Osaka"]
            .iter()
            .map(|city| city.to_string())
            .collect();
        let columns = vec![
            Column::float64("temp", vec![20.0, 22.5, 18.0, 25.0, 19.5]),
            Column::int64("id", vec![1, 2, 3, 4, 5]),
            Column::string("city", cities),
            Column::boolean("active", vec![true, true, false, true, false]),
        ];
        ColumnarTable::from_columns(columns).expect("table creation failed")
    }

    /// Splits `table` into row groups of at most `max_rows_per_group` rows.
    fn split_with_limit(table: &ColumnarTable, max_rows_per_group: usize) -> Vec<RowGroup> {
        let config = RowGroupConfig { max_rows_per_group };
        split_into_row_groups(table, &config).expect("split failed")
    }

    #[test]
    fn test_column_stats_float64() {
        let col = Column::float64("temp", vec![20.0, 22.5, 18.0, 25.0, 19.5]);
        let stats = ColumnStats::from_column(&col);
        assert_eq!(stats.count, 5);
        assert_eq!(stats.null_count, 0);
        assert!(close(stats.min.expect("no min"), 18.0));
        assert!(close(stats.max.expect("no max"), 25.0));
        assert!(close(stats.sum.expect("no sum"), 105.0));
    }

    #[test]
    fn test_column_stats_int64() {
        let col = Column::int64("id", vec![1, 2, 3, 4, 5]);
        let stats = ColumnStats::from_column(&col);
        assert_eq!(stats.count, 5);
        assert!(close(stats.min.expect("no min"), 1.0));
        assert!(close(stats.max.expect("no max"), 5.0));
        assert!(close(stats.sum.expect("no sum"), 15.0));
    }

    #[test]
    fn test_column_stats_string() {
        let values: Vec<String> = ["a", "b", "a", "c"].iter().map(|s| s.to_string()).collect();
        let stats = ColumnStats::from_column(&Column::string("city", values));
        assert_eq!(stats.count, 4);
        assert!(stats.min.is_none());
        assert!(stats.max.is_none());
        assert_eq!(stats.distinct_count, Some(3));
    }

    #[test]
    fn test_column_stats_bool() {
        let col = Column::boolean("flags", vec![true, false, true, true, false]);
        let stats = ColumnStats::from_column(&col);
        assert_eq!(stats.count, 5);
        assert_eq!(stats.distinct_count, Some(2));
        // The sum of a boolean column counts its `true` entries.
        assert!(close(stats.sum.expect("no sum"), 3.0));
    }

    #[test]
    fn test_table_stats() {
        let stats = TableStats::from_table(&make_test_table());
        assert_eq!(stats.num_rows, 5);
        assert_eq!(stats.columns.len(), 4);

        let temp_stats = stats.column_stats("temp").expect("temp stats missing");
        assert!(close(temp_stats.min.expect("no min"), 18.0));
    }

    #[test]
    fn test_row_group_split() {
        let groups = split_with_limit(&make_test_table(), 2);

        // Five rows in groups of two: 2 + 2 + 1.
        let layout: Vec<(usize, usize)> = groups
            .iter()
            .map(|group| (group.start_row, group.num_rows))
            .collect();
        assert_eq!(layout, vec![(0, 2), (2, 2), (4, 1)]);
    }

    #[test]
    fn test_row_group_stats() {
        let groups = split_with_limit(&make_test_table(), 3);

        // The first group covers the temperatures 20.0, 22.5 and 18.0.
        let g0_temp = groups[0]
            .stats
            .iter()
            .find(|s| s.name == "temp")
            .expect("temp stats");
        assert!(close(g0_temp.min.expect("no min"), 18.0));
        assert!(close(g0_temp.max.expect("no max"), 22.5));
    }

    #[test]
    fn test_extract_row_group() {
        let table = make_test_table();
        let groups = split_with_limit(&table, 2);

        let sub = extract_row_group(&table, &groups[1]).expect("extract failed");
        assert_eq!(sub.num_rows(), 2);
        let ids = sub.get_i64("id").expect("get id failed");
        assert_eq!(ids, &[3, 4]);
    }

    #[test]
    fn test_predicate_pushdown_float_range() {
        let groups = split_with_limit(&make_test_table(), 2);
        let pred = Predicate::FloatRange("temp".to_string(), 24.0, 30.0);

        let mut matching: Vec<usize> = Vec::new();
        for (index, group) in groups.iter().enumerate() {
            if pred.could_match_row_group(group) {
                matching.push(index);
            }
        }

        // Only the second group (18.0, 25.0) reaches into [24, 30].
        assert_eq!(matching, vec![1]);
    }

    #[test]
    fn test_predicate_evaluate_int_equals() {
        let mask = Predicate::IntEquals("id".to_string(), 3)
            .evaluate(&make_test_table())
            .expect("eval failed");
        assert_eq!(mask, vec![false, false, true, false, false]);
    }

    #[test]
    fn test_predicate_evaluate_str_equals() {
        let mask = Predicate::StrEquals("city".to_string(), "Tokyo".to_string())
            .evaluate(&make_test_table())
            .expect("eval failed");
        assert_eq!(mask, vec![true, false, true, false, false]);
    }

    #[test]
    fn test_predicate_and() {
        let in_range = Predicate::FloatRange("temp".to_string(), 19.0, 23.0);
        let is_active = Predicate::BoolEquals("active".to_string(), true);
        let mask = Predicate::And(vec![in_range, is_active])
            .evaluate(&make_test_table())
            .expect("eval failed");
        // Rows 0, 1 and 4 are in range; only rows 0 and 1 are also active.
        assert_eq!(mask, vec![true, true, false, false, false]);
    }

    #[test]
    fn test_predicate_or() {
        let first = Predicate::IntEquals("id".to_string(), 1);
        let last = Predicate::IntEquals("id".to_string(), 5);
        let mask = Predicate::Or(vec![first, last])
            .evaluate(&make_test_table())
            .expect("eval failed");
        assert_eq!(mask, vec![true, false, false, false, true]);
    }

    #[test]
    fn test_select_columns() {
        let sub = select_columns(&make_test_table(), &["temp", "city"]).expect("select failed");
        assert_eq!(sub.num_columns(), 2);
        assert_eq!(sub.column_names(), vec!["temp", "city"]);
    }

    #[test]
    fn test_filter_table() {
        let pred = Predicate::BoolEquals("active".to_string(), true);
        let filtered = filter_table(&make_test_table(), &pred).expect("filter failed");
        assert_eq!(filtered.num_rows(), 3);
        let ids = filtered.get_i64("id").expect("get id failed");
        assert_eq!(ids, &[1, 2, 4]);
    }

    #[test]
    fn test_filter_table_combined() {
        let pred = Predicate::And(vec![
            Predicate::StrEquals("city".to_string(), "Tokyo".to_string()),
            Predicate::FloatRange("temp".to_string(), 18.0, f64::MAX),
        ]);
        let filtered = filter_table(&make_test_table(), &pred).expect("filter failed");
        assert_eq!(filtered.num_rows(), 2);
        let temps = filtered.get_f64("temp").expect("get temp failed");
        assert!(close(temps[0], 20.0));
        assert!(close(temps[1], 18.0));
    }

    #[test]
    fn test_column_projection_read() {
        let dir = std::env::temp_dir().join("scirs2_col_proj_test");
        // Best effort: a failure here surfaces through the write below.
        let _ = std::fs::create_dir_all(&dir);
        let path = dir.join("proj.scircol");

        let table = make_test_table();
        super::super::writer::write_columnar(&path, &table).expect("write failed");

        let sub = read_columnar_with_columns(&path, &["id", "active"]).expect("read failed");
        assert_eq!(sub.num_columns(), 2);
        assert_eq!(sub.column_names(), vec!["id", "active"]);
        assert_eq!(sub.num_rows(), 5);

        let _ = std::fs::remove_dir_all(&dir);
    }

    #[test]
    fn test_empty_table_stats() {
        let stats = TableStats::from_table(&ColumnarTable::new());
        assert_eq!(stats.num_rows, 0);
        assert!(stats.columns.is_empty());
    }

    #[test]
    fn test_empty_column_stats() {
        let stats = ColumnStats::from_column(&Column::float64("empty", Vec::new()));
        assert_eq!(stats.count, 0);
        assert!(stats.min.is_none());
        assert!(stats.max.is_none());
        assert!(stats.sum.is_none());
    }

    #[test]
    fn test_could_contain_value() {
        let stats = ColumnStats::from_column(&Column::float64("x", vec![10.0, 20.0, 30.0]));
        // Values inside the range, including both bounds, may be present.
        assert!(stats.could_contain_value(15.0));
        assert!(stats.could_contain_value(10.0));
        assert!(stats.could_contain_value(30.0));
        // Values outside the range cannot be.
        assert!(!stats.could_contain_value(5.0));
        assert!(!stats.could_contain_value(35.0));
    }

    #[test]
    fn test_row_groups_empty_table() {
        let table = ColumnarTable::new();
        let groups =
            split_into_row_groups(&table, &RowGroupConfig::default()).expect("split failed");
        assert!(groups.is_empty());
    }
}