event_manager/endpoint.rs
1// Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2// SPDX-License-Identifier: Apache-2.0 OR BSD-3-Clause
3
//! A manager remote endpoint allows the user to interact with the `EventManager` (as a
//! `SubscriberOps` trait object) from a different thread of execution.
6//!
7//! This is particularly useful when the `EventManager` owns (via `EventManager::add_subscriber`)
8//! a subscriber object the user needs to work with (via `EventManager::subscriber_mut`), but the
9//! `EventManager` being on a different thread requires synchronized handles.
10//!
11//! Until more sophisticated methods are explored (for example making the `EventManager` offer
12//! interior mutability using something like an RCU mechanism), the current approach relies on
13//! passing boxed closures to the manager and getting back a boxed result. The manager is notified
14//! about incoming invocation requests via an `EventFd` which is added to the epoll event set.
15//! The signature of the closures as they are received is the `FnOnceBox` type alias defined
16//! below. The actual return type is opaque to the manager, but known to the initiator. The manager
17//! runs each closure to completion, and then returns the boxed result using a sender object that
18//! is part of the initial message that also included the closure.
19
20use std::any::Any;
21use std::os::unix::io::{AsRawFd, RawFd};
22use std::result;
23use std::sync::mpsc::{channel, Receiver, Sender};
24use std::sync::Arc;
25
26use vmm_sys_util::eventfd::{EventFd, EFD_NONBLOCK};
27
28use super::{Errno, Error, MutEventSubscriber, Result, SubscriberOps};
29
// The return type of the closure received by the manager is erased (by placing it into a
// `Box<dyn Any + Send>`) in order to have a single concrete type definition for the messages
// that contain closures to be executed (the `FnMsg` defined below). The actual return type is
// recovered by the initiator of the remote call (more details in the implementation of
// `RemoteEndpoint::call_blocking` below). The `Send` bound is required to send back the boxed
// result over the return channel.
type ErasedResult = Box<dyn Any + Send>;
37
// Type alias for the boxed closures received by the manager. Each closure takes a
// `SubscriberOps` trait object and returns its result as an `ErasedResult`. The `Send` bound at
// the end applies to the type of the closure, and is required to send the box over the channel.
type FnOnceBox<S> = Box<dyn FnOnce(&mut dyn SubscriberOps<Subscriber = S>) -> ErasedResult + Send>;
41
// The type of the messages received by the manager over its receive mpsc channel.
pub(crate) struct FnMsg<S> {
    // The closure to execute.
    pub(crate) fnbox: FnOnceBox<S>,
    // The sending endpoint of the channel used by the remote caller to wait for the result.
    // It is `None` when the caller does not wait for a result (see `RemoteEndpoint::fire`).
    pub(crate) sender: Option<Sender<ErasedResult>>,
}
49
// Used by the `EventManager` to keep state associated with the channel.
#[derive(Debug)]
pub(crate) struct EventManagerChannel<S> {
    // A clone of this is given to every `RemoteEndpoint` and used to signal the presence of
    // a new message on the channel.
    pub(crate) event_fd: Arc<EventFd>,
    // A clone of this sender is given to every `RemoteEndpoint` and used to send `FnMsg` objects
    // to the `EventManager` over the channel.
    pub(crate) sender: Sender<FnMsg<S>>,
    // The receiving half of the channel, used to receive incoming `FnMsg` objects.
    pub(crate) receiver: Receiver<FnMsg<S>>,
}
62
63impl<S> EventManagerChannel<S> {
64 pub(crate) fn new() -> Result<Self> {
65 let (sender, receiver) = channel();
66 Ok(EventManagerChannel {
67 event_fd: Arc::new(
68 EventFd::new(EFD_NONBLOCK).map_err(|e| Error::EventFd(Errno::from(e)))?,
69 ),
70 sender,
71 receiver,
72 })
73 }
74
75 pub(crate) fn fd(&self) -> RawFd {
76 self.event_fd.as_raw_fd()
77 }
78
79 pub(crate) fn remote_endpoint(&self) -> RemoteEndpoint<S> {
80 RemoteEndpoint {
81 msg_sender: self.sender.clone(),
82 event_fd: self.event_fd.clone(),
83 }
84 }
85}
86
/// Enables interactions with an `EventManager` that runs on a different thread of execution.
#[derive(Debug)]
pub struct RemoteEndpoint<S> {
    // The sending half of the `EventManager` channel; requests are sent over it.
    msg_sender: Sender<FnMsg<S>>,
    // Used to notify the `EventManager` about the arrival of a new request.
    event_fd: Arc<EventFd>,
}
95
96impl<S> Clone for RemoteEndpoint<S> {
97 fn clone(&self) -> Self {
98 RemoteEndpoint {
99 msg_sender: self.msg_sender.clone(),
100 event_fd: self.event_fd.clone(),
101 }
102 }
103}
104
105impl<S: MutEventSubscriber> RemoteEndpoint<S> {
106 // Send a message to the remote EventManger and raise a notification.
107 fn send(&self, msg: FnMsg<S>) -> Result<()> {
108 self.msg_sender.send(msg).map_err(|_| Error::ChannelSend)?;
109 self.event_fd
110 .write(1)
111 .map_err(|e| Error::EventFd(Errno::from(e)))?;
112 Ok(())
113 }
114
115 /// Call the specified closure on the associated remote `EventManager` (provided as a
116 /// `SubscriberOps` trait object), and return the result. This method blocks until the result
117 /// is received, and calling it from the same thread where the event loop runs leads to
118 /// a deadlock.
119 pub fn call_blocking<F, O, E>(&self, f: F) -> result::Result<O, E>
120 where
121 F: FnOnce(&mut dyn SubscriberOps<Subscriber = S>) -> result::Result<O, E> + Send + 'static,
122 O: Send + 'static,
123 E: From<Error> + Send + 'static,
124 {
125 // Create a temporary channel used to get back the result. We keep the receiving end,
126 // and put the sending end into the message we pass to the remote `EventManager`.
127 let (sender, receiver) = channel();
128
129 // We erase the return type of `f` by moving and calling it inside another closure which
130 // hides the result as an `ErasedResult`. This allows using the same channel to send
131 // closures with different signatures (and thus different types) to the remote
132 // `EventManager`.
133 let fnbox = Box::new(
134 move |ops: &mut dyn SubscriberOps<Subscriber = S>| -> ErasedResult { Box::new(f(ops)) },
135 );
136
137 // Send the message requesting the closure invocation.
138 self.send(FnMsg {
139 fnbox,
140 sender: Some(sender),
141 })?;
142
143 // Block until a response is received. We can use unwrap because the downcast cannot fail,
144 // since the signature of F (more specifically, the return value) constrains the concrete
145 // type that's in the box.
146 let result_box = receiver
147 .recv()
148 .map_err(|_| Error::ChannelRecv)?
149 .downcast()
150 .unwrap();
151
152 // Turns out the dereference operator has a special behaviour for boxed objects; if we
153 // own a `b: Box<T>` and call `*b`, the box goes away and we get the `T` inside.
154 *result_box
155 }
156
157 /// Call the specified closure on the associated local/remote `EventManager` (provided as a
158 /// `SubscriberOps` trait object), and discard the result. This method only fires
159 /// the request but does not wait for result, so it may be called from the same thread where
160 /// the event loop runs.
161 pub fn fire<F>(&self, f: F) -> Result<()>
162 where
163 F: FnOnce(&mut dyn SubscriberOps<Subscriber = S>) + Send + 'static,
164 {
165 // We erase the return type of `f` by moving and calling it inside another closure which
166 // hides the result as an `ErasedResult`. This allows using the same channel send closures
167 // with different signatures (and thus different types) to the remote `EventManager`.
168 let fnbox = Box::new(
169 move |ops: &mut dyn SubscriberOps<Subscriber = S>| -> ErasedResult {
170 f(ops);
171 Box::new(())
172 },
173 );
174
175 // Send the message requesting the closure invocation.
176 self.send(FnMsg {
177 fnbox,
178 sender: None,
179 })
180 }
181
182 /// Kick the worker thread to wake up from the epoll event loop.
183 pub fn kick(&self) -> Result<()> {
184 self.event_fd
185 .write(1)
186 .map(|_| ())
187 .map_err(|e| Error::EventFd(Errno::from(e)))
188 }
189}