Skip to main content

nydus_storage/
device.rs

1// Copyright 2020 Ant Group. All rights reserved.
2// Copyright (C) 2021 Alibaba Cloud. All rights reserved.
3//
4// SPDX-License-Identifier: Apache-2.0
5
6//! Blob Storage Public Service APIs
7//!
8//! The core functionality of the nydus-storage crate is to serve blob IO request, mainly read chunk
9//! data from blobs. This module provides public APIs and data structures for clients to issue blob
10//! IO requests. The main traits and structs provided include:
11//! - [BlobChunkInfo](trait.BlobChunkInfo.html): trait to provide basic information for a  chunk.
12//! - [BlobDevice](struct.BlobDevice.html): a wrapping object over a group of underlying [BlobCache]
13//!   object to serve blob data access requests.
14//! - [BlobInfo](struct.BlobInfo.html): configuration information for a metadata/data blob object.
15//! - [BlobIoChunk](enum.BlobIoChunk.html): an enumeration to encapsulate different [BlobChunkInfo]
16//!   implementations for [BlobIoDesc].
17//! - [BlobIoDesc](struct.BlobIoDesc.html): a blob IO descriptor, containing information for a
18//!   continuous IO range within a chunk.
19//! - [BlobIoVec](struct.BlobIoVec.html): a scatter/gather list for blob IO operation, containing
20//!   one or more blob IO descriptors
21//! - [BlobPrefetchRequest](struct.BlobPrefetchRequest.html): a blob data prefetching request.
22use std::any::Any;
23use std::collections::hash_map::Drain;
24use std::collections::HashMap;
25use std::convert::TryFrom;
26use std::fmt::{Debug, Formatter};
27use std::fs::File;
28use std::io::{self, Error};
29use std::ops::Deref;
30use std::os::unix::io::AsRawFd;
31use std::path::Path;
32use std::sync::{Arc, Mutex};
33
34use arc_swap::ArcSwap;
35use fuse_backend_rs::api::filesystem::ZeroCopyWriter;
36use fuse_backend_rs::file_buf::FileVolatileSlice;
37use fuse_backend_rs::file_traits::FileReadWriteVolatile;
38
39use nydus_api::ConfigV2;
40use nydus_utils::compress;
41use nydus_utils::crypt::{self, Cipher, CipherContext};
42use nydus_utils::digest::{self, RafsDigest};
43
44use crate::cache::BlobCache;
45use crate::factory::BLOB_FACTORY;
46
/// Mask selecting the "incompatible" feature bits (low 16 bits of the flags word).
pub(crate) const BLOB_FEATURE_INCOMPAT_MASK: u32 = 0x0000_ffff;
/// The set of currently defined incompatible feature bits; unknown bits inside
/// `BLOB_FEATURE_INCOMPAT_MASK` cause `BlobFeatures::try_from` to fail.
pub(crate) const BLOB_FEATURE_INCOMPAT_VALUE: u32 = 0x0000_0fff;
49
bitflags! {
    /// Features bits for blob management.
    ///
    /// The low 16 bits are "incompatible" feature bits: `TryFrom<u32>` rejects
    /// any value carrying an unknown bit inside `BLOB_FEATURE_INCOMPAT_MASK`,
    /// as well as the reserved `_V5_NO_EXT_BLOB_TABLE` bit.
    pub struct BlobFeatures: u32 {
        /// Uncompressed chunk data is aligned.
        const ALIGNED = 0x0000_0001;
        /// RAFS meta data is inlined in the data blob.
        const INLINED_FS_META = 0x0000_0002;
        /// Blob chunk information format v2.
        const CHUNK_INFO_V2 = 0x0000_0004;
        /// Blob compression information data include context data for zlib random access.
        const ZRAN = 0x0000_0008;
        /// Blob data and blob meta are stored in separate blobs.
        const SEPARATE = 0x0000_0010;
        /// Chunk digest array is inlined in the data blob.
        const INLINED_CHUNK_DIGEST = 0x0000_0020;
        /// Blob is for RAFS filesystems in TARFS mode.
        const TARFS = 0x0000_0040;
        /// Small file chunk are merged into batch chunk.
        const BATCH = 0x0000_0080;
        /// Whether the Blob is encrypted.
        const ENCRYPTED = 0x0000_0100;
        /// Blob has TAR headers to separate contents.
        const HAS_TAR_HEADER = 0x1000_0000;
        /// Blob has Table of Content (ToC) at the tail.
        const HAS_TOC = 0x2000_0000;
        /// Data blob are encoded with Tar header and optionally ToC.
        /// It's also a flag indicating that images are generated with `nydus-image` v2.2 or newer.
        const CAP_TAR_TOC = 0x4000_0000;
        /// Rafs V5 image without extended blob table, this is an internal flag.
        /// Set by `BlobInfo::compute_features()` when `chunk_count == 0`.
        const _V5_NO_EXT_BLOB_TABLE = 0x8000_0000;
        /// Blob is generated with chunkdict.
        const IS_CHUNKDICT_GENERATED = 0x0000_0200;
        /// Blob is generated with separated prefetch files.
        const IS_SEPARATED_WITH_PREFETCH_FILES = 0x0000_0400;
        /// Blob is stored in an external storage backend.
        const EXTERNAL = 0x0000_0800;
    }
}
88
89impl Default for BlobFeatures {
90    fn default() -> Self {
91        BlobFeatures::empty()
92    }
93}
94
95impl BlobFeatures {
96    /// Check whether the blob is for RAFS filesystems in TARFS mode.
97    pub fn is_tarfs(&self) -> bool {
98        self.contains(BlobFeatures::CAP_TAR_TOC) && self.contains(BlobFeatures::TARFS)
99    }
100}
101
102impl TryFrom<u32> for BlobFeatures {
103    type Error = Error;
104
105    fn try_from(value: u32) -> Result<Self, Self::Error> {
106        if value & BLOB_FEATURE_INCOMPAT_MASK & !BLOB_FEATURE_INCOMPAT_VALUE != 0
107            || value & BlobFeatures::_V5_NO_EXT_BLOB_TABLE.bits() != 0
108        {
109            Err(einval!(format!("invalid blob features: 0x{:x}", value)))
110        } else {
111            // Safe because we have just validated feature flags.
112            Ok(unsafe { BlobFeatures::from_bits_unchecked(value) })
113        }
114    }
115}
116
/// Configuration information for a metadata/data blob object.
///
/// The `BlobInfo` structure provides information for the storage subsystem to manage a blob file
/// and serve blob IO requests for clients.
#[derive(Clone, Debug, Default)]
pub struct BlobInfo {
    /// The index of blob in RAFS blob table.
    blob_index: u32,
    /// A sha256 hex string generally.
    blob_id: String,
    /// Feature bits for blob management.
    blob_features: BlobFeatures,
    /// Size of the compressed blob file.
    compressed_size: u64,
    /// Size of the uncompressed blob file, or the cache file.
    uncompressed_size: u64,
    /// Chunk size.
    chunk_size: u32,
    /// Number of chunks in blob file.
    /// A helper to distinguish bootstrap with extended blob table or not:
    ///     Bootstrap with extended blob table always has non-zero `chunk_count`
    chunk_count: u32,
    /// Compression algorithm to process the blob.
    compressor: compress::Algorithm,
    /// Chunk data encryption algorithm.
    cipher: crypt::Algorithm,
    /// Message digest algorithm to process the blob.
    digester: digest::Algorithm,
    /// Starting offset of the data to prefetch.
    prefetch_offset: u32,
    /// Size of blob data to prefetch.
    prefetch_size: u32,
    /// The blob is for a legacy estargz image.
    is_legacy_stargz: bool,

    /// V6: compressor that is used for compressing chunk info array.
    /// Stored as the raw `u32` value of `compress::Algorithm`.
    meta_ci_compressor: u32,
    /// V6: Offset of the chunk information array in the compressed blob.
    meta_ci_offset: u64,
    /// V6: Size of the compressed chunk information array.
    meta_ci_compressed_size: u64,
    /// V6: Size of the uncompressed chunk information array.
    meta_ci_uncompressed_size: u64,

    // SHA256 digest of blob ToC content, including the toc tar header.
    // It's all zero for blobs with inlined-meta.
    blob_toc_digest: [u8; 32],
    // SHA256 digest of RAFS blob for ZRAN, containing `blob.meta`, `blob.digest` `blob.toc` and
    // optionally 'image.boot`. It's all zero for ZRAN blobs with inlined-meta, so need special
    // handling.
    blob_meta_digest: [u8; 32],
    // Size of RAFS blob for ZRAN. It's zero for ZRAN blobs with inlined-meta.
    blob_meta_size: u64,
    // Size of blob ToC content, it's zero for blobs with inlined-meta.
    blob_toc_size: u32,

