#[macro_use]
extern crate log;
use std::fmt::{Display, Formatter};
use std::sync::{Arc, Mutex};
use std::thread;
use vhost::vhost_user::{Error as VhostUserError, Listener, SlaveListener, SlaveReqHandler};
use vm_memory::bitmap::Bitmap;
use vm_memory::mmap::NewBitmap;
use vm_memory::{GuestMemoryAtomic, GuestMemoryMmap};
use self::handler::VhostUserHandler;
mod backend;
pub use self::backend::{VhostUserBackend, VhostUserBackendMut};
mod event_loop;
pub use self::event_loop::VringEpollHandler;
mod handler;
pub use self::handler::VhostUserHandlerError;
mod vring;
pub use self::vring::{
VringMutex, VringRwLock, VringState, VringStateGuard, VringStateMutGuard, VringT,
};
type GM<B> = GuestMemoryAtomic<GuestMemoryMmap<B>>;
#[derive(Debug)]
pub enum Error {
NewVhostUserHandler(VhostUserHandlerError),
CreateSlaveListener(VhostUserError),
CreateSlaveReqHandler(VhostUserError),
StartDaemon(std::io::Error),
WaitDaemon(std::boxed::Box<dyn std::any::Any + std::marker::Send>),
HandleRequest(VhostUserError),
}
impl Display for Error {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
match self {
Error::NewVhostUserHandler(e) => write!(f, "cannot create vhost user handler: {}", e),
Error::CreateSlaveListener(e) => write!(f, "cannot create slave listener: {}", e),
Error::CreateSlaveReqHandler(e) => write!(f, "cannot create slave req handler: {}", e),
Error::StartDaemon(e) => write!(f, "failed to start daemon: {}", e),
Error::WaitDaemon(_e) => write!(f, "failed to wait for daemon exit"),
Error::HandleRequest(e) => write!(f, "failed to handle request: {}", e),
}
}
}
pub type Result<T> = std::result::Result<T, Error>;
pub struct VhostUserDaemon<S, V, B: Bitmap + 'static = ()> {
name: String,
handler: Arc<Mutex<VhostUserHandler<S, V, B>>>,
main_thread: Option<thread::JoinHandle<Result<()>>>,
}
impl<S, V, B> VhostUserDaemon<S, V, B>
where
S: VhostUserBackend<V, B> + Clone + 'static,
V: VringT<GM<B>> + Clone + Send + Sync + 'static,
B: NewBitmap + Clone + Send + Sync,
{
pub fn new(
name: String,
backend: S,
atomic_mem: GuestMemoryAtomic<GuestMemoryMmap<B>>,
) -> Result<Self> {
let handler = Arc::new(Mutex::new(
VhostUserHandler::new(backend, atomic_mem).map_err(Error::NewVhostUserHandler)?,
));
Ok(VhostUserDaemon {
name,
handler,
main_thread: None,
})
}
fn start_daemon(
&mut self,
mut handler: SlaveReqHandler<Mutex<VhostUserHandler<S, V, B>>>,
) -> Result<()> {
let handle = thread::Builder::new()
.name(self.name.clone())
.spawn(move || loop {
handler.handle_request().map_err(Error::HandleRequest)?;
})
.map_err(Error::StartDaemon)?;
self.main_thread = Some(handle);
Ok(())
}
pub fn start_client(&mut self, socket_path: &str) -> Result<()> {
let slave_handler = SlaveReqHandler::connect(socket_path, self.handler.clone())
.map_err(Error::CreateSlaveReqHandler)?;
self.start_daemon(slave_handler)
}
pub fn start(&mut self, listener: Listener) -> Result<()> {
let mut slave_listener = SlaveListener::new(listener, self.handler.clone())
.map_err(Error::CreateSlaveListener)?;
let slave_handler = self.accept(&mut slave_listener)?;
self.start_daemon(slave_handler)
}
fn accept(
&self,
slave_listener: &mut SlaveListener<Mutex<VhostUserHandler<S, V, B>>>,
) -> Result<SlaveReqHandler<Mutex<VhostUserHandler<S, V, B>>>> {
loop {
match slave_listener.accept() {
Err(e) => return Err(Error::CreateSlaveListener(e)),
Ok(Some(v)) => return Ok(v),
Ok(None) => continue,
}
}
}
pub fn wait(&mut self) -> Result<()> {
if let Some(handle) = self.main_thread.take() {
match handle.join().map_err(Error::WaitDaemon)? {
Ok(()) => Ok(()),
Err(Error::HandleRequest(VhostUserError::SocketBroken(_))) => Ok(()),
Err(e) => Err(e),
}
} else {
Ok(())
}
}
pub fn get_epoll_handlers(&self) -> Vec<Arc<VringEpollHandler<S, V, B>>> {
self.handler.lock().unwrap().get_epoll_handlers()
}
}
#[cfg(test)]
mod tests {
use super::backend::tests::MockVhostBackend;
use super::*;
use std::os::unix::net::{UnixListener, UnixStream};
use std::sync::Barrier;
use vm_memory::{GuestAddress, GuestMemoryAtomic, GuestMemoryMmap};
#[test]
fn test_new_daemon() {
let mem = GuestMemoryAtomic::new(
GuestMemoryMmap::<()>::from_ranges(&[(GuestAddress(0x100000), 0x10000)]).unwrap(),
);
let backend = Arc::new(Mutex::new(MockVhostBackend::new()));
let mut daemon = VhostUserDaemon::new("test".to_owned(), backend, mem).unwrap();
let handlers = daemon.get_epoll_handlers();
assert_eq!(handlers.len(), 2);
let barrier = Arc::new(Barrier::new(2));
let tmpdir = tempfile::tempdir().unwrap();
let mut path = tmpdir.path().to_path_buf();
path.push("socket");
let barrier2 = barrier.clone();
let path1 = path.clone();
let thread = thread::spawn(move || {
barrier2.wait();
let socket = UnixStream::connect(&path1).unwrap();
barrier2.wait();
drop(socket)
});
let listener = Listener::new(&path, false).unwrap();
barrier.wait();
daemon.start(listener).unwrap();
barrier.wait();
daemon.wait().unwrap_err();
daemon.wait().unwrap();
thread.join().unwrap();
}
#[test]
fn test_new_daemon_client() {
let mem = GuestMemoryAtomic::new(
GuestMemoryMmap::<()>::from_ranges(&[(GuestAddress(0x100000), 0x10000)]).unwrap(),
);
let backend = Arc::new(Mutex::new(MockVhostBackend::new()));
let mut daemon = VhostUserDaemon::new("test".to_owned(), backend, mem).unwrap();
let handlers = daemon.get_epoll_handlers();
assert_eq!(handlers.len(), 2);
let barrier = Arc::new(Barrier::new(2));
let tmpdir = tempfile::tempdir().unwrap();
let mut path = tmpdir.path().to_path_buf();
path.push("socket");
let barrier2 = barrier.clone();
let path1 = path.clone();
let thread = thread::spawn(move || {
let listener = UnixListener::bind(&path1).unwrap();
barrier2.wait();
let (stream, _) = listener.accept().unwrap();
barrier2.wait();
drop(stream)
});
barrier.wait();
daemon
.start_client(path.as_path().to_str().unwrap())
.unwrap();
barrier.wait();
daemon.wait().unwrap_err();
daemon.wait().unwrap();
thread.join().unwrap();
}
}