1use std::io::Result;
8use std::sync::Arc;
9use std::time::SystemTime;
10
11use base64::Engine;
12use hmac::{Hmac, Mac};
13use reqwest::header::HeaderMap;
14use reqwest::Method;
15use sha1::Sha1;
16
17use nydus_api::OssConfig;
18use nydus_utils::metrics::BackendMetrics;
19
20use crate::backend::connection::{Connection, ConnectionConfig};
21use crate::backend::object_storage::{ObjectStorage, ObjectStorageState};
22
23const HEADER_DATE: &str = "Date";
24const HEADER_AUTHORIZATION: &str = "Authorization";
25
26type HmacSha1 = Hmac<Sha1>;
27
28#[derive(Debug)]
30pub struct OssState {
31 access_key_id: String,
32 access_key_secret: String,
33 scheme: String,
34 object_prefix: String,
35 endpoint: String,
36 bucket_name: String,
37 retry_limit: u8,
38}
39
40impl OssState {
41 fn resource(&self, object_key: &str, query_str: &str) -> String {
42 format!("/{}/{}{}", self.bucket_name, object_key, query_str)
43 }
44}
45
46impl ObjectStorageState for OssState {
47 fn url(&self, object_key: &str, query: &[&str]) -> (String, String) {
48 let object_key = &format!("{}{}", self.object_prefix, object_key);
49 let url = format!(
50 "{}://{}.{}/{}",
51 self.scheme, self.bucket_name, self.endpoint, object_key
52 );
53
54 if query.is_empty() {
55 (self.resource(object_key, ""), url)
56 } else {
57 let query_str = format!("?{}", query.join("&"));
58 let resource = self.resource(object_key, &query_str);
59 let url = format!("{}{}", url.as_str(), &query_str);
60 (resource, url)
61 }
62 }
63
64 fn sign(
66 &self,
67 verb: Method,
68 headers: &mut HeaderMap,
69 canonicalized_resource: &str,
70 _: &str,
71 ) -> Result<()> {
72 let content_md5 = "";
73 let content_type = "";
74 let mut canonicalized_oss_headers = vec![];
75 let date = httpdate::fmt_http_date(SystemTime::now());
76 let mut data = vec![
77 verb.as_str(),
78 content_md5,
79 content_type,
80 date.as_str(),
81 canonicalized_resource,
83 ];
84
85 for (name, value) in headers.iter() {
86 let name = name.as_str();
87 let value = value.to_str().map_err(|e| einval!(e))?;
88 if name.starts_with("x-oss-") {
89 let header = format!("{}:{}", name.to_lowercase(), value);
90 canonicalized_oss_headers.push(header);
91 }
92 }
93 let canonicalized_oss_headers = canonicalized_oss_headers.join("\n");
94 if !canonicalized_oss_headers.is_empty() {
95 data.insert(4, canonicalized_oss_headers.as_str());
96 }
97 let data = data.join("\n");
98 let hmac = HmacSha1::new_from_slice(self.access_key_secret.as_bytes())
99 .map_err(|e| einval!(e))?
100 .chain_update(data.as_bytes())
101 .finalize()
102 .into_bytes();
103 let signature = base64::engine::general_purpose::STANDARD.encode(hmac);
104
105 let authorization = format!("OSS {}:{}", self.access_key_id, signature);
106
107 headers.insert(HEADER_DATE, date.as_str().parse().map_err(|e| einval!(e))?);
108 headers.insert(
109 HEADER_AUTHORIZATION,
110 authorization.as_str().parse().map_err(|e| einval!(e))?,
111 );
112
113 Ok(())
114 }
115
116 fn retry_limit(&self) -> u8 {
117 self.retry_limit
118 }
119}
120
121pub type Oss = ObjectStorage<OssState>;
123
124impl Oss {
125 pub fn new(oss_config: &OssConfig, id: Option<&str>) -> Result<Oss> {
127 let con_config: ConnectionConfig = oss_config.clone().into();
128 let retry_limit = con_config.retry_limit;
129 let connection = Connection::new(&con_config)?;
130 let state = Arc::new(OssState {
131 scheme: oss_config.scheme.clone(),
132 object_prefix: oss_config.object_prefix.clone(),
133 endpoint: oss_config.endpoint.clone(),
134 access_key_id: oss_config.access_key_id.clone(),
135 access_key_secret: oss_config.access_key_secret.clone(),
136 bucket_name: oss_config.bucket_name.clone(),
137 retry_limit,
138 });
139 let metrics = id.map(|i| BackendMetrics::new(i, "oss"));
140
141 Ok(ObjectStorage::new_object_storage(
142 connection,
143 state,
144 metrics,
145 id.map(|i| i.to_string()),
146 ))
147 }
148}
149
150#[cfg(test)]
151mod tests {
152 use crate::backend::BlobBackend;
153
154 use super::*;
155
156 #[test]
157 fn test_oss_state() {
158 let state = OssState {
159 access_key_id: "key".to_string(),
160 access_key_secret: "secret".to_string(),
161 scheme: "https".to_string(),
162 object_prefix: "nydus".to_string(),
163 endpoint: "oss".to_string(),
164 bucket_name: "images".to_string(),
165 retry_limit: 5,
166 };
167
168 assert_eq!(
169 state.resource("obj_key", "?idontcare"),
170 "/images/obj_key?idontcare"
171 );
172
173 let (resource, url) = state.url("obj_key", &["idontcare", "second"]);
174 assert_eq!(resource, "/images/nydusobj_key?idontcare&second");
175 assert_eq!(url, "https://images.oss/nydusobj_key?idontcare&second");
176
177 let mut headers = HeaderMap::new();
178 state
179 .sign(Method::HEAD, &mut headers, resource.as_str(), "")
180 .unwrap();
181 let signature = headers.get(HEADER_AUTHORIZATION).unwrap();
182 assert!(signature.to_str().unwrap().contains("OSS key:"));
183 }
184
185 #[test]
186 fn test_oss_new() {
187 let json_str = "{\"access_key_id\":\"key\",\"access_key_secret\":\"secret\",\"bucket_name\":\"images\",\"endpoint\":\"/oss\",\"object_prefix\":\"nydus\",\"scheme\":\"\",\"proxy\":{\"url\":\"\",\"ping_url\":\"\",\"fallback\":true,\"check_interval\":5},\"timeout\":5,\"connect_timeout\":5,\"retry_limit\":5}";
188 let config: OssConfig = serde_json::from_str(json_str).unwrap();
189 let oss = Oss::new(&config, Some("test-image")).unwrap();
190
191 oss.metrics();
192
193 let reader = oss.get_reader("test").unwrap();
194 assert_eq!(reader.retry_limit(), 5);
195
196 oss.shutdown();
197 }
198}