Skip to main content

allsource_core/application/services/
projection.rs

1use crate::{
2    domain::entities::Event, error::Result, infrastructure::observability::metrics::MetricsRegistry,
3};
4use dashmap::DashMap;
5use serde_json::Value;
6use std::sync::Arc;
7
8/// A projection aggregates events into a queryable view
9pub trait Projection: Send + Sync {
10    /// Get the name of this projection
11    fn name(&self) -> &str;
12
13    /// Process an event and update the projection state
14    fn process(&self, event: &Event) -> Result<()>;
15
16    /// Get the current state of the projection for an entity
17    fn get_state(&self, entity_id: &str) -> Option<Value>;
18
19    /// Clear all projection state
20    fn clear(&self);
21}
22
23/// Entity snapshot projection - maintains current state of each entity
24pub struct EntitySnapshotProjection {
25    name: String,
26    /// entity_id -> (latest state, last event timestamp)
27    states: Arc<DashMap<String, (Value, chrono::DateTime<chrono::Utc>)>>,
28}
29
30impl EntitySnapshotProjection {
31    pub fn new(name: impl Into<String>) -> Self {
32        Self {
33            name: name.into(),
34            states: Arc::new(DashMap::new()),
35        }
36    }
37
38    /// Get all entity states
39    pub fn get_all_states(&self) -> Vec<(String, Value)> {
40        self.states
41            .iter()
42            .map(|entry| (entry.key().clone(), entry.value().0.clone()))
43            .collect()
44    }
45}
46
47impl Projection for EntitySnapshotProjection {
48    fn name(&self) -> &str {
49        &self.name
50    }
51
52    fn process(&self, event: &Event) -> Result<()> {
53        // Timestamp-aware merge strategy: only merge if the event is at least
54        // as new as the last processed event for this entity. This ensures
55        // convergence during bidirectional sync where events may arrive out of
56        // order. In the normal (non-sync) case, events are always in order so
57        // this condition is always true.
58        self.states
59            .entry(event.entity_id_str().to_string())
60            .and_modify(|(state, last_ts)| {
61                if event.timestamp >= *last_ts {
62                    // Merge the event payload into existing state
63                    if let (Value::Object(map), Value::Object(payload_map)) =
64                        (state, &event.payload)
65                    {
66                        for (key, value) in payload_map {
67                            map.insert(key.clone(), value.clone());
68                        }
69                    }
70                    *last_ts = event.timestamp;
71                }
72                // Out-of-order event: skip merge to ensure convergence
73            })
74            .or_insert_with(|| (event.payload.clone(), event.timestamp));
75
76        Ok(())
77    }
78
79    fn get_state(&self, entity_id: &str) -> Option<Value> {
80        self.states.get(entity_id).map(|v| v.0.clone())
81    }
82
83    fn clear(&self) {
84        self.states.clear();
85    }
86}
87
88/// Event counter projection - counts events by type
89pub struct EventCounterProjection {
90    name: String,
91    /// event_type -> count
92    counts: Arc<DashMap<String, u64>>,
93}
94
95impl EventCounterProjection {
96    pub fn new(name: impl Into<String>) -> Self {
97        Self {
98            name: name.into(),
99            counts: Arc::new(DashMap::new()),
100        }
101    }
102
103    /// Get count for a specific event type
104    pub fn get_count(&self, event_type: &str) -> u64 {
105        self.counts.get(event_type).map_or(0, |v| *v)
106    }
107
108    /// Get all event type counts
109    pub fn get_all_counts(&self) -> Vec<(String, u64)> {
110        self.counts
111            .iter()
112            .map(|entry| (entry.key().clone(), *entry.value()))
113            .collect()
114    }
115}
116
117impl Projection for EventCounterProjection {
118    fn name(&self) -> &str {
119        &self.name
120    }
121
122    fn process(&self, event: &Event) -> Result<()> {
123        self.counts
124            .entry(event.event_type_str().to_string())
125            .and_modify(|count| *count += 1)
126            .or_insert(1);
127
128        Ok(())
129    }
130
131    fn get_state(&self, event_type: &str) -> Option<Value> {
132        self.counts
133            .get(event_type)
134            .map(|count| serde_json::json!({ "count": *count }))
135    }
136
137    fn clear(&self) {
138        self.counts.clear();
139    }
140}
141
142/// Projection manager handles multiple projections
143pub struct ProjectionManager {
144    projections: Vec<Arc<dyn Projection>>,
145    metrics: Arc<MetricsRegistry>,
146}
147
148impl ProjectionManager {
149    pub fn new() -> Self {
150        Self::with_metrics(MetricsRegistry::new())
151    }
152
153    pub fn with_metrics(metrics: Arc<MetricsRegistry>) -> Self {
154        Self {
155            projections: Vec::new(),
156            metrics,
157        }
158    }
159
160    /// Register a new projection
161    pub fn register(&mut self, projection: Arc<dyn Projection>) {
162        let name = projection.name();
163        tracing::info!("Registering projection: {}", name);
164        self.projections.push(projection);
165        self.metrics
166            .projections_total
167            .set(self.projections.len() as i64);
168    }
169
170    /// Process an event through all projections
171    #[cfg_attr(feature = "hotpath", hotpath::measure)]
172    pub fn process_event(&self, event: &Event) -> Result<()> {
173        let timer = self.metrics.projection_duration_seconds.start_timer();
174
175        for projection in &self.projections {
176            let name = projection.name();
177
178            match projection.process(event) {
179                Ok(()) => {
180                    self.metrics
181                        .projection_events_processed
182                        .with_label_values(&[name])
183                        .inc();
184                }
185                Err(e) => {
186                    self.metrics
187                        .projection_errors_total
188                        .with_label_values(&[name])
189                        .inc();
190                    tracing::error!(
191                        "Projection '{}' failed to process event {}: {}",
192                        name,
193                        event.id,
194                        e
195                    );
196                    // Continue processing other projections even if one fails
197                }
198            }
199        }
200
201        timer.observe_duration();
202        Ok(())
203    }
204
205    /// Get a projection by name
206    pub fn get_projection(&self, name: &str) -> Option<Arc<dyn Projection>> {
207        self.projections.iter().find(|p| p.name() == name).cloned()
208    }
209
210    /// List all projections
211    pub fn list_projections(&self) -> Vec<(String, Arc<dyn Projection>)> {
212        self.projections
213            .iter()
214            .map(|p| (p.name().to_string(), Arc::clone(p)))
215            .collect()
216    }
217
218    /// Clear all projections
219    pub fn clear_all(&self) {
220        for projection in &self.projections {
221            projection.clear();
222        }
223    }
224}
225
226impl Default for ProjectionManager {
227    fn default() -> Self {
228        Self::new()
229    }
230}
231
#[cfg(test)]
mod tests {
    use super::*;
    use uuid::Uuid;

