1use std::collections::HashMap;
51use std::sync::Arc;
52use std::sync::RwLock;
53use std::sync::atomic::{AtomicU64, Ordering};
54
55use serde::{Deserialize, Serialize};
56
57#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)]
62pub enum EventType {
63 ObjectCreatedPut,
65 ObjectRemovedDelete,
68 ObjectRemovedDeleteMarker,
82}
83
84impl EventType {
85 #[must_use]
88 pub fn as_aws_str(&self) -> &'static str {
89 match self {
90 Self::ObjectCreatedPut => "s3:ObjectCreated:Put",
91 Self::ObjectRemovedDelete => "s3:ObjectRemoved:Delete",
92 Self::ObjectRemovedDeleteMarker => "s3:ObjectRemoved:DeleteMarkerCreated",
93 }
94 }
95
96 #[must_use]
101 pub fn from_aws_str(s: &str) -> Option<Self> {
102 match s {
103 "s3:ObjectCreated:Put" | "s3:ObjectCreated:*" => Some(Self::ObjectCreatedPut),
104 "s3:ObjectRemoved:Delete" => Some(Self::ObjectRemovedDelete),
105 "s3:ObjectRemoved:DeleteMarkerCreated" => Some(Self::ObjectRemovedDeleteMarker),
106 "s3:ObjectRemoved:*" => Some(Self::ObjectRemovedDelete),
107 _ => None,
108 }
109 }
110}
111
112#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
117pub enum Destination {
118 Webhook { url: String },
121 Sqs { queue_arn: String },
123 Sns { topic_arn: String },
125}
126
127impl Destination {
128 #[must_use]
131 pub fn type_tag(&self) -> &'static str {
132 match self {
133 Self::Webhook { .. } => "webhook",
134 Self::Sqs { .. } => "sqs",
135 Self::Sns { .. } => "sns",
136 }
137 }
138}
139
140#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
146pub struct NotificationRule {
147 pub id: String,
150 pub events: Vec<EventType>,
153 pub destination: Destination,
155 pub filter_prefix: Option<String>,
159 pub filter_suffix: Option<String>,
162}
163
164#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
166pub struct NotificationConfig {
167 pub rules: Vec<NotificationRule>,
168}
169
170#[derive(Debug, Default, Serialize, Deserialize)]
174struct NotificationSnapshot {
175 by_bucket: HashMap<String, NotificationConfig>,
176}
177
178pub struct NotificationManager {
183 by_bucket: RwLock<HashMap<String, NotificationConfig>>,
184 pub dropped_total: AtomicU64,
188}
189
190impl Default for NotificationManager {
191 fn default() -> Self {
192 Self::new()
193 }
194}
195
196impl std::fmt::Debug for NotificationManager {
197 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
198 f.debug_struct("NotificationManager")
199 .field("dropped_total", &self.dropped_total.load(Ordering::Relaxed))
200 .finish_non_exhaustive()
201 }
202}
203
204impl NotificationManager {
205 #[must_use]
207 pub fn new() -> Self {
208 Self {
209 by_bucket: RwLock::new(HashMap::new()),
210 dropped_total: AtomicU64::new(0),
211 }
212 }
213
214 pub fn put(&self, bucket: &str, config: NotificationConfig) {
218 self.by_bucket
219 .write()
220 .expect("notification state RwLock poisoned")
221 .insert(bucket.to_owned(), config);
222 }
223
224 #[must_use]
229 pub fn get(&self, bucket: &str) -> Option<NotificationConfig> {
230 self.by_bucket
231 .read()
232 .expect("notification state RwLock poisoned")
233 .get(bucket)
234 .cloned()
235 }
236
237 pub fn delete(&self, bucket: &str) {
239 self.by_bucket
240 .write()
241 .expect("notification state RwLock poisoned")
242 .remove(bucket);
243 }
244
245 pub fn to_json(&self) -> Result<String, serde_json::Error> {
248 let snap = NotificationSnapshot {
249 by_bucket: self
250 .by_bucket
251 .read()
252 .expect("notification state RwLock poisoned")
253 .clone(),
254 };
255 serde_json::to_string(&snap)
256 }
257
258 pub fn from_json(s: &str) -> Result<Self, serde_json::Error> {
262 let snap: NotificationSnapshot = serde_json::from_str(s)?;
263 Ok(Self {
264 by_bucket: RwLock::new(snap.by_bucket),
265 dropped_total: AtomicU64::new(0),
266 })
267 }
268
269 #[must_use]
274 pub fn match_destinations(
275 &self,
276 bucket: &str,
277 event: &EventType,
278 key: &str,
279 ) -> Vec<Destination> {
280 let map = self
281 .by_bucket
282 .read()
283 .expect("notification state RwLock poisoned");
284 let cfg = match map.get(bucket) {
285 Some(c) => c,
286 None => return Vec::new(),
287 };
288 cfg.rules
289 .iter()
290 .filter(|r| rule_matches(r, event, key))
291 .map(|r| r.destination.clone())
292 .collect()
293 }
294}
295
296fn rule_matches(rule: &NotificationRule, event: &EventType, key: &str) -> bool {
297 if !rule.events.iter().any(|e| e == event) {
298 return false;
299 }
300 if let Some(p) = rule.filter_prefix.as_deref()
301 && !p.is_empty()
302 && !key.starts_with(p)
303 {
304 return false;
305 }
306 if let Some(s) = rule.filter_suffix.as_deref()
307 && !s.is_empty()
308 && !key.ends_with(s)
309 {
310 return false;
311 }
312 true
313}
314
315#[must_use]
323#[allow(clippy::too_many_arguments)]
324pub fn build_event_json(
325 bucket: &str,
326 key: &str,
327 event: &EventType,
328 size: Option<u64>,
329 etag: Option<&str>,
330 version_id: Option<&str>,
331 request_id: &str,
332 now: chrono::DateTime<chrono::Utc>,
333) -> String {
334 let etag_clean = etag.map(|e| e.trim_matches('"').to_owned());
337 let mut object = serde_json::json!({
338 "key": key,
339 "sequencer": format!("{:016x}", now.timestamp_micros() as u64),
340 });
341 if let Some(sz) = size {
342 object["size"] = serde_json::json!(sz);
343 }
344 if let Some(ref e) = etag_clean {
345 object["eTag"] = serde_json::json!(e);
346 }
347 if let Some(v) = version_id {
348 object["versionId"] = serde_json::json!(v);
349 }
350 let event_name = event.as_aws_str();
351 let event_source = match event {
352 EventType::ObjectCreatedPut => "ObjectCreated",
353 EventType::ObjectRemovedDelete | EventType::ObjectRemovedDeleteMarker => "ObjectRemoved",
354 };
355 let record = serde_json::json!({
356 "eventVersion": "2.1",
357 "eventSource": "aws:s3",
358 "awsRegion": "us-east-1",
359 "eventTime": now.to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
360 "eventName": event_name,
361 "userIdentity": { "principalId": "S4" },
362 "requestParameters": { "sourceIPAddress": "0.0.0.0" },
363 "responseElements": {
364 "x-amz-request-id": request_id,
365 "x-amz-id-2": request_id,
366 },
367 "s3": {
368 "s3SchemaVersion": "1.0",
369 "configurationId": "S4-default",
370 "bucket": {
371 "name": bucket,
372 "ownerIdentity": { "principalId": "S4" },
373 "arn": format!("arn:aws:s3:::{bucket}"),
374 },
375 "object": object,
376 },
377 });
378 let _ = event_source; serde_json::json!({ "Records": [record] }).to_string()
380}
381
382const RETRY_ATTEMPTS: u32 = 3;
383const RETRY_BASE_MS: u64 = 50;
384
385#[allow(clippy::too_many_arguments)]
396pub async fn dispatch_event(
397 manager: Arc<NotificationManager>,
398 bucket: String,
399 key: String,
400 event: EventType,
401 size: Option<u64>,
402 etag: Option<String>,
403 version_id: Option<String>,
404 request_id: String,
405) {
406 let dests = manager.match_destinations(&bucket, &event, &key);
407 if dests.is_empty() {
408 return;
409 }
410 let now = chrono::Utc::now();
411 let body = build_event_json(
412 &bucket,
413 &key,
414 &event,
415 size,
416 etag.as_deref(),
417 version_id.as_deref(),
418 &request_id,
419 now,
420 );
421 for dest in dests {
422 let mgr = Arc::clone(&manager);
423 let body = body.clone();
424 tokio::spawn(async move {
425 send_one(mgr, dest, body).await;
426 });
427 }
428}
429
430async fn send_one(manager: Arc<NotificationManager>, dest: Destination, body: String) {
431 match dest {
432 Destination::Webhook { ref url } => {
433 let client = match reqwest::Client::builder()
434 .timeout(std::time::Duration::from_secs(5))
435 .build()
436 {
437 Ok(c) => c,
438 Err(e) => {
439 tracing::warn!(error = %e, "notifications: reqwest client build failed");
440 bump_drop(&manager, dest.type_tag());
441 return;
442 }
443 };
444 for attempt in 0..RETRY_ATTEMPTS {
445 let resp = client
446 .post(url)
447 .header("content-type", "application/json")
448 .body(body.clone())
449 .send()
450 .await;
451 match resp {
452 Ok(r) if r.status().is_success() => return,
453 Ok(r) if r.status().is_server_error() => {
454 if attempt + 1 < RETRY_ATTEMPTS {
456 let delay_ms = RETRY_BASE_MS * (1u64 << attempt);
457 tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
458 continue;
459 }
460 tracing::warn!(
461 url = %url,
462 status = %r.status(),
463 "notifications: webhook giving up after {RETRY_ATTEMPTS} attempts"
464 );
465 bump_drop(&manager, "webhook");
466 return;
467 }
468 Ok(r) => {
469 tracing::warn!(
471 url = %url,
472 status = %r.status(),
473 "notifications: webhook permanent failure, dropping"
474 );
475 return;
476 }
477 Err(e) => {
478 if attempt + 1 < RETRY_ATTEMPTS {
480 let delay_ms = RETRY_BASE_MS * (1u64 << attempt);
481 tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
482 continue;
483 }
484 tracing::warn!(
485 url = %url,
486 error = %e,
487 "notifications: webhook network failure, dropping after {RETRY_ATTEMPTS} attempts"
488 );
489 bump_drop(&manager, "webhook");
490 return;
491 }
492 }
493 }
494 }
495 Destination::Sqs { ref queue_arn } => {
496 #[cfg(feature = "aws-events")]
497 {
498 send_sqs(&manager, queue_arn, &body).await;
499 }
500 #[cfg(not(feature = "aws-events"))]
501 {
502 let _ = queue_arn;
503 let _ = body;
504 tracing::warn!(
505 "notifications: SQS destination configured but `aws-events` feature is off — dropping"
506 );
507 bump_drop(&manager, "sqs");
508 }
509 }
510 Destination::Sns { ref topic_arn } => {
511 #[cfg(feature = "aws-events")]
512 {
513 send_sns(&manager, topic_arn, &body).await;
514 }
515 #[cfg(not(feature = "aws-events"))]
516 {
517 let _ = topic_arn;
518 let _ = body;
519 tracing::warn!(
520 "notifications: SNS destination configured but `aws-events` feature is off — dropping"
521 );
522 bump_drop(&manager, "sns");
523 }
524 }
525 }
526}
527
528fn bump_drop(manager: &NotificationManager, dest_tag: &'static str) {
529 manager.dropped_total.fetch_add(1, Ordering::Relaxed);
530 crate::metrics::record_notification_drop(dest_tag);
531}
532
533#[cfg(feature = "aws-events")]
534async fn send_sqs(manager: &NotificationManager, queue_arn: &str, body: &str) {
535 let conf = aws_config::load_from_env().await;
536 let client = aws_sdk_sqs::Client::new(&conf);
537 let res = client
543 .send_message()
544 .queue_url(queue_arn)
545 .message_body(body)
546 .send()
547 .await;
548 if let Err(e) = res {
549 tracing::warn!(arn = %queue_arn, error = ?e, "notifications: SQS send failed");
550 bump_drop(manager, "sqs");
551 }
552}
553
554#[cfg(feature = "aws-events")]
555async fn send_sns(manager: &NotificationManager, topic_arn: &str, body: &str) {
556 let conf = aws_config::load_from_env().await;
557 let client = aws_sdk_sns::Client::new(&conf);
558 let res = client
559 .publish()
560 .topic_arn(topic_arn)
561 .message(body)
562 .send()
563 .await;
564 if let Err(e) = res {
565 tracing::warn!(arn = %topic_arn, error = ?e, "notifications: SNS publish failed");
566 bump_drop(manager, "sns");
567 }
568}
569
570#[cfg(test)]
571mod tests {
572 use super::*;
573
574 fn rule(
575 id: &str,
576 events: &[EventType],
577 dest: Destination,
578 prefix: Option<&str>,
579 suffix: Option<&str>,
580 ) -> NotificationRule {
581 NotificationRule {
582 id: id.to_owned(),
583 events: events.to_vec(),
584 destination: dest,
585 filter_prefix: prefix.map(str::to_owned),
586 filter_suffix: suffix.map(str::to_owned),
587 }
588 }
589
590 #[test]
591 fn match_destinations_single_rule_event_match() {
592 let mgr = NotificationManager::new();
593 mgr.put(
594 "b",
595 NotificationConfig {
596 rules: vec![rule(
597 "r1",
598 &[EventType::ObjectCreatedPut],
599 Destination::Webhook {
600 url: "http://hook".into(),
601 },
602 None,
603 None,
604 )],
605 },
606 );
607 let dests = mgr.match_destinations("b", &EventType::ObjectCreatedPut, "any/key.txt");
608 assert_eq!(dests.len(), 1, "single rule must fire on event match");
609 }
610
611 #[test]
612 fn match_destinations_prefix_filter() {
613 let mgr = NotificationManager::new();
614 mgr.put(
615 "b",
616 NotificationConfig {
617 rules: vec![rule(
618 "r1",
619 &[EventType::ObjectCreatedPut],
620 Destination::Webhook {
621 url: "http://hook".into(),
622 },
623 Some("uploads/"),
624 None,
625 )],
626 },
627 );
628 assert_eq!(
629 mgr.match_destinations("b", &EventType::ObjectCreatedPut, "uploads/file.bin")
630 .len(),
631 1
632 );
633 assert!(
634 mgr.match_destinations("b", &EventType::ObjectCreatedPut, "logs/file.bin")
635 .is_empty(),
636 "prefix filter must reject non-matching key"
637 );
638 }
639
640 #[test]
641 fn match_destinations_suffix_filter() {
642 let mgr = NotificationManager::new();
643 mgr.put(
644 "b",
645 NotificationConfig {
646 rules: vec![rule(
647 "r1",
648 &[EventType::ObjectCreatedPut],
649 Destination::Webhook {
650 url: "http://hook".into(),
651 },
652 None,
653 Some(".jpg"),
654 )],
655 },
656 );
657 assert_eq!(
658 mgr.match_destinations("b", &EventType::ObjectCreatedPut, "photo.jpg")
659 .len(),
660 1
661 );
662 assert!(
663 mgr.match_destinations("b", &EventType::ObjectCreatedPut, "doc.pdf")
664 .is_empty(),
665 "suffix filter must reject non-matching key"
666 );
667 }
668
669 #[test]
670 fn match_destinations_no_rule_for_bucket() {
671 let mgr = NotificationManager::new();
672 let dests = mgr.match_destinations("ghost", &EventType::ObjectCreatedPut, "k");
673 assert!(dests.is_empty(), "unknown bucket must yield empty vec");
674 }
675
676 #[test]
677 fn match_destinations_event_type_mismatch() {
678 let mgr = NotificationManager::new();
679 mgr.put(
680 "b",
681 NotificationConfig {
682 rules: vec![rule(
683 "r1",
684 &[EventType::ObjectCreatedPut],
685 Destination::Webhook {
686 url: "http://hook".into(),
687 },
688 None,
689 None,
690 )],
691 },
692 );
693 assert!(
694 mgr.match_destinations("b", &EventType::ObjectRemovedDelete, "k")
695 .is_empty(),
696 "mismatched event type must not fire"
697 );
698 }
699
700 #[test]
701 fn match_destinations_multiple_rules_fire_in_order() {
702 let mgr = NotificationManager::new();
703 mgr.put(
704 "b",
705 NotificationConfig {
706 rules: vec![
707 rule(
708 "first",
709 &[EventType::ObjectCreatedPut],
710 Destination::Webhook {
711 url: "http://first".into(),
712 },
713 None,
714 None,
715 ),
716 rule(
717 "second",
718 &[EventType::ObjectCreatedPut],
719 Destination::Webhook {
720 url: "http://second".into(),
721 },
722 None,
723 None,
724 ),
725 ],
726 },
727 );
728 let dests = mgr.match_destinations("b", &EventType::ObjectCreatedPut, "k");
729 assert_eq!(dests.len(), 2, "both matching rules fire");
730 match (&dests[0], &dests[1]) {
731 (Destination::Webhook { url: u1 }, Destination::Webhook { url: u2 }) => {
732 assert_eq!(u1, "http://first");
733 assert_eq!(u2, "http://second");
734 }
735 _ => panic!("expected two webhooks in declaration order"),
736 }
737 }
738
739 #[test]
740 fn build_event_json_schema_matches_aws() {
741 let now =
742 chrono::DateTime::parse_from_rfc3339("2026-05-13T10:00:00Z")
743 .unwrap()
744 .with_timezone(&chrono::Utc);
745 let body = build_event_json(
746 "my-bucket",
747 "uploads/photo.jpg",
748 &EventType::ObjectCreatedPut,
749 Some(12345),
750 Some("\"deadbeef\""),
751 Some("v-001"),
752 "REQ-1",
753 now,
754 );
755 let v: serde_json::Value = serde_json::from_str(&body).expect("valid json");
756 let rec = &v["Records"][0];
757 assert_eq!(rec["eventName"], "s3:ObjectCreated:Put");
758 assert_eq!(rec["eventTime"], "2026-05-13T10:00:00.000Z");
759 assert_eq!(rec["s3"]["bucket"]["name"], "my-bucket");
760 assert_eq!(rec["s3"]["object"]["key"], "uploads/photo.jpg");
761 assert_eq!(rec["s3"]["object"]["size"], 12345);
762 assert_eq!(rec["s3"]["object"]["eTag"], "deadbeef");
763 assert_eq!(rec["s3"]["object"]["versionId"], "v-001");
764 }
765
766 #[test]
767 fn build_event_json_omits_optional_fields() {
768 let now = chrono::Utc::now();
769 let body = build_event_json(
770 "b",
771 "k",
772 &EventType::ObjectRemovedDeleteMarker,
773 None,
774 None,
775 None,
776 "r",
777 now,
778 );
779 let v: serde_json::Value = serde_json::from_str(&body).expect("valid json");
780 let obj = &v["Records"][0]["s3"]["object"];
781 assert!(obj.get("size").is_none());
782 assert!(obj.get("eTag").is_none());
783 assert!(obj.get("versionId").is_none());
784 }
785
786 #[test]
787 fn json_round_trip() {
788 let mgr = NotificationManager::new();
789 mgr.put(
790 "b",
791 NotificationConfig {
792 rules: vec![rule(
793 "r1",
794 &[
795 EventType::ObjectCreatedPut,
796 EventType::ObjectRemovedDelete,
797 ],
798 Destination::Sqs {
799 queue_arn: "arn:aws:sqs:us-east-1:123:q".into(),
800 },
801 Some("u/"),
802 Some(".jpg"),
803 )],
804 },
805 );
806 let json = mgr.to_json().expect("to_json");
807 let mgr2 = NotificationManager::from_json(&json).expect("from_json");
808 assert_eq!(mgr.get("b"), mgr2.get("b"));
809 }
810
811 #[test]
812 fn delete_is_idempotent() {
813 let mgr = NotificationManager::new();
814 mgr.delete("never-existed");
815 mgr.put(
816 "b",
817 NotificationConfig {
818 rules: vec![rule(
819 "r1",
820 &[EventType::ObjectCreatedPut],
821 Destination::Webhook {
822 url: "http://h".into(),
823 },
824 None,
825 None,
826 )],
827 },
828 );
829 mgr.delete("b");
830 assert!(mgr.get("b").is_none());
831 }
832
833 #[test]
834 fn put_replaces_previous_config() {
835 let mgr = NotificationManager::new();
836 mgr.put(
837 "b",
838 NotificationConfig {
839 rules: vec![rule(
840 "old",
841 &[EventType::ObjectCreatedPut],
842 Destination::Webhook {
843 url: "http://old".into(),
844 },
845 None,
846 None,
847 )],
848 },
849 );
850 mgr.put(
851 "b",
852 NotificationConfig {
853 rules: vec![rule(
854 "new",
855 &[EventType::ObjectRemovedDelete],
856 Destination::Webhook {
857 url: "http://new".into(),
858 },
859 None,
860 None,
861 )],
862 },
863 );
864 let cfg = mgr.get("b").expect("config");
865 assert_eq!(cfg.rules.len(), 1);
866 assert_eq!(cfg.rules[0].id, "new");
867 }
868
869 #[tokio::test]
870 async fn dispatch_event_via_webhook_delivers_payload() {
871 use std::sync::Mutex;
874 use tokio::io::{AsyncReadExt, AsyncWriteExt};
875 use tokio::net::TcpListener;
876
877 let listener = TcpListener::bind("127.0.0.1:0").await.expect("bind");
878 let addr = listener.local_addr().expect("addr");
879 let received: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
880 let received_cl = Arc::clone(&received);
881 tokio::spawn(async move {
882 if let Ok((mut sock, _)) = listener.accept().await {
883 let mut buf = vec![0u8; 16384];
884 let n = sock.read(&mut buf).await.unwrap_or(0);
885 let raw = String::from_utf8_lossy(&buf[..n]).to_string();
886 received_cl.lock().unwrap().push(raw);
887 let _ = sock
888 .write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n")
889 .await;
890 }
891 });
892
893 let mgr = Arc::new(NotificationManager::new());
894 mgr.put(
895 "b",
896 NotificationConfig {
897 rules: vec![rule(
898 "r1",
899 &[EventType::ObjectCreatedPut],
900 Destination::Webhook {
901 url: format!("http://{addr}/hook"),
902 },
903 None,
904 None,
905 )],
906 },
907 );
908
909 dispatch_event(
910 Arc::clone(&mgr),
911 "b".into(),
912 "k.txt".into(),
913 EventType::ObjectCreatedPut,
914 Some(7),
915 Some("\"abc\"".into()),
916 None,
917 "req-1".into(),
918 )
919 .await;
920
921 for _ in 0..50 {
923 if !received.lock().unwrap().is_empty() {
924 break;
925 }
926 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
927 }
928 let raw = received.lock().unwrap().clone();
929 assert!(!raw.is_empty(), "webhook receiver got nothing");
930 let raw = &raw[0];
931 assert!(raw.contains("POST /hook"), "missing POST line");
932 assert!(raw.contains("s3:ObjectCreated:Put"), "missing event name");
933 assert!(raw.contains("\"k.txt\""), "missing key");
934 assert_eq!(mgr.dropped_total.load(Ordering::Relaxed), 0);
935 }
936
937 #[tokio::test]
938 async fn dispatch_event_503_drops_after_retry_budget() {
939 use std::sync::Mutex;
942 use tokio::io::{AsyncReadExt, AsyncWriteExt};
943 use tokio::net::TcpListener;
944
945 let listener = TcpListener::bind("127.0.0.1:0").await.expect("bind");
946 let addr = listener.local_addr().expect("addr");
947 let attempt_count: Arc<Mutex<u32>> = Arc::new(Mutex::new(0));
948 let attempt_count_cl = Arc::clone(&attempt_count);
949 tokio::spawn(async move {
950 for _ in 0..RETRY_ATTEMPTS {
951 if let Ok((mut sock, _)) = listener.accept().await {
952 let mut buf = vec![0u8; 16384];
953 let _ = sock.read(&mut buf).await;
954 *attempt_count_cl.lock().unwrap() += 1;
955 let _ = sock
956 .write_all(b"HTTP/1.1 503 Service Unavailable\r\nContent-Length: 0\r\n\r\n")
957 .await;
958 }
959 }
960 });
961
962 let mgr = Arc::new(NotificationManager::new());
963 mgr.put(
964 "b",
965 NotificationConfig {
966 rules: vec![rule(
967 "r1",
968 &[EventType::ObjectCreatedPut],
969 Destination::Webhook {
970 url: format!("http://{addr}/sink"),
971 },
972 None,
973 None,
974 )],
975 },
976 );
977
978 dispatch_event(
979 Arc::clone(&mgr),
980 "b".into(),
981 "k".into(),
982 EventType::ObjectCreatedPut,
983 None,
984 None,
985 None,
986 "r".into(),
987 )
988 .await;
989
990 for _ in 0..100 {
993 if mgr.dropped_total.load(Ordering::Relaxed) > 0 {
994 break;
995 }
996 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
997 }
998 assert_eq!(
999 mgr.dropped_total.load(Ordering::Relaxed),
1000 1,
1001 "drop counter must bump exactly once after retry budget exhausted"
1002 );
1003 }
1004}