    /// V6: support fs-cache mode. `File` object handed over by the Linux
    /// fscache subsystem, shared via `Arc`.
    fs_cache_file: Option<Arc<File>>,
    /// V6: support inlined-meta. Lazily-set path-derived blob id, used by
    /// `blob_id()`/`get_blob_meta_id()` when the id cannot come from `blob_id`.
    meta_path: Arc<Mutex<String>>,
    /// V6: support data encryption.
    cipher_object: Arc<Cipher>,
    /// Cipher context for encryption.
    cipher_ctx: Option<CipherContext>,

    /// is chunkdict generated
    is_chunkdict_generated: bool,
}
185
186impl BlobInfo {
187    /// Create a new instance of `BlobInfo`.
188    pub fn new(
189        blob_index: u32,
190        blob_id: String,
191        uncompressed_size: u64,
192        compressed_size: u64,
193        chunk_size: u32,
194        chunk_count: u32,
195        blob_features: BlobFeatures,
196    ) -> Self {
197        let blob_id = blob_id.trim_end_matches('\0').to_string();
198        let mut blob_info = BlobInfo {
199            blob_index,
200            blob_id,
201            blob_features,
202            uncompressed_size,
203            compressed_size,
204            chunk_size,
205            chunk_count,
206
207            compressor: compress::Algorithm::None,
208            cipher: crypt::Algorithm::None,
209            digester: digest::Algorithm::Blake3,
210            prefetch_offset: 0,
211            prefetch_size: 0,
212            is_legacy_stargz: false,
213            meta_ci_compressor: 0,
214            meta_ci_offset: 0,
215            meta_ci_compressed_size: 0,
216            meta_ci_uncompressed_size: 0,
217
218            blob_toc_digest: [0u8; 32],
219            blob_meta_digest: [0u8; 32],
220            blob_meta_size: 0,
221            blob_toc_size: 0,
222
223            fs_cache_file: None,
224            meta_path: Arc::new(Mutex::new(String::new())),
225            cipher_object: Default::default(),
226            cipher_ctx: None,
227
228            is_chunkdict_generated: false,
229        };
230
231        blob_info.compute_features();
232
233        blob_info
234    }
235
236    /// Set the chunk count
237    pub fn set_chunk_count(&mut self, count: usize) {
238        self.chunk_count = count as u32;
239    }
240
241    /// Set compressed size
242    pub fn set_compressed_size(&mut self, size: usize) {
243        self.compressed_size = size as u64;
244    }
245
246    /// Set uncompressed size
247    pub fn set_uncompressed_size(&mut self, size: usize) {
248        self.uncompressed_size = size as u64;
249    }
250
251    /// Set meta ci compressed size
252    pub fn set_meta_ci_compressed_size(&mut self, size: usize) {
253        self.meta_ci_compressed_size = size as u64;
254    }
255
256    /// Set meta ci uncompressed size
257    pub fn set_meta_ci_uncompressed_size(&mut self, size: usize) {
258        self.meta_ci_uncompressed_size = size as u64;
259    }
260
261    /// Set meta ci offset
262    pub fn set_meta_ci_offset(&mut self, size: usize) {
263        self.meta_ci_offset = size as u64;
264    }
265
266    /// Set the is_chunkdict_generated flag.
267    pub fn set_chunkdict_generated(&mut self, is_chunkdict_generated: bool) {
268        self.is_chunkdict_generated = is_chunkdict_generated;
269    }
270
271    /// Get the is_chunkdict_generated flag.
272    pub fn is_chunkdict_generated(&self) -> bool {
273        self.is_chunkdict_generated
274    }
275
276    /// Get the blob index in the blob array.
277    pub fn blob_index(&self) -> u32 {
278        self.blob_index
279    }
280
281    /// Get the id of the blob, with special handling of `inlined-meta` case.
282    pub fn blob_id(&self) -> String {
283        if (!self.has_feature(BlobFeatures::EXTERNAL)
284            && self.has_feature(BlobFeatures::INLINED_FS_META)
285            && !self.has_feature(BlobFeatures::SEPARATE))
286            || !self.has_feature(BlobFeatures::CAP_TAR_TOC)
287        {
288            let guard = self.meta_path.lock().unwrap();
289            if !guard.is_empty() {
290                return guard.deref().clone();
291            }
292        }
293        self.blob_id.clone()
294    }
295
296    /// Set the blob id
297    pub fn set_blob_id(&mut self, blob_id: String) {
298        self.blob_id = blob_id
299    }
300
301    /// Get raw blob id, without special handling of `inlined-meta` case.
302    pub fn raw_blob_id(&self) -> &str {
303        &self.blob_id
304    }
305
306    /// Get size of compressed chunk data, not including `blob.meta`, `blob.chunk`, `toc` etc.
307    pub fn compressed_data_size(&self) -> u64 {
308        if self.has_feature(BlobFeatures::SEPARATE) {
309            // It's the size of referenced OCIv1 targz blob.
310            self.compressed_size
311        } else if self.has_feature(BlobFeatures::CAP_TAR_TOC) {
312            // Image built with nydus 2.2 and newer versions.
313            if self.meta_ci_is_valid() {
314                // For RAFS v6
315                if self.has_feature(BlobFeatures::HAS_TAR_HEADER) {
316                    // There's a tar header between chunk data and compression information.
317                    self.meta_ci_offset - 0x200
318                } else {
319                    self.meta_ci_offset
320                }
321            } else {
322                // For RAFS v5
323                if self.has_feature(BlobFeatures::HAS_TAR_HEADER) {
324                    // There's a tar header between chunk data and fs meta data.
325                    self.compressed_size - 0x200
326                } else {
327                    self.compressed_size
328                }
329            }
330        } else {
331            // Images built with nydus 2.1 and previous versions.
332            self.compressed_size
333        }
334    }
335
336    /// Get size of the compressed blob, including `blob.meta`, `blob.chunk`, `toc` etc.
337    pub fn compressed_size(&self) -> u64 {
338        self.compressed_size
339    }
340
341    /// Get size of the uncompressed blob.
342    pub fn uncompressed_size(&self) -> u64 {
343        self.uncompressed_size
344    }
345
346    /// Get chunk size.
347    pub fn chunk_size(&self) -> u32 {
348        self.chunk_size
349    }
350
351    /// Get number of chunks in the blob.
352    pub fn chunk_count(&self) -> u32 {
353        self.chunk_count
354    }
355
356    /// Get the compression algorithm to handle the blob data.
357    pub fn compressor(&self) -> compress::Algorithm {
358        self.compressor
359    }
360
361    /// Set compression algorithm for the blob.
362    pub fn set_compressor(&mut self, compressor: compress::Algorithm) {
363        self.compressor = compressor;
364        self.compute_features();
365    }
366
367    /// Get the cipher algorithm to handle chunk data.
368    pub fn cipher(&self) -> crypt::Algorithm {
369        self.cipher
370    }
371
372    /// Set encryption algorithm for the blob.
373    pub fn set_cipher(&mut self, cipher: crypt::Algorithm) {
374        self.cipher = cipher;
375    }
376
377    /// Get the cipher object to encrypt/decrypt chunk data.
378    pub fn cipher_object(&self) -> Arc<Cipher> {
379        self.cipher_object.clone()
380    }
381
382    /// Get the cipher context.
383    pub fn cipher_context(&self) -> Option<CipherContext> {
384        self.cipher_ctx.clone()
385    }
386
387    /// Set the cipher info, including cipher algo, cipher object and cipher context.
388    pub fn set_cipher_info(
389        &mut self,
390        cipher: crypt::Algorithm,
391        cipher_object: Arc<Cipher>,
392        cipher_ctx: Option<CipherContext>,
393    ) {
394        self.cipher = cipher;
395        self.cipher_object = cipher_object;
396        self.cipher_ctx = cipher_ctx;
397    }
398
399    /// Get the message digest algorithm for the blob.
400    pub fn digester(&self) -> digest::Algorithm {
401        self.digester
402    }
403
404    /// Set compression algorithm for the blob.
405    pub fn set_digester(&mut self, digester: digest::Algorithm) {
406        self.digester = digester;
407    }
408
409    /// Get blob data prefetching offset.
410    pub fn prefetch_offset(&self) -> u64 {
411        self.prefetch_offset as u64
412    }
413
414    /// Get blob data prefetching offset.
415    pub fn prefetch_size(&self) -> u64 {
416        self.prefetch_size as u64
417    }
418
419    /// Set a range for blob data prefetching.
420    ///
421    /// Only one range could be configured per blob, and zero prefetch_size means disabling blob
422    /// data prefetching.
423    pub fn set_prefetch_info(&mut self, offset: u64, size: u64) {
424        self.prefetch_offset = offset as u32;
425        self.prefetch_size = size as u32;
426    }
427
428    /// Check whether this blob is for an stargz image.
429    pub fn is_legacy_stargz(&self) -> bool {
430        self.is_legacy_stargz
431    }
432
433    /// Set metadata information for a blob.
434    ///
435    /// The compressed blobs are laid out as:
436    /// `[compressed chunk data], [compressed metadata], [uncompressed header]`.
437    pub fn set_blob_meta_info(
438        &mut self,
439        offset: u64,
440        compressed_size: u64,
441        uncompressed_size: u64,
442        compressor: u32,
443    ) {
444        self.meta_ci_compressor = compressor;
445        self.meta_ci_offset = offset;
446        self.meta_ci_compressed_size = compressed_size;
447        self.meta_ci_uncompressed_size = uncompressed_size;
448    }
449
450    /// Get compression algorithm for chunk information array.
451    pub fn meta_ci_compressor(&self) -> compress::Algorithm {
452        if self.meta_ci_compressor == compress::Algorithm::Lz4Block as u32 {
453            compress::Algorithm::Lz4Block
454        } else if self.meta_ci_compressor == compress::Algorithm::GZip as u32 {
455            compress::Algorithm::GZip
456        } else if self.meta_ci_compressor == compress::Algorithm::Zstd as u32 {
457            compress::Algorithm::Zstd
458        } else {
459            compress::Algorithm::None
460        }
461    }
462
463    /// Get offset of chunk information array in the compressed blob.
464    pub fn meta_ci_offset(&self) -> u64 {
465        self.meta_ci_offset
466    }
467
468    /// Get size of the compressed chunk information array.
469    pub fn meta_ci_compressed_size(&self) -> u64 {
470        self.meta_ci_compressed_size
471    }
472
473    /// Get the uncompressed size of the chunk information array.
474    pub fn meta_ci_uncompressed_size(&self) -> u64 {
475        self.meta_ci_uncompressed_size
476    }
477
478    /// Check whether compression metadata is available.
479    pub fn meta_ci_is_valid(&self) -> bool {
480        self.meta_ci_compressed_size != 0 && self.meta_ci_uncompressed_size != 0
481    }
482
483    /// Set the associated `File` object provided by Linux fscache subsystem.
484    pub fn set_fscache_file(&mut self, file: Option<Arc<File>>) {
485        self.fs_cache_file = file;
486    }
487
488    #[cfg(target_os = "linux")]
489    /// Get the associated `File` object provided by Linux fscache subsystem.
490    pub(crate) fn get_fscache_file(&self) -> Option<Arc<File>> {
491        self.fs_cache_file.clone()
492    }
493
494    /// Get blob features.
495    pub fn features(&self) -> BlobFeatures {
496        self.blob_features
497    }
498
499    /// Check whether the requested features are available.
500    pub fn has_feature(&self, features: BlobFeatures) -> bool {
501        self.blob_features.bits() & features.bits() == features.bits()
502    }
503
504    pub fn is_external(&self) -> bool {
505        self.has_feature(BlobFeatures::EXTERNAL)
506    }
507
508    /// Generate feature flags according to blob configuration.
509    fn compute_features(&mut self) {
510        if self.chunk_count == 0 {
511            self.blob_features |= BlobFeatures::_V5_NO_EXT_BLOB_TABLE;
512        }
513        if self.compressor == compress::Algorithm::GZip
514            && !self.has_feature(BlobFeatures::CHUNK_INFO_V2)
515        {
516            self.is_legacy_stargz = true;
517        }
518    }
519
520    pub fn set_separated_with_prefetch_files_feature(&mut self, is_prefetchblob: bool) {
521        if is_prefetchblob {
522            self.blob_features |= BlobFeatures::IS_SEPARATED_WITH_PREFETCH_FILES;
523        }
524    }
525
526    /// Get SHA256 digest of the ToC content, including the toc tar header.
527    ///
528    /// It's all zero for inlined bootstrap.
529    pub fn blob_toc_digest(&self) -> &[u8; 32] {
530        &self.blob_toc_digest
531    }
532
533    /// Set SHA256 digest of the ToC content, including the toc tar header.
534    pub fn set_blob_toc_digest(&mut self, digest: [u8; 32]) {
535        self.blob_toc_digest = digest;
536    }
537
538    /// Get size of the ToC content. It's all zero for inlined bootstrap.
539    pub fn blob_toc_size(&self) -> u32 {
540        self.blob_toc_size
541    }
542
543    /// Set size of the ToC content.
544    pub fn set_blob_toc_size(&mut self, sz: u32) {
545        self.blob_toc_size = sz;
546    }
547
548    /// The RAFS blob contains `blob.meta`, `blob.digest`, `image.boot`, `ToC` etc.
549    /// Get SHA256 digest of RAFS blob containing `blob.meta`, `blob.digest` `blob.toc` and
550    /// optionally 'image.boot`.
551    ///
552    /// Default to `self.blob_id` when it's all zero.
553    pub fn blob_meta_digest(&self) -> &[u8; 32] {
554        &self.blob_meta_digest
555    }
556
557    /// Set SHA256 digest of the RAFS blob.
558    pub fn set_blob_meta_digest(&mut self, digest: [u8; 32]) {
559        self.blob_meta_digest = digest;
560    }
561
562    /// Get size of the RAFS blob.
563    pub fn blob_meta_size(&self) -> u64 {
564        self.blob_meta_size
565    }
566
567    /// Set size of the RAFS blob.
568    pub fn set_blob_meta_size(&mut self, size: u64) {
569        self.blob_meta_size = size;
570    }
571
572    /// Set path for meta blob file, which will be used by `get_blob_id()` and `get_blob_meta_id()`.
573    pub fn set_blob_id_from_meta_path(&self, path: &Path) -> Result<(), Error> {
574        *self.meta_path.lock().unwrap() = Self::get_blob_id_from_meta_path(path)?;
575        Ok(())
576    }
577
578    pub fn get_blob_id_from_meta_path(path: &Path) -> Result<String, Error> {
579        // Manual implementation of Path::file_prefix().
580        let mut id = path.file_name().ok_or_else(|| {
581            einval!(format!(
582                "failed to get blob id from meta file path {}",
583                path.display()
584            ))
585        })?;
586        loop {
587            let id1 = Path::new(id).file_stem().ok_or_else(|| {
588                einval!(format!(
589                    "failed to get blob id from meta file path {}",
590                    path.display()
591                ))
592            })?;
593            if id1.is_empty() {
594                return Err(einval!(format!(
595                    "failed to get blob id from meta file path {}",
596                    path.display()
597                )));
598            } else if id == id1 {
599                break;
600            } else {
601                id = id1;
602            }
603        }
604        let id = id.to_str().ok_or_else(|| {
605            einval!(format!(
606                "failed to get blob id from meta file path {}",
607                path.display()
608            ))
609        })?;
610
611        Ok(id.to_string())
612    }
613
614    /// Get RAFS blob id for ZRan.
615    pub fn get_blob_meta_id(&self) -> Result<String, Error> {
616        assert!(self.has_feature(BlobFeatures::SEPARATE));
617        let id = if self.has_feature(BlobFeatures::INLINED_FS_META) {
618            let guard = self.meta_path.lock().unwrap();
619            if guard.is_empty() {
620                return Err(einval!("failed to get blob id from meta file name"));
621            }
622            guard.deref().clone()
623        } else {
624            hex::encode(self.blob_meta_digest)
625        };
626        Ok(id)
627    }
628
629    /// Get the cipher info, including cipher algo, cipher object and cipher context.
630    pub fn get_cipher_info(&self) -> (crypt::Algorithm, Arc<Cipher>, Option<CipherContext>) {
631        (
632            self.cipher,
633            self.cipher_object.clone(),
634            self.cipher_ctx.clone(),
635        )
636    }
637}
638
bitflags! {
    /// Blob chunk flags, stored per chunk in the chunk information array.
    pub struct BlobChunkFlags: u32 {
        /// Chunk data is compressed.
        const COMPRESSED = 0x0000_0001;
        /// Chunk is a hole, with all data as zero.
        const _HOLECHUNK = 0x0000_0002;
        /// Chunk data is encrypted.
        const ENCRYPTED = 0x0000_0004;
        /// Chunk data is merged into a batch chunk.
        const BATCH = 0x0000_0008;
        /// Chunk data includes a CRC checksum.
        const HAS_CRC32 = 0x0000_0010;
    }
}
654
655impl Default for BlobChunkFlags {
656    fn default() -> Self {
657        BlobChunkFlags::empty()
658    }
659}
660
/// Trait to provide basic information for a chunk.
///
/// A `BlobChunkInfo` object describes how a chunk is located within the compressed and
/// uncompressed data blobs. It's used to help the storage subsystem to:
/// - download chunks from storage backend
/// - maintain chunk readiness state for each chunk
/// - convert from compressed form to uncompressed form
///
/// This trait may be extended to provide additional information for a specific Rafs filesystem
/// version, for example `BlobV5ChunkInfo` provides Rafs v5 filesystem related information about
/// a chunk.
pub trait BlobChunkInfo: Any + Sync + Send {
    /// Get the message digest value of the chunk, which acts as an identifier for the chunk.
    fn chunk_id(&self) -> &RafsDigest;

