// allsource_core/infrastructure/persistence/index.rs

1use crate::error::Result;
2use dashmap::DashMap;
3use std::sync::Arc;
4use uuid::Uuid;
5
6/// Event index entry
7#[derive(Debug, Clone)]
8pub struct IndexEntry {
9    pub event_id: Uuid,
10    pub offset: usize,
11    pub timestamp: chrono::DateTime<chrono::Utc>,
12}
13
14/// High-performance concurrent index for fast event lookups
15pub struct EventIndex {
16    /// Index by entity_id -> list of event entries
17    entity_index: Arc<DashMap<String, Vec<IndexEntry>>>,
18
19    /// Index by event_type -> list of event entries
20    type_index: Arc<DashMap<String, Vec<IndexEntry>>>,
21
22    /// Index by event_id -> offset (for direct lookups)
23    id_index: Arc<DashMap<Uuid, usize>>,
24
25    /// Total indexed events
26    total_events: parking_lot::RwLock<usize>,
27}
28
29impl EventIndex {
30    pub fn new() -> Self {
31        Self {
32            entity_index: Arc::new(DashMap::new()),
33            type_index: Arc::new(DashMap::new()),
34            id_index: Arc::new(DashMap::new()),
35            total_events: parking_lot::RwLock::new(0),
36        }
37    }
38
39    /// Add an event to all relevant indices
40    #[cfg_attr(feature = "hotpath", hotpath::measure)]
41    pub fn index_event(
42        &self,
43        event_id: Uuid,
44        entity_id: &str,
45        event_type: &str,
46        timestamp: chrono::DateTime<chrono::Utc>,
47        offset: usize,
48    ) -> Result<()> {
49        let entry = IndexEntry {
50            event_id,
51            offset,
52            timestamp,
53        };
54
55        // Index by entity_id
56        self.entity_index
57            .entry(entity_id.to_string())
58            .or_default()
59            .push(entry.clone());
60
61        // Index by event_type
62        self.type_index
63            .entry(event_type.to_string())
64            .or_default()
65            .push(entry.clone());
66
67        // Index by event_id
68        self.id_index.insert(event_id, offset);
69
70        // Increment total
71        let mut total = self.total_events.write();
72        *total += 1;
73
74        Ok(())
75    }
76
77    /// Get all event offsets for an entity
78    #[cfg_attr(feature = "hotpath", hotpath::measure)]
79    pub fn get_by_entity(&self, entity_id: &str) -> Option<Vec<IndexEntry>> {
80        self.entity_index
81            .get(entity_id)
82            .map(|entries| entries.clone())
83    }
84
85    /// Get all event offsets for an event type
86    #[cfg_attr(feature = "hotpath", hotpath::measure)]
87    pub fn get_by_type(&self, event_type: &str) -> Option<Vec<IndexEntry>> {
88        self.type_index
89            .get(event_type)
90            .map(|entries| entries.clone())
91    }
92
93    /// Get event offset by ID
94    #[cfg_attr(feature = "hotpath", hotpath::measure)]
95    pub fn get_by_id(&self, event_id: &Uuid) -> Option<usize> {
96        self.id_index.get(event_id).map(|offset| *offset)
97    }
98
99    /// Get all event entries matching a type prefix (e.g., "index." matches "index.created")
100    #[cfg_attr(feature = "hotpath", hotpath::measure)]
101    pub fn get_by_type_prefix(&self, prefix: &str) -> Vec<IndexEntry> {
102        let mut entries = Vec::new();
103        for item in self.type_index.iter() {
104            if item.key().starts_with(prefix) {
105                entries.extend(item.value().clone());
106            }
107        }
108        entries
109    }
110
111    /// Get all entities being tracked
112    pub fn get_all_entities(&self) -> Vec<String> {
113        self.entity_index.iter().map(|e| e.key().clone()).collect()
114    }
115
116    /// Get all event types
117    pub fn get_all_types(&self) -> Vec<String> {
118        self.type_index.iter().map(|e| e.key().clone()).collect()
119    }
120
121    /// Get statistics
122    pub fn stats(&self) -> IndexStats {
123        IndexStats {
124            total_events: *self.total_events.read(),
125            total_entities: self.entity_index.len(),
126            total_event_types: self.type_index.len(),
127        }
128    }
129
130    /// Clear all indices (useful for testing)
131    pub fn clear(&self) {
132        self.entity_index.clear();
133        self.type_index.clear();
134        self.id_index.clear();
135        let mut total = self.total_events.write();
136        *total = 0;
137    }
138}
139
140impl Default for EventIndex {
141    fn default() -> Self {
142        Self::new()
143    }
144}
145
146#[derive(Debug, Clone, serde::Serialize)]
147pub struct IndexStats {
148    pub total_events: usize,
149    pub total_entities: usize,
150    pub total_event_types: usize,
151}
152
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an index holding one `user.created` event for `user-123` at offset 0
    /// and returns it together with the generated event id.
    fn single_event_index() -> (EventIndex, Uuid) {
        let idx = EventIndex::new();
        let id = Uuid::new_v4();
        let now = chrono::Utc::now();

        idx.index_event(id, "user-123", "user.created", now, 0)
            .unwrap();

        (idx, id)
    }

    #[test]
    fn test_index_event() {
        let (idx, _id) = single_event_index();

        assert_eq!(idx.stats().total_events, 1);
        assert_eq!(idx.stats().total_entities, 1);
        assert_eq!(idx.stats().total_event_types, 1);
    }

    #[test]
    fn test_get_by_entity() {
        let (idx, id) = single_event_index();

        let found = idx.get_by_entity("user-123").unwrap();
        assert_eq!(found.len(), 1);
        assert_eq!(found[0].event_id, id);
    }

    #[test]
    fn test_get_by_type() {
        let (idx, id) = single_event_index();

        let found = idx.get_by_type("user.created").unwrap();
        assert_eq!(found.len(), 1);
        assert_eq!(found[0].event_id, id);
    }

    #[test]
    fn test_get_by_type_prefix() {
        let idx = EventIndex::new();
        let now = chrono::Utc::now();

        // (entity id, event type); the position in the table doubles as the offset.
        let fixtures = [
            ("e-1", "index.created"),
            ("e-2", "index.updated"),
            ("e-3", "trade.created"),
        ];
        for (offset, &(entity, kind)) in fixtures.iter().enumerate() {
            idx.index_event(Uuid::new_v4(), entity, kind, now, offset)
                .unwrap();
        }

        assert_eq!(idx.get_by_type_prefix("index.").len(), 2);
        assert_eq!(idx.get_by_type_prefix("trade.").len(), 1);
        assert_eq!(idx.get_by_type_prefix("nonexistent.").len(), 0);

        // Empty prefix matches all
        assert_eq!(idx.get_by_type_prefix("").len(), 3);
    }
}