    /// Builds an event for the given entity and event type, carrying a fixed
    /// `name` / `email` payload, a fresh random id and the current time.
    fn create_test_event(entity_id: &str, event_type: &str) -> Event {
        let payload = serde_json::json!({
            "name": "Test User",
            "email": "test@example.com"
        });

        Event::reconstruct_from_strings(
            Uuid::new_v4(),
            event_type.to_string(),
            entity_id.to_string(),
            "default".to_string(),
            payload,
            chrono::Utc::now(),
            None,
            1,
        )
    }

    /// A processed event's payload is readable back through `get_state`.
    #[test]
    fn test_entity_snapshot_projection() {
        let projection = EntitySnapshotProjection::new("test");

        projection
            .process(&create_test_event("user-123", "user.created"))
            .unwrap();

        let state = projection.get_state("user-123").unwrap();
        assert_eq!(state["name"], "Test User");
    }

    /// Counts are kept per event type, independent of the entity.
    #[test]
    fn test_event_counter_projection() {
        let projection = EventCounterProjection::new("counter");

        let inputs = [
            ("user-123", "user.created"),
            ("user-456", "user.created"),
            ("user-123", "user.updated"),
        ];
        for (entity_id, event_type) in inputs {
            let event = create_test_event(entity_id, event_type);
            projection.process(&event).unwrap();
        }

        assert_eq!(projection.get_count("user.created"), 2);
        assert_eq!(projection.get_count("user.updated"), 1);
    }

    /// The manager forwards one event to every registered projection.
    #[test]
    fn test_projection_manager() {
        let snapshot = Arc::new(EntitySnapshotProjection::new("snapshot"));
        let counter = Arc::new(EventCounterProjection::new("counter"));

        let mut manager = ProjectionManager::new();
        // `.clone()` (not `Arc::clone(&..)`) so the concrete `Arc` coerces to
        // `Arc<dyn Projection>` at the call site.
        manager.register(snapshot.clone());
        manager.register(counter.clone());

        manager
            .process_event(&create_test_event("user-123", "user.created"))
            .unwrap();

        assert!(snapshot.get_state("user-123").is_some());
        assert_eq!(counter.get_count("user.created"), 1);
    }
}