    /// Get a unique ID to identify the chunk within the metadata/data blob.
    ///
    /// The returned value of `id()` is often been used as HashMap keys, so `id()` method should
    /// return unique identifier for each chunk of a blob file.
    fn id(&self) -> u32;

    /// Get the blob index of the blob file in the Rafs v5 metadata's blob array.
    fn blob_index(&self) -> u32;

    /// Get the chunk offset in the compressed blob.
    fn compressed_offset(&self) -> u64;

    /// Get the size of the compressed chunk.
    fn compressed_size(&self) -> u32;

    /// Get end of the chunk in the compressed blob.
    ///
    /// Default implementation: `compressed_offset() + compressed_size()`.
    fn compressed_end(&self) -> u64 {
        self.compressed_offset() + self.compressed_size() as u64
    }

    /// Get the chunk offset in the uncompressed blob.
    fn uncompressed_offset(&self) -> u64;

    /// Get the size of the uncompressed chunk.
    fn uncompressed_size(&self) -> u32;

    /// Get end of the chunk in the uncompressed blob.
    ///
    /// Default implementation: `uncompressed_offset() + uncompressed_size()`.
    fn uncompressed_end(&self) -> u64 {
        self.uncompressed_offset() + self.uncompressed_size() as u64
    }

    /// Check whether the chunk is batch chunk or not.
    fn is_batch(&self) -> bool;

