bucketwarden_server/
filesystem_store.rs1use super::*;
2use std::collections::BTreeSet;
3use std::fs::File;
4use std::io::Write;
5use std::path::{Path, PathBuf};
6use std::time::{SystemTime, UNIX_EPOCH};
7
/// Directory under the store root that holds one ciphertext file per stored object version.
const OBJECTS_DIR: &str = "objects";
/// File name of the JSON manifest at the store root.
const MANIFEST_FILE: &str = "manifest.json";
/// On-disk layout version written by a save and required (exactly) by a restore.
const FILESYSTEM_STORE_SCHEMA_VERSION: u32 = 1;
11
12#[derive(Clone, Debug, Deserialize, Serialize)]
13pub struct FilesystemStoreManifest {
14 pub schema_version: u32,
15 pub snapshot: RuntimeSnapshot,
16 pub objects: Vec<FilesystemObjectRecord>,
17}
18
19#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
20pub struct FilesystemObjectRecord {
21 pub bucket: String,
22 pub key: String,
23 pub version_id: String,
24 pub relative_path: String,
25 pub ciphertext_sha256: String,
26 pub ciphertext_len: usize,
27}
28
29impl BucketWarden {
30 pub fn save_filesystem_store(&self, root: impl AsRef<Path>) -> Result<(), RuntimeError> {
31 let root = root.as_ref();
32 std::fs::create_dir_all(root).map_err(snapshot_io)?;
33 let objects_root = root.join(OBJECTS_DIR);
34 std::fs::create_dir_all(&objects_root).map_err(snapshot_io)?;
35
36 let mut snapshot = self.snapshot();
37 let mut object_records = Vec::new();
38 for (bucket_name, bucket) in snapshot.buckets.iter_mut() {
39 for (key, object) in bucket.objects.iter_mut() {
40 for version in &mut object.versions {
41 if version.delete_marker {
42 continue;
43 }
44 let relative_path =
45 object_version_relative_path(bucket_name, key, &version.version_id);
46 let destination = root.join(&relative_path);
47 write_atomic(&destination, &version.ciphertext.bytes)?;
48 object_records.push(FilesystemObjectRecord {
49 bucket: bucket_name.clone(),
50 key: key.clone(),
51 version_id: version.version_id.clone(),
52 relative_path: path_to_manifest_string(&relative_path),
53 ciphertext_sha256: sha256_hex(&version.ciphertext.bytes),
54 ciphertext_len: version.ciphertext.bytes.len(),
55 });
56 version.ciphertext.bytes.clear();
57 }
58 }
59 }
60
61 let manifest = FilesystemStoreManifest {
62 schema_version: FILESYSTEM_STORE_SCHEMA_VERSION,
63 snapshot,
64 objects: object_records,
65 };
66 let manifest_json =
67 serde_json::to_vec_pretty(&manifest).map_err(RuntimeError::SnapshotSerialize)?;
68 write_atomic(&root.join(MANIFEST_FILE), &manifest_json)
69 }
70
71 pub fn restore_filesystem_store(
72 config: RuntimeConfig,
73 root: impl AsRef<Path>,
74 ) -> Result<Self, RuntimeError> {
75 let root = root.as_ref();
76 let manifest_json = std::fs::read_to_string(root.join(MANIFEST_FILE))
77 .map_err(|error| RuntimeError::SnapshotIo(error.to_string()))?;
78 let mut manifest: FilesystemStoreManifest =
79 serde_json::from_str(&manifest_json).map_err(RuntimeError::SnapshotDeserialize)?;
80 if manifest.schema_version != FILESYSTEM_STORE_SCHEMA_VERSION {
81 return Err(RuntimeError::SnapshotIo(format!(
82 "unsupported filesystem store schema version: {}",
83 manifest.schema_version
84 )));
85 }
86
87 let mut restored_versions = BTreeSet::new();
88 for record in &manifest.objects {
89 let body_path = checked_store_path(root, &record.relative_path)?;
90 let body = std::fs::read(&body_path).map_err(snapshot_io)?;
91 if body.len() != record.ciphertext_len {
92 return Err(RuntimeError::SnapshotIo(format!(
93 "filesystem object length mismatch for {}/{}/{}",
94 record.bucket, record.key, record.version_id
95 )));
96 }
97 let actual_sha256 = sha256_hex(&body);
98 if actual_sha256 != record.ciphertext_sha256 {
99 return Err(RuntimeError::SnapshotIo(format!(
100 "filesystem object checksum mismatch for {}/{}/{}",
101 record.bucket, record.key, record.version_id
102 )));
103 }
104 let Some(version) = manifest
105 .snapshot
106 .buckets
107 .get_mut(&record.bucket)
108 .and_then(|bucket| bucket.objects.get_mut(&record.key))
109 .and_then(|object| {
110 object
111 .versions
112 .iter_mut()
113 .find(|version| version.version_id == record.version_id)
114 })
115 else {
116 return Err(RuntimeError::SnapshotIo(format!(
117 "filesystem object record has no manifest version for {}/{}/{}",
118 record.bucket, record.key, record.version_id
119 )));
120 };
121 if version.delete_marker {
122 return Err(RuntimeError::SnapshotIo(format!(
123 "filesystem object record points at delete marker {}/{}/{}",
124 record.bucket, record.key, record.version_id
125 )));
126 }
127 if !restored_versions.insert((
128 record.bucket.clone(),
129 record.key.clone(),
130 record.version_id.clone(),
131 )) {
132 return Err(RuntimeError::SnapshotIo(format!(
133 "filesystem object record is duplicated for {}/{}/{}",
134 record.bucket, record.key, record.version_id
135 )));
136 }
137 version.ciphertext.bytes = body;
138 }
139 require_complete_object_payload_manifest(&manifest.snapshot, &restored_versions)?;
140
141 Self::restore(config, manifest.snapshot)
142 }
143}
144
145fn object_version_relative_path(bucket: &str, key: &str, version_id: &str) -> PathBuf {
146 PathBuf::from(OBJECTS_DIR)
147 .join(hex_component(bucket))
148 .join(hex_component(key))
149 .join(format!("{}.bin", hex_component(version_id)))
150}
151
152fn checked_store_path(root: &Path, relative_path: &str) -> Result<PathBuf, RuntimeError> {
153 let relative = Path::new(relative_path);
154 if relative.is_absolute()
155 || relative.components().any(|component| {
156 matches!(
157 component,
158 std::path::Component::ParentDir | std::path::Component::Prefix(_)
159 )
160 })
161 {
162 return Err(RuntimeError::SnapshotIo(format!(
163 "invalid filesystem store relative path: {relative_path}"
164 )));
165 }
166 Ok(root.join(relative))
167}
168
/// Encodes each byte of `value`'s UTF-8 form as two lowercase hex digits.
///
/// The output contains only `[0-9a-f]`. A non-empty result is therefore always a single,
/// case-insensitive-safe path component with no separators or `.`/`..`. Distinct inputs give
/// distinct outputs. An empty input yields an empty string.
fn hex_component(value: &str) -> String {
    const HEX_DIGITS: &[u8; 16] = b"0123456789abcdef";
    let mut encoded = String::with_capacity(value.len() * 2);
    for &byte in value.as_bytes() {
        // Push the two digits directly rather than allocating a `format!` string per byte.
        encoded.push(char::from(HEX_DIGITS[usize::from(byte >> 4)]));
        encoded.push(char::from(HEX_DIGITS[usize::from(byte & 0x0f)]));
    }
    encoded
}
176
/// Renders a relative store path as the string recorded in the manifest.
///
/// Components are joined with `/` regardless of the platform's native separator. Any
/// component that is not valid UTF-8 is converted lossily.
fn path_to_manifest_string(path: &Path) -> String {
    let mut rendered = String::new();
    for (position, component) in path.components().enumerate() {
        if position > 0 {
            rendered.push('/');
        }
        rendered.push_str(&component.as_os_str().to_string_lossy());
    }
    rendered
}
183
184fn require_complete_object_payload_manifest(
185 snapshot: &RuntimeSnapshot,
186 restored_versions: &BTreeSet<(String, String, String)>,
187) -> Result<(), RuntimeError> {
188 for (bucket_name, bucket) in &snapshot.buckets {
189 for (key, object) in &bucket.objects {
190 for version in &object.versions {
191 if version.delete_marker {
192 continue;
193 }
194 let identity = (bucket_name.clone(), key.clone(), version.version_id.clone());
195 if !restored_versions.contains(&identity) {
196 return Err(RuntimeError::SnapshotIo(format!(
197 "filesystem object payload missing from manifest for {}/{}/{}",
198 bucket_name, key, version.version_id
199 )));
200 }
201 }
202 }
203 }
204 Ok(())
205}
206
207fn write_atomic(path: &Path, bytes: &[u8]) -> Result<(), RuntimeError> {
208 if let Some(parent) = path.parent() {
209 std::fs::create_dir_all(parent).map_err(snapshot_io)?;
210 }
211 let tmp_path = temporary_store_path(path);
212 let mut tmp_file = File::options()
213 .write(true)
214 .create_new(true)
215 .open(&tmp_path)
216 .map_err(snapshot_io)?;
217 tmp_file.write_all(bytes).map_err(snapshot_io)?;
218 tmp_file.sync_all().map_err(snapshot_io)?;
219 drop(tmp_file);
220
221 if path.exists() {
222 std::fs::remove_file(path).map_err(snapshot_io)?;
223 }
224 if let Err(error) = std::fs::rename(&tmp_path, path) {
225 let _ = std::fs::remove_file(&tmp_path);
226 return Err(snapshot_io(error));
227 }
228
229 sync_file_best_effort(path);
230 if let Some(parent) = path.parent() {
231 sync_directory_best_effort(parent);
232 }
233 Ok(())
234}
235
/// Returns a sibling of `path` for staging a write, named `<stem>.tmp-<pid>-<nanos>`.
///
/// `<pid>` is the current process id. `<nanos>` is the wall-clock time in nanoseconds since
/// the Unix epoch, or 0 if the clock reads earlier than the epoch. The existing extension of
/// `path`, if any, is replaced rather than appended to.
fn temporary_store_path(path: &Path) -> PathBuf {
    let nanos_since_epoch = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_nanos(),
        Err(_) => 0,
    };
    let pid = std::process::id();
    path.with_extension(format!("tmp-{pid}-{nanos_since_epoch}"))
}
243
/// Opens the directory at `path` and asks the OS to sync it, ignoring every failure.
///
/// Both the open and the sync are best effort. On platforms where a directory cannot be opened
/// as a `File` this is a no-op.
fn sync_directory_best_effort(path: &Path) {
    let _ = File::open(path).and_then(|directory| directory.sync_all());
}
249
/// Reopens the file at `path` and asks the OS to flush it to disk, ignoring every failure.
fn sync_file_best_effort(path: &Path) {
    let _ = File::open(path).and_then(|file| file.sync_all());
}
255
256fn snapshot_io(error: std::io::Error) -> RuntimeError {
257 RuntimeError::SnapshotIo(error.to_string())
258}