use crate::priority::JobPriority;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, sync::Arc};
use tokio::sync::{RwLock, broadcast};
use uuid::Uuid;
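/// Serde helper that round-trips a [`Uuid`] through its canonical hyphenated
/// string form, keeping event and job IDs human-readable in serialized output.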
mod uuid_string {
use serde::{Deserialize, Deserializer, Serializer};
use uuid::Uuid;
pub fn serialize<S>(uuid: &Uuid, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_str(&uuid.to_string())
}
pub fn deserialize<'de, D>(deserializer: D) -> Result<Uuid, D::Error>
where
D: Deserializer<'de>,
{
use serde::de::Error;
let s = String::deserialize(deserializer)?;
Uuid::parse_str(&s).map_err(D::Error::custom)
}
}
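/// A single event in a job's lifecycle (enqueued, started, completed, ...),
/// carrying timing, error, and optional payload context alongside
/// caller-supplied metadata.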
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobLifecycleEvent {
#[serde(with = "uuid_string")]
pub event_id: Uuid,
#[serde(with = "uuid_string")]
pub job_id: Uuid,
pub queue_name: String,
pub event_type: JobLifecycleEventType,
pub priority: JobPriority,
pub timestamp: DateTime<Utc>,
pub processing_time_ms: Option<u64>,
pub error: Option<JobError>,
pub payload: Option<serde_json::Value>,
pub metadata: HashMap<String, String>,
}
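/// The kind of lifecycle transition an event describes. Serialized in
/// snake_case, matching the [`std::fmt::Display`] implementation below.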
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum JobLifecycleEventType {
Enqueued,
Started,
Completed,
Failed,
Retried,
Dead,
TimedOut,
Cancelled,
Archived,
Restored,
}
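/// Error details attached to `Failed`, `Dead`, and `TimedOut` events.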
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobError {
pub message: String,
pub error_type: Option<String>,
pub details: Option<String>,
pub retry_attempt: Option<u32>,
}
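/// Criteria for selecting lifecycle events. Empty collections act as
/// wildcards, so a default filter matches every event.
///
/// Filters are built fluently; an illustrative sketch:
///
/// ```ignore
/// let filter = EventFilter::new()
///     .with_event_types(vec![JobLifecycleEventType::Failed])
///     .with_queue_names(vec!["email".to_string()])
///     .include_payload();
/// ```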
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct EventFilter {
pub event_types: Vec<JobLifecycleEventType>,
pub queue_names: Vec<String>,
pub priorities: Vec<JobPriority>,
pub min_processing_time_ms: Option<u64>,
pub max_processing_time_ms: Option<u64>,
pub metadata_filters: HashMap<String, String>,
pub include_payload: bool,
}
impl EventFilter {
pub fn new() -> Self {
Self::default()
}
pub fn with_event_types(mut self, types: Vec<JobLifecycleEventType>) -> Self {
self.event_types = types;
self
}
    /// Alias for [`with_queue_names`](Self::with_queue_names).
    pub fn with_queues(self, queues: Vec<String>) -> Self {
        self.with_queue_names(queues)
    }
pub fn with_queue_names(mut self, queue_names: Vec<String>) -> Self {
self.queue_names = queue_names;
self
}
pub fn with_priorities(mut self, priorities: Vec<JobPriority>) -> Self {
self.priorities = priorities;
self
}
pub fn with_processing_time_range(mut self, min_ms: Option<u64>, max_ms: Option<u64>) -> Self {
self.min_processing_time_ms = min_ms;
self.max_processing_time_ms = max_ms;
self
}
pub fn with_metadata_filter(mut self, key: String, value: String) -> Self {
self.metadata_filters.insert(key, value);
self
}
pub fn include_payload(mut self) -> Self {
self.include_payload = true;
self
}
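    /// Returns `true` if `event` satisfies every configured criterion. Note
    /// that events without a recorded processing time pass the time-range
    /// checks, since there is no duration to compare against.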
pub fn matches(&self, event: &JobLifecycleEvent) -> bool {
if !self.event_types.is_empty() && !self.event_types.contains(&event.event_type) {
return false;
}
if !self.queue_names.is_empty() && !self.queue_names.contains(&event.queue_name) {
return false;
}
if !self.priorities.is_empty() && !self.priorities.contains(&event.priority) {
return false;
}
if let Some(processing_time) = event.processing_time_ms {
if let Some(min_time) = self.min_processing_time_ms {
if processing_time < min_time {
return false;
}
}
if let Some(max_time) = self.max_processing_time_ms {
if processing_time > max_time {
return false;
}
}
}
for (key, expected_value) in &self.metadata_filters {
if let Some(actual_value) = event.metadata.get(key) {
if actual_value != expected_value {
return false;
}
} else {
return false;
}
}
true
}
}
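/// A live subscription handle. The broadcast receiver delivers *all*
/// published events; callers are expected to apply [`EventFilter::matches`]
/// themselves, as the tests below do. Dropping the handle closes the
/// receiver but does not remove the bookkeeping entry; call
/// [`EventManager::unsubscribe`] for that.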
#[derive(Debug)]
pub struct EventSubscription {
pub id: Uuid,
pub filter: EventFilter,
pub receiver: broadcast::Receiver<JobLifecycleEvent>,
}
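/// Publishes job lifecycle events over a tokio broadcast channel and keeps a
/// registry of active subscriptions for stats and bookkeeping.
///
/// A minimal usage sketch (error handling and module paths elided):
///
/// ```ignore
/// let manager = EventManager::new_default();
/// let mut sub = manager.subscribe(EventFilter::new()).await?;
/// manager.publish_event(event).await?;
/// let received = sub.receiver.recv().await?;
/// ```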
pub struct EventManager {
sender: broadcast::Sender<JobLifecycleEvent>,
subscriptions: Arc<RwLock<HashMap<Uuid, EventFilter>>>,
config: EventConfig,
}
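/// Tuning knobs for the event system: broadcast buffer capacity, payload
/// handling, and debug logging.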
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EventConfig {
pub max_buffer_size: usize,
pub include_payload_default: bool,
pub max_payload_size_bytes: usize,
pub log_events: bool,
}
impl Default for EventConfig {
fn default() -> Self {
Self {
max_buffer_size: 10_000,
include_payload_default: false,
            max_payload_size_bytes: 64 * 1024,
            log_events: false,
}
}
}
impl EventManager {
pub fn new(config: EventConfig) -> Self {
let (sender, _) = broadcast::channel(config.max_buffer_size);
Self {
sender,
subscriptions: Arc::new(RwLock::new(HashMap::new())),
config,
}
}
pub fn new_default() -> Self {
Self::new(EventConfig::default())
}
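    /// Publishes an event to all current subscribers. Payloads are stripped
    /// unless `include_payload_default` is set, and oversized payloads are
    /// dropped with a `payload_truncated` note added to the event metadata.
    /// Having no subscribers is not an error.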
pub async fn publish_event(&self, mut event: JobLifecycleEvent) -> crate::Result<()> {
if !self.config.include_payload_default {
event.payload = None;
} else if let Some(ref payload) = event.payload {
let payload_size = serde_json::to_string(payload)?.len();
if payload_size > self.config.max_payload_size_bytes {
event.payload = None;
event.metadata.insert(
"payload_truncated".to_string(),
format!(
"Payload size {} exceeded limit {}",
payload_size, self.config.max_payload_size_bytes
),
);
}
}
if self.config.log_events {
tracing::debug!(
"Publishing job lifecycle event: {} for job {} in queue {}",
event.event_type,
event.job_id,
event.queue_name
);
}
match self.sender.send(event) {
Ok(subscriber_count) => {
if self.config.log_events && subscriber_count > 0 {
tracing::trace!("Event delivered to {} subscribers", subscriber_count);
}
Ok(())
}
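            // A send error from a broadcast channel only means there are no
            // active receivers, which is fine: events are fire-and-forget.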
Err(broadcast::error::SendError(_)) => {
Ok(())
}
}
}
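    /// Registers a new subscription. Every subscriber receives every event;
    /// the stored filter is used for bookkeeping and client-side matching.
    /// Slow consumers can lag and miss events once the broadcast buffer
    /// (`max_buffer_size`) wraps around.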
pub async fn subscribe(&self, filter: EventFilter) -> crate::Result<EventSubscription> {
let subscription_id = Uuid::new_v4();
let receiver = self.sender.subscribe();
{
let mut subscriptions = self.subscriptions.write().await;
subscriptions.insert(subscription_id, filter.clone());
}
Ok(EventSubscription {
id: subscription_id,
filter,
receiver,
})
}
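    /// Removes the subscription's bookkeeping entry. The corresponding
    /// receiver keeps working until its `EventSubscription` is dropped.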
pub async fn unsubscribe(&self, subscription_id: Uuid) -> crate::Result<()> {
let mut subscriptions = self.subscriptions.write().await;
subscriptions.remove(&subscription_id);
Ok(())
}
pub async fn subscription_count(&self) -> usize {
let subscriptions = self.subscriptions.read().await;
subscriptions.len()
}
pub async fn get_stats(&self) -> EventManagerStats {
let subscriptions = self.subscriptions.read().await;
EventManagerStats {
active_subscriptions: subscriptions.len(),
buffer_capacity: self.config.max_buffer_size,
buffer_current_size: self.sender.len(),
}
}
}
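/// A point-in-time snapshot of event system activity.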
#[derive(Debug, Serialize, Deserialize)]
pub struct EventManagerStats {
pub active_subscriptions: usize,
pub buffer_capacity: usize,
pub buffer_current_size: usize,
}
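/// Convenience constructors for common lifecycle events, implemented on
/// [`JobLifecycleEvent`] itself. Each constructor stamps a fresh event ID
/// and the current UTC timestamp.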
pub trait JobLifecycleEventBuilder {
fn enqueued(job_id: Uuid, queue_name: String, priority: JobPriority) -> JobLifecycleEvent;
fn started(job_id: Uuid, queue_name: String, priority: JobPriority) -> JobLifecycleEvent;
fn completed(
job_id: Uuid,
queue_name: String,
priority: JobPriority,
processing_time_ms: u64,
) -> JobLifecycleEvent;
fn failed(
job_id: Uuid,
queue_name: String,
priority: JobPriority,
error: JobError,
) -> JobLifecycleEvent;
fn dead(
job_id: Uuid,
queue_name: String,
priority: JobPriority,
final_error: JobError,
) -> JobLifecycleEvent;
fn timed_out(
job_id: Uuid,
queue_name: String,
priority: JobPriority,
timeout_duration_ms: u64,
) -> JobLifecycleEvent;
}
impl JobLifecycleEventBuilder for JobLifecycleEvent {
fn enqueued(job_id: Uuid, queue_name: String, priority: JobPriority) -> JobLifecycleEvent {
JobLifecycleEvent {
event_id: Uuid::new_v4(),
job_id,
queue_name,
event_type: JobLifecycleEventType::Enqueued,
priority,
timestamp: Utc::now(),
processing_time_ms: None,
error: None,
payload: None,
metadata: HashMap::new(),
}
}
fn started(job_id: Uuid, queue_name: String, priority: JobPriority) -> JobLifecycleEvent {
JobLifecycleEvent {
event_id: Uuid::new_v4(),
job_id,
queue_name,
event_type: JobLifecycleEventType::Started,
priority,
timestamp: Utc::now(),
processing_time_ms: None,
error: None,
payload: None,
metadata: HashMap::new(),
}
}
fn completed(
job_id: Uuid,
queue_name: String,
priority: JobPriority,
processing_time_ms: u64,
) -> JobLifecycleEvent {
JobLifecycleEvent {
event_id: Uuid::new_v4(),
job_id,
queue_name,
event_type: JobLifecycleEventType::Completed,
priority,
timestamp: Utc::now(),
processing_time_ms: Some(processing_time_ms),
error: None,
payload: None,
metadata: HashMap::new(),
}
}
fn failed(
job_id: Uuid,
queue_name: String,
priority: JobPriority,
error: JobError,
) -> JobLifecycleEvent {
JobLifecycleEvent {
event_id: Uuid::new_v4(),
job_id,
queue_name,
event_type: JobLifecycleEventType::Failed,
priority,
timestamp: Utc::now(),
processing_time_ms: None,
error: Some(error),
payload: None,
metadata: HashMap::new(),
}
}
fn dead(
job_id: Uuid,
queue_name: String,
priority: JobPriority,
final_error: JobError,
) -> JobLifecycleEvent {
JobLifecycleEvent {
event_id: Uuid::new_v4(),
job_id,
queue_name,
event_type: JobLifecycleEventType::Dead,
priority,
timestamp: Utc::now(),
processing_time_ms: None,
error: Some(final_error),
payload: None,
metadata: HashMap::new(),
}
}
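    // Timed-out events record the timeout duration both as the processing
    // time and in metadata, and synthesize a "timeout" JobError.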
fn timed_out(
job_id: Uuid,
queue_name: String,
priority: JobPriority,
timeout_duration_ms: u64,
) -> JobLifecycleEvent {
let mut metadata = HashMap::new();
metadata.insert(
"timeout_duration_ms".to_string(),
timeout_duration_ms.to_string(),
);
JobLifecycleEvent {
event_id: Uuid::new_v4(),
job_id,
queue_name,
event_type: JobLifecycleEventType::TimedOut,
priority,
timestamp: Utc::now(),
processing_time_ms: Some(timeout_duration_ms),
error: Some(JobError {
message: format!("Job timed out after {}ms", timeout_duration_ms),
error_type: Some("timeout".to_string()),
details: None,
retry_attempt: None,
}),
payload: None,
metadata,
}
}
}
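// Display mirrors the serde snake_case representation, so logs and
// serialized events use the same names.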
impl std::fmt::Display for JobLifecycleEventType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
JobLifecycleEventType::Enqueued => write!(f, "enqueued"),
JobLifecycleEventType::Started => write!(f, "started"),
JobLifecycleEventType::Completed => write!(f, "completed"),
JobLifecycleEventType::Failed => write!(f, "failed"),
JobLifecycleEventType::Retried => write!(f, "retried"),
JobLifecycleEventType::Dead => write!(f, "dead"),
JobLifecycleEventType::TimedOut => write!(f, "timed_out"),
JobLifecycleEventType::Cancelled => write!(f, "cancelled"),
JobLifecycleEventType::Archived => write!(f, "archived"),
JobLifecycleEventType::Restored => write!(f, "restored"),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::time::Duration;
use tokio::time::timeout;
#[test]
fn test_event_filter_creation() {
let filter = EventFilter::new()
.with_event_types(vec![
JobLifecycleEventType::Completed,
JobLifecycleEventType::Failed,
])
.with_queues(vec!["email".to_string(), "notifications".to_string()])
.with_priorities(vec![JobPriority::High, JobPriority::Critical])
.with_processing_time_range(Some(1000), Some(5000))
.with_metadata_filter("source".to_string(), "api".to_string())
.include_payload();
assert_eq!(filter.event_types.len(), 2);
assert_eq!(filter.queue_names.len(), 2);
assert_eq!(filter.priorities.len(), 2);
assert_eq!(filter.min_processing_time_ms, Some(1000));
assert_eq!(filter.max_processing_time_ms, Some(5000));
assert!(filter.include_payload);
assert_eq!(
filter.metadata_filters.get("source"),
Some(&"api".to_string())
);
}
#[test]
fn test_event_filter_matching() {
let filter = EventFilter::new()
.with_event_types(vec![JobLifecycleEventType::Completed])
.with_queues(vec!["email".to_string()]);
let matching_event = JobLifecycleEvent {
event_id: Uuid::new_v4(),
job_id: Uuid::new_v4(),
queue_name: "email".to_string(),
event_type: JobLifecycleEventType::Completed,
priority: JobPriority::Normal,
timestamp: Utc::now(),
processing_time_ms: Some(1000),
error: None,
payload: None,
metadata: HashMap::new(),
};
let non_matching_event = JobLifecycleEvent {
event_id: Uuid::new_v4(),
job_id: Uuid::new_v4(),
queue_name: "notifications".to_string(),
event_type: JobLifecycleEventType::Failed,
priority: JobPriority::Normal,
timestamp: Utc::now(),
processing_time_ms: None,
error: None,
payload: None,
metadata: HashMap::new(),
};
assert!(filter.matches(&matching_event));
assert!(!filter.matches(&non_matching_event));
}
#[test]
fn test_job_lifecycle_event_builder() {
let job_id = Uuid::new_v4();
let queue_name = "test_queue".to_string();
let priority = JobPriority::High;
let enqueued = JobLifecycleEvent::enqueued(job_id, queue_name.clone(), priority);
assert_eq!(enqueued.event_type, JobLifecycleEventType::Enqueued);
assert_eq!(enqueued.job_id, job_id);
assert_eq!(enqueued.queue_name, queue_name);
assert_eq!(enqueued.priority, priority);
let completed = JobLifecycleEvent::completed(job_id, queue_name.clone(), priority, 1500);
assert_eq!(completed.event_type, JobLifecycleEventType::Completed);
assert_eq!(completed.processing_time_ms, Some(1500));
let error = JobError {
message: "Test error".to_string(),
error_type: Some("validation".to_string()),
details: None,
retry_attempt: Some(1),
};
let failed = JobLifecycleEvent::failed(job_id, queue_name.clone(), priority, error);
assert_eq!(failed.event_type, JobLifecycleEventType::Failed);
assert!(failed.error.is_some());
}
#[tokio::test]
async fn test_event_manager_publish_subscribe() {
let manager = EventManager::new_default();
let filter = EventFilter::new().with_event_types(vec![JobLifecycleEventType::Completed]);
let mut subscription = manager.subscribe(filter).await.unwrap();
let event = JobLifecycleEvent::completed(
Uuid::new_v4(),
"test_queue".to_string(),
JobPriority::Normal,
1000,
);
manager.publish_event(event.clone()).await.unwrap();
let received_event = timeout(Duration::from_millis(100), subscription.receiver.recv())
.await
.unwrap()
.unwrap();
assert_eq!(received_event.event_type, JobLifecycleEventType::Completed);
assert_eq!(received_event.job_id, event.job_id);
}
#[tokio::test]
async fn test_event_manager_filtered_subscription() {
let manager = EventManager::new_default();
let filter = EventFilter::new().with_event_types(vec![JobLifecycleEventType::Completed]);
let mut subscription = manager.subscribe(filter).await.unwrap();
let failed_event = JobLifecycleEvent::failed(
Uuid::new_v4(),
"test_queue".to_string(),
JobPriority::Normal,
JobError {
message: "Test error".to_string(),
error_type: None,
details: None,
retry_attempt: None,
},
);
manager.publish_event(failed_event).await.unwrap();
let completed_event = JobLifecycleEvent::completed(
Uuid::new_v4(),
"test_queue".to_string(),
JobPriority::Normal,
1000,
);
manager
.publish_event(completed_event.clone())
.await
.unwrap();
let mut received_event = None;
for _ in 0..10 {
match timeout(Duration::from_millis(100), subscription.receiver.recv()).await {
Ok(Ok(event)) => {
if subscription.filter.matches(&event) {
received_event = Some(event);
break;
}
}
_ => break,
}
}
let received_event = received_event.expect("Should have received a matching event");
assert_eq!(received_event.event_type, JobLifecycleEventType::Completed);
assert_eq!(received_event.job_id, completed_event.job_id);
assert!(
timeout(Duration::from_millis(50), subscription.receiver.recv())
.await
.is_err()
);
}
#[tokio::test]
async fn test_event_manager_stats() {
let manager = EventManager::new_default();
let initial_stats = manager.get_stats().await;
assert_eq!(initial_stats.active_subscriptions, 0);
let filter = EventFilter::new();
let _subscription = manager.subscribe(filter).await.unwrap();
let stats_with_subscription = manager.get_stats().await;
assert_eq!(stats_with_subscription.active_subscriptions, 1);
}
#[tokio::test]
async fn test_event_manager_unsubscribe() {
let manager = EventManager::new_default();
let filter = EventFilter::new();
let subscription = manager.subscribe(filter).await.unwrap();
let subscription_id = subscription.id;
assert_eq!(manager.subscription_count().await, 1);
manager.unsubscribe(subscription_id).await.unwrap();
assert_eq!(manager.subscription_count().await, 0);
}
#[test]
fn test_event_type_display() {
assert_eq!(JobLifecycleEventType::Enqueued.to_string(), "enqueued");
assert_eq!(JobLifecycleEventType::Started.to_string(), "started");
assert_eq!(JobLifecycleEventType::Completed.to_string(), "completed");
assert_eq!(JobLifecycleEventType::Failed.to_string(), "failed");
assert_eq!(JobLifecycleEventType::Dead.to_string(), "dead");
assert_eq!(JobLifecycleEventType::TimedOut.to_string(), "timed_out");
}
#[test]
fn test_processing_time_filter() {
let filter = EventFilter::new().with_processing_time_range(Some(1000), Some(3000));
let fast_event = JobLifecycleEvent::completed(
Uuid::new_v4(),
"test".to_string(),
JobPriority::Normal,
500,
);
let medium_event = JobLifecycleEvent::completed(
Uuid::new_v4(),
"test".to_string(),
JobPriority::Normal,
2000,
);
let slow_event = JobLifecycleEvent::completed(
Uuid::new_v4(),
"test".to_string(),
JobPriority::Normal,
5000,
);
assert!(!filter.matches(&fast_event));
assert!(filter.matches(&medium_event));
assert!(!filter.matches(&slow_event));
}
#[test]
fn test_metadata_filter() {
let filter = EventFilter::new()
.with_metadata_filter("source".to_string(), "api".to_string())
.with_metadata_filter("version".to_string(), "v1".to_string());
let mut matching_metadata = HashMap::new();
matching_metadata.insert("source".to_string(), "api".to_string());
matching_metadata.insert("version".to_string(), "v1".to_string());
let mut partial_metadata = HashMap::new();
partial_metadata.insert("source".to_string(), "api".to_string());
let matching_event = JobLifecycleEvent {
event_id: Uuid::new_v4(),
job_id: Uuid::new_v4(),
queue_name: "test".to_string(),
event_type: JobLifecycleEventType::Completed,
priority: JobPriority::Normal,
timestamp: Utc::now(),
processing_time_ms: None,
error: None,
payload: None,
metadata: matching_metadata,
};
let partial_event = JobLifecycleEvent {
event_id: Uuid::new_v4(),
job_id: Uuid::new_v4(),
queue_name: "test".to_string(),
event_type: JobLifecycleEventType::Completed,
priority: JobPriority::Normal,
timestamp: Utc::now(),
processing_time_ms: None,
error: None,
payload: None,
metadata: partial_metadata,
};
assert!(filter.matches(&matching_event));
assert!(!filter.matches(&partial_event));
}
#[tokio::test]
async fn test_concurrent_event_publishing() {
let manager = Arc::new(EventManager::new_default());
let filter = EventFilter::new();
let mut subscription = manager.subscribe(filter).await.unwrap();
let num_tasks = 10;
let events_per_task = 5;
let mut handles = Vec::new();
for task_id in 0..num_tasks {
let manager_clone = manager.clone();
let handle = tokio::spawn(async move {
for event_id in 0..events_per_task {
let mut event = JobLifecycleEvent::completed(
Uuid::new_v4(),
format!("queue_{}", task_id),
JobPriority::Normal,
1000,
);
event
.metadata
.insert("task_id".to_string(), task_id.to_string());
event
.metadata
.insert("event_id".to_string(), event_id.to_string());
manager_clone.publish_event(event).await.unwrap();
}
});
handles.push(handle);
}
for handle in handles {
handle.await.unwrap();
}
let mut received_events = Vec::new();
for _ in 0..(num_tasks * events_per_task) {
let event = timeout(Duration::from_millis(100), subscription.receiver.recv())
.await
.unwrap()
.unwrap();
received_events.push(event);
}
assert_eq!(received_events.len(), num_tasks * events_per_task);
let mut task_counts = std::collections::HashMap::new();
for event in received_events {
let task_id = event
.metadata
.get("task_id")
.unwrap()
.parse::<usize>()
.unwrap();
*task_counts.entry(task_id).or_insert(0) += 1;
}
assert_eq!(task_counts.len(), num_tasks);
for count in task_counts.values() {
assert_eq!(*count, events_per_task);
}
}
#[tokio::test]
async fn test_event_manager_buffer_overflow() {
let config = EventConfig {
            max_buffer_size: 2,
            ..Default::default()
};
let manager = EventManager::new(config);
        for _ in 0..5 {
let event = JobLifecycleEvent::completed(
Uuid::new_v4(),
"test".to_string(),
JobPriority::Normal,
1000,
);
manager.publish_event(event).await.unwrap();
}
let stats = manager.get_stats().await;
assert_eq!(stats.buffer_capacity, 2);
assert!(stats.buffer_current_size <= stats.buffer_capacity);
}
#[test]
fn test_job_error_serialization_roundtrip() {
let original_error = JobError {
message: "Database connection failed".to_string(),
error_type: Some("DatabaseError".to_string()),
details: Some(
serde_json::to_string(&serde_json::json!({
"host": "localhost",
"port": 5432,
"database": "hammerwork"
}))
.unwrap(),
),
retry_attempt: Some(3),
};
let serialized = serde_json::to_string(&original_error).unwrap();
let deserialized: JobError = serde_json::from_str(&serialized).unwrap();
assert_eq!(deserialized.message, original_error.message);
assert_eq!(deserialized.error_type, original_error.error_type);
assert_eq!(deserialized.retry_attempt, original_error.retry_attempt);
let details_json: serde_json::Value =
serde_json::from_str(deserialized.details.as_ref().unwrap()).unwrap();
assert_eq!(details_json["host"], "localhost");
}
#[test]
fn test_event_filter_empty_means_all() {
let empty_filter = EventFilter::new();
let events = vec![
JobLifecycleEvent::enqueued(Uuid::new_v4(), "queue1".to_string(), JobPriority::Low),
JobLifecycleEvent::started(Uuid::new_v4(), "queue2".to_string(), JobPriority::High),
JobLifecycleEvent::completed(
Uuid::new_v4(),
"queue3".to_string(),
JobPriority::Critical,
2000,
),
];
for event in events {
assert!(
empty_filter.matches(&event),
"Empty filter should match all events"
);
}
}
#[test]
fn test_event_config_serialization() {
let config = EventConfig {
max_buffer_size: 2000,
include_payload_default: true,
            max_payload_size_bytes: 128 * 1024,
            log_events: true,
};
let json_str = serde_json::to_string(&config).unwrap();
let deserialized: EventConfig = serde_json::from_str(&json_str).unwrap();
assert_eq!(deserialized.max_buffer_size, 2000);
assert!(deserialized.include_payload_default);
assert_eq!(deserialized.max_payload_size_bytes, 128 * 1024);
assert!(deserialized.log_events);
}
}