use std::collections::HashMap;
use std::sync::Arc;
use std::sync::RwLock;
use std::sync::atomic::{AtomicU64, Ordering};
use serde::{Deserialize, Serialize};
/// Kinds of object-level events a notification rule can subscribe to.
///
/// Each variant has an S3-style event name, produced by `EventType::as_aws_str`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub enum EventType {
/// Reported as `s3:ObjectCreated:Put`.
ObjectCreatedPut,
/// Reported as `s3:ObjectRemoved:Delete`.
ObjectRemovedDelete,
/// Reported as `s3:ObjectRemoved:DeleteMarkerCreated`.
ObjectRemovedDeleteMarker,
}
impl EventType {
    /// Returns the S3-style event name used for this variant in event payloads.
    #[must_use]
    pub fn as_aws_str(&self) -> &'static str {
        match *self {
            Self::ObjectRemovedDeleteMarker => "s3:ObjectRemoved:DeleteMarkerCreated",
            Self::ObjectRemovedDelete => "s3:ObjectRemoved:Delete",
            Self::ObjectCreatedPut => "s3:ObjectCreated:Put",
        }
    }

    /// Parses an S3-style event name into a variant.
    ///
    /// The wildcard forms collapse onto a single variant:
    /// `s3:ObjectCreated:*` becomes [`Self::ObjectCreatedPut`] and
    /// `s3:ObjectRemoved:*` becomes [`Self::ObjectRemovedDelete`].
    /// Any other string yields `None`.
    #[must_use]
    pub fn from_aws_str(s: &str) -> Option<Self> {
        let parsed = match s {
            "s3:ObjectCreated:Put" | "s3:ObjectCreated:*" => Self::ObjectCreatedPut,
            "s3:ObjectRemoved:Delete" | "s3:ObjectRemoved:*" => Self::ObjectRemovedDelete,
            "s3:ObjectRemoved:DeleteMarkerCreated" => Self::ObjectRemovedDeleteMarker,
            _ => return None,
        };
        Some(parsed)
    }
}
/// Where a matched notification is delivered.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum Destination {
/// HTTP endpoint that receives the event JSON via `POST`.
Webhook { url: String },
/// SQS queue; only delivered when the `aws-events` feature is enabled, otherwise dropped.
Sqs { queue_arn: String },
/// SNS topic; only delivered when the `aws-events` feature is enabled, otherwise dropped.
Sns { topic_arn: String },
}
impl Destination {
    /// Returns a short lowercase label for the destination kind
    /// (`"webhook"`, `"sqs"` or `"sns"`).
    #[must_use]
    pub fn type_tag(&self) -> &'static str {
        let tag = match *self {
            Self::Sns { .. } => "sns",
            Self::Sqs { .. } => "sqs",
            Self::Webhook { .. } => "webhook",
        };
        tag
    }
}
/// One notification rule: which events, for which keys, go to which destination.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct NotificationRule {
/// Identifier of the rule.
pub id: String,
/// Event types this rule fires on; an event not listed here never matches.
pub events: Vec<EventType>,
/// Where matching events are sent.
pub destination: Destination,
/// When set and non-empty, the object key must start with this string.
pub filter_prefix: Option<String>,
/// When set and non-empty, the object key must end with this string.
pub filter_suffix: Option<String>,
}
/// The notification configuration of a single bucket.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct NotificationConfig {
/// Rules in declaration order; matching destinations are returned in this order.
pub rules: Vec<NotificationRule>,
}
/// Serialized form of the manager's state, used by `to_json` / `from_json`.
#[derive(Debug, Default, Serialize, Deserialize)]
struct NotificationSnapshot {
// Bucket name -> that bucket's notification config.
by_bucket: HashMap<String, NotificationConfig>,
}
/// Holds the per-bucket notification configs plus a counter of dropped notifications.
pub struct NotificationManager {
// Bucket name -> config; behind a lock so the `&self` methods can update it.
by_bucket: RwLock<HashMap<String, NotificationConfig>>,
/// Number of notifications dropped so far; incremented by `bump_drop`.
pub dropped_total: AtomicU64,
}
impl Default for NotificationManager {
fn default() -> Self {
Self::new()
}
}
impl std::fmt::Debug for NotificationManager {
    /// Prints only the drop counter; the per-bucket map is elided
    /// (rendered as `..` by `finish_non_exhaustive`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let dropped = self.dropped_total.load(Ordering::Relaxed);
        let mut out = f.debug_struct("NotificationManager");
        out.field("dropped_total", &dropped);
        out.finish_non_exhaustive()
    }
}
impl NotificationManager {
#[must_use]
pub fn new() -> Self {
Self {
by_bucket: RwLock::new(HashMap::new()),
dropped_total: AtomicU64::new(0),
}
}
pub fn put(&self, bucket: &str, config: NotificationConfig) {
self.by_bucket
.write()
.expect("notification state RwLock poisoned")
.insert(bucket.to_owned(), config);
}
#[must_use]
pub fn get(&self, bucket: &str) -> Option<NotificationConfig> {
self.by_bucket
.read()
.expect("notification state RwLock poisoned")
.get(bucket)
.cloned()
}
pub fn delete(&self, bucket: &str) {
self.by_bucket
.write()
.expect("notification state RwLock poisoned")
.remove(bucket);
}
pub fn to_json(&self) -> Result<String, serde_json::Error> {
let snap = NotificationSnapshot {
by_bucket: self
.by_bucket
.read()
.expect("notification state RwLock poisoned")
.clone(),
};
serde_json::to_string(&snap)
}
pub fn from_json(s: &str) -> Result<Self, serde_json::Error> {
let snap: NotificationSnapshot = serde_json::from_str(s)?;
Ok(Self {
by_bucket: RwLock::new(snap.by_bucket),
dropped_total: AtomicU64::new(0),
})
}
#[must_use]
pub fn match_destinations(
&self,
bucket: &str,
event: &EventType,
key: &str,
) -> Vec<Destination> {
let map = self
.by_bucket
.read()
.expect("notification state RwLock poisoned");
let cfg = match map.get(bucket) {
Some(c) => c,
None => return Vec::new(),
};
cfg.rules
.iter()
.filter(|r| rule_matches(r, event, key))
.map(|r| r.destination.clone())
.collect()
}
}
fn rule_matches(rule: &NotificationRule, event: &EventType, key: &str) -> bool {
if !rule.events.iter().any(|e| e == event) {
return false;
}
if let Some(p) = rule.filter_prefix.as_deref()
&& !p.is_empty()
&& !key.starts_with(p)
{
return false;
}
if let Some(s) = rule.filter_suffix.as_deref()
&& !s.is_empty()
&& !key.ends_with(s)
{
return false;
}
true
}
/// Builds the S3-style event notification document (`{"Records":[ … ]}`) for one
/// object event and returns it as a JSON string.
///
/// * `bucket`, `key` – the bucket name and object key the event refers to.
/// * `event` – event kind; its `as_aws_str()` name becomes `eventName`.
/// * `size`, `etag`, `version_id` – optional object attributes. Each is omitted from
///   the `object` section when `None`. Surrounding double quotes are stripped from
///   `etag`.
/// * `request_id` – emitted as both `x-amz-request-id` and `x-amz-id-2`.
/// * `now` – event time. It is rendered as RFC 3339 with millisecond precision and a
///   `Z` suffix, and also used (as microseconds since the epoch, 16 hex digits) for
///   the object's `sequencer`.
///
/// Region (`us-east-1`), principal (`S4`), source IP (`0.0.0.0`) and configuration id
/// (`S4-default`) are fixed placeholder values.
#[must_use]
// The signature mirrors the independent fields of the event record.
#[allow(clippy::too_many_arguments)]
pub fn build_event_json(
    bucket: &str,
    key: &str,
    event: &EventType,
    size: Option<u64>,
    etag: Option<&str>,
    version_id: Option<&str>,
    request_id: &str,
    now: chrono::DateTime<chrono::Utc>,
) -> String {
    let mut object = serde_json::json!({
        "key": key,
        // The cast reinterprets the signed microsecond count; hex formatting of the
        // bit pattern is the same either way.
        "sequencer": format!("{:016x}", now.timestamp_micros() as u64),
    });
    if let Some(sz) = size {
        object["size"] = serde_json::json!(sz);
    }
    if let Some(tag) = etag {
        // ETags usually arrive quoted (HTTP header form); the payload carries the bare value.
        object["eTag"] = serde_json::json!(tag.trim_matches('"'));
    }
    if let Some(v) = version_id {
        object["versionId"] = serde_json::json!(v);
    }
    let record = serde_json::json!({
        "eventVersion": "2.1",
        "eventSource": "aws:s3",
        "awsRegion": "us-east-1",
        "eventTime": now.to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
        "eventName": event.as_aws_str(),
        "userIdentity": { "principalId": "S4" },
        "requestParameters": { "sourceIPAddress": "0.0.0.0" },
        "responseElements": {
            "x-amz-request-id": request_id,
            "x-amz-id-2": request_id,
        },
        "s3": {
            "s3SchemaVersion": "1.0",
            "configurationId": "S4-default",
            "bucket": {
                "name": bucket,
                "ownerIdentity": { "principalId": "S4" },
                "arn": format!("arn:aws:s3:::{bucket}"),
            },
            "object": object,
        },
    });
    serde_json::json!({ "Records": [record] }).to_string()
}
/// Maximum number of delivery attempts made for one webhook notification.
const RETRY_ATTEMPTS: u32 = 3;
/// Backoff before the first retry, in milliseconds; doubled for each further retry.
const RETRY_BASE_MS: u64 = 50;
/// Sends the notification for one object event to every destination whose rule matches.
///
/// Returns immediately when no rule of `bucket` matches. Otherwise the event JSON is
/// built once (timestamped with the current UTC time) and one Tokio task is spawned
/// per destination. This function does not wait for those deliveries to finish.
// The signature mirrors the independent fields of the event record.
#[allow(clippy::too_many_arguments)]
pub async fn dispatch_event(
    manager: Arc<NotificationManager>,
    bucket: String,
    key: String,
    event: EventType,
    size: Option<u64>,
    etag: Option<String>,
    version_id: Option<String>,
    request_id: String,
) {
    let targets = manager.match_destinations(&bucket, &event, &key);
    if targets.is_empty() {
        return;
    }
    let timestamp = chrono::Utc::now();
    let payload = build_event_json(
        &bucket,
        &key,
        &event,
        size,
        etag.as_deref(),
        version_id.as_deref(),
        &request_id,
        timestamp,
    );
    for target in targets {
        // Each delivery owns its own handle and payload copy so it can outlive this call.
        tokio::spawn(send_one(Arc::clone(&manager), target, payload.clone()));
    }
}
/// Delivers one serialized event `body` to a single destination.
///
/// * **Webhook** – `POST`s the body as `application/json` with a 5 s timeout, up to
///   `RETRY_ATTEMPTS` times. 5xx responses and transport errors are retried with
///   exponential backoff (`RETRY_BASE_MS`, doubling per attempt). Any other
///   non-success status is treated as permanent and not retried.
/// * **SQS / SNS** – forwarded to `send_sqs` / `send_sns` when the `aws-events`
///   feature is enabled; otherwise the notification is dropped with a warning.
///
/// Every notification that is given up on is counted via `bump_drop`.
async fn send_one(manager: Arc<NotificationManager>, dest: Destination, body: String) {
    match dest {
        Destination::Webhook { ref url } => {
            let client = match reqwest::Client::builder()
                .timeout(std::time::Duration::from_secs(5))
                .build()
            {
                Ok(c) => c,
                Err(e) => {
                    tracing::warn!(error = %e, "notifications: reqwest client build failed");
                    bump_drop(&manager, dest.type_tag());
                    return;
                }
            };
            for attempt in 0..RETRY_ATTEMPTS {
                let resp = client
                    .post(url)
                    .header("content-type", "application/json")
                    .body(body.clone())
                    .send()
                    .await;
                match resp {
                    Ok(r) if r.status().is_success() => return,
                    Ok(r) if r.status().is_server_error() => {
                        // 5xx may be transient: back off and retry while budget remains.
                        if attempt + 1 < RETRY_ATTEMPTS {
                            let delay_ms = RETRY_BASE_MS * (1u64 << attempt);
                            tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
                            continue;
                        }
                        tracing::warn!(
                            url = %url,
                            status = %r.status(),
                            "notifications: webhook giving up after {RETRY_ATTEMPTS} attempts"
                        );
                        bump_drop(&manager, "webhook");
                        return;
                    }
                    Ok(r) => {
                        // Any other non-success status will not improve on retry.
                        tracing::warn!(
                            url = %url,
                            status = %r.status(),
                            "notifications: webhook permanent failure, dropping"
                        );
                        // This is a drop like any other, so it must be counted too.
                        bump_drop(&manager, "webhook");
                        return;
                    }
                    Err(e) => {
                        // Transport-level failure: retry with the same backoff schedule.
                        if attempt + 1 < RETRY_ATTEMPTS {
                            let delay_ms = RETRY_BASE_MS * (1u64 << attempt);
                            tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
                            continue;
                        }
                        tracing::warn!(
                            url = %url,
                            error = %e,
                            "notifications: webhook network failure, dropping after {RETRY_ATTEMPTS} attempts"
                        );
                        bump_drop(&manager, "webhook");
                        return;
                    }
                }
            }
        }
        Destination::Sqs { ref queue_arn } => {
            #[cfg(feature = "aws-events")]
            {
                send_sqs(&manager, queue_arn, &body).await;
            }
            #[cfg(not(feature = "aws-events"))]
            {
                // Explicitly discard the values that are unused in this build.
                let _ = queue_arn;
                let _ = body;
                tracing::warn!(
                    "notifications: SQS destination configured but `aws-events` feature is off — dropping"
                );
                bump_drop(&manager, "sqs");
            }
        }
        Destination::Sns { ref topic_arn } => {
            #[cfg(feature = "aws-events")]
            {
                send_sns(&manager, topic_arn, &body).await;
            }
            #[cfg(not(feature = "aws-events"))]
            {
                // Explicitly discard the values that are unused in this build.
                let _ = topic_arn;
                let _ = body;
                tracing::warn!(
                    "notifications: SNS destination configured but `aws-events` feature is off — dropping"
                );
                bump_drop(&manager, "sns");
            }
        }
    }
}
/// Records one dropped notification: increments the manager's `dropped_total`
/// counter, then reports the drop for `dest_tag` to the crate's metrics module.
fn bump_drop(manager: &NotificationManager, dest_tag: &'static str) {
    let counter = &manager.dropped_total;
    counter.fetch_add(1, Ordering::Relaxed);
    crate::metrics::record_notification_drop(dest_tag);
}
/// Sends `body` as a message to the SQS queue configured as `queue_arn`.
///
/// The AWS configuration is loaded from the environment for each call. SQS
/// `SendMessage` addresses a queue by URL, so the configured ARN is first translated
/// with [`sqs_queue_url`]. A failed send is logged and counted as a drop; it is not
/// retried here.
#[cfg(feature = "aws-events")]
async fn send_sqs(manager: &NotificationManager, queue_arn: &str, body: &str) {
    let conf = aws_config::load_from_env().await;
    let client = aws_sdk_sqs::Client::new(&conf);
    let queue_url = sqs_queue_url(queue_arn);
    let res = client
        .send_message()
        .queue_url(queue_url)
        .message_body(body)
        .send()
        .await;
    if let Err(e) = res {
        tracing::warn!(arn = %queue_arn, error = ?e, "notifications: SQS send failed");
        bump_drop(manager, "sqs");
    }
}

