mod calculated;
mod dimension;
mod hierarchy;
mod measure;
mod schema;
mod updates;
pub use calculated::{CalculatedMeasure, VirtualDimension};
pub use dimension::Dimension;
pub use hierarchy::Hierarchy;
pub use measure::{AggFunc, Measure};
pub use schema::CubeSchema;
use crate::error::{Error, Result};
use crate::query::QueryBuilder;
use arrow::datatypes::Schema as ArrowSchema;
use arrow::record_batch::RecordBatch;
use std::sync::Arc;
/// An in-memory analytical cube: a logical OLAP schema (dimensions,
/// measures, hierarchies) paired with the Arrow record batches that hold
/// the underlying data.
#[derive(Debug, Clone)]
pub struct ElastiCube {
    // Logical cube schema describing dimensions, measures, and hierarchies.
    schema: CubeSchema,
    // Physical Arrow schema; every batch in `data` is expected to conform
    // to it (appends are validated against it in `append_rows`/`append_batches`).
    arrow_schema: Arc<ArrowSchema>,
    // Backing data. May be split across many batches after incremental
    // appends; see `consolidate_batches`.
    data: Vec<RecordBatch>,
    // Cached total row count across all batches, kept in sync by the
    // mutating methods so callers don't re-scan `data`.
    row_count: usize,
}
impl ElastiCube {
    /// Creates a cube from a logical schema, its Arrow schema, and the
    /// backing record batches.
    ///
    /// The cached row count is derived by summing the rows of `data`. No
    /// validation that the batches conform to `arrow_schema` is performed
    /// here — callers are expected to supply matching batches.
    ///
    /// # Errors
    ///
    /// Currently infallible; returns `Result` so validation can be added
    /// without breaking callers.
    pub fn new(
        schema: CubeSchema,
        arrow_schema: Arc<ArrowSchema>,
        data: Vec<RecordBatch>,
    ) -> Result<Self> {
        let row_count = data.iter().map(RecordBatch::num_rows).sum();
        Ok(Self {
            schema,
            arrow_schema,
            data,
            row_count,
        })
    }

    /// Returns the logical cube schema.
    pub fn schema(&self) -> &CubeSchema {
        &self.schema
    }

    /// Returns the physical Arrow schema shared by all batches.
    pub fn arrow_schema(&self) -> &Arc<ArrowSchema> {
        &self.arrow_schema
    }

    /// Returns the backing record batches.
    pub fn data(&self) -> &[RecordBatch] {
        &self.data
    }

    /// Returns the cached total number of rows across all batches.
    pub fn row_count(&self) -> usize {
        self.row_count
    }

    /// Returns all dimensions defined in the schema.
    pub fn dimensions(&self) -> Vec<&Dimension> {
        self.schema.dimensions()
    }

    /// Returns all measures defined in the schema.
    pub fn measures(&self) -> Vec<&Measure> {
        self.schema.measures()
    }

    /// Returns all hierarchies defined in the schema.
    pub fn hierarchies(&self) -> Vec<&Hierarchy> {
        self.schema.hierarchies()
    }

    /// Looks up a dimension by name.
    pub fn get_dimension(&self, name: &str) -> Option<&Dimension> {
        self.schema.get_dimension(name)
    }

    /// Looks up a measure by name.
    pub fn get_measure(&self, name: &str) -> Option<&Measure> {
        self.schema.get_measure(name)
    }

    /// Looks up a hierarchy by name.
    pub fn get_hierarchy(&self, name: &str) -> Option<&Hierarchy> {
        self.schema.get_hierarchy(name)
    }

    /// Starts a query against this cube with default settings.
    ///
    /// Takes `Arc<Self>` because the query builder holds a shared
    /// reference to the cube for the duration of the query.
    ///
    /// # Errors
    ///
    /// Returns an error if the query builder cannot be constructed.
    pub fn query(self: Arc<Self>) -> Result<QueryBuilder> {
        QueryBuilder::new(self)
    }

    /// Computes statistics over the cube's batches (used by the optimizer).
    pub fn statistics(&self) -> crate::optimization::CubeStatistics {
        crate::optimization::CubeStatistics::from_batches(&self.data)
    }

    /// Starts a query with an explicit optimization configuration.
    ///
    /// # Errors
    ///
    /// Returns an error if the query builder cannot be constructed.
    pub fn query_with_config(
        self: Arc<Self>,
        config: crate::optimization::OptimizationConfig,
    ) -> Result<QueryBuilder> {
        QueryBuilder::with_config(self, config)
    }

    /// Appends a single batch of rows, validating its schema first.
    ///
    /// Returns the number of rows added.
    ///
    /// # Errors
    ///
    /// Returns an error if the batch's schema does not match the cube's
    /// Arrow schema.
    pub fn append_rows(&mut self, batch: RecordBatch) -> Result<usize> {
        updates::validate_batch_schema(&self.arrow_schema, &batch.schema())?;
        let rows_added = batch.num_rows();
        self.data.push(batch);
        self.row_count += rows_added;
        Ok(rows_added)
    }

