ipc_channel/router.rs
1// Copyright 2015 The Servo Project Developers. See the COPYRIGHT
2// file at the top-level directory of this distribution.
3//
4// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
5// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
6// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
7// option. This file may not be copied, modified, or distributed
8// except according to those terms.
9
//! Routers allow converting IPC channels to crossbeam channels.
//! The [RouterProxy] provides various methods to register
//! `IpcReceiver<T>`s. The router will then either call the appropriate callback or route the
//! message to a crossbeam `Sender<T>` or `Receiver<T>`. You should use the global `ROUTER` to
//! access the `RouterProxy` methods (via `ROUTER`'s `Deref` for `RouterProxy`).
15
16use std::collections::HashMap;
17use std::sync::{LazyLock, Mutex};
18use std::thread;
19
20use crate::ipc::OpaqueIpcReceiver;
21use crate::ipc::{self, IpcMessage, IpcReceiver, IpcReceiverSet, IpcSelectionResult, IpcSender};
22use crossbeam_channel::{self, Receiver, Sender};
23use serde::{Deserialize, Serialize};
24
/// Global object wrapping a `RouterProxy`.
///
/// Add routes ([add_route](RouterProxy::add_route)), or convert `IpcReceiver<T>`
/// to crossbeam channels (e.g. [route_ipc_receiver_to_new_crossbeam_receiver](RouterProxy::route_ipc_receiver_to_new_crossbeam_receiver)).
///
/// The wrapped proxy is built with [new](RouterProxy::new) on first access, so the
/// router thread is only spawned once `ROUTER` is actually used.
pub static ROUTER: LazyLock<RouterProxy> = LazyLock::new(RouterProxy::new);
29
/// A `RouterProxy` provides methods for talking to the router. Calling
/// [new](RouterProxy::new) automatically spins up a router thread which
/// waits for events on its registered `IpcReceiver<T>`s. The `RouterProxy`'s
/// methods communicate with the running router thread to register new
/// `IpcReceiver<T>`s.
pub struct RouterProxy {
    /// Sending ends of the channels to the router thread plus the shutdown flag.
    /// Guarded by a `Mutex`; `add_route` and `shutdown` hold the lock for the
    /// whole call.
    comm: Mutex<RouterProxyComm>,
}
38
39#[allow(clippy::new_without_default)]
40impl RouterProxy {
41 pub fn new() -> RouterProxy {
42 // Router acts like a receiver, running in its own thread with both
43 // receiver ends.
44 // Router proxy takes both sending ends.
45 let (msg_sender, msg_receiver) = crossbeam_channel::unbounded();
46 let (wakeup_sender, wakeup_receiver) = ipc::channel().unwrap();
47 thread::Builder::new()
48 .name("router-proxy".to_string())
49 .spawn(move || Router::new(msg_receiver, wakeup_receiver).run())
50 .expect("Failed to spawn router proxy thread");
51 RouterProxy {
52 comm: Mutex::new(RouterProxyComm {
53 msg_sender,
54 wakeup_sender,
55 shutdown: false,
56 }),
57 }
58 }
59
60 /// Add a new (receiver, callback) pair to the router, and send a wakeup message
61 /// to the router.
62 ///
63 /// Consider using [add_typed_route](Self::add_typed_route) instead, which prevents
64 /// mismatches between the receiver and callback types.
65 #[deprecated(since = "0.19.0", note = "please use 'add_typed_route' instead")]
66 pub fn add_route(&self, receiver: OpaqueIpcReceiver, callback: RouterHandler) {
67 let comm = self.comm.lock().unwrap();
68
69 if comm.shutdown {
70 return;
71 }
72
73 comm.msg_sender
74 .send(RouterMsg::AddRoute(receiver, callback))
75 .unwrap();
76 comm.wakeup_sender.send(()).unwrap();
77 }
78
79 /// Add a new `(receiver, callback)` pair to the router, and send a wakeup message
80 /// to the router.
81 ///
82 /// Unlike [add_route](Self::add_route) this method is strongly typed and guarantees
83 /// that the `receiver` and the `callback` use the same message type.
84 pub fn add_typed_route<T>(&self, receiver: IpcReceiver<T>, mut callback: TypedRouterHandler<T>)
85 where
86 T: Serialize + for<'de> Deserialize<'de> + 'static,
87 {
88 // Before passing the message on to the callback, turn it into the appropriate type
89 let modified_callback = move |msg: IpcMessage| {
90 let typed_message = msg.to::<T>();
91 callback(typed_message)
92 };
93
94 #[allow(deprecated)]
95 self.add_route(receiver.to_opaque(), Box::new(modified_callback));
96 }
97
98 /// Send a shutdown message to the router containing a ACK sender,
99 /// send a wakeup message to the router, and block on the ACK.
100 /// Calling it is idempotent,
101 /// which can be useful when running a multi-process system in single-process mode.
102 pub fn shutdown(&self) {
103 let mut comm = self.comm.lock().unwrap();
104
105 if comm.shutdown {
106 return;
107 }
108 comm.shutdown = true;
109
110 let (ack_sender, ack_receiver) = crossbeam_channel::unbounded();
111 comm.wakeup_sender
112 .send(())
113 .map(|_| {
114 comm.msg_sender
115 .send(RouterMsg::Shutdown(ack_sender))
116 .unwrap();
117 ack_receiver.recv().unwrap();
118 })
119 .unwrap();
120 }
121
122 /// A convenience function to route an `IpcReceiver<T>` to an existing `Sender<T>`.
123 pub fn route_ipc_receiver_to_crossbeam_sender<T>(
124 &self,
125 ipc_receiver: IpcReceiver<T>,
126 crossbeam_sender: Sender<T>,
127 ) where
128 T: for<'de> Deserialize<'de> + Serialize + Send + 'static,
129 {
130 self.add_typed_route(
131 ipc_receiver,
132 Box::new(move |message| drop(crossbeam_sender.send(message.unwrap()))),
133 )
134 }
135
136 /// A convenience function to route an `IpcReceiver<T>` to a `Receiver<T>`: the most common
137 /// use of a `Router`.
138 pub fn route_ipc_receiver_to_new_crossbeam_receiver<T>(
139 &self,
140 ipc_receiver: IpcReceiver<T>,
141 ) -> Receiver<T>
142 where
143 T: for<'de> Deserialize<'de> + Serialize + Send + 'static,
144 {
145 let (crossbeam_sender, crossbeam_receiver) = crossbeam_channel::unbounded();
146 self.route_ipc_receiver_to_crossbeam_sender(ipc_receiver, crossbeam_sender);
147 crossbeam_receiver
148 }
149}
150
/// State a `RouterProxy` keeps behind its mutex in order to talk to the router thread.
struct RouterProxyComm {
    /// Carries `RouterMsg`s (route registrations and shutdown requests) to the router.
    msg_sender: Sender<RouterMsg>,
    /// Sends an empty wakeup message over IPC; the router reacts to it by reading one
    /// message from the `msg_sender` channel.
    wakeup_sender: IpcSender<()>,
    /// Set by `RouterProxy::shutdown`. Once true, `add_route` and `shutdown` return
    /// without sending anything to the router.
    shutdown: bool,
}
156
/// Router runs in its own thread listening for events. Adds receivers to its
/// `IpcReceiverSet` and listens for events on them using `select()`.
struct Router {
    /// Get messages from RouterProxy.
    msg_receiver: Receiver<RouterMsg>,
    /// The ID/index of the special channel we use to identify messages from msg_receiver.
    /// It is the id returned when the wakeup receiver is added to `ipc_receiver_set`.
    msg_wakeup_id: u64,
    /// Set of all receivers which have been registered for us to select on.
    ipc_receiver_set: IpcReceiverSet,
    /// Maps ids to their handler functions. The wakeup receiver has no entry here.
    handlers: HashMap<u64, RouterHandler>,
}
169
170impl Router {
171 fn new(msg_receiver: Receiver<RouterMsg>, wakeup_receiver: IpcReceiver<()>) -> Router {
172 let mut ipc_receiver_set = IpcReceiverSet::new().unwrap();
173 let msg_wakeup_id = ipc_receiver_set.add(wakeup_receiver).unwrap();
174 Router {
175 msg_receiver,
176 msg_wakeup_id,
177 ipc_receiver_set,
178 handlers: HashMap::new(),
179 }
180 }
181
182 /// Continuously loop waiting for wakeup signals from router proxy.
183 /// Iterate over events either:
184 /// 1) If a message comes in from our special `wakeup_receiver` (identified through
185 /// msg_wakeup_id. Read message from `msg_receiver` and add a new receiver
186 /// to our receiver set.
187 /// 2) Call appropriate handler based on message id.
188 /// 3) Remove handler once channel closes.
189 fn run(&mut self) {
190 loop {
191 // Wait for events to come from our select() new channels are added to
192 // our ReceiverSet below.
193 let results = match self.ipc_receiver_set.select() {
194 Ok(results) => results,
195 Err(_) => break,
196 };
197
198 // Iterate over numerous events that were ready at this time.
199 for result in results.into_iter() {
200 match result {
201 // Message came from the RouterProxy. Listen on our `msg_receiver`
202 // channel.
203 IpcSelectionResult::MessageReceived(id, _) if id == self.msg_wakeup_id => {
204 match self.msg_receiver.recv().unwrap() {
205 RouterMsg::AddRoute(receiver, handler) => {
206 let new_receiver_id =
207 self.ipc_receiver_set.add_opaque(receiver).unwrap();
208 self.handlers.insert(new_receiver_id, handler);
209 },
210 RouterMsg::Shutdown(sender) => {
211 sender
212 .send(())
213 .expect("Failed to send comfirmation of shutdown.");
214 break;
215 },
216 }
217 },
218 // Event from one of our registered receivers, call callback.
219 IpcSelectionResult::MessageReceived(id, message) => {
220 self.handlers.get_mut(&id).unwrap()(message)
221 },
222 IpcSelectionResult::ChannelClosed(id) => {
223 let _ = self.handlers.remove(&id).unwrap();
224 },
225 }
226 }
227 }
228 }
229}
230
/// Control messages sent from a `RouterProxy` to the router thread over the
/// crossbeam message channel.
enum RouterMsg {
    /// Register the receiver OpaqueIpcReceiver for listening for events on.
    /// When a message comes from this receiver, call RouterHandler.
    AddRoute(OpaqueIpcReceiver, RouterHandler),
    /// Shutdown the router, providing a sender to send an acknowledgement.
    Shutdown(Sender<()>),
}
238
/// Function to call when a new event is received from the corresponding receiver.
/// It is handed the received `IpcMessage` and must be `Send`, since it is stored
/// and invoked by the router on its own thread.
pub type RouterHandler = Box<dyn FnMut(IpcMessage) + Send>;
241
/// Like [RouterHandler] but includes the type that will be passed to the callback.
/// The callback receives a `Result`: either the message as a `T` or the
/// `bincode::Error` produced when converting the incoming message to `T`.
pub type TypedRouterHandler<T> = Box<dyn FnMut(Result<T, bincode::Error>) + Send>;