    /// Check whether the chunk is compressed or not.
    ///
    /// Some chunk may become bigger after compression, so plain data instead of compressed
    /// data may be stored in the compressed data blob for those chunks.
    fn is_compressed(&self) -> bool;

    /// Check whether the chunk is encrypted or not.
    fn is_encrypted(&self) -> bool;

    /// Check whether the chunk has CRC checksum or not.
    fn has_crc32(&self) -> bool;

    /// Get the crc32 checksum of the chunk.
    fn crc32(&self) -> u32;

    /// Get a `&dyn Any` reference for downcasting to the concrete implementation.
    fn as_any(&self) -> &dyn Any;
}
727
/// An enumeration to encapsulate different [BlobChunkInfo] implementations for [BlobIoDesc].
///
/// This helps to feed unified IO description to storage subsystem from both rafs v6 and v5 since
/// rafs v6 have a different ChunkInfo definition on bootstrap.
///
/// Cloning is cheap: only the inner `Arc` reference count is bumped.
#[derive(Clone)]
pub struct BlobIoChunk(Arc<dyn BlobChunkInfo>);
734
735impl From<Arc<dyn BlobChunkInfo>> for BlobIoChunk {
736    fn from(v: Arc<dyn BlobChunkInfo>) -> Self {
737        BlobIoChunk(v)
738    }
739}
740
// Pure delegation: every query is forwarded to the wrapped
// `Arc<dyn BlobChunkInfo>` object.
impl BlobChunkInfo for BlobIoChunk {
    fn chunk_id(&self) -> &RafsDigest {
        self.0.chunk_id()
    }

    fn id(&self) -> u32 {
        self.0.id()
    }

    fn blob_index(&self) -> u32 {
        self.0.blob_index()
    }

    fn compressed_offset(&self) -> u64 {
        self.0.compressed_offset()
    }

    fn compressed_size(&self) -> u32 {
        self.0.compressed_size()
    }

    fn uncompressed_offset(&self) -> u64 {
        self.0.uncompressed_offset()
    }

    fn uncompressed_size(&self) -> u32 {
        self.0.uncompressed_size()
    }

    fn is_batch(&self) -> bool {
        self.0.is_batch()
    }

    fn is_compressed(&self) -> bool {
        self.0.is_compressed()
    }

    fn is_encrypted(&self) -> bool {
        self.0.is_encrypted()
    }

    fn has_crc32(&self) -> bool {
        self.0.has_crc32()
    }

    fn crc32(&self) -> u32 {
        self.0.crc32()
    }

