1use std::collections::HashMap;
51use std::sync::Arc;
52use std::sync::RwLock;
53use std::sync::atomic::{AtomicU64, Ordering};
54
55use serde::{Deserialize, Serialize};
56
57#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)]
62pub enum EventType {
63 ObjectCreatedPut,
65 ObjectRemovedDelete,
68 ObjectRemovedDeleteMarker,
82}
83
84impl EventType {
85 #[must_use]
88 pub fn as_aws_str(&self) -> &'static str {
89 match self {
90 Self::ObjectCreatedPut => "s3:ObjectCreated:Put",
91 Self::ObjectRemovedDelete => "s3:ObjectRemoved:Delete",
92 Self::ObjectRemovedDeleteMarker => "s3:ObjectRemoved:DeleteMarkerCreated",
93 }
94 }
95
96 #[must_use]
101 pub fn from_aws_str(s: &str) -> Option<Self> {
102 match s {
103 "s3:ObjectCreated:Put" | "s3:ObjectCreated:*" => Some(Self::ObjectCreatedPut),
104 "s3:ObjectRemoved:Delete" => Some(Self::ObjectRemovedDelete),
105 "s3:ObjectRemoved:DeleteMarkerCreated" => Some(Self::ObjectRemovedDeleteMarker),
106 "s3:ObjectRemoved:*" => Some(Self::ObjectRemovedDelete),
107 _ => None,
108 }
109 }
110}
111
112#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
117pub enum Destination {
118 Webhook { url: String },
121 Sqs { queue_arn: String },
123 Sns { topic_arn: String },
125}
126
127impl Destination {
128 #[must_use]
131 pub fn type_tag(&self) -> &'static str {
132 match self {
133 Self::Webhook { .. } => "webhook",
134 Self::Sqs { .. } => "sqs",
135 Self::Sns { .. } => "sns",
136 }
137 }
138}
139
140#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
146pub struct NotificationRule {
147 pub id: String,
150 pub events: Vec<EventType>,
153 pub destination: Destination,
155 pub filter_prefix: Option<String>,
159 pub filter_suffix: Option<String>,
162}
163
164#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
166pub struct NotificationConfig {
167 pub rules: Vec<NotificationRule>,
168}
169
170#[derive(Debug, Default, Serialize, Deserialize)]
174struct NotificationSnapshot {
175 by_bucket: HashMap<String, NotificationConfig>,
176}
177
/// In-memory registry of per-bucket notification configurations together
/// with a counter of notifications that could not be delivered.
pub struct NotificationManager {
    /// Bucket name mapped to its configuration, behind a reader/writer lock.
    by_bucket: RwLock<HashMap<String, NotificationConfig>>,
    /// Count of dropped notifications; incremented by `bump_drop`.
    pub dropped_total: AtomicU64,
}
189
190impl Default for NotificationManager {
191 fn default() -> Self {
192 Self::new()
193 }
194}
195
196impl std::fmt::Debug for NotificationManager {
197 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
198 f.debug_struct("NotificationManager")
199 .field("dropped_total", &self.dropped_total.load(Ordering::Relaxed))
200 .finish_non_exhaustive()
201 }
202}
203
204impl NotificationManager {
205 #[must_use]
207 pub fn new() -> Self {
208 Self {
209 by_bucket: RwLock::new(HashMap::new()),
210 dropped_total: AtomicU64::new(0),
211 }
212 }
213
214 pub fn put(&self, bucket: &str, config: NotificationConfig) {
218 crate::lock_recovery::recover_write(&self.by_bucket, "notifications.by_bucket")
219 .insert(bucket.to_owned(), config);
220 }
221
222 #[must_use]
227 pub fn get(&self, bucket: &str) -> Option<NotificationConfig> {
228 crate::lock_recovery::recover_read(&self.by_bucket, "notifications.by_bucket")
229 .get(bucket)
230 .cloned()
231 }
232
233 pub fn delete(&self, bucket: &str) {
235 crate::lock_recovery::recover_write(&self.by_bucket, "notifications.by_bucket")
236 .remove(bucket);
237 }
238
239 pub fn to_json(&self) -> Result<String, serde_json::Error> {
242 let snap = NotificationSnapshot {
243 by_bucket: crate::lock_recovery::recover_read(
244 &self.by_bucket,
245 "notifications.by_bucket",
246 )
247 .clone(),
248 };
249 serde_json::to_string(&snap)
250 }
251
252 pub fn from_json(s: &str) -> Result<Self, serde_json::Error> {
256 let snap: NotificationSnapshot = serde_json::from_str(s)?;
257 Ok(Self {
258 by_bucket: RwLock::new(snap.by_bucket),
259 dropped_total: AtomicU64::new(0),
260 })
261 }
262
263 #[must_use]
268 pub fn match_destinations(
269 &self,
270 bucket: &str,
271 event: &EventType,
272 key: &str,
273 ) -> Vec<Destination> {
274 let map = crate::lock_recovery::recover_read(&self.by_bucket, "notifications.by_bucket");
275 let cfg = match map.get(bucket) {
276 Some(c) => c,
277 None => return Vec::new(),
278 };
279 cfg.rules
280 .iter()
281 .filter(|r| rule_matches(r, event, key))
282 .map(|r| r.destination.clone())
283 .collect()
284 }
285}
286
287fn rule_matches(rule: &NotificationRule, event: &EventType, key: &str) -> bool {
288 if !rule.events.iter().any(|e| e == event) {
289 return false;
290 }
291 if let Some(p) = rule.filter_prefix.as_deref()
292 && !p.is_empty()
293 && !key.starts_with(p)
294 {
295 return false;
296 }
297 if let Some(s) = rule.filter_suffix.as_deref()
298 && !s.is_empty()
299 && !key.ends_with(s)
300 {
301 return false;
302 }
303 true
304}
305
306#[must_use]
314#[allow(clippy::too_many_arguments)]
315pub fn build_event_json(
316 bucket: &str,
317 key: &str,
318 event: &EventType,
319 size: Option<u64>,
320 etag: Option<&str>,
321 version_id: Option<&str>,
322 request_id: &str,
323 now: chrono::DateTime<chrono::Utc>,
324) -> String {
325 let etag_clean = etag.map(|e| e.trim_matches('"').to_owned());
328 let mut object = serde_json::json!({
329 "key": key,
330 "sequencer": format!("{:016x}", now.timestamp_micros() as u64),
331 });
332 if let Some(sz) = size {
333 object["size"] = serde_json::json!(sz);
334 }
335 if let Some(ref e) = etag_clean {
336 object["eTag"] = serde_json::json!(e);
337 }
338 if let Some(v) = version_id {
339 object["versionId"] = serde_json::json!(v);
340 }
341 let event_name = event.as_aws_str();
342 let event_source = match event {
343 EventType::ObjectCreatedPut => "ObjectCreated",
344 EventType::ObjectRemovedDelete | EventType::ObjectRemovedDeleteMarker => "ObjectRemoved",
345 };
346 let record = serde_json::json!({
347 "eventVersion": "2.1",
348 "eventSource": "aws:s3",
349 "awsRegion": "us-east-1",
350 "eventTime": now.to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
351 "eventName": event_name,
352 "userIdentity": { "principalId": "S4" },
353 "requestParameters": { "sourceIPAddress": "0.0.0.0" },
354 "responseElements": {
355 "x-amz-request-id": request_id,
356 "x-amz-id-2": request_id,
357 },
358 "s3": {
359 "s3SchemaVersion": "1.0",
360 "configurationId": "S4-default",
361 "bucket": {
362 "name": bucket,
363 "ownerIdentity": { "principalId": "S4" },
364 "arn": format!("arn:aws:s3:::{bucket}"),
365 },
366 "object": object,
367 },
368 });
369 let _ = event_source; serde_json::json!({ "Records": [record] }).to_string()
371}
372
/// Maximum number of delivery attempts per webhook notification, counting the
/// first try.
const RETRY_ATTEMPTS: u32 = 3;
/// Base back-off in milliseconds; the wait after 0-based attempt `n` is
/// `RETRY_BASE_MS * 2^n`.
const RETRY_BASE_MS: u64 = 50;
375
376#[allow(clippy::too_many_arguments)]
387pub async fn dispatch_event(
388 manager: Arc<NotificationManager>,
389 bucket: String,
390 key: String,
391 event: EventType,
392 size: Option<u64>,
393 etag: Option<String>,
394 version_id: Option<String>,
395 request_id: String,
396) {
397 let dests = manager.match_destinations(&bucket, &event, &key);
398 if dests.is_empty() {
399 return;
400 }
401 let now = chrono::Utc::now();
402 let body = build_event_json(
403 &bucket,
404 &key,
405 &event,
406 size,
407 etag.as_deref(),
408 version_id.as_deref(),
409 &request_id,
410 now,
411 );
412 for dest in dests {
413 let mgr = Arc::clone(&manager);
414 let body = body.clone();
415 tokio::spawn(async move {
416 send_one(mgr, dest, body).await;
417 });
418 }
419}
420
421async fn send_one(manager: Arc<NotificationManager>, dest: Destination, body: String) {
422 match dest {
423 Destination::Webhook { ref url } => {
424 let client = match reqwest::Client::builder()
425 .timeout(std::time::Duration::from_secs(5))
426 .build()
427 {
428 Ok(c) => c,
429 Err(e) => {
430 tracing::warn!(error = %e, "notifications: reqwest client build failed");
431 bump_drop(&manager, dest.type_tag());
432 return;
433 }
434 };
435 for attempt in 0..RETRY_ATTEMPTS {
436 let resp = client
437 .post(url)
438 .header("content-type", "application/json")
439 .body(body.clone())
440 .send()
441 .await;
442 match resp {
443 Ok(r) if r.status().is_success() => return,
444 Ok(r) if r.status().is_server_error() => {
445 if attempt + 1 < RETRY_ATTEMPTS {
447 let delay_ms = RETRY_BASE_MS * (1u64 << attempt);
448 tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
449 continue;
450 }
451 tracing::warn!(
452 url = %url,
453 status = %r.status(),
454 "notifications: webhook giving up after {RETRY_ATTEMPTS} attempts"
455 );
456 bump_drop(&manager, "webhook");
457 return;
458 }
459 Ok(r) => {
460 tracing::warn!(
462 url = %url,
463 status = %r.status(),
464 "notifications: webhook permanent failure, dropping"
465 );
466 return;
467 }
468 Err(e) => {
469 if attempt + 1 < RETRY_ATTEMPTS {
471 let delay_ms = RETRY_BASE_MS * (1u64 << attempt);
472 tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
473 continue;
474 }
475 tracing::warn!(
476 url = %url,
477 error = %e,
478 "notifications: webhook network failure, dropping after {RETRY_ATTEMPTS} attempts"
479 );
480 bump_drop(&manager, "webhook");
481 return;
482 }
483 }
484 }
485 }
486 Destination::Sqs { ref queue_arn } => {
487 #[cfg(feature = "aws-events")]
488 {
489 send_sqs(&manager, queue_arn, &body).await;
490 }
491 #[cfg(not(feature = "aws-events"))]
492 {
493 let _ = queue_arn;
494 let _ = body;
495 tracing::warn!(
496 "notifications: SQS destination configured but `aws-events` feature is off — dropping"
497 );
498 bump_drop(&manager, "sqs");
499 }
500 }
501 Destination::Sns { ref topic_arn } => {
502 #[cfg(feature = "aws-events")]
503 {
504 send_sns(&manager, topic_arn, &body).await;
505 }
506 #[cfg(not(feature = "aws-events"))]
507 {
508 let _ = topic_arn;
509 let _ = body;
510 tracing::warn!(
511 "notifications: SNS destination configured but `aws-events` feature is off — dropping"
512 );
513 bump_drop(&manager, "sns");
514 }
515 }
516 }
517}
518
519fn bump_drop(manager: &NotificationManager, dest_tag: &'static str) {
520 manager.dropped_total.fetch_add(1, Ordering::Relaxed);
521 crate::metrics::record_notification_drop(dest_tag);
522}
523
/// Splits an SQS queue ARN of the form
/// `arn:<partition>:sqs:<region>:<account-id>:<queue-name>` into
/// `(account_id, queue_name)`.
///
/// Returns `None` for anything that is not a well-formed SQS ARN, such as a
/// queue URL.
#[cfg_attr(not(feature = "aws-events"), allow(dead_code))]
fn sqs_arn_parts(arn: &str) -> Option<(&str, &str)> {
    let mut fields = arn.splitn(6, ':');
    let scheme = fields.next()?;
    let _partition = fields.next()?;
    let service = fields.next()?;
    let _region = fields.next()?;
    let account_id = fields.next()?;
    let queue_name = fields.next()?;
    let well_formed =
        scheme == "arn" && service == "sqs" && !account_id.is_empty() && !queue_name.is_empty();
    well_formed.then_some((account_id, queue_name))
}

