bucketwarden-server 0.1.0

BucketWarden storage server runtime.
Documentation
use super::*;

impl BucketWarden {
    /// Returns one page of the current object versions stored in `request.bucket`.
    ///
    /// The call authorizes `principal` for `S3Action::ListBucket`, resolves the
    /// bucket, and then collects every key that
    ///
    /// * starts with `request.prefix` (an absent prefix matches every key),
    /// * sorts strictly after the resume position, taken from
    ///   `continuation_token`, else `start_after`, else `marker`, and
    /// * has a current version (`current_version()` returns `Some`).
    ///
    /// When a non-empty `delimiter` is supplied, keys whose remainder after the
    /// prefix contains the delimiter are rolled up into a single
    /// `CommonPrefix` entry. Objects and common prefixes are emitted together in
    /// sorted order and share the `max_keys` budget (default `1000`).
    ///
    /// If more entries remain after the page is full, the sort key of the last
    /// emitted entry is returned as both `next_continuation_token` and
    /// `next_marker`, and `is_truncated` is `true`. A `max_keys` of `0` yields an
    /// empty page that is not truncated.
    ///
    /// A successful listing is recorded through `audit_allowed` together with
    /// the number of emitted entries.
    ///
    /// # Errors
    ///
    /// * Any error produced by `authorize` or `require_bucket`.
    /// * `RuntimeError::InvalidListParameter` when `encoding_type` is set to
    ///   anything other than `"url"`.
    pub fn list_objects(
        &mut self,
        principal: &str,
        request: ListObjectsRequest,
    ) -> Result<ListObjectsResult, RuntimeError> {
        self.authorize(principal, S3Action::ListBucket, &request.bucket)?;
        let bucket = self.require_bucket(&request.bucket)?;
        let max_keys = request.max_keys.unwrap_or(1000);
        if request
            .encoding_type
            .as_deref()
            .is_some_and(|value| value != "url")
        {
            return Err(RuntimeError::InvalidListParameter {
                name: "encoding-type".to_string(),
                value: request.encoding_type.unwrap_or_default(),
            });
        }
        // Resume position: entries at or before this key were already returned.
        let after = request
            .continuation_token
            .as_ref()
            .or(request.start_after.as_ref())
            .or(request.marker.as_ref());
        // An empty delimiter matches at offset 0 of every key, which would fold
        // the whole listing into one common prefix equal to `prefix`. Treat it
        // the same as "no delimiter".
        let delimiter = match request.delimiter.as_deref() {
            None | Some("") => None,
            delimiter => delimiter,
        };
        let prefix = request.prefix.as_deref().unwrap_or_default();
        // Keyed by sort key so objects and common prefixes interleave in order
        // and each common prefix appears only once.
        let mut entries = BTreeMap::<String, ListingEntry>::new();
        for (key, object) in &bucket.objects {
            if !key.starts_with(prefix) {
                continue;
            }
            if after.is_some_and(|after| key.as_str() <= after.as_str()) {
                continue;
            }
            let Some(version) = object.current_version() else {
                continue;
            };
            if let Some(delimiter) = delimiter {
                // Safe to slice: `key` starts with `prefix`, so `prefix.len()`
                // is a character boundary of `key`.
                let remainder = &key[prefix.len()..];
                if let Some(index) = remainder.find(delimiter) {
                    let common_prefix =
                        format!("{}{}", prefix, &remainder[..index + delimiter.len()]);
                    // A common prefix that was the last entry of the previous
                    // page must not be emitted again on this page.
                    if after.is_none_or(|after| common_prefix.as_str() > after.as_str()) {
                        entries
                            .entry(common_prefix.clone())
                            .or_insert(ListingEntry::CommonPrefix(CommonPrefix {
                                prefix: common_prefix,
                            }));
                    }
                    continue;
                }
            }
            entries.insert(
                key.clone(),
                ListingEntry::Object(ListedObject {
                    key: key.clone(),
                    version_id: version.version_id.clone(),
                    etag: version.etag.clone(),
                    last_modified_epoch_seconds: version.last_modified_epoch_seconds,
                    content_length: version.content_length(),
                    metadata: version.metadata.clone(),
                    owner: Some(stored_owner(bucket, version)),
                }),
            );
        }
        let mut objects = Vec::new();
        let mut common_prefixes = Vec::new();
        let mut next_continuation_token = None;
        let mut emitted = 0usize;
        let mut iter = entries.into_iter().peekable();
        // The token always names the LAST EMITTED entry because resumption is
        // exclusive. With `max_keys == 0` nothing is emitted, so no valid token
        // exists and the page is reported as empty and not truncated.
        while emitted < max_keys {
            let Some((sort_key, entry)) = iter.next() else {
                break;
            };
            match entry {
                ListingEntry::Object(object) => objects.push(object),
                ListingEntry::CommonPrefix(prefix) => common_prefixes.push(prefix),
            }
            emitted += 1;
            if emitted == max_keys && iter.peek().is_some() {
                next_continuation_token = Some(sort_key);
            }
        }
        self.audit_allowed(
            principal,
            S3Action::ListBucket,
            &request.bucket,
            Some(emitted.to_string()),
        );
        Ok(ListObjectsResult {
            bucket: request.bucket,
            prefix: request.prefix,
            delimiter: request.delimiter,
            max_keys,
            is_truncated: next_continuation_token.is_some(),
            marker: request.marker,
            next_marker: next_continuation_token.clone(),
            next_continuation_token,
            key_count: emitted,
            common_prefixes,
            continuation_token: request.continuation_token,
            start_after: request.start_after,
            encoding_type: request.encoding_type,
            objects,
        })
    }

