1use std::sync::Arc;
40
41use rustc_hash::FxHashMap;
42
43use crate::event::{Event, SharedEvent};
44use crate::persistence::SerializableEvent;
45
/// A homogeneously typed column of values extracted from buffered events.
///
/// Equality is element-wise and variant-sensitive; `Float64` columns that
/// contain `NaN` never compare equal, following `f64`'s `PartialEq`.
#[derive(Debug, Clone, PartialEq)]
pub enum Column {
    /// 64-bit floating point values.
    Float64(Vec<f64>),
    /// 64-bit signed integer values.
    Int64(Vec<i64>),
    /// Optional shared string values (`None` where an event had no value).
    String(Vec<Option<Arc<str>>>),
    /// Optional boolean values.
    Bool(Vec<Option<bool>>),
}
64
65impl Column {
66 pub const fn len(&self) -> usize {
68 match self {
69 Self::Float64(v) => v.len(),
70 Self::Int64(v) => v.len(),
71 Self::String(v) => v.len(),
72 Self::Bool(v) => v.len(),
73 }
74 }
75
76 pub const fn is_empty(&self) -> bool {
78 self.len() == 0
79 }
80
81 pub fn as_float64(&self) -> Option<&[f64]> {
83 match self {
84 Self::Float64(v) => Some(v),
85 _ => None,
86 }
87 }
88
89 pub fn as_int64(&self) -> Option<&[i64]> {
91 match self {
92 Self::Int64(v) => Some(v),
93 _ => None,
94 }
95 }
96
97 pub fn as_string(&self) -> Option<&[Option<Arc<str>>]> {
99 match self {
100 Self::String(v) => Some(v),
101 _ => None,
102 }
103 }
104
105 pub fn as_bool(&self) -> Option<&[Option<bool>]> {
107 match self {
108 Self::Bool(v) => Some(v),
109 _ => None,
110 }
111 }
112}
113
/// A buffer of shared events that can lazily materialize per-field columns.
///
/// `events` and `timestamps` are kept index-aligned. `columns` is a cache of
/// columns built on demand by the `ensure_*_column` methods; every operation
/// that changes the event list discards the whole cache.
#[derive(Debug)]
pub struct ColumnarBuffer {
    /// Buffered events in insertion order (front = oldest pushed).
    events: Vec<SharedEvent>,
    /// `timestamp.timestamp_millis()` of the event at the same index in `events`.
    timestamps: Vec<i64>,
    /// Lazily built columns, keyed by field name.
    columns: FxHashMap<Arc<str>, Column>,
}
132
133impl Default for ColumnarBuffer {
134 fn default() -> Self {
135 Self::new()
136 }
137}
138
139impl ColumnarBuffer {
140 pub fn new() -> Self {
142 Self {
143 events: Vec::new(),
144 timestamps: Vec::new(),
145 columns: FxHashMap::default(),
146 }
147 }
148
149 pub fn with_capacity(capacity: usize) -> Self {
151 Self {
152 events: Vec::with_capacity(capacity),
153 timestamps: Vec::with_capacity(capacity),
154 columns: FxHashMap::default(),
155 }
156 }
157
158 pub fn push(&mut self, event: SharedEvent) {
163 self.timestamps.push(event.timestamp.timestamp_millis());
165 self.events.push(event);
166 if !self.columns.is_empty() {
169 self.columns.clear();
170 }
171 }
172
173 pub fn drain_front(&mut self, count: usize) -> Vec<SharedEvent> {
178 let count = count.min(self.events.len());
179 let drained: Vec<_> = self.events.drain(0..count).collect();
180 self.timestamps.drain(0..count);
181 self.columns.clear();
183 drained
184 }
185
186 pub fn take_all(&mut self) -> Vec<SharedEvent> {
190 self.timestamps.clear();
191 self.columns.clear();
192 std::mem::take(&mut self.events)
193 }
194
195 pub fn clear(&mut self) {
197 self.events.clear();
198 self.timestamps.clear();
199 self.columns.clear();
200 }
201
202 pub fn events(&self) -> &[SharedEvent] {
204 &self.events
205 }
206
207 pub fn timestamps(&self) -> &[i64] {
209 &self.timestamps
210 }
211
212 pub const fn len(&self) -> usize {
214 self.events.len()
215 }
216
217 pub const fn is_empty(&self) -> bool {
219 self.events.is_empty()
220 }
221
222 pub fn ensure_float_column(&mut self, field: &str) -> &[f64] {
229 let field_key: Arc<str> = field.into();
230
231 if !self.columns.contains_key(&field_key) {
232 let values: Vec<f64> = self
234 .events
235 .iter()
236 .map(|e| e.get_float(field).unwrap_or(f64::NAN))
237 .collect();
238 self.columns
239 .insert(field_key.clone(), Column::Float64(values));
240 }
241
242 match self.columns.get(&field_key) {
243 Some(Column::Float64(v)) => v,
244 _ => &[], }
246 }
247
248 pub fn ensure_int_column(&mut self, field: &str) -> &[i64] {
255 let field_key: Arc<str> = field.into();
256
257 if !self.columns.contains_key(&field_key) {
258 let values: Vec<i64> = self
259 .events
260 .iter()
261 .map(|e| e.get_int(field).unwrap_or(i64::MIN))
262 .collect();
263 self.columns
264 .insert(field_key.clone(), Column::Int64(values));
265 }
266
267 match self.columns.get(&field_key) {
268 Some(Column::Int64(v)) => v,
269 _ => &[],
270 }
271 }
272
273 pub fn ensure_string_column(&mut self, field: &str) -> &[Option<Arc<str>>] {
278 let field_key: Arc<str> = field.into();
279
280 if !self.columns.contains_key(&field_key) {
281 let values: Vec<Option<Arc<str>>> = self
282 .events
283 .iter()
284 .map(|e| e.get_str(field).map(Arc::from))
285 .collect();
286 self.columns
287 .insert(field_key.clone(), Column::String(values));
288 }
289
290 match self.columns.get(&field_key) {
291 Some(Column::String(v)) => v,
292 _ => &[],
293 }
294 }
295
296 pub fn has_column(&self, field: &str) -> bool {
298 self.columns.contains_key(field)
299 }
300
301 pub fn get_column(&self, field: &str) -> Option<&Column> {
303 self.columns.get(field)
304 }
305
306 pub fn checkpoint(&self) -> ColumnarCheckpoint {
310 ColumnarCheckpoint {
311 events: self
312 .events
313 .iter()
314 .map(|e| SerializableEvent::from(e.as_ref()))
315 .collect(),
316 timestamps: self.timestamps.clone(),
317 }
318 }
319
320 pub fn restore(&mut self, cp: &ColumnarCheckpoint) {
322 self.events = cp
323 .events
324 .iter()
325 .map(|se| Arc::new(Event::from(se.clone())))
326 .collect();
327 self.timestamps = cp.timestamps.clone();
328 self.columns.clear(); }
330
331 pub fn from_events(events: Vec<SharedEvent>) -> Self {
333 let timestamps: Vec<i64> = events
334 .iter()
335 .map(|e| e.timestamp.timestamp_millis())
336 .collect();
337 Self {
338 events,
339 timestamps,
340 columns: FxHashMap::default(),
341 }
342 }
343}
344
/// Serializable snapshot of a `ColumnarBuffer`'s events and timestamps.
///
/// Cached columns are not part of the snapshot; a restored buffer starts with
/// an empty column cache.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ColumnarCheckpoint {
    /// The buffered events, converted to their serializable form, in buffer order.
    pub events: Vec<SerializableEvent>,
    /// Millisecond timestamps copied from the buffer, index-aligned with `events`.
    pub timestamps: Vec<i64>,
}
356
357pub trait ColumnarAccess {
362 fn columnar(&self) -> &ColumnarBuffer;
364
365 fn columnar_mut(&mut self) -> &mut ColumnarBuffer;
367
368 fn get_float_column(&mut self, field: &str) -> &[f64] {
370 self.columnar_mut().ensure_float_column(field)
371 }
372
373 fn get_int_column(&mut self, field: &str) -> &[i64] {
375 self.columnar_mut().ensure_int_column(field)
376 }
377
378 fn get_string_column(&mut self, field: &str) -> &[Option<Arc<str>>] {
380 self.columnar_mut().ensure_string_column(field)
381 }
382}
383
#[cfg(test)]
mod tests {
    use chrono::{Duration, Utc};

