mod aggregate;
mod bitmap;
mod compile;
mod scan;
use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::path::Path;
use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
use std::sync::{Arc, Mutex, OnceLock, RwLock};
use std::thread::JoinHandle;
use std::time::Duration;
use anyhow::{Context, Result, anyhow, bail, ensure};
use rayon::{ThreadPool, ThreadPoolBuilder, prelude::ParallelSliceMut};
use rustc_hash::FxHashMap;
use crate::sql::{
AggregateKind, AlterTableOperationPlan, AlterTablePlan, ArithmeticOp, Command, EvalPlan,
OrderByTerm, ProjectionExpr, ScalarExpr, SelectPlan, parse_sql,
};
use crate::storage::{
DbOptions, PendingBatch, Storage, StoredRowGroup, Table, row_group_from_columnar_batch,
};
use crate::types::{ColumnarBatch, DataType, Schema, Value};
use ordered_float::OrderedFloat;
use aggregate::AggState;
use compile::{
CompiledFilterExpr, CompiledProjection, CompiledProjectionExpr, CompiledScalarExpr,
column_index, compile_filter_expr, compile_projections, required_column_indexes,
};
use scan::{
aggregate_row_group_all, aggregate_row_group_grouped, collect_row_group_aggregates,
collect_row_group_int64_values, count_row_group_bool, count_row_group_int64,
parallel_scan_table,
};
/// Handle to an embedded NarrowDB database.
///
/// All mutable state lives in `DbInner` behind an `Arc<RwLock<_>>`: `query` takes the
/// read lock, while inserts, flushes and `execute_sql` take the write lock. The optional
/// background flush thread only holds a `Weak` reference to the same `Arc`.
pub struct NarrowDb {
    inner: Arc<RwLock<DbInner>>,
    /// Background auto-flush thread, present only when `auto_flush_interval` was
    /// configured; it is stopped and joined in `Drop`.
    flush_handle: Mutex<Option<FlushHandle>>,
}
/// Database state shared between the public handle and the background flush thread.
struct DbInner {
    /// Thread count and rayon pool used to run query scans.
    query_runtime: QueryRuntime,
    /// Storage backend that table definitions and flushed row groups are appended to.
    storage: Storage,
    /// Open tables keyed by table name.
    tables: FxHashMap<String, Table>,
}
/// Control handle for the background auto-flush thread.
struct FlushHandle {
    /// Set to `true` to ask the thread to exit at its next wake-up.
    stop: Arc<AtomicBool>,
    /// Join handle of the flush thread.
    thread: Option<JoinHandle<()>>,
}
/// Query execution resources: the configured thread count and the matching pool.
#[derive(Clone)]
struct QueryRuntime {
    threads: usize,
    /// Pool obtained from `QUERY_THREAD_POOLS`, so runtimes configured with the same
    /// thread count share one pool.
    pool: Arc<ThreadPool>,
}
/// Process-wide cache of query thread pools keyed by thread count. Pools are created on
/// first request and never removed.
static QUERY_THREAD_POOLS: OnceLock<Mutex<FxHashMap<usize, Arc<ThreadPool>>>> = OnceLock::new();
/// Physical key type of the single GROUP BY column handled by the grouped `COUNT(*)`
/// fast path. `Timestamp` columns are classified as `Int64`.
#[derive(Clone, Copy)]
enum FastGroupedCountKeyType {
    Int64,
    Bool,
}
/// Strategy used by the grouped `COUNT(*)` fast path for `Int64` keys.
#[derive(Clone, Copy)]
enum FastGroupedCountMode {
    /// Count keys per row group and merge the partial `(key, count)` results in a hash map.
    Hash,
    /// Collect every matching key, sort them, and count runs of equal keys while keeping
    /// only the top `LIMIT` groups; chosen for `ORDER BY <count> DESC LIMIT n`.
    SortedTopK,
}
/// Result of `classify_fast_grouped_count`: how to execute an eligible grouped count.
struct FastGroupedCountPlan {
    /// Schema index of the GROUP BY column.
    key_index: usize,
    key_type: FastGroupedCountKeyType,
    /// True when there is exactly one ORDER BY term, descending, naming a COUNT alias.
    order_by_count_desc: bool,
    mode: FastGroupedCountMode,
}
/// Result of a single SQL statement: output column names plus row-major values.
///
/// Statements that produce no output (DDL, INSERT) return `QueryResult::empty()`.
#[derive(Debug, Clone)]
pub struct QueryResult {
    /// Output column names, in projection order.
    pub columns: Vec<String>,
    /// Result rows; each inner `Vec` holds one value per entry of `columns`.
    pub rows: Vec<Vec<Value>>,
}
impl QueryResult {
    /// Returns a result with no columns and no rows.
    ///
    /// Used as the reply for statements that produce no output (DDL, INSERT).
    pub fn empty() -> Self {
        let columns = Vec::new();
        let rows = Vec::new();
        Self { columns, rows }
    }
}
impl QueryRuntime {
fn new(configured_threads: Option<usize>) -> Result<Self> {
let threads = match configured_threads {
Some(0) => bail!("query_threads must be greater than zero"),
Some(threads) => threads,
None => std::thread::available_parallelism()
.map(|threads| threads.get())
.unwrap_or(1),
};
Ok(Self {
threads,
pool: query_thread_pool(threads)?,
})
}
}
/// Returns the process-wide rayon pool for `threads` worker threads.
///
/// A pool is built the first time a given thread count is requested and cached in
/// `QUERY_THREAD_POOLS`; later requests get a clone of the same `Arc`. The cache lock
/// is held while building, so concurrent first requests cannot build two pools.
///
/// # Errors
/// Fails if the cache lock is poisoned or rayon cannot build the pool.
fn query_thread_pool(threads: usize) -> Result<Arc<ThreadPool>> {
    let registry = QUERY_THREAD_POOLS.get_or_init(Default::default);
    let mut cached = registry
        .lock()
        .map_err(|_| anyhow!("query thread pool lock poisoned"))?;
    if let Some(existing) = cached.get(&threads) {
        return Ok(Arc::clone(existing));
    }
    let built = ThreadPoolBuilder::new()
        .num_threads(threads)
        .thread_name(|index| format!("narrowdb-query-{index}"))
        .build()
        .context("failed to build query thread pool")?;
    let shared = Arc::new(built);
    cached.insert(threads, Arc::clone(&shared));
    Ok(shared)
}
impl NarrowDb {
pub fn open(path: impl AsRef<Path>, options: DbOptions) -> Result<Self> {
let query_runtime = QueryRuntime::new(options.query_threads)?;
let auto_flush_interval = options.auto_flush_interval;
let (storage, tables) = Storage::open(path, options)?;
let inner = Arc::new(RwLock::new(DbInner {
query_runtime,
storage,
tables,
}));
let flush_handle = if let Some(interval) = auto_flush_interval {
let weak = Arc::downgrade(&inner);
let stop = Arc::new(AtomicBool::new(false));
let stop_clone = stop.clone();
let thread = std::thread::spawn(move || {
background_flush_loop(weak, interval, stop_clone);
});
Some(FlushHandle {
stop,
thread: Some(thread),
})
} else {
None
};
Ok(Self {
inner,
flush_handle: Mutex::new(flush_handle),
})
}
pub fn path(&self) -> Result<std::path::PathBuf> {
let inner = self.inner.read().map_err(|_| anyhow!("lock poisoned"))?;
Ok(inner.storage.path().to_path_buf())
}
pub fn create_table(&self, schema: Schema) -> Result<()> {
let mut inner = self.inner.write().map_err(|_| anyhow!("lock poisoned"))?;
inner.create_table_inner(schema)
}
pub fn insert_row(&self, table_name: &str, row: Vec<Value>) -> Result<()> {
let mut inner = self.inner.write().map_err(|_| anyhow!("lock poisoned"))?;
inner.insert_row_inner(table_name, row)
}
pub fn insert_rows(&self, table_name: &str, rows: Vec<Vec<Value>>) -> Result<()> {
let mut inner = self.inner.write().map_err(|_| anyhow!("lock poisoned"))?;
inner.insert_rows_inner(table_name, rows)?;
Ok(())
}
pub fn insert_rows_iter<I>(&self, table_name: &str, rows: I) -> Result<usize>
where
I: IntoIterator<Item = Vec<Value>>,
{
let mut inner = self.inner.write().map_err(|_| anyhow!("lock poisoned"))?;
inner.insert_rows_inner(table_name, rows)
}
pub fn insert_columnar_batch(&self, table_name: &str, batch: ColumnarBatch) -> Result<()> {
let mut inner = self.inner.write().map_err(|_| anyhow!("lock poisoned"))?;
inner.insert_columnar_batch_inner(table_name, batch)
}
pub fn flush_table(&self, table_name: &str) -> Result<()> {
let mut inner = self.inner.write().map_err(|_| anyhow!("lock poisoned"))?;
inner.flush_table_inner(table_name)
}
pub fn flush_all(&self) -> Result<()> {
let mut inner = self.inner.write().map_err(|_| anyhow!("lock poisoned"))?;
inner.flush_all_inner()
}
pub fn execute_one(&self, sql: &str) -> Result<QueryResult> {
let mut results = self.execute_sql(sql)?;
results.pop().context("no result from SQL statement")
}
pub fn execute_sql(&self, sql: &str) -> Result<Vec<QueryResult>> {
let commands = parse_sql(sql)?;
let mut inner = self.inner.write().map_err(|_| anyhow!("lock poisoned"))?;
inner.execute_commands(commands)
}
pub fn query(&self, sql: &str) -> Result<Vec<QueryResult>> {
let commands = parse_sql(sql)?;
let inner = self.inner.read().map_err(|_| anyhow!("lock poisoned"))?;
inner.execute_read_commands(commands)
}
}
impl Drop for NarrowDb {
fn drop(&mut self) {
if let Ok(mut handle) = self.flush_handle.lock()
&& let Some(fh) = handle.take()
{
fh.stop.store(true, AtomicOrdering::Relaxed);
if let Some(ref t) = fh.thread {
t.thread().unpark();
}
if let Some(t) = fh.thread {
let _ = t.join();
}
}
if let Ok(mut inner) = self.inner.write() {
let _ = inner.flush_all_inner();
}
}
}
/// Body of the auto-flush thread: every `interval` (or on unpark), flush all tables.
///
/// Exits when `stop` is set, when the database has been dropped (the weak reference no
/// longer upgrades), or when the lock is poisoned. Flush errors are ignored so a
/// transient failure does not kill the thread.
fn background_flush_loop(
    inner: std::sync::Weak<RwLock<DbInner>>,
    interval: Duration,
    stop: Arc<AtomicBool>,
) {
    let should_stop = || stop.load(AtomicOrdering::Relaxed);
    loop {
        if should_stop() {
            break;
        }
        std::thread::park_timeout(interval);
        if should_stop() {
            break;
        }
        let Some(shared) = inner.upgrade() else {
            break;
        };
        let Ok(mut state) = shared.write() else {
            break;
        };
        let _ = state.flush_all_inner();
    }
}
impl DbInner {
fn create_table_inner(&mut self, schema: Schema) -> Result<()> {
ensure!(
!self.tables.contains_key(&schema.table_name),
"table {} already exists",
schema.table_name
);
self.storage.append_create_table(&schema)?;
let table = Table::new(schema.clone(), self.storage.options());
self.tables.insert(schema.table_name.clone(), table);
Ok(())
}
/// Appends one row to the table's pending buffer, casting each value to its column type.
///
/// Once the buffer reaches `row_group_size` rows the table is flushed.
///
/// # Errors
/// Fails for an unknown table, a row whose width differs from the schema, a failed cast,
/// or a failed flush.
fn insert_row_inner(&mut self, table_name: &str, row: Vec<Value>) -> Result<()> {
    let flush_threshold = self.storage.options().row_group_size;
    let table = self
        .tables
        .get_mut(table_name)
        .with_context(|| format!("unknown table {table_name}"))?;
    let columns = &table.schema.columns;
    ensure!(row.len() == columns.len(), "row width mismatch");
    let mut casted = Vec::with_capacity(columns.len());
    for (value, column) in row.into_iter().zip(columns) {
        casted.push(Value::cast_for(column.data_type, value)?);
    }
    table.pending.append_row(casted)?;
    if table.pending.rows() >= flush_threshold {
        self.flush_table_inner(table_name)?;
    }
    Ok(())
}
/// Appends rows to the table's pending buffer, casting each value to its column type.
///
/// Whenever the buffer reaches `row_group_size` rows it is cut into a row group. Those
/// groups are written to storage and recorded in `table.row_groups` after the loop.
///
/// If a row fails validation or casting, the groups already cut are still persisted
/// before the error is returned. Otherwise the rows moved out of `pending` would be
/// lost, including rows from earlier, successful calls. Rows appended before the failing
/// row stay in `pending`, as they did before. Each group is added to memory right after
/// its storage write succeeds, so a storage failure never leaves a written group
/// untracked.
///
/// Returns the number of rows consumed from `rows`.
///
/// # Errors
/// Unknown table, row width mismatch, cast failure, or storage append failure.
fn insert_rows_inner<I>(&mut self, table_name: &str, rows: I) -> Result<usize>
where
    I: IntoIterator<Item = Vec<Value>>,
{
    let row_group_size = self.storage.options().row_group_size;
    let table = self
        .tables
        .get_mut(table_name)
        .with_context(|| format!("unknown table {table_name}"))?;
    let mut inserted_rows = 0usize;
    let mut flushed_groups = Vec::new();
    // Run the append loop in a closure so its first error does not skip the
    // persistence step below.
    let append_result = (|| -> Result<()> {
        for row in rows {
            inserted_rows += 1;
            ensure!(
                row.len() == table.schema.columns.len(),
                "row width mismatch"
            );
            let casted = row
                .into_iter()
                .zip(&table.schema.columns)
                .map(|(value, column)| Value::cast_for(column.data_type, value))
                .collect::<Result<Vec<Value>>>()?;
            table.pending.append_row(casted)?;
            if table.pending.rows() >= row_group_size {
                flushed_groups.push(table.pending.take_row_group());
            }
        }
        Ok(())
    })();
    for row_group in flushed_groups {
        self.storage
            .append_row_group(table_name, &row_group, &table.schema)?;
        table
            .row_groups
            .push(StoredRowGroup::from_row_group(row_group));
    }
    append_result?;
    Ok(inserted_rows)
}
/// Appends a columnar batch to a table after validating it against the schema.
///
/// When `pending` is empty, full `row_group_size` slices of the batch become row groups
/// directly. Other rows are copied into `pending`, which is cut into a row group each
/// time it fills up.
///
/// If the loop fails part-way, the groups already cut are still written to storage and
/// recorded in `table.row_groups` before the error is returned. Otherwise rows moved out
/// of `pending` (possibly from earlier calls) would be lost. Each group is recorded in
/// memory right after its storage write succeeds.
///
/// # Errors
/// Unknown table, schema mismatch, batch slicing/conversion failures, or storage
/// append failure.
fn insert_columnar_batch_inner(
    &mut self,
    table_name: &str,
    mut batch: ColumnarBatch,
) -> Result<()> {
    let row_group_size = self.storage.options().row_group_size;
    let table = self
        .tables
        .get_mut(table_name)
        .with_context(|| format!("unknown table {table_name}"))?;
    batch.validate_against(&table.schema)?;
    let mut flushed_groups = Vec::new();
    // Run the slicing loop in a closure so its first error does not skip the
    // persistence step below.
    let append_result = (|| -> Result<()> {
        while !batch.is_empty() {
            if table.pending.rows() == 0 && batch.rows() >= row_group_size {
                // Fast path: a full row group can be built straight from the batch.
                let direct_batch = batch.take_prefix(row_group_size)?;
                flushed_groups.push(row_group_from_columnar_batch(direct_batch)?);
                continue;
            }
            let room = row_group_size - table.pending.rows();
            let chunk = batch.take_prefix(room.min(batch.rows()))?;
            table.pending.append_columnar_batch(chunk)?;
            if table.pending.rows() >= row_group_size {
                flushed_groups.push(table.pending.take_row_group());
            }
        }
        Ok(())
    })();
    for row_group in flushed_groups {
        self.storage
            .append_row_group(table_name, &row_group, &table.schema)?;
        table
            .row_groups
            .push(StoredRowGroup::from_row_group(row_group));
    }
    append_result
}
/// Flushes `table_name`'s pending rows into a new stored row group.
///
/// Does nothing when no rows are pending. Otherwise, in order:
/// 1. takes the pending rows as a row group;
/// 2. appends it to storage;
/// 3. refreshes the storage mapping via `remap`;
/// 4. records the group in `table.row_groups`;
/// 5. replaces `pending` with a fresh `PendingBatch` built with `row_group_size`.
///
/// NOTE(review): if `append_row_group` fails, the taken `row_group` is dropped by the
/// early return. Presumably `take_row_group` has already emptied `pending`, so those
/// rows would be lost; confirm against `PendingBatch`. A `remap` failure leaves the
/// group on disk but not in `table.row_groups`.
///
/// # Errors
/// Unknown table, or storage append/remap failure.
fn flush_table_inner(&mut self, table_name: &str) -> Result<()> {
    let table = self
        .tables
        .get_mut(table_name)
        .with_context(|| format!("unknown table {table_name}"))?;
    if table.pending.rows() == 0 {
        return Ok(());
    }
    let row_group = table.pending.take_row_group();
    self.storage
        .append_row_group(table_name, &row_group, &table.schema)?;
    self.storage.remap()?;
    table
        .row_groups
        .push(StoredRowGroup::from_row_group(row_group));
    table.pending = PendingBatch::new(&table.schema, self.storage.options().row_group_size);
    Ok(())
}
/// Flushes the pending rows of every table, stopping at the first failure.
fn flush_all_inner(&mut self) -> Result<()> {
    // Snapshot the names first: flushing needs `&mut self`, which the key iterator
    // would otherwise borrow.
    let names: Vec<String> = self.tables.keys().map(String::clone).collect();
    names
        .iter()
        .try_for_each(|name| self.flush_table_inner(name))
}
/// Executes `ALTER TABLE`: flushes all tables, then applies the rename operation.
fn alter_table_inner(&mut self, plan: AlterTablePlan) -> Result<()> {
    self.flush_all_inner()?;
    let table_name: &str = &plan.table_name;
    let if_exists = plan.if_exists;
    match plan.operation {
        AlterTableOperationPlan::RenameColumn {
            old_column_name,
            new_column_name,
        } => self.rename_column_inner(table_name, &old_column_name, &new_column_name, if_exists),
        AlterTableOperationPlan::RenameTable { new_table_name } => {
            self.rename_table_inner(table_name, &new_table_name, if_exists)
        }
    }
}
/// Renames a table in memory and rewrites storage with the updated table set.
///
/// A missing table is a no-op when `if_exists` is set. Renaming to the same name is a
/// no-op.
///
/// # Errors
/// Unknown table (without `if_exists`), target name already taken, or rewrite failure.
fn rename_table_inner(
    &mut self,
    table_name: &str,
    new_table_name: &str,
    if_exists: bool,
) -> Result<()> {
    match (self.tables.contains_key(table_name), if_exists) {
        (false, true) => return Ok(()),
        (false, false) => bail!("unknown table {table_name}"),
        (true, _) => {}
    }
    if table_name == new_table_name {
        return Ok(());
    }
    if self.tables.contains_key(new_table_name) {
        bail!("table {new_table_name} already exists");
    }
    let mut renamed = self
        .tables
        .remove(table_name)
        .expect("table existence checked above");
    let new_name = new_table_name.to_string();
    renamed.schema.table_name = new_name.clone();
    self.tables.insert(new_name, renamed);
    self.storage.rewrite_with_tables(&mut self.tables)?;
    Ok(())
}
fn rename_column_inner(
&mut self,
table_name: &str,
old_column_name: &str,
new_column_name: &str,
if_exists: bool,
) -> Result<()> {
let Some(table) = self.tables.get_mut(table_name) else {
if if_exists {
return Ok(());
}
bail!("unknown table {table_name}");
};
let Some(column_index) = table
.schema
.columns
.iter()
.position(|column| column.name == old_column_name)
else {
bail!("unknown column {old_column_name} in table {table_name}");
};
if old_column_name != new_column_name {
ensure!(
!table
.schema
.columns
.iter()
.any(|column| column.name == new_column_name),
"column {new_column_name} already exists in table {table_name}"
);
table.schema.columns[column_index].name = new_column_name.to_string();
}
self.storage.rewrite_with_tables(&mut self.tables)?;
Ok(())
}
/// Drops a table: flushes everything, removes it, and rewrites storage without it.
///
/// A missing table is a no-op when `if_exists` is set.
fn drop_table_inner(&mut self, table_name: &str, if_exists: bool) -> Result<()> {
    self.flush_all_inner()?;
    let removed = self.tables.remove(table_name).is_some();
    match (removed, if_exists) {
        (true, _) => {
            self.storage.rewrite_with_tables(&mut self.tables)?;
            Ok(())
        }
        (false, true) => Ok(()),
        (false, false) => bail!("unknown table {table_name}"),
    }
}
/// Lists table names in ascending order as a single `table_name` column.
fn show_tables_inner(&self) -> QueryResult {
    let mut names: Vec<String> = self.tables.keys().cloned().collect();
    // Keys are unique, so an unstable sort yields the same order as a stable one.
    names.sort_unstable();
    let rows = names
        .into_iter()
        .map(Value::String)
        .map(|value| vec![value])
        .collect();
    QueryResult {
        columns: vec!["table_name".to_string()],
        rows,
    }
}
/// Describes a table as `(column_name, data_type)` rows in schema order.
///
/// # Errors
/// Fails for an unknown table.
fn describe_table_inner(&self, table_name: &str) -> Result<QueryResult> {
    let table = self
        .tables
        .get(table_name)
        .with_context(|| format!("unknown table {table_name}"))?;
    let mut rows = Vec::with_capacity(table.schema.columns.len());
    for column in &table.schema.columns {
        let type_name = data_type_to_sql_name(column.data_type);
        rows.push(vec![
            Value::String(column.name.clone()),
            Value::String(type_name.to_owned()),
        ]);
    }
    Ok(QueryResult {
        columns: vec!["column_name".to_owned(), "data_type".to_owned()],
        rows,
    })
}
/// Executes parsed commands in order on the write path, one result per command.
///
/// Statements without output yield `QueryResult::empty()`. A SELECT first flushes its
/// table so pending rows are visible. Execution stops at the first error; earlier
/// commands are not rolled back.
fn execute_commands(&mut self, commands: Vec<Command>) -> Result<Vec<QueryResult>> {
    let mut results = Vec::with_capacity(commands.len());
    for command in commands {
        let result = match command {
            Command::CreateTable(schema) => {
                self.create_table_inner(schema)?;
                QueryResult::empty()
            }
            Command::CreateTableIfNotExists(schema) => {
                let already_exists = self.tables.contains_key(&schema.table_name);
                if !already_exists {
                    self.create_table_inner(schema)?;
                }
                QueryResult::empty()
            }
            Command::AlterTable(plan) => {
                self.alter_table_inner(plan)?;
                QueryResult::empty()
            }
            Command::DropTable(plan) => {
                self.drop_table_inner(&plan.table_name, plan.if_exists)?;
                QueryResult::empty()
            }
            Command::ShowTables => self.show_tables_inner(),
            Command::DescribeTable(table_name) => self.describe_table_inner(&table_name)?,
            Command::Insert(plan) => {
                self.insert_rows_inner(&plan.table_name, plan.rows)?;
                QueryResult::empty()
            }
            Command::Select(plan) => {
                self.flush_table_inner(&plan.table_name)?;
                self.execute_select(&plan)?
            }
            Command::Eval(plan) => execute_eval(plan)?,
        };
        results.push(result);
    }
    Ok(results)
}
/// Executes read-only commands under a shared borrow, one result per command.
///
/// Unlike `execute_commands`, SELECT does not flush pending rows first (that would need
/// `&mut self`).
///
/// # Errors
/// Any write command is rejected; execution stops at the first error.
fn execute_read_commands(&self, commands: Vec<Command>) -> Result<Vec<QueryResult>> {
    let mut results = Vec::with_capacity(commands.len());
    for command in commands {
        let result = match command {
            Command::Select(plan) => self.execute_select(&plan)?,
            Command::Eval(plan) => execute_eval(plan)?,
            Command::ShowTables => self.show_tables_inner(),
            Command::DescribeTable(table_name) => self.describe_table_inner(&table_name)?,
            _ => bail!("unexpected write command in read path"),
        };
        results.push(result);
    }
    Ok(results)
}
/// Dispatches a SELECT to the matching executor.
///
/// The three cases are:
/// * `SELECT *` goes to `select_all`;
/// * any aggregate projection or GROUP BY goes to `select_aggregate`;
/// * everything else goes to `select_rows`.
fn execute_select(&self, plan: &SelectPlan) -> Result<QueryResult> {
    let table = self
        .tables
        .get(&plan.table_name)
        .with_context(|| format!("unknown table {}", plan.table_name))?;
    let selects_star = plan.projections.len() == 1
        && plan.projections.first().is_some_and(|projection| {
            matches!(&projection.expr, ProjectionExpr::Column(name) if name == "*")
        });
    if selects_star {
        return self.select_all(table, plan);
    }
    let has_aggregate = plan
        .projections
        .iter()
        .any(|projection| matches!(projection.expr, ProjectionExpr::Aggregate { .. }));
    let needs_aggregation = has_aggregate || !plan.group_by.is_empty();
    if needs_aggregation {
        self.select_aggregate(table, plan)
    } else {
        self.select_rows(table, plan)
    }
}
/// Executes `SELECT * ...` by materialising every column of each selected row.
///
/// The optional WHERE clause is compiled against the table schema. The scan runs on the
/// query thread pool via `parallel_scan_table`, and ORDER BY / LIMIT are applied to the
/// collected rows.
///
/// # Errors
/// Rejects GROUP BY; propagates filter-compilation, scan, and ORDER BY errors.
fn select_all(&self, table: &Table, plan: &SelectPlan) -> Result<QueryResult> {
    ensure!(
        plan.group_by.is_empty(),
        "SELECT * does not support GROUP BY"
    );
    let columns = table
        .schema
        .columns
        .iter()
        .map(|column| column.name.clone())
        .collect::<Vec<_>>();
    let filter = plan
        .filter
        .as_ref()
        .map(|filter| compile_filter_expr(&table.schema, filter))
        .transpose()?;
    // Every column is projected, so every column must be loaded.
    let required_columns = (0..table.schema.columns.len()).collect::<Vec<_>>();
    let mapped = self.storage.mapped_bytes();
    let col_count = table.schema.columns.len();
    let query_threads = self.query_runtime.threads;
    let mut rows = self.query_runtime.pool.install(|| {
        parallel_scan_table(
            table,
            query_threads,
            mapped,
            filter.as_ref(),
            &required_columns,
            // Presumably invoked once per row that passes the filter: copy out every
            // column value of that row.
            |row_group, row| {
                (0..col_count)
                    .map(|idx| row_group.column(idx).value_at(row))
                    .collect()
            },
        )
    })?;
    apply_order_by_and_limit(&mut rows, &columns, &plan.order_by, plan.limit)?;
    Ok(QueryResult { columns, rows })
}
/// Executes a non-aggregate SELECT by evaluating the projections for each selected row.
///
/// Only the columns referenced by the filter and projections are requested from the
/// scan. The scan runs on the query thread pool, and ORDER BY / LIMIT are applied to the
/// collected rows.
///
/// NOTE(review): a scalar projection that fails to evaluate (e.g. division by zero or
/// integer overflow) silently yields `Value::Null` for that cell instead of an error.
///
/// # Errors
/// Propagates filter/projection compilation, scan, and ORDER BY errors.
fn select_rows(&self, table: &Table, plan: &SelectPlan) -> Result<QueryResult> {
    let filter = plan
        .filter
        .as_ref()
        .map(|filter| compile_filter_expr(&table.schema, filter))
        .transpose()?;
    let projections = compile_projections(&table.schema, &plan.group_by, &plan.projections)?;
    let columns = projections
        .iter()
        .map(|projection| projection.alias.clone())
        .collect::<Vec<_>>();
    // No GROUP BY here, so no extra key columns are needed.
    let required_columns = required_column_indexes(
        table.schema.columns.len(),
        filter.as_ref(),
        &projections,
        &[],
    );
    let mapped = self.storage.mapped_bytes();
    let query_threads = self.query_runtime.threads;
    let mut rows = self.query_runtime.pool.install(|| {
        parallel_scan_table(
            table,
            query_threads,
            mapped,
            filter.as_ref(),
            &required_columns,
            |row_group, row| {
                projections
                    .iter()
                    .map(|proj| match &proj.expr {
                        CompiledProjectionExpr::Column { column_index, .. } => {
                            row_group.column(*column_index).value_at(row)
                        }
                        CompiledProjectionExpr::Scalar(scalar) => {
                            eval_compiled_scalar(scalar, row_group, row).unwrap_or(Value::Null)
                        }
                        // execute_select only routes here when no projection is an aggregate.
                        CompiledProjectionExpr::Aggregate { .. } => unreachable!(),
                    })
                    .collect()
            },
        )
    })?;
    apply_order_by_and_limit(&mut rows, &columns, &plan.order_by, plan.limit)?;
    Ok(QueryResult { columns, rows })
}
/// Executes an aggregate and/or GROUP BY query against `table`.
///
/// Single-key `COUNT(*)` queries accepted by `classify_fast_grouped_count` are routed to
/// `select_fast_grouped_count`. All other queries aggregate each row group on the query
/// pool and merge the partial `AggState`s here:
/// * without GROUP BY, into exactly one output row;
/// * with GROUP BY, into one row per distinct key.
///
/// ORDER BY and LIMIT are then applied to the merged rows.
///
/// # Errors
/// Compilation errors (unknown columns, unsupported projections), scan failures, and
/// failures from `AggState::merge` or `build_output_row`.
fn select_aggregate(&self, table: &Table, plan: &SelectPlan) -> Result<QueryResult> {
    let filter = plan
        .filter
        .as_ref()
        .map(|filter| compile_filter_expr(&table.schema, filter))
        .transpose()?;
    let projections = compile_projections(&table.schema, &plan.group_by, &plan.projections)?;
    let columns = projections
        .iter()
        .map(|projection| projection.alias.clone())
        .collect::<Vec<_>>();
    let group_indexes = plan
        .group_by
        .iter()
        .map(|name| column_index(&table.schema, name))
        .collect::<Result<Vec<_>>>()?;
    let required_columns = required_column_indexes(
        table.schema.columns.len(),
        filter.as_ref(),
        &projections,
        &group_indexes,
    );
    let mapped = self.storage.mapped_bytes();
    let query_threads = self.query_runtime.threads;
    // With fewer row groups than threads, row-group-level parallelism alone cannot keep
    // the pool busy; this flag presumably lets the per-row-group kernels split rows too.
    let parallelize_rows = query_threads > 1 && table.row_groups.len() < query_threads;
    if let Some(fast_path) =
        classify_fast_grouped_count(table, plan, &projections, &group_indexes)
    {
        return self.select_fast_grouped_count(
            table,
            plan,
            filter.as_ref(),
            &projections,
            &columns,
            &required_columns,
            mapped,
            query_threads,
            parallelize_rows,
            fast_path,
        );
    }
    let mut rows = if group_indexes.is_empty() {
        let partials = self.query_runtime.pool.install(|| {
            collect_row_group_aggregates(
                table,
                query_threads,
                mapped,
                filter.as_ref(),
                &required_columns,
                |row_group| {
                    aggregate_row_group_all(
                        row_group,
                        filter.as_ref(),
                        &projections,
                        query_threads,
                        parallelize_rows,
                    )
                },
            )
        })?;
        // Start from fresh states. `None` partials (presumably row groups with no
        // matching rows) contribute nothing, so an empty input finishes the initial
        // states unchanged.
        let mut states = projections.iter().map(AggState::new).collect::<Vec<_>>();
        for partial_states in partials.into_iter().flatten() {
            for (state, partial_state) in states.iter_mut().zip(partial_states) {
                state.merge(partial_state)?;
            }
        }
        vec![build_output_row(&projections, &[], states)?]
    } else {
        let partials = self.query_runtime.pool.install(|| {
            collect_row_group_aggregates(
                table,
                query_threads,
                mapped,
                filter.as_ref(),
                &required_columns,
                |row_group| {
                    aggregate_row_group_grouped(
                        row_group,
                        filter.as_ref(),
                        &projections,
                        &group_indexes,
                        query_threads,
                        parallelize_rows,
                    )
                },
            )
        })?;
        // Merge the per-row-group maps into one state vector per group key.
        let mut global_groups: FxHashMap<Vec<Value>, Vec<AggState>> = FxHashMap::default();
        for partial in partials {
            for (key, local_states) in partial {
                let states = global_groups
                    .entry(key)
                    .or_insert_with(|| projections.iter().map(AggState::new).collect());
                for (state, local_state) in states.iter_mut().zip(local_states) {
                    state.merge(local_state)?;
                }
            }
        }
        global_groups
            .into_iter()
            .map(|(key, states)| build_output_row(&projections, &key, states))
            .collect::<Result<Vec<_>>>()?
    };
    apply_order_by_and_limit(&mut rows, &columns, &plan.order_by, plan.limit)?;
    Ok(QueryResult { columns, rows })
}
/// Executes a single-key `GROUP BY k` query whose only aggregates are `COUNT(*)`.
///
/// The strategy comes from `fast_path`:
/// * `Int64` + `Hash`: per-row-group counts are summed into one hash map.
/// * `Int64` + `SortedTopK`: all matching keys are collected, sorted in parallel, and
///   run-length counted while keeping only the top `LIMIT` groups.
/// * `Bool`: two counters (false/true) are summed across row groups.
///
/// When the query orders by a count descending and has a LIMIT, the `materialize_*`
/// helpers have already sorted and truncated the rows, so the generic ORDER BY / LIMIT
/// pass is skipped.
// Takes many arguments because it reuses everything `select_aggregate` already computed.
#[allow(clippy::too_many_arguments)]
fn select_fast_grouped_count(
    &self,
    table: &Table,
    plan: &SelectPlan,
    filter: Option<&CompiledFilterExpr>,
    projections: &[CompiledProjection],
    columns: &[String],
    required_columns: &[usize],
    mapped: Option<&[u8]>,
    query_threads: usize,
    parallelize_rows: bool,
    fast_path: FastGroupedCountPlan,
) -> Result<QueryResult> {
    let mut rows = match fast_path.key_type {
        FastGroupedCountKeyType::Int64 => match fast_path.mode {
            FastGroupedCountMode::Hash => {
                let partials = self.query_runtime.pool.install(|| {
                    collect_row_group_aggregates(
                        table,
                        query_threads,
                        mapped,
                        filter,
                        required_columns,
                        |row_group| {
                            count_row_group_int64(
                                row_group,
                                filter,
                                fast_path.key_index,
                                query_threads,
                                parallelize_rows,
                            )
                        },
                    )
                })?;
                // Sum the per-row-group (key, count) pairs.
                let mut counts = FxHashMap::default();
                for partial in partials {
                    for (key, count) in partial {
                        *counts.entry(key).or_insert(0) += count;
                    }
                }
                materialize_int64_grouped_counts(
                    counts,
                    projections,
                    fast_path.order_by_count_desc,
                    plan.limit,
                )?
            }
            FastGroupedCountMode::SortedTopK => {
                let partials = self.query_runtime.pool.install(|| {
                    collect_row_group_aggregates(
                        table,
                        query_threads,
                        mapped,
                        filter,
                        required_columns,
                        |row_group| {
                            collect_row_group_int64_values(
                                row_group,
                                filter,
                                fast_path.key_index,
                            )
                        },
                    )
                })?;
                let total_keys = partials.iter().map(Vec::len).sum();
                let mut keys = Vec::with_capacity(total_keys);
                for partial in partials {
                    keys.extend(partial);
                }
                // Sorting makes equal keys adjacent so groups can be counted as runs.
                self.query_runtime.pool.install(|| {
                    keys.par_sort_unstable();
                });
                materialize_sorted_topk_grouped_counts(
                    keys,
                    projections,
                    // classify_fast_grouped_count only selects SortedTopK when LIMIT is set.
                    plan.limit.expect("sorted top-k fast path requires LIMIT"),
                )?
            }
        },
        FastGroupedCountKeyType::Bool => {
            let partials = self.query_runtime.pool.install(|| {
                collect_row_group_aggregates(
                    table,
                    query_threads,
                    mapped,
                    filter,
                    required_columns,
                    |row_group| count_row_group_bool(row_group, filter, fast_path.key_index),
                )
            })?;
            // Index 0 counts `false` keys, index 1 counts `true` keys.
            let mut counts = [0_i64; 2];
            for partial in partials {
                counts[0] += partial[0];
                counts[1] += partial[1];
            }
            materialize_bool_grouped_counts(
                counts,
                projections,
                fast_path.order_by_count_desc,
                plan.limit,
            )?
        }
    };
    // Count-descending + LIMIT was already sorted and truncated during materialisation.
    if !(fast_path.order_by_count_desc && plan.limit.is_some()) {
        apply_order_by_and_limit(&mut rows, columns, &plan.order_by, plan.limit)?;
    }
    Ok(QueryResult {
        columns: columns.to_vec(),
        rows,
    })
}
}
/// Builds one aggregate output row from a group key and its finished states.
///
/// Column projections take their value from `key` at their `group_key_index`, and
/// aggregate projections finish their `AggState`. `projections` and `states` are
/// paired positionally.
///
/// # Errors
/// A column projection that is not a group key, any scalar projection, or a failing
/// `AggState::finish`.
fn build_output_row(
    projections: &[CompiledProjection],
    key: &[Value],
    states: Vec<AggState>,
) -> Result<Vec<Value>> {
    projections
        .iter()
        .zip(states)
        .map(|(projection, state)| -> Result<Value> {
            match &projection.expr {
                CompiledProjectionExpr::Aggregate { .. } => state.finish(),
                CompiledProjectionExpr::Column {
                    group_key_index, ..
                } => {
                    let position = group_key_index.context("grouped column missing from key")?;
                    Ok(key[position].clone())
                }
                CompiledProjectionExpr::Scalar(_) => {
                    bail!("scalar expressions in aggregate queries are not yet supported")
                }
            }
        })
        .collect()
}
/// Decides whether an aggregate query can use the grouped `COUNT(*)` fast path.
///
/// All of the following must hold:
/// * there is exactly one GROUP BY column, typed `Int64`, `Timestamp` (treated as
///   `Int64`), or `Bool`;
/// * every projection is either that key column (as group key 0) or `COUNT(*)` (no
///   column argument);
/// * at least one key projection and one `COUNT(*)` are present.
///
/// `SortedTopK` is chosen for integer keys when the query is
/// `ORDER BY <count alias> DESC LIMIT n`; otherwise `Hash` is used.
///
/// Returns `None` when the generic aggregation path must be used.
fn classify_fast_grouped_count(
    table: &Table,
    plan: &SelectPlan,
    projections: &[CompiledProjection],
    group_indexes: &[usize],
) -> Option<FastGroupedCountPlan> {
    if group_indexes.len() != 1 {
        return None;
    }
    let key_index = group_indexes[0];
    let key_type = match table.schema.columns[key_index].data_type {
        DataType::Int64 | DataType::Timestamp => FastGroupedCountKeyType::Int64,
        DataType::Bool => FastGroupedCountKeyType::Bool,
        _ => return None,
    };
    let mut saw_key = false;
    // Aliases of the COUNT(*) projections, used to recognise ORDER BY on the count.
    let mut count_aliases = Vec::new();
    for projection in projections {
        match &projection.expr {
            CompiledProjectionExpr::Column {
                column_index,
                group_key_index,
            } if *column_index == key_index && *group_key_index == Some(0) => {
                saw_key = true;
            }
            CompiledProjectionExpr::Aggregate {
                kind: AggregateKind::Count,
                column_index: None,
            } => count_aliases.push(projection.alias.clone()),
            // Any other projection (other aggregates, scalars) disqualifies the fast path.
            _ => return None,
        }
    }
    if !saw_key || count_aliases.is_empty() {
        return None;
    }
    let order_by_count_desc = plan.order_by.len() == 1
        && plan.order_by[0].descending
        && count_aliases
            .iter()
            .any(|alias| alias == &plan.order_by[0].field);
    let mode = if matches!(key_type, FastGroupedCountKeyType::Int64)
        && order_by_count_desc
        && plan.limit.is_some()
    {
        FastGroupedCountMode::SortedTopK
    } else {
        FastGroupedCountMode::Hash
    };
    Some(FastGroupedCountPlan {
        key_index,
        key_type,
        order_by_count_desc,
        mode,
    })
}
/// Turns merged `key -> count` totals for an integer key into output rows.
///
/// When `order_by_count_desc` is set, rows are sorted by count (descending) and limited
/// to `limit`. Otherwise they come out in hash-map order and the caller applies
/// ORDER BY / LIMIT.
fn materialize_int64_grouped_counts(
    counts: FxHashMap<i64, i64>,
    projections: &[CompiledProjection],
    order_by_count_desc: bool,
    limit: Option<usize>,
) -> Result<Vec<Vec<Value>>> {
    let mut entries: Vec<(i64, i64)> = counts.into_iter().collect();
    if order_by_count_desc {
        sort_and_limit_grouped_counts(&mut entries, limit);
    }
    let mut rows = Vec::with_capacity(entries.len());
    for (key, count) in entries {
        rows.push(build_fast_grouped_count_row(
            projections,
            Value::Int64(key),
            count,
        )?);
    }
    Ok(rows)
}
/// Turns `[false_count, true_count]` into output rows, omitting empty groups.
///
/// Rows are emitted `false` first, then `true`, unless `order_by_count_desc` requests
/// sorting by count (descending) with `limit` applied.
fn materialize_bool_grouped_counts(
    counts: [i64; 2],
    projections: &[CompiledProjection],
    order_by_count_desc: bool,
    limit: Option<usize>,
) -> Result<Vec<Vec<Value>>> {
    let mut entries: Vec<(bool, i64)> = [(false, counts[0]), (true, counts[1])]
        .into_iter()
        .filter(|&(_, count)| count > 0)
        .collect();
    if order_by_count_desc {
        sort_and_limit_grouped_counts(&mut entries, limit);
    }
    entries
        .into_iter()
        .map(|(flag, count)| build_fast_grouped_count_row(projections, Value::Bool(flag), count))
        .collect()
}
fn materialize_sorted_topk_grouped_counts(
keys: Vec<i64>,
projections: &[CompiledProjection],
limit: usize,
) -> Result<Vec<Vec<Value>>> {
if keys.is_empty() || limit == 0 {
return Ok(Vec::new());
}
let mut heap = BinaryHeap::new();
let mut index = 0;
while index < keys.len() {
let key = keys[index];
let mut count = 1_i64;
index += 1;
while index < keys.len() && keys[index] == key {
count += 1;
index += 1;
}
push_top_group_count(&mut heap, limit, key, count);
}
let mut entries = heap
.into_sorted_vec()
.into_iter()
.map(|entry| (entry.0.key, entry.0.count))
.collect::<Vec<_>>();
entries.sort_unstable_by(|lhs, rhs| rhs.1.cmp(&lhs.1));
entries
.into_iter()
.map(|(key, count)| build_fast_grouped_count_row(projections, Value::Int64(key), count))
.collect()
}
/// Sorts `(key, count)` entries by count descending, keeping at most `limit` of them.
///
/// When a limit is smaller than the input, the largest counts are partitioned to the
/// front first, so only the survivors are fully sorted. The order of entries with equal
/// counts is unspecified.
fn sort_and_limit_grouped_counts<K>(entries: &mut Vec<(K, i64)>, limit: Option<usize>) {
    let by_count_desc = |lhs: &(K, i64), rhs: &(K, i64)| rhs.1.cmp(&lhs.1);
    match limit {
        Some(keep) if keep < entries.len() => {
            entries.select_nth_unstable_by(keep, by_count_desc);
            entries.truncate(keep);
        }
        _ => {}
    }
    entries.sort_unstable_by(by_count_desc);
}
/// Builds one fast-path output row: the group key for key projections and `count` for
/// `COUNT(*)` projections, in projection order.
///
/// # Errors
/// Any projection outside what `classify_fast_grouped_count` accepts.
fn build_fast_grouped_count_row(
    projections: &[CompiledProjection],
    key: Value,
    count: i64,
) -> Result<Vec<Value>> {
    projections
        .iter()
        .map(|projection| -> Result<Value> {
            match &projection.expr {
                CompiledProjectionExpr::Aggregate {
                    kind: AggregateKind::Count,
                    column_index: None,
                } => Ok(Value::Int64(count)),
                CompiledProjectionExpr::Column {
                    group_key_index, ..
                } => {
                    ensure!(
                        *group_key_index == Some(0),
                        "invalid grouped count fast path key projection"
                    );
                    Ok(key.clone())
                }
                _ => bail!("invalid grouped count fast path projection"),
            }
        })
        .collect()
}
/// A group's `(count, key)` pair for the top-k grouped count fast path.
///
/// Ordered by `count`, then by `key`; wrapped in `Reverse` to form a min-heap.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
struct TopGroupCount {
    count: i64,
    key: i64,
}
impl Ord for TopGroupCount {
    fn cmp(&self, other: &Self) -> Ordering {
        (self.count, self.key).cmp(&(other.count, other.key))
    }
}
impl PartialOrd for TopGroupCount {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
/// Offers a group to a bounded min-heap holding the `limit` largest counts.
///
/// While the heap has room the group is always added. Once full, the group replaces the
/// current minimum only if its count is strictly larger, so on ties the group already
/// held is kept. With `limit == 0` nothing is ever added.
fn push_top_group_count(
    heap: &mut BinaryHeap<std::cmp::Reverse<TopGroupCount>>,
    limit: usize,
    key: i64,
    count: i64,
) {
    let candidate = TopGroupCount { count, key };
    if heap.len() < limit {
        heap.push(std::cmp::Reverse(candidate));
    } else if heap
        .peek()
        .is_some_and(|smallest| candidate.count > smallest.0.count)
    {
        heap.pop();
        heap.push(std::cmp::Reverse(candidate));
    }
}
/// Evaluates a compiled scalar expression for row `row` of `row_group`.
///
/// Column references read the value directly from the row group; binary operators go
/// through `eval_arithmetic`. Negating an `Int64` uses checked negation, so `-i64::MIN`
/// reports "integer overflow" instead of panicking (debug) or silently wrapping (release).
///
/// # Errors
/// Negating a non-numeric value, integer overflow, or an error from `eval_arithmetic`
/// (e.g. division by zero).
fn eval_compiled_scalar(
    expr: &CompiledScalarExpr,
    row_group: &crate::storage::LoadedRowGroup<'_>,
    row: usize,
) -> Result<Value> {
    match expr {
        CompiledScalarExpr::Literal(value) => Ok(value.clone()),
        CompiledScalarExpr::ColumnRef(index) => Ok(row_group.column(*index).value_at(row)),
        CompiledScalarExpr::UnaryMinus(inner) => {
            match eval_compiled_scalar(inner, row_group, row)? {
                Value::Int64(v) => v.checked_neg().map(Value::Int64).context("integer overflow"),
                Value::Float64(v) => Ok(Value::Float64(OrderedFloat(-v.into_inner()))),
                other => bail!("cannot negate {other:?}"),
            }
        }
        CompiledScalarExpr::BinaryOp { left, op, right } => {
            let l = eval_compiled_scalar(left, row_group, row)?;
            let r = eval_compiled_scalar(right, row_group, row)?;
            eval_arithmetic(l, *op, r)
        }
    }
}
/// Applies ORDER BY and LIMIT to materialised result rows in place.
///
/// ORDER BY fields are resolved against `columns` (output aliases). With both ORDER BY
/// and a LIMIT smaller than the row count, the top `limit` rows are selected first so
/// only those are fully sorted. The final sort is stable. Without ORDER BY, LIMIT simply
/// keeps the first `limit` rows.
///
/// # Errors
/// An ORDER BY field that is not an output column.
fn apply_order_by_and_limit(
    rows: &mut Vec<Vec<Value>>,
    columns: &[String],
    order_by: &[OrderByTerm],
    limit: Option<usize>,
) -> Result<()> {
    if order_by.is_empty() {
        if let Some(limit) = limit {
            rows.truncate(limit);
        }
        return Ok(());
    }
    let mut order_indexes = Vec::with_capacity(order_by.len());
    for term in order_by {
        let position = columns
            .iter()
            .position(|column| column == &term.field)
            .with_context(|| format!("ORDER BY field {} not found in result set", term.field))?;
        order_indexes.push(position);
    }
    let by_terms = |lhs: &Vec<Value>, rhs: &Vec<Value>| {
        compare_result_rows(lhs, rhs, &order_indexes, order_by)
    };
    if let Some(limit) = limit
        && limit < rows.len()
    {
        rows.select_nth_unstable_by(limit, by_terms);
        rows.truncate(limit);
    }
    rows.sort_by(by_terms);
    Ok(())
}
/// Evaluates a table-less `SELECT <expr> [AS alias], ...` into a single-row result.
///
/// # Errors
/// The first expression that fails to evaluate.
fn execute_eval(plan: EvalPlan) -> Result<QueryResult> {
    let evaluated = plan
        .exprs
        .into_iter()
        .map(|(expr, alias)| eval_scalar(&expr).map(|value| (alias, value)))
        .collect::<Result<Vec<_>>>()?;
    let (columns, values): (Vec<_>, Vec<_>) = evaluated.into_iter().unzip();
    Ok(QueryResult {
        columns,
        rows: vec![values],
    })
}
/// Evaluates a scalar expression that does not reference any table.
///
/// Negating an `Int64` uses checked negation, so `-i64::MIN` reports "integer overflow"
/// instead of panicking (debug) or silently wrapping (release).
///
/// # Errors
/// Column references, negating a non-numeric value, integer overflow, or an error from
/// `eval_arithmetic`.
fn eval_scalar(expr: &ScalarExpr) -> Result<Value> {
    match expr {
        ScalarExpr::Literal(value) => Ok(value.clone()),
        ScalarExpr::ColumnRef(name) => bail!("column reference '{name}' not valid without a table"),
        ScalarExpr::UnaryMinus(inner) => match eval_scalar(inner)? {
            Value::Int64(v) => v.checked_neg().map(Value::Int64).context("integer overflow"),
            Value::Float64(v) => Ok(Value::Float64(OrderedFloat(-v.into_inner()))),
            other => bail!("cannot negate {other:?}"),
        },
        ScalarExpr::BinaryOp { left, op, right } => {
            let l = eval_scalar(left)?;
            let r = eval_scalar(right)?;
            eval_arithmetic(l, *op, r)
        }
    }
}
fn eval_arithmetic(left: Value, op: ArithmeticOp, right: Value) -> Result<Value> {
match (left, right) {
(Value::Int64(l), Value::Int64(r)) => {
let result = match op {
ArithmeticOp::Add => l.checked_add(r).context("integer overflow")?,
ArithmeticOp::Sub => l.checked_sub(r).context("integer overflow")?,
ArithmeticOp::Mul => l.checked_mul(r).context("integer overflow")?,
ArithmeticOp::Div => {
ensure!(r != 0, "division by zero");
l / r
}
ArithmeticOp::Mod => {
ensure!(r != 0, "division by zero");
l % r
}
};
Ok(Value::Int64(result))
}
(Value::Float64(l), Value::Float64(r)) => {
let (l, r) = (l.into_inner(), r.into_inner());
let result = match op {
ArithmeticOp::Add => l + r,
ArithmeticOp::Sub => l - r,
ArithmeticOp::Mul => l * r,
ArithmeticOp::Div => l / r,
ArithmeticOp::Mod => l % r,
};
Ok(Value::Float64(OrderedFloat(result)))
}
(Value::Int64(l), Value::Float64(r)) => eval_arithmetic(
Value::Float64(OrderedFloat(l as f64)),
op,
Value::Float64(r),
),
(Value::Float64(l), Value::Int64(r)) => eval_arithmetic(
Value::Float64(l),
op,
Value::Float64(OrderedFloat(r as f64)),
),
(l, r) => bail!("unsupported arithmetic between {l:?} and {r:?}"),
}
}
/// Lexicographically compares two result rows on their `ORDER BY` columns.
///
/// `indexes[i]` is the position, within each row, of the column sorted by
/// `terms[i]`. Pairs are compared in order and the first non-equal pair
/// decides the result. A descending term reverses its column's ordering.
/// Pairs that `Value::compare` cannot order are treated as ties. If
/// `indexes` and `terms` differ in length, the surplus entries are ignored.
///
/// # Panics
///
/// Panics if any entry of `indexes` is out of bounds for `lhs` or `rhs`.
fn compare_result_rows(
    lhs: &[Value],
    rhs: &[Value],
    indexes: &[usize],
    terms: &[OrderByTerm],
) -> Ordering {
    indexes
        .iter()
        .zip(terms)
        .map(|(&column, term)| {
            let natural = lhs[column]
                .compare(&rhs[column])
                .unwrap_or(Ordering::Equal);
            if term.descending {
                natural.reverse()
            } else {
                natural
            }
        })
        // `find` stops at the first deciding column, just like an early return.
        .find(|ordering| ordering.is_ne())
        .unwrap_or(Ordering::Equal)
}
/// Returns the SQL type keyword for a column [`DataType`].
///
/// These are the same spellings used in `CREATE TABLE` column definitions,
/// e.g. `Float64` maps to `REAL` and `String` maps to `TEXT`.
fn data_type_to_sql_name(data_type: DataType) -> &'static str {
    match data_type {
        DataType::Bool => "BOOL",
        DataType::Int64 => "INT",
        DataType::Float64 => "REAL",
        DataType::String => "TEXT",
        DataType::Timestamp => "TIMESTAMP",
    }
}
/// End-to-end tests that drive `NarrowDb` through SQL and the programmatic
/// insert APIs, using database files created in the OS temp directory.
#[cfg(test)]
mod tests {
    use std::path::PathBuf;
    use std::time::{SystemTime, UNIX_EPOCH};

