// allsource_core/replay.rs

1use crate::domain::entities::Event;
2use crate::error::{AllSourceError, Result};
3use crate::application::dto::QueryEventsRequest;
4use crate::projection::Projection;
5use crate::store::EventStore;
6use chrono::{DateTime, Utc};
7use parking_lot::RwLock;
8use serde::{Deserialize, Serialize};
9use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
10use std::sync::Arc;
11use uuid::Uuid;
12
13/// Status of a replay operation
14#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
15#[serde(rename_all = "lowercase")]
16pub enum ReplayStatus {
17    /// Replay is pending and hasn't started yet
18    Pending,
19    /// Replay is currently running
20    Running,
21    /// Replay completed successfully
22    Completed,
23    /// Replay failed with an error
24    Failed,
25    /// Replay was cancelled by user
26    Cancelled,
27}
28
29/// Configuration for replay operations
30#[derive(Debug, Clone, Serialize, Deserialize)]
31pub struct ReplayConfig {
32    /// Batch size for processing events
33    pub batch_size: usize,
34
35    /// Whether to run replay in parallel
36    pub parallel: bool,
37
38    /// Number of parallel workers (if parallel is true)
39    pub workers: usize,
40
41    /// Whether to emit progress events
42    pub emit_progress: bool,
43
44    /// Progress reporting interval (every N events)
45    pub progress_interval: usize,
46}
47
48impl Default for ReplayConfig {
49    fn default() -> Self {
50        Self {
51            batch_size: 1000,
52            parallel: false,
53            workers: 4,
54            emit_progress: true,
55            progress_interval: 1000,
56        }
57    }
58}
59
60/// Request to start a replay operation
61#[derive(Debug, Clone, Serialize, Deserialize)]
62pub struct StartReplayRequest {
63    /// Optional projection name to rebuild (if None, replays all projections)
64    pub projection_name: Option<String>,
65
66    /// Start from this timestamp (if None, starts from beginning)
67    pub from_timestamp: Option<DateTime<Utc>>,
68
69    /// End at this timestamp (if None, goes to end)
70    pub to_timestamp: Option<DateTime<Utc>>,
71
72    /// Filter by entity_id (optional)
73    pub entity_id: Option<String>,
74
75    /// Filter by event_type (optional)
76    pub event_type: Option<String>,
77
78    /// Replay configuration
79    pub config: Option<ReplayConfig>,
80}
81
82/// Response from starting a replay
83#[derive(Debug, Clone, Serialize, Deserialize)]
84pub struct StartReplayResponse {
85    pub replay_id: Uuid,
86    pub status: ReplayStatus,
87    pub started_at: DateTime<Utc>,
88    pub total_events: usize,
89}
90
91/// Progress information for a replay
92#[derive(Debug, Clone, Serialize, Deserialize)]
93pub struct ReplayProgress {
94    pub replay_id: Uuid,
95    pub status: ReplayStatus,
96    pub started_at: DateTime<Utc>,
97    pub updated_at: DateTime<Utc>,
98    pub completed_at: Option<DateTime<Utc>>,
99    pub total_events: usize,
100    pub processed_events: usize,
101    pub failed_events: usize,
102    pub progress_percentage: f64,
103    pub events_per_second: f64,
104    pub error_message: Option<String>,
105}
106
107/// Manages event replay and projection rebuilding
108pub struct ReplayManager {
109    /// Active replay operations
110    replays: Arc<RwLock<Vec<ReplayState>>>,
111}
112
113/// Internal state for a replay operation
114struct ReplayState {
115    id: Uuid,
116    projection_name: Option<String>,
117    status: Arc<RwLock<ReplayStatus>>,
118    started_at: DateTime<Utc>,
119    completed_at: Arc<RwLock<Option<DateTime<Utc>>>>,
120    total_events: usize,
121    processed_events: Arc<AtomicU64>,
122    failed_events: Arc<AtomicU64>,
123    error_message: Arc<RwLock<Option<String>>>,
124    cancelled: Arc<AtomicBool>,
125}
126
127impl ReplayManager {
128    pub fn new() -> Self {
129        Self {
130            replays: Arc::new(RwLock::new(Vec::new())),
131        }
132    }
133
134    /// Start a replay operation
135    pub fn start_replay(
136        &self,
137        store: Arc<EventStore>,
138        request: StartReplayRequest,
139    ) -> Result<StartReplayResponse> {
140        let replay_id = Uuid::new_v4();
141        let started_at = Utc::now();
142        let config = request.config.unwrap_or_default();
143
144        // Query events to replay
145        let query = QueryEventsRequest {
146            entity_id: request.entity_id.clone(),
147            event_type: request.event_type.clone(),
148            tenant_id: None,
149            as_of: request.to_timestamp,
150            since: request.from_timestamp,
151            until: request.to_timestamp,
152            limit: None,
153        };
154
155        let events = store.query(query)?;
156        let total_events = events.len();
157
158        tracing::info!(
159            "🔄 Starting replay {} for {} events{}",
160            replay_id,
161            total_events,
162            request.projection_name
163                .as_ref()
164                .map(|n| format!(" (projection: {})", n))
165                .unwrap_or_default()
166        );
167
168        // Create replay state
169        let state = ReplayState {
170            id: replay_id,
171            projection_name: request.projection_name.clone(),
172            status: Arc::new(RwLock::new(ReplayStatus::Running)),
173            started_at,
174            completed_at: Arc::new(RwLock::new(None)),
175            total_events,
176            processed_events: Arc::new(AtomicU64::new(0)),
177            failed_events: Arc::new(AtomicU64::new(0)),
178            error_message: Arc::new(RwLock::new(None)),
179            cancelled: Arc::new(AtomicBool::new(false)),
180        };
181
182        // Store replay state
183        self.replays.write().push(state);
184
185        // Get the state references we need
186        let replays = Arc::clone(&self.replays);
187        let replay_idx = replays.read().len() - 1;
188
189        // Spawn replay task
190        tokio::spawn(async move {
191            let result = Self::run_replay(
192                store,
193                events,
194                request.projection_name,
195                config,
196                replays.clone(),
197                replay_idx,
198            ).await;
199
200            // Update final status
201            let mut replays_lock = replays.write();
202            if let Some(state) = replays_lock.get_mut(replay_idx) {
203                *state.completed_at.write() = Some(Utc::now());
204
205                match result {
206                    Ok(_) => {
207                        if state.cancelled.load(Ordering::Relaxed) {
208                            *state.status.write() = ReplayStatus::Cancelled;
209                            tracing::info!("🛑 Replay {} cancelled", state.id);
210                        } else {
211                            *state.status.write() = ReplayStatus::Completed;
212                            tracing::info!("✅ Replay {} completed successfully", state.id);
213                        }
214                    }
215                    Err(e) => {
216                        *state.status.write() = ReplayStatus::Failed;
217                        *state.error_message.write() = Some(e.to_string());
218                        tracing::error!("❌ Replay {} failed: {}", state.id, e);
219                    }
220                }
221            }
222        });
223
224        Ok(StartReplayResponse {
225            replay_id,
226            status: ReplayStatus::Running,
227            started_at,
228            total_events,
229        })
230    }
231
232    /// Internal replay execution
233    async fn run_replay(
234        store: Arc<EventStore>,
235        events: Vec<Event>,
236        projection_name: Option<String>,
237        config: ReplayConfig,
238        replays: Arc<RwLock<Vec<ReplayState>>>,
239        replay_idx: usize,
240    ) -> Result<()> {
241        let total = events.len();
242        let projections = store.projections.read();
243
244        // Get target projection(s)
245        let target_projections: Vec<(String, Arc<dyn Projection>)> = if let Some(name) = projection_name {
246            if let Some(proj) = projections.get_projection(&name) {
247                vec![(name, proj)]
248            } else {
249                return Err(AllSourceError::ValidationError(format!(
250                    "Projection not found: {}",
251                    name
252                )));
253            }
254        } else {
255            projections.list_projections()
256        };
257
258        drop(projections); // Release lock
259
260        // Process events in batches
261        for (batch_idx, chunk) in events.chunks(config.batch_size).enumerate() {
262            // Check if cancelled
263            {
264                let replays_lock = replays.read();
265                if let Some(state) = replays_lock.get(replay_idx) {
266                    if state.cancelled.load(Ordering::Relaxed) {
267                        return Ok(());
268                    }
269                }
270            }
271
272            // Process batch
273            for event in chunk {
274                // Apply event to each target projection
275                for (proj_name, projection) in &target_projections {
276                    if let Err(e) = projection.process(event) {
277                        tracing::warn!(
278                            "Failed to process event {} in projection {}: {}",
279                            event.id,
280                            proj_name,
281                            e
282                        );
283
284                        // Increment failed counter
285                        let replays_lock = replays.read();
286                        if let Some(state) = replays_lock.get(replay_idx) {
287                            state.failed_events.fetch_add(1, Ordering::Relaxed);
288                        }
289                    }
290                }
291
292                // Increment processed counter
293                let replays_lock = replays.read();
294                if let Some(state) = replays_lock.get(replay_idx) {
295                    let processed = state.processed_events.fetch_add(1, Ordering::Relaxed) + 1;
296
297                    // Emit progress
298                    if config.emit_progress && processed % config.progress_interval as u64 == 0 {
299                        let progress = (processed as f64 / total as f64) * 100.0;
300                        tracing::debug!(
301                            "Replay progress: {}/{} ({:.1}%)",
302                            processed,
303                            total,
304                            progress
305                        );
306                    }
307                }
308            }
309        }
310
311        Ok(())
312    }
313
314    /// Get progress for a replay operation
315    pub fn get_progress(&self, replay_id: Uuid) -> Result<ReplayProgress> {
316        let replays = self.replays.read();
317
318        let state = replays
319            .iter()
320            .find(|r| r.id == replay_id)
321            .ok_or_else(|| {
322                AllSourceError::ValidationError(format!("Replay not found: {}", replay_id))
323            })?;
324
325        let processed = state.processed_events.load(Ordering::Relaxed);
326        let failed = state.failed_events.load(Ordering::Relaxed);
327        let total_events = state.total_events;
328        let started_at = state.started_at;
329        let status = *state.status.read();
330        let completed_at = *state.completed_at.read();
331        let error_message = state.error_message.read().clone();
332
333        drop(replays); // Release lock before calculations
334
335        let progress_percentage = if total_events > 0 {
336            (processed as f64 / total_events as f64) * 100.0
337        } else {
338            0.0
339        };
340
341        let updated_at = Utc::now();
342        let elapsed_seconds = (updated_at - started_at).num_seconds().max(1) as f64;
343        let events_per_second = processed as f64 / elapsed_seconds;
344
345        Ok(ReplayProgress {
346            replay_id,
347            status,
348            started_at,
349            updated_at,
350            completed_at,
351            total_events,
352            processed_events: processed as usize,
353            failed_events: failed as usize,
354            progress_percentage,
355            events_per_second,
356            error_message,
357        })
358    }
359
360    /// Cancel a running replay
361    pub fn cancel_replay(&self, replay_id: Uuid) -> Result<()> {
362        let replays = self.replays.read();
363
364        let state = replays
365            .iter()
366            .find(|r| r.id == replay_id)
367            .ok_or_else(|| {
368                AllSourceError::ValidationError(format!("Replay not found: {}", replay_id))
369            })?;
370
371        let status = *state.status.read();
372        if status != ReplayStatus::Running {
373            return Err(AllSourceError::ValidationError(format!(
374                "Cannot cancel replay in status: {:?}",
375                status
376            )));
377        }
378
379        state.cancelled.store(true, Ordering::Relaxed);
380        tracing::info!("🛑 Cancelling replay {}", replay_id);
381
382        Ok(())
383    }
384
385    /// List all replay operations
386    pub fn list_replays(&self) -> Vec<ReplayProgress> {
387        let replays = self.replays.read();
388
389        replays
390            .iter()
391            .map(|state| {
392                let processed = state.processed_events.load(Ordering::Relaxed);
393                let failed = state.failed_events.load(Ordering::Relaxed);
394                let progress_percentage = if state.total_events > 0 {
395                    (processed as f64 / state.total_events as f64) * 100.0
396                } else {
397                    0.0
398                };
399
400                let updated_at = Utc::now();
401                let elapsed_seconds = (updated_at - state.started_at).num_seconds().max(1) as f64;
402                let events_per_second = processed as f64 / elapsed_seconds;
403
404                ReplayProgress {
405                    replay_id: state.id,
406                    status: *state.status.read(),
407                    started_at: state.started_at,
408                    updated_at,
409                    completed_at: *state.completed_at.read(),
410                    total_events: state.total_events,
411                    processed_events: processed as usize,
412                    failed_events: failed as usize,
413                    progress_percentage,
414                    events_per_second,
415                    error_message: state.error_message.read().clone(),
416                }
417            })
418            .collect()
419    }
420
421    /// Delete a completed or failed replay from history
422    pub fn delete_replay(&self, replay_id: Uuid) -> Result<bool> {
423        let mut replays = self.replays.write();
424
425        let idx = replays
426            .iter()
427            .position(|r| r.id == replay_id)
428            .ok_or_else(|| {
429                AllSourceError::ValidationError(format!("Replay not found: {}", replay_id))
430            })?;
431
432        let status = *replays[idx].status.read();
433        if status == ReplayStatus::Running {
434            return Err(AllSourceError::ValidationError(
435                "Cannot delete a running replay. Cancel it first.".to_string(),
436            ));
437        }
438
439        replays.remove(idx);
440        tracing::info!("🗑️  Deleted replay {}", replay_id);
441
442        Ok(true)
443    }
444}
445
446impl Default for ReplayManager {
447    fn default() -> Self {
448        Self::new()
449    }
450}
451
#[cfg(test)]
mod tests {
    use super::*;
    use crate::domain::entities::Event;
    use serde_json::json;

