1use std::sync::Arc;
43use std::time::Instant;
44
45use rustc_hash::FxHashMap;
46use smallvec::SmallVec;
47pub use varpulis_hamlet::greta::{GretaAggregate, NodeId, QueryId};
50
51use crate::event::SharedEvent;
52
/// Numeric identifier for a graphlet.
///
/// NOTE(review): this alias is not referenced anywhere in the visible part of
/// this file; its consumers live elsewhere.
pub type GraphletId = u32;
55
56#[derive(Debug, Clone)]
58pub struct EventNode {
59 pub id: NodeId,
61 pub event: SharedEvent,
63 pub type_index: u16,
65 pub timestamp: Instant,
67 pub predecessors: SmallVec<[NodeId; 8]>,
69 pub counts: SmallVec<[u64; 8]>,
72 pub is_start: SmallVec<[bool; 8]>,
74 pub is_end: SmallVec<[bool; 8]>,
76}
77
78impl EventNode {
79 pub fn new(id: NodeId, event: SharedEvent, type_index: u16, num_queries: usize) -> Self {
81 Self {
82 id,
83 event,
84 type_index,
85 timestamp: Instant::now(),
86 predecessors: SmallVec::new(),
87 counts: SmallVec::from_elem(0, num_queries),
88 is_start: SmallVec::from_elem(false, num_queries),
89 is_end: SmallVec::from_elem(false, num_queries),
90 }
91 }
92
93 #[inline]
95 pub fn count(&self, query: QueryId) -> u64 {
96 self.counts.get(query as usize).copied().unwrap_or(0)
97 }
98
99 #[inline]
101 pub fn set_count(&mut self, query: QueryId, count: u64) {
102 if let Some(c) = self.counts.get_mut(query as usize) {
103 *c = count;
104 }
105 }
106
107 #[inline]
109 pub fn add_predecessor(&mut self, pred_id: NodeId) {
110 self.predecessors.push(pred_id);
111 }
112}
113
114#[derive(Debug)]
116pub struct EventGraph {
117 nodes: Vec<EventNode>,
119 nodes_by_type: FxHashMap<u16, Vec<NodeId>>,
121 num_queries: usize,
123 final_counts: SmallVec<[u64; 8]>,
125 next_node_id: NodeId,
127}
128
129impl EventGraph {
130 pub fn new(num_queries: usize) -> Self {
132 Self {
133 nodes: Vec::with_capacity(1024),
134 nodes_by_type: FxHashMap::default(),
135 num_queries,
136 final_counts: SmallVec::from_elem(0, num_queries),
137 next_node_id: 0,
138 }
139 }
140
141 pub fn add_event(&mut self, event: SharedEvent, type_index: u16) -> NodeId {
143 let id = self.next_node_id;
144 self.next_node_id += 1;
145
146 let node = EventNode::new(id, event, type_index, self.num_queries);
147 self.nodes.push(node);
148
149 self.nodes_by_type.entry(type_index).or_default().push(id);
150
151 id
152 }
153
154 #[inline]
156 pub fn node(&self, id: NodeId) -> Option<&EventNode> {
157 self.nodes.get(id as usize)
158 }
159
160 #[inline]
162 pub fn node_mut(&mut self, id: NodeId) -> Option<&mut EventNode> {
163 self.nodes.get_mut(id as usize)
164 }
165
166 #[inline]
168 pub fn nodes_of_type(&self, type_index: u16) -> &[NodeId] {
169 self.nodes_by_type
170 .get(&type_index)
171 .map_or(&[], |v| v.as_slice())
172 }
173
174 pub fn propagate_counts(&mut self, query: QueryId) {
179 for i in 0..self.nodes.len() {
181 let node = &self.nodes[i];
182 let is_start = node.is_start.get(query as usize).copied().unwrap_or(false);
183 let predecessors = node.predecessors.clone();
184
185 let mut count = u64::from(is_start);
187 for pred_id in predecessors {
188 if let Some(pred) = self.nodes.get(pred_id as usize) {
189 count = count.saturating_add(pred.count(query));
190 }
191 }
192
193 if let Some(node) = self.nodes.get_mut(i) {
195 node.set_count(query, count);
196
197 if node.is_end.get(query as usize).copied().unwrap_or(false) {
199 if let Some(fc) = self.final_counts.get_mut(query as usize) {
200 *fc = fc.saturating_add(count);
201 }
202 }
203 }
204 }
205 }
206
207 #[inline]
209 pub fn final_count(&self, query: QueryId) -> u64 {
210 self.final_counts.get(query as usize).copied().unwrap_or(0)
211 }
212
213 pub fn clear(&mut self) {
215 self.nodes.clear();
216 self.nodes_by_type.clear();
217 self.final_counts.fill(0);
218 self.next_node_id = 0;
219 }
220
221 #[inline]
223 pub const fn len(&self) -> usize {
224 self.nodes.len()
225 }
226
227 #[inline]
229 pub const fn is_empty(&self) -> bool {
230 self.nodes.is_empty()
231 }
232}
233
/// Numeric identifier of a pattern; stored in [`GretaQuery::pattern_id`].
pub type PatternId = u32;
236
237#[derive(Debug, Clone)]
239pub struct GretaQuery {
240 pub id: QueryId,
242 pub pattern_id: PatternId,
244 pub event_types: SmallVec<[u16; 4]>,
246 pub kleene_types: SmallVec<[u16; 4]>,
248 pub aggregate: GretaAggregate,
250 pub window_ms: u64,
252 pub slide_ms: u64,
254}
255
256impl GretaQuery {
257 #[inline]
259 pub fn has_kleene(&self) -> bool {
260 !self.kleene_types.is_empty()
261 }
262
263 #[inline]
265 pub fn is_start_type(&self, type_index: u16) -> bool {
266 self.event_types.first().copied() == Some(type_index)
267 }
268
269 #[inline]
271 pub fn is_end_type(&self, type_index: u16) -> bool {
272 self.event_types.last().copied() == Some(type_index)
273 }
274}
275
276#[derive(Debug)]
278pub struct GretaExecutor {
279 queries: Vec<GretaQuery>,
281 type_indices: FxHashMap<Arc<str>, u16>,
283 graph: EventGraph,
285}
286
287impl GretaExecutor {
288 pub fn new() -> Self {
290 Self {
291 queries: Vec::new(),
292 type_indices: FxHashMap::default(),
293 graph: EventGraph::new(0),
294 }
295 }
296
297 pub fn register_query(&mut self, query: GretaQuery) {
299 self.queries.push(query);
300 self.graph = EventGraph::new(self.queries.len());
302 }
303
304 pub fn register_type(&mut self, name: Arc<str>) -> u16 {
306 let len = self.type_indices.len() as u16;
307 *self.type_indices.entry(name).or_insert(len)
308 }
309
310 #[inline]
312 pub fn type_index(&self, name: &str) -> Option<u16> {
313 self.type_indices.get(name).copied()
314 }
315
316 pub fn process(&mut self, event: SharedEvent) -> Vec<(QueryId, u64)> {
318 let type_index = match self.type_index(&event.event_type) {
319 Some(idx) => idx,
320 None => return Vec::new(),
321 };
322
323 let node_id = self.graph.add_event(event, type_index);
325
326 for query in &self.queries {
328 if let Some(node) = self.graph.node_mut(node_id) {
329 if query.is_start_type(type_index) {
330 if let Some(is_start) = node.is_start.get_mut(query.id as usize) {
331 *is_start = true;
332 }
333 }
334 if query.is_end_type(type_index) {
335 if let Some(is_end) = node.is_end.get_mut(query.id as usize) {
336 *is_end = true;
337 }
338 }
339 }
340 }
341
342 self.add_predecessor_edges(node_id, type_index);
344
345 let mut results = Vec::new();
347 for query in &self.queries {
348 self.graph.propagate_counts(query.id);
349 let count = self.graph.final_count(query.id);
350 if count > 0 {
351 results.push((query.id, count));
352 }
353 }
354
355 results
356 }
357
358 fn add_predecessor_edges(&mut self, node_id: NodeId, type_index: u16) {
360 let mut all_pred_ids: SmallVec<[NodeId; 16]> = SmallVec::new();
362
363 for query in &self.queries {
365 let pos = query.event_types.iter().position(|&t| t == type_index);
367
368 if let Some(pos) = pos {
369 let mut pred_types: SmallVec<[u16; 4]> = SmallVec::new();
372
373 if pos > 0 {
374 pred_types.push(query.event_types[pos - 1]);
375 }
376
377 if query.kleene_types.contains(&type_index) {
379 pred_types.push(type_index);
380 }
381
382 for pred_type in pred_types {
384 for &pred_id in self.graph.nodes_of_type(pred_type) {
385 if pred_id < node_id && !all_pred_ids.contains(&pred_id) {
386 all_pred_ids.push(pred_id);
387 }
388 }
389 }
390 }
391 }
392
393 if let Some(node) = self.graph.node_mut(node_id) {
395 for pred_id in all_pred_ids {
396 node.add_predecessor(pred_id);
397 }
398 }
399 }
400
401 pub fn flush(&mut self) -> Vec<(QueryId, u64)> {
403 let results: Vec<_> = self
404 .queries
405 .iter()
406 .map(|q| (q.id, self.graph.final_count(q.id)))
407 .filter(|(_, count)| *count > 0)
408 .collect();
409
410 self.graph.clear();
411 results
412 }
413}
414
415impl Default for GretaExecutor {
416 fn default() -> Self {
417 Self::new()
418 }
419}
420
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::Event;

    /// A fresh graph is empty; the first event added receives id 0.
    #[test]
    fn test_event_graph_basic() {
        let mut graph = EventGraph::new(2);
        assert!(graph.is_empty());

        let first_id = graph.add_event(Arc::new(Event::new("A")), 0);

        assert_eq!(first_id, 0);
        assert_eq!(graph.len(), 1);
    }

    /// Start/end detection uses the first/last entry of `event_types`.
    #[test]
    fn test_greta_query() {
        let event_types = smallvec::smallvec![0, 1];
        let kleene_types = smallvec::smallvec![1];

        let query = GretaQuery {
            id: 0,
            pattern_id: 0,
            event_types,
            kleene_types,
            aggregate: GretaAggregate::CountTrends,
            window_ms: 60000,
            slide_ms: 60000,
        };

        assert!(query.has_kleene());
        assert!(query.is_start_type(0));
        assert!(query.is_end_type(1));
    }
}