use crate::http_pool::{HttpClientPool, HttpConfig};
use std::{
collections::HashMap,
sync::{Arc, Mutex},
};
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize)]
pub enum LifecycleEventType {
ContentAdded,
ContentAccessed,
ContentRemoved,
ContentPinned,
ContentUnpinned,
ChunkTransferred,
ProofGenerated,
QuotaExceeded,
VerificationFailed,
PeerConnected,
PeerDisconnected,
}
#[derive(Debug, Clone, serde::Serialize)]
pub struct ContentEvent {
pub cid: String,
pub event_type: LifecycleEventType,
pub size_bytes: Option<u64>,
pub peer_id: Option<String>,
pub metadata: Option<HashMap<String, String>>,
}
impl ContentEvent {
#[inline]
#[must_use]
pub fn simple(cid: String, event_type: LifecycleEventType) -> Self {
Self {
cid,
event_type,
size_bytes: None,
peer_id: None,
metadata: None,
}
}
#[inline]
#[must_use]
pub fn with_size(cid: String, event_type: LifecycleEventType, size_bytes: u64) -> Self {
Self {
cid,
event_type,
size_bytes: Some(size_bytes),
peer_id: None,
metadata: None,
}
}
#[inline]
#[must_use]
pub fn with_peer(cid: String, event_type: LifecycleEventType, peer_id: String) -> Self {
Self {
cid,
event_type,
size_bytes: None,
peer_id: Some(peer_id),
metadata: None,
}
}
#[inline]
#[must_use]
pub fn with_metadata(mut self, key: String, value: String) -> Self {
if self.metadata.is_none() {
self.metadata = Some(HashMap::new());
}
if let Some(ref mut metadata) = self.metadata {
metadata.insert(key, value);
}
self
}
}
pub type EventHandler = Arc<dyn Fn(&ContentEvent) + Send + Sync>;
#[derive(Debug, Clone)]
pub struct WebhookConfig {
pub url: String,
pub events: Vec<LifecycleEventType>,
pub auth_header: Option<String>,
pub max_retries: u32,
pub timeout_ms: u64,
}
impl WebhookConfig {
#[inline]
#[must_use]
pub fn new(url: String) -> Self {
Self {
url,
events: vec![],
auth_header: None,
max_retries: 3,
timeout_ms: 5000,
}
}
#[inline]
#[must_use]
pub fn for_events(mut self, events: Vec<LifecycleEventType>) -> Self {
self.events = events;
self
}
#[inline]
#[must_use]
pub fn with_auth(mut self, header: String) -> Self {
self.auth_header = Some(header);
self
}
}
#[derive(Debug, Clone)]
pub struct EventHistoryEntry {
pub event: ContentEvent,
pub timestamp_ms: u64,
}
pub struct LifecycleEventManager {
handlers: Arc<Mutex<HashMap<LifecycleEventType, Vec<EventHandler>>>>,
webhooks: Arc<Mutex<Vec<WebhookConfig>>>,
history: Arc<Mutex<VecDeque<EventHistoryEntry>>>,
max_history_size: usize,
stats: Arc<Mutex<HashMap<LifecycleEventType, u64>>>,
http_pool: Arc<HttpClientPool>,
}
use std::collections::VecDeque;
async fn send_webhook_request(
http_pool: &HttpClientPool,
webhook: &WebhookConfig,
event: &ContentEvent,
) -> Result<(), crate::http_pool::HttpError> {
let json_body = serde_json::to_value(event)
.map_err(|e| crate::http_pool::HttpError::Serialization(e.to_string()))?;
let request = http_pool.post_json(&webhook.url, json_body).await?;
if request.status().is_success() {
Ok(())
} else {
Err(crate::http_pool::HttpError::Response {
status: request.status(),
message: format!("Webhook failed with status {}", request.status()),
})
}
}
impl LifecycleEventManager {
#[must_use]
pub fn new() -> Self {
Self {
handlers: Arc::new(Mutex::new(HashMap::new())),
webhooks: Arc::new(Mutex::new(Vec::new())),
history: Arc::new(Mutex::new(VecDeque::new())),
max_history_size: 1000,
stats: Arc::new(Mutex::new(HashMap::new())),
http_pool: Arc::new(HttpClientPool::new(HttpConfig::default())),
}
}
#[must_use]
#[inline]
pub fn with_history_size(max_history_size: usize) -> Self {
Self {
handlers: Arc::new(Mutex::new(HashMap::new())),
webhooks: Arc::new(Mutex::new(Vec::new())),
history: Arc::new(Mutex::new(VecDeque::new())),
max_history_size,
stats: Arc::new(Mutex::new(HashMap::new())),
http_pool: Arc::new(HttpClientPool::new(HttpConfig::default())),
}
}
pub fn on<F>(&mut self, event_type: LifecycleEventType, handler: F)
where
F: Fn(&ContentEvent) + Send + Sync + 'static,
{
let mut handlers = self.handlers.lock().unwrap();
handlers
.entry(event_type)
.or_default()
.push(Arc::new(handler));
}
pub fn register_webhook(&mut self, config: WebhookConfig) {
let mut webhooks = self.webhooks.lock().unwrap();
webhooks.push(config);
}
pub async fn emit(&self, event: ContentEvent) {
{
let mut stats = self.stats.lock().unwrap();
*stats.entry(event.event_type).or_insert(0) += 1;
}
{
let mut history = self.history.lock().unwrap();
history.push_back(EventHistoryEntry {
event: event.clone(),
timestamp_ms: crate::utils::current_timestamp_ms() as u64,
});
while history.len() > self.max_history_size {
history.pop_front();
}
}
{
let handlers = self.handlers.lock().unwrap();
if let Some(handlers_list) = handlers.get(&event.event_type) {
for handler in handlers_list {
handler(&event);
}
}
}
self.trigger_webhooks(&event).await;
}
async fn trigger_webhooks(&self, event: &ContentEvent) {
let webhooks = self.webhooks.lock().unwrap().clone();
for webhook in webhooks {
if !webhook.events.is_empty() && !webhook.events.contains(&event.event_type) {
continue;
}
let http_pool = Arc::clone(&self.http_pool);
let event_clone = event.clone();
let webhook_clone = webhook.clone();
tokio::spawn(async move {
for attempt in 0..=webhook_clone.max_retries {
match send_webhook_request(&http_pool, &webhook_clone, &event_clone).await {
Ok(_) => {
break;
}
Err(e) => {
eprintln!(
"Webhook delivery failed (attempt {}/{}): {}",
attempt + 1,
webhook_clone.max_retries + 1,
e
);
if attempt < webhook_clone.max_retries {
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}
}
}
}
});
}
}
#[must_use]
#[inline]
pub fn get_history(&self, event_type: Option<LifecycleEventType>) -> Vec<EventHistoryEntry> {
let history = self.history.lock().unwrap();
match event_type {
Some(et) => history
.iter()
.filter(|entry| entry.event.event_type == et)
.cloned()
.collect(),
None => history.iter().cloned().collect(),
}
}
#[must_use]
#[inline]
pub fn get_recent(&self, count: usize) -> Vec<EventHistoryEntry> {
let history = self.history.lock().unwrap();
history.iter().rev().take(count).cloned().collect()
}
#[must_use]
#[inline]
pub fn get_event_count(&self, event_type: LifecycleEventType) -> u64 {
self.stats
.lock()
.unwrap()
.get(&event_type)
.copied()
.unwrap_or(0)
}
#[must_use]
#[inline]
pub fn get_total_event_count(&self) -> u64 {
self.stats.lock().unwrap().values().sum()
}
#[must_use]
#[inline]
pub fn get_stats(&self) -> HashMap<LifecycleEventType, u64> {
self.stats.lock().unwrap().clone()
}
pub fn clear_history(&mut self) {
self.history.lock().unwrap().clear();
}
pub fn reset_stats(&mut self) {
self.stats.lock().unwrap().clear();
}
pub fn clear_handlers(&mut self, event_type: LifecycleEventType) {
self.handlers.lock().unwrap().remove(&event_type);
}
pub fn clear_webhooks(&mut self) {
self.webhooks.lock().unwrap().clear();
}
}
impl Default for LifecycleEventManager {
#[inline]
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::{AtomicU32, Ordering};
#[tokio::test]
async fn test_event_creation() {
let event = ContentEvent::simple("QmTest".to_string(), LifecycleEventType::ContentAdded);
assert_eq!(event.cid, "QmTest");
assert_eq!(event.event_type, LifecycleEventType::ContentAdded);
assert!(event.size_bytes.is_none());
}
#[tokio::test]
async fn test_event_with_size() {
let event =
ContentEvent::with_size("QmTest".to_string(), LifecycleEventType::ContentAdded, 1024);
assert_eq!(event.size_bytes, Some(1024));
}
#[tokio::test]
async fn test_event_with_peer() {
let event = ContentEvent::with_peer(
"QmTest".to_string(),
LifecycleEventType::ChunkTransferred,
"peer123".to_string(),
);
assert_eq!(event.peer_id, Some("peer123".to_string()));
}
#[tokio::test]
async fn test_event_with_metadata() {
let event = ContentEvent::simple("QmTest".to_string(), LifecycleEventType::ContentAdded)
.with_metadata("key1".to_string(), "value1".to_string())
.with_metadata("key2".to_string(), "value2".to_string());
assert!(event.metadata.is_some());
let metadata = event.metadata.unwrap();
assert_eq!(metadata.get("key1"), Some(&"value1".to_string()));
assert_eq!(metadata.get("key2"), Some(&"value2".to_string()));
}
#[tokio::test]
async fn test_handler_registration() {
let mut manager = LifecycleEventManager::new();
let counter = Arc::new(AtomicU32::new(0));
let counter_clone = counter.clone();
manager.on(LifecycleEventType::ContentAdded, move |_event| {
counter_clone.fetch_add(1, Ordering::SeqCst);
});
let event = ContentEvent::simple("QmTest".to_string(), LifecycleEventType::ContentAdded);
manager.emit(event).await;
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
assert_eq!(counter.load(Ordering::SeqCst), 1);
}
#[tokio::test]
async fn test_multiple_handlers() {
let mut manager = LifecycleEventManager::new();
let counter = Arc::new(AtomicU32::new(0));
let counter1 = counter.clone();
manager.on(LifecycleEventType::ContentAdded, move |_event| {
counter1.fetch_add(1, Ordering::SeqCst);
});
let counter2 = counter.clone();
manager.on(LifecycleEventType::ContentAdded, move |_event| {
counter2.fetch_add(1, Ordering::SeqCst);
});
let event = ContentEvent::simple("QmTest".to_string(), LifecycleEventType::ContentAdded);
manager.emit(event).await;
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
assert_eq!(counter.load(Ordering::SeqCst), 2);
}
#[tokio::test]
async fn test_event_history() {
let manager = LifecycleEventManager::new();
let event1 = ContentEvent::simple("QmTest1".to_string(), LifecycleEventType::ContentAdded);
let event2 =
ContentEvent::simple("QmTest2".to_string(), LifecycleEventType::ContentAccessed);
manager.emit(event1).await;
manager.emit(event2).await;
let history = manager.get_history(None);
assert_eq!(history.len(), 2);
}
#[tokio::test]
async fn test_filtered_history() {
let manager = LifecycleEventManager::new();
manager
.emit(ContentEvent::simple(
"Qm1".to_string(),
LifecycleEventType::ContentAdded,
))
.await;
manager
.emit(ContentEvent::simple(
"Qm2".to_string(),
LifecycleEventType::ContentAccessed,
))
.await;
manager
.emit(ContentEvent::simple(
"Qm3".to_string(),
LifecycleEventType::ContentAdded,
))
.await;
let history = manager.get_history(Some(LifecycleEventType::ContentAdded));
assert_eq!(history.len(), 2);
}
#[tokio::test]
async fn test_recent_events() {
let manager = LifecycleEventManager::new();
for i in 0..5 {
manager
.emit(ContentEvent::simple(
format!("Qm{}", i),
LifecycleEventType::ContentAdded,
))
.await;
}
let recent = manager.get_recent(3);
assert_eq!(recent.len(), 3);
}
#[tokio::test]
async fn test_event_statistics() {
let manager = LifecycleEventManager::new();
manager
.emit(ContentEvent::simple(
"Qm1".to_string(),
LifecycleEventType::ContentAdded,
))
.await;
manager
.emit(ContentEvent::simple(
"Qm2".to_string(),
LifecycleEventType::ContentAdded,
))
.await;
manager
.emit(ContentEvent::simple(
"Qm3".to_string(),
LifecycleEventType::ContentAccessed,
))
.await;
assert_eq!(manager.get_event_count(LifecycleEventType::ContentAdded), 2);
assert_eq!(
manager.get_event_count(LifecycleEventType::ContentAccessed),
1
);
assert_eq!(manager.get_total_event_count(), 3);
}
#[tokio::test]
async fn test_history_size_limit() {
let manager = LifecycleEventManager::with_history_size(5);
for i in 0..10 {
manager
.emit(ContentEvent::simple(
format!("Qm{}", i),
LifecycleEventType::ContentAdded,
))
.await;
}
let history = manager.get_history(None);
assert_eq!(history.len(), 5);
}
#[tokio::test]
async fn test_clear_history() {
let mut manager = LifecycleEventManager::new();
manager
.emit(ContentEvent::simple(
"Qm1".to_string(),
LifecycleEventType::ContentAdded,
))
.await;
manager
.emit(ContentEvent::simple(
"Qm2".to_string(),
LifecycleEventType::ContentAccessed,
))
.await;
assert_eq!(manager.get_history(None).len(), 2);
manager.clear_history();
assert_eq!(manager.get_history(None).len(), 0);
}
#[tokio::test]
async fn test_reset_stats() {
let mut manager = LifecycleEventManager::new();
manager
.emit(ContentEvent::simple(
"Qm1".to_string(),
LifecycleEventType::ContentAdded,
))
.await;
assert_eq!(manager.get_total_event_count(), 1);
manager.reset_stats();
assert_eq!(manager.get_total_event_count(), 0);
}
#[tokio::test]
async fn test_webhook_config() {
let webhook = WebhookConfig::new("https://example.com/webhook".to_string())
.for_events(vec![
LifecycleEventType::ContentAdded,
LifecycleEventType::ContentRemoved,
])
.with_auth("Bearer token123".to_string());
assert_eq!(webhook.url, "https://example.com/webhook");
assert_eq!(webhook.events.len(), 2);
assert_eq!(webhook.auth_header, Some("Bearer token123".to_string()));
}
#[tokio::test]
async fn test_webhook_registration() {
let mut manager = LifecycleEventManager::new();
let webhook = WebhookConfig::new("https://example.com/webhook".to_string());
manager.register_webhook(webhook);
manager
.emit(ContentEvent::simple(
"Qm1".to_string(),
LifecycleEventType::ContentAdded,
))
.await;
}
#[tokio::test]
async fn test_clear_handlers() {
let mut manager = LifecycleEventManager::new();
let counter = Arc::new(AtomicU32::new(0));
let counter_clone = counter.clone();
manager.on(LifecycleEventType::ContentAdded, move |_event| {
counter_clone.fetch_add(1, Ordering::SeqCst);
});
manager
.emit(ContentEvent::simple(
"Qm1".to_string(),
LifecycleEventType::ContentAdded,
))
.await;
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
assert_eq!(counter.load(Ordering::SeqCst), 1);
manager.clear_handlers(LifecycleEventType::ContentAdded);
manager
.emit(ContentEvent::simple(
"Qm2".to_string(),
LifecycleEventType::ContentAdded,
))
.await;
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
assert_eq!(counter.load(Ordering::SeqCst), 1); }
}