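//! bucketwarden-server 0.1.0
//!
//! BucketWarden storage server runtime.
//!
//! This module persists a `BucketWarden` runtime to a directory: a `manifest.json` plus one
//! payload file per stored object version under `objects/`.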
use super::*;
use std::collections::BTreeSet;
use std::fs::File;
use std::io::Write;
use std::path::{Path, PathBuf};
use std::time::{SystemTime, UNIX_EPOCH};

/// Directory (relative to the store root) that holds one payload file per object version.
const OBJECTS_DIR: &str = "objects";
/// File name of the store manifest, placed directly in the store root.
const MANIFEST_FILE: &str = "manifest.json";
/// Schema version stamped into every manifest; restore rejects any other value.
const FILESYSTEM_STORE_SCHEMA_VERSION: u32 = 1;

/// Contents of `manifest.json` at the root of a filesystem store.
///
/// Written last by `BucketWarden::save_filesystem_store` and read first by
/// `BucketWarden::restore_filesystem_store`.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct FilesystemStoreManifest {
    /// On-disk layout version; restore only accepts `FILESYSTEM_STORE_SCHEMA_VERSION`.
    pub schema_version: u32,
    /// Runtime snapshot whose non-delete-marker ciphertext bodies are emptied on save and
    /// refilled from the payload files on restore.
    pub snapshot: RuntimeSnapshot,
    /// One record per payload file written alongside the manifest.
    pub objects: Vec<FilesystemObjectRecord>,
}

/// Manifest entry describing the payload file of a single object version.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct FilesystemObjectRecord {
    /// Bucket name of the version in the snapshot.
    pub bucket: String,
    /// Object key of the version in the snapshot.
    pub key: String,
    /// Version id of the version in the snapshot.
    pub version_id: String,
    /// Payload file location relative to the store root, with `/` as the separator.
    pub relative_path: String,
    /// Hex SHA-256 digest (as produced by `sha256_hex`) of the payload file's bytes.
    pub ciphertext_sha256: String,
    /// Payload length in bytes; checked on restore before the digest is computed.
    pub ciphertext_len: usize,
}

impl BucketWarden {
    /// Persists the current runtime state as a filesystem store rooted at `root`.
    ///
    /// Every non-delete-marker object version's ciphertext is written to its own file under
    /// `objects/` (see `object_version_relative_path`). Afterwards `manifest.json` is written,
    /// containing the snapshot with those ciphertext bodies emptied plus one
    /// `FilesystemObjectRecord` (path, SHA-256, length) per payload file. All writes go through
    /// `write_atomic`, and the manifest is written only after every payload file has been
    /// written.
    ///
    /// # Errors
    ///
    /// Returns `RuntimeError::SnapshotIo` if a directory or file cannot be created or written,
    /// and `RuntimeError::SnapshotSerialize` if the manifest cannot be serialized.
    pub fn save_filesystem_store(&self, root: impl AsRef<Path>) -> Result<(), RuntimeError> {
        let root = root.as_ref();
        // `create_dir_all` also creates `root`; done up front so even an empty store gets
        // its `objects/` directory.
        std::fs::create_dir_all(root.join(OBJECTS_DIR)).map_err(snapshot_io)?;

        let mut snapshot = self.snapshot();
        let mut object_records = Vec::new();
        for (bucket_name, bucket) in snapshot.buckets.iter_mut() {
            for (key, object) in bucket.objects.iter_mut() {
                for version in &mut object.versions {
                    // Delete markers carry no payload, so they get neither a file nor a record.
                    if version.delete_marker {
                        continue;
                    }
                    // Move the payload out of the snapshot: the manifest must not embed it, and
                    // taking it (unlike `clear()`, which keeps the allocation) frees each buffer
                    // as soon as it is on disk instead of after the manifest is written.
                    let body = std::mem::take(&mut version.ciphertext.bytes);
                    let relative_path =
                        object_version_relative_path(bucket_name, key, &version.version_id);
                    write_atomic(&root.join(&relative_path), &body)?;
                    object_records.push(FilesystemObjectRecord {
                        bucket: bucket_name.clone(),
                        key: key.clone(),
                        version_id: version.version_id.clone(),
                        relative_path: path_to_manifest_string(&relative_path),
                        ciphertext_sha256: sha256_hex(&body),
                        ciphertext_len: body.len(),
                    });
                }
            }
        }

        let manifest = FilesystemStoreManifest {
            schema_version: FILESYSTEM_STORE_SCHEMA_VERSION,
            snapshot,
            objects: object_records,
        };
        let manifest_json =
            serde_json::to_vec_pretty(&manifest).map_err(RuntimeError::SnapshotSerialize)?;
        write_atomic(&root.join(MANIFEST_FILE), &manifest_json)
    }

    /// Rebuilds a runtime from a filesystem store in the layout produced by
    /// `save_filesystem_store`.
    ///
    /// Reads `manifest.json` under `root` and checks its schema version. For each object
    /// record it locates the matching snapshot version, loads the payload file, verifies the
    /// payload's length and SHA-256 against the record, and puts the bytes back into that
    /// version. Every non-delete-marker version in the snapshot must be covered by exactly one
    /// record. The completed snapshot is then passed to `Self::restore` with `config`.
    ///
    /// # Errors
    ///
    /// Returns `RuntimeError::SnapshotIo` in these cases:
    /// - an I/O failure or an unsupported schema version;
    /// - a record path that is not a safe store-relative path;
    /// - a length or checksum mismatch;
    /// - a record that matches no snapshot version, points at a delete marker, or is duplicated;
    /// - a snapshot version with no record.
    ///
    /// Returns `RuntimeError::SnapshotDeserialize` if the manifest cannot be parsed as a
    /// `FilesystemStoreManifest`. Errors from `Self::restore` are propagated unchanged.
    pub fn restore_filesystem_store(
        config: RuntimeConfig,
        root: impl AsRef<Path>,
    ) -> Result<Self, RuntimeError> {
        let root = root.as_ref();
        let manifest_json =
            std::fs::read_to_string(root.join(MANIFEST_FILE)).map_err(snapshot_io)?;
        let mut manifest: FilesystemStoreManifest =
            serde_json::from_str(&manifest_json).map_err(RuntimeError::SnapshotDeserialize)?;
        if manifest.schema_version != FILESYSTEM_STORE_SCHEMA_VERSION {
            return Err(RuntimeError::SnapshotIo(format!(
                "unsupported filesystem store schema version: {}",
                manifest.schema_version
            )));
        }

        let mut restored_versions = BTreeSet::new();
        for record in &manifest.objects {
            // Validate the record against the manifest first: these checks are cheap and
            // reject bad records before any payload is read from disk or hashed.
            let Some(version) = manifest
                .snapshot
                .buckets
                .get_mut(&record.bucket)
                .and_then(|bucket| bucket.objects.get_mut(&record.key))
                .and_then(|object| {
                    object
                        .versions
                        .iter_mut()
                        .find(|version| version.version_id == record.version_id)
                })
            else {
                return Err(RuntimeError::SnapshotIo(format!(
                    "filesystem object record has no manifest version for {}/{}/{}",
                    record.bucket, record.key, record.version_id
                )));
            };
            if version.delete_marker {
                return Err(RuntimeError::SnapshotIo(format!(
                    "filesystem object record points at delete marker {}/{}/{}",
                    record.bucket, record.key, record.version_id
                )));
            }
            if !restored_versions.insert((
                record.bucket.clone(),
                record.key.clone(),
                record.version_id.clone(),
            )) {
                return Err(RuntimeError::SnapshotIo(format!(
                    "filesystem object record is duplicated for {}/{}/{}",
                    record.bucket, record.key, record.version_id
                )));
            }

            let body_path = checked_store_path(root, &record.relative_path)?;
            let body = std::fs::read(&body_path).map_err(snapshot_io)?;
            // Length first: it is free to check and avoids hashing an obviously wrong file.
            if body.len() != record.ciphertext_len {
                return Err(RuntimeError::SnapshotIo(format!(
                    "filesystem object length mismatch for {}/{}/{}",
                    record.bucket, record.key, record.version_id
                )));
            }
            if sha256_hex(&body) != record.ciphertext_sha256 {
                return Err(RuntimeError::SnapshotIo(format!(
                    "filesystem object checksum mismatch for {}/{}/{}",
                    record.bucket, record.key, record.version_id
                )));
            }
            version.ciphertext.bytes = body;
        }
        require_complete_object_payload_manifest(&manifest.snapshot, &restored_versions)?;

        Self::restore(config, manifest.snapshot)
    }
}

/// Builds the store-relative path of the payload file for one object version:
/// `objects/<hex(bucket)>/<hex(key), split into chunks>/<hex(version_id)>.bin`.
///
/// Hex encoding keeps arbitrary bucket names, keys and version ids free of path separators
/// and `..`. Because it doubles the length, the encoded key is split into directory names of
/// at most 254 characters. A single file name longer than 255 bytes is rejected by common
/// file systems (e.g. ext4, NTFS), so without splitting, no key longer than 127 bytes could
/// be saved. Keys of up to 127 bytes (including the empty key) still map to at most one
/// directory, so their paths are unchanged.
///
/// The mapping stays injective for three reasons:
/// - the bucket and version id are each exactly one component;
/// - key chunks never contain `.`, so they cannot collide with a `<hex>.bin` file name;
/// - concatenating the middle components recovers `hex(key)`.
///
/// The bucket and version id are not split, so their encodings must each fit in one name.
fn object_version_relative_path(bucket: &str, key: &str, version_id: &str) -> PathBuf {
    const MAX_KEY_COMPONENT_LEN: usize = 254;

    let mut relative = PathBuf::from(OBJECTS_DIR);
    relative.push(hex_component(bucket));

    let encoded_key = hex_component(key);
    let mut remaining = encoded_key.as_str();
    while !remaining.is_empty() {
        // Hex output is pure ASCII, so any byte offset is a valid `split_at` boundary.
        let (chunk, rest) = remaining.split_at(remaining.len().min(MAX_KEY_COMPONENT_LEN));
        relative.push(chunk);
        remaining = rest;
    }

    relative.push(format!("{}.bin", hex_component(version_id)));
    relative
}

/// Resolves a manifest-supplied relative path against the store `root`, refusing any path
/// that could resolve outside of it.
///
/// Only plain (`Normal`) and `.` (`CurDir`) components are accepted. The following are
/// rejected:
/// - `..` components;
/// - root components (`/`, or `\` on Windows);
/// - Windows prefixes such as `C:` or `\\server\share`.
///
/// `Path::join` discards `root` when the joined path is absolute. On Windows, a
/// root-relative path such as `\objects\x.bin` is *not* `is_absolute()`, yet joining it still
/// replaces everything in `root` except the drive prefix, so `RootDir` must be rejected
/// explicitly. Paths with no `Normal` component (e.g. `""` or `.`) are also rejected,
/// because they would name `root` itself rather than a file inside it.
///
/// # Errors
///
/// Returns `RuntimeError::SnapshotIo` naming the offending path.
fn checked_store_path(root: &Path, relative_path: &str) -> Result<PathBuf, RuntimeError> {
    use std::path::Component;

    let relative = Path::new(relative_path);
    let stays_inside_root = relative
        .components()
        .all(|component| matches!(component, Component::Normal(_) | Component::CurDir));
    let names_an_entry = relative
        .components()
        .any(|component| matches!(component, Component::Normal(_)));
    if !stays_inside_root || !names_an_entry {
        return Err(RuntimeError::SnapshotIo(format!(
            "invalid filesystem store relative path: {relative_path}"
        )));
    }
    Ok(root.join(relative))
}

/// Lowercase-hex encodes the UTF-8 bytes of `value`, two characters per byte.
///
/// The output consists only of `[0-9a-f]`, so it never contains a path separator or a dot.
/// An empty input yields an empty string.
fn hex_component(value: &str) -> String {
    const HEX_DIGITS: &[u8; 16] = b"0123456789abcdef";

    let mut encoded = String::with_capacity(value.len() * 2);
    for &byte in value.as_bytes() {
        // Both indices are nibbles (0..=15), so the table lookups cannot go out of bounds.
        // This avoids the per-byte `String` allocation that `format!` would incur.
        encoded.push(char::from(HEX_DIGITS[usize::from(byte >> 4)]));
        encoded.push(char::from(HEX_DIGITS[usize::from(byte & 0x0f)]));
    }
    encoded
}

/// Renders `path` for storage in the manifest: its components joined with `/`, whatever
/// the host separator is. Non-UTF-8 components are converted lossily.
fn path_to_manifest_string(path: &Path) -> String {
    let mut rendered = String::new();
    for (position, part) in path.components().enumerate() {
        if position > 0 {
            rendered.push('/');
        }
        rendered.push_str(&part.as_os_str().to_string_lossy());
    }
    rendered
}

/// Ensures every non-delete-marker version in `snapshot` had its payload restored.
///
/// `restored_versions` holds the `(bucket, key, version_id)` triples filled from payload
/// files. Versions are visited in the snapshot's iteration order, and the first one without
/// a restored payload is reported.
///
/// # Errors
///
/// Returns `RuntimeError::SnapshotIo` naming the first uncovered version.
fn require_complete_object_payload_manifest(
    snapshot: &RuntimeSnapshot,
    restored_versions: &BTreeSet<(String, String, String)>,
) -> Result<(), RuntimeError> {
    let first_missing = snapshot
        .buckets
        .iter()
        .flat_map(|(bucket_name, bucket)| {
            bucket.objects.iter().flat_map(move |(key, object)| {
                object
                    .versions
                    .iter()
                    .filter(|version| !version.delete_marker)
                    .map(move |version| (bucket_name, key, version))
            })
        })
        .find(|&(bucket_name, key, version)| {
            let identity = (bucket_name.clone(), key.clone(), version.version_id.clone());
            !restored_versions.contains(&identity)
        });

    match first_missing {
        None => Ok(()),
        Some((bucket_name, key, version)) => Err(RuntimeError::SnapshotIo(format!(
            "filesystem object payload missing from manifest for {}/{}/{}",
            bucket_name, key, version.version_id
        ))),
    }
}

/// Replaces the file at `path` with `bytes` via write-to-temp-then-rename.
///
/// Missing parent directories are created first. The data is then written to a sibling temp
/// file (named by `temporary_store_path`), flushed with `sync_all`, and `rename`d over
/// `path`. After that, the file and its parent directory are synced on a best-effort basis.
///
/// `std::fs::rename` replaces an existing destination file on both Unix and Windows, so the
/// destination is deliberately *not* deleted beforehand. Deleting it would open a window in
/// which `path` does not exist, and would lose the old contents if the rename then failed.
///
/// # Errors
///
/// Returns `RuntimeError::SnapshotIo` on any I/O failure. Once the temp file has been
/// created, it is removed again if writing, syncing or renaming it fails.
fn write_atomic(path: &Path, bytes: &[u8]) -> Result<(), RuntimeError> {
    if let Some(parent) = path.parent() {
        std::fs::create_dir_all(parent).map_err(snapshot_io)?;
    }
    let tmp_path = temporary_store_path(path);
    // `create_new` guarantees we never clobber (or later delete) someone else's temp file.
    let mut tmp_file = File::options()
        .write(true)
        .create_new(true)
        .open(&tmp_path)
        .map_err(snapshot_io)?;
    let staged = tmp_file.write_all(bytes).and_then(|()| tmp_file.sync_all());
    // Close the handle before the rename so no open handle is held on the file being moved.
    drop(tmp_file);

    if let Err(error) = staged.and_then(|()| std::fs::rename(&tmp_path, path)) {
        // Don't leave a partially written or orphaned temp file behind.
        let _ = std::fs::remove_file(&tmp_path);
        return Err(snapshot_io(error));
    }

    sync_file_best_effort(path);
    if let Some(parent) = path.parent() {
        sync_directory_best_effort(parent);
    }
    Ok(())
}

/// Builds a unique sibling path used to stage an atomic write to `path`.
///
/// The extension of `path` is replaced with `tmp-<pid>-<nanos>-<seq>`, where:
/// - `<nanos>` is the wall-clock time since the Unix epoch (0 if the clock reads earlier);
/// - `<seq>` is a per-process counter.
///
/// The counter keeps names unique within this process even when two calls observe the same
/// clock reading (coarse clock resolution, or a clock stepped backwards). Otherwise the
/// caller's `create_new` open would fail with "already exists".
fn temporary_store_path(path: &Path) -> PathBuf {
    use std::sync::atomic::{AtomicU64, Ordering};

    static SEQUENCE: AtomicU64 = AtomicU64::new(0);
    // Relaxed suffices: `fetch_add` is atomic, so every caller gets a distinct value, and
    // no other memory is synchronized through this counter.
    let sequence = SEQUENCE.fetch_add(1, Ordering::Relaxed);
    let nonce = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|duration| duration.as_nanos())
        .unwrap_or_default();
    path.with_extension(format!("tmp-{}-{nonce}-{sequence}", std::process::id()))
}

/// Best-effort `sync_all` on the directory at `path`, used after a rename inside it.
///
/// Any failure is deliberately ignored, including failing to open the directory as a file
/// at all (which some platforms do not allow).
fn sync_directory_best_effort(path: &Path) {
    let _ = File::open(path).and_then(|handle| handle.sync_all());
}

/// Best-effort `sync_all` on the file at `path`, reopened by path.
///
/// Open and sync errors are deliberately ignored.
fn sync_file_best_effort(path: &Path) {
    let _ = File::open(path).and_then(|handle| handle.sync_all());
}

/// Converts an I/O error into `RuntimeError::SnapshotIo`, keeping only its display text.
fn snapshot_io(error: std::io::Error) -> RuntimeError {
    let message = error.to_string();
    RuntimeError::SnapshotIo(message)
}