nydus_service/
fs_cache.rs

1// Copyright (C) 2022 Alibaba Cloud. All rights reserved.
2//
3// SPDX-License-Identifier: (Apache-2.0 AND BSD-3-Clause)
4
5//! Handler to expose RAFSv6 image through EROFS/fscache.
6//!
7//! The [`FsCacheHandler`] is the inter-connection between in kernel EROFS/fscache drivers
8//! and the user space [BlobCacheMgr](https://docs.rs/nydus-service/latest/nydus_service/blob_cache/struct.BlobCacheMgr.html).
9//! The workflow is as below:
10//! - EROFS presents a filesystem structure by parsing a RAFS image metadata blob.
11//! - EROFS sends requests to the fscache subsystem when user reads data from files.
//! - The fscache subsystem sends requests to [FsCacheHandler] if the requested data has not been cached yet.
13//! - [FsCacheHandler] reads blob data from the [BlobCacheMgr] and sends back reply messages.
14
15use std::collections::hash_map::Entry::Vacant;
16use std::collections::HashMap;
17use std::convert::TryFrom;
18use std::fs::{self, File, OpenOptions};
19use std::io::{copy, Error, ErrorKind, Result, Write};
20use std::ops::Deref;
21use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
22use std::path::{Path, PathBuf};
23use std::ptr::read_unaligned;
24use std::string::String;
25use std::sync::atomic::{AtomicBool, Ordering};
26use std::sync::{Arc, Barrier, Condvar, Mutex, MutexGuard, RwLock};
27use std::{cmp, env, thread, time};
28
29use mio::unix::SourceFd;
30use mio::{Events, Interest, Poll, Token, Waker};
31use nydus_storage::cache::BlobCache;
32use nydus_storage::device::BlobPrefetchRequest;
33use nydus_storage::factory::{ASYNC_RUNTIME, BLOB_FACTORY};
34
35use crate::blob_cache::{
36    generate_blob_key, BlobCacheMgr, BlobConfig, DataBlobConfig, MetaBlobConfig,
37};
38
// Generates the unsafe `fscache_cread()` ioctl wrapper (identifier 0x98, sequence number 1),
// which is used to acknowledge READ requests on the cache file fd.
nix::ioctl_write_int!(fscache_cread, 0x98, 1);

/// Size in bytes of the buffer used to receive one request message from the fscache driver.
const MIN_DATA_BUF_SIZE: usize = 1024;
/// Size in bytes of the common message header (four `u32` fields).
const MSG_HEADER_SIZE: usize = 16;
/// Size in bytes of the fixed part of an OPEN message body (four `u32` fields), which is
/// followed by the volume key and the cookie key.
const MSG_OPEN_SIZE: usize = 16;
/// Size in bytes of a READ message body (two `u64` fields).
const MSG_READ_SIZE: usize = 16;

/// Poller token associated with the waker.
const TOKEN_EVENT_WAKER: usize = 1;
/// Poller token associated with the fscache device fd.
const TOKEN_EVENT_FSCACHE: usize = 2;