    use super::*;
    use crate::types::{BatchColumn, ColumnarBatch};

    /// Returns a fresh database path `narrowdb-test-{label}-{nanos}.db` in the
    /// OS temp directory.
    ///
    /// The nanosecond timestamp keeps paths distinct across runs. Each test
    /// passes its own `label` so tests running in parallel never share a file.
    fn temp_db_path(label: &str) -> Result<PathBuf> {
        let nanos = SystemTime::now().duration_since(UNIX_EPOCH)?.as_nanos();
        Ok(std::env::temp_dir().join(format!("narrowdb-test-{label}-{nanos}.db")))
    }

    #[test]
    fn persists_and_queries() -> Result<()> {
        let path = temp_db_path("persist")?;
        {
            // A row group size of 2 spreads the three rows over several row groups.
            let db = NarrowDb::open(
                &path,
                DbOptions {
                    row_group_size: 2,
                    sync_on_flush: true,
                    ..DbOptions::default()
                },
            )?;
            db.execute_sql(
                "
CREATE TABLE logs (ts TIMESTAMP, level TEXT, service TEXT, status INT, duration REAL);
INSERT INTO logs VALUES
(1, 'info', 'api', 200, 12.0),
(2, 'error', 'api', 500, 120.0),
(3, 'error', 'worker', 503, 250.0);
",
            )?;
            db.flush_all()?;
        }
        // Reopen to read back what the first handle persisted.
        let db = NarrowDb::open(&path, DbOptions::default())?;
        let results = db.execute_sql(
            "SELECT service, COUNT(*) AS errors FROM logs WHERE level = 'error' GROUP BY service ORDER BY errors DESC LIMIT 2;",
        )?;
        assert_eq!(results.last().unwrap().rows.len(), 2);
        std::fs::remove_file(path)?;
        Ok(())
    }

