Skip to main content

s4_server/
replication.rs

1//! Bucket-to-bucket asynchronous replication (v0.6 #40).
2//!
3//! AWS S3 Cross-Region Replication (CRR) lets a bucket owner declare a
4//! `ReplicationConfiguration` whose rules say "for every PUT to this
5//! bucket that matches `<filter>`, asynchronously copy the new object to
6//! `<destination_bucket>`". The source object grows an
7//! `x-amz-replication-status` of `PENDING` → `COMPLETED` (or `FAILED`),
//! the replica gets stamped `REPLICA`, and consumers can poll HEAD (or
//! GET) on the source object to see how the replication is going.
10//!
11//! ## v0.6 #40 scope (single-instance only)
12//!
13//! - **Same S4 endpoint** — the source bucket and the destination bucket
14//!   live on the same `S4Service`. True cross-region (multi-instance,
15//!   wire-replicated) replication is a v0.7+ follow-up that needs a
16//!   `aws-sdk-s3` PUT to a remote endpoint with its own credentials.
17//! - **Async only** — the originating `put_object` returns as soon as
18//!   the source backend write is done. The replica PUT happens on a
19//!   detached `tokio::spawn` task and never blocks the client. There is
20//!   no synchronous `replication_required` mode (would defeat the whole
21//!   point of CRR being asynchronous in the first place).
//! - **Retry budget = 3 attempts** with exponential backoff between
//!   attempts (50ms after the first failure, 100ms after the second).
//!   On exhaustion the per-(bucket, key) status flips to `Failed` and
//!   `dropped_total` is bumped + a warn-level log line is emitted so
//!   operators see the loss in `s4_replication_dropped_total`.
26//! - **Highest-priority rule wins** when multiple rules match a single
27//!   object key (S3 spec). Ties are broken by declaration order
28//!   (deterministic for tests).
29//! - **`status_enabled = false` rules never match**, mirroring the AWS
30//!   `ReplicationRuleStatus::Disabled` semantics — the rule sits in the
31//!   configuration document but is inert.
32//! - **Replica is full-body** — there is no delta replication, no
33//!   incremental fetch, no batching. Every matching PUT triggers one
34//!   independent destination PUT.
35//!
36//! ## what is NOT in v0.6 #40
37//!
38//! - Delete-marker replication (S3's `DeleteMarkerReplication` block) —
39//!   v0.7+. Right now `delete_object` does not fan out a destination
40//!   delete; the replica drifts on the source's deletion.
41//! - Replication of multipart-completed objects through the per-part
42//!   copy path. The whole compose-then-PUT result of CMU is replicated
43//!   as a single PUT, which is fine for single-instance and matches
44//!   what AWS does for source objects ≤ 5 GiB.
45//! - SSE-KMS-encrypted replicas with KMS-key-id rewriting per the
46//!   `SourceSelectionCriteria` block (the source's wrapped DEK is
47//!   replicated as-is — fine for single-instance because the same KMS
48//!   backend unwraps both copies).
49//! - Replication metrics (RTC) — a v0.7+ follow-up that wires a
50//!   `replication_lag_seconds` histogram.
51
52use std::collections::HashMap;
53use std::sync::Arc;
54use std::sync::RwLock;
55use std::sync::atomic::{AtomicU64, Ordering};
56
57use serde::{Deserialize, Serialize};
58
/// Per-(bucket, key) replication state, surfaced as the
/// `x-amz-replication-status` HEAD/GET response header. Values match the
/// AWS wire form exactly; see [`ReplicationStatus::as_aws_str`] for the
/// on-wire spelling.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum ReplicationStatus {
    /// Replication has been queued (a matching rule fired and the
    /// dispatcher task has been spawned) but the destination PUT has
    /// not yet succeeded.
    Pending,
    /// Replication has succeeded — the replica exists in the
    /// destination bucket.
    Completed,
    /// Replication failed permanently (retry budget exhausted).
    Failed,
    /// Stamped on the destination side (in the replica's metadata) so
    /// the replica is distinguishable from a normal PUT, matching AWS
    /// CRR's "replica stamp" behaviour.
    Replica,
}
78
79impl ReplicationStatus {
80    /// AWS wire-string form. Caller stamps it on the response as the
81    /// `x-amz-replication-status` header.
82    #[must_use]
83    pub fn as_aws_str(&self) -> &'static str {
84        match self {
85            Self::Pending => "PENDING",
86            Self::Completed => "COMPLETED",
87            Self::Failed => "FAILED",
88            Self::Replica => "REPLICA",
89        }
90    }
91}
92
/// Filter on a `ReplicationRule` — the AND of a key-prefix predicate
/// and a tag predicate. AWS S3's wire form uses a sum type
/// (`Prefix | Tag | And { Prefix, Tags }`); we collapse those into
/// the single representation that the in-memory matcher
/// (`filter_matches`) needs.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct ReplicationFilter {
    /// `None` **or** an empty string means "any prefix" — both are
    /// treated as non-constraining by `filter_matches`.
    pub prefix: Option<String>,
    /// AND of every tag pair — every entry here must be present in the
    /// object's tag set for the rule to fire. Empty means "no tag
    /// predicate".
    pub tags: Vec<(String, String)>,
}
106
/// One replication rule. Each rule independently decides whether to
/// copy an object based on the (key, tags) tuple; the replication
/// manager (`ReplicationManager::match_rule`) picks the highest-priority
/// matching rule when multiple fire on the same object.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct ReplicationRule {
    /// Operator-supplied id (max 255 chars per AWS).
    pub id: String,
    /// Higher number = higher priority (S3 spec). When two rules match
    /// the same key, the higher `priority` wins; ties broken by
    /// declaration order (first declared wins).
    pub priority: u32,
    /// `false` makes the rule inert without removing it from the
    /// configuration document — mirrors AWS's `Disabled` status.
    pub status_enabled: bool,
    /// Subset of source-bucket objects this rule applies to.
    pub filter: ReplicationFilter,
    /// Where to copy matching objects. Plain bucket name (no ARN) for
    /// the v0.6 #40 single-instance scope.
    pub destination_bucket: String,
    /// Optional storage-class override on the replica. `None` = keep
    /// the source's class (S3 default).
    pub destination_storage_class: Option<String>,
}
131
/// Per-bucket replication configuration: the IAM-style role plus the
/// ordered list of rules.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct ReplicationConfig {
    /// Placeholder ARN — not consumed by S4 itself, kept for AWS wire
    /// compatibility (the `Role` field is mandatory in the
    /// `PutBucketReplication` XML payload).
    pub role: String,
    /// Rules in declaration order; order is the priority tie-breaker.
    pub rules: Vec<ReplicationRule>,
}
141
/// JSON snapshot — `bucket -> ReplicationConfig`. Mirrors the shape of
/// `notifications::NotificationSnapshot` so operators can hand-edit
/// configurations across restart cycles.
#[derive(Debug, Default, Serialize, Deserialize)]
struct ReplicationSnapshot {
    by_bucket: HashMap<String, ReplicationConfig>,
    /// Per-(bucket, key) replication status. Persisted so a restart
    /// doesn't lose the COMPLETED stamp on already-replicated
    /// objects. Stored as a Vec of pairs rather than a map because a
    /// JSON object cannot have a tuple as a key.
    statuses: Vec<((String, String), ReplicationStatus)>,
}
153
/// In-memory manager of per-bucket replication configurations + per-
/// (bucket, key) replication statuses. All interior state is behind
/// `RwLock`s, so a shared `Arc<ReplicationManager>` is safe to use
/// from concurrent handler tasks.
pub struct ReplicationManager {
    /// `bucket name -> its replication configuration`.
    by_bucket: RwLock<HashMap<String, ReplicationConfig>>,
    /// Per-(source_bucket, key) replication status. Looked up by
    /// `head_object` / `get_object` to stamp `x-amz-replication-status`
    /// on the response.
    statuses: RwLock<HashMap<(String, String), ReplicationStatus>>,
    /// Bumped each time the dispatcher exhausts its retry budget on a
    /// destination PUT. Exposed publicly so the metrics layer can poll
    /// without taking the configuration lock.
    pub dropped_total: AtomicU64,
}
167
168impl Default for ReplicationManager {
169    fn default() -> Self {
170        Self::new()
171    }
172}
173
174impl std::fmt::Debug for ReplicationManager {
175    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
176        f.debug_struct("ReplicationManager")
177            .field("dropped_total", &self.dropped_total.load(Ordering::Relaxed))
178            .finish_non_exhaustive()
179    }
180}
181
182impl ReplicationManager {
183    /// Empty manager — no bucket has any replication rules.
184    #[must_use]
185    pub fn new() -> Self {
186        Self {
187            by_bucket: RwLock::new(HashMap::new()),
188            statuses: RwLock::new(HashMap::new()),
189            dropped_total: AtomicU64::new(0),
190        }
191    }
192
193    /// `put_bucket_replication` handler entry. The bucket's existing
194    /// configuration is fully replaced (S3 spec — `PutBucketReplication`
195    /// is upsert-style at the bucket scope, not per-rule patch).
196    pub fn put(&self, bucket: &str, config: ReplicationConfig) {
197        self.by_bucket
198            .write()
199            .expect("replication state RwLock poisoned")
200            .insert(bucket.to_owned(), config);
201    }
202
203    /// `get_bucket_replication` handler entry. Returns `None` when
204    /// nothing is registered (AWS S3 returns
205    /// `ReplicationConfigurationNotFoundError` in that case; the
206    /// service-layer handler maps `None` accordingly).
207    #[must_use]
208    pub fn get(&self, bucket: &str) -> Option<ReplicationConfig> {
209        self.by_bucket
210            .read()
211            .expect("replication state RwLock poisoned")
212            .get(bucket)
213            .cloned()
214    }
215
216    /// Drop the configuration for `bucket`. Idempotent.
217    pub fn delete(&self, bucket: &str) {
218        self.by_bucket
219            .write()
220            .expect("replication state RwLock poisoned")
221            .remove(bucket);
222    }
223
224    /// Serialise the entire manager state (configurations + per-key
225    /// statuses) to JSON.
226    pub fn to_json(&self) -> Result<String, serde_json::Error> {
227        let snap = ReplicationSnapshot {
228            by_bucket: self
229                .by_bucket
230                .read()
231                .expect("replication state RwLock poisoned")
232                .clone(),
233            statuses: self
234                .statuses
235                .read()
236                .expect("replication state RwLock poisoned")
237                .iter()
238                .map(|(k, v)| (k.clone(), v.clone()))
239                .collect(),
240        };
241        serde_json::to_string(&snap)
242    }
243
244    /// Restore a manager from a previously-emitted snapshot. The
245    /// `dropped_total` counter is reset to 0 — historical drops are
246    /// runtime metrics, not configuration.
247    pub fn from_json(s: &str) -> Result<Self, serde_json::Error> {
248        let snap: ReplicationSnapshot = serde_json::from_str(s)?;
249        Ok(Self {
250            by_bucket: RwLock::new(snap.by_bucket),
251            statuses: RwLock::new(snap.statuses.into_iter().collect()),
252            dropped_total: AtomicU64::new(0),
253        })
254    }
255
256    /// Match an object against the bucket's rules and return the
257    /// highest-priority enabled rule whose filter matches. Returns
258    /// `None` when no rule matches (or no configuration is registered
259    /// for the bucket). Ties on `priority` are broken by declaration
260    /// order — the first such rule wins.
261    #[must_use]
262    pub fn match_rule(
263        &self,
264        bucket: &str,
265        key: &str,
266        object_tags: &[(String, String)],
267    ) -> Option<ReplicationRule> {
268        let map = self
269            .by_bucket
270            .read()
271            .expect("replication state RwLock poisoned");
272        let cfg = map.get(bucket)?;
273        let mut best: Option<&ReplicationRule> = None;
274        for rule in &cfg.rules {
275            if !rule.status_enabled {
276                continue;
277            }
278            if !filter_matches(&rule.filter, key, object_tags) {
279                continue;
280            }
281            best = match best {
282                None => Some(rule),
283                Some(prev) if rule.priority > prev.priority => Some(rule),
284                Some(prev) => Some(prev),
285            };
286        }
287        best.cloned()
288    }
289
290    /// Stamp the per-(bucket, key) replication status. Replaces any
291    /// previous entry — a `Failed` follows `Pending`, etc.
292    pub fn record_status(&self, bucket: &str, key: &str, status: ReplicationStatus) {
293        self.statuses
294            .write()
295            .expect("replication state RwLock poisoned")
296            .insert((bucket.to_owned(), key.to_owned()), status);
297    }
298
299    /// Look up the recorded replication status for `(bucket, key)`.
300    /// Returns `None` when no PUT to this key has triggered
301    /// replication (= the object is not under any replication rule, or
302    /// it predates the rule's creation).
303    #[must_use]
304    pub fn lookup_status(&self, bucket: &str, key: &str) -> Option<ReplicationStatus> {
305        self.statuses
306            .read()
307            .expect("replication state RwLock poisoned")
308            .get(&(bucket.to_owned(), key.to_owned()))
309            .cloned()
310    }
311}
312
313/// AND of (prefix predicate, every tag pair). An empty / `None` prefix
314/// means "any prefix"; an empty tag list means "no tag predicate".
315fn filter_matches(
316    filter: &ReplicationFilter,
317    key: &str,
318    object_tags: &[(String, String)],
319) -> bool {
320    if let Some(p) = filter.prefix.as_deref()
321        && !p.is_empty()
322        && !key.starts_with(p)
323    {
324        return false;
325    }
326    for (tk, tv) in &filter.tags {
327        if !object_tags
328            .iter()
329            .any(|(ok, ov)| ok == tk && ov == tv)
330        {
331            return false;
332        }
333    }
334    true
335}
336
/// Total destination-PUT attempts (the first try plus retries) before
/// the object is declared dropped.
const RETRY_ATTEMPTS: u32 = 3;
/// Backoff base; the sleep before retry `n` is `RETRY_BASE_MS << n` ms
/// (50ms, then 100ms for the default 3-attempt budget).
const RETRY_BASE_MS: u64 = 50;
339
/// Replicate one source-bucket object to the rule's destination bucket.
///
/// The caller supplies a `do_put` callback that performs the actual
/// destination-bucket PUT (so unit tests can drive the dispatcher
/// without needing a full backend). The callback receives:
/// `(destination_bucket, key, body, metadata)` and returns a
/// `Result<(), String>` whose `Err` triggers the retry / failure path.
///
/// Behaviour:
/// - Stamps the destination metadata with `x-amz-replication-status:
///   REPLICA` so a HEAD on the replica is distinguishable.
/// - On callback success, records `(source_bucket, source_key) →
///   Completed` in the manager.
/// - On callback failure, makes up to [`RETRY_ATTEMPTS`] total
///   attempts, sleeping with exponential backoff between them (50ms
///   after the first failure, 100ms after the second). After the
///   budget is exhausted, records `Failed`, bumps `dropped_total`, and
///   emits the matching Prometheus counter.
pub async fn replicate_object<F, Fut>(
    rule: ReplicationRule,
    source_bucket: String,
    source_key: String,
    body: bytes::Bytes,
    metadata: Option<HashMap<String, String>>,
    do_put: F,
    manager: Arc<ReplicationManager>,
) where
    F: Fn(String, String, bytes::Bytes, Option<HashMap<String, String>>) -> Fut,
    Fut: std::future::Future<Output = Result<(), String>>,
{
    // Replica metadata = source metadata + `x-amz-replication-status:
    // REPLICA` stamp. Keeping the source's compression / encryption
    // metadata intact means a GET on the replica decodes through the
    // same path the source would.
    let mut replica_meta = metadata.unwrap_or_default();
    replica_meta.insert(
        "x-amz-replication-status".to_owned(),
        ReplicationStatus::Replica.as_aws_str().to_owned(),
    );
    if let Some(ref sc) = rule.destination_storage_class {
        replica_meta.insert("x-amz-storage-class".to_owned(), sc.clone());
    }

    let dest_bucket = rule.destination_bucket.clone();
    for attempt in 0..RETRY_ATTEMPTS {
        let result = do_put(
            dest_bucket.clone(),
            source_key.clone(),
            body.clone(),
            Some(replica_meta.clone()),
        )
        .await;
        match result {
            Ok(()) => {
                manager.record_status(
                    &source_bucket,
                    &source_key,
                    ReplicationStatus::Completed,
                );
                crate::metrics::record_replication_replicated(&source_bucket, &dest_bucket);
                tracing::debug!(
                    source_bucket = %source_bucket,
                    source_key = %source_key,
                    dest_bucket = %dest_bucket,
                    rule_id = %rule.id,
                    "S4 replication: COMPLETED"
                );
                return;
            }
            Err(e) => {
                if attempt + 1 < RETRY_ATTEMPTS {
                    // Backoff doubles each round: 50ms after attempt 0,
                    // 100ms after attempt 1.
                    let delay_ms = RETRY_BASE_MS * (1u64 << attempt);
                    tracing::warn!(
                        source_bucket = %source_bucket,
                        source_key = %source_key,
                        dest_bucket = %dest_bucket,
                        attempt = attempt + 1,
                        error = %e,
                        "S4 replication: attempt failed, retrying"
                    );
                    tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
                    continue;
                }
                // Budget exhausted: mark the source object FAILED and
                // make the loss visible (counter + warn log).
                manager.record_status(
                    &source_bucket,
                    &source_key,
                    ReplicationStatus::Failed,
                );
                manager.dropped_total.fetch_add(1, Ordering::Relaxed);
                crate::metrics::record_replication_drop(&source_bucket);
                tracing::warn!(
                    source_bucket = %source_bucket,
                    source_key = %source_key,
                    dest_bucket = %dest_bucket,
                    rule_id = %rule.id,
                    error = %e,
                    "S4 replication: FAILED after {RETRY_ATTEMPTS} attempts (drop counter bumped)"
                );
                return;
            }
        }
    }
}
442
#[cfg(test)]
mod tests {
    // Unit tests: rule matching (prefix / tags / priority / disabled),
    // status book-keeping, JSON round-trip, and the async dispatcher's
    // success and retry-exhaustion paths.
    use super::*;
    use std::sync::Mutex;

