use std::collections::HashMap;
use std::ops::Deref;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::{Arc, MutexGuard};
use fuse_backend_rs::api::{BackendFileSystem, Vfs};
#[cfg(target_os = "linux")]
use fuse_backend_rs::passthrough::{Config, PassthroughFs};
use nydus::{FsBackendDesc, FsBackendType};
use rafs::fs::{Rafs, RafsConfig};
use rafs::{trim_backend_config, RafsError, RafsIoRead};
use serde::{self, Deserialize, Serialize};
use crate::daemon::DaemonResult;
use crate::upgrade::{self, UpgradeManager};
use crate::DaemonError;
type BackFileSystem = Box<dyn BackendFileSystem<Inode = u64, Handle = u64> + Send + Sync>;
#[derive(Clone)]
pub struct FsBackendMountCmd {
pub fs_type: FsBackendType,
pub source: String,
pub config: String,
pub mountpoint: String,
pub prefetch_files: Option<Vec<String>>,
}
#[derive(Clone, Deserialize, Serialize, Debug)]
pub struct FsBackendUmountCmd {
pub mountpoint: String,
}
#[derive(Default, Serialize, Clone)]
pub struct FsBackendCollection(HashMap<String, FsBackendDesc>);
impl FsBackendCollection {
pub fn add(&mut self, id: &str, cmd: &FsBackendMountCmd) -> DaemonResult<()> {
let fs_config = match cmd.fs_type {
FsBackendType::Rafs => {
let mut config: serde_json::Value =
serde_json::from_str(&cmd.config).map_err(DaemonError::Serde)?;
trim_backend_config!(
config,
"access_key_id",
"access_key_secret",
"auth",
"token"
);
Some(config)
}
FsBackendType::PassthroughFs => {
None
}
};
let desc = FsBackendDesc {
backend_type: cmd.fs_type.clone(),
mountpoint: cmd.mountpoint.clone(),
mounted_time: chrono::Local::now(),
config: fs_config,
};
self.0.insert(id.to_string(), desc);
Ok(())
}
pub fn del(&mut self, id: &str) {
self.0.remove(id);
}
}
pub trait FsService: Send + Sync {
fn get_vfs(&self) -> &Vfs;
fn upgrade_mgr(&self) -> Option<MutexGuard<UpgradeManager>>;
fn backend_collection(&self) -> MutexGuard<FsBackendCollection>;
fn mount(&self, cmd: FsBackendMountCmd) -> DaemonResult<()> {
if self.backend_from_mountpoint(&cmd.mountpoint)?.is_some() {
return Err(DaemonError::AlreadyExists);
}
let backend = fs_backend_factory(&cmd)?;
let index = self.get_vfs().mount(backend, &cmd.mountpoint)?;
info!("{} mounted at {}", &cmd.fs_type, &cmd.mountpoint);
self.backend_collection().add(&cmd.mountpoint, &cmd)?;
if let Some(mut mgr_guard) = self.upgrade_mgr() {
upgrade::add_mounts_state(&mut mgr_guard, cmd, index)?;
}
Ok(())
}
fn remount(&self, cmd: FsBackendMountCmd) -> DaemonResult<()> {
let rootfs = self
.backend_from_mountpoint(&cmd.mountpoint)?
.ok_or(DaemonError::NotFound)?;
let rafs_config = RafsConfig::from_str(&cmd.config)?;
let mut bootstrap = <dyn RafsIoRead>::from_file(&&cmd.source)?;
let any_fs = rootfs.deref().as_any();
let rafs = any_fs
.downcast_ref::<Rafs>()
.ok_or_else(|| DaemonError::FsTypeMismatch("to rafs".to_string()))?;
rafs.update(&mut bootstrap, rafs_config)
.map_err(|e| match e {
RafsError::Unsupported => DaemonError::Unsupported,
e => DaemonError::Rafs(e),
})?;
self.backend_collection().add(&cmd.mountpoint, &cmd)?;
if let Some(mut mgr_guard) = self.upgrade_mgr() {
upgrade::update_mounts_state(&mut mgr_guard, cmd)?;
}
Ok(())
}
fn umount(&self, cmd: FsBackendUmountCmd) -> DaemonResult<()> {
let _ = self
.backend_from_mountpoint(&cmd.mountpoint)?
.ok_or(DaemonError::NotFound)?;
self.get_vfs().umount(&cmd.mountpoint)?;
self.backend_collection().del(&cmd.mountpoint);
if let Some(mut mgr_guard) = self.upgrade_mgr() {
upgrade::remove_mounts_state(&mut mgr_guard, cmd)?;
}
Ok(())
}
fn backend_from_mountpoint(&self, mp: &str) -> DaemonResult<Option<Arc<BackFileSystem>>> {
self.get_vfs().get_rootfs(mp).map_err(|e| e.into())
}
fn export_backend_info(&self, mountpoint: &str) -> DaemonResult<String> {
let fs = self
.backend_from_mountpoint(mountpoint)?
.ok_or(DaemonError::NotFound)?;
let any_fs = fs.deref().as_any();
let rafs = any_fs
.downcast_ref::<Rafs>()
.ok_or_else(|| DaemonError::FsTypeMismatch("to rafs".to_string()))?;
let resp = serde_json::to_string(rafs.metadata()).map_err(DaemonError::Serde)?;
Ok(resp)
}
fn export_inflight_ops(&self) -> DaemonResult<Option<String>>;
}
fn validate_prefetch_file_list(input: &Option<Vec<String>>) -> DaemonResult<Option<Vec<PathBuf>>> {
if let Some(list) = input {
let list: Vec<PathBuf> = list.iter().map(PathBuf::from).collect();
for elem in list.iter() {
if !elem.is_absolute() {
return Err(DaemonError::Common("Illegal prefetch list".to_string()));
}
}
Ok(Some(list))
} else {
Ok(None)
}
}
fn fs_backend_factory(cmd: &FsBackendMountCmd) -> DaemonResult<BackFileSystem> {
let prefetch_files = validate_prefetch_file_list(&cmd.prefetch_files)?;
match cmd.fs_type {
FsBackendType::Rafs => {
let rafs_config = RafsConfig::from_str(cmd.config.as_str())?;
let mut bootstrap = <dyn RafsIoRead>::from_file(&cmd.source)?;
let mut rafs = Rafs::new(rafs_config, &cmd.mountpoint, &mut bootstrap)?;
rafs.import(bootstrap, prefetch_files)?;
info!("Rafs imported");
Ok(Box::new(rafs))
}
FsBackendType::PassthroughFs => {
#[cfg(target_os = "macos")]
return Err(DaemonError::InvalidArguments(String::from(
"not support passthroughfs",
)));
#[cfg(target_os = "linux")]
{
let fs_cfg = Config {
root_dir: cmd.source.to_string(),
do_import: false,
writeback: true,
no_open: true,
xattr: true,
..Default::default()
};
let passthrough_fs =
PassthroughFs::new(fs_cfg).map_err(DaemonError::PassthroughFs)?;
passthrough_fs
.import()
.map_err(DaemonError::PassthroughFs)?;
info!("PassthroughFs imported");
Ok(Box::new(passthrough_fs))
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn it_should_add_new_backend() {
let mut col: FsBackendCollection = Default::default();
let r = col.add(
"test",
&FsBackendMountCmd {
fs_type: FsBackendType::Rafs,
config: "{\"config\": \"test\"}".to_string(),
mountpoint: "testmonutount".to_string(),
source: "testsource".to_string(),
prefetch_files: Some(vec!["testfile".to_string()]),
},
);
assert!(r.is_ok(), "failed to add backend collection");
assert_eq!(col.0.len(), 1);
col.del("test");
assert_eq!(col.0.len(), 0);
}
#[test]
fn it_should_verify_prefetch_files() {
let files = validate_prefetch_file_list(&Some(vec!["/etc/passwd".to_string()]));
assert!(files.is_ok(), "failed to verify prefetch files");
assert_eq!(1, files.unwrap().unwrap().len());
assert!(
validate_prefetch_file_list(&Some(vec!["etc/passwd".to_string()])).is_err(),
"should not pass verify"
);
}
#[test]
fn it_should_create_rafs_backend() {
let config = r#"
{
"device": {
"backend": {
"type": "oss",
"config": {
"endpoint": "test",
"access_key_id": "test",
"access_key_secret": "test",
"bucket_name": "antsys-nydus",
"object_prefix":"nydus_v2/",
"scheme": "http"
}
}
},
"mode": "direct",
"digest_validate": false,
"enable_xattr": true,
"fs_prefetch": {
"enable": true,
"threads_count": 10,
"merging_size": 131072,
"bandwidth_rate": 10485760
}
}"#;
let bootstrap = "./tests/texture/bootstrap/nydusd_daemon_test_bootstrap";
if fs_backend_factory(&FsBackendMountCmd {
fs_type: FsBackendType::Rafs,
config: config.to_string(),
mountpoint: "testmountpoint".to_string(),
source: bootstrap.to_string(),
prefetch_files: Some(vec!["/testfile".to_string()]),
})
.unwrap()
.as_any()
.downcast_ref::<Rafs>()
.is_none()
{
panic!("failed to create rafs backend")
}
}
}