1use bytes::Bytes;
2use reqwest::{Client, StatusCode};
3use serde::{Deserialize, Serialize};
4use std::collections::HashMap;
5use thiserror::Error;
6
7#[derive(Error, Debug)]
8pub enum Error {
9 #[error("HTTP error: {0}")]
10 Http(#[from] reqwest::Error),
11
12 #[error("Not found: {0}")]
13 NotFound(String),
14
15 #[error("Already exists: {0}")]
16 AlreadyExists(String),
17
18 #[error("Bad request: {0}")]
19 BadRequest(String),
20
21 #[error("Server error: {0}")]
22 ServerError(String),
23}
24
25pub type Result<T> = std::result::Result<T, Error>;
26
27#[derive(Debug, Clone, Serialize, Deserialize)]
28pub struct Bucket {
29 pub name: String,
30 pub created_at: String,
31}
32
33#[derive(Debug, Clone, Serialize, Deserialize)]
34pub struct ObjectMetadata {
35 pub key: String,
36 pub size: u64,
37 pub content_type: Option<String>,
38 pub etag: String,
39 pub last_modified: String,
40 pub metadata: HashMap<String, String>,
41}
42
43#[derive(Debug, Clone)]
44pub struct ObjectData {
45 pub metadata: ObjectMetadata,
46 pub data: Bytes,
47}
48
49#[derive(Debug, Clone, Serialize)]
50struct CreateBucketRequest {
51 name: String,
52}
53
54#[derive(Debug, Deserialize)]
55struct ListBucketsResponse {
56 buckets: Vec<Bucket>,
57}
58
59#[derive(Debug, Deserialize)]
60struct ListObjectsResponse {
61 objects: Vec<ObjectMetadata>,
62}
63
64#[derive(Debug, Clone, Serialize, Deserialize)]
65pub struct PublicUrlResponse {
66 pub url: String,
67 pub expires_in: u64,
68}
69
70pub struct ObjectStoreClient {
71 client: Client,
72 base_url: String,
73}
74
75impl ObjectStoreClient {
76 pub fn new(base_url: impl Into<String>) -> Self {
77 Self {
78 client: Client::new(),
79 base_url: base_url.into(),
80 }
81 }
82
83 pub fn with_client(base_url: impl Into<String>, client: Client) -> Self {
84 Self {
85 client,
86 base_url: base_url.into(),
87 }
88 }
89
90 pub async fn create_bucket(&self, name: &str) -> Result<Bucket> {
91 let url = format!("{}/buckets", self.base_url);
92 let req = CreateBucketRequest {
93 name: name.to_string(),
94 };
95
96 let response = self.client.post(&url).json(&req).send().await?;
97
98 match response.status() {
99 StatusCode::OK => Ok(response.json().await?),
100 StatusCode::CONFLICT => Err(Error::AlreadyExists(name.to_string())),
101 StatusCode::BAD_REQUEST => {
102 Err(Error::BadRequest(response.text().await.unwrap_or_default()))
103 }
104 _ => Err(Error::ServerError(
105 response.text().await.unwrap_or_default(),
106 )),
107 }
108 }
109
110 pub async fn list_buckets(&self) -> Result<Vec<Bucket>> {
111 let url = format!("{}/buckets", self.base_url);
112 let response = self.client.get(&url).send().await?;
113
114 match response.status() {
115 StatusCode::OK => {
116 let resp: ListBucketsResponse = response.json().await?;
117 Ok(resp.buckets)
118 }
119 _ => Err(Error::ServerError(
120 response.text().await.unwrap_or_default(),
121 )),
122 }
123 }
124
125 pub async fn delete_bucket(&self, name: &str) -> Result<()> {
126 let url = format!("{}/buckets/{}", self.base_url, name);
127 let response = self.client.delete(&url).send().await?;
128
129 match response.status() {
130 StatusCode::NO_CONTENT => Ok(()),
131 StatusCode::NOT_FOUND => Err(Error::NotFound(name.to_string())),
132 StatusCode::BAD_REQUEST => {
133 Err(Error::BadRequest(response.text().await.unwrap_or_default()))
134 }
135 _ => Err(Error::ServerError(
136 response.text().await.unwrap_or_default(),
137 )),
138 }
139 }
140
141 pub async fn put_object(
142 &self,
143 bucket: &str,
144 key: &str,
145 data: impl Into<Bytes>,
146 content_type: Option<&str>,
147 metadata: Option<HashMap<String, String>>,
148 ) -> Result<ObjectMetadata> {
149 let url = format!("{}/buckets/{}/objects/{}", self.base_url, bucket, key);
150 let mut request = self.client.put(&url);
151
152 if let Some(ct) = content_type {
153 request = request.header("content-type", ct);
154 }
155
156 if let Some(meta) = metadata {
157 for (k, v) in meta {
158 request = request.header(format!("x-object-meta-{}", k), v);
159 }
160 }
161
162 let response = request.body(data.into()).send().await?;
163
164 match response.status() {
165 StatusCode::OK => Ok(response.json().await?),
166 StatusCode::NOT_FOUND => Err(Error::NotFound(bucket.to_string())),
167 StatusCode::BAD_REQUEST => {
168 Err(Error::BadRequest(response.text().await.unwrap_or_default()))
169 }
170 _ => Err(Error::ServerError(
171 response.text().await.unwrap_or_default(),
172 )),
173 }
174 }
175
176 pub async fn get_object(&self, bucket: &str, key: &str) -> Result<ObjectData> {
177 let url = format!("{}/buckets/{}/objects/{}", self.base_url, bucket, key);
178 let response = self.client.get(&url).send().await?;
179
180 match response.status() {
181 StatusCode::OK => {
182 let etag = response
183 .headers()
184 .get("etag")
185 .and_then(|v| v.to_str().ok())
186 .unwrap_or("")
187 .to_string();
188
189 let last_modified = response
190 .headers()
191 .get("last-modified")
192 .and_then(|v| v.to_str().ok())
193 .unwrap_or("")
194 .to_string();
195
196 let content_type = response
197 .headers()
198 .get("content-type")
199 .and_then(|v| v.to_str().ok())
200 .map(|s| s.to_string());
201
202 let size = response
203 .headers()
204 .get("content-length")
205 .and_then(|v| v.to_str().ok())
206 .and_then(|s| s.parse().ok())
207 .unwrap_or(0);
208
209 let data = response.bytes().await?;
210
211 Ok(ObjectData {
212 metadata: ObjectMetadata {
213 key: key.to_string(),
214 size,
215 content_type,
216 etag,
217 last_modified,
218 metadata: HashMap::new(),
219 },
220 data,
221 })
222 }
223 StatusCode::NOT_FOUND => Err(Error::NotFound(format!("{}/{}", bucket, key))),
224 _ => Err(Error::ServerError(
225 response.text().await.unwrap_or_default(),
226 )),
227 }
228 }
229
230 pub async fn head_object(&self, bucket: &str, key: &str) -> Result<ObjectMetadata> {
231 let url = format!("{}/buckets/{}/objects/{}", self.base_url, bucket, key);
232 let response = self.client.head(&url).send().await?;
233
234 match response.status() {
235 StatusCode::OK => {
236 let etag = response
237 .headers()
238 .get("etag")
239 .and_then(|v| v.to_str().ok())
240 .unwrap_or("")
241 .to_string();
242
243 let last_modified = response
244 .headers()
245 .get("last-modified")
246 .and_then(|v| v.to_str().ok())
247 .unwrap_or("")
248 .to_string();
249
250 let content_type = response
251 .headers()
252 .get("content-type")
253 .and_then(|v| v.to_str().ok())
254 .map(|s| s.to_string());
255
256 let size = response
257 .headers()
258 .get("content-length")
259 .and_then(|v| v.to_str().ok())
260 .and_then(|s| s.parse().ok())
261 .unwrap_or(0);
262
263 Ok(ObjectMetadata {
264 key: key.to_string(),
265 size,
266 content_type,
267 etag,
268 last_modified,
269 metadata: HashMap::new(),
270 })
271 }
272 StatusCode::NOT_FOUND => Err(Error::NotFound(format!("{}/{}", bucket, key))),
273 _ => Err(Error::ServerError(
274 response.text().await.unwrap_or_default(),
275 )),
276 }
277 }
278
279 pub async fn delete_object(&self, bucket: &str, key: &str) -> Result<()> {
280 let url = format!("{}/buckets/{}/objects/{}", self.base_url, bucket, key);
281 let response = self.client.delete(&url).send().await?;
282
283 match response.status() {
284 StatusCode::NO_CONTENT => Ok(()),
285 StatusCode::NOT_FOUND => Err(Error::NotFound(format!("{}/{}", bucket, key))),
286 _ => Err(Error::ServerError(
287 response.text().await.unwrap_or_default(),
288 )),
289 }
290 }
291
292 pub async fn list_objects(
293 &self,
294 bucket: &str,
295 prefix: Option<&str>,
296 max_keys: Option<usize>,
297 ) -> Result<Vec<ObjectMetadata>> {
298 let mut url = format!("{}/buckets/{}/objects", self.base_url, bucket);
299 let mut params = vec![];
300
301 if let Some(p) = prefix {
302 params.push(format!("prefix={}", p));
303 }
304 if let Some(m) = max_keys {
305 params.push(format!("max_keys={}", m));
306 }
307
308 if !params.is_empty() {
309 url.push('?');
310 url.push_str(¶ms.join("&"));
311 }
312
313 let response = self.client.get(&url).send().await?;
314
315 match response.status() {
316 StatusCode::OK => {
317 let resp: ListObjectsResponse = response.json().await?;
318 Ok(resp.objects)
319 }
320 StatusCode::NOT_FOUND => Err(Error::NotFound(bucket.to_string())),
321 _ => Err(Error::ServerError(
322 response.text().await.unwrap_or_default(),
323 )),
324 }
325 }
326
327 pub async fn get_public_url(
328 &self,
329 bucket: &str,
330 key: &str,
331 expiration_secs: Option<u64>,
332 ) -> Result<PublicUrlResponse> {
333 let mut url = format!("{}/buckets/{}/public-url/{}", self.base_url, bucket, key);
334
335 if let Some(exp) = expiration_secs {
336 url.push_str(&format!("?expiration_secs={}", exp));
337 }
338
339 let response = self.client.get(&url).send().await?;
340
341 match response.status() {
342 StatusCode::OK => Ok(response.json().await?),
343 StatusCode::NOT_FOUND => Err(Error::NotFound(format!("{}/{}", bucket, key))),
344 StatusCode::BAD_REQUEST => {
345 Err(Error::BadRequest(response.text().await.unwrap_or_default()))
346 }
347 _ => Err(Error::ServerError(
348 response.text().await.unwrap_or_default(),
349 )),
350 }
351 }
352}
353
#[cfg(test)]
mod tests {
    //! Unit tests that run [`ObjectStoreClient`] against a local `mockito` server.
    //!
    //! Every mock is asserted after the call, so each test checks the request
    //! the client sent (method, path, query, and where relevant headers and
    //! body) as well as how the response was turned into a result.

    use super::*;
    use mockito::{Matcher, Server};

    /// The constructor stores the base URL it was given.
    #[test]
    fn test_client_creation() {
        let client = ObjectStoreClient::new("http://localhost:8080");
        assert_eq!(client.base_url, "http://localhost:8080");
    }

    /// A successful creation sends the bucket name as JSON and decodes the reply.
    #[tokio::test]
    async fn test_create_bucket() {
        let mut server = Server::new_async().await;
        let mock = server
            .mock("POST", "/buckets")
            .match_body(Matcher::JsonString(
                r#"{"name":"test-bucket"}"#.to_string(),
            ))
            .with_status(200)
            .with_header("content-type", "application/json")
            .with_body(r#"{"name":"test-bucket","created_at":"2024-01-01T00:00:00Z"}"#)
            .create_async()
            .await;

        let client = ObjectStoreClient::new(server.url());
        let bucket = client.create_bucket("test-bucket").await.unwrap();

        mock.assert_async().await;
        assert_eq!(bucket.name, "test-bucket");
        assert_eq!(bucket.created_at, "2024-01-01T00:00:00Z");
    }

    /// `409 Conflict` maps to `AlreadyExists` carrying the bucket name.
    #[tokio::test]
    async fn test_create_bucket_conflict() {
        let mut server = Server::new_async().await;
        let mock = server
            .mock("POST", "/buckets")
            .with_status(409)
            .with_body("Bucket already exists")
            .create_async()
            .await;

        let client = ObjectStoreClient::new(server.url());
        let result = client.create_bucket("test-bucket").await;

        mock.assert_async().await;
        assert!(matches!(
            result,
            Err(Error::AlreadyExists(ref name)) if name == "test-bucket"
        ));
    }

    /// `400 Bad Request` maps to `BadRequest` carrying the response body.
    #[tokio::test]
    async fn test_create_bucket_bad_request() {
        let mut server = Server::new_async().await;
        let mock = server
            .mock("POST", "/buckets")
            .with_status(400)
            .with_body("invalid bucket name")
            .create_async()
            .await;

        let client = ObjectStoreClient::new(server.url());
        let result = client.create_bucket("Bad Name").await;

        mock.assert_async().await;
        assert!(matches!(
            result,
            Err(Error::BadRequest(ref message)) if message == "invalid bucket name"
        ));
    }

    /// The bucket listing envelope is unwrapped into a plain vector.
    #[tokio::test]
    async fn test_list_buckets() {
        let mut server = Server::new_async().await;
        let mock = server
            .mock("GET", "/buckets")
            .with_status(200)
            .with_header("content-type", "application/json")
            .with_body(r#"{"buckets":[{"name":"bucket1","created_at":"2024-01-01T00:00:00Z"},{"name":"bucket2","created_at":"2024-01-02T00:00:00Z"}]}"#)
            .create_async()
            .await;

        let client = ObjectStoreClient::new(server.url());
        let buckets = client.list_buckets().await.unwrap();

        mock.assert_async().await;
        assert_eq!(buckets.len(), 2);
        assert_eq!(buckets[0].name, "bucket1");
        assert_eq!(buckets[1].name, "bucket2");
    }

    /// An unexpected status maps to `ServerError` carrying the response body.
    #[tokio::test]
    async fn test_list_buckets_server_error() {
        let mut server = Server::new_async().await;
        let mock = server
            .mock("GET", "/buckets")
            .with_status(500)
            .with_body("storage backend unavailable")
            .create_async()
            .await;

        let client = ObjectStoreClient::new(server.url());
        let result = client.list_buckets().await;

        mock.assert_async().await;
        assert!(matches!(
            result,
            Err(Error::ServerError(ref message)) if message == "storage backend unavailable"
        ));
    }

    /// `204 No Content` on bucket deletion is a success.
    #[tokio::test]
    async fn test_delete_bucket() {
        let mut server = Server::new_async().await;
        let mock = server
            .mock("DELETE", "/buckets/test-bucket")
            .with_status(204)
            .create_async()
            .await;

        let client = ObjectStoreClient::new(server.url());
        let result = client.delete_bucket("test-bucket").await;

        mock.assert_async().await;
        assert!(result.is_ok());
    }

    /// `404 Not Found` on bucket deletion maps to `NotFound` with the bucket name.
    #[tokio::test]
    async fn test_delete_bucket_not_found() {
        let mut server = Server::new_async().await;
        let mock = server
            .mock("DELETE", "/buckets/test-bucket")
            .with_status(404)
            .with_body("Bucket not found")
            .create_async()
            .await;

        let client = ObjectStoreClient::new(server.url());
        let result = client.delete_bucket("test-bucket").await;

        mock.assert_async().await;
        assert!(matches!(
            result,
            Err(Error::NotFound(ref name)) if name == "test-bucket"
        ));
    }

    /// An upload sends the content type, metadata headers and body, and decodes the reply.
    #[tokio::test]
    async fn test_put_object() {
        let mut server = Server::new_async().await;
        let mock = server
            .mock("PUT", "/buckets/test-bucket/objects/test-key")
            .match_header("content-type", "text/plain")
            .match_header("x-object-meta-key1", "value1")
            .match_body("Hello, World!")
            .with_status(200)
            .with_header("content-type", "application/json")
            .with_body(r#"{"key":"test-key","size":13,"content_type":"text/plain","etag":"abc123","last_modified":"2024-01-01T00:00:00Z","metadata":{"key1":"value1"}}"#)
            .create_async()
            .await;

        let client = ObjectStoreClient::new(server.url());
        let data = Bytes::from("Hello, World!");
        let metadata = HashMap::from([("key1".to_string(), "value1".to_string())]);

        let obj = client
            .put_object(
                "test-bucket",
                "test-key",
                data,
                Some("text/plain"),
                Some(metadata),
            )
            .await
            .unwrap();

        mock.assert_async().await;
        assert_eq!(obj.key, "test-key");
        assert_eq!(obj.size, 13);
        assert_eq!(obj.etag, "abc123");
        assert_eq!(obj.content_type.as_deref(), Some("text/plain"));
        assert_eq!(obj.metadata.get("key1").map(String::as_str), Some("value1"));
    }

    /// A download returns the body and the metadata taken from response headers.
    #[tokio::test]
    async fn test_get_object() {
        let mut server = Server::new_async().await;
        let mock = server
            .mock("GET", "/buckets/test-bucket/objects/test-key")
            .with_status(200)
            .with_header("content-type", "text/plain")
            .with_header("content-length", "13")
            .with_header("etag", "abc123")
            .with_header("last-modified", "2024-01-01T00:00:00Z")
            .with_body("Hello, World!")
            .create_async()
            .await;

        let client = ObjectStoreClient::new(server.url());
        let obj = client.get_object("test-bucket", "test-key").await.unwrap();

        mock.assert_async().await;
        assert_eq!(obj.metadata.key, "test-key");
        assert_eq!(obj.metadata.size, 13);
        assert_eq!(obj.metadata.etag, "abc123");
        assert_eq!(obj.metadata.last_modified, "2024-01-01T00:00:00Z");
        assert_eq!(obj.metadata.content_type.as_deref(), Some("text/plain"));
        assert_eq!(obj.data, Bytes::from("Hello, World!"));
    }

    /// `404 Not Found` on download maps to `NotFound` with `bucket/key`.
    #[tokio::test]
    async fn test_get_object_not_found() {
        let mut server = Server::new_async().await;
        let mock = server
            .mock("GET", "/buckets/test-bucket/objects/test-key")
            .with_status(404)
            .with_body("Object not found")
            .create_async()
            .await;

        let client = ObjectStoreClient::new(server.url());
        let result = client.get_object("test-bucket", "test-key").await;

        mock.assert_async().await;
        assert!(matches!(
            result,
            Err(Error::NotFound(ref path)) if path == "test-bucket/test-key"
        ));
    }

    /// A `HEAD` request yields metadata built purely from response headers.
    #[tokio::test]
    async fn test_head_object() {
        let mut server = Server::new_async().await;
        let mock = server
            .mock("HEAD", "/buckets/test-bucket/objects/test-key")
            .with_status(200)
            .with_header("content-type", "text/plain")
            .with_header("content-length", "13")
            .with_header("etag", "abc123")
            .with_header("last-modified", "2024-01-01T00:00:00Z")
            .create_async()
            .await;

        let client = ObjectStoreClient::new(server.url());
        let obj = client.head_object("test-bucket", "test-key").await.unwrap();

        mock.assert_async().await;
        assert_eq!(obj.key, "test-key");
        assert_eq!(obj.size, 13);
        assert_eq!(obj.etag, "abc123");
    }

    /// `204 No Content` on object deletion is a success.
    #[tokio::test]
    async fn test_delete_object() {
        let mut server = Server::new_async().await;
        let mock = server
            .mock("DELETE", "/buckets/test-bucket/objects/test-key")
            .with_status(204)
            .create_async()
            .await;

        let client = ObjectStoreClient::new(server.url());
        let result = client.delete_object("test-bucket", "test-key").await;

        mock.assert_async().await;
        assert!(result.is_ok());
    }

    /// `404 Not Found` on object deletion maps to `NotFound` with `bucket/key`.
    #[tokio::test]
    async fn test_delete_object_not_found() {
        let mut server = Server::new_async().await;
        let mock = server
            .mock("DELETE", "/buckets/test-bucket/objects/test-key")
            .with_status(404)
            .with_body("Object not found")
            .create_async()
            .await;

        let client = ObjectStoreClient::new(server.url());
        let result = client.delete_object("test-bucket", "test-key").await;

        mock.assert_async().await;
        assert!(matches!(
            result,
            Err(Error::NotFound(ref path)) if path == "test-bucket/test-key"
        ));
    }

    /// `prefix` and `max_keys` are sent as query parameters.
    #[tokio::test]
    async fn test_list_objects() {
        let mut server = Server::new_async().await;
        let mock = server
            .mock("GET", "/buckets/test-bucket/objects")
            .match_query(Matcher::AllOf(vec![
                Matcher::UrlEncoded("prefix".into(), "prefix/".into()),
                Matcher::UrlEncoded("max_keys".into(), "10".into()),
            ]))
            .with_status(200)
            .with_header("content-type", "application/json")
            .with_body(r#"{"objects":[{"key":"prefix/obj1","size":100,"etag":"etag1","last_modified":"2024-01-01T00:00:00Z","metadata":{}},{"key":"prefix/obj2","size":200,"etag":"etag2","last_modified":"2024-01-02T00:00:00Z","metadata":{}}]}"#)
            .create_async()
            .await;

        let client = ObjectStoreClient::new(server.url());
        let objects = client
            .list_objects("test-bucket", Some("prefix/"), Some(10))
            .await
            .unwrap();

        mock.assert_async().await;
        assert_eq!(objects.len(), 2);
        assert_eq!(objects[0].key, "prefix/obj1");
        assert_eq!(objects[1].key, "prefix/obj2");
    }

    /// Without filters the listing request carries no query string.
    #[tokio::test]
    async fn test_list_objects_no_params() {
        let mut server = Server::new_async().await;
        let mock = server
            .mock("GET", "/buckets/test-bucket/objects")
            .with_status(200)
            .with_header("content-type", "application/json")
            .with_body(r#"{"objects":[]}"#)
            .create_async()
            .await;

        let client = ObjectStoreClient::new(server.url());
        let objects = client
            .list_objects("test-bucket", None, None)
            .await
            .unwrap();

        mock.assert_async().await;
        assert!(objects.is_empty());
    }

    /// `404 Not Found` on listing maps to `NotFound` with the bucket name.
    #[tokio::test]
    async fn test_list_objects_bucket_not_found() {
        let mut server = Server::new_async().await;
        let mock = server
            .mock("GET", "/buckets/missing/objects")
            .with_status(404)
            .with_body("Bucket not found")
            .create_async()
            .await;

        let client = ObjectStoreClient::new(server.url());
        let result = client.list_objects("missing", None, None).await;

        mock.assert_async().await;
        assert!(matches!(
            result,
            Err(Error::NotFound(ref name)) if name == "missing"
        ));
    }

    /// An explicit expiration is sent as the `expiration_secs` query parameter.
    #[tokio::test]
    async fn test_get_public_url() {
        let mut server = Server::new_async().await;
        let mock = server
            .mock("GET", "/buckets/test-bucket/public-url/test-key")
            .match_query(Matcher::UrlEncoded(
                "expiration_secs".into(),
                "7200".into(),
            ))
            .with_status(200)
            .with_header("content-type", "application/json")
            .with_body(
                r#"{"url":"https://example.com/signed-url?signature=abc123","expires_in":7200}"#,
            )
            .create_async()
            .await;

        let client = ObjectStoreClient::new(server.url());
        let response = client
            .get_public_url("test-bucket", "test-key", Some(7200))
            .await
            .unwrap();

        mock.assert_async().await;
        assert_eq!(
            response.url,
            "https://example.com/signed-url?signature=abc123"
        );
        assert_eq!(response.expires_in, 7200);
    }

    /// Without an expiration no query string is sent and the service default applies.
    #[tokio::test]
    async fn test_get_public_url_default_expiration() {
        let mut server = Server::new_async().await;
        let mock = server
            .mock("GET", "/buckets/test-bucket/public-url/test-key")
            .with_status(200)
            .with_header("content-type", "application/json")
            .with_body(
                r#"{"url":"https://example.com/signed-url?signature=xyz789","expires_in":3600}"#,
            )
            .create_async()
            .await;

        let client = ObjectStoreClient::new(server.url());
        let response = client
            .get_public_url("test-bucket", "test-key", None)
            .await
            .unwrap();

        mock.assert_async().await;
        assert_eq!(
            response.url,
            "https://example.com/signed-url?signature=xyz789"
        );
        assert_eq!(response.expires_in, 3600);
    }

    /// `404 Not Found` on a public-URL request maps to `NotFound` with `bucket/key`.
    #[tokio::test]
    async fn test_get_public_url_not_found() {
        let mut server = Server::new_async().await;
        let mock = server
            .mock("GET", "/buckets/test-bucket/public-url/test-key")
            .with_status(404)
            .with_body("Object not found")
            .create_async()
            .await;

        let client = ObjectStoreClient::new(server.url());
        let result = client.get_public_url("test-bucket", "test-key", None).await;

        mock.assert_async().await;
        assert!(matches!(
            result,
            Err(Error::NotFound(ref path)) if path == "test-bucket/test-key"
        ));
    }
}