use crate::{
domain::entities::Event, error::Result, infrastructure::observability::metrics::MetricsRegistry,
};
use chrono::{DateTime, Utc};
use dashmap::DashMap;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::{
path::PathBuf,
sync::{
Arc,
atomic::{AtomicU64, Ordering},
},
time::Instant,
};
/// A read model that is built up by feeding it events one at a time.
///
/// Implementations must be `Send + Sync` and every method takes `&self`, so
/// any mutable state has to live behind interior mutability.
pub trait Projection: Send + Sync {
/// Name identifying this projection; the manager uses it as a metrics label
/// and as the stem of the checkpoint file name.
fn name(&self) -> &str;
/// Applies a single event to the projection's state.
fn process(&self, event: &Event) -> Result<()>;
/// Returns the state stored under `entity_id`, or `None` if there is none.
/// What the key means is up to the implementation.
fn get_state(&self, entity_id: &str) -> Option<Value>;
/// Discards all accumulated state.
fn clear(&self);
/// Serializes the full projection state for checkpointing.
///
/// The default returns `None`; the manager writes no checkpoint file for a
/// projection whose snapshot is `None`.
fn snapshot(&self) -> Option<Value> {
None
}
/// Replaces the projection state with a value previously produced by
/// [`Projection::snapshot`].
///
/// The default ignores the snapshot and reports success.
fn restore(&self, _snapshot: &Value) -> Result<()> {
Ok(())
}
}
/// Serialized form of one projection's checkpoint, stored as JSON on disk.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProjectionCheckpoint {
/// Name of the projection the checkpoint belongs to.
pub projection_name: String,
/// Value returned by the projection's `snapshot()` when the checkpoint was taken.
pub state: Value,
/// Cut-off used on restore: events whose timestamp is at or before this
/// instant are skipped by `ProjectionManager::process_event`.
/// As written by `checkpoint_one` this is the wall-clock time of the write.
pub last_event_timestamp: DateTime<Utc>,
/// Total number of events the projection had processed successfully.
pub event_count: u64,
}
/// Controls whether and how often projection checkpoints are written.
#[derive(Debug, Clone)]
pub struct CheckpointConfig {
    /// Master switch; when `false` no checkpoints are written or restored.
    pub enabled: bool,
    /// Checkpoint once any projection has processed this many events since
    /// its last checkpoint.
    pub interval_events: u64,
    /// Checkpoint once this many seconds have passed since the last
    /// successful checkpoint pass.
    pub interval_seconds: u64,
}

impl Default for CheckpointConfig {
    /// Checkpointing enabled, every 10 000 events or every five minutes.
    fn default() -> Self {
        let enabled = true;
        let interval_events = 10_000;
        let interval_seconds = 300;
        Self {
            enabled,
            interval_events,
            interval_seconds,
        }
    }
}
/// Per-projection bookkeeping kept by `ProjectionManager`.
struct ProjectionState {
/// The registered projection itself.
projection: Arc<dyn Projection>,
/// Events processed successfully since the last checkpoint write; reset to
/// zero by `checkpoint_one`.
events_since_checkpoint: AtomicU64,
/// Running total of successfully processed events; seeded from the
/// checkpoint's `event_count` when a checkpoint is restored.
total_event_count: AtomicU64,
/// Set after a successful restore; `process_event` skips events whose
/// timestamp is at or before this instant.
restored_up_to: parking_lot::Mutex<Option<DateTime<Utc>>>,
}
/// Projection that keeps, per entity id, a merged view of event payloads
/// together with the timestamp of the last event applied to it.
pub struct EntitySnapshotProjection {
/// Name reported through `Projection::name`.
name: String,
/// Entity id -> (current state, timestamp of the last applied event).
states: Arc<DashMap<String, (Value, DateTime<Utc>)>>,
}
impl EntitySnapshotProjection {
    /// Creates an empty projection that reports `name` as its name.
    pub fn new(name: impl Into<String>) -> Self {
        let states = Arc::new(DashMap::new());
        Self {
            name: name.into(),
            states,
        }
    }

