allsource_core/application/services/
projection.rs1use crate::{
2 domain::entities::Event, error::Result, infrastructure::observability::metrics::MetricsRegistry,
3};
4use dashmap::DashMap;
5use serde_json::Value;
6use std::sync::Arc;
7
8pub trait Projection: Send + Sync {
10 fn name(&self) -> &str;
12
13 fn process(&self, event: &Event) -> Result<()>;
15
16 fn get_state(&self, entity_id: &str) -> Option<Value>;
18
19 fn clear(&self);
21}
22
23pub struct EntitySnapshotProjection {
25 name: String,
26 states: Arc<DashMap<String, (Value, chrono::DateTime<chrono::Utc>)>>,
28}
29
30impl EntitySnapshotProjection {
31 pub fn new(name: impl Into<String>) -> Self {
32 Self {
33 name: name.into(),
34 states: Arc::new(DashMap::new()),
35 }
36 }
37
38 pub fn get_all_states(&self) -> Vec<(String, Value)> {
40 self.states
41 .iter()
42 .map(|entry| (entry.key().clone(), entry.value().0.clone()))
43 .collect()
44 }
45}
46
47impl Projection for EntitySnapshotProjection {
48 fn name(&self) -> &str {
49 &self.name
50 }
51
52 fn process(&self, event: &Event) -> Result<()> {
53 self.states
59 .entry(event.entity_id_str().to_string())
60 .and_modify(|(state, last_ts)| {
61 if event.timestamp >= *last_ts {
62 if let (Value::Object(map), Value::Object(payload_map)) =
64 (state, &event.payload)
65 {
66 for (key, value) in payload_map {
67 map.insert(key.clone(), value.clone());
68 }
69 }
70 *last_ts = event.timestamp;
71 }
72 })
74 .or_insert_with(|| (event.payload.clone(), event.timestamp));
75
76 Ok(())
77 }
78
79 fn get_state(&self, entity_id: &str) -> Option<Value> {
80 self.states.get(entity_id).map(|v| v.0.clone())
81 }
82
83 fn clear(&self) {
84 self.states.clear();
85 }
86}
87
88pub struct EventCounterProjection {
90 name: String,
91 counts: Arc<DashMap<String, u64>>,
93}
94
95impl EventCounterProjection {
96 pub fn new(name: impl Into<String>) -> Self {
97 Self {
98 name: name.into(),
99 counts: Arc::new(DashMap::new()),
100 }
101 }
102
103 pub fn get_count(&self, event_type: &str) -> u64 {
105 self.counts.get(event_type).map(|v| *v).unwrap_or(0)
106 }
107
108 pub fn get_all_counts(&self) -> Vec<(String, u64)> {
110 self.counts
111 .iter()
112 .map(|entry| (entry.key().clone(), *entry.value()))
113 .collect()
114 }
115}
116
117impl Projection for EventCounterProjection {
118 fn name(&self) -> &str {
119 &self.name
120 }
121
122 fn process(&self, event: &Event) -> Result<()> {
123 self.counts
124 .entry(event.event_type_str().to_string())
125 .and_modify(|count| *count += 1)
126 .or_insert(1);
127
128 Ok(())
129 }
130
131 fn get_state(&self, event_type: &str) -> Option<Value> {
132 self.counts
133 .get(event_type)
134 .map(|count| serde_json::json!({ "count": *count }))
135 }
136
137 fn clear(&self) {
138 self.counts.clear();
139 }
140}
141
142pub struct ProjectionManager {
144 projections: Vec<Arc<dyn Projection>>,
145 metrics: Arc<MetricsRegistry>,
146}
147
148impl ProjectionManager {
149 pub fn new() -> Self {
150 Self::with_metrics(MetricsRegistry::new())
151 }
152
153 pub fn with_metrics(metrics: Arc<MetricsRegistry>) -> Self {
154 Self {
155 projections: Vec::new(),
156 metrics,
157 }
158 }
159
160 pub fn register(&mut self, projection: Arc<dyn Projection>) {
162 let name = projection.name();
163 tracing::info!("Registering projection: {}", name);
164 self.projections.push(projection);
165 self.metrics
166 .projections_total
167 .set(self.projections.len() as i64);
168 }
169
170 pub fn process_event(&self, event: &Event) -> Result<()> {
172 let timer = self.metrics.projection_duration_seconds.start_timer();
173
174 for projection in &self.projections {
175 let name = projection.name();
176
177 match projection.process(event) {
178 Ok(_) => {
179 self.metrics
180 .projection_events_processed
181 .with_label_values(&[name])
182 .inc();
183 }
184 Err(e) => {
185 self.metrics
186 .projection_errors_total
187 .with_label_values(&[name])
188 .inc();
189 tracing::error!(
190 "Projection '{}' failed to process event {}: {}",
191 name,
192 event.id,
193 e
194 );
195 }
197 }
198 }
199
200 timer.observe_duration();
201 Ok(())
202 }
203
204 pub fn get_projection(&self, name: &str) -> Option<Arc<dyn Projection>> {
206 self.projections.iter().find(|p| p.name() == name).cloned()
207 }
208
209 pub fn list_projections(&self) -> Vec<(String, Arc<dyn Projection>)> {
211 self.projections
212 .iter()
213 .map(|p| (p.name().to_string(), Arc::clone(p)))
214 .collect()
215 }
216
217 pub fn clear_all(&self) {
219 for projection in &self.projections {
220 projection.clear();
221 }
222 }
223}
224
225impl Default for ProjectionManager {
226 fn default() -> Self {
227 Self::new()
228 }
229}
230
#[cfg(test)]
mod tests {
    use super::*;
    use uuid::Uuid;

