1use std::{collections::HashMap, marker::PhantomData, sync::Arc};
10
11use tokio::sync::{mpsc, RwLock};
12
13use super::{Event, EventStore, EventStoreBackend, Projection};
14
15#[derive(Debug, Clone, Copy, PartialEq, Eq)]
17pub struct ProjectionPosition {
18 pub version: u64,
20 pub updated_at: std::time::SystemTime,
22}
23
24impl ProjectionPosition {
25 pub fn initial() -> Self {
27 Self {
28 version: 0,
29 updated_at: std::time::SystemTime::now(),
30 }
31 }
32
33 pub fn advance(&mut self, version: u64) {
35 self.version = version;
36 self.updated_at = std::time::SystemTime::now();
37 }
38}
39
40#[derive(Debug, Clone)]
42pub struct ProjectionMetadata {
43 pub name: String,
45 pub position: ProjectionPosition,
47 pub rebuilding: bool,
49}
50
51trait ErasedProjection<E: Event>: Send + Sync {
53 fn apply_event(&mut self, event: &E);
55 fn name(&self) -> &str;
57 fn position(&self) -> ProjectionPosition;
59 fn set_rebuilding(&mut self, rebuilding: bool);
61}
62
63struct ProjectionWrapper<P: Projection> {
65 projection: P,
66 metadata: ProjectionMetadata,
67}
68
69impl<P: Projection> ErasedProjection<P::Event> for ProjectionWrapper<P> {
70 fn apply_event(&mut self, event: &P::Event) {
71 self.projection.apply(event);
72 self.metadata
73 .position
74 .advance(self.metadata.position.version + 1);
75 }
76
77 fn name(&self) -> &str {
78 &self.metadata.name
79 }
80
81 fn position(&self) -> ProjectionPosition {
82 self.metadata.position
83 }
84
85 fn set_rebuilding(&mut self, rebuilding: bool) {
86 self.metadata.rebuilding = rebuilding;
87 }
88}
89
90type ProjectionMap<E> = HashMap<String, Box<dyn ErasedProjection<E>>>;
92
93pub struct ProjectionRegistry<E: Event, B: EventStoreBackend<E>> {
95 projections: Arc<RwLock<ProjectionMap<E>>>,
96 event_store: Arc<EventStore<E, B>>,
97 _phantom: PhantomData<E>,
98}
99
100impl<E: Event, B: EventStoreBackend<E>> ProjectionRegistry<E, B> {
101 pub fn new(event_store: EventStore<E, B>) -> Self {
103 Self {
104 projections: Arc::new(RwLock::new(HashMap::new())),
105 event_store: Arc::new(event_store),
106 _phantom: PhantomData,
107 }
108 }
109
110 pub async fn register<P: Projection<Event = E> + 'static>(
112 &self,
113 name: impl Into<String>,
114 projection: P,
115 ) {
116 let name = name.into();
117 let wrapper = ProjectionWrapper {
118 projection,
119 metadata: ProjectionMetadata {
120 name: name.clone(),
121 position: ProjectionPosition::initial(),
122 rebuilding: false,
123 },
124 };
125
126 let mut projections = self.projections.write().await;
127 projections.insert(name, Box::new(wrapper));
128 }
129
130 pub async fn get<P: Projection<Event = E> + 'static>(
132 &self,
133 _name: &str,
134 ) -> Option<Arc<RwLock<P>>> {
135 None
139 }
140
141 pub async fn rebuild(&self, name: &str) -> Result<(), String> {
143 {
145 let mut projections = self.projections.write().await;
146 if let Some(projection) = projections.get_mut(name) {
147 projection.set_rebuilding(true);
148 } else {
149 return Err(format!("Projection '{}' not found", name));
150 }
151 }
152
153 let events = self.event_store.get_all_events().await?;
155
156 {
158 let mut projections = self.projections.write().await;
159 if let Some(projection) = projections.get_mut(name) {
160 for event in events {
161 projection.apply_event(&event);
162 }
163 projection.set_rebuilding(false);
164 }
165 }
166
167 Ok(())
168 }
169
170 pub async fn rebuild_all(&self) -> Result<(), String> {
172 let projection_names: Vec<String> = {
173 let projections = self.projections.read().await;
174 projections.keys().cloned().collect()
175 };
176
177 for name in projection_names {
178 self.rebuild(&name).await?;
179 }
180
181 Ok(())
182 }
183
184 pub async fn get_metadata(&self, name: &str) -> Option<ProjectionMetadata> {
186 let projections = self.projections.read().await;
187 projections.get(name).map(|p| ProjectionMetadata {
188 name: p.name().to_string(),
189 position: p.position(),
190 rebuilding: false, })
192 }
193
194 pub async fn get_all_metadata(&self) -> Vec<ProjectionMetadata> {
196 let projections = self.projections.read().await;
197 projections
198 .values()
199 .map(|p| ProjectionMetadata {
200 name: p.name().to_string(),
201 position: p.position(),
202 rebuilding: false,
203 })
204 .collect()
205 }
206
207 pub async fn start_subscription(&self) -> Result<(), String> {
209 let (tx, mut rx) = mpsc::channel::<E>(100);
210
211 self.event_store.subscribe(tx).await;
213
214 let projections = Arc::clone(&self.projections);
216 tokio::spawn(async move {
217 while let Some(event) = rx.recv().await {
218 let mut projections = projections.write().await;
219 for projection in projections.values_mut() {
220 projection.apply_event(&event);
221 }
222 }
223 });
224
225 Ok(())
226 }
227
228 pub async fn count(&self) -> usize {
230 self.projections.read().await.len()
231 }
232}
233
234impl<E: Event, B: EventStoreBackend<E>> Clone for ProjectionRegistry<E, B> {
235 fn clone(&self) -> Self {
236 Self {
237 projections: Arc::clone(&self.projections),
238 event_store: Arc::clone(&self.event_store),
239 _phantom: PhantomData,
240 }
241 }
242}
243
244#[cfg(test)]
245mod tests {
246 use super::*;
247 use crate::cqrs::{EventTypeName, InMemoryBackend};
248
249 #[derive(Clone, Debug, PartialEq, serde::Serialize, serde::Deserialize)]
250 enum TestEvent {
251 Created { id: String, value: i32 },
252 Updated { id: String, value: i32 },
253 }
254
255 impl EventTypeName for TestEvent {}
256 impl Event for TestEvent {}
257
258 struct TestProjection {
259 data: HashMap<String, i32>,
260 }
261
262 impl TestProjection {
263 fn new() -> Self {
264 Self {
265 data: HashMap::new(),
266 }
267 }
268
269 #[allow(dead_code)]
270 fn get(&self, id: &str) -> Option<i32> {
271 self.data.get(id).copied()
272 }
273 }
274
275 impl Projection for TestProjection {
276 type Event = TestEvent;
277
278 fn apply(&mut self, event: &Self::Event) {
279 match event {
280 TestEvent::Created { id, value } => {
281 self.data.insert(id.clone(), *value);
282 }
283 TestEvent::Updated { id, value } => {
284 self.data.insert(id.clone(), *value);
285 }
286 }
287 }
288 }
289
290 #[tokio::test]
291 async fn test_projection_registration() {
292 let store = EventStore::<TestEvent, InMemoryBackend<TestEvent>>::new();
293 let registry = ProjectionRegistry::new(store);
294
295 registry
296 .register("test-projection", TestProjection::new())
297 .await;
298
299 assert_eq!(registry.count().await, 1);
300 }
301
302 #[tokio::test]
303 async fn test_projection_rebuild() {
304 let store = EventStore::<TestEvent, InMemoryBackend<TestEvent>>::new();
305
306 store
308 .append(
309 "test-1",
310 vec![
311 TestEvent::Created {
312 id: "1".to_string(),
313 value: 10,
314 },
315 TestEvent::Updated {
316 id: "1".to_string(),
317 value: 20,
318 },
319 ],
320 )
321 .await
322 .unwrap();
323
324 let registry = ProjectionRegistry::new(store);
325 registry
326 .register("test-projection", TestProjection::new())
327 .await;
328
329 registry.rebuild("test-projection").await.unwrap();
331
332 let metadata = registry.get_metadata("test-projection").await.unwrap();
334 assert_eq!(metadata.position.version, 2);
335 }
336
337 #[tokio::test]
338 async fn test_projection_metadata() {
339 let store = EventStore::<TestEvent, InMemoryBackend<TestEvent>>::new();
340 let registry = ProjectionRegistry::new(store);
341
342 registry.register("proj-1", TestProjection::new()).await;
343 registry.register("proj-2", TestProjection::new()).await;
344
345 let all_metadata = registry.get_all_metadata().await;
346 assert_eq!(all_metadata.len(), 2);
347
348 let metadata = registry.get_metadata("proj-1").await.unwrap();
349 assert_eq!(metadata.name, "proj-1");
350 assert_eq!(metadata.position.version, 0);
351 }
352
353 #[tokio::test]
354 async fn test_rebuild_all() {
355 let store = EventStore::<TestEvent, InMemoryBackend<TestEvent>>::new();
356
357 store
359 .append(
360 "test",
361 vec![TestEvent::Created {
362 id: "1".to_string(),
363 value: 100,
364 }],
365 )
366 .await
367 .unwrap();
368
369 let registry = ProjectionRegistry::new(store);
370 registry.register("proj-1", TestProjection::new()).await;
371 registry.register("proj-2", TestProjection::new()).await;
372
373 registry.rebuild_all().await.unwrap();
375
376 let meta1 = registry.get_metadata("proj-1").await.unwrap();
378 let meta2 = registry.get_metadata("proj-2").await.unwrap();
379
380 assert_eq!(meta1.position.version, 1);
381 assert_eq!(meta2.position.version, 1);
382 }
383}