Skip to main content

nydus_storage/cache/
mod.rs

1// Copyright 2020 Ant Group. All rights reserved.
2// Copyright (C) 2021 Alibaba Cloud. All rights reserved.
3//
4// SPDX-License-Identifier: Apache-2.0
5
6//! A blob cache layer over storage backend to improve performance.
7//!
8//! One of Rafs filesystem's goal is to support "on demand data loading". On demand loading may
9//! help to speed up application/container startup, but it may also cause serious performance
10//! penalty if all data chunks are retrieved from remoted backend storage. So cache layer is
11//! introduced between Rafs filesystem and backend storage, which caches remote data onto local
12//! storage and merge small data request into bigger request to improve network performance.
13//!
14//! There are several cache drivers implemented:
15//! - [DummyCacheMgr](dummycache/struct.DummyCacheMgr.html): a dummy implementation of
16//!   `BlobCacheMgr`, simply reporting each chunk as cached or not cached according to
17//!   configuration.
18
19use std::cmp;
20use std::io::Result;
21use std::sync::Arc;
22use std::time::Instant;
23
24use fuse_backend_rs::file_buf::FileVolatileSlice;
25use nydus_utils::compress::zlib_random::ZranDecoder;
26use nydus_utils::crypt::{self, Cipher, CipherContext};
27use nydus_utils::{compress, digest};
28
29use crate::backend::{BlobBackend, BlobReader};
30use crate::cache::state::ChunkMap;
31use crate::device::{
32    BlobChunkInfo, BlobInfo, BlobIoDesc, BlobIoRange, BlobIoVec, BlobObject, BlobPrefetchRequest,
33};
34use crate::meta::BlobCompressionContextInfo;
35use crate::utils::{alloc_buf, check_crc, check_hash};
36use crate::{StorageResult, RAFS_MAX_CHUNK_SIZE};
37
38mod cachedfile;
39#[cfg(feature = "dedup")]
40mod dedup;
41mod dummycache;
42mod filecache;
43#[cfg(target_os = "linux")]
44mod fscache;
45mod worker;
46
47pub mod state;
48
49pub use dummycache::DummyCacheMgr;
50pub use filecache::FileCacheMgr;
51#[cfg(target_os = "linux")]
52pub use fscache::FsCacheMgr;
53
/// Timeout in milli-seconds to retrieve blob data from backend storage.
///
/// NOTE(review): the name suggests this bounds how long a request waits on another
/// in-flight request fetching the same data — confirm against the worker/cache users.
pub const SINGLE_INFLIGHT_WAIT_TIMEOUT: u64 = 2000;
56
/// Helper to accumulate adjacent blob IO descriptors and merge them into bigger
/// requests before issuing them, to reduce the number of backend storage requests.
struct BlobIoMergeState<'a, F: FnMut(BlobIoRange)> {
    // Callback invoked with each merged `BlobIoRange` when pending IOs get issued.
    cb: F,
    // size of compressed data, including gaps between adjacent pending chunks
    size: u32,
    // Pending IO descriptors waiting to be merged and issued.
    bios: Vec<&'a BlobIoDesc>,
}
63
64impl<'a, F: FnMut(BlobIoRange)> BlobIoMergeState<'a, F> {
65    /// Create a new instance of 'IoMergeState`.
66    pub fn new(bio: &'a BlobIoDesc, cb: F) -> Self {
67        let size = bio.chunkinfo.compressed_size();
68
69        BlobIoMergeState {
70            cb,
71            size,
72            bios: vec![bio],
73        }
74    }
75
76    /// Get size of pending compressed data.
77    #[inline]
78    fn size(&self) -> usize {
79        self.size as usize
80    }
81
82    /// Push a new io descriptor into the pending list.
83    #[inline]
84    fn push(&mut self, bio: &'a BlobIoDesc) {
85        let start = bio.chunkinfo.compressed_offset();
86        let size = if !self.bios.is_empty() {
87            let last = &self.bios[self.bios.len() - 1].chunkinfo;
88            let prev = last.compressed_offset() + last.compressed_size() as u64;
89            assert!(prev <= start);
90            assert!(start - prev < u32::MAX as u64);
91            (start - prev) as u32 + bio.chunkinfo.compressed_size()
92        } else {
93            bio.chunkinfo.compressed_size()
94        };
95        assert!(self.size.checked_add(size).is_some());
96        self.size += size;
97        self.bios.push(bio);
98    }
99
100    /// Issue all pending io descriptors.
101    #[inline]
102    pub fn issue(&mut self, max_gap: u64) {
103        if !self.bios.is_empty() {
104            let mut mr = BlobIoRange::new(self.bios[0], self.bios.len());
105            for bio in self.bios[1..].iter() {
106                mr.merge(bio, max_gap);
107            }
108            (self.cb)(mr);
109
110            self.bios.truncate(0);
111            self.size = 0;
112        }
113    }
114
115    /// Merge adjacent chunks into bigger request with compressed size no bigger than `max_size`
116    /// and issue all blob IO descriptors.
117    pub fn merge_and_issue(bios: &[BlobIoDesc], max_comp_size: u64, max_gap: u64, op: F) {
118        if !bios.is_empty() {
119            let mut index = 1;
120            let mut state = BlobIoMergeState::new(&bios[0], op);
121
122            for cur_bio in &bios[1..] {
123                // Issue pending descriptors when next chunk is not continuous with current chunk
124                // or the accumulated compressed data size is big enough.
125                if !bios[index - 1].is_continuous(cur_bio, max_gap)
126                    || state.size() as u64 >= max_comp_size
127                {
128                    state.issue(max_gap);
129                }
130                state.push(cur_bio);
131                index += 1
132            }
133            state.issue(max_gap);
134        }
135    }
136}
137
138/// Trait representing a cache object for a blob on backend storage.
139///
140/// The caller may use the `BlobCache` trait to access blob data on backend storage, with an
141/// optional intermediate cache layer to improve performance.
/// Trait representing a cache object for a blob on backend storage.
///
/// The caller may use the `BlobCache` trait to access blob data on backend storage, with an
/// optional intermediate cache layer to improve performance.
pub trait BlobCache: Send + Sync {
    /// Get id of the blob object.
    fn blob_id(&self) -> &str;

