allsource_core/
projection.rs

1use crate::error::Result;
2use crate::domain::entities::Event;
3use crate::metrics::MetricsRegistry;
4use dashmap::DashMap;
5use serde_json::Value;
6use std::sync::Arc;
7
8/// A projection aggregates events into a queryable view
9pub trait Projection: Send + Sync {
10    /// Get the name of this projection
11    fn name(&self) -> &str;
12
13    /// Process an event and update the projection state
14    fn process(&self, event: &Event) -> Result<()>;
15
16    /// Get the current state of the projection for an entity
17    fn get_state(&self, entity_id: &str) -> Option<Value>;
18
19    /// Clear all projection state
20    fn clear(&self);
21}
22
23/// Entity snapshot projection - maintains current state of each entity
24pub struct EntitySnapshotProjection {
25    name: String,
26    /// entity_id -> latest state
27    states: Arc<DashMap<String, Value>>,
28}
29
30impl EntitySnapshotProjection {
31    pub fn new(name: impl Into<String>) -> Self {
32        Self {
33            name: name.into(),
34            states: Arc::new(DashMap::new()),
35        }
36    }
37
38    /// Get all entity states
39    pub fn get_all_states(&self) -> Vec<(String, Value)> {
40        self.states
41            .iter()
42            .map(|entry| (entry.key().clone(), entry.value().clone()))
43            .collect()
44    }
45}
46
47impl Projection for EntitySnapshotProjection {
48    fn name(&self) -> &str {
49        &self.name
50    }
51
52    fn process(&self, event: &Event) -> Result<()> {
53        // Simple merge strategy: update or insert
54        self.states
55            .entry(event.entity_id_str().to_string())
56            .and_modify(|state| {
57                // Merge the event payload into existing state
58                if let Value::Object(ref mut map) = state {
59                    if let Value::Object(ref payload_map) = event.payload {
60                        for (key, value) in payload_map {
61                            map.insert(key.clone(), value.clone());
62                        }
63                    }
64                }
65            })
66            .or_insert_with(|| event.payload.clone());
67
68        Ok(())
69    }
70
71    fn get_state(&self, entity_id: &str) -> Option<Value> {
72        self.states.get(entity_id).map(|v| v.clone())
73    }
74
75    fn clear(&self) {
76        self.states.clear();
77    }
78}
79
80/// Event counter projection - counts events by type
81pub struct EventCounterProjection {
82    name: String,
83    /// event_type -> count
84    counts: Arc<DashMap<String, u64>>,
85}
86
87impl EventCounterProjection {
88    pub fn new(name: impl Into<String>) -> Self {
89        Self {
90            name: name.into(),
91            counts: Arc::new(DashMap::new()),
92        }
93    }
94
95    /// Get count for a specific event type
96    pub fn get_count(&self, event_type: &str) -> u64 {
97        self.counts
98            .get(event_type)
99            .map(|v| *v)
100            .unwrap_or(0)
101    }
102
103    /// Get all event type counts
104    pub fn get_all_counts(&self) -> Vec<(String, u64)> {
105        self.counts
106            .iter()
107            .map(|entry| (entry.key().clone(), *entry.value()))
108            .collect()
109    }
110}
111
112impl Projection for EventCounterProjection {
113    fn name(&self) -> &str {
114        &self.name
115    }
116
117    fn process(&self, event: &Event) -> Result<()> {
118        self.counts
119            .entry(event.event_type_str().to_string())
120            .and_modify(|count| *count += 1)
121            .or_insert(1);
122
123        Ok(())
124    }
125
126    fn get_state(&self, event_type: &str) -> Option<Value> {
127        self.counts
128            .get(event_type)
129            .map(|count| serde_json::json!({ "count": *count }))
130    }
131
132    fn clear(&self) {
133        self.counts.clear();
134    }
135}
136
137/// Projection manager handles multiple projections
138pub struct ProjectionManager {
139    projections: Vec<Arc<dyn Projection>>,
140    metrics: Arc<MetricsRegistry>,
141}
142
143impl ProjectionManager {
144    pub fn new() -> Self {
145        Self::with_metrics(MetricsRegistry::new())
146    }
147
148    pub fn with_metrics(metrics: Arc<MetricsRegistry>) -> Self {
149        Self {
150            projections: Vec::new(),
151            metrics,
152        }
153    }
154
155    /// Register a new projection
156    pub fn register(&mut self, projection: Arc<dyn Projection>) {
157        let name = projection.name();
158        tracing::info!("Registering projection: {}", name);
159        self.projections.push(projection);
160        self.metrics.projections_total.set(self.projections.len() as i64);
161    }
162
163    /// Process an event through all projections
164    pub fn process_event(&self, event: &Event) -> Result<()> {
165        let timer = self.metrics.projection_duration_seconds.start_timer();
166
167        for projection in &self.projections {
168            let name = projection.name();
169
170            match projection.process(event) {
171                Ok(_) => {
172                    self.metrics.projection_events_processed
173                        .with_label_values(&[name])
174                        .inc();
175                }
176                Err(e) => {
177                    self.metrics.projection_errors_total
178                        .with_label_values(&[name])
179                        .inc();
180                    tracing::error!(
181                        "Projection '{}' failed to process event {}: {}",
182                        name,
183                        event.id,
184                        e
185                    );
186                    // Continue processing other projections even if one fails
187                }
188            }
189        }
190
191        timer.observe_duration();
192        Ok(())
193    }
194
195    /// Get a projection by name
196    pub fn get_projection(&self, name: &str) -> Option<Arc<dyn Projection>> {
197        self.projections.iter().find(|p| p.name() == name).cloned()
198    }
199
200    /// List all projections
201    pub fn list_projections(&self) -> Vec<(String, Arc<dyn Projection>)> {
202        self.projections
203            .iter()
204            .map(|p| (p.name().to_string(), Arc::clone(p)))
205            .collect()
206    }
207
208    /// Clear all projections
209    pub fn clear_all(&self) {
210        for projection in &self.projections {
211            projection.clear();
212        }
213    }
214}
215
216impl Default for ProjectionManager {
217    fn default() -> Self {
218        Self::new()
219    }
220}
221
#[cfg(test)]
mod tests {
    use super::*;
    use uuid::Uuid;

