// bucketwarden-server 0.1.0
//
// BucketWarden storage server runtime.
// Documentation
use super::*;

/// Integrity metadata for one part of a multipart object body.
///
/// Part verification walks the records in the order they appear in
/// [`IntegrityRecord::part_checksums`], treating each part as the next `size`
/// bytes of the body.
#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
pub struct PartIntegrityRecord {
    /// Part number; it is embedded in the header name of the `BadDigest`
    /// error raised when this part's checksum does not match.
    pub part_number: u16,
    /// Expected digest of this part's bytes, in the string form returned by
    /// `sha256_hex`.
    pub checksum_sha256: String,
    /// Number of body bytes this part occupies.
    pub size: usize,
}

/// Integrity metadata for a whole object body.
#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
pub struct IntegrityRecord {
    /// Digest of the full body, in the string form returned by `sha256_hex`.
    pub checksum_sha256: String,
    /// Length of the full body in bytes.
    pub content_length: usize,
    /// Per-part records in body order. Empty when the record was built with
    /// [`IntegrityRecord::for_body`]; part verification is skipped for an
    /// empty list.
    pub part_checksums: Vec<PartIntegrityRecord>,
}

/// Outcome of verifying a single object version, as returned by
/// `BucketWarden::verify_object_integrity`.
#[derive(Clone, Debug, Eq, PartialEq, Serialize)]
pub struct IntegrityVerificationReport {
    /// Bucket name as supplied by the caller.
    pub bucket: String,
    /// Object key as supplied by the caller.
    pub key: String,
    /// Identifier of the version that was verified (the current version when
    /// the caller did not request a specific one).
    pub version_id: String,
    /// Checksum copied from the version's stored integrity record.
    pub checksum_sha256: String,
    /// Content length copied from the version's stored integrity record.
    pub content_length: usize,
    /// Always `true` in reports built by `verify_object_integrity` in this
    /// file; verification failures are returned as errors instead.
    pub passed: bool,
}

/// One object version that failed decryption or integrity verification during
/// a storage-wide scan.
#[derive(Clone, Debug, Eq, PartialEq, Serialize)]
pub struct StorageIntegrityFailure {
    /// Name of the bucket containing the failing version.
    pub bucket: String,
    /// Key of the object containing the failing version.
    pub key: String,
    /// Identifier of the failing version.
    pub version_id: String,
    /// The underlying error rendered with `to_string()`.
    pub reason: String,
}

/// Summary of a storage-wide integrity scan, as produced by
/// `BucketWarden::verify_storage_integrity`.
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize)]
pub struct StorageIntegrityScanReport {
    /// Number of versions examined; delete markers are skipped and not counted.
    pub checked_versions: usize,
    /// Total number of part records across the examined versions, counted
    /// whether or not verification of the version succeeded.
    pub checked_parts: usize,
    /// One entry per version that failed decryption or verification.
    pub failures: Vec<StorageIntegrityFailure>,
    /// Set by the scan to `true` when `failures` is empty. Note that the
    /// derived `Default` leaves this `false`.
    pub passed: bool,
}

impl IntegrityRecord {
    /// Builds the record for a body stored as a single piece.
    ///
    /// The resulting record carries the whole-body checksum and length and an
    /// empty list of part checksums.
    pub fn for_body(body: &[u8]) -> Self {
        Self::for_multipart_body(body, Vec::new())
    }

    /// Builds the record for a body assembled from multiple parts.
    ///
    /// The checksum and length are computed over the complete `body`;
    /// `part_checksums` is stored as given, without being validated here.
    pub fn for_multipart_body(body: &[u8], part_checksums: Vec<PartIntegrityRecord>) -> Self {
        let checksum_sha256 = sha256_hex(body);
        let content_length = body.len();
        Self {
            checksum_sha256,
            content_length,
            part_checksums,
        }
    }
}

impl BucketWarden {
    /// Checks `body` against a stored integrity record.
    ///
    /// The whole-body checksum and length are compared first; when both match,
    /// the per-part records (if any) are verified as well.
    ///
    /// # Errors
    ///
    /// Returns `RuntimeError::BadDigest` with header
    /// `x-bucketwarden-object-integrity` when the checksum or the length
    /// differs, or the error produced by part verification.
    pub(crate) fn verify_integrity_record(
        &self,
        integrity: &IntegrityRecord,
        body: &[u8],
    ) -> Result<(), RuntimeError> {
        let checksum = sha256_hex(body);
        if checksum != integrity.checksum_sha256 || body.len() != integrity.content_length {
            return Err(RuntimeError::BadDigest {
                header: "x-bucketwarden-object-integrity".to_string(),
            });
        }
        self.verify_part_integrity_records(integrity, body)
    }