    use super::*;
    use crate::Event;

    /// A shared `Trade` event whose only field is `price`.
    fn trade_at_price(price: f64) -> Arc<Event> {
        Arc::new(Event::new("Trade").with_field("price", price))
    }

    #[test]
    fn test_columnar_buffer_push() {
        let mut buffer = ColumnarBuffer::new();
        let start = Utc::now();

        let first = Event::new("Trade")
            .with_timestamp(start)
            .with_field("price", 100.0);
        let second = Event::new("Trade")
            .with_timestamp(start + Duration::seconds(1))
            .with_field("price", 105.0);
        buffer.push(Arc::new(first));
        buffer.push(Arc::new(second));

        assert_eq!(buffer.len(), 2);
        assert_eq!(buffer.timestamps().len(), 2);
        // Nothing has been materialised, so the column cache stays empty.
        assert!(buffer.columns.is_empty());
    }

    #[test]
    fn test_columnar_buffer_ensure_float_column() {
        let mut buffer = ColumnarBuffer::new();
        buffer.push(trade_at_price(100.0));
        buffer.push(trade_at_price(105.0));
        // No `price` field: must surface as NaN in the float column.
        buffer.push(Arc::new(Event::new("Trade")));

        let prices = buffer.ensure_float_column("price");
        assert_eq!(prices.len(), 3);
        assert_eq!(prices[0], 100.0);
        assert_eq!(prices[1], 105.0);
        assert!(prices[2].is_nan());

        assert!(buffer.has_column("price"));
    }

