object_store_client/
lib.rs

1use bytes::Bytes;
2use reqwest::{Client, StatusCode};
3use serde::{Deserialize, Serialize};
4use std::collections::HashMap;
5use thiserror::Error;
6
7#[derive(Error, Debug)]
8pub enum Error {
9    #[error("HTTP error: {0}")]
10    Http(#[from] reqwest::Error),
11
12    #[error("Not found: {0}")]
13    NotFound(String),
14
15    #[error("Already exists: {0}")]
16    AlreadyExists(String),
17
18    #[error("Bad request: {0}")]
19    BadRequest(String),
20
21    #[error("Server error: {0}")]
22    ServerError(String),
23}
24
25pub type Result<T> = std::result::Result<T, Error>;
26
27#[derive(Debug, Clone, Serialize, Deserialize)]
28pub struct Bucket {
29    pub name: String,
30    pub created_at: String,
31}
32
33#[derive(Debug, Clone, Serialize, Deserialize)]
34pub struct ObjectMetadata {
35    pub key: String,
36    pub size: u64,
37    pub content_type: Option<String>,
38    pub etag: String,
39    pub last_modified: String,
40    pub metadata: HashMap<String, String>,
41}
42
43#[derive(Debug, Clone)]
44pub struct ObjectData {
45    pub metadata: ObjectMetadata,
46    pub data: Bytes,
47}
48
49#[derive(Debug, Clone, Serialize)]
50struct CreateBucketRequest {
51    name: String,
52}
53
54#[derive(Debug, Deserialize)]
55struct ListBucketsResponse {
56    buckets: Vec<Bucket>,
57}
58
59#[derive(Debug, Deserialize)]
60struct ListObjectsResponse {
61    objects: Vec<ObjectMetadata>,
62}
63
64pub struct ObjectStoreClient {
65    client: Client,
66    base_url: String,
67}
68
69impl ObjectStoreClient {
70    pub fn new(base_url: impl Into<String>) -> Self {
71        Self {
72            client: Client::new(),
73            base_url: base_url.into(),
74        }
75    }
76
77    pub fn with_client(base_url: impl Into<String>, client: Client) -> Self {
78        Self {
79            client,
80            base_url: base_url.into(),
81        }
82    }
83
84    pub async fn create_bucket(&self, name: &str) -> Result<Bucket> {
85        let url = format!("{}/buckets", self.base_url);
86        let req = CreateBucketRequest {
87            name: name.to_string(),
88        };
89
90        let response = self.client.post(&url).json(&req).send().await?;
91
92        match response.status() {
93            StatusCode::OK => Ok(response.json().await?),
94            StatusCode::CONFLICT => Err(Error::AlreadyExists(name.to_string())),
95            StatusCode::BAD_REQUEST => {
96                Err(Error::BadRequest(response.text().await.unwrap_or_default()))
97            }
98            _ => Err(Error::ServerError(response.text().await.unwrap_or_default())),
99        }
100    }
101
102    pub async fn list_buckets(&self) -> Result<Vec<Bucket>> {
103        let url = format!("{}/buckets", self.base_url);
104        let response = self.client.get(&url).send().await?;
105
106        match response.status() {
107            StatusCode::OK => {
108                let resp: ListBucketsResponse = response.json().await?;
109                Ok(resp.buckets)
110            }
111            _ => Err(Error::ServerError(response.text().await.unwrap_or_default())),
112        }
113    }
114
115    pub async fn delete_bucket(&self, name: &str) -> Result<()> {
116        let url = format!("{}/buckets/{}", self.base_url, name);
117        let response = self.client.delete(&url).send().await?;
118
119        match response.status() {
120            StatusCode::NO_CONTENT => Ok(()),
121            StatusCode::NOT_FOUND => Err(Error::NotFound(name.to_string())),
122            StatusCode::BAD_REQUEST => {
123                Err(Error::BadRequest(response.text().await.unwrap_or_default()))
124            }
125            _ => Err(Error::ServerError(response.text().await.unwrap_or_default())),
126        }
127    }
128
129    pub async fn put_object(
130        &self,
131        bucket: &str,
132        key: &str,
133        data: impl Into<Bytes>,
134        content_type: Option<&str>,
135        metadata: Option<HashMap<String, String>>,
136    ) -> Result<ObjectMetadata> {
137        let url = format!("{}/buckets/{}/objects/{}", self.base_url, bucket, key);
138        let mut request = self.client.put(&url);
139
140        if let Some(ct) = content_type {
141            request = request.header("content-type", ct);
142        }
143
144        if let Some(meta) = metadata {
145            for (k, v) in meta {
146                request = request.header(format!("x-object-meta-{}", k), v);
147            }
148        }
149
150        let response = request.body(data.into()).send().await?;
151
152        match response.status() {
153            StatusCode::OK => Ok(response.json().await?),
154            StatusCode::NOT_FOUND => Err(Error::NotFound(bucket.to_string())),
155            StatusCode::BAD_REQUEST => {
156                Err(Error::BadRequest(response.text().await.unwrap_or_default()))
157            }
158            _ => Err(Error::ServerError(response.text().await.unwrap_or_default())),
159        }
160    }
161
162    pub async fn get_object(&self, bucket: &str, key: &str) -> Result<ObjectData> {
163        let url = format!("{}/buckets/{}/objects/{}", self.base_url, bucket, key);
164        let response = self.client.get(&url).send().await?;
165
166        match response.status() {
167            StatusCode::OK => {
168                let etag = response
169                    .headers()
170                    .get("etag")
171                    .and_then(|v| v.to_str().ok())
172                    .unwrap_or("")
173                    .to_string();
174
175                let last_modified = response
176                    .headers()
177                    .get("last-modified")
178                    .and_then(|v| v.to_str().ok())
179                    .unwrap_or("")
180                    .to_string();
181
182                let content_type = response
183                    .headers()
184                    .get("content-type")
185                    .and_then(|v| v.to_str().ok())
186                    .map(|s| s.to_string());
187
188                let size = response
189                    .headers()
190                    .get("content-length")
191                    .and_then(|v| v.to_str().ok())
192                    .and_then(|s| s.parse().ok())
193                    .unwrap_or(0);
194
195                let data = response.bytes().await?;
196
197                Ok(ObjectData {
198                    metadata: ObjectMetadata {
199                        key: key.to_string(),
200                        size,
201                        content_type,
202                        etag,
203                        last_modified,
204                        metadata: HashMap::new(),
205                    },
206                    data,
207                })
208            }
209            StatusCode::NOT_FOUND => Err(Error::NotFound(format!("{}/{}", bucket, key))),
210            _ => Err(Error::ServerError(response.text().await.unwrap_or_default())),
211        }
212    }
213
214    pub async fn head_object(&self, bucket: &str, key: &str) -> Result<ObjectMetadata> {
215        let url = format!("{}/buckets/{}/objects/{}", self.base_url, bucket, key);
216        let response = self.client.head(&url).send().await?;
217
218        match response.status() {
219            StatusCode::OK => {
220                let etag = response
221                    .headers()
222                    .get("etag")
223                    .and_then(|v| v.to_str().ok())
224                    .unwrap_or("")
225                    .to_string();
226
227                let last_modified = response
228                    .headers()
229                    .get("last-modified")
230                    .and_then(|v| v.to_str().ok())
231                    .unwrap_or("")
232                    .to_string();
233
234                let content_type = response
235                    .headers()
236                    .get("content-type")
237                    .and_then(|v| v.to_str().ok())
238                    .map(|s| s.to_string());
239
240                let size = response
241                    .headers()
242                    .get("content-length")
243                    .and_then(|v| v.to_str().ok())
244                    .and_then(|s| s.parse().ok())
245                    .unwrap_or(0);
246
247                Ok(ObjectMetadata {
248                    key: key.to_string(),
249                    size,
250                    content_type,
251                    etag,
252                    last_modified,
253                    metadata: HashMap::new(),
254                })
255            }
256            StatusCode::NOT_FOUND => Err(Error::NotFound(format!("{}/{}", bucket, key))),
257            _ => Err(Error::ServerError(response.text().await.unwrap_or_default())),
258        }
259    }
260
261    pub async fn delete_object(&self, bucket: &str, key: &str) -> Result<()> {
262        let url = format!("{}/buckets/{}/objects/{}", self.base_url, bucket, key);
263        let response = self.client.delete(&url).send().await?;
264
265        match response.status() {
266            StatusCode::NO_CONTENT => Ok(()),
267            StatusCode::NOT_FOUND => Err(Error::NotFound(format!("{}/{}", bucket, key))),
268            _ => Err(Error::ServerError(response.text().await.unwrap_or_default())),
269        }
270    }
271
272    pub async fn list_objects(
273        &self,
274        bucket: &str,
275        prefix: Option<&str>,
276        max_keys: Option<usize>,
277    ) -> Result<Vec<ObjectMetadata>> {
278        let mut url = format!("{}/buckets/{}/objects", self.base_url, bucket);
279        let mut params = vec![];
280
281        if let Some(p) = prefix {
282            params.push(format!("prefix={}", p));
283        }
284        if let Some(m) = max_keys {
285            params.push(format!("max_keys={}", m));
286        }
287
288        if !params.is_empty() {
289            url.push('?');
290            url.push_str(&params.join("&"));
291        }
292
293        let response = self.client.get(&url).send().await?;
294
295        match response.status() {
296            StatusCode::OK => {
297                let resp: ListObjectsResponse = response.json().await?;
298                Ok(resp.objects)
299            }
300            StatusCode::NOT_FOUND => Err(Error::NotFound(bucket.to_string())),
301            _ => Err(Error::ServerError(response.text().await.unwrap_or_default())),
302        }
303    }
304}
305
306#[cfg(test)]
307mod tests {
308    use super::*;
309    use mockito::Server;
310
311    #[tokio::test]
312    async fn test_client_creation() {
313        let client = ObjectStoreClient::new("http://localhost:8080");
314        assert_eq!(client.base_url, "http://localhost:8080");
315    }
316
317    #[tokio::test]
318    async fn test_create_bucket() {
319        let mut server = Server::new_async().await;
320        let _m = server
321            .mock("POST", "/buckets")
322            .with_status(200)
323            .with_header("content-type", "application/json")
324            .with_body(r#"{"name":"test-bucket","created_at":"2024-01-01T00:00:00Z"}"#)
325            .create_async()
326            .await;
327
328        let client = ObjectStoreClient::new(&server.url());
329        let bucket = client.create_bucket("test-bucket").await.unwrap();
330
331        assert_eq!(bucket.name, "test-bucket");
332        assert_eq!(bucket.created_at, "2024-01-01T00:00:00Z");
333    }
334
335    #[tokio::test]
336    async fn test_create_bucket_conflict() {
337        let mut server = Server::new_async().await;
338        let _m = server
339            .mock("POST", "/buckets")
340            .with_status(409)
341            .with_body("Bucket already exists")
342            .create_async()
343            .await;
344
345        let client = ObjectStoreClient::new(&server.url());
346        let result = client.create_bucket("test-bucket").await;
347
348        assert!(result.is_err());
349        assert!(matches!(result.unwrap_err(), Error::AlreadyExists(_)));
350    }
351
352    #[tokio::test]
353    async fn test_list_buckets() {
354        let mut server = Server::new_async().await;
355        let _m = server
356            .mock("GET", "/buckets")
357            .with_status(200)
358            .with_header("content-type", "application/json")
359            .with_body(r#"{"buckets":[{"name":"bucket1","created_at":"2024-01-01T00:00:00Z"},{"name":"bucket2","created_at":"2024-01-02T00:00:00Z"}]}"#)
360            .create_async()
361            .await;
362
363        let client = ObjectStoreClient::new(&server.url());
364        let buckets = client.list_buckets().await.unwrap();
365
366        assert_eq!(buckets.len(), 2);
367        assert_eq!(buckets[0].name, "bucket1");
368        assert_eq!(buckets[1].name, "bucket2");
369    }
370
371    #[tokio::test]
372    async fn test_delete_bucket() {
373        let mut server = Server::new_async().await;
374        let _m = server
375            .mock("DELETE", "/buckets/test-bucket")
376            .with_status(204)
377            .create_async()
378            .await;
379
380        let client = ObjectStoreClient::new(&server.url());
381        let result = client.delete_bucket("test-bucket").await;
382
383        assert!(result.is_ok());
384    }
385
386    #[tokio::test]
387    async fn test_delete_bucket_not_found() {
388        let mut server = Server::new_async().await;
389        let _m = server
390            .mock("DELETE", "/buckets/test-bucket")
391            .with_status(404)
392            .with_body("Bucket not found")
393            .create_async()
394            .await;
395
396        let client = ObjectStoreClient::new(&server.url());
397        let result = client.delete_bucket("test-bucket").await;
398
399        assert!(result.is_err());
400        assert!(matches!(result.unwrap_err(), Error::NotFound(_)));
401    }
402
403    #[tokio::test]
404    async fn test_put_object() {
405        let mut server = Server::new_async().await;
406        let _m = server
407            .mock("PUT", "/buckets/test-bucket/objects/test-key")
408            .with_status(200)
409            .with_header("content-type", "application/json")
410            .with_body(r#"{"key":"test-key","size":13,"content_type":"text/plain","etag":"abc123","last_modified":"2024-01-01T00:00:00Z","metadata":{"key1":"value1"}}"#)
411            .create_async()
412            .await;
413
414        let client = ObjectStoreClient::new(&server.url());
415        let data = Bytes::from("Hello, World!");
416        let mut metadata = HashMap::new();
417        metadata.insert("key1".to_string(), "value1".to_string());
418
419        let obj = client
420            .put_object(
421                "test-bucket",
422                "test-key",
423                data,
424                Some("text/plain"),
425                Some(metadata),
426            )
427            .await
428            .unwrap();
429
430        assert_eq!(obj.key, "test-key");
431        assert_eq!(obj.size, 13);
432        assert_eq!(obj.etag, "abc123");
433    }
434
435    #[tokio::test]
436    async fn test_get_object() {
437        let mut server = Server::new_async().await;
438        let _m = server
439            .mock("GET", "/buckets/test-bucket/objects/test-key")
440            .with_status(200)
441            .with_header("content-type", "text/plain")
442            .with_header("content-length", "13")
443            .with_header("etag", "abc123")
444            .with_header("last-modified", "2024-01-01T00:00:00Z")
445            .with_body("Hello, World!")
446            .create_async()
447            .await;
448
449        let client = ObjectStoreClient::new(&server.url());
450        let obj = client.get_object("test-bucket", "test-key").await.unwrap();
451
452        assert_eq!(obj.metadata.key, "test-key");
453        assert_eq!(obj.metadata.size, 13);
454        assert_eq!(obj.metadata.etag, "abc123");
455        assert_eq!(obj.data, Bytes::from("Hello, World!"));
456    }
457
458    #[tokio::test]
459    async fn test_get_object_not_found() {
460        let mut server = Server::new_async().await;
461        let _m = server
462            .mock("GET", "/buckets/test-bucket/objects/test-key")
463            .with_status(404)
464            .with_body("Object not found")
465            .create_async()
466            .await;
467
468        let client = ObjectStoreClient::new(&server.url());
469        let result = client.get_object("test-bucket", "test-key").await;
470
471        assert!(result.is_err());
472        assert!(matches!(result.unwrap_err(), Error::NotFound(_)));
473    }
474
475    #[tokio::test]
476    async fn test_head_object() {
477        let mut server = Server::new_async().await;
478        let _m = server
479            .mock("HEAD", "/buckets/test-bucket/objects/test-key")
480            .with_status(200)
481            .with_header("content-type", "text/plain")
482            .with_header("content-length", "13")
483            .with_header("etag", "abc123")
484            .with_header("last-modified", "2024-01-01T00:00:00Z")
485            .create_async()
486            .await;
487
488        let client = ObjectStoreClient::new(&server.url());
489        let obj = client.head_object("test-bucket", "test-key").await.unwrap();
490
491        assert_eq!(obj.key, "test-key");
492        assert_eq!(obj.size, 13);
493        assert_eq!(obj.etag, "abc123");
494    }
495
496    #[tokio::test]
497    async fn test_delete_object() {
498        let mut server = Server::new_async().await;
499        let _m = server
500            .mock("DELETE", "/buckets/test-bucket/objects/test-key")
501            .with_status(204)
502            .create_async()
503            .await;
504
505        let client = ObjectStoreClient::new(&server.url());
506        let result = client.delete_object("test-bucket", "test-key").await;
507
508        assert!(result.is_ok());
509    }
510
511    #[tokio::test]
512    async fn test_list_objects() {
513        let mut server = Server::new_async().await;
514        let _m = server
515            .mock("GET", "/buckets/test-bucket/objects")
516            .match_query(mockito::Matcher::AllOf(vec![
517                mockito::Matcher::UrlEncoded("prefix".into(), "prefix/".into()),
518                mockito::Matcher::UrlEncoded("max_keys".into(), "10".into()),
519            ]))
520            .with_status(200)
521            .with_header("content-type", "application/json")
522            .with_body(r#"{"objects":[{"key":"prefix/obj1","size":100,"etag":"etag1","last_modified":"2024-01-01T00:00:00Z","metadata":{}},{"key":"prefix/obj2","size":200,"etag":"etag2","last_modified":"2024-01-02T00:00:00Z","metadata":{}}]}"#)
523            .create_async()
524            .await;
525
526        let client = ObjectStoreClient::new(&server.url());
527        let objects = client
528            .list_objects("test-bucket", Some("prefix/"), Some(10))
529            .await
530            .unwrap();
531
532        assert_eq!(objects.len(), 2);
533        assert_eq!(objects[0].key, "prefix/obj1");
534        assert_eq!(objects[1].key, "prefix/obj2");
535    }
536
537    #[tokio::test]
538    async fn test_list_objects_no_params() {
539        let mut server = Server::new_async().await;
540        let _m = server
541            .mock("GET", "/buckets/test-bucket/objects")
542            .with_status(200)
543            .with_header("content-type", "application/json")
544            .with_body(r#"{"objects":[]}"#)
545            .create_async()
546            .await;
547
548        let client = ObjectStoreClient::new(&server.url());
549        let objects = client.list_objects("test-bucket", None, None).await.unwrap();
550
551        assert_eq!(objects.len(), 0);
552    }
553}