//! nydus_builder/optimize_prefetch.rs

1use crate::anyhow;
2use crate::core::blob::Blob;
3use crate::finalize_blob;
4use crate::Artifact;
5use crate::ArtifactWriter;
6use crate::BlobContext;
7use crate::BlobManager;
8use crate::Bootstrap;
9use crate::BootstrapManager;
10use crate::BuildContext;
11use crate::BuildOutput;
12use crate::ChunkSource;
13use crate::ConversionType;
14use crate::NodeChunk;
15use crate::Path;
16use crate::PathBuf;
17use crate::Tree;
18use anyhow::{bail, Context, Result};
19use nydus_api::ConfigV2;
20use nydus_rafs::metadata::layout::RafsBlobTable;
21use nydus_rafs::metadata::RafsSuper;
22use nydus_rafs::metadata::RafsVersion;
23use nydus_storage::backend::BlobBackend;
24use nydus_storage::device::BlobInfo;
25use nydus_storage::meta::BatchContextGenerator;
26use nydus_storage::meta::BlobChunkInfoV2Ondisk;
27use nydus_utils::compress;
28use serde::Deserialize;
29use sha2::Digest;
30use std::cmp::{max, min};
31use std::mem::size_of;
32use std::sync::Arc;
/// Namespace type: all prefetch-optimization logic lives in its associated
/// functions; it carries no state of its own.
pub struct OptimizePrefetch {}
34
/// In-flight build state for the blob that aggregates all prefetched chunks.
struct PrefetchBlobState {
    // Metadata entry appended to the blob table for the prefetch blob.
    blob_info: BlobInfo,
    // Build-time context tracking offsets, sizes, and hash of the new blob.
    blob_ctx: BlobContext,
    // Writer for the blob file created under the output directory.
    blob_writer: Box<dyn Artifact>,
}
40
/// A byte span inside a prefetch file, given as a start offset plus length.
// NOTE(review): `range_overlap` treats the end bound inclusively — confirm
// whether `[offset, offset + size]` or `[offset, offset + size)` is intended.
#[derive(Clone)]
struct PrefetchFileRange {
    offset: u64,
    size: usize,
}
46
/// A file to prefetch. `ranges == None` means every chunk of the file is
/// prefetched; otherwise only chunks overlapping one of the ranges are.
#[derive(Clone)]
pub struct PrefetchFileInfo {
    path: PathBuf,
    ranges: Option<Vec<PrefetchFileRange>>,
}
52
/// On-disk JSON schema of the prefetch list file; only version "v1" is
/// accepted by `generate_prefetch_file_info`.
#[derive(Deserialize)]
struct PrefetchJson {
    version: String,
    files: Vec<PrefetchFileJson>,
}
58
/// One file entry in the prefetch JSON; each range is a `[offset, size]`
/// pair of integers.
#[derive(Deserialize)]
struct PrefetchFileJson {
    path: String,
    ranges: Option<Vec<[u64; 2]>>,
}
64
65impl PrefetchBlobState {
66    fn new(ctx: &BuildContext, blob_layer_num: u32, output_blob_dir_path: &Path) -> Result<Self> {
67        let mut blob_info = BlobInfo::new(
68            blob_layer_num,
69            String::from("prefetch-blob"),
70            0,
71            0,
72            ctx.chunk_size,
73            u32::MAX,
74            ctx.blob_features,
75        );
76        blob_info.set_compressor(ctx.compressor);
77        blob_info.set_separated_with_prefetch_files_feature(true);
78        let mut blob_ctx = BlobContext::from(ctx, &blob_info, ChunkSource::Build)?;
79        blob_ctx.blob_meta_info_enabled = true;
80        let blob_writer = ArtifactWriter::new(crate::ArtifactStorage::FileDir((
81            output_blob_dir_path.to_path_buf(),
82            String::new(),
83        )))
84        .map(|writer| Box::new(writer) as Box<dyn Artifact>)?;
85        Ok(Self {
86            blob_info,
87            blob_ctx,
88            blob_writer,
89        })
90    }
91}
92
93impl OptimizePrefetch {
94    /// Generate a new bootstrap for prefetch.
95    pub fn generate_prefetch(
96        tree: &mut Tree,
97        ctx: &mut BuildContext,
98        bootstrap_mgr: &mut BootstrapManager,
99        blob_table: &mut RafsBlobTable,
100        output_blob_dir_path: PathBuf,
101        prefetch_files: Vec<PrefetchFileInfo>,
102        backend: Arc<dyn BlobBackend + Send + Sync>,
103    ) -> Result<BuildOutput> {
104        // create a new blob for prefetch layer
105
106        let blob_layer_num = match blob_table {
107            RafsBlobTable::V5(table) => table.get_all().len(),
108            RafsBlobTable::V6(table) => table.get_all().len(),
109        };
110        let mut blob_state =
111            PrefetchBlobState::new(&ctx, blob_layer_num as u32, &output_blob_dir_path)?;
112        let mut batch = BatchContextGenerator::new(0)?;
113        for node in prefetch_files.clone() {
114            Self::process_prefetch_node(
115                tree,
116                node,
117                &mut blob_state,
118                &mut batch,
119                blob_table,
120                backend.clone(),
121            )?;
122        }
123
124        Self::dump_blob(ctx, blob_table, &mut blob_state)?;
125
126        debug!("prefetch blob id: {}", ctx.blob_id);
127
128        let blob_mgr =
129            Self::build_dump_bootstrap(tree, ctx, bootstrap_mgr, blob_table, prefetch_files)?;
130        BuildOutput::new(&blob_mgr, None, &bootstrap_mgr.bootstrap_storage, &None)
131    }
132
133    fn build_dump_bootstrap(
134        tree: &mut Tree,
135        ctx: &mut BuildContext,
136        bootstrap_mgr: &mut BootstrapManager,
137        blob_table: &mut RafsBlobTable,
138        prefetch_files: Vec<PrefetchFileInfo>,
139    ) -> Result<BlobManager> {
140        let mut bootstrap_ctx = bootstrap_mgr.create_ctx()?;
141        let mut bootstrap = Bootstrap::new(tree.clone())?;
142
143        // Build bootstrap
144        bootstrap.build(ctx, &mut bootstrap_ctx)?;
145
146        // carefully address hardlink
147        for node in prefetch_files.clone() {
148            let file = &node.path;
149            if tree.get_node(&file).is_none() {
150                warn!(
151                    "prefetch file {} is skipped, no need to fixing hardlink",
152                    file.display()
153                );
154                continue;
155            }
156
157            let tree_node = tree
158                .get_node(&file)
159                .ok_or(anyhow!("failed to get node"))?
160                .node
161                .as_ref();
162            let child_node = tree_node.borrow();
163            let key = (
164                child_node.layer_idx,
165                child_node.info.src_ino,
166                child_node.info.src_dev,
167            );
168            let chunks = child_node.chunks.clone();
169            drop(child_node);
170
171            if let Some(indexes) = bootstrap_ctx.inode_map.get_mut(&key) {
172                for n in indexes.iter() {
173                    // Rewrite blob chunks to the prefetch blob's chunks
174                    n.borrow_mut().chunks = chunks.clone();
175                }
176            }
177        }
178        // generate blob table with extended table
179        let mut blob_mgr = BlobManager::new(ctx.digester, false);
180        let blob_info = match blob_table {
181            RafsBlobTable::V5(table) => table.get_all(),
182            RafsBlobTable::V6(table) => table.get_all(),
183        };
184        blob_mgr.extend_from_blob_table(ctx, blob_info)?;
185        let blob_table_withprefetch = blob_mgr.to_blob_table(&ctx)?;
186
187        bootstrap.dump(
188            ctx,
189            &mut bootstrap_mgr.bootstrap_storage,
190            &mut bootstrap_ctx,
191            &blob_table_withprefetch,
192        )?;
193        Ok(blob_mgr)
194    }
195
196    fn dump_blob(
197        ctx: &mut BuildContext,
198        blob_table: &mut RafsBlobTable,
199        blob_state: &mut PrefetchBlobState,
200    ) -> Result<()> {
201        match blob_table {
202            RafsBlobTable::V5(table) => {
203                table.entries.push(blob_state.blob_info.clone().into());
204            }
205            RafsBlobTable::V6(table) => {
206                table.entries.push(blob_state.blob_info.clone().into());
207            }
208        }
209
210        let mut blob_mgr = BlobManager::new(ctx.digester, false);
211        blob_mgr.add_blob(blob_state.blob_ctx.clone());
212        blob_mgr.set_current_blob_index(0);
213        Blob::finalize_blob_data(&ctx, &mut blob_mgr, blob_state.blob_writer.as_mut())?;
214        if let RafsBlobTable::V6(_) = blob_table {
215            if let Some((_, blob_ctx)) = blob_mgr.get_current_blob() {
216                Blob::dump_meta_data(&ctx, blob_ctx, blob_state.blob_writer.as_mut()).unwrap();
217            };
218        }
219        ctx.blob_id = String::from("");
220        blob_mgr.get_current_blob().unwrap().1.blob_id = String::from("");
221        finalize_blob(ctx, &mut blob_mgr, blob_state.blob_writer.as_mut())?;
222        ctx.blob_id = blob_mgr
223            .get_current_blob()
224            .ok_or(anyhow!("failed to get current blob"))?
225            .1
226            .blob_id
227            .clone();
228
229        let entries = match blob_table {
230            RafsBlobTable::V5(table) => table.get_all(),
231            RafsBlobTable::V6(table) => table.get_all(),
232        };
233
234        // Verify and update prefetch blob
235        assert!(
236            entries
237                .iter()
238                .filter(|blob| blob.blob_id() == "prefetch-blob")
239                .count()
240                == 1,
241            "Expected exactly one prefetch-blob"
242        );
243        // Rewrite prefetch blob id
244        match blob_table {
245            RafsBlobTable::V5(table) => {
246                rewrite_blob_id(&mut table.entries, "prefetch-blob", ctx.blob_id.clone())
247            }
248            RafsBlobTable::V6(table) => {
249                rewrite_blob_id(&mut table.entries, "prefetch-blob", ctx.blob_id.clone())
250            }
251        }
252        Ok(())
253    }
254
255    fn process_prefetch_node(
256        tree: &mut Tree,
257        prefetch_file_info: PrefetchFileInfo,
258        prefetch_state: &mut PrefetchBlobState,
259        batch: &mut BatchContextGenerator,
260        blob_table: &RafsBlobTable,
261        backend: Arc<dyn BlobBackend + Send + Sync>,
262    ) -> Result<()> {
263        let file = prefetch_file_info.path.clone();
264        if tree.get_node_mut(&file).is_none() {
265            warn!("prefetch file {} is bad, skip it", file.display());
266            return Ok(());
267        }
268
269        let tree_node = tree
270            .get_node_mut(&file)
271            .ok_or(anyhow!("failed to get node"))?
272            .node
273            .as_ref();
274        let entries = match blob_table {
275            RafsBlobTable::V5(table) => table.get_all(),
276            RafsBlobTable::V6(table) => table.get_all(),
277        };
278
279        let mut child = tree_node.borrow_mut();
280        let chunks: &mut Vec<NodeChunk> = child.chunks.as_mut();
281        let blob_ctx = &mut prefetch_state.blob_ctx;
282        let blob_info = &mut prefetch_state.blob_info;
283        let encrypted = blob_ctx.blob_compressor != compress::Algorithm::None;
284
285        for chunk in chunks {
286            // check the file range
287            if let Some(ref ranges) = prefetch_file_info.ranges {
288                let mut should_skip = true;
289                for range in ranges {
290                    if range_overlap(chunk, range) {
291                        should_skip = false;
292                        break;
293                    }
294                }
295                if should_skip {
296                    continue;
297                }
298            }
299
300            let blob_id = entries
301                .get(chunk.inner.blob_index() as usize)
302                .map(|entry| entry.blob_id())
303                .ok_or(anyhow!("failed to get blob id"))?;
304
305            let inner = Arc::make_mut(&mut chunk.inner);
306
307            let reader = backend
308                .clone()
309                .get_reader(&blob_id.clone())
310                .expect("get blob err");
311            let mut buf = vec![0u8; inner.compressed_size() as usize];
312            reader
313                .read(&mut buf, inner.compressed_offset())
314                .expect("read blob err");
315            prefetch_state.blob_writer.write_all(&buf)?;
316            inner.set_blob_index(blob_info.blob_index());
317            if blob_ctx.chunk_count == u32::MAX {
318                blob_ctx.chunk_count = 0;
319            }
320            inner.set_index(blob_ctx.chunk_count);
321            blob_ctx.chunk_count += 1;
322            inner.set_compressed_offset(blob_ctx.current_compressed_offset);
323            inner.set_uncompressed_offset(blob_ctx.current_uncompressed_offset);
324            let mut aligned_d_size: u64 = inner.uncompressed_size() as u64;
325            if let RafsBlobTable::V6(_) = blob_table {
326                aligned_d_size = nydus_utils::try_round_up_4k(inner.uncompressed_size())
327                    .ok_or_else(|| anyhow!("invalid size"))?;
328                let info = batch.generate_chunk_info(
329                    blob_ctx.current_compressed_offset,
330                    blob_ctx.current_uncompressed_offset,
331                    inner.uncompressed_size(),
332                    encrypted,
333                )?;
334                blob_info.set_meta_ci_compressed_size(
335                    (blob_info.meta_ci_compressed_size()
336                        + size_of::<BlobChunkInfoV2Ondisk>() as u64) as usize,
337                );
338
339                blob_info.set_meta_ci_uncompressed_size(
340                    (blob_info.meta_ci_uncompressed_size()
341                        + size_of::<BlobChunkInfoV2Ondisk>() as u64) as usize,
342                );
343                blob_ctx.add_chunk_meta_info(&inner, Some(info))?;
344            }
345            blob_ctx.compressed_blob_size += inner.compressed_size() as u64;
346            blob_ctx.uncompressed_blob_size += aligned_d_size;
347            blob_ctx.current_compressed_offset += inner.compressed_size() as u64;
348            blob_ctx.current_uncompressed_offset += aligned_d_size;
349            blob_ctx.blob_hash.update(&buf);
350
351            blob_info.set_compressed_size(blob_ctx.compressed_blob_size as usize);
352            blob_info.set_uncompressed_size(blob_ctx.uncompressed_blob_size as usize);
353            blob_info.set_chunk_count(blob_ctx.chunk_count as usize);
354        }
355
356        Ok(())
357    }
358}
359
360fn rewrite_blob_id(entries: &mut [Arc<BlobInfo>], blob_id: &str, new_blob_id: String) {
361    entries
362        .iter_mut()
363        .filter(|blob| blob.blob_id() == blob_id)
364        .for_each(|blob| {
365            let mut info = (**blob).clone();
366            info.set_blob_id(new_blob_id.clone());
367            *blob = Arc::new(info);
368        });
369}
370
371pub fn update_ctx_from_bootstrap(
372    ctx: &mut BuildContext,
373    config: Arc<ConfigV2>,
374    bootstrap_path: &Path,
375) -> Result<RafsSuper> {
376    let (sb, _) = RafsSuper::load_from_file(bootstrap_path, config, false)?;
377
378    ctx.blob_features = sb
379        .superblock
380        .get_blob_infos()
381        .first()
382        .ok_or_else(|| anyhow!("No blob info found in superblock"))?
383        .features();
384
385    let config = sb.meta.get_config();
386    if config.is_tarfs_mode {
387        ctx.conversion_type = ConversionType::TarToRafs;
388    }
389
390    ctx.fs_version =
391        RafsVersion::try_from(sb.meta.version).context("Failed to get RAFS version")?;
392    ctx.compressor = config.compressor;
393    Ok(sb)
394}
395
396pub fn generate_prefetch_file_info(prefetch_file: &Path) -> Result<Vec<PrefetchFileInfo>> {
397    let content = std::fs::read_to_string(prefetch_file).map_err(|e| {
398        anyhow!(
399            "failed to read prefetch files from {}: {}",
400            prefetch_file.display(),
401            e
402        )
403    })?;
404
405    if content.trim().is_empty() {
406        return Ok(Vec::new());
407    }
408
409    let prefetch_json: PrefetchJson = serde_json::from_str(&content)
410        .map_err(|e| anyhow!("failed to parse prefetch file as JSON: {}", e))?;
411
412    if prefetch_json.version != "v1" {
413        bail!(
414            "unsupported prefetch file version: {}",
415            prefetch_json.version
416        );
417    }
418
419    let mut prefetch_nodes = Vec::new();
420    for file_json in prefetch_json.files {
421        let path = PathBuf::from(file_json.path);
422        if !path.is_absolute() {
423            warn!(
424                "prefetch file path is not absolute, skipping: {}",
425                path.display()
426            );
427            continue;
428        }
429        let ranges = file_json.ranges.map(|rs| {
430            rs.into_iter()
431                .map(|r| PrefetchFileRange {
432                    offset: r[0],
433                    size: r[1] as usize,
434                })
435                .collect()
436        });
437        prefetch_nodes.push(PrefetchFileInfo { path, ranges });
438    }
439    Ok(prefetch_nodes)
440}
441
442fn range_overlap(chunk: &mut NodeChunk, range: &PrefetchFileRange) -> bool {
443    if max(range.offset, chunk.inner.file_offset())
444        <= min(
445            range.offset + range.size as u64,
446            chunk.inner.file_offset() + chunk.inner.uncompressed_size() as u64,
447        )
448    {
449        return true;
450    }
451    false
452}