Skip to main content

allsource_core/application/services/
projection.rs

1use crate::{
2    domain::entities::Event, error::Result, infrastructure::observability::metrics::MetricsRegistry,
3};
4use dashmap::DashMap;
5use serde_json::Value;
6use std::sync::Arc;
7
8/// A projection aggregates events into a queryable view
9pub trait Projection: Send + Sync {
10    /// Get the name of this projection
11    fn name(&self) -> &str;
12
13    /// Process an event and update the projection state
14    fn process(&self, event: &Event) -> Result<()>;
15
16    /// Get the current state of the projection for an entity
17    fn get_state(&self, entity_id: &str) -> Option<Value>;
18
19    /// Clear all projection state
20    fn clear(&self);
21}
22
23/// Entity snapshot projection - maintains current state of each entity
24pub struct EntitySnapshotProjection {
25    name: String,
26    /// entity_id -> latest state
27    states: Arc<DashMap<String, Value>>,
28}
29
30impl EntitySnapshotProjection {
31    pub fn new(name: impl Into<String>) -> Self {
32        Self {
33            name: name.into(),
34            states: Arc::new(DashMap::new()),
35        }
36    }
37
38    /// Get all entity states
39    pub fn get_all_states(&self) -> Vec<(String, Value)> {
40        self.states
41            .iter()
42            .map(|entry| (entry.key().clone(), entry.value().clone()))
43            .collect()
44    }
45}
46
47impl Projection for EntitySnapshotProjection {
48    fn name(&self) -> &str {
49        &self.name
50    }
51
52    fn process(&self, event: &Event) -> Result<()> {
53        // Simple merge strategy: update or insert
54        self.states
55            .entry(event.entity_id_str().to_string())
56            .and_modify(|state| {
57                // Merge the event payload into existing state
58                if let (Value::Object(map), Value::Object(payload_map)) = (state, &event.payload) {
59                    for (key, value) in payload_map {
60                        map.insert(key.clone(), value.clone());
61                    }
62                }
63            })
64            .or_insert_with(|| event.payload.clone());
65
66        Ok(())
67    }
68
69    fn get_state(&self, entity_id: &str) -> Option<Value> {
70        self.states.get(entity_id).map(|v| v.clone())
71    }
72
73    fn clear(&self) {
74        self.states.clear();
75    }
76}
77
78/// Event counter projection - counts events by type
79pub struct EventCounterProjection {
80    name: String,
81    /// event_type -> count
82    counts: Arc<DashMap<String, u64>>,
83}
84
85impl EventCounterProjection {
86    pub fn new(name: impl Into<String>) -> Self {
87        Self {
88            name: name.into(),
89            counts: Arc::new(DashMap::new()),
90        }
91    }
92
93    /// Get count for a specific event type
94    pub fn get_count(&self, event_type: &str) -> u64 {
95        self.counts.get(event_type).map(|v| *v).unwrap_or(0)
96    }
97
98    /// Get all event type counts
99    pub fn get_all_counts(&self) -> Vec<(String, u64)> {
100        self.counts
101            .iter()
102            .map(|entry| (entry.key().clone(), *entry.value()))
103            .collect()
104    }
105}
106
107impl Projection for EventCounterProjection {
108    fn name(&self) -> &str {
109        &self.name
110    }
111
112    fn process(&self, event: &Event) -> Result<()> {
113        self.counts
114            .entry(event.event_type_str().to_string())
115            .and_modify(|count| *count += 1)
116            .or_insert(1);
117
118        Ok(())
119    }
120
121    fn get_state(&self, event_type: &str) -> Option<Value> {
122        self.counts
123            .get(event_type)
124            .map(|count| serde_json::json!({ "count": *count }))
125    }
126
127    fn clear(&self) {
128        self.counts.clear();
129    }
130}
131
132/// Projection manager handles multiple projections
133pub struct ProjectionManager {
134    projections: Vec<Arc<dyn Projection>>,
135    metrics: Arc<MetricsRegistry>,
136}
137
138impl ProjectionManager {
139    pub fn new() -> Self {
140        Self::with_metrics(MetricsRegistry::new())
141    }
142
143    pub fn with_metrics(metrics: Arc<MetricsRegistry>) -> Self {
144        Self {
145            projections: Vec::new(),
146            metrics,
147        }
148    }
149
150    /// Register a new projection
151    pub fn register(&mut self, projection: Arc<dyn Projection>) {
152        let name = projection.name();
153        tracing::info!("Registering projection: {}", name);
154        self.projections.push(projection);
155        self.metrics
156            .projections_total
157            .set(self.projections.len() as i64);
158    }
159
160    /// Process an event through all projections
161    pub fn process_event(&self, event: &Event) -> Result<()> {
162        let timer = self.metrics.projection_duration_seconds.start_timer();
163
164        for projection in &self.projections {
165            let name = projection.name();
166
167            match projection.process(event) {
168                Ok(_) => {
169                    self.metrics
170                        .projection_events_processed
171                        .with_label_values(&[name])
172                        .inc();
173                }
174                Err(e) => {
175                    self.metrics
176                        .projection_errors_total
177                        .with_label_values(&[name])
178                        .inc();
179                    tracing::error!(
180                        "Projection '{}' failed to process event {}: {}",
181                        name,
182                        event.id,
183                        e
184                    );
185                    // Continue processing other projections even if one fails
186                }
187            }
188        }
189
190        timer.observe_duration();
191        Ok(())
192    }
193
194    /// Get a projection by name
195    pub fn get_projection(&self, name: &str) -> Option<Arc<dyn Projection>> {
196        self.projections.iter().find(|p| p.name() == name).cloned()
197    }
198
199    /// List all projections
200    pub fn list_projections(&self) -> Vec<(String, Arc<dyn Projection>)> {
201        self.projections
202            .iter()
203            .map(|p| (p.name().to_string(), Arc::clone(p)))
204            .collect()
205    }
206
207    /// Clear all projections
208    pub fn clear_all(&self) {
209        for projection in &self.projections {
210            projection.clear();
211        }
212    }
213}
214
215impl Default for ProjectionManager {
216    fn default() -> Self {
217        Self::new()
218    }
219}
220
#[cfg(test)]
mod tests {
    use super::*;
    use uuid::Uuid;