    /// Test-only shorthand for building a `ReplicationRule` from
    /// loosely-typed arguments.
    fn rule(
        id: &str,
        priority: u32,
        enabled: bool,
        prefix: Option<&str>,
        tags: &[(&str, &str)],
        dest: &str,
    ) -> ReplicationRule {
        ReplicationRule {
            id: id.to_owned(),
            priority,
            status_enabled: enabled,
            filter: ReplicationFilter {
                prefix: prefix.map(str::to_owned),
                tags: tags
                    .iter()
                    .map(|(k, v)| ((*k).to_owned(), (*v).to_owned()))
                    .collect(),
            },
            destination_bucket: dest.to_owned(),
            destination_storage_class: None,
        }
    }

    #[test]
    fn match_rule_prefix_filter_match_and_miss() {
        let mgr = ReplicationManager::new();
        mgr.put(
            "src",
            ReplicationConfig {
                role: "arn:aws:iam::000:role/s4-test".into(),
                rules: vec![rule("r1", 1, true, Some("logs/"), &[], "dst")],
            },
        );
        assert!(mgr.match_rule("src", "logs/2026/01/01.log", &[]).is_some());
        assert!(mgr.match_rule("src", "uploads/foo.bin", &[]).is_none());
    }

    #[test]
    fn match_rule_no_config_for_bucket() {
        let mgr = ReplicationManager::new();
        assert!(mgr.match_rule("ghost", "k", &[]).is_none());
    }

