nydus_storage/cache/fscache/mod.rs

// Copyright (C) 2022 Alibaba Cloud. All rights reserved.
//
// SPDX-License-Identifier: Apache-2.0

use std::collections::HashMap;
use std::fs::File;
use std::io::{Error, Result};
use std::os::unix::io::AsRawFd;
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU8, Ordering};
use std::sync::{Arc, RwLock};

use nydus_api::CacheConfigV2;
use nydus_utils::metrics::BlobcacheMetrics;
use tokio::runtime::Runtime;

use crate::backend::BlobBackend;
use crate::cache::cachedfile::{FileCacheEntry, FileCacheMeta};
use crate::cache::filecache::BLOB_DATA_FILE_SUFFIX;
use crate::cache::state::{BlobStateMap, IndexedChunkMap, RangeMap};
use crate::cache::worker::{AsyncPrefetchConfig, AsyncWorkerMgr};
#[cfg(feature = "dedup")]
use crate::cache::CasMgr;
use crate::cache::{BlobCache, BlobCacheMgr};
use crate::device::{BlobFeatures, BlobInfo, BlobObject};
use crate::factory::BLOB_FACTORY;
use crate::utils::get_path_from_file;
27
/// Number of extra consecutive "all data ready" observations `check_stat()`
/// requires before it stops the prefetch workers for good.
const FSCACHE_BLOBS_CHECK_NUM: u8 = 1;
29
/// An implementation of [BlobCacheMgr](../trait.BlobCacheMgr.html) to improve performance by
/// caching uncompressed blob with Linux fscache subsystem.
#[derive(Clone)]
pub struct FsCacheMgr {
    // Map from blob id to its cache entry; shared by all clones of the manager.
    blobs: Arc<RwLock<HashMap<String, Arc<FileCacheEntry>>>>,
    // Storage backend used to fetch blob data on cache miss.
    backend: Arc<dyn BlobBackend>,
    // Metrics collector for this blob cache instance.
    metrics: Arc<BlobcacheMetrics>,
    // Configuration for asynchronous prefetching.
    prefetch_config: Arc<AsyncPrefetchConfig>,
    // Tokio runtime driving asynchronous cache operations.
    runtime: Arc<Runtime>,
    // Manager of asynchronous prefetch/IO worker threads.
    worker_mgr: Arc<AsyncWorkerMgr>,
    // Working directory holding blob cache files.
    work_dir: String,
    // Whether data validation is enabled (from `CacheConfigV2::cache_validate`).
    need_validation: bool,
    // Counter of consecutive "all blobs ready" observations, see `check_stat()`.
    blobs_check_count: Arc<AtomicU8>,
    // Set once `destroy()` has run, guarding against repeated shutdown.
    closed: Arc<AtomicBool>,
    // Batch size for user I/O requests, forwarded to each cache entry.
    user_io_batch_size: u32,
}
46
47impl FsCacheMgr {
48    /// Create a new instance of `FileCacheMgr`.
49    pub fn new(
50        config: &CacheConfigV2,
51        backend: Arc<dyn BlobBackend>,
52        runtime: Arc<Runtime>,
53        id: &str,
54        user_io_batch_size: u32,
55    ) -> Result<FsCacheMgr> {
56        if config.cache_compressed {
57            return Err(enosys!("fscache doesn't support compressed cache mode"));
58        }
59
60        let blob_cfg = config.get_fscache_config()?;
61        let work_dir = blob_cfg.get_work_dir()?;
62        let metrics = BlobcacheMetrics::new(id, work_dir);
63        let prefetch_config: Arc<AsyncPrefetchConfig> = Arc::new((&config.prefetch).into());
64        let worker_mgr = AsyncWorkerMgr::new(metrics.clone(), prefetch_config.clone())?;
65
66        BLOB_FACTORY.start_mgr_checker();
67
68        Ok(FsCacheMgr {
69            blobs: Arc::new(RwLock::new(HashMap::new())),
70            backend,
71            metrics,
72            prefetch_config,
73            runtime,
74            worker_mgr: Arc::new(worker_mgr),
75            work_dir: work_dir.to_owned(),
76            need_validation: config.cache_validate,
77            blobs_check_count: Arc::new(AtomicU8::new(0)),
78            closed: Arc::new(AtomicBool::new(false)),
79            user_io_batch_size,
80        })
81    }
82
83    // Get the file cache entry for the specified blob object.
84    fn get(&self, blob: &Arc<BlobInfo>) -> Option<Arc<FileCacheEntry>> {
85        self.blobs.read().unwrap().get(&blob.blob_id()).cloned()
86    }
87
88    // Create a file cache entry for the specified blob object if not present, otherwise
89    // return the existing one.
90    fn get_or_create_cache_entry(&self, blob: &Arc<BlobInfo>) -> Result<Arc<FileCacheEntry>> {
91        if let Some(entry) = self.get(blob) {
92            return Ok(entry);
93        }
94
95        let entry = FileCacheEntry::new_fs_cache(
96            self,
97            blob.clone(),
98            self.prefetch_config.clone(),
99            self.runtime.clone(),
100            self.worker_mgr.clone(),
101        )?;
102        let entry = Arc::new(entry);
103        let mut guard = self.blobs.write().unwrap();
104        if let Some(entry) = guard.get(&blob.blob_id()) {
105            Ok(entry.clone())
106        } else {
107            let blob_id = blob.blob_id();
108            guard.insert(blob_id.clone(), entry.clone());
109            self.metrics
110                .underlying_files
111                .lock()
112                .unwrap()
113                .insert(blob_id + BLOB_DATA_FILE_SUFFIX);
114            Ok(entry)
115        }
116    }
117}
118
119impl BlobCacheMgr for FsCacheMgr {
120    fn init(&self) -> Result<()> {
121        AsyncWorkerMgr::start(self.worker_mgr.clone())
122    }
123
124    fn destroy(&self) {
125        if !self.closed.load(Ordering::Acquire) {
126            self.closed.store(true, Ordering::Release);
127            self.worker_mgr.stop();
128            self.backend().shutdown();
129            self.metrics.release().unwrap_or_else(|e| error!("{:?}", e));
130        }
131    }
132
133    fn gc(&self, id: Option<&str>) -> bool {
134        if let Some(blob_id) = id {
135            self.blobs.write().unwrap().remove(blob_id);
136        } else {
137            let mut reclaim = Vec::new();
138            let guard = self.blobs.write().unwrap();
139            for (id, entry) in guard.iter() {
140                if Arc::strong_count(entry) == 1 {
141                    reclaim.push(id.to_owned());
142                }
143            }
144            drop(guard);
145
146            for key in reclaim.iter() {
147                let mut guard = self.blobs.write().unwrap();
148                if let Some(entry) = guard.get(key) {
149                    if Arc::strong_count(entry) == 1 {
150                        guard.remove(key);
151                    }
152                }
153            }
154        }
155
156        self.blobs.read().unwrap().is_empty()
157    }
158
159    fn backend(&self) -> &dyn BlobBackend {
160        self.backend.as_ref()
161    }
162
163    fn get_blob_cache(&self, blob_info: &Arc<BlobInfo>) -> Result<Arc<dyn BlobCache>> {
164        self.get_or_create_cache_entry(blob_info)
165            .map(|v| v as Arc<dyn BlobCache>)
166    }
167
168    fn check_stat(&self) {
169        let guard = self.blobs.read().unwrap();
170
171        let mut all_ready = true;
172        for (_id, entry) in guard.iter() {
173            if !entry.is_all_data_ready() {
174                all_ready = false;
175                break;
176            }
177        }
178
179        // we should double check blobs stat, in case some blobs hadn't been created when we checked.
180        if all_ready {
181            if self.blobs_check_count.load(Ordering::Acquire) == FSCACHE_BLOBS_CHECK_NUM {
182                self.worker_mgr.stop();
183                self.metrics.data_all_ready.store(true, Ordering::Release);
184            } else {
185                self.blobs_check_count.fetch_add(1, Ordering::Acquire);
186            }
187        } else {
188            self.blobs_check_count.store(0, Ordering::Release);
189        }
190    }
191}
192
impl Drop for FsCacheMgr {
    // Release workers, backend and metrics when the manager goes away.
    // `destroy()` guards itself with the `closed` flag, so an earlier explicit
    // call is harmless.
    fn drop(&mut self) {
        self.destroy();
    }
}
198
199impl FileCacheEntry {
200    pub fn new_fs_cache(
201        mgr: &FsCacheMgr,
202        blob_info: Arc<BlobInfo>,
203        prefetch_config: Arc<AsyncPrefetchConfig>,
204        runtime: Arc<Runtime>,
205        workers: Arc<AsyncWorkerMgr>,
206    ) -> Result<Self> {
207        if blob_info.has_feature(BlobFeatures::_V5_NO_EXT_BLOB_TABLE) {
208            return Err(einval!("fscache does not support Rafs v5 blobs"));
209        }
210        let is_tarfs = blob_info.features().is_tarfs();
211        if is_tarfs {
212            return Err(einval!("fscache does not support RAFS in tarfs mode"));
213        }
214
215        let file = blob_info
216            .get_fscache_file()
217            .ok_or_else(|| einval!("No fscache file associated with the blob_info"))?;
218        let is_separate_meta = blob_info.has_feature(BlobFeatures::SEPARATE);
219        let is_batch = blob_info.has_feature(BlobFeatures::BATCH);
220        let is_zran = blob_info.has_feature(BlobFeatures::ZRAN);
221        let cache_cipher = blob_info.cipher();
222        let is_cache_encrypted = cache_cipher.is_encryption_enabled();
223        let blob_id = blob_info.blob_id();
224        let blob_meta_id = if is_separate_meta {
225            blob_info.get_blob_meta_id()?
226        } else {
227            blob_id.clone()
228        };
229        let reader = mgr
230            .backend
231            .get_reader(&blob_id)
232            .map_err(|_e| eio!("failed to get reader for data blob"))?;
233        let blob_meta_reader = if is_separate_meta {
234            mgr.backend.get_reader(&blob_meta_id).map_err(|e| {
235                eio!(format!(
236                    "failed to get reader for blob.meta {}, {}",
237                    blob_id, e
238                ))
239            })?
240        } else {
241            reader.clone()
242        };
243        let blob_compressed_size = Self::get_blob_size(&reader, &blob_info)?;
244
245        // Turn off chunk deduplication in case of tarfs.
246        let cas_mgr = if is_tarfs {
247            warn!("chunk deduplication trun off");
248            None
249        } else {
250            #[cfg(feature = "dedup")]
251            {
252                CasMgr::get_singleton()
253            }
254            #[cfg(not(feature = "dedup"))]
255            None
256        };
257
258        let need_validation = mgr.need_validation
259            && !blob_info.is_legacy_stargz()
260            && blob_info.has_feature(BlobFeatures::INLINED_CHUNK_DIGEST);
261        let load_chunk_digest = need_validation || cas_mgr.is_some();
262        let blob_file_path = format!("{}/{}", mgr.work_dir, blob_meta_id);
263        let meta = if blob_info.meta_ci_is_valid() {
264            FileCacheMeta::new(
265                blob_file_path.clone(),
266                blob_info.clone(),
267                Some(blob_meta_reader),
268                None,
269                true,
270                load_chunk_digest,
271            )?
272        } else {
273            return Err(enosys!(
274                "fscache doesn't support blobs without blob meta information"
275            ));
276        };
277
278        let chunk_map = Arc::new(BlobStateMap::from(IndexedChunkMap::new(
279            &format!("{}{}", blob_file_path, BLOB_DATA_FILE_SUFFIX),
280            blob_info.chunk_count(),
281            false,
282        )?));
283        Self::restore_chunk_map(blob_info.clone(), file.clone(), &meta, &chunk_map);
284
285        let mut blob_data_file_path = String::new();
286        if cas_mgr.is_some() {
287            blob_data_file_path = if let Some(path) = get_path_from_file(&file) {
288                path
289            } else {
290                warn!("can't get path from file");
291                "".to_string()
292            }
293        }
294
295        Ok(FileCacheEntry {
296            blob_id,
297            blob_info: blob_info.clone(),
298            cache_cipher_object: Default::default(),
299            cache_cipher_context: Default::default(),
300            cas_mgr,
301            chunk_map,
302            file,
303            file_path: Arc::new(blob_data_file_path),
304            meta: Some(meta),
305            metrics: mgr.metrics.clone(),
306            prefetch_state: Arc::new(AtomicU32::new(0)),
307            reader,
308            runtime,
309            workers,
310
311            blob_compressed_size,
312            blob_uncompressed_size: blob_info.uncompressed_size(),
313            is_get_blob_object_supported: true,
314            is_raw_data: false,
315            is_direct_chunkmap: true,
316            is_cache_encrypted,
317            is_legacy_stargz: blob_info.is_legacy_stargz(),
318            is_tarfs,
319            is_batch,
320            is_zran,
321            dio_enabled: true,
322            need_validation,
323            user_io_batch_size: mgr.user_io_batch_size,
324            prefetch_config,
325        })
326    }
327
328    fn restore_chunk_map(
329        blob_info: Arc<BlobInfo>,
330        file: Arc<File>,
331        meta: &FileCacheMeta,
332        chunk_map: &BlobStateMap<IndexedChunkMap, u32>,
333    ) {
334        let blob_meta = match meta.get_blob_meta() {
335            Some(v) => v,
336            None => {
337                warn!("failed to get blob meta object for blob, skip chunkmap recover");
338                return;
339            }
340        };
341
342        let mut i = 0;
343        while i < blob_info.chunk_count() {
344            let hole_offset = unsafe {
345                libc::lseek64(
346                    file.as_raw_fd(),
347                    blob_meta.get_uncompressed_offset(i as usize) as i64,
348                    libc::SEEK_HOLE,
349                )
350            };
351
352            if hole_offset < 0 {
353                warn!(
354                    "seek hole err {} for blob {}",
355                    Error::last_os_error(),
356                    blob_info.blob_id()
357                );
358                break;
359            }
360
361            if hole_offset as u64 == blob_info.uncompressed_size() {
362                debug!(
363                    "seek hole to file end, blob {} rest chunks {} - {} all ready",
364                    blob_info.blob_id(),
365                    i,
366                    blob_info.chunk_count() - 1,
367                );
368                if let Err(e) =
369                    chunk_map.set_range_ready_and_clear_pending(i, blob_info.chunk_count() - i)
370                {
371                    warn!("set range ready err {}", e);
372                }
373                break;
374            }
375
376            let hole_index = match blob_meta.get_chunk_index(hole_offset as u64) {
377                Ok(h) => h as u32,
378                Err(e) => {
379                    warn!("get offset chunk index err {}", e);
380                    break;
381                }
382            };
383            if hole_index > i {
384                debug!(
385                    "set blob {} rang {}-{} ready",
386                    blob_info.blob_id(),
387                    i,
388                    hole_index - 1,
389                );
390                if let Err(e) = chunk_map.set_range_ready_and_clear_pending(i, hole_index - i) {
391                    warn!("set range ready err {}", e);
392                    break;
393                }
394            }
395            i = hole_index + 1;
396        }
397    }
398}
399
#[cfg(test)]
mod tests {
    use std::{fs::OpenOptions, path::PathBuf};

