bucketwarden-server 0.1.0

BucketWarden storage server runtime.
Documentation
use super::*;

/// Serializable snapshot of the server runtime state.
///
/// Fields marked `#[serde(default)]` fall back to their `Default` value when
/// they are absent from the serialized input.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct RuntimeSnapshot {
    /// Schema version of this snapshot; `validate` rejects values outside
    /// `MIN_SUPPORTED_SCHEMA_VERSION..=SCHEMA_VERSION`.
    pub schema_version: u32,
    /// Bucket state keyed by bucket name (storage commits are resolved against
    /// this map through their `bucket` field).
    pub buckets: BTreeMap<String, BucketState>,
    /// Policy stored with the snapshot.
    // NOTE(review): scope of this policy is not visible in this file — confirm.
    pub policy: Policy,
    /// Audit events stored with the snapshot.
    pub audit_events: Vec<AuditEvent>,
    /// Replication records; their `version_id` is rewritten by
    /// `normalize_object_version_ordinals` when the referenced version is renamed.
    pub replication_records: Vec<ReplicationRecord>,
    /// Multipart uploads keyed by upload id; ids of the form `u<N>` are checked
    /// against `next_upload` by `validate`.
    pub multipart_uploads: BTreeMap<String, MultipartUploadState>,
    /// Notification events; their `event_id` (form `e<N>`) is checked against
    /// `next_event` by `validate`, and their `version_id` follows version renames.
    pub notification_events: Vec<NotificationEvent>,
    /// Authentication store; defaults when missing from the serialized input.
    #[serde(default)]
    pub auth: AuthStore,
    /// Tenant quota configurations; defaults to empty when missing.
    // NOTE(review): presumably keyed by tenant identifier — confirm.
    #[serde(default)]
    pub tenant_quotas: BTreeMap<String, TenantQuotaConfiguration>,
    /// Storage commit log; `validate` requires sequences `1, 2, 3, ...` in order
    /// and that every entry references an existing object version.
    #[serde(default)]
    pub storage_commits: Vec<StorageCommitRecord>,
    /// Nested string map of console preferences; defaults to empty when missing.
    // NOTE(review): meaning of the outer and inner keys is not visible here — confirm.
    #[serde(default)]
    pub console_preferences: BTreeMap<String, BTreeMap<String, String>>,
    /// Clock value in epoch seconds stored with the snapshot.
    pub clock_epoch_seconds: u64,
    /// Version counter; must be strictly greater than the highest numeric
    /// suffix of any stored `v<N>` version id.
    pub next_version: u64,
    /// Upload counter; must be strictly greater than the highest numeric
    /// suffix of any `u<N>` multipart upload id.
    pub next_upload: u64,
    /// Event counter; must be strictly greater than the highest numeric
    /// suffix of any `e<N>` notification event id.
    pub next_event: u64,
    /// Request counter.
    // NOTE(review): not validated in this file; presumably the next request id — confirm.
    pub next_request: u64,
}
impl RuntimeSnapshot {
    /// Newest snapshot schema version accepted by [`RuntimeSnapshot::validate`].
    pub const SCHEMA_VERSION: u32 = 16;
    /// Oldest snapshot schema version accepted by [`RuntimeSnapshot::validate`].
    const MIN_SUPPORTED_SCHEMA_VERSION: u32 = 15;

    /// Rewrites every object version id to the canonical `v{local_ordinal}` form.
    ///
    /// Each object first has `normalize_missing_local_ordinals()` applied
    /// (NOTE(review): presumably fills in ordinals for versions that lack one —
    /// confirm against its definition). Any version whose id differs from
    /// `object_version_id(local_ordinal)` is then renamed, and the rename is
    /// propagated to storage commits, replication records and notification
    /// events that reference the old id under the same bucket and key.
    pub(crate) fn normalize_object_version_ordinals(&mut self) {
        // (bucket, key, previous version id) -> canonical version id.
        let mut rewrites = BTreeMap::new();
        for (bucket_name, bucket) in self.buckets.iter_mut() {
            for (key, object) in bucket.objects.iter_mut() {
                object.normalize_missing_local_ordinals();
                for version in &mut object.versions {
                    let canonical_id = object_version_id(version.local_ordinal);
                    if version.version_id == canonical_id {
                        continue;
                    }
                    // Swap the canonical id in and keep the old one as the lookup key.
                    let previous_id =
                        std::mem::replace(&mut version.version_id, canonical_id.clone());
                    rewrites.insert(
                        (bucket_name.clone(), key.as_str().to_owned(), previous_id),
                        canonical_id,
                    );
                }
            }
        }

        // Nothing was renamed, so no reference can be stale; skip building a
        // cloned lookup key for every commit, record and event.
        if rewrites.is_empty() {
            return;
        }

        for commit in &mut self.storage_commits {
            let lookup = (
                commit.bucket.clone(),
                commit.key.clone(),
                commit.version_id.clone(),
            );
            if let Some(new_id) = rewrites.get(&lookup) {
                commit.version_id.clone_from(new_id);
            }
        }
        for record in &mut self.replication_records {
            let lookup = (
                record.bucket.clone(),
                record.key.clone(),
                record.version_id.clone(),
            );
            if let Some(new_id) = rewrites.get(&lookup) {
                record.version_id.clone_from(new_id);
            }
        }
        for event in &mut self.notification_events {
            let lookup = (
                event.bucket.clone(),
                event.key.clone(),
                event.version_id.clone(),
            );
            if let Some(new_id) = rewrites.get(&lookup) {
                event.version_id.clone_from(new_id);
            }
        }
    }

