1use crate::{
2 application::{dto::QueryEventsRequest, services::projection::Projection},
3 domain::entities::Event,
4 error::{AllSourceError, Result},
5 store::EventStore,
6};
7use chrono::{DateTime, Utc};
8use parking_lot::RwLock;
9use serde::{Deserialize, Serialize};
10use std::sync::{
11 Arc,
12 atomic::{AtomicBool, AtomicU64, Ordering},
13};
14use uuid::Uuid;
15
/// Lifecycle state of a replay.
///
/// Serialized with lowercase variant names (`"pending"`, `"running"`, ...).
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum ReplayStatus {
    /// Replay has been created but has not started yet.
    // NOTE(review): nothing in this file ever assigns `Pending` — replays are
    // registered directly as `Running`; confirm whether other modules use it.
    Pending,

    /// Replay has been registered and its background task has not finished.
    Running,

    /// The background task finished without error and without a cancellation
    /// request.
    Completed,

    /// The background task returned an error; the message is stored alongside
    /// the replay state.
    Failed,

    /// The background task finished after cancellation had been requested.
    Cancelled,
}
31
/// Tuning knobs for a single replay run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplayConfig {
    /// Number of events per batch. The cancellation flag is checked once per
    /// batch, so smaller batches react to cancellation sooner. Should be at
    /// least 1.
    pub batch_size: usize,

    /// Whether to process in parallel.
    // NOTE(review): the replay loop in this file never reads this field.
    pub parallel: bool,

    /// Number of workers for parallel processing.
    // NOTE(review): the replay loop in this file never reads this field.
    pub workers: usize,

    /// Whether to emit periodic progress lines (logged at debug level).
    pub emit_progress: bool,

    /// Number of processed events between two progress lines. Should be at
    /// least 1.
    pub progress_interval: usize,
}
50
51impl Default for ReplayConfig {
52 fn default() -> Self {
53 Self {
54 batch_size: 1000,
55 parallel: false,
56 workers: 4,
57 emit_progress: true,
58 progress_interval: 1000,
59 }
60 }
61}
62
/// Parameters describing which events to replay and into which projection(s).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StartReplayRequest {
    /// Name of the single projection to rebuild. When `None`, every projection
    /// returned by the store's projection registry is fed.
    pub projection_name: Option<String>,

    /// Lower time bound; forwarded as the `since` field of the event query.
    pub from_timestamp: Option<DateTime<Utc>>,

    /// Upper time bound; forwarded as both the `until` and `as_of` fields of
    /// the event query.
    pub to_timestamp: Option<DateTime<Utc>>,

    /// Restrict the replay to events of this entity.
    pub entity_id: Option<String>,

    /// Restrict the replay to events of this type.
    pub event_type: Option<String>,

    /// Replay tuning; `ReplayConfig::default()` is used when omitted.
    pub config: Option<ReplayConfig>,
}
84
/// Acknowledgement returned as soon as a replay has been scheduled.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StartReplayResponse {
    /// Identifier to use for progress, cancel and delete calls.
    pub replay_id: Uuid,
    /// Status at scheduling time.
    pub status: ReplayStatus,
    /// Time at which the replay was accepted.
    pub started_at: DateTime<Utc>,
    /// Number of events matched by the replay's query.
    pub total_events: usize,
}
93
/// Point-in-time snapshot of a replay's progress.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplayProgress {
    /// Identifier of the replay this snapshot describes.
    pub replay_id: Uuid,
    /// Status at the time the snapshot was taken.
    pub status: ReplayStatus,
    /// Time at which the replay was accepted.
    pub started_at: DateTime<Utc>,
    /// Time at which this snapshot was taken.
    pub updated_at: DateTime<Utc>,
    /// Time at which the background task finished, if it has.
    pub completed_at: Option<DateTime<Utc>>,
    /// Number of events matched by the replay's query.
    pub total_events: usize,
    /// Number of events handed to the target projection(s) so far.
    pub processed_events: usize,
    /// Number of projection failures so far. Counted once per failing
    /// (event, projection) pair, so it can exceed `processed_events` when
    /// several projections are replayed.
    pub failed_events: usize,
    /// `processed_events / total_events` as a percentage; `0.0` when there are
    /// no events.
    pub progress_percentage: f64,
    /// Average throughput, computed over whole elapsed seconds (minimum 1).
    pub events_per_second: f64,
    /// Error text when the replay failed.
    pub error_message: Option<String>,
}
109
/// Registry of event replays; starts them as background tasks and exposes
/// their progress.
pub struct ReplayManager {
    /// Every replay that has been started and not yet deleted, in start order.
    /// Shared with the background tasks spawned by `start_replay`.
    replays: Arc<RwLock<Vec<ReplayState>>>,
}
115
/// Bookkeeping for one replay. Mutable parts are individually wrapped so they
/// can be updated through a shared reference to the state.
struct ReplayState {
    /// Unique identifier handed back to the caller.
    id: Uuid,
    /// Projection targeted by the replay; `None` means all projections.
    projection_name: Option<String>,
    /// Current lifecycle status.
    status: Arc<RwLock<ReplayStatus>>,
    /// Time at which the replay was accepted.
    started_at: DateTime<Utc>,
    /// Time at which the background task finished; `None` until then.
    completed_at: Arc<RwLock<Option<DateTime<Utc>>>>,
    /// Number of events matched by the query when the replay was started.
    total_events: usize,
    /// Events handed to the projection(s) so far.
    processed_events: Arc<AtomicU64>,
    /// Projection failures so far (one per failing event/projection pair).
    failed_events: Arc<AtomicU64>,
    /// Error text recorded when the replay fails.
    error_message: Arc<RwLock<Option<String>>>,
    /// Set by `cancel_replay`; polled by the background task between batches.
    cancelled: Arc<AtomicBool>,
}
129
130impl ReplayManager {
131 pub fn new() -> Self {
132 Self {
133 replays: Arc::new(RwLock::new(Vec::new())),
134 }
135 }
136
137 pub fn start_replay(
139 &self,
140 store: Arc<EventStore>,
141 request: StartReplayRequest,
142 ) -> Result<StartReplayResponse> {
143 let replay_id = Uuid::new_v4();
144 let started_at = Utc::now();
145 let config = request.config.unwrap_or_default();
146
147 let query = QueryEventsRequest {
149 entity_id: request.entity_id.clone(),
150 event_type: request.event_type.clone(),
151 tenant_id: None,
152 as_of: request.to_timestamp,
153 since: request.from_timestamp,
154 until: request.to_timestamp,
155 limit: None,
156 event_type_prefix: None,
157 payload_filter: None,
158 };
159
160 let events = store.query(&query)?;
161 let total_events = events.len();
162
163 tracing::info!(
164 "🔄 Starting replay {} for {} events{}",
165 replay_id,
166 total_events,
167 request
168 .projection_name
169 .as_ref()
170 .map(|n| format!(" (projection: {n})"))
171 .unwrap_or_default()
172 );
173
174 let state = ReplayState {
176 id: replay_id,
177 projection_name: request.projection_name.clone(),
178 status: Arc::new(RwLock::new(ReplayStatus::Running)),
179 started_at,
180 completed_at: Arc::new(RwLock::new(None)),
181 total_events,
182 processed_events: Arc::new(AtomicU64::new(0)),
183 failed_events: Arc::new(AtomicU64::new(0)),
184 error_message: Arc::new(RwLock::new(None)),
185 cancelled: Arc::new(AtomicBool::new(false)),
186 };
187
188 self.replays.write().push(state);
190
191 let replays = Arc::clone(&self.replays);
193 let replay_idx = replays.read().len() - 1;
194
195 tokio::spawn(async move {
197 let result = Self::run_replay(
198 store,
199 events,
200 request.projection_name,
201 config,
202 replays.clone(),
203 replay_idx,
204 )
205 .await;
206
207 let mut replays_lock = replays.write();
209 if let Some(state) = replays_lock.get_mut(replay_idx) {
210 *state.completed_at.write() = Some(Utc::now());
211
212 match result {
213 Ok(()) => {
214 if state.cancelled.load(Ordering::Relaxed) {
215 *state.status.write() = ReplayStatus::Cancelled;
216 tracing::info!("🛑 Replay {} cancelled", state.id);
217 } else {
218 *state.status.write() = ReplayStatus::Completed;
219 tracing::info!("✅ Replay {} completed successfully", state.id);
220 }
221 }
222 Err(e) => {
223 *state.status.write() = ReplayStatus::Failed;
224 *state.error_message.write() = Some(e.to_string());
225 tracing::error!("❌ Replay {} failed: {}", state.id, e);
226 }
227 }
228 }
229 });
230
231 Ok(StartReplayResponse {
232 replay_id,
233 status: ReplayStatus::Running,
234 started_at,
235 total_events,
236 })
237 }
238
239 async fn run_replay(
241 store: Arc<EventStore>,
242 events: Vec<Event>,
243 projection_name: Option<String>,
244 config: ReplayConfig,
245 replays: Arc<RwLock<Vec<ReplayState>>>,
246 replay_idx: usize,
247 ) -> Result<()> {
248 let total = events.len();
249 let projections = store.projections.read();
250
251 let target_projections: Vec<(String, Arc<dyn Projection>)> =
253 if let Some(name) = projection_name {
254 if let Some(proj) = projections.get_projection(&name) {
255 vec![(name, proj)]
256 } else {
257 return Err(AllSourceError::ValidationError(format!(
258 "Projection not found: {name}"
259 )));
260 }
261 } else {
262 projections.list_projections()
263 };
264
265 drop(projections); for (batch_idx, chunk) in events.chunks(config.batch_size).enumerate() {
269 {
271 let replays_lock = replays.read();
272 if let Some(state) = replays_lock.get(replay_idx)
273 && state.cancelled.load(Ordering::Relaxed)
274 {
275 return Ok(());
276 }
277 }
278
279 for event in chunk {
281 for (proj_name, projection) in &target_projections {
283 if let Err(e) = projection.process(event) {
284 tracing::warn!(
285 "Failed to process event {} in projection {}: {}",
286 event.id,
287 proj_name,
288 e
289 );
290
291 let replays_lock = replays.read();
293 if let Some(state) = replays_lock.get(replay_idx) {
294 state.failed_events.fetch_add(1, Ordering::Relaxed);
295 }
296 }
297 }
298
299 let replays_lock = replays.read();
301 if let Some(state) = replays_lock.get(replay_idx) {
302 let processed = state.processed_events.fetch_add(1, Ordering::Relaxed) + 1;
303
304 if config.emit_progress && processed % config.progress_interval as u64 == 0 {
306 let progress = (processed as f64 / total as f64) * 100.0;
307 tracing::debug!(
308 "Replay progress: {}/{} ({:.1}%)",
309 processed,
310 total,
311 progress
312 );
313 }
314 }
315 }
316 }
317
318 Ok(())
319 }
320
321 pub fn get_progress(&self, replay_id: Uuid) -> Result<ReplayProgress> {
323 let replays = self.replays.read();
324
325 let state = replays.iter().find(|r| r.id == replay_id).ok_or_else(|| {
326 AllSourceError::ValidationError(format!("Replay not found: {replay_id}"))
327 })?;
328
329 let processed = state.processed_events.load(Ordering::Relaxed);
330 let failed = state.failed_events.load(Ordering::Relaxed);
331 let total_events = state.total_events;
332 let started_at = state.started_at;
333 let status = *state.status.read();
334 let completed_at = *state.completed_at.read();
335 let error_message = state.error_message.read().clone();
336
337 drop(replays); let progress_percentage = if total_events > 0 {
340 (processed as f64 / total_events as f64) * 100.0
341 } else {
342 0.0
343 };
344
345 let updated_at = Utc::now();
346 let elapsed_seconds = (updated_at - started_at).num_seconds().max(1) as f64;
347 let events_per_second = processed as f64 / elapsed_seconds;
348
349 Ok(ReplayProgress {
350 replay_id,
351 status,
352 started_at,
353 updated_at,
354 completed_at,
355 total_events,
356 processed_events: processed as usize,
357 failed_events: failed as usize,
358 progress_percentage,
359 events_per_second,
360 error_message,
361 })
362 }
363
364 pub fn cancel_replay(&self, replay_id: Uuid) -> Result<()> {
366 let replays = self.replays.read();
367
368 let state = replays.iter().find(|r| r.id == replay_id).ok_or_else(|| {
369 AllSourceError::ValidationError(format!("Replay not found: {replay_id}"))
370 })?;
371
372 let status = *state.status.read();
373 if status != ReplayStatus::Running {
374 return Err(AllSourceError::ValidationError(format!(
375 "Cannot cancel replay in status: {status:?}"
376 )));
377 }
378
379 state.cancelled.store(true, Ordering::Relaxed);
380 tracing::info!("🛑 Cancelling replay {}", replay_id);
381
382 Ok(())
383 }
384
385 pub fn list_replays(&self) -> Vec<ReplayProgress> {
387 let replays = self.replays.read();
388
389 replays
390 .iter()
391 .map(|state| {
392 let processed = state.processed_events.load(Ordering::Relaxed);
393 let failed = state.failed_events.load(Ordering::Relaxed);
394 let progress_percentage = if state.total_events > 0 {
395 (processed as f64 / state.total_events as f64) * 100.0
396 } else {
397 0.0
398 };
399
400 let updated_at = Utc::now();
401 let elapsed_seconds = (updated_at - state.started_at).num_seconds().max(1) as f64;
402 let events_per_second = processed as f64 / elapsed_seconds;
403
404 ReplayProgress {
405 replay_id: state.id,
406 status: *state.status.read(),
407 started_at: state.started_at,
408 updated_at,
409 completed_at: *state.completed_at.read(),
410 total_events: state.total_events,
411 processed_events: processed as usize,
412 failed_events: failed as usize,
413 progress_percentage,
414 events_per_second,
415 error_message: state.error_message.read().clone(),
416 }
417 })
418 .collect()
419 }
420
421 pub fn delete_replay(&self, replay_id: Uuid) -> Result<bool> {
423 let mut replays = self.replays.write();
424
425 let idx = replays
426 .iter()
427 .position(|r| r.id == replay_id)
428 .ok_or_else(|| {
429 AllSourceError::ValidationError(format!("Replay not found: {replay_id}"))
430 })?;
431
432 let status = *replays[idx].status.read();
433 if status == ReplayStatus::Running {
434 return Err(AllSourceError::ValidationError(
435 "Cannot delete a running replay. Cancel it first.".to_string(),
436 ));
437 }
438
439 replays.remove(idx);
440 tracing::info!("🗑️ Deleted replay {}", replay_id);
441
442 Ok(true)
443 }
444}
445
446impl Default for ReplayManager {
447 fn default() -> Self {
448 Self::new()
449 }
450}
451
#[cfg(test)]
mod tests {
    use super::*;
    use crate::domain::entities::Event;
    use serde_json::json;