/// Turns a configured SQS queue reference into a queue URL.
///
/// An ARN of the form `arn:<partition>:sqs:<region>:<account>:<name>` becomes
/// `https://sqs.<region>.<dns-suffix>/<account>/<name>`. The DNS suffix is
/// `amazonaws.com.cn` for the `aws-cn` partition and `amazonaws.com` otherwise.
/// Anything that is not a well-formed SQS ARN (for example a value that already is
/// a queue URL) is returned unchanged.
// Only `send_sqs` calls this, and that function is compiled out without the feature.
#[cfg_attr(not(feature = "aws-events"), allow(dead_code))]
fn sqs_queue_url(queue_ref: &str) -> std::borrow::Cow<'_, str> {
    use std::borrow::Cow;

    let fields: Vec<&str> = queue_ref.splitn(6, ':').collect();
    match fields.as_slice() {
        ["arn", partition, "sqs", region, account, name]
            if !region.is_empty() && !account.is_empty() && !name.is_empty() =>
        {
            let dns_suffix = if *partition == "aws-cn" {
                "amazonaws.com.cn"
            } else {
                "amazonaws.com"
            };
            Cow::Owned(format!("https://sqs.{region}.{dns_suffix}/{account}/{name}"))
        }
        _ => Cow::Borrowed(queue_ref),
    }
}
/// Publishes `body` as a message to the SNS topic `topic_arn`.
///
/// The AWS configuration is loaded from the environment for each call. A failed
/// publish is logged and counted as a drop; it is not retried here.
#[cfg(feature = "aws-events")]
async fn send_sns(manager: &NotificationManager, topic_arn: &str, body: &str) {
    let sdk_config = aws_config::load_from_env().await;
    let sns = aws_sdk_sns::Client::new(&sdk_config);
    let request = sns.publish().topic_arn(topic_arn).message(body);
    let Err(e) = request.send().await else {
        return;
    };
    tracing::warn!(arn = %topic_arn, error = ?e, "notifications: SNS publish failed");
    bump_drop(manager, "sns");
}
// Unit tests covering rule matching, event JSON construction, snapshot round-tripping,
// config replacement/deletion, and webhook dispatch against a local TCP listener.
#[cfg(test)]
mod tests {
use super::*;
// Builds a `NotificationRule` from borrowed parts so each test can declare rules tersely.
fn rule(
id: &str,
events: &[EventType],
dest: Destination,
prefix: Option<&str>,
suffix: Option<&str>,
) -> NotificationRule {
NotificationRule {
id: id.to_owned(),
events: events.to_vec(),
destination: dest,
filter_prefix: prefix.map(str::to_owned),
filter_suffix: suffix.map(str::to_owned),
}
}
// A rule without filters fires for any key when the event type matches.
#[test]
fn match_destinations_single_rule_event_match() {
let mgr = NotificationManager::new();
mgr.put(
"b",
NotificationConfig {
rules: vec![rule(
"r1",
&[EventType::ObjectCreatedPut],
Destination::Webhook {
url: "http://hook".into(),
},
None,
None,
)],
},
);
let dests = mgr.match_destinations("b", &EventType::ObjectCreatedPut, "any/key.txt");
assert_eq!(dests.len(), 1, "single rule must fire on event match");
}
// A prefix filter accepts keys that start with it and rejects the rest.
#[test]
fn match_destinations_prefix_filter() {
let mgr = NotificationManager::new();
mgr.put(
"b",
NotificationConfig {
rules: vec![rule(
"r1",
&[EventType::ObjectCreatedPut],
Destination::Webhook {
url: "http://hook".into(),
},
Some("uploads/"),
None,
)],
},
);
assert_eq!(
mgr.match_destinations("b", &EventType::ObjectCreatedPut, "uploads/file.bin")
.len(),
1
);
assert!(
mgr.match_destinations("b", &EventType::ObjectCreatedPut, "logs/file.bin")
.is_empty(),
"prefix filter must reject non-matching key"
);
}
// A suffix filter accepts keys that end with it and rejects the rest.
#[test]
fn match_destinations_suffix_filter() {
let mgr = NotificationManager::new();
mgr.put(
"b",
NotificationConfig {
rules: vec![rule(
"r1",
&[EventType::ObjectCreatedPut],
Destination::Webhook {
url: "http://hook".into(),
},
None,
Some(".jpg"),
)],
},
);
assert_eq!(
mgr.match_destinations("b", &EventType::ObjectCreatedPut, "photo.jpg")
.len(),
1
);
assert!(
mgr.match_destinations("b", &EventType::ObjectCreatedPut, "doc.pdf")
.is_empty(),
"suffix filter must reject non-matching key"
);
}
// A bucket with no stored config yields no destinations.
#[test]
fn match_destinations_no_rule_for_bucket() {
let mgr = NotificationManager::new();
let dests = mgr.match_destinations("ghost", &EventType::ObjectCreatedPut, "k");
assert!(dests.is_empty(), "unknown bucket must yield empty vec");
}
// A rule does not fire for an event type it does not list.
#[test]
fn match_destinations_event_type_mismatch() {
let mgr = NotificationManager::new();
mgr.put(
"b",
NotificationConfig {
rules: vec![rule(
"r1",
&[EventType::ObjectCreatedPut],
Destination::Webhook {
url: "http://hook".into(),
},
None,
None,
)],
},
);
assert!(
mgr.match_destinations("b", &EventType::ObjectRemovedDelete, "k")
.is_empty(),
"mismatched event type must not fire"
);
}
// All matching rules fire, and destinations come back in rule declaration order.
#[test]
fn match_destinations_multiple_rules_fire_in_order() {
let mgr = NotificationManager::new();
mgr.put(
"b",
NotificationConfig {
rules: vec![
rule(
"first",
&[EventType::ObjectCreatedPut],
Destination::Webhook {
url: "http://first".into(),
},
None,
None,
),
rule(
"second",
&[EventType::ObjectCreatedPut],
Destination::Webhook {
url: "http://second".into(),
},
None,
None,
),
],
},
);
let dests = mgr.match_destinations("b", &EventType::ObjectCreatedPut, "k");
assert_eq!(dests.len(), 2, "both matching rules fire");
match (&dests[0], &dests[1]) {
(Destination::Webhook { url: u1 }, Destination::Webhook { url: u2 }) => {
assert_eq!(u1, "http://first");
assert_eq!(u2, "http://second");
}
_ => panic!("expected two webhooks in declaration order"),
}
}
// With every optional field supplied, the record carries the expected names and values;
// the quoted ETag is emitted without its surrounding quotes.
#[test]
fn build_event_json_schema_matches_aws() {
let now =
chrono::DateTime::parse_from_rfc3339("2026-05-13T10:00:00Z")
.unwrap()
.with_timezone(&chrono::Utc);
let body = build_event_json(
"my-bucket",
"uploads/photo.jpg",
&EventType::ObjectCreatedPut,
Some(12345),
Some("\"deadbeef\""),
Some("v-001"),
"REQ-1",
now,
);
let v: serde_json::Value = serde_json::from_str(&body).expect("valid json");
let rec = &v["Records"][0];
assert_eq!(rec["eventName"], "s3:ObjectCreated:Put");
assert_eq!(rec["eventTime"], "2026-05-13T10:00:00.000Z");
assert_eq!(rec["s3"]["bucket"]["name"], "my-bucket");
assert_eq!(rec["s3"]["object"]["key"], "uploads/photo.jpg");
assert_eq!(rec["s3"]["object"]["size"], 12345);
assert_eq!(rec["s3"]["object"]["eTag"], "deadbeef");
assert_eq!(rec["s3"]["object"]["versionId"], "v-001");
}
// `size`, `eTag` and `versionId` are left out of the object section when not supplied.
#[test]
fn build_event_json_omits_optional_fields() {
let now = chrono::Utc::now();
let body = build_event_json(
"b",
"k",
&EventType::ObjectRemovedDeleteMarker,
None,
None,
None,
"r",
now,
);
let v: serde_json::Value = serde_json::from_str(&body).expect("valid json");
let obj = &v["Records"][0]["s3"]["object"];
assert!(obj.get("size").is_none());
assert!(obj.get("eTag").is_none());
assert!(obj.get("versionId").is_none());
}
// A config survives `to_json` followed by `from_json` unchanged.
#[test]
fn json_round_trip() {
let mgr = NotificationManager::new();
mgr.put(
"b",
NotificationConfig {
rules: vec![rule(
"r1",
&[
EventType::ObjectCreatedPut,
EventType::ObjectRemovedDelete,
],
Destination::Sqs {
queue_arn: "arn:aws:sqs:us-east-1:123:q".into(),
},
Some("u/"),
Some(".jpg"),
)],
},
);
let json = mgr.to_json().expect("to_json");
let mgr2 = NotificationManager::from_json(&json).expect("from_json");
assert_eq!(mgr.get("b"), mgr2.get("b"));
}
// Deleting an unknown bucket is a no-op; deleting a known one removes its config.
#[test]
fn delete_is_idempotent() {
let mgr = NotificationManager::new();
mgr.delete("never-existed");
mgr.put(
"b",
NotificationConfig {
rules: vec![rule(
"r1",
&[EventType::ObjectCreatedPut],
Destination::Webhook {
url: "http://h".into(),
},
None,
None,
)],
},
);
mgr.delete("b");
assert!(mgr.get("b").is_none());
}
// A second `put` for the same bucket replaces the first config rather than merging.
#[test]
fn put_replaces_previous_config() {
let mgr = NotificationManager::new();
mgr.put(
"b",
NotificationConfig {
rules: vec![rule(
"old",
&[EventType::ObjectCreatedPut],
Destination::Webhook {
url: "http://old".into(),
},
None,
None,
)],
},
);
mgr.put(
"b",
NotificationConfig {
rules: vec![rule(
"new",
&[EventType::ObjectRemovedDelete],
Destination::Webhook {
url: "http://new".into(),
},
None,
None,
)],
},
);
let cfg = mgr.get("b").expect("config");
assert_eq!(cfg.rules.len(), 1);
assert_eq!(cfg.rules[0].id, "new");
}
// End-to-end webhook delivery: a local listener captures the raw HTTP request and
// answers 200, so nothing may be counted as dropped.
#[tokio::test]
async fn dispatch_event_via_webhook_delivers_payload() {
use std::sync::Mutex;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;
// Port 0 lets the OS pick a free port; the real address is read back below.
let listener = TcpListener::bind("127.0.0.1:0").await.expect("bind");
let addr = listener.local_addr().expect("addr");
let received: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
let received_cl = Arc::clone(&received);
// One-shot server: accept a single connection, record what was read, reply 200.
tokio::spawn(async move {
if let Ok((mut sock, _)) = listener.accept().await {
let mut buf = vec![0u8; 16384];
let n = sock.read(&mut buf).await.unwrap_or(0);
let raw = String::from_utf8_lossy(&buf[..n]).to_string();
received_cl.lock().unwrap().push(raw);
let _ = sock
.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n")
.await;
}
});
let mgr = Arc::new(NotificationManager::new());
mgr.put(
"b",
NotificationConfig {
rules: vec![rule(
"r1",
&[EventType::ObjectCreatedPut],
Destination::Webhook {
url: format!("http://{addr}/hook"),
},
None,
None,
)],
},
);
dispatch_event(
Arc::clone(&mgr),
"b".into(),
"k.txt".into(),
EventType::ObjectCreatedPut,
Some(7),
Some("\"abc\"".into()),
None,
"req-1".into(),
)
.await;
// Delivery runs on a spawned task, so poll (up to 50 x 20 ms) for the request to arrive.
for _ in 0..50 {
if !received.lock().unwrap().is_empty() {
break;
}
tokio::time::sleep(std::time::Duration::from_millis(20)).await;
}
let raw = received.lock().unwrap().clone();
assert!(!raw.is_empty(), "webhook receiver got nothing");
let raw = &raw[0];
assert!(raw.contains("POST /hook"), "missing POST line");
assert!(raw.contains("s3:ObjectCreated:Put"), "missing event name");
assert!(raw.contains("\"k.txt\""), "missing key");
assert_eq!(mgr.dropped_total.load(Ordering::Relaxed), 0);
}
// A webhook that always answers 503 exhausts the retry budget and is counted as
// exactly one drop.
#[tokio::test]
async fn dispatch_event_503_drops_after_retry_budget() {
use std::sync::Mutex;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;
let listener = TcpListener::bind("127.0.0.1:0").await.expect("bind");
let addr = listener.local_addr().expect("addr");
let attempt_count: Arc<Mutex<u32>> = Arc::new(Mutex::new(0));
let attempt_count_cl = Arc::clone(&attempt_count);
// Server answers 503 to as many connections as the dispatcher has attempts.
tokio::spawn(async move {
for _ in 0..RETRY_ATTEMPTS {
if let Ok((mut sock, _)) = listener.accept().await {
let mut buf = vec![0u8; 16384];
let _ = sock.read(&mut buf).await;
*attempt_count_cl.lock().unwrap() += 1;
let _ = sock
.write_all(b"HTTP/1.1 503 Service Unavailable\r\nContent-Length: 0\r\n\r\n")
.await;
}
}
});
let mgr = Arc::new(NotificationManager::new());
mgr.put(
"b",
NotificationConfig {
rules: vec![rule(
"r1",
&[EventType::ObjectCreatedPut],
Destination::Webhook {
url: format!("http://{addr}/sink"),
},
None,
None,
)],
},
);
dispatch_event(
Arc::clone(&mgr),
"b".into(),
"k".into(),
EventType::ObjectCreatedPut,
None,
None,
None,
"r".into(),
)
.await;
// Retries and backoff happen on a spawned task; poll (up to 100 x 20 ms) for the drop.
for _ in 0..100 {
if mgr.dropped_total.load(Ordering::Relaxed) > 0 {
break;
}
tokio::time::sleep(std::time::Duration::from_millis(20)).await;
}
assert_eq!(
mgr.dropped_total.load(Ordering::Relaxed),
1,
"drop counter must bump exactly once after retry budget exhausted"
);
}
}