nydus-rafs 0.1.0

The RAFS filesystem format for Nydus Image Service
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
// Copyright 2020 Ant Group. All rights reserved.
// Copyright (C) 2020 Alibaba Cloud. All rights reserved.
//
// SPDX-License-Identifier: Apache-2.0

//! Structs and Traits for RAFS file system meta data management.

use std::any::Any;
use std::collections::HashSet;
use std::ffi::{OsStr, OsString};
use std::fmt::{Debug, Display, Formatter, Result as FmtResult};
use std::fs::OpenOptions;
use std::io::{Error, Result};
use std::os::unix::ffi::OsStrExt;
use std::path::{Component, Path, PathBuf};
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;

use anyhow::bail;

use fuse_backend_rs::abi::fuse_abi::Attr;
use fuse_backend_rs::api::filesystem::{Entry, ROOT_ID};
use nydus_utils::compress;
use nydus_utils::digest::{self, RafsDigest};
use serde::Serialize;
use serde_with::{serde_as, DisplayFromStr};
use storage::device::{BlobChunkInfo, BlobInfo, BlobIoVec};

use self::layout::{XattrName, XattrValue, RAFS_SUPER_VERSION_V5, RAFS_SUPER_VERSION_V6};
use self::noop::NoopSuperBlock;
use crate::fs::{RafsConfig, RAFS_DEFAULT_ATTR_TIMEOUT, RAFS_DEFAULT_ENTRY_TIMEOUT};
use crate::{RafsError, RafsIoReader, RafsIoWrite, RafsResult};

pub mod cached_v5;
pub mod direct_v5;
pub mod direct_v6;
pub mod layout;
mod md_v5;
mod md_v6;
mod noop;

pub use storage::{RAFS_DEFAULT_CHUNK_SIZE, RAFS_MAX_CHUNK_SIZE};

/// Maximum size of blob id string.
pub const RAFS_BLOB_ID_MAX_LENGTH: usize = 64;
/// Block size reported to fuse by get_attr().
pub const RAFS_ATTR_BLOCK_SIZE: u32 = 4096;
/// Maximum size of file name supported by rafs.
pub const RAFS_MAX_NAME: usize = 255;
/// Maximum size of the rafs metadata blob (`0x8000_0000`, i.e. 2^31).
pub const RAFS_MAX_METADATA_SIZE: usize = 0x8000_0000;
/// File name for Unix current directory.
pub const DOT: &str = ".";
/// File name for Unix parent directory.
pub const DOTDOT: &str = "..";

/// Type of RAFS inode number.
///
/// All inode numbers handled by the traits in this module are plain 64-bit unsigned integers.
pub type Inode = u64;

/// Trait to get information about inodes supported by the filesystem instance.
pub trait RafsSuperInodes {
    /// Get the maximum inode number supported by the filesystem instance.
    fn get_max_ino(&self) -> Inode;

    /// Get a `RafsInode` trait object for an inode, validating the inode content if requested.
    fn get_inode(&self, ino: Inode, digest_validate: bool) -> Result<Arc<dyn RafsInode>>;

    /// Validate the content of inode itself, optionally recursively validate into children.
    fn validate_digest(
        &self,
        inode: Arc<dyn RafsInode>,
        recursive: bool,
        digester: digest::Algorithm,
    ) -> Result<bool>;
}

/// Trait to access Rafs filesystem superblock and inodes.
pub trait RafsSuperBlock: RafsSuperInodes + Send + Sync {
    /// Load the super block from a reader.
    fn load(&mut self, r: &mut RafsIoReader) -> Result<()>;

    /// Update Rafs filesystem metadata and storage backend.
    fn update(&self, r: &mut RafsIoReader) -> RafsResult<()>;

    /// Destroy a Rafs filesystem super block.
    fn destroy(&mut self);

    /// Get all blob information objects used by the filesystem.
    fn get_blob_infos(&self) -> Vec<Arc<BlobInfo>>;

    fn root_ino(&self) -> u64;

    /// Get a chunk info.
    fn get_chunk_info(&self, _idx: usize) -> Result<Arc<dyn BlobChunkInfo>> {
        unimplemented!()
    }
}

/// Value returned by a `ChildInodeHandler` after it has processed one directory entry.
///
/// NOTE(review): presumably tells `RafsInode::walk_children_inodes()` whether to keep visiting
/// entries or to stop early; confirm against the implementations.
pub enum PostWalkAction {
    /// Keep walking the remaining entries.
    Continue,
    /// Stop walking.
    Break,
}