    #[test]
    fn test_columnar_buffer_drain_front() {
        let mut buffer = ColumnarBuffer::new();
        for step in 0..5_i32 {
            buffer.push(trade_at_price(f64::from(step) * 10.0));
        }

        buffer.ensure_float_column("price");
        assert!(buffer.has_column("price"));

        let drained = buffer.drain_front(2);
        assert_eq!(drained.len(), 2);
        assert_eq!(buffer.len(), 3);

        // Draining invalidates the cache...
        assert!(!buffer.has_column("price"));

        // ...and a rebuilt column only covers the remaining events.
        let prices = buffer.ensure_float_column("price");
        assert_eq!(prices.len(), 3);
        assert_eq!(prices[0], 20.0);
    }

    #[test]
    fn test_columnar_buffer_take_all() {
        let mut buffer = ColumnarBuffer::new();
        buffer.push(trade_at_price(100.0));
        buffer.push(trade_at_price(105.0));

        let taken = buffer.take_all();
        assert_eq!(taken.len(), 2);
        assert!(buffer.is_empty());
        assert!(buffer.timestamps().is_empty());
    }

    #[test]
    fn test_columnar_buffer_ensure_int_column() {
        let mut buffer = ColumnarBuffer::new();
        buffer.push(Arc::new(Event::new("Order").with_field("quantity", 10i64)));
        buffer.push(Arc::new(Event::new("Order").with_field("quantity", 20i64)));

        let quantities = buffer.ensure_int_column("quantity");
        assert_eq!(quantities.len(), 2);
        assert_eq!(quantities[0], 10);
        assert_eq!(quantities[1], 20);
    }

    #[test]
    fn test_columnar_buffer_ensure_string_column() {
        let mut buffer = ColumnarBuffer::new();
        buffer.push(Arc::new(Event::new("Trade").with_field("symbol", "IBM")));
        buffer.push(Arc::new(Event::new("Trade").with_field("symbol", "AAPL")));
        // No `symbol` field: must surface as `None`.
        buffer.push(Arc::new(Event::new("Trade")));

        let symbols = buffer.ensure_string_column("symbol");
        assert_eq!(symbols.len(), 3);
        assert_eq!(symbols[0].as_deref(), Some("IBM"));
        assert_eq!(symbols[1].as_deref(), Some("AAPL"));
        assert_eq!(symbols[2], None);
    }

    #[test]
    fn test_columnar_buffer_checkpoint_restore() {
        let mut source = ColumnarBuffer::new();
        let start = Utc::now();
        for (offset, price) in [(0, 100.0), (1, 105.0)] {
            source.push(Arc::new(
                Event::new("Trade")
                    .with_timestamp(start + Duration::seconds(offset))
                    .with_field("price", price),
            ));
        }

        // Materialise a column so we can check it is not carried across.
        source.ensure_float_column("price");

        let checkpoint = source.checkpoint();

        let mut restored = ColumnarBuffer::new();
        restored.restore(&checkpoint);

        assert_eq!(restored.len(), 2);
        assert_eq!(restored.timestamps().len(), 2);
        // Checkpoints carry events only; columns are rebuilt on demand.
        assert!(!restored.has_column("price"));

        let prices = restored.ensure_float_column("price");
        assert_eq!(prices.len(), 2);
        assert_eq!(prices[0], 100.0);
    }

    #[test]
    fn test_columnar_buffer_from_events() {
        let buffer = ColumnarBuffer::from_events(vec![trade_at_price(100.0), trade_at_price(105.0)]);
        assert_eq!(buffer.len(), 2);
        assert_eq!(buffer.timestamps().len(), 2);
    }

    #[test]
    fn test_column_type_accessors() {
        let floats = Column::Float64(vec![1.0, 2.0, 3.0]);
        assert!(floats.as_float64().is_some());
        assert!(floats.as_int64().is_none());
        assert_eq!(floats.len(), 3);

        let ints = Column::Int64(vec![1, 2, 3]);
        assert!(ints.as_int64().is_some());
        assert!(ints.as_float64().is_none());

        let strings = Column::String(vec![Some("a".into()), None]);
        assert!(strings.as_string().is_some());
        assert_eq!(strings.len(), 2);

        let bools = Column::Bool(vec![Some(true), Some(false)]);
        assert!(bools.as_bool().is_some());
    }

    #[test]
    fn test_columnar_buffer_clear() {
        let mut buffer = ColumnarBuffer::new();
        buffer.push(trade_at_price(100.0));
        buffer.ensure_float_column("price");

        buffer.clear();

        assert!(buffer.is_empty());
        assert!(buffer.timestamps().is_empty());
        assert!(!buffer.has_column("price"));
    }

    #[test]
    fn test_columnar_buffer_with_capacity() {
        assert!(ColumnarBuffer::with_capacity(100).is_empty());
    }
}