    // Note: returns the wrapper itself, not the inner object, so callers
    // downcast to `BlobIoChunk` rather than the wrapped concrete type.
    fn as_any(&self) -> &dyn Any {
        self
    }
}
794
/// Blob IO descriptor, containing information for a continuous IO range within a chunk.
#[derive(Clone)]
pub struct BlobIoDesc {
    /// The blob associated with the IO operation.
    pub blob: Arc<BlobInfo>,
    /// The chunk associated with the IO operation.
    pub chunkinfo: BlobIoChunk,
    /// Offset from start of the chunk for the IO operation.
    pub offset: u32,
    /// Size of the IO operation
    pub size: u32,
    /// Whether it's a user initiated IO, otherwise is a storage system internal IO.
    ///
    /// It might be initiated by user io amplification. With this flag, lower device
    /// layer may choose how to prioritize the IO operation.
    pub(crate) user_io: bool,
}
812
813impl BlobIoDesc {
814    /// Create a new blob IO descriptor.
815    pub fn new(
816        blob: Arc<BlobInfo>,
817        chunkinfo: BlobIoChunk,
818        offset: u32,
819        size: u32,
820        user_io: bool,
821    ) -> Self {
822        BlobIoDesc {
823            blob,
824            chunkinfo,
825            offset,
826            size,
827            user_io,
828        }
829    }
830
831    /// Check whether the `other` BlobIoDesc is continuous to current one.
832    pub fn is_continuous(&self, next: &BlobIoDesc, max_gap: u64) -> bool {
833        let prev_end = self.chunkinfo.compressed_offset() + self.chunkinfo.compressed_size() as u64;
834        let next_offset = next.chunkinfo.compressed_offset();
835
836        if self.chunkinfo.is_batch() || next.chunkinfo.is_batch() {
837            // Batch chunk can only be compared by uncompressed info.
838            return next.chunkinfo.uncompressed_offset() - self.chunkinfo.uncompressed_end()
839                <= max_gap;
840        }
841
842        if self.chunkinfo.blob_index() == next.chunkinfo.blob_index() && next_offset >= prev_end {
843            if next.blob.is_legacy_stargz() {
844                next_offset - prev_end <= max_gap * 8
845            } else {
846                next_offset - prev_end <= max_gap
847            }
848        } else {
849            false
850        }
851    }
852}
853
impl Debug for BlobIoDesc {
    // Compact debug view: identify the target chunk and IO range without
    // dumping the whole `BlobInfo`.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        f.debug_struct("BlobIoDesc")
            .field("blob_index", &self.blob.blob_index)
            .field("chunk_index", &self.chunkinfo.id())
            .field("compressed_offset", &self.chunkinfo.compressed_offset())
            .field("file_offset", &self.offset)
            .field("size", &self.size)
            .field("user", &self.user_io)
            .finish()
    }
}
866
/// Scatter/gather list for blob IO operation, containing zero or more blob IO descriptors
/// that all target the same blob.
pub struct BlobIoVec {
    /// The blob associated with the IO operation.
    bi_blob: Arc<BlobInfo>,
    /// Total size of blob IOs to be performed, i.e. the sum of `size` over `bi_vec`.
    bi_size: u64,
    /// Array of blob IOs, these IOs should be executed sequentially.
    pub(crate) bi_vec: Vec<BlobIoDesc>,
}
876
877impl BlobIoVec {
878    /// Create a new blob IO scatter/gather list object.
879    pub fn new(bi_blob: Arc<BlobInfo>) -> Self {
880        BlobIoVec {
881            bi_blob,
882            bi_size: 0,
883            bi_vec: Vec::with_capacity(128),
884        }
885    }
886
887    /// Add a new 'BlobIoDesc' to the 'BlobIoVec'.
888    pub fn push(&mut self, desc: BlobIoDesc) {
889        assert_eq!(self.bi_blob.blob_index(), desc.blob.blob_index());
890        assert_eq!(self.bi_blob.blob_id(), desc.blob.blob_id());
891        assert!(self.bi_size.checked_add(desc.size as u64).is_some());
892        self.bi_size += desc.size as u64;
893        self.bi_vec.push(desc);
894    }
895
896    /// Append another blob io vector to current one.
897    pub fn append(&mut self, mut vec: BlobIoVec) {
898        assert_eq!(self.bi_blob.blob_id(), vec.bi_blob.blob_id());
899        assert!(self.bi_size.checked_add(vec.bi_size).is_some());
900        self.bi_vec.append(vec.bi_vec.as_mut());
901        self.bi_size += vec.bi_size;
902    }
903
904    /// Reset the blob io vector.
905    pub fn reset(&mut self) {
906        self.bi_size = 0;
907        self.bi_vec.truncate(0);
908    }
909
910    /// Get number of 'BlobIoDesc' in the 'BlobIoVec'.
911    pub fn len(&self) -> usize {
912        self.bi_vec.len()
913    }
914
915    /// Check whether there's 'BlobIoDesc' in the 'BlobIoVec'.
916    pub fn is_empty(&self) -> bool {
917        self.bi_vec.is_empty()
918    }
919
920    /// Get size of pending IO data.
921    pub fn size(&self) -> u64 {
922        self.bi_size
923    }
924
925    /// Get an immutable reference to a `BlobIoDesc` entry.
926    pub fn blob_io_desc(&self, index: usize) -> Option<&BlobIoDesc> {
927        if index < self.bi_vec.len() {
928            Some(&self.bi_vec[index])
929        } else {
930            None
931        }
932    }
933
934    /// Get the target blob index of the blob io vector.
935    pub fn blob_index(&self) -> u32 {
936        self.bi_blob.blob_index()
937    }
938
939    /// Check whether the blob io vector is targeting the blob with `blob_index`
940    pub fn is_target_blob(&self, blob_index: u32) -> bool {
941        self.bi_blob.blob_index() == blob_index
942    }
943
944    /// Check whether two blob io vector targets the same blob.
945    pub fn has_same_blob(&self, desc: &BlobIoVec) -> bool {
946        self.bi_blob.blob_index() == desc.bi_blob.blob_index()
947    }
948}
949
950impl Debug for BlobIoVec {
951    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
952        f.debug_struct("BlobIoDesc")
953            .field("blob_index", &self.bi_blob.blob_index)
954            .field("size", &self.bi_size)
955            .field("decriptors", &self.bi_vec)
956            .finish()
957    }
958}
959
960/// Helper structure to merge blob IOs to reduce IO requests.
#[derive(Default)]
pub struct BlobIoMerge {
    // Pending IO vectors keyed by blob id; IOs targeting the same blob get coalesced.
    map: HashMap<String, BlobIoVec>,
    // Blob id of the most recently appended IO vector, used by `get_current_element()`.
    current: String,
}
966
967impl BlobIoMerge {
968    /// Append an `BlobIoVec` object to the merge state object.
969    pub fn append(&mut self, desc: BlobIoVec) {
970        if !desc.is_empty() {
971            let id = desc.bi_blob.blob_id.as_str();
972            if self.current != id {
973                self.current = id.to_string();
974            }
975            if let Some(prev) = self.map.get_mut(id) {
976                prev.append(desc);
977            } else {
978                self.map.insert(id.to_string(), desc);
979            }
980        }
981    }
982
983    /// Drain elements in the cache.
984    pub fn drain(&mut self) -> Drain<'_, String, BlobIoVec> {
985        self.map.drain()
986    }
987
988    /// Get current element.
989    pub fn get_current_element(&mut self) -> Option<&mut BlobIoVec> {
990        self.map.get_mut(&self.current)
991    }
992}
993
/// A segment representing a continuous range for a blob IO operation.
///
/// It can span multiple chunks while the `offset` is where the user io starts
/// within the first chunk and `len` is the total user io length of these chunks.
#[derive(Clone, Debug, Default)]
pub(crate) struct BlobIoSegment {
    /// Start position of the range within the chunk.
    /// Only the first chunk of the segment may have a non-zero offset; see `append()`.
    pub offset: u32,
    /// Size of the range within the chunk, accumulated across appended chunks.
    pub len: u32,
}
1005
1006impl BlobIoSegment {
1007    /// Create a new instance of `ChunkSegment`.
1008    pub fn new(offset: u32, len: u32) -> Self {
1009        Self { offset, len }
1010    }
1011
1012    #[inline]
1013    pub fn append(&mut self, offset: u32, len: u32) {
1014        assert!(offset.checked_add(len).is_some());
1015        assert_eq!(offset, 0);
1016
1017        self.len += len;
1018    }
1019
1020    pub fn is_empty(&self) -> bool {
1021        self.offset == 0 && self.len == 0
1022    }
1023}
1024
1025/// Struct to maintain information about blob IO operation.
#[derive(Clone, Debug)]
pub(crate) enum BlobIoTag {
    /// Io requests to fulfill user requests, carrying the user-visible segment
    /// (start offset and total length within the covered chunks).
    User(BlobIoSegment),
    /// Io requests to fulfill internal requirements.
    Internal,
}
1033
1034impl BlobIoTag {
1035    /// Check whether the tag is a user issued io request.
1036    pub fn is_user_io(&self) -> bool {
1037        matches!(self, BlobIoTag::User(_))
1038    }
1039}
1040
/// Struct to representing multiple continuous blob IO as one storage backend request.
///
/// For network based remote storage backend, such as Registry/OS, it may have limited IOPs
/// due to high request round-trip time, but have enough network bandwidth. In such cases,
/// it may help to improve performance by merging multiple continuous and small blob IO
/// requests into one big backend request.
///
/// A `BlobIoRange` request targets a continuous range of a single blob.
#[derive(Default, Clone)]
pub struct BlobIoRange {
    /// The blob targeted by every IO in this range.
    pub(crate) blob_info: Arc<BlobInfo>,
    /// Start of the range, as a compressed offset into the blob.
    pub(crate) blob_offset: u64,
    /// Total compressed size covered by the range, including any gaps absorbed
    /// between merged chunks (see `merge()`).
    pub(crate) blob_size: u64,
    /// Chunks covered by the range, in ascending compressed-offset order.
    pub(crate) chunks: Vec<Arc<dyn BlobChunkInfo>>,
    /// One tag per entry in `chunks`, recording whether the IO is user-issued.
    pub(crate) tags: Vec<BlobIoTag>,
}
1057
1058impl Debug for BlobIoRange {
1059    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
1060        f.debug_struct("BlobIoRange")
1061            .field("blob_id", &self.blob_info.blob_id())
1062            .field("blob_offset", &self.blob_offset)
1063            .field("blob_size", &self.blob_size)
1064            .field("tags", &self.tags)
1065            .finish()
1066    }
1067}
1068
impl BlobIoRange {
    /// Create a new instance of `BlobIoRange`.
    ///
    /// The range initially covers the compressed data of `bio`'s chunk; `capacity`
    /// pre-sizes the internal vectors for the expected number of merged chunks.
    pub fn new(bio: &BlobIoDesc, capacity: usize) -> Self {
        let blob_size = bio.chunkinfo.compressed_size() as u64;
        let blob_offset = bio.chunkinfo.compressed_offset();
        // Invariant: the chunk's compressed range must not overflow u64.
        assert!(blob_offset.checked_add(blob_size).is_some());

        let mut chunks = Vec::with_capacity(capacity);
        let mut tags = Vec::with_capacity(capacity);
        tags.push(Self::tag_from_desc(bio));
        chunks.push(bio.chunkinfo.0.clone());

        BlobIoRange {
            blob_info: bio.blob.clone(),
            blob_offset,
            blob_size,
            chunks,
            tags,
        }
    }

    /// Merge an `BlobIoDesc` into the `BlobIoRange` object.
    ///
    /// `bio` must start at or after the current end of the range (chunks are merged
    /// in ascending compressed-offset order); any gap between the current end and
    /// `bio`'s start is absorbed into `blob_size` so the range stays continuous.
    ///
    /// NOTE(review): `_max_gap` is unused here; gap acceptance appears to be decided
    /// by the caller (e.g. via `BlobIoDesc::is_continuous()`) — confirm before
    /// relying on this parameter.
    pub fn merge(&mut self, bio: &BlobIoDesc, _max_gap: u64) {
        let end = self.blob_offset + self.blob_size;
        let offset = bio.chunkinfo.compressed_offset();
        let size = bio.chunkinfo.compressed_size() as u64;
        let size = if end == offset {
            // Perfectly adjacent: extend by the chunk's compressed size.
            assert!(offset.checked_add(size).is_some());
            size
        } else {
            // Disjoint: `bio` must start strictly after the current end; count the
            // gap into the extension.
            // NOTE(review): `size + (offset - end)` is only overflow-checked by the
            // assert below, after the addition — confirm inputs keep it in range.
            assert!(offset > end);
            size + (offset - end)
        };
        assert!(end.checked_add(size).is_some());

        self.blob_size += size;
        self.tags.push(Self::tag_from_desc(bio));
        self.chunks.push(bio.chunkinfo.0.clone());
    }

