use super::columnar_memtable::{ColumnData, ColumnarDrainResult, ColumnarSchema};
#[derive(Debug, Clone)]
pub struct ProjectionDef {
pub name: String,
pub sort_columns: Vec<usize>,
pub ascending: Vec<bool>,
}
pub fn compute_sort_order(
drain: &ColumnarDrainResult,
sort_columns: &[usize],
ascending: &[bool],
) -> Vec<usize> {
let row_count = drain.row_count as usize;
let mut indices: Vec<usize> = (0..row_count).collect();
indices.sort_by(|&a, &b| {
for (i, &col_idx) in sort_columns.iter().enumerate() {
let asc = ascending.get(i).copied().unwrap_or(true);
let ord = compare_column_values(&drain.columns[col_idx], a, b);
let ord = if asc { ord } else { ord.reverse() };
if ord != std::cmp::Ordering::Equal {
return ord;
}
}
std::cmp::Ordering::Equal
});
indices
}
pub fn apply_permutation(data: &ColumnData, perm: &[usize]) -> ColumnData {
match data {
ColumnData::Timestamp(v) => ColumnData::Timestamp(perm.iter().map(|&i| v[i]).collect()),
ColumnData::Float64(v) => ColumnData::Float64(perm.iter().map(|&i| v[i]).collect()),
ColumnData::Int64(v) => ColumnData::Int64(perm.iter().map(|&i| v[i]).collect()),
ColumnData::Symbol(v) => ColumnData::Symbol(perm.iter().map(|&i| v[i]).collect()),
}
}
fn compare_column_values(col: &ColumnData, a: usize, b: usize) -> std::cmp::Ordering {
match col {
ColumnData::Timestamp(v) => v[a].cmp(&v[b]),
ColumnData::Float64(v) => v[a].partial_cmp(&v[b]).unwrap_or(std::cmp::Ordering::Equal),
ColumnData::Int64(v) => v[a].cmp(&v[b]),
ColumnData::Symbol(v) => v[a].cmp(&v[b]),
}
}
pub fn should_use_tag_projection(
_schema: &ColumnarSchema,
tag_predicates: &[(usize, u32)], time_selectivity: f64, ) -> bool {
if tag_predicates.is_empty() {
return false; }
time_selectivity > 0.5
}
#[cfg(test)]
mod tests {
use super::*;
use crate::engine::timeseries::columnar_memtable::{
ColumnType, ColumnValue, ColumnarMemtable, ColumnarMemtableConfig,
};
fn test_config() -> ColumnarMemtableConfig {
ColumnarMemtableConfig {
max_memory_bytes: 10 * 1024 * 1024,
hard_memory_limit: 20 * 1024 * 1024,
max_tag_cardinality: 1000,
}
}
#[test]
fn sort_by_tag_then_timestamp() {
let schema = ColumnarSchema {
columns: vec![
("timestamp".into(), ColumnType::Timestamp),
("value".into(), ColumnType::Float64),
("host".into(), ColumnType::Symbol),
],
timestamp_idx: 0,
codecs: vec![nodedb_codec::ColumnCodec::Auto; 3],
};
let mut mt = ColumnarMemtable::new(schema, test_config());
for i in 0..10 {
let host = if i % 2 == 0 { "prod-1" } else { "prod-2" };
mt.ingest_row(
(i % 2) as u64,
&[
ColumnValue::Timestamp(1000 + i as i64),
ColumnValue::Float64(i as f64),
ColumnValue::Symbol(host),
],
)
.unwrap();
}
let drain = mt.drain();
let perm = compute_sort_order(&drain, &[2, 0], &[true, true]);
let reordered_hosts = apply_permutation(&drain.columns[2], &perm);
let hosts = reordered_hosts.as_symbols();
assert!(hosts[..5].iter().all(|&h| h == hosts[0]));
assert!(hosts[5..].iter().all(|&h| h == hosts[5]));
assert_ne!(hosts[0], hosts[5]);
}
#[test]
fn identity_permutation_for_sorted_data() {
let schema = ColumnarSchema {
columns: vec![
("timestamp".into(), ColumnType::Timestamp),
("value".into(), ColumnType::Float64),
],
timestamp_idx: 0,
codecs: vec![nodedb_codec::ColumnCodec::Auto; 2],
};
let mut mt = ColumnarMemtable::new(schema, test_config());
for i in 0..10 {
mt.ingest_row(
1,
&[ColumnValue::Timestamp(i), ColumnValue::Float64(i as f64)],
)
.unwrap();
}
let drain = mt.drain();
let perm = compute_sort_order(&drain, &[0], &[true]);
assert_eq!(perm, (0..10).collect::<Vec<_>>());
}
#[test]
fn tag_projection_heuristic() {
let schema = ColumnarSchema {
columns: vec![
("timestamp".into(), ColumnType::Timestamp),
("host".into(), ColumnType::Symbol),
],
timestamp_idx: 0,
codecs: vec![nodedb_codec::ColumnCodec::Auto; 2],
};
assert!(should_use_tag_projection(&schema, &[(1, 0)], 0.8));
assert!(!should_use_tag_projection(&schema, &[(1, 0)], 0.1));
assert!(!should_use_tag_projection(&schema, &[], 0.9));
}
}