Skip to main content

varpulis_runtime/
columnar.rs

1//! Columnar Event Storage for SIMD-Optimized Aggregations
2//!
3//! This module provides columnar storage for windowed event processing,
4//! enabling SIMD-optimized aggregations on contiguous memory.
5//!
6//! # Architecture
7//!
8//! Instead of storing events row-by-row (`Vec<Event>`), this module stores:
9//! - Original events for predicates and emit operations
10//! - Eagerly extracted timestamps for temporal operations
11//! - Lazily populated column caches for numeric aggregations
12//!
13//! # Performance Gains
14//!
15//! - 8-40x faster aggregations (SIMD on contiguous memory)
16//! - Amortized field extraction across multiple aggregations
17//! - Better cache locality for numeric operations
18//!
19//! # Example
20//!
21//! ```rust,no_run
22//! use varpulis_runtime::columnar::ColumnarBuffer;
23//! use varpulis_runtime::Event;
24//! use std::sync::Arc;
25//!
26//! let mut buffer = ColumnarBuffer::new();
27//! buffer.push(Arc::new(Event::new("Trade").with_field("price", 100.0)));
28//! buffer.push(Arc::new(Event::new("Trade").with_field("price", 105.0)));
29//!
30//! // First access extracts field values to contiguous array
31//! let prices = buffer.ensure_float_column("price");
32//! assert_eq!(prices.len(), 2);
33//!
34//! // Subsequent accesses reuse cached column
35//! let prices_again = buffer.ensure_float_column("price");
36//! assert_eq!(prices_again.len(), 2);
37//! ```
38
39use std::sync::Arc;
40
41use rustc_hash::FxHashMap;
42
43use crate::event::{Event, SharedEvent};
44use crate::persistence::SerializableEvent;
45
/// Contiguous, type-specific storage for the values of one event field.
///
/// Every variant keeps its values in a single `Vec`, so aggregations can scan
/// them linearly over contiguous memory. Events that lack the field are
/// encoded as:
/// - `f64::NAN` in [`Column::Float64`]
/// - `i64::MIN` in [`Column::Int64`] (indistinguishable from a genuine `i64::MIN`)
/// - `None` in [`Column::String`] and [`Column::Bool`]
#[derive(Debug, Clone)]
pub enum Column {
    /// 64-bit floats; `NaN` marks a missing value.
    Float64(Vec<f64>),
    /// 64-bit integers; `i64::MIN` marks a missing value.
    Int64(Vec<i64>),
    /// Optional shared strings; `None` marks a missing value.
    String(Vec<Option<Arc<str>>>),
    /// Optional booleans; `None` marks a missing value.
    Bool(Vec<Option<bool>>),
}

impl Column {
    /// Number of values stored in the column, whatever its element type.
    pub const fn len(&self) -> usize {
        match self {
            Column::Float64(values) => values.len(),
            Column::Int64(values) => values.len(),
            Column::String(values) => values.len(),
            Column::Bool(values) => values.len(),
        }
    }

    /// `true` when the column holds no values.
    pub const fn is_empty(&self) -> bool {
        matches!(self.len(), 0)
    }

    /// Borrow the values as `&[f64]`, or `None` if this is not a Float64 column.
    pub fn as_float64(&self) -> Option<&[f64]> {
        if let Column::Float64(values) = self {
            Some(values.as_slice())
        } else {
            None
        }
    }

    /// Borrow the values as `&[i64]`, or `None` if this is not an Int64 column.
    pub fn as_int64(&self) -> Option<&[i64]> {
        if let Column::Int64(values) = self {
            Some(values.as_slice())
        } else {
            None
        }
    }

    /// Borrow the nullable strings, or `None` if this is not a String column.
    pub fn as_string(&self) -> Option<&[Option<Arc<str>>]> {
        if let Column::String(values) = self {
            Some(values.as_slice())
        } else {
            None
        }
    }

