use std::any::Any;
use std::cmp::PartialEq;
use std::convert::From;
use std::fmt::{Display, Formatter};
use std::io::Result;
use std::ops::Deref;
use std::process::id;
use std::sync::mpsc::{Receiver, Sender};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::{error, fmt, io};
use fuse_backend_rs::api::vfs::VfsError;
use fuse_backend_rs::transport::Error as FuseTransportError;
use fuse_backend_rs::Error as FuseError;
use rust_fsm::*;
use serde::{self, Serialize};
use serde_json::Error as SerdeError;
use crate::fs_service::{FsBackendCollection, FsService};
use nydus_app::BuildTimeInfo;
use rafs::RafsError;
use crate::upgrade::UpgradeMgrError;
/// Lifecycle state of a daemon, as reported by `NydusDaemon::get_state`.
///
/// Every variant carries an explicit integer discriminant; the `From<i32>`
/// implementation in this file maps those integers back to variants.
#[allow(dead_code)]
#[allow(clippy::upper_case_acronyms)]
#[derive(Debug, Hash, PartialEq, Eq, Serialize)]
pub enum DaemonState {
/// Discriminant 1. NOTE(review): presumably the state before any transition
/// has happened; nothing in this file sets it explicitly — confirm.
INIT = 1,
/// Discriminant 2. Set by the state machine thread after `start()` succeeds.
RUNNING = 2,
/// Discriminant 3. Set by the state machine thread after a successful
/// `wait_service()` (service terminated) or `restore()`.
READY = 3,
/// Discriminant 4. Set by the state machine thread on `Umount` and
/// `StopStateMachine`; the thread exits once it observes this state.
STOPPED = 4,
/// Discriminant 5. Also the fallback for any integer that `From<i32>` does
/// not recognise.
UNKNOWN = 5,
}
impl Display for DaemonState {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
write!(f, "{:?}", self)
}
}
impl From<i32> for DaemonState {
fn from(i: i32) -> Self {
match i {
1 => DaemonState::INIT,
2 => DaemonState::RUNNING,
3 => DaemonState::READY,
4 => DaemonState::STOPPED,
_ => DaemonState::UNKNOWN,
}
}
}
/// Error type of the daemon layer; it is the `E` of `DaemonResult<T>`.
///
/// `Display` prints a custom message for `InvalidArguments`, `InvalidConfig`
/// and `DaemonFailure`, and falls back to the `Debug` representation for every
/// other variant.
///
/// Only variants whose use is visible in this file are documented below.
/// NOTE(review): the remaining variants are presumably produced elsewhere in
/// the crate; their meaning should be confirmed at those call sites.
#[derive(Debug)]
pub enum DaemonError {
AlreadyExists,
Common(String),
/// Displayed as `Invalid argument: <message>`.
InvalidArguments(String),
/// Displayed as `Invalid config: <message>`.
InvalidConfig(String),
NotFound,
NotReady,
Unsupported,
/// JSON serialization failure; `NydusDaemon::export_info` maps
/// `serde_json::to_string` errors into this variant.
Serde(SerdeError),
/// Failure to spawn a thread; used by
/// `DaemonStateMachineContext::kick_state_machine`.
ThreadSpawn(io::Error),
UpgradeManager(UpgradeMgrError),
Channel(String),
StartService(String),
ServiceStop,
/// The state machine rejected the carried input in its current state.
UnexpectedEvent(DaemonStateMachineInput),
WaitDaemon(io::Error),
FsTypeMismatch(String),
PassthroughFs(io::Error),
/// Wraps a `RafsError`; created through `From<RafsError>`.
Rafs(RafsError),
/// Wraps a `VfsError`; created through `From<VfsError>`.
Vfs(VfsError),
HandleEventNotEpollIn,
HandleEventUnknownEvent,
IterateQueue,
InvalidDescriptorChain(FuseTransportError),
ProcessQueue(FuseError),
Epoll(io::Error),
/// Displayed as `Daemon error: <message>`.
DaemonFailure(String),
SessionShutdown(FuseTransportError),
}
impl fmt::Display for DaemonError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::InvalidArguments(s) => write!(f, "Invalid argument: {}", s),
Self::InvalidConfig(s) => write!(f, "Invalid config: {}", s),
Self::DaemonFailure(s) => write!(f, "Daemon error: {}", s),
_ => write!(f, "{:?}", self),
}
}
}
// Marks `DaemonError` as a standard error type. No method is overridden, so
// the trait's provided defaults apply; the required `Debug` and `Display`
// implementations come from the derive and the `fmt::Display` impl in this file.
impl error::Error for DaemonError {}
/// Converts a `DaemonError` into an `io::Error`.
///
/// This conversion is what lets `?` be applied to a `DaemonError` inside
/// functions returning `io::Result`, such as `kick_state_machine`.
impl From<DaemonError> for io::Error {
// `einval!` is a macro defined outside this file.
// NOTE(review): presumably it builds an invalid-argument `io::Error` that
// carries `e` — confirm against the macro definition.
fn from(e: DaemonError) -> Self {
einval!(e)
}
}
impl From<VfsError> for DaemonError {
fn from(e: VfsError) -> Self {
DaemonError::Vfs(e)
}
}
impl From<RafsError> for DaemonError {
fn from(error: RafsError) -> Self {
DaemonError::Rafs(error)
}
}
/// Result alias whose error type is `DaemonError`.
///
/// It is spelled with the full `std::result::Result` path because this file
/// imports `std::io::Result` under the bare name `Result`.
pub type DaemonResult<T> = std::result::Result<T, DaemonError>;
/// Serializable snapshot of a daemon; built and JSON-encoded by
/// `NydusDaemon::export_info`.
#[derive(Serialize)]
pub struct DaemonInfo {
/// Build information returned by `NydusDaemon::version`.
pub version: BuildTimeInfo,
/// Identifier returned by `NydusDaemon::id`, if any.
pub id: Option<String>,
/// Value returned by `NydusDaemon::supervisor`, if any.
pub supervisor: Option<String>,
/// Daemon state at the time the snapshot was taken.
pub state: DaemonState,
/// Copy of the default filesystem service's backend collection.
/// `export_info` fills this only when `include_fs_info` is set and a default
/// filesystem service exists; otherwise it stays `None`.
pub backend_collection: Option<FsBackendCollection>,
}
/// Interface of a nydus daemon.
///
/// Implementors supply accessors and lifecycle hooks. The provided methods
/// (`export_info`, `stop`, `trigger_exit`, `trigger_takeover`,
/// `trigger_start`) are built on top of them and of
/// [`DaemonStateMachineSubscriber::on_event`].
pub trait NydusDaemon: DaemonStateMachineSubscriber + Send + Sync {
    /// Returns the daemon as `&dyn Any`.
    fn as_any(&self) -> &dyn Any;

