1use std::fmt;
18use std::io::Read;
19use std::{sync::Arc, time::Duration};
20
21use fuse_backend_rs::file_buf::FileVolatileSlice;
22use nydus_utils::{
23 metrics::{BackendMetrics, ERROR_HOLDER},
24 DelayType, Delayer,
25};
26
27use crate::utils::{alloc_buf, copyv};
28use crate::StorageError;
29
30#[cfg(any(
31 feature = "backend-oss",
32 feature = "backend-registry",
33 feature = "backend-s3",
34 feature = "backend-http-proxy",
35))]
36pub mod connection;
37#[cfg(feature = "backend-http-proxy")]
38pub mod http_proxy;
39#[cfg(feature = "backend-localdisk")]
40pub mod localdisk;
41#[cfg(feature = "backend-localfs")]
42pub mod localfs;
43#[cfg(any(feature = "backend-oss", feature = "backend-s3"))]
44pub mod object_storage;
45#[cfg(feature = "backend-oss")]
46pub mod oss;
47#[cfg(feature = "backend-registry")]
48pub mod registry;
49#[cfg(feature = "backend-s3")]
50pub mod s3;
51
52#[derive(Debug)]
54pub enum BackendError {
55 Unsupported(String),
57 CopyData(StorageError),
59 #[cfg(feature = "backend-localdisk")]
60 LocalDisk(self::localdisk::LocalDiskError),
62 #[cfg(feature = "backend-registry")]
63 Registry(self::registry::RegistryError),
65 #[cfg(feature = "backend-localfs")]
66 LocalFs(self::localfs::LocalFsError),
68 #[cfg(any(feature = "backend-oss", feature = "backend-s3"))]
69 ObjectStorage(self::object_storage::ObjectStorageError),
71 #[cfg(feature = "backend-http-proxy")]
72 HttpProxy(self::http_proxy::HttpProxyError),
74}
75
76impl fmt::Display for BackendError {
77 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
78 match self {
79 BackendError::Unsupported(s) => write!(f, "{}", s),
80 BackendError::CopyData(e) => write!(f, "failed to copy data, {}", e),
81 #[cfg(feature = "backend-registry")]
82 BackendError::Registry(e) => write!(f, "{:?}", e),
83 #[cfg(feature = "backend-localfs")]
84 BackendError::LocalFs(e) => write!(f, "{}", e),
85 #[cfg(any(feature = "backend-oss", feature = "backend-s3"))]
86 BackendError::ObjectStorage(e) => write!(f, "{}", e),
87 #[cfg(feature = "backend-localdisk")]
88 BackendError::LocalDisk(e) => write!(f, "{:?}", e),
89 #[cfg(feature = "backend-http-proxy")]
90 BackendError::HttpProxy(e) => write!(f, "{}", e),
91 }
92 }
93}
94
95pub type BackendResult<T> = std::result::Result<T, BackendError>;
97
98pub trait BlobReader: Send + Sync {
100 fn blob_size(&self) -> BackendResult<u64>;
102
103 fn try_read(&self, buf: &mut [u8], offset: u64) -> BackendResult<usize>;
109
110 fn read(&self, buf: &mut [u8], offset: u64) -> BackendResult<usize> {
119 let mut retry_count = self.retry_limit();
120 let begin_time = self.metrics().begin();
121
122 let mut delayer = Delayer::new(DelayType::BackOff, Duration::from_millis(500));
123
124 loop {
125 match self.try_read(buf, offset) {
126 Ok(size) => {
127 self.metrics().end(&begin_time, buf.len(), false);
128 return Ok(size);
129 }
130 Err(err) => {
131 if retry_count > 0 {
132 warn!(
133 "Read from backend failed: {:?}, retry count {}",
134 err, retry_count
135 );
136 retry_count -= 1;
137 delayer.delay();
138 } else {
139 self.metrics().end(&begin_time, buf.len(), true);
140 ERROR_HOLDER
141 .lock()
142 .unwrap()
143 .push(&format!("{:?}", err))
144 .unwrap_or_else(|_| error!("Failed when try to hold error"));
145 return Err(err);
146 }
147 }
148 }
149 }
150 }
151
152 fn read_all(&self, buf: &mut [u8], offset: u64) -> BackendResult<usize> {
154 let mut off = 0usize;
155 let mut left = buf.len();
156
157 while left > 0 {
158 let cnt = self.read(&mut buf[off..], offset + off as u64)?;
159 if cnt == 0 {
160 break;
161 }
162 off += cnt;
163 left -= cnt;
164 }
165
166 Ok(off as usize)
167 }
168
169 fn readv(
178 &self,
179 bufs: &[FileVolatileSlice],
180 offset: u64,
181 max_size: usize,
182 ) -> BackendResult<usize> {
183 if bufs.len() == 1 && max_size >= bufs[0].len() {
184 let buf = unsafe { std::slice::from_raw_parts_mut(bufs[0].as_ptr(), bufs[0].len()) };
185 self.read(buf, offset)
186 } else {
187 let size = bufs.iter().fold(0usize, move |size, s| size + s.len());
189 let size = std::cmp::min(size, max_size);
190 let mut data = alloc_buf(size);
191
192 let result = self.read(&mut data, offset)?;
193 copyv(&[&data], bufs, 0, result, 0, 0)
194 .map(|r| r.0)
195 .map_err(BackendError::CopyData)
196 }
197 }
198
199 fn metrics(&self) -> &BackendMetrics;
201
202 fn retry_limit(&self) -> u8 {
204 0
205 }
206}
207
208pub trait BlobBackend: Send + Sync {
210 fn shutdown(&self);
212
213 fn metrics(&self) -> &BackendMetrics;
215
216 fn get_reader(&self, blob_id: &str) -> BackendResult<Arc<dyn BlobReader>>;
218}
219
220pub struct BlobBufReader {
222 buf: Vec<u8>,
223 pos: usize,
224 len: usize,
225 start: u64,
226 size: u64,
227 reader: Arc<dyn BlobReader>,
228}
229
230impl BlobBufReader {
231 pub fn new(buf_size: usize, reader: Arc<dyn BlobReader>, start: u64, size: u64) -> Self {
233 Self {
234 buf: alloc_buf(buf_size),
235 pos: 0,
236 len: 0,
237 start,
238 size,
239 reader,
240 }
241 }
242}
243
244impl Read for BlobBufReader {
245 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
246 let mut sz = self.len;
247 if sz == 0 && self.size == 0 {
248 return Ok(0);
250 }
251
252 if sz == 0 && self.size > 0 {
254 let cnt = std::cmp::min(self.buf.len() as u64, self.size) as usize;
255 let ret = self
256 .reader
257 .read(&mut self.buf[..cnt], self.start)
258 .map_err(|e| eio!(format!("failed to read data from backend, {:?}", e)))?;
259 self.start += ret as u64;
260 self.size -= ret as u64;
261 self.pos = 0;
262 self.len = ret;
263 sz = ret;
264 }
265 if self.size != 0 && sz == 0 {
266 return Err(eio!("unexpected EOF when reading data from backend"));
267 }
268
269 let sz = std::cmp::min(sz, buf.len());
270 buf[..sz].copy_from_slice(&self.buf[self.pos..self.pos + sz]);
271 self.pos += sz;
272 self.len -= sz;
273
274 Ok(sz)
275 }
276}