    /// A freshly created manager has no replay history.
    #[tokio::test]
    async fn test_replay_manager_creation() {
        let manager = ReplayManager::new();
        let replays = manager.list_replays();
        assert_eq!(replays.len(), 0);
    }

    /// Replaying ten ingested events runs to completion and accounts for
    /// every event.
    #[tokio::test]
    async fn test_replay_progress_tracking() {
        let manager = ReplayManager::new();
        let store = Arc::new(EventStore::new());

        // Ingest some test events
        for i in 0..10 {
            let event = Event::from_strings(
                "test.event".to_string(),
                "test-entity".to_string(),
                "default".to_string(),
                json!({"value": i}),
                None,
            )
            .unwrap();
            store.ingest(event).unwrap();
        }

        // Start replay
        let request = StartReplayRequest {
            projection_name: None,
            from_timestamp: None,
            to_timestamp: None,
            entity_id: None,
            event_type: None,
            config: Some(ReplayConfig {
                batch_size: 5,
                parallel: false,
                workers: 1,
                emit_progress: true,
                progress_interval: 5,
            }),
        };

        let response = manager.start_replay(store, request).unwrap();
        assert_eq!(response.status, ReplayStatus::Running);
        assert_eq!(response.total_events, 10);

        // Poll until the background task has published a final status
        // instead of relying on one fixed sleep; bounded to about 2 seconds.
        let mut progress = manager.get_progress(response.replay_id).unwrap();
        for _ in 0..200 {
            if progress.status != ReplayStatus::Running {
                break;
            }
            tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
            progress = manager.get_progress(response.replay_id).unwrap();
        }

        // Every event is counted as processed even if a projection rejects
        // it, so the final numbers are deterministic.
        assert_eq!(progress.status, ReplayStatus::Completed);
        assert_eq!(progress.total_events, 10);
        assert_eq!(progress.processed_events, 10);
        assert_eq!(progress.progress_percentage, 100.0);
        assert!(progress.completed_at.is_some());
        assert!(progress.error_message.is_none());
    }
}
509}