    /// Returns the daemon identifier, if any.
    fn id(&self) -> Option<String>;

    /// Returns the current daemon state.
    fn get_state(&self) -> DaemonState;

    /// Records `s` as the current daemon state.
    fn set_state(&self, s: DaemonState);

    /// Returns the build information of the daemon.
    fn version(&self) -> BuildTimeInfo;

    /// Serializes a [`DaemonInfo`] snapshot of the daemon to a JSON string.
    ///
    /// When `include_fs_info` is true and a default filesystem service
    /// exists, a clone of its backend collection is included as well.
    ///
    /// # Errors
    /// Returns [`DaemonError::Serde`] if JSON serialization fails.
    fn export_info(&self, include_fs_info: bool) -> DaemonResult<String> {
        // The accessors are queried in `DaemonInfo` field order, before the
        // filesystem service is looked up.
        let version = self.version();
        let id = self.id();
        let supervisor = self.supervisor();
        let state = self.get_state();

        let backend_collection = if include_fs_info {
            self.get_default_fs_service()
                .map(|fs| fs.backend_collection().deref().clone())
        } else {
            None
        };

        let info = DaemonInfo {
            version,
            id,
            supervisor,
            state,
            backend_collection,
        };
        serde_json::to_string(&info).map_err(DaemonError::Serde)
    }

    /// Starts the daemon service; invoked by the state machine thread for the
    /// `StartService` output.
    fn start(&self) -> DaemonResult<()>;

    /// Invoked by the state machine thread for the `Umount` output.
    fn disconnect(&self) -> DaemonResult<()>;

    /// Invoked by the state machine thread before it waits for the service.
    /// The default implementation does nothing.
    fn interrupt(&self) {}

