vhost-user-backend 0.22.0

A framework to build vhost-user backend service daemon
// Copyright 2019 Intel Corporation. All Rights Reserved.
// Copyright 2019-2021 Alibaba Cloud Computing. All rights reserved.
//
// SPDX-License-Identifier: Apache-2.0

//! A simple framework to run a vhost-user backend service.

#[macro_use]
extern crate log;

use std::fmt::{Display, Formatter};
use std::path::Path;
use std::sync::{Arc, Mutex};
use std::thread;

use vhost::vhost_user::{BackendListener, BackendReqHandler, Error as VhostUserError, Listener};
use vm_memory::mmap::NewBitmap;
use vm_memory::{GuestMemoryAtomic, GuestMemoryMmap};

use self::handler::VhostUserHandler;

mod backend;
pub use self::backend::{VhostUserBackend, VhostUserBackendMut};

mod event_loop;
pub use self::event_loop::VringEpollHandler;

mod handler;
pub use self::handler::VhostUserHandlerError;

pub mod bitmap;
use crate::bitmap::BitmapReplace;

mod vring;
pub use self::vring::{
    VringMutex, VringRwLock, VringState, VringStateGuard, VringStateMutGuard, VringT,
};

// Due to the way `xen` handles memory mappings we can not combine it with
// `postcopy` feature which relies on persistent memory mappings. Thus we
// disallow enabling both features at the same time.
#[cfg(all(
    not(RUSTDOC_disable_feature_compat_errors),
    not(doc),
    feature = "postcopy",
    feature = "xen"
))]
compile_error!("Both `postcopy` and `xen` features can not be enabled at the same time.");

/// An alias for `GuestMemoryAtomic<GuestMemoryMmap<B>>` to simplify code.
type GM<B> = GuestMemoryAtomic<GuestMemoryMmap<B>>;

#[derive(Debug)]
/// Errors related to vhost-user daemon.
pub enum Error {
    /// Failed to create a new vhost-user handler.
    NewVhostUserHandler(VhostUserHandlerError),
    /// Failed creating vhost-user backend listener.
    CreateBackendListener(VhostUserError),
    /// Failed creating vhost-user backend handler.
    CreateBackendReqHandler(VhostUserError),
    /// Failed creating listener socket
    CreateVhostUserListener(VhostUserError),
    /// Failed starting daemon thread.
    StartDaemon(std::io::Error),
    /// Failed waiting for daemon thread.
    WaitDaemon(std::boxed::Box<dyn std::any::Any + std::marker::Send>),
    /// Failed handling a vhost-user request.
    HandleRequest(VhostUserError),
}

impl Display for Error {
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
        match self {
            Error::NewVhostUserHandler(e) => write!(f, "cannot create vhost user handler: {e}"),
            Error::CreateBackendListener(e) => write!(f, "cannot create backend listener: {e}"),
            Error::CreateBackendReqHandler(e) => {
                write!(f, "cannot create backend req handler: {e}")
            }
            Error::CreateVhostUserListener(e) => {
                write!(f, "cannot create vhost-user listener: {e}")
            }
            Error::StartDaemon(e) => write!(f, "failed to start daemon: {e}"),
            Error::WaitDaemon(_e) => write!(f, "failed to wait for daemon exit"),
            Error::HandleRequest(e) => write!(f, "failed to handle request: {e}"),
        }
    }
}

/// Result of vhost-user daemon operations.
pub type Result<T> = std::result::Result<T, Error>;

/// Implement a simple framework to run a vhost-user service daemon.
///
/// This structure is the public API the backend is allowed to interact with in order to run
/// a fully functional vhost-user daemon.
pub struct VhostUserDaemon<T: VhostUserBackend> {
    name: String,
    handler: Arc<Mutex<VhostUserHandler<T>>>,
    main_thread: Option<thread::JoinHandle<Result<()>>>,
}