    /// Returns one page of object versions and delete markers stored in
    /// `request.bucket`.
    ///
    /// The call authorizes `principal` for `S3Action::ListBucketVersions`,
    /// resolves the bucket, and walks every key that starts with
    /// `request.prefix`. For each key its versions are visited from the last
    /// stored version to the first; the last stored version is flagged
    /// `is_latest`. Versions with `delete_marker` set are reported in
    /// `delete_markers`, all others in `versions`.
    ///
    /// Resumption is exclusive:
    ///
    /// * `key_marker` alone skips every version of keys up to and including the
    ///   marker key.
    /// * `key_marker` plus a non-empty `version_id_marker` skips keys before the
    ///   marker key and, within the marker key, every version up to and
    ///   including the one whose id equals `version_id_marker`.
    ///
    /// At most `max_keys` entries (default `1000`) are emitted. If more remain,
    /// `next_key_marker` / `next_version_id_marker` identify the last emitted
    /// entry and `is_truncated` is `true`. A `max_keys` of `0` yields an empty
    /// page that is not truncated.
    ///
    /// A successful listing is recorded through `audit_allowed` together with
    /// the number of emitted entries.
    ///
    /// # Errors
    ///
    /// * Any error produced by `authorize` or `require_bucket`.
    /// * `RuntimeError::InvalidListParameter` when `encoding_type` is set to
    ///   anything other than `"url"`, or when `version_id_marker` is supplied
    ///   without `key_marker`.
    pub fn list_object_versions(
        &mut self,
        principal: &str,
        request: ListObjectVersionsRequest,
    ) -> Result<ListObjectVersionsResult, RuntimeError> {
        self.authorize(principal, S3Action::ListBucketVersions, &request.bucket)?;
        let bucket = self.require_bucket(&request.bucket)?;
        let max_keys = request.max_keys.unwrap_or(1000);
        if request
            .encoding_type
            .as_deref()
            .is_some_and(|value| value != "url")
        {
            return Err(RuntimeError::InvalidListParameter {
                name: "encoding-type".to_string(),
                value: request.encoding_type.unwrap_or_default(),
            });
        }
        // A version marker is only meaningful relative to a key marker.
        if request.version_id_marker.is_some() && request.key_marker.is_none() {
            return Err(RuntimeError::InvalidListParameter {
                name: "version-id-marker".to_string(),
                value: request.version_id_marker.unwrap_or_default(),
            });
        }
        let prefix = request.prefix.as_deref().unwrap_or_default();
        let key_marker = request.key_marker.as_deref();
        let version_id_marker = request.version_id_marker.as_deref();
        let mut entries = Vec::new();
        for (key, object) in &bucket.objects {
            if !key.starts_with(prefix) {
                continue;
            }
            // Keys sorting before the marker are discarded by the marker filter
            // below; skipping them here avoids cloning their versions first.
            if key_marker.is_some_and(|marker| key.as_str() < marker) {
                continue;
            }
            let latest_index = object.versions.len().saturating_sub(1);
            // Reverse order: the last stored version is listed first.
            for (index, version) in object.versions.iter().enumerate().rev() {
                let is_latest = index == latest_index;
                if version.delete_marker {
                    entries.push(VersionListingEntry::DeleteMarker(ListedDeleteMarker {
                        key: key.clone(),
                        version_id: version.version_id.clone(),
                        last_modified_epoch_seconds: version.last_modified_epoch_seconds,
                        is_latest,
                    }));
                } else {
                    entries.push(VersionListingEntry::Version(ListedObjectVersion {
                        key: key.clone(),
                        version_id: version.version_id.clone(),
                        etag: version.etag.clone(),
                        last_modified_epoch_seconds: version.last_modified_epoch_seconds,
                        content_length: version.content_length(),
                        is_latest,
                        metadata: version.metadata.clone(),
                        owner: Some(stored_owner(bucket, version)),
                    }));
                }
            }
        }
        if let Some(marker) = key_marker {
            let marker_version = version_id_marker.unwrap_or_default();
            // Flips to `true` once the marker version has been passed; only
            // consulted for entries of the marker key itself.
            let mut resumed = marker_version.is_empty();
            entries.retain(|entry| match entry.key().cmp(marker) {
                std::cmp::Ordering::Less => false,
                std::cmp::Ordering::Greater => true,
                // No version marker: the whole marker key was already returned.
                std::cmp::Ordering::Equal if marker_version.is_empty() => false,
                std::cmp::Ordering::Equal if resumed => true,
                std::cmp::Ordering::Equal => {
                    // Still before (or exactly at) the marker version; the
                    // marker version itself is excluded as well.
                    if entry.version_id() == marker_version {
                        resumed = true;
                    }
                    false
                }
            });
        }
        let mut versions = Vec::new();
        let mut delete_markers = Vec::new();
        let mut next_key_marker = None;
        let mut next_version_id_marker = None;
        let mut emitted = 0usize;
        let mut iter = entries.into_iter().peekable();
        // The next markers always name the LAST EMITTED entry because resumption
        // is exclusive. With `max_keys == 0` nothing is emitted, so no valid
        // marker exists and the page is reported as empty and not truncated.
        while emitted < max_keys {
            let Some(entry) = iter.next() else {
                break;
            };
            emitted += 1;
            if emitted == max_keys && iter.peek().is_some() {
                next_key_marker = Some(entry.key().to_string());
                next_version_id_marker = Some(entry.version_id().to_string());
            }
            match entry {
                VersionListingEntry::Version(version) => versions.push(version),
                VersionListingEntry::DeleteMarker(marker) => delete_markers.push(marker),
            }
        }
        self.audit_allowed(
            principal,
            S3Action::ListBucketVersions,
            &request.bucket,
            Some(emitted.to_string()),
        );
        Ok(ListObjectVersionsResult {
            bucket: request.bucket,
            prefix: request.prefix,
            key_marker: request.key_marker,
            version_id_marker: request.version_id_marker,
            max_keys,
            is_truncated: next_key_marker.is_some(),
            next_key_marker,
            next_version_id_marker,
            key_count: emitted,
            encoding_type: request.encoding_type,
            versions,
            delete_markers,
        })
    }
}