nydus_builder/core/
chunk_dict.rs

1// Copyright 2020 Ant Group. All rights reserved.
2//
3// SPDX-License-Identifier: Apache-2.0
4
5use std::collections::{BTreeMap, HashMap};
6use std::mem::size_of;
7use std::path::{Path, PathBuf};
8use std::sync::atomic::{AtomicU32, Ordering};
9use std::sync::{Arc, Mutex};
10
11use anyhow::{bail, Context, Result};
12use nydus_api::ConfigV2;
13use nydus_rafs::metadata::chunk::ChunkWrapper;
14use nydus_rafs::metadata::layout::v5::RafsV5ChunkInfo;
15use nydus_rafs::metadata::{RafsSuper, RafsSuperConfig};
16use nydus_storage::device::BlobInfo;
17use nydus_utils::digest::{self, RafsDigest};
18
19use crate::Tree;
20
21#[derive(Debug, PartialEq, Eq, Hash, Ord, PartialOrd)]
22pub struct DigestWithBlobIndex(pub RafsDigest, pub u32, pub Option<u32>);
23
24/// Trait to manage chunk cache for chunk deduplication.
25pub trait ChunkDict: Sync + Send + 'static {
26    /// Add a chunk into the cache.
27    fn add_chunk(&mut self, chunk: Arc<ChunkWrapper>, digester: digest::Algorithm);
28
29    /// Get a cached chunk from the cache.
30    fn get_chunk(&self, digest: &RafsDigest, uncompressed_size: u32) -> Option<&Arc<ChunkWrapper>>;
31
32    /// Get all `BlobInfo` objects referenced by cached chunks.
33    fn get_blobs(&self) -> Vec<Arc<BlobInfo>>;
34
35    /// Get the `BlobInfo` object with inner index `idx`.
36    fn get_blob_by_inner_idx(&self, idx: u32) -> Option<&Arc<BlobInfo>>;
37
38    /// Associate an external index with the inner index.
39    fn set_real_blob_idx(&self, inner_idx: u32, out_idx: u32);
40
41    /// Get the external index associated with an inner index.
42    fn get_real_blob_idx(&self, inner_idx: u32) -> Option<u32>;
43
44    /// Get the digest algorithm used to generate chunk digest.
45    fn digester(&self) -> digest::Algorithm;
46}
47
48impl ChunkDict for () {
49    fn add_chunk(&mut self, _chunk: Arc<ChunkWrapper>, _digester: digest::Algorithm) {}
50
51    fn get_chunk(
52        &self,
53        _digest: &RafsDigest,
54        _uncompressed_size: u32,
55    ) -> Option<&Arc<ChunkWrapper>> {
56        None
57    }
58
59    fn get_blobs(&self) -> Vec<Arc<BlobInfo>> {
60        Vec::new()
61    }
62
63    fn get_blob_by_inner_idx(&self, _idx: u32) -> Option<&Arc<BlobInfo>> {
64        None
65    }
66
67    fn set_real_blob_idx(&self, _inner_idx: u32, _out_idx: u32) {
68        panic!("()::set_real_blob_idx() should not be invoked");
69    }
70
71    fn get_real_blob_idx(&self, inner_idx: u32) -> Option<u32> {
72        Some(inner_idx)
73    }
74
75    fn digester(&self) -> digest::Algorithm {
76        digest::Algorithm::Sha256
77    }
78}
79
80/// An implementation of [ChunkDict] based on [HashMap].
81pub struct HashChunkDict {
82    m: HashMap<RafsDigest, (Arc<ChunkWrapper>, AtomicU32)>,
83    blobs: Vec<Arc<BlobInfo>>,
84    blob_idx_m: Mutex<BTreeMap<u32, u32>>,
85    digester: digest::Algorithm,
86}
87
88impl ChunkDict for HashChunkDict {
89    fn add_chunk(&mut self, chunk: Arc<ChunkWrapper>, digester: digest::Algorithm) {
90        if self.digester == digester {
91            if let Some(e) = self.m.get(chunk.id()) {
92                e.1.fetch_add(1, Ordering::AcqRel);
93            } else {
94                self.m
95                    .insert(chunk.id().to_owned(), (chunk, AtomicU32::new(1)));
96            }
97        }
98    }
99
100    fn get_chunk(&self, digest: &RafsDigest, uncompressed_size: u32) -> Option<&Arc<ChunkWrapper>> {
101        if let Some((chunk, _)) = self.m.get(digest) {
102            if chunk.uncompressed_size() == 0 || chunk.uncompressed_size() == uncompressed_size {
103                return Some(chunk);
104            }
105        }
106        None
107    }
108
109    fn get_blobs(&self) -> Vec<Arc<BlobInfo>> {
110        self.blobs.clone()
111    }
112
113    fn get_blob_by_inner_idx(&self, idx: u32) -> Option<&Arc<BlobInfo>> {
114        self.blobs.get(idx as usize)
115    }
116
117    fn set_real_blob_idx(&self, inner_idx: u32, out_idx: u32) {
118        self.blob_idx_m.lock().unwrap().insert(inner_idx, out_idx);
119    }
120
121    fn get_real_blob_idx(&self, inner_idx: u32) -> Option<u32> {
122        self.blob_idx_m.lock().unwrap().get(&inner_idx).copied()
123    }
124
125    fn digester(&self) -> digest::Algorithm {
126        self.digester
127    }
128}
129
130impl HashChunkDict {
131    /// Create a new instance of [HashChunkDict].
132    pub fn new(digester: digest::Algorithm) -> Self {
133        HashChunkDict {
134            m: Default::default(),
135            blobs: vec![],
136            blob_idx_m: Mutex::new(Default::default()),
137            digester,
138        }
139    }
140
141    /// Get an immutable reference to the internal `HashMap`.
142    pub fn hashmap(&self) -> &HashMap<RafsDigest, (Arc<ChunkWrapper>, AtomicU32)> {
143        &self.m
144    }
145
146    /// Parse commandline argument for chunk dictionary and load chunks into the dictionary.
147    pub fn from_commandline_arg(
148        arg: &str,
149        config: Arc<ConfigV2>,
150        rafs_config: &RafsSuperConfig,
151    ) -> Result<Arc<dyn ChunkDict>> {
152        let file_path = parse_chunk_dict_arg(arg)?;
153        HashChunkDict::from_bootstrap_file(&file_path, config, rafs_config)
154            .map(|d| Arc::new(d) as Arc<dyn ChunkDict>)
155    }
156
157    /// Load chunks from the RAFS filesystem into the chunk dictionary.
158    pub fn from_bootstrap_file(
159        path: &Path,
160        config: Arc<ConfigV2>,
161        rafs_config: &RafsSuperConfig,
162    ) -> Result<Self> {
163        let (rs, _) = RafsSuper::load_from_file(path, config, true)
164            .with_context(|| format!("failed to open bootstrap file {:?}", path))?;
165        let mut d = HashChunkDict {
166            m: HashMap::new(),
167            blobs: rs.superblock.get_blob_infos(),
168            blob_idx_m: Mutex::new(BTreeMap::new()),
169            digester: rafs_config.digester,
170        };
171
172        rafs_config.check_compatibility(&rs.meta)?;
173        if rs.meta.is_v5() || rs.meta.has_inlined_chunk_digest() {
174            Tree::from_bootstrap(&rs, &mut d).context("failed to build tree from bootstrap")?;
175        } else if rs.meta.is_v6() {
176            d.load_chunk_table(&rs)
177                .context("failed to load chunk table")?;
178        } else {
179            unimplemented!()
180        }
181
182        Ok(d)
183    }
184
185    fn load_chunk_table(&mut self, rs: &RafsSuper) -> Result<()> {
186        let size = rs.meta.chunk_table_size as usize;
187        if size == 0 || self.digester != rs.meta.get_digester() {
188            return Ok(());
189        }
190
191        let unit_size = size_of::<RafsV5ChunkInfo>();
192        if size % unit_size != 0 {
193            return Err(std::io::Error::from_raw_os_error(libc::EINVAL)).with_context(|| {
194                format!(
195                    "load_chunk_table: invalid rafs v6 chunk table size {}",
196                    size
197                )
198            });
199        }
200
201        for idx in 0..(size / unit_size) {
202            let chunk = rs.superblock.get_chunk_info(idx)?;
203            let chunk_info = Arc::new(ChunkWrapper::from_chunk_info(chunk));
204            self.add_chunk(chunk_info, self.digester);
205        }
206
207        Ok(())
208    }
209}
210
211/// Parse a chunk dictionary argument string.
212///
213/// # Argument
214/// `arg` may be in inform of:
215/// - type=path: type of external source and corresponding path
216/// - path: type default to "bootstrap"
217///
218/// for example:
219///     bootstrap=image.boot
220///     image.boot
221///     ~/image/image.boot
222///     boltdb=/var/db/dict.db (not supported yet)
223pub fn parse_chunk_dict_arg(arg: &str) -> Result<PathBuf> {
224    let (file_type, file_path) = match arg.find('=') {
225        None => ("bootstrap", arg),
226        Some(idx) => (&arg[0..idx], &arg[idx + 1..]),
227    };
228
229    debug!("parse chunk dict argument {}={}", file_type, file_path);
230
231    match file_type {
232        "bootstrap" => Ok(PathBuf::from(file_path)),
233        _ => bail!("invalid chunk dict type {}", file_type),
234    }
235}
236
#[cfg(test)]
mod tests {
    use super::*;
    use nydus_rafs::metadata::RafsVersion;
    use nydus_utils::{compress, digest};
    use std::path::PathBuf;

