s3rm_rs/callback/
event_manager.rs1use std::fmt;
7use std::sync::Arc;
8
9use tokio::sync::Mutex;
10use tokio::time::Instant;
11
12use crate::types::event_callback::{EventCallback, EventData, EventType};
13
14#[derive(Default, Debug, Clone)]
16pub struct PipelineStats {
17 pub pipeline_start_time: Option<Instant>,
18 pub stats_deleted_objects: u64,
19 pub stats_deleted_bytes: u64,
20 pub stats_failed_objects: u64,
21 pub stats_skipped_objects: u64,
22 pub stats_error_count: u64,
23 pub stats_duration_sec: f64,
24 pub stats_objects_per_sec: f64,
25}
26
27impl From<PipelineStats> for EventData {
28 fn from(stats: PipelineStats) -> Self {
29 let mut event_data = EventData::new(EventType::STATS_REPORT);
30 event_data.stats_deleted_objects = Some(stats.stats_deleted_objects);
31 event_data.stats_deleted_bytes = Some(stats.stats_deleted_bytes);
32 event_data.stats_failed_objects = Some(stats.stats_failed_objects);
33 event_data.stats_skipped_objects = Some(stats.stats_skipped_objects);
34 event_data.stats_error_count = Some(stats.stats_error_count);
35 event_data.stats_duration_sec = Some(stats.stats_duration_sec);
36 event_data.stats_objects_per_sec = Some(stats.stats_objects_per_sec);
37 event_data
38 }
39}
40
41#[derive(Clone)]
47pub struct EventManager {
48 pub event_callback: Option<Arc<Mutex<Box<dyn EventCallback + Send + Sync>>>>,
49 pub event_flags: EventType,
50 pub dry_run: bool,
51 pub pipeline_stats: Arc<Mutex<PipelineStats>>,
52}
53
54impl Default for EventManager {
55 fn default() -> Self {
56 Self::new()
57 }
58}
59
60impl EventManager {
61 pub fn new() -> Self {
62 Self {
63 event_callback: None,
64 event_flags: EventType::ALL_EVENTS,
65 dry_run: false,
66 pipeline_stats: Arc::new(Mutex::new(PipelineStats::default())),
67 }
68 }
69
70 pub fn register_callback<T: EventCallback + Send + Sync + 'static>(
72 &mut self,
73 events_flag: EventType,
74 callback: T,
75 dry_run: bool,
76 ) {
77 self.event_callback = Some(Arc::new(Mutex::new(Box::new(callback))));
78 self.event_flags = events_flag;
79 self.dry_run = dry_run;
80 }
81
82 pub fn is_callback_registered(&self) -> bool {
84 self.event_callback.is_some()
85 }
86
87 pub async fn trigger_event(&self, mut event_data: EventData) {
89 self.update_pipeline_stats(&event_data).await;
90
91 if let Some(callback) = &self.event_callback {
92 let event_type = event_data.event_type;
93 if self.event_flags.contains(event_type) {
94 event_data.dry_run = self.dry_run;
95 callback.lock().await.on_event(event_data).await;
96 }
97 if event_type == EventType::PIPELINE_END
99 && self.event_flags.contains(EventType::STATS_REPORT)
100 {
101 let stats = self.pipeline_stats.lock().await.clone();
102 let mut stats_event: EventData = stats.into();
103 stats_event.dry_run = self.dry_run;
104 callback.lock().await.on_event(stats_event).await;
105 }
106 }
107 }
108
109 async fn update_pipeline_stats(&self, event_data: &EventData) {
110 let mut stats = self.pipeline_stats.lock().await;
111
112 match event_data.event_type {
113 EventType::PIPELINE_START => {
114 stats.pipeline_start_time = Some(Instant::now());
115 }
116 EventType::PIPELINE_END => {
117 if let Some(start) = stats.pipeline_start_time {
118 stats.stats_duration_sec = start.elapsed().as_secs_f64();
119 if stats.stats_duration_sec > 1.0 {
120 stats.stats_objects_per_sec =
121 stats.stats_deleted_objects as f64 / stats.stats_duration_sec;
122 } else {
123 stats.stats_objects_per_sec = stats.stats_deleted_objects as f64;
124 }
125 }
126 }
127 EventType::DELETE_COMPLETE => {
128 stats.stats_deleted_objects += 1;
129 if let Some(size) = event_data.size {
130 stats.stats_deleted_bytes += size;
131 }
132 }
133 EventType::DELETE_FAILED => {
134 stats.stats_failed_objects += 1;
135 }
136 EventType::DELETE_FILTERED => {
137 stats.stats_skipped_objects += 1;
138 }
139 EventType::PIPELINE_ERROR => {
140 stats.stats_error_count += 1;
141 }
142 _ => {}
143 }
144 }
145}
146
147impl fmt::Debug for EventManager {
148 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
149 f.debug_struct("EventManager")
150 .field("event_flags", &self.event_flags)
151 .field("callback_registered", &self.event_callback.is_some())
152 .finish()
153 }
154}
155
#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;

    /// Test double that records every event it receives, in delivery order.
    struct CollectingCallback {
        events: Arc<Mutex<Vec<EventData>>>,
    }

    impl CollectingCallback {
        /// Returns the callback together with a shared handle to its event
        /// log, so a test can inspect the log after the callback itself has
        /// been moved into the manager.
        fn new() -> (Self, Arc<Mutex<Vec<EventData>>>) {
            let events = Arc::new(Mutex::new(Vec::new()));
            (
                Self {
                    events: events.clone(),
                },
                events,
            )
        }
    }

    #[async_trait]
    impl EventCallback for CollectingCallback {
        async fn on_event(&mut self, event_data: EventData) {
            self.events.lock().await.push(event_data);
        }
    }

    #[tokio::test]
    async fn new_manager_has_no_callback() {
        let manager = EventManager::new();
        assert!(!manager.is_callback_registered());
    }

    #[tokio::test]
    async fn register_callback_and_trigger_event() {
        let mut manager = EventManager::new();
        let (callback, events) = CollectingCallback::new();
        manager.register_callback(EventType::ALL_EVENTS, callback, false);
        assert!(manager.is_callback_registered());

        let event_data = EventData::new(EventType::PIPELINE_START);
        manager.trigger_event(event_data).await;

        let collected = events.lock().await;
        assert_eq!(collected.len(), 1);
        assert_eq!(collected[0].event_type, EventType::PIPELINE_START);
    }

    #[tokio::test]
    async fn pipeline_end_sends_stats_report() {
        let mut manager = EventManager::new();
        let (callback, events) = CollectingCallback::new();
        manager.register_callback(EventType::ALL_EVENTS, callback, false);

        manager
            .trigger_event(EventData::new(EventType::PIPELINE_START))
            .await;

        let mut delete_event = EventData::new(EventType::DELETE_COMPLETE);
        delete_event.size = Some(1024);
        manager.trigger_event(delete_event).await;

        manager
            .trigger_event(EventData::new(EventType::PIPELINE_END))
            .await;

        let collected = events.lock().await;
        // Three triggered events plus the automatically appended stats report.
        assert_eq!(collected.len(), 4);
        assert_eq!(collected[0].event_type, EventType::PIPELINE_START);
        assert_eq!(collected[1].event_type, EventType::DELETE_COMPLETE);
        assert_eq!(collected[2].event_type, EventType::PIPELINE_END);
        assert_eq!(collected[3].event_type, EventType::STATS_REPORT);
        assert_eq!(collected[3].stats_deleted_objects, Some(1));
        assert_eq!(collected[3].stats_deleted_bytes, Some(1024));
    }

    #[tokio::test]
    async fn stats_report_counts_failed_skipped_and_errors() {
        let mut manager = EventManager::new();
        let (callback, events) = CollectingCallback::new();
        manager.register_callback(EventType::ALL_EVENTS, callback, true);

        for event_type in [
            EventType::PIPELINE_START,
            EventType::DELETE_FAILED,
            EventType::DELETE_FILTERED,
            EventType::PIPELINE_ERROR,
            EventType::PIPELINE_END,
        ] {
            manager.trigger_event(EventData::new(event_type)).await;
        }

        let collected = events.lock().await;
        let report = collected
            .last()
            .expect("a stats report should follow PIPELINE_END");
        assert_eq!(report.event_type, EventType::STATS_REPORT);
        assert_eq!(report.stats_deleted_objects, Some(0));
        assert_eq!(report.stats_deleted_bytes, Some(0));
        assert_eq!(report.stats_failed_objects, Some(1));
        assert_eq!(report.stats_skipped_objects, Some(1));
        assert_eq!(report.stats_error_count, Some(1));
        assert!(report.stats_duration_sec.is_some());
        // The stats report carries the manager's dry-run flag as well.
        assert!(report.dry_run);
    }

    #[tokio::test]
    async fn event_flag_filtering() {
        let mut manager = EventManager::new();
        let (callback, events) = CollectingCallback::new();
        manager.register_callback(EventType::DELETE_COMPLETE, callback, false);

        manager
            .trigger_event(EventData::new(EventType::PIPELINE_START))
            .await;
        manager
            .trigger_event(EventData::new(EventType::DELETE_COMPLETE))
            .await;

        let collected = events.lock().await;
        // Only the enabled event type reaches the callback.
        assert_eq!(collected.len(), 1);
        assert_eq!(collected[0].event_type, EventType::DELETE_COMPLETE);
    }

    #[tokio::test]
    async fn pipeline_end_without_stats_flag_skips_stats_report() {
        let mut manager = EventManager::new();
        let (callback, events) = CollectingCallback::new();
        manager.register_callback(
            EventType::PIPELINE_START | EventType::PIPELINE_END,
            callback,
            false,
        );

        manager
            .trigger_event(EventData::new(EventType::PIPELINE_START))
            .await;
        manager
            .trigger_event(EventData::new(EventType::PIPELINE_END))
            .await;

        let collected = events.lock().await;
        // No STATS_REPORT is appended because that flag is not enabled.
        assert_eq!(collected.len(), 2);
        assert_eq!(collected[0].event_type, EventType::PIPELINE_START);
        assert_eq!(collected[1].event_type, EventType::PIPELINE_END);
    }

    #[tokio::test]
    async fn dry_run_flag_propagated() {
        let mut manager = EventManager::new();
        let (callback, events) = CollectingCallback::new();
        manager.register_callback(EventType::ALL_EVENTS, callback, true);

        manager
            .trigger_event(EventData::new(EventType::PIPELINE_START))
            .await;

        let collected = events.lock().await;
        // Check the count first so a missing event fails with a clear message
        // rather than an index-out-of-bounds panic.
        assert_eq!(collected.len(), 1);
        assert!(collected[0].dry_run);
    }

    #[test]
    fn debug_format() {
        let manager = EventManager::new();
        let debug = format!("{manager:?}");
        assert!(debug.contains("callback_registered: false"));
    }

    #[test]
    fn pipeline_stats_to_event_data() {
        let stats = PipelineStats {
            pipeline_start_time: None,
            stats_deleted_objects: 100,
            stats_deleted_bytes: 50_000,
            stats_failed_objects: 5,
            stats_skipped_objects: 10,
            stats_error_count: 1,
            stats_duration_sec: 10.0,
            stats_objects_per_sec: 10.0,
        };

        let event_data: EventData = stats.into();
        assert_eq!(event_data.event_type, EventType::STATS_REPORT);
        assert_eq!(event_data.stats_deleted_objects, Some(100));
        assert_eq!(event_data.stats_deleted_bytes, Some(50_000));
        assert_eq!(event_data.stats_failed_objects, Some(5));
        assert_eq!(event_data.stats_skipped_objects, Some(10));
        // The float fields are copied verbatim, so exact comparison is sound.
        assert_eq!(event_data.stats_error_count, Some(1));
        assert_eq!(event_data.stats_duration_sec, Some(10.0));
        assert_eq!(event_data.stats_objects_per_sec, Some(10.0));
    }
}