    // Build a `BlobIoTag` from `bio`, preserving the user IO segment when present.
    fn tag_from_desc(bio: &BlobIoDesc) -> BlobIoTag {
        if bio.user_io {
            BlobIoTag::User(BlobIoSegment::new(bio.offset, bio.size as u32))
        } else {
            BlobIoTag::Internal
        }
    }
}
1117
/// Struct representing a blob data prefetching request.
///
/// It may help to improve performance for the storage backend to prefetch data in background.
/// A `BlobPrefetchControl` object advises to prefetch data range [offset, offset + len) from
/// blob `blob_id`. The prefetch operation should be asynchronous, and cache hit for filesystem
/// read operations should validate data integrity.
pub struct BlobPrefetchRequest {
    /// The ID of the blob to prefetch data for.
    pub blob_id: String,
    /// Offset into the blob to prefetch data.
    // NOTE(review): `BlobDevice::fetch_range_synchronous()` interprets this as an
    // uncompressed offset — confirm whether other consumers agree.
    pub offset: u64,
    /// Size of data to prefetch.
    pub len: u64,
}
1132
/// Trait to provide direct access to underlying uncompressed blob file.
///
/// The suggested flow to make use of an `BlobObject` is as below:
/// - call `is_all_data_ready()` to check all blob data has already been cached. If true, skip
///   next step.
/// - call `fetch()` to ensure blob range [offset, offset + size) has been cached.
/// - call `as_raw_fd()` to get the underlying file descriptor for direct access.
/// - call File::read(buf, offset + `base_offset()`, size) to read data from underlying cache file.
pub trait BlobObject: AsRawFd {
    /// Get base offset to read blob from the fd returned by `as_raw_fd()`.
    fn base_offset(&self) -> u64;

    /// Check whether all data of the blob object is ready.
    fn is_all_data_ready(&self) -> bool;

    /// Fetch data from storage backend covering compressed blob range [offset, offset + size).
    ///
    /// Used by asynchronous prefetch worker to implement blob prefetch.
    fn fetch_range_compressed(&self, offset: u64, size: u64, prefetch: bool) -> io::Result<()>;

    /// Fetch data from storage backend and make sure data range [offset, offset + size) is ready
    /// for use. The range here is in terms of the uncompressed blob.
    ///
    /// Used by rafs to support blobfs.
    fn fetch_range_uncompressed(&self, offset: u64, size: u64) -> io::Result<()>;

