1use peat_schema::event::v1::{
21 EventClass, EventFilters, EventQuery, EventQueryResponse, PeatEvent, QueryScope,
22};
23use std::collections::HashMap;
24use std::sync::{Arc, RwLock};
25use std::time::SystemTime;
26
27pub trait EventStore: Send + Sync {
29 fn query(
31 &self,
32 event_class: Option<EventClass>,
33 event_type: Option<&str>,
34 after_seconds: Option<u64>,
35 before_seconds: Option<u64>,
36 source_instance_id: Option<&str>,
37 limit: u32,
38 ) -> Vec<PeatEvent>;
39
40 fn store(&self, event: PeatEvent);
42
43 fn count(&self) -> usize;
45
46 fn remove_expired(&self);
48}
49
50#[derive(Debug, Default)]
52pub struct InMemoryEventStore {
53 events: RwLock<HashMap<String, PeatEvent>>,
54}
55
56impl InMemoryEventStore {
57 pub fn new() -> Self {
59 Self {
60 events: RwLock::new(HashMap::new()),
61 }
62 }
63}
64
65impl EventStore for InMemoryEventStore {
66 fn query(
67 &self,
68 event_class: Option<EventClass>,
69 event_type: Option<&str>,
70 after_seconds: Option<u64>,
71 before_seconds: Option<u64>,
72 source_instance_id: Option<&str>,
73 limit: u32,
74 ) -> Vec<PeatEvent> {
75 let events = self.events.read().unwrap();
76
77 let mut results: Vec<_> = events
78 .values()
79 .filter(|e| {
80 if let Some(class) = event_class {
82 if e.event_class != class as i32 {
83 return false;
84 }
85 }
86
87 if let Some(et) = event_type {
89 if !e.event_type.starts_with(et) {
90 return false;
91 }
92 }
93
94 if let Some(ts) = &e.timestamp {
96 if let Some(after) = after_seconds {
97 if ts.seconds < after {
98 return false;
99 }
100 }
101 if let Some(before) = before_seconds {
102 if ts.seconds > before {
103 return false;
104 }
105 }
106 }
107
108 if let Some(sid) = source_instance_id {
110 if e.source_instance_id.as_deref() != Some(sid) {
111 return false;
112 }
113 }
114
115 true
116 })
117 .cloned()
118 .collect();
119
120 results.sort_by(|a, b| {
122 let ts_a = a.timestamp.as_ref().map(|t| t.seconds).unwrap_or(0);
123 let ts_b = b.timestamp.as_ref().map(|t| t.seconds).unwrap_or(0);
124 ts_b.cmp(&ts_a)
125 });
126
127 if limit > 0 && results.len() > limit as usize {
129 results.truncate(limit as usize);
130 }
131
132 results
133 }
134
135 fn store(&self, event: PeatEvent) {
136 let mut events = self.events.write().unwrap();
137 events.insert(event.event_id.clone(), event);
138 }
139
140 fn count(&self) -> usize {
141 let events = self.events.read().unwrap();
142 events.len()
143 }
144
145 fn remove_expired(&self) {
146 let now = SystemTime::now()
147 .duration_since(SystemTime::UNIX_EPOCH)
148 .unwrap()
149 .as_secs();
150
151 let mut events = self.events.write().unwrap();
152 events.retain(|_, event| {
153 if let Some(routing) = &event.routing {
154 if routing.ttl_seconds > 0 {
155 if let Some(ts) = &event.timestamp {
156 let expiry = ts.seconds + routing.ttl_seconds as u64;
157 return now < expiry;
158 }
159 }
160 }
161 true });
163 }
164}
165
166pub struct EventQueryHandler {
171 node_id: String,
173
174 formation_id: String,
176
177 event_store: Arc<dyn EventStore>,
179
180 subordinate_ids: RwLock<Vec<String>>,
182}
183
184impl EventQueryHandler {
185 pub fn new(node_id: String, formation_id: String, event_store: Arc<dyn EventStore>) -> Self {
187 Self {
188 node_id,
189 formation_id,
190 event_store,
191 subordinate_ids: RwLock::new(Vec::new()),
192 }
193 }
194
195 pub fn with_memory_store(node_id: String, formation_id: String) -> Self {
197 Self::new(node_id, formation_id, Arc::new(InMemoryEventStore::new()))
198 }
199
200 pub fn add_subordinate(&self, node_id: &str) {
202 let mut subs = self.subordinate_ids.write().unwrap();
203 if !subs.contains(&node_id.to_string()) {
204 subs.push(node_id.to_string());
205 }
206 }
207
208 pub fn remove_subordinate(&self, node_id: &str) {
210 let mut subs = self.subordinate_ids.write().unwrap();
211 subs.retain(|id| id != node_id);
212 }
213
214 pub fn node_id(&self) -> &str {
216 &self.node_id
217 }
218
219 pub fn formation_id(&self) -> &str {
221 &self.formation_id
222 }
223
224 pub fn store_event(&self, event: PeatEvent) {
226 self.event_store.store(event);
227 }
228
229 pub fn event_count(&self) -> usize {
231 self.event_store.count()
232 }
233
234 pub fn handle_query(&self, query: &EventQuery) -> QueryResult {
239 let scope = query.scope.as_ref();
240
241 if let Some(scope) = scope {
243 if let Some(target) = &scope.target {
244 match target {
245 peat_schema::event::v1::query_scope::Target::NodeId(node_id) => {
246 if node_id == &self.node_id {
247 return QueryResult::Local(self.query_local(query));
249 } else {
250 return QueryResult::Forward(vec![node_id.clone()]);
252 }
253 }
254 peat_schema::event::v1::query_scope::Target::FormationId(formation_id) => {
255 if formation_id == &self.formation_id {
256 let local_result = self.query_local(query);
259 let subs = self.subordinate_ids.read().unwrap();
260 if subs.is_empty() {
261 return QueryResult::Local(local_result);
262 } else {
263 return QueryResult::LocalPlusForward(local_result, subs.clone());
264 }
265 } else {
266 let subs = self.subordinate_ids.read().unwrap();
268 return QueryResult::Forward(subs.clone());
269 }
270 }
271 peat_schema::event::v1::query_scope::Target::Subordinates(_) => {
272 let subs = self.subordinate_ids.read().unwrap();
274 if subs.is_empty() {
275 return QueryResult::Local(self.empty_response(query));
277 } else {
278 return QueryResult::Forward(subs.clone());
279 }
280 }
281 }
282 }
283 }
284
285 QueryResult::Local(self.query_local(query))
287 }
288
289 pub fn query_local(&self, query: &EventQuery) -> EventQueryResponse {
291 let filters = query.filters.as_ref();
292
293 let event_class = filters.and_then(|f| {
294 f.event_class
295 .map(|ec| EventClass::try_from(ec).unwrap_or(EventClass::Unspecified))
296 });
297 let event_type = filters.and_then(|f| f.event_type.as_deref());
298 let after_seconds = filters.and_then(|f| f.after_seconds);
299 let before_seconds = filters.and_then(|f| f.before_seconds);
300 let source_instance_id = filters.and_then(|f| f.source_instance_id.as_deref());
301 let limit = query.limit;
302
303 let events = self.event_store.query(
304 event_class,
305 event_type,
306 after_seconds,
307 before_seconds,
308 source_instance_id,
309 limit,
310 );
311
312 let total_matching = events.len() as u32;
313 let truncated = limit > 0 && total_matching >= limit;
314
315 EventQueryResponse {
316 query_id: query.query_id.clone(),
317 responder_id: self.node_id.clone(),
318 events,
319 total_matching,
320 truncated,
321 }
322 }
323
324 fn empty_response(&self, query: &EventQuery) -> EventQueryResponse {
326 EventQueryResponse {
327 query_id: query.query_id.clone(),
328 responder_id: self.node_id.clone(),
329 events: vec![],
330 total_matching: 0,
331 truncated: false,
332 }
333 }
334
335 pub fn merge_responses(
337 query_id: &str,
338 responder_id: &str,
339 responses: Vec<EventQueryResponse>,
340 limit: u32,
341 ) -> EventQueryResponse {
342 let mut all_events: Vec<PeatEvent> = responses
343 .into_iter()
344 .flat_map(|r| r.events.into_iter())
345 .collect();
346
347 all_events.sort_by(|a, b| {
349 let ts_a = a.timestamp.as_ref().map(|t| t.seconds).unwrap_or(0);
350 let ts_b = b.timestamp.as_ref().map(|t| t.seconds).unwrap_or(0);
351 ts_b.cmp(&ts_a)
352 });
353
354 let total_matching = all_events.len() as u32;
355 let truncated = limit > 0 && total_matching > limit;
356
357 if limit > 0 && all_events.len() > limit as usize {
358 all_events.truncate(limit as usize);
359 }
360
361 EventQueryResponse {
362 query_id: query_id.to_string(),
363 responder_id: responder_id.to_string(),
364 events: all_events,
365 total_matching,
366 truncated,
367 }
368 }
369
370 pub fn create_node_query(
372 requester_id: &str,
373 node_id: &str,
374 filters: Option<EventFilters>,
375 limit: u32,
376 ) -> EventQuery {
377 EventQuery {
378 query_id: generate_query_id(),
379 requester_id: requester_id.to_string(),
380 scope: Some(QueryScope {
381 target: Some(peat_schema::event::v1::query_scope::Target::NodeId(
382 node_id.to_string(),
383 )),
384 }),
385 filters,
386 limit,
387 }
388 }
389
390 pub fn create_formation_query(
392 requester_id: &str,
393 formation_id: &str,
394 filters: Option<EventFilters>,
395 limit: u32,
396 ) -> EventQuery {
397 EventQuery {
398 query_id: generate_query_id(),
399 requester_id: requester_id.to_string(),
400 scope: Some(QueryScope {
401 target: Some(peat_schema::event::v1::query_scope::Target::FormationId(
402 formation_id.to_string(),
403 )),
404 }),
405 filters,
406 limit,
407 }
408 }
409
410 pub fn create_subordinates_query(
412 requester_id: &str,
413 filters: Option<EventFilters>,
414 limit: u32,
415 ) -> EventQuery {
416 EventQuery {
417 query_id: generate_query_id(),
418 requester_id: requester_id.to_string(),
419 scope: Some(QueryScope {
420 target: Some(peat_schema::event::v1::query_scope::Target::Subordinates(
421 true,
422 )),
423 }),
424 filters,
425 limit,
426 }
427 }
428
429 pub fn enforce_ttl(&self) {
431 self.event_store.remove_expired();
432 }
433}
434
435#[derive(Debug)]
437pub enum QueryResult {
438 Local(EventQueryResponse),
440
441 Forward(Vec<String>),
443
444 LocalPlusForward(EventQueryResponse, Vec<String>),
446}
447
/// Produces a query identifier of the form `qry-{pid}-{unix_nanos}-{sequence}`.
///
/// The process id and wall-clock nanoseconds alone are not unique: two calls
/// that land on the same clock reading would collide. The trailing
/// process-wide sequence number guarantees distinct ids within one process
/// regardless of clock resolution.
///
/// If the system clock reads earlier than the Unix epoch, the timestamp part
/// falls back to `0` instead of panicking.
fn generate_query_id() -> String {
    use std::sync::atomic::{AtomicU64, Ordering};

    static NEXT_SEQUENCE: AtomicU64 = AtomicU64::new(0);

    let unix_nanos = SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .map_or(0, |elapsed| elapsed.as_nanos());
    // Only uniqueness matters here, so relaxed ordering is sufficient.
    let sequence = NEXT_SEQUENCE.fetch_add(1, Ordering::Relaxed);

    format!("qry-{}-{}-{}", std::process::id(), unix_nanos, sequence)
}
459
460pub fn create_filters(
462 event_class: Option<EventClass>,
463 event_type: Option<&str>,
464 after_seconds: Option<u64>,
465 before_seconds: Option<u64>,
466 source_instance_id: Option<&str>,
467) -> EventFilters {
468 EventFilters {
469 event_class: event_class.map(|ec| ec as i32),
470 event_type: event_type.map(|s| s.to_string()),
471 after_seconds,
472 before_seconds,
473 source_instance_id: source_instance_id.map(|s| s.to_string()),
474 }
475}
476
#[cfg(test)]
mod tests {
    use super::*;
    use peat_schema::common::v1::Timestamp;
    use peat_schema::event::v1::{AggregationPolicy, EventPriority, PropagationMode};

