// allsource_core/application/services/projection.rs

1use crate::domain::entities::Event;
2use crate::error::Result;
3use crate::infrastructure::observability::metrics::MetricsRegistry;
4use dashmap::DashMap;
5use serde_json::Value;
6use std::sync::Arc;
7
8/// A projection aggregates events into a queryable view
9pub trait Projection: Send + Sync {
10    /// Get the name of this projection
11    fn name(&self) -> &str;
12
13    /// Process an event and update the projection state
14    fn process(&self, event: &Event) -> Result<()>;
15
16    /// Get the current state of the projection for an entity
17    fn get_state(&self, entity_id: &str) -> Option<Value>;
18
19    /// Clear all projection state
20    fn clear(&self);
21}
22
23/// Entity snapshot projection - maintains current state of each entity
24pub struct EntitySnapshotProjection {
25    name: String,
26    /// entity_id -> latest state
27    states: Arc<DashMap<String, Value>>,
28}
29
30impl EntitySnapshotProjection {
31    pub fn new(name: impl Into<String>) -> Self {
32        Self {
33            name: name.into(),
34            states: Arc::new(DashMap::new()),
35        }
36    }
37
38    /// Get all entity states
39    pub fn get_all_states(&self) -> Vec<(String, Value)> {
40        self.states
41            .iter()
42            .map(|entry| (entry.key().clone(), entry.value().clone()))
43            .collect()
44    }
45}
46
47impl Projection for EntitySnapshotProjection {
48    fn name(&self) -> &str {
49        &self.name
50    }
51
52    fn process(&self, event: &Event) -> Result<()> {
53        // Simple merge strategy: update or insert
54        self.states
55            .entry(event.entity_id_str().to_string())
56            .and_modify(|state| {
57                // Merge the event payload into existing state
58                if let Value::Object(ref mut map) = state {
59                    if let Value::Object(ref payload_map) = event.payload {
60                        for (key, value) in payload_map {
61                            map.insert(key.clone(), value.clone());
62                        }
63                    }
64                }
65            })
66            .or_insert_with(|| event.payload.clone());
67
68        Ok(())
69    }
70
71    fn get_state(&self, entity_id: &str) -> Option<Value> {
72        self.states.get(entity_id).map(|v| v.clone())
73    }
74
75    fn clear(&self) {
76        self.states.clear();
77    }
78}
79
80/// Event counter projection - counts events by type
81pub struct EventCounterProjection {
82    name: String,
83    /// event_type -> count
84    counts: Arc<DashMap<String, u64>>,
85}
86
87impl EventCounterProjection {
88    pub fn new(name: impl Into<String>) -> Self {
89        Self {
90            name: name.into(),
91            counts: Arc::new(DashMap::new()),
92        }
93    }
94
95    /// Get count for a specific event type
96    pub fn get_count(&self, event_type: &str) -> u64 {
97        self.counts.get(event_type).map(|v| *v).unwrap_or(0)
98    }
99
100    /// Get all event type counts
101    pub fn get_all_counts(&self) -> Vec<(String, u64)> {
102        self.counts
103            .iter()
104            .map(|entry| (entry.key().clone(), *entry.value()))
105            .collect()
106    }
107}
108
109impl Projection for EventCounterProjection {
110    fn name(&self) -> &str {
111        &self.name
112    }
113
114    fn process(&self, event: &Event) -> Result<()> {
115        self.counts
116            .entry(event.event_type_str().to_string())
117            .and_modify(|count| *count += 1)
118            .or_insert(1);
119
120        Ok(())
121    }
122
123    fn get_state(&self, event_type: &str) -> Option<Value> {
124        self.counts
125            .get(event_type)
126            .map(|count| serde_json::json!({ "count": *count }))
127    }
128
129    fn clear(&self) {
130        self.counts.clear();
131    }
132}
133
134/// Projection manager handles multiple projections
135pub struct ProjectionManager {
136    projections: Vec<Arc<dyn Projection>>,
137    metrics: Arc<MetricsRegistry>,
138}
139
140impl ProjectionManager {
141    pub fn new() -> Self {
142        Self::with_metrics(MetricsRegistry::new())
143    }
144
145    pub fn with_metrics(metrics: Arc<MetricsRegistry>) -> Self {
146        Self {
147            projections: Vec::new(),
148            metrics,
149        }
150    }
151
152    /// Register a new projection
153    pub fn register(&mut self, projection: Arc<dyn Projection>) {
154        let name = projection.name();
155        tracing::info!("Registering projection: {}", name);
156        self.projections.push(projection);
157        self.metrics
158            .projections_total
159            .set(self.projections.len() as i64);
160    }
161
162    /// Process an event through all projections
163    pub fn process_event(&self, event: &Event) -> Result<()> {
164        let timer = self.metrics.projection_duration_seconds.start_timer();
165
166        for projection in &self.projections {
167            let name = projection.name();
168
169            match projection.process(event) {
170                Ok(_) => {
171                    self.metrics
172                        .projection_events_processed
173                        .with_label_values(&[name])
174                        .inc();
175                }
176                Err(e) => {
177                    self.metrics
178                        .projection_errors_total
179                        .with_label_values(&[name])
180                        .inc();
181                    tracing::error!(
182                        "Projection '{}' failed to process event {}: {}",
183                        name,
184                        event.id,
185                        e
186                    );
187                    // Continue processing other projections even if one fails
188                }
189            }
190        }
191
192        timer.observe_duration();
193        Ok(())
194    }
195
196    /// Get a projection by name
197    pub fn get_projection(&self, name: &str) -> Option<Arc<dyn Projection>> {
198        self.projections.iter().find(|p| p.name() == name).cloned()
199    }
200
201    /// List all projections
202    pub fn list_projections(&self) -> Vec<(String, Arc<dyn Projection>)> {
203        self.projections
204            .iter()
205            .map(|p| (p.name().to_string(), Arc::clone(p)))
206            .collect()
207    }
208
209    /// Clear all projections
210    pub fn clear_all(&self) {
211        for projection in &self.projections {
212            projection.clear();
213        }
214    }
215}
216
217impl Default for ProjectionManager {
218    fn default() -> Self {
219        Self::new()
220    }
221}
222
#[cfg(test)]
mod tests {
    use super::*;
    use uuid::Uuid;