    #[test]
    fn match_rule_priority_picks_highest() {
        let mgr = ReplicationManager::new();
        mgr.put(
            "src",
            ReplicationConfig {
                role: "arn".into(),
                rules: vec![
                    rule("low", 1, true, Some(""), &[], "dst-low"),
                    rule("high", 10, true, Some(""), &[], "dst-high"),
                    rule("mid", 5, true, Some(""), &[], "dst-mid"),
                ],
            },
        );
        let picked = mgr.match_rule("src", "any.bin", &[]).expect("match");
        assert_eq!(picked.id, "high");
        assert_eq!(picked.destination_bucket, "dst-high");
    }

    #[test]
    fn match_rule_priority_tie_breaker_is_declaration_order() {
        let mgr = ReplicationManager::new();
        mgr.put(
            "src",
            ReplicationConfig {
                role: "arn".into(),
                rules: vec![
                    rule("first", 5, true, Some(""), &[], "dst-first"),
                    rule("second", 5, true, Some(""), &[], "dst-second"),
                ],
            },
        );
        let picked = mgr.match_rule("src", "k", &[]).expect("match");
        assert_eq!(picked.id, "first", "tie on priority must keep the earlier rule");
    }

    #[test]
    fn match_rule_tag_filter_and_of_all_tags() {
        let mgr = ReplicationManager::new();
        mgr.put(
            "src",
            ReplicationConfig {
                role: "arn".into(),
                rules: vec![rule(
                    "r-tags",
                    1,
                    true,
                    None,
                    &[("env", "prod"), ("tier", "gold")],
                    "dst",
                )],
            },
        );
        // Both tags present → match.
        assert!(
            mgr.match_rule(
                "src",
                "k",
                &[
                    ("env".into(), "prod".into()),
                    ("tier".into(), "gold".into()),
                    ("extra".into(), "ignored".into())
                ]
            )
            .is_some(),
            "all required tags present (extras OK) must match"
        );
        // Only one tag → AND fails.
        assert!(
            mgr.match_rule(
                "src",
                "k",
                &[("env".into(), "prod".into())]
            )
            .is_none(),
            "missing one of the required tags must not match"
        );
        // Wrong tag value → AND fails.
        assert!(
            mgr.match_rule(
                "src",
                "k",
                &[
                    ("env".into(), "dev".into()),
                    ("tier".into(), "gold".into())
                ]
            )
            .is_none(),
            "wrong value on a required tag must not match"
        );
    }