    /// The unit type acts as a dictionary that stores nothing and maps indexes to themselves.
    #[test]
    fn test_null_dict() {
        let mut null_dict: Box<dyn ChunkDict> = Box::new(());
        let chunk = Arc::new(ChunkWrapper::new(RafsVersion::V5));

        null_dict.add_chunk(chunk.clone(), digest::Algorithm::Sha256);

        assert!(null_dict.get_chunk(chunk.id(), 0).is_none());
        assert_eq!(null_dict.get_blobs().len(), 0);
        assert_eq!(null_dict.get_real_blob_idx(5).unwrap(), 5);
    }

    /// Load a dictionary from the v5 bootstrap fixture and exercise the lookup helpers.
    #[test]
    fn test_chunk_dict() {
        let manifest_dir = std::env::var("CARGO_MANIFEST_DIR").expect("$CARGO_MANIFEST_DIR");
        let bootstrap =
            PathBuf::from(&manifest_dir).join("../tests/texture/bootstrap/rafs-v5.boot");
        let arg = bootstrap.to_str().unwrap();

        let rafs_config = RafsSuperConfig {
            version: RafsVersion::V5,
            compressor: compress::Algorithm::Lz4Block,
            digester: digest::Algorithm::Blake3,
            chunk_size: 0x100000,
            batch_size: 0,
            explicit_uidgid: true,
            is_tarfs_mode: false,
        };
        let config = Arc::new(ConfigV2::default());
        let dict = HashChunkDict::from_commandline_arg(arg, config, &rafs_config).unwrap();

        // The default digest is not expected to be present in the fixture.
        assert!(dict.get_chunk(&RafsDigest::default(), 0).is_none());
        assert_eq!(dict.get_blobs().len(), 18);

        // Only explicitly registered inner indexes have an external mapping.
        dict.set_real_blob_idx(0, 10);
        assert_eq!(dict.get_real_blob_idx(0), Some(10));
        assert_eq!(dict.get_real_blob_idx(1), None);
    }
}