    /// Builds a `Product`-class event from `node-1` / `squad-1` / `instance-1`
    /// with a 300 second TTL and an empty payload.
    fn make_event(id: &str, event_type: &str, timestamp_seconds: u64) -> PeatEvent {
        let routing = AggregationPolicy {
            propagation: PropagationMode::PropagationQuery as i32,
            priority: EventPriority::PriorityNormal as i32,
            ttl_seconds: 300,
            aggregation_window_ms: 0,
        };
        let timestamp = Timestamp {
            seconds: timestamp_seconds,
            nanos: 0,
        };

        PeatEvent {
            event_id: id.to_string(),
            timestamp: Some(timestamp),
            source_node_id: "node-1".to_string(),
            source_formation_id: "squad-1".to_string(),
            source_instance_id: Some("instance-1".to_string()),
            event_class: EventClass::Product as i32,
            event_type: event_type.to_string(),
            routing: Some(routing),
            payload_type_url: String::new(),
            payload_value: vec![],
        }
    }

    /// A stored event is counted and comes back from an unfiltered query.
    #[test]
    fn test_in_memory_store_basic() {
        let store = InMemoryEventStore::new();
        store.store(make_event("evt-1", "detection", 1000));

        assert_eq!(store.count(), 1);

        let found = store.query(None, None, None, None, None, 0);
        assert_eq!(found.len(), 1);
        assert_eq!(found[0].event_id, "evt-1");
    }

