use std::collections::HashMap;
use std::sync::Arc;
use std::sync::RwLock;
use std::sync::atomic::{AtomicU64, Ordering};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
/// Per-object replication state, as surfaced through the
/// `x-amz-replication-status` metadata key (see `as_aws_str`).
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum ReplicationStatus {
// Replication has been scheduled but not yet confirmed.
Pending,
// The destination PUT succeeded.
Completed,
// The destination PUT failed after the retry budget was exhausted.
Failed,
// This object is itself a replica written by the replication path.
Replica,
}
impl ReplicationStatus {
#[must_use]
pub fn as_aws_str(&self) -> &'static str {
match self {
Self::Pending => "PENDING",
Self::Completed => "COMPLETED",
Self::Failed => "FAILED",
Self::Replica => "REPLICA",
}
}
}
/// Object-selection criteria for a replication rule.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct ReplicationFilter {
// Key-prefix gate; `None` or an empty string matches every key.
pub prefix: Option<String>,
// Required tags; an object must carry ALL of them (AND semantics).
pub tags: Vec<(String, String)>,
}
/// One replication rule: a filter plus a destination.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct ReplicationRule {
// Rule identifier (expected unique within a configuration).
pub id: String,
// Higher wins; ties keep the earlier-declared rule (see `match_rule`).
pub priority: u32,
// Disabled rules never match, regardless of priority.
pub status_enabled: bool,
pub filter: ReplicationFilter,
// Bucket that receives replicas for objects matching this rule.
pub destination_bucket: String,
// Optional storage class stamped onto the replica's metadata.
pub destination_storage_class: Option<String>,
}
/// Per-bucket replication configuration: an IAM-style role ARN plus the
/// ordered list of rules.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct ReplicationConfig {
pub role: String,
pub rules: Vec<ReplicationRule>,
}
/// Stored status for one (bucket, key): the status itself, the generation
/// fence, and when it was stamped (consumed by `sweep_stale`).
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct ReplicationStatusEntry {
pub status: ReplicationStatus,
// Generation fence; larger values win (see `record_status_if_newer`).
pub generation: u64,
// Snapshots predating this field deserialise with "now" as the default.
#[serde(default = "Utc::now")]
pub recorded_at: DateTime<Utc>,
}
/// On-disk shape for `to_json`/`from_json`. `StatusOrEntry` and the
/// defaulted `next_generation` keep older snapshot formats loadable.
#[derive(Debug, Default, Serialize, Deserialize)]
struct ReplicationSnapshot {
by_bucket: HashMap<String, ReplicationConfig>,
statuses: Vec<((String, String), StatusOrEntry)>,
#[serde(default)]
next_generation: u64,
}
/// Accepts either the current full-entry snapshot form or the legacy
/// bare-status form; `untagged` makes serde try variants in declaration
/// order, so the richer `Entry` shape is attempted first.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(untagged)]
enum StatusOrEntry {
Entry(ReplicationStatusEntry),
Status(ReplicationStatus),
}
impl StatusOrEntry {
    /// Normalises either snapshot form into a full entry. Legacy bare
    /// statuses get generation 0 (the pre-fencing sentinel) and a fresh
    /// `recorded_at` timestamp.
    fn into_entry(self) -> ReplicationStatusEntry {
        match self {
            Self::Status(status) => ReplicationStatusEntry {
                status,
                generation: 0,
                recorded_at: Utc::now(),
            },
            Self::Entry(entry) => entry,
        }
    }
}
/// In-memory replication state: per-bucket configs, per-object status
/// entries, the generation counter, and a drop counter for observability.
pub struct ReplicationManager {
by_bucket: RwLock<HashMap<String, ReplicationConfig>>,
statuses: RwLock<HashMap<(String, String), ReplicationStatusEntry>>,
// Next generation to hand out; starts at 1 (0 is the legacy sentinel).
pub next_generation: AtomicU64,
// Replications dropped as stale or terminally failed.
pub dropped_total: AtomicU64,
}
impl Default for ReplicationManager {
// Equivalent to `new()`: empty maps, generation counter at 1.
fn default() -> Self {
Self::new()
}
}
impl std::fmt::Debug for ReplicationManager {
// Hand-written so the lock-guarded maps are elided; only the cheap,
// lock-free drop counter is printed.
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ReplicationManager")
.field("dropped_total", &self.dropped_total.load(Ordering::Relaxed))
.finish_non_exhaustive()
}
}
impl ReplicationManager {
    /// Creates an empty manager. The generation counter starts at 1 so that
    /// 0 can serve as the "pre-fencing" sentinel used by legacy snapshots.
    #[must_use]
    pub fn new() -> Self {
        Self {
            by_bucket: RwLock::new(HashMap::new()),
            statuses: RwLock::new(HashMap::new()),
            next_generation: AtomicU64::new(1),
            dropped_total: AtomicU64::new(0),
        }
    }

    /// Hands out the next replication generation: returns the current value
    /// and advances the counter (monotonic, process-wide).
    pub fn next_generation(&self) -> u64 {
        self.next_generation.fetch_add(1, Ordering::Relaxed)
    }

