zng_view_api/
ipc.rs

1//! IPC types.
2
3use std::{fmt, ops::Deref, time::Duration};
4
5use crate::{AnyResult, Event, Request, Response};
6
7#[cfg(ipc)]
8use ipc_channel::ipc::{IpcOneShotServer, IpcReceiver, IpcSender, channel};
9
10#[cfg(not(ipc))]
11use flume::unbounded as channel;
12
13use parking_lot::Mutex;
14use serde::{Deserialize, Serialize};
15use zng_txt::Txt;
16
17pub(crate) type IpcResult<T> = std::result::Result<T, ViewChannelError>;
18
19/// Bytes sender.
20///
21/// Use [`bytes_channel`] to create.
22#[cfg_attr(ipc, derive(serde::Serialize, serde::Deserialize))]
23pub struct IpcBytesSender {
24    #[cfg(ipc)]
25    sender: ipc_channel::ipc::IpcBytesSender,
26    #[cfg(not(ipc))]
27    sender: flume::Sender<Vec<u8>>,
28}
29impl IpcBytesSender {
30    /// Send a byte package.
31    pub fn send(&self, bytes: Vec<u8>) -> IpcResult<()> {
32        #[cfg(ipc)]
33        {
34            self.sender.send(&bytes).map_err(handle_io_error)
35        }
36
37        #[cfg(not(ipc))]
38        self.sender.send(bytes).map_err(handle_send_error)
39    }
40}
41impl fmt::Debug for IpcBytesSender {
42    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
43        write!(f, "IpcBytesSender")
44    }
45}
46
47/// Bytes receiver.
48///
49/// Use [`bytes_channel`] to create.
50#[cfg_attr(ipc, derive(serde::Serialize, serde::Deserialize))]
51pub struct IpcBytesReceiver {
52    #[cfg(ipc)]
53    recv: ipc_channel::ipc::IpcBytesReceiver,
54    #[cfg(not(ipc))]
55    recv: flume::Receiver<Vec<u8>>,
56}
57impl IpcBytesReceiver {
58    /// Receive a bytes package.
59    pub fn recv(&self) -> IpcResult<Vec<u8>> {
60        self.recv.recv().map_err(handle_recv_error)
61    }
62}
63impl fmt::Debug for IpcBytesReceiver {
64    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
65        write!(f, "IpcBytesReceiver")
66    }
67}
68
/// Create a connected bytes channel pair.
///
/// # Panics
///
/// Panics if the underlying IPC bytes channel cannot be created.
#[cfg(ipc)]
pub fn bytes_channel() -> (IpcBytesSender, IpcBytesReceiver) {
    let (ipc_sender, ipc_recv) = ipc_channel::ipc::bytes_channel().unwrap();
    let sender = IpcBytesSender { sender: ipc_sender };
    let receiver = IpcBytesReceiver { recv: ipc_recv };
    (sender, receiver)
}
75
76/// Create a bytes channel.
77#[cfg(not(ipc))]
78pub fn bytes_channel() -> (IpcBytesSender, IpcBytesReceiver) {
79    let (sender, recv) = flume::unbounded();
80    (IpcBytesSender { sender }, IpcBytesReceiver { recv })
81}
82
83#[cfg(not(ipc))]
84mod arc_bytes {
85    pub fn serialize<S>(bytes: &std::sync::Arc<Vec<u8>>, serializer: S) -> Result<S::Ok, S::Error>
86    where
87        S: serde::Serializer,
88    {
89        serde_bytes::serialize(&bytes[..], serializer)
90    }
91    pub fn deserialize<'de, D>(deserializer: D) -> Result<std::sync::Arc<Vec<u8>>, D::Error>
92    where
93        D: serde::Deserializer<'de>,
94    {
95        Ok(std::sync::Arc::new(serde_bytes::deserialize(deserializer)?))
96    }
97}
98
99/// Immutable shared memory that can be send fast over IPC.
100///
101/// # `not(feature="ipc")`
102///
103/// If the default `"ipc"` feature is disabled this is only a `Vec<u8>`.
104#[derive(Clone, Serialize, Deserialize)]
105pub struct IpcBytes {
106    // `IpcSharedMemory` cannot have zero length, we use `None` in this case.
107    #[cfg(ipc)]
108    bytes: Option<ipc_channel::ipc::IpcSharedMemory>,
109    // `IpcSharedMemory` only clones a pointer.
110    #[cfg(not(ipc))]
111    #[serde(with = "arc_bytes")]
112    bytes: std::sync::Arc<Vec<u8>>,
113}
114/// Pointer equal.
115impl PartialEq for IpcBytes {
116    #[cfg(not(ipc))]
117    fn eq(&self, other: &Self) -> bool {
118        std::sync::Arc::ptr_eq(&self.bytes, &other.bytes)
119    }
120
121    #[cfg(ipc)]
122    fn eq(&self, other: &Self) -> bool {
123        match (&self.bytes, &other.bytes) {
124            (None, None) => true,
125            (Some(a), Some(b)) => a.as_ptr() == b.as_ptr(),
126            _ => false,
127        }
128    }
129}
130impl IpcBytes {
131    /// Copy the `bytes` to a new shared memory allocation.
132    pub fn from_slice(bytes: &[u8]) -> Self {
133        IpcBytes {
134            #[cfg(ipc)]
135            bytes: {
136                if bytes.is_empty() {
137                    None
138                } else {
139                    Some(ipc_channel::ipc::IpcSharedMemory::from_bytes(bytes))
140                }
141            },
142            #[cfg(not(ipc))]
143            bytes: std::sync::Arc::new(bytes.to_vec()),
144        }
145    }
146
147    /// If the `"ipc"` feature is enabled copy the bytes to a new shared memory region, if not
148    /// just wraps the `bytes` in a shared pointer.
149    pub fn from_vec(bytes: Vec<u8>) -> Self {
150        #[cfg(ipc)]
151        {
152            Self::from_slice(&bytes)
153        }
154
155        #[cfg(not(ipc))]
156        IpcBytes {
157            bytes: std::sync::Arc::new(bytes),
158        }
159    }
160
161    /// Copy the shared bytes to a new vec.
162    ///
163    /// If the `"ipc"` feature is not enabled and `self` is the only reference this operation is zero-cost.
164    pub fn to_vec(self) -> Vec<u8> {
165        #[cfg(ipc)]
166        {
167            self.bytes.map(|s| s.to_vec()).unwrap_or_default()
168        }
169        #[cfg(not(ipc))]
170        {
171            match std::sync::Arc::try_unwrap(self.bytes) {
172                Ok(d) => d,
173                Err(a) => a.as_ref().to_vec(),
174            }
175        }
176    }
177
178    /// Returns the underlying shared memory reference, if the bytes are not zero-length.
179    #[cfg(ipc)]
180    pub fn ipc_shared_memory(&self) -> Option<ipc_channel::ipc::IpcSharedMemory> {
181        self.bytes.clone()
182    }
183
184    /// Returns the underlying shared reference.
185    #[cfg(not(ipc))]
186    pub fn arc(&self) -> std::sync::Arc<Vec<u8>> {
187        self.bytes.clone()
188    }
189}
190impl Deref for IpcBytes {
191    type Target = [u8];
192
193    fn deref(&self) -> &Self::Target {
194        #[cfg(ipc)]
195        return if let Some(bytes) = &self.bytes { bytes } else { &[] };
196
197        #[cfg(not(ipc))]
198        &self.bytes
199    }
200}
201impl fmt::Debug for IpcBytes {
202    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
203        write!(f, "IpcBytes(<{} bytes>)", self.len())
204    }
205}
206
207#[cfg(not(ipc))]
208type IpcSender<T> = flume::Sender<T>;
209#[cfg(not(ipc))]
210type IpcReceiver<T> = flume::Receiver<T>;
211
/// Error returned by the view-process IPC channel operations.
#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
pub enum ViewChannelError {
    /// IPC channel disconnected.
    Disconnected,
}
impl fmt::Display for ViewChannelError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // matched by variant so that a new variant must define its own message
        match self {
            ViewChannelError::Disconnected => f.write_str("ipc channel disconnected"),
        }
    }
}
impl std::error::Error for ViewChannelError {}
225
/// One-shot server that the view-process connects to for exchanging the channels.
///
/// Call `new`, then spawn the view-process using the `name` then call `connect`.
#[cfg(ipc)]
pub(crate) struct AppInit {
    server: IpcOneShotServer<AppInitMsg>,
    name: Txt,
}
#[cfg(ipc)]
impl AppInit {
    /// Create the one-shot server.
    ///
    /// # Panics
    ///
    /// Panics if the server cannot be created.
    pub fn new() -> Self {
        let (server, server_name) = IpcOneShotServer::new().expect("failed to create init channel");
        let name = Txt::from_str(&server_name);
        AppInit { server, name }
    }

