use std::collections::{HashMap, VecDeque};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
use crate::types::{JobId, JobState};
/// A job state change that can be published to notification subscribers.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct JobEvent {
/// Identifier of the job the event refers to; first half of the dedup key.
pub job_id: JobId,
/// Job name; substituted for `{job_name}` when an email subject is rendered.
pub job_name: String,
/// State reported by the event; filters and the dedup key are derived from it.
pub new_state: JobState,
/// Event timestamp; `JobEvent::new` fills it with whole seconds since the Unix epoch.
pub occurred_at: u64,
/// Optional free-form text attached to the event (`None` until `with_message` is used).
pub message: Option<String>,
/// Arbitrary string key/value pairs attached to the event.
pub metadata: HashMap<String, String>,
}
impl JobEvent {
#[must_use]
pub fn new(job_id: JobId, job_name: impl Into<String>, new_state: JobState) -> Self {
let occurred_at = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or(Duration::ZERO)
.as_secs();
Self {
job_id,
job_name: job_name.into(),
new_state,
occurred_at,
message: None,
metadata: HashMap::new(),
}
}
#[must_use]
pub fn with_message(mut self, msg: impl Into<String>) -> Self {
self.message = Some(msg.into());
self
}
#[must_use]
pub fn with_meta(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
self.metadata.insert(key.into(), value.into());
self
}
#[must_use]
pub fn dedup_key(&self) -> String {
format!("{}:{}", self.job_id, state_tag(self.new_state))
}
}
/// Maps a job state to its fixed lowercase tag.
///
/// The tag is used in dedup keys and in rendered email subjects. The match is
/// exhaustive on purpose so that adding a `JobState` variant fails to compile
/// until a tag is chosen for it.
fn state_tag(state: JobState) -> &'static str {
    match state {
        // Not yet running.
        JobState::Pending => "pending",
        JobState::Queued => "queued",
        // In flight.
        JobState::Running => "running",
        // Finished, one way or another.
        JobState::Completed => "completed",
        JobState::Failed => "failed",
        JobState::Cancelled => "cancelled",
    }
}
/// Selects which job events a subscription receives.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum EventFilter {
/// Matches every event.
All,
/// Matches events whose new state is one of the listed states.
States(Vec<JobState>),
/// Matches events whose new state is `Completed`, `Failed` or `Cancelled`.
TerminalOnly,
/// Matches events whose new state is `Failed`.
FailureOnly,
}
impl EventFilter {
    /// Reports whether `event` passes this filter.
    ///
    /// Only `event.new_state` is inspected; the job id, name, message and
    /// metadata never influence the result.
    #[must_use]
    pub fn matches(&self, event: &JobEvent) -> bool {
        let state = event.new_state;
        match self {
            Self::All => true,
            Self::States(wanted) => wanted.iter().any(|candidate| *candidate == state),
            // Spelled out per state so a new `JobState` variant forces a decision here.
            Self::TerminalOnly => match state {
                JobState::Completed | JobState::Failed | JobState::Cancelled => true,
                JobState::Queued | JobState::Running | JobState::Pending => false,
            },
            Self::FailureOnly => matches!(state, JobState::Failed),
        }
    }
}
/// Destination of a notification; passed to `Dispatcher::dispatch` together with the event.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum NotificationTarget {
/// Deliver to the HTTP endpoint described by the contained spec.
Webhook(WebhookSpec),
/// Deliver by email as described by the contained spec.
Email(EmailSpec),
/// Deliver inside the current process.
InProcess {
/// Name of the in-process channel (interpretation is left to the dispatcher).
channel: String,
},
}
/// Configuration of a webhook notification target.
///
/// This file only stores these settings; the code that performs the HTTP
/// request is not part of it.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct WebhookSpec {
/// Endpoint URL, stored exactly as given to `WebhookSpec::new`.
pub url: String,
/// Extra request headers by name; `with_bearer` stores `Authorization` here.
pub headers: HashMap<String, String>,
/// Optional secret set by `with_secret`.
/// NOTE(review): presumably used to sign payloads — confirm against the dispatcher.
pub secret: Option<String>,
/// Retry budget; `WebhookSpec::new` sets it to 3.
pub max_retries: u32,
/// Timeout in milliseconds; `WebhookSpec::new` sets it to 5000.
pub timeout_ms: u64,
}
impl WebhookSpec {
    /// Creates a spec for `url` with no headers, no secret, 3 retries and a
    /// 5000 ms timeout.
    #[must_use]
    pub fn new(url: impl Into<String>) -> Self {
        const DEFAULT_MAX_RETRIES: u32 = 3;
        const DEFAULT_TIMEOUT_MS: u64 = 5_000;

        Self {
            url: url.into(),
            headers: HashMap::new(),
            secret: None,
            max_retries: DEFAULT_MAX_RETRIES,
            timeout_ms: DEFAULT_TIMEOUT_MS,
        }
    }