/// Number of attempts made to create the blob cache object for an opened data blob.
const BLOB_CACHE_INIT_RETRY: u8 = 5;
/// Delay in milliseconds after each failed attempt to create the blob cache object.
const BLOB_CACHE_INIT_INTERVAL_MS: u64 = 300;
52
53/// Command code in requests from fscache driver.
54#[repr(u32)]
55#[derive(Debug, Eq, PartialEq)]
56enum FsCacheOpCode {
57    Open = 0,
58    Close = 1,
59    Read = 2,
60}
61
62impl TryFrom<u32> for FsCacheOpCode {
63    type Error = Error;
64
65    fn try_from(value: u32) -> std::result::Result<Self, Self::Error> {
66        match value {
67            0 => Ok(FsCacheOpCode::Open),
68            1 => Ok(FsCacheOpCode::Close),
69            2 => Ok(FsCacheOpCode::Read),
70            _ => Err(einval!(format!(
71                "fscache: invalid operation code {}",
72                value
73            ))),
74        }
75    }
76}
77
78/// Common header for request messages.
79#[repr(C)]
80#[derive(Debug, Eq, PartialEq)]
81struct FsCacheMsgHeader {
82    /// Message identifier to associate reply with request by the fscache driver.
83    msg_id: u32,
84    /// Message operation code.
85    opcode: FsCacheOpCode,
86    /// Message length, including message header and message body.
87    len: u32,
88    /// A unique ID identifying the cache file operated on.
89    object_id: u32,
90}
91
92impl TryFrom<&[u8]> for FsCacheMsgHeader {
93    type Error = Error;
94
95    fn try_from(value: &[u8]) -> std::result::Result<Self, Self::Error> {
96        if value.len() < MSG_HEADER_SIZE {
97            return Err(einval!(format!(
98                "fscache: request message size is too small, {}",
99                value.len()
100            )));
101        }
102
103        // Safe because we have verified buffer size.
104        let msg_id = unsafe { read_unaligned(value[0..4].as_ptr() as *const u32) };
105        let opcode = unsafe { read_unaligned(value[4..8].as_ptr() as *const u32) };
106        let len = unsafe { read_unaligned(value[8..12].as_ptr() as *const u32) };
107        let opcode = FsCacheOpCode::try_from(opcode)?;
108        let object_id = unsafe { read_unaligned(value[12..16].as_ptr() as *const u32) };
109        if len as usize != value.len() {
110            return Err(einval!(format!(
111                "fscache: message length {} does not match length from message header {}",
112                value.len(),
113                len
114            )));
115        }
116
117        Ok(FsCacheMsgHeader {
118            msg_id,
119            opcode,
120            len,
121            object_id,
122        })
123    }
124}
125
126/// Request message to open a file.
127///
128/// The opened file should be kept valid until corresponding `CLOSE` message has been received
129/// from the fscache driver.
130#[derive(Default, Debug, Eq, PartialEq)]
131struct FsCacheMsgOpen {
132    volume_key: String,
133    cookie_key: String,
134    fd: u32,
135    flags: u32,
136}
137
138impl TryFrom<&[u8]> for FsCacheMsgOpen {
139    type Error = Error;
140
141    fn try_from(value: &[u8]) -> std::result::Result<Self, Self::Error> {
142        if value.len() < MSG_OPEN_SIZE {
143            return Err(einval!(format!(
144                "fscache: request message size is too small, {}",
145                value.len()
146            )));
147        }
148
149        // Safe because we have verified buffer size.
150        let volume_key_size = unsafe { read_unaligned(value[0..4].as_ptr() as *const u32) };
151        let cookie_key_size = unsafe { read_unaligned(value[4..8].as_ptr() as *const u32) };
152        let fd = unsafe { read_unaligned(value[8..12].as_ptr() as *const u32) };
153        let flags = unsafe { read_unaligned(value[12..16].as_ptr() as *const u32) };
154        if volume_key_size.checked_add(cookie_key_size).is_none()
155            || (volume_key_size + cookie_key_size)
156                .checked_add(MSG_OPEN_SIZE as u32)
157                .is_none()
158        {
159            return Err(einval!(
160                "fscache: invalid volume/cookie key length in OPEN request"
161            ));
162        }
163        let total_sz = (volume_key_size + cookie_key_size) as usize + MSG_OPEN_SIZE;
164        if value.len() < total_sz {
165            return Err(einval!("fscache: invalid message length for OPEN request"));
166        }
167        let pos = MSG_OPEN_SIZE + volume_key_size as usize;
168        let volume_key = String::from_utf8(value[MSG_OPEN_SIZE..pos].to_vec())
169            .map_err(|_e| einval!("fscache: invalid volume key in OPEN request"))?
170            .trim_end_matches('\0')
171            .to_string();
172        let cookie_key = String::from_utf8(value[pos..pos + cookie_key_size as usize].to_vec())
173            .map_err(|_e| einval!("fscache: invalid cookie key in OPEN request"))?;
174
175        Ok(FsCacheMsgOpen {
176            volume_key,
177            cookie_key,
178            fd,
179            flags,
180        })
181    }
182}
183
184/// Request message to feed requested data into the cache file.
185#[repr(C)]
186#[derive(Default, Debug, Eq, PartialEq)]
187struct FsCacheMsgRead {
188    off: u64,
189    len: u64,
190}
191
192impl TryFrom<&[u8]> for FsCacheMsgRead {
193    type Error = Error;
194
195    fn try_from(value: &[u8]) -> std::result::Result<Self, Self::Error> {
196        if value.len() < MSG_READ_SIZE {
197            return Err(einval!(format!(
198                "fscache: request message size is too small, {}",
199                value.len()
200            )));
201        }
202
203        // Safe because we have verified buffer size.
204        let off = unsafe { read_unaligned(value[0..8].as_ptr() as *const u64) };
205        let len = unsafe { read_unaligned(value[8..16].as_ptr() as *const u64) };
206
207        Ok(FsCacheMsgRead { off, len })
208    }
209}
210
/// Pair of files backing a bootstrap (meta blob) object opened by the fscache driver.
struct FsCacheBootstrap {
    /// Bootstrap file opened read-only from the path given by the meta blob configuration.
    bootstrap_file: File,
    /// Cache file wrapping the fd received in the OPEN request.
    cache_file: File,
}
215
/// State of a data blob object opened by the fscache driver.
struct FsCacheBlobCache {
    /// Blob cache object; `None` until the background initialization has created it.
    cache: Option<Arc<dyn BlobCache>>,
    /// Configuration of the data blob.
    config: Arc<DataBlobConfig>,
    /// Cache file wrapping the fd received in the OPEN request.
    file: Arc<File>,
}

impl FsCacheBlobCache {
    /// Replace the associated blob cache object.
    fn set_blob_cache(&mut self, cache: Option<Arc<dyn BlobCache>>) {
        self.cache = cache;
    }