    /// Unique name for the view-process to find this channel.
    pub fn name(&self) -> &str {
        &self.name
    }

    /// Tries to connect to the view-process and receive the actual channels.
    ///
    /// The server accepts the connection in a helper thread, this method waits up to
    /// `view_timeout()` seconds for it and returns an error on timeout or if the accepted
    /// message or the channel exchange fails.
    ///
    /// # Panics
    ///
    /// Panics if the helper thread cannot be spawned. If the helper thread panics the panic
    /// is resumed in the calling thread.
    pub fn connect(self) -> AnyResult<(RequestSender, ResponseReceiver, EventReceiver)> {
        use crate::view_timeout;

        // `accept` blocks, so it runs in another thread and the result is awaited with a timeout
        let (accepted_sender, accepted_recv) = flume::bounded(1);
        let accept_thread = std::thread::Builder::new()
            .name("connection-init".into())
            .stack_size(256 * 1024)
            .spawn(move || {
                let accepted = self.server.accept();
                let _ = accepted_sender.send(accepted);
            })
            .expect("failed to spawn thread");

        let timeout = view_timeout();
        let accepted = match accepted_recv.recv_timeout(Duration::from_secs(timeout)) {
            Ok(accepted) => accepted,
            Err(flume::RecvTimeoutError::Timeout) => {
                return Err(format!("timeout, did not connect in {timeout}s").into());
            }
            Err(flume::RecvTimeoutError::Disconnected) => {
                // the sender was dropped without sending, the thread panicked
                std::panic::resume_unwind(accept_thread.join().unwrap_err())
            }
        };
        let (_, (req_sender, chan_sender)) = accepted?;

        // the response and event channels are created here and the sender ends are sent back
        let (rsp_sender, rsp_recv) = channel()?;
        let (evt_sender, evt_recv) = channel()?;
        chan_sender.send((rsp_sender, evt_sender))?;

        let request_sender = RequestSender(Mutex::new(req_sender));
        let response_receiver = ResponseReceiver(Mutex::new(rsp_recv));
        let event_receiver = EventReceiver(Mutex::new(evt_recv));
        Ok((request_sender, response_receiver, event_receiver))
    }
}
278
/// Connect to the app-process init channel named `server_name` and exchange the
/// `(request, response, event)` channels.
///
/// # Panics
///
/// Panics if it cannot connect to the init channel.
#[cfg(ipc)]
pub fn connect_view_process(server_name: Txt) -> IpcResult<ViewChannels> {
    let _s = tracing::trace_span!("connect_view_process").entered();

    let init_sender = IpcSender::connect(server_name.into_owned()).expect("failed to connect to init channel");

    let (req_sender, req_recv) = channel().map_err(handle_io_error)?;
    // Large messages can only be received in a receiver created in the same process that is receiving (on Windows)
    // so we create a channel to transfer the response and event senders.
    // See issue: https://github.com/servo/ipc-channel/issues/277
    let (chan_sender, chan_recv) = channel().map_err(handle_io_error)?;

    init_sender.send((req_sender, chan_sender)).map_err(handle_send_error)?;
    let (rsp_sender, evt_sender) = chan_recv.recv().map_err(handle_recv_error)?;

    let request_receiver = RequestReceiver(Mutex::new(req_recv));
    let response_sender = ResponseSender(Mutex::new(rsp_sender));
    let event_sender = EventSender(Mutex::new(evt_sender));
    Ok(ViewChannels {
        request_receiver,
        response_sender,
        event_sender,
    })
}
301
302/// (
303///    RequestSender,
304///    Workaround-sender-for-response-channel,
305///    EventReceiver,
306/// )
307type AppInitMsg = (IpcSender<Request>, IpcSender<(IpcSender<Response>, IpcSender<Event>)>);
308
309#[cfg(not(ipc))]
310pub(crate) struct AppInit {
311    // (
312    //    RequestSender,
313    //    Workaround-sender-for-response-channel,
314    //    EventReceiver,
315    // )
316    init: flume::Receiver<AppInitMsg>,
317    name: Txt,
318}
319#[cfg(not(ipc))]
320mod name_map {
321    use std::{
322        collections::HashMap,
323        sync::{Mutex, OnceLock},
324    };
325
326    use zng_txt::Txt;
327
328    use super::AppInitMsg;
329
330    type Map = Mutex<HashMap<Txt, flume::Sender<AppInitMsg>>>;
331
332    pub fn get() -> &'static Map {
333        static MAP: OnceLock<Map> = OnceLock::new();
334        MAP.get_or_init(Map::default)
335    }
336}
337#[cfg(not(ipc))]
338impl AppInit {
339    pub fn new() -> Self {
340        use std::sync::atomic::{AtomicU32, Ordering};
341        use zng_txt::formatx;
342
343        static NAME_COUNT: AtomicU32 = AtomicU32::new(0);
344
345        let name = formatx!("<not-ipc-{}>", NAME_COUNT.fetch_add(1, Ordering::Relaxed));
346
347        let (init_sender, init_recv) = flume::bounded(1);
348
349        name_map::get().lock().unwrap().insert(name.clone(), init_sender);
350
351        AppInit { name, init: init_recv }
352    }
353
354    pub fn name(&self) -> &str {
355        &self.name
356    }
357
358    /// Tries to connect to the view-process and receive the actual channels.
359    pub fn connect(self) -> AnyResult<(RequestSender, ResponseReceiver, EventReceiver)> {
360        let (req_sender, chan_sender) = self.init.recv_timeout(Duration::from_secs(5)).map_err(|e| match e {
361            flume::RecvTimeoutError::Timeout => "timeout, did not connect in 5s",
362            flume::RecvTimeoutError::Disconnected => panic!("disconnected"),
363        })?;
364        let (rsp_sender, rsp_recv) = flume::unbounded();
365        let (evt_sender, evt_recv) = flume::unbounded();
366        chan_sender.send((rsp_sender, evt_sender))?;
367        Ok((
368            RequestSender(Mutex::new(req_sender)),
369            ResponseReceiver(Mutex::new(rsp_recv)),
370            EventReceiver(Mutex::new(evt_recv)),
371        ))
372    }
373}
374
375/// Start the view-process server and waits for `(request, response, event)`.
376#[cfg(not(ipc))]
377pub fn connect_view_process(server_name: Txt) -> IpcResult<ViewChannels> {
378    let app_init_sender = name_map::get().lock().unwrap().remove(&server_name).unwrap();
379
380    let (req_sender, req_recv) = channel();
381    let (chan_sender, chan_recv) = channel();
382
383    app_init_sender.send((req_sender, chan_sender)).map_err(handle_send_error)?;
384    let (rsp_sender, evt_sender) = chan_recv.recv().map_err(handle_recv_error)?;
385
386    Ok(ViewChannels {
387        request_receiver: RequestReceiver(Mutex::new(req_recv)),
388        response_sender: ResponseSender(Mutex::new(rsp_sender)),
389        event_sender: EventSender(Mutex::new(evt_sender)),
390    })
391}
392
393/// Channels that must be used for implementing a view-process.
394pub struct ViewChannels {
395    /// View implementers must receive requests from this channel, call [`Api::respond`] and then
396    /// return the response using the `response_sender`.
397    ///
398    /// [`Api::respond`]: crate::Api::respond
399    pub request_receiver: RequestReceiver,
400
401    /// View implementers must synchronously send one response per request received in `request_receiver`.
402    pub response_sender: ResponseSender,
403
404    /// View implements must send events using this channel. Events can be asynchronous.
405    pub event_sender: EventSender,
406}
407
408pub(crate) struct RequestSender(Mutex<IpcSender<Request>>);
409impl RequestSender {
410    pub fn send(&mut self, req: Request) -> IpcResult<()> {
411        self.0.get_mut().send(req).map_err(handle_send_error)
412    }
413}
414
415/// Requests channel end-point.
416///
417/// View-process implementers must receive [`Request`], call [`Api::respond`] and then use a [`ResponseSender`]
418/// to send back the response.
419///
420/// [`Api::respond`]: crate::Api::respond
421pub struct RequestReceiver(Mutex<IpcReceiver<Request>>); // Mutex for Sync
422impl RequestReceiver {
423    /// Receive one [`Request`].
424    pub fn recv(&mut self) -> IpcResult<Request> {
425        self.0.get_mut().recv().map_err(handle_recv_error)
426    }
427}
428
429/// Responses channel entry-point.
430///
431/// View-process implementers must send [`Response`] returned by [`Api::respond`] using this sender.
432///
433/// Requests are received using [`RequestReceiver`] a response must be send for each request, synchronously.
434///
435/// [`Api::respond`]: crate::Api::respond
436pub struct ResponseSender(Mutex<IpcSender<Response>>); // Mutex for Sync
437impl ResponseSender {
438    /// Send a response.
439    ///
440    /// # Panics
441    ///
442    /// If the `rsp` is not [`must_be_send`].
443    ///
444    /// [`must_be_send`]: Response::must_be_send
445    pub fn send(&mut self, rsp: Response) -> IpcResult<()> {
446        assert!(rsp.must_be_send());
447        self.0.get_mut().send(rsp).map_err(handle_send_error)
448    }
449}
450pub(crate) struct ResponseReceiver(Mutex<IpcReceiver<Response>>);
451impl ResponseReceiver {
452    pub fn recv(&mut self) -> IpcResult<Response> {
453        self.0.get_mut().recv().map_err(handle_recv_error)
454    }
455}
456
457/// Event channel entry-point.
458///
459/// View-process implementers must send [`Event`] messages using this sender. The events
460/// can be asynchronous, not related to the [`Api::respond`] calls.
461///
462/// [`Api::respond`]: crate::Api::respond
463pub struct EventSender(Mutex<IpcSender<Event>>);
464impl EventSender {
465    /// Send an event notification.
466    pub fn send(&mut self, ev: Event) -> IpcResult<()> {
467        self.0.get_mut().send(ev).map_err(handle_send_error)
468    }
469}
470pub(crate) struct EventReceiver(Mutex<IpcReceiver<Event>>);
471impl EventReceiver {
472    pub fn recv(&mut self) -> IpcResult<Event> {
473        self.0.get_mut().recv().map_err(handle_recv_error)
474    }
475
476    #[cfg(ipc)]
477    pub fn recv_timeout(&mut self, duration: Duration) -> IpcResult<Option<Event>> {
478        match self.0.get_mut().try_recv_timeout(duration) {
479            Ok(ev) => Ok(Some(ev)),
480            Err(e) => match e {
481                ipc_channel::ipc::TryRecvError::IpcError(ipc_error) => Err(handle_recv_error(ipc_error)),
482                ipc_channel::ipc::TryRecvError::Empty => Ok(None),
483            },
484        }
485    }
486
487    #[cfg(not(ipc))]
488    pub fn recv_timeout(&mut self, duration: Duration) -> IpcResult<Option<Event>> {
489        match self.0.get_mut().recv_timeout(duration) {
490            Ok(ev) => Ok(Some(ev)),
491            Err(e) => match e {
492                flume::RecvTimeoutError::Timeout => Ok(None),
493                flume::RecvTimeoutError::Disconnected => Err(ViewChannelError::Disconnected),
494            },
495        }
496    }
497}
498
/// Convert an IPC receive error to [`ViewChannelError`].
///
/// Every error is converted to `Disconnected`, errors that are not a disconnection are logged first.
#[cfg(ipc)]
fn handle_recv_error(e: ipc_channel::ipc::IpcError) -> ViewChannelError {
    if !matches!(e, ipc_channel::ipc::IpcError::Disconnected) {
        tracing::error!("IO or bincode error: {e:?}");
    }
    ViewChannelError::Disconnected
}
509#[cfg(not(ipc))]
510fn handle_recv_error(e: flume::RecvError) -> ViewChannelError {
511    match e {
512        flume::RecvError::Disconnected => ViewChannelError::Disconnected,
513    }
514}
515
/// Convert an IPC send error to [`ViewChannelError`].
///
/// # Panics
///
/// Panics if the error is a serialization error or an IO error that does not indicate
/// a disconnected channel.
#[cfg(ipc)]
#[expect(clippy::boxed_local)]
fn handle_send_error(e: ipc_channel::Error) -> ViewChannelError {
    let e = match *e {
        ipc_channel::ErrorKind::Io(e) => e,
        e => panic!("serialization error: {e:?}"),
    };

    let disconnected = e.kind() == std::io::ErrorKind::BrokenPipe;

    // 0x800700E8 - "The pipe is being closed."
    #[cfg(windows)]
    let disconnected = disconnected || e.raw_os_error() == Some(-2147024664);

    // this error happens in the same test that on Windows is 0x800700E8 and on Ubuntu is BrokenPipe
    #[cfg(target_os = "macos")]
    let disconnected = disconnected
        || (e.kind() == std::io::ErrorKind::NotFound && format!("{e:?}") == "Custom { kind: NotFound, error: SendInvalidDest }");

    if disconnected {
        ViewChannelError::Disconnected
    } else {
        panic!("unexpected IO error: {e:?}")
    }
}
539
540#[cfg(not(ipc))]
541fn handle_send_error<T>(_: flume::SendError<T>) -> ViewChannelError {
542    ViewChannelError::Disconnected
543}
544
/// Convert an IO error of an IPC channel operation to [`ViewChannelError`].
///
/// The same IO errors that `handle_send_error` identifies as a disconnection are converted
/// to [`ViewChannelError::Disconnected`].
///
/// # Panics
///
/// Panics if the error does not indicate a disconnected channel.
#[cfg(ipc)]
fn handle_io_error(e: std::io::Error) -> ViewChannelError {
    if e.kind() == std::io::ErrorKind::BrokenPipe {
        return ViewChannelError::Disconnected;
    }
    #[cfg(windows)]
    if e.raw_os_error() == Some(-2147024664) {
        // 0x800700E8 - "The pipe is being closed."
        return ViewChannelError::Disconnected;
    }
    #[cfg(target_os = "macos")]
    if e.kind() == std::io::ErrorKind::NotFound && format!("{e:?}") == "Custom { kind: NotFound, error: SendInvalidDest }" {
        // same disconnection error that `handle_send_error` handles on macOS
        return ViewChannelError::Disconnected;
    }
    // prints the full error, the kind alone does not include the OS error code
    panic!("unexpected IO error: {e:?}")
}