// s4_server/replication.rs
//! Bucket-to-bucket asynchronous replication (v0.6 #40).
//!
//! AWS S3 Cross-Region Replication (CRR) lets a bucket owner declare a
//! `ReplicationConfiguration` whose rules say "for every PUT to this
//! bucket that matches `<filter>`, asynchronously copy the new object to
//! `<destination_bucket>`". The source object grows an
//! `x-amz-replication-status` of `PENDING` → `COMPLETED` (or `FAILED`),
//! the replica gets stamped `REPLICA`, and consumers can poll HEAD on
//! either side to see how the replication is going.
//!
//! ## v0.6 #40 scope (single-instance only)
//!
//! - **Same S4 endpoint** — the source bucket and the destination bucket
//!   live on the same `S4Service`. True cross-region (multi-instance,
//!   wire-replicated) replication is a v0.7+ follow-up that needs an
//!   `aws-sdk-s3` PUT to a remote endpoint with its own credentials.
//! - **Async only** — the originating `put_object` returns as soon as
//!   the source backend write is done. The replica PUT happens on a
//!   detached `tokio::spawn` task and never blocks the client. There is
//!   no synchronous `replication_required` mode (it would defeat the
//!   whole point of CRR being asynchronous in the first place).
//! - **Retry budget = 3 attempts** with exponential backoff (50ms,
//!   100ms, 200ms). On exhaustion the per-(bucket, key) status flips to
//!   `Failed`, `dropped_total` is bumped, and a warn-level log line is
//!   emitted so operators see the loss in `s4_replication_dropped_total`.
//! - **Highest-priority rule wins** when multiple rules match a single
//!   object key (S3 spec). Ties are broken by declaration order
//!   (deterministic for tests).
//! - **`status_enabled = false` rules never match**, mirroring the AWS
//!   `ReplicationRuleStatus::Disabled` semantics — the rule sits in the
//!   configuration document but is inert.
//! - **Replica is full-body** — there is no delta replication, no
//!   incremental fetch, no batching. Every matching PUT triggers one
//!   independent destination PUT.
//!
//! ## What is NOT in v0.6 #40
//!
//! - Delete-marker replication (S3's `DeleteMarkerReplication` block) —
//!   v0.7+. Right now `delete_object` does not fan out a destination
//!   delete; the replica drifts from the source on the source's deletion.
//! - Replication of multipart-completed objects through a per-part
//!   copy path. The whole compose-then-PUT result of CMU is replicated
//!   as a single PUT, which is fine for single-instance and matches
//!   what AWS does for source objects ≤ 5 GiB.
//! - SSE-KMS-encrypted replicas with KMS-key-id rewriting per the
//!   `SourceSelectionCriteria` block (the source's wrapped DEK is
//!   replicated as-is — fine for single-instance because the same KMS
//!   backend unwraps both copies).
//! - Replication metrics (RTC) — a v0.7+ follow-up that wires a
//!   `replication_lag_seconds` histogram.

use std::collections::HashMap;
use std::sync::Arc;
use std::sync::RwLock;
use std::sync::atomic::{AtomicU64, Ordering};

use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};

/// Per-(bucket, key) replication state, surfaced as the
/// `x-amz-replication-status` HEAD/GET response header. Values match the
/// AWS wire form exactly (see [`ReplicationStatus::as_aws_str`]).
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum ReplicationStatus {
    /// Replication has been queued (a matching rule fired and the
    /// dispatcher task has been spawned) but the destination PUT has
    /// not yet succeeded.
    Pending,
    /// Replication has succeeded — the replica exists in the
    /// destination bucket.
    Completed,
    /// Replication failed permanently (retry budget exhausted).
    Failed,
    /// Stamped on the destination side so the replica is
    /// distinguishable from a normal PUT, matching AWS CRR's
    /// "replica stamp" behaviour.
    Replica,
}

impl ReplicationStatus {
    /// AWS wire-string form (upper-case). Caller stamps it on the
    /// response as the `x-amz-replication-status` header.
    #[must_use]
    pub fn as_aws_str(&self) -> &'static str {
        match self {
            Self::Pending => "PENDING",
            Self::Completed => "COMPLETED",
            Self::Failed => "FAILED",
            Self::Replica => "REPLICA",
        }
    }
}

/// Filter on a `ReplicationRule` — the AND of a key-prefix predicate
/// and a tag predicate. AWS S3's wire form uses a sum type
/// (`Prefix | Tag | And { Prefix, Tags }`); we collapse those into
/// the single representation that the in-memory matcher
/// ([`filter_matches`]) needs.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct ReplicationFilter {
    /// Empty / `None` means "any prefix".
    pub prefix: Option<String>,
    /// AND of every tag pair — every entry here must be present in the
    /// object's tag set for the rule to fire. Empty means "no tag
    /// predicate".
    pub tags: Vec<(String, String)>,
}

/// One replication rule. Each rule independently decides whether to
/// copy an object based on the (key, tags) tuple; the replication
/// manager picks the highest-priority matching rule when multiple
/// fire on the same object (see [`ReplicationManager::match_rule`]).
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct ReplicationRule {
    /// Operator-supplied id (max 255 chars per AWS).
    pub id: String,
    /// Higher number = higher priority (S3 spec). When two rules match
    /// the same key, the higher `priority` wins; ties broken by
    /// declaration order.
    pub priority: u32,
    /// `false` makes the rule inert without removing it from the
    /// configuration document — mirrors AWS's `Disabled` status.
    pub status_enabled: bool,
    /// Subset of source-bucket objects this rule applies to.
    pub filter: ReplicationFilter,
    /// Where to copy matching objects. Plain bucket name (no ARN) for
    /// the v0.6 #40 single-instance scope.
    pub destination_bucket: String,
    /// Optional storage-class override on the replica. `None` = keep
    /// the source's class (S3 default).
    pub destination_storage_class: Option<String>,
}

/// Per-bucket replication configuration (the `PutBucketReplication`
/// document body).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct ReplicationConfig {
    /// Placeholder ARN — not consumed by S4 itself, kept for AWS wire
    /// compatibility (the `Role` field is mandatory in the
    /// `PutBucketReplication` XML payload).
    pub role: String,
    /// Rules in declaration order; priority ties are broken by this
    /// ordering during matching.
    pub rules: Vec<ReplicationRule>,
}

/// Per-(source_bucket, source_key) replication status entry, paired
/// with the **generation token** of the source PUT that produced it.
///
/// ## v0.8.2 #61 — generation token
///
/// Each `put_object` (or `complete_multipart_upload`) on a source key
/// pulls a fresh, monotonically-increasing `generation` from the
/// manager. The detached replication task carries that generation and
/// only stamps the status when its generation is `>=` the stored one
/// (CAS-style). A stale retry whose generation has been overtaken by a
/// newer PUT is silently dropped, so the destination bucket never gets
/// rolled back to older bytes. See [`ReplicationManager::next_generation`]
/// + [`ReplicationManager::record_status_if_newer`].
///
/// ## v0.8.3 #66 — `recorded_at` for sweep + TTL (H-5 audit fix)
///
/// Each stamp records the wall-clock time the entry was last updated.
/// The hourly sweep task ([`ReplicationManager::sweep_stale`]) drops
/// terminal entries (`Completed` / `Failed`) older than the operator-
/// configured TTL, bounding the otherwise-unbounded growth of the
/// `statuses` map under workloads with many unique keys. `Pending`
/// entries are never swept (they are still in-flight and dropping them
/// would lose the eventual `Completed` / `Failed` stamp the dispatcher
/// is racing toward). Pre-#66 snapshots without `recorded_at` deserialise
/// with `Utc::now()` (= "freshly observed at restart"), which delays
/// the first sweep by one TTL cycle but never drops a still-relevant
/// entry early.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct ReplicationStatusEntry {
    /// Current replication state of the source object.
    pub status: ReplicationStatus,
    /// v0.8.2 #61: generation token of the source PUT that last stamped
    /// this entry; `0` for entries restored from pre-#61 snapshots
    /// (always overridable by the next real PUT).
    pub generation: u64,
    /// v0.8.3 #66: when the entry was last updated. The sweep drops
    /// terminal entries (Completed / Failed) older than the operator-
    /// configured TTL. Pending entries are never swept (still in-flight).
    /// Pre-#66 snapshots default to `Utc::now()` so legacy entries get
    /// one full TTL window of grace before becoming sweep-eligible.
    #[serde(default = "Utc::now")]
    pub recorded_at: DateTime<Utc>,
}

