1use std::cmp;
20use std::io::Result;
21use std::sync::Arc;
22use std::time::Instant;
23
24use fuse_backend_rs::file_buf::FileVolatileSlice;
25use nydus_utils::compress::zlib_random::ZranDecoder;
26use nydus_utils::crypt::{self, Cipher, CipherContext};
27use nydus_utils::{compress, digest};
28
29use crate::backend::{BlobBackend, BlobReader};
30use crate::cache::state::ChunkMap;
31use crate::device::{
32 BlobChunkInfo, BlobInfo, BlobIoDesc, BlobIoRange, BlobIoVec, BlobObject, BlobPrefetchRequest,
33};
34use crate::meta::BlobCompressionContextInfo;
35use crate::utils::{alloc_buf, check_crc, check_hash};
36use crate::{StorageResult, RAFS_MAX_CHUNK_SIZE};
37
38mod cachedfile;
39#[cfg(feature = "dedup")]
40mod dedup;
41mod dummycache;
42mod filecache;
43#[cfg(target_os = "linux")]
44mod fscache;
45mod worker;
46
47pub mod state;
48
49pub use dummycache::DummyCacheMgr;
50pub use filecache::FileCacheMgr;
51#[cfg(target_os = "linux")]
52pub use fscache::FsCacheMgr;
53
54pub const SINGLE_INFLIGHT_WAIT_TIMEOUT: u64 = 2000;
56
/// Helper to merge continuous blob I/O descriptors into larger ranges before
/// issuing them to the storage backend.
struct BlobIoMergeState<'a, F: FnMut(BlobIoRange)> {
    // Callback invoked once for each merged `BlobIoRange`.
    cb: F,
    // Accumulated compressed size of the pending descriptors, including any
    // gaps between adjacent chunks (see `push`).
    size: u32,
    // Descriptors queued for the next merged range.
    bios: Vec<&'a BlobIoDesc>,
}
63
64impl<'a, F: FnMut(BlobIoRange)> BlobIoMergeState<'a, F> {
65 pub fn new(bio: &'a BlobIoDesc, cb: F) -> Self {
67 let size = bio.chunkinfo.compressed_size();
68
69 BlobIoMergeState {
70 cb,
71 size,
72 bios: vec![bio],
73 }
74 }
75
76 #[inline]
78 fn size(&self) -> usize {
79 self.size as usize
80 }
81
82 #[inline]
84 fn push(&mut self, bio: &'a BlobIoDesc) {
85 let start = bio.chunkinfo.compressed_offset();
86 let size = if !self.bios.is_empty() {
87 let last = &self.bios[self.bios.len() - 1].chunkinfo;
88 let prev = last.compressed_offset() + last.compressed_size() as u64;
89 assert!(prev <= start);
90 assert!(start - prev < u32::MAX as u64);
91 (start - prev) as u32 + bio.chunkinfo.compressed_size()
92 } else {
93 bio.chunkinfo.compressed_size()
94 };
95 assert!(self.size.checked_add(size).is_some());
96 self.size += size;
97 self.bios.push(bio);
98 }
99
100 #[inline]
102 pub fn issue(&mut self, max_gap: u64) {
103 if !self.bios.is_empty() {
104 let mut mr = BlobIoRange::new(self.bios[0], self.bios.len());
105 for bio in self.bios[1..].iter() {
106 mr.merge(bio, max_gap);
107 }
108 (self.cb)(mr);
109
110 self.bios.truncate(0);
111 self.size = 0;
112 }
113 }
114
115 pub fn merge_and_issue(bios: &[BlobIoDesc], max_comp_size: u64, max_gap: u64, op: F) {
118 if !bios.is_empty() {
119 let mut index = 1;
120 let mut state = BlobIoMergeState::new(&bios[0], op);
121
122 for cur_bio in &bios[1..] {
123 if !bios[index - 1].is_continuous(cur_bio, max_gap)
126 || state.size() as u64 >= max_comp_size
127 {
128 state.issue(max_gap);
129 }
130 state.push(cur_bio);
131 index += 1
132 }
133 state.issue(max_gap);
134 }
135 }
136}
137
/// Trait representing a cached blob object, providing access to chunk data
/// fetched from backend storage with optional decryption, decompression and
/// digest validation.
pub trait BlobCache: Send + Sync {
    /// Get the id of the blob object.
    fn blob_id(&self) -> &str;