    /// Builds an event for the given entity and event type, carrying a fixed
    /// `name`/`email` payload, a fresh id and the current time.
    fn create_test_event(entity_id: &str, event_type: &str) -> Event {
        let payload = serde_json::json!({
            "name": "Test User",
            "email": "test@example.com"
        });

        Event::reconstruct_from_strings(
            Uuid::new_v4(),
            event_type.to_string(),
            entity_id.to_string(),
            "default".to_string(),
            payload,
            chrono::Utc::now(),
            None,
            1,
        )
    }

    /// The first event for an entity becomes that entity's state.
    #[test]
    fn test_entity_snapshot_projection() {
        let snapshots = EntitySnapshotProjection::new("test");
        let created = create_test_event("user-123", "user.created");

        snapshots.process(&created).unwrap();

        let state = snapshots.get_state("user-123").unwrap();
        assert_eq!(state["name"], "Test User");
    }

    /// Counts are kept per event type, regardless of the entity involved.
    #[test]
    fn test_event_counter_projection() {
        let counter = EventCounterProjection::new("counter");

        let inputs = [
            ("user-123", "user.created"),
            ("user-456", "user.created"),
            ("user-123", "user.updated"),
        ];
        for &(entity_id, event_type) in inputs.iter() {
            let event = create_test_event(entity_id, event_type);
            counter.process(&event).unwrap();
        }

        assert_eq!(counter.get_count("user.created"), 2);
        assert_eq!(counter.get_count("user.updated"), 1);
    }

    /// One event sent through the manager reaches every registered projection.
    #[test]
    fn test_projection_manager() {
        let snapshot = Arc::new(EntitySnapshotProjection::new("snapshot"));
        let counter = Arc::new(EventCounterProjection::new("counter"));

        let mut manager = ProjectionManager::new();
        manager.register(snapshot.clone());
        manager.register(counter.clone());

        let event = create_test_event("user-123", "user.created");
        manager.process_event(&event).unwrap();

        assert!(snapshot.get_state("user-123").is_some());
        assert_eq!(counter.get_count("user.created"), 1);
    }
}