    #[test]
    fn preserves_projection_order_in_group_by_results() -> Result<()> {
        let path = temp_db_path("order")?;
        let db = NarrowDb::open(&path, DbOptions::default())?;
        db.execute_sql(
            "
CREATE TABLE logs (service TEXT, host TEXT, duration REAL);
INSERT INTO logs VALUES
('api', 'a', 10.0),
('api', 'b', 20.0),
('worker', 'a', 30.0);
",
        )?;
        // The projection lists `host` before `service`, the reverse of GROUP BY.
        let results = db.execute_sql(
            "SELECT host, service, COUNT(*) AS total FROM logs GROUP BY service, host ORDER BY host DESC LIMIT 10;",
        )?;
        let result = results.last().unwrap();
        assert_eq!(result.columns, vec!["host", "service", "total"]);
        assert_eq!(result.rows[0][0].to_string(), "b");
        assert_eq!(result.rows[0][1].to_string(), "api");
        std::fs::remove_file(path)?;
        Ok(())
    }

    #[test]
    fn evaluates_scalar_expressions_in_select() -> Result<()> {
        let path = temp_db_path("scalar")?;
        let db = NarrowDb::open(&path, DbOptions::default())?;
        db.execute_sql(
            "CREATE TABLE products (name TEXT, price REAL, quantity INT);
             INSERT INTO products VALUES
             ('widget', 10.0, 5),
             ('gadget', 25.0, 3),
             ('gizmo', 7.5, 10);",
        )?;

        // Column times integer literal.
        let results =
            db.execute_sql("SELECT price * 2 AS doubled FROM products ORDER BY doubled")?;
        let result = results.last().unwrap();
        assert_eq!(result.rows.len(), 3);
        assert_eq!(result.rows[0][0], Value::Float64(OrderedFloat(15.0)));
        assert_eq!(result.rows[1][0], Value::Float64(OrderedFloat(20.0)));
        assert_eq!(result.rows[2][0], Value::Float64(OrderedFloat(50.0)));

        // REAL column times INT column promotes to REAL.
        let results =
            db.execute_sql("SELECT price * quantity AS total FROM products ORDER BY total")?;
        let result = results.last().unwrap();
        assert_eq!(result.columns, vec!["total"]);
        assert_eq!(result.rows[0][0], Value::Float64(OrderedFloat(50.0)));
        assert_eq!(result.rows[1][0], Value::Float64(OrderedFloat(75.0)));
        assert_eq!(result.rows[2][0], Value::Float64(OrderedFloat(75.0)));

        // Expression projection combined with a filter.
        let results = db
            .execute_sql("SELECT name, price + 1.5 AS adjusted FROM products WHERE quantity > 4")?;
        let result = results.last().unwrap();
        assert_eq!(result.rows.len(), 2);
        std::fs::remove_file(path)?;
        Ok(())
    }

