// bucketwarden-server 0.1.0
//
// BucketWarden storage server runtime.
// Documentation
use super::*;
impl BucketWarden {
    /// Returns the audit events exposed by the audit log's `events()` accessor,
    /// borrowed for as long as `self` is borrowed.
    pub fn audit_events(&self) -> &[bucketwarden_audit::AuditEvent] {
        self.audit.events()
    }
    /// Returns the `AuditSummary` produced by the audit log's `summary()` method.
    pub fn audit_summary(&self) -> AuditSummary {
        self.audit.summary()
    }
    /// Renders the audit log through its `to_json_lines()` method.
    ///
    /// # Errors
    ///
    /// Propagates any error reported by `to_json_lines()` unchanged.
    pub fn audit_json_lines(&self) -> anyhow::Result<String> {
        self.audit.to_json_lines()
    }
    /// Returns the replication records exposed by the replication log's
    /// `records()` accessor, borrowed for as long as `self` is borrowed.
    pub fn replication_records(&self) -> &[ReplicationRecord] {
        self.replication.records()
    }
    /// Returns the `ReplicationSummary` produced by the replication log's
    /// `summary()` method.
    pub fn replication_summary(&self) -> ReplicationSummary {
        self.replication.summary()
    }
    /// Renders the replication log through its `to_json_lines()` method.
    ///
    /// Unlike `audit_json_lines`, this returns a plain `String` rather than a
    /// `Result`.
    pub fn replication_json_lines(&self) -> String {
        self.replication.to_json_lines()
    }
    /// Builds a `RuntimeHealth` snapshot from the current in-memory state.
    ///
    /// The object-version count is the sum of `versions.len()` over every
    /// object in every bucket. `status` is always reported as `"ok"`.
    pub fn health(&self) -> RuntimeHealth {
        // Lazily walk every object of every bucket; the per-object version
        // counts are summed inside the struct literal below.
        let all_objects = self
            .buckets
            .values()
            .flat_map(|state| state.objects.values());
        RuntimeHealth {
            status: "ok",
            bucket_count: self.buckets.len(),
            object_version_count: all_objects.map(|entry| entry.versions.len()).sum(),
            multipart_upload_count: self.multipart_uploads.len(),
            audit_event_count: self.audit.events().len(),
            last_replication_sequence: self.replication.last_sequence(),
        }
    }
    /// Looks up `bucket` and returns a shared reference to its state.
    ///
    /// # Errors
    ///
    /// Returns `RuntimeError::NoSuchBucket` carrying the bucket name when the
    /// bucket map has no entry for it.
    pub(crate) fn require_bucket(&self, bucket: &str) -> Result<&BucketState, RuntimeError> {
        match self.buckets.get(bucket) {
            Some(state) => Ok(state),
            None => Err(RuntimeError::NoSuchBucket(bucket.to_string())),
        }
    }
    /// Looks up `bucket` and returns a mutable reference to its state.
    ///
    /// # Errors
    ///
    /// Returns `RuntimeError::NoSuchBucket` carrying the bucket name when the
    /// bucket map has no entry for it.
    pub(crate) fn require_bucket_mut(
        &mut self,
        bucket: &str,
    ) -> Result<&mut BucketState, RuntimeError> {
        match self.buckets.get_mut(bucket) {
            Some(state) => Ok(state),
            None => Err(RuntimeError::NoSuchBucket(bucket.to_string())),
        }
    }
    /// Returns the state stored for `key` inside `bucket`.
    ///
    /// # Errors
    ///
    /// * `RuntimeError::NoSuchBucket` when the bucket does not exist.
    /// * `RuntimeError::NoSuchKey` (with the `object_resource(bucket, key)`
    ///   identifier) when the bucket has no entry for `key`.
    pub(crate) fn object_state(
        &self,
        bucket: &str,
        key: &str,
    ) -> Result<&ObjectState, RuntimeError> {
        let Some(bucket_state) = self.buckets.get(bucket) else {
            return Err(RuntimeError::NoSuchBucket(bucket.to_string()));
        };
        match bucket_state.objects.get(key) {
            Some(object) => Ok(object),
            None => Err(RuntimeError::NoSuchKey(object_resource(bucket, key))),
        }
    }
    /// Returns the version that the object's `current_version()` accessor
    /// reports for `bucket`/`key`.
    ///
    /// # Errors
    ///
    /// Propagates the errors of `object_state`, and returns
    /// `RuntimeError::NoSuchKey` when the object reports no current version.
    pub(crate) fn current_version(
        &self,
        bucket: &str,
        key: &str,
    ) -> Result<&StoredVersion, RuntimeError> {
        match self.object_state(bucket, key)?.current_version() {
            Some(version) => Ok(version),
            None => Err(RuntimeError::NoSuchKey(object_resource(bucket, key))),
        }
    }
    /// Finds the first stored version of `bucket`/`key` whose `version_id`
    /// equals the requested one.
    ///
    /// # Errors
    ///
    /// Propagates the errors of `object_state`, and returns
    /// `RuntimeError::NoSuchVersion` when no stored version matches.
    pub(crate) fn version_by_id(
        &self,
        bucket: &str,
        key: &str,
        version_id: &str,
    ) -> Result<&StoredVersion, RuntimeError> {
        let found = self
            .object_state(bucket, key)?
            .versions
            .iter()
            .find(|candidate| candidate.version_id == version_id);
        match found {
            Some(version) => Ok(version),
            None => Err(RuntimeError::NoSuchVersion {
                bucket: bucket.to_string(),
                key: key.to_string(),
                version_id: version_id.to_string(),
            }),
        }
    }
    /// Mutable counterpart of `version_by_id`: finds the first stored version
    /// of `bucket`/`key` whose `version_id` equals the requested one.
    ///
    /// # Errors
    ///
    /// * `RuntimeError::NoSuchBucket` when the bucket does not exist.
    /// * `RuntimeError::NoSuchKey` when the bucket has no entry for `key`.
    /// * `RuntimeError::NoSuchVersion` when no stored version matches.
    pub(crate) fn version_by_id_mut(
        &mut self,
        bucket: &str,
        key: &str,
        version_id: &str,
    ) -> Result<&mut StoredVersion, RuntimeError> {
        let Some(bucket_state) = self.buckets.get_mut(bucket) else {
            return Err(RuntimeError::NoSuchBucket(bucket.to_string()));
        };
        let Some(object) = bucket_state.objects.get_mut(key) else {
            return Err(RuntimeError::NoSuchKey(object_resource(bucket, key)));
        };
        let found = object
            .versions
            .iter_mut()
            .find(|candidate| candidate.version_id == version_id);
        match found {
            Some(version) => Ok(version),
            None => Err(RuntimeError::NoSuchVersion {
                bucket: bucket.to_string(),
                key: key.to_string(),
                version_id: version_id.to_string(),
            }),
        }
    }
    /// Mutable counterpart of `current_version`: returns the version that the
    /// object's `current_version_mut()` accessor reports for `bucket`/`key`.
    ///
    /// # Errors
    ///
    /// * `RuntimeError::NoSuchBucket` when the bucket does not exist.
    /// * `RuntimeError::NoSuchKey` when the bucket has no entry for `key`, or
    ///   when the object reports no current version.
    pub(crate) fn current_version_mut(
        &mut self,
        bucket: &str,
        key: &str,
    ) -> Result<&mut StoredVersion, RuntimeError> {
        let Some(bucket_state) = self.buckets.get_mut(bucket) else {
            return Err(RuntimeError::NoSuchBucket(bucket.to_string()));
        };
        let Some(object) = bucket_state.objects.get_mut(key) else {
            return Err(RuntimeError::NoSuchKey(object_resource(bucket, key)));
        };
        match object.current_version_mut() {
            Some(version) => Ok(version),
            None => Err(RuntimeError::NoSuchKey(object_resource(bucket, key))),
        }
    }
    /// Resolves the owner and the concrete version id of an object version.
    ///
    /// When `version_id` is `Some`, that exact version is looked up; otherwise
    /// the object's current version is used. The owner is whatever
    /// `stored_owner(bucket_state, version)` returns.
    ///
    /// Returns `(owner, version_id)`.
    ///
    /// # Errors
    ///
    /// * Any error from `require_bucket`, `version_by_id` or `current_version`.
    /// * `RuntimeError::NoSuchKey` when the resolved version is a delete marker.
    pub(crate) fn object_acl_owner_and_version(
        &self,
        bucket: &str,
        key: &str,
        version_id: Option<&str>,
    ) -> Result<(String, String), RuntimeError> {
        let bucket_state = self.require_bucket(bucket)?;
        let version = match version_id {
            Some(requested) => self.version_by_id(bucket, key, requested)?,
            None => self.current_version(bucket, key)?,
        };
        // A delete marker is treated as if the key did not exist.
        if version.delete_marker {
            return Err(RuntimeError::NoSuchKey(object_resource(bucket, key)));
        }
        let owner = stored_owner(bucket_state, version);
        let resolved_version_id = version.version_id.clone();
        Ok((owner, resolved_version_id))
    }
    /// Advances the `next_version` counter by one and returns the id produced
    /// by `object_version_id(local_ordinal)`.
    ///
    /// NOTE(review): the returned id is derived from `local_ordinal` only; the
    /// counter value is bumped but not used to build the id here — confirm
    /// that this is intended.
    pub(crate) fn allocate_object_version_id(&mut self, local_ordinal: u64) -> String {
        self.next_version += 1;
        object_version_id(local_ordinal)
    }
    pub(crate) fn next_object_local_ordinal(
        &self,
        bucket: &str,
        key: &str,
    ) -> Result<u64, RuntimeError> {
        let bucket_state = self.require_bucket(bucket)?;
        Ok(bucket_state
            .objects
            .get(key)
            .map(ObjectState::next_local_ordinal)
            .unwrap_or(1))
    }
    /// Returns the current upload counter rendered as `u<counter>` and then
    /// advances the counter by one.
    pub(crate) fn next_upload_id(&mut self) -> String {
        let sequence = self.next_upload;
        self.next_upload += 1;
        format!("u{}", sequence)
    }
    /// Returns the current event counter rendered as `e<counter>` and then
    /// advances the counter by one.
    pub(crate) fn next_event_id(&mut self) -> String {
        let sequence = self.next_event;
        self.next_event += 1;
        format!("e{}", sequence)
    }
    /// Returns the current request counter rendered as `bwreq` followed by the
    /// counter in zero-padded, 16-digit lowercase hex, and then advances the
    /// counter by one.
    pub(crate) fn next_request_id(&mut self) -> String {
        let sequence = self.next_request;
        self.next_request += 1;
        format!("bwreq{:016x}", sequence)
    }
    /// Records one `NotificationEvent` per notification rule of `bucket` that
    /// matches `event_name` and `key` (as decided by
    /// `notification_rule_matches`).
    ///
    /// Does nothing when the bucket does not exist. Each event gets a fresh id
    /// from `next_event_id` and is stamped with `clock_epoch_seconds`.
    pub(crate) fn emit_notification_event(
        &mut self,
        event_name: &str,
        bucket: &str,
        key: &str,
        version_id: &str,
    ) {
        // The matching rules are cloned so the borrow of `self.buckets` ends
        // before `self` is mutated in the loop below.
        let matching_rules: Vec<_> = match self.buckets.get(bucket) {
            Some(state) => state
                .notification_rules
                .iter()
                .filter(|rule| notification_rule_matches(rule, event_name, key))
                .cloned()
                .collect(),
            None => return,
        };
        for rule in matching_rules {
            let event_id = self.next_event_id();
            self.notification_events.push(NotificationEvent {
                event_id,
                event_name: event_name.to_string(),
                bucket: bucket.to_string(),
                key: key.to_string(),
                version_id: version_id.to_string(),
                rule_id: rule.id,
                target_kind: rule.target_kind,
                target_arn: rule.target_arn,
                event_time_epoch_seconds: self.clock_epoch_seconds,
            });
        }
    }
    /// Returns the in-progress multipart upload `upload_id`, provided it was
    /// started for exactly this `bucket` and `key`.
    ///
    /// # Errors
    ///
    /// Returns `RuntimeError::NoSuchUpload` both when the upload id is unknown
    /// and when it belongs to a different bucket or key.
    pub(crate) fn multipart_upload(
        &mut self,
        upload_id: &str,
        bucket: &str,
        key: &str,
    ) -> Result<&mut MultipartUploadState, RuntimeError> {
        self.multipart_uploads
            .get_mut(upload_id)
            // An upload addressed through the wrong bucket/key is reported
            // exactly like a missing one.
            .filter(|upload| upload.bucket == bucket && upload.key == key)
            .ok_or_else(|| RuntimeError::NoSuchUpload(upload_id.to_string()))
    }
    /// Authorizes `principal` to perform `action` on `resource` using
    /// `RequestContext::default()` as the request context.
    ///
    /// Delegates to `authorize_with_context` and returns its result unchanged.
    pub(crate) fn authorize(
        &mut self,
        principal: &str,
        action: S3Action,
        resource: &str,
    ) -> Result<(), RuntimeError> {
        self.authorize_with_context(principal, action, resource, &RequestContext::default())
    }
    /// Checks the `S3Action::BypassGovernanceRetention` permission, but only
    /// when the caller actually asked to bypass governance.
    ///
    /// With `bypass_governance == false` this is a no-op that returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Propagates the error from `authorize` when the bypass is requested and
    /// not permitted.
    pub(crate) fn authorize_bypass_governance_if_requested(
        &mut self,
        principal: &str,
        resource: &str,
        bypass_governance: bool,
    ) -> Result<(), RuntimeError> {
        if !bypass_governance {
            return Ok(());
        }
        self.authorize(principal, S3Action::BypassGovernanceRetention, resource)
    }
    /// Decides whether `principal` may perform `action` on `resource`.
    ///
    /// Checks are applied in this order:
    ///
    /// 1. Tenant isolation (`tenant_allows_resource`).
    /// 2. The active request scope, if one is set (`scope.permits`).
    /// 3. The policy in `self.policy` together with the bucket policy (if any)
    ///    found by `bucket_policy_for_resource`. An explicit deny from either
    ///    wins; otherwise an allow from either is sufficient.
    ///
    /// Failing check 1 or 2 is always reported with `explicit_deny: true`.
    /// Every denial is recorded through `audit_denied` with a short reason;
    /// this method writes no audit entry on the allow path.
    ///
    /// # Errors
    ///
    /// Returns `RuntimeError::AccessDenied` when any check denies the request.
    pub(crate) fn authorize_with_context(
        &mut self,
        principal: &str,
        action: S3Action,
        resource: &str,
        context: &RequestContext,
    ) -> Result<(), RuntimeError> {
        // Hard gates run before any policy evaluation.
        let gate_denial = if !self.tenant_allows_resource(principal, resource) {
            Some("tenant isolation denied")
        } else if self
            .active_request_scope
            .as_ref()
            .is_some_and(|scope| !scope.permits(action.iam_name(), resource))
        {
            Some("session scope denied")
        } else {
            None
        };

        let (explicit_deny, reason) = match gate_denial {
            Some(reason) => (true, reason),
            None => {
                let global_decision = self.policy.evaluate_with_context(
                    principal,
                    action.iam_name(),
                    resource,
                    context,
                );
                let bucket_decision = self.bucket_policy_for_resource(resource).map(|policy| {
                    policy.evaluate_with_context(principal, action.iam_name(), resource, context)
                });
                // A missing bucket policy neither denies nor allows.
                let (bucket_denies, bucket_allows) = match bucket_decision.as_ref() {
                    Some(decision) => (decision.explicit_deny, decision.allowed),
                    None => (false, false),
                };
                let explicit_deny = global_decision.explicit_deny || bucket_denies;
                if !explicit_deny && (global_decision.allowed || bucket_allows) {
                    return Ok(());
                }
                let reason = if explicit_deny {
                    "explicit deny"
                } else {
                    "default deny"
                };
                (explicit_deny, reason)
            }
        };

        // Single exit for every denial: audit first, then build the error.
        self.audit_denied(principal, action, resource, Some(reason.to_string()));
        Err(RuntimeError::AccessDenied {
            principal: principal.to_string(),
            action: action.iam_name().to_string(),
            resource: resource.to_string(),
            explicit_deny,
        })
    }
    /// Returns the policy attached to the bucket that `resource` refers to.
    ///
    /// The bucket name is the part of `resource` before its first `/` (or the
    /// whole string when there is none). Yields `None` when that bucket does
    /// not exist or has no policy attached.
    pub(crate) fn bucket_policy_for_resource(&self, resource: &str) -> Option<&Policy> {
        let bucket_name = resource
            .split_once('/')
            .map_or(resource, |(head, _)| head);
        let bucket_state = self.buckets.get(bucket_name)?;
        let attached = bucket_state.policy.as_ref()?;
        Some(&attached.policy)
    }

