Skip to main content

s3rm_rs/callback/
event_manager.rs

1//! Event callback manager.
2//!
3//! Adapted from s3sync's `callback/event_manager.rs`.
4//! Manages event callback registration and dispatching for the deletion pipeline.
5
6use std::fmt;
7use std::sync::Arc;
8
9use tokio::sync::Mutex;
10use tokio::time::Instant;
11
12use crate::types::event_callback::{EventCallback, EventData, EventType};
13
14/// Accumulated pipeline statistics for the STATS_REPORT event.
15#[derive(Default, Debug, Clone)]
16pub struct PipelineStats {
17    pub pipeline_start_time: Option<Instant>,
18    pub stats_deleted_objects: u64,
19    pub stats_deleted_bytes: u64,
20    pub stats_failed_objects: u64,
21    pub stats_skipped_objects: u64,
22    pub stats_error_count: u64,
23    pub stats_duration_sec: f64,
24    pub stats_objects_per_sec: f64,
25}
26
27impl From<PipelineStats> for EventData {
28    fn from(stats: PipelineStats) -> Self {
29        let mut event_data = EventData::new(EventType::STATS_REPORT);
30        event_data.stats_deleted_objects = Some(stats.stats_deleted_objects);
31        event_data.stats_deleted_bytes = Some(stats.stats_deleted_bytes);
32        event_data.stats_failed_objects = Some(stats.stats_failed_objects);
33        event_data.stats_skipped_objects = Some(stats.stats_skipped_objects);
34        event_data.stats_error_count = Some(stats.stats_error_count);
35        event_data.stats_duration_sec = Some(stats.stats_duration_sec);
36        event_data.stats_objects_per_sec = Some(stats.stats_objects_per_sec);
37        event_data
38    }
39}
40
41/// Manages event callback registration and dispatching.
42///
43/// Holds an optional `EventCallback` trait object and accumulated pipeline statistics.
44/// On `PIPELINE_END`, sends a `STATS_REPORT` event with accumulated stats if the caller
45/// subscribed to `STATS_REPORT` via `event_flags`.
46#[derive(Clone)]
47pub struct EventManager {
48    pub event_callback: Option<Arc<Mutex<Box<dyn EventCallback + Send + Sync>>>>,
49    pub event_flags: EventType,
50    pub dry_run: bool,
51    pub pipeline_stats: Arc<Mutex<PipelineStats>>,
52}
53
54impl Default for EventManager {
55    fn default() -> Self {
56        Self::new()
57    }
58}
59
60impl EventManager {
61    pub fn new() -> Self {
62        Self {
63            event_callback: None,
64            event_flags: EventType::ALL_EVENTS,
65            dry_run: false,
66            pipeline_stats: Arc::new(Mutex::new(PipelineStats::default())),
67        }
68    }
69
70    /// Register an event callback with event type filter and dry-run flag.
71    pub fn register_callback<T: EventCallback + Send + Sync + 'static>(
72        &mut self,
73        events_flag: EventType,
74        callback: T,
75        dry_run: bool,
76    ) {
77        self.event_callback = Some(Arc::new(Mutex::new(Box::new(callback))));
78        self.event_flags = events_flag;
79        self.dry_run = dry_run;
80    }
81
82    /// Returns true if an event callback has been registered.
83    pub fn is_callback_registered(&self) -> bool {
84        self.event_callback.is_some()
85    }
86
87    /// Trigger an event, updating internal stats and dispatching to the callback.
88    pub async fn trigger_event(&self, mut event_data: EventData) {
89        self.update_pipeline_stats(&event_data).await;
90
91        if let Some(callback) = &self.event_callback {
92            let event_type = event_data.event_type;
93            if self.event_flags.contains(event_type) {
94                event_data.dry_run = self.dry_run;
95                callback.lock().await.on_event(event_data).await;
96            }
97            // On PIPELINE_END, send accumulated stats report if caller subscribed to it
98            if event_type == EventType::PIPELINE_END
99                && self.event_flags.contains(EventType::STATS_REPORT)
100            {
101                let stats = self.pipeline_stats.lock().await.clone();
102                let mut stats_event: EventData = stats.into();
103                stats_event.dry_run = self.dry_run;
104                callback.lock().await.on_event(stats_event).await;
105            }
106        }
107    }
108
109    async fn update_pipeline_stats(&self, event_data: &EventData) {
110        let mut stats = self.pipeline_stats.lock().await;
111
112        match event_data.event_type {
113            EventType::PIPELINE_START => {
114                stats.pipeline_start_time = Some(Instant::now());
115            }
116            EventType::PIPELINE_END => {
117                if let Some(start) = stats.pipeline_start_time {
118                    stats.stats_duration_sec = start.elapsed().as_secs_f64();
119                    if stats.stats_duration_sec > 1.0 {
120                        stats.stats_objects_per_sec =
121                            stats.stats_deleted_objects as f64 / stats.stats_duration_sec;
122                    } else {
123                        stats.stats_objects_per_sec = stats.stats_deleted_objects as f64;
124                    }
125                }
126            }
127            EventType::DELETE_COMPLETE => {
128                stats.stats_deleted_objects += 1;
129                if let Some(size) = event_data.size {
130                    stats.stats_deleted_bytes += size;
131                }
132            }
133            EventType::DELETE_FAILED => {
134                stats.stats_failed_objects += 1;
135            }
136            EventType::DELETE_FILTERED => {
137                stats.stats_skipped_objects += 1;
138            }
139            EventType::PIPELINE_ERROR => {
140                stats.stats_error_count += 1;
141            }
142            _ => {}
143        }
144    }
145}
146
147impl fmt::Debug for EventManager {
148    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
149        f.debug_struct("EventManager")
150            .field("event_flags", &self.event_flags)
151            .field("callback_registered", &self.event_callback.is_some())
152            .finish()
153    }
154}
155
#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;

    /// Test callback that appends every received event to a shared vector.
    struct CollectingCallback {
        events: Arc<Mutex<Vec<EventData>>>,
    }

    impl CollectingCallback {
        /// Returns the callback together with a handle to the vector it fills.
        fn new() -> (Self, Arc<Mutex<Vec<EventData>>>) {
            let events = Arc::new(Mutex::new(Vec::new()));
            (
                Self {
                    events: events.clone(),
                },
                events,
            )
        }
    }

    #[async_trait]
    impl EventCallback for CollectingCallback {
        async fn on_event(&mut self, event_data: EventData) {
            self.events.lock().await.push(event_data);
        }
    }

    #[tokio::test]
    async fn new_manager_has_no_callback() {
        let manager = EventManager::new();
        assert!(!manager.is_callback_registered());
    }

    #[tokio::test]
    async fn register_callback_and_trigger_event() {
        let mut manager = EventManager::new();
        let (callback, events) = CollectingCallback::new();
        manager.register_callback(EventType::ALL_EVENTS, callback, false);
        assert!(manager.is_callback_registered());

        let event_data = EventData::new(EventType::PIPELINE_START);
        manager.trigger_event(event_data).await;

        let collected = events.lock().await;
        assert_eq!(collected.len(), 1);
        assert_eq!(collected[0].event_type, EventType::PIPELINE_START);
    }

    #[tokio::test]
    async fn pipeline_end_sends_stats_report() {
        let mut manager = EventManager::new();
        let (callback, events) = CollectingCallback::new();
        manager.register_callback(EventType::ALL_EVENTS, callback, false);

        // Start pipeline
        manager
            .trigger_event(EventData::new(EventType::PIPELINE_START))
            .await;

        // Simulate some deletions
        let mut delete_event = EventData::new(EventType::DELETE_COMPLETE);
        delete_event.size = Some(1024);
        manager.trigger_event(delete_event).await;

        // End pipeline
        manager
            .trigger_event(EventData::new(EventType::PIPELINE_END))
            .await;

        let collected = events.lock().await;
        // PIPELINE_START + DELETE_COMPLETE + PIPELINE_END + STATS_REPORT
        assert_eq!(collected.len(), 4);
        assert_eq!(collected[2].event_type, EventType::PIPELINE_END);
        assert_eq!(collected[3].event_type, EventType::STATS_REPORT);
        assert_eq!(collected[3].stats_deleted_objects, Some(1));
        assert_eq!(collected[3].stats_deleted_bytes, Some(1024));
        // Counters that were never touched must still be reported, as zero.
        assert_eq!(collected[3].stats_failed_objects, Some(0));
        assert_eq!(collected[3].stats_skipped_objects, Some(0));
        assert_eq!(collected[3].stats_error_count, Some(0));
    }

    #[tokio::test]
    async fn event_flag_filtering() {
        let mut manager = EventManager::new();
        let (callback, events) = CollectingCallback::new();
        // Only register for DELETE_COMPLETE events
        manager.register_callback(EventType::DELETE_COMPLETE, callback, false);

        manager
            .trigger_event(EventData::new(EventType::PIPELINE_START))
            .await;
        manager
            .trigger_event(EventData::new(EventType::DELETE_COMPLETE))
            .await;

        let collected = events.lock().await;
        // Only DELETE_COMPLETE should be dispatched
        assert_eq!(collected.len(), 1);
        assert_eq!(collected[0].event_type, EventType::DELETE_COMPLETE);
    }

    #[tokio::test]
    async fn pipeline_end_without_stats_flag_skips_stats_report() {
        let mut manager = EventManager::new();
        let (callback, events) = CollectingCallback::new();
        // Register for PIPELINE_START and PIPELINE_END, but NOT STATS_REPORT
        manager.register_callback(
            EventType::PIPELINE_START | EventType::PIPELINE_END,
            callback,
            false,
        );

        manager
            .trigger_event(EventData::new(EventType::PIPELINE_START))
            .await;
        manager
            .trigger_event(EventData::new(EventType::PIPELINE_END))
            .await;

        let collected = events.lock().await;
        // Should receive PIPELINE_START + PIPELINE_END only, no STATS_REPORT
        assert_eq!(collected.len(), 2);
        assert_eq!(collected[0].event_type, EventType::PIPELINE_START);
        assert_eq!(collected[1].event_type, EventType::PIPELINE_END);
    }

    #[tokio::test]
    async fn dry_run_flag_propagated() {
        let mut manager = EventManager::new();
        let (callback, events) = CollectingCallback::new();
        manager.register_callback(EventType::ALL_EVENTS, callback, true);

        manager
            .trigger_event(EventData::new(EventType::PIPELINE_START))
            .await;
        manager
            .trigger_event(EventData::new(EventType::PIPELINE_END))
            .await;

        let collected = events.lock().await;
        // PIPELINE_START + PIPELINE_END + the synthesized STATS_REPORT; the
        // flag must be stamped on forwarded and synthesized events alike.
        assert_eq!(collected.len(), 3);
        assert_eq!(collected[2].event_type, EventType::STATS_REPORT);
        assert!(collected.iter().all(|event| event.dry_run));
    }

    #[test]
    fn debug_format() {
        let manager = EventManager::new();
        let debug = format!("{manager:?}");
        assert!(debug.contains("callback_registered: false"));
    }

    #[test]
    fn pipeline_stats_to_event_data() {
        let stats = PipelineStats {
            pipeline_start_time: None,
            stats_deleted_objects: 100,
            stats_deleted_bytes: 50_000,
            stats_failed_objects: 5,
            stats_skipped_objects: 10,
            stats_error_count: 1,
            stats_duration_sec: 10.0,
            stats_objects_per_sec: 10.0,
        };

        // Every field mapped by the conversion is checked so that a dropped
        // assignment cannot go unnoticed.
        let event_data: EventData = stats.into();
        assert_eq!(event_data.event_type, EventType::STATS_REPORT);
        assert_eq!(event_data.stats_deleted_objects, Some(100));
        assert_eq!(event_data.stats_deleted_bytes, Some(50_000));
        assert_eq!(event_data.stats_failed_objects, Some(5));
        assert_eq!(event_data.stats_skipped_objects, Some(10));
        assert_eq!(event_data.stats_error_count, Some(1));
        assert_eq!(event_data.stats_duration_sec, Some(10.0));
        assert_eq!(event_data.stats_objects_per_sec, Some(10.0));
    }
}
320}