Skip to main content

nydus_storage/backend/
oss.rs

1// Copyright 2020 Ant Group. All rights reserved.
2// Copyright (C) 2021 Alibaba Cloud. All rights reserved.
3//
4// SPDX-License-Identifier: Apache-2.0
5
6//! Storage backend driver to access blobs on Oss(Object Storage System).
7use std::io::Result;
8use std::sync::Arc;
9use std::time::SystemTime;
10
11use base64::Engine;
12use hmac::{Hmac, Mac};
13use reqwest::header::HeaderMap;
14use reqwest::Method;
15use sha1::Sha1;
16
17use nydus_api::OssConfig;
18use nydus_utils::metrics::BackendMetrics;
19
20use crate::backend::connection::{Connection, ConnectionConfig};
21use crate::backend::object_storage::{ObjectStorage, ObjectStorageState};
22
23const HEADER_DATE: &str = "Date";
24const HEADER_AUTHORIZATION: &str = "Authorization";
25
26type HmacSha1 = Hmac<Sha1>;
27
28// `OssState` is almost identical to `OssConfig`, but let's keep them separated.
29#[derive(Debug)]
30pub struct OssState {
31    access_key_id: String,
32    access_key_secret: String,
33    scheme: String,
34    object_prefix: String,
35    endpoint: String,
36    bucket_name: String,
37    retry_limit: u8,
38}
39
40impl OssState {
41    fn resource(&self, object_key: &str, query_str: &str) -> String {
42        format!("/{}/{}{}", self.bucket_name, object_key, query_str)
43    }
44}
45
46impl ObjectStorageState for OssState {
47    fn url(&self, object_key: &str, query: &[&str]) -> (String, String) {
48        let object_key = &format!("{}{}", self.object_prefix, object_key);
49        let url = format!(
50            "{}://{}.{}/{}",
51            self.scheme, self.bucket_name, self.endpoint, object_key
52        );
53
54        if query.is_empty() {
55            (self.resource(object_key, ""), url)
56        } else {
57            let query_str = format!("?{}", query.join("&"));
58            let resource = self.resource(object_key, &query_str);
59            let url = format!("{}{}", url.as_str(), &query_str);
60            (resource, url)
61        }
62    }
63
64    /// generate oss request signature
65    fn sign(
66        &self,
67        verb: Method,
68        headers: &mut HeaderMap,
69        canonicalized_resource: &str,
70        _: &str,
71    ) -> Result<()> {
72        let content_md5 = "";
73        let content_type = "";
74        let mut canonicalized_oss_headers = vec![];
75        let date = httpdate::fmt_http_date(SystemTime::now());
76        let mut data = vec![
77            verb.as_str(),
78            content_md5,
79            content_type,
80            date.as_str(),
81            // canonicalized_oss_headers,
82            canonicalized_resource,
83        ];
84
85        for (name, value) in headers.iter() {
86            let name = name.as_str();
87            let value = value.to_str().map_err(|e| einval!(e))?;
88            if name.starts_with("x-oss-") {
89                let header = format!("{}:{}", name.to_lowercase(), value);
90                canonicalized_oss_headers.push(header);
91            }
92        }
93        let canonicalized_oss_headers = canonicalized_oss_headers.join("\n");
94        if !canonicalized_oss_headers.is_empty() {
95            data.insert(4, canonicalized_oss_headers.as_str());
96        }
97        let data = data.join("\n");
98        let hmac = HmacSha1::new_from_slice(self.access_key_secret.as_bytes())
99            .map_err(|e| einval!(e))?
100            .chain_update(data.as_bytes())
101            .finalize()
102            .into_bytes();
103        let signature = base64::engine::general_purpose::STANDARD.encode(hmac);
104
105        let authorization = format!("OSS {}:{}", self.access_key_id, signature);
106
107        headers.insert(HEADER_DATE, date.as_str().parse().map_err(|e| einval!(e))?);
108        headers.insert(
109            HEADER_AUTHORIZATION,
110            authorization.as_str().parse().map_err(|e| einval!(e))?,
111        );
112
113        Ok(())
114    }
115
116    fn retry_limit(&self) -> u8 {
117        self.retry_limit
118    }
119}
120
121/// Storage backend to access data stored in OSS.
122pub type Oss = ObjectStorage<OssState>;
123
124impl Oss {
125    /// Create a new OSS storage backend.
126    pub fn new(oss_config: &OssConfig, id: Option<&str>) -> Result<Oss> {
127        let con_config: ConnectionConfig = oss_config.clone().into();
128        let retry_limit = con_config.retry_limit;
129        let connection = Connection::new(&con_config)?;
130        let state = Arc::new(OssState {
131            scheme: oss_config.scheme.clone(),
132            object_prefix: oss_config.object_prefix.clone(),
133            endpoint: oss_config.endpoint.clone(),
134            access_key_id: oss_config.access_key_id.clone(),
135            access_key_secret: oss_config.access_key_secret.clone(),
136            bucket_name: oss_config.bucket_name.clone(),
137            retry_limit,
138        });
139        let metrics = id.map(|i| BackendMetrics::new(i, "oss"));
140
141        Ok(ObjectStorage::new_object_storage(
142            connection,
143            state,
144            metrics,
145            id.map(|i| i.to_string()),
146        ))
147    }
148}
149
150#[cfg(test)]
151mod tests {
152    use crate::backend::BlobBackend;
153
154    use super::*;
155
156    #[test]
157    fn test_oss_state() {
158        let state = OssState {
159            access_key_id: "key".to_string(),
160            access_key_secret: "secret".to_string(),
161            scheme: "https".to_string(),
162            object_prefix: "nydus".to_string(),
163            endpoint: "oss".to_string(),
164            bucket_name: "images".to_string(),
165            retry_limit: 5,
166        };
167
168        assert_eq!(
169            state.resource("obj_key", "?idontcare"),
170            "/images/obj_key?idontcare"
171        );
172
173        let (resource, url) = state.url("obj_key", &["idontcare", "second"]);
174        assert_eq!(resource, "/images/nydusobj_key?idontcare&second");
175        assert_eq!(url, "https://images.oss/nydusobj_key?idontcare&second");
176
177        let mut headers = HeaderMap::new();
178        state
179            .sign(Method::HEAD, &mut headers, resource.as_str(), "")
180            .unwrap();
181        let signature = headers.get(HEADER_AUTHORIZATION).unwrap();
182        assert!(signature.to_str().unwrap().contains("OSS key:"));
183    }
184
185    #[test]
186    fn test_oss_new() {
187        let json_str = "{\"access_key_id\":\"key\",\"access_key_secret\":\"secret\",\"bucket_name\":\"images\",\"endpoint\":\"/oss\",\"object_prefix\":\"nydus\",\"scheme\":\"\",\"proxy\":{\"url\":\"\",\"ping_url\":\"\",\"fallback\":true,\"check_interval\":5},\"timeout\":5,\"connect_timeout\":5,\"retry_limit\":5}";
188        let config: OssConfig = serde_json::from_str(json_str).unwrap();
189        let oss = Oss::new(&config, Some("test-image")).unwrap();
190
191        oss.metrics();
192
193        let reader = oss.get_reader("test").unwrap();
194        assert_eq!(reader.retry_limit(), 5);
195
196        oss.shutdown();
197    }
198}