    /// Borrow the nullable booleans, or `None` if this is not a Bool column.
    pub fn as_bool(&self) -> Option<&[Option<bool>]> {
        if let Column::Bool(values) = self {
            Some(values.as_slice())
        } else {
            None
        }
    }
}
113
114/// Columnar buffer with lazy field extraction.
115///
116/// This buffer stores events in both row and columnar format:
117/// - `events`: Original events preserved for predicates/emit operations
118/// - `timestamps`: Eagerly extracted timestamps for temporal operations
119/// - `columns`: Lazily populated cache of extracted field columns
120///
121/// When an aggregation needs a field, the column is extracted once and
122/// cached for subsequent aggregations on the same field.
123#[derive(Debug)]
124pub struct ColumnarBuffer {
125    /// Original events preserved for predicates and emit
126    events: Vec<SharedEvent>,
127    /// Timestamps in milliseconds (eagerly extracted)
128    timestamps: Vec<i64>,
129    /// Lazily populated column cache by field name
130    columns: FxHashMap<Arc<str>, Column>,
131}
132
133impl Default for ColumnarBuffer {
134    fn default() -> Self {
135        Self::new()
136    }
137}
138
139impl ColumnarBuffer {
140    /// Create a new empty columnar buffer.
141    pub fn new() -> Self {
142        Self {
143            events: Vec::new(),
144            timestamps: Vec::new(),
145            columns: FxHashMap::default(),
146        }
147    }
148
149    /// Create a new columnar buffer with the specified capacity.
150    pub fn with_capacity(capacity: usize) -> Self {
151        Self {
152            events: Vec::with_capacity(capacity),
153            timestamps: Vec::with_capacity(capacity),
154            columns: FxHashMap::default(),
155        }
156    }
157
158    /// Push an event into the buffer.
159    ///
160    /// This eagerly extracts the timestamp and clears any cached columns
161    /// to maintain consistency.
162    pub fn push(&mut self, event: SharedEvent) {
163        // Eagerly extract timestamp
164        self.timestamps.push(event.timestamp.timestamp_millis());
165        self.events.push(event);
166        // Clear column cache since we added new data
167        // (lazily re-extracted on next access)
168        if !self.columns.is_empty() {
169            self.columns.clear();
170        }
171    }
172
173    /// Drain the first `count` events from the buffer.
174    ///
175    /// Returns the drained events as a `Vec<SharedEvent>`.
176    /// Clears all cached columns since indices shift.
177    pub fn drain_front(&mut self, count: usize) -> Vec<SharedEvent> {
178        let count = count.min(self.events.len());
179        let drained: Vec<_> = self.events.drain(0..count).collect();
180        self.timestamps.drain(0..count);
181        // Clear column cache since indices shifted
182        self.columns.clear();
183        drained
184    }
185
186    /// Take all events from the buffer, leaving it empty.
187    ///
188    /// Returns the events as a `Vec<SharedEvent>`.
189    pub fn take_all(&mut self) -> Vec<SharedEvent> {
190        self.timestamps.clear();
191        self.columns.clear();
192        std::mem::take(&mut self.events)
193    }
194
195    /// Clear all events from the buffer.
196    pub fn clear(&mut self) {
197        self.events.clear();
198        self.timestamps.clear();
199        self.columns.clear();
200    }
201
202    /// Get the events as a slice.
203    pub fn events(&self) -> &[SharedEvent] {
204        &self.events
205    }
206
207    /// Get the timestamps as a slice (milliseconds since epoch).
208    pub fn timestamps(&self) -> &[i64] {
209        &self.timestamps
210    }
211
212    /// Get the number of events in the buffer.
213    pub const fn len(&self) -> usize {
214        self.events.len()
215    }
216
217    /// Check if the buffer is empty.
218    pub const fn is_empty(&self) -> bool {
219        self.events.is_empty()
220    }
221
222    /// Ensure a Float64 column exists for the given field.
223    ///
224    /// If the column is already cached, returns the cached slice.
225    /// Otherwise, extracts the field from all events and caches it.
226    ///
227    /// Missing values are stored as `f64::NAN`.
228    pub fn ensure_float_column(&mut self, field: &str) -> &[f64] {
229        let field_key: Arc<str> = field.into();
230
231        if !self.columns.contains_key(&field_key) {
232            // Extract column from events
233            let values: Vec<f64> = self
234                .events
235                .iter()
236                .map(|e| e.get_float(field).unwrap_or(f64::NAN))
237                .collect();
238            self.columns
239                .insert(field_key.clone(), Column::Float64(values));
240        }
241
242        match self.columns.get(&field_key) {
243            Some(Column::Float64(v)) => v,
244            _ => &[], // Should never happen due to insert above
245        }
246    }
247
248    /// Ensure an Int64 column exists for the given field.
249    ///
250    /// If the column is already cached, returns the cached slice.
251    /// Otherwise, extracts the field from all events and caches it.
252    ///
253    /// Missing values are stored as `i64::MIN`.
254    pub fn ensure_int_column(&mut self, field: &str) -> &[i64] {
255        let field_key: Arc<str> = field.into();
256
257        if !self.columns.contains_key(&field_key) {
258            let values: Vec<i64> = self
259                .events
260                .iter()
261                .map(|e| e.get_int(field).unwrap_or(i64::MIN))
262                .collect();
263            self.columns
264                .insert(field_key.clone(), Column::Int64(values));
265        }
266
267        match self.columns.get(&field_key) {
268            Some(Column::Int64(v)) => v,
269            _ => &[],
270        }
271    }
272
273    /// Ensure a String column exists for the given field.
274    ///
275    /// If the column is already cached, returns the cached slice.
276    /// Otherwise, extracts the field from all events and caches it.
277    pub fn ensure_string_column(&mut self, field: &str) -> &[Option<Arc<str>>] {
278        let field_key: Arc<str> = field.into();
279
280        if !self.columns.contains_key(&field_key) {
281            let values: Vec<Option<Arc<str>>> = self
282                .events
283                .iter()
284                .map(|e| e.get_str(field).map(Arc::from))
285                .collect();
286            self.columns
287                .insert(field_key.clone(), Column::String(values));
288        }
289
290        match self.columns.get(&field_key) {
291            Some(Column::String(v)) => v,
292            _ => &[],
293        }
294    }
295
296    /// Check if a column is already cached.
297    pub fn has_column(&self, field: &str) -> bool {
298        self.columns.contains_key(field)
299    }
300
301    /// Get the cached column if it exists.
302    pub fn get_column(&self, field: &str) -> Option<&Column> {
303        self.columns.get(field)
304    }
305
306    /// Create a checkpoint of the buffer for persistence.
307    ///
308    /// Note: Columns are NOT checkpointed - they are lazily re-extracted on restore.
309    pub fn checkpoint(&self) -> ColumnarCheckpoint {
310        ColumnarCheckpoint {
311            events: self
312                .events
313                .iter()
314                .map(|e| SerializableEvent::from(e.as_ref()))
315                .collect(),
316            timestamps: self.timestamps.clone(),
317        }
318    }
319
320    /// Restore buffer state from a checkpoint.
321    pub fn restore(&mut self, cp: &ColumnarCheckpoint) {
322        self.events = cp
323            .events
324            .iter()
325            .map(|se| Arc::new(Event::from(se.clone())))
326            .collect();
327        self.timestamps = cp.timestamps.clone();
328        self.columns.clear(); // Columns will be lazily re-extracted
329    }
330
331    /// Create from a Vec of SharedEvents.
332    pub fn from_events(events: Vec<SharedEvent>) -> Self {
333        let timestamps: Vec<i64> = events
334            .iter()
335            .map(|e| e.timestamp.timestamp_millis())
336            .collect();
337        Self {
338            events,
339            timestamps,
340            columns: FxHashMap::default(),
341        }
342    }
343}
344
345/// Checkpoint for columnar buffer persistence.
346///
347/// Columns are NOT checkpointed - they are lazily re-extracted on restore.
348/// This keeps checkpoint size minimal while maintaining correct behavior.
349#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
350pub struct ColumnarCheckpoint {
351    /// Serialized events
352    pub events: Vec<SerializableEvent>,
353    /// Timestamps in milliseconds
354    pub timestamps: Vec<i64>,
355}
356
357/// Trait for windows that support columnar access.
358///
359/// This trait provides a uniform interface for accessing columnar storage
360/// across different window types.
361pub trait ColumnarAccess {
362    /// Get a reference to the columnar buffer.
363    fn columnar(&self) -> &ColumnarBuffer;
364
365    /// Get a mutable reference to the columnar buffer.
366    fn columnar_mut(&mut self) -> &mut ColumnarBuffer;
367
368    /// Get a float column, ensuring it exists.
369    fn get_float_column(&mut self, field: &str) -> &[f64] {
370        self.columnar_mut().ensure_float_column(field)
371    }
372
373    /// Get an int column, ensuring it exists.
374    fn get_int_column(&mut self, field: &str) -> &[i64] {
375        self.columnar_mut().ensure_int_column(field)
376    }
377
378    /// Get a string column, ensuring it exists.
379    fn get_string_column(&mut self, field: &str) -> &[Option<Arc<str>>] {
380        self.columnar_mut().ensure_string_column(field)
381    }
382}
383
#[cfg(test)]
mod tests {
    use chrono::{Duration, Utc};

