Skip to main content

varpulis_runtime/
greta.rs

1//! # GRETA - Graph-based Real-time Event Trend Aggregation
2//!
3//! Foundation for online trend aggregation without explicit trend construction.
4//!
5//! ## References
6//!
7//! This implementation is based on:
8//!
9//! > **Olga Poppe, Chuan Lei, Elke A. Rundensteiner, and David Maier.**
10//! > *GRETA: Graph-based Real-time Event Trend Aggregation.*
11//! > Proceedings of the VLDB Endowment, Vol. 11, No. 1, pp. 80-92, 2017.
12//! > DOI: [10.14778/3151113.3151120](https://doi.org/10.14778/3151113.3151120)
13//!
14//! ## Overview
15//!
16//! GRETA solves the problem of computing aggregations over event trends (sequences
17//! matching a pattern) without explicitly constructing all matching trends, which
18//! can be exponential in the number of events.
19//!
20//! The key insight is that aggregations like COUNT, SUM, AVG can be computed
21//! incrementally by propagating partial results through an event graph, where:
22//! - Nodes represent events
23//! - Edges represent adjacency relations (event e' can precede event e in a trend)
24//!
25//! ## Key Concepts
26//!
27//! - **Event Graph**: Events as nodes, adjacency relations as edges
28//! - **Incremental Count Propagation**: `count(e) = start(e) + Σ count(e')` for predecessors e'
29//! - **Online Aggregation**: Results computed incrementally without constructing all trends
30//!
31//! ## Complexity
32//!
33//! - **Time**: O(n²) per query (vs O(2^n) for explicit trend construction)
34//! - **Space**: O(n) per query
35//!
36//! ## Usage in Varpulis
37//!
38//! This module provides the baseline non-shared aggregation that both the
39//! [`hamlet`](crate::hamlet) and [`zdd_unified`](crate::zdd_unified) approaches
40//! build upon for multi-query optimization.
41
42use std::sync::Arc;
43use std::time::Instant;
44
45use rustc_hash::FxHashMap;
46use smallvec::SmallVec;
47// Re-export shared types from varpulis-hamlet::greta so that both crates
48// agree on the same concrete types (avoids "similar names but distinct types").
49pub use varpulis_hamlet::greta::{GretaAggregate, NodeId, QueryId};
50
51use crate::event::SharedEvent;
52
/// Identifier of a graphlet, stored as an unsigned 32-bit integer.
pub type GraphletId = u32;
55
56/// An event node in the GRETA graph
57#[derive(Debug, Clone)]
58pub struct EventNode {
59    /// Unique identifier within the graph
60    pub id: NodeId,
61    /// The actual event
62    pub event: SharedEvent,
63    /// Event type index (for fast matching)
64    pub type_index: u16,
65    /// Timestamp for ordering
66    pub timestamp: Instant,
67    /// Predecessor edges (events that can precede this in a trend)
68    pub predecessors: SmallVec<[NodeId; 8]>,
69    /// Count of trends ending at this node, per query
70    /// Uses SmallVec for queries <= 8, heap for more
71    pub counts: SmallVec<[u64; 8]>,
72    /// Whether this is a start event for each query
73    pub is_start: SmallVec<[bool; 8]>,
74    /// Whether this is an end event for each query
75    pub is_end: SmallVec<[bool; 8]>,
76}
77
78impl EventNode {
79    /// Create a new event node
80    pub fn new(id: NodeId, event: SharedEvent, type_index: u16, num_queries: usize) -> Self {
81        Self {
82            id,
83            event,
84            type_index,
85            timestamp: Instant::now(),
86            predecessors: SmallVec::new(),
87            counts: SmallVec::from_elem(0, num_queries),
88            is_start: SmallVec::from_elem(false, num_queries),
89            is_end: SmallVec::from_elem(false, num_queries),
90        }
91    }
92
93    /// Get the count for a specific query
94    #[inline]
95    pub fn count(&self, query: QueryId) -> u64 {
96        self.counts.get(query as usize).copied().unwrap_or(0)
97    }
98
99    /// Set the count for a specific query
100    #[inline]
101    pub fn set_count(&mut self, query: QueryId, count: u64) {
102        if let Some(c) = self.counts.get_mut(query as usize) {
103            *c = count;
104        }
105    }
106
107    /// Add a predecessor edge
108    #[inline]
109    pub fn add_predecessor(&mut self, pred_id: NodeId) {
110        self.predecessors.push(pred_id);
111    }
112}
113
114/// The GRETA event graph for a single partition/window
115#[derive(Debug)]
116pub struct EventGraph {
117    /// All nodes in the graph, indexed by NodeId
118    nodes: Vec<EventNode>,
119    /// Nodes grouped by event type for fast lookup
120    nodes_by_type: FxHashMap<u16, Vec<NodeId>>,
121    /// Number of queries being tracked
122    num_queries: usize,
123    /// Final aggregated counts per query
124    final_counts: SmallVec<[u64; 8]>,
125    /// Node ID counter
126    next_node_id: NodeId,
127}
128
129impl EventGraph {
130    /// Create a new event graph
131    pub fn new(num_queries: usize) -> Self {
132        Self {
133            nodes: Vec::with_capacity(1024),
134            nodes_by_type: FxHashMap::default(),
135            num_queries,
136            final_counts: SmallVec::from_elem(0, num_queries),
137            next_node_id: 0,
138        }
139    }
140
141    /// Add an event to the graph
142    pub fn add_event(&mut self, event: SharedEvent, type_index: u16) -> NodeId {
143        let id = self.next_node_id;
144        self.next_node_id += 1;
145
146        let node = EventNode::new(id, event, type_index, self.num_queries);
147        self.nodes.push(node);
148
149        self.nodes_by_type.entry(type_index).or_default().push(id);
150
151        id
152    }
153
154    /// Get a node by ID
155    #[inline]
156    pub fn node(&self, id: NodeId) -> Option<&EventNode> {
157        self.nodes.get(id as usize)
158    }
159
160    /// Get a mutable node by ID
161    #[inline]
162    pub fn node_mut(&mut self, id: NodeId) -> Option<&mut EventNode> {
163        self.nodes.get_mut(id as usize)
164    }
165
166    /// Get all nodes of a specific type
167    #[inline]
168    pub fn nodes_of_type(&self, type_index: u16) -> &[NodeId] {
169        self.nodes_by_type
170            .get(&type_index)
171            .map_or(&[], |v| v.as_slice())
172    }
173
174    /// Propagate counts for a query (GRETA core algorithm)
175    ///
176    /// For each event e:
177    ///   count(e, q) = start(e, q) + Σ count(e', q) for all predecessors e'
178    pub fn propagate_counts(&mut self, query: QueryId) {
179        // Process nodes in timestamp order (they're already sorted by insertion)
180        for i in 0..self.nodes.len() {
181            let node = &self.nodes[i];
182            let is_start = node.is_start.get(query as usize).copied().unwrap_or(false);
183            let predecessors = node.predecessors.clone();
184
185            // Calculate count: start + sum of predecessor counts
186            let mut count = u64::from(is_start);
187            for pred_id in predecessors {
188                if let Some(pred) = self.nodes.get(pred_id as usize) {
189                    count = count.saturating_add(pred.count(query));
190                }
191            }
192
193            // Update count
194            if let Some(node) = self.nodes.get_mut(i) {
195                node.set_count(query, count);
196
197                // Update final count if this is an end node
198                if node.is_end.get(query as usize).copied().unwrap_or(false) {
199                    if let Some(fc) = self.final_counts.get_mut(query as usize) {
200                        *fc = fc.saturating_add(count);
201                    }
202                }
203            }
204        }
205    }
206
207    /// Get the final aggregated count for a query
208    #[inline]
209    pub fn final_count(&self, query: QueryId) -> u64 {
210        self.final_counts.get(query as usize).copied().unwrap_or(0)
211    }
212
213    /// Clear the graph for reuse
214    pub fn clear(&mut self) {
215        self.nodes.clear();
216        self.nodes_by_type.clear();
217        self.final_counts.fill(0);
218        self.next_node_id = 0;
219    }
220
221    /// Number of nodes in the graph
222    #[inline]
223    pub const fn len(&self) -> usize {
224        self.nodes.len()
225    }
226
227    /// Check if graph is empty
228    #[inline]
229    pub const fn is_empty(&self) -> bool {
230        self.nodes.is_empty()
231    }
232}
233
/// Identifier of a pattern, stored as an unsigned 32-bit integer.
pub type PatternId = u32;
236
237/// Query definition for GRETA
238#[derive(Debug, Clone)]
239pub struct GretaQuery {
240    /// Unique query identifier
241    pub id: QueryId,
242    /// Pattern to match (references SASE+ pattern)
243    pub pattern_id: PatternId,
244    /// Event type indices in the pattern (in order)
245    pub event_types: SmallVec<[u16; 4]>,
246    /// Which types have Kleene closure
247    pub kleene_types: SmallVec<[u16; 4]>,
248    /// Aggregation function
249    pub aggregate: GretaAggregate,
250    /// Window size in milliseconds
251    pub window_ms: u64,
252    /// Slide size in milliseconds (for sliding windows)
253    pub slide_ms: u64,
254}
255
256impl GretaQuery {
257    /// Check if this query has Kleene patterns
258    #[inline]
259    pub fn has_kleene(&self) -> bool {
260        !self.kleene_types.is_empty()
261    }
262
263    /// Check if an event type is a start type for this query
264    #[inline]
265    pub fn is_start_type(&self, type_index: u16) -> bool {
266        self.event_types.first().copied() == Some(type_index)
267    }
268
269    /// Check if an event type is an end type for this query
270    #[inline]
271    pub fn is_end_type(&self, type_index: u16) -> bool {
272        self.event_types.last().copied() == Some(type_index)
273    }
274}
275
276/// GRETA executor for non-shared online trend aggregation
277#[derive(Debug)]
278pub struct GretaExecutor {
279    /// Registered queries
280    queries: Vec<GretaQuery>,
281    /// Event type name to index mapping
282    type_indices: FxHashMap<Arc<str>, u16>,
283    /// Current event graph
284    graph: EventGraph,
285}
286
287impl GretaExecutor {
288    /// Create a new GRETA executor
289    pub fn new() -> Self {
290        Self {
291            queries: Vec::new(),
292            type_indices: FxHashMap::default(),
293            graph: EventGraph::new(0),
294        }
295    }
296
297    /// Register a query
298    pub fn register_query(&mut self, query: GretaQuery) {
299        self.queries.push(query);
300        // Recreate graph with new query count
301        self.graph = EventGraph::new(self.queries.len());
302    }
303
304    /// Register an event type and get its index
305    pub fn register_type(&mut self, name: Arc<str>) -> u16 {
306        let len = self.type_indices.len() as u16;
307        *self.type_indices.entry(name).or_insert(len)
308    }
309
310    /// Get the type index for an event type name
311    #[inline]
312    pub fn type_index(&self, name: &str) -> Option<u16> {
313        self.type_indices.get(name).copied()
314    }
315
316    /// Process an event
317    pub fn process(&mut self, event: SharedEvent) -> Vec<(QueryId, u64)> {
318        let type_index = match self.type_index(&event.event_type) {
319            Some(idx) => idx,
320            None => return Vec::new(),
321        };
322
323        // Add event to graph
324        let node_id = self.graph.add_event(event, type_index);
325
326        // Mark start/end status for each query
327        for query in &self.queries {
328            if let Some(node) = self.graph.node_mut(node_id) {
329                if query.is_start_type(type_index) {
330                    if let Some(is_start) = node.is_start.get_mut(query.id as usize) {
331                        *is_start = true;
332                    }
333                }
334                if query.is_end_type(type_index) {
335                    if let Some(is_end) = node.is_end.get_mut(query.id as usize) {
336                        *is_end = true;
337                    }
338                }
339            }
340        }
341
342        // Add predecessor edges based on pattern structure
343        self.add_predecessor_edges(node_id, type_index);
344
345        // Propagate counts for all queries
346        let mut results = Vec::new();
347        for query in &self.queries {
348            self.graph.propagate_counts(query.id);
349            let count = self.graph.final_count(query.id);
350            if count > 0 {
351                results.push((query.id, count));
352            }
353        }
354
355        results
356    }
357
358    /// Add predecessor edges for a new node
359    fn add_predecessor_edges(&mut self, node_id: NodeId, type_index: u16) {
360        // Collect all predecessor IDs first to avoid borrow issues
361        let mut all_pred_ids: SmallVec<[NodeId; 16]> = SmallVec::new();
362
363        // For each query, find valid predecessors based on pattern structure
364        for query in &self.queries {
365            // Find position of this type in the pattern
366            let pos = query.event_types.iter().position(|&t| t == type_index);
367
368            if let Some(pos) = pos {
369                // Predecessors are events of the previous type in the pattern
370                // For Kleene, same type can also be a predecessor
371                let mut pred_types: SmallVec<[u16; 4]> = SmallVec::new();
372
373                if pos > 0 {
374                    pred_types.push(query.event_types[pos - 1]);
375                }
376
377                // If this type has Kleene, add self-loop
378                if query.kleene_types.contains(&type_index) {
379                    pred_types.push(type_index);
380                }
381
382                // Collect predecessor IDs
383                for pred_type in pred_types {
384                    for &pred_id in self.graph.nodes_of_type(pred_type) {
385                        if pred_id < node_id && !all_pred_ids.contains(&pred_id) {
386                            all_pred_ids.push(pred_id);
387                        }
388                    }
389                }
390            }
391        }
392
393        // Now add all predecessors
394        if let Some(node) = self.graph.node_mut(node_id) {
395            for pred_id in all_pred_ids {
396                node.add_predecessor(pred_id);
397            }
398        }
399    }
400
401    /// Flush window and get final results
402    pub fn flush(&mut self) -> Vec<(QueryId, u64)> {
403        let results: Vec<_> = self
404            .queries
405            .iter()
406            .map(|q| (q.id, self.graph.final_count(q.id)))
407            .filter(|(_, count)| *count > 0)
408            .collect();
409
410        self.graph.clear();
411        results
412    }
413}
414
415impl Default for GretaExecutor {
416    fn default() -> Self {
417        Self::new()
418    }
419}
420
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::Event;

    /// A fresh graph is empty and hands out node ids starting at zero.
    #[test]
    fn test_event_graph_basic() {
        let mut graph = EventGraph::new(2);
        assert!(graph.is_empty());

        let first_id = graph.add_event(Arc::new(Event::new("A")), 0);

        assert_eq!(first_id, 0);
        assert_eq!(graph.len(), 1);
    }

    /// Start/end/Kleene helpers reflect the pattern `[0, 1]` with Kleene on 1.
    #[test]
    fn test_greta_query() {
        let event_types = smallvec::smallvec![0, 1];
        let kleene_types = smallvec::smallvec![1];

        let query = GretaQuery {
            id: 0,
            pattern_id: 0,
            event_types,
            kleene_types,
            aggregate: GretaAggregate::CountTrends,
            window_ms: 60000,
            slide_ms: 60000,
        };

        assert!(query.has_kleene());
        assert!(query.is_start_type(0));
        assert!(query.is_end_type(1));
    }
}
453}