allsource_core/infrastructure/persistence/
index.rs1use crate::error::Result;
2use dashmap::DashMap;
3use std::sync::Arc;
4use uuid::Uuid;
5
6#[derive(Debug, Clone)]
8pub struct IndexEntry {
9 pub event_id: Uuid,
10 pub offset: usize,
11 pub timestamp: chrono::DateTime<chrono::Utc>,
12}
13
14pub struct EventIndex {
16 entity_index: Arc<DashMap<String, Vec<IndexEntry>>>,
18
19 type_index: Arc<DashMap<String, Vec<IndexEntry>>>,
21
22 id_index: Arc<DashMap<Uuid, usize>>,
24
25 total_events: parking_lot::RwLock<usize>,
27}
28
29impl EventIndex {
30 pub fn new() -> Self {
31 Self {
32 entity_index: Arc::new(DashMap::new()),
33 type_index: Arc::new(DashMap::new()),
34 id_index: Arc::new(DashMap::new()),
35 total_events: parking_lot::RwLock::new(0),
36 }
37 }
38
39 pub fn index_event(
41 &self,
42 event_id: Uuid,
43 entity_id: &str,
44 event_type: &str,
45 timestamp: chrono::DateTime<chrono::Utc>,
46 offset: usize,
47 ) -> Result<()> {
48 let entry = IndexEntry {
49 event_id,
50 offset,
51 timestamp,
52 };
53
54 self.entity_index
56 .entry(entity_id.to_string())
57 .or_default()
58 .push(entry.clone());
59
60 self.type_index
62 .entry(event_type.to_string())
63 .or_default()
64 .push(entry.clone());
65
66 self.id_index.insert(event_id, offset);
68
69 let mut total = self.total_events.write();
71 *total += 1;
72
73 Ok(())
74 }
75
76 pub fn get_by_entity(&self, entity_id: &str) -> Option<Vec<IndexEntry>> {
78 self.entity_index
79 .get(entity_id)
80 .map(|entries| entries.clone())
81 }
82
83 pub fn get_by_type(&self, event_type: &str) -> Option<Vec<IndexEntry>> {
85 self.type_index
86 .get(event_type)
87 .map(|entries| entries.clone())
88 }
89
90 pub fn get_by_id(&self, event_id: &Uuid) -> Option<usize> {
92 self.id_index.get(event_id).map(|offset| *offset)
93 }
94
95 pub fn get_by_type_prefix(&self, prefix: &str) -> Vec<IndexEntry> {
97 let mut entries = Vec::new();
98 for item in self.type_index.iter() {
99 if item.key().starts_with(prefix) {
100 entries.extend(item.value().clone());
101 }
102 }
103 entries
104 }
105
106 pub fn get_all_entities(&self) -> Vec<String> {
108 self.entity_index.iter().map(|e| e.key().clone()).collect()
109 }
110
111 pub fn get_all_types(&self) -> Vec<String> {
113 self.type_index.iter().map(|e| e.key().clone()).collect()
114 }
115
116 pub fn stats(&self) -> IndexStats {
118 IndexStats {
119 total_events: *self.total_events.read(),
120 total_entities: self.entity_index.len(),
121 total_event_types: self.type_index.len(),
122 }
123 }
124
125 pub fn clear(&self) {
127 self.entity_index.clear();
128 self.type_index.clear();
129 self.id_index.clear();
130 let mut total = self.total_events.write();
131 *total = 0;
132 }
133}
134
135impl Default for EventIndex {
136 fn default() -> Self {
137 Self::new()
138 }
139}
140
141#[derive(Debug, Clone, serde::Serialize)]
142pub struct IndexStats {
143 pub total_events: usize,
144 pub total_entities: usize,
145 pub total_event_types: usize,
146}
147
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an index holding exactly one `user.created` event for
    /// `user-123` at offset 0, and returns it with the generated event id.
    fn single_event_index() -> (EventIndex, Uuid) {
        let index = EventIndex::new();
        let event_id = Uuid::new_v4();
        let timestamp = chrono::Utc::now();

        index
            .index_event(event_id, "user-123", "user.created", timestamp, 0)
            .unwrap();

        (index, event_id)
    }

    #[test]
    fn test_index_event() {
        let (index, _event_id) = single_event_index();

        let stats = index.stats();
        assert_eq!(stats.total_events, 1);
        assert_eq!(stats.total_entities, 1);
        assert_eq!(stats.total_event_types, 1);
    }

    #[test]
    fn test_get_by_entity() {
        let (index, event_id) = single_event_index();

        let entries = index.get_by_entity("user-123").unwrap();
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].event_id, event_id);
    }

    #[test]
    fn test_get_by_type() {
        let (index, event_id) = single_event_index();

        let entries = index.get_by_type("user.created").unwrap();
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].event_id, event_id);
    }

    #[test]
    fn test_get_by_type_prefix() {
        let index = EventIndex::new();
        let ts = chrono::Utc::now();

        // (entity id, event type); the position in the table is the offset.
        let events = [
            ("e-1", "index.created"),
            ("e-2", "index.updated"),
            ("e-3", "trade.created"),
        ];
        for (offset, &(entity_id, event_type)) in events.iter().enumerate() {
            index
                .index_event(Uuid::new_v4(), entity_id, event_type, ts, offset)
                .unwrap();
        }

        // (prefix, number of entries expected); the empty prefix matches all.
        let expectations = [("index.", 2), ("trade.", 1), ("nonexistent.", 0), ("", 3)];
        for &(prefix, expected) in expectations.iter() {
            let entries = index.get_by_type_prefix(prefix);
            assert_eq!(entries.len(), expected);
        }
    }
}