    use super::*;
    use crate::Event;

    /// A shared `Trade` event carrying only a `price` field.
    fn trade(price: f64) -> Arc<Event> {
        Arc::new(Event::new("Trade").with_field("price", price))
    }

    /// A buffer holding two timestamped trades one second apart
    /// (prices 100.0 and 105.0), pushed one at a time.
    fn two_timed_trades() -> ColumnarBuffer {
        let start = Utc::now();
        let mut buffer = ColumnarBuffer::new();
        for (offset_secs, price) in [(0, 100.0), (1, 105.0)] {
            buffer.push(Arc::new(
                Event::new("Trade")
                    .with_timestamp(start + Duration::seconds(offset_secs))
                    .with_field("price", price),
            ));
        }
        buffer
    }

    #[test]
    fn test_columnar_buffer_push() {
        let buffer = two_timed_trades();

        assert_eq!(buffer.len(), 2);
        assert_eq!(buffer.timestamps().len(), 2);
        // Nothing has requested a column yet, so the cache is still empty.
        assert!(buffer.columns.is_empty());
    }

    #[test]
    fn test_columnar_buffer_ensure_float_column() {
        let mut buffer = ColumnarBuffer::new();
        buffer.push(trade(100.0));
        buffer.push(trade(105.0));
        buffer.push(Arc::new(Event::new("Trade"))); // no price field

        let prices = buffer.ensure_float_column("price");
        assert_eq!(prices.len(), 3);
        assert_eq!(prices[..2], [100.0, 105.0]);
        assert!(prices[2].is_nan(), "missing values are stored as NaN");

        assert!(buffer.has_column("price"));
    }