/// Callback type passed to `RafsInode::walk_children_inodes()`.
///
/// The callback receives an optional child inode object, an `OsString` and two `u64` values,
/// and returns the `PostWalkAction` to take next.
///
/// NOTE(review): the `OsString` is presumably the entry name and the `u64` values the entry's
/// inode number and offset; confirm against the callers.
pub type ChildInodeHandler<'a> =
    &'a mut dyn FnMut(Option<Arc<dyn RafsInode>>, OsString, u64, u64) -> Result<PostWalkAction>;

/// Trait to access metadata and data for an inode.
///
/// The RAFS filesystem is a readonly filesystem, so does its inodes. The `RafsInode` trait acts
/// as field accessors for those readonly inodes, to hide implementation details.
pub trait RafsInode: Any {
    /// Validate the node for data integrity.
    ///
    /// The inode object may be transmuted from a raw buffer, read from an external file, so the
    /// caller must validate it before accessing any fields.
    fn validate(&self, max_inode: Inode, chunk_size: u64) -> Result<()>;

    /// Get `Entry` of the inode.
    fn get_entry(&self) -> Entry;

    /// Get `Attr` of the inode.
    fn get_attr(&self) -> Attr;

    /// Get file name size of the inode.
    fn get_name_size(&self) -> u16;

    /// Get symlink target of the inode if it's a symlink.
    fn get_symlink(&self) -> Result<OsString>;

    /// Get size of symlink.
    fn get_symlink_size(&self) -> u16;

    /// Get child inode of a directory by name.
    fn get_child_by_name(&self, name: &OsStr) -> Result<Arc<dyn RafsInode>>;

    /// Walk the children of a directory, invoking `handler` for the visited entries.
    ///
    /// NOTE(review): `entry_offset` presumably selects the entry at which the walk starts;
    /// confirm against the implementations.
    fn walk_children_inodes(&self, entry_offset: u64, handler: ChildInodeHandler) -> Result<()>;

    /// Get child inode of a directory by child index, child index starting at 0.
    fn get_child_by_index(&self, idx: u32) -> Result<Arc<dyn RafsInode>>;

    /// Get number of directory's child inode.
    fn get_child_count(&self) -> u32;

    /// Get the index into the inode table of the directory's first child.
    fn get_child_index(&self) -> Result<u32>;

    /// Get number of data chunk of a normal file.
    fn get_chunk_count(&self) -> u32;

    /// Get chunk info object for a chunk.
    fn get_chunk_info(&self, idx: u32) -> Result<Arc<dyn BlobChunkInfo>>;

    /// Check whether the inode has extended attributes.
    fn has_xattr(&self) -> bool;

    /// Get the value of xattr with key `name`.
    fn get_xattr(&self, name: &OsStr) -> Result<Option<XattrValue>>;

    /// Get all xattr keys.
    fn get_xattrs(&self) -> Result<Vec<XattrName>>;

    /// Check whether the inode is a directory.
    fn is_dir(&self) -> bool;

    /// Check whether the inode is a symlink.
    fn is_symlink(&self) -> bool;

    /// Check whether the inode is a regular file.
    fn is_reg(&self) -> bool;

    /// Check whether the inode is a hardlink.
    fn is_hardlink(&self) -> bool;

    /// Get the inode number of the inode.
    fn ino(&self) -> u64;

    /// Get file name of the inode.
    fn name(&self) -> OsString;

    /// Get inode number of the parent directory.
    fn parent(&self) -> u64;

    /// Get real device number of the inode.
    fn rdev(&self) -> u32;

    /// Get flags of the inode.
    fn flags(&self) -> u64;

    /// Get project id associated with the inode.
    fn projid(&self) -> u32;

    /// Get data size of the inode.
    fn size(&self) -> u64;

    /// Check whether the inode has no content.
    ///
    /// The default implementation reports `true` exactly when `size()` is zero.
    fn is_empty_size(&self) -> bool {
        self.size() == 0
    }

    /// Get digest value of the inode metadata.
    fn get_digest(&self) -> RafsDigest;

    /// Collect all descendants of the inode for image building.
    fn collect_descendants_inodes(
        &self,
        descendants: &mut Vec<Arc<dyn RafsInode>>,
    ) -> Result<usize>;

    /// Allocate blob io vectors to read file data in range [offset, offset + size).
    fn alloc_bio_vecs(&self, offset: u64, size: usize, user_io: bool) -> Result<Vec<BlobIoVec>>;

