allsource_core/application/services/
projection.rs1use crate::{
2 domain::entities::Event, error::Result, infrastructure::observability::metrics::MetricsRegistry,
3};
4use dashmap::DashMap;
5use serde_json::Value;
6use std::sync::Arc;
7
8pub trait Projection: Send + Sync {
10 fn name(&self) -> &str;
12
13 fn process(&self, event: &Event) -> Result<()>;
15
16 fn get_state(&self, entity_id: &str) -> Option<Value>;
18
19 fn clear(&self);
21}
22
23pub struct EntitySnapshotProjection {
25 name: String,
26 states: Arc<DashMap<String, Value>>,
28}
29
30impl EntitySnapshotProjection {
31 pub fn new(name: impl Into<String>) -> Self {
32 Self {
33 name: name.into(),
34 states: Arc::new(DashMap::new()),
35 }
36 }
37
38 pub fn get_all_states(&self) -> Vec<(String, Value)> {
40 self.states
41 .iter()
42 .map(|entry| (entry.key().clone(), entry.value().clone()))
43 .collect()
44 }
45}
46
47impl Projection for EntitySnapshotProjection {
48 fn name(&self) -> &str {
49 &self.name
50 }
51
52 fn process(&self, event: &Event) -> Result<()> {
53 self.states
55 .entry(event.entity_id_str().to_string())
56 .and_modify(|state| {
57 if let (Value::Object(map), Value::Object(payload_map)) = (state, &event.payload) {
59 for (key, value) in payload_map {
60 map.insert(key.clone(), value.clone());
61 }
62 }
63 })
64 .or_insert_with(|| event.payload.clone());
65
66 Ok(())
67 }
68
69 fn get_state(&self, entity_id: &str) -> Option<Value> {
70 self.states.get(entity_id).map(|v| v.clone())
71 }
72
73 fn clear(&self) {
74 self.states.clear();
75 }
76}
77
78pub struct EventCounterProjection {
80 name: String,
81 counts: Arc<DashMap<String, u64>>,
83}
84
85impl EventCounterProjection {
86 pub fn new(name: impl Into<String>) -> Self {
87 Self {
88 name: name.into(),
89 counts: Arc::new(DashMap::new()),
90 }
91 }
92
93 pub fn get_count(&self, event_type: &str) -> u64 {
95 self.counts.get(event_type).map(|v| *v).unwrap_or(0)
96 }
97
98 pub fn get_all_counts(&self) -> Vec<(String, u64)> {
100 self.counts
101 .iter()
102 .map(|entry| (entry.key().clone(), *entry.value()))
103 .collect()
104 }
105}
106
107impl Projection for EventCounterProjection {
108 fn name(&self) -> &str {
109 &self.name
110 }
111
112 fn process(&self, event: &Event) -> Result<()> {
113 self.counts
114 .entry(event.event_type_str().to_string())
115 .and_modify(|count| *count += 1)
116 .or_insert(1);
117
118 Ok(())
119 }
120
121 fn get_state(&self, event_type: &str) -> Option<Value> {
122 self.counts
123 .get(event_type)
124 .map(|count| serde_json::json!({ "count": *count }))
125 }
126
127 fn clear(&self) {
128 self.counts.clear();
129 }
130}
131
132pub struct ProjectionManager {
134 projections: Vec<Arc<dyn Projection>>,
135 metrics: Arc<MetricsRegistry>,
136}
137
138impl ProjectionManager {
139 pub fn new() -> Self {
140 Self::with_metrics(MetricsRegistry::new())
141 }
142
143 pub fn with_metrics(metrics: Arc<MetricsRegistry>) -> Self {
144 Self {
145 projections: Vec::new(),
146 metrics,
147 }
148 }
149
150 pub fn register(&mut self, projection: Arc<dyn Projection>) {
152 let name = projection.name();
153 tracing::info!("Registering projection: {}", name);
154 self.projections.push(projection);
155 self.metrics
156 .projections_total
157 .set(self.projections.len() as i64);
158 }
159
160 pub fn process_event(&self, event: &Event) -> Result<()> {
162 let timer = self.metrics.projection_duration_seconds.start_timer();
163
164 for projection in &self.projections {
165 let name = projection.name();
166
167 match projection.process(event) {
168 Ok(_) => {
169 self.metrics
170 .projection_events_processed
171 .with_label_values(&[name])
172 .inc();
173 }
174 Err(e) => {
175 self.metrics
176 .projection_errors_total
177 .with_label_values(&[name])
178 .inc();
179 tracing::error!(
180 "Projection '{}' failed to process event {}: {}",
181 name,
182 event.id,
183 e
184 );
185 }
187 }
188 }
189
190 timer.observe_duration();
191 Ok(())
192 }
193
194 pub fn get_projection(&self, name: &str) -> Option<Arc<dyn Projection>> {
196 self.projections.iter().find(|p| p.name() == name).cloned()
197 }
198
199 pub fn list_projections(&self) -> Vec<(String, Arc<dyn Projection>)> {
201 self.projections
202 .iter()
203 .map(|p| (p.name().to_string(), Arc::clone(p)))
204 .collect()
205 }
206
207 pub fn clear_all(&self) {
209 for projection in &self.projections {
210 projection.clear();
211 }
212 }
213}
214
215impl Default for ProjectionManager {
216 fn default() -> Self {
217 Self::new()
218 }
219}
220
221#[cfg(test)]
222mod tests {
223 use super::*;
224 use uuid::Uuid;
225
226 fn create_test_event(entity_id: &str, event_type: &str) -> Event {
227 Event::reconstruct_from_strings(
228 Uuid::new_v4(),
229 event_type.to_string(),
230 entity_id.to_string(),
231 "default".to_string(),
232 serde_json::json!({
233 "name": "Test User",
234 "email": "test@example.com"
235 }),
236 chrono::Utc::now(),
237 None,
238 1,
239 )
240 }
241
242 #[test]
243 fn test_entity_snapshot_projection() {
244 let projection = EntitySnapshotProjection::new("test");
245 let event = create_test_event("user-123", "user.created");
246
247 projection.process(&event).unwrap();
248
249 let state = projection.get_state("user-123").unwrap();
250 assert_eq!(state["name"], "Test User");
251 }
252
253 #[test]
254 fn test_event_counter_projection() {
255 let projection = EventCounterProjection::new("counter");
256
257 let event1 = create_test_event("user-123", "user.created");
258 let event2 = create_test_event("user-456", "user.created");
259 let event3 = create_test_event("user-123", "user.updated");
260
261 projection.process(&event1).unwrap();
262 projection.process(&event2).unwrap();
263 projection.process(&event3).unwrap();
264
265 assert_eq!(projection.get_count("user.created"), 2);
266 assert_eq!(projection.get_count("user.updated"), 1);
267 }
268
269 #[test]
270 fn test_projection_manager() {
271 let mut manager = ProjectionManager::new();
272
273 let snapshot = Arc::new(EntitySnapshotProjection::new("snapshot"));
274 let counter = Arc::new(EventCounterProjection::new("counter"));
275
276 manager.register(snapshot.clone());
277 manager.register(counter.clone());
278
279 let event = create_test_event("user-123", "user.created");
280 manager.process_event(&event).unwrap();
281
282 assert!(snapshot.get_state("user-123").is_some());
283 assert_eq!(counter.get_count("user.created"), 1);
284 }
285}