    /// Returns a cloned `(entity_id, state)` pair for every tracked entity.
    ///
    /// The per-entity timestamps are not included.
    pub fn get_all_states(&self) -> Vec<(String, Value)> {
        let mut all = Vec::new();
        for entry in self.states.iter() {
            let (state, _last_ts) = entry.value();
            all.push((entry.key().clone(), state.clone()));
        }
        all
    }
}
impl Projection for EntitySnapshotProjection {
    fn name(&self) -> &str {
        self.name.as_str()
    }

    /// Folds the event payload into the state of the event's entity.
    ///
    /// The first event seen for an entity stores its payload verbatim. Later
    /// events are applied only when they are not older than the last applied
    /// one; when both the stored state and the payload are JSON objects the
    /// payload's keys overwrite the stored keys.
    fn process(&self, event: &Event) -> Result<()> {
        let entity_key = event.entity_id_str().to_string();
        self.states
            .entry(entity_key)
            .and_modify(|slot| {
                // Out-of-order (older) events never touch newer state.
                if event.timestamp < slot.1 {
                    return;
                }
                if let (Value::Object(current), Value::Object(incoming)) =
                    (&mut slot.0, &event.payload)
                {
                    current.extend(
                        incoming
                            .iter()
                            .map(|(key, value)| (key.clone(), value.clone())),
                    );
                }
                // The timestamp advances even when nothing could be merged.
                slot.1 = event.timestamp;
            })
            .or_insert_with(|| (event.payload.clone(), event.timestamp));
        Ok(())
    }

    /// Returns a clone of the state stored for `entity_id`, if any.
    fn get_state(&self, entity_id: &str) -> Option<Value> {
        let entry = self.states.get(entity_id)?;
        Some(entry.0.clone())
    }

    fn clear(&self) {
        self.states.clear();
    }

    /// Serializes every `(entity_id, state, timestamp)` triple as a JSON array.
    ///
    /// Returns `None` if serialization fails.
    fn snapshot(&self) -> Option<Value> {
        let mut entries: Vec<(String, Value, DateTime<Utc>)> = Vec::new();
        for entry in self.states.iter() {
            let (state, ts) = entry.value();
            entries.push((entry.key().clone(), state.clone(), *ts));
        }
        serde_json::to_value(entries).ok()
    }

    /// Replaces the current contents with the triples stored in `snapshot`.
    ///
    /// The snapshot is decoded before anything is cleared, so a malformed
    /// snapshot leaves the existing state untouched and yields a
    /// `StorageError`.
    fn restore(&self, snapshot: &Value) -> Result<()> {
        let entries =
            serde_json::from_value::<Vec<(String, Value, DateTime<Utc>)>>(snapshot.clone())
                .map_err(|e| crate::error::AllSourceError::StorageError(e.to_string()))?;
        self.states.clear();
        entries.into_iter().for_each(|(key, state, ts)| {
            self.states.insert(key, (state, ts));
        });
        Ok(())
    }
}
/// Projection that counts how many events of each event type were processed.
pub struct EventCounterProjection {
/// Name reported through `Projection::name`.
name: String,
/// Event type -> number of events processed with that type.
counts: Arc<DashMap<String, u64>>,
}
impl EventCounterProjection {
    /// Creates a counter projection with no counts recorded yet.
    pub fn new(name: impl Into<String>) -> Self {
        let counts = Arc::new(DashMap::new());
        Self {
            name: name.into(),
            counts,
        }
    }

    /// Number of events seen for `event_type`; zero when the type is unknown.
    pub fn get_count(&self, event_type: &str) -> u64 {
        match self.counts.get(event_type) {
            Some(count) => *count,
            None => 0,
        }
    }

    /// Returns every `(event_type, count)` pair currently tracked.
    pub fn get_all_counts(&self) -> Vec<(String, u64)> {
        let mut all = Vec::new();
        for entry in self.counts.iter() {
            all.push((entry.key().clone(), *entry.value()));
        }
        all
    }
}
impl Projection for EventCounterProjection {
    fn name(&self) -> &str {
        self.name.as_str()
    }

    /// Increments the counter of the event's type, creating it on first use.
    fn process(&self, event: &Event) -> Result<()> {
        let type_key = event.event_type_str().to_string();
        *self.counts.entry(type_key).or_insert(0) += 1;
        Ok(())
    }

