allsource_core/
index.rs

1use crate::error::Result;
2use dashmap::DashMap;
3use std::sync::Arc;
4use uuid::Uuid;
5
6/// Event index entry
7#[derive(Debug, Clone)]
8pub struct IndexEntry {
9    pub event_id: Uuid,
10    pub offset: usize,
11    pub timestamp: chrono::DateTime<chrono::Utc>,
12}
13
14/// High-performance concurrent index for fast event lookups
15pub struct EventIndex {
16    /// Index by entity_id -> list of event entries
17    entity_index: Arc<DashMap<String, Vec<IndexEntry>>>,
18
19    /// Index by event_type -> list of event entries
20    type_index: Arc<DashMap<String, Vec<IndexEntry>>>,
21
22    /// Index by event_id -> offset (for direct lookups)
23    id_index: Arc<DashMap<Uuid, usize>>,
24
25    /// Total indexed events
26    total_events: parking_lot::RwLock<usize>,
27}
28
29impl EventIndex {
30    pub fn new() -> Self {
31        Self {
32            entity_index: Arc::new(DashMap::new()),
33            type_index: Arc::new(DashMap::new()),
34            id_index: Arc::new(DashMap::new()),
35            total_events: parking_lot::RwLock::new(0),
36        }
37    }
38
39    /// Add an event to all relevant indices
40    pub fn index_event(
41        &self,
42        event_id: Uuid,
43        entity_id: &str,
44        event_type: &str,
45        timestamp: chrono::DateTime<chrono::Utc>,
46        offset: usize,
47    ) -> Result<()> {
48        let entry = IndexEntry {
49            event_id,
50            offset,
51            timestamp,
52        };
53
54        // Index by entity_id
55        self.entity_index
56            .entry(entity_id.to_string())
57            .or_insert_with(Vec::new)
58            .push(entry.clone());
59
60        // Index by event_type
61        self.type_index
62            .entry(event_type.to_string())
63            .or_insert_with(Vec::new)
64            .push(entry.clone());
65
66        // Index by event_id
67        self.id_index.insert(event_id, offset);
68
69        // Increment total
70        let mut total = self.total_events.write();
71        *total += 1;
72
73        Ok(())
74    }
75
76    /// Get all event offsets for an entity
77    pub fn get_by_entity(&self, entity_id: &str) -> Option<Vec<IndexEntry>> {
78        self.entity_index
79            .get(entity_id)
80            .map(|entries| entries.clone())
81    }
82
83    /// Get all event offsets for an event type
84    pub fn get_by_type(&self, event_type: &str) -> Option<Vec<IndexEntry>> {
85        self.type_index
86            .get(event_type)
87            .map(|entries| entries.clone())
88    }
89
90    /// Get event offset by ID
91    pub fn get_by_id(&self, event_id: &Uuid) -> Option<usize> {
92        self.id_index.get(event_id).map(|offset| *offset)
93    }
94
95    /// Get all entities being tracked
96    pub fn get_all_entities(&self) -> Vec<String> {
97        self.entity_index.iter().map(|e| e.key().clone()).collect()
98    }
99
100    /// Get all event types
101    pub fn get_all_types(&self) -> Vec<String> {
102        self.type_index.iter().map(|e| e.key().clone()).collect()
103    }
104
105    /// Get statistics
106    pub fn stats(&self) -> IndexStats {
107        IndexStats {
108            total_events: *self.total_events.read(),
109            total_entities: self.entity_index.len(),
110            total_event_types: self.type_index.len(),
111        }
112    }
113
114    /// Clear all indices (useful for testing)
115    pub fn clear(&self) {
116        self.entity_index.clear();
117        self.type_index.clear();
118        self.id_index.clear();
119        let mut total = self.total_events.write();
120        *total = 0;
121    }
122}
123
124impl Default for EventIndex {
125    fn default() -> Self {
126        Self::new()
127    }
128}
129
130#[derive(Debug, Clone, serde::Serialize)]
131pub struct IndexStats {
132    pub total_events: usize,
133    pub total_entities: usize,
134    pub total_event_types: usize,
135}
136
#[cfg(test)]
mod tests {
    use super::*;

    const ENTITY: &str = "user-123";
    const EVENT_TYPE: &str = "user.created";

    /// Builds an index holding exactly one event for `ENTITY` / `EVENT_TYPE`
    /// at the given offset and returns it together with the event's id.
    fn index_with_event(offset: usize) -> (EventIndex, Uuid) {
        let index = EventIndex::new();
        let event_id = Uuid::new_v4();

        index
            .index_event(event_id, ENTITY, EVENT_TYPE, chrono::Utc::now(), offset)
            .unwrap();

        (index, event_id)
    }

    #[test]
    fn test_index_event() {
        let (index, _) = index_with_event(0);

        let stats = index.stats();
        assert_eq!(stats.total_events, 1);
        assert_eq!(stats.total_entities, 1);
        assert_eq!(stats.total_event_types, 1);
    }

    #[test]
    fn test_get_by_entity() {
        let (index, event_id) = index_with_event(0);

        let entries = index.get_by_entity(ENTITY).unwrap();
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].event_id, event_id);
    }

    #[test]
    fn test_get_by_type() {
        let (index, event_id) = index_with_event(0);

        let entries = index.get_by_type(EVENT_TYPE).unwrap();
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].event_id, event_id);
    }

    #[test]
    fn test_get_by_id() {
        let (index, event_id) = index_with_event(42);

        assert_eq!(index.get_by_id(&event_id), Some(42));
        assert_eq!(index.get_by_id(&Uuid::new_v4()), None);
    }

    #[test]
    fn test_unknown_keys_return_none() {
        let (index, _) = index_with_event(0);

        assert!(index.get_by_entity("no-such-entity").is_none());
        assert!(index.get_by_type("no.such.type").is_none());
    }

    #[test]
    fn test_multiple_events_accumulate() {
        let (index, first_id) = index_with_event(0);
        let second_id = Uuid::new_v4();

        index
            .index_event(second_id, ENTITY, "user.updated", chrono::Utc::now(), 1)
            .unwrap();

        // Same entity, two different event types.
        let by_entity = index.get_by_entity(ENTITY).unwrap();
        assert_eq!(by_entity.len(), 2);
        assert_eq!(by_entity[0].event_id, first_id);
        assert_eq!(by_entity[1].event_id, second_id);

        let stats = index.stats();
        assert_eq!(stats.total_events, 2);
        assert_eq!(stats.total_entities, 1);
        assert_eq!(stats.total_event_types, 2);

        let mut types = index.get_all_types();
        types.sort();
        assert_eq!(types, vec!["user.created", "user.updated"]);
        assert_eq!(index.get_all_entities(), vec![ENTITY.to_string()]);
    }

    #[test]
    fn test_clear_resets_everything() {
        let (index, event_id) = index_with_event(0);

        index.clear();

        let stats = index.stats();
        assert_eq!(stats.total_events, 0);
        assert_eq!(stats.total_entities, 0);
        assert_eq!(stats.total_event_types, 0);
        assert!(index.get_by_entity(ENTITY).is_none());
        assert!(index.get_by_type(EVENT_TYPE).is_none());
        assert_eq!(index.get_by_id(&event_id), None);
    }
}