1use std::collections::VecDeque;
16use std::io::Read;
17use std::sync::Arc;
18
19use ibmcloud_iam::token::TokenManager;
20use quick_xml::de::from_str;
21use reqwest;
22use serde;
23use serde::{Deserialize, Serialize};
24use tracing::error;
25
/// Boxed, type-erased error used as the error type of the fallible functions
/// in this file.
pub type Error = Box<dyn std::error::Error>;
27
28#[derive(Deserialize, Serialize, Debug)]
29pub struct ListAllMyBucketsResult {
30 #[serde(rename = "Owner")]
31 owner: Owner,
32 #[serde(rename = "Buckets")]
33 buckets: Buckets,
34}
35
36#[derive(Deserialize, Serialize, Debug)]
37pub struct Buckets {
38 #[serde(rename = "Bucket")]
39 list: Vec<Bucket>,
40}
41
42#[derive(Deserialize, Serialize, Debug)]
43pub struct Owner {
44 #[serde(rename = "$unflatten=ID")]
45 id: String,
46 #[serde(rename = "$unflatten=DisplayName")]
47 display_name: String,
48}
49
50#[derive(Deserialize, Serialize, Debug)]
51pub struct Bucket {
52 #[serde(rename = "$unflatten=Name")]
53 pub name: String,
54 #[serde(rename = "$unflatten=CreationDate")]
55 pub creation_date: String,
56}
57
58fn default_contents() -> Vec<Contents> {
59 Vec::new()
60}
61
62#[derive(Deserialize, Serialize, Debug, PartialEq)]
63pub struct ListBucketResult {
64 #[serde(rename = "Contents", default = "default_contents")]
65 contents: Vec<Contents>,
66 #[serde(rename = "$unflatten=KeyCount")]
67 key_count: u64,
68 #[serde(rename = "$unflatten=MaxKeys")]
69 max_keys: u64,
70 #[serde(rename = "$unflatten=NextContinuationToken")]
71 next_token: Option<String>,
72}
73
74#[derive(Deserialize, Serialize, Debug, PartialEq)]
75pub struct Contents {
76 #[serde(rename = "$unflatten=Key")]
77 pub key: String,
78 #[serde(rename = "$unflatten=LastModified")]
79 pub last_modified: String,
80 #[serde(rename = "$unflatten=ETag")]
81 pub etag: String,
82 #[serde(rename = "$unflatten=Size")]
83 pub size: u64,
84 #[serde(rename = "$unflatten=StorageClass")]
85 pub storage_class: String,
86}
87
/// Blocking object-storage client that authenticates every request with a
/// bearer token obtained from a `TokenManager`.
pub struct Client {
    /// Token manager asked for an access token before each request.
    pub(crate) tm: Arc<TokenManager>,
    /// Host name placed after `https://` (and after the bucket name, for
    /// bucket-scoped requests) when request URLs are built.
    pub(crate) endpoint: String,
    /// Underlying blocking HTTP client used for all requests.
    pub(crate) client: reqwest::blocking::Client,
}
93
94impl Client {
95 pub fn new(tm: Arc<TokenManager>, endpoint: &str) -> Self {
96 Self {
97 tm: tm,
98 endpoint: endpoint.to_string(),
99 client: reqwest::blocking::Client::new(),
100 }
101 }
102
103 pub fn list_buckets(&self, instance_id: &str) -> Result<Vec<Bucket>, Error> {
104 let c = &self.client;
105
106 let url = format!("https://{}/", self.endpoint);
107 let response = c
108 .get(url)
109 .header(
110 "Authorization",
111 format!("Bearer {}", self.tm.token()?.access_token),
112 )
113 .header("ibm-service-instance-id", instance_id.to_string())
114 .send()?;
115
116 let text: String = check_response(response)?.text()?;
117 let bucket_resp: ListAllMyBucketsResult = from_str(&text)?;
118
119 Ok(bucket_resp.buckets.list)
120 }
121
122 pub fn list_objects(
123 &self,
124 bucket: &str,
125 prefix: Option<String>,
126 start_after: Option<String>,
127 ) -> ObjectIterator {
128 ObjectIterator::new(self, bucket, prefix.clone(), start_after.clone())
129 }
130
131 fn _list_objects(
132 &self,
133 bucket: &str,
134 prefix: &Option<String>,
135 continuation_token: &Option<String>,
136 start_after: &Option<String>,
137 ) -> Result<ListBucketResult, Error> {
138 let c = &self.client;
139
140 let url = build_list_objects_url(
141 &self.endpoint,
142 bucket,
143 prefix,
144 continuation_token,
145 start_after,
146 )?;
147
148 let response = c
149 .get(url)
150 .header(
151 "Authorization",
152 format!("Bearer {}", self.tm.token()?.access_token),
153 )
154 .send()?;
155
156 let text: String = check_response(response)?.text()?;
157 let objlist: ListBucketResult = from_str(&text)?;
158 Ok(objlist)
159 }
160
161 pub fn get_object_at_range(
162 &self,
163 bucket: &str,
164 key: &str,
165 start: u64,
166 end: Option<u64>,
167 ) -> Result<Box<dyn Read>, Error> {
168 let c = &self.client;
169 let url = format!("https://{}.{}/{}", bucket, self.endpoint, key);
170
171 let mut end_str = "".to_string();
172 if let Some(e) = end {
173 end_str = format!("{}", e);
174 }
175
176 let response = c
177 .get(url)
178 .header(
179 "Authorization",
180 format!("Bearer {}", self.tm.token()?.access_token),
181 )
182 .header("Range", format!("bytes={}-{}", start, end_str))
183 .send()?;
184
185 let r = check_response(response)?;
186 Ok(Box::new(r))
187 }
188
189 pub fn get_object(&self, bucket: &str, key: &str) -> Result<Box<dyn Read>, Error> {
190 let c = &self.client;
191 let url = format!("https://{}.{}/{}", bucket, self.endpoint, key);
192
193 let response = c
194 .get(url)
195 .header(
196 "Authorization",
197 format!("Bearer {}", self.tm.token()?.access_token),
198 )
199 .send()?;
200
201 let r = check_response(response)?;
202 Ok(Box::new(r))
203 }
204
205 pub fn put_object<B: Into<reqwest::blocking::Body>>(
206 &self,
207 bucket: &str,
208 key: &str,
209 body: B,
210 ) -> Result<(), Error> {
211 let c = &self.client;
212 let url = format!("https://{}.{}/{}", bucket, self.endpoint, key);
213
214 let response = c
215 .put(url)
216 .header(
217 "Authorization",
218 format!("Bearer {}", self.tm.token()?.access_token),
219 )
220 .body(body)
221 .send()?;
222
223 let _r = check_response(response)?;
224 Ok(())
225 }
226
227 pub fn delete_object(&self, bucket: &str, key: &str) -> Result<(), Error> {
228 let c = &self.client;
229 let url = format!("https://{}.{}/{}", bucket, self.endpoint, key);
230
231 let response = c
232 .delete(url)
233 .header(
234 "Authorization",
235 format!("Bearer {}", self.tm.token()?.access_token),
236 )
237 .send()?;
238
239 check_response(response)?;
240 Ok(())
241 }
242}
243
244pub(crate) fn check_response(
245 response: reqwest::blocking::Response,
246) -> Result<reqwest::blocking::Response, Error> {
247 if !response.status().is_success() {
248 return Err(format!(
249 "request failed: code='{}' body='{:?}'",
250 response.status(),
251 response.text().unwrap()
252 )
253 .into());
254 }
255
256 Ok(response)
257}
258
/// Iterator over the objects of a bucket that fetches listing pages from the
/// client on demand.
pub struct ObjectIterator<'a> {
    /// Client used to request listing pages.
    client: &'a Client,
    /// Name of the bucket being listed.
    bucket: String,
    /// Optional `prefix` parameter passed with every page request.
    prefix: Option<String>,
    /// Token taken from the most recent page that carried one; `None` before
    /// the first page.
    continuation_token: Option<String>,
    /// Optional `start-after` parameter passed with every page request.
    start_after: Option<String>,
    /// Objects of the current page that have not been yielded yet.
    results: VecDeque<Contents>,
    /// Set once a page was empty or carried no continuation token.
    complete: bool,
}
268
269impl<'a> ObjectIterator<'a> {
270 pub fn new(
271 client: &'a Client,
272 bucket: &str,
273 prefix: Option<String>,
274 start_after: Option<String>,
275 ) -> Self {
276 Self {
277 client,
278 bucket: bucket.to_string(),
279 prefix: prefix,
280 continuation_token: None,
281 start_after: start_after,
282 results: VecDeque::new(),
283 complete: false,
284 }
285 }
286}
287
288impl Iterator for ObjectIterator<'_> {
289 type Item = Contents;
290
291 fn next(&mut self) -> Option<Self::Item> {
292 if self.results.len() < 1 {
293 if self.complete {
294 return None;
295 }
296
297 match self.client._list_objects(
298 &self.bucket,
299 &self.prefix,
300 &self.continuation_token,
301 &self.start_after,
302 ) {
303 Ok(mut v) => {
304 if v.contents.len() < 1 {
305 self.complete = true;
307 return None;
308 }
309
310 for o in v.contents.drain(..) {
311 self.results.push_back(o);
312 }
313 if v.next_token.is_some() {
314 self.continuation_token = v.next_token;
315 } else {
316 self.complete = true;
317 }
318 }
319 Err(e) => {
320 error!(e);
321 return None;
322 }
323 }
324 }
325
326 Some(self.results.pop_front().unwrap())
327 }
328}
329
330fn build_list_objects_url(
331 endpoint: &str,
332 bucket: &str,
333 prefix: &Option<String>,
334 continuation_token: &Option<String>,
335 start_after: &Option<String>,
336) -> Result<reqwest::Url, Error> {
337 let mut url = reqwest::Url::parse(&format!("https://{}.{}/?list-type=2", bucket, endpoint))?;
338
339 if let Some(tok) = continuation_token {
340 url.query_pairs_mut().append_pair("continuation-token", tok);
341 }
342
343 if let Some(pre) = prefix {
344 url.query_pairs_mut().append_pair("prefix", pre);
345 }
346
347 if let Some(after) = start_after {
348 url.query_pairs_mut().append_pair("start-after", after);
349 }
350
351 Ok(url)
352}
353
#[cfg(test)]
mod tests {
    use super::*;
    use quick_xml::se::to_string;