    /// Get a new reference to the associated blob cache object, if any.
    fn get_blob_cache(&self) -> Option<Arc<dyn BlobCache>> {
        self.cache.clone()
    }
}
231
/// Cached file object tracked for an object ID received from the fscache driver.
#[derive(Clone)]
enum FsCacheObject {
    /// A bootstrap (meta blob) object.
    Bootstrap(Arc<FsCacheBootstrap>),
    /// A data blob object, whose blob cache is initialized by a background thread.
    DataBlob(Arc<RwLock<FsCacheBlobCache>>),
}
237
/// Struct to maintain cached file objects.
#[derive(Default)]
struct FsCacheState {
    /// Maps an object ID to the cached object and the fd received in its OPEN request.
    id_to_object_map: HashMap<u32, (FsCacheObject, u32)>,
    /// Maps the object ID of an opened data blob to its configuration.
    id_to_config_map: HashMap<u32, Arc<DataBlobConfig>>,
    /// Blob cache manager passed in when the handler was created.
    blob_cache_mgr: Arc<BlobCacheMgr>,
}
245
/// Handler to cooperate with Linux fscache driver to manage cached blob objects.
///
/// The `FsCacheHandler` creates a communication channel with the Linux fscache driver,
/// configures the communication session and serves all requests from the fscache driver.
pub struct FsCacheHandler {
    /// Set to `true` on creation and cleared by `stop()` to ask the event loop to exit.
    active: AtomicBool,
    /// Barrier created for `threads + 1` participants; waited on by `stop()` and by
    /// `run_loop()` right before it returns.
    barrier: Barrier,
    /// Number of working threads, as passed to `new()`.
    threads: usize,
    /// The cachefiles device file used to receive requests and send replies.
    file: File,
    /// Cached objects and configurations, shared behind a mutex.
    state: Arc<Mutex<FsCacheState>>,
    /// Poller watching the device fd and the waker.
    poller: Mutex<Poll>,
    /// Waker used to interrupt the poller when stopping.
    waker: Arc<Waker>,
    /// The `cache` sub-directory of the directory passed to `new()`.
    cache_dir: PathBuf,
}
260
261impl FsCacheHandler {
262    /// Create a new instance of [FsCacheHandler].
263    pub fn new(
264        path: &str,
265        dir: &str,
266        tag: Option<&str>,
267        blob_cache_mgr: Arc<BlobCacheMgr>,
268        threads: usize,
269        restore_file: Option<&File>,
270    ) -> Result<Self> {
271        info!(
272            "fscache: create FsCacheHandler with dir {}, tag {}",
273            dir,
274            tag.unwrap_or("<None>")
275        );
276
277        let mut file = match restore_file {
278            None => OpenOptions::new()
279                .write(true)
280                .read(true)
281                .create(false)
282                .open(path)
283                .map_err(|e| {
284                    error!("Failed to open cachefiles device {}. {}", path, e);
285                    e
286                })?,
287            Some(f) => f.try_clone()?,
288        };
289
290        let poller =
291            Poll::new().map_err(|_e| eother!("fscache: failed to create poller for service"))?;
292        let waker = Waker::new(poller.registry(), Token(TOKEN_EVENT_WAKER))
293            .map_err(|_e| eother!("fscache: failed to create waker for service"))?;
294        poller
295            .registry()
296            .register(
297                &mut SourceFd(&file.as_raw_fd()),
298                Token(TOKEN_EVENT_FSCACHE),
299                Interest::READABLE,
300            )
301            .map_err(|_e| eother!("fscache: failed to register fd for service"))?;
302
303        if restore_file.is_none() {
304            // Initialize the fscache session
305            file.write_all(format!("dir {}", dir).as_bytes())?;
306            file.flush()?;
307            if let Some(tag) = tag {
308                file.write_all(format!("tag {}", tag).as_bytes())?;
309                file.flush()?;
310            }
311            file.write_all(b"bind ondemand")?;
312            file.flush()?;
313        } else {
314            // send restore cmd, if we are in restore process
315            file.write_all(b"restore")?;
316            file.flush()?;
317        }
318
319        let state = FsCacheState {
320            id_to_object_map: Default::default(),
321            id_to_config_map: Default::default(),
322            blob_cache_mgr,
323        };
324        let cache_dir = PathBuf::new().join(dir).join("cache");
325
326        Ok(FsCacheHandler {
327            active: AtomicBool::new(true),
328            barrier: Barrier::new(threads + 1),
329            threads,
330            file,
331            state: Arc::new(Mutex::new(state)),
332            poller: Mutex::new(poller),
333            waker: Arc::new(waker),
334            cache_dir,
335        })
336    }
337
    /// Get number of working threads to service fscache requests.
    ///
    /// This is the `threads` value passed to `new()`.
    pub fn working_threads(&self) -> usize {
        self.threads
    }
342
343    /// Stop worker threads for the fscache service.
344    pub fn stop(&self) {
345        self.active.store(false, Ordering::Release);
346        if let Err(e) = self.waker.wake() {
347            error!("fscache: failed to signal worker thread to exit, {}", e);
348        }
349        self.barrier.wait();
350    }
351
352    /// Run the event loop to handle all requests from kernel fscache driver.
353    ///
354    /// This method should only be invoked by a single thread, which will poll the fscache fd
355    /// and dispatch requests from fscache fd to other working threads.
356    pub fn run_loop(&self) -> Result<()> {
357        let mut events = Events::with_capacity(64);
358        let mut buf = vec![0u8; MIN_DATA_BUF_SIZE];
359
360        loop {
361            match self.poller.lock().unwrap().poll(&mut events, None) {
362                Ok(_) => {}
363                Err(e) if e.kind() == ErrorKind::Interrupted => continue,
364                Err(e) => {
365                    warn!("fscache: failed to poll events");
366                    return Err(e);
367                }
368            }
369
370            for event in events.iter() {
371                if event.is_error() {
372                    error!("fscache: got error event from poller");
373                    continue;
374                }
375                if event.token() == Token(TOKEN_EVENT_FSCACHE) {
376                    if event.is_readable() {
377                        self.handle_requests(&mut buf)?;
378                    }
379                } else if event.is_readable()
380                    && event.token() == Token(TOKEN_EVENT_WAKER)
381                    && !self.active.load(Ordering::Acquire)
382                {
383                    // Notify next worker to exit.
384                    let _ = self.waker.wake();
385                    self.barrier.wait();
386                    return Ok(());
387                }
388            }
389        }
390    }
391
    /// Get a reference to the cachefiles device file held by the handler.
    pub fn get_file(&self) -> &File {
        &self.file
    }
395
396    /// Read and process all requests from fscache driver until no data available.
397    fn handle_requests(&self, buf: &mut [u8]) -> Result<()> {
398        loop {
399            let ret = unsafe {
400                libc::read(
401                    self.file.as_raw_fd(),
402                    buf.as_ptr() as *mut u8 as *mut libc::c_void,
403                    buf.len(),
404                )
405            };
406            match ret {
407                // A special behavior of old cachefile driver which returns zero if there's no
408                // pending requests instead of `ErrorKind::WouldBlock`.
409                0 => return Ok(()),
410                _i if _i > 0 => self.handle_one_request(&buf[0..ret as usize])?,
411                _ => {
412                    let err = Error::last_os_error();
413                    match err.kind() {
414                        ErrorKind::Interrupted => continue,
415                        ErrorKind::WouldBlock => return Ok(()),
416                        _ => return Err(err),
417                    }
418                }
419            }
420        }
421    }
422
423    fn handle_one_request(&self, buf: &[u8]) -> Result<()> {
424        let hdr = FsCacheMsgHeader::try_from(buf)?;
425        let buf = &buf[MSG_HEADER_SIZE..];
426
427        match hdr.opcode {
428            FsCacheOpCode::Open => {
429                let msg = FsCacheMsgOpen::try_from(buf)?;
430                self.handle_open_request(&hdr, &msg);
431            }
432            FsCacheOpCode::Close => {
433                self.handle_close_request(&hdr);
434            }
435            FsCacheOpCode::Read => {
436                let msg = FsCacheMsgRead::try_from(buf)?;
437                self.handle_read_request(&hdr, &msg);
438            }
439        }
440
441        Ok(())
442    }
443
444    fn handle_open_request(&self, hdr: &FsCacheMsgHeader, msg: &FsCacheMsgOpen) {
445        // Drop the 'erofs,' prefix if any
446        let domain_id = msg
447            .volume_key
448            .strip_prefix("erofs,")
449            .unwrap_or(msg.volume_key.as_str());
450
451        let key = generate_blob_key(domain_id, &msg.cookie_key);
452        match self.get_config(&key) {
453            None => {
454                unsafe { libc::close(msg.fd as i32) };
455                self.reply(&format!("copen {},{}", hdr.msg_id, -libc::ENOENT));
456            }
457            Some(cfg) => match cfg {
458                BlobConfig::DataBlob(config) => {
459                    let reply = self.handle_open_data_blob(hdr, msg, config);
460                    self.reply(&reply);
461                }
462                BlobConfig::MetaBlob(config) => {
463                    self.handle_open_bootstrap(hdr, msg, config);
464                }
465            },
466        }
467    }
468
469    fn handle_open_data_blob(
470        &self,
471        hdr: &FsCacheMsgHeader,
472        msg: &FsCacheMsgOpen,
473        config: Arc<DataBlobConfig>,
474    ) -> String {
475        let mut state = self.state.lock().unwrap();
476        if let Vacant(e) = state.id_to_object_map.entry(hdr.object_id) {
477            let fsblob = Arc::new(RwLock::new(FsCacheBlobCache {
478                cache: None,
479                config: config.clone(),
480                file: Arc::new(unsafe { File::from_raw_fd(msg.fd as RawFd) }),
481            }));
482            e.insert((FsCacheObject::DataBlob(fsblob.clone()), msg.fd));
483            state.id_to_config_map.insert(hdr.object_id, config.clone());
484            let blob_size = config.blob_info().deref().uncompressed_size();
485            let barrier = Arc::new(Barrier::new(2));
486            Self::init_blob_cache(fsblob, barrier.clone());
487            // make sure that the blobcache init thread have gotten writer lock before user daemon
488            // receives first request.
489            barrier.wait();
490            format!("copen {},{}", hdr.msg_id, blob_size)
491        } else {
492            unsafe { libc::close(msg.fd as i32) };
493            format!("copen {},{}", hdr.msg_id, -libc::EALREADY)
494        }
495    }
496
497    fn init_blob_cache(fsblob: Arc<RwLock<FsCacheBlobCache>>, barrier: Arc<Barrier>) {
498        thread::spawn(move || {
499            let mut guard = fsblob.write().unwrap();
500            barrier.wait();
501            //for now FsCacheBlobCache only init once, should not have blobcache associated with it
502            assert!(guard.get_blob_cache().is_none());
503            for _ in 0..BLOB_CACHE_INIT_RETRY {
504                match Self::create_data_blob_object(&guard.config, guard.file.clone()) {
505                    Err(e) => {
506                        warn!("fscache: create_data_blob_object failed {}", e);
507                        thread::sleep(time::Duration::from_millis(BLOB_CACHE_INIT_INTERVAL_MS));
508                    }
509                    Ok(blob) => {
510                        guard.set_blob_cache(Some(blob.clone()));
511                        if let Err(e) = Self::do_prefetch(&guard.config, blob.clone()) {
512                            warn!(
513                                "fscache: failed to prefetch data for blob {}, {}",
514                                blob.blob_id(),
515                                e
516                            );
517                        }
518                        break;
519                    }
520                }
521            }
522        });
523    }
524
525    fn do_prefetch(cfg: &DataBlobConfig, blob: Arc<dyn BlobCache>) -> Result<()> {
526        let blob_info = cfg.blob_info().deref();
527        let cache_cfg = cfg.config_v2().get_cache_config()?;
528        if !cache_cfg.prefetch.enable {
529            return Ok(());
530        }
531        blob.start_prefetch()
532            .map_err(|e| eother!(format!("failed to start prefetch worker, {}", e)))?;
533
534        let size = match cache_cfg.prefetch.batch_size.checked_next_power_of_two() {
535            None => nydus_api::default_prefetch_batch_size() as u64,
536            Some(1) => nydus_api::default_prefetch_batch_size() as u64,
537            Some(s) => s as u64,
538        };
539        let size = std::cmp::max(0x4_0000u64, size);
540        let blob_size = blob_info.compressed_data_size();
541        let count = blob_size.div_ceil(size);
542        let mut blob_req = Vec::with_capacity(count as usize);
543        let mut pre_offset = 0u64;
544        for _i in 0..count {
545            blob_req.push(BlobPrefetchRequest {
546                blob_id: blob_info.blob_id().to_owned(),
547                offset: pre_offset,
548                len: cmp::min(size, blob_size - pre_offset),
549            });
550            pre_offset += size;
551            if pre_offset >= blob_size {
552                break;
553            }
554        }
555
556        let id = blob.blob_id();
557        info!("fscache: start to prefetch data for blob {}", id);
558        if let Err(e) = blob.prefetch(blob.clone(), &blob_req, &[]) {
559            warn!("fscache: failed to prefetch data for blob {}, {}", id, e);
560        }
561
562        Ok(())
563    }
564
565    /// The `fscache` factory essentially creates a namespace for blob objects cached by the
566    /// fscache subsystem. The data blob files will be managed the in kernel fscache driver,
567    /// the chunk map file will be managed by the userspace daemon. We need to figure out the
568    /// way to share blob/chunkamp files with filecache manager.
569    fn create_data_blob_object(
570        config: &DataBlobConfig,
571        file: Arc<File>,
572    ) -> Result<Arc<dyn BlobCache>> {
573        let mut blob_info = config.blob_info().deref().clone();
574        blob_info.set_fscache_file(Some(file));
575        let blob_ref = Arc::new(blob_info);
576        BLOB_FACTORY.new_blob_cache(config.config_v2(), &blob_ref)
577    }
578
579    fn fill_bootstrap_cache(bootstrap: Arc<FsCacheBootstrap>) -> Result<u64> {
580        // Safe because bootstrap.bootstrap_file/cache_file are valid.
581        let mut src = unsafe { File::from_raw_fd(bootstrap.bootstrap_file.as_raw_fd()) };
582        let mut dst = unsafe { File::from_raw_fd(bootstrap.cache_file.as_raw_fd()) };
583        let ret = copy(&mut src, &mut dst);
584        std::mem::forget(src);
585        std::mem::forget(dst);
586        ret.map_err(|e| {
587            warn!("failed to copy content from bootstap into cache fd, {}", e);
588            e
589        })
590    }
591
592    fn handle_open_bootstrap(
593        &self,
594        hdr: &FsCacheMsgHeader,
595        msg: &FsCacheMsgOpen,
596        config: Arc<MetaBlobConfig>,
597    ) {
598        let path = config.path().display();
599        let condvar = Arc::new((Mutex::new(false), Condvar::new()));
600        let condvar2 = condvar.clone();
601        let mut state = self.get_state();
602
603        let ret: i64 = if let Vacant(e) = state.id_to_object_map.entry(hdr.object_id) {
604            match OpenOptions::new().read(true).open(config.path()) {
605                Err(e) => {
606                    warn!("fscache: failed to open bootstrap file {}, {}", path, e);
607                    -libc::ENOENT as i64
608                }
609                Ok(f) => match f.metadata() {
610                    Err(e) => {
611                        warn!("fscache: failed to open bootstrap file {}, {}", path, e);
612                        -libc::ENOENT as i64
613                    }
614                    Ok(md) => {
615                        let cache_file = unsafe { File::from_raw_fd(msg.fd as RawFd) };
616                        let bootstrap = Arc::new(FsCacheBootstrap {
617                            bootstrap_file: f,
618                            cache_file,
619                        });
620                        let object = FsCacheObject::Bootstrap(bootstrap.clone());
621                        e.insert((object, msg.fd));
622                        ASYNC_RUNTIME.spawn_blocking(|| async move {
623                            // Ensure copen reply message has been sent to kernel.
624                            {
625                                let (m, c) = condvar.as_ref();
626                                let mut g = m.lock().unwrap();
627                                while !*g {
628                                    g = c.wait(g).unwrap();
629                                }
630                            }
631
632                            for _i in 0..3 {
633                                if Self::fill_bootstrap_cache(bootstrap.clone()).is_ok() {
634                                    break;
635                                }
636                                tokio::time::sleep(time::Duration::from_secs(2)).await;
637                            }
638                        });
639                        md.len() as i64
640                    }
641                },
642            }
643        } else {
644            -libc::EALREADY as i64
645        };
646
647        if ret < 0 {
648            unsafe { libc::close(msg.fd as i32) };
649        }
650        self.reply(&format!("copen {},{}", hdr.msg_id, ret));
651        if ret >= 0 {
652            let (m, c) = condvar2.as_ref();
653            *m.lock().unwrap() = true;
654            c.notify_one();
655        }
656    }
657
658    fn handle_close_request(&self, hdr: &FsCacheMsgHeader) {
659        let mut state = self.get_state();
660
661        if let Some((FsCacheObject::DataBlob(fsblob), _)) =
662            state.id_to_object_map.remove(&hdr.object_id)
663        {
664            // Safe to unwrap() because `id_to_config_map` and `id_to_object_map` is kept
665            // in consistence.
666            let config = state.id_to_config_map.remove(&hdr.object_id).unwrap();
667            let factory_config = config.config_v2();
668            let guard = fsblob.read().unwrap();
669            match guard.get_blob_cache() {
670                Some(blob) => {
671                    if let Ok(cache_cfg) = factory_config.get_cache_config() {
672                        if cache_cfg.prefetch.enable {
673                            let _ = blob.stop_prefetch();
674                        }
675                    }
676                    let id = blob.blob_id().to_string();
677                    drop(blob);
678                    BLOB_FACTORY.gc(Some((factory_config, &id)));
679                }
680                _ => warn!("fscache: blob object not ready {}", hdr.object_id),
681            }
682        }
683    }
684
685    fn handle_read_request(&self, hdr: &FsCacheMsgHeader, msg: &FsCacheMsgRead) {
686        let fd: u32;
687
688        match self.get_object(hdr.object_id) {
689            None => {
690                warn!(
691                    "fscache: no cached file object found for obj_id {}",
692                    hdr.object_id
693                );
694                return;
695            }
696            Some((FsCacheObject::DataBlob(fsblob), u)) => {
697                fd = u;
698                let guard = fsblob.read().unwrap();
699                match guard.get_blob_cache() {
700                    Some(blob) => match blob.get_blob_object() {
701                        None => {
702                            warn!("fscache: internal error: cached object is not BlobCache objects")
703                        }
704                        Some(obj) => {
705                            if let Err(e) = obj.fetch_range_uncompressed(msg.off, msg.len) {
706                                error!("fscache: failed to read data from blob object: {}", e,);
707                            }
708                        }
709                    },
710                    _ => {
711                        //TODO: maybe we should retry init blob object here
712                        warn!("fscache: blob object not ready");
713                    }
714                }
715            }
716            Some((FsCacheObject::Bootstrap(bs), u)) => {
717                // TODO: should we feed the bootstrap at together to improve performance?
718                fd = u;
719                let base = unsafe {
720                    libc::mmap(
721                        std::ptr::null_mut(),
722                        msg.len as usize,
723                        libc::PROT_READ,
724                        libc::MAP_SHARED,
725                        bs.bootstrap_file.as_raw_fd(),
726                        msg.off as libc::off_t,
727                    )
728                };
729                if base == libc::MAP_FAILED {
730                    warn!(
731                        "fscache: failed to mmap bootstrap file, {}",
732                        std::io::Error::last_os_error()
733                    );
734                } else {
735                    let ret = unsafe {
736                        libc::pwrite(
737                            bs.cache_file.as_raw_fd(),
738                            base,
739                            msg.len as usize,
740                            msg.off as libc::off_t,
741                        )
742                    };
743                    let _ = unsafe { libc::munmap(base, msg.len as usize) };
744                    if ret < 0 {
745                        warn!(
746                            "fscache: failed to write bootstrap blob data to cached file, {}",
747                            std::io::Error::last_os_error()
748                        );
749                    }
750                }
751            }
752        }
753
754        if let Err(e) = unsafe { fscache_cread(fd as i32, hdr.msg_id as u64) } {
755            warn!("failed to send reply for cread request, {}", e);
756        }
757    }
758
759    /// Reclaim unused facache objects.
760    pub fn cull_cache(&self, blob_id: String) -> Result<()> {
761        let children = fs::read_dir(self.cache_dir.clone())?;
762        let mut res = true;
763        // This is safe, because only api server which is a single thread server will call this func,
764        // and no other func will change cwd.
765        let cwd_old = env::current_dir()?;
766
767        info!("try to cull blob {}", blob_id);
768
769        // calc blob path in all volumes then try to cull them
770        for child in children {
771            let child = child?;
772            let path = child.path();
773            let file_name = match child.file_name().to_str() {
774                Some(n) => n.to_string(),
775                None => {
776                    warn!("failed to get file name of {}", child.path().display());
777                    continue;
778                }
779            };
780            if !path.is_dir() || !file_name.starts_with("Ierofs,") {
781                continue;
782            }
783
784            // get volume_key form volume dir name e.g. Ierofs,SharedDomain
785            let volume_key = &file_name[1..];
786            let (cookie_dir, cookie_name) = self.generate_cookie_path(&path, volume_key, &blob_id);
787            let cookie_path = cookie_dir.join(&cookie_name);
788            if !cookie_path.is_file() {
789                continue;
790            }
791            let cookie_path = cookie_path.display();
792
793            match self.inuse(&cookie_dir, &cookie_name) {
794                Err(e) => {
795                    warn!("blob {} call inuse err {}, cull failed!", cookie_path, e);
796                    res = false;
797                }
798                Ok(true) => {
799                    warn!("blob {} in use, skip!", cookie_path);
800                    res = false;
801                }
802                Ok(false) => {
803                    if let Err(e) = self.cull(&cookie_dir, &cookie_name) {
804                        warn!("blob {} call cull err {}, cull failed!", cookie_path, e);
805                        res = false;
806                    }
807                }
808            }
809        }
810
811        env::set_current_dir(cwd_old)?;
812        if res {
813            Ok(())
814        } else {
815            Err(eother!("failed to cull blob objects from fscache"))
816        }
817    }
818
819    #[inline]
820    fn hash_32(&self, val: u32) -> u32 {
821        val * 0x61C88647
822    }
823
824    #[inline]
825    fn rol32(&self, word: u32, shift: i32) -> u32 {
826        word << (shift & 31) | (word >> ((-shift) & 31))
827    }
828
829    #[inline]
830    fn round_up_u32(&self, size: usize) -> usize {
831        (size + 3) / 4 * 4
832    }
833
834    //address from kernel fscache_hash()
835    fn fscache_hash(&self, salt: u32, data: &[u8]) -> u32 {
836        assert_eq!(data.len() % 4, 0);
837
838        let mut x = 0;
839        let mut y = salt;
840        let mut buf_le32: [u8; 4] = [0; 4];
841        let n = data.len() / 4;
842
843        for i in 0..n {
844            buf_le32.clone_from_slice(&data[i * 4..i * 4 + 4]);
845            let a = u32::from_ne_bytes(buf_le32).to_le();
846            x ^= a;
847            y ^= x;
848            x = self.rol32(x, 7);
849            x += y;
850            y = self.rol32(y, 20);
851            y *= 9;
852        }
853        self.hash_32(y ^ self.hash_32(x))
854    }
855
856    fn generate_cookie_path(
857        &self,
858        volume_path: &Path,
859        volume_key: &str,
860        cookie_key: &str,
861    ) -> (PathBuf, String) {
862        //calc volume hash
863        let mut volume_hash_key: Vec<u8> =
864            Vec::with_capacity(self.round_up_u32(volume_key.len() + 2));
865        volume_hash_key.push(volume_key.len() as u8);
866        volume_hash_key.append(&mut volume_key.as_bytes().to_vec());
867        volume_hash_key.resize(volume_hash_key.capacity(), 0);
868        let volume_hash = self.fscache_hash(0, volume_hash_key.as_slice());
869
870        //calc cookie hash
871        let mut cookie_hash_key: Vec<u8> = Vec::with_capacity(self.round_up_u32(cookie_key.len()));
872        cookie_hash_key.append(&mut cookie_key.as_bytes().to_vec());
873        cookie_hash_key.resize(cookie_hash_key.capacity(), 0);
874        let dir_hash = self.fscache_hash(volume_hash, cookie_hash_key.as_slice());
875
876        let dir = format!("@{:02x}", dir_hash as u8);
877        let cookie = format!("D{}", cookie_key);
878        (volume_path.join(dir), cookie)
879    }
880
881    fn inuse(&self, cookie_dir: &Path, cookie_name: &str) -> Result<bool> {
882        env::set_current_dir(cookie_dir)?;
883        let msg = format!("inuse {}", cookie_name);
884        let ret = unsafe {
885            libc::write(
886                self.file.as_raw_fd(),
887                msg.as_bytes().as_ptr() as *const u8 as *const libc::c_void,
888                msg.len(),
889            )
890        };
891        if ret < 0 {
892            let err = Error::last_os_error();
893            if let Some(e) = err.raw_os_error() {
894                if e == libc::EBUSY {
895                    return Ok(true);
896                }
897            }
898            Err(err)
899        } else {
900            Ok(false)
901        }
902    }
903
904    fn cull(&self, cookie_dir: &Path, cookie_name: &str) -> Result<()> {
905        env::set_current_dir(cookie_dir)?;
906        let msg = format!("cull {}", cookie_name);
907        let ret = unsafe {
908            libc::write(
909                self.file.as_raw_fd(),
910                msg.as_bytes().as_ptr() as *const u8 as *const libc::c_void,
911                msg.len(),
912            )
913        };
914        if ret as usize != msg.len() {
915            Err(Error::last_os_error())
916        } else {
917            Ok(())
918        }
919    }
920
921    #[inline]
922    fn reply(&self, result: &str) {
923        // Safe because the fd and data buffer are valid. And we trust the fscache driver which
924        // will never return error for write operations.
925        let ret = unsafe {
926            libc::write(
927                self.file.as_raw_fd(),
928                result.as_bytes().as_ptr() as *const u8 as *const libc::c_void,
929                result.len(),
930            )
931        };
932        if ret as usize != result.len() {
933            warn!(
934                "fscache: failed to send reply \"{}\", {}",
935                result,
936                std::io::Error::last_os_error()
937            );
938        }
939    }
940
941    #[inline]
942    fn get_state(&self) -> MutexGuard<'_, FsCacheState> {
943        self.state.lock().unwrap()
944    }
945
946    #[inline]
947    fn get_object(&self, object_id: u32) -> Option<(FsCacheObject, u32)> {
948        self.get_state().id_to_object_map.get(&object_id).cloned()
949    }
950
951    #[inline]
952    fn get_config(&self, key: &str) -> Option<BlobConfig> {
953        self.get_state().blob_cache_mgr.get_config(key)
954    }
955}
956
957impl AsRawFd for FsCacheHandler {
958    fn as_raw_fd(&self) -> RawFd {
959        self.file.as_raw_fd()
960    }
961}
962
#[cfg(test)]
mod tests {
    use super::*;

