// allsource_core/application/services/replay.rs

1use crate::{
2    application::{dto::QueryEventsRequest, services::projection::Projection},
3    domain::entities::Event,
4    error::{AllSourceError, Result},
5    store::EventStore,
6};
7use chrono::{DateTime, Utc};
8use parking_lot::RwLock;
9use serde::{Deserialize, Serialize};
10use std::sync::{
11    Arc,
12    atomic::{AtomicBool, AtomicU64, Ordering},
13};
14use uuid::Uuid;
15
16/// Status of a replay operation
17#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
18#[serde(rename_all = "lowercase")]
19pub enum ReplayStatus {
20    /// Replay is pending and hasn't started yet
21    Pending,
22    /// Replay is currently running
23    Running,
24    /// Replay completed successfully
25    Completed,
26    /// Replay failed with an error
27    Failed,
28    /// Replay was cancelled by user
29    Cancelled,
30}
31
32/// Configuration for replay operations
33#[derive(Debug, Clone, Serialize, Deserialize)]
34pub struct ReplayConfig {
35    /// Batch size for processing events
36    pub batch_size: usize,
37
38    /// Whether to run replay in parallel
39    pub parallel: bool,
40
41    /// Number of parallel workers (if parallel is true)
42    pub workers: usize,
43
44    /// Whether to emit progress events
45    pub emit_progress: bool,
46
47    /// Progress reporting interval (every N events)
48    pub progress_interval: usize,
49}
50
51impl Default for ReplayConfig {
52    fn default() -> Self {
53        Self {
54            batch_size: 1000,
55            parallel: false,
56            workers: 4,
57            emit_progress: true,
58            progress_interval: 1000,
59        }
60    }
61}
62
63/// Request to start a replay operation
64#[derive(Debug, Clone, Serialize, Deserialize)]
65pub struct StartReplayRequest {
66    /// Optional projection name to rebuild (if None, replays all projections)
67    pub projection_name: Option<String>,
68
69    /// Start from this timestamp (if None, starts from beginning)
70    pub from_timestamp: Option<DateTime<Utc>>,
71
72    /// End at this timestamp (if None, goes to end)
73    pub to_timestamp: Option<DateTime<Utc>>,
74
75    /// Filter by entity_id (optional)
76    pub entity_id: Option<String>,
77
78    /// Filter by event_type (optional)
79    pub event_type: Option<String>,
80
81    /// Replay configuration
82    pub config: Option<ReplayConfig>,
83}
84
85/// Response from starting a replay
86#[derive(Debug, Clone, Serialize, Deserialize)]
87pub struct StartReplayResponse {
88    pub replay_id: Uuid,
89    pub status: ReplayStatus,
90    pub started_at: DateTime<Utc>,
91    pub total_events: usize,
92}
93
94/// Progress information for a replay
95#[derive(Debug, Clone, Serialize, Deserialize)]
96pub struct ReplayProgress {
97    pub replay_id: Uuid,
98    pub status: ReplayStatus,
99    pub started_at: DateTime<Utc>,
100    pub updated_at: DateTime<Utc>,
101    pub completed_at: Option<DateTime<Utc>>,
102    pub total_events: usize,
103    pub processed_events: usize,
104    pub failed_events: usize,
105    pub progress_percentage: f64,
106    pub events_per_second: f64,
107    pub error_message: Option<String>,
108}
109
110/// Manages event replay and projection rebuilding
111pub struct ReplayManager {
112    /// Active replay operations
113    replays: Arc<RwLock<Vec<ReplayState>>>,
114}
115
116/// Internal state for a replay operation
117struct ReplayState {
118    id: Uuid,
119    projection_name: Option<String>,
120    status: Arc<RwLock<ReplayStatus>>,
121    started_at: DateTime<Utc>,
122    completed_at: Arc<RwLock<Option<DateTime<Utc>>>>,
123    total_events: usize,
124    processed_events: Arc<AtomicU64>,
125    failed_events: Arc<AtomicU64>,
126    error_message: Arc<RwLock<Option<String>>>,
127    cancelled: Arc<AtomicBool>,
128}
129
130impl ReplayManager {
131    pub fn new() -> Self {
132        Self {
133            replays: Arc::new(RwLock::new(Vec::new())),
134        }
135    }
136
137    /// Start a replay operation
138    pub fn start_replay(
139        &self,
140        store: Arc<EventStore>,
141        request: StartReplayRequest,
142    ) -> Result<StartReplayResponse> {
143        let replay_id = Uuid::new_v4();
144        let started_at = Utc::now();
145        let config = request.config.unwrap_or_default();
146
147        // Query events to replay
148        let query = QueryEventsRequest {
149            entity_id: request.entity_id.clone(),
150            event_type: request.event_type.clone(),
151            tenant_id: None,
152            as_of: request.to_timestamp,
153            since: request.from_timestamp,
154            until: request.to_timestamp,
155            limit: None,
156            event_type_prefix: None,
157            payload_filter: None,
158        };
159
160        let events = store.query(query)?;
161        let total_events = events.len();
162
163        tracing::info!(
164            "🔄 Starting replay {} for {} events{}",
165            replay_id,
166            total_events,
167            request
168                .projection_name
169                .as_ref()
170                .map(|n| format!(" (projection: {n})"))
171                .unwrap_or_default()
172        );
173
174        // Create replay state
175        let state = ReplayState {
176            id: replay_id,
177            projection_name: request.projection_name.clone(),
178            status: Arc::new(RwLock::new(ReplayStatus::Running)),
179            started_at,
180            completed_at: Arc::new(RwLock::new(None)),
181            total_events,
182            processed_events: Arc::new(AtomicU64::new(0)),
183            failed_events: Arc::new(AtomicU64::new(0)),
184            error_message: Arc::new(RwLock::new(None)),
185            cancelled: Arc::new(AtomicBool::new(false)),
186        };
187
188        // Store replay state
189        self.replays.write().push(state);
190
191        // Get the state references we need
192        let replays = Arc::clone(&self.replays);
193        let replay_idx = replays.read().len() - 1;
194
195        // Spawn replay task
196        tokio::spawn(async move {
197            let result = Self::run_replay(
198                store,
199                events,
200                request.projection_name,
201                config,
202                replays.clone(),
203                replay_idx,
204            )
205            .await;
206
207            // Update final status
208            let mut replays_lock = replays.write();
209            if let Some(state) = replays_lock.get_mut(replay_idx) {
210                *state.completed_at.write() = Some(Utc::now());
211
212                match result {
213                    Ok(_) => {
214                        if state.cancelled.load(Ordering::Relaxed) {
215                            *state.status.write() = ReplayStatus::Cancelled;
216                            tracing::info!("🛑 Replay {} cancelled", state.id);
217                        } else {
218                            *state.status.write() = ReplayStatus::Completed;
219                            tracing::info!("✅ Replay {} completed successfully", state.id);
220                        }
221                    }
222                    Err(e) => {
223                        *state.status.write() = ReplayStatus::Failed;
224                        *state.error_message.write() = Some(e.to_string());
225                        tracing::error!("❌ Replay {} failed: {}", state.id, e);
226                    }
227                }
228            }
229        });
230
231        Ok(StartReplayResponse {
232            replay_id,
233            status: ReplayStatus::Running,
234            started_at,
235            total_events,
236        })
237    }
238
239    /// Internal replay execution
240    async fn run_replay(
241        store: Arc<EventStore>,
242        events: Vec<Event>,
243        projection_name: Option<String>,
244        config: ReplayConfig,
245        replays: Arc<RwLock<Vec<ReplayState>>>,
246        replay_idx: usize,
247    ) -> Result<()> {
248        let total = events.len();
249        let projections = store.projections.read();
250
251        // Get target projection(s)
252        let target_projections: Vec<(String, Arc<dyn Projection>)> =
253            if let Some(name) = projection_name {
254                if let Some(proj) = projections.get_projection(&name) {
255                    vec![(name, proj)]
256                } else {
257                    return Err(AllSourceError::ValidationError(format!(
258                        "Projection not found: {}",
259                        name
260                    )));
261                }
262            } else {
263                projections.list_projections()
264            };
265
266        drop(projections); // Release lock
267
268        // Process events in batches
269        for (batch_idx, chunk) in events.chunks(config.batch_size).enumerate() {
270            // Check if cancelled
271            {
272                let replays_lock = replays.read();
273                if let Some(state) = replays_lock.get(replay_idx)
274                    && state.cancelled.load(Ordering::Relaxed)
275                {
276                    return Ok(());
277                }
278            }
279
280            // Process batch
281            for event in chunk {
282                // Apply event to each target projection
283                for (proj_name, projection) in &target_projections {
284                    if let Err(e) = projection.process(event) {
285                        tracing::warn!(
286                            "Failed to process event {} in projection {}: {}",
287                            event.id,
288                            proj_name,
289                            e
290                        );
291
292                        // Increment failed counter
293                        let replays_lock = replays.read();
294                        if let Some(state) = replays_lock.get(replay_idx) {
295                            state.failed_events.fetch_add(1, Ordering::Relaxed);
296                        }
297                    }
298                }
299
300                // Increment processed counter
301                let replays_lock = replays.read();
302                if let Some(state) = replays_lock.get(replay_idx) {
303                    let processed = state.processed_events.fetch_add(1, Ordering::Relaxed) + 1;
304
305                    // Emit progress
306                    if config.emit_progress && processed % config.progress_interval as u64 == 0 {
307                        let progress = (processed as f64 / total as f64) * 100.0;
308                        tracing::debug!(
309                            "Replay progress: {}/{} ({:.1}%)",
310                            processed,
311                            total,
312                            progress
313                        );
314                    }
315                }
316            }
317        }
318
319        Ok(())
320    }
321
322    /// Get progress for a replay operation
323    pub fn get_progress(&self, replay_id: Uuid) -> Result<ReplayProgress> {
324        let replays = self.replays.read();
325
326        let state = replays.iter().find(|r| r.id == replay_id).ok_or_else(|| {
327            AllSourceError::ValidationError(format!("Replay not found: {replay_id}"))
328        })?;
329
330        let processed = state.processed_events.load(Ordering::Relaxed);
331        let failed = state.failed_events.load(Ordering::Relaxed);
332        let total_events = state.total_events;
333        let started_at = state.started_at;
334        let status = *state.status.read();
335        let completed_at = *state.completed_at.read();
336        let error_message = state.error_message.read().clone();
337
338        drop(replays); // Release lock before calculations
339
340        let progress_percentage = if total_events > 0 {
341            (processed as f64 / total_events as f64) * 100.0
342        } else {
343            0.0
344        };
345
346        let updated_at = Utc::now();
347        let elapsed_seconds = (updated_at - started_at).num_seconds().max(1) as f64;
348        let events_per_second = processed as f64 / elapsed_seconds;
349
350        Ok(ReplayProgress {
351            replay_id,
352            status,
353            started_at,
354            updated_at,
355            completed_at,
356            total_events,
357            processed_events: processed as usize,
358            failed_events: failed as usize,
359            progress_percentage,
360            events_per_second,
361            error_message,
362        })
363    }
364
365    /// Cancel a running replay
366    pub fn cancel_replay(&self, replay_id: Uuid) -> Result<()> {
367        let replays = self.replays.read();
368
369        let state = replays.iter().find(|r| r.id == replay_id).ok_or_else(|| {
370            AllSourceError::ValidationError(format!("Replay not found: {replay_id}"))
371        })?;
372
373        let status = *state.status.read();
374        if status != ReplayStatus::Running {
375            return Err(AllSourceError::ValidationError(format!(
376                "Cannot cancel replay in status: {:?}",
377                status
378            )));
379        }
380
381        state.cancelled.store(true, Ordering::Relaxed);
382        tracing::info!("🛑 Cancelling replay {}", replay_id);
383
384        Ok(())
385    }
386
387    /// List all replay operations
388    pub fn list_replays(&self) -> Vec<ReplayProgress> {
389        let replays = self.replays.read();
390
391        replays
392            .iter()
393            .map(|state| {
394                let processed = state.processed_events.load(Ordering::Relaxed);
395                let failed = state.failed_events.load(Ordering::Relaxed);
396                let progress_percentage = if state.total_events > 0 {
397                    (processed as f64 / state.total_events as f64) * 100.0
398                } else {
399                    0.0
400                };
401
402                let updated_at = Utc::now();
403                let elapsed_seconds = (updated_at - state.started_at).num_seconds().max(1) as f64;
404                let events_per_second = processed as f64 / elapsed_seconds;
405
406                ReplayProgress {
407                    replay_id: state.id,
408                    status: *state.status.read(),
409                    started_at: state.started_at,
410                    updated_at,
411                    completed_at: *state.completed_at.read(),
412                    total_events: state.total_events,
413                    processed_events: processed as usize,
414                    failed_events: failed as usize,
415                    progress_percentage,
416                    events_per_second,
417                    error_message: state.error_message.read().clone(),
418                }
419            })
420            .collect()
421    }
422
423    /// Delete a completed or failed replay from history
424    pub fn delete_replay(&self, replay_id: Uuid) -> Result<bool> {
425        let mut replays = self.replays.write();
426
427        let idx = replays
428            .iter()
429            .position(|r| r.id == replay_id)
430            .ok_or_else(|| {
431                AllSourceError::ValidationError(format!("Replay not found: {replay_id}"))
432            })?;
433
434        let status = *replays[idx].status.read();
435        if status == ReplayStatus::Running {
436            return Err(AllSourceError::ValidationError(
437                "Cannot delete a running replay. Cancel it first.".to_string(),
438            ));
439        }
440
441        replays.remove(idx);
442        tracing::info!("🗑️  Deleted replay {}", replay_id);
443
444        Ok(true)
445    }
446}
447
448impl Default for ReplayManager {
449    fn default() -> Self {
450        Self::new()
451    }
452}
453
#[cfg(test)]
mod tests {
    use super::*;
    use crate::domain::entities::Event;
    use serde_json::json;

