Skip to main content

peat_protocol/event/
query.rs

1//! Event Query Protocol (ADR-027 Phase 3)
2//!
3//! Events with `PropagationMode::Query` are stored locally and accessible via query.
4//! Higher echelons can query subordinates for events that were not automatically propagated.
5//!
6//! ## Query Flow
7//!
8//! ```text
9//! Company → EventQuery → Platoon → Forward to Squad(s) → Collect Results
10//!    ↑                      ↓
11//!    └──────── EventQueryResponse ←──────────────────────┘
12//! ```
13//!
14//! ## Query Scopes
15//!
16//! - `Node { node_id }`: Query specific node's local store
17//! - `Formation { formation_id }`: Query all nodes in a formation
18//! - `Subordinates`: Query all direct subordinates of requester
19
20use peat_schema::event::v1::{
21    EventClass, EventFilters, EventQuery, EventQueryResponse, PeatEvent, QueryScope,
22};
23use std::collections::HashMap;
24use std::sync::{Arc, RwLock};
25use std::time::SystemTime;
26
27/// Trait for event storage backends
28pub trait EventStore: Send + Sync {
29    /// Query events matching the given filters
30    fn query(
31        &self,
32        event_class: Option<EventClass>,
33        event_type: Option<&str>,
34        after_seconds: Option<u64>,
35        before_seconds: Option<u64>,
36        source_instance_id: Option<&str>,
37        limit: u32,
38    ) -> Vec<PeatEvent>;
39
40    /// Store an event
41    fn store(&self, event: PeatEvent);
42
43    /// Get count of stored events
44    fn count(&self) -> usize;
45
46    /// Remove expired events (TTL enforcement)
47    fn remove_expired(&self);
48}
49
50/// In-memory event store implementation
51#[derive(Debug, Default)]
52pub struct InMemoryEventStore {
53    events: RwLock<HashMap<String, PeatEvent>>,
54}
55
56impl InMemoryEventStore {
57    /// Create a new in-memory event store
58    pub fn new() -> Self {
59        Self {
60            events: RwLock::new(HashMap::new()),
61        }
62    }
63}
64
65impl EventStore for InMemoryEventStore {
66    fn query(
67        &self,
68        event_class: Option<EventClass>,
69        event_type: Option<&str>,
70        after_seconds: Option<u64>,
71        before_seconds: Option<u64>,
72        source_instance_id: Option<&str>,
73        limit: u32,
74    ) -> Vec<PeatEvent> {
75        let events = self.events.read().unwrap();
76
77        let mut results: Vec<_> = events
78            .values()
79            .filter(|e| {
80                // Filter by event class
81                if let Some(class) = event_class {
82                    if e.event_class != class as i32 {
83                        return false;
84                    }
85                }
86
87                // Filter by event type
88                if let Some(et) = event_type {
89                    if !e.event_type.starts_with(et) {
90                        return false;
91                    }
92                }
93
94                // Filter by time range
95                if let Some(ts) = &e.timestamp {
96                    if let Some(after) = after_seconds {
97                        if ts.seconds < after {
98                            return false;
99                        }
100                    }
101                    if let Some(before) = before_seconds {
102                        if ts.seconds > before {
103                            return false;
104                        }
105                    }
106                }
107
108                // Filter by source instance
109                if let Some(sid) = source_instance_id {
110                    if e.source_instance_id.as_deref() != Some(sid) {
111                        return false;
112                    }
113                }
114
115                true
116            })
117            .cloned()
118            .collect();
119
120        // Sort by timestamp (newest first)
121        results.sort_by(|a, b| {
122            let ts_a = a.timestamp.as_ref().map(|t| t.seconds).unwrap_or(0);
123            let ts_b = b.timestamp.as_ref().map(|t| t.seconds).unwrap_or(0);
124            ts_b.cmp(&ts_a)
125        });
126
127        // Apply limit
128        if limit > 0 && results.len() > limit as usize {
129            results.truncate(limit as usize);
130        }
131
132        results
133    }
134
135    fn store(&self, event: PeatEvent) {
136        let mut events = self.events.write().unwrap();
137        events.insert(event.event_id.clone(), event);
138    }
139
140    fn count(&self) -> usize {
141        let events = self.events.read().unwrap();
142        events.len()
143    }
144
145    fn remove_expired(&self) {
146        let now = SystemTime::now()
147            .duration_since(SystemTime::UNIX_EPOCH)
148            .unwrap()
149            .as_secs();
150
151        let mut events = self.events.write().unwrap();
152        events.retain(|_, event| {
153            if let Some(routing) = &event.routing {
154                if routing.ttl_seconds > 0 {
155                    if let Some(ts) = &event.timestamp {
156                        let expiry = ts.seconds + routing.ttl_seconds as u64;
157                        return now < expiry;
158                    }
159                }
160            }
161            true // Keep events without TTL or timestamp
162        });
163    }
164}
165
166/// Handler for event queries
167///
168/// Processes incoming queries, applying filters to the local event store
169/// and optionally forwarding queries to subordinates.
170pub struct EventQueryHandler {
171    /// Node ID for this handler
172    node_id: String,
173
174    /// Formation ID this node belongs to
175    formation_id: String,
176
177    /// Local event store
178    event_store: Arc<dyn EventStore>,
179
180    /// Subordinate node IDs (for Subordinates scope queries)
181    subordinate_ids: RwLock<Vec<String>>,
182}
183
184impl EventQueryHandler {
185    /// Create a new query handler
186    pub fn new(node_id: String, formation_id: String, event_store: Arc<dyn EventStore>) -> Self {
187        Self {
188            node_id,
189            formation_id,
190            event_store,
191            subordinate_ids: RwLock::new(Vec::new()),
192        }
193    }
194
195    /// Create a new query handler with a default in-memory store
196    pub fn with_memory_store(node_id: String, formation_id: String) -> Self {
197        Self::new(node_id, formation_id, Arc::new(InMemoryEventStore::new()))
198    }
199
200    /// Register a subordinate node
201    pub fn add_subordinate(&self, node_id: &str) {
202        let mut subs = self.subordinate_ids.write().unwrap();
203        if !subs.contains(&node_id.to_string()) {
204            subs.push(node_id.to_string());
205        }
206    }
207
208    /// Remove a subordinate node
209    pub fn remove_subordinate(&self, node_id: &str) {
210        let mut subs = self.subordinate_ids.write().unwrap();
211        subs.retain(|id| id != node_id);
212    }
213
214    /// Get the node ID
215    pub fn node_id(&self) -> &str {
216        &self.node_id
217    }
218
219    /// Get the formation ID
220    pub fn formation_id(&self) -> &str {
221        &self.formation_id
222    }
223
224    /// Store an event in the local store
225    pub fn store_event(&self, event: PeatEvent) {
226        self.event_store.store(event);
227    }
228
229    /// Get count of stored events
230    pub fn event_count(&self) -> usize {
231        self.event_store.count()
232    }
233
234    /// Handle an incoming query
235    ///
236    /// For local queries (node_id matches), returns results from local store.
237    /// For subordinate queries, returns information about which nodes should be queried.
238    pub fn handle_query(&self, query: &EventQuery) -> QueryResult {
239        let scope = query.scope.as_ref();
240
241        // Determine query target
242        if let Some(scope) = scope {
243            if let Some(target) = &scope.target {
244                match target {
245                    peat_schema::event::v1::query_scope::Target::NodeId(node_id) => {
246                        if node_id == &self.node_id {
247                            // Query local store
248                            return QueryResult::Local(self.query_local(query));
249                        } else {
250                            // Forward to specific node
251                            return QueryResult::Forward(vec![node_id.clone()]);
252                        }
253                    }
254                    peat_schema::event::v1::query_scope::Target::FormationId(formation_id) => {
255                        if formation_id == &self.formation_id {
256                            // Query local store (we're in this formation)
257                            // Plus forward to subordinates
258                            let local_result = self.query_local(query);
259                            let subs = self.subordinate_ids.read().unwrap();
260                            if subs.is_empty() {
261                                return QueryResult::Local(local_result);
262                            } else {
263                                return QueryResult::LocalPlusForward(local_result, subs.clone());
264                            }
265                        } else {
266                            // Not our formation, forward to subordinates
267                            let subs = self.subordinate_ids.read().unwrap();
268                            return QueryResult::Forward(subs.clone());
269                        }
270                    }
271                    peat_schema::event::v1::query_scope::Target::Subordinates(_) => {
272                        // Query all subordinates
273                        let subs = self.subordinate_ids.read().unwrap();
274                        if subs.is_empty() {
275                            // No subordinates, return empty
276                            return QueryResult::Local(self.empty_response(query));
277                        } else {
278                            return QueryResult::Forward(subs.clone());
279                        }
280                    }
281                }
282            }
283        }
284
285        // Default: query local
286        QueryResult::Local(self.query_local(query))
287    }
288
289    /// Query the local event store
290    pub fn query_local(&self, query: &EventQuery) -> EventQueryResponse {
291        let filters = query.filters.as_ref();
292
293        let event_class = filters.and_then(|f| {
294            f.event_class
295                .map(|ec| EventClass::try_from(ec).unwrap_or(EventClass::Unspecified))
296        });
297        let event_type = filters.and_then(|f| f.event_type.as_deref());
298        let after_seconds = filters.and_then(|f| f.after_seconds);
299        let before_seconds = filters.and_then(|f| f.before_seconds);
300        let source_instance_id = filters.and_then(|f| f.source_instance_id.as_deref());
301        let limit = query.limit;
302
303        let events = self.event_store.query(
304            event_class,
305            event_type,
306            after_seconds,
307            before_seconds,
308            source_instance_id,
309            limit,
310        );
311
312        let total_matching = events.len() as u32;
313        let truncated = limit > 0 && total_matching >= limit;
314
315        EventQueryResponse {
316            query_id: query.query_id.clone(),
317            responder_id: self.node_id.clone(),
318            events,
319            total_matching,
320            truncated,
321        }
322    }
323
324    /// Create an empty response for queries with no results
325    fn empty_response(&self, query: &EventQuery) -> EventQueryResponse {
326        EventQueryResponse {
327            query_id: query.query_id.clone(),
328            responder_id: self.node_id.clone(),
329            events: vec![],
330            total_matching: 0,
331            truncated: false,
332        }
333    }
334
335    /// Merge multiple query responses into one
336    pub fn merge_responses(
337        query_id: &str,
338        responder_id: &str,
339        responses: Vec<EventQueryResponse>,
340        limit: u32,
341    ) -> EventQueryResponse {
342        let mut all_events: Vec<PeatEvent> = responses
343            .into_iter()
344            .flat_map(|r| r.events.into_iter())
345            .collect();
346
347        // Sort by timestamp (newest first)
348        all_events.sort_by(|a, b| {
349            let ts_a = a.timestamp.as_ref().map(|t| t.seconds).unwrap_or(0);
350            let ts_b = b.timestamp.as_ref().map(|t| t.seconds).unwrap_or(0);
351            ts_b.cmp(&ts_a)
352        });
353
354        let total_matching = all_events.len() as u32;
355        let truncated = limit > 0 && total_matching > limit;
356
357        if limit > 0 && all_events.len() > limit as usize {
358            all_events.truncate(limit as usize);
359        }
360
361        EventQueryResponse {
362            query_id: query_id.to_string(),
363            responder_id: responder_id.to_string(),
364            events: all_events,
365            total_matching,
366            truncated,
367        }
368    }
369
370    /// Create a query for a specific node
371    pub fn create_node_query(
372        requester_id: &str,
373        node_id: &str,
374        filters: Option<EventFilters>,
375        limit: u32,
376    ) -> EventQuery {
377        EventQuery {
378            query_id: generate_query_id(),
379            requester_id: requester_id.to_string(),
380            scope: Some(QueryScope {
381                target: Some(peat_schema::event::v1::query_scope::Target::NodeId(
382                    node_id.to_string(),
383                )),
384            }),
385            filters,
386            limit,
387        }
388    }
389
390    /// Create a query for a formation
391    pub fn create_formation_query(
392        requester_id: &str,
393        formation_id: &str,
394        filters: Option<EventFilters>,
395        limit: u32,
396    ) -> EventQuery {
397        EventQuery {
398            query_id: generate_query_id(),
399            requester_id: requester_id.to_string(),
400            scope: Some(QueryScope {
401                target: Some(peat_schema::event::v1::query_scope::Target::FormationId(
402                    formation_id.to_string(),
403                )),
404            }),
405            filters,
406            limit,
407        }
408    }
409
410    /// Create a query for all subordinates
411    pub fn create_subordinates_query(
412        requester_id: &str,
413        filters: Option<EventFilters>,
414        limit: u32,
415    ) -> EventQuery {
416        EventQuery {
417            query_id: generate_query_id(),
418            requester_id: requester_id.to_string(),
419            scope: Some(QueryScope {
420                target: Some(peat_schema::event::v1::query_scope::Target::Subordinates(
421                    true,
422                )),
423            }),
424            filters,
425            limit,
426        }
427    }
428
429    /// Remove expired events from the store
430    pub fn enforce_ttl(&self) {
431        self.event_store.remove_expired();
432    }
433}
434
435/// Result of handling a query
436#[derive(Debug)]
437pub enum QueryResult {
438    /// Query was handled locally, response ready
439    Local(EventQueryResponse),
440
441    /// Query should be forwarded to these node IDs
442    Forward(Vec<String>),
443
444    /// Query handled locally AND should be forwarded
445    LocalPlusForward(EventQueryResponse, Vec<String>),
446}
447
448/// Generate a unique query ID
449fn generate_query_id() -> String {
450    format!(
451        "qry-{}-{}",
452        std::process::id(),
453        SystemTime::now()
454            .duration_since(SystemTime::UNIX_EPOCH)
455            .unwrap()
456            .as_nanos()
457    )
458}
459
460/// Create event filters
461pub fn create_filters(
462    event_class: Option<EventClass>,
463    event_type: Option<&str>,
464    after_seconds: Option<u64>,
465    before_seconds: Option<u64>,
466    source_instance_id: Option<&str>,
467) -> EventFilters {
468    EventFilters {
469        event_class: event_class.map(|ec| ec as i32),
470        event_type: event_type.map(|s| s.to_string()),
471        after_seconds,
472        before_seconds,
473        source_instance_id: source_instance_id.map(|s| s.to_string()),
474    }
475}
476
477#[cfg(test)]
478mod tests {
479    use super::*;
480    use peat_schema::common::v1::Timestamp;
481    use peat_schema::event::v1::{AggregationPolicy, EventPriority, PropagationMode};
482
483    fn make_event(id: &str, event_type: &str, timestamp_seconds: u64) -> PeatEvent {
484        PeatEvent {
485            event_id: id.to_string(),
486            timestamp: Some(Timestamp {
487                seconds: timestamp_seconds,
488                nanos: 0,
489            }),
490            source_node_id: "node-1".to_string(),
491            source_formation_id: "squad-1".to_string(),
492            source_instance_id: Some("instance-1".to_string()),
493            event_class: EventClass::Product as i32,
494            event_type: event_type.to_string(),
495            routing: Some(AggregationPolicy {
496                propagation: PropagationMode::PropagationQuery as i32,
497                priority: EventPriority::PriorityNormal as i32,
498                ttl_seconds: 300,
499                aggregation_window_ms: 0,
500            }),
501            payload_type_url: String::new(),
502            payload_value: vec![],
503        }
504    }
505
506    #[test]
507    fn test_in_memory_store_basic() {
508        let store = InMemoryEventStore::new();
509
510        let event = make_event("evt-1", "detection", 1000);
511        store.store(event);
512
513        assert_eq!(store.count(), 1);
514
515        let results = store.query(None, None, None, None, None, 0);
516        assert_eq!(results.len(), 1);
517        assert_eq!(results[0].event_id, "evt-1");
518    }
519
520    #[test]
521    fn test_in_memory_store_filter_by_type() {
522        let store = InMemoryEventStore::new();
523
524        store.store(make_event("evt-1", "detection.vehicle", 1000));
525        store.store(make_event("evt-2", "telemetry.cpu", 1001));
526        store.store(make_event("evt-3", "detection.person", 1002));
527
528        let results = store.query(None, Some("detection"), None, None, None, 0);
529        assert_eq!(results.len(), 2);
530    }
531
532    #[test]
533    fn test_in_memory_store_filter_by_time() {
534        let store = InMemoryEventStore::new();
535
536        store.store(make_event("evt-1", "detection", 1000));
537        store.store(make_event("evt-2", "detection", 2000));
538        store.store(make_event("evt-3", "detection", 3000));
539
540        let results = store.query(None, None, Some(1500), Some(2500), None, 0);
541        assert_eq!(results.len(), 1);
542        assert_eq!(results[0].event_id, "evt-2");
543    }
544
545    #[test]
546    fn test_in_memory_store_limit() {
547        let store = InMemoryEventStore::new();
548
549        for i in 0..10 {
550            store.store(make_event(&format!("evt-{}", i), "detection", 1000 + i));
551        }
552
553        let results = store.query(None, None, None, None, None, 5);
554        assert_eq!(results.len(), 5);
555    }
556
557    #[test]
558    fn test_query_handler_local_query() {
559        let handler =
560            EventQueryHandler::with_memory_store("node-1".to_string(), "squad-1".to_string());
561
562        handler.store_event(make_event("evt-1", "detection", 1000));
563        handler.store_event(make_event("evt-2", "detection", 1001));
564
565        let query = EventQueryHandler::create_node_query("requester-1", "node-1", None, 0);
566
567        match handler.handle_query(&query) {
568            QueryResult::Local(response) => {
569                assert_eq!(response.events.len(), 2);
570                assert_eq!(response.responder_id, "node-1");
571            }
572            _ => panic!("Expected Local result"),
573        }
574    }
575
576    #[test]
577    fn test_query_handler_forward_to_node() {
578        let handler =
579            EventQueryHandler::with_memory_store("platoon-1".to_string(), "platoon-1".to_string());
580
581        handler.add_subordinate("squad-1");
582        handler.add_subordinate("squad-2");
583
584        let query = EventQueryHandler::create_node_query("requester-1", "squad-1", None, 0);
585
586        match handler.handle_query(&query) {
587            QueryResult::Forward(nodes) => {
588                assert_eq!(nodes.len(), 1);
589                assert_eq!(nodes[0], "squad-1");
590            }
591            _ => panic!("Expected Forward result"),
592        }
593    }
594
595    #[test]
596    fn test_query_handler_subordinates_query() {
597        let handler =
598            EventQueryHandler::with_memory_store("platoon-1".to_string(), "platoon-1".to_string());
599
600        handler.add_subordinate("squad-1");
601        handler.add_subordinate("squad-2");
602        handler.add_subordinate("squad-3");
603
604        let query = EventQueryHandler::create_subordinates_query("requester-1", None, 0);
605
606        match handler.handle_query(&query) {
607            QueryResult::Forward(nodes) => {
608                assert_eq!(nodes.len(), 3);
609                assert!(nodes.contains(&"squad-1".to_string()));
610                assert!(nodes.contains(&"squad-2".to_string()));
611                assert!(nodes.contains(&"squad-3".to_string()));
612            }
613            _ => panic!("Expected Forward result"),
614        }
615    }
616
617    #[test]
618    fn test_query_handler_formation_query_local() {
619        let handler =
620            EventQueryHandler::with_memory_store("node-1".to_string(), "squad-1".to_string());
621
622        handler.store_event(make_event("evt-1", "detection", 1000));
623
624        // Query for our own formation
625        let query = EventQueryHandler::create_formation_query("requester-1", "squad-1", None, 0);
626
627        match handler.handle_query(&query) {
628            QueryResult::Local(response) => {
629                assert_eq!(response.events.len(), 1);
630            }
631            _ => panic!("Expected Local result"),
632        }
633    }
634
635    #[test]
636    fn test_merge_responses() {
637        let resp1 = EventQueryResponse {
638            query_id: "qry-1".to_string(),
639            responder_id: "node-1".to_string(),
640            events: vec![make_event("evt-1", "detection", 1000)],
641            total_matching: 1,
642            truncated: false,
643        };
644
645        let resp2 = EventQueryResponse {
646            query_id: "qry-1".to_string(),
647            responder_id: "node-2".to_string(),
648            events: vec![
649                make_event("evt-2", "detection", 2000),
650                make_event("evt-3", "detection", 1500),
651            ],
652            total_matching: 2,
653            truncated: false,
654        };
655
656        let merged =
657            EventQueryHandler::merge_responses("qry-1", "platoon-1", vec![resp1, resp2], 0);
658
659        assert_eq!(merged.events.len(), 3);
660        assert_eq!(merged.total_matching, 3);
661        assert!(!merged.truncated);
662
663        // Should be sorted by timestamp descending
664        assert_eq!(merged.events[0].event_id, "evt-2"); // 2000
665        assert_eq!(merged.events[1].event_id, "evt-3"); // 1500
666        assert_eq!(merged.events[2].event_id, "evt-1"); // 1000
667    }
668
669    #[test]
670    fn test_merge_responses_with_limit() {
671        let resp1 = EventQueryResponse {
672            query_id: "qry-1".to_string(),
673            responder_id: "node-1".to_string(),
674            events: vec![
675                make_event("evt-1", "detection", 1000),
676                make_event("evt-2", "detection", 2000),
677            ],
678            total_matching: 2,
679            truncated: false,
680        };
681
682        let resp2 = EventQueryResponse {
683            query_id: "qry-1".to_string(),
684            responder_id: "node-2".to_string(),
685            events: vec![
686                make_event("evt-3", "detection", 3000),
687                make_event("evt-4", "detection", 4000),
688            ],
689            total_matching: 2,
690            truncated: false,
691        };
692
693        let merged =
694            EventQueryHandler::merge_responses("qry-1", "platoon-1", vec![resp1, resp2], 2);
695
696        assert_eq!(merged.events.len(), 2);
697        assert_eq!(merged.total_matching, 4);
698        assert!(merged.truncated);
699
700        // Should have the two most recent events
701        assert_eq!(merged.events[0].event_id, "evt-4"); // 4000
702        assert_eq!(merged.events[1].event_id, "evt-3"); // 3000
703    }
704
705    #[test]
706    fn test_create_filters() {
707        let filters = create_filters(
708            Some(EventClass::Product),
709            Some("detection"),
710            Some(1000),
711            Some(2000),
712            Some("instance-1"),
713        );
714
715        assert_eq!(filters.event_class, Some(EventClass::Product as i32));
716        assert_eq!(filters.event_type, Some("detection".to_string()));
717        assert_eq!(filters.after_seconds, Some(1000));
718        assert_eq!(filters.before_seconds, Some(2000));
719        assert_eq!(filters.source_instance_id, Some("instance-1".to_string()));
720    }
721
722    #[test]
723    fn test_ttl_enforcement() {
724        let store = InMemoryEventStore::new();
725
726        // Event with short TTL (already expired)
727        let mut event = make_event("evt-1", "detection", 1); // Very old timestamp
728        event.routing.as_mut().unwrap().ttl_seconds = 10;
729        store.store(event);
730
731        // Event without TTL (should be kept)
732        let mut event2 = make_event("evt-2", "detection", 1);
733        event2.routing.as_mut().unwrap().ttl_seconds = 0;
734        store.store(event2);
735
736        // Recent event with TTL (should be kept)
737        let now = SystemTime::now()
738            .duration_since(SystemTime::UNIX_EPOCH)
739            .unwrap()
740            .as_secs();
741        let mut event3 = make_event("evt-3", "detection", now);
742        event3.routing.as_mut().unwrap().ttl_seconds = 3600;
743        store.store(event3);
744
745        assert_eq!(store.count(), 3);
746        store.remove_expired();
747        assert_eq!(store.count(), 2); // evt-1 should be removed
748    }
749}