    /// Builds an event for the given entity and event type with a fixed
    /// two-field payload.
    fn create_test_event(entity_id: &str, event_type: &str) -> Event {
        let payload = serde_json::json!({
            "name": "Test User",
            "email": "test@example.com"
        });

        Event::reconstruct_from_strings(
            Uuid::new_v4(),
            event_type.to_string(),
            entity_id.to_string(),
            "default".to_string(),
            payload,
            chrono::Utc::now(),
            None,
            1,
        )
    }

    /// The first event for an entity is stored as that entity's snapshot.
    #[test]
    fn test_entity_snapshot_projection() {
        let snapshots = EntitySnapshotProjection::new("test");

        snapshots
            .process(&create_test_event("user-123", "user.created"))
            .unwrap();

        let stored = snapshots.get_state("user-123").unwrap();
        assert_eq!(stored["name"], "Test User");
    }

    /// Counts are kept per event type, regardless of entity.
    #[test]
    fn test_event_counter_projection() {
        let counter = EventCounterProjection::new("counter");

        let events = [
            create_test_event("user-123", "user.created"),
            create_test_event("user-456", "user.created"),
            create_test_event("user-123", "user.updated"),
        ];
        for event in &events {
            counter.process(event).unwrap();
        }

        assert_eq!(counter.get_count("user.created"), 2);
        assert_eq!(counter.get_count("user.updated"), 1);
    }

    /// An event handed to the manager reaches every registered projection.
    #[test]
    fn test_projection_manager() {
        let snapshot = Arc::new(EntitySnapshotProjection::new("snapshot"));
        let counter = Arc::new(EventCounterProjection::new("counter"));

        let mut manager = ProjectionManager::new();
        manager.register(snapshot.clone());
        manager.register(counter.clone());

        manager
            .process_event(&create_test_event("user-123", "user.created"))
            .unwrap();

        assert!(snapshot.get_state("user-123").is_some());
        assert_eq!(counter.get_count("user.created"), 1);
    }
}