    /// Get the size of the uncompressed blob.
    fn blob_uncompressed_size(&self) -> Result<u64>;

    /// Get the size of the compressed blob.
    fn blob_compressed_size(&self) -> Result<u64>;

    /// Get the compression algorithm used for chunk data of the blob.
    fn blob_compressor(&self) -> compress::Algorithm;

    /// Get the cipher algorithm used for chunk data of the blob.
    fn blob_cipher(&self) -> crypt::Algorithm;

    /// Get the cipher object used to decrypt chunk data.
    fn blob_cipher_object(&self) -> Arc<Cipher>;

    /// Get the optional cipher context used to decrypt chunk data.
    fn blob_cipher_context(&self) -> Option<CipherContext>;

    /// Get the message digest algorithm used to validate chunk data.
    fn blob_digester(&self) -> digest::Algorithm;

    /// Whether the blob is in legacy stargz format.
    fn is_legacy_stargz(&self) -> bool;

    /// Estimate an upper bound for the compressed size of a legacy stargz
    /// chunk starting at `offset` with uncompressed size `uncomp_size`,
    /// capped by the remaining bytes in the compressed blob.
    fn get_legacy_stargz_size(&self, offset: u64, uncomp_size: usize) -> Result<usize> {
        let blob_size = self.blob_compressed_size()?;
        // `offset` beyond the end of the blob is invalid.
        let max_size = blob_size.checked_sub(offset).ok_or_else(|| {
            einval!(format!(
                "chunk compressed offset {:x} is bigger than blob file size {:x}",
                offset, blob_size
            ))
        })?;
        let max_size = cmp::min(max_size, usize::MAX as u64) as usize;
        Ok(compress::compute_compressed_gzip_size(
            uncomp_size,
            max_size,
        ))
    }

    /// Whether the blob uses ZRan (random access over gzip) compression.
    /// Defaults to `false`.
    fn is_zran(&self) -> bool {
        false
    }

    /// Whether the blob uses Batch chunk compression. Defaults to `false`.
    fn is_batch(&self) -> bool {
        false
    }

    /// Whether chunk data read from backend must be validated.
    fn need_validation(&self) -> bool;

    /// Get the reader used to access the blob on backend storage.
    fn reader(&self) -> &dyn BlobReader;

    /// Get the chunk map tracking readiness state of cached chunks.
    fn get_chunk_map(&self) -> &Arc<dyn ChunkMap>;

    /// Get the chunk information object for the chunk at `chunk_index`.
    fn get_chunk_info(&self, chunk_index: u32) -> Option<Arc<dyn BlobChunkInfo>>;

    /// Get an optional `BlobObject` view of this cache. Defaults to `None`.
    fn get_blob_object(&self) -> Option<&dyn BlobObject> {
        None
    }

    /// Enable prefetching of blob data.
    fn start_prefetch(&self) -> StorageResult<()>;

    /// Disable prefetching of blob data.
    fn stop_prefetch(&self) -> StorageResult<()>;

    /// Whether prefetching is currently active.
    fn is_prefetch_active(&self) -> bool;

    /// Submit prefetch requests and/or I/O descriptors for background
    /// fetching. Exact scheduling semantics are defined by implementors.
    fn prefetch(
        &self,
        cache: Arc<dyn BlobCache>,
        prefetches: &[BlobPrefetchRequest],
        bios: &[BlobIoDesc],
    ) -> StorageResult<usize>;

    /// Prefetch a specific range of blob data; unsupported by default.
    fn prefetch_range(&self, _range: &BlobIoRange) -> Result<usize> {
        Err(enosys!("doesn't support prefetch_range()"))
    }

