1use std::any::Any;
23use std::collections::hash_map::Drain;
24use std::collections::HashMap;
25use std::convert::TryFrom;
26use std::fmt::{Debug, Formatter};
27use std::fs::File;
28use std::io::{self, Error};
29use std::ops::Deref;
30use std::os::unix::io::AsRawFd;
31use std::path::Path;
32use std::sync::{Arc, Mutex};
33
34use arc_swap::ArcSwap;
35use fuse_backend_rs::api::filesystem::ZeroCopyWriter;
36use fuse_backend_rs::file_buf::FileVolatileSlice;
37use fuse_backend_rs::file_traits::FileReadWriteVolatile;
38
39use nydus_api::ConfigV2;
40use nydus_utils::compress;
41use nydus_utils::crypt::{self, Cipher, CipherContext};
42use nydus_utils::digest::{self, RafsDigest};
43
44use crate::cache::BlobCache;
45use crate::factory::BLOB_FACTORY;
46
/// Mask covering the "incompatible" blob feature bits; any set bit in this
/// range that the implementation does not understand must cause rejection
/// (enforced by `TryFrom<u32> for BlobFeatures`).
pub(crate) const BLOB_FEATURE_INCOMPAT_MASK: u32 = 0x0000_ffff;
/// The subset of incompatible feature bits currently understood by this code.
pub(crate) const BLOB_FEATURE_INCOMPAT_VALUE: u32 = 0x0000_0fff;
49
bitflags! {
    /// Feature flags describing the on-disk layout and capabilities of a blob.
    pub struct BlobFeatures: u32 {
        /// Uncompressed chunk data is aligned.
        const ALIGNED = 0x0000_0001;
        /// RAFS filesystem metadata is inlined into the data blob.
        const INLINED_FS_META = 0x0000_0002;
        /// Chunk information is stored in the V2 format.
        const CHUNK_INFO_V2 = 0x0000_0004;
        /// Blob contains ZRan-style random-access data for compressed streams.
        const ZRAN = 0x0000_0008;
        /// Blob data and blob metadata are stored in separate blobs.
        const SEPARATE = 0x0000_0010;
        /// Chunk digest array is inlined into the data blob.
        const INLINED_CHUNK_DIGEST = 0x0000_0020;
        /// Blob is for a RAFS filesystem in TARFS mode.
        const TARFS = 0x0000_0040;
        /// Small chunks are batched into bigger compressed units.
        const BATCH = 0x0000_0080;
        /// Blob data is encrypted.
        const ENCRYPTED = 0x0000_0100;
        /// Blob has a tar header at offset 0 (0x200 bytes, see
        /// `compressed_data_size`).
        const HAS_TAR_HEADER = 0x1000_0000;
        /// Blob has a table-of-contents appended.
        const HAS_TOC = 0x2000_0000;
        /// Blob is capable of tar-header/TOC handling; data may or may not
        /// actually carry them.
        const CAP_TAR_TOC = 0x4000_0000;
        /// Reserved in-memory marker for RAFS v5 blobs without an extended
        /// blob table; never valid on disk (rejected by `TryFrom<u32>`).
        const _V5_NO_EXT_BLOB_TABLE = 0x8000_0000;
        /// Blob was generated as a chunk dictionary.
        const IS_CHUNKDICT_GENERATED = 0x0000_0200;
        /// Blob data referenced by prefetch files is stored separately.
        const IS_SEPARATED_WITH_PREFETCH_FILES = 0x0000_0400;
        /// Blob data is stored externally (outside the blob itself).
        const EXTERNAL = 0x0000_0800;
    }
}
88
89impl Default for BlobFeatures {
90 fn default() -> Self {
91 BlobFeatures::empty()
92 }
93}
94
95impl BlobFeatures {
96 pub fn is_tarfs(&self) -> bool {
98 self.contains(BlobFeatures::CAP_TAR_TOC) && self.contains(BlobFeatures::TARFS)
99 }
100}
101
impl TryFrom<u32> for BlobFeatures {
    type Error = Error;

    /// Validate a raw on-disk feature word and convert it to `BlobFeatures`.
    ///
    /// Rejects the value when it carries an incompatible feature bit this
    /// implementation does not understand, or when the in-memory-only
    /// `_V5_NO_EXT_BLOB_TABLE` marker is present.
    fn try_from(value: u32) -> Result<Self, Self::Error> {
        if value & BLOB_FEATURE_INCOMPAT_MASK & !BLOB_FEATURE_INCOMPAT_VALUE != 0
            || value & BlobFeatures::_V5_NO_EXT_BLOB_TABLE.bits() != 0
        {
            Err(einval!(format!("invalid blob features: 0x{:x}", value)))
        } else {
            // Safety: `from_bits_unchecked` preserves all bits of `value`,
            // including compatible bits not named above; the incompatible
            // range has just been validated against the supported set.
            Ok(unsafe { BlobFeatures::from_bits_unchecked(value) })
        }
    }
}
116
/// Configuration and layout information for a data blob.
#[derive(Clone, Debug, Default)]
pub struct BlobInfo {
    // Index of the blob as referenced by chunk descriptors.
    blob_index: u32,
    // Identifier of the blob; trailing NULs are trimmed on construction.
    blob_id: String,
    // Feature bits describing the blob layout, see `BlobFeatures`.
    blob_features: BlobFeatures,
    // Size of the blob in compressed form, in bytes.
    compressed_size: u64,
    // Size of the blob in uncompressed form, in bytes.
    uncompressed_size: u64,
    // Chunk size used when building the blob.
    chunk_size: u32,
    // Number of chunks in the blob; 0 marks a V5 blob without an extended
    // blob table (see `compute_features`).
    chunk_count: u32,
    // Compression algorithm applied to chunk data.
    compressor: compress::Algorithm,
    // Encryption algorithm applied to blob data.
    cipher: crypt::Algorithm,
    // Message digest algorithm used for chunk digests.
    digester: digest::Algorithm,
    // Offset of the data to prefetch (stored truncated to u32).
    prefetch_offset: u32,
    // Size of the data to prefetch (stored truncated to u32).
    prefetch_size: u32,
    // Set when the blob is a legacy estargz blob (GZip without CHUNK_INFO_V2).
    is_legacy_stargz: bool,

    // Compression algorithm (as raw u32) of the chunk-information array.
    meta_ci_compressor: u32,
    // Offset of the chunk-information array inside the blob.
    meta_ci_offset: u64,
    // Compressed size of the chunk-information array.
    meta_ci_compressed_size: u64,
    // Uncompressed size of the chunk-information array.
    meta_ci_uncompressed_size: u64,

    // Digest of the blob table of contents.
    blob_toc_digest: [u8; 32],
    // Digest of the blob metadata; also used as the meta blob id when
    // metadata is not inlined (see `get_blob_meta_id`).
    blob_meta_digest: [u8; 32],
    // Size of the blob metadata.
    blob_meta_size: u64,
    // Size of the blob table of contents.
    blob_toc_size: u32,

    // Backing file for fscache, Linux only.
    fs_cache_file: Option<Arc<File>>,
    // Blob id derived from the metadata file path; overrides `blob_id` in
    // some layouts (see `blob_id()`).
    meta_path: Arc<Mutex<String>>,
    // Cipher object used to encrypt/decrypt blob data.
    cipher_object: Arc<Cipher>,
    // Optional cipher context (keys/IVs) for the cipher object.
    cipher_ctx: Option<CipherContext>,

