use super::*;
/// Serializable snapshot of the runtime's state.
///
/// The snapshot is checked by `RuntimeSnapshot::validate`, which verifies the
/// schema version, the `next_*` id counters and the storage commit log.
/// Fields marked `#[serde(default)]` may be absent from serialized input and
/// then take their `Default` value.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct RuntimeSnapshot {
/// Schema version of this snapshot; `validate` rejects values outside
/// `MIN_SUPPORTED_SCHEMA_VERSION..=SCHEMA_VERSION`.
pub schema_version: u32,
/// Bucket state keyed by bucket name (storage commits are resolved through
/// this map using `commit.bucket`).
pub buckets: BTreeMap<String, BucketState>,
/// Policy stored with the snapshot.
pub policy: Policy,
/// Audit events stored with the snapshot.
pub audit_events: Vec<AuditEvent>,
/// Replication records; their `version_id` is rewritten by
/// `normalize_object_version_ordinals` when an object version id changes.
pub replication_records: Vec<ReplicationRecord>,
/// Multipart upload state keyed by upload id; ids of the form `u<number>`
/// are checked against `next_upload` by `validate`.
pub multipart_uploads: BTreeMap<String, MultipartUploadState>,
/// Notification events; `event_id`s of the form `e<number>` are checked
/// against `next_event` by `validate`.
pub notification_events: Vec<NotificationEvent>,
/// Auth store; defaults when missing from the serialized snapshot.
#[serde(default)]
pub auth: AuthStore,
/// Tenant quota configurations; defaults to an empty map when missing.
/// NOTE(review): keys are presumably tenant identifiers — confirm.
#[serde(default)]
pub tenant_quotas: BTreeMap<String, TenantQuotaConfiguration>,
/// Storage commit log; `validate` requires sequences `1, 2, 3, ...` in
/// order and that every commit references an existing object version.
/// Defaults to empty when missing.
#[serde(default)]
pub storage_commits: Vec<StorageCommitRecord>,
/// Two-level string map of console preferences; defaults to empty when
/// missing. NOTE(review): meaning of the outer/inner keys is not visible
/// here — confirm against the console code.
#[serde(default)]
pub console_preferences: BTreeMap<String, BTreeMap<String, String>>,
/// Runtime clock value. NOTE(review): presumably seconds since the Unix
/// epoch, judging by the name — confirm.
pub clock_epoch_seconds: u64,
/// Version counter; must be greater than the highest numeric suffix of any
/// `v<number>` object version id in `buckets`.
pub next_version: u64,
/// Upload counter; must be greater than the highest numeric suffix of any
/// `u<number>` key in `multipart_uploads`.
pub next_upload: u64,
/// Event counter; must be greater than the highest numeric suffix of any
/// `e<number>` event id in `notification_events`.
pub next_event: u64,
/// Request counter; not checked by `validate`.
pub next_request: u64,
}
impl RuntimeSnapshot {
    /// Newest snapshot schema version accepted by [`validate`](Self::validate).
    pub const SCHEMA_VERSION: u32 = 16;
    /// Oldest snapshot schema version accepted by [`validate`](Self::validate).
    const MIN_SUPPORTED_SCHEMA_VERSION: u32 = 15;