impl<T> VhostUserDaemon<T>
where
    T: VhostUserBackend + Clone + 'static,
    T::Bitmap: BitmapReplace + NewBitmap + Clone + Send + Sync,
    T::Vring: Clone + Send + Sync,
{
    /// Create the daemon instance, providing the backend implementation of `VhostUserBackend`.
    ///
    /// Under the hood, this will start a dedicated thread responsible for listening onto
    /// registered event. Those events can be vring events or custom events from the backend,
    /// but they get to be registered later during the sequence.
    pub fn new(
        name: String,
        backend: T,
        atomic_mem: GuestMemoryAtomic<GuestMemoryMmap<T::Bitmap>>,
    ) -> Result<Self> {
        let handler = Arc::new(Mutex::new(
            VhostUserHandler::new(backend, atomic_mem).map_err(Error::NewVhostUserHandler)?,
        ));

        Ok(VhostUserDaemon {
            name,
            handler,
            main_thread: None,
        })
    }

    /// Run a dedicated thread handling all requests coming through the socket.
    /// This runs in an infinite loop that should be terminating once the other
    /// end of the socket (the VMM) hangs up.
    ///
    /// This function is the common code for starting a new daemon, no matter if
    /// it acts as a client or a server.
    fn start_daemon(
        &mut self,
        mut handler: BackendReqHandler<Mutex<VhostUserHandler<T>>>,
    ) -> Result<()> {
        let handle = thread::Builder::new()
            .name(self.name.clone())
            .spawn(move || loop {
                handler.handle_request().map_err(Error::HandleRequest)?;
            })
            .map_err(Error::StartDaemon)?;

        self.main_thread = Some(handle);

        Ok(())
    }

    /// Connect to the vhost-user socket and run a dedicated thread handling
    /// all requests coming through this socket. This runs in an infinite loop
    /// that should be terminating once the other end of the socket (the VMM)
    /// hangs up.
    pub fn start_client(&mut self, socket_path: &str) -> Result<()> {
        let backend_handler = BackendReqHandler::connect(socket_path, self.handler.clone())
            .map_err(Error::CreateBackendReqHandler)?;
        self.start_daemon(backend_handler)
    }

    /// Listen to the vhost-user socket and run a dedicated thread handling all requests coming
    /// through this socket.
    ///
    /// This runs in an infinite loop that should be terminating once the other end of the socket
    /// (the VMM) disconnects.
    ///
    /// *Note:* A convenience function [VhostUserDaemon::serve] exists that
    /// may be a better option than this for simple use-cases.
    pub fn start(&mut self, listener: &mut Listener) -> Result<()> {
        let mut backend_listener = BackendListener::new(listener, self.handler.clone())
            .map_err(Error::CreateBackendListener)?;
        let backend_handler = self.accept(&mut backend_listener)?;
        self.start_daemon(backend_handler)
    }

    fn accept(
        &self,
        backend_listener: &mut BackendListener<Mutex<VhostUserHandler<T>>>,
    ) -> Result<BackendReqHandler<Mutex<VhostUserHandler<T>>>> {
        loop {
            match backend_listener.accept() {
                Err(e) => return Err(Error::CreateBackendListener(e)),
                Ok(Some(v)) => return Ok(v),
                Ok(None) => continue,
            }
        }
    }

    /// Wait for the thread handling the vhost-user socket connection to terminate.
    ///
    /// *Note:* A convenience function [VhostUserDaemon::serve] exists that
    /// may be a better option than this for simple use-cases.
    pub fn wait(&mut self) -> Result<()> {
        if let Some(handle) = self.main_thread.take() {
            match handle.join().map_err(Error::WaitDaemon)? {
                Ok(()) => Ok(()),
                Err(Error::HandleRequest(VhostUserError::SocketBroken(_))) => Ok(()),
                Err(e) => Err(e),
            }
        } else {
            Ok(())
        }
    }

    /// Bind to socket, handle a single connection and shutdown
    ///
    /// This is a convenience function that provides an easy way to handle the
    /// following actions without needing to call the low-level functions:
    /// - Create a listener
    /// - Start listening
    /// - Handle a single event
    /// - Send the exit event to all handler threads
    ///
    /// Internal `Err` results that indicate a device disconnect will be treated
    /// as success and `Ok(())` will be returned in those cases.
    ///
    /// *Note:* See [VhostUserDaemon::start] and [VhostUserDaemon::wait] if you
    /// need more flexibility.
    pub fn serve<P: AsRef<Path>>(&mut self, socket: P) -> Result<()> {
        let mut listener = Listener::new(socket, true).map_err(Error::CreateVhostUserListener)?;

        self.start(&mut listener)?;
        let result = self.wait();

        // Regardless of the result, we want to signal worker threads to exit
        self.handler.lock().unwrap().send_exit_event();

        // For this convenience function we are not treating certain "expected"
        // outcomes as error. Disconnects and partial messages can be usual
        // behaviour seen from quitting guests.
        match &result {
            Err(e) => match e {
                Error::HandleRequest(VhostUserError::Disconnected) => Ok(()),
                Error::HandleRequest(VhostUserError::PartialMessage) => Ok(()),
                _ => result,
            },
            _ => result,
        }
    }

    /// Retrieve the vring epoll handler.
    ///
    /// This is necessary to perform further actions like registering and unregistering some extra
    /// event file descriptors.
    pub fn get_epoll_handlers(&self) -> Vec<Arc<VringEpollHandler<T>>> {
        // Do not expect poisoned lock.
        self.handler.lock().unwrap().get_epoll_handlers()
    }
}

