1use std::collections::hash_map::Entry::Vacant;
16use std::collections::HashMap;
17use std::convert::TryFrom;
18use std::fs::{self, File, OpenOptions};
19use std::io::{copy, Error, ErrorKind, Result, Write};
20use std::ops::Deref;
21use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
22use std::path::{Path, PathBuf};
23use std::ptr::read_unaligned;
24use std::string::String;
25use std::sync::atomic::{AtomicBool, Ordering};
26use std::sync::{Arc, Barrier, Condvar, Mutex, MutexGuard, RwLock};
27use std::{cmp, env, thread, time};
28
29use mio::unix::SourceFd;
30use mio::{Events, Interest, Poll, Token, Waker};
31use nydus_storage::cache::BlobCache;
32use nydus_storage::device::BlobPrefetchRequest;
33use nydus_storage::factory::{ASYNC_RUNTIME, BLOB_FACTORY};
34
35use crate::blob_cache::{
36 generate_blob_key, BlobCacheMgr, BlobConfig, DataBlobConfig, MetaBlobConfig,
37};
38
// Generates the unsafe wrapper `fscache_cread(fd, data)` for an integer-argument ioctl
// with type `0x98` and sequence number `1`. `handle_read_request` issues it on the
// object's fd, passing the message id, to complete a READ request.
nix::ioctl_write_int!(fscache_cread, 0x98, 1);
40
/// Size in bytes of the buffer `run_loop` allocates for reading requests from the device.
const MIN_DATA_BUF_SIZE: usize = 1024;
/// Size in bytes of the fixed request header (four `u32` fields).
const MSG_HEADER_SIZE: usize = 16;
/// Size in bytes of the fixed part of an OPEN payload (four `u32` fields); the keys follow it.
const MSG_OPEN_SIZE: usize = 16;
/// Size in bytes of a READ payload (two `u64` fields).
const MSG_READ_SIZE: usize = 16;

/// Poll token used for the waker that interrupts the event loop.
const TOKEN_EVENT_WAKER: usize = 1;
/// Poll token used for the cachefiles device fd.
const TOKEN_EVENT_FSCACHE: usize = 2;