    /// Returns the spec with an `Authorization: Bearer <token>` header.
    ///
    /// An `Authorization` header that is already present is replaced.
    #[must_use]
    pub fn with_bearer(self, token: impl Into<String>) -> Self {
        let mut spec = self;
        let header_value = format!("Bearer {}", token.into());
        spec.headers
            .insert(String::from("Authorization"), header_value);
        spec
    }

    /// Returns the spec with its secret set to `secret`.
    #[must_use]
    pub fn with_secret(self, secret: impl Into<String>) -> Self {
        Self {
            secret: Some(secret.into()),
            ..self
        }
    }
}
/// Configuration of an email notification target.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct EmailSpec {
/// Sender address.
pub from: String,
/// Primary recipients; `EmailSpec::new` starts with exactly one.
pub to: Vec<String>,
/// Carbon-copy recipients; empty after `EmailSpec::new`.
pub cc: Vec<String>,
/// Subject template; `{job_name}` and `{state}` are substituted by `render_subject`.
pub subject_template: String,
/// Set to `false` by `EmailSpec::new`.
/// NOTE(review): presumably controls whether the event payload is put in the mail body — confirm.
pub include_payload: bool,
}
impl EmailSpec {
    /// Creates a spec that sends from `from` to the single recipient `to`.
    ///
    /// The CC list starts empty, `include_payload` is `false`, and the subject
    /// template is `"Batch job {job_name} → {state}"`.
    #[must_use]
    pub fn new(from: impl Into<String>, to: impl Into<String>) -> Self {
        Self {
            from: from.into(),
            to: vec![to.into()],
            cc: Vec::new(),
            subject_template: "Batch job {job_name} → {state}".into(),
            include_payload: false,
        }
    }

    /// Renders the subject line for `event`.
    ///
    /// Every `{job_name}` in the template becomes the event's job name and
    /// every `{state}` becomes the lowercase tag of the event's new state.
    /// The job name is inserted verbatim, even when it contains text that
    /// looks like a placeholder.
    #[must_use]
    pub fn render_subject(&self, event: &JobEvent) -> String {
        // Substitute the state first: its tags are fixed words without braces,
        // so they cannot introduce a new placeholder. The caller-controlled
        // job name goes in last and is therefore never scanned again, so a
        // job literally named "{state}" stays as written.
        self.subject_template
            .replace("{state}", state_tag(event.new_state))
            .replace("{job_name}", &event.job_name)
    }
}
/// One remembered key together with the second at which it was first seen.
#[derive(Debug, Clone)]
struct DedupEntry {
    key: String,
    seen_at: u64,
}

/// Time-bounded memory of recently seen keys, used to suppress repeats.
///
/// Entries are kept in arrival order, so the oldest entry is always at the
/// front of the queue and expiry only ever needs to look there.
#[derive(Debug)]
pub struct DedupWindow {
    window_secs: u64,
    seen: VecDeque<DedupEntry>,
}