    /// Get size of the decompressed blob object.
    fn blob_uncompressed_size(&self) -> Result<u64>;

    /// Get size of the compressed blob object.
    fn blob_compressed_size(&self) -> Result<u64>;

    /// Get data compression algorithm to handle chunks in the blob.
    fn blob_compressor(&self) -> compress::Algorithm;

    /// Get data encryption algorithm to handle chunks in the blob.
    fn blob_cipher(&self) -> crypt::Algorithm;

    /// Cipher object to encrypt/decrypt chunk data.
    fn blob_cipher_object(&self) -> Arc<Cipher>;

    /// Cipher context to encrypt/decrypt chunk data.
    fn blob_cipher_context(&self) -> Option<CipherContext>;

    /// Get message digest algorithm to handle chunks in the blob.
    fn blob_digester(&self) -> digest::Algorithm;

    /// Check whether the cache object is for a stargz image with legacy chunk format.
    fn is_legacy_stargz(&self) -> bool;

    /// Get maximum size of gzip compressed data.
    ///
    /// Legacy stargz blobs don't record an exact compressed chunk size, so an upper bound
    /// is estimated from the uncompressed size, clamped by the data remaining in the blob
    /// after `offset`.
    fn get_legacy_stargz_size(&self, offset: u64, uncomp_size: usize) -> Result<usize> {
        let blob_size = self.blob_compressed_size()?;
        let max_size = blob_size.checked_sub(offset).ok_or_else(|| {
            einval!(format!(
                "chunk compressed offset {:x} is bigger than blob file size {:x}",
                offset, blob_size
            ))
        })?;
        // Clamp to usize range before converting (relevant on 32-bit targets).
        let max_size = cmp::min(max_size, usize::MAX as u64) as usize;
        Ok(compress::compute_compressed_gzip_size(
            uncomp_size,
            max_size,
        ))
    }

    /// Check whether the blob is ZRan based.
    fn is_zran(&self) -> bool {
        false
    }

    /// Check whether the blob is Batch based.
    fn is_batch(&self) -> bool {
        false
    }

    /// Check whether need to validate the data chunk by digest value.
    fn need_validation(&self) -> bool;

