// s4_server/replication.rs
//! Bucket-to-bucket asynchronous replication (v0.6 #40).
//!
//! AWS S3 Cross-Region Replication (CRR) lets a bucket owner declare a
//! `ReplicationConfiguration` whose rules say "for every PUT to this
//! bucket that matches `<filter>`, asynchronously copy the new object to
//! `<destination_bucket>`". The source object grows an
//! `x-amz-replication-status` of `PENDING` → `COMPLETED` (or `FAILED`),
//! the replica gets stamped `REPLICA`, and consumers can poll either
//! side via HEAD to see how the replication is going.
//!
//! ## v0.6 #40 scope (single-instance only)
//!
//! - **Same S4 endpoint** — the source bucket and the destination bucket
//!   live on the same `S4Service`. True cross-region (multi-instance,
//!   wire-replicated) replication is a v0.7+ follow-up that needs a
//!   `aws-sdk-s3` PUT to a remote endpoint with its own credentials.
//! - **Async only** — the originating `put_object` returns as soon as
//!   the source backend write is done. The replica PUT happens on a
//!   detached `tokio::spawn` task and never blocks the client. There is
//!   no synchronous `replication_required` mode (would defeat the whole
//!   point of CRR being asynchronous in the first place).
//! - **Retry budget = 3 attempts** with exponential backoff (50ms,
//!   100ms, 200ms). On exhaustion the per-(bucket, key) status flips to
//!   `Failed` and `dropped_total` is bumped + a warn-level log line is
//!   emitted so operators see the loss in `s4_replication_dropped_total`.
//! - **Highest-priority rule wins** when multiple rules match a single
//!   object key (S3 spec). Ties are broken by declaration order
//!   (deterministic for tests).
//! - **`status_enabled = false` rules never match**, mirroring the AWS
//!   `ReplicationRuleStatus::Disabled` semantics — the rule sits in the
//!   configuration document but is inert.
//! - **Replica is full-body** — there is no delta replication, no
//!   incremental fetch, no batching. Every matching PUT triggers one
//!   independent destination PUT.
//!
//! ## what is NOT in v0.6 #40
//!
//! - Delete-marker replication (S3's `DeleteMarkerReplication` block) —
//!   v0.7+. Right now `delete_object` does not fan out a destination
//!   delete; the replica drifts on the source's deletion.
//! - Replication of multipart-completed objects through the per-part
//!   copy path. The whole compose-then-PUT result of CMU is replicated
//!   as a single PUT, which is fine for single-instance and matches
//!   what AWS does for source objects ≤ 5 GiB.
//! - SSE-KMS-encrypted replicas with KMS-key-id rewriting per the
//!   `SourceSelectionCriteria` block (the source's wrapped DEK is
//!   replicated as-is — fine for single-instance because the same KMS
//!   backend unwraps both copies).
//! - Replication metrics (RTC) — a v0.7+ follow-up that wires a
//!   `replication_lag_seconds` histogram.

