1use crate::anyhow;
2use crate::core::blob::Blob;
3use crate::finalize_blob;
4use crate::Artifact;
5use crate::ArtifactWriter;
6use crate::BlobContext;
7use crate::BlobManager;
8use crate::Bootstrap;
9use crate::BootstrapManager;
10use crate::BuildContext;
11use crate::BuildOutput;
12use crate::ChunkSource;
13use crate::ConversionType;
14use crate::NodeChunk;
15use crate::Path;
16use crate::PathBuf;
17use crate::Tree;
18use anyhow::{bail, Context, Result};
19use nydus_api::ConfigV2;
20use nydus_rafs::metadata::layout::RafsBlobTable;
21use nydus_rafs::metadata::RafsSuper;
22use nydus_rafs::metadata::RafsVersion;
23use nydus_storage::backend::BlobBackend;
24use nydus_storage::device::BlobInfo;
25use nydus_storage::meta::BatchContextGenerator;
26use nydus_storage::meta::BlobChunkInfoV2Ondisk;
27use nydus_utils::compress;
28use serde::Deserialize;
29use sha2::Digest;
30use std::cmp::{max, min};
31use std::mem::size_of;
32use std::sync::Arc;
/// Namespace for the prefetch-blob optimization pass: pulls the chunks of
/// user-specified prefetch files out into a dedicated "prefetch blob" and
/// rewrites the bootstrap to reference it.
pub struct OptimizePrefetch {}
34
/// Mutable state carried while building the dedicated prefetch blob.
struct PrefetchBlobState {
    // Metadata entry describing the prefetch blob (sizes, chunk count, features).
    blob_info: BlobInfo,
    // Build-side context accumulating offsets, hash and chunk meta for the blob.
    blob_ctx: BlobContext,
    // Writer streaming the prefetch blob's data into the output directory.
    blob_writer: Box<dyn Artifact>,
}
40
/// A byte range within a prefetch file; chunks overlapping any range of a
/// file are copied into the prefetch blob (see `range_overlap`).
#[derive(Clone)]
struct PrefetchFileRange {
    // Start offset of the range within the file, in bytes.
    offset: u64,
    // Length of the range in bytes.
    size: usize,
}
46
/// A file to prefetch: an absolute path plus optional byte ranges.
/// `ranges == None` means every chunk of the file is prefetched.
#[derive(Clone)]
pub struct PrefetchFileInfo {
    path: PathBuf,
    ranges: Option<Vec<PrefetchFileRange>>,
}
52
/// On-disk JSON schema of the prefetch list consumed by
/// `generate_prefetch_file_info`.
#[derive(Deserialize)]
struct PrefetchJson {
    // Schema version; only "v1" is accepted.
    version: String,
    files: Vec<PrefetchFileJson>,
}
58
/// One file entry in the prefetch JSON.
#[derive(Deserialize)]
struct PrefetchFileJson {
    path: String,
    // Each range is encoded as a two-element array: [offset, size].
    ranges: Option<Vec<[u64; 2]>>,
}
64
65impl PrefetchBlobState {
66 fn new(ctx: &BuildContext, blob_layer_num: u32, output_blob_dir_path: &Path) -> Result<Self> {
67 let mut blob_info = BlobInfo::new(
68 blob_layer_num,
69 String::from("prefetch-blob"),
70 0,
71 0,
72 ctx.chunk_size,
73 u32::MAX,
74 ctx.blob_features,
75 );
76 blob_info.set_compressor(ctx.compressor);
77 blob_info.set_separated_with_prefetch_files_feature(true);
78 let mut blob_ctx = BlobContext::from(ctx, &blob_info, ChunkSource::Build)?;
79 blob_ctx.blob_meta_info_enabled = true;
80 let blob_writer = ArtifactWriter::new(crate::ArtifactStorage::FileDir((
81 output_blob_dir_path.to_path_buf(),
82 String::new(),
83 )))
84 .map(|writer| Box::new(writer) as Box<dyn Artifact>)?;
85 Ok(Self {
86 blob_info,
87 blob_ctx,
88 blob_writer,
89 })
90 }
91}
92
93impl OptimizePrefetch {
94 pub fn generate_prefetch(
96 tree: &mut Tree,
97 ctx: &mut BuildContext,
98 bootstrap_mgr: &mut BootstrapManager,
99 blob_table: &mut RafsBlobTable,
100 output_blob_dir_path: PathBuf,
101 prefetch_files: Vec<PrefetchFileInfo>,
102 backend: Arc<dyn BlobBackend + Send + Sync>,
103 ) -> Result<BuildOutput> {
104 let blob_layer_num = match blob_table {
107 RafsBlobTable::V5(table) => table.get_all().len(),
108 RafsBlobTable::V6(table) => table.get_all().len(),
109 };
110 let mut blob_state =
111 PrefetchBlobState::new(&ctx, blob_layer_num as u32, &output_blob_dir_path)?;
112 let mut batch = BatchContextGenerator::new(0)?;
113 for node in prefetch_files.clone() {
114 Self::process_prefetch_node(
115 tree,
116 node,
117 &mut blob_state,
118 &mut batch,
119 blob_table,
120 backend.clone(),
121 )?;
122 }
123
124 Self::dump_blob(ctx, blob_table, &mut blob_state)?;
125
126 debug!("prefetch blob id: {}", ctx.blob_id);
127
128 let blob_mgr =
129 Self::build_dump_bootstrap(tree, ctx, bootstrap_mgr, blob_table, prefetch_files)?;
130 BuildOutput::new(&blob_mgr, None, &bootstrap_mgr.bootstrap_storage, &None)
131 }
132
133 fn build_dump_bootstrap(
134 tree: &mut Tree,
135 ctx: &mut BuildContext,
136 bootstrap_mgr: &mut BootstrapManager,
137 blob_table: &mut RafsBlobTable,
138 prefetch_files: Vec<PrefetchFileInfo>,
139 ) -> Result<BlobManager> {
140 let mut bootstrap_ctx = bootstrap_mgr.create_ctx()?;
141 let mut bootstrap = Bootstrap::new(tree.clone())?;
142
143 bootstrap.build(ctx, &mut bootstrap_ctx)?;
145
146 for node in prefetch_files.clone() {
148 let file = &node.path;
149 if tree.get_node(&file).is_none() {
150 warn!(
151 "prefetch file {} is skipped, no need to fixing hardlink",
152 file.display()
153 );
154 continue;
155 }
156
157 let tree_node = tree
158 .get_node(&file)
159 .ok_or(anyhow!("failed to get node"))?
160 .node
161 .as_ref();
162 let child_node = tree_node.borrow();
163 let key = (
164 child_node.layer_idx,
165 child_node.info.src_ino,
166 child_node.info.src_dev,
167 );
168 let chunks = child_node.chunks.clone();
169 drop(child_node);
170
171 if let Some(indexes) = bootstrap_ctx.inode_map.get_mut(&key) {
172 for n in indexes.iter() {
173 n.borrow_mut().chunks = chunks.clone();
175 }
176 }
177 }
178 let mut blob_mgr = BlobManager::new(ctx.digester, false);
180 let blob_info = match blob_table {
181 RafsBlobTable::V5(table) => table.get_all(),
182 RafsBlobTable::V6(table) => table.get_all(),
183 };
184 blob_mgr.extend_from_blob_table(ctx, blob_info)?;
185 let blob_table_withprefetch = blob_mgr.to_blob_table(&ctx)?;
186
187 bootstrap.dump(
188 ctx,
189 &mut bootstrap_mgr.bootstrap_storage,
190 &mut bootstrap_ctx,
191 &blob_table_withprefetch,
192 )?;
193 Ok(blob_mgr)
194 }
195
196 fn dump_blob(
197 ctx: &mut BuildContext,
198 blob_table: &mut RafsBlobTable,
199 blob_state: &mut PrefetchBlobState,
200 ) -> Result<()> {
201 match blob_table {
202 RafsBlobTable::V5(table) => {
203 table.entries.push(blob_state.blob_info.clone().into());
204 }
205 RafsBlobTable::V6(table) => {
206 table.entries.push(blob_state.blob_info.clone().into());
207 }
208 }
209
210 let mut blob_mgr = BlobManager::new(ctx.digester, false);
211 blob_mgr.add_blob(blob_state.blob_ctx.clone());
212 blob_mgr.set_current_blob_index(0);
213 Blob::finalize_blob_data(&ctx, &mut blob_mgr, blob_state.blob_writer.as_mut())?;
214 if let RafsBlobTable::V6(_) = blob_table {
215 if let Some((_, blob_ctx)) = blob_mgr.get_current_blob() {
216 Blob::dump_meta_data(&ctx, blob_ctx, blob_state.blob_writer.as_mut()).unwrap();
217 };
218 }
219 ctx.blob_id = String::from("");
220 blob_mgr.get_current_blob().unwrap().1.blob_id = String::from("");
221 finalize_blob(ctx, &mut blob_mgr, blob_state.blob_writer.as_mut())?;
222 ctx.blob_id = blob_mgr
223 .get_current_blob()
224 .ok_or(anyhow!("failed to get current blob"))?
225 .1
226 .blob_id
227 .clone();
228
229 let entries = match blob_table {
230 RafsBlobTable::V5(table) => table.get_all(),
231 RafsBlobTable::V6(table) => table.get_all(),
232 };
233
234 assert!(
236 entries
237 .iter()
238 .filter(|blob| blob.blob_id() == "prefetch-blob")
239 .count()
240 == 1,
241 "Expected exactly one prefetch-blob"
242 );
243 match blob_table {
245 RafsBlobTable::V5(table) => {
246 rewrite_blob_id(&mut table.entries, "prefetch-blob", ctx.blob_id.clone())
247 }
248 RafsBlobTable::V6(table) => {
249 rewrite_blob_id(&mut table.entries, "prefetch-blob", ctx.blob_id.clone())
250 }
251 }
252 Ok(())
253 }
254
255 fn process_prefetch_node(
256 tree: &mut Tree,
257 prefetch_file_info: PrefetchFileInfo,
258 prefetch_state: &mut PrefetchBlobState,
259 batch: &mut BatchContextGenerator,
260 blob_table: &RafsBlobTable,
261 backend: Arc<dyn BlobBackend + Send + Sync>,
262 ) -> Result<()> {
263 let file = prefetch_file_info.path.clone();
264 if tree.get_node_mut(&file).is_none() {
265 warn!("prefetch file {} is bad, skip it", file.display());
266 return Ok(());
267 }
268
269 let tree_node = tree
270 .get_node_mut(&file)
271 .ok_or(anyhow!("failed to get node"))?
272 .node
273 .as_ref();
274 let entries = match blob_table {
275 RafsBlobTable::V5(table) => table.get_all(),
276 RafsBlobTable::V6(table) => table.get_all(),
277 };
278
279 let mut child = tree_node.borrow_mut();
280 let chunks: &mut Vec<NodeChunk> = child.chunks.as_mut();
281 let blob_ctx = &mut prefetch_state.blob_ctx;
282 let blob_info = &mut prefetch_state.blob_info;
283 let encrypted = blob_ctx.blob_compressor != compress::Algorithm::None;
284
285 for chunk in chunks {
286 if let Some(ref ranges) = prefetch_file_info.ranges {
288 let mut should_skip = true;
289 for range in ranges {
290 if range_overlap(chunk, range) {
291 should_skip = false;
292 break;
293 }
294 }
295 if should_skip {
296 continue;
297 }
298 }
299
300 let blob_id = entries
301 .get(chunk.inner.blob_index() as usize)
302 .map(|entry| entry.blob_id())
303 .ok_or(anyhow!("failed to get blob id"))?;
304
305 let inner = Arc::make_mut(&mut chunk.inner);
306
307 let reader = backend
308 .clone()
309 .get_reader(&blob_id.clone())
310 .expect("get blob err");
311 let mut buf = vec![0u8; inner.compressed_size() as usize];
312 reader
313 .read(&mut buf, inner.compressed_offset())
314 .expect("read blob err");
315 prefetch_state.blob_writer.write_all(&buf)?;
316 inner.set_blob_index(blob_info.blob_index());
317 if blob_ctx.chunk_count == u32::MAX {
318 blob_ctx.chunk_count = 0;
319 }
320 inner.set_index(blob_ctx.chunk_count);
321 blob_ctx.chunk_count += 1;
322 inner.set_compressed_offset(blob_ctx.current_compressed_offset);
323 inner.set_uncompressed_offset(blob_ctx.current_uncompressed_offset);
324 let mut aligned_d_size: u64 = inner.uncompressed_size() as u64;
325 if let RafsBlobTable::V6(_) = blob_table {
326 aligned_d_size = nydus_utils::try_round_up_4k(inner.uncompressed_size())
327 .ok_or_else(|| anyhow!("invalid size"))?;
328 let info = batch.generate_chunk_info(
329 blob_ctx.current_compressed_offset,
330 blob_ctx.current_uncompressed_offset,
331 inner.uncompressed_size(),
332 encrypted,
333 )?;
334 blob_info.set_meta_ci_compressed_size(
335 (blob_info.meta_ci_compressed_size()
336 + size_of::<BlobChunkInfoV2Ondisk>() as u64) as usize,
337 );
338
339 blob_info.set_meta_ci_uncompressed_size(
340 (blob_info.meta_ci_uncompressed_size()
341 + size_of::<BlobChunkInfoV2Ondisk>() as u64) as usize,
342 );
343 blob_ctx.add_chunk_meta_info(&inner, Some(info))?;
344 }
345 blob_ctx.compressed_blob_size += inner.compressed_size() as u64;
346 blob_ctx.uncompressed_blob_size += aligned_d_size;
347 blob_ctx.current_compressed_offset += inner.compressed_size() as u64;
348 blob_ctx.current_uncompressed_offset += aligned_d_size;
349 blob_ctx.blob_hash.update(&buf);
350
351 blob_info.set_compressed_size(blob_ctx.compressed_blob_size as usize);
352 blob_info.set_uncompressed_size(blob_ctx.uncompressed_blob_size as usize);
353 blob_info.set_chunk_count(blob_ctx.chunk_count as usize);
354 }
355
356 Ok(())
357 }
358}
359
360fn rewrite_blob_id(entries: &mut [Arc<BlobInfo>], blob_id: &str, new_blob_id: String) {
361 entries
362 .iter_mut()
363 .filter(|blob| blob.blob_id() == blob_id)
364 .for_each(|blob| {
365 let mut info = (**blob).clone();
366 info.set_blob_id(new_blob_id.clone());
367 *blob = Arc::new(info);
368 });
369}
370
371pub fn update_ctx_from_bootstrap(
372 ctx: &mut BuildContext,
373 config: Arc<ConfigV2>,
374 bootstrap_path: &Path,
375) -> Result<RafsSuper> {
376 let (sb, _) = RafsSuper::load_from_file(bootstrap_path, config, false)?;
377
378 ctx.blob_features = sb
379 .superblock
380 .get_blob_infos()
381 .first()
382 .ok_or_else(|| anyhow!("No blob info found in superblock"))?
383 .features();
384
385 let config = sb.meta.get_config();
386 if config.is_tarfs_mode {
387 ctx.conversion_type = ConversionType::TarToRafs;
388 }
389
390 ctx.fs_version =
391 RafsVersion::try_from(sb.meta.version).context("Failed to get RAFS version")?;
392 ctx.compressor = config.compressor;
393 Ok(sb)
394}
395
396pub fn generate_prefetch_file_info(prefetch_file: &Path) -> Result<Vec<PrefetchFileInfo>> {
397 let content = std::fs::read_to_string(prefetch_file).map_err(|e| {
398 anyhow!(
399 "failed to read prefetch files from {}: {}",
400 prefetch_file.display(),
401 e
402 )
403 })?;
404
405 if content.trim().is_empty() {
406 return Ok(Vec::new());
407 }
408
409 let prefetch_json: PrefetchJson = serde_json::from_str(&content)
410 .map_err(|e| anyhow!("failed to parse prefetch file as JSON: {}", e))?;
411
412 if prefetch_json.version != "v1" {
413 bail!(
414 "unsupported prefetch file version: {}",
415 prefetch_json.version
416 );
417 }
418
419 let mut prefetch_nodes = Vec::new();
420 for file_json in prefetch_json.files {
421 let path = PathBuf::from(file_json.path);
422 if !path.is_absolute() {
423 warn!(
424 "prefetch file path is not absolute, skipping: {}",
425 path.display()
426 );
427 continue;
428 }
429 let ranges = file_json.ranges.map(|rs| {
430 rs.into_iter()
431 .map(|r| PrefetchFileRange {
432 offset: r[0],
433 size: r[1] as usize,
434 })
435 .collect()
436 });
437 prefetch_nodes.push(PrefetchFileInfo { path, ranges });
438 }
439 Ok(prefetch_nodes)
440}
441
442fn range_overlap(chunk: &mut NodeChunk, range: &PrefetchFileRange) -> bool {
443 if max(range.offset, chunk.inner.file_offset())
444 <= min(
445 range.offset + range.size as u64,
446 chunk.inner.file_offset() + chunk.inner.uncompressed_size() as u64,
447 )
448 {
449 return true;
450 }
451 false
452}