allsource_core/application/services/replay.rs

1use crate::application::dto::QueryEventsRequest;
2use crate::application::services::projection::Projection;
3use crate::domain::entities::Event;
4use crate::error::{AllSourceError, Result};
5use crate::store::EventStore;
6use chrono::{DateTime, Utc};
7use parking_lot::RwLock;
8use serde::{Deserialize, Serialize};
9use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
10use std::sync::Arc;
11use uuid::Uuid;
12
13/// Status of a replay operation
14#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
15#[serde(rename_all = "lowercase")]
16pub enum ReplayStatus {
17    /// Replay is pending and hasn't started yet
18    Pending,
19    /// Replay is currently running
20    Running,
21    /// Replay completed successfully
22    Completed,
23    /// Replay failed with an error
24    Failed,
25    /// Replay was cancelled by user
26    Cancelled,
27}
28
29/// Configuration for replay operations
30#[derive(Debug, Clone, Serialize, Deserialize)]
31pub struct ReplayConfig {
32    /// Batch size for processing events
33    pub batch_size: usize,
34
35    /// Whether to run replay in parallel
36    pub parallel: bool,
37
38    /// Number of parallel workers (if parallel is true)
39    pub workers: usize,
40
41    /// Whether to emit progress events
42    pub emit_progress: bool,
43
44    /// Progress reporting interval (every N events)
45    pub progress_interval: usize,
46}
47
48impl Default for ReplayConfig {
49    fn default() -> Self {
50        Self {
51            batch_size: 1000,
52            parallel: false,
53            workers: 4,
54            emit_progress: true,
55            progress_interval: 1000,
56        }
57    }
58}
59
60/// Request to start a replay operation
61#[derive(Debug, Clone, Serialize, Deserialize)]
62pub struct StartReplayRequest {
63    /// Optional projection name to rebuild (if None, replays all projections)
64    pub projection_name: Option<String>,
65
66    /// Start from this timestamp (if None, starts from beginning)
67    pub from_timestamp: Option<DateTime<Utc>>,
68
69    /// End at this timestamp (if None, goes to end)
70    pub to_timestamp: Option<DateTime<Utc>>,
71
72    /// Filter by entity_id (optional)
73    pub entity_id: Option<String>,
74
75    /// Filter by event_type (optional)
76    pub event_type: Option<String>,
77
78    /// Replay configuration
79    pub config: Option<ReplayConfig>,
80}
81
82/// Response from starting a replay
83#[derive(Debug, Clone, Serialize, Deserialize)]
84pub struct StartReplayResponse {
85    pub replay_id: Uuid,
86    pub status: ReplayStatus,
87    pub started_at: DateTime<Utc>,
88    pub total_events: usize,
89}
90
91/// Progress information for a replay
92#[derive(Debug, Clone, Serialize, Deserialize)]
93pub struct ReplayProgress {
94    pub replay_id: Uuid,
95    pub status: ReplayStatus,
96    pub started_at: DateTime<Utc>,
97    pub updated_at: DateTime<Utc>,
98    pub completed_at: Option<DateTime<Utc>>,
99    pub total_events: usize,
100    pub processed_events: usize,
101    pub failed_events: usize,
102    pub progress_percentage: f64,
103    pub events_per_second: f64,
104    pub error_message: Option<String>,
105}
106
107/// Manages event replay and projection rebuilding
108pub struct ReplayManager {
109    /// Active replay operations
110    replays: Arc<RwLock<Vec<ReplayState>>>,
111}
112
113/// Internal state for a replay operation
114struct ReplayState {
115    id: Uuid,
116    projection_name: Option<String>,
117    status: Arc<RwLock<ReplayStatus>>,
118    started_at: DateTime<Utc>,
119    completed_at: Arc<RwLock<Option<DateTime<Utc>>>>,
120    total_events: usize,
121    processed_events: Arc<AtomicU64>,
122    failed_events: Arc<AtomicU64>,
123    error_message: Arc<RwLock<Option<String>>>,
124    cancelled: Arc<AtomicBool>,
125}
126
127impl ReplayManager {
128    pub fn new() -> Self {
129        Self {
130            replays: Arc::new(RwLock::new(Vec::new())),
131        }
132    }
133
134    /// Start a replay operation
135    pub fn start_replay(
136        &self,
137        store: Arc<EventStore>,
138        request: StartReplayRequest,
139    ) -> Result<StartReplayResponse> {
140        let replay_id = Uuid::new_v4();
141        let started_at = Utc::now();
142        let config = request.config.unwrap_or_default();
143
144        // Query events to replay
145        let query = QueryEventsRequest {
146            entity_id: request.entity_id.clone(),
147            event_type: request.event_type.clone(),
148            tenant_id: None,
149            as_of: request.to_timestamp,
150            since: request.from_timestamp,
151            until: request.to_timestamp,
152            limit: None,
153        };
154
155        let events = store.query(query)?;
156        let total_events = events.len();
157
158        tracing::info!(
159            "🔄 Starting replay {} for {} events{}",
160            replay_id,
161            total_events,
162            request
163                .projection_name
164                .as_ref()
165                .map(|n| format!(" (projection: {})", n))
166                .unwrap_or_default()
167        );
168
169        // Create replay state
170        let state = ReplayState {
171            id: replay_id,
172            projection_name: request.projection_name.clone(),
173            status: Arc::new(RwLock::new(ReplayStatus::Running)),
174            started_at,
175            completed_at: Arc::new(RwLock::new(None)),
176            total_events,
177            processed_events: Arc::new(AtomicU64::new(0)),
178            failed_events: Arc::new(AtomicU64::new(0)),
179            error_message: Arc::new(RwLock::new(None)),
180            cancelled: Arc::new(AtomicBool::new(false)),
181        };
182
183        // Store replay state
184        self.replays.write().push(state);
185
186        // Get the state references we need
187        let replays = Arc::clone(&self.replays);
188        let replay_idx = replays.read().len() - 1;
189
190        // Spawn replay task
191        tokio::spawn(async move {
192            let result = Self::run_replay(
193                store,
194                events,
195                request.projection_name,
196                config,
197                replays.clone(),
198                replay_idx,
199            )
200            .await;
201
202            // Update final status
203            let mut replays_lock = replays.write();
204            if let Some(state) = replays_lock.get_mut(replay_idx) {
205                *state.completed_at.write() = Some(Utc::now());
206
207                match result {
208                    Ok(_) => {
209                        if state.cancelled.load(Ordering::Relaxed) {
210                            *state.status.write() = ReplayStatus::Cancelled;
211                            tracing::info!("🛑 Replay {} cancelled", state.id);
212                        } else {
213                            *state.status.write() = ReplayStatus::Completed;
214                            tracing::info!("✅ Replay {} completed successfully", state.id);
215                        }
216                    }
217                    Err(e) => {
218                        *state.status.write() = ReplayStatus::Failed;
219                        *state.error_message.write() = Some(e.to_string());
220                        tracing::error!("❌ Replay {} failed: {}", state.id, e);
221                    }
222                }
223            }
224        });
225
226        Ok(StartReplayResponse {
227            replay_id,
228            status: ReplayStatus::Running,
229            started_at,
230            total_events,
231        })
232    }
233
234    /// Internal replay execution
235    async fn run_replay(
236        store: Arc<EventStore>,
237        events: Vec<Event>,
238        projection_name: Option<String>,
239        config: ReplayConfig,
240        replays: Arc<RwLock<Vec<ReplayState>>>,
241        replay_idx: usize,
242    ) -> Result<()> {
243        let total = events.len();
244        let projections = store.projections.read();
245
246        // Get target projection(s)
247        let target_projections: Vec<(String, Arc<dyn Projection>)> =
248            if let Some(name) = projection_name {
249                if let Some(proj) = projections.get_projection(&name) {
250                    vec![(name, proj)]
251                } else {
252                    return Err(AllSourceError::ValidationError(format!(
253                        "Projection not found: {}",
254                        name
255                    )));
256                }
257            } else {
258                projections.list_projections()
259            };
260
261        drop(projections); // Release lock
262
263        // Process events in batches
264        for (batch_idx, chunk) in events.chunks(config.batch_size).enumerate() {
265            // Check if cancelled
266            {
267                let replays_lock = replays.read();
268                if let Some(state) = replays_lock.get(replay_idx) {
269                    if state.cancelled.load(Ordering::Relaxed) {
270                        return Ok(());
271                    }
272                }
273            }
274
275            // Process batch
276            for event in chunk {
277                // Apply event to each target projection
278                for (proj_name, projection) in &target_projections {
279                    if let Err(e) = projection.process(event) {
280                        tracing::warn!(
281                            "Failed to process event {} in projection {}: {}",
282                            event.id,
283                            proj_name,
284                            e
285                        );
286
287                        // Increment failed counter
288                        let replays_lock = replays.read();
289                        if let Some(state) = replays_lock.get(replay_idx) {
290                            state.failed_events.fetch_add(1, Ordering::Relaxed);
291                        }
292                    }
293                }
294
295                // Increment processed counter
296                let replays_lock = replays.read();
297                if let Some(state) = replays_lock.get(replay_idx) {
298                    let processed = state.processed_events.fetch_add(1, Ordering::Relaxed) + 1;
299
300                    // Emit progress
301                    if config.emit_progress && processed % config.progress_interval as u64 == 0 {
302                        let progress = (processed as f64 / total as f64) * 100.0;
303                        tracing::debug!(
304                            "Replay progress: {}/{} ({:.1}%)",
305                            processed,
306                            total,
307                            progress
308                        );
309                    }
310                }
311            }
312        }
313
314        Ok(())
315    }
316
317    /// Get progress for a replay operation
318    pub fn get_progress(&self, replay_id: Uuid) -> Result<ReplayProgress> {
319        let replays = self.replays.read();
320
321        let state = replays.iter().find(|r| r.id == replay_id).ok_or_else(|| {
322            AllSourceError::ValidationError(format!("Replay not found: {}", replay_id))
323        })?;
324
325        let processed = state.processed_events.load(Ordering::Relaxed);
326        let failed = state.failed_events.load(Ordering::Relaxed);
327        let total_events = state.total_events;
328        let started_at = state.started_at;
329        let status = *state.status.read();
330        let completed_at = *state.completed_at.read();
331        let error_message = state.error_message.read().clone();
332
333        drop(replays); // Release lock before calculations
334
335        let progress_percentage = if total_events > 0 {
336            (processed as f64 / total_events as f64) * 100.0
337        } else {
338            0.0
339        };
340
341        let updated_at = Utc::now();
342        let elapsed_seconds = (updated_at - started_at).num_seconds().max(1) as f64;
343        let events_per_second = processed as f64 / elapsed_seconds;
344
345        Ok(ReplayProgress {
346            replay_id,
347            status,
348            started_at,
349            updated_at,
350            completed_at,
351            total_events,
352            processed_events: processed as usize,
353            failed_events: failed as usize,
354            progress_percentage,
355            events_per_second,
356            error_message,
357        })
358    }
359
360    /// Cancel a running replay
361    pub fn cancel_replay(&self, replay_id: Uuid) -> Result<()> {
362        let replays = self.replays.read();
363
364        let state = replays.iter().find(|r| r.id == replay_id).ok_or_else(|| {
365            AllSourceError::ValidationError(format!("Replay not found: {}", replay_id))
366        })?;
367
368        let status = *state.status.read();
369        if status != ReplayStatus::Running {
370            return Err(AllSourceError::ValidationError(format!(
371                "Cannot cancel replay in status: {:?}",
372                status
373            )));
374        }
375
376        state.cancelled.store(true, Ordering::Relaxed);
377        tracing::info!("🛑 Cancelling replay {}", replay_id);
378
379        Ok(())
380    }
381
382    /// List all replay operations
383    pub fn list_replays(&self) -> Vec<ReplayProgress> {
384        let replays = self.replays.read();
385
386        replays
387            .iter()
388            .map(|state| {
389                let processed = state.processed_events.load(Ordering::Relaxed);
390                let failed = state.failed_events.load(Ordering::Relaxed);
391                let progress_percentage = if state.total_events > 0 {
392                    (processed as f64 / state.total_events as f64) * 100.0
393                } else {
394                    0.0
395                };
396
397                let updated_at = Utc::now();
398                let elapsed_seconds = (updated_at - state.started_at).num_seconds().max(1) as f64;
399                let events_per_second = processed as f64 / elapsed_seconds;
400
401                ReplayProgress {
402                    replay_id: state.id,
403                    status: *state.status.read(),
404                    started_at: state.started_at,
405                    updated_at,
406                    completed_at: *state.completed_at.read(),
407                    total_events: state.total_events,
408                    processed_events: processed as usize,
409                    failed_events: failed as usize,
410                    progress_percentage,
411                    events_per_second,
412                    error_message: state.error_message.read().clone(),
413                }
414            })
415            .collect()
416    }
417
418    /// Delete a completed or failed replay from history
419    pub fn delete_replay(&self, replay_id: Uuid) -> Result<bool> {
420        let mut replays = self.replays.write();
421
422        let idx = replays
423            .iter()
424            .position(|r| r.id == replay_id)
425            .ok_or_else(|| {
426                AllSourceError::ValidationError(format!("Replay not found: {}", replay_id))
427            })?;
428
429        let status = *replays[idx].status.read();
430        if status == ReplayStatus::Running {
431            return Err(AllSourceError::ValidationError(
432                "Cannot delete a running replay. Cancel it first.".to_string(),
433            ));
434        }
435
436        replays.remove(idx);
437        tracing::info!("🗑️  Deleted replay {}", replay_id);
438
439        Ok(true)
440    }
441}
442
443impl Default for ReplayManager {
444    fn default() -> Self {
445        Self::new()
446    }
447}
448
#[cfg(test)]
mod tests {
    use super::*;
    use crate::domain::entities::Event;
    use serde_json::json;

