1use std::collections::HashMap;
51use std::sync::Arc;
52use std::sync::RwLock;
53use std::sync::atomic::{AtomicU64, Ordering};
54
55use serde::{Deserialize, Serialize};
56
/// Kinds of object events a notification rule can subscribe to.
///
/// The AWS wire name of each variant is produced by `EventType::as_aws_str`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub enum EventType {
    /// Rendered as `s3:ObjectCreated:Put`.
    ObjectCreatedPut,
    /// Rendered as `s3:ObjectRemoved:Delete`.
    ObjectRemovedDelete,
    /// Rendered as `s3:ObjectRemoved:DeleteMarkerCreated`.
    ObjectRemovedDeleteMarker,
}
72
73impl EventType {
74 #[must_use]
77 pub fn as_aws_str(&self) -> &'static str {
78 match self {
79 Self::ObjectCreatedPut => "s3:ObjectCreated:Put",
80 Self::ObjectRemovedDelete => "s3:ObjectRemoved:Delete",
81 Self::ObjectRemovedDeleteMarker => "s3:ObjectRemoved:DeleteMarkerCreated",
82 }
83 }
84
85 #[must_use]
90 pub fn from_aws_str(s: &str) -> Option<Self> {
91 match s {
92 "s3:ObjectCreated:Put" | "s3:ObjectCreated:*" => Some(Self::ObjectCreatedPut),
93 "s3:ObjectRemoved:Delete" => Some(Self::ObjectRemovedDelete),
94 "s3:ObjectRemoved:DeleteMarkerCreated" => Some(Self::ObjectRemovedDeleteMarker),
95 "s3:ObjectRemoved:*" => Some(Self::ObjectRemovedDelete),
96 _ => None,
97 }
98 }
99}
100
/// Where a matched notification is delivered.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum Destination {
    /// The event JSON is POSTed to `url` over HTTP.
    Webhook { url: String },
    /// The event JSON is sent as an SQS message; only delivered when the
    /// crate is built with the `aws-events` feature.
    Sqs { queue_arn: String },
    /// The event JSON is published to an SNS topic; only delivered when the
    /// crate is built with the `aws-events` feature.
    Sns { topic_arn: String },
}
115
116impl Destination {
117 #[must_use]
120 pub fn type_tag(&self) -> &'static str {
121 match self {
122 Self::Webhook { .. } => "webhook",
123 Self::Sqs { .. } => "sqs",
124 Self::Sns { .. } => "sns",
125 }
126 }
127}
128
/// One notification rule: which events it listens for, where it delivers,
/// and optional key filters.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct NotificationRule {
    /// Identifier of the rule.
    pub id: String,
    /// Event types the rule fires on; an event not listed here never matches.
    pub events: Vec<EventType>,
    /// Delivery target used when the rule matches.
    pub destination: Destination,
    /// When set and non-empty, the object key must start with this string.
    pub filter_prefix: Option<String>,
    /// When set and non-empty, the object key must end with this string.
    pub filter_suffix: Option<String>,
}
152
/// The full set of notification rules attached to one bucket.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct NotificationConfig {
    /// Rules in declaration order; matching preserves this order.
    pub rules: Vec<NotificationRule>,
}
158
/// Serializable copy of the manager's per-bucket state, used by
/// `NotificationManager::to_json` and `NotificationManager::from_json`.
#[derive(Debug, Default, Serialize, Deserialize)]
struct NotificationSnapshot {
    /// Bucket name mapped to its notification configuration.
    by_bucket: HashMap<String, NotificationConfig>,
}
166
/// Holds the notification configuration of every bucket plus a counter of
/// notifications that were given up on.
pub struct NotificationManager {
    /// Bucket name mapped to its configuration, guarded by a read/write lock.
    by_bucket: RwLock<HashMap<String, NotificationConfig>>,
    /// Number of drops recorded by the delivery code; starts at zero.
    pub dropped_total: AtomicU64,
}
178
179impl Default for NotificationManager {
180 fn default() -> Self {
181 Self::new()
182 }
183}
184
185impl std::fmt::Debug for NotificationManager {
186 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
187 f.debug_struct("NotificationManager")
188 .field("dropped_total", &self.dropped_total.load(Ordering::Relaxed))
189 .finish_non_exhaustive()
190 }
191}
192
193impl NotificationManager {
194 #[must_use]
196 pub fn new() -> Self {
197 Self {
198 by_bucket: RwLock::new(HashMap::new()),
199 dropped_total: AtomicU64::new(0),
200 }
201 }
202
203 pub fn put(&self, bucket: &str, config: NotificationConfig) {
207 self.by_bucket
208 .write()
209 .expect("notification state RwLock poisoned")
210 .insert(bucket.to_owned(), config);
211 }
212
213 #[must_use]
218 pub fn get(&self, bucket: &str) -> Option<NotificationConfig> {
219 self.by_bucket
220 .read()
221 .expect("notification state RwLock poisoned")
222 .get(bucket)
223 .cloned()
224 }
225
226 pub fn delete(&self, bucket: &str) {
228 self.by_bucket
229 .write()
230 .expect("notification state RwLock poisoned")
231 .remove(bucket);
232 }
233
234 pub fn to_json(&self) -> Result<String, serde_json::Error> {
237 let snap = NotificationSnapshot {
238 by_bucket: self
239 .by_bucket
240 .read()
241 .expect("notification state RwLock poisoned")
242 .clone(),
243 };
244 serde_json::to_string(&snap)
245 }
246
247 pub fn from_json(s: &str) -> Result<Self, serde_json::Error> {
251 let snap: NotificationSnapshot = serde_json::from_str(s)?;
252 Ok(Self {
253 by_bucket: RwLock::new(snap.by_bucket),
254 dropped_total: AtomicU64::new(0),
255 })
256 }
257
258 #[must_use]
263 pub fn match_destinations(
264 &self,
265 bucket: &str,
266 event: &EventType,
267 key: &str,
268 ) -> Vec<Destination> {
269 let map = self
270 .by_bucket
271 .read()
272 .expect("notification state RwLock poisoned");
273 let cfg = match map.get(bucket) {
274 Some(c) => c,
275 None => return Vec::new(),
276 };
277 cfg.rules
278 .iter()
279 .filter(|r| rule_matches(r, event, key))
280 .map(|r| r.destination.clone())
281 .collect()
282 }
283}
284
285fn rule_matches(rule: &NotificationRule, event: &EventType, key: &str) -> bool {
286 if !rule.events.iter().any(|e| e == event) {
287 return false;
288 }
289 if let Some(p) = rule.filter_prefix.as_deref()
290 && !p.is_empty()
291 && !key.starts_with(p)
292 {
293 return false;
294 }
295 if let Some(s) = rule.filter_suffix.as_deref()
296 && !s.is_empty()
297 && !key.ends_with(s)
298 {
299 return false;
300 }
301 true
302}
303
304#[must_use]
312#[allow(clippy::too_many_arguments)]
313pub fn build_event_json(
314 bucket: &str,
315 key: &str,
316 event: &EventType,
317 size: Option<u64>,
318 etag: Option<&str>,
319 version_id: Option<&str>,
320 request_id: &str,
321 now: chrono::DateTime<chrono::Utc>,
322) -> String {
323 let etag_clean = etag.map(|e| e.trim_matches('"').to_owned());
326 let mut object = serde_json::json!({
327 "key": key,
328 "sequencer": format!("{:016x}", now.timestamp_micros() as u64),
329 });
330 if let Some(sz) = size {
331 object["size"] = serde_json::json!(sz);
332 }
333 if let Some(ref e) = etag_clean {
334 object["eTag"] = serde_json::json!(e);
335 }
336 if let Some(v) = version_id {
337 object["versionId"] = serde_json::json!(v);
338 }
339 let event_name = event.as_aws_str();
340 let event_source = match event {
341 EventType::ObjectCreatedPut => "ObjectCreated",
342 EventType::ObjectRemovedDelete | EventType::ObjectRemovedDeleteMarker => "ObjectRemoved",
343 };
344 let record = serde_json::json!({
345 "eventVersion": "2.1",
346 "eventSource": "aws:s3",
347 "awsRegion": "us-east-1",
348 "eventTime": now.to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
349 "eventName": event_name,
350 "userIdentity": { "principalId": "S4" },
351 "requestParameters": { "sourceIPAddress": "0.0.0.0" },
352 "responseElements": {
353 "x-amz-request-id": request_id,
354 "x-amz-id-2": request_id,
355 },
356 "s3": {
357 "s3SchemaVersion": "1.0",
358 "configurationId": "S4-default",
359 "bucket": {
360 "name": bucket,
361 "ownerIdentity": { "principalId": "S4" },
362 "arn": format!("arn:aws:s3:::{bucket}"),
363 },
364 "object": object,
365 },
366 });
367 let _ = event_source; serde_json::json!({ "Records": [record] }).to_string()
369}
370
/// Maximum number of POST attempts made for one webhook delivery.
const RETRY_ATTEMPTS: u32 = 3;
/// Base backoff in milliseconds; the wait after zero-based attempt `n` is
/// `RETRY_BASE_MS * 2^n`.
const RETRY_BASE_MS: u64 = 50;
373
374#[allow(clippy::too_many_arguments)]
385pub async fn dispatch_event(
386 manager: Arc<NotificationManager>,
387 bucket: String,
388 key: String,
389 event: EventType,
390 size: Option<u64>,
391 etag: Option<String>,
392 version_id: Option<String>,
393 request_id: String,
394) {
395 let dests = manager.match_destinations(&bucket, &event, &key);
396 if dests.is_empty() {
397 return;
398 }
399 let now = chrono::Utc::now();
400 let body = build_event_json(
401 &bucket,
402 &key,
403 &event,
404 size,
405 etag.as_deref(),
406 version_id.as_deref(),
407 &request_id,
408 now,
409 );
410 for dest in dests {
411 let mgr = Arc::clone(&manager);
412 let body = body.clone();
413 tokio::spawn(async move {
414 send_one(mgr, dest, body).await;
415 });
416 }
417}
418
419async fn send_one(manager: Arc<NotificationManager>, dest: Destination, body: String) {
420 match dest {
421 Destination::Webhook { ref url } => {
422 let client = match reqwest::Client::builder()
423 .timeout(std::time::Duration::from_secs(5))
424 .build()
425 {
426 Ok(c) => c,
427 Err(e) => {
428 tracing::warn!(error = %e, "notifications: reqwest client build failed");
429 bump_drop(&manager, dest.type_tag());
430 return;
431 }
432 };
433 for attempt in 0..RETRY_ATTEMPTS {
434 let resp = client
435 .post(url)
436 .header("content-type", "application/json")
437 .body(body.clone())
438 .send()
439 .await;
440 match resp {
441 Ok(r) if r.status().is_success() => return,
442 Ok(r) if r.status().is_server_error() => {
443 if attempt + 1 < RETRY_ATTEMPTS {
445 let delay_ms = RETRY_BASE_MS * (1u64 << attempt);
446 tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
447 continue;
448 }
449 tracing::warn!(
450 url = %url,
451 status = %r.status(),
452 "notifications: webhook giving up after {RETRY_ATTEMPTS} attempts"
453 );
454 bump_drop(&manager, "webhook");
455 return;
456 }
457 Ok(r) => {
458 tracing::warn!(
460 url = %url,
461 status = %r.status(),
462 "notifications: webhook permanent failure, dropping"
463 );
464 return;
465 }
466 Err(e) => {
467 if attempt + 1 < RETRY_ATTEMPTS {
469 let delay_ms = RETRY_BASE_MS * (1u64 << attempt);
470 tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
471 continue;
472 }
473 tracing::warn!(
474 url = %url,
475 error = %e,
476 "notifications: webhook network failure, dropping after {RETRY_ATTEMPTS} attempts"
477 );
478 bump_drop(&manager, "webhook");
479 return;
480 }
481 }
482 }
483 }
484 Destination::Sqs { ref queue_arn } => {
485 #[cfg(feature = "aws-events")]
486 {
487 send_sqs(&manager, queue_arn, &body).await;
488 }
489 #[cfg(not(feature = "aws-events"))]
490 {
491 let _ = queue_arn;
492 let _ = body;
493 tracing::warn!(
494 "notifications: SQS destination configured but `aws-events` feature is off — dropping"
495 );
496 bump_drop(&manager, "sqs");
497 }
498 }
499 Destination::Sns { ref topic_arn } => {
500 #[cfg(feature = "aws-events")]
501 {
502 send_sns(&manager, topic_arn, &body).await;
503 }
504 #[cfg(not(feature = "aws-events"))]
505 {
506 let _ = topic_arn;
507 let _ = body;
508 tracing::warn!(
509 "notifications: SNS destination configured but `aws-events` feature is off — dropping"
510 );
511 bump_drop(&manager, "sns");
512 }
513 }
514 }
515}
516
517fn bump_drop(manager: &NotificationManager, dest_tag: &'static str) {
518 manager.dropped_total.fetch_add(1, Ordering::Relaxed);
519 crate::metrics::record_notification_drop(dest_tag);
520}
521
/// Sends `body` as one SQS message, using AWS configuration loaded from the
/// environment. A failed send is logged and counted as a drop; there is no
/// retry at this level.
// NOTE(review): `queue_arn` is passed straight to `queue_url` — confirm that
// callers store a queue URL in this field, or that the SDK accepts an ARN.
#[cfg(feature = "aws-events")]
async fn send_sqs(manager: &NotificationManager, queue_arn: &str, body: &str) {
    let sdk_config = aws_config::load_from_env().await;
    let sqs = aws_sdk_sqs::Client::new(&sdk_config);
    let outcome = sqs
        .send_message()
        .queue_url(queue_arn)
        .message_body(body)
        .send()
        .await;
    let Err(e) = outcome else {
        return;
    };
    tracing::warn!(arn = %queue_arn, error = ?e, "notifications: SQS send failed");
    bump_drop(manager, "sqs");
}
542
/// Publishes `body` to the SNS topic `topic_arn`, using AWS configuration
/// loaded from the environment. A failed publish is logged and counted as a
/// drop; there is no retry at this level.
#[cfg(feature = "aws-events")]
async fn send_sns(manager: &NotificationManager, topic_arn: &str, body: &str) {
    let sdk_config = aws_config::load_from_env().await;
    let sns = aws_sdk_sns::Client::new(&sdk_config);
    let outcome = sns
        .publish()
        .topic_arn(topic_arn)
        .message(body)
        .send()
        .await;
    let Err(e) = outcome else {
        return;
    };
    tracing::warn!(arn = %topic_arn, error = ?e, "notifications: SNS publish failed");
    bump_drop(manager, "sns");
}
558
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a rule from borrowed parts so test bodies stay compact.
    fn rule(
        id: &str,
        events: &[EventType],
        dest: Destination,
        prefix: Option<&str>,
        suffix: Option<&str>,
    ) -> NotificationRule {
        NotificationRule {
            id: id.to_owned(),
            events: events.to_vec(),
            destination: dest,
            filter_prefix: prefix.map(str::to_owned),
            filter_suffix: suffix.map(str::to_owned),
        }
    }

    /// Webhook destination pointing at `url`.
    fn webhook(url: &str) -> Destination {
        Destination::Webhook {
            url: url.to_owned(),
        }
    }

    /// Rule that fires on `ObjectCreatedPut` and targets a webhook at `url`.
    fn put_rule(
        id: &str,
        url: &str,
        prefix: Option<&str>,
        suffix: Option<&str>,
    ) -> NotificationRule {
        rule(
            id,
            &[EventType::ObjectCreatedPut],
            webhook(url),
            prefix,
            suffix,
        )
    }

    /// Installs `rules` as the configuration of bucket `"b"`.
    fn install(mgr: &NotificationManager, rules: Vec<NotificationRule>) {
        mgr.put("b", NotificationConfig { rules });
    }

    #[test]
    fn match_destinations_single_rule_event_match() {
        let mgr = NotificationManager::new();
        install(&mgr, vec![put_rule("r1", "http://hook", None, None)]);
        let dests = mgr.match_destinations("b", &EventType::ObjectCreatedPut, "any/key.txt");
        assert_eq!(dests.len(), 1, "single rule must fire on event match");
    }

    #[test]
    fn match_destinations_prefix_filter() {
        let mgr = NotificationManager::new();
        install(
            &mgr,
            vec![put_rule("r1", "http://hook", Some("uploads/"), None)],
        );
        assert_eq!(
            mgr.match_destinations("b", &EventType::ObjectCreatedPut, "uploads/file.bin")
                .len(),
            1
        );
        assert!(
            mgr.match_destinations("b", &EventType::ObjectCreatedPut, "logs/file.bin")
                .is_empty(),
            "prefix filter must reject non-matching key"
        );
    }

    #[test]
    fn match_destinations_suffix_filter() {
        let mgr = NotificationManager::new();
        install(
            &mgr,
            vec![put_rule("r1", "http://hook", None, Some(".jpg"))],
        );
        assert_eq!(
            mgr.match_destinations("b", &EventType::ObjectCreatedPut, "photo.jpg")
                .len(),
            1
        );
        assert!(
            mgr.match_destinations("b", &EventType::ObjectCreatedPut, "doc.pdf")
                .is_empty(),
            "suffix filter must reject non-matching key"
        );
    }

    #[test]
    fn match_destinations_no_rule_for_bucket() {
        let mgr = NotificationManager::new();
        let dests = mgr.match_destinations("ghost", &EventType::ObjectCreatedPut, "k");
        assert!(dests.is_empty(), "unknown bucket must yield empty vec");
    }

    #[test]
    fn match_destinations_event_type_mismatch() {
        let mgr = NotificationManager::new();
        install(&mgr, vec![put_rule("r1", "http://hook", None, None)]);
        assert!(
            mgr.match_destinations("b", &EventType::ObjectRemovedDelete, "k")
                .is_empty(),
            "mismatched event type must not fire"
        );
    }

    #[test]
    fn match_destinations_multiple_rules_fire_in_order() {
        let mgr = NotificationManager::new();
        install(
            &mgr,
            vec![
                put_rule("first", "http://first", None, None),
                put_rule("second", "http://second", None, None),
            ],
        );
        let dests = mgr.match_destinations("b", &EventType::ObjectCreatedPut, "k");
        assert_eq!(
            dests,
            vec![webhook("http://first"), webhook("http://second")],
            "both matching rules fire, in declaration order"
        );
    }

    #[test]
    fn build_event_json_schema_matches_aws() {
        let now = chrono::DateTime::parse_from_rfc3339("2026-05-13T10:00:00Z")
            .unwrap()
            .with_timezone(&chrono::Utc);
        let body = build_event_json(
            "my-bucket",
            "uploads/photo.jpg",
            &EventType::ObjectCreatedPut,
            Some(12345),
            Some("\"deadbeef\""),
            Some("v-001"),
            "REQ-1",
            now,
        );
        let v: serde_json::Value = serde_json::from_str(&body).expect("valid json");
        let rec = &v["Records"][0];
        assert_eq!(rec["eventName"], "s3:ObjectCreated:Put");
        assert_eq!(rec["eventTime"], "2026-05-13T10:00:00.000Z");
        assert_eq!(rec["s3"]["bucket"]["name"], "my-bucket");
        assert_eq!(rec["s3"]["object"]["key"], "uploads/photo.jpg");
        assert_eq!(rec["s3"]["object"]["size"], 12345);
        assert_eq!(rec["s3"]["object"]["eTag"], "deadbeef");
        assert_eq!(rec["s3"]["object"]["versionId"], "v-001");
    }

    #[test]
    fn build_event_json_omits_optional_fields() {
        let now = chrono::Utc::now();
        let body = build_event_json(
            "b",
            "k",
            &EventType::ObjectRemovedDeleteMarker,
            None,
            None,
            None,
            "r",
            now,
        );
        let v: serde_json::Value = serde_json::from_str(&body).expect("valid json");
        let obj = &v["Records"][0]["s3"]["object"];
        assert!(obj.get("size").is_none());
        assert!(obj.get("eTag").is_none());
        assert!(obj.get("versionId").is_none());
    }

    #[test]
    fn json_round_trip() {
        let mgr = NotificationManager::new();
        install(
            &mgr,
            vec![rule(
                "r1",
                &[
                    EventType::ObjectCreatedPut,
                    EventType::ObjectRemovedDelete,
                ],
                Destination::Sqs {
                    queue_arn: "arn:aws:sqs:us-east-1:123:q".into(),
                },
                Some("u/"),
                Some(".jpg"),
            )],
        );
        let json = mgr.to_json().expect("to_json");
        let mgr2 = NotificationManager::from_json(&json).expect("from_json");
        assert_eq!(mgr.get("b"), mgr2.get("b"));
    }

    #[test]
    fn delete_is_idempotent() {
        let mgr = NotificationManager::new();
        mgr.delete("never-existed");
        install(&mgr, vec![put_rule("r1", "http://h", None, None)]);
        mgr.delete("b");
        assert!(mgr.get("b").is_none());
    }

    #[test]
    fn put_replaces_previous_config() {
        let mgr = NotificationManager::new();
        install(&mgr, vec![put_rule("old", "http://old", None, None)]);
        install(
            &mgr,
            vec![rule(
                "new",
                &[EventType::ObjectRemovedDelete],
                webhook("http://new"),
                None,
                None,
            )],
        );
        let cfg = mgr.get("b").expect("config");
        assert_eq!(cfg.rules.len(), 1);
        assert_eq!(cfg.rules[0].id, "new");
    }

    #[tokio::test]
    async fn dispatch_event_via_webhook_delivers_payload() {
        use std::sync::Mutex;
        use tokio::io::{AsyncReadExt, AsyncWriteExt};
        use tokio::net::TcpListener;

        // Minimal one-shot HTTP server: capture the raw request, answer 200.
        let listener = TcpListener::bind("127.0.0.1:0").await.expect("bind");
        let addr = listener.local_addr().expect("addr");
        let received: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
        let received_cl = Arc::clone(&received);
        tokio::spawn(async move {
            if let Ok((mut sock, _)) = listener.accept().await {
                let mut buf = vec![0u8; 16384];
                let n = sock.read(&mut buf).await.unwrap_or(0);
                let raw = String::from_utf8_lossy(&buf[..n]).to_string();
                received_cl.lock().unwrap().push(raw);
                let _ = sock
                    .write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n")
                    .await;
            }
        });

        let mgr = Arc::new(NotificationManager::new());
        install(
            &mgr,
            vec![put_rule("r1", &format!("http://{addr}/hook"), None, None)],
        );

        dispatch_event(
            Arc::clone(&mgr),
            "b".into(),
            "k.txt".into(),
            EventType::ObjectCreatedPut,
            Some(7),
            Some("\"abc\"".into()),
            None,
            "req-1".into(),
        )
        .await;

        // Delivery runs on a detached task, so poll until the server saw it.
        for _ in 0..50 {
            if !received.lock().unwrap().is_empty() {
                break;
            }
            tokio::time::sleep(std::time::Duration::from_millis(20)).await;
        }
        let raw = received.lock().unwrap().clone();
        assert!(!raw.is_empty(), "webhook receiver got nothing");
        let raw = &raw[0];
        assert!(raw.contains("POST /hook"), "missing POST line");
        assert!(raw.contains("s3:ObjectCreated:Put"), "missing event name");
        assert!(raw.contains("\"k.txt\""), "missing key");
        assert_eq!(mgr.dropped_total.load(Ordering::Relaxed), 0);
    }

    #[tokio::test]
    async fn dispatch_event_503_drops_after_retry_budget() {
        use std::sync::Mutex;
        use tokio::io::{AsyncReadExt, AsyncWriteExt};
        use tokio::net::TcpListener;

        // Server that answers 503 to each of the expected attempts and
        // counts how many requests it actually received.
        let listener = TcpListener::bind("127.0.0.1:0").await.expect("bind");
        let addr = listener.local_addr().expect("addr");
        let attempt_count: Arc<Mutex<u32>> = Arc::new(Mutex::new(0));
        let attempt_count_cl = Arc::clone(&attempt_count);
        tokio::spawn(async move {
            for _ in 0..RETRY_ATTEMPTS {
                if let Ok((mut sock, _)) = listener.accept().await {
                    let mut buf = vec![0u8; 16384];
                    let _ = sock.read(&mut buf).await;
                    *attempt_count_cl.lock().unwrap() += 1;
                    let _ = sock
                        .write_all(b"HTTP/1.1 503 Service Unavailable\r\nContent-Length: 0\r\n\r\n")
                        .await;
                }
            }
        });

        let mgr = Arc::new(NotificationManager::new());
        install(
            &mgr,
            vec![put_rule("r1", &format!("http://{addr}/sink"), None, None)],
        );

        dispatch_event(
            Arc::clone(&mgr),
            "b".into(),
            "k".into(),
            EventType::ObjectCreatedPut,
            None,
            None,
            None,
            "r".into(),
        )
        .await;

        // Delivery runs on a detached task, so poll until the drop is recorded.
        for _ in 0..100 {
            if mgr.dropped_total.load(Ordering::Relaxed) > 0 {
                break;
            }
            tokio::time::sleep(std::time::Duration::from_millis(20)).await;
        }
        assert_eq!(
            mgr.dropped_total.load(Ordering::Relaxed),
            1,
            "drop counter must bump exactly once after retry budget exhausted"
        );
        // The server bumps its counter before replying, so by the time the
        // drop is visible every attempt has already been counted.
        assert_eq!(
            *attempt_count.lock().unwrap(),
            RETRY_ATTEMPTS,
            "webhook must be attempted exactly RETRY_ATTEMPTS times before dropping"
        );
    }
}