    /// Verifies each part checksum against the matching slice of `body`.
    ///
    /// Parts are assumed to be laid out back to back in the order they are
    /// listed. A record without part checksums is accepted as-is.
    ///
    /// # Errors
    ///
    /// Returns `RuntimeError::BadDigest` with header
    /// `x-bucketwarden-part-integrity` when the declared part sizes do not add
    /// up to `body.len()` (including when adding them overflows `usize`), or
    /// with header `x-bucketwarden-part-integrity-<part_number>` for the first
    /// part whose checksum does not match.
    fn verify_part_integrity_records(
        &self,
        integrity: &IntegrityRecord,
        body: &[u8],
    ) -> Result<(), RuntimeError> {
        if integrity.part_checksums.is_empty() {
            return Ok(());
        }
        // Part sizes come from a deserialized record, so they may be corrupt.
        // A checked sum turns an overflowing total into a mismatch instead of
        // a debug-build panic or a wrapped release-build value that could
        // spuriously equal `body.len()`.
        let declared_total = integrity
            .part_checksums
            .iter()
            .try_fold(0usize, |total, part| total.checked_add(part.size));
        if declared_total != Some(body.len()) {
            return Err(RuntimeError::BadDigest {
                header: "x-bucketwarden-part-integrity".to_string(),
            });
        }
        let mut remaining = body;
        for part in &integrity.part_checksums {
            // In bounds: the checked total equals `body.len()`, so every part
            // fits inside what is left of the body.
            let (chunk, rest) = remaining.split_at(part.size);
            let actual = sha256_hex(chunk);
            if actual != part.checksum_sha256 {
                return Err(RuntimeError::BadDigest {
                    header: format!("x-bucketwarden-part-integrity-{}", part.part_number),
                });
            }
            remaining = rest;
        }
        Ok(())
    }

    /// Decrypts one object version and verifies it against its integrity record.
    ///
    /// When `version_id` is `None` the current version of `key` is checked.
    ///
    /// # Errors
    ///
    /// Propagates lookup errors from `version_by_id` / `current_version`,
    /// returns `RuntimeError::NoSuchKey` when the selected version is a delete
    /// marker, and propagates decryption and integrity verification errors.
    pub fn verify_object_integrity(
        &self,
        bucket: &str,
        key: &str,
        version_id: Option<&str>,
    ) -> Result<IntegrityVerificationReport, RuntimeError> {
        let version = match version_id {
            Some(version_id) => self.version_by_id(bucket, key, version_id)?,
            None => self.current_version(bucket, key)?,
        };
        if version.delete_marker {
            return Err(RuntimeError::NoSuchKey(object_resource(bucket, key)));
        }
        let body = self.kms.decrypt(&version.ciphertext)?;
        self.verify_integrity_record(&version.integrity, &body)?;
        Ok(IntegrityVerificationReport {
            bucket: bucket.to_string(),
            key: key.to_string(),
            version_id: version.version_id.clone(),
            checksum_sha256: version.integrity.checksum_sha256.clone(),
            content_length: version.integrity.content_length,
            passed: true,
        })
    }

    /// Verifies every stored object version and collects the failures.
    ///
    /// Delete markers are skipped. A version that fails to decrypt or verify
    /// is recorded in the report and the scan continues with the next one.
    /// `passed` is `true` only when no failure was recorded.
    pub fn verify_storage_integrity(&self) -> StorageIntegrityScanReport {
        let mut report = StorageIntegrityScanReport::default();
        for (bucket_name, bucket) in &self.buckets {
            for (key, object) in &bucket.objects {
                for version in &object.versions {
                    if version.delete_marker {
                        continue;
                    }
                    // Counters are bumped before verification, so they cover
                    // failing versions too.
                    report.checked_versions += 1;
                    report.checked_parts += version.integrity.part_checksums.len();
                    let result = self
                        .kms
                        .decrypt(&version.ciphertext)
                        .map_err(RuntimeError::from)
                        .and_then(|body| self.verify_integrity_record(&version.integrity, &body));
                    if let Err(error) = result {
                        report.failures.push(StorageIntegrityFailure {
                            bucket: bucket_name.clone(),
                            key: key.clone(),
                            version_id: version.version_id.clone(),
                            reason: error.to_string(),
                        });
                    }
                }
            }
        }
        report.passed = report.failures.is_empty();
        report
    }
}