    /// Get the inode as a `&dyn Any`, so callers can downcast to the concrete inode type.
    fn as_any(&self) -> &dyn Any;

    /// Invoke `cb` for every data chunk of the inode, in chunk index order.
    ///
    /// The default implementation iterates over `0..get_chunk_count()` and stops at the first
    /// error, which may come either from `get_chunk_info()` or from the callback itself.
    fn walk_chunks(
        &self,
        cb: &mut dyn FnMut(&dyn BlobChunkInfo) -> anyhow::Result<()>,
    ) -> anyhow::Result<()> {
        let chunk_count = self.get_chunk_count();
        for i in 0..chunk_count {
            cb(self.get_chunk_info(i)?.as_ref())?;
        }
        Ok(())
    }
}

/// Trait to store Rafs meta block and validate alignment.
pub trait RafsStore {
    /// Write the Rafs filesystem metadata to a writer.
    ///
    /// NOTE(review): the returned `usize` is presumably the number of bytes written; confirm
    /// against the implementations.
    fn store(&self, w: &mut dyn RafsIoWrite) -> Result<usize>;
}

bitflags! {
    /// Rafs filesystem feature flags.
    ///
    /// `RafsSuperMeta::get_compressor()` and `RafsSuperMeta::get_digester()` convert these
    /// flags into the corresponding compression and digest algorithms.
    #[derive(Serialize)]
    pub struct RafsSuperFlags: u64 {
        /// V5: Data chunks are not compressed.
        const COMPRESS_NONE = 0x0000_0001;
        /// V5: Data chunks are compressed with lz4_block.
        const COMPRESS_LZ4_BLOCK = 0x0000_0002;
        /// V5: Use blake3 hash algorithm to calculate digest.
        const DIGESTER_BLAKE3 = 0x0000_0004;
        /// V5: Use sha256 hash algorithm to calculate digest.
        const DIGESTER_SHA256 = 0x0000_0008;
        /// Inode has explicit uid gid fields.
        ///
        /// If unset, use nydusd process euid/egid for all inodes at runtime.
        const EXPLICIT_UID_GID = 0x0000_0010;
        /// Inode has extended attributes.
        const HAS_XATTR = 0x0000_0020;
        /// V5: Data chunks are compressed with gzip.
        const COMPRESS_GZIP = 0x0000_0040;
        /// V5: Data chunks are compressed with zstd.
        const COMPRESS_ZSTD = 0x0000_0080;
    }
}

impl Default for RafsSuperFlags {
    fn default() -> Self {
        RafsSuperFlags::empty()
    }
}

impl Display for RafsSuperFlags {
    /// Render the flags using their `Debug` representation.
    fn fmt(&self, f: &mut Formatter) -> FmtResult {
        f.write_fmt(format_args!("{:?}", self))
    }
}

/// Rafs filesystem meta-data cached from on disk RAFS super block.
#[serde_as]
#[derive(Clone, Copy, Debug, Serialize)]
pub struct RafsSuperMeta {
    /// Filesystem magic number.
    pub magic: u32,
    /// Filesystem version number.
    pub version: u32,
    /// Size of on disk super block.
    pub sb_size: u32,
    /// Inode number of root inode.
    pub root_inode: Inode,
    /// Chunk size.
    pub chunk_size: u32,
    /// Number of inodes in the filesystem.
    pub inodes_count: u64,
    #[serde_as(as = "DisplayFromStr")]
    /// V5: superblock flags for Rafs v5.
    ///
    /// Serialized through the `Display` implementation of `RafsSuperFlags`.
    pub flags: RafsSuperFlags,
    /// Number of inode entries in inode offset table.
    pub inode_table_entries: u32,
    /// Offset of the inode offset table into the metadata blob.
    pub inode_table_offset: u64,
    /// Size of blob information table.
    pub blob_table_size: u32,
    /// Offset of the blob information table into the metadata blob.
    pub blob_table_offset: u64,
    /// Offset of the extended blob information table into the metadata blob.
    pub extended_blob_table_offset: u64,
    /// Number of entries in the extended blob information table.
    pub extended_blob_table_entries: u32,
    /// Start of data prefetch range.
    pub blob_readahead_offset: u32,
    /// Size of data prefetch range.
    pub blob_readahead_size: u32,
    /// Offset of the inode prefetch table into the metadata blob.
    pub prefetch_table_offset: u64,
    /// Number of entries in the inode prefetch table.
    pub prefetch_table_entries: u32,
    /// Default attribute timeout value.
    pub attr_timeout: Duration,
    /// Default inode timeout value.
    pub entry_timeout: Duration,
    /// NOTE(review): presumably the V6 block address of the metadata area; confirm against the
    /// v6 loader.
    pub meta_blkaddr: u32,
    /// NOTE(review): presumably the V6 node id of the root inode; confirm against the v6 loader.
    pub root_nid: u16,
    /// Whether the metadata was loaded as a chunk dictionary; set by
    /// `RafsSuper::load_chunk_dict_from_metadata()`.
    pub is_chunk_dict: bool,
    /// Offset of the chunk table
    pub chunk_table_offset: u64,
    /// Size  of the chunk table
    pub chunk_table_size: u64,
}

