nydus_builder/
compact.rs

1// Copyright 2020 Ant Group. All rights reserved.
2// Copyright 2023 Alibaba Cloud. All rights reserved.
3//
4// SPDX-License-Identifier: Apache-2.0
5
6use std::collections::hash_map::Entry;
7use std::collections::HashMap;
8use std::io::Write;
9use std::mem;
10use std::ops::Deref;
11use std::path::PathBuf;
12use std::sync::Arc;
13
14use anyhow::{bail, ensure, Result};
15use nydus_rafs::metadata::chunk::ChunkWrapper;
16use nydus_rafs::metadata::{RafsSuper, RafsVersion};
17use nydus_storage::backend::BlobBackend;
18use nydus_storage::utils::alloc_buf;
19use nydus_utils::digest::RafsDigest;
20use nydus_utils::{digest, try_round_up_4k};
21use serde::{Deserialize, Serialize};
22use sha2::Digest;
23
24use crate::attributes::Attributes;
25use crate::core::context::Artifact;
26
27use super::core::blob::Blob;
28use super::core::bootstrap::Bootstrap;
29use super::{
30    ArtifactStorage, ArtifactWriter, BlobContext, BlobManager, BootstrapManager, BuildContext,
31    BuildOutput, ChunkDict, ConversionType, Features, Tree, TreeNode, WhiteoutSpec,
32};
33
/// Default size threshold (10 MiB): blobs smaller than this are merge candidates.
const DEFAULT_COMPACT_BLOB_SIZE: usize = 10 * 1024 * 1024;
/// Default upper bound (100 MiB) on the size of a merged (compacted) blob.
const DEFAULT_MAX_COMPACT_SIZE: usize = 100 * 1024 * 1024;
36
/// Helper returning the default value for `Config::compact_blob_size`.
const fn default_compact_blob_size() -> usize {
    DEFAULT_COMPACT_BLOB_SIZE
}
40
/// Helper returning the default value for `Config::max_compact_size`.
const fn default_max_compact_size() -> usize {
    DEFAULT_MAX_COMPACT_SIZE
}
44
45#[derive(Clone, Deserialize, Serialize)]
46pub struct Config {
47    /// rebuild blobs whose used_ratio < min_used_ratio
48    /// used_ratio = (compress_size of all chunks which are referenced by bootstrap) / blob_compress_size
49    /// available value: 0-99, 0 means disable
50    /// hint: it's better to disable this option when there are some shared blobs
51    /// for example: build-cache
52    pub min_used_ratio: u8,
53    /// we compact blobs whose size are less than compact_blob_size
54    pub compact_blob_size: usize,
55    /// size of compacted blobs should not be larger than max_compact_size
56    pub max_compact_size: usize,
57    /// if number of blobs >= layers_to_compact, do compact
58    /// 0 means always try compact
59    pub layers_to_compact: usize,
60    /// local blobs dir, may haven't upload to backend yet
61    /// what's more, new blobs will output to this dir
62    /// name of blob file should be equal to blob_id
63    pub blobs_dir: String,
64}
65
66impl Default for Config {
67    fn default() -> Self {
68        Self {
69            min_used_ratio: 0,
70            compact_blob_size: default_compact_blob_size(),
71            max_compact_size: default_max_compact_size(),
72            layers_to_compact: 0,
73            blobs_dir: String::new(),
74        }
75    }
76}
77
/// Hashable identity of a chunk, used as the deduplication key.
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
enum ChunkKey {
    // Chunk digest for RAFS v5, may be extended to support RAFS v6 in future.
    Digest(RafsDigest),
    // (blob_idx, compress_offset) for RAFS v6 only
    Offset(u32, u64),
}
85
86impl ChunkKey {
87    fn from(c: &ChunkWrapper) -> Self {
88        match c {
89            ChunkWrapper::V5(_) => Self::Digest(*c.id()),
90            ChunkWrapper::V6(_) => Self::Offset(c.blob_index(), c.compressed_offset()),
91            ChunkWrapper::Ref(_) => Self::Digest(*c.id()),
92        }
93    }
94}
95
/// A deduplicated collection of chunks plus their aggregate compressed size.
#[derive(Clone, Debug)]
struct ChunkSet {
    // Distinct chunks, keyed by their dedup identity.
    chunks: HashMap<ChunkKey, ChunkWrapper>,
    // Sum of `compressed_size` over all chunks currently in the map.
    total_size: usize,
}
101
102impl ChunkSet {
103    fn new() -> Self {
104        Self {
105            chunks: Default::default(),
106            total_size: 0,
107        }
108    }
109
110    fn add_chunk(&mut self, chunk: &ChunkWrapper) {
111        let key = ChunkKey::from(chunk);
112        if let Entry::Vacant(e) = self.chunks.entry(key) {
113            e.insert(chunk.clone());
114            self.total_size += chunk.compressed_size() as usize;
115        }
116    }
117
118    fn get_chunk(&self, key: &ChunkKey) -> Option<&ChunkWrapper> {
119        self.chunks.get(key)
120    }
121
122    fn merge(&mut self, other: Self) {
123        for (_, c) in other.chunks.iter() {
124            self.add_chunk(c);
125        }
126    }
127
128    #[allow(clippy::too_many_arguments)]
129    fn dump(
130        &self,
131        build_ctx: &BuildContext,
132        blob_storage: ArtifactStorage,
133        ori_blob_ids: &[String],
134        new_blob_ctx: &mut BlobContext,
135        new_blob_idx: u32,
136        aligned_chunk: bool,
137        backend: &Arc<dyn BlobBackend + Send + Sync>,
138    ) -> Result<Vec<(ChunkWrapper, ChunkWrapper)>> {
139        let mut blob_writer = ArtifactWriter::new(blob_storage)?;
140        let mut chunks = self.chunks.values().collect::<Vec<&ChunkWrapper>>();
141        // sort chunks first, don't break order in original blobs
142        chunks.sort_by(|a, b| {
143            if (*a).blob_index() == (*b).blob_index() {
144                (*a).compressed_offset().cmp(&(*b).compressed_offset())
145            } else {
146                (*a).blob_index().cmp(&(*b).blob_index())
147            }
148        });
149
150        let mut changed_chunks = Vec::new();
151        for chunk in chunks {
152            let blob_idx = chunk.blob_index();
153            // get data from backend
154            // todo: merge download requests
155            let reader = backend
156                .get_reader(&ori_blob_ids[blob_idx as usize])
157                .expect("get blob err");
158            let mut buf = alloc_buf(chunk.compressed_size() as usize);
159            reader
160                .read(&mut buf, chunk.compressed_offset())
161                .expect("read blob data err");
162            blob_writer.write_all(&buf)?;
163
164            let mut new_chunk = chunk.clone();
165            // file offset field is useless
166            new_chunk.set_index(new_blob_ctx.chunk_count);
167            new_chunk.set_blob_index(new_blob_idx);
168            new_chunk.set_compressed_offset(new_blob_ctx.current_compressed_offset);
169            new_chunk.set_uncompressed_offset(new_blob_ctx.current_uncompressed_offset);
170            new_blob_ctx.add_chunk_meta_info(&new_chunk, None)?;
171            // insert change ops
172            changed_chunks.push((chunk.clone(), new_chunk));
173
174            new_blob_ctx.blob_hash.update(&buf);
175            new_blob_ctx.chunk_count += 1;
176            new_blob_ctx.current_compressed_offset += chunk.compressed_size() as u64;
177            new_blob_ctx.compressed_blob_size += chunk.compressed_size() as u64;
178
179            let aligned_size = if aligned_chunk {
180                try_round_up_4k(chunk.uncompressed_size()).unwrap()
181            } else {
182                chunk.uncompressed_size() as u64
183            };
184            new_blob_ctx.current_uncompressed_offset += aligned_size;
185            new_blob_ctx.uncompressed_blob_size += aligned_size;
186        }
187        new_blob_ctx.blob_id = format!("{:x}", new_blob_ctx.blob_hash.clone().finalize());
188
189        // dump blob meta for v6
190        Blob::dump_meta_data(build_ctx, new_blob_ctx, &mut blob_writer)?;
191        let blob_id = new_blob_ctx.blob_id();
192        blob_writer.finalize(blob_id)?;
193
194        Ok(changed_chunks)
195    }
196}
197
/// Per-blob compaction decision.
#[derive(Clone, Debug, Default)]
enum State {
    /// blob belongs to the chunk dictionary, keep it untouched
    ChunkDict,
    /// delete this blob
    Delete,
    /// no decision made yet; blobs left in this state are unreferenced
    #[default]
    Invalid,
    /// keep the original blob, tracking the chunks referenced from it
    Original(ChunkSet),
    /// output chunks as a new blob file
    Rebuild(ChunkSet),
}
209
210impl State {
211    fn is_rebuild(&self) -> bool {
212        matches!(self, Self::Rebuild(_))
213    }
214
215    fn is_from_dict(&self) -> bool {
216        matches!(self, Self::ChunkDict)
217    }
218
219    fn is_invalid(&self) -> bool {
220        matches!(self, Self::Invalid)
221    }
222
223    fn merge_blob(&mut self, other: Self) -> Result<()> {
224        let merge_cs = match other {
225            State::Original(cs) => cs,
226            State::Rebuild(cs) => cs,
227            _ => bail!("invalid state"),
228        };
229        match self {
230            State::Rebuild(cs) => {
231                cs.merge(merge_cs);
232            }
233            _ => bail!("invalid state"),
234        }
235        Ok(())
236    }
237
238    fn chunk_total_size(&self) -> Result<usize> {
239        Ok(match self {
240            State::Original(cs) => cs.total_size,
241            State::Rebuild(cs) => cs.total_size,
242            _ => bail!("invalid state"),
243        })
244    }
245}
246
247#[inline]
248fn apply_chunk_change(from: &ChunkWrapper, to: &mut ChunkWrapper) -> Result<()> {
249    ensure!(
250        to.uncompressed_size() == from.uncompressed_size(),
251        "different uncompress size"
252    );
253    ensure!(
254        to.compressed_size() == from.compressed_size(),
255        "different compressed size"
256    );
257
258    to.set_blob_index(from.blob_index());
259    to.set_index(from.index());
260    to.set_uncompressed_offset(from.uncompressed_offset());
261    to.set_compressed_offset(from.compressed_offset());
262    Ok(())
263}
264
/// RAFS blob compactor to compact multiple small blobs into one blob.
pub struct BlobCompactor {
    /// v5 or v6
    version: RafsVersion,
    /// per-blob compaction state, indexed by original blob index
    states: Vec<State>,
    /// original blobs
    ori_blob_mgr: BlobManager,
    /// new blobs
    new_blob_mgr: BlobManager,
    /// chunk --> list<tree_node, chunk_idx in node>
    c2nodes: HashMap<ChunkKey, Vec<(TreeNode, usize)>>,
    /// original blob index --> list<tree_node, chunk_idx in node>
    b2nodes: HashMap<u32, Vec<(TreeNode, usize)>>,
    /// blobs backend
    backend: Arc<dyn BlobBackend + Send + Sync>,
}
282
283impl BlobCompactor {
    /// Create a new instance of [BlobCompactor].
    ///
    /// `ori_blob_mgr` holds the original blob table (including chunk-dict
    /// blobs, if any) and `bootstrap` the already-loaded filesystem tree.
    /// The constructor marks chunk-dict blobs, then walks the tree once to
    /// dedup chunks and build the chunk/blob -> node back-reference maps.
    fn new(
        version: RafsVersion,
        ori_blob_mgr: BlobManager,
        backend: Arc<dyn BlobBackend + Send + Sync>,
        digester: digest::Algorithm,
        bootstrap: &Bootstrap,
    ) -> Result<Self> {
        let ori_blobs_number = ori_blob_mgr.len();
        let mut compactor = Self {
            version,
            // One state slot per original blob, initially `State::Invalid`.
            states: vec![Default::default(); ori_blobs_number],
            ori_blob_mgr,
            new_blob_mgr: BlobManager::new(digester, false),
            c2nodes: HashMap::new(),
            b2nodes: HashMap::new(),
            backend,
        };
        compactor.load_chunk_dict_blobs();
        compactor.load_and_dedup_chunks(bootstrap)?;
        Ok(compactor)
    }
306
    /// Whether the filesystem being compacted is RAFS v6.
    fn is_v6(&self) -> bool {
        self.version.is_v6()
    }
310
311    fn load_and_dedup_chunks(&mut self, bootstrap: &Bootstrap) -> Result<()> {
312        let mut all_chunks = ChunkSet::new();
313        let chunk_dict = self.get_chunk_dict();
314
315        let cb = &mut |n: &Tree| -> Result<()> {
316            let mut node = n.borrow_mut_node();
317            for chunk_idx in 0..node.chunks.len() {
318                let chunk = &mut node.chunks[chunk_idx];
319                let chunk_key = ChunkKey::from(&chunk.inner);
320
321                if self.states[chunk.inner.blob_index() as usize].is_from_dict() {
322                    // dedup by chunk dict
323                    if let Some(c) =
324                        chunk_dict.get_chunk(chunk.inner.id(), chunk.inner.uncompressed_size())
325                    {
326                        let mut chunk_inner = chunk.inner.deref().clone();
327                        apply_chunk_change(c, &mut chunk_inner)?;
328                        chunk.inner = Arc::new(chunk_inner);
329                    } else if let Some(c) = all_chunks.get_chunk(&chunk_key) {
330                        let mut chunk_inner = chunk.inner.deref().clone();
331                        apply_chunk_change(c, &mut chunk_inner)?;
332                        chunk.inner = Arc::new(chunk_inner);
333                    } else {
334                        all_chunks.add_chunk(&chunk.inner);
335                        // add to per blob ChunkSet
336                        let blob_index = chunk.inner.blob_index() as usize;
337                        if self.states[blob_index].is_invalid() {
338                            self.states[blob_index] = State::Original(ChunkSet::new());
339                        }
340                        if let State::Original(cs) = &mut self.states[blob_index] {
341                            cs.add_chunk(&chunk.inner);
342                        }
343                    }
344                }
345
346                // construct blobs/chunk --> nodes index map
347                self.c2nodes
348                    .entry(chunk_key)
349                    .or_default()
350                    .push((n.node.clone(), chunk_idx));
351                self.b2nodes
352                    .entry(chunk.inner.blob_index())
353                    .or_default()
354                    .push((n.node.clone(), chunk_idx));
355            }
356            Ok(())
357        };
358
359        bootstrap.tree.walk_bfs(false, cb)
360    }
361
    /// Shared handle to the chunk dictionary held by the original blob manager.
    fn get_chunk_dict(&self) -> Arc<dyn ChunkDict> {
        self.ori_blob_mgr.get_chunk_dict()
    }
365
366    fn load_chunk_dict_blobs(&mut self) {
367        let chunk_dict = self.get_chunk_dict();
368        let blobs = chunk_dict.get_blobs();
369        for i in 0..blobs.len() {
370            if let Some(real_blob_idx) = chunk_dict.get_real_blob_idx(i as u32) {
371                self.states[real_blob_idx as usize] = State::ChunkDict;
372            }
373        }
374    }
375
376    fn apply_blob_move(&mut self, from: u32, to: u32) -> Result<()> {
377        if let Some(idx_list) = self.b2nodes.get(&from) {
378            for (n, chunk_idx) in idx_list.iter() {
379                let mut node = n.borrow_mut();
380                ensure!(
381                    node.chunks[*chunk_idx].inner.blob_index() == from,
382                    "unexpected blob_index of chunk"
383                );
384                node.chunks[*chunk_idx].set_blob_index(to);
385            }
386        }
387        Ok(())
388    }
389
390    fn apply_chunk_change(&mut self, c: &(ChunkWrapper, ChunkWrapper)) -> Result<()> {
391        if let Some(chunks) = self.c2nodes.get(&ChunkKey::from(&c.0)) {
392            for (n, chunk_idx) in chunks.iter() {
393                let mut node = n.borrow_mut();
394                let chunk = &mut node.chunks[*chunk_idx];
395                let mut chunk_inner = chunk.inner.deref().clone();
396                apply_chunk_change(&c.1, &mut chunk_inner)?;
397                chunk.inner = Arc::new(chunk_inner);
398            }
399        }
400        Ok(())
401    }
402
403    fn delete_unused_blobs(&mut self) {
404        for i in 0..self.states.len() {
405            if self.states[i].is_invalid() {
406                info!(
407                    "compactor: delete unused blob {}",
408                    self.ori_blob_mgr.get_blob(i).unwrap().blob_id
409                );
410                self.states[i] = State::Delete;
411            }
412        }
413    }
414
415    fn prepare_to_rebuild(&mut self, idx: usize) -> Result<()> {
416        if !self.states[idx].is_rebuild() {
417            return Ok(());
418        }
419
420        let mut old = State::Invalid;
421        mem::swap(&mut self.states[idx], &mut old);
422        if let State::Original(cs) = old {
423            self.states[idx] = State::Rebuild(cs);
424        } else {
425            mem::swap(&mut self.states[idx], &mut old);
426            bail!("invalid state");
427        }
428
429        Ok(())
430    }
431
    /// Schedule for rebuild every `Original` blob whose used-data ratio is
    /// strictly below `ratio` percent.
    ///
    /// The ratio is the compressed size of chunks still referenced from the
    /// bootstrap over the blob's total compressed size; when the metadata does
    /// not record a size, the backend is queried.
    fn try_rebuild_blobs(&mut self, ratio: u8) -> Result<()> {
        for idx in 0..self.ori_blob_mgr.len() {
            let blob_info = self.ori_blob_mgr.get_blob(idx).unwrap();
            let used_ratio = match &self.states[idx] {
                State::Original(cs) => {
                    let compressed_blob_size = if blob_info.compressed_blob_size == 0 {
                        // Size missing from metadata, ask the backend instead.
                        let reader = match self.backend.get_reader(&blob_info.blob_id) {
                            Ok(r) => r,
                            Err(e) => bail!("compactor: failed to get blob reader, {}", e),
                        };
                        match reader.blob_size() {
                            Ok(sz) => sz,
                            Err(e) => bail!("compactor: failed to get blob size, {}", e),
                        }
                    } else {
                        blob_info.compressed_blob_size
                    };
                    // NOTE(review): divides by the blob size; a backend that
                    // reports 0 for an existing blob would panic — confirm
                    // that cannot happen.
                    (cs.total_size * 100 / compressed_blob_size as usize) as u8
                }
                // Non-`Original` blobs are never rebuilt based on ratio.
                _ => 100_u8,
            };

            info!(
                "compactor: original blob size {}, used data ratio {}%",
                blob_info.blob_id, used_ratio
            );
            if used_ratio < ratio {
                self.prepare_to_rebuild(idx)?;
            }
        }

        Ok(())
    }
465
466    fn merge_blob(&mut self, from: usize, to: usize) -> Result<()> {
467        let mut old = State::Delete;
468        mem::swap(&mut self.states[from], &mut old);
469        self.states[to].merge_blob(old)
470    }
471
    /// use greedy algorithm to merge small blobs(<low)
    ///
    /// Collects every `Original`/`Rebuild` blob smaller than `low` bytes,
    /// sorts them by size, then greedily accumulates them into a merge target
    /// as long as the combined chunk size stays within `max`. When the cap
    /// would be exceeded, the current blob becomes the next merge target.
    fn try_merge_blobs(&mut self, low: usize, max: usize) -> Result<()> {
        let mut need_merge_blobs = Vec::new();
        for idx in 0..self.states.len() {
            let blob_info = self.ori_blob_mgr.get_blob(idx).unwrap();
            match &self.states[idx] {
                State::Original(cs) => {
                    // Fall back to the referenced-chunk total when metadata
                    // does not record the blob's compressed size.
                    let blob_size = if blob_info.compressed_blob_size == 0 {
                        cs.total_size
                    } else {
                        blob_info.compressed_blob_size as usize
                    };
                    if blob_size < low {
                        info!(
                            "compactor: try to merge blob {} size {}",
                            blob_info.blob_id, blob_size
                        );
                        need_merge_blobs.push((idx, blob_size));
                    }
                }
                State::Rebuild(cs) => {
                    if cs.total_size < low {
                        info!(
                            "compactor: try to merge blob {} size {}",
                            blob_info.blob_id, cs.total_size
                        );
                        need_merge_blobs.push((idx, cs.total_size));
                    }
                }
                // ChunkDict/Delete/Invalid blobs are never merged.
                _ => {}
            }
        }
        // sort by size
        need_merge_blobs.sort_by(|(_, len1), (_, len2)| len1.cmp(len2));
        // try merge
        if need_merge_blobs.len() < 2 {
            return Ok(());
        }

        let mut merge_to = need_merge_blobs[0].0;
        for (blob_idx, _) in need_merge_blobs.iter().skip(1) {
            let before_size = self.states[merge_to].chunk_total_size()?;
            let append_size = self.states[*blob_idx].chunk_total_size()?;
            if before_size + append_size <= max {
                // Target must be `Rebuild` before it can absorb other blobs.
                self.prepare_to_rebuild(merge_to)?;
                self.merge_blob(*blob_idx, merge_to)?;
            } else {
                merge_to = *blob_idx;
            }
        }

        Ok(())
    }
525
526    fn original_blob_ids(&self) -> Vec<String> {
527        self.ori_blob_mgr
528            .get_blobs()
529            .into_iter()
530            .map(|blob| blob.blob_id.clone())
531            .collect()
532    }
533
    /// Emit the post-compaction blob set according to each blob's [`State`]:
    /// - `Original`/`ChunkDict`: keep the blob and re-register it in the new
    ///   blob manager, relocating chunks if its index shifted;
    /// - `Delete`: drop the blob;
    /// - `Rebuild`: download the referenced chunks and dump them as a new
    ///   blob file under `dir`, then rewrite all affected node chunks;
    /// - `Invalid`: must not remain at this point, treated as an error.
    fn dump_new_blobs(
        &mut self,
        build_ctx: &BuildContext,
        dir: &str,
        aligned_chunk: bool,
    ) -> Result<()> {
        let ori_blob_ids = self.original_blob_ids();
        ensure!(self.states.len() == self.ori_blob_mgr.len());

        for idx in 0..self.states.len() {
            match &self.states[idx] {
                State::Original(_) | State::ChunkDict => {
                    info!("compactor: keep original data blob {}", ori_blob_ids[idx]);
                    // already exists, no need to dump
                    let ctx = self.ori_blob_mgr.take_blob(idx);
                    let blob_idx = self.new_blob_mgr.alloc_index()?;
                    // Indices shrink when earlier blobs were deleted, so the
                    // chunks referencing this blob may need relocation.
                    if blob_idx != idx as u32 {
                        self.apply_blob_move(idx as u32, blob_idx)?;
                    }
                    self.new_blob_mgr.add_blob(ctx);
                }
                State::Delete => {
                    info!("compactor: delete compacted blob {}", ori_blob_ids[idx]);
                }
                State::Rebuild(cs) => {
                    // The blob id is unknown until the data digest is final,
                    // so dump into `dir` with a name assigned at finalize.
                    let blob_storage =
                        ArtifactStorage::FileDir((PathBuf::from(dir), String::new()));
                    let mut blob_ctx = BlobContext::new(
                        String::from(""),
                        0,
                        build_ctx.blob_features,
                        build_ctx.compressor,
                        build_ctx.digester,
                        build_ctx.cipher,
                        Default::default(),
                        None,
                        false,
                    );
                    blob_ctx.set_meta_info_enabled(self.is_v6());
                    let blob_idx = self.new_blob_mgr.alloc_index()?;
                    let new_chunks = cs.dump(
                        build_ctx,
                        blob_storage,
                        &ori_blob_ids,
                        &mut blob_ctx,
                        blob_idx,
                        aligned_chunk,
                        &self.backend,
                    )?;
                    // Point every affected node chunk at its new location.
                    for change_chunk in new_chunks.iter() {
                        self.apply_chunk_change(change_chunk)?;
                    }
                    info!("compactor: successfully rebuild blob {}", blob_ctx.blob_id);
                    self.new_blob_mgr.add_blob(blob_ctx);
                }
                State::Invalid => bail!("compactor: invalid state for blob {}", ori_blob_ids[idx]),
            }
        }

        Ok(())
    }
595
    /// Run the three compaction phases in order: drop unreferenced blobs,
    /// schedule sparsely-used blobs for rebuild, then greedily merge small
    /// blobs up to the configured size cap.
    fn do_compact(&mut self, cfg: &Config) -> Result<()> {
        self.delete_unused_blobs();
        self.try_rebuild_blobs(cfg.min_used_ratio)?;
        self.try_merge_blobs(cfg.compact_blob_size, cfg.max_compact_size)?;
        Ok(())
    }
602
    /// Compact multiple small data blobs into one to reduce number of blobs.
    ///
    /// Returns `Ok(None)` when compaction is skipped (fewer layers than
    /// `cfg.layers_to_compact`) or when nothing changed; otherwise dumps the
    /// new blobs plus a rebuilt bootstrap at `d_bootstrap` and returns the
    /// build output.
    pub fn compact(
        rs: RafsSuper,
        d_bootstrap: PathBuf,
        chunk_dict: Option<Arc<dyn ChunkDict>>,
        backend: Arc<dyn BlobBackend + Send + Sync>,
        cfg: &Config,
    ) -> Result<Option<BuildOutput>> {
        // Minimal build context carrying over the image's compressor/digester.
        let mut build_ctx = BuildContext::new(
            "".to_string(),
            false,
            0,
            rs.meta.get_compressor(),
            rs.meta.get_digester(),
            rs.meta.explicit_uidgid(),
            WhiteoutSpec::None,
            ConversionType::DirectoryToRafs,
            PathBuf::from(""),
            Default::default(),
            None,
            None,
            false,
            Features::new(),
            false,
            Attributes::default(),
        );
        let mut bootstrap_mgr =
            BootstrapManager::new(Some(ArtifactStorage::SingleFile(d_bootstrap)), None);
        let mut bootstrap_ctx = bootstrap_mgr.create_ctx()?;
        let mut ori_blob_mgr = BlobManager::new(rs.meta.get_digester(), false);
        ori_blob_mgr.extend_from_blob_table(&build_ctx, rs.superblock.get_blob_infos())?;
        if let Some(dict) = chunk_dict {
            ori_blob_mgr.set_chunk_dict(dict);
            ori_blob_mgr.extend_from_chunk_dict(&build_ctx)?;
        }
        // Not enough layers to be worth compacting.
        if ori_blob_mgr.len() < cfg.layers_to_compact {
            return Ok(None);
        }

        let tree = Tree::from_bootstrap(&rs, &mut ())?;
        let mut bootstrap = Bootstrap::new(tree)?;
        let mut compactor = Self::new(
            build_ctx.fs_version,
            ori_blob_mgr,
            backend.clone(),
            rs.meta.get_digester(),
            &bootstrap,
        )?;
        compactor.do_compact(cfg)?;
        compactor.dump_new_blobs(&build_ctx, &cfg.blobs_dir, build_ctx.aligned_chunk)?;
        if compactor.new_blob_mgr.is_empty() {
            info!("compactor: no chance to compact data blobs");
            return Ok(None);
        }

        info!("compactor: successfully compacted blob");
        // blobs have already been dumped, dump bootstrap only
        let blob_table = compactor.new_blob_mgr.to_blob_table(&build_ctx)?;
        bootstrap.build(&mut build_ctx, &mut bootstrap_ctx)?;
        bootstrap.dump(
            &mut build_ctx,
            &mut bootstrap_mgr.bootstrap_storage,
            &mut bootstrap_ctx,
            &blob_table,
        )?;

        Ok(Some(BuildOutput::new(
            &compactor.new_blob_mgr,
            None,
            &bootstrap_mgr.bootstrap_storage,
            &None,
        )?))
    }
676}
677
678#[cfg(test)]
679mod tests {
680    use crate::core::node::Node;
681    use crate::HashChunkDict;
682    use crate::{NodeChunk, Overlay};
683
684    use super::*;
685    use nydus_api::ConfigV2;
686    use nydus_rafs::metadata::RafsSuperConfig;
687    use nydus_storage::backend::{BackendResult, BlobReader};
688    use nydus_storage::device::v5::BlobV5ChunkInfo;
689    use nydus_storage::device::{BlobChunkFlags, BlobChunkInfo, BlobFeatures};
690    use nydus_storage::RAFS_DEFAULT_CHUNK_SIZE;
691    use nydus_utils::crypt::Algorithm;
692    use nydus_utils::metrics::BackendMetrics;
693    use nydus_utils::{compress, crypt};
694    use std::any::Any;
695    use vmm_sys_util::tempdir::TempDir;
696    use vmm_sys_util::tempfile::TempFile;
697
    /// Generate a trivial getter `$G` returning the value of field `$F` as `$U`.
    #[doc(hidden)]
    #[macro_export]
    macro_rules! impl_getter {
        ($G: ident, $F: ident, $U: ty) => {
            fn $G(&self) -> $U {
                self.$F
            }
        };
    }
707
    // Plain-field chunk descriptor used to build `ChunkWrapper::Ref` in tests.
    #[derive(Default, Clone)]
    struct MockChunkInfo {
        pub block_id: RafsDigest,
        pub blob_index: u32,
        pub flags: BlobChunkFlags,
        pub compress_size: u32,
        pub uncompress_size: u32,
        pub compress_offset: u64,
        pub uncompress_offset: u64,
        pub file_offset: u64,
        pub index: u32,
        pub crc32: u32,
    }
721
    // Field-backed implementation; most getters are generated by impl_getter.
    impl BlobChunkInfo for MockChunkInfo {
        fn chunk_id(&self) -> &RafsDigest {
            &self.block_id
        }
        fn id(&self) -> u32 {
            self.index
        }
        fn is_compressed(&self) -> bool {
            self.flags.contains(BlobChunkFlags::COMPRESSED)
        }

        fn is_batch(&self) -> bool {
            self.flags.contains(BlobChunkFlags::BATCH)
        }

        fn is_encrypted(&self) -> bool {
            false
        }

        fn has_crc32(&self) -> bool {
            self.flags.contains(BlobChunkFlags::HAS_CRC32)
        }

        // Return the stored checksum only when the flag says one is present.
        fn crc32(&self) -> u32 {
            if self.has_crc32() {
                self.crc32
            } else {
                0
            }
        }

        fn as_any(&self) -> &dyn Any {
            self
        }

        impl_getter!(blob_index, blob_index, u32);
        impl_getter!(compressed_offset, compress_offset, u64);
        impl_getter!(compressed_size, compress_size, u32);
        impl_getter!(uncompressed_offset, uncompress_offset, u64);
        impl_getter!(uncompressed_size, uncompress_size, u32);
    }
763
    impl BlobV5ChunkInfo for MockChunkInfo {
        // Upcast to the base chunk-info trait object.
        fn as_base(&self) -> &dyn BlobChunkInfo {
            self
        }

        impl_getter!(index, index, u32);
        impl_getter!(file_offset, file_offset, u64);
        impl_getter!(flags, flags, BlobChunkFlags);
    }
773
    // In-memory backend stub serving deterministic byte patterns for tests.
    struct MockBackend {
        pub metrics: Arc<BackendMetrics>,
    }
777
778    impl BlobReader for MockBackend {
779        fn blob_size(&self) -> BackendResult<u64> {
780            Ok(1)
781        }
782
783        fn try_read(&self, buf: &mut [u8], _offset: u64) -> BackendResult<usize> {
784            let mut i = 0;
785            while i < buf.len() {
786                buf[i] = i as u8;
787                i += 1;
788            }
789            Ok(i)
790        }
791
792        fn metrics(&self) -> &BackendMetrics {
793            // Safe because nydusd must have backend attached with id, only image builder can no id
794            // but use backend instance to upload blob.
795            &self.metrics
796        }
797    }
798
    // SAFETY: MockBackend only wraps an `Arc<BackendMetrics>`; assumed safe to
    // share across threads in tests — TODO confirm BackendMetrics is Sync.
    unsafe impl Send for MockBackend {}
    unsafe impl Sync for MockBackend {}
801
802    impl BlobBackend for MockBackend {
803        fn shutdown(&self) {}
804
805        fn metrics(&self) -> &BackendMetrics {
806            // Safe because nydusd must have backend attached with id, only image builder can no id
807            // but use backend instance to upload blob.
808            &self.metrics
809        }
810
811        fn get_reader(&self, _blob_id: &str) -> BackendResult<Arc<dyn BlobReader>> {
812            Ok(Arc::new(MockBackend {
813                metrics: self.metrics.clone(),
814            }))
815        }
816    }
817
818    #[test]
819    fn test_chunk_key_from() {
820        let cw = ChunkWrapper::new(RafsVersion::V5);
821        matches!(ChunkKey::from(&cw), ChunkKey::Digest(_));
822
823        let cw = ChunkWrapper::new(RafsVersion::V6);
824        matches!(ChunkKey::from(&cw), ChunkKey::Offset(_, _));
825
826        let chunk = Arc::new(MockChunkInfo {
827            block_id: Default::default(),
828            blob_index: 2,
829            flags: BlobChunkFlags::empty(),
830            compress_size: 0x800,
831            uncompress_size: 0x1000,
832            compress_offset: 0x800,
833            uncompress_offset: 0x1000,
834            file_offset: 0x1000,
835            index: 1,
836            crc32: 0,
837        }) as Arc<dyn BlobChunkInfo>;
838        let cw = ChunkWrapper::Ref(chunk);
839        ChunkKey::from(&cw);
840    }
841
    // End-to-end check of ChunkSet: dedup accounting, merge, and dump through
    // the mock backend into a temp file.
    #[test]
    fn test_chunk_set() {
        let mut chunk_set1 = ChunkSet::new();

        let mut chunk_wrapper1 = ChunkWrapper::new(RafsVersion::V5);
        chunk_wrapper1.set_id(RafsDigest { data: [1u8; 32] });
        chunk_wrapper1.set_compressed_size(8);
        let mut chunk_wrapper2 = ChunkWrapper::new(RafsVersion::V6);
        chunk_wrapper2.set_compressed_size(16);

        // total_size accumulates the compressed size of distinct chunks.
        chunk_set1.add_chunk(&chunk_wrapper1);
        chunk_set1.add_chunk(&chunk_wrapper2);
        assert_eq!(chunk_set1.total_size, 24);

        let chunk_key2 = ChunkKey::from(&chunk_wrapper2);
        assert_eq!(
            format!("{:?}", Some(chunk_wrapper2)),
            format!("{:?}", chunk_set1.get_chunk(&chunk_key2))
        );

        let mut chunk_wrapper3 = ChunkWrapper::new(RafsVersion::V5);
        chunk_wrapper3.set_id(RafsDigest { data: [3u8; 32] });
        chunk_wrapper3.set_compressed_size(32);

        // merge folds chunk_set1 into chunk_set2 without duplicates.
        let mut chunk_set2 = ChunkSet::new();
        chunk_set2.add_chunk(&chunk_wrapper3);
        chunk_set2.merge(chunk_set1);
        assert_eq!(chunk_set2.total_size, 56);
        assert_eq!(chunk_set2.chunks.len(), 3);

        let build_ctx = BuildContext::default();
        let tmp_file = TempFile::new().unwrap();
        let blob_storage = ArtifactStorage::SingleFile(PathBuf::from(tmp_file.as_path()));
        let cipher_object = Algorithm::Aes256Xts.new_cipher().unwrap();
        let mut new_blob_ctx = BlobContext::new(
            "blob_id".to_owned(),
            0,
            BlobFeatures::all(),
            compress::Algorithm::Lz4Block,
            digest::Algorithm::Sha256,
            crypt::Algorithm::Aes256Xts,
            Arc::new(cipher_object),
            None,
            false,
        );
        let ori_blob_ids = ["1".to_owned(), "2".to_owned()];
        let backend = Arc::new(MockBackend {
            metrics: BackendMetrics::new("id", "backend_type"),
        }) as Arc<dyn BlobBackend + Send + Sync>;

        // Dump all three chunks into a new blob and collect the change ops.
        let mut res = chunk_set2
            .dump(
                &build_ctx,
                blob_storage,
                &ori_blob_ids,
                &mut new_blob_ctx,
                0,
                true,
                &backend,
            )
            .unwrap();

        // Sort by original digest to make the expectations order-independent.
        res.sort_by(|a, b| a.0.id().data.cmp(&b.0.id().data));

        assert_eq!(res.len(), 3);
        assert_eq!(
            format!("{:?}", res[0].1.id()),
            format!("{:?}", RafsDigest { data: [0u8; 32] })
        );
        assert_eq!(
            format!("{:?}", res[1].1.id()),
            format!("{:?}", RafsDigest { data: [1u8; 32] })
        );
        assert_eq!(
            format!("{:?}", res[2].1.id()),
            format!("{:?}", RafsDigest { data: [3u8; 32] })
        );
    }
920
921    #[test]
922    fn test_state() {
923        let state = State::Rebuild(ChunkSet::new());
924        assert!(state.is_rebuild());
925        let state = State::ChunkDict;
926        assert!(state.is_from_dict());
927        let state = State::default();
928        assert!(state.is_invalid());
929
930        let mut chunk_set1 = ChunkSet::new();
931        let mut chunk_wrapper1 = ChunkWrapper::new(RafsVersion::V5);
932        chunk_wrapper1.set_id(RafsDigest { data: [1u8; 32] });
933        chunk_wrapper1.set_compressed_size(8);
934        chunk_set1.add_chunk(&chunk_wrapper1);
935        let mut state1 = State::Original(chunk_set1);
936        assert_eq!(state1.chunk_total_size().unwrap(), 8);
937
938        let mut chunk_wrapper2 = ChunkWrapper::new(RafsVersion::V6);
939        chunk_wrapper2.set_compressed_size(16);
940        let mut chunk_set2 = ChunkSet::new();
941        chunk_set2.add_chunk(&chunk_wrapper2);
942        let mut state2 = State::Rebuild(chunk_set2);
943        assert_eq!(state2.chunk_total_size().unwrap(), 16);
944
945        assert!(state1.merge_blob(state2.clone()).is_err());
946        assert!(state2.merge_blob(state1).is_ok());
947        assert!(state2.merge_blob(State::Invalid).is_err());
948
949        assert_eq!(state2.chunk_total_size().unwrap(), 24);
950        assert!(State::Delete.chunk_total_size().is_err());
951    }
952
953    #[test]
954    fn test_apply_chunk_change() {
955        let mut chunk_wrapper1 = ChunkWrapper::new(RafsVersion::V5);
956        chunk_wrapper1.set_id(RafsDigest { data: [1u8; 32] });
957        chunk_wrapper1.set_uncompressed_size(8);
958        chunk_wrapper1.set_compressed_size(8);
959
960        let mut chunk_wrapper2 = ChunkWrapper::new(RafsVersion::V6);
961        chunk_wrapper2.set_uncompressed_size(16);
962        chunk_wrapper2.set_compressed_size(16);
963
964        assert!(apply_chunk_change(&chunk_wrapper1, &mut chunk_wrapper2).is_err());
965        chunk_wrapper2.set_uncompressed_size(8);
966        assert!(apply_chunk_change(&chunk_wrapper1, &mut chunk_wrapper2).is_err());
967
968        chunk_wrapper2.set_compressed_size(8);
969        chunk_wrapper1.set_blob_index(0x10);
970        chunk_wrapper1.set_index(0x20);
971        chunk_wrapper1.set_uncompressed_offset(0x30);
972        chunk_wrapper1.set_compressed_offset(0x40);
973        assert!(apply_chunk_change(&chunk_wrapper1, &mut chunk_wrapper2).is_ok());
974        assert_eq!(chunk_wrapper2.blob_index(), 0x10);
975        assert_eq!(chunk_wrapper2.index(), 0x20);
976        assert_eq!(chunk_wrapper2.uncompressed_offset(), 0x30);
977        assert_eq!(chunk_wrapper2.compressed_offset(), 0x40);
978    }
979
    /// Build a `BlobCompactor` for tests: load a chunk dict from the
    /// `rafs-v5.boot` texture fixture (relative to `$CARGO_MANIFEST_DIR`),
    /// attach a mock backend and a minimal single-node bootstrap tree.
    fn create_blob_compactor() -> Result<BlobCompactor> {
        let root_dir = &std::env::var("CARGO_MANIFEST_DIR").expect("$CARGO_MANIFEST_DIR");
        let mut source_path = PathBuf::from(root_dir);
        source_path.push("../tests/texture/bootstrap/rafs-v5.boot");
        let path = source_path.to_str().unwrap();
        // Config must match how the fixture bootstrap was produced
        // (v5, lz4_block, blake3) or loading the dict would fail.
        let rafs_config = RafsSuperConfig {
            version: RafsVersion::V5,
            compressor: compress::Algorithm::Lz4Block,
            digester: digest::Algorithm::Blake3,
            chunk_size: 0x100000,
            batch_size: 0,
            explicit_uidgid: true,
            is_tarfs_mode: false,
        };
        let dict =
            HashChunkDict::from_commandline_arg(path, Arc::new(ConfigV2::default()), &rafs_config)
                .unwrap();

        let mut ori_blob_mgr = BlobManager::new(digest::Algorithm::Sha256, false);
        ori_blob_mgr.set_chunk_dict(dict);

        // Backend never performs real I/O in these tests.
        let backend = Arc::new(MockBackend {
            metrics: BackendMetrics::new("id", "backend_type"),
        });

        // A single temp file wrapped in a one-node tree gives the compactor a
        // valid (if trivial) bootstrap to walk.
        let tmpdir = TempDir::new()?;
        let tmpfile = TempFile::new_in(tmpdir.as_path())?;
        let node = Node::from_fs_object(
            RafsVersion::V6,
            tmpdir.as_path().to_path_buf(),
            tmpfile.as_path().to_path_buf(),
            Overlay::UpperAddition,
            RAFS_DEFAULT_CHUNK_SIZE as u32,
            0,
            true,
            false,
        )?;
        let tree = Tree::new(node);
        let bootstrap = Bootstrap::new(tree)?;

        BlobCompactor::new(
            RafsVersion::V6,
            ori_blob_mgr,
            backend,
            digest::Algorithm::Sha256,
            &bootstrap,
        )
    }
1028
1029    #[test]
1030    fn test_blob_compactor_new() {
1031        let compactor = create_blob_compactor();
1032        assert!(compactor.is_ok());
1033        assert!(compactor.unwrap().is_v6());
1034    }
1035
1036    #[test]
1037    fn test_blob_compactor_load_chunk_dict_blobs() {
1038        let mut compactor = create_blob_compactor().unwrap();
1039        let chunk_dict = compactor.get_chunk_dict();
1040        let n = chunk_dict.get_blobs().len();
1041        for i in 0..n {
1042            chunk_dict.set_real_blob_idx(i as u32, i as u32);
1043        }
1044        compactor.states = vec![State::default(); n + 1];
1045        compactor.load_chunk_dict_blobs();
1046
1047        assert_eq!(compactor.states.len(), n + 1);
1048        assert!(compactor.states[0].is_from_dict());
1049        assert!(compactor.states[n >> 1].is_from_dict());
1050        assert!(compactor.states[n - 1].is_from_dict());
1051        assert!(!compactor.states[n].is_from_dict());
1052    }
1053
    /// Build a compactor, feed a two-file tree whose second file carries three
    /// chunks, then run `load_and_dedup_chunks` and verify the resulting
    /// chunk→nodes (`c2nodes`) and blob→nodes (`b2nodes`) indexes.
    fn blob_compactor_load_and_dedup_chunks() -> Result<BlobCompactor> {
        let mut compactor = create_blob_compactor()?;

        // chunk1: v5, keyed by digest, lives in blob 1 and is also present in
        // the chunk dict below, so it is subject to dedup.
        let mut chunk1 = ChunkWrapper::new(RafsVersion::V5);
        chunk1.set_id(RafsDigest { data: [1u8; 32] });
        chunk1.set_uncompressed_size(0);
        chunk1.set_compressed_offset(0x11);
        chunk1.set_blob_index(1);
        let node_chunk1 = NodeChunk {
            source: crate::ChunkSource::Dict,
            inner: Arc::new(chunk1.clone()),
        };
        // chunk2 and chunk3: v6, keyed by (blob_index, offset); they share
        // blob index 2 and offset 0x22, so they share the same ChunkKey.
        let mut chunk2 = ChunkWrapper::new(RafsVersion::V6);
        chunk2.set_id(RafsDigest { data: [2u8; 32] });
        chunk2.set_uncompressed_size(0x20);
        chunk2.set_compressed_offset(0x22);
        chunk2.set_blob_index(2);
        let node_chunk2 = NodeChunk {
            source: crate::ChunkSource::Dict,
            inner: Arc::new(chunk2.clone()),
        };
        let mut chunk3 = ChunkWrapper::new(RafsVersion::V6);
        chunk3.set_id(RafsDigest { data: [3u8; 32] });
        chunk3.set_uncompressed_size(0x20);
        chunk3.set_compressed_offset(0x22);
        chunk3.set_blob_index(2);
        let node_chunk3 = NodeChunk {
            source: crate::ChunkSource::Dict,
            inner: Arc::new(chunk3.clone()),
        };

        // Dict containing chunk1 (plus a default chunk) to exercise dedup.
        let mut chunk_dict = HashChunkDict::new(digest::Algorithm::Sha256);
        chunk_dict.add_chunk(
            Arc::new(ChunkWrapper::new(RafsVersion::V5)),
            digest::Algorithm::Sha256,
        );
        chunk_dict.add_chunk(Arc::new(chunk1.clone()), digest::Algorithm::Sha256);
        compactor.ori_blob_mgr.set_chunk_dict(Arc::new(chunk_dict));

        compactor.states = vec![State::ChunkDict; 5];

        // Root node without chunks plus a child node that owns the three
        // chunks above.
        let tmpdir = TempDir::new()?;
        let tmpfile = TempFile::new_in(tmpdir.as_path())?;
        let node = Node::from_fs_object(
            RafsVersion::V6,
            tmpdir.as_path().to_path_buf(),
            tmpfile.as_path().to_path_buf(),
            Overlay::UpperAddition,
            RAFS_DEFAULT_CHUNK_SIZE as u32,
            0,
            true,
            false,
        )?;
        let mut tree = Tree::new(node);
        let tmpfile2 = TempFile::new_in(tmpdir.as_path())?;
        let mut node = Node::from_fs_object(
            RafsVersion::V6,
            tmpdir.as_path().to_path_buf(),
            tmpfile2.as_path().to_path_buf(),
            Overlay::UpperAddition,
            RAFS_DEFAULT_CHUNK_SIZE as u32,
            0,
            true,
            false,
        )?;
        node.chunks.push(node_chunk1);
        node.chunks.push(node_chunk2);
        node.chunks.push(node_chunk3);
        let tree2 = Tree::new(node);
        tree.insert_child(tree2);

        let bootstrap = Bootstrap::new(tree)?;

        assert!(compactor.load_and_dedup_chunks(&bootstrap).is_ok());
        // Two distinct chunk keys (chunk2/chunk3 collapse into one) and two
        // distinct blob indexes (1 and 2).
        assert_eq!(compactor.c2nodes.len(), 2);
        assert_eq!(compactor.b2nodes.len(), 2);

        let chunk_key1 = ChunkKey::from(&chunk1);
        assert!(compactor.c2nodes.contains_key(&chunk_key1));
        assert_eq!(compactor.c2nodes.get(&chunk_key1).unwrap().len(), 1);
        // Blob 2 is referenced by both chunk2 and chunk3.
        assert!(compactor.b2nodes.contains_key(&chunk2.blob_index()));
        assert_eq!(
            compactor.b2nodes.get(&chunk2.blob_index()).unwrap().len(),
            2
        );

        Ok(compactor)
    }
1142
1143    #[test]
1144    fn test_blob_compactor_load_and_dedup_chunks() {
1145        assert!(blob_compactor_load_and_dedup_chunks().is_ok());
1146    }
1147
1148    #[test]
1149    fn test_blob_compactor_dump_new_blobs() {
1150        let tmp_dir = TempDir::new().unwrap();
1151        let build_ctx = BuildContext::new(
1152            "build_ctx".to_string(),
1153            false,
1154            0,
1155            compress::Algorithm::Lz4Block,
1156            digest::Algorithm::Sha256,
1157            true,
1158            WhiteoutSpec::None,
1159            ConversionType::DirectoryToRafs,
1160            PathBuf::from(tmp_dir.as_path()),
1161            Default::default(),
1162            None,
1163            None,
1164            false,
1165            Features::new(),
1166            false,
1167            Attributes::default(),
1168        );
1169
1170        let mut compactor = blob_compactor_load_and_dedup_chunks().unwrap();
1171
1172        let blob_ctx1 = BlobContext::new(
1173            "blob_id1".to_owned(),
1174            0,
1175            build_ctx.blob_features,
1176            build_ctx.compressor,
1177            build_ctx.digester,
1178            build_ctx.cipher,
1179            Default::default(),
1180            None,
1181            false,
1182        );
1183        let blob_ctx2 = BlobContext::new(
1184            "blob_id2".to_owned(),
1185            0,
1186            build_ctx.blob_features,
1187            build_ctx.compressor,
1188            build_ctx.digester,
1189            build_ctx.cipher,
1190            Default::default(),
1191            None,
1192            false,
1193        );
1194        let blob_ctx3 = BlobContext::new(
1195            "blob_id3".to_owned(),
1196            0,
1197            build_ctx.blob_features,
1198            build_ctx.compressor,
1199            build_ctx.digester,
1200            build_ctx.cipher,
1201            Default::default(),
1202            None,
1203            false,
1204        );
1205        let blob_ctx4 = BlobContext::new(
1206            "blob_id4".to_owned(),
1207            0,
1208            build_ctx.blob_features,
1209            build_ctx.compressor,
1210            build_ctx.digester,
1211            build_ctx.cipher,
1212            Default::default(),
1213            None,
1214            false,
1215        );
1216        let blob_ctx5 = BlobContext::new(
1217            "blob_id5".to_owned(),
1218            0,
1219            build_ctx.blob_features,
1220            build_ctx.compressor,
1221            build_ctx.digester,
1222            build_ctx.cipher,
1223            Default::default(),
1224            None,
1225            false,
1226        );
1227        compactor.ori_blob_mgr.add_blob(blob_ctx1);
1228        compactor.ori_blob_mgr.add_blob(blob_ctx2);
1229        compactor.ori_blob_mgr.add_blob(blob_ctx3);
1230        compactor.ori_blob_mgr.add_blob(blob_ctx4);
1231        compactor.ori_blob_mgr.add_blob(blob_ctx5);
1232
1233        compactor.states[0] = State::Invalid;
1234
1235        let tmp_dir = TempDir::new().unwrap();
1236        let dir = tmp_dir.as_path().to_str().unwrap();
1237        assert!(compactor.dump_new_blobs(&build_ctx, dir, true).is_err());
1238
1239        compactor.states = vec![
1240            State::Delete,
1241            State::ChunkDict,
1242            State::Original(ChunkSet::new()),
1243            State::Rebuild(ChunkSet::new()),
1244            State::Delete,
1245        ];
1246        assert!(compactor.dump_new_blobs(&build_ctx, dir, true).is_ok());
1247        assert_eq!(compactor.ori_blob_mgr.len(), 3);
1248    }
1249
1250    #[test]
1251    fn test_blob_compactor_do_compact() {
1252        let mut compactor = blob_compactor_load_and_dedup_chunks().unwrap();
1253
1254        let tmp_dir = TempDir::new().unwrap();
1255        let build_ctx = BuildContext::new(
1256            "build_ctx".to_string(),
1257            false,
1258            0,
1259            compress::Algorithm::Lz4Block,
1260            digest::Algorithm::Sha256,
1261            true,
1262            WhiteoutSpec::None,
1263            ConversionType::DirectoryToRafs,
1264            PathBuf::from(tmp_dir.as_path()),
1265            Default::default(),
1266            None,
1267            None,
1268            false,
1269            Features::new(),
1270            false,
1271            Attributes::default(),
1272        );
1273        let mut blob_ctx1 = BlobContext::new(
1274            "blob_id1".to_owned(),
1275            0,
1276            build_ctx.blob_features,
1277            build_ctx.compressor,
1278            build_ctx.digester,
1279            build_ctx.cipher,
1280            Default::default(),
1281            None,
1282            false,
1283        );
1284        blob_ctx1.compressed_blob_size = 2;
1285        let mut blob_ctx2 = BlobContext::new(
1286            "blob_id2".to_owned(),
1287            0,
1288            build_ctx.blob_features,
1289            build_ctx.compressor,
1290            build_ctx.digester,
1291            build_ctx.cipher,
1292            Default::default(),
1293            None,
1294            false,
1295        );
1296        blob_ctx2.compressed_blob_size = 0;
1297        let blob_ctx3 = BlobContext::new(
1298            "blob_id3".to_owned(),
1299            0,
1300            build_ctx.blob_features,
1301            build_ctx.compressor,
1302            build_ctx.digester,
1303            build_ctx.cipher,
1304            Default::default(),
1305            None,
1306            false,
1307        );
1308        let blob_ctx4 = BlobContext::new(
1309            "blob_id4".to_owned(),
1310            0,
1311            build_ctx.blob_features,
1312            build_ctx.compressor,
1313            build_ctx.digester,
1314            build_ctx.cipher,
1315            Default::default(),
1316            None,
1317            false,
1318        );
1319        let blob_ctx5 = BlobContext::new(
1320            "blob_id5".to_owned(),
1321            0,
1322            build_ctx.blob_features,
1323            build_ctx.compressor,
1324            build_ctx.digester,
1325            build_ctx.cipher,
1326            Default::default(),
1327            None,
1328            false,
1329        );
1330        compactor.ori_blob_mgr.add_blob(blob_ctx1);
1331        compactor.ori_blob_mgr.add_blob(blob_ctx2);
1332        compactor.ori_blob_mgr.add_blob(blob_ctx3);
1333        compactor.ori_blob_mgr.add_blob(blob_ctx4);
1334        compactor.ori_blob_mgr.add_blob(blob_ctx5);
1335
1336        let mut chunk_set1 = ChunkSet::new();
1337        chunk_set1.total_size = 4;
1338        let mut chunk_set2 = ChunkSet::new();
1339        chunk_set2.total_size = 6;
1340        let mut chunk_set3 = ChunkSet::new();
1341        chunk_set3.total_size = 5;
1342
1343        compactor.states = vec![
1344            State::Original(chunk_set1),
1345            State::Original(chunk_set2),
1346            State::Rebuild(chunk_set3),
1347            State::ChunkDict,
1348            State::Invalid,
1349        ];
1350
1351        let cfg = Config {
1352            min_used_ratio: 50,
1353            compact_blob_size: 10,
1354            max_compact_size: 8,
1355            layers_to_compact: 0,
1356            blobs_dir: "blobs_dir".to_string(),
1357        };
1358
1359        assert!(compactor.do_compact(&cfg).is_ok());
1360        assert!(!compactor.states.last().unwrap().is_invalid());
1361    }
1362}