Skip to main content

nydus_storage/backend/
s3.rs

1// Copyright 2022 Ant Group. All rights reserved.
2// Copyright (C) 2022 Alibaba Cloud. All rights reserved.
3
4// SPDX-License-Identifier: Apache-2.0
5
//! Storage backend driver to access blobs on S3.
7
8use std::collections::BTreeMap;
9use std::fmt::Debug;
10use std::io::Result;
11use std::sync::Arc;
12
13use hmac::{Hmac, Mac};
14use http::Uri;
15use nydus_api::S3Config;
16use nydus_utils::metrics::BackendMetrics;
17use reqwest::header::HeaderMap;
18use reqwest::Method;
19use sha2::{Digest, Sha256};
20use time::{format_description, OffsetDateTime};
21
22use crate::backend::connection::{Connection, ConnectionConfig};
23use crate::backend::object_storage::{ObjectStorage, ObjectStorageState};
24
/// SHA-256 digest of an empty payload, sent as `x-amz-content-sha256` for body-less requests.
const EMPTY_SHA256: &str = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855";
/// Name of the HTTP `Host` header, which is included in the signed headers.
const HEADER_HOST: &str = "Host";
/// SigV4 request timestamp header. (`AWZ` in the constant name is a misspelling of `AMZ`.)
const HEADER_AWZ_DATE: &str = "x-amz-date";
/// SigV4 payload hash header.
const HEADER_AWZ_CONTENT_SHA256: &str = "x-amz-content-sha256";
/// Endpoint used when `S3Config::endpoint` is empty.
const S3_DEFAULT_ENDPOINT: &str = "s3.amazonaws.com";

/// Connection and credential state shared by all requests of an S3 backend.
///
/// `Debug` is implemented by hand so that `access_key_secret` is never printed
/// (e.g. when the state is logged with `{:?}`).
pub struct S3State {
    region: String,
    access_key_id: String,
    access_key_secret: String,
    scheme: String,
    object_prefix: String,
    endpoint: String,
    bucket_name: String,
    retry_limit: u8,
}