impl RafsSuperMeta {
    /// Check whether the superblock is for Rafs v4/v5 filesystems.
    pub fn is_v5(&self) -> bool {
        let version = self.version;
        version == RAFS_SUPER_VERSION_V5
    }

    /// Check whether the superblock is for Rafs v6 filesystems.
    pub fn is_v6(&self) -> bool {
        let version = self.version;
        version == RAFS_SUPER_VERSION_V6
    }

    /// Check whether the metadata has been marked as a chunk dictionary.
    pub fn is_chunk_dict(&self) -> bool {
        self.is_chunk_dict
    }

    /// Check whether the explicit UID/GID feature has been enabled or not.
    pub fn explicit_uidgid(&self) -> bool {
        let wanted = RafsSuperFlags::EXPLICIT_UID_GID;
        self.flags.contains(wanted)
    }

    /// Check whether the filesystem supports extended attribute or not.
    pub fn has_xattr(&self) -> bool {
        let wanted = RafsSuperFlags::HAS_XATTR;
        self.flags.contains(wanted)
    }

    /// Get compression algorithm to handle chunk data for the filesystem.
    ///
    /// For v5 and v6 superblocks the algorithm is derived from `flags`; for any other version
    /// `compress::Algorithm::None` is returned.
    pub fn get_compressor(&self) -> compress::Algorithm {
        if !self.is_supported_version() {
            return compress::Algorithm::None;
        }
        self.flags.into()
    }

    /// V5: get message digest algorithm to validate chunk data for the filesystem.
    ///
    /// For v5 and v6 superblocks the algorithm is derived from `flags`; for any other version
    /// `digest::Algorithm::Blake3` is returned.
    pub fn get_digester(&self) -> digest::Algorithm {
        if !self.is_supported_version() {
            return digest::Algorithm::Blake3;
        }
        self.flags.into()
    }

    /// Whether the version is one whose flags carry compressor/digester information (v5 or v6).
    fn is_supported_version(&self) -> bool {
        self.is_v5() || self.is_v6()
    }
}

impl Default for RafsSuperMeta {
    /// Build an all-zero superblock description with no flags set.
    ///
    /// Only the two timeouts are non-zero: they take the crate-wide default attribute and entry
    /// timeout values.
    fn default() -> Self {
        let attr_timeout = Duration::from_secs(RAFS_DEFAULT_ATTR_TIMEOUT);
        let entry_timeout = Duration::from_secs(RAFS_DEFAULT_ENTRY_TIMEOUT);

        Self {
            // Basic superblock identification.
            magic: 0,
            version: 0,
            sb_size: 0,
            root_inode: 0,
            chunk_size: 0,
            inodes_count: 0,
            flags: RafsSuperFlags::empty(),
            // Inode and blob tables.
            inode_table_entries: 0,
            inode_table_offset: 0,
            blob_table_size: 0,
            blob_table_offset: 0,
            extended_blob_table_offset: 0,
            extended_blob_table_entries: 0,
            // Prefetch configuration.
            blob_readahead_offset: 0,
            blob_readahead_size: 0,
            prefetch_table_offset: 0,
            prefetch_table_entries: 0,
            // Runtime timeouts.
            attr_timeout,
            entry_timeout,
            // Remaining fields.
            meta_blkaddr: 0,
            root_nid: 0,
            is_chunk_dict: false,
            chunk_table_offset: 0,
            chunk_table_size: 0,
        }
    }
}

/// Rafs metadata working mode.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum RafsMode {
    /// Directly mapping and accessing metadata into process by mmap().
    ///
    /// Parsed from and displayed as the string `"direct"`.
    Direct,
    /// Read metadata into memory before using.
    ///
    /// Parsed from and displayed as the string `"cached"`.
    Cached,
}

