1use std::{
4 collections::HashMap,
5 path::{Component, Path, PathBuf},
6};
7
8use serde::{Deserialize, Serialize};
9use thiserror::Error;
10use tokio::io::AsyncWriteExt as _;
11
12use crate::{
13 provider::RustackS3,
14 state::{
15 bucket::{
16 BucketEncryption, CorsRuleConfig, ObjectLockConfiguration, OwnershipControlsConfig,
17 PublicAccessBlockConfig, S3Bucket, VersioningStatus, WebsiteConfig,
18 },
19 multipart::MultipartUpload,
20 object::{ObjectVersion, Owner, S3DeleteMarker, S3Object},
21 },
22};
23
24#[derive(Debug, Error)]
26pub enum S3SnapshotError {
27 #[error("bucket disappeared during snapshot export: {bucket}")]
29 BucketDisappeared {
30 bucket: String,
32 },
33 #[error("failed to read S3 object data for {bucket}/{key}@{version_id}: {source}")]
35 ReadObject {
36 bucket: String,
38 key: String,
40 version_id: String,
42 #[source]
44 source: Box<crate::error::S3ServiceError>,
45 },
46 #[error("failed to read S3 multipart data for {bucket}/{upload_id}/{part_number}: {source}")]
48 ReadPart {
49 bucket: String,
51 upload_id: String,
53 part_number: u32,
55 #[source]
57 source: Box<crate::error::S3ServiceError>,
58 },
59 #[error("failed to restore S3 object data for {bucket}/{key}@{version_id}: {source}")]
61 WriteObject {
62 bucket: String,
64 key: String,
66 version_id: String,
68 #[source]
70 source: Box<crate::error::S3ServiceError>,
71 },
72 #[error("failed to restore S3 multipart data for {bucket}/{upload_id}/{part_number}: {source}")]
74 WritePart {
75 bucket: String,
77 upload_id: String,
79 part_number: u32,
81 #[source]
83 source: Box<crate::error::S3ServiceError>,
84 },
85 #[error("S3 snapshot I/O failed at {path}: {source}")]
87 Io {
88 path: String,
90 #[source]
92 source: std::io::Error,
93 },
94 #[error("invalid S3 snapshot data path: {path}")]
96 InvalidDataPath {
97 path: String,
99 },
100}
101
102#[derive(Debug, Clone, Default, Serialize, Deserialize)]
104#[serde(rename_all = "camelCase")]
105pub struct S3Snapshot {
106 pub buckets: Vec<S3BucketSnapshot>,
108}
109
110#[derive(Debug, Clone, Serialize, Deserialize)]
112#[serde(rename_all = "camelCase")]
113pub struct S3BucketSnapshot {
114 pub name: String,
116 pub region: String,
118 pub creation_date: chrono::DateTime<chrono::Utc>,
120 pub owner: Owner,
122 pub object_store_versioned: bool,
124 pub object_versions: Vec<S3ObjectVersionSnapshot>,
126 pub multipart_uploads: Vec<S3MultipartUploadSnapshot>,
128 pub versioning: VersioningStatus,
130 pub encryption: Option<BucketEncryption>,
132 pub cors_rules: Option<Vec<CorsRuleConfig>>,
134 pub lifecycle: Option<rustack_s3_model::types::BucketLifecycleConfiguration>,
136 pub policy: Option<String>,
138 pub tags: Vec<(String, String)>,
140 pub acl: crate::state::object::CannedAcl,
142 pub notification_configuration: Option<rustack_s3_model::types::NotificationConfiguration>,
144 pub logging: Option<serde_json::Value>,
146 pub public_access_block: Option<PublicAccessBlockConfig>,
148 pub ownership_controls: Option<OwnershipControlsConfig>,
150 pub object_lock_enabled: bool,
152 pub object_lock_configuration: Option<ObjectLockConfiguration>,
154 pub accelerate: Option<String>,
156 pub request_payment: String,
158 pub website: Option<WebsiteConfig>,
160 pub replication: Option<serde_json::Value>,
162 pub analytics: Option<serde_json::Value>,
164 pub metrics: Option<serde_json::Value>,
166 pub inventory: Option<serde_json::Value>,
168 pub intelligent_tiering: Option<serde_json::Value>,
170}
171
172#[derive(Debug, Clone, Serialize, Deserialize)]
174#[serde(rename_all = "camelCase", tag = "type")]
175pub enum S3ObjectVersionSnapshot {
176 Object {
178 object: Box<S3Object>,
180 body_file: String,
182 },
183 DeleteMarker(S3DeleteMarker),
185}
186
187#[derive(Debug, Clone, Serialize, Deserialize)]
189#[serde(rename_all = "camelCase")]
190pub struct S3MultipartUploadSnapshot {
191 pub upload: MultipartUpload,
193 pub part_body_files: HashMap<u32, String>,
195}
196
197impl RustackS3 {
198 pub async fn export_snapshot(&self, data_dir: &Path) -> Result<S3Snapshot, S3SnapshotError> {
204 create_dir_all(data_dir).await?;
205 let mut buckets = Vec::new();
206 let mut body_index = 0usize;
207
208 for bucket_name in self.state.snapshot_bucket_names() {
209 let bucket = self.state.get_bucket(&bucket_name).map_err(|_| {
210 S3SnapshotError::BucketDisappeared {
211 bucket: bucket_name.clone(),
212 }
213 })?;
214 let (object_store_versioned, versions) = bucket.objects.read().snapshot_versions();
215 let mut object_versions = Vec::with_capacity(versions.len());
216
217 for version in versions {
218 match version {
219 ObjectVersion::Object(object) => {
220 let body_file = format!("objects/{body_index}.bin");
221 body_index = body_index.saturating_add(1);
222 let data = self
223 .storage
224 .read_object(&bucket.name, &object.key, &object.version_id, None)
225 .await
226 .map_err(|source| S3SnapshotError::ReadObject {
227 bucket: bucket.name.clone(),
228 key: object.key.clone(),
229 version_id: object.version_id.clone(),
230 source: Box::new(source),
231 })?;
232 write_data_file(data_dir, &body_file, &data).await?;
233 object_versions.push(S3ObjectVersionSnapshot::Object { object, body_file });
234 }
235 ObjectVersion::DeleteMarker(marker) => {
236 object_versions.push(S3ObjectVersionSnapshot::DeleteMarker(marker));
237 }
238 }
239 }
240
241 let mut multipart_uploads = Vec::new();
242 for entry in &bucket.multipart_uploads {
243 let upload = entry.value().clone();
244 let mut part_body_files = HashMap::new();
245 for part_number in upload.parts.keys() {
246 let body_file = format!("parts/{body_index}.bin");
247 body_index = body_index.saturating_add(1);
248 let data = self
249 .storage
250 .read_part(&bucket.name, &upload.upload_id, *part_number)
251 .await
252 .map_err(|source| S3SnapshotError::ReadPart {
253 bucket: bucket.name.clone(),
254 upload_id: upload.upload_id.clone(),
255 part_number: *part_number,
256 source: Box::new(source),
257 })?;
258 write_data_file(data_dir, &body_file, &data).await?;
259 part_body_files.insert(*part_number, body_file);
260 }
261 multipart_uploads.push(S3MultipartUploadSnapshot {
262 upload,
263 part_body_files,
264 });
265 }
266
267 buckets.push(S3BucketSnapshot {
268 name: bucket.name.clone(),
269 region: bucket.region.clone(),
270 creation_date: bucket.creation_date,
271 owner: bucket.owner.clone(),
272 object_store_versioned,
273 object_versions,
274 multipart_uploads,
275 versioning: *bucket.versioning.read(),
276 encryption: bucket.encryption.read().clone(),
277 cors_rules: bucket.cors_rules.read().clone(),
278 lifecycle: bucket.lifecycle.read().clone(),
279 policy: bucket.policy.read().clone(),
280 tags: bucket.tags.read().clone(),
281 acl: *bucket.acl.read(),
282 notification_configuration: bucket.notification_configuration.read().clone(),
283 logging: bucket.logging.read().clone(),
284 public_access_block: bucket.public_access_block.read().clone(),
285 ownership_controls: bucket.ownership_controls.read().clone(),
286 object_lock_enabled: *bucket.object_lock_enabled.read(),
287 object_lock_configuration: bucket.object_lock_configuration.read().clone(),
288 accelerate: bucket.accelerate.read().clone(),
289 request_payment: bucket.request_payment.read().clone(),
290 website: bucket.website.read().clone(),
291 replication: bucket.replication.read().clone(),
292 analytics: bucket.analytics.read().clone(),
293 metrics: bucket.metrics.read().clone(),
294 inventory: bucket.inventory.read().clone(),
295 intelligent_tiering: bucket.intelligent_tiering.read().clone(),
296 });
297 }
298
299 Ok(S3Snapshot { buckets })
300 }
301
302 pub async fn import_snapshot(
308 &self,
309 snapshot: S3Snapshot,
310 data_dir: &Path,
311 ) -> Result<(), S3SnapshotError> {
312 self.reset();
313
314 for bucket_snapshot in snapshot.buckets {
315 let bucket = build_bucket_from_snapshot(&bucket_snapshot);
316 let mut versions = Vec::with_capacity(bucket_snapshot.object_versions.len());
317
318 for version in bucket_snapshot.object_versions {
319 match version {
320 S3ObjectVersionSnapshot::Object { object, body_file } => {
321 let data = read_data_file(data_dir, &body_file).await?;
322 self.storage
323 .write_object(&bucket.name, &object.key, &object.version_id, data)
324 .await
325 .map_err(|source| S3SnapshotError::WriteObject {
326 bucket: bucket.name.clone(),
327 key: object.key.clone(),
328 version_id: object.version_id.clone(),
329 source: Box::new(source),
330 })?;
331 versions.push(ObjectVersion::Object(object));
332 }
333 S3ObjectVersionSnapshot::DeleteMarker(marker) => {
334 versions.push(ObjectVersion::DeleteMarker(marker));
335 }
336 }
337 }
338
339 bucket
340 .objects
341 .write()
342 .replace_from_snapshot(bucket_snapshot.object_store_versioned, versions);
343
344 for multipart in bucket_snapshot.multipart_uploads {
345 for (part_number, body_file) in &multipart.part_body_files {
346 let data = read_data_file(data_dir, body_file).await?;
347 self.storage
348 .write_part(
349 &bucket.name,
350 &multipart.upload.upload_id,
351 *part_number,
352 data,
353 )
354 .await
355 .map_err(|source| S3SnapshotError::WritePart {
356 bucket: bucket.name.clone(),
357 upload_id: multipart.upload.upload_id.clone(),
358 part_number: *part_number,
359 source: Box::new(source),
360 })?;
361 }
362 bucket
363 .multipart_uploads
364 .insert(multipart.upload.upload_id.clone(), multipart.upload);
365 }
366
367 self.state.insert_snapshot_bucket(bucket);
368 }
369
370 Ok(())
371 }
372}
373
374fn build_bucket_from_snapshot(snapshot: &S3BucketSnapshot) -> S3Bucket {
375 let bucket = S3Bucket::new(
376 snapshot.name.clone(),
377 snapshot.region.clone(),
378 snapshot.owner.clone(),
379 );
380 let mut bucket = bucket;
381 bucket.creation_date = snapshot.creation_date;
382 *bucket.versioning.write() = snapshot.versioning;
383 (*bucket.encryption.write()).clone_from(&snapshot.encryption);
384 (*bucket.cors_rules.write()).clone_from(&snapshot.cors_rules);
385 (*bucket.lifecycle.write()).clone_from(&snapshot.lifecycle);
386 (*bucket.policy.write()).clone_from(&snapshot.policy);
387 (*bucket.tags.write()).clone_from(&snapshot.tags);
388 *bucket.acl.write() = snapshot.acl;
389 (*bucket.notification_configuration.write()).clone_from(&snapshot.notification_configuration);
390 (*bucket.logging.write()).clone_from(&snapshot.logging);
391 (*bucket.public_access_block.write()).clone_from(&snapshot.public_access_block);
392 (*bucket.ownership_controls.write()).clone_from(&snapshot.ownership_controls);
393 *bucket.object_lock_enabled.write() = snapshot.object_lock_enabled;
394 (*bucket.object_lock_configuration.write()).clone_from(&snapshot.object_lock_configuration);
395 (*bucket.accelerate.write()).clone_from(&snapshot.accelerate);
396 (*bucket.request_payment.write()).clone_from(&snapshot.request_payment);
397 (*bucket.website.write()).clone_from(&snapshot.website);
398 (*bucket.replication.write()).clone_from(&snapshot.replication);
399 (*bucket.analytics.write()).clone_from(&snapshot.analytics);
400 (*bucket.metrics.write()).clone_from(&snapshot.metrics);
401 (*bucket.inventory.write()).clone_from(&snapshot.inventory);
402 (*bucket.intelligent_tiering.write()).clone_from(&snapshot.intelligent_tiering);
403 bucket
404}
405
406async fn create_dir_all(path: &Path) -> Result<(), S3SnapshotError> {
407 tokio::fs::create_dir_all(path)
408 .await
409 .map_err(|source| S3SnapshotError::Io {
410 path: path.display().to_string(),
411 source,
412 })
413}
414
415async fn write_data_file(root: &Path, relative: &str, data: &[u8]) -> Result<(), S3SnapshotError> {
416 let path = data_file_path(root, relative)?;
417 if let Some(parent) = path.parent() {
418 create_dir_all(parent).await?;
419 }
420 let mut file = tokio::fs::File::create(&path)
421 .await
422 .map_err(|source| S3SnapshotError::Io {
423 path: path.display().to_string(),
424 source,
425 })?;
426 file.write_all(data)
427 .await
428 .map_err(|source| S3SnapshotError::Io {
429 path: path.display().to_string(),
430 source,
431 })
432}
433
434async fn read_data_file(root: &Path, relative: &str) -> Result<bytes::Bytes, S3SnapshotError> {
435 let path = data_file_path(root, relative)?;
436 tokio::fs::read(&path)
437 .await
438 .map(bytes::Bytes::from)
439 .map_err(|source| S3SnapshotError::Io {
440 path: path.display().to_string(),
441 source,
442 })
443}
444
445fn data_file_path(root: &Path, relative: &str) -> Result<PathBuf, S3SnapshotError> {
446 let path = Path::new(relative);
447 if path.is_absolute()
448 || path
449 .components()
450 .any(|component| !matches!(component, Component::Normal(_)))
451 {
452 return Err(S3SnapshotError::InvalidDataPath {
453 path: relative.to_owned(),
454 });
455 }
456 Ok(root.join(path))
457}
458
#[cfg(test)]
mod tests {
    use super::*;

    /// A `..` component must never be allowed to escape the data directory.
    #[test]
    fn test_should_reject_data_file_path_traversal() {
        let root = Path::new("/tmp/s3");
        let resolved = data_file_path(root, "../outside.bin");
        assert!(
            resolved.is_err(),
            "parent-directory components must be rejected"
        );
    }

    /// A plain relative path below the root is accepted.
    #[test]
    fn test_should_accept_data_file_child_path() {
        let root = Path::new("/tmp/s3");
        let resolved = data_file_path(root, "objects/body.bin");
        assert!(resolved.is_ok(), "plain child paths must be accepted");
    }
}