1use std::collections::BTreeMap;
9use std::fmt::Debug;
10use std::io::Result;
11use std::sync::Arc;
12
13use hmac::{Hmac, Mac};
14use http::Uri;
15use nydus_api::S3Config;
16use nydus_utils::metrics::BackendMetrics;
17use reqwest::header::HeaderMap;
18use reqwest::Method;
19use sha2::{Digest, Sha256};
20use time::{format_description, OffsetDateTime};
21
22use crate::backend::connection::{Connection, ConnectionConfig};
23use crate::backend::object_storage::{ObjectStorage, ObjectStorageState};
24
25const EMPTY_SHA256: &str = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855";
26const HEADER_HOST: &str = "Host";
27const HEADER_AWZ_DATE: &str = "x-amz-date";
28const HEADER_AWZ_CONTENT_SHA256: &str = "x-amz-content-sha256";
29const S3_DEFAULT_ENDPOINT: &str = "s3.amazonaws.com";
30
31#[derive(Debug)]
32pub struct S3State {
33 region: String,
34 access_key_id: String,
35 access_key_secret: String,
36 scheme: String,
37 object_prefix: String,
38 endpoint: String,
39 bucket_name: String,
40 retry_limit: u8,
41}
42
43pub type S3 = ObjectStorage<S3State>;
45
46impl S3 {
47 pub fn new(s3_config: &S3Config, id: Option<&str>) -> Result<S3> {
49 let con_config: ConnectionConfig = s3_config.clone().into();
50 let retry_limit = con_config.retry_limit;
51 let connection = Connection::new(&con_config)?;
52 let final_endpoint = if s3_config.endpoint.is_empty() {
53 S3_DEFAULT_ENDPOINT.to_string()
54 } else {
55 s3_config.endpoint.clone()
56 };
57
58 let state = Arc::new(S3State {
59 region: s3_config.region.clone(),
60 scheme: s3_config.scheme.clone(),
61 object_prefix: s3_config.object_prefix.clone(),
62 endpoint: final_endpoint,
63 access_key_id: s3_config.access_key_id.clone(),
64 access_key_secret: s3_config.access_key_secret.clone(),
65 bucket_name: s3_config.bucket_name.clone(),
66 retry_limit,
67 });
68 let metrics = id.map(|i| BackendMetrics::new(i, "oss"));
69
70 Ok(ObjectStorage::new_object_storage(
71 connection,
72 state,
73 metrics,
74 id.map(|i| i.to_string()),
75 ))
76 }
77}
78
79impl S3State {
80 fn get_canonical_headers(&self, map: &HeaderMap) -> (String, String) {
83 let mut btmap: BTreeMap<String, String> = BTreeMap::new();
84
85 for (k, values) in map.iter() {
86 let key = k.as_str().to_lowercase();
87 if "authorization" == key || "user-agent" == key {
88 continue;
89 }
90 btmap.insert(key.clone(), values.to_str().unwrap().to_string());
91 }
92
93 let mut signed_headers = String::new();
94 let mut canonical_headers = String::new();
95 let mut add_delim = false;
96 for (key, value) in &btmap {
97 if add_delim {
98 signed_headers.push(';');
99 canonical_headers.push('\n');
100 }
101
102 signed_headers.push_str(key);
103
104 canonical_headers.push_str(key);
105 canonical_headers.push(':');
106 canonical_headers.push_str(value);
107
108 add_delim = true;
109 }
110
111 (signed_headers, canonical_headers)
112 }
113
114 fn get_canonical_request_hash(
117 &self,
118 method: &Method,
119 uri: &str,
120 query_string: &str,
121 headers: &str,
122 signed_headers: &str,
123 content_sha256: &str,
124 ) -> String {
125 let canonical_request = format!(
126 "{}\n{}\n{}\n{}\n\n{}\n{}",
127 method, uri, query_string, headers, signed_headers, content_sha256
128 );
129 sha256_hash(canonical_request.as_bytes())
130 }
131
132 pub fn get_signing_key(&self, date: &OffsetDateTime) -> Vec<u8> {
135 let mut key: Vec<u8> = b"AWS4".to_vec();
136 key.extend(self.access_key_secret.as_bytes());
137
138 let date_key = hmac_hash(key.as_slice(), to_signer_date(date).as_bytes());
139 let date_region_key = hmac_hash(date_key.as_slice(), self.region.as_bytes());
140 let date_region_service_key = hmac_hash(date_region_key.as_slice(), "s3".as_bytes());
141 hmac_hash(date_region_service_key.as_slice(), b"aws4_request")
142 }
143}
144
145impl ObjectStorageState for S3State {
146 fn url(&self, obj_key: &str, query_str: &[&str]) -> (String, String) {
147 let query_str = if query_str.is_empty() {
148 "".to_string()
149 } else {
150 format!("?{}", query_str.join("&"))
151 };
152 let resource = format!(
153 "/{}/{}{}{}",
154 self.bucket_name, self.object_prefix, obj_key, query_str
155 );
156 let url = format!("{}://{}{}", self.scheme, self.endpoint, resource,);
157 (resource, url)
158 }
159
160 fn sign(
164 &self,
165 verb: Method,
166 headers: &mut HeaderMap,
167 _: &str,
168 full_resource_url: &str,
169 ) -> Result<()> {
170 let date = OffsetDateTime::now_utc();
171 let content_sha256 = EMPTY_SHA256;
172 let parsed_uri = full_resource_url
173 .to_string()
174 .parse::<Uri>()
175 .map_err(|e| einval!(e))?;
176 let uri_path = parsed_uri.path();
177 let query = parsed_uri.query().unwrap_or("");
178 let host = parsed_uri.host().unwrap_or(self.endpoint.as_str());
179
180 headers.insert(HEADER_HOST, host.parse().map_err(|e| einval!(e))?);
181 headers.insert(
182 HEADER_AWZ_DATE,
183 to_awz_date(&date).parse().map_err(|e| einval!(e))?,
184 );
185 headers.insert(
186 HEADER_AWZ_CONTENT_SHA256,
187 EMPTY_SHA256.parse().map_err(|e| einval!(e))?,
188 );
189 let scope = format!(
190 "{}/{}/{}/aws4_request",
191 to_signer_date(&date),
192 self.region,
193 "s3",
194 );
195 let (signed_headers, canonical_headers) = self.get_canonical_headers(headers);
196 let canonical_request_hash = self.get_canonical_request_hash(
197 &verb,
198 uri_path,
199 query,
200 &canonical_headers,
201 &signed_headers,
202 content_sha256,
203 );
204 let string_to_sign = format!(
205 "AWS4-HMAC-SHA256\n{}\n{}\n{}",
206 to_awz_date(&date),
207 scope,
208 canonical_request_hash
209 );
210 let signing_key = self.get_signing_key(&date);
211 let signature = hmac_hash_hex(signing_key.as_slice(), string_to_sign.as_bytes());
212 let authorization = format!(
213 "AWS4-HMAC-SHA256 Credential={}/{}, SignedHeaders={}, Signature={}",
214 self.access_key_id, scope, signed_headers, signature
215 );
216 headers.insert(
217 "Authorization",
218 authorization.parse().map_err(|e| einval!(e))?,
219 );
220
221 Ok(())
222 }
223
224 fn retry_limit(&self) -> u8 {
225 self.retry_limit
226 }
227}
228
229fn sha256_hash(data: &[u8]) -> String {
232 let mut hasher = Sha256::new();
233 hasher.update(data);
234 format!("{:x}", hasher.finalize())
235}
236
237fn hmac_hash(key: &[u8], data: &[u8]) -> Vec<u8> {
240 let mut hasher = Hmac::<Sha256>::new_from_slice(key).expect("HMAC can take key of any size");
241 hasher.update(data);
242 hasher.finalize().into_bytes().to_vec()
243}
244
245fn hmac_hash_hex(key: &[u8], data: &[u8]) -> String {
248 hex::encode(hmac_hash(key, data))
249}
250
251fn to_signer_date(date: &OffsetDateTime) -> String {
254 let format = format_description::parse("[year][month][day]").unwrap();
255 date.format(&format).unwrap()
256}
257
258fn to_awz_date(date: &OffsetDateTime) -> String {
261 let format = format_description::parse("[year][month][day]T[hour][minute][second]Z").unwrap();
262 date.format(&format).unwrap()
263}
264
265#[cfg(test)]
266mod tests {
267 use http::{HeaderMap, Method};
268 use nydus_api::S3Config;
269
270 use crate::backend::object_storage::ObjectStorageState;
271 use crate::backend::s3::S3State;
272 use crate::backend::BlobBackend;
273
274 use super::S3;
275
276 fn get_test_s3_state() -> (S3State, String, String) {
277 let state = S3State {
278 region: "us-east-1".to_string(),
279 access_key_id: "test-key".to_string(),
280 access_key_secret: "test-key-secret".to_string(),
281 scheme: "http".to_string(),
282 object_prefix: "test-prefix-".to_string(),
283 endpoint: "localhost:9000".to_string(),
284 bucket_name: "test-bucket".to_string(),
285 retry_limit: 6,
286 };
287 let (resource, url) = state.url("test-object", &["a=b", "c=d"]);
288 (state, resource, url)
289 }
290
291 #[test]
292 fn test_s3_new() {
293 let config_str = r#"{
294 "endpoint": "https://test.com",
295 "region": "us-east-1",
296 "access_key_id": "test",
297 "access_key_secret": "test",
298 "bucket_name": "antsys-nydus",
299 "object_prefix":"nydus_v2/",
300 "retry_limit": 6
301 }"#;
302 let config: S3Config = serde_json::from_str(config_str).unwrap();
303 let s3 = S3::new(&config, Some("test-image")).unwrap();
304
305 s3.metrics();
306
307 let reader = s3.get_reader("test").unwrap();
308 assert_eq!(reader.retry_limit(), 6);
309
310 s3.shutdown();
311 }
312
313 #[test]
314 fn test_s3_state_url() {
315 let (_, resource, url) = get_test_s3_state();
316 assert_eq!(resource, "/test-bucket/test-prefix-test-object?a=b&c=d");
317 assert_eq!(
318 url,
319 "http://localhost:9000/test-bucket/test-prefix-test-object?a=b&c=d"
320 );
321 }
322
323 #[test]
324 fn test_s3_state_sign() {
325 let (state, resource, url) = get_test_s3_state();
326 println!("{}", url);
327 let mut headers = HeaderMap::new();
328 headers.append("Range", "bytes=5242900-".parse().unwrap());
329 let result = state.sign(Method::GET, &mut headers, &resource, &url);
330 assert!(result.is_ok());
331
332 use regex::Regex;
333 let re = Regex::new(r"^AWS4-HMAC-SHA256 Credential=test-key/[0-9]{8}/us-east-1/s3/aws4_request, SignedHeaders=host;range;x-amz-content-sha256;x-amz-date, Signature=[A-Fa-f0-9]{64}$").unwrap();
334 let authorization = headers.get("Authorization").unwrap();
335 assert!(re.is_match(authorization.to_str().unwrap()));
336 }
337}