impl FromStr for RafsMode {
    type Err = Error;

    /// Parse a working mode from its lowercase name.
    ///
    /// Only the exact strings `"direct"` and `"cached"` are accepted; anything else yields an
    /// `EINVAL` error.
    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
        let mode = match s {
            "direct" => Self::Direct,
            "cached" => Self::Cached,
            _ => return Err(einval!("rafs mode should be direct or cached")),
        };

        Ok(mode)
    }
}

impl Display for RafsMode {
    /// Write the lowercase mode name, the same spelling accepted by `FromStr`.
    fn fmt(&self, f: &mut Formatter) -> FmtResult {
        let text = match self {
            Self::Direct => "direct",
            Self::Cached => "cached",
        };
        f.write_str(text)
    }
}

/// Cached Rafs super block and inode information.
pub struct RafsSuper {
    /// Rafs metadata working mode.
    pub mode: RafsMode,
    /// Whether validate data read from storage backend.
    ///
    /// Passed as the `digest_validate` argument when inodes are looked up during path
    /// resolution and prefetch.
    pub validate_digest: bool,
    /// Cached metadata from on disk super block.
    pub meta: RafsSuperMeta,
    /// Rafs filesystem super block.
    ///
    /// A `NoopSuperBlock` placeholder is installed by `Default::default()`.
    pub superblock: Arc<dyn RafsSuperBlock>,
}

impl Default for RafsSuper {
    fn default() -> Self {
        Self {
            mode: RafsMode::Direct,
            validate_digest: false,
            meta: RafsSuperMeta::default(),
            superblock: Arc::new(NoopSuperBlock::new()),
        }
    }
}

impl RafsSuper {
    /// Create a new `RafsSuper` instance from a `RafsConfig` object.
    ///
    /// The working mode is parsed from `conf.mode` and the digest validation switch is copied
    /// from `conf.digest_validate`; all other fields take their default values.
    ///
    /// # Errors
    ///
    /// Fails if `conf.mode` is not a valid `RafsMode` name.
    pub fn new(conf: &RafsConfig) -> Result<Self> {
        let mode = RafsMode::from_str(conf.mode.as_str())?;
        let validate_digest = conf.digest_validate;

        Ok(Self {
            mode,
            validate_digest,
            ..Default::default()
        })
    }

    /// Destroy the filesystem super block.
    pub fn destroy(&mut self) {
        Arc::get_mut(&mut self.superblock)
            .expect("Inodes are no longer used.")
            .destroy();
    }

    /// Load Rafs super block from a metadata file.
    ///
    /// Opens the file at `path` read-only, then loads it into a fresh `RafsSuper` configured
    /// with the given `mode` and `validate_digest` settings.
    ///
    /// # Errors
    ///
    /// Fails if the file cannot be opened or if `load()` rejects its content.
    pub fn load_from_metadata<P: AsRef<Path>>(
        path: P,
        mode: RafsMode,
        validate_digest: bool,
    ) -> Result<Self> {
        // The bootstrap file is only ever read.
        let mut options = OpenOptions::new();
        options.read(true).write(false);
        let bootstrap = options.open(path.as_ref())?;

        let mut rafs_super = RafsSuper {
            mode,
            validate_digest,
            ..Default::default()
        };
        let mut reader: RafsIoReader = Box::new(bootstrap);
        rafs_super.load(&mut reader)?;

        Ok(rafs_super)
    }

    /// Load a chunk dictionary from a metadata file.
    ///
    /// The metadata is always loaded in direct mode with digest validation enabled, and the
    /// `is_chunk_dict` flag is set on the cached metadata before loading.
    ///
    /// # Errors
    ///
    /// Fails if the file cannot be opened or if `load()` rejects its content.
    pub fn load_chunk_dict_from_metadata(path: &Path) -> Result<Self> {
        // The bootstrap file is only ever read.
        let mut options = OpenOptions::new();
        options.read(true).write(false);
        let bootstrap = options.open(path)?;

        let mut rafs_super = RafsSuper {
            mode: RafsMode::Direct,
            validate_digest: true,
            ..Default::default()
        };
        // Mark the metadata as a chunk dictionary before the loaders run.
        rafs_super.meta.is_chunk_dict = true;

        let mut reader: RafsIoReader = Box::new(bootstrap);
        rafs_super.load(&mut reader)?;

        Ok(rafs_super)
    }

