use std::io::Result;
use std::str::FromStr;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Arc;
use std::thread::JoinHandle;
use mio::Waker;
use nix::sys::signal::{kill, SIGTERM};
use nix::unistd::Pid;
use nydus::daemon::NydusDaemon;
use nydus::{FsBackendMountCmd, FsBackendType, FsBackendUmountCmd, FsService};
use nydus_api::{
start_http_thread, ApiError, ApiMountCmd, ApiRequest, ApiResponse, ApiResponsePayload,
ApiResult, BlobCacheEntry, BlobCacheObjectId, DaemonConf, DaemonErrorKind, MetricsErrorKind,
};
use nydus_utils::metrics;
use crate::DAEMON_CONTROLLER;
struct ApiServer {
to_http: Sender<ApiResponse>,
}
impl ApiServer {
fn new(to_http: Sender<ApiResponse>) -> Result<Self> {
Ok(ApiServer { to_http })
}
fn process_request(&self, request: ApiRequest) -> Result<()> {
let resp = match request {
ApiRequest::ConfigureDaemon(conf) => self.configure_daemon(conf),
ApiRequest::GetDaemonInfo => self.daemon_info(true),
ApiRequest::GetEvents => Self::events(),
ApiRequest::Exit => self.do_exit(),
ApiRequest::Start => self.do_start(),
ApiRequest::SendFuseFd => self.send_fuse_fd(),
ApiRequest::TakeoverFuseFd => self.do_takeover(),
ApiRequest::Mount(mountpoint, info) => self.do_mount(mountpoint, info),
ApiRequest::Remount(mountpoint, info) => self.do_remount(mountpoint, info),
ApiRequest::Umount(mountpoint) => self.do_umount(mountpoint),
ApiRequest::ExportBackendMetrics(id) => Self::export_backend_metrics(id),
ApiRequest::ExportBlobcacheMetrics(id) => Self::export_blobcache_metrics(id),
ApiRequest::ExportFsGlobalMetrics(id) => Self::export_global_metrics(id),
ApiRequest::ExportFsFilesMetrics(id, latest_read_files) => {
Self::export_files_metrics(id, latest_read_files)
}
ApiRequest::ExportFsAccessPatterns(id) => Self::export_access_patterns(id),
ApiRequest::ExportFsBackendInfo(mountpoint) => self.backend_info(&mountpoint),
ApiRequest::ExportFsInflightMetrics => self.export_inflight_metrics(),
ApiRequest::GetDaemonInfoV2 => self.daemon_info(false),
ApiRequest::GetBlobObject(_param) => todo!(),
ApiRequest::CreateBlobObject(entry) => self.create_blob_cache_entry(&entry),
ApiRequest::DeleteBlobObject(param) => self.remove_blob_cache_entry(¶m),
ApiRequest::DeleteBlobFile(blob_id) => self.blob_cache_gc(blob_id),
};
self.respond(resp);
Ok(())
}
fn respond(&self, resp: ApiResult<ApiResponsePayload>) {
if let Err(e) = self.to_http.send(resp) {
error!("send API response failed {}", e);
}
}
fn configure_daemon(&self, conf: DaemonConf) -> ApiResponse {
conf.log_level
.parse::<log::LevelFilter>()
.map_err(|e| {
error!("Invalid log level passed, {}", e);
ApiError::ResponsePayloadType
})
.map(|v| {
log::set_max_level(v);
ApiResponsePayload::Empty
})
}
fn daemon_info(&self, include_fs_info: bool) -> ApiResponse {
self.get_daemon_object()?
.export_info(include_fs_info)
.map_err(|e| ApiError::Metrics(MetricsErrorKind::Daemon(e.into())))
.map(ApiResponsePayload::DaemonInfo)
}
fn do_exit(&self) -> ApiResponse {
let d = self.get_daemon_object()?;
d.trigger_exit()
.map(|_| {
info!("exit daemon by http request");
ApiResponsePayload::Empty
})
.map_err(|e| ApiError::DaemonAbnormal(e.into()))?;
kill(Pid::this(), SIGTERM).unwrap_or_else(|e| error!("Send signal error. {}", e));
Ok(ApiResponsePayload::Empty)
}
fn do_takeover(&self) -> ApiResponse {
let d = self.get_daemon_object()?;
d.trigger_takeover()
.map(|_| ApiResponsePayload::Empty)
.map_err(|e| ApiError::DaemonAbnormal(e.into()))
}
fn events() -> ApiResponse {
let events = metrics::export_events().map_err(|e| ApiError::Events(format!("{:?}", e)))?;
Ok(ApiResponsePayload::Events(events))
}
fn export_global_metrics(id: Option<String>) -> ApiResponse {
metrics::export_global_stats(&id)
.map(ApiResponsePayload::FsGlobalMetrics)
.map_err(|e| ApiError::Metrics(MetricsErrorKind::Stats(e)))
}
fn export_files_metrics(id: Option<String>, latest_read_files: bool) -> ApiResponse {
metrics::export_files_stats(&id, latest_read_files)
.map(ApiResponsePayload::FsFilesMetrics)
.map_err(|e| ApiError::Metrics(MetricsErrorKind::Stats(e)))
}
fn export_access_patterns(id: Option<String>) -> ApiResponse {
metrics::export_files_access_pattern(&id)
.map(ApiResponsePayload::FsFilesPatterns)
.map_err(|e| ApiError::Metrics(MetricsErrorKind::Stats(e)))
}
fn export_backend_metrics(id: Option<String>) -> ApiResponse {
metrics::export_backend_metrics(&id)
.map(ApiResponsePayload::BackendMetrics)
.map_err(|e| ApiError::Metrics(MetricsErrorKind::Stats(e)))
}
fn export_blobcache_metrics(id: Option<String>) -> ApiResponse {
metrics::export_blobcache_metrics(&id)
.map(ApiResponsePayload::BlobcacheMetrics)
.map_err(|e| ApiError::Metrics(MetricsErrorKind::Stats(e)))
}
#[inline]
fn get_daemon_object(&self) -> std::result::Result<Arc<dyn NydusDaemon>, ApiError> {
Ok(DAEMON_CONTROLLER.get_daemon())
}
fn backend_info(&self, mountpoint: &str) -> ApiResponse {
let info = self
.get_default_fs_service()?
.export_backend_info(mountpoint)
.map_err(|e| ApiError::Metrics(MetricsErrorKind::Daemon(e.into())))?;
Ok(ApiResponsePayload::FsBackendInfo(info))
}
fn export_inflight_metrics(&self) -> ApiResponse {
let fs = self.get_default_fs_service()?;
if let Some(ops) = fs
.export_inflight_ops()
.map_err(|e| ApiError::Metrics(MetricsErrorKind::Daemon(e.into())))?
{
Ok(ApiResponsePayload::FsInflightMetrics(ops))
} else {
Ok(ApiResponsePayload::Empty)
}
}
fn do_mount(&self, mountpoint: String, cmd: ApiMountCmd) -> ApiResponse {
let fs_type = FsBackendType::from_str(&cmd.fs_type)
.map_err(|e| ApiError::MountFilesystem(e.into()))?;
let fs = self.get_default_fs_service()?;
fs.mount(FsBackendMountCmd {
fs_type,
mountpoint,
config: cmd.config,
source: cmd.source,
prefetch_files: cmd.prefetch_files,
})
.map(|_| ApiResponsePayload::Empty)
.map_err(|e| ApiError::MountFilesystem(e.into()))
}
fn do_remount(&self, mountpoint: String, cmd: ApiMountCmd) -> ApiResponse {
let fs_type = FsBackendType::from_str(&cmd.fs_type)
.map_err(|e| ApiError::MountFilesystem(e.into()))?;
self.get_default_fs_service()?
.remount(FsBackendMountCmd {
fs_type,
mountpoint,
config: cmd.config,
source: cmd.source,
prefetch_files: cmd.prefetch_files,
})
.map(|_| ApiResponsePayload::Empty)
.map_err(|e| ApiError::MountFilesystem(e.into()))
}
fn do_umount(&self, mountpoint: String) -> ApiResponse {
self.get_default_fs_service()?
.umount(FsBackendUmountCmd { mountpoint })
.map(|_| ApiResponsePayload::Empty)
.map_err(|e| ApiError::MountFilesystem(e.into()))
}
fn send_fuse_fd(&self) -> ApiResponse {
let d = self.get_daemon_object()?;
d.save()
.map(|_| ApiResponsePayload::Empty)
.map_err(|e| ApiError::DaemonAbnormal(e.into()))
}
fn get_default_fs_service(&self) -> std::result::Result<Arc<dyn FsService>, ApiError> {
DAEMON_CONTROLLER
.get_fs_service()
.ok_or(ApiError::DaemonAbnormal(DaemonErrorKind::Unsupported))
}
fn create_blob_cache_entry(&self, entry: &BlobCacheEntry) -> ApiResponse {
match DAEMON_CONTROLLER.get_blob_cache_mgr() {
None => Err(ApiError::DaemonAbnormal(DaemonErrorKind::Unsupported)),
Some(mgr) => {
if let Err(e) = mgr.add_blob_entry(entry) {
Err(ApiError::DaemonAbnormal(DaemonErrorKind::Other(format!(
"{}",
e
))))
} else {
Ok(ApiResponsePayload::Empty)
}
}
}
}
fn remove_blob_cache_entry(&self, param: &BlobCacheObjectId) -> ApiResponse {
match DAEMON_CONTROLLER.get_blob_cache_mgr() {
None => Err(ApiError::DaemonAbnormal(DaemonErrorKind::Unsupported)),
Some(mgr) => {
if let Err(e) = mgr.remove_blob_entry(param) {
Err(ApiError::DaemonAbnormal(DaemonErrorKind::Other(format!(
"{}",
e
))))
} else {
Ok(ApiResponsePayload::Empty)
}
}
}
}
fn blob_cache_gc(&self, blob_id: String) -> ApiResponse {
self.get_daemon_object()?
.delete_blob(blob_id)
.map_err(|e| ApiError::DaemonAbnormal(e.into()))
.map(|_| ApiResponsePayload::Empty)
}
fn do_start(&self) -> ApiResponse {
let d = self.get_daemon_object()?;
d.trigger_start()
.map(|_| ApiResponsePayload::Empty)
.map_err(|e| ApiError::DaemonAbnormal(e.into()))
}
}
struct ApiServerHandler {
server: ApiServer,
api_receiver: Receiver<Option<ApiRequest>>,
}
impl ApiServerHandler {
fn new(server: ApiServer, api_receiver: Receiver<Option<ApiRequest>>) -> Result<Self> {
Ok(Self {
server,
api_receiver,
})
}
fn handle_requests_from_router(&self) {
loop {
match self.api_receiver.recv() {
Ok(request) => {
if let Some(req) = request {
self.server.process_request(req).unwrap_or_else(|e| {
error!("HTTP handler failed to process request, {}", e)
});
} else {
debug!("Received exit notification from the HTTP router");
return;
}
}
Err(_e) => {
error!("Failed to receive request from the HTTP router");
return;
}
}
}
}
}
pub struct ApiServerController {
http_handler_thread: Option<JoinHandle<Result<()>>>,
http_router_thread: Option<JoinHandle<Result<()>>>,
sock: Option<String>,
waker: Option<Arc<Waker>>,
}
impl ApiServerController {
pub fn new(sock: Option<&str>) -> Self {
ApiServerController {
sock: sock.map(|v| v.to_string()),
http_handler_thread: None,
http_router_thread: None,
waker: None,
}
}
pub fn start(&mut self) -> Result<()> {
if self.sock.is_none() {
return Ok(());
}
let apisock = self.sock.as_ref().unwrap();
let (to_handler, from_router) = channel();
let (to_router, from_handler) = channel();
let api_server = ApiServer::new(to_router)?;
let api_handler = ApiServerHandler::new(api_server, from_router)?;
let (router_thread, waker) = start_http_thread(apisock, to_handler, from_handler)?;
let daemon_waker = DAEMON_CONTROLLER.alloc_waker();
info!("HTTP API server running at {}", apisock);
let handler_thread = std::thread::Builder::new()
.name("api-server".to_string())
.spawn(move || {
api_handler.handle_requests_from_router();
info!("HTTP api-server handler thread exits");
let _ = daemon_waker.wake();
Ok(())
})
.map_err(|_e| einval!("Failed to start work thread for HTTP handler"))?;
self.waker = Some(waker);
self.http_handler_thread = Some(handler_thread);
self.http_router_thread = Some(router_thread);
Ok(())
}
pub fn stop(&mut self) {
if let Some(waker) = self.waker.take() {
let _ = waker.wake();
}
if let Some(t) = self.http_handler_thread.take() {
if let Err(e) = t.join() {
error!(
"Failed to join the HTTP handler thread, execution error. {:?}",
e
);
}
}
if let Some(t) = self.http_router_thread.take() {
if let Err(e) = t.join() {
error!(
"Failed to join the HTTP router thread, execution error. {:?}",
e
);
}
}
if let Some(apisock) = self.sock.as_ref() {
std::fs::remove_file(apisock).unwrap_or_default();
}
}
}