nydus_storage/cache/fscache/
mod.rs1use std::collections::HashMap;
6use std::fs::File;
7use std::io::{Error, Result};
8use std::os::unix::io::AsRawFd;
9use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU8, Ordering};
10use std::sync::{Arc, RwLock};
11
12use nydus_api::CacheConfigV2;
13use nydus_utils::metrics::BlobcacheMetrics;
14use tokio::runtime::Runtime;
15
16use crate::backend::BlobBackend;
17use crate::cache::cachedfile::{FileCacheEntry, FileCacheMeta};
18use crate::cache::filecache::BLOB_DATA_FILE_SUFFIX;
19use crate::cache::state::{BlobStateMap, IndexedChunkMap, RangeMap};
20use crate::cache::worker::{AsyncPrefetchConfig, AsyncWorkerMgr};
21#[cfg(feature = "dedup")]
22use crate::cache::CasMgr;
23use crate::cache::{BlobCache, BlobCacheMgr};
24use crate::device::{BlobFeatures, BlobInfo, BlobObject};
25use crate::factory::BLOB_FACTORY;
26use crate::utils::get_path_from_file;
27
/// Value `blobs_check_count` must have reached before `check_stat` stops the
/// async workers.
///
/// `check_stat` increments the counter on every round in which all blobs
/// report their data as ready and resets it to zero otherwise; once the
/// counter equals this value, the next all-ready round stops the workers and
/// sets the `data_all_ready` metric.
const FSCACHE_BLOBS_CHECK_NUM: u8 = 1;
29
/// Blob cache manager for the `fscache` cache type.
///
/// All mutable state lives behind `Arc`s, so clones of a manager share the
/// same blob table, worker manager and `closed` flag. Note that `Drop` calls
/// `destroy()`, which acts on that shared state.
#[derive(Clone)]
pub struct FsCacheMgr {
    /// Cache entries of the managed blobs, keyed by blob id.
    blobs: Arc<RwLock<HashMap<String, Arc<FileCacheEntry>>>>,
    /// Storage backend used to obtain blob readers.
    backend: Arc<dyn BlobBackend>,
    /// Metrics object shared with every cache entry created by this manager.
    metrics: Arc<BlobcacheMetrics>,
    /// Prefetch configuration handed to the worker manager and cache entries.
    prefetch_config: Arc<AsyncPrefetchConfig>,
    /// Tokio runtime handed to every cache entry.
    runtime: Arc<Runtime>,
    /// Async worker manager started by `init()` and stopped by `destroy()`.
    worker_mgr: Arc<AsyncWorkerMgr>,
    /// Directory used as the prefix of blob meta / chunk map file paths.
    work_dir: String,
    /// Whether cache validation was requested by the configuration.
    need_validation: bool,
    /// Counter of consecutive `check_stat()` rounds that saw all blobs ready.
    blobs_check_count: Arc<AtomicU8>,
    /// Set once `destroy()` has run.
    closed: Arc<AtomicBool>,
    /// Batch size for user IO, forwarded to every cache entry.
    user_io_batch_size: u32,
}
46
47impl FsCacheMgr {
48 pub fn new(
50 config: &CacheConfigV2,
51 backend: Arc<dyn BlobBackend>,
52 runtime: Arc<Runtime>,
53 id: &str,
54 user_io_batch_size: u32,
55 ) -> Result<FsCacheMgr> {
56 if config.cache_compressed {
57 return Err(enosys!("fscache doesn't support compressed cache mode"));
58 }
59
60 let blob_cfg = config.get_fscache_config()?;
61 let work_dir = blob_cfg.get_work_dir()?;
62 let metrics = BlobcacheMetrics::new(id, work_dir);
63 let prefetch_config: Arc<AsyncPrefetchConfig> = Arc::new((&config.prefetch).into());
64 let worker_mgr = AsyncWorkerMgr::new(metrics.clone(), prefetch_config.clone())?;
65
66 BLOB_FACTORY.start_mgr_checker();
67
68 Ok(FsCacheMgr {
69 blobs: Arc::new(RwLock::new(HashMap::new())),
70 backend,
71 metrics,
72 prefetch_config,
73 runtime,
74 worker_mgr: Arc::new(worker_mgr),
75 work_dir: work_dir.to_owned(),
76 need_validation: config.cache_validate,
77 blobs_check_count: Arc::new(AtomicU8::new(0)),
78 closed: Arc::new(AtomicBool::new(false)),
79 user_io_batch_size,
80 })
81 }
82
83 fn get(&self, blob: &Arc<BlobInfo>) -> Option<Arc<FileCacheEntry>> {
85 self.blobs.read().unwrap().get(&blob.blob_id()).cloned()
86 }
87
88 fn get_or_create_cache_entry(&self, blob: &Arc<BlobInfo>) -> Result<Arc<FileCacheEntry>> {
91 if let Some(entry) = self.get(blob) {
92 return Ok(entry);
93 }
94
95 let entry = FileCacheEntry::new_fs_cache(
96 self,
97 blob.clone(),
98 self.prefetch_config.clone(),
99 self.runtime.clone(),
100 self.worker_mgr.clone(),
101 )?;
102 let entry = Arc::new(entry);
103 let mut guard = self.blobs.write().unwrap();
104 if let Some(entry) = guard.get(&blob.blob_id()) {
105 Ok(entry.clone())
106 } else {
107 let blob_id = blob.blob_id();
108 guard.insert(blob_id.clone(), entry.clone());
109 self.metrics
110 .underlying_files
111 .lock()
112 .unwrap()
113 .insert(blob_id + BLOB_DATA_FILE_SUFFIX);
114 Ok(entry)
115 }
116 }
117}
118
119impl BlobCacheMgr for FsCacheMgr {
120 fn init(&self) -> Result<()> {
121 AsyncWorkerMgr::start(self.worker_mgr.clone())
122 }
123
124 fn destroy(&self) {
125 if !self.closed.load(Ordering::Acquire) {
126 self.closed.store(true, Ordering::Release);
127 self.worker_mgr.stop();
128 self.backend().shutdown();
129 self.metrics.release().unwrap_or_else(|e| error!("{:?}", e));
130 }
131 }
132
133 fn gc(&self, id: Option<&str>) -> bool {
134 if let Some(blob_id) = id {
135 self.blobs.write().unwrap().remove(blob_id);
136 } else {
137 let mut reclaim = Vec::new();
138 let guard = self.blobs.write().unwrap();
139 for (id, entry) in guard.iter() {
140 if Arc::strong_count(entry) == 1 {
141 reclaim.push(id.to_owned());
142 }
143 }
144 drop(guard);
145
146 for key in reclaim.iter() {
147 let mut guard = self.blobs.write().unwrap();
148 if let Some(entry) = guard.get(key) {
149 if Arc::strong_count(entry) == 1 {
150 guard.remove(key);
151 }
152 }
153 }
154 }
155
156 self.blobs.read().unwrap().is_empty()
157 }
158
159 fn backend(&self) -> &dyn BlobBackend {
160 self.backend.as_ref()
161 }
162
163 fn get_blob_cache(&self, blob_info: &Arc<BlobInfo>) -> Result<Arc<dyn BlobCache>> {
164 self.get_or_create_cache_entry(blob_info)
165 .map(|v| v as Arc<dyn BlobCache>)
166 }
167
168 fn check_stat(&self) {
169 let guard = self.blobs.read().unwrap();
170
171 let mut all_ready = true;
172 for (_id, entry) in guard.iter() {
173 if !entry.is_all_data_ready() {
174 all_ready = false;
175 break;
176 }
177 }
178
179 if all_ready {
181 if self.blobs_check_count.load(Ordering::Acquire) == FSCACHE_BLOBS_CHECK_NUM {
182 self.worker_mgr.stop();
183 self.metrics.data_all_ready.store(true, Ordering::Release);
184 } else {
185 self.blobs_check_count.fetch_add(1, Ordering::Acquire);
186 }
187 } else {
188 self.blobs_check_count.store(0, Ordering::Release);
189 }
190 }
191}
192
193impl Drop for FsCacheMgr {
194 fn drop(&mut self) {
195 self.destroy();
196 }
197}
198
199impl FileCacheEntry {
200 pub fn new_fs_cache(
201 mgr: &FsCacheMgr,
202 blob_info: Arc<BlobInfo>,
203 prefetch_config: Arc<AsyncPrefetchConfig>,
204 runtime: Arc<Runtime>,
205 workers: Arc<AsyncWorkerMgr>,
206 ) -> Result<Self> {
207 if blob_info.has_feature(BlobFeatures::_V5_NO_EXT_BLOB_TABLE) {
208 return Err(einval!("fscache does not support Rafs v5 blobs"));
209 }
210 let is_tarfs = blob_info.features().is_tarfs();
211 if is_tarfs {
212 return Err(einval!("fscache does not support RAFS in tarfs mode"));
213 }
214
215 let file = blob_info
216 .get_fscache_file()
217 .ok_or_else(|| einval!("No fscache file associated with the blob_info"))?;
218 let is_separate_meta = blob_info.has_feature(BlobFeatures::SEPARATE);
219 let is_batch = blob_info.has_feature(BlobFeatures::BATCH);
220 let is_zran = blob_info.has_feature(BlobFeatures::ZRAN);
221 let cache_cipher = blob_info.cipher();
222 let is_cache_encrypted = cache_cipher.is_encryption_enabled();
223 let blob_id = blob_info.blob_id();
224 let blob_meta_id = if is_separate_meta {
225 blob_info.get_blob_meta_id()?
226 } else {
227 blob_id.clone()
228 };
229 let reader = mgr
230 .backend
231 .get_reader(&blob_id)
232 .map_err(|_e| eio!("failed to get reader for data blob"))?;
233 let blob_meta_reader = if is_separate_meta {
234 mgr.backend.get_reader(&blob_meta_id).map_err(|e| {
235 eio!(format!(
236 "failed to get reader for blob.meta {}, {}",
237 blob_id, e
238 ))
239 })?
240 } else {
241 reader.clone()
242 };
243 let blob_compressed_size = Self::get_blob_size(&reader, &blob_info)?;
244
245 let cas_mgr = if is_tarfs {
247 warn!("chunk deduplication trun off");
248 None
249 } else {
250 #[cfg(feature = "dedup")]
251 {
252 CasMgr::get_singleton()
253 }
254 #[cfg(not(feature = "dedup"))]
255 None
256 };
257
258 let need_validation = mgr.need_validation
259 && !blob_info.is_legacy_stargz()
260 && blob_info.has_feature(BlobFeatures::INLINED_CHUNK_DIGEST);
261 let load_chunk_digest = need_validation || cas_mgr.is_some();
262 let blob_file_path = format!("{}/{}", mgr.work_dir, blob_meta_id);
263 let meta = if blob_info.meta_ci_is_valid() {
264 FileCacheMeta::new(
265 blob_file_path.clone(),
266 blob_info.clone(),
267 Some(blob_meta_reader),
268 None,
269 true,
270 load_chunk_digest,
271 )?
272 } else {
273 return Err(enosys!(
274 "fscache doesn't support blobs without blob meta information"
275 ));
276 };
277
278 let chunk_map = Arc::new(BlobStateMap::from(IndexedChunkMap::new(
279 &format!("{}{}", blob_file_path, BLOB_DATA_FILE_SUFFIX),
280 blob_info.chunk_count(),
281 false,
282 )?));
283 Self::restore_chunk_map(blob_info.clone(), file.clone(), &meta, &chunk_map);
284
285 let mut blob_data_file_path = String::new();
286 if cas_mgr.is_some() {
287 blob_data_file_path = if let Some(path) = get_path_from_file(&file) {
288 path
289 } else {
290 warn!("can't get path from file");
291 "".to_string()
292 }
293 }
294
295 Ok(FileCacheEntry {
296 blob_id,
297 blob_info: blob_info.clone(),
298 cache_cipher_object: Default::default(),
299 cache_cipher_context: Default::default(),
300 cas_mgr,
301 chunk_map,
302 file,
303 file_path: Arc::new(blob_data_file_path),
304 meta: Some(meta),
305 metrics: mgr.metrics.clone(),
306 prefetch_state: Arc::new(AtomicU32::new(0)),
307 reader,
308 runtime,
309 workers,
310
311 blob_compressed_size,
312 blob_uncompressed_size: blob_info.uncompressed_size(),
313 is_get_blob_object_supported: true,
314 is_raw_data: false,
315 is_direct_chunkmap: true,
316 is_cache_encrypted,
317 is_legacy_stargz: blob_info.is_legacy_stargz(),
318 is_tarfs,
319 is_batch,
320 is_zran,
321 dio_enabled: true,
322 need_validation,
323 user_io_batch_size: mgr.user_io_batch_size,
324 prefetch_config,
325 })
326 }
327
328 fn restore_chunk_map(
329 blob_info: Arc<BlobInfo>,
330 file: Arc<File>,
331 meta: &FileCacheMeta,
332 chunk_map: &BlobStateMap<IndexedChunkMap, u32>,
333 ) {
334 let blob_meta = match meta.get_blob_meta() {
335 Some(v) => v,
336 None => {
337 warn!("failed to get blob meta object for blob, skip chunkmap recover");
338 return;
339 }
340 };
341
342 let mut i = 0;
343 while i < blob_info.chunk_count() {
344 let hole_offset = unsafe {
345 libc::lseek64(
346 file.as_raw_fd(),
347 blob_meta.get_uncompressed_offset(i as usize) as i64,
348 libc::SEEK_HOLE,
349 )
350 };
351
352 if hole_offset < 0 {
353 warn!(
354 "seek hole err {} for blob {}",
355 Error::last_os_error(),
356 blob_info.blob_id()
357 );
358 break;
359 }
360
361 if hole_offset as u64 == blob_info.uncompressed_size() {
362 debug!(
363 "seek hole to file end, blob {} rest chunks {} - {} all ready",
364 blob_info.blob_id(),
365 i,
366 blob_info.chunk_count() - 1,
367 );
368 if let Err(e) =
369 chunk_map.set_range_ready_and_clear_pending(i, blob_info.chunk_count() - i)
370 {
371 warn!("set range ready err {}", e);
372 }
373 break;
374 }
375
376 let hole_index = match blob_meta.get_chunk_index(hole_offset as u64) {
377 Ok(h) => h as u32,
378 Err(e) => {
379 warn!("get offset chunk index err {}", e);
380 break;
381 }
382 };
383 if hole_index > i {
384 debug!(
385 "set blob {} rang {}-{} ready",
386 blob_info.blob_id(),
387 i,
388 hole_index - 1,
389 );
390 if let Err(e) = chunk_map.set_range_ready_and_clear_pending(i, hole_index - i) {
391 warn!("set range ready err {}", e);
392 break;
393 }
394 }
395 i = hole_index + 1;
396 }
397 }
398}
399
#[cfg(test)]
mod tests {
    use std::{fs::OpenOptions, path::PathBuf};

