1use crate::{
2 application::{dto::QueryEventsRequest, services::projection::Projection},
3 domain::entities::Event,
4 error::{AllSourceError, Result},
5 store::EventStore,
6};
7use chrono::{DateTime, Utc};
8use parking_lot::RwLock;
9use serde::{Deserialize, Serialize};
10use std::sync::{
11 Arc,
12 atomic::{AtomicBool, AtomicU64, Ordering},
13};
14use uuid::Uuid;
15
/// Lifecycle state of a replay as reported to API clients.
///
/// Serialized in lowercase (`"pending"`, `"running"`, ...).
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum ReplayStatus {
    /// The replay has been created but has not started.
    /// NOTE(review): no code in this module currently assigns this state.
    Pending,
    /// The replay is in progress; this is the state assigned by `start_replay`.
    Running,
    /// The replay loop returned successfully without a cancellation request.
    Completed,
    /// The replay loop returned an error; the message is stored with the replay.
    Failed,
    /// A cancellation was requested and the replay loop stopped.
    Cancelled,
}
31
/// Tuning knobs for a single replay run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplayConfig {
    /// Number of events handled per batch. The cancellation flag is checked
    /// once per batch, so this bounds how quickly a cancel takes effect.
    /// Should be at least 1.
    pub batch_size: usize,

    /// NOTE(review): not read by the replay loop in this module; presumably
    /// reserved for parallel processing — confirm before relying on it.
    pub parallel: bool,

    /// NOTE(review): not read by the replay loop in this module; presumably
    /// the worker count for parallel processing — confirm.
    pub workers: usize,

    /// Whether a debug-level progress line is logged during the replay.
    pub emit_progress: bool,

    /// A progress line is logged each time the processed-event count is a
    /// multiple of this value. Should be at least 1.
    pub progress_interval: usize,
}
50
51impl Default for ReplayConfig {
52 fn default() -> Self {
53 Self {
54 batch_size: 1000,
55 parallel: false,
56 workers: 4,
57 emit_progress: true,
58 progress_interval: 1000,
59 }
60 }
61}
62
/// Parameters for starting a replay. Every field is optional; unset filters
/// are passed to the event store query as `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StartReplayRequest {
    /// Name of the single projection to replay into. When `None`, the replay
    /// feeds every projection returned by the store's projection registry.
    pub projection_name: Option<String>,

    /// Lower time bound; forwarded as the query's `since`.
    pub from_timestamp: Option<DateTime<Utc>>,

    /// Upper time bound; forwarded as both the query's `until` and `as_of`.
    pub to_timestamp: Option<DateTime<Utc>>,

    /// Restricts the replay to events of one entity.
    pub entity_id: Option<String>,

    /// Restricts the replay to events of one type.
    pub event_type: Option<String>,

    /// Replay tuning; `ReplayConfig::default()` is used when absent.
    pub config: Option<ReplayConfig>,
}
84
/// Acknowledgement returned as soon as a replay has been scheduled.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StartReplayResponse {
    /// Identifier to use with the progress, cancel and delete operations.
    pub replay_id: Uuid,
    /// Status at the time of the response.
    pub status: ReplayStatus,
    /// Time at which the replay was created.
    pub started_at: DateTime<Utc>,
    /// Number of events selected by the request's filters.
    pub total_events: usize,
}
93
/// Point-in-time snapshot of a replay, as returned by the progress and list
/// operations.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplayProgress {
    /// Identifier of the replay this snapshot describes.
    pub replay_id: Uuid,
    /// Lifecycle state when the snapshot was taken.
    pub status: ReplayStatus,
    /// Time at which the replay was created.
    pub started_at: DateTime<Utc>,
    /// Time at which this snapshot was taken.
    pub updated_at: DateTime<Utc>,
    /// Time at which the replay finished; `None` until then.
    pub completed_at: Option<DateTime<Utc>>,
    /// Number of events selected for the replay.
    pub total_events: usize,
    /// Number of events handed to the projections so far (an event counts as
    /// processed even if a projection failed on it).
    pub processed_events: usize,
    /// Number of projection failures; incremented once per failing
    /// event/projection pair.
    pub failed_events: usize,
    /// `processed_events / total_events * 100`, or `0.0` when there are no events.
    pub progress_percentage: f64,
    /// Processed events divided by elapsed whole seconds (at least one second).
    pub events_per_second: f64,
    /// Error text recorded when the replay failed.
    pub error_message: Option<String>,
}
109
/// Starts replays of stored events into projections and tracks their progress.
pub struct ReplayManager {
    /// Registry of every replay that has been started and not yet deleted,
    /// shared with the background tasks that run the replays.
    replays: Arc<RwLock<Vec<ReplayState>>>,
}
115
/// Internal bookkeeping for one replay. Mutable parts use interior mutability
/// (locks and atomics) so they can be updated through a shared reference.
struct ReplayState {
    /// Unique identifier handed back to the caller.
    id: Uuid,
    /// Projection the replay was restricted to, if any.
    /// NOTE(review): stored but not read anywhere in this module.
    projection_name: Option<String>,
    /// Current lifecycle state.
    status: Arc<RwLock<ReplayStatus>>,
    /// Creation time of the replay.
    started_at: DateTime<Utc>,
    /// Set when the background task finishes, whatever the outcome.
    completed_at: Arc<RwLock<Option<DateTime<Utc>>>>,
    /// Number of events selected when the replay was started.
    total_events: usize,
    /// Events handed to the projections so far.
    processed_events: Arc<AtomicU64>,
    /// Projection failures observed so far.
    failed_events: Arc<AtomicU64>,
    /// Error text recorded when the replay fails.
    error_message: Arc<RwLock<Option<String>>>,
    /// Cooperative cancellation flag polled by the replay loop.
    cancelled: Arc<AtomicBool>,
}
129
130impl ReplayManager {
131 pub fn new() -> Self {
132 Self {
133 replays: Arc::new(RwLock::new(Vec::new())),
134 }
135 }
136
137 pub fn start_replay(
139 &self,
140 store: Arc<EventStore>,
141 request: StartReplayRequest,
142 ) -> Result<StartReplayResponse> {
143 let replay_id = Uuid::new_v4();
144 let started_at = Utc::now();
145 let config = request.config.unwrap_or_default();
146
147 let query = QueryEventsRequest {
149 entity_id: request.entity_id.clone(),
150 event_type: request.event_type.clone(),
151 tenant_id: None,
152 as_of: request.to_timestamp,
153 since: request.from_timestamp,
154 until: request.to_timestamp,
155 limit: None,
156 };
157
158 let events = store.query(query)?;
159 let total_events = events.len();
160
161 tracing::info!(
162 "🔄 Starting replay {} for {} events{}",
163 replay_id,
164 total_events,
165 request
166 .projection_name
167 .as_ref()
168 .map(|n| format!(" (projection: {n})"))
169 .unwrap_or_default()
170 );
171
172 let state = ReplayState {
174 id: replay_id,
175 projection_name: request.projection_name.clone(),
176 status: Arc::new(RwLock::new(ReplayStatus::Running)),
177 started_at,
178 completed_at: Arc::new(RwLock::new(None)),
179 total_events,
180 processed_events: Arc::new(AtomicU64::new(0)),
181 failed_events: Arc::new(AtomicU64::new(0)),
182 error_message: Arc::new(RwLock::new(None)),
183 cancelled: Arc::new(AtomicBool::new(false)),
184 };
185
186 self.replays.write().push(state);
188
189 let replays = Arc::clone(&self.replays);
191 let replay_idx = replays.read().len() - 1;
192
193 tokio::spawn(async move {
195 let result = Self::run_replay(
196 store,
197 events,
198 request.projection_name,
199 config,
200 replays.clone(),
201 replay_idx,
202 )
203 .await;
204
205 let mut replays_lock = replays.write();
207 if let Some(state) = replays_lock.get_mut(replay_idx) {
208 *state.completed_at.write() = Some(Utc::now());
209
210 match result {
211 Ok(_) => {
212 if state.cancelled.load(Ordering::Relaxed) {
213 *state.status.write() = ReplayStatus::Cancelled;
214 tracing::info!("🛑 Replay {} cancelled", state.id);
215 } else {
216 *state.status.write() = ReplayStatus::Completed;
217 tracing::info!("✅ Replay {} completed successfully", state.id);
218 }
219 }
220 Err(e) => {
221 *state.status.write() = ReplayStatus::Failed;
222 *state.error_message.write() = Some(e.to_string());
223 tracing::error!("❌ Replay {} failed: {}", state.id, e);
224 }
225 }
226 }
227 });
228
229 Ok(StartReplayResponse {
230 replay_id,
231 status: ReplayStatus::Running,
232 started_at,
233 total_events,
234 })
235 }
236
237 async fn run_replay(
239 store: Arc<EventStore>,
240 events: Vec<Event>,
241 projection_name: Option<String>,
242 config: ReplayConfig,
243 replays: Arc<RwLock<Vec<ReplayState>>>,
244 replay_idx: usize,
245 ) -> Result<()> {
246 let total = events.len();
247 let projections = store.projections.read();
248
249 let target_projections: Vec<(String, Arc<dyn Projection>)> =
251 if let Some(name) = projection_name {
252 if let Some(proj) = projections.get_projection(&name) {
253 vec![(name, proj)]
254 } else {
255 return Err(AllSourceError::ValidationError(format!(
256 "Projection not found: {}",
257 name
258 )));
259 }
260 } else {
261 projections.list_projections()
262 };
263
264 drop(projections); for (batch_idx, chunk) in events.chunks(config.batch_size).enumerate() {
268 {
270 let replays_lock = replays.read();
271 if let Some(state) = replays_lock.get(replay_idx)
272 && state.cancelled.load(Ordering::Relaxed)
273 {
274 return Ok(());
275 }
276 }
277
278 for event in chunk {
280 for (proj_name, projection) in &target_projections {
282 if let Err(e) = projection.process(event) {
283 tracing::warn!(
284 "Failed to process event {} in projection {}: {}",
285 event.id,
286 proj_name,
287 e
288 );
289
290 let replays_lock = replays.read();
292 if let Some(state) = replays_lock.get(replay_idx) {
293 state.failed_events.fetch_add(1, Ordering::Relaxed);
294 }
295 }
296 }
297
298 let replays_lock = replays.read();
300 if let Some(state) = replays_lock.get(replay_idx) {
301 let processed = state.processed_events.fetch_add(1, Ordering::Relaxed) + 1;
302
303 if config.emit_progress && processed % config.progress_interval as u64 == 0 {
305 let progress = (processed as f64 / total as f64) * 100.0;
306 tracing::debug!(
307 "Replay progress: {}/{} ({:.1}%)",
308 processed,
309 total,
310 progress
311 );
312 }
313 }
314 }
315 }
316
317 Ok(())
318 }
319
320 pub fn get_progress(&self, replay_id: Uuid) -> Result<ReplayProgress> {
322 let replays = self.replays.read();
323
324 let state = replays.iter().find(|r| r.id == replay_id).ok_or_else(|| {
325 AllSourceError::ValidationError(format!("Replay not found: {replay_id}"))
326 })?;
327
328 let processed = state.processed_events.load(Ordering::Relaxed);
329 let failed = state.failed_events.load(Ordering::Relaxed);
330 let total_events = state.total_events;
331 let started_at = state.started_at;
332 let status = *state.status.read();
333 let completed_at = *state.completed_at.read();
334 let error_message = state.error_message.read().clone();
335
336 drop(replays); let progress_percentage = if total_events > 0 {
339 (processed as f64 / total_events as f64) * 100.0
340 } else {
341 0.0
342 };
343
344 let updated_at = Utc::now();
345 let elapsed_seconds = (updated_at - started_at).num_seconds().max(1) as f64;
346 let events_per_second = processed as f64 / elapsed_seconds;
347
348 Ok(ReplayProgress {
349 replay_id,
350 status,
351 started_at,
352 updated_at,
353 completed_at,
354 total_events,
355 processed_events: processed as usize,
356 failed_events: failed as usize,
357 progress_percentage,
358 events_per_second,
359 error_message,
360 })
361 }
362
363 pub fn cancel_replay(&self, replay_id: Uuid) -> Result<()> {
365 let replays = self.replays.read();
366
367 let state = replays.iter().find(|r| r.id == replay_id).ok_or_else(|| {
368 AllSourceError::ValidationError(format!("Replay not found: {replay_id}"))
369 })?;
370
371 let status = *state.status.read();
372 if status != ReplayStatus::Running {
373 return Err(AllSourceError::ValidationError(format!(
374 "Cannot cancel replay in status: {:?}",
375 status
376 )));
377 }
378
379 state.cancelled.store(true, Ordering::Relaxed);
380 tracing::info!("🛑 Cancelling replay {}", replay_id);
381
382 Ok(())
383 }
384
385 pub fn list_replays(&self) -> Vec<ReplayProgress> {
387 let replays = self.replays.read();
388
389 replays
390 .iter()
391 .map(|state| {
392 let processed = state.processed_events.load(Ordering::Relaxed);
393 let failed = state.failed_events.load(Ordering::Relaxed);
394 let progress_percentage = if state.total_events > 0 {
395 (processed as f64 / state.total_events as f64) * 100.0
396 } else {
397 0.0
398 };
399
400 let updated_at = Utc::now();
401 let elapsed_seconds = (updated_at - state.started_at).num_seconds().max(1) as f64;
402 let events_per_second = processed as f64 / elapsed_seconds;
403
404 ReplayProgress {
405 replay_id: state.id,
406 status: *state.status.read(),
407 started_at: state.started_at,
408 updated_at,
409 completed_at: *state.completed_at.read(),
410 total_events: state.total_events,
411 processed_events: processed as usize,
412 failed_events: failed as usize,
413 progress_percentage,
414 events_per_second,
415 error_message: state.error_message.read().clone(),
416 }
417 })
418 .collect()
419 }
420
421 pub fn delete_replay(&self, replay_id: Uuid) -> Result<bool> {
423 let mut replays = self.replays.write();
424
425 let idx = replays
426 .iter()
427 .position(|r| r.id == replay_id)
428 .ok_or_else(|| {
429 AllSourceError::ValidationError(format!("Replay not found: {replay_id}"))
430 })?;
431
432 let status = *replays[idx].status.read();
433 if status == ReplayStatus::Running {
434 return Err(AllSourceError::ValidationError(
435 "Cannot delete a running replay. Cancel it first.".to_string(),
436 ));
437 }
438
439 replays.remove(idx);
440 tracing::info!("🗑️ Deleted replay {}", replay_id);
441
442 Ok(true)
443 }
444}
445
446impl Default for ReplayManager {
447 fn default() -> Self {
448 Self::new()
449 }
450}
451
#[cfg(test)]
mod tests {
    use super::*;
    use crate::domain::entities::Event;
    use serde_json::json;