    /// The event-type filter matches by prefix.
    #[test]
    fn test_in_memory_store_filter_by_type() {
        let store = InMemoryEventStore::new();
        store.store(make_event("evt-1", "detection.vehicle", 1000));
        store.store(make_event("evt-2", "telemetry.cpu", 1001));
        store.store(make_event("evt-3", "detection.person", 1002));

        let detections = store.query(None, Some("detection"), None, None, None, 0);
        assert_eq!(detections.len(), 2);
    }

    /// Only events inside the `[after, before]` window are returned.
    #[test]
    fn test_in_memory_store_filter_by_time() {
        let store = InMemoryEventStore::new();
        store.store(make_event("evt-1", "detection", 1000));
        store.store(make_event("evt-2", "detection", 2000));
        store.store(make_event("evt-3", "detection", 3000));

        let in_window = store.query(None, None, Some(1500), Some(2500), None, 0);
        assert_eq!(in_window.len(), 1);
        assert_eq!(in_window[0].event_id, "evt-2");
    }

    /// A non-zero limit caps the number of returned events.
    #[test]
    fn test_in_memory_store_limit() {
        let store = InMemoryEventStore::new();
        for offset in 0..10 {
            store.store(make_event(
                &format!("evt-{}", offset),
                "detection",
                1000 + offset,
            ));
        }

        let capped = store.query(None, None, None, None, None, 5);
        assert_eq!(capped.len(), 5);
    }

