nydus_storage/cache/filecache/mod.rs
1// Copyright 2020 Ant Group. All rights reserved.
2// Copyright (C) 2021 Alibaba Cloud. All rights reserved.
3//
4// SPDX-License-Identifier: Apache-2.0
5
6use std::collections::HashMap;
7use std::fs::OpenOptions;
8use std::io::Result;
9use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
10use std::sync::{Arc, RwLock};
11
12use tokio::runtime::Runtime;
13
14use nydus_api::CacheConfigV2;
15use nydus_utils::crypt;
16use nydus_utils::metrics::BlobcacheMetrics;
17
18use crate::backend::BlobBackend;
19use crate::cache::cachedfile::{FileCacheEntry, FileCacheMeta};
20use crate::cache::state::{
21 BlobStateMap, ChunkMap, DigestedChunkMap, IndexedChunkMap, NoopChunkMap,
22};
23use crate::cache::worker::{AsyncPrefetchConfig, AsyncWorkerMgr};
24#[cfg(feature = "dedup")]
25use crate::cache::CasMgr;
26use crate::cache::{BlobCache, BlobCacheMgr};
27use crate::device::{BlobFeatures, BlobInfo};
28use crate::utils::get_path_from_file;
29
30pub const BLOB_RAW_FILE_SUFFIX: &str = ".blob.raw";
31pub const BLOB_DATA_FILE_SUFFIX: &str = ".blob.data";
32
33/// An implementation of [BlobCacheMgr](../trait.BlobCacheMgr.html) to improve performance by
34/// caching uncompressed blob with local storage.
35#[derive(Clone)]
36pub struct FileCacheMgr {
37 blobs: Arc<RwLock<HashMap<String, Arc<FileCacheEntry>>>>,
38 backend: Arc<dyn BlobBackend>,
39 metrics: Arc<BlobcacheMetrics>,
40 prefetch_config: Arc<AsyncPrefetchConfig>,
41 runtime: Arc<Runtime>,
42 worker_mgr: Arc<AsyncWorkerMgr>,
43 work_dir: String,
44 validate: bool,
45 disable_indexed_map: bool,
46 cache_raw_data: bool,
47 cache_encrypted: bool,
48 cache_convergent_encryption: bool,
49 cache_encryption_key: String,
50 closed: Arc<AtomicBool>,
51 user_io_batch_size: u32,
52}
53
54impl FileCacheMgr {
55 /// Create a new instance of `FileCacheMgr`.
56 pub fn new(
57 config: &CacheConfigV2,
58 backend: Arc<dyn BlobBackend>,
59 runtime: Arc<Runtime>,
60 id: &str,
61 user_io_batch_size: u32,
62 ) -> Result<FileCacheMgr> {
63 let blob_cfg = config.get_filecache_config()?;
64 let work_dir = blob_cfg.get_work_dir()?;
65 let metrics = BlobcacheMetrics::new(id, work_dir);
66 let prefetch_config: Arc<AsyncPrefetchConfig> = Arc::new((&config.prefetch).into());
67 let worker_mgr = AsyncWorkerMgr::new(metrics.clone(), prefetch_config.clone())?;
68
69 Ok(FileCacheMgr {
70 blobs: Arc::new(RwLock::new(HashMap::new())),
71 backend,
72 metrics,
73 prefetch_config,
74 runtime,
75 worker_mgr: Arc::new(worker_mgr),
76 work_dir: work_dir.to_owned(),
77 disable_indexed_map: blob_cfg.disable_indexed_map,
78 validate: config.cache_validate,
79 cache_raw_data: config.cache_compressed,
80 cache_encrypted: blob_cfg.enable_encryption,
81 cache_convergent_encryption: blob_cfg.enable_convergent_encryption,
82 cache_encryption_key: blob_cfg.encryption_key.clone(),
83 closed: Arc::new(AtomicBool::new(false)),
84 user_io_batch_size,
85 })
86 }
87
88 // Get the file cache entry for the specified blob object.
89 fn get(&self, blob: &Arc<BlobInfo>) -> Option<Arc<FileCacheEntry>> {
90 self.blobs.read().unwrap().get(&blob.blob_id()).cloned()
91 }
92
93 // Create a file cache entry for the specified blob object if not present, otherwise
94 // return the existing one.
95 fn get_or_create_cache_entry(&self, blob: &Arc<BlobInfo>) -> Result<Arc<FileCacheEntry>> {
96 if let Some(entry) = self.get(blob) {
97 return Ok(entry);
98 }
99
100 let entry = FileCacheEntry::new_file_cache(
101 self,
102 blob.clone(),
103 self.prefetch_config.clone(),
104 self.runtime.clone(),
105 self.worker_mgr.clone(),
106 )?;
107 let entry = Arc::new(entry);
108 let mut guard = self.blobs.write().unwrap();
109 if let Some(entry) = guard.get(&blob.blob_id()) {
110 Ok(entry.clone())
111 } else {
112 let blob_id = blob.blob_id();
113 guard.insert(blob_id.clone(), entry.clone());
114 self.metrics
115 .underlying_files
116 .lock()
117 .unwrap()
118 .insert(blob_id + BLOB_DATA_FILE_SUFFIX);
119 Ok(entry)
120 }
121 }
122}
123
124impl BlobCacheMgr for FileCacheMgr {
125 fn init(&self) -> Result<()> {
126 AsyncWorkerMgr::start(self.worker_mgr.clone())
127 }
128
129 fn destroy(&self) {
130 if !self.closed.load(Ordering::Acquire) {
131 self.closed.store(true, Ordering::Release);
132 self.worker_mgr.stop();
133 self.backend().shutdown();
134 self.metrics.release().unwrap_or_else(|e| error!("{:?}", e));
135 }
136 }
137
138 fn gc(&self, id: Option<&str>) -> bool {
139 let mut reclaim = Vec::new();
140
141 if let Some(blob_id) = id {
142 reclaim.push(blob_id.to_string());
143 } else {
144 let guard = self.blobs.write().unwrap();
145 for (id, entry) in guard.iter() {
146 if Arc::strong_count(entry) == 1 {
147 reclaim.push(id.to_owned());
148 }
149 }
150 }
151
152 for key in reclaim.iter() {
153 let mut guard = self.blobs.write().unwrap();
154 if let Some(entry) = guard.get(key) {
155 if Arc::strong_count(entry) == 1 {
156 guard.remove(key);
157 }
158 }
159 }
160
161 self.blobs.read().unwrap().is_empty()
162 }
163
164 fn backend(&self) -> &dyn BlobBackend {
165 self.backend.as_ref()
166 }
167
168 fn get_blob_cache(&self, blob_info: &Arc<BlobInfo>) -> Result<Arc<dyn BlobCache>> {
169 self.get_or_create_cache_entry(blob_info)
170 .map(|v| v as Arc<dyn BlobCache>)
171 }
172
173 fn check_stat(&self) {}
174}
175
176impl Drop for FileCacheMgr {
177 fn drop(&mut self) {
178 self.destroy();
179 }
180}
181
182impl FileCacheEntry {
183 fn new_file_cache(
184 mgr: &FileCacheMgr,
185 blob_info: Arc<BlobInfo>,
186 prefetch_config: Arc<AsyncPrefetchConfig>,
187 runtime: Arc<Runtime>,
188 workers: Arc<AsyncWorkerMgr>,
189 ) -> Result<Self> {
190 let is_separate_meta = blob_info.has_feature(BlobFeatures::SEPARATE);
191 let is_tarfs = blob_info.features().is_tarfs();
192 let is_batch = blob_info.has_feature(BlobFeatures::BATCH);
193 let is_zran = blob_info.has_feature(BlobFeatures::ZRAN);
194 let blob_id = blob_info.blob_id();
195 let blob_meta_id = if is_separate_meta {
196 blob_info.get_blob_meta_id()?
197 } else {
198 blob_id.clone()
199 };
200 let reader = mgr
201 .backend
202 .get_reader(&blob_id)
203 .map_err(|e| eio!(format!("failed to get reader for blob {}, {}", blob_id, e)))?;
204 let blob_meta_reader = if is_separate_meta {
205 mgr.backend.get_reader(&blob_meta_id).map_err(|e| {
206 eio!(format!(
207 "failed to get reader for blob.meta {}, {}",
208 blob_id, e
209 ))
210 })?
211 } else {
212 reader.clone()
213 };
214
215 // Turn off chunk deduplication in case of cache data encryption is enabled or is tarfs.
216 let cas_mgr = if mgr.cache_encrypted || mgr.cache_raw_data || is_tarfs {
217 warn!("chunk deduplication trun off");
218 None
219 } else {
220 #[cfg(feature = "dedup")]
221 {
222 CasMgr::get_singleton()
223 }
224 #[cfg(not(feature = "dedup"))]
225 None
226 };
227
228 let blob_compressed_size = Self::get_blob_size(&reader, &blob_info)?;
229 let blob_uncompressed_size = blob_info.uncompressed_size();
230 let is_legacy_stargz = blob_info.is_legacy_stargz();
231
232 let blob_file_path = format!("{}/{}", mgr.work_dir, blob_id);
233 let (
234 file,
235 meta,
236 chunk_map,
237 is_direct_chunkmap,
238 is_get_blob_object_supported,
239 need_validation,
240 ) = if is_tarfs {
241 let file = OpenOptions::new()
242 .create(false)
243 .write(false)
244 .read(true)
245 .open(blob_file_path)?;
246 let chunk_map =
247 Arc::new(BlobStateMap::from(NoopChunkMap::new(true))) as Arc<dyn ChunkMap>;
248 (file, None, chunk_map, true, true, false)
249 } else {
250 let (chunk_map, is_direct_chunkmap) =
251 Self::create_chunk_map(mgr, &blob_info, &blob_file_path)?;
252 // Validation is supported by RAFS v5 (which has no meta_ci) or v6 with chunk digest array.
253 let validation_supported = !blob_info.meta_ci_is_valid()
254 || blob_info.has_feature(BlobFeatures::INLINED_CHUNK_DIGEST);
255 let need_validation = ((mgr.validate && validation_supported) || !is_direct_chunkmap)
256 && !is_legacy_stargz;
257 // Set cache file to its expected size.
258 let suffix = if mgr.cache_raw_data {
259 BLOB_RAW_FILE_SUFFIX
260 } else {
261 BLOB_DATA_FILE_SUFFIX
262 };
263 let blob_data_file_path = blob_file_path.clone() + suffix;
264 let file = OpenOptions::new()
265 .create(true)
266 .truncate(false)
267 .write(true)
268 .read(true)
269 .open(blob_data_file_path)?;
270 let file_size = file.metadata()?.len();
271 let cached_file_size = if mgr.cache_raw_data {
272 blob_info.compressed_data_size()
273 } else {
274 blob_info.uncompressed_size()
275 };
276 if file_size == 0 || file_size < cached_file_size {
277 file.set_len(cached_file_size)?;
278 } else if cached_file_size != 0 && file_size != cached_file_size {
279 let msg = format!(
280 "blob data file size doesn't match: got 0x{:x}, expect 0x{:x}",
281 file_size, cached_file_size
282 );
283 return Err(einval!(msg));
284 }
285 let load_chunk_digest = need_validation || cas_mgr.is_some();
286 let meta = if blob_info.meta_ci_is_valid()
287 || blob_info.has_feature(BlobFeatures::IS_CHUNKDICT_GENERATED)
288 {
289 let meta = FileCacheMeta::new(
290 blob_file_path,
291 blob_info.clone(),
292 Some(blob_meta_reader),
293 Some(runtime.clone()),
294 false,
295 load_chunk_digest,
296 )?;
297 Some(meta)
298 } else {
299 None
300 };
301 let is_get_blob_object_supported = meta.is_some() && is_direct_chunkmap;
302 (
303 file,
304 meta,
305 chunk_map,
306 is_direct_chunkmap,
307 is_get_blob_object_supported,
308 need_validation,
309 )
310 };
311
312 let (cache_cipher_object, cache_cipher_context) = if mgr.cache_encrypted {
313 let key = hex::decode(mgr.cache_encryption_key.clone())
314 .map_err(|_e| einval!("invalid cache file encryption key"))?;
315 let cipher = crypt::Algorithm::Aes128Xts.new_cipher()?;
316 let ctx = crypt::CipherContext::new(
317 key,
318 [0u8; 16].to_vec(),
319 mgr.cache_convergent_encryption,
320 crypt::Algorithm::Aes128Xts,
321 )?;
322 (Arc::new(cipher), Arc::new(ctx))
323 } else {
324 (Default::default(), Default::default())
325 };
326
327 let mut blob_data_file_path = String::new();
328 if cas_mgr.is_some() {
329 blob_data_file_path = if let Some(path) = get_path_from_file(&file) {
330 path
331 } else {
332 warn!("can't get path from file");
333 "".to_string()
334 }
335 }
336
337 trace!(
338 "filecache entry: is_raw_data {}, direct {}, legacy_stargz {}, separate_meta {}, tarfs {}, batch {}, zran {}",
339 mgr.cache_raw_data,
340 is_direct_chunkmap,
341 is_legacy_stargz,
342 is_separate_meta,
343 is_tarfs,
344 is_batch,
345 is_zran,
346 );
347 Ok(FileCacheEntry {
348 blob_id,
349 blob_info,
350 cache_cipher_object,
351 cache_cipher_context,
352 cas_mgr,
353 chunk_map,
354 file: Arc::new(file),
355 file_path: Arc::new(blob_data_file_path),
356 meta,
357 metrics: mgr.metrics.clone(),
358 prefetch_state: Arc::new(AtomicU32::new(0)),
359 reader,
360 runtime,
361 workers,
362
363 blob_compressed_size,
364 blob_uncompressed_size,
365 is_get_blob_object_supported,
366 is_raw_data: mgr.cache_raw_data,
367 is_cache_encrypted: mgr.cache_encrypted,
368 is_direct_chunkmap,
369 is_legacy_stargz,
370 is_tarfs,
371 is_batch,
372 is_zran,
373 dio_enabled: false,
374 need_validation,
375 user_io_batch_size: mgr.user_io_batch_size,
376 prefetch_config,
377 })
378 }
379
380 fn create_chunk_map(
381 mgr: &FileCacheMgr,
382 blob_info: &BlobInfo,
383 blob_file: &str,
384 ) -> Result<(Arc<dyn ChunkMap>, bool)> {
385 // The builder now records the number of chunks in the blob table, so we can
386 // use IndexedChunkMap as a chunk map, but for the old Nydus bootstrap, we
387 // need downgrade to use DigestedChunkMap as a compatible solution.
388 let is_v5 = !blob_info.meta_ci_is_valid();
389 let mut direct_chunkmap = true;
390 let chunk_map: Arc<dyn ChunkMap> = if (is_v5 && mgr.disable_indexed_map)
391 || blob_info.has_feature(BlobFeatures::_V5_NO_EXT_BLOB_TABLE)
392 {
393 direct_chunkmap = false;
394 Arc::new(BlobStateMap::from(DigestedChunkMap::new()))
395 } else {
396 Arc::new(BlobStateMap::from(IndexedChunkMap::new(
397 &format!("{}{}", blob_file, BLOB_DATA_FILE_SUFFIX),
398 blob_info.chunk_count(),
399 true,
400 )?))
401 };
402
403 Ok((chunk_map, direct_chunkmap))
404 }
405}
406
407#[cfg(test)]
408pub mod blob_cache_tests {
409 use nydus_api::FileCacheConfig;
410 use vmm_sys_util::tempdir::TempDir;
411 use vmm_sys_util::tempfile::TempFile;
412
413 #[test]
414 fn test_blob_cache_config() {
415 // new blob cache
416 let tmp_dir = TempDir::new().unwrap();
417 let dir = tmp_dir.as_path().to_path_buf();
418 let s = format!(
419 r###"
420 {{
421 "work_dir": {:?}
422 }}
423 "###,
424 dir
425 );
426
427 let mut blob_config: FileCacheConfig = serde_json::from_str(&s).unwrap();
428 assert!(!blob_config.disable_indexed_map);
429 assert_eq!(blob_config.work_dir, dir.to_str().unwrap());
430
431 let tmp_file = TempFile::new().unwrap();
432 let file = tmp_file.as_path().to_path_buf();
433 blob_config.work_dir = file.to_str().unwrap().to_owned();
434 assert!(blob_config.get_work_dir().is_err());
435 }
436
437 /*
438 #[test]
439 fn test_add() {
440 // new blob cache
441 let tmp_dir = TempDir::new().unwrap();
442 let s = format!(
443 r###"
444 {{
445 "work_dir": {:?}
446 }}
447 "###,
448 tmp_dir.as_path().to_path_buf().join("cache"),
449 );
450
451 let cache_config = CacheConfig {
452 cache_validate: true,
453 cache_compressed: false,
454 cache_type: String::from("blobcache"),
455 cache_config: serde_json::from_str(&s).unwrap(),
456 prefetch_config: BlobPrefetchConfig::default(),
457 };
458 let blob_cache = filecache::new(
459 cache_config,
460 Arc::new(MockBackend {
461 metrics: BackendMetrics::new("id", "mock"),
462 }) as Arc<dyn BlobBackend + Send + Sync>,
463 compress::Algorithm::Lz4Block,
464 digest::Algorithm::Blake3,
465 "id",
466 )
467 .unwrap();
468
469 // generate backend data
470 let mut expect = vec![1u8; 100];
471 let blob_id = "blobcache";
472 blob_cache
473 .backend
474 .read(blob_id, expect.as_mut(), 0)
475 .unwrap();
476
477 // generate chunk and bio
478 let mut chunk = MockChunkInfo::new();
479 chunk.block_id = RafsDigest::from_buf(&expect, digest::Algorithm::Blake3);
480 chunk.file_offset = 0;
481 chunk.compress_offset = 0;
482 chunk.compress_size = 100;
483 chunk.decompress_offset = 0;
484 chunk.decompress_size = 100;
485 let bio = BlobIoDesc::new(
486 Arc::new(chunk),
487 Arc::new(BlobInfo {
488 chunk_count: 0,
489 readahead_offset: 0,
490 readahead_size: 0,
491 blob_id: blob_id.to_string(),
492 blob_index: 0,
493 blob_decompressed_size: 0,
494 blob_compressed_size: 0,
495 }),
496 50,
497 50,
498 RAFS_DEFAULT_BLOCK_SIZE as u32,
499 true,
500 );
501
502 // read from cache
503 let r1 = unsafe {
504 let layout = Layout::from_size_align(50, 1).unwrap();
505 let ptr = alloc_zeroed(layout);
506 let vs = VolatileSlice::new(ptr, 50);
507 blob_cache.read(&mut [bio.clone()], &[vs]).unwrap();
508 Vec::from(from_raw_parts(ptr, 50))
509 };
510
511 let r2 = unsafe {
512 let layout = Layout::from_size_align(50, 1).unwrap();
513 let ptr = alloc_zeroed(layout);
514 let vs = VolatileSlice::new(ptr, 50);
515 blob_cache.read(&mut [bio], &[vs]).unwrap();
516 Vec::from(from_raw_parts(ptr, 50))
517 };
518
519 assert_eq!(r1, &expect[50..]);
520 assert_eq!(r2, &expect[50..]);
521 }
522
523 #[test]
524 fn test_merge_bio() {
525 let tmp_dir = TempDir::new().unwrap();
526 let s = format!(
527 r###"
528 {{
529 "work_dir": {:?}
530 }}
531 "###,
532 tmp_dir.as_path().to_path_buf().join("cache"),
533 );
534
535 let cache_config = CacheConfig {
536 cache_validate: true,
537 cache_compressed: false,
538 cache_type: String::from("blobcache"),
539 cache_config: serde_json::from_str(&s).unwrap(),
540 prefetch_worker: BlobPrefetchConfig::default(),
541 };
542
543 let blob_cache = filecache::new(
544 cache_config,
545 Arc::new(MockBackend {
546 metrics: BackendMetrics::new("id", "mock"),
547 }) as Arc<dyn BlobBackend + Send + Sync>,
548 compress::Algorithm::Lz4Block,
549 digest::Algorithm::Blake3,
550 "id",
551 )
552 .unwrap();
553
554 let merging_size: u64 = 128 * 1024 * 1024;
555
556 let single_chunk = MockChunkInfo {
557 compress_offset: 1000,
558 compress_size: merging_size as u32 - 1,
559 ..Default::default()
560 };
561
562 let bio = BlobIoDesc::new(
563 Arc::new(single_chunk.clone()),
564 Arc::new(BlobInfo {
565 chunk_count: 0,
566 readahead_offset: 0,
567 readahead_size: 0,
568 blob_id: "1".to_string(),
569 blob_index: 0,
570 blob_decompressed_size: 0,
571 blob_compressed_size: 0,
572 }),
573 50,
574 50,
575 RAFS_DEFAULT_BLOCK_SIZE as u32,
576 true,
577 );
578
579 let (mut send, recv) = spmc::channel::<MergedBackendRequest>();
580 let mut bios = vec![bio];
581
582 blob_cache.generate_merged_requests_for_prefetch(
583 &mut bios,
584 &mut send,
585 merging_size as usize,
586 );
587 let mr = recv.recv().unwrap();
588
589 assert_eq!(mr.blob_offset, single_chunk.compress_offset());
590 assert_eq!(mr.blob_size, single_chunk.compress_size());
591
592 // ---
593 let chunk1 = MockChunkInfo {
594 compress_offset: 1000,
595 compress_size: merging_size as u32 - 2000,
596 ..Default::default()
597 };
598
599 let bio1 = BlobIoDesc::new(
600 Arc::new(chunk1.clone()),
601 Arc::new(BlobInfo {
602 chunk_count: 0,
603 readahead_offset: 0,
604 readahead_size: 0,
605 blob_id: "1".to_string(),
606 blob_index: 0,
607 blob_decompressed_size: 0,
608 blob_compressed_size: 0,
609 }),
610 50,
611 50,
612 RAFS_DEFAULT_BLOCK_SIZE as u32,
613 true,
614 );
615
616 let chunk2 = MockChunkInfo {
617 compress_offset: 1000 + merging_size - 2000,
618 compress_size: 200,
619 ..Default::default()
620 };
621
622 let bio2 = BlobIoDesc::new(
623 Arc::new(chunk2.clone()),
624 Arc::new(BlobInfo {
625 chunk_count: 0,
626 readahead_offset: 0,
627 readahead_size: 0,
628 blob_id: "1".to_string(),
629 blob_index: 0,
630 blob_decompressed_size: 0,
631 blob_compressed_size: 0,
632 }),
633 50,
634 50,
635 RAFS_DEFAULT_BLOCK_SIZE as u32,
636 true,
637 );
638
639 let mut bios = vec![bio1, bio2];
640 let (mut send, recv) = spmc::channel::<MergedBackendRequest>();
641 blob_cache.generate_merged_requests_for_prefetch(
642 &mut bios,
643 &mut send,
644 merging_size as usize,
645 );
646 let mr = recv.recv().unwrap();
647
648 assert_eq!(mr.blob_offset, chunk1.compress_offset());
649 assert_eq!(
650 mr.blob_size,
651 chunk1.compress_size() + chunk2.compress_size()
652 );
653
654 // ---
655 let chunk1 = MockChunkInfo {
656 compress_offset: 1000,
657 compress_size: merging_size as u32 - 2000,
658 ..Default::default()
659 };
660
661 let bio1 = BlobIoDesc::new(
662 Arc::new(chunk1.clone()),
663 Arc::new(BlobInfo {
664 chunk_count: 0,
665 readahead_offset: 0,
666 readahead_size: 0,
667 blob_id: "1".to_string(),
668 blob_index: 0,
669 blob_decompressed_size: 0,
670 blob_compressed_size: 0,
671 }),
672 50,
673 50,
674 RAFS_DEFAULT_BLOCK_SIZE as u32,
675 true,
676 );
677
678 let chunk2 = MockChunkInfo {
679 compress_offset: 1000 + merging_size - 2000 + 1,
680 compress_size: 200,
681 ..Default::default()
682 };
683
684 let bio2 = BlobIoDesc::new(
685 Arc::new(chunk2.clone()),
686 Arc::new(BlobInfo {
687 chunk_count: 0,
688 readahead_offset: 0,
689 readahead_size: 0,
690 blob_id: "1".to_string(),
691 blob_index: 0,
692 blob_decompressed_size: 0,
693 blob_compressed_size: 0,
694 }),
695 50,
696 50,
697 RAFS_DEFAULT_BLOCK_SIZE as u32,
698 true,
699 );
700
701 let mut bios = vec![bio1, bio2];
702 let (mut send, recv) = spmc::channel::<MergedBackendRequest>();
703 blob_cache.generate_merged_requests_for_prefetch(
704 &mut bios,
705 &mut send,
706 merging_size as usize,
707 );
708
709 let mr = recv.recv().unwrap();
710 assert_eq!(mr.blob_offset, chunk1.compress_offset());
711 assert_eq!(mr.blob_size, chunk1.compress_size());
712
713 let mr = recv.recv().unwrap();
714 assert_eq!(mr.blob_offset, chunk2.compress_offset());
715 assert_eq!(mr.blob_size, chunk2.compress_size());
716
717 // ---
718 let chunk1 = MockChunkInfo {
719 compress_offset: 1000,
720 compress_size: merging_size as u32 - 2000,
721 ..Default::default()
722 };
723
724 let bio1 = BlobIoDesc::new(
725 Arc::new(chunk1.clone()),
726 Arc::new(BlobInfo {
727 chunk_count: 0,
728 readahead_offset: 0,
729 readahead_size: 0,
730 blob_id: "1".to_string(),
731 blob_index: 0,
732 blob_decompressed_size: 0,
733 blob_compressed_size: 0,
734 }),
735 50,
736 50,
737 RAFS_DEFAULT_BLOCK_SIZE as u32,
738 true,
739 );
740
741 let chunk2 = MockChunkInfo {
742 compress_offset: 1000 + merging_size - 2000,
743 compress_size: 200,
744 ..Default::default()
745 };
746
747 let bio2 = BlobIoDesc::new(
748 Arc::new(chunk2.clone()),
749 Arc::new(BlobInfo {
750 chunk_count: 0,
751 readahead_offset: 0,
752 readahead_size: 0,
753 blob_id: "2".to_string(),
754 blob_index: 0,
755 blob_decompressed_size: 0,
756 blob_compressed_size: 0,
757 }),
758 50,
759 50,
760 RAFS_DEFAULT_BLOCK_SIZE as u32,
761 true,
762 );
763
764 let mut bios = vec![bio1, bio2];
765 let (mut send, recv) = spmc::channel::<MergedBackendRequest>();
766 blob_cache.generate_merged_requests_for_prefetch(
767 &mut bios,
768 &mut send,
769 merging_size as usize,
770 );
771
772 let mr = recv.recv().unwrap();
773 assert_eq!(mr.blob_offset, chunk1.compress_offset());
774 assert_eq!(mr.blob_size, chunk1.compress_size());
775
776 let mr = recv.recv().unwrap();
777 assert_eq!(mr.blob_offset, chunk2.compress_offset());
778 assert_eq!(mr.blob_size, chunk2.compress_size());
779
780 // ---
781 let chunk1 = MockChunkInfo {
782 compress_offset: 1000,
783 compress_size: merging_size as u32 - 2000,
784 ..Default::default()
785 };
786
787 let bio1 = BlobIoDesc::new(
788 Arc::new(chunk1.clone()),
789 Arc::new(BlobInfo {
790 chunk_count: 0,
791 readahead_offset: 0,
792 readahead_size: 0,
793 blob_id: "1".to_string(),
794 blob_index: 0,
795 blob_decompressed_size: 0,
796 blob_compressed_size: 0,
797 }),
798 50,
799 50,
800 RAFS_DEFAULT_BLOCK_SIZE as u32,
801 true,
802 );
803
804 let chunk2 = MockChunkInfo {
805 compress_offset: 1000 + merging_size - 2000,
806 compress_size: 200,
807 ..Default::default()
808 };
809
810 let bio2 = BlobIoDesc::new(
811 Arc::new(chunk2.clone()),
812 Arc::new(BlobInfo {
813 chunk_count: 0,
814 readahead_offset: 0,
815 readahead_size: 0,
816 blob_id: "1".to_string(),
817 blob_index: 0,
818 blob_decompressed_size: 0,
819 blob_compressed_size: 0,
820 }),
821 50,
822 50,
823 RAFS_DEFAULT_BLOCK_SIZE as u32,
824 true,
825 );
826
827 let chunk3 = MockChunkInfo {
828 compress_offset: 1000 + merging_size - 2000,
829 compress_size: 200,
830 ..Default::default()
831 };
832
833 let bio3 = BlobIoDesc::new(
834 Arc::new(chunk3.clone()),
835 Arc::new(BlobInfo {
836 chunk_count: 0,
837 readahead_offset: 0,
838 readahead_size: 0,
839 blob_id: "2".to_string(),
840 blob_index: 0,
841 blob_decompressed_size: 0,
842 blob_compressed_size: 0,
843 }),
844 50,
845 50,
846 RAFS_DEFAULT_BLOCK_SIZE as u32,
847 true,
848 );
849
850 let mut bios = vec![bio1, bio2, bio3];
851 let (mut send, recv) = spmc::channel::<MergedBackendRequest>();
852 blob_cache.generate_merged_requests_for_prefetch(
853 &mut bios,
854 &mut send,
855 merging_size as usize,
856 );
857
858 let mr = recv.recv().unwrap();
859 assert_eq!(mr.blob_offset, chunk1.compress_offset());
860 assert_eq!(
861 mr.blob_size,
862 chunk1.compress_size() + chunk2.compress_size()
863 );
864
865 let mr = recv.recv().unwrap();
866 assert_eq!(mr.blob_offset, chunk3.compress_offset());
867 assert_eq!(mr.blob_size, chunk3.compress_size());
868 }
869 */
870}