Skip to main content

bucketwarden_server/
integrity_verification.rs

1use super::*;
2
/// Integrity metadata for one part of a multipart object body.
///
/// During part verification the body is cut into consecutive slices, one per
/// record in list order and each `size` bytes long; every slice's
/// `sha256_hex` digest must equal that record's `checksum_sha256`.
#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
pub struct PartIntegrityRecord {
    /// Part identifier. Within this file it is only used to name the failing
    /// part in the `BadDigest` header.
    pub part_number: u16,
    /// Expected `sha256_hex` digest of this part's bytes.
    pub checksum_sha256: String,
    /// Length of this part in bytes.
    pub size: usize,
}
9
/// Integrity metadata for a whole object body, optionally broken down by part.
///
/// A body is accepted only when both its `sha256_hex` digest and its length
/// match this record, and (when `part_checksums` is non-empty) every part
/// matches as well.
#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
pub struct IntegrityRecord {
    /// Expected `sha256_hex` digest of the complete body.
    pub checksum_sha256: String,
    /// Expected length of the complete body in bytes.
    pub content_length: usize,
    /// Per-part records in body order; empty means no per-part checks are run.
    pub part_checksums: Vec<PartIntegrityRecord>,
}
16
/// Result of verifying a single object version, as returned by
/// `BucketWarden::verify_object_integrity`.
#[derive(Clone, Debug, Eq, PartialEq, Serialize)]
pub struct IntegrityVerificationReport {
    /// Bucket name the caller asked about.
    pub bucket: String,
    /// Object key the caller asked about.
    pub key: String,
    /// Identifier of the version that was actually verified.
    pub version_id: String,
    /// Whole-body checksum copied from the version's stored integrity record.
    pub checksum_sha256: String,
    /// Body length in bytes copied from the version's stored integrity record.
    pub content_length: usize,
    /// Verification outcome. `verify_object_integrity` only ever builds a
    /// report with `true`; failures are returned as `Err` instead.
    pub passed: bool,
}
26
/// One object version that failed decryption or integrity verification during
/// a whole-storage scan.
#[derive(Clone, Debug, Eq, PartialEq, Serialize)]
pub struct StorageIntegrityFailure {
    /// Name of the bucket holding the failing version.
    pub bucket: String,
    /// Key of the object holding the failing version.
    pub key: String,
    /// Identifier of the failing version.
    pub version_id: String,
    /// Text of the error that caused the failure (its `to_string()` output).
    pub reason: String,
}
34
/// Summary of a whole-storage integrity scan, as produced by
/// `BucketWarden::verify_storage_integrity`.
///
/// Note that the derived `Default` leaves `passed` as `false`; the scan sets
/// it explicitly once all versions have been examined.
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize)]
pub struct StorageIntegrityScanReport {
    /// Number of versions examined (delete markers are not counted).
    pub checked_versions: usize,
    /// Total number of part records attached to the examined versions.
    pub checked_parts: usize,
    /// One entry per version that failed decryption or verification.
    pub failures: Vec<StorageIntegrityFailure>,
    /// `true` when `failures` is empty at the end of the scan.
    pub passed: bool,
}
42
43impl IntegrityRecord {
44    pub fn for_body(body: &[u8]) -> Self {
45        Self {
46            checksum_sha256: sha256_hex(body),
47            content_length: body.len(),
48            part_checksums: Vec::new(),
49        }
50    }
51
52    pub fn for_multipart_body(body: &[u8], part_checksums: Vec<PartIntegrityRecord>) -> Self {
53        Self {
54            checksum_sha256: sha256_hex(body),
55            content_length: body.len(),
56            part_checksums,
57        }
58    }
59}
60
61impl BucketWarden {
62    pub(crate) fn verify_integrity_record(
63        &self,
64        integrity: &IntegrityRecord,
65        body: &[u8],
66    ) -> Result<(), RuntimeError> {
67        let checksum = sha256_hex(body);
68        if checksum != integrity.checksum_sha256 || body.len() != integrity.content_length {
69            return Err(RuntimeError::BadDigest {
70                header: "x-bucketwarden-object-integrity".to_string(),
71            });
72        }
73        self.verify_part_integrity_records(integrity, body)?;
74        Ok(())
75    }
76
77    fn verify_part_integrity_records(
78        &self,
79        integrity: &IntegrityRecord,
80        body: &[u8],
81    ) -> Result<(), RuntimeError> {
82        if integrity.part_checksums.is_empty() {
83            return Ok(());
84        }
85        let expected_size = integrity
86            .part_checksums
87            .iter()
88            .map(|part| part.size)
89            .sum::<usize>();
90        if expected_size != body.len() {
91            return Err(RuntimeError::BadDigest {
92                header: "x-bucketwarden-part-integrity".to_string(),
93            });
94        }
95        let mut offset = 0;
96        for part in &integrity.part_checksums {
97            let end = offset + part.size;
98            let actual = sha256_hex(&body[offset..end]);
99            if actual != part.checksum_sha256 {
100                return Err(RuntimeError::BadDigest {
101                    header: format!("x-bucketwarden-part-integrity-{}", part.part_number),
102                });
103            }
104            offset = end;
105        }
106        Ok(())
107    }
108
109    pub fn verify_object_integrity(
110        &self,
111        bucket: &str,
112        key: &str,
113        version_id: Option<&str>,
114    ) -> Result<IntegrityVerificationReport, RuntimeError> {
115        let version = if let Some(version_id) = version_id {
116            self.version_by_id(bucket, key, version_id)?
117        } else {
118            self.current_version(bucket, key)?
119        };
120        if version.delete_marker {
121            return Err(RuntimeError::NoSuchKey(object_resource(bucket, key)));
122        }
123        let body = self.kms.decrypt(&version.ciphertext)?;
124        self.verify_integrity_record(&version.integrity, &body)?;
125        Ok(IntegrityVerificationReport {
126            bucket: bucket.to_string(),
127            key: key.to_string(),
128            version_id: version.version_id.clone(),
129            checksum_sha256: version.integrity.checksum_sha256.clone(),
130            content_length: version.integrity.content_length,
131            passed: true,
132        })
133    }
134
135    pub fn verify_storage_integrity(&self) -> StorageIntegrityScanReport {
136        let mut report = StorageIntegrityScanReport::default();
137        for (bucket_name, bucket) in &self.buckets {
138            for (key, object) in &bucket.objects {
139                for version in &object.versions {
140                    if version.delete_marker {
141                        continue;
142                    }
143                    report.checked_versions += 1;
144                    report.checked_parts += version.integrity.part_checksums.len();
145                    let result = self
146                        .kms
147                        .decrypt(&version.ciphertext)
148                        .map_err(RuntimeError::from)
149                        .and_then(|body| self.verify_integrity_record(&version.integrity, &body));
150                    if let Err(error) = result {
151                        report.failures.push(StorageIntegrityFailure {
152                            bucket: bucket_name.clone(),
153                            key: key.clone(),
154                            version_id: version.version_id.clone(),
155                            reason: error.to_string(),
156                        });
157                    }
158                }
159            }
160        }
161        report.passed = report.failures.is_empty();
162        report
163    }
164}