use std::collections::HashMap;
use std::io::{Error, ErrorKind, Result};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, Mutex, MutexGuard};
use nydus_api::http::{
BlobCacheEntry, BlobCacheList, BlobCacheObjectId, FsCacheConfig, BLOB_CACHE_TYPE_BOOTSTRAP,
};
use rafs::metadata::{RafsMode, RafsSuper};
use storage::device::BlobInfo;
use storage::factory::{BackendConfig, CacheConfig, FactoryConfig};
/// Separator placed between the domain id and the blob id inside a scoped blob key.
const ID_SPLITTER: &str = "/";

/// Build the key under which a blob is registered.
///
/// Returns `blob_id` unchanged when `domain_id` is empty; otherwise returns
/// `domain_id`, [`ID_SPLITTER`] and `blob_id` concatenated in that order.
pub fn generate_blob_key(domain_id: &str, blob_id: &str) -> String {
    if domain_id.is_empty() {
        return blob_id.to_owned();
    }
    [domain_id, blob_id].join(ID_SPLITTER)
}
/// Configuration of a bootstrap blob registered with the blob cache manager.
///
/// Instances are created by `BlobCacheObjectConfig::new_bootstrap_blob()` and shared via `Arc`.
pub struct BlobCacheConfigBootstrap {
// Blob id as handed to the constructor, without any domain prefix.
blob_id: String,
// Key produced by `generate_blob_key()` from the domain id and `blob_id`.
scoped_blob_id: String,
// Filesystem path of the bootstrap file; exposed through `path()`.
path: PathBuf,
// Factory configuration supplied when the object was created.
factory_config: Arc<FactoryConfig>,
// Data blobs attached through `add_data_blob()`; `BlobCacheState::remove()` releases one
// reference on each of them when the bootstrap is removed.
data_blobs: Mutex<Vec<Arc<BlobCacheConfigDataBlob>>>,
}
impl BlobCacheConfigBootstrap {
    /// Filesystem path of the bootstrap file this configuration was created with.
    pub fn path(&self) -> &Path {
        self.path.as_path()
    }

    /// Append `blob` to the list of data blobs attached to this bootstrap.
    ///
    /// # Panics
    ///
    /// Panics if the `data_blobs` mutex is poisoned.
    fn add_data_blob(&self, blob: Arc<BlobCacheConfigDataBlob>) {
        let mut attached = self.data_blobs.lock().unwrap();
        attached.push(blob);
    }
}
/// Configuration of a data blob registered with the blob cache manager.
///
/// Instances are created by `BlobCacheObjectConfig::new_data_blob()` and shared via `Arc`.
pub struct BlobCacheConfigDataBlob {
// Blob metadata handed to the constructor; exposed through `blob_info()`.
blob_info: Arc<BlobInfo>,
// Key produced by `generate_blob_key()` from the domain id and `blob_info.blob_id()`.
scoped_blob_id: String,
// Factory configuration supplied at creation; exposed through `factory_config()`.
factory_config: Arc<FactoryConfig>,
// Starts at 1; `BlobCacheState::try_add()` increments it when the key is already registered,
// `BlobCacheState::remove()` decrements it and drops the map entry once it reaches zero.
ref_count: AtomicU32,
}
impl BlobCacheConfigDataBlob {
    /// Borrow the shared [`BlobInfo`] handle stored in this configuration.
    pub fn blob_info(&self) -> &Arc<BlobInfo> {
        let Self { blob_info, .. } = self;
        blob_info
    }

