1use std::collections::hash_map::Entry;
7use std::collections::HashMap;
8use std::io::Write;
9use std::mem;
10use std::ops::Deref;
11use std::path::PathBuf;
12use std::sync::Arc;
13
14use anyhow::{bail, ensure, Result};
15use nydus_rafs::metadata::chunk::ChunkWrapper;
16use nydus_rafs::metadata::{RafsSuper, RafsVersion};
17use nydus_storage::backend::BlobBackend;
18use nydus_storage::utils::alloc_buf;
19use nydus_utils::digest::RafsDigest;
20use nydus_utils::{digest, try_round_up_4k};
21use serde::{Deserialize, Serialize};
22use sha2::Digest;
23
24use crate::attributes::Attributes;
25use crate::core::context::Artifact;
26
27use super::core::blob::Blob;
28use super::core::bootstrap::Bootstrap;
29use super::{
30 ArtifactStorage, ArtifactWriter, BlobContext, BlobManager, BootstrapManager, BuildContext,
31 BuildOutput, ChunkDict, ConversionType, Features, Tree, TreeNode, WhiteoutSpec,
32};
33
/// Default lower bound for the size of a data blob to be considered for merging (10 MiB).
const DEFAULT_COMPACT_BLOB_SIZE: usize = 10 * 1024 * 1024;
/// Default upper bound for the total size of a blob produced by merging (100 MiB).
const DEFAULT_MAX_COMPACT_SIZE: usize = 100 * 1024 * 1024;

/// Default value for `Config::compact_blob_size`.
const fn default_compact_blob_size() -> usize {
    DEFAULT_COMPACT_BLOB_SIZE
}

