Skip to main content

allsource_core/application/services/
projection.rs

1use crate::{
2    domain::entities::Event, error::Result, infrastructure::observability::metrics::MetricsRegistry,
3};
4use chrono::{DateTime, Utc};
5use dashmap::DashMap;
6use serde::{Deserialize, Serialize};
7use serde_json::Value;
8use std::{
9    path::PathBuf,
10    sync::{
11        Arc,
12        atomic::{AtomicU64, Ordering},
13    },
14    time::Instant,
15};
16
17/// A projection aggregates events into a queryable view
18pub trait Projection: Send + Sync {
19    /// Get the name of this projection
20    fn name(&self) -> &str;
21
22    /// Process an event and update the projection state
23    fn process(&self, event: &Event) -> Result<()>;
24
25    /// Get the current state of the projection for an entity
26    fn get_state(&self, entity_id: &str) -> Option<Value>;
27
28    /// Clear all projection state
29    fn clear(&self);
30
31    /// Snapshot the entire projection state for checkpointing.
32    /// Returns `None` if the projection does not support checkpointing.
33    fn snapshot(&self) -> Option<Value> {
34        None
35    }
36
37    /// Restore projection state from a previously saved snapshot.
38    fn restore(&self, _snapshot: &Value) -> Result<()> {
39        Ok(())
40    }
41}
42
43/// A checkpoint of a projection's state, persisted to disk.
44#[derive(Debug, Clone, Serialize, Deserialize)]
45pub struct ProjectionCheckpoint {
46    pub projection_name: String,
47    pub state: Value,
48    pub last_event_timestamp: DateTime<Utc>,
49    pub event_count: u64,
50}
51
/// Tuning knobs that decide if and how often projections are checkpointed.
#[derive(Debug, Clone)]
pub struct CheckpointConfig {
    /// Master switch; when `false` no checkpoint is restored or written
    /// automatically.
    pub enabled: bool,
    /// Write a checkpoint once a projection has processed this many events
    /// since its previous checkpoint.
    pub interval_events: u64,
    /// Write a checkpoint once this many seconds have passed since the
    /// previous checkpoint.
    pub interval_seconds: u64,
}