    /// Checks the snapshot's internal consistency.
    ///
    /// # Errors
    ///
    /// * `RuntimeError::UnsupportedSnapshotSchema` when `schema_version` lies
    ///   outside `MIN_SUPPORTED_SCHEMA_VERSION..=SCHEMA_VERSION`.
    /// * `RuntimeError::InvalidSnapshotVersionCounter`,
    ///   `RuntimeError::InvalidSnapshotUploadCounter` or
    ///   `RuntimeError::InvalidSnapshotEventCounter` when the corresponding
    ///   `next_*` counter is not strictly greater than the highest numeric
    ///   suffix already present (an empty collection counts as `0`).
    /// * Whatever `validate_storage_commit_log` reports for the commit log.
    pub(crate) fn validate(&self) -> Result<(), RuntimeError> {
        let supported_schemas = Self::MIN_SUPPORTED_SCHEMA_VERSION..=Self::SCHEMA_VERSION;
        if !supported_schemas.contains(&self.schema_version) {
            return Err(RuntimeError::UnsupportedSnapshotSchema(self.schema_version));
        }

        // Ids that do not parse as `v<N>` are ignored by the counter check.
        let highest_version = self
            .buckets
            .values()
            .flat_map(|bucket| bucket.objects.values())
            .flat_map(|object| object.versions.iter())
            .filter_map(|version| version_numeric_suffix(&version.version_id))
            .max()
            .unwrap_or_default();
        if self.next_version <= highest_version {
            return Err(RuntimeError::InvalidSnapshotVersionCounter {
                next_version: self.next_version,
                highest_version,
            });
        }

        let highest_upload = self
            .multipart_uploads
            .keys()
            .filter_map(|upload_id| upload_numeric_suffix(upload_id))
            .max()
            .unwrap_or_default();
        if self.next_upload <= highest_upload {
            return Err(RuntimeError::InvalidSnapshotUploadCounter {
                next_upload: self.next_upload,
                highest_upload,
            });
        }

        let highest_event = self
            .notification_events
            .iter()
            .filter_map(|event| event_numeric_suffix(&event.event_id))
            .max()
            .unwrap_or_default();
        if self.next_event <= highest_event {
            return Err(RuntimeError::InvalidSnapshotEventCounter {
                next_event: self.next_event,
                highest_event,
            });
        }

        validate_storage_commit_log(self)
    }
}

/// Extracts `N` from a version id of the form `v<N>`.
///
/// Returns `None` when the `v` prefix is missing or the remainder does not
/// parse as a `u64`.
fn version_numeric_suffix(version_id: &str) -> Option<u64> {
    let digits = version_id.strip_prefix('v')?;
    digits.parse::<u64>().ok()
}

/// Builds the canonical version id `v<local_ordinal>` for an object version.
pub(crate) fn object_version_id(local_ordinal: u64) -> String {
    let mut version_id = String::from('v');
    version_id.push_str(&local_ordinal.to_string());
    version_id
}

/// Extracts `N` from an upload id of the form `u<N>`.
///
/// Returns `None` when the `u` prefix is missing or the remainder does not
/// parse as a `u64`.
fn upload_numeric_suffix(upload_id: &str) -> Option<u64> {
    upload_id
        .strip_prefix('u')
        .and_then(|digits| digits.parse().ok())
}

/// Extracts `N` from an event id of the form `e<N>`.
///
/// Returns `None` when the `e` prefix is missing or the remainder does not
/// parse as a `u64`.
fn event_numeric_suffix(event_id: &str) -> Option<u64> {
    match event_id.strip_prefix('e') {
        Some(digits) => digits.parse().ok(),
        None => None,
    }
}

/// Verifies the snapshot's storage commit log entry by entry.
///
/// Every commit must:
/// * carry the sequence number matching its 1-based position in the log,
/// * have non-blank `operation`, `bucket`, `key` and `version_id` fields,
/// * reference an object version that exists in `snapshot.buckets`, and
/// * unless that version is a delete marker, carry the same SHA-256 checksum
///   as the version's integrity record.
///
/// # Errors
///
/// Returns `RuntimeError::InvalidStorageCommitLog` describing the first
/// commit that violates one of the rules above.
fn validate_storage_commit_log(snapshot: &RuntimeSnapshot) -> Result<(), RuntimeError> {
    for (expected_sequence, commit) in (1u64..).zip(snapshot.storage_commits.iter()) {
        if commit.sequence != expected_sequence {
            return Err(RuntimeError::InvalidStorageCommitLog(format!(
                "sequence {} should be {}",
                commit.sequence, expected_sequence
            )));
        }

        let required_fields = [
            commit.operation.trim(),
            commit.bucket.trim(),
            commit.key.trim(),
            commit.version_id.trim(),
        ];
        if required_fields.iter().any(|field| field.is_empty()) {
            return Err(RuntimeError::InvalidStorageCommitLog(format!(
                "commit {} has empty required fields",
                commit.sequence
            )));
        }

        // Resolve bucket -> object -> version; any missing link is an error.
        let referenced_version = snapshot
            .buckets
            .get(&commit.bucket)
            .and_then(|bucket| bucket.objects.get(&commit.key))
            .and_then(|object| {
                object
                    .versions
                    .iter()
                    .find(|candidate| candidate.version_id == commit.version_id)
            })
            .ok_or_else(|| {
                RuntimeError::InvalidStorageCommitLog(format!(
                    "commit {} references missing version {}/{}/{}",
                    commit.sequence, commit.bucket, commit.key, commit.version_id
                ))
            })?;

        // Delete markers are exempt from the checksum comparison.
        let checksum_ok = referenced_version.delete_marker
            || commit.checksum_sha256 == referenced_version.integrity.checksum_sha256;
        if !checksum_ok {
            return Err(RuntimeError::InvalidStorageCommitLog(format!(
                "commit {} checksum does not match version integrity",
                commit.sequence
            )));
        }
    }
    Ok(())
}