    // Whether the blob was generated as a chunk dictionary.
    is_chunkdict_generated: bool,
}
185
impl BlobInfo {
    /// Create a new instance of `BlobInfo`.
    ///
    /// Trailing NUL bytes are trimmed from `blob_id`, and derived feature
    /// state is computed via `compute_features()`.
    pub fn new(
        blob_index: u32,
        blob_id: String,
        uncompressed_size: u64,
        compressed_size: u64,
        chunk_size: u32,
        chunk_count: u32,
        blob_features: BlobFeatures,
    ) -> Self {
        let blob_id = blob_id.trim_end_matches('\0').to_string();
        let mut blob_info = BlobInfo {
            blob_index,
            blob_id,
            blob_features,
            uncompressed_size,
            compressed_size,
            chunk_size,
            chunk_count,

            compressor: compress::Algorithm::None,
            cipher: crypt::Algorithm::None,
            digester: digest::Algorithm::Blake3,
            prefetch_offset: 0,
            prefetch_size: 0,
            is_legacy_stargz: false,
            meta_ci_compressor: 0,
            meta_ci_offset: 0,
            meta_ci_compressed_size: 0,
            meta_ci_uncompressed_size: 0,

            blob_toc_digest: [0u8; 32],
            blob_meta_digest: [0u8; 32],
            blob_meta_size: 0,
            blob_toc_size: 0,

            fs_cache_file: None,
            meta_path: Arc::new(Mutex::new(String::new())),
            cipher_object: Default::default(),
            cipher_ctx: None,

            is_chunkdict_generated: false,
        };

        blob_info.compute_features();

        blob_info
    }

    /// Set the number of chunks in the blob (truncates to u32).
    pub fn set_chunk_count(&mut self, count: usize) {
        self.chunk_count = count as u32;
    }

    /// Set the compressed size of the blob.
    pub fn set_compressed_size(&mut self, size: usize) {
        self.compressed_size = size as u64;
    }

    /// Set the uncompressed size of the blob.
    pub fn set_uncompressed_size(&mut self, size: usize) {
        self.uncompressed_size = size as u64;
    }

    /// Set the compressed size of the chunk-information array.
    pub fn set_meta_ci_compressed_size(&mut self, size: usize) {
        self.meta_ci_compressed_size = size as u64;
    }

    /// Set the uncompressed size of the chunk-information array.
    pub fn set_meta_ci_uncompressed_size(&mut self, size: usize) {
        self.meta_ci_uncompressed_size = size as u64;
    }

    /// Set the offset of the chunk-information array inside the blob.
    pub fn set_meta_ci_offset(&mut self, size: usize) {
        self.meta_ci_offset = size as u64;
    }

    /// Mark/unmark the blob as generated by a chunk dictionary.
    pub fn set_chunkdict_generated(&mut self, is_chunkdict_generated: bool) {
        self.is_chunkdict_generated = is_chunkdict_generated;
    }

    /// Whether the blob was generated by a chunk dictionary.
    pub fn is_chunkdict_generated(&self) -> bool {
        self.is_chunkdict_generated
    }

    /// Get the blob index.
    pub fn blob_index(&self) -> u32 {
        self.blob_index
    }

    /// Get the effective blob id.
    ///
    /// For non-external blobs with inlined filesystem metadata (and no
    /// separate data blob), or for blobs without TOC capability, the id
    /// derived from the metadata file path takes precedence when set;
    /// otherwise the stored `blob_id` is returned.
    pub fn blob_id(&self) -> String {
        if (!self.has_feature(BlobFeatures::EXTERNAL)
            && self.has_feature(BlobFeatures::INLINED_FS_META)
            && !self.has_feature(BlobFeatures::SEPARATE))
            || !self.has_feature(BlobFeatures::CAP_TAR_TOC)
        {
            let guard = self.meta_path.lock().unwrap();
            if !guard.is_empty() {
                return guard.deref().clone();
            }
        }
        self.blob_id.clone()
    }

    /// Overwrite the stored blob id.
    pub fn set_blob_id(&mut self, blob_id: String) {
        self.blob_id = blob_id
    }

    /// Get the raw stored blob id, ignoring any metadata-path override.
    pub fn raw_blob_id(&self) -> &str {
        &self.blob_id
    }

    /// Get the size of the compressed chunk-data portion of the blob,
    /// excluding any tar header (0x200 bytes) and trailing metadata/TOC.
    pub fn compressed_data_size(&self) -> u64 {
        if self.has_feature(BlobFeatures::SEPARATE) {
            // Data and metadata live in separate blobs; the whole blob is data.
            self.compressed_size
        } else if self.has_feature(BlobFeatures::CAP_TAR_TOC) {
            if self.meta_ci_is_valid() {
                // Chunk data ends where the chunk-information array begins.
                if self.has_feature(BlobFeatures::HAS_TAR_HEADER) {
                    self.meta_ci_offset - 0x200
                } else {
                    self.meta_ci_offset
                }
            } else {
                if self.has_feature(BlobFeatures::HAS_TAR_HEADER) {
                    self.compressed_size - 0x200
                } else {
                    self.compressed_size
                }
            }
        } else {
            self.compressed_size
        }
    }

    /// Get the size of the whole blob in compressed form.
    pub fn compressed_size(&self) -> u64 {
        self.compressed_size
    }

    /// Get the size of the blob in uncompressed form.
    pub fn uncompressed_size(&self) -> u64 {
        self.uncompressed_size
    }

    /// Get the chunk size used when building the blob.
    pub fn chunk_size(&self) -> u32 {
        self.chunk_size
    }

    /// Get the number of chunks in the blob.
    pub fn chunk_count(&self) -> u32 {
        self.chunk_count
    }

    /// Get the compression algorithm used for chunk data.
    pub fn compressor(&self) -> compress::Algorithm {
        self.compressor
    }

    /// Set the compression algorithm; recomputes derived feature state
    /// (GZip without CHUNK_INFO_V2 marks the blob as legacy stargz).
    pub fn set_compressor(&mut self, compressor: compress::Algorithm) {
        self.compressor = compressor;
        self.compute_features();
    }

    /// Get the encryption algorithm for blob data.
    pub fn cipher(&self) -> crypt::Algorithm {
        self.cipher
    }

    /// Set the encryption algorithm for blob data.
    pub fn set_cipher(&mut self, cipher: crypt::Algorithm) {
        self.cipher = cipher;
    }

    /// Get the shared cipher object used to encrypt/decrypt blob data.
    pub fn cipher_object(&self) -> Arc<Cipher> {
        self.cipher_object.clone()
    }

    /// Get the cipher context (if any) associated with the blob.
    pub fn cipher_context(&self) -> Option<CipherContext> {
        self.cipher_ctx.clone()
    }

    /// Set algorithm, cipher object and context in one call.
    pub fn set_cipher_info(
        &mut self,
        cipher: crypt::Algorithm,
        cipher_object: Arc<Cipher>,
        cipher_ctx: Option<CipherContext>,
    ) {
        self.cipher = cipher;
        self.cipher_object = cipher_object;
        self.cipher_ctx = cipher_ctx;
    }

    /// Get the message digest algorithm for chunk digests.
    pub fn digester(&self) -> digest::Algorithm {
        self.digester
    }

    /// Set the message digest algorithm for chunk digests.
    pub fn set_digester(&mut self, digester: digest::Algorithm) {
        self.digester = digester;
    }