    /// Build a store pre-loaded with `count` `test.event` events for one entity.
    fn store_with_events(count: usize) -> Arc<EventStore> {
        let store = Arc::new(EventStore::new());
        for i in 0..count {
            let event = Event::from_strings(
                "test.event".to_string(),
                "test-entity".to_string(),
                "default".to_string(),
                json!({"value": i}),
                None,
            )
            .unwrap();
            store.ingest(event).unwrap();
        }
        store
    }

    /// Unfiltered request with small batches, so several batches are exercised.
    fn request_for(projection_name: Option<&str>) -> StartReplayRequest {
        StartReplayRequest {
            projection_name: projection_name.map(str::to_string),
            from_timestamp: None,
            to_timestamp: None,
            entity_id: None,
            event_type: None,
            config: Some(ReplayConfig {
                batch_size: 5,
                parallel: false,
                workers: 1,
                emit_progress: true,
                progress_interval: 5,
            }),
        }
    }

    /// Poll until the replay leaves `Running`, failing the test after ~2s.
    async fn wait_for_completion(manager: &ReplayManager, replay_id: Uuid) -> ReplayProgress {
        for _ in 0..200 {
            let progress = manager.get_progress(replay_id).unwrap();
            if progress.status != ReplayStatus::Running {
                return progress;
            }
            tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
        }
        panic!("replay {} did not finish within the timeout", replay_id);
    }

