Skip to main content

s4_server/
notifications.rs

1//! S3 bucket notifications — fire events on PUT / DELETE (v0.6 #35).
2//!
3//! AWS S3 lets a bucket owner register notification destinations (SNS, SQS,
4//! Lambda, EventBridge) that receive a JSON event payload whenever objects
5//! change. S4-server implements a subset:
6//!
7//! - **Webhook** (HTTP POST of the event JSON) — always available, no extra
8//!   crate dependencies required.
9//! - **SQS** (`Sqs { queue_arn }`) — gated behind the `aws-events` cargo
10//!   feature so the default build doesn't pull `aws-sdk-sqs`.
11//! - **SNS** (`Sns { topic_arn }`) — gated behind the same `aws-events`
12//!   feature.
13//! - **Lambda direct invoke is NOT implemented** in v0.6 #35; the recommended
14//!   path is SNS → Lambda subscription, which works through this module's SNS
15//!   destination once the feature is on.
16//!
17//! ## responsibilities (v0.6 #35)
18//!
19//! - in-memory `bucket -> NotificationConfig` map with JSON snapshot
20//!   round-trip, mirroring `versioning.rs` / `cors.rs` / `inventory.rs` so
21//!   `--notifications-state-file` is a one-line addition in `main.rs`.
22//! - `match_destinations(bucket, event, key)` walks the rule list in
23//!   declaration order and returns every destination whose event types and
24//!   prefix/suffix filter accept the (event, key) tuple. AWS allows multiple
25//!   rules to fire on a single event so the result is a `Vec`, not an
26//!   `Option`.
27//! - `build_event_json` serialises the AWS-canonical
28//!   [event payload schema](https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-content-structure.html)
29//!   so existing AWS SDK consumers can deserialise the body without bespoke
30//!   parsing.
31//! - `dispatch_event` is the fire-and-forget runtime: spawned by the
32//!   `service.rs` PUT / DELETE handlers, it POSTs the event JSON to every
33//!   matched destination on a tokio task, retries 5xx with exponential
34//!   backoff (3 attempts), then drops + bumps the `dropped_total` counter +
35//!   logs at warn so operators see the silent loss in metrics.
36//!
37//! ## scope limitations
38//!
39//! - in-memory only (no replication across multi-instance deployments).
40//!   `--notifications-state-file <PATH>` provides restart recovery via JSON
41//!   snapshot, same shape as `--versioning-state-file`.
42//! - retry budget is fixed at 3 attempts with exponential backoff (50ms /
43//!   100ms / 200ms). Beyond that the event is dropped and `dropped_total`
44//!   is bumped — there's no on-disk dead-letter queue.
45//! - SNS/SQS use the AWS SDK's default credential chain (environment, EC2
46//!   role, etc); per-destination credential overrides are out of scope.
47//! - Lambda direct invocation is not implemented (use SNS subscription).
48//! - `EventBridge` integration is not implemented.
49
50use std::collections::HashMap;
51use std::sync::Arc;
52use std::sync::RwLock;
53use std::sync::atomic::{AtomicU64, Ordering};
54
55use serde::{Deserialize, Serialize};
56
57/// Subset of the AWS S3 event-type taxonomy. We intentionally stop short of
58/// the full ~30-event matrix because v0.6 #35 only fires PUT and DELETE
59/// hooks; more events can be added when the corresponding handlers grow
60/// notification fire-points.
61#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)]
62pub enum EventType {
63    /// `s3:ObjectCreated:Put`
64    ObjectCreatedPut,
65    /// `s3:ObjectRemoved:Delete` — hard delete on a non-versioned bucket, or
66    /// a specific-version DELETE on any bucket.
67    ObjectRemovedDelete,
68    /// `s3:ObjectRemoved:DeleteMarkerCreated` — DELETE on a bucket with
69    /// versioning state Enabled OR Suspended.
70    ///
71    /// **Enabled**: pushes a delete marker only; prior version bytes
72    /// survive and are still reachable via `?versionId=`.
73    ///
74    /// **Suspended**: pushes a delete marker AND physically deletes
75    /// the prior null version (the `null` version is overwritten by
76    /// the marker — AWS S3 spec). Subscribers cannot tell from the
77    /// event type alone whether a prior version still exists; if the
78    /// distinction matters, query versioning state via
79    /// `GetBucketVersioning` or rely on the receiving system's chain
80    /// awareness.
81    ObjectRemovedDeleteMarker,
82}
83
84impl EventType {
85    /// AWS wire-string form. Matches the values an SDK consumer would see in
86    /// the `eventName` field of the event JSON.
87    #[must_use]
88    pub fn as_aws_str(&self) -> &'static str {
89        match self {
90            Self::ObjectCreatedPut => "s3:ObjectCreated:Put",
91            Self::ObjectRemovedDelete => "s3:ObjectRemoved:Delete",
92            Self::ObjectRemovedDeleteMarker => "s3:ObjectRemoved:DeleteMarkerCreated",
93        }
94    }
95
96    /// Parse the AWS wire form. Tolerates the AWS catch-all
97    /// `s3:ObjectCreated:*` and `s3:ObjectRemoved:*` patterns by mapping them
98    /// to the most common concrete variant in each family — matching what
99    /// AWS does when expanding the wildcard against an actual event.
100    #[must_use]
101    pub fn from_aws_str(s: &str) -> Option<Self> {
102        match s {
103            "s3:ObjectCreated:Put" | "s3:ObjectCreated:*" => Some(Self::ObjectCreatedPut),
104            "s3:ObjectRemoved:Delete" => Some(Self::ObjectRemovedDelete),
105            "s3:ObjectRemoved:DeleteMarkerCreated" => Some(Self::ObjectRemovedDeleteMarker),
106            "s3:ObjectRemoved:*" => Some(Self::ObjectRemovedDelete),
107            _ => None,
108        }
109    }
110}
111
112/// One destination for a fired event. The variant determines the dispatch
113/// path: `Webhook` is always built; `Sqs` / `Sns` are accepted at config
114/// time regardless of the build feature, but the runtime dispatcher will
115/// log + drop them when the `aws-events` feature isn't compiled in.
116#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
117pub enum Destination {
118    /// HTTP POST to the URL with the event JSON as body. Always available
119    /// (no extra cargo features required).
120    Webhook { url: String },
121    /// AWS SQS queue ARN. Active only with the `aws-events` cargo feature.
122    Sqs { queue_arn: String },
123    /// AWS SNS topic ARN. Active only with the `aws-events` cargo feature.
124    Sns { topic_arn: String },
125}
126
127impl Destination {
128    /// Short tag used as a metric label so dashboards can split drops by
129    /// destination type without leaking ARNs / URLs into Prometheus.
130    #[must_use]
131    pub fn type_tag(&self) -> &'static str {
132        match self {
133            Self::Webhook { .. } => "webhook",
134            Self::Sqs { .. } => "sqs",
135            Self::Sns { .. } => "sns",
136        }
137    }
138}
139
140/// One notification rule. Multiple rules can be registered per bucket; each
141/// rule independently chooses whether to fire on a given event by checking
142/// the event type against `events` and the object key against the
143/// `filter_prefix` / `filter_suffix` pair (both optional; both apply
144/// simultaneously when set).
145#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
146pub struct NotificationRule {
147    /// Operator-supplied id (the AWS S3 PUT API requires it; if the client
148    /// omits one, callers can synthesise `format!("rule-{i}")`).
149    pub id: String,
150    /// Event types this rule listens for. Empty means "never fire" — the
151    /// rule won't match anything.
152    pub events: Vec<EventType>,
153    /// Where to send the event when the rule matches.
154    pub destination: Destination,
155    /// AWS S3 `Filter.Key.Rules[Name=prefix].Value`. When `None`, no prefix
156    /// filter applies. Empty string is treated as "match anything", same as
157    /// `None`.
158    pub filter_prefix: Option<String>,
159    /// AWS S3 `Filter.Key.Rules[Name=suffix].Value`. Same semantics as
160    /// `filter_prefix`.
161    pub filter_suffix: Option<String>,
162}
163
164/// Per-bucket notification configuration (ordered list of rules).
165#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
166pub struct NotificationConfig {
167    pub rules: Vec<NotificationRule>,
168}
169
170/// JSON snapshot — `bucket -> NotificationConfig`. Mirrors the shape of
171/// `cors.rs` / `inventory.rs`'s `to_json` / `from_json` so the operator can
172/// hand-edit configurations across restart cycles.
173#[derive(Debug, Default, Serialize, Deserialize)]
174struct NotificationSnapshot {
175    by_bucket: HashMap<String, NotificationConfig>,
176}
177
178/// In-memory manager of per-bucket notification configurations.
179///
180/// The `dropped_total` counter is exposed publicly for the metrics layer to
181/// poll without taking the configuration lock.
182pub struct NotificationManager {
183    by_bucket: RwLock<HashMap<String, NotificationConfig>>,
184    /// Bumped by `dispatch_event` whenever a destination returns 5xx after
185    /// the configured retry budget, or when an `aws-events`-gated
186    /// destination fires without the feature compiled in.
187    pub dropped_total: AtomicU64,
188}
189
190impl Default for NotificationManager {
191    fn default() -> Self {
192        Self::new()
193    }
194}
195
196impl std::fmt::Debug for NotificationManager {
197    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
198        f.debug_struct("NotificationManager")
199            .field("dropped_total", &self.dropped_total.load(Ordering::Relaxed))
200            .finish_non_exhaustive()
201    }
202}
203
204impl NotificationManager {
205    /// Empty manager — no bucket has any configuration registered.
206    #[must_use]
207    pub fn new() -> Self {
208        Self {
209            by_bucket: RwLock::new(HashMap::new()),
210            dropped_total: AtomicU64::new(0),
211        }
212    }
213
214    /// `put_bucket_notification_configuration` handler entry. The bucket's
215    /// existing configuration is fully replaced (S3 spec — PutBucket... is
216    /// upsert-style at the bucket scope, not per-rule patch).
217    pub fn put(&self, bucket: &str, config: NotificationConfig) {
218        crate::lock_recovery::recover_write(&self.by_bucket, "notifications.by_bucket")
219            .insert(bucket.to_owned(), config);
220    }
221
222    /// `get_bucket_notification_configuration` handler entry. Returns the
223    /// cloned configuration, or `None` when nothing is registered. AWS S3
224    /// returns an empty configuration document (not 404) in that case; the
225    /// service-layer handler maps `None` → empty DTO accordingly.
226    #[must_use]
227    pub fn get(&self, bucket: &str) -> Option<NotificationConfig> {
228        crate::lock_recovery::recover_read(&self.by_bucket, "notifications.by_bucket")
229            .get(bucket)
230            .cloned()
231    }
232
233    /// Drop all rules for `bucket`. Idempotent.
234    pub fn delete(&self, bucket: &str) {
235        crate::lock_recovery::recover_write(&self.by_bucket, "notifications.by_bucket")
236            .remove(bucket);
237    }
238
239    /// Serialise the entire manager state to JSON (for
240    /// `--notifications-state-file` snapshot dumps).
241    pub fn to_json(&self) -> Result<String, serde_json::Error> {
242        let snap = NotificationSnapshot {
243            by_bucket: crate::lock_recovery::recover_read(
244                &self.by_bucket,
245                "notifications.by_bucket",
246            )
247            .clone(),
248        };
249        serde_json::to_string(&snap)
250    }
251
252    /// Restore a manager from a previously-emitted snapshot. The
253    /// `dropped_total` counter is reset to 0 — historical drops are not
254    /// persisted (they're a runtime metric, not configuration).
255    pub fn from_json(s: &str) -> Result<Self, serde_json::Error> {
256        let snap: NotificationSnapshot = serde_json::from_str(s)?;
257        Ok(Self {
258            by_bucket: RwLock::new(snap.by_bucket),
259            dropped_total: AtomicU64::new(0),
260        })
261    }
262
263    /// Match an event against the bucket's rules and return every
264    /// destination whose rule accepts the (event type, key) tuple. Order
265    /// follows the rule declaration order so a deterministic dispatch
266    /// sequence falls out for tests.
267    #[must_use]
268    pub fn match_destinations(
269        &self,
270        bucket: &str,
271        event: &EventType,
272        key: &str,
273    ) -> Vec<Destination> {
274        let map = crate::lock_recovery::recover_read(&self.by_bucket, "notifications.by_bucket");
275        let cfg = match map.get(bucket) {
276            Some(c) => c,
277            None => return Vec::new(),
278        };
279        cfg.rules
280            .iter()
281            .filter(|r| rule_matches(r, event, key))
282            .map(|r| r.destination.clone())
283            .collect()
284    }
285}
286
287fn rule_matches(rule: &NotificationRule, event: &EventType, key: &str) -> bool {
288    if !rule.events.iter().any(|e| e == event) {
289        return false;
290    }
291    if let Some(p) = rule.filter_prefix.as_deref()
292        && !p.is_empty()
293        && !key.starts_with(p)
294    {
295        return false;
296    }
297    if let Some(s) = rule.filter_suffix.as_deref()
298        && !s.is_empty()
299        && !key.ends_with(s)
300    {
301        return false;
302    }
303    true
304}
305
306/// Build the AWS S3 Event payload JSON for a single record. Schema
307/// matches:
308/// <https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-content-structure.html>
309///
310/// The returned string is one full event envelope (`{"Records":[...]}`),
311/// suitable as the body of an HTTP POST or the message body of an SQS /
312/// SNS publish.
313#[must_use]
314#[allow(clippy::too_many_arguments)]
315pub fn build_event_json(
316    bucket: &str,
317    key: &str,
318    event: &EventType,
319    size: Option<u64>,
320    etag: Option<&str>,
321    version_id: Option<&str>,
322    request_id: &str,
323    now: chrono::DateTime<chrono::Utc>,
324) -> String {
325    // Trim any surrounding `"` from the etag — AWS S3 stores ETags as
326    // quoted strings on the wire but the event payload uses the bare hex.
327    let etag_clean = etag.map(|e| e.trim_matches('"').to_owned());
328    let mut object = serde_json::json!({
329        "key": key,
330        "sequencer": format!("{:016x}", now.timestamp_micros() as u64),
331    });
332    if let Some(sz) = size {
333        object["size"] = serde_json::json!(sz);
334    }
335    if let Some(ref e) = etag_clean {
336        object["eTag"] = serde_json::json!(e);
337    }
338    if let Some(v) = version_id {
339        object["versionId"] = serde_json::json!(v);
340    }
341    let event_name = event.as_aws_str();
342    let event_source = match event {
343        EventType::ObjectCreatedPut => "ObjectCreated",
344        EventType::ObjectRemovedDelete | EventType::ObjectRemovedDeleteMarker => "ObjectRemoved",
345    };
346    let record = serde_json::json!({
347        "eventVersion": "2.1",
348        "eventSource": "aws:s3",
349        "awsRegion": "us-east-1",
350        "eventTime": now.to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
351        "eventName": event_name,
352        "userIdentity": { "principalId": "S4" },
353        "requestParameters": { "sourceIPAddress": "0.0.0.0" },
354        "responseElements": {
355            "x-amz-request-id": request_id,
356            "x-amz-id-2": request_id,
357        },
358        "s3": {
359            "s3SchemaVersion": "1.0",
360            "configurationId": "S4-default",
361            "bucket": {
362                "name": bucket,
363                "ownerIdentity": { "principalId": "S4" },
364                "arn": format!("arn:aws:s3:::{bucket}"),
365            },
366            "object": object,
367        },
368    });
369    let _ = event_source; // surfaced via eventName; left here for future
370    serde_json::json!({ "Records": [record] }).to_string()
371}
372
373const RETRY_ATTEMPTS: u32 = 3;
374const RETRY_BASE_MS: u64 = 50;
375
376/// Fire-and-forget event dispatch. Iterates every matched destination for
377/// the (bucket, key, event) triple and sends the event JSON in detached
378/// tokio tasks; this future itself awaits the spawn-and-send pipeline so
379/// callers can `tokio::spawn` it once and forget about the outcome.
380///
381/// Webhook destinations get retried 3 times with exponential backoff on 5xx
382/// responses; permanent 4xx responses are treated as "delivered" (the
383/// receiver explicitly rejected the payload — retrying won't help) and the
384/// drop counter is NOT bumped. SNS / SQS are best-effort; without the
385/// `aws-events` cargo feature the drop counter is bumped immediately.
386#[allow(clippy::too_many_arguments)]
387pub async fn dispatch_event(
388    manager: Arc<NotificationManager>,
389    bucket: String,
390    key: String,
391    event: EventType,
392    size: Option<u64>,
393    etag: Option<String>,
394    version_id: Option<String>,
395    request_id: String,
396) {
397    let dests = manager.match_destinations(&bucket, &event, &key);
398    if dests.is_empty() {
399        return;
400    }
401    let now = chrono::Utc::now();
402    let body = build_event_json(
403        &bucket,
404        &key,
405        &event,
406        size,
407        etag.as_deref(),
408        version_id.as_deref(),
409        &request_id,
410        now,
411    );
412    for dest in dests {
413        let mgr = Arc::clone(&manager);
414        let body = body.clone();
415        // v0.8.5 #81 (audit H-7): wrap the per-destination `send_one`
416        // body in `futures::FutureExt::catch_unwind` so a panic inside
417        // the reqwest stack / aws-sdk-sns / aws-sdk-sqs (or a bug in
418        // our retry loop) does NOT bubble out of the detached task as
419        // a `JoinError` that no operator dashboard scrapes. Caught
420        // panics bump `s4_dispatcher_panics_total{kind="notification"}`
421        // + log at ERROR with the panic payload, so silent feature
422        // degradation (= every webhook dispatch panicking and never
423        // reaching the receiver, but the gateway itself looking
424        // healthy) becomes a first-class metric the operator can
425        // alert on.
426        //
427        // `AssertUnwindSafe` is required because the `Arc<...>` +
428        // `String` captures are not `UnwindSafe` by default; the
429        // safety contract here is "we don't continue using any of
430        // those captures after the panic" which trivially holds (we
431        // drop them and return).
432        tokio::spawn(async move {
433            use futures::FutureExt as _;
434            let kind = "notification";
435            let fut = send_one(mgr, dest, body);
436            if let Err(panic) = std::panic::AssertUnwindSafe(fut).catch_unwind().await {
437                let panic_msg = panic
438                    .downcast_ref::<&'static str>()
439                    .copied()
440                    .map(str::to_owned)
441                    .or_else(|| panic.downcast_ref::<String>().cloned())
442                    .unwrap_or_else(|| "(non-string panic payload)".to_owned());
443                tracing::error!(
444                    kind,
445                    panic_payload = %panic_msg,
446                    "S4 dispatcher task panicked (caught by catch_unwind, runtime not poisoned)"
447                );
448                crate::metrics::record_dispatcher_panic(kind);
449            }
450        });
451    }
452}
453
454async fn send_one(manager: Arc<NotificationManager>, dest: Destination, body: String) {
455    match dest {
456        Destination::Webhook { ref url } => {
457            let client = match reqwest::Client::builder()
458                .timeout(std::time::Duration::from_secs(5))
459                .build()
460            {
461                Ok(c) => c,
462                Err(e) => {
463                    tracing::warn!(error = %e, "notifications: reqwest client build failed");
464                    bump_drop(&manager, dest.type_tag());
465                    return;
466                }
467            };
468            for attempt in 0..RETRY_ATTEMPTS {
469                let resp = client
470                    .post(url)
471                    .header("content-type", "application/json")
472                    .body(body.clone())
473                    .send()
474                    .await;
475                match resp {
476                    Ok(r) if r.status().is_success() => return,
477                    Ok(r) if r.status().is_server_error() => {
478                        // 5xx — retry with backoff.
479                        if attempt + 1 < RETRY_ATTEMPTS {
480                            let delay_ms = RETRY_BASE_MS * (1u64 << attempt);
481                            tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
482                            continue;
483                        }
484                        tracing::warn!(
485                            url = %url,
486                            status = %r.status(),
487                            "notifications: webhook giving up after {RETRY_ATTEMPTS} attempts"
488                        );
489                        bump_drop(&manager, "webhook");
490                        return;
491                    }
492                    Ok(r) => {
493                        // 4xx / redirect — receiver rejected, no point retrying.
494                        tracing::warn!(
495                            url = %url,
496                            status = %r.status(),
497                            "notifications: webhook permanent failure, dropping"
498                        );
499                        return;
500                    }
501                    Err(e) => {
502                        // Network-level error — also retry-eligible.
503                        if attempt + 1 < RETRY_ATTEMPTS {
504                            let delay_ms = RETRY_BASE_MS * (1u64 << attempt);
505                            tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
506                            continue;
507                        }
508                        tracing::warn!(
509                            url = %url,
510                            error = %e,
511                            "notifications: webhook network failure, dropping after {RETRY_ATTEMPTS} attempts"
512                        );
513                        bump_drop(&manager, "webhook");
514                        return;
515                    }
516                }
517            }
518        }
519        Destination::Sqs { ref queue_arn } => {
520            #[cfg(feature = "aws-events")]
521            {
522                send_sqs(&manager, queue_arn, &body).await;
523            }
524            #[cfg(not(feature = "aws-events"))]
525            {
526                let _ = queue_arn;
527                let _ = body;
528                tracing::warn!(
529                    "notifications: SQS destination configured but `aws-events` feature is off — dropping"
530                );
531                bump_drop(&manager, "sqs");
532            }
533        }
534        Destination::Sns { ref topic_arn } => {
535            #[cfg(feature = "aws-events")]
536            {
537                send_sns(&manager, topic_arn, &body).await;
538            }
539            #[cfg(not(feature = "aws-events"))]
540            {
541                let _ = topic_arn;
542                let _ = body;
543                tracing::warn!(
544                    "notifications: SNS destination configured but `aws-events` feature is off — dropping"
545                );
546                bump_drop(&manager, "sns");
547            }
548        }
549    }
550}
551
552fn bump_drop(manager: &NotificationManager, dest_tag: &'static str) {
553    manager.dropped_total.fetch_add(1, Ordering::Relaxed);
554    crate::metrics::record_notification_drop(dest_tag);
555}
556
557#[cfg(feature = "aws-events")]
558async fn send_sqs(manager: &NotificationManager, queue_arn: &str, body: &str) {
559    let conf = aws_config::load_from_env().await;
560    let client = aws_sdk_sqs::Client::new(&conf);
561    // ARN form: arn:aws:sqs:<region>:<account>:<queue-name>. SQS Send
562    // expects the queue URL, but we accept the ARN at config time and
563    // synthesise the URL via SDK introspection. As a pragmatic shortcut,
564    // operators can configure the URL directly inside the ARN field — the
565    // SDK accepts both.
566    let res = client
567        .send_message()
568        .queue_url(queue_arn)
569        .message_body(body)
570        .send()
571        .await;
572    if let Err(e) = res {
573        tracing::warn!(arn = %queue_arn, error = ?e, "notifications: SQS send failed");
574        bump_drop(manager, "sqs");
575    }
576}
577
578#[cfg(feature = "aws-events")]
579async fn send_sns(manager: &NotificationManager, topic_arn: &str, body: &str) {
580    let conf = aws_config::load_from_env().await;
581    let client = aws_sdk_sns::Client::new(&conf);
582    let res = client
583        .publish()
584        .topic_arn(topic_arn)
585        .message(body)
586        .send()
587        .await;
588    if let Err(e) = res {
589        tracing::warn!(arn = %topic_arn, error = ?e, "notifications: SNS publish failed");
590        bump_drop(manager, "sns");
591    }
592}
593
594#[cfg(test)]
595mod tests {
596    use super::*;
597
598    fn rule(
599        id: &str,
600        events: &[EventType],
601        dest: Destination,
602        prefix: Option<&str>,
603        suffix: Option<&str>,
604    ) -> NotificationRule {
605        NotificationRule {
606            id: id.to_owned(),
607            events: events.to_vec(),
608            destination: dest,
609            filter_prefix: prefix.map(str::to_owned),
610            filter_suffix: suffix.map(str::to_owned),
611        }
612    }
613
614    #[test]
615    fn match_destinations_single_rule_event_match() {
616        let mgr = NotificationManager::new();
617        mgr.put(
618            "b",
619            NotificationConfig {
620                rules: vec![rule(
621                    "r1",
622                    &[EventType::ObjectCreatedPut],
623                    Destination::Webhook {
624                        url: "http://hook".into(),
625                    },
626                    None,
627                    None,
628                )],
629            },
630        );
631        let dests = mgr.match_destinations("b", &EventType::ObjectCreatedPut, "any/key.txt");
632        assert_eq!(dests.len(), 1, "single rule must fire on event match");
633    }
634
635    #[test]
636    fn match_destinations_prefix_filter() {
637        let mgr = NotificationManager::new();
638        mgr.put(
639            "b",
640            NotificationConfig {
641                rules: vec![rule(
642                    "r1",
643                    &[EventType::ObjectCreatedPut],
644                    Destination::Webhook {
645                        url: "http://hook".into(),
646                    },
647                    Some("uploads/"),
648                    None,
649                )],
650            },
651        );
652        assert_eq!(
653            mgr.match_destinations("b", &EventType::ObjectCreatedPut, "uploads/file.bin")
654                .len(),
655            1
656        );
657        assert!(
658            mgr.match_destinations("b", &EventType::ObjectCreatedPut, "logs/file.bin")
659                .is_empty(),
660            "prefix filter must reject non-matching key"
661        );
662    }
663
664    #[test]
665    fn match_destinations_suffix_filter() {
666        let mgr = NotificationManager::new();
667        mgr.put(
668            "b",
669            NotificationConfig {
670                rules: vec![rule(
671                    "r1",
672                    &[EventType::ObjectCreatedPut],
673                    Destination::Webhook {
674                        url: "http://hook".into(),
675                    },
676                    None,
677                    Some(".jpg"),
678                )],
679            },
680        );
681        assert_eq!(
682            mgr.match_destinations("b", &EventType::ObjectCreatedPut, "photo.jpg")
683                .len(),
684            1
685        );
686        assert!(
687            mgr.match_destinations("b", &EventType::ObjectCreatedPut, "doc.pdf")
688                .is_empty(),
689            "suffix filter must reject non-matching key"
690        );
691    }
692
693    #[test]
694    fn match_destinations_no_rule_for_bucket() {
695        let mgr = NotificationManager::new();
696        let dests = mgr.match_destinations("ghost", &EventType::ObjectCreatedPut, "k");
697        assert!(dests.is_empty(), "unknown bucket must yield empty vec");
698    }
699
700    #[test]
701    fn match_destinations_event_type_mismatch() {
702        let mgr = NotificationManager::new();
703        mgr.put(
704            "b",
705            NotificationConfig {
706                rules: vec![rule(
707                    "r1",
708                    &[EventType::ObjectCreatedPut],
709                    Destination::Webhook {
710                        url: "http://hook".into(),
711                    },
712                    None,
713                    None,
714                )],
715            },
716        );
717        assert!(
718            mgr.match_destinations("b", &EventType::ObjectRemovedDelete, "k")
719                .is_empty(),
720            "mismatched event type must not fire"
721        );
722    }
723
724    #[test]
725    fn match_destinations_multiple_rules_fire_in_order() {
726        let mgr = NotificationManager::new();
727        mgr.put(
728            "b",
729            NotificationConfig {
730                rules: vec![
731                    rule(
732                        "first",
733                        &[EventType::ObjectCreatedPut],
734                        Destination::Webhook {
735                            url: "http://first".into(),
736                        },
737                        None,
738                        None,
739                    ),
740                    rule(
741                        "second",
742                        &[EventType::ObjectCreatedPut],
743                        Destination::Webhook {
744                            url: "http://second".into(),
745                        },
746                        None,
747                        None,
748                    ),
749                ],
750            },
751        );
752        let dests = mgr.match_destinations("b", &EventType::ObjectCreatedPut, "k");
753        assert_eq!(dests.len(), 2, "both matching rules fire");
754        match (&dests[0], &dests[1]) {
755            (Destination::Webhook { url: u1 }, Destination::Webhook { url: u2 }) => {
756                assert_eq!(u1, "http://first");
757                assert_eq!(u2, "http://second");
758            }
759            _ => panic!("expected two webhooks in declaration order"),
760        }
761    }
762
763    #[test]
764    fn build_event_json_schema_matches_aws() {
765        let now = chrono::DateTime::parse_from_rfc3339("2026-05-13T10:00:00Z")
766            .unwrap()
767            .with_timezone(&chrono::Utc);
768        let body = build_event_json(
769            "my-bucket",
770            "uploads/photo.jpg",
771            &EventType::ObjectCreatedPut,
772            Some(12345),
773            Some("\"deadbeef\""),
774            Some("v-001"),
775            "REQ-1",
776            now,
777        );
778        let v: serde_json::Value = serde_json::from_str(&body).expect("valid json");
779        let rec = &v["Records"][0];
780        assert_eq!(rec["eventName"], "s3:ObjectCreated:Put");
781        assert_eq!(rec["eventTime"], "2026-05-13T10:00:00.000Z");
782        assert_eq!(rec["s3"]["bucket"]["name"], "my-bucket");
783        assert_eq!(rec["s3"]["object"]["key"], "uploads/photo.jpg");
784        assert_eq!(rec["s3"]["object"]["size"], 12345);
785        assert_eq!(rec["s3"]["object"]["eTag"], "deadbeef");
786        assert_eq!(rec["s3"]["object"]["versionId"], "v-001");
787    }
788
789    #[test]
790    fn build_event_json_omits_optional_fields() {
791        let now = chrono::Utc::now();
792        let body = build_event_json(
793            "b",
794            "k",
795            &EventType::ObjectRemovedDeleteMarker,
796            None,
797            None,
798            None,
799            "r",
800            now,
801        );
802        let v: serde_json::Value = serde_json::from_str(&body).expect("valid json");
803        let obj = &v["Records"][0]["s3"]["object"];
804        assert!(obj.get("size").is_none());
805        assert!(obj.get("eTag").is_none());
806        assert!(obj.get("versionId").is_none());
807    }
808
809    #[test]
810    fn json_round_trip() {
811        let mgr = NotificationManager::new();
812        mgr.put(
813            "b",
814            NotificationConfig {
815                rules: vec![rule(
816                    "r1",
817                    &[EventType::ObjectCreatedPut, EventType::ObjectRemovedDelete],
818                    Destination::Sqs {
819                        queue_arn: "arn:aws:sqs:us-east-1:123:q".into(),
820                    },
821                    Some("u/"),
822                    Some(".jpg"),
823                )],
824            },
825        );
826        let json = mgr.to_json().expect("to_json");
827        let mgr2 = NotificationManager::from_json(&json).expect("from_json");
828        assert_eq!(mgr.get("b"), mgr2.get("b"));
829    }
830
831    #[test]
832    fn delete_is_idempotent() {
833        let mgr = NotificationManager::new();
834        mgr.delete("never-existed");
835        mgr.put(
836            "b",
837            NotificationConfig {
838                rules: vec![rule(
839                    "r1",
840                    &[EventType::ObjectCreatedPut],
841                    Destination::Webhook {
842                        url: "http://h".into(),
843                    },
844                    None,
845                    None,
846                )],
847            },
848        );
849        mgr.delete("b");
850        assert!(mgr.get("b").is_none());
851    }
852
853    #[test]
854    fn put_replaces_previous_config() {
855        let mgr = NotificationManager::new();
856        mgr.put(
857            "b",
858            NotificationConfig {
859                rules: vec![rule(
860                    "old",
861                    &[EventType::ObjectCreatedPut],
862                    Destination::Webhook {
863                        url: "http://old".into(),
864                    },
865                    None,
866                    None,
867                )],
868            },
869        );
870        mgr.put(
871            "b",
872            NotificationConfig {
873                rules: vec![rule(
874                    "new",
875                    &[EventType::ObjectRemovedDelete],
876                    Destination::Webhook {
877                        url: "http://new".into(),
878                    },
879                    None,
880                    None,
881                )],
882            },
883        );
884        let cfg = mgr.get("b").expect("config");
885        assert_eq!(cfg.rules.len(), 1);
886        assert_eq!(cfg.rules[0].id, "new");
887    }
888
889    #[tokio::test]
890    async fn dispatch_event_via_webhook_delivers_payload() {
891        // Spin up a tiny tokio HTTP receiver on a random port; verify the
892        // dispatcher POSTs the event JSON we expect.
893        use std::sync::Mutex;
894        use tokio::io::{AsyncReadExt, AsyncWriteExt};
895        use tokio::net::TcpListener;
896
897        let listener = TcpListener::bind("127.0.0.1:0").await.expect("bind");
898        let addr = listener.local_addr().expect("addr");
899        let received: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
900        let received_cl = Arc::clone(&received);
901        tokio::spawn(async move {
902            if let Ok((mut sock, _)) = listener.accept().await {
903                let mut buf = vec![0u8; 16384];
904                let n = sock.read(&mut buf).await.unwrap_or(0);
905                let raw = String::from_utf8_lossy(&buf[..n]).to_string();
906                received_cl.lock().unwrap().push(raw);
907                let _ = sock
908                    .write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n")
909                    .await;
910            }
911        });
912
913        let mgr = Arc::new(NotificationManager::new());
914        mgr.put(
915            "b",
916            NotificationConfig {
917                rules: vec![rule(
918                    "r1",
919                    &[EventType::ObjectCreatedPut],
920                    Destination::Webhook {
921                        url: format!("http://{addr}/hook"),
922                    },
923                    None,
924                    None,
925                )],
926            },
927        );
928
929        dispatch_event(
930            Arc::clone(&mgr),
931            "b".into(),
932            "k.txt".into(),
933            EventType::ObjectCreatedPut,
934            Some(7),
935            Some("\"abc\"".into()),
936            None,
937            "req-1".into(),
938        )
939        .await;
940
941        // The dispatcher detaches via tokio::spawn; poll briefly.
942        for _ in 0..50 {
943            if !received.lock().unwrap().is_empty() {
944                break;
945            }
946            tokio::time::sleep(std::time::Duration::from_millis(20)).await;
947        }
948        let raw = received.lock().unwrap().clone();
949        assert!(!raw.is_empty(), "webhook receiver got nothing");
950        let raw = &raw[0];
951        assert!(raw.contains("POST /hook"), "missing POST line");
952        assert!(raw.contains("s3:ObjectCreated:Put"), "missing event name");
953        assert!(raw.contains("\"k.txt\""), "missing key");
954        assert_eq!(mgr.dropped_total.load(Ordering::Relaxed), 0);
955    }
956
957    #[tokio::test]
958    async fn dispatch_event_503_drops_after_retry_budget() {
959        // Receiver that always returns 503 — the dispatcher must retry up
960        // to the configured budget then bump dropped_total exactly once.
961        use std::sync::Mutex;
962        use tokio::io::{AsyncReadExt, AsyncWriteExt};
963        use tokio::net::TcpListener;
964
965        let listener = TcpListener::bind("127.0.0.1:0").await.expect("bind");
966        let addr = listener.local_addr().expect("addr");
967        let attempt_count: Arc<Mutex<u32>> = Arc::new(Mutex::new(0));
968        let attempt_count_cl = Arc::clone(&attempt_count);
969        tokio::spawn(async move {
970            for _ in 0..RETRY_ATTEMPTS {
971                if let Ok((mut sock, _)) = listener.accept().await {
972                    let mut buf = vec![0u8; 16384];
973                    let _ = sock.read(&mut buf).await;
974                    *attempt_count_cl.lock().unwrap() += 1;
975                    let _ = sock
976                        .write_all(b"HTTP/1.1 503 Service Unavailable\r\nContent-Length: 0\r\n\r\n")
977                        .await;
978                }
979            }
980        });
981
982        let mgr = Arc::new(NotificationManager::new());
983        mgr.put(
984            "b",
985            NotificationConfig {
986                rules: vec![rule(
987                    "r1",
988                    &[EventType::ObjectCreatedPut],
989                    Destination::Webhook {
990                        url: format!("http://{addr}/sink"),
991                    },
992                    None,
993                    None,
994                )],
995            },
996        );
997
998        dispatch_event(
999            Arc::clone(&mgr),
1000            "b".into(),
1001            "k".into(),
1002            EventType::ObjectCreatedPut,
1003            None,
1004            None,
1005            None,
1006            "r".into(),
1007        )
1008        .await;
1009
1010        // Wait for the detached task — RETRY_ATTEMPTS attempts plus
1011        // backoff (50ms + 100ms). Cap at 2s so a flaky run doesn't hang.
1012        for _ in 0..100 {
1013            if mgr.dropped_total.load(Ordering::Relaxed) > 0 {
1014                break;
1015            }
1016            tokio::time::sleep(std::time::Duration::from_millis(20)).await;
1017        }
1018        assert_eq!(
1019            mgr.dropped_total.load(Ordering::Relaxed),
1020            1,
1021            "drop counter must bump exactly once after retry budget exhausted"
1022        );
1023    }
1024
1025    /// v0.8.4 #77 (audit H-8): a panic inside the `by_bucket` write
1026    /// guard poisons the lock. `to_json` must recover via
1027    /// [`crate::lock_recovery::recover_read`] and surface the data
1028    /// instead of re-panicking on the SIGUSR1 dump-back path.
1029    #[test]
1030    fn notifications_to_json_after_panic_recovers_via_poison() {
1031        let mgr = std::sync::Arc::new(NotificationManager::new());
1032        mgr.put(
1033            "b",
1034            NotificationConfig {
1035                rules: vec![NotificationRule {
1036                    id: "r1".into(),
1037                    events: vec![EventType::ObjectCreatedPut],
1038                    destination: Destination::Webhook {
1039                        url: "http://example.invalid".into(),
1040                    },
1041                    filter_prefix: None,
1042                    filter_suffix: None,
1043                }],
1044            },
1045        );
1046        let mgr_cl = std::sync::Arc::clone(&mgr);
1047        let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
1048            let mut g = mgr_cl.by_bucket.write().expect("clean lock");
1049            g.entry("b2".into()).or_default();
1050            panic!("force-poison");
1051        }));
1052        assert!(
1053            mgr.by_bucket.is_poisoned(),
1054            "write panic must poison by_bucket lock"
1055        );
1056        let json = mgr.to_json().expect("to_json after poison must succeed");
1057        let mgr2 = NotificationManager::from_json(&json).expect("from_json");
1058        assert!(
1059            mgr2.get("b").is_some(),
1060            "recovered snapshot keeps original config"
1061        );
1062    }
1063}