#[cfg(test)]
mod tests {
use crate::{AggFunc, ElastiCubeBuilder};
use arrow::array::{Float64Array, Int32Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use std::sync::Arc;
fn create_test_cube() -> Arc<crate::ElastiCube> {
let schema = Arc::new(Schema::new(vec![
Field::new("region", DataType::Utf8, false),
Field::new("product", DataType::Utf8, false),
Field::new("sales", DataType::Float64, false),
Field::new("quantity", DataType::Int32, false),
]));
let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(StringArray::from(vec!["North", "South", "East", "West"])),
Arc::new(StringArray::from(vec!["A", "B", "A", "C"])),
Arc::new(Float64Array::from(vec![100.0, 200.0, 150.0, 300.0])),
Arc::new(Int32Array::from(vec![10, 20, 15, 30])),
],
)
.unwrap();
let cube = ElastiCubeBuilder::new("test_sales")
.add_dimension("region", DataType::Utf8)
.unwrap()
.add_dimension("product", DataType::Utf8)
.unwrap()
.add_measure("sales", DataType::Float64, AggFunc::Sum)
.unwrap()
.add_measure("quantity", DataType::Int32, AggFunc::Sum)
.unwrap()
.with_data(vec![batch])
.unwrap()
.build()
.unwrap();
Arc::new(cube)
}
#[test]
fn test_append_rows() {
let mut cube = (*create_test_cube()).clone();
assert_eq!(cube.row_count(), 4);
let schema = cube.arrow_schema().clone();
let new_batch = RecordBatch::try_new(
schema,
vec![
Arc::new(StringArray::from(vec!["Central", "North"])),
Arc::new(StringArray::from(vec!["D", "B"])),
Arc::new(Float64Array::from(vec![250.0, 180.0])),
Arc::new(Int32Array::from(vec![25, 18])),
],
)
.unwrap();
let rows_added = cube.append_rows(new_batch).unwrap();
assert_eq!(rows_added, 2);
assert_eq!(cube.row_count(), 6);
}
#[test]
fn test_append_batches() {
let mut cube = (*create_test_cube()).clone();
assert_eq!(cube.row_count(), 4);
let schema = cube.arrow_schema().clone();
let batch1 = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(StringArray::from(vec!["Central"])),
Arc::new(StringArray::from(vec!["D"])),
Arc::new(Float64Array::from(vec![250.0])),
Arc::new(Int32Array::from(vec![25])),
],
)
.unwrap();
let batch2 = RecordBatch::try_new(
schema,
vec![
Arc::new(StringArray::from(vec!["Northeast", "Southwest"])),
Arc::new(StringArray::from(vec!["E", "F"])),
Arc::new(Float64Array::from(vec![175.0, 225.0])),
Arc::new(Int32Array::from(vec![17, 22])),
],
)
.unwrap();
let total_rows = cube.append_batches(vec![batch1, batch2]).unwrap();
assert_eq!(total_rows, 3);
assert_eq!(cube.row_count(), 7);
}
#[test]
fn test_append_empty_batches_returns_zero() {
let mut cube = (*create_test_cube()).clone();
let original_count = cube.row_count();
let result = cube.append_batches(vec![]).unwrap();
assert_eq!(result, 0);
assert_eq!(cube.row_count(), original_count);
}
#[test]
fn test_append_with_incompatible_schema_fails() {
let mut cube = (*create_test_cube()).clone();
let wrong_schema = Arc::new(Schema::new(vec![
Field::new("wrong_field", DataType::Int32, false),
]));
let bad_batch = RecordBatch::try_new(
wrong_schema,
vec![Arc::new(Int32Array::from(vec![1, 2]))],
)
.unwrap();
let result = cube.append_rows(bad_batch);
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("mismatch"));
}
#[tokio::test]
async fn test_delete_rows() {
let mut cube = (*create_test_cube()).clone();
assert_eq!(cube.row_count(), 4);
let deleted = cube.delete_rows("sales < 200").await.unwrap();
assert_eq!(deleted, 2); assert_eq!(cube.row_count(), 2); }
#[tokio::test]
async fn test_delete_rows_with_string_filter() {
let mut cube = (*create_test_cube()).clone();
assert_eq!(cube.row_count(), 4);
let deleted = cube.delete_rows("region = 'North'").await.unwrap();
assert_eq!(deleted, 1);
assert_eq!(cube.row_count(), 3);
}
#[tokio::test]
async fn test_delete_rows_no_matches() {
let mut cube = (*create_test_cube()).clone();
let original_count = cube.row_count();
let deleted = cube.delete_rows("sales > 1000").await.unwrap();
assert_eq!(deleted, 0);
assert_eq!(cube.row_count(), original_count);
}
#[tokio::test]
async fn test_delete_all_rows() {
let mut cube = (*create_test_cube()).clone();
let deleted = cube.delete_rows("sales >= 0").await.unwrap();
assert_eq!(deleted, 4);
assert_eq!(cube.row_count(), 0);
}
#[tokio::test]
async fn test_update_rows() {
let mut cube = (*create_test_cube()).clone();
assert_eq!(cube.row_count(), 4);
let schema = cube.arrow_schema().clone();
let replacement = RecordBatch::try_new(
schema,
vec![
Arc::new(StringArray::from(vec!["North"])),
Arc::new(StringArray::from(vec!["A"])),
Arc::new(Float64Array::from(vec![500.0])), Arc::new(Int32Array::from(vec![50])), ],
)
.unwrap();
let (deleted, added) = cube.update_rows("region = 'North'", replacement).await.unwrap();
assert_eq!(deleted, 1);
assert_eq!(added, 1);
assert_eq!(cube.row_count(), 4);
let result = Arc::new(cube.clone())
.query()
.unwrap()
.select(&["region", "sales"])
.filter("region = 'North'")
.execute()
.await
.unwrap();
let batches = result.batches();
assert_eq!(batches.len(), 1);
let sales_col = batches[0]
.column_by_name("sales")
.unwrap()
.as_any()
.downcast_ref::<Float64Array>()
.unwrap();
assert_eq!(sales_col.value(0), 500.0); }
#[tokio::test]
async fn test_update_multiple_rows() {
let mut cube = (*create_test_cube()).clone();
let schema = cube.arrow_schema().clone();
let replacement = RecordBatch::try_new(
schema,
vec![
Arc::new(StringArray::from(vec!["North", "East"])),
Arc::new(StringArray::from(vec!["A", "A"])),
Arc::new(Float64Array::from(vec![600.0, 700.0])),
Arc::new(Int32Array::from(vec![60, 70])),
],
)
.unwrap();
let (deleted, added) = cube.update_rows("product = 'A'", replacement).await.unwrap();
assert_eq!(deleted, 2); assert_eq!(added, 2);
assert_eq!(cube.row_count(), 4);
}
#[test]
fn test_consolidate_batches() {
let mut cube = (*create_test_cube()).clone();
let schema = cube.arrow_schema().clone();
let batch1 = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(StringArray::from(vec!["Central"])),
Arc::new(StringArray::from(vec!["D"])),
Arc::new(Float64Array::from(vec![250.0])),
Arc::new(Int32Array::from(vec![25])),
],
)
.unwrap();
let batch2 = RecordBatch::try_new(
schema,
vec![
Arc::new(StringArray::from(vec!["Northeast"])),
Arc::new(StringArray::from(vec!["E"])),
Arc::new(Float64Array::from(vec![175.0])),
Arc::new(Int32Array::from(vec![17])),
],
)
.unwrap();
cube.append_batches(vec![batch1, batch2]).unwrap();
assert_eq!(cube.batch_count(), 3);
assert_eq!(cube.row_count(), 6);
let old_count = cube.consolidate_batches().unwrap();
assert_eq!(old_count, 3);
assert_eq!(cube.batch_count(), 1);
assert_eq!(cube.row_count(), 6); }
#[test]
fn test_consolidate_single_batch_no_op() {
let mut cube = (*create_test_cube()).clone();
assert_eq!(cube.batch_count(), 1);
let old_count = cube.consolidate_batches().unwrap();
assert_eq!(old_count, 1);
assert_eq!(cube.batch_count(), 1);
}
#[test]
fn test_batch_count() {
let cube = (*create_test_cube()).clone();
assert_eq!(cube.batch_count(), 1);
}
#[tokio::test]
async fn test_sequential_operations() {
let mut cube = (*create_test_cube()).clone();
let schema = cube.arrow_schema().clone();
let new_batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(StringArray::from(vec!["Central"])),
Arc::new(StringArray::from(vec!["D"])),
Arc::new(Float64Array::from(vec![250.0])),
Arc::new(Int32Array::from(vec![25])),
],
)
.unwrap();
cube.append_rows(new_batch).unwrap();
assert_eq!(cube.row_count(), 5);
let deleted = cube.delete_rows("sales < 150").await.unwrap();
assert_eq!(deleted, 1); assert_eq!(cube.row_count(), 4);
let update_batch = RecordBatch::try_new(
schema,
vec![
Arc::new(StringArray::from(vec!["South"])),
Arc::new(StringArray::from(vec!["B"])),
Arc::new(Float64Array::from(vec![999.0])),
Arc::new(Int32Array::from(vec![99])),
],
)
.unwrap();
let (deleted, added) = cube.update_rows("region = 'South'", update_batch).await.unwrap();
assert_eq!(deleted, 1);
assert_eq!(added, 1);
assert_eq!(cube.row_count(), 4);
cube.consolidate_batches().unwrap();
assert_eq!(cube.batch_count(), 1);
assert_eq!(cube.row_count(), 4);
}
}