// bucketwarden-server 0.1.0
//
// BucketWarden storage server runtime.
// Documentation
use super::*;
mod lifecycle_inventory_helpers;
use std::collections::BTreeSet;

use lifecycle_inventory_helpers::*;

impl BucketWarden {
    /// Produces an unfiltered inventory of `bucket` on behalf of `principal`.
    ///
    /// Convenience wrapper over [`Self::bucket_inventory_query`]: it sets no key
    /// prefix and no entry limit, and asks for both delete markers and noncurrent
    /// versions to be included.
    ///
    /// # Errors
    ///
    /// Propagates any [`RuntimeError`] returned by `bucket_inventory_query`.
    pub fn bucket_inventory(
        &mut self,
        principal: &str,
        bucket: &str,
    ) -> Result<BucketInventoryReport, RuntimeError> {
        let unfiltered_request = BucketInventoryRequest {
            bucket: bucket.to_owned(),
            prefix: None,
            include_delete_markers: true,
            include_noncurrent_versions: true,
            max_entries: None,
        };
        self.bucket_inventory_query(principal, unfiltered_request)
    }

    pub fn bucket_inventory_query(
        &mut self,
        principal: &str,
        request: BucketInventoryRequest,
    ) -> Result<BucketInventoryReport, RuntimeError> {
        self.require_operator_action(
            principal,
            OperatorAction::ReadDiagnostics,
            &request.bucket,
            "ops:GetBucketInventory",
        )?;
        let bucket_state = self.require_bucket(&request.bucket)?;
        let mut all_entries = Vec::new();
        for view in filtered_inventory_versions(
            bucket_state,
            request.prefix.as_deref().unwrap_or_default(),
            request.include_delete_markers,
            request.include_noncurrent_versions,
        ) {
            all_entries.push(BucketInventoryEntry {
                key: view.key.to_string(),
                version_id: view.version.version_id.clone(),
                is_latest: view.is_latest,
                delete_marker: view.version.delete_marker,
                content_length: view.version.content_length(),
                last_modified_epoch_seconds: view.version.last_modified_epoch_seconds,
                owner: stored_owner(bucket_state, view.version),
                tag_count: view.version.tags.len(),
                legal_hold: view.version.lock.legal_hold,
                retention_mode: view
                    .version
                    .lock
                    .retention_mode
                    .map(|mode| format!("{mode:?}")),
                retain_until_epoch_seconds: view.version.lock.retain_until_epoch_seconds,
                encryption: view.version.metadata.encryption.clone(),
                replication_status: view.version.replication_status.clone(),
            });
        }
        all_entries.sort_by(|left, right| {
            left.key
                .cmp(&right.key)
                .then(left.version_id.cmp(&right.version_id))
        });
        let total_entries = all_entries.len();
        let entries = if let Some(limit) = request.max_entries {
            all_entries.into_iter().take(limit).collect::<Vec<_>>()
        } else {
            all_entries
        };
        let object_count = entries
            .iter()
            .filter(|entry| entry.is_latest && !entry.delete_marker)
            .count();
        let delete_marker_count = entries.iter().filter(|entry| entry.delete_marker).count();
        let report = BucketInventoryReport {
            bucket: request.bucket.clone(),
            generated_at_epoch_seconds: self.clock_epoch_seconds,
            object_count,
            version_count: entries.len(),
            delete_marker_count,
            returned_count: entries.len(),
            truncated: entries.len() < total_entries,
            entries,
        };
        self.audit.append(
            principal,
            "ops:GetBucketInventory",
            &request.bucket,
            AuditOutcome::Allowed,
            Some(format!(
                "objects={},versions={},delete_markers={},truncated={}",
                report.object_count,
                report.version_count,
                report.delete_marker_count,
                report.truncated
            )),
        );
        Ok(report)
    }

