allsource_core/infrastructure/persistence/
index.rs1use crate::error::Result;
2use dashmap::DashMap;
3use std::sync::Arc;
4use uuid::Uuid;
5
6#[derive(Debug, Clone)]
8pub struct IndexEntry {
9 pub event_id: Uuid,
10 pub offset: usize,
11 pub timestamp: chrono::DateTime<chrono::Utc>,
12}
13
14pub struct EventIndex {
16 entity_index: Arc<DashMap<String, Vec<IndexEntry>>>,
18
19 type_index: Arc<DashMap<String, Vec<IndexEntry>>>,
21
22 id_index: Arc<DashMap<Uuid, usize>>,
24
25 total_events: parking_lot::RwLock<usize>,
27}
28
29impl EventIndex {
30 pub fn new() -> Self {
31 Self {
32 entity_index: Arc::new(DashMap::new()),
33 type_index: Arc::new(DashMap::new()),
34 id_index: Arc::new(DashMap::new()),
35 total_events: parking_lot::RwLock::new(0),
36 }
37 }
38
39 #[cfg_attr(feature = "hotpath", hotpath::measure)]
41 pub fn index_event(
42 &self,
43 event_id: Uuid,
44 entity_id: &str,
45 event_type: &str,
46 timestamp: chrono::DateTime<chrono::Utc>,
47 offset: usize,
48 ) -> Result<()> {
49 let entry = IndexEntry {
50 event_id,
51 offset,
52 timestamp,
53 };
54
55 self.entity_index
57 .entry(entity_id.to_string())
58 .or_default()
59 .push(entry.clone());
60
61 self.type_index
63 .entry(event_type.to_string())
64 .or_default()
65 .push(entry.clone());
66
67 self.id_index.insert(event_id, offset);
69
70 let mut total = self.total_events.write();
72 *total += 1;
73
74 Ok(())
75 }
76
77 #[cfg_attr(feature = "hotpath", hotpath::measure)]
79 pub fn get_by_entity(&self, entity_id: &str) -> Option<Vec<IndexEntry>> {
80 self.entity_index
81 .get(entity_id)
82 .map(|entries| entries.clone())
83 }
84
85 #[cfg_attr(feature = "hotpath", hotpath::measure)]
87 pub fn get_by_type(&self, event_type: &str) -> Option<Vec<IndexEntry>> {
88 self.type_index
89 .get(event_type)
90 .map(|entries| entries.clone())
91 }
92
93 #[cfg_attr(feature = "hotpath", hotpath::measure)]
95 pub fn get_by_id(&self, event_id: &Uuid) -> Option<usize> {
96 self.id_index.get(event_id).map(|offset| *offset)
97 }
98
99 #[cfg_attr(feature = "hotpath", hotpath::measure)]
101 pub fn get_by_type_prefix(&self, prefix: &str) -> Vec<IndexEntry> {
102 let mut entries = Vec::new();
103 for item in self.type_index.iter() {
104 if item.key().starts_with(prefix) {
105 entries.extend(item.value().clone());
106 }
107 }
108 entries
109 }
110
111 pub fn get_all_entities(&self) -> Vec<String> {
113 self.entity_index.iter().map(|e| e.key().clone()).collect()
114 }
115
116 pub fn get_all_types(&self) -> Vec<String> {
118 self.type_index.iter().map(|e| e.key().clone()).collect()
119 }
120
121 pub fn stats(&self) -> IndexStats {
123 IndexStats {
124 total_events: *self.total_events.read(),
125 total_entities: self.entity_index.len(),
126 total_event_types: self.type_index.len(),
127 }
128 }
129
130 pub fn clear(&self) {
132 self.entity_index.clear();
133 self.type_index.clear();
134 self.id_index.clear();
135 let mut total = self.total_events.write();
136 *total = 0;
137 }
138}
139
140impl Default for EventIndex {
141 fn default() -> Self {
142 Self::new()
143 }
144}
145
146#[derive(Debug, Clone, serde::Serialize)]
147pub struct IndexStats {
148 pub total_events: usize,
149 pub total_entities: usize,
150 pub total_event_types: usize,
151}
152
153#[cfg(test)]
154mod tests {
155 use super::*;
156
157 #[test]
158 fn test_index_event() {
159 let index = EventIndex::new();
160 let event_id = Uuid::new_v4();
161 let timestamp = chrono::Utc::now();
162
163 index
164 .index_event(event_id, "user-123", "user.created", timestamp, 0)
165 .unwrap();
166
167 assert_eq!(index.stats().total_events, 1);
168 assert_eq!(index.stats().total_entities, 1);
169 assert_eq!(index.stats().total_event_types, 1);
170 }
171
172 #[test]
173 fn test_get_by_entity() {
174 let index = EventIndex::new();
175 let event_id = Uuid::new_v4();
176 let timestamp = chrono::Utc::now();
177
178 index
179 .index_event(event_id, "user-123", "user.created", timestamp, 0)
180 .unwrap();
181
182 let entries = index.get_by_entity("user-123").unwrap();
183 assert_eq!(entries.len(), 1);
184 assert_eq!(entries[0].event_id, event_id);
185 }
186
187 #[test]
188 fn test_get_by_type() {
189 let index = EventIndex::new();
190 let event_id = Uuid::new_v4();
191 let timestamp = chrono::Utc::now();
192
193 index
194 .index_event(event_id, "user-123", "user.created", timestamp, 0)
195 .unwrap();
196
197 let entries = index.get_by_type("user.created").unwrap();
198 assert_eq!(entries.len(), 1);
199 assert_eq!(entries[0].event_id, event_id);
200 }
201
202 #[test]
203 fn test_get_by_type_prefix() {
204 let index = EventIndex::new();
205 let ts = chrono::Utc::now();
206
207 index
208 .index_event(Uuid::new_v4(), "e-1", "index.created", ts, 0)
209 .unwrap();
210 index
211 .index_event(Uuid::new_v4(), "e-2", "index.updated", ts, 1)
212 .unwrap();
213 index
214 .index_event(Uuid::new_v4(), "e-3", "trade.created", ts, 2)
215 .unwrap();
216
217 let entries = index.get_by_type_prefix("index.");
218 assert_eq!(entries.len(), 2);
219
220 let entries = index.get_by_type_prefix("trade.");
221 assert_eq!(entries.len(), 1);
222
223 let entries = index.get_by_type_prefix("nonexistent.");
224 assert_eq!(entries.len(), 0);
225
226 let entries = index.get_by_type_prefix("");
228 assert_eq!(entries.len(), 3);
229 }
230}