1use crate::application::dto::QueryEventsRequest;
2use crate::application::services::projection::Projection;
3use crate::domain::entities::Event;
4use crate::error::{AllSourceError, Result};
5use crate::store::EventStore;
6use chrono::{DateTime, Utc};
7use parking_lot::RwLock;
8use serde::{Deserialize, Serialize};
9use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
10use std::sync::Arc;
11use uuid::Uuid;
12
13#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
15#[serde(rename_all = "lowercase")]
16pub enum ReplayStatus {
17 Pending,
19 Running,
21 Completed,
23 Failed,
25 Cancelled,
27}
28
29#[derive(Debug, Clone, Serialize, Deserialize)]
31pub struct ReplayConfig {
32 pub batch_size: usize,
34
35 pub parallel: bool,
37
38 pub workers: usize,
40
41 pub emit_progress: bool,
43
44 pub progress_interval: usize,
46}
47
48impl Default for ReplayConfig {
49 fn default() -> Self {
50 Self {
51 batch_size: 1000,
52 parallel: false,
53 workers: 4,
54 emit_progress: true,
55 progress_interval: 1000,
56 }
57 }
58}
59
60#[derive(Debug, Clone, Serialize, Deserialize)]
62pub struct StartReplayRequest {
63 pub projection_name: Option<String>,
65
66 pub from_timestamp: Option<DateTime<Utc>>,
68
69 pub to_timestamp: Option<DateTime<Utc>>,
71
72 pub entity_id: Option<String>,
74
75 pub event_type: Option<String>,
77
78 pub config: Option<ReplayConfig>,
80}
81
82#[derive(Debug, Clone, Serialize, Deserialize)]
84pub struct StartReplayResponse {
85 pub replay_id: Uuid,
86 pub status: ReplayStatus,
87 pub started_at: DateTime<Utc>,
88 pub total_events: usize,
89}
90
91#[derive(Debug, Clone, Serialize, Deserialize)]
93pub struct ReplayProgress {
94 pub replay_id: Uuid,
95 pub status: ReplayStatus,
96 pub started_at: DateTime<Utc>,
97 pub updated_at: DateTime<Utc>,
98 pub completed_at: Option<DateTime<Utc>>,
99 pub total_events: usize,
100 pub processed_events: usize,
101 pub failed_events: usize,
102 pub progress_percentage: f64,
103 pub events_per_second: f64,
104 pub error_message: Option<String>,
105}
106
107pub struct ReplayManager {
109 replays: Arc<RwLock<Vec<ReplayState>>>,
111}
112
113struct ReplayState {
115 id: Uuid,
116 projection_name: Option<String>,
117 status: Arc<RwLock<ReplayStatus>>,
118 started_at: DateTime<Utc>,
119 completed_at: Arc<RwLock<Option<DateTime<Utc>>>>,
120 total_events: usize,
121 processed_events: Arc<AtomicU64>,
122 failed_events: Arc<AtomicU64>,
123 error_message: Arc<RwLock<Option<String>>>,
124 cancelled: Arc<AtomicBool>,
125}
126
127impl ReplayManager {
128 pub fn new() -> Self {
129 Self {
130 replays: Arc::new(RwLock::new(Vec::new())),
131 }
132 }
133
134 pub fn start_replay(
136 &self,
137 store: Arc<EventStore>,
138 request: StartReplayRequest,
139 ) -> Result<StartReplayResponse> {
140 let replay_id = Uuid::new_v4();
141 let started_at = Utc::now();
142 let config = request.config.unwrap_or_default();
143
144 let query = QueryEventsRequest {
146 entity_id: request.entity_id.clone(),
147 event_type: request.event_type.clone(),
148 tenant_id: None,
149 as_of: request.to_timestamp,
150 since: request.from_timestamp,
151 until: request.to_timestamp,
152 limit: None,
153 };
154
155 let events = store.query(query)?;
156 let total_events = events.len();
157
158 tracing::info!(
159 "🔄 Starting replay {} for {} events{}",
160 replay_id,
161 total_events,
162 request
163 .projection_name
164 .as_ref()
165 .map(|n| format!(" (projection: {})", n))
166 .unwrap_or_default()
167 );
168
169 let state = ReplayState {
171 id: replay_id,
172 projection_name: request.projection_name.clone(),
173 status: Arc::new(RwLock::new(ReplayStatus::Running)),
174 started_at,
175 completed_at: Arc::new(RwLock::new(None)),
176 total_events,
177 processed_events: Arc::new(AtomicU64::new(0)),
178 failed_events: Arc::new(AtomicU64::new(0)),
179 error_message: Arc::new(RwLock::new(None)),
180 cancelled: Arc::new(AtomicBool::new(false)),
181 };
182
183 self.replays.write().push(state);
185
186 let replays = Arc::clone(&self.replays);
188 let replay_idx = replays.read().len() - 1;
189
190 tokio::spawn(async move {
192 let result = Self::run_replay(
193 store,
194 events,
195 request.projection_name,
196 config,
197 replays.clone(),
198 replay_idx,
199 )
200 .await;
201
202 let mut replays_lock = replays.write();
204 if let Some(state) = replays_lock.get_mut(replay_idx) {
205 *state.completed_at.write() = Some(Utc::now());
206
207 match result {
208 Ok(_) => {
209 if state.cancelled.load(Ordering::Relaxed) {
210 *state.status.write() = ReplayStatus::Cancelled;
211 tracing::info!("🛑 Replay {} cancelled", state.id);
212 } else {
213 *state.status.write() = ReplayStatus::Completed;
214 tracing::info!("✅ Replay {} completed successfully", state.id);
215 }
216 }
217 Err(e) => {
218 *state.status.write() = ReplayStatus::Failed;
219 *state.error_message.write() = Some(e.to_string());
220 tracing::error!("❌ Replay {} failed: {}", state.id, e);
221 }
222 }
223 }
224 });
225
226 Ok(StartReplayResponse {
227 replay_id,
228 status: ReplayStatus::Running,
229 started_at,
230 total_events,
231 })
232 }
233
234 async fn run_replay(
236 store: Arc<EventStore>,
237 events: Vec<Event>,
238 projection_name: Option<String>,
239 config: ReplayConfig,
240 replays: Arc<RwLock<Vec<ReplayState>>>,
241 replay_idx: usize,
242 ) -> Result<()> {
243 let total = events.len();
244 let projections = store.projections.read();
245
246 let target_projections: Vec<(String, Arc<dyn Projection>)> =
248 if let Some(name) = projection_name {
249 if let Some(proj) = projections.get_projection(&name) {
250 vec![(name, proj)]
251 } else {
252 return Err(AllSourceError::ValidationError(format!(
253 "Projection not found: {}",
254 name
255 )));
256 }
257 } else {
258 projections.list_projections()
259 };
260
261 drop(projections); for (batch_idx, chunk) in events.chunks(config.batch_size).enumerate() {
265 {
267 let replays_lock = replays.read();
268 if let Some(state) = replays_lock.get(replay_idx) {
269 if state.cancelled.load(Ordering::Relaxed) {
270 return Ok(());
271 }
272 }
273 }
274
275 for event in chunk {
277 for (proj_name, projection) in &target_projections {
279 if let Err(e) = projection.process(event) {
280 tracing::warn!(
281 "Failed to process event {} in projection {}: {}",
282 event.id,
283 proj_name,
284 e
285 );
286
287 let replays_lock = replays.read();
289 if let Some(state) = replays_lock.get(replay_idx) {
290 state.failed_events.fetch_add(1, Ordering::Relaxed);
291 }
292 }
293 }
294
295 let replays_lock = replays.read();
297 if let Some(state) = replays_lock.get(replay_idx) {
298 let processed = state.processed_events.fetch_add(1, Ordering::Relaxed) + 1;
299
300 if config.emit_progress && processed % config.progress_interval as u64 == 0 {
302 let progress = (processed as f64 / total as f64) * 100.0;
303 tracing::debug!(
304 "Replay progress: {}/{} ({:.1}%)",
305 processed,
306 total,
307 progress
308 );
309 }
310 }
311 }
312 }
313
314 Ok(())
315 }
316
317 pub fn get_progress(&self, replay_id: Uuid) -> Result<ReplayProgress> {
319 let replays = self.replays.read();
320
321 let state = replays.iter().find(|r| r.id == replay_id).ok_or_else(|| {
322 AllSourceError::ValidationError(format!("Replay not found: {}", replay_id))
323 })?;
324
325 let processed = state.processed_events.load(Ordering::Relaxed);
326 let failed = state.failed_events.load(Ordering::Relaxed);
327 let total_events = state.total_events;
328 let started_at = state.started_at;
329 let status = *state.status.read();
330 let completed_at = *state.completed_at.read();
331 let error_message = state.error_message.read().clone();
332
333 drop(replays); let progress_percentage = if total_events > 0 {
336 (processed as f64 / total_events as f64) * 100.0
337 } else {
338 0.0
339 };
340
341 let updated_at = Utc::now();
342 let elapsed_seconds = (updated_at - started_at).num_seconds().max(1) as f64;
343 let events_per_second = processed as f64 / elapsed_seconds;
344
345 Ok(ReplayProgress {
346 replay_id,
347 status,
348 started_at,
349 updated_at,
350 completed_at,
351 total_events,
352 processed_events: processed as usize,
353 failed_events: failed as usize,
354 progress_percentage,
355 events_per_second,
356 error_message,
357 })
358 }
359
360 pub fn cancel_replay(&self, replay_id: Uuid) -> Result<()> {
362 let replays = self.replays.read();
363
364 let state = replays.iter().find(|r| r.id == replay_id).ok_or_else(|| {
365 AllSourceError::ValidationError(format!("Replay not found: {}", replay_id))
366 })?;
367
368 let status = *state.status.read();
369 if status != ReplayStatus::Running {
370 return Err(AllSourceError::ValidationError(format!(
371 "Cannot cancel replay in status: {:?}",
372 status
373 )));
374 }
375
376 state.cancelled.store(true, Ordering::Relaxed);
377 tracing::info!("🛑 Cancelling replay {}", replay_id);
378
379 Ok(())
380 }
381
382 pub fn list_replays(&self) -> Vec<ReplayProgress> {
384 let replays = self.replays.read();
385
386 replays
387 .iter()
388 .map(|state| {
389 let processed = state.processed_events.load(Ordering::Relaxed);
390 let failed = state.failed_events.load(Ordering::Relaxed);
391 let progress_percentage = if state.total_events > 0 {
392 (processed as f64 / state.total_events as f64) * 100.0
393 } else {
394 0.0
395 };
396
397 let updated_at = Utc::now();
398 let elapsed_seconds = (updated_at - state.started_at).num_seconds().max(1) as f64;
399 let events_per_second = processed as f64 / elapsed_seconds;
400
401 ReplayProgress {
402 replay_id: state.id,
403 status: *state.status.read(),
404 started_at: state.started_at,
405 updated_at,
406 completed_at: *state.completed_at.read(),
407 total_events: state.total_events,
408 processed_events: processed as usize,
409 failed_events: failed as usize,
410 progress_percentage,
411 events_per_second,
412 error_message: state.error_message.read().clone(),
413 }
414 })
415 .collect()
416 }
417
418 pub fn delete_replay(&self, replay_id: Uuid) -> Result<bool> {
420 let mut replays = self.replays.write();
421
422 let idx = replays
423 .iter()
424 .position(|r| r.id == replay_id)
425 .ok_or_else(|| {
426 AllSourceError::ValidationError(format!("Replay not found: {}", replay_id))
427 })?;
428
429 let status = *replays[idx].status.read();
430 if status == ReplayStatus::Running {
431 return Err(AllSourceError::ValidationError(
432 "Cannot delete a running replay. Cancel it first.".to_string(),
433 ));
434 }
435
436 replays.remove(idx);
437 tracing::info!("🗑️ Deleted replay {}", replay_id);
438
439 Ok(true)
440 }
441}
442
443impl Default for ReplayManager {
444 fn default() -> Self {
445 Self::new()
446 }
447}
448
#[cfg(test)]
mod tests {
    use super::*;
    use crate::domain::entities::Event;
    use serde_json::json;