impl DedupWindow {
    /// Creates an empty window that remembers keys for `window_secs` seconds.
    ///
    /// A window of `0` seconds disables deduplication entirely.
    #[must_use]
    pub fn new(window_secs: u64) -> Self {
        Self {
            window_secs,
            seen: VecDeque::new(),
        }
    }

    /// Current wall-clock time in whole seconds since the Unix epoch
    /// (`0` if the clock reports a time before the epoch).
    fn now_secs() -> u64 {
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or(Duration::ZERO)
            .as_secs()
    }

    /// Drops every entry first seen earlier than `now - window_secs`.
    fn evict_expired(&mut self, now: u64) {
        let cutoff = now.saturating_sub(self.window_secs);
        while self
            .seen
            .front()
            .map_or(false, |oldest| oldest.seen_at < cutoff)
        {
            self.seen.pop_front();
        }
    }

    /// Returns `true` when `key` was already seen inside the window.
    ///
    /// A key that is not a duplicate is recorded with the current time. A
    /// duplicate does not refresh the stored timestamp, so the window is
    /// measured from the first sighting.
    pub fn check_and_record(&mut self, key: &str) -> bool {
        // Timestamps have one-second resolution, so without this guard a
        // zero-length window would still suppress repeats that arrive within
        // the same wall-clock second.
        if self.window_secs == 0 {
            return false;
        }

        // Read the clock once so expiry and the new entry agree on "now".
        let now = Self::now_secs();
        self.evict_expired(now);

        if self.seen.iter().any(|entry| entry.key == key) {
            return true;
        }
        self.seen.push_back(DedupEntry {
            key: key.to_owned(),
            seen_at: now,
        });
        false
    }

    /// Number of keys currently remembered.
    #[must_use]
    pub fn len(&self) -> usize {
        self.seen.len()
    }

    /// Returns `true` when no key is currently remembered.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.seen.is_empty()
    }
}
/// Delivers a single event to a single notification target.
///
/// The `Send + Sync` bound lets one dispatcher be shared across threads by the hub.
pub trait Dispatcher: Send + Sync {
/// Attempts to deliver `event` to `target`.
///
/// # Errors
///
/// Returns a `DispatchError` describing why delivery failed and whether the
/// failure is retryable.
fn dispatch(&self, event: &JobEvent, target: &NotificationTarget) -> Result<(), DispatchError>;
}
/// Failure reported by a `Dispatcher`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DispatchError {
    /// Human-readable description of what went wrong.
    pub reason: String,
    /// `true` when the failure was reported as transient, `false` when permanent.
    pub retryable: bool,
}

impl DispatchError {
    /// Creates an error marked as not retryable.
    #[must_use]
    pub fn permanent(reason: impl Into<String>) -> Self {
        Self {
            reason: reason.into(),
            retryable: false,
        }
    }

    /// Creates an error marked as retryable.
    #[must_use]
    pub fn transient(reason: impl Into<String>) -> Self {
        Self {
            reason: reason.into(),
            retryable: true,
        }
    }
}

/// Formats as `"<reason> (retryable)"` or `"<reason> (permanent)"`.
impl std::fmt::Display for DispatchError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let kind = if self.retryable {
            "retryable"
        } else {
            "permanent"
        };
        write!(f, "{} ({})", self.reason, kind)
    }
}

