event_manager/
endpoint.rs

1// Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2// SPDX-License-Identifier: Apache-2.0 OR BSD-3-Clause
3
//! A manager remote endpoint allows the user to interact with the `EventManager` (as a
//! `SubscriberOps` trait object) from a different thread of execution.
6//!
7//! This is particularly useful when the `EventManager` owns (via `EventManager::add_subscriber`)
8//! a subscriber object the user needs to work with (via `EventManager::subscriber_mut`), but the
9//! `EventManager` being on a different thread requires synchronized handles.
10//!
11//! Until more sophisticated methods are explored (for example making the `EventManager` offer
12//! interior mutability using something like an RCU mechanism), the current approach relies on
13//! passing boxed closures to the manager and getting back a boxed result. The manager is notified
14//! about incoming invocation requests via an `EventFd` which is added to the epoll event set.
15//! The signature of the closures as they are received is the `FnOnceBox` type alias defined
16//! below. The actual return type is opaque to the manager, but known to the initiator. The manager
17//! runs each closure to completion, and then returns the boxed result using a sender object that
18//! is part of the initial message that also included the closure.
19
20use std::any::Any;
21use std::os::unix::io::{AsRawFd, RawFd};
22use std::result;
23use std::sync::mpsc::{channel, Receiver, Sender};
24use std::sync::Arc;
25
26use vmm_sys_util::eventfd::{EventFd, EFD_NONBLOCK};
27
28use super::{Errno, Error, MutEventSubscriber, Result, SubscriberOps};
29
// Closures executed by the manager can return arbitrary types, but the messages carrying them
// (the `FnMsg` defined below) need a single concrete type definition. To get one, the return
// value is type-erased by placing it into a `Box<dyn Any + Send>`. The initiator of the remote
// call knows the actual type and recovers it afterwards (more details in the implementation of
// `RemoteEndpoint::call_blocking` below). The `Send` bound is required to send the boxed result
// back over the return channel.
type ErasedResult = Box<dyn Any + Send>;
37
38// Type alias for the boxed closures received by the manager. The `Send` bound at the end applies
39// to the type of the closure, and is required to send the box over the channel.
40type FnOnceBox<S> = Box<dyn FnOnce(&mut dyn SubscriberOps<Subscriber = S>) -> ErasedResult + Send>;
41
42// The type of the messages received by the manager over its receive mpsc channel.
43pub(crate) struct FnMsg<S> {
44    // The closure to execute.
45    pub(crate) fnbox: FnOnceBox<S>,
46    // The sending endpoint of the channel used by the remote called to wait for the result.
47    pub(crate) sender: Option<Sender<ErasedResult>>,
48}
49
50// Used by the `EventManager` to keep state associated with the channel.
51#[derive(Debug)]
52pub(crate) struct EventManagerChannel<S> {
53    // A clone of this is given to every `RemoteEndpoint` and used to signal the presence of
54    // an new message on the channel.
55    pub(crate) event_fd: Arc<EventFd>,
56    // A clone of this sender is given to every `RemoteEndpoint` and used to send `FnMsg` objects
57    // to the `EventManager` over the channel.
58    pub(crate) sender: Sender<FnMsg<S>>,
59    // The receiving half of the channel, used to receive incoming `FnMsg` objects.
60    pub(crate) receiver: Receiver<FnMsg<S>>,
61}
62
63impl<S> EventManagerChannel<S> {
64    pub(crate) fn new() -> Result<Self> {
65        let (sender, receiver) = channel();
66        Ok(EventManagerChannel {
67            event_fd: Arc::new(
68                EventFd::new(EFD_NONBLOCK).map_err(|e| Error::EventFd(Errno::from(e)))?,
69            ),
70            sender,
71            receiver,
72        })
73    }
74
75    pub(crate) fn fd(&self) -> RawFd {
76        self.event_fd.as_raw_fd()
77    }
78
79    pub(crate) fn remote_endpoint(&self) -> RemoteEndpoint<S> {
80        RemoteEndpoint {
81            msg_sender: self.sender.clone(),
82            event_fd: self.event_fd.clone(),
83        }
84    }
85}
86
87/// Enables interactions with an `EventManager` that runs on a different thread of execution.
88#[derive(Debug)]
89pub struct RemoteEndpoint<S> {
90    // A sender associated with `EventManager` channel requests are sent over.
91    msg_sender: Sender<FnMsg<S>>,
92    // Used to notify the `EventManager` about the arrival of a new request.
93    event_fd: Arc<EventFd>,
94}
95
96impl<S> Clone for RemoteEndpoint<S> {
97    fn clone(&self) -> Self {
98        RemoteEndpoint {
99            msg_sender: self.msg_sender.clone(),
100            event_fd: self.event_fd.clone(),
101        }
102    }
103}
104
105impl<S: MutEventSubscriber> RemoteEndpoint<S> {
106    // Send a message to the remote EventManger and raise a notification.
107    fn send(&self, msg: FnMsg<S>) -> Result<()> {
108        self.msg_sender.send(msg).map_err(|_| Error::ChannelSend)?;
109        self.event_fd
110            .write(1)
111            .map_err(|e| Error::EventFd(Errno::from(e)))?;
112        Ok(())
113    }
114
115    /// Call the specified closure on the associated remote `EventManager` (provided as a
116    /// `SubscriberOps` trait object), and return the result. This method blocks until the result
117    /// is received, and calling it from the same thread where the event loop runs leads to
118    /// a deadlock.
119    pub fn call_blocking<F, O, E>(&self, f: F) -> result::Result<O, E>
120    where
121        F: FnOnce(&mut dyn SubscriberOps<Subscriber = S>) -> result::Result<O, E> + Send + 'static,
122        O: Send + 'static,
123        E: From<Error> + Send + 'static,
124    {
125        // Create a temporary channel used to get back the result. We keep the receiving end,
126        // and put the sending end into the message we pass to the remote `EventManager`.
127        let (sender, receiver) = channel();
128
129        // We erase the return type of `f` by moving and calling it inside another closure which
130        // hides the result as an `ErasedResult`. This allows using the same channel to send
131        // closures with different signatures (and thus different types) to the remote
132        // `EventManager`.
133        let fnbox = Box::new(
134            move |ops: &mut dyn SubscriberOps<Subscriber = S>| -> ErasedResult { Box::new(f(ops)) },
135        );
136
137        // Send the message requesting the closure invocation.
138        self.send(FnMsg {
139            fnbox,
140            sender: Some(sender),
141        })?;
142
143        // Block until a response is received. We can use unwrap because the downcast cannot fail,
144        // since the signature of F (more specifically, the return value) constrains the concrete
145        // type that's in the box.
146        let result_box = receiver
147            .recv()
148            .map_err(|_| Error::ChannelRecv)?
149            .downcast()
150            .unwrap();
151
152        // Turns out the dereference operator has a special behaviour for boxed objects; if we
153        // own a `b: Box<T>` and call `*b`, the box goes away and we get the `T` inside.
154        *result_box
155    }
156
157    /// Call the specified closure on the associated local/remote `EventManager` (provided as a
158    /// `SubscriberOps` trait object), and discard the result. This method only fires
159    /// the request but does not wait for result, so it may be called from the same thread where
160    /// the event loop runs.
161    pub fn fire<F>(&self, f: F) -> Result<()>
162    where
163        F: FnOnce(&mut dyn SubscriberOps<Subscriber = S>) + Send + 'static,
164    {
165        // We erase the return type of `f` by moving and calling it inside another closure which
166        // hides the result as an `ErasedResult`. This allows using the same channel send closures
167        // with different signatures (and thus different types) to the remote `EventManager`.
168        let fnbox = Box::new(
169            move |ops: &mut dyn SubscriberOps<Subscriber = S>| -> ErasedResult {
170                f(ops);
171                Box::new(())
172            },
173        );
174
175        // Send the message requesting the closure invocation.
176        self.send(FnMsg {
177            fnbox,
178            sender: None,
179        })
180    }
181
182    /// Kick the worker thread to wake up from the epoll event loop.
183    pub fn kick(&self) -> Result<()> {
184        self.event_fd
185            .write(1)
186            .map(|_| ())
187            .map_err(|e| Error::EventFd(Errno::from(e)))
188    }
189}