Skip to main content

nydus_storage/backend/
object_storage.rs

1// Copyright 2022 Ant Group. All rights reserved.
2// Copyright (C) 2022 Alibaba Cloud. All rights reserved.
3//
4// SPDX-License-Identifier: Apache-2.0
5
6//! Base module used to implement object storage backend drivers (such as oss, s3, etc.).
7
8use std::fmt;
9use std::fmt::Debug;
10use std::io::{Error, Result};
11use std::marker::Send;
12use std::sync::Arc;
13
14use reqwest::header::{HeaderMap, CONTENT_LENGTH};
15use reqwest::Method;
16
17use nydus_utils::metrics::BackendMetrics;
18
19use super::connection::{Connection, ConnectionError};
20use super::{BackendError, BackendResult, BlobBackend, BlobReader};
21
22/// Error codes related to object storage backend.
23#[derive(Debug)]
24pub enum ObjectStorageError {
25    Auth(Error),
26    Request(ConnectionError),
27    ConstructHeader(String),
28    Transport(reqwest::Error),
29    Response(String),
30}
31
32impl fmt::Display for ObjectStorageError {
33    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
34        match self {
35            ObjectStorageError::Auth(e) => write!(f, "failed to generate auth info, {}", e),
36            ObjectStorageError::Request(e) => write!(f, "network communication error, {}", e),
37            ObjectStorageError::ConstructHeader(e) => {
38                write!(f, "failed to generate HTTP header, {}", e)
39            }
40            ObjectStorageError::Transport(e) => write!(f, "network communication error, {}", e),
41            ObjectStorageError::Response(s) => write!(f, "network communication error, {}", s),
42        }
43    }
44}
45
46impl From<ObjectStorageError> for BackendError {
47    fn from(err: ObjectStorageError) -> Self {
48        BackendError::ObjectStorage(err)
49    }
50}
51
52pub trait ObjectStorageState: Send + Sync + Debug {
53    // `url` builds the resource path and full url for the object.
54    fn url(&self, object_key: &str, query: &[&str]) -> (String, String);
55
56    // `sign` signs the request with the access key and secret key.
57    fn sign(
58        &self,
59        verb: Method,
60        headers: &mut HeaderMap,
61        canonicalized_resource: &str,
62        full_resource_url: &str,
63    ) -> Result<()>;
64
65    fn retry_limit(&self) -> u8;
66}
67
68struct ObjectStorageReader<T>
69where
70    T: ObjectStorageState,
71{
72    blob_id: String,
73    connection: Arc<Connection>,
74    state: Arc<T>,
75    metrics: Arc<BackendMetrics>,
76}
77
78impl<T> BlobReader for ObjectStorageReader<T>
79where
80    T: ObjectStorageState,
81{
82    fn blob_size(&self) -> BackendResult<u64> {
83        let (resource, url) = self.state.url(&self.blob_id, &[]);
84        let mut headers = HeaderMap::new();
85
86        self.state
87            .sign(Method::HEAD, &mut headers, resource.as_str(), url.as_str())
88            .map_err(ObjectStorageError::Auth)?;
89
90        let resp = self
91            .connection
92            .call::<&[u8]>(Method::HEAD, url.as_str(), None, None, &mut headers, true)
93            .map_err(ObjectStorageError::Request)?;
94        let content_length = resp
95            .headers()
96            .get(CONTENT_LENGTH)
97            .ok_or_else(|| ObjectStorageError::Response("invalid content length".to_string()))?;
98
99        Ok(content_length
100            .to_str()
101            .map_err(|err| {
102                ObjectStorageError::Response(format!("invalid content length: {:?}", err))
103            })?
104            .parse::<u64>()
105            .map_err(|err| {
106                ObjectStorageError::Response(format!("invalid content length: {:?}", err))
107            })?)
108    }
109
110    fn try_read(&self, mut buf: &mut [u8], offset: u64) -> BackendResult<usize> {
111        let query = &[];
112        let (resource, url) = self.state.url(&self.blob_id, query);
113        let mut headers = HeaderMap::new();
114        let end_at = offset + buf.len() as u64 - 1;
115        let range = format!("bytes={}-{}", offset, end_at);
116
117        headers.insert(
118            "Range",
119            range
120                .as_str()
121                .parse()
122                .map_err(|e| ObjectStorageError::ConstructHeader(format!("{}", e)))?,
123        );
124        self.state
125            .sign(Method::GET, &mut headers, resource.as_str(), url.as_str())
126            .map_err(ObjectStorageError::Auth)?;
127
128        // Safe because the the call() is a synchronous operation.
129        let mut resp = self
130            .connection
131            .call::<&[u8]>(Method::GET, url.as_str(), None, None, &mut headers, true)
132            .map_err(ObjectStorageError::Request)?;
133        Ok(resp
134            .copy_to(&mut buf)
135            .map_err(ObjectStorageError::Transport)
136            .map(|size| size as usize)?)
137    }
138
139    fn metrics(&self) -> &BackendMetrics {
140        &self.metrics
141    }
142
143    fn retry_limit(&self) -> u8 {
144        self.state.retry_limit()
145    }
146}
147
148#[derive(Debug)]
149pub struct ObjectStorage<T>
150where
151    T: ObjectStorageState,
152{
153    connection: Arc<Connection>,
154    state: Arc<T>,
155    metrics: Option<Arc<BackendMetrics>>,
156    #[allow(unused)]
157    id: Option<String>,
158}
159
160impl<T> ObjectStorage<T>
161where
162    T: ObjectStorageState,
163{
164    pub(crate) fn new_object_storage(
165        connection: Arc<Connection>,
166        state: Arc<T>,
167        metrics: Option<Arc<BackendMetrics>>,
168        id: Option<String>,
169    ) -> Self {
170        ObjectStorage {
171            connection,
172            state,
173            metrics,
174            id,
175        }
176    }
177}
178
179impl<T: 'static> BlobBackend for ObjectStorage<T>
180where
181    T: ObjectStorageState,
182{
183    fn shutdown(&self) {
184        self.connection.shutdown();
185    }
186
187    fn metrics(&self) -> &BackendMetrics {
188        // `metrics()` is only used for nydusd, which will always provide valid `blob_id`, thus
189        // `self.metrics` has valid value.
190        self.metrics.as_ref().unwrap()
191    }
192
193    fn get_reader(&self, blob_id: &str) -> BackendResult<Arc<dyn BlobReader>> {
194        if let Some(metrics) = self.metrics.as_ref() {
195            Ok(Arc::new(ObjectStorageReader {
196                blob_id: blob_id.to_string(),
197                state: self.state.clone(),
198                connection: self.connection.clone(),
199                metrics: metrics.clone(),
200            }))
201        } else {
202            Err(BackendError::Unsupported(
203                "no metrics object available for OssReader".to_string(),
204            ))
205        }
206    }
207}
208
209impl<T> Drop for ObjectStorage<T>
210where
211    T: ObjectStorageState,
212{
213    fn drop(&mut self) {
214        if let Some(metrics) = self.metrics.as_ref() {
215            metrics.release().unwrap_or_else(|e| error!("{:?}", e));
216        }
217    }
218}