use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::io::Result as IOResult;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use lazy_static::lazy_static;
use nydus_api::{BackendConfigV2, ConfigV2};
use tokio::runtime::{Builder, Runtime};
use tokio::time;
#[cfg(feature = "backend-localfs")]
use crate::backend::localfs;
#[cfg(feature = "backend-oss")]
use crate::backend::oss;
#[cfg(feature = "backend-registry")]
use crate::backend::registry;
#[cfg(feature = "backend-s3")]
use crate::backend::s3;
use crate::backend::BlobBackend;
use crate::cache::{BlobCache, BlobCacheMgr, DummyCacheMgr, FileCacheMgr};
use crate::device::BlobInfo;
lazy_static! {
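    /// Shared tokio runtime for the storage subsystem, used to run asynchronous
    /// tasks such as flushing cached data and checking cache manager state.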
pub static ref ASYNC_RUNTIME: Arc<Runtime> = {
        let runtime = Builder::new_multi_thread()
            .worker_threads(1)
            .thread_keep_alive(Duration::from_secs(10))
            .max_blocking_threads(8)
            .thread_name("cache-flusher")
            .enable_all()
            .build();
match runtime {
Ok(v) => Arc::new(v),
Err(e) => panic!("failed to create tokio async runtime, {}", e),
}
};
}
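
/// Key to index blob cache manager objects by their configuration.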
#[derive(Eq, PartialEq)]
struct BlobCacheMgrKey {
config: Arc<ConfigV2>,
}
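
// `Hash` is implemented manually over a subset of the configuration fields,
// while equality still compares the whole configuration; equal keys therefore
// hash equally, which is the invariant `HashMap` requires.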
#[allow(clippy::derive_hash_xor_eq)]
impl Hash for BlobCacheMgrKey {
fn hash<H: Hasher>(&self, state: &mut H) {
self.config.id.hash(state);
if let Some(backend) = self.config.backend.as_ref() {
backend.backend_type.hash(state);
}
if let Some(cache) = self.config.cache.as_ref() {
cache.cache_type.hash(state);
cache.prefetch.hash(state);
}
}
}
lazy_static::lazy_static! {
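    /// Default, global blob factory instance.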
pub static ref BLOB_FACTORY: BlobFactory = BlobFactory::new();
}
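
/// Factory to create and reuse blob cache objects for blobs.
///
/// Blob cache managers are cached by configuration, so blobs sharing the same
/// configuration also share the same manager.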
pub struct BlobFactory {
    /// Blob cache managers currently in use, indexed by their configuration.
    mgrs: Mutex<HashMap<BlobCacheMgrKey, Arc<dyn BlobCacheMgr>>>,
    /// Whether the background cache state checker has been started.
    mgr_checker_active: AtomicBool,
}
impl BlobFactory {
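    /// Create a new instance of `BlobFactory`.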
pub fn new() -> Self {
BlobFactory {
mgrs: Mutex::new(HashMap::new()),
mgr_checker_active: AtomicBool::new(false),
}
}
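
    /// Spawn a background task which periodically checks the state of all
    /// registered blob cache managers. The task is spawned at most once.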
pub fn start_mgr_checker(&self) {
if self
.mgr_checker_active
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
.is_err()
{
return;
}
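        // Check the state of all registered cache managers every five seconds.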
ASYNC_RUNTIME.spawn(async {
let mut interval = time::interval(Duration::from_secs(5));
loop {
interval.tick().await;
BLOB_FACTORY.check_cache_stat();
}
});
}
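
    /// Create a blob cache object for the blob described by `blob_info`, reusing
    /// an existing blob cache manager when one matches the given configuration.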
pub fn new_blob_cache(
&self,
config: &Arc<ConfigV2>,
blob_info: &Arc<BlobInfo>,
) -> IOResult<Arc<dyn BlobCache>> {
let backend_cfg = config.get_backend_config()?;
let cache_cfg = config.get_cache_config()?;
let key = BlobCacheMgrKey {
config: config.clone(),
};
let mut guard = self.mgrs.lock().unwrap();
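        // Reuse the existing blob cache manager if one was already created with
        // the same configuration.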
if let Some(mgr) = guard.get(&key) {
return mgr.get_blob_cache(blob_info);
}
let backend = Self::new_backend(backend_cfg, &blob_info.blob_id())?;
let mgr = match cache_cfg.cache_type.as_str() {
"blobcache" | "filecache" => {
let mgr = FileCacheMgr::new(cache_cfg, backend, ASYNC_RUNTIME.clone(), &config.id)?;
mgr.init()?;
Arc::new(mgr) as Arc<dyn BlobCacheMgr>
}
#[cfg(target_os = "linux")]
"fscache" => {
let mgr = crate::cache::FsCacheMgr::new(
cache_cfg,
backend,
ASYNC_RUNTIME.clone(),
&config.id,
)?;
mgr.init()?;
Arc::new(mgr) as Arc<dyn BlobCacheMgr>
}
_ => {
let mgr = DummyCacheMgr::new(cache_cfg, backend, false)?;
mgr.init()?;
Arc::new(mgr) as Arc<dyn BlobCacheMgr>
}
};
let mgr = guard.entry(key).or_insert_with(|| mgr);
mgr.get_blob_cache(blob_info)
}
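
    /// Garbage-collect unused blob cache objects, removing blob cache managers
    /// which no longer hold any blob cache.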
pub fn gc(&self, victim: Option<(&Arc<ConfigV2>, &str)>) {
let mut mgrs = Vec::new();
if let Some((config, id)) = victim {
let key = BlobCacheMgrKey {
config: config.clone(),
};
let mgr = self.mgrs.lock().unwrap().get(&key).cloned();
if let Some(mgr) = mgr {
if mgr.gc(Some(id)) {
mgrs.push((key, mgr.clone()));
}
}
} else {
for (key, mgr) in self.mgrs.lock().unwrap().iter() {
if mgr.gc(None) {
mgrs.push((
BlobCacheMgrKey {
config: key.config.clone(),
},
mgr.clone(),
));
}
}
}
for (key, mgr) in mgrs {
let mut guard = self.mgrs.lock().unwrap();
if mgr.gc(None) {
guard.remove(&key);
}
}
}
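
    /// Create a storage backend for the blob with id `blob_id`, according to the
    /// backend type selected by the configuration.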
#[allow(unused_variables)]
pub fn new_backend(
config: &BackendConfigV2,
blob_id: &str,
) -> IOResult<Arc<dyn BlobBackend + Send + Sync>> {
match config.backend_type.as_str() {
#[cfg(feature = "backend-oss")]
"oss" => Ok(Arc::new(oss::Oss::new(
config.get_oss_config()?,
Some(blob_id),
)?)),
#[cfg(feature = "backend-s3")]
"s3" => Ok(Arc::new(s3::S3::new(
config.get_s3_config()?,
Some(blob_id),
)?)),
#[cfg(feature = "backend-registry")]
"registry" => Ok(Arc::new(registry::Registry::new(
config.get_registry_config()?,
Some(blob_id),
)?)),
#[cfg(feature = "backend-localfs")]
"localfs" => Ok(Arc::new(localfs::LocalFs::new(
config.get_localfs_config()?,
Some(blob_id),
)?)),
_ => Err(einval!(format!(
"unsupported backend type '{}'",
config.backend_type
))),
}
}
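
    // Let every registered blob cache manager check and update its state.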
fn check_cache_stat(&self) {
let mgrs = self.mgrs.lock().unwrap();
for (_key, mgr) in mgrs.iter() {
mgr.check_stat();
}
}
}
impl Default for BlobFactory {
fn default() -> Self {
Self::new()
}
}