chie_core/
lifecycle.rs

1//! Content lifecycle event system for webhooks and callbacks.
2//!
3//! This module provides an event-driven system for tracking content lifecycle operations.
4//! Applications can register event handlers to react to content additions, accesses, removals,
5//! and other lifecycle events.
6//!
7//! # Example
8//!
9//! ```rust
10//! use chie_core::lifecycle::{LifecycleEventManager, LifecycleEventType, ContentEvent};
11//!
12//! #[tokio::main]
13//! async fn main() {
14//!     let mut manager = LifecycleEventManager::new();
15//!
16//!     // Register an event handler
17//!     manager.on(LifecycleEventType::ContentAdded, |event| {
18//!         println!("Content added: {}", event.cid);
19//!     });
20//!
21//!     // Emit an event
22//!     manager.emit(ContentEvent {
23//!         cid: "QmExample".to_string(),
24//!         event_type: LifecycleEventType::ContentAdded,
25//!         size_bytes: Some(1024),
26//!         peer_id: None,
27//!         metadata: None,
28//!     }).await;
29//! }
30//! ```
31
32use crate::http_pool::{HttpClientPool, HttpConfig};
33use std::{
34    collections::HashMap,
35    sync::{Arc, Mutex},
36};
37
38/// Type of lifecycle event.
39#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize)]
40pub enum LifecycleEventType {
41    /// Content was added to storage.
42    ContentAdded,
43    /// Content was accessed/requested.
44    ContentAccessed,
45    /// Content was removed from storage.
46    ContentRemoved,
47    /// Content was pinned.
48    ContentPinned,
49    /// Content was unpinned.
50    ContentUnpinned,
51    /// Chunk was transferred.
52    ChunkTransferred,
53    /// Bandwidth proof was generated.
54    ProofGenerated,
55    /// Storage quota exceeded.
56    QuotaExceeded,
57    /// Content verification failed.
58    VerificationFailed,
59    /// Peer connection established.
60    PeerConnected,
61    /// Peer connection lost.
62    PeerDisconnected,
63}
64
65/// A content lifecycle event.
66#[derive(Debug, Clone, serde::Serialize)]
67pub struct ContentEvent {
68    /// Content identifier.
69    pub cid: String,
70    /// Type of event.
71    pub event_type: LifecycleEventType,
72    /// Content size in bytes (if applicable).
73    pub size_bytes: Option<u64>,
74    /// Peer ID involved (if applicable).
75    pub peer_id: Option<String>,
76    /// Additional metadata (JSON-compatible).
77    pub metadata: Option<HashMap<String, String>>,
78}
79
80impl ContentEvent {
81    /// Create a simple event without optional fields.
82    #[inline]
83    #[must_use]
84    pub fn simple(cid: String, event_type: LifecycleEventType) -> Self {
85        Self {
86            cid,
87            event_type,
88            size_bytes: None,
89            peer_id: None,
90            metadata: None,
91        }
92    }
93
94    /// Create an event with size information.
95    #[inline]
96    #[must_use]
97    pub fn with_size(cid: String, event_type: LifecycleEventType, size_bytes: u64) -> Self {
98        Self {
99            cid,
100            event_type,
101            size_bytes: Some(size_bytes),
102            peer_id: None,
103            metadata: None,
104        }
105    }
106
107    /// Create an event with peer information.
108    #[inline]
109    #[must_use]
110    pub fn with_peer(cid: String, event_type: LifecycleEventType, peer_id: String) -> Self {
111        Self {
112            cid,
113            event_type,
114            size_bytes: None,
115            peer_id: Some(peer_id),
116            metadata: None,
117        }
118    }
119
120    /// Add metadata to the event.
121    #[inline]
122    #[must_use]
123    pub fn with_metadata(mut self, key: String, value: String) -> Self {
124        if self.metadata.is_none() {
125            self.metadata = Some(HashMap::new());
126        }
127        if let Some(ref mut metadata) = self.metadata {
128            metadata.insert(key, value);
129        }
130        self
131    }
132}
133
134/// Type alias for event handler functions.
135pub type EventHandler = Arc<dyn Fn(&ContentEvent) + Send + Sync>;
136
137/// Webhook configuration for HTTP callbacks.
138#[derive(Debug, Clone)]
139pub struct WebhookConfig {
140    /// Webhook URL.
141    pub url: String,
142    /// Events to trigger this webhook.
143    pub events: Vec<LifecycleEventType>,
144    /// Authentication header (optional).
145    pub auth_header: Option<String>,
146    /// Maximum retry attempts.
147    pub max_retries: u32,
148    /// Timeout in milliseconds.
149    pub timeout_ms: u64,
150}
151
152impl WebhookConfig {
153    /// Create a new webhook config for a URL.
154    #[inline]
155    #[must_use]
156    pub fn new(url: String) -> Self {
157        Self {
158            url,
159            events: vec![],
160            auth_header: None,
161            max_retries: 3,
162            timeout_ms: 5000,
163        }
164    }
165
166    /// Set which events should trigger this webhook.
167    #[inline]
168    #[must_use]
169    pub fn for_events(mut self, events: Vec<LifecycleEventType>) -> Self {
170        self.events = events;
171        self
172    }
173
174    /// Add an authentication header.
175    #[inline]
176    #[must_use]
177    pub fn with_auth(mut self, header: String) -> Self {
178        self.auth_header = Some(header);
179        self
180    }
181}
182
183/// Event history entry.
184#[derive(Debug, Clone)]
185pub struct EventHistoryEntry {
186    /// The event that occurred.
187    pub event: ContentEvent,
188    /// Timestamp in milliseconds since epoch.
189    pub timestamp_ms: u64,
190}
191
192/// Lifecycle event manager for handling content events.
193pub struct LifecycleEventManager {
194    /// Event handlers by event type.
195    handlers: Arc<Mutex<HashMap<LifecycleEventType, Vec<EventHandler>>>>,
196    /// Webhook configurations.
197    webhooks: Arc<Mutex<Vec<WebhookConfig>>>,
198    /// Event history (limited size).
199    history: Arc<Mutex<VecDeque<EventHistoryEntry>>>,
200    /// Maximum history size.
201    max_history_size: usize,
202    /// Event statistics.
203    stats: Arc<Mutex<HashMap<LifecycleEventType, u64>>>,
204    /// HTTP client pool for webhook requests.
205    http_pool: Arc<HttpClientPool>,
206}
207
208use std::collections::VecDeque;
209
210/// Send a webhook HTTP POST request for an event.
211async fn send_webhook_request(
212    http_pool: &HttpClientPool,
213    webhook: &WebhookConfig,
214    event: &ContentEvent,
215) -> Result<(), crate::http_pool::HttpError> {
216    // Serialize event to JSON
217    let json_body = serde_json::to_value(event)
218        .map_err(|e| crate::http_pool::HttpError::Serialization(e.to_string()))?;
219
220    // Build the request with timeout
221    let request = http_pool.post_json(&webhook.url, json_body).await?;
222
223    // Check response status
224    if request.status().is_success() {
225        Ok(())
226    } else {
227        Err(crate::http_pool::HttpError::Response {
228            status: request.status(),
229            message: format!("Webhook failed with status {}", request.status()),
230        })
231    }
232}
233
234impl LifecycleEventManager {
235    /// Create a new lifecycle event manager.
236    #[must_use]
237    pub fn new() -> Self {
238        Self {
239            handlers: Arc::new(Mutex::new(HashMap::new())),
240            webhooks: Arc::new(Mutex::new(Vec::new())),
241            history: Arc::new(Mutex::new(VecDeque::new())),
242            max_history_size: 1000,
243            stats: Arc::new(Mutex::new(HashMap::new())),
244            http_pool: Arc::new(HttpClientPool::new(HttpConfig::default())),
245        }
246    }
247
248    /// Create a new manager with custom history size.
249    #[must_use]
250    #[inline]
251    pub fn with_history_size(max_history_size: usize) -> Self {
252        Self {
253            handlers: Arc::new(Mutex::new(HashMap::new())),
254            webhooks: Arc::new(Mutex::new(Vec::new())),
255            history: Arc::new(Mutex::new(VecDeque::new())),
256            max_history_size,
257            stats: Arc::new(Mutex::new(HashMap::new())),
258            http_pool: Arc::new(HttpClientPool::new(HttpConfig::default())),
259        }
260    }
261
262    /// Register an event handler for a specific event type.
263    pub fn on<F>(&mut self, event_type: LifecycleEventType, handler: F)
264    where
265        F: Fn(&ContentEvent) + Send + Sync + 'static,
266    {
267        let mut handlers = self.handlers.lock().unwrap();
268        handlers
269            .entry(event_type)
270            .or_default()
271            .push(Arc::new(handler));
272    }
273
274    /// Register a webhook for HTTP callbacks.
275    pub fn register_webhook(&mut self, config: WebhookConfig) {
276        let mut webhooks = self.webhooks.lock().unwrap();
277        webhooks.push(config);
278    }
279
280    /// Emit an event, triggering all registered handlers.
281    pub async fn emit(&self, event: ContentEvent) {
282        // Update statistics
283        {
284            let mut stats = self.stats.lock().unwrap();
285            *stats.entry(event.event_type).or_insert(0) += 1;
286        }
287
288        // Add to history
289        {
290            let mut history = self.history.lock().unwrap();
291            history.push_back(EventHistoryEntry {
292                event: event.clone(),
293                timestamp_ms: crate::utils::current_timestamp_ms() as u64,
294            });
295
296            // Trim history if needed
297            while history.len() > self.max_history_size {
298                history.pop_front();
299            }
300        }
301
302        // Call handlers
303        {
304            let handlers = self.handlers.lock().unwrap();
305            if let Some(handlers_list) = handlers.get(&event.event_type) {
306                for handler in handlers_list {
307                    handler(&event);
308                }
309            }
310        }
311
312        // Trigger webhooks (in background)
313        self.trigger_webhooks(&event).await;
314    }
315
316    /// Trigger webhooks for an event (async).
317    async fn trigger_webhooks(&self, event: &ContentEvent) {
318        let webhooks = self.webhooks.lock().unwrap().clone();
319
320        for webhook in webhooks {
321            // Check if this webhook should be triggered for this event type
322            if !webhook.events.is_empty() && !webhook.events.contains(&event.event_type) {
323                continue;
324            }
325
326            // Clone the http_pool Arc for this task
327            let http_pool = Arc::clone(&self.http_pool);
328            let event_clone = event.clone();
329            let webhook_clone = webhook.clone();
330
331            // Spawn a background task to send the webhook
332            tokio::spawn(async move {
333                // Attempt to send webhook with retries
334                for attempt in 0..=webhook_clone.max_retries {
335                    match send_webhook_request(&http_pool, &webhook_clone, &event_clone).await {
336                        Ok(_) => {
337                            // Success - webhook delivered
338                            break;
339                        }
340                        Err(e) => {
341                            // Log error (in production, use proper logging)
342                            eprintln!(
343                                "Webhook delivery failed (attempt {}/{}): {}",
344                                attempt + 1,
345                                webhook_clone.max_retries + 1,
346                                e
347                            );
348
349                            // Don't retry if this was the last attempt
350                            if attempt < webhook_clone.max_retries {
351                                // Brief delay before retry
352                                tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
353                            }
354                        }
355                    }
356                }
357            });
358        }
359    }
360
361    /// Get event history for a specific event type.
362    #[must_use]
363    #[inline]
364    pub fn get_history(&self, event_type: Option<LifecycleEventType>) -> Vec<EventHistoryEntry> {
365        let history = self.history.lock().unwrap();
366        match event_type {
367            Some(et) => history
368                .iter()
369                .filter(|entry| entry.event.event_type == et)
370                .cloned()
371                .collect(),
372            None => history.iter().cloned().collect(),
373        }
374    }
375
376    /// Get recent events (last N).
377    #[must_use]
378    #[inline]
379    pub fn get_recent(&self, count: usize) -> Vec<EventHistoryEntry> {
380        let history = self.history.lock().unwrap();
381        history.iter().rev().take(count).cloned().collect()
382    }
383
384    /// Get event count for a specific type.
385    #[must_use]
386    #[inline]
387    pub fn get_event_count(&self, event_type: LifecycleEventType) -> u64 {
388        self.stats
389            .lock()
390            .unwrap()
391            .get(&event_type)
392            .copied()
393            .unwrap_or(0)
394    }
395
396    /// Get total event count across all types.
397    #[must_use]
398    #[inline]
399    pub fn get_total_event_count(&self) -> u64 {
400        self.stats.lock().unwrap().values().sum()
401    }
402
403    /// Get all event statistics.
404    #[must_use]
405    #[inline]
406    pub fn get_stats(&self) -> HashMap<LifecycleEventType, u64> {
407        self.stats.lock().unwrap().clone()
408    }
409
410    /// Clear event history.
411    pub fn clear_history(&mut self) {
412        self.history.lock().unwrap().clear();
413    }
414
415    /// Reset event statistics.
416    pub fn reset_stats(&mut self) {
417        self.stats.lock().unwrap().clear();
418    }
419
420    /// Remove all handlers for an event type.
421    pub fn clear_handlers(&mut self, event_type: LifecycleEventType) {
422        self.handlers.lock().unwrap().remove(&event_type);
423    }
424
425    /// Remove all webhooks.
426    pub fn clear_webhooks(&mut self) {
427        self.webhooks.lock().unwrap().clear();
428    }
429}
430
431impl Default for LifecycleEventManager {
432    #[inline]
433    fn default() -> Self {
434        Self::new()
435    }
436}
437
438#[cfg(test)]
439mod tests {
440    use super::*;
441    use std::sync::atomic::{AtomicU32, Ordering};
442
443    #[tokio::test]
444    async fn test_event_creation() {
445        let event = ContentEvent::simple("QmTest".to_string(), LifecycleEventType::ContentAdded);
446        assert_eq!(event.cid, "QmTest");
447        assert_eq!(event.event_type, LifecycleEventType::ContentAdded);
448        assert!(event.size_bytes.is_none());
449    }
450
451    #[tokio::test]
452    async fn test_event_with_size() {
453        let event =
454            ContentEvent::with_size("QmTest".to_string(), LifecycleEventType::ContentAdded, 1024);
455        assert_eq!(event.size_bytes, Some(1024));
456    }
457
458    #[tokio::test]
459    async fn test_event_with_peer() {
460        let event = ContentEvent::with_peer(
461            "QmTest".to_string(),
462            LifecycleEventType::ChunkTransferred,
463            "peer123".to_string(),
464        );
465        assert_eq!(event.peer_id, Some("peer123".to_string()));
466    }
467
468    #[tokio::test]
469    async fn test_event_with_metadata() {
470        let event = ContentEvent::simple("QmTest".to_string(), LifecycleEventType::ContentAdded)
471            .with_metadata("key1".to_string(), "value1".to_string())
472            .with_metadata("key2".to_string(), "value2".to_string());
473
474        assert!(event.metadata.is_some());
475        let metadata = event.metadata.unwrap();
476        assert_eq!(metadata.get("key1"), Some(&"value1".to_string()));
477        assert_eq!(metadata.get("key2"), Some(&"value2".to_string()));
478    }
479
480    #[tokio::test]
481    async fn test_handler_registration() {
482        let mut manager = LifecycleEventManager::new();
483        let counter = Arc::new(AtomicU32::new(0));
484        let counter_clone = counter.clone();
485
486        manager.on(LifecycleEventType::ContentAdded, move |_event| {
487            counter_clone.fetch_add(1, Ordering::SeqCst);
488        });
489
490        let event = ContentEvent::simple("QmTest".to_string(), LifecycleEventType::ContentAdded);
491        manager.emit(event).await;
492
493        // Wait a bit for handler to execute
494        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
495
496        assert_eq!(counter.load(Ordering::SeqCst), 1);
497    }
498
499    #[tokio::test]
500    async fn test_multiple_handlers() {
501        let mut manager = LifecycleEventManager::new();
502        let counter = Arc::new(AtomicU32::new(0));
503
504        let counter1 = counter.clone();
505        manager.on(LifecycleEventType::ContentAdded, move |_event| {
506            counter1.fetch_add(1, Ordering::SeqCst);
507        });
508
509        let counter2 = counter.clone();
510        manager.on(LifecycleEventType::ContentAdded, move |_event| {
511            counter2.fetch_add(1, Ordering::SeqCst);
512        });
513
514        let event = ContentEvent::simple("QmTest".to_string(), LifecycleEventType::ContentAdded);
515        manager.emit(event).await;
516
517        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
518
519        assert_eq!(counter.load(Ordering::SeqCst), 2);
520    }
521
522    #[tokio::test]
523    async fn test_event_history() {
524        let manager = LifecycleEventManager::new();
525
526        let event1 = ContentEvent::simple("QmTest1".to_string(), LifecycleEventType::ContentAdded);
527        let event2 =
528            ContentEvent::simple("QmTest2".to_string(), LifecycleEventType::ContentAccessed);
529
530        manager.emit(event1).await;
531        manager.emit(event2).await;
532
533        let history = manager.get_history(None);
534        assert_eq!(history.len(), 2);
535    }
536
537    #[tokio::test]
538    async fn test_filtered_history() {
539        let manager = LifecycleEventManager::new();
540
541        manager
542            .emit(ContentEvent::simple(
543                "Qm1".to_string(),
544                LifecycleEventType::ContentAdded,
545            ))
546            .await;
547        manager
548            .emit(ContentEvent::simple(
549                "Qm2".to_string(),
550                LifecycleEventType::ContentAccessed,
551            ))
552            .await;
553        manager
554            .emit(ContentEvent::simple(
555                "Qm3".to_string(),
556                LifecycleEventType::ContentAdded,
557            ))
558            .await;
559
560        let history = manager.get_history(Some(LifecycleEventType::ContentAdded));
561        assert_eq!(history.len(), 2);
562    }
563
564    #[tokio::test]
565    async fn test_recent_events() {
566        let manager = LifecycleEventManager::new();
567
568        for i in 0..5 {
569            manager
570                .emit(ContentEvent::simple(
571                    format!("Qm{}", i),
572                    LifecycleEventType::ContentAdded,
573                ))
574                .await;
575        }
576
577        let recent = manager.get_recent(3);
578        assert_eq!(recent.len(), 3);
579    }
580
581    #[tokio::test]
582    async fn test_event_statistics() {
583        let manager = LifecycleEventManager::new();
584
585        manager
586            .emit(ContentEvent::simple(
587                "Qm1".to_string(),
588                LifecycleEventType::ContentAdded,
589            ))
590            .await;
591        manager
592            .emit(ContentEvent::simple(
593                "Qm2".to_string(),
594                LifecycleEventType::ContentAdded,
595            ))
596            .await;
597        manager
598            .emit(ContentEvent::simple(
599                "Qm3".to_string(),
600                LifecycleEventType::ContentAccessed,
601            ))
602            .await;
603
604        assert_eq!(manager.get_event_count(LifecycleEventType::ContentAdded), 2);
605        assert_eq!(
606            manager.get_event_count(LifecycleEventType::ContentAccessed),
607            1
608        );
609        assert_eq!(manager.get_total_event_count(), 3);
610    }
611
612    #[tokio::test]
613    async fn test_history_size_limit() {
614        let manager = LifecycleEventManager::with_history_size(5);
615
616        for i in 0..10 {
617            manager
618                .emit(ContentEvent::simple(
619                    format!("Qm{}", i),
620                    LifecycleEventType::ContentAdded,
621                ))
622                .await;
623        }
624
625        let history = manager.get_history(None);
626        assert_eq!(history.len(), 5);
627    }
628
629    #[tokio::test]
630    async fn test_clear_history() {
631        let mut manager = LifecycleEventManager::new();
632
633        manager
634            .emit(ContentEvent::simple(
635                "Qm1".to_string(),
636                LifecycleEventType::ContentAdded,
637            ))
638            .await;
639        manager
640            .emit(ContentEvent::simple(
641                "Qm2".to_string(),
642                LifecycleEventType::ContentAccessed,
643            ))
644            .await;
645
646        assert_eq!(manager.get_history(None).len(), 2);
647
648        manager.clear_history();
649        assert_eq!(manager.get_history(None).len(), 0);
650    }
651
652    #[tokio::test]
653    async fn test_reset_stats() {
654        let mut manager = LifecycleEventManager::new();
655
656        manager
657            .emit(ContentEvent::simple(
658                "Qm1".to_string(),
659                LifecycleEventType::ContentAdded,
660            ))
661            .await;
662        assert_eq!(manager.get_total_event_count(), 1);
663
664        manager.reset_stats();
665        assert_eq!(manager.get_total_event_count(), 0);
666    }
667
668    #[tokio::test]
669    async fn test_webhook_config() {
670        let webhook = WebhookConfig::new("https://example.com/webhook".to_string())
671            .for_events(vec![
672                LifecycleEventType::ContentAdded,
673                LifecycleEventType::ContentRemoved,
674            ])
675            .with_auth("Bearer token123".to_string());
676
677        assert_eq!(webhook.url, "https://example.com/webhook");
678        assert_eq!(webhook.events.len(), 2);
679        assert_eq!(webhook.auth_header, Some("Bearer token123".to_string()));
680    }
681
682    #[tokio::test]
683    async fn test_webhook_registration() {
684        let mut manager = LifecycleEventManager::new();
685        let webhook = WebhookConfig::new("https://example.com/webhook".to_string());
686
687        manager.register_webhook(webhook);
688
689        // Emit event (webhook will be logged in debug mode)
690        manager
691            .emit(ContentEvent::simple(
692                "Qm1".to_string(),
693                LifecycleEventType::ContentAdded,
694            ))
695            .await;
696    }
697
698    #[tokio::test]
699    async fn test_clear_handlers() {
700        let mut manager = LifecycleEventManager::new();
701        let counter = Arc::new(AtomicU32::new(0));
702        let counter_clone = counter.clone();
703
704        manager.on(LifecycleEventType::ContentAdded, move |_event| {
705            counter_clone.fetch_add(1, Ordering::SeqCst);
706        });
707
708        manager
709            .emit(ContentEvent::simple(
710                "Qm1".to_string(),
711                LifecycleEventType::ContentAdded,
712            ))
713            .await;
714        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
715        assert_eq!(counter.load(Ordering::SeqCst), 1);
716
717        manager.clear_handlers(LifecycleEventType::ContentAdded);
718        manager
719            .emit(ContentEvent::simple(
720                "Qm2".to_string(),
721                LifecycleEventType::ContentAdded,
722            ))
723            .await;
724        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
725        assert_eq!(counter.load(Ordering::SeqCst), 1); // Should still be 1
726    }
727}