Skip to main content

nydus_storage/backend/
mod.rs

1// Copyright 2020 Ant Group. All rights reserved.
2// Copyright (C) 2021 Alibaba Cloud. All rights reserved.
3//
4// SPDX-License-Identifier: Apache-2.0
5
6//! Storage backends to read blob data from Registry, OSS, disk, file system etc.
7//!
8//! There are several types of storage backend drivers implemented:
9//! - [Registry](registry/struct.Registry.html): backend driver to access blobs on container image
10//!   registry.
11//! - [Oss](oss/struct.Oss.html): backend driver to access blobs on Oss(Object Storage System).
12//! - [LocalFs](localfs/struct.LocalFs.html): backend driver to access blobs on local file system.
13//!   The [LocalFs](localfs/struct.LocalFs.html) storage backend supports backend level data
14//!   prefetching, which is to load data into page cache.
15//! - [LocalDisk](localdisk/struct.LocalDisk.html): backend driver to access blobs on local disk.
16
17use std::fmt;
18use std::io::Read;
19use std::{sync::Arc, time::Duration};
20
21use fuse_backend_rs::file_buf::FileVolatileSlice;
22use nydus_utils::{
23    metrics::{BackendMetrics, ERROR_HOLDER},
24    DelayType, Delayer,
25};
26
27use crate::utils::{alloc_buf, copyv};
28use crate::StorageError;
29
30#[cfg(any(
31    feature = "backend-oss",
32    feature = "backend-registry",
33    feature = "backend-s3",
34    feature = "backend-http-proxy",
35))]
36pub mod connection;
37#[cfg(feature = "backend-http-proxy")]
38pub mod http_proxy;
39#[cfg(feature = "backend-localdisk")]
40pub mod localdisk;
41#[cfg(feature = "backend-localfs")]
42pub mod localfs;
43#[cfg(any(feature = "backend-oss", feature = "backend-s3"))]
44pub mod object_storage;
45#[cfg(feature = "backend-oss")]
46pub mod oss;
47#[cfg(feature = "backend-registry")]
48pub mod registry;
49#[cfg(feature = "backend-s3")]
50pub mod s3;
51
52/// Error codes related to storage backend operations.
53#[derive(Debug)]
54pub enum BackendError {
55    /// Unsupported operation.
56    Unsupported(String),
57    /// Failed to copy data from/into blob.
58    CopyData(StorageError),
59    #[cfg(feature = "backend-localdisk")]
60    /// Error from LocalDisk storage backend.
61    LocalDisk(self::localdisk::LocalDiskError),
62    #[cfg(feature = "backend-registry")]
63    /// Error from Registry storage backend.
64    Registry(self::registry::RegistryError),
65    #[cfg(feature = "backend-localfs")]
66    /// Error from LocalFs storage backend.
67    LocalFs(self::localfs::LocalFsError),
68    #[cfg(any(feature = "backend-oss", feature = "backend-s3"))]
69    /// Error from object storage backend.
70    ObjectStorage(self::object_storage::ObjectStorageError),
71    #[cfg(feature = "backend-http-proxy")]
72    /// Error from local http proxy backend.
73    HttpProxy(self::http_proxy::HttpProxyError),
74}
75
76impl fmt::Display for BackendError {
77    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
78        match self {
79            BackendError::Unsupported(s) => write!(f, "{}", s),
80            BackendError::CopyData(e) => write!(f, "failed to copy data, {}", e),
81            #[cfg(feature = "backend-registry")]
82            BackendError::Registry(e) => write!(f, "{:?}", e),
83            #[cfg(feature = "backend-localfs")]
84            BackendError::LocalFs(e) => write!(f, "{}", e),
85            #[cfg(any(feature = "backend-oss", feature = "backend-s3"))]
86            BackendError::ObjectStorage(e) => write!(f, "{}", e),
87            #[cfg(feature = "backend-localdisk")]
88            BackendError::LocalDisk(e) => write!(f, "{:?}", e),
89            #[cfg(feature = "backend-http-proxy")]
90            BackendError::HttpProxy(e) => write!(f, "{}", e),
91        }
92    }
93}
94
95/// Specialized `Result` for storage backends.
96pub type BackendResult<T> = std::result::Result<T, BackendError>;
97
98/// Trait to read data from a on storage backend.
99pub trait BlobReader: Send + Sync {
100    /// Get size of the blob file.
101    fn blob_size(&self) -> BackendResult<u64>;
102
103    /// Try to read a range of data from the blob file into the provided buffer.
104    ///
105    /// Try to read data of range [offset, offset + buf.len()) from the blob file, and returns:
106    /// - bytes of data read, which may be smaller than buf.len()
107    /// - error code if error happens
108    fn try_read(&self, buf: &mut [u8], offset: u64) -> BackendResult<usize>;
109
110    /// Read a range of data from the blob file into the provided buffer.
111    ///
112    /// Read data of range [offset, offset + buf.len()) from the blob file, and returns:
113    /// - bytes of data read, which may be smaller than buf.len()
114    /// - error code if error happens
115    ///
116    /// It will try `BlobBackend::retry_limit()` times at most and return the first successfully
117    /// read data.
118    fn read(&self, buf: &mut [u8], offset: u64) -> BackendResult<usize> {
119        let mut retry_count = self.retry_limit();
120        let begin_time = self.metrics().begin();
121
122        let mut delayer = Delayer::new(DelayType::BackOff, Duration::from_millis(500));
123
124        loop {
125            match self.try_read(buf, offset) {
126                Ok(size) => {
127                    self.metrics().end(&begin_time, buf.len(), false);
128                    return Ok(size);
129                }
130                Err(err) => {
131                    if retry_count > 0 {
132                        warn!(
133                            "Read from backend failed: {:?}, retry count {}",
134                            err, retry_count
135                        );
136                        retry_count -= 1;
137                        delayer.delay();
138                    } else {
139                        self.metrics().end(&begin_time, buf.len(), true);
140                        ERROR_HOLDER
141                            .lock()
142                            .unwrap()
143                            .push(&format!("{:?}", err))
144                            .unwrap_or_else(|_| error!("Failed when try to hold error"));
145                        return Err(err);
146                    }
147                }
148            }
149        }
150    }
151
152    /// Read as much as possible data into buffer.
153    fn read_all(&self, buf: &mut [u8], offset: u64) -> BackendResult<usize> {
154        let mut off = 0usize;
155        let mut left = buf.len();
156
157        while left > 0 {
158            let cnt = self.read(&mut buf[off..], offset + off as u64)?;
159            if cnt == 0 {
160                break;
161            }
162            off += cnt;
163            left -= cnt;
164        }
165
166        Ok(off as usize)
167    }
168
169    /// Read a range of data from the blob file into the provided buffers.
170    ///
171    /// Read data of range [offset, offset + max_size) from the blob file, and returns:
172    /// - bytes of data read, which may be smaller than max_size
173    /// - error code if error happens
174    ///
175    /// It will try `BlobBackend::retry_limit()` times at most and return the first successfully
176    /// read data.
177    fn readv(
178        &self,
179        bufs: &[FileVolatileSlice],
180        offset: u64,
181        max_size: usize,
182    ) -> BackendResult<usize> {
183        if bufs.len() == 1 && max_size >= bufs[0].len() {
184            let buf = unsafe { std::slice::from_raw_parts_mut(bufs[0].as_ptr(), bufs[0].len()) };
185            self.read(buf, offset)
186        } else {
187            // Use std::alloc to avoid zeroing the allocated buffer.
188            let size = bufs.iter().fold(0usize, move |size, s| size + s.len());
189            let size = std::cmp::min(size, max_size);
190            let mut data = alloc_buf(size);
191
192            let result = self.read(&mut data, offset)?;
193            copyv(&[&data], bufs, 0, result, 0, 0)
194                .map(|r| r.0)
195                .map_err(BackendError::CopyData)
196        }
197    }
198
199    /// Get metrics object.
200    fn metrics(&self) -> &BackendMetrics;
201
202    /// Get maximum number of times to retry when encountering IO errors.
203    fn retry_limit(&self) -> u8 {
204        0
205    }
206}
207
208/// Trait to access blob files on backend storages, such as OSS, registry, local fs etc.
209pub trait BlobBackend: Send + Sync {
210    /// Destroy the `BlobBackend` storage object.
211    fn shutdown(&self);
212
213    /// Get metrics object.
214    fn metrics(&self) -> &BackendMetrics;
215
216    /// Get a blob reader object to access blob `blob_id`.
217    fn get_reader(&self, blob_id: &str) -> BackendResult<Arc<dyn BlobReader>>;
218}
219
220/// A buffered reader for `BlobReader` object.
221pub struct BlobBufReader {
222    buf: Vec<u8>,
223    pos: usize,
224    len: usize,
225    start: u64,
226    size: u64,
227    reader: Arc<dyn BlobReader>,
228}
229
230impl BlobBufReader {
231    /// Create a new instance of `BlobBufReader`.
232    pub fn new(buf_size: usize, reader: Arc<dyn BlobReader>, start: u64, size: u64) -> Self {
233        Self {
234            buf: alloc_buf(buf_size),
235            pos: 0,
236            len: 0,
237            start,
238            size,
239            reader,
240        }
241    }
242}
243
244impl Read for BlobBufReader {
245    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
246        let mut sz = self.len;
247        if sz == 0 && self.size == 0 {
248            // No more data.
249            return Ok(0);
250        }
251
252        // Refill the buffer.
253        if sz == 0 && self.size > 0 {
254            let cnt = std::cmp::min(self.buf.len() as u64, self.size) as usize;
255            let ret = self
256                .reader
257                .read(&mut self.buf[..cnt], self.start)
258                .map_err(|e| eio!(format!("failed to read data from backend, {:?}", e)))?;
259            self.start += ret as u64;
260            self.size -= ret as u64;
261            self.pos = 0;
262            self.len = ret;
263            sz = ret;
264        }
265        if self.size != 0 && sz == 0 {
266            return Err(eio!("unexpected EOF when reading data from backend"));
267        }
268
269        let sz = std::cmp::min(sz, buf.len());
270        buf[..sz].copy_from_slice(&self.buf[self.pos..self.pos + sz]);
271        self.pos += sz;
272        self.len -= sz;
273
274        Ok(sz)
275    }
276}