    /// Stops the daemon by feeding `Stop` events to the state machine.
    ///
    /// Does nothing when the daemon is already `STOPPED`. A `RUNNING` daemon
    /// receives `Stop` twice: per the transition table in this file, the first
    /// one moves `Running` to `Ready` and the second one moves `Ready` to
    /// `Die`. Any other state receives a single `Stop`.
    fn stop(&self) -> DaemonResult<()> {
        match self.get_state() {
            DaemonState::STOPPED => return Ok(()),
            DaemonState::RUNNING => self.on_event(DaemonStateMachineInput::Stop)?,
            _ => {}
        }
        self.on_event(DaemonStateMachineInput::Stop)
    }

    /// Waits for the daemon; semantics are defined by the implementor.
    fn wait(&self) -> DaemonResult<()>;

    /// Waits for the service to finish. The default implementation returns
    /// `Ok(())` immediately.
    fn wait_service(&self) -> DaemonResult<()> {
        Ok(())
    }

    /// Waits for the state machine to finish. The default implementation
    /// returns `Ok(())` immediately.
    fn wait_state_machine(&self) -> DaemonResult<()> {
        Ok(())
    }

    /// Drives the state machine towards its final state with an `Exit` event.
    ///
    /// * `STOPPED`: nothing to do.
    /// * `INIT`: a single `Stop` is sent, since the transition table has no
    ///   `Exit` transition out of `Init`.
    /// * `RUNNING`: `Stop` is sent first, then `Exit`.
    /// * any other state: only `Exit` is sent.
    fn trigger_exit(&self) -> DaemonResult<()> {
        match self.get_state() {
            DaemonState::STOPPED => return Ok(()),
            DaemonState::INIT => return self.on_event(DaemonStateMachineInput::Stop),
            DaemonState::RUNNING => self.on_event(DaemonStateMachineInput::Stop)?,
            _ => {}
        }
        self.on_event(DaemonStateMachineInput::Exit)
    }

    /// Returns the supervisor setting of the daemon, if any.
    fn supervisor(&self) -> Option<String>;

    /// Saves daemon state; semantics are defined by the implementor.
    fn save(&self) -> DaemonResult<()>;

    /// Restores daemon state; invoked by the state machine thread for the
    /// `Restore` output.
    fn restore(&self) -> DaemonResult<()>;

    /// Sends the `Takeover` event to the state machine.
    fn trigger_takeover(&self) -> DaemonResult<()> {
        self.on_event(DaemonStateMachineInput::Takeover)
    }

    /// Sends the `Start` event to the state machine.
    fn trigger_start(&self) -> DaemonResult<()> {
        self.on_event(DaemonStateMachineInput::Start)
    }

