// allsource_core/application/services/replay.rs

1use crate::{
2    application::{dto::QueryEventsRequest, services::projection::Projection},
3    domain::entities::Event,
4    error::{AllSourceError, Result},
5    store::EventStore,
6};
7use chrono::{DateTime, Utc};
8use parking_lot::RwLock;
9use serde::{Deserialize, Serialize};
10use std::sync::{
11    Arc,
12    atomic::{AtomicBool, AtomicU64, Ordering},
13};
14use uuid::Uuid;
15
16/// Status of a replay operation
17#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
18#[serde(rename_all = "lowercase")]
19pub enum ReplayStatus {
20    /// Replay is pending and hasn't started yet
21    Pending,
22    /// Replay is currently running
23    Running,
24    /// Replay completed successfully
25    Completed,
26    /// Replay failed with an error
27    Failed,
28    /// Replay was cancelled by user
29    Cancelled,
30}
31
32/// Configuration for replay operations
33#[derive(Debug, Clone, Serialize, Deserialize)]
34pub struct ReplayConfig {
35    /// Batch size for processing events
36    pub batch_size: usize,
37
38    /// Whether to run replay in parallel
39    pub parallel: bool,
40
41    /// Number of parallel workers (if parallel is true)
42    pub workers: usize,
43
44    /// Whether to emit progress events
45    pub emit_progress: bool,
46
47    /// Progress reporting interval (every N events)
48    pub progress_interval: usize,
49}
50
51impl Default for ReplayConfig {
52    fn default() -> Self {
53        Self {
54            batch_size: 1000,
55            parallel: false,
56            workers: 4,
57            emit_progress: true,
58            progress_interval: 1000,
59        }
60    }
61}
62
63/// Request to start a replay operation
64#[derive(Debug, Clone, Serialize, Deserialize)]
65pub struct StartReplayRequest {
66    /// Optional projection name to rebuild (if None, replays all projections)
67    pub projection_name: Option<String>,
68
69    /// Start from this timestamp (if None, starts from beginning)
70    pub from_timestamp: Option<DateTime<Utc>>,
71
72    /// End at this timestamp (if None, goes to end)
73    pub to_timestamp: Option<DateTime<Utc>>,
74
75    /// Filter by entity_id (optional)
76    pub entity_id: Option<String>,
77
78    /// Filter by event_type (optional)
79    pub event_type: Option<String>,
80
81    /// Replay configuration
82    pub config: Option<ReplayConfig>,
83}
84
85/// Response from starting a replay
86#[derive(Debug, Clone, Serialize, Deserialize)]
87pub struct StartReplayResponse {
88    pub replay_id: Uuid,
89    pub status: ReplayStatus,
90    pub started_at: DateTime<Utc>,
91    pub total_events: usize,
92}
93
94/// Progress information for a replay
95#[derive(Debug, Clone, Serialize, Deserialize)]
96pub struct ReplayProgress {
97    pub replay_id: Uuid,
98    pub status: ReplayStatus,
99    pub started_at: DateTime<Utc>,
100    pub updated_at: DateTime<Utc>,
101    pub completed_at: Option<DateTime<Utc>>,
102    pub total_events: usize,
103    pub processed_events: usize,
104    pub failed_events: usize,
105    pub progress_percentage: f64,
106    pub events_per_second: f64,
107    pub error_message: Option<String>,
108}
109
110/// Manages event replay and projection rebuilding
111pub struct ReplayManager {
112    /// Active replay operations
113    replays: Arc<RwLock<Vec<ReplayState>>>,
114}
115
116/// Internal state for a replay operation
117struct ReplayState {
118    id: Uuid,
119    projection_name: Option<String>,
120    status: Arc<RwLock<ReplayStatus>>,
121    started_at: DateTime<Utc>,
122    completed_at: Arc<RwLock<Option<DateTime<Utc>>>>,
123    total_events: usize,
124    processed_events: Arc<AtomicU64>,
125    failed_events: Arc<AtomicU64>,
126    error_message: Arc<RwLock<Option<String>>>,
127    cancelled: Arc<AtomicBool>,
128}
129
130impl ReplayManager {
131    pub fn new() -> Self {
132        Self {
133            replays: Arc::new(RwLock::new(Vec::new())),
134        }
135    }
136
137    /// Start a replay operation
138    pub fn start_replay(
139        &self,
140        store: Arc<EventStore>,
141        request: StartReplayRequest,
142    ) -> Result<StartReplayResponse> {
143        let replay_id = Uuid::new_v4();
144        let started_at = Utc::now();
145        let config = request.config.unwrap_or_default();
146
147        // Query events to replay
148        let query = QueryEventsRequest {
149            entity_id: request.entity_id.clone(),
150            event_type: request.event_type.clone(),
151            tenant_id: None,
152            as_of: request.to_timestamp,
153            since: request.from_timestamp,
154            until: request.to_timestamp,
155            limit: None,
156            event_type_prefix: None,
157            payload_filter: None,
158        };
159
160        let events = store.query(&query)?;
161        let total_events = events.len();
162
163        tracing::info!(
164            "🔄 Starting replay {} for {} events{}",
165            replay_id,
166            total_events,
167            request
168                .projection_name
169                .as_ref()
170                .map(|n| format!(" (projection: {n})"))
171                .unwrap_or_default()
172        );
173
174        // Create replay state
175        let state = ReplayState {
176            id: replay_id,
177            projection_name: request.projection_name.clone(),
178            status: Arc::new(RwLock::new(ReplayStatus::Running)),
179            started_at,
180            completed_at: Arc::new(RwLock::new(None)),
181            total_events,
182            processed_events: Arc::new(AtomicU64::new(0)),
183            failed_events: Arc::new(AtomicU64::new(0)),
184            error_message: Arc::new(RwLock::new(None)),
185            cancelled: Arc::new(AtomicBool::new(false)),
186        };
187
188        // Store replay state
189        self.replays.write().push(state);
190
191        // Get the state references we need
192        let replays = Arc::clone(&self.replays);
193        let replay_idx = replays.read().len() - 1;
194
195        // Spawn replay task
196        tokio::spawn(async move {
197            let result = Self::run_replay(
198                store,
199                events,
200                request.projection_name,
201                config,
202                replays.clone(),
203                replay_idx,
204            )
205            .await;
206
207            // Update final status
208            let mut replays_lock = replays.write();
209            if let Some(state) = replays_lock.get_mut(replay_idx) {
210                *state.completed_at.write() = Some(Utc::now());
211
212                match result {
213                    Ok(()) => {
214                        if state.cancelled.load(Ordering::Relaxed) {
215                            *state.status.write() = ReplayStatus::Cancelled;
216                            tracing::info!("🛑 Replay {} cancelled", state.id);
217                        } else {
218                            *state.status.write() = ReplayStatus::Completed;
219                            tracing::info!("✅ Replay {} completed successfully", state.id);
220                        }
221                    }
222                    Err(e) => {
223                        *state.status.write() = ReplayStatus::Failed;
224                        *state.error_message.write() = Some(e.to_string());
225                        tracing::error!("❌ Replay {} failed: {}", state.id, e);
226                    }
227                }
228            }
229        });
230
231        Ok(StartReplayResponse {
232            replay_id,
233            status: ReplayStatus::Running,
234            started_at,
235            total_events,
236        })
237    }
238
239    /// Internal replay execution
240    async fn run_replay(
241        store: Arc<EventStore>,
242        events: Vec<Event>,
243        projection_name: Option<String>,
244        config: ReplayConfig,
245        replays: Arc<RwLock<Vec<ReplayState>>>,
246        replay_idx: usize,
247    ) -> Result<()> {
248        let total = events.len();
249        let projections = store.projections.read();
250
251        // Get target projection(s)
252        let target_projections: Vec<(String, Arc<dyn Projection>)> =
253            if let Some(name) = projection_name {
254                if let Some(proj) = projections.get_projection(&name) {
255                    vec![(name, proj)]
256                } else {
257                    return Err(AllSourceError::ValidationError(format!(
258                        "Projection not found: {name}"
259                    )));
260                }
261            } else {
262                projections.list_projections()
263            };
264
265        drop(projections); // Release lock
266
267        // Process events in batches
268        for (batch_idx, chunk) in events.chunks(config.batch_size).enumerate() {
269            // Check if cancelled
270            {
271                let replays_lock = replays.read();
272                if let Some(state) = replays_lock.get(replay_idx)
273                    && state.cancelled.load(Ordering::Relaxed)
274                {
275                    return Ok(());
276                }
277            }
278
279            // Process batch
280            for event in chunk {
281                // Apply event to each target projection
282                for (proj_name, projection) in &target_projections {
283                    if let Err(e) = projection.process(event) {
284                        tracing::warn!(
285                            "Failed to process event {} in projection {}: {}",
286                            event.id,
287                            proj_name,
288                            e
289                        );
290
291                        // Increment failed counter
292                        let replays_lock = replays.read();
293                        if let Some(state) = replays_lock.get(replay_idx) {
294                            state.failed_events.fetch_add(1, Ordering::Relaxed);
295                        }
296                    }
297                }
298
299                // Increment processed counter
300                let replays_lock = replays.read();
301                if let Some(state) = replays_lock.get(replay_idx) {
302                    let processed = state.processed_events.fetch_add(1, Ordering::Relaxed) + 1;
303
304                    // Emit progress
305                    if config.emit_progress && processed % config.progress_interval as u64 == 0 {
306                        let progress = (processed as f64 / total as f64) * 100.0;
307                        tracing::debug!(
308                            "Replay progress: {}/{} ({:.1}%)",
309                            processed,
310                            total,
311                            progress
312                        );
313                    }
314                }
315            }
316        }
317
318        Ok(())
319    }
320
321    /// Get progress for a replay operation
322    pub fn get_progress(&self, replay_id: Uuid) -> Result<ReplayProgress> {
323        let replays = self.replays.read();
324
325        let state = replays.iter().find(|r| r.id == replay_id).ok_or_else(|| {
326            AllSourceError::ValidationError(format!("Replay not found: {replay_id}"))
327        })?;
328
329        let processed = state.processed_events.load(Ordering::Relaxed);
330        let failed = state.failed_events.load(Ordering::Relaxed);
331        let total_events = state.total_events;
332        let started_at = state.started_at;
333        let status = *state.status.read();
334        let completed_at = *state.completed_at.read();
335        let error_message = state.error_message.read().clone();
336
337        drop(replays); // Release lock before calculations
338
339        let progress_percentage = if total_events > 0 {
340            (processed as f64 / total_events as f64) * 100.0
341        } else {
342            0.0
343        };
344
345        let updated_at = Utc::now();
346        let elapsed_seconds = (updated_at - started_at).num_seconds().max(1) as f64;
347        let events_per_second = processed as f64 / elapsed_seconds;
348
349        Ok(ReplayProgress {
350            replay_id,
351            status,
352            started_at,
353            updated_at,
354            completed_at,
355            total_events,
356            processed_events: processed as usize,
357            failed_events: failed as usize,
358            progress_percentage,
359            events_per_second,
360            error_message,
361        })
362    }
363
364    /// Cancel a running replay
365    pub fn cancel_replay(&self, replay_id: Uuid) -> Result<()> {
366        let replays = self.replays.read();
367
368        let state = replays.iter().find(|r| r.id == replay_id).ok_or_else(|| {
369            AllSourceError::ValidationError(format!("Replay not found: {replay_id}"))
370        })?;
371
372        let status = *state.status.read();
373        if status != ReplayStatus::Running {
374            return Err(AllSourceError::ValidationError(format!(
375                "Cannot cancel replay in status: {status:?}"
376            )));
377        }
378
379        state.cancelled.store(true, Ordering::Relaxed);
380        tracing::info!("🛑 Cancelling replay {}", replay_id);
381
382        Ok(())
383    }
384
385    /// List all replay operations
386    pub fn list_replays(&self) -> Vec<ReplayProgress> {
387        let replays = self.replays.read();
388
389        replays
390            .iter()
391            .map(|state| {
392                let processed = state.processed_events.load(Ordering::Relaxed);
393                let failed = state.failed_events.load(Ordering::Relaxed);
394                let progress_percentage = if state.total_events > 0 {
395                    (processed as f64 / state.total_events as f64) * 100.0
396                } else {
397                    0.0
398                };
399
400                let updated_at = Utc::now();
401                let elapsed_seconds = (updated_at - state.started_at).num_seconds().max(1) as f64;
402                let events_per_second = processed as f64 / elapsed_seconds;
403
404                ReplayProgress {
405                    replay_id: state.id,
406                    status: *state.status.read(),
407                    started_at: state.started_at,
408                    updated_at,
409                    completed_at: *state.completed_at.read(),
410                    total_events: state.total_events,
411                    processed_events: processed as usize,
412                    failed_events: failed as usize,
413                    progress_percentage,
414                    events_per_second,
415                    error_message: state.error_message.read().clone(),
416                }
417            })
418            .collect()
419    }
420
421    /// Delete a completed or failed replay from history
422    pub fn delete_replay(&self, replay_id: Uuid) -> Result<bool> {
423        let mut replays = self.replays.write();
424
425        let idx = replays
426            .iter()
427            .position(|r| r.id == replay_id)
428            .ok_or_else(|| {
429                AllSourceError::ValidationError(format!("Replay not found: {replay_id}"))
430            })?;
431
432        let status = *replays[idx].status.read();
433        if status == ReplayStatus::Running {
434            return Err(AllSourceError::ValidationError(
435                "Cannot delete a running replay. Cancel it first.".to_string(),
436            ));
437        }
438
439        replays.remove(idx);
440        tracing::info!("🗑️  Deleted replay {}", replay_id);
441
442        Ok(true)
443    }
444}
445
446impl Default for ReplayManager {
447    fn default() -> Self {
448        Self::new()
449    }
450}
451
#[cfg(test)]
mod tests {
    use super::*;
    use crate::domain::entities::Event;
    use serde_json::json;