    /// Returns `{"count": n}` for a known event type, `None` otherwise.
    ///
    /// The lookup key here is an event type, not an entity id.
    fn get_state(&self, event_type: &str) -> Option<Value> {
        let count = *self.counts.get(event_type)?;
        Some(serde_json::json!({ "count": count }))
    }

    fn clear(&self) {
        self.counts.clear();
    }

    /// Serializes all `(event_type, count)` pairs as a JSON array.
    ///
    /// Returns `None` if serialization fails.
    fn snapshot(&self) -> Option<Value> {
        let mut entries: Vec<(String, u64)> = Vec::new();
        for entry in self.counts.iter() {
            entries.push((entry.key().clone(), *entry.value()));
        }
        serde_json::to_value(entries).ok()
    }

    /// Replaces all counters with the pairs stored in `snapshot`.
    ///
    /// Decoding happens before the existing counters are cleared; a malformed
    /// snapshot yields a `StorageError` and changes nothing.
    fn restore(&self, snapshot: &Value) -> Result<()> {
        let entries = serde_json::from_value::<Vec<(String, u64)>>(snapshot.clone())
            .map_err(|e| crate::error::AllSourceError::StorageError(e.to_string()))?;
        self.counts.clear();
        entries.into_iter().for_each(|(key, count)| {
            self.counts.insert(key, count);
        });
        Ok(())
    }
}
/// Owns the registered projections, fans events out to them and writes /
/// restores their checkpoints.
pub struct ProjectionManager {
/// Registered projections with their bookkeeping, in registration order.
states: Vec<ProjectionState>,
/// Metrics sink for projection counts, timings and errors.
metrics: Arc<MetricsRegistry>,
/// When and whether checkpoints are written.
checkpoint_config: CheckpointConfig,
/// Directory holding the checkpoint files; `None` means no files are
/// written or read.
checkpoint_dir: Option<PathBuf>,
/// Time of the last successful `checkpoint_all` (or of construction).
last_checkpoint_time: parking_lot::Mutex<Instant>,
}
impl ProjectionManager {
/// Creates a manager with no projections, backed by a newly created
/// `MetricsRegistry`.
pub fn new() -> Self {
    let metrics = MetricsRegistry::new();
    Self::with_metrics(metrics)
}
/// Creates a manager with no projections that reports to `metrics`.
///
/// Checkpoint settings start at `CheckpointConfig::default()` with no
/// checkpoint directory, and the checkpoint clock starts now.
pub fn with_metrics(metrics: Arc<MetricsRegistry>) -> Self {
    let last_checkpoint_time = parking_lot::Mutex::new(Instant::now());
    let checkpoint_config = CheckpointConfig::default();
    Self {
        states: Vec::new(),
        metrics,
        checkpoint_config,
        checkpoint_dir: None,
        last_checkpoint_time,
    }
}
pub fn with_checkpoint_config(mut self, config: CheckpointConfig, dir: PathBuf) -> Self {
self.checkpoint_config = config;
self.checkpoint_dir = Some(dir);
self
}
/// Adds `projection` to the manager.
///
/// When checkpointing is enabled and a checkpoint file for the projection's
/// name exists, the projection is restored from it before being added.
/// The `projections_total` gauge is updated afterwards.
pub fn register(&mut self, projection: Arc<dyn Projection>) {
    let name = projection.name().to_string();
    tracing::info!("Registering projection: {name}");

    let state = ProjectionState {
        projection,
        events_since_checkpoint: AtomicU64::new(0),
        total_event_count: AtomicU64::new(0),
        restored_up_to: parking_lot::Mutex::new(None),
    };

    // Only look at the filesystem when checkpointing is switched on.
    let existing_checkpoint = if self.checkpoint_config.enabled {
        self.checkpoint_path(&name).filter(|path| path.exists())
    } else {
        None
    };
    if let Some(path) = existing_checkpoint {
        self.try_restore_checkpoint(&path, state.projection.as_ref(), &state);
    }

    self.states.push(state);
    let registered = self.states.len() as i64;
    self.metrics.projections_total.set(registered);
}
/// Best-effort restore of `projection` from the checkpoint file at `path`.
///
/// Any failure (unreadable file, invalid JSON, projection rejecting the
/// snapshot) is logged as a warning and otherwise ignored. On success the
/// total event count and the replay cut-off in `state` are taken from the
/// checkpoint.
fn try_restore_checkpoint(
    &self,
    path: &std::path::Path,
    projection: &dyn Projection,
    state: &ProjectionState,
) {
    let name = projection.name();

    let Ok(data) = std::fs::read_to_string(path).inspect_err(|e| {
        tracing::warn!("Failed to read checkpoint file for projection '{name}': {e}");
    }) else {
        return;
    };

    let Ok(checkpoint) = serde_json::from_str::<ProjectionCheckpoint>(&data).inspect_err(|e| {
        tracing::warn!("Failed to parse checkpoint for projection '{name}': {e}");
    }) else {
        return;
    };

    if let Err(e) = projection.restore(&checkpoint.state) {
        tracing::warn!("Failed to restore projection '{name}' from checkpoint: {e}",);
        return;
    }

    let count = checkpoint.event_count;
    let ts = checkpoint.last_event_timestamp;
    state.total_event_count.store(count, Ordering::Relaxed);
    *state.restored_up_to.lock() = Some(ts);
    tracing::info!(
        "Restored projection '{name}' from checkpoint (event_count={count}, up_to={ts})",
    );
}
/// Feeds `event` to every registered projection.
///
/// A projection restored from a checkpoint skips events whose timestamp is
/// at or before its restore cut-off. A projection that fails is logged and
/// counted in the error metric; the failure is not propagated, so this
/// always returns `Ok(())`. After the fan-out a checkpoint may be written
/// if checkpointing is enabled and a directory is configured.
#[cfg_attr(feature = "hotpath", hotpath::measure)]
pub fn process_event(&self, event: &Event) -> Result<()> {
    let timer = self.metrics.projection_duration_seconds.start_timer();

    for state in &self.states {
        let name = state.projection.name();

        // Copy the cut-off out so the lock is released before processing.
        let restored_up_to = *state.restored_up_to.lock();
        if matches!(restored_up_to, Some(up_to) if event.timestamp <= up_to) {
            continue;
        }

        if let Err(e) = state.projection.process(event) {
            self.metrics
                .projection_errors_total
                .with_label_values(&[name])
                .inc();
            tracing::error!(
                "Projection '{name}' failed to process event {}: {e}",
                event.id,
            );
            continue;
        }

        self.metrics
            .projection_events_processed
            .with_label_values(&[name])
            .inc();
        state
            .events_since_checkpoint
            .fetch_add(1, Ordering::Relaxed);
        state.total_event_count.fetch_add(1, Ordering::Relaxed);
    }

    let checkpointing_active =
        self.checkpoint_config.enabled && self.checkpoint_dir.is_some();
    if checkpointing_active {
        self.maybe_checkpoint();
    }

    timer.observe_duration();
    Ok(())
}
fn maybe_checkpoint(&self) {
let events_threshold = self.checkpoint_config.interval_events;
let time_threshold = self.checkpoint_config.interval_seconds;
let any_exceeded = self
.states
.iter()
.any(|s| s.events_since_checkpoint.load(Ordering::Relaxed) >= events_threshold);
let time_exceeded = {
let last = self.last_checkpoint_time.lock();
last.elapsed().as_secs() >= time_threshold
};
if (any_exceeded || time_exceeded) && self.checkpoint_all().is_err() {
tracing::error!("Failed to write projection checkpoints");
}
}
/// Writes a checkpoint for every registered projection, in registration order.
///
/// # Errors
///
/// Stops at and returns the first error from a projection's checkpoint
/// write. The last-checkpoint time is only updated when all writes succeed.
pub fn checkpoint_all(&self) -> Result<()> {
    self.states
        .iter()
        .try_for_each(|state| self.checkpoint_one(state))?;
    *self.last_checkpoint_time.lock() = Instant::now();
    Ok(())
}
fn checkpoint_one(&self, state: &ProjectionState) -> Result<()> {
let name = state.projection.name();
let Some(snapshot) = state.projection.snapshot() else {
return Ok(());
};
let Some(path) = self.checkpoint_path(name) else {
return Ok(());
};
let checkpoint = ProjectionCheckpoint {
projection_name: name.to_string(),
state: snapshot,
last_event_timestamp: Utc::now(),
event_count: state.total_event_count.load(Ordering::Relaxed),
};
let json = serde_json::to_string_pretty(&checkpoint)
.map_err(|e| crate::error::AllSourceError::StorageError(e.to_string()))?;
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent)
.map_err(|e| crate::error::AllSourceError::StorageError(e.to_string()))?;
}
std::fs::write(&path, json)
.map_err(|e| crate::error::AllSourceError::StorageError(e.to_string()))?;
state.events_since_checkpoint.store(0, Ordering::Relaxed);
tracing::debug!("Checkpointed projection '{name}'");
Ok(())
}
/// Path of the checkpoint file for projection `name`
/// (`<checkpoint_dir>/<name>.checkpoint.json`), or `None` when no
/// checkpoint directory is configured.
fn checkpoint_path(&self, name: &str) -> Option<PathBuf> {
    let dir = self.checkpoint_dir.as_ref()?;
    let file_name = format!("{name}.checkpoint.json");
    Some(dir.join(file_name))
}
/// Returns a shared handle to the first registered projection called `name`.
pub fn get_projection(&self, name: &str) -> Option<Arc<dyn Projection>> {
    for state in &self.states {
        if state.projection.name() == name {
            return Some(Arc::clone(&state.projection));
        }
    }
    None
}
/// Returns `(name, handle)` for every registered projection, in
/// registration order.
pub fn list_projections(&self) -> Vec<(String, Arc<dyn Projection>)> {
    let mut listed = Vec::with_capacity(self.states.len());
    for state in &self.states {
        let name = state.projection.name().to_string();
        listed.push((name, Arc::clone(&state.projection)));
    }
    listed
}
/// Clears the state of every registered projection.
///
/// Only the projections are cleared; the manager's event counters and
/// restore cut-offs are left as they are.
pub fn clear_all(&self) {
    self.states
        .iter()
        .for_each(|state| state.projection.clear());
}
/// Returns the restore cut-off of the projection called `name`.
///
/// `None` when no such projection is registered or when it was not restored
/// from a checkpoint.
pub fn restored_up_to(&self, name: &str) -> Option<DateTime<Utc>> {
    let state = self
        .states
        .iter()
        .find(|candidate| candidate.projection.name() == name)?;
    *state.restored_up_to.lock()
}
}
/// `Default` is the same as [`ProjectionManager::new`].
impl Default for ProjectionManager {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
use uuid::Uuid;
fn create_test_event(entity_id: &str, event_type: &str) -> Event {
Event::reconstruct_from_strings(
Uuid::new_v4(),
event_type.to_string(),
entity_id.to_string(),
"default".to_string(),
serde_json::json!({
"name": "Test User",
"email": "test@example.com"
}),
chrono::Utc::now(),
None,
1,
)
}
// Builds a test event with a random id, a fixed name/email payload and the
// given `timestamp`, so tests can control how events order against a
// checkpoint cut-off.
fn create_test_event_with_timestamp(
entity_id: &str,
event_type: &str,
timestamp: DateTime<Utc>,
) -> Event {
Event::reconstruct_from_strings(
Uuid::new_v4(),
event_type.to_string(),
entity_id.to_string(),
"default".to_string(),
serde_json::json!({
"name": "Test User",
"email": "test@example.com"
}),
timestamp,
None,
1,
)
}
// The first event for an entity stores its payload as the entity state.
#[test]
fn test_entity_snapshot_projection() {
let projection = EntitySnapshotProjection::new("test");
let event = create_test_event("user-123", "user.created");
projection.process(&event).unwrap();
let state = projection.get_state("user-123").unwrap();
assert_eq!(state["name"], "Test User");
}
// Counts are kept per event type, independent of the entity id.
#[test]
fn test_event_counter_projection() {
let projection = EventCounterProjection::new("counter");
let event1 = create_test_event("user-123", "user.created");
let event2 = create_test_event("user-456", "user.created");
let event3 = create_test_event("user-123", "user.updated");
projection.process(&event1).unwrap();
projection.process(&event2).unwrap();
projection.process(&event3).unwrap();
assert_eq!(projection.get_count("user.created"), 2);
assert_eq!(projection.get_count("user.updated"), 1);
}
// A single event sent through the manager reaches every registered projection.
#[test]
fn test_projection_manager() {
let mut manager = ProjectionManager::new();
let snapshot = Arc::new(EntitySnapshotProjection::new("snapshot"));
let counter = Arc::new(EventCounterProjection::new("counter"));
manager.register(snapshot.clone());
manager.register(counter.clone());
let event = create_test_event("user-123", "user.created");
manager.process_event(&event).unwrap();
assert!(snapshot.get_state("user-123").is_some());
assert_eq!(counter.get_count("user.created"), 1);
}
// snapshot() followed by restore() on a fresh projection reproduces the state.
#[test]
fn test_entity_snapshot_snapshot_restore() {
let projection = EntitySnapshotProjection::new("snap");
let event = create_test_event("user-1", "user.created");
projection.process(&event).unwrap();
let snap = projection.snapshot().expect("snapshot should be Some");
let projection2 = EntitySnapshotProjection::new("snap");
projection2.restore(&snap).unwrap();
let state = projection2.get_state("user-1").unwrap();
assert_eq!(state["name"], "Test User");
}
// Counter values survive a snapshot/restore round trip.
#[test]
fn test_event_counter_snapshot_restore() {
let projection = EventCounterProjection::new("counter");
for _ in 0..5 {
let event = create_test_event("user-1", "user.created");
projection.process(&event).unwrap();
}
let snap = projection.snapshot().expect("snapshot should be Some");
let projection2 = EventCounterProjection::new("counter");
projection2.restore(&snap).unwrap();
assert_eq!(projection2.get_count("user.created"), 5);
}
// An explicit checkpoint_all() writes `<name>.checkpoint.json` containing the
// projection name and the number of processed events.
#[test]
fn test_checkpoint_write_and_read() {
let dir = tempfile::tempdir().unwrap();
let checkpoint_dir = dir.path().join("projections");
// Thresholds are high enough that no automatic checkpoint fires for 50 events.
let config = CheckpointConfig {
enabled: true,
interval_events: 100,
interval_seconds: 3600,
};
let mut manager =
ProjectionManager::new().with_checkpoint_config(config, checkpoint_dir.clone());
let counter = Arc::new(EventCounterProjection::new("counter"));
manager.register(counter.clone());
for i in 0..50 {
let event = create_test_event(&format!("user-{i}"), "user.created");
manager.process_event(&event).unwrap();
}
manager.checkpoint_all().unwrap();
let cp_path = checkpoint_dir.join("counter.checkpoint.json");
assert!(cp_path.exists());
let data = std::fs::read_to_string(&cp_path).unwrap();
let checkpoint: ProjectionCheckpoint = serde_json::from_str(&data).unwrap();
assert_eq!(checkpoint.projection_name, "counter");
assert_eq!(checkpoint.event_count, 50);
}
// Registering a projection with a second manager that points at the same
// checkpoint directory restores its state from the file on disk.
#[test]
fn test_checkpoint_restore_on_register() {
let dir = tempfile::tempdir().unwrap();
let checkpoint_dir = dir.path().join("projections");
let config = CheckpointConfig {
enabled: true,
interval_events: 100_000,
interval_seconds: 3600,
};
// First manager: process events and write the checkpoint.
{
let mut manager = ProjectionManager::new()
.with_checkpoint_config(config.clone(), checkpoint_dir.clone());
let counter = Arc::new(EventCounterProjection::new("counter"));
manager.register(counter.clone());
for i in 0..100 {
let event = create_test_event(&format!("user-{i}"), "user.created");
manager.process_event(&event).unwrap();
}
manager.checkpoint_all().unwrap();
assert_eq!(counter.get_count("user.created"), 100);
}
// Second manager: a fresh projection picks up the persisted counts.
{
let mut manager =
ProjectionManager::new().with_checkpoint_config(config, checkpoint_dir);
let counter = Arc::new(EventCounterProjection::new("counter"));
manager.register(counter.clone());
assert_eq!(counter.get_count("user.created"), 100);
assert!(manager.restored_up_to("counter").is_some());
}
}
// After a restore, replayed events stamped at or before the checkpoint
// cut-off are skipped, while later events are processed normally.
#[test]
fn test_checkpoint_skips_old_events_during_replay() {
let dir = tempfile::tempdir().unwrap();
let checkpoint_dir = dir.path().join("projections");
let config = CheckpointConfig {
enabled: true,
interval_events: 100_000,
interval_seconds: 3600,
};
// Captured before the checkpoint is written, so it is not later than the
// cut-off stored in the checkpoint.
let checkpoint_time = Utc::now();
// First manager: process 1000 events and write the checkpoint.
{
let mut manager = ProjectionManager::new()
.with_checkpoint_config(config.clone(), checkpoint_dir.clone());
let counter = Arc::new(EventCounterProjection::new("counter"));
manager.register(counter.clone());
for i in 0..1000 {
let event = create_test_event_with_timestamp(
&format!("user-{i}"),
"user.created",
checkpoint_time,
);
manager.process_event(&event).unwrap();
}
manager.checkpoint_all().unwrap();
assert_eq!(counter.get_count("user.created"), 1000);
}
// Second manager: restore, replay the old events, then send newer ones.
{
let mut manager =
ProjectionManager::new().with_checkpoint_config(config, checkpoint_dir);
let counter = Arc::new(EventCounterProjection::new("counter"));
manager.register(counter.clone());
assert_eq!(counter.get_count("user.created"), 1000);
for i in 0..1000 {
let event = create_test_event_with_timestamp(
&format!("user-{i}"),
"user.created",
checkpoint_time,
);
manager.process_event(&event).unwrap();
}
// Replayed events must not be counted a second time.
assert_eq!(counter.get_count("user.created"), 1000);
let later = checkpoint_time + chrono::Duration::seconds(10);
for i in 0..10 {
let event = create_test_event_with_timestamp(
&format!("new-user-{i}"),
"user.created",
later,
);
manager.process_event(&event).unwrap();
}
assert_eq!(counter.get_count("user.created"), 1010);
}
}
// Crossing `interval_events` triggers a checkpoint without an explicit
// checkpoint_all() call.
#[test]
fn test_auto_checkpoint_by_event_count() {
let dir = tempfile::tempdir().unwrap();
let checkpoint_dir = dir.path().join("projections");
let config = CheckpointConfig {
enabled: true,
interval_events: 50, interval_seconds: 3600,
};
let mut manager =
ProjectionManager::new().with_checkpoint_config(config, checkpoint_dir.clone());
let counter = Arc::new(EventCounterProjection::new("counter"));
manager.register(counter.clone());
// 60 events exceeds the threshold of 50.
for i in 0..60 {
let event = create_test_event(&format!("user-{i}"), "user.created");
manager.process_event(&event).unwrap();
}
let cp_path = checkpoint_dir.join("counter.checkpoint.json");
assert!(cp_path.exists());
}
// The trait's default snapshot() returns None and the default restore()
// accepts any value.
#[test]
fn test_default_noop_snapshot_restore() {
// Implements only the required methods so the trait defaults are exercised.
struct MinimalProjection;
impl Projection for MinimalProjection {
fn name(&self) -> &'static str {
"minimal"
}
fn process(&self, _event: &Event) -> Result<()> {
Ok(())
}
fn get_state(&self, _entity_id: &str) -> Option<Value> {
None
}
fn clear(&self) {}
}
let projection = MinimalProjection;
assert!(projection.snapshot().is_none());
assert!(projection.restore(&Value::Null).is_ok());
}
// Without a configured checkpoint directory, checkpoint_all() succeeds
// without writing anything.
#[test]
fn test_checkpoint_with_no_dir_is_noop() {
let mut manager = ProjectionManager::new();
let counter = Arc::new(EventCounterProjection::new("counter"));
manager.register(counter.clone());
for _ in 0..10 {
let event = create_test_event("user-1", "user.created");
manager.process_event(&event).unwrap();
}
manager.checkpoint_all().unwrap();
}
}