    #[test]
    fn match_rule_status_disabled_never_matches() {
        let mgr = ReplicationManager::new();
        mgr.put(
            "src",
            ReplicationConfig {
                role: "arn".into(),
                rules: vec![rule("disabled", 100, false, None, &[], "dst")],
            },
        );
        assert!(
            mgr.match_rule("src", "anything", &[]).is_none(),
            "status_enabled=false must not match even at high priority"
        );
    }

    #[test]
    fn record_and_lookup_status_round_trip() {
        let mgr = ReplicationManager::new();
        assert!(mgr.lookup_status("b", "k").is_none());
        mgr.record_status("b", "k", ReplicationStatus::Pending);
        assert_eq!(
            mgr.lookup_status("b", "k"),
            Some(ReplicationStatus::Pending)
        );
        // A later record replaces the earlier one.
        mgr.record_status("b", "k", ReplicationStatus::Completed);
        assert_eq!(
            mgr.lookup_status("b", "k"),
            Some(ReplicationStatus::Completed)
        );
    }

    #[test]
    fn json_round_trip_preserves_config_and_statuses() {
        let mgr = ReplicationManager::new();
        mgr.put(
            "src",
            ReplicationConfig {
                role: "arn:aws:iam::000:role/s4".into(),
                rules: vec![rule("r1", 7, true, Some("docs/"), &[("env", "prod")], "dst")],
            },
        );
        mgr.record_status("src", "docs/a.pdf", ReplicationStatus::Completed);
        let json = mgr.to_json().expect("to_json");
        let mgr2 = ReplicationManager::from_json(&json).expect("from_json");
        assert_eq!(mgr.get("src"), mgr2.get("src"));
        assert_eq!(
            mgr2.lookup_status("src", "docs/a.pdf"),
            Some(ReplicationStatus::Completed)
        );
    }