    /// Get the [BlobReader](../backend/trait.BlobReader.html) to read data from storage backend.
    fn reader(&self) -> &dyn BlobReader;

    /// Get the underlying `ChunkMap` object.
    fn get_chunk_map(&self) -> &Arc<dyn ChunkMap>;

    /// Get the `BlobChunkInfo` object corresponding to `chunk_index`.
    fn get_chunk_info(&self, chunk_index: u32) -> Option<Arc<dyn BlobChunkInfo>>;

    /// Get a `BlobObject` instance to directly access uncompressed blob file.
    fn get_blob_object(&self) -> Option<&dyn BlobObject> {
        None
    }

    /// Enable prefetching blob data in background.
    ///
    /// It should be paired with stop_prefetch().
    fn start_prefetch(&self) -> StorageResult<()>;

    /// Stop prefetching blob data in background.
    ///
    /// It should be paired with start_prefetch().
    fn stop_prefetch(&self) -> StorageResult<()>;

    /// Check whether data prefetch is still active.
    fn is_prefetch_active(&self) -> bool;

    /// Start to prefetch requested data in background.
    fn prefetch(
        &self,
        cache: Arc<dyn BlobCache>,
        prefetches: &[BlobPrefetchRequest],
        bios: &[BlobIoDesc],
    ) -> StorageResult<usize>;

    /// Execute filesystem data prefetch.
    fn prefetch_range(&self, _range: &BlobIoRange) -> Result<usize> {
        Err(enosys!("doesn't support prefetch_range()"))
    }

    /// Read chunk data described by the blob Io descriptors from the blob cache into the buffer.
    fn read(&self, iovec: &mut BlobIoVec, buffers: &[FileVolatileSlice]) -> Result<usize>;

    /// Read multiple chunks from the blob cache in batch mode.
    ///
    /// This is an interface to optimize chunk data fetch performance by merging multiple continuous
    /// chunks into one backend request. Callers must ensure that chunks in `chunks` covers a
    /// continuous range, and the range exactly matches [`blob_offset`..`blob_offset` + `blob_size`].
    /// Function `read_chunks_from_backend()` returns one buffer containing decompressed chunk data
    /// for each entry in the `chunks` array in corresponding order.
    ///
    /// This method returns success only if all requested data are successfully fetched.
    fn read_chunks_from_backend<'a, 'b>(
        &'a self,
        blob_offset: u64,
        blob_size: usize,
        chunks: &'b [Arc<dyn BlobChunkInfo>],
        prefetch: bool,
    ) -> Result<ChunkDecompressState<'a, 'b>>
    where
        Self: Sized,
    {
        // Read requested data from the backend by altogether.
        let mut c_buf = alloc_buf(blob_size);
        let start = Instant::now();
        let nr_read = self
            .reader()
            .read(c_buf.as_mut_slice(), blob_offset)
            .map_err(|e| eio!(e))?;
        // A short read is an error: callers expect the whole merged range.
        if nr_read != blob_size {
            return Err(eio!(format!(
                "request for {} bytes but got {} bytes",
                blob_size, nr_read
            )));
        }
        let duration = Instant::now().duration_since(start).as_millis();
        debug!(
            "read_chunks_from_backend: {} {} {} bytes at {}, duration {}ms",
            std::thread::current().name().unwrap_or_default(),
            if prefetch { "prefetch" } else { "fetch" },
            blob_size,
            blob_offset,
            duration
        );

        // Decompression/decryption is done lazily, chunk by chunk, as the returned
        // `ChunkDecompressState` iterator is consumed.
        let chunks = chunks.iter().map(|v| v.as_ref()).collect();
        Ok(ChunkDecompressState::new(blob_offset, self, chunks, c_buf))
    }

