use std::any::Any;
use std::os::unix::io::{AsRawFd, RawFd};
use std::result;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Arc;
use vmm_sys_util::eventfd::{EventFd, EFD_NONBLOCK};
use super::{Errno, Error, MutEventSubscriber, Result, SubscriberOps};
// Type-erased, heap-allocated value that can cross threads. `call_blocking` downcasts it back
// to the concrete return type of the closure it submitted.
type ErasedResult = Box<dyn Any + Send>;
// Boxed one-shot closure that receives a `SubscriberOps` trait object and yields an
// `ErasedResult`. The `Send` bound lets it be moved through an `mpsc` channel.
type FnOnceBox<S> = Box<dyn FnOnce(&mut dyn SubscriberOps<Subscriber = S>) -> ErasedResult + Send>;
// Message carried over the channel: a closure to run plus an optional reply path.
pub(crate) struct FnMsg<S> {
// Closure to invoke with a `SubscriberOps` trait object.
pub(crate) fnbox: FnOnceBox<S>,
// Reply path for the erased result. `call_blocking` sets this to `Some` and waits on the
// matching receiver; `fire` sets it to `None` because it does not wait for a result.
pub(crate) sender: Option<Sender<ErasedResult>>,
}
// Bundles an eventfd with both halves of an `mpsc` channel of `FnMsg` values.
// NOTE(review): presumably owned by the event manager, which watches `event_fd` and drains
// `receiver`; the consuming code is not visible in this section — confirm.
#[derive(Debug)]
pub(crate) struct EventManagerChannel<S> {
// Eventfd that remote endpoints write to after queuing a message; shared through an `Arc`.
pub(crate) event_fd: Arc<EventFd>,
// Sending half of the message channel; `remote_endpoint` clones it into each `RemoteEndpoint`.
pub(crate) sender: Sender<FnMsg<S>>,
// Receiving half of the message channel.
pub(crate) receiver: Receiver<FnMsg<S>>,
}
impl<S> EventManagerChannel<S> {
pub(crate) fn new() -> Result<Self> {
let (sender, receiver) = channel();
Ok(EventManagerChannel {
event_fd: Arc::new(
EventFd::new(EFD_NONBLOCK).map_err(|e| Error::EventFd(Errno::from(e)))?,
),
sender,
receiver,
})
}
pub(crate) fn fd(&self) -> RawFd {
self.event_fd.as_raw_fd()
}
pub(crate) fn remote_endpoint(&self) -> RemoteEndpoint<S> {
RemoteEndpoint {
msg_sender: self.sender.clone(),
event_fd: self.event_fd.clone(),
}
}
}
/// Cloneable handle that queues closures on an `EventManagerChannel` and signals its eventfd.
///
/// Instances are produced by `EventManagerChannel::remote_endpoint`.
#[derive(Debug)]
pub struct RemoteEndpoint<S> {
// Sending half of the channel on which `FnMsg` values are queued.
msg_sender: Sender<FnMsg<S>>,
// Eventfd shared with the channel; written to after a message is queued, or by `kick`.
event_fd: Arc<EventFd>,
}
impl<S> Clone for RemoteEndpoint<S> {
fn clone(&self) -> Self {
RemoteEndpoint {
msg_sender: self.msg_sender.clone(),
event_fd: self.event_fd.clone(),
}
}
}
impl<S: MutEventSubscriber> RemoteEndpoint<S> {
fn send(&self, msg: FnMsg<S>) -> Result<()> {
self.msg_sender.send(msg).map_err(|_| Error::ChannelSend)?;
self.event_fd
.write(1)
.map_err(|e| Error::EventFd(Errno::from(e)))?;
Ok(())
}
pub fn call_blocking<F, O, E>(&self, f: F) -> result::Result<O, E>
where
F: FnOnce(&mut dyn SubscriberOps<Subscriber = S>) -> result::Result<O, E> + Send + 'static,
O: Send + 'static,
E: From<Error> + Send + 'static,
{
let (sender, receiver) = channel();
let fnbox = Box::new(
move |ops: &mut dyn SubscriberOps<Subscriber = S>| -> ErasedResult { Box::new(f(ops)) },
);
self.send(FnMsg {
fnbox,
sender: Some(sender),
})?;
let result_box = receiver
.recv()
.map_err(|_| Error::ChannelRecv)?
.downcast()
.unwrap();
*result_box
}
pub fn fire<F>(&self, f: F) -> Result<()>
where
F: FnOnce(&mut dyn SubscriberOps<Subscriber = S>) + Send + 'static,
{
let fnbox = Box::new(
move |ops: &mut dyn SubscriberOps<Subscriber = S>| -> ErasedResult {
f(ops);
Box::new(())
},
);
self.send(FnMsg {
fnbox,
sender: None,
})
}
pub fn kick(&self) -> Result<()> {
self.event_fd
.write(1)
.map(|_| ())
.map_err(|e| Error::EventFd(Errno::from(e)))
}
}