    /// A node-scoped query addressed to this node is answered locally.
    #[test]
    fn test_query_handler_local_query() {
        let handler =
            EventQueryHandler::with_memory_store("node-1".to_string(), "squad-1".to_string());
        handler.store_event(make_event("evt-1", "detection", 1000));
        handler.store_event(make_event("evt-2", "detection", 1001));

        let query = EventQueryHandler::create_node_query("requester-1", "node-1", None, 0);

        match handler.handle_query(&query) {
            QueryResult::Local(response) => {
                assert_eq!(response.events.len(), 2);
                assert_eq!(response.responder_id, "node-1");
            }
            _ => panic!("Expected Local result"),
        }
    }

    /// A node-scoped query for another node is forwarded to exactly that node.
    #[test]
    fn test_query_handler_forward_to_node() {
        let handler =
            EventQueryHandler::with_memory_store("platoon-1".to_string(), "platoon-1".to_string());
        handler.add_subordinate("squad-1");
        handler.add_subordinate("squad-2");

        let query = EventQueryHandler::create_node_query("requester-1", "squad-1", None, 0);

        match handler.handle_query(&query) {
            QueryResult::Forward(nodes) => {
                assert_eq!(nodes.len(), 1);
                assert_eq!(nodes[0], "squad-1");
            }
            _ => panic!("Expected Forward result"),
        }
    }

    /// A subordinates-scoped query is forwarded to every registered subordinate.
    #[test]
    fn test_query_handler_subordinates_query() {
        let handler =
            EventQueryHandler::with_memory_store("platoon-1".to_string(), "platoon-1".to_string());
        handler.add_subordinate("squad-1");
        handler.add_subordinate("squad-2");
        handler.add_subordinate("squad-3");

        let query = EventQueryHandler::create_subordinates_query("requester-1", None, 0);

        match handler.handle_query(&query) {
            QueryResult::Forward(nodes) => {
                assert_eq!(nodes.len(), 3);
                assert!(nodes.contains(&"squad-1".to_string()));
                assert!(nodes.contains(&"squad-2".to_string()));
                assert!(nodes.contains(&"squad-3".to_string()));
            }
            _ => panic!("Expected Forward result"),
        }
    }

