// nydus_storage/cache/filecache/mod.rs

// Copyright 2020 Ant Group. All rights reserved.
// Copyright (C) 2021 Alibaba Cloud. All rights reserved.
//
// SPDX-License-Identifier: Apache-2.0
6use std::collections::HashMap;
7use std::fs::OpenOptions;
8use std::io::Result;
9use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
10use std::sync::{Arc, RwLock};
11
12use tokio::runtime::Runtime;
13
14use nydus_api::CacheConfigV2;
15use nydus_utils::crypt;
16use nydus_utils::metrics::BlobcacheMetrics;
17
18use crate::backend::BlobBackend;
19use crate::cache::cachedfile::{FileCacheEntry, FileCacheMeta};
20use crate::cache::state::{
21    BlobStateMap, ChunkMap, DigestedChunkMap, IndexedChunkMap, NoopChunkMap,
22};
23use crate::cache::worker::{AsyncPrefetchConfig, AsyncWorkerMgr};
24#[cfg(feature = "dedup")]
25use crate::cache::CasMgr;
26use crate::cache::{BlobCache, BlobCacheMgr};
27use crate::device::{BlobFeatures, BlobInfo};
28use crate::utils::get_path_from_file;
29
/// File name suffix for cache files holding raw (compressed) blob data.
pub const BLOB_RAW_FILE_SUFFIX: &str = ".blob.raw";
/// File name suffix for cache files holding uncompressed blob data.
pub const BLOB_DATA_FILE_SUFFIX: &str = ".blob.data";
32
/// An implementation of [BlobCacheMgr](../trait.BlobCacheMgr.html) to improve performance by
/// caching uncompressed blob with local storage.
#[derive(Clone)]
pub struct FileCacheMgr {
    // Cache entries managed by this manager, keyed by blob id.
    blobs: Arc<RwLock<HashMap<String, Arc<FileCacheEntry>>>>,
    // Storage backend used to fetch blob data not yet in the cache.
    backend: Arc<dyn BlobBackend>,
    // Metrics collector for this cache instance.
    metrics: Arc<BlobcacheMetrics>,
    // Configuration for asynchronous prefetch workers.
    prefetch_config: Arc<AsyncPrefetchConfig>,
    // Tokio runtime driving asynchronous cache operations.
    runtime: Arc<Runtime>,
    // Manager of background worker threads.
    worker_mgr: Arc<AsyncWorkerMgr>,
    // Directory holding the cache files.
    work_dir: String,
    // Whether cached data should be validated (from `cache_validate` in the config).
    validate: bool,
    // Fall back to `DigestedChunkMap` instead of `IndexedChunkMap` for RAFS v5 blobs.
    disable_indexed_map: bool,
    // Cache raw (compressed) blob data instead of uncompressed data.
    cache_raw_data: bool,
    // Encrypt data written to cache files.
    cache_encrypted: bool,
    // Use convergent encryption for cached data.
    cache_convergent_encryption: bool,
    // Hex-encoded key used when cache encryption is enabled.
    cache_encryption_key: String,
    // Set once `destroy()` has run, guarding against repeated teardown.
    closed: Arc<AtomicBool>,
    // Batch size used when servicing user I/O requests.
    user_io_batch_size: u32,
}
53
54impl FileCacheMgr {
55    /// Create a new instance of `FileCacheMgr`.
56    pub fn new(
57        config: &CacheConfigV2,
58        backend: Arc<dyn BlobBackend>,
59        runtime: Arc<Runtime>,
60        id: &str,
61        user_io_batch_size: u32,
62    ) -> Result<FileCacheMgr> {
63        let blob_cfg = config.get_filecache_config()?;
64        let work_dir = blob_cfg.get_work_dir()?;
65        let metrics = BlobcacheMetrics::new(id, work_dir);
66        let prefetch_config: Arc<AsyncPrefetchConfig> = Arc::new((&config.prefetch).into());
67        let worker_mgr = AsyncWorkerMgr::new(metrics.clone(), prefetch_config.clone())?;
68
69        Ok(FileCacheMgr {
70            blobs: Arc::new(RwLock::new(HashMap::new())),
71            backend,
72            metrics,
73            prefetch_config,
74            runtime,
75            worker_mgr: Arc::new(worker_mgr),
76            work_dir: work_dir.to_owned(),
77            disable_indexed_map: blob_cfg.disable_indexed_map,
78            validate: config.cache_validate,
79            cache_raw_data: config.cache_compressed,
80            cache_encrypted: blob_cfg.enable_encryption,
81            cache_convergent_encryption: blob_cfg.enable_convergent_encryption,
82            cache_encryption_key: blob_cfg.encryption_key.clone(),
83            closed: Arc::new(AtomicBool::new(false)),
84            user_io_batch_size,
85        })
86    }
87
88    // Get the file cache entry for the specified blob object.
89    fn get(&self, blob: &Arc<BlobInfo>) -> Option<Arc<FileCacheEntry>> {
90        self.blobs.read().unwrap().get(&blob.blob_id()).cloned()
91    }
92
93    // Create a file cache entry for the specified blob object if not present, otherwise
94    // return the existing one.
95    fn get_or_create_cache_entry(&self, blob: &Arc<BlobInfo>) -> Result<Arc<FileCacheEntry>> {
96        if let Some(entry) = self.get(blob) {
97            return Ok(entry);
98        }
99
100        let entry = FileCacheEntry::new_file_cache(
101            self,
102            blob.clone(),
103            self.prefetch_config.clone(),
104            self.runtime.clone(),
105            self.worker_mgr.clone(),
106        )?;
107        let entry = Arc::new(entry);
108        let mut guard = self.blobs.write().unwrap();
109        if let Some(entry) = guard.get(&blob.blob_id()) {
110            Ok(entry.clone())
111        } else {
112            let blob_id = blob.blob_id();
113            guard.insert(blob_id.clone(), entry.clone());
114            self.metrics
115                .underlying_files
116                .lock()
117                .unwrap()
118                .insert(blob_id + BLOB_DATA_FILE_SUFFIX);
119            Ok(entry)
120        }
121    }
122}
123
124impl BlobCacheMgr for FileCacheMgr {
125    fn init(&self) -> Result<()> {
126        AsyncWorkerMgr::start(self.worker_mgr.clone())
127    }
128
129    fn destroy(&self) {
130        if !self.closed.load(Ordering::Acquire) {
131            self.closed.store(true, Ordering::Release);
132            self.worker_mgr.stop();
133            self.backend().shutdown();
134            self.metrics.release().unwrap_or_else(|e| error!("{:?}", e));
135        }
136    }
137
138    fn gc(&self, id: Option<&str>) -> bool {
139        let mut reclaim = Vec::new();
140
141        if let Some(blob_id) = id {
142            reclaim.push(blob_id.to_string());
143        } else {
144            let guard = self.blobs.write().unwrap();
145            for (id, entry) in guard.iter() {
146                if Arc::strong_count(entry) == 1 {
147                    reclaim.push(id.to_owned());
148                }
149            }
150        }
151
152        for key in reclaim.iter() {
153            let mut guard = self.blobs.write().unwrap();
154            if let Some(entry) = guard.get(key) {
155                if Arc::strong_count(entry) == 1 {
156                    guard.remove(key);
157                }
158            }
159        }
160
161        self.blobs.read().unwrap().is_empty()
162    }
163
164    fn backend(&self) -> &dyn BlobBackend {
165        self.backend.as_ref()
166    }
167
168    fn get_blob_cache(&self, blob_info: &Arc<BlobInfo>) -> Result<Arc<dyn BlobCache>> {
169        self.get_or_create_cache_entry(blob_info)
170            .map(|v| v as Arc<dyn BlobCache>)
171    }
172
173    fn check_stat(&self) {}
174}
175
impl Drop for FileCacheMgr {
    // Ensure workers, backend and metrics are torn down even if the caller never
    // invoked `destroy()` explicitly; `destroy()` is guarded by the `closed` flag
    // so an earlier explicit call makes this a no-op.
    fn drop(&mut self) {
        self.destroy();
    }
}
181
182impl FileCacheEntry {
183    fn new_file_cache(
184        mgr: &FileCacheMgr,
185        blob_info: Arc<BlobInfo>,
186        prefetch_config: Arc<AsyncPrefetchConfig>,
187        runtime: Arc<Runtime>,
188        workers: Arc<AsyncWorkerMgr>,
189    ) -> Result<Self> {
190        let is_separate_meta = blob_info.has_feature(BlobFeatures::SEPARATE);
191        let is_tarfs = blob_info.features().is_tarfs();
192        let is_batch = blob_info.has_feature(BlobFeatures::BATCH);
193        let is_zran = blob_info.has_feature(BlobFeatures::ZRAN);
194        let blob_id = blob_info.blob_id();
195        let blob_meta_id = if is_separate_meta {
196            blob_info.get_blob_meta_id()?
197        } else {
198            blob_id.clone()
199        };
200        let reader = mgr
201            .backend
202            .get_reader(&blob_id)
203            .map_err(|e| eio!(format!("failed to get reader for blob {}, {}", blob_id, e)))?;
204        let blob_meta_reader = if is_separate_meta {
205            mgr.backend.get_reader(&blob_meta_id).map_err(|e| {
206                eio!(format!(
207                    "failed to get reader for blob.meta {}, {}",
208                    blob_id, e
209                ))
210            })?
211        } else {
212            reader.clone()
213        };
214
215        // Turn off chunk deduplication in case of cache data encryption is enabled or is tarfs.
216        let cas_mgr = if mgr.cache_encrypted || mgr.cache_raw_data || is_tarfs {
217            warn!("chunk deduplication trun off");
218            None
219        } else {
220            #[cfg(feature = "dedup")]
221            {
222                CasMgr::get_singleton()
223            }
224            #[cfg(not(feature = "dedup"))]
225            None
226        };
227
228        let blob_compressed_size = Self::get_blob_size(&reader, &blob_info)?;
229        let blob_uncompressed_size = blob_info.uncompressed_size();
230        let is_legacy_stargz = blob_info.is_legacy_stargz();
231
232        let blob_file_path = format!("{}/{}", mgr.work_dir, blob_id);
233        let (
234            file,
235            meta,
236            chunk_map,
237            is_direct_chunkmap,
238            is_get_blob_object_supported,
239            need_validation,
240        ) = if is_tarfs {
241            let file = OpenOptions::new()
242                .create(false)
243                .write(false)
244                .read(true)
245                .open(blob_file_path)?;
246            let chunk_map =
247                Arc::new(BlobStateMap::from(NoopChunkMap::new(true))) as Arc<dyn ChunkMap>;
248            (file, None, chunk_map, true, true, false)
249        } else {
250            let (chunk_map, is_direct_chunkmap) =
251                Self::create_chunk_map(mgr, &blob_info, &blob_file_path)?;
252            // Validation is supported by RAFS v5 (which has no meta_ci) or v6 with chunk digest array.
253            let validation_supported = !blob_info.meta_ci_is_valid()
254                || blob_info.has_feature(BlobFeatures::INLINED_CHUNK_DIGEST);
255            let need_validation = ((mgr.validate && validation_supported) || !is_direct_chunkmap)
256                && !is_legacy_stargz;
257            // Set cache file to its expected size.
258            let suffix = if mgr.cache_raw_data {
259                BLOB_RAW_FILE_SUFFIX
260            } else {
261                BLOB_DATA_FILE_SUFFIX
262            };
263            let blob_data_file_path = blob_file_path.clone() + suffix;
264            let file = OpenOptions::new()
265                .create(true)
266                .truncate(false)
267                .write(true)
268                .read(true)
269                .open(blob_data_file_path)?;
270            let file_size = file.metadata()?.len();
271            let cached_file_size = if mgr.cache_raw_data {
272                blob_info.compressed_data_size()
273            } else {
274                blob_info.uncompressed_size()
275            };
276            if file_size == 0 || file_size < cached_file_size {
277                file.set_len(cached_file_size)?;
278            } else if cached_file_size != 0 && file_size != cached_file_size {
279                let msg = format!(
280                    "blob data file size doesn't match: got 0x{:x}, expect 0x{:x}",
281                    file_size, cached_file_size
282                );
283                return Err(einval!(msg));
284            }
285            let load_chunk_digest = need_validation || cas_mgr.is_some();
286            let meta = if blob_info.meta_ci_is_valid()
287                || blob_info.has_feature(BlobFeatures::IS_CHUNKDICT_GENERATED)
288            {
289                let meta = FileCacheMeta::new(
290                    blob_file_path,
291                    blob_info.clone(),
292                    Some(blob_meta_reader),
293                    Some(runtime.clone()),
294                    false,
295                    load_chunk_digest,
296                )?;
297                Some(meta)
298            } else {
299                None
300            };
301            let is_get_blob_object_supported = meta.is_some() && is_direct_chunkmap;
302            (
303                file,
304                meta,
305                chunk_map,
306                is_direct_chunkmap,
307                is_get_blob_object_supported,
308                need_validation,
309            )
310        };
311
312        let (cache_cipher_object, cache_cipher_context) = if mgr.cache_encrypted {
313            let key = hex::decode(mgr.cache_encryption_key.clone())
314                .map_err(|_e| einval!("invalid cache file encryption key"))?;
315            let cipher = crypt::Algorithm::Aes128Xts.new_cipher()?;
316            let ctx = crypt::CipherContext::new(
317                key,
318                [0u8; 16].to_vec(),
319                mgr.cache_convergent_encryption,
320                crypt::Algorithm::Aes128Xts,
321            )?;
322            (Arc::new(cipher), Arc::new(ctx))
323        } else {
324            (Default::default(), Default::default())
325        };
326
327        let mut blob_data_file_path = String::new();
328        if cas_mgr.is_some() {
329            blob_data_file_path = if let Some(path) = get_path_from_file(&file) {
330                path
331            } else {
332                warn!("can't get path from file");
333                "".to_string()
334            }
335        }
336
337        trace!(
338            "filecache entry: is_raw_data {}, direct {}, legacy_stargz {}, separate_meta {}, tarfs {}, batch {}, zran {}",
339            mgr.cache_raw_data,
340            is_direct_chunkmap,
341            is_legacy_stargz,
342            is_separate_meta,
343            is_tarfs,
344            is_batch,
345            is_zran,
346        );
347        Ok(FileCacheEntry {
348            blob_id,
349            blob_info,
350            cache_cipher_object,
351            cache_cipher_context,
352            cas_mgr,
353            chunk_map,
354            file: Arc::new(file),
355            file_path: Arc::new(blob_data_file_path),
356            meta,
357            metrics: mgr.metrics.clone(),
358            prefetch_state: Arc::new(AtomicU32::new(0)),
359            reader,
360            runtime,
361            workers,
362
363            blob_compressed_size,
364            blob_uncompressed_size,
365            is_get_blob_object_supported,
366            is_raw_data: mgr.cache_raw_data,
367            is_cache_encrypted: mgr.cache_encrypted,
368            is_direct_chunkmap,
369            is_legacy_stargz,
370            is_tarfs,
371            is_batch,
372            is_zran,
373            dio_enabled: false,
374            need_validation,
375            user_io_batch_size: mgr.user_io_batch_size,
376            prefetch_config,
377        })
378    }
379
380    fn create_chunk_map(
381        mgr: &FileCacheMgr,
382        blob_info: &BlobInfo,
383        blob_file: &str,
384    ) -> Result<(Arc<dyn ChunkMap>, bool)> {
385        // The builder now records the number of chunks in the blob table, so we can
386        // use IndexedChunkMap as a chunk map, but for the old Nydus bootstrap, we
387        // need downgrade to use DigestedChunkMap as a compatible solution.
388        let is_v5 = !blob_info.meta_ci_is_valid();
389        let mut direct_chunkmap = true;
390        let chunk_map: Arc<dyn ChunkMap> = if (is_v5 && mgr.disable_indexed_map)
391            || blob_info.has_feature(BlobFeatures::_V5_NO_EXT_BLOB_TABLE)
392        {
393            direct_chunkmap = false;
394            Arc::new(BlobStateMap::from(DigestedChunkMap::new()))
395        } else {
396            Arc::new(BlobStateMap::from(IndexedChunkMap::new(
397                &format!("{}{}", blob_file, BLOB_DATA_FILE_SUFFIX),
398                blob_info.chunk_count(),
399                true,
400            )?))
401        };
402
403        Ok((chunk_map, direct_chunkmap))
404    }
405}
406
#[cfg(test)]
pub mod blob_cache_tests {
    use nydus_api::FileCacheConfig;
    use vmm_sys_util::tempdir::TempDir;
    use vmm_sys_util::tempfile::TempFile;

    /// Verify `FileCacheConfig` deserialization and `get_work_dir()` validation:
    /// a directory is accepted while a regular file is rejected.
    #[test]
    fn test_blob_cache_config() {
        // A JSON config pointing `work_dir` at an existing directory should
        // deserialize with defaults applied.
        let tmp_dir = TempDir::new().unwrap();
        let dir = tmp_dir.as_path().to_path_buf();
        let s = format!(
            r###"
        {{
            "work_dir": {:?}
        }}
        "###,
            dir
        );

        let mut blob_config: FileCacheConfig = serde_json::from_str(&s).unwrap();
        assert!(!blob_config.disable_indexed_map);
        assert_eq!(blob_config.work_dir, dir.to_str().unwrap());

        // A regular file is not a valid work directory.
        let tmp_file = TempFile::new().unwrap();
        let file = tmp_file.as_path().to_path_buf();
        blob_config.work_dir = file.to_str().unwrap().to_owned();
        assert!(blob_config.get_work_dir().is_err());
    }
}