/// Default value for `Config::max_compact_size`.
const fn default_max_compact_size() -> usize {
    DEFAULT_MAX_COMPACT_SIZE
}
44
45#[derive(Clone, Deserialize, Serialize)]
46pub struct Config {
47 pub min_used_ratio: u8,
53 pub compact_blob_size: usize,
55 pub max_compact_size: usize,
57 pub layers_to_compact: usize,
60 pub blobs_dir: String,
64}
65
66impl Default for Config {
67 fn default() -> Self {
68 Self {
69 min_used_ratio: 0,
70 compact_blob_size: default_compact_blob_size(),
71 max_compact_size: default_max_compact_size(),
72 layers_to_compact: 0,
73 blobs_dir: String::new(),
74 }
75 }
76}
77
/// Key used to deduplicate chunks across blobs.
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
enum ChunkKey {
    /// RAFS v5 (and `Ref` wrappers) identify a chunk by its digest.
    Digest(RafsDigest),
    /// RAFS v6 identifies a chunk by (blob index, compressed offset).
    Offset(u32, u64),
}
85
86impl ChunkKey {
87 fn from(c: &ChunkWrapper) -> Self {
88 match c {
89 ChunkWrapper::V5(_) => Self::Digest(*c.id()),
90 ChunkWrapper::V6(_) => Self::Offset(c.blob_index(), c.compressed_offset()),
91 ChunkWrapper::Ref(_) => Self::Digest(*c.id()),
92 }
93 }
94}
95
/// A deduplicated collection of chunks plus their accumulated compressed size.
#[derive(Clone, Debug)]
struct ChunkSet {
    // Chunks indexed by dedup key; duplicate inserts are dropped.
    chunks: HashMap<ChunkKey, ChunkWrapper>,
    // Sum of compressed sizes of all chunks currently in `chunks`.
    total_size: usize,
}
101
102impl ChunkSet {
103 fn new() -> Self {
104 Self {
105 chunks: Default::default(),
106 total_size: 0,
107 }
108 }
109
110 fn add_chunk(&mut self, chunk: &ChunkWrapper) {
111 let key = ChunkKey::from(chunk);
112 if let Entry::Vacant(e) = self.chunks.entry(key) {
113 e.insert(chunk.clone());
114 self.total_size += chunk.compressed_size() as usize;
115 }
116 }
117
    /// Look up a chunk by its deduplication key.
    fn get_chunk(&self, key: &ChunkKey) -> Option<&ChunkWrapper> {
        self.chunks.get(key)
    }
121
122 fn merge(&mut self, other: Self) {
123 for (_, c) in other.chunks.iter() {
124 self.add_chunk(c);
125 }
126 }
127
    /// Copy every chunk of this set out of its original blob into a brand
    /// new blob, updating the bookkeeping in `new_blob_ctx` as it goes.
    ///
    /// Returns `(old_chunk, new_chunk)` pairs so the caller can patch the
    /// filesystem metadata to reference the new blob.
    #[allow(clippy::too_many_arguments)]
    fn dump(
        &self,
        build_ctx: &BuildContext,
        blob_storage: ArtifactStorage,
        ori_blob_ids: &[String],
        new_blob_ctx: &mut BlobContext,
        new_blob_idx: u32,
        aligned_chunk: bool,
        backend: &Arc<dyn BlobBackend + Send + Sync>,
    ) -> Result<Vec<(ChunkWrapper, ChunkWrapper)>> {
        let mut blob_writer = ArtifactWriter::new(blob_storage)?;
        let mut chunks = self.chunks.values().collect::<Vec<&ChunkWrapper>>();
        // Sort by (blob index, compressed offset) so each source blob is
        // read sequentially.
        chunks.sort_by(|a, b| {
            if (*a).blob_index() == (*b).blob_index() {
                (*a).compressed_offset().cmp(&(*b).compressed_offset())
            } else {
                (*a).blob_index().cmp(&(*b).blob_index())
            }
        });

        let mut changed_chunks = Vec::new();
        for chunk in chunks {
            let blob_idx = chunk.blob_index();
            // NOTE(review): `expect` panics on backend I/O failure here and
            // below; consider propagating the error via `?` instead.
            let reader = backend
                .get_reader(&ori_blob_ids[blob_idx as usize])
                .expect("get blob err");
            let mut buf = alloc_buf(chunk.compressed_size() as usize);
            reader
                .read(&mut buf, chunk.compressed_offset())
                .expect("read blob data err");
            blob_writer.write_all(&buf)?;

            // Re-home the chunk: new blob index and offsets within it.
            let mut new_chunk = chunk.clone();
            new_chunk.set_index(new_blob_ctx.chunk_count);
            new_chunk.set_blob_index(new_blob_idx);
            new_chunk.set_compressed_offset(new_blob_ctx.current_compressed_offset);
            new_chunk.set_uncompressed_offset(new_blob_ctx.current_uncompressed_offset);
            new_blob_ctx.add_chunk_meta_info(&new_chunk, None)?;
            changed_chunks.push((chunk.clone(), new_chunk));

            // Advance the running counters of the new blob.
            new_blob_ctx.blob_hash.update(&buf);
            new_blob_ctx.chunk_count += 1;
            new_blob_ctx.current_compressed_offset += chunk.compressed_size() as u64;
            new_blob_ctx.compressed_blob_size += chunk.compressed_size() as u64;

            // The uncompressed layout may be 4K-aligned depending on build options.
            let aligned_size = if aligned_chunk {
                try_round_up_4k(chunk.uncompressed_size()).unwrap()
            } else {
                chunk.uncompressed_size() as u64
            };
            new_blob_ctx.current_uncompressed_offset += aligned_size;
            new_blob_ctx.uncompressed_blob_size += aligned_size;
        }
        // The new blob id is the hex digest of all data written into it.
        new_blob_ctx.blob_id = format!("{:x}", new_blob_ctx.blob_hash.clone().finalize());

        Blob::dump_meta_data(build_ctx, new_blob_ctx, &mut blob_writer)?;
        let blob_id = new_blob_ctx.blob_id();
        blob_writer.finalize(blob_id)?;

        Ok(changed_chunks)
    }
196}
197
/// Per-blob compaction decision/state.
#[derive(Clone, Debug, Default)]
enum State {
    /// Blob belongs to the chunk dictionary; keep it untouched.
    ChunkDict,
    /// Blob is no longer referenced and will be dropped.
    Delete,
    /// Not yet classified (also used as a placeholder during state swaps).
    #[default]
    Invalid,
    /// Blob is kept as-is; tracks the chunks still referenced from it.
    Original(ChunkSet),
    /// Blob will be rebuilt from the recorded chunk set.
    Rebuild(ChunkSet),
}
209
210impl State {
211 fn is_rebuild(&self) -> bool {
212 matches!(self, Self::Rebuild(_))
213 }
214
215 fn is_from_dict(&self) -> bool {
216 matches!(self, Self::ChunkDict)
217 }
218
219 fn is_invalid(&self) -> bool {
220 matches!(self, Self::Invalid)
221 }
222
223 fn merge_blob(&mut self, other: Self) -> Result<()> {
224 let merge_cs = match other {
225 State::Original(cs) => cs,
226 State::Rebuild(cs) => cs,
227 _ => bail!("invalid state"),
228 };
229 match self {
230 State::Rebuild(cs) => {
231 cs.merge(merge_cs);
232 }
233 _ => bail!("invalid state"),
234 }
235 Ok(())
236 }
237
238 fn chunk_total_size(&self) -> Result<usize> {
239 Ok(match self {
240 State::Original(cs) => cs.total_size,
241 State::Rebuild(cs) => cs.total_size,
242 _ => bail!("invalid state"),
243 })
244 }
245}
246
247#[inline]
248fn apply_chunk_change(from: &ChunkWrapper, to: &mut ChunkWrapper) -> Result<()> {
249 ensure!(
250 to.uncompressed_size() == from.uncompressed_size(),
251 "different uncompress size"
252 );
253 ensure!(
254 to.compressed_size() == from.compressed_size(),
255 "different compressed size"
256 );
257
258 to.set_blob_index(from.blob_index());
259 to.set_index(from.index());
260 to.set_uncompressed_offset(from.uncompressed_offset());
261 to.set_compressed_offset(from.compressed_offset());
262 Ok(())
263}
264
/// Drives compaction of RAFS data blobs: deduplicates chunk references,
/// deletes unused blobs, and rebuilds/merges sparsely used ones.
pub struct BlobCompactor {
    /// RAFS filesystem version of the image being compacted.
    version: RafsVersion,
    /// Per-blob compaction state, indexed by original blob index.
    states: Vec<State>,
    /// Manager holding the original (pre-compaction) blobs.
    ori_blob_mgr: BlobManager,
    /// Manager collecting the blobs of the compacted image.
    new_blob_mgr: BlobManager,
    /// Maps a chunk key to every (node, chunk index) referencing it.
    c2nodes: HashMap<ChunkKey, Vec<(TreeNode, usize)>>,
    /// Maps a blob index to every (node, chunk index) referencing it.
    b2nodes: HashMap<u32, Vec<(TreeNode, usize)>>,
    /// Backend used to read original blob data.
    backend: Arc<dyn BlobBackend + Send + Sync>,
}
282
283impl BlobCompactor {
284 fn new(
286 version: RafsVersion,
287 ori_blob_mgr: BlobManager,
288 backend: Arc<dyn BlobBackend + Send + Sync>,
289 digester: digest::Algorithm,
290 bootstrap: &Bootstrap,
291 ) -> Result<Self> {
292 let ori_blobs_number = ori_blob_mgr.len();
293 let mut compactor = Self {
294 version,
295 states: vec![Default::default(); ori_blobs_number],
296 ori_blob_mgr,
297 new_blob_mgr: BlobManager::new(digester, false),
298 c2nodes: HashMap::new(),
299 b2nodes: HashMap::new(),
300 backend,
301 };
302 compactor.load_chunk_dict_blobs();
303 compactor.load_and_dedup_chunks(bootstrap)?;
304 Ok(compactor)
305 }
306
    /// Whether the image being compacted uses the RAFS v6 format.
    fn is_v6(&self) -> bool {
        self.version.is_v6()
    }
310
311 fn load_and_dedup_chunks(&mut self, bootstrap: &Bootstrap) -> Result<()> {
312 let mut all_chunks = ChunkSet::new();
313 let chunk_dict = self.get_chunk_dict();
314
315 let cb = &mut |n: &Tree| -> Result<()> {
316 let mut node = n.borrow_mut_node();
317 for chunk_idx in 0..node.chunks.len() {
318 let chunk = &mut node.chunks[chunk_idx];
319 let chunk_key = ChunkKey::from(&chunk.inner);
320
321 if self.states[chunk.inner.blob_index() as usize].is_from_dict() {
322 if let Some(c) =
324 chunk_dict.get_chunk(chunk.inner.id(), chunk.inner.uncompressed_size())
325 {
326 let mut chunk_inner = chunk.inner.deref().clone();
327 apply_chunk_change(c, &mut chunk_inner)?;
328 chunk.inner = Arc::new(chunk_inner);
329 } else if let Some(c) = all_chunks.get_chunk(&chunk_key) {
330 let mut chunk_inner = chunk.inner.deref().clone();
331 apply_chunk_change(c, &mut chunk_inner)?;
332 chunk.inner = Arc::new(chunk_inner);
333 } else {
334 all_chunks.add_chunk(&chunk.inner);
335 let blob_index = chunk.inner.blob_index() as usize;
337 if self.states[blob_index].is_invalid() {
338 self.states[blob_index] = State::Original(ChunkSet::new());
339 }
340 if let State::Original(cs) = &mut self.states[blob_index] {
341 cs.add_chunk(&chunk.inner);
342 }
343 }
344 }
345
346 self.c2nodes
348 .entry(chunk_key)
349 .or_default()
350 .push((n.node.clone(), chunk_idx));
351 self.b2nodes
352 .entry(chunk.inner.blob_index())
353 .or_default()
354 .push((n.node.clone(), chunk_idx));
355 }
356 Ok(())
357 };
358
359 bootstrap.tree.walk_bfs(false, cb)
360 }
361
    /// The chunk dictionary associated with the original blob manager.
    fn get_chunk_dict(&self) -> Arc<dyn ChunkDict> {
        self.ori_blob_mgr.get_chunk_dict()
    }
365
366 fn load_chunk_dict_blobs(&mut self) {
367 let chunk_dict = self.get_chunk_dict();
368 let blobs = chunk_dict.get_blobs();
369 for i in 0..blobs.len() {
370 if let Some(real_blob_idx) = chunk_dict.get_real_blob_idx(i as u32) {
371 self.states[real_blob_idx as usize] = State::ChunkDict;
372 }
373 }
374 }
375
376 fn apply_blob_move(&mut self, from: u32, to: u32) -> Result<()> {
377 if let Some(idx_list) = self.b2nodes.get(&from) {
378 for (n, chunk_idx) in idx_list.iter() {
379 let mut node = n.borrow_mut();
380 ensure!(
381 node.chunks[*chunk_idx].inner.blob_index() == from,
382 "unexpected blob_index of chunk"
383 );
384 node.chunks[*chunk_idx].set_blob_index(to);
385 }
386 }
387 Ok(())
388 }
389
390 fn apply_chunk_change(&mut self, c: &(ChunkWrapper, ChunkWrapper)) -> Result<()> {
391 if let Some(chunks) = self.c2nodes.get(&ChunkKey::from(&c.0)) {
392 for (n, chunk_idx) in chunks.iter() {
393 let mut node = n.borrow_mut();
394 let chunk = &mut node.chunks[*chunk_idx];
395 let mut chunk_inner = chunk.inner.deref().clone();
396 apply_chunk_change(&c.1, &mut chunk_inner)?;
397 chunk.inner = Arc::new(chunk_inner);
398 }
399 }
400 Ok(())
401 }
402
403 fn delete_unused_blobs(&mut self) {
404 for i in 0..self.states.len() {
405 if self.states[i].is_invalid() {
406 info!(
407 "compactor: delete unused blob {}",
408 self.ori_blob_mgr.get_blob(i).unwrap().blob_id
409 );
410 self.states[i] = State::Delete;
411 }
412 }
413 }
414
415 fn prepare_to_rebuild(&mut self, idx: usize) -> Result<()> {
416 if !self.states[idx].is_rebuild() {
417 return Ok(());
418 }
419
420 let mut old = State::Invalid;
421 mem::swap(&mut self.states[idx], &mut old);
422 if let State::Original(cs) = old {
423 self.states[idx] = State::Rebuild(cs);
424 } else {
425 mem::swap(&mut self.states[idx], &mut old);
426 bail!("invalid state");
427 }
428
429 Ok(())
430 }
431
    /// Schedule for rebuilding every `Original` blob whose referenced-data
    /// ratio falls below `ratio` percent.
    fn try_rebuild_blobs(&mut self, ratio: u8) -> Result<()> {
        for idx in 0..self.ori_blob_mgr.len() {
            let blob_info = self.ori_blob_mgr.get_blob(idx).unwrap();
            let used_ratio = match &self.states[idx] {
                State::Original(cs) => {
                    // Fall back to asking the backend when the metadata does
                    // not record the compressed blob size.
                    let compressed_blob_size = if blob_info.compressed_blob_size == 0 {
                        let reader = match self.backend.get_reader(&blob_info.blob_id) {
                            Ok(r) => r,
                            Err(e) => bail!("compactor: failed to get blob reader, {}", e),
                        };
                        match reader.blob_size() {
                            Ok(sz) => sz,
                            Err(e) => bail!("compactor: failed to get blob size, {}", e),
                        }
                    } else {
                        blob_info.compressed_blob_size
                    };
                    (cs.total_size * 100 / compressed_blob_size as usize) as u8
                }
                // Non-Original blobs are never rebuilt on ratio grounds.
                _ => 100_u8,
            };

            info!(
                "compactor: original blob size {}, used data ratio {}%",
                blob_info.blob_id, used_ratio
            );
            if used_ratio < ratio {
                self.prepare_to_rebuild(idx)?;
            }
        }

        Ok(())
    }
465
466 fn merge_blob(&mut self, from: usize, to: usize) -> Result<()> {
467 let mut old = State::Delete;
468 mem::swap(&mut self.states[from], &mut old);
469 self.states[to].merge_blob(old)
470 }
471
    /// Merge small blobs (smaller than `low` bytes) together, greedily
    /// packing them into rebuild targets no larger than `max` bytes.
    fn try_merge_blobs(&mut self, low: usize, max: usize) -> Result<()> {
        let mut need_merge_blobs = Vec::new();
        for idx in 0..self.states.len() {
            let blob_info = self.ori_blob_mgr.get_blob(idx).unwrap();
            match &self.states[idx] {
                State::Original(cs) => {
                    // Prefer the recorded blob size; fall back to the
                    // referenced-chunk total when metadata lacks it.
                    let blob_size = if blob_info.compressed_blob_size == 0 {
                        cs.total_size
                    } else {
                        blob_info.compressed_blob_size as usize
                    };
                    if blob_size < low {
                        info!(
                            "compactor: try to merge blob {} size {}",
                            blob_info.blob_id, blob_size
                        );
                        need_merge_blobs.push((idx, blob_size));
                    }
                }
                State::Rebuild(cs) => {
                    if cs.total_size < low {
                        info!(
                            "compactor: try to merge blob {} size {}",
                            blob_info.blob_id, cs.total_size
                        );
                        need_merge_blobs.push((idx, cs.total_size));
                    }
                }
                _ => {}
            }
        }
        // Merge from smallest to largest; a single candidate has nothing to
        // merge with.
        need_merge_blobs.sort_by(|(_, len1), (_, len2)| len1.cmp(len2));
        if need_merge_blobs.len() < 2 {
            return Ok(());
        }

        let mut merge_to = need_merge_blobs[0].0;
        for (blob_idx, _) in need_merge_blobs.iter().skip(1) {
            let before_size = self.states[merge_to].chunk_total_size()?;
            let append_size = self.states[*blob_idx].chunk_total_size()?;
            if before_size + append_size <= max {
                self.prepare_to_rebuild(merge_to)?;
                self.merge_blob(*blob_idx, merge_to)?;
            } else {
                // Current target would overflow `max`; start a new target.
                merge_to = *blob_idx;
            }
        }

        Ok(())
    }
525
526 fn original_blob_ids(&self) -> Vec<String> {
527 self.ori_blob_mgr
528 .get_blobs()
529 .into_iter()
530 .map(|blob| blob.blob_id.clone())
531 .collect()
532 }
533
    /// Materialize the compaction plan: keep `Original`/`ChunkDict` blobs,
    /// skip `Delete` blobs, and write rebuilt blobs into `dir`, patching
    /// the in-memory metadata to match the new blob layout.
    fn dump_new_blobs(
        &mut self,
        build_ctx: &BuildContext,
        dir: &str,
        aligned_chunk: bool,
    ) -> Result<()> {
        let ori_blob_ids = self.original_blob_ids();
        ensure!(self.states.len() == self.ori_blob_mgr.len());

        for idx in 0..self.states.len() {
            match &self.states[idx] {
                State::Original(_) | State::ChunkDict => {
                    info!("compactor: keep original data blob {}", ori_blob_ids[idx]);
                    let ctx = self.ori_blob_mgr.take_blob(idx);
                    let blob_idx = self.new_blob_mgr.alloc_index()?;
                    // Deleted predecessors may shift the blob index; patch
                    // every chunk that referenced the old index.
                    if blob_idx != idx as u32 {
                        self.apply_blob_move(idx as u32, blob_idx)?;
                    }
                    self.new_blob_mgr.add_blob(ctx);
                }
                State::Delete => {
                    info!("compactor: delete compacted blob {}", ori_blob_ids[idx]);
                }
                State::Rebuild(cs) => {
                    // Rebuilt blobs are written as files under `dir`.
                    let blob_storage =
                        ArtifactStorage::FileDir((PathBuf::from(dir), String::new()));
                    let mut blob_ctx = BlobContext::new(
                        String::from(""),
                        0,
                        build_ctx.blob_features,
                        build_ctx.compressor,
                        build_ctx.digester,
                        build_ctx.cipher,
                        Default::default(),
                        None,
                        false,
                    );
                    blob_ctx.set_meta_info_enabled(self.is_v6());
                    let blob_idx = self.new_blob_mgr.alloc_index()?;
                    let new_chunks = cs.dump(
                        build_ctx,
                        blob_storage,
                        &ori_blob_ids,
                        &mut blob_ctx,
                        blob_idx,
                        aligned_chunk,
                        &self.backend,
                    )?;
                    // Redirect all metadata references to the new chunks.
                    for change_chunk in new_chunks.iter() {
                        self.apply_chunk_change(change_chunk)?;
                    }
                    info!("compactor: successfully rebuild blob {}", blob_ctx.blob_id);
                    self.new_blob_mgr.add_blob(blob_ctx);
                }
                State::Invalid => bail!("compactor: invalid state for blob {}", ori_blob_ids[idx]),
            }
        }

        Ok(())
    }
595
    /// Run the three compaction phases in order: drop unused blobs, rebuild
    /// sparsely used ones, then merge small blobs together.
    fn do_compact(&mut self, cfg: &Config) -> Result<()> {
        self.delete_unused_blobs();
        self.try_rebuild_blobs(cfg.min_used_ratio)?;
        self.try_merge_blobs(cfg.compact_blob_size, cfg.max_compact_size)?;
        Ok(())
    }
602
    /// Entry point: compact the data blobs of the RAFS image `rs`, writing
    /// a new bootstrap to `d_bootstrap` and rebuilt blobs to
    /// `cfg.blobs_dir`.
    ///
    /// Returns `Ok(None)` when compaction is skipped (too few layers) or
    /// produced no blobs; otherwise the build output of the new image.
    pub fn compact(
        rs: RafsSuper,
        d_bootstrap: PathBuf,
        chunk_dict: Option<Arc<dyn ChunkDict>>,
        backend: Arc<dyn BlobBackend + Send + Sync>,
        cfg: &Config,
    ) -> Result<Option<BuildOutput>> {
        // Build context mirroring the source image's format parameters.
        let mut build_ctx = BuildContext::new(
            "".to_string(),
            false,
            0,
            rs.meta.get_compressor(),
            rs.meta.get_digester(),
            rs.meta.explicit_uidgid(),
            WhiteoutSpec::None,
            ConversionType::DirectoryToRafs,
            PathBuf::from(""),
            Default::default(),
            None,
            None,
            false,
            Features::new(),
            false,
            Attributes::default(),
        );
        let mut bootstrap_mgr =
            BootstrapManager::new(Some(ArtifactStorage::SingleFile(d_bootstrap)), None);
        let mut bootstrap_ctx = bootstrap_mgr.create_ctx()?;
        let mut ori_blob_mgr = BlobManager::new(rs.meta.get_digester(), false);
        ori_blob_mgr.extend_from_blob_table(&build_ctx, rs.superblock.get_blob_infos())?;
        if let Some(dict) = chunk_dict {
            ori_blob_mgr.set_chunk_dict(dict);
            ori_blob_mgr.extend_from_chunk_dict(&build_ctx)?;
        }
        // Skip compaction for images with too few layers.
        if ori_blob_mgr.len() < cfg.layers_to_compact {
            return Ok(None);
        }

        let tree = Tree::from_bootstrap(&rs, &mut ())?;
        let mut bootstrap = Bootstrap::new(tree)?;
        let mut compactor = Self::new(
            build_ctx.fs_version,
            ori_blob_mgr,
            backend.clone(),
            rs.meta.get_digester(),
            &bootstrap,
        )?;
        compactor.do_compact(cfg)?;
        compactor.dump_new_blobs(&build_ctx, &cfg.blobs_dir, build_ctx.aligned_chunk)?;
        if compactor.new_blob_mgr.is_empty() {
            info!("compactor: no chance to compact data blobs");
            return Ok(None);
        }

        info!("compactor: successfully compacted blob");
        // Dump the new bootstrap referencing the compacted blob table.
        let blob_table = compactor.new_blob_mgr.to_blob_table(&build_ctx)?;
        bootstrap.build(&mut build_ctx, &mut bootstrap_ctx)?;
        bootstrap.dump(
            &mut build_ctx,
            &mut bootstrap_mgr.bootstrap_storage,
            &mut bootstrap_ctx,
            &blob_table,
        )?;

        Ok(Some(BuildOutput::new(
            &compactor.new_blob_mgr,
            None,
            &bootstrap_mgr.bootstrap_storage,
            &None,
        )?))
    }
676}
677
678#[cfg(test)]
679mod tests {
680 use crate::core::node::Node;
681 use crate::HashChunkDict;
682 use crate::{NodeChunk, Overlay};
683
684 use super::*;
685 use nydus_api::ConfigV2;
686 use nydus_rafs::metadata::RafsSuperConfig;
687 use nydus_storage::backend::{BackendResult, BlobReader};
688 use nydus_storage::device::v5::BlobV5ChunkInfo;
689 use nydus_storage::device::{BlobChunkFlags, BlobChunkInfo, BlobFeatures};
690 use nydus_storage::RAFS_DEFAULT_CHUNK_SIZE;
691 use nydus_utils::crypt::Algorithm;
692 use nydus_utils::metrics::BackendMetrics;
693 use nydus_utils::{compress, crypt};
694 use std::any::Any;
695 use vmm_sys_util::tempdir::TempDir;
696 use vmm_sys_util::tempfile::TempFile;
697
    /// Generate a trivial getter `$G()` returning copy of field `$F` as `$U`.
    #[doc(hidden)]
    #[macro_export]
    macro_rules! impl_getter {
        ($G: ident, $F: ident, $U: ty) => {
            fn $G(&self) -> $U {
                self.$F
            }
        };
    }
707
    /// In-memory chunk descriptor used to exercise `ChunkKey`/compactor
    /// code paths without real blob data.
    #[derive(Default, Clone)]
    struct MockChunkInfo {
        pub block_id: RafsDigest,
        pub blob_index: u32,
        pub flags: BlobChunkFlags,
        pub compress_size: u32,
        pub uncompress_size: u32,
        pub compress_offset: u64,
        pub uncompress_offset: u64,
        pub file_offset: u64,
        pub index: u32,
        pub crc32: u32,
    }
721
    // Minimal `BlobChunkInfo` implementation backed directly by the mock's
    // fields; getters are generated by `impl_getter`.
    impl BlobChunkInfo for MockChunkInfo {
        fn chunk_id(&self) -> &RafsDigest {
            &self.block_id
        }
        fn id(&self) -> u32 {
            self.index
        }
        fn is_compressed(&self) -> bool {
            self.flags.contains(BlobChunkFlags::COMPRESSED)
        }

        fn is_batch(&self) -> bool {
            self.flags.contains(BlobChunkFlags::BATCH)
        }

        fn is_encrypted(&self) -> bool {
            false
        }

        fn has_crc32(&self) -> bool {
            self.flags.contains(BlobChunkFlags::HAS_CRC32)
        }

        // Only meaningful when the HAS_CRC32 flag is set; 0 otherwise.
        fn crc32(&self) -> u32 {
            if self.has_crc32() {
                self.crc32
            } else {
                0
            }
        }

        fn as_any(&self) -> &dyn Any {
            self
        }

        impl_getter!(blob_index, blob_index, u32);
        impl_getter!(compressed_offset, compress_offset, u64);
        impl_getter!(compressed_size, compress_size, u32);
        impl_getter!(uncompressed_offset, uncompress_offset, u64);
        impl_getter!(uncompressed_size, uncompress_size, u32);
    }
763
    // RAFS v5 view of the mock chunk; delegates to the base trait above.
    impl BlobV5ChunkInfo for MockChunkInfo {
        fn as_base(&self) -> &dyn BlobChunkInfo {
            self
        }

        impl_getter!(index, index, u32);
        impl_getter!(file_offset, file_offset, u64);
        impl_getter!(flags, flags, BlobChunkFlags);
    }
773
    /// Test backend whose readers return synthetic data (see `try_read`).
    struct MockBackend {
        pub metrics: Arc<BackendMetrics>,
    }
777
778 impl BlobReader for MockBackend {
779 fn blob_size(&self) -> BackendResult<u64> {
780 Ok(1)
781 }
782
783 fn try_read(&self, buf: &mut [u8], _offset: u64) -> BackendResult<usize> {
784 let mut i = 0;
785 while i < buf.len() {
786 buf[i] = i as u8;
787 i += 1;
788 }
789 Ok(i)
790 }
791
792 fn metrics(&self) -> &BackendMetrics {
793 &self.metrics
796 }
797 }
798
    // SAFETY(review): MockBackend only holds an `Arc<BackendMetrics>`;
    // these impls assume `BackendMetrics` is itself thread-safe — TODO
    // confirm (if it already were Send + Sync, the auto traits would be
    // derived and these manual impls would be unnecessary).
    unsafe impl Send for MockBackend {}
    unsafe impl Sync for MockBackend {}
801
    impl BlobBackend for MockBackend {
        // Nothing to tear down for the in-memory mock.
        fn shutdown(&self) {}

        fn metrics(&self) -> &BackendMetrics {
            &self.metrics
        }

        // Every blob id resolves to a fresh reader sharing the metrics.
        fn get_reader(&self, _blob_id: &str) -> BackendResult<Arc<dyn BlobReader>> {
            Ok(Arc::new(MockBackend {
                metrics: self.metrics.clone(),
            }))
        }
    }
817
818 #[test]
819 fn test_chunk_key_from() {
820 let cw = ChunkWrapper::new(RafsVersion::V5);
821 matches!(ChunkKey::from(&cw), ChunkKey::Digest(_));
822
823 let cw = ChunkWrapper::new(RafsVersion::V6);
824 matches!(ChunkKey::from(&cw), ChunkKey::Offset(_, _));
825
826 let chunk = Arc::new(MockChunkInfo {
827 block_id: Default::default(),
828 blob_index: 2,
829 flags: BlobChunkFlags::empty(),
830 compress_size: 0x800,
831 uncompress_size: 0x1000,
832 compress_offset: 0x800,
833 uncompress_offset: 0x1000,
834 file_offset: 0x1000,
835 index: 1,
836 crc32: 0,
837 }) as Arc<dyn BlobChunkInfo>;
838 let cw = ChunkWrapper::Ref(chunk);
839 ChunkKey::from(&cw);
840 }
841
    /// Exercise `ChunkSet` insertion, lookup, merge, and `dump` through the
    /// mock backend, checking size accounting and the returned chunk pairs.
    #[test]
    fn test_chunk_set() {
        let mut chunk_set1 = ChunkSet::new();

        let mut chunk_wrapper1 = ChunkWrapper::new(RafsVersion::V5);
        chunk_wrapper1.set_id(RafsDigest { data: [1u8; 32] });
        chunk_wrapper1.set_compressed_size(8);
        let mut chunk_wrapper2 = ChunkWrapper::new(RafsVersion::V6);
        chunk_wrapper2.set_compressed_size(16);

        chunk_set1.add_chunk(&chunk_wrapper1);
        chunk_set1.add_chunk(&chunk_wrapper2);
        // 8 + 16 compressed bytes tracked.
        assert_eq!(chunk_set1.total_size, 24);

        let chunk_key2 = ChunkKey::from(&chunk_wrapper2);
        assert_eq!(
            format!("{:?}", Some(chunk_wrapper2)),
            format!("{:?}", chunk_set1.get_chunk(&chunk_key2))
        );

        let mut chunk_wrapper3 = ChunkWrapper::new(RafsVersion::V5);
        chunk_wrapper3.set_id(RafsDigest { data: [3u8; 32] });
        chunk_wrapper3.set_compressed_size(32);

        let mut chunk_set2 = ChunkSet::new();
        chunk_set2.add_chunk(&chunk_wrapper3);
        chunk_set2.merge(chunk_set1);
        // 32 + 24 after merging; all three chunks are distinct keys.
        assert_eq!(chunk_set2.total_size, 56);
        assert_eq!(chunk_set2.chunks.len(), 3);

        let build_ctx = BuildContext::default();
        let tmp_file = TempFile::new().unwrap();
        let blob_storage = ArtifactStorage::SingleFile(PathBuf::from(tmp_file.as_path()));
        let cipher_object = Algorithm::Aes256Xts.new_cipher().unwrap();
        let mut new_blob_ctx = BlobContext::new(
            "blob_id".to_owned(),
            0,
            BlobFeatures::all(),
            compress::Algorithm::Lz4Block,
            digest::Algorithm::Sha256,
            crypt::Algorithm::Aes256Xts,
            Arc::new(cipher_object),
            None,
            false,
        );
        let ori_blob_ids = ["1".to_owned(), "2".to_owned()];
        let backend = Arc::new(MockBackend {
            metrics: BackendMetrics::new("id", "backend_type"),
        }) as Arc<dyn BlobBackend + Send + Sync>;

        let mut res = chunk_set2
            .dump(
                &build_ctx,
                blob_storage,
                &ori_blob_ids,
                &mut new_blob_ctx,
                0,
                true,
                &backend,
            )
            .unwrap();

        // Sort by original digest so the assertions below are stable.
        res.sort_by(|a, b| a.0.id().data.cmp(&b.0.id().data));

        assert_eq!(res.len(), 3);
        assert_eq!(
            format!("{:?}", res[0].1.id()),
            format!("{:?}", RafsDigest { data: [0u8; 32] })
        );
        assert_eq!(
            format!("{:?}", res[1].1.id()),
            format!("{:?}", RafsDigest { data: [1u8; 32] })
        );
        assert_eq!(
            format!("{:?}", res[2].1.id()),
            format!("{:?}", RafsDigest { data: [3u8; 32] })
        );
    }
920
    /// Cover `State` predicates, `chunk_total_size`, and the legal/illegal
    /// combinations of `merge_blob`.
    #[test]
    fn test_state() {
        let state = State::Rebuild(ChunkSet::new());
        assert!(state.is_rebuild());
        let state = State::ChunkDict;
        assert!(state.is_from_dict());
        let state = State::default();
        assert!(state.is_invalid());

        let mut chunk_set1 = ChunkSet::new();
        let mut chunk_wrapper1 = ChunkWrapper::new(RafsVersion::V5);
        chunk_wrapper1.set_id(RafsDigest { data: [1u8; 32] });
        chunk_wrapper1.set_compressed_size(8);
        chunk_set1.add_chunk(&chunk_wrapper1);
        let mut state1 = State::Original(chunk_set1);
        assert_eq!(state1.chunk_total_size().unwrap(), 8);

        let mut chunk_wrapper2 = ChunkWrapper::new(RafsVersion::V6);
        chunk_wrapper2.set_compressed_size(16);
        let mut chunk_set2 = ChunkSet::new();
        chunk_set2.add_chunk(&chunk_wrapper2);
        let mut state2 = State::Rebuild(chunk_set2);
        assert_eq!(state2.chunk_total_size().unwrap(), 16);

        // Merging into Original is illegal; into Rebuild is fine; merging
        // an Invalid source is illegal.
        assert!(state1.merge_blob(state2.clone()).is_err());
        assert!(state2.merge_blob(state1).is_ok());
        assert!(state2.merge_blob(State::Invalid).is_err());

        assert_eq!(state2.chunk_total_size().unwrap(), 24);
        assert!(State::Delete.chunk_total_size().is_err());
    }
952
    /// `apply_chunk_change` must reject size mismatches and copy only the
    /// location fields once the sizes agree.
    #[test]
    fn test_apply_chunk_change() {
        let mut chunk_wrapper1 = ChunkWrapper::new(RafsVersion::V5);
        chunk_wrapper1.set_id(RafsDigest { data: [1u8; 32] });
        chunk_wrapper1.set_uncompressed_size(8);
        chunk_wrapper1.set_compressed_size(8);

        let mut chunk_wrapper2 = ChunkWrapper::new(RafsVersion::V6);
        chunk_wrapper2.set_uncompressed_size(16);
        chunk_wrapper2.set_compressed_size(16);

        // Uncompressed size differs -> error; then compressed differs -> error.
        assert!(apply_chunk_change(&chunk_wrapper1, &mut chunk_wrapper2).is_err());
        chunk_wrapper2.set_uncompressed_size(8);
        assert!(apply_chunk_change(&chunk_wrapper1, &mut chunk_wrapper2).is_err());

        chunk_wrapper2.set_compressed_size(8);
        chunk_wrapper1.set_blob_index(0x10);
        chunk_wrapper1.set_index(0x20);
        chunk_wrapper1.set_uncompressed_offset(0x30);
        chunk_wrapper1.set_compressed_offset(0x40);
        assert!(apply_chunk_change(&chunk_wrapper1, &mut chunk_wrapper2).is_ok());
        assert_eq!(chunk_wrapper2.blob_index(), 0x10);
        assert_eq!(chunk_wrapper2.index(), 0x20);
        assert_eq!(chunk_wrapper2.uncompressed_offset(), 0x30);
        assert_eq!(chunk_wrapper2.compressed_offset(), 0x40);
    }
979
    /// Build a `BlobCompactor` fixture: a chunk dictionary loaded from the
    /// bundled RAFS v5 bootstrap, a mock backend, and a one-node tree.
    fn create_blob_compactor() -> Result<BlobCompactor> {
        let root_dir = &std::env::var("CARGO_MANIFEST_DIR").expect("$CARGO_MANIFEST_DIR");
        let mut source_path = PathBuf::from(root_dir);
        source_path.push("../tests/texture/bootstrap/rafs-v5.boot");
        let path = source_path.to_str().unwrap();
        let rafs_config = RafsSuperConfig {
            version: RafsVersion::V5,
            compressor: compress::Algorithm::Lz4Block,
            digester: digest::Algorithm::Blake3,
            chunk_size: 0x100000,
            batch_size: 0,
            explicit_uidgid: true,
            is_tarfs_mode: false,
        };
        let dict =
            HashChunkDict::from_commandline_arg(path, Arc::new(ConfigV2::default()), &rafs_config)
                .unwrap();

        let mut ori_blob_mgr = BlobManager::new(digest::Algorithm::Sha256, false);
        ori_blob_mgr.set_chunk_dict(dict);

        let backend = Arc::new(MockBackend {
            metrics: BackendMetrics::new("id", "backend_type"),
        });

        // A single chunk-less node backed by a temp file keeps the
        // bootstrap walk trivial.
        let tmpdir = TempDir::new()?;
        let tmpfile = TempFile::new_in(tmpdir.as_path())?;
        let node = Node::from_fs_object(
            RafsVersion::V6,
            tmpdir.as_path().to_path_buf(),
            tmpfile.as_path().to_path_buf(),
            Overlay::UpperAddition,
            RAFS_DEFAULT_CHUNK_SIZE as u32,
            0,
            true,
            false,
        )?;
        let tree = Tree::new(node);
        let bootstrap = Bootstrap::new(tree)?;

        BlobCompactor::new(
            RafsVersion::V6,
            ori_blob_mgr,
            backend,
            digest::Algorithm::Sha256,
            &bootstrap,
        )
    }
1028
1029 #[test]
1030 fn test_blob_compactor_new() {
1031 let compactor = create_blob_compactor();
1032 assert!(compactor.is_ok());
1033 assert!(compactor.unwrap().is_v6());
1034 }
1035
    /// Every blob index mapped by the chunk dictionary must be flagged
    /// `ChunkDict`; the extra trailing state must stay untouched.
    #[test]
    fn test_blob_compactor_load_chunk_dict_blobs() {
        let mut compactor = create_blob_compactor().unwrap();
        let chunk_dict = compactor.get_chunk_dict();
        let n = chunk_dict.get_blobs().len();
        // Identity-map each dict blob index to a real blob index.
        for i in 0..n {
            chunk_dict.set_real_blob_idx(i as u32, i as u32);
        }
        compactor.states = vec![State::default(); n + 1];
        compactor.load_chunk_dict_blobs();

        assert_eq!(compactor.states.len(), n + 1);
        assert!(compactor.states[0].is_from_dict());
        assert!(compactor.states[n >> 1].is_from_dict());
        assert!(compactor.states[n - 1].is_from_dict());
        assert!(!compactor.states[n].is_from_dict());
    }
1053
    /// Fixture exercising `load_and_dedup_chunks`: a two-node tree where
    /// the second node carries three chunks (two sharing a v6 offset key),
    /// then checks the chunk/blob back-reference maps.
    fn blob_compactor_load_and_dedup_chunks() -> Result<BlobCompactor> {
        let mut compactor = create_blob_compactor()?;

        let mut chunk1 = ChunkWrapper::new(RafsVersion::V5);
        chunk1.set_id(RafsDigest { data: [1u8; 32] });
        chunk1.set_uncompressed_size(0);
        chunk1.set_compressed_offset(0x11);
        chunk1.set_blob_index(1);
        let node_chunk1 = NodeChunk {
            source: crate::ChunkSource::Dict,
            inner: Arc::new(chunk1.clone()),
        };
        let mut chunk2 = ChunkWrapper::new(RafsVersion::V6);
        chunk2.set_id(RafsDigest { data: [2u8; 32] });
        chunk2.set_uncompressed_size(0x20);
        chunk2.set_compressed_offset(0x22);
        chunk2.set_blob_index(2);
        let node_chunk2 = NodeChunk {
            source: crate::ChunkSource::Dict,
            inner: Arc::new(chunk2.clone()),
        };
        // chunk3 shares chunk2's (blob_index, offset), so they map to the
        // same v6 `ChunkKey`.
        let mut chunk3 = ChunkWrapper::new(RafsVersion::V6);
        chunk3.set_id(RafsDigest { data: [3u8; 32] });
        chunk3.set_uncompressed_size(0x20);
        chunk3.set_compressed_offset(0x22);
        chunk3.set_blob_index(2);
        let node_chunk3 = NodeChunk {
            source: crate::ChunkSource::Dict,
            inner: Arc::new(chunk3.clone()),
        };

        let mut chunk_dict = HashChunkDict::new(digest::Algorithm::Sha256);
        chunk_dict.add_chunk(
            Arc::new(ChunkWrapper::new(RafsVersion::V5)),
            digest::Algorithm::Sha256,
        );
        chunk_dict.add_chunk(Arc::new(chunk1.clone()), digest::Algorithm::Sha256);
        compactor.ori_blob_mgr.set_chunk_dict(Arc::new(chunk_dict));

        compactor.states = vec![State::ChunkDict; 5];

        // First node: no chunks. Second node: the three chunks above.
        let tmpdir = TempDir::new()?;
        let tmpfile = TempFile::new_in(tmpdir.as_path())?;
        let node = Node::from_fs_object(
            RafsVersion::V6,
            tmpdir.as_path().to_path_buf(),
            tmpfile.as_path().to_path_buf(),
            Overlay::UpperAddition,
            RAFS_DEFAULT_CHUNK_SIZE as u32,
            0,
            true,
            false,
        )?;
        let mut tree = Tree::new(node);
        let tmpfile2 = TempFile::new_in(tmpdir.as_path())?;
        let mut node = Node::from_fs_object(
            RafsVersion::V6,
            tmpdir.as_path().to_path_buf(),
            tmpfile2.as_path().to_path_buf(),
            Overlay::UpperAddition,
            RAFS_DEFAULT_CHUNK_SIZE as u32,
            0,
            true,
            false,
        )?;
        node.chunks.push(node_chunk1);
        node.chunks.push(node_chunk2);
        node.chunks.push(node_chunk3);
        let tree2 = Tree::new(node);
        tree.insert_child(tree2);

        let bootstrap = Bootstrap::new(tree)?;

        assert!(compactor.load_and_dedup_chunks(&bootstrap).is_ok());
        // Two distinct chunk keys (chunk2/chunk3 collide) and two blobs.
        assert_eq!(compactor.c2nodes.len(), 2);
        assert_eq!(compactor.b2nodes.len(), 2);

        let chunk_key1 = ChunkKey::from(&chunk1);
        assert!(compactor.c2nodes.contains_key(&chunk_key1));
        assert_eq!(compactor.c2nodes.get(&chunk_key1).unwrap().len(), 1);
        assert!(compactor.b2nodes.contains_key(&chunk2.blob_index()));
        assert_eq!(
            compactor.b2nodes.get(&chunk2.blob_index()).unwrap().len(),
            2
        );

        Ok(compactor)
    }
1142
    /// Smoke-test wrapper; the fixture function carries the assertions.
    #[test]
    fn test_blob_compactor_load_and_dedup_chunks() {
        assert!(blob_compactor_load_and_dedup_chunks().is_ok());
    }
1147
1148 #[test]
1149 fn test_blob_compactor_dump_new_blobs() {
1150 let tmp_dir = TempDir::new().unwrap();
1151 let build_ctx = BuildContext::new(
1152 "build_ctx".to_string(),
1153 false,
1154 0,
1155 compress::Algorithm::Lz4Block,
1156 digest::Algorithm::Sha256,
1157 true,
1158 WhiteoutSpec::None,
1159 ConversionType::DirectoryToRafs,
1160 PathBuf::from(tmp_dir.as_path()),
1161 Default::default(),
1162 None,
1163 None,
1164 false,
1165 Features::new(),
1166 false,
1167 Attributes::default(),
1168 );
1169
1170 let mut compactor = blob_compactor_load_and_dedup_chunks().unwrap();
1171
1172 let blob_ctx1 = BlobContext::new(
1173 "blob_id1".to_owned(),
1174 0,
1175 build_ctx.blob_features,
1176 build_ctx.compressor,
1177 build_ctx.digester,
1178 build_ctx.cipher,
1179 Default::default(),
1180 None,
1181 false,
1182 );
1183 let blob_ctx2 = BlobContext::new(
1184 "blob_id2".to_owned(),
1185 0,
1186 build_ctx.blob_features,
1187 build_ctx.compressor,
1188 build_ctx.digester,
1189 build_ctx.cipher,
1190 Default::default(),
1191 None,
1192 false,
1193 );
1194 let blob_ctx3 = BlobContext::new(
1195 "blob_id3".to_owned(),
1196 0,
1197 build_ctx.blob_features,
1198 build_ctx.compressor,
1199 build_ctx.digester,
1200 build_ctx.cipher,
1201 Default::default(),
1202 None,
1203 false,
1204 );
1205 let blob_ctx4 = BlobContext::new(
1206 "blob_id4".to_owned(),
1207 0,
1208 build_ctx.blob_features,
1209 build_ctx.compressor,
1210 build_ctx.digester,
1211 build_ctx.cipher,
1212 Default::default(),
1213 None,
1214 false,
1215 );
1216 let blob_ctx5 = BlobContext::new(
1217 "blob_id5".to_owned(),
1218 0,
1219 build_ctx.blob_features,
1220 build_ctx.compressor,
1221 build_ctx.digester,
1222 build_ctx.cipher,
1223 Default::default(),
1224 None,
1225 false,
1226 );
1227 compactor.ori_blob_mgr.add_blob(blob_ctx1);
1228 compactor.ori_blob_mgr.add_blob(blob_ctx2);
1229 compactor.ori_blob_mgr.add_blob(blob_ctx3);
1230 compactor.ori_blob_mgr.add_blob(blob_ctx4);
1231 compactor.ori_blob_mgr.add_blob(blob_ctx5);
1232
1233 compactor.states[0] = State::Invalid;
1234
1235 let tmp_dir = TempDir::new().unwrap();
1236 let dir = tmp_dir.as_path().to_str().unwrap();
1237 assert!(compactor.dump_new_blobs(&build_ctx, dir, true).is_err());
1238
1239 compactor.states = vec![
1240 State::Delete,
1241 State::ChunkDict,
1242 State::Original(ChunkSet::new()),
1243 State::Rebuild(ChunkSet::new()),
1244 State::Delete,
1245 ];
1246 assert!(compactor.dump_new_blobs(&build_ctx, dir, true).is_ok());
1247 assert_eq!(compactor.ori_blob_mgr.len(), 3);
1248 }
1249
1250 #[test]
1251 fn test_blob_compactor_do_compact() {
1252 let mut compactor = blob_compactor_load_and_dedup_chunks().unwrap();
1253
1254 let tmp_dir = TempDir::new().unwrap();
1255 let build_ctx = BuildContext::new(
1256 "build_ctx".to_string(),
1257 false,
1258 0,
1259 compress::Algorithm::Lz4Block,
1260 digest::Algorithm::Sha256,
1261 true,
1262 WhiteoutSpec::None,
1263 ConversionType::DirectoryToRafs,
1264 PathBuf::from(tmp_dir.as_path()),
1265 Default::default(),
1266 None,
1267 None,
1268 false,
1269 Features::new(),
1270 false,
1271 Attributes::default(),
1272 );
1273 let mut blob_ctx1 = BlobContext::new(
1274 "blob_id1".to_owned(),
1275 0,
1276 build_ctx.blob_features,
1277 build_ctx.compressor,
1278 build_ctx.digester,
1279 build_ctx.cipher,
1280 Default::default(),
1281 None,
1282 false,
1283 );
1284 blob_ctx1.compressed_blob_size = 2;
1285 let mut blob_ctx2 = BlobContext::new(
1286 "blob_id2".to_owned(),
1287 0,
1288 build_ctx.blob_features,
1289 build_ctx.compressor,
1290 build_ctx.digester,
1291 build_ctx.cipher,
1292 Default::default(),
1293 None,
1294 false,
1295 );
1296 blob_ctx2.compressed_blob_size = 0;
1297 let blob_ctx3 = BlobContext::new(
1298 "blob_id3".to_owned(),
1299 0,
1300 build_ctx.blob_features,
1301 build_ctx.compressor,
1302 build_ctx.digester,
1303 build_ctx.cipher,
1304 Default::default(),
1305 None,
1306 false,
1307 );
1308 let blob_ctx4 = BlobContext::new(
1309 "blob_id4".to_owned(),
1310 0,
1311 build_ctx.blob_features,
1312 build_ctx.compressor,
1313 build_ctx.digester,
1314 build_ctx.cipher,
1315 Default::default(),
1316 None,
1317 false,
1318 );
1319 let blob_ctx5 = BlobContext::new(
1320 "blob_id5".to_owned(),
1321 0,
1322 build_ctx.blob_features,
1323 build_ctx.compressor,
1324 build_ctx.digester,
1325 build_ctx.cipher,
1326 Default::default(),
1327 None,
1328 false,
1329 );
1330 compactor.ori_blob_mgr.add_blob(blob_ctx1);
1331 compactor.ori_blob_mgr.add_blob(blob_ctx2);
1332 compactor.ori_blob_mgr.add_blob(blob_ctx3);
1333 compactor.ori_blob_mgr.add_blob(blob_ctx4);
1334 compactor.ori_blob_mgr.add_blob(blob_ctx5);
1335
1336 let mut chunk_set1 = ChunkSet::new();
1337 chunk_set1.total_size = 4;
1338 let mut chunk_set2 = ChunkSet::new();
1339 chunk_set2.total_size = 6;
1340 let mut chunk_set3 = ChunkSet::new();
1341 chunk_set3.total_size = 5;
1342
1343 compactor.states = vec![
1344 State::Original(chunk_set1),
1345 State::Original(chunk_set2),
1346 State::Rebuild(chunk_set3),
1347 State::ChunkDict,
1348 State::Invalid,
1349 ];
1350
1351 let cfg = Config {
1352 min_used_ratio: 50,
1353 compact_blob_size: 10,
1354 max_compact_size: 8,
1355 layers_to_compact: 0,
1356 blobs_dir: "blobs_dir".to_string(),
1357 };
1358
1359 assert!(compactor.do_compact(&cfg).is_ok());
1360 assert!(!compactor.states.last().unwrap().is_invalid());
1361 }
1362}