    /// Opcodes 0, 1 and 2 decode to Open, Close and Read; 3 is rejected.
    #[test]
    fn test_op_code() {
        let open = FsCacheOpCode::try_from(0).unwrap();
        assert_eq!(open, FsCacheOpCode::Open);

        let close = FsCacheOpCode::try_from(1).unwrap();
        assert_eq!(close, FsCacheOpCode::Close);

        let read = FsCacheOpCode::try_from(2).unwrap();
        assert_eq!(read, FsCacheOpCode::Read);

        assert!(FsCacheOpCode::try_from(3).is_err());
    }

    /// A well-formed header decodes field by field; malformed buffers are rejected.
    #[test]
    fn test_msg_header() {
        let well_formed: &[u8] = &[1, 0, 0, 0, 2, 0, 0, 0, 17, 0, 0, 0, 2, 0, 0, 0, 0];
        let hdr = FsCacheMsgHeader::try_from(well_formed).unwrap();
        assert_eq!(hdr.msg_id, 0x1);
        assert_eq!(hdr.opcode, FsCacheOpCode::Read);
        assert_eq!(hdr.len, 17);
        assert_eq!(hdr.object_id, 0x2);

        let malformed: [&[u8]; 4] = [
            &[0, 0, 0, 1, 0, 0, 0, 3, 0, 0, 0, 13, 0],
            &[0, 0, 0, 1, 0, 0, 0, 2, 0, 0, 0, 13],
            &[0, 0, 0, 1, 0, 0, 0, 2, 0, 0],
            &[],
        ];
        for raw in malformed.iter() {
            assert!(FsCacheMsgHeader::try_from(*raw).is_err());
        }
    }