    /// Polls `get_progress` until the replay leaves `Running`, giving up after
    /// roughly one second so a stuck replay fails the test instead of hanging.
    async fn wait_for_terminal(manager: &ReplayManager, replay_id: Uuid) -> ReplayProgress {
        let mut progress = manager.get_progress(replay_id).unwrap();
        for _ in 0..100 {
            if progress.status != ReplayStatus::Running {
                break;
            }
            tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
            progress = manager.get_progress(replay_id).unwrap();
        }
        progress
    }

    #[tokio::test]
    async fn test_replay_manager_creation() {
        let manager = ReplayManager::new();
        let replays = manager.list_replays();
        assert_eq!(replays.len(), 0);
    }

    #[tokio::test]
    async fn test_replay_progress_tracking() {
        let manager = ReplayManager::new();
        let store = Arc::new(EventStore::new());

        for i in 0..10 {
            let event = Event::from_strings(
                "test.event".to_string(),
                "test-entity".to_string(),
                "default".to_string(),
                json!({"value": i}),
                None,
            )
            .unwrap();
            store.ingest(event).unwrap();
        }

        let request = StartReplayRequest {
            projection_name: None,
            from_timestamp: None,
            to_timestamp: None,
            entity_id: None,
            event_type: None,
            config: Some(ReplayConfig {
                batch_size: 5,
                parallel: false,
                workers: 1,
                emit_progress: true,
                progress_interval: 5,
            }),
        };

        let response = manager.start_replay(store, request).unwrap();
        assert_eq!(response.status, ReplayStatus::Running);
        assert_eq!(response.total_events, 10);

        // Wait for the replay to actually finish rather than sleeping a fixed
        // time and asserting only an upper bound (which passes even if nothing ran).
        let progress = wait_for_terminal(&manager, response.replay_id).await;
        assert_eq!(progress.status, ReplayStatus::Completed);
        assert_eq!(progress.total_events, 10);
        assert_eq!(progress.processed_events, 10);
        assert!((progress.progress_percentage - 100.0).abs() < f64::EPSILON);
        assert!(progress.completed_at.is_some());
    }

    #[test]
    fn test_unknown_replay_id_is_rejected() {
        let manager = ReplayManager::new();
        let unknown = Uuid::new_v4();
        assert!(manager.get_progress(unknown).is_err());
        assert!(manager.cancel_replay(unknown).is_err());
        assert!(manager.delete_replay(unknown).is_err());
    }
}
507}