1use std::{path::Path, sync::Arc};
2
3use bytes::Bytes;
4use toolcraft_utils::{presign_get_object, presign_put_object, sign_request};
5
6use crate::{
7 client::S3Client,
8 error::Result,
9 util::{ObjectInfo, check_status, parse_object_list, url_encode},
10};
11
/// Handle for operating on one bucket through a shared `S3Client`.
///
/// The derived `Clone` clones the `Arc` and the bucket name, so every clone
/// keeps pointing at the same underlying `S3Client`.
#[derive(Clone)]
pub struct BucketClient {
    /// Shared client whose credentials, region, endpoint and HTTP handle the
    /// methods of this type read.
    inner: Arc<S3Client>,
    /// Name of the bucket that every operation on this handle targets.
    bucket: String,
}
26
27impl BucketClient {
30 pub fn new(client: Arc<S3Client>, bucket: impl Into<String>) -> Self {
31 Self {
32 inner: client,
33 bucket: bucket.into(),
34 }
35 }
36}
37
38impl BucketClient {
41 pub async fn list_objects(&self, prefix: Option<&str>) -> Result<Vec<ObjectInfo>> {
42 let c = &self.inner;
43 let path = format!("/{}", self.bucket);
44 let query = match prefix {
45 Some(p) => format!("list-type=2&prefix={}", url_encode(p)),
46 None => "list-type=2".to_string(),
47 };
48 let auth = sign_request(
49 "GET",
50 &c.access_key,
51 &c.secret_key,
52 &c.host(),
53 &path,
54 &query,
55 Some(&c.region),
56 );
57
58 let resp = c
59 .http
60 .get(format!("{}?{}", c.url(&path), query))
61 .header("host", c.host())
62 .header("x-amz-date", &auth.x_amz_date)
63 .header("x-amz-content-sha256", &auth.x_amz_content_sha256)
64 .header("authorization", &auth.authorization)
65 .send()
66 .await?;
67
68 let xml = check_status(resp).await?.text().await?;
69 parse_object_list(&xml)
70 }
71
72 pub async fn upload_bytes(
74 &self,
75 key: &str,
76 data: Bytes,
77 content_type: Option<&str>,
78 ) -> Result<()> {
79 let c = &self.inner;
80 let url = presign_put_object(
81 &c.access_key,
82 &c.secret_key,
83 &self.bucket,
84 key,
85 Some(&c.region),
86 c.base_url.as_str(),
87 None,
88 );
89
90 let mut req = c.http.put(&url).body(data);
91 if let Some(ct) = content_type {
92 req = req.header("content-type", ct);
93 }
94 check_status(req.send().await?).await.map(|_| ())
95 }
96
97 pub async fn upload_local_file<P: AsRef<Path>>(
99 &self,
100 key: &str,
101 local_path: P,
102 content_type: Option<&str>,
103 ) -> Result<u64> {
104 let bytes = tokio::fs::read(local_path.as_ref()).await?;
105 let size = bytes.len() as u64;
106 self.upload_bytes(key, Bytes::from(bytes), content_type)
107 .await?;
108 Ok(size)
109 }
110
111 pub async fn upload_file(
113 &self,
114 key: &str,
115 data: Bytes,
116 content_type: Option<&str>,
117 ) -> Result<()> {
118 self.upload_bytes(key, data, content_type).await
119 }
120
121 pub async fn download_object(&self, key: &str) -> Result<Bytes> {
122 let c = &self.inner;
123 let url = presign_get_object(
124 &c.access_key,
125 &c.secret_key,
126 &self.bucket,
127 key,
128 Some(&c.region),
129 c.base_url.as_str(),
130 None,
131 );
132
133 let resp = check_status(c.http.get(&url).send().await?).await?;
134 Ok(resp.bytes().await?)
135 }
136
137 pub async fn delete_object(&self, key: &str) -> Result<()> {
138 let c = &self.inner;
139 let path = format!("/{}/{}", self.bucket, key.trim_start_matches('/'));
140 let auth = sign_request(
141 "DELETE",
142 &c.access_key,
143 &c.secret_key,
144 &c.host(),
145 &path,
146 "",
147 Some(&c.region),
148 );
149
150 let resp = c
151 .http
152 .delete(c.url(&path))
153 .header("host", c.host())
154 .header("x-amz-date", &auth.x_amz_date)
155 .header("x-amz-content-sha256", &auth.x_amz_content_sha256)
156 .header("authorization", &auth.authorization)
157 .send()
158 .await?;
159
160 check_status(resp).await.map(|_| ())
161 }
162
163 pub fn presign_upload(&self, key: &str, expires_secs: Option<u64>) -> String {
165 let c = &self.inner;
166 presign_put_object(
167 &c.access_key,
168 &c.secret_key,
169 &self.bucket,
170 key,
171 Some(&c.region),
172 c.base_url.as_str(),
173 expires_secs,
174 )
175 }
176}