bucketwarden-server 0.1.0

BucketWarden storage server runtime.
Documentation
use super::*;

impl BucketWarden {
    /// Stores `quota` as the quota configuration of `bucket` and hands it back.
    ///
    /// The call is gated by [`Self::require_operator_action`] with
    /// `OperatorAction::AdministerBucket`. After the bucket has been updated, an
    /// `Allowed` audit entry tagged `ops:PutBucketQuota` is appended whose detail
    /// is the debug rendering of the configured object and request limits.
    ///
    /// # Errors
    ///
    /// Propagates the error of the operator check, and the error of
    /// `require_bucket_mut` when the bucket lookup fails. In the latter case this
    /// method appends no audit entry.
    pub fn put_bucket_quota(
        &mut self,
        principal: &str,
        bucket: &str,
        quota: BucketQuotaConfiguration,
    ) -> Result<BucketQuotaConfiguration, RuntimeError> {
        let audit_action = "ops:PutBucketQuota";
        self.require_operator_action(
            principal,
            OperatorAction::AdministerBucket,
            bucket,
            audit_action,
        )?;

        // The bucket keeps its own copy; the caller's value is returned below.
        self.require_bucket_mut(bucket)?.quota = quota.clone();

        let detail = format!(
            "objects={:?},requests={:?}",
            quota.max_objects, quota.max_requests
        );
        self.audit.append(
            principal,
            audit_action,
            bucket,
            AuditOutcome::Allowed,
            Some(detail),
        );
        Ok(quota)
    }

    /// Returns a copy of the quota configuration currently stored on `bucket`.
    ///
    /// The call is gated by [`Self::require_operator_action`] with
    /// `OperatorAction::ReadDiagnostics`. A successful read appends an `Allowed`
    /// audit entry tagged `ops:GetBucketQuota` whose detail is the debug
    /// rendering of the object and request limits that were read.
    ///
    /// # Errors
    ///
    /// Propagates the error of the operator check, and the error of
    /// `require_bucket` when the bucket lookup fails.
    pub fn get_bucket_quota(
        &mut self,
        principal: &str,
        bucket: &str,
    ) -> Result<BucketQuotaConfiguration, RuntimeError> {
        let audit_action = "ops:GetBucketQuota";
        self.require_operator_action(
            principal,
            OperatorAction::ReadDiagnostics,
            bucket,
            audit_action,
        )?;

        let bucket_state = self.require_bucket(bucket)?;
        let quota = bucket_state.quota.clone();

        let detail = format!(
            "objects={:?},requests={:?}",
            quota.max_objects, quota.max_requests
        );
        self.audit.append(
            principal,
            audit_action,
            bucket,
            AuditOutcome::Allowed,
            Some(detail),
        );
        Ok(quota)
    }

    /// Records `quota` as the quota configuration for `tenant_id` and returns it.
    ///
    /// The call is gated by [`Self::require_operator_action`] with
    /// `OperatorAction::AdministerTenant`. The configuration is inserted into
    /// `tenant_quotas` under the tenant id, replacing any entry already stored
    /// for that key. An `Allowed` audit entry tagged `ops:PutTenantQuota` is then
    /// appended with the debug rendering of the bucket, object and request limits.
    ///
    /// # Errors
    ///
    /// Propagates the error of the operator check.
    pub fn put_tenant_quota(
        &mut self,
        principal: &str,
        tenant_id: &str,
        quota: TenantQuotaConfiguration,
    ) -> Result<TenantQuotaConfiguration, RuntimeError> {
        let audit_action = "ops:PutTenantQuota";
        self.require_operator_action(
            principal,
            OperatorAction::AdministerTenant,
            tenant_id,
            audit_action,
        )?;

        let key = tenant_id.to_owned();
        self.tenant_quotas.insert(key, quota.clone());

        let detail = format!(
            "buckets={:?},objects={:?},requests={:?}",
            quota.max_buckets, quota.max_objects, quota.max_requests
        );
        self.audit.append(
            principal,
            audit_action,
            tenant_id,
            AuditOutcome::Allowed,
            Some(detail),
        );
        Ok(quota)
    }

