ibmcloud_cos/
cos.rs

// Copyright 2022 Mathew Odden <mathewrodden@gmail.com>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::VecDeque;
16use std::io::Read;
17use std::sync::Arc;
18
19use ibmcloud_iam::token::TokenManager;
20use quick_xml::de::from_str;
21use reqwest;
22use serde;
23use serde::{Deserialize, Serialize};
24use tracing::error;
25
/// Catch-all error type used by every fallible function in this module.
///
/// Any value implementing `std::error::Error` (and plain strings, via the
/// standard `From` impls) converts into it with `?` or `.into()`.
pub type Error = Box<dyn std::error::Error>;
27
28#[derive(Deserialize, Serialize, Debug)]
29pub struct ListAllMyBucketsResult {
30    #[serde(rename = "Owner")]
31    owner: Owner,
32    #[serde(rename = "Buckets")]
33    buckets: Buckets,
34}
35
36#[derive(Deserialize, Serialize, Debug)]
37pub struct Buckets {
38    #[serde(rename = "Bucket")]
39    list: Vec<Bucket>,
40}
41
42#[derive(Deserialize, Serialize, Debug)]
43pub struct Owner {
44    #[serde(rename = "$unflatten=ID")]
45    id: String,
46    #[serde(rename = "$unflatten=DisplayName")]
47    display_name: String,
48}
49
50#[derive(Deserialize, Serialize, Debug)]
51pub struct Bucket {
52    #[serde(rename = "$unflatten=Name")]
53    pub name: String,
54    #[serde(rename = "$unflatten=CreationDate")]
55    pub creation_date: String,
56}
57
58fn default_contents() -> Vec<Contents> {
59    Vec::new()
60}
61
62#[derive(Deserialize, Serialize, Debug, PartialEq)]
63pub struct ListBucketResult {
64    #[serde(rename = "Contents", default = "default_contents")]
65    contents: Vec<Contents>,
66    #[serde(rename = "$unflatten=KeyCount")]
67    key_count: u64,
68    #[serde(rename = "$unflatten=MaxKeys")]
69    max_keys: u64,
70    #[serde(rename = "$unflatten=NextContinuationToken")]
71    next_token: Option<String>,
72}
73
74#[derive(Deserialize, Serialize, Debug, PartialEq)]
75pub struct Contents {
76    #[serde(rename = "$unflatten=Key")]
77    pub key: String,
78    #[serde(rename = "$unflatten=LastModified")]
79    pub last_modified: String,
80    #[serde(rename = "$unflatten=ETag")]
81    pub etag: String,
82    #[serde(rename = "$unflatten=Size")]
83    pub size: u64,
84    #[serde(rename = "$unflatten=StorageClass")]
85    pub storage_class: String,
86}
87
88pub struct Client {
89    pub(crate) tm: Arc<TokenManager>,
90    pub(crate) endpoint: String,
91    pub(crate) client: reqwest::blocking::Client,
92}
93
94impl Client {
95    pub fn new(tm: Arc<TokenManager>, endpoint: &str) -> Self {
96        Self {
97            tm: tm,
98            endpoint: endpoint.to_string(),
99            client: reqwest::blocking::Client::new(),
100        }
101    }
102
103    pub fn list_buckets(&self, instance_id: &str) -> Result<Vec<Bucket>, Error> {
104        let c = &self.client;
105
106        let url = format!("https://{}/", self.endpoint);
107        let response = c
108            .get(url)
109            .header(
110                "Authorization",
111                format!("Bearer {}", self.tm.token()?.access_token),
112            )
113            .header("ibm-service-instance-id", instance_id.to_string())
114            .send()?;
115
116        let text: String = check_response(response)?.text()?;
117        let bucket_resp: ListAllMyBucketsResult = from_str(&text)?;
118
119        Ok(bucket_resp.buckets.list)
120    }
121
122    pub fn list_objects(
123        &self,
124        bucket: &str,
125        prefix: Option<String>,
126        start_after: Option<String>,
127    ) -> ObjectIterator {
128        ObjectIterator::new(self, bucket, prefix.clone(), start_after.clone())
129    }
130
131    fn _list_objects(
132        &self,
133        bucket: &str,
134        prefix: &Option<String>,
135        continuation_token: &Option<String>,
136        start_after: &Option<String>,
137    ) -> Result<ListBucketResult, Error> {
138        let c = &self.client;
139
140        let url = build_list_objects_url(
141            &self.endpoint,
142            bucket,
143            prefix,
144            continuation_token,
145            start_after,
146        )?;
147
148        let response = c
149            .get(url)
150            .header(
151                "Authorization",
152                format!("Bearer {}", self.tm.token()?.access_token),
153            )
154            .send()?;
155
156        let text: String = check_response(response)?.text()?;
157        let objlist: ListBucketResult = from_str(&text)?;
158        Ok(objlist)
159    }
160
161    pub fn get_object_at_range(
162        &self,
163        bucket: &str,
164        key: &str,
165        start: u64,
166        end: Option<u64>,
167    ) -> Result<Box<dyn Read>, Error> {
168        let c = &self.client;
169        let url = format!("https://{}.{}/{}", bucket, self.endpoint, key);
170
171        let mut end_str = "".to_string();
172        if let Some(e) = end {
173            end_str = format!("{}", e);
174        }
175
176        let response = c
177            .get(url)
178            .header(
179                "Authorization",
180                format!("Bearer {}", self.tm.token()?.access_token),
181            )
182            .header("Range", format!("bytes={}-{}", start, end_str))
183            .send()?;
184
185        let r = check_response(response)?;
186        Ok(Box::new(r))
187    }
188
189    pub fn get_object(&self, bucket: &str, key: &str) -> Result<Box<dyn Read>, Error> {
190        let c = &self.client;
191        let url = format!("https://{}.{}/{}", bucket, self.endpoint, key);
192
193        let response = c
194            .get(url)
195            .header(
196                "Authorization",
197                format!("Bearer {}", self.tm.token()?.access_token),
198            )
199            .send()?;
200
201        let r = check_response(response)?;
202        Ok(Box::new(r))
203    }
204
205    pub fn put_object<B: Into<reqwest::blocking::Body>>(
206        &self,
207        bucket: &str,
208        key: &str,
209        body: B,
210    ) -> Result<(), Error> {
211        let c = &self.client;
212        let url = format!("https://{}.{}/{}", bucket, self.endpoint, key);
213
214        let response = c
215            .put(url)
216            .header(
217                "Authorization",
218                format!("Bearer {}", self.tm.token()?.access_token),
219            )
220            .body(body)
221            .send()?;
222
223        let _r = check_response(response)?;
224        Ok(())
225    }
226
227    pub fn delete_object(&self, bucket: &str, key: &str) -> Result<(), Error> {
228        let c = &self.client;
229        let url = format!("https://{}.{}/{}", bucket, self.endpoint, key);
230
231        let response = c
232            .delete(url)
233            .header(
234                "Authorization",
235                format!("Bearer {}", self.tm.token()?.access_token),
236            )
237            .send()?;
238
239        check_response(response)?;
240        Ok(())
241    }
242}
243
244pub(crate) fn check_response(
245    response: reqwest::blocking::Response,
246) -> Result<reqwest::blocking::Response, Error> {
247    if !response.status().is_success() {
248        return Err(format!(
249            "request failed: code='{}' body='{:?}'",
250            response.status(),
251            response.text().unwrap()
252        )
253        .into());
254    }
255
256    Ok(response)
257}
258
259pub struct ObjectIterator<'a> {
260    client: &'a Client,
261    bucket: String,
262    prefix: Option<String>,
263    continuation_token: Option<String>,
264    start_after: Option<String>,
265    results: VecDeque<Contents>,
266    complete: bool,
267}
268
269impl<'a> ObjectIterator<'a> {
270    pub fn new(
271        client: &'a Client,
272        bucket: &str,
273        prefix: Option<String>,
274        start_after: Option<String>,
275    ) -> Self {
276        Self {
277            client,
278            bucket: bucket.to_string(),
279            prefix: prefix,
280            continuation_token: None,
281            start_after: start_after,
282            results: VecDeque::new(),
283            complete: false,
284        }
285    }
286}
287
288impl Iterator for ObjectIterator<'_> {
289    type Item = Contents;
290
291    fn next(&mut self) -> Option<Self::Item> {
292        if self.results.len() < 1 {
293            if self.complete {
294                return None;
295            }
296
297            match self.client._list_objects(
298                &self.bucket,
299                &self.prefix,
300                &self.continuation_token,
301                &self.start_after,
302            ) {
303                Ok(mut v) => {
304                    if v.contents.len() < 1 {
305                        // empty bucket
306                        self.complete = true;
307                        return None;
308                    }
309
310                    for o in v.contents.drain(..) {
311                        self.results.push_back(o);
312                    }
313                    if v.next_token.is_some() {
314                        self.continuation_token = v.next_token;
315                    } else {
316                        self.complete = true;
317                    }
318                }
319                Err(e) => {
320                    error!(e);
321                    return None;
322                }
323            }
324        }
325
326        Some(self.results.pop_front().unwrap())
327    }
328}
329
330fn build_list_objects_url(
331    endpoint: &str,
332    bucket: &str,
333    prefix: &Option<String>,
334    continuation_token: &Option<String>,
335    start_after: &Option<String>,
336) -> Result<reqwest::Url, Error> {
337    let mut url = reqwest::Url::parse(&format!("https://{}.{}/?list-type=2", bucket, endpoint))?;
338
339    if let Some(tok) = continuation_token {
340        url.query_pairs_mut().append_pair("continuation-token", tok);
341    }
342
343    if let Some(pre) = prefix {
344        url.query_pairs_mut().append_pair("prefix", pre);
345    }
346
347    if let Some(after) = start_after {
348        url.query_pairs_mut().append_pair("start-after", after);
349    }
350
351    Ok(url)
352}
353
#[cfg(test)]
mod tests {
    use super::*;
    use quick_xml::se::to_string;