    /// Serve an I/O vector, filling the provided buffers with chunk data.
    fn read(&self, iovec: &mut BlobIoVec, buffers: &[FileVolatileSlice]) -> Result<usize>;

    /// Read a contiguous compressed region covering multiple chunks from the
    /// backend in a single request, returning a `ChunkDecompressState`
    /// iterator that yields each chunk's uncompressed data.
    ///
    /// `blob_offset`/`blob_size` describe the merged region; `chunks` are the
    /// chunks contained in that region; `prefetch` only affects logging.
    fn read_chunks_from_backend<'a, 'b>(
        &'a self,
        blob_offset: u64,
        blob_size: usize,
        chunks: &'b [Arc<dyn BlobChunkInfo>],
        prefetch: bool,
    ) -> Result<ChunkDecompressState<'a, 'b>>
    where
        Self: Sized,
    {
        let mut c_buf = alloc_buf(blob_size);
        let start = Instant::now();
        let nr_read = self
            .reader()
            .read(c_buf.as_mut_slice(), blob_offset)
            .map_err(|e| eio!(e))?;
        // A short read is an error: the whole merged region is required.
        if nr_read != blob_size {
            return Err(eio!(format!(
                "request for {} bytes but got {} bytes",
                blob_size, nr_read
            )));
        }
        let duration = Instant::now().duration_since(start).as_millis();
        debug!(
            "read_chunks_from_backend: {} {} {} bytes at {}, duration {}ms",
            std::thread::current().name().unwrap_or_default(),
            if prefetch { "prefetch" } else { "fetch" },
            blob_size,
            blob_offset,
            duration
        );

        let chunks = chunks.iter().map(|v| v.as_ref()).collect();
        Ok(ChunkDecompressState::new(blob_offset, self, chunks, c_buf))
    }

    /// Read a single chunk from the backend into `buffer`, decrypting and
    /// decompressing as needed, then validate it.
    ///
    /// Returns the raw (compressed/encrypted) data when it was read into a
    /// separate buffer, so callers may cache it; returns `None` when the
    /// chunk was read directly into `buffer`. Not supported for ZRan/Batch
    /// blobs, whose chunks cannot be decoded independently.
    fn read_chunk_from_backend(
        &self,
        chunk: &dyn BlobChunkInfo,
        buffer: &mut [u8],
    ) -> Result<Option<Vec<u8>>> {
        let start = Instant::now();
        let offset = chunk.compressed_offset();
        let mut c_buf = None;

        if self.is_zran() || self.is_batch() {
            return Err(enosys!("read_chunk_from_backend"));
        } else if !chunk.is_compressed() && !chunk.is_encrypted() {
            // Plain chunk: read straight into the caller's buffer.
            let size = self.reader().read(buffer, offset).map_err(|e| eio!(e))?;
            if size != buffer.len() {
                return Err(eio!("storage backend returns less data than requested"));
            }
        } else {
            // Legacy stargz has no recorded compressed size; estimate it.
            let c_size = if self.is_legacy_stargz() {
                self.get_legacy_stargz_size(offset, buffer.len())?
            } else {
                chunk.compressed_size() as usize
            };
            let mut raw_buffer = alloc_buf(c_size);
            let size = self
                .reader()
                .read(raw_buffer.as_mut_slice(), offset)
                .map_err(|e| eio!(e))?;
            if size != raw_buffer.len() {
                return Err(eio!("storage backend returns less data than requested"));
            }
            // Decrypt first (no-op when the chunk is not encrypted), then
            // decompress into the caller's buffer.
            let decrypted_buffer = crypt::decrypt_with_context(
                &raw_buffer,
                &self.blob_cipher_object(),
                &self.blob_cipher_context(),
                chunk.is_encrypted(),
            )?;
            self.decompress_chunk_data(&decrypted_buffer, buffer, chunk.is_compressed())?;
            c_buf = Some(raw_buffer);
        }

        let duration = Instant::now().duration_since(start).as_millis();
        debug!(
            "read_chunk_from_backend: {} {} bytes at {}, duration {}ms",
            std::thread::current().name().unwrap_or_default(),
            chunk.compressed_size(),
            chunk.compressed_offset(),
            duration
        );
        self.validate_chunk_data(chunk, buffer, false)
            .map_err(|e| {
                warn!("failed to read data from backend, {}", e);
                e
            })?;

        Ok(c_buf)
    }