    /// Load RAFS metadata and optionally cache inodes.
    ///
    /// The v5 loader is tried first; the v6 loader is tried only when the v5 loader reports
    /// that it did not handle the metadata.
    ///
    /// # Errors
    ///
    /// Propagates any loader error, and returns `EINVAL` when neither loader accepts the
    /// metadata.
    pub fn load(&mut self, r: &mut RafsIoReader) -> Result<()> {
        // `||` short-circuits, so v6 is attempted only after v5 returned `Ok(false)`.
        if self.try_load_v5(r)? || self.try_load_v6(r)? {
            Ok(())
        } else {
            Err(einval!("invalid superblock version number"))
        }
    }

    /// Update the filesystem metadata and storage backend.
    ///
    /// For v5 filesystems the on-disk superblock is skipped in `r` first; a failure there is
    /// reported as `RafsError::FillSuperblock`. The remaining work is delegated to the
    /// superblock implementation.
    pub fn update(&self, r: &mut RafsIoReader) -> RafsResult<()> {
        let is_v5 = self.meta.is_v5();
        if is_v5 {
            if let Err(e) = self.skip_v5_superblock(r) {
                return Err(RafsError::FillSuperblock(e));
            }
        }

        self.superblock.update(r)
    }

    /// Store RAFS metadata to backend storage.
    ///
    /// Only v5 metadata can be stored here.
    ///
    /// # Errors
    ///
    /// Returns `EINVAL` when the cached superblock version is not v5, otherwise propagates the
    /// result of the v5 store routine.
    pub fn store(&self, w: &mut dyn RafsIoWrite) -> Result<usize> {
        if !self.meta.is_v5() {
            return Err(einval!("invalid superblock version number"));
        }

        self.store_v5(w)
    }

    /// Get an inode from an inode number, optionally validating the inode metadata.
    ///
    /// This simply forwards to `RafsSuperInodes::get_inode()` on the loaded superblock.
    pub fn get_inode(&self, ino: Inode, digest_validate: bool) -> Result<Arc<dyn RafsInode>> {
        self.superblock.get_inode(ino, digest_validate)
    }

    /// Get the maximum inode number supported by the filesystem instance.
    ///
    /// This simply forwards to `RafsSuperInodes::get_max_ino()` on the loaded superblock.
    pub fn get_max_ino(&self) -> Inode {
        self.superblock.get_max_ino()
    }

    /// Convert an inode number to a file path.
    pub fn path_from_ino(&self, ino: Inode) -> Result<PathBuf> {
        if ino == ROOT_ID {
            return Ok(self.get_inode(ino, false)?.name().into());
        }

        let mut path = PathBuf::new();
        let mut cur_ino = ino;
        let mut inode;

        loop {
            inode = self.get_inode(cur_ino, false)?;
            let e: PathBuf = inode.name().into();
            path = e.join(path);

            if inode.ino() == ROOT_ID {
                break;
            } else {
                cur_ino = inode.parent();
            }
        }

        Ok(path)
    }

    /// Convert a file path to an inode number.
    ///
    /// `f` must be an absolute path. Resolution starts at the root inode reported by the
    /// superblock and descends one component at a time through `get_child_by_name()`; `.` and
    /// `..` components are looked up by name like any other entry.
    ///
    /// # Errors
    ///
    /// - `EINVAL` if `f` is not absolute or contains a component that cannot be looked up.
    /// - `ENOENT` if no components remain after the root or a component does not exist.
    /// - Any error from loading the root inode.
    pub fn ino_from_path(&self, f: &Path) -> Result<u64> {
        let root_ino = self.superblock.root_ino();
        if f == Path::new("/") {
            return Ok(root_ino);
        }

        if !f.starts_with("/") {
            return Err(einval!());
        }

        let mut parent = self.get_inode(root_ino, self.validate_digest)?;

        // Translate every component after the root into the name to look up; `None` marks a
        // component kind that has no name in the filesystem.
        let entries: Vec<Option<&OsStr>> = f
            .components()
            .filter(|comp| !matches!(comp, Component::RootDir))
            .map(|comp| match comp {
                Component::Normal(name) => Some(name),
                Component::ParentDir => Some(OsStr::from_bytes(DOTDOT.as_bytes())),
                Component::CurDir => Some(OsStr::from_bytes(DOT.as_bytes())),
                _ => None,
            })
            .collect();

        if entries.is_empty() {
            warn!("Path can't be parsed {:?}", f);
            return Err(enoent!());
        }

        for entry in entries {
            let name = match entry {
                Some(name) => name,
                None => {
                    error!("Illegal specified path {:?}", f);
                    return Err(einval!());
                }
            };

            parent = match parent.get_child_by_name(name) {
                Ok(child) => child,
                Err(_) => {
                    warn!("File {:?} not in rafs", name);
                    return Err(enoent!());
                }
            };
        }

        Ok(parent.ino())
    }