/// Sends `body` as a message to the SQS queue identified by `queue_arn`,
/// recording a drop when the queue cannot be resolved or the send fails.
///
/// SQS `SendMessage` addresses a queue by URL, not by ARN. When `queue_arn` is
/// an SQS ARN, it is first resolved to a URL with `GetQueueUrl`, which needs
/// the `sqs:GetQueueUrl` permission. Any other value is assumed to already be
/// a queue URL and is passed through unchanged.
#[cfg(feature = "aws-events")]
async fn send_sqs(manager: &NotificationManager, queue_arn: &str, body: &str) {
    let conf = aws_config::load_from_env().await;
    let client = aws_sdk_sqs::Client::new(&conf);

    let queue_url = match sqs_arn_parts(queue_arn) {
        Some((account_id, queue_name)) => {
            let lookup = client
                .get_queue_url()
                .queue_name(queue_name)
                .queue_owner_aws_account_id(account_id)
                .send()
                .await;
            match lookup {
                Ok(out) => match out.queue_url() {
                    Some(resolved) => resolved.to_owned(),
                    None => {
                        tracing::warn!(
                            arn = %queue_arn,
                            "notifications: SQS queue URL lookup returned no URL"
                        );
                        bump_drop(manager, "sqs");
                        return;
                    }
                },
                Err(e) => {
                    tracing::warn!(
                        arn = %queue_arn,
                        error = ?e,
                        "notifications: SQS queue URL lookup failed"
                    );
                    bump_drop(manager, "sqs");
                    return;
                }
            }
        }
        None => queue_arn.to_owned(),
    };

    let res = client
        .send_message()
        .queue_url(queue_url)
        .message_body(body)
        .send()
        .await;
    if let Err(e) = res {
        tracing::warn!(arn = %queue_arn, error = ?e, "notifications: SQS send failed");
        bump_drop(manager, "sqs");
    }
}
544
/// Publishes `body` to the SNS topic `topic_arn` using AWS configuration
/// loaded from the environment; a failed publish is logged and recorded as a
/// drop.
#[cfg(feature = "aws-events")]
async fn send_sns(manager: &NotificationManager, topic_arn: &str, body: &str) {
    let sdk_config = aws_config::load_from_env().await;
    let sns = aws_sdk_sns::Client::new(&sdk_config);
    let request = sns.publish().topic_arn(topic_arn).message(body);
    if let Err(e) = request.send().await {
        tracing::warn!(arn = %topic_arn, error = ?e, "notifications: SNS publish failed");
        bump_drop(manager, "sns");
    }
}
560
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a rule from borrowed pieces so test bodies stay compact.
    fn rule(
        id: &str,
        events: &[EventType],
        dest: Destination,
        prefix: Option<&str>,
        suffix: Option<&str>,
    ) -> NotificationRule {
        let filter_prefix = prefix.map(str::to_owned);
        let filter_suffix = suffix.map(str::to_owned);
        NotificationRule {
            id: id.to_owned(),
            events: events.to_vec(),
            destination: dest,
            filter_prefix,
            filter_suffix,
        }
    }

    /// Shorthand for a webhook destination.
    fn webhook(url: &str) -> Destination {
        Destination::Webhook {
            url: url.to_owned(),
        }
    }

    /// Wraps a single rule into a bucket configuration.
    fn one_rule(only: NotificationRule) -> NotificationConfig {
        NotificationConfig { rules: vec![only] }
    }

    /// Manager whose bucket `"b"` carries one `ObjectCreatedPut` webhook rule
    /// named `"r1"` with the given filters.
    fn manager_with_put_hook(
        url: &str,
        prefix: Option<&str>,
        suffix: Option<&str>,
    ) -> NotificationManager {
        let mgr = NotificationManager::new();
        mgr.put(
            "b",
            one_rule(rule(
                "r1",
                &[EventType::ObjectCreatedPut],
                webhook(url),
                prefix,
                suffix,
            )),
        );
        mgr
    }

    #[test]
    fn match_destinations_single_rule_event_match() {
        let mgr = manager_with_put_hook("http://hook", None, None);
        let matched = mgr.match_destinations("b", &EventType::ObjectCreatedPut, "any/key.txt");
        assert_eq!(matched.len(), 1, "single rule must fire on event match");
    }

    #[test]
    fn match_destinations_prefix_filter() {
        let mgr = manager_with_put_hook("http://hook", Some("uploads/"), None);
        let hits = mgr.match_destinations("b", &EventType::ObjectCreatedPut, "uploads/file.bin");
        assert_eq!(hits.len(), 1);
        let misses = mgr.match_destinations("b", &EventType::ObjectCreatedPut, "logs/file.bin");
        assert!(
            misses.is_empty(),
            "prefix filter must reject non-matching key"
        );
    }

    #[test]
    fn match_destinations_suffix_filter() {
        let mgr = manager_with_put_hook("http://hook", None, Some(".jpg"));
        let hits = mgr.match_destinations("b", &EventType::ObjectCreatedPut, "photo.jpg");
        assert_eq!(hits.len(), 1);
        let misses = mgr.match_destinations("b", &EventType::ObjectCreatedPut, "doc.pdf");
        assert!(
            misses.is_empty(),
            "suffix filter must reject non-matching key"
        );
    }

    #[test]
    fn match_destinations_no_rule_for_bucket() {
        let empty = NotificationManager::new();
        let matched = empty.match_destinations("ghost", &EventType::ObjectCreatedPut, "k");
        assert!(matched.is_empty(), "unknown bucket must yield empty vec");
    }

    #[test]
    fn match_destinations_event_type_mismatch() {
        let mgr = manager_with_put_hook("http://hook", None, None);
        let matched = mgr.match_destinations("b", &EventType::ObjectRemovedDelete, "k");
        assert!(matched.is_empty(), "mismatched event type must not fire");
    }

    #[test]
    fn match_destinations_multiple_rules_fire_in_order() {
        let mgr = NotificationManager::new();
        let rules = ["first", "second"]
            .iter()
            .map(|&name| {
                rule(
                    name,
                    &[EventType::ObjectCreatedPut],
                    webhook(&format!("http://{name}")),
                    None,
                    None,
                )
            })
            .collect();
        mgr.put("b", NotificationConfig { rules });

        let dests = mgr.match_destinations("b", &EventType::ObjectCreatedPut, "k");
        assert_eq!(dests.len(), 2, "both matching rules fire");
        match dests.as_slice() {
            [Destination::Webhook { url: u1 }, Destination::Webhook { url: u2 }] => {
                assert_eq!(u1, "http://first");
                assert_eq!(u2, "http://second");
            }
            _ => panic!("expected two webhooks in declaration order"),
        }
    }

    #[test]
    fn build_event_json_schema_matches_aws() {
        let now = chrono::DateTime::parse_from_rfc3339("2026-05-13T10:00:00Z")
            .unwrap()
            .with_timezone(&chrono::Utc);
        let body = build_event_json(
            "my-bucket",
            "uploads/photo.jpg",
            &EventType::ObjectCreatedPut,
            Some(12345),
            Some("\"deadbeef\""),
            Some("v-001"),
            "REQ-1",
            now,
        );
        let parsed: serde_json::Value = serde_json::from_str(&body).expect("valid json");
        let rec = &parsed["Records"][0];
        assert_eq!(rec["eventName"], "s3:ObjectCreated:Put");
        assert_eq!(rec["eventTime"], "2026-05-13T10:00:00.000Z");
        assert_eq!(rec["s3"]["bucket"]["name"], "my-bucket");
        let object = &rec["s3"]["object"];
        assert_eq!(object["key"], "uploads/photo.jpg");
        assert_eq!(object["size"], 12345);
        assert_eq!(object["eTag"], "deadbeef");
        assert_eq!(object["versionId"], "v-001");
    }

    #[test]
    fn build_event_json_omits_optional_fields() {
        let body = build_event_json(
            "b",
            "k",
            &EventType::ObjectRemovedDeleteMarker,
            None,
            None,
            None,
            "r",
            chrono::Utc::now(),
        );
        let parsed: serde_json::Value = serde_json::from_str(&body).expect("valid json");
        let object = &parsed["Records"][0]["s3"]["object"];
        for absent in ["size", "eTag", "versionId"] {
            assert!(object.get(absent).is_none());
        }
    }

    #[test]
    fn json_round_trip() {
        let original = NotificationManager::new();
        let sqs = Destination::Sqs {
            queue_arn: "arn:aws:sqs:us-east-1:123:q".into(),
        };
        let subscribed = [EventType::ObjectCreatedPut, EventType::ObjectRemovedDelete];
        original.put(
            "b",
            one_rule(rule("r1", &subscribed, sqs, Some("u/"), Some(".jpg"))),
        );
        let json = original.to_json().expect("to_json");
        let restored = NotificationManager::from_json(&json).expect("from_json");
        assert_eq!(original.get("b"), restored.get("b"));
    }

    #[test]
    fn delete_is_idempotent() {
        let mgr = NotificationManager::new();
        // Deleting a bucket that was never configured must be a harmless no-op.
        mgr.delete("never-existed");
        mgr.put(
            "b",
            one_rule(rule(
                "r1",
                &[EventType::ObjectCreatedPut],
                webhook("http://h"),
                None,
                None,
            )),
        );
        mgr.delete("b");
        assert!(mgr.get("b").is_none());
    }

    #[test]
    fn put_replaces_previous_config() {
        let mgr = NotificationManager::new();
        let versions = [
            ("old", EventType::ObjectCreatedPut, "http://old"),
            ("new", EventType::ObjectRemovedDelete, "http://new"),
        ];
        for (id, event, url) in versions {
            mgr.put("b", one_rule(rule(id, &[event], webhook(url), None, None)));
        }
        let stored = mgr.get("b").expect("config");
        assert_eq!(stored.rules.len(), 1);
        assert_eq!(stored.rules[0].id, "new");
    }

    #[tokio::test]
    async fn dispatch_event_via_webhook_delivers_payload() {
        use std::sync::Mutex;
        use tokio::io::{AsyncReadExt, AsyncWriteExt};
        use tokio::net::TcpListener;

        // Minimal one-shot HTTP server: capture the raw request, answer 200.
        let listener = TcpListener::bind("127.0.0.1:0").await.expect("bind");
        let addr = listener.local_addr().expect("addr");
        let captured: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
        let sink = Arc::clone(&captured);
        tokio::spawn(async move {
            let Ok((mut conn, _peer)) = listener.accept().await else {
                return;
            };
            let mut buf = vec![0u8; 16384];
            let len = conn.read(&mut buf).await.unwrap_or(0);
            let request = String::from_utf8_lossy(&buf[..len]).into_owned();
            sink.lock().unwrap().push(request);
            let _ = conn
                .write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n")
                .await;
        });

        let mgr = Arc::new(manager_with_put_hook(
            &format!("http://{addr}/hook"),
            None,
            None,
        ));

        dispatch_event(
            Arc::clone(&mgr),
            "b".into(),
            "k.txt".into(),
            EventType::ObjectCreatedPut,
            Some(7),
            Some("\"abc\"".into()),
            None,
            "req-1".into(),
        )
        .await;

        // Delivery happens on a spawned task; poll briefly for the request.
        let mut polls = 0;
        while polls < 50 && captured.lock().unwrap().is_empty() {
            tokio::time::sleep(std::time::Duration::from_millis(20)).await;
            polls += 1;
        }
        let requests = captured.lock().unwrap().clone();
        assert!(!requests.is_empty(), "webhook receiver got nothing");
        let first = &requests[0];
        assert!(first.contains("POST /hook"), "missing POST line");
        assert!(first.contains("s3:ObjectCreated:Put"), "missing event name");
        assert!(first.contains("\"k.txt\""), "missing key");
        assert_eq!(mgr.dropped_total.load(Ordering::Relaxed), 0);
    }

    #[tokio::test]
    async fn dispatch_event_503_drops_after_retry_budget() {
        use std::sync::Mutex;
        use tokio::io::{AsyncReadExt, AsyncWriteExt};
        use tokio::net::TcpListener;

        // Server that answers 503 to up to RETRY_ATTEMPTS connections.
        let listener = TcpListener::bind("127.0.0.1:0").await.expect("bind");
        let addr = listener.local_addr().expect("addr");
        let served: Arc<Mutex<u32>> = Arc::new(Mutex::new(0));
        let served_by_server = Arc::clone(&served);
        tokio::spawn(async move {
            for _ in 0..RETRY_ATTEMPTS {
                if let Ok((mut conn, _peer)) = listener.accept().await {
                    let mut buf = vec![0u8; 16384];
                    let _ = conn.read(&mut buf).await;
                    *served_by_server.lock().unwrap() += 1;
                    let _ = conn
                        .write_all(b"HTTP/1.1 503 Service Unavailable\r\nContent-Length: 0\r\n\r\n")
                        .await;
                }
            }
        });

        let mgr = Arc::new(manager_with_put_hook(
            &format!("http://{addr}/sink"),
            None,
            None,
        ));

        dispatch_event(
            Arc::clone(&mgr),
            "b".into(),
            "k".into(),
            EventType::ObjectCreatedPut,
            None,
            None,
            None,
            "r".into(),
        )
        .await;

        // Wait for the background task to exhaust its retries.
        let mut polls = 0;
        while polls < 100 && mgr.dropped_total.load(Ordering::Relaxed) == 0 {
            tokio::time::sleep(std::time::Duration::from_millis(20)).await;
            polls += 1;
        }
        assert_eq!(
            mgr.dropped_total.load(Ordering::Relaxed),
            1,
            "drop counter must bump exactly once after retry budget exhausted"
        );
    }

    #[test]
    fn notifications_to_json_after_panic_recovers_via_poison() {
        let mgr = Arc::new(manager_with_put_hook("http://example.invalid", None, None));

        // Panic while holding the write guard so the lock becomes poisoned.
        let poisoner = Arc::clone(&mgr);
        let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
            let mut guard = poisoner.by_bucket.write().expect("clean lock");
            guard.entry("b2".into()).or_default();
            panic!("force-poison");
        }));
        assert!(
            mgr.by_bucket.is_poisoned(),
            "write panic must poison by_bucket lock"
        );

        let json = mgr.to_json().expect("to_json after poison must succeed");
        let restored = NotificationManager::from_json(&json).expect("from_json");
        assert!(
            restored.get("b").is_some(),
            "recovered snapshot keeps original config"
        );
    }
}