/// Maximum number of attempts to create the blob cache object of an opened data blob.
const BLOB_CACHE_INIT_RETRY: u8 = 5;
/// Delay in milliseconds applied after each failed blob cache creation attempt.
const BLOB_CACHE_INIT_INTERVAL_MS: u64 = 300;
52
/// Operation code carried in the header of every request read from the device.
///
/// The discriminants are the raw `u32` values accepted by `TryFrom<u32>`.
#[repr(u32)]
#[derive(Debug, Eq, PartialEq)]
enum FsCacheOpCode {
    /// Open an object; the payload is an `FsCacheMsgOpen`.
    Open = 0,
    /// Close a previously opened object; no payload is parsed.
    Close = 1,
    /// Read a range of an object; the payload is an `FsCacheMsgRead`.
    Read = 2,
}
61
62impl TryFrom<u32> for FsCacheOpCode {
63 type Error = Error;
64
65 fn try_from(value: u32) -> std::result::Result<Self, Self::Error> {
66 match value {
67 0 => Ok(FsCacheOpCode::Open),
68 1 => Ok(FsCacheOpCode::Close),
69 2 => Ok(FsCacheOpCode::Read),
70 _ => Err(einval!(format!(
71 "fscache: invalid operation code {}",
72 value
73 ))),
74 }
75 }
76}
77
/// Fixed header found at the start of every request message.
///
/// On the wire the four fields are consecutive native-endian `u32` values in the order
/// `msg_id`, `opcode`, `len`, `object_id` (see the `TryFrom<&[u8]>` implementation).
#[repr(C)]
#[derive(Debug, Eq, PartialEq)]
struct FsCacheMsgHeader {
    /// Identifier of the request; echoed back in `copen` replies and the cread ioctl.
    msg_id: u32,
    /// Requested operation.
    opcode: FsCacheOpCode,
    /// Total message length in bytes; must equal the length of the buffer being parsed.
    len: u32,
    /// Identifier of the object the request refers to; key of `id_to_object_map`.
    object_id: u32,
}
91
92impl TryFrom<&[u8]> for FsCacheMsgHeader {
93 type Error = Error;
94
95 fn try_from(value: &[u8]) -> std::result::Result<Self, Self::Error> {
96 if value.len() < MSG_HEADER_SIZE {
97 return Err(einval!(format!(
98 "fscache: request message size is too small, {}",
99 value.len()
100 )));
101 }
102
103 let msg_id = unsafe { read_unaligned(value[0..4].as_ptr() as *const u32) };
105 let opcode = unsafe { read_unaligned(value[4..8].as_ptr() as *const u32) };
106 let len = unsafe { read_unaligned(value[8..12].as_ptr() as *const u32) };
107 let opcode = FsCacheOpCode::try_from(opcode)?;
108 let object_id = unsafe { read_unaligned(value[12..16].as_ptr() as *const u32) };
109 if len as usize != value.len() {
110 return Err(einval!(format!(
111 "fscache: message length {} does not match length from message header {}",
112 value.len(),
113 len
114 )));
115 }
116
117 Ok(FsCacheMsgHeader {
118 msg_id,
119 opcode,
120 len,
121 object_id,
122 })
123 }
124}
125
/// Decoded payload of an OPEN request.
///
/// On the wire the payload starts with four native-endian `u32` values (volume key size,
/// cookie key size, fd, flags) followed by the volume key bytes and the cookie key bytes.
#[derive(Default, Debug, Eq, PartialEq)]
struct FsCacheMsgOpen {
    /// Volume key with trailing NUL characters removed.
    volume_key: String,
    /// Cookie key, kept exactly as received.
    cookie_key: String,
    /// File descriptor delivered with the request; used as the object's cache file fd.
    fd: u32,
    /// Flags field of the request; parsed but not otherwise used in this file.
    flags: u32,
}
137
138impl TryFrom<&[u8]> for FsCacheMsgOpen {
139 type Error = Error;
140
141 fn try_from(value: &[u8]) -> std::result::Result<Self, Self::Error> {
142 if value.len() < MSG_OPEN_SIZE {
143 return Err(einval!(format!(
144 "fscache: request message size is too small, {}",
145 value.len()
146 )));
147 }
148
149 let volume_key_size = unsafe { read_unaligned(value[0..4].as_ptr() as *const u32) };
151 let cookie_key_size = unsafe { read_unaligned(value[4..8].as_ptr() as *const u32) };
152 let fd = unsafe { read_unaligned(value[8..12].as_ptr() as *const u32) };
153 let flags = unsafe { read_unaligned(value[12..16].as_ptr() as *const u32) };
154 if volume_key_size.checked_add(cookie_key_size).is_none()
155 || (volume_key_size + cookie_key_size)
156 .checked_add(MSG_OPEN_SIZE as u32)
157 .is_none()
158 {
159 return Err(einval!(
160 "fscache: invalid volume/cookie key length in OPEN request"
161 ));
162 }
163 let total_sz = (volume_key_size + cookie_key_size) as usize + MSG_OPEN_SIZE;
164 if value.len() < total_sz {
165 return Err(einval!("fscache: invalid message length for OPEN request"));
166 }
167 let pos = MSG_OPEN_SIZE + volume_key_size as usize;
168 let volume_key = String::from_utf8(value[MSG_OPEN_SIZE..pos].to_vec())
169 .map_err(|_e| einval!("fscache: invalid volume key in OPEN request"))?
170 .trim_end_matches('\0')
171 .to_string();
172 let cookie_key = String::from_utf8(value[pos..pos + cookie_key_size as usize].to_vec())
173 .map_err(|_e| einval!("fscache: invalid cookie key in OPEN request"))?;
174
175 Ok(FsCacheMsgOpen {
176 volume_key,
177 cookie_key,
178 fd,
179 flags,
180 })
181 }
182}
183
/// Decoded payload of a READ request: two consecutive native-endian `u64` values.
#[repr(C)]
#[derive(Default, Debug, Eq, PartialEq)]
struct FsCacheMsgRead {
    /// Offset of the requested range.
    off: u64,
    /// Length of the requested range.
    len: u64,
}
191
192impl TryFrom<&[u8]> for FsCacheMsgRead {
193 type Error = Error;
194
195 fn try_from(value: &[u8]) -> std::result::Result<Self, Self::Error> {
196 if value.len() < MSG_READ_SIZE {
197 return Err(einval!(format!(
198 "fscache: request message size is too small, {}",
199 value.len()
200 )));
201 }
202
203 let off = unsafe { read_unaligned(value[0..8].as_ptr() as *const u64) };
205 let len = unsafe { read_unaligned(value[8..16].as_ptr() as *const u64) };
206
207 Ok(FsCacheMsgRead { off, len })
208 }
209}
210
/// Files backing an opened bootstrap (meta blob) object.
struct FsCacheBootstrap {
    /// Bootstrap file opened read-only from the configured path; source of the data.
    bootstrap_file: File,
    /// File built from the fd delivered with the OPEN request; destination of the data.
    cache_file: File,
}
215
/// State of an opened data blob object.
struct FsCacheBlobCache {
    /// Blob cache object; `None` until the background initialization succeeds.
    cache: Option<Arc<dyn BlobCache>>,
    /// Configuration of the data blob.
    config: Arc<DataBlobConfig>,
    /// File built from the fd delivered with the OPEN request.
    file: Arc<File>,
}
221
222impl FsCacheBlobCache {
223 fn set_blob_cache(&mut self, cache: Option<Arc<dyn BlobCache>>) {
224 self.cache = cache;
225 }
226
227 fn get_blob_cache(&self) -> Option<Arc<dyn BlobCache>> {
228 self.cache.clone()
229 }
230}
231
/// An object opened through the device; cloning only clones the inner `Arc`.
#[derive(Clone)]
enum FsCacheObject {
    /// Bootstrap (meta blob) object.
    Bootstrap(Arc<FsCacheBootstrap>),
    /// Data blob object, guarded by a lock because it is initialized in the background.
    DataBlob(Arc<RwLock<FsCacheBlobCache>>),
}
237
/// Mutable bookkeeping of the handler, protected by the mutex in `FsCacheHandler::state`.
#[derive(Default)]
struct FsCacheState {
    /// Opened objects keyed by object id; the `u32` is the fd received with the OPEN request.
    id_to_object_map: HashMap<u32, (FsCacheObject, u32)>,
    /// Configuration of opened data blobs keyed by object id.
    id_to_config_map: HashMap<u32, Arc<DataBlobConfig>>,
    /// Manager queried for the configuration of a blob when it is opened.
    blob_cache_mgr: Arc<BlobCacheMgr>,
}
245
/// Handler serving requests read from a cachefiles device file.
pub struct FsCacheHandler {
    /// `true` until `stop()` is called; checked by `run_loop` when the waker fires.
    active: AtomicBool,
    /// Barrier sized for `threads + 1` participants: the workers plus the caller of `stop()`.
    barrier: Barrier,
    /// Number of worker threads, as passed to `new()`.
    threads: usize,
    /// The cachefiles device file; requests are read from it and replies written to it.
    file: File,
    /// Opened objects and the blob configuration manager.
    state: Arc<Mutex<FsCacheState>>,
    /// Poller watching the device fd and the waker.
    poller: Mutex<Poll>,
    /// Waker used to interrupt `run_loop`.
    waker: Arc<Waker>,
    /// `<dir>/cache`, the directory scanned by `cull_cache`.
    cache_dir: PathBuf,
}
260
261impl FsCacheHandler {
262 pub fn new(
264 path: &str,
265 dir: &str,
266 tag: Option<&str>,
267 blob_cache_mgr: Arc<BlobCacheMgr>,
268 threads: usize,
269 restore_file: Option<&File>,
270 ) -> Result<Self> {
271 info!(
272 "fscache: create FsCacheHandler with dir {}, tag {}",
273 dir,
274 tag.unwrap_or("<None>")
275 );
276
277 let mut file = match restore_file {
278 None => OpenOptions::new()
279 .write(true)
280 .read(true)
281 .create(false)
282 .open(path)
283 .map_err(|e| {
284 error!("Failed to open cachefiles device {}. {}", path, e);
285 e
286 })?,
287 Some(f) => f.try_clone()?,
288 };
289
290 let poller =
291 Poll::new().map_err(|_e| eother!("fscache: failed to create poller for service"))?;
292 let waker = Waker::new(poller.registry(), Token(TOKEN_EVENT_WAKER))
293 .map_err(|_e| eother!("fscache: failed to create waker for service"))?;
294 poller
295 .registry()
296 .register(
297 &mut SourceFd(&file.as_raw_fd()),
298 Token(TOKEN_EVENT_FSCACHE),
299 Interest::READABLE,
300 )
301 .map_err(|_e| eother!("fscache: failed to register fd for service"))?;
302
303 if restore_file.is_none() {
304 file.write_all(format!("dir {}", dir).as_bytes())?;
306 file.flush()?;
307 if let Some(tag) = tag {
308 file.write_all(format!("tag {}", tag).as_bytes())?;
309 file.flush()?;
310 }
311 file.write_all(b"bind ondemand")?;
312 file.flush()?;
313 } else {
314 file.write_all(b"restore")?;
316 file.flush()?;
317 }
318
319 let state = FsCacheState {
320 id_to_object_map: Default::default(),
321 id_to_config_map: Default::default(),
322 blob_cache_mgr,
323 };
324 let cache_dir = PathBuf::new().join(dir).join("cache");
325
326 Ok(FsCacheHandler {
327 active: AtomicBool::new(true),
328 barrier: Barrier::new(threads + 1),
329 threads,
330 file,
331 state: Arc::new(Mutex::new(state)),
332 poller: Mutex::new(poller),
333 waker: Arc::new(waker),
334 cache_dir,
335 })
336 }
337
/// Returns the number of worker threads this handler was created for.
pub fn working_threads(&self) -> usize {
    self.threads
}
342
343 pub fn stop(&self) {
345 self.active.store(false, Ordering::Release);
346 if let Err(e) = self.waker.wake() {
347 error!("fscache: failed to signal worker thread to exit, {}", e);
348 }
349 self.barrier.wait();
350 }
351
/// Runs the event loop of one worker thread until `stop()` is called.
///
/// # Errors
///
/// Returns the error of a failed poll (other than an interruption) or of
/// `handle_requests`.
pub fn run_loop(&self) -> Result<()> {
    let mut events = Events::with_capacity(64);
    let mut buf = vec![0u8; MIN_DATA_BUF_SIZE];

    loop {
        // The poller mutex is held while waiting, with no timeout.
        match self.poller.lock().unwrap().poll(&mut events, None) {
            Ok(_) => {}
            // Interrupted polls are simply retried.
            Err(e) if e.kind() == ErrorKind::Interrupted => continue,
            Err(e) => {
                warn!("fscache: failed to poll events");
                return Err(e);
            }
        }

        for event in events.iter() {
            if event.is_error() {
                error!("fscache: got error event from poller");
                continue;
            }
            if event.token() == Token(TOKEN_EVENT_FSCACHE) {
                if event.is_readable() {
                    self.handle_requests(&mut buf)?;
                }
            } else if event.is_readable()
                && event.token() == Token(TOKEN_EVENT_WAKER)
                && !self.active.load(Ordering::Acquire)
            {
                // Trigger the waker again before leaving — presumably so the remaining
                // workers are notified too; the result is deliberately ignored.
                let _ = self.waker.wake();
                // Rendezvous with `stop()` and the other workers.
                self.barrier.wait();
                return Ok(());
            }
        }
    }
}
391
/// Returns a reference to the cachefiles device file.
pub fn get_file(&self) -> &File {
    &self.file
}
395
396 fn handle_requests(&self, buf: &mut [u8]) -> Result<()> {
398 loop {
399 let ret = unsafe {
400 libc::read(
401 self.file.as_raw_fd(),
402 buf.as_ptr() as *mut u8 as *mut libc::c_void,
403 buf.len(),
404 )
405 };
406 match ret {
407 0 => return Ok(()),
410 _i if _i > 0 => self.handle_one_request(&buf[0..ret as usize])?,
411 _ => {
412 let err = Error::last_os_error();
413 match err.kind() {
414 ErrorKind::Interrupted => continue,
415 ErrorKind::WouldBlock => return Ok(()),
416 _ => return Err(err),
417 }
418 }
419 }
420 }
421 }
422
423 fn handle_one_request(&self, buf: &[u8]) -> Result<()> {
424 let hdr = FsCacheMsgHeader::try_from(buf)?;
425 let buf = &buf[MSG_HEADER_SIZE..];
426
427 match hdr.opcode {
428 FsCacheOpCode::Open => {
429 let msg = FsCacheMsgOpen::try_from(buf)?;
430 self.handle_open_request(&hdr, &msg);
431 }
432 FsCacheOpCode::Close => {
433 self.handle_close_request(&hdr);
434 }
435 FsCacheOpCode::Read => {
436 let msg = FsCacheMsgRead::try_from(buf)?;
437 self.handle_read_request(&hdr, &msg);
438 }
439 }
440
441 Ok(())
442 }
443
444 fn handle_open_request(&self, hdr: &FsCacheMsgHeader, msg: &FsCacheMsgOpen) {
445 let domain_id = msg
447 .volume_key
448 .strip_prefix("erofs,")
449 .unwrap_or(msg.volume_key.as_str());
450
451 let key = generate_blob_key(domain_id, &msg.cookie_key);
452 match self.get_config(&key) {
453 None => {
454 unsafe { libc::close(msg.fd as i32) };
455 self.reply(&format!("copen {},{}", hdr.msg_id, -libc::ENOENT));
456 }
457 Some(cfg) => match cfg {
458 BlobConfig::DataBlob(config) => {
459 let reply = self.handle_open_data_blob(hdr, msg, config);
460 self.reply(&reply);
461 }
462 BlobConfig::MetaBlob(config) => {
463 self.handle_open_bootstrap(hdr, msg, config);
464 }
465 },
466 }
467 }
468
469 fn handle_open_data_blob(
470 &self,
471 hdr: &FsCacheMsgHeader,
472 msg: &FsCacheMsgOpen,
473 config: Arc<DataBlobConfig>,
474 ) -> String {
475 let mut state = self.state.lock().unwrap();
476 if let Vacant(e) = state.id_to_object_map.entry(hdr.object_id) {
477 let fsblob = Arc::new(RwLock::new(FsCacheBlobCache {
478 cache: None,
479 config: config.clone(),
480 file: Arc::new(unsafe { File::from_raw_fd(msg.fd as RawFd) }),
481 }));
482 e.insert((FsCacheObject::DataBlob(fsblob.clone()), msg.fd));
483 state.id_to_config_map.insert(hdr.object_id, config.clone());
484 let blob_size = config.blob_info().deref().uncompressed_size();
485 let barrier = Arc::new(Barrier::new(2));
486 Self::init_blob_cache(fsblob, barrier.clone());
487 barrier.wait();
490 format!("copen {},{}", hdr.msg_id, blob_size)
491 } else {
492 unsafe { libc::close(msg.fd as i32) };
493 format!("copen {},{}", hdr.msg_id, -libc::EALREADY)
494 }
495 }
496
/// Spawns a thread that creates the blob cache object for `fsblob`.
///
/// The thread takes the write lock first and only then reaches `barrier`, so a caller
/// waiting on the same barrier resumes once the lock is held. Creation is attempted up
/// to `BLOB_CACHE_INIT_RETRY` times, sleeping `BLOB_CACHE_INIT_INTERVAL_MS` after each
/// failure. If every attempt fails the cache is left unset.
fn init_blob_cache(fsblob: Arc<RwLock<FsCacheBlobCache>>, barrier: Arc<Barrier>) {
    thread::spawn(move || {
        let mut guard = fsblob.write().unwrap();
        barrier.wait();
        // The object was just created, so no cache can have been set yet.
        assert!(guard.get_blob_cache().is_none());
        for _ in 0..BLOB_CACHE_INIT_RETRY {
            match Self::create_data_blob_object(&guard.config, guard.file.clone()) {
                Err(e) => {
                    warn!("fscache: create_data_blob_object failed {}", e);
                    thread::sleep(time::Duration::from_millis(BLOB_CACHE_INIT_INTERVAL_MS));
                }
                Ok(blob) => {
                    guard.set_blob_cache(Some(blob.clone()));
                    // A prefetch failure is logged only; the blob cache stays usable.
                    if let Err(e) = Self::do_prefetch(&guard.config, blob.clone()) {
                        warn!(
                            "fscache: failed to prefetch data for blob {}, {}",
                            blob.blob_id(),
                            e
                        );
                    }
                    break;
                }
            }
        }
    });
}
524
525 fn do_prefetch(cfg: &DataBlobConfig, blob: Arc<dyn BlobCache>) -> Result<()> {
526 let blob_info = cfg.blob_info().deref();
527 let cache_cfg = cfg.config_v2().get_cache_config()?;
528 if !cache_cfg.prefetch.enable {
529 return Ok(());
530 }
531 blob.start_prefetch()
532 .map_err(|e| eother!(format!("failed to start prefetch worker, {}", e)))?;
533
534 let size = match cache_cfg.prefetch.batch_size.checked_next_power_of_two() {
535 None => nydus_api::default_prefetch_batch_size() as u64,
536 Some(1) => nydus_api::default_prefetch_batch_size() as u64,
537 Some(s) => s as u64,
538 };
539 let size = std::cmp::max(0x4_0000u64, size);
540 let blob_size = blob_info.compressed_data_size();
541 let count = blob_size.div_ceil(size);
542 let mut blob_req = Vec::with_capacity(count as usize);
543 let mut pre_offset = 0u64;
544 for _i in 0..count {
545 blob_req.push(BlobPrefetchRequest {
546 blob_id: blob_info.blob_id().to_owned(),
547 offset: pre_offset,
548 len: cmp::min(size, blob_size - pre_offset),
549 });
550 pre_offset += size;
551 if pre_offset >= blob_size {
552 break;
553 }
554 }
555
556 let id = blob.blob_id();
557 info!("fscache: start to prefetch data for blob {}", id);
558 if let Err(e) = blob.prefetch(blob.clone(), &blob_req, &[]) {
559 warn!("fscache: failed to prefetch data for blob {}, {}", id, e);
560 }
561
562 Ok(())
563 }
564
565 fn create_data_blob_object(
570 config: &DataBlobConfig,
571 file: Arc<File>,
572 ) -> Result<Arc<dyn BlobCache>> {
573 let mut blob_info = config.blob_info().deref().clone();
574 blob_info.set_fscache_file(Some(file));
575 let blob_ref = Arc::new(blob_info);
576 BLOB_FACTORY.new_blob_cache(config.config_v2(), &blob_ref)
577 }
578
579 fn fill_bootstrap_cache(bootstrap: Arc<FsCacheBootstrap>) -> Result<u64> {
580 let mut src = unsafe { File::from_raw_fd(bootstrap.bootstrap_file.as_raw_fd()) };
582 let mut dst = unsafe { File::from_raw_fd(bootstrap.cache_file.as_raw_fd()) };
583 let ret = copy(&mut src, &mut dst);
584 std::mem::forget(src);
585 std::mem::forget(dst);
586 ret.map_err(|e| {
587 warn!("failed to copy content from bootstap into cache fd, {}", e);
588 e
589 })
590 }
591
592 fn handle_open_bootstrap(
593 &self,
594 hdr: &FsCacheMsgHeader,
595 msg: &FsCacheMsgOpen,
596 config: Arc<MetaBlobConfig>,
597 ) {
598 let path = config.path().display();
599 let condvar = Arc::new((Mutex::new(false), Condvar::new()));
600 let condvar2 = condvar.clone();
601 let mut state = self.get_state();
602
603 let ret: i64 = if let Vacant(e) = state.id_to_object_map.entry(hdr.object_id) {
604 match OpenOptions::new().read(true).open(config.path()) {
605 Err(e) => {
606 warn!("fscache: failed to open bootstrap file {}, {}", path, e);
607 -libc::ENOENT as i64
608 }
609 Ok(f) => match f.metadata() {
610 Err(e) => {
611 warn!("fscache: failed to open bootstrap file {}, {}", path, e);
612 -libc::ENOENT as i64
613 }
614 Ok(md) => {
615 let cache_file = unsafe { File::from_raw_fd(msg.fd as RawFd) };
616 let bootstrap = Arc::new(FsCacheBootstrap {
617 bootstrap_file: f,
618 cache_file,
619 });
620 let object = FsCacheObject::Bootstrap(bootstrap.clone());
621 e.insert((object, msg.fd));
622 ASYNC_RUNTIME.spawn_blocking(|| async move {
623 {
625 let (m, c) = condvar.as_ref();
626 let mut g = m.lock().unwrap();
627 while !*g {
628 g = c.wait(g).unwrap();
629 }
630 }
631
632 for _i in 0..3 {
633 if Self::fill_bootstrap_cache(bootstrap.clone()).is_ok() {
634 break;
635 }
636 tokio::time::sleep(time::Duration::from_secs(2)).await;
637 }
638 });
639 md.len() as i64
640 }
641 },
642 }
643 } else {
644 -libc::EALREADY as i64
645 };
646
647 if ret < 0 {
648 unsafe { libc::close(msg.fd as i32) };
649 }
650 self.reply(&format!("copen {},{}", hdr.msg_id, ret));
651 if ret >= 0 {
652 let (m, c) = condvar2.as_ref();
653 *m.lock().unwrap() = true;
654 c.notify_one();
655 }
656 }
657
658 fn handle_close_request(&self, hdr: &FsCacheMsgHeader) {
659 let mut state = self.get_state();
660
661 if let Some((FsCacheObject::DataBlob(fsblob), _)) =
662 state.id_to_object_map.remove(&hdr.object_id)
663 {
664 let config = state.id_to_config_map.remove(&hdr.object_id).unwrap();
667 let factory_config = config.config_v2();
668 let guard = fsblob.read().unwrap();
669 match guard.get_blob_cache() {
670 Some(blob) => {
671 if let Ok(cache_cfg) = factory_config.get_cache_config() {
672 if cache_cfg.prefetch.enable {
673 let _ = blob.stop_prefetch();
674 }
675 }
676 let id = blob.blob_id().to_string();
677 drop(blob);
678 BLOB_FACTORY.gc(Some((factory_config, &id)));
679 }
680 _ => warn!("fscache: blob object not ready {}", hdr.object_id),
681 }
682 }
683 }
684
685 fn handle_read_request(&self, hdr: &FsCacheMsgHeader, msg: &FsCacheMsgRead) {
686 let fd: u32;
687
688 match self.get_object(hdr.object_id) {
689 None => {
690 warn!(
691 "fscache: no cached file object found for obj_id {}",
692 hdr.object_id
693 );
694 return;
695 }
696 Some((FsCacheObject::DataBlob(fsblob), u)) => {
697 fd = u;
698 let guard = fsblob.read().unwrap();
699 match guard.get_blob_cache() {
700 Some(blob) => match blob.get_blob_object() {
701 None => {
702 warn!("fscache: internal error: cached object is not BlobCache objects")
703 }
704 Some(obj) => {
705 if let Err(e) = obj.fetch_range_uncompressed(msg.off, msg.len) {
706 error!("fscache: failed to read data from blob object: {}", e,);
707 }
708 }
709 },
710 _ => {
711 warn!("fscache: blob object not ready");
713 }
714 }
715 }
716 Some((FsCacheObject::Bootstrap(bs), u)) => {
717 fd = u;
719 let base = unsafe {
720 libc::mmap(
721 std::ptr::null_mut(),
722 msg.len as usize,
723 libc::PROT_READ,
724 libc::MAP_SHARED,
725 bs.bootstrap_file.as_raw_fd(),
726 msg.off as libc::off_t,
727 )
728 };
729 if base == libc::MAP_FAILED {
730 warn!(
731 "fscache: failed to mmap bootstrap file, {}",
732 std::io::Error::last_os_error()
733 );
734 } else {
735 let ret = unsafe {
736 libc::pwrite(
737 bs.cache_file.as_raw_fd(),
738 base,
739 msg.len as usize,
740 msg.off as libc::off_t,
741 )
742 };
743 let _ = unsafe { libc::munmap(base, msg.len as usize) };
744 if ret < 0 {
745 warn!(
746 "fscache: failed to write bootstrap blob data to cached file, {}",
747 std::io::Error::last_os_error()
748 );
749 }
750 }
751 }
752 }
753
754 if let Err(e) = unsafe { fscache_cread(fd as i32, hdr.msg_id as u64) } {
755 warn!("failed to send reply for cread request, {}", e);
756 }
757 }
758
759 pub fn cull_cache(&self, blob_id: String) -> Result<()> {
761 let children = fs::read_dir(self.cache_dir.clone())?;
762 let mut res = true;
763 let cwd_old = env::current_dir()?;
766
767 info!("try to cull blob {}", blob_id);
768
769 for child in children {
771 let child = child?;
772 let path = child.path();
773 let file_name = match child.file_name().to_str() {
774 Some(n) => n.to_string(),
775 None => {
776 warn!("failed to get file name of {}", child.path().display());
777 continue;
778 }
779 };
780 if !path.is_dir() || !file_name.starts_with("Ierofs,") {
781 continue;
782 }
783
784 let volume_key = &file_name[1..];
786 let (cookie_dir, cookie_name) = self.generate_cookie_path(&path, volume_key, &blob_id);
787 let cookie_path = cookie_dir.join(&cookie_name);
788 if !cookie_path.is_file() {
789 continue;
790 }
791 let cookie_path = cookie_path.display();
792
793 match self.inuse(&cookie_dir, &cookie_name) {
794 Err(e) => {
795 warn!("blob {} call inuse err {}, cull failed!", cookie_path, e);
796 res = false;
797 }
798 Ok(true) => {
799 warn!("blob {} in use, skip!", cookie_path);
800 res = false;
801 }
802 Ok(false) => {
803 if let Err(e) = self.cull(&cookie_dir, &cookie_name) {
804 warn!("blob {} call cull err {}, cull failed!", cookie_path, e);
805 res = false;
806 }
807 }
808 }
809 }
810
811 env::set_current_dir(cwd_old)?;
812 if res {
813 Ok(())
814 } else {
815 Err(eother!("failed to cull blob objects from fscache"))
816 }
817 }
818
819 #[inline]
820 fn hash_32(&self, val: u32) -> u32 {
821 val * 0x61C88647
822 }
823
824 #[inline]
825 fn rol32(&self, word: u32, shift: i32) -> u32 {
826 word << (shift & 31) | (word >> ((-shift) & 31))
827 }
828
829 #[inline]
830 fn round_up_u32(&self, size: usize) -> usize {
831 (size + 3) / 4 * 4
832 }
833
834 fn fscache_hash(&self, salt: u32, data: &[u8]) -> u32 {
836 assert_eq!(data.len() % 4, 0);
837
838 let mut x = 0;
839 let mut y = salt;
840 let mut buf_le32: [u8; 4] = [0; 4];
841 let n = data.len() / 4;
842
843 for i in 0..n {
844 buf_le32.clone_from_slice(&data[i * 4..i * 4 + 4]);
845 let a = u32::from_ne_bytes(buf_le32).to_le();
846 x ^= a;
847 y ^= x;
848 x = self.rol32(x, 7);
849 x += y;
850 y = self.rol32(y, 20);
851 y *= 9;
852 }
853 self.hash_32(y ^ self.hash_32(x))
854 }
855
856 fn generate_cookie_path(
857 &self,
858 volume_path: &Path,
859 volume_key: &str,
860 cookie_key: &str,
861 ) -> (PathBuf, String) {
862 let mut volume_hash_key: Vec<u8> =
864 Vec::with_capacity(self.round_up_u32(volume_key.len() + 2));
865 volume_hash_key.push(volume_key.len() as u8);
866 volume_hash_key.append(&mut volume_key.as_bytes().to_vec());
867 volume_hash_key.resize(volume_hash_key.capacity(), 0);
868 let volume_hash = self.fscache_hash(0, volume_hash_key.as_slice());
869
870 let mut cookie_hash_key: Vec<u8> = Vec::with_capacity(self.round_up_u32(cookie_key.len()));
872 cookie_hash_key.append(&mut cookie_key.as_bytes().to_vec());
873 cookie_hash_key.resize(cookie_hash_key.capacity(), 0);
874 let dir_hash = self.fscache_hash(volume_hash, cookie_hash_key.as_slice());
875
876 let dir = format!("@{:02x}", dir_hash as u8);
877 let cookie = format!("D{}", cookie_key);
878 (volume_path.join(dir), cookie)
879 }
880
881 fn inuse(&self, cookie_dir: &Path, cookie_name: &str) -> Result<bool> {
882 env::set_current_dir(cookie_dir)?;
883 let msg = format!("inuse {}", cookie_name);
884 let ret = unsafe {
885 libc::write(
886 self.file.as_raw_fd(),
887 msg.as_bytes().as_ptr() as *const u8 as *const libc::c_void,
888 msg.len(),
889 )
890 };
891 if ret < 0 {
892 let err = Error::last_os_error();
893 if let Some(e) = err.raw_os_error() {
894 if e == libc::EBUSY {
895 return Ok(true);
896 }
897 }
898 Err(err)
899 } else {
900 Ok(false)
901 }
902 }
903
904 fn cull(&self, cookie_dir: &Path, cookie_name: &str) -> Result<()> {
905 env::set_current_dir(cookie_dir)?;
906 let msg = format!("cull {}", cookie_name);
907 let ret = unsafe {
908 libc::write(
909 self.file.as_raw_fd(),
910 msg.as_bytes().as_ptr() as *const u8 as *const libc::c_void,
911 msg.len(),
912 )
913 };
914 if ret as usize != msg.len() {
915 Err(Error::last_os_error())
916 } else {
917 Ok(())
918 }
919 }
920
921 #[inline]
922 fn reply(&self, result: &str) {
923 let ret = unsafe {
926 libc::write(
927 self.file.as_raw_fd(),
928 result.as_bytes().as_ptr() as *const u8 as *const libc::c_void,
929 result.len(),
930 )
931 };
932 if ret as usize != result.len() {
933 warn!(
934 "fscache: failed to send reply \"{}\", {}",
935 result,
936 std::io::Error::last_os_error()
937 );
938 }
939 }
940
/// Locks and returns the handler state.
///
/// Panics when the mutex is poisoned.
#[inline]
fn get_state(&self) -> MutexGuard<'_, FsCacheState> {
    self.state.lock().unwrap()
}
945
/// Returns a clone of the object registered under `object_id` together with the fd
/// recorded for it, or `None` when the id is unknown. The state lock is held only for
/// the duration of the lookup.
#[inline]
fn get_object(&self, object_id: u32) -> Option<(FsCacheObject, u32)> {
    self.get_state().id_to_object_map.get(&object_id).cloned()
}
950
/// Looks up the blob configuration for `key` in the blob cache manager.
#[inline]
fn get_config(&self, key: &str) -> Option<BlobConfig> {
    self.get_state().blob_cache_mgr.get_config(key)
}
955}
956
impl AsRawFd for FsCacheHandler {
    /// Returns the raw fd of the cachefiles device file.
    fn as_raw_fd(&self) -> RawFd {
        self.file.as_raw_fd()
    }
}
962
// Unit tests for the request message parsers. The byte vectors are decoded in native
// byte order; the expected values below correspond to a little-endian host.
#[cfg(test)]
mod tests {
    use super::*;