    /// Builds an event for `entity_id` / `event_type` with a fixed user payload.
    fn create_test_event(entity_id: &str, event_type: &str) -> Event {
        let payload = serde_json::json!({
            "name": "Test User",
            "email": "test@example.com"
        });

        Event::reconstruct_from_strings(
            Uuid::new_v4(),
            event_type.to_string(),
            entity_id.to_string(),
            "default".to_string(),
            payload,
            chrono::Utc::now(),
            None,
            1,
        )
    }

    /// A processed event's payload becomes the entity's snapshot.
    #[test]
    fn test_entity_snapshot_projection() {
        let snapshots = EntitySnapshotProjection::new("test");
        let created = create_test_event("user-123", "user.created");

        snapshots.process(&created).unwrap();

        let stored = snapshots.get_state("user-123").unwrap();
        assert_eq!(stored["name"], "Test User");
    }

    /// Counts are tracked per event type, regardless of entity.
    #[test]
    fn test_event_counter_projection() {
        let tally = EventCounterProjection::new("counter");

        let inputs = [
            ("user-123", "user.created"),
            ("user-456", "user.created"),
            ("user-123", "user.updated"),
        ];
        for (entity_id, event_type) in inputs {
            let event = create_test_event(entity_id, event_type);
            tally.process(&event).unwrap();
        }

        assert_eq!(tally.get_count("user.created"), 2);
        assert_eq!(tally.get_count("user.updated"), 1);
    }

    /// The manager forwards one event to every registered projection.
    #[test]
    fn test_projection_manager() {
        let snapshot = Arc::new(EntitySnapshotProjection::new("snapshot"));
        let counter = Arc::new(EventCounterProjection::new("counter"));

        let mut manager = ProjectionManager::new();
        manager.register(snapshot.clone());
        manager.register(counter.clone());

        let created = create_test_event("user-123", "user.created");
        manager.process_event(&created).unwrap();

        assert!(snapshot.get_state("user-123").is_some());
        assert_eq!(counter.get_count("user.created"), 1);
    }
}