    /// Open messages: undersized or oversized-key buffers fail, a valid one decodes.
    #[test]
    fn test_fs_cache_msg_open_try_from() {
        // request message size too small
        let too_small: &[u8] = &[1, 0, 0, 0, 2, 0, 0, 0, 17, 0, 0, 0, 2, 0, 0];
        assert!(FsCacheMsgOpen::try_from(too_small).is_err());

        // volume key size or cookie key size too large
        let oversized_keys: [&[u8]; 2] = [
            &[
                255, 127, 127, 127, 255, 127, 127, 255, 17, 0, 0, 0, 2, 0, 0, 0, 4, 0, 0, 0,
            ],
            &[
                255, 127, 127, 127, 241, 127, 128, 128, 17, 0, 0, 0, 2, 0, 0, 0, 4, 0, 0, 0,
            ],
        ];
        for raw in oversized_keys.iter() {
            assert!(FsCacheMsgOpen::try_from(*raw).is_err());
        }

        // value size too small
        let short_value: &[u8] = &[1, 0, 0, 0, 2, 0, 0, 0, 17, 0, 0, 0, 2, 0, 0, 0, 0];
        assert!(FsCacheMsgOpen::try_from(short_value).is_err());

        let valid: &[u8] = &[
            1, 0, 0, 0, 2, 0, 0, 0, 17, 0, 0, 0, 2, 0, 0, 0, 4, 0, 0, 0,
        ];
        let parsed = FsCacheMsgOpen::try_from(valid);
        assert!(parsed.is_ok());
        let expected = FsCacheMsgOpen {
            volume_key: String::from("\u{4}"),
            cookie_key: String::from("\0\0"),
            fd: 17,
            flags: 2,
        };
        assert_eq!(parsed.unwrap(), expected);
    }

    /// Read messages: a 15-byte buffer fails, a 16-byte buffer decodes offset and length.
    #[test]
    fn test_fs_cache_msg_read_try_from() {
        let too_small: &[u8] = &[1, 0, 0, 0, 2, 0, 0, 0, 17, 0, 0, 0, 2, 0, 0];
        assert!(FsCacheMsgRead::try_from(too_small).is_err());

        let valid: &[u8] = &[1, 0, 0, 0, 2, 0, 0, 0, 17, 0, 0, 0, 2, 0, 0, 0];
        let parsed = FsCacheMsgRead::try_from(valid);
        assert!(parsed.is_ok());
        let expected = FsCacheMsgRead {
            // 8589934593
            off: 0x2_0000_0001,
            // 8589934609
            len: 0x2_0000_0011,
        };
        assert_eq!(parsed.unwrap(), expected);
    }
}