1use crate::domain::entities::Event;
2use crate::error::{AllSourceError, Result};
3use crate::application::dto::QueryEventsRequest;
4use crate::projection::Projection;
5use crate::store::EventStore;
6use chrono::{DateTime, Utc};
7use parking_lot::RwLock;
8use serde::{Deserialize, Serialize};
9use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
10use std::sync::Arc;
11use uuid::Uuid;
12
13#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
15#[serde(rename_all = "lowercase")]
16pub enum ReplayStatus {
17 Pending,
19 Running,
21 Completed,
23 Failed,
25 Cancelled,
27}
28
29#[derive(Debug, Clone, Serialize, Deserialize)]
31pub struct ReplayConfig {
32 pub batch_size: usize,
34
35 pub parallel: bool,
37
38 pub workers: usize,
40
41 pub emit_progress: bool,
43
44 pub progress_interval: usize,
46}
47
48impl Default for ReplayConfig {
49 fn default() -> Self {
50 Self {
51 batch_size: 1000,
52 parallel: false,
53 workers: 4,
54 emit_progress: true,
55 progress_interval: 1000,
56 }
57 }
58}
59
60#[derive(Debug, Clone, Serialize, Deserialize)]
62pub struct StartReplayRequest {
63 pub projection_name: Option<String>,
65
66 pub from_timestamp: Option<DateTime<Utc>>,
68
69 pub to_timestamp: Option<DateTime<Utc>>,
71
72 pub entity_id: Option<String>,
74
75 pub event_type: Option<String>,
77
78 pub config: Option<ReplayConfig>,
80}
81
82#[derive(Debug, Clone, Serialize, Deserialize)]
84pub struct StartReplayResponse {
85 pub replay_id: Uuid,
86 pub status: ReplayStatus,
87 pub started_at: DateTime<Utc>,
88 pub total_events: usize,
89}
90
91#[derive(Debug, Clone, Serialize, Deserialize)]
93pub struct ReplayProgress {
94 pub replay_id: Uuid,
95 pub status: ReplayStatus,
96 pub started_at: DateTime<Utc>,
97 pub updated_at: DateTime<Utc>,
98 pub completed_at: Option<DateTime<Utc>>,
99 pub total_events: usize,
100 pub processed_events: usize,
101 pub failed_events: usize,
102 pub progress_percentage: f64,
103 pub events_per_second: f64,
104 pub error_message: Option<String>,
105}
106
107pub struct ReplayManager {
109 replays: Arc<RwLock<Vec<ReplayState>>>,
111}
112
113struct ReplayState {
115 id: Uuid,
116 projection_name: Option<String>,
117 status: Arc<RwLock<ReplayStatus>>,
118 started_at: DateTime<Utc>,
119 completed_at: Arc<RwLock<Option<DateTime<Utc>>>>,
120 total_events: usize,
121 processed_events: Arc<AtomicU64>,
122 failed_events: Arc<AtomicU64>,
123 error_message: Arc<RwLock<Option<String>>>,
124 cancelled: Arc<AtomicBool>,
125}
126
127impl ReplayManager {
128 pub fn new() -> Self {
129 Self {
130 replays: Arc::new(RwLock::new(Vec::new())),
131 }
132 }
133
134 pub fn start_replay(
136 &self,
137 store: Arc<EventStore>,
138 request: StartReplayRequest,
139 ) -> Result<StartReplayResponse> {
140 let replay_id = Uuid::new_v4();
141 let started_at = Utc::now();
142 let config = request.config.unwrap_or_default();
143
144 let query = QueryEventsRequest {
146 entity_id: request.entity_id.clone(),
147 event_type: request.event_type.clone(),
148 tenant_id: None,
149 as_of: request.to_timestamp,
150 since: request.from_timestamp,
151 until: request.to_timestamp,
152 limit: None,
153 };
154
155 let events = store.query(query)?;
156 let total_events = events.len();
157
158 tracing::info!(
159 "🔄 Starting replay {} for {} events{}",
160 replay_id,
161 total_events,
162 request.projection_name
163 .as_ref()
164 .map(|n| format!(" (projection: {})", n))
165 .unwrap_or_default()
166 );
167
168 let state = ReplayState {
170 id: replay_id,
171 projection_name: request.projection_name.clone(),
172 status: Arc::new(RwLock::new(ReplayStatus::Running)),
173 started_at,
174 completed_at: Arc::new(RwLock::new(None)),
175 total_events,
176 processed_events: Arc::new(AtomicU64::new(0)),
177 failed_events: Arc::new(AtomicU64::new(0)),
178 error_message: Arc::new(RwLock::new(None)),
179 cancelled: Arc::new(AtomicBool::new(false)),
180 };
181
182 self.replays.write().push(state);
184
185 let replays = Arc::clone(&self.replays);
187 let replay_idx = replays.read().len() - 1;
188
189 tokio::spawn(async move {
191 let result = Self::run_replay(
192 store,
193 events,
194 request.projection_name,
195 config,
196 replays.clone(),
197 replay_idx,
198 ).await;
199
200 let mut replays_lock = replays.write();
202 if let Some(state) = replays_lock.get_mut(replay_idx) {
203 *state.completed_at.write() = Some(Utc::now());
204
205 match result {
206 Ok(_) => {
207 if state.cancelled.load(Ordering::Relaxed) {
208 *state.status.write() = ReplayStatus::Cancelled;
209 tracing::info!("🛑 Replay {} cancelled", state.id);
210 } else {
211 *state.status.write() = ReplayStatus::Completed;
212 tracing::info!("✅ Replay {} completed successfully", state.id);
213 }
214 }
215 Err(e) => {
216 *state.status.write() = ReplayStatus::Failed;
217 *state.error_message.write() = Some(e.to_string());
218 tracing::error!("❌ Replay {} failed: {}", state.id, e);
219 }
220 }
221 }
222 });
223
224 Ok(StartReplayResponse {
225 replay_id,
226 status: ReplayStatus::Running,
227 started_at,
228 total_events,
229 })
230 }
231
232 async fn run_replay(
234 store: Arc<EventStore>,
235 events: Vec<Event>,
236 projection_name: Option<String>,
237 config: ReplayConfig,
238 replays: Arc<RwLock<Vec<ReplayState>>>,
239 replay_idx: usize,
240 ) -> Result<()> {
241 let total = events.len();
242 let projections = store.projections.read();
243
244 let target_projections: Vec<(String, Arc<dyn Projection>)> = if let Some(name) = projection_name {
246 if let Some(proj) = projections.get_projection(&name) {
247 vec![(name, proj)]
248 } else {
249 return Err(AllSourceError::ValidationError(format!(
250 "Projection not found: {}",
251 name
252 )));
253 }
254 } else {
255 projections.list_projections()
256 };
257
258 drop(projections); for (batch_idx, chunk) in events.chunks(config.batch_size).enumerate() {
262 {
264 let replays_lock = replays.read();
265 if let Some(state) = replays_lock.get(replay_idx) {
266 if state.cancelled.load(Ordering::Relaxed) {
267 return Ok(());
268 }
269 }
270 }
271
272 for event in chunk {
274 for (proj_name, projection) in &target_projections {
276 if let Err(e) = projection.process(event) {
277 tracing::warn!(
278 "Failed to process event {} in projection {}: {}",
279 event.id,
280 proj_name,
281 e
282 );
283
284 let replays_lock = replays.read();
286 if let Some(state) = replays_lock.get(replay_idx) {
287 state.failed_events.fetch_add(1, Ordering::Relaxed);
288 }
289 }
290 }
291
292 let replays_lock = replays.read();
294 if let Some(state) = replays_lock.get(replay_idx) {
295 let processed = state.processed_events.fetch_add(1, Ordering::Relaxed) + 1;
296
297 if config.emit_progress && processed % config.progress_interval as u64 == 0 {
299 let progress = (processed as f64 / total as f64) * 100.0;
300 tracing::debug!(
301 "Replay progress: {}/{} ({:.1}%)",
302 processed,
303 total,
304 progress
305 );
306 }
307 }
308 }
309 }
310
311 Ok(())
312 }
313
314 pub fn get_progress(&self, replay_id: Uuid) -> Result<ReplayProgress> {
316 let replays = self.replays.read();
317
318 let state = replays
319 .iter()
320 .find(|r| r.id == replay_id)
321 .ok_or_else(|| {
322 AllSourceError::ValidationError(format!("Replay not found: {}", replay_id))
323 })?;
324
325 let processed = state.processed_events.load(Ordering::Relaxed);
326 let failed = state.failed_events.load(Ordering::Relaxed);
327 let total_events = state.total_events;
328 let started_at = state.started_at;
329 let status = *state.status.read();
330 let completed_at = *state.completed_at.read();
331 let error_message = state.error_message.read().clone();
332
333 drop(replays); let progress_percentage = if total_events > 0 {
336 (processed as f64 / total_events as f64) * 100.0
337 } else {
338 0.0
339 };
340
341 let updated_at = Utc::now();
342 let elapsed_seconds = (updated_at - started_at).num_seconds().max(1) as f64;
343 let events_per_second = processed as f64 / elapsed_seconds;
344
345 Ok(ReplayProgress {
346 replay_id,
347 status,
348 started_at,
349 updated_at,
350 completed_at,
351 total_events,
352 processed_events: processed as usize,
353 failed_events: failed as usize,
354 progress_percentage,
355 events_per_second,
356 error_message,
357 })
358 }
359
360 pub fn cancel_replay(&self, replay_id: Uuid) -> Result<()> {
362 let replays = self.replays.read();
363
364 let state = replays
365 .iter()
366 .find(|r| r.id == replay_id)
367 .ok_or_else(|| {
368 AllSourceError::ValidationError(format!("Replay not found: {}", replay_id))
369 })?;
370
371 let status = *state.status.read();
372 if status != ReplayStatus::Running {
373 return Err(AllSourceError::ValidationError(format!(
374 "Cannot cancel replay in status: {:?}",
375 status
376 )));
377 }
378
379 state.cancelled.store(true, Ordering::Relaxed);
380 tracing::info!("🛑 Cancelling replay {}", replay_id);
381
382 Ok(())
383 }
384
385 pub fn list_replays(&self) -> Vec<ReplayProgress> {
387 let replays = self.replays.read();
388
389 replays
390 .iter()
391 .map(|state| {
392 let processed = state.processed_events.load(Ordering::Relaxed);
393 let failed = state.failed_events.load(Ordering::Relaxed);
394 let progress_percentage = if state.total_events > 0 {
395 (processed as f64 / state.total_events as f64) * 100.0
396 } else {
397 0.0
398 };
399
400 let updated_at = Utc::now();
401 let elapsed_seconds = (updated_at - state.started_at).num_seconds().max(1) as f64;
402 let events_per_second = processed as f64 / elapsed_seconds;
403
404 ReplayProgress {
405 replay_id: state.id,
406 status: *state.status.read(),
407 started_at: state.started_at,
408 updated_at,
409 completed_at: *state.completed_at.read(),
410 total_events: state.total_events,
411 processed_events: processed as usize,
412 failed_events: failed as usize,
413 progress_percentage,
414 events_per_second,
415 error_message: state.error_message.read().clone(),
416 }
417 })
418 .collect()
419 }
420
421 pub fn delete_replay(&self, replay_id: Uuid) -> Result<bool> {
423 let mut replays = self.replays.write();
424
425 let idx = replays
426 .iter()
427 .position(|r| r.id == replay_id)
428 .ok_or_else(|| {
429 AllSourceError::ValidationError(format!("Replay not found: {}", replay_id))
430 })?;
431
432 let status = *replays[idx].status.read();
433 if status == ReplayStatus::Running {
434 return Err(AllSourceError::ValidationError(
435 "Cannot delete a running replay. Cancel it first.".to_string(),
436 ));
437 }
438
439 replays.remove(idx);
440 tracing::info!("🗑️ Deleted replay {}", replay_id);
441
442 Ok(true)
443 }
444}
445
446impl Default for ReplayManager {
447 fn default() -> Self {
448 Self::new()
449 }
450}
451
452#[cfg(test)]
453mod tests {
454 use super::*;
455 use crate::domain::entities::Event;
456 use serde_json::json;
457
458 #[tokio::test]
459 async fn test_replay_manager_creation() {
460 let manager = ReplayManager::new();
461 let replays = manager.list_replays();
462 assert_eq!(replays.len(), 0);
463 }
464
465 #[tokio::test]
466 async fn test_replay_progress_tracking() {
467 let manager = ReplayManager::new();
468 let store = Arc::new(EventStore::new());
469
470 for i in 0..10 {
472 let event = Event::from_strings(
473 "test.event".to_string(),
474 "test-entity".to_string(),
475 "default".to_string(),
476 json!({"value": i}),
477 None,
478 ).unwrap();
479 store.ingest(event).unwrap();
480 }
481
482 let request = StartReplayRequest {
484 projection_name: None,
485 from_timestamp: None,
486 to_timestamp: None,
487 entity_id: None,
488 event_type: None,
489 config: Some(ReplayConfig {
490 batch_size: 5,
491 parallel: false,
492 workers: 1,
493 emit_progress: true,
494 progress_interval: 5,
495 }),
496 };
497
498 let response = manager.start_replay(store, request).unwrap();
499 assert_eq!(response.status, ReplayStatus::Running);
500 assert_eq!(response.total_events, 10);
501
502 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
504
505 let progress = manager.get_progress(response.replay_id).unwrap();
507 assert!(progress.processed_events <= 10);
508 }
509}