1use std::collections::HashMap;
53use std::sync::Arc;
54use std::sync::RwLock;
55use std::sync::atomic::{AtomicU64, Ordering};
56
57use serde::{Deserialize, Serialize};
58
59#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
63pub enum ReplicationStatus {
64 Pending,
68 Completed,
71 Failed,
73 Replica,
77}
78
79impl ReplicationStatus {
80 #[must_use]
83 pub fn as_aws_str(&self) -> &'static str {
84 match self {
85 Self::Pending => "PENDING",
86 Self::Completed => "COMPLETED",
87 Self::Failed => "FAILED",
88 Self::Replica => "REPLICA",
89 }
90 }
91}
92
93#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
98pub struct ReplicationFilter {
99 pub prefix: Option<String>,
101 pub tags: Vec<(String, String)>,
105}
106
107#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
112pub struct ReplicationRule {
113 pub id: String,
115 pub priority: u32,
119 pub status_enabled: bool,
122 pub filter: ReplicationFilter,
124 pub destination_bucket: String,
127 pub destination_storage_class: Option<String>,
130}
131
132#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
134pub struct ReplicationConfig {
135 pub role: String,
139 pub rules: Vec<ReplicationRule>,
140}
141
142#[derive(Debug, Default, Serialize, Deserialize)]
146struct ReplicationSnapshot {
147 by_bucket: HashMap<String, ReplicationConfig>,
148 statuses: Vec<((String, String), ReplicationStatus)>,
152}
153
154pub struct ReplicationManager {
157 by_bucket: RwLock<HashMap<String, ReplicationConfig>>,
158 statuses: RwLock<HashMap<(String, String), ReplicationStatus>>,
162 pub dropped_total: AtomicU64,
166}
167
168impl Default for ReplicationManager {
169 fn default() -> Self {
170 Self::new()
171 }
172}
173
174impl std::fmt::Debug for ReplicationManager {
175 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
176 f.debug_struct("ReplicationManager")
177 .field("dropped_total", &self.dropped_total.load(Ordering::Relaxed))
178 .finish_non_exhaustive()
179 }
180}
181
182impl ReplicationManager {
183 #[must_use]
185 pub fn new() -> Self {
186 Self {
187 by_bucket: RwLock::new(HashMap::new()),
188 statuses: RwLock::new(HashMap::new()),
189 dropped_total: AtomicU64::new(0),
190 }
191 }
192
193 pub fn put(&self, bucket: &str, config: ReplicationConfig) {
197 self.by_bucket
198 .write()
199 .expect("replication state RwLock poisoned")
200 .insert(bucket.to_owned(), config);
201 }
202
203 #[must_use]
208 pub fn get(&self, bucket: &str) -> Option<ReplicationConfig> {
209 self.by_bucket
210 .read()
211 .expect("replication state RwLock poisoned")
212 .get(bucket)
213 .cloned()
214 }
215
216 pub fn delete(&self, bucket: &str) {
218 self.by_bucket
219 .write()
220 .expect("replication state RwLock poisoned")
221 .remove(bucket);
222 }
223
224 pub fn to_json(&self) -> Result<String, serde_json::Error> {
227 let snap = ReplicationSnapshot {
228 by_bucket: self
229 .by_bucket
230 .read()
231 .expect("replication state RwLock poisoned")
232 .clone(),
233 statuses: self
234 .statuses
235 .read()
236 .expect("replication state RwLock poisoned")
237 .iter()
238 .map(|(k, v)| (k.clone(), v.clone()))
239 .collect(),
240 };
241 serde_json::to_string(&snap)
242 }
243
244 pub fn from_json(s: &str) -> Result<Self, serde_json::Error> {
248 let snap: ReplicationSnapshot = serde_json::from_str(s)?;
249 Ok(Self {
250 by_bucket: RwLock::new(snap.by_bucket),
251 statuses: RwLock::new(snap.statuses.into_iter().collect()),
252 dropped_total: AtomicU64::new(0),
253 })
254 }
255
256 #[must_use]
262 pub fn match_rule(
263 &self,
264 bucket: &str,
265 key: &str,
266 object_tags: &[(String, String)],
267 ) -> Option<ReplicationRule> {
268 let map = self
269 .by_bucket
270 .read()
271 .expect("replication state RwLock poisoned");
272 let cfg = map.get(bucket)?;
273 let mut best: Option<&ReplicationRule> = None;
274 for rule in &cfg.rules {
275 if !rule.status_enabled {
276 continue;
277 }
278 if !filter_matches(&rule.filter, key, object_tags) {
279 continue;
280 }
281 best = match best {
282 None => Some(rule),
283 Some(prev) if rule.priority > prev.priority => Some(rule),
284 Some(prev) => Some(prev),
285 };
286 }
287 best.cloned()
288 }
289
290 pub fn record_status(&self, bucket: &str, key: &str, status: ReplicationStatus) {
293 self.statuses
294 .write()
295 .expect("replication state RwLock poisoned")
296 .insert((bucket.to_owned(), key.to_owned()), status);
297 }
298
299 #[must_use]
304 pub fn lookup_status(&self, bucket: &str, key: &str) -> Option<ReplicationStatus> {
305 self.statuses
306 .read()
307 .expect("replication state RwLock poisoned")
308 .get(&(bucket.to_owned(), key.to_owned()))
309 .cloned()
310 }
311}
312
313fn filter_matches(
316 filter: &ReplicationFilter,
317 key: &str,
318 object_tags: &[(String, String)],
319) -> bool {
320 if let Some(p) = filter.prefix.as_deref()
321 && !p.is_empty()
322 && !key.starts_with(p)
323 {
324 return false;
325 }
326 for (tk, tv) in &filter.tags {
327 if !object_tags
328 .iter()
329 .any(|(ok, ov)| ok == tk && ov == tv)
330 {
331 return false;
332 }
333 }
334 true
335}
336
/// Total number of destination PUTs `replicate_object` attempts (first try
/// included) before marking the object `Failed`.
const RETRY_ATTEMPTS: u32 = 3;
/// Sleep before the first retry, in milliseconds. The delay doubles after
/// every further failed attempt.
const RETRY_BASE_MS: u64 = 50;
339
340pub async fn replicate_object<F, Fut>(
358 rule: ReplicationRule,
359 source_bucket: String,
360 source_key: String,
361 body: bytes::Bytes,
362 metadata: Option<HashMap<String, String>>,
363 do_put: F,
364 manager: Arc<ReplicationManager>,
365) where
366 F: Fn(String, String, bytes::Bytes, Option<HashMap<String, String>>) -> Fut,
367 Fut: std::future::Future<Output = Result<(), String>>,
368{
369 let mut replica_meta = metadata.unwrap_or_default();
374 replica_meta.insert(
375 "x-amz-replication-status".to_owned(),
376 ReplicationStatus::Replica.as_aws_str().to_owned(),
377 );
378 if let Some(ref sc) = rule.destination_storage_class {
379 replica_meta.insert("x-amz-storage-class".to_owned(), sc.clone());
380 }
381
382 let dest_bucket = rule.destination_bucket.clone();
383 for attempt in 0..RETRY_ATTEMPTS {
384 let result = do_put(
385 dest_bucket.clone(),
386 source_key.clone(),
387 body.clone(),
388 Some(replica_meta.clone()),
389 )
390 .await;
391 match result {
392 Ok(()) => {
393 manager.record_status(
394 &source_bucket,
395 &source_key,
396 ReplicationStatus::Completed,
397 );
398 crate::metrics::record_replication_replicated(&source_bucket, &dest_bucket);
399 tracing::debug!(
400 source_bucket = %source_bucket,
401 source_key = %source_key,
402 dest_bucket = %dest_bucket,
403 rule_id = %rule.id,
404 "S4 replication: COMPLETED"
405 );
406 return;
407 }
408 Err(e) => {
409 if attempt + 1 < RETRY_ATTEMPTS {
410 let delay_ms = RETRY_BASE_MS * (1u64 << attempt);
411 tracing::warn!(
412 source_bucket = %source_bucket,
413 source_key = %source_key,
414 dest_bucket = %dest_bucket,
415 attempt = attempt + 1,
416 error = %e,
417 "S4 replication: attempt failed, retrying"
418 );
419 tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
420 continue;
421 }
422 manager.record_status(
423 &source_bucket,
424 &source_key,
425 ReplicationStatus::Failed,
426 );
427 manager.dropped_total.fetch_add(1, Ordering::Relaxed);
428 crate::metrics::record_replication_drop(&source_bucket);
429 tracing::warn!(
430 source_bucket = %source_bucket,
431 source_key = %source_key,
432 dest_bucket = %dest_bucket,
433 rule_id = %rule.id,
434 error = %e,
435 "S4 replication: FAILED after {RETRY_ATTEMPTS} attempts (drop counter bumped)"
436 );
437 return;
438 }
439 }
440 }
441}
442
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Mutex;

    /// Builds an enabled-or-disabled rule with no destination storage class.
    /// `prefix` and `tags` populate its filter.
    fn rule(
        id: &str,
        priority: u32,
        enabled: bool,
        prefix: Option<&str>,
        tags: &[(&str, &str)],
        dest: &str,
    ) -> ReplicationRule {
        ReplicationRule {
            id: id.to_owned(),
            priority,
            status_enabled: enabled,
            filter: ReplicationFilter {
                prefix: prefix.map(str::to_owned),
                tags: tags
                    .iter()
                    .map(|(k, v)| ((*k).to_owned(), (*v).to_owned()))
                    .collect(),
            },
            destination_bucket: dest.to_owned(),
            destination_storage_class: None,
        }
    }

    #[test]
    fn match_rule_prefix_filter_match_and_miss() {
        let mgr = ReplicationManager::new();
        mgr.put(
            "src",
            ReplicationConfig {
                role: "arn:aws:iam::000:role/s4-test".into(),
                rules: vec![rule("r1", 1, true, Some("logs/"), &[], "dst")],
            },
        );
        assert!(mgr.match_rule("src", "logs/2026/01/01.log", &[]).is_some());
        assert!(mgr.match_rule("src", "uploads/foo.bin", &[]).is_none());
    }

    #[test]
    fn match_rule_no_config_for_bucket() {
        let mgr = ReplicationManager::new();
        assert!(mgr.match_rule("ghost", "k", &[]).is_none());
    }

    #[test]
    fn match_rule_priority_picks_highest() {
        let mgr = ReplicationManager::new();
        mgr.put(
            "src",
            ReplicationConfig {
                role: "arn".into(),
                rules: vec![
                    rule("low", 1, true, Some(""), &[], "dst-low"),
                    rule("high", 10, true, Some(""), &[], "dst-high"),
                    rule("mid", 5, true, Some(""), &[], "dst-mid"),
                ],
            },
        );
        let picked = mgr.match_rule("src", "any.bin", &[]).expect("match");
        assert_eq!(picked.id, "high");
        assert_eq!(picked.destination_bucket, "dst-high");
    }

    #[test]
    fn match_rule_priority_tie_breaker_is_declaration_order() {
        let mgr = ReplicationManager::new();
        mgr.put(
            "src",
            ReplicationConfig {
                role: "arn".into(),
                rules: vec![
                    rule("first", 5, true, Some(""), &[], "dst-first"),
                    rule("second", 5, true, Some(""), &[], "dst-second"),
                ],
            },
        );
        let picked = mgr.match_rule("src", "k", &[]).expect("match");
        assert_eq!(picked.id, "first", "tie on priority must keep the earlier rule");
    }

    #[test]
    fn match_rule_tag_filter_and_of_all_tags() {
        let mgr = ReplicationManager::new();
        mgr.put(
            "src",
            ReplicationConfig {
                role: "arn".into(),
                rules: vec![rule(
                    "r-tags",
                    1,
                    true,
                    None,
                    &[("env", "prod"), ("tier", "gold")],
                    "dst",
                )],
            },
        );
        assert!(
            mgr.match_rule(
                "src",
                "k",
                &[
                    ("env".into(), "prod".into()),
                    ("tier".into(), "gold".into()),
                    ("extra".into(), "ignored".into())
                ]
            )
            .is_some(),
            "all required tags present (extras OK) must match"
        );
        assert!(
            mgr.match_rule("src", "k", &[("env".into(), "prod".into())])
                .is_none(),
            "missing one of the required tags must not match"
        );
        assert!(
            mgr.match_rule(
                "src",
                "k",
                &[
                    ("env".into(), "dev".into()),
                    ("tier".into(), "gold".into())
                ]
            )
            .is_none(),
            "wrong value on a required tag must not match"
        );
    }

    #[test]
    fn match_rule_status_disabled_never_matches() {
        let mgr = ReplicationManager::new();
        mgr.put(
            "src",
            ReplicationConfig {
                role: "arn".into(),
                rules: vec![rule("disabled", 100, false, None, &[], "dst")],
            },
        );
        assert!(
            mgr.match_rule("src", "anything", &[]).is_none(),
            "status_enabled=false must not match even at high priority"
        );
    }

    #[test]
    fn match_rule_skips_disabled_rule_even_if_higher_priority() {
        let mgr = ReplicationManager::new();
        mgr.put(
            "src",
            ReplicationConfig {
                role: "arn".into(),
                rules: vec![
                    rule("off", 100, false, None, &[], "dst-off"),
                    rule("on", 1, true, None, &[], "dst-on"),
                ],
            },
        );
        let picked = mgr.match_rule("src", "k", &[]).expect("match");
        assert_eq!(
            picked.id, "on",
            "a disabled rule must not shadow a lower-priority enabled one"
        );
        assert_eq!(picked.destination_bucket, "dst-on");
    }

    #[test]
    fn record_and_lookup_status_round_trip() {
        let mgr = ReplicationManager::new();
        assert!(mgr.lookup_status("b", "k").is_none());
        mgr.record_status("b", "k", ReplicationStatus::Pending);
        assert_eq!(
            mgr.lookup_status("b", "k"),
            Some(ReplicationStatus::Pending)
        );
        mgr.record_status("b", "k", ReplicationStatus::Completed);
        assert_eq!(
            mgr.lookup_status("b", "k"),
            Some(ReplicationStatus::Completed)
        );
    }

    #[test]
    fn json_round_trip_preserves_config_and_statuses() {
        let mgr = ReplicationManager::new();
        mgr.put(
            "src",
            ReplicationConfig {
                role: "arn:aws:iam::000:role/s4".into(),
                rules: vec![rule("r1", 7, true, Some("docs/"), &[("env", "prod")], "dst")],
            },
        );
        mgr.record_status("src", "docs/a.pdf", ReplicationStatus::Completed);
        let json = mgr.to_json().expect("to_json");
        let mgr2 = ReplicationManager::from_json(&json).expect("from_json");
        assert_eq!(mgr.get("src"), mgr2.get("src"));
        assert_eq!(
            mgr2.lookup_status("src", "docs/a.pdf"),
            Some(ReplicationStatus::Completed)
        );
    }

    #[test]
    fn delete_is_idempotent() {
        let mgr = ReplicationManager::new();
        mgr.delete("never-existed");
        mgr.put(
            "b",
            ReplicationConfig {
                role: "arn".into(),
                rules: vec![rule("r1", 1, true, None, &[], "dst")],
            },
        );
        mgr.delete("b");
        assert!(mgr.get("b").is_none());
    }

    #[test]
    fn put_replaces_previous_config() {
        let mgr = ReplicationManager::new();
        mgr.put(
            "b",
            ReplicationConfig {
                role: "arn".into(),
                rules: vec![rule("old", 1, true, None, &[], "dst-old")],
            },
        );
        mgr.put(
            "b",
            ReplicationConfig {
                role: "arn".into(),
                rules: vec![rule("new", 1, true, None, &[], "dst-new")],
            },
        );
        let cfg = mgr.get("b").expect("config");
        assert_eq!(cfg.rules.len(), 1);
        assert_eq!(cfg.rules[0].id, "new");
        assert_eq!(cfg.rules[0].destination_bucket, "dst-new");
    }

    #[tokio::test]
    async fn replicate_object_happy_path_marks_completed() {
        type Captured = Vec<(String, String, bytes::Bytes, Option<HashMap<String, String>>)>;
        let mgr = Arc::new(ReplicationManager::new());
        let captured: Arc<Mutex<Captured>> = Arc::new(Mutex::new(Vec::new()));
        let captured_cl = Arc::clone(&captured);

        let do_put = move |dest: String,
                           key: String,
                           body: bytes::Bytes,
                           meta: Option<HashMap<String, String>>| {
            let captured = Arc::clone(&captured_cl);
            async move {
                captured.lock().unwrap().push((dest, key, body, meta));
                Ok::<(), String>(())
            }
        };

        replicate_object(
            rule("r1", 1, true, None, &[], "dst"),
            "src".into(),
            "obj.bin".into(),
            bytes::Bytes::from_static(b"hello"),
            Some(HashMap::from([("content-type".into(), "text/plain".into())])),
            do_put,
            Arc::clone(&mgr),
        )
        .await;

        assert_eq!(
            mgr.lookup_status("src", "obj.bin"),
            Some(ReplicationStatus::Completed)
        );
        assert_eq!(mgr.dropped_total.load(Ordering::Relaxed), 0);
        let cap = captured.lock().unwrap();
        assert_eq!(cap.len(), 1, "do_put must run exactly once on success");
        assert_eq!(cap[0].0, "dst");
        assert_eq!(cap[0].1, "obj.bin");
        assert_eq!(cap[0].2.as_ref(), b"hello");
        let meta = cap[0].3.as_ref().expect("metadata stamped");
        assert_eq!(
            meta.get("x-amz-replication-status").map(String::as_str),
            Some("REPLICA"),
            "destination meta must carry the REPLICA stamp"
        );
        assert_eq!(meta.get("content-type").map(String::as_str), Some("text/plain"));
    }

    #[tokio::test]
    async fn replicate_object_stamps_destination_storage_class() {
        let mgr = Arc::new(ReplicationManager::new());
        let seen_meta: Arc<Mutex<Option<HashMap<String, String>>>> = Arc::new(Mutex::new(None));
        let seen_cl = Arc::clone(&seen_meta);

        let do_put = move |_dest: String,
                           _key: String,
                           _body: bytes::Bytes,
                           meta: Option<HashMap<String, String>>| {
            let seen = Arc::clone(&seen_cl);
            async move {
                *seen.lock().unwrap() = meta;
                Ok::<(), String>(())
            }
        };

        let mut cold_rule = rule("r-sc", 1, true, None, &[], "dst");
        cold_rule.destination_storage_class = Some("STANDARD_IA".into());

        replicate_object(
            cold_rule,
            "src".into(),
            "cold.bin".into(),
            bytes::Bytes::from_static(b"z"),
            // The source's own storage class must be overridden by the rule's.
            Some(HashMap::from([(
                "x-amz-storage-class".to_owned(),
                "STANDARD".to_owned(),
            )])),
            do_put,
            Arc::clone(&mgr),
        )
        .await;

        let meta = seen_meta
            .lock()
            .unwrap()
            .clone()
            .expect("metadata stamped");
        assert_eq!(
            meta.get("x-amz-storage-class").map(String::as_str),
            Some("STANDARD_IA"),
            "rule storage class must override the source value"
        );
        assert_eq!(
            meta.get("x-amz-replication-status").map(String::as_str),
            Some("REPLICA")
        );
        assert_eq!(meta.len(), 2, "no other metadata keys expected");
        assert_eq!(
            mgr.lookup_status("src", "cold.bin"),
            Some(ReplicationStatus::Completed)
        );
    }

    #[tokio::test]
    async fn replicate_object_transient_failure_then_success_marks_completed() {
        let mgr = Arc::new(ReplicationManager::new());
        let attempts: Arc<Mutex<u32>> = Arc::new(Mutex::new(0));
        let attempts_cl = Arc::clone(&attempts);

        let do_put = move |_dest: String,
                           _key: String,
                           _body: bytes::Bytes,
                           _meta: Option<HashMap<String, String>>| {
            let attempts = Arc::clone(&attempts_cl);
            async move {
                let nth = {
                    let mut count = attempts.lock().unwrap();
                    *count += 1;
                    *count
                };
                if nth == 1 {
                    Err::<(), String>("transient destination 503".into())
                } else {
                    Ok(())
                }
            }
        };

        replicate_object(
            rule("r-flaky", 1, true, None, &[], "dst"),
            "src".into(),
            "flaky.bin".into(),
            bytes::Bytes::from_static(b"y"),
            None,
            do_put,
            Arc::clone(&mgr),
        )
        .await;

        assert_eq!(
            *attempts.lock().unwrap(),
            2,
            "must stop retrying after the first success"
        );
        assert_eq!(
            mgr.lookup_status("src", "flaky.bin"),
            Some(ReplicationStatus::Completed)
        );
        assert_eq!(
            mgr.dropped_total.load(Ordering::Relaxed),
            0,
            "a recovered object must not count as dropped"
        );
    }

    #[tokio::test]
    async fn replicate_object_failure_after_retry_budget_marks_failed_and_bumps_drop() {
        let mgr = Arc::new(ReplicationManager::new());
        let attempts: Arc<Mutex<u32>> = Arc::new(Mutex::new(0));
        let attempts_cl = Arc::clone(&attempts);

        let do_put = move |_dest: String,
                           _key: String,
                           _body: bytes::Bytes,
                           _meta: Option<HashMap<String, String>>| {
            let attempts = Arc::clone(&attempts_cl);
            async move {
                *attempts.lock().unwrap() += 1;
                Err::<(), String>("simulated destination 5xx".into())
            }
        };

        replicate_object(
            rule("r-fail", 1, true, None, &[], "dst"),
            "src".into(),
            "doomed.bin".into(),
            bytes::Bytes::from_static(b"x"),
            None,
            do_put,
            Arc::clone(&mgr),
        )
        .await;

        assert_eq!(
            *attempts.lock().unwrap(),
            RETRY_ATTEMPTS,
            "must retry exactly the configured budget"
        );
        assert_eq!(
            mgr.lookup_status("src", "doomed.bin"),
            Some(ReplicationStatus::Failed)
        );
        assert_eq!(
            mgr.dropped_total.load(Ordering::Relaxed),
            1,
            "drop counter must bump exactly once after retry budget exhausted"
        );
    }

    #[test]
    fn replication_status_aws_strings_match_spec() {
        assert_eq!(ReplicationStatus::Pending.as_aws_str(), "PENDING");
        assert_eq!(ReplicationStatus::Completed.as_aws_str(), "COMPLETED");
        assert_eq!(ReplicationStatus::Failed.as_aws_str(), "FAILED");
        assert_eq!(ReplicationStatus::Replica.as_aws_str(), "REPLICA");
    }
}