    /// Decompress `raw_buffer` into `buffer` with the blob's compressor when
    /// `is_compressed`; otherwise copy the data through (skipping the copy
    /// when source and destination already alias the same memory).
    ///
    /// NOTE(review): in the uncompressed path `copy_from_slice` panics if the
    /// two buffers differ in length — callers are expected to size `buffer`
    /// with the chunk's uncompressed size.
    fn decompress_chunk_data(
        &self,
        raw_buffer: &[u8],
        buffer: &mut [u8],
        is_compressed: bool,
    ) -> Result<()> {
        if is_compressed {
            let compressor = self.blob_compressor();
            let ret = compress::decompress(raw_buffer, buffer, compressor).map_err(|e| {
                error!("failed to decompress chunk: {}", e);
                e
            })?;
            // The decompressed size must exactly fill the output buffer.
            if ret != buffer.len() {
                return Err(einval!(format!(
                    "size of decompressed data doesn't match expected, {} vs {}, raw_buffer: {}",
                    ret,
                    buffer.len(),
                    raw_buffer.len()
                )));
            }
        } else if raw_buffer.as_ptr() != buffer.as_ptr() {
            buffer.copy_from_slice(raw_buffer);
        }
        Ok(())
    }

    /// Validate uncompressed chunk data: the buffer length must match the
    /// chunk's uncompressed size, and — unless the blob is legacy stargz —
    /// the digest/CRC is checked when validation is required, the chunk
    /// carries a CRC32, or `force_validation` is set.
    fn validate_chunk_data(
        &self,
        chunk: &dyn BlobChunkInfo,
        buffer: &[u8],
        force_validation: bool,
    ) -> Result<usize> {
        let d_size = chunk.uncompressed_size() as usize;
        if buffer.len() != d_size {
            Err(eio!("uncompressed size and buffer size doesn't match"))
        } else if (self.need_validation() || chunk.has_crc32() || force_validation)
            && !self.is_legacy_stargz()
            && !self.check_digest(chunk, buffer)
        {
            Err(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                "data digest value doesn't match",
            ))
        } else {
            Ok(d_size)
        }
    }

    /// Verify chunk data integrity: CRC32 takes precedence when the chunk
    /// carries one, otherwise the blob's digest algorithm is used.
    fn check_digest(&self, chunk: &dyn BlobChunkInfo, buffer: &[u8]) -> bool {
        if chunk.has_crc32() {
            check_crc(buffer, chunk.crc32())
        } else {
            check_hash(buffer, chunk.chunk_id(), self.blob_digester())
        }
    }

    /// Get the blob's compression context metadata, if available.
    /// Defaults to `None`; required for Batch/ZRan decoding.
    fn get_blob_meta_info(&self) -> Result<Option<Arc<BlobCompressionContextInfo>>> {
        Ok(None)
    }
}
412
/// State machine decoding chunk data read from the backend in one merged
/// request; yields one uncompressed buffer per chunk via its `Iterator` impl.
pub struct ChunkDecompressState<'a, 'b> {
    // Offset into the blob where `c_buf` starts.
    blob_offset: u64,
    // Index of the next chunk in `chunks` to decode.
    chunk_idx: usize,
    // Batch context index currently decoded into `d_buf`; `u32::MAX` = none.
    batch_idx: u32,
    // ZRan context index currently decoded into `d_buf`; `u32::MAX` = none.
    zran_idx: u32,
    // Owning cache, used for cipher/compressor objects and metadata.
    cache: &'a dyn BlobCache,
    // Chunks covered by the merged read, in request order.
    chunks: Vec<&'b dyn BlobChunkInfo>,
    // Raw (possibly compressed/encrypted) data read from the backend.
    c_buf: Vec<u8>,
    // Decompressed data for the current Batch/ZRan context.
    d_buf: Vec<u8>,
}
424
impl<'a, 'b> ChunkDecompressState<'a, 'b> {
    /// Create a decoder over `c_buf`, which holds the merged compressed data
    /// starting at `blob_offset` and covering all of `chunks`.
    fn new(
        blob_offset: u64,
        cache: &'a dyn BlobCache,
        chunks: Vec<&'b dyn BlobChunkInfo>,
        c_buf: Vec<u8>,
    ) -> Self {
        ChunkDecompressState {
            blob_offset,
            chunk_idx: 0,
            // u32::MAX marks "no context decoded yet" for both caches.
            batch_idx: u32::MAX,
            zran_idx: u32::MAX,
            cache,
            chunks,
            c_buf,
            d_buf: Vec::new(),
        }
    }