    /// Rewrites every object version id to its object-local form
    /// (`object_version_id(local_ordinal)`) and propagates the change.
    ///
    /// Each object first fills in missing local ordinals. Every version whose
    /// id differs from the object-local id is renamed, and the rename is then
    /// applied to storage commits, replication records and notification
    /// events that referenced the same `(bucket, key, old version id)`.
    pub(crate) fn normalize_object_version_ordinals(&mut self) {
        /// Returns the replacement id recorded for `(bucket, key, version_id)`, if any.
        fn replacement_for<'a>(
            rewrites: &'a BTreeMap<(String, String, String), String>,
            bucket: &str,
            key: &str,
            version_id: &str,
        ) -> Option<&'a String> {
            rewrites.get(&(bucket.to_owned(), key.to_owned(), version_id.to_owned()))
        }

        // (bucket, key, old version id) -> new version id
        let mut rewrites: BTreeMap<(String, String, String), String> = BTreeMap::new();

        for (bucket_name, bucket) in self.buckets.iter_mut() {
            for (key, object) in bucket.objects.iter_mut() {
                object.normalize_missing_local_ordinals();
                for version in &mut object.versions {
                    let canonical_id = object_version_id(version.local_ordinal);
                    if version.version_id == canonical_id {
                        continue;
                    }
                    // Swap the new id in and keep the old one as the lookup key.
                    let stale_id =
                        std::mem::replace(&mut version.version_id, canonical_id.clone());
                    rewrites.insert(
                        (bucket_name.clone(), key.as_str().to_owned(), stale_id),
                        canonical_id,
                    );
                }
            }
        }

        for commit in &mut self.storage_commits {
            if let Some(new_id) =
                replacement_for(&rewrites, &commit.bucket, &commit.key, &commit.version_id)
            {
                commit.version_id.clone_from(new_id);
            }
        }

        for record in &mut self.replication_records {
            if let Some(new_id) =
                replacement_for(&rewrites, &record.bucket, &record.key, &record.version_id)
            {
                record.version_id.clone_from(new_id);
            }
        }

        for event in &mut self.notification_events {
            if let Some(new_id) =
                replacement_for(&rewrites, &event.bucket, &event.key, &event.version_id)
            {
                event.version_id.clone_from(new_id);
            }
        }
    }

    /// Checks the snapshot for internal consistency.
    ///
    /// Checks run in this order and the first failure is returned:
    /// schema version, version counter, upload counter, event counter,
    /// storage commit log.
    ///
    /// # Errors
    ///
    /// - `RuntimeError::UnsupportedSnapshotSchema` if `schema_version` is
    ///   outside the supported range.
    /// - `RuntimeError::InvalidSnapshotVersionCounter`,
    ///   `InvalidSnapshotUploadCounter` or `InvalidSnapshotEventCounter` if
    ///   the matching `next_*` counter is not greater than the highest
    ///   numeric id suffix already present (0 when none is present).
    /// - Whatever `validate_storage_commit_log` reports.
    pub(crate) fn validate(&self) -> Result<(), RuntimeError> {
        let supported_schemas = Self::MIN_SUPPORTED_SCHEMA_VERSION..=Self::SCHEMA_VERSION;
        if !supported_schemas.contains(&self.schema_version) {
            return Err(RuntimeError::UnsupportedSnapshotSchema(self.schema_version));
        }

        // Ids that do not parse as `<prefix><number>` are ignored; with no
        // parsable id the highest value is 0.
        let highest_version = self
            .buckets
            .values()
            .flat_map(|bucket| bucket.objects.values())
            .flat_map(|object| object.versions.iter())
            .filter_map(|version| version_numeric_suffix(&version.version_id))
            .fold(0, u64::max);
        if self.next_version <= highest_version {
            return Err(RuntimeError::InvalidSnapshotVersionCounter {
                next_version: self.next_version,
                highest_version,
            });
        }

        let highest_upload = self
            .multipart_uploads
            .keys()
            .filter_map(|id| upload_numeric_suffix(id))
            .fold(0, u64::max);
        if self.next_upload <= highest_upload {
            return Err(RuntimeError::InvalidSnapshotUploadCounter {
                next_upload: self.next_upload,
                highest_upload,
            });
        }

        let highest_event = self
            .notification_events
            .iter()
            .filter_map(|event| event_numeric_suffix(&event.event_id))
            .fold(0, u64::max);
        if self.next_event <= highest_event {
            return Err(RuntimeError::InvalidSnapshotEventCounter {
                next_event: self.next_event,
                highest_event,
            });
        }

        validate_storage_commit_log(self)
    }
}
/// Returns the number following the `v` prefix of a version id.
///
/// Yields `None` when the prefix is missing or the remainder does not parse
/// as a `u64` (parsing follows `u64::from_str`).
fn version_numeric_suffix(version_id: &str) -> Option<u64> {
    version_id
        .strip_prefix('v')
        .and_then(|digits| digits.parse::<u64>().ok())
}
/// Builds the object-local version id for `local_ordinal`: the letter `v`
/// followed by the ordinal in decimal (for example `v3`).
pub(crate) fn object_version_id(local_ordinal: u64) -> String {
    let mut version_id = String::from("v");
    version_id.push_str(&local_ordinal.to_string());
    version_id
}
/// Returns the number following the `u` prefix of an upload id.
///
/// Yields `None` when the prefix is missing or the remainder does not parse
/// as a `u64` (parsing follows `u64::from_str`).
fn upload_numeric_suffix(upload_id: &str) -> Option<u64> {
    upload_id
        .strip_prefix('u')
        .and_then(|digits| digits.parse::<u64>().ok())
}
/// Returns the number following the `e` prefix of an event id.
///
/// Yields `None` when the prefix is missing or the remainder does not parse
/// as a `u64` (parsing follows `u64::from_str`).
fn event_numeric_suffix(event_id: &str) -> Option<u64> {
    event_id
        .strip_prefix('e')
        .and_then(|digits| digits.parse::<u64>().ok())
}
/// Checks the storage commit log of `snapshot` for internal consistency.
///
/// Commits are examined in order and the first violation is reported:
///
/// 1. `sequence` must equal the commit's 1-based position in the log.
/// 2. `operation`, `bucket`, `key` and `version_id` must be non-empty after
///    trimming whitespace.
/// 3. The commit must reference a version that exists in `snapshot.buckets`.
/// 4. Unless that version is a delete marker, the commit checksum must equal
///    the version's integrity checksum.
///
/// # Errors
///
/// Returns `RuntimeError::InvalidStorageCommitLog` with a message describing
/// the first violation.
fn validate_storage_commit_log(snapshot: &RuntimeSnapshot) -> Result<(), RuntimeError> {
    for (commit, expected_sequence) in snapshot.storage_commits.iter().zip(1u64..) {
        if commit.sequence != expected_sequence {
            let message = format!(
                "sequence {} should be {}",
                commit.sequence, expected_sequence
            );
            return Err(RuntimeError::InvalidStorageCommitLog(message));
        }

        let has_blank_field = commit.operation.trim().is_empty()
            || commit.bucket.trim().is_empty()
            || commit.key.trim().is_empty()
            || commit.version_id.trim().is_empty();
        if has_blank_field {
            let message = format!("commit {} has empty required fields", commit.sequence);
            return Err(RuntimeError::InvalidStorageCommitLog(message));
        }

        // Resolve bucket -> object -> version; any missing link is an error.
        let referenced_version = snapshot
            .buckets
            .get(&commit.bucket)
            .and_then(|bucket| bucket.objects.get(&commit.key))
            .and_then(|object| {
                object
                    .versions
                    .iter()
                    .find(|candidate| candidate.version_id == commit.version_id)
            })
            .ok_or_else(|| {
                RuntimeError::InvalidStorageCommitLog(format!(
                    "commit {} references missing version {}/{}/{}",
                    commit.sequence, commit.bucket, commit.key, commit.version_id
                ))
            })?;

        // Delete markers are exempt from the checksum comparison.
        let checksum_required = !referenced_version.delete_marker;
        if checksum_required
            && commit.checksum_sha256 != referenced_version.integrity.checksum_sha256
        {
            let message = format!(
                "commit {} checksum does not match version integrity",
                commit.sequence
            );
            return Err(RuntimeError::InvalidStorageCommitLog(message));
        }
    }
    Ok(())
}