    #[test]
    fn ingests_columnar_batches() -> Result<()> {
        let path = temp_db_path("columnar")?;
        let db = NarrowDb::open(&path, DbOptions::default())?;
        db.execute_sql(
            "CREATE TABLE logs (ts TIMESTAMP, service TEXT, status INT, duration REAL);",
        )?;
        db.insert_columnar_batch(
            "logs",
            ColumnarBatch::new(vec![
                BatchColumn::Timestamp(vec![1, 2, 3]),
                BatchColumn::String(vec![
                    "api".to_string(),
                    "api".to_string(),
                    "worker".to_string(),
                ]),
                BatchColumn::Int64(vec![200, 500, 503]),
                BatchColumn::Float64(vec![10.0, 100.0, 250.0]),
            ])?,
        )?;
        db.flush_all()?;
        let results = db.execute_sql(
            "SELECT service, COUNT(*) AS total FROM logs WHERE status >= 500 GROUP BY service ORDER BY total DESC LIMIT 10;",
        )?;
        let result = results.last().unwrap();
        assert_eq!(result.rows.len(), 2);
        std::fs::remove_file(path)?;
        Ok(())
    }

    #[test]
    fn create_table_if_not_exists() -> Result<()> {
        let path = temp_db_path("ifne")?;
        let db = NarrowDb::open(&path, DbOptions::default())?;
        db.execute_sql("CREATE TABLE logs (ts TIMESTAMP, level TEXT);")?;
        db.execute_sql("CREATE TABLE IF NOT EXISTS logs (ts TIMESTAMP, level TEXT);")?;
        // Without IF NOT EXISTS, re-creating an existing table must fail.
        let err = db.execute_sql("CREATE TABLE logs (ts TIMESTAMP, level TEXT);");
        assert!(err.is_err());
        std::fs::remove_file(path)?;
        Ok(())
    }

