Skip to main content

allsource_core/application/services/
projection.rs

1use crate::{
2    domain::entities::Event, error::Result, infrastructure::observability::metrics::MetricsRegistry,
3};
4use dashmap::DashMap;
5use serde_json::Value;
6use std::sync::Arc;
7
8/// A projection aggregates events into a queryable view
9pub trait Projection: Send + Sync {
10    /// Get the name of this projection
11    fn name(&self) -> &str;
12
13    /// Process an event and update the projection state
14    fn process(&self, event: &Event) -> Result<()>;
15
16    /// Get the current state of the projection for an entity
17    fn get_state(&self, entity_id: &str) -> Option<Value>;
18
19    /// Clear all projection state
20    fn clear(&self);
21}
22
23/// Entity snapshot projection - maintains current state of each entity
24pub struct EntitySnapshotProjection {
25    name: String,
26    /// entity_id -> (latest state, last event timestamp)
27    states: Arc<DashMap<String, (Value, chrono::DateTime<chrono::Utc>)>>,
28}
29
30impl EntitySnapshotProjection {
31    pub fn new(name: impl Into<String>) -> Self {
32        Self {
33            name: name.into(),
34            states: Arc::new(DashMap::new()),
35        }
36    }
37
38    /// Get all entity states
39    pub fn get_all_states(&self) -> Vec<(String, Value)> {
40        self.states
41            .iter()
42            .map(|entry| (entry.key().clone(), entry.value().0.clone()))
43            .collect()
44    }
45}
46
47impl Projection for EntitySnapshotProjection {
48    fn name(&self) -> &str {
49        &self.name
50    }
51
52    fn process(&self, event: &Event) -> Result<()> {
53        // Timestamp-aware merge strategy: only merge if the event is at least
54        // as new as the last processed event for this entity. This ensures
55        // convergence during bidirectional sync where events may arrive out of
56        // order. In the normal (non-sync) case, events are always in order so
57        // this condition is always true.
58        self.states
59            .entry(event.entity_id_str().to_string())
60            .and_modify(|(state, last_ts)| {
61                if event.timestamp >= *last_ts {
62                    // Merge the event payload into existing state
63                    if let (Value::Object(map), Value::Object(payload_map)) =
64                        (state, &event.payload)
65                    {
66                        for (key, value) in payload_map {
67                            map.insert(key.clone(), value.clone());
68                        }
69                    }
70                    *last_ts = event.timestamp;
71                }
72                // Out-of-order event: skip merge to ensure convergence
73            })
74            .or_insert_with(|| (event.payload.clone(), event.timestamp));
75
76        Ok(())
77    }
78
79    fn get_state(&self, entity_id: &str) -> Option<Value> {
80        self.states.get(entity_id).map(|v| v.0.clone())
81    }
82
83    fn clear(&self) {
84        self.states.clear();
85    }
86}
87
88/// Event counter projection - counts events by type
89pub struct EventCounterProjection {
90    name: String,
91    /// event_type -> count
92    counts: Arc<DashMap<String, u64>>,
93}
94
95impl EventCounterProjection {
96    pub fn new(name: impl Into<String>) -> Self {
97        Self {
98            name: name.into(),
99            counts: Arc::new(DashMap::new()),
100        }
101    }
102
103    /// Get count for a specific event type
104    pub fn get_count(&self, event_type: &str) -> u64 {
105        self.counts.get(event_type).map(|v| *v).unwrap_or(0)
106    }
107
108    /// Get all event type counts
109    pub fn get_all_counts(&self) -> Vec<(String, u64)> {
110        self.counts
111            .iter()
112            .map(|entry| (entry.key().clone(), *entry.value()))
113            .collect()
114    }
115}
116
117impl Projection for EventCounterProjection {
118    fn name(&self) -> &str {
119        &self.name
120    }
121
122    fn process(&self, event: &Event) -> Result<()> {
123        self.counts
124            .entry(event.event_type_str().to_string())
125            .and_modify(|count| *count += 1)
126            .or_insert(1);
127
128        Ok(())
129    }
130
131    fn get_state(&self, event_type: &str) -> Option<Value> {
132        self.counts
133            .get(event_type)
134            .map(|count| serde_json::json!({ "count": *count }))
135    }
136
137    fn clear(&self) {
138        self.counts.clear();
139    }
140}
141
142/// Projection manager handles multiple projections
143pub struct ProjectionManager {
144    projections: Vec<Arc<dyn Projection>>,
145    metrics: Arc<MetricsRegistry>,
146}
147
148impl ProjectionManager {
149    pub fn new() -> Self {
150        Self::with_metrics(MetricsRegistry::new())
151    }
152
153    pub fn with_metrics(metrics: Arc<MetricsRegistry>) -> Self {
154        Self {
155            projections: Vec::new(),
156            metrics,
157        }
158    }
159
160    /// Register a new projection
161    pub fn register(&mut self, projection: Arc<dyn Projection>) {
162        let name = projection.name();
163        tracing::info!("Registering projection: {}", name);
164        self.projections.push(projection);
165        self.metrics
166            .projections_total
167            .set(self.projections.len() as i64);
168    }
169
170    /// Process an event through all projections
171    pub fn process_event(&self, event: &Event) -> Result<()> {
172        let timer = self.metrics.projection_duration_seconds.start_timer();
173
174        for projection in &self.projections {
175            let name = projection.name();
176
177            match projection.process(event) {
178                Ok(_) => {
179                    self.metrics
180                        .projection_events_processed
181                        .with_label_values(&[name])
182                        .inc();
183                }
184                Err(e) => {
185                    self.metrics
186                        .projection_errors_total
187                        .with_label_values(&[name])
188                        .inc();
189                    tracing::error!(
190                        "Projection '{}' failed to process event {}: {}",
191                        name,
192                        event.id,
193                        e
194                    );
195                    // Continue processing other projections even if one fails
196                }
197            }
198        }
199
200        timer.observe_duration();
201        Ok(())
202    }
203
204    /// Get a projection by name
205    pub fn get_projection(&self, name: &str) -> Option<Arc<dyn Projection>> {
206        self.projections.iter().find(|p| p.name() == name).cloned()
207    }
208
209    /// List all projections
210    pub fn list_projections(&self) -> Vec<(String, Arc<dyn Projection>)> {
211        self.projections
212            .iter()
213            .map(|p| (p.name().to_string(), Arc::clone(p)))
214            .collect()
215    }
216
217    /// Clear all projections
218    pub fn clear_all(&self) {
219        for projection in &self.projections {
220            projection.clear();
221        }
222    }
223}
224
225impl Default for ProjectionManager {
226    fn default() -> Self {
227        Self::new()
228    }
229}
230
231#[cfg(test)]
232mod tests {
233    use super::*;
234    use uuid::Uuid;
235
236    fn create_test_event(entity_id: &str, event_type: &str) -> Event {
237        Event::reconstruct_from_strings(
238            Uuid::new_v4(),
239            event_type.to_string(),
240            entity_id.to_string(),
241            "default".to_string(),
242            serde_json::json!({
243                "name": "Test User",
244                "email": "test@example.com"
245            }),
246            chrono::Utc::now(),
247            None,
248            1,
249        )
250    }
251
252    #[test]
253    fn test_entity_snapshot_projection() {
254        let projection = EntitySnapshotProjection::new("test");
255        let event = create_test_event("user-123", "user.created");
256
257        projection.process(&event).unwrap();
258
259        let state = projection.get_state("user-123").unwrap();
260        assert_eq!(state["name"], "Test User");
261    }
262
263    #[test]
264    fn test_event_counter_projection() {
265        let projection = EventCounterProjection::new("counter");
266
267        let event1 = create_test_event("user-123", "user.created");
268        let event2 = create_test_event("user-456", "user.created");
269        let event3 = create_test_event("user-123", "user.updated");
270
271        projection.process(&event1).unwrap();
272        projection.process(&event2).unwrap();
273        projection.process(&event3).unwrap();
274
275        assert_eq!(projection.get_count("user.created"), 2);
276        assert_eq!(projection.get_count("user.updated"), 1);
277    }
278
279    #[test]
280    fn test_projection_manager() {
281        let mut manager = ProjectionManager::new();
282
283        let snapshot = Arc::new(EntitySnapshotProjection::new("snapshot"));
284        let counter = Arc::new(EventCounterProjection::new("counter"));
285
286        manager.register(snapshot.clone());
287        manager.register(counter.clone());
288
289        let event = create_test_event("user-123", "user.created");
290        manager.process_event(&event).unwrap();
291
292        assert!(snapshot.get_state("user-123").is_some());
293        assert_eq!(counter.get_count("user.created"), 1);
294    }
295}