//! Event replay and projection rebuilding service.
//!
//! Source path: `allsource_core/application/services/replay.rs`.

1use crate::{
2    application::{dto::QueryEventsRequest, services::projection::Projection},
3    domain::entities::Event,
4    error::{AllSourceError, Result},
5    store::EventStore,
6};
7use chrono::{DateTime, Utc};
8use parking_lot::RwLock;
9use serde::{Deserialize, Serialize};
10use std::sync::{
11    Arc,
12    atomic::{AtomicBool, AtomicU64, Ordering},
13};
14use uuid::Uuid;
15
16/// Status of a replay operation
17#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
18#[serde(rename_all = "lowercase")]
19pub enum ReplayStatus {
20    /// Replay is pending and hasn't started yet
21    Pending,
22    /// Replay is currently running
23    Running,
24    /// Replay completed successfully
25    Completed,
26    /// Replay failed with an error
27    Failed,
28    /// Replay was cancelled by user
29    Cancelled,
30}
31
32/// Configuration for replay operations
33#[derive(Debug, Clone, Serialize, Deserialize)]
34pub struct ReplayConfig {
35    /// Batch size for processing events
36    pub batch_size: usize,
37
38    /// Whether to run replay in parallel
39    pub parallel: bool,
40
41    /// Number of parallel workers (if parallel is true)
42    pub workers: usize,
43
44    /// Whether to emit progress events
45    pub emit_progress: bool,
46
47    /// Progress reporting interval (every N events)
48    pub progress_interval: usize,
49}
50
51impl Default for ReplayConfig {
52    fn default() -> Self {
53        Self {
54            batch_size: 1000,
55            parallel: false,
56            workers: 4,
57            emit_progress: true,
58            progress_interval: 1000,
59        }
60    }
61}
62
63/// Request to start a replay operation
64#[derive(Debug, Clone, Serialize, Deserialize)]
65pub struct StartReplayRequest {
66    /// Optional projection name to rebuild (if None, replays all projections)
67    pub projection_name: Option<String>,
68
69    /// Start from this timestamp (if None, starts from beginning)
70    pub from_timestamp: Option<DateTime<Utc>>,
71
72    /// End at this timestamp (if None, goes to end)
73    pub to_timestamp: Option<DateTime<Utc>>,
74
75    /// Filter by entity_id (optional)
76    pub entity_id: Option<String>,
77
78    /// Filter by event_type (optional)
79    pub event_type: Option<String>,
80
81    /// Replay configuration
82    pub config: Option<ReplayConfig>,
83}
84
85/// Response from starting a replay
86#[derive(Debug, Clone, Serialize, Deserialize)]
87pub struct StartReplayResponse {
88    pub replay_id: Uuid,
89    pub status: ReplayStatus,
90    pub started_at: DateTime<Utc>,
91    pub total_events: usize,
92}
93
94/// Progress information for a replay
95#[derive(Debug, Clone, Serialize, Deserialize)]
96pub struct ReplayProgress {
97    pub replay_id: Uuid,
98    pub status: ReplayStatus,
99    pub started_at: DateTime<Utc>,
100    pub updated_at: DateTime<Utc>,
101    pub completed_at: Option<DateTime<Utc>>,
102    pub total_events: usize,
103    pub processed_events: usize,
104    pub failed_events: usize,
105    pub progress_percentage: f64,
106    pub events_per_second: f64,
107    pub error_message: Option<String>,
108}
109
110/// Manages event replay and projection rebuilding
111pub struct ReplayManager {
112    /// Active replay operations
113    replays: Arc<RwLock<Vec<ReplayState>>>,
114}
115
116/// Internal state for a replay operation
117struct ReplayState {
118    id: Uuid,
119    projection_name: Option<String>,
120    status: Arc<RwLock<ReplayStatus>>,
121    started_at: DateTime<Utc>,
122    completed_at: Arc<RwLock<Option<DateTime<Utc>>>>,
123    total_events: usize,
124    processed_events: Arc<AtomicU64>,
125    failed_events: Arc<AtomicU64>,
126    error_message: Arc<RwLock<Option<String>>>,
127    cancelled: Arc<AtomicBool>,
128}
129
130impl ReplayManager {
131    pub fn new() -> Self {
132        Self {
133            replays: Arc::new(RwLock::new(Vec::new())),
134        }
135    }
136
137    /// Start a replay operation
138    pub fn start_replay(
139        &self,
140        store: Arc<EventStore>,
141        request: StartReplayRequest,
142    ) -> Result<StartReplayResponse> {
143        let replay_id = Uuid::new_v4();
144        let started_at = Utc::now();
145        let config = request.config.unwrap_or_default();
146
147        // Query events to replay
148        let query = QueryEventsRequest {
149            entity_id: request.entity_id.clone(),
150            event_type: request.event_type.clone(),
151            tenant_id: None,
152            as_of: request.to_timestamp,
153            since: request.from_timestamp,
154            until: request.to_timestamp,
155            limit: None,
156        };
157
158        let events = store.query(query)?;
159        let total_events = events.len();
160
161        tracing::info!(
162            "🔄 Starting replay {} for {} events{}",
163            replay_id,
164            total_events,
165            request
166                .projection_name
167                .as_ref()
168                .map(|n| format!(" (projection: {n})"))
169                .unwrap_or_default()
170        );
171
172        // Create replay state
173        let state = ReplayState {
174            id: replay_id,
175            projection_name: request.projection_name.clone(),
176            status: Arc::new(RwLock::new(ReplayStatus::Running)),
177            started_at,
178            completed_at: Arc::new(RwLock::new(None)),
179            total_events,
180            processed_events: Arc::new(AtomicU64::new(0)),
181            failed_events: Arc::new(AtomicU64::new(0)),
182            error_message: Arc::new(RwLock::new(None)),
183            cancelled: Arc::new(AtomicBool::new(false)),
184        };
185
186        // Store replay state
187        self.replays.write().push(state);
188
189        // Get the state references we need
190        let replays = Arc::clone(&self.replays);
191        let replay_idx = replays.read().len() - 1;
192
193        // Spawn replay task
194        tokio::spawn(async move {
195            let result = Self::run_replay(
196                store,
197                events,
198                request.projection_name,
199                config,
200                replays.clone(),
201                replay_idx,
202            )
203            .await;
204
205            // Update final status
206            let mut replays_lock = replays.write();
207            if let Some(state) = replays_lock.get_mut(replay_idx) {
208                *state.completed_at.write() = Some(Utc::now());
209
210                match result {
211                    Ok(_) => {
212                        if state.cancelled.load(Ordering::Relaxed) {
213                            *state.status.write() = ReplayStatus::Cancelled;
214                            tracing::info!("🛑 Replay {} cancelled", state.id);
215                        } else {
216                            *state.status.write() = ReplayStatus::Completed;
217                            tracing::info!("✅ Replay {} completed successfully", state.id);
218                        }
219                    }
220                    Err(e) => {
221                        *state.status.write() = ReplayStatus::Failed;
222                        *state.error_message.write() = Some(e.to_string());
223                        tracing::error!("❌ Replay {} failed: {}", state.id, e);
224                    }
225                }
226            }
227        });
228
229        Ok(StartReplayResponse {
230            replay_id,
231            status: ReplayStatus::Running,
232            started_at,
233            total_events,
234        })
235    }
236
237    /// Internal replay execution
238    async fn run_replay(
239        store: Arc<EventStore>,
240        events: Vec<Event>,
241        projection_name: Option<String>,
242        config: ReplayConfig,
243        replays: Arc<RwLock<Vec<ReplayState>>>,
244        replay_idx: usize,
245    ) -> Result<()> {
246        let total = events.len();
247        let projections = store.projections.read();
248
249        // Get target projection(s)
250        let target_projections: Vec<(String, Arc<dyn Projection>)> =
251            if let Some(name) = projection_name {
252                if let Some(proj) = projections.get_projection(&name) {
253                    vec![(name, proj)]
254                } else {
255                    return Err(AllSourceError::ValidationError(format!(
256                        "Projection not found: {}",
257                        name
258                    )));
259                }
260            } else {
261                projections.list_projections()
262            };
263
264        drop(projections); // Release lock
265
266        // Process events in batches
267        for (batch_idx, chunk) in events.chunks(config.batch_size).enumerate() {
268            // Check if cancelled
269            {
270                let replays_lock = replays.read();
271                if let Some(state) = replays_lock.get(replay_idx)
272                    && state.cancelled.load(Ordering::Relaxed)
273                {
274                    return Ok(());
275                }
276            }
277
278            // Process batch
279            for event in chunk {
280                // Apply event to each target projection
281                for (proj_name, projection) in &target_projections {
282                    if let Err(e) = projection.process(event) {
283                        tracing::warn!(
284                            "Failed to process event {} in projection {}: {}",
285                            event.id,
286                            proj_name,
287                            e
288                        );
289
290                        // Increment failed counter
291                        let replays_lock = replays.read();
292                        if let Some(state) = replays_lock.get(replay_idx) {
293                            state.failed_events.fetch_add(1, Ordering::Relaxed);
294                        }
295                    }
296                }
297
298                // Increment processed counter
299                let replays_lock = replays.read();
300                if let Some(state) = replays_lock.get(replay_idx) {
301                    let processed = state.processed_events.fetch_add(1, Ordering::Relaxed) + 1;
302
303                    // Emit progress
304                    if config.emit_progress && processed % config.progress_interval as u64 == 0 {
305                        let progress = (processed as f64 / total as f64) * 100.0;
306                        tracing::debug!(
307                            "Replay progress: {}/{} ({:.1}%)",
308                            processed,
309                            total,
310                            progress
311                        );
312                    }
313                }
314            }
315        }
316
317        Ok(())
318    }
319
320    /// Get progress for a replay operation
321    pub fn get_progress(&self, replay_id: Uuid) -> Result<ReplayProgress> {
322        let replays = self.replays.read();
323
324        let state = replays.iter().find(|r| r.id == replay_id).ok_or_else(|| {
325            AllSourceError::ValidationError(format!("Replay not found: {replay_id}"))
326        })?;
327
328        let processed = state.processed_events.load(Ordering::Relaxed);
329        let failed = state.failed_events.load(Ordering::Relaxed);
330        let total_events = state.total_events;
331        let started_at = state.started_at;
332        let status = *state.status.read();
333        let completed_at = *state.completed_at.read();
334        let error_message = state.error_message.read().clone();
335
336        drop(replays); // Release lock before calculations
337
338        let progress_percentage = if total_events > 0 {
339            (processed as f64 / total_events as f64) * 100.0
340        } else {
341            0.0
342        };
343
344        let updated_at = Utc::now();
345        let elapsed_seconds = (updated_at - started_at).num_seconds().max(1) as f64;
346        let events_per_second = processed as f64 / elapsed_seconds;
347
348        Ok(ReplayProgress {
349            replay_id,
350            status,
351            started_at,
352            updated_at,
353            completed_at,
354            total_events,
355            processed_events: processed as usize,
356            failed_events: failed as usize,
357            progress_percentage,
358            events_per_second,
359            error_message,
360        })
361    }
362
363    /// Cancel a running replay
364    pub fn cancel_replay(&self, replay_id: Uuid) -> Result<()> {
365        let replays = self.replays.read();
366
367        let state = replays.iter().find(|r| r.id == replay_id).ok_or_else(|| {
368            AllSourceError::ValidationError(format!("Replay not found: {replay_id}"))
369        })?;
370
371        let status = *state.status.read();
372        if status != ReplayStatus::Running {
373            return Err(AllSourceError::ValidationError(format!(
374                "Cannot cancel replay in status: {:?}",
375                status
376            )));
377        }
378
379        state.cancelled.store(true, Ordering::Relaxed);
380        tracing::info!("🛑 Cancelling replay {}", replay_id);
381
382        Ok(())
383    }
384
385    /// List all replay operations
386    pub fn list_replays(&self) -> Vec<ReplayProgress> {
387        let replays = self.replays.read();
388
389        replays
390            .iter()
391            .map(|state| {
392                let processed = state.processed_events.load(Ordering::Relaxed);
393                let failed = state.failed_events.load(Ordering::Relaxed);
394                let progress_percentage = if state.total_events > 0 {
395                    (processed as f64 / state.total_events as f64) * 100.0
396                } else {
397                    0.0
398                };
399
400                let updated_at = Utc::now();
401                let elapsed_seconds = (updated_at - state.started_at).num_seconds().max(1) as f64;
402                let events_per_second = processed as f64 / elapsed_seconds;
403
404                ReplayProgress {
405                    replay_id: state.id,
406                    status: *state.status.read(),
407                    started_at: state.started_at,
408                    updated_at,
409                    completed_at: *state.completed_at.read(),
410                    total_events: state.total_events,
411                    processed_events: processed as usize,
412                    failed_events: failed as usize,
413                    progress_percentage,
414                    events_per_second,
415                    error_message: state.error_message.read().clone(),
416                }
417            })
418            .collect()
419    }
420
421    /// Delete a completed or failed replay from history
422    pub fn delete_replay(&self, replay_id: Uuid) -> Result<bool> {
423        let mut replays = self.replays.write();
424
425        let idx = replays
426            .iter()
427            .position(|r| r.id == replay_id)
428            .ok_or_else(|| {
429                AllSourceError::ValidationError(format!("Replay not found: {replay_id}"))
430            })?;
431
432        let status = *replays[idx].status.read();
433        if status == ReplayStatus::Running {
434            return Err(AllSourceError::ValidationError(
435                "Cannot delete a running replay. Cancel it first.".to_string(),
436            ));
437        }
438
439        replays.remove(idx);
440        tracing::info!("🗑️  Deleted replay {}", replay_id);
441
442        Ok(true)
443    }
444}
445
446impl Default for ReplayManager {
447    fn default() -> Self {
448        Self::new()
449    }
450}
451
#[cfg(test)]
mod tests {
    use super::*;
    use crate::domain::entities::Event;
    use serde_json::json;