    #[tokio::test]
    async fn test_replay_manager_creation() {
        let manager = ReplayManager::new();
        let replays = manager.list_replays();
        assert_eq!(replays.len(), 0);
    }

    #[tokio::test]
    async fn test_replay_progress_tracking() {
        let manager = ReplayManager::new();

        let response = manager
            .start_replay(store_with_events(10), request_for(None))
            .unwrap();
        assert_eq!(response.status, ReplayStatus::Running);
        assert_eq!(response.total_events, 10);

        let progress = wait_for_completion(&manager, response.replay_id).await;
        assert_eq!(progress.status, ReplayStatus::Completed);
        assert_eq!(progress.total_events, 10);
        assert_eq!(progress.processed_events, 10);
        assert!(progress.completed_at.is_some());
        assert!((progress.progress_percentage - 100.0).abs() < f64::EPSILON);
    }

    #[tokio::test]
    async fn test_unknown_projection_marks_replay_failed() {
        let manager = ReplayManager::new();

        let response = manager
            .start_replay(store_with_events(3), request_for(Some("no-such-projection")))
            .unwrap();

        let progress = wait_for_completion(&manager, response.replay_id).await;
        assert_eq!(progress.status, ReplayStatus::Failed);
        assert!(progress.error_message.is_some());
        assert_eq!(progress.processed_events, 0);
    }

    #[tokio::test]
    async fn test_finished_replay_can_be_deleted_but_not_cancelled() {
        let manager = ReplayManager::new();

        let response = manager
            .start_replay(store_with_events(2), request_for(None))
            .unwrap();
        wait_for_completion(&manager, response.replay_id).await;

        assert!(manager.cancel_replay(response.replay_id).is_err());
        assert!(manager.delete_replay(response.replay_id).unwrap());
        assert!(manager.list_replays().is_empty());
        assert!(manager.get_progress(response.replay_id).is_err());
    }

    #[test]
    fn test_unknown_replay_id_is_rejected() {
        let manager = ReplayManager::new();
        let unknown = Uuid::new_v4();

        assert!(manager.get_progress(unknown).is_err());
        assert!(manager.cancel_replay(unknown).is_err());
        assert!(manager.delete_replay(unknown).is_err());
    }
}