    #[test]
    fn multi_filter_queries_match_expected_rows() -> Result<()> {
        let path = temp_db_path("multifilter")?;
        let db = NarrowDb::open(&path, DbOptions::default())?;
        db.execute_sql(
            "CREATE TABLE logs (id INT, level TEXT, status INT, duration REAL);
             INSERT INTO logs VALUES
             (1, 'info', 200, 10.0),
             (2, 'error', 500, 650.0),
             (3, 'error', 503, 750.0),
             (4, 'warn', 503, 900.0),
             (5, 'error', 504, 800.0);",
        )?;
        let results = db.execute_sql(
            "SELECT id FROM logs
             WHERE level = 'error' AND status >= 503 AND duration >= 750.0
             ORDER BY id;",
        )?;
        let result = results.last().unwrap();
        assert_eq!(
            result.rows,
            vec![vec![Value::Int64(3)], vec![Value::Int64(5)]]
        );
        std::fs::remove_file(path)?;
        Ok(())
    }

    #[test]
    fn grouped_string_queries_merge_across_row_groups() -> Result<()> {
        let path = temp_db_path("group-merge")?;
        // With two rows per row group, each service appears in both row groups,
        // so per-group partial counts have to be merged.
        let db = NarrowDb::open(
            &path,
            DbOptions {
                row_group_size: 2,
                sync_on_flush: true,
                ..DbOptions::default()
            },
        )?;
        db.execute_sql(
            "CREATE TABLE logs (service TEXT, status INT);
             INSERT INTO logs VALUES
             ('api', 200),
             ('worker', 500),
             ('worker', 503),
             ('api', 500);",
        )?;
        db.flush_all()?;
        let results = db.execute_sql(
            "SELECT service, COUNT(*) AS total
             FROM logs
             GROUP BY service
             ORDER BY service;",
        )?;
        let result = results.last().unwrap();
        assert_eq!(
            result.rows,
            vec![
                vec![Value::String("api".to_string()), Value::Int64(2)],
                vec![Value::String("worker".to_string()), Value::Int64(2)],
            ]
        );
        std::fs::remove_file(path)?;
        Ok(())
    }