/// JSON snapshot — `bucket -> ReplicationConfig`. Mirrors the shape of
/// `notifications::NotificationSnapshot` so operators can hand-edit
/// configurations across restart cycles.
///
/// ## v0.8.2 #61 back-compat
///
/// Pre-#61 snapshots stored `Vec<((String, String), ReplicationStatus)>`
/// (status only; no generation). The new format stores
/// `Vec<((String, String), ReplicationStatusEntry)>`. Serde is set up
/// with `#[serde(untagged)]` on a wrapper enum ([`StatusOrEntry`]) so
/// old snapshots load with `generation = 0`. A `generation = 0` entry
/// is never stale — the very next PUT mints `generation = 1` which wins
/// the CAS — so the migration is loss-free.
#[derive(Debug, Default, Serialize, Deserialize)]
struct ReplicationSnapshot {
    /// `bucket -> ReplicationConfig`, cloned straight from the manager.
    by_bucket: HashMap<String, ReplicationConfig>,
    /// Per-(bucket, key) replication status. Persisted so a restart
    /// doesn't lose the COMPLETED stamp on already-replicated
    /// objects.
    statuses: Vec<((String, String), StatusOrEntry)>,
    /// v0.8.2 #61: persist the next generation so a restart doesn't
    /// reissue tokens that are still in-flight. Optional for
    /// back-compat — pre-#61 snapshots restore with `next_generation = 1`.
    #[serde(default)]
    next_generation: u64,
}

/// Back-compat wrapper for snapshot deserialisation: accepts either a
/// bare `ReplicationStatus` (pre-#61 schema) or a full
/// `ReplicationStatusEntry`. `serde(untagged)` tries the variants in
/// declaration order — the more-structured `Entry` variant first so
/// new snapshots round-trip, falling back to bare `Status` for old
/// snapshots.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(untagged)]
enum StatusOrEntry {
    /// Current (v0.8.2 #61+) schema: `{ status, generation, recorded_at }`.
    Entry(ReplicationStatusEntry),
    /// Legacy (pre-#61) schema: bare status string.
    Status(ReplicationStatus),
}

223impl StatusOrEntry {
224 fn into_entry(self) -> ReplicationStatusEntry {
225 match self {
226 Self::Entry(e) => e,
227 // v0.8.3 #66: pre-#61 snapshots have no `recorded_at`; stamp
228 // `Utc::now()` so the first sweep tick sees them as freshly
229 // observed and gives them one full TTL window of grace.
230 Self::Status(s) => ReplicationStatusEntry {
231 status: s,
232 generation: 0,
233 recorded_at: Utc::now(),
234 },
235 }
236 }
237}
238
/// In-memory manager of per-bucket replication configurations + per-
/// (bucket, key) replication statuses.
pub struct ReplicationManager {
    /// `bucket -> ReplicationConfig`, replaced wholesale by
    /// [`Self::put`] (S3's `PutBucketReplication` is upsert-at-bucket-scope).
    by_bucket: RwLock<HashMap<String, ReplicationConfig>>,
    /// Per-(source_bucket, key) replication status entry (status +
    /// generation token). Looked up by `head_object` / `get_object` to
    /// stamp `x-amz-replication-status` on the response.
    statuses: RwLock<HashMap<(String, String), ReplicationStatusEntry>>,
    /// v0.8.2 #61: monotonic per-source-PUT generation counter. Each
    /// `put_object` (or `complete_multipart_upload`) on a replicated
    /// source bucket calls [`Self::next_generation`] before spawning
    /// its detached replication task. The dispatcher carries the
    /// generation through to [`Self::record_status_if_newer`], which
    /// drops the stamp + the destination write when a newer
    /// generation has already won — guaranteeing the destination
    /// can't be rolled back by a slow retry.
    pub next_generation: AtomicU64,
    /// Bumped each time the dispatcher exhausts its retry budget on a
    /// destination PUT. Exposed publicly so the metrics layer can poll
    /// without taking the configuration lock.
    pub dropped_total: AtomicU64,
}

impl Default for ReplicationManager {
    /// Equivalent to [`ReplicationManager::new`]: no rules, no
    /// statuses, generation counter starting at 1.
    fn default() -> Self {
        Self::new()
    }
}

impl std::fmt::Debug for ReplicationManager {
    // Hand-written: only the operator-relevant `dropped_total` counter
    // is surfaced; the locked maps are elided via
    // `finish_non_exhaustive()` (printed as `..`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ReplicationManager")
            .field("dropped_total", &self.dropped_total.load(Ordering::Relaxed))
            .finish_non_exhaustive()
    }
}