    /// Serializing a bucket listing produces the expected XML document.
    #[test]
    fn test_bucket_list_response() {
        // Three identical bucket entries.
        let list: Vec<Bucket> = (0..3)
            .map(|_| Bucket {
                name: "asdasdfasdfadfadf".to_string(),
                creation_date: "1238218238902389023890".to_string(),
            })
            .collect();

        let res = ListAllMyBucketsResult {
            owner: Owner {
                id: "asdfasdfa".to_string(),
                display_name: "12315123".to_string(),
            },
            buckets: Buckets { list },
        };

        let exp = "<ListAllMyBucketsResult><Owner><ID>asdfasdfa</ID><DisplayName>12315123</DisplayName></Owner><Buckets><Bucket><Name>asdasdfasdfadfadf</Name><CreationDate>1238218238902389023890</CreationDate></Bucket><Bucket><Name>asdasdfasdfadfadf</Name><CreationDate>1238218238902389023890</CreationDate></Bucket><Bucket><Name>asdasdfasdfadfadf</Name><CreationDate>1238218238902389023890</CreationDate></Bucket></Buckets></ListAllMyBucketsResult>";

        assert_eq!(to_string(&res).unwrap(), exp);
    }

    /// A listing without any `<Contents>` element parses to an empty list.
    #[test]
    fn test_list_objects_empty_bucket() {
        let input = r#"<?xml version="1.0" encoding="UTF-8" standalone="yes"?><ListBucketResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/"><Name>logbase</Name><Prefix></Prefix><KeyCount>0</KeyCount><MaxKeys>1000</MaxKeys><Delimiter></Delimiter><IsTruncated>false</IsTruncated></ListBucketResult>"#;

        let objs: ListBucketResult = from_str(input).unwrap();

        let exp = ListBucketResult {
            contents: vec![],
            key_count: 0,
            max_keys: 1000,
            next_token: None,
        };
        assert_eq!(objs, exp);
    }

    /// The `start-after` value is query-encoded the same way the URL
    /// serializer encodes it.
    #[test]
    fn test_build_list_objects_url() {
        let start_after = "object-key/with/special=characters+001.stuff";

        let mut expected =
            reqwest::Url::parse("https://test-bucket-123.cos.cloud.ibm.com/").unwrap();
        expected
            .query_pairs_mut()
            .append_pair("list-type", "2")
            .append_pair("start-after", start_after);

        let actual = build_list_objects_url(
            "cos.cloud.ibm.com",
            "test-bucket-123",
            &None,
            &None,
            &Some(start_after.to_string()),
        );

        assert_eq!(actual.unwrap(), expected);
    }
}
424}