    /// Decrypt and decompress the whole Batch context `self.batch_idx`
    /// (whose compressed data begins at `c_offset`) into `self.d_buf`,
    /// from which individual chunks are then sliced.
    fn decompress_batch(
        &mut self,
        meta: &Arc<BlobCompressionContextInfo>,
        c_offset: u64,
    ) -> Result<()> {
        let ctx = meta.get_batch_context(self.batch_idx)?;
        let c_size = ctx.compressed_size() as u64;
        let d_size = ctx.uncompressed_batch_size() as u64;
        // The batch's compressed range must lie entirely within `c_buf` and
        // its uncompressed size must be sane.
        if c_offset < self.blob_offset
            || c_offset.checked_add(c_size).is_none()
            || c_offset + c_size > self.blob_offset + self.c_buf.len() as u64
            || d_size > RAFS_MAX_CHUNK_SIZE
        {
            let msg = format!(
                "invalid chunk: z_offset 0x{:x}, z_size 0x{:x}, c_offset 0x{:x}, c_size 0x{:x}, d_size 0x{:x}",
                self.blob_offset,
                self.c_buf.len(),
                c_offset,
                c_size,
                d_size
            );
            return Err(einval!(msg));
        }

        let c_offset = (c_offset - self.blob_offset) as usize;
        let input = &self.c_buf[c_offset..c_offset + c_size as usize];
        let decrypted_buffer = crypt::decrypt_with_context(
            input,
            &self.cache.blob_cipher_object(),
            &self.cache.blob_cipher_context(),
            meta.state.is_encrypted(),
        )?;
        let mut output = alloc_buf(d_size as usize);

        // Equal compressed/uncompressed sizes indicate the batch is stored
        // uncompressed.
        self.cache
            .decompress_chunk_data(&decrypted_buffer, &mut output, c_size != d_size)?;

        if output.len() != d_size as usize {
            return Err(einval!(format!(
                "decompressed data size doesn't match: {} vs {}",
                output.len(),
                d_size
            )));
        }

        self.d_buf = output;

        Ok(())
    }