    /// Get the offset of data to prefetch.
    pub fn prefetch_offset(&self) -> u64 {
        self.prefetch_offset as u64
    }

    /// Get the size of data to prefetch.
    pub fn prefetch_size(&self) -> u64 {
        self.prefetch_size as u64
    }

    /// Set prefetch offset and size.
    ///
    /// NOTE(review): both values are truncated to u32 internally; values
    /// above 4GiB silently lose their high bits — confirm callers stay
    /// within range.
    pub fn set_prefetch_info(&mut self, offset: u64, size: u64) {
        self.prefetch_offset = offset as u32;
        self.prefetch_size = size as u32;
    }

    /// Whether the blob is a legacy estargz blob.
    pub fn is_legacy_stargz(&self) -> bool {
        self.is_legacy_stargz
    }

    /// Set layout information of the chunk-information array.
    pub fn set_blob_meta_info(
        &mut self,
        offset: u64,
        compressed_size: u64,
        uncompressed_size: u64,
        compressor: u32,
    ) {
        self.meta_ci_compressor = compressor;
        self.meta_ci_offset = offset;
        self.meta_ci_compressed_size = compressed_size;
        self.meta_ci_uncompressed_size = uncompressed_size;
    }

    /// Get the compression algorithm of the chunk-information array.
    ///
    /// Only Lz4Block, GZip and Zstd are recognized; any other raw value
    /// maps to `None`.
    pub fn meta_ci_compressor(&self) -> compress::Algorithm {
        if self.meta_ci_compressor == compress::Algorithm::Lz4Block as u32 {
            compress::Algorithm::Lz4Block
        } else if self.meta_ci_compressor == compress::Algorithm::GZip as u32 {
            compress::Algorithm::GZip
        } else if self.meta_ci_compressor == compress::Algorithm::Zstd as u32 {
            compress::Algorithm::Zstd
        } else {
            compress::Algorithm::None
        }
    }

    /// Get the offset of the chunk-information array inside the blob.
    pub fn meta_ci_offset(&self) -> u64 {
        self.meta_ci_offset
    }

    /// Get the compressed size of the chunk-information array.
    pub fn meta_ci_compressed_size(&self) -> u64 {
        self.meta_ci_compressed_size
    }

    /// Get the uncompressed size of the chunk-information array.
    pub fn meta_ci_uncompressed_size(&self) -> u64 {
        self.meta_ci_uncompressed_size
    }

    /// Whether chunk-information layout has been set (both sizes non-zero).
    pub fn meta_ci_is_valid(&self) -> bool {
        self.meta_ci_compressed_size != 0 && self.meta_ci_uncompressed_size != 0
    }

    /// Attach/detach the fscache backing file.
    pub fn set_fscache_file(&mut self, file: Option<Arc<File>>) {
        self.fs_cache_file = file;
    }

    /// Get the fscache backing file, Linux only.
    #[cfg(target_os = "linux")]
    pub(crate) fn get_fscache_file(&self) -> Option<Arc<File>> {
        self.fs_cache_file.clone()
    }

    /// Get the feature bits of the blob.
    pub fn features(&self) -> BlobFeatures {
        self.blob_features
    }

    /// Check whether ALL bits of `features` are set on the blob.
    pub fn has_feature(&self, features: BlobFeatures) -> bool {
        self.blob_features.bits() & features.bits() == features.bits()
    }

    /// Whether blob data is stored externally.
    pub fn is_external(&self) -> bool {
        self.has_feature(BlobFeatures::EXTERNAL)
    }

    // Derive in-memory feature state from the current fields:
    // - zero chunk count marks a V5 blob without an extended blob table;
    // - GZip compression without V2 chunk info marks legacy stargz.
    fn compute_features(&mut self) {
        if self.chunk_count == 0 {
            self.blob_features |= BlobFeatures::_V5_NO_EXT_BLOB_TABLE;
        }
        if self.compressor == compress::Algorithm::GZip
            && !self.has_feature(BlobFeatures::CHUNK_INFO_V2)
        {
            self.is_legacy_stargz = true;
        }
    }

    /// Mark the blob as a dedicated prefetch blob when `is_prefetchblob` is
    /// true; a false value leaves the features untouched.
    pub fn set_separated_with_prefetch_files_feature(&mut self, is_prefetchblob: bool) {
        if is_prefetchblob {
            self.blob_features |= BlobFeatures::IS_SEPARATED_WITH_PREFETCH_FILES;
        }
    }

    /// Get the digest of the blob table of contents.
    pub fn blob_toc_digest(&self) -> &[u8; 32] {
        &self.blob_toc_digest
    }

    /// Set the digest of the blob table of contents.
    pub fn set_blob_toc_digest(&mut self, digest: [u8; 32]) {
        self.blob_toc_digest = digest;
    }

    /// Get the size of the blob table of contents.
    pub fn blob_toc_size(&self) -> u32 {
        self.blob_toc_size
    }

    /// Set the size of the blob table of contents.
    pub fn set_blob_toc_size(&mut self, sz: u32) {
        self.blob_toc_size = sz;
    }

    /// Get the digest of the blob metadata.
    pub fn blob_meta_digest(&self) -> &[u8; 32] {
        &self.blob_meta_digest
    }

    /// Set the digest of the blob metadata.
    pub fn set_blob_meta_digest(&mut self, digest: [u8; 32]) {
        self.blob_meta_digest = digest;
    }

    /// Get the size of the blob metadata.
    pub fn blob_meta_size(&self) -> u64 {
        self.blob_meta_size
    }

    /// Set the size of the blob metadata.
    pub fn set_blob_meta_size(&mut self, size: u64) {
        self.blob_meta_size = size;
    }

    /// Derive the override blob id from a metadata file path and store it
    /// (interior mutability: takes `&self`, updates the shared `meta_path`).
    pub fn set_blob_id_from_meta_path(&self, path: &Path) -> Result<(), Error> {
        *self.meta_path.lock().unwrap() = Self::get_blob_id_from_meta_path(path)?;
        Ok(())
    }

    /// Extract a blob id from a metadata file path by repeatedly stripping
    /// file extensions from the file name until it no longer changes.
    pub fn get_blob_id_from_meta_path(path: &Path) -> Result<String, Error> {
        let mut id = path.file_name().ok_or_else(|| {
            einval!(format!(
                "failed to get blob id from meta file path {}",
                path.display()
            ))
        })?;
        loop {
            let id1 = Path::new(id).file_stem().ok_or_else(|| {
                einval!(format!(
                    "failed to get blob id from meta file path {}",
                    path.display()
                ))
            })?;
            if id1.is_empty() {
                return Err(einval!(format!(
                    "failed to get blob id from meta file path {}",
                    path.display()
                )));
            } else if id == id1 {
                // Fixed point reached: no more extensions to strip.
                break;
            } else {
                id = id1;
            }
        }
        let id = id.to_str().ok_or_else(|| {
            einval!(format!(
                "failed to get blob id from meta file path {}",
                path.display()
            ))
        })?;

        Ok(id.to_string())
    }

    /// Get the id of the associated metadata blob.
    ///
    /// Only valid for blobs with the `SEPARATE` feature. With inlined
    /// filesystem metadata the id comes from the metadata file path;
    /// otherwise it is the hex encoding of `blob_meta_digest`.
    pub fn get_blob_meta_id(&self) -> Result<String, Error> {
        assert!(self.has_feature(BlobFeatures::SEPARATE));
        let id = if self.has_feature(BlobFeatures::INLINED_FS_META) {
            let guard = self.meta_path.lock().unwrap();
            if guard.is_empty() {
                return Err(einval!("failed to get blob id from meta file name"));
            }
            guard.deref().clone()
        } else {
            hex::encode(self.blob_meta_digest)
        };
        Ok(id)
    }