    pub fn tenant_inventory_query(
        &mut self,
        principal: &str,
        request: TenantInventoryRequest,
    ) -> Result<TenantInventoryReport, RuntimeError> {
        self.require_operator_action(
            principal,
            OperatorAction::ReadDiagnostics,
            &request.tenant_id,
            "ops:GetTenantInventory",
        )?;
        let mut entries = Vec::new();
        let mut bucket_count = 0usize;
        for (bucket_name, bucket_state) in &self.buckets {
            if bucket_state.tenant_id != request.tenant_id
                || !bucket_name.starts_with(request.bucket_prefix.as_deref().unwrap_or_default())
            {
                continue;
            }
            bucket_count += 1;
            for view in filtered_inventory_versions(
                bucket_state,
                request.key_prefix.as_deref().unwrap_or_default(),
                request.include_delete_markers,
                request.include_noncurrent_versions,
            ) {
                entries.push(TenantInventoryEntry {
                    bucket: bucket_name.clone(),
                    key: view.key.to_string(),
                    version_id: view.version.version_id.clone(),
                    is_latest: view.is_latest,
                    delete_marker: view.version.delete_marker,
                    content_length: view.version.content_length(),
                    last_modified_epoch_seconds: view.version.last_modified_epoch_seconds,
                    owner: stored_owner(bucket_state, view.version),
                    tag_count: view.version.tags.len(),
                    legal_hold: view.version.lock.legal_hold,
                    retention_mode: view
                        .version
                        .lock
                        .retention_mode
                        .map(|mode| format!("{mode:?}")),
                    retain_until_epoch_seconds: view.version.lock.retain_until_epoch_seconds,
                    encryption: view.version.metadata.encryption.clone(),
                    replication_status: view.version.replication_status.clone(),
                });
            }
        }
        entries.sort_by(|left, right| {
            left.bucket
                .cmp(&right.bucket)
                .then(left.key.cmp(&right.key))
                .then(left.version_id.cmp(&right.version_id))
        });
        let total_entries = entries.len();
        let entries = if let Some(limit) = request.max_entries {
            entries.into_iter().take(limit).collect::<Vec<_>>()
        } else {
            entries
        };
        let object_count = entries
            .iter()
            .filter(|entry| entry.is_latest && !entry.delete_marker)
            .count();
        let delete_marker_count = entries.iter().filter(|entry| entry.delete_marker).count();
        let report = TenantInventoryReport {
            tenant_id: request.tenant_id.clone(),
            generated_at_epoch_seconds: self.clock_epoch_seconds,
            bucket_count,
            object_count,
            version_count: entries.len(),
            delete_marker_count,
            returned_count: entries.len(),
            truncated: entries.len() < total_entries,
            entries,
        };
        self.audit.append(
            principal,
            "ops:GetTenantInventory",
            &request.tenant_id,
            AuditOutcome::Allowed,
            Some(format!(
                "buckets={},objects={},versions={},delete_markers={},truncated={}",
                report.bucket_count,
                report.object_count,
                report.version_count,
                report.delete_marker_count,
                report.truncated
            )),
        );
        Ok(report)
    }

    pub fn runtime_inventory_query(
        &mut self,
        principal: &str,
        request: RuntimeInventoryRequest,
    ) -> Result<RuntimeInventoryReport, RuntimeError> {
        self.require_operator_action(
            principal,
            OperatorAction::ReadDiagnostics,
            "*",
            "ops:GetRuntimeInventory",
        )?;
        let tenant_prefix = request.tenant_prefix.as_deref().unwrap_or_default();
        let bucket_prefix = request.bucket_prefix.as_deref().unwrap_or_default();
        let key_prefix = request.key_prefix.as_deref().unwrap_or_default();
        let mut entries = Vec::new();
        let mut bucket_count = 0usize;
        let mut tenant_ids = BTreeSet::new();
        for (bucket_name, bucket_state) in &self.buckets {
            if !bucket_state.tenant_id.starts_with(tenant_prefix)
                || !bucket_name.starts_with(bucket_prefix)
            {
                continue;
            }
            tenant_ids.insert(bucket_state.tenant_id.clone());
            bucket_count += 1;
            for view in filtered_inventory_versions(
                bucket_state,
                key_prefix,
                request.include_delete_markers,
                request.include_noncurrent_versions,
            ) {
                entries.push(RuntimeInventoryEntry {
                    tenant_id: bucket_state.tenant_id.clone(),
                    bucket: bucket_name.clone(),
                    key: view.key.to_string(),
                    version_id: view.version.version_id.clone(),
                    is_latest: view.is_latest,
                    delete_marker: view.version.delete_marker,
                    content_length: view.version.content_length(),
                    last_modified_epoch_seconds: view.version.last_modified_epoch_seconds,
                    owner: stored_owner(bucket_state, view.version),
                    tag_count: view.version.tags.len(),
                    legal_hold: view.version.lock.legal_hold,
                    retention_mode: view
                        .version
                        .lock
                        .retention_mode
                        .map(|mode| format!("{mode:?}")),
                    retain_until_epoch_seconds: view.version.lock.retain_until_epoch_seconds,
                    encryption: view.version.metadata.encryption.clone(),
                    replication_status: view.version.replication_status.clone(),
                });
            }
        }
        entries.sort_by(|left, right| {
            left.tenant_id
                .cmp(&right.tenant_id)
                .then(left.bucket.cmp(&right.bucket))
                .then(left.key.cmp(&right.key))
                .then(left.version_id.cmp(&right.version_id))
        });
        let total_entries = entries.len();
        let start_index = if let Some(token) = request.continuation_token.as_deref() {
            runtime_inventory_continuation_start(&entries, token)?
        } else {
            0
        };
        let available_entries = &entries[start_index..];
        let (entries, next_continuation_token) =
            runtime_inventory_page_window(available_entries, request.max_entries);
        let object_count = entries
            .iter()
            .filter(|entry| entry.is_latest && !entry.delete_marker)
            .count();
        let delete_marker_count = entries.iter().filter(|entry| entry.delete_marker).count();
        let report = RuntimeInventoryReport {
            generated_at_epoch_seconds: self.clock_epoch_seconds,
            tenant_count: tenant_ids.len(),
            bucket_count,
            object_count,
            version_count: entries.len(),
            delete_marker_count,
            returned_count: entries.len(),
            truncated: start_index + entries.len() < total_entries,
            next_continuation_token,
            entries,
        };
        self.audit.append(
            principal,
            "ops:GetRuntimeInventory",
            "*",
            AuditOutcome::Allowed,
            Some(format!(
                "tenants={},buckets={},objects={},versions={},delete_markers={},truncated={}",
                report.tenant_count,
                report.bucket_count,
                report.object_count,
                report.version_count,
                report.delete_marker_count,
                report.truncated
            )),
        );
        Ok(report)
    }