    /// A formation-scoped query for this formation, with no subordinates
    /// registered, is answered locally.
    #[test]
    fn test_query_handler_formation_query_local() {
        let handler =
            EventQueryHandler::with_memory_store("node-1".to_string(), "squad-1".to_string());
        handler.store_event(make_event("evt-1", "detection", 1000));

        let query = EventQueryHandler::create_formation_query("requester-1", "squad-1", None, 0);

        match handler.handle_query(&query) {
            QueryResult::Local(response) => {
                assert_eq!(response.events.len(), 1);
            }
            _ => panic!("Expected Local result"),
        }
    }

    /// Merging without a limit keeps every event, newest first.
    #[test]
    fn test_merge_responses() {
        let resp1 = EventQueryResponse {
            query_id: "qry-1".to_string(),
            responder_id: "node-1".to_string(),
            events: vec![make_event("evt-1", "detection", 1000)],
            total_matching: 1,
            truncated: false,
        };
        let resp2 = EventQueryResponse {
            query_id: "qry-1".to_string(),
            responder_id: "node-2".to_string(),
            events: vec![
                make_event("evt-2", "detection", 2000),
                make_event("evt-3", "detection", 1500),
            ],
            total_matching: 2,
            truncated: false,
        };

        let merged =
            EventQueryHandler::merge_responses("qry-1", "platoon-1", vec![resp1, resp2], 0);

        assert_eq!(merged.events.len(), 3);
        assert_eq!(merged.total_matching, 3);
        assert!(!merged.truncated);

        // Newest first: 2000, 1500, 1000.
        let order: Vec<&str> = merged.events.iter().map(|e| e.event_id.as_str()).collect();
        assert_eq!(order, ["evt-2", "evt-3", "evt-1"]);
    }

    /// Merging with a limit keeps only the newest events and flags truncation.
    #[test]
    fn test_merge_responses_with_limit() {
        let resp1 = EventQueryResponse {
            query_id: "qry-1".to_string(),
            responder_id: "node-1".to_string(),
            events: vec![
                make_event("evt-1", "detection", 1000),
                make_event("evt-2", "detection", 2000),
            ],
            total_matching: 2,
            truncated: false,
        };
        let resp2 = EventQueryResponse {
            query_id: "qry-1".to_string(),
            responder_id: "node-2".to_string(),
            events: vec![
                make_event("evt-3", "detection", 3000),
                make_event("evt-4", "detection", 4000),
            ],
            total_matching: 2,
            truncated: false,
        };

        let merged =
            EventQueryHandler::merge_responses("qry-1", "platoon-1", vec![resp1, resp2], 2);

        assert_eq!(merged.events.len(), 2);
        assert_eq!(merged.total_matching, 4);
        assert!(merged.truncated);

        // The two newest events survive: 4000 then 3000.
        let order: Vec<&str> = merged.events.iter().map(|e| e.event_id.as_str()).collect();
        assert_eq!(order, ["evt-4", "evt-3"]);
    }

    /// `create_filters` copies each supplied criterion into the message.
    #[test]
    fn test_create_filters() {
        let filters = create_filters(
            Some(EventClass::Product),
            Some("detection"),
            Some(1000),
            Some(2000),
            Some("instance-1"),
        );

        assert_eq!(filters.event_class, Some(EventClass::Product as i32));
        assert_eq!(filters.event_type, Some("detection".to_string()));
        assert_eq!(filters.after_seconds, Some(1000));
        assert_eq!(filters.before_seconds, Some(2000));
        assert_eq!(filters.source_instance_id, Some("instance-1".to_string()));
    }

    /// Only events with a positive, elapsed TTL are removed.
    #[test]
    fn test_ttl_enforcement() {
        let store = InMemoryEventStore::new();

        // Stamped one second after the epoch with a 10 second TTL: long expired.
        let mut expired = make_event("evt-1", "detection", 1);
        expired.routing.as_mut().unwrap().ttl_seconds = 10;
        store.store(expired);

        // Equally old, but a TTL of zero means it is never expired.
        let mut no_ttl = make_event("evt-2", "detection", 1);
        no_ttl.routing.as_mut().unwrap().ttl_seconds = 0;
        store.store(no_ttl);

        // Stamped now with an hour to live: still valid.
        let now = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap()
            .as_secs();
        let mut fresh = make_event("evt-3", "detection", now);
        fresh.routing.as_mut().unwrap().ttl_seconds = 3600;
        store.store(fresh);

        assert_eq!(store.count(), 3);
        store.remove_expired();
        // Only the expired event is gone.
        assert_eq!(store.count(), 2);
    }
}