    /// Serializing a bucket listing produces the nested element layout.
    #[test]
    fn test_bucket_list_response() {
        // Three identical bucket entries.
        let list: Vec<Bucket> = (0..3)
            .map(|_| Bucket {
                name: "asdasdfasdfadfadf".to_string(),
                creation_date: "1238218238902389023890".to_string(),
            })
            .collect();

        let listing = ListAllMyBucketsResult {
            owner: Owner {
                id: "asdfasdfa".to_string(),
                display_name: "12315123".to_string(),
            },
            buckets: Buckets { list },
        };

        let expected = "<ListAllMyBucketsResult><Owner><ID>asdfasdfa</ID><DisplayName>12315123</DisplayName></Owner><Buckets><Bucket><Name>asdasdfasdfadfadf</Name><CreationDate>1238218238902389023890</CreationDate></Bucket><Bucket><Name>asdasdfasdfadfadf</Name><CreationDate>1238218238902389023890</CreationDate></Bucket><Bucket><Name>asdasdfasdfadfadf</Name><CreationDate>1238218238902389023890</CreationDate></Bucket></Buckets></ListAllMyBucketsResult>";

        let serialized = to_string(&listing).unwrap();
        assert_eq!(serialized, expected);
    }

    /// A listing without any `<Contents>` element parses to an empty page.
    #[test]
    fn test_list_objects_empty_bucket() {
        let input = r#"<?xml version="1.0" encoding="UTF-8" standalone="yes"?><ListBucketResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/"><Name>logbase</Name><Prefix></Prefix><KeyCount>0</KeyCount><MaxKeys>1000</MaxKeys><Delimiter></Delimiter><IsTruncated>false</IsTruncated></ListBucketResult>"#;

        let parsed: ListBucketResult = from_str(input).unwrap();

        let expected = ListBucketResult {
            contents: Vec::new(),
            key_count: 0,
            max_keys: 1000,
            next_token: None,
        };
        assert_eq!(parsed, expected);
    }

    /// `start-after` is appended as an encoded query pair after `list-type`.
    #[test]
    fn test_build_list_objects_url() {
        let start_after = Some("object-key/with/special=characters+001.stuff".to_string());
        let actual = build_list_objects_url(
            "cos.cloud.ibm.com",
            "test-bucket-123",
            &None,
            &None,
            &start_after,
        )
        .unwrap();

        let mut expected =
            reqwest::Url::parse("https://test-bucket-123.cos.cloud.ibm.com/").unwrap();
        expected
            .query_pairs_mut()
            .append_pair("list-type", "2")
            .append_pair(
                "start-after",
                "object-key/with/special=characters+001.stuff",
            );

        assert_eq!(actual, expected);
    }
}