Skip to main content

fakecloud_s3/
state.rs

1use bytes::Bytes;
2use chrono::{DateTime, Utc};
3use fakecloud_persistence::cache::{BodyCache, BodyKey};
4use fakecloud_persistence::BodyRef;
5use parking_lot::RwLock;
6use std::collections::BTreeMap;
7use std::io::{self, Read, Seek, SeekFrom};
8use std::sync::Arc;
9
/// An ACL grant entry: one grantee and the single permission it holds.
#[derive(Debug, Clone)]
pub struct AclGrant {
    /// Kind of grantee: "CanonicalUser" or "Group".
    pub grantee_type: String,
    /// Grantee ID; `S3Bucket::new` fills this for its `CanonicalUser` owner grant.
    pub grantee_id: Option<String>,
    /// Human-readable grantee name, if known.
    pub grantee_display_name: Option<String>,
    /// Grantee URI (presumably used for "Group" grantees — confirm with callers).
    pub grantee_uri: Option<String>,
    /// Granted permission: READ, WRITE, READ_ACP, WRITE_ACP or FULL_CONTROL.
    pub permission: String,
}
19
20#[derive(Debug, Clone, Default)]
21pub struct S3Object {
22    pub key: String,
23    pub body: BodyRef,
24    pub content_type: String,
25    pub etag: String,
26    pub size: u64,
27    pub last_modified: DateTime<Utc>,
28    pub metadata: BTreeMap<String, String>,
29    pub storage_class: String,
30    pub tags: BTreeMap<String, String>,
31    pub acl_grants: Vec<AclGrant>,
32    pub acl_owner_id: Option<String>,
33    /// If created from multipart upload, the number of parts.
34    pub parts_count: Option<u32>,
35    /// Per-part sizes for multipart objects (part_number, size).
36    pub part_sizes: Option<Vec<(u32, u64)>>,
37    /// Server-side encryption algorithm.
38    pub sse_algorithm: Option<String>,
39    /// KMS key ID for SSE-KMS.
40    pub sse_kms_key_id: Option<String>,
41    /// Whether bucket key is enabled.
42    pub bucket_key_enabled: Option<bool>,
43    pub version_id: Option<String>,
44    pub is_delete_marker: bool,
45    pub content_encoding: Option<String>,
46    pub website_redirect_location: Option<String>,
47    /// Glacier restore: ongoing request status.
48    pub restore_ongoing: Option<bool>,
49    /// Glacier restore: expiry date string.
50    pub restore_expiry: Option<String>,
51    /// Checksum algorithm (CRC32, SHA1, SHA256).
52    pub checksum_algorithm: Option<String>,
53    /// Base64-encoded checksum value.
54    pub checksum_value: Option<String>,
55    /// Object lock mode (GOVERNANCE or COMPLIANCE).
56    pub lock_mode: Option<String>,
57    /// Object lock retain-until date (ISO 8601).
58    pub lock_retain_until: Option<DateTime<Utc>>,
59    /// Legal hold status (ON or OFF).
60    pub lock_legal_hold: Option<String>,
61}
62
63/// A part uploaded via the multipart upload API.
64#[derive(Debug, Clone)]
65pub struct UploadPart {
66    pub part_number: u32,
67    pub body: BodyRef,
68    pub etag: String,
69    pub size: u64,
70    pub last_modified: DateTime<Utc>,
71}
72
73/// An in-progress multipart upload.
74#[derive(Debug, Clone)]
75pub struct MultipartUpload {
76    pub upload_id: String,
77    pub key: String,
78    pub initiated: DateTime<Utc>,
79    /// Parts keyed by part number.
80    pub parts: BTreeMap<u32, UploadPart>,
81    /// Metadata provided at CreateMultipartUpload time.
82    pub metadata: BTreeMap<String, String>,
83    pub content_type: String,
84    pub storage_class: String,
85    pub sse_algorithm: Option<String>,
86    pub sse_kms_key_id: Option<String>,
87    pub tagging: Option<String>,
88    pub acl_grants: Vec<AclGrant>,
89    pub checksum_algorithm: Option<String>,
90}
91
92#[derive(Debug, Clone)]
93pub struct S3Bucket {
94    pub name: String,
95    pub creation_date: DateTime<Utc>,
96    pub region: String,
97    /// Objects keyed by their full key path.
98    pub objects: BTreeMap<String, S3Object>,
99    pub tags: BTreeMap<String, String>,
100    pub acl_grants: Vec<AclGrant>,
101    pub acl_owner_id: String,
102    /// In-progress multipart uploads keyed by upload ID.
103    pub multipart_uploads: BTreeMap<String, MultipartUpload>,
104    /// Versioning status: None = never enabled, Some("Enabled"), Some("Suspended").
105    pub versioning: Option<String>,
106    /// MFA-Delete status: None = unset, Some("Enabled"), Some("Disabled").
107    pub mfa_delete: Option<String>,
108    /// Object versions keyed by key, each value is a list of versions.
109    pub object_versions: BTreeMap<String, Vec<S3Object>>,
110    /// Bucket ACL (canned or XML).
111    pub acl: Option<String>,
112    pub encryption_config: Option<String>,
113    pub lifecycle_config: Option<String>,
114    /// Value of the `x-amz-transition-default-minimum-object-size` header
115    /// supplied on PutBucketLifecycleConfiguration. Echoed back as a header
116    /// on the corresponding GET (and PUT) response. Real AWS defaults to
117    /// `all_storage_classes_128K` for general purpose buckets.
118    pub lifecycle_transition_default_min_size: Option<String>,
119    pub policy: Option<String>,
120    pub cors_config: Option<String>,
121    pub notification_config: Option<String>,
122    pub logging_config: Option<String>,
123    pub website_config: Option<String>,
124    pub accelerate_status: Option<String>,
125    pub public_access_block: Option<String>,
126    pub object_lock_config: Option<String>,
127    pub replication_config: Option<String>,
128    pub ownership_controls: Option<String>,
129    pub inventory_configs: BTreeMap<String, String>,
130    /// Whether EventBridge notifications are enabled for this bucket.
131    pub eventbridge_enabled: bool,
132    /// Per-id analytics configurations (XML body).
133    pub analytics_configs: BTreeMap<String, String>,
134    /// Per-id intelligent-tiering configurations (XML body).
135    pub intelligent_tiering_configs: BTreeMap<String, String>,
136    /// Per-id metrics configurations (XML body).
137    pub metrics_configs: BTreeMap<String, String>,
138    /// Request payment configuration (XML body).
139    pub request_payment: Option<String>,
140    /// Per-bucket ABAC config (XML body) — see PutBucketAbac/GetBucketAbac.
141    pub abac_config: Option<String>,
142    /// Bucket-level metadata configuration (S3 metadata table v2).
143    pub metadata_configuration: Option<String>,
144    /// Bucket-level metadata table configuration (S3 metadata table v1).
145    pub metadata_table_configuration: Option<String>,
146}
147
148impl S3Bucket {
149    pub fn new(name: &str, region: &str, owner_id: &str) -> Self {
150        Self {
151            name: name.to_string(),
152            creation_date: Utc::now(),
153            region: region.to_string(),
154            objects: BTreeMap::new(),
155            tags: BTreeMap::new(),
156            acl_grants: vec![AclGrant {
157                grantee_type: "CanonicalUser".to_string(),
158                grantee_id: Some(owner_id.to_string()),
159                grantee_display_name: Some(owner_id.to_string()),
160                grantee_uri: None,
161                permission: "FULL_CONTROL".to_string(),
162            }],
163            acl_owner_id: owner_id.to_string(),
164            multipart_uploads: BTreeMap::new(),
165            versioning: None,
166            mfa_delete: None,
167            object_versions: BTreeMap::new(),
168            acl: None,
169            encryption_config: None,
170            lifecycle_config: None,
171            lifecycle_transition_default_min_size: None,
172            policy: None,
173            cors_config: None,
174            notification_config: None,
175            logging_config: None,
176            website_config: None,
177            accelerate_status: None,
178            public_access_block: None,
179            object_lock_config: None,
180            replication_config: None,
181            ownership_controls: None,
182            inventory_configs: BTreeMap::new(),
183            eventbridge_enabled: false,
184            analytics_configs: BTreeMap::new(),
185            intelligent_tiering_configs: BTreeMap::new(),
186            metrics_configs: BTreeMap::new(),
187            request_payment: None,
188            abac_config: None,
189            metadata_configuration: None,
190            metadata_table_configuration: None,
191        }
192    }
193}
194
195/// A recorded S3 notification event for introspection.
196#[derive(Debug, Clone)]
197pub struct S3NotificationEvent {
198    pub bucket: String,
199    pub key: String,
200    pub event_type: String,
201    pub timestamp: DateTime<Utc>,
202}
203
204/// Stored response from a Lambda function invoked via S3 Object Lambda.
205/// Keyed by `request_token` in [`S3State::object_lambda_responses`].
206#[derive(Debug, Clone)]
207pub struct ObjectLambdaResponse {
208    pub route: String,
209    pub token: String,
210    pub body: Vec<u8>,
211    pub content_type: Option<String>,
212    pub fwd_status: Option<u16>,
213    pub fwd_error_message: Option<String>,
214    pub metadata: BTreeMap<String, String>,
215    pub encryption: Option<String>,
216    pub kms_key_id: Option<String>,
217    pub stored_at: DateTime<Utc>,
218}
219
220#[derive(Debug, Clone)]
221pub struct S3AccessPoint {
222    pub name: String,
223    pub bucket: String,
224    pub account_id: String,
225    pub network_origin: String,
226    pub vpc_configuration: Option<String>,
227    pub creation_date: DateTime<Utc>,
228    pub public_access_block: Option<String>,
229    pub bucket_account_id: Option<String>,
230}
231
232pub struct S3State {
233    pub account_id: String,
234    pub region: String,
235    pub buckets: BTreeMap<String, S3Bucket>,
236    pub notification_events: Vec<S3NotificationEvent>,
237    pub body_cache: Option<Arc<BodyCache>>,
238    /// Object Lambda responses keyed by request token.
239    pub object_lambda_responses: BTreeMap<String, ObjectLambdaResponse>,
240    pub access_points: BTreeMap<String, S3AccessPoint>,
241}
242
243impl S3State {
244    pub fn new(account_id: &str, region: &str) -> Self {
245        Self {
246            account_id: account_id.to_string(),
247            region: region.to_string(),
248            buckets: BTreeMap::new(),
249            notification_events: Vec::new(),
250            body_cache: None,
251            object_lambda_responses: BTreeMap::new(),
252            access_points: BTreeMap::new(),
253        }
254    }
255
256    pub fn set_body_cache(&mut self, cache: Arc<BodyCache>) {
257        self.body_cache = Some(cache);
258    }
259
260    pub fn reset(&mut self) {
261        self.buckets.clear();
262        self.notification_events.clear();
263        self.object_lambda_responses.clear();
264    }
265
266    /// Read the full body referenced by a [`BodyRef`] without touching the
267    /// [`BodyCache`] or any `S3State`. Because it borrows nothing from state,
268    /// callers can read part bodies after dropping the global S3 lock — used by
269    /// CompleteMultipartUpload to assemble a multi-GB object off-lock instead of
270    /// serializing every other S3 operation behind the assembly (bug-audit
271    /// 2026-05-28, 4.7). Disk bodies are read straight from their path.
272    pub fn read_body_uncached(body: &BodyRef) -> io::Result<Bytes> {
273        match body {
274            BodyRef::Memory(b) => Ok(b.clone()),
275            BodyRef::Disk { path, .. } => Ok(Bytes::from(std::fs::read(path)?)),
276        }
277    }
278
279    /// Read the full body referenced by a [`BodyRef`], consulting the
280    /// persistent [`BodyCache`] when one is configured.
281    pub fn read_body(&self, body: &BodyRef) -> io::Result<Bytes> {
282        match body {
283            BodyRef::Memory(b) => Ok(b.clone()),
284            BodyRef::Disk {
285                bucket,
286                key,
287                version,
288                path,
289                ..
290            } => {
291                let cache_key = BodyKey::new(bucket.clone(), key.clone(), version.clone());
292                if let Some(cache) = &self.body_cache {
293                    if let Some(hit) = cache.get(&cache_key) {
294                        return Ok(hit);
295                    }
296                }
297                let data = std::fs::read(path)?;
298                let bytes = Bytes::from(data);
299                if let Some(cache) = &self.body_cache {
300                    cache.insert(cache_key, bytes.clone());
301                }
302                Ok(bytes)
303            }
304        }
305    }
306
307    /// Read a byte range from the body without loading the full object into
308    /// memory. Memory bodies are sliced directly; disk bodies are seek+read'd.
309    /// Ranges bypass the body cache (the cache stores whole objects only).
310    pub fn read_body_range(&self, body: &BodyRef, offset: u64, len: u64) -> io::Result<Bytes> {
311        match body {
312            BodyRef::Memory(b) => {
313                let start = offset as usize;
314                let end = start.saturating_add(len as usize).min(b.len());
315                if start > b.len() {
316                    return Ok(Bytes::new());
317                }
318                Ok(b.slice(start..end))
319            }
320            BodyRef::Disk { path, .. } => {
321                let mut f = std::fs::File::open(path)?;
322                f.seek(SeekFrom::Start(offset))?;
323                let mut buf = vec![0u8; len as usize];
324                f.read_exact(&mut buf)?;
325                Ok(Bytes::from(buf))
326            }
327        }
328    }
329}
330
331impl fakecloud_core::multi_account::AccountState for S3State {
332    fn new_for_account(account_id: &str, region: &str, _endpoint: &str) -> Self {
333        Self::new(account_id, region)
334    }
335
336    fn inherit_from(&mut self, sibling: &Self) {
337        if let Some(cache) = &sibling.body_cache {
338            self.body_cache = Some(cache.clone());
339        }
340    }
341}
342
343pub type SharedS3State = Arc<RwLock<fakecloud_core::multi_account::MultiAccountState<S3State>>>;
344
345/// Construct a memory-backed [`BodyRef`] from [`Bytes`].
346pub fn memory_body(bytes: Bytes) -> BodyRef {
347    BodyRef::Memory(bytes)
348}
349
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;

    /// Write `contents` to a fresh temporary file and return the file guard
    /// together with a disk-backed body (bucket "b", key "k", no version)
    /// whose path and size match it. Bind the guard to a named variable for
    /// as long as the body is read, because dropping a `NamedTempFile`
    /// deletes the file.
    fn disk_fixture(contents: &[u8]) -> (tempfile::NamedTempFile, BodyRef) {
        let file = tempfile::NamedTempFile::new().unwrap();
        file.as_file().write_all(contents).unwrap();
        let body = BodyRef::Disk {
            bucket: "b".to_string(),
            key: "k".to_string(),
            version: None,
            path: file.path().to_path_buf(),
            size: contents.len() as _,
        };
        (file, body)
    }

    /// Throwaway state for tests that only exercise the body readers.
    fn scratch_state() -> S3State {
        S3State::new("123", "us-east-1")
    }

    #[test]
    fn new_bucket_seeds_full_control_acl() {
        let bucket = S3Bucket::new("my-bucket", "us-east-1", "owner-id");
        assert_eq!(bucket.name, "my-bucket");
        assert_eq!(bucket.region, "us-east-1");
        assert_eq!(bucket.acl_owner_id, "owner-id");
        assert!(bucket.versioning.is_none());
        assert!(!bucket.eventbridge_enabled);

        assert_eq!(bucket.acl_grants.len(), 1);
        let grant = &bucket.acl_grants[0];
        assert_eq!(grant.grantee_type, "CanonicalUser");
        assert_eq!(grant.permission, "FULL_CONTROL");
    }

    #[test]
    fn s3state_new_and_reset_clears_buckets() {
        let mut state = S3State::new("123456789012", "us-east-1");
        assert!(state.buckets.is_empty());

        let bucket = S3Bucket::new("b", "us-east-1", "owner");
        state.buckets.insert(bucket.name.clone(), bucket);
        state.notification_events.push(S3NotificationEvent {
            bucket: "b".to_string(),
            key: "k".to_string(),
            event_type: "s3:ObjectCreated:Put".to_string(),
            timestamp: Utc::now(),
        });

        state.reset();
        assert!(state.buckets.is_empty());
        assert!(state.notification_events.is_empty());
    }

    #[test]
    fn read_body_from_memory_returns_bytes() {
        let body = memory_body(Bytes::from_static(b"hello"));
        let got = scratch_state().read_body(&body).unwrap();
        assert_eq!(got, &b"hello"[..]);
    }

    #[test]
    fn read_body_from_disk_reads_file() {
        let (_file, body) = disk_fixture(b"file-body");
        let got = scratch_state().read_body(&body).unwrap();
        assert_eq!(got, &b"file-body"[..]);
    }

    #[test]
    fn read_body_uncached_reads_memory_and_disk() {
        // bug-audit 4.7: the off-lock multipart assembler relies on this
        // state-free reader for both body kinds.
        let mem = memory_body(Bytes::from_static(b"hello"));
        assert_eq!(S3State::read_body_uncached(&mem).unwrap(), &b"hello"[..]);

        let (_file, disk) = disk_fixture(b"file-body");
        assert_eq!(
            S3State::read_body_uncached(&disk).unwrap(),
            &b"file-body"[..]
        );
    }

    #[test]
    fn read_body_range_slices_memory() {
        let body = memory_body(Bytes::from_static(b"abcdefghij"));
        let got = scratch_state().read_body_range(&body, 2, 4).unwrap();
        assert_eq!(got, &b"cdef"[..]);
    }

    #[test]
    fn read_body_range_memory_beyond_length_returns_empty() {
        let body = memory_body(Bytes::from_static(b"abc"));
        let got = scratch_state().read_body_range(&body, 100, 4).unwrap();
        assert!(got.is_empty());
    }

    #[test]
    fn read_body_range_memory_clamps_to_length() {
        let body = memory_body(Bytes::from_static(b"abcdef"));
        let got = scratch_state().read_body_range(&body, 4, 100).unwrap();
        assert_eq!(got, &b"ef"[..]);
    }

    #[test]
    fn read_body_range_from_disk() {
        let (_file, body) = disk_fixture(b"0123456789");
        let got = scratch_state().read_body_range(&body, 3, 4).unwrap();
        assert_eq!(got, &b"3456"[..]);
    }

    #[test]
    fn account_state_impl_new_for_account() {
        use fakecloud_core::multi_account::AccountState;
        let state = S3State::new_for_account("111122223333", "eu-west-1", "http://x");
        assert_eq!(state.account_id, "111122223333");
        assert_eq!(state.region, "eu-west-1");
    }
}