// Implementing the standard error trait lets callers box this type as
// `dyn Error` and propagate it with `?` into generic error types.
impl std::error::Error for DispatchError {}
/// A registered interest in job events: where to send them and which ones.
#[derive(Debug, Clone)]
pub struct Subscription {
/// Identifier handed back by `NotificationHub::subscribe`.
pub id: SubscriptionId,
/// Where matching events are delivered.
pub target: NotificationTarget,
/// Which events this subscription wants.
pub filter: EventFilter,
/// `false` while paused; events for a paused subscription are recorded as
/// `SubscriptionInactive` instead of being dispatched.
pub active: bool,
}
/// String identifier of a subscription; the inner value is private and is
/// exposed read-only through `as_str` and `Display`.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct SubscriptionId(String);
impl SubscriptionId {
    /// Generates an identifier that is unique within the running process.
    ///
    /// The id has the form `sub-<nanoseconds since the Unix epoch in hex>-<sequence in hex>`.
    fn new() -> Self {
        use std::sync::atomic::{AtomicU64, Ordering};
        use std::time::{SystemTime, UNIX_EPOCH};

        // The clock alone cannot guarantee uniqueness: two calls can observe
        // the same reading on a coarse clock. The process-wide sequence number
        // differs for every call, whatever the clock says.
        static NEXT_SEQUENCE: AtomicU64 = AtomicU64::new(0);

        let nanos = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or(Duration::ZERO)
            .as_nanos();
        let sequence = NEXT_SEQUENCE.fetch_add(1, Ordering::Relaxed);
        Self(format!("sub-{nanos:x}-{sequence:x}"))
    }

    /// Borrows the identifier as a string slice.
    #[must_use]
    pub fn as_str(&self) -> &str {
        &self.0
    }
}
/// Prints the raw identifier string.
impl std::fmt::Display for SubscriptionId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Written straight to the output: width and fill flags on the outer
        // formatter are not applied to the id.
        f.write_str(&self.0)
    }
}
/// One entry of the hub's delivery history: what happened to an event for a subscription.
#[derive(Debug, Clone)]
pub struct DeliveryRecord {
/// Subscription the record belongs to.
pub subscription_id: SubscriptionId,
/// Copy of the event that was published.
pub event: JobEvent,
/// What happened to the event for this subscription.
pub outcome: DeliveryOutcome,
/// When the hub handled the event, in whole seconds since the Unix epoch.
pub attempted_at: u64,
}
/// Result of handling one event for one subscription.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DeliveryOutcome {
/// The dispatcher returned `Ok`, or the hub has no dispatcher configured.
Delivered,
/// The event's dedup key was already inside the dedup window; nothing was dispatched.
Deduplicated,
/// The subscription's filter did not match the event.
FilteredOut,
/// The dispatcher returned an error; holds the error's `reason`.
Failed(String),
/// The subscription was paused when the event was published.
SubscriptionInactive,
}
/// Tunable limits of a `NotificationHub`.
#[derive(Debug, Clone)]
pub struct HubConfig {
    /// Length of the deduplication window in seconds.
    pub dedup_window_secs: u64,
    /// Upper bound on the number of delivery records kept in memory.
    pub max_delivery_history: usize,
}

/// Defaults: a 60-second dedup window and a history of 10 000 records.
impl Default for HubConfig {
    fn default() -> Self {
        const DEFAULT_DEDUP_WINDOW_SECS: u64 = 60;
        const DEFAULT_MAX_DELIVERY_HISTORY: usize = 10_000;

        Self {
            dedup_window_secs: DEFAULT_DEDUP_WINDOW_SECS,
            max_delivery_history: DEFAULT_MAX_DELIVERY_HISTORY,
        }
    }
}
/// Mutable state of a `NotificationHub`; the hub keeps it behind a mutex.
struct HubState {
// All subscriptions, paused ones included, in registration order.
subscriptions: Vec<Subscription>,
// Recently published dedup keys.
dedup: DedupWindow,
// Delivery records, oldest first.
delivery_history: VecDeque<DeliveryRecord>,
// Capacity limit for `delivery_history`, copied from `HubConfig::max_delivery_history`.
max_history: usize,
// Number of `publish` calls, duplicates included.
events_published: u64,
// Number of deliveries recorded as `Delivered`.
deliveries_ok: u64,
// Number of deliveries recorded as `Failed`.
deliveries_failed: u64,
// Number of published events suppressed by the dedup window.
deduplicated: u64,
}
impl HubState {
    /// Creates empty state sized according to `config`: no subscriptions, an
    /// empty dedup window and history, and all counters at zero.
    fn new(config: &HubConfig) -> Self {
        Self {
            subscriptions: Vec::new(),
            dedup: DedupWindow::new(config.dedup_window_secs),
            delivery_history: VecDeque::new(),
            max_history: config.max_delivery_history,
            events_published: 0,
            deliveries_ok: 0,
            deliveries_failed: 0,
            deduplicated: 0,
        }
    }

