use async_trait::async_trait;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::VecDeque;
use std::net::IpAddr;
use std::sync::Mutex;
use uuid::Uuid;
use super::events::WebhookEvent;
use crate::error::Result;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum ProcessingStatus {
Received,
Processing,
Processed,
Failed,
}
impl std::fmt::Display for ProcessingStatus {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Received => write!(f, "received"),
Self::Processing => write!(f, "processing"),
Self::Processed => write!(f, "processed"),
Self::Failed => write!(f, "failed"),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WebhookLogEntry {
pub id: Uuid,
pub received_at: DateTime<Utc>,
pub source_ip: IpAddr,
pub event_type: String,
pub resource_type: String,
pub resource_id: String,
pub payload: serde_json::Value,
pub processing_status: ProcessingStatus,
pub error_message: Option<String>,
pub processed_at: Option<DateTime<Utc>>,
}
impl WebhookLogEntry {
pub fn from_event(event: &WebhookEvent) -> Self {
Self {
id: Uuid::new_v4(),
received_at: event.received_at,
source_ip: event.source_ip,
event_type: event.event_type.clone(),
resource_type: event.resource_type.clone(),
resource_id: event.resource_id.clone(),
payload: event.data.clone(),
processing_status: ProcessingStatus::Received,
error_message: None,
processed_at: None,
}
}
pub fn mark_processing(&mut self) {
self.processing_status = ProcessingStatus::Processing;
}
pub fn mark_processed(&mut self) {
self.processing_status = ProcessingStatus::Processed;
self.processed_at = Some(Utc::now());
}
pub fn mark_failed(&mut self, error: impl Into<String>) {
self.processing_status = ProcessingStatus::Failed;
self.error_message = Some(error.into());
self.processed_at = Some(Utc::now());
}
}
#[derive(Debug, Default, Clone)]
pub struct WebhookLogFilter {
pub event_type: Option<String>,
pub resource_id: Option<String>,
pub status: Option<ProcessingStatus>,
pub received_after: Option<DateTime<Utc>>,
pub received_before: Option<DateTime<Utc>>,
pub limit: Option<usize>,
}
impl WebhookLogFilter {
pub fn new() -> Self {
Self::default()
}
pub fn with_event_type(mut self, event_type: impl Into<String>) -> Self {
self.event_type = Some(event_type.into());
self
}
pub fn with_resource_id(mut self, resource_id: impl Into<String>) -> Self {
self.resource_id = Some(resource_id.into());
self
}
pub fn with_status(mut self, status: ProcessingStatus) -> Self {
self.status = Some(status);
self
}
pub fn with_received_range(
mut self,
after: Option<DateTime<Utc>>,
before: Option<DateTime<Utc>>,
) -> Self {
self.received_after = after;
self.received_before = before;
self
}
pub fn with_limit(mut self, limit: usize) -> Self {
self.limit = Some(limit);
self
}
}
#[async_trait]
pub trait WebhookLogger: Send + Sync {
async fn log_received(&self, event: &WebhookEvent) -> Result<Uuid>;
async fn update_status(&self, id: Uuid, status: ProcessingStatus, error: Option<String>)
-> Result<()>;
async fn query(&self, filter: WebhookLogFilter) -> Result<Vec<WebhookLogEntry>>;
async fn get(&self, id: Uuid) -> Result<Option<WebhookLogEntry>>;
}
pub struct InMemoryWebhookLogger {
entries: Mutex<VecDeque<WebhookLogEntry>>,
max_entries: usize,
}
impl InMemoryWebhookLogger {
pub fn new() -> Self {
Self::with_capacity(10000)
}
pub fn with_capacity(max_entries: usize) -> Self {
Self {
entries: Mutex::new(VecDeque::with_capacity(max_entries.min(1000))),
max_entries,
}
}
pub fn len(&self) -> usize {
self.entries.lock().unwrap().len()
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
pub fn clear(&self) {
self.entries.lock().unwrap().clear();
}
}
impl Default for InMemoryWebhookLogger {
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl WebhookLogger for InMemoryWebhookLogger {
async fn log_received(&self, event: &WebhookEvent) -> Result<Uuid> {
let entry = WebhookLogEntry::from_event(event);
let id = entry.id;
let mut entries = self.entries.lock().unwrap();
while entries.len() >= self.max_entries {
entries.pop_front();
}
entries.push_back(entry);
Ok(id)
}
async fn update_status(
&self,
id: Uuid,
status: ProcessingStatus,
error: Option<String>,
) -> Result<()> {
let mut entries = self.entries.lock().unwrap();
if let Some(entry) = entries.iter_mut().find(|e| e.id == id) {
entry.processing_status = status;
entry.error_message = error;
if matches!(status, ProcessingStatus::Processed | ProcessingStatus::Failed) {
entry.processed_at = Some(Utc::now());
}
}
Ok(())
}
async fn query(&self, filter: WebhookLogFilter) -> Result<Vec<WebhookLogEntry>> {
let entries = self.entries.lock().unwrap();
let mut results: Vec<_> = entries
.iter()
.filter(|e| {
if let Some(ref event_type) = filter.event_type {
if &e.event_type != event_type {
return false;
}
}
if let Some(ref resource_id) = filter.resource_id {
if &e.resource_id != resource_id {
return false;
}
}
if let Some(status) = filter.status {
if e.processing_status != status {
return false;
}
}
if let Some(after) = filter.received_after {
if e.received_at < after {
return false;
}
}
if let Some(before) = filter.received_before {
if e.received_at > before {
return false;
}
}
true
})
.cloned()
.collect();
results.sort_by(|a, b| b.received_at.cmp(&a.received_at));
if let Some(limit) = filter.limit {
results.truncate(limit);
}
Ok(results)
}
async fn get(&self, id: Uuid) -> Result<Option<WebhookLogEntry>> {
let entries = self.entries.lock().unwrap();
Ok(entries.iter().find(|e| e.id == id).cloned())
}
}
pub struct StdoutWebhookLogger {
pub include_payload: bool,
}
impl StdoutWebhookLogger {
pub fn new() -> Self {
Self {
include_payload: false,
}
}
pub fn with_payload() -> Self {
Self {
include_payload: true,
}
}
}
impl Default for StdoutWebhookLogger {
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl WebhookLogger for StdoutWebhookLogger {
async fn log_received(&self, event: &WebhookEvent) -> Result<Uuid> {
let id = Uuid::new_v4();
if self.include_payload {
println!(
"[{}] Webhook received: {} {} {} from {} - {:?}",
event.received_at,
event.event_type,
event.resource_type,
event.resource_id,
event.source_ip,
event.data
);
} else {
println!(
"[{}] Webhook received: {} {} {} from {}",
event.received_at,
event.event_type,
event.resource_type,
event.resource_id,
event.source_ip
);
}
Ok(id)
}
async fn update_status(
&self,
id: Uuid,
status: ProcessingStatus,
error: Option<String>,
) -> Result<()> {
if let Some(err) = error {
println!("[{}] Status update: {} -> {} (error: {})", Utc::now(), id, status, err);
} else {
println!("[{}] Status update: {} -> {}", Utc::now(), id, status);
}
Ok(())
}
async fn query(&self, _filter: WebhookLogFilter) -> Result<Vec<WebhookLogEntry>> {
Ok(Vec::new())
}
async fn get(&self, _id: Uuid) -> Result<Option<WebhookLogEntry>> {
Ok(None)
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::net::Ipv4Addr;
fn create_test_event() -> WebhookEvent {
WebhookEvent::new(
"chargeback.created",
"chargebacks",
"t1_chb_123",
serde_json::json!({"id": "t1_chb_123"}),
IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
)
}
#[tokio::test]
async fn test_in_memory_logger_basic() {
let logger = InMemoryWebhookLogger::new();
let event = create_test_event();
let id = logger.log_received(&event).await.unwrap();
assert_eq!(logger.len(), 1);
let entry = logger.get(id).await.unwrap().unwrap();
assert_eq!(entry.event_type, "chargeback.created");
assert_eq!(entry.processing_status, ProcessingStatus::Received);
}
#[tokio::test]
async fn test_in_memory_logger_update_status() {
let logger = InMemoryWebhookLogger::new();
let event = create_test_event();
let id = logger.log_received(&event).await.unwrap();
logger
.update_status(id, ProcessingStatus::Processed, None)
.await
.unwrap();
let entry = logger.get(id).await.unwrap().unwrap();
assert_eq!(entry.processing_status, ProcessingStatus::Processed);
assert!(entry.processed_at.is_some());
}
#[tokio::test]
async fn test_in_memory_logger_query() {
let logger = InMemoryWebhookLogger::new();
for i in 0..5 {
let event = WebhookEvent::new(
format!("event.{}", i),
"test",
format!("id_{}", i),
serde_json::json!({}),
IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
);
logger.log_received(&event).await.unwrap();
}
let all = logger.query(WebhookLogFilter::new()).await.unwrap();
assert_eq!(all.len(), 5);
let filtered = logger
.query(WebhookLogFilter::new().with_event_type("event.2"))
.await
.unwrap();
assert_eq!(filtered.len(), 1);
let limited = logger
.query(WebhookLogFilter::new().with_limit(2))
.await
.unwrap();
assert_eq!(limited.len(), 2);
}
#[tokio::test]
async fn test_in_memory_logger_capacity() {
let logger = InMemoryWebhookLogger::with_capacity(3);
for i in 0..5 {
let event = WebhookEvent::new(
format!("event.{}", i),
"test",
format!("id_{}", i),
serde_json::json!({}),
IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
);
logger.log_received(&event).await.unwrap();
}
assert_eq!(logger.len(), 3);
let entries = logger.query(WebhookLogFilter::new()).await.unwrap();
assert!(entries.iter().any(|e| e.event_type == "event.4"));
assert!(entries.iter().any(|e| e.event_type == "event.3"));
assert!(entries.iter().any(|e| e.event_type == "event.2"));
}
#[test]
fn test_log_entry_from_event() {
let event = create_test_event();
let entry = WebhookLogEntry::from_event(&event);
assert_eq!(entry.event_type, "chargeback.created");
assert_eq!(entry.resource_id, "t1_chb_123");
assert_eq!(entry.processing_status, ProcessingStatus::Received);
assert!(entry.error_message.is_none());
}
#[test]
fn test_log_entry_status_updates() {
let event = create_test_event();
let mut entry = WebhookLogEntry::from_event(&event);
entry.mark_processing();
assert_eq!(entry.processing_status, ProcessingStatus::Processing);
entry.mark_processed();
assert_eq!(entry.processing_status, ProcessingStatus::Processed);
assert!(entry.processed_at.is_some());
}
#[test]
fn test_log_entry_mark_failed() {
let event = create_test_event();
let mut entry = WebhookLogEntry::from_event(&event);
entry.mark_failed("Test error");
assert_eq!(entry.processing_status, ProcessingStatus::Failed);
assert_eq!(entry.error_message, Some("Test error".to_string()));
assert!(entry.processed_at.is_some());
}
#[test]
fn test_filter_builder() {
let filter = WebhookLogFilter::new()
.with_event_type("chargeback.created")
.with_status(ProcessingStatus::Failed)
.with_limit(10);
assert_eq!(filter.event_type, Some("chargeback.created".to_string()));
assert_eq!(filter.status, Some(ProcessingStatus::Failed));
assert_eq!(filter.limit, Some(10));
}
}