    /// Borrow the shared [`FactoryConfig`] handle stored in this configuration.
    pub fn factory_config(&self) -> &Arc<FactoryConfig> {
        let Self { factory_config, .. } = self;
        factory_config
    }
}
/// A cache object configuration stored in the manager's map: either a bootstrap blob or a
/// data blob.
///
/// Both variants wrap an `Arc`, so cloning the enum only clones the handle.
#[derive(Clone)]
pub enum BlobCacheObjectConfig {
// Configuration of a bootstrap blob.
Bootstrap(Arc<BlobCacheConfigBootstrap>),
// Configuration of a data blob.
DataBlob(Arc<BlobCacheConfigDataBlob>),
}
impl BlobCacheObjectConfig {
    /// Create a data blob configuration scoped to `domain_id`.
    ///
    /// The scoped key is derived from `domain_id` and `blob_info.blob_id()`; the reference
    /// count starts at 1.
    fn new_data_blob(
        domain_id: String,
        blob_info: Arc<BlobInfo>,
        factory_config: Arc<FactoryConfig>,
    ) -> Self {
        let key = generate_blob_key(&domain_id, blob_info.blob_id());
        let data_blob = BlobCacheConfigDataBlob {
            blob_info,
            scoped_blob_id: key,
            factory_config,
            ref_count: AtomicU32::new(1),
        };
        Self::DataBlob(Arc::new(data_blob))
    }

    /// Create a bootstrap blob configuration scoped to `domain_id`.
    ///
    /// The scoped key is derived from `domain_id` and `blob_id`; the list of attached data
    /// blobs starts out empty.
    fn new_bootstrap_blob(
        domain_id: String,
        blob_id: String,
        path: PathBuf,
        factory_config: Arc<FactoryConfig>,
    ) -> Self {
        let key = generate_blob_key(&domain_id, &blob_id);
        let bootstrap = BlobCacheConfigBootstrap {
            blob_id,
            scoped_blob_id: key,
            path,
            factory_config,
            data_blobs: Mutex::new(Vec::new()),
        };
        Self::Bootstrap(Arc::new(bootstrap))
    }

    /// Scoped key of the wrapped object, whichever variant it is.
    fn get_key(&self) -> &str {
        let key = match self {
            Self::Bootstrap(bootstrap) => &bootstrap.scoped_blob_id,
            Self::DataBlob(data_blob) => &data_blob.scoped_blob_id,
        };
        key.as_str()
    }

