1use crate::error::Result;
2use dashmap::DashMap;
3use std::sync::Arc;
4use uuid::Uuid;
5
6#[derive(Debug, Clone)]
8pub struct IndexEntry {
9 pub event_id: Uuid,
10 pub offset: usize,
11 pub timestamp: chrono::DateTime<chrono::Utc>,
12}
13
14pub struct EventIndex {
16 entity_index: Arc<DashMap<String, Vec<IndexEntry>>>,
18
19 type_index: Arc<DashMap<String, Vec<IndexEntry>>>,
21
22 id_index: Arc<DashMap<Uuid, usize>>,
24
25 total_events: parking_lot::RwLock<usize>,
27}
28
29impl EventIndex {
30 pub fn new() -> Self {
31 Self {
32 entity_index: Arc::new(DashMap::new()),
33 type_index: Arc::new(DashMap::new()),
34 id_index: Arc::new(DashMap::new()),
35 total_events: parking_lot::RwLock::new(0),
36 }
37 }
38
39 pub fn index_event(
41 &self,
42 event_id: Uuid,
43 entity_id: &str,
44 event_type: &str,
45 timestamp: chrono::DateTime<chrono::Utc>,
46 offset: usize,
47 ) -> Result<()> {
48 let entry = IndexEntry {
49 event_id,
50 offset,
51 timestamp,
52 };
53
54 self.entity_index
56 .entry(entity_id.to_string())
57 .or_insert_with(Vec::new)
58 .push(entry.clone());
59
60 self.type_index
62 .entry(event_type.to_string())
63 .or_insert_with(Vec::new)
64 .push(entry.clone());
65
66 self.id_index.insert(event_id, offset);
68
69 let mut total = self.total_events.write();
71 *total += 1;
72
73 Ok(())
74 }
75
76 pub fn get_by_entity(&self, entity_id: &str) -> Option<Vec<IndexEntry>> {
78 self.entity_index
79 .get(entity_id)
80 .map(|entries| entries.clone())
81 }
82
83 pub fn get_by_type(&self, event_type: &str) -> Option<Vec<IndexEntry>> {
85 self.type_index
86 .get(event_type)
87 .map(|entries| entries.clone())
88 }
89
90 pub fn get_by_id(&self, event_id: &Uuid) -> Option<usize> {
92 self.id_index.get(event_id).map(|offset| *offset)
93 }
94
95 pub fn get_all_entities(&self) -> Vec<String> {
97 self.entity_index.iter().map(|e| e.key().clone()).collect()
98 }
99
100 pub fn get_all_types(&self) -> Vec<String> {
102 self.type_index.iter().map(|e| e.key().clone()).collect()
103 }
104
105 pub fn stats(&self) -> IndexStats {
107 IndexStats {
108 total_events: *self.total_events.read(),
109 total_entities: self.entity_index.len(),
110 total_event_types: self.type_index.len(),
111 }
112 }
113
114 pub fn clear(&self) {
116 self.entity_index.clear();
117 self.type_index.clear();
118 self.id_index.clear();
119 let mut total = self.total_events.write();
120 *total = 0;
121 }
122}
123
124impl Default for EventIndex {
125 fn default() -> Self {
126 Self::new()
127 }
128}
129
130#[derive(Debug, Clone, serde::Serialize)]
131pub struct IndexStats {
132 pub total_events: usize,
133 pub total_entities: usize,
134 pub total_event_types: usize,
135}
136
#[cfg(test)]
mod tests {
    use super::*;

    /// Entity id used by the shared fixture.
    const ENTITY: &str = "user-123";
    /// Event type used by the shared fixture.
    const EVENT_TYPE: &str = "user.created";

    /// Builds an index holding exactly one event at `offset` and returns it
    /// together with that event's id.
    fn index_with_one_event(offset: usize) -> (EventIndex, Uuid) {
        let index = EventIndex::new();
        let event_id = Uuid::new_v4();

        index
            .index_event(event_id, ENTITY, EVENT_TYPE, chrono::Utc::now(), offset)
            .unwrap();

        (index, event_id)
    }

    #[test]
    fn test_index_event() {
        let (index, _) = index_with_one_event(0);

        let stats = index.stats();
        assert_eq!(stats.total_events, 1);
        assert_eq!(stats.total_entities, 1);
        assert_eq!(stats.total_event_types, 1);
    }

    #[test]
    fn test_get_by_entity() {
        let (index, event_id) = index_with_one_event(0);

        let entries = index.get_by_entity(ENTITY).unwrap();
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].event_id, event_id);
    }

    #[test]
    fn test_get_by_type() {
        let (index, event_id) = index_with_one_event(0);

        let entries = index.get_by_type(EVENT_TYPE).unwrap();
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].event_id, event_id);
    }

    #[test]
    fn test_get_by_id() {
        let (index, event_id) = index_with_one_event(42);

        assert_eq!(index.get_by_id(&event_id), Some(42));
        // A freshly generated id has never been indexed.
        assert_eq!(index.get_by_id(&Uuid::new_v4()), None);
    }

    #[test]
    fn test_unknown_keys_return_none() {
        let (index, _) = index_with_one_event(0);

        assert!(index.get_by_entity("user-999").is_none());
        assert!(index.get_by_type("user.deleted").is_none());
    }

    #[test]
    fn test_multiple_events_for_one_entity() {
        let (index, first_id) = index_with_one_event(0);
        let second_id = Uuid::new_v4();

        index
            .index_event(second_id, ENTITY, "user.updated", chrono::Utc::now(), 1)
            .unwrap();

        // Entries for one entity are kept in the order they were indexed.
        let entries = index.get_by_entity(ENTITY).unwrap();
        let ids: Vec<Uuid> = entries.iter().map(|entry| entry.event_id).collect();
        assert_eq!(ids, vec![first_id, second_id]);
        assert_eq!(entries[1].offset, 1);

        // Map iteration order is not guaranteed, so sort before comparing.
        let mut types = index.get_all_types();
        types.sort();
        assert_eq!(
            types,
            vec!["user.created".to_string(), "user.updated".to_string()]
        );
        assert_eq!(index.get_all_entities(), vec![ENTITY.to_string()]);

        let stats = index.stats();
        assert_eq!(stats.total_events, 2);
        assert_eq!(stats.total_entities, 1);
        assert_eq!(stats.total_event_types, 2);
    }

    #[test]
    fn test_clear() {
        let (index, event_id) = index_with_one_event(7);

        index.clear();

        let stats = index.stats();
        assert_eq!(stats.total_events, 0);
        assert_eq!(stats.total_entities, 0);
        assert_eq!(stats.total_event_types, 0);

        assert_eq!(index.get_by_id(&event_id), None);
        assert!(index.get_by_entity(ENTITY).is_none());
        assert!(index.get_by_type(EVENT_TYPE).is_none());
        assert!(index.get_all_entities().is_empty());
        assert!(index.get_all_types().is_empty());
    }
}