1use crate::http_pool::{HttpClientPool, HttpConfig};
33use std::{
34 collections::HashMap,
35 sync::{Arc, Mutex},
36};
37
38#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize)]
40pub enum LifecycleEventType {
41 ContentAdded,
43 ContentAccessed,
45 ContentRemoved,
47 ContentPinned,
49 ContentUnpinned,
51 ChunkTransferred,
53 ProofGenerated,
55 QuotaExceeded,
57 VerificationFailed,
59 PeerConnected,
61 PeerDisconnected,
63}
64
65#[derive(Debug, Clone, serde::Serialize)]
67pub struct ContentEvent {
68 pub cid: String,
70 pub event_type: LifecycleEventType,
72 pub size_bytes: Option<u64>,
74 pub peer_id: Option<String>,
76 pub metadata: Option<HashMap<String, String>>,
78}
79
80impl ContentEvent {
81 #[inline]
83 #[must_use]
84 pub fn simple(cid: String, event_type: LifecycleEventType) -> Self {
85 Self {
86 cid,
87 event_type,
88 size_bytes: None,
89 peer_id: None,
90 metadata: None,
91 }
92 }
93
94 #[inline]
96 #[must_use]
97 pub fn with_size(cid: String, event_type: LifecycleEventType, size_bytes: u64) -> Self {
98 Self {
99 cid,
100 event_type,
101 size_bytes: Some(size_bytes),
102 peer_id: None,
103 metadata: None,
104 }
105 }
106
107 #[inline]
109 #[must_use]
110 pub fn with_peer(cid: String, event_type: LifecycleEventType, peer_id: String) -> Self {
111 Self {
112 cid,
113 event_type,
114 size_bytes: None,
115 peer_id: Some(peer_id),
116 metadata: None,
117 }
118 }
119
120 #[inline]
122 #[must_use]
123 pub fn with_metadata(mut self, key: String, value: String) -> Self {
124 if self.metadata.is_none() {
125 self.metadata = Some(HashMap::new());
126 }
127 if let Some(ref mut metadata) = self.metadata {
128 metadata.insert(key, value);
129 }
130 self
131 }
132}
133
134pub type EventHandler = Arc<dyn Fn(&ContentEvent) + Send + Sync>;
136
137#[derive(Debug, Clone)]
139pub struct WebhookConfig {
140 pub url: String,
142 pub events: Vec<LifecycleEventType>,
144 pub auth_header: Option<String>,
146 pub max_retries: u32,
148 pub timeout_ms: u64,
150}
151
152impl WebhookConfig {
153 #[inline]
155 #[must_use]
156 pub fn new(url: String) -> Self {
157 Self {
158 url,
159 events: vec![],
160 auth_header: None,
161 max_retries: 3,
162 timeout_ms: 5000,
163 }
164 }
165
166 #[inline]
168 #[must_use]
169 pub fn for_events(mut self, events: Vec<LifecycleEventType>) -> Self {
170 self.events = events;
171 self
172 }
173
174 #[inline]
176 #[must_use]
177 pub fn with_auth(mut self, header: String) -> Self {
178 self.auth_header = Some(header);
179 self
180 }
181}
182
183#[derive(Debug, Clone)]
185pub struct EventHistoryEntry {
186 pub event: ContentEvent,
188 pub timestamp_ms: u64,
190}
191
192pub struct LifecycleEventManager {
194 handlers: Arc<Mutex<HashMap<LifecycleEventType, Vec<EventHandler>>>>,
196 webhooks: Arc<Mutex<Vec<WebhookConfig>>>,
198 history: Arc<Mutex<VecDeque<EventHistoryEntry>>>,
200 max_history_size: usize,
202 stats: Arc<Mutex<HashMap<LifecycleEventType, u64>>>,
204 http_pool: Arc<HttpClientPool>,
206}
207
208use std::collections::VecDeque;
209
210async fn send_webhook_request(
212 http_pool: &HttpClientPool,
213 webhook: &WebhookConfig,
214 event: &ContentEvent,
215) -> Result<(), crate::http_pool::HttpError> {
216 let json_body = serde_json::to_value(event)
218 .map_err(|e| crate::http_pool::HttpError::Serialization(e.to_string()))?;
219
220 let request = http_pool.post_json(&webhook.url, json_body).await?;
222
223 if request.status().is_success() {
225 Ok(())
226 } else {
227 Err(crate::http_pool::HttpError::Response {
228 status: request.status(),
229 message: format!("Webhook failed with status {}", request.status()),
230 })
231 }
232}
233
234impl LifecycleEventManager {
235 #[must_use]
237 pub fn new() -> Self {
238 Self {
239 handlers: Arc::new(Mutex::new(HashMap::new())),
240 webhooks: Arc::new(Mutex::new(Vec::new())),
241 history: Arc::new(Mutex::new(VecDeque::new())),
242 max_history_size: 1000,
243 stats: Arc::new(Mutex::new(HashMap::new())),
244 http_pool: Arc::new(HttpClientPool::new(HttpConfig::default())),
245 }
246 }
247
248 #[must_use]
250 #[inline]
251 pub fn with_history_size(max_history_size: usize) -> Self {
252 Self {
253 handlers: Arc::new(Mutex::new(HashMap::new())),
254 webhooks: Arc::new(Mutex::new(Vec::new())),
255 history: Arc::new(Mutex::new(VecDeque::new())),
256 max_history_size,
257 stats: Arc::new(Mutex::new(HashMap::new())),
258 http_pool: Arc::new(HttpClientPool::new(HttpConfig::default())),
259 }
260 }
261
262 pub fn on<F>(&mut self, event_type: LifecycleEventType, handler: F)
264 where
265 F: Fn(&ContentEvent) + Send + Sync + 'static,
266 {
267 let mut handlers = self.handlers.lock().unwrap();
268 handlers
269 .entry(event_type)
270 .or_default()
271 .push(Arc::new(handler));
272 }
273
274 pub fn register_webhook(&mut self, config: WebhookConfig) {
276 let mut webhooks = self.webhooks.lock().unwrap();
277 webhooks.push(config);
278 }
279
280 pub async fn emit(&self, event: ContentEvent) {
282 {
284 let mut stats = self.stats.lock().unwrap();
285 *stats.entry(event.event_type).or_insert(0) += 1;
286 }
287
288 {
290 let mut history = self.history.lock().unwrap();
291 history.push_back(EventHistoryEntry {
292 event: event.clone(),
293 timestamp_ms: crate::utils::current_timestamp_ms() as u64,
294 });
295
296 while history.len() > self.max_history_size {
298 history.pop_front();
299 }
300 }
301
302 {
304 let handlers = self.handlers.lock().unwrap();
305 if let Some(handlers_list) = handlers.get(&event.event_type) {
306 for handler in handlers_list {
307 handler(&event);
308 }
309 }
310 }
311
312 self.trigger_webhooks(&event).await;
314 }
315
316 async fn trigger_webhooks(&self, event: &ContentEvent) {
318 let webhooks = self.webhooks.lock().unwrap().clone();
319
320 for webhook in webhooks {
321 if !webhook.events.is_empty() && !webhook.events.contains(&event.event_type) {
323 continue;
324 }
325
326 let http_pool = Arc::clone(&self.http_pool);
328 let event_clone = event.clone();
329 let webhook_clone = webhook.clone();
330
331 tokio::spawn(async move {
333 for attempt in 0..=webhook_clone.max_retries {
335 match send_webhook_request(&http_pool, &webhook_clone, &event_clone).await {
336 Ok(_) => {
337 break;
339 }
340 Err(e) => {
341 eprintln!(
343 "Webhook delivery failed (attempt {}/{}): {}",
344 attempt + 1,
345 webhook_clone.max_retries + 1,
346 e
347 );
348
349 if attempt < webhook_clone.max_retries {
351 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
353 }
354 }
355 }
356 }
357 });
358 }
359 }
360
361 #[must_use]
363 #[inline]
364 pub fn get_history(&self, event_type: Option<LifecycleEventType>) -> Vec<EventHistoryEntry> {
365 let history = self.history.lock().unwrap();
366 match event_type {
367 Some(et) => history
368 .iter()
369 .filter(|entry| entry.event.event_type == et)
370 .cloned()
371 .collect(),
372 None => history.iter().cloned().collect(),
373 }
374 }
375
376 #[must_use]
378 #[inline]
379 pub fn get_recent(&self, count: usize) -> Vec<EventHistoryEntry> {
380 let history = self.history.lock().unwrap();
381 history.iter().rev().take(count).cloned().collect()
382 }
383
384 #[must_use]
386 #[inline]
387 pub fn get_event_count(&self, event_type: LifecycleEventType) -> u64 {
388 self.stats
389 .lock()
390 .unwrap()
391 .get(&event_type)
392 .copied()
393 .unwrap_or(0)
394 }
395
396 #[must_use]
398 #[inline]
399 pub fn get_total_event_count(&self) -> u64 {
400 self.stats.lock().unwrap().values().sum()
401 }
402
403 #[must_use]
405 #[inline]
406 pub fn get_stats(&self) -> HashMap<LifecycleEventType, u64> {
407 self.stats.lock().unwrap().clone()
408 }
409
410 pub fn clear_history(&mut self) {
412 self.history.lock().unwrap().clear();
413 }
414
415 pub fn reset_stats(&mut self) {
417 self.stats.lock().unwrap().clear();
418 }
419
420 pub fn clear_handlers(&mut self, event_type: LifecycleEventType) {
422 self.handlers.lock().unwrap().remove(&event_type);
423 }
424
425 pub fn clear_webhooks(&mut self) {
427 self.webhooks.lock().unwrap().clear();
428 }
429}
430
431impl Default for LifecycleEventManager {
432 #[inline]
433 fn default() -> Self {
434 Self::new()
435 }
436}
437
438#[cfg(test)]
439mod tests {
440 use super::*;
441 use std::sync::atomic::{AtomicU32, Ordering};
442
443 #[tokio::test]
444 async fn test_event_creation() {
445 let event = ContentEvent::simple("QmTest".to_string(), LifecycleEventType::ContentAdded);
446 assert_eq!(event.cid, "QmTest");
447 assert_eq!(event.event_type, LifecycleEventType::ContentAdded);
448 assert!(event.size_bytes.is_none());
449 }
450
451 #[tokio::test]
452 async fn test_event_with_size() {
453 let event =
454 ContentEvent::with_size("QmTest".to_string(), LifecycleEventType::ContentAdded, 1024);
455 assert_eq!(event.size_bytes, Some(1024));
456 }
457
458 #[tokio::test]
459 async fn test_event_with_peer() {
460 let event = ContentEvent::with_peer(
461 "QmTest".to_string(),
462 LifecycleEventType::ChunkTransferred,
463 "peer123".to_string(),
464 );
465 assert_eq!(event.peer_id, Some("peer123".to_string()));
466 }
467
468 #[tokio::test]
469 async fn test_event_with_metadata() {
470 let event = ContentEvent::simple("QmTest".to_string(), LifecycleEventType::ContentAdded)
471 .with_metadata("key1".to_string(), "value1".to_string())
472 .with_metadata("key2".to_string(), "value2".to_string());
473
474 assert!(event.metadata.is_some());
475 let metadata = event.metadata.unwrap();
476 assert_eq!(metadata.get("key1"), Some(&"value1".to_string()));
477 assert_eq!(metadata.get("key2"), Some(&"value2".to_string()));
478 }
479
480 #[tokio::test]
481 async fn test_handler_registration() {
482 let mut manager = LifecycleEventManager::new();
483 let counter = Arc::new(AtomicU32::new(0));
484 let counter_clone = counter.clone();
485
486 manager.on(LifecycleEventType::ContentAdded, move |_event| {
487 counter_clone.fetch_add(1, Ordering::SeqCst);
488 });
489
490 let event = ContentEvent::simple("QmTest".to_string(), LifecycleEventType::ContentAdded);
491 manager.emit(event).await;
492
493 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
495
496 assert_eq!(counter.load(Ordering::SeqCst), 1);
497 }
498
499 #[tokio::test]
500 async fn test_multiple_handlers() {
501 let mut manager = LifecycleEventManager::new();
502 let counter = Arc::new(AtomicU32::new(0));
503
504 let counter1 = counter.clone();
505 manager.on(LifecycleEventType::ContentAdded, move |_event| {
506 counter1.fetch_add(1, Ordering::SeqCst);
507 });
508
509 let counter2 = counter.clone();
510 manager.on(LifecycleEventType::ContentAdded, move |_event| {
511 counter2.fetch_add(1, Ordering::SeqCst);
512 });
513
514 let event = ContentEvent::simple("QmTest".to_string(), LifecycleEventType::ContentAdded);
515 manager.emit(event).await;
516
517 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
518
519 assert_eq!(counter.load(Ordering::SeqCst), 2);
520 }
521
522 #[tokio::test]
523 async fn test_event_history() {
524 let manager = LifecycleEventManager::new();
525
526 let event1 = ContentEvent::simple("QmTest1".to_string(), LifecycleEventType::ContentAdded);
527 let event2 =
528 ContentEvent::simple("QmTest2".to_string(), LifecycleEventType::ContentAccessed);
529
530 manager.emit(event1).await;
531 manager.emit(event2).await;
532
533 let history = manager.get_history(None);
534 assert_eq!(history.len(), 2);
535 }
536
537 #[tokio::test]
538 async fn test_filtered_history() {
539 let manager = LifecycleEventManager::new();
540
541 manager
542 .emit(ContentEvent::simple(
543 "Qm1".to_string(),
544 LifecycleEventType::ContentAdded,
545 ))
546 .await;
547 manager
548 .emit(ContentEvent::simple(
549 "Qm2".to_string(),
550 LifecycleEventType::ContentAccessed,
551 ))
552 .await;
553 manager
554 .emit(ContentEvent::simple(
555 "Qm3".to_string(),
556 LifecycleEventType::ContentAdded,
557 ))
558 .await;
559
560 let history = manager.get_history(Some(LifecycleEventType::ContentAdded));
561 assert_eq!(history.len(), 2);
562 }
563
564 #[tokio::test]
565 async fn test_recent_events() {
566 let manager = LifecycleEventManager::new();
567
568 for i in 0..5 {
569 manager
570 .emit(ContentEvent::simple(
571 format!("Qm{}", i),
572 LifecycleEventType::ContentAdded,
573 ))
574 .await;
575 }
576
577 let recent = manager.get_recent(3);
578 assert_eq!(recent.len(), 3);
579 }
580
581 #[tokio::test]
582 async fn test_event_statistics() {
583 let manager = LifecycleEventManager::new();
584
585 manager
586 .emit(ContentEvent::simple(
587 "Qm1".to_string(),
588 LifecycleEventType::ContentAdded,
589 ))
590 .await;
591 manager
592 .emit(ContentEvent::simple(
593 "Qm2".to_string(),
594 LifecycleEventType::ContentAdded,
595 ))
596 .await;
597 manager
598 .emit(ContentEvent::simple(
599 "Qm3".to_string(),
600 LifecycleEventType::ContentAccessed,
601 ))
602 .await;
603
604 assert_eq!(manager.get_event_count(LifecycleEventType::ContentAdded), 2);
605 assert_eq!(
606 manager.get_event_count(LifecycleEventType::ContentAccessed),
607 1
608 );
609 assert_eq!(manager.get_total_event_count(), 3);
610 }
611
612 #[tokio::test]
613 async fn test_history_size_limit() {
614 let manager = LifecycleEventManager::with_history_size(5);
615
616 for i in 0..10 {
617 manager
618 .emit(ContentEvent::simple(
619 format!("Qm{}", i),
620 LifecycleEventType::ContentAdded,
621 ))
622 .await;
623 }
624
625 let history = manager.get_history(None);
626 assert_eq!(history.len(), 5);
627 }
628
629 #[tokio::test]
630 async fn test_clear_history() {
631 let mut manager = LifecycleEventManager::new();
632
633 manager
634 .emit(ContentEvent::simple(
635 "Qm1".to_string(),
636 LifecycleEventType::ContentAdded,
637 ))
638 .await;
639 manager
640 .emit(ContentEvent::simple(
641 "Qm2".to_string(),
642 LifecycleEventType::ContentAccessed,
643 ))
644 .await;
645
646 assert_eq!(manager.get_history(None).len(), 2);
647
648 manager.clear_history();
649 assert_eq!(manager.get_history(None).len(), 0);
650 }
651
652 #[tokio::test]
653 async fn test_reset_stats() {
654 let mut manager = LifecycleEventManager::new();
655
656 manager
657 .emit(ContentEvent::simple(
658 "Qm1".to_string(),
659 LifecycleEventType::ContentAdded,
660 ))
661 .await;
662 assert_eq!(manager.get_total_event_count(), 1);
663
664 manager.reset_stats();
665 assert_eq!(manager.get_total_event_count(), 0);
666 }
667
668 #[tokio::test]
669 async fn test_webhook_config() {
670 let webhook = WebhookConfig::new("https://example.com/webhook".to_string())
671 .for_events(vec![
672 LifecycleEventType::ContentAdded,
673 LifecycleEventType::ContentRemoved,
674 ])
675 .with_auth("Bearer token123".to_string());
676
677 assert_eq!(webhook.url, "https://example.com/webhook");
678 assert_eq!(webhook.events.len(), 2);
679 assert_eq!(webhook.auth_header, Some("Bearer token123".to_string()));
680 }
681
682 #[tokio::test]
683 async fn test_webhook_registration() {
684 let mut manager = LifecycleEventManager::new();
685 let webhook = WebhookConfig::new("https://example.com/webhook".to_string());
686
687 manager.register_webhook(webhook);
688
689 manager
691 .emit(ContentEvent::simple(
692 "Qm1".to_string(),
693 LifecycleEventType::ContentAdded,
694 ))
695 .await;
696 }
697
698 #[tokio::test]
699 async fn test_clear_handlers() {
700 let mut manager = LifecycleEventManager::new();
701 let counter = Arc::new(AtomicU32::new(0));
702 let counter_clone = counter.clone();
703
704 manager.on(LifecycleEventType::ContentAdded, move |_event| {
705 counter_clone.fetch_add(1, Ordering::SeqCst);
706 });
707
708 manager
709 .emit(ContentEvent::simple(
710 "Qm1".to_string(),
711 LifecycleEventType::ContentAdded,
712 ))
713 .await;
714 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
715 assert_eq!(counter.load(Ordering::SeqCst), 1);
716
717 manager.clear_handlers(LifecycleEventType::ContentAdded);
718 manager
719 .emit(ContentEvent::simple(
720 "Qm2".to_string(),
721 LifecycleEventType::ContentAdded,
722 ))
723 .await;
724 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
725 assert_eq!(counter.load(Ordering::SeqCst), 1); }
727}