1use std::{collections::HashMap, marker::PhantomData, sync::Arc};
10
11use tokio::sync::{mpsc, RwLock};
12
13use super::{Event, EventStore, EventStoreBackend, Projection};
14
/// Marks how far a projection has progressed and when that marker last moved.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ProjectionPosition {
    /// Version most recently recorded through [`ProjectionPosition::advance`];
    /// `0` for a freshly created position.
    pub version: u64,
    /// System time at which this position was created or last advanced.
    pub updated_at: std::time::SystemTime,
}

impl ProjectionPosition {
    /// Creates a position at version `0`, stamped with the current system time.
    pub fn initial() -> Self {
        let created_at = std::time::SystemTime::now();
        Self {
            version: 0,
            updated_at: created_at,
        }
    }

    /// Records `version` as the current version and refreshes the timestamp.
    ///
    /// The value is stored exactly as given; it is not compared against the
    /// previously stored version.
    pub fn advance(&mut self, version: u64) {
        *self = Self {
            version,
            updated_at: std::time::SystemTime::now(),
        };
    }
}
39
/// Descriptive state kept alongside a projection: its name, its progress
/// marker, and whether it is flagged as being rebuilt.
#[derive(Debug, Clone)]
pub struct ProjectionMetadata {
    /// Name the projection is identified by.
    pub name: String,
    /// Progress marker for the projection.
    pub position: ProjectionPosition,
    /// Flag marking the projection as currently being rebuilt.
    pub rebuilding: bool,
}
50
51trait ErasedProjection<E: Event>: Send + Sync {
53 fn apply_event(&mut self, event: &E);
55 fn name(&self) -> &str;
57 fn position(&self) -> ProjectionPosition;
59 fn set_rebuilding(&mut self, rebuilding: bool);
61}
62
63struct ProjectionWrapper<P: Projection> {
65 projection: P,
66 metadata: ProjectionMetadata,
67}
68
69impl<P: Projection> ErasedProjection<P::Event> for ProjectionWrapper<P> {
70 fn apply_event(&mut self, event: &P::Event) {
71 self.projection.apply(event);
72 self.metadata
73 .position
74 .advance(self.metadata.position.version + 1);
75 }
76
77 fn name(&self) -> &str {
78 &self.metadata.name
79 }
80
81 fn position(&self) -> ProjectionPosition {
82 self.metadata.position
83 }
84
85 fn set_rebuilding(&mut self, rebuilding: bool) {
86 self.metadata.rebuilding = rebuilding;
87 }
88}
89
90type ProjectionMap<E> = HashMap<String, Box<dyn ErasedProjection<E>>>;
92
93pub struct ProjectionRegistry<E: Event, B: EventStoreBackend<E>> {
95 projections: Arc<RwLock<ProjectionMap<E>>>,
96 event_store: Arc<EventStore<E, B>>,
97 _phantom: PhantomData<E>,
98}
99
100impl<E: Event, B: EventStoreBackend<E>> ProjectionRegistry<E, B> {
101 pub fn new(event_store: EventStore<E, B>) -> Self {
103 Self {
104 projections: Arc::new(RwLock::new(HashMap::new())),
105 event_store: Arc::new(event_store),
106 _phantom: PhantomData,
107 }
108 }
109
110 pub async fn register<P: Projection<Event = E> + 'static>(
112 &self,
113 name: impl Into<String>,
114 projection: P,
115 ) {
116 let name = name.into();
117 let wrapper = ProjectionWrapper {
118 projection,
119 metadata: ProjectionMetadata {
120 name: name.clone(),
121 position: ProjectionPosition::initial(),
122 rebuilding: false,
123 },
124 };
125
126 let mut projections = self.projections.write().await;
127 projections.insert(name, Box::new(wrapper));
128 }
129
130 pub async fn get<P: Projection<Event = E> + 'static>(
132 &self,
133 _name: &str,
134 ) -> Option<Arc<RwLock<P>>> {
135 None
139 }
140
141 pub async fn rebuild(&self, name: &str) -> Result<(), String> {
143 {
145 let mut projections = self.projections.write().await;
146 if let Some(projection) = projections.get_mut(name) {
147 projection.set_rebuilding(true);
148 } else {
149 return Err(format!("Projection '{}' not found", name));
150 }
151 }
152
153 let events = self.event_store.get_all_events().await?;
155
156 {
158 let mut projections = self.projections.write().await;
159 if let Some(projection) = projections.get_mut(name) {
160 for event in events {
161 projection.apply_event(&event);
162 }
163 projection.set_rebuilding(false);
164 }
165 }
166
167 Ok(())
168 }
169
170 pub async fn rebuild_all(&self) -> Result<(), String> {
172 let projection_names: Vec<String> = {
173 let projections = self.projections.read().await;
174 projections.keys().cloned().collect()
175 };
176
177 for name in projection_names {
178 self.rebuild(&name).await?;
179 }
180
181 Ok(())
182 }
183
184 pub async fn get_metadata(&self, name: &str) -> Option<ProjectionMetadata> {
186 let projections = self.projections.read().await;
187 projections.get(name).map(|p| ProjectionMetadata {
188 name: p.name().to_string(),
189 position: p.position(),
190 rebuilding: false, })
192 }
193
194 pub async fn get_all_metadata(&self) -> Vec<ProjectionMetadata> {
196 let projections = self.projections.read().await;
197 projections
198 .values()
199 .map(|p| ProjectionMetadata {
200 name: p.name().to_string(),
201 position: p.position(),
202 rebuilding: false,
203 })
204 .collect()
205 }
206
207 pub async fn start_subscription(&self) -> Result<(), String> {
209 let (tx, mut rx) = mpsc::channel::<E>(100);
210
211 self.event_store.subscribe(tx).await;
213
214 let projections = Arc::clone(&self.projections);
216 tokio::spawn(async move {
217 while let Some(event) = rx.recv().await {
218 let mut projections = projections.write().await;
219 for projection in projections.values_mut() {
220 projection.apply_event(&event);
221 }
222 }
223 });
224
225 Ok(())
226 }
227
228 pub async fn count(&self) -> usize {
230 self.projections.read().await.len()
231 }
232}
233
234impl<E: Event, B: EventStoreBackend<E>> Clone for ProjectionRegistry<E, B> {
235 fn clone(&self) -> Self {
236 Self {
237 projections: Arc::clone(&self.projections),
238 event_store: Arc::clone(&self.event_store),
239 _phantom: PhantomData,
240 }
241 }
242}
243
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cqrs::InMemoryBackend;

    /// Minimal event type used to drive the registry in these tests.
    #[derive(Clone, Debug, PartialEq, serde::Serialize, serde::Deserialize)]
    enum TestEvent {
        Created { id: String, value: i32 },
        Updated { id: String, value: i32 },
    }

    impl Event for TestEvent {}

    /// Event store flavour shared by every test in this module.
    type TestStore = EventStore<TestEvent, InMemoryBackend<TestEvent>>;

    /// Shorthand for a `Created` event.
    fn created(id: &str, value: i32) -> TestEvent {
        TestEvent::Created {
            id: id.to_string(),
            value,
        }
    }

    /// Shorthand for an `Updated` event.
    fn updated(id: &str, value: i32) -> TestEvent {
        TestEvent::Updated {
            id: id.to_string(),
            value,
        }
    }

    /// Projection that remembers the latest value seen for each id.
    struct TestProjection {
        data: HashMap<String, i32>,
    }

    impl TestProjection {
        fn new() -> Self {
            Self {
                data: HashMap::new(),
            }
        }

        #[allow(dead_code)]
        fn get(&self, id: &str) -> Option<i32> {
            self.data.get(id).copied()
        }
    }

    impl Projection for TestProjection {
        type Event = TestEvent;

        fn apply(&mut self, event: &Self::Event) {
            // Both variants simply overwrite the stored value for the id.
            match event {
                TestEvent::Created { id, value } | TestEvent::Updated { id, value } => {
                    self.data.insert(id.clone(), *value);
                }
            }
        }
    }

    #[tokio::test]
    async fn test_projection_registration() {
        let registry = ProjectionRegistry::new(TestStore::new());

        registry
            .register("test-projection", TestProjection::new())
            .await;

        assert_eq!(registry.count().await, 1);
    }

    #[tokio::test]
    async fn test_projection_rebuild() {
        let store = TestStore::new();

        // Seed the history before the registry takes ownership of the store.
        store
            .append("test-1", vec![created("1", 10), updated("1", 20)])
            .await
            .unwrap();

        let registry = ProjectionRegistry::new(store);
        registry
            .register("test-projection", TestProjection::new())
            .await;

        registry.rebuild("test-projection").await.unwrap();

        // Two events replayed, so the position advanced twice.
        let metadata = registry.get_metadata("test-projection").await.unwrap();
        assert_eq!(metadata.position.version, 2);
    }

    #[tokio::test]
    async fn test_projection_metadata() {
        let registry = ProjectionRegistry::new(TestStore::new());

        for name in ["proj-1", "proj-2"] {
            registry.register(name, TestProjection::new()).await;
        }

        let all_metadata = registry.get_all_metadata().await;
        assert_eq!(all_metadata.len(), 2);

        let metadata = registry.get_metadata("proj-1").await.unwrap();
        assert_eq!(metadata.name, "proj-1");
        assert_eq!(metadata.position.version, 0);
    }

    #[tokio::test]
    async fn test_rebuild_all() {
        let store = TestStore::new();

        store
            .append("test", vec![created("1", 100)])
            .await
            .unwrap();

        let registry = ProjectionRegistry::new(store);
        registry.register("proj-1", TestProjection::new()).await;
        registry.register("proj-2", TestProjection::new()).await;

        registry.rebuild_all().await.unwrap();

        // Each projection replays the single stored event.
        let meta1 = registry.get_metadata("proj-1").await.unwrap();
        let meta2 = registry.get_metadata("proj-2").await.unwrap();

        assert_eq!(meta1.position.version, 1);
        assert_eq!(meta2.position.version, 1);
    }
}