    /// Read a whole chunk directly from the storage backend.
    ///
    /// The fetched chunk data may be compressed or encrypted or not, which depends on chunk information
    /// from `chunk`. Moreover, chunk data from backend storage may be validated per user's configuration.
    ///
    /// Returns the raw (still compressed/encrypted) backend buffer when one was allocated,
    /// so callers may reuse it; `buffer` always receives the decompressed chunk data.
    fn read_chunk_from_backend(
        &self,
        chunk: &dyn BlobChunkInfo,
        buffer: &mut [u8],
    ) -> Result<Option<Vec<u8>>> {
        let start = Instant::now();
        let offset = chunk.compressed_offset();
        let mut c_buf = None;

        if self.is_zran() || self.is_batch() {
            // ZRan/Batch chunks can't be fetched individually; use read_chunks_from_backend().
            return Err(enosys!("read_chunk_from_backend"));
        } else if !chunk.is_compressed() && !chunk.is_encrypted() {
            // Plain chunk: read straight into the caller's buffer, no staging copy needed.
            let size = self.reader().read(buffer, offset).map_err(|e| eio!(e))?;
            if size != buffer.len() {
                return Err(eio!("storage backend returns less data than requested"));
            }
        } else {
            // Legacy stargz doesn't record the compressed size, so estimate it.
            let c_size = if self.is_legacy_stargz() {
                self.get_legacy_stargz_size(offset, buffer.len())?
            } else {
                chunk.compressed_size() as usize
            };
            let mut raw_buffer = alloc_buf(c_size);
            let size = self
                .reader()
                .read(raw_buffer.as_mut_slice(), offset)
                .map_err(|e| eio!(e))?;
            if size != raw_buffer.len() {
                return Err(eio!("storage backend returns less data than requested"));
            }
            // Decrypt first (no-op if the chunk isn't encrypted), then decompress.
            let decrypted_buffer = crypt::decrypt_with_context(
                &raw_buffer,
                &self.blob_cipher_object(),
                &self.blob_cipher_context(),
                chunk.is_encrypted(),
            )?;
            self.decompress_chunk_data(&decrypted_buffer, buffer, chunk.is_compressed())?;
            c_buf = Some(raw_buffer);
        }

        let duration = Instant::now().duration_since(start).as_millis();
        debug!(
            "read_chunk_from_backend: {} {} bytes at {}, duration {}ms",
            std::thread::current().name().unwrap_or_default(),
            chunk.compressed_size(),
            chunk.compressed_offset(),
            duration
        );
        self.validate_chunk_data(chunk, buffer, false)
            .map_err(|e| {
                warn!("failed to read data from backend, {}", e);
                e
            })?;

        Ok(c_buf)
    }

    /// Decompress chunk data.
    ///
    /// When `is_compressed` is false this degrades to a plain copy (skipped when the
    /// source and destination are the same buffer).
    fn decompress_chunk_data(
        &self,
        raw_buffer: &[u8],
        buffer: &mut [u8],
        is_compressed: bool,
    ) -> Result<()> {
        if is_compressed {
            let compressor = self.blob_compressor();
            let ret = compress::decompress(raw_buffer, buffer, compressor).map_err(|e| {
                error!("failed to decompress chunk: {}", e);
                e
            })?;
            if ret != buffer.len() {
                return Err(einval!(format!(
                    "size of decompressed data doesn't match expected, {} vs {}, raw_buffer: {}",
                    ret,
                    buffer.len(),
                    raw_buffer.len()
                )));
            }
        } else if raw_buffer.as_ptr() != buffer.as_ptr() {
            // raw_chunk and chunk may point to the same buffer, so only copy data when needed.
            buffer.copy_from_slice(raw_buffer);
        }
        Ok(())
    }

