ipc_channel/
router.rs

1// Copyright 2015 The Servo Project Developers. See the COPYRIGHT
2// file at the top-level directory of this distribution.
3//
4// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
5// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
6// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
7// option. This file may not be copied, modified, or distributed
8// except according to those terms.
9
10//! Routers allow converting IPC channels to crossbeam channels.
11//! The [RouterProxy] provides various methods to register
12//! `IpcReceiver<T>`s. The router will then either call the appropriate callback or route the
13//! message to a crossbeam `Sender<T>` or `Receiver<T>`. You should use the global `ROUTER` to
14//! access the `RouterProxy` methods (via `ROUTER`'s `Deref` for `RouterProxy`.
15
16use std::collections::HashMap;
17use std::sync::{LazyLock, Mutex};
18use std::thread;
19
20use crate::ipc::OpaqueIpcReceiver;
21use crate::ipc::{self, IpcMessage, IpcReceiver, IpcReceiverSet, IpcSelectionResult, IpcSender};
22use crossbeam_channel::{self, Receiver, Sender};
23use serde::{Deserialize, Serialize};
24
25/// Global object wrapping a `RouterProxy`.
26/// Add routes ([add_route](RouterProxy::add_route)), or convert IpcReceiver<T>
27/// to crossbeam channels (e.g. [route_ipc_receiver_to_new_crossbeam_receiver](RouterProxy::route_ipc_receiver_to_new_crossbeam_receiver))
28pub static ROUTER: LazyLock<RouterProxy> = LazyLock::new(RouterProxy::new);
29
30/// A `RouterProxy` provides methods for talking to the router. Calling
31/// [new](RouterProxy::new) automatically spins up a router thread which
32/// waits for events on its registered `IpcReceiver<T>`s. The `RouterProxy`'s
33/// methods communicate with the running router thread to register new
34/// `IpcReceiver<T>`'s
35pub struct RouterProxy {
36    comm: Mutex<RouterProxyComm>,
37}
38
39#[allow(clippy::new_without_default)]
40impl RouterProxy {
41    pub fn new() -> RouterProxy {
42        // Router acts like a receiver, running in its own thread with both
43        // receiver ends.
44        // Router proxy takes both sending ends.
45        let (msg_sender, msg_receiver) = crossbeam_channel::unbounded();
46        let (wakeup_sender, wakeup_receiver) = ipc::channel().unwrap();
47        thread::Builder::new()
48            .name("router-proxy".to_string())
49            .spawn(move || Router::new(msg_receiver, wakeup_receiver).run())
50            .expect("Failed to spawn router proxy thread");
51        RouterProxy {
52            comm: Mutex::new(RouterProxyComm {
53                msg_sender,
54                wakeup_sender,
55                shutdown: false,
56            }),
57        }
58    }
59
60    /// Add a new (receiver, callback) pair to the router, and send a wakeup message
61    /// to the router.
62    ///
63    /// Consider using [add_typed_route](Self::add_typed_route) instead, which prevents
64    /// mismatches between the receiver and callback types.
65    #[deprecated(since = "0.19.0", note = "please use 'add_typed_route' instead")]
66    pub fn add_route(&self, receiver: OpaqueIpcReceiver, callback: RouterHandler) {
67        let comm = self.comm.lock().unwrap();
68
69        if comm.shutdown {
70            return;
71        }
72
73        comm.msg_sender
74            .send(RouterMsg::AddRoute(receiver, callback))
75            .unwrap();
76        comm.wakeup_sender.send(()).unwrap();
77    }
78
79    /// Add a new `(receiver, callback)` pair to the router, and send a wakeup message
80    /// to the router.
81    ///
82    /// Unlike [add_route](Self::add_route) this method is strongly typed and guarantees
83    /// that the `receiver` and the `callback` use the same message type.
84    pub fn add_typed_route<T>(&self, receiver: IpcReceiver<T>, mut callback: TypedRouterHandler<T>)
85    where
86        T: Serialize + for<'de> Deserialize<'de> + 'static,
87    {
88        // Before passing the message on to the callback, turn it into the appropriate type
89        let modified_callback = move |msg: IpcMessage| {
90            let typed_message = msg.to::<T>();
91            callback(typed_message)
92        };
93
94        #[allow(deprecated)]
95        self.add_route(receiver.to_opaque(), Box::new(modified_callback));
96    }
97
98    /// Send a shutdown message to the router containing a ACK sender,
99    /// send a wakeup message to the router, and block on the ACK.
100    /// Calling it is idempotent,
101    /// which can be useful when running a multi-process system in single-process mode.
102    pub fn shutdown(&self) {
103        let mut comm = self.comm.lock().unwrap();
104
105        if comm.shutdown {
106            return;
107        }
108        comm.shutdown = true;
109
110        let (ack_sender, ack_receiver) = crossbeam_channel::unbounded();
111        comm.wakeup_sender
112            .send(())
113            .map(|_| {
114                comm.msg_sender
115                    .send(RouterMsg::Shutdown(ack_sender))
116                    .unwrap();
117                ack_receiver.recv().unwrap();
118            })
119            .unwrap();
120    }
121
122    /// A convenience function to route an `IpcReceiver<T>` to an existing `Sender<T>`.
123    pub fn route_ipc_receiver_to_crossbeam_sender<T>(
124        &self,
125        ipc_receiver: IpcReceiver<T>,
126        crossbeam_sender: Sender<T>,
127    ) where
128        T: for<'de> Deserialize<'de> + Serialize + Send + 'static,
129    {
130        self.add_typed_route(
131            ipc_receiver,
132            Box::new(move |message| drop(crossbeam_sender.send(message.unwrap()))),
133        )
134    }
135
136    /// A convenience function to route an `IpcReceiver<T>` to a `Receiver<T>`: the most common
137    /// use of a `Router`.
138    pub fn route_ipc_receiver_to_new_crossbeam_receiver<T>(
139        &self,
140        ipc_receiver: IpcReceiver<T>,
141    ) -> Receiver<T>
142    where
143        T: for<'de> Deserialize<'de> + Serialize + Send + 'static,
144    {
145        let (crossbeam_sender, crossbeam_receiver) = crossbeam_channel::unbounded();
146        self.route_ipc_receiver_to_crossbeam_sender(ipc_receiver, crossbeam_sender);
147        crossbeam_receiver
148    }
149}
150
151struct RouterProxyComm {
152    msg_sender: Sender<RouterMsg>,
153    wakeup_sender: IpcSender<()>,
154    shutdown: bool,
155}
156
157/// Router runs in its own thread listening for events. Adds events to its IpcReceiverSet
158/// and listens for events using select().
159struct Router {
160    /// Get messages from RouterProxy.
161    msg_receiver: Receiver<RouterMsg>,
162    /// The ID/index of the special channel we use to identify messages from msg_receiver.
163    msg_wakeup_id: u64,
164    /// Set of all receivers which have been registered for us to select on.
165    ipc_receiver_set: IpcReceiverSet,
166    /// Maps ids to their handler functions.
167    handlers: HashMap<u64, RouterHandler>,
168}
169
170impl Router {
171    fn new(msg_receiver: Receiver<RouterMsg>, wakeup_receiver: IpcReceiver<()>) -> Router {
172        let mut ipc_receiver_set = IpcReceiverSet::new().unwrap();
173        let msg_wakeup_id = ipc_receiver_set.add(wakeup_receiver).unwrap();
174        Router {
175            msg_receiver,
176            msg_wakeup_id,
177            ipc_receiver_set,
178            handlers: HashMap::new(),
179        }
180    }
181
182    /// Continuously loop waiting for wakeup signals from router proxy.
183    /// Iterate over events either:
184    /// 1) If a message comes in from our special `wakeup_receiver` (identified through
185    ///    msg_wakeup_id. Read message from `msg_receiver` and add a new receiver
186    ///    to our receiver set.
187    /// 2) Call appropriate handler based on message id.
188    /// 3) Remove handler once channel closes.
189    fn run(&mut self) {
190        loop {
191            // Wait for events to come from our select() new channels are added to
192            // our ReceiverSet below.
193            let results = match self.ipc_receiver_set.select() {
194                Ok(results) => results,
195                Err(_) => break,
196            };
197
198            // Iterate over numerous events that were ready at this time.
199            for result in results.into_iter() {
200                match result {
201                    // Message came from the RouterProxy. Listen on our `msg_receiver`
202                    // channel.
203                    IpcSelectionResult::MessageReceived(id, _) if id == self.msg_wakeup_id => {
204                        match self.msg_receiver.recv().unwrap() {
205                            RouterMsg::AddRoute(receiver, handler) => {
206                                let new_receiver_id =
207                                    self.ipc_receiver_set.add_opaque(receiver).unwrap();
208                                self.handlers.insert(new_receiver_id, handler);
209                            },
210                            RouterMsg::Shutdown(sender) => {
211                                sender
212                                    .send(())
213                                    .expect("Failed to send comfirmation of shutdown.");
214                                break;
215                            },
216                        }
217                    },
218                    // Event from one of our registered receivers, call callback.
219                    IpcSelectionResult::MessageReceived(id, message) => {
220                        self.handlers.get_mut(&id).unwrap()(message)
221                    },
222                    IpcSelectionResult::ChannelClosed(id) => {
223                        let _ = self.handlers.remove(&id).unwrap();
224                    },
225                }
226            }
227        }
228    }
229}
230
231enum RouterMsg {
232    /// Register the receiver OpaqueIpcReceiver for listening for events on.
233    /// When a message comes from this receiver, call RouterHandler.
234    AddRoute(OpaqueIpcReceiver, RouterHandler),
235    /// Shutdown the router, providing a sender to send an acknowledgement.
236    Shutdown(Sender<()>),
237}
238
239/// Function to call when a new event is received from the corresponding receiver.
240pub type RouterHandler = Box<dyn FnMut(IpcMessage) + Send>;
241
242/// Like [RouterHandler] but includes the type that will be passed to the callback
243pub type TypedRouterHandler<T> = Box<dyn FnMut(Result<T, bincode::Error>) + Send>;