use super::*;
use std::collections::BTreeSet;
use std::fs::File;
use std::io::Write;
use std::path::{Path, PathBuf};
use std::time::{SystemTime, UNIX_EPOCH};
/// Format version written into every manifest; `restore_filesystem_store`
/// rejects manifests carrying any other value.
const FILESYSTEM_STORE_SCHEMA_VERSION: u32 = 1;
/// File name of the JSON manifest, relative to the store root.
const MANIFEST_FILE: &str = "manifest.json";
/// Directory, relative to the store root, under which payload files are written.
const OBJECTS_DIR: &str = "objects";
/// Manifest of a filesystem store, written as pretty-printed JSON to
/// `MANIFEST_FILE` under the store root by `save_filesystem_store`.
///
/// The payload bytes of every non-delete-marker version are emptied in
/// `snapshot`; those payloads live in separate files described by `objects`.
// NOTE: field order determines the JSON key order of the manifest.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct FilesystemStoreManifest {
    /// Format version; must equal `FILESYSTEM_STORE_SCHEMA_VERSION` to restore.
    pub schema_version: u32,
    /// Runtime snapshot with non-delete-marker payload bytes emptied.
    pub snapshot: RuntimeSnapshot,
    /// One record per persisted (non-delete-marker) object version.
    pub objects: Vec<FilesystemObjectRecord>,
}
/// Manifest entry describing one object version whose payload is stored in
/// its own file.
// NOTE: field order determines the JSON key order of each record.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct FilesystemObjectRecord {
    /// Bucket name; used to locate the version in the snapshot on restore.
    pub bucket: String,
    /// Object key within `bucket`.
    pub key: String,
    /// Identifier of the version within the object's version list.
    pub version_id: String,
    /// Payload file path relative to the store root, with `/` separators.
    pub relative_path: String,
    /// Digest of the payload bytes as produced by `sha256_hex`; checked on restore.
    pub ciphertext_sha256: String,
    /// Payload length in bytes; checked on restore.
    pub ciphertext_len: usize,
}
impl BucketWarden {
    /// Persists the current runtime state under `root` as a filesystem store.
    ///
    /// Every non-delete-marker object version is written to its own file at
    /// `objects/<hex bucket>/<hex key>/<hex version id>.bin` below `root`.
    /// A manifest (`manifest.json`) is then written. It holds the snapshot with
    /// those payloads emptied, plus each payload file's relative path, length
    /// and SHA-256 digest. The manifest is written last, after every payload file.
    ///
    /// # Errors
    ///
    /// Returns `RuntimeError::SnapshotIo` if a directory or file cannot be
    /// created or written. Returns `RuntimeError::SnapshotSerialize` if the
    /// manifest cannot be encoded as JSON.
    pub fn save_filesystem_store(&self, root: impl AsRef<Path>) -> Result<(), RuntimeError> {
        let root = root.as_ref();
        // Creates `root` as well as `root/objects`.
        std::fs::create_dir_all(root.join(OBJECTS_DIR)).map_err(snapshot_io)?;
        let mut snapshot = self.snapshot();
        let mut object_records = Vec::new();
        for (bucket_name, bucket) in snapshot.buckets.iter_mut() {
            for (key, object) in bucket.objects.iter_mut() {
                for version in &mut object.versions {
                    // Delete markers get no payload file; they remain only in the snapshot.
                    if version.delete_marker {
                        continue;
                    }
                    // Move the payload out of the snapshot copy. The manifest must not
                    // embed payload bytes. Taking the buffer (rather than `clear()`ing
                    // it, which keeps its capacity) frees it now. Otherwise every
                    // payload's allocation would be held until the manifest is serialized.
                    let ciphertext = std::mem::take(&mut version.ciphertext.bytes);
                    let relative_path =
                        object_version_relative_path(bucket_name, key, &version.version_id);
                    write_atomic(&root.join(&relative_path), &ciphertext)?;
                    object_records.push(FilesystemObjectRecord {
                        bucket: bucket_name.clone(),
                        key: key.clone(),
                        version_id: version.version_id.clone(),
                        relative_path: path_to_manifest_string(&relative_path),
                        ciphertext_sha256: sha256_hex(&ciphertext),
                        ciphertext_len: ciphertext.len(),
                    });
                }
            }
        }
        let manifest = FilesystemStoreManifest {
            schema_version: FILESYSTEM_STORE_SCHEMA_VERSION,
            snapshot,
            objects: object_records,
        };
        let manifest_json =
            serde_json::to_vec_pretty(&manifest).map_err(RuntimeError::SnapshotSerialize)?;
        write_atomic(&root.join(MANIFEST_FILE), &manifest_json)
    }

