1use crate::client::Oci;
4use crate::client::request_executor::{RequestPayload, RequestTarget};
5use crate::error::{Error, Result};
6use crate::services::object_storage::models::*;
7use base64::{Engine as _, engine::general_purpose::STANDARD as BASE64};
8use reqwest::Method;
9use serde::Deserialize;
10use serde::Serialize;
11use serde::de::DeserializeOwned;
12use sha2::{Digest as ShaDigest, Sha256, Sha384};
13
14#[derive(Clone)]
16pub struct ObjectStorage {
17 oci_client: Oci,
19 pub namespace: String,
21 endpoint: String,
23 protocol: String,
25}
26
27impl ObjectStorage {
28 pub fn new(oci_client: &Oci, namespace: impl Into<String>) -> Self {
34 let region = oci_client.region().to_string();
35 let endpoint = format!("objectstorage.{region}.{}", oci_client.realm_domain());
36
37 Self {
38 oci_client: oci_client.clone(),
39 namespace: namespace.into(),
40 endpoint,
41 protocol: "https".to_string(),
42 }
43 }
44
45 pub async fn get_bucket(&self, bucket_name: &str) -> Result<Bucket> {
50 let path = format!("/n/{}/b/{}/", self.namespace, bucket_name);
51 self.oci_client
52 .executor()
53 .execute(
54 Method::GET,
55 RequestTarget {
56 scheme: &self.protocol,
57 host: &self.endpoint,
58 path: &path,
59 },
60 RequestPayload {
61 body: None,
62 content_type: None,
63 extra_headers: Vec::new(),
64 },
65 )
66 .await?;
67 Ok(Bucket {
68 oci_client: self.oci_client.clone(),
69 namespace: self.namespace.clone(),
70 name: bucket_name.to_string(),
71 endpoint: self.endpoint.clone(),
72 protocol: self.protocol.clone(),
73 })
74 }
75}
76
/// Handle to one bucket inside a namespace.
///
/// Every object and retention-rule method on this type builds its request path
/// from `namespace` and `name` and sends it through the stored client's executor.
#[derive(Clone)]
pub struct Bucket {
    /// Client whose request executor performs the HTTP calls.
    oci_client: Oci,
    /// Namespace used for the `/n/{namespace}` path segment.
    pub namespace: String,
    /// Bucket name used for the `/b/{name}` path segment.
    pub name: String,
    /// Host name passed to the executor as the request target's host.
    endpoint: String,
    /// URL scheme passed to the executor as the request target's scheme.
    protocol: String,
}
91
92impl Bucket {
93 async fn execute(
94 &self,
95 method: Method,
96 path: &str,
97 body: Option<String>,
98 content_type: Option<&str>,
99 extra_headers: Vec<(String, String)>,
100 ) -> Result<reqwest::Response> {
101 self.oci_client
102 .executor()
103 .execute(
104 method,
105 RequestTarget {
106 scheme: &self.protocol,
107 host: &self.endpoint,
108 path,
109 },
110 RequestPayload {
111 body,
112 content_type,
113 extra_headers,
114 },
115 )
116 .await
117 }
118
119 async fn request<T, B>(&self, method: &str, path: &str, body: Option<B>) -> Result<T>
121 where
122 T: DeserializeOwned,
123 B: Serialize,
124 {
125 let body_str = if let Some(b) = &body {
126 Some(serde_json::to_string(b)?)
127 } else {
128 None
129 };
130 let method = parse_method(method)?;
131 let response = self
132 .execute(method, path, body_str, Some("application/json"), Vec::new())
133 .await?;
134 let text = response.text().await?;
135 serde_json::from_str(&text).map_err(Into::into)
136 }
137
138 async fn request_no_content<B>(&self, method: &str, path: &str, body: Option<B>) -> Result<()>
139 where
140 B: Serialize,
141 {
142 let body_str = if let Some(b) = &body {
143 Some(serde_json::to_string(b)?)
144 } else {
145 None
146 };
147 let method = parse_method(method)?;
148 self.execute(method, path, body_str, Some("application/json"), Vec::new())
149 .await?;
150 Ok(())
151 }
152
153 pub async fn put_object(&self, object_name: &str, content: &str) -> Result<Object> {
159 self.put_object_internal(object_name, content, None).await
160 }
161
162 pub async fn put_object_with_checksum(
169 &self,
170 object_name: &str,
171 content: &str,
172 algorithm: ChecksumAlgorithm,
173 ) -> Result<Object> {
174 self.put_object_internal(object_name, content, Some(algorithm))
175 .await
176 }
177
178 async fn put_object_internal(
179 &self,
180 object_name: &str,
181 content: &str,
182 algorithm: Option<ChecksumAlgorithm>,
183 ) -> Result<Object> {
184 let path = format!("/n/{}/b/{}/o/{}", self.namespace, self.name, object_name);
185 let mut extra_headers = Vec::new();
186
187 if let Some(algo) = algorithm {
188 let data = content.as_bytes();
189 match algo {
190 ChecksumAlgorithm::SHA256 => {
191 let mut hasher = Sha256::new();
192 hasher.update(data);
193 let result = hasher.finalize();
194 let b64 = BASE64.encode(result);
195 extra_headers.push(("opc-checksum-algorithm".to_owned(), "SHA256".to_owned()));
196 extra_headers.push(("opc-content-sha256".to_owned(), b64));
197 }
198 ChecksumAlgorithm::SHA384 => {
199 let mut hasher = Sha384::new();
200 hasher.update(data);
201 let result = hasher.finalize();
202 let b64 = BASE64.encode(result);
203 extra_headers.push(("opc-checksum-algorithm".to_owned(), "SHA384".to_owned()));
204 extra_headers.push(("opc-content-sha384".to_owned(), b64));
205 }
206 ChecksumAlgorithm::CRC32C => {
207 let crc = crc32c::crc32c(data);
208 let bytes = crc.to_be_bytes();
209 let b64 = BASE64.encode(bytes);
210 extra_headers.push(("opc-checksum-algorithm".to_owned(), "CRC32C".to_owned()));
211 extra_headers.push(("opc-content-crc32c".to_owned(), b64));
212 }
213 }
214 }
215
216 let response = self
217 .execute(
218 Method::PUT,
219 &path,
220 Some(content.to_owned()),
221 Some("application/octet-stream"),
222 extra_headers,
223 )
224 .await?;
225 let headers = response.headers();
226 let md5 = headers
227 .get("opc-content-md5")
228 .and_then(|h| h.to_str().ok())
229 .ok_or_else(|| Error::Other("Missing required header: opc-content-md5".to_string()))?
230 .to_string();
231
232 let mut checksum = None;
233
234 if let Some(val) = headers
235 .get("opc-content-sha256")
236 .and_then(|h| h.to_str().ok())
237 {
238 checksum = Some(Checksum {
239 algorithm: ChecksumAlgorithm::SHA256,
240 value: val.to_string(),
241 });
242 } else if let Some(val) = headers
243 .get("opc-content-sha384")
244 .and_then(|h| h.to_str().ok())
245 {
246 checksum = Some(Checksum {
247 algorithm: ChecksumAlgorithm::SHA384,
248 value: val.to_string(),
249 });
250 } else if let Some(val) = headers
251 .get("opc-content-crc32c")
252 .and_then(|h| h.to_str().ok())
253 {
254 checksum = Some(Checksum {
255 algorithm: ChecksumAlgorithm::CRC32C,
256 value: val.to_string(),
257 });
258 }
259
260 Ok(Object {
261 name: object_name.to_string(),
262 value: content.to_string(),
263 md5,
264 checksum,
265 })
266 }
267
268 pub async fn get_object(&self, object_name: &str) -> Result<Object> {
273 let path = format!("/n/{}/b/{}/o/{}", self.namespace, self.name, object_name);
274 let response = self
275 .execute(Method::GET, &path, None, None, Vec::new())
276 .await?;
277
278 let headers = response.headers();
279 let md5 = headers
280 .get("content-md5")
281 .or_else(|| headers.get("opc-multipart-md5"))
282 .and_then(|h| h.to_str().ok())
283 .ok_or_else(|| Error::Other("Missing required header: content-md5".to_string()))?
284 .to_string();
285
286 let mut checksum = None;
287
288 if let Some(val) = headers
289 .get("opc-content-sha256")
290 .and_then(|h| h.to_str().ok())
291 {
292 checksum = Some(Checksum {
293 algorithm: ChecksumAlgorithm::SHA256,
294 value: val.to_string(),
295 });
296 } else if let Some(val) = headers
297 .get("opc-content-sha384")
298 .and_then(|h| h.to_str().ok())
299 {
300 checksum = Some(Checksum {
301 algorithm: ChecksumAlgorithm::SHA384,
302 value: val.to_string(),
303 });
304 } else if let Some(val) = headers
305 .get("opc-content-crc32c")
306 .and_then(|h| h.to_str().ok())
307 {
308 checksum = Some(Checksum {
309 algorithm: ChecksumAlgorithm::CRC32C,
310 value: val.to_string(),
311 });
312 }
313
314 let value = response.text().await?;
315
316 Ok(Object {
317 name: object_name.to_string(),
318 value,
319 md5,
320 checksum,
321 })
322 }
323
324 pub async fn get_or_create_object(&self, object_name: &str, content: &str) -> Result<Object> {
332 match self.get_object(object_name).await {
333 Ok(obj) => Ok(obj),
334 Err(Error::ApiError { code, .. }) if code.contains("404") => {
335 self.put_object(object_name, content).await
336 }
337 Err(e) => Err(e),
338 }
339 }
340
341 pub async fn get_retention_rules(&self) -> Result<Vec<RetentionRule>> {
343 let path = format!("/n/{}/b/{}/retentionRules", self.namespace, self.name);
344
345 #[derive(Deserialize)]
346 struct ResponseWrapper {
347 items: Vec<RetentionRule>,
348 }
349
350 let wrapper: ResponseWrapper = self
351 .request::<ResponseWrapper, ()>("GET", &path, None)
352 .await?;
353 Ok(wrapper.items)
354 }
355
356 pub async fn create_retention_rule(
358 &self,
359 details: RetentionRuleDetails,
360 ) -> Result<RetentionRule> {
361 let path = format!("/n/{}/b/{}/retentionRules", self.namespace, self.name);
362 self.request("POST", &path, Some(details)).await
363 }
364
365 pub async fn get_retention_rule(&self, rule_id: &str) -> Result<RetentionRule> {
367 let path = format!(
368 "/n/{}/b/{}/retentionRules/{}",
369 self.namespace, self.name, rule_id
370 );
371 self.request("GET", &path, None::<()>).await
372 }
373
374 pub async fn update_retention_rule(
376 &self,
377 rule_or_id: impl Into<String>,
378 details: RetentionRuleDetails,
379 ) -> Result<RetentionRule> {
380 let rule_id = rule_or_id.into();
381 let path = format!(
382 "/n/{}/b/{}/retentionRules/{}",
383 self.namespace, self.name, rule_id
384 );
385 self.request("PUT", &path, Some(details)).await
386 }
387
388 pub async fn delete_retention_rule(&self, rule_or_id: impl Into<String>) -> Result<()> {
390 let rule_id = rule_or_id.into();
391 let path = format!(
392 "/n/{}/b/{}/retentionRules/{}",
393 self.namespace, self.name, rule_id
394 );
395 self.request_no_content("DELETE", &path, None::<()>).await
396 }
397}
398
399fn parse_method(method: &str) -> Result<Method> {
400 Method::from_bytes(method.as_bytes())
401 .map_err(|e| Error::Other(format!("Unsupported method {method}: {e}")))
402}
403
404#[cfg(test)]
405#[path = "tests.rs"]
406mod tests;