    #[test]
    fn delete_is_idempotent() {
        let mgr = ReplicationManager::new();
        // Deleting a never-configured bucket must not panic.
        mgr.delete("never-existed");
        mgr.put(
            "b",
            ReplicationConfig {
                role: "arn".into(),
                rules: vec![rule("r1", 1, true, None, &[], "dst")],
            },
        );
        mgr.delete("b");
        assert!(mgr.get("b").is_none());
    }

    #[test]
    fn put_replaces_previous_config() {
        let mgr = ReplicationManager::new();
        mgr.put(
            "b",
            ReplicationConfig {
                role: "arn".into(),
                rules: vec![rule("old", 1, true, None, &[], "dst-old")],
            },
        );
        mgr.put(
            "b",
            ReplicationConfig {
                role: "arn".into(),
                rules: vec![rule("new", 1, true, None, &[], "dst-new")],
            },
        );
        let cfg = mgr.get("b").expect("config");
        assert_eq!(cfg.rules.len(), 1);
        assert_eq!(cfg.rules[0].id, "new");
        assert_eq!(cfg.rules[0].destination_bucket, "dst-new");
    }

    #[tokio::test]
    async fn replicate_object_happy_path_marks_completed() {
        // Capture every do_put invocation so the destination-side
        // arguments (bucket, key, body, stamped metadata) can be checked.
        type Captured = Vec<(String, String, bytes::Bytes, Option<HashMap<String, String>>)>;
        let mgr = Arc::new(ReplicationManager::new());
        let captured: Arc<Mutex<Captured>> = Arc::new(Mutex::new(Vec::new()));
        let captured_cl = Arc::clone(&captured);

        let do_put = move |dest: String,
                           key: String,
                           body: bytes::Bytes,
                           meta: Option<HashMap<String, String>>| {
            let captured = Arc::clone(&captured_cl);
            async move {
                captured.lock().unwrap().push((dest, key, body, meta));
                Ok::<(), String>(())
            }
        };

        replicate_object(
            rule("r1", 1, true, None, &[], "dst"),
            "src".into(),
            "obj.bin".into(),
            bytes::Bytes::from_static(b"hello"),
            Some(HashMap::from([("content-type".into(), "text/plain".into())])),
            do_put,
            Arc::clone(&mgr),
        )
        .await;

        assert_eq!(
            mgr.lookup_status("src", "obj.bin"),
            Some(ReplicationStatus::Completed)
        );
        assert_eq!(mgr.dropped_total.load(Ordering::Relaxed), 0);
        let cap = captured.lock().unwrap();
        assert_eq!(cap.len(), 1, "do_put must run exactly once on success");
        assert_eq!(cap[0].0, "dst");
        assert_eq!(cap[0].1, "obj.bin");
        assert_eq!(cap[0].2.as_ref(), b"hello");
        let meta = cap[0].3.as_ref().expect("metadata stamped");
        assert_eq!(
            meta.get("x-amz-replication-status").map(String::as_str),
            Some("REPLICA"),
            "destination meta must carry the REPLICA stamp"
        );
        assert_eq!(meta.get("content-type").map(String::as_str), Some("text/plain"));
    }