    /// Appends `rec` to the delivery history, discarding the oldest records so
    /// that at most `max_history` are kept.
    ///
    /// With a limit of zero nothing is stored.
    fn record(&mut self, rec: DeliveryRecord) {
        // A zero limit means "keep no history"; without this guard the push
        // below would always leave one record behind.
        if self.max_history == 0 {
            return;
        }
        // Make room first so the length never exceeds the limit.
        while self.delivery_history.len() >= self.max_history {
            self.delivery_history.pop_front();
        }
        self.delivery_history.push_back(rec);
    }
}
/// Fan-out point for job events: tracks subscriptions, suppresses duplicate
/// events, dispatches to targets and keeps a bounded delivery history.
pub struct NotificationHub {
// Subscriptions, dedup window, history and counters, guarded by one mutex.
state: Mutex<HubState>,
// Delivery backend; when `None`, matching events are recorded as delivered
// without any dispatch call.
dispatcher: Option<Box<dyn Dispatcher>>,
}
impl NotificationHub {
#[must_use]
pub fn new(config: HubConfig, dispatcher: Option<Box<dyn Dispatcher>>) -> Self {
Self {
state: Mutex::new(HubState::new(&config)),
dispatcher,
}
}
pub fn subscribe(&self, target: NotificationTarget, filter: EventFilter) -> SubscriptionId {
let id = SubscriptionId::new();
let sub = Subscription {
id: id.clone(),
target,
filter,
active: true,
};
self.state.lock().subscriptions.push(sub);
id
}
pub fn pause(&self, id: &SubscriptionId) -> bool {
let mut guard = self.state.lock();
if let Some(sub) = guard.subscriptions.iter_mut().find(|s| &s.id == id) {
sub.active = false;
true
} else {
false
}
}
pub fn resume(&self, id: &SubscriptionId) -> bool {
let mut guard = self.state.lock();
if let Some(sub) = guard.subscriptions.iter_mut().find(|s| &s.id == id) {
sub.active = true;
true
} else {
false
}
}
pub fn unsubscribe(&self, id: &SubscriptionId) -> bool {
let mut guard = self.state.lock();
let before = guard.subscriptions.len();
guard.subscriptions.retain(|s| &s.id != id);
guard.subscriptions.len() < before
}
#[must_use]
pub fn subscriptions(&self) -> Vec<Subscription> {
self.state.lock().subscriptions.clone()
}
pub fn publish(&self, event: JobEvent) {
let mut guard = self.state.lock();
guard.events_published += 1;
let dedup_key = event.dedup_key();
if guard.dedup.check_and_record(&dedup_key) {
guard.deduplicated += 1;
let now = HubState::now_secs();
if let Some(sub) = guard.subscriptions.first() {
let rec = DeliveryRecord {
subscription_id: sub.id.clone(),
event: event.clone(),
outcome: DeliveryOutcome::Deduplicated,
attempted_at: now,
};
guard.record(rec);
}
return;
}
let subs: Vec<Subscription> = guard.subscriptions.clone();
let now = HubState::now_secs();
for sub in &subs {
if !sub.active {
let rec = DeliveryRecord {
subscription_id: sub.id.clone(),
event: event.clone(),
outcome: DeliveryOutcome::SubscriptionInactive,
attempted_at: now,
};
guard.record(rec);
continue;
}
if !sub.filter.matches(&event) {
let rec = DeliveryRecord {
subscription_id: sub.id.clone(),
event: event.clone(),
outcome: DeliveryOutcome::FilteredOut,
attempted_at: now,
};
guard.record(rec);
continue;
}
let outcome = if let Some(dispatcher) = &self.dispatcher {
match dispatcher.dispatch(&event, &sub.target) {
Ok(()) => {
guard.deliveries_ok += 1;
DeliveryOutcome::Delivered
}
Err(e) => {
guard.deliveries_failed += 1;
DeliveryOutcome::Failed(e.reason)
}
}
} else {
guard.deliveries_ok += 1;
DeliveryOutcome::Delivered
};
let rec = DeliveryRecord {
subscription_id: sub.id.clone(),
event: event.clone(),
outcome,
attempted_at: now,
};
guard.record(rec);
}
}
#[must_use]
pub fn delivery_history(&self, limit: usize) -> Vec<DeliveryRecord> {
let guard = self.state.lock();
let history = &guard.delivery_history;
let skip = history.len().saturating_sub(limit);
history.iter().skip(skip).cloned().collect()
}
#[must_use]
pub fn history_for_job(&self, job_id: &JobId) -> Vec<DeliveryRecord> {
let guard = self.state.lock();
guard
.delivery_history
.iter()
.filter(|r| &r.event.job_id == job_id)
.cloned()
.collect()
}
#[must_use]
pub fn stats(&self) -> HubStats {
let guard = self.state.lock();
HubStats {
events_published: guard.events_published,
deliveries_ok: guard.deliveries_ok,
deliveries_failed: guard.deliveries_failed,
deduplicated: guard.deduplicated,
active_subscriptions: guard.subscriptions.iter().filter(|s| s.active).count(),
total_subscriptions: guard.subscriptions.len(),
dedup_window_size: guard.dedup.len(),
}
}
}
impl HubState {
fn now_secs() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or(Duration::ZERO)
.as_secs()
}
}
/// Point-in-time snapshot of a hub's counters, as returned by `NotificationHub::stats`.
#[derive(Debug, Clone)]
pub struct HubStats {
/// Number of `publish` calls so far, duplicates included.
pub events_published: u64,
/// Number of deliveries recorded as `Delivered`.
pub deliveries_ok: u64,
/// Number of deliveries recorded as `Failed`.
pub deliveries_failed: u64,
/// Number of published events suppressed by the dedup window.
pub deduplicated: u64,
/// Number of subscriptions that are currently active (not paused).
pub active_subscriptions: usize,
/// Number of subscriptions, paused ones included.
pub total_subscriptions: usize,
/// Number of keys currently held in the dedup window.
pub dedup_window_size: usize,
}
/// Dispatcher that accepts every event and does nothing with it.
pub struct NoOpDispatcher;
impl Dispatcher for NoOpDispatcher {
/// Always succeeds; the event and the target are ignored.
fn dispatch(
&self,
_event: &JobEvent,
_target: &NotificationTarget,
) -> Result<(), DispatchError> {
Ok(())
}
}
/// Dispatcher that rejects every event with a simulated failure.
pub struct FailingDispatcher {
    /// Selects the kind of failure: transient when `true`, permanent when `false`.
    pub retryable: bool,
}