    /// Get cipher algorithm, object and context as a tuple.
    pub fn get_cipher_info(&self) -> (crypt::Algorithm, Arc<Cipher>, Option<CipherContext>) {
        (
            self.cipher,
            self.cipher_object.clone(),
            self.cipher_ctx.clone(),
        )
    }
}
638
bitflags! {
    /// Flags carried by each chunk of blob data.
    pub struct BlobChunkFlags: u32 {
        /// Chunk data is compressed.
        const COMPRESSED = 0x0000_0001;
        /// Reserved: chunk is a hole, with all data as zero.
        const _HOLECHUNK = 0x0000_0002;
        /// Chunk data is encrypted.
        const ENCRYPTED = 0x0000_0004;
        /// Chunk is batched into a bigger compressed unit.
        const BATCH = 0x0000_0008;
        /// Chunk carries a CRC32 checksum.
        const HAS_CRC32 = 0x0000_0010;
    }
}
654
655impl Default for BlobChunkFlags {
656 fn default() -> Self {
657 BlobChunkFlags::empty()
658 }
659}
660
/// Trait to provide basic information about a chunk of blob data.
pub trait BlobChunkInfo: Any + Sync + Send {
    /// Get the message digest of the chunk.
    fn chunk_id(&self) -> &RafsDigest;

    /// Get a unique id for the chunk.
    fn id(&self) -> u32;

    /// Get the index of the blob containing the chunk.
    fn blob_index(&self) -> u32;

    /// Get the offset of the chunk in the blob's compressed data.
    fn compressed_offset(&self) -> u64;

    /// Get the size of the chunk's compressed data.
    fn compressed_size(&self) -> u32;

    /// Get the end offset of the chunk in the blob's compressed data.
    fn compressed_end(&self) -> u64 {
        self.compressed_offset() + self.compressed_size() as u64
    }

    /// Get the offset of the chunk in the blob's uncompressed data.
    fn uncompressed_offset(&self) -> u64;

    /// Get the size of the chunk's uncompressed data.
    fn uncompressed_size(&self) -> u32;

    /// Get the end offset of the chunk in the blob's uncompressed data.
    fn uncompressed_end(&self) -> u64 {
        self.uncompressed_offset() + self.uncompressed_size() as u64
    }

    /// Whether the chunk is batched with others into a compressed unit.
    fn is_batch(&self) -> bool;

    /// Whether the chunk data is compressed.
    fn is_compressed(&self) -> bool;

    /// Whether the chunk data is encrypted.
    fn is_encrypted(&self) -> bool;

    /// Whether the chunk carries a CRC32 checksum.
    fn has_crc32(&self) -> bool;

    /// Get the chunk's CRC32 checksum value.
    fn crc32(&self) -> u32;

    /// Upcast to `&dyn Any` for dynamic downcasting.
    fn as_any(&self) -> &dyn Any;
}
727
/// A cheaply-cloneable wrapper around a shared `BlobChunkInfo` object,
/// used to associate chunk information with a blob IO operation.
#[derive(Clone)]
pub struct BlobIoChunk(Arc<dyn BlobChunkInfo>);
734
735impl From<Arc<dyn BlobChunkInfo>> for BlobIoChunk {
736 fn from(v: Arc<dyn BlobChunkInfo>) -> Self {
737 BlobIoChunk(v)
738 }
739}
740
741impl BlobChunkInfo for BlobIoChunk {
742 fn chunk_id(&self) -> &RafsDigest {
743 self.0.chunk_id()
744 }
745
746 fn id(&self) -> u32 {
747 self.0.id()
748 }
749
750 fn blob_index(&self) -> u32 {
751 self.0.blob_index()
752 }
753
754 fn compressed_offset(&self) -> u64 {
755 self.0.compressed_offset()
756 }
757
758 fn compressed_size(&self) -> u32 {
759 self.0.compressed_size()
760 }
761
762 fn uncompressed_offset(&self) -> u64 {
763 self.0.uncompressed_offset()
764 }
765
766 fn uncompressed_size(&self) -> u32 {
767 self.0.uncompressed_size()
768 }
769
770 fn is_batch(&self) -> bool {
771 self.0.is_batch()
772 }
773
774 fn is_compressed(&self) -> bool {
775 self.0.is_compressed()
776 }
777
778 fn is_encrypted(&self) -> bool {
779 self.0.is_encrypted()
780 }
781
782 fn has_crc32(&self) -> bool {
783 self.0.has_crc32()
784 }
785
786 fn crc32(&self) -> u32 {
787 self.0.crc32()
788 }
789
790 fn as_any(&self) -> &dyn Any {
791 self
792 }
793}
794
/// A descriptor for a single blob IO operation against one chunk.
#[derive(Clone)]
pub struct BlobIoDesc {
    /// The blob associated with the IO operation.
    pub blob: Arc<BlobInfo>,
    /// The chunk associated with the IO operation.
    pub chunkinfo: BlobIoChunk,
    /// Offset of the requested data within the chunk.
    pub offset: u32,
    /// Size of the requested data within the chunk.
    pub size: u32,
    /// Whether the IO was requested directly by a user (vs internal, e.g.
    /// readahead).
    pub(crate) user_io: bool,
}
812
813impl BlobIoDesc {
814 pub fn new(
816 blob: Arc<BlobInfo>,
817 chunkinfo: BlobIoChunk,
818 offset: u32,
819 size: u32,
820 user_io: bool,
821 ) -> Self {
822 BlobIoDesc {
823 blob,
824 chunkinfo,
825 offset,
826 size,
827 user_io,
828 }
829 }
830
831 pub fn is_continuous(&self, next: &BlobIoDesc, max_gap: u64) -> bool {
833 let prev_end = self.chunkinfo.compressed_offset() + self.chunkinfo.compressed_size() as u64;
834 let next_offset = next.chunkinfo.compressed_offset();
835
836 if self.chunkinfo.is_batch() || next.chunkinfo.is_batch() {
837 return next.chunkinfo.uncompressed_offset() - self.chunkinfo.uncompressed_end()
839 <= max_gap;
840 }
841
842 if self.chunkinfo.blob_index() == next.chunkinfo.blob_index() && next_offset >= prev_end {
843 if next.blob.is_legacy_stargz() {
844 next_offset - prev_end <= max_gap * 8
845 } else {
846 next_offset - prev_end <= max_gap
847 }
848 } else {
849 false
850 }
851 }
852}
853
impl Debug for BlobIoDesc {
    // Compact diagnostic view: blob/chunk identity plus request window.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        f.debug_struct("BlobIoDesc")
            .field("blob_index", &self.blob.blob_index)
            .field("chunk_index", &self.chunkinfo.id())
            .field("compressed_offset", &self.chunkinfo.compressed_offset())
            .field("file_offset", &self.offset)
            .field("size", &self.size)
            .field("user", &self.user_io)
            .finish()
    }
}
866
/// An ordered list of blob IO descriptors that all target the same blob.
pub struct BlobIoVec {
    // The blob every descriptor in `bi_vec` must belong to.
    bi_blob: Arc<BlobInfo>,
    // Total size in bytes of all queued descriptors.
    bi_size: u64,
    // The queued IO descriptors.
    pub(crate) bi_vec: Vec<BlobIoDesc>,
}
876
877impl BlobIoVec {
878 pub fn new(bi_blob: Arc<BlobInfo>) -> Self {
880 BlobIoVec {
881 bi_blob,
882 bi_size: 0,
883 bi_vec: Vec::with_capacity(128),
884 }
885 }
886
887 pub fn push(&mut self, desc: BlobIoDesc) {
889 assert_eq!(self.bi_blob.blob_index(), desc.blob.blob_index());
890 assert_eq!(self.bi_blob.blob_id(), desc.blob.blob_id());
891 assert!(self.bi_size.checked_add(desc.size as u64).is_some());
892 self.bi_size += desc.size as u64;
893 self.bi_vec.push(desc);
894 }
895
896 pub fn append(&mut self, mut vec: BlobIoVec) {
898 assert_eq!(self.bi_blob.blob_id(), vec.bi_blob.blob_id());
899 assert!(self.bi_size.checked_add(vec.bi_size).is_some());
900 self.bi_vec.append(vec.bi_vec.as_mut());
901 self.bi_size += vec.bi_size;
902 }
903
904 pub fn reset(&mut self) {
906 self.bi_size = 0;
907 self.bi_vec.truncate(0);
908 }
909
910 pub fn len(&self) -> usize {
912 self.bi_vec.len()
913 }
914
915 pub fn is_empty(&self) -> bool {
917 self.bi_vec.is_empty()
918 }
919
920 pub fn size(&self) -> u64 {
922 self.bi_size
923 }
924
925 pub fn blob_io_desc(&self, index: usize) -> Option<&BlobIoDesc> {
927 if index < self.bi_vec.len() {
928 Some(&self.bi_vec[index])
929 } else {
930 None
931 }
932 }
933
934 pub fn blob_index(&self) -> u32 {
936 self.bi_blob.blob_index()
937 }
938
939 pub fn is_target_blob(&self, blob_index: u32) -> bool {
941 self.bi_blob.blob_index() == blob_index
942 }
943
944 pub fn has_same_blob(&self, desc: &BlobIoVec) -> bool {
946 self.bi_blob.blob_index() == desc.bi_blob.blob_index()
947 }
948}
949
950impl Debug for BlobIoVec {
951 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
952 f.debug_struct("BlobIoDesc")
953 .field("blob_index", &self.bi_blob.blob_index)
954 .field("size", &self.bi_size)
955 .field("decriptors", &self.bi_vec)
956 .finish()
957 }
958}
959
/// Helper to merge blob IO vectors, grouping them per blob id.
#[derive(Default)]
pub struct BlobIoMerge {
    // Pending IO vectors keyed by blob id.
    map: HashMap<String, BlobIoVec>,
    // Blob id of the most recently appended vector.
    current: String,
}
966
967impl BlobIoMerge {
968 pub fn append(&mut self, desc: BlobIoVec) {
970 if !desc.is_empty() {
971 let id = desc.bi_blob.blob_id.as_str();
972 if self.current != id {
973 self.current = id.to_string();
974 }
975 if let Some(prev) = self.map.get_mut(id) {
976 prev.append(desc);
977 } else {
978 self.map.insert(id.to_string(), desc);
979 }
980 }
981 }
982
983 pub fn drain(&mut self) -> Drain<'_, String, BlobIoVec> {
985 self.map.drain()
986 }
987
988 pub fn get_current_element(&mut self) -> Option<&mut BlobIoVec> {
990 self.map.get_mut(&self.current)
991 }
992}
993
/// A window (offset, length) of user-requested data within a chunk.
#[derive(Clone, Debug, Default)]
pub(crate) struct BlobIoSegment {
    /// Start of the window within the chunk.
    pub offset: u32,
    /// Length of the window.
    pub len: u32,
}