    /// Prefetch data for specified chunks from storage backend.
    ///
    /// Used by asynchronous prefetch worker to implement fs prefetch.
    fn prefetch_chunks(&self, range: &BlobIoRange) -> io::Result<()>;
}
1164
/// A wrapping object over an underlying [BlobCache] object.
///
/// All blob Io requests are actually served by the underlying [BlobCache] object. The wrapper
/// provides an interface to dynamically switch underlying [BlobCache] objects.
#[derive(Clone, Default)]
pub struct BlobDevice {
    /// Swappable array of per-blob cache objects; `ArcSwap` lets `update()` replace
    /// the whole set atomically while readers keep using the old snapshot.
    blobs: Arc<ArcSwap<Vec<Arc<dyn BlobCache>>>>,
    /// Number of blobs the device was created with; `update()` enforces that the
    /// replacement set has the same length.
    blob_count: usize,
}
1174
1175impl BlobDevice {
1176    /// Create new blob device instance.
1177    pub fn new(config: &Arc<ConfigV2>, blob_infos: &[Arc<BlobInfo>]) -> io::Result<BlobDevice> {
1178        let mut blobs = Vec::with_capacity(blob_infos.len());
1179        for blob_info in blob_infos.iter() {
1180            let blob = BLOB_FACTORY.new_blob_cache(config, blob_info)?;
1181            blobs.push(blob);
1182        }
1183
1184        Ok(BlobDevice {
1185            blobs: Arc::new(ArcSwap::new(Arc::new(blobs))),
1186            blob_count: blob_infos.len(),
1187        })
1188    }
1189
1190    /// Update configuration and storage backends of the blob device.
1191    ///
1192    /// The `update()` method switch a new storage backend object according to the configuration
1193    /// information passed in.
1194    pub fn update(
1195        &self,
1196        config: &Arc<ConfigV2>,
1197        blob_infos: &[Arc<BlobInfo>],
1198        fs_prefetch: bool,
1199    ) -> io::Result<()> {
1200        if self.blobs.load().len() != blob_infos.len() {
1201            return Err(einval!(
1202                "number of blobs doesn't match when update 'BlobDevice' object"
1203            ));
1204        }
1205
1206        let mut blobs = Vec::with_capacity(blob_infos.len());
1207        for blob_info in blob_infos.iter() {
1208            let blob = BLOB_FACTORY.new_blob_cache(config, blob_info)?;
1209            blobs.push(blob);
1210        }
1211
1212        if fs_prefetch {
1213            // Stop prefetch if it is running before swapping backend since prefetch threads cloned
1214            // Arc<BlobCache>, the swap operation can't drop inner object completely.
1215            // Otherwise prefetch threads will be leaked.
1216            self.stop_prefetch();
1217        }
1218        self.blobs.store(Arc::new(blobs));
1219        if fs_prefetch {
1220            self.start_prefetch();
1221        }
1222
1223        Ok(())
1224    }
1225
1226    /// Close the blob device.
1227    pub fn close(&self) -> io::Result<()> {
1228        Ok(())
1229    }
1230
1231    /// Check whether the `BlobDevice` has any blobs.
1232    pub fn has_device(&self) -> bool {
1233        self.blob_count > 0
1234    }
1235
1236    /// Read a range of data from a data blob into the provided writer
1237    pub fn read_to(&self, w: &mut dyn ZeroCopyWriter, desc: &mut BlobIoVec) -> io::Result<usize> {
1238        // Validate that:
1239        // - bi_vec[0] is valid
1240        // - bi_vec[0].blob.blob_index() is valid
1241        // - all IOs are against a single blob.
1242        if desc.bi_vec.is_empty() {
1243            if desc.bi_size == 0 {
1244                Ok(0)
1245            } else {
1246                Err(einval!("BlobIoVec size doesn't match."))
1247            }
1248        } else if desc.blob_index() as usize >= self.blob_count {
1249            Err(einval!("BlobIoVec has out of range blob_index."))
1250        } else {
1251            let size = desc.bi_size;
1252            let mut f = BlobDeviceIoVec::new(self, desc);
1253            // The `off` parameter to w.write_from() is actually ignored by
1254            // BlobV5IoVec::read_vectored_at_volatile()
1255            w.write_from(&mut f, size as usize, 0)
1256        }
1257    }
1258
1259    /// Try to prefetch specified blob data.
1260    pub fn prefetch(
1261        &self,
1262        io_vecs: &[&BlobIoVec],
1263        prefetches: &[BlobPrefetchRequest],
1264    ) -> io::Result<()> {
1265        for idx in 0..prefetches.len() {
1266            if let Some(blob) = self.get_blob_by_id(&prefetches[idx].blob_id) {
1267                let _ = blob.prefetch(blob.clone(), &prefetches[idx..idx + 1], &[]);
1268            }
1269        }
1270
1271        for io_vec in io_vecs.iter() {
1272            if let Some(blob) = self.get_blob_by_iovec(io_vec) {
1273                // Prefetch errors are ignored.
1274                let _ = blob
1275                    .prefetch(blob.clone(), &[], &io_vec.bi_vec)
1276                    .map_err(|e| {
1277                        error!("failed to prefetch blob data, {}", e);
1278                    });
1279            }
1280        }
1281
1282        Ok(())
1283    }
1284
1285    /// Start the background blob data prefetch task.
1286    pub fn start_prefetch(&self) {
1287        for blob in self.blobs.load().iter() {
1288            let _ = blob.start_prefetch();
1289        }
1290    }
1291
1292    /// Stop the background blob data prefetch task.
1293    pub fn stop_prefetch(&self) {
1294        for blob in self.blobs.load().iter() {
1295            let _ = blob.stop_prefetch();
1296        }
1297    }
1298
1299    /// fetch specified blob data in a synchronous way.
1300    pub fn fetch_range_synchronous(&self, prefetches: &[BlobPrefetchRequest]) -> io::Result<()> {
1301        for req in prefetches {
1302            if req.len == 0 {
1303                continue;
1304            }
1305            if let Some(cache) = self.get_blob_by_id(&req.blob_id) {
1306                trace!(
1307                    "fetch blob {} offset {} size {}",
1308                    req.blob_id,
1309                    req.offset,
1310                    req.len
1311                );
1312                if let Some(obj) = cache.get_blob_object() {
1313                    obj.fetch_range_uncompressed(req.offset as u64, req.len as u64)
1314                        .map_err(|e| {
1315                            warn!(
1316                                "Failed to prefetch data from blob {}, offset {}, size {}, {}",
1317                                cache.blob_id(),
1318                                req.offset,
1319                                req.len,
1320                                e
1321                            );
1322                            e
1323                        })?;
1324                } else {
1325                    error!("No support for fetching uncompressed blob data");
1326                    return Err(einval!("No support for fetching uncompressed blob data"));
1327                }
1328            }
1329        }
1330
1331        Ok(())
1332    }
1333
1334    /// Check all chunks related to the blob io vector are ready.
1335    pub fn all_chunks_ready(&self, io_vecs: &[BlobIoVec]) -> bool {
1336        for io_vec in io_vecs.iter() {
1337            if let Some(blob) = self.get_blob_by_iovec(io_vec) {
1338                let chunk_map = blob.get_chunk_map();
1339                for desc in io_vec.bi_vec.iter() {
1340                    if !chunk_map.is_ready(&desc.chunkinfo).unwrap_or(false) {
1341                        return false;
1342                    }
1343                }
1344            } else {
1345                return false;
1346            }
1347        }
1348
1349        true
1350    }
1351
1352    /// RAFS V6: create a `BlobIoChunk` for chunk with index `chunk_index`.
1353    pub fn create_io_chunk(&self, blob_index: u32, chunk_index: u32) -> Option<BlobIoChunk> {
1354        if (blob_index as usize) < self.blob_count {
1355            let state = self.blobs.load();
1356            let blob = &state[blob_index as usize];
1357            blob.get_chunk_info(chunk_index).map(|v| v.into())
1358        } else {
1359            None
1360        }
1361    }
1362
1363    /// RAFS V6: get chunk information object for chunks.
1364    pub fn get_chunk_info(
1365        &self,
1366        blob_index: u32,
1367        chunk_index: u32,
1368    ) -> Option<Arc<dyn BlobChunkInfo>> {
1369        if (blob_index as usize) < self.blob_count {
1370            let state = self.blobs.load();
1371            let blob = &state[blob_index as usize];
1372            blob.get_chunk_info(chunk_index)
1373        } else {
1374            None
1375        }
1376    }
1377
1378    fn get_blob_by_iovec(&self, iovec: &BlobIoVec) -> Option<Arc<dyn BlobCache>> {
1379        let blob_index = iovec.blob_index();
1380        if (blob_index as usize) < self.blob_count {
1381            return Some(self.blobs.load()[blob_index as usize].clone());
1382        }
1383
1384        None
1385    }
1386
1387    fn get_blob_by_id(&self, blob_id: &str) -> Option<Arc<dyn BlobCache>> {
1388        for blob in self.blobs.load().iter() {
1389            if blob.blob_id() == blob_id {
1390                return Some(blob.clone());
1391            }
1392        }
1393
1394        None
1395    }
1396}
1397
/// Struct to execute Io requests with a single blob.
///
/// It's used to support `BlobDevice::read_to()` and acts the main entrance to read chunk data
/// from data blobs.
struct BlobDeviceIoVec<'a> {
    /// The device holding the blob caches used to serve the IO vector.
    dev: &'a BlobDevice,
    /// The IO vector to execute; `BlobDevice::read_to()` guarantees all entries
    /// target a single blob.
    iovec: &'a mut BlobIoVec,
}
1406
1407impl<'a> BlobDeviceIoVec<'a> {
1408    fn new(dev: &'a BlobDevice, iovec: &'a mut BlobIoVec) -> Self {
1409        BlobDeviceIoVec { dev, iovec }
1410    }
1411}
1412
1413impl FileReadWriteVolatile for BlobDeviceIoVec<'_> {
1414    fn read_volatile(&mut self, _slice: FileVolatileSlice) -> Result<usize, Error> {
1415        // Skip because we don't really use it
1416        unimplemented!();
1417    }
1418
1419    fn write_volatile(&mut self, _slice: FileVolatileSlice) -> Result<usize, Error> {
1420        // Skip because we don't really use it
1421        unimplemented!();
1422    }
1423
1424    fn read_at_volatile(&mut self, slice: FileVolatileSlice, offset: u64) -> Result<usize, Error> {
1425        let buffers = [slice];
1426        self.read_vectored_at_volatile(&buffers, offset)
1427    }
1428
1429    // The default read_vectored_at_volatile only read to the first slice, so we have to overload it.
1430    fn read_vectored_at_volatile(
1431        &mut self,
1432        buffers: &[FileVolatileSlice],
1433        _offset: u64,
1434    ) -> Result<usize, Error> {
1435        // BlobDevice::read_to() has validated that all IOs are against a single blob.
1436        let index = self.iovec.blob_index();
1437        let blobs = &self.dev.blobs.load();
1438
1439        if (index as usize) < blobs.len() {
1440            blobs[index as usize].read(self.iovec, buffers)
1441        } else {
1442            let msg = format!(
1443                "failed to get blob object for BlobIoVec, index {}, blob array len: {}",
1444                index,
1445                blobs.len()
1446            );
1447            Err(einval!(msg))
1448        }
1449    }
1450
1451    fn write_at_volatile(
1452        &mut self,
1453        _slice: FileVolatileSlice,
1454        _offset: u64,
1455    ) -> Result<usize, Error> {
1456        unimplemented!()
1457    }
1458}
1459
/// Traits and Structs to support Rafs v5 image format.
///
/// The Rafs v5 image format is designed with fused filesystem metadata and blob management
/// metadata, which is simple to implement but also introduces inter-dependency between the
/// filesystem layer and the blob management layer. This circular dependency is hard to maintain
/// and extend. Newer Rafs image format adopts designs with independent blob management layer,
/// which could be easily used to support both fuse and virtio-fs. So Rafs v5 image specific
/// interfaces are isolated into a dedicated sub-module.
pub mod v5 {
    use super::*;

    /// Trait to provide extended information for a Rafs v5 chunk.
    ///
    /// Rafs filesystem stores filesystem metadata in a single metadata blob, and stores file
    /// content in zero or more data blobs, which are separated from the metadata blob.
    /// A `BlobV5ChunkInfo` object describes how a Rafs v5 chunk is located within a data blob.
    /// It is abstracted because Rafs have several ways to load metadata from metadata blob.
    pub trait BlobV5ChunkInfo: BlobChunkInfo {
        /// Get the chunk index in the Rafs v5 metadata's chunk info array.
        fn index(&self) -> u32;

        /// Get the file offset within the Rafs file it belongs to.
        fn file_offset(&self) -> u64;

        /// Get flags of the chunk.
        fn flags(&self) -> BlobChunkFlags;

        /// Cast to a base [BlobChunkInfo] trait object.
        fn as_base(&self) -> &dyn BlobChunkInfo;
    }
}
1491
#[cfg(test)]
mod tests {
    use std::path::PathBuf;

    use super::*;
    use crate::test::MockChunkInfo;

    // `BlobIoChunk` must transparently forward all accessors to the wrapped chunk.
    #[test]
    fn test_blob_io_chunk() {
        let chunk: Arc<dyn BlobChunkInfo> = Arc::new(MockChunkInfo {
            block_id: Default::default(),
            blob_index: 0,
            flags: Default::default(),
            compress_size: 0x100,
            uncompress_size: 0x200,
            compress_offset: 0x1000,
            uncompress_offset: 0x2000,
            file_offset: 0,
            index: 3,
            crc32: 0,
        });
        let iochunk: BlobIoChunk = chunk.clone().into();

        assert_eq!(iochunk.id(), 3);
        assert_eq!(iochunk.compressed_offset(), 0x1000);
        assert_eq!(iochunk.compressed_size(), 0x100);
        assert_eq!(iochunk.uncompressed_offset(), 0x2000);
        assert_eq!(iochunk.uncompressed_size(), 0x200);
        assert!(!iochunk.is_compressed());
    }

