object_store_client/
lib.rs

1use bytes::Bytes;
2use reqwest::{Client, StatusCode};
3use serde::{Deserialize, Serialize};
4use std::collections::HashMap;
5use thiserror::Error;
6
7#[derive(Error, Debug)]
8pub enum Error {
9    #[error("HTTP error: {0}")]
10    Http(#[from] reqwest::Error),
11
12    #[error("Not found: {0}")]
13    NotFound(String),
14
15    #[error("Already exists: {0}")]
16    AlreadyExists(String),
17
18    #[error("Bad request: {0}")]
19    BadRequest(String),
20
21    #[error("Server error: {0}")]
22    ServerError(String),
23}
24
25pub type Result<T> = std::result::Result<T, Error>;
26
27#[derive(Debug, Clone, Serialize, Deserialize)]
28pub struct Bucket {
29    pub name: String,
30    pub created_at: String,
31}
32
33#[derive(Debug, Clone, Serialize, Deserialize)]
34pub struct ObjectMetadata {
35    pub key: String,
36    pub size: u64,
37    pub content_type: Option<String>,
38    pub etag: String,
39    pub last_modified: String,
40    pub metadata: HashMap<String, String>,
41}
42
43#[derive(Debug, Clone)]
44pub struct ObjectData {
45    pub metadata: ObjectMetadata,
46    pub data: Bytes,
47}
48
49#[derive(Debug, Clone, Serialize)]
50struct CreateBucketRequest {
51    name: String,
52}
53
54#[derive(Debug, Deserialize)]
55struct ListBucketsResponse {
56    buckets: Vec<Bucket>,
57}
58
59#[derive(Debug, Deserialize)]
60struct ListObjectsResponse {
61    objects: Vec<ObjectMetadata>,
62}
63
64#[derive(Debug, Clone, Serialize, Deserialize)]
65pub struct PublicUrlResponse {
66    pub url: String,
67    pub expires_in: u64,
68}
69
70pub struct ObjectStoreClient {
71    client: Client,
72    base_url: String,
73}
74
75impl ObjectStoreClient {
76    pub fn new(base_url: impl Into<String>) -> Self {
77        Self {
78            client: Client::new(),
79            base_url: base_url.into(),
80        }
81    }
82
83    pub fn with_client(base_url: impl Into<String>, client: Client) -> Self {
84        Self {
85            client,
86            base_url: base_url.into(),
87        }
88    }
89
90    pub async fn create_bucket(&self, name: &str) -> Result<Bucket> {
91        let url = format!("{}/buckets", self.base_url);
92        let req = CreateBucketRequest {
93            name: name.to_string(),
94        };
95
96        let response = self.client.post(&url).json(&req).send().await?;
97
98        match response.status() {
99            StatusCode::OK => Ok(response.json().await?),
100            StatusCode::CONFLICT => Err(Error::AlreadyExists(name.to_string())),
101            StatusCode::BAD_REQUEST => {
102                Err(Error::BadRequest(response.text().await.unwrap_or_default()))
103            }
104            _ => Err(Error::ServerError(
105                response.text().await.unwrap_or_default(),
106            )),
107        }
108    }
109
110    pub async fn list_buckets(&self) -> Result<Vec<Bucket>> {
111        let url = format!("{}/buckets", self.base_url);
112        let response = self.client.get(&url).send().await?;
113
114        match response.status() {
115            StatusCode::OK => {
116                let resp: ListBucketsResponse = response.json().await?;
117                Ok(resp.buckets)
118            }
119            _ => Err(Error::ServerError(
120                response.text().await.unwrap_or_default(),
121            )),
122        }
123    }
124
125    pub async fn delete_bucket(&self, name: &str) -> Result<()> {
126        let url = format!("{}/buckets/{}", self.base_url, name);
127        let response = self.client.delete(&url).send().await?;
128
129        match response.status() {
130            StatusCode::NO_CONTENT => Ok(()),
131            StatusCode::NOT_FOUND => Err(Error::NotFound(name.to_string())),
132            StatusCode::BAD_REQUEST => {
133                Err(Error::BadRequest(response.text().await.unwrap_or_default()))
134            }
135            _ => Err(Error::ServerError(
136                response.text().await.unwrap_or_default(),
137            )),
138        }
139    }
140
141    pub async fn put_object(
142        &self,
143        bucket: &str,
144        key: &str,
145        data: impl Into<Bytes>,
146        content_type: Option<&str>,
147        metadata: Option<HashMap<String, String>>,
148    ) -> Result<ObjectMetadata> {
149        let url = format!("{}/buckets/{}/objects/{}", self.base_url, bucket, key);
150        let mut request = self.client.put(&url);
151
152        if let Some(ct) = content_type {
153            request = request.header("content-type", ct);
154        }
155
156        if let Some(meta) = metadata {
157            for (k, v) in meta {
158                request = request.header(format!("x-object-meta-{}", k), v);
159            }
160        }
161
162        let response = request.body(data.into()).send().await?;
163
164        match response.status() {
165            StatusCode::OK => Ok(response.json().await?),
166            StatusCode::NOT_FOUND => Err(Error::NotFound(bucket.to_string())),
167            StatusCode::BAD_REQUEST => {
168                Err(Error::BadRequest(response.text().await.unwrap_or_default()))
169            }
170            _ => Err(Error::ServerError(
171                response.text().await.unwrap_or_default(),
172            )),
173        }
174    }
175
176    pub async fn get_object(&self, bucket: &str, key: &str) -> Result<ObjectData> {
177        let url = format!("{}/buckets/{}/objects/{}", self.base_url, bucket, key);
178        let response = self.client.get(&url).send().await?;
179
180        match response.status() {
181            StatusCode::OK => {
182                let etag = response
183                    .headers()
184                    .get("etag")
185                    .and_then(|v| v.to_str().ok())
186                    .unwrap_or("")
187                    .to_string();
188
189                let last_modified = response
190                    .headers()
191                    .get("last-modified")
192                    .and_then(|v| v.to_str().ok())
193                    .unwrap_or("")
194                    .to_string();
195
196                let content_type = response
197                    .headers()
198                    .get("content-type")
199                    .and_then(|v| v.to_str().ok())
200                    .map(|s| s.to_string());
201
202                let size = response
203                    .headers()
204                    .get("content-length")
205                    .and_then(|v| v.to_str().ok())
206                    .and_then(|s| s.parse().ok())
207                    .unwrap_or(0);
208
209                let data = response.bytes().await?;
210
211                Ok(ObjectData {
212                    metadata: ObjectMetadata {
213                        key: key.to_string(),
214                        size,
215                        content_type,
216                        etag,
217                        last_modified,
218                        metadata: HashMap::new(),
219                    },
220                    data,
221                })
222            }
223            StatusCode::NOT_FOUND => Err(Error::NotFound(format!("{}/{}", bucket, key))),
224            _ => Err(Error::ServerError(
225                response.text().await.unwrap_or_default(),
226            )),
227        }
228    }
229
230    pub async fn head_object(&self, bucket: &str, key: &str) -> Result<ObjectMetadata> {
231        let url = format!("{}/buckets/{}/objects/{}", self.base_url, bucket, key);
232        let response = self.client.head(&url).send().await?;
233
234        match response.status() {
235            StatusCode::OK => {
236                let etag = response
237                    .headers()
238                    .get("etag")
239                    .and_then(|v| v.to_str().ok())
240                    .unwrap_or("")
241                    .to_string();
242
243                let last_modified = response
244                    .headers()
245                    .get("last-modified")
246                    .and_then(|v| v.to_str().ok())
247                    .unwrap_or("")
248                    .to_string();
249
250                let content_type = response
251                    .headers()
252                    .get("content-type")
253                    .and_then(|v| v.to_str().ok())
254                    .map(|s| s.to_string());
255
256                let size = response
257                    .headers()
258                    .get("content-length")
259                    .and_then(|v| v.to_str().ok())
260                    .and_then(|s| s.parse().ok())
261                    .unwrap_or(0);
262
263                Ok(ObjectMetadata {
264                    key: key.to_string(),
265                    size,
266                    content_type,
267                    etag,
268                    last_modified,
269                    metadata: HashMap::new(),
270                })
271            }
272            StatusCode::NOT_FOUND => Err(Error::NotFound(format!("{}/{}", bucket, key))),
273            _ => Err(Error::ServerError(
274                response.text().await.unwrap_or_default(),
275            )),
276        }
277    }
278
279    pub async fn delete_object(&self, bucket: &str, key: &str) -> Result<()> {
280        let url = format!("{}/buckets/{}/objects/{}", self.base_url, bucket, key);
281        let response = self.client.delete(&url).send().await?;
282
283        match response.status() {
284            StatusCode::NO_CONTENT => Ok(()),
285            StatusCode::NOT_FOUND => Err(Error::NotFound(format!("{}/{}", bucket, key))),
286            _ => Err(Error::ServerError(
287                response.text().await.unwrap_or_default(),
288            )),
289        }
290    }
291
292    pub async fn list_objects(
293        &self,
294        bucket: &str,
295        prefix: Option<&str>,
296        max_keys: Option<usize>,
297    ) -> Result<Vec<ObjectMetadata>> {
298        let mut url = format!("{}/buckets/{}/objects", self.base_url, bucket);
299        let mut params = vec![];
300
301        if let Some(p) = prefix {
302            params.push(format!("prefix={}", p));
303        }
304        if let Some(m) = max_keys {
305            params.push(format!("max_keys={}", m));
306        }
307
308        if !params.is_empty() {
309            url.push('?');
310            url.push_str(&params.join("&"));
311        }
312
313        let response = self.client.get(&url).send().await?;
314
315        match response.status() {
316            StatusCode::OK => {
317                let resp: ListObjectsResponse = response.json().await?;
318                Ok(resp.objects)
319            }
320            StatusCode::NOT_FOUND => Err(Error::NotFound(bucket.to_string())),
321            _ => Err(Error::ServerError(
322                response.text().await.unwrap_or_default(),
323            )),
324        }
325    }
326
327    pub async fn get_public_url(
328        &self,
329        bucket: &str,
330        key: &str,
331        expiration_secs: Option<u64>,
332    ) -> Result<PublicUrlResponse> {
333        let mut url = format!("{}/buckets/{}/public-url/{}", self.base_url, bucket, key);
334
335        if let Some(exp) = expiration_secs {
336            url.push_str(&format!("?expiration_secs={}", exp));
337        }
338
339        let response = self.client.get(&url).send().await?;
340
341        match response.status() {
342            StatusCode::OK => Ok(response.json().await?),
343            StatusCode::NOT_FOUND => Err(Error::NotFound(format!("{}/{}", bucket, key))),
344            StatusCode::BAD_REQUEST => {
345                Err(Error::BadRequest(response.text().await.unwrap_or_default()))
346            }
347            _ => Err(Error::ServerError(
348                response.text().await.unwrap_or_default(),
349            )),
350        }
351    }
352}
353
354#[cfg(test)]
355mod tests {
356    use super::*;
357    use mockito::Server;
358
359    #[tokio::test]
360    async fn test_client_creation() {
361        let client = ObjectStoreClient::new("http://localhost:8080");
362        assert_eq!(client.base_url, "http://localhost:8080");
363    }
364
365    #[tokio::test]
366    async fn test_create_bucket() {
367        let mut server = Server::new_async().await;
368        let _m = server
369            .mock("POST", "/buckets")
370            .with_status(200)
371            .with_header("content-type", "application/json")
372            .with_body(r#"{"name":"test-bucket","created_at":"2024-01-01T00:00:00Z"}"#)
373            .create_async()
374            .await;
375
376        let client = ObjectStoreClient::new(&server.url());
377        let bucket = client.create_bucket("test-bucket").await.unwrap();
378
379        assert_eq!(bucket.name, "test-bucket");
380        assert_eq!(bucket.created_at, "2024-01-01T00:00:00Z");
381    }
382
383    #[tokio::test]
384    async fn test_create_bucket_conflict() {
385        let mut server = Server::new_async().await;
386        let _m = server
387            .mock("POST", "/buckets")
388            .with_status(409)
389            .with_body("Bucket already exists")
390            .create_async()
391            .await;
392
393        let client = ObjectStoreClient::new(&server.url());
394        let result = client.create_bucket("test-bucket").await;
395
396        assert!(result.is_err());
397        assert!(matches!(result.unwrap_err(), Error::AlreadyExists(_)));
398    }
399
400    #[tokio::test]
401    async fn test_list_buckets() {
402        let mut server = Server::new_async().await;
403        let _m = server
404            .mock("GET", "/buckets")
405            .with_status(200)
406            .with_header("content-type", "application/json")
407            .with_body(r#"{"buckets":[{"name":"bucket1","created_at":"2024-01-01T00:00:00Z"},{"name":"bucket2","created_at":"2024-01-02T00:00:00Z"}]}"#)
408            .create_async()
409            .await;
410
411        let client = ObjectStoreClient::new(&server.url());
412        let buckets = client.list_buckets().await.unwrap();
413
414        assert_eq!(buckets.len(), 2);
415        assert_eq!(buckets[0].name, "bucket1");
416        assert_eq!(buckets[1].name, "bucket2");
417    }
418
419    #[tokio::test]
420    async fn test_delete_bucket() {
421        let mut server = Server::new_async().await;
422        let _m = server
423            .mock("DELETE", "/buckets/test-bucket")
424            .with_status(204)
425            .create_async()
426            .await;
427
428        let client = ObjectStoreClient::new(&server.url());
429        let result = client.delete_bucket("test-bucket").await;
430
431        assert!(result.is_ok());
432    }
433
434    #[tokio::test]
435    async fn test_delete_bucket_not_found() {
436        let mut server = Server::new_async().await;
437        let _m = server
438            .mock("DELETE", "/buckets/test-bucket")
439            .with_status(404)
440            .with_body("Bucket not found")
441            .create_async()
442            .await;
443
444        let client = ObjectStoreClient::new(&server.url());
445        let result = client.delete_bucket("test-bucket").await;
446
447        assert!(result.is_err());
448        assert!(matches!(result.unwrap_err(), Error::NotFound(_)));
449    }
450
451    #[tokio::test]
452    async fn test_put_object() {
453        let mut server = Server::new_async().await;
454        let _m = server
455            .mock("PUT", "/buckets/test-bucket/objects/test-key")
456            .with_status(200)
457            .with_header("content-type", "application/json")
458            .with_body(r#"{"key":"test-key","size":13,"content_type":"text/plain","etag":"abc123","last_modified":"2024-01-01T00:00:00Z","metadata":{"key1":"value1"}}"#)
459            .create_async()
460            .await;
461
462        let client = ObjectStoreClient::new(&server.url());
463        let data = Bytes::from("Hello, World!");
464        let mut metadata = HashMap::new();
465        metadata.insert("key1".to_string(), "value1".to_string());
466
467        let obj = client
468            .put_object(
469                "test-bucket",
470                "test-key",
471                data,
472                Some("text/plain"),
473                Some(metadata),
474            )
475            .await
476            .unwrap();
477
478        assert_eq!(obj.key, "test-key");
479        assert_eq!(obj.size, 13);
480        assert_eq!(obj.etag, "abc123");
481    }
482
483    #[tokio::test]
484    async fn test_get_object() {
485        let mut server = Server::new_async().await;
486        let _m = server
487            .mock("GET", "/buckets/test-bucket/objects/test-key")
488            .with_status(200)
489            .with_header("content-type", "text/plain")
490            .with_header("content-length", "13")
491            .with_header("etag", "abc123")
492            .with_header("last-modified", "2024-01-01T00:00:00Z")
493            .with_body("Hello, World!")
494            .create_async()
495            .await;
496
497        let client = ObjectStoreClient::new(&server.url());
498        let obj = client.get_object("test-bucket", "test-key").await.unwrap();
499
500        assert_eq!(obj.metadata.key, "test-key");
501        assert_eq!(obj.metadata.size, 13);
502        assert_eq!(obj.metadata.etag, "abc123");
503        assert_eq!(obj.data, Bytes::from("Hello, World!"));
504    }
505
506    #[tokio::test]
507    async fn test_get_object_not_found() {
508        let mut server = Server::new_async().await;
509        let _m = server
510            .mock("GET", "/buckets/test-bucket/objects/test-key")
511            .with_status(404)
512            .with_body("Object not found")
513            .create_async()
514            .await;
515
516        let client = ObjectStoreClient::new(&server.url());
517        let result = client.get_object("test-bucket", "test-key").await;
518
519        assert!(result.is_err());
520        assert!(matches!(result.unwrap_err(), Error::NotFound(_)));
521    }
522
523    #[tokio::test]
524    async fn test_head_object() {
525        let mut server = Server::new_async().await;
526        let _m = server
527            .mock("HEAD", "/buckets/test-bucket/objects/test-key")
528            .with_status(200)
529            .with_header("content-type", "text/plain")
530            .with_header("content-length", "13")
531            .with_header("etag", "abc123")
532            .with_header("last-modified", "2024-01-01T00:00:00Z")
533            .create_async()
534            .await;
535
536        let client = ObjectStoreClient::new(&server.url());
537        let obj = client.head_object("test-bucket", "test-key").await.unwrap();
538
539        assert_eq!(obj.key, "test-key");
540        assert_eq!(obj.size, 13);
541        assert_eq!(obj.etag, "abc123");
542    }
543
544    #[tokio::test]
545    async fn test_delete_object() {
546        let mut server = Server::new_async().await;
547        let _m = server
548            .mock("DELETE", "/buckets/test-bucket/objects/test-key")
549            .with_status(204)
550            .create_async()
551            .await;
552
553        let client = ObjectStoreClient::new(&server.url());
554        let result = client.delete_object("test-bucket", "test-key").await;
555
556        assert!(result.is_ok());
557    }
558
559    #[tokio::test]
560    async fn test_list_objects() {
561        let mut server = Server::new_async().await;
562        let _m = server
563            .mock("GET", "/buckets/test-bucket/objects")
564            .match_query(mockito::Matcher::AllOf(vec![
565                mockito::Matcher::UrlEncoded("prefix".into(), "prefix/".into()),
566                mockito::Matcher::UrlEncoded("max_keys".into(), "10".into()),
567            ]))
568            .with_status(200)
569            .with_header("content-type", "application/json")
570            .with_body(r#"{"objects":[{"key":"prefix/obj1","size":100,"etag":"etag1","last_modified":"2024-01-01T00:00:00Z","metadata":{}},{"key":"prefix/obj2","size":200,"etag":"etag2","last_modified":"2024-01-02T00:00:00Z","metadata":{}}]}"#)
571            .create_async()
572            .await;
573
574        let client = ObjectStoreClient::new(&server.url());
575        let objects = client
576            .list_objects("test-bucket", Some("prefix/"), Some(10))
577            .await
578            .unwrap();
579
580        assert_eq!(objects.len(), 2);
581        assert_eq!(objects[0].key, "prefix/obj1");
582        assert_eq!(objects[1].key, "prefix/obj2");
583    }
584
585    #[tokio::test]
586    async fn test_list_objects_no_params() {
587        let mut server = Server::new_async().await;
588        let _m = server
589            .mock("GET", "/buckets/test-bucket/objects")
590            .with_status(200)
591            .with_header("content-type", "application/json")
592            .with_body(r#"{"objects":[]}"#)
593            .create_async()
594            .await;
595
596        let client = ObjectStoreClient::new(&server.url());
597        let objects = client
598            .list_objects("test-bucket", None, None)
599            .await
600            .unwrap();
601
602        assert_eq!(objects.len(), 0);
603    }
604
605    #[tokio::test]
606    async fn test_get_public_url() {
607        let mut server = Server::new_async().await;
608        let _m = server
609            .mock("GET", "/buckets/test-bucket/public-url/test-key")
610            .match_query(mockito::Matcher::UrlEncoded(
611                "expiration_secs".into(),
612                "7200".into(),
613            ))
614            .with_status(200)
615            .with_header("content-type", "application/json")
616            .with_body(
617                r#"{"url":"https://example.com/signed-url?signature=abc123","expires_in":7200}"#,
618            )
619            .create_async()
620            .await;
621
622        let client = ObjectStoreClient::new(&server.url());
623        let response = client
624            .get_public_url("test-bucket", "test-key", Some(7200))
625            .await
626            .unwrap();
627
628        assert_eq!(
629            response.url,
630            "https://example.com/signed-url?signature=abc123"
631        );
632        assert_eq!(response.expires_in, 7200);
633    }
634
635    #[tokio::test]
636    async fn test_get_public_url_default_expiration() {
637        let mut server = Server::new_async().await;
638        let _m = server
639            .mock("GET", "/buckets/test-bucket/public-url/test-key")
640            .with_status(200)
641            .with_header("content-type", "application/json")
642            .with_body(
643                r#"{"url":"https://example.com/signed-url?signature=xyz789","expires_in":3600}"#,
644            )
645            .create_async()
646            .await;
647
648        let client = ObjectStoreClient::new(&server.url());
649        let response = client
650            .get_public_url("test-bucket", "test-key", None)
651            .await
652            .unwrap();
653
654        assert_eq!(
655            response.url,
656            "https://example.com/signed-url?signature=xyz789"
657        );
658        assert_eq!(response.expires_in, 3600);
659    }
660
661    #[tokio::test]
662    async fn test_get_public_url_not_found() {
663        let mut server = Server::new_async().await;
664        let _m = server
665            .mock("GET", "/buckets/test-bucket/public-url/test-key")
666            .with_status(404)
667            .with_body("Object not found")
668            .create_async()
669            .await;
670
671        let client = ObjectStoreClient::new(&server.url());
672        let result = client.get_public_url("test-bucket", "test-key", None).await;
673
674        assert!(result.is_err());
675        assert!(matches!(result.unwrap_err(), Error::NotFound(_)));
676    }
677}