Skip to main content

s4_server/
notifications.rs

1//! S3 bucket notifications — fire events on PUT / DELETE (v0.6 #35).
2//!
3//! AWS S3 lets a bucket owner register notification destinations (SNS, SQS,
4//! Lambda, EventBridge) that receive a JSON event payload whenever objects
5//! change. S4-server implements a subset:
6//!
7//! - **Webhook** (HTTP POST of the event JSON) — always available, no extra
8//!   crate dependencies required.
9//! - **SQS** (`Sqs { queue_arn }`) — gated behind the `aws-events` cargo
10//!   feature so the default build doesn't pull `aws-sdk-sqs`.
11//! - **SNS** (`Sns { topic_arn }`) — gated behind the same `aws-events`
12//!   feature.
13//! - **Lambda direct invoke is NOT implemented** in v0.6 #35; the recommended
14//!   path is SNS → Lambda subscription, which works through this module's SNS
15//!   destination once the feature is on.
16//!
17//! ## responsibilities (v0.6 #35)
18//!
19//! - in-memory `bucket -> NotificationConfig` map with JSON snapshot
20//!   round-trip, mirroring `versioning.rs` / `cors.rs` / `inventory.rs` so
21//!   `--notifications-state-file` is a one-line addition in `main.rs`.
22//! - `match_destinations(bucket, event, key)` walks the rule list in
23//!   declaration order and returns every destination whose event types and
24//!   prefix/suffix filter accept the (event, key) tuple. AWS allows multiple
25//!   rules to fire on a single event so the result is a `Vec`, not an
26//!   `Option`.
27//! - `build_event_json` serialises the AWS-canonical
28//!   [event payload schema](https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-content-structure.html)
29//!   so existing AWS SDK consumers can deserialise the body without bespoke
30//!   parsing.
31//! - `dispatch_event` is the fire-and-forget runtime: spawned by the
32//!   `service.rs` PUT / DELETE handlers, it POSTs the event JSON to every
33//!   matched destination on a tokio task, retries 5xx with exponential
34//!   backoff (3 attempts), then drops + bumps the `dropped_total` counter +
35//!   logs at warn so operators see the silent loss in metrics.
36//!
37//! ## scope limitations
38//!
39//! - in-memory only (no replication across multi-instance deployments).
40//!   `--notifications-state-file <PATH>` provides restart recovery via JSON
41//!   snapshot, same shape as `--versioning-state-file`.
//! - retry budget is fixed at 3 attempts with exponential backoff between
//!   them (50ms, then 100ms). After the third failure the event is dropped
//!   and `dropped_total` is bumped — there's no on-disk dead-letter queue.
45//! - SNS/SQS use the AWS SDK's default credential chain (environment, EC2
46//!   role, etc); per-destination credential overrides are out of scope.
47//! - Lambda direct invocation is not implemented (use SNS subscription).
48//! - `EventBridge` integration is not implemented.
49
50use std::collections::HashMap;
51use std::sync::Arc;
52use std::sync::RwLock;
53use std::sync::atomic::{AtomicU64, Ordering};
54
55use serde::{Deserialize, Serialize};
56
57/// Subset of the AWS S3 event-type taxonomy. We intentionally stop short of
58/// the full ~30-event matrix because v0.6 #35 only fires PUT and DELETE
59/// hooks; more events can be added when the corresponding handlers grow
60/// notification fire-points.
61#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)]
62pub enum EventType {
63    /// `s3:ObjectCreated:Put`
64    ObjectCreatedPut,
65    /// `s3:ObjectRemoved:Delete` — hard delete on a non-versioned bucket, or
66    /// a specific-version DELETE on any bucket.
67    ObjectRemovedDelete,
68    /// `s3:ObjectRemoved:DeleteMarkerCreated` — versioned-bucket DELETE that
69    /// pushes a delete marker without removing prior version bytes.
70    ObjectRemovedDeleteMarker,
71}
72
73impl EventType {
74    /// AWS wire-string form. Matches the values an SDK consumer would see in
75    /// the `eventName` field of the event JSON.
76    #[must_use]
77    pub fn as_aws_str(&self) -> &'static str {
78        match self {
79            Self::ObjectCreatedPut => "s3:ObjectCreated:Put",
80            Self::ObjectRemovedDelete => "s3:ObjectRemoved:Delete",
81            Self::ObjectRemovedDeleteMarker => "s3:ObjectRemoved:DeleteMarkerCreated",
82        }
83    }
84
85    /// Parse the AWS wire form. Tolerates the AWS catch-all
86    /// `s3:ObjectCreated:*` and `s3:ObjectRemoved:*` patterns by mapping them
87    /// to the most common concrete variant in each family — matching what
88    /// AWS does when expanding the wildcard against an actual event.
89    #[must_use]
90    pub fn from_aws_str(s: &str) -> Option<Self> {
91        match s {
92            "s3:ObjectCreated:Put" | "s3:ObjectCreated:*" => Some(Self::ObjectCreatedPut),
93            "s3:ObjectRemoved:Delete" => Some(Self::ObjectRemovedDelete),
94            "s3:ObjectRemoved:DeleteMarkerCreated" => Some(Self::ObjectRemovedDeleteMarker),
95            "s3:ObjectRemoved:*" => Some(Self::ObjectRemovedDelete),
96            _ => None,
97        }
98    }
99}
100
101/// One destination for a fired event. The variant determines the dispatch
102/// path: `Webhook` is always built; `Sqs` / `Sns` are accepted at config
103/// time regardless of the build feature, but the runtime dispatcher will
104/// log + drop them when the `aws-events` feature isn't compiled in.
105#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
106pub enum Destination {
107    /// HTTP POST to the URL with the event JSON as body. Always available
108    /// (no extra cargo features required).
109    Webhook { url: String },
110    /// AWS SQS queue ARN. Active only with the `aws-events` cargo feature.
111    Sqs { queue_arn: String },
112    /// AWS SNS topic ARN. Active only with the `aws-events` cargo feature.
113    Sns { topic_arn: String },
114}
115
116impl Destination {
117    /// Short tag used as a metric label so dashboards can split drops by
118    /// destination type without leaking ARNs / URLs into Prometheus.
119    #[must_use]
120    pub fn type_tag(&self) -> &'static str {
121        match self {
122            Self::Webhook { .. } => "webhook",
123            Self::Sqs { .. } => "sqs",
124            Self::Sns { .. } => "sns",
125        }
126    }
127}
128
129/// One notification rule. Multiple rules can be registered per bucket; each
130/// rule independently chooses whether to fire on a given event by checking
131/// the event type against `events` and the object key against the
132/// `filter_prefix` / `filter_suffix` pair (both optional; both apply
133/// simultaneously when set).
134#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
135pub struct NotificationRule {
136    /// Operator-supplied id (the AWS S3 PUT API requires it; if the client
137    /// omits one, callers can synthesise `format!("rule-{i}")`).
138    pub id: String,
139    /// Event types this rule listens for. Empty means "never fire" — the
140    /// rule won't match anything.
141    pub events: Vec<EventType>,
142    /// Where to send the event when the rule matches.
143    pub destination: Destination,
144    /// AWS S3 `Filter.Key.Rules[Name=prefix].Value`. When `None`, no prefix
145    /// filter applies. Empty string is treated as "match anything", same as
146    /// `None`.
147    pub filter_prefix: Option<String>,
148    /// AWS S3 `Filter.Key.Rules[Name=suffix].Value`. Same semantics as
149    /// `filter_prefix`.
150    pub filter_suffix: Option<String>,
151}
152
153/// Per-bucket notification configuration (ordered list of rules).
154#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
155pub struct NotificationConfig {
156    pub rules: Vec<NotificationRule>,
157}
158
159/// JSON snapshot — `bucket -> NotificationConfig`. Mirrors the shape of
160/// `cors.rs` / `inventory.rs`'s `to_json` / `from_json` so the operator can
161/// hand-edit configurations across restart cycles.
162#[derive(Debug, Default, Serialize, Deserialize)]
163struct NotificationSnapshot {
164    by_bucket: HashMap<String, NotificationConfig>,
165}
166
167/// In-memory manager of per-bucket notification configurations.
168///
169/// The `dropped_total` counter is exposed publicly for the metrics layer to
170/// poll without taking the configuration lock.
171pub struct NotificationManager {
172    by_bucket: RwLock<HashMap<String, NotificationConfig>>,
173    /// Bumped by `dispatch_event` whenever a destination returns 5xx after
174    /// the configured retry budget, or when an `aws-events`-gated
175    /// destination fires without the feature compiled in.
176    pub dropped_total: AtomicU64,
177}
178
179impl Default for NotificationManager {
180    fn default() -> Self {
181        Self::new()
182    }
183}
184
185impl std::fmt::Debug for NotificationManager {
186    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
187        f.debug_struct("NotificationManager")
188            .field("dropped_total", &self.dropped_total.load(Ordering::Relaxed))
189            .finish_non_exhaustive()
190    }
191}
192
193impl NotificationManager {
194    /// Empty manager — no bucket has any configuration registered.
195    #[must_use]
196    pub fn new() -> Self {
197        Self {
198            by_bucket: RwLock::new(HashMap::new()),
199            dropped_total: AtomicU64::new(0),
200        }
201    }
202
203    /// `put_bucket_notification_configuration` handler entry. The bucket's
204    /// existing configuration is fully replaced (S3 spec — PutBucket... is
205    /// upsert-style at the bucket scope, not per-rule patch).
206    pub fn put(&self, bucket: &str, config: NotificationConfig) {
207        self.by_bucket
208            .write()
209            .expect("notification state RwLock poisoned")
210            .insert(bucket.to_owned(), config);
211    }
212
213    /// `get_bucket_notification_configuration` handler entry. Returns the
214    /// cloned configuration, or `None` when nothing is registered. AWS S3
215    /// returns an empty configuration document (not 404) in that case; the
216    /// service-layer handler maps `None` → empty DTO accordingly.
217    #[must_use]
218    pub fn get(&self, bucket: &str) -> Option<NotificationConfig> {
219        self.by_bucket
220            .read()
221            .expect("notification state RwLock poisoned")
222            .get(bucket)
223            .cloned()
224    }
225
226    /// Drop all rules for `bucket`. Idempotent.
227    pub fn delete(&self, bucket: &str) {
228        self.by_bucket
229            .write()
230            .expect("notification state RwLock poisoned")
231            .remove(bucket);
232    }
233
234    /// Serialise the entire manager state to JSON (for
235    /// `--notifications-state-file` snapshot dumps).
236    pub fn to_json(&self) -> Result<String, serde_json::Error> {
237        let snap = NotificationSnapshot {
238            by_bucket: self
239                .by_bucket
240                .read()
241                .expect("notification state RwLock poisoned")
242                .clone(),
243        };
244        serde_json::to_string(&snap)
245    }
246
247    /// Restore a manager from a previously-emitted snapshot. The
248    /// `dropped_total` counter is reset to 0 — historical drops are not
249    /// persisted (they're a runtime metric, not configuration).
250    pub fn from_json(s: &str) -> Result<Self, serde_json::Error> {
251        let snap: NotificationSnapshot = serde_json::from_str(s)?;
252        Ok(Self {
253            by_bucket: RwLock::new(snap.by_bucket),
254            dropped_total: AtomicU64::new(0),
255        })
256    }
257
258    /// Match an event against the bucket's rules and return every
259    /// destination whose rule accepts the (event type, key) tuple. Order
260    /// follows the rule declaration order so a deterministic dispatch
261    /// sequence falls out for tests.
262    #[must_use]
263    pub fn match_destinations(
264        &self,
265        bucket: &str,
266        event: &EventType,
267        key: &str,
268    ) -> Vec<Destination> {
269        let map = self
270            .by_bucket
271            .read()
272            .expect("notification state RwLock poisoned");
273        let cfg = match map.get(bucket) {
274            Some(c) => c,
275            None => return Vec::new(),
276        };
277        cfg.rules
278            .iter()
279            .filter(|r| rule_matches(r, event, key))
280            .map(|r| r.destination.clone())
281            .collect()
282    }
283}
284
285fn rule_matches(rule: &NotificationRule, event: &EventType, key: &str) -> bool {
286    if !rule.events.iter().any(|e| e == event) {
287        return false;
288    }
289    if let Some(p) = rule.filter_prefix.as_deref()
290        && !p.is_empty()
291        && !key.starts_with(p)
292    {
293        return false;
294    }
295    if let Some(s) = rule.filter_suffix.as_deref()
296        && !s.is_empty()
297        && !key.ends_with(s)
298    {
299        return false;
300    }
301    true
302}
303
304/// Build the AWS S3 Event payload JSON for a single record. Schema
305/// matches:
306/// <https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-content-structure.html>
307///
308/// The returned string is one full event envelope (`{"Records":[...]}`),
309/// suitable as the body of an HTTP POST or the message body of an SQS /
310/// SNS publish.
311#[must_use]
312#[allow(clippy::too_many_arguments)]
313pub fn build_event_json(
314    bucket: &str,
315    key: &str,
316    event: &EventType,
317    size: Option<u64>,
318    etag: Option<&str>,
319    version_id: Option<&str>,
320    request_id: &str,
321    now: chrono::DateTime<chrono::Utc>,
322) -> String {
323    // Trim any surrounding `"` from the etag — AWS S3 stores ETags as
324    // quoted strings on the wire but the event payload uses the bare hex.
325    let etag_clean = etag.map(|e| e.trim_matches('"').to_owned());
326    let mut object = serde_json::json!({
327        "key": key,
328        "sequencer": format!("{:016x}", now.timestamp_micros() as u64),
329    });
330    if let Some(sz) = size {
331        object["size"] = serde_json::json!(sz);
332    }
333    if let Some(ref e) = etag_clean {
334        object["eTag"] = serde_json::json!(e);
335    }
336    if let Some(v) = version_id {
337        object["versionId"] = serde_json::json!(v);
338    }
339    let event_name = event.as_aws_str();
340    let event_source = match event {
341        EventType::ObjectCreatedPut => "ObjectCreated",
342        EventType::ObjectRemovedDelete | EventType::ObjectRemovedDeleteMarker => "ObjectRemoved",
343    };
344    let record = serde_json::json!({
345        "eventVersion": "2.1",
346        "eventSource": "aws:s3",
347        "awsRegion": "us-east-1",
348        "eventTime": now.to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
349        "eventName": event_name,
350        "userIdentity": { "principalId": "S4" },
351        "requestParameters": { "sourceIPAddress": "0.0.0.0" },
352        "responseElements": {
353            "x-amz-request-id": request_id,
354            "x-amz-id-2": request_id,
355        },
356        "s3": {
357            "s3SchemaVersion": "1.0",
358            "configurationId": "S4-default",
359            "bucket": {
360                "name": bucket,
361                "ownerIdentity": { "principalId": "S4" },
362                "arn": format!("arn:aws:s3:::{bucket}"),
363            },
364            "object": object,
365        },
366    });
367    let _ = event_source; // surfaced via eventName; left here for future
368    serde_json::json!({ "Records": [record] }).to_string()
369}
370
/// Total number of delivery attempts made per webhook event, the first try
/// included.
const RETRY_ATTEMPTS: u32 = 3;
/// Backoff before the first retry, in milliseconds; the delay doubles on
/// each subsequent retry.
const RETRY_BASE_MS: u64 = 50;
373
374/// Fire-and-forget event dispatch. Iterates every matched destination for
375/// the (bucket, key, event) triple and sends the event JSON in detached
376/// tokio tasks; this future itself awaits the spawn-and-send pipeline so
377/// callers can `tokio::spawn` it once and forget about the outcome.
378///
379/// Webhook destinations get retried 3 times with exponential backoff on 5xx
380/// responses; permanent 4xx responses are treated as "delivered" (the
381/// receiver explicitly rejected the payload — retrying won't help) and the
382/// drop counter is NOT bumped. SNS / SQS are best-effort; without the
383/// `aws-events` cargo feature the drop counter is bumped immediately.
384#[allow(clippy::too_many_arguments)]
385pub async fn dispatch_event(
386    manager: Arc<NotificationManager>,
387    bucket: String,
388    key: String,
389    event: EventType,
390    size: Option<u64>,
391    etag: Option<String>,
392    version_id: Option<String>,
393    request_id: String,
394) {
395    let dests = manager.match_destinations(&bucket, &event, &key);
396    if dests.is_empty() {
397        return;
398    }
399    let now = chrono::Utc::now();
400    let body = build_event_json(
401        &bucket,
402        &key,
403        &event,
404        size,
405        etag.as_deref(),
406        version_id.as_deref(),
407        &request_id,
408        now,
409    );
410    for dest in dests {
411        let mgr = Arc::clone(&manager);
412        let body = body.clone();
413        tokio::spawn(async move {
414            send_one(mgr, dest, body).await;
415        });
416    }
417}
418
419async fn send_one(manager: Arc<NotificationManager>, dest: Destination, body: String) {
420    match dest {
421        Destination::Webhook { ref url } => {
422            let client = match reqwest::Client::builder()
423                .timeout(std::time::Duration::from_secs(5))
424                .build()
425            {
426                Ok(c) => c,
427                Err(e) => {
428                    tracing::warn!(error = %e, "notifications: reqwest client build failed");
429                    bump_drop(&manager, dest.type_tag());
430                    return;
431                }
432            };
433            for attempt in 0..RETRY_ATTEMPTS {
434                let resp = client
435                    .post(url)
436                    .header("content-type", "application/json")
437                    .body(body.clone())
438                    .send()
439                    .await;
440                match resp {
441                    Ok(r) if r.status().is_success() => return,
442                    Ok(r) if r.status().is_server_error() => {
443                        // 5xx — retry with backoff.
444                        if attempt + 1 < RETRY_ATTEMPTS {
445                            let delay_ms = RETRY_BASE_MS * (1u64 << attempt);
446                            tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
447                            continue;
448                        }
449                        tracing::warn!(
450                            url = %url,
451                            status = %r.status(),
452                            "notifications: webhook giving up after {RETRY_ATTEMPTS} attempts"
453                        );
454                        bump_drop(&manager, "webhook");
455                        return;
456                    }
457                    Ok(r) => {
458                        // 4xx / redirect — receiver rejected, no point retrying.
459                        tracing::warn!(
460                            url = %url,
461                            status = %r.status(),
462                            "notifications: webhook permanent failure, dropping"
463                        );
464                        return;
465                    }
466                    Err(e) => {
467                        // Network-level error — also retry-eligible.
468                        if attempt + 1 < RETRY_ATTEMPTS {
469                            let delay_ms = RETRY_BASE_MS * (1u64 << attempt);
470                            tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
471                            continue;
472                        }
473                        tracing::warn!(
474                            url = %url,
475                            error = %e,
476                            "notifications: webhook network failure, dropping after {RETRY_ATTEMPTS} attempts"
477                        );
478                        bump_drop(&manager, "webhook");
479                        return;
480                    }
481                }
482            }
483        }
484        Destination::Sqs { ref queue_arn } => {
485            #[cfg(feature = "aws-events")]
486            {
487                send_sqs(&manager, queue_arn, &body).await;
488            }
489            #[cfg(not(feature = "aws-events"))]
490            {
491                let _ = queue_arn;
492                let _ = body;
493                tracing::warn!(
494                    "notifications: SQS destination configured but `aws-events` feature is off — dropping"
495                );
496                bump_drop(&manager, "sqs");
497            }
498        }
499        Destination::Sns { ref topic_arn } => {
500            #[cfg(feature = "aws-events")]
501            {
502                send_sns(&manager, topic_arn, &body).await;
503            }
504            #[cfg(not(feature = "aws-events"))]
505            {
506                let _ = topic_arn;
507                let _ = body;
508                tracing::warn!(
509                    "notifications: SNS destination configured but `aws-events` feature is off — dropping"
510                );
511                bump_drop(&manager, "sns");
512            }
513        }
514    }
515}
516
517fn bump_drop(manager: &NotificationManager, dest_tag: &'static str) {
518    manager.dropped_total.fetch_add(1, Ordering::Relaxed);
519    crate::metrics::record_notification_drop(dest_tag);
520}
521
/// Derives the SQS queue URL from a queue ARN.
///
/// An ARN of the form `arn:<partition>:sqs:<region>:<account>:<queue-name>`
/// maps to `https://sqs.<region>.<dns-suffix>/<account>/<queue-name>`.
/// Returns `None` when the input is not a well-formed SQS ARN or belongs to
/// a partition whose DNS suffix is not known here, so the caller can fall
/// back to using the configured value unchanged.
// Compiled unconditionally (it is pure string handling) so it can be unit
// tested without the AWS SDK; it is only called when `aws-events` is on.
#[cfg_attr(not(feature = "aws-events"), allow(dead_code))]
fn sqs_queue_url_from_arn(arn: &str) -> Option<String> {
    let mut fields = arn.splitn(6, ':');
    let (Some("arn"), Some(partition), Some("sqs"), Some(region), Some(account), Some(queue)) = (
        fields.next(),
        fields.next(),
        fields.next(),
        fields.next(),
        fields.next(),
        fields.next(),
    ) else {
        return None;
    };
    // `splitn` leaves any surplus `:` in the last field; queue names never
    // contain one, so treat that as malformed.
    if region.is_empty() || account.is_empty() || queue.is_empty() || queue.contains(':') {
        return None;
    }
    let dns_suffix = match partition {
        "aws" | "aws-us-gov" => "amazonaws.com",
        "aws-cn" => "amazonaws.com.cn",
        _ => return None,
    };
    Some(format!("https://sqs.{region}.{dns_suffix}/{account}/{queue}"))
}

