1use bytes::Bytes;
2use reqwest::{Client, StatusCode};
3use serde::{Deserialize, Serialize};
4use std::collections::HashMap;
5use thiserror::Error;
6
7#[derive(Error, Debug)]
8pub enum Error {
9 #[error("HTTP error: {0}")]
10 Http(#[from] reqwest::Error),
11
12 #[error("Not found: {0}")]
13 NotFound(String),
14
15 #[error("Already exists: {0}")]
16 AlreadyExists(String),
17
18 #[error("Bad request: {0}")]
19 BadRequest(String),
20
21 #[error("Server error: {0}")]
22 ServerError(String),
23}
24
25pub type Result<T> = std::result::Result<T, Error>;
26
27#[derive(Debug, Clone, Serialize, Deserialize)]
28pub struct Bucket {
29 pub name: String,
30 pub created_at: String,
31}
32
33#[derive(Debug, Clone, Serialize, Deserialize)]
34pub struct ObjectMetadata {
35 pub key: String,
36 pub size: u64,
37 pub content_type: Option<String>,
38 pub etag: String,
39 pub last_modified: String,
40 pub metadata: HashMap<String, String>,
41}
42
43#[derive(Debug, Clone)]
44pub struct ObjectData {
45 pub metadata: ObjectMetadata,
46 pub data: Bytes,
47}
48
49#[derive(Debug, Clone, Serialize)]
50struct CreateBucketRequest {
51 name: String,
52}
53
54#[derive(Debug, Deserialize)]
55struct ListBucketsResponse {
56 buckets: Vec<Bucket>,
57}
58
59#[derive(Debug, Deserialize)]
60struct ListObjectsResponse {
61 objects: Vec<ObjectMetadata>,
62}
63
64pub struct ObjectStoreClient {
65 client: Client,
66 base_url: String,
67}
68
69impl ObjectStoreClient {
70 pub fn new(base_url: impl Into<String>) -> Self {
71 Self {
72 client: Client::new(),
73 base_url: base_url.into(),
74 }
75 }
76
77 pub fn with_client(base_url: impl Into<String>, client: Client) -> Self {
78 Self {
79 client,
80 base_url: base_url.into(),
81 }
82 }
83
84 pub async fn create_bucket(&self, name: &str) -> Result<Bucket> {
85 let url = format!("{}/buckets", self.base_url);
86 let req = CreateBucketRequest {
87 name: name.to_string(),
88 };
89
90 let response = self.client.post(&url).json(&req).send().await?;
91
92 match response.status() {
93 StatusCode::OK => Ok(response.json().await?),
94 StatusCode::CONFLICT => Err(Error::AlreadyExists(name.to_string())),
95 StatusCode::BAD_REQUEST => {
96 Err(Error::BadRequest(response.text().await.unwrap_or_default()))
97 }
98 _ => Err(Error::ServerError(response.text().await.unwrap_or_default())),
99 }
100 }
101
102 pub async fn list_buckets(&self) -> Result<Vec<Bucket>> {
103 let url = format!("{}/buckets", self.base_url);
104 let response = self.client.get(&url).send().await?;
105
106 match response.status() {
107 StatusCode::OK => {
108 let resp: ListBucketsResponse = response.json().await?;
109 Ok(resp.buckets)
110 }
111 _ => Err(Error::ServerError(response.text().await.unwrap_or_default())),
112 }
113 }
114
115 pub async fn delete_bucket(&self, name: &str) -> Result<()> {
116 let url = format!("{}/buckets/{}", self.base_url, name);
117 let response = self.client.delete(&url).send().await?;
118
119 match response.status() {
120 StatusCode::NO_CONTENT => Ok(()),
121 StatusCode::NOT_FOUND => Err(Error::NotFound(name.to_string())),
122 StatusCode::BAD_REQUEST => {
123 Err(Error::BadRequest(response.text().await.unwrap_or_default()))
124 }
125 _ => Err(Error::ServerError(response.text().await.unwrap_or_default())),
126 }
127 }
128
129 pub async fn put_object(
130 &self,
131 bucket: &str,
132 key: &str,
133 data: impl Into<Bytes>,
134 content_type: Option<&str>,
135 metadata: Option<HashMap<String, String>>,
136 ) -> Result<ObjectMetadata> {
137 let url = format!("{}/buckets/{}/objects/{}", self.base_url, bucket, key);
138 let mut request = self.client.put(&url);
139
140 if let Some(ct) = content_type {
141 request = request.header("content-type", ct);
142 }
143
144 if let Some(meta) = metadata {
145 for (k, v) in meta {
146 request = request.header(format!("x-object-meta-{}", k), v);
147 }
148 }
149
150 let response = request.body(data.into()).send().await?;
151
152 match response.status() {
153 StatusCode::OK => Ok(response.json().await?),
154 StatusCode::NOT_FOUND => Err(Error::NotFound(bucket.to_string())),
155 StatusCode::BAD_REQUEST => {
156 Err(Error::BadRequest(response.text().await.unwrap_or_default()))
157 }
158 _ => Err(Error::ServerError(response.text().await.unwrap_or_default())),
159 }
160 }
161
162 pub async fn get_object(&self, bucket: &str, key: &str) -> Result<ObjectData> {
163 let url = format!("{}/buckets/{}/objects/{}", self.base_url, bucket, key);
164 let response = self.client.get(&url).send().await?;
165
166 match response.status() {
167 StatusCode::OK => {
168 let etag = response
169 .headers()
170 .get("etag")
171 .and_then(|v| v.to_str().ok())
172 .unwrap_or("")
173 .to_string();
174
175 let last_modified = response
176 .headers()
177 .get("last-modified")
178 .and_then(|v| v.to_str().ok())
179 .unwrap_or("")
180 .to_string();
181
182 let content_type = response
183 .headers()
184 .get("content-type")
185 .and_then(|v| v.to_str().ok())
186 .map(|s| s.to_string());
187
188 let size = response
189 .headers()
190 .get("content-length")
191 .and_then(|v| v.to_str().ok())
192 .and_then(|s| s.parse().ok())
193 .unwrap_or(0);
194
195 let data = response.bytes().await?;
196
197 Ok(ObjectData {
198 metadata: ObjectMetadata {
199 key: key.to_string(),
200 size,
201 content_type,
202 etag,
203 last_modified,
204 metadata: HashMap::new(),
205 },
206 data,
207 })
208 }
209 StatusCode::NOT_FOUND => Err(Error::NotFound(format!("{}/{}", bucket, key))),
210 _ => Err(Error::ServerError(response.text().await.unwrap_or_default())),
211 }
212 }
213
214 pub async fn head_object(&self, bucket: &str, key: &str) -> Result<ObjectMetadata> {
215 let url = format!("{}/buckets/{}/objects/{}", self.base_url, bucket, key);
216 let response = self.client.head(&url).send().await?;
217
218 match response.status() {
219 StatusCode::OK => {
220 let etag = response
221 .headers()
222 .get("etag")
223 .and_then(|v| v.to_str().ok())
224 .unwrap_or("")
225 .to_string();
226
227 let last_modified = response
228 .headers()
229 .get("last-modified")
230 .and_then(|v| v.to_str().ok())
231 .unwrap_or("")
232 .to_string();
233
234 let content_type = response
235 .headers()
236 .get("content-type")
237 .and_then(|v| v.to_str().ok())
238 .map(|s| s.to_string());
239
240 let size = response
241 .headers()
242 .get("content-length")
243 .and_then(|v| v.to_str().ok())
244 .and_then(|s| s.parse().ok())
245 .unwrap_or(0);
246
247 Ok(ObjectMetadata {
248 key: key.to_string(),
249 size,
250 content_type,
251 etag,
252 last_modified,
253 metadata: HashMap::new(),
254 })
255 }
256 StatusCode::NOT_FOUND => Err(Error::NotFound(format!("{}/{}", bucket, key))),
257 _ => Err(Error::ServerError(response.text().await.unwrap_or_default())),
258 }
259 }
260
261 pub async fn delete_object(&self, bucket: &str, key: &str) -> Result<()> {
262 let url = format!("{}/buckets/{}/objects/{}", self.base_url, bucket, key);
263 let response = self.client.delete(&url).send().await?;
264
265 match response.status() {
266 StatusCode::NO_CONTENT => Ok(()),
267 StatusCode::NOT_FOUND => Err(Error::NotFound(format!("{}/{}", bucket, key))),
268 _ => Err(Error::ServerError(response.text().await.unwrap_or_default())),
269 }
270 }
271
272 pub async fn list_objects(
273 &self,
274 bucket: &str,
275 prefix: Option<&str>,
276 max_keys: Option<usize>,
277 ) -> Result<Vec<ObjectMetadata>> {
278 let mut url = format!("{}/buckets/{}/objects", self.base_url, bucket);
279 let mut params = vec![];
280
281 if let Some(p) = prefix {
282 params.push(format!("prefix={}", p));
283 }
284 if let Some(m) = max_keys {
285 params.push(format!("max_keys={}", m));
286 }
287
288 if !params.is_empty() {
289 url.push('?');
290 url.push_str(¶ms.join("&"));
291 }
292
293 let response = self.client.get(&url).send().await?;
294
295 match response.status() {
296 StatusCode::OK => {
297 let resp: ListObjectsResponse = response.json().await?;
298 Ok(resp.objects)
299 }
300 StatusCode::NOT_FOUND => Err(Error::NotFound(bucket.to_string())),
301 _ => Err(Error::ServerError(response.text().await.unwrap_or_default())),
302 }
303 }
304}
305
/// Tests for [`ObjectStoreClient`] against a local `mockito` server.
///
/// Each mock is asserted after the call so a test fails if the expected
/// request was never made, and request-shape matchers (headers, body, query)
/// pin what the client actually sends.
#[cfg(test)]
mod tests {
    use super::*;
    use mockito::Server;

    #[tokio::test]
    async fn test_client_creation() {
        let client = ObjectStoreClient::new("http://localhost:8080");
        assert_eq!(client.base_url, "http://localhost:8080");
    }

    #[tokio::test]
    async fn test_with_client() {
        let client = ObjectStoreClient::with_client("http://localhost:8080", Client::new());
        assert_eq!(client.base_url, "http://localhost:8080");
    }

    #[tokio::test]
    async fn test_create_bucket() {
        let mut server = Server::new_async().await;
        let mock = server
            .mock("POST", "/buckets")
            .match_header("content-type", "application/json")
            .match_body(r#"{"name":"test-bucket"}"#)
            .with_status(200)
            .with_header("content-type", "application/json")
            .with_body(r#"{"name":"test-bucket","created_at":"2024-01-01T00:00:00Z"}"#)
            .create_async()
            .await;

        let client = ObjectStoreClient::new(server.url());
        let bucket = client.create_bucket("test-bucket").await.unwrap();

        assert_eq!(bucket.name, "test-bucket");
        assert_eq!(bucket.created_at, "2024-01-01T00:00:00Z");
        mock.assert_async().await;
    }

    #[tokio::test]
    async fn test_create_bucket_conflict() {
        let mut server = Server::new_async().await;
        let mock = server
            .mock("POST", "/buckets")
            .with_status(409)
            .with_body("Bucket already exists")
            .create_async()
            .await;

        let client = ObjectStoreClient::new(server.url());
        let result = client.create_bucket("test-bucket").await;

        assert!(matches!(
            result.unwrap_err(),
            Error::AlreadyExists(name) if name == "test-bucket"
        ));
        mock.assert_async().await;
    }

    #[tokio::test]
    async fn test_create_bucket_bad_request() {
        let mut server = Server::new_async().await;
        let mock = server
            .mock("POST", "/buckets")
            .with_status(400)
            .with_body("invalid name")
            .create_async()
            .await;

        let client = ObjectStoreClient::new(server.url());
        let result = client.create_bucket("test-bucket").await;

        assert!(matches!(
            result.unwrap_err(),
            Error::BadRequest(body) if body == "invalid name"
        ));
        mock.assert_async().await;
    }

    #[tokio::test]
    async fn test_list_buckets() {
        let mut server = Server::new_async().await;
        let mock = server
            .mock("GET", "/buckets")
            .with_status(200)
            .with_header("content-type", "application/json")
            .with_body(r#"{"buckets":[{"name":"bucket1","created_at":"2024-01-01T00:00:00Z"},{"name":"bucket2","created_at":"2024-01-02T00:00:00Z"}]}"#)
            .create_async()
            .await;

        let client = ObjectStoreClient::new(server.url());
        let buckets = client.list_buckets().await.unwrap();

        assert_eq!(buckets.len(), 2);
        assert_eq!(buckets[0].name, "bucket1");
        assert_eq!(buckets[1].name, "bucket2");
        mock.assert_async().await;
    }

    #[tokio::test]
    async fn test_list_buckets_server_error() {
        let mut server = Server::new_async().await;
        let mock = server
            .mock("GET", "/buckets")
            .with_status(500)
            .with_body("boom")
            .create_async()
            .await;

        let client = ObjectStoreClient::new(server.url());
        let result = client.list_buckets().await;

        assert!(matches!(
            result.unwrap_err(),
            Error::ServerError(body) if body == "boom"
        ));
        mock.assert_async().await;
    }

    #[tokio::test]
    async fn test_delete_bucket() {
        let mut server = Server::new_async().await;
        let mock = server
            .mock("DELETE", "/buckets/test-bucket")
            .with_status(204)
            .create_async()
            .await;

        let client = ObjectStoreClient::new(server.url());
        let result = client.delete_bucket("test-bucket").await;

        assert!(result.is_ok());
        mock.assert_async().await;
    }

    #[tokio::test]
    async fn test_delete_bucket_not_found() {
        let mut server = Server::new_async().await;
        let mock = server
            .mock("DELETE", "/buckets/test-bucket")
            .with_status(404)
            .with_body("Bucket not found")
            .create_async()
            .await;

        let client = ObjectStoreClient::new(server.url());
        let result = client.delete_bucket("test-bucket").await;

        assert!(matches!(
            result.unwrap_err(),
            Error::NotFound(name) if name == "test-bucket"
        ));
        mock.assert_async().await;
    }

    #[tokio::test]
    async fn test_put_object() {
        let mut server = Server::new_async().await;
        let mock = server
            .mock("PUT", "/buckets/test-bucket/objects/test-key")
            .match_header("content-type", "text/plain")
            .match_header("x-object-meta-key1", "value1")
            .match_body("Hello, World!")
            .with_status(200)
            .with_header("content-type", "application/json")
            .with_body(r#"{"key":"test-key","size":13,"content_type":"text/plain","etag":"abc123","last_modified":"2024-01-01T00:00:00Z","metadata":{"key1":"value1"}}"#)
            .create_async()
            .await;

        let client = ObjectStoreClient::new(server.url());
        let data = Bytes::from("Hello, World!");
        let mut metadata = HashMap::new();
        metadata.insert("key1".to_string(), "value1".to_string());

        let obj = client
            .put_object(
                "test-bucket",
                "test-key",
                data,
                Some("text/plain"),
                Some(metadata),
            )
            .await
            .unwrap();

        assert_eq!(obj.key, "test-key");
        assert_eq!(obj.size, 13);
        assert_eq!(obj.etag, "abc123");
        assert_eq!(obj.content_type.as_deref(), Some("text/plain"));
        assert_eq!(obj.metadata.get("key1").map(String::as_str), Some("value1"));
        mock.assert_async().await;
    }

    #[tokio::test]
    async fn test_put_object_bucket_not_found() {
        let mut server = Server::new_async().await;
        let mock = server
            .mock("PUT", "/buckets/test-bucket/objects/test-key")
            .with_status(404)
            .with_body("Bucket not found")
            .create_async()
            .await;

        let client = ObjectStoreClient::new(server.url());
        let result = client
            .put_object("test-bucket", "test-key", Bytes::from("x"), None, None)
            .await;

        assert!(matches!(
            result.unwrap_err(),
            Error::NotFound(name) if name == "test-bucket"
        ));
        mock.assert_async().await;
    }

    #[tokio::test]
    async fn test_get_object() {
        let mut server = Server::new_async().await;
        let mock = server
            .mock("GET", "/buckets/test-bucket/objects/test-key")
            .with_status(200)
            .with_header("content-type", "text/plain")
            .with_header("content-length", "13")
            .with_header("etag", "abc123")
            .with_header("last-modified", "2024-01-01T00:00:00Z")
            .with_body("Hello, World!")
            .create_async()
            .await;

        let client = ObjectStoreClient::new(server.url());
        let obj = client.get_object("test-bucket", "test-key").await.unwrap();

        assert_eq!(obj.metadata.key, "test-key");
        assert_eq!(obj.metadata.size, 13);
        assert_eq!(obj.metadata.etag, "abc123");
        assert_eq!(obj.metadata.last_modified, "2024-01-01T00:00:00Z");
        assert_eq!(obj.metadata.content_type.as_deref(), Some("text/plain"));
        assert_eq!(obj.data, Bytes::from("Hello, World!"));
        mock.assert_async().await;
    }

    #[tokio::test]
    async fn test_get_object_not_found() {
        let mut server = Server::new_async().await;
        let mock = server
            .mock("GET", "/buckets/test-bucket/objects/test-key")
            .with_status(404)
            .with_body("Object not found")
            .create_async()
            .await;

        let client = ObjectStoreClient::new(server.url());
        let result = client.get_object("test-bucket", "test-key").await;

        assert!(matches!(
            result.unwrap_err(),
            Error::NotFound(path) if path == "test-bucket/test-key"
        ));
        mock.assert_async().await;
    }

    #[tokio::test]
    async fn test_head_object() {
        let mut server = Server::new_async().await;
        let mock = server
            .mock("HEAD", "/buckets/test-bucket/objects/test-key")
            .with_status(200)
            .with_header("content-type", "text/plain")
            .with_header("content-length", "13")
            .with_header("etag", "abc123")
            .with_header("last-modified", "2024-01-01T00:00:00Z")
            .create_async()
            .await;

        let client = ObjectStoreClient::new(server.url());
        let obj = client.head_object("test-bucket", "test-key").await.unwrap();

        assert_eq!(obj.key, "test-key");
        assert_eq!(obj.size, 13);
        assert_eq!(obj.etag, "abc123");
        mock.assert_async().await;
    }

    #[tokio::test]
    async fn test_head_object_not_found() {
        let mut server = Server::new_async().await;
        let mock = server
            .mock("HEAD", "/buckets/test-bucket/objects/test-key")
            .with_status(404)
            .create_async()
            .await;

        let client = ObjectStoreClient::new(server.url());
        let result = client.head_object("test-bucket", "test-key").await;

        assert!(matches!(
            result.unwrap_err(),
            Error::NotFound(path) if path == "test-bucket/test-key"
        ));
        mock.assert_async().await;
    }

    #[tokio::test]
    async fn test_delete_object() {
        let mut server = Server::new_async().await;
        let mock = server
            .mock("DELETE", "/buckets/test-bucket/objects/test-key")
            .with_status(204)
            .create_async()
            .await;

        let client = ObjectStoreClient::new(server.url());
        let result = client.delete_object("test-bucket", "test-key").await;

        assert!(result.is_ok());
        mock.assert_async().await;
    }

    #[tokio::test]
    async fn test_delete_object_not_found() {
        let mut server = Server::new_async().await;
        let mock = server
            .mock("DELETE", "/buckets/test-bucket/objects/test-key")
            .with_status(404)
            .with_body("Object not found")
            .create_async()
            .await;

        let client = ObjectStoreClient::new(server.url());
        let result = client.delete_object("test-bucket", "test-key").await;

        assert!(matches!(
            result.unwrap_err(),
            Error::NotFound(path) if path == "test-bucket/test-key"
        ));
        mock.assert_async().await;
    }

    #[tokio::test]
    async fn test_list_objects() {
        let mut server = Server::new_async().await;
        let mock = server
            .mock("GET", "/buckets/test-bucket/objects")
            .match_query(mockito::Matcher::AllOf(vec![
                mockito::Matcher::UrlEncoded("prefix".into(), "prefix/".into()),
                mockito::Matcher::UrlEncoded("max_keys".into(), "10".into()),
            ]))
            .with_status(200)
            .with_header("content-type", "application/json")
            .with_body(r#"{"objects":[{"key":"prefix/obj1","size":100,"etag":"etag1","last_modified":"2024-01-01T00:00:00Z","metadata":{}},{"key":"prefix/obj2","size":200,"etag":"etag2","last_modified":"2024-01-02T00:00:00Z","metadata":{}}]}"#)
            .create_async()
            .await;

        let client = ObjectStoreClient::new(server.url());
        let objects = client
            .list_objects("test-bucket", Some("prefix/"), Some(10))
            .await
            .unwrap();

        assert_eq!(objects.len(), 2);
        assert_eq!(objects[0].key, "prefix/obj1");
        assert_eq!(objects[1].key, "prefix/obj2");
        mock.assert_async().await;
    }

    #[tokio::test]
    async fn test_list_objects_no_params() {
        let mut server = Server::new_async().await;
        let mock = server
            .mock("GET", "/buckets/test-bucket/objects")
            .with_status(200)
            .with_header("content-type", "application/json")
            .with_body(r#"{"objects":[]}"#)
            .create_async()
            .await;

        let client = ObjectStoreClient::new(server.url());
        let objects = client.list_objects("test-bucket", None, None).await.unwrap();

        assert!(objects.is_empty());
        mock.assert_async().await;
    }

    #[tokio::test]
    async fn test_list_objects_bucket_not_found() {
        let mut server = Server::new_async().await;
        let mock = server
            .mock("GET", "/buckets/test-bucket/objects")
            .with_status(404)
            .with_body("Bucket not found")
            .create_async()
            .await;

        let client = ObjectStoreClient::new(server.url());
        let result = client.list_objects("test-bucket", None, None).await;

        assert!(matches!(
            result.unwrap_err(),
            Error::NotFound(name) if name == "test-bucket"
        ));
        mock.assert_async().await;
    }
}