bucketwarden-server 0.1.0

BucketWarden storage server runtime.
Documentation
use super::*;

impl BucketWarden {
    pub fn replication_status_report(
        &mut self,
        principal: &str,
        source_bucket: &str,
    ) -> Result<ReplicationStatusReport, RuntimeError> {
        self.require_operator_action(
            principal,
            OperatorAction::ReadDiagnostics,
            source_bucket,
            "ops:GetReplicationStatus",
        )?;
        let config = self.require_bucket(source_bucket)?.replication.clone();
        if config.rules.is_empty() {
            return Err(RuntimeError::NoSuchBucketReplication(
                source_bucket.to_string(),
            ));
        }
        let mut entries = Vec::new();
        for rule in config.rules.iter().filter(|rule| rule.status == "Enabled") {
            let destination_bucket = replication_destination_bucket(rule).map(str::to_string);
            let source_versions = self.replication_source_versions(
                source_bucket,
                rule,
                config.configured_epoch_seconds,
            )?;
            for source in source_versions {
                let encrypted = source.version.metadata.encryption.is_some();
                let destination_resolved = destination_bucket
                    .as_deref()
                    .is_some_and(|bucket| self.buckets.contains_key(bucket));
                let eligible =
                    (!encrypted || rule.replicate_encrypted_objects) && destination_resolved;
                let destination_present = destination_bucket
                    .as_deref()
                    .and_then(|bucket| self.buckets.get(bucket))
                    .and_then(|bucket| bucket.objects.get(&source.key))
                    .is_some_and(|object| {
                        object
                            .versions
                            .iter()
                            .any(|version| version.version_id == source.version.version_id)
                    });
                let status = if !eligible {
                    if encrypted && !rule.replicate_encrypted_objects {
                        "SKIPPED_ENCRYPTED".to_string()
                    } else {
                        "MISSING_DESTINATION".to_string()
                    }
                } else if destination_bucket.is_none() {
                    "MISSING_DESTINATION".to_string()
                } else if destination_present {
                    source
                        .version
                        .replication_status
                        .clone()
                        .unwrap_or_else(|| "COMPLETED".to_string())
                } else {
                    "PENDING".to_string()
                };
                entries.push(ReplicationStatusEntry {
                    rule_id: rule.id.clone(),
                    destination_bucket: destination_bucket.clone(),
                    key: source.key,
                    version_id: source.version.version_id,
                    delete_marker: source.version.delete_marker,
                    legal_hold: source.version.lock.legal_hold,
                    retention_mode: source
                        .version
                        .lock
                        .retention_mode
                        .map(|mode| format!("{mode:?}")),
                    retain_until_epoch_seconds: source.version.lock.retain_until_epoch_seconds,
                    encrypted,
                    eligible,
                    destination_present,
                    status,
                });
            }
        }
        entries.sort_by(|left, right| {
            left.key
                .cmp(&right.key)
                .then(
                    replication_version_sort_key(&left.version_id)
                        .cmp(&replication_version_sort_key(&right.version_id)),
                )
                .then(left.rule_id.cmp(&right.rule_id))
        });
        let completed_count = entries
            .iter()
            .filter(|entry| entry.status == "COMPLETED")
            .count();
        let pending_count = entries
            .iter()
            .filter(|entry| entry.status == "PENDING")
            .count();
        let replicated_delete_marker_count = entries
            .iter()
            .filter(|entry| entry.delete_marker && entry.destination_present)
            .count();
        let skipped_encrypted_count = entries
            .iter()
            .filter(|entry| entry.status == "SKIPPED_ENCRYPTED")
            .count();
        let missing_destination_count = entries
            .iter()
            .filter(|entry| entry.status == "MISSING_DESTINATION")
            .count();
        let report = ReplicationStatusReport {
            source_bucket: source_bucket.to_string(),
            generated_at_epoch_seconds: self.clock_epoch_seconds,
            entry_count: entries.len(),
            completed_count,
            pending_count,
            replicated_delete_marker_count,
            skipped_encrypted_count,
            missing_destination_count,
            last_replication_sequence: self.replication.last_sequence(),
            entries,
        };
        self.audit.append(
            principal,
            "ops:GetReplicationStatus",
            source_bucket,
            AuditOutcome::Allowed,
            Some(format!(
                "entries={},completed={},pending={},skipped_encrypted={},missing_destinations={}",
                report.entry_count,
                report.completed_count,
                report.pending_count,
                report.skipped_encrypted_count,
                report.missing_destination_count
            )),
        );
        Ok(report)
    }
}

fn replication_version_sort_key(version_id: &str) -> (u8, u64, &str) {
    match version_id
        .strip_prefix('v')
        .and_then(|suffix| suffix.parse::<u64>().ok())
    {
        Some(value) => (0, value, version_id),
        None => (1, 0, version_id),
    }
}