    /// Prefetch filesystem and file data to improve performance.
    ///
    /// To improve application filesystem access performance, the filesystem may prefetch file or
    /// metadata in advance. There are two ways to configure the file list to be prefetched:
    /// 1. Static file prefetch list configured during image building and persisted in the
    ///    filesystem metadata (the prefetch table of the bootstrap).
    /// 2. Dynamic file prefetch list configured by command line. The dynamic file prefetch list
    ///    has higher priority and the static file prefetch list will be ignored if there's a
    ///    dynamic prefetch list. When a directory is specified for the dynamic prefetch list,
    ///    all sub directories and files under the directory will be prefetched.
    ///
    /// When `files` is `Some`, every listed inode is handed to `prefetch_data()`, which checks
    /// the file type itself, and the pending request is flushed through `fetcher` at the end.
    /// When `files` is `None`, the version-specific prefetch routine is used.
    ///
    /// # Errors
    ///
    /// Returns `RafsError::Prefetch` if prefetching a listed inode fails or if the filesystem
    /// version is neither v5 nor v6.
    pub fn prefetch_files(
        &self,
        r: &mut RafsIoReader,
        files: Option<Vec<Inode>>,
        fetcher: &dyn Fn(&mut BlobIoVec),
    ) -> RafsResult<()> {
        let files = match files {
            // Prefetch according to the list specified by the `--prefetch-files` option.
            Some(list) => list,
            // No dynamic list: fall back to the prefetch information stored in the metadata.
            None => {
                return if self.meta.is_v5() {
                    self.prefetch_data_v5(r, fetcher).map(|_| ())
                } else if self.meta.is_v6() {
                    self.prefetch_data_v6(r, fetcher).map(|_| ())
                } else {
                    Err(RafsError::Prefetch(
                        "Unknown filesystem version, prefetch disabled".to_string(),
                    ))
                };
            }
        };

        // Avoid prefetching multiple times for hardlinks to the same file.
        let mut hardlinks: HashSet<u64> = HashSet::new();
        let mut head_desc = BlobIoVec {
            bi_size: 0,
            bi_flags: 0,
            bi_vec: Vec::new(),
        };

        for f_ino in files {
            self.prefetch_data(f_ino, &mut head_desc, &mut hardlinks, fetcher)
                .map_err(|e| RafsError::Prefetch(e.to_string()))?;
        }

        // Flush the pending prefetch requests.
        fetcher(&mut head_desc);

        Ok(())
    }

    /// Queue the whole content of `inode` for prefetch.
    ///
    /// Blob io vectors covering `[0, inode.size())` are appended to `head_desc` one by one.
    /// `prefetcher` is called with `true` before appending a vector that targets a different
    /// blob than the pending request, and with `false` after every append.
    ///
    /// Hardlinked inodes are tracked in `hardlinks` so the same inode number is queued once.
    #[inline]
    fn prefetch_inode<F>(
        inode: &Arc<dyn RafsInode>,
        head_desc: &mut BlobIoVec,
        hardlinks: &mut HashSet<u64>,
        prefetcher: F,
    ) -> Result<()>
    where
        F: Fn(&mut BlobIoVec, bool),
    {
        // `insert()` returns false when the inode number was already recorded, i.e. this is a
        // duplicated hardlink that has been handled before.
        if inode.is_hardlink() && !hardlinks.insert(inode.ino()) {
            return Ok(());
        }

        for desc in inode.alloc_bio_vecs(0, inode.size() as usize, false)? {
            // Flush the pending prefetch if the next desc target a different blob.
            if !head_desc.has_same_blob(&desc) {
                prefetcher(head_desc, true);
            }
            head_desc.append(desc);
            prefetcher(head_desc, false);
        }

        Ok(())
    }

