Skip to main content

ipc_channel/
router.rs

1// Copyright 2015 The Servo Project Developers. See the COPYRIGHT
2// file at the top-level directory of this distribution.
3//
4// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
5// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
6// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
7// option. This file may not be copied, modified, or distributed
8// except according to those terms.
9
//! Routers allow converting IPC channels to crossbeam channels.
//! The [RouterProxy] provides various methods to register
//! `IpcReceiver<T>`s. The router will then either call the appropriate callback or route the
//! message to a crossbeam `Sender<T>` or `Receiver<T>`. You should use the global `ROUTER` to
//! access the `RouterProxy` methods (via `ROUTER`'s `Deref` for `RouterProxy`).
15
16use std::collections::HashMap;
17use std::sync::{LazyLock, Mutex};
18use std::thread::{self, JoinHandle};
19
20use crate::error::SerDeError;
21use crate::ipc::OpaqueIpcReceiver;
22use crate::ipc::{self, IpcMessage, IpcReceiver, IpcReceiverSet, IpcSelectionResult, IpcSender};
23use crossbeam_channel::{self, Receiver, Sender};
24use serde_core::{Deserialize, Serialize};
25
26/// Global object wrapping a `RouterProxy`.
27/// Add routes ([add_typed_route](RouterProxy::add_typed_route)), or convert `IpcReceiver<T>`
28/// to crossbeam channels (e.g. [route_ipc_receiver_to_new_crossbeam_receiver](RouterProxy::route_ipc_receiver_to_new_crossbeam_receiver))
29pub static ROUTER: LazyLock<RouterProxy> = LazyLock::new(RouterProxy::new);
30
31/// A `RouterProxy` provides methods for talking to the router. Calling
32/// [new](RouterProxy::new) automatically spins up a router thread which
33/// waits for events on its registered `IpcReceiver<T>`s. The `RouterProxy`'s
34/// methods communicate with the running router thread to register new
35/// `IpcReceiver<T>`'s
36pub struct RouterProxy {
37    comm: Mutex<RouterProxyComm>,
38}
39
40impl Drop for RouterProxy {
41    fn drop(&mut self) {
42        self.shutdown();
43    }
44}
45
46#[allow(clippy::new_without_default)]
47impl RouterProxy {
48    pub fn new() -> RouterProxy {
49        // Router acts like a receiver, running in its own thread with both
50        // receiver ends.
51        // Router proxy takes both sending ends.
52        let (msg_sender, msg_receiver) = crossbeam_channel::unbounded();
53        let (wakeup_sender, wakeup_receiver) = ipc::channel().unwrap();
54        let handle = thread::Builder::new()
55            .name("router-proxy".to_string())
56            .spawn(move || Router::new(msg_receiver, wakeup_receiver).run())
57            .expect("Failed to spawn router proxy thread");
58        RouterProxy {
59            comm: Mutex::new(RouterProxyComm {
60                msg_sender,
61                wakeup_sender,
62                shutdown: false,
63                handle: Some(handle),
64            }),
65        }
66    }
67
68    /// Add a new (receiver, callback) pair to the router, and send a wakeup message
69    /// to the router.
70    ///
71    /// The `callback` is dropped when `receiver`'s channel disconnects.
72    fn add_route(&self, receiver: OpaqueIpcReceiver, callback: RouterHandler) {
73        let comm = self.comm.lock().unwrap();
74
75        if comm.shutdown {
76            return;
77        }
78
79        comm.msg_sender
80            .send(RouterMsg::AddRoute(receiver, callback))
81            .unwrap();
82        comm.wakeup_sender.send(()).unwrap();
83    }
84
85    /// Add a new `(receiver, callback)` pair to the router, and send a wakeup message
86    /// to the router.
87    ///
88    /// The `callback` is dropped when `receiver`'s channel disconnects.
89    pub fn add_typed_route<T>(
90        &self,
91        receiver: IpcReceiver<T>,
92        mut callback: TypedRouterMultiHandler<T>,
93    ) where
94        T: Serialize + for<'de> Deserialize<'de> + 'static,
95    {
96        // Before passing the message on to the callback, turn it into the appropriate type
97        let modified_callback = move |msg: IpcMessage| {
98            let typed_message = msg.to::<T>();
99            callback(typed_message)
100        };
101
102        self.add_route(
103            receiver.to_opaque(),
104            RouterHandler::Multi(Box::new(modified_callback)),
105        );
106    }
107
108    /// Add a new `(receiver, callback)` pair to the router, and send a wakeup message
109    /// to the router.
110    pub fn add_typed_one_shot_route<T>(
111        &self,
112        receiver: IpcReceiver<T>,
113        callback: TypedRouterOneShotHandler<T>,
114    ) where
115        T: Serialize + for<'de> Deserialize<'de> + 'static,
116    {
117        // Before passing the message on to the callback, turn it into the appropriate type
118        let modified_callback = move |msg: IpcMessage| {
119            let typed_message = msg.to::<T>();
120            callback(typed_message)
121        };
122
123        self.add_route(
124            receiver.to_opaque(),
125            RouterHandler::Once(Some(Box::new(modified_callback))),
126        );
127    }
128
129    /// Send a shutdown message to the router containing a ACK sender,
130    /// send a wakeup message to the router, and block on the ACK.
131    /// Calling it is idempotent,
132    /// which can be useful when running a multi-process system in single-process mode.
133    pub fn shutdown(&self) {
134        let mut comm = self.comm.lock().unwrap();
135
136        if comm.shutdown {
137            return;
138        }
139        comm.shutdown = true;
140
141        let (ack_sender, ack_receiver) = crossbeam_channel::unbounded();
142        comm.wakeup_sender
143            .send(())
144            .map(|_| {
145                comm.msg_sender
146                    .send(RouterMsg::Shutdown(ack_sender))
147                    .unwrap();
148                ack_receiver.recv().unwrap();
149            })
150            .unwrap();
151        comm.handle
152            .take()
153            .expect("Should have a join handle at shutdown")
154            .join()
155            .expect("Failed to join on the router proxy thread");
156    }
157
158    /// A convenience function to route an `IpcReceiver<T>` to an existing `Sender<T>`.
159    pub fn route_ipc_receiver_to_crossbeam_sender<T>(
160        &self,
161        ipc_receiver: IpcReceiver<T>,
162        crossbeam_sender: Sender<T>,
163    ) where
164        T: for<'de> Deserialize<'de> + Serialize + Send + 'static,
165    {
166        self.add_typed_route(
167            ipc_receiver,
168            Box::new(move |message| drop(crossbeam_sender.send(message.unwrap()))),
169        )
170    }
171
172    /// A convenience function to route an `IpcReceiver<T>` to a `Receiver<T>`: the most common
173    /// use of a `Router`.
174    pub fn route_ipc_receiver_to_new_crossbeam_receiver<T>(
175        &self,
176        ipc_receiver: IpcReceiver<T>,
177    ) -> Receiver<T>
178    where
179        T: for<'de> Deserialize<'de> + Serialize + Send + 'static,
180    {
181        let (crossbeam_sender, crossbeam_receiver) = crossbeam_channel::unbounded();
182        self.route_ipc_receiver_to_crossbeam_sender(ipc_receiver, crossbeam_sender);
183        crossbeam_receiver
184    }
185}
186
187struct RouterProxyComm {
188    msg_sender: Sender<RouterMsg>,
189    wakeup_sender: IpcSender<()>,
190    shutdown: bool,
191    handle: Option<JoinHandle<()>>,
192}
193
194/// Router runs in its own thread listening for events. Adds events to its IpcReceiverSet
195/// and listens for events using select().
196struct Router {
197    /// Get messages from RouterProxy.
198    msg_receiver: Receiver<RouterMsg>,
199    /// The ID/index of the special channel we use to identify messages from msg_receiver.
200    msg_wakeup_id: u64,
201    /// Set of all receivers which have been registered for us to select on.
202    ipc_receiver_set: IpcReceiverSet,
203    /// Maps ids to their handler functions.
204    handlers: HashMap<u64, RouterHandler>,
205}
206
207impl Router {
208    fn new(msg_receiver: Receiver<RouterMsg>, wakeup_receiver: IpcReceiver<()>) -> Router {
209        let mut ipc_receiver_set = IpcReceiverSet::new().unwrap();
210        let msg_wakeup_id = ipc_receiver_set.add(wakeup_receiver).unwrap();
211        Router {
212            msg_receiver,
213            msg_wakeup_id,
214            ipc_receiver_set,
215            handlers: HashMap::new(),
216        }
217    }
218
219    /// Continuously loop waiting for wakeup signals from router proxy.
220    /// Iterate over events either:
221    /// 1) If a message comes in from our special `wakeup_receiver` (identified through
222    ///    msg_wakeup_id. Read message from `msg_receiver` and add a new receiver
223    ///    to our receiver set.
224    /// 2) Call appropriate handler based on message id.
225    /// 3) Remove handler once channel closes.
226    fn run(&mut self) {
227        loop {
228            // Wait for events to come from our select() new channels are added to
229            // our ReceiverSet below.
230            let results = match self.ipc_receiver_set.select() {
231                Ok(results) => results,
232                Err(_) => break,
233            };
234
235            // Iterate over numerous events that were ready at this time.
236            for result in results.into_iter() {
237                match result {
238                    // Message came from the RouterProxy. Listen on our `msg_receiver`
239                    // channel.
240                    IpcSelectionResult::MessageReceived(id, _) if id == self.msg_wakeup_id => {
241                        match self.msg_receiver.recv().unwrap() {
242                            RouterMsg::AddRoute(receiver, handler) => {
243                                let new_receiver_id =
244                                    self.ipc_receiver_set.add_opaque(receiver).unwrap();
245                                self.handlers.insert(new_receiver_id, handler);
246                            },
247                            RouterMsg::Shutdown(sender) => {
248                                sender
249                                    .send(())
250                                    .expect("Failed to send comfirmation of shutdown.");
251                                return;
252                            },
253                        }
254                    },
255                    // Event from one of our registered receivers, call callback.
256                    IpcSelectionResult::MessageReceived(id, message) => {
257                        match self.handlers.get_mut(&id).unwrap() {
258                            RouterHandler::Once(handler) => {
259                                if let Some(handler) = handler.take() {
260                                    (handler)(message);
261                                }
262                            },
263                            RouterHandler::Multi(ref mut handler) => {
264                                (handler)(message);
265                            },
266                        }
267                    },
268                    IpcSelectionResult::ChannelClosed(id) => {
269                        let _ = self.handlers.remove(&id).unwrap();
270                    },
271                }
272            }
273        }
274    }
275}
276
277enum RouterMsg {
278    /// Register the receiver OpaqueIpcReceiver for listening for events on.
279    /// When a message comes from this receiver, call RouterHandler.
280    AddRoute(OpaqueIpcReceiver, RouterHandler),
281    /// Shutdown the router, providing a sender to send an acknowledgement.
282    Shutdown(Sender<()>),
283}
284
285/// Function to call when a new event is received from the corresponding receiver.
286pub type RouterMultiHandler = Box<dyn FnMut(IpcMessage) + Send>;
287
288/// Function to call the first time that a message is received from the corresponding receiver.
289pub type RouterOneShotHandler = Box<dyn FnOnce(IpcMessage) + Send>;
290
291enum RouterHandler {
292    Once(Option<RouterOneShotHandler>),
293    Multi(RouterMultiHandler),
294}
295
296/// Like [RouterMultiHandler] but includes the type that will be passed to the callback
297pub type TypedRouterMultiHandler<T> = Box<dyn FnMut(Result<T, SerDeError>) + Send>;
298
299/// Like [RouterOneShotHandler] but includes the type that will be passed to the callback
300pub type TypedRouterOneShotHandler<T> = Box<dyn FnOnce(Result<T, SerDeError>) + Send>;