    // Exercise `BlobIoDesc::is_continuous()` with adjacent compressed ranges
    // (chunk1/chunk2) and with a 0x1000-byte gap after chunk1 / 0x800-byte gap
    // after chunk2 (chunk3).
    #[test]
    fn test_chunk_is_continuous() {
        let blob_info = Arc::new(BlobInfo::new(
            1,
            "test1".to_owned(),
            0x200000,
            0x100000,
            0x100000,
            512,
            BlobFeatures::_V5_NO_EXT_BLOB_TABLE,
        ));
        let chunk1 = Arc::new(MockChunkInfo {
            block_id: Default::default(),
            blob_index: 1,
            flags: BlobChunkFlags::empty(),
            compress_size: 0x800,
            uncompress_size: 0x1000,
            compress_offset: 0,
            uncompress_offset: 0,
            file_offset: 0,
            index: 0,
            crc32: 0,
        }) as Arc<dyn BlobChunkInfo>;
        let chunk2 = Arc::new(MockChunkInfo {
            block_id: Default::default(),
            blob_index: 1,
            flags: BlobChunkFlags::empty(),
            compress_size: 0x800,
            uncompress_size: 0x1000,
            compress_offset: 0x800,
            uncompress_offset: 0x1000,
            file_offset: 0x1000,
            index: 1,
            crc32: 0,
        }) as Arc<dyn BlobChunkInfo>;
        // NOTE(review): `index: 1` duplicates chunk2's index; `is_continuous()`
        // doesn't consult the index so the assertions still hold, but `2` was
        // probably intended — confirm.
        let chunk3 = Arc::new(MockChunkInfo {
            block_id: Default::default(),
            blob_index: 1,
            flags: BlobChunkFlags::empty(),
            compress_size: 0x800,
            uncompress_size: 0x1000,
            compress_offset: 0x1800,
            uncompress_offset: 0x3000,
            file_offset: 0x3000,
            index: 1,
            crc32: 0,
        }) as Arc<dyn BlobChunkInfo>;

        let desc1 = BlobIoDesc {
            blob: blob_info.clone(),
            chunkinfo: chunk1.into(),
            offset: 0,
            size: 0x1000,
            user_io: true,
        };
        let desc2 = BlobIoDesc {
            blob: blob_info.clone(),
            chunkinfo: chunk2.into(),
            offset: 0,
            size: 0x1000,
            user_io: true,
        };
        let desc3 = BlobIoDesc {
            blob: blob_info,
            chunkinfo: chunk3.into(),
            offset: 0,
            size: 0x1000,
            user_io: true,
        };

        // Adjacent chunks are continuous at any gap; reversed order never is.
        assert!(desc1.is_continuous(&desc2, 0x0));
        assert!(desc1.is_continuous(&desc2, 0x1000));
        assert!(!desc2.is_continuous(&desc1, 0x1000));
        assert!(!desc2.is_continuous(&desc1, 0x0));

        // desc1 -> desc3 leaves a 0x1000-byte gap, so only max_gap >= 0x1000 merges.
        assert!(!desc1.is_continuous(&desc3, 0x0));
        assert!(!desc1.is_continuous(&desc3, 0x400));
        assert!(!desc1.is_continuous(&desc3, 0x800));
        assert!(desc1.is_continuous(&desc3, 0x1000));

        // desc2 -> desc3 leaves a 0x800-byte gap, so max_gap >= 0x800 merges.
        assert!(!desc2.is_continuous(&desc3, 0x0));
        assert!(!desc2.is_continuous(&desc3, 0x400));
        assert!(desc2.is_continuous(&desc3, 0x800));
        assert!(desc2.is_continuous(&desc3, 0x1000));
    }

    // `BlobIoVec::append()` only checks the blob id, not the blob index, so two
    // vectors with the same id but different indexes can be merged.
    #[test]
    fn test_append_same_blob_with_diff_index() {
        let blob1 = Arc::new(BlobInfo::new(
            1,
            "test1".to_owned(),
            0x200000,
            0x100000,
            0x100000,
            512,
            BlobFeatures::_V5_NO_EXT_BLOB_TABLE,
        ));
        let chunk1 = Arc::new(MockChunkInfo {
            block_id: Default::default(),
            blob_index: 1,
            flags: BlobChunkFlags::empty(),
            compress_size: 0x800,
            uncompress_size: 0x1000,
            compress_offset: 0,
            uncompress_offset: 0,
            file_offset: 0,
            index: 0,
            crc32: 0,
        }) as Arc<dyn BlobChunkInfo>;
        let mut iovec = BlobIoVec::new(blob1.clone());
        iovec.push(BlobIoDesc::new(blob1, BlobIoChunk(chunk1), 0, 0x1000, true));

        let blob2 = Arc::new(BlobInfo::new(
            2,                  // different index
            "test1".to_owned(), // same id
            0x200000,
            0x100000,
            0x100000,
            512,
            BlobFeatures::_V5_NO_EXT_BLOB_TABLE,
        ));
        let chunk2 = Arc::new(MockChunkInfo {
            block_id: Default::default(),
            blob_index: 2,
            flags: BlobChunkFlags::empty(),
            compress_size: 0x800,
            uncompress_size: 0x1000,
            compress_offset: 0x800,
            uncompress_offset: 0x1000,
            file_offset: 0x1000,
            index: 1,
            crc32: 0,
        }) as Arc<dyn BlobChunkInfo>;
        let mut iovec2 = BlobIoVec::new(blob2.clone());
        iovec2.push(BlobIoDesc::new(blob2, BlobIoChunk(chunk2), 0, 0x1000, true));

        iovec.append(iovec2);
        assert_eq!(0x2000, iovec.bi_size);
    }

    // Accumulated IO size must not overflow when extending a vector that covers
    // a multi-gigabyte blob.
    #[test]
    fn test_extend_large_blob_io_vec() {
        let size = 0x2_0000_0000; // 8G blob
        let chunk_size = 0x10_0000; // 1M chunk
        let chunk_count = (size / chunk_size as u64) as u32;
        let large_blob = Arc::new(BlobInfo::new(
            0,
            "blob_id".to_owned(),
            size,
            size,
            chunk_size,
            chunk_count,
            BlobFeatures::default(),
        ));

        let mut iovec = BlobIoVec::new(large_blob.clone());
        let mut iovec2 = BlobIoVec::new(large_blob.clone());

        // Extend half of blob
        for chunk_idx in 0..chunk_count {
            let chunk = Arc::new(MockChunkInfo {
                block_id: Default::default(),
                blob_index: large_blob.blob_index,
                flags: BlobChunkFlags::empty(),
                compress_size: chunk_size,
                compress_offset: chunk_idx as u64 * chunk_size as u64,
                uncompress_size: 2 * chunk_size,
                uncompress_offset: 2 * chunk_idx as u64 * chunk_size as u64,
                file_offset: 2 * chunk_idx as u64 * chunk_size as u64,
                index: chunk_idx as u32,
                crc32: 0,
            }) as Arc<dyn BlobChunkInfo>;
            let desc = BlobIoDesc::new(large_blob.clone(), BlobIoChunk(chunk), 0, chunk_size, true);
            if chunk_idx < chunk_count / 2 {
                iovec.push(desc);
            } else {
                iovec2.push(desc)
            }
        }

        // Extend other half of blob
        iovec.append(iovec2);

        assert_eq!(size, iovec.size());
        assert_eq!(chunk_count, iovec.len() as u32);
    }

    // Setting the blob id from an inlined-meta blob path must expose the
    // original digest through `get_blob_meta_id()`.
    #[test]
    fn test_blob_info_blob_meta_id() {
        let blob_info = BlobInfo::new(
            1,
            "blob_id".to_owned(),
            0,
            0,
            0,
            1,
            BlobFeatures::SEPARATE | BlobFeatures::INLINED_FS_META,
        );

        let root_dir = &std::env::var("CARGO_MANIFEST_DIR").expect("$CARGO_MANIFEST_DIR");
        let mut source_path = PathBuf::from(root_dir);
        source_path.push("../tests/texture/blobs/be7d77eeb719f70884758d1aa800ed0fb09d701aaec469964e9d54325f0d5fef");

        assert!(blob_info
            .set_blob_id_from_meta_path(source_path.as_path())
            .is_ok());

        let id = blob_info.get_blob_meta_id();
        assert!(id.is_ok());
        assert_eq!(
            id.unwrap(),
            "be7d77eeb719f70884758d1aa800ed0fb09d701aaec469964e9d54325f0d5fef".to_owned()
        );
    }
}