/// Sends `body` as one SQS message to the queue configured by `queue_arn`.
///
/// The SQS `SendMessage` call takes a queue URL, so an ARN is first
/// converted with `sqs_queue_url_from_arn`. A value that is not an SQS ARN
/// (for example a queue URL an operator placed in the ARN field) is passed
/// through unchanged. On failure a warning is logged and the drop counter is
/// bumped; there is no retry.
///
/// NOTE(review): the client (region and credentials) is built from the
/// environment on every call and the region inside the ARN is not applied to
/// it — confirm the two agree in deployment.
#[cfg(feature = "aws-events")]
async fn send_sqs(manager: &NotificationManager, queue_arn: &str, body: &str) {
    let conf = aws_config::load_from_env().await;
    let client = aws_sdk_sqs::Client::new(&conf);
    let queue_url =
        sqs_queue_url_from_arn(queue_arn).unwrap_or_else(|| queue_arn.to_owned());
    let res = client
        .send_message()
        .queue_url(queue_url)
        .message_body(body)
        .send()
        .await;
    if let Err(e) = res {
        tracing::warn!(arn = %queue_arn, error = ?e, "notifications: SQS send failed");
        bump_drop(manager, "sqs");
    }
}
542
/// Publishes `body` as one message to the SNS topic `topic_arn`.
///
/// The client is built from the environment on every call. A failed publish
/// is logged at warn level and counted as a drop; there is no retry.
#[cfg(feature = "aws-events")]
async fn send_sns(manager: &NotificationManager, topic_arn: &str, body: &str) {
    let sdk_config = aws_config::load_from_env().await;
    let sns = aws_sdk_sns::Client::new(&sdk_config);
    let published = sns
        .publish()
        .topic_arn(topic_arn)
        .message(body)
        .send()
        .await;
    match published {
        Ok(_) => {}
        Err(e) => {
            tracing::warn!(arn = %topic_arn, error = ?e, "notifications: SNS publish failed");
            bump_drop(manager, "sns");
        }
    }
}
558
559#[cfg(test)]
560mod tests {
561    use super::*;
562
563    fn rule(
564        id: &str,
565        events: &[EventType],
566        dest: Destination,
567        prefix: Option<&str>,
568        suffix: Option<&str>,
569    ) -> NotificationRule {
570        NotificationRule {
571            id: id.to_owned(),
572            events: events.to_vec(),
573            destination: dest,
574            filter_prefix: prefix.map(str::to_owned),
575            filter_suffix: suffix.map(str::to_owned),
576        }
577    }
578
579    #[test]
580    fn match_destinations_single_rule_event_match() {
581        let mgr = NotificationManager::new();
582        mgr.put(
583            "b",
584            NotificationConfig {
585                rules: vec![rule(
586                    "r1",
587                    &[EventType::ObjectCreatedPut],
588                    Destination::Webhook {
589                        url: "http://hook".into(),
590                    },
591                    None,
592                    None,
593                )],
594            },
595        );
596        let dests = mgr.match_destinations("b", &EventType::ObjectCreatedPut, "any/key.txt");
597        assert_eq!(dests.len(), 1, "single rule must fire on event match");
598    }
599
600    #[test]
601    fn match_destinations_prefix_filter() {
602        let mgr = NotificationManager::new();
603        mgr.put(
604            "b",
605            NotificationConfig {
606                rules: vec![rule(
607                    "r1",
608                    &[EventType::ObjectCreatedPut],
609                    Destination::Webhook {
610                        url: "http://hook".into(),
611                    },
612                    Some("uploads/"),
613                    None,
614                )],
615            },
616        );
617        assert_eq!(
618            mgr.match_destinations("b", &EventType::ObjectCreatedPut, "uploads/file.bin")
619                .len(),
620            1
621        );
622        assert!(
623            mgr.match_destinations("b", &EventType::ObjectCreatedPut, "logs/file.bin")
624                .is_empty(),
625            "prefix filter must reject non-matching key"
626        );
627    }
628
629    #[test]
630    fn match_destinations_suffix_filter() {
631        let mgr = NotificationManager::new();
632        mgr.put(
633            "b",
634            NotificationConfig {
635                rules: vec![rule(
636                    "r1",
637                    &[EventType::ObjectCreatedPut],
638                    Destination::Webhook {
639                        url: "http://hook".into(),
640                    },
641                    None,
642                    Some(".jpg"),
643                )],
644            },
645        );
646        assert_eq!(
647            mgr.match_destinations("b", &EventType::ObjectCreatedPut, "photo.jpg")
648                .len(),
649            1
650        );
651        assert!(
652            mgr.match_destinations("b", &EventType::ObjectCreatedPut, "doc.pdf")
653                .is_empty(),
654            "suffix filter must reject non-matching key"
655        );
656    }
657
658    #[test]
659    fn match_destinations_no_rule_for_bucket() {
660        let mgr = NotificationManager::new();
661        let dests = mgr.match_destinations("ghost", &EventType::ObjectCreatedPut, "k");
662        assert!(dests.is_empty(), "unknown bucket must yield empty vec");
663    }
664
665    #[test]
666    fn match_destinations_event_type_mismatch() {
667        let mgr = NotificationManager::new();
668        mgr.put(
669            "b",
670            NotificationConfig {
671                rules: vec![rule(
672                    "r1",
673                    &[EventType::ObjectCreatedPut],
674                    Destination::Webhook {
675                        url: "http://hook".into(),
676                    },
677                    None,
678                    None,
679                )],
680            },
681        );
682        assert!(
683            mgr.match_destinations("b", &EventType::ObjectRemovedDelete, "k")
684                .is_empty(),
685            "mismatched event type must not fire"
686        );
687    }
688
689    #[test]
690    fn match_destinations_multiple_rules_fire_in_order() {
691        let mgr = NotificationManager::new();
692        mgr.put(
693            "b",
694            NotificationConfig {
695                rules: vec![
696                    rule(
697                        "first",
698                        &[EventType::ObjectCreatedPut],
699                        Destination::Webhook {
700                            url: "http://first".into(),
701                        },
702                        None,
703                        None,
704                    ),
705                    rule(
706                        "second",
707                        &[EventType::ObjectCreatedPut],
708                        Destination::Webhook {
709                            url: "http://second".into(),
710                        },
711                        None,
712                        None,
713                    ),
714                ],
715            },
716        );
717        let dests = mgr.match_destinations("b", &EventType::ObjectCreatedPut, "k");
718        assert_eq!(dests.len(), 2, "both matching rules fire");
719        match (&dests[0], &dests[1]) {
720            (Destination::Webhook { url: u1 }, Destination::Webhook { url: u2 }) => {
721                assert_eq!(u1, "http://first");
722                assert_eq!(u2, "http://second");
723            }
724            _ => panic!("expected two webhooks in declaration order"),
725        }
726    }
727
728    #[test]
729    fn build_event_json_schema_matches_aws() {
730        let now =
731            chrono::DateTime::parse_from_rfc3339("2026-05-13T10:00:00Z")
732                .unwrap()
733                .with_timezone(&chrono::Utc);
734        let body = build_event_json(
735            "my-bucket",
736            "uploads/photo.jpg",
737            &EventType::ObjectCreatedPut,
738            Some(12345),
739            Some("\"deadbeef\""),
740            Some("v-001"),
741            "REQ-1",
742            now,
743        );
744        let v: serde_json::Value = serde_json::from_str(&body).expect("valid json");
745        let rec = &v["Records"][0];
746        assert_eq!(rec["eventName"], "s3:ObjectCreated:Put");
747        assert_eq!(rec["eventTime"], "2026-05-13T10:00:00.000Z");
748        assert_eq!(rec["s3"]["bucket"]["name"], "my-bucket");
749        assert_eq!(rec["s3"]["object"]["key"], "uploads/photo.jpg");
750        assert_eq!(rec["s3"]["object"]["size"], 12345);
751        assert_eq!(rec["s3"]["object"]["eTag"], "deadbeef");
752        assert_eq!(rec["s3"]["object"]["versionId"], "v-001");
753    }
754
755    #[test]
756    fn build_event_json_omits_optional_fields() {
757        let now = chrono::Utc::now();
758        let body = build_event_json(
759            "b",
760            "k",
761            &EventType::ObjectRemovedDeleteMarker,
762            None,
763            None,
764            None,
765            "r",
766            now,
767        );
768        let v: serde_json::Value = serde_json::from_str(&body).expect("valid json");
769        let obj = &v["Records"][0]["s3"]["object"];
770        assert!(obj.get("size").is_none());
771        assert!(obj.get("eTag").is_none());
772        assert!(obj.get("versionId").is_none());
773    }
774
775    #[test]
776    fn json_round_trip() {
777        let mgr = NotificationManager::new();
778        mgr.put(
779            "b",
780            NotificationConfig {
781                rules: vec![rule(
782                    "r1",
783                    &[
784                        EventType::ObjectCreatedPut,
785                        EventType::ObjectRemovedDelete,
786                    ],
787                    Destination::Sqs {
788                        queue_arn: "arn:aws:sqs:us-east-1:123:q".into(),
789                    },
790                    Some("u/"),
791                    Some(".jpg"),
792                )],
793            },
794        );
795        let json = mgr.to_json().expect("to_json");
796        let mgr2 = NotificationManager::from_json(&json).expect("from_json");
797        assert_eq!(mgr.get("b"), mgr2.get("b"));
798    }
799
800    #[test]
801    fn delete_is_idempotent() {
802        let mgr = NotificationManager::new();
803        mgr.delete("never-existed");
804        mgr.put(
805            "b",
806            NotificationConfig {
807                rules: vec![rule(
808                    "r1",
809                    &[EventType::ObjectCreatedPut],
810                    Destination::Webhook {
811                        url: "http://h".into(),
812                    },
813                    None,
814                    None,
815                )],
816            },
817        );
818        mgr.delete("b");
819        assert!(mgr.get("b").is_none());
820    }
821
822    #[test]
823    fn put_replaces_previous_config() {
824        let mgr = NotificationManager::new();
825        mgr.put(
826            "b",
827            NotificationConfig {
828                rules: vec![rule(
829                    "old",
830                    &[EventType::ObjectCreatedPut],
831                    Destination::Webhook {
832                        url: "http://old".into(),
833                    },
834                    None,
835                    None,
836                )],
837            },
838        );
839        mgr.put(
840            "b",
841            NotificationConfig {
842                rules: vec![rule(
843                    "new",
844                    &[EventType::ObjectRemovedDelete],
845                    Destination::Webhook {
846                        url: "http://new".into(),
847                    },
848                    None,
849                    None,
850                )],
851            },
852        );
853        let cfg = mgr.get("b").expect("config");
854        assert_eq!(cfg.rules.len(), 1);
855        assert_eq!(cfg.rules[0].id, "new");
856    }
857
858    #[tokio::test]
859    async fn dispatch_event_via_webhook_delivers_payload() {
860        // Spin up a tiny tokio HTTP receiver on a random port; verify the
861        // dispatcher POSTs the event JSON we expect.
862        use std::sync::Mutex;
863        use tokio::io::{AsyncReadExt, AsyncWriteExt};
864        use tokio::net::TcpListener;
865
866        let listener = TcpListener::bind("127.0.0.1:0").await.expect("bind");
867        let addr = listener.local_addr().expect("addr");
868        let received: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
869        let received_cl = Arc::clone(&received);
870        tokio::spawn(async move {
871            if let Ok((mut sock, _)) = listener.accept().await {
872                let mut buf = vec![0u8; 16384];
873                let n = sock.read(&mut buf).await.unwrap_or(0);
874                let raw = String::from_utf8_lossy(&buf[..n]).to_string();
875                received_cl.lock().unwrap().push(raw);
876                let _ = sock
877                    .write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n")
878                    .await;
879            }
880        });
881
882        let mgr = Arc::new(NotificationManager::new());
883        mgr.put(
884            "b",
885            NotificationConfig {
886                rules: vec![rule(
887                    "r1",
888                    &[EventType::ObjectCreatedPut],
889                    Destination::Webhook {
890                        url: format!("http://{addr}/hook"),
891                    },
892                    None,
893                    None,
894                )],
895            },
896        );
897
898        dispatch_event(
899            Arc::clone(&mgr),
900            "b".into(),
901            "k.txt".into(),
902            EventType::ObjectCreatedPut,
903            Some(7),
904            Some("\"abc\"".into()),
905            None,
906            "req-1".into(),
907        )
908        .await;
909
910        // The dispatcher detaches via tokio::spawn; poll briefly.
911        for _ in 0..50 {
912            if !received.lock().unwrap().is_empty() {
913                break;
914            }
915            tokio::time::sleep(std::time::Duration::from_millis(20)).await;
916        }
917        let raw = received.lock().unwrap().clone();
918        assert!(!raw.is_empty(), "webhook receiver got nothing");
919        let raw = &raw[0];
920        assert!(raw.contains("POST /hook"), "missing POST line");
921        assert!(raw.contains("s3:ObjectCreated:Put"), "missing event name");
922        assert!(raw.contains("\"k.txt\""), "missing key");
923        assert_eq!(mgr.dropped_total.load(Ordering::Relaxed), 0);
924    }
925
926    #[tokio::test]
927    async fn dispatch_event_503_drops_after_retry_budget() {
928        // Receiver that always returns 503 — the dispatcher must retry up
929        // to the configured budget then bump dropped_total exactly once.
930        use std::sync::Mutex;
931        use tokio::io::{AsyncReadExt, AsyncWriteExt};
932        use tokio::net::TcpListener;
933
934        let listener = TcpListener::bind("127.0.0.1:0").await.expect("bind");
935        let addr = listener.local_addr().expect("addr");
936        let attempt_count: Arc<Mutex<u32>> = Arc::new(Mutex::new(0));
937        let attempt_count_cl = Arc::clone(&attempt_count);
938        tokio::spawn(async move {
939            for _ in 0..RETRY_ATTEMPTS {
940                if let Ok((mut sock, _)) = listener.accept().await {
941                    let mut buf = vec![0u8; 16384];
942                    let _ = sock.read(&mut buf).await;
943                    *attempt_count_cl.lock().unwrap() += 1;
944                    let _ = sock
945                        .write_all(b"HTTP/1.1 503 Service Unavailable\r\nContent-Length: 0\r\n\r\n")
946                        .await;
947                }
948            }
949        });
950
951        let mgr = Arc::new(NotificationManager::new());
952        mgr.put(
953            "b",
954            NotificationConfig {
955                rules: vec![rule(
956                    "r1",
957                    &[EventType::ObjectCreatedPut],
958                    Destination::Webhook {
959                        url: format!("http://{addr}/sink"),
960                    },
961                    None,
962                    None,
963                )],
964            },
965        );
966
967        dispatch_event(
968            Arc::clone(&mgr),
969            "b".into(),
970            "k".into(),
971            EventType::ObjectCreatedPut,
972            None,
973            None,
974            None,
975            "r".into(),
976        )
977        .await;
978
979        // Wait for the detached task — RETRY_ATTEMPTS attempts plus
980        // backoff (50ms + 100ms). Cap at 2s so a flaky run doesn't hang.
981        for _ in 0..100 {
982            if mgr.dropped_total.load(Ordering::Relaxed) > 0 {
983                break;
984            }
985            tokio::time::sleep(std::time::Duration::from_millis(20)).await;
986        }
987        assert_eq!(
988            mgr.dropped_total.load(Ordering::Relaxed),
989            1,
990            "drop counter must bump exactly once after retry budget exhausted"
991        );
992    }
993}