    /// A freshly constructed manager tracks no replays.
    #[tokio::test]
    async fn test_replay_manager_creation() {
        let manager = ReplayManager::new();
        let replays = manager.list_replays();
        assert_eq!(replays.len(), 0);
    }

    /// Replaying ten ingested events runs to completion and counts all of
    /// them as processed.
    #[tokio::test]
    async fn test_replay_progress_tracking() {
        let manager = ReplayManager::new();
        let store = Arc::new(EventStore::new());

        for i in 0..10 {
            let event = Event::from_strings(
                "test.event".to_string(),
                "test-entity".to_string(),
                "default".to_string(),
                json!({"value": i}),
                None,
            )
            .unwrap();
            store.ingest(&event).unwrap();
        }

        let request = StartReplayRequest {
            projection_name: None,
            from_timestamp: None,
            to_timestamp: None,
            entity_id: None,
            event_type: None,
            config: Some(ReplayConfig {
                batch_size: 5,
                parallel: false,
                workers: 1,
                emit_progress: true,
                progress_interval: 5,
            }),
        };

        let response = manager.start_replay(store, request).unwrap();
        assert_eq!(response.status, ReplayStatus::Running);
        assert_eq!(response.total_events, 10);

        // Poll with a bounded number of short sleeps instead of one fixed
        // sleep: finishes as soon as the background task is done and fails
        // clearly (status still `Running`) if it never finishes.
        let mut progress = manager.get_progress(response.replay_id).unwrap();
        for _ in 0..200 {
            if progress.status != ReplayStatus::Running {
                break;
            }
            tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
            progress = manager.get_progress(response.replay_id).unwrap();
        }

        assert_eq!(progress.status, ReplayStatus::Completed);
        assert_eq!(progress.total_events, 10);
        assert_eq!(progress.processed_events, 10);
        assert!(progress.completed_at.is_some());
        assert!(progress.error_message.is_none());
    }
}