    /// Build a store holding `count` `test.event` events for a single entity.
    fn store_with_events(count: usize) -> Arc<EventStore> {
        let store = Arc::new(EventStore::new());
        for i in 0..count {
            let event = Event::from_strings(
                "test.event".to_string(),
                "test-entity".to_string(),
                "default".to_string(),
                json!({ "value": i }),
                None,
            )
            .unwrap();
            store.ingest(event).unwrap();
        }
        store
    }

    /// Request that replays every event into every projection in small batches.
    fn replay_all_request() -> StartReplayRequest {
        StartReplayRequest {
            projection_name: None,
            from_timestamp: None,
            to_timestamp: None,
            entity_id: None,
            event_type: None,
            config: Some(ReplayConfig {
                batch_size: 5,
                parallel: false,
                workers: 1,
                emit_progress: true,
                progress_interval: 5,
            }),
        }
    }

    /// Poll until the replay leaves `Running`; panics after roughly five seconds.
    async fn wait_for_finish(manager: &ReplayManager, replay_id: Uuid) -> ReplayProgress {
        for _ in 0..500 {
            let progress = manager.get_progress(replay_id).unwrap();
            if progress.status != ReplayStatus::Running {
                return progress;
            }
            tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
        }
        panic!("replay {replay_id} did not finish in time");
    }

