Skip to main content

fakecloud_s3/
state.rs

1use bytes::Bytes;
2use chrono::{DateTime, Utc};
3use fakecloud_persistence::cache::{BodyCache, BodyKey};
4use fakecloud_persistence::BodyRef;
5use parking_lot::RwLock;
6use std::collections::{BTreeMap, HashMap};
7use std::io::{self, Read, Seek, SeekFrom};
8use std::sync::Arc;
9
/// An ACL grant entry.
///
/// Derives `PartialEq`/`Eq` so grants (and grant lists) can be compared directly, e.g. when
/// checking whether an ACL already contains a given grant.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AclGrant {
    /// Grantee kind: `"CanonicalUser"` or `"Group"`.
    pub grantee_type: String,
    /// Grantee ID; set for `CanonicalUser` grantees (see [`S3Bucket::new`]).
    pub grantee_id: Option<String>,
    /// Human-readable grantee name, if known.
    pub grantee_display_name: Option<String>,
    /// Grantee URI, if any (`None` for the owner grant created by [`S3Bucket::new`]).
    pub grantee_uri: Option<String>,
    /// Granted permission: `READ`, `WRITE`, `READ_ACP`, `WRITE_ACP` or `FULL_CONTROL`.
    pub permission: String,
}
19
20#[derive(Debug, Clone, Default)]
21pub struct S3Object {
22    pub key: String,
23    pub body: BodyRef,
24    pub content_type: String,
25    pub etag: String,
26    pub size: u64,
27    pub last_modified: DateTime<Utc>,
28    pub metadata: HashMap<String, String>,
29    pub storage_class: String,
30    pub tags: HashMap<String, String>,
31    pub acl_grants: Vec<AclGrant>,
32    pub acl_owner_id: Option<String>,
33    /// If created from multipart upload, the number of parts.
34    pub parts_count: Option<u32>,
35    /// Per-part sizes for multipart objects (part_number, size).
36    pub part_sizes: Option<Vec<(u32, u64)>>,
37    /// Server-side encryption algorithm.
38    pub sse_algorithm: Option<String>,
39    /// KMS key ID for SSE-KMS.
40    pub sse_kms_key_id: Option<String>,
41    /// Whether bucket key is enabled.
42    pub bucket_key_enabled: Option<bool>,
43    pub version_id: Option<String>,
44    pub is_delete_marker: bool,
45    pub content_encoding: Option<String>,
46    pub website_redirect_location: Option<String>,
47    /// Glacier restore: ongoing request status.
48    pub restore_ongoing: Option<bool>,
49    /// Glacier restore: expiry date string.
50    pub restore_expiry: Option<String>,
51    /// Checksum algorithm (CRC32, SHA1, SHA256).
52    pub checksum_algorithm: Option<String>,
53    /// Base64-encoded checksum value.
54    pub checksum_value: Option<String>,
55    /// Object lock mode (GOVERNANCE or COMPLIANCE).
56    pub lock_mode: Option<String>,
57    /// Object lock retain-until date (ISO 8601).
58    pub lock_retain_until: Option<DateTime<Utc>>,
59    /// Legal hold status (ON or OFF).
60    pub lock_legal_hold: Option<String>,
61}
62
63/// A part uploaded via the multipart upload API.
64#[derive(Debug, Clone)]
65pub struct UploadPart {
66    pub part_number: u32,
67    pub body: BodyRef,
68    pub etag: String,
69    pub size: u64,
70    pub last_modified: DateTime<Utc>,
71}
72
73/// An in-progress multipart upload.
74#[derive(Debug, Clone)]
75pub struct MultipartUpload {
76    pub upload_id: String,
77    pub key: String,
78    pub initiated: DateTime<Utc>,
79    /// Parts keyed by part number.
80    pub parts: BTreeMap<u32, UploadPart>,
81    /// Metadata provided at CreateMultipartUpload time.
82    pub metadata: HashMap<String, String>,
83    pub content_type: String,
84    pub storage_class: String,
85    pub sse_algorithm: Option<String>,
86    pub sse_kms_key_id: Option<String>,
87    pub tagging: Option<String>,
88    pub acl_grants: Vec<AclGrant>,
89    pub checksum_algorithm: Option<String>,
90}
91
92#[derive(Debug, Clone)]
93pub struct S3Bucket {
94    pub name: String,
95    pub creation_date: DateTime<Utc>,
96    pub region: String,
97    /// Objects keyed by their full key path.
98    pub objects: BTreeMap<String, S3Object>,
99    pub tags: HashMap<String, String>,
100    pub acl_grants: Vec<AclGrant>,
101    pub acl_owner_id: String,
102    /// In-progress multipart uploads keyed by upload ID.
103    pub multipart_uploads: HashMap<String, MultipartUpload>,
104    /// Versioning status: None = never enabled, Some("Enabled"), Some("Suspended").
105    pub versioning: Option<String>,
106    /// Object versions keyed by key, each value is a list of versions.
107    pub object_versions: HashMap<String, Vec<S3Object>>,
108    /// Bucket ACL (canned or XML).
109    pub acl: Option<String>,
110    pub encryption_config: Option<String>,
111    pub lifecycle_config: Option<String>,
112    pub policy: Option<String>,
113    pub cors_config: Option<String>,
114    pub notification_config: Option<String>,
115    pub logging_config: Option<String>,
116    pub website_config: Option<String>,
117    pub accelerate_status: Option<String>,
118    pub public_access_block: Option<String>,
119    pub object_lock_config: Option<String>,
120    pub replication_config: Option<String>,
121    pub ownership_controls: Option<String>,
122    pub inventory_configs: HashMap<String, String>,
123    /// Whether EventBridge notifications are enabled for this bucket.
124    pub eventbridge_enabled: bool,
125    /// Per-id analytics configurations (XML body).
126    pub analytics_configs: HashMap<String, String>,
127    /// Per-id intelligent-tiering configurations (XML body).
128    pub intelligent_tiering_configs: HashMap<String, String>,
129    /// Per-id metrics configurations (XML body).
130    pub metrics_configs: HashMap<String, String>,
131    /// Request payment configuration (XML body).
132    pub request_payment: Option<String>,
133    /// Per-bucket ABAC config (XML body) — see PutBucketAbac/GetBucketAbac.
134    pub abac_config: Option<String>,
135    /// Bucket-level metadata configuration (S3 metadata table v2).
136    pub metadata_configuration: Option<String>,
137    /// Bucket-level metadata table configuration (S3 metadata table v1).
138    pub metadata_table_configuration: Option<String>,
139}
140
141impl S3Bucket {
142    pub fn new(name: &str, region: &str, owner_id: &str) -> Self {
143        Self {
144            name: name.to_string(),
145            creation_date: Utc::now(),
146            region: region.to_string(),
147            objects: BTreeMap::new(),
148            tags: HashMap::new(),
149            acl_grants: vec![AclGrant {
150                grantee_type: "CanonicalUser".to_string(),
151                grantee_id: Some(owner_id.to_string()),
152                grantee_display_name: Some(owner_id.to_string()),
153                grantee_uri: None,
154                permission: "FULL_CONTROL".to_string(),
155            }],
156            acl_owner_id: owner_id.to_string(),
157            multipart_uploads: HashMap::new(),
158            versioning: None,
159            object_versions: HashMap::new(),
160            acl: None,
161            encryption_config: None,
162            lifecycle_config: None,
163            policy: None,
164            cors_config: None,
165            notification_config: None,
166            logging_config: None,
167            website_config: None,
168            accelerate_status: None,
169            public_access_block: None,
170            object_lock_config: None,
171            replication_config: None,
172            ownership_controls: None,
173            inventory_configs: HashMap::new(),
174            eventbridge_enabled: false,
175            analytics_configs: HashMap::new(),
176            intelligent_tiering_configs: HashMap::new(),
177            metrics_configs: HashMap::new(),
178            request_payment: None,
179            abac_config: None,
180            metadata_configuration: None,
181            metadata_table_configuration: None,
182        }
183    }
184}
185
186/// A recorded S3 notification event for introspection.
187#[derive(Debug, Clone)]
188pub struct S3NotificationEvent {
189    pub bucket: String,
190    pub key: String,
191    pub event_type: String,
192    pub timestamp: DateTime<Utc>,
193}
194
195pub struct S3State {
196    pub account_id: String,
197    pub region: String,
198    pub buckets: HashMap<String, S3Bucket>,
199    pub notification_events: Vec<S3NotificationEvent>,
200    pub body_cache: Option<Arc<BodyCache>>,
201}
202
203impl S3State {
204    pub fn new(account_id: &str, region: &str) -> Self {
205        Self {
206            account_id: account_id.to_string(),
207            region: region.to_string(),
208            buckets: HashMap::new(),
209            notification_events: Vec::new(),
210            body_cache: None,
211        }
212    }
213
214    pub fn set_body_cache(&mut self, cache: Arc<BodyCache>) {
215        self.body_cache = Some(cache);
216    }
217
218    pub fn reset(&mut self) {
219        self.buckets.clear();
220        self.notification_events.clear();
221    }
222
223    /// Read the full body referenced by a [`BodyRef`], consulting the
224    /// persistent [`BodyCache`] when one is configured.
225    pub fn read_body(&self, body: &BodyRef) -> io::Result<Bytes> {
226        match body {
227            BodyRef::Memory(b) => Ok(b.clone()),
228            BodyRef::Disk {
229                bucket,
230                key,
231                version,
232                path,
233                ..
234            } => {
235                let cache_key = BodyKey::new(bucket.clone(), key.clone(), version.clone());
236                if let Some(cache) = &self.body_cache {
237                    if let Some(hit) = cache.get(&cache_key) {
238                        return Ok(hit);
239                    }
240                }
241                let data = std::fs::read(path)?;
242                let bytes = Bytes::from(data);
243                if let Some(cache) = &self.body_cache {
244                    cache.insert(cache_key, bytes.clone());
245                }
246                Ok(bytes)
247            }
248        }
249    }
250
251    /// Read a byte range from the body without loading the full object into
252    /// memory. Memory bodies are sliced directly; disk bodies are seek+read'd.
253    /// Ranges bypass the body cache (the cache stores whole objects only).
254    pub fn read_body_range(&self, body: &BodyRef, offset: u64, len: u64) -> io::Result<Bytes> {
255        match body {
256            BodyRef::Memory(b) => {
257                let start = offset as usize;
258                let end = start.saturating_add(len as usize).min(b.len());
259                if start > b.len() {
260                    return Ok(Bytes::new());
261                }
262                Ok(b.slice(start..end))
263            }
264            BodyRef::Disk { path, .. } => {
265                let mut f = std::fs::File::open(path)?;
266                f.seek(SeekFrom::Start(offset))?;
267                let mut buf = vec![0u8; len as usize];
268                f.read_exact(&mut buf)?;
269                Ok(Bytes::from(buf))
270            }
271        }
272    }
273}
274
275impl fakecloud_core::multi_account::AccountState for S3State {
276    fn new_for_account(account_id: &str, region: &str, _endpoint: &str) -> Self {
277        Self::new(account_id, region)
278    }
279
280    fn inherit_from(&mut self, sibling: &Self) {
281        if let Some(cache) = &sibling.body_cache {
282            self.body_cache = Some(cache.clone());
283        }
284    }
285}
286
287pub type SharedS3State = Arc<RwLock<fakecloud_core::multi_account::MultiAccountState<S3State>>>;
288
289/// Construct a memory-backed [`BodyRef`] from [`Bytes`].
290pub fn memory_body(bytes: Bytes) -> BodyRef {
291    BodyRef::Memory(bytes)
292}
293
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;

    /// A throwaway state for body-reading tests; the account and region do not matter there.
    fn fresh_state() -> S3State {
        S3State::new("123", "us-east-1")
    }

    /// Create a temporary file containing `contents`.
    ///
    /// The file is deleted when the returned handle drops, so keep it alive for as long as a
    /// `BodyRef` points at its path.
    fn temp_file_with(contents: &[u8]) -> tempfile::NamedTempFile {
        let tmp = tempfile::NamedTempFile::new().unwrap();
        tmp.as_file().write_all(contents).unwrap();
        tmp
    }

    #[test]
    fn new_bucket_seeds_full_control_acl() {
        let bucket = S3Bucket::new("my-bucket", "us-east-1", "owner-id");
        assert_eq!(bucket.name, "my-bucket");
        assert_eq!(bucket.region, "us-east-1");
        assert_eq!(bucket.acl_owner_id, "owner-id");
        assert_eq!(bucket.acl_grants.len(), 1);
        let grant = &bucket.acl_grants[0];
        assert_eq!(grant.permission, "FULL_CONTROL");
        assert_eq!(grant.grantee_type, "CanonicalUser");
        assert!(!bucket.eventbridge_enabled);
        assert!(bucket.versioning.is_none());
    }

    #[test]
    fn s3state_new_and_reset_clears_buckets() {
        let mut state = S3State::new("123456789012", "us-east-1");
        assert!(state.buckets.is_empty());

        let bucket = S3Bucket::new("b", "us-east-1", "owner");
        state.buckets.insert("b".to_string(), bucket);
        let event = S3NotificationEvent {
            bucket: "b".to_string(),
            key: "k".to_string(),
            event_type: "s3:ObjectCreated:Put".to_string(),
            timestamp: Utc::now(),
        };
        state.notification_events.push(event);

        state.reset();
        assert!(state.buckets.is_empty());
        assert!(state.notification_events.is_empty());
    }

    #[test]
    fn read_body_from_memory_returns_bytes() {
        let body = memory_body(Bytes::from_static(b"hello"));
        let got = fresh_state().read_body(&body).unwrap();
        assert_eq!(got, &b"hello"[..]);
    }

    #[test]
    fn read_body_from_disk_reads_file() {
        let tmp = temp_file_with(b"file-body");
        let body = BodyRef::Disk {
            bucket: "b".to_string(),
            key: "k".to_string(),
            version: None,
            path: tmp.path().to_path_buf(),
            size: 9,
        };
        let got = fresh_state().read_body(&body).unwrap();
        assert_eq!(got, &b"file-body"[..]);
    }

    #[test]
    fn read_body_range_slices_memory() {
        let body = memory_body(Bytes::from_static(b"abcdefghij"));
        let got = fresh_state().read_body_range(&body, 2, 4).unwrap();
        assert_eq!(got, &b"cdef"[..]);
    }

    #[test]
    fn read_body_range_memory_beyond_length_returns_empty() {
        let body = memory_body(Bytes::from_static(b"abc"));
        let got = fresh_state().read_body_range(&body, 100, 4).unwrap();
        assert!(got.is_empty());
    }

    #[test]
    fn read_body_range_memory_clamps_to_length() {
        let body = memory_body(Bytes::from_static(b"abcdef"));
        let got = fresh_state().read_body_range(&body, 4, 100).unwrap();
        assert_eq!(got, &b"ef"[..]);
    }

    #[test]
    fn read_body_range_from_disk() {
        let tmp = temp_file_with(b"0123456789");
        let body = BodyRef::Disk {
            bucket: "b".to_string(),
            key: "k".to_string(),
            version: None,
            path: tmp.path().to_path_buf(),
            size: 10,
        };
        let got = fresh_state().read_body_range(&body, 3, 4).unwrap();
        assert_eq!(got, &b"3456"[..]);
    }

    #[test]
    fn account_state_impl_new_for_account() {
        use fakecloud_core::multi_account::AccountState;
        let state = S3State::new_for_account("111122223333", "eu-west-1", "http://x");
        assert_eq!(state.account_id, "111122223333");
        assert_eq!(state.region, "eu-west-1");
    }
}
391        let s = S3State::new_for_account("111122223333", "eu-west-1", "http://x");
392        assert_eq!(s.account_id, "111122223333");
393        assert_eq!(s.region, "eu-west-1");
394    }
395}