    /// Decompress the whole ZRan context `self.zran_idx` into `self.d_buf`
    /// using the ZRan decoder and its inflate dictionary.
    fn decompress_zran(&mut self, meta: &Arc<BlobCompressionContextInfo>) -> Result<()> {
        let (ctx, dict) = meta.get_zran_context(self.zran_idx)?;
        let c_offset = ctx.in_offset;
        let c_size = ctx.in_len as u64;
        // The context's compressed range must lie entirely within `c_buf`.
        if c_offset < self.blob_offset
            || c_offset.checked_add(c_size).is_none()
            || c_offset + c_size > self.blob_offset + self.c_buf.len() as u64
            || ctx.out_len as u64 > RAFS_MAX_CHUNK_SIZE
        {
            let msg = format!(
                "invalid chunk: z_offset 0x{:x}, z_size 0x{:x}, c_offset 0x{:x}, c_size 0x{:x}, d_size 0x{:x}",
                self.blob_offset,
                self.c_buf.len(),
                c_offset,
                c_size,
                ctx.out_len
            );
            return Err(einval!(msg));
        }

        let c_offset = (c_offset - self.blob_offset) as usize;
        let input = &self.c_buf[c_offset..c_offset + c_size as usize];
        let mut output = alloc_buf(ctx.out_len as usize);
        let mut decoder = ZranDecoder::new()?;
        decoder.uncompress(&ctx, Some(dict), input, &mut output)?;
        self.d_buf = output;

        Ok(())
    }

    /// Produce the uncompressed data of `chunk` from its Batch context,
    /// decoding the context on demand and caching it across calls.
    /// Falls back to the plain path for non-batch chunks.
    fn next_batch(&mut self, chunk: &dyn BlobChunkInfo) -> Result<Vec<u8>> {
        if !chunk.is_batch() {
            return self.next_buf(chunk);
        }

        let meta = self
            .cache
            .get_blob_meta_info()?
            .ok_or_else(|| einval!("failed to get blob meta object for Batch"))?;

        // Only re-decode when the chunk belongs to a different batch than
        // the one currently cached in `d_buf`.
        let batch_idx = meta.get_batch_index(chunk.id())?;
        if batch_idx != self.batch_idx {
            self.batch_idx = batch_idx;
            self.decompress_batch(&meta, chunk.compressed_offset())?;
        }
        let offset = meta.get_uncompressed_offset_in_batch_buf(chunk.id())? as usize;
        let end = offset + chunk.uncompressed_size() as usize;
        if end > self.d_buf.len() {
            return Err(einval!(format!(
                "invalid Batch decompression status, end: {}, len: {}",
                end,
                self.d_buf.len()
            )));
        }

        let mut buffer = alloc_buf(chunk.uncompressed_size() as usize);
        buffer.copy_from_slice(&self.d_buf[offset as usize..end]);
        Ok(buffer)
    }

    /// Produce the uncompressed data of `chunk` from its ZRan context,
    /// decoding the context on demand and caching it across calls.
    fn next_zran(&mut self, chunk: &dyn BlobChunkInfo) -> Result<Vec<u8>> {
        let meta = self
            .cache
            .get_blob_meta_info()?
            .ok_or_else(|| einval!("failed to get blob meta object for ZRan"))?;
        let zran_idx = meta.get_zran_index(chunk.id())?;
        // Only re-decode when switching to a different ZRan context.
        if zran_idx != self.zran_idx {
            self.zran_idx = zran_idx;
            self.decompress_zran(&meta)?;
        }
        let offset = meta.get_zran_offset(chunk.id())? as usize;
        let end = offset + chunk.uncompressed_size() as usize;
        if end > self.d_buf.len() {
            return Err(einval!("invalid ZRan decompression status"));
        }
        let mut buffer = alloc_buf(chunk.uncompressed_size() as usize);
        buffer.copy_from_slice(&self.d_buf[offset as usize..end]);
        Ok(buffer)
    }