    /// A freshly created manager has no replays registered.
    #[tokio::test]
    async fn test_replay_manager_creation() {
        let manager = ReplayManager::new();
        assert!(manager.list_replays().is_empty());
    }

    /// Starting a replay over ten ingested events reports them all as
    /// selected, and the background task eventually processes every one.
    #[tokio::test]
    async fn test_replay_progress_tracking() {
        let manager = ReplayManager::new();
        let store = Arc::new(EventStore::new());

        for i in 0..10 {
            let event = Event::from_strings(
                "test.event".to_string(),
                "test-entity".to_string(),
                "default".to_string(),
                json!({"value": i}),
                None,
            )
            .unwrap();
            store.ingest(event).unwrap();
        }

        let request = StartReplayRequest {
            projection_name: None,
            from_timestamp: None,
            to_timestamp: None,
            entity_id: None,
            event_type: None,
            config: Some(ReplayConfig {
                batch_size: 5,
                parallel: false,
                workers: 1,
                emit_progress: true,
                progress_interval: 5,
            }),
        };

        let response = manager.start_replay(store, request).unwrap();
        assert_eq!(response.status, ReplayStatus::Running);
        assert_eq!(response.total_events, 10);

        // Poll with a bounded wait instead of a single fixed sleep, so the
        // test neither races the background task nor waits longer than needed.
        let mut progress = manager.get_progress(response.replay_id).unwrap();
        for _ in 0..100 {
            if progress.status != ReplayStatus::Running {
                break;
            }
            tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
            progress = manager.get_progress(response.replay_id).unwrap();
        }

        assert_eq!(progress.status, ReplayStatus::Completed);
        assert_eq!(progress.processed_events, 10);
        assert!(progress.completed_at.is_some());
    }
}