Skip to main content

bucketwarden_server/
filesystem_store.rs

1use super::*;
2use std::collections::BTreeSet;
3use std::fs::File;
4use std::io::Write;
5use std::path::{Path, PathBuf};
6use std::time::{SystemTime, UNIX_EPOCH};
7
/// Manifest schema version written by `save_filesystem_store`; restore rejects any other value.
const FILESYSTEM_STORE_SCHEMA_VERSION: u32 = 1;
/// File name of the JSON manifest, relative to the store root.
const MANIFEST_FILE: &str = "manifest.json";
/// Directory, relative to the store root, under which per-version ciphertext files are written.
const OBJECTS_DIR: &str = "objects";
11
/// On-disk manifest of a filesystem store, written as pretty-printed JSON to `MANIFEST_FILE`.
///
/// The ciphertext of every non-delete-marker version is stored in its own file; `snapshot`
/// holds the runtime state with those ciphertext buffers emptied, and `objects` describes
/// where each payload lives and how to verify it.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct FilesystemStoreManifest {
    /// Must equal `FILESYSTEM_STORE_SCHEMA_VERSION` for the store to be restored.
    pub schema_version: u32,
    /// Runtime state with ciphertext bytes removed from non-delete-marker versions.
    pub snapshot: RuntimeSnapshot,
    /// One record per payload file; restore requires exactly one per live version.
    pub objects: Vec<FilesystemObjectRecord>,
}
18
/// Manifest entry describing one ciphertext payload file and how to verify it on restore.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct FilesystemObjectRecord {
    /// Name of the bucket the version belongs to.
    pub bucket: String,
    /// Object key within the bucket.
    pub key: String,
    /// Identifier of the object version whose ciphertext the file holds.
    pub version_id: String,
    /// Payload path relative to the store root, written with `/` separators. Restore refuses
    /// paths that are absolute or contain `..` components.
    pub relative_path: String,
    /// Checksum of the ciphertext computed with `sha256_hex`; compared as a string on restore.
    pub ciphertext_sha256: String,
    /// Length of the ciphertext in bytes; checked against the file size on restore.
    pub ciphertext_len: usize,
}
28
29impl BucketWarden {
30    pub fn save_filesystem_store(&self, root: impl AsRef<Path>) -> Result<(), RuntimeError> {
31        let root = root.as_ref();
32        std::fs::create_dir_all(root).map_err(snapshot_io)?;
33        let objects_root = root.join(OBJECTS_DIR);
34        std::fs::create_dir_all(&objects_root).map_err(snapshot_io)?;
35
36        let mut snapshot = self.snapshot();
37        let mut object_records = Vec::new();
38        for (bucket_name, bucket) in snapshot.buckets.iter_mut() {
39            for (key, object) in bucket.objects.iter_mut() {
40                for version in &mut object.versions {
41                    if version.delete_marker {
42                        continue;
43                    }
44                    let relative_path =
45                        object_version_relative_path(bucket_name, key, &version.version_id);
46                    let destination = root.join(&relative_path);
47                    write_atomic(&destination, &version.ciphertext.bytes)?;
48                    object_records.push(FilesystemObjectRecord {
49                        bucket: bucket_name.clone(),
50                        key: key.clone(),
51                        version_id: version.version_id.clone(),
52                        relative_path: path_to_manifest_string(&relative_path),
53                        ciphertext_sha256: sha256_hex(&version.ciphertext.bytes),
54                        ciphertext_len: version.ciphertext.bytes.len(),
55                    });
56                    version.ciphertext.bytes.clear();
57                }
58            }
59        }
60
61        let manifest = FilesystemStoreManifest {
62            schema_version: FILESYSTEM_STORE_SCHEMA_VERSION,
63            snapshot,
64            objects: object_records,
65        };
66        let manifest_json =
67            serde_json::to_vec_pretty(&manifest).map_err(RuntimeError::SnapshotSerialize)?;
68        write_atomic(&root.join(MANIFEST_FILE), &manifest_json)
69    }
70
71    pub fn restore_filesystem_store(
72        config: RuntimeConfig,
73        root: impl AsRef<Path>,
74    ) -> Result<Self, RuntimeError> {
75        let root = root.as_ref();
76        let manifest_json = std::fs::read_to_string(root.join(MANIFEST_FILE))
77            .map_err(|error| RuntimeError::SnapshotIo(error.to_string()))?;
78        let mut manifest: FilesystemStoreManifest =
79            serde_json::from_str(&manifest_json).map_err(RuntimeError::SnapshotDeserialize)?;
80        if manifest.schema_version != FILESYSTEM_STORE_SCHEMA_VERSION {
81            return Err(RuntimeError::SnapshotIo(format!(
82                "unsupported filesystem store schema version: {}",
83                manifest.schema_version
84            )));
85        }
86
87        let mut restored_versions = BTreeSet::new();
88        for record in &manifest.objects {
89            let body_path = checked_store_path(root, &record.relative_path)?;
90            let body = std::fs::read(&body_path).map_err(snapshot_io)?;
91            if body.len() != record.ciphertext_len {
92                return Err(RuntimeError::SnapshotIo(format!(
93                    "filesystem object length mismatch for {}/{}/{}",
94                    record.bucket, record.key, record.version_id
95                )));
96            }
97            let actual_sha256 = sha256_hex(&body);
98            if actual_sha256 != record.ciphertext_sha256 {
99                return Err(RuntimeError::SnapshotIo(format!(
100                    "filesystem object checksum mismatch for {}/{}/{}",
101                    record.bucket, record.key, record.version_id
102                )));
103            }
104            let Some(version) = manifest
105                .snapshot
106                .buckets
107                .get_mut(&record.bucket)
108                .and_then(|bucket| bucket.objects.get_mut(&record.key))
109                .and_then(|object| {
110                    object
111                        .versions
112                        .iter_mut()
113                        .find(|version| version.version_id == record.version_id)
114                })
115            else {
116                return Err(RuntimeError::SnapshotIo(format!(
117                    "filesystem object record has no manifest version for {}/{}/{}",
118                    record.bucket, record.key, record.version_id
119                )));
120            };
121            if version.delete_marker {
122                return Err(RuntimeError::SnapshotIo(format!(
123                    "filesystem object record points at delete marker {}/{}/{}",
124                    record.bucket, record.key, record.version_id
125                )));
126            }
127            if !restored_versions.insert((
128                record.bucket.clone(),
129                record.key.clone(),
130                record.version_id.clone(),
131            )) {
132                return Err(RuntimeError::SnapshotIo(format!(
133                    "filesystem object record is duplicated for {}/{}/{}",
134                    record.bucket, record.key, record.version_id
135                )));
136            }
137            version.ciphertext.bytes = body;
138        }
139        require_complete_object_payload_manifest(&manifest.snapshot, &restored_versions)?;
140
141        Self::restore(config, manifest.snapshot)
142    }
143}
144
145fn object_version_relative_path(bucket: &str, key: &str, version_id: &str) -> PathBuf {
146    PathBuf::from(OBJECTS_DIR)
147        .join(hex_component(bucket))
148        .join(hex_component(key))
149        .join(format!("{}.bin", hex_component(version_id)))
150}
151
152fn checked_store_path(root: &Path, relative_path: &str) -> Result<PathBuf, RuntimeError> {
153    let relative = Path::new(relative_path);
154    if relative.is_absolute()
155        || relative.components().any(|component| {
156            matches!(
157                component,
158                std::path::Component::ParentDir | std::path::Component::Prefix(_)
159            )
160        })
161    {
162        return Err(RuntimeError::SnapshotIo(format!(
163            "invalid filesystem store relative path: {relative_path}"
164        )));
165    }
166    Ok(root.join(relative))
167}
168
/// Encodes every UTF-8 byte of `value` as two lowercase hex digits, yielding a string that
/// is safe to use as a single path component.
fn hex_component(value: &str) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";
    value
        .bytes()
        .flat_map(|byte| {
            [
                DIGITS[usize::from(byte >> 4)],
                DIGITS[usize::from(byte & 0x0f)],
            ]
        })
        .map(char::from)
        .collect()
}
176
/// Renders `path` with `/` between components so manifests look the same on every platform.
///
/// Components that are not valid UTF-8 are converted lossily.
fn path_to_manifest_string(path: &Path) -> String {
    let mut manifest_path = String::new();
    for (index, component) in path.components().enumerate() {
        if index > 0 {
            manifest_path.push('/');
        }
        manifest_path.push_str(&component.as_os_str().to_string_lossy());
    }
    manifest_path
}
183
184fn require_complete_object_payload_manifest(
185    snapshot: &RuntimeSnapshot,
186    restored_versions: &BTreeSet<(String, String, String)>,
187) -> Result<(), RuntimeError> {
188    for (bucket_name, bucket) in &snapshot.buckets {
189        for (key, object) in &bucket.objects {
190            for version in &object.versions {
191                if version.delete_marker {
192                    continue;
193                }
194                let identity = (bucket_name.clone(), key.clone(), version.version_id.clone());
195                if !restored_versions.contains(&identity) {
196                    return Err(RuntimeError::SnapshotIo(format!(
197                        "filesystem object payload missing from manifest for {}/{}/{}",
198                        bucket_name, key, version.version_id
199                    )));
200                }
201            }
202        }
203    }
204    Ok(())
205}
206
207fn write_atomic(path: &Path, bytes: &[u8]) -> Result<(), RuntimeError> {
208    if let Some(parent) = path.parent() {
209        std::fs::create_dir_all(parent).map_err(snapshot_io)?;
210    }
211    let tmp_path = temporary_store_path(path);
212    let mut tmp_file = File::options()
213        .write(true)
214        .create_new(true)
215        .open(&tmp_path)
216        .map_err(snapshot_io)?;
217    tmp_file.write_all(bytes).map_err(snapshot_io)?;
218    tmp_file.sync_all().map_err(snapshot_io)?;
219    drop(tmp_file);
220
221    if path.exists() {
222        std::fs::remove_file(path).map_err(snapshot_io)?;
223    }
224    if let Err(error) = std::fs::rename(&tmp_path, path) {
225        let _ = std::fs::remove_file(&tmp_path);
226        return Err(snapshot_io(error));
227    }
228
229    sync_file_best_effort(path);
230    if let Some(parent) = path.parent() {
231        sync_directory_best_effort(parent);
232    }
233    Ok(())
234}
235
/// Builds a sibling temporary path for `path` by replacing its extension with
/// `tmp-<process id>-<nanoseconds since the Unix epoch>`.
///
/// If the system clock reads before the epoch, the nanosecond part is `0`.
fn temporary_store_path(path: &Path) -> PathBuf {
    let nonce = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_nanos(),
        Err(_) => 0,
    };
    let extension = format!("tmp-{}-{}", std::process::id(), nonce);
    path.with_extension(extension)
}
243
/// Opens the directory at `path` and fsyncs it.
///
/// Failures to open or to sync are deliberately ignored: this is a best-effort durability
/// step.
fn sync_directory_best_effort(path: &Path) {
    let _ = File::open(path).and_then(|directory| directory.sync_all());
}
249
/// Reopens the file at `path` and fsyncs it.
///
/// Failures to open or to sync are deliberately ignored: this is a best-effort durability
/// step.
fn sync_file_best_effort(path: &Path) {
    let _ = File::open(path).and_then(|file| file.sync_all());
}
255
256fn snapshot_io(error: std::io::Error) -> RuntimeError {
257    RuntimeError::SnapshotIo(error.to_string())
258}