    /// Produce the uncompressed data of a plain (non-Batch, non-ZRan) chunk
    /// by decrypting/decompressing its slice of `c_buf`, then validate it.
    fn next_buf(&mut self, chunk: &dyn BlobChunkInfo) -> Result<Vec<u8>> {
        let c_offset = chunk.compressed_offset();
        let c_size = chunk.compressed_size();
        let d_size = chunk.uncompressed_size() as usize;
        // The chunk's compressed range must lie entirely within `c_buf`.
        if c_offset < self.blob_offset
            || c_offset - self.blob_offset > usize::MAX as u64
            || c_offset.checked_add(c_size as u64).is_none()
            || c_offset + c_size as u64 > self.blob_offset + self.c_buf.len() as u64
            || d_size as u64 > RAFS_MAX_CHUNK_SIZE
        {
            let msg = format!(
                "invalid chunk info: c_offset 0x{:x}, c_size 0x{:x}, d_size 0x{:x}, blob_offset 0x{:x}",
                c_offset, c_size, d_size, self.blob_offset
            );
            return Err(eio!(msg));
        }

        let offset_merged = (c_offset - self.blob_offset) as usize;
        let end_merged = offset_merged + c_size as usize;
        let decrypted_buffer = crypt::decrypt_with_context(
            &self.c_buf[offset_merged..end_merged],
            &self.cache.blob_cipher_object(),
            &self.cache.blob_cipher_context(),
            chunk.is_encrypted(),
        )?;
        let mut buffer = alloc_buf(d_size);
        self.cache
            .decompress_chunk_data(&decrypted_buffer, &mut buffer, chunk.is_compressed())?;
        self.cache
            .validate_chunk_data(chunk, &buffer, false)
            .map_err(|e| {
                warn!("failed to read data from backend, {}", e);
                e
            })?;
        Ok(buffer)
    }

    /// Get the raw compressed data read from the backend.
    pub fn compressed_buf(&self) -> &[u8] {
        &self.c_buf
    }
}
621
622impl Iterator for ChunkDecompressState<'_, '_> {
623 type Item = Result<Vec<u8>>;
624
625 fn next(&mut self) -> Option<Self::Item> {
626 if self.chunk_idx >= self.chunks.len() {
627 return None;
628 }
629
630 let cache = self.cache;
631 let chunk = self.chunks[self.chunk_idx];
632 self.chunk_idx += 1;
633 let res = if cache.is_batch() {
634 self.next_batch(chunk)
635 } else if cache.is_zran() {
636 self.next_zran(chunk)
637 } else {
638 self.next_buf(chunk)
639 };
640 Some(res)
641 }
642}
643
/// Trait representing a manager of cached blob objects.
pub(crate) trait BlobCacheMgr: Send + Sync {
    /// Initialize the cache manager.
    fn init(&self) -> Result<()>;

    /// Tear down the cache manager.
    fn destroy(&self);

    /// Garbage-collect cached blob objects; `_id` optionally names a single
    /// blob. NOTE(review): the meaning of the returned bool is defined by
    /// implementors — confirm against call sites.
    fn gc(&self, _id: Option<&str>) -> bool;

    /// Get the underlying storage backend.
    fn backend(&self) -> &dyn BlobBackend;

    /// Get or create the blob cache object for the blob described by
    /// `blob_info`.
    fn get_blob_cache(&self, blob_info: &Arc<BlobInfo>) -> Result<Arc<dyn BlobCache>>;

    /// Check statistics of the cache manager; exact semantics are defined by
    /// implementors.
    fn check_stat(&self);
}
669
// Re-export the real CAS (content-addressable storage) manager when chunk
// deduplication is enabled.
#[cfg(feature = "dedup")]
pub use dedup::CasMgr;

/// Empty stub so code referring to `CasMgr` still compiles when the
/// `dedup` feature is disabled.
#[cfg(not(feature = "dedup"))]
pub struct CasMgr {}
675
#[cfg(test)]
mod tests {
    use crate::device::{BlobChunkFlags, BlobFeatures};
    use crate::test::MockChunkInfo;

    use super::*;

