1use crate::{
2 domain::entities::Event, error::Result, infrastructure::observability::metrics::MetricsRegistry,
3};
4use chrono::{DateTime, Utc};
5use dashmap::DashMap;
6use serde::{Deserialize, Serialize};
7use serde_json::Value;
8use std::{
9 path::PathBuf,
10 sync::{
11 Arc,
12 atomic::{AtomicU64, Ordering},
13 },
14 time::Instant,
15};
16
17pub trait Projection: Send + Sync {
19 fn name(&self) -> &str;
21
22 fn process(&self, event: &Event) -> Result<()>;
24
25 fn get_state(&self, entity_id: &str) -> Option<Value>;
27
28 fn clear(&self);
30
31 fn snapshot(&self) -> Option<Value> {
34 None
35 }
36
37 fn restore(&self, _snapshot: &Value) -> Result<()> {
39 Ok(())
40 }
41}
42
43#[derive(Debug, Clone, Serialize, Deserialize)]
45pub struct ProjectionCheckpoint {
46 pub projection_name: String,
47 pub state: Value,
48 pub last_event_timestamp: DateTime<Utc>,
49 pub event_count: u64,
50}
51
52#[derive(Debug, Clone)]
54pub struct CheckpointConfig {
55 pub enabled: bool,
56 pub interval_events: u64,
58 pub interval_seconds: u64,
60}
61
62impl Default for CheckpointConfig {
63 fn default() -> Self {
64 Self {
65 enabled: true,
66 interval_events: 10_000,
67 interval_seconds: 300,
68 }
69 }
70}
71
72struct ProjectionState {
74 projection: Arc<dyn Projection>,
75 events_since_checkpoint: AtomicU64,
76 total_event_count: AtomicU64,
77 restored_up_to: parking_lot::Mutex<Option<DateTime<Utc>>>,
80}
81
82pub struct EntitySnapshotProjection {
84 name: String,
85 states: Arc<DashMap<String, (Value, DateTime<Utc>)>>,
87}
88
89impl EntitySnapshotProjection {
90 pub fn new(name: impl Into<String>) -> Self {
91 Self {
92 name: name.into(),
93 states: Arc::new(DashMap::new()),
94 }
95 }
96
97 pub fn get_all_states(&self) -> Vec<(String, Value)> {
99 self.states
100 .iter()
101 .map(|entry| (entry.key().clone(), entry.value().0.clone()))
102 .collect()
103 }
104}
105
106impl Projection for EntitySnapshotProjection {
107 fn name(&self) -> &str {
108 &self.name
109 }
110
111 fn process(&self, event: &Event) -> Result<()> {
112 self.states
118 .entry(event.entity_id_str().to_string())
119 .and_modify(|(state, last_ts)| {
120 if event.timestamp >= *last_ts {
121 if let (Value::Object(map), Value::Object(payload_map)) =
123 (state, &event.payload)
124 {
125 for (key, value) in payload_map {
126 map.insert(key.clone(), value.clone());
127 }
128 }
129 *last_ts = event.timestamp;
130 }
131 })
133 .or_insert_with(|| (event.payload.clone(), event.timestamp));
134
135 Ok(())
136 }
137
138 fn get_state(&self, entity_id: &str) -> Option<Value> {
139 self.states.get(entity_id).map(|v| v.0.clone())
140 }
141
142 fn clear(&self) {
143 self.states.clear();
144 }
145
146 fn snapshot(&self) -> Option<Value> {
147 let entries: Vec<(String, Value, DateTime<Utc>)> = self
148 .states
149 .iter()
150 .map(|entry| {
151 let (state, ts) = entry.value().clone();
152 (entry.key().clone(), state, ts)
153 })
154 .collect();
155 serde_json::to_value(entries).ok()
156 }
157
158 fn restore(&self, snapshot: &Value) -> Result<()> {
159 let entries: Vec<(String, Value, DateTime<Utc>)> = serde_json::from_value(snapshot.clone())
160 .map_err(|e| crate::error::AllSourceError::StorageError(e.to_string()))?;
161 self.states.clear();
162 for (key, state, ts) in entries {
163 self.states.insert(key, (state, ts));
164 }
165 Ok(())
166 }
167}
168
169pub struct EventCounterProjection {
171 name: String,
172 counts: Arc<DashMap<String, u64>>,
174}
175
176impl EventCounterProjection {
177 pub fn new(name: impl Into<String>) -> Self {
178 Self {
179 name: name.into(),
180 counts: Arc::new(DashMap::new()),
181 }
182 }
183
184 pub fn get_count(&self, event_type: &str) -> u64 {
186 self.counts.get(event_type).map_or(0, |v| *v)
187 }
188
189 pub fn get_all_counts(&self) -> Vec<(String, u64)> {
191 self.counts
192 .iter()
193 .map(|entry| (entry.key().clone(), *entry.value()))
194 .collect()
195 }
196}
197
198impl Projection for EventCounterProjection {
199 fn name(&self) -> &str {
200 &self.name
201 }
202
203 fn process(&self, event: &Event) -> Result<()> {
204 self.counts
205 .entry(event.event_type_str().to_string())
206 .and_modify(|count| *count += 1)
207 .or_insert(1);
208
209 Ok(())
210 }
211
212 fn get_state(&self, event_type: &str) -> Option<Value> {
213 self.counts
214 .get(event_type)
215 .map(|count| serde_json::json!({ "count": *count }))
216 }
217
218 fn clear(&self) {
219 self.counts.clear();
220 }
221
222 fn snapshot(&self) -> Option<Value> {
223 let entries: Vec<(String, u64)> = self
224 .counts
225 .iter()
226 .map(|entry| (entry.key().clone(), *entry.value()))
227 .collect();
228 serde_json::to_value(entries).ok()
229 }
230
231 fn restore(&self, snapshot: &Value) -> Result<()> {
232 let entries: Vec<(String, u64)> = serde_json::from_value(snapshot.clone())
233 .map_err(|e| crate::error::AllSourceError::StorageError(e.to_string()))?;
234 self.counts.clear();
235 for (key, count) in entries {
236 self.counts.insert(key, count);
237 }
238 Ok(())
239 }
240}
241
242pub struct ProjectionManager {
244 states: Vec<ProjectionState>,
245 metrics: Arc<MetricsRegistry>,
246 checkpoint_config: CheckpointConfig,
247 checkpoint_dir: Option<PathBuf>,
248 last_checkpoint_time: parking_lot::Mutex<Instant>,
249}
250
251impl ProjectionManager {
252 pub fn new() -> Self {
253 Self::with_metrics(MetricsRegistry::new())
254 }
255
256 pub fn with_metrics(metrics: Arc<MetricsRegistry>) -> Self {
257 Self {
258 states: Vec::new(),
259 metrics,
260 checkpoint_config: CheckpointConfig::default(),
261 checkpoint_dir: None,
262 last_checkpoint_time: parking_lot::Mutex::new(Instant::now()),
263 }
264 }
265
266 pub fn with_checkpoint_config(mut self, config: CheckpointConfig, dir: PathBuf) -> Self {
268 self.checkpoint_config = config;
269 self.checkpoint_dir = Some(dir);
270 self
271 }
272
273 pub fn register(&mut self, projection: Arc<dyn Projection>) {
277 let name = projection.name().to_string();
278 tracing::info!("Registering projection: {name}");
279
280 let state = ProjectionState {
281 projection,
282 events_since_checkpoint: AtomicU64::new(0),
283 total_event_count: AtomicU64::new(0),
284 restored_up_to: parking_lot::Mutex::new(None),
285 };
286
287 if self.checkpoint_config.enabled
289 && let Some(path) = self.checkpoint_path(&name)
290 && path.exists()
291 {
292 self.try_restore_checkpoint(&path, state.projection.as_ref(), &state);
293 }
294
295 self.states.push(state);
296 self.metrics.projections_total.set(self.states.len() as i64);
297 }
298
299 fn try_restore_checkpoint(
301 &self,
302 path: &std::path::Path,
303 projection: &dyn Projection,
304 state: &ProjectionState,
305 ) {
306 let name = projection.name();
307 let data = match std::fs::read_to_string(path) {
308 Ok(d) => d,
309 Err(e) => {
310 tracing::warn!("Failed to read checkpoint file for projection '{name}': {e}");
311 return;
312 }
313 };
314 let checkpoint: ProjectionCheckpoint = match serde_json::from_str(&data) {
315 Ok(c) => c,
316 Err(e) => {
317 tracing::warn!("Failed to parse checkpoint for projection '{name}': {e}");
318 return;
319 }
320 };
321 match projection.restore(&checkpoint.state) {
322 Ok(()) => {
323 state
324 .total_event_count
325 .store(checkpoint.event_count, Ordering::Relaxed);
326 *state.restored_up_to.lock() = Some(checkpoint.last_event_timestamp);
327 let count = checkpoint.event_count;
328 let ts = checkpoint.last_event_timestamp;
329 tracing::info!(
330 "Restored projection '{name}' from checkpoint (event_count={count}, up_to={ts})",
331 );
332 }
333 Err(e) => {
334 tracing::warn!("Failed to restore projection '{name}' from checkpoint: {e}",);
335 }
336 }
337 }
338
339 #[cfg_attr(feature = "hotpath", hotpath::measure)]
343 pub fn process_event(&self, event: &Event) -> Result<()> {
344 let timer = self.metrics.projection_duration_seconds.start_timer();
345
346 for state in &self.states {
347 let name = state.projection.name();
348
349 {
351 let restored = state.restored_up_to.lock();
352 if restored.is_some_and(|up_to| event.timestamp <= up_to) {
353 continue;
354 }
355 }
356
357 match state.projection.process(event) {
358 Ok(()) => {
359 self.metrics
360 .projection_events_processed
361 .with_label_values(&[name])
362 .inc();
363 state
364 .events_since_checkpoint
365 .fetch_add(1, Ordering::Relaxed);
366 state.total_event_count.fetch_add(1, Ordering::Relaxed);
367 }
368 Err(e) => {
369 self.metrics
370 .projection_errors_total
371 .with_label_values(&[name])
372 .inc();
373 tracing::error!(
374 "Projection '{name}' failed to process event {}: {e}",
375 event.id,
376 );
377 }
379 }
380 }
381
382 if self.checkpoint_config.enabled && self.checkpoint_dir.is_some() {
384 self.maybe_checkpoint();
385 }
386
387 timer.observe_duration();
388 Ok(())
389 }
390
391 fn maybe_checkpoint(&self) {
394 let events_threshold = self.checkpoint_config.interval_events;
395 let time_threshold = self.checkpoint_config.interval_seconds;
396
397 let any_exceeded = self
398 .states
399 .iter()
400 .any(|s| s.events_since_checkpoint.load(Ordering::Relaxed) >= events_threshold);
401
402 let time_exceeded = {
403 let last = self.last_checkpoint_time.lock();
404 last.elapsed().as_secs() >= time_threshold
405 };
406
407 if (any_exceeded || time_exceeded) && self.checkpoint_all().is_err() {
408 tracing::error!("Failed to write projection checkpoints");
409 }
410 }
411
412 pub fn checkpoint_all(&self) -> Result<()> {
414 for state in &self.states {
415 self.checkpoint_one(state)?;
416 }
417 *self.last_checkpoint_time.lock() = Instant::now();
418 Ok(())
419 }
420
421 fn checkpoint_one(&self, state: &ProjectionState) -> Result<()> {
423 let name = state.projection.name();
424 let Some(snapshot) = state.projection.snapshot() else {
425 return Ok(());
426 };
427
428 let Some(path) = self.checkpoint_path(name) else {
429 return Ok(());
430 };
431
432 let checkpoint = ProjectionCheckpoint {
433 projection_name: name.to_string(),
434 state: snapshot,
435 last_event_timestamp: Utc::now(),
436 event_count: state.total_event_count.load(Ordering::Relaxed),
437 };
438
439 let json = serde_json::to_string_pretty(&checkpoint)
440 .map_err(|e| crate::error::AllSourceError::StorageError(e.to_string()))?;
441
442 if let Some(parent) = path.parent() {
443 std::fs::create_dir_all(parent)
444 .map_err(|e| crate::error::AllSourceError::StorageError(e.to_string()))?;
445 }
446
447 std::fs::write(&path, json)
448 .map_err(|e| crate::error::AllSourceError::StorageError(e.to_string()))?;
449
450 state.events_since_checkpoint.store(0, Ordering::Relaxed);
451
452 tracing::debug!("Checkpointed projection '{name}'");
453 Ok(())
454 }
455
456 fn checkpoint_path(&self, name: &str) -> Option<PathBuf> {
458 self.checkpoint_dir
459 .as_ref()
460 .map(|dir| dir.join(format!("{name}.checkpoint.json")))
461 }
462
463 pub fn get_projection(&self, name: &str) -> Option<Arc<dyn Projection>> {
465 self.states
466 .iter()
467 .find(|s| s.projection.name() == name)
468 .map(|s| Arc::clone(&s.projection))
469 }
470
471 pub fn list_projections(&self) -> Vec<(String, Arc<dyn Projection>)> {
473 self.states
474 .iter()
475 .map(|s| (s.projection.name().to_string(), Arc::clone(&s.projection)))
476 .collect()
477 }
478
479 pub fn clear_all(&self) {
481 for state in &self.states {
482 state.projection.clear();
483 }
484 }
485
486 pub fn restored_up_to(&self, name: &str) -> Option<DateTime<Utc>> {
489 self.states
490 .iter()
491 .find(|s| s.projection.name() == name)
492 .and_then(|s| *s.restored_up_to.lock())
493 }
494}
495
496impl Default for ProjectionManager {
497 fn default() -> Self {
498 Self::new()
499 }
500}
501
502#[cfg(test)]
503mod tests {
504 use super::*;
505 use uuid::Uuid;
506
507 fn create_test_event(entity_id: &str, event_type: &str) -> Event {
508 Event::reconstruct_from_strings(
509 Uuid::new_v4(),
510 event_type.to_string(),
511 entity_id.to_string(),
512 "default".to_string(),
513 serde_json::json!({
514 "name": "Test User",
515 "email": "test@example.com"
516 }),
517 chrono::Utc::now(),
518 None,
519 1,
520 )
521 }
522
523 fn create_test_event_with_timestamp(
524 entity_id: &str,
525 event_type: &str,
526 timestamp: DateTime<Utc>,
527 ) -> Event {
528 Event::reconstruct_from_strings(
529 Uuid::new_v4(),
530 event_type.to_string(),
531 entity_id.to_string(),
532 "default".to_string(),
533 serde_json::json!({
534 "name": "Test User",
535 "email": "test@example.com"
536 }),
537 timestamp,
538 None,
539 1,
540 )
541 }
542
543 #[test]
544 fn test_entity_snapshot_projection() {
545 let projection = EntitySnapshotProjection::new("test");
546 let event = create_test_event("user-123", "user.created");
547
548 projection.process(&event).unwrap();
549
550 let state = projection.get_state("user-123").unwrap();
551 assert_eq!(state["name"], "Test User");
552 }
553
554 #[test]
555 fn test_event_counter_projection() {
556 let projection = EventCounterProjection::new("counter");
557
558 let event1 = create_test_event("user-123", "user.created");
559 let event2 = create_test_event("user-456", "user.created");
560 let event3 = create_test_event("user-123", "user.updated");
561
562 projection.process(&event1).unwrap();
563 projection.process(&event2).unwrap();
564 projection.process(&event3).unwrap();
565
566 assert_eq!(projection.get_count("user.created"), 2);
567 assert_eq!(projection.get_count("user.updated"), 1);
568 }
569
570 #[test]
571 fn test_projection_manager() {
572 let mut manager = ProjectionManager::new();
573
574 let snapshot = Arc::new(EntitySnapshotProjection::new("snapshot"));
575 let counter = Arc::new(EventCounterProjection::new("counter"));
576
577 manager.register(snapshot.clone());
578 manager.register(counter.clone());
579
580 let event = create_test_event("user-123", "user.created");
581 manager.process_event(&event).unwrap();
582
583 assert!(snapshot.get_state("user-123").is_some());
584 assert_eq!(counter.get_count("user.created"), 1);
585 }
586
587 #[test]
588 fn test_entity_snapshot_snapshot_restore() {
589 let projection = EntitySnapshotProjection::new("snap");
590 let event = create_test_event("user-1", "user.created");
591 projection.process(&event).unwrap();
592
593 let snap = projection.snapshot().expect("snapshot should be Some");
594
595 let projection2 = EntitySnapshotProjection::new("snap");
596 projection2.restore(&snap).unwrap();
597
598 let state = projection2.get_state("user-1").unwrap();
599 assert_eq!(state["name"], "Test User");
600 }
601
602 #[test]
603 fn test_event_counter_snapshot_restore() {
604 let projection = EventCounterProjection::new("counter");
605 for _ in 0..5 {
606 let event = create_test_event("user-1", "user.created");
607 projection.process(&event).unwrap();
608 }
609
610 let snap = projection.snapshot().expect("snapshot should be Some");
611
612 let projection2 = EventCounterProjection::new("counter");
613 projection2.restore(&snap).unwrap();
614
615 assert_eq!(projection2.get_count("user.created"), 5);
616 }
617
618 #[test]
619 fn test_checkpoint_write_and_read() {
620 let dir = tempfile::tempdir().unwrap();
621 let checkpoint_dir = dir.path().join("projections");
622
623 let config = CheckpointConfig {
624 enabled: true,
625 interval_events: 100,
626 interval_seconds: 3600,
627 };
628
629 let mut manager =
630 ProjectionManager::new().with_checkpoint_config(config, checkpoint_dir.clone());
631
632 let counter = Arc::new(EventCounterProjection::new("counter"));
633 manager.register(counter.clone());
634
635 for i in 0..50 {
637 let event = create_test_event(&format!("user-{i}"), "user.created");
638 manager.process_event(&event).unwrap();
639 }
640
641 manager.checkpoint_all().unwrap();
643
644 let cp_path = checkpoint_dir.join("counter.checkpoint.json");
646 assert!(cp_path.exists());
647
648 let data = std::fs::read_to_string(&cp_path).unwrap();
650 let checkpoint: ProjectionCheckpoint = serde_json::from_str(&data).unwrap();
651 assert_eq!(checkpoint.projection_name, "counter");
652 assert_eq!(checkpoint.event_count, 50);
653 }
654
655 #[test]
656 fn test_checkpoint_restore_on_register() {
657 let dir = tempfile::tempdir().unwrap();
658 let checkpoint_dir = dir.path().join("projections");
659
660 let config = CheckpointConfig {
661 enabled: true,
662 interval_events: 100_000,
663 interval_seconds: 3600,
664 };
665
666 {
668 let mut manager = ProjectionManager::new()
669 .with_checkpoint_config(config.clone(), checkpoint_dir.clone());
670
671 let counter = Arc::new(EventCounterProjection::new("counter"));
672 manager.register(counter.clone());
673
674 for i in 0..100 {
675 let event = create_test_event(&format!("user-{i}"), "user.created");
676 manager.process_event(&event).unwrap();
677 }
678
679 manager.checkpoint_all().unwrap();
680
681 assert_eq!(counter.get_count("user.created"), 100);
682 }
683
684 {
686 let mut manager =
687 ProjectionManager::new().with_checkpoint_config(config, checkpoint_dir);
688
689 let counter = Arc::new(EventCounterProjection::new("counter"));
690 manager.register(counter.clone());
691
692 assert_eq!(counter.get_count("user.created"), 100);
694
695 assert!(manager.restored_up_to("counter").is_some());
698 }
699 }
700
701 #[test]
702 fn test_checkpoint_skips_old_events_during_replay() {
703 let dir = tempfile::tempdir().unwrap();
704 let checkpoint_dir = dir.path().join("projections");
705
706 let config = CheckpointConfig {
707 enabled: true,
708 interval_events: 100_000,
709 interval_seconds: 3600,
710 };
711
712 let checkpoint_time = Utc::now();
713
714 {
716 let mut manager = ProjectionManager::new()
717 .with_checkpoint_config(config.clone(), checkpoint_dir.clone());
718
719 let counter = Arc::new(EventCounterProjection::new("counter"));
720 manager.register(counter.clone());
721
722 for i in 0..1000 {
723 let event = create_test_event_with_timestamp(
724 &format!("user-{i}"),
725 "user.created",
726 checkpoint_time,
727 );
728 manager.process_event(&event).unwrap();
729 }
730
731 manager.checkpoint_all().unwrap();
732 assert_eq!(counter.get_count("user.created"), 1000);
733 }
734
735 {
737 let mut manager =
738 ProjectionManager::new().with_checkpoint_config(config, checkpoint_dir);
739
740 let counter = Arc::new(EventCounterProjection::new("counter"));
741 manager.register(counter.clone());
742
743 assert_eq!(counter.get_count("user.created"), 1000);
745
746 for i in 0..1000 {
749 let event = create_test_event_with_timestamp(
750 &format!("user-{i}"),
751 "user.created",
752 checkpoint_time,
753 );
754 manager.process_event(&event).unwrap();
755 }
756
757 assert_eq!(counter.get_count("user.created"), 1000);
759
760 let later = checkpoint_time + chrono::Duration::seconds(10);
762 for i in 0..10 {
763 let event = create_test_event_with_timestamp(
764 &format!("new-user-{i}"),
765 "user.created",
766 later,
767 );
768 manager.process_event(&event).unwrap();
769 }
770
771 assert_eq!(counter.get_count("user.created"), 1010);
773 }
774 }
775
776 #[test]
777 fn test_auto_checkpoint_by_event_count() {
778 let dir = tempfile::tempdir().unwrap();
779 let checkpoint_dir = dir.path().join("projections");
780
781 let config = CheckpointConfig {
782 enabled: true,
783 interval_events: 50, interval_seconds: 3600,
785 };
786
787 let mut manager =
788 ProjectionManager::new().with_checkpoint_config(config, checkpoint_dir.clone());
789
790 let counter = Arc::new(EventCounterProjection::new("counter"));
791 manager.register(counter.clone());
792
793 for i in 0..60 {
795 let event = create_test_event(&format!("user-{i}"), "user.created");
796 manager.process_event(&event).unwrap();
797 }
798
799 let cp_path = checkpoint_dir.join("counter.checkpoint.json");
801 assert!(cp_path.exists());
802 }
803
804 #[test]
805 fn test_default_noop_snapshot_restore() {
806 struct MinimalProjection;
809 impl Projection for MinimalProjection {
810 fn name(&self) -> &'static str {
811 "minimal"
812 }
813 fn process(&self, _event: &Event) -> Result<()> {
814 Ok(())
815 }
816 fn get_state(&self, _entity_id: &str) -> Option<Value> {
817 None
818 }
819 fn clear(&self) {}
820 }
821
822 let projection = MinimalProjection;
823 assert!(projection.snapshot().is_none());
824 assert!(projection.restore(&Value::Null).is_ok());
825 }
826
827 #[test]
828 fn test_checkpoint_with_no_dir_is_noop() {
829 let mut manager = ProjectionManager::new();
832 let counter = Arc::new(EventCounterProjection::new("counter"));
833 manager.register(counter.clone());
834
835 for _ in 0..10 {
836 let event = create_test_event("user-1", "user.created");
837 manager.process_event(&event).unwrap();
838 }
839
840 manager.checkpoint_all().unwrap();
842 }
843}