#[cfg(test)]
mod tests {
    use super::backend::tests::MockVhostBackend;
    use super::*;
    use libc::EAGAIN;
    use std::os::unix::net::{UnixListener, UnixStream};
    use std::sync::Barrier;
    use std::time::Duration;
    use vm_memory::{GuestAddress, GuestMemoryAtomic, GuestMemoryMmap};

    #[test]
    fn test_new_daemon() {
        let mem = GuestMemoryAtomic::new(
            GuestMemoryMmap::<()>::from_ranges(&[(GuestAddress(0x100000), 0x10000)]).unwrap(),
        );
        let backend = Arc::new(Mutex::new(MockVhostBackend::new()));
        let mut daemon = VhostUserDaemon::new("test".to_owned(), backend, mem).unwrap();

        let handlers = daemon.get_epoll_handlers();
        assert_eq!(handlers.len(), 2);

        let barrier = Arc::new(Barrier::new(2));
        let tmpdir = tempfile::tempdir().unwrap();
        let path = tmpdir.path().join("socket");

        thread::scope(|s| {
            s.spawn(|| {
                barrier.wait();
                let socket = UnixStream::connect(&path).unwrap();
                barrier.wait();
                drop(socket)
            });

            let mut listener = Listener::new(&path, false).unwrap();
            barrier.wait();
            daemon.start(&mut listener).unwrap();
            barrier.wait();
            // Above process generates a `HandleRequest(PartialMessage)` error.
            daemon.wait().unwrap_err();
            daemon.wait().unwrap();
        });
    }

    #[test]
    fn test_new_daemon_client() {
        let mem = GuestMemoryAtomic::new(
            GuestMemoryMmap::<()>::from_ranges(&[(GuestAddress(0x100000), 0x10000)]).unwrap(),
        );
        let backend = Arc::new(Mutex::new(MockVhostBackend::new()));
        let mut daemon = VhostUserDaemon::new("test".to_owned(), backend, mem).unwrap();

        let handlers = daemon.get_epoll_handlers();
        assert_eq!(handlers.len(), 2);

        let barrier = Arc::new(Barrier::new(2));
        let tmpdir = tempfile::tempdir().unwrap();
        let path = tmpdir.path().join("socket");

        thread::scope(|s| {
            s.spawn(|| {
                let listener = UnixListener::bind(&path).unwrap();
                barrier.wait();
                let (stream, _) = listener.accept().unwrap();
                barrier.wait();
                drop(stream)
            });

            barrier.wait();
            daemon
                .start_client(path.as_path().to_str().unwrap())
                .unwrap();
            barrier.wait();
            // Above process generates a `HandleRequest(PartialMessage)` error.
            daemon.wait().unwrap_err();
            daemon.wait().unwrap();
        });
    }

    #[test]
    fn test_daemon_serve() {
        let mem = GuestMemoryAtomic::new(
            GuestMemoryMmap::<()>::from_ranges(&[(GuestAddress(0x100000), 0x10000)]).unwrap(),
        );
        let backend = Arc::new(Mutex::new(MockVhostBackend::new()));
        let mut daemon = VhostUserDaemon::new("test".to_owned(), backend.clone(), mem).unwrap();
        let tmpdir = tempfile::tempdir().unwrap();
        let socket_path = tmpdir.path().join("socket");

        thread::scope(|s| {
            s.spawn(|| {
                let _ = daemon.serve(&socket_path);
            });

            // We have no way to wait for when the server becomes available...
            // So we will have to spin!
            while !socket_path.exists() {
                thread::sleep(Duration::from_millis(10));
            }

            // Check that no exit events got triggered yet
            for thread_id in 0..backend.queues_per_thread().len() {
                let fd = backend.exit_event(thread_id).unwrap();
                // Reading from exit fd should fail since nothing was written yet
                assert_eq!(
                    fd.0.consume().unwrap_err().raw_os_error().unwrap(),
                    EAGAIN,
                    "exit event should not have been raised yet!"
                );
            }

            let socket = UnixStream::connect(&socket_path).unwrap();
            // disconnect immediately again
            drop(socket);
        });

        // Check that exit events got triggered
        let backend = backend.lock().unwrap();
        for thread_id in 0..backend.queues_per_thread().len() {
            let fd = backend.exit_event(thread_id).unwrap();
            assert!(fd.0.consume().is_ok(), "No exit event was raised!");
        }
    }
}