    /// Builds an event for `entity_id` / `event_type` carrying a fixed
    /// name-and-email payload.
    fn create_test_event(entity_id: &str, event_type: &str) -> Event {
        let payload = serde_json::json!({
            "name": "Test User",
            "email": "test@example.com"
        });

        Event::reconstruct_from_strings(
            Uuid::new_v4(),
            event_type.to_string(),
            entity_id.to_string(),
            "default".to_string(),
            payload,
            chrono::Utc::now(),
            None,
            1,
        )
    }

    /// The first event for an entity is stored as that entity's snapshot.
    #[test]
    fn test_entity_snapshot_projection() {
        let projection = EntitySnapshotProjection::new("test");

        projection
            .process(&create_test_event("user-123", "user.created"))
            .unwrap();

        let state = projection.get_state("user-123").unwrap();
        assert_eq!(state["name"], "Test User");
    }

    /// Counts are kept per event type, regardless of entity.
    #[test]
    fn test_event_counter_projection() {
        let projection = EventCounterProjection::new("counter");

        let events = [
            create_test_event("user-123", "user.created"),
            create_test_event("user-456", "user.created"),
            create_test_event("user-123", "user.updated"),
        ];
        for event in &events {
            projection.process(event).unwrap();
        }

        assert_eq!(projection.get_count("user.created"), 2);
        assert_eq!(projection.get_count("user.updated"), 1);
    }

    /// An event sent through the manager reaches every registered projection.
    #[test]
    fn test_projection_manager() {
        let snapshot = Arc::new(EntitySnapshotProjection::new("snapshot"));
        let counter = Arc::new(EventCounterProjection::new("counter"));

        let mut manager = ProjectionManager::new();
        manager.register(snapshot.clone());
        manager.register(counter.clone());

        manager
            .process_event(&create_test_event("user-123", "user.created"))
            .unwrap();

        assert!(snapshot.get_state("user-123").is_some());
        assert_eq!(counter.get_count("user.created"), 1);
    }
}