impl Dispatcher for FailingDispatcher {
    /// Always fails; the event and the target are ignored.
    fn dispatch(
        &self,
        _event: &JobEvent,
        _target: &NotificationTarget,
    ) -> Result<(), DispatchError> {
        if self.retryable {
            return Err(DispatchError::transient("simulated transient failure"));
        }
        Err(DispatchError::permanent("simulated permanent failure"))
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{JobId, JobState};

    /// Builds an event for a fresh job id in the given state.
    fn make_event(state: JobState) -> JobEvent {
        JobEvent::new(JobId::new(), "test-job", state)
    }

    /// Registers an in-process subscription on `channel` with the given filter.
    fn subscribe_in_process(
        hub: &NotificationHub,
        channel: &str,
        filter: EventFilter,
    ) -> SubscriptionId {
        hub.subscribe(
            NotificationTarget::InProcess {
                channel: channel.into(),
            },
            filter,
        )
    }

    /// Hub with default limits and a dispatcher that always succeeds.
    fn noop_hub() -> NotificationHub {
        NotificationHub::new(HubConfig::default(), Some(Box::new(NoOpDispatcher)))
    }

    #[test]
    fn test_dedup_window_first_occurrence_is_not_duplicate() {
        let mut w = DedupWindow::new(60);
        assert!(!w.check_and_record("key-a"));
        assert_eq!(w.len(), 1);
    }

    #[test]
    fn test_dedup_window_second_occurrence_is_duplicate() {
        let mut w = DedupWindow::new(60);
        assert!(!w.check_and_record("key-a"));
        assert!(w.check_and_record("key-a"));
    }

    #[test]
    fn test_dedup_window_different_keys_not_duplicated() {
        let mut w = DedupWindow::new(60);
        assert!(!w.check_and_record("key-a"));
        assert!(!w.check_and_record("key-b"));
        assert_eq!(w.len(), 2);
    }

    #[test]
    fn test_dedup_window_zero_window_never_deduplicates() {
        let mut w = DedupWindow::new(0);
        assert!(!w.check_and_record("key-a"));
        // The result of the repeat call is intentionally not asserted here.
        let _ = w.check_and_record("key-a");
    }

    #[test]
    fn test_filter_all_matches_every_state() {
        let filter = EventFilter::All;
        for state in [
            JobState::Queued,
            JobState::Running,
            JobState::Completed,
            JobState::Failed,
            JobState::Cancelled,
            JobState::Pending,
        ] {
            assert!(filter.matches(&make_event(state)));
        }
    }

    #[test]
    fn test_filter_terminal_only() {
        let filter = EventFilter::TerminalOnly;
        assert!(filter.matches(&make_event(JobState::Completed)));
        assert!(filter.matches(&make_event(JobState::Failed)));
        assert!(filter.matches(&make_event(JobState::Cancelled)));
        assert!(!filter.matches(&make_event(JobState::Queued)));
        assert!(!filter.matches(&make_event(JobState::Running)));
        assert!(!filter.matches(&make_event(JobState::Pending)));
    }

    #[test]
    fn test_filter_failure_only() {
        let filter = EventFilter::FailureOnly;
        assert!(filter.matches(&make_event(JobState::Failed)));
        assert!(!filter.matches(&make_event(JobState::Completed)));
        assert!(!filter.matches(&make_event(JobState::Cancelled)));
        assert!(!filter.matches(&make_event(JobState::Pending)));
    }

    #[test]
    fn test_filter_specific_states() {
        let filter = EventFilter::States(vec![JobState::Running, JobState::Completed]);
        assert!(filter.matches(&make_event(JobState::Running)));
        assert!(filter.matches(&make_event(JobState::Completed)));
        assert!(!filter.matches(&make_event(JobState::Failed)));
    }

    #[test]
    fn test_hub_subscribe_and_stats() {
        let hub = NotificationHub::new(HubConfig::default(), None);
        let _id = subscribe_in_process(&hub, "test", EventFilter::All);
        let stats = hub.stats();
        assert_eq!(stats.total_subscriptions, 1);
        assert_eq!(stats.active_subscriptions, 1);
    }

    #[test]
    fn test_hub_publish_increments_counter() {
        let hub = noop_hub();
        subscribe_in_process(&hub, "ch", EventFilter::All);
        hub.publish(make_event(JobState::Completed));
        assert_eq!(hub.stats().events_published, 1);
        assert_eq!(hub.stats().deliveries_ok, 1);
    }

    #[test]
    fn test_hub_deduplication_suppresses_repeat() {
        let hub = noop_hub();
        subscribe_in_process(&hub, "ch", EventFilter::All);
        let job_id = JobId::new();
        let ev1 = JobEvent::new(job_id.clone(), "job", JobState::Completed);
        let ev2 = JobEvent::new(job_id, "job", JobState::Completed);
        hub.publish(ev1);
        hub.publish(ev2);
        let stats = hub.stats();
        assert_eq!(stats.events_published, 2);
        assert_eq!(stats.deduplicated, 1);
    }

    #[test]
    fn test_hub_filter_applied_per_subscription() {
        let hub = noop_hub();
        subscribe_in_process(&hub, "failure-only", EventFilter::FailureOnly);
        hub.publish(make_event(JobState::Completed));
        let stats = hub.stats();
        assert_eq!(stats.deliveries_ok, 0);
        hub.publish(make_event(JobState::Failed));
        let stats = hub.stats();
        assert_eq!(stats.deliveries_ok, 1);
    }

    #[test]
    fn test_hub_failing_dispatcher_records_failure() {
        let hub = NotificationHub::new(
            HubConfig::default(),
            Some(Box::new(FailingDispatcher { retryable: false })),
        );
        subscribe_in_process(&hub, "ch", EventFilter::All);
        hub.publish(make_event(JobState::Running));
        let stats = hub.stats();
        assert_eq!(stats.deliveries_failed, 1);
        assert_eq!(stats.deliveries_ok, 0);
    }

    #[test]
    fn test_hub_pause_and_resume() {
        let hub = noop_hub();
        let id = subscribe_in_process(&hub, "ch", EventFilter::All);
        assert!(hub.pause(&id));
        hub.publish(make_event(JobState::Running));
        assert_eq!(hub.stats().deliveries_ok, 0);
        assert!(hub.resume(&id));
        hub.publish(make_event(JobState::Completed));
        assert_eq!(hub.stats().deliveries_ok, 1);
    }

    #[test]
    fn test_hub_unsubscribe() {
        let hub = noop_hub();
        let id = subscribe_in_process(&hub, "ch", EventFilter::All);
        assert!(hub.unsubscribe(&id));
        assert_eq!(hub.stats().total_subscriptions, 0);
        assert!(!hub.unsubscribe(&id));
    }

    #[test]
    fn test_hub_delivery_history() {
        let hub = noop_hub();
        subscribe_in_process(&hub, "ch", EventFilter::All);
        hub.publish(make_event(JobState::Queued));
        hub.publish(make_event(JobState::Running));
        let history = hub.delivery_history(10);
        assert_eq!(history.len(), 2);
    }

    #[test]
    fn test_webhook_spec_builder() {
        let spec = WebhookSpec::new("https://example.com/hook")
            .with_bearer("tok123")
            .with_secret("s3cr3t");
        assert!(spec.headers.contains_key("Authorization"));
        assert_eq!(spec.secret.as_deref(), Some("s3cr3t"));
        assert_eq!(spec.max_retries, 3);
    }

    #[test]
    fn test_email_spec_render_subject() {
        let spec = EmailSpec::new("from@example.com", "to@example.com");
        let event = JobEvent::new(JobId::new(), "encode-job", JobState::Completed);
        let subject = spec.render_subject(&event);
        assert!(subject.contains("encode-job"));
        assert!(subject.contains("completed"));
    }

    #[test]
    fn test_job_event_dedup_key_uniqueness() {
        let id = JobId::new();
        let e1 = JobEvent::new(id.clone(), "j", JobState::Completed);
        let e2 = JobEvent::new(id.clone(), "j", JobState::Failed);
        assert_ne!(e1.dedup_key(), e2.dedup_key());
    }

    #[test]
    fn test_hub_history_for_job() {
        let hub = noop_hub();
        let id = subscribe_in_process(&hub, "ch", EventFilter::All);
        let job_a = JobId::from("job-a");
        let job_b = JobId::from("job-b");
        hub.publish(JobEvent::new(job_a.clone(), "A", JobState::Running));
        hub.publish(JobEvent::new(job_b.clone(), "B", JobState::Completed));
        let history_a = hub.history_for_job(&job_a);
        assert_eq!(history_a.len(), 1);
        hub.unsubscribe(&id);
    }
}