    pub fn get_tenant_quota(
        &mut self,
        principal: &str,
        tenant_id: &str,
    ) -> Result<TenantQuotaConfiguration, RuntimeError> {
        self.require_operator_action(
            principal,
            OperatorAction::ReadDiagnostics,
            tenant_id,
            "ops:GetTenantQuota",
        )?;
        let quota = self
            .tenant_quotas
            .get(tenant_id)
            .cloned()
            .unwrap_or_default();
        self.audit.append(
            principal,
            "ops:GetTenantQuota",
            tenant_id,
            AuditOutcome::Allowed,
            Some(format!(
                "buckets={:?},objects={:?},requests={:?}",
                quota.max_buckets, quota.max_objects, quota.max_requests
            )),
        );
        Ok(quota)
    }

    /// Reports the current quota-relevant usage of `bucket`.
    ///
    /// The call is gated by [`Self::require_operator_action`] with
    /// `OperatorAction::ReadDiagnostics`; the figures come from
    /// [`Self::bucket_quota_usage_report`]. A successful call appends an
    /// `Allowed` audit entry tagged `ops:GetBucketQuotaUsage` that carries the
    /// reported object and request counts.
    ///
    /// # Errors
    ///
    /// Propagates the error of the operator check and the error of
    /// `bucket_quota_usage_report`.
    pub fn bucket_quota_usage(
        &mut self,
        principal: &str,
        bucket: &str,
    ) -> Result<QuotaUsageReport, RuntimeError> {
        let audit_action = "ops:GetBucketQuotaUsage";
        self.require_operator_action(
            principal,
            OperatorAction::ReadDiagnostics,
            bucket,
            audit_action,
        )?;

        let usage = self.bucket_quota_usage_report(bucket)?;

        let detail = format!(
            "objects={},requests={}",
            usage.object_count, usage.request_count
        );
        self.audit.append(
            principal,
            audit_action,
            bucket,
            AuditOutcome::Allowed,
            Some(detail),
        );
        Ok(usage)
    }

    /// Reports the current quota-relevant usage aggregated for `tenant_id`.
    ///
    /// The call is gated by [`Self::require_operator_action`] with
    /// `OperatorAction::ReadDiagnostics`; the figures come from
    /// [`Self::tenant_quota_usage_report`]. A successful call appends an
    /// `Allowed` audit entry tagged `ops:GetTenantQuotaUsage` that carries the
    /// reported bucket, object and request counts.
    ///
    /// # Errors
    ///
    /// Propagates the error of the operator check.
    pub fn tenant_quota_usage(
        &mut self,
        principal: &str,
        tenant_id: &str,
    ) -> Result<QuotaUsageReport, RuntimeError> {
        let audit_action = "ops:GetTenantQuotaUsage";
        self.require_operator_action(
            principal,
            OperatorAction::ReadDiagnostics,
            tenant_id,
            audit_action,
        )?;

        let usage = self.tenant_quota_usage_report(tenant_id);

        let detail = format!(
            "buckets={},objects={},requests={}",
            usage.bucket_count, usage.object_count, usage.request_count
        );
        self.audit.append(
            principal,
            audit_action,
            tenant_id,
            AuditOutcome::Allowed,
            Some(detail),
        );
        Ok(usage)
    }

    /// Checks that `principal` may perform `action` on `resource`.
    ///
    /// The decision is delegated to `operator_action_allowed`. When it answers
    /// `false`, a `Denied` audit entry tagged `audit_action` is appended (with no
    /// detail) before the error is returned. This method appends nothing when
    /// the action is allowed; the callers in this `impl` write their own
    /// `Allowed` entries.
    ///
    /// # Errors
    ///
    /// Propagates any error from `operator_action_allowed`, and returns
    /// `RuntimeError::OperatorActionDenied` — carrying the principal, the debug
    /// rendering of `action`, and the resource — when the action is not allowed.
    pub(crate) fn require_operator_action(
        &mut self,
        principal: &str,
        action: OperatorAction,
        resource: &str,
        audit_action: &str,
    ) -> Result<(), RuntimeError> {
        if !self.operator_action_allowed(principal, action, resource)? {
            // Record the refusal before surfacing it to the caller.
            self.audit.append(
                principal,
                audit_action,
                resource,
                AuditOutcome::Denied,
                None,
            );
            return Err(RuntimeError::OperatorActionDenied {
                principal: principal.to_string(),
                action: format!("{action:?}"),
                resource: resource.to_string(),
            });
        }
        Ok(())
    }