    /// Validate chunk data.
    ///
    /// Validation runs when globally enabled, forced by the caller, or when the chunk
    /// carries a CRC32 — but never for legacy stargz blobs.
    fn validate_chunk_data(
        &self,
        chunk: &dyn BlobChunkInfo,
        buffer: &[u8],
        force_validation: bool,
    ) -> Result<usize> {
        let d_size = chunk.uncompressed_size() as usize;
        if buffer.len() != d_size {
            Err(eio!("uncompressed size and buffer size doesn't match"))
        } else if (self.need_validation() || chunk.has_crc32() || force_validation)
            && !self.is_legacy_stargz()
            && !self.check_digest(chunk, buffer)
        {
            Err(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                "data digest value doesn't match",
            ))
        } else {
            Ok(d_size)
        }
    }

    /// Verify chunk data integrity, preferring CRC32 when the chunk carries one,
    /// otherwise checking the chunk id against the configured message digest.
    fn check_digest(&self, chunk: &dyn BlobChunkInfo, buffer: &[u8]) -> bool {
        if chunk.has_crc32() {
            check_crc(buffer, chunk.crc32())
        } else {
            check_hash(buffer, chunk.chunk_id(), self.blob_digester())
        }
    }

    /// Get the blob compression context information, needed by Batch/ZRan decompression.
    ///
    /// The default implementation returns `None`; cache implementations with blob meta
    /// support are expected to override it.
    fn get_blob_meta_info(&self) -> Result<Option<Arc<BlobCompressionContextInfo>>> {
        Ok(None)
    }
}
412
413/// An iterator to enumerate decompressed data for chunks.
/// An iterator to enumerate decompressed data for chunks.
pub struct ChunkDecompressState<'a, 'b> {
    // Offset in the blob where `c_buf` was read from.
    blob_offset: u64,
    // Index of the next chunk to yield from `chunks`.
    chunk_idx: usize,
    // Index of the Batch context currently decompressed into `d_buf`;
    // u32::MAX means none yet.
    batch_idx: u32,
    // Index of the ZRan context currently decompressed into `d_buf`;
    // u32::MAX means none yet.
    zran_idx: u32,
    // Cache object providing compressor/cipher/meta for decoding.
    cache: &'a dyn BlobCache,
    // Chunks to decode, in the order their data appears in `c_buf`.
    chunks: Vec<&'b dyn BlobChunkInfo>,
    // Raw (compressed/encrypted) data read from the backend.
    c_buf: Vec<u8>,
    // Decompressed data of the current Batch/ZRan context.
    d_buf: Vec<u8>,
}
424
impl<'a, 'b> ChunkDecompressState<'a, 'b> {
    /// Create a decompression state over `c_buf`, the raw backend data read starting at
    /// `blob_offset`, covering all entries of `chunks`.
    fn new(
        blob_offset: u64,
        cache: &'a dyn BlobCache,
        chunks: Vec<&'b dyn BlobChunkInfo>,
        c_buf: Vec<u8>,
    ) -> Self {
        ChunkDecompressState {
            blob_offset,
            chunk_idx: 0,
            // u32::MAX is a sentinel for "no context decoded yet", so the first
            // Batch/ZRan chunk always triggers a decode (assumes u32::MAX is never
            // a valid context index).
            batch_idx: u32::MAX,
            zran_idx: u32::MAX,
            cache,
            chunks,
            c_buf,
            d_buf: Vec::new(),
        }
    }

    /// Decrypt and decompress the whole batch `self.batch_idx` into `self.d_buf`.
    ///
    /// `c_offset` is the absolute compressed offset of the batch within the blob; it must
    /// fall inside the range covered by `c_buf`.
    fn decompress_batch(
        &mut self,
        meta: &Arc<BlobCompressionContextInfo>,
        c_offset: u64,
    ) -> Result<()> {
        let ctx = meta.get_batch_context(self.batch_idx)?;
        let c_size = ctx.compressed_size() as u64;
        let d_size = ctx.uncompressed_batch_size() as u64;
        // Reject batches outside the fetched range or with an implausible output size.
        if c_offset < self.blob_offset
            || c_offset.checked_add(c_size).is_none()
            || c_offset + c_size > self.blob_offset + self.c_buf.len() as u64
            || d_size > RAFS_MAX_CHUNK_SIZE
        {
            let msg = format!(
                "invalid chunk: z_offset 0x{:x}, z_size 0x{:x}, c_offset 0x{:x}, c_size 0x{:x}, d_size 0x{:x}",
                self.blob_offset,
                self.c_buf.len(),
                c_offset,
                c_size,
                d_size
            );
            return Err(einval!(msg));
        }

        // Rebase to an index into `c_buf`.
        let c_offset = (c_offset - self.blob_offset) as usize;
        let input = &self.c_buf[c_offset..c_offset + c_size as usize];
        let decrypted_buffer = crypt::decrypt_with_context(
            input,
            &self.cache.blob_cipher_object(),
            &self.cache.blob_cipher_context(),
            meta.state.is_encrypted(),
        )?;
        let mut output = alloc_buf(d_size as usize);

        // A batch whose compressed size equals its uncompressed size is stored
        // uncompressed, hence the `c_size != d_size` flag.
        self.cache
            .decompress_chunk_data(&decrypted_buffer, &mut output, c_size != d_size)?;

        if output.len() != d_size as usize {
            return Err(einval!(format!(
                "decompressed data size doesn't match: {} vs {}",
                output.len(),
                d_size
            )));
        }

        self.d_buf = output;

        Ok(())
    }