    /// Creates a store holding `count` simple events for a single entity.
    fn store_with_events(count: i32) -> Arc<EventStore> {
        let store = Arc::new(EventStore::new());
        for i in 0..count {
            let event = Event::from_strings(
                "test.event".to_string(),
                "test-entity".to_string(),
                "default".to_string(),
                json!({"value": i}),
                None,
            )
            .unwrap();
            store.ingest(event).unwrap();
        }
        store
    }

    /// A request with no filters, small batches and the given target projection.
    fn request_for(projection_name: Option<String>) -> StartReplayRequest {
        StartReplayRequest {
            projection_name,
            from_timestamp: None,
            to_timestamp: None,
            entity_id: None,
            event_type: None,
            config: Some(ReplayConfig {
                batch_size: 5,
                parallel: false,
                workers: 1,
                emit_progress: true,
                progress_interval: 5,
            }),
        }
    }

    /// Polls until the replay leaves `Running`, failing the test after roughly 5 seconds.
    async fn wait_until_finished(manager: &ReplayManager, replay_id: Uuid) -> ReplayProgress {
        for _ in 0..500 {
            let progress = manager.get_progress(replay_id).unwrap();
            if progress.status != ReplayStatus::Running {
                return progress;
            }
            tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
        }
        panic!("replay {replay_id} did not finish in time");
    }