    /// Rebuilds a `BucketWarden` from a store written by
    /// [`Self::save_filesystem_store`].
    ///
    /// The steps are:
    /// 1. Read `manifest.json` under `root` and check its schema version.
    /// 2. Load each payload file listed in the manifest, verifying its length
    ///    and SHA-256 digest.
    /// 3. Reattach each payload to the matching snapshot version.
    /// 4. Pass the completed snapshot and `config` to `Self::restore`.
    ///
    /// # Errors
    ///
    /// Returns `RuntimeError::SnapshotIo` if any of the following holds:
    /// - the manifest or a payload file cannot be read;
    /// - the schema version is unsupported;
    /// - a payload path fails `checked_store_path`;
    /// - a payload's length or digest does not match its record;
    /// - a record names no snapshot version, names a delete marker, or is duplicated;
    /// - a non-delete-marker version has no record.
    ///
    /// Returns `RuntimeError::SnapshotDeserialize` if the manifest JSON is
    /// invalid. Errors from `Self::restore` are propagated unchanged.
    pub fn restore_filesystem_store(
        config: RuntimeConfig,
        root: impl AsRef<Path>,
    ) -> Result<Self, RuntimeError> {
        let root = root.as_ref();
        let manifest_json =
            std::fs::read_to_string(root.join(MANIFEST_FILE)).map_err(snapshot_io)?;
        let mut manifest: FilesystemStoreManifest =
            serde_json::from_str(&manifest_json).map_err(RuntimeError::SnapshotDeserialize)?;
        if manifest.schema_version != FILESYSTEM_STORE_SCHEMA_VERSION {
            return Err(RuntimeError::SnapshotIo(format!(
                "unsupported filesystem store schema version: {}",
                manifest.schema_version
            )));
        }
        // (bucket, key, version_id) of every payload reattached so far; used to
        // reject duplicate records and, at the end, to detect missing ones.
        let mut restored_versions = BTreeSet::new();
        for record in &manifest.objects {
            // The manifest comes from disk: never follow a path outside `root`.
            let body_path = checked_store_path(root, &record.relative_path)?;
            let body = std::fs::read(&body_path).map_err(snapshot_io)?;
            if body.len() != record.ciphertext_len {
                return Err(RuntimeError::SnapshotIo(format!(
                    "filesystem object length mismatch for {}/{}/{}",
                    record.bucket, record.key, record.version_id
                )));
            }
            let actual_sha256 = sha256_hex(&body);
            if actual_sha256 != record.ciphertext_sha256 {
                return Err(RuntimeError::SnapshotIo(format!(
                    "filesystem object checksum mismatch for {}/{}/{}",
                    record.bucket, record.key, record.version_id
                )));
            }
            let Some(version) = manifest
                .snapshot
                .buckets
                .get_mut(&record.bucket)
                .and_then(|bucket| bucket.objects.get_mut(&record.key))
                .and_then(|object| {
                    object
                        .versions
                        .iter_mut()
                        .find(|version| version.version_id == record.version_id)
                })
            else {
                return Err(RuntimeError::SnapshotIo(format!(
                    "filesystem object record has no manifest version for {}/{}/{}",
                    record.bucket, record.key, record.version_id
                )));
            };
            if version.delete_marker {
                return Err(RuntimeError::SnapshotIo(format!(
                    "filesystem object record points at delete marker {}/{}/{}",
                    record.bucket, record.key, record.version_id
                )));
            }
            if !restored_versions.insert((
                record.bucket.clone(),
                record.key.clone(),
                record.version_id.clone(),
            )) {
                return Err(RuntimeError::SnapshotIo(format!(
                    "filesystem object record is duplicated for {}/{}/{}",
                    record.bucket, record.key, record.version_id
                )));
            }
            version.ciphertext.bytes = body;
        }
        require_complete_object_payload_manifest(&manifest.snapshot, &restored_versions)?;
        Self::restore(config, manifest.snapshot)
    }
}
/// Builds the store-relative payload path for one object version:
/// `objects/<hex bucket>/<hex key>/<hex version id>.bin`.
///
/// Every caller-supplied component is hex-encoded, so the resulting path
/// contains only the fixed directory name, hex digits and the `.bin` suffix.
fn object_version_relative_path(bucket: &str, key: &str, version_id: &str) -> PathBuf {
    let file_name = format!("{}.bin", hex_component(version_id));
    let segments = [
        OBJECTS_DIR.to_owned(),
        hex_component(bucket),
        hex_component(key),
        file_name,
    ];
    segments.iter().collect()
}
/// Resolves a manifest-supplied relative path against `root`, refusing any
/// path that could point outside `root`.
///
/// The manifest is read from disk and is treated as untrusted input. The
/// following are rejected:
/// - absolute paths;
/// - paths carrying a root (e.g. `\x` on Windows, which is not `is_absolute()`
///   there but still makes `root.join` discard `root`);
/// - drive/UNC prefixes;
/// - `..` components;
/// - the empty path, which would resolve to `root` itself.
///
/// # Errors
///
/// Returns `RuntimeError::SnapshotIo` naming the offending path.
fn checked_store_path(root: &Path, relative_path: &str) -> Result<PathBuf, RuntimeError> {
    let relative = Path::new(relative_path);
    let escapes_root = relative.is_absolute()
        || relative.has_root()
        || relative.components().any(|component| {
            matches!(
                component,
                std::path::Component::ParentDir | std::path::Component::Prefix(_)
            )
        });
    if relative_path.is_empty() || escapes_root {
        return Err(RuntimeError::SnapshotIo(format!(
            "invalid filesystem store relative path: {relative_path}"
        )));
    }
    Ok(root.join(relative))
}
/// Encodes the UTF-8 bytes of `value` as lowercase hexadecimal.
///
/// The output contains only `[0-9a-f]`, so arbitrary names become safe path
/// components: no separators, no `..`. An empty input yields an empty string.
fn hex_component(value: &str) -> String {
    const HEX_DIGITS: &[u8; 16] = b"0123456789abcdef";
    let mut encoded = String::with_capacity(value.len() * 2);
    // Push two digits per byte directly; the previous `format!` per byte
    // allocated a temporary String for every input byte.
    for &byte in value.as_bytes() {
        encoded.push(char::from(HEX_DIGITS[usize::from(byte >> 4)]));
        encoded.push(char::from(HEX_DIGITS[usize::from(byte & 0x0f)]));
    }
    encoded
}
/// Renders `path` with `/` between components, independent of the host's
/// native separator, for storage in the manifest.
///
/// Non-UTF-8 components are converted lossily.
fn path_to_manifest_string(path: &Path) -> String {
    let mut rendered = String::new();
    for (position, component) in path.components().enumerate() {
        if position != 0 {
            rendered.push('/');
        }
        rendered.push_str(&component.as_os_str().to_string_lossy());
    }
    rendered
}
/// Checks that every non-delete-marker version in `snapshot` had its payload
/// restored, i.e. appears in `restored_versions` as `(bucket, key, version_id)`.
///
/// # Errors
///
/// Returns `RuntimeError::SnapshotIo` for the first version (in snapshot
/// iteration order) whose payload is missing.
fn require_complete_object_payload_manifest(
    snapshot: &RuntimeSnapshot,
    restored_versions: &BTreeSet<(String, String, String)>,
) -> Result<(), RuntimeError> {
    snapshot.buckets.iter().try_for_each(|(bucket_name, bucket)| {
        bucket.objects.iter().try_for_each(|(key, object)| {
            object
                .versions
                .iter()
                .filter(|version| !version.delete_marker)
                .try_for_each(|version| {
                    let identity =
                        (bucket_name.clone(), key.clone(), version.version_id.clone());
                    if restored_versions.contains(&identity) {
                        Ok(())
                    } else {
                        Err(RuntimeError::SnapshotIo(format!(
                            "filesystem object payload missing from manifest for {}/{}/{}",
                            bucket_name, key, version.version_id
                        )))
                    }
                })
        })
    })
}
/// Writes `bytes` to `path` by staging them in a sibling temporary file and
/// renaming it into place.
///
/// Missing parent directories are created. The staged file is flushed with
/// `sync_all` before the rename. Afterwards, the destination and its parent
/// directory are synced on a best-effort basis.
///
/// An existing destination is replaced by the rename itself. It is never
/// deleted first, so the path never disappears in between (on POSIX, `rename`
/// is atomic). If writing, syncing or renaming fails, the staged temporary
/// file is removed.
///
/// # Errors
///
/// Returns `RuntimeError::SnapshotIo` if any of these steps fails: directory
/// creation, temporary-file creation, writing, syncing, or renaming.
fn write_atomic(path: &Path, bytes: &[u8]) -> Result<(), RuntimeError> {
    if let Some(parent) = path.parent() {
        std::fs::create_dir_all(parent).map_err(snapshot_io)?;
    }
    let tmp_path = temporary_store_path(path);
    // `create_new` refuses to open an existing file. If it fails, the path is
    // not ours, so it is deliberately not cleaned up.
    let mut tmp_file = File::options()
        .write(true)
        .create_new(true)
        .open(&tmp_path)
        .map_err(snapshot_io)?;
    let staged = tmp_file.write_all(bytes).and_then(|()| tmp_file.sync_all());
    // Close the staged file before renaming it.
    drop(tmp_file);
    // `rename` replaces an existing destination. Removing the destination
    // first would open a window where neither the old nor the new file exists,
    // e.g. losing the manifest on a crash.
    if let Err(error) = staged.and_then(|()| std::fs::rename(&tmp_path, path)) {
        let _ = std::fs::remove_file(&tmp_path);
        return Err(snapshot_io(error));
    }
    sync_file_best_effort(path);
    if let Some(parent) = path.parent() {
        sync_directory_best_effort(parent);
    }
    Ok(())
}
/// Derives a staging path next to `path` by replacing its extension with
/// `tmp-<process id>-<nanoseconds since the Unix epoch>`.
///
/// If the system clock reads earlier than the epoch, the timestamp part is `0`.
fn temporary_store_path(path: &Path) -> PathBuf {
    let nanos_since_epoch = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_nanos(),
        Err(_) => 0,
    };
    let pid = std::process::id();
    let suffix = format!("tmp-{pid}-{nanos_since_epoch}");
    path.with_extension(suffix)
}
/// Attempts to open the directory at `path` and `sync_all` it, ignoring
/// every error.
///
/// Where opening or syncing a directory is unsupported or fails, this does
/// nothing.
fn sync_directory_best_effort(path: &Path) {
    let Ok(directory) = File::open(path) else {
        return;
    };
    let _ = directory.sync_all();
}
/// Attempts to open the file at `path` and `sync_all` it, ignoring every error.
fn sync_file_best_effort(path: &Path) {
    let _ = File::open(path).and_then(|file| file.sync_all());
}
/// Converts an I/O error into `RuntimeError::SnapshotIo`, keeping only the
/// error's display text.
fn snapshot_io(error: std::io::Error) -> RuntimeError {
    let description = error.to_string();
    RuntimeError::SnapshotIo(description)
}