    /// Builds a store holding `count` `test.event` events for one entity.
    fn store_with_events(count: usize) -> Arc<EventStore> {
        let store = Arc::new(EventStore::new());
        for i in 0..count {
            let event = Event::from_strings(
                "test.event".to_string(),
                "test-entity".to_string(),
                "default".to_string(),
                json!({"value": i}),
                None,
            )
            .unwrap();
            store.ingest(&event).unwrap();
        }
        store
    }

    /// Replay request over all events, with small batches and progress steps.
    fn request(projection_name: Option<&str>) -> StartReplayRequest {
        StartReplayRequest {
            projection_name: projection_name.map(str::to_string),
            from_timestamp: None,
            to_timestamp: None,
            entity_id: None,
            event_type: None,
            config: Some(ReplayConfig {
                batch_size: 5,
                parallel: false,
                workers: 1,
                emit_progress: true,
                progress_interval: 5,
            }),
        }
    }

    /// Polls until the replay leaves `Running`, panicking after ~2 seconds.
    async fn wait_until_finished(manager: &ReplayManager, replay_id: Uuid) -> ReplayProgress {
        for _ in 0..200 {
            let progress = manager.get_progress(replay_id).unwrap();
            if progress.status != ReplayStatus::Running {
                return progress;
            }
            tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
        }
        panic!("replay {replay_id} did not finish in time");
    }