276impl ReplicationManager {
277 /// Empty manager — no bucket has any replication rules. The
278 /// generation counter starts at 1 so the first PUT-issued token is
279 /// `1` (a stored entry's `generation = 0` from a pre-#61 snapshot
280 /// is then strictly less and the very next PUT wins the CAS).
281 #[must_use]
282 pub fn new() -> Self {
283 Self {
284 by_bucket: RwLock::new(HashMap::new()),
285 statuses: RwLock::new(HashMap::new()),
286 next_generation: AtomicU64::new(1),
287 dropped_total: AtomicU64::new(0),
288 }
289 }
290
291 /// v0.8.2 #61: mint a fresh, monotonically-increasing generation
292 /// token. Caller is the per-source-PUT dispatcher fork (the body-
293 /// bearing `put_object` branch, the body-less `put_object` branch,
294 /// and `complete_multipart_upload`). The token is then carried
295 /// through [`replicate_object`] to [`Self::record_status_if_newer`]
296 /// so a stale retry can be detected and dropped.
297 ///
298 /// Uses `Relaxed` ordering — we only need uniqueness +
299 /// monotonicity per atomic; the cross-thread happens-before
300 /// between PUT-A's spawn and the dispatcher reading the body is
301 /// already established by `tokio::spawn`'s implicit
302 /// `Acquire/Release` on the task queue.
303 pub fn next_generation(&self) -> u64 {
304 self.next_generation.fetch_add(1, Ordering::Relaxed)
305 }
306
307 /// `put_bucket_replication` handler entry. The bucket's existing
308 /// configuration is fully replaced (S3 spec — `PutBucketReplication`
309 /// is upsert-style at the bucket scope, not per-rule patch).
310 pub fn put(&self, bucket: &str, config: ReplicationConfig) {
311 self.by_bucket
312 .write()
313 .expect("replication state RwLock poisoned")
314 .insert(bucket.to_owned(), config);
315 }
316
317 /// `get_bucket_replication` handler entry. Returns `None` when
318 /// nothing is registered (AWS S3 returns
319 /// `ReplicationConfigurationNotFoundError` in that case; the
320 /// service-layer handler maps `None` accordingly).
321 #[must_use]
322 pub fn get(&self, bucket: &str) -> Option<ReplicationConfig> {
323 self.by_bucket
324 .read()
325 .expect("replication state RwLock poisoned")
326 .get(bucket)
327 .cloned()
328 }
329
330 /// Drop the configuration for `bucket`. Idempotent.
331 pub fn delete(&self, bucket: &str) {
332 self.by_bucket
333 .write()
334 .expect("replication state RwLock poisoned")
335 .remove(bucket);
336 }
337
338 /// Serialise the entire manager state (configurations + per-key
339 /// statuses + next generation counter) to JSON. The status entries
340 /// are emitted in the v0.8.2 #61 schema (`ReplicationStatusEntry`);
341 /// readers built before #61 will see the embedded
342 /// `{ status, generation }` shape via the `untagged` enum and
343 /// (older binaries) reject — but the production restart path always
344 /// runs the same binary against its own snapshot.
345 pub fn to_json(&self) -> Result<String, serde_json::Error> {
346 let snap = ReplicationSnapshot {
347 by_bucket: self
348 .by_bucket
349 .read()
350 .expect("replication state RwLock poisoned")
351 .clone(),
352 statuses: self
353 .statuses
354 .read()
355 .expect("replication state RwLock poisoned")
356 .iter()
357 .map(|(k, v)| (k.clone(), StatusOrEntry::Entry(v.clone())))
358 .collect(),
359 next_generation: self.next_generation.load(Ordering::Relaxed),
360 };
361 serde_json::to_string(&snap)
362 }
363
364 /// Restore a manager from a previously-emitted snapshot. The
365 /// `dropped_total` counter is reset to 0 — historical drops are
366 /// runtime metrics, not configuration.
367 ///
368 /// ## Back-compat (v0.8.2 #61)
369 ///
370 /// Pre-#61 snapshots store bare `ReplicationStatus` (no
371 /// generation). The `untagged` `StatusOrEntry` enum picks them up
372 /// and assigns `generation = 0`, which the CAS-style
373 /// [`Self::record_status_if_newer`] treats as "always overridable
374 /// by the next real PUT" — guaranteed loss-free migration. The
375 /// `next_generation` counter defaults to `1` when the snapshot
376 /// predates #61 (= `serde(default)` on the field).
377 pub fn from_json(s: &str) -> Result<Self, serde_json::Error> {
378 let snap: ReplicationSnapshot = serde_json::from_str(s)?;
379 let statuses: HashMap<(String, String), ReplicationStatusEntry> = snap
380 .statuses
381 .into_iter()
382 .map(|(k, v)| (k, v.into_entry()))
383 .collect();
384 // Pre-#61 snapshots come back with `next_generation = 0`
385 // (`serde(default)` on `u64`); start fresh at 1 so all newly-
386 // minted tokens are strictly greater than the legacy
387 // `generation = 0` entries.
388 let next_gen = snap.next_generation.max(1);
389 Ok(Self {
390 by_bucket: RwLock::new(snap.by_bucket),
391 statuses: RwLock::new(statuses),
392 next_generation: AtomicU64::new(next_gen),
393 dropped_total: AtomicU64::new(0),
394 })
395 }
396
397 /// Match an object against the bucket's rules and return the
398 /// highest-priority enabled rule whose filter matches. Returns
399 /// `None` when no rule matches (or no configuration is registered
400 /// for the bucket). Ties on `priority` are broken by declaration
401 /// order — the first such rule wins.
402 #[must_use]
403 pub fn match_rule(
404 &self,
405 bucket: &str,
406 key: &str,
407 object_tags: &[(String, String)],
408 ) -> Option<ReplicationRule> {
409 let map = self
410 .by_bucket
411 .read()
412 .expect("replication state RwLock poisoned");
413 let cfg = map.get(bucket)?;
414 let mut best: Option<&ReplicationRule> = None;
415 for rule in &cfg.rules {
416 if !rule.status_enabled {
417 continue;
418 }
419 if !filter_matches(&rule.filter, key, object_tags) {
420 continue;
421 }
422 best = match best {
423 None => Some(rule),
424 Some(prev) if rule.priority > prev.priority => Some(rule),
425 Some(prev) => Some(prev),
426 };
427 }
428 best.cloned()
429 }
430
431 /// Stamp the per-(bucket, key) replication status with no
432 /// generation guard. Replaces any previous entry. **Generation is
433 /// reset to 0** (= overridable by the next real PUT) — callers
434 /// that hold a generation token must use
435 /// [`Self::record_status_if_newer`] instead.
436 ///
437 /// Use cases (kept for back-compat + the eager `Pending` stamp the
438 /// service-layer dispatcher emits before spawning the actual
439 /// replication task):
440 /// - Eager `Pending` stamp synchronously alongside the source PUT
441 /// so a HEAD between PUT-return and dispatcher-completion sees
442 /// `PENDING` instead of `None`.
443 /// - Tests that don't care about generation (legacy assertions).
444 pub fn record_status(&self, bucket: &str, key: &str, status: ReplicationStatus) {
445 self.statuses
446 .write()
447 .expect("replication state RwLock poisoned")
448 .insert(
449 (bucket.to_owned(), key.to_owned()),
450 ReplicationStatusEntry {
451 status,
452 generation: 0,
453 // v0.8.3 #66: stamp now so a subsequent sweep can
454 // age this entry out once it reaches a terminal
455 // state and exceeds the configured TTL.
456 recorded_at: Utc::now(),
457 },
458 );
459 }
460
461 /// v0.8.2 #61: CAS-style stamp. Only updates the entry when
462 /// `generation >= entry.generation`; rejects the update (returns
463 /// `false`) when `generation < entry.generation` because a newer
464 /// PUT has already won and we must not roll the source's status
465 /// back to a stale terminal state.
466 ///
467 /// ## Returns
468 ///
469 /// - `true` — the stamp was accepted; the caller may proceed with
470 /// the destination-bucket PUT (in [`replicate_object`]) /
471 /// declare success.
472 /// - `false` — a strictly-newer generation has already stamped the
473 /// entry; the caller must **drop the destination write** to
474 /// avoid overwriting newer bytes with a stale retry's body.
475 ///
476 /// Equality (`generation == entry.generation`) is accepted because
477 /// the same generation may legitimately stamp twice across the
478 /// dispatcher's retry budget (`Pending` → `Completed` on the same
479 /// task).
480 pub fn record_status_if_newer(
481 &self,
482 bucket: &str,
483 key: &str,
484 generation: u64,
485 status: ReplicationStatus,
486 ) -> bool {
487 let mut map = self
488 .statuses
489 .write()
490 .expect("replication state RwLock poisoned");
491 let now = Utc::now();
492 let entry = map
493 .entry((bucket.to_owned(), key.to_owned()))
494 .or_insert(ReplicationStatusEntry {
495 status: ReplicationStatus::Pending,
496 generation: 0,
497 // v0.8.3 #66: stamp at insertion; will be overwritten
498 // immediately below when the CAS accepts.
499 recorded_at: now,
500 });
501 if generation < entry.generation {
502 return false;
503 }
504 entry.generation = generation;
505 entry.status = status;
506 // v0.8.3 #66: refresh the timestamp on every accepted stamp so
507 // a Pending → Completed transition (same generation) resets
508 // the sweep clock — the TTL is measured from the **last**
509 // terminal stamp, not the first observation.
510 entry.recorded_at = now;
511 true
512 }
513
514 /// v0.8.3 #66 (H-5 audit fix): drop terminal-state entries
515 /// (`Completed` / `Failed`) older than `max_age`. `Pending` entries
516 /// are never swept because they are still in-flight — the
517 /// dispatcher is racing toward a terminal stamp and dropping the
518 /// `Pending` would lose the eventual outcome (and let the entry
519 /// re-emerge under the original key with no recorded history).
520 /// `Replica` entries can theoretically appear here through legacy
521 /// paths and are likewise preserved (the destination-side stamp is
522 /// not produced by `record_status_if_newer` in the current code,
523 /// but the conservative filter keeps any future use loss-free).
524 ///
525 /// Cutoff is `now - max_age` rather than `Utc::now() - max_age` so
526 /// callers can drive the clock deterministically in tests.
527 ///
528 /// Returns the number of entries removed (operators dashboard via
529 /// `s4_replication_status_swept_total`).
530 pub fn sweep_stale(&self, now: DateTime<Utc>, max_age: chrono::Duration) -> usize {
531 let mut map = self
532 .statuses
533 .write()
534 .expect("replication state RwLock poisoned");
535 let cutoff = now - max_age;
536 let stale: Vec<(String, String)> = map
537 .iter()
538 .filter(|(_, e)| {
539 matches!(
540 e.status,
541 ReplicationStatus::Completed | ReplicationStatus::Failed
542 ) && e.recorded_at < cutoff
543 })
544 .map(|(k, _)| k.clone())
545 .collect();
546 let count = stale.len();
547 for k in stale {
548 map.remove(&k);
549 }
550 count
551 }
552
553 /// Look up the recorded replication status for `(bucket, key)`.
554 /// Returns `None` when no PUT to this key has triggered
555 /// replication (= the object is not under any replication rule, or
556 /// it predates the rule's creation).
557 ///
558 /// The `generation` field of the entry is intentionally not
559 /// surfaced here — it's an internal CAS guard, not part of the
560 /// AWS wire shape.
561 #[must_use]
562 pub fn lookup_status(&self, bucket: &str, key: &str) -> Option<ReplicationStatus> {
563 self.statuses
564 .read()
565 .expect("replication state RwLock poisoned")
566 .get(&(bucket.to_owned(), key.to_owned()))
567 .map(|entry| entry.status.clone())
568 }
569}
570
571/// AND of (prefix predicate, every tag pair). An empty / `None` prefix
572/// means "any prefix"; an empty tag list means "no tag predicate".
573fn filter_matches(
574 filter: &ReplicationFilter,
575 key: &str,
576 object_tags: &[(String, String)],
577) -> bool {
578 if let Some(p) = filter.prefix.as_deref()
579 && !p.is_empty()
580 && !key.starts_with(p)
581 {
582 return false;
583 }
584 for (tk, tv) in &filter.tags {
585 if !object_tags
586 .iter()
587 .any(|(ok, ov)| ok == tk && ov == tv)
588 {
589 return false;
590 }
591 }
592 true
593}
594
/// Retry budget for one destination PUT (module doc: 3 attempts total).
const RETRY_ATTEMPTS: u32 = 3;
/// Base backoff in milliseconds; per the module doc the backoff is
/// exponential from this base (50ms / 100ms / 200ms).
const RETRY_BASE_MS: u64 = 50;