    // Every valid opcode maps to its variant; anything else is rejected.
    #[test]
    fn test_op_code() {
        assert_eq!(FsCacheOpCode::try_from(0).unwrap(), FsCacheOpCode::Open);
        assert_eq!(FsCacheOpCode::try_from(1).unwrap(), FsCacheOpCode::Close);
        assert_eq!(FsCacheOpCode::try_from(2).unwrap(), FsCacheOpCode::Read);
        FsCacheOpCode::try_from(3).unwrap_err();
    }

    #[test]
    fn test_msg_header() {
        // A 17-byte message whose header records a length of 17.
        let hdr = FsCacheMsgHeader::try_from(
            vec![1u8, 0, 0, 0, 2, 0, 0, 0, 17, 0, 0, 0, 2u8, 0, 0, 0, 0].as_slice(),
        )
        .unwrap();
        assert_eq!(hdr.msg_id, 0x1);
        assert_eq!(hdr.opcode, FsCacheOpCode::Read);
        assert_eq!(hdr.len, 17);
        assert_eq!(hdr.object_id, 0x2);

        // Buffers shorter than the fixed header are rejected.
        FsCacheMsgHeader::try_from(vec![0u8, 0, 0, 1, 0, 0, 0, 3, 0, 0, 0, 13, 0].as_slice())
            .unwrap_err();
        FsCacheMsgHeader::try_from(vec![0u8, 0, 0, 1, 0, 0, 0, 2, 0, 0, 0, 13].as_slice())
            .unwrap_err();
        FsCacheMsgHeader::try_from(vec![0u8, 0, 0, 1, 0, 0, 0, 2, 0, 0].as_slice()).unwrap_err();
        FsCacheMsgHeader::try_from(vec![].as_slice()).unwrap_err();
    }

