1use crate::client::Oci;
4use crate::error::{Error, Result};
5use crate::services::object_storage::models::*;
6use serde::Deserialize;
7use serde::Serialize;
8use serde::de::DeserializeOwned;
9use base64::{Engine as _, engine::general_purpose::STANDARD as BASE64};
10use sha2::{Sha256, Sha384, Digest as ShaDigest};
11
12#[derive(Clone)]
14pub struct ObjectStorage {
15 oci_client: Oci,
17 pub namespace: String,
19 endpoint: String,
21 protocol: String,
23}
24
25impl ObjectStorage {
26 pub fn new(oci_client: &Oci, namespace: impl Into<String>) -> Self {
32 let region = oci_client.region().to_string();
33 let endpoint = format!("objectstorage.{region}.oraclecloud.com");
34
35 Self {
36 oci_client: oci_client.clone(),
37 namespace: namespace.into(),
38 endpoint,
39 protocol: "https".to_string(),
40 }
41 }
42
43 pub async fn get_bucket(&self, bucket_name: &str) -> Result<Bucket> {
48 let path = format!("/n/{}/b/{}/", self.namespace, bucket_name);
50 let url = format!("{}://{}{}", self.protocol, self.endpoint, path);
51
52 let (date_header, auth_header) =
53 self.oci_client
54 .signer()
55 .sign_request("GET", &path, &self.endpoint, None)?;
56
57 let response = self
58 .oci_client
59 .client()
60 .get(&url)
61 .header("host", &self.endpoint)
62 .header("date", &date_header)
63 .header("authorization", &auth_header)
64 .send()
65 .await?;
66
67 if !response.status().is_success() {
68 let status = response.status();
69 let body = response.text().await?;
70 return Err(Error::ApiError {
71 code: status.to_string(),
72 message: body,
73 });
74 }
75
76 Ok(Bucket {
78 oci_client: self.oci_client.clone(),
79 namespace: self.namespace.clone(),
80 name: bucket_name.to_string(),
81 endpoint: self.endpoint.clone(),
82 protocol: self.protocol.clone(),
83 })
84 }
85}
86
87#[derive(Clone)]
89pub struct Bucket {
90 oci_client: Oci,
92 pub namespace: String,
94 pub name: String,
96 endpoint: String,
98 protocol: String,
100}
101
102impl Bucket {
103 async fn request<T, B>(&self, method: &str, path: &str, body: Option<B>) -> Result<T>
105 where
106 T: DeserializeOwned,
107 B: Serialize,
108 {
109 let url = format!("{}://{}{}", self.protocol, self.endpoint, path);
110 let body_str = if let Some(b) = &body {
111 Some(serde_json::to_string(b)?)
112 } else {
113 None
114 };
115
116 let (date_header, auth_header) = self.oci_client.signer().sign_request(
117 method,
118 path,
119 &self.endpoint,
120 body_str.as_deref(),
121 )?;
122
123 let mut request_builder = match method {
124 "GET" => self.oci_client.client().get(&url),
125 "POST" => self.oci_client.client().post(&url),
126 "PUT" => self.oci_client.client().put(&url),
127 "DELETE" => self.oci_client.client().delete(&url),
128 _ => return Err(Error::Other(format!("Unsupported method: {}", method))),
129 };
130
131 request_builder = request_builder
132 .header("host", &self.endpoint)
133 .header("date", &date_header)
134 .header("authorization", &auth_header);
135
136 if let Some(b_str) = body_str {
137 request_builder = request_builder
138 .header("content-type", "application/json")
139 .header("content-length", b_str.len().to_string())
140 .body(b_str);
141 }
142
143 let response = request_builder.send().await?;
144
145 if !response.status().is_success() {
146 let status = response.status();
147 let body = response.text().await?;
148 return Err(Error::ApiError {
149 code: status.to_string(),
150 message: body,
151 });
152 }
153
154 let text = response.text().await?;
155 serde_json::from_str(&text).map_err(Into::into)
156 }
157
158 async fn request_no_content<B>(&self, method: &str, path: &str, body: Option<B>) -> Result<()>
159 where
160 B: Serialize,
161 {
162 let url = format!("{}://{}{}", self.protocol, self.endpoint, path);
163 let body_str = if let Some(b) = &body {
164 Some(serde_json::to_string(b)?)
165 } else {
166 None
167 };
168
169 let (date_header, auth_header) = self.oci_client.signer().sign_request(
170 method,
171 path,
172 &self.endpoint,
173 body_str.as_deref(),
174 )?;
175
176 let mut request_builder = match method {
177 "GET" => self.oci_client.client().get(&url),
178 "POST" => self.oci_client.client().post(&url),
179 "PUT" => self.oci_client.client().put(&url),
180 "DELETE" => self.oci_client.client().delete(&url),
181 _ => return Err(Error::Other(format!("Unsupported method: {}", method))),
182 };
183
184 request_builder = request_builder
185 .header("host", &self.endpoint)
186 .header("date", &date_header)
187 .header("authorization", &auth_header);
188
189 if let Some(b_str) = body_str {
190 request_builder = request_builder
191 .header("content-type", "application/json")
192 .header("content-length", b_str.len().to_string())
193 .body(b_str);
194 }
195
196 let response = request_builder.send().await?;
197
198 if !response.status().is_success() {
199 let status = response.status();
200 let body = response.text().await?;
201 return Err(Error::ApiError {
202 code: status.to_string(),
203 message: body,
204 });
205 }
206
207 Ok(())
208 }
209
210 pub async fn put_object(&self, object_name: &str, content: &str) -> Result<Object> {
216 self.put_object_internal(object_name, content, None).await
217 }
218
219 pub async fn put_object_with_checksum(
226 &self,
227 object_name: &str,
228 content: &str,
229 algorithm: ChecksumAlgorithm,
230 ) -> Result<Object> {
231 self.put_object_internal(object_name, content, Some(algorithm)).await
232 }
233
234 async fn put_object_internal(
235 &self,
236 object_name: &str,
237 content: &str,
238 algorithm: Option<ChecksumAlgorithm>,
239 ) -> Result<Object> {
240 let path = format!("/n/{}/b/{}/o/{}", self.namespace, self.name, object_name);
241 let url = format!("{}://{}{}", self.protocol, self.endpoint, path);
242
243 let (date_header, auth_header) =
244 self.oci_client
245 .signer()
246 .sign_request("PUT", &path, &self.endpoint, Some(content))?;
247
248 let mut request_builder = self
249 .oci_client
250 .client()
251 .put(&url)
252 .header("host", &self.endpoint)
253 .header("date", &date_header)
254 .header("authorization", &auth_header)
255 .header("content-length", content.len().to_string());
256
257 if let Some(algo) = algorithm {
258 let data = content.as_bytes();
259 match algo {
260 ChecksumAlgorithm::SHA256 => {
261 let mut hasher = Sha256::new();
262 hasher.update(data);
263 let result = hasher.finalize();
264 let b64 = BASE64.encode(result);
265 request_builder = request_builder
266 .header("opc-checksum-algorithm", "SHA256")
267 .header("opc-content-sha256", b64);
268 }
269 ChecksumAlgorithm::SHA384 => {
270 let mut hasher = Sha384::new();
271 hasher.update(data);
272 let result = hasher.finalize();
273 let b64 = BASE64.encode(result);
274 request_builder = request_builder
275 .header("opc-checksum-algorithm", "SHA384")
276 .header("opc-content-sha384", b64);
277 }
278 ChecksumAlgorithm::CRC32C => {
279 let crc = crc32c::crc32c(data);
280 let bytes = crc.to_be_bytes();
281 let b64 = BASE64.encode(bytes);
282 request_builder = request_builder
283 .header("opc-checksum-algorithm", "CRC32C")
284 .header("opc-content-crc32c", b64);
285 }
286 }
287 }
288
289 let response = request_builder
290 .body(content.to_string())
291 .send()
292 .await?;
293
294 if !response.status().is_success() {
295 let status = response.status();
296 let body = response.text().await?;
297 return Err(Error::ApiError {
298 code: status.to_string(),
299 message: body,
300 });
301 }
302
303 let headers = response.headers();
304 let md5 = headers
305 .get("opc-content-md5")
306 .and_then(|h| h.to_str().ok())
307 .ok_or_else(|| Error::Other("Missing required header: opc-content-md5".to_string()))?
308 .to_string();
309
310 let mut checksum = None;
311
312 if let Some(val) = headers.get("opc-content-sha256").and_then(|h| h.to_str().ok()) {
313 checksum = Some(Checksum {
314 algorithm: ChecksumAlgorithm::SHA256,
315 value: val.to_string(),
316 });
317 } else if let Some(val) = headers.get("opc-content-sha384").and_then(|h| h.to_str().ok()) {
318 checksum = Some(Checksum {
319 algorithm: ChecksumAlgorithm::SHA384,
320 value: val.to_string(),
321 });
322 } else if let Some(val) = headers.get("opc-content-crc32c").and_then(|h| h.to_str().ok()) {
323 checksum = Some(Checksum {
324 algorithm: ChecksumAlgorithm::CRC32C,
325 value: val.to_string(),
326 });
327 }
328
329 Ok(Object {
330 name: object_name.to_string(),
331 value: content.to_string(),
332 md5,
333 checksum,
334 })
335 }
336
337 pub async fn get_object(&self, object_name: &str) -> Result<Object> {
342 let path = format!("/n/{}/b/{}/o/{}", self.namespace, self.name, object_name);
343 let url = format!("{}://{}{}", self.protocol, self.endpoint, path);
344
345 let (date_header, auth_header) =
346 self.oci_client
347 .signer()
348 .sign_request("GET", &path, &self.endpoint, None)?;
349
350 let response = self
351 .oci_client
352 .client()
353 .get(&url)
354 .header("host", &self.endpoint)
355 .header("date", &date_header)
356 .header("authorization", &auth_header)
357 .send()
358 .await?;
359
360 if !response.status().is_success() {
361 let status = response.status();
362 let body = response.text().await?;
363 return Err(Error::ApiError {
364 code: status.to_string(),
365 message: body,
366 });
367 }
368
369 let headers = response.headers();
370 let md5 = headers
371 .get("content-md5")
372 .or_else(|| headers.get("opc-multipart-md5"))
373 .and_then(|h| h.to_str().ok())
374 .ok_or_else(|| Error::Other("Missing required header: content-md5".to_string()))?
375 .to_string();
376
377 let mut checksum = None;
378
379 if let Some(val) = headers.get("opc-content-sha256").and_then(|h| h.to_str().ok()) {
380 checksum = Some(Checksum {
381 algorithm: ChecksumAlgorithm::SHA256,
382 value: val.to_string(),
383 });
384 } else if let Some(val) = headers.get("opc-content-sha384").and_then(|h| h.to_str().ok()) {
385 checksum = Some(Checksum {
386 algorithm: ChecksumAlgorithm::SHA384,
387 value: val.to_string(),
388 });
389 } else if let Some(val) = headers.get("opc-content-crc32c").and_then(|h| h.to_str().ok()) {
390 checksum = Some(Checksum {
391 algorithm: ChecksumAlgorithm::CRC32C,
392 value: val.to_string(),
393 });
394 }
395
396 let value = response.text().await?;
397
398 Ok(Object {
399 name: object_name.to_string(),
400 value,
401 md5,
402 checksum,
403 })
404 }
405
406 pub async fn get_or_create_object(&self, object_name: &str, content: &str) -> Result<Object> {
414 match self.get_object(object_name).await {
415 Ok(obj) => Ok(obj),
416 Err(Error::ApiError { code, .. }) if code.contains("404") => {
417 self.put_object(object_name, content).await
418 }
419 Err(e) => Err(e),
420 }
421 }
422
423 pub async fn get_retention_rules(&self) -> Result<Vec<RetentionRule>> {
425 let path = format!("/n/{}/b/{}/retentionRules", self.namespace, self.name);
426
427 #[derive(Deserialize)]
428 struct ResponseWrapper {
429 items: Vec<RetentionRule>,
430 }
431
432 let wrapper: ResponseWrapper = self
433 .request::<ResponseWrapper, ()>("GET", &path, None)
434 .await?;
435 Ok(wrapper.items)
436 }
437
438 pub async fn create_retention_rule(
440 &self,
441 details: RetentionRuleDetails,
442 ) -> Result<RetentionRule> {
443 let path = format!("/n/{}/b/{}/retentionRules", self.namespace, self.name);
444 self.request("POST", &path, Some(details)).await
445 }
446
447 pub async fn get_retention_rule(&self, rule_id: &str) -> Result<RetentionRule> {
449 let path = format!(
450 "/n/{}/b/{}/retentionRules/{}",
451 self.namespace, self.name, rule_id
452 );
453 self.request("GET", &path, None::<()>).await
454 }
455
456 pub async fn update_retention_rule(
458 &self,
459 rule_or_id: impl Into<String>,
460 details: RetentionRuleDetails,
461 ) -> Result<RetentionRule> {
462 let rule_id = rule_or_id.into();
463 let path = format!(
464 "/n/{}/b/{}/retentionRules/{}",
465 self.namespace, self.name, rule_id
466 );
467 self.request("PUT", &path, Some(details)).await
468 }
469
470 pub async fn delete_retention_rule(&self, rule_or_id: impl Into<String>) -> Result<()> {
472 let rule_id = rule_or_id.into();
473 let path = format!(
474 "/n/{}/b/{}/retentionRules/{}",
475 self.namespace, self.name, rule_id
476 );
477 self.request_no_content("DELETE", &path, None::<()>).await
478 }
479}
480
481#[cfg(test)]
482#[path = "tests.rs"]
483mod tests;