bucketwarden-server 0.1.0

BucketWarden storage server runtime.
Documentation
use super::*;
use crate::ops_health_diagnostics::{
    ops_scope_and_target, scoped_audit_events, scoped_buckets, scoped_notification_events,
    scoped_replication_records,
};

impl BucketWarden {
    /// Builds an `OpsIncidentReport` for `incident_type`, optionally limited to one bucket.
    ///
    /// The scope and target labels come from `ops_scope_and_target(bucket)`. Before any data
    /// is gathered the call is checked with `require_operator_action` (as
    /// `OperatorAction::ReadAudit`, under the action name `ops:GetIncidentReport`) and with
    /// `require_ops_bucket_scope`.
    ///
    /// The report carries the sizes of the scoped audit, notification and replication
    /// collections plus the status, summary, recent actions and evidence computed by
    /// `incident_details`. Its timestamp is the current `clock_epoch_seconds`.
    ///
    /// On success an `AuditOutcome::Allowed` entry is appended to `self.audit` with a detail
    /// string of the form `type=<incident_type>,status=<status>`.
    ///
    /// # Errors
    ///
    /// Propagates any `RuntimeError` returned by `require_operator_action` or
    /// `require_ops_bucket_scope`.
    pub fn ops_incident_report(
        &mut self,
        principal: &str,
        bucket: Option<&str>,
        incident_type: &str,
    ) -> Result<OpsIncidentReport, RuntimeError> {
        // Single spelling of the action name used for both the permission check and the audit entry.
        const ACTION: &str = "ops:GetIncidentReport";

        let (scope, target) = ops_scope_and_target(bucket);
        self.require_operator_action(principal, OperatorAction::ReadAudit, &target, ACTION)?;
        self.require_ops_bucket_scope(bucket)?;

        let audit_events = scoped_audit_events(self, bucket);
        let notification_events = scoped_notification_events(self, bucket);
        // Only the number of replication records ends up in the report.
        let replication_record_count = scoped_replication_records(self, bucket).len();

        let (status, summary, recent_actions, evidence) = incident_details(
            self,
            incident_type,
            bucket,
            &audit_events,
            &notification_events,
        );

        // Format the audit detail now, before `status` is moved into the report.
        let audit_detail = format!("type={incident_type},status={status}");

        let report = OpsIncidentReport {
            incident_type: incident_type.to_string(),
            scope: scope.to_string(),
            target: target.to_string(),
            generated_at_epoch_seconds: self.clock_epoch_seconds,
            status,
            summary,
            audit_event_count: audit_events.len(),
            notification_event_count: notification_events.len(),
            replication_record_count,
            recent_actions,
            evidence,
        };

        self.audit.append(
            principal,
            ACTION,
            &target,
            AuditOutcome::Allowed,
            Some(audit_detail),
        );
        Ok(report)
    }
}

/// Computes the `(status, summary, recent_actions, evidence)` tuple for one incident type.
///
/// Recognised values of `incident_type`:
///
/// * `"credential_leak"` — selects the audit events whose action is
///   `auth:ReportLeakedAccessKey`. Evidence lists every match; recent actions list at most
///   the last five matches, newest-in-slice first.
/// * `"replication_failure"` — for every scoped bucket, flags each replication rule whose
///   status is `"Enabled"` and whose destination bucket is either unresolved or missing from
///   `runtime.buckets`. The summary also reports how many notification events have a name
///   starting with `s3:Replication:`; that number does not influence the status.
/// * `"quota_pressure"` — flags every scoped bucket that has a `max_requests` quota and a
///   `request_count` at or above it.
///
/// Any other value yields `"not_observed"` with empty lists and a summary saying the type is
/// not yet specialized.
///
/// The status is `"observed"` when at least one finding exists and `"not_observed"`
/// otherwise. For the replication and quota cases, recent actions are a copy of the evidence.
fn incident_details(
    runtime: &BucketWarden,
    incident_type: &str,
    bucket: Option<&str>,
    audit_events: &[&AuditEvent],
    notification_events: &[&NotificationEvent],
) -> (String, String, Vec<String>, Vec<String>) {
    // Translates a finding count into the status label stored in the report.
    fn status_label(findings: usize) -> String {
        let label = if findings == 0 {
            "not_observed"
        } else {
            "observed"
        };
        label.to_string()
    }

    match incident_type {
        "credential_leak" => {
            let leak_reports: Vec<&AuditEvent> = audit_events
                .iter()
                .copied()
                .filter(|event| event.action == "auth:ReportLeakedAccessKey")
                .collect();

            // Walk the matches from the end so the last entries of the slice come first.
            let recent_actions: Vec<String> = leak_reports
                .iter()
                .rev()
                .take(5)
                .map(|event| format!("{}:{}:{:?}", event.sequence, event.resource, event.outcome))
                .collect();
            let evidence: Vec<String> = leak_reports
                .iter()
                .map(|event| format!("audit:{}:{}", event.sequence, event.action))
                .collect();
            let summary = format!(
                "{} leaked credential response events found",
                leak_reports.len()
            );

            (
                status_label(leak_reports.len()),
                summary,
                recent_actions,
                evidence,
            )
        }
        "replication_failure" => {
            let replication_event_count = notification_events
                .iter()
                .filter(|event| event.event_name.starts_with("s3:Replication:"))
                .count();

            let mut evidence = Vec::new();
            for (bucket_name, bucket_state) in scoped_buckets(runtime, bucket) {
                for rule in bucket_state.replication.rules.iter() {
                    if rule.status != "Enabled" {
                        continue;
                    }
                    // A rule is healthy only when its destination resolves to a known bucket.
                    let destination_known = matches!(
                        replication_destination_bucket(rule),
                        Some(destination) if runtime.buckets.contains_key(destination)
                    );
                    if !destination_known {
                        evidence.push(format!("replication:{}:{}", bucket_name, rule.id));
                    }
                }
            }

            // One evidence line is recorded per issue, so its length is the issue count.
            let issue_count = evidence.len();
            let summary = format!(
                "{issue_count} replication configuration issues found; {replication_event_count} replication events recorded"
            );

            (
                status_label(issue_count),
                summary,
                evidence.clone(),
                evidence,
            )
        }
        "quota_pressure" => {
            let mut evidence = Vec::new();
            for (bucket_name, bucket_state) in scoped_buckets(runtime, bucket) {
                // Buckets without a request quota can never be under pressure.
                if let Some(limit) = bucket_state.quota.max_requests {
                    if bucket_state.request_count >= limit {
                        evidence.push(format!(
                            "quota:{}:requests={}/{}",
                            bucket_name, bucket_state.request_count, limit
                        ));
                    }
                }
            }

            let pressured = evidence.len();
            let summary = format!("{pressured} request quota pressure conditions found");

            (status_label(pressured), summary, evidence.clone(), evidence)
        }
        _ => {
            let summary =
                format!("incident type {incident_type} is tracked but not yet specialized");
            (status_label(0), summary, Vec::new(), Vec::new())
        }
    }
}