impl Debug for S3State {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("S3State")
            .field("region", &self.region)
            .field("access_key_id", &self.access_key_id)
            // Never expose the secret key in debug output.
            .field("access_key_secret", &"<redacted>")
            .field("scheme", &self.scheme)
            .field("object_prefix", &self.object_prefix)
            .field("endpoint", &self.endpoint)
            .field("bucket_name", &self.bucket_name)
            .field("retry_limit", &self.retry_limit)
            .finish()
    }
}
42
43/// Storage backend to access data stored in S3.
44pub type S3 = ObjectStorage<S3State>;
45
46impl S3 {
47    /// Create a new S3 storage backend.
48    pub fn new(s3_config: &S3Config, id: Option<&str>) -> Result<S3> {
49        let con_config: ConnectionConfig = s3_config.clone().into();
50        let retry_limit = con_config.retry_limit;
51        let connection = Connection::new(&con_config)?;
52        let final_endpoint = if s3_config.endpoint.is_empty() {
53            S3_DEFAULT_ENDPOINT.to_string()
54        } else {
55            s3_config.endpoint.clone()
56        };
57
58        let state = Arc::new(S3State {
59            region: s3_config.region.clone(),
60            scheme: s3_config.scheme.clone(),
61            object_prefix: s3_config.object_prefix.clone(),
62            endpoint: final_endpoint,
63            access_key_id: s3_config.access_key_id.clone(),
64            access_key_secret: s3_config.access_key_secret.clone(),
65            bucket_name: s3_config.bucket_name.clone(),
66            retry_limit,
67        });
68        let metrics = id.map(|i| BackendMetrics::new(i, "oss"));
69
70        Ok(ObjectStorage::new_object_storage(
71            connection,
72            state,
73            metrics,
74            id.map(|i| i.to_string()),
75        ))
76    }
77}
78
79impl S3State {
80    // modified based on https://github.com/minio/minio-rs/blob/5fea81d68d381fd2a4c27e4d259f7012de08ab77/src/s3/utils.rs#L155-L200
81    // under apache 2.0 license
82    fn get_canonical_headers(&self, map: &HeaderMap) -> (String, String) {
83        let mut btmap: BTreeMap<String, String> = BTreeMap::new();
84
85        for (k, values) in map.iter() {
86            let key = k.as_str().to_lowercase();
87            if "authorization" == key || "user-agent" == key {
88                continue;
89            }
90            btmap.insert(key.clone(), values.to_str().unwrap().to_string());
91        }
92
93        let mut signed_headers = String::new();
94        let mut canonical_headers = String::new();
95        let mut add_delim = false;
96        for (key, value) in &btmap {
97            if add_delim {
98                signed_headers.push(';');
99                canonical_headers.push('\n');
100            }
101
102            signed_headers.push_str(key);
103
104            canonical_headers.push_str(key);
105            canonical_headers.push(':');
106            canonical_headers.push_str(value);
107
108            add_delim = true;
109        }
110
111        (signed_headers, canonical_headers)
112    }
113
114    // modified based on https://github.com/minio/minio-rs/blob/5fea81d68d381fd2a4c27e4d259f7012de08ab77/src/s3/signer.rs#L44-L64
115    // under apache 2.0 license
116    fn get_canonical_request_hash(
117        &self,
118        method: &Method,
119        uri: &str,
120        query_string: &str,
121        headers: &str,
122        signed_headers: &str,
123        content_sha256: &str,
124    ) -> String {
125        let canonical_request = format!(
126            "{}\n{}\n{}\n{}\n\n{}\n{}",
127            method, uri, query_string, headers, signed_headers, content_sha256
128        );
129        sha256_hash(canonical_request.as_bytes())
130    }
131
132    // modified based on https://github.com/minio/minio-rs/blob/5fea81d68d381fd2a4c27e4d259f7012de08ab77/src/s3/signer.rs#L75-88
133    // under apache 2.0 license
134    pub fn get_signing_key(&self, date: &OffsetDateTime) -> Vec<u8> {
135        let mut key: Vec<u8> = b"AWS4".to_vec();
136        key.extend(self.access_key_secret.as_bytes());
137
138        let date_key = hmac_hash(key.as_slice(), to_signer_date(date).as_bytes());
139        let date_region_key = hmac_hash(date_key.as_slice(), self.region.as_bytes());
140        let date_region_service_key = hmac_hash(date_region_key.as_slice(), "s3".as_bytes());
141        hmac_hash(date_region_service_key.as_slice(), b"aws4_request")
142    }
143}
144
145impl ObjectStorageState for S3State {
146    fn url(&self, obj_key: &str, query_str: &[&str]) -> (String, String) {
147        let query_str = if query_str.is_empty() {
148            "".to_string()
149        } else {
150            format!("?{}", query_str.join("&"))
151        };
152        let resource = format!(
153            "/{}/{}{}{}",
154            self.bucket_name, self.object_prefix, obj_key, query_str
155        );
156        let url = format!("{}://{}{}", self.scheme, self.endpoint, resource,);
157        (resource, url)
158    }
159
160    // modified based on https://github.com/minio/minio-rs/blob/5fea81d68d381fd2a4c27e4d259f7012de08ab77/src/s3/signer.rs#L106-L135
161    // under apache 2.0 license
162    /// generate s3 request signature
163    fn sign(
164        &self,
165        verb: Method,
166        headers: &mut HeaderMap,
167        _: &str,
168        full_resource_url: &str,
169    ) -> Result<()> {
170        let date = OffsetDateTime::now_utc();
171        let content_sha256 = EMPTY_SHA256;
172        let parsed_uri = full_resource_url
173            .to_string()
174            .parse::<Uri>()
175            .map_err(|e| einval!(e))?;
176        let uri_path = parsed_uri.path();
177        let query = parsed_uri.query().unwrap_or("");
178        let host = parsed_uri.host().unwrap_or(self.endpoint.as_str());
179
180        headers.insert(HEADER_HOST, host.parse().map_err(|e| einval!(e))?);
181        headers.insert(
182            HEADER_AWZ_DATE,
183            to_awz_date(&date).parse().map_err(|e| einval!(e))?,
184        );
185        headers.insert(
186            HEADER_AWZ_CONTENT_SHA256,
187            EMPTY_SHA256.parse().map_err(|e| einval!(e))?,
188        );
189        let scope = format!(
190            "{}/{}/{}/aws4_request",
191            to_signer_date(&date),
192            self.region,
193            "s3",
194        );
195        let (signed_headers, canonical_headers) = self.get_canonical_headers(headers);
196        let canonical_request_hash = self.get_canonical_request_hash(
197            &verb,
198            uri_path,
199            query,
200            &canonical_headers,
201            &signed_headers,
202            content_sha256,
203        );
204        let string_to_sign = format!(
205            "AWS4-HMAC-SHA256\n{}\n{}\n{}",
206            to_awz_date(&date),
207            scope,
208            canonical_request_hash
209        );
210        let signing_key = self.get_signing_key(&date);
211        let signature = hmac_hash_hex(signing_key.as_slice(), string_to_sign.as_bytes());
212        let authorization = format!(
213            "AWS4-HMAC-SHA256 Credential={}/{}, SignedHeaders={}, Signature={}",
214            self.access_key_id, scope, signed_headers, signature
215        );
216        headers.insert(
217            "Authorization",
218            authorization.parse().map_err(|e| einval!(e))?,
219        );
220
221        Ok(())
222    }
223
224    fn retry_limit(&self) -> u8 {
225        self.retry_limit
226    }
227}
228
229// modified based on https://github.com/minio/minio-rs/blob/5fea81d68d381fd2a4c27e4d259f7012de08ab77/src/s3/utils.rs#L52-L56
230// under apache 2.0 license
231fn sha256_hash(data: &[u8]) -> String {
232    let mut hasher = Sha256::new();
233    hasher.update(data);
234    format!("{:x}", hasher.finalize())
235}
236
237// modified based on https://github.com/minio/minio-rs/blob/5fea81d68d381fd2a4c27e4d259f7012de08ab77/src/s3/signer.rs#L25-L29
238// under apache 2.0 license
239fn hmac_hash(key: &[u8], data: &[u8]) -> Vec<u8> {
240    let mut hasher = Hmac::<Sha256>::new_from_slice(key).expect("HMAC can take key of any size");
241    hasher.update(data);
242    hasher.finalize().into_bytes().to_vec()
243}
244
245// modified based on https://github.com/minio/minio-rs/blob/5fea81d68d381fd2a4c27e4d259f7012de08ab77/src/s3/signer.rs#L31-L33
246// under apache 2.0 license
247fn hmac_hash_hex(key: &[u8], data: &[u8]) -> String {
248    hex::encode(hmac_hash(key, data))
249}
250
251// modified based on https://github.com/minio/minio-rs/blob/5fea81d68d381fd2a4c27e4d259f7012de08ab77/src/s3/utils.rs#L66-L68
252// under apache 2.0 license
253fn to_signer_date(date: &OffsetDateTime) -> String {
254    let format = format_description::parse("[year][month][day]").unwrap();
255    date.format(&format).unwrap()
256}
257
258// modified based on https://github.com/minio/minio-rs/blob/5fea81d68d381fd2a4c27e4d259f7012de08ab77/src/s3/utils.rs#L70-L72
259// under apache 2.0 license
260fn to_awz_date(date: &OffsetDateTime) -> String {
261    let format = format_description::parse("[year][month][day]T[hour][minute][second]Z").unwrap();
262    date.format(&format).unwrap()
263}
264
#[cfg(test)]
mod tests {
    use http::{HeaderMap, Method};
    use nydus_api::S3Config;
    use regex::Regex;