    use nydus_api::ConfigV2;
    use nydus_utils::{compress, metrics::BackendMetrics};

    use crate::{factory::ASYNC_RUNTIME, test::MockBackend, RAFS_DEFAULT_CHUNK_SIZE};

    use super::*;

    /// Id of the blob used by the test; also the name of its cache file.
    const ZRAN_BLOB_ID: &str = "233c72f2b6b698c07021c4da367cfe2dff4f049efbaa885ca0ff760ea297865a";

    /// Smoke test: create a manager, attach a cache file to a blob, then walk
    /// through `get_blob_cache`, `gc`, `check_stat`, `backend` and `destroy`.
    #[test]
    fn test_fs_cache_mgr() {
        let content = r#"version=2
        id = "my_id"
        metadata_path = "meta_path"
        [backend]
        type = "localfs"
        [backend.localfs]
        blob_file = "/tmp/nydus.blob.data"
        dir = "/tmp"
        alt_dirs = ["/var/nydus/cache"]
        [cache]
        type = "fscache"
        compressed = false
        validate = true
        [cache.fscache]
        work_dir = "/tmp"
        "#;

        let cfg: ConfigV2 = toml::from_str(content).unwrap();
        let backend = Arc::new(MockBackend {
            metrics: BackendMetrics::new("dummy", "localfs"),
        });

        let mut mgr: FsCacheMgr = FsCacheMgr::new(
            cfg.get_cache_config().unwrap(),
            backend,
            ASYNC_RUNTIME.clone(),
            &cfg.id,
            0,
        )
        .unwrap();
        assert!(mgr.init().is_ok());
        // Point the manager at the fixture directory for the blob files.
        mgr.work_dir = "../tests/texture/zran/".to_string();

        let root_dir = &std::env::var("CARGO_MANIFEST_DIR").expect("$CARGO_MANIFEST_DIR");
        let cache_file_path = PathBuf::from(root_dir)
            .join("../tests/texture/zran")
            .join(ZRAN_BLOB_ID);

        let features = BlobFeatures::ALIGNED
            | BlobFeatures::INLINED_FS_META
            | BlobFeatures::CHUNK_INFO_V2
            | BlobFeatures::ZRAN;

        let mut blob_info = BlobInfo::new(
            0,
            ZRAN_BLOB_ID.to_string(),
            0x16c6000,
            9839040,
            RAFS_DEFAULT_CHUNK_SIZE as u32,
            0xa3,
            features,
        );
        blob_info.set_blob_meta_info(0, 0xa1290, 0xa1290, compress::Algorithm::None as u32);

        // Create (or reset) the cache file and give it a small fixed length.
        let cache_file: File = OpenOptions::new()
            .truncate(true)
            .create(true)
            .write(true)
            .read(true)
            .open(cache_file_path.as_os_str())
            .unwrap();
        cache_file.set_len(800).unwrap();
        blob_info.set_fscache_file(Some(Arc::new(cache_file.try_clone().unwrap())));

        let blob = Arc::new(blob_info.clone());
        assert!(mgr.get_blob_cache(&blob).is_ok());
        // Removing the only registered blob leaves the table empty.
        assert!(mgr.gc(Some(ZRAN_BLOB_ID)));
        mgr.check_stat();
        let _backend = mgr.backend();
        mgr.destroy();
        drop(mgr);
    }
}