    /// Tenant-isolation check: may `principal` touch `resource` at all?
    ///
    /// * The wildcard resource `"*"` is always allowed.
    /// * A resource whose bucket (the part before the first `/`) does not
    ///   exist is allowed by this check.
    /// * Otherwise the bucket's `tenant_id` must equal
    ///   `principal_tenant_id(principal)`.
    pub(crate) fn tenant_allows_resource(&self, principal: &str, resource: &str) -> bool {
        if resource == "*" {
            return true;
        }
        let bucket_name = resource.split('/').next().unwrap_or(resource);
        match self.buckets.get(bucket_name) {
            Some(state) => state.tenant_id == self.principal_tenant_id(principal),
            None => true,
        }
    }
    /// Appends an `AuditOutcome::Allowed` entry to the audit log for
    /// `principal` performing `action` (recorded by its IAM name) on
    /// `resource`, with an optional free-form `detail`.
    pub(crate) fn audit_allowed(
        &mut self,
        principal: &str,
        action: S3Action,
        resource: &str,
        detail: Option<String>,
    ) {
        let action_name = action.iam_name();
        self.audit
            .append(principal, action_name, resource, AuditOutcome::Allowed, detail);
    }
    /// Appends an `AuditOutcome::Denied` entry to the audit log for
    /// `principal` attempting `action` (recorded by its IAM name) on
    /// `resource`, with an optional free-form `detail`.
    pub(crate) fn audit_denied(
        &mut self,
        principal: &str,
        action: S3Action,
        resource: &str,
        detail: Option<String>,
    ) {
        let action_name = action.iam_name();
        self.audit
            .append(principal, action_name, resource, AuditOutcome::Denied, detail);
    }

    /// Appends an `AuditOutcome::Allowed` entry for a successful
    /// `auth:SigV4Authenticate` action by `principal`; the access key id is
    /// recorded in the resource position and no detail is attached.
    pub(crate) fn audit_auth_allowed(&mut self, principal: &str, access_key_id: &str) {
        let auth_action = "auth:SigV4Authenticate";
        self.audit
            .append(principal, auth_action, access_key_id, AuditOutcome::Allowed, None);
    }

    /// Appends an `AuditOutcome::Failed` entry for an `auth:SigV4Authenticate`
    /// attempt, attributed to the principal `"anonymous"`.
    ///
    /// The access key id is recorded in the resource position, or `"*"` when
    /// none was supplied; `detail` carries the failure description.
    pub(crate) fn audit_auth_failed(&mut self, access_key_id: Option<&str>, detail: String) {
        let auth_action = "auth:SigV4Authenticate";
        let key_or_wildcard = access_key_id.unwrap_or("*");
        self.audit.append(
            "anonymous",
            auth_action,
            key_or_wildcard,
            AuditOutcome::Failed,
            Some(detail),
        );
    }
}