bucketwarden-server 0.1.0

BucketWarden storage server runtime.
Documentation
use super::*;

impl BucketWarden {
    pub fn ops_health_report(
        &mut self,
        principal: &str,
        bucket: Option<&str>,
    ) -> Result<OpsHealthReport, RuntimeError> {
        let (scope, target) = ops_scope_and_target(bucket);
        self.require_operator_action(
            principal,
            OperatorAction::ReadDiagnostics,
            &target,
            "ops:GetHealthReport",
        )?;
        self.require_ops_bucket_scope(bucket)?;
        let buckets = scoped_buckets(self, bucket);
        let bucket_count = buckets.len();
        let object_version_count = buckets
            .iter()
            .flat_map(|(_, bucket_state)| bucket_state.objects.values())
            .map(|object| object.versions.len())
            .sum();
        let multipart_upload_count = self
            .multipart_uploads
            .values()
            .filter(|upload| bucket.is_none_or(|name| upload.bucket == name))
            .count();
        let audit_events = scoped_audit_events(self, bucket);
        let notification_events = scoped_notification_events(self, bucket);
        let replication_records = scoped_replication_records(self, bucket);
        let mut issues = Vec::new();
        for (bucket_name, bucket_state) in &buckets {
            if bucket_state.request_count > 0
                && bucket_state
                    .quota
                    .max_requests
                    .is_some_and(|limit| bucket_state.request_count >= limit)
            {
                issues.push(format!(
                    "bucket request quota at limit for {bucket_name}: {}/{}",
                    bucket_state.request_count,
                    bucket_state.quota.max_requests.unwrap_or_default()
                ));
            }
            for rule in bucket_state
                .replication
                .rules
                .iter()
                .filter(|rule| rule.status == "Enabled")
            {
                let Some(destination_bucket) = replication_destination_bucket(rule) else {
                    issues.push(format!(
                        "replication rule {} on {} has invalid destination {}",
                        rule.id, bucket_name, rule.destination_bucket
                    ));
                    continue;
                };
                if !self.buckets.contains_key(destination_bucket) {
                    issues.push(format!(
                        "replication destination {} missing for {} rule {}",
                        destination_bucket, bucket_name, rule.id
                    ));
                }
            }
        }
        if bucket.is_none() && bucket_count == 0 {
            issues.push("runtime has no buckets".to_string());
        }
        let status = if issues.is_empty() { "ok" } else { "degraded" };
        let storage_backend_report = self.storage_backend_support_report();
        let replication_strategy_report = self.replication_strategy_support_report();
        let erasure_coding_report = self.erasure_coding_support_report();
        let placement_domain_report = self.placement_domain_support_report();
        let consistency_model_report = self.consistency_model_support_report();
        let metadata_architecture_report = self.metadata_architecture_support_report();
        let object_layout_report = self.object_layout_support_report();
        let small_object_report = self.small_object_optimization_report();
        let large_object_report = self.large_object_optimization_report();
        let report = OpsHealthReport {
            scope: scope.to_string(),
            target: target.clone(),
            generated_at_epoch_seconds: self.clock_epoch_seconds,
            status: status.to_string(),
            live: true,
            ready: issues.is_empty(),
            bucket_count,
            object_version_count,
            multipart_upload_count,
            audit_event_count: audit_events.len(),
            notification_event_count: notification_events.len(),
            replication_record_count: replication_records.len(),
            credential_count: scoped_credential_count(self, &buckets),
            first_audit_sequence: audit_events.first().map(|event| event.sequence),
            last_audit_sequence: audit_events.last().map(|event| event.sequence),
            last_replication_sequence: replication_records.last().map(|record| record.sequence),
            active_storage_backend: storage_backend_report.active_backend.to_string(),
            unsupported_storage_backends: storage_backend_report
                .unsupported_backends
                .iter()
                .map(|backend| (*backend).to_string())
                .collect(),
            active_replication_strategy: replication_strategy_report.active_strategy.to_string(),
            unsupported_replication_strategies: replication_strategy_report
                .unsupported_strategies
                .iter()
                .map(|strategy| (*strategy).to_string())
                .collect(),
            active_erasure_coding_profile: erasure_coding_report.active_profile.to_string(),
            unsupported_erasure_coding_profiles: erasure_coding_report
                .unsupported_profiles
                .iter()
                .map(|profile| (*profile).to_string())
                .collect(),
            active_placement_profile: placement_domain_report.active_profile.to_string(),
            unsupported_placement_domains: placement_domain_report
                .unsupported_domains
                .iter()
                .map(|domain| (*domain).to_string())
                .collect(),
            active_consistency_model: consistency_model_report.active_model.to_string(),
            unsupported_consistency_models: consistency_model_report
                .unsupported_models
                .iter()
                .map(|model| (*model).to_string())
                .collect(),
            active_metadata_architecture: metadata_architecture_report
                .active_architecture
                .to_string(),
            unsupported_metadata_architectures: metadata_architecture_report
                .unsupported_architectures
                .iter()
                .map(|architecture| (*architecture).to_string())
                .collect(),
            active_object_layout: object_layout_report.active_layout.to_string(),
            unsupported_object_layouts: object_layout_report
                .unsupported_layouts
                .iter()
                .map(|layout| (*layout).to_string())
                .collect(),
            active_small_object_mode: small_object_report.active_mode.to_string(),
            unsupported_small_object_modes: small_object_report
                .unsupported_modes
                .iter()
                .map(|mode| (*mode).to_string())
                .collect(),
            active_large_object_mode: large_object_report.active_mode.to_string(),
            unsupported_large_object_modes: large_object_report
                .unsupported_modes
                .iter()
                .map(|mode| (*mode).to_string())
                .collect(),
            issues,
        };
        self.audit.append(
            principal,
            "ops:GetHealthReport",
            &target,
            AuditOutcome::Allowed,
            Some(format!(
                "status={},buckets={},issues={}",
                report.status,
                report.bucket_count,
                report.issues.len()
            )),
        );
        Ok(report)
    }

