// s4_server/replication.rs

//! Bucket-to-bucket asynchronous replication (v0.6 #40).
//!
//! AWS S3 Cross-Region Replication (CRR) lets a bucket owner declare a
//! `ReplicationConfiguration` whose rules say "for every PUT to this
//! bucket that matches `<filter>`, asynchronously copy the new object to
//! `<destination_bucket>`". The source object grows an
//! `x-amz-replication-status` of `PENDING` → `COMPLETED` (or `FAILED`),
//! the replica gets stamped `REPLICA`, and consumers can poll HEAD or
//! GET to see how the replication is going.
//!
//! ## v0.6 #40 scope (single-instance only)
//!
//! - **Same S4 endpoint** — the source bucket and the destination bucket
//!   live on the same `S4Service`. True cross-region (multi-instance,
//!   wire-replicated) replication is a v0.7+ follow-up that needs an
//!   `aws-sdk-s3` PUT to a remote endpoint with its own credentials.
//! - **Async only** — the originating `put_object` returns as soon as
//!   the source backend write is done. The replica PUT happens on a
//!   detached `tokio::spawn` task and never blocks the client. There is
//!   no synchronous `replication_required` mode (it would defeat the
//!   whole point of CRR being asynchronous in the first place).
22//! - **Retry budget = 3 attempts** with exponential backoff (50ms,
23//!   100ms, 200ms). On exhaustion the per-(bucket, key) status flips to
24//!   `Failed` and `dropped_total` is bumped + a warn-level log line is
25//!   emitted so operators see the loss in `s4_replication_dropped_total`.
26//! - **Highest-priority rule wins** when multiple rules match a single
27//!   object key (S3 spec). Ties are broken by declaration order
28//!   (deterministic for tests).
29//! - **`status_enabled = false` rules never match**, mirroring the AWS
30//!   `ReplicationRuleStatus::Disabled` semantics — the rule sits in the
31//!   configuration document but is inert.
32//! - **Replica is full-body** — there is no delta replication, no
33//!   incremental fetch, no batching. Every matching PUT triggers one
34//!   independent destination PUT.
35//!
//! ## what is NOT in v0.6 #40
//!
//! - Delete-marker replication (S3's `DeleteMarkerReplication` block) —
//!   v0.7+. Right now `delete_object` does not fan out a destination
//!   delete; the replica drifts from the source once the source object
//!   is deleted.
//! - Replication of multipart-completed objects through the per-part
//!   copy path. The whole compose-then-PUT result of CMU is replicated
//!   as a single PUT, which is fine for single-instance and matches
//!   what AWS does for source objects ≤ 5 GiB.
//! - SSE-KMS-encrypted replicas with KMS-key-id rewriting per the
//!   `SourceSelectionCriteria` block (the source's wrapped DEK is
//!   replicated as-is — fine for single-instance because the same KMS
//!   backend unwraps both copies).
//! - Replication metrics (RTC) — a v0.7+ follow-up that wires a
//!   `replication_lag_seconds` histogram.
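//!
//! ## Example (illustrative)
//!
//! A minimal sketch of the configuration + matching flow, using only the
//! types in this module. The crate path in the `use` line is hypothetical
//! and the block is marked `ignore` for that reason:
//!
//! ```ignore
//! use s4_server::replication::{
//!     ReplicationConfig, ReplicationFilter, ReplicationManager, ReplicationRule,
//! };
//!
//! let mgr = ReplicationManager::new();
//! mgr.put("src", ReplicationConfig {
//!     role: "arn:aws:iam::000:role/s4".into(),
//!     rules: vec![ReplicationRule {
//!         id: "logs-to-backup".into(),
//!         priority: 1,
//!         status_enabled: true,
//!         filter: ReplicationFilter { prefix: Some("logs/".into()), tags: vec![] },
//!         destination_bucket: "backup".into(),
//!         destination_storage_class: None,
//!     }],
//! });
//! // A PUT under `logs/` matches; one under `uploads/` does not.
//! assert!(mgr.match_rule("src", "logs/2026/a.log", &[]).is_some());
//! assert!(mgr.match_rule("src", "uploads/b.bin", &[]).is_none());
//! ```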

use std::collections::HashMap;
use std::sync::Arc;
use std::sync::RwLock;
use std::sync::atomic::{AtomicU64, Ordering};

use serde::{Deserialize, Serialize};

/// Per-(bucket, key) replication state, surfaced as the
/// `x-amz-replication-status` HEAD/GET response header. Values match the
/// AWS wire form exactly.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum ReplicationStatus {
    /// Replication has been queued (a matching rule fired and the
    /// dispatcher task has been spawned) but the destination PUT has
    /// not yet succeeded.
    Pending,
    /// Replication has succeeded — the replica exists in the
    /// destination bucket.
    Completed,
    /// Replication failed permanently (retry budget exhausted).
    Failed,
    /// Stamped on the destination side so the replica is
    /// distinguishable from a normal PUT, matching AWS CRR's
    /// "replica stamp" behaviour.
    Replica,
}

impl ReplicationStatus {
    /// AWS wire-string form. Caller stamps it on the response as the
    /// `x-amz-replication-status` header.
    #[must_use]
    pub fn as_aws_str(&self) -> &'static str {
        match self {
            Self::Pending => "PENDING",
            Self::Completed => "COMPLETED",
            Self::Failed => "FAILED",
            Self::Replica => "REPLICA",
        }
    }
}

/// Filter on a `ReplicationRule` — the AND of a key-prefix predicate
/// and a tag predicate. AWS S3's wire form uses a sum type
/// (`Prefix | Tag | And { Prefix, Tags }`); we collapse those into
/// the single representation that the in-memory matcher needs.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct ReplicationFilter {
    /// Empty / `None` means "any prefix".
    pub prefix: Option<String>,
    /// AND of every tag pair — every entry here must be present in the
    /// object's tag set for the rule to fire. Empty means "no tag
    /// predicate".
    pub tags: Vec<(String, String)>,
}

/// One replication rule. Each rule independently decides whether to
/// copy an object based on the (key, tags) tuple; the replication
/// manager picks the highest-priority matching rule when multiple
/// fire on the same object.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct ReplicationRule {
    /// Operator-supplied id (max 255 chars per AWS).
    pub id: String,
    /// Higher number = higher priority (S3 spec). When two rules match
    /// the same key, the higher `priority` wins; ties broken by
    /// declaration order.
    pub priority: u32,
    /// `false` makes the rule inert without removing it from the
    /// configuration document — mirrors AWS's `Disabled` status.
    pub status_enabled: bool,
    /// Subset of source-bucket objects this rule applies to.
    pub filter: ReplicationFilter,
    /// Where to copy matching objects. Plain bucket name (no ARN) for
    /// the v0.6 #40 single-instance scope.
    pub destination_bucket: String,
    /// Optional storage-class override on the replica. `None` = keep
    /// the source's class (S3 default).
    pub destination_storage_class: Option<String>,
}

/// Per-bucket replication configuration.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct ReplicationConfig {
    /// Placeholder ARN — not consumed by S4 itself, kept for AWS wire
    /// compatibility (the `Role` field is mandatory in the
    /// `PutBucketReplication` XML payload).
    pub role: String,
    pub rules: Vec<ReplicationRule>,
}

/// Per-(source_bucket, source_key) replication status entry, paired
/// with the **generation token** of the source PUT that produced it.
///
/// ## v0.8.2 #61 — generation token
///
/// Each `put_object` (or `complete_multipart_upload`) on a source key
/// pulls a fresh, monotonically-increasing `generation` from the
/// manager. The detached replication task carries that generation and
/// only stamps the status when its generation is `>=` the stored one
/// (CAS-style). A stale retry whose generation has been overtaken by a
/// newer PUT is silently dropped, so the destination bucket never gets
/// rolled back to older bytes. See [`ReplicationManager::next_generation`]
/// + [`ReplicationManager::record_status_if_newer`].
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct ReplicationStatusEntry {
    pub status: ReplicationStatus,
    pub generation: u64,
}

/// JSON snapshot — `bucket -> ReplicationConfig`. Mirrors the shape of
/// `notifications::NotificationSnapshot` so operators can hand-edit
/// configurations across restart cycles.
///
/// ## v0.8.2 #61 back-compat
///
/// Pre-#61 snapshots stored `Vec<((String, String), ReplicationStatus)>`
/// (status only; no generation). The new format stores
/// `Vec<((String, String), ReplicationStatusEntry)>`. Serde is set up
/// with `#[serde(untagged)]` on a wrapper enum so old snapshots load
/// with `generation = 0`. A `generation = 0` entry is never stale —
/// the very next PUT mints `generation = 1` which wins the CAS — so
/// the migration is loss-free.
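///
/// Illustrative wire shapes (hand-written here for orientation; field
/// order is serde's default, and the `//` annotations are not part of
/// the JSON):
///
/// ```json
/// // pre-#61: bare status string, no `next_generation` field
/// { "by_bucket": {}, "statuses": [[["src", "k"], "Completed"]] }
/// ```
///
/// ```json
/// // post-#61: embedded entry + persisted counter
/// { "by_bucket": {},
///   "statuses": [[["src", "k"], { "status": "Completed", "generation": 7 }]],
///   "next_generation": 8 }
/// ```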
#[derive(Debug, Default, Serialize, Deserialize)]
struct ReplicationSnapshot {
    by_bucket: HashMap<String, ReplicationConfig>,
    /// Per-(bucket, key) replication status. Persisted so a restart
    /// doesn't lose the COMPLETED stamp on already-replicated
    /// objects.
    statuses: Vec<((String, String), StatusOrEntry)>,
    /// v0.8.2 #61: persist the next generation so a restart doesn't
    /// reissue tokens that are still in-flight. Optional for
    /// back-compat — pre-#61 snapshots restore with `next_generation = 1`.
    #[serde(default)]
    next_generation: u64,
}

/// Back-compat wrapper for snapshot deserialisation: accepts either a
/// bare `ReplicationStatus` (pre-#61 schema) or a full
/// `ReplicationStatusEntry`. `serde(untagged)` tries the variants in
/// declaration order — the more-structured `Entry` variant first so
/// new snapshots round-trip, falling back to bare `Status` for old
/// snapshots.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(untagged)]
enum StatusOrEntry {
    Entry(ReplicationStatusEntry),
    Status(ReplicationStatus),
}

impl StatusOrEntry {
    fn into_entry(self) -> ReplicationStatusEntry {
        match self {
            Self::Entry(e) => e,
            Self::Status(s) => ReplicationStatusEntry {
                status: s,
                generation: 0,
            },
        }
    }
}

/// In-memory manager of per-bucket replication configurations + per-
/// (bucket, key) replication statuses.
pub struct ReplicationManager {
    by_bucket: RwLock<HashMap<String, ReplicationConfig>>,
    /// Per-(source_bucket, key) replication status entry (status +
    /// generation token). Looked up by `head_object` / `get_object` to
    /// stamp `x-amz-replication-status` on the response.
    statuses: RwLock<HashMap<(String, String), ReplicationStatusEntry>>,
    /// v0.8.2 #61: monotonic per-source-PUT generation counter. Each
    /// `put_object` (or `complete_multipart_upload`) on a replicated
    /// source bucket calls [`Self::next_generation`] before spawning
    /// its detached replication task. The dispatcher carries the
    /// generation through to [`Self::record_status_if_newer`], which
    /// drops the stamp + the destination write when a newer
    /// generation has already won — guaranteeing the destination
    /// can't be rolled back by a slow retry.
    pub next_generation: AtomicU64,
    /// Bumped each time the dispatcher exhausts its retry budget on a
    /// destination PUT. Exposed publicly so the metrics layer can poll
    /// without taking the configuration lock.
    pub dropped_total: AtomicU64,
}

impl Default for ReplicationManager {
    fn default() -> Self {
        Self::new()
    }
}

impl std::fmt::Debug for ReplicationManager {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ReplicationManager")
            .field("dropped_total", &self.dropped_total.load(Ordering::Relaxed))
            .finish_non_exhaustive()
    }
}

impl ReplicationManager {
    /// Empty manager — no bucket has any replication rules. The
    /// generation counter starts at 1 so the first PUT-issued token is
    /// `1` (a stored entry's `generation = 0` from a pre-#61 snapshot
    /// is then strictly less and the very next PUT wins the CAS).
    #[must_use]
    pub fn new() -> Self {
        Self {
            by_bucket: RwLock::new(HashMap::new()),
            statuses: RwLock::new(HashMap::new()),
            next_generation: AtomicU64::new(1),
            dropped_total: AtomicU64::new(0),
        }
    }

    /// v0.8.2 #61: mint a fresh, monotonically-increasing generation
    /// token. Caller is the per-source-PUT dispatcher fork (the body-
    /// bearing `put_object` branch, the body-less `put_object` branch,
    /// and `complete_multipart_upload`). The token is then carried
    /// through [`replicate_object`] to [`Self::record_status_if_newer`]
    /// so a stale retry can be detected and dropped.
    ///
    /// Uses `Relaxed` ordering — we only need uniqueness +
    /// monotonicity per atomic; the cross-thread happens-before
    /// between PUT-A's spawn and the dispatcher reading the body is
    /// already established by `tokio::spawn`'s implicit
    /// `Acquire/Release` on the task queue.
    pub fn next_generation(&self) -> u64 {
        self.next_generation.fetch_add(1, Ordering::Relaxed)
    }

    /// `put_bucket_replication` handler entry. The bucket's existing
    /// configuration is fully replaced (S3 spec — `PutBucketReplication`
    /// is upsert-style at the bucket scope, not per-rule patch).
    pub fn put(&self, bucket: &str, config: ReplicationConfig) {
        self.by_bucket
            .write()
            .expect("replication state RwLock poisoned")
            .insert(bucket.to_owned(), config);
    }

    /// `get_bucket_replication` handler entry. Returns `None` when
    /// nothing is registered (AWS S3 returns
    /// `ReplicationConfigurationNotFoundError` in that case; the
    /// service-layer handler maps `None` accordingly).
    #[must_use]
    pub fn get(&self, bucket: &str) -> Option<ReplicationConfig> {
        self.by_bucket
            .read()
            .expect("replication state RwLock poisoned")
            .get(bucket)
            .cloned()
    }

    /// Drop the configuration for `bucket`. Idempotent.
    pub fn delete(&self, bucket: &str) {
        self.by_bucket
            .write()
            .expect("replication state RwLock poisoned")
            .remove(bucket);
    }

    /// Serialise the entire manager state (configurations + per-key
    /// statuses + next generation counter) to JSON. The status entries
    /// are emitted in the v0.8.2 #61 schema (`ReplicationStatusEntry`);
    /// binaries built before #61 would reject the embedded
    /// `{ status, generation }` shape — but the production restart path
    /// always runs the same binary against its own snapshot.
    pub fn to_json(&self) -> Result<String, serde_json::Error> {
        let snap = ReplicationSnapshot {
            by_bucket: self
                .by_bucket
                .read()
                .expect("replication state RwLock poisoned")
                .clone(),
            statuses: self
                .statuses
                .read()
                .expect("replication state RwLock poisoned")
                .iter()
                .map(|(k, v)| (k.clone(), StatusOrEntry::Entry(v.clone())))
                .collect(),
            next_generation: self.next_generation.load(Ordering::Relaxed),
        };
        serde_json::to_string(&snap)
    }

    /// Restore a manager from a previously-emitted snapshot. The
    /// `dropped_total` counter is reset to 0 — historical drops are
    /// runtime metrics, not configuration.
    ///
    /// ## Back-compat (v0.8.2 #61)
    ///
    /// Pre-#61 snapshots store bare `ReplicationStatus` (no
    /// generation). The `untagged` `StatusOrEntry` enum picks them up
    /// and assigns `generation = 0`, which the CAS-style
    /// [`Self::record_status_if_newer`] treats as "always overridable
    /// by the next real PUT" — guaranteed loss-free migration. The
    /// `next_generation` counter defaults to `1` when the snapshot
    /// predates #61 (= `serde(default)` on the field).
    pub fn from_json(s: &str) -> Result<Self, serde_json::Error> {
        let snap: ReplicationSnapshot = serde_json::from_str(s)?;
        let statuses: HashMap<(String, String), ReplicationStatusEntry> = snap
            .statuses
            .into_iter()
            .map(|(k, v)| (k, v.into_entry()))
            .collect();
        // Pre-#61 snapshots come back with `next_generation = 0`
        // (`serde(default)` on `u64`); start fresh at 1 so all newly-
        // minted tokens are strictly greater than the legacy
        // `generation = 0` entries.
        let next_gen = snap.next_generation.max(1);
        Ok(Self {
            by_bucket: RwLock::new(snap.by_bucket),
            statuses: RwLock::new(statuses),
            next_generation: AtomicU64::new(next_gen),
            dropped_total: AtomicU64::new(0),
        })
    }

    /// Match an object against the bucket's rules and return the
    /// highest-priority enabled rule whose filter matches. Returns
    /// `None` when no rule matches (or no configuration is registered
    /// for the bucket). Ties on `priority` are broken by declaration
    /// order — the first such rule wins.
    #[must_use]
    pub fn match_rule(
        &self,
        bucket: &str,
        key: &str,
        object_tags: &[(String, String)],
    ) -> Option<ReplicationRule> {
        let map = self
            .by_bucket
            .read()
            .expect("replication state RwLock poisoned");
        let cfg = map.get(bucket)?;
        let mut best: Option<&ReplicationRule> = None;
        for rule in &cfg.rules {
            if !rule.status_enabled {
                continue;
            }
            if !filter_matches(&rule.filter, key, object_tags) {
                continue;
            }
            best = match best {
                None => Some(rule),
                Some(prev) if rule.priority > prev.priority => Some(rule),
                Some(prev) => Some(prev),
            };
        }
        best.cloned()
    }

    /// Stamp the per-(bucket, key) replication status with no
    /// generation guard. Replaces any previous entry. **Generation is
    /// reset to 0** (= overridable by the next real PUT) — callers
    /// that hold a generation token must use
    /// [`Self::record_status_if_newer`] instead.
    ///
    /// Use cases (kept for back-compat + the eager `Pending` stamp the
    /// service-layer dispatcher emits before spawning the actual
    /// replication task):
    /// - Eager `Pending` stamp synchronously alongside the source PUT
    ///   so a HEAD between PUT-return and dispatcher-completion sees
    ///   `PENDING` instead of `None`.
    /// - Tests that don't care about generation (legacy assertions).
    pub fn record_status(&self, bucket: &str, key: &str, status: ReplicationStatus) {
        self.statuses
            .write()
            .expect("replication state RwLock poisoned")
            .insert(
                (bucket.to_owned(), key.to_owned()),
                ReplicationStatusEntry {
                    status,
                    generation: 0,
                },
            );
    }

    /// v0.8.2 #61: CAS-style stamp. Only updates the entry when
    /// `generation >= entry.generation`; rejects the update (returns
    /// `false`) when `generation < entry.generation` because a newer
    /// PUT has already won and we must not roll the source's status
    /// back to a stale terminal state.
    ///
    /// ## Returns
    ///
    /// - `true` — the stamp was accepted; the caller may proceed with
    ///   the destination-bucket PUT (in [`replicate_object`]) /
    ///   declare success.
    /// - `false` — a strictly-newer generation has already stamped the
    ///   entry; the caller must **drop the destination write** to
    ///   avoid overwriting newer bytes with a stale retry's body.
    ///
    /// Equality (`generation == entry.generation`) is accepted because
    /// the same generation may legitimately stamp twice across the
    /// dispatcher's retry budget (`Pending` → `Completed` on the same
    /// task).
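    ///
    /// ## Example (illustrative)
    ///
    /// A minimal sketch of the CAS behaviour, mirroring the unit tests
    /// below (marked `ignore` because the crate path is omitted):
    ///
    /// ```ignore
    /// let mgr = ReplicationManager::new();
    /// // gen 1 stamps Pending, then a newer PUT (gen 2) stamps Completed.
    /// assert!(mgr.record_status_if_newer("b", "k", 1, ReplicationStatus::Pending));
    /// assert!(mgr.record_status_if_newer("b", "k", 2, ReplicationStatus::Completed));
    /// // gen 1's late retry is rejected; the entry keeps gen 2's status.
    /// assert!(!mgr.record_status_if_newer("b", "k", 1, ReplicationStatus::Failed));
    /// ```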
    pub fn record_status_if_newer(
        &self,
        bucket: &str,
        key: &str,
        generation: u64,
        status: ReplicationStatus,
    ) -> bool {
        let mut map = self
            .statuses
            .write()
            .expect("replication state RwLock poisoned");
        let entry = map
            .entry((bucket.to_owned(), key.to_owned()))
            .or_insert(ReplicationStatusEntry {
                status: ReplicationStatus::Pending,
                generation: 0,
            });
        if generation < entry.generation {
            return false;
        }
        entry.generation = generation;
        entry.status = status;
        true
    }

    /// Look up the recorded replication status for `(bucket, key)`.
    /// Returns `None` when no PUT to this key has triggered
    /// replication (= the object is not under any replication rule, or
    /// it predates the rule's creation).
    ///
    /// The `generation` field of the entry is intentionally not
    /// surfaced here — it's an internal CAS guard, not part of the
    /// AWS wire shape.
    #[must_use]
    pub fn lookup_status(&self, bucket: &str, key: &str) -> Option<ReplicationStatus> {
        self.statuses
            .read()
            .expect("replication state RwLock poisoned")
            .get(&(bucket.to_owned(), key.to_owned()))
            .map(|entry| entry.status.clone())
    }
}

/// AND of (prefix predicate, every tag pair). An empty / `None` prefix
/// means "any prefix"; an empty tag list means "no tag predicate".
fn filter_matches(
    filter: &ReplicationFilter,
    key: &str,
    object_tags: &[(String, String)],
) -> bool {
    if let Some(p) = filter.prefix.as_deref()
        && !p.is_empty()
        && !key.starts_with(p)
    {
        return false;
    }
    for (tk, tv) in &filter.tags {
        if !object_tags
            .iter()
            .any(|(ok, ov)| ok == tk && ov == tv)
        {
            return false;
        }
    }
    true
}

const RETRY_ATTEMPTS: u32 = 3;
const RETRY_BASE_MS: u64 = 50;

/// Replicate one source-bucket object to the rule's destination bucket.
///
/// The caller supplies a `do_put` callback that performs the actual
/// destination-bucket PUT (so unit tests can drive the dispatcher
/// without needing a full backend). The callback receives
/// `(destination_bucket, key, body, metadata)` and returns a
/// `Result<(), String>` whose `Err` triggers the retry / failure path.
///
/// Behaviour:
/// - Stamps the destination metadata with `x-amz-replication-status:
///   REPLICA` so a HEAD on the replica is distinguishable.
/// - On callback success, records `(source_bucket, source_key) →
///   Completed` in the manager **iff this task's `generation` is not
///   already overtaken** (CAS-style guard — see
///   [`ReplicationManager::record_status_if_newer`]).
/// - On callback failure, makes up to [`RETRY_ATTEMPTS`] attempts with
///   exponential backoff between them (50ms / 100ms). After the budget
///   is exhausted, records `Failed`, bumps `dropped_total`, and emits
///   the matching Prometheus counter — also CAS-guarded.
///
/// ## v0.8.2 #61 — generation token + destination key override
///
/// The two new parameters fix the audit's C-1 + C-3 findings:
///
/// - `generation` — monotonic per-source-PUT token from
///   [`ReplicationManager::next_generation`]. CAS-stamps the source's
///   status + **suppresses the actual destination PUT** when the
///   token has been overtaken. Without this guard, a slow retry of
///   PUT-A could land in the destination *after* PUT-B has already
///   replicated — rolling the destination back to A's bytes. With
///   the guard, A's task notices B's higher generation has won and
///   silently drops its destination write.
/// - `destination_key_override` — the storage-side key the destination
///   bucket should write under. For an unversioned source this is
///   `None` and the dispatcher falls back to the source's logical key
///   (= the AWS-default behaviour). For a **versioning-Enabled**
///   source the caller passes `Some(versioned_shadow_key(key, vid))`
///   so the destination's version chain receives the new version
///   under the same shadow path the source uses (= a `?versionId=`
///   GET on the destination resolves through the same shadow-key
///   lookup as the source).
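///
/// ## Example (illustrative)
///
/// A minimal sketch of driving the dispatcher with a stub `do_put`,
/// mirroring the unit tests at the bottom of this file. `some_rule`
/// stands in for any enabled `ReplicationRule`; the block is marked
/// `ignore` because the crate path is omitted:
///
/// ```ignore
/// let mgr = Arc::new(ReplicationManager::new());
/// // Stub destination PUT that always succeeds.
/// let do_put = |_dest: String, _key: String, _body: bytes::Bytes,
///               _meta: Option<HashMap<String, String>>| async move {
///     Ok::<(), String>(())
/// };
/// replicate_object(
///     some_rule, "src".into(), "k".into(), bytes::Bytes::from_static(b"x"),
///     None, do_put, Arc::clone(&mgr), mgr.next_generation(), None,
/// ).await;
/// assert_eq!(mgr.lookup_status("src", "k"), Some(ReplicationStatus::Completed));
/// ```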
// 9 args is the post-#61 wire-shape: rule + (source_bucket, source_key,
// body, metadata) + do_put + manager + (generation, dest_override). A
// shape struct would split the call site without buying anything; the
// caller (`spawn_replication_if_matched`) constructs every field
// inline, so the indirection is pure noise.
#[allow(clippy::too_many_arguments)]
pub async fn replicate_object<F, Fut>(
    rule: ReplicationRule,
    source_bucket: String,
    source_key: String,
    body: bytes::Bytes,
    metadata: Option<HashMap<String, String>>,
    do_put: F,
    manager: Arc<ReplicationManager>,
    generation: u64,
    destination_key_override: Option<String>,
) where
    F: Fn(String, String, bytes::Bytes, Option<HashMap<String, String>>) -> Fut,
    Fut: std::future::Future<Output = Result<(), String>>,
{
    // Replica metadata = source metadata + `x-amz-replication-status:
    // REPLICA` stamp. Keeping the source's compression / encryption
    // metadata intact means a GET on the replica decodes through the
    // same path the source would.
    let mut replica_meta = metadata.unwrap_or_default();
    replica_meta.insert(
        "x-amz-replication-status".to_owned(),
        ReplicationStatus::Replica.as_aws_str().to_owned(),
    );
    if let Some(ref sc) = rule.destination_storage_class {
        replica_meta.insert("x-amz-storage-class".to_owned(), sc.clone());
    }

    let dest_bucket = rule.destination_bucket.clone();
    // v0.8.2 #61: when the source PUT was a versioned write, the
    // override carries the storage-side shadow key
    // (`<key>.__s4ver__/<vid>`); otherwise we use the logical key.
    let dest_key = destination_key_override.unwrap_or_else(|| source_key.clone());
    for attempt in 0..RETRY_ATTEMPTS {
        // v0.8.2 #61 C-3: pre-PUT generation check. If a newer
        // generation has already stamped a terminal status on this
        // (bucket, key), our retry is stale — silently drop the
        // destination write so we don't roll the destination back to
        // older bytes. We could route this through
        // `record_status_if_newer` as a no-op re-stamp, but the
        // cheap path is to peek at the stored generation and bail.
        if let Some(entry) = manager
            .statuses
            .read()
            .expect("replication state RwLock poisoned")
            .get(&(source_bucket.clone(), source_key.clone()))
            .cloned()
            && entry.generation > generation
        {
            tracing::debug!(
                source_bucket = %source_bucket,
                source_key = %source_key,
                dest_bucket = %dest_bucket,
                rule_id = %rule.id,
                generation,
                stored_generation = entry.generation,
                "S4 replication: stale generation, dropping destination PUT"
            );
            return;
        }
        let result = do_put(
            dest_bucket.clone(),
            dest_key.clone(),
            body.clone(),
            Some(replica_meta.clone()),
        )
        .await;
        match result {
            Ok(()) => {
                let accepted = manager.record_status_if_newer(
                    &source_bucket,
                    &source_key,
                    generation,
                    ReplicationStatus::Completed,
                );
                if !accepted {
                    // v0.8.2 #61 C-3: the destination PUT raced — a
                    // newer generation stamped between our pre-check
                    // and our `do_put.await`. The destination now
                    // *might* hold our stale bytes (the newer PUT
                    // could have landed after ours) but we stop
                    // re-stamping and let the newer task overwrite on
                    // its own success. Bumps the metric so operators
                    // see the race surfaced.
                    crate::metrics::record_replication_drop(&source_bucket);
                    manager.dropped_total.fetch_add(1, Ordering::Relaxed);
                    tracing::warn!(
                        source_bucket = %source_bucket,
                        source_key = %source_key,
                        dest_bucket = %dest_bucket,
                        rule_id = %rule.id,
                        generation,
                        "S4 replication: completed but a newer generation has won; \
                         status not stamped"
                    );
                    return;
                }
                crate::metrics::record_replication_replicated(&source_bucket, &dest_bucket);
                tracing::debug!(
                    source_bucket = %source_bucket,
                    source_key = %source_key,
                    dest_bucket = %dest_bucket,
                    rule_id = %rule.id,
                    generation,
                    "S4 replication: COMPLETED"
                );
                return;
            }
            Err(e) => {
                if attempt + 1 < RETRY_ATTEMPTS {
                    let delay_ms = RETRY_BASE_MS * (1u64 << attempt);
                    tracing::warn!(
                        source_bucket = %source_bucket,
                        source_key = %source_key,
                        dest_bucket = %dest_bucket,
                        attempt = attempt + 1,
                        generation,
                        error = %e,
                        "S4 replication: attempt failed, retrying"
                    );
                    tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
                    continue;
                }
                // CAS the terminal Failed too — a newer generation that
                // succeeded must not be rolled back to Failed.
                let accepted = manager.record_status_if_newer(
                    &source_bucket,
                    &source_key,
                    generation,
                    ReplicationStatus::Failed,
                );
                manager.dropped_total.fetch_add(1, Ordering::Relaxed);
                crate::metrics::record_replication_drop(&source_bucket);
                tracing::warn!(
                    source_bucket = %source_bucket,
                    source_key = %source_key,
                    dest_bucket = %dest_bucket,
                    rule_id = %rule.id,
                    generation,
                    error = %e,
                    accepted_failed_stamp = accepted,
                    "S4 replication: FAILED after {RETRY_ATTEMPTS} attempts (drop counter bumped)"
                );
                return;
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Mutex;

    fn rule(
        id: &str,
        priority: u32,
        enabled: bool,
        prefix: Option<&str>,
        tags: &[(&str, &str)],
        dest: &str,
    ) -> ReplicationRule {
        ReplicationRule {
            id: id.to_owned(),
            priority,
            status_enabled: enabled,
            filter: ReplicationFilter {
                prefix: prefix.map(str::to_owned),
                tags: tags
                    .iter()
                    .map(|(k, v)| ((*k).to_owned(), (*v).to_owned()))
                    .collect(),
            },
            destination_bucket: dest.to_owned(),
            destination_storage_class: None,
        }
    }

    #[test]
    fn match_rule_prefix_filter_match_and_miss() {
        let mgr = ReplicationManager::new();
        mgr.put(
            "src",
            ReplicationConfig {
                role: "arn:aws:iam::000:role/s4-test".into(),
                rules: vec![rule("r1", 1, true, Some("logs/"), &[], "dst")],
            },
        );
        assert!(mgr.match_rule("src", "logs/2026/01/01.log", &[]).is_some());
        assert!(mgr.match_rule("src", "uploads/foo.bin", &[]).is_none());
    }

    #[test]
    fn match_rule_no_config_for_bucket() {
        let mgr = ReplicationManager::new();
        assert!(mgr.match_rule("ghost", "k", &[]).is_none());
    }

    #[test]
    fn match_rule_priority_picks_highest() {
        let mgr = ReplicationManager::new();
        mgr.put(
            "src",
            ReplicationConfig {
                role: "arn".into(),
                rules: vec![
                    rule("low", 1, true, Some(""), &[], "dst-low"),
                    rule("high", 10, true, Some(""), &[], "dst-high"),
                    rule("mid", 5, true, Some(""), &[], "dst-mid"),
                ],
            },
        );
        let picked = mgr.match_rule("src", "any.bin", &[]).expect("match");
        assert_eq!(picked.id, "high");
        assert_eq!(picked.destination_bucket, "dst-high");
    }

    #[test]
    fn match_rule_priority_tie_breaker_is_declaration_order() {
        let mgr = ReplicationManager::new();
        mgr.put(
            "src",
            ReplicationConfig {
                role: "arn".into(),
                rules: vec![
                    rule("first", 5, true, Some(""), &[], "dst-first"),
                    rule("second", 5, true, Some(""), &[], "dst-second"),
                ],
            },
        );
        let picked = mgr.match_rule("src", "k", &[]).expect("match");
        assert_eq!(picked.id, "first", "tie on priority must keep the earlier rule");
    }

    #[test]
    fn match_rule_tag_filter_and_of_all_tags() {
        let mgr = ReplicationManager::new();
        mgr.put(
            "src",
            ReplicationConfig {
                role: "arn".into(),
                rules: vec![rule(
                    "r-tags",
                    1,
                    true,
                    None,
                    &[("env", "prod"), ("tier", "gold")],
                    "dst",
                )],
            },
        );
        // Both tags present → match.
        assert!(
            mgr.match_rule(
                "src",
                "k",
                &[
                    ("env".into(), "prod".into()),
                    ("tier".into(), "gold".into()),
                    ("extra".into(), "ignored".into())
                ]
            )
            .is_some(),
            "all required tags present (extras OK) must match"
        );
        // Only one tag → AND fails.
        assert!(
            mgr.match_rule(
                "src",
                "k",
                &[("env".into(), "prod".into())]
            )
            .is_none(),
            "missing one of the required tags must not match"
        );
        // Wrong tag value → AND fails.
        assert!(
            mgr.match_rule(
                "src",
                "k",
                &[
                    ("env".into(), "dev".into()),
                    ("tier".into(), "gold".into())
                ]
            )
            .is_none(),
            "wrong value on a required tag must not match"
        );
    }

    #[test]
    fn match_rule_status_disabled_never_matches() {
        let mgr = ReplicationManager::new();
        mgr.put(
            "src",
            ReplicationConfig {
                role: "arn".into(),
                rules: vec![rule("disabled", 100, false, None, &[], "dst")],
            },
        );
        assert!(
            mgr.match_rule("src", "anything", &[]).is_none(),
            "status_enabled=false must not match even at high priority"
        );
    }

    #[test]
    fn record_and_lookup_status_round_trip() {
        let mgr = ReplicationManager::new();
        assert!(mgr.lookup_status("b", "k").is_none());
        mgr.record_status("b", "k", ReplicationStatus::Pending);
        assert_eq!(
            mgr.lookup_status("b", "k"),
            Some(ReplicationStatus::Pending)
        );
        mgr.record_status("b", "k", ReplicationStatus::Completed);
        assert_eq!(
            mgr.lookup_status("b", "k"),
            Some(ReplicationStatus::Completed)
        );
    }

    #[test]
    fn json_round_trip_preserves_config_and_statuses() {
        let mgr = ReplicationManager::new();
        mgr.put(
            "src",
            ReplicationConfig {
                role: "arn:aws:iam::000:role/s4".into(),
                rules: vec![rule("r1", 7, true, Some("docs/"), &[("env", "prod")], "dst")],
            },
        );
        mgr.record_status("src", "docs/a.pdf", ReplicationStatus::Completed);
        let json = mgr.to_json().expect("to_json");
        let mgr2 = ReplicationManager::from_json(&json).expect("from_json");
        assert_eq!(mgr.get("src"), mgr2.get("src"));
        assert_eq!(
            mgr2.lookup_status("src", "docs/a.pdf"),
            Some(ReplicationStatus::Completed)
        );
    }

    #[test]
    fn delete_is_idempotent() {
        let mgr = ReplicationManager::new();
        mgr.delete("never-existed");
        mgr.put(
            "b",
            ReplicationConfig {
                role: "arn".into(),
                rules: vec![rule("r1", 1, true, None, &[], "dst")],
            },
        );
        mgr.delete("b");
        assert!(mgr.get("b").is_none());
    }

    #[test]
    fn put_replaces_previous_config() {
        let mgr = ReplicationManager::new();
        mgr.put(
            "b",
            ReplicationConfig {
                role: "arn".into(),
                rules: vec![rule("old", 1, true, None, &[], "dst-old")],
            },
        );
        mgr.put(
            "b",
            ReplicationConfig {
                role: "arn".into(),
                rules: vec![rule("new", 1, true, None, &[], "dst-new")],
            },
        );
        let cfg = mgr.get("b").expect("config");
        assert_eq!(cfg.rules.len(), 1);
        assert_eq!(cfg.rules[0].id, "new");
        assert_eq!(cfg.rules[0].destination_bucket, "dst-new");
    }

    #[tokio::test]
    async fn replicate_object_happy_path_marks_completed() {
        type Captured = Vec<(String, String, bytes::Bytes, Option<HashMap<String, String>>)>;
        let mgr = Arc::new(ReplicationManager::new());
        let captured: Arc<Mutex<Captured>> = Arc::new(Mutex::new(Vec::new()));
        let captured_cl = Arc::clone(&captured);

        let do_put = move |dest: String,
                           key: String,
                           body: bytes::Bytes,
                           meta: Option<HashMap<String, String>>| {
            let captured = Arc::clone(&captured_cl);
            async move {
                captured.lock().unwrap().push((dest, key, body, meta));
                Ok::<(), String>(())
            }
        };

        replicate_object(
            rule("r1", 1, true, None, &[], "dst"),
            "src".into(),
            "obj.bin".into(),
            bytes::Bytes::from_static(b"hello"),
            Some(HashMap::from([("content-type".into(), "text/plain".into())])),
            do_put,
            Arc::clone(&mgr),
            mgr.next_generation(),
            None,
        )
        .await;

        assert_eq!(
            mgr.lookup_status("src", "obj.bin"),
            Some(ReplicationStatus::Completed)
        );
        assert_eq!(mgr.dropped_total.load(Ordering::Relaxed), 0);
        let cap = captured.lock().unwrap();
        assert_eq!(cap.len(), 1, "do_put must run exactly once on success");
        assert_eq!(cap[0].0, "dst");
        assert_eq!(cap[0].1, "obj.bin");
        assert_eq!(cap[0].2.as_ref(), b"hello");
        let meta = cap[0].3.as_ref().expect("metadata stamped");
        assert_eq!(
            meta.get("x-amz-replication-status").map(String::as_str),
            Some("REPLICA"),
            "destination meta must carry the REPLICA stamp"
        );
        assert_eq!(meta.get("content-type").map(String::as_str), Some("text/plain"));
    }

    #[tokio::test]
    async fn replicate_object_failure_after_retry_budget_marks_failed_and_bumps_drop() {
        let mgr = Arc::new(ReplicationManager::new());
        let attempts: Arc<Mutex<u32>> = Arc::new(Mutex::new(0));
        let attempts_cl = Arc::clone(&attempts);

        let do_put = move |_dest: String,
                           _key: String,
                           _body: bytes::Bytes,
                           _meta: Option<HashMap<String, String>>| {
            let attempts = Arc::clone(&attempts_cl);
            async move {
                *attempts.lock().unwrap() += 1;
                Err::<(), String>("simulated destination 5xx".into())
            }
        };

        replicate_object(
            rule("r-fail", 1, true, None, &[], "dst"),
            "src".into(),
            "doomed.bin".into(),
            bytes::Bytes::from_static(b"x"),
            None,
            do_put,
            Arc::clone(&mgr),
            mgr.next_generation(),
            None,
        )
        .await;

        assert_eq!(
            *attempts.lock().unwrap(),
            RETRY_ATTEMPTS,
            "must retry exactly the configured budget"
        );
        assert_eq!(
            mgr.lookup_status("src", "doomed.bin"),
            Some(ReplicationStatus::Failed)
        );
        assert_eq!(
            mgr.dropped_total.load(Ordering::Relaxed),
            1,
            "drop counter must bump exactly once after retry budget exhausted"
        );
    }

    #[test]
    fn replication_status_aws_strings_match_spec() {
        assert_eq!(ReplicationStatus::Pending.as_aws_str(), "PENDING");
        assert_eq!(ReplicationStatus::Completed.as_aws_str(), "COMPLETED");
        assert_eq!(ReplicationStatus::Failed.as_aws_str(), "FAILED");
        assert_eq!(ReplicationStatus::Replica.as_aws_str(), "REPLICA");
    }

    // ----- v0.8.2 #61: generation token CAS unit tests -----

    #[test]
    fn record_status_if_newer_accepts_higher_generation() {
        let mgr = ReplicationManager::new();
        // First stamp at gen=5 — no prior entry, accepted.
        assert!(mgr.record_status_if_newer(
            "b",
            "k",
            5,
            ReplicationStatus::Pending,
        ));
        // Higher generation overrides.
        assert!(mgr.record_status_if_newer(
            "b",
            "k",
            7,
            ReplicationStatus::Completed,
        ));
        assert_eq!(
            mgr.lookup_status("b", "k"),
            Some(ReplicationStatus::Completed)
        );
    }

    #[test]
    fn record_status_if_newer_rejects_stale_generation() {
        let mgr = ReplicationManager::new();
        // Newer PUT lands first.
        assert!(mgr.record_status_if_newer(
            "b",
            "k",
            10,
            ReplicationStatus::Completed,
        ));
        // Older retry must be rejected — the entry must not roll back
        // to the older PUT's state once the newer one has stamped
        // Completed.
        let accepted = mgr.record_status_if_newer(
            "b",
            "k",
            3,
            ReplicationStatus::Completed,
        );
        assert!(!accepted, "stale generation must be rejected");
        // Stored entry stays at the newer generation's terminal state.
        assert_eq!(
            mgr.lookup_status("b", "k"),
            Some(ReplicationStatus::Completed)
        );
    }

    #[test]
    fn record_status_if_newer_accepts_equal_generation() {
        // Same generation may legitimately re-stamp (Pending →
        // Completed transition on the same task). The CAS is `>=`
        // not `>`.
        let mgr = ReplicationManager::new();
        assert!(mgr.record_status_if_newer(
            "b",
            "k",
            42,
            ReplicationStatus::Pending,
        ));
        assert!(mgr.record_status_if_newer(
            "b",
            "k",
            42,
            ReplicationStatus::Completed,
        ));
        assert_eq!(
            mgr.lookup_status("b", "k"),
            Some(ReplicationStatus::Completed)
        );
    }

    #[test]
    fn next_generation_is_monotonic() {
        let mgr = ReplicationManager::new();
        let g1 = mgr.next_generation();
        let g2 = mgr.next_generation();
        let g3 = mgr.next_generation();
        assert!(g2 > g1, "g2={g2} must exceed g1={g1}");
        assert!(g3 > g2, "g3={g3} must exceed g2={g2}");
        assert_eq!(g2, g1 + 1);
        assert_eq!(g3, g2 + 1);
    }

    #[test]
    fn snapshot_pre_61_format_loads_with_zero_generation() {
        // Pre-v0.8.2 #61 snapshot shape: bare `ReplicationStatus`,
        // no `next_generation` field. The `untagged` enum + serde
        // default must load it losslessly into the new shape, with
        // `generation = 0` (= guaranteed to lose the CAS to the next
        // real PUT).
        let pre_61_json = r#"{
            "by_bucket": {},
            "statuses": [
                [["src", "k"], "Completed"]
            ]
        }"#;
        let mgr = ReplicationManager::from_json(pre_61_json)
            .expect("pre-#61 snapshot must deserialise");
        assert_eq!(
            mgr.lookup_status("src", "k"),
            Some(ReplicationStatus::Completed)
        );
        // First mint after restore is `1` (max(0, 1)).
        assert_eq!(mgr.next_generation(), 1);
        // The `generation = 0` legacy entry is overridable by any
        // real PUT (= a generation >= 1).
        assert!(mgr.record_status_if_newer(
            "src",
            "k",
            1,
            ReplicationStatus::Pending,
        ));
    }
}