598/// v0.8.3 #68 (audit M-1): emit a single WARN log line per
599/// `(source_bucket, dest_bucket)` pair the first time we observe a
600/// replication PUT that wanted to propagate Object Lock state but the
601/// destination side has no `ObjectLockManager` attached. The metric
602/// (`s4_replication_lock_propagation_skipped_total`) bumps every time
603/// (so dashboards see the rate); the log is dedup'd because operators
604/// only need to know once that the configuration is asymmetric.
605///
606/// The dedup set lives in a process-static `Mutex<HashSet<(src, dst)>>`
607/// — bounded by the (#source × #destination) pair count, which is
608/// always small (operator-declared rules, not per-key).
609pub fn warn_lock_propagation_skipped(source_bucket: &str, dest_bucket: &str) {
610 use std::collections::HashSet;
611 use std::sync::{Mutex, OnceLock};
612 static SEEN: OnceLock<Mutex<HashSet<(String, String)>>> = OnceLock::new();
613 let seen = SEEN.get_or_init(|| Mutex::new(HashSet::new()));
614 let key = (source_bucket.to_owned(), dest_bucket.to_owned());
615 let first_time = {
616 let mut guard = seen.lock().expect("warn-once HashSet Mutex poisoned");
617 guard.insert(key)
618 };
619 if first_time {
620 tracing::warn!(
621 source_bucket = %source_bucket,
622 dest_bucket = %dest_bucket,
623 "S4 replication: source carries Object Lock state but destination \
624 bucket has no ObjectLockManager attached — replica will be freely \
625 deletable on the destination (WORM posture is source-only). Attach \
626 an ObjectLockManager via S4Service::with_object_lock() on the \
627 destination-side gateway to honour cross-bucket WORM."
628 );
629 }
630 crate::metrics::record_replication_lock_propagation_skipped();
631}
632
633/// Replicate one source-bucket object to the rule's destination bucket.
634///
635/// The caller supplies a `do_put` callback that performs the actual
636/// destination-bucket PUT (so unit tests can drive the dispatcher
637/// without needing a full backend). The callback receives:
638/// `(destination_bucket, key, body, metadata)` and returns a
639/// `Result<(), String>` whose `Err` triggers the retry / failure path.
640///
641/// Behaviour:
642/// - Stamps the destination metadata with `x-amz-replication-status:
643/// REPLICA` so a HEAD on the replica is distinguishable.
644/// - On callback success, records `(source_bucket, source_key) →
645/// Completed` in the manager **iff this task's `generation` is not
646/// already overtaken** (CAS-style guard — see [`ReplicationManager::
647/// record_status_if_newer`]).
648/// - On callback failure, retries up to [`RETRY_ATTEMPTS`] times with
649/// exponential backoff (50ms / 100ms / 200ms). After the budget is
650/// exhausted, records `Failed`, bumps `dropped_total`, and emits the
651/// matching Prometheus counter — also CAS-guarded.
652///
653/// ## v0.8.2 #61 — generation token + destination key override
654///
655/// The two new parameters fix the audit's C-1 + C-3 findings:
656///
657/// - `generation` — monotonic per-source-PUT token from
658/// [`ReplicationManager::next_generation`]. CAS-stamps the source's
659/// status + **suppresses the actual destination PUT** when the
660/// token has been overtaken. Without this guard, a slow retry of
661/// PUT-A could land in the destination *after* PUT-B has already
662/// replicated — rolling the destination back to A's bytes. With
663/// the guard, A's task notices B's higher generation has won and
664/// silently drops its destination write.
665/// - `destination_key_override` — the storage-side key the destination
666/// bucket should write under. For an unversioned source this is
667/// `None` and the dispatcher falls back to the source's logical key
668/// (= the AWS-default behaviour). For a **versioning-Enabled**
669/// source the caller passes `Some(versioned_shadow_key(key, vid))`
670/// so the destination's version chain receives the new version
671/// under the same shadow path the source uses (= a `?versionId=`
672/// GET on the destination resolves through the same shadow-key
673/// lookup as the source).
674///
675/// ## v0.8.3 #68 — Object Lock state propagation (audit M-1)
676///
677/// `source_lock_state` carries the source object's WORM posture
678/// (`mode + retain_until + legal_hold_on`) at PUT time. When `Some`,
679/// the destination PUT is decorated with the AWS-wire lock headers
680/// (`x-amz-object-lock-mode`, `x-amz-object-lock-retain-until-date`,
681/// `x-amz-object-lock-legal-hold`) on the metadata map so the
682/// destination side's `put_object` (or its caller) can persist the
683/// same lock state on the replica. Without this, a Compliance /
684/// Governance / legal-hold protected source had a destination
685/// replica that the destination operator could freely DELETE — the
686/// "WORM compliance posture survives DR" guarantee leaked.
687///
688/// `None` (no lock state on the source) keeps the legacy behaviour:
689/// no extra headers, replica is freely deletable on the destination.
690// 10 args is the post-#68 wire-shape: rule + (source_bucket, source_key,
691// body, metadata) + do_put + manager + (generation, dest_override) +
692// source_lock_state. A shape struct would split the call site without
693// buying anything; the caller (`spawn_replication_if_matched`)
694// constructs every field inline, so the indirection is pure noise.
695#[allow(clippy::too_many_arguments)]
696pub async fn replicate_object<F, Fut>(
697 rule: ReplicationRule,
698 source_bucket: String,
699 source_key: String,
700 body: bytes::Bytes,
701 metadata: Option<HashMap<String, String>>,
702 do_put: F,
703 manager: Arc<ReplicationManager>,
704 generation: u64,
705 destination_key_override: Option<String>,
706 source_lock_state: Option<crate::object_lock::ObjectLockState>,
707) where
708 F: Fn(String, String, bytes::Bytes, Option<HashMap<String, String>>) -> Fut,
709 Fut: std::future::Future<Output = Result<(), String>>,
710{
711 // Replica metadata = source metadata + `x-amz-replication-status:
712 // REPLICA` stamp. Keeping the source's compression / encryption
713 // metadata intact means a GET on the replica decodes through the
714 // same path the source would.
715 let mut replica_meta = metadata.unwrap_or_default();
716 replica_meta.insert(
717 "x-amz-replication-status".to_owned(),
718 ReplicationStatus::Replica.as_aws_str().to_owned(),
719 );
720 if let Some(ref sc) = rule.destination_storage_class {
721 replica_meta.insert("x-amz-storage-class".to_owned(), sc.clone());
722 }
723 // v0.8.3 #68 (audit M-1): propagate the source's Object Lock
724 // posture as AWS-wire lock headers attached to the destination
725 // PUT's metadata map. The destination side reads these and
726 // persists the same lock state on the replica so a DR setup keeps
727 // the WORM guarantee end-to-end (Compliance / Governance / legal
728 // hold cannot be silently bypassed by deleting on the destination).
729 if let Some(ref lock) = source_lock_state {
730 if let Some(mode) = lock.mode {
731 replica_meta.insert(
732 "x-amz-object-lock-mode".to_owned(),
733 mode.as_aws_str().to_owned(),
734 );
735 }
736 if let Some(retain_until) = lock.retain_until {
737 // ISO-8601 / RFC-3339 — the AWS wire form for
738 // `x-amz-object-lock-retain-until-date`.
739 replica_meta.insert(
740 "x-amz-object-lock-retain-until-date".to_owned(),
741 retain_until.to_rfc3339(),
742 );
743 }
744 // Always emit the legal-hold flag when any lock state is
745 // present so an explicit "OFF" is propagated too (an absent
746 // header is ambiguous with "no opinion" on the destination).
747 replica_meta.insert(
748 "x-amz-object-lock-legal-hold".to_owned(),
749 if lock.legal_hold_on { "ON" } else { "OFF" }.to_owned(),
750 );
751 }
752
753 let dest_bucket = rule.destination_bucket.clone();
754 // v0.8.2 #61: when the source PUT was a versioned write, the
755 // override carries the storage-side shadow key
756 // (`<key>.__s4ver__/<vid>`); otherwise we use the logical key.
757 let dest_key = destination_key_override.unwrap_or_else(|| source_key.clone());
758 for attempt in 0..RETRY_ATTEMPTS {
759 // v0.8.2 #61 C-3: pre-PUT generation check. If a newer
760 // generation has already stamped a terminal status on this
761 // (bucket, key), our retry is stale — silently drop the
762 // destination write so we don't roll the destination back to
763 // older bytes. We use `record_status_if_newer` with the
764 // **current** entry's status as a no-op when we're not stale,
765 // but the cheap path is to peek and bail.
766 if let Some(entry) = manager
767 .statuses
768 .read()
769 .expect("replication state RwLock poisoned")
770 .get(&(source_bucket.clone(), source_key.clone()))
771 .cloned()
772 && entry.generation > generation
773 {
774 tracing::debug!(
775 source_bucket = %source_bucket,
776 source_key = %source_key,
777 dest_bucket = %dest_bucket,
778 rule_id = %rule.id,
779 generation,
780 stored_generation = entry.generation,
781 "S4 replication: stale generation, dropping destination PUT"
782 );
783 return;
784 }
785 let result = do_put(
786 dest_bucket.clone(),
787 dest_key.clone(),
788 body.clone(),
789 Some(replica_meta.clone()),
790 )
791 .await;
792 match result {
793 Ok(()) => {
794 let accepted = manager.record_status_if_newer(
795 &source_bucket,
796 &source_key,
797 generation,
798 ReplicationStatus::Completed,
799 );
800 if !accepted {
801 // v0.8.2 #61 C-3: the destination PUT raced — a
802 // newer generation stamped between our pre-check
803 // and our `do_put.await`. The destination now
804 // *might* hold our stale bytes (the newer PUT
805 // could have landed after ours) but we stop
806 // re-stamping and let the newer task overwrite on
807 // its own success. Bumps the metric so operators
808 // see the race surfaced.
809 crate::metrics::record_replication_drop(&source_bucket);
810 manager.dropped_total.fetch_add(1, Ordering::Relaxed);
811 tracing::warn!(
812 source_bucket = %source_bucket,
813 source_key = %source_key,
814 dest_bucket = %dest_bucket,
815 rule_id = %rule.id,
816 generation,
817 "S4 replication: completed but a newer generation has won; \
818 status not stamped"
819 );
820 return;
821 }
822 crate::metrics::record_replication_replicated(&source_bucket, &dest_bucket);
823 tracing::debug!(
824 source_bucket = %source_bucket,
825 source_key = %source_key,
826 dest_bucket = %dest_bucket,
827 rule_id = %rule.id,
828 generation,
829 "S4 replication: COMPLETED"
830 );
831 return;
832 }
833 Err(e) => {
834 if attempt + 1 < RETRY_ATTEMPTS {
835 let delay_ms = RETRY_BASE_MS * (1u64 << attempt);
836 tracing::warn!(
837 source_bucket = %source_bucket,
838 source_key = %source_key,
839 dest_bucket = %dest_bucket,
840 attempt = attempt + 1,
841 generation,
842 error = %e,
843 "S4 replication: attempt failed, retrying"
844 );
845 tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
846 continue;
847 }
848 // CAS the terminal Failed too — a newer generation that
849 // succeeded must not be rolled back to Failed.
850 let accepted = manager.record_status_if_newer(
851 &source_bucket,
852 &source_key,
853 generation,
854 ReplicationStatus::Failed,
855 );
856 manager.dropped_total.fetch_add(1, Ordering::Relaxed);
857 crate::metrics::record_replication_drop(&source_bucket);
858 tracing::warn!(
859 source_bucket = %source_bucket,
860 source_key = %source_key,
861 dest_bucket = %dest_bucket,
862 rule_id = %rule.id,
863 generation,
864 error = %e,
865 accepted_failed_stamp = accepted,
866 "S4 replication: FAILED after {RETRY_ATTEMPTS} attempts (drop counter bumped)"
867 );
868 return;
869 }
870 }
871 }
872}
873
874#[cfg(test)]
875mod tests {
876 use super::*;
877 use std::sync::Mutex;
878
879 fn rule(
880 id: &str,
881 priority: u32,
882 enabled: bool,
883 prefix: Option<&str>,
884 tags: &[(&str, &str)],
885 dest: &str,
886 ) -> ReplicationRule {
887 ReplicationRule {
888 id: id.to_owned(),
889 priority,
890 status_enabled: enabled,
891 filter: ReplicationFilter {
892 prefix: prefix.map(str::to_owned),
893 tags: tags
894 .iter()
895 .map(|(k, v)| ((*k).to_owned(), (*v).to_owned()))
896 .collect(),
897 },
898 destination_bucket: dest.to_owned(),
899 destination_storage_class: None,
900 }
901 }
902
903 #[test]
904 fn match_rule_prefix_filter_match_and_miss() {
905 let mgr = ReplicationManager::new();
906 mgr.put(
907 "src",
908 ReplicationConfig {
909 role: "arn:aws:iam::000:role/s4-test".into(),
910 rules: vec![rule("r1", 1, true, Some("logs/"), &[], "dst")],
911 },
912 );
913 assert!(mgr.match_rule("src", "logs/2026/01/01.log", &[]).is_some());
914 assert!(mgr.match_rule("src", "uploads/foo.bin", &[]).is_none());
915 }
916
917 #[test]
918 fn match_rule_no_config_for_bucket() {
919 let mgr = ReplicationManager::new();
920 assert!(mgr.match_rule("ghost", "k", &[]).is_none());
921 }
922
923 #[test]
924 fn match_rule_priority_picks_highest() {
925 let mgr = ReplicationManager::new();
926 mgr.put(
927 "src",
928 ReplicationConfig {
929 role: "arn".into(),
930 rules: vec![
931 rule("low", 1, true, Some(""), &[], "dst-low"),
932 rule("high", 10, true, Some(""), &[], "dst-high"),
933 rule("mid", 5, true, Some(""), &[], "dst-mid"),
934 ],
935 },
936 );
937 let picked = mgr.match_rule("src", "any.bin", &[]).expect("match");
938 assert_eq!(picked.id, "high");
939 assert_eq!(picked.destination_bucket, "dst-high");
940 }
941
942 #[test]
943 fn match_rule_priority_tie_breaker_is_declaration_order() {
944 let mgr = ReplicationManager::new();
945 mgr.put(
946 "src",
947 ReplicationConfig {
948 role: "arn".into(),
949 rules: vec![
950 rule("first", 5, true, Some(""), &[], "dst-first"),
951 rule("second", 5, true, Some(""), &[], "dst-second"),
952 ],
953 },
954 );
955 let picked = mgr.match_rule("src", "k", &[]).expect("match");
956 assert_eq!(picked.id, "first", "tie on priority must keep the earlier rule");
957 }
958
959 #[test]
960 fn match_rule_tag_filter_and_of_all_tags() {
961 let mgr = ReplicationManager::new();
962 mgr.put(
963 "src",
964 ReplicationConfig {
965 role: "arn".into(),
966 rules: vec![rule(
967 "r-tags",
968 1,
969 true,
970 None,
971 &[("env", "prod"), ("tier", "gold")],
972 "dst",
973 )],
974 },
975 );
976 // Both tags present → match.
977 assert!(
978 mgr.match_rule(
979 "src",
980 "k",
981 &[
982 ("env".into(), "prod".into()),
983 ("tier".into(), "gold".into()),
984 ("extra".into(), "ignored".into())
985 ]
986 )
987 .is_some(),
988 "all required tags present (extras OK) must match"
989 );
990 // Only one tag → AND fails.
991 assert!(
992 mgr.match_rule(
993 "src",
994 "k",
995 &[("env".into(), "prod".into())]
996 )
997 .is_none(),
998 "missing one of the required tags must not match"
999 );
1000 // Wrong tag value → AND fails.
1001 assert!(
1002 mgr.match_rule(
1003 "src",
1004 "k",
1005 &[
1006 ("env".into(), "dev".into()),
1007 ("tier".into(), "gold".into())
1008 ]
1009 )
1010 .is_none(),
1011 "wrong value on a required tag must not match"
1012 );
1013 }
1014
1015 #[test]
1016 fn match_rule_status_disabled_never_matches() {
1017 let mgr = ReplicationManager::new();
1018 mgr.put(
1019 "src",
1020 ReplicationConfig {
1021 role: "arn".into(),
1022 rules: vec![rule("disabled", 100, false, None, &[], "dst")],
1023 },
1024 );
1025 assert!(
1026 mgr.match_rule("src", "anything", &[]).is_none(),
1027 "status_enabled=false must not match even at high priority"
1028 );
1029 }
1030
1031 #[test]
1032 fn record_and_lookup_status_round_trip() {
1033 let mgr = ReplicationManager::new();
1034 assert!(mgr.lookup_status("b", "k").is_none());
1035 mgr.record_status("b", "k", ReplicationStatus::Pending);
1036 assert_eq!(
1037 mgr.lookup_status("b", "k"),
1038 Some(ReplicationStatus::Pending)
1039 );
1040 mgr.record_status("b", "k", ReplicationStatus::Completed);
1041 assert_eq!(
1042 mgr.lookup_status("b", "k"),
1043 Some(ReplicationStatus::Completed)
1044 );
1045 }
1046
1047 #[test]
1048 fn json_round_trip_preserves_config_and_statuses() {
1049 let mgr = ReplicationManager::new();
1050 mgr.put(
1051 "src",
1052 ReplicationConfig {
1053 role: "arn:aws:iam::000:role/s4".into(),
1054 rules: vec![rule("r1", 7, true, Some("docs/"), &[("env", "prod")], "dst")],
1055 },
1056 );
1057 mgr.record_status("src", "docs/a.pdf", ReplicationStatus::Completed);
1058 let json = mgr.to_json().expect("to_json");
1059 let mgr2 = ReplicationManager::from_json(&json).expect("from_json");
1060 assert_eq!(mgr.get("src"), mgr2.get("src"));
1061 assert_eq!(
1062 mgr2.lookup_status("src", "docs/a.pdf"),
1063 Some(ReplicationStatus::Completed)
1064 );
1065 }
1066
1067 #[test]
1068 fn delete_is_idempotent() {
1069 let mgr = ReplicationManager::new();
1070 mgr.delete("never-existed");
1071 mgr.put(
1072 "b",
1073 ReplicationConfig {
1074 role: "arn".into(),
1075 rules: vec![rule("r1", 1, true, None, &[], "dst")],
1076 },
1077 );
1078 mgr.delete("b");
1079 assert!(mgr.get("b").is_none());
1080 }
1081
1082 #[test]
1083 fn put_replaces_previous_config() {
1084 let mgr = ReplicationManager::new();
1085 mgr.put(
1086 "b",
1087 ReplicationConfig {
1088 role: "arn".into(),
1089 rules: vec![rule("old", 1, true, None, &[], "dst-old")],
1090 },
1091 );
1092 mgr.put(
1093 "b",
1094 ReplicationConfig {
1095 role: "arn".into(),
1096 rules: vec![rule("new", 1, true, None, &[], "dst-new")],
1097 },
1098 );
1099 let cfg = mgr.get("b").expect("config");
1100 assert_eq!(cfg.rules.len(), 1);
1101 assert_eq!(cfg.rules[0].id, "new");
1102 assert_eq!(cfg.rules[0].destination_bucket, "dst-new");
1103 }
1104
    /// Happy path: a single successful destination PUT stamps the
    /// source (bucket, key) `Completed`, keeps the drop counter at 0,
    /// and forwards the source metadata plus the `REPLICA` stamp.
    #[tokio::test]
    async fn replicate_object_happy_path_marks_completed() {
        // Captures every (dest_bucket, dest_key, body, metadata) tuple the
        // dispatcher hands to `do_put`, so the assertions below can inspect
        // exactly what would have been written to the destination bucket.
        type Captured = Vec<(String, String, bytes::Bytes, Option<HashMap<String, String>>)>;
        let mgr = Arc::new(ReplicationManager::new());
        let captured: Arc<Mutex<Captured>> = Arc::new(Mutex::new(Vec::new()));
        let captured_cl = Arc::clone(&captured);

        // Fake destination PUT: record the call and always succeed.
        let do_put = move |dest: String,
                           key: String,
                           body: bytes::Bytes,
                           meta: Option<HashMap<String, String>>| {
            let captured = Arc::clone(&captured_cl);
            async move {
                captured.lock().unwrap().push((dest, key, body, meta));
                Ok::<(), String>(())
            }
        };

        replicate_object(
            rule("r1", 1, true, None, &[], "dst"),
            "src".into(),
            "obj.bin".into(),
            bytes::Bytes::from_static(b"hello"),
            Some(HashMap::from([("content-type".into(), "text/plain".into())])),
            do_put,
            Arc::clone(&mgr),
            mgr.next_generation(),
            None, // no destination-key override
            None, // no source Object Lock state
        )
        .await;

        // Terminal status is Completed and nothing was dropped.
        assert_eq!(
            mgr.lookup_status("src", "obj.bin"),
            Some(ReplicationStatus::Completed)
        );
        assert_eq!(mgr.dropped_total.load(Ordering::Relaxed), 0);
        let cap = captured.lock().unwrap();
        assert_eq!(cap.len(), 1, "do_put must run exactly once on success");
        assert_eq!(cap[0].0, "dst");
        assert_eq!(cap[0].1, "obj.bin");
        assert_eq!(cap[0].2.as_ref(), b"hello");
        let meta = cap[0].3.as_ref().expect("metadata stamped");
        assert_eq!(
            meta.get("x-amz-replication-status").map(String::as_str),
            Some("REPLICA"),
            "destination meta must carry the REPLICA stamp"
        );
        // Source metadata must survive alongside the stamp.
        assert_eq!(meta.get("content-type").map(String::as_str), Some("text/plain"));
    }
1155
    /// Failure path: a destination that errors on every attempt must be
    /// retried exactly `RETRY_ATTEMPTS` times, then the source status
    /// flips to `Failed` and the drop counter bumps exactly once.
    #[tokio::test]
    async fn replicate_object_failure_after_retry_budget_marks_failed_and_bumps_drop() {
        let mgr = Arc::new(ReplicationManager::new());
        // Counts how many times the fake destination PUT was invoked.
        let attempts: Arc<Mutex<u32>> = Arc::new(Mutex::new(0));
        let attempts_cl = Arc::clone(&attempts);

        // Fake destination PUT that always fails — simulates a dead or
        // persistently erroring destination endpoint.
        let do_put = move |_dest: String,
                           _key: String,
                           _body: bytes::Bytes,
                           _meta: Option<HashMap<String, String>>| {
            let attempts = Arc::clone(&attempts_cl);
            async move {
                *attempts.lock().unwrap() += 1;
                Err::<(), String>("simulated destination 5xx".into())
            }
        };

        replicate_object(
            rule("r-fail", 1, true, None, &[], "dst"),
            "src".into(),
            "doomed.bin".into(),
            bytes::Bytes::from_static(b"x"),
            None,
            do_put,
            Arc::clone(&mgr),
            mgr.next_generation(),
            None, // no destination-key override
            None, // no source Object Lock state
        )
        .await;

        assert_eq!(
            *attempts.lock().unwrap(),
            RETRY_ATTEMPTS,
            "must retry exactly the configured budget"
        );
        assert_eq!(
            mgr.lookup_status("src", "doomed.bin"),
            Some(ReplicationStatus::Failed)
        );
        assert_eq!(
            mgr.dropped_total.load(Ordering::Relaxed),
            1,
            "drop counter must bump exactly once after retry budget exhausted"
        );
    }
1202
1203 #[test]
1204 fn replication_status_aws_strings_match_spec() {
1205 assert_eq!(ReplicationStatus::Pending.as_aws_str(), "PENDING");
1206 assert_eq!(ReplicationStatus::Completed.as_aws_str(), "COMPLETED");
1207 assert_eq!(ReplicationStatus::Failed.as_aws_str(), "FAILED");
1208 assert_eq!(ReplicationStatus::Replica.as_aws_str(), "REPLICA");
1209 }
1210
1211 // ----- v0.8.2 #61: generation token CAS unit tests -----
1212
1213 #[test]
1214 fn record_status_if_newer_accepts_higher_generation() {
1215 let mgr = ReplicationManager::new();
1216 // First stamp at gen=5 — no prior entry, accepted.
1217 assert!(mgr.record_status_if_newer(
1218 "b",
1219 "k",
1220 5,
1221 ReplicationStatus::Pending,
1222 ));
1223 // Higher generation overrides.
1224 assert!(mgr.record_status_if_newer(
1225 "b",
1226 "k",
1227 7,
1228 ReplicationStatus::Completed,
1229 ));
1230 assert_eq!(
1231 mgr.lookup_status("b", "k"),
1232 Some(ReplicationStatus::Completed)
1233 );
1234 }
1235
1236 #[test]
1237 fn record_status_if_newer_rejects_stale_generation() {
1238 let mgr = ReplicationManager::new();
1239 // Newer PUT lands first.
1240 assert!(mgr.record_status_if_newer(
1241 "b",
1242 "k",
1243 10,
1244 ReplicationStatus::Completed,
1245 ));
1246 // Older retry must be rejected — destination must not roll
1247 // back to "alpha" once "beta" has stamped Completed.
1248 let accepted = mgr.record_status_if_newer(
1249 "b",
1250 "k",
1251 3,
1252 ReplicationStatus::Completed,
1253 );
1254 assert!(!accepted, "stale generation must be rejected");
1255 // Stored entry stays at the newer generation's terminal state.
1256 assert_eq!(
1257 mgr.lookup_status("b", "k"),
1258 Some(ReplicationStatus::Completed)
1259 );
1260 }
1261
1262 #[test]
1263 fn record_status_if_newer_accepts_equal_generation() {
1264 // Same generation may legitimately re-stamp (Pending →
1265 // Completed transition on the same task). The CAS is `>=`
1266 // not `>`.
1267 let mgr = ReplicationManager::new();
1268 assert!(mgr.record_status_if_newer(
1269 "b",
1270 "k",
1271 42,
1272 ReplicationStatus::Pending,
1273 ));
1274 assert!(mgr.record_status_if_newer(
1275 "b",
1276 "k",
1277 42,
1278 ReplicationStatus::Completed,
1279 ));
1280 assert_eq!(
1281 mgr.lookup_status("b", "k"),
1282 Some(ReplicationStatus::Completed)
1283 );
1284 }
1285
1286 #[test]
1287 fn next_generation_is_monotonic() {
1288 let mgr = ReplicationManager::new();
1289 let g1 = mgr.next_generation();
1290 let g2 = mgr.next_generation();
1291 let g3 = mgr.next_generation();
1292 assert!(g2 > g1, "g2={g2} must exceed g1={g1}");
1293 assert!(g3 > g2, "g3={g3} must exceed g2={g2}");
1294 assert_eq!(g2, g1 + 1);
1295 assert_eq!(g3, g2 + 1);
1296 }
1297
1298 #[test]
1299 fn snapshot_pre_61_format_loads_with_zero_generation() {
1300 // Pre-v0.8.2 #61 snapshot shape: bare `ReplicationStatus`,
1301 // no `next_generation` field. The `untagged` enum + serde
1302 // default must round-trip lossily into the new shape, with
1303 // `generation = 0` (= guaranteed loseable to next real PUT).
1304 let pre_61_json = r#"{
1305 "by_bucket": {},
1306 "statuses": [
1307 [["src", "k"], "Completed"]
1308 ]
1309 }"#;
1310 let mgr = ReplicationManager::from_json(pre_61_json)
1311 .expect("pre-#61 snapshot must deserialise");
1312 assert_eq!(
1313 mgr.lookup_status("src", "k"),
1314 Some(ReplicationStatus::Completed)
1315 );
1316 // First mint after restore is `1` (max(0, 1)).
1317 assert_eq!(mgr.next_generation(), 1);
1318 // The `generation = 0` legacy entry is overridable by any
1319 // real PUT (= a generation >= 1).
1320 assert!(mgr.record_status_if_newer(
1321 "src",
1322 "k",
1323 1,
1324 ReplicationStatus::Pending,
1325 ));
1326 }
1327
1328 // ----- v0.8.3 #66: sweep + TTL unit tests (H-5 audit fix) -----
1329
1330 /// Helper: install a `(bucket, key)` entry with an explicit
1331 /// `recorded_at` so the sweep test can pin the clock at a known
1332 /// offset from "now". Bypasses `record_status_if_newer` because
1333 /// that always stamps with `Utc::now()`.
1334 fn install_entry_with_recorded_at(
1335 mgr: &ReplicationManager,
1336 bucket: &str,
1337 key: &str,
1338 status: ReplicationStatus,
1339 recorded_at: DateTime<Utc>,
1340 ) {
1341 mgr.statuses
1342 .write()
1343 .expect("replication state RwLock poisoned")
1344 .insert(
1345 (bucket.to_owned(), key.to_owned()),
1346 ReplicationStatusEntry {
1347 status,
1348 generation: 1,
1349 recorded_at,
1350 },
1351 );
1352 }
1353
1354 #[test]
1355 fn sweep_stale_drops_completed_past_ttl() {
1356 // Three terminal entries: Completed -10h, Failed -10h, Completed -1h.
1357 // sweep_stale(now, 5h) → drops the two -10h entries, keeps the
1358 // recent Completed.
1359 let mgr = ReplicationManager::new();
1360 let now = Utc::now();
1361 install_entry_with_recorded_at(
1362 &mgr,
1363 "src",
1364 "old-completed",
1365 ReplicationStatus::Completed,
1366 now - chrono::Duration::hours(10),
1367 );
1368 install_entry_with_recorded_at(
1369 &mgr,
1370 "src",
1371 "old-failed",
1372 ReplicationStatus::Failed,
1373 now - chrono::Duration::hours(10),
1374 );
1375 install_entry_with_recorded_at(
1376 &mgr,
1377 "src",
1378 "recent-completed",
1379 ReplicationStatus::Completed,
1380 now - chrono::Duration::hours(1),
1381 );
1382
1383 let n = mgr.sweep_stale(now, chrono::Duration::hours(5));
1384 assert_eq!(n, 2, "two terminal entries past 5h TTL must be swept");
1385 assert!(
1386 mgr.lookup_status("src", "old-completed").is_none(),
1387 "Completed past TTL must be removed"
1388 );
1389 assert!(
1390 mgr.lookup_status("src", "old-failed").is_none(),
1391 "Failed past TTL must be removed"
1392 );
1393 assert_eq!(
1394 mgr.lookup_status("src", "recent-completed"),
1395 Some(ReplicationStatus::Completed),
1396 "Completed within TTL must survive"
1397 );
1398 }
1399
1400 #[test]
1401 fn sweep_stale_keeps_pending_regardless_of_age() {
1402 // A Pending entry stamped 100h ago is **still in-flight**
1403 // (the dispatcher is racing toward a terminal stamp). Sweeping
1404 // it would lose the eventual Completed/Failed outcome and let
1405 // a stale generation re-emerge under the original key with no
1406 // recorded history.
1407 let mgr = ReplicationManager::new();
1408 let now = Utc::now();
1409 install_entry_with_recorded_at(
1410 &mgr,
1411 "src",
1412 "ancient-pending",
1413 ReplicationStatus::Pending,
1414 now - chrono::Duration::hours(100),
1415 );
1416
1417 let n = mgr.sweep_stale(now, chrono::Duration::hours(5));
1418 assert_eq!(n, 0, "Pending entries must never be swept");
1419 assert_eq!(
1420 mgr.lookup_status("src", "ancient-pending"),
1421 Some(ReplicationStatus::Pending),
1422 "ancient Pending must still be present"
1423 );
1424 }
1425
    /// Back-compat: a snapshot entry without `recorded_at` must default
    /// to the deserialisation-time clock, never to an instantly-stale
    /// timestamp.
    #[test]
    fn recorded_at_back_compat_default_now_on_deserialize() {
        // A pre-#66 snapshot whose status entries omit `recorded_at`
        // must deserialise with `recorded_at = Utc::now()` (= "freshly
        // observed at restart"). This delays the first sweep by one
        // TTL window but never drops a still-relevant entry early.
        // Use the v0.8.2 #61 entry shape (status + generation, no
        // recorded_at) to verify the `#[serde(default = "Utc::now")]`
        // applies on inner-entry deserialisation too.
        let pre_66_json = r#"{
            "by_bucket": {},
            "statuses": [
                [["src", "k"], { "status": "Completed", "generation": 7 }]
            ],
            "next_generation": 8
        }"#;
        // Bracket the from_json call so the defaulted timestamp can be
        // pinned to a [before, after] window.
        let before = Utc::now();
        let mgr = ReplicationManager::from_json(pre_66_json)
            .expect("pre-#66 snapshot with no `recorded_at` must deserialise");
        let after = Utc::now();

        // Status preserved.
        assert_eq!(
            mgr.lookup_status("src", "k"),
            Some(ReplicationStatus::Completed),
        );

        // recorded_at defaulted to Utc::now() at deserialise time —
        // peek the inner entry to verify the timestamp is in the
        // [before, after] window of the from_json call.
        let entries = mgr
            .statuses
            .read()
            .expect("replication state RwLock poisoned");
        let entry = entries
            .get(&("src".to_owned(), "k".to_owned()))
            .expect("entry must exist");
        assert!(
            entry.recorded_at >= before && entry.recorded_at <= after,
            "recorded_at default must be Utc::now() at deserialise time \
             (got {:?}, expected within [{:?}, {:?}])",
            entry.recorded_at,
            before,
            after
        );
        assert_eq!(entry.generation, 7, "generation must round-trip");

        // A sweep with TTL 1h immediately after must NOT drop this
        // entry (recorded_at ≈ now, well within TTL).
        // Release the read guard first so `sweep_stale` can take the lock.
        drop(entries);
        let n = mgr.sweep_stale(Utc::now(), chrono::Duration::hours(1));
        assert_eq!(
            n, 0,
            "freshly-defaulted recorded_at must survive a 1h-TTL sweep"
        );
    }