    use nydus_api::ConfigV2;
    use nydus_utils::{compress, metrics::BackendMetrics};

    use crate::{factory::ASYNC_RUNTIME, test::MockBackend, RAFS_DEFAULT_CHUNK_SIZE};

    use super::*;

    // Smoke test for the FsCacheMgr life cycle: construction from a TOML
    // config, entry creation, gc, stat check and shutdown.
    #[test]
    fn test_fs_cache_mgr() {
        // Minimal v2 config with an fscache cache section; compressed mode
        // must be false or `FsCacheMgr::new` would reject it.
        let content = r#"version=2
        id = "my_id"
        metadata_path = "meta_path"
        [backend]
        type = "localfs"
        [backend.localfs]
        blob_file = "/tmp/nydus.blob.data"
        dir = "/tmp"
        alt_dirs = ["/var/nydus/cache"]
        [cache]
        type = "fscache"
        compressed = false
        validate = true
        [cache.fscache]
        work_dir = "/tmp"
        "#;

        let cfg: ConfigV2 = toml::from_str(content).unwrap();
        let backend = MockBackend {
            metrics: BackendMetrics::new("dummy", "localfs"),
        };

        let mut mgr: FsCacheMgr = FsCacheMgr::new(
            cfg.get_cache_config().unwrap(),
            Arc::new(backend),
            ASYNC_RUNTIME.clone(),
            &cfg.id,
            0,
        )
        .unwrap();
        assert!(mgr.init().is_ok());
        // Point the manager at the checked-in zran texture fixtures.
        mgr.work_dir = "../tests/texture/zran/".to_string();

        let root_dir = &std::env::var("CARGO_MANIFEST_DIR").expect("$CARGO_MANIFEST_DIR");
        let path = PathBuf::from(root_dir).join("../tests/texture/zran/233c72f2b6b698c07021c4da367cfe2dff4f049efbaa885ca0ff760ea297865a");

        let features = BlobFeatures::ALIGNED
            | BlobFeatures::INLINED_FS_META
            | BlobFeatures::CHUNK_INFO_V2
            | BlobFeatures::ZRAN;

        // Blob geometry matching the zran fixture; sizes/counts are the
        // fixture's known values.
        let mut blob_info = BlobInfo::new(
            0,
            "233c72f2b6b698c07021c4da367cfe2dff4f049efbaa885ca0ff760ea297865a".to_string(),
            0x16c6000,
            9839040,
            RAFS_DEFAULT_CHUNK_SIZE as u32,
            0xa3,
            features,
        );

        blob_info.set_blob_meta_info(0, 0xa1290, 0xa1290, compress::Algorithm::None as u32);

        // NOTE(review): truncate(true) + set_len(800) rewrites the checked-in
        // fixture file in the repository — confirm this is intended and that
        // tests don't depend on its original content.
        let f1: File = OpenOptions::new()
            .truncate(true)
            .create(true)
            .write(true)
            .read(true)
            .open(path.as_os_str())
            .unwrap();
        f1.set_len(800).unwrap();

        blob_info.set_fscache_file(Some(Arc::new(f1.try_clone().unwrap())));

        assert!(mgr.get_blob_cache(&Arc::new(blob_info.clone())).is_ok());
        // gc by id should leave the manager empty afterwards.
        assert!(mgr.gc(Some(
            "233c72f2b6b698c07021c4da367cfe2dff4f049efbaa885ca0ff760ea297865a"
        )));
        mgr.check_stat();
        let _backend = mgr.backend();
        mgr.destroy();
        drop(mgr);
    }
}