ipc_channel/router.rs
1// Copyright 2015 The Servo Project Developers. See the COPYRIGHT
2// file at the top-level directory of this distribution.
3//
4// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
5// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
6// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
7// option. This file may not be copied, modified, or distributed
8// except according to those terms.
9
//! Routers allow converting IPC channels to crossbeam channels.
//! The [RouterProxy] provides various methods to register
//! `IpcReceiver<T>`s. The router will then either call the appropriate callback or route the
//! message to a crossbeam `Sender<T>` or `Receiver<T>`. You should use the global `ROUTER` to
//! access the `RouterProxy` methods (via `ROUTER`'s `Deref` to `RouterProxy`).
15
16use std::collections::HashMap;
17use std::sync::{LazyLock, Mutex};
18use std::thread::{self, JoinHandle};
19
20use crate::error::SerDeError;
21use crate::ipc::OpaqueIpcReceiver;
22use crate::ipc::{self, IpcMessage, IpcReceiver, IpcReceiverSet, IpcSelectionResult, IpcSender};
23use crossbeam_channel::{self, Receiver, Sender};
24use serde_core::{Deserialize, Serialize};
25
/// Global object wrapping a `RouterProxy`.
/// Add routes ([add_typed_route](RouterProxy::add_typed_route)), or convert `IpcReceiver<T>`
/// to crossbeam channels (e.g. [route_ipc_receiver_to_new_crossbeam_receiver](RouterProxy::route_ipc_receiver_to_new_crossbeam_receiver)).
///
/// The proxy is created lazily: [RouterProxy::new] (which spawns the router thread) runs
/// on the first access through the `LazyLock`.
pub static ROUTER: LazyLock<RouterProxy> = LazyLock::new(RouterProxy::new);
30
/// A `RouterProxy` provides methods for talking to the router. Calling
/// [new](RouterProxy::new) automatically spins up a router thread which
/// waits for events on its registered `IpcReceiver<T>`s. The `RouterProxy`'s
/// methods communicate with the running router thread to register new
/// `IpcReceiver<T>`s.
pub struct RouterProxy {
    /// Sending channel ends, shutdown flag and router-thread join handle. Every proxy
    /// method locks this mutex before touching them.
    comm: Mutex<RouterProxyComm>,
}
39
40impl Drop for RouterProxy {
41 fn drop(&mut self) {
42 self.shutdown();
43 }
44}
45
46#[allow(clippy::new_without_default)]
47impl RouterProxy {
48 pub fn new() -> RouterProxy {
49 // Router acts like a receiver, running in its own thread with both
50 // receiver ends.
51 // Router proxy takes both sending ends.
52 let (msg_sender, msg_receiver) = crossbeam_channel::unbounded();
53 let (wakeup_sender, wakeup_receiver) = ipc::channel().unwrap();
54 let handle = thread::Builder::new()
55 .name("router-proxy".to_string())
56 .spawn(move || Router::new(msg_receiver, wakeup_receiver).run())
57 .expect("Failed to spawn router proxy thread");
58 RouterProxy {
59 comm: Mutex::new(RouterProxyComm {
60 msg_sender,
61 wakeup_sender,
62 shutdown: false,
63 handle: Some(handle),
64 }),
65 }
66 }
67
68 /// Add a new (receiver, callback) pair to the router, and send a wakeup message
69 /// to the router.
70 ///
71 /// The `callback` is dropped when `receiver`'s channel disconnects.
72 fn add_route(&self, receiver: OpaqueIpcReceiver, callback: RouterHandler) {
73 let comm = self.comm.lock().unwrap();
74
75 if comm.shutdown {
76 return;
77 }
78
79 comm.msg_sender
80 .send(RouterMsg::AddRoute(receiver, callback))
81 .unwrap();
82 comm.wakeup_sender.send(()).unwrap();
83 }
84
85 /// Add a new `(receiver, callback)` pair to the router, and send a wakeup message
86 /// to the router.
87 ///
88 /// The `callback` is dropped when `receiver`'s channel disconnects.
89 pub fn add_typed_route<T>(
90 &self,
91 receiver: IpcReceiver<T>,
92 mut callback: TypedRouterMultiHandler<T>,
93 ) where
94 T: Serialize + for<'de> Deserialize<'de> + 'static,
95 {
96 // Before passing the message on to the callback, turn it into the appropriate type
97 let modified_callback = move |msg: IpcMessage| {
98 let typed_message = msg.to::<T>();
99 callback(typed_message)
100 };
101
102 self.add_route(
103 receiver.to_opaque(),
104 RouterHandler::Multi(Box::new(modified_callback)),
105 );
106 }
107
108 /// Add a new `(receiver, callback)` pair to the router, and send a wakeup message
109 /// to the router.
110 pub fn add_typed_one_shot_route<T>(
111 &self,
112 receiver: IpcReceiver<T>,
113 callback: TypedRouterOneShotHandler<T>,
114 ) where
115 T: Serialize + for<'de> Deserialize<'de> + 'static,
116 {
117 // Before passing the message on to the callback, turn it into the appropriate type
118 let modified_callback = move |msg: IpcMessage| {
119 let typed_message = msg.to::<T>();
120 callback(typed_message)
121 };
122
123 self.add_route(
124 receiver.to_opaque(),
125 RouterHandler::Once(Some(Box::new(modified_callback))),
126 );
127 }
128
129 /// Send a shutdown message to the router containing a ACK sender,
130 /// send a wakeup message to the router, and block on the ACK.
131 /// Calling it is idempotent,
132 /// which can be useful when running a multi-process system in single-process mode.
133 pub fn shutdown(&self) {
134 let mut comm = self.comm.lock().unwrap();
135
136 if comm.shutdown {
137 return;
138 }
139 comm.shutdown = true;
140
141 let (ack_sender, ack_receiver) = crossbeam_channel::unbounded();
142 comm.wakeup_sender
143 .send(())
144 .map(|_| {
145 comm.msg_sender
146 .send(RouterMsg::Shutdown(ack_sender))
147 .unwrap();
148 ack_receiver.recv().unwrap();
149 })
150 .unwrap();
151 comm.handle
152 .take()
153 .expect("Should have a join handle at shutdown")
154 .join()
155 .expect("Failed to join on the router proxy thread");
156 }
157
158 /// A convenience function to route an `IpcReceiver<T>` to an existing `Sender<T>`.
159 pub fn route_ipc_receiver_to_crossbeam_sender<T>(
160 &self,
161 ipc_receiver: IpcReceiver<T>,
162 crossbeam_sender: Sender<T>,
163 ) where
164 T: for<'de> Deserialize<'de> + Serialize + Send + 'static,
165 {
166 self.add_typed_route(
167 ipc_receiver,
168 Box::new(move |message| drop(crossbeam_sender.send(message.unwrap()))),
169 )
170 }
171
172 /// A convenience function to route an `IpcReceiver<T>` to a `Receiver<T>`: the most common
173 /// use of a `Router`.
174 pub fn route_ipc_receiver_to_new_crossbeam_receiver<T>(
175 &self,
176 ipc_receiver: IpcReceiver<T>,
177 ) -> Receiver<T>
178 where
179 T: for<'de> Deserialize<'de> + Serialize + Send + 'static,
180 {
181 let (crossbeam_sender, crossbeam_receiver) = crossbeam_channel::unbounded();
182 self.route_ipc_receiver_to_crossbeam_sender(ipc_receiver, crossbeam_sender);
183 crossbeam_receiver
184 }
185}
186
/// State the `RouterProxy` uses to communicate with the router thread.
struct RouterProxyComm {
    /// Sends `RouterMsg`s (new routes and shutdown requests) to the router thread.
    msg_sender: Sender<RouterMsg>,
    /// Sends an empty wakeup message so the router reads the next `RouterMsg`.
    wakeup_sender: IpcSender<()>,
    /// Set once shutdown has been requested; afterwards adding routes and further
    /// shutdown calls return without doing anything.
    shutdown: bool,
    /// Join handle of the router thread; taken (leaving `None`) when shutdown joins it.
    handle: Option<JoinHandle<()>>,
}
193
/// Router runs in its own thread listening for events. Adds receivers to its `IpcReceiverSet`
/// and listens for events using `select()`.
struct Router {
    /// Get messages from RouterProxy.
    msg_receiver: Receiver<RouterMsg>,
    /// The ID/index of the special wakeup channel: a message with this ID means a
    /// `RouterMsg` should be read from `msg_receiver`.
    msg_wakeup_id: u64,
    /// Set of all receivers which have been registered for us to select on.
    ipc_receiver_set: IpcReceiverSet,
    /// Maps receiver IDs (as returned by the receiver set) to their handler functions.
    handlers: HashMap<u64, RouterHandler>,
}
206
207impl Router {
208 fn new(msg_receiver: Receiver<RouterMsg>, wakeup_receiver: IpcReceiver<()>) -> Router {
209 let mut ipc_receiver_set = IpcReceiverSet::new().unwrap();
210 let msg_wakeup_id = ipc_receiver_set.add(wakeup_receiver).unwrap();
211 Router {
212 msg_receiver,
213 msg_wakeup_id,
214 ipc_receiver_set,
215 handlers: HashMap::new(),
216 }
217 }
218
219 /// Continuously loop waiting for wakeup signals from router proxy.
220 /// Iterate over events either:
221 /// 1) If a message comes in from our special `wakeup_receiver` (identified through
222 /// msg_wakeup_id. Read message from `msg_receiver` and add a new receiver
223 /// to our receiver set.
224 /// 2) Call appropriate handler based on message id.
225 /// 3) Remove handler once channel closes.
226 fn run(&mut self) {
227 loop {
228 // Wait for events to come from our select() new channels are added to
229 // our ReceiverSet below.
230 let results = match self.ipc_receiver_set.select() {
231 Ok(results) => results,
232 Err(_) => break,
233 };
234
235 // Iterate over numerous events that were ready at this time.
236 for result in results.into_iter() {
237 match result {
238 // Message came from the RouterProxy. Listen on our `msg_receiver`
239 // channel.
240 IpcSelectionResult::MessageReceived(id, _) if id == self.msg_wakeup_id => {
241 match self.msg_receiver.recv().unwrap() {
242 RouterMsg::AddRoute(receiver, handler) => {
243 let new_receiver_id =
244 self.ipc_receiver_set.add_opaque(receiver).unwrap();
245 self.handlers.insert(new_receiver_id, handler);
246 },
247 RouterMsg::Shutdown(sender) => {
248 sender
249 .send(())
250 .expect("Failed to send comfirmation of shutdown.");
251 return;
252 },
253 }
254 },
255 // Event from one of our registered receivers, call callback.
256 IpcSelectionResult::MessageReceived(id, message) => {
257 match self.handlers.get_mut(&id).unwrap() {
258 RouterHandler::Once(handler) => {
259 if let Some(handler) = handler.take() {
260 (handler)(message);
261 }
262 },
263 RouterHandler::Multi(ref mut handler) => {
264 (handler)(message);
265 },
266 }
267 },
268 IpcSelectionResult::ChannelClosed(id) => {
269 let _ = self.handlers.remove(&id).unwrap();
270 },
271 }
272 }
273 }
274 }
275}
276
/// Messages sent from the `RouterProxy` to the router thread.
enum RouterMsg {
    /// Register the receiver `OpaqueIpcReceiver` for listening for events on.
    /// When a message comes from this receiver, call `RouterHandler`.
    AddRoute(OpaqueIpcReceiver, RouterHandler),
    /// Shutdown the router, providing a sender to send an acknowledgement.
    Shutdown(Sender<()>),
}
284
/// Function to call each time a new message is received from the corresponding receiver.
pub type RouterMultiHandler = Box<dyn FnMut(IpcMessage) + Send>;

/// Function to call the first time that a message is received from the corresponding receiver.
/// Being `FnOnce`, it is consumed by that call.
pub type RouterOneShotHandler = Box<dyn FnOnce(IpcMessage) + Send>;
290
/// Handler registered for a receiver in the router.
enum RouterHandler {
    /// Handler called for the first message only; the `Option` becomes `None` once the
    /// handler has been taken and called.
    Once(Option<RouterOneShotHandler>),
    /// Handler called for every message.
    Multi(RouterMultiHandler),
}
295
/// Like [RouterMultiHandler] but includes the type that will be passed to the callback.
/// The callback receives `Err(SerDeError)` when the message could not be converted to `T`.
pub type TypedRouterMultiHandler<T> = Box<dyn FnMut(Result<T, SerDeError>) + Send>;

/// Like [RouterOneShotHandler] but includes the type that will be passed to the callback.
/// The callback receives `Err(SerDeError)` when the message could not be converted to `T`.
pub type TypedRouterOneShotHandler<T> = Box<dyn FnOnce(Result<T, SerDeError>) + Send>;