    /// Decompress the whole ZRan context `self.zran_idx` into `self.d_buf`, using the
    /// context's inflate dictionary.
    fn decompress_zran(&mut self, meta: &Arc<BlobCompressionContextInfo>) -> Result<()> {
        let (ctx, dict) = meta.get_zran_context(self.zran_idx)?;
        let c_offset = ctx.in_offset;
        let c_size = ctx.in_len as u64;
        // Reject contexts outside the fetched range or with an implausible output size.
        if c_offset < self.blob_offset
            || c_offset.checked_add(c_size).is_none()
            || c_offset + c_size > self.blob_offset + self.c_buf.len() as u64
            || ctx.out_len as u64 > RAFS_MAX_CHUNK_SIZE
        {
            let msg = format!(
                "invalid chunk: z_offset 0x{:x}, z_size 0x{:x}, c_offset 0x{:x}, c_size 0x{:x}, d_size 0x{:x}",
                self.blob_offset,
                self.c_buf.len(),
                c_offset,
                c_size,
                ctx.out_len
            );
            return Err(einval!(msg));
        }

        // Rebase to an index into `c_buf`.
        let c_offset = (c_offset - self.blob_offset) as usize;
        let input = &self.c_buf[c_offset..c_offset + c_size as usize];
        let mut output = alloc_buf(ctx.out_len as usize);
        let mut decoder = ZranDecoder::new()?;
        decoder.uncompress(&ctx, Some(dict), input, &mut output)?;
        self.d_buf = output;

        Ok(())
    }

    /// Yield decompressed data for `chunk` from a Batch blob, decoding the enclosing
    /// batch into `d_buf` first when it differs from the currently cached one.
    fn next_batch(&mut self, chunk: &dyn BlobChunkInfo) -> Result<Vec<u8>> {
        // If the chunk is not a batch chunk, decompress it as normal.
        if !chunk.is_batch() {
            return self.next_buf(chunk);
        }

        let meta = self
            .cache
            .get_blob_meta_info()?
            .ok_or_else(|| einval!("failed to get blob meta object for Batch"))?;

        // Decode the batch only when it differs from the cached one; consecutive chunks
        // of the same batch reuse `d_buf`.
        let batch_idx = meta.get_batch_index(chunk.id())?;
        if batch_idx != self.batch_idx {
            self.batch_idx = batch_idx;
            self.decompress_batch(&meta, chunk.compressed_offset())?;
        }
        let offset = meta.get_uncompressed_offset_in_batch_buf(chunk.id())? as usize;
        let end = offset + chunk.uncompressed_size() as usize;
        if end > self.d_buf.len() {
            return Err(einval!(format!(
                "invalid Batch decompression status, end: {}, len: {}",
                end,
                self.d_buf.len()
            )));
        }

        // Use alloc_buf here to ensure 4k alignment for later use
        // in adjust_buffer_for_dio.
        let mut buffer = alloc_buf(chunk.uncompressed_size() as usize);
        buffer.copy_from_slice(&self.d_buf[offset as usize..end]);
        Ok(buffer)
    }

    /// Yield decompressed data for `chunk` from a ZRan blob, decoding the enclosing
    /// ZRan context into `d_buf` first when it differs from the currently cached one.
    fn next_zran(&mut self, chunk: &dyn BlobChunkInfo) -> Result<Vec<u8>> {
        let meta = self
            .cache
            .get_blob_meta_info()?
            .ok_or_else(|| einval!("failed to get blob meta object for ZRan"))?;
        // Decode the context only when it differs from the cached one.
        let zran_idx = meta.get_zran_index(chunk.id())?;
        if zran_idx != self.zran_idx {
            self.zran_idx = zran_idx;
            self.decompress_zran(&meta)?;
        }
        let offset = meta.get_zran_offset(chunk.id())? as usize;
        let end = offset + chunk.uncompressed_size() as usize;
        if end > self.d_buf.len() {
            return Err(einval!("invalid ZRan decompression status"));
        }
        // Use alloc_buf here to ensure 4k alignment for later use
        // in adjust_buffer_for_dio.
        let mut buffer = alloc_buf(chunk.uncompressed_size() as usize);
        buffer.copy_from_slice(&self.d_buf[offset as usize..end]);
        Ok(buffer)
    }

