use crate::client::Oci;
use crate::error::{Error, Result};
use crate::services::object_storage::models::*;
use serde::Deserialize;
use serde::Serialize;
use serde::de::DeserializeOwned;
use base64::{Engine as _, engine::general_purpose::STANDARD as BASE64};
use sha2::{Sha256, Sha384, Digest as ShaDigest};
#[derive(Clone)]
pub struct ObjectStorage {
oci_client: Oci,
pub namespace: String,
endpoint: String,
protocol: String,
}
impl ObjectStorage {
pub fn new(oci_client: &Oci, namespace: impl Into<String>) -> Self {
let region = oci_client.region().to_string();
let endpoint = format!("objectstorage.{region}.oraclecloud.com");
Self {
oci_client: oci_client.clone(),
namespace: namespace.into(),
endpoint,
protocol: "https".to_string(),
}
}
pub async fn get_bucket(&self, bucket_name: &str) -> Result<Bucket> {
let path = format!("/n/{}/b/{}/", self.namespace, bucket_name);
let url = format!("{}://{}{}", self.protocol, self.endpoint, path);
let (date_header, auth_header) =
self.oci_client
.signer()
.sign_request("GET", &path, &self.endpoint, None)?;
let response = self
.oci_client
.client()
.get(&url)
.header("host", &self.endpoint)
.header("date", &date_header)
.header("authorization", &auth_header)
.send()
.await?;
if !response.status().is_success() {
let status = response.status();
let body = response.text().await?;
return Err(Error::ApiError {
code: status.to_string(),
message: body,
});
}
Ok(Bucket {
oci_client: self.oci_client.clone(),
namespace: self.namespace.clone(),
name: bucket_name.to_string(),
endpoint: self.endpoint.clone(),
protocol: self.protocol.clone(),
})
}
}
#[derive(Clone)]
pub struct Bucket {
oci_client: Oci,
pub namespace: String,
pub name: String,
endpoint: String,
protocol: String,
}
impl Bucket {
async fn request<T, B>(&self, method: &str, path: &str, body: Option<B>) -> Result<T>
where
T: DeserializeOwned,
B: Serialize,
{
let url = format!("{}://{}{}", self.protocol, self.endpoint, path);
let body_str = if let Some(b) = &body {
Some(serde_json::to_string(b)?)
} else {
None
};
let (date_header, auth_header) = self.oci_client.signer().sign_request(
method,
path,
&self.endpoint,
body_str.as_deref(),
)?;
let mut request_builder = match method {
"GET" => self.oci_client.client().get(&url),
"POST" => self.oci_client.client().post(&url),
"PUT" => self.oci_client.client().put(&url),
"DELETE" => self.oci_client.client().delete(&url),
_ => return Err(Error::Other(format!("Unsupported method: {}", method))),
};
request_builder = request_builder
.header("host", &self.endpoint)
.header("date", &date_header)
.header("authorization", &auth_header);
if let Some(b_str) = body_str {
request_builder = request_builder
.header("content-type", "application/json")
.header("content-length", b_str.len().to_string())
.body(b_str);
}
let response = request_builder.send().await?;
if !response.status().is_success() {
let status = response.status();
let body = response.text().await?;
return Err(Error::ApiError {
code: status.to_string(),
message: body,
});
}
let text = response.text().await?;
serde_json::from_str(&text).map_err(Into::into)
}
async fn request_no_content<B>(&self, method: &str, path: &str, body: Option<B>) -> Result<()>
where
B: Serialize,
{
let url = format!("{}://{}{}", self.protocol, self.endpoint, path);
let body_str = if let Some(b) = &body {
Some(serde_json::to_string(b)?)
} else {
None
};
let (date_header, auth_header) = self.oci_client.signer().sign_request(
method,
path,
&self.endpoint,
body_str.as_deref(),
)?;
let mut request_builder = match method {
"GET" => self.oci_client.client().get(&url),
"POST" => self.oci_client.client().post(&url),
"PUT" => self.oci_client.client().put(&url),
"DELETE" => self.oci_client.client().delete(&url),
_ => return Err(Error::Other(format!("Unsupported method: {}", method))),
};
request_builder = request_builder
.header("host", &self.endpoint)
.header("date", &date_header)
.header("authorization", &auth_header);
if let Some(b_str) = body_str {
request_builder = request_builder
.header("content-type", "application/json")
.header("content-length", b_str.len().to_string())
.body(b_str);
}
let response = request_builder.send().await?;
if !response.status().is_success() {
let status = response.status();
let body = response.text().await?;
return Err(Error::ApiError {
code: status.to_string(),
message: body,
});
}
Ok(())
}
pub async fn put_object(&self, object_name: &str, content: &str) -> Result<Object> {
self.put_object_internal(object_name, content, None).await
}
pub async fn put_object_with_checksum(
&self,
object_name: &str,
content: &str,
algorithm: ChecksumAlgorithm,
) -> Result<Object> {
self.put_object_internal(object_name, content, Some(algorithm)).await
}
async fn put_object_internal(
&self,
object_name: &str,
content: &str,
algorithm: Option<ChecksumAlgorithm>,
) -> Result<Object> {
let path = format!("/n/{}/b/{}/o/{}", self.namespace, self.name, object_name);
let url = format!("{}://{}{}", self.protocol, self.endpoint, path);
let (date_header, auth_header) =
self.oci_client
.signer()
.sign_request("PUT", &path, &self.endpoint, Some(content))?;
let mut request_builder = self
.oci_client
.client()
.put(&url)
.header("host", &self.endpoint)
.header("date", &date_header)
.header("authorization", &auth_header)
.header("content-length", content.len().to_string());
if let Some(algo) = algorithm {
let data = content.as_bytes();
match algo {
ChecksumAlgorithm::SHA256 => {
let mut hasher = Sha256::new();
hasher.update(data);
let result = hasher.finalize();
let b64 = BASE64.encode(result);
request_builder = request_builder
.header("opc-checksum-algorithm", "SHA256")
.header("opc-content-sha256", b64);
}
ChecksumAlgorithm::SHA384 => {
let mut hasher = Sha384::new();
hasher.update(data);
let result = hasher.finalize();
let b64 = BASE64.encode(result);
request_builder = request_builder
.header("opc-checksum-algorithm", "SHA384")
.header("opc-content-sha384", b64);
}
ChecksumAlgorithm::CRC32C => {
let crc = crc32c::crc32c(data);
let bytes = crc.to_be_bytes();
let b64 = BASE64.encode(bytes);
request_builder = request_builder
.header("opc-checksum-algorithm", "CRC32C")
.header("opc-content-crc32c", b64);
}
}
}
let response = request_builder
.body(content.to_string())
.send()
.await?;
if !response.status().is_success() {
let status = response.status();
let body = response.text().await?;
return Err(Error::ApiError {
code: status.to_string(),
message: body,
});
}
let headers = response.headers();
let md5 = headers
.get("opc-content-md5")
.and_then(|h| h.to_str().ok())
.ok_or_else(|| Error::Other("Missing required header: opc-content-md5".to_string()))?
.to_string();
let mut checksum = None;
if let Some(val) = headers.get("opc-content-sha256").and_then(|h| h.to_str().ok()) {
checksum = Some(Checksum {
algorithm: ChecksumAlgorithm::SHA256,
value: val.to_string(),
});
} else if let Some(val) = headers.get("opc-content-sha384").and_then(|h| h.to_str().ok()) {
checksum = Some(Checksum {
algorithm: ChecksumAlgorithm::SHA384,
value: val.to_string(),
});
} else if let Some(val) = headers.get("opc-content-crc32c").and_then(|h| h.to_str().ok()) {
checksum = Some(Checksum {
algorithm: ChecksumAlgorithm::CRC32C,
value: val.to_string(),
});
}
Ok(Object {
name: object_name.to_string(),
value: content.to_string(),
md5,
checksum,
})
}
pub async fn get_object(&self, object_name: &str) -> Result<Object> {
let path = format!("/n/{}/b/{}/o/{}", self.namespace, self.name, object_name);
let url = format!("{}://{}{}", self.protocol, self.endpoint, path);
let (date_header, auth_header) =
self.oci_client
.signer()
.sign_request("GET", &path, &self.endpoint, None)?;
let response = self
.oci_client
.client()
.get(&url)
.header("host", &self.endpoint)
.header("date", &date_header)
.header("authorization", &auth_header)
.send()
.await?;
if !response.status().is_success() {
let status = response.status();
let body = response.text().await?;
return Err(Error::ApiError {
code: status.to_string(),
message: body,
});
}
let headers = response.headers();
let md5 = headers
.get("content-md5")
.or_else(|| headers.get("opc-multipart-md5"))
.and_then(|h| h.to_str().ok())
.ok_or_else(|| Error::Other("Missing required header: content-md5".to_string()))?
.to_string();
let mut checksum = None;
if let Some(val) = headers.get("opc-content-sha256").and_then(|h| h.to_str().ok()) {
checksum = Some(Checksum {
algorithm: ChecksumAlgorithm::SHA256,
value: val.to_string(),
});
} else if let Some(val) = headers.get("opc-content-sha384").and_then(|h| h.to_str().ok()) {
checksum = Some(Checksum {
algorithm: ChecksumAlgorithm::SHA384,
value: val.to_string(),
});
} else if let Some(val) = headers.get("opc-content-crc32c").and_then(|h| h.to_str().ok()) {
checksum = Some(Checksum {
algorithm: ChecksumAlgorithm::CRC32C,
value: val.to_string(),
});
}
let value = response.text().await?;
Ok(Object {
name: object_name.to_string(),
value,
md5,
checksum,
})
}
pub async fn get_or_create_object(&self, object_name: &str, content: &str) -> Result<Object> {
match self.get_object(object_name).await {
Ok(obj) => Ok(obj),
Err(Error::ApiError { code, .. }) if code.contains("404") => {
self.put_object(object_name, content).await
}
Err(e) => Err(e),
}
}
pub async fn get_retention_rules(&self) -> Result<Vec<RetentionRule>> {
let path = format!("/n/{}/b/{}/retentionRules", self.namespace, self.name);
#[derive(Deserialize)]
struct ResponseWrapper {
items: Vec<RetentionRule>,
}
let wrapper: ResponseWrapper = self
.request::<ResponseWrapper, ()>("GET", &path, None)
.await?;
Ok(wrapper.items)
}
pub async fn create_retention_rule(
&self,
details: RetentionRuleDetails,
) -> Result<RetentionRule> {
let path = format!("/n/{}/b/{}/retentionRules", self.namespace, self.name);
self.request("POST", &path, Some(details)).await
}
pub async fn get_retention_rule(&self, rule_id: &str) -> Result<RetentionRule> {
let path = format!(
"/n/{}/b/{}/retentionRules/{}",
self.namespace, self.name, rule_id
);
self.request("GET", &path, None::<()>).await
}
pub async fn update_retention_rule(
&self,
rule_or_id: impl Into<String>,
details: RetentionRuleDetails,
) -> Result<RetentionRule> {
let rule_id = rule_or_id.into();
let path = format!(
"/n/{}/b/{}/retentionRules/{}",
self.namespace, self.name, rule_id
);
self.request("PUT", &path, Some(details)).await
}
pub async fn delete_retention_rule(&self, rule_or_id: impl Into<String>) -> Result<()> {
let rule_id = rule_or_id.into();
let path = format!(
"/n/{}/b/{}/retentionRules/{}",
self.namespace, self.name, rule_id
);
self.request_no_content("DELETE", &path, None::<()>).await
}
}
#[cfg(test)]
#[path = "tests.rs"]
mod tests;