Skip to main content

aegis_object/
engine.rs

1//! The object / blob engine: S3-style buckets of binary objects with
2//! content-addressed ETags, prefix listing, and snapshot persistence.
3
4use crate::types::{etag_of, valid_bucket_name, ObjectError, ObjectMeta, DEFAULT_CONTENT_TYPE};
5use parking_lot::RwLock;
6use serde::{Deserialize, Serialize};
7use std::collections::BTreeMap;
8use std::collections::HashMap;
9
10#[derive(Debug, Clone, Serialize)]
11pub struct BucketStats {
12    pub name: String,
13    pub objects: usize,
14    pub bytes: usize,
15}
16
17/// A stored object: bytes plus its metadata.
18#[derive(Clone, Serialize, Deserialize)]
19struct StoredObject {
20    data: Vec<u8>,
21    content_type: String,
22    etag: String,
23    metadata: serde_json::Value,
24}
25
26impl StoredObject {
27    fn meta(&self, key: &str) -> ObjectMeta {
28        ObjectMeta {
29            key: key.to_string(),
30            size: self.data.len(),
31            content_type: self.content_type.clone(),
32            etag: self.etag.clone(),
33            metadata: self.metadata.clone(),
34        }
35    }
36}
37
38/// A bucket keeps its objects sorted by key so prefix listing is a range scan.
39#[derive(Default, Clone, Serialize, Deserialize)]
40struct Bucket {
41    objects: BTreeMap<String, StoredObject>,
42}
43
44/// Multi-bucket object store.
45pub struct ObjectEngine {
46    buckets: RwLock<HashMap<String, Bucket>>,
47}
48
49impl Default for ObjectEngine {
50    fn default() -> Self {
51        Self::new()
52    }
53}
54
55impl ObjectEngine {
56    pub fn new() -> Self {
57        Self {
58            buckets: RwLock::new(HashMap::new()),
59        }
60    }
61
62    pub fn create_bucket(&self, name: impl Into<String>) -> Result<(), ObjectError> {
63        let name = name.into();
64        if !valid_bucket_name(&name) {
65            return Err(ObjectError::InvalidBucketName(name));
66        }
67        let mut buckets = self.buckets.write();
68        if buckets.contains_key(&name) {
69            return Err(ObjectError::BucketExists(name));
70        }
71        buckets.insert(name, Bucket::default());
72        Ok(())
73    }
74
75    pub fn drop_bucket(&self, name: &str) -> Result<(), ObjectError> {
76        self.buckets
77            .write()
78            .remove(name)
79            .map(|_| ())
80            .ok_or_else(|| ObjectError::BucketNotFound(name.to_string()))
81    }
82
83    pub fn list_buckets(&self) -> Vec<String> {
84        let mut v: Vec<String> = self.buckets.read().keys().cloned().collect();
85        v.sort();
86        v
87    }
88
89    pub fn bucket_stats(&self, name: &str) -> Option<BucketStats> {
90        let buckets = self.buckets.read();
91        let b = buckets.get(name)?;
92        Some(BucketStats {
93            name: name.to_string(),
94            objects: b.objects.len(),
95            bytes: b.objects.values().map(|o| o.data.len()).sum(),
96        })
97    }
98
99    /// Store (or replace) an object. Returns its metadata, including the ETag.
100    pub fn put(
101        &self,
102        bucket: &str,
103        key: impl Into<String>,
104        data: Vec<u8>,
105        content_type: Option<String>,
106        metadata: serde_json::Value,
107    ) -> Result<ObjectMeta, ObjectError> {
108        let key = key.into();
109        let mut buckets = self.buckets.write();
110        // Auto-create the bucket on first write (its name must still be valid).
111        if !buckets.contains_key(bucket) {
112            if !valid_bucket_name(bucket) {
113                return Err(ObjectError::InvalidBucketName(bucket.to_string()));
114            }
115            buckets.insert(bucket.to_string(), Bucket::default());
116        }
117        let b = buckets
118            .get_mut(bucket)
119            .expect("bucket present after auto-create");
120        let obj = StoredObject {
121            etag: etag_of(&data),
122            content_type: content_type.unwrap_or_else(|| DEFAULT_CONTENT_TYPE.to_string()),
123            metadata,
124            data,
125        };
126        let meta = obj.meta(&key);
127        b.objects.insert(key, obj);
128        Ok(meta)
129    }
130
131    /// Fetch an object's bytes + metadata, or `None` if absent.
132    pub fn get(
133        &self,
134        bucket: &str,
135        key: &str,
136    ) -> Result<Option<(Vec<u8>, ObjectMeta)>, ObjectError> {
137        let buckets = self.buckets.read();
138        let b = buckets
139            .get(bucket)
140            .ok_or_else(|| ObjectError::BucketNotFound(bucket.to_string()))?;
141        Ok(b.objects.get(key).map(|o| (o.data.clone(), o.meta(key))))
142    }
143
144    /// Fetch only an object's metadata (HEAD), or `None` if absent.
145    pub fn head(&self, bucket: &str, key: &str) -> Result<Option<ObjectMeta>, ObjectError> {
146        let buckets = self.buckets.read();
147        let b = buckets
148            .get(bucket)
149            .ok_or_else(|| ObjectError::BucketNotFound(bucket.to_string()))?;
150        Ok(b.objects.get(key).map(|o| o.meta(key)))
151    }
152
153    /// Delete an object; returns whether it existed.
154    pub fn delete(&self, bucket: &str, key: &str) -> Result<bool, ObjectError> {
155        let mut buckets = self.buckets.write();
156        let b = buckets
157            .get_mut(bucket)
158            .ok_or_else(|| ObjectError::BucketNotFound(bucket.to_string()))?;
159        Ok(b.objects.remove(key).is_some())
160    }
161
162    /// List object metadata in a bucket, optionally filtered by key prefix,
163    /// in lexical key order, capped at `limit`.
164    pub fn list(
165        &self,
166        bucket: &str,
167        prefix: &str,
168        limit: Option<usize>,
169    ) -> Result<Vec<ObjectMeta>, ObjectError> {
170        let buckets = self.buckets.read();
171        let b = buckets
172            .get(bucket)
173            .ok_or_else(|| ObjectError::BucketNotFound(bucket.to_string()))?;
174        let cap = limit.unwrap_or(usize::MAX);
175        // BTreeMap range from the prefix; stop as soon as a key stops matching.
176        let out = b
177            .objects
178            .range(prefix.to_string()..)
179            .take_while(|(k, _)| k.starts_with(prefix))
180            .take(cap)
181            .map(|(k, o)| o.meta(k))
182            .collect();
183        Ok(out)
184    }
185
186    // ---- Persistence --------------------------------------------------------
187
188    pub fn snapshot(&self) -> EngineSnapshot {
189        EngineSnapshot {
190            buckets: self.buckets.read().clone(),
191        }
192    }
193
194    pub fn load_snapshot(&self, snap: EngineSnapshot) {
195        *self.buckets.write() = snap.buckets;
196    }
197}
198
199#[derive(Serialize, Deserialize)]
200pub struct EngineSnapshot {
201    buckets: HashMap<String, Bucket>,
202}