impl BlobIoSegment {
    /// Create a segment covering `len` bytes starting at `offset`.
    pub fn new(offset: u32, len: u32) -> Self {
        BlobIoSegment { offset, len }
    }

    /// Extend the segment by `len` bytes; `offset` must be 0 (the extension
    /// is required to be contiguous with the existing window).
    #[inline]
    pub fn append(&mut self, offset: u32, len: u32) {
        assert!(offset.checked_add(len).is_some());
        assert_eq!(offset, 0);

        self.len += len;
    }

    /// True for the all-zero (default) segment.
    pub fn is_empty(&self) -> bool {
        (self.offset, self.len) == (0, 0)
    }
}
1024
1025#[derive(Clone, Debug)]
1027pub(crate) enum BlobIoTag {
1028 User(BlobIoSegment),
1030 Internal,
1032}
1033
1034impl BlobIoTag {
1035 pub fn is_user_io(&self) -> bool {
1037 matches!(self, BlobIoTag::User(_))
1038 }
1039}
1040
/// A contiguous range of compressed blob data to read, together with the
/// chunks it covers and how each chunk's IO originated.
#[derive(Default, Clone)]
pub struct BlobIoRange {
    // The blob the range belongs to.
    pub(crate) blob_info: Arc<BlobInfo>,
    // Start offset of the range in the blob's compressed data.
    pub(crate) blob_offset: u64,
    // Length of the range, including inter-chunk gaps merged in.
    pub(crate) blob_size: u64,
    // Chunks covered by the range, in offset order.
    pub(crate) chunks: Vec<Arc<dyn BlobChunkInfo>>,
    // One tag per chunk in `chunks`, recording user vs internal IO.
    pub(crate) tags: Vec<BlobIoTag>,
}
1057
impl Debug for BlobIoRange {
    // Chunk objects are omitted from the debug output; only the range
    // geometry and per-chunk tags are printed.
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
        f.debug_struct("BlobIoRange")
            .field("blob_id", &self.blob_info.blob_id())
            .field("blob_offset", &self.blob_offset)
            .field("blob_size", &self.blob_size)
            .field("tags", &self.tags)
            .finish()
    }
}
1068
impl BlobIoRange {
    /// Create a range seeded with the chunk of `bio`, reserving space for
    /// `capacity` chunks.
    pub fn new(bio: &BlobIoDesc, capacity: usize) -> Self {
        let blob_size = bio.chunkinfo.compressed_size() as u64;
        let blob_offset = bio.chunkinfo.compressed_offset();
        assert!(blob_offset.checked_add(blob_size).is_some());

        let mut chunks = Vec::with_capacity(capacity);
        let mut tags = Vec::with_capacity(capacity);
        tags.push(Self::tag_from_desc(bio));
        chunks.push(bio.chunkinfo.0.clone());

        BlobIoRange {
            blob_info: bio.blob.clone(),
            blob_offset,
            blob_size,
            chunks,
            tags,
        }
    }

    /// Merge the chunk of `bio` into the range, extending `blob_size` to
    /// cover both the chunk and any gap between the current range end and
    /// the chunk's offset. The chunk must start at or after the current end
    /// (asserted); `_max_gap` is unused here — the caller is expected to
    /// have already decided mergeability.
    pub fn merge(&mut self, bio: &BlobIoDesc, _max_gap: u64) {
        let end = self.blob_offset + self.blob_size;
        let offset = bio.chunkinfo.compressed_offset();
        let size = bio.chunkinfo.compressed_size() as u64;
        let size = if end == offset {
            // Perfectly contiguous: extend by the chunk size only.
            assert!(offset.checked_add(size).is_some());
            size
        } else {
            // Gap before the chunk: extend by gap + chunk size.
            assert!(offset > end);
            size + (offset - end)
        };
        assert!(end.checked_add(size).is_some());

        self.blob_size += size;
        self.tags.push(Self::tag_from_desc(bio));
        self.chunks.push(bio.chunkinfo.0.clone());
    }