    #[tokio::test]
    async fn test_replay_manager_creation() {
        let manager = ReplayManager::new();
        let replays = manager.list_replays();
        assert_eq!(replays.len(), 0);
    }

    #[tokio::test]
    async fn test_replay_progress_tracking() {
        let manager = ReplayManager::new();
        let store = store_with_events(10);

        let response = manager.start_replay(store, request_for(None)).unwrap();
        assert_eq!(response.status, ReplayStatus::Running);
        assert_eq!(response.total_events, 10);

        let progress = wait_until_finished(&manager, response.replay_id).await;
        assert_eq!(progress.status, ReplayStatus::Completed);
        assert_eq!(progress.total_events, 10);
        assert_eq!(progress.processed_events, 10);
        assert_eq!(progress.progress_percentage, 100.0);
        assert!(progress.completed_at.is_some());
        assert!(progress.error_message.is_none());
    }

    #[tokio::test]
    async fn test_unknown_replay_id_is_rejected() {
        let manager = ReplayManager::new();
        let unknown = Uuid::new_v4();

        assert!(manager.get_progress(unknown).is_err());
        assert!(manager.cancel_replay(unknown).is_err());
        assert!(manager.delete_replay(unknown).is_err());
    }

    #[tokio::test]
    async fn test_finished_replay_cannot_be_cancelled_but_can_be_deleted() {
        let manager = ReplayManager::new();
        let store = store_with_events(3);

        let response = manager.start_replay(store, request_for(None)).unwrap();
        wait_until_finished(&manager, response.replay_id).await;

        assert!(manager.cancel_replay(response.replay_id).is_err());
        assert!(manager.delete_replay(response.replay_id).unwrap());
        assert!(manager.list_replays().is_empty());
    }

    #[tokio::test]
    async fn test_unknown_projection_marks_replay_failed() {
        let manager = ReplayManager::new();
        let store = store_with_events(3);

        let response = manager
            .start_replay(store, request_for(Some("does-not-exist".to_string())))
            .unwrap();

        let progress = wait_until_finished(&manager, response.replay_id).await;
        assert_eq!(progress.status, ReplayStatus::Failed);
        let message = progress.error_message.expect("failed replay records its error");
        assert!(message.contains("Projection not found"));
    }
}