elasticube_core/cube/mod.rs

//! Core ElastiCube data structures
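//!
//! A typical flow, mirroring the [`ElastiCube::query`] example below, is to build a
//! cube, wrap it in `Arc`, and run an aggregation. The builder call, CSV path, and
//! column names here are illustrative.
//!
//! ```rust,ignore
//! use std::sync::Arc;
//!
//! let cube = Arc::new(
//!     ElastiCubeBuilder::new("sales")
//!         .load_csv("data.csv")?
//!         .build()?,
//! );
//!
//! let results = cube.query()?
//!     .select(&["region", "SUM(sales) as total"])
//!     .group_by(&["region"])
//!     .execute()
//!     .await?;
//! ```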

mod calculated;
mod dimension;
mod hierarchy;
mod measure;
mod schema;
mod updates;

pub use calculated::{CalculatedMeasure, VirtualDimension};
pub use dimension::Dimension;
pub use hierarchy::Hierarchy;
pub use measure::{AggFunc, Measure};
pub use schema::CubeSchema;

use crate::error::{Error, Result};
use crate::query::QueryBuilder;
use arrow::datatypes::Schema as ArrowSchema;
use arrow::record_batch::RecordBatch;
use std::sync::Arc;

/// The main ElastiCube structure
///
/// Represents a multidimensional cube with dimensions, measures, and data stored
/// in Apache Arrow's columnar format for efficient analytical queries.
#[derive(Debug, Clone)]
pub struct ElastiCube {
    /// Cube metadata and schema definition
    schema: CubeSchema,

    /// Underlying Arrow schema
    arrow_schema: Arc<ArrowSchema>,

    /// Data stored as Arrow RecordBatches
    /// Using Vec to support chunked data (each RecordBatch is a chunk)
    data: Vec<RecordBatch>,

    /// Total number of rows across all batches
    row_count: usize,
}

impl ElastiCube {
    /// Create a new ElastiCube
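    ///
    /// # Example
    /// A minimal sketch, assuming a `CubeSchema`, a matching Arrow schema, and
    /// prepared `RecordBatch`es are already in scope:
    ///
    /// ```rust,ignore
    /// let cube = ElastiCube::new(cube_schema, arrow_schema, vec![batch])?;
    /// println!("{} rows loaded", cube.row_count());
    /// ```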
    pub fn new(
        schema: CubeSchema,
        arrow_schema: Arc<ArrowSchema>,
        data: Vec<RecordBatch>,
    ) -> Result<Self> {
        let row_count = data.iter().map(|batch| batch.num_rows()).sum();

        Ok(Self {
            schema,
            arrow_schema,
            data,
            row_count,
        })
    }

    /// Get the cube schema
    pub fn schema(&self) -> &CubeSchema {
        &self.schema
    }

    /// Get the Arrow schema
    pub fn arrow_schema(&self) -> &Arc<ArrowSchema> {
        &self.arrow_schema
    }

    /// Get the data batches
    pub fn data(&self) -> &[RecordBatch] {
        &self.data
    }

    /// Get the total number of rows
    pub fn row_count(&self) -> usize {
        self.row_count
    }

    /// Get all dimensions
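    ///
    /// # Example
    /// A small sketch for inspecting the cube's shape:
    ///
    /// ```rust,ignore
    /// println!(
    ///     "{} dimensions, {} measures, {} hierarchies",
    ///     cube.dimensions().len(),
    ///     cube.measures().len(),
    ///     cube.hierarchies().len(),
    /// );
    /// ```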
    pub fn dimensions(&self) -> Vec<&Dimension> {
        self.schema.dimensions()
    }

    /// Get all measures
    pub fn measures(&self) -> Vec<&Measure> {
        self.schema.measures()
    }

    /// Get all hierarchies
    pub fn hierarchies(&self) -> Vec<&Hierarchy> {
        self.schema.hierarchies()
    }

    /// Get a dimension by name
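    ///
    /// # Example
    /// A minimal sketch; the dimension name is illustrative:
    ///
    /// ```rust,ignore
    /// if cube.get_dimension("region").is_none() {
    ///     eprintln!("cube has no 'region' dimension");
    /// }
    /// ```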
    pub fn get_dimension(&self, name: &str) -> Option<&Dimension> {
        self.schema.get_dimension(name)
    }

    /// Get a measure by name
    pub fn get_measure(&self, name: &str) -> Option<&Measure> {
        self.schema.get_measure(name)
    }

    /// Get a hierarchy by name
    pub fn get_hierarchy(&self, name: &str) -> Option<&Hierarchy> {
        self.schema.get_hierarchy(name)
    }

    /// Create a query builder for this cube
    ///
    /// This method requires the cube to be wrapped in an `Arc<ElastiCube>` because
    /// the query builder needs to share ownership of the cube data across async
    /// query execution and potential caching operations.
    ///
    /// # Returns
    /// A QueryBuilder instance for executing queries against this cube
    ///
    /// # Arc Requirement
    /// The cube must be wrapped in `Arc` before calling this method:
    ///
    /// ```rust,ignore
    /// use std::sync::Arc;
    ///
    /// let cube = ElastiCubeBuilder::new("sales")
    ///     .load_csv("data.csv")?
    ///     .build()?;
    ///
    /// // Wrap in Arc for querying
    /// let cube = Arc::new(cube);
    ///
    /// // Now we can query
    /// let results = cube.query()?
    ///     .select(&["region", "SUM(sales) as total"])
    ///     .group_by(&["region"])
    ///     .execute()
    ///     .await?;
    /// ```
    ///
    /// # See Also
    /// - [`query_with_config`](Self::query_with_config) - Query with custom optimization settings
    pub fn query(self: Arc<Self>) -> Result<QueryBuilder> {
        QueryBuilder::new(self)
    }

    /// Get cube statistics for performance analysis
    ///
    /// Returns statistics about the cube's data including row count,
    /// partition count, memory usage, and column-level statistics.
    ///
    /// # Example
    /// ```rust,ignore
    /// let stats = cube.statistics();
    /// println!("Cube: {}", stats.summary());
    /// ```
    pub fn statistics(&self) -> crate::optimization::CubeStatistics {
        crate::optimization::CubeStatistics::from_batches(&self.data)
    }

    /// Create a query builder with custom optimization configuration
    ///
    /// Like [`query`](Self::query), this requires the cube to be wrapped in `Arc`.
    /// Use this method when you need to customize query execution behavior such as
    /// parallelism, batch size, or caching settings.
    ///
    /// # Arguments
    /// * `config` - Optimization configuration to use for queries
    ///
    /// # Returns
    /// A QueryBuilder instance with the specified optimization settings
    ///
    /// # Example
    /// ```rust,ignore
    /// use std::sync::Arc;
    /// use elasticube_core::OptimizationConfig;
    ///
    /// let cube = Arc::new(cube); // Wrap in Arc
    ///
    /// let config = OptimizationConfig::new()
    ///     .with_target_partitions(8)
    ///     .with_batch_size(4096);
    ///
    /// let results = cube.query_with_config(config)?
    ///     .select(&["region", "SUM(sales)"])
    ///     .execute()
    ///     .await?;
    /// ```
    pub fn query_with_config(
        self: Arc<Self>,
        config: crate::optimization::OptimizationConfig,
    ) -> Result<QueryBuilder> {
        QueryBuilder::with_config(self, config)
    }

    // ============================================================
    // Data Update Operations
    // ============================================================

    /// Append new rows from a RecordBatch to the cube
    ///
    /// This method adds new rows to the cube by appending a RecordBatch.
    /// The schema of the new batch must match the cube's schema exactly.
    ///
    /// # Arguments
    /// * `batch` - RecordBatch containing rows to append
    ///
    /// # Returns
    /// Number of rows added
    ///
    /// # Example
    /// ```rust,ignore
    /// let new_batch = RecordBatch::try_new(schema, columns)?;
    /// let rows_added = cube.append_rows(new_batch)?;
    /// println!("Added {} rows", rows_added);
    /// ```
    pub fn append_rows(&mut self, batch: RecordBatch) -> Result<usize> {
        // Validate schema compatibility
        updates::validate_batch_schema(&self.arrow_schema, &batch.schema())?;

        let rows_added = batch.num_rows();

        // Add the batch to our data
        self.data.push(batch);
        self.row_count += rows_added;

        Ok(rows_added)
    }

    /// Append multiple RecordBatches to the cube (incremental loading)
    ///
    /// This method adds new data incrementally by appending multiple batches.
    /// All batches must have schemas compatible with the cube's schema.
    ///
    /// # Arguments
    /// * `batches` - Vector of RecordBatches to append
    ///
    /// # Returns
    /// Total number of rows added
    ///
    /// # Example
    /// ```rust,ignore
    /// let batches = vec![batch1, batch2, batch3];
    /// let total_rows = cube.append_batches(batches)?;
    /// println!("Appended {} rows total", total_rows);
    /// ```
    pub fn append_batches(&mut self, batches: Vec<RecordBatch>) -> Result<usize> {
        if batches.is_empty() {
            return Ok(0);
        }

        // Validate all batches first
        for batch in &batches {
            updates::validate_batch_schema(&self.arrow_schema, &batch.schema())?;
        }

        // Count total rows
        let rows_added: usize = batches.iter().map(|b| b.num_rows()).sum();

        // Append all batches
        self.data.extend(batches);
        self.row_count += rows_added;

        Ok(rows_added)
    }

    /// Delete rows from the cube based on a SQL filter expression
    ///
    /// This method removes rows that match the given SQL WHERE clause predicate.
    /// Since RecordBatch is immutable, this creates new batches without the deleted rows.
    ///
    /// # Arguments
    /// * `filter_expr` - SQL WHERE clause expression (e.g., "age < 18" or "region = 'North'")
    ///
    /// # Returns
    /// Number of rows deleted
    ///
    /// # Example
    /// ```rust,ignore
    /// // Delete all rows where sales < 100
    /// let deleted = cube.delete_rows("sales < 100").await?;
    /// println!("Deleted {} rows", deleted);
    /// ```
    pub async fn delete_rows(&mut self, filter_expr: &str) -> Result<usize> {
        // Evaluate the filter with DataFusion and keep only the rows that do NOT
        // match it; since RecordBatches are immutable, the surviving rows become new batches

        use datafusion::prelude::*;

        // Create a session context
        let ctx = SessionContext::new();

        // Register the current data as a table
        let table = datafusion::datasource::MemTable::try_new(
            self.arrow_schema.clone(),
            vec![self.data.clone()],
        )
        .map_err(|e| Error::query(format!("Failed to create temp table: {}", e)))?;

        ctx.register_table("temp_table", Arc::new(table))
            .map_err(|e| Error::query(format!("Failed to register table: {}", e)))?;

        // Build a query that selects all rows NOT matching the filter
        // We invert the filter by wrapping it with NOT
        let query = format!("SELECT * FROM temp_table WHERE NOT ({})", filter_expr);

        // Execute the query
        let df = ctx
            .sql(&query)
            .await
            .map_err(|e| Error::query(format!("Failed to execute delete filter: {}", e)))?;

        let results = df
            .collect()
            .await
            .map_err(|e| Error::query(format!("Failed to collect delete results: {}", e)))?;

        // Calculate rows deleted
        let new_row_count: usize = results.iter().map(|b| b.num_rows()).sum();
        let rows_deleted = self.row_count - new_row_count;

        // Update the cube data
        self.data = results;
        self.row_count = new_row_count;

        Ok(rows_deleted)
    }

    /// Update rows in the cube based on a filter and replacement batch
    ///
    /// This method updates rows matching a filter expression by:
    /// 1. Deleting rows that match the filter
    /// 2. Appending the replacement batch
    ///
    /// The replacement batch must have a schema compatible with the cube.
    ///
    /// # Arguments
    /// * `filter_expr` - SQL WHERE clause to identify rows to update
    /// * `replacement_batch` - RecordBatch containing updated rows
    ///
    /// # Returns
    /// Tuple of (rows_deleted, rows_added)
    ///
    /// # Example
    /// ```rust,ignore
    /// // Update all North region sales with new values
    /// let updated_data = create_updated_batch()?;
    /// let (deleted, added) = cube.update_rows("region = 'North'", updated_data).await?;
    /// println!("Updated {} rows", deleted);
    /// ```
    pub async fn update_rows(
        &mut self,
        filter_expr: &str,
        replacement_batch: RecordBatch,
    ) -> Result<(usize, usize)> {
        // Validate the replacement batch schema
        updates::validate_batch_schema(&self.arrow_schema, &replacement_batch.schema())?;

        // Delete matching rows
        let rows_deleted = self.delete_rows(filter_expr).await?;

        // Append the replacement batch
        let rows_added = self.append_rows(replacement_batch)?;

        Ok((rows_deleted, rows_added))
    }

    /// Consolidate all data batches into a single batch
    ///
    /// This operation can improve query performance by reducing the number of
    /// batches, but may increase memory usage temporarily during consolidation.
    ///
    /// # Returns
    /// Number of batches before consolidation
    ///
    /// # Example
    /// ```rust,ignore
    /// let old_batch_count = cube.consolidate_batches()?;
    /// println!("Consolidated from {} batches to 1 batch", old_batch_count);
    /// ```
    pub fn consolidate_batches(&mut self) -> Result<usize> {
        let old_batch_count = self.data.len();

        if old_batch_count <= 1 {
            return Ok(old_batch_count);
        }

        // Concatenate all batches into one
        let consolidated = updates::concat_record_batches(&self.arrow_schema, &self.data)?;

        self.data = vec![consolidated];

        Ok(old_batch_count)
    }

    /// Get the number of data batches in the cube
    ///
    /// Useful for monitoring fragmentation and deciding when to consolidate.
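    ///
    /// # Example
    /// A minimal sketch; the threshold of 16 batches is illustrative:
    ///
    /// ```rust,ignore
    /// if cube.batch_count() > 16 {
    ///     cube.consolidate_batches()?;
    /// }
    /// ```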
    pub fn batch_count(&self) -> usize {
        self.data.len()
    }
}