    // Build the per-chunk tag: user IOs carry their requested window.
    fn tag_from_desc(bio: &BlobIoDesc) -> BlobIoTag {
        if bio.user_io {
            BlobIoTag::User(BlobIoSegment::new(bio.offset, bio.size as u32))
        } else {
            BlobIoTag::Internal
        }
    }
}
1117
/// A request to prefetch a region of a blob.
pub struct BlobPrefetchRequest {
    /// Id of the blob to prefetch from.
    pub blob_id: String,
    /// Offset of the region within the blob.
    pub offset: u64,
    /// Length of the region to prefetch.
    pub len: u64,
}
1132
/// Trait for blob objects that expose their data through a raw file
/// descriptor and support range-based fetching.
pub trait BlobObject: AsRawFd {
    /// Base offset of the blob data within the underlying file.
    fn base_offset(&self) -> u64;

    /// Whether all data of the blob object is ready locally.
    fn is_all_data_ready(&self) -> bool;

    /// Fetch a range of data in compressed-offset coordinates; `prefetch`
    /// marks the request as prefetch rather than demand IO.
    fn fetch_range_compressed(&self, offset: u64, size: u64, prefetch: bool) -> io::Result<()>;

    /// Fetch a range of data in uncompressed-offset coordinates.
    fn fetch_range_uncompressed(&self, offset: u64, size: u64) -> io::Result<()>;

    /// Prefetch all chunks covered by `range`.
    fn prefetch_chunks(&self, range: &BlobIoRange) -> io::Result<()>;
}
1164
/// A logical device composed of all data blobs of a filesystem instance;
/// cheap to clone, and the blob set can be swapped atomically via `update`.
#[derive(Clone, Default)]
pub struct BlobDevice {
    // Atomically swappable array of blob cache objects, indexed by blob index.
    blobs: Arc<ArcSwap<Vec<Arc<dyn BlobCache>>>>,
    // Number of blobs; fixed at construction and checked by `update`.
    blob_count: usize,
}
1174
impl BlobDevice {
    /// Create a `BlobDevice` by instantiating a blob cache object for each
    /// entry of `blob_infos` through the global blob factory.
    pub fn new(config: &Arc<ConfigV2>, blob_infos: &[Arc<BlobInfo>]) -> io::Result<BlobDevice> {
        let mut blobs = Vec::with_capacity(blob_infos.len());
        for blob_info in blob_infos.iter() {
            let blob = BLOB_FACTORY.new_blob_cache(config, blob_info)?;
            blobs.push(blob);
        }

        Ok(BlobDevice {
            blobs: Arc::new(ArcSwap::new(Arc::new(blobs))),
            blob_count: blob_infos.len(),
        })
    }

    /// Replace the underlying blob cache objects with ones built from the
    /// new `config`/`blob_infos`. The blob count must stay the same. When
    /// `fs_prefetch` is set, prefetching is stopped before the swap and
    /// restarted afterwards.
    pub fn update(
        &self,
        config: &Arc<ConfigV2>,
        blob_infos: &[Arc<BlobInfo>],
        fs_prefetch: bool,
    ) -> io::Result<()> {
        if self.blobs.load().len() != blob_infos.len() {
            return Err(einval!(
                "number of blobs doesn't match when update 'BlobDevice' object"
            ));
        }

        let mut blobs = Vec::with_capacity(blob_infos.len());
        for blob_info in blob_infos.iter() {
            let blob = BLOB_FACTORY.new_blob_cache(config, blob_info)?;
            blobs.push(blob);
        }

        if fs_prefetch {
            self.stop_prefetch();
        }
        self.blobs.store(Arc::new(blobs));
        if fs_prefetch {
            self.start_prefetch();
        }

        Ok(())
    }

    /// Close the device. Currently a no-op; cache objects are dropped with
    /// the device.
    pub fn close(&self) -> io::Result<()> {
        Ok(())
    }

    /// Whether the device manages at least one blob.
    pub fn has_device(&self) -> bool {
        self.blob_count > 0
    }

    /// Read the data described by `desc` into `w`. An empty descriptor list
    /// with non-zero size is rejected, as is an out-of-range blob index.
    pub fn read_to(&self, w: &mut dyn ZeroCopyWriter, desc: &mut BlobIoVec) -> io::Result<usize> {
        if desc.bi_vec.is_empty() {
            if desc.bi_size == 0 {
                Ok(0)
            } else {
                Err(einval!("BlobIoVec size doesn't match."))
            }
        } else if desc.blob_index() as usize >= self.blob_count {
            Err(einval!("BlobIoVec has out of range blob_index."))
        } else {
            let size = desc.bi_size;
            let mut f = BlobDeviceIoVec::new(self, desc);
            // The writer pulls data from `f`, which dispatches to the
            // matching blob cache object.
            w.write_from(&mut f, size as usize, 0)
        }
    }

    /// Issue prefetch requests (by blob id) and IO-vector prefetches (by
    /// blob index). All failures are logged and otherwise ignored —
    /// prefetching is best effort.
    pub fn prefetch(
        &self,
        io_vecs: &[&BlobIoVec],
        prefetches: &[BlobPrefetchRequest],
    ) -> io::Result<()> {
        for idx in 0..prefetches.len() {
            if let Some(blob) = self.get_blob_by_id(&prefetches[idx].blob_id) {
                let _ = blob.prefetch(blob.clone(), &prefetches[idx..idx + 1], &[]);
            }
        }

        for io_vec in io_vecs.iter() {
            if let Some(blob) = self.get_blob_by_iovec(io_vec) {
                let _ = blob
                    .prefetch(blob.clone(), &[], &io_vec.bi_vec)
                    .map_err(|e| {
                        error!("failed to prefetch blob data, {}", e);
                    });
            }
        }

        Ok(())
    }

    /// Start background prefetching on every blob (best effort).
    pub fn start_prefetch(&self) {
        for blob in self.blobs.load().iter() {
            let _ = blob.start_prefetch();
        }
    }

    /// Stop background prefetching on every blob (best effort).
    pub fn stop_prefetch(&self) {
        for blob in self.blobs.load().iter() {
            let _ = blob.stop_prefetch();
        }
    }

    /// Synchronously fetch the requested uncompressed ranges. Fails when a
    /// target blob cache does not expose a `BlobObject`.
    pub fn fetch_range_synchronous(&self, prefetches: &[BlobPrefetchRequest]) -> io::Result<()> {
        for req in prefetches {
            if req.len == 0 {
                continue;
            }
            if let Some(cache) = self.get_blob_by_id(&req.blob_id) {
                trace!(
                    "fetch blob {} offset {} size {}",
                    req.blob_id,
                    req.offset,
                    req.len
                );
                if let Some(obj) = cache.get_blob_object() {
                    // Note: `offset`/`len` are already u64; the casts are
                    // no-ops kept from the original code.
                    obj.fetch_range_uncompressed(req.offset as u64, req.len as u64)
                        .map_err(|e| {
                            warn!(
                                "Failed to prefetch data from blob {}, offset {}, size {}, {}",
                                cache.blob_id(),
                                req.offset,
                                req.len,
                                e
                            );
                            e
                        })?;
                } else {
                    error!("No support for fetching uncompressed blob data");
                    return Err(einval!("No support for fetching uncompressed blob data"));
                }
            }
        }

        Ok(())
    }