1482
1483 // ----- v0.8.3 #68: Object Lock state propagation unit tests (audit M-1) -----
1484
    /// When the source object carried an Object Lock state, the
    /// dispatcher must inject the AWS-wire lock headers
    /// (`x-amz-object-lock-mode`, `-retain-until-date`, `-legal-hold`)
    /// into the destination PUT's metadata map so the destination side
    /// can persist the same WORM posture on the replica.
    #[tokio::test]
    async fn replicate_with_source_lock_state_attaches_headers() {
        // Captures every destination PUT the dispatcher performs.
        type Captured = Vec<(String, String, bytes::Bytes, Option<HashMap<String, String>>)>;
        let mgr = Arc::new(ReplicationManager::new());
        let captured: Arc<Mutex<Captured>> = Arc::new(Mutex::new(Vec::new()));
        let captured_cl = Arc::clone(&captured);

        // Fake destination PUT: record the call, always succeed.
        let do_put = move |dest: String,
                           key: String,
                           body: bytes::Bytes,
                           meta: Option<HashMap<String, String>>| {
            let captured = Arc::clone(&captured_cl);
            async move {
                captured.lock().unwrap().push((dest, key, body, meta));
                Ok::<(), String>(())
            }
        };

        // Compliance-mode lock, 30-day retention, legal hold ON.
        let retain_until = Utc::now() + chrono::Duration::days(30);
        let lock_state = crate::object_lock::ObjectLockState {
            mode: Some(crate::object_lock::LockMode::Compliance),
            retain_until: Some(retain_until),
            legal_hold_on: true,
        };

        replicate_object(
            rule("r-locked", 1, true, None, &[], "dst"),
            "src".into(),
            "worm.bin".into(),
            bytes::Bytes::from_static(b"locked-payload"),
            None,
            do_put,
            Arc::clone(&mgr),
            mgr.next_generation(),
            None, // no destination-key override
            Some(lock_state),
        )
        .await;

        let cap = captured.lock().unwrap();
        assert_eq!(cap.len(), 1, "do_put must run exactly once on success");
        let meta = cap[0].3.as_ref().expect("metadata stamped");
        assert_eq!(
            meta.get("x-amz-object-lock-mode").map(String::as_str),
            Some("COMPLIANCE"),
            "Compliance mode header must be propagated"
        );
        let stamped_until = meta
            .get("x-amz-object-lock-retain-until-date")
            .expect("retain-until header must be propagated");
        // RFC-3339 round-trip: parse back and compare with the source
        // retain_until (1s slack for sub-second truncation).
        let parsed: chrono::DateTime<chrono::FixedOffset> =
            chrono::DateTime::parse_from_rfc3339(stamped_until)
                .expect("retain-until must be RFC-3339");
        let diff = (parsed.with_timezone(&Utc) - retain_until).num_seconds().abs();
        assert!(diff <= 1, "retain-until off by {diff}s");
        assert_eq!(
            meta.get("x-amz-object-lock-legal-hold").map(String::as_str),
            Some("ON"),
            "legal hold state must be propagated as ON"
        );
        // The REPLICA stamp must still be present alongside the lock
        // headers (lock propagation must not displace the replica
        // marker that lets HEAD distinguish replica-vs-direct PUT).
        assert_eq!(
            meta.get("x-amz-replication-status").map(String::as_str),
            Some("REPLICA"),
        );
    }