    /// Installs or replaces the replication configuration for `bucket`.
    pub fn put(&self, bucket: &str, config: ReplicationConfig) {
        crate::lock_recovery::recover_write(&self.by_bucket, "replication.by_bucket")
            .insert(bucket.to_owned(), config);
    }

    /// Returns a clone of `bucket`'s configuration, if one is installed.
    #[must_use]
    pub fn get(&self, bucket: &str) -> Option<ReplicationConfig> {
        crate::lock_recovery::recover_read(&self.by_bucket, "replication.by_bucket")
            .get(bucket)
            .cloned()
    }

    /// Removes `bucket`'s configuration; a no-op if none exists.
    pub fn delete(&self, bucket: &str) {
        crate::lock_recovery::recover_write(&self.by_bucket, "replication.by_bucket")
            .remove(bucket);
    }

    /// Serialises configs, statuses and the generation counter into a
    /// `ReplicationSnapshot` JSON document (the inverse of `from_json`).
    ///
    /// # Errors
    /// Propagates any `serde_json` serialisation error.
    pub fn to_json(&self) -> Result<String, serde_json::Error> {
        let snap = ReplicationSnapshot {
            by_bucket: crate::lock_recovery::recover_read(&self.by_bucket, "replication.by_bucket")
                .clone(),
            statuses: crate::lock_recovery::recover_read(&self.statuses, "replication.statuses")
                .iter()
                .map(|(k, v)| (k.clone(), StatusOrEntry::Entry(v.clone())))
                .collect(),
            next_generation: self.next_generation.load(Ordering::Relaxed),
        };
        serde_json::to_string(&snap)
    }

    /// Restores a manager from a snapshot produced by `to_json`; legacy
    /// snapshots (bare statuses, missing `next_generation`) load as well.
    ///
    /// # Errors
    /// Propagates any `serde_json` deserialisation error.
    pub fn from_json(s: &str) -> Result<Self, serde_json::Error> {
        let snap: ReplicationSnapshot = serde_json::from_str(s)?;
        let statuses: HashMap<(String, String), ReplicationStatusEntry> = snap
            .statuses
            .into_iter()
            .map(|(k, v)| (k, v.into_entry()))
            .collect();
        // Legacy snapshots default `next_generation` to 0; clamp to 1 so
        // freshly handed-out generations always beat legacy generation-0
        // entries.
        let next_gen = snap.next_generation.max(1);
        Ok(Self {
            by_bucket: RwLock::new(snap.by_bucket),
            statuses: RwLock::new(statuses),
            next_generation: AtomicU64::new(next_gen),
            dropped_total: AtomicU64::new(0),
        })
    }

    /// Picks the enabled rule with the highest `priority` whose filter
    /// matches (`key`, `object_tags`) in `bucket`. On a priority tie the
    /// rule declared first wins.
    #[must_use]
    pub fn match_rule(
        &self,
        bucket: &str,
        key: &str,
        object_tags: &[(String, String)],
    ) -> Option<ReplicationRule> {
        let map = crate::lock_recovery::recover_read(&self.by_bucket, "replication.by_bucket");
        let cfg = map.get(bucket)?;
        cfg.rules
            .iter()
            .filter(|rule| rule.status_enabled && filter_matches(&rule.filter, key, object_tags))
            // `reduce` keeps the earlier element on a tie, preserving
            // declaration order as the tie-breaker.
            .reduce(|best, rule| if rule.priority > best.priority { rule } else { best })
            .cloned()
    }

    /// Unconditionally stamps `status` for (`bucket`, `key`), resetting the
    /// generation fence to 0. Use `record_status_if_newer` when concurrent
    /// writers must be fenced.
    pub fn record_status(&self, bucket: &str, key: &str, status: ReplicationStatus) {
        crate::lock_recovery::recover_write(&self.statuses, "replication.statuses").insert(
            (bucket.to_owned(), key.to_owned()),
            ReplicationStatusEntry {
                status,
                generation: 0,
                recorded_at: Utc::now(),
            },
        );
    }

    /// Stamps `status` only when `generation` is at least the stored one
    /// (equal generations are accepted so a writer may overwrite its own
    /// earlier stamp). Returns `true` when the stamp was applied.
    pub fn record_status_if_newer(
        &self,
        bucket: &str,
        key: &str,
        generation: u64,
        status: ReplicationStatus,
    ) -> bool {
        let mut map = crate::lock_recovery::recover_write(&self.statuses, "replication.statuses");
        let now = Utc::now();
        let entry = map
            .entry((bucket.to_owned(), key.to_owned()))
            // Lazily built so nothing is constructed when the entry exists.
            .or_insert_with(|| ReplicationStatusEntry {
                status: ReplicationStatus::Pending,
                generation: 0,
                recorded_at: now,
            });
        if generation < entry.generation {
            return false;
        }
        entry.generation = generation;
        entry.status = status;
        entry.recorded_at = now;
        true
    }

