allsource_core/
projection.rs1use crate::error::Result;
2use crate::domain::entities::Event;
3use crate::metrics::MetricsRegistry;
4use dashmap::DashMap;
5use serde_json::Value;
6use std::sync::Arc;
7
8pub trait Projection: Send + Sync {
10 fn name(&self) -> &str;
12
13 fn process(&self, event: &Event) -> Result<()>;
15
16 fn get_state(&self, entity_id: &str) -> Option<Value>;
18
19 fn clear(&self);
21}
22
23pub struct EntitySnapshotProjection {
25 name: String,
26 states: Arc<DashMap<String, Value>>,
28}
29
30impl EntitySnapshotProjection {
31 pub fn new(name: impl Into<String>) -> Self {
32 Self {
33 name: name.into(),
34 states: Arc::new(DashMap::new()),
35 }
36 }
37
38 pub fn get_all_states(&self) -> Vec<(String, Value)> {
40 self.states
41 .iter()
42 .map(|entry| (entry.key().clone(), entry.value().clone()))
43 .collect()
44 }
45}
46
47impl Projection for EntitySnapshotProjection {
48 fn name(&self) -> &str {
49 &self.name
50 }
51
52 fn process(&self, event: &Event) -> Result<()> {
53 self.states
55 .entry(event.entity_id_str().to_string())
56 .and_modify(|state| {
57 if let Value::Object(ref mut map) = state {
59 if let Value::Object(ref payload_map) = event.payload {
60 for (key, value) in payload_map {
61 map.insert(key.clone(), value.clone());
62 }
63 }
64 }
65 })
66 .or_insert_with(|| event.payload.clone());
67
68 Ok(())
69 }
70
71 fn get_state(&self, entity_id: &str) -> Option<Value> {
72 self.states.get(entity_id).map(|v| v.clone())
73 }
74
75 fn clear(&self) {
76 self.states.clear();
77 }
78}
79
80pub struct EventCounterProjection {
82 name: String,
83 counts: Arc<DashMap<String, u64>>,
85}
86
87impl EventCounterProjection {
88 pub fn new(name: impl Into<String>) -> Self {
89 Self {
90 name: name.into(),
91 counts: Arc::new(DashMap::new()),
92 }
93 }
94
95 pub fn get_count(&self, event_type: &str) -> u64 {
97 self.counts
98 .get(event_type)
99 .map(|v| *v)
100 .unwrap_or(0)
101 }
102
103 pub fn get_all_counts(&self) -> Vec<(String, u64)> {
105 self.counts
106 .iter()
107 .map(|entry| (entry.key().clone(), *entry.value()))
108 .collect()
109 }
110}
111
112impl Projection for EventCounterProjection {
113 fn name(&self) -> &str {
114 &self.name
115 }
116
117 fn process(&self, event: &Event) -> Result<()> {
118 self.counts
119 .entry(event.event_type_str().to_string())
120 .and_modify(|count| *count += 1)
121 .or_insert(1);
122
123 Ok(())
124 }
125
126 fn get_state(&self, event_type: &str) -> Option<Value> {
127 self.counts
128 .get(event_type)
129 .map(|count| serde_json::json!({ "count": *count }))
130 }
131
132 fn clear(&self) {
133 self.counts.clear();
134 }
135}
136
137pub struct ProjectionManager {
139 projections: Vec<Arc<dyn Projection>>,
140 metrics: Arc<MetricsRegistry>,
141}
142
143impl ProjectionManager {
144 pub fn new() -> Self {
145 Self::with_metrics(MetricsRegistry::new())
146 }
147
148 pub fn with_metrics(metrics: Arc<MetricsRegistry>) -> Self {
149 Self {
150 projections: Vec::new(),
151 metrics,
152 }
153 }
154
155 pub fn register(&mut self, projection: Arc<dyn Projection>) {
157 let name = projection.name();
158 tracing::info!("Registering projection: {}", name);
159 self.projections.push(projection);
160 self.metrics.projections_total.set(self.projections.len() as i64);
161 }
162
163 pub fn process_event(&self, event: &Event) -> Result<()> {
165 let timer = self.metrics.projection_duration_seconds.start_timer();
166
167 for projection in &self.projections {
168 let name = projection.name();
169
170 match projection.process(event) {
171 Ok(_) => {
172 self.metrics.projection_events_processed
173 .with_label_values(&[name])
174 .inc();
175 }
176 Err(e) => {
177 self.metrics.projection_errors_total
178 .with_label_values(&[name])
179 .inc();
180 tracing::error!(
181 "Projection '{}' failed to process event {}: {}",
182 name,
183 event.id,
184 e
185 );
186 }
188 }
189 }
190
191 timer.observe_duration();
192 Ok(())
193 }
194
195 pub fn get_projection(&self, name: &str) -> Option<Arc<dyn Projection>> {
197 self.projections.iter().find(|p| p.name() == name).cloned()
198 }
199
200 pub fn list_projections(&self) -> Vec<(String, Arc<dyn Projection>)> {
202 self.projections
203 .iter()
204 .map(|p| (p.name().to_string(), Arc::clone(p)))
205 .collect()
206 }
207
208 pub fn clear_all(&self) {
210 for projection in &self.projections {
211 projection.clear();
212 }
213 }
214}
215
216impl Default for ProjectionManager {
217 fn default() -> Self {
218 Self::new()
219 }
220}
221
#[cfg(test)]
mod tests {
    use super::*;
    use uuid::Uuid;

    /// Builds an event for `entity_id` of type `event_type` carrying a fixed
    /// name/email payload.
    fn create_test_event(entity_id: &str, event_type: &str) -> Event {
        let payload = serde_json::json!({
            "name": "Test User",
            "email": "test@example.com"
        });

        Event::reconstruct_from_strings(
            Uuid::new_v4(),
            event_type.to_string(),
            entity_id.to_string(),
            "default".to_string(),
            payload,
            chrono::Utc::now(),
            None,
            1,
        )
    }

    /// The first event for an entity stores its payload as the entity state.
    #[test]
    fn test_entity_snapshot_projection() {
        let projection = EntitySnapshotProjection::new("test");

        projection
            .process(&create_test_event("user-123", "user.created"))
            .unwrap();

        let state = projection.get_state("user-123").unwrap();
        assert_eq!(state["name"], "Test User");
    }

    /// Counts are tracked per event type, independent of the entity id.
    #[test]
    fn test_event_counter_projection() {
        let projection = EventCounterProjection::new("counter");

        let inputs = [
            ("user-123", "user.created"),
            ("user-456", "user.created"),
            ("user-123", "user.updated"),
        ];
        for (entity_id, event_type) in inputs {
            let event = create_test_event(entity_id, event_type);
            projection.process(&event).unwrap();
        }

        assert_eq!(projection.get_count("user.created"), 2);
        assert_eq!(projection.get_count("user.updated"), 1);
    }

    /// The manager forwards a processed event to every registered projection.
    #[test]
    fn test_projection_manager() {
        let snapshot = Arc::new(EntitySnapshotProjection::new("snapshot"));
        let counter = Arc::new(EventCounterProjection::new("counter"));

        let mut manager = ProjectionManager::new();
        // `.clone()` keeps the concrete `Arc` type so it can coerce to
        // `Arc<dyn Projection>` at the call site.
        manager.register(snapshot.clone());
        manager.register(counter.clone());

        manager
            .process_event(&create_test_event("user-123", "user.created"))
            .unwrap();

        assert!(snapshot.get_state("user-123").is_some());
        assert_eq!(counter.get_count("user.created"), 1);
    }
}