    #[tokio::test]
    async fn test_replay_manager_creation() {
        let manager = ReplayManager::new();
        let replays = manager.list_replays();
        assert_eq!(replays.len(), 0);
    }

    #[tokio::test]
    async fn test_replay_progress_tracking() {
        let manager = ReplayManager::new();
        let store = store_with_events(10);

        let response = manager.start_replay(store, replay_all_request()).unwrap();
        assert_eq!(response.status, ReplayStatus::Running);
        assert_eq!(response.total_events, 10);

        // Wait for the background work instead of sleeping a fixed amount, so
        // the assertions below are about the finished replay.
        let progress = wait_for_finish(&manager, response.replay_id).await;
        assert_eq!(progress.status, ReplayStatus::Completed);
        assert_eq!(progress.total_events, 10);
        assert_eq!(progress.processed_events, 10);
        assert!(progress.completed_at.is_some());
        assert!((progress.progress_percentage - 100.0).abs() < f64::EPSILON);
    }

    #[tokio::test]
    async fn test_replay_unknown_projection_fails() {
        let manager = ReplayManager::new();
        let request = StartReplayRequest {
            projection_name: Some("no-such-projection".to_string()),
            ..replay_all_request()
        };

        let response = manager.start_replay(store_with_events(3), request).unwrap();
        let progress = wait_for_finish(&manager, response.replay_id).await;

        assert_eq!(progress.status, ReplayStatus::Failed);
        assert!(progress.error_message.is_some());
        assert_eq!(progress.processed_events, 0);
    }

    #[tokio::test]
    async fn test_finished_replay_can_be_deleted_but_not_cancelled() {
        let manager = ReplayManager::new();
        let response = manager
            .start_replay(store_with_events(2), replay_all_request())
            .unwrap();
        wait_for_finish(&manager, response.replay_id).await;

        assert!(manager.cancel_replay(response.replay_id).is_err());
        assert!(manager.delete_replay(response.replay_id).unwrap());
        assert!(manager.list_replays().is_empty());
        assert!(manager.get_progress(response.replay_id).is_err());
    }

    #[test]
    fn test_unknown_replay_id_is_rejected() {
        let manager = ReplayManager::new();
        let missing = Uuid::new_v4();

        assert!(manager.get_progress(missing).is_err());
        assert!(manager.cancel_replay(missing).is_err());
        assert!(manager.delete_replay(missing).is_err());
    }
}
508        // Check progress
509        let progress = manager.get_progress(response.replay_id).unwrap();
510        assert!(progress.processed_events <= 10);
511    }
512}