    /// Whether all chunks referenced by `io_vecs` are already present in
    /// their blob caches. Unknown blobs count as not ready.
    pub fn all_chunks_ready(&self, io_vecs: &[BlobIoVec]) -> bool {
        for io_vec in io_vecs.iter() {
            if let Some(blob) = self.get_blob_by_iovec(io_vec) {
                let chunk_map = blob.get_chunk_map();
                for desc in io_vec.bi_vec.iter() {
                    if !chunk_map.is_ready(&desc.chunkinfo).unwrap_or(false) {
                        return false;
                    }
                }
            } else {
                return false;
            }
        }

        true
    }

    /// Build a `BlobIoChunk` for `(blob_index, chunk_index)`, if both exist.
    pub fn create_io_chunk(&self, blob_index: u32, chunk_index: u32) -> Option<BlobIoChunk> {
        if (blob_index as usize) < self.blob_count {
            let state = self.blobs.load();
            let blob = &state[blob_index as usize];
            blob.get_chunk_info(chunk_index).map(|v| v.into())
        } else {
            None
        }
    }

    /// Look up chunk information for `(blob_index, chunk_index)`.
    pub fn get_chunk_info(
        &self,
        blob_index: u32,
        chunk_index: u32,
    ) -> Option<Arc<dyn BlobChunkInfo>> {
        if (blob_index as usize) < self.blob_count {
            let state = self.blobs.load();
            let blob = &state[blob_index as usize];
            blob.get_chunk_info(chunk_index)
        } else {
            None
        }
    }

    // Resolve the blob cache object targeted by an IO vector, by blob index.
    fn get_blob_by_iovec(&self, iovec: &BlobIoVec) -> Option<Arc<dyn BlobCache>> {
        let blob_index = iovec.blob_index();
        if (blob_index as usize) < self.blob_count {
            return Some(self.blobs.load()[blob_index as usize].clone());
        }

        None
    }

    // Resolve a blob cache object by blob id, via linear scan.
    fn get_blob_by_id(&self, blob_id: &str) -> Option<Arc<dyn BlobCache>> {
        for blob in self.blobs.load().iter() {
            if blob.blob_id() == blob_id {
                return Some(blob.clone());
            }
        }

        None
    }
}
1397
/// Adapter that lets a `ZeroCopyWriter` pull the data described by a
/// `BlobIoVec` out of the owning `BlobDevice`.
struct BlobDeviceIoVec<'a> {
    // Device providing access to the blob cache objects.
    dev: &'a BlobDevice,
    // The IO vector to serve reads from.
    iovec: &'a mut BlobIoVec,
}
1406
1407impl<'a> BlobDeviceIoVec<'a> {
1408 fn new(dev: &'a BlobDevice, iovec: &'a mut BlobIoVec) -> Self {
1409 BlobDeviceIoVec { dev, iovec }
1410 }
1411}
1412
// Only positioned reads are supported: the adapter is read-only and has no
// notion of a current file position, so the stream/write entry points are
// intentionally unimplemented.
impl FileReadWriteVolatile for BlobDeviceIoVec<'_> {
    fn read_volatile(&mut self, _slice: FileVolatileSlice) -> Result<usize, Error> {
        unimplemented!();
    }

    fn write_volatile(&mut self, _slice: FileVolatileSlice) -> Result<usize, Error> {
        unimplemented!();
    }

    fn read_at_volatile(&mut self, slice: FileVolatileSlice, offset: u64) -> Result<usize, Error> {
        // Delegate the single-buffer case to the vectored path.
        let buffers = [slice];
        self.read_vectored_at_volatile(&buffers, offset)
    }

    // The `offset` parameter is ignored: the data location is fully
    // described by the attached `BlobIoVec`.
    fn read_vectored_at_volatile(
        &mut self,
        buffers: &[FileVolatileSlice],
        _offset: u64,
    ) -> Result<usize, Error> {
        let index = self.iovec.blob_index();
        let blobs = &self.dev.blobs.load();

        if (index as usize) < blobs.len() {
            blobs[index as usize].read(self.iovec, buffers)
        } else {
            let msg = format!(
                "failed to get blob object for BlobIoVec, index {}, blob array len: {}",
                index,
                blobs.len()
            );
            Err(einval!(msg))
        }
    }

    fn write_at_volatile(
        &mut self,
        _slice: FileVolatileSlice,
        _offset: u64,
    ) -> Result<usize, Error> {
        unimplemented!()
    }
}
1459
/// RAFS v5 compatibility layer.
pub mod v5 {
    use super::*;

    /// Chunk information as exposed by RAFS v5 metadata, extending the
    /// generic `BlobChunkInfo` with v5-specific fields.
    pub trait BlobV5ChunkInfo: BlobChunkInfo {
        /// Get the chunk index in the RAFS v5 chunk info array.
        fn index(&self) -> u32;

        /// Get the file offset of the chunk within its file.
        fn file_offset(&self) -> u64;

        /// Get the chunk flags.
        fn flags(&self) -> BlobChunkFlags;