    #[tokio::test]
    async fn replicate_object_failure_after_retry_budget_marks_failed_and_bumps_drop() {
        // do_put always errors; count invocations to verify the budget.
        let mgr = Arc::new(ReplicationManager::new());
        let attempts: Arc<Mutex<u32>> = Arc::new(Mutex::new(0));
        let attempts_cl = Arc::clone(&attempts);

        let do_put = move |_dest: String,
                           _key: String,
                           _body: bytes::Bytes,
                           _meta: Option<HashMap<String, String>>| {
            let attempts = Arc::clone(&attempts_cl);
            async move {
                *attempts.lock().unwrap() += 1;
                Err::<(), String>("simulated destination 5xx".into())
            }
        };

        replicate_object(
            rule("r-fail", 1, true, None, &[], "dst"),
            "src".into(),
            "doomed.bin".into(),
            bytes::Bytes::from_static(b"x"),
            None,
            do_put,
            Arc::clone(&mgr),
        )
        .await;

        assert_eq!(
            *attempts.lock().unwrap(),
            RETRY_ATTEMPTS,
            "must retry exactly the configured budget"
        );
        assert_eq!(
            mgr.lookup_status("src", "doomed.bin"),
            Some(ReplicationStatus::Failed)
        );
        assert_eq!(
            mgr.dropped_total.load(Ordering::Relaxed),
            1,
            "drop counter must bump exactly once after retry budget exhausted"
        );
    }

    #[test]
    fn replication_status_aws_strings_match_spec() {
        assert_eq!(ReplicationStatus::Pending.as_aws_str(), "PENDING");
        assert_eq!(ReplicationStatus::Completed.as_aws_str(), "COMPLETED");
        assert_eq!(ReplicationStatus::Failed.as_aws_str(), "FAILED");
        assert_eq!(ReplicationStatus::Replica.as_aws_str(), "REPLICA");
    }
}
773}