    // Exercise BlobIoMergeState: seeding, pushing continuous/discontinuous
    // descriptors, issuing, and the merge_and_issue driver.
    #[test]
    fn test_io_merge_state_new() {
        let blob_info = Arc::new(BlobInfo::new(
            1,
            "test1".to_owned(),
            0x200000,
            0x100000,
            0x100000,
            512,
            BlobFeatures::_V5_NO_EXT_BLOB_TABLE,
        ));
        // chunk1: compressed range 0x0..0x800.
        let chunk1 = Arc::new(MockChunkInfo {
            block_id: Default::default(),
            blob_index: 1,
            flags: BlobChunkFlags::empty(),
            compress_size: 0x800,
            uncompress_size: 0x1000,
            compress_offset: 0,
            uncompress_offset: 0,
            file_offset: 0,
            index: 0,
            crc32: 0,
        }) as Arc<dyn BlobChunkInfo>;
        // chunk2: compressed range 0x800..0x1000, immediately follows chunk1.
        let chunk2 = Arc::new(MockChunkInfo {
            block_id: Default::default(),
            blob_index: 1,
            flags: BlobChunkFlags::empty(),
            compress_size: 0x800,
            uncompress_size: 0x1000,
            compress_offset: 0x800,
            uncompress_offset: 0x1000,
            file_offset: 0x1000,
            index: 1,
            crc32: 0,
        }) as Arc<dyn BlobChunkInfo>;
        // chunk3: compressed range 0x1000..0x1800, continuous with chunk2 but
        // leaves a 0x800 gap after chunk1.
        let chunk3 = Arc::new(MockChunkInfo {
            block_id: Default::default(),
            blob_index: 1,
            flags: BlobChunkFlags::empty(),
            compress_size: 0x800,
            uncompress_size: 0x1000,
            compress_offset: 0x1000,
            uncompress_offset: 0x1000,
            file_offset: 0x1000,
            index: 1,
            crc32: 0,
        }) as Arc<dyn BlobChunkInfo>;

        let cb = |_merged| {};
        let desc1 = BlobIoDesc {
            blob: blob_info.clone(),
            chunkinfo: chunk1.into(),
            offset: 0,
            size: 0x1000,
            user_io: true,
        };
        // Seeding the state counts only the first chunk's compressed size.
        let mut state = BlobIoMergeState::new(&desc1, cb);
        assert_eq!(state.size(), 0x800);
        assert_eq!(state.bios.len(), 1);

        let desc2 = BlobIoDesc {
            blob: blob_info.clone(),
            chunkinfo: chunk2.into(),
            offset: 0,
            size: 0x1000,
            user_io: true,
        };
        // chunk2 is adjacent, so size grows by exactly its compressed size.
        state.push(&desc2);
        assert_eq!(state.size, 0x1000);
        assert_eq!(state.bios.len(), 2);

        // Issuing drains the queue and resets the accumulated size.
        state.issue(0);
        assert_eq!(state.size(), 0x0);
        assert_eq!(state.bios.len(), 0);

        let desc3 = BlobIoDesc {
            blob: blob_info,
            chunkinfo: chunk3.into(),
            offset: 0,
            size: 0x1000,
            user_io: true,
        };
        // Pushing into an empty state counts only the chunk's own size.
        state.push(&desc3);
        assert_eq!(state.size, 0x800);
        assert_eq!(state.bios.len(), 1);

        state.issue(0);
        assert_eq!(state.size(), 0x0);
        assert_eq!(state.bios.len(), 0);

        // All three chunks are continuous and fit under 0x4000: one range.
        let mut count = 0;
        BlobIoMergeState::merge_and_issue(
            &[desc1.clone(), desc2.clone(), desc3.clone()],
            0x4000,
            0x0,
            |_v| count += 1,
        );
        assert_eq!(count, 1);

        // The 0x1000 size cap splits the run into two ranges.
        let mut count = 0;
        BlobIoMergeState::merge_and_issue(
            &[desc1.clone(), desc2.clone(), desc3.clone()],
            0x1000,
            0x0,
            |_v| count += 1,
        );
        assert_eq!(count, 2);

        // desc1 and desc3 are not continuous (0x800 gap): two ranges.
        let mut count = 0;
        BlobIoMergeState::merge_and_issue(&[desc1.clone(), desc3.clone()], 0x4000, 0x0, |_v| {
            count += 1
        });
        assert_eq!(count, 2);

        assert!(desc1.is_continuous(&desc2, 0));
        assert!(!desc1.is_continuous(&desc3, 0));
    }
}