Skip to main content

rustack_s3_core/
snapshot.rs

1//! Snapshot support for S3 buckets, metadata, multipart uploads, and object bodies.
2
3use std::{
4    collections::HashMap,
5    path::{Component, Path, PathBuf},
6};
7
8use serde::{Deserialize, Serialize};
9use thiserror::Error;
10use tokio::io::AsyncWriteExt as _;
11
12use crate::{
13    provider::RustackS3,
14    state::{
15        bucket::{
16            BucketEncryption, CorsRuleConfig, ObjectLockConfiguration, OwnershipControlsConfig,
17            PublicAccessBlockConfig, S3Bucket, VersioningStatus, WebsiteConfig,
18        },
19        multipart::MultipartUpload,
20        object::{ObjectVersion, Owner, S3DeleteMarker, S3Object},
21    },
22};
23
/// Errors raised while exporting or importing S3 snapshots.
///
/// Storage-backend variants record which bucket/key/version or bucket/upload/part failed and
/// keep the backend error as their `#[source]`, so a failed export or import can be traced to
/// a specific body.
#[derive(Debug, Error)]
pub enum S3SnapshotError {
    /// Bucket referenced in the bucket list disappeared during export.
    ///
    /// Raised when a bucket name returned by the state's snapshot listing can no longer be
    /// looked up (e.g. it was deleted concurrently).
    #[error("bucket disappeared during snapshot export: {bucket}")]
    BucketDisappeared {
        /// Bucket name.
        bucket: String,
    },
    /// Object data could not be read through the storage backend.
    #[error("failed to read S3 object data for {bucket}/{key}@{version_id}: {source}")]
    ReadObject {
        /// Bucket name.
        bucket: String,
        /// Object key.
        key: String,
        /// Version identifier.
        version_id: String,
        /// Source error.
        #[source]
        source: Box<crate::error::S3ServiceError>,
    },
    /// Multipart part data could not be read through the storage backend.
    #[error("failed to read S3 multipart data for {bucket}/{upload_id}/{part_number}: {source}")]
    ReadPart {
        /// Bucket name.
        bucket: String,
        /// Multipart upload identifier.
        upload_id: String,
        /// Part number.
        part_number: u32,
        /// Source error.
        #[source]
        source: Box<crate::error::S3ServiceError>,
    },
    /// Object data could not be written through the storage backend.
    #[error("failed to restore S3 object data for {bucket}/{key}@{version_id}: {source}")]
    WriteObject {
        /// Bucket name.
        bucket: String,
        /// Object key.
        key: String,
        /// Version identifier.
        version_id: String,
        /// Source error.
        #[source]
        source: Box<crate::error::S3ServiceError>,
    },
    /// Multipart part data could not be written through the storage backend.
    #[error("failed to restore S3 multipart data for {bucket}/{upload_id}/{part_number}: {source}")]
    WritePart {
        /// Bucket name.
        bucket: String,
        /// Multipart upload identifier.
        upload_id: String,
        /// Part number.
        part_number: u32,
        /// Source error.
        #[source]
        source: Box<crate::error::S3ServiceError>,
    },
    /// File-system I/O failed.
    #[error("S3 snapshot I/O failed at {path}: {source}")]
    Io {
        /// Path being accessed (the full path, rendered for display).
        path: String,
        /// Source error.
        #[source]
        source: std::io::Error,
    },
    /// Snapshot metadata referenced a data path outside its data directory.
    ///
    /// Raised for body-file paths that are not plain relative paths, e.g. an absolute path or
    /// one containing `..`.
    #[error("invalid S3 snapshot data path: {path}")]
    InvalidDataPath {
        /// Invalid relative path, exactly as it appeared in the snapshot metadata.
        path: String,
    },
}
101
/// Serializable S3 service snapshot.
///
/// Holds metadata only. Object and multipart-part bodies live in separate files, referenced
/// by paths relative to the snapshot's S3 data directory from the per-version and per-upload
/// entries.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct S3Snapshot {
    /// Bucket snapshots.
    pub buckets: Vec<S3BucketSnapshot>,
}
109
/// Serializable S3 bucket snapshot.
///
/// Captures a bucket's identity, its object versions and in-progress multipart uploads (with
/// bodies stored externally), and every bucket-level configuration value. Field names are
/// serialized in camelCase.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct S3BucketSnapshot {
    /// Bucket name.
    pub name: String,
    /// Bucket region.
    pub region: String,
    /// Creation date.
    pub creation_date: chrono::DateTime<chrono::Utc>,
    /// Bucket owner.
    pub owner: Owner,
    /// Whether the object store is versioned internally.
    ///
    /// Captured from the object store itself, independently of the `versioning` field below.
    pub object_store_versioned: bool,
    /// Object versions and delete markers.
    pub object_versions: Vec<S3ObjectVersionSnapshot>,
    /// In-progress multipart uploads.
    pub multipart_uploads: Vec<S3MultipartUploadSnapshot>,
    /// Bucket versioning status.
    pub versioning: VersioningStatus,
    /// Bucket encryption settings.
    pub encryption: Option<BucketEncryption>,
    /// CORS rules.
    pub cors_rules: Option<Vec<CorsRuleConfig>>,
    /// Lifecycle configuration.
    pub lifecycle: Option<rustack_s3_model::types::BucketLifecycleConfiguration>,
    /// Bucket policy JSON.
    pub policy: Option<String>,
    /// Bucket tags as `(key, value)` pairs.
    pub tags: Vec<(String, String)>,
    /// Canned ACL.
    pub acl: crate::state::object::CannedAcl,
    /// Notification configuration.
    pub notification_configuration: Option<rustack_s3_model::types::NotificationConfiguration>,
    /// Logging configuration (kept as opaque JSON).
    pub logging: Option<serde_json::Value>,
    /// Public access block settings.
    pub public_access_block: Option<PublicAccessBlockConfig>,
    /// Ownership controls.
    pub ownership_controls: Option<OwnershipControlsConfig>,
    /// Whether object lock is enabled.
    pub object_lock_enabled: bool,
    /// Object lock configuration.
    pub object_lock_configuration: Option<ObjectLockConfiguration>,
    /// Transfer acceleration status.
    pub accelerate: Option<String>,
    /// Request payment configuration.
    pub request_payment: String,
    /// Static website hosting configuration.
    pub website: Option<WebsiteConfig>,
    /// Replication configuration (kept as opaque JSON).
    pub replication: Option<serde_json::Value>,
    /// Analytics configuration (kept as opaque JSON).
    pub analytics: Option<serde_json::Value>,
    /// Metrics configuration (kept as opaque JSON).
    pub metrics: Option<serde_json::Value>,
    /// Inventory configuration (kept as opaque JSON).
    pub inventory: Option<serde_json::Value>,
    /// Intelligent-tiering configuration (kept as opaque JSON).
    pub intelligent_tiering: Option<serde_json::Value>,
}
171
/// Object version snapshot with external body file reference.
///
/// Serialized as an internally tagged value whose `type` field is `"object"` or
/// `"deleteMarker"`. Note that `rename_all` on an enum renames the *variants* only, so the
/// `body_file` field of the `Object` variant keeps its snake_case name when serialized.
/// Changing that would break compatibility with existing snapshots.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase", tag = "type")]
pub enum S3ObjectVersionSnapshot {
    /// Real object version.
    Object {
        /// Object metadata.
        object: Box<S3Object>,
        /// Body file relative to the S3 data directory.
        body_file: String,
    },
    /// Delete marker version (has no body, so it is stored inline).
    DeleteMarker(S3DeleteMarker),
}
186
/// Multipart upload snapshot with external part body file references.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct S3MultipartUploadSnapshot {
    /// Multipart upload metadata.
    pub upload: MultipartUpload,
    /// Part number to body file relative to the S3 data directory.
    ///
    /// One entry per uploaded part; the key is the part number passed to the storage backend.
    pub part_body_files: HashMap<u32, String>,
}
196
197impl RustackS3 {
198    /// Export S3 state and object bodies into a snapshot.
199    ///
200    /// # Errors
201    ///
202    /// Returns an error if object bodies cannot be read or written.
203    pub async fn export_snapshot(&self, data_dir: &Path) -> Result<S3Snapshot, S3SnapshotError> {
204        create_dir_all(data_dir).await?;
205        let mut buckets = Vec::new();
206        let mut body_index = 0usize;
207
208        for bucket_name in self.state.snapshot_bucket_names() {
209            let bucket = self.state.get_bucket(&bucket_name).map_err(|_| {
210                S3SnapshotError::BucketDisappeared {
211                    bucket: bucket_name.clone(),
212                }
213            })?;
214            let (object_store_versioned, versions) = bucket.objects.read().snapshot_versions();
215            let mut object_versions = Vec::with_capacity(versions.len());
216
217            for version in versions {
218                match version {
219                    ObjectVersion::Object(object) => {
220                        let body_file = format!("objects/{body_index}.bin");
221                        body_index = body_index.saturating_add(1);
222                        let data = self
223                            .storage
224                            .read_object(&bucket.name, &object.key, &object.version_id, None)
225                            .await
226                            .map_err(|source| S3SnapshotError::ReadObject {
227                                bucket: bucket.name.clone(),
228                                key: object.key.clone(),
229                                version_id: object.version_id.clone(),
230                                source: Box::new(source),
231                            })?;
232                        write_data_file(data_dir, &body_file, &data).await?;
233                        object_versions.push(S3ObjectVersionSnapshot::Object { object, body_file });
234                    }
235                    ObjectVersion::DeleteMarker(marker) => {
236                        object_versions.push(S3ObjectVersionSnapshot::DeleteMarker(marker));
237                    }
238                }
239            }
240
241            let mut multipart_uploads = Vec::new();
242            for entry in &bucket.multipart_uploads {
243                let upload = entry.value().clone();
244                let mut part_body_files = HashMap::new();
245                for part_number in upload.parts.keys() {
246                    let body_file = format!("parts/{body_index}.bin");
247                    body_index = body_index.saturating_add(1);
248                    let data = self
249                        .storage
250                        .read_part(&bucket.name, &upload.upload_id, *part_number)
251                        .await
252                        .map_err(|source| S3SnapshotError::ReadPart {
253                            bucket: bucket.name.clone(),
254                            upload_id: upload.upload_id.clone(),
255                            part_number: *part_number,
256                            source: Box::new(source),
257                        })?;
258                    write_data_file(data_dir, &body_file, &data).await?;
259                    part_body_files.insert(*part_number, body_file);
260                }
261                multipart_uploads.push(S3MultipartUploadSnapshot {
262                    upload,
263                    part_body_files,
264                });
265            }
266
267            buckets.push(S3BucketSnapshot {
268                name: bucket.name.clone(),
269                region: bucket.region.clone(),
270                creation_date: bucket.creation_date,
271                owner: bucket.owner.clone(),
272                object_store_versioned,
273                object_versions,
274                multipart_uploads,
275                versioning: *bucket.versioning.read(),
276                encryption: bucket.encryption.read().clone(),
277                cors_rules: bucket.cors_rules.read().clone(),
278                lifecycle: bucket.lifecycle.read().clone(),
279                policy: bucket.policy.read().clone(),
280                tags: bucket.tags.read().clone(),
281                acl: *bucket.acl.read(),
282                notification_configuration: bucket.notification_configuration.read().clone(),
283                logging: bucket.logging.read().clone(),
284                public_access_block: bucket.public_access_block.read().clone(),
285                ownership_controls: bucket.ownership_controls.read().clone(),
286                object_lock_enabled: *bucket.object_lock_enabled.read(),
287                object_lock_configuration: bucket.object_lock_configuration.read().clone(),
288                accelerate: bucket.accelerate.read().clone(),
289                request_payment: bucket.request_payment.read().clone(),
290                website: bucket.website.read().clone(),
291                replication: bucket.replication.read().clone(),
292                analytics: bucket.analytics.read().clone(),
293                metrics: bucket.metrics.read().clone(),
294                inventory: bucket.inventory.read().clone(),
295                intelligent_tiering: bucket.intelligent_tiering.read().clone(),
296            });
297        }
298
299        Ok(S3Snapshot { buckets })
300    }
301
302    /// Import S3 state and object bodies from a snapshot.
303    ///
304    /// # Errors
305    ///
306    /// Returns an error if object body files cannot be read or restored.
307    pub async fn import_snapshot(
308        &self,
309        snapshot: S3Snapshot,
310        data_dir: &Path,
311    ) -> Result<(), S3SnapshotError> {
312        self.reset();
313
314        for bucket_snapshot in snapshot.buckets {
315            let bucket = build_bucket_from_snapshot(&bucket_snapshot);
316            let mut versions = Vec::with_capacity(bucket_snapshot.object_versions.len());
317
318            for version in bucket_snapshot.object_versions {
319                match version {
320                    S3ObjectVersionSnapshot::Object { object, body_file } => {
321                        let data = read_data_file(data_dir, &body_file).await?;
322                        self.storage
323                            .write_object(&bucket.name, &object.key, &object.version_id, data)
324                            .await
325                            .map_err(|source| S3SnapshotError::WriteObject {
326                                bucket: bucket.name.clone(),
327                                key: object.key.clone(),
328                                version_id: object.version_id.clone(),
329                                source: Box::new(source),
330                            })?;
331                        versions.push(ObjectVersion::Object(object));
332                    }
333                    S3ObjectVersionSnapshot::DeleteMarker(marker) => {
334                        versions.push(ObjectVersion::DeleteMarker(marker));
335                    }
336                }
337            }
338
339            bucket
340                .objects
341                .write()
342                .replace_from_snapshot(bucket_snapshot.object_store_versioned, versions);
343
344            for multipart in bucket_snapshot.multipart_uploads {
345                for (part_number, body_file) in &multipart.part_body_files {
346                    let data = read_data_file(data_dir, body_file).await?;
347                    self.storage
348                        .write_part(
349                            &bucket.name,
350                            &multipart.upload.upload_id,
351                            *part_number,
352                            data,
353                        )
354                        .await
355                        .map_err(|source| S3SnapshotError::WritePart {
356                            bucket: bucket.name.clone(),
357                            upload_id: multipart.upload.upload_id.clone(),
358                            part_number: *part_number,
359                            source: Box::new(source),
360                        })?;
361                }
362                bucket
363                    .multipart_uploads
364                    .insert(multipart.upload.upload_id.clone(), multipart.upload);
365            }
366
367            self.state.insert_snapshot_bucket(bucket);
368        }
369
370        Ok(())
371    }
372}
373
374fn build_bucket_from_snapshot(snapshot: &S3BucketSnapshot) -> S3Bucket {
375    let bucket = S3Bucket::new(
376        snapshot.name.clone(),
377        snapshot.region.clone(),
378        snapshot.owner.clone(),
379    );
380    let mut bucket = bucket;
381    bucket.creation_date = snapshot.creation_date;
382    *bucket.versioning.write() = snapshot.versioning;
383    (*bucket.encryption.write()).clone_from(&snapshot.encryption);
384    (*bucket.cors_rules.write()).clone_from(&snapshot.cors_rules);
385    (*bucket.lifecycle.write()).clone_from(&snapshot.lifecycle);
386    (*bucket.policy.write()).clone_from(&snapshot.policy);
387    (*bucket.tags.write()).clone_from(&snapshot.tags);
388    *bucket.acl.write() = snapshot.acl;
389    (*bucket.notification_configuration.write()).clone_from(&snapshot.notification_configuration);
390    (*bucket.logging.write()).clone_from(&snapshot.logging);
391    (*bucket.public_access_block.write()).clone_from(&snapshot.public_access_block);
392    (*bucket.ownership_controls.write()).clone_from(&snapshot.ownership_controls);
393    *bucket.object_lock_enabled.write() = snapshot.object_lock_enabled;
394    (*bucket.object_lock_configuration.write()).clone_from(&snapshot.object_lock_configuration);
395    (*bucket.accelerate.write()).clone_from(&snapshot.accelerate);
396    (*bucket.request_payment.write()).clone_from(&snapshot.request_payment);
397    (*bucket.website.write()).clone_from(&snapshot.website);
398    (*bucket.replication.write()).clone_from(&snapshot.replication);
399    (*bucket.analytics.write()).clone_from(&snapshot.analytics);
400    (*bucket.metrics.write()).clone_from(&snapshot.metrics);
401    (*bucket.inventory.write()).clone_from(&snapshot.inventory);
402    (*bucket.intelligent_tiering.write()).clone_from(&snapshot.intelligent_tiering);
403    bucket
404}
405
406async fn create_dir_all(path: &Path) -> Result<(), S3SnapshotError> {
407    tokio::fs::create_dir_all(path)
408        .await
409        .map_err(|source| S3SnapshotError::Io {
410            path: path.display().to_string(),
411            source,
412        })
413}
414
415async fn write_data_file(root: &Path, relative: &str, data: &[u8]) -> Result<(), S3SnapshotError> {
416    let path = data_file_path(root, relative)?;
417    if let Some(parent) = path.parent() {
418        create_dir_all(parent).await?;
419    }
420    let mut file = tokio::fs::File::create(&path)
421        .await
422        .map_err(|source| S3SnapshotError::Io {
423            path: path.display().to_string(),
424            source,
425        })?;
426    file.write_all(data)
427        .await
428        .map_err(|source| S3SnapshotError::Io {
429            path: path.display().to_string(),
430            source,
431        })
432}
433
434async fn read_data_file(root: &Path, relative: &str) -> Result<bytes::Bytes, S3SnapshotError> {
435    let path = data_file_path(root, relative)?;
436    tokio::fs::read(&path)
437        .await
438        .map(bytes::Bytes::from)
439        .map_err(|source| S3SnapshotError::Io {
440            path: path.display().to_string(),
441            source,
442        })
443}
444
445fn data_file_path(root: &Path, relative: &str) -> Result<PathBuf, S3SnapshotError> {
446    let path = Path::new(relative);
447    if path.is_absolute()
448        || path
449            .components()
450            .any(|component| !matches!(component, Component::Normal(_)))
451    {
452        return Err(S3SnapshotError::InvalidDataPath {
453            path: relative.to_owned(),
454        });
455    }
456    Ok(root.join(path))
457}
458
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_should_reject_data_file_path_traversal() {
        let result = data_file_path(Path::new("/tmp/s3"), "../outside.bin");
        assert!(
            matches!(result, Err(S3SnapshotError::InvalidDataPath { ref path }) if path == "../outside.bin"),
            "unexpected result: {result:?}"
        );
    }

    #[test]
    fn test_should_reject_absolute_and_dotted_data_file_paths() {
        for relative in ["/etc/passwd", "./objects/0.bin", "objects/../../outside.bin"] {
            let result = data_file_path(Path::new("/tmp/s3"), relative);
            assert!(
                matches!(result, Err(S3SnapshotError::InvalidDataPath { .. })),
                "{relative} should be rejected, got {result:?}"
            );
        }
    }

    #[test]
    fn test_should_accept_data_file_child_path() {
        let path = data_file_path(Path::new("/tmp/s3"), "objects/body.bin")
            .expect("a plain child path should be accepted");
        assert_eq!(path, Path::new("/tmp/s3/objects/body.bin"));
    }
}