1560
    /// Symmetric: no source lock state → no lock headers leak into
    /// the destination metadata (legacy behaviour preserved for the
    /// non-WORM PUT path).
    #[tokio::test]
    async fn replicate_without_source_lock_state_no_headers_added() {
        // Captures every destination PUT the dispatcher performs.
        type Captured = Vec<(String, String, bytes::Bytes, Option<HashMap<String, String>>)>;
        let mgr = Arc::new(ReplicationManager::new());
        let captured: Arc<Mutex<Captured>> = Arc::new(Mutex::new(Vec::new()));
        let captured_cl = Arc::clone(&captured);

        // Fake destination PUT: record the call, always succeed.
        let do_put = move |dest: String,
                           key: String,
                           body: bytes::Bytes,
                           meta: Option<HashMap<String, String>>| {
            let captured = Arc::clone(&captured_cl);
            async move {
                captured.lock().unwrap().push((dest, key, body, meta));
                Ok::<(), String>(())
            }
        };

        replicate_object(
            rule("r-plain", 1, true, None, &[], "dst"),
            "src".into(),
            "plain.bin".into(),
            bytes::Bytes::from_static(b"plain-payload"),
            None,
            do_put,
            Arc::clone(&mgr),
            mgr.next_generation(),
            None, // no destination-key override
            None, // no source Object Lock state
        )
        .await;

        // None of the three lock headers may appear on the replica PUT.
        let cap = captured.lock().unwrap();
        let meta = cap[0].3.as_ref().expect("metadata stamped");
        assert!(
            meta.get("x-amz-object-lock-mode").is_none(),
            "no lock state ⇒ no mode header (got {:?})",
            meta.get("x-amz-object-lock-mode")
        );
        assert!(
            meta.get("x-amz-object-lock-retain-until-date").is_none(),
            "no lock state ⇒ no retain-until header"
        );
        assert!(
            meta.get("x-amz-object-lock-legal-hold").is_none(),
            "no lock state ⇒ no legal-hold header"
        );
    }
1612}