    #[test]
    fn test_columnar_buffer_drain_front() {
        let mut buffer = ColumnarBuffer::new();
        for price in [0.0, 10.0, 20.0, 30.0, 40.0] {
            buffer.push(trade(price));
        }

        buffer.ensure_float_column("price");
        assert!(buffer.has_column("price"));

        let drained = buffer.drain_front(2);
        assert_eq!(drained.len(), 2);
        assert_eq!(buffer.len(), 3);
        // Indices shifted, so the cached column must have been dropped.
        assert!(!buffer.has_column("price"));

        let prices = buffer.ensure_float_column("price");
        assert_eq!(prices.len(), 3);
        assert_eq!(prices[0], 20.0); // formerly the third event
    }

    #[test]
    fn test_columnar_buffer_take_all() {
        let mut buffer = ColumnarBuffer::new();
        buffer.push(trade(100.0));
        buffer.push(trade(105.0));

        let taken = buffer.take_all();

        assert_eq!(taken.len(), 2);
        assert!(buffer.is_empty());
        assert!(buffer.timestamps().is_empty());
    }

    #[test]
    fn test_columnar_buffer_ensure_int_column() {
        let mut buffer = ColumnarBuffer::new();
        for quantity in [10i64, 20i64] {
            buffer.push(Arc::new(Event::new("Order").with_field("quantity", quantity)));
        }

        let quantities = buffer.ensure_int_column("quantity");
        assert_eq!(quantities.len(), 2);
        assert_eq!(quantities[0], 10);
        assert_eq!(quantities[1], 20);
    }