    pub(crate) fn require_ops_bucket_scope(
        &self,
        bucket: Option<&str>,
    ) -> Result<(), RuntimeError> {
        if let Some(bucket) = bucket {
            self.require_bucket(bucket)?;
        }
        Ok(())
    }
}

/// Maps an optional bucket name to the `(scope, target)` pair used by ops requests.
///
/// A named bucket yields `("bucket", <name>)`; the absence of a bucket yields the
/// runtime-wide pair `("runtime", "*")`.
pub(crate) fn ops_scope_and_target(bucket: Option<&str>) -> (&'static str, String) {
    bucket.map_or_else(
        || ("runtime", String::from("*")),
        |name| ("bucket", name.to_owned()),
    )
}

/// Returns the `(name, state)` pairs of the buckets that fall inside the scope.
///
/// `None` selects every bucket in `runtime.buckets`; `Some(target)` keeps only
/// the entries whose name equals `target`. Entries appear in the map's own
/// iteration order.
pub(crate) fn scoped_buckets<'a>(
    runtime: &'a BucketWarden,
    bucket: Option<&str>,
) -> Vec<(&'a String, &'a BucketState)> {
    let mut selected = Vec::new();
    for (name, state) in runtime.buckets.iter() {
        let in_scope = match bucket {
            Some(target) => name.as_str() == target,
            None => true,
        };
        if in_scope {
            selected.push((name, state));
        }
    }
    selected
}

/// Returns the audit events that belong to the requested scope, in log order.
///
/// `None` selects every event. For `Some(bucket)` an event is kept when its
/// resource is:
/// - exactly the bucket name,
/// - a path under the bucket (the bucket name followed by `/`), or
/// - the wildcard `"*"`, which is included in every bucket scope.
pub(crate) fn scoped_audit_events<'a>(
    runtime: &'a BucketWarden,
    bucket: Option<&str>,
) -> Vec<&'a AuditEvent> {
    runtime
        .audit_events()
        .iter()
        .filter(|event| {
            bucket.is_none_or(|bucket| {
                event.resource == bucket
                    || event.resource == "*"
                    // Same test as `starts_with("{bucket}/")`, but without
                    // building a new prefix string for every event.
                    || event
                        .resource
                        .strip_prefix(bucket)
                        .is_some_and(|rest| rest.starts_with('/'))
            })
        })
        .collect()
}

/// Returns the notification events inside the requested scope, in stored order.
///
/// `None` selects every event; `Some(name)` keeps only events whose `bucket`
/// field equals `name`.
pub(crate) fn scoped_notification_events<'a>(
    runtime: &'a BucketWarden,
    bucket: Option<&str>,
) -> Vec<&'a NotificationEvent> {
    let events = runtime.notification_events().iter();
    match bucket {
        Some(name) => events.filter(|event| event.bucket == name).collect(),
        None => events.collect(),
    }
}

/// Returns the replication records inside the requested scope, in stored order.
///
/// `None` selects every record; `Some(name)` keeps only records whose `bucket`
/// field equals `name`.
pub(crate) fn scoped_replication_records<'a>(
    runtime: &'a BucketWarden,
    bucket: Option<&str>,
) -> Vec<&'a ReplicationRecord> {
    let mut matching = Vec::new();
    for record in runtime.replication_records().iter() {
        if bucket.is_none_or(|name| record.bucket == name) {
            matching.push(record);
        }
    }
    matching
}

/// Counts credentials for the tenants that own the given buckets.
///
/// Each distinct `tenant_id` among `buckets` is counted once via
/// `runtime.auth.credential_count_for_tenant`, and the per-tenant counts are
/// summed. When `buckets` is empty the count for `DEFAULT_TENANT_ID` is
/// returned instead.
fn scoped_credential_count(runtime: &BucketWarden, buckets: &[(&String, &BucketState)]) -> usize {
    use std::collections::BTreeSet;

    // De-duplicate tenants so buckets sharing a tenant are not counted twice.
    let mut tenants = BTreeSet::new();
    for (_, state) in buckets {
        tenants.insert(state.tenant_id.as_str());
    }

    if tenants.is_empty() {
        return runtime.auth.credential_count_for_tenant(DEFAULT_TENANT_ID);
    }
    tenants
        .into_iter()
        .map(|tenant| runtime.auth.credential_count_for_tenant(tenant))
        .sum()
}