    /// Aggregates runtime-wide inventory totals without returning per-version rows.
    ///
    /// The caller must pass the `OperatorAction::ReadDiagnostics` check on the
    /// resource `"*"` (performed under the action name
    /// `ops:GetRuntimeInventorySummary`). A bucket is included when its tenant id
    /// starts with `request.tenant_prefix` and its name starts with
    /// `request.bucket_prefix`; versions inside it are filtered by `request.key_prefix`
    /// and the two `include_*` flags. A missing prefix matches everything.
    ///
    /// Each surviving version is fed to `InventorySummary::observe`, and the resulting
    /// totals are copied into the report together with the number of distinct tenant
    /// ids and matching buckets. On success an `Allowed` audit record carrying the
    /// same totals is appended.
    ///
    /// # Errors
    ///
    /// Returns the [`RuntimeError`] produced by the operator-permission check.
    pub fn runtime_inventory_summary(
        &mut self,
        principal: &str,
        request: RuntimeInventorySummaryRequest,
    ) -> Result<RuntimeInventorySummaryReport, RuntimeError> {
        self.require_operator_action(
            principal,
            OperatorAction::ReadDiagnostics,
            "*",
            "ops:GetRuntimeInventorySummary",
        )?;
        let tenant_prefix = request.tenant_prefix.as_deref().unwrap_or("");
        let bucket_prefix = request.bucket_prefix.as_deref().unwrap_or("");
        let key_prefix = request.key_prefix.as_deref().unwrap_or("");

        let mut totals = InventorySummary::default();
        let mut seen_tenants = BTreeSet::new();
        let mut matched_buckets = 0usize;
        for (bucket_name, bucket_state) in &self.buckets {
            let selected = bucket_state.tenant_id.starts_with(tenant_prefix)
                && bucket_name.starts_with(bucket_prefix);
            if !selected {
                continue;
            }
            seen_tenants.insert(bucket_state.tenant_id.clone());
            matched_buckets += 1;
            filtered_inventory_versions(
                bucket_state,
                key_prefix,
                request.include_delete_markers,
                request.include_noncurrent_versions,
            )
            .into_iter()
            .for_each(|view| totals.observe(view));
        }

        let report = RuntimeInventorySummaryReport {
            generated_at_epoch_seconds: self.clock_epoch_seconds,
            tenant_count: seen_tenants.len(),
            bucket_count: matched_buckets,
            current_object_count: totals.current_object_count,
            noncurrent_version_count: totals.noncurrent_version_count,
            delete_marker_count: totals.delete_marker_count,
            encrypted_version_count: totals.encrypted_version_count,
            legal_hold_count: totals.legal_hold_count,
            retained_version_count: totals.retained_version_count,
            replicated_version_count: totals.replicated_version_count,
            total_bytes: totals.total_bytes,
        };

        let audit_detail = format!(
            "tenants={},buckets={},current_objects={},noncurrent_versions={},delete_markers={},encrypted={},legal_holds={},retained={},replicated={},bytes={}",
            report.tenant_count,
            report.bucket_count,
            report.current_object_count,
            report.noncurrent_version_count,
            report.delete_marker_count,
            report.encrypted_version_count,
            report.legal_hold_count,
            report.retained_version_count,
            report.replicated_version_count,
            report.total_bytes
        );
        self.audit.append(
            principal,
            "ops:GetRuntimeInventorySummary",
            "*",
            AuditOutcome::Allowed,
            Some(audit_detail),
        );
        Ok(report)
    }
}