    /// Return a new handle to the bootstrap configuration, or `None` for a data blob.
    fn bootstrap_config(&self) -> Option<Arc<BlobCacheConfigBootstrap>> {
        if let Self::Bootstrap(bootstrap) = self {
            Some(Arc::clone(bootstrap))
        } else {
            None
        }
    }
}
/// Mutable state of the blob cache manager: the registry of cache object configurations.
#[derive(Default)]
struct BlobCacheState {
// Maps a scoped blob key (see `generate_blob_key()`) to the configuration registered for it.
id_to_config_map: HashMap<String, BlobCacheObjectConfig>,
}
impl BlobCacheState {
/// Create a state object with an empty registry.
fn new() -> Self {
    let id_to_config_map = HashMap::new();
    Self { id_to_config_map }
}
/// Register `config` under its scoped key.
///
/// * If the key is unused, `config` is inserted.
/// * If the key already maps to a data blob and `config` is a data blob too, the existing
///   entry's reference count is incremented and `config` itself is discarded.
///
/// # Errors
///
/// Returns `ErrorKind::AlreadyExists` when the key already maps to a bootstrap blob, or when
/// `config` is a bootstrap blob but the key is held by a data blob. The latter case used to
/// bump the data blob's reference count and report success without storing the bootstrap.
fn try_add(&mut self, config: BlobCacheObjectConfig) -> Result<()> {
    let key = config.get_key();
    match self.id_to_config_map.get(key) {
        None => {
            self.id_to_config_map.insert(key.to_owned(), config);
        }
        Some(BlobCacheObjectConfig::Bootstrap(_)) => {
            // Bootstrap blobs are never shared: a second registration is an error.
            return Err(Error::new(
                ErrorKind::AlreadyExists,
                "blob_cache: bootstrap blob already exists",
            ));
        }
        Some(BlobCacheObjectConfig::DataBlob(existing)) => {
            // Only another data blob may share the entry; a bootstrap must not be
            // silently folded into a data blob's reference count.
            if !matches!(&config, BlobCacheObjectConfig::DataBlob(_)) {
                return Err(Error::new(
                    ErrorKind::AlreadyExists,
                    "blob_cache: blob id is already used by a data blob",
                ));
            }
            existing.ref_count.fetch_add(1, Ordering::AcqRel);
        }
    }
    Ok(())
}
/// Remove the cache entries selected by `param`.
///
/// * When `param.blob_id` is empty and `param.domain_id` is not, every entry whose scoped key
///   starts with `domain_id` followed by `ID_SPLITTER` is dropped, regardless of reference
///   counts.
/// * Otherwise the single entry keyed by `generate_blob_key(domain_id, blob_id)` is looked up:
///   a data blob loses one reference and is dropped once its count reaches zero; a bootstrap
///   releases one reference on each data blob attached to it and is then dropped itself.
///
/// # Errors
///
/// In the single-entry case, returns the `enoent!` error when no entry exists for the key.
///
/// # Panics
///
/// Panics if a bootstrap's `data_blobs` mutex is poisoned.
fn remove(&mut self, param: &BlobCacheObjectId) -> Result<()> {
    if param.blob_id.is_empty() && !param.domain_id.is_empty() {
        let scoped_blob_prefix = format!("{}{}", param.domain_id, ID_SPLITTER);
        self.id_to_config_map
            .retain(|_k, v| !v.get_key().starts_with(&scoped_blob_prefix));
        return Ok(());
    }

    let scoped_blob_id = generate_blob_key(&param.domain_id, &param.blob_id);
    let (data_blobs, is_bootstrap) = match self.id_to_config_map.get(&scoped_blob_id) {
        None => return Err(enoent!("blob_cache: cache entry not found")),
        Some(BlobCacheObjectConfig::Bootstrap(o)) => {
            let attached = o.data_blobs.lock().unwrap().clone();
            (attached, true)
        }
        Some(BlobCacheObjectConfig::DataBlob(o)) => (vec![o.clone()], false),
    };

    // `fetch_sub` returns the previous value, so 1 means the last reference was just released.
    for entry in data_blobs {
        if entry.ref_count.fetch_sub(1, Ordering::AcqRel) == 1 {
            self.id_to_config_map.remove(&entry.scoped_blob_id);
        }
    }
    if is_bootstrap {
        self.id_to_config_map.remove(&scoped_blob_id);
    }
    Ok(())
}
/// Look up `key` and return a clone of the registered configuration, if any.
fn get(&self, key: &str) -> Option<BlobCacheObjectConfig> {
    let config = self.id_to_config_map.get(key)?;
    Some(config.clone())
}
}
/// Manager of blob cache object configurations.
///
/// All operations go through the internal mutex, so methods take `&self`.
#[derive(Default)]
pub struct BlobCacheMgr {
// Registry of cache object configurations, guarded by a mutex.
state: Mutex<BlobCacheState>,
}
impl BlobCacheMgr {
pub fn new() -> Self {
BlobCacheMgr {
state: Mutex::new(BlobCacheState::new()),
}
}
/// Register the cache object described by `entry`.
///
/// Only entries whose `blob_type` equals `BLOB_CACHE_TYPE_BOOTSTRAP` are accepted.
///
/// # Errors
///
/// Returns the `einval!` error for any other blob type, and forwards errors from
/// `get_bootstrap_info()` and `add_bootstrap_object()`. A warning is logged for an invalid
/// blob type and when `add_bootstrap_object()` fails.
pub fn add_blob_entry(&self, entry: &BlobCacheEntry) -> Result<()> {
    if entry.blob_type != BLOB_CACHE_TYPE_BOOTSTRAP {
        warn!("blob_cache: invalid blob cache entry: {:?}", entry);
        return Err(einval!("blob_cache: invalid blob cache entry"));
    }

    let (path, factory_config) = self.get_bootstrap_info(entry)?;
    let outcome =
        self.add_bootstrap_object(&entry.domain_id, &entry.blob_id, path, factory_config);
    if outcome.is_err() {
        warn!(
            "blob_cache: failed to add cache entry for bootstrap blob: {:?}",
            entry
        );
    }
    outcome
}
/// Register every entry of `blobs`, in order.
///
/// # Errors
///
/// Stops at the first entry that `add_blob_entry()` rejects and returns its error; entries
/// registered before that point are left in place.
pub fn add_blob_list(&self, blobs: &BlobCacheList) -> Result<()> {
    blobs
        .blobs
        .iter()
        .try_for_each(|entry| self.add_blob_entry(entry))
}
/// Remove the cache object(s) selected by `param`; see `BlobCacheState::remove()`.
pub fn remove_blob_entry(&self, param: &BlobCacheObjectId) -> Result<()> {
    let mut state = self.get_state();
    state.remove(param)
}
/// Return a clone of the configuration registered under the scoped key `key`, if any.
pub fn get_config(&self, key: &str) -> Option<BlobCacheObjectConfig> {
    let state = self.get_state();
    state.get(key)
}
/// Lock the manager state and return the guard.
///
/// # Panics
///
/// Panics if the state mutex is poisoned.
#[inline]
fn get_state(&self) -> MutexGuard<BlobCacheState> {
    let locked = self.state.lock();
    locked.unwrap()
}
/// Validate a bootstrap cache entry and derive what is needed to register it.
///
/// Checks, in order:
/// 1. `config.cache_type` is `"fscache"`;
/// 2. `config.cache_config` deserializes into `FsCacheConfig`;
/// 3. neither `blob_id` nor `domain_id` contains `ID_SPLITTER`;
/// 4. `config.metadata_path` is non-empty, can be canonicalized and is a regular file;
/// 5. `cache_config.work_dir` can be canonicalized and is a directory.
///
/// Returns the canonicalized bootstrap path and a `FactoryConfig` assembled from the entry.
/// The prefetch configuration comes from `entry.fs_prefetch` when it is set, otherwise from
/// `entry.blob_config.prefetch_config`.
///
/// # Errors
///
/// Returns the `einval!` error for every failed check, except a `work_dir` that cannot be
/// canonicalized, which yields the `eio!` error.
fn get_bootstrap_info(&self, entry: &BlobCacheEntry) -> Result<(PathBuf, Arc<FactoryConfig>)> {
    let config = &entry.blob_config;
    if config.cache_type != "fscache" {
        return Err(einval!(
            "blob_cache: `config.cache_type` for bootstrap blob is invalid"
        ));
    }
    let cache_config = serde_json::from_value::<FsCacheConfig>(config.cache_config.clone())
        .map_err(|_e| {
            einval!("blob_cache: `config.cache_config` for bootstrap blob is invalid")
        })?;

    // Ids are joined with `ID_SPLITTER` to form scoped keys, so they must not contain it.
    if entry.blob_id.contains(ID_SPLITTER) {
        return Err(einval!(
            "blob_cache: `blob_id` for bootstrap blob is invalid"
        ));
    }
    if entry.domain_id.contains(ID_SPLITTER) {
        return Err(einval!(
            "blob_cache: `domain_id` for bootstrap blob is invalid"
        ));
    }

    // A missing path is treated like an empty one; borrow instead of cloning the string.
    let path = config.metadata_path.as_deref().unwrap_or_default();
    if path.is_empty() {
        return Err(einval!(
            "blob_cache: `config.metadata_path` for bootstrap blob is empty"
        ));
    }
    let path = Path::new(path).canonicalize().map_err(|_e| {
        einval!("blob_cache: `config.metadata_path` for bootstrap blob is invalid")
    })?;
    if !path.is_file() {
        return Err(einval!(
            "blob_cache: `config.metadata_path` for bootstrap blob is not a file"
        ));
    }

    let work_dir = Path::new(&cache_config.work_dir)
        .canonicalize()
        .map_err(|_e| eio!("blob_cache: `config.cache_config.work_dir` is invalid"))?;
    if !work_dir.is_dir() {
        return Err(einval!(
            "blob_cache: `config.cache_config.work_dir` is not a directory"
        ));
    }

    // The per-entry prefetch setting, when present, overrides the one in the blob config.
    let prefetch_config = entry
        .fs_prefetch
        .clone()
        .unwrap_or_else(|| config.prefetch_config.clone());

    let factory_config = Arc::new(FactoryConfig {
        id: config.id.clone(),
        backend: BackendConfig {
            backend_type: config.backend_type.clone(),
            backend_config: config.backend_config.clone(),
        },
        cache: CacheConfig {
            cache_type: config.cache_type.clone(),
            cache_compressed: false,
            cache_config: config.cache_config.clone(),
            cache_validate: false,
            prefetch_config,
        },
    });
    Ok((path, factory_config))
}
/// Register a bootstrap blob together with every data blob it references.
///
/// The bootstrap at `path` is loaded with `RafsSuper::load_from_metadata()` before the state
/// lock is taken. The bootstrap configuration is then registered, followed by one data blob
/// configuration per entry of `rs.superblock.get_blob_infos()`, each attached to the
/// bootstrap.
///
/// # Errors
///
/// Forwards the error from loading the bootstrap or from `BlobCacheState::try_add()`. If a
/// data blob cannot be registered, the bootstrap is removed again (best effort) before the
/// error is returned.
///
/// # Panics
///
/// Panics if the state mutex or the bootstrap's `data_blobs` mutex is poisoned, or if a data
/// blob that was just registered cannot be read back from the map.
fn add_bootstrap_object(
    &self,
    domain_id: &str,
    id: &str,
    path: PathBuf,
    factory_config: Arc<FactoryConfig>,
) -> Result<()> {
    let rs = RafsSuper::load_from_metadata(&path, RafsMode::Direct, true)?;
    let bootstrap = BlobCacheObjectConfig::new_bootstrap_blob(
        domain_id.to_string(),
        id.to_string(),
        path,
        factory_config.clone(),
    );

    let mut state = self.get_state();
    state.try_add(bootstrap.clone())?;
    let bs_obj = bootstrap.bootstrap_config().unwrap();

    for bi in rs.superblock.get_blob_infos() {
        debug!(
            "blob_cache: add data blob {} to domain {}",
            &bi.blob_id(),
            domain_id
        );
        let data_blob = BlobCacheObjectConfig::new_data_blob(
            domain_id.to_string(),
            bi,
            factory_config.clone(),
        );
        let key = data_blob.get_key().to_owned();

        if let Err(e) = state.try_add(data_blob) {
            // Roll back: drop the bootstrap and release the data blobs attached so far.
            let id = BlobCacheObjectId {
                domain_id: domain_id.to_string(),
                blob_id: id.to_string(),
            };
            let _ = state.remove(&id);
            return Err(e);
        }

        // When the key was already registered, `try_add` keeps the existing object and only
        // bumps its reference count. Attach the object that actually lives in the map, so that
        // removing this bootstrap decrements the shared counter instead of a private one
        // (which would evict data blobs still used by other bootstraps).
        match state.get(&key) {
            Some(BlobCacheObjectConfig::DataBlob(shared)) => bs_obj.add_data_blob(shared),
            _ => panic!("blob_cache: internal error"),
        }
    }
    Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use nydus_api::http::BlobCacheEntryConfig;
use vmm_sys_util::tempdir::TempDir;
/// JSON text of a single bootstrap cache entry used by the tests.
///
/// Every path in it is rooted at `/tmp/nydus`, which tests replace with a temporary directory.
fn create_factory_config() -> String {
    String::from(
        r#"
{
"type": "bootstrap",
"id": "bootstrap1",
"domain_id": "userid1",
"config": {
"id": "factory1",
"backend_type": "localfs",
"backend_config": {
"dir": "/tmp/nydus"
},
"cache_type": "fscache",
"cache_config": {
"work_dir": "/tmp/nydus"
},
"metadata_path": "/tmp/nydus/bootstrap1"
}
}"#,
    )
}
/// A single entry parses from JSON and `get_bootstrap_info()` accepts it.
#[test]
fn test_blob_cache_entry() {
    let tmpdir = TempDir::new().unwrap();
    let bootstrap_path = tmpdir.as_path().join("bootstrap1");
    std::fs::write(&bootstrap_path, "metadata").unwrap();

    // Point every path of the template at the temporary directory.
    let tmp_root = tmpdir.as_path().to_str().unwrap();
    let content = create_factory_config().replace("/tmp/nydus", tmp_root);
    let entry: BlobCacheEntry = serde_json::from_str(&content).unwrap();
    assert_eq!(entry.blob_type, "bootstrap");
    assert_eq!(entry.blob_id, "bootstrap1");
    assert_eq!(entry.domain_id, "userid1");

    let blob_config = &entry.blob_config;
    assert_eq!(blob_config.id, "factory1");
    assert_eq!(blob_config.backend_type, "localfs");
    assert_eq!(blob_config.cache_type, "fscache");
    assert!(blob_config.metadata_path.is_some());
    assert!(blob_config.backend_config.is_object());
    assert!(blob_config.cache_config.is_object());

    let mgr = BlobCacheMgr::new();
    let (path, factory_config) = mgr.get_bootstrap_info(&entry).unwrap();
    assert_eq!(path, bootstrap_path);
    assert_eq!(factory_config.id, "factory1");
    assert_eq!(factory_config.backend.backend_type, "localfs");
    assert_eq!(factory_config.cache.cache_type, "fscache");

    let blob = BlobCacheConfigBootstrap {
        blob_id: "123456789-123".to_string(),
        scoped_blob_id: "domain1".to_string(),
        path: path.clone(),
        factory_config,
        data_blobs: Mutex::new(Vec::new()),
    };
    assert_eq!(blob.path(), path.as_path());
}
/// A blob id containing the key separator is rejected by `get_bootstrap_info()`.
#[test]
fn test_invalid_blob_id() {
    let tmpdir = TempDir::new().unwrap();
    let bootstrap_path = tmpdir.as_path().join("bootstrap1");
    std::fs::write(&bootstrap_path, "metadata").unwrap();

    let tmp_root = tmpdir.as_path().to_str().unwrap();
    let content = create_factory_config().replace("/tmp/nydus", tmp_root);
    let mut entry: BlobCacheEntry = serde_json::from_str(&content).unwrap();
    entry.blob_id = "domain2/blob1".to_string();

    let mgr = BlobCacheMgr::new();
    let _err = mgr.get_bootstrap_info(&entry).unwrap_err();
}
/// A list of two bootstrap entries parses from JSON with the expected field values.
#[test]
fn test_blob_cache_list() {
    let list: BlobCacheList = serde_json::from_str(
        r#"
{
"blobs" : [
{
"type": "bootstrap",
"id": "bootstrap1",
"domain_id": "userid1",
"config": {
"id": "factory1",
"backend_type": "localfs",
"backend_config": {
"dir": "/tmp/nydus"
},
"cache_type": "fscache",
"cache_config": {
"work_dir": "/tmp/nydus"
},
"metadata_path": "/tmp/nydus/bootstrap1"
}
},
{
"type": "bootstrap",
"id": "bootstrap2",
"domain_id": "userid2",
"config": {
"id": "factory1",
"backend_type": "localfs",
"backend_config": {
"dir": "/tmp/nydus"
},
"cache_type": "fscache",
"cache_config": {
"work_dir": "/tmp/nydus"
},
"metadata_path": "/tmp/nydus/bootstrap2"
}
}
]
}"#,
    )
    .unwrap();
    assert_eq!(list.blobs.len(), 2);

    let first = &list.blobs[0];
    assert_eq!(first.blob_type, "bootstrap");
    assert_eq!(first.blob_id, "bootstrap1");
    assert_eq!(first.blob_config.id, "factory1");
    assert_eq!(first.blob_config.backend_type, "localfs");
    assert_eq!(first.blob_config.cache_type, "fscache");

    let second = &list.blobs[1];
    assert_eq!(second.blob_type, "bootstrap");
    assert_eq!(second.blob_id, "bootstrap2");
}
// End-to-end check of adding and removing bootstrap blobs that share data blobs.
//
// Uses the `tests/texture/bootstrap/image_v2.boot` fixture under `$CARGO_MANIFEST_DIR`; the
// entry counts asserted below depend on the contents of that fixture.
#[test]
fn test_add_bootstrap() {
let tmpdir = TempDir::new().unwrap();
let root_dir = &std::env::var("CARGO_MANIFEST_DIR").expect("$CARGO_MANIFEST_DIR");
let mut source_path = PathBuf::from(root_dir);
source_path.push("tests/texture/bootstrap/image_v2.boot");
let path = source_path.to_str().unwrap();
// Parse the template only to borrow its backend/cache config with the temp dir substituted.
let config = create_factory_config();
let content = config.replace("/tmp/nydus", tmpdir.as_path().to_str().unwrap());
let entry: BlobCacheEntry = serde_json::from_str(&content).unwrap();
let blob_config = BlobCacheEntryConfig {
id: "factory1".to_string(),
backend_type: "localfs".to_string(),
backend_config: entry.blob_config.backend_config,
cache_type: "fscache".to_string(),
cache_config: entry.blob_config.cache_config,
prefetch_config: Default::default(),
metadata_path: Some(path.to_string()),
};
let mut entry = BlobCacheEntry {
blob_type: BLOB_CACHE_TYPE_BOOTSTRAP.to_string(),
blob_id: "image_v2".to_string(),
blob_config,
domain_id: "domain2".to_string(),
fs_prefetch: Default::default(),
};
let mgr = BlobCacheMgr::new();
// First registration: the bootstrap and its data blobs all become map entries.
mgr.add_blob_entry(&entry).unwrap();
let blob_id = generate_blob_key(&entry.domain_id, &entry.blob_id);
assert!(mgr.get_config(&blob_id).is_some());
let key = generate_blob_key(
&entry.domain_id,
"7fe907a0c9c7f35538f23f40baae5f2e8d148a3a6186f0f443f62d04b5e2d731",
);
assert!(mgr.get_config(&key).is_some());
assert_eq!(mgr.get_state().id_to_config_map.len(), 19);
// Same bootstrap file under a second id: only the new bootstrap key is added, because the
// data blob keys already exist and are reference counted.
entry.blob_id = "image_v2_cloned".to_string();
let blob_id_cloned = generate_blob_key(&entry.domain_id, &entry.blob_id);
mgr.add_blob_entry(&entry).unwrap();
assert_eq!(mgr.get_state().id_to_config_map.len(), 20);
assert!(mgr.get_config(&blob_id).is_some());
assert!(mgr.get_config(&blob_id_cloned).is_some());
// Removing the first bootstrap drops only its own entry; the data blobs stay registered.
mgr.remove_blob_entry(&BlobCacheObjectId {
domain_id: entry.domain_id.clone(),
blob_id: "image_v2".to_string(),
})
.unwrap();
assert_eq!(mgr.get_state().id_to_config_map.len(), 19);
assert!(mgr.get_config(&blob_id).is_none());
assert!(mgr.get_config(&blob_id_cloned).is_some());
// Removing the second bootstrap empties the map.
mgr.remove_blob_entry(&BlobCacheObjectId {
domain_id: entry.domain_id,
blob_id: "image_v2_cloned".to_string(),
})
.unwrap();
assert_eq!(mgr.get_state().id_to_config_map.len(), 0);
assert!(mgr.get_config(&blob_id).is_none());
assert!(mgr.get_config(&blob_id_cloned).is_none());
}
}