52use std::collections::HashMap;
53use std::sync::Arc;
54use std::sync::RwLock;
55use std::sync::atomic::{AtomicU64, Ordering};
56
57use chrono::{DateTime, Utc};
58use serde::{Deserialize, Serialize};
59
60/// Per-(bucket, key) replication state, surfaced as the
61/// `x-amz-replication-status` HEAD/GET response header. Values match the
62/// AWS wire form exactly.
63#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
64pub enum ReplicationStatus {
65    /// Replication has been queued (a matching rule fired and the
66    /// dispatcher task has been spawned) but the destination PUT has
67    /// not yet succeeded.
68    Pending,
69    /// Replication has succeeded — the replica exists in the
70    /// destination bucket.
71    Completed,
72    /// Replication failed permanently (retry budget exhausted).
73    Failed,
74    /// Stamped on the destination side so the replica is
75    /// distinguishable from a normal PUT, matching AWS CRR's
76    /// "replica stamp" behaviour.
77    Replica,
78}
79
80impl ReplicationStatus {
81    /// AWS wire-string form. Caller stamps it on the response as the
82    /// `x-amz-replication-status` header.
83    #[must_use]
84    pub fn as_aws_str(&self) -> &'static str {
85        match self {
86            Self::Pending => "PENDING",
87            Self::Completed => "COMPLETED",
88            Self::Failed => "FAILED",
89            Self::Replica => "REPLICA",
90        }
91    }
92}
93
/// Filter on a `ReplicationRule` — the AND of a key-prefix predicate
/// and a tag predicate. AWS S3's wire form uses a sum type
/// (`Prefix | Tag | And { Prefix, Tags }`); we collapse those into
/// the single representation that the in-memory matcher
/// ([`filter_matches`]) needs.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct ReplicationFilter {
    /// Key-prefix predicate. Empty / `None` means "any prefix".
    pub prefix: Option<String>,
    /// AND of every tag pair — every entry here must be present in the
    /// object's tag set for the rule to fire. Empty means "no tag
    /// predicate".
    pub tags: Vec<(String, String)>,
}
107
/// One replication rule. Each rule independently decides whether to
/// copy an object based on the (key, tags) tuple; the replication
/// manager picks the highest-priority matching rule when multiple
/// fire on the same object.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct ReplicationRule {
    /// Operator-supplied id (max 255 chars per AWS; the limit is not
    /// enforced here).
    pub id: String,
    /// Higher number = higher priority (S3 spec). When two rules match
    /// the same key, the higher `priority` wins; ties broken by
    /// declaration order.
    pub priority: u32,
    /// `false` makes the rule inert without removing it from the
    /// configuration document — mirrors AWS's `Disabled` status.
    pub status_enabled: bool,
    /// Subset of source-bucket objects this rule applies to.
    pub filter: ReplicationFilter,
    /// Where to copy matching objects. Plain bucket name (no ARN) for
    /// the v0.6 #40 single-instance scope.
    pub destination_bucket: String,
    /// Optional storage-class override on the replica. `None` = keep
    /// the source's class (S3 default).
    pub destination_storage_class: Option<String>,
}
132
/// Per-bucket replication configuration.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct ReplicationConfig {
    /// Placeholder ARN — not consumed by S4 itself, kept for AWS wire
    /// compatibility (the `Role` field is mandatory in the
    /// `PutBucketReplication` XML payload).
    pub role: String,
    /// Rule list, evaluated by [`ReplicationManager::match_rule`]; the
    /// highest-priority enabled match wins, ties broken by declaration
    /// order.
    pub rules: Vec<ReplicationRule>,
}
142
/// Per-(source_bucket, source_key) replication status entry, paired
/// with the **generation token** of the source PUT that produced it.
///
/// ## v0.8.2 #61 — generation token
///
/// Each `put_object` (or `complete_multipart_upload`) on a source key
/// pulls a fresh, monotonically-increasing `generation` from the
/// manager. The detached replication task carries that generation and
/// only stamps the status when its generation is `>=` the stored one
/// (CAS-style). A stale retry whose generation has been overtaken by a
/// newer PUT is silently dropped, so the destination bucket never gets
/// rolled back to older bytes. See [`ReplicationManager::next_generation`]
/// + [`ReplicationManager::record_status_if_newer`].
///
/// ## v0.8.3 #66 — `recorded_at` for sweep + TTL (H-5 audit fix)
///
/// Each stamp records the wall-clock time the entry was last updated.
/// The hourly sweep task ([`ReplicationManager::sweep_stale`]) drops
/// terminal entries (`Completed` / `Failed`) older than the operator-
/// configured TTL, bounding the otherwise-unbounded growth of the
/// `statuses` map under workloads with many unique keys. `Pending`
/// entries are never swept (they are still in-flight and dropping them
/// would lose the eventual `Completed` / `Failed` stamp the dispatcher
/// is racing toward). Pre-#66 snapshots without `recorded_at` deserialise
/// with `Utc::now()` (= "freshly observed at restart") which delays
/// the first sweep by one TTL cycle but never drops a still-relevant
/// entry early.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct ReplicationStatusEntry {
    /// Last stamped replication state for the source object.
    pub status: ReplicationStatus,
    /// Generation token of the PUT that produced this stamp. `0` means
    /// "unguarded": either a legacy pre-#61 snapshot entry or a stamp
    /// from [`ReplicationManager::record_status`] — always overridable
    /// by the next real PUT.
    pub generation: u64,
    /// v0.8.3 #66: when the entry was last updated. The sweep drops
    /// terminal entries (Completed / Failed) older than the operator-
    /// configured TTL. Pending entries are never swept (still in-flight).
    /// Pre-#66 snapshots default to `Utc::now()` so legacy entries get
    /// one full TTL window of grace before becoming sweep-eligible.
    #[serde(default = "Utc::now")]
    pub recorded_at: DateTime<Utc>,
}
182
/// JSON snapshot — `bucket -> ReplicationConfig`. Mirrors the shape of
/// `notifications::NotificationSnapshot` so operators can hand-edit
/// configurations across restart cycles.
///
/// ## v0.8.2 #61 back-compat
///
/// Pre-#61 snapshots stored `Vec<((String, String), ReplicationStatus)>`
/// (status only; no generation). The new format stores
/// `Vec<((String, String), ReplicationStatusEntry)>`. Serde is set up
/// with `#[serde(untagged)]` on a wrapper enum so old snapshots load
/// with `generation = 0`. A `generation = 0` entry is never stale —
/// the very next PUT mints `generation = 1` which wins the CAS — so
/// the migration is loss-free.
#[derive(Debug, Default, Serialize, Deserialize)]
struct ReplicationSnapshot {
    /// Per-bucket configurations, keyed by source-bucket name.
    by_bucket: HashMap<String, ReplicationConfig>,
    /// Per-(bucket, key) replication status. Persisted so a restart
    /// doesn't lose the COMPLETED stamp on already-replicated
    /// objects.
    statuses: Vec<((String, String), StatusOrEntry)>,
    /// v0.8.2 #61: persist the next generation so a restart doesn't
    /// reissue tokens that are still in-flight. Optional for
    /// back-compat — pre-#61 snapshots restore with `next_generation = 1`.
    #[serde(default)]
    next_generation: u64,
}
209
/// Back-compat wrapper for snapshot deserialisation: accepts either a
/// bare `ReplicationStatus` (pre-#61 schema) or a full
/// `ReplicationStatusEntry`. `serde(untagged)` tries the variants in
/// declaration order — the more-structured `Entry` variant first so
/// new snapshots round-trip, falling back to bare `Status` for old
/// snapshots. Serialisation (`to_json`) only ever emits the `Entry`
/// variant.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(untagged)]
enum StatusOrEntry {
    Entry(ReplicationStatusEntry),
    Status(ReplicationStatus),
}
222
223impl StatusOrEntry {
224    fn into_entry(self) -> ReplicationStatusEntry {
225        match self {
226            Self::Entry(e) => e,
227            // v0.8.3 #66: pre-#61 snapshots have no `recorded_at`; stamp
228            // `Utc::now()` so the first sweep tick sees them as freshly
229            // observed and gives them one full TTL window of grace.
230            Self::Status(s) => ReplicationStatusEntry {
231                status: s,
232                generation: 0,
233                recorded_at: Utc::now(),
234            },
235        }
236    }
237}
238
/// In-memory manager of per-bucket replication configurations + per-
/// (bucket, key) replication statuses.
pub struct ReplicationManager {
    /// Per-bucket replication configuration, keyed by source-bucket
    /// name. Written by `put` / `delete`, read by `get` / `match_rule`.
    by_bucket: RwLock<HashMap<String, ReplicationConfig>>,
    /// Per-(source_bucket, key) replication status entry (status +
    /// generation token). Looked up by `head_object` / `get_object` to
    /// stamp `x-amz-replication-status` on the response.
    statuses: RwLock<HashMap<(String, String), ReplicationStatusEntry>>,
    /// v0.8.2 #61: monotonic per-source-PUT generation counter. Each
    /// `put_object` (or `complete_multipart_upload`) on a replicated
    /// source bucket calls [`Self::next_generation`] before spawning
    /// its detached replication task. The dispatcher carries the
    /// generation through to [`Self::record_status_if_newer`], which
    /// drops the stamp + the destination write when a newer
    /// generation has already won — guaranteeing the destination
    /// can't be rolled back by a slow retry.
    pub next_generation: AtomicU64,
    /// Bumped each time the dispatcher exhausts its retry budget on a
    /// destination PUT. Exposed publicly so the metrics layer can poll
    /// without taking the configuration lock.
    pub dropped_total: AtomicU64,
}
261
impl Default for ReplicationManager {
    /// Delegates to [`ReplicationManager::new`] so `Default` and `new`
    /// can never drift apart (single construction path).
    fn default() -> Self {
        Self::new()
    }
}
267
268impl std::fmt::Debug for ReplicationManager {
269    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
270        f.debug_struct("ReplicationManager")
271            .field("dropped_total", &self.dropped_total.load(Ordering::Relaxed))
272            .finish_non_exhaustive()
273    }
274}
275
impl ReplicationManager {
    /// Empty manager — no bucket has any replication rules. The
    /// generation counter starts at 1 so the first PUT-issued token is
    /// `1` (a stored entry's `generation = 0` from a pre-#61 snapshot
    /// is then strictly less and the very next PUT wins the CAS).
    #[must_use]
    pub fn new() -> Self {
        Self {
            dropped_total: AtomicU64::new(0),
            next_generation: AtomicU64::new(1),
            statuses: RwLock::new(HashMap::new()),
            by_bucket: RwLock::new(HashMap::new()),
        }
    }
290
    /// v0.8.2 #61: mint a fresh, monotonically-increasing generation
    /// token. Caller is the per-source-PUT dispatcher fork (the body-
    /// bearing `put_object` branch, the body-less `put_object` branch,
    /// and `complete_multipart_upload`). The token is then carried
    /// through [`replicate_object`] to [`Self::record_status_if_newer`]
    /// so a stale retry can be detected and dropped.
    ///
    /// Returns the pre-increment value (`fetch_add` semantics), so the
    /// first call on a fresh manager yields `1`.
    ///
    /// Uses `Relaxed` ordering — we only need uniqueness +
    /// monotonicity per atomic; the cross-thread happens-before
    /// between PUT-A's spawn and the dispatcher reading the body is
    /// already established by `tokio::spawn`'s implicit
    /// `Acquire/Release` on the task queue.
    pub fn next_generation(&self) -> u64 {
        self.next_generation.fetch_add(1, Ordering::Relaxed)
    }
306
307    /// `put_bucket_replication` handler entry. The bucket's existing
308    /// configuration is fully replaced (S3 spec — `PutBucketReplication`
309    /// is upsert-style at the bucket scope, not per-rule patch).
310    pub fn put(&self, bucket: &str, config: ReplicationConfig) {
311        crate::lock_recovery::recover_write(&self.by_bucket, "replication.by_bucket")
312            .insert(bucket.to_owned(), config);
313    }
314
315    /// `get_bucket_replication` handler entry. Returns `None` when
316    /// nothing is registered (AWS S3 returns
317    /// `ReplicationConfigurationNotFoundError` in that case; the
318    /// service-layer handler maps `None` accordingly).
319    #[must_use]
320    pub fn get(&self, bucket: &str) -> Option<ReplicationConfig> {
321        crate::lock_recovery::recover_read(&self.by_bucket, "replication.by_bucket")
322            .get(bucket)
323            .cloned()
324    }
325
326    /// Drop the configuration for `bucket`. Idempotent.
327    pub fn delete(&self, bucket: &str) {
328        crate::lock_recovery::recover_write(&self.by_bucket, "replication.by_bucket")
329            .remove(bucket);
330    }
331
332    /// Serialise the entire manager state (configurations + per-key
333    /// statuses + next generation counter) to JSON. The status entries
334    /// are emitted in the v0.8.2 #61 schema (`ReplicationStatusEntry`);
335    /// readers built before #61 will see the embedded
336    /// `{ status, generation }` shape via the `untagged` enum and
337    /// (older binaries) reject — but the production restart path always
338    /// runs the same binary against its own snapshot.
339    pub fn to_json(&self) -> Result<String, serde_json::Error> {
340        let snap = ReplicationSnapshot {
341            by_bucket: crate::lock_recovery::recover_read(&self.by_bucket, "replication.by_bucket")
342                .clone(),
343            statuses: crate::lock_recovery::recover_read(&self.statuses, "replication.statuses")
344                .iter()
345                .map(|(k, v)| (k.clone(), StatusOrEntry::Entry(v.clone())))
346                .collect(),
347            next_generation: self.next_generation.load(Ordering::Relaxed),
348        };
349        serde_json::to_string(&snap)
350    }
351
352    /// Restore a manager from a previously-emitted snapshot. The
353    /// `dropped_total` counter is reset to 0 — historical drops are
354    /// runtime metrics, not configuration.
355    ///
356    /// ## Back-compat (v0.8.2 #61)
357    ///
358    /// Pre-#61 snapshots store bare `ReplicationStatus` (no
359    /// generation). The `untagged` `StatusOrEntry` enum picks them up
360    /// and assigns `generation = 0`, which the CAS-style
361    /// [`Self::record_status_if_newer`] treats as "always overridable
362    /// by the next real PUT" — guaranteed loss-free migration. The
363    /// `next_generation` counter defaults to `1` when the snapshot
364    /// predates #61 (= `serde(default)` on the field).
365    pub fn from_json(s: &str) -> Result<Self, serde_json::Error> {
366        let snap: ReplicationSnapshot = serde_json::from_str(s)?;
367        let statuses: HashMap<(String, String), ReplicationStatusEntry> = snap
368            .statuses
369            .into_iter()
370            .map(|(k, v)| (k, v.into_entry()))
371            .collect();
372        // Pre-#61 snapshots come back with `next_generation = 0`
373        // (`serde(default)` on `u64`); start fresh at 1 so all newly-
374        // minted tokens are strictly greater than the legacy
375        // `generation = 0` entries.
376        let next_gen = snap.next_generation.max(1);
377        Ok(Self {
378            by_bucket: RwLock::new(snap.by_bucket),
379            statuses: RwLock::new(statuses),
380            next_generation: AtomicU64::new(next_gen),
381            dropped_total: AtomicU64::new(0),
382        })
383    }
384
385    /// Match an object against the bucket's rules and return the
386    /// highest-priority enabled rule whose filter matches. Returns
387    /// `None` when no rule matches (or no configuration is registered
388    /// for the bucket). Ties on `priority` are broken by declaration
389    /// order — the first such rule wins.
390    #[must_use]
391    pub fn match_rule(
392        &self,
393        bucket: &str,
394        key: &str,
395        object_tags: &[(String, String)],
396    ) -> Option<ReplicationRule> {
397        let map = crate::lock_recovery::recover_read(&self.by_bucket, "replication.by_bucket");
398        let cfg = map.get(bucket)?;
399        let mut best: Option<&ReplicationRule> = None;
400        for rule in &cfg.rules {
401            if !rule.status_enabled {
402                continue;
403            }
404            if !filter_matches(&rule.filter, key, object_tags) {
405                continue;
406            }
407            best = match best {
408                None => Some(rule),
409                Some(prev) if rule.priority > prev.priority => Some(rule),
410                Some(prev) => Some(prev),
411            };
412        }
413        best.cloned()
414    }
415
416    /// Stamp the per-(bucket, key) replication status with no
417    /// generation guard. Replaces any previous entry. **Generation is
418    /// reset to 0** (= overridable by the next real PUT) — callers
419    /// that hold a generation token must use
420    /// [`Self::record_status_if_newer`] instead.
421    ///
422    /// Use cases (kept for back-compat + the eager `Pending` stamp the
423    /// service-layer dispatcher emits before spawning the actual
424    /// replication task):
425    /// - Eager `Pending` stamp synchronously alongside the source PUT
426    ///   so a HEAD between PUT-return and dispatcher-completion sees
427    ///   `PENDING` instead of `None`.
428    /// - Tests that don't care about generation (legacy assertions).
429    pub fn record_status(&self, bucket: &str, key: &str, status: ReplicationStatus) {
430        crate::lock_recovery::recover_write(&self.statuses, "replication.statuses").insert(
431            (bucket.to_owned(), key.to_owned()),
432            ReplicationStatusEntry {
433                status,
434                generation: 0,
435                // v0.8.3 #66: stamp now so a subsequent sweep can
436                // age this entry out once it reaches a terminal
437                // state and exceeds the configured TTL.
438                recorded_at: Utc::now(),
439            },
440        );
441    }
442
443    /// v0.8.2 #61: CAS-style stamp. Only updates the entry when
444    /// `generation >= entry.generation`; rejects the update (returns
445    /// `false`) when `generation < entry.generation` because a newer
446    /// PUT has already won and we must not roll the source's status
447    /// back to a stale terminal state.
448    ///
449    /// ## Returns
450    ///
451    /// - `true` — the stamp was accepted; the caller may proceed with
452    ///   the destination-bucket PUT (in [`replicate_object`]) /
453    ///   declare success.
454    /// - `false` — a strictly-newer generation has already stamped the
455    ///   entry; the caller must **drop the destination write** to
456    ///   avoid overwriting newer bytes with a stale retry's body.
457    ///
458    /// Equality (`generation == entry.generation`) is accepted because
459    /// the same generation may legitimately stamp twice across the
460    /// dispatcher's retry budget (`Pending` → `Completed` on the same
461    /// task).
462    pub fn record_status_if_newer(
463        &self,
464        bucket: &str,
465        key: &str,
466        generation: u64,
467        status: ReplicationStatus,
468    ) -> bool {
469        let mut map = crate::lock_recovery::recover_write(&self.statuses, "replication.statuses");
470        let now = Utc::now();
471        let entry =
472            map.entry((bucket.to_owned(), key.to_owned()))
473                .or_insert(ReplicationStatusEntry {
474                    status: ReplicationStatus::Pending,
475                    generation: 0,
476                    // v0.8.3 #66: stamp at insertion; will be overwritten
477                    // immediately below when the CAS accepts.
478                    recorded_at: now,
479                });
480        if generation < entry.generation {
481            return false;
482        }
483        entry.generation = generation;
484        entry.status = status;
485        // v0.8.3 #66: refresh the timestamp on every accepted stamp so
486        // a Pending → Completed transition (same generation) resets
487        // the sweep clock — the TTL is measured from the **last**
488        // terminal stamp, not the first observation.
489        entry.recorded_at = now;
490        true
491    }
492
493    /// v0.8.3 #66 (H-5 audit fix): drop terminal-state entries
494    /// (`Completed` / `Failed`) older than `max_age`. `Pending` entries
495    /// are never swept because they are still in-flight — the
496    /// dispatcher is racing toward a terminal stamp and dropping the
497    /// `Pending` would lose the eventual outcome (and let the entry
498    /// re-emerge under the original key with no recorded history).
499    /// `Replica` entries can theoretically appear here through legacy
500    /// paths and are likewise preserved (the destination-side stamp is
501    /// not produced by `record_status_if_newer` in the current code,
502    /// but the conservative filter keeps any future use loss-free).
503    ///
504    /// Cutoff is `now - max_age` rather than `Utc::now() - max_age` so
505    /// callers can drive the clock deterministically in tests.
506    ///
507    /// Returns the number of entries removed (operators dashboard via
508    /// `s4_replication_status_swept_total`).
509    pub fn sweep_stale(&self, now: DateTime<Utc>, max_age: chrono::Duration) -> usize {
510        let mut map = crate::lock_recovery::recover_write(&self.statuses, "replication.statuses");
511        let cutoff = now - max_age;
512        let stale: Vec<(String, String)> = map
513            .iter()
514            .filter(|(_, e)| {
515                matches!(
516                    e.status,
517                    ReplicationStatus::Completed | ReplicationStatus::Failed
518                ) && e.recorded_at < cutoff
519            })
520            .map(|(k, _)| k.clone())
521            .collect();
522        let count = stale.len();
523        for k in stale {
524            map.remove(&k);
525        }
526        count
527    }
528
529    /// Look up the recorded replication status for `(bucket, key)`.
530    /// Returns `None` when no PUT to this key has triggered
531    /// replication (= the object is not under any replication rule, or
532    /// it predates the rule's creation).
533    ///
534    /// The `generation` field of the entry is intentionally not
535    /// surfaced here — it's an internal CAS guard, not part of the
536    /// AWS wire shape.
537    #[must_use]
538    pub fn lookup_status(&self, bucket: &str, key: &str) -> Option<ReplicationStatus> {
539        crate::lock_recovery::recover_read(&self.statuses, "replication.statuses")
540            .get(&(bucket.to_owned(), key.to_owned()))
541            .map(|entry| entry.status.clone())
542    }
543}
544
545/// AND of (prefix predicate, every tag pair). An empty / `None` prefix
546/// means "any prefix"; an empty tag list means "no tag predicate".
547fn filter_matches(filter: &ReplicationFilter, key: &str, object_tags: &[(String, String)]) -> bool {
548    if let Some(p) = filter.prefix.as_deref()
549        && !p.is_empty()
550        && !key.starts_with(p)
551    {
552        return false;
553    }
554    for (tk, tv) in &filter.tags {
555        if !object_tags.iter().any(|(ok, ov)| ok == tk && ov == tv) {
556            return false;
557        }
558    }
559    true
560}
561
/// Maximum number of destination-PUT attempts per replication task
/// before the dispatcher stamps `Failed` (module docs: retry budget = 3).
const RETRY_ATTEMPTS: u32 = 3;
/// First backoff delay in milliseconds; doubled per attempt (50/100/200).
const RETRY_BASE_MS: u64 = 50;
564
565/// v0.8.3 #68 (audit M-1): emit a single WARN log line per
566/// `(source_bucket, dest_bucket)` pair the first time we observe a
567/// replication PUT that wanted to propagate Object Lock state but the
568/// destination side has no `ObjectLockManager` attached. The metric
569/// (`s4_replication_lock_propagation_skipped_total`) bumps every time
570/// (so dashboards see the rate); the log is dedup'd because operators
571/// only need to know once that the configuration is asymmetric.
572///
573/// The dedup set lives in a process-static `Mutex<HashSet<(src, dst)>>`
574/// — bounded by the (#source × #destination) pair count, which is
575/// always small (operator-declared rules, not per-key).
576pub fn warn_lock_propagation_skipped(source_bucket: &str, dest_bucket: &str) {
577    use std::collections::HashSet;
578    use std::sync::{Mutex, OnceLock};
579    static SEEN: OnceLock<Mutex<HashSet<(String, String)>>> = OnceLock::new();
580    let seen = SEEN.get_or_init(|| Mutex::new(HashSet::new()));
581    let key = (source_bucket.to_owned(), dest_bucket.to_owned());
582    let first_time = {
583        let mut guard = crate::lock_recovery::recover_mutex(seen, "replication.warn_once_seen");
584        guard.insert(key)
585    };
586    if first_time {
587        tracing::warn!(
588            source_bucket = %source_bucket,
589            dest_bucket = %dest_bucket,
590            "S4 replication: source carries Object Lock state but destination \
591             bucket has no ObjectLockManager attached — replica will be freely \
592             deletable on the destination (WORM posture is source-only). Attach \
593             an ObjectLockManager via S4Service::with_object_lock() on the \
594             destination-side gateway to honour cross-bucket WORM."
595        );
596    }
597    crate::metrics::record_replication_lock_propagation_skipped();
598}
599
600/// Replicate one source-bucket object to the rule's destination bucket.
601///
602/// The caller supplies a `do_put` callback that performs the actual
603/// destination-bucket PUT (so unit tests can drive the dispatcher
604/// without needing a full backend). The callback receives:
605/// `(destination_bucket, key, body, metadata)` and returns a
606/// `Result<(), String>` whose `Err` triggers the retry / failure path.
607///
608/// Behaviour:
609/// - Stamps the destination metadata with `x-amz-replication-status:
610///   REPLICA` so a HEAD on the replica is distinguishable.
611/// - On callback success, records `(source_bucket, source_key) →
612///   Completed` in the manager **iff this task's `generation` is not
613///   already overtaken** (CAS-style guard — see [`ReplicationManager::
614///   record_status_if_newer`]).
615/// - On callback failure, retries up to [`RETRY_ATTEMPTS`] times with
616///   exponential backoff (50ms / 100ms / 200ms). After the budget is
617///   exhausted, records `Failed`, bumps `dropped_total`, and emits the
618///   matching Prometheus counter — also CAS-guarded.
619///
620/// ## v0.8.2 #61 — generation token + destination key override
621///
622/// The two new parameters fix the audit's C-1 + C-3 findings:
623///
624/// - `generation` — monotonic per-source-PUT token from
625///   [`ReplicationManager::next_generation`]. CAS-stamps the source's
626///   status + **suppresses the actual destination PUT** when the
627///   token has been overtaken. Without this guard, a slow retry of
628///   PUT-A could land in the destination *after* PUT-B has already
629///   replicated — rolling the destination back to A's bytes. With
630///   the guard, A's task notices B's higher generation has won and
631///   silently drops its destination write.
632/// - `destination_key_override` — the storage-side key the destination
633///   bucket should write under. For an unversioned source this is
634///   `None` and the dispatcher falls back to the source's logical key
635///   (= the AWS-default behaviour). For a **versioning-Enabled**
636///   source the caller passes `Some(versioned_shadow_key(key, vid))`
637///   so the destination's version chain receives the new version
638///   under the same shadow path the source uses (= a `?versionId=`
639///   GET on the destination resolves through the same shadow-key
640///   lookup as the source).
641///
642/// ## v0.8.3 #68 — Object Lock state propagation (audit M-1)
643///
644/// `source_lock_state` carries the source object's WORM posture
645/// (`mode + retain_until + legal_hold_on`) at PUT time. When `Some`,
646/// the destination PUT is decorated with the AWS-wire lock headers
647/// (`x-amz-object-lock-mode`, `x-amz-object-lock-retain-until-date`,
648/// `x-amz-object-lock-legal-hold`) on the metadata map so the
649/// destination side's `put_object` (or its caller) can persist the
650/// same lock state on the replica. Without this, a Compliance /
651/// Governance / legal-hold protected source had a destination
652/// replica that the destination operator could freely DELETE — the
653/// "WORM compliance posture survives DR" guarantee leaked.
654///
655/// `None` (no lock state on the source) keeps the legacy behaviour:
656/// no extra headers, replica is freely deletable on the destination.
657// 10 args is the post-#68 wire-shape: rule + (source_bucket, source_key,
658// body, metadata) + do_put + manager + (generation, dest_override) +
659// source_lock_state. A shape struct would split the call site without
660// buying anything; the caller (`spawn_replication_if_matched`)
661// constructs every field inline, so the indirection is pure noise.
#[allow(clippy::too_many_arguments)]
pub async fn replicate_object<F, Fut>(
    rule: ReplicationRule,
    source_bucket: String,
    source_key: String,
    body: bytes::Bytes,
    metadata: Option<HashMap<String, String>>,
    do_put: F,
    manager: Arc<ReplicationManager>,
    generation: u64,
    destination_key_override: Option<String>,
    source_lock_state: Option<crate::object_lock::ObjectLockState>,
) where
    // `do_put` abstracts the destination write: (bucket, key, body,
    // metadata) -> Result. Tests inject a capturing closure here.
    F: Fn(String, String, bytes::Bytes, Option<HashMap<String, String>>) -> Fut,
    Fut: std::future::Future<Output = Result<(), String>>,
{
    // Replica metadata = source metadata + `x-amz-replication-status:
    // REPLICA` stamp. Keeping the source's compression / encryption
    // metadata intact means a GET on the replica decodes through the
    // same path the source would.
    let mut replica_meta = metadata.unwrap_or_default();
    replica_meta.insert(
        "x-amz-replication-status".to_owned(),
        ReplicationStatus::Replica.as_aws_str().to_owned(),
    );
    // Rule-level destination storage class (when configured) rides
    // along as a plain metadata header on the replica PUT.
    if let Some(ref sc) = rule.destination_storage_class {
        replica_meta.insert("x-amz-storage-class".to_owned(), sc.clone());
    }
    // v0.8.3 #68 (audit M-1): propagate the source's Object Lock
    // posture as AWS-wire lock headers attached to the destination
    // PUT's metadata map. The destination side reads these and
    // persists the same lock state on the replica so a DR setup keeps
    // the WORM guarantee end-to-end (Compliance / Governance / legal
    // hold cannot be silently bypassed by deleting on the destination).
    if let Some(ref lock) = source_lock_state {
        if let Some(mode) = lock.mode {
            replica_meta.insert(
                "x-amz-object-lock-mode".to_owned(),
                mode.as_aws_str().to_owned(),
            );
        }
        if let Some(retain_until) = lock.retain_until {
            // ISO-8601 / RFC-3339 — the AWS wire form for
            // `x-amz-object-lock-retain-until-date`.
            replica_meta.insert(
                "x-amz-object-lock-retain-until-date".to_owned(),
                retain_until.to_rfc3339(),
            );
        }
        // Always emit the legal-hold flag when any lock state is
        // present so an explicit "OFF" is propagated too (an absent
        // header is ambiguous with "no opinion" on the destination).
        replica_meta.insert(
            "x-amz-object-lock-legal-hold".to_owned(),
            if lock.legal_hold_on { "ON" } else { "OFF" }.to_owned(),
        );
    }

    let dest_bucket = rule.destination_bucket.clone();
    // v0.8.2 #61: when the source PUT was a versioned write, the
    // override carries the storage-side shadow key
    // (`<key>.__s4ver__/<vid>`); otherwise we use the logical key.
    let dest_key = destination_key_override.unwrap_or_else(|| source_key.clone());
    // Retry loop: up to RETRY_ATTEMPTS tries with exponential backoff
    // between failures; every exit path below is an explicit `return`.
    for attempt in 0..RETRY_ATTEMPTS {
        // v0.8.2 #61 C-3: pre-PUT generation check. If a newer
        // generation has already stamped a terminal status on this
        // (bucket, key), our retry is stale — silently drop the
        // destination write so we don't roll the destination back to
        // older bytes. We use `record_status_if_newer` with the
        // **current** entry's status as a no-op when we're not stale,
        // but the cheap path is to peek and bail.
        if let Some(entry) =
            crate::lock_recovery::recover_read(&manager.statuses, "replication.statuses")
                .get(&(source_bucket.clone(), source_key.clone()))
                .cloned()
            && entry.generation > generation
        {
            tracing::debug!(
                source_bucket = %source_bucket,
                source_key = %source_key,
                dest_bucket = %dest_bucket,
                rule_id = %rule.id,
                generation,
                stored_generation = entry.generation,
                "S4 replication: stale generation, dropping destination PUT"
            );
            return;
        }
        let result = do_put(
            dest_bucket.clone(),
            dest_key.clone(),
            body.clone(),
            Some(replica_meta.clone()),
        )
        .await;
        match result {
            Ok(()) => {
                // Destination PUT landed — CAS-stamp COMPLETED unless a
                // newer generation overtook us while we were awaiting.
                let accepted = manager.record_status_if_newer(
                    &source_bucket,
                    &source_key,
                    generation,
                    ReplicationStatus::Completed,
                );
                if !accepted {
                    // v0.8.2 #61 C-3: the destination PUT raced — a
                    // newer generation stamped between our pre-check
                    // and our `do_put.await`. The destination now
                    // *might* hold our stale bytes (the newer PUT
                    // could have landed after ours) but we stop
                    // re-stamping and let the newer task overwrite on
                    // its own success. Bumps the metric so operators
                    // see the race surfaced.
                    crate::metrics::record_replication_drop(&source_bucket);
                    manager.dropped_total.fetch_add(1, Ordering::Relaxed);
                    tracing::warn!(
                        source_bucket = %source_bucket,
                        source_key = %source_key,
                        dest_bucket = %dest_bucket,
                        rule_id = %rule.id,
                        generation,
                        "S4 replication: completed but a newer generation has won; \
                         status not stamped"
                    );
                    return;
                }
                crate::metrics::record_replication_replicated(&source_bucket, &dest_bucket);
                tracing::debug!(
                    source_bucket = %source_bucket,
                    source_key = %source_key,
                    dest_bucket = %dest_bucket,
                    rule_id = %rule.id,
                    generation,
                    "S4 replication: COMPLETED"
                );
                return;
            }
            Err(e) => {
                if attempt + 1 < RETRY_ATTEMPTS {
                    // Backoff doubles per attempt: RETRY_BASE_MS * 2^attempt.
                    let delay_ms = RETRY_BASE_MS * (1u64 << attempt);
                    tracing::warn!(
                        source_bucket = %source_bucket,
                        source_key = %source_key,
                        dest_bucket = %dest_bucket,
                        attempt = attempt + 1,
                        generation,
                        error = %e,
                        "S4 replication: attempt failed, retrying"
                    );
                    tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
                    continue;
                }
                // CAS the terminal Failed too — a newer generation that
                // succeeded must not be rolled back to Failed.
                let accepted = manager.record_status_if_newer(
                    &source_bucket,
                    &source_key,
                    generation,
                    ReplicationStatus::Failed,
                );
                // Drop accounting happens regardless of whether the
                // Failed stamp was accepted; the log carries the flag.
                manager.dropped_total.fetch_add(1, Ordering::Relaxed);
                crate::metrics::record_replication_drop(&source_bucket);
                tracing::warn!(
                    source_bucket = %source_bucket,
                    source_key = %source_key,
                    dest_bucket = %dest_bucket,
                    rule_id = %rule.id,
                    generation,
                    error = %e,
                    accepted_failed_stamp = accepted,
                    "S4 replication: FAILED after {RETRY_ATTEMPTS} attempts (drop counter bumped)"
                );
                return;
            }
        }
    }
}
838
839#[cfg(test)]
840mod tests {
841    use super::*;
842    use std::sync::Mutex;
843
844    fn rule(
845        id: &str,
846        priority: u32,
847        enabled: bool,
848        prefix: Option<&str>,
849        tags: &[(&str, &str)],
850        dest: &str,
851    ) -> ReplicationRule {
852        ReplicationRule {
853            id: id.to_owned(),
854            priority,
855            status_enabled: enabled,
856            filter: ReplicationFilter {
857                prefix: prefix.map(str::to_owned),
858                tags: tags
859                    .iter()
860                    .map(|(k, v)| ((*k).to_owned(), (*v).to_owned()))
861                    .collect(),
862            },
863            destination_bucket: dest.to_owned(),
864            destination_storage_class: None,
865        }
866    }
867
868    #[test]
869    fn match_rule_prefix_filter_match_and_miss() {
870        let mgr = ReplicationManager::new();
871        mgr.put(
872            "src",
873            ReplicationConfig {
874                role: "arn:aws:iam::000:role/s4-test".into(),
875                rules: vec![rule("r1", 1, true, Some("logs/"), &[], "dst")],
876            },
877        );
878        assert!(mgr.match_rule("src", "logs/2026/01/01.log", &[]).is_some());
879        assert!(mgr.match_rule("src", "uploads/foo.bin", &[]).is_none());
880    }
881
882    #[test]
883    fn match_rule_no_config_for_bucket() {
884        let mgr = ReplicationManager::new();
885        assert!(mgr.match_rule("ghost", "k", &[]).is_none());
886    }
887
888    #[test]
889    fn match_rule_priority_picks_highest() {
890        let mgr = ReplicationManager::new();
891        mgr.put(
892            "src",
893            ReplicationConfig {
894                role: "arn".into(),
895                rules: vec![
896                    rule("low", 1, true, Some(""), &[], "dst-low"),
897                    rule("high", 10, true, Some(""), &[], "dst-high"),
898                    rule("mid", 5, true, Some(""), &[], "dst-mid"),
899                ],
900            },
901        );
902        let picked = mgr.match_rule("src", "any.bin", &[]).expect("match");
903        assert_eq!(picked.id, "high");
904        assert_eq!(picked.destination_bucket, "dst-high");
905    }
906
907    #[test]
908    fn match_rule_priority_tie_breaker_is_declaration_order() {
909        let mgr = ReplicationManager::new();
910        mgr.put(
911            "src",
912            ReplicationConfig {
913                role: "arn".into(),
914                rules: vec![
915                    rule("first", 5, true, Some(""), &[], "dst-first"),
916                    rule("second", 5, true, Some(""), &[], "dst-second"),
917                ],
918            },
919        );
920        let picked = mgr.match_rule("src", "k", &[]).expect("match");
921        assert_eq!(
922            picked.id, "first",
923            "tie on priority must keep the earlier rule"
924        );
925    }
926
927    #[test]
928    fn match_rule_tag_filter_and_of_all_tags() {
929        let mgr = ReplicationManager::new();
930        mgr.put(
931            "src",
932            ReplicationConfig {
933                role: "arn".into(),
934                rules: vec![rule(
935                    "r-tags",
936                    1,
937                    true,
938                    None,
939                    &[("env", "prod"), ("tier", "gold")],
940                    "dst",
941                )],
942            },
943        );
944        // Both tags present → match.
945        assert!(
946            mgr.match_rule(
947                "src",
948                "k",
949                &[
950                    ("env".into(), "prod".into()),
951                    ("tier".into(), "gold".into()),
952                    ("extra".into(), "ignored".into())
953                ]
954            )
955            .is_some(),
956            "all required tags present (extras OK) must match"
957        );
958        // Only one tag → AND fails.
959        assert!(
960            mgr.match_rule("src", "k", &[("env".into(), "prod".into())])
961                .is_none(),
962            "missing one of the required tags must not match"
963        );
964        // Wrong tag value → AND fails.
965        assert!(
966            mgr.match_rule(
967                "src",
968                "k",
969                &[("env".into(), "dev".into()), ("tier".into(), "gold".into())]
970            )
971            .is_none(),
972            "wrong value on a required tag must not match"
973        );
974    }
975
976    #[test]
977    fn match_rule_status_disabled_never_matches() {
978        let mgr = ReplicationManager::new();
979        mgr.put(
980            "src",
981            ReplicationConfig {
982                role: "arn".into(),
983                rules: vec![rule("disabled", 100, false, None, &[], "dst")],
984            },
985        );
986        assert!(
987            mgr.match_rule("src", "anything", &[]).is_none(),
988            "status_enabled=false must not match even at high priority"
989        );
990    }
991
992    #[test]
993    fn record_and_lookup_status_round_trip() {
994        let mgr = ReplicationManager::new();
995        assert!(mgr.lookup_status("b", "k").is_none());
996        mgr.record_status("b", "k", ReplicationStatus::Pending);
997        assert_eq!(
998            mgr.lookup_status("b", "k"),
999            Some(ReplicationStatus::Pending)
1000        );
1001        mgr.record_status("b", "k", ReplicationStatus::Completed);
1002        assert_eq!(
1003            mgr.lookup_status("b", "k"),
1004            Some(ReplicationStatus::Completed)
1005        );
1006    }
1007
1008    #[test]
1009    fn json_round_trip_preserves_config_and_statuses() {
1010        let mgr = ReplicationManager::new();
1011        mgr.put(
1012            "src",
1013            ReplicationConfig {
1014                role: "arn:aws:iam::000:role/s4".into(),
1015                rules: vec![rule(
1016                    "r1",
1017                    7,
1018                    true,
1019                    Some("docs/"),
1020                    &[("env", "prod")],
1021                    "dst",
1022                )],
1023            },
1024        );
1025        mgr.record_status("src", "docs/a.pdf", ReplicationStatus::Completed);
1026        let json = mgr.to_json().expect("to_json");
1027        let mgr2 = ReplicationManager::from_json(&json).expect("from_json");
1028        assert_eq!(mgr.get("src"), mgr2.get("src"));
1029        assert_eq!(
1030            mgr2.lookup_status("src", "docs/a.pdf"),
1031            Some(ReplicationStatus::Completed)
1032        );
1033    }
1034
1035    #[test]
1036    fn delete_is_idempotent() {
1037        let mgr = ReplicationManager::new();
1038        mgr.delete("never-existed");
1039        mgr.put(
1040            "b",
1041            ReplicationConfig {
1042                role: "arn".into(),
1043                rules: vec![rule("r1", 1, true, None, &[], "dst")],
1044            },
1045        );
1046        mgr.delete("b");
1047        assert!(mgr.get("b").is_none());
1048    }
1049
1050    #[test]
1051    fn put_replaces_previous_config() {
1052        let mgr = ReplicationManager::new();
1053        mgr.put(
1054            "b",
1055            ReplicationConfig {
1056                role: "arn".into(),
1057                rules: vec![rule("old", 1, true, None, &[], "dst-old")],
1058            },
1059        );
1060        mgr.put(
1061            "b",
1062            ReplicationConfig {
1063                role: "arn".into(),
1064                rules: vec![rule("new", 1, true, None, &[], "dst-new")],
1065            },
1066        );
1067        let cfg = mgr.get("b").expect("config");
1068        assert_eq!(cfg.rules.len(), 1);
1069        assert_eq!(cfg.rules[0].id, "new");
1070        assert_eq!(cfg.rules[0].destination_bucket, "dst-new");
1071    }
1072
1073    #[tokio::test]
1074    async fn replicate_object_happy_path_marks_completed() {
1075        type Captured = Vec<(
1076            String,
1077            String,
1078            bytes::Bytes,
1079            Option<HashMap<String, String>>,
1080        )>;
1081        let mgr = Arc::new(ReplicationManager::new());
1082        let captured: Arc<Mutex<Captured>> = Arc::new(Mutex::new(Vec::new()));
1083        let captured_cl = Arc::clone(&captured);
1084
1085        let do_put = move |dest: String,
1086                           key: String,
1087                           body: bytes::Bytes,
1088                           meta: Option<HashMap<String, String>>| {
1089            let captured = Arc::clone(&captured_cl);
1090            async move {
1091                captured.lock().unwrap().push((dest, key, body, meta));
1092                Ok::<(), String>(())
1093            }
1094        };
1095
1096        replicate_object(
1097            rule("r1", 1, true, None, &[], "dst"),
1098            "src".into(),
1099            "obj.bin".into(),
1100            bytes::Bytes::from_static(b"hello"),
1101            Some(HashMap::from([(
1102                "content-type".into(),
1103                "text/plain".into(),
1104            )])),
1105            do_put,
1106            Arc::clone(&mgr),
1107            mgr.next_generation(),
1108            None,
1109            None,
1110        )
1111        .await;
1112
1113        assert_eq!(
1114            mgr.lookup_status("src", "obj.bin"),
1115            Some(ReplicationStatus::Completed)
1116        );
1117        assert_eq!(mgr.dropped_total.load(Ordering::Relaxed), 0);
1118        let cap = captured.lock().unwrap();
1119        assert_eq!(cap.len(), 1, "do_put must run exactly once on success");
1120        assert_eq!(cap[0].0, "dst");
1121        assert_eq!(cap[0].1, "obj.bin");
1122        assert_eq!(cap[0].2.as_ref(), b"hello");
1123        let meta = cap[0].3.as_ref().expect("metadata stamped");
1124        assert_eq!(
1125            meta.get("x-amz-replication-status").map(String::as_str),
1126            Some("REPLICA"),
1127            "destination meta must carry the REPLICA stamp"
1128        );
1129        assert_eq!(
1130            meta.get("content-type").map(String::as_str),
1131            Some("text/plain")
1132        );
1133    }
1134
1135    #[tokio::test]
1136    async fn replicate_object_failure_after_retry_budget_marks_failed_and_bumps_drop() {
1137        let mgr = Arc::new(ReplicationManager::new());
1138        let attempts: Arc<Mutex<u32>> = Arc::new(Mutex::new(0));
1139        let attempts_cl = Arc::clone(&attempts);
1140
1141        let do_put = move |_dest: String,
1142                           _key: String,
1143                           _body: bytes::Bytes,
1144                           _meta: Option<HashMap<String, String>>| {
1145            let attempts = Arc::clone(&attempts_cl);
1146            async move {
1147                *attempts.lock().unwrap() += 1;
1148                Err::<(), String>("simulated destination 5xx".into())
1149            }
1150        };
1151
1152        replicate_object(
1153            rule("r-fail", 1, true, None, &[], "dst"),
1154            "src".into(),
1155            "doomed.bin".into(),
1156            bytes::Bytes::from_static(b"x"),
1157            None,
1158            do_put,
1159            Arc::clone(&mgr),
1160            mgr.next_generation(),
1161            None,
1162            None,
1163        )
1164        .await;
1165
1166        assert_eq!(
1167            *attempts.lock().unwrap(),
1168            RETRY_ATTEMPTS,
1169            "must retry exactly the configured budget"
1170        );
1171        assert_eq!(
1172            mgr.lookup_status("src", "doomed.bin"),
1173            Some(ReplicationStatus::Failed)
1174        );
1175        assert_eq!(
1176            mgr.dropped_total.load(Ordering::Relaxed),
1177            1,
1178            "drop counter must bump exactly once after retry budget exhausted"
1179        );
1180    }
1181
1182    #[test]
1183    fn replication_status_aws_strings_match_spec() {
1184        assert_eq!(ReplicationStatus::Pending.as_aws_str(), "PENDING");
1185        assert_eq!(ReplicationStatus::Completed.as_aws_str(), "COMPLETED");
1186        assert_eq!(ReplicationStatus::Failed.as_aws_str(), "FAILED");
1187        assert_eq!(ReplicationStatus::Replica.as_aws_str(), "REPLICA");
1188    }
1189
1190    // ----- v0.8.2 #61: generation token CAS unit tests -----
1191
1192    #[test]
1193    fn record_status_if_newer_accepts_higher_generation() {
1194        let mgr = ReplicationManager::new();
1195        // First stamp at gen=5 — no prior entry, accepted.
1196        assert!(mgr.record_status_if_newer("b", "k", 5, ReplicationStatus::Pending,));
1197        // Higher generation overrides.
1198        assert!(mgr.record_status_if_newer("b", "k", 7, ReplicationStatus::Completed,));
1199        assert_eq!(
1200            mgr.lookup_status("b", "k"),
1201            Some(ReplicationStatus::Completed)
1202        );
1203    }
1204
1205    #[test]
1206    fn record_status_if_newer_rejects_stale_generation() {
1207        let mgr = ReplicationManager::new();
1208        // Newer PUT lands first.
1209        assert!(mgr.record_status_if_newer("b", "k", 10, ReplicationStatus::Completed,));
1210        // Older retry must be rejected — destination must not roll
1211        // back to "alpha" once "beta" has stamped Completed.
1212        let accepted = mgr.record_status_if_newer("b", "k", 3, ReplicationStatus::Completed);
1213        assert!(!accepted, "stale generation must be rejected");
1214        // Stored entry stays at the newer generation's terminal state.
1215        assert_eq!(
1216            mgr.lookup_status("b", "k"),
1217            Some(ReplicationStatus::Completed)
1218        );
1219    }
1220
1221    #[test]
1222    fn record_status_if_newer_accepts_equal_generation() {
1223        // Same generation may legitimately re-stamp (Pending →
1224        // Completed transition on the same task). The CAS is `>=`
1225        // not `>`.
1226        let mgr = ReplicationManager::new();
1227        assert!(mgr.record_status_if_newer("b", "k", 42, ReplicationStatus::Pending,));
1228        assert!(mgr.record_status_if_newer("b", "k", 42, ReplicationStatus::Completed,));
1229        assert_eq!(
1230            mgr.lookup_status("b", "k"),
1231            Some(ReplicationStatus::Completed)
1232        );
1233    }
1234
1235    #[test]
1236    fn next_generation_is_monotonic() {
1237        let mgr = ReplicationManager::new();
1238        let g1 = mgr.next_generation();
1239        let g2 = mgr.next_generation();
1240        let g3 = mgr.next_generation();
1241        assert!(g2 > g1, "g2={g2} must exceed g1={g1}");
1242        assert!(g3 > g2, "g3={g3} must exceed g2={g2}");
1243        assert_eq!(g2, g1 + 1);
1244        assert_eq!(g3, g2 + 1);
1245    }
1246
1247    #[test]
1248    fn snapshot_pre_61_format_loads_with_zero_generation() {
1249        // Pre-v0.8.2 #61 snapshot shape: bare `ReplicationStatus`,
1250        // no `next_generation` field. The `untagged` enum + serde
1251        // default must round-trip lossily into the new shape, with
1252        // `generation = 0` (= guaranteed loseable to next real PUT).
1253        let pre_61_json = r#"{
1254            "by_bucket": {},
1255            "statuses": [
1256                [["src", "k"], "Completed"]
1257            ]
1258        }"#;
1259        let mgr =
1260            ReplicationManager::from_json(pre_61_json).expect("pre-#61 snapshot must deserialise");
1261        assert_eq!(
1262            mgr.lookup_status("src", "k"),
1263            Some(ReplicationStatus::Completed)
1264        );
1265        // First mint after restore is `1` (max(0, 1)).
1266        assert_eq!(mgr.next_generation(), 1);
1267        // The `generation = 0` legacy entry is overridable by any
1268        // real PUT (= a generation >= 1).
1269        assert!(mgr.record_status_if_newer("src", "k", 1, ReplicationStatus::Pending,));
1270    }
1271
1272    // ----- v0.8.3 #66: sweep + TTL unit tests (H-5 audit fix) -----
1273
1274    /// Helper: install a `(bucket, key)` entry with an explicit
1275    /// `recorded_at` so the sweep test can pin the clock at a known
1276    /// offset from "now". Bypasses `record_status_if_newer` because
1277    /// that always stamps with `Utc::now()`.
1278    fn install_entry_with_recorded_at(
1279        mgr: &ReplicationManager,
1280        bucket: &str,
1281        key: &str,
1282        status: ReplicationStatus,
1283        recorded_at: DateTime<Utc>,
1284    ) {
1285        crate::lock_recovery::recover_write(&mgr.statuses, "replication.statuses").insert(
1286            (bucket.to_owned(), key.to_owned()),
1287            ReplicationStatusEntry {
1288                status,
1289                generation: 1,
1290                recorded_at,
1291            },
1292        );
1293    }
1294
1295    #[test]
1296    fn sweep_stale_drops_completed_past_ttl() {
1297        // Three terminal entries: Completed -10h, Failed -10h, Completed -1h.
1298        // sweep_stale(now, 5h) → drops the two -10h entries, keeps the
1299        // recent Completed.
1300        let mgr = ReplicationManager::new();
1301        let now = Utc::now();
1302        install_entry_with_recorded_at(
1303            &mgr,
1304            "src",
1305            "old-completed",
1306            ReplicationStatus::Completed,
1307            now - chrono::Duration::hours(10),
1308        );
1309        install_entry_with_recorded_at(
1310            &mgr,
1311            "src",
1312            "old-failed",
1313            ReplicationStatus::Failed,
1314            now - chrono::Duration::hours(10),
1315        );
1316        install_entry_with_recorded_at(
1317            &mgr,
1318            "src",
1319            "recent-completed",
1320            ReplicationStatus::Completed,
1321            now - chrono::Duration::hours(1),
1322        );
1323
1324        let n = mgr.sweep_stale(now, chrono::Duration::hours(5));
1325        assert_eq!(n, 2, "two terminal entries past 5h TTL must be swept");
1326        assert!(
1327            mgr.lookup_status("src", "old-completed").is_none(),
1328            "Completed past TTL must be removed"
1329        );
1330        assert!(
1331            mgr.lookup_status("src", "old-failed").is_none(),
1332            "Failed past TTL must be removed"
1333        );
1334        assert_eq!(
1335            mgr.lookup_status("src", "recent-completed"),
1336            Some(ReplicationStatus::Completed),
1337            "Completed within TTL must survive"
1338        );
1339    }
1340
1341    #[test]
1342    fn sweep_stale_keeps_pending_regardless_of_age() {
1343        // A Pending entry stamped 100h ago is **still in-flight**
1344        // (the dispatcher is racing toward a terminal stamp). Sweeping
1345        // it would lose the eventual Completed/Failed outcome and let
1346        // a stale generation re-emerge under the original key with no
1347        // recorded history.
1348        let mgr = ReplicationManager::new();
1349        let now = Utc::now();
1350        install_entry_with_recorded_at(
1351            &mgr,
1352            "src",
1353            "ancient-pending",
1354            ReplicationStatus::Pending,
1355            now - chrono::Duration::hours(100),
1356        );
1357
1358        let n = mgr.sweep_stale(now, chrono::Duration::hours(5));
1359        assert_eq!(n, 0, "Pending entries must never be swept");
1360        assert_eq!(
1361            mgr.lookup_status("src", "ancient-pending"),
1362            Some(ReplicationStatus::Pending),
1363            "ancient Pending must still be present"
1364        );
1365    }
1366
1367    #[test]
1368    fn recorded_at_back_compat_default_now_on_deserialize() {
1369        // A pre-#66 snapshot whose status entries omit `recorded_at`
1370        // must deserialise with `recorded_at = Utc::now()` (= "freshly
1371        // observed at restart"). This delays the first sweep by one
1372        // TTL window but never drops a still-relevant entry early.
1373        // Use the v0.8.2 #61 entry shape (status + generation, no
1374        // recorded_at) to verify the `#[serde(default = "Utc::now")]`
1375        // applies on inner-entry deserialisation too.
1376        let pre_66_json = r#"{
1377            "by_bucket": {},
1378            "statuses": [
1379                [["src", "k"], { "status": "Completed", "generation": 7 }]
1380            ],
1381            "next_generation": 8
1382        }"#;
1383        let before = Utc::now();
1384        let mgr = ReplicationManager::from_json(pre_66_json)
1385            .expect("pre-#66 snapshot with no `recorded_at` must deserialise");
1386        let after = Utc::now();
1387
1388        // Status preserved.
1389        assert_eq!(
1390            mgr.lookup_status("src", "k"),
1391            Some(ReplicationStatus::Completed),
1392        );
1393
1394        // recorded_at defaulted to Utc::now() at deserialise time —
1395        // peek the inner entry to verify the timestamp is in the
1396        // [before, after] window of the from_json call.
1397        let entries = crate::lock_recovery::recover_read(&mgr.statuses, "replication.statuses");
1398        let entry = entries
1399            .get(&("src".to_owned(), "k".to_owned()))
1400            .expect("entry must exist");
1401        assert!(
1402            entry.recorded_at >= before && entry.recorded_at <= after,
1403            "recorded_at default must be Utc::now() at deserialise time \
1404             (got {:?}, expected within [{:?}, {:?}])",
1405            entry.recorded_at,
1406            before,
1407            after
1408        );
1409        assert_eq!(entry.generation, 7, "generation must round-trip");
1410
1411        // A sweep with TTL 1h immediately after must NOT drop this
1412        // entry (recorded_at ≈ now, well within TTL).
1413        drop(entries);
1414        let n = mgr.sweep_stale(Utc::now(), chrono::Duration::hours(1));
1415        assert_eq!(
1416            n, 0,
1417            "freshly-defaulted recorded_at must survive a 1h-TTL sweep"
1418        );
1419    }
1420
1421    // ----- v0.8.3 #68: Object Lock state propagation unit tests (audit M-1) -----
1422
1423    /// When the source object carried an Object Lock state, the
1424    /// dispatcher must inject the AWS-wire lock headers
1425    /// (`x-amz-object-lock-mode`, `-retain-until-date`, `-legal-hold`)
1426    /// into the destination PUT's metadata map so the destination side
1427    /// can persist the same WORM posture on the replica.
1428    #[tokio::test]
1429    async fn replicate_with_source_lock_state_attaches_headers() {
1430        type Captured = Vec<(
1431            String,
1432            String,
1433            bytes::Bytes,
1434            Option<HashMap<String, String>>,
1435        )>;
1436        let mgr = Arc::new(ReplicationManager::new());
1437        let captured: Arc<Mutex<Captured>> = Arc::new(Mutex::new(Vec::new()));
1438        let captured_cl = Arc::clone(&captured);
1439
1440        let do_put = move |dest: String,
1441                           key: String,
1442                           body: bytes::Bytes,
1443                           meta: Option<HashMap<String, String>>| {
1444            let captured = Arc::clone(&captured_cl);
1445            async move {
1446                captured.lock().unwrap().push((dest, key, body, meta));
1447                Ok::<(), String>(())
1448            }
1449        };
1450
1451        let retain_until = Utc::now() + chrono::Duration::days(30);
1452        let lock_state = crate::object_lock::ObjectLockState {
1453            mode: Some(crate::object_lock::LockMode::Compliance),
1454            retain_until: Some(retain_until),
1455            legal_hold_on: true,
1456        };
1457
1458        replicate_object(
1459            rule("r-locked", 1, true, None, &[], "dst"),
1460            "src".into(),
1461            "worm.bin".into(),
1462            bytes::Bytes::from_static(b"locked-payload"),
1463            None,
1464            do_put,
1465            Arc::clone(&mgr),
1466            mgr.next_generation(),
1467            None,
1468            Some(lock_state),
1469        )
1470        .await;
1471
1472        let cap = captured.lock().unwrap();
1473        assert_eq!(cap.len(), 1, "do_put must run exactly once on success");
1474        let meta = cap[0].3.as_ref().expect("metadata stamped");
1475        assert_eq!(
1476            meta.get("x-amz-object-lock-mode").map(String::as_str),
1477            Some("COMPLIANCE"),
1478            "Compliance mode header must be propagated"
1479        );
1480        let stamped_until = meta
1481            .get("x-amz-object-lock-retain-until-date")
1482            .expect("retain-until header must be propagated");
1483        // RFC-3339 round-trip: parse back and compare with the source
1484        // retain_until (1s slack for sub-second truncation).
1485        let parsed: chrono::DateTime<chrono::FixedOffset> =
1486            chrono::DateTime::parse_from_rfc3339(stamped_until)
1487                .expect("retain-until must be RFC-3339");
1488        let diff = (parsed.with_timezone(&Utc) - retain_until)
1489            .num_seconds()
1490            .abs();
1491        assert!(diff <= 1, "retain-until off by {diff}s");
1492        assert_eq!(
1493            meta.get("x-amz-object-lock-legal-hold").map(String::as_str),
1494            Some("ON"),
1495            "legal hold state must be propagated as ON"
1496        );
1497        // The REPLICA stamp must still be present alongside the lock
1498        // headers (lock propagation must not displace the replica
1499        // marker that lets HEAD distinguish replica-vs-direct PUT).
1500        assert_eq!(
1501            meta.get("x-amz-replication-status").map(String::as_str),
1502            Some("REPLICA"),
1503        );
1504    }
1505
1506    /// Symmetric: no source lock state → no lock headers leak into
1507    /// the destination metadata (legacy behaviour preserved for the
1508    /// non-WORM PUT path).
1509    #[tokio::test]
1510    async fn replicate_without_source_lock_state_no_headers_added() {
1511        type Captured = Vec<(
1512            String,
1513            String,
1514            bytes::Bytes,
1515            Option<HashMap<String, String>>,
1516        )>;
1517        let mgr = Arc::new(ReplicationManager::new());
1518        let captured: Arc<Mutex<Captured>> = Arc::new(Mutex::new(Vec::new()));
1519        let captured_cl = Arc::clone(&captured);
1520
1521        let do_put = move |dest: String,
1522                           key: String,
1523                           body: bytes::Bytes,
1524                           meta: Option<HashMap<String, String>>| {
1525            let captured = Arc::clone(&captured_cl);
1526            async move {
1527                captured.lock().unwrap().push((dest, key, body, meta));
1528                Ok::<(), String>(())
1529            }
1530        };
1531
1532        replicate_object(
1533            rule("r-plain", 1, true, None, &[], "dst"),
1534            "src".into(),
1535            "plain.bin".into(),
1536            bytes::Bytes::from_static(b"plain-payload"),
1537            None,
1538            do_put,
1539            Arc::clone(&mgr),
1540            mgr.next_generation(),
1541            None,
1542            None,
1543        )
1544        .await;
1545
1546        let cap = captured.lock().unwrap();
1547        let meta = cap[0].3.as_ref().expect("metadata stamped");
1548        assert!(
1549            meta.get("x-amz-object-lock-mode").is_none(),
1550            "no lock state ⇒ no mode header (got {:?})",
1551            meta.get("x-amz-object-lock-mode")
1552        );
1553        assert!(
1554            meta.get("x-amz-object-lock-retain-until-date").is_none(),
1555            "no lock state ⇒ no retain-until header"
1556        );
1557        assert!(
1558            meta.get("x-amz-object-lock-legal-hold").is_none(),
1559            "no lock state ⇒ no legal-hold header"
1560        );
1561    }
1562
1563    /// v0.8.4 #77 (audit H-8): a panic inside the `statuses` write
1564    /// guard poisons the lock. `to_json` must recover via
1565    /// [`crate::lock_recovery::recover_read`] and surface the data
1566    /// instead of re-panicking on the SIGUSR1 dump-back path.
1567    #[test]
1568    fn replication_to_json_after_panic_recovers_via_poison() {
1569        let mgr = Arc::new(ReplicationManager::new());
1570        mgr.put(
1571            "src",
1572            ReplicationConfig {
1573                role: "arn:aws:iam::000:role/s4".into(),
1574                rules: vec![rule("r1", 1, true, None, &[], "dst")],
1575            },
1576        );
1577        mgr.record_status("src", "k", ReplicationStatus::Pending);
1578
1579        let mgr_cl = Arc::clone(&mgr);
1580        let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
1581            let mut g = mgr_cl.statuses.write().expect("clean lock");
1582            g.insert(
1583                ("src".into(), "k2".into()),
1584                ReplicationStatusEntry {
1585                    status: ReplicationStatus::Pending,
1586                    generation: 0,
1587                    recorded_at: Utc::now(),
1588                },
1589            );
1590            panic!("force-poison");
1591        }));
1592        assert!(
1593            mgr.statuses.is_poisoned(),
1594            "write panic must poison statuses lock"
1595        );
1596        let json = mgr.to_json().expect("to_json after poison must succeed");
1597        let mgr2 = ReplicationManager::from_json(&json).expect("from_json");
1598        assert_eq!(
1599            mgr2.lookup_status("src", "k"),
1600            Some(ReplicationStatus::Pending),
1601            "recovered snapshot keeps original status"
1602        );
1603    }
1604}