Skip to main content

oci_api/services/object_storage/
client.rs

1//! Object Storage client
2
3use crate::client::Oci;
4use crate::client::request_executor::{RequestPayload, RequestTarget};
5use crate::error::{Error, Result};
6use crate::services::object_storage::models::*;
7use base64::{Engine as _, engine::general_purpose::STANDARD as BASE64};
8use reqwest::Method;
9use serde::Deserialize;
10use serde::Serialize;
11use serde::de::DeserializeOwned;
12use sha2::{Digest as ShaDigest, Sha256, Sha384};
13
14/// Object Storage Service Client
15#[derive(Clone)]
16pub struct ObjectStorage {
17    /// OCI HTTP client
18    oci_client: Oci,
19    /// Namespace name
20    pub namespace: String,
21    /// Endpoint (Host)
22    endpoint: String,
23    /// Protocol (http/https)
24    protocol: String,
25}
26
27impl ObjectStorage {
28    /// Create new Object Storage client
29    ///
30    /// # Arguments
31    /// * `oci_client` - OCI HTTP client
32    /// * `namespace` - Object Storage Namespace
33    pub fn new(oci_client: &Oci, namespace: impl Into<String>) -> Self {
34        let region = oci_client.region().to_string();
35        let endpoint = format!("objectstorage.{region}.{}", oci_client.realm_domain());
36
37        Self {
38            oci_client: oci_client.clone(),
39            namespace: namespace.into(),
40            endpoint,
41            protocol: "https".to_string(),
42        }
43    }
44
45    /// Get Bucket
46    ///
47    /// # Arguments
48    /// * `bucket_name` - Bucket name
49    pub async fn get_bucket(&self, bucket_name: &str) -> Result<Bucket> {
50        let path = format!("/n/{}/b/{}/", self.namespace, bucket_name);
51        self.oci_client
52            .executor()
53            .execute(
54                Method::GET,
55                RequestTarget {
56                    scheme: &self.protocol,
57                    host: &self.endpoint,
58                    path: &path,
59                },
60                RequestPayload {
61                    body: None,
62                    content_type: None,
63                    extra_headers: Vec::new(),
64                },
65            )
66            .await?;
67        Ok(Bucket {
68            oci_client: self.oci_client.clone(),
69            namespace: self.namespace.clone(),
70            name: bucket_name.to_string(),
71            endpoint: self.endpoint.clone(),
72            protocol: self.protocol.clone(),
73        })
74    }
75}
76
77/// Bucket
78#[derive(Clone)]
79pub struct Bucket {
80    /// OCI HTTP client
81    oci_client: Oci,
82    /// Namespace
83    pub namespace: String,
84    /// Bucket name
85    pub name: String,
86    /// Endpoint (Host)
87    endpoint: String,
88    /// Protocol (http/https)
89    protocol: String,
90}
91
92impl Bucket {
93    async fn execute(
94        &self,
95        method: Method,
96        path: &str,
97        body: Option<String>,
98        content_type: Option<&str>,
99        extra_headers: Vec<(String, String)>,
100    ) -> Result<reqwest::Response> {
101        self.oci_client
102            .executor()
103            .execute(
104                method,
105                RequestTarget {
106                    scheme: &self.protocol,
107                    host: &self.endpoint,
108                    path,
109                },
110                RequestPayload {
111                    body,
112                    content_type,
113                    extra_headers,
114                },
115            )
116            .await
117    }
118
119    // Helper for making requests
120    async fn request<T, B>(&self, method: &str, path: &str, body: Option<B>) -> Result<T>
121    where
122        T: DeserializeOwned,
123        B: Serialize,
124    {
125        let body_str = if let Some(b) = &body {
126            Some(serde_json::to_string(b)?)
127        } else {
128            None
129        };
130        let method = parse_method(method)?;
131        let response = self
132            .execute(method, path, body_str, Some("application/json"), Vec::new())
133            .await?;
134        let text = response.text().await?;
135        serde_json::from_str(&text).map_err(Into::into)
136    }
137
138    async fn request_no_content<B>(&self, method: &str, path: &str, body: Option<B>) -> Result<()>
139    where
140        B: Serialize,
141    {
142        let body_str = if let Some(b) = &body {
143            Some(serde_json::to_string(b)?)
144        } else {
145            None
146        };
147        let method = parse_method(method)?;
148        self.execute(method, path, body_str, Some("application/json"), Vec::new())
149            .await?;
150        Ok(())
151    }
152
153    /// Put Object
154    ///
155    /// # Arguments
156    /// * `object_name` - Object name
157    /// * `content` - Object content
158    pub async fn put_object(&self, object_name: &str, content: &str) -> Result<Object> {
159        self.put_object_internal(object_name, content, None).await
160    }
161
162    /// Put Object with Checksum
163    ///
164    /// # Arguments
165    /// * `object_name` - Object name
166    /// * `content` - Object content
167    /// * `algorithm` - Checksum algorithm to use
168    pub async fn put_object_with_checksum(
169        &self,
170        object_name: &str,
171        content: &str,
172        algorithm: ChecksumAlgorithm,
173    ) -> Result<Object> {
174        self.put_object_internal(object_name, content, Some(algorithm))
175            .await
176    }
177
178    async fn put_object_internal(
179        &self,
180        object_name: &str,
181        content: &str,
182        algorithm: Option<ChecksumAlgorithm>,
183    ) -> Result<Object> {
184        let path = format!("/n/{}/b/{}/o/{}", self.namespace, self.name, object_name);
185        let mut extra_headers = Vec::new();
186
187        if let Some(algo) = algorithm {
188            let data = content.as_bytes();
189            match algo {
190                ChecksumAlgorithm::SHA256 => {
191                    let mut hasher = Sha256::new();
192                    hasher.update(data);
193                    let result = hasher.finalize();
194                    let b64 = BASE64.encode(result);
195                    extra_headers.push(("opc-checksum-algorithm".to_owned(), "SHA256".to_owned()));
196                    extra_headers.push(("opc-content-sha256".to_owned(), b64));
197                }
198                ChecksumAlgorithm::SHA384 => {
199                    let mut hasher = Sha384::new();
200                    hasher.update(data);
201                    let result = hasher.finalize();
202                    let b64 = BASE64.encode(result);
203                    extra_headers.push(("opc-checksum-algorithm".to_owned(), "SHA384".to_owned()));
204                    extra_headers.push(("opc-content-sha384".to_owned(), b64));
205                }
206                ChecksumAlgorithm::CRC32C => {
207                    let crc = crc32c::crc32c(data);
208                    let bytes = crc.to_be_bytes();
209                    let b64 = BASE64.encode(bytes);
210                    extra_headers.push(("opc-checksum-algorithm".to_owned(), "CRC32C".to_owned()));
211                    extra_headers.push(("opc-content-crc32c".to_owned(), b64));
212                }
213            }
214        }
215
216        let response = self
217            .execute(
218                Method::PUT,
219                &path,
220                Some(content.to_owned()),
221                Some("application/octet-stream"),
222                extra_headers,
223            )
224            .await?;
225        let headers = response.headers();
226        let md5 = headers
227            .get("opc-content-md5")
228            .and_then(|h| h.to_str().ok())
229            .ok_or_else(|| Error::Other("Missing required header: opc-content-md5".to_string()))?
230            .to_string();
231
232        let mut checksum = None;
233
234        if let Some(val) = headers
235            .get("opc-content-sha256")
236            .and_then(|h| h.to_str().ok())
237        {
238            checksum = Some(Checksum {
239                algorithm: ChecksumAlgorithm::SHA256,
240                value: val.to_string(),
241            });
242        } else if let Some(val) = headers
243            .get("opc-content-sha384")
244            .and_then(|h| h.to_str().ok())
245        {
246            checksum = Some(Checksum {
247                algorithm: ChecksumAlgorithm::SHA384,
248                value: val.to_string(),
249            });
250        } else if let Some(val) = headers
251            .get("opc-content-crc32c")
252            .and_then(|h| h.to_str().ok())
253        {
254            checksum = Some(Checksum {
255                algorithm: ChecksumAlgorithm::CRC32C,
256                value: val.to_string(),
257            });
258        }
259
260        Ok(Object {
261            name: object_name.to_string(),
262            value: content.to_string(),
263            md5,
264            checksum,
265        })
266    }
267
268    /// Get Object
269    ///
270    /// # Arguments
271    /// * `object_name` - Object name
272    pub async fn get_object(&self, object_name: &str) -> Result<Object> {
273        let path = format!("/n/{}/b/{}/o/{}", self.namespace, self.name, object_name);
274        let response = self
275            .execute(Method::GET, &path, None, None, Vec::new())
276            .await?;
277
278        let headers = response.headers();
279        let md5 = headers
280            .get("content-md5")
281            .or_else(|| headers.get("opc-multipart-md5"))
282            .and_then(|h| h.to_str().ok())
283            .ok_or_else(|| Error::Other("Missing required header: content-md5".to_string()))?
284            .to_string();
285
286        let mut checksum = None;
287
288        if let Some(val) = headers
289            .get("opc-content-sha256")
290            .and_then(|h| h.to_str().ok())
291        {
292            checksum = Some(Checksum {
293                algorithm: ChecksumAlgorithm::SHA256,
294                value: val.to_string(),
295            });
296        } else if let Some(val) = headers
297            .get("opc-content-sha384")
298            .and_then(|h| h.to_str().ok())
299        {
300            checksum = Some(Checksum {
301                algorithm: ChecksumAlgorithm::SHA384,
302                value: val.to_string(),
303            });
304        } else if let Some(val) = headers
305            .get("opc-content-crc32c")
306            .and_then(|h| h.to_str().ok())
307        {
308            checksum = Some(Checksum {
309                algorithm: ChecksumAlgorithm::CRC32C,
310                value: val.to_string(),
311            });
312        }
313
314        let value = response.text().await?;
315
316        Ok(Object {
317            name: object_name.to_string(),
318            value,
319            md5,
320            checksum,
321        })
322    }
323
324    /// Get or Create Object
325    ///
326    /// Tries to get the object. If it doesn't exist (404), creates it with the provided content.
327    ///
328    /// # Arguments
329    /// * `object_name` - Object name
330    /// * `content` - Content to use if object needs to be created
331    pub async fn get_or_create_object(&self, object_name: &str, content: &str) -> Result<Object> {
332        match self.get_object(object_name).await {
333            Ok(obj) => Ok(obj),
334            Err(Error::ApiError { code, .. }) if code.contains("404") => {
335                self.put_object(object_name, content).await
336            }
337            Err(e) => Err(e),
338        }
339    }
340
341    /// Get Retention Rules
342    pub async fn get_retention_rules(&self) -> Result<Vec<RetentionRule>> {
343        let path = format!("/n/{}/b/{}/retentionRules", self.namespace, self.name);
344
345        #[derive(Deserialize)]
346        struct ResponseWrapper {
347            items: Vec<RetentionRule>,
348        }
349
350        let wrapper: ResponseWrapper = self
351            .request::<ResponseWrapper, ()>("GET", &path, None)
352            .await?;
353        Ok(wrapper.items)
354    }
355
356    /// Create Retention Rule
357    pub async fn create_retention_rule(
358        &self,
359        details: RetentionRuleDetails,
360    ) -> Result<RetentionRule> {
361        let path = format!("/n/{}/b/{}/retentionRules", self.namespace, self.name);
362        self.request("POST", &path, Some(details)).await
363    }
364
365    /// Get Retention Rule
366    pub async fn get_retention_rule(&self, rule_id: &str) -> Result<RetentionRule> {
367        let path = format!(
368            "/n/{}/b/{}/retentionRules/{}",
369            self.namespace, self.name, rule_id
370        );
371        self.request("GET", &path, None::<()>).await
372    }
373
374    /// Update Retention Rule
375    pub async fn update_retention_rule(
376        &self,
377        rule_or_id: impl Into<String>,
378        details: RetentionRuleDetails,
379    ) -> Result<RetentionRule> {
380        let rule_id = rule_or_id.into();
381        let path = format!(
382            "/n/{}/b/{}/retentionRules/{}",
383            self.namespace, self.name, rule_id
384        );
385        self.request("PUT", &path, Some(details)).await
386    }
387
388    /// Delete Retention Rule
389    pub async fn delete_retention_rule(&self, rule_or_id: impl Into<String>) -> Result<()> {
390        let rule_id = rule_or_id.into();
391        let path = format!(
392            "/n/{}/b/{}/retentionRules/{}",
393            self.namespace, self.name, rule_id
394        );
395        self.request_no_content("DELETE", &path, None::<()>).await
396    }
397}
398
399fn parse_method(method: &str) -> Result<Method> {
400    Method::from_bytes(method.as_bytes())
401        .map_err(|e| Error::Other(format!("Unsupported method {method}: {e}")))
402}
403
404#[cfg(test)]
405#[path = "tests.rs"]
406mod tests;