1use std::collections::HashMap;
51use std::sync::Arc;
52use std::sync::RwLock;
53use std::sync::atomic::{AtomicU64, Ordering};
54
55use serde::{Deserialize, Serialize};
56
57#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)]
62pub enum EventType {
63 ObjectCreatedPut,
65 ObjectRemovedDelete,
68 ObjectRemovedDeleteMarker,
82}
83
84impl EventType {
85 #[must_use]
88 pub fn as_aws_str(&self) -> &'static str {
89 match self {
90 Self::ObjectCreatedPut => "s3:ObjectCreated:Put",
91 Self::ObjectRemovedDelete => "s3:ObjectRemoved:Delete",
92 Self::ObjectRemovedDeleteMarker => "s3:ObjectRemoved:DeleteMarkerCreated",
93 }
94 }
95
96 #[must_use]
101 pub fn from_aws_str(s: &str) -> Option<Self> {
102 match s {
103 "s3:ObjectCreated:Put" | "s3:ObjectCreated:*" => Some(Self::ObjectCreatedPut),
104 "s3:ObjectRemoved:Delete" => Some(Self::ObjectRemovedDelete),
105 "s3:ObjectRemoved:DeleteMarkerCreated" => Some(Self::ObjectRemovedDeleteMarker),
106 "s3:ObjectRemoved:*" => Some(Self::ObjectRemovedDelete),
107 _ => None,
108 }
109 }
110}
111
112#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
117pub enum Destination {
118 Webhook { url: String },
121 Sqs { queue_arn: String },
123 Sns { topic_arn: String },
125}
126
127impl Destination {
128 #[must_use]
131 pub fn type_tag(&self) -> &'static str {
132 match self {
133 Self::Webhook { .. } => "webhook",
134 Self::Sqs { .. } => "sqs",
135 Self::Sns { .. } => "sns",
136 }
137 }
138}
139
140#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
146pub struct NotificationRule {
147 pub id: String,
150 pub events: Vec<EventType>,
153 pub destination: Destination,
155 pub filter_prefix: Option<String>,
159 pub filter_suffix: Option<String>,
162}
163
164#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
166pub struct NotificationConfig {
167 pub rules: Vec<NotificationRule>,
168}
169
170#[derive(Debug, Default, Serialize, Deserialize)]
174struct NotificationSnapshot {
175 by_bucket: HashMap<String, NotificationConfig>,
176}
177
178pub struct NotificationManager {
183 by_bucket: RwLock<HashMap<String, NotificationConfig>>,
184 pub dropped_total: AtomicU64,
188}
189
190impl Default for NotificationManager {
191 fn default() -> Self {
192 Self::new()
193 }
194}
195
196impl std::fmt::Debug for NotificationManager {
197 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
198 f.debug_struct("NotificationManager")
199 .field("dropped_total", &self.dropped_total.load(Ordering::Relaxed))
200 .finish_non_exhaustive()
201 }
202}
203
204impl NotificationManager {
205 #[must_use]
207 pub fn new() -> Self {
208 Self {
209 by_bucket: RwLock::new(HashMap::new()),
210 dropped_total: AtomicU64::new(0),
211 }
212 }
213
214 pub fn put(&self, bucket: &str, config: NotificationConfig) {
218 crate::lock_recovery::recover_write(&self.by_bucket, "notifications.by_bucket")
219 .insert(bucket.to_owned(), config);
220 }
221
222 #[must_use]
227 pub fn get(&self, bucket: &str) -> Option<NotificationConfig> {
228 crate::lock_recovery::recover_read(&self.by_bucket, "notifications.by_bucket")
229 .get(bucket)
230 .cloned()
231 }
232
233 pub fn delete(&self, bucket: &str) {
235 crate::lock_recovery::recover_write(&self.by_bucket, "notifications.by_bucket")
236 .remove(bucket);
237 }
238
239 pub fn to_json(&self) -> Result<String, serde_json::Error> {
242 let snap = NotificationSnapshot {
243 by_bucket: crate::lock_recovery::recover_read(
244 &self.by_bucket,
245 "notifications.by_bucket",
246 )
247 .clone(),
248 };
249 serde_json::to_string(&snap)
250 }
251
252 pub fn from_json(s: &str) -> Result<Self, serde_json::Error> {
256 let snap: NotificationSnapshot = serde_json::from_str(s)?;
257 Ok(Self {
258 by_bucket: RwLock::new(snap.by_bucket),
259 dropped_total: AtomicU64::new(0),
260 })
261 }
262
263 #[must_use]
268 pub fn match_destinations(
269 &self,
270 bucket: &str,
271 event: &EventType,
272 key: &str,
273 ) -> Vec<Destination> {
274 let map = crate::lock_recovery::recover_read(&self.by_bucket, "notifications.by_bucket");
275 let cfg = match map.get(bucket) {
276 Some(c) => c,
277 None => return Vec::new(),
278 };
279 cfg.rules
280 .iter()
281 .filter(|r| rule_matches(r, event, key))
282 .map(|r| r.destination.clone())
283 .collect()
284 }
285}
286
287fn rule_matches(rule: &NotificationRule, event: &EventType, key: &str) -> bool {
288 if !rule.events.iter().any(|e| e == event) {
289 return false;
290 }
291 if let Some(p) = rule.filter_prefix.as_deref()
292 && !p.is_empty()
293 && !key.starts_with(p)
294 {
295 return false;
296 }
297 if let Some(s) = rule.filter_suffix.as_deref()
298 && !s.is_empty()
299 && !key.ends_with(s)
300 {
301 return false;
302 }
303 true
304}
305
306#[must_use]
314#[allow(clippy::too_many_arguments)]
315pub fn build_event_json(
316 bucket: &str,
317 key: &str,
318 event: &EventType,
319 size: Option<u64>,
320 etag: Option<&str>,
321 version_id: Option<&str>,
322 request_id: &str,
323 now: chrono::DateTime<chrono::Utc>,
324) -> String {
325 let etag_clean = etag.map(|e| e.trim_matches('"').to_owned());
328 let mut object = serde_json::json!({
329 "key": key,
330 "sequencer": format!("{:016x}", now.timestamp_micros() as u64),
331 });
332 if let Some(sz) = size {
333 object["size"] = serde_json::json!(sz);
334 }
335 if let Some(ref e) = etag_clean {
336 object["eTag"] = serde_json::json!(e);
337 }
338 if let Some(v) = version_id {
339 object["versionId"] = serde_json::json!(v);
340 }
341 let event_name = event.as_aws_str();
342 let event_source = match event {
343 EventType::ObjectCreatedPut => "ObjectCreated",
344 EventType::ObjectRemovedDelete | EventType::ObjectRemovedDeleteMarker => "ObjectRemoved",
345 };
346 let record = serde_json::json!({
347 "eventVersion": "2.1",
348 "eventSource": "aws:s3",
349 "awsRegion": "us-east-1",
350 "eventTime": now.to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
351 "eventName": event_name,
352 "userIdentity": { "principalId": "S4" },
353 "requestParameters": { "sourceIPAddress": "0.0.0.0" },
354 "responseElements": {
355 "x-amz-request-id": request_id,
356 "x-amz-id-2": request_id,
357 },
358 "s3": {
359 "s3SchemaVersion": "1.0",
360 "configurationId": "S4-default",
361 "bucket": {
362 "name": bucket,
363 "ownerIdentity": { "principalId": "S4" },
364 "arn": format!("arn:aws:s3:::{bucket}"),
365 },
366 "object": object,
367 },
368 });
369 let _ = event_source; serde_json::json!({ "Records": [record] }).to_string()
371}
372
373const RETRY_ATTEMPTS: u32 = 3;
374const RETRY_BASE_MS: u64 = 50;
375
376#[allow(clippy::too_many_arguments)]
387pub async fn dispatch_event(
388 manager: Arc<NotificationManager>,
389 bucket: String,
390 key: String,
391 event: EventType,
392 size: Option<u64>,
393 etag: Option<String>,
394 version_id: Option<String>,
395 request_id: String,
396) {
397 let dests = manager.match_destinations(&bucket, &event, &key);
398 if dests.is_empty() {
399 return;
400 }
401 let now = chrono::Utc::now();
402 let body = build_event_json(
403 &bucket,
404 &key,
405 &event,
406 size,
407 etag.as_deref(),
408 version_id.as_deref(),
409 &request_id,
410 now,
411 );
412 for dest in dests {
413 let mgr = Arc::clone(&manager);
414 let body = body.clone();
415 tokio::spawn(async move {
433 use futures::FutureExt as _;
434 let kind = "notification";
435 let fut = send_one(mgr, dest, body);
436 if let Err(panic) = std::panic::AssertUnwindSafe(fut).catch_unwind().await {
437 let panic_msg = panic
438 .downcast_ref::<&'static str>()
439 .copied()
440 .map(str::to_owned)
441 .or_else(|| panic.downcast_ref::<String>().cloned())
442 .unwrap_or_else(|| "(non-string panic payload)".to_owned());
443 tracing::error!(
444 kind,
445 panic_payload = %panic_msg,
446 "S4 dispatcher task panicked (caught by catch_unwind, runtime not poisoned)"
447 );
448 crate::metrics::record_dispatcher_panic(kind);
449 }
450 });
451 }
452}
453
454async fn send_one(manager: Arc<NotificationManager>, dest: Destination, body: String) {
455 match dest {
456 Destination::Webhook { ref url } => {
457 let client = match reqwest::Client::builder()
458 .timeout(std::time::Duration::from_secs(5))
459 .build()
460 {
461 Ok(c) => c,
462 Err(e) => {
463 tracing::warn!(error = %e, "notifications: reqwest client build failed");
464 bump_drop(&manager, dest.type_tag());
465 return;
466 }
467 };
468 for attempt in 0..RETRY_ATTEMPTS {
469 let resp = client
470 .post(url)
471 .header("content-type", "application/json")
472 .body(body.clone())
473 .send()
474 .await;
475 match resp {
476 Ok(r) if r.status().is_success() => return,
477 Ok(r) if r.status().is_server_error() => {
478 if attempt + 1 < RETRY_ATTEMPTS {
480 let delay_ms = RETRY_BASE_MS * (1u64 << attempt);
481 tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
482 continue;
483 }
484 tracing::warn!(
485 url = %url,
486 status = %r.status(),
487 "notifications: webhook giving up after {RETRY_ATTEMPTS} attempts"
488 );
489 bump_drop(&manager, "webhook");
490 return;
491 }
492 Ok(r) => {
493 tracing::warn!(
495 url = %url,
496 status = %r.status(),
497 "notifications: webhook permanent failure, dropping"
498 );
499 return;
500 }
501 Err(e) => {
502 if attempt + 1 < RETRY_ATTEMPTS {
504 let delay_ms = RETRY_BASE_MS * (1u64 << attempt);
505 tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
506 continue;
507 }
508 tracing::warn!(
509 url = %url,
510 error = %e,
511 "notifications: webhook network failure, dropping after {RETRY_ATTEMPTS} attempts"
512 );
513 bump_drop(&manager, "webhook");
514 return;
515 }
516 }
517 }
518 }
519 Destination::Sqs { ref queue_arn } => {
520 #[cfg(feature = "aws-events")]
521 {
522 send_sqs(&manager, queue_arn, &body).await;
523 }
524 #[cfg(not(feature = "aws-events"))]
525 {
526 let _ = queue_arn;
527 let _ = body;
528 tracing::warn!(
529 "notifications: SQS destination configured but `aws-events` feature is off — dropping"
530 );
531 bump_drop(&manager, "sqs");
532 }
533 }
534 Destination::Sns { ref topic_arn } => {
535 #[cfg(feature = "aws-events")]
536 {
537 send_sns(&manager, topic_arn, &body).await;
538 }
539 #[cfg(not(feature = "aws-events"))]
540 {
541 let _ = topic_arn;
542 let _ = body;
543 tracing::warn!(
544 "notifications: SNS destination configured but `aws-events` feature is off — dropping"
545 );
546 bump_drop(&manager, "sns");
547 }
548 }
549 }
550}
551
552fn bump_drop(manager: &NotificationManager, dest_tag: &'static str) {
553 manager.dropped_total.fetch_add(1, Ordering::Relaxed);
554 crate::metrics::record_notification_drop(dest_tag);
555}
556
557#[cfg(feature = "aws-events")]
558async fn send_sqs(manager: &NotificationManager, queue_arn: &str, body: &str) {
559 let conf = aws_config::load_from_env().await;
560 let client = aws_sdk_sqs::Client::new(&conf);
561 let res = client
567 .send_message()
568 .queue_url(queue_arn)
569 .message_body(body)
570 .send()
571 .await;
572 if let Err(e) = res {
573 tracing::warn!(arn = %queue_arn, error = ?e, "notifications: SQS send failed");
574 bump_drop(manager, "sqs");
575 }
576}
577
578#[cfg(feature = "aws-events")]
579async fn send_sns(manager: &NotificationManager, topic_arn: &str, body: &str) {
580 let conf = aws_config::load_from_env().await;
581 let client = aws_sdk_sns::Client::new(&conf);
582 let res = client
583 .publish()
584 .topic_arn(topic_arn)
585 .message(body)
586 .send()
587 .await;
588 if let Err(e) = res {
589 tracing::warn!(arn = %topic_arn, error = ?e, "notifications: SNS publish failed");
590 bump_drop(manager, "sns");
591 }
592}
593
594#[cfg(test)]
595mod tests {
596 use super::*;
597
598 fn rule(
599 id: &str,
600 events: &[EventType],
601 dest: Destination,
602 prefix: Option<&str>,
603 suffix: Option<&str>,
604 ) -> NotificationRule {
605 NotificationRule {
606 id: id.to_owned(),
607 events: events.to_vec(),
608 destination: dest,
609 filter_prefix: prefix.map(str::to_owned),
610 filter_suffix: suffix.map(str::to_owned),
611 }
612 }
613
614 #[test]
615 fn match_destinations_single_rule_event_match() {
616 let mgr = NotificationManager::new();
617 mgr.put(
618 "b",
619 NotificationConfig {
620 rules: vec![rule(
621 "r1",
622 &[EventType::ObjectCreatedPut],
623 Destination::Webhook {
624 url: "http://hook".into(),
625 },
626 None,
627 None,
628 )],
629 },
630 );
631 let dests = mgr.match_destinations("b", &EventType::ObjectCreatedPut, "any/key.txt");
632 assert_eq!(dests.len(), 1, "single rule must fire on event match");
633 }
634
635 #[test]
636 fn match_destinations_prefix_filter() {
637 let mgr = NotificationManager::new();
638 mgr.put(
639 "b",
640 NotificationConfig {
641 rules: vec![rule(
642 "r1",
643 &[EventType::ObjectCreatedPut],
644 Destination::Webhook {
645 url: "http://hook".into(),
646 },
647 Some("uploads/"),
648 None,
649 )],
650 },
651 );
652 assert_eq!(
653 mgr.match_destinations("b", &EventType::ObjectCreatedPut, "uploads/file.bin")
654 .len(),
655 1
656 );
657 assert!(
658 mgr.match_destinations("b", &EventType::ObjectCreatedPut, "logs/file.bin")
659 .is_empty(),
660 "prefix filter must reject non-matching key"
661 );
662 }
663
664 #[test]
665 fn match_destinations_suffix_filter() {
666 let mgr = NotificationManager::new();
667 mgr.put(
668 "b",
669 NotificationConfig {
670 rules: vec![rule(
671 "r1",
672 &[EventType::ObjectCreatedPut],
673 Destination::Webhook {
674 url: "http://hook".into(),
675 },
676 None,
677 Some(".jpg"),
678 )],
679 },
680 );
681 assert_eq!(
682 mgr.match_destinations("b", &EventType::ObjectCreatedPut, "photo.jpg")
683 .len(),
684 1
685 );
686 assert!(
687 mgr.match_destinations("b", &EventType::ObjectCreatedPut, "doc.pdf")
688 .is_empty(),
689 "suffix filter must reject non-matching key"
690 );
691 }
692
693 #[test]
694 fn match_destinations_no_rule_for_bucket() {
695 let mgr = NotificationManager::new();
696 let dests = mgr.match_destinations("ghost", &EventType::ObjectCreatedPut, "k");
697 assert!(dests.is_empty(), "unknown bucket must yield empty vec");
698 }
699
700 #[test]
701 fn match_destinations_event_type_mismatch() {
702 let mgr = NotificationManager::new();
703 mgr.put(
704 "b",
705 NotificationConfig {
706 rules: vec![rule(
707 "r1",
708 &[EventType::ObjectCreatedPut],
709 Destination::Webhook {
710 url: "http://hook".into(),
711 },
712 None,
713 None,
714 )],
715 },
716 );
717 assert!(
718 mgr.match_destinations("b", &EventType::ObjectRemovedDelete, "k")
719 .is_empty(),
720 "mismatched event type must not fire"
721 );
722 }
723
724 #[test]
725 fn match_destinations_multiple_rules_fire_in_order() {
726 let mgr = NotificationManager::new();
727 mgr.put(
728 "b",
729 NotificationConfig {
730 rules: vec![
731 rule(
732 "first",
733 &[EventType::ObjectCreatedPut],
734 Destination::Webhook {
735 url: "http://first".into(),
736 },
737 None,
738 None,
739 ),
740 rule(
741 "second",
742 &[EventType::ObjectCreatedPut],
743 Destination::Webhook {
744 url: "http://second".into(),
745 },
746 None,
747 None,
748 ),
749 ],
750 },
751 );
752 let dests = mgr.match_destinations("b", &EventType::ObjectCreatedPut, "k");
753 assert_eq!(dests.len(), 2, "both matching rules fire");
754 match (&dests[0], &dests[1]) {
755 (Destination::Webhook { url: u1 }, Destination::Webhook { url: u2 }) => {
756 assert_eq!(u1, "http://first");
757 assert_eq!(u2, "http://second");
758 }
759 _ => panic!("expected two webhooks in declaration order"),
760 }
761 }
762
763 #[test]
764 fn build_event_json_schema_matches_aws() {
765 let now = chrono::DateTime::parse_from_rfc3339("2026-05-13T10:00:00Z")
766 .unwrap()
767 .with_timezone(&chrono::Utc);
768 let body = build_event_json(
769 "my-bucket",
770 "uploads/photo.jpg",
771 &EventType::ObjectCreatedPut,
772 Some(12345),
773 Some("\"deadbeef\""),
774 Some("v-001"),
775 "REQ-1",
776 now,
777 );
778 let v: serde_json::Value = serde_json::from_str(&body).expect("valid json");
779 let rec = &v["Records"][0];
780 assert_eq!(rec["eventName"], "s3:ObjectCreated:Put");
781 assert_eq!(rec["eventTime"], "2026-05-13T10:00:00.000Z");
782 assert_eq!(rec["s3"]["bucket"]["name"], "my-bucket");
783 assert_eq!(rec["s3"]["object"]["key"], "uploads/photo.jpg");
784 assert_eq!(rec["s3"]["object"]["size"], 12345);
785 assert_eq!(rec["s3"]["object"]["eTag"], "deadbeef");
786 assert_eq!(rec["s3"]["object"]["versionId"], "v-001");
787 }
788
789 #[test]
790 fn build_event_json_omits_optional_fields() {
791 let now = chrono::Utc::now();
792 let body = build_event_json(
793 "b",
794 "k",
795 &EventType::ObjectRemovedDeleteMarker,
796 None,
797 None,
798 None,
799 "r",
800 now,
801 );
802 let v: serde_json::Value = serde_json::from_str(&body).expect("valid json");
803 let obj = &v["Records"][0]["s3"]["object"];
804 assert!(obj.get("size").is_none());
805 assert!(obj.get("eTag").is_none());
806 assert!(obj.get("versionId").is_none());
807 }
808
809 #[test]
810 fn json_round_trip() {
811 let mgr = NotificationManager::new();
812 mgr.put(
813 "b",
814 NotificationConfig {
815 rules: vec![rule(
816 "r1",
817 &[EventType::ObjectCreatedPut, EventType::ObjectRemovedDelete],
818 Destination::Sqs {
819 queue_arn: "arn:aws:sqs:us-east-1:123:q".into(),
820 },
821 Some("u/"),
822 Some(".jpg"),
823 )],
824 },
825 );
826 let json = mgr.to_json().expect("to_json");
827 let mgr2 = NotificationManager::from_json(&json).expect("from_json");
828 assert_eq!(mgr.get("b"), mgr2.get("b"));
829 }
830
831 #[test]
832 fn delete_is_idempotent() {
833 let mgr = NotificationManager::new();
834 mgr.delete("never-existed");
835 mgr.put(
836 "b",
837 NotificationConfig {
838 rules: vec![rule(
839 "r1",
840 &[EventType::ObjectCreatedPut],
841 Destination::Webhook {
842 url: "http://h".into(),
843 },
844 None,
845 None,
846 )],
847 },
848 );
849 mgr.delete("b");
850 assert!(mgr.get("b").is_none());
851 }
852
853 #[test]
854 fn put_replaces_previous_config() {
855 let mgr = NotificationManager::new();
856 mgr.put(
857 "b",
858 NotificationConfig {
859 rules: vec![rule(
860 "old",
861 &[EventType::ObjectCreatedPut],
862 Destination::Webhook {
863 url: "http://old".into(),
864 },
865 None,
866 None,
867 )],
868 },
869 );
870 mgr.put(
871 "b",
872 NotificationConfig {
873 rules: vec![rule(
874 "new",
875 &[EventType::ObjectRemovedDelete],
876 Destination::Webhook {
877 url: "http://new".into(),
878 },
879 None,
880 None,
881 )],
882 },
883 );
884 let cfg = mgr.get("b").expect("config");
885 assert_eq!(cfg.rules.len(), 1);
886 assert_eq!(cfg.rules[0].id, "new");
887 }
888
889 #[tokio::test]
890 async fn dispatch_event_via_webhook_delivers_payload() {
891 use std::sync::Mutex;
894 use tokio::io::{AsyncReadExt, AsyncWriteExt};
895 use tokio::net::TcpListener;
896
897 let listener = TcpListener::bind("127.0.0.1:0").await.expect("bind");
898 let addr = listener.local_addr().expect("addr");
899 let received: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
900 let received_cl = Arc::clone(&received);
901 tokio::spawn(async move {
902 if let Ok((mut sock, _)) = listener.accept().await {
903 let mut buf = vec![0u8; 16384];
904 let n = sock.read(&mut buf).await.unwrap_or(0);
905 let raw = String::from_utf8_lossy(&buf[..n]).to_string();
906 received_cl.lock().unwrap().push(raw);
907 let _ = sock
908 .write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n")
909 .await;
910 }
911 });
912
913 let mgr = Arc::new(NotificationManager::new());
914 mgr.put(
915 "b",
916 NotificationConfig {
917 rules: vec![rule(
918 "r1",
919 &[EventType::ObjectCreatedPut],
920 Destination::Webhook {
921 url: format!("http://{addr}/hook"),
922 },
923 None,
924 None,
925 )],
926 },
927 );
928
929 dispatch_event(
930 Arc::clone(&mgr),
931 "b".into(),
932 "k.txt".into(),
933 EventType::ObjectCreatedPut,
934 Some(7),
935 Some("\"abc\"".into()),
936 None,
937 "req-1".into(),
938 )
939 .await;
940
941 for _ in 0..50 {
943 if !received.lock().unwrap().is_empty() {
944 break;
945 }
946 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
947 }
948 let raw = received.lock().unwrap().clone();
949 assert!(!raw.is_empty(), "webhook receiver got nothing");
950 let raw = &raw[0];
951 assert!(raw.contains("POST /hook"), "missing POST line");
952 assert!(raw.contains("s3:ObjectCreated:Put"), "missing event name");
953 assert!(raw.contains("\"k.txt\""), "missing key");
954 assert_eq!(mgr.dropped_total.load(Ordering::Relaxed), 0);
955 }
956
957 #[tokio::test]
958 async fn dispatch_event_503_drops_after_retry_budget() {
959 use std::sync::Mutex;
962 use tokio::io::{AsyncReadExt, AsyncWriteExt};
963 use tokio::net::TcpListener;
964
965 let listener = TcpListener::bind("127.0.0.1:0").await.expect("bind");
966 let addr = listener.local_addr().expect("addr");
967 let attempt_count: Arc<Mutex<u32>> = Arc::new(Mutex::new(0));
968 let attempt_count_cl = Arc::clone(&attempt_count);
969 tokio::spawn(async move {
970 for _ in 0..RETRY_ATTEMPTS {
971 if let Ok((mut sock, _)) = listener.accept().await {
972 let mut buf = vec![0u8; 16384];
973 let _ = sock.read(&mut buf).await;
974 *attempt_count_cl.lock().unwrap() += 1;
975 let _ = sock
976 .write_all(b"HTTP/1.1 503 Service Unavailable\r\nContent-Length: 0\r\n\r\n")
977 .await;
978 }
979 }
980 });
981
982 let mgr = Arc::new(NotificationManager::new());
983 mgr.put(
984 "b",
985 NotificationConfig {
986 rules: vec![rule(
987 "r1",
988 &[EventType::ObjectCreatedPut],
989 Destination::Webhook {
990 url: format!("http://{addr}/sink"),
991 },
992 None,
993 None,
994 )],
995 },
996 );
997
998 dispatch_event(
999 Arc::clone(&mgr),
1000 "b".into(),
1001 "k".into(),
1002 EventType::ObjectCreatedPut,
1003 None,
1004 None,
1005 None,
1006 "r".into(),
1007 )
1008 .await;
1009
1010 for _ in 0..100 {
1013 if mgr.dropped_total.load(Ordering::Relaxed) > 0 {
1014 break;
1015 }
1016 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
1017 }
1018 assert_eq!(
1019 mgr.dropped_total.load(Ordering::Relaxed),
1020 1,
1021 "drop counter must bump exactly once after retry budget exhausted"
1022 );
1023 }
1024
1025 #[test]
1030 fn notifications_to_json_after_panic_recovers_via_poison() {
1031 let mgr = std::sync::Arc::new(NotificationManager::new());
1032 mgr.put(
1033 "b",
1034 NotificationConfig {
1035 rules: vec![NotificationRule {
1036 id: "r1".into(),
1037 events: vec![EventType::ObjectCreatedPut],
1038 destination: Destination::Webhook {
1039 url: "http://example.invalid".into(),
1040 },
1041 filter_prefix: None,
1042 filter_suffix: None,
1043 }],
1044 },
1045 );
1046 let mgr_cl = std::sync::Arc::clone(&mgr);
1047 let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
1048 let mut g = mgr_cl.by_bucket.write().expect("clean lock");
1049 g.entry("b2".into()).or_default();
1050 panic!("force-poison");
1051 }));
1052 assert!(
1053 mgr.by_bucket.is_poisoned(),
1054 "write panic must poison by_bucket lock"
1055 );
1056 let json = mgr.to_json().expect("to_json after poison must succeed");
1057 let mgr2 = NotificationManager::from_json(&json).expect("from_json");
1058 assert!(
1059 mgr2.get("b").is_some(),
1060 "recovered snapshot keeps original config"
1061 );
1062 }
1063}