    use crate::backend::object_storage::ObjectStorageState;
    use crate::backend::s3::S3State;
    use crate::backend::BlobBackend;

    use super::{EMPTY_SHA256, S3};

    /// Build a fixed `S3State` plus the resource/URL pair for object `test-object`
    /// with query `a=b&c=d`.
    fn get_test_s3_state() -> (S3State, String, String) {
        let state = S3State {
            region: "us-east-1".to_string(),
            access_key_id: "test-key".to_string(),
            access_key_secret: "test-key-secret".to_string(),
            scheme: "http".to_string(),
            object_prefix: "test-prefix-".to_string(),
            endpoint: "localhost:9000".to_string(),
            bucket_name: "test-bucket".to_string(),
            retry_limit: 6,
        };
        let (resource, url) = state.url("test-object", &["a=b", "c=d"]);
        (state, resource, url)
    }

    #[test]
    fn test_s3_new() {
        let config_str = r#"{
            "endpoint": "https://test.com",
            "region": "us-east-1",
            "access_key_id": "test",
            "access_key_secret": "test",
            "bucket_name": "antsys-nydus",
            "object_prefix":"nydus_v2/",
            "retry_limit": 6
        }"#;
        let config: S3Config = serde_json::from_str(config_str).unwrap();
        let s3 = S3::new(&config, Some("test-image")).unwrap();

        s3.metrics();

        let reader = s3.get_reader("test").unwrap();
        assert_eq!(reader.retry_limit(), 6);

        s3.shutdown();
    }

    #[test]
    fn test_s3_state_url() {
        let (_, resource, url) = get_test_s3_state();
        assert_eq!(resource, "/test-bucket/test-prefix-test-object?a=b&c=d");
        assert_eq!(
            url,
            "http://localhost:9000/test-bucket/test-prefix-test-object?a=b&c=d"
        );
    }

    #[test]
    fn test_s3_state_sign() {
        let (state, resource, url) = get_test_s3_state();
        let mut headers = HeaderMap::new();
        headers.append("Range", "bytes=5242900-".parse().unwrap());
        let result = state.sign(Method::GET, &mut headers, &resource, &url);
        assert!(result.is_ok());

        let re = Regex::new(r"^AWS4-HMAC-SHA256 Credential=test-key/[0-9]{8}/us-east-1/s3/aws4_request, SignedHeaders=host;range;x-amz-content-sha256;x-amz-date, Signature=[A-Fa-f0-9]{64}$").unwrap();
        let authorization = headers.get("Authorization").unwrap();
        assert!(re.is_match(authorization.to_str().unwrap()));

        // The payload hash header must carry the empty-body digest that was signed.
        let content_sha256 = headers.get("x-amz-content-sha256").unwrap();
        assert_eq!(content_sha256.to_str().unwrap(), EMPTY_SHA256);

        // The timestamp header must use the compact ISO 8601 basic format.
        let amz_date = headers.get("x-amz-date").unwrap().to_str().unwrap();
        let date_re = Regex::new(r"^[0-9]{8}T[0-9]{6}Z$").unwrap();
        assert!(date_re.is_match(amz_date));
    }
}