    #[tokio::test]
    async fn test_replay_manager_creation() {
        let manager = ReplayManager::new();
        let replays = manager.list_replays();
        assert_eq!(replays.len(), 0);
    }

    #[tokio::test]
    async fn test_replay_progress_tracking() {
        let manager = ReplayManager::new();
        let store = store_with_events(10);

        let response = manager.start_replay(store, request(None)).unwrap();
        assert_eq!(response.status, ReplayStatus::Running);
        assert_eq!(response.total_events, 10);

        let progress = wait_until_finished(&manager, response.replay_id).await;
        assert_eq!(progress.status, ReplayStatus::Completed);
        assert_eq!(progress.total_events, 10);
        assert_eq!(progress.processed_events, 10);
        assert_eq!(progress.progress_percentage, 100.0);
        assert!(progress.completed_at.is_some());
        assert!(progress.error_message.is_none());
    }

    #[tokio::test]
    async fn test_replay_unknown_projection_fails() {
        let manager = ReplayManager::new();
        let store = store_with_events(3);

        let response = manager
            .start_replay(store, request(Some("does-not-exist")))
            .unwrap();

        let progress = wait_until_finished(&manager, response.replay_id).await;
        assert_eq!(progress.status, ReplayStatus::Failed);
        assert_eq!(progress.processed_events, 0);
        assert!(progress.error_message.is_some());
        assert!(progress.completed_at.is_some());
    }

    #[tokio::test]
    async fn test_delete_finished_replay() {
        let manager = ReplayManager::new();
        let store = store_with_events(4);

        let response = manager.start_replay(store, request(None)).unwrap();
        wait_until_finished(&manager, response.replay_id).await;

        assert!(manager.delete_replay(response.replay_id).unwrap());
        assert!(manager.list_replays().is_empty());
        assert!(manager.get_progress(response.replay_id).is_err());
        assert!(manager.delete_replay(response.replay_id).is_err());
    }

    #[tokio::test]
    async fn test_cancel_finished_replay_is_rejected() {
        let manager = ReplayManager::new();
        let store = store_with_events(2);

        let response = manager.start_replay(store, request(None)).unwrap();
        wait_until_finished(&manager, response.replay_id).await;

        assert!(manager.cancel_replay(response.replay_id).is_err());
        assert!(manager.cancel_replay(Uuid::new_v4()).is_err());
    }
}
505
506        // Check progress
507        let progress = manager.get_progress(response.replay_id).unwrap();
508        assert!(progress.processed_events <= 10);
509    }
510}