    #[test]
    fn aggregates_ignore_nulls_for_min_and_max() -> Result<()> {
        let path = temp_db_path("null-agg")?;
        let db = NarrowDb::open(&path, DbOptions::default())?;
        db.execute_sql(
            "CREATE TABLE metrics (service TEXT, duration REAL, status INT);
             INSERT INTO metrics VALUES
             ('api', 10.0, 200),
             ('api', NULL, NULL),
             ('worker', 25.5, 503),
             ('worker', 3.0, 404);",
        )?;
        let results = db.execute_sql(
            "SELECT MIN(duration) AS min_duration, MAX(status) AS max_status
             FROM metrics;",
        )?;
        let result = results.last().unwrap();
        assert_eq!(
            result.rows,
            vec![vec![Value::Float64(OrderedFloat(3.0)), Value::Int64(503),]]
        );
        std::fs::remove_file(path)?;
        Ok(())
    }

    #[test]
    fn alters_table_and_column_names() -> Result<()> {
        let path = temp_db_path("alter")?;
        {
            let db = NarrowDb::open(&path, DbOptions::default())?;
            db.execute_sql(
                "CREATE TABLE logs (ts TIMESTAMP, level TEXT);
                 INSERT INTO logs VALUES (1, 'info'), (2, 'error');",
            )?;
            db.execute_sql("ALTER TABLE logs RENAME TO events;")?;
            db.execute_sql("ALTER TABLE events RENAME COLUMN level TO severity;")?;

            // The old table name must no longer resolve.
            let old_table = db.execute_sql("SELECT * FROM logs;");
            assert!(old_table.is_err());

            let results = db.execute_sql(
                "SELECT severity FROM events WHERE severity = 'error' ORDER BY severity;",
            )?;
            let result = results.last().unwrap();
            assert_eq!(result.columns, vec!["severity"]);
            assert_eq!(result.rows.len(), 1);
            assert_eq!(result.rows[0][0], Value::String("error".to_string()));
        }
        // The renames must survive a reopen.
        let reopened = NarrowDb::open(&path, DbOptions::default())?;
        let results = reopened.execute_sql("SELECT severity FROM events ORDER BY severity;")?;
        let result = results.last().unwrap();
        assert_eq!(result.columns, vec!["severity"]);
        assert_eq!(result.rows.len(), 2);
        std::fs::remove_file(path)?;
        Ok(())
    }

