Skip to main content

allsource_core/infrastructure/persistence/
index.rs

1use crate::error::Result;
2use dashmap::DashMap;
3use std::sync::Arc;
4use uuid::Uuid;
5
6/// Event index entry
7#[derive(Debug, Clone)]
8pub struct IndexEntry {
9    pub event_id: Uuid,
10    pub offset: usize,
11    pub timestamp: chrono::DateTime<chrono::Utc>,
12}
13
14/// High-performance concurrent index for fast event lookups
15pub struct EventIndex {
16    /// Index by entity_id -> list of event entries
17    entity_index: Arc<DashMap<String, Vec<IndexEntry>>>,
18
19    /// Index by event_type -> list of event entries
20    type_index: Arc<DashMap<String, Vec<IndexEntry>>>,
21
22    /// Index by event_id -> offset (for direct lookups)
23    id_index: Arc<DashMap<Uuid, usize>>,
24
25    /// Total indexed events
26    total_events: parking_lot::RwLock<usize>,
27}
28
29impl EventIndex {
30    pub fn new() -> Self {
31        Self {
32            entity_index: Arc::new(DashMap::new()),
33            type_index: Arc::new(DashMap::new()),
34            id_index: Arc::new(DashMap::new()),
35            total_events: parking_lot::RwLock::new(0),
36        }
37    }
38
39    /// Add an event to all relevant indices
40    pub fn index_event(
41        &self,
42        event_id: Uuid,
43        entity_id: &str,
44        event_type: &str,
45        timestamp: chrono::DateTime<chrono::Utc>,
46        offset: usize,
47    ) -> Result<()> {
48        let entry = IndexEntry {
49            event_id,
50            offset,
51            timestamp,
52        };
53
54        // Index by entity_id
55        self.entity_index
56            .entry(entity_id.to_string())
57            .or_default()
58            .push(entry.clone());
59
60        // Index by event_type
61        self.type_index
62            .entry(event_type.to_string())
63            .or_default()
64            .push(entry.clone());
65
66        // Index by event_id
67        self.id_index.insert(event_id, offset);
68
69        // Increment total
70        let mut total = self.total_events.write();
71        *total += 1;
72
73        Ok(())
74    }
75
76    /// Get all event offsets for an entity
77    pub fn get_by_entity(&self, entity_id: &str) -> Option<Vec<IndexEntry>> {
78        self.entity_index
79            .get(entity_id)
80            .map(|entries| entries.clone())
81    }
82
83    /// Get all event offsets for an event type
84    pub fn get_by_type(&self, event_type: &str) -> Option<Vec<IndexEntry>> {
85        self.type_index
86            .get(event_type)
87            .map(|entries| entries.clone())
88    }
89
90    /// Get event offset by ID
91    pub fn get_by_id(&self, event_id: &Uuid) -> Option<usize> {
92        self.id_index.get(event_id).map(|offset| *offset)
93    }
94
95    /// Get all event entries matching a type prefix (e.g., "index." matches "index.created")
96    pub fn get_by_type_prefix(&self, prefix: &str) -> Vec<IndexEntry> {
97        let mut entries = Vec::new();
98        for item in self.type_index.iter() {
99            if item.key().starts_with(prefix) {
100                entries.extend(item.value().clone());
101            }
102        }
103        entries
104    }
105
106    /// Get all entities being tracked
107    pub fn get_all_entities(&self) -> Vec<String> {
108        self.entity_index.iter().map(|e| e.key().clone()).collect()
109    }
110
111    /// Get all event types
112    pub fn get_all_types(&self) -> Vec<String> {
113        self.type_index.iter().map(|e| e.key().clone()).collect()
114    }
115
116    /// Get statistics
117    pub fn stats(&self) -> IndexStats {
118        IndexStats {
119            total_events: *self.total_events.read(),
120            total_entities: self.entity_index.len(),
121            total_event_types: self.type_index.len(),
122        }
123    }
124
125    /// Clear all indices (useful for testing)
126    pub fn clear(&self) {
127        self.entity_index.clear();
128        self.type_index.clear();
129        self.id_index.clear();
130        let mut total = self.total_events.write();
131        *total = 0;
132    }
133}
134
135impl Default for EventIndex {
136    fn default() -> Self {
137        Self::new()
138    }
139}
140
141#[derive(Debug, Clone, serde::Serialize)]
142pub struct IndexStats {
143    pub total_events: usize,
144    pub total_entities: usize,
145    pub total_event_types: usize,
146}
147
148#[cfg(test)]
149mod tests {
150    use super::*;
151
152    #[test]
153    fn test_index_event() {
154        let index = EventIndex::new();
155        let event_id = Uuid::new_v4();
156        let timestamp = chrono::Utc::now();
157
158        index
159            .index_event(event_id, "user-123", "user.created", timestamp, 0)
160            .unwrap();
161
162        assert_eq!(index.stats().total_events, 1);
163        assert_eq!(index.stats().total_entities, 1);
164        assert_eq!(index.stats().total_event_types, 1);
165    }
166
167    #[test]
168    fn test_get_by_entity() {
169        let index = EventIndex::new();
170        let event_id = Uuid::new_v4();
171        let timestamp = chrono::Utc::now();
172
173        index
174            .index_event(event_id, "user-123", "user.created", timestamp, 0)
175            .unwrap();
176
177        let entries = index.get_by_entity("user-123").unwrap();
178        assert_eq!(entries.len(), 1);
179        assert_eq!(entries[0].event_id, event_id);
180    }
181
182    #[test]
183    fn test_get_by_type() {
184        let index = EventIndex::new();
185        let event_id = Uuid::new_v4();
186        let timestamp = chrono::Utc::now();
187
188        index
189            .index_event(event_id, "user-123", "user.created", timestamp, 0)
190            .unwrap();
191
192        let entries = index.get_by_type("user.created").unwrap();
193        assert_eq!(entries.len(), 1);
194        assert_eq!(entries[0].event_id, event_id);
195    }
196
197    #[test]
198    fn test_get_by_type_prefix() {
199        let index = EventIndex::new();
200        let ts = chrono::Utc::now();
201
202        index
203            .index_event(Uuid::new_v4(), "e-1", "index.created", ts, 0)
204            .unwrap();
205        index
206            .index_event(Uuid::new_v4(), "e-2", "index.updated", ts, 1)
207            .unwrap();
208        index
209            .index_event(Uuid::new_v4(), "e-3", "trade.created", ts, 2)
210            .unwrap();
211
212        let entries = index.get_by_type_prefix("index.");
213        assert_eq!(entries.len(), 2);
214
215        let entries = index.get_by_type_prefix("trade.");
216        assert_eq!(entries.len(), 1);
217
218        let entries = index.get_by_type_prefix("nonexistent.");
219        assert_eq!(entries.len(), 0);
220
221        // Empty prefix matches all
222        let entries = index.get_by_type_prefix("");
223        assert_eq!(entries.len(), 3);
224    }
225}