use super::*;
impl BucketWarden {
pub fn replication_status_report(
&mut self,
principal: &str,
source_bucket: &str,
) -> Result<ReplicationStatusReport, RuntimeError> {
self.require_operator_action(
principal,
OperatorAction::ReadDiagnostics,
source_bucket,
"ops:GetReplicationStatus",
)?;
let config = self.require_bucket(source_bucket)?.replication.clone();
if config.rules.is_empty() {
return Err(RuntimeError::NoSuchBucketReplication(
source_bucket.to_string(),
));
}
let mut entries = Vec::new();
for rule in config.rules.iter().filter(|rule| rule.status == "Enabled") {
let destination_bucket = replication_destination_bucket(rule).map(str::to_string);
let source_versions = self.replication_source_versions(
source_bucket,
rule,
config.configured_epoch_seconds,
)?;
for source in source_versions {
let encrypted = source.version.metadata.encryption.is_some();
let destination_resolved = destination_bucket
.as_deref()
.is_some_and(|bucket| self.buckets.contains_key(bucket));
let eligible =
(!encrypted || rule.replicate_encrypted_objects) && destination_resolved;
let destination_present = destination_bucket
.as_deref()
.and_then(|bucket| self.buckets.get(bucket))
.and_then(|bucket| bucket.objects.get(&source.key))
.is_some_and(|object| {
object
.versions
.iter()
.any(|version| version.version_id == source.version.version_id)
});
let status = if !eligible {
if encrypted && !rule.replicate_encrypted_objects {
"SKIPPED_ENCRYPTED".to_string()
} else {
"MISSING_DESTINATION".to_string()
}
} else if destination_bucket.is_none() {
"MISSING_DESTINATION".to_string()
} else if destination_present {
source
.version
.replication_status
.clone()
.unwrap_or_else(|| "COMPLETED".to_string())
} else {
"PENDING".to_string()
};
entries.push(ReplicationStatusEntry {
rule_id: rule.id.clone(),
destination_bucket: destination_bucket.clone(),
key: source.key,
version_id: source.version.version_id,
delete_marker: source.version.delete_marker,
legal_hold: source.version.lock.legal_hold,
retention_mode: source
.version
.lock
.retention_mode
.map(|mode| format!("{mode:?}")),
retain_until_epoch_seconds: source.version.lock.retain_until_epoch_seconds,
encrypted,
eligible,
destination_present,
status,
});
}
}
entries.sort_by(|left, right| {
left.key
.cmp(&right.key)
.then(
replication_version_sort_key(&left.version_id)
.cmp(&replication_version_sort_key(&right.version_id)),
)
.then(left.rule_id.cmp(&right.rule_id))
});
let completed_count = entries
.iter()
.filter(|entry| entry.status == "COMPLETED")
.count();
let pending_count = entries
.iter()
.filter(|entry| entry.status == "PENDING")
.count();
let replicated_delete_marker_count = entries
.iter()
.filter(|entry| entry.delete_marker && entry.destination_present)
.count();
let skipped_encrypted_count = entries
.iter()
.filter(|entry| entry.status == "SKIPPED_ENCRYPTED")
.count();
let missing_destination_count = entries
.iter()
.filter(|entry| entry.status == "MISSING_DESTINATION")
.count();
let report = ReplicationStatusReport {
source_bucket: source_bucket.to_string(),
generated_at_epoch_seconds: self.clock_epoch_seconds,
entry_count: entries.len(),
completed_count,
pending_count,
replicated_delete_marker_count,
skipped_encrypted_count,
missing_destination_count,
last_replication_sequence: self.replication.last_sequence(),
entries,
};
self.audit.append(
principal,
"ops:GetReplicationStatus",
source_bucket,
AuditOutcome::Allowed,
Some(format!(
"entries={},completed={},pending={},skipped_encrypted={},missing_destinations={}",
report.entry_count,
report.completed_count,
report.pending_count,
report.skipped_encrypted_count,
report.missing_destination_count
)),
);
Ok(report)
}
}
fn replication_version_sort_key(version_id: &str) -> (u8, u64, &str) {
match version_id
.strip_prefix('v')
.and_then(|suffix| suffix.parse::<u64>().ok())
{
Some(value) => (0, value, version_id),
None => (1, 0, version_id),
}
}