    /// Prefetch the data of inode `ino`, or of all its descendants if it is a directory.
    ///
    /// Requests are accumulated in `head_desc` and passed to `fetcher` whenever a flush is
    /// requested or the accumulated size reaches four default chunk sizes; `head_desc` is reset
    /// after each fetch.
    ///
    /// # Errors
    ///
    /// Returns `ENOENT` if the inode cannot be loaded, and propagates errors from collecting
    /// descendants or allocating blob io vectors.
    fn prefetch_data<F>(
        &self,
        ino: u64,
        head_desc: &mut BlobIoVec,
        hardlinks: &mut HashSet<u64>,
        fetcher: F,
    ) -> Result<()>
    where
        F: Fn(&mut BlobIoVec),
    {
        let inode = self
            .superblock
            .get_inode(ino, self.validate_digest)
            .map_err(|_e| enoent!("Can't find inode"))?;

        // Issue a prefetch request once the target is large enough.
        // As files belonging to the same directory are arranged in adjacent,
        // it should fetch a range of blob in batch.
        let batch_size = (4 * RAFS_DEFAULT_CHUNK_SIZE) as usize;
        let try_prefetch = |desc: &mut BlobIoVec, flush: bool| {
            if flush || desc.bi_size >= batch_size {
                trace!("fetching head bio size {}", desc.bi_size);
                fetcher(desc);
                desc.reset();
            }
        };

        if inode.is_dir() {
            // FIXME: Collecting descendants in DFS (Depth-First-Search) way impacts merging
            // possibility, which means a single Merging Request spans multiple directories.
            // But only files in the same directory are located closely in blob.
            let mut descendants = Vec::new();
            inode.collect_descendants_inodes(&mut descendants)?;
            for descendant in &descendants {
                Self::prefetch_inode(descendant, head_desc, hardlinks, try_prefetch)?;
            }
        } else if !inode.is_empty_size() && inode.is_reg() {
            // An empty regular file will also be packed into nydus image,
            // then it has a size of zero.
            // Moreover, for rafs v5, symlink has size of zero but non-zero size
            // for symlink size. For rafs v6, symlink size is also represented by i_size.
            // So we have to restrain the condition here.
            Self::prefetch_inode(&inode, head_desc, hardlinks, try_prefetch)?;
        }

        Ok(())
    }

    /// Walk through the file tree rooted at `ino`, calling `cb` for each file or directory in
    /// the tree in DFS order, including `ino` itself.
    ///
    /// `parent` is the path of the directory containing `ino`; pass `None` to have `ino`
    /// reported as `/`.
    ///
    /// # Errors
    ///
    /// Fails if `ino` cannot be loaded, if it is not a directory, or if the walk itself fails.
    pub fn walk_dir(
        &self,
        ino: Inode,
        parent: Option<&PathBuf>,
        cb: &mut dyn FnMut(&dyn RafsInode, &Path) -> anyhow::Result<()>,
    ) -> anyhow::Result<()> {
        let inode = self.get_inode(ino, false)?;
        if inode.is_dir() {
            self.walk_dir_inner(inode.as_ref(), parent, cb)
        } else {
            bail!("inode {} is not a directory", ino)
        }
    }

    /// Recursive worker of `walk_dir()`.
    ///
    /// Calls `cb` for `inode` with its full path first, then recurses into every child by
    /// index if `inode` is a directory. The path is `parent` joined with the inode name, or
    /// `/` when there is no parent.
    fn walk_dir_inner(
        &self,
        inode: &dyn RafsInode,
        parent: Option<&PathBuf>,
        cb: &mut dyn FnMut(&dyn RafsInode, &Path) -> anyhow::Result<()>,
    ) -> anyhow::Result<()> {
        let path = match parent {
            Some(dir) => dir.join(inode.name()),
            None => PathBuf::from("/"),
        };

        // Pre-order: report the node itself before descending into it.
        cb(inode, &path)?;

        if inode.is_dir() {
            for idx in 0..inode.get_child_count() {
                let child = inode.get_child_by_index(idx)?;
                self.walk_dir_inner(child.as_ref(), Some(&path), cb)?;
            }
        }

        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// `RafsMode` parsing is exact and case-sensitive, and `Display` round-trips with it.
    #[test]
    fn test_rafs_mode() {
        // Anything but the exact lowercase names must be rejected.
        for invalid in ["", "directed", "Direct", "Cached"].iter() {
            assert!(RafsMode::from_str(invalid).is_err());
        }

        let valid = [("direct", RafsMode::Direct), ("cached", RafsMode::Cached)];
        for (text, mode) in valid.iter() {
            assert_eq!(&RafsMode::from_str(text).unwrap(), mode);
        }
        for (text, mode) in valid.iter() {
            assert_eq!(mode.to_string(), *text);
        }
    }
}