elasticube_core/cube/mod.rs
//! Core ElastiCube data structures

mod calculated;
mod dimension;
mod hierarchy;
mod measure;
mod schema;
mod updates;

pub use calculated::{CalculatedMeasure, VirtualDimension};
pub use dimension::Dimension;
pub use hierarchy::Hierarchy;
pub use measure::{AggFunc, Measure};
pub use schema::CubeSchema;

use crate::error::{Error, Result};
use crate::query::QueryBuilder;
use arrow::datatypes::Schema as ArrowSchema;
use arrow::record_batch::RecordBatch;
use std::sync::Arc;

/// The main ElastiCube structure
///
/// Represents a multidimensional cube with dimensions, measures, and data stored
/// in Apache Arrow's columnar format for efficient analytical queries.
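///
/// # Example
/// A minimal sketch of building and inspecting a cube; the builder call and
/// the `data.csv` path are illustrative assumptions taken from the query
/// examples below:
/// ```rust,ignore
/// // Assumed setup: a cube built from a CSV file via ElastiCubeBuilder
/// let cube = ElastiCubeBuilder::new("sales")
///     .load_csv("data.csv")?
///     .build()?;
///
/// println!("{} rows in {} batches", cube.row_count(), cube.batch_count());
/// ```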
#[derive(Debug, Clone)]
pub struct ElastiCube {
    /// Cube metadata and schema definition
    schema: CubeSchema,

    /// Underlying Arrow schema
    arrow_schema: Arc<ArrowSchema>,

    /// Data stored as Arrow RecordBatches
    /// Using Vec to support chunked data (each RecordBatch is a chunk)
    data: Vec<RecordBatch>,

    /// Total number of rows across all batches
    row_count: usize,
}

impl ElastiCube {
    /// Create a new ElastiCube
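    ///
    /// # Example
    /// A minimal sketch; the `schema`, `arrow_schema`, and `batch` values are
    /// assumed to have been constructed elsewhere:
    /// ```rust,ignore
    /// // `schema` is a CubeSchema, `arrow_schema` an Arc<ArrowSchema>, and
    /// // `batch` a RecordBatch, all built beforehand (assumed setup)
    /// let cube = ElastiCube::new(schema, arrow_schema, vec![batch])?;
    /// println!("{} rows loaded", cube.row_count());
    /// ```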
    pub fn new(
        schema: CubeSchema,
        arrow_schema: Arc<ArrowSchema>,
        data: Vec<RecordBatch>,
    ) -> Result<Self> {
        let row_count = data.iter().map(|batch| batch.num_rows()).sum();

        Ok(Self {
            schema,
            arrow_schema,
            data,
            row_count,
        })
    }

    /// Get the cube schema
    pub fn schema(&self) -> &CubeSchema {
        &self.schema
    }

    /// Get the Arrow schema
    pub fn arrow_schema(&self) -> &Arc<ArrowSchema> {
        &self.arrow_schema
    }

    /// Get the data batches
    pub fn data(&self) -> &[RecordBatch] {
        &self.data
    }

    /// Get the total number of rows
    pub fn row_count(&self) -> usize {
        self.row_count
    }

    /// Get all dimensions
    pub fn dimensions(&self) -> Vec<&Dimension> {
        self.schema.dimensions()
    }

    /// Get all measures
    pub fn measures(&self) -> Vec<&Measure> {
        self.schema.measures()
    }

    /// Get all hierarchies
    pub fn hierarchies(&self) -> Vec<&Hierarchy> {
        self.schema.hierarchies()
    }

    /// Get a dimension by name
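    ///
    /// # Example
    /// A small sketch; the "region" dimension name is an illustrative
    /// assumption:
    /// ```rust,ignore
    /// if let Some(dim) = cube.get_dimension("region") {
    ///     println!("{:?}", dim); // assumes Dimension implements Debug
    /// }
    /// ```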
    pub fn get_dimension(&self, name: &str) -> Option<&Dimension> {
        self.schema.get_dimension(name)
    }

    /// Get a measure by name
    pub fn get_measure(&self, name: &str) -> Option<&Measure> {
        self.schema.get_measure(name)
    }

    /// Get a hierarchy by name
    pub fn get_hierarchy(&self, name: &str) -> Option<&Hierarchy> {
        self.schema.get_hierarchy(name)
    }

    /// Create a query builder for this cube
    ///
    /// This method requires the cube to be wrapped in an `Arc<ElastiCube>` because
    /// the query builder needs to share ownership of the cube data across async
    /// query execution and potential caching operations.
    ///
    /// # Returns
    /// A QueryBuilder instance for executing queries against this cube
    ///
    /// # Arc Requirement
    /// The cube must be wrapped in `Arc` before calling this method:
    ///
    /// ```rust,ignore
    /// use std::sync::Arc;
    ///
    /// let cube = ElastiCubeBuilder::new("sales")
    ///     .load_csv("data.csv")?
    ///     .build()?;
    ///
    /// // Wrap in Arc for querying
    /// let cube = Arc::new(cube);
    ///
    /// // Now we can query
    /// let results = cube.query()?
    ///     .select(&["region", "SUM(sales) as total"])
    ///     .group_by(&["region"])
    ///     .execute()
    ///     .await?;
    /// ```
    ///
    /// # See Also
    /// - [`query_with_config`](Self::query_with_config) - Query with custom optimization settings
    pub fn query(self: Arc<Self>) -> Result<QueryBuilder> {
        QueryBuilder::new(self)
    }

    /// Get cube statistics for performance analysis
    ///
    /// Returns statistics about the cube's data including row count,
    /// partition count, memory usage, and column-level statistics.
    ///
    /// # Example
    /// ```rust,ignore
    /// let stats = cube.statistics();
    /// println!("Cube: {}", stats.summary());
    /// ```
    pub fn statistics(&self) -> crate::optimization::CubeStatistics {
        crate::optimization::CubeStatistics::from_batches(&self.data)
    }

    /// Create a query builder with custom optimization configuration
    ///
    /// Like [`query`](Self::query), this requires the cube to be wrapped in `Arc`.
    /// Use this method when you need to customize query execution behavior such as
    /// parallelism, batch size, or caching settings.
    ///
    /// # Arguments
    /// * `config` - Optimization configuration to use for queries
    ///
    /// # Returns
    /// A QueryBuilder instance with the specified optimization settings
    ///
    /// # Example
    /// ```rust,ignore
    /// use std::sync::Arc;
    /// use elasticube_core::OptimizationConfig;
    ///
    /// let cube = Arc::new(cube); // Wrap in Arc
    ///
    /// let config = OptimizationConfig::new()
    ///     .with_target_partitions(8)
    ///     .with_batch_size(4096);
    ///
    /// let results = cube.query_with_config(config)?
    ///     .select(&["region", "SUM(sales)"])
    ///     .execute()
    ///     .await?;
    /// ```
    pub fn query_with_config(
        self: Arc<Self>,
        config: crate::optimization::OptimizationConfig,
    ) -> Result<QueryBuilder> {
        QueryBuilder::with_config(self, config)
    }

    // ============================================================
    // Data Update Operations
    // ============================================================

    /// Append new rows from a RecordBatch to the cube
    ///
    /// This method adds new rows to the cube by appending a RecordBatch.
    /// The schema of the new batch must match the cube's schema exactly.
    ///
    /// # Arguments
    /// * `batch` - RecordBatch containing rows to append
    ///
    /// # Returns
    /// Number of rows added
    ///
    /// # Example
    /// ```rust,ignore
    /// let new_batch = RecordBatch::try_new(schema, columns)?;
    /// let rows_added = cube.append_rows(new_batch)?;
    /// println!("Added {} rows", rows_added);
    /// ```
    pub fn append_rows(&mut self, batch: RecordBatch) -> Result<usize> {
        // Validate schema compatibility
        updates::validate_batch_schema(&self.arrow_schema, &batch.schema())?;

        let rows_added = batch.num_rows();

        // Add the batch to our data
        self.data.push(batch);
        self.row_count += rows_added;

        Ok(rows_added)
    }

    /// Append multiple RecordBatches to the cube (incremental loading)
    ///
    /// This method adds new data incrementally by appending multiple batches.
    /// All batches must have schemas compatible with the cube's schema.
    ///
    /// # Arguments
    /// * `batches` - Vector of RecordBatches to append
    ///
    /// # Returns
    /// Total number of rows added
    ///
    /// # Example
    /// ```rust,ignore
    /// let batches = vec![batch1, batch2, batch3];
    /// let total_rows = cube.append_batches(batches)?;
    /// println!("Appended {} rows total", total_rows);
    /// ```
    pub fn append_batches(&mut self, batches: Vec<RecordBatch>) -> Result<usize> {
        if batches.is_empty() {
            return Ok(0);
        }

        // Validate all batches first
        for batch in &batches {
            updates::validate_batch_schema(&self.arrow_schema, &batch.schema())?;
        }

        // Count total rows
        let rows_added: usize = batches.iter().map(|b| b.num_rows()).sum();

        // Append all batches
        self.data.extend(batches);
        self.row_count += rows_added;

        Ok(rows_added)
    }

    /// Delete rows from the cube based on a SQL filter expression
    ///
    /// This method removes rows that match the given SQL WHERE clause predicate.
    /// Since RecordBatch is immutable, this creates new batches without the deleted rows.
    ///
    /// # Arguments
    /// * `filter_expr` - SQL WHERE clause expression (e.g., "age < 18" or "region = 'North'")
    ///
    /// # Returns
    /// Number of rows deleted
    ///
    /// # Example
    /// ```rust,ignore
    /// // Delete all rows where sales < 100
    /// let deleted = cube.delete_rows("sales < 100").await?;
    /// println!("Deleted {} rows", deleted);
    /// ```
    pub async fn delete_rows(&mut self, filter_expr: &str) -> Result<usize> {
        // We need to evaluate the filter using DataFusion to get a boolean mask,
        // then apply the inverse of that mask to keep only non-matching rows

        use datafusion::prelude::*;

        // Create a session context
        let ctx = SessionContext::new();

        // Register the current data as a table
        let table = datafusion::datasource::MemTable::try_new(
            self.arrow_schema.clone(),
            vec![self.data.clone()],
        )
        .map_err(|e| Error::query(format!("Failed to create temp table: {}", e)))?;

        ctx.register_table("temp_table", Arc::new(table))
            .map_err(|e| Error::query(format!("Failed to register table: {}", e)))?;

        // Build a query that selects all rows NOT matching the filter
        // We invert the filter by wrapping it with NOT
        let query = format!("SELECT * FROM temp_table WHERE NOT ({})", filter_expr);

        // Execute the query
        let df = ctx
            .sql(&query)
            .await
            .map_err(|e| Error::query(format!("Failed to execute delete filter: {}", e)))?;

        let results = df
            .collect()
            .await
            .map_err(|e| Error::query(format!("Failed to collect delete results: {}", e)))?;

        // Calculate rows deleted
        let new_row_count: usize = results.iter().map(|b| b.num_rows()).sum();
        let rows_deleted = self.row_count - new_row_count;

        // Update the cube data
        self.data = results;
        self.row_count = new_row_count;

        Ok(rows_deleted)
    }

    /// Update rows in the cube based on a filter and replacement batch
    ///
    /// This method updates rows matching a filter expression by:
    /// 1. Deleting rows that match the filter
    /// 2. Appending the replacement batch
    ///
    /// The replacement batch must have a schema compatible with the cube.
    ///
    /// # Arguments
    /// * `filter_expr` - SQL WHERE clause to identify rows to update
    /// * `replacement_batch` - RecordBatch containing updated rows
    ///
    /// # Returns
    /// Tuple of (rows_deleted, rows_added)
    ///
    /// # Example
    /// ```rust,ignore
    /// // Update all North region sales with new values
    /// let updated_data = create_updated_batch()?;
    /// let (deleted, added) = cube.update_rows("region = 'North'", updated_data).await?;
    /// println!("Updated {} rows", deleted);
    /// ```
    pub async fn update_rows(
        &mut self,
        filter_expr: &str,
        replacement_batch: RecordBatch,
    ) -> Result<(usize, usize)> {
        // Validate the replacement batch schema
        updates::validate_batch_schema(&self.arrow_schema, &replacement_batch.schema())?;

        // Delete matching rows
        let rows_deleted = self.delete_rows(filter_expr).await?;

        // Append the replacement batch
        let rows_added = self.append_rows(replacement_batch)?;

        Ok((rows_deleted, rows_added))
    }

    /// Consolidate all data batches into a single batch
    ///
    /// This operation can improve query performance by reducing the number of
    /// batches, but may increase memory usage temporarily during consolidation.
    ///
    /// # Returns
    /// Number of batches before consolidation
    ///
    /// # Example
    /// ```rust,ignore
    /// let old_batch_count = cube.consolidate_batches()?;
    /// println!("Consolidated from {} batches to 1 batch", old_batch_count);
    /// ```
    pub fn consolidate_batches(&mut self) -> Result<usize> {
        let old_batch_count = self.data.len();

        if old_batch_count <= 1 {
            return Ok(old_batch_count);
        }

        // Concatenate all batches into one
        let consolidated = updates::concat_record_batches(&self.arrow_schema, &self.data)?;

        self.data = vec![consolidated];

        Ok(old_batch_count)
    }

    /// Get the number of data batches in the cube
    ///
    /// Useful for monitoring fragmentation and deciding when to consolidate.
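    ///
    /// # Example
    /// A minimal sketch of a fragmentation check; the threshold of 16 batches
    /// is an arbitrary illustrative choice:
    /// ```rust,ignore
    /// if cube.batch_count() > 16 {
    ///     // 16 is just an example threshold, not a recommended default
    ///     let before = cube.consolidate_batches()?;
    ///     println!("Consolidated {} batches into 1", before);
    /// }
    /// ```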
    pub fn batch_count(&self) -> usize {
        self.data.len()
    }
}