        /// Upcast to the base `BlobChunkInfo` trait object.
        fn as_base(&self) -> &dyn BlobChunkInfo;
    }
}
1491
1492#[cfg(test)]
1493mod tests {
1494 use std::path::PathBuf;
1495
1496 use super::*;
1497 use crate::test::MockChunkInfo;
1498
    #[test]
    fn test_blob_io_chunk() {
        // BlobIoChunk must transparently forward every accessor to the
        // wrapped BlobChunkInfo object.
        let chunk: Arc<dyn BlobChunkInfo> = Arc::new(MockChunkInfo {
            block_id: Default::default(),
            blob_index: 0,
            flags: Default::default(),
            compress_size: 0x100,
            uncompress_size: 0x200,
            compress_offset: 0x1000,
            uncompress_offset: 0x2000,
            file_offset: 0,
            index: 3,
            crc32: 0,
        });
        let iochunk: BlobIoChunk = chunk.clone().into();

        assert_eq!(iochunk.id(), 3);
        assert_eq!(iochunk.compressed_offset(), 0x1000);
        assert_eq!(iochunk.compressed_size(), 0x100);
        assert_eq!(iochunk.uncompressed_offset(), 0x2000);
        assert_eq!(iochunk.uncompressed_size(), 0x200);
        assert!(!iochunk.is_compressed());
    }
1522
    #[test]
    fn test_chunk_is_continuous() {
        // Three chunks in the same blob: chunk1 [0, 0x800) and chunk2
        // [0x800, 0x1000) are adjacent; chunk3 starts at 0x1800, leaving a
        // 0x800/0x1000 gap after chunk2/chunk1 respectively. Continuity must
        // hold exactly when the gap fits within max_gap, and must be
        // directional (next before prev is never continuous).
        let blob_info = Arc::new(BlobInfo::new(
            1,
            "test1".to_owned(),
            0x200000,
            0x100000,
            0x100000,
            512,
            BlobFeatures::_V5_NO_EXT_BLOB_TABLE,
        ));
        let chunk1 = Arc::new(MockChunkInfo {
            block_id: Default::default(),
            blob_index: 1,
            flags: BlobChunkFlags::empty(),
            compress_size: 0x800,
            uncompress_size: 0x1000,
            compress_offset: 0,
            uncompress_offset: 0,
            file_offset: 0,
            index: 0,
            crc32: 0,
        }) as Arc<dyn BlobChunkInfo>;
        let chunk2 = Arc::new(MockChunkInfo {
            block_id: Default::default(),
            blob_index: 1,
            flags: BlobChunkFlags::empty(),
            compress_size: 0x800,
            uncompress_size: 0x1000,
            compress_offset: 0x800,
            uncompress_offset: 0x1000,
            file_offset: 0x1000,
            index: 1,
            crc32: 0,
        }) as Arc<dyn BlobChunkInfo>;
        let chunk3 = Arc::new(MockChunkInfo {
            block_id: Default::default(),
            blob_index: 1,
            flags: BlobChunkFlags::empty(),
            compress_size: 0x800,
            uncompress_size: 0x1000,
            compress_offset: 0x1800,
            uncompress_offset: 0x3000,
            file_offset: 0x3000,
            index: 1,
            crc32: 0,
        }) as Arc<dyn BlobChunkInfo>;

        let desc1 = BlobIoDesc {
            blob: blob_info.clone(),
            chunkinfo: chunk1.into(),
            offset: 0,
            size: 0x1000,
            user_io: true,
        };
        let desc2 = BlobIoDesc {
            blob: blob_info.clone(),
            chunkinfo: chunk2.into(),
            offset: 0,
            size: 0x1000,
            user_io: true,
        };
        let desc3 = BlobIoDesc {
            blob: blob_info,
            chunkinfo: chunk3.into(),
            offset: 0,
            size: 0x1000,
            user_io: true,
        };

        assert!(desc1.is_continuous(&desc2, 0x0));
        assert!(desc1.is_continuous(&desc2, 0x1000));
        assert!(!desc2.is_continuous(&desc1, 0x1000));
        assert!(!desc2.is_continuous(&desc1, 0x0));

        assert!(!desc1.is_continuous(&desc3, 0x0));
        assert!(!desc1.is_continuous(&desc3, 0x400));
        assert!(!desc1.is_continuous(&desc3, 0x800));
        assert!(desc1.is_continuous(&desc3, 0x1000));

        assert!(!desc2.is_continuous(&desc3, 0x0));
        assert!(!desc2.is_continuous(&desc3, 0x400));
        assert!(desc2.is_continuous(&desc3, 0x800));
        assert!(desc2.is_continuous(&desc3, 0x1000));
    }
1608
1609 #[test]
1610 fn test_append_same_blob_with_diff_index() {
1611 let blob1 = Arc::new(BlobInfo::new(
1612 1,
1613 "test1".to_owned(),
1614 0x200000,
1615 0x100000,
1616 0x100000,
1617 512,
1618 BlobFeatures::_V5_NO_EXT_BLOB_TABLE,
1619 ));
1620 let chunk1 = Arc::new(MockChunkInfo {
1621 block_id: Default::default(),
1622 blob_index: 1,
1623 flags: BlobChunkFlags::empty(),
1624 compress_size: 0x800,
1625 uncompress_size: 0x1000,
1626 compress_offset: 0,
1627 uncompress_offset: 0,
1628 file_offset: 0,
1629 index: 0,
1630 crc32: 0,
1631 }) as Arc<dyn BlobChunkInfo>;
1632 let mut iovec = BlobIoVec::new(blob1.clone());
1633 iovec.push(BlobIoDesc::new(blob1, BlobIoChunk(chunk1), 0, 0x1000, true));
1634
1635 let blob2 = Arc::new(BlobInfo::new(
1636 2, "test1".to_owned(), 0x200000,
1639 0x100000,
1640 0x100000,
1641 512,
1642 BlobFeatures::_V5_NO_EXT_BLOB_TABLE,
1643 ));
1644 let chunk2 = Arc::new(MockChunkInfo {
1645 block_id: Default::default(),
1646 blob_index: 2,
1647 flags: BlobChunkFlags::empty(),
1648 compress_size: 0x800,
1649 uncompress_size: 0x1000,
1650 compress_offset: 0x800,
1651 uncompress_offset: 0x1000,
1652 file_offset: 0x1000,
1653 index: 1,
1654 crc32: 0,
1655 }) as Arc<dyn BlobChunkInfo>;
1656 let mut iovec2 = BlobIoVec::new(blob2.clone());
1657 iovec2.push(BlobIoDesc::new(blob2, BlobIoChunk(chunk2), 0, 0x1000, true));
1658
1659 iovec.append(iovec2);
1660 assert_eq!(0x2000, iovec.bi_size);
1661 }
1662
1663 #[test]
1664 fn test_extend_large_blob_io_vec() {
1665 let size = 0x2_0000_0000; let chunk_size = 0x10_0000; let chunk_count = (size / chunk_size as u64) as u32;
1668 let large_blob = Arc::new(BlobInfo::new(
1669 0,
1670 "blob_id".to_owned(),
1671 size,
1672 size,
1673 chunk_size,
1674 chunk_count,
1675 BlobFeatures::default(),
1676 ));
1677
1678 let mut iovec = BlobIoVec::new(large_blob.clone());
1679 let mut iovec2 = BlobIoVec::new(large_blob.clone());
1680
1681 for chunk_idx in 0..chunk_count {
1683 let chunk = Arc::new(MockChunkInfo {
1684 block_id: Default::default(),
1685 blob_index: large_blob.blob_index,
1686 flags: BlobChunkFlags::empty(),
1687 compress_size: chunk_size,
1688 compress_offset: chunk_idx as u64 * chunk_size as u64,
1689 uncompress_size: 2 * chunk_size,
1690 uncompress_offset: 2 * chunk_idx as u64 * chunk_size as u64,
1691 file_offset: 2 * chunk_idx as u64 * chunk_size as u64,
1692 index: chunk_idx as u32,
1693 crc32: 0,
1694 }) as Arc<dyn BlobChunkInfo>;
1695 let desc = BlobIoDesc::new(large_blob.clone(), BlobIoChunk(chunk), 0, chunk_size, true);
1696 if chunk_idx < chunk_count / 2 {
1697 iovec.push(desc);
1698 } else {
1699 iovec2.push(desc)
1700 }
1701 }
1702
1703 iovec.append(iovec2);
1705
1706 assert_eq!(size, iovec.size());
1707 assert_eq!(chunk_count, iovec.len() as u32);
1708 }
1709
1710 #[test]
1711 fn test_blob_info_blob_meta_id() {
1712 let blob_info = BlobInfo::new(
1713 1,
1714 "blob_id".to_owned(),
1715 0,
1716 0,
1717 0,
1718 1,
1719 BlobFeatures::SEPARATE | BlobFeatures::INLINED_FS_META,
1720 );
1721
1722 let root_dir = &std::env::var("CARGO_MANIFEST_DIR").expect("$CARGO_MANIFEST_DIR");
1723 let mut source_path = PathBuf::from(root_dir);
1724 source_path.push("../tests/texture/blobs/be7d77eeb719f70884758d1aa800ed0fb09d701aaec469964e9d54325f0d5fef");
1725
1726 assert!(blob_info
1727 .set_blob_id_from_meta_path(source_path.as_path())
1728 .is_ok());
1729
1730 let id = blob_info.get_blob_meta_id();
1731 assert!(id.is_ok());
1732 assert_eq!(
1733 id.unwrap(),
1734 "be7d77eeb719f70884758d1aa800ed0fb09d701aaec469964e9d54325f0d5fef".to_owned()
1735 );
1736 }
1737}