impl Default for CheckpointConfig {
    /// Checkpointing enabled, every 10 000 events or every 300 seconds.
    fn default() -> Self {
        CheckpointConfig {
            enabled: true,
            interval_events: 10_000,
            interval_seconds: 300,
        }
    }
}
71
72/// Per-projection tracking state for checkpointing.
73struct ProjectionState {
74    projection: Arc<dyn Projection>,
75    events_since_checkpoint: AtomicU64,
76    total_event_count: AtomicU64,
77    /// The timestamp of the checkpoint that was restored (if any).
78    /// Events at or before this timestamp are skipped during replay.
79    restored_up_to: parking_lot::Mutex<Option<DateTime<Utc>>>,
80}
81
82/// Entity snapshot projection - maintains current state of each entity
83pub struct EntitySnapshotProjection {
84    name: String,
85    /// entity_id -> (latest state, last event timestamp)
86    states: Arc<DashMap<String, (Value, DateTime<Utc>)>>,
87}
88
89impl EntitySnapshotProjection {
90    pub fn new(name: impl Into<String>) -> Self {
91        Self {
92            name: name.into(),
93            states: Arc::new(DashMap::new()),
94        }
95    }
96
97    /// Get all entity states
98    pub fn get_all_states(&self) -> Vec<(String, Value)> {
99        self.states
100            .iter()
101            .map(|entry| (entry.key().clone(), entry.value().0.clone()))
102            .collect()
103    }
104}
105
106impl Projection for EntitySnapshotProjection {
107    fn name(&self) -> &str {
108        &self.name
109    }
110
111    fn process(&self, event: &Event) -> Result<()> {
112        // Timestamp-aware merge strategy: only merge if the event is at least
113        // as new as the last processed event for this entity. This ensures
114        // convergence during bidirectional sync where events may arrive out of
115        // order. In the normal (non-sync) case, events are always in order so
116        // this condition is always true.
117        self.states
118            .entry(event.entity_id_str().to_string())
119            .and_modify(|(state, last_ts)| {
120                if event.timestamp >= *last_ts {
121                    // Merge the event payload into existing state
122                    if let (Value::Object(map), Value::Object(payload_map)) =
123                        (state, &event.payload)
124                    {
125                        for (key, value) in payload_map {
126                            map.insert(key.clone(), value.clone());
127                        }
128                    }
129                    *last_ts = event.timestamp;
130                }
131                // Out-of-order event: skip merge to ensure convergence
132            })
133            .or_insert_with(|| (event.payload.clone(), event.timestamp));
134
135        Ok(())
136    }
137
138    fn get_state(&self, entity_id: &str) -> Option<Value> {
139        self.states.get(entity_id).map(|v| v.0.clone())
140    }
141
142    fn clear(&self) {
143        self.states.clear();
144    }
145
146    fn snapshot(&self) -> Option<Value> {
147        let entries: Vec<(String, Value, DateTime<Utc>)> = self
148            .states
149            .iter()
150            .map(|entry| {
151                let (state, ts) = entry.value().clone();
152                (entry.key().clone(), state, ts)
153            })
154            .collect();
155        serde_json::to_value(entries).ok()
156    }
157
158    fn restore(&self, snapshot: &Value) -> Result<()> {
159        let entries: Vec<(String, Value, DateTime<Utc>)> = serde_json::from_value(snapshot.clone())
160            .map_err(|e| crate::error::AllSourceError::StorageError(e.to_string()))?;
161        self.states.clear();
162        for (key, state, ts) in entries {
163            self.states.insert(key, (state, ts));
164        }
165        Ok(())
166    }
167}
168
169/// Event counter projection - counts events by type
170pub struct EventCounterProjection {
171    name: String,
172    /// event_type -> count
173    counts: Arc<DashMap<String, u64>>,
174}
175
176impl EventCounterProjection {
177    pub fn new(name: impl Into<String>) -> Self {
178        Self {
179            name: name.into(),
180            counts: Arc::new(DashMap::new()),
181        }
182    }
183
184    /// Get count for a specific event type
185    pub fn get_count(&self, event_type: &str) -> u64 {
186        self.counts.get(event_type).map_or(0, |v| *v)
187    }
188
189    /// Get all event type counts
190    pub fn get_all_counts(&self) -> Vec<(String, u64)> {
191        self.counts
192            .iter()
193            .map(|entry| (entry.key().clone(), *entry.value()))
194            .collect()
195    }
196}
197
198impl Projection for EventCounterProjection {
199    fn name(&self) -> &str {
200        &self.name
201    }
202
203    fn process(&self, event: &Event) -> Result<()> {
204        self.counts
205            .entry(event.event_type_str().to_string())
206            .and_modify(|count| *count += 1)
207            .or_insert(1);
208
209        Ok(())
210    }
211
212    fn get_state(&self, event_type: &str) -> Option<Value> {
213        self.counts
214            .get(event_type)
215            .map(|count| serde_json::json!({ "count": *count }))
216    }
217
218    fn clear(&self) {
219        self.counts.clear();
220    }
221
222    fn snapshot(&self) -> Option<Value> {
223        let entries: Vec<(String, u64)> = self
224            .counts
225            .iter()
226            .map(|entry| (entry.key().clone(), *entry.value()))
227            .collect();
228        serde_json::to_value(entries).ok()
229    }
230
231    fn restore(&self, snapshot: &Value) -> Result<()> {
232        let entries: Vec<(String, u64)> = serde_json::from_value(snapshot.clone())
233            .map_err(|e| crate::error::AllSourceError::StorageError(e.to_string()))?;
234        self.counts.clear();
235        for (key, count) in entries {
236            self.counts.insert(key, count);
237        }
238        Ok(())
239    }
240}
241
242/// Projection manager handles multiple projections
243pub struct ProjectionManager {
244    states: Vec<ProjectionState>,
245    metrics: Arc<MetricsRegistry>,
246    checkpoint_config: CheckpointConfig,
247    checkpoint_dir: Option<PathBuf>,
248    last_checkpoint_time: parking_lot::Mutex<Instant>,
249}
250
251impl ProjectionManager {
252    pub fn new() -> Self {
253        Self::with_metrics(MetricsRegistry::new())
254    }
255
256    pub fn with_metrics(metrics: Arc<MetricsRegistry>) -> Self {
257        Self {
258            states: Vec::new(),
259            metrics,
260            checkpoint_config: CheckpointConfig::default(),
261            checkpoint_dir: None,
262            last_checkpoint_time: parking_lot::Mutex::new(Instant::now()),
263        }
264    }
265
266    /// Set checkpoint configuration and directory.
267    pub fn with_checkpoint_config(mut self, config: CheckpointConfig, dir: PathBuf) -> Self {
268        self.checkpoint_config = config;
269        self.checkpoint_dir = Some(dir);
270        self
271    }
272
273    /// Register a new projection. If a checkpoint file exists on disk, restore
274    /// from it so that only events newer than `last_event_timestamp` need to be
275    /// replayed.
276    pub fn register(&mut self, projection: Arc<dyn Projection>) {
277        let name = projection.name().to_string();
278        tracing::info!("Registering projection: {name}");
279
280        let state = ProjectionState {
281            projection,
282            events_since_checkpoint: AtomicU64::new(0),
283            total_event_count: AtomicU64::new(0),
284            restored_up_to: parking_lot::Mutex::new(None),
285        };
286
287        // Attempt to restore from checkpoint
288        if self.checkpoint_config.enabled
289            && let Some(path) = self.checkpoint_path(&name)
290            && path.exists()
291        {
292            self.try_restore_checkpoint(&path, state.projection.as_ref(), &state);
293        }
294
295        self.states.push(state);
296        self.metrics.projections_total.set(self.states.len() as i64);
297    }
298
299    /// Try to read a checkpoint file and restore the projection from it.
300    fn try_restore_checkpoint(
301        &self,
302        path: &std::path::Path,
303        projection: &dyn Projection,
304        state: &ProjectionState,
305    ) {
306        let name = projection.name();
307        let data = match std::fs::read_to_string(path) {
308            Ok(d) => d,
309            Err(e) => {
310                tracing::warn!("Failed to read checkpoint file for projection '{name}': {e}");
311                return;
312            }
313        };
314        let checkpoint: ProjectionCheckpoint = match serde_json::from_str(&data) {
315            Ok(c) => c,
316            Err(e) => {
317                tracing::warn!("Failed to parse checkpoint for projection '{name}': {e}");
318                return;
319            }
320        };
321        match projection.restore(&checkpoint.state) {
322            Ok(()) => {
323                state
324                    .total_event_count
325                    .store(checkpoint.event_count, Ordering::Relaxed);
326                *state.restored_up_to.lock() = Some(checkpoint.last_event_timestamp);
327                let count = checkpoint.event_count;
328                let ts = checkpoint.last_event_timestamp;
329                tracing::info!(
330                    "Restored projection '{name}' from checkpoint (event_count={count}, up_to={ts})",
331                );
332            }
333            Err(e) => {
334                tracing::warn!("Failed to restore projection '{name}' from checkpoint: {e}",);
335            }
336        }
337    }
338
339    /// Process an event through all projections.
340    /// Events at or before a projection's restored checkpoint timestamp are
341    /// skipped (they were already folded into the restored state).
342    #[cfg_attr(feature = "hotpath", hotpath::measure)]
343    pub fn process_event(&self, event: &Event) -> Result<()> {
344        let timer = self.metrics.projection_duration_seconds.start_timer();
345
346        for state in &self.states {
347            let name = state.projection.name();
348
349            // Skip events that are already covered by the restored checkpoint.
350            {
351                let restored = state.restored_up_to.lock();
352                if restored.is_some_and(|up_to| event.timestamp <= up_to) {
353                    continue;
354                }
355            }
356
357            match state.projection.process(event) {
358                Ok(()) => {
359                    self.metrics
360                        .projection_events_processed
361                        .with_label_values(&[name])
362                        .inc();
363                    state
364                        .events_since_checkpoint
365                        .fetch_add(1, Ordering::Relaxed);
366                    state.total_event_count.fetch_add(1, Ordering::Relaxed);
367                }
368                Err(e) => {
369                    self.metrics
370                        .projection_errors_total
371                        .with_label_values(&[name])
372                        .inc();
373                    tracing::error!(
374                        "Projection '{name}' failed to process event {}: {e}",
375                        event.id,
376                    );
377                    // Continue processing other projections even if one fails
378                }
379            }
380        }
381
382        // Check if we should write a checkpoint (by event count or time).
383        if self.checkpoint_config.enabled && self.checkpoint_dir.is_some() {
384            self.maybe_checkpoint();
385        }
386
387        timer.observe_duration();
388        Ok(())
389    }
390
391    /// Check whether the event or time interval threshold has been reached
392    /// and checkpoint all projections if so.
393    fn maybe_checkpoint(&self) {
394        let events_threshold = self.checkpoint_config.interval_events;
395        let time_threshold = self.checkpoint_config.interval_seconds;
396
397        let any_exceeded = self
398            .states
399            .iter()
400            .any(|s| s.events_since_checkpoint.load(Ordering::Relaxed) >= events_threshold);
401
402        let time_exceeded = {
403            let last = self.last_checkpoint_time.lock();
404            last.elapsed().as_secs() >= time_threshold
405        };
406
407        if (any_exceeded || time_exceeded) && self.checkpoint_all().is_err() {
408            tracing::error!("Failed to write projection checkpoints");
409        }
410    }
411
412    /// Checkpoint all projections that support snapshotting.
413    pub fn checkpoint_all(&self) -> Result<()> {
414        for state in &self.states {
415            self.checkpoint_one(state)?;
416        }
417        *self.last_checkpoint_time.lock() = Instant::now();
418        Ok(())
419    }
420
421    /// Checkpoint a single projection.
422    fn checkpoint_one(&self, state: &ProjectionState) -> Result<()> {
423        let name = state.projection.name();
424        let Some(snapshot) = state.projection.snapshot() else {
425            return Ok(());
426        };
427
428        let Some(path) = self.checkpoint_path(name) else {
429            return Ok(());
430        };
431
432        let checkpoint = ProjectionCheckpoint {
433            projection_name: name.to_string(),
434            state: snapshot,
435            last_event_timestamp: Utc::now(),
436            event_count: state.total_event_count.load(Ordering::Relaxed),
437        };
438
439        let json = serde_json::to_string_pretty(&checkpoint)
440            .map_err(|e| crate::error::AllSourceError::StorageError(e.to_string()))?;
441
442        if let Some(parent) = path.parent() {
443            std::fs::create_dir_all(parent)
444                .map_err(|e| crate::error::AllSourceError::StorageError(e.to_string()))?;
445        }
446
447        std::fs::write(&path, json)
448            .map_err(|e| crate::error::AllSourceError::StorageError(e.to_string()))?;
449
450        state.events_since_checkpoint.store(0, Ordering::Relaxed);
451
452        tracing::debug!("Checkpointed projection '{name}'");
453        Ok(())
454    }
455
456    /// Build the checkpoint file path for a projection.
457    fn checkpoint_path(&self, name: &str) -> Option<PathBuf> {
458        self.checkpoint_dir
459            .as_ref()
460            .map(|dir| dir.join(format!("{name}.checkpoint.json")))
461    }
462
463    /// Get a projection by name
464    pub fn get_projection(&self, name: &str) -> Option<Arc<dyn Projection>> {
465        self.states
466            .iter()
467            .find(|s| s.projection.name() == name)
468            .map(|s| Arc::clone(&s.projection))
469    }
470
471    /// List all projections
472    pub fn list_projections(&self) -> Vec<(String, Arc<dyn Projection>)> {
473        self.states
474            .iter()
475            .map(|s| (s.projection.name().to_string(), Arc::clone(&s.projection)))
476            .collect()
477    }
478
479    /// Clear all projections
480    pub fn clear_all(&self) {
481        for state in &self.states {
482            state.projection.clear();
483        }
484    }
485
486    /// Return the restored-up-to timestamp for a given projection (if any).
487    /// Useful in tests to verify that a checkpoint was restored.
488    pub fn restored_up_to(&self, name: &str) -> Option<DateTime<Utc>> {
489        self.states
490            .iter()
491            .find(|s| s.projection.name() == name)
492            .and_then(|s| *s.restored_up_to.lock())
493    }
494}
495
496impl Default for ProjectionManager {
497    fn default() -> Self {
498        Self::new()
499    }
500}
501
502#[cfg(test)]
503mod tests {
504    use super::*;
505    use uuid::Uuid;
506
507    fn create_test_event(entity_id: &str, event_type: &str) -> Event {
508        Event::reconstruct_from_strings(
509            Uuid::new_v4(),
510            event_type.to_string(),
511            entity_id.to_string(),
512            "default".to_string(),
513            serde_json::json!({
514                "name": "Test User",
515                "email": "test@example.com"
516            }),
517            chrono::Utc::now(),
518            None,
519            1,
520        )
521    }
522
523    fn create_test_event_with_timestamp(
524        entity_id: &str,
525        event_type: &str,
526        timestamp: DateTime<Utc>,
527    ) -> Event {
528        Event::reconstruct_from_strings(
529            Uuid::new_v4(),
530            event_type.to_string(),
531            entity_id.to_string(),
532            "default".to_string(),
533            serde_json::json!({
534                "name": "Test User",
535                "email": "test@example.com"
536            }),
537            timestamp,
538            None,
539            1,
540        )
541    }
542
543    #[test]
544    fn test_entity_snapshot_projection() {
545        let projection = EntitySnapshotProjection::new("test");
546        let event = create_test_event("user-123", "user.created");
547
548        projection.process(&event).unwrap();
549
550        let state = projection.get_state("user-123").unwrap();
551        assert_eq!(state["name"], "Test User");
552    }
553
554    #[test]
555    fn test_event_counter_projection() {
556        let projection = EventCounterProjection::new("counter");
557
558        let event1 = create_test_event("user-123", "user.created");
559        let event2 = create_test_event("user-456", "user.created");
560        let event3 = create_test_event("user-123", "user.updated");
561
562        projection.process(&event1).unwrap();
563        projection.process(&event2).unwrap();
564        projection.process(&event3).unwrap();
565
566        assert_eq!(projection.get_count("user.created"), 2);
567        assert_eq!(projection.get_count("user.updated"), 1);
568    }
569
570    #[test]
571    fn test_projection_manager() {
572        let mut manager = ProjectionManager::new();
573
574        let snapshot = Arc::new(EntitySnapshotProjection::new("snapshot"));
575        let counter = Arc::new(EventCounterProjection::new("counter"));
576
577        manager.register(snapshot.clone());
578        manager.register(counter.clone());
579
580        let event = create_test_event("user-123", "user.created");
581        manager.process_event(&event).unwrap();
582
583        assert!(snapshot.get_state("user-123").is_some());
584        assert_eq!(counter.get_count("user.created"), 1);
585    }
586
587    #[test]
588    fn test_entity_snapshot_snapshot_restore() {
589        let projection = EntitySnapshotProjection::new("snap");
590        let event = create_test_event("user-1", "user.created");
591        projection.process(&event).unwrap();
592
593        let snap = projection.snapshot().expect("snapshot should be Some");
594
595        let projection2 = EntitySnapshotProjection::new("snap");
596        projection2.restore(&snap).unwrap();
597
598        let state = projection2.get_state("user-1").unwrap();
599        assert_eq!(state["name"], "Test User");
600    }
601
602    #[test]
603    fn test_event_counter_snapshot_restore() {
604        let projection = EventCounterProjection::new("counter");
605        for _ in 0..5 {
606            let event = create_test_event("user-1", "user.created");
607            projection.process(&event).unwrap();
608        }
609
610        let snap = projection.snapshot().expect("snapshot should be Some");
611
612        let projection2 = EventCounterProjection::new("counter");
613        projection2.restore(&snap).unwrap();
614
615        assert_eq!(projection2.get_count("user.created"), 5);
616    }
617
618    #[test]
619    fn test_checkpoint_write_and_read() {
620        let dir = tempfile::tempdir().unwrap();
621        let checkpoint_dir = dir.path().join("projections");
622
623        let config = CheckpointConfig {
624            enabled: true,
625            interval_events: 100,
626            interval_seconds: 3600,
627        };
628
629        let mut manager =
630            ProjectionManager::new().with_checkpoint_config(config, checkpoint_dir.clone());
631
632        let counter = Arc::new(EventCounterProjection::new("counter"));
633        manager.register(counter.clone());
634
635        // Process some events
636        for i in 0..50 {
637            let event = create_test_event(&format!("user-{i}"), "user.created");
638            manager.process_event(&event).unwrap();
639        }
640
641        // Manually trigger checkpoint
642        manager.checkpoint_all().unwrap();
643
644        // Verify the file was written
645        let cp_path = checkpoint_dir.join("counter.checkpoint.json");
646        assert!(cp_path.exists());
647
648        // Read and verify the checkpoint content
649        let data = std::fs::read_to_string(&cp_path).unwrap();
650        let checkpoint: ProjectionCheckpoint = serde_json::from_str(&data).unwrap();
651        assert_eq!(checkpoint.projection_name, "counter");
652        assert_eq!(checkpoint.event_count, 50);
653    }
654
655    #[test]
656    fn test_checkpoint_restore_on_register() {
657        let dir = tempfile::tempdir().unwrap();
658        let checkpoint_dir = dir.path().join("projections");
659
660        let config = CheckpointConfig {
661            enabled: true,
662            interval_events: 100_000,
663            interval_seconds: 3600,
664        };
665
666        // Phase 1: process events and checkpoint
667        {
668            let mut manager = ProjectionManager::new()
669                .with_checkpoint_config(config.clone(), checkpoint_dir.clone());
670
671            let counter = Arc::new(EventCounterProjection::new("counter"));
672            manager.register(counter.clone());
673
674            for i in 0..100 {
675                let event = create_test_event(&format!("user-{i}"), "user.created");
676                manager.process_event(&event).unwrap();
677            }
678
679            manager.checkpoint_all().unwrap();
680
681            assert_eq!(counter.get_count("user.created"), 100);
682        }
683
684        // Phase 2: create new manager, register same projection, verify restore
685        {
686            let mut manager =
687                ProjectionManager::new().with_checkpoint_config(config, checkpoint_dir);
688
689            let counter = Arc::new(EventCounterProjection::new("counter"));
690            manager.register(counter.clone());
691
692            // The projection should have been restored from the checkpoint
693            assert_eq!(counter.get_count("user.created"), 100);
694
695            // The manager should have recorded that events up to the checkpoint
696            // timestamp can be skipped
697            assert!(manager.restored_up_to("counter").is_some());
698        }
699    }
700
701    #[test]
702    fn test_checkpoint_skips_old_events_during_replay() {
703        let dir = tempfile::tempdir().unwrap();
704        let checkpoint_dir = dir.path().join("projections");
705
706        let config = CheckpointConfig {
707            enabled: true,
708            interval_events: 100_000,
709            interval_seconds: 3600,
710        };
711
712        let checkpoint_time = Utc::now();
713
714        // Phase 1: process 1000 events and checkpoint
715        {
716            let mut manager = ProjectionManager::new()
717                .with_checkpoint_config(config.clone(), checkpoint_dir.clone());
718
719            let counter = Arc::new(EventCounterProjection::new("counter"));
720            manager.register(counter.clone());
721
722            for i in 0..1000 {
723                let event = create_test_event_with_timestamp(
724                    &format!("user-{i}"),
725                    "user.created",
726                    checkpoint_time,
727                );
728                manager.process_event(&event).unwrap();
729            }
730
731            manager.checkpoint_all().unwrap();
732            assert_eq!(counter.get_count("user.created"), 1000);
733        }
734
735        // Phase 2: simulate restart — restore from checkpoint and replay
736        {
737            let mut manager =
738                ProjectionManager::new().with_checkpoint_config(config, checkpoint_dir);
739
740            let counter = Arc::new(EventCounterProjection::new("counter"));
741            manager.register(counter.clone());
742
743            // Counter should be restored
744            assert_eq!(counter.get_count("user.created"), 1000);
745
746            // Replay the old 1000 events (should be skipped because their
747            // timestamp <= checkpoint timestamp)
748            for i in 0..1000 {
749                let event = create_test_event_with_timestamp(
750                    &format!("user-{i}"),
751                    "user.created",
752                    checkpoint_time,
753                );
754                manager.process_event(&event).unwrap();
755            }
756
757            // Count should still be 1000 — old events were skipped
758            assert_eq!(counter.get_count("user.created"), 1000);
759
760            // Now process 10 NEW events with a later timestamp
761            let later = checkpoint_time + chrono::Duration::seconds(10);
762            for i in 0..10 {
763                let event = create_test_event_with_timestamp(
764                    &format!("new-user-{i}"),
765                    "user.created",
766                    later,
767                );
768                manager.process_event(&event).unwrap();
769            }
770
771            // Now we should have 1010 total
772            assert_eq!(counter.get_count("user.created"), 1010);
773        }
774    }
775
776    #[test]
777    fn test_auto_checkpoint_by_event_count() {
778        let dir = tempfile::tempdir().unwrap();
779        let checkpoint_dir = dir.path().join("projections");
780
781        let config = CheckpointConfig {
782            enabled: true,
783            interval_events: 50, // checkpoint every 50 events
784            interval_seconds: 3600,
785        };
786
787        let mut manager =
788            ProjectionManager::new().with_checkpoint_config(config, checkpoint_dir.clone());
789
790        let counter = Arc::new(EventCounterProjection::new("counter"));
791        manager.register(counter.clone());
792
793        // Process 60 events — should trigger auto-checkpoint at 50
794        for i in 0..60 {
795            let event = create_test_event(&format!("user-{i}"), "user.created");
796            manager.process_event(&event).unwrap();
797        }
798
799        // Checkpoint file should exist
800        let cp_path = checkpoint_dir.join("counter.checkpoint.json");
801        assert!(cp_path.exists());
802    }
803
804    #[test]
805    fn test_default_noop_snapshot_restore() {
806        // A projection that does NOT implement snapshot/restore should use the
807        // default no-op implementations and not break anything.
808        struct MinimalProjection;
809        impl Projection for MinimalProjection {
810            fn name(&self) -> &'static str {
811                "minimal"
812            }
813            fn process(&self, _event: &Event) -> Result<()> {
814                Ok(())
815            }
816            fn get_state(&self, _entity_id: &str) -> Option<Value> {
817                None
818            }
819            fn clear(&self) {}
820        }
821
822        let projection = MinimalProjection;
823        assert!(projection.snapshot().is_none());
824        assert!(projection.restore(&Value::Null).is_ok());
825    }
826
827    #[test]
828    fn test_checkpoint_with_no_dir_is_noop() {
829        // When no checkpoint_dir is configured, checkpoint operations should
830        // succeed silently (no-op).
831        let mut manager = ProjectionManager::new();
832        let counter = Arc::new(EventCounterProjection::new("counter"));
833        manager.register(counter.clone());
834
835        for _ in 0..10 {
836            let event = create_test_event("user-1", "user.created");
837            manager.process_event(&event).unwrap();
838        }
839
840        // Should not error even without a checkpoint directory
841        manager.checkpoint_all().unwrap();
842    }
843}