1use std::fmt;
9use std::fmt::Debug;
10use std::io::{Error, Result};
11use std::marker::Send;
12use std::sync::Arc;
13
14use reqwest::header::{HeaderMap, CONTENT_LENGTH};
15use reqwest::Method;
16
17use nydus_utils::metrics::BackendMetrics;
18
19use super::connection::{Connection, ConnectionError};
20use super::{BackendError, BackendResult, BlobBackend, BlobReader};
21
22#[derive(Debug)]
24pub enum ObjectStorageError {
25 Auth(Error),
26 Request(ConnectionError),
27 ConstructHeader(String),
28 Transport(reqwest::Error),
29 Response(String),
30}
31
32impl fmt::Display for ObjectStorageError {
33 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
34 match self {
35 ObjectStorageError::Auth(e) => write!(f, "failed to generate auth info, {}", e),
36 ObjectStorageError::Request(e) => write!(f, "network communication error, {}", e),
37 ObjectStorageError::ConstructHeader(e) => {
38 write!(f, "failed to generate HTTP header, {}", e)
39 }
40 ObjectStorageError::Transport(e) => write!(f, "network communication error, {}", e),
41 ObjectStorageError::Response(s) => write!(f, "network communication error, {}", s),
42 }
43 }
44}
45
46impl From<ObjectStorageError> for BackendError {
47 fn from(err: ObjectStorageError) -> Self {
48 BackendError::ObjectStorage(err)
49 }
50}
51
52pub trait ObjectStorageState: Send + Sync + Debug {
53 fn url(&self, object_key: &str, query: &[&str]) -> (String, String);
55
56 fn sign(
58 &self,
59 verb: Method,
60 headers: &mut HeaderMap,
61 canonicalized_resource: &str,
62 full_resource_url: &str,
63 ) -> Result<()>;
64
65 fn retry_limit(&self) -> u8;
66}
67
68struct ObjectStorageReader<T>
69where
70 T: ObjectStorageState,
71{
72 blob_id: String,
73 connection: Arc<Connection>,
74 state: Arc<T>,
75 metrics: Arc<BackendMetrics>,
76}
77
78impl<T> BlobReader for ObjectStorageReader<T>
79where
80 T: ObjectStorageState,
81{
82 fn blob_size(&self) -> BackendResult<u64> {
83 let (resource, url) = self.state.url(&self.blob_id, &[]);
84 let mut headers = HeaderMap::new();
85
86 self.state
87 .sign(Method::HEAD, &mut headers, resource.as_str(), url.as_str())
88 .map_err(ObjectStorageError::Auth)?;
89
90 let resp = self
91 .connection
92 .call::<&[u8]>(Method::HEAD, url.as_str(), None, None, &mut headers, true)
93 .map_err(ObjectStorageError::Request)?;
94 let content_length = resp
95 .headers()
96 .get(CONTENT_LENGTH)
97 .ok_or_else(|| ObjectStorageError::Response("invalid content length".to_string()))?;
98
99 Ok(content_length
100 .to_str()
101 .map_err(|err| {
102 ObjectStorageError::Response(format!("invalid content length: {:?}", err))
103 })?
104 .parse::<u64>()
105 .map_err(|err| {
106 ObjectStorageError::Response(format!("invalid content length: {:?}", err))
107 })?)
108 }
109
110 fn try_read(&self, mut buf: &mut [u8], offset: u64) -> BackendResult<usize> {
111 let query = &[];
112 let (resource, url) = self.state.url(&self.blob_id, query);
113 let mut headers = HeaderMap::new();
114 let end_at = offset + buf.len() as u64 - 1;
115 let range = format!("bytes={}-{}", offset, end_at);
116
117 headers.insert(
118 "Range",
119 range
120 .as_str()
121 .parse()
122 .map_err(|e| ObjectStorageError::ConstructHeader(format!("{}", e)))?,
123 );
124 self.state
125 .sign(Method::GET, &mut headers, resource.as_str(), url.as_str())
126 .map_err(ObjectStorageError::Auth)?;
127
128 let mut resp = self
130 .connection
131 .call::<&[u8]>(Method::GET, url.as_str(), None, None, &mut headers, true)
132 .map_err(ObjectStorageError::Request)?;
133 Ok(resp
134 .copy_to(&mut buf)
135 .map_err(ObjectStorageError::Transport)
136 .map(|size| size as usize)?)
137 }
138
139 fn metrics(&self) -> &BackendMetrics {
140 &self.metrics
141 }
142
143 fn retry_limit(&self) -> u8 {
144 self.state.retry_limit()
145 }
146}
147
148#[derive(Debug)]
149pub struct ObjectStorage<T>
150where
151 T: ObjectStorageState,
152{
153 connection: Arc<Connection>,
154 state: Arc<T>,
155 metrics: Option<Arc<BackendMetrics>>,
156 #[allow(unused)]
157 id: Option<String>,
158}
159
160impl<T> ObjectStorage<T>
161where
162 T: ObjectStorageState,
163{
164 pub(crate) fn new_object_storage(
165 connection: Arc<Connection>,
166 state: Arc<T>,
167 metrics: Option<Arc<BackendMetrics>>,
168 id: Option<String>,
169 ) -> Self {
170 ObjectStorage {
171 connection,
172 state,
173 metrics,
174 id,
175 }
176 }
177}
178
179impl<T: 'static> BlobBackend for ObjectStorage<T>
180where
181 T: ObjectStorageState,
182{
183 fn shutdown(&self) {
184 self.connection.shutdown();
185 }
186
187 fn metrics(&self) -> &BackendMetrics {
188 self.metrics.as_ref().unwrap()
191 }
192
193 fn get_reader(&self, blob_id: &str) -> BackendResult<Arc<dyn BlobReader>> {
194 if let Some(metrics) = self.metrics.as_ref() {
195 Ok(Arc::new(ObjectStorageReader {
196 blob_id: blob_id.to_string(),
197 state: self.state.clone(),
198 connection: self.connection.clone(),
199 metrics: metrics.clone(),
200 }))
201 } else {
202 Err(BackendError::Unsupported(
203 "no metrics object available for OssReader".to_string(),
204 ))
205 }
206 }
207}
208
209impl<T> Drop for ObjectStorage<T>
210where
211 T: ObjectStorageState,
212{
213 fn drop(&mut self) {
214 if let Some(metrics) = self.metrics.as_ref() {
215 metrics.release().unwrap_or_else(|e| error!("{:?}", e));
216 }
217 }
218}