    #[test]
    fn drops_tables() -> Result<()> {
        let path = temp_db_path("drop")?;
        {
            let db = NarrowDb::open(&path, DbOptions::default())?;
            db.execute_sql("CREATE TABLE logs (ts TIMESTAMP, level TEXT);")?;
            db.execute_sql("DROP TABLE logs;")?;
            let select_missing = db.execute_sql("SELECT * FROM logs;");
            assert!(select_missing.is_err());
            // IF EXISTS makes dropping a missing table a no-op.
            db.execute_sql("DROP TABLE IF EXISTS logs;")?;
        }
        // After reopening, the dropped name is free to be created again.
        let reopened = NarrowDb::open(&path, DbOptions::default())?;
        reopened.execute_sql("CREATE TABLE logs (ts TIMESTAMP, level TEXT);")?;
        std::fs::remove_file(path)?;
        Ok(())
    }

    #[test]
    fn show_tables_lists_tables() -> Result<()> {
        let path = temp_db_path("show")?;
        let db = NarrowDb::open(&path, DbOptions::default())?;
        db.execute_sql("CREATE TABLE logs (ts TIMESTAMP, level TEXT);")?;
        db.execute_sql("CREATE TABLE metrics (ts TIMESTAMP, value REAL);")?;
        let results = db.execute_sql("SHOW TABLES;")?;
        let result = results.last().unwrap();
        assert_eq!(result.columns, vec!["table_name"]);
        assert_eq!(result.rows.len(), 2);
        assert_eq!(result.rows[0][0], Value::String("logs".to_string()));
        assert_eq!(result.rows[1][0], Value::String("metrics".to_string()));
        std::fs::remove_file(path)?;
        Ok(())
    }

    #[test]
    fn rejects_zero_query_threads() -> Result<()> {
        let path = temp_db_path("threads-invalid")?;
        let result = NarrowDb::open(
            &path,
            DbOptions {
                query_threads: Some(0),
                ..DbOptions::default()
            },
        );
        assert!(result.is_err());
        // Best effort only: the open failed, but it may or may not have
        // created the file before rejecting the options.
        let _ = std::fs::remove_file(&path);
        Ok(())
    }

    #[test]
    fn reuses_query_thread_pool_for_identical_sizes() -> Result<()> {
        let runtime_a = QueryRuntime::new(Some(2))?;
        let runtime_b = QueryRuntime::new(Some(2))?;
        assert_eq!(runtime_a.threads, 2);
        assert!(Arc::ptr_eq(&runtime_a.pool, &runtime_b.pool));
        Ok(())
    }

    #[test]
    fn query_results_match_between_single_and_multi_thread_execution() -> Result<()> {
        let path_single = temp_db_path("single")?;
        let path_multi = temp_db_path("multi")?;
        let query = "SELECT service, COUNT(*) AS total
                     FROM logs
                     WHERE status >= 500
                     GROUP BY service
                     ORDER BY service;";
        let single = seeded_query_db(&path_single, Some(1))?;
        let multi = seeded_query_db(&path_multi, Some(4))?;
        let single_result = single.execute_sql(query)?.pop().unwrap();
        let multi_result = multi.execute_sql(query)?.pop().unwrap();
        assert_eq!(single_result.columns, multi_result.columns);
        assert_eq!(single_result.rows, multi_result.rows);
        std::fs::remove_file(path_single)?;
        std::fs::remove_file(path_multi)?;
        Ok(())
    }

    #[test]
    fn preserves_row_order_with_parallel_row_materialization() -> Result<()> {
        let path = temp_db_path("row-order")?;
        let db = NarrowDb::open(
            &path,
            DbOptions {
                row_group_size: 131_072,
                query_threads: Some(4),
                ..DbOptions::default()
            },
        )?;
        db.execute_sql("CREATE TABLE logs (id INT, service TEXT, status INT);")?;
        let rows = 70_000;
        let batch = ColumnarBatch::new(vec![
            BatchColumn::Int64((0..rows).map(|value| value as i64).collect()),
            BatchColumn::String((0..rows).map(|_| "api".to_string()).collect()),
            BatchColumn::Int64((0..rows).map(|_| 500_i64).collect()),
        ])?;
        db.insert_columnar_batch("logs", batch)?;
        db.flush_all()?;
        // No ORDER BY: output must still come back in insertion order.
        let result = db
            .execute_sql("SELECT id FROM logs WHERE status >= 500;")?
            .pop()
            .unwrap();
        assert_eq!(result.rows.len(), rows);
        for (index, row) in result.rows.iter().enumerate() {
            assert_eq!(row, &vec![Value::Int64(index as i64)]);
        }
        std::fs::remove_file(path)?;
        Ok(())
    }

    #[test]
    fn describe_table_returns_schema() -> Result<()> {
        let path = temp_db_path("describe")?;
        let db = NarrowDb::open(&path, DbOptions::default())?;
        db.execute_sql("CREATE TABLE logs (ts TIMESTAMP, level TEXT, status INT);")?;
        let results = db.execute_sql("DESCRIBE logs;")?;
        let result = results.last().unwrap();
        assert_eq!(result.columns, vec!["column_name", "data_type"]);
        assert_eq!(result.rows.len(), 3);
        assert_eq!(
            result.rows[0],
            vec![
                Value::String("ts".to_string()),
                Value::String("TIMESTAMP".to_string())
            ]
        );
        assert_eq!(
            result.rows[1],
            vec![
                Value::String("level".to_string()),
                Value::String("TEXT".to_string())
            ]
        );
        assert_eq!(
            result.rows[2],
            vec![
                Value::String("status".to_string()),
                Value::String("INT".to_string())
            ]
        );
        std::fs::remove_file(path)?;
        Ok(())
    }

    #[test]
    fn fast_grouped_count_returns_expected_top_k_for_int_keys() -> Result<()> {
        let path = temp_db_path("fast-grouped-count")?;
        let db = NarrowDb::open(
            &path,
            DbOptions {
                row_group_size: 8,
                query_threads: Some(4),
                ..DbOptions::default()
            },
        )?;
        db.execute_sql("CREATE TABLE logs (request_id INT, status INT);")?;
        db.execute_sql(
            "INSERT INTO logs VALUES
             (1, 500), (1, 500), (1, 500), (1, 500), (1, 500),
             (2, 500), (2, 500), (2, 500), (2, 500),
             (3, 500), (3, 500), (3, 500),
             (4, 500), (4, 500);",
        )?;
        let result = db
            .execute_sql(
                "SELECT request_id, COUNT(*) AS total
                 FROM logs
                 WHERE status >= 500
                 GROUP BY request_id
                 ORDER BY total DESC
                 LIMIT 2;",
            )?
            .pop()
            .unwrap();
        assert_eq!(
            result.rows,
            vec![
                vec![Value::Int64(1), Value::Int64(5)],
                vec![Value::Int64(2), Value::Int64(4)],
            ]
        );
        std::fs::remove_file(path)?;
        Ok(())
    }

    #[test]
    fn fast_grouped_count_accepts_arbitrary_ties_for_top_k() -> Result<()> {
        let path = temp_db_path("fast-grouped-ties")?;
        let db = NarrowDb::open(
            &path,
            DbOptions {
                row_group_size: 32,
                query_threads: Some(4),
                ..DbOptions::default()
            },
        )?;
        db.execute_sql("CREATE TABLE logs (request_id INT, status INT);")?;
        // Every id appears exactly once, so all 64 groups tie at count 1 and
        // any 25 distinct ids form a valid top-25 answer.
        db.insert_columnar_batch(
            "logs",
            ColumnarBatch::new(vec![
                BatchColumn::Int64((0..64).map(|value| value as i64).collect()),
                BatchColumn::Int64((0..64).map(|_| 500_i64).collect()),
            ])?,
        )?;
        db.flush_all()?;
        let result = db
            .execute_sql(
                "SELECT request_id, COUNT(*) AS total
                 FROM logs
                 WHERE status >= 500
                 GROUP BY request_id
                 ORDER BY total DESC
                 LIMIT 25;",
            )?
            .pop()
            .unwrap();
        assert_eq!(result.rows.len(), 25);
        let mut ids = std::collections::BTreeSet::new();
        for row in &result.rows {
            assert_eq!(row[1], Value::Int64(1));
            let request_id = row[0].as_i64().expect("request_id should be int");
            // Range-check the integer directly; no lossy cast to usize.
            assert!((0..64).contains(&request_id));
            ids.insert(request_id);
        }
        assert_eq!(ids.len(), 25);
        std::fs::remove_file(path)?;
        Ok(())
    }

    #[test]
    fn fast_grouped_count_supports_bool_keys() -> Result<()> {
        let path = temp_db_path("fast-grouped-bool")?;
        let db = NarrowDb::open(
            &path,
            DbOptions {
                query_threads: Some(4),
                ..DbOptions::default()
            },
        )?;
        db.execute_sql("CREATE TABLE logs (is_error BOOL);")?;
        db.execute_sql(
            "INSERT INTO logs VALUES
             (true), (true), (true),
             (false), (false);",
        )?;
        let result = db
            .execute_sql(
                "SELECT is_error, COUNT(*) AS total
                 FROM logs
                 GROUP BY is_error
                 ORDER BY total DESC;",
            )?
            .pop()
            .unwrap();
        assert_eq!(
            result.rows,
            vec![
                vec![Value::Bool(true), Value::Int64(3)],
                vec![Value::Bool(false), Value::Int64(2)],
            ]
        );
        std::fs::remove_file(path)?;
        Ok(())
    }

    #[test]
    fn fast_grouped_count_supports_timestamp_keys() -> Result<()> {
        let path = temp_db_path("fast-grouped-timestamp")?;
        let db = NarrowDb::open(
            &path,
            DbOptions {
                row_group_size: 8,
                query_threads: Some(4),
                ..DbOptions::default()
            },
        )?;
        db.execute_sql("CREATE TABLE logs (ts TIMESTAMP, status INT);")?;
        db.execute_sql(
            "INSERT INTO logs VALUES
             (100, 500), (100, 500), (100, 500),
             (200, 500), (200, 500),
             (300, 500);",
        )?;
        let result = db
            .execute_sql(
                "SELECT ts, COUNT(*) AS total
                 FROM logs
                 WHERE status >= 500
                 GROUP BY ts
                 ORDER BY total DESC
                 LIMIT 2;",
            )?
            .pop()
            .unwrap();
        // Timestamp keys come back as `Value::Int64`.
        assert_eq!(
            result.rows,
            vec![
                vec![Value::Int64(100), Value::Int64(3)],
                vec![Value::Int64(200), Value::Int64(2)],
            ]
        );
        std::fs::remove_file(path)?;
        Ok(())
    }

    #[test]
    fn query_hides_pending_rows_until_select_flushes() -> Result<()> {
        let path = temp_db_path("pending-visibility")?;
        let db = NarrowDb::open(
            &path,
            DbOptions {
                row_group_size: 8,
                ..DbOptions::default()
            },
        )?;
        db.execute_sql("CREATE TABLE logs (ts TIMESTAMP, service TEXT);")?;
        // One row is fewer than `row_group_size`, so it stays pending.
        db.insert_row(
            "logs",
            vec![Value::Int64(1), Value::String("api".to_string())],
        )?;
        // `query` does not see the pending row...
        let hidden = db
            .query("SELECT COUNT(*) AS total FROM logs;")?
            .pop()
            .unwrap();
        assert_eq!(hidden.rows, vec![vec![Value::Int64(0)]]);
        // ...while `execute_one` does.
        let visible = db.execute_one("SELECT COUNT(*) AS total FROM logs;")?;
        assert_eq!(visible.rows, vec![vec![Value::Int64(1)]]);
        std::fs::remove_file(path)?;
        Ok(())
    }