    /// Builds the usage report for a single bucket.
    ///
    /// The report has scope `"bucket"`, targets `bucket`, always counts one
    /// bucket, and takes its object count from `count_current_objects` and its
    /// request count from the bucket's `request_count` field.
    ///
    /// # Errors
    ///
    /// Propagates the error of `require_bucket` when the bucket lookup fails.
    pub(crate) fn bucket_quota_usage_report(
        &self,
        bucket: &str,
    ) -> Result<QuotaUsageReport, RuntimeError> {
        let state = self.require_bucket(bucket)?;
        let report = QuotaUsageReport {
            scope: String::from("bucket"),
            target: bucket.to_owned(),
            bucket_count: 1,
            object_count: count_current_objects(state),
            request_count: state.request_count,
        };
        Ok(report)
    }

    /// Builds the usage report aggregated over every bucket owned by `tenant_id`.
    ///
    /// The report has scope `"tenant"` and targets `tenant_id`. Buckets are
    /// matched on their `tenant_id` field; for each match the bucket count goes
    /// up by one, the object count by `count_current_objects`, and the request
    /// count by the bucket's `request_count`. A tenant that owns no buckets
    /// yields a report with all three counts at zero.
    pub(crate) fn tenant_quota_usage_report(&self, tenant_id: &str) -> QuotaUsageReport {
        let mut report = QuotaUsageReport {
            scope: "tenant".to_string(),
            target: tenant_id.to_string(),
            bucket_count: 0,
            object_count: 0,
            request_count: 0,
        };
        // One pass over the buckets, accumulating all three totals together.
        for bucket in self.buckets.values() {
            if bucket.tenant_id != tenant_id {
                continue;
            }
            report.bucket_count += 1;
            report.object_count += count_current_objects(bucket);
            report.request_count += bucket.request_count;
        }
        report
    }

    /// Checks whether `tenant_id` may own one more bucket.
    ///
    /// A tenant with no entry in `tenant_quotas`, or whose entry has no
    /// `max_buckets` limit, is never rejected. Otherwise the number of buckets
    /// whose `tenant_id` matches is counted and the would-be total (current
    /// plus one) is compared against the limit. Nothing is modified.
    ///
    /// # Errors
    ///
    /// Returns `RuntimeError::QuotaExceeded` with scope `"tenant"` and metric
    /// `"bucket-count"` when the would-be total exceeds the limit.
    pub(crate) fn enforce_bucket_creation_quota(
        &self,
        tenant_id: &str,
    ) -> Result<(), RuntimeError> {
        let configured_limit = self
            .tenant_quotas
            .get(tenant_id)
            .and_then(|quota| quota.max_buckets);
        let Some(limit) = configured_limit else {
            return Ok(());
        };

        let owned_buckets = self
            .buckets
            .values()
            .filter(|bucket| bucket.tenant_id == tenant_id)
            .count() as u64;
        let attempted = owned_buckets + 1;
        if attempted > limit as u64 {
            return Err(RuntimeError::QuotaExceeded {
                scope: "tenant".to_string(),
                resource: tenant_id.to_string(),
                metric: "bucket-count".to_string(),
                limit: limit as u64,
                attempted,
            });
        }
        Ok(())
    }

