use crate::types::{ColumnType, RowId, SqlRow, TableSchema, Value, Timestamp};
use super::config::ColumnarConfig;
/// Typed, append-only storage for the values of a single column.
///
/// The variant is picked from the column's declared type by
/// `ColumnBuffer::new`; every variant holds one entry per pushed value.
pub enum ColumnBuffer {
/// Timestamps, stored as the `i64` returned by `Timestamp::as_micros`.
Timestamp(Vec<i64>),
/// 64-bit signed integers.
Integer(Vec<i64>),
/// 64-bit floats.
Float(Vec<f64>),
/// Booleans; `None` is stored when `Value::Null` is pushed.
Bool(Vec<Option<bool>>),
/// Strings; `None` is stored when `Value::Null` is pushed.
Text(Vec<Option<String>>),
/// Fallback for every other column type: values are kept verbatim.
Other(Vec<Value>),
}
impl ColumnBuffer {
pub(crate) fn new(col_type: &ColumnType) -> Self {
match col_type {
ColumnType::Timestamp => Self::Timestamp(Vec::new()),
ColumnType::Integer => Self::Integer(Vec::new()),
ColumnType::Float => Self::Float(Vec::new()),
ColumnType::Boolean => Self::Bool(Vec::new()),
ColumnType::Text => Self::Text(Vec::new()),
_ => Self::Other(Vec::new()),
}
}
/// Appends `value` to this column, converting it to the column's storage type.
///
/// Exactly one entry is appended per call, so the column always stays
/// aligned with the other columns of the same buffer:
///
/// * `Timestamp` / `Integer` columns store `val_to_i64(&value)`, or `0`
///   when the value has no integer form (including `Value::Null`).
/// * `Float` columns store `val_to_f64(&value)`, or `0.0` when the value
///   has no float form.
/// * `Bool` / `Text` columns store `Some(..)` for a value of the matching
///   type and `None` for anything else. Mismatched values used to be
///   dropped silently, which left the column one entry short and shifted
///   every later row.
/// * `Other` columns store the value unchanged.
pub(crate) fn push(&mut self, value: Value) {
    match self {
        // `val_to_i64` already maps `Value::Timestamp` to its micros and
        // `Value::Integer` to itself, so no dedicated fast-path arm is needed.
        Self::Timestamp(v) | Self::Integer(v) => v.push(val_to_i64(&value).unwrap_or(0)),
        Self::Float(v) => v.push(val_to_f64(&value).unwrap_or(0.0)),
        Self::Bool(v) => v.push(match value {
            Value::Bool(b) => Some(b),
            // Null or a value of the wrong type: record a null to keep alignment.
            _ => None,
        }),
        Self::Text(v) => v.push(match value {
            Value::Text(s) => Some(s),
            // Null or a value of the wrong type: record a null to keep alignment.
            _ => None,
        }),
        Self::Other(v) => v.push(value),
    }
}
/// Estimates the number of bytes held by this column.
///
/// Weights per entry: 8 for timestamps, integers and floats; 1 for
/// booleans and for null text; string length plus 8 for non-null text;
/// 32 for entries of an `Other` column.
fn byte_size(&self) -> usize {
    match self {
        Self::Timestamp(cells) | Self::Integer(cells) => 8 * cells.len(),
        Self::Float(cells) => 8 * cells.len(),
        Self::Bool(cells) => cells.len(),
        Self::Text(cells) => {
            let mut total = 0;
            for cell in cells {
                total += match cell {
                    Some(text) => text.len() + 8,
                    None => 1,
                };
            }
            total
        }
        Self::Other(cells) => 32 * cells.len(),
    }
}
/// Rearranges the column so that entry `i` becomes the old entry `perm[i]`.
///
/// Delegates to `perm_reorder` for whichever vector this variant holds.
fn reorder(&mut self, perm: &[usize]) {
    match self {
        // Timestamps and integers share the same `Vec<i64>` storage.
        Self::Timestamp(cells) | Self::Integer(cells) => perm_reorder(cells, perm),
        Self::Float(cells) => perm_reorder(cells, perm),
        Self::Bool(cells) => perm_reorder(cells, perm),
        Self::Text(cells) => perm_reorder(cells, perm),
        Self::Other(cells) => perm_reorder(cells, perm),
    }
}
/// Summarises the buffered values as min / max / null-count statistics.
///
/// Returns `None` for an empty column and for `Other` columns.
///
/// * `Timestamp` / `Integer`: little-endian bytes of the smallest and
///   largest `i64`; the null count is always 0.
/// * `Float`: little-endian bytes of the smallest and largest value,
///   ignoring NaN entries so the result does not depend on where a NaN
///   sits in the buffer (both bounds are NaN only if every entry is NaN);
///   the null count is always 0.
/// * `Bool`: byte 0 of the minimum is 1 only when at least one `true` and
///   no `false` is present; byte 0 of the maximum is 1 when any `true` is
///   present. All-null columns report zeroed bounds.
/// * `Text`: bounds are `value_to_raw_bytes` of the lexicographically
///   smallest / largest non-null string; all-null columns report zeroed
///   bounds.
pub(crate) fn compute_statistics(&self, col_id: u16) -> Option<super::segment::ColumnStatistics> {
    use super::segment::{value_to_raw_bytes, ColumnStatistics};

    // Single construction site so every arm fills the struct the same way.
    let stats = |min_value_raw: [u8; 8], max_value_raw: [u8; 8], null_count: u32| ColumnStatistics {
        column_id: col_id,
        min_value_raw,
        max_value_raw,
        null_count,
    };
    // Null counts are stored as `u32`; saturate instead of wrapping on huge buffers.
    let clamp_count = |n: usize| n.min(u32::MAX as usize) as u32;

    match self {
        Self::Timestamp(vals) | Self::Integer(vals) if !vals.is_empty() => {
            // The guard guarantees at least one element, so `?` never bails out here.
            let min = vals.iter().copied().min()?;
            let max = vals.iter().copied().max()?;
            Some(stats(min.to_le_bytes(), max.to_le_bytes(), 0))
        }
        Self::Float(vals) if !vals.is_empty() => {
            // `f64::min` / `f64::max` return the non-NaN operand, so seeding the
            // fold with NaN yields the extreme of the non-NaN entries.
            let min = vals.iter().copied().fold(f64::NAN, f64::min);
            let max = vals.iter().copied().fold(f64::NAN, f64::max);
            Some(stats(min.to_le_bytes(), max.to_le_bytes(), 0))
        }
        Self::Bool(vals) if !vals.is_empty() => {
            let mut has_false = false;
            let mut has_true = false;
            let mut nulls = 0usize;
            for entry in vals {
                match entry {
                    Some(false) => has_false = true,
                    Some(true) => has_true = true,
                    None => nulls += 1,
                }
            }
            let mut min_raw = [0u8; 8];
            let mut max_raw = [0u8; 8];
            min_raw[0] = u8::from(has_true && !has_false);
            max_raw[0] = u8::from(has_true);
            Some(stats(min_raw, max_raw, clamp_count(nulls)))
        }
        Self::Text(vals) if !vals.is_empty() => {
            let null_count = clamp_count(vals.iter().filter(|v| v.is_none()).count());
            let smallest = vals.iter().flatten().min();
            let largest = vals.iter().flatten().max();
            match (smallest, largest) {
                (Some(lo), Some(hi)) => Some(stats(
                    value_to_raw_bytes(&Value::Text(lo.clone())),
                    value_to_raw_bytes(&Value::Text(hi.clone())),
                    null_count,
                )),
                // Every entry is null: there is no value to derive bounds from.
                _ => Some(stats([0u8; 8], [0u8; 8], null_count)),
            }
        }
        _ => None,
    }
}
}
/// Converts a value to `i64` where an integer form exists.
///
/// Integers pass through, floats go through an `as i64` cast, booleans
/// become 1 / 0 and timestamps become their `as_micros` value. Every other
/// value yields `None`.
fn val_to_i64(v: &Value) -> Option<i64> {
    let converted = match v {
        Value::Integer(n) => *n,
        Value::Float(x) => *x as i64,
        Value::Bool(flag) => i64::from(*flag),
        Value::Timestamp(stamp) => stamp.as_micros(),
        _ => return None,
    };
    Some(converted)
}
/// Converts a value to `f64` where a float form exists.
///
/// Floats pass through and integers go through an `as f64` cast; every
/// other value yields `None`.
fn val_to_f64(v: &Value) -> Option<f64> {
    let converted = match v {
        Value::Float(x) => *x,
        Value::Integer(n) => *n as f64,
        _ => return None,
    };
    Some(converted)
}
/// Outcome of `ColumnarWriteBuffer::append`, telling the caller whether the
/// buffer has reached one of its configured capacities.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FlushDecision {
/// Neither the row capacity nor the byte capacity has been reached.
Continue,
/// The row count or the estimated byte size reached its configured capacity.
Flush,
}
/// The contents drained from a `ColumnarWriteBuffer` by `take`.
pub struct BufferedBatch {
/// Id of the table the rows belong to.
pub table_id: u32,
/// Column buffers moved out of the write buffer.
pub columns: Vec<ColumnBuffer>,
/// Number of rows that were buffered when the batch was taken.
pub row_count: usize,
/// Row ids in the same order as the column entries.
pub row_ids: Vec<RowId>,
/// Smallest timestamp tracked by the write buffer, or 0 if none was tracked.
pub min_timestamp: i64,
/// Largest timestamp tracked by the write buffer, or 0 if none was tracked.
pub max_timestamp: i64,
}
/// Rearranges `v` in place so that `v[i]` becomes the old `v[perm[i]]`.
///
/// Only the first `perm.len()` slots are rewritten; any trailing elements
/// keep their value. Each selected element is cloned exactly once.
///
/// # Panics
///
/// Panics if an index in `perm` is out of bounds for `v`, or if `perm` is
/// longer than `v`.
fn perm_reorder<T: Clone>(v: &mut [T], perm: &[usize]) {
    // Gather first: every read must see the original ordering.
    let reordered: Vec<T> = perm.iter().map(|&src| v[src].clone()).collect();
    // Then move the gathered values into place without cloning them again.
    for (slot, item) in v[..perm.len()].iter_mut().zip(reordered) {
        *slot = item;
    }
}
impl BufferedBatch {
pub fn sort_by_timestamp(&mut self, ts_col_idx: Option<usize>) {
let ts_idx = match ts_col_idx {
Some(idx) if idx < self.columns.len() => idx,
_ => return,
};
if self.row_count <= 1 {
return;
}
let timestamps: Vec<i64> = match &self.columns[ts_idx] {
ColumnBuffer::Timestamp(vals) => vals.clone(),
_ => return, };
let mut indices: Vec<usize> = (0..self.row_count).collect();
indices.sort_by_key(|&i| timestamps[i]);
for col in &mut self.columns {
col.reorder(&indices);
}
let old_ids = self.row_ids.clone();
for (i, &old_idx) in indices.iter().enumerate() {
self.row_ids[i] = old_ids[old_idx];
}
}
}
/// In-memory, column-oriented staging area for rows of one table.
///
/// Rows are added with `append`, which reports when a configured capacity
/// has been reached, and drained as a `BufferedBatch` with `take`.
pub struct ColumnarWriteBuffer {
// Id of the table this buffer collects rows for.
table_id: u32,
// Position of the schema's time-series column within `columns`; `None` when
// the schema names no such column or the name matches no column.
ts_column_idx: Option<usize>,
// One buffer per schema column, in schema order.
columns: Vec<ColumnBuffer>,
// Number of rows appended since the last `take`.
row_count: usize,
// Estimated buffered size in bytes, maintained by `append` and reset by `take`.
byte_size: usize,
// Row ids in append order.
row_ids: Vec<RowId>,
// Smallest / largest `Value::Timestamp` seen in the time-series column.
min_timestamp: Option<i64>,
max_timestamp: Option<i64>,
// Supplies `buffer_row_capacity` and `buffer_byte_capacity`.
config: ColumnarConfig,
}
impl ColumnarWriteBuffer {
/// Creates an empty buffer for `table_id` with one column buffer per
/// column of `schema`.
///
/// The time-series column index is resolved by looking up
/// `schema.timeseries_column` by name among `schema.columns`; it stays
/// `None` when no name is configured or no column carries that name.
pub fn new(table_id: u32, schema: &TableSchema, config: ColumnarConfig) -> Self {
    let ts_column_idx = match schema.timeseries_column.as_ref() {
        Some(ts_col) => schema.columns.iter().position(|c| c.name == *ts_col),
        None => None,
    };

    let mut columns = Vec::new();
    for def in schema.columns.iter() {
        columns.push(ColumnBuffer::new(&def.col_type));
    }

    Self {
        table_id,
        ts_column_idx,
        columns,
        row_count: 0,
        byte_size: 0,
        row_ids: Vec::new(),
        min_timestamp: None,
        max_timestamp: None,
        config,
    }
}
pub fn append(&mut self, row_id: RowId, row: &[Value]) -> FlushDecision {
debug_assert_eq!(row.len(), self.columns.len());
for (i, col) in self.columns.iter_mut().enumerate() {
col.push(row[i].clone());
}
if let Some(ts_idx) = self.ts_column_idx {
if let Value::Timestamp(ts) = &row[ts_idx] {
let micros = ts.as_micros();
self.min_timestamp = Some(self.min_timestamp.map_or(micros, |m| m.min(micros)));
self.max_timestamp = Some(self.max_timestamp.map_or(micros, |m| m.max(micros)));
}
}
self.row_ids.push(row_id);
self.row_count += 1;
self.byte_size = self.columns.iter().map(|c| c.byte_size()).sum::<usize>()
+ self.row_ids.len() * 8;
if self.row_count >= self.config.buffer_row_capacity
|| self.byte_size >= self.config.buffer_byte_capacity
{
FlushDecision::Flush
} else {
FlushDecision::Continue
}
}
pub fn take(&mut self) -> Option<BufferedBatch> {
if self.row_count == 0 {
return None;
}
let mut new_columns = Vec::with_capacity(self.columns.len());
for col in &self.columns {
new_columns.push(match col {
ColumnBuffer::Timestamp(_) => ColumnBuffer::Timestamp(Vec::new()),
ColumnBuffer::Integer(_) => ColumnBuffer::Integer(Vec::new()),
ColumnBuffer::Float(_) => ColumnBuffer::Float(Vec::new()),
ColumnBuffer::Bool(_) => ColumnBuffer::Bool(Vec::new()),
ColumnBuffer::Text(_) => ColumnBuffer::Text(Vec::new()),
ColumnBuffer::Other(_) => ColumnBuffer::Other(Vec::new()),
});
}
let old_columns = std::mem::replace(&mut self.columns, new_columns);
let batch = BufferedBatch {
table_id: self.table_id,
columns: old_columns,
row_count: self.row_count,
row_ids: std::mem::take(&mut self.row_ids),
min_timestamp: self.min_timestamp.unwrap_or(0),
max_timestamp: self.max_timestamp.unwrap_or(0),
};
self.row_count = 0;
self.byte_size = 0;
self.min_timestamp = None;
self.max_timestamp = None;
Some(batch)
}
/// Returns the number of rows appended since the buffer was created or last taken.
pub fn row_count(&self) -> usize {
self.row_count
}
/// Returns `true` when no rows are currently buffered.
pub fn is_empty(&self) -> bool {
    matches!(self.row_count, 0)
}
/// Returns the `(min, max)` timestamps tracked so far, or `None` when
/// either bound has not been recorded yet.
pub fn timestamp_range(&self) -> Option<(i64, i64)> {
    match (self.min_timestamp, self.max_timestamp) {
        (Some(lo), Some(hi)) => Some((lo, hi)),
        _ => None,
    }
}
/// Returns the id of the table this buffer was created for.
pub fn table_id(&self) -> u32 {
self.table_id
}
pub fn snapshot_rows(
&self,
start_ts: i64,
end_ts: i64,
_schema: &TableSchema,
column_ids: &[(u16, String)],
) -> Vec<(RowId, SqlRow)> {
if self.row_count == 0 {
return Vec::new();
}
let mut results = Vec::new();
for row_idx in 0..self.row_count {
if let Some(ts_idx) = self.ts_column_idx {
let in_range = match &self.columns[ts_idx] {
ColumnBuffer::Timestamp(vals) => {
vals.get(row_idx).is_some_and(|&ts| ts >= start_ts && ts <= end_ts)
}
_ => false,
};
if !in_range {
continue;
}
}
let mut sql_row = SqlRow::new();
for &(col_id, ref col_name) in column_ids {
let idx = col_id as usize;
if idx >= self.columns.len() {
continue;
}
let val = match &self.columns[idx] {
ColumnBuffer::Timestamp(vals) => {
vals.get(row_idx).map(|&ts| Value::Timestamp(Timestamp::from_micros(ts)))
}
ColumnBuffer::Integer(vals) => {
vals.get(row_idx).copied().map(Value::Integer)
}
ColumnBuffer::Float(vals) => {
vals.get(row_idx).copied().map(Value::Float)
}
ColumnBuffer::Bool(vals) => {
vals.get(row_idx).and_then(|v| v.map(Value::Bool)).or(Some(Value::Null))
}
ColumnBuffer::Text(vals) => {
vals.get(row_idx).and_then(|v| v.as_ref().map(|s| Value::Text(s.clone()))).or(Some(Value::Null))
}
ColumnBuffer::Other(vals) => {
vals.get(row_idx).cloned()
}
};
if let Some(v) = val {
sql_row.insert(col_name.clone(), v);
}
}
let row_id = self.row_ids.get(row_idx).copied().unwrap_or(0);
results.push((row_id, sql_row));
}
results
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{ColumnDef, ColumnType, Timestamp};
    use std::sync::Arc;

    /// Builds the three-column time-series schema (`ts`, `temp`, `label`)
    /// shared by all tests.
    fn make_schema() -> Arc<TableSchema> {
        use crate::types::TableType;
        let mut schema = crate::types::TableSchema::new(
            "sensors".to_string(),
            vec![
                ColumnDef::new("ts".to_string(), ColumnType::Timestamp, 0),
                ColumnDef::new("temp".to_string(), ColumnType::Float, 1),
                ColumnDef::new("label".to_string(), ColumnType::Text, 2),
            ],
        );
        schema.table_type = TableType::TimeSeries;
        schema.timeseries_column = Some("ts".to_string());
        Arc::new(schema)
    }

    /// Builds one row matching the schema from `make_schema`.
    fn reading(micros: i64, temp: f64, label: &str) -> Vec<Value> {
        vec![
            Value::Timestamp(Timestamp::from_micros(micros)),
            Value::Float(temp),
            Value::Text(label.to_string()),
        ]
    }

    #[test]
    fn test_buffer_append_and_count() {
        let schema = make_schema();
        let mut buf = ColumnarWriteBuffer::new(1, &schema, ColumnarConfig::default());

        let decision = buf.append(0, &reading(1000, 25.5, "ok"));

        assert_eq!(decision, FlushDecision::Continue);
        assert_eq!(buf.row_count(), 1);
        assert!(!buf.is_empty());
        assert_eq!(buf.timestamp_range(), Some((1000, 1000)));
    }

    #[test]
    fn test_buffer_flush_decision() {
        let schema = make_schema();
        let mut config = ColumnarConfig::default();
        config.buffer_row_capacity = 5;
        let mut buf = ColumnarWriteBuffer::new(1, &schema, config);

        for i in 0..5 {
            let row = reading(i * 1000, i as f64, &format!("row_{}", i));
            let decision = buf.append(i as u64, &row);
            // Only the append that reaches the row capacity asks for a flush.
            let expected = if i < 4 {
                FlushDecision::Continue
            } else {
                FlushDecision::Flush
            };
            assert_eq!(decision, expected);
        }
        assert_eq!(buf.row_count(), 5);
    }

    #[test]
    fn test_buffer_take_clears() {
        let schema = make_schema();
        let mut buf = ColumnarWriteBuffer::new(1, &schema, ColumnarConfig::default());
        buf.append(0, &reading(1000, 25.0, "hello"));

        let batch = buf.take().unwrap();

        assert_eq!(batch.row_count, 1);
        assert_eq!(batch.row_ids, vec![0]);
        assert!(buf.is_empty());
        assert_eq!(buf.row_count(), 0);
        assert!(buf.take().is_none());
    }

    #[test]
    fn test_buffer_timestamp_tracking() {
        let schema = make_schema();
        let mut buf = ColumnarWriteBuffer::new(1, &schema, ColumnarConfig::default());

        for i in 0..10 {
            buf.append(i as u64, &reading(1000 + i * 500, i as f64, "x"));
        }
        assert_eq!(buf.timestamp_range(), Some((1000, 5500)));

        let batch = buf.take().unwrap();
        assert_eq!(batch.min_timestamp, 1000);
        assert_eq!(batch.max_timestamp, 5500);
    }
}