    #[test]
    fn rename_table_and_column_persist_across_reopen() -> Result<()> {
        let path = temp_db_path("rename-reopen")?;
        {
            let db = NarrowDb::open(
                &path,
                DbOptions {
                    row_group_size: 2,
                    sync_on_flush: true,
                    ..DbOptions::default()
                },
            )?;
            db.execute_sql(
                "CREATE TABLE logs (ts TIMESTAMP, service TEXT, status INT);
                 INSERT INTO logs VALUES
                 (1, 'api', 200),
                 (2, 'worker', 500),
                 (3, 'api', 503);",
            )?;
            db.execute_sql("ALTER TABLE logs RENAME TO app_logs;")?;
            db.execute_sql("ALTER TABLE app_logs RENAME COLUMN service TO source;")?;
        }

        let db = NarrowDb::open(&path, DbOptions::default())?;
        let describe = db.execute_one("DESCRIBE app_logs;")?;
        assert_eq!(
            describe.rows,
            vec![
                vec![
                    Value::String("ts".to_string()),
                    Value::String("TIMESTAMP".to_string()),
                ],
                vec![
                    Value::String("source".to_string()),
                    Value::String("TEXT".to_string()),
                ],
                vec![
                    Value::String("status".to_string()),
                    Value::String("INT".to_string()),
                ],
            ]
        );

        let result = db.execute_one(
            "SELECT source, COUNT(*) AS total
             FROM app_logs
             GROUP BY source
             ORDER BY source;",
        )?;
        assert_eq!(
            result.rows,
            vec![
                vec![Value::String("api".to_string()), Value::Int64(2)],
                vec![Value::String("worker".to_string()), Value::Int64(1)],
            ]
        );
        std::fs::remove_file(path)?;
        Ok(())
    }

    #[test]
    fn drop_table_rewrite_preserves_other_tables_across_reopen() -> Result<()> {
        let path = temp_db_path("drop-rewrite")?;
        {
            let db = NarrowDb::open(
                &path,
                DbOptions {
                    row_group_size: 8,
                    sync_on_flush: true,
                    ..DbOptions::default()
                },
            )?;
            db.execute_sql(
                "CREATE TABLE logs (id INT);
                 CREATE TABLE metrics (name TEXT, value INT);",
            )?;
            db.execute_sql("INSERT INTO logs VALUES (1), (2);")?;
            db.insert_row(
                "metrics",
                vec![Value::String("requests".to_string()), Value::Int64(42)],
            )?;
            db.execute_sql("DROP TABLE logs;")?;
        }

        let db = NarrowDb::open(&path, DbOptions::default())?;
        let tables = db.execute_one("SHOW TABLES;")?;
        assert_eq!(
            tables.rows,
            vec![vec![Value::String("metrics".to_string())]]
        );

        let metrics = db.execute_one("SELECT name, value FROM metrics;")?;
        assert_eq!(
            metrics.rows,
            vec![vec![
                Value::String("requests".to_string()),
                Value::Int64(42)
            ]]
        );

        let err = db.execute_one("SELECT * FROM logs;");
        assert!(err.is_err());
        std::fs::remove_file(path)?;
        Ok(())
    }

    #[test]
    fn persisted_null_filters_survive_reopen() -> Result<()> {
        let path = temp_db_path("null-reopen")?;
        {
            let db = NarrowDb::open(
                &path,
                DbOptions {
                    row_group_size: 2,
                    sync_on_flush: true,
                    ..DbOptions::default()
                },
            )?;
            db.execute_sql(
                "CREATE TABLE events (id INT, note TEXT);
                 INSERT INTO events VALUES
                 (1, NULL),
                 (2, 'ok'),
                 (3, NULL);",
            )?;
            db.flush_all()?;
        }

        let db = NarrowDb::open(&path, DbOptions::default())?;
        let nulls = db.execute_one("SELECT id FROM events WHERE note IS NULL ORDER BY id;")?;
        assert_eq!(
            nulls.rows,
            vec![vec![Value::Int64(1)], vec![Value::Int64(3)]]
        );

        let non_nulls =
            db.execute_one("SELECT id FROM events WHERE note IS NOT NULL ORDER BY id;")?;
        assert_eq!(non_nulls.rows, vec![vec![Value::Int64(2)]]);
        std::fs::remove_file(path)?;
        Ok(())
    }

    #[test]
    fn supports_or_and_in_filters() -> Result<()> {
        let path = temp_db_path("or-in-filters")?;
        let db = NarrowDb::open(&path, DbOptions::default())?;
        db.execute_sql(
            "CREATE TABLE logs (id INT, service TEXT, status INT, level TEXT);
             INSERT INTO logs VALUES
             (1, 'api', 200, 'info'),
             (2, 'worker', 500, 'error'),
             (3, 'billing', 503, 'warn'),
             (4, 'api', 404, 'error'),
             (5, 'search', 200, 'info');",
        )?;

        let or_result = db.execute_one(
            "SELECT id FROM logs
             WHERE service = 'api' OR status >= 500
             ORDER BY id;",
        )?;
        assert_eq!(
            or_result.rows,
            vec![
                vec![Value::Int64(1)],
                vec![Value::Int64(2)],
                vec![Value::Int64(3)],
                vec![Value::Int64(4)],
            ]
        );

        let in_result = db.execute_one(
            "SELECT id FROM logs
             WHERE status IN (200, 404) AND level NOT IN ('warn')
             ORDER BY id;",
        )?;
        assert_eq!(
            in_result.rows,
            vec![
                vec![Value::Int64(1)],
                vec![Value::Int64(4)],
                vec![Value::Int64(5)],
            ]
        );
        std::fs::remove_file(path)?;
        Ok(())
    }

    #[test]
    fn insert_rows_iter_streams_without_collecting_full_input() -> Result<()> {
        let path = temp_db_path("insert-rows-iter")?;
        let db = NarrowDb::open(
            &path,
            DbOptions {
                row_group_size: 2,
                ..DbOptions::default()
            },
        )?;
        db.execute_sql("CREATE TABLE logs (ts TIMESTAMP, service TEXT, status INT);")?;
        let inserted = db.insert_rows_iter(
            "logs",
            (0..5).map(|i| {
                vec![
                    Value::Int64(1_700_000_000_000 + i),
                    Value::String(if i % 2 == 0 { "api" } else { "worker" }.to_string()),
                    Value::Int64(if i % 2 == 0 { 200 } else { 500 }),
                ]
            }),
        )?;
        assert_eq!(inserted, 5);

        let result = db.execute_one("SELECT COUNT(*) AS total FROM logs;")?;
        assert_eq!(result.rows, vec![vec![Value::Int64(5)]]);
        std::fs::remove_file(path)?;
        Ok(())
    }

    #[test]
    fn supports_multiple_order_by_terms() -> Result<()> {
        let path = temp_db_path("multi-order-by")?;
        let db = NarrowDb::open(&path, DbOptions::default())?;
        db.execute_sql(
            "CREATE TABLE logs (service TEXT, status INT, duration REAL);
             INSERT INTO logs VALUES
             ('api', 500, 5.0),
             ('worker', 500, 9.0),
             ('api', 200, 7.0),
             ('billing', 500, 9.0),
             ('billing', 200, 1.0);",
        )?;

        // Ties on (status, duration) are broken by service ascending.
        let result = db.execute_one(
            "SELECT service, status, duration
             FROM logs
             ORDER BY status DESC, duration DESC, service ASC;",
        )?;
        assert_eq!(
            result.rows,
            vec![
                vec![
                    Value::String("billing".to_string()),
                    Value::Int64(500),
                    Value::Float64(OrderedFloat(9.0)),
                ],
                vec![
                    Value::String("worker".to_string()),
                    Value::Int64(500),
                    Value::Float64(OrderedFloat(9.0)),
                ],
                vec![
                    Value::String("api".to_string()),
                    Value::Int64(500),
                    Value::Float64(OrderedFloat(5.0)),
                ],
                vec![
                    Value::String("api".to_string()),
                    Value::Int64(200),
                    Value::Float64(OrderedFloat(7.0)),
                ],
                vec![
                    Value::String("billing".to_string()),
                    Value::Int64(200),
                    Value::Float64(OrderedFloat(1.0)),
                ],
            ]
        );
        std::fs::remove_file(path)?;
        Ok(())
    }

    #[test]
    fn reports_clear_errors_for_unsupported_query_shapes() -> Result<()> {
        let path = temp_db_path("unsupported-diagnostics")?;
        let db = NarrowDb::open(&path, DbOptions::default())?;
        db.execute_sql(
            "CREATE TABLE logs (id INT, service TEXT);
             CREATE TABLE metrics (id INT, value INT);",
        )?;

        let join_err = db
            .execute_sql("SELECT * FROM logs JOIN metrics ON logs.id = metrics.id;")
            .unwrap_err();
        assert!(
            join_err
                .to_string()
                .contains("JOIN clauses are not supported yet")
        );

        let distinct_err = db
            .execute_sql("SELECT DISTINCT service FROM logs;")
            .unwrap_err();
        assert!(
            distinct_err
                .to_string()
                .contains("SELECT DISTINCT is not supported yet")
        );

        let having_err = db
            .execute_sql(
                "SELECT service, COUNT(*) AS total FROM logs GROUP BY service HAVING total > 1;",
            )
            .unwrap_err();
        assert!(
            having_err
                .to_string()
                .contains("HAVING is not supported yet")
        );
        std::fs::remove_file(path)?;
        Ok(())
    }

    /// Opens a database at `path` with 16 384-row row groups and the given
    /// `query_threads`. It then loads and flushes an 80 000-row `logs` table:
    /// `service` cycles through api/worker/billing/search, and `status` is
    /// 503 on every third row and 200 otherwise.
    fn seeded_query_db(path: &std::path::Path, query_threads: Option<usize>) -> Result<NarrowDb> {
        let db = NarrowDb::open(
            path,
            DbOptions {
                row_group_size: 16_384,
                query_threads,
                ..DbOptions::default()
            },
        )?;
        db.execute_sql("CREATE TABLE logs (service TEXT, status INT);")?;
        let rows = 80_000;
        let services = (0..rows)
            .map(|index| match index % 4 {
                0 => "api".to_string(),
                1 => "worker".to_string(),
                2 => "billing".to_string(),
                _ => "search".to_string(),
            })
            .collect::<Vec<_>>();
        let statuses = (0..rows)
            .map(|index| if index % 3 == 0 { 503 } else { 200 })
            .collect::<Vec<_>>();
        db.insert_columnar_batch(
            "logs",
            ColumnarBatch::new(vec![
                BatchColumn::String(services),
                BatchColumn::Int64(statuses),
            ])?,
        )?;
        db.flush_all()?;
        Ok(db)
    }
}