    #[test]
    fn test_columnar_buffer_ensure_string_column() {
        let mut buffer = ColumnarBuffer::new();
        for symbol in ["IBM", "AAPL"] {
            buffer.push(Arc::new(Event::new("Trade").with_field("symbol", symbol)));
        }
        buffer.push(Arc::new(Event::new("Trade"))); // no symbol field

        let symbols = buffer.ensure_string_column("symbol");
        assert_eq!(symbols.len(), 3);
        assert_eq!(symbols[0].as_deref(), Some("IBM"));
        assert_eq!(symbols[1].as_deref(), Some("AAPL"));
        assert_eq!(symbols[2], None);
    }

    #[test]
    fn test_columnar_buffer_checkpoint_restore() {
        let mut buffer = two_timed_trades();
        // Cache a column first to show that checkpoints do not carry it.
        buffer.ensure_float_column("price");

        let checkpoint = buffer.checkpoint();
        let mut restored = ColumnarBuffer::new();
        restored.restore(&checkpoint);

        assert_eq!(restored.len(), 2);
        assert_eq!(restored.timestamps().len(), 2);
        assert!(!restored.has_column("price"));

        let prices = restored.ensure_float_column("price");
        assert_eq!(prices.len(), 2);
        assert_eq!(prices[0], 100.0);
    }

    #[test]
    fn test_columnar_buffer_from_events() {
        let buffer = ColumnarBuffer::from_events(vec![trade(100.0), trade(105.0)]);

        assert_eq!(buffer.len(), 2);
        assert_eq!(buffer.timestamps().len(), 2);
    }

    #[test]
    fn test_column_type_accessors() {
        let floats = Column::Float64(vec![1.0, 2.0, 3.0]);
        assert!(floats.as_float64().is_some());
        assert!(floats.as_int64().is_none());
        assert_eq!(floats.len(), 3);

        let ints = Column::Int64(vec![1, 2, 3]);
        assert!(ints.as_int64().is_some());
        assert!(ints.as_float64().is_none());

        let strings = Column::String(vec![Some("a".into()), None]);
        assert!(strings.as_string().is_some());
        assert_eq!(strings.len(), 2);

        let bools = Column::Bool(vec![Some(true), Some(false)]);
        assert!(bools.as_bool().is_some());
    }

    #[test]
    fn test_columnar_buffer_clear() {
        let mut buffer = ColumnarBuffer::new();
        buffer.push(trade(100.0));
        buffer.ensure_float_column("price");

        buffer.clear();

        assert!(buffer.is_empty());
        assert!(buffer.timestamps().is_empty());
        assert!(!buffer.has_column("price"));
    }

    #[test]
    fn test_columnar_buffer_with_capacity() {
        // Capacity is not observable; only check construction yields an empty buffer.
        let buffer = ColumnarBuffer::with_capacity(100);
        assert!(buffer.is_empty());
    }
}