    /// Yield decompressed data for a plain (non-Batch, non-ZRan) chunk: slice its
    /// compressed bytes out of `c_buf`, decrypt, decompress, and validate.
    fn next_buf(&mut self, chunk: &dyn BlobChunkInfo) -> Result<Vec<u8>> {
        let c_offset = chunk.compressed_offset();
        let c_size = chunk.compressed_size();
        let d_size = chunk.uncompressed_size() as usize;
        // Reject chunks outside the fetched range or with an implausible output size.
        if c_offset < self.blob_offset
            || c_offset - self.blob_offset > usize::MAX as u64
            || c_offset.checked_add(c_size as u64).is_none()
            || c_offset + c_size as u64 > self.blob_offset + self.c_buf.len() as u64
            || d_size as u64 > RAFS_MAX_CHUNK_SIZE
        {
            let msg = format!(
                "invalid chunk info: c_offset 0x{:x}, c_size 0x{:x}, d_size 0x{:x}, blob_offset 0x{:x}",
                c_offset, c_size, d_size, self.blob_offset
            );
            return Err(eio!(msg));
        }

        let offset_merged = (c_offset - self.blob_offset) as usize;
        let end_merged = offset_merged + c_size as usize;
        let decrypted_buffer = crypt::decrypt_with_context(
            &self.c_buf[offset_merged..end_merged],
            &self.cache.blob_cipher_object(),
            &self.cache.blob_cipher_context(),
            chunk.is_encrypted(),
        )?;
        let mut buffer = alloc_buf(d_size);
        self.cache
            .decompress_chunk_data(&decrypted_buffer, &mut buffer, chunk.is_compressed())?;
        self.cache
            .validate_chunk_data(chunk, &buffer, false)
            .map_err(|e| {
                warn!("failed to read data from backend, {}", e);
                e
            })?;
        Ok(buffer)
    }

    /// Get an immutable reference to the compressed data buffer.
    pub fn compressed_buf(&self) -> &[u8] {
        &self.c_buf
    }
}
621
622impl Iterator for ChunkDecompressState<'_, '_> {
623    type Item = Result<Vec<u8>>;
624
625    fn next(&mut self) -> Option<Self::Item> {
626        if self.chunk_idx >= self.chunks.len() {
627            return None;
628        }
629
630        let cache = self.cache;
631        let chunk = self.chunks[self.chunk_idx];
632        self.chunk_idx += 1;
633        let res = if cache.is_batch() {
634            self.next_batch(chunk)
635        } else if cache.is_zran() {
636            self.next_zran(chunk)
637        } else {
638            self.next_buf(chunk)
639        };
640        Some(res)
641    }
642}
643
644/// Trait representing blob manager to manage a group of [BlobCache](trait.BlobCache.html) objects.
645///
646/// The main responsibility of the blob cache manager is to create blob cache objects for blobs,
647/// all IO requests should be issued to the blob cache object directly.
pub(crate) trait BlobCacheMgr: Send + Sync {
    /// Initialize the blob cache manager.
    fn init(&self) -> Result<()>;

    /// Tear down the blob cache manager.
    fn destroy(&self);

    /// Garbage-collect unused resources.
    ///
    /// Return true if the blob cache manager itself should be garbage-collected.
    fn gc(&self, _id: Option<&str>) -> bool;

    /// Get the underlying `BlobBackend` object of the blob cache object.
    fn backend(&self) -> &dyn BlobBackend;

    /// Get the blob cache to provide access to the `blob` object.
    fn get_blob_cache(&self, blob_info: &Arc<BlobInfo>) -> Result<Arc<dyn BlobCache>>;

