allsource_core/application/services/
projection.rs1use crate::domain::entities::Event;
2use crate::error::Result;
3use crate::infrastructure::observability::metrics::MetricsRegistry;
4use dashmap::DashMap;
5use serde_json::Value;
6use std::sync::Arc;
7
8pub trait Projection: Send + Sync {
10 fn name(&self) -> &str;
12
13 fn process(&self, event: &Event) -> Result<()>;
15
16 fn get_state(&self, entity_id: &str) -> Option<Value>;
18
19 fn clear(&self);
21}
22
23pub struct EntitySnapshotProjection {
25 name: String,
26 states: Arc<DashMap<String, Value>>,
28}
29
30impl EntitySnapshotProjection {
31 pub fn new(name: impl Into<String>) -> Self {
32 Self {
33 name: name.into(),
34 states: Arc::new(DashMap::new()),
35 }
36 }
37
38 pub fn get_all_states(&self) -> Vec<(String, Value)> {
40 self.states
41 .iter()
42 .map(|entry| (entry.key().clone(), entry.value().clone()))
43 .collect()
44 }
45}
46
47impl Projection for EntitySnapshotProjection {
48 fn name(&self) -> &str {
49 &self.name
50 }
51
52 fn process(&self, event: &Event) -> Result<()> {
53 self.states
55 .entry(event.entity_id_str().to_string())
56 .and_modify(|state| {
57 if let Value::Object(ref mut map) = state {
59 if let Value::Object(ref payload_map) = event.payload {
60 for (key, value) in payload_map {
61 map.insert(key.clone(), value.clone());
62 }
63 }
64 }
65 })
66 .or_insert_with(|| event.payload.clone());
67
68 Ok(())
69 }
70
71 fn get_state(&self, entity_id: &str) -> Option<Value> {
72 self.states.get(entity_id).map(|v| v.clone())
73 }
74
75 fn clear(&self) {
76 self.states.clear();
77 }
78}
79
80pub struct EventCounterProjection {
82 name: String,
83 counts: Arc<DashMap<String, u64>>,
85}
86
87impl EventCounterProjection {
88 pub fn new(name: impl Into<String>) -> Self {
89 Self {
90 name: name.into(),
91 counts: Arc::new(DashMap::new()),
92 }
93 }
94
95 pub fn get_count(&self, event_type: &str) -> u64 {
97 self.counts.get(event_type).map(|v| *v).unwrap_or(0)
98 }
99
100 pub fn get_all_counts(&self) -> Vec<(String, u64)> {
102 self.counts
103 .iter()
104 .map(|entry| (entry.key().clone(), *entry.value()))
105 .collect()
106 }
107}
108
109impl Projection for EventCounterProjection {
110 fn name(&self) -> &str {
111 &self.name
112 }
113
114 fn process(&self, event: &Event) -> Result<()> {
115 self.counts
116 .entry(event.event_type_str().to_string())
117 .and_modify(|count| *count += 1)
118 .or_insert(1);
119
120 Ok(())
121 }
122
123 fn get_state(&self, event_type: &str) -> Option<Value> {
124 self.counts
125 .get(event_type)
126 .map(|count| serde_json::json!({ "count": *count }))
127 }
128
129 fn clear(&self) {
130 self.counts.clear();
131 }
132}
133
134pub struct ProjectionManager {
136 projections: Vec<Arc<dyn Projection>>,
137 metrics: Arc<MetricsRegistry>,
138}
139
140impl ProjectionManager {
141 pub fn new() -> Self {
142 Self::with_metrics(MetricsRegistry::new())
143 }
144
145 pub fn with_metrics(metrics: Arc<MetricsRegistry>) -> Self {
146 Self {
147 projections: Vec::new(),
148 metrics,
149 }
150 }
151
152 pub fn register(&mut self, projection: Arc<dyn Projection>) {
154 let name = projection.name();
155 tracing::info!("Registering projection: {}", name);
156 self.projections.push(projection);
157 self.metrics
158 .projections_total
159 .set(self.projections.len() as i64);
160 }
161
162 pub fn process_event(&self, event: &Event) -> Result<()> {
164 let timer = self.metrics.projection_duration_seconds.start_timer();
165
166 for projection in &self.projections {
167 let name = projection.name();
168
169 match projection.process(event) {
170 Ok(_) => {
171 self.metrics
172 .projection_events_processed
173 .with_label_values(&[name])
174 .inc();
175 }
176 Err(e) => {
177 self.metrics
178 .projection_errors_total
179 .with_label_values(&[name])
180 .inc();
181 tracing::error!(
182 "Projection '{}' failed to process event {}: {}",
183 name,
184 event.id,
185 e
186 );
187 }
189 }
190 }
191
192 timer.observe_duration();
193 Ok(())
194 }
195
196 pub fn get_projection(&self, name: &str) -> Option<Arc<dyn Projection>> {
198 self.projections.iter().find(|p| p.name() == name).cloned()
199 }
200
201 pub fn list_projections(&self) -> Vec<(String, Arc<dyn Projection>)> {
203 self.projections
204 .iter()
205 .map(|p| (p.name().to_string(), Arc::clone(p)))
206 .collect()
207 }
208
209 pub fn clear_all(&self) {
211 for projection in &self.projections {
212 projection.clear();
213 }
214 }
215}
216
217impl Default for ProjectionManager {
218 fn default() -> Self {
219 Self::new()
220 }
221}
222
223#[cfg(test)]
224mod tests {
225 use super::*;
226 use uuid::Uuid;
227
228 fn create_test_event(entity_id: &str, event_type: &str) -> Event {
229 Event::reconstruct_from_strings(
230 Uuid::new_v4(),
231 event_type.to_string(),
232 entity_id.to_string(),
233 "default".to_string(),
234 serde_json::json!({
235 "name": "Test User",
236 "email": "test@example.com"
237 }),
238 chrono::Utc::now(),
239 None,
240 1,
241 )
242 }
243
244 #[test]
245 fn test_entity_snapshot_projection() {
246 let projection = EntitySnapshotProjection::new("test");
247 let event = create_test_event("user-123", "user.created");
248
249 projection.process(&event).unwrap();
250
251 let state = projection.get_state("user-123").unwrap();
252 assert_eq!(state["name"], "Test User");
253 }
254
255 #[test]
256 fn test_event_counter_projection() {
257 let projection = EventCounterProjection::new("counter");
258
259 let event1 = create_test_event("user-123", "user.created");
260 let event2 = create_test_event("user-456", "user.created");
261 let event3 = create_test_event("user-123", "user.updated");
262
263 projection.process(&event1).unwrap();
264 projection.process(&event2).unwrap();
265 projection.process(&event3).unwrap();
266
267 assert_eq!(projection.get_count("user.created"), 2);
268 assert_eq!(projection.get_count("user.updated"), 1);
269 }
270
271 #[test]
272 fn test_projection_manager() {
273 let mut manager = ProjectionManager::new();
274
275 let snapshot = Arc::new(EntitySnapshotProjection::new("snapshot"));
276 let counter = Arc::new(EventCounterProjection::new("counter"));
277
278 manager.register(snapshot.clone());
279 manager.register(counter.clone());
280
281 let event = create_test_event("user-123", "user.created");
282 manager.process_event(&event).unwrap();
283
284 assert!(snapshot.get_state("user-123").is_some());
285 assert_eq!(counter.get_count("user.created"), 1);
286 }
287}