// s4_server/replication.rs

1//! Bucket-to-bucket asynchronous replication (v0.6 #40).
2//!
3//! AWS S3 Cross-Region Replication (CRR) lets a bucket owner declare a
4//! `ReplicationConfiguration` whose rules say "for every PUT to this
5//! bucket that matches `<filter>`, asynchronously copy the new object to
6//! `<destination_bucket>`". The source object grows an
7//! `x-amz-replication-status` of `PENDING` → `COMPLETED` (or `FAILED`),
//! the replica gets stamped `REPLICA`, and consumers can poll HEAD
//! (or GET) on the source object to see how the replication is going.
10//!
11//! ## v0.6 #40 scope (single-instance only)
12//!
13//! - **Same S4 endpoint** — the source bucket and the destination bucket
14//!   live on the same `S4Service`. True cross-region (multi-instance,
15//!   wire-replicated) replication is a v0.7+ follow-up that needs a
16//!   `aws-sdk-s3` PUT to a remote endpoint with its own credentials.
17//! - **Async only** — the originating `put_object` returns as soon as
18//!   the source backend write is done. The replica PUT happens on a
19//!   detached `tokio::spawn` task and never blocks the client. There is
20//!   no synchronous `replication_required` mode (would defeat the whole
21//!   point of CRR being asynchronous in the first place).
22//! - **Retry budget = 3 attempts** with exponential backoff (50ms,
23//!   100ms, 200ms). On exhaustion the per-(bucket, key) status flips to
24//!   `Failed` and `dropped_total` is bumped + a warn-level log line is
25//!   emitted so operators see the loss in `s4_replication_dropped_total`.
26//! - **Highest-priority rule wins** when multiple rules match a single
27//!   object key (S3 spec). Ties are broken by declaration order
28//!   (deterministic for tests).
29//! - **`status_enabled = false` rules never match**, mirroring the AWS
30//!   `ReplicationRuleStatus::Disabled` semantics — the rule sits in the
31//!   configuration document but is inert.
32//! - **Replica is full-body** — there is no delta replication, no
33//!   incremental fetch, no batching. Every matching PUT triggers one
34//!   independent destination PUT.
35//!
36//! ## what is NOT in v0.6 #40
37//!
38//! - Delete-marker replication (S3's `DeleteMarkerReplication` block) —
39//!   v0.7+. Right now `delete_object` does not fan out a destination
40//!   delete; the replica drifts on the source's deletion.
41//! - Replication of multipart-completed objects through the per-part
42//!   copy path. The whole compose-then-PUT result of CMU is replicated
43//!   as a single PUT, which is fine for single-instance and matches
44//!   what AWS does for source objects ≤ 5 GiB.
45//! - SSE-KMS-encrypted replicas with KMS-key-id rewriting per the
46//!   `SourceSelectionCriteria` block (the source's wrapped DEK is
47//!   replicated as-is — fine for single-instance because the same KMS
48//!   backend unwraps both copies).
49//! - Replication metrics (RTC) — a v0.7+ follow-up that wires a
50//!   `replication_lag_seconds` histogram.
51
52use std::collections::HashMap;
53use std::sync::Arc;
54use std::sync::RwLock;
55use std::sync::atomic::{AtomicU64, Ordering};
56
57use chrono::{DateTime, Utc};
58use serde::{Deserialize, Serialize};
59
60/// Per-(bucket, key) replication state, surfaced as the
61/// `x-amz-replication-status` HEAD/GET response header. Values match the
62/// AWS wire form exactly.
63#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
64pub enum ReplicationStatus {
65    /// Replication has been queued (a matching rule fired and the
66    /// dispatcher task has been spawned) but the destination PUT has
67    /// not yet succeeded.
68    Pending,
69    /// Replication has succeeded — the replica exists in the
70    /// destination bucket.
71    Completed,
72    /// Replication failed permanently (retry budget exhausted).
73    Failed,
74    /// Stamped on the destination side so the replica is
75    /// distinguishable from a normal PUT, matching AWS CRR's
76    /// "replica stamp" behaviour.
77    Replica,
78}
79
80impl ReplicationStatus {
81    /// AWS wire-string form. Caller stamps it on the response as the
82    /// `x-amz-replication-status` header.
83    #[must_use]
84    pub fn as_aws_str(&self) -> &'static str {
85        match self {
86            Self::Pending => "PENDING",
87            Self::Completed => "COMPLETED",
88            Self::Failed => "FAILED",
89            Self::Replica => "REPLICA",
90        }
91    }
92}
93
94/// Filter on a `ReplicationRule` — the AND of a key-prefix predicate
95/// and a tag predicate. AWS S3's wire form uses a sum type
96/// (`Prefix | Tag | And { Prefix, Tags }`); we collapse those into
97/// the single representation that the in-memory matcher needs.
98#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
99pub struct ReplicationFilter {
100    /// Empty / `None` means "any prefix".
101    pub prefix: Option<String>,
102    /// AND of every tag pair — every entry here must be present in the
103    /// object's tag set for the rule to fire. Empty means "no tag
104    /// predicate".
105    pub tags: Vec<(String, String)>,
106}
107
108/// One replication rule. Each rule independently decides whether to
109/// copy an object based on the (key, tags) tuple; the replication
110/// manager picks the highest-priority matching rule when multiple
111/// fire on the same object.
112#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
113pub struct ReplicationRule {
114    /// Operator-supplied id (max 255 chars per AWS).
115    pub id: String,
116    /// Higher number = higher priority (S3 spec). When two rules match
117    /// the same key, the higher `priority` wins; ties broken by
118    /// declaration order.
119    pub priority: u32,
120    /// `false` makes the rule inert without removing it from the
121    /// configuration document — mirrors AWS's `Disabled` status.
122    pub status_enabled: bool,
123    /// Subset of source-bucket objects this rule applies to.
124    pub filter: ReplicationFilter,
125    /// Where to copy matching objects. Plain bucket name (no ARN) for
126    /// the v0.6 #40 single-instance scope.
127    pub destination_bucket: String,
128    /// Optional storage-class override on the replica. `None` = keep
129    /// the source's class (S3 default).
130    pub destination_storage_class: Option<String>,
131}
132
133/// Per-bucket replication configuration.
134#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
135pub struct ReplicationConfig {
136    /// Placeholder ARN — not consumed by S4 itself, kept for AWS wire
137    /// compatibility (the `Role` field is mandatory in the
138    /// `PutBucketReplication` XML payload).
139    pub role: String,
140    pub rules: Vec<ReplicationRule>,
141}
142
143/// Per-(source_bucket, source_key) replication status entry, paired
144/// with the **generation token** of the source PUT that produced it.
145///
146/// ## v0.8.2 #61 — generation token
147///
148/// Each `put_object` (or `complete_multipart_upload`) on a source key
149/// pulls a fresh, monotonically-increasing `generation` from the
150/// manager. The detached replication task carries that generation and
151/// only stamps the status when its generation is `>=` the stored one
152/// (CAS-style). A stale retry whose generation has been overtaken by a
153/// newer PUT is silently dropped, so the destination bucket never gets
154/// rolled back to older bytes. See [`ReplicationManager::next_generation`]
155/// + [`ReplicationManager::record_status_if_newer`].
156///
157/// ## v0.8.3 #66 — `recorded_at` for sweep + TTL (H-5 audit fix)
158///
159/// Each stamp records the wall-clock time the entry was last updated.
160/// The hourly sweep task ([`ReplicationManager::sweep_stale`]) drops
161/// terminal entries (`Completed` / `Failed`) older than the operator-
162/// configured TTL, bounding the otherwise-unbounded growth of the
163/// `statuses` map under workloads with many unique keys. `Pending`
164/// entries are never swept (they are still in-flight and dropping them
165/// would lose the eventual `Completed` / `Failed` stamp the dispatcher
166/// is racing toward). Pre-#66 snapshots without `recorded_at` deserialise
167/// with `Utc::now()` (= "freshly observed at restart") which delays
168/// the first sweep by one TTL cycle but never drops a still-relevant
169/// entry early.
170#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
171pub struct ReplicationStatusEntry {
172    pub status: ReplicationStatus,
173    pub generation: u64,
174    /// v0.8.3 #66: when the entry was last updated. The sweep drops
175    /// terminal entries (Completed / Failed) older than the operator-
176    /// configured TTL. Pending entries are never swept (still in-flight).
177    /// Pre-#66 snapshots default to `Utc::now()` so legacy entries get
178    /// one full TTL window of grace before becoming sweep-eligible.
179    #[serde(default = "Utc::now")]
180    pub recorded_at: DateTime<Utc>,
181}
182
183/// JSON snapshot — `bucket -> ReplicationConfig`. Mirrors the shape of
184/// `notifications::NotificationSnapshot` so operators can hand-edit
185/// configurations across restart cycles.
186///
187/// ## v0.8.2 #61 back-compat
188///
189/// Pre-#61 snapshots stored `Vec<((String, String), ReplicationStatus)>`
190/// (status only; no generation). The new format stores
191/// `Vec<((String, String), ReplicationStatusEntry)>`. Serde is set up
192/// with `#[serde(untagged)]` on a wrapper enum so old snapshots load
193/// with `generation = 0`. A `generation = 0` entry is never stale —
194/// the very next PUT mints `generation = 1` which wins the CAS — so
195/// the migration is loss-free.
196#[derive(Debug, Default, Serialize, Deserialize)]
197struct ReplicationSnapshot {
198    by_bucket: HashMap<String, ReplicationConfig>,
199    /// Per-(bucket, key) replication status. Persisted so a restart
200    /// doesn't lose the COMPLETED stamp on already-replicated
201    /// objects.
202    statuses: Vec<((String, String), StatusOrEntry)>,
203    /// v0.8.2 #61: persist the next generation so a restart doesn't
204    /// reissue tokens that are still in-flight. Optional for
205    /// back-compat — pre-#61 snapshots restore with `next_generation = 1`.
206    #[serde(default)]
207    next_generation: u64,
208}
209
210/// Back-compat wrapper for snapshot deserialisation: accepts either a
211/// bare `ReplicationStatus` (pre-#61 schema) or a full
212/// `ReplicationStatusEntry`. `serde(untagged)` tries the variants in
213/// declaration order — the more-structured `Entry` variant first so
214/// new snapshots round-trip, falling back to bare `Status` for old
215/// snapshots.
216#[derive(Clone, Debug, Serialize, Deserialize)]
217#[serde(untagged)]
218enum StatusOrEntry {
219    Entry(ReplicationStatusEntry),
220    Status(ReplicationStatus),
221}
222
223impl StatusOrEntry {
224    fn into_entry(self) -> ReplicationStatusEntry {
225        match self {
226            Self::Entry(e) => e,
227            // v0.8.3 #66: pre-#61 snapshots have no `recorded_at`; stamp
228            // `Utc::now()` so the first sweep tick sees them as freshly
229            // observed and gives them one full TTL window of grace.
230            Self::Status(s) => ReplicationStatusEntry {
231                status: s,
232                generation: 0,
233                recorded_at: Utc::now(),
234            },
235        }
236    }
237}
238
239/// In-memory manager of per-bucket replication configurations + per-
240/// (bucket, key) replication statuses.
241pub struct ReplicationManager {
242    by_bucket: RwLock<HashMap<String, ReplicationConfig>>,
243    /// Per-(source_bucket, key) replication status entry (status +
244    /// generation token). Looked up by `head_object` / `get_object` to
245    /// stamp `x-amz-replication-status` on the response.
246    statuses: RwLock<HashMap<(String, String), ReplicationStatusEntry>>,
247    /// v0.8.2 #61: monotonic per-source-PUT generation counter. Each
248    /// `put_object` (or `complete_multipart_upload`) on a replicated
249    /// source bucket calls [`Self::next_generation`] before spawning
250    /// its detached replication task. The dispatcher carries the
251    /// generation through to [`Self::record_status_if_newer`], which
252    /// drops the stamp + the destination write when a newer
253    /// generation has already won — guaranteeing the destination
254    /// can't be rolled back by a slow retry.
255    pub next_generation: AtomicU64,
256    /// Bumped each time the dispatcher exhausts its retry budget on a
257    /// destination PUT. Exposed publicly so the metrics layer can poll
258    /// without taking the configuration lock.
259    pub dropped_total: AtomicU64,
260}
261
262impl Default for ReplicationManager {
263    fn default() -> Self {
264        Self::new()
265    }
266}
267
268impl std::fmt::Debug for ReplicationManager {
269    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
270        f.debug_struct("ReplicationManager")
271            .field("dropped_total", &self.dropped_total.load(Ordering::Relaxed))
272            .finish_non_exhaustive()
273    }
274}
275
276impl ReplicationManager {
277    /// Empty manager — no bucket has any replication rules. The
278    /// generation counter starts at 1 so the first PUT-issued token is
279    /// `1` (a stored entry's `generation = 0` from a pre-#61 snapshot
280    /// is then strictly less and the very next PUT wins the CAS).
281    #[must_use]
282    pub fn new() -> Self {
283        Self {
284            by_bucket: RwLock::new(HashMap::new()),
285            statuses: RwLock::new(HashMap::new()),
286            next_generation: AtomicU64::new(1),
287            dropped_total: AtomicU64::new(0),
288        }
289    }
290
291    /// v0.8.2 #61: mint a fresh, monotonically-increasing generation
292    /// token. Caller is the per-source-PUT dispatcher fork (the body-
293    /// bearing `put_object` branch, the body-less `put_object` branch,
294    /// and `complete_multipart_upload`). The token is then carried
295    /// through [`replicate_object`] to [`Self::record_status_if_newer`]
296    /// so a stale retry can be detected and dropped.
297    ///
298    /// Uses `Relaxed` ordering — we only need uniqueness +
299    /// monotonicity per atomic; the cross-thread happens-before
300    /// between PUT-A's spawn and the dispatcher reading the body is
301    /// already established by `tokio::spawn`'s implicit
302    /// `Acquire/Release` on the task queue.
303    pub fn next_generation(&self) -> u64 {
304        self.next_generation.fetch_add(1, Ordering::Relaxed)
305    }
306
307    /// `put_bucket_replication` handler entry. The bucket's existing
308    /// configuration is fully replaced (S3 spec — `PutBucketReplication`
309    /// is upsert-style at the bucket scope, not per-rule patch).
310    pub fn put(&self, bucket: &str, config: ReplicationConfig) {
311        self.by_bucket
312            .write()
313            .expect("replication state RwLock poisoned")
314            .insert(bucket.to_owned(), config);
315    }
316
317    /// `get_bucket_replication` handler entry. Returns `None` when
318    /// nothing is registered (AWS S3 returns
319    /// `ReplicationConfigurationNotFoundError` in that case; the
320    /// service-layer handler maps `None` accordingly).
321    #[must_use]
322    pub fn get(&self, bucket: &str) -> Option<ReplicationConfig> {
323        self.by_bucket
324            .read()
325            .expect("replication state RwLock poisoned")
326            .get(bucket)
327            .cloned()
328    }
329
330    /// Drop the configuration for `bucket`. Idempotent.
331    pub fn delete(&self, bucket: &str) {
332        self.by_bucket
333            .write()
334            .expect("replication state RwLock poisoned")
335            .remove(bucket);
336    }
337
338    /// Serialise the entire manager state (configurations + per-key
339    /// statuses + next generation counter) to JSON. The status entries
340    /// are emitted in the v0.8.2 #61 schema (`ReplicationStatusEntry`);
341    /// readers built before #61 will see the embedded
342    /// `{ status, generation }` shape via the `untagged` enum and
343    /// (older binaries) reject — but the production restart path always
344    /// runs the same binary against its own snapshot.
345    pub fn to_json(&self) -> Result<String, serde_json::Error> {
346        let snap = ReplicationSnapshot {
347            by_bucket: self
348                .by_bucket
349                .read()
350                .expect("replication state RwLock poisoned")
351                .clone(),
352            statuses: self
353                .statuses
354                .read()
355                .expect("replication state RwLock poisoned")
356                .iter()
357                .map(|(k, v)| (k.clone(), StatusOrEntry::Entry(v.clone())))
358                .collect(),
359            next_generation: self.next_generation.load(Ordering::Relaxed),
360        };
361        serde_json::to_string(&snap)
362    }
363
364    /// Restore a manager from a previously-emitted snapshot. The
365    /// `dropped_total` counter is reset to 0 — historical drops are
366    /// runtime metrics, not configuration.
367    ///
368    /// ## Back-compat (v0.8.2 #61)
369    ///
370    /// Pre-#61 snapshots store bare `ReplicationStatus` (no
371    /// generation). The `untagged` `StatusOrEntry` enum picks them up
372    /// and assigns `generation = 0`, which the CAS-style
373    /// [`Self::record_status_if_newer`] treats as "always overridable
374    /// by the next real PUT" — guaranteed loss-free migration. The
375    /// `next_generation` counter defaults to `1` when the snapshot
376    /// predates #61 (= `serde(default)` on the field).
377    pub fn from_json(s: &str) -> Result<Self, serde_json::Error> {
378        let snap: ReplicationSnapshot = serde_json::from_str(s)?;
379        let statuses: HashMap<(String, String), ReplicationStatusEntry> = snap
380            .statuses
381            .into_iter()
382            .map(|(k, v)| (k, v.into_entry()))
383            .collect();
384        // Pre-#61 snapshots come back with `next_generation = 0`
385        // (`serde(default)` on `u64`); start fresh at 1 so all newly-
386        // minted tokens are strictly greater than the legacy
387        // `generation = 0` entries.
388        let next_gen = snap.next_generation.max(1);
389        Ok(Self {
390            by_bucket: RwLock::new(snap.by_bucket),
391            statuses: RwLock::new(statuses),
392            next_generation: AtomicU64::new(next_gen),
393            dropped_total: AtomicU64::new(0),
394        })
395    }
396
397    /// Match an object against the bucket's rules and return the
398    /// highest-priority enabled rule whose filter matches. Returns
399    /// `None` when no rule matches (or no configuration is registered
400    /// for the bucket). Ties on `priority` are broken by declaration
401    /// order — the first such rule wins.
402    #[must_use]
403    pub fn match_rule(
404        &self,
405        bucket: &str,
406        key: &str,
407        object_tags: &[(String, String)],
408    ) -> Option<ReplicationRule> {
409        let map = self
410            .by_bucket
411            .read()
412            .expect("replication state RwLock poisoned");
413        let cfg = map.get(bucket)?;
414        let mut best: Option<&ReplicationRule> = None;
415        for rule in &cfg.rules {
416            if !rule.status_enabled {
417                continue;
418            }
419            if !filter_matches(&rule.filter, key, object_tags) {
420                continue;
421            }
422            best = match best {
423                None => Some(rule),
424                Some(prev) if rule.priority > prev.priority => Some(rule),
425                Some(prev) => Some(prev),
426            };
427        }
428        best.cloned()
429    }
430
431    /// Stamp the per-(bucket, key) replication status with no
432    /// generation guard. Replaces any previous entry. **Generation is
433    /// reset to 0** (= overridable by the next real PUT) — callers
434    /// that hold a generation token must use
435    /// [`Self::record_status_if_newer`] instead.
436    ///
437    /// Use cases (kept for back-compat + the eager `Pending` stamp the
438    /// service-layer dispatcher emits before spawning the actual
439    /// replication task):
440    /// - Eager `Pending` stamp synchronously alongside the source PUT
441    ///   so a HEAD between PUT-return and dispatcher-completion sees
442    ///   `PENDING` instead of `None`.
443    /// - Tests that don't care about generation (legacy assertions).
444    pub fn record_status(&self, bucket: &str, key: &str, status: ReplicationStatus) {
445        self.statuses
446            .write()
447            .expect("replication state RwLock poisoned")
448            .insert(
449                (bucket.to_owned(), key.to_owned()),
450                ReplicationStatusEntry {
451                    status,
452                    generation: 0,
453                    // v0.8.3 #66: stamp now so a subsequent sweep can
454                    // age this entry out once it reaches a terminal
455                    // state and exceeds the configured TTL.
456                    recorded_at: Utc::now(),
457                },
458            );
459    }
460
461    /// v0.8.2 #61: CAS-style stamp. Only updates the entry when
462    /// `generation >= entry.generation`; rejects the update (returns
463    /// `false`) when `generation < entry.generation` because a newer
464    /// PUT has already won and we must not roll the source's status
465    /// back to a stale terminal state.
466    ///
467    /// ## Returns
468    ///
469    /// - `true` — the stamp was accepted; the caller may proceed with
470    ///   the destination-bucket PUT (in [`replicate_object`]) /
471    ///   declare success.
472    /// - `false` — a strictly-newer generation has already stamped the
473    ///   entry; the caller must **drop the destination write** to
474    ///   avoid overwriting newer bytes with a stale retry's body.
475    ///
476    /// Equality (`generation == entry.generation`) is accepted because
477    /// the same generation may legitimately stamp twice across the
478    /// dispatcher's retry budget (`Pending` → `Completed` on the same
479    /// task).
480    pub fn record_status_if_newer(
481        &self,
482        bucket: &str,
483        key: &str,
484        generation: u64,
485        status: ReplicationStatus,
486    ) -> bool {
487        let mut map = self
488            .statuses
489            .write()
490            .expect("replication state RwLock poisoned");
491        let now = Utc::now();
492        let entry = map
493            .entry((bucket.to_owned(), key.to_owned()))
494            .or_insert(ReplicationStatusEntry {
495                status: ReplicationStatus::Pending,
496                generation: 0,
497                // v0.8.3 #66: stamp at insertion; will be overwritten
498                // immediately below when the CAS accepts.
499                recorded_at: now,
500            });
501        if generation < entry.generation {
502            return false;
503        }
504        entry.generation = generation;
505        entry.status = status;
506        // v0.8.3 #66: refresh the timestamp on every accepted stamp so
507        // a Pending → Completed transition (same generation) resets
508        // the sweep clock — the TTL is measured from the **last**
509        // terminal stamp, not the first observation.
510        entry.recorded_at = now;
511        true
512    }
513
514    /// v0.8.3 #66 (H-5 audit fix): drop terminal-state entries
515    /// (`Completed` / `Failed`) older than `max_age`. `Pending` entries
516    /// are never swept because they are still in-flight — the
517    /// dispatcher is racing toward a terminal stamp and dropping the
518    /// `Pending` would lose the eventual outcome (and let the entry
519    /// re-emerge under the original key with no recorded history).
520    /// `Replica` entries can theoretically appear here through legacy
521    /// paths and are likewise preserved (the destination-side stamp is
522    /// not produced by `record_status_if_newer` in the current code,
523    /// but the conservative filter keeps any future use loss-free).
524    ///
525    /// Cutoff is `now - max_age` rather than `Utc::now() - max_age` so
526    /// callers can drive the clock deterministically in tests.
527    ///
528    /// Returns the number of entries removed (operators dashboard via
529    /// `s4_replication_status_swept_total`).
530    pub fn sweep_stale(&self, now: DateTime<Utc>, max_age: chrono::Duration) -> usize {
531        let mut map = self
532            .statuses
533            .write()
534            .expect("replication state RwLock poisoned");
535        let cutoff = now - max_age;
536        let stale: Vec<(String, String)> = map
537            .iter()
538            .filter(|(_, e)| {
539                matches!(
540                    e.status,
541                    ReplicationStatus::Completed | ReplicationStatus::Failed
542                ) && e.recorded_at < cutoff
543            })
544            .map(|(k, _)| k.clone())
545            .collect();
546        let count = stale.len();
547        for k in stale {
548            map.remove(&k);
549        }
550        count
551    }
552
553    /// Look up the recorded replication status for `(bucket, key)`.
554    /// Returns `None` when no PUT to this key has triggered
555    /// replication (= the object is not under any replication rule, or
556    /// it predates the rule's creation).
557    ///
558    /// The `generation` field of the entry is intentionally not
559    /// surfaced here — it's an internal CAS guard, not part of the
560    /// AWS wire shape.
561    #[must_use]
562    pub fn lookup_status(&self, bucket: &str, key: &str) -> Option<ReplicationStatus> {
563        self.statuses
564            .read()
565            .expect("replication state RwLock poisoned")
566            .get(&(bucket.to_owned(), key.to_owned()))
567            .map(|entry| entry.status.clone())
568    }
569}
570
571/// AND of (prefix predicate, every tag pair). An empty / `None` prefix
572/// means "any prefix"; an empty tag list means "no tag predicate".
573fn filter_matches(
574    filter: &ReplicationFilter,
575    key: &str,
576    object_tags: &[(String, String)],
577) -> bool {
578    if let Some(p) = filter.prefix.as_deref()
579        && !p.is_empty()
580        && !key.starts_with(p)
581    {
582        return false;
583    }
584    for (tk, tv) in &filter.tags {
585        if !object_tags
586            .iter()
587            .any(|(ok, ov)| ok == tk && ov == tv)
588        {
589            return false;
590        }
591    }
592    true
593}
594
/// Maximum number of destination-PUT attempts before the dispatcher
/// gives up and stamps `Failed` (see the module docs' retry budget).
const RETRY_ATTEMPTS: u32 = 3;
/// First backoff delay in milliseconds; doubled on each retry
/// (50ms, 100ms, 200ms).
const RETRY_BASE_MS: u64 = 50;
597
598/// v0.8.3 #68 (audit M-1): emit a single WARN log line per
599/// `(source_bucket, dest_bucket)` pair the first time we observe a
600/// replication PUT that wanted to propagate Object Lock state but the
601/// destination side has no `ObjectLockManager` attached. The metric
602/// (`s4_replication_lock_propagation_skipped_total`) bumps every time
603/// (so dashboards see the rate); the log is dedup'd because operators
604/// only need to know once that the configuration is asymmetric.
605///
606/// The dedup set lives in a process-static `Mutex<HashSet<(src, dst)>>`
607/// — bounded by the (#source × #destination) pair count, which is
608/// always small (operator-declared rules, not per-key).
609pub fn warn_lock_propagation_skipped(source_bucket: &str, dest_bucket: &str) {
610    use std::collections::HashSet;
611    use std::sync::{Mutex, OnceLock};
612    static SEEN: OnceLock<Mutex<HashSet<(String, String)>>> = OnceLock::new();
613    let seen = SEEN.get_or_init(|| Mutex::new(HashSet::new()));
614    let key = (source_bucket.to_owned(), dest_bucket.to_owned());
615    let first_time = {
616        let mut guard = seen.lock().expect("warn-once HashSet Mutex poisoned");
617        guard.insert(key)
618    };
619    if first_time {
620        tracing::warn!(
621            source_bucket = %source_bucket,
622            dest_bucket = %dest_bucket,
623            "S4 replication: source carries Object Lock state but destination \
624             bucket has no ObjectLockManager attached — replica will be freely \
625             deletable on the destination (WORM posture is source-only). Attach \
626             an ObjectLockManager via S4Service::with_object_lock() on the \
627             destination-side gateway to honour cross-bucket WORM."
628        );
629    }
630    crate::metrics::record_replication_lock_propagation_skipped();
631}
632
633/// Replicate one source-bucket object to the rule's destination bucket.
634///
635/// The caller supplies a `do_put` callback that performs the actual
636/// destination-bucket PUT (so unit tests can drive the dispatcher
637/// without needing a full backend). The callback receives:
638/// `(destination_bucket, key, body, metadata)` and returns a
639/// `Result<(), String>` whose `Err` triggers the retry / failure path.
640///
641/// Behaviour:
642/// - Stamps the destination metadata with `x-amz-replication-status:
643///   REPLICA` so a HEAD on the replica is distinguishable.
644/// - On callback success, records `(source_bucket, source_key) →
645///   Completed` in the manager **iff this task's `generation` is not
646///   already overtaken** (CAS-style guard — see [`ReplicationManager::
647///   record_status_if_newer`]).
648/// - On callback failure, retries up to [`RETRY_ATTEMPTS`] times with
649///   exponential backoff (50ms / 100ms / 200ms). After the budget is
650///   exhausted, records `Failed`, bumps `dropped_total`, and emits the
651///   matching Prometheus counter — also CAS-guarded.
652///
653/// ## v0.8.2 #61 — generation token + destination key override
654///
655/// The two new parameters fix the audit's C-1 + C-3 findings:
656///
657/// - `generation` — monotonic per-source-PUT token from
658///   [`ReplicationManager::next_generation`]. CAS-stamps the source's
659///   status + **suppresses the actual destination PUT** when the
660///   token has been overtaken. Without this guard, a slow retry of
661///   PUT-A could land in the destination *after* PUT-B has already
662///   replicated — rolling the destination back to A's bytes. With
663///   the guard, A's task notices B's higher generation has won and
664///   silently drops its destination write.
665/// - `destination_key_override` — the storage-side key the destination
666///   bucket should write under. For an unversioned source this is
667///   `None` and the dispatcher falls back to the source's logical key
668///   (= the AWS-default behaviour). For a **versioning-Enabled**
669///   source the caller passes `Some(versioned_shadow_key(key, vid))`
670///   so the destination's version chain receives the new version
671///   under the same shadow path the source uses (= a `?versionId=`
672///   GET on the destination resolves through the same shadow-key
673///   lookup as the source).
674///
675/// ## v0.8.3 #68 — Object Lock state propagation (audit M-1)
676///
677/// `source_lock_state` carries the source object's WORM posture
678/// (`mode + retain_until + legal_hold_on`) at PUT time. When `Some`,
679/// the destination PUT is decorated with the AWS-wire lock headers
680/// (`x-amz-object-lock-mode`, `x-amz-object-lock-retain-until-date`,
681/// `x-amz-object-lock-legal-hold`) on the metadata map so the
682/// destination side's `put_object` (or its caller) can persist the
683/// same lock state on the replica. Without this, a Compliance /
684/// Governance / legal-hold protected source had a destination
685/// replica that the destination operator could freely DELETE — the
686/// "WORM compliance posture survives DR" guarantee leaked.
687///
688/// `None` (no lock state on the source) keeps the legacy behaviour:
689/// no extra headers, replica is freely deletable on the destination.
690// 10 args is the post-#68 wire-shape: rule + (source_bucket, source_key,
691// body, metadata) + do_put + manager + (generation, dest_override) +
692// source_lock_state. A shape struct would split the call site without
693// buying anything; the caller (`spawn_replication_if_matched`)
694// constructs every field inline, so the indirection is pure noise.
695#[allow(clippy::too_many_arguments)]
696pub async fn replicate_object<F, Fut>(
697    rule: ReplicationRule,
698    source_bucket: String,
699    source_key: String,
700    body: bytes::Bytes,
701    metadata: Option<HashMap<String, String>>,
702    do_put: F,
703    manager: Arc<ReplicationManager>,
704    generation: u64,
705    destination_key_override: Option<String>,
706    source_lock_state: Option<crate::object_lock::ObjectLockState>,
707) where
708    F: Fn(String, String, bytes::Bytes, Option<HashMap<String, String>>) -> Fut,
709    Fut: std::future::Future<Output = Result<(), String>>,
710{
711    // Replica metadata = source metadata + `x-amz-replication-status:
712    // REPLICA` stamp. Keeping the source's compression / encryption
713    // metadata intact means a GET on the replica decodes through the
714    // same path the source would.
715    let mut replica_meta = metadata.unwrap_or_default();
716    replica_meta.insert(
717        "x-amz-replication-status".to_owned(),
718        ReplicationStatus::Replica.as_aws_str().to_owned(),
719    );
720    if let Some(ref sc) = rule.destination_storage_class {
721        replica_meta.insert("x-amz-storage-class".to_owned(), sc.clone());
722    }
723    // v0.8.3 #68 (audit M-1): propagate the source's Object Lock
724    // posture as AWS-wire lock headers attached to the destination
725    // PUT's metadata map. The destination side reads these and
726    // persists the same lock state on the replica so a DR setup keeps
727    // the WORM guarantee end-to-end (Compliance / Governance / legal
728    // hold cannot be silently bypassed by deleting on the destination).
729    if let Some(ref lock) = source_lock_state {
730        if let Some(mode) = lock.mode {
731            replica_meta.insert(
732                "x-amz-object-lock-mode".to_owned(),
733                mode.as_aws_str().to_owned(),
734            );
735        }
736        if let Some(retain_until) = lock.retain_until {
737            // ISO-8601 / RFC-3339 — the AWS wire form for
738            // `x-amz-object-lock-retain-until-date`.
739            replica_meta.insert(
740                "x-amz-object-lock-retain-until-date".to_owned(),
741                retain_until.to_rfc3339(),
742            );
743        }
744        // Always emit the legal-hold flag when any lock state is
745        // present so an explicit "OFF" is propagated too (an absent
746        // header is ambiguous with "no opinion" on the destination).
747        replica_meta.insert(
748            "x-amz-object-lock-legal-hold".to_owned(),
749            if lock.legal_hold_on { "ON" } else { "OFF" }.to_owned(),
750        );
751    }
752
753    let dest_bucket = rule.destination_bucket.clone();
754    // v0.8.2 #61: when the source PUT was a versioned write, the
755    // override carries the storage-side shadow key
756    // (`<key>.__s4ver__/<vid>`); otherwise we use the logical key.
757    let dest_key = destination_key_override.unwrap_or_else(|| source_key.clone());
758    for attempt in 0..RETRY_ATTEMPTS {
759        // v0.8.2 #61 C-3: pre-PUT generation check. If a newer
760        // generation has already stamped a terminal status on this
761        // (bucket, key), our retry is stale — silently drop the
762        // destination write so we don't roll the destination back to
763        // older bytes. We use `record_status_if_newer` with the
764        // **current** entry's status as a no-op when we're not stale,
765        // but the cheap path is to peek and bail.
766        if let Some(entry) = manager
767            .statuses
768            .read()
769            .expect("replication state RwLock poisoned")
770            .get(&(source_bucket.clone(), source_key.clone()))
771            .cloned()
772            && entry.generation > generation
773        {
774            tracing::debug!(
775                source_bucket = %source_bucket,
776                source_key = %source_key,
777                dest_bucket = %dest_bucket,
778                rule_id = %rule.id,
779                generation,
780                stored_generation = entry.generation,
781                "S4 replication: stale generation, dropping destination PUT"
782            );
783            return;
784        }
785        let result = do_put(
786            dest_bucket.clone(),
787            dest_key.clone(),
788            body.clone(),
789            Some(replica_meta.clone()),
790        )
791        .await;
792        match result {
793            Ok(()) => {
794                let accepted = manager.record_status_if_newer(
795                    &source_bucket,
796                    &source_key,
797                    generation,
798                    ReplicationStatus::Completed,
799                );
800                if !accepted {
801                    // v0.8.2 #61 C-3: the destination PUT raced — a
802                    // newer generation stamped between our pre-check
803                    // and our `do_put.await`. The destination now
804                    // *might* hold our stale bytes (the newer PUT
805                    // could have landed after ours) but we stop
806                    // re-stamping and let the newer task overwrite on
807                    // its own success. Bumps the metric so operators
808                    // see the race surfaced.
809                    crate::metrics::record_replication_drop(&source_bucket);
810                    manager.dropped_total.fetch_add(1, Ordering::Relaxed);
811                    tracing::warn!(
812                        source_bucket = %source_bucket,
813                        source_key = %source_key,
814                        dest_bucket = %dest_bucket,
815                        rule_id = %rule.id,
816                        generation,
817                        "S4 replication: completed but a newer generation has won; \
818                         status not stamped"
819                    );
820                    return;
821                }
822                crate::metrics::record_replication_replicated(&source_bucket, &dest_bucket);
823                tracing::debug!(
824                    source_bucket = %source_bucket,
825                    source_key = %source_key,
826                    dest_bucket = %dest_bucket,
827                    rule_id = %rule.id,
828                    generation,
829                    "S4 replication: COMPLETED"
830                );
831                return;
832            }
833            Err(e) => {
834                if attempt + 1 < RETRY_ATTEMPTS {
835                    let delay_ms = RETRY_BASE_MS * (1u64 << attempt);
836                    tracing::warn!(
837                        source_bucket = %source_bucket,
838                        source_key = %source_key,
839                        dest_bucket = %dest_bucket,
840                        attempt = attempt + 1,
841                        generation,
842                        error = %e,
843                        "S4 replication: attempt failed, retrying"
844                    );
845                    tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
846                    continue;
847                }
848                // CAS the terminal Failed too — a newer generation that
849                // succeeded must not be rolled back to Failed.
850                let accepted = manager.record_status_if_newer(
851                    &source_bucket,
852                    &source_key,
853                    generation,
854                    ReplicationStatus::Failed,
855                );
856                manager.dropped_total.fetch_add(1, Ordering::Relaxed);
857                crate::metrics::record_replication_drop(&source_bucket);
858                tracing::warn!(
859                    source_bucket = %source_bucket,
860                    source_key = %source_key,
861                    dest_bucket = %dest_bucket,
862                    rule_id = %rule.id,
863                    generation,
864                    error = %e,
865                    accepted_failed_stamp = accepted,
866                    "S4 replication: FAILED after {RETRY_ATTEMPTS} attempts (drop counter bumped)"
867                );
868                return;
869            }
870        }
871    }
872}
873
874#[cfg(test)]
875mod tests {
876    use super::*;
877    use std::sync::Mutex;
878
879    fn rule(
880        id: &str,
881        priority: u32,
882        enabled: bool,
883        prefix: Option<&str>,
884        tags: &[(&str, &str)],
885        dest: &str,
886    ) -> ReplicationRule {
887        ReplicationRule {
888            id: id.to_owned(),
889            priority,
890            status_enabled: enabled,
891            filter: ReplicationFilter {
892                prefix: prefix.map(str::to_owned),
893                tags: tags
894                    .iter()
895                    .map(|(k, v)| ((*k).to_owned(), (*v).to_owned()))
896                    .collect(),
897            },
898            destination_bucket: dest.to_owned(),
899            destination_storage_class: None,
900        }
901    }
902
903    #[test]
904    fn match_rule_prefix_filter_match_and_miss() {
905        let mgr = ReplicationManager::new();
906        mgr.put(
907            "src",
908            ReplicationConfig {
909                role: "arn:aws:iam::000:role/s4-test".into(),
910                rules: vec![rule("r1", 1, true, Some("logs/"), &[], "dst")],
911            },
912        );
913        assert!(mgr.match_rule("src", "logs/2026/01/01.log", &[]).is_some());
914        assert!(mgr.match_rule("src", "uploads/foo.bin", &[]).is_none());
915    }
916
917    #[test]
918    fn match_rule_no_config_for_bucket() {
919        let mgr = ReplicationManager::new();
920        assert!(mgr.match_rule("ghost", "k", &[]).is_none());
921    }
922
923    #[test]
924    fn match_rule_priority_picks_highest() {
925        let mgr = ReplicationManager::new();
926        mgr.put(
927            "src",
928            ReplicationConfig {
929                role: "arn".into(),
930                rules: vec![
931                    rule("low", 1, true, Some(""), &[], "dst-low"),
932                    rule("high", 10, true, Some(""), &[], "dst-high"),
933                    rule("mid", 5, true, Some(""), &[], "dst-mid"),
934                ],
935            },
936        );
937        let picked = mgr.match_rule("src", "any.bin", &[]).expect("match");
938        assert_eq!(picked.id, "high");
939        assert_eq!(picked.destination_bucket, "dst-high");
940    }
941
942    #[test]
943    fn match_rule_priority_tie_breaker_is_declaration_order() {
944        let mgr = ReplicationManager::new();
945        mgr.put(
946            "src",
947            ReplicationConfig {
948                role: "arn".into(),
949                rules: vec![
950                    rule("first", 5, true, Some(""), &[], "dst-first"),
951                    rule("second", 5, true, Some(""), &[], "dst-second"),
952                ],
953            },
954        );
955        let picked = mgr.match_rule("src", "k", &[]).expect("match");
956        assert_eq!(picked.id, "first", "tie on priority must keep the earlier rule");
957    }
958
959    #[test]
960    fn match_rule_tag_filter_and_of_all_tags() {
961        let mgr = ReplicationManager::new();
962        mgr.put(
963            "src",
964            ReplicationConfig {
965                role: "arn".into(),
966                rules: vec![rule(
967                    "r-tags",
968                    1,
969                    true,
970                    None,
971                    &[("env", "prod"), ("tier", "gold")],
972                    "dst",
973                )],
974            },
975        );
976        // Both tags present → match.
977        assert!(
978            mgr.match_rule(
979                "src",
980                "k",
981                &[
982                    ("env".into(), "prod".into()),
983                    ("tier".into(), "gold".into()),
984                    ("extra".into(), "ignored".into())
985                ]
986            )
987            .is_some(),
988            "all required tags present (extras OK) must match"
989        );
990        // Only one tag → AND fails.
991        assert!(
992            mgr.match_rule(
993                "src",
994                "k",
995                &[("env".into(), "prod".into())]
996            )
997            .is_none(),
998            "missing one of the required tags must not match"
999        );
1000        // Wrong tag value → AND fails.
1001        assert!(
1002            mgr.match_rule(
1003                "src",
1004                "k",
1005                &[
1006                    ("env".into(), "dev".into()),
1007                    ("tier".into(), "gold".into())
1008                ]
1009            )
1010            .is_none(),
1011            "wrong value on a required tag must not match"
1012        );
1013    }
1014
1015    #[test]
1016    fn match_rule_status_disabled_never_matches() {
1017        let mgr = ReplicationManager::new();
1018        mgr.put(
1019            "src",
1020            ReplicationConfig {
1021                role: "arn".into(),
1022                rules: vec![rule("disabled", 100, false, None, &[], "dst")],
1023            },
1024        );
1025        assert!(
1026            mgr.match_rule("src", "anything", &[]).is_none(),
1027            "status_enabled=false must not match even at high priority"
1028        );
1029    }
1030
1031    #[test]
1032    fn record_and_lookup_status_round_trip() {
1033        let mgr = ReplicationManager::new();
1034        assert!(mgr.lookup_status("b", "k").is_none());
1035        mgr.record_status("b", "k", ReplicationStatus::Pending);
1036        assert_eq!(
1037            mgr.lookup_status("b", "k"),
1038            Some(ReplicationStatus::Pending)
1039        );
1040        mgr.record_status("b", "k", ReplicationStatus::Completed);
1041        assert_eq!(
1042            mgr.lookup_status("b", "k"),
1043            Some(ReplicationStatus::Completed)
1044        );
1045    }
1046
1047    #[test]
1048    fn json_round_trip_preserves_config_and_statuses() {
1049        let mgr = ReplicationManager::new();
1050        mgr.put(
1051            "src",
1052            ReplicationConfig {
1053                role: "arn:aws:iam::000:role/s4".into(),
1054                rules: vec![rule("r1", 7, true, Some("docs/"), &[("env", "prod")], "dst")],
1055            },
1056        );
1057        mgr.record_status("src", "docs/a.pdf", ReplicationStatus::Completed);
1058        let json = mgr.to_json().expect("to_json");
1059        let mgr2 = ReplicationManager::from_json(&json).expect("from_json");
1060        assert_eq!(mgr.get("src"), mgr2.get("src"));
1061        assert_eq!(
1062            mgr2.lookup_status("src", "docs/a.pdf"),
1063            Some(ReplicationStatus::Completed)
1064        );
1065    }
1066
1067    #[test]
1068    fn delete_is_idempotent() {
1069        let mgr = ReplicationManager::new();
1070        mgr.delete("never-existed");
1071        mgr.put(
1072            "b",
1073            ReplicationConfig {
1074                role: "arn".into(),
1075                rules: vec![rule("r1", 1, true, None, &[], "dst")],
1076            },
1077        );
1078        mgr.delete("b");
1079        assert!(mgr.get("b").is_none());
1080    }
1081
1082    #[test]
1083    fn put_replaces_previous_config() {
1084        let mgr = ReplicationManager::new();
1085        mgr.put(
1086            "b",
1087            ReplicationConfig {
1088                role: "arn".into(),
1089                rules: vec![rule("old", 1, true, None, &[], "dst-old")],
1090            },
1091        );
1092        mgr.put(
1093            "b",
1094            ReplicationConfig {
1095                role: "arn".into(),
1096                rules: vec![rule("new", 1, true, None, &[], "dst-new")],
1097            },
1098        );
1099        let cfg = mgr.get("b").expect("config");
1100        assert_eq!(cfg.rules.len(), 1);
1101        assert_eq!(cfg.rules[0].id, "new");
1102        assert_eq!(cfg.rules[0].destination_bucket, "dst-new");
1103    }
1104
1105    #[tokio::test]
1106    async fn replicate_object_happy_path_marks_completed() {
1107        type Captured = Vec<(String, String, bytes::Bytes, Option<HashMap<String, String>>)>;
1108        let mgr = Arc::new(ReplicationManager::new());
1109        let captured: Arc<Mutex<Captured>> = Arc::new(Mutex::new(Vec::new()));
1110        let captured_cl = Arc::clone(&captured);
1111
1112        let do_put = move |dest: String,
1113                           key: String,
1114                           body: bytes::Bytes,
1115                           meta: Option<HashMap<String, String>>| {
1116            let captured = Arc::clone(&captured_cl);
1117            async move {
1118                captured.lock().unwrap().push((dest, key, body, meta));
1119                Ok::<(), String>(())
1120            }
1121        };
1122
1123        replicate_object(
1124            rule("r1", 1, true, None, &[], "dst"),
1125            "src".into(),
1126            "obj.bin".into(),
1127            bytes::Bytes::from_static(b"hello"),
1128            Some(HashMap::from([("content-type".into(), "text/plain".into())])),
1129            do_put,
1130            Arc::clone(&mgr),
1131            mgr.next_generation(),
1132            None,
1133            None,
1134        )
1135        .await;
1136
1137        assert_eq!(
1138            mgr.lookup_status("src", "obj.bin"),
1139            Some(ReplicationStatus::Completed)
1140        );
1141        assert_eq!(mgr.dropped_total.load(Ordering::Relaxed), 0);
1142        let cap = captured.lock().unwrap();
1143        assert_eq!(cap.len(), 1, "do_put must run exactly once on success");
1144        assert_eq!(cap[0].0, "dst");
1145        assert_eq!(cap[0].1, "obj.bin");
1146        assert_eq!(cap[0].2.as_ref(), b"hello");
1147        let meta = cap[0].3.as_ref().expect("metadata stamped");
1148        assert_eq!(
1149            meta.get("x-amz-replication-status").map(String::as_str),
1150            Some("REPLICA"),
1151            "destination meta must carry the REPLICA stamp"
1152        );
1153        assert_eq!(meta.get("content-type").map(String::as_str), Some("text/plain"));
1154    }
1155
1156    #[tokio::test]
1157    async fn replicate_object_failure_after_retry_budget_marks_failed_and_bumps_drop() {
1158        let mgr = Arc::new(ReplicationManager::new());
1159        let attempts: Arc<Mutex<u32>> = Arc::new(Mutex::new(0));
1160        let attempts_cl = Arc::clone(&attempts);
1161
1162        let do_put = move |_dest: String,
1163                           _key: String,
1164                           _body: bytes::Bytes,
1165                           _meta: Option<HashMap<String, String>>| {
1166            let attempts = Arc::clone(&attempts_cl);
1167            async move {
1168                *attempts.lock().unwrap() += 1;
1169                Err::<(), String>("simulated destination 5xx".into())
1170            }
1171        };
1172
1173        replicate_object(
1174            rule("r-fail", 1, true, None, &[], "dst"),
1175            "src".into(),
1176            "doomed.bin".into(),
1177            bytes::Bytes::from_static(b"x"),
1178            None,
1179            do_put,
1180            Arc::clone(&mgr),
1181            mgr.next_generation(),
1182            None,
1183            None,
1184        )
1185        .await;
1186
1187        assert_eq!(
1188            *attempts.lock().unwrap(),
1189            RETRY_ATTEMPTS,
1190            "must retry exactly the configured budget"
1191        );
1192        assert_eq!(
1193            mgr.lookup_status("src", "doomed.bin"),
1194            Some(ReplicationStatus::Failed)
1195        );
1196        assert_eq!(
1197            mgr.dropped_total.load(Ordering::Relaxed),
1198            1,
1199            "drop counter must bump exactly once after retry budget exhausted"
1200        );
1201    }
1202
1203    #[test]
1204    fn replication_status_aws_strings_match_spec() {
1205        assert_eq!(ReplicationStatus::Pending.as_aws_str(), "PENDING");
1206        assert_eq!(ReplicationStatus::Completed.as_aws_str(), "COMPLETED");
1207        assert_eq!(ReplicationStatus::Failed.as_aws_str(), "FAILED");
1208        assert_eq!(ReplicationStatus::Replica.as_aws_str(), "REPLICA");
1209    }
1210
1211    // ----- v0.8.2 #61: generation token CAS unit tests -----
1212
1213    #[test]
1214    fn record_status_if_newer_accepts_higher_generation() {
1215        let mgr = ReplicationManager::new();
1216        // First stamp at gen=5 — no prior entry, accepted.
1217        assert!(mgr.record_status_if_newer(
1218            "b",
1219            "k",
1220            5,
1221            ReplicationStatus::Pending,
1222        ));
1223        // Higher generation overrides.
1224        assert!(mgr.record_status_if_newer(
1225            "b",
1226            "k",
1227            7,
1228            ReplicationStatus::Completed,
1229        ));
1230        assert_eq!(
1231            mgr.lookup_status("b", "k"),
1232            Some(ReplicationStatus::Completed)
1233        );
1234    }
1235
1236    #[test]
1237    fn record_status_if_newer_rejects_stale_generation() {
1238        let mgr = ReplicationManager::new();
1239        // Newer PUT lands first.
1240        assert!(mgr.record_status_if_newer(
1241            "b",
1242            "k",
1243            10,
1244            ReplicationStatus::Completed,
1245        ));
1246        // Older retry must be rejected — destination must not roll
1247        // back to "alpha" once "beta" has stamped Completed.
1248        let accepted = mgr.record_status_if_newer(
1249            "b",
1250            "k",
1251            3,
1252            ReplicationStatus::Completed,
1253        );
1254        assert!(!accepted, "stale generation must be rejected");
1255        // Stored entry stays at the newer generation's terminal state.
1256        assert_eq!(
1257            mgr.lookup_status("b", "k"),
1258            Some(ReplicationStatus::Completed)
1259        );
1260    }
1261
1262    #[test]
1263    fn record_status_if_newer_accepts_equal_generation() {
1264        // Same generation may legitimately re-stamp (Pending →
1265        // Completed transition on the same task). The CAS is `>=`
1266        // not `>`.
1267        let mgr = ReplicationManager::new();
1268        assert!(mgr.record_status_if_newer(
1269            "b",
1270            "k",
1271            42,
1272            ReplicationStatus::Pending,
1273        ));
1274        assert!(mgr.record_status_if_newer(
1275            "b",
1276            "k",
1277            42,
1278            ReplicationStatus::Completed,
1279        ));
1280        assert_eq!(
1281            mgr.lookup_status("b", "k"),
1282            Some(ReplicationStatus::Completed)
1283        );
1284    }
1285
1286    #[test]
1287    fn next_generation_is_monotonic() {
1288        let mgr = ReplicationManager::new();
1289        let g1 = mgr.next_generation();
1290        let g2 = mgr.next_generation();
1291        let g3 = mgr.next_generation();
1292        assert!(g2 > g1, "g2={g2} must exceed g1={g1}");
1293        assert!(g3 > g2, "g3={g3} must exceed g2={g2}");
1294        assert_eq!(g2, g1 + 1);
1295        assert_eq!(g3, g2 + 1);
1296    }
1297
1298    #[test]
1299    fn snapshot_pre_61_format_loads_with_zero_generation() {
1300        // Pre-v0.8.2 #61 snapshot shape: bare `ReplicationStatus`,
1301        // no `next_generation` field. The `untagged` enum + serde
1302        // default must round-trip lossily into the new shape, with
1303        // `generation = 0` (= guaranteed loseable to next real PUT).
1304        let pre_61_json = r#"{
1305            "by_bucket": {},
1306            "statuses": [
1307                [["src", "k"], "Completed"]
1308            ]
1309        }"#;
1310        let mgr = ReplicationManager::from_json(pre_61_json)
1311            .expect("pre-#61 snapshot must deserialise");
1312        assert_eq!(
1313            mgr.lookup_status("src", "k"),
1314            Some(ReplicationStatus::Completed)
1315        );
1316        // First mint after restore is `1` (max(0, 1)).
1317        assert_eq!(mgr.next_generation(), 1);
1318        // The `generation = 0` legacy entry is overridable by any
1319        // real PUT (= a generation >= 1).
1320        assert!(mgr.record_status_if_newer(
1321            "src",
1322            "k",
1323            1,
1324            ReplicationStatus::Pending,
1325        ));
1326    }
1327
1328    // ----- v0.8.3 #66: sweep + TTL unit tests (H-5 audit fix) -----
1329
1330    /// Helper: install a `(bucket, key)` entry with an explicit
1331    /// `recorded_at` so the sweep test can pin the clock at a known
1332    /// offset from "now". Bypasses `record_status_if_newer` because
1333    /// that always stamps with `Utc::now()`.
1334    fn install_entry_with_recorded_at(
1335        mgr: &ReplicationManager,
1336        bucket: &str,
1337        key: &str,
1338        status: ReplicationStatus,
1339        recorded_at: DateTime<Utc>,
1340    ) {
1341        mgr.statuses
1342            .write()
1343            .expect("replication state RwLock poisoned")
1344            .insert(
1345                (bucket.to_owned(), key.to_owned()),
1346                ReplicationStatusEntry {
1347                    status,
1348                    generation: 1,
1349                    recorded_at,
1350                },
1351            );
1352    }
1353
1354    #[test]
1355    fn sweep_stale_drops_completed_past_ttl() {
1356        // Three terminal entries: Completed -10h, Failed -10h, Completed -1h.
1357        // sweep_stale(now, 5h) → drops the two -10h entries, keeps the
1358        // recent Completed.
1359        let mgr = ReplicationManager::new();
1360        let now = Utc::now();
1361        install_entry_with_recorded_at(
1362            &mgr,
1363            "src",
1364            "old-completed",
1365            ReplicationStatus::Completed,
1366            now - chrono::Duration::hours(10),
1367        );
1368        install_entry_with_recorded_at(
1369            &mgr,
1370            "src",
1371            "old-failed",
1372            ReplicationStatus::Failed,
1373            now - chrono::Duration::hours(10),
1374        );
1375        install_entry_with_recorded_at(
1376            &mgr,
1377            "src",
1378            "recent-completed",
1379            ReplicationStatus::Completed,
1380            now - chrono::Duration::hours(1),
1381        );
1382
1383        let n = mgr.sweep_stale(now, chrono::Duration::hours(5));
1384        assert_eq!(n, 2, "two terminal entries past 5h TTL must be swept");
1385        assert!(
1386            mgr.lookup_status("src", "old-completed").is_none(),
1387            "Completed past TTL must be removed"
1388        );
1389        assert!(
1390            mgr.lookup_status("src", "old-failed").is_none(),
1391            "Failed past TTL must be removed"
1392        );
1393        assert_eq!(
1394            mgr.lookup_status("src", "recent-completed"),
1395            Some(ReplicationStatus::Completed),
1396            "Completed within TTL must survive"
1397        );
1398    }
1399
1400    #[test]
1401    fn sweep_stale_keeps_pending_regardless_of_age() {
1402        // A Pending entry stamped 100h ago is **still in-flight**
1403        // (the dispatcher is racing toward a terminal stamp). Sweeping
1404        // it would lose the eventual Completed/Failed outcome and let
1405        // a stale generation re-emerge under the original key with no
1406        // recorded history.
1407        let mgr = ReplicationManager::new();
1408        let now = Utc::now();
1409        install_entry_with_recorded_at(
1410            &mgr,
1411            "src",
1412            "ancient-pending",
1413            ReplicationStatus::Pending,
1414            now - chrono::Duration::hours(100),
1415        );
1416
1417        let n = mgr.sweep_stale(now, chrono::Duration::hours(5));
1418        assert_eq!(n, 0, "Pending entries must never be swept");
1419        assert_eq!(
1420            mgr.lookup_status("src", "ancient-pending"),
1421            Some(ReplicationStatus::Pending),
1422            "ancient Pending must still be present"
1423        );
1424    }
1425
1426    #[test]
1427    fn recorded_at_back_compat_default_now_on_deserialize() {
1428        // A pre-#66 snapshot whose status entries omit `recorded_at`
1429        // must deserialise with `recorded_at = Utc::now()` (= "freshly
1430        // observed at restart"). This delays the first sweep by one
1431        // TTL window but never drops a still-relevant entry early.
1432        // Use the v0.8.2 #61 entry shape (status + generation, no
1433        // recorded_at) to verify the `#[serde(default = "Utc::now")]`
1434        // applies on inner-entry deserialisation too.
1435        let pre_66_json = r#"{
1436            "by_bucket": {},
1437            "statuses": [
1438                [["src", "k"], { "status": "Completed", "generation": 7 }]
1439            ],
1440            "next_generation": 8
1441        }"#;
1442        let before = Utc::now();
1443        let mgr = ReplicationManager::from_json(pre_66_json)
1444            .expect("pre-#66 snapshot with no `recorded_at` must deserialise");
1445        let after = Utc::now();
1446
1447        // Status preserved.
1448        assert_eq!(
1449            mgr.lookup_status("src", "k"),
1450            Some(ReplicationStatus::Completed),
1451        );
1452
1453        // recorded_at defaulted to Utc::now() at deserialise time —
1454        // peek the inner entry to verify the timestamp is in the
1455        // [before, after] window of the from_json call.
1456        let entries = mgr
1457            .statuses
1458            .read()
1459            .expect("replication state RwLock poisoned");
1460        let entry = entries
1461            .get(&("src".to_owned(), "k".to_owned()))
1462            .expect("entry must exist");
1463        assert!(
1464            entry.recorded_at >= before && entry.recorded_at <= after,
1465            "recorded_at default must be Utc::now() at deserialise time \
1466             (got {:?}, expected within [{:?}, {:?}])",
1467            entry.recorded_at,
1468            before,
1469            after
1470        );
1471        assert_eq!(entry.generation, 7, "generation must round-trip");
1472
1473        // A sweep with TTL 1h immediately after must NOT drop this
1474        // entry (recorded_at ≈ now, well within TTL).
1475        drop(entries);
1476        let n = mgr.sweep_stale(Utc::now(), chrono::Duration::hours(1));
1477        assert_eq!(
1478            n, 0,
1479            "freshly-defaulted recorded_at must survive a 1h-TTL sweep"
1480        );
1481    }
1482
1483    // ----- v0.8.3 #68: Object Lock state propagation unit tests (audit M-1) -----
1484
1485    /// When the source object carried an Object Lock state, the
1486    /// dispatcher must inject the AWS-wire lock headers
1487    /// (`x-amz-object-lock-mode`, `-retain-until-date`, `-legal-hold`)
1488    /// into the destination PUT's metadata map so the destination side
1489    /// can persist the same WORM posture on the replica.
1490    #[tokio::test]
1491    async fn replicate_with_source_lock_state_attaches_headers() {
1492        type Captured = Vec<(String, String, bytes::Bytes, Option<HashMap<String, String>>)>;
1493        let mgr = Arc::new(ReplicationManager::new());
1494        let captured: Arc<Mutex<Captured>> = Arc::new(Mutex::new(Vec::new()));
1495        let captured_cl = Arc::clone(&captured);
1496
1497        let do_put = move |dest: String,
1498                           key: String,
1499                           body: bytes::Bytes,
1500                           meta: Option<HashMap<String, String>>| {
1501            let captured = Arc::clone(&captured_cl);
1502            async move {
1503                captured.lock().unwrap().push((dest, key, body, meta));
1504                Ok::<(), String>(())
1505            }
1506        };
1507
1508        let retain_until = Utc::now() + chrono::Duration::days(30);
1509        let lock_state = crate::object_lock::ObjectLockState {
1510            mode: Some(crate::object_lock::LockMode::Compliance),
1511            retain_until: Some(retain_until),
1512            legal_hold_on: true,
1513        };
1514
1515        replicate_object(
1516            rule("r-locked", 1, true, None, &[], "dst"),
1517            "src".into(),
1518            "worm.bin".into(),
1519            bytes::Bytes::from_static(b"locked-payload"),
1520            None,
1521            do_put,
1522            Arc::clone(&mgr),
1523            mgr.next_generation(),
1524            None,
1525            Some(lock_state),
1526        )
1527        .await;
1528
1529        let cap = captured.lock().unwrap();
1530        assert_eq!(cap.len(), 1, "do_put must run exactly once on success");
1531        let meta = cap[0].3.as_ref().expect("metadata stamped");
1532        assert_eq!(
1533            meta.get("x-amz-object-lock-mode").map(String::as_str),
1534            Some("COMPLIANCE"),
1535            "Compliance mode header must be propagated"
1536        );
1537        let stamped_until = meta
1538            .get("x-amz-object-lock-retain-until-date")
1539            .expect("retain-until header must be propagated");
1540        // RFC-3339 round-trip: parse back and compare with the source
1541        // retain_until (1s slack for sub-second truncation).
1542        let parsed: chrono::DateTime<chrono::FixedOffset> =
1543            chrono::DateTime::parse_from_rfc3339(stamped_until)
1544                .expect("retain-until must be RFC-3339");
1545        let diff = (parsed.with_timezone(&Utc) - retain_until).num_seconds().abs();
1546        assert!(diff <= 1, "retain-until off by {diff}s");
1547        assert_eq!(
1548            meta.get("x-amz-object-lock-legal-hold").map(String::as_str),
1549            Some("ON"),
1550            "legal hold state must be propagated as ON"
1551        );
1552        // The REPLICA stamp must still be present alongside the lock
1553        // headers (lock propagation must not displace the replica
1554        // marker that lets HEAD distinguish replica-vs-direct PUT).
1555        assert_eq!(
1556            meta.get("x-amz-replication-status").map(String::as_str),
1557            Some("REPLICA"),
1558        );
1559    }
1560
1561    /// Symmetric: no source lock state → no lock headers leak into
1562    /// the destination metadata (legacy behaviour preserved for the
1563    /// non-WORM PUT path).
1564    #[tokio::test]
1565    async fn replicate_without_source_lock_state_no_headers_added() {
1566        type Captured = Vec<(String, String, bytes::Bytes, Option<HashMap<String, String>>)>;
1567        let mgr = Arc::new(ReplicationManager::new());
1568        let captured: Arc<Mutex<Captured>> = Arc::new(Mutex::new(Vec::new()));
1569        let captured_cl = Arc::clone(&captured);
1570
1571        let do_put = move |dest: String,
1572                           key: String,
1573                           body: bytes::Bytes,
1574                           meta: Option<HashMap<String, String>>| {
1575            let captured = Arc::clone(&captured_cl);
1576            async move {
1577                captured.lock().unwrap().push((dest, key, body, meta));
1578                Ok::<(), String>(())
1579            }
1580        };
1581
1582        replicate_object(
1583            rule("r-plain", 1, true, None, &[], "dst"),
1584            "src".into(),
1585            "plain.bin".into(),
1586            bytes::Bytes::from_static(b"plain-payload"),
1587            None,
1588            do_put,
1589            Arc::clone(&mgr),
1590            mgr.next_generation(),
1591            None,
1592            None,
1593        )
1594        .await;
1595
1596        let cap = captured.lock().unwrap();
1597        let meta = cap[0].3.as_ref().expect("metadata stamped");
1598        assert!(
1599            meta.get("x-amz-object-lock-mode").is_none(),
1600            "no lock state ⇒ no mode header (got {:?})",
1601            meta.get("x-amz-object-lock-mode")
1602        );
1603        assert!(
1604            meta.get("x-amz-object-lock-retain-until-date").is_none(),
1605            "no lock state ⇒ no retain-until header"
1606        );
1607        assert!(
1608            meta.get("x-amz-object-lock-legal-hold").is_none(),
1609            "no lock state ⇒ no legal-hold header"
1610        );
1611    }
1612}