    /// Check the blob cache data status, and stop prefetch workers if all data is
    /// already cached.
    fn check_stat(&self);
}
669
670#[cfg(feature = "dedup")]
671pub use dedup::CasMgr;
672
/// Placeholder for the chunk deduplication manager, used when the `dedup` feature is
/// disabled so that code referring to `CasMgr` by name still compiles.
#[cfg(not(feature = "dedup"))]
pub struct CasMgr {}
675
676#[cfg(test)]
677mod tests {
678    use crate::device::{BlobChunkFlags, BlobFeatures};
679    use crate::test::MockChunkInfo;
680
681    use super::*;
682
683    #[test]
684    fn test_io_merge_state_new() {
685        let blob_info = Arc::new(BlobInfo::new(
686            1,
687            "test1".to_owned(),
688            0x200000,
689            0x100000,
690            0x100000,
691            512,
692            BlobFeatures::_V5_NO_EXT_BLOB_TABLE,
693        ));
694        let chunk1 = Arc::new(MockChunkInfo {
695            block_id: Default::default(),
696            blob_index: 1,
697            flags: BlobChunkFlags::empty(),
698            compress_size: 0x800,
699            uncompress_size: 0x1000,
700            compress_offset: 0,
701            uncompress_offset: 0,
702            file_offset: 0,
703            index: 0,
704            crc32: 0,
705        }) as Arc<dyn BlobChunkInfo>;
706        let chunk2 = Arc::new(MockChunkInfo {
707            block_id: Default::default(),
708            blob_index: 1,
709            flags: BlobChunkFlags::empty(),
710            compress_size: 0x800,
711            uncompress_size: 0x1000,
712            compress_offset: 0x800,
713            uncompress_offset: 0x1000,
714            file_offset: 0x1000,
715            index: 1,
716            crc32: 0,
717        }) as Arc<dyn BlobChunkInfo>;
718        let chunk3 = Arc::new(MockChunkInfo {
719            block_id: Default::default(),
720            blob_index: 1,
721            flags: BlobChunkFlags::empty(),
722            compress_size: 0x800,
723            uncompress_size: 0x1000,
724            compress_offset: 0x1000,
725            uncompress_offset: 0x1000,
726            file_offset: 0x1000,
727            index: 1,
728            crc32: 0,
729        }) as Arc<dyn BlobChunkInfo>;
730
731        let cb = |_merged| {};
732        let desc1 = BlobIoDesc {
733            blob: blob_info.clone(),
734            chunkinfo: chunk1.into(),
735            offset: 0,
736            size: 0x1000,
737            user_io: true,
738        };
739        let mut state = BlobIoMergeState::new(&desc1, cb);
740        assert_eq!(state.size(), 0x800);
741        assert_eq!(state.bios.len(), 1);
742
743        let desc2 = BlobIoDesc {
744            blob: blob_info.clone(),
745            chunkinfo: chunk2.into(),
746            offset: 0,
747            size: 0x1000,
748            user_io: true,
749        };
750        state.push(&desc2);
751        assert_eq!(state.size, 0x1000);
752        assert_eq!(state.bios.len(), 2);
753
754        state.issue(0);
755        assert_eq!(state.size(), 0x0);
756        assert_eq!(state.bios.len(), 0);
757
758        let desc3 = BlobIoDesc {
759            blob: blob_info,
760            chunkinfo: chunk3.into(),
761            offset: 0,
762            size: 0x1000,
763            user_io: true,
764        };
765        state.push(&desc3);
766        assert_eq!(state.size, 0x800);
767        assert_eq!(state.bios.len(), 1);
768
769        state.issue(0);
770        assert_eq!(state.size(), 0x0);
771        assert_eq!(state.bios.len(), 0);
772
773        let mut count = 0;
774        BlobIoMergeState::merge_and_issue(
775            &[desc1.clone(), desc2.clone(), desc3.clone()],
776            0x4000,
777            0x0,
778            |_v| count += 1,
779        );
780        assert_eq!(count, 1);
781
782        let mut count = 0;
783        BlobIoMergeState::merge_and_issue(
784            &[desc1.clone(), desc2.clone(), desc3.clone()],
785            0x1000,
786            0x0,
787            |_v| count += 1,
788        );
789        assert_eq!(count, 2);
790
791        let mut count = 0;
792        BlobIoMergeState::merge_and_issue(&[desc1.clone(), desc3.clone()], 0x4000, 0x0, |_v| {
793            count += 1
794        });
795        assert_eq!(count, 2);
796
797        assert!(desc1.is_continuous(&desc2, 0));
798        assert!(!desc1.is_continuous(&desc3, 0));
799    }
800}