    /// Checks whether `bucket` (and its tenant) may hold one more current object.
    ///
    /// When `creates_new_current_key` is `false` the call returns `Ok(())`
    /// immediately, without even looking the bucket up. Otherwise two limits are
    /// checked in order, each against "current count plus one":
    ///
    /// 1. the bucket's own `quota.max_objects`, against
    ///    `count_current_objects` of that bucket;
    /// 2. the owning tenant's `max_objects` (if the tenant has an entry in
    ///    `tenant_quotas`), against the sum of `count_current_objects` over all
    ///    buckets with the same `tenant_id`.
    ///
    /// Nothing is modified.
    ///
    /// # Errors
    ///
    /// Propagates the error of `require_bucket`, and returns
    /// `RuntimeError::QuotaExceeded` with metric `"object-count"` and scope
    /// `"bucket"` or `"tenant"` for whichever limit is hit first.
    pub(crate) fn enforce_object_creation_quota(
        &self,
        bucket: &str,
        creates_new_current_key: bool,
    ) -> Result<(), RuntimeError> {
        if !creates_new_current_key {
            return Ok(());
        }
        let state = self.require_bucket(bucket)?;

        // Bucket-level limit.
        if let Some(limit) = state.quota.max_objects {
            let attempted = count_current_objects(state) as u64 + 1;
            if attempted > limit as u64 {
                return Err(RuntimeError::QuotaExceeded {
                    scope: "bucket".to_string(),
                    resource: bucket.to_string(),
                    metric: "object-count".to_string(),
                    limit: limit as u64,
                    attempted,
                });
            }
        }

        // Tenant-level limit, summed across every bucket the tenant owns.
        let tenant_limit = self
            .tenant_quotas
            .get(&state.tenant_id)
            .and_then(|quota| quota.max_objects);
        if let Some(limit) = tenant_limit {
            let mut tenant_objects = 0usize;
            for candidate in self.buckets.values() {
                if candidate.tenant_id == state.tenant_id {
                    tenant_objects += count_current_objects(candidate);
                }
            }
            let attempted = tenant_objects as u64 + 1;
            if attempted > limit as u64 {
                return Err(RuntimeError::QuotaExceeded {
                    scope: "tenant".to_string(),
                    resource: state.tenant_id.clone(),
                    metric: "object-count".to_string(),
                    limit: limit as u64,
                    attempted,
                });
            }
        }
        Ok(())
    }

    /// Checks the request quotas for `bucket` and, if both pass, counts the request.
    ///
    /// Two limits are checked in order, each against "current count plus one":
    ///
    /// 1. the bucket's `quota.max_requests`, against the bucket's own
    ///    `request_count`;
    /// 2. the owning tenant's `max_requests` (if the tenant has an entry in
    ///    `tenant_quotas`), against the sum of `request_count` over all buckets
    ///    with the same `tenant_id`.
    ///
    /// Only when neither limit is exceeded is the bucket's `request_count`
    /// incremented; a rejected request leaves the counter unchanged.
    ///
    /// # Errors
    ///
    /// Propagates the errors of `require_bucket` / `require_bucket_mut`, and
    /// returns `RuntimeError::QuotaExceeded` with metric `"request-count"` and
    /// scope `"bucket"` or `"tenant"` for whichever limit is hit first.
    pub(crate) fn enforce_and_record_bucket_request_quota(
        &mut self,
        bucket: &str,
    ) -> Result<(), RuntimeError> {
        let state = self.require_bucket(bucket)?;

        // Bucket-level limit.
        if let Some(limit) = state.quota.max_requests {
            let attempted = state.request_count + 1;
            if attempted > limit {
                return Err(RuntimeError::QuotaExceeded {
                    scope: "bucket".to_string(),
                    resource: bucket.to_string(),
                    metric: "request-count".to_string(),
                    limit,
                    attempted,
                });
            }
        }

        // Tenant-level limit, summed across every bucket the tenant owns.
        let tenant_limit = self
            .tenant_quotas
            .get(&state.tenant_id)
            .and_then(|quota| quota.max_requests);
        if let Some(limit) = tenant_limit {
            let tenant_requests = self
                .buckets
                .values()
                .filter(|candidate| candidate.tenant_id == state.tenant_id)
                .map(|candidate| candidate.request_count)
                .sum::<u64>();
            let attempted = tenant_requests + 1;
            if attempted > limit {
                return Err(RuntimeError::QuotaExceeded {
                    scope: "tenant".to_string(),
                    resource: state.tenant_id.clone(),
                    metric: "request-count".to_string(),
                    limit,
                    attempted,
                });
            }
        }

        // Both checks passed: the shared borrow above is finished, so the
        // bucket can now be looked up mutably to record the request.
        self.require_bucket_mut(bucket)?.request_count += 1;
        Ok(())
    }
}

/// Counts the entries of `bucket.objects` whose `has_current_version()` is true.
///
/// Entries for which that check is false are skipped, so the result can be
/// smaller than the number of keys stored in the bucket.
fn count_current_objects(bucket: &BucketState) -> usize {
    let mut current = 0;
    for object in bucket.objects.values() {
        if object.has_current_version() {
            current += 1;
        }
    }
    current
}