    /// Returns the default filesystem service, if the daemon has one.
    fn get_default_fs_service(&self) -> Option<Arc<dyn FsService>>;
}
// Transition table of the daemon state machine.
//
// Each entry reads `Input => NextState[Output]`; the bracketed output is
// optional. The outputs are matched as `DaemonStateMachineOutput` variants in
// `DaemonStateMachineContext::kick_state_machine`, which runs the
// corresponding daemon action:
//   Restore          -> `restore()`, then state READY on success
//   StartService     -> `start()`, then state RUNNING on success
//   TerminateService -> `interrupt()` + `wait_service()`, then READY on success
//   Umount           -> `disconnect()`, then `interrupt()` + `wait_service()`
//                       and state STOPPED on success
//   StopStateMachine -> state STOPPED
// `Die` has no outgoing transitions.
state_machine! {
derive(Debug, Clone)
// `Init` is the state the machine starts in.
pub DaemonStateMachine(Init)
Init => {
// `Mount` carries no output, so the worker thread performs no daemon action.
Mount => Ready,
Takeover => Ready[Restore],
Stop => Die[StopStateMachine],
},
Ready => {
Start => Running[StartService],
Stop => Die[Umount],
Exit => Die[StopStateMachine],
},
Running => {
// `Stop` is the only transition out of `Running`.
Stop => Ready [TerminateService],
},
}
/// Everything the state machine thread needs; consumed by
/// `kick_state_machine`, which moves it into the spawned thread.
pub struct DaemonStateMachineContext {
/// Process id captured with `std::process::id()` in `new`; only used in log
/// output.
pid: u32,
/// Daemon whose methods are invoked for state machine outputs.
daemon: Arc<dyn NydusDaemon>,
/// The state machine instance, created fresh in `new`.
sm: StateMachine<DaemonStateMachine>,
/// Channel on which input events are received.
request_receiver: Receiver<DaemonStateMachineInput>,
/// Channel on which the result of handling each event is sent back.
result_sender: Sender<DaemonResult<()>>,
}
impl DaemonStateMachineContext {
pub fn new(
daemon: Arc<dyn NydusDaemon>,
request_receiver: Receiver<DaemonStateMachineInput>,
result_sender: Sender<DaemonResult<()>>,
) -> Self {
DaemonStateMachineContext {
pid: id(),
daemon,
sm: StateMachine::new(),
request_receiver,
result_sender,
}
}
pub fn kick_state_machine(mut self) -> Result<JoinHandle<Result<()>>> {
let thread = thread::Builder::new()
.name("state_machine".to_string())
.spawn(move || {
loop {
use DaemonStateMachineOutput::*;
let event = self
.request_receiver
.recv()
.expect("Event channel can't be broken!");
let last = self.sm.state().clone();
let input = &event;
let action = if let Ok(a) = self.sm.consume(&event) {
a
} else {
error!(
"Wrong event input. Event={:?}, CurrentState={:?}",
input, &last
);
self.result_sender
.send(Err(DaemonError::UnexpectedEvent(input.clone())))
.unwrap();
continue;
};
let d = self.daemon.as_ref();
let cur = self.sm.state();
info!(
"State machine(pid={}): from {:?} to {:?}, input [{:?}], output [{:?}]",
&self.pid, last, cur, input, &action
);
let r = match action {
Some(a) => match a {
StartService => d.start().map(|r| {
d.set_state(DaemonState::RUNNING);
r
}),
TerminateService => {
d.interrupt();
let res = d.wait_service();
if res.is_ok() {
d.set_state(DaemonState::READY);
}
res
}
Umount => d.disconnect().map(|r| {
d.interrupt();
d.wait_service()
.unwrap_or_else(|e| error!("failed to wait service {}", e));
d.set_state(DaemonState::STOPPED);
r
}),
Restore => {
let res = d.restore();
if res.is_ok() {
d.set_state(DaemonState::READY);
}
res
}
StopStateMachine => {
d.set_state(DaemonState::STOPPED);
Ok(())
}
},
_ => Ok(()), };
self.result_sender.send(r).unwrap();
if d.get_state() == DaemonState::STOPPED {
break;
}
}
info!("state_machine thread exits");
Ok(())
})
.map_err(DaemonError::ThreadSpawn)?;
Ok(thread)
}
}
/// Entry point for delivering an input event to the daemon state machine.
///
/// `NydusDaemon` requires this trait, and its provided methods (`stop`,
/// `trigger_exit`, `trigger_takeover`, `trigger_start`) are implemented by
/// calling `on_event`.
pub trait DaemonStateMachineSubscriber {
/// Handles `event` and returns the outcome.
/// NOTE(review): presumably implementations forward the event to the thread
/// started by `kick_state_machine` and wait for its reply — confirm against
/// the implementors.
fn on_event(&self, event: DaemonStateMachineInput) -> DaemonResult<()>;
}
#[cfg(test)]
mod tests {
    use super::*;
    use nydus::FsBackendType;

    /// Every known discriminant maps to its variant; `5` and any other
    /// unrecognised integer map to `UNKNOWN`.
    #[test]
    fn it_should_convert_int_to_daemonstate() {
        let cases = vec![
            (1, DaemonState::INIT),
            (2, DaemonState::RUNNING),
            (3, DaemonState::READY),
            (4, DaemonState::STOPPED),
            (5, DaemonState::UNKNOWN),
            (8, DaemonState::UNKNOWN),
        ];
        for (raw, expected) in cases {
            assert_eq!(DaemonState::from(raw), expected);
        }
    }

    /// Known backend names parse to their variants; junk input is rejected.
    #[test]
    fn it_should_convert_str_to_fsbackendtype() {
        let rafs = "rafs".parse::<FsBackendType>().unwrap();
        assert!(rafs == FsBackendType::Rafs);

        let passthrough = "passthrough_fs".parse::<FsBackendType>().unwrap();
        assert!(passthrough == FsBackendType::PassthroughFs);

        let junk = "xxxxxxxxxxxxx".parse::<FsBackendType>();
        assert!(junk.is_err());
    }
}