    #[test]
    fn test_fs_cache_msg_open_try_from() {
        // Payload shorter than the fixed part of an OPEN message.
        assert!(FsCacheMsgOpen::try_from(
            vec![1u8, 0, 0, 0, 2, 0, 0, 0, 17, 0, 0, 0, 2u8, 0, 0].as_slice()
        )
        .is_err());

        // Key sizes whose sum overflows `u32`.
        assert!(FsCacheMsgOpen::try_from(
            vec![255u8, 127, 127, 127, 255, 127, 127, 255, 17, 0, 0, 0, 2u8, 0, 0, 0, 4u8, 0, 0, 0]
                .as_slice()
        )
        .is_err());
        // Key sizes whose sum fits in `u32` but overflows once the fixed part is added.
        assert!(FsCacheMsgOpen::try_from(
            vec![
                255u8, 127, 127, 127, 241u8, 127, 128, 128, 17, 0, 0, 0, 2u8, 0, 0, 0, 4u8, 0, 0,
                0,
            ]
            .as_slice()
        )
        .is_err());

        // Key sizes larger than the bytes actually present.
        assert!(FsCacheMsgOpen::try_from(
            vec![1u8, 0, 0, 0, 2, 0, 0, 0, 17, 0, 0, 0, 2u8, 0, 0, 0, 0].as_slice()
        )
        .is_err());

        // A well-formed payload: a 1-byte volume key followed by a 2-byte cookie key.
        let res = FsCacheMsgOpen::try_from(
            vec![
                1u8, 0, 0, 0, 2, 0, 0, 0, 17, 0, 0, 0, 2u8, 0, 0, 0, 4u8, 0, 0, 0,
            ]
            .as_slice(),
        );
        assert!(res.is_ok());
        assert_eq!(
            res.unwrap(),
            FsCacheMsgOpen {
                volume_key: String::from("\u{4}"),
                cookie_key: String::from("\0\0"),
                fd: 17,
                flags: 2
            }
        );
    }

    #[test]
    fn test_fs_cache_msg_read_try_from() {
        // One byte short of a READ payload.
        assert!(FsCacheMsgRead::try_from(
            vec![1u8, 0, 0, 0, 2, 0, 0, 0, 17, 0, 0, 0, 2u8, 0, 0].as_slice()
        )
        .is_err());

        let res = FsCacheMsgRead::try_from(
            vec![1u8, 0, 0, 0, 2, 0, 0, 0, 17, 0, 0, 0, 2u8, 0, 0, 0].as_slice(),
        );
        assert!(res.is_ok());
        assert_eq!(
            res.unwrap(),
            FsCacheMsgRead {
                off: 8589934593,
                len: 8589934609,
            }
        );
    }
}