allsource_core/application/services/
projection.rs1use crate::{
2 domain::entities::Event, error::Result, infrastructure::observability::metrics::MetricsRegistry,
3};
4use dashmap::DashMap;
5use serde_json::Value;
6use std::sync::Arc;
7
8pub trait Projection: Send + Sync {
10 fn name(&self) -> &str;
12
13 fn process(&self, event: &Event) -> Result<()>;
15
16 fn get_state(&self, entity_id: &str) -> Option<Value>;
18
19 fn clear(&self);
21}
22
23pub struct EntitySnapshotProjection {
25 name: String,
26 states: Arc<DashMap<String, (Value, chrono::DateTime<chrono::Utc>)>>,
28}
29
30impl EntitySnapshotProjection {
31 pub fn new(name: impl Into<String>) -> Self {
32 Self {
33 name: name.into(),
34 states: Arc::new(DashMap::new()),
35 }
36 }
37
38 pub fn get_all_states(&self) -> Vec<(String, Value)> {
40 self.states
41 .iter()
42 .map(|entry| (entry.key().clone(), entry.value().0.clone()))
43 .collect()
44 }
45}
46
47impl Projection for EntitySnapshotProjection {
48 fn name(&self) -> &str {
49 &self.name
50 }
51
52 fn process(&self, event: &Event) -> Result<()> {
53 self.states
59 .entry(event.entity_id_str().to_string())
60 .and_modify(|(state, last_ts)| {
61 if event.timestamp >= *last_ts {
62 if let (Value::Object(map), Value::Object(payload_map)) =
64 (state, &event.payload)
65 {
66 for (key, value) in payload_map {
67 map.insert(key.clone(), value.clone());
68 }
69 }
70 *last_ts = event.timestamp;
71 }
72 })
74 .or_insert_with(|| (event.payload.clone(), event.timestamp));
75
76 Ok(())
77 }
78
79 fn get_state(&self, entity_id: &str) -> Option<Value> {
80 self.states.get(entity_id).map(|v| v.0.clone())
81 }
82
83 fn clear(&self) {
84 self.states.clear();
85 }
86}
87
88pub struct EventCounterProjection {
90 name: String,
91 counts: Arc<DashMap<String, u64>>,
93}
94
95impl EventCounterProjection {
96 pub fn new(name: impl Into<String>) -> Self {
97 Self {
98 name: name.into(),
99 counts: Arc::new(DashMap::new()),
100 }
101 }
102
103 pub fn get_count(&self, event_type: &str) -> u64 {
105 self.counts.get(event_type).map_or(0, |v| *v)
106 }
107
108 pub fn get_all_counts(&self) -> Vec<(String, u64)> {
110 self.counts
111 .iter()
112 .map(|entry| (entry.key().clone(), *entry.value()))
113 .collect()
114 }
115}
116
117impl Projection for EventCounterProjection {
118 fn name(&self) -> &str {
119 &self.name
120 }
121
122 fn process(&self, event: &Event) -> Result<()> {
123 self.counts
124 .entry(event.event_type_str().to_string())
125 .and_modify(|count| *count += 1)
126 .or_insert(1);
127
128 Ok(())
129 }
130
131 fn get_state(&self, event_type: &str) -> Option<Value> {
132 self.counts
133 .get(event_type)
134 .map(|count| serde_json::json!({ "count": *count }))
135 }
136
137 fn clear(&self) {
138 self.counts.clear();
139 }
140}
141
142pub struct ProjectionManager {
144 projections: Vec<Arc<dyn Projection>>,
145 metrics: Arc<MetricsRegistry>,
146}
147
148impl ProjectionManager {
149 pub fn new() -> Self {
150 Self::with_metrics(MetricsRegistry::new())
151 }
152
153 pub fn with_metrics(metrics: Arc<MetricsRegistry>) -> Self {
154 Self {
155 projections: Vec::new(),
156 metrics,
157 }
158 }
159
160 pub fn register(&mut self, projection: Arc<dyn Projection>) {
162 let name = projection.name();
163 tracing::info!("Registering projection: {}", name);
164 self.projections.push(projection);
165 self.metrics
166 .projections_total
167 .set(self.projections.len() as i64);
168 }
169
170 #[cfg_attr(feature = "hotpath", hotpath::measure)]
172 pub fn process_event(&self, event: &Event) -> Result<()> {
173 let timer = self.metrics.projection_duration_seconds.start_timer();
174
175 for projection in &self.projections {
176 let name = projection.name();
177
178 match projection.process(event) {
179 Ok(()) => {
180 self.metrics
181 .projection_events_processed
182 .with_label_values(&[name])
183 .inc();
184 }
185 Err(e) => {
186 self.metrics
187 .projection_errors_total
188 .with_label_values(&[name])
189 .inc();
190 tracing::error!(
191 "Projection '{}' failed to process event {}: {}",
192 name,
193 event.id,
194 e
195 );
196 }
198 }
199 }
200
201 timer.observe_duration();
202 Ok(())
203 }
204
205 pub fn get_projection(&self, name: &str) -> Option<Arc<dyn Projection>> {
207 self.projections.iter().find(|p| p.name() == name).cloned()
208 }
209
210 pub fn list_projections(&self) -> Vec<(String, Arc<dyn Projection>)> {
212 self.projections
213 .iter()
214 .map(|p| (p.name().to_string(), Arc::clone(p)))
215 .collect()
216 }
217
218 pub fn clear_all(&self) {
220 for projection in &self.projections {
221 projection.clear();
222 }
223 }
224}
225
226impl Default for ProjectionManager {
227 fn default() -> Self {
228 Self::new()
229 }
230}
231
232#[cfg(test)]
233mod tests {
234 use super::*;
235 use uuid::Uuid;
236
237 fn create_test_event(entity_id: &str, event_type: &str) -> Event {
238 Event::reconstruct_from_strings(
239 Uuid::new_v4(),
240 event_type.to_string(),
241 entity_id.to_string(),
242 "default".to_string(),
243 serde_json::json!({
244 "name": "Test User",
245 "email": "test@example.com"
246 }),
247 chrono::Utc::now(),
248 None,
249 1,
250 )
251 }
252
253 #[test]
254 fn test_entity_snapshot_projection() {
255 let projection = EntitySnapshotProjection::new("test");
256 let event = create_test_event("user-123", "user.created");
257
258 projection.process(&event).unwrap();
259
260 let state = projection.get_state("user-123").unwrap();
261 assert_eq!(state["name"], "Test User");
262 }
263
264 #[test]
265 fn test_event_counter_projection() {
266 let projection = EventCounterProjection::new("counter");
267
268 let event1 = create_test_event("user-123", "user.created");
269 let event2 = create_test_event("user-456", "user.created");
270 let event3 = create_test_event("user-123", "user.updated");
271
272 projection.process(&event1).unwrap();
273 projection.process(&event2).unwrap();
274 projection.process(&event3).unwrap();
275
276 assert_eq!(projection.get_count("user.created"), 2);
277 assert_eq!(projection.get_count("user.updated"), 1);
278 }
279
280 #[test]
281 fn test_projection_manager() {
282 let mut manager = ProjectionManager::new();
283
284 let snapshot = Arc::new(EntitySnapshotProjection::new("snapshot"));
285 let counter = Arc::new(EventCounterProjection::new("counter"));
286
287 manager.register(snapshot.clone());
288 manager.register(counter.clone());
289
290 let event = create_test_event("user-123", "user.created");
291 manager.process_event(&event).unwrap();
292
293 assert!(snapshot.get_state("user-123").is_some());
294 assert_eq!(counter.get_count("user.created"), 1);
295 }
296}