    /// Evicts terminal (`Completed`/`Failed`) entries stamped before
    /// `now - max_age`; `Pending` entries are never swept. Returns how many
    /// entries were removed.
    pub fn sweep_stale(&self, now: DateTime<Utc>, max_age: chrono::Duration) -> usize {
        let mut map = crate::lock_recovery::recover_write(&self.statuses, "replication.statuses");
        let cutoff = now - max_age;
        let before = map.len();
        // Single in-place pass; the previous collect-then-remove version did
        // two passes and allocated a vector of cloned keys.
        map.retain(|_, e| {
            !(matches!(
                e.status,
                ReplicationStatus::Completed | ReplicationStatus::Failed
            ) && e.recorded_at < cutoff)
        });
        before - map.len()
    }

    /// Returns the stamped status for (`bucket`, `key`), if any.
    #[must_use]
    pub fn lookup_status(&self, bucket: &str, key: &str) -> Option<ReplicationStatus> {
        crate::lock_recovery::recover_read(&self.statuses, "replication.statuses")
            .get(&(bucket.to_owned(), key.to_owned()))
            .map(|entry| entry.status.clone())
    }
}
fn filter_matches(filter: &ReplicationFilter, key: &str, object_tags: &[(String, String)]) -> bool {
if let Some(p) = filter.prefix.as_deref()
&& !p.is_empty()
&& !key.starts_with(p)
{
return false;
}
for (tk, tv) in &filter.tags {
if !object_tags.iter().any(|(ok, ov)| ok == tk && ov == tv) {
return false;
}
}
true
}
// Total destination-PUT attempts per object before stamping FAILED.
const RETRY_ATTEMPTS: u32 = 3;
// Base backoff in milliseconds; doubles per attempt (50, 100, 200, ...).
const RETRY_BASE_MS: u64 = 50;
/// Warns — once per (source, destination) bucket pair for the process
/// lifetime — that Object Lock state could not be propagated to the
/// destination, then records the skip metric (on every call, not just the
/// first).
pub fn warn_lock_propagation_skipped(source_bucket: &str, dest_bucket: &str) {
use std::collections::HashSet;
use std::sync::{Mutex, OnceLock};
// Process-wide de-duplication set so each pair warns at most once.
static SEEN: OnceLock<Mutex<HashSet<(String, String)>>> = OnceLock::new();
let seen = SEEN.get_or_init(|| Mutex::new(HashSet::new()));
let key = (source_bucket.to_owned(), dest_bucket.to_owned());
let first_time = {
let mut guard = crate::lock_recovery::recover_mutex(seen, "replication.warn_once_seen");
// `HashSet::insert` returns true only for a previously-unseen pair.
guard.insert(key)
};
if first_time {
tracing::warn!(
source_bucket = %source_bucket,
dest_bucket = %dest_bucket,
"S4 replication: source carries Object Lock state but destination \
bucket has no ObjectLockManager attached — replica will be freely \
deletable on the destination (WORM posture is source-only). Attach \
an ObjectLockManager via S4Service::with_object_lock() on the \
destination-side gateway to honour cross-bucket WORM."
);
}
// Metric fires unconditionally; only the log line is de-duplicated.
crate::metrics::record_replication_lock_propagation_skipped();
}
/// Replicates one object to `rule.destination_bucket` via the injected
/// `do_put`, retrying with exponential backoff and stamping the outcome
/// through the generation fence in `manager`.
///
/// Behaviour:
/// - metadata is augmented with the REPLICA stamp, the rule's optional
///   storage class, and (when `source_lock_state` is given) the Object
///   Lock mode / retain-until / legal-hold headers;
/// - before each attempt, a strictly newer stored generation aborts the
///   destination PUT (a stale writer must not clobber a newer replica);
/// - success stamps COMPLETED if this generation still wins, otherwise the
///   drop counter is bumped;
/// - exhausting `RETRY_ATTEMPTS` stamps FAILED (fenced) and bumps the
///   drop counter.
#[allow(clippy::too_many_arguments)]
pub async fn replicate_object<F, Fut>(
rule: ReplicationRule,
source_bucket: String,
source_key: String,
body: bytes::Bytes,
metadata: Option<HashMap<String, String>>,
do_put: F,
manager: Arc<ReplicationManager>,
generation: u64,
destination_key_override: Option<String>,
source_lock_state: Option<crate::object_lock::ObjectLockState>,
) where
F: Fn(String, String, bytes::Bytes, Option<HashMap<String, String>>) -> Fut,
Fut: std::future::Future<Output = Result<(), String>>,
{
// Replica carries the source metadata plus replication/lock stamps.
let mut replica_meta = metadata.unwrap_or_default();
replica_meta.insert(
"x-amz-replication-status".to_owned(),
ReplicationStatus::Replica.as_aws_str().to_owned(),
);
if let Some(ref sc) = rule.destination_storage_class {
replica_meta.insert("x-amz-storage-class".to_owned(), sc.clone());
}
// Propagate Object Lock posture as metadata headers when the source has it.
if let Some(ref lock) = source_lock_state {
if let Some(mode) = lock.mode {
replica_meta.insert(
"x-amz-object-lock-mode".to_owned(),
mode.as_aws_str().to_owned(),
);
}
if let Some(retain_until) = lock.retain_until {
replica_meta.insert(
"x-amz-object-lock-retain-until-date".to_owned(),
retain_until.to_rfc3339(),
);
}
replica_meta.insert(
"x-amz-object-lock-legal-hold".to_owned(),
if lock.legal_hold_on { "ON" } else { "OFF" }.to_owned(),
);
}
let dest_bucket = rule.destination_bucket.clone();
let dest_key = destination_key_override.unwrap_or_else(|| source_key.clone());
for attempt in 0..RETRY_ATTEMPTS {
// Staleness gate: if a strictly newer generation has already been
// recorded for this source object, this writer lost the race — skip
// the PUT entirely rather than overwrite a newer replica.
if let Some(entry) =
crate::lock_recovery::recover_read(&manager.statuses, "replication.statuses")
.get(&(source_bucket.clone(), source_key.clone()))
.cloned()
&& entry.generation > generation
{
tracing::debug!(
source_bucket = %source_bucket,
source_key = %source_key,
dest_bucket = %dest_bucket,
rule_id = %rule.id,
generation,
stored_generation = entry.generation,
"S4 replication: stale generation, dropping destination PUT"
);
return;
}
let result = do_put(
dest_bucket.clone(),
dest_key.clone(),
body.clone(),
Some(replica_meta.clone()),
)
.await;
match result {
Ok(()) => {
// Stamp COMPLETED only if this generation still wins; a newer
// writer may have stamped while our PUT was in flight.
let accepted = manager.record_status_if_newer(
&source_bucket,
&source_key,
generation,
ReplicationStatus::Completed,
);
if !accepted {
crate::metrics::record_replication_drop(&source_bucket);
manager.dropped_total.fetch_add(1, Ordering::Relaxed);
tracing::warn!(
source_bucket = %source_bucket,
source_key = %source_key,
dest_bucket = %dest_bucket,
rule_id = %rule.id,
generation,
"S4 replication: completed but a newer generation has won; \
status not stamped"
);
return;
}
crate::metrics::record_replication_replicated(&source_bucket, &dest_bucket);
tracing::debug!(
source_bucket = %source_bucket,
source_key = %source_key,
dest_bucket = %dest_bucket,
rule_id = %rule.id,
generation,
"S4 replication: COMPLETED"
);
return;
}
Err(e) => {
// Retry with exponential backoff until the budget is spent.
if attempt + 1 < RETRY_ATTEMPTS {
let delay_ms = RETRY_BASE_MS * (1u64 << attempt);
tracing::warn!(
source_bucket = %source_bucket,
source_key = %source_key,
dest_bucket = %dest_bucket,
attempt = attempt + 1,
generation,
error = %e,
"S4 replication: attempt failed, retrying"
);
tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
continue;
}
// Terminal failure: stamp FAILED (still generation-fenced) and
// bump the drop counter exactly once.
let accepted = manager.record_status_if_newer(
&source_bucket,
&source_key,
generation,
ReplicationStatus::Failed,
);
manager.dropped_total.fetch_add(1, Ordering::Relaxed);
crate::metrics::record_replication_drop(&source_bucket);
tracing::warn!(
source_bucket = %source_bucket,
source_key = %source_key,
dest_bucket = %dest_bucket,
rule_id = %rule.id,
generation,
error = %e,
accepted_failed_stamp = accepted,
"S4 replication: FAILED after {RETRY_ATTEMPTS} attempts (drop counter bumped)"
);
return;
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Mutex;
/// Test helper: builds a `ReplicationRule` from borrowed parts.
fn rule(
    id: &str,
    priority: u32,
    enabled: bool,
    prefix: Option<&str>,
    tags: &[(&str, &str)],
    dest: &str,
) -> ReplicationRule {
    let filter = ReplicationFilter {
        prefix: prefix.map(ToOwned::to_owned),
        tags: tags
            .iter()
            .map(|&(k, v)| (k.to_owned(), v.to_owned()))
            .collect(),
    };
    ReplicationRule {
        id: id.to_owned(),
        priority,
        status_enabled: enabled,
        filter,
        destination_bucket: dest.to_owned(),
        destination_storage_class: None,
    }
}
// Prefix filter: matching prefix hits, non-matching prefix misses.
#[test]
fn match_rule_prefix_filter_match_and_miss() {
let mgr = ReplicationManager::new();
mgr.put(
"src",
ReplicationConfig {
role: "arn:aws:iam::000:role/s4-test".into(),
rules: vec![rule("r1", 1, true, Some("logs/"), &[], "dst")],
},
);
assert!(mgr.match_rule("src", "logs/2026/01/01.log", &[]).is_some());
assert!(mgr.match_rule("src", "uploads/foo.bin", &[]).is_none());
}
// A bucket with no configuration yields no rule.
#[test]
fn match_rule_no_config_for_bucket() {
let mgr = ReplicationManager::new();
assert!(mgr.match_rule("ghost", "k", &[]).is_none());
}
// Highest priority wins regardless of declaration order.
#[test]
fn match_rule_priority_picks_highest() {
let mgr = ReplicationManager::new();
mgr.put(
"src",
ReplicationConfig {
role: "arn".into(),
rules: vec![
rule("low", 1, true, Some(""), &[], "dst-low"),
rule("high", 10, true, Some(""), &[], "dst-high"),
rule("mid", 5, true, Some(""), &[], "dst-mid"),
],
},
);
let picked = mgr.match_rule("src", "any.bin", &[]).expect("match");
assert_eq!(picked.id, "high");
assert_eq!(picked.destination_bucket, "dst-high");
}
// Equal priorities: declaration order is the tie-breaker (earlier wins).
#[test]
fn match_rule_priority_tie_breaker_is_declaration_order() {
let mgr = ReplicationManager::new();
mgr.put(
"src",
ReplicationConfig {
role: "arn".into(),
rules: vec![
rule("first", 5, true, Some(""), &[], "dst-first"),
rule("second", 5, true, Some(""), &[], "dst-second"),
],
},
);
let picked = mgr.match_rule("src", "k", &[]).expect("match");
assert_eq!(
picked.id, "first",
"tie on priority must keep the earlier rule"
);
}
// Tag filter is a conjunction: all tags required, extras tolerated.
#[test]
fn match_rule_tag_filter_and_of_all_tags() {
let mgr = ReplicationManager::new();
mgr.put(
"src",
ReplicationConfig {
role: "arn".into(),
rules: vec![rule(
"r-tags",
1,
true,
None,
&[("env", "prod"), ("tier", "gold")],
"dst",
)],
},
);
assert!(
mgr.match_rule(
"src",
"k",
&[
("env".into(), "prod".into()),
("tier".into(), "gold".into()),
("extra".into(), "ignored".into())
]
)
.is_some(),
"all required tags present (extras OK) must match"
);
assert!(
mgr.match_rule("src", "k", &[("env".into(), "prod".into())])
.is_none(),
"missing one of the required tags must not match"
);
assert!(
mgr.match_rule(
"src",
"k",
&[("env".into(), "dev".into()), ("tier".into(), "gold".into())]
)
.is_none(),
"wrong value on a required tag must not match"
);
}
// Disabled rules are skipped even when their priority is highest.
#[test]
fn match_rule_status_disabled_never_matches() {
let mgr = ReplicationManager::new();
mgr.put(
"src",
ReplicationConfig {
role: "arn".into(),
rules: vec![rule("disabled", 100, false, None, &[], "dst")],
},
);
assert!(
mgr.match_rule("src", "anything", &[]).is_none(),
"status_enabled=false must not match even at high priority"
);
}
// record_status is last-writer-wins; lookup reflects the latest stamp.
#[test]
fn record_and_lookup_status_round_trip() {
let mgr = ReplicationManager::new();
assert!(mgr.lookup_status("b", "k").is_none());
mgr.record_status("b", "k", ReplicationStatus::Pending);
assert_eq!(
mgr.lookup_status("b", "k"),
Some(ReplicationStatus::Pending)
);
mgr.record_status("b", "k", ReplicationStatus::Completed);
assert_eq!(
mgr.lookup_status("b", "k"),
Some(ReplicationStatus::Completed)
);
}
// to_json/from_json must round-trip both configs and status entries.
#[test]
fn json_round_trip_preserves_config_and_statuses() {
let mgr = ReplicationManager::new();
mgr.put(
"src",
ReplicationConfig {
role: "arn:aws:iam::000:role/s4".into(),
rules: vec![rule(
"r1",
7,
true,
Some("docs/"),
&[("env", "prod")],
"dst",
)],
},
);
mgr.record_status("src", "docs/a.pdf", ReplicationStatus::Completed);
let json = mgr.to_json().expect("to_json");
let mgr2 = ReplicationManager::from_json(&json).expect("from_json");
assert_eq!(mgr.get("src"), mgr2.get("src"));
assert_eq!(
mgr2.lookup_status("src", "docs/a.pdf"),
Some(ReplicationStatus::Completed)
);
}
// Deleting a missing or present config never panics and clears state.
#[test]
fn delete_is_idempotent() {
let mgr = ReplicationManager::new();
mgr.delete("never-existed");
mgr.put(
"b",
ReplicationConfig {
role: "arn".into(),
rules: vec![rule("r1", 1, true, None, &[], "dst")],
},
);
mgr.delete("b");
assert!(mgr.get("b").is_none());
}
// put() fully replaces the previous config rather than merging.
#[test]
fn put_replaces_previous_config() {
let mgr = ReplicationManager::new();
mgr.put(
"b",
ReplicationConfig {
role: "arn".into(),
rules: vec![rule("old", 1, true, None, &[], "dst-old")],
},
);
mgr.put(
"b",
ReplicationConfig {
role: "arn".into(),
rules: vec![rule("new", 1, true, None, &[], "dst-new")],
},
);
let cfg = mgr.get("b").expect("config");
assert_eq!(cfg.rules.len(), 1);
assert_eq!(cfg.rules[0].id, "new");
assert_eq!(cfg.rules[0].destination_bucket, "dst-new");
}
// Happy path: one PUT, REPLICA stamp on the destination metadata, COMPLETED
// status on the source, no drops.
#[tokio::test]
async fn replicate_object_happy_path_marks_completed() {
type Captured = Vec<(
String,
String,
bytes::Bytes,
Option<HashMap<String, String>>,
)>;
let mgr = Arc::new(ReplicationManager::new());
let captured: Arc<Mutex<Captured>> = Arc::new(Mutex::new(Vec::new()));
let captured_cl = Arc::clone(&captured);
let do_put = move |dest: String,
key: String,
body: bytes::Bytes,
meta: Option<HashMap<String, String>>| {
let captured = Arc::clone(&captured_cl);
async move {
captured.lock().unwrap().push((dest, key, body, meta));
Ok::<(), String>(())
}
};
replicate_object(
rule("r1", 1, true, None, &[], "dst"),
"src".into(),
"obj.bin".into(),
bytes::Bytes::from_static(b"hello"),
Some(HashMap::from([(
"content-type".into(),
"text/plain".into(),
)])),
do_put,
Arc::clone(&mgr),
mgr.next_generation(),
None,
None,
)
.await;
assert_eq!(
mgr.lookup_status("src", "obj.bin"),
Some(ReplicationStatus::Completed)
);
assert_eq!(mgr.dropped_total.load(Ordering::Relaxed), 0);
let cap = captured.lock().unwrap();
assert_eq!(cap.len(), 1, "do_put must run exactly once on success");
assert_eq!(cap[0].0, "dst");
assert_eq!(cap[0].1, "obj.bin");
assert_eq!(cap[0].2.as_ref(), b"hello");
let meta = cap[0].3.as_ref().expect("metadata stamped");
assert_eq!(
meta.get("x-amz-replication-status").map(String::as_str),
Some("REPLICA"),
"destination meta must carry the REPLICA stamp"
);
assert_eq!(
meta.get("content-type").map(String::as_str),
Some("text/plain")
);
}
// Persistent failure: retry budget is consumed exactly, FAILED is stamped,
// and the drop counter bumps once.
#[tokio::test]
async fn replicate_object_failure_after_retry_budget_marks_failed_and_bumps_drop() {
let mgr = Arc::new(ReplicationManager::new());
let attempts: Arc<Mutex<u32>> = Arc::new(Mutex::new(0));
let attempts_cl = Arc::clone(&attempts);
let do_put = move |_dest: String,
_key: String,
_body: bytes::Bytes,
_meta: Option<HashMap<String, String>>| {
let attempts = Arc::clone(&attempts_cl);
async move {
*attempts.lock().unwrap() += 1;
Err::<(), String>("simulated destination 5xx".into())
}
};
replicate_object(
rule("r-fail", 1, true, None, &[], "dst"),
"src".into(),
"doomed.bin".into(),
bytes::Bytes::from_static(b"x"),
None,
do_put,
Arc::clone(&mgr),
mgr.next_generation(),
None,
None,
)
.await;
assert_eq!(
*attempts.lock().unwrap(),
RETRY_ATTEMPTS,
"must retry exactly the configured budget"
);
assert_eq!(
mgr.lookup_status("src", "doomed.bin"),
Some(ReplicationStatus::Failed)
);
assert_eq!(
mgr.dropped_total.load(Ordering::Relaxed),
1,
"drop counter must bump exactly once after retry budget exhausted"
);
}
// Wire strings must match the AWS replication-status spellings.
#[test]
fn replication_status_aws_strings_match_spec() {
assert_eq!(ReplicationStatus::Pending.as_aws_str(), "PENDING");
assert_eq!(ReplicationStatus::Completed.as_aws_str(), "COMPLETED");
assert_eq!(ReplicationStatus::Failed.as_aws_str(), "FAILED");
assert_eq!(ReplicationStatus::Replica.as_aws_str(), "REPLICA");
}
// Generation fence: strictly higher generations always stamp.
#[test]
fn record_status_if_newer_accepts_higher_generation() {
let mgr = ReplicationManager::new();
assert!(mgr.record_status_if_newer("b", "k", 5, ReplicationStatus::Pending,));
assert!(mgr.record_status_if_newer("b", "k", 7, ReplicationStatus::Completed,));
assert_eq!(
mgr.lookup_status("b", "k"),
Some(ReplicationStatus::Completed)
);
}
// Generation fence: lower generations are rejected, state untouched.
#[test]
fn record_status_if_newer_rejects_stale_generation() {
let mgr = ReplicationManager::new();
assert!(mgr.record_status_if_newer("b", "k", 10, ReplicationStatus::Completed,));
let accepted = mgr.record_status_if_newer("b", "k", 3, ReplicationStatus::Completed);
assert!(!accepted, "stale generation must be rejected");
assert_eq!(
mgr.lookup_status("b", "k"),
Some(ReplicationStatus::Completed)
);
}
// Generation fence: equal generations overwrite (writer updates own stamp).
#[test]
fn record_status_if_newer_accepts_equal_generation() {
let mgr = ReplicationManager::new();
assert!(mgr.record_status_if_newer("b", "k", 42, ReplicationStatus::Pending,));
assert!(mgr.record_status_if_newer("b", "k", 42, ReplicationStatus::Completed,));
assert_eq!(
mgr.lookup_status("b", "k"),
Some(ReplicationStatus::Completed)
);
}
// The counter must strictly increase by one per call.
#[test]
fn next_generation_is_monotonic() {
let mgr = ReplicationManager::new();
let g1 = mgr.next_generation();
let g2 = mgr.next_generation();
let g3 = mgr.next_generation();
assert!(g2 > g1, "g2={g2} must exceed g1={g1}");
assert!(g3 > g2, "g3={g3} must exceed g2={g2}");
assert_eq!(g2, g1 + 1);
assert_eq!(g3, g2 + 1);
}
// Back-compat: pre-#61 snapshots carry bare statuses and no
// next_generation; they must load with generation 0 entries and a counter
// clamped to 1.
#[test]
fn snapshot_pre_61_format_loads_with_zero_generation() {
let pre_61_json = r#"{
"by_bucket": {},
"statuses": [
[["src", "k"], "Completed"]
]
}"#;
let mgr =
ReplicationManager::from_json(pre_61_json).expect("pre-#61 snapshot must deserialise");
assert_eq!(
mgr.lookup_status("src", "k"),
Some(ReplicationStatus::Completed)
);
assert_eq!(mgr.next_generation(), 1);
assert!(mgr.record_status_if_newer("src", "k", 1, ReplicationStatus::Pending,));
}
// Test helper: writes an entry with a chosen recorded_at timestamp so
// sweep behaviour can be exercised deterministically.
fn install_entry_with_recorded_at(
mgr: &ReplicationManager,
bucket: &str,
key: &str,
status: ReplicationStatus,
recorded_at: DateTime<Utc>,
) {
crate::lock_recovery::recover_write(&mgr.statuses, "replication.statuses").insert(
(bucket.to_owned(), key.to_owned()),
ReplicationStatusEntry {
status,
generation: 1,
recorded_at,
},
);
}
// Terminal entries older than the TTL are swept; recent ones survive.
#[test]
fn sweep_stale_drops_completed_past_ttl() {
let mgr = ReplicationManager::new();
let now = Utc::now();
install_entry_with_recorded_at(
&mgr,
"src",
"old-completed",
ReplicationStatus::Completed,
now - chrono::Duration::hours(10),
);
install_entry_with_recorded_at(
&mgr,
"src",
"old-failed",
ReplicationStatus::Failed,
now - chrono::Duration::hours(10),
);
install_entry_with_recorded_at(
&mgr,
"src",
"recent-completed",
ReplicationStatus::Completed,
now - chrono::Duration::hours(1),
);
let n = mgr.sweep_stale(now, chrono::Duration::hours(5));
assert_eq!(n, 2, "two terminal entries past 5h TTL must be swept");
assert!(
mgr.lookup_status("src", "old-completed").is_none(),
"Completed past TTL must be removed"
);
assert!(
mgr.lookup_status("src", "old-failed").is_none(),
"Failed past TTL must be removed"
);
assert_eq!(
mgr.lookup_status("src", "recent-completed"),
Some(ReplicationStatus::Completed),
"Completed within TTL must survive"
);
}
// Pending entries are exempt from sweeping no matter how old.
#[test]
fn sweep_stale_keeps_pending_regardless_of_age() {
let mgr = ReplicationManager::new();
let now = Utc::now();
install_entry_with_recorded_at(
&mgr,
"src",
"ancient-pending",
ReplicationStatus::Pending,
now - chrono::Duration::hours(100),
);
let n = mgr.sweep_stale(now, chrono::Duration::hours(5));
assert_eq!(n, 0, "Pending entries must never be swept");
assert_eq!(
mgr.lookup_status("src", "ancient-pending"),
Some(ReplicationStatus::Pending),
"ancient Pending must still be present"
);
}
// Back-compat: snapshots missing `recorded_at` default it to Utc::now() at
// deserialise time, so a sweep right after loading removes nothing.
#[test]
fn recorded_at_back_compat_default_now_on_deserialize() {
let pre_66_json = r#"{
"by_bucket": {},
"statuses": [
[["src", "k"], { "status": "Completed", "generation": 7 }]
],
"next_generation": 8
}"#;
let before = Utc::now();
let mgr = ReplicationManager::from_json(pre_66_json)
.expect("pre-#66 snapshot with no `recorded_at` must deserialise");
let after = Utc::now();
assert_eq!(
mgr.lookup_status("src", "k"),
Some(ReplicationStatus::Completed),
);
let entries = crate::lock_recovery::recover_read(&mgr.statuses, "replication.statuses");
let entry = entries
.get(&("src".to_owned(), "k".to_owned()))
.expect("entry must exist");
assert!(
entry.recorded_at >= before && entry.recorded_at <= after,
"recorded_at default must be Utc::now() at deserialise time \
(got {:?}, expected within [{:?}, {:?}])",
entry.recorded_at,
before,
after
);
assert_eq!(entry.generation, 7, "generation must round-trip");
drop(entries);
let n = mgr.sweep_stale(Utc::now(), chrono::Duration::hours(1));
assert_eq!(
n, 0,
"freshly-defaulted recorded_at must survive a 1h-TTL sweep"
);
}
// Object Lock propagation: mode, retain-until (RFC-3339) and legal-hold
// headers must appear on the destination metadata.
#[tokio::test]
async fn replicate_with_source_lock_state_attaches_headers() {
type Captured = Vec<(
String,
String,
bytes::Bytes,
Option<HashMap<String, String>>,
)>;
let mgr = Arc::new(ReplicationManager::new());
let captured: Arc<Mutex<Captured>> = Arc::new(Mutex::new(Vec::new()));
let captured_cl = Arc::clone(&captured);
let do_put = move |dest: String,
key: String,
body: bytes::Bytes,
meta: Option<HashMap<String, String>>| {
let captured = Arc::clone(&captured_cl);
async move {
captured.lock().unwrap().push((dest, key, body, meta));
Ok::<(), String>(())
}
};
let retain_until = Utc::now() + chrono::Duration::days(30);
let lock_state = crate::object_lock::ObjectLockState {
mode: Some(crate::object_lock::LockMode::Compliance),
retain_until: Some(retain_until),
legal_hold_on: true,
};
replicate_object(
rule("r-locked", 1, true, None, &[], "dst"),
"src".into(),
"worm.bin".into(),
bytes::Bytes::from_static(b"locked-payload"),
None,
do_put,
Arc::clone(&mgr),
mgr.next_generation(),
None,
Some(lock_state),
)
.await;
let cap = captured.lock().unwrap();
assert_eq!(cap.len(), 1, "do_put must run exactly once on success");
let meta = cap[0].3.as_ref().expect("metadata stamped");
assert_eq!(
meta.get("x-amz-object-lock-mode").map(String::as_str),
Some("COMPLIANCE"),
"Compliance mode header must be propagated"
);
let stamped_until = meta
.get("x-amz-object-lock-retain-until-date")
.expect("retain-until header must be propagated");
let parsed: chrono::DateTime<chrono::FixedOffset> =
chrono::DateTime::parse_from_rfc3339(stamped_until)
.expect("retain-until must be RFC-3339");
let diff = (parsed.with_timezone(&Utc) - retain_until)
.num_seconds()
.abs();
assert!(diff <= 1, "retain-until off by {diff}s");
assert_eq!(
meta.get("x-amz-object-lock-legal-hold").map(String::as_str),
Some("ON"),
"legal hold state must be propagated as ON"
);
assert_eq!(
meta.get("x-amz-replication-status").map(String::as_str),
Some("REPLICA"),
);
}
// Without source lock state, no Object Lock headers may be invented.
#[tokio::test]
async fn replicate_without_source_lock_state_no_headers_added() {
type Captured = Vec<(
String,
String,
bytes::Bytes,
Option<HashMap<String, String>>,
)>;
let mgr = Arc::new(ReplicationManager::new());
let captured: Arc<Mutex<Captured>> = Arc::new(Mutex::new(Vec::new()));
let captured_cl = Arc::clone(&captured);
let do_put = move |dest: String,
key: String,
body: bytes::Bytes,
meta: Option<HashMap<String, String>>| {
let captured = Arc::clone(&captured_cl);
async move {
captured.lock().unwrap().push((dest, key, body, meta));
Ok::<(), String>(())
}
};
replicate_object(
rule("r-plain", 1, true, None, &[], "dst"),
"src".into(),
"plain.bin".into(),
bytes::Bytes::from_static(b"plain-payload"),
None,
do_put,
Arc::clone(&mgr),
mgr.next_generation(),
None,
None,
)
.await;
let cap = captured.lock().unwrap();
let meta = cap[0].3.as_ref().expect("metadata stamped");
assert!(
meta.get("x-amz-object-lock-mode").is_none(),
"no lock state ⇒ no mode header (got {:?})",
meta.get("x-amz-object-lock-mode")
);
assert!(
meta.get("x-amz-object-lock-retain-until-date").is_none(),
"no lock state ⇒ no retain-until header"
);
assert!(
meta.get("x-amz-object-lock-legal-hold").is_none(),
"no lock state ⇒ no legal-hold header"
);
}
// Lock poisoning: a panicking writer must not wedge serialisation; the
// recover_* helpers are expected to recover the poisoned guard.
#[test]
fn replication_to_json_after_panic_recovers_via_poison() {
let mgr = Arc::new(ReplicationManager::new());
mgr.put(
"src",
ReplicationConfig {
role: "arn:aws:iam::000:role/s4".into(),
rules: vec![rule("r1", 1, true, None, &[], "dst")],
},
);
mgr.record_status("src", "k", ReplicationStatus::Pending);
let mgr_cl = Arc::clone(&mgr);
let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
let mut g = mgr_cl.statuses.write().expect("clean lock");
g.insert(
("src".into(), "k2".into()),
ReplicationStatusEntry {
status: ReplicationStatus::Pending,
generation: 0,
recorded_at: Utc::now(),
},
);
panic!("force-poison");
}));
assert!(
mgr.statuses.is_poisoned(),
"write panic must poison statuses lock"
);
let json = mgr.to_json().expect("to_json after poison must succeed");
let mgr2 = ReplicationManager::from_json(&json).expect("from_json");
assert_eq!(
mgr2.lookup_status("src", "k"),
Some(ReplicationStatus::Pending),
"recovered snapshot keeps original status"
);
}
}