    /// Appends multiple batches atomically: every batch is validated
    /// before any is added, so a schema mismatch leaves the cube unchanged.
    ///
    /// Returns the total number of rows added (0 for an empty input).
    ///
    /// # Errors
    ///
    /// Returns an error if any batch's schema does not match the cube's
    /// Arrow schema.
    pub fn append_batches(&mut self, batches: Vec<RecordBatch>) -> Result<usize> {
        if batches.is_empty() {
            return Ok(0);
        }
        // Validate all batches up front so we never partially apply.
        for batch in &batches {
            updates::validate_batch_schema(&self.arrow_schema, &batch.schema())?;
        }
        let rows_added: usize = batches.iter().map(|b| b.num_rows()).sum();
        self.data.extend(batches);
        self.row_count += rows_added;
        Ok(rows_added)
    }

    /// Deletes all rows matching `filter_expr` (a SQL boolean expression
    /// over the cube's columns).
    ///
    /// Follows SQL `DELETE ... WHERE` semantics: only rows for which the
    /// predicate evaluates to TRUE are removed; rows where it evaluates to
    /// NULL (e.g. a comparison against a NULL column) are kept.
    ///
    /// Returns the number of rows deleted.
    ///
    /// # Errors
    ///
    /// Returns an error if the filter expression is invalid or query
    /// execution fails. On error the cube's data is left unchanged.
    pub async fn delete_rows(&mut self, filter_expr: &str) -> Result<usize> {
        use datafusion::prelude::*;
        let ctx = SessionContext::new();
        let table = datafusion::datasource::MemTable::try_new(
            self.arrow_schema.clone(),
            vec![self.data.clone()],
        )
        .map_err(|e| Error::query(format!("Failed to create temp table: {}", e)))?;
        ctx.register_table("temp_table", Arc::new(table))
            .map_err(|e| Error::query(format!("Failed to register table: {}", e)))?;
        // Keep every row where the predicate is NOT TRUE. Using plain
        // `NOT ({filter})` would also drop rows where the predicate
        // evaluates to NULL, deleting rows that did not match the filter.
        let query = format!(
            "SELECT * FROM temp_table WHERE NOT ({f}) OR ({f}) IS NULL",
            f = filter_expr
        );
        let df = ctx
            .sql(&query)
            .await
            .map_err(|e| Error::query(format!("Failed to execute delete filter: {}", e)))?;
        let results = df
            .collect()
            .await
            .map_err(|e| Error::query(format!("Failed to collect delete results: {}", e)))?;
        // Discard zero-row batches produced by filtering so batch_count()
        // stays meaningful after a delete.
        let results: Vec<RecordBatch> =
            results.into_iter().filter(|b| b.num_rows() > 0).collect();
        let new_row_count: usize = results.iter().map(|b| b.num_rows()).sum();
        // saturating_sub guards against a panic if the cached count were
        // ever stale; filtering can only shrink the row count.
        let rows_deleted = self.row_count.saturating_sub(new_row_count);
        self.data = results;
        self.row_count = new_row_count;
        Ok(rows_deleted)
    }

    /// Replaces rows matching `filter_expr` with the rows of
    /// `replacement_batch` (implemented as delete-then-append).
    ///
    /// Returns `(rows_deleted, rows_added)`.
    ///
    /// # Errors
    ///
    /// Returns an error if the replacement batch's schema does not match
    /// the cube's, or if the delete filter fails. The replacement batch is
    /// validated *before* the delete runs so a bad batch cannot leave the
    /// cube with rows deleted but nothing appended.
    pub async fn update_rows(
        &mut self,
        filter_expr: &str,
        replacement_batch: RecordBatch,
    ) -> Result<(usize, usize)> {
        // Validate first: if the batch is bad, fail before mutating.
        updates::validate_batch_schema(&self.arrow_schema, &replacement_batch.schema())?;
        let rows_deleted = self.delete_rows(filter_expr).await?;
        let rows_added = self.append_rows(replacement_batch)?;
        Ok((rows_deleted, rows_added))
    }

    /// Merges all batches into a single batch to speed up scans after many
    /// small appends.
    ///
    /// Returns the number of batches *before* consolidation. A cube with
    /// zero or one batch is returned unchanged.
    ///
    /// # Errors
    ///
    /// Returns an error if batch concatenation fails; the cube is left
    /// unchanged in that case.
    pub fn consolidate_batches(&mut self) -> Result<usize> {
        let old_batch_count = self.data.len();
        if old_batch_count <= 1 {
            return Ok(old_batch_count);
        }
        let consolidated = updates::concat_record_batches(&self.arrow_schema, &self.data)?;
        self.data = vec![consolidated];
        Ok(old_batch_count)
    }

    /// Returns the number of backing record batches.
    pub fn batch_count(&self) -> usize {
        self.data.len()
    }
}