allframe_core/cqrs/
projection_registry.rs

1//! Projection Registry for automatic projection lifecycle management
2//!
3//! The ProjectionRegistry eliminates projection boilerplate by providing:
4//! - Automatic event subscription
5//! - Consistency tracking
6//! - Rebuild functionality
7//! - Multi-projection coordination
8
9use std::{collections::HashMap, marker::PhantomData, sync::Arc};
10
11use tokio::sync::{mpsc, RwLock};
12
13use super::{Event, EventStore, EventStoreBackend, Projection};
14
15/// Position tracker for projection consistency
16#[derive(Debug, Clone, Copy, PartialEq, Eq)]
17pub struct ProjectionPosition {
18    /// Last event version processed by this projection
19    pub version: u64,
20    /// Timestamp of last update
21    pub updated_at: std::time::SystemTime,
22}
23
24impl ProjectionPosition {
25    /// Create a new position at version 0
26    pub fn initial() -> Self {
27        Self {
28            version: 0,
29            updated_at: std::time::SystemTime::now(),
30        }
31    }
32
33    /// Update to a new version
34    pub fn advance(&mut self, version: u64) {
35        self.version = version;
36        self.updated_at = std::time::SystemTime::now();
37    }
38}
39
40/// Projection metadata for tracking and management
41#[derive(Debug, Clone)]
42pub struct ProjectionMetadata {
43    /// Unique name of the projection
44    pub name: String,
45    /// Current position in event stream
46    pub position: ProjectionPosition,
47    /// Whether projection is currently rebuilding
48    pub rebuilding: bool,
49}
50
51/// Type-erased projection wrapper for registry storage
52trait ErasedProjection<E: Event>: Send + Sync {
53    /// Apply an event to the projection
54    fn apply_event(&mut self, event: &E);
55    /// Get projection name
56    fn name(&self) -> &str;
57    /// Get current position
58    fn position(&self) -> ProjectionPosition;
59    /// Mark as rebuilding
60    fn set_rebuilding(&mut self, rebuilding: bool);
61}
62
63/// Concrete wrapper for projections
64struct ProjectionWrapper<P: Projection> {
65    projection: P,
66    metadata: ProjectionMetadata,
67}
68
69impl<P: Projection> ErasedProjection<P::Event> for ProjectionWrapper<P> {
70    fn apply_event(&mut self, event: &P::Event) {
71        self.projection.apply(event);
72        self.metadata
73            .position
74            .advance(self.metadata.position.version + 1);
75    }
76
77    fn name(&self) -> &str {
78        &self.metadata.name
79    }
80
81    fn position(&self) -> ProjectionPosition {
82        self.metadata.position
83    }
84
85    fn set_rebuilding(&mut self, rebuilding: bool) {
86        self.metadata.rebuilding = rebuilding;
87    }
88}
89
90/// Type alias for projection storage
91type ProjectionMap<E> = HashMap<String, Box<dyn ErasedProjection<E>>>;
92
93/// Projection Registry for managing multiple projections
94pub struct ProjectionRegistry<E: Event, B: EventStoreBackend<E>> {
95    projections: Arc<RwLock<ProjectionMap<E>>>,
96    event_store: Arc<EventStore<E, B>>,
97    _phantom: PhantomData<E>,
98}
99
100impl<E: Event, B: EventStoreBackend<E>> ProjectionRegistry<E, B> {
101    /// Create a new projection registry
102    pub fn new(event_store: EventStore<E, B>) -> Self {
103        Self {
104            projections: Arc::new(RwLock::new(HashMap::new())),
105            event_store: Arc::new(event_store),
106            _phantom: PhantomData,
107        }
108    }
109
110    /// Register a projection with automatic event subscription
111    pub async fn register<P: Projection<Event = E> + 'static>(
112        &self,
113        name: impl Into<String>,
114        projection: P,
115    ) {
116        let name = name.into();
117        let wrapper = ProjectionWrapper {
118            projection,
119            metadata: ProjectionMetadata {
120                name: name.clone(),
121                position: ProjectionPosition::initial(),
122                rebuilding: false,
123            },
124        };
125
126        let mut projections = self.projections.write().await;
127        projections.insert(name, Box::new(wrapper));
128    }
129
130    /// Get a projection by name
131    pub async fn get<P: Projection<Event = E> + 'static>(
132        &self,
133        _name: &str,
134    ) -> Option<Arc<RwLock<P>>> {
135        // Note: This is a simplified version. In a real implementation,
136        // we'd need to downcast the type-erased projection back to P.
137        // For now, we'll focus on the core functionality.
138        None
139    }
140
141    /// Rebuild a specific projection from scratch
142    pub async fn rebuild(&self, name: &str) -> Result<(), String> {
143        // Mark projection as rebuilding
144        {
145            let mut projections = self.projections.write().await;
146            if let Some(projection) = projections.get_mut(name) {
147                projection.set_rebuilding(true);
148            } else {
149                return Err(format!("Projection '{}' not found", name));
150            }
151        }
152
153        // Get all events from event store
154        let events = self.event_store.get_all_events().await?;
155
156        // Apply all events to the projection
157        {
158            let mut projections = self.projections.write().await;
159            if let Some(projection) = projections.get_mut(name) {
160                for event in events {
161                    projection.apply_event(&event);
162                }
163                projection.set_rebuilding(false);
164            }
165        }
166
167        Ok(())
168    }
169
170    /// Rebuild all projections from scratch
171    pub async fn rebuild_all(&self) -> Result<(), String> {
172        let projection_names: Vec<String> = {
173            let projections = self.projections.read().await;
174            projections.keys().cloned().collect()
175        };
176
177        for name in projection_names {
178            self.rebuild(&name).await?;
179        }
180
181        Ok(())
182    }
183
184    /// Get metadata for a projection
185    pub async fn get_metadata(&self, name: &str) -> Option<ProjectionMetadata> {
186        let projections = self.projections.read().await;
187        projections.get(name).map(|p| ProjectionMetadata {
188            name: p.name().to_string(),
189            position: p.position(),
190            rebuilding: false, // Would need to track this properly
191        })
192    }
193
194    /// Get metadata for all projections
195    pub async fn get_all_metadata(&self) -> Vec<ProjectionMetadata> {
196        let projections = self.projections.read().await;
197        projections
198            .values()
199            .map(|p| ProjectionMetadata {
200                name: p.name().to_string(),
201                position: p.position(),
202                rebuilding: false,
203            })
204            .collect()
205    }
206
207    /// Subscribe projections to new events
208    pub async fn start_subscription(&self) -> Result<(), String> {
209        let (tx, mut rx) = mpsc::channel::<E>(100);
210
211        // Subscribe to event store
212        self.event_store.subscribe(tx).await;
213
214        // Spawn task to handle incoming events
215        let projections = Arc::clone(&self.projections);
216        tokio::spawn(async move {
217            while let Some(event) = rx.recv().await {
218                let mut projections = projections.write().await;
219                for projection in projections.values_mut() {
220                    projection.apply_event(&event);
221                }
222            }
223        });
224
225        Ok(())
226    }
227
228    /// Get number of registered projections
229    pub async fn count(&self) -> usize {
230        self.projections.read().await.len()
231    }
232}
233
234impl<E: Event, B: EventStoreBackend<E>> Clone for ProjectionRegistry<E, B> {
235    fn clone(&self) -> Self {
236        Self {
237            projections: Arc::clone(&self.projections),
238            event_store: Arc::clone(&self.event_store),
239            _phantom: PhantomData,
240        }
241    }
242}
243
244#[cfg(test)]
245mod tests {
246    use super::*;
247    use crate::cqrs::InMemoryBackend;
248
249    #[derive(Clone, Debug, PartialEq, serde::Serialize, serde::Deserialize)]
250    enum TestEvent {
251        Created { id: String, value: i32 },
252        Updated { id: String, value: i32 },
253    }
254
255    impl Event for TestEvent {}
256
257    struct TestProjection {
258        data: HashMap<String, i32>,
259    }
260
261    impl TestProjection {
262        fn new() -> Self {
263            Self {
264                data: HashMap::new(),
265            }
266        }
267
268        #[allow(dead_code)]
269        fn get(&self, id: &str) -> Option<i32> {
270            self.data.get(id).copied()
271        }
272    }
273
274    impl Projection for TestProjection {
275        type Event = TestEvent;
276
277        fn apply(&mut self, event: &Self::Event) {
278            match event {
279                TestEvent::Created { id, value } => {
280                    self.data.insert(id.clone(), *value);
281                }
282                TestEvent::Updated { id, value } => {
283                    self.data.insert(id.clone(), *value);
284                }
285            }
286        }
287    }
288
289    #[tokio::test]
290    async fn test_projection_registration() {
291        let store = EventStore::<TestEvent, InMemoryBackend<TestEvent>>::new();
292        let registry = ProjectionRegistry::new(store);
293
294        registry
295            .register("test-projection", TestProjection::new())
296            .await;
297
298        assert_eq!(registry.count().await, 1);
299    }
300
301    #[tokio::test]
302    async fn test_projection_rebuild() {
303        let store = EventStore::<TestEvent, InMemoryBackend<TestEvent>>::new();
304
305        // Add some events
306        store
307            .append(
308                "test-1",
309                vec![
310                    TestEvent::Created {
311                        id: "1".to_string(),
312                        value: 10,
313                    },
314                    TestEvent::Updated {
315                        id: "1".to_string(),
316                        value: 20,
317                    },
318                ],
319            )
320            .await
321            .unwrap();
322
323        let registry = ProjectionRegistry::new(store);
324        registry
325            .register("test-projection", TestProjection::new())
326            .await;
327
328        // Rebuild projection
329        registry.rebuild("test-projection").await.unwrap();
330
331        // Verify metadata
332        let metadata = registry.get_metadata("test-projection").await.unwrap();
333        assert_eq!(metadata.position.version, 2);
334    }
335
336    #[tokio::test]
337    async fn test_projection_metadata() {
338        let store = EventStore::<TestEvent, InMemoryBackend<TestEvent>>::new();
339        let registry = ProjectionRegistry::new(store);
340
341        registry.register("proj-1", TestProjection::new()).await;
342        registry.register("proj-2", TestProjection::new()).await;
343
344        let all_metadata = registry.get_all_metadata().await;
345        assert_eq!(all_metadata.len(), 2);
346
347        let metadata = registry.get_metadata("proj-1").await.unwrap();
348        assert_eq!(metadata.name, "proj-1");
349        assert_eq!(metadata.position.version, 0);
350    }
351
352    #[tokio::test]
353    async fn test_rebuild_all() {
354        let store = EventStore::<TestEvent, InMemoryBackend<TestEvent>>::new();
355
356        // Add events
357        store
358            .append(
359                "test",
360                vec![TestEvent::Created {
361                    id: "1".to_string(),
362                    value: 100,
363                }],
364            )
365            .await
366            .unwrap();
367
368        let registry = ProjectionRegistry::new(store);
369        registry.register("proj-1", TestProjection::new()).await;
370        registry.register("proj-2", TestProjection::new()).await;
371
372        // Rebuild all
373        registry.rebuild_all().await.unwrap();
374
375        // Both projections should have processed the event
376        let meta1 = registry.get_metadata("proj-1").await.unwrap();
377        let meta2 = registry.get_metadata("proj-2").await.unwrap();
378
379        assert_eq!(meta1.position.version, 1);
380        assert_eq!(meta2.position.version, 1);
381    }
382}