Skip to main content

zng_view_api/
ipc.rs

1#![cfg_attr(not(ipc), allow(unused))]
2
3//! IPC types.
4
5use std::time::Duration;
6
7use crate::{AnyResult, Event, Request, Response};
8
9use zng_task::channel::{self, ChannelError, IpcReceiver, IpcSender};
10use zng_task::parking_lot::Mutex;
11use zng_txt::Txt;
12
13type AppInitMsg = (
14    channel::IpcReceiver<Request>,
15    channel::IpcSender<Response>,
16    channel::IpcSender<Event>,
17);
18
19/// Call `new`, then spawn the view-process using the `name` then call `connect`.
20pub(crate) struct AppInit {
21    init_sender: channel::NamedIpcSender<AppInitMsg>,
22}
23impl AppInit {
24    pub fn new() -> Self {
25        AppInit {
26            init_sender: channel::NamedIpcSender::new().expect("failed to create init channel"),
27        }
28    }
29
30    /// Unique name for the view-process to find this channel.
31    pub fn name(&self) -> &str {
32        self.init_sender.name()
33    }
34
35    /// Tries to connect to the view-process and receive the actual channels.
36    pub fn connect(self) -> AnyResult<(RequestSender, ResponseReceiver, EventReceiver)> {
37        let mut init_sender = self
38            .init_sender
39            .connect_deadline_blocking(std::time::Duration::from_secs(crate::view_timeout()))?;
40
41        let (req_sender, req_recv) = channel::ipc_unbounded()?;
42        let (rsp_sender, rsp_recv) = channel::ipc_unbounded()?;
43        let (evt_sender, evt_recv) = channel::ipc_unbounded()?;
44        init_sender.send_blocking((req_recv, rsp_sender, evt_sender))?;
45        Ok((
46            RequestSender(Mutex::new(req_sender)),
47            ResponseReceiver(Mutex::new(rsp_recv)),
48            EventReceiver(Mutex::new(evt_recv)),
49        ))
50    }
51}
52
53/// Start the view-process server and waits for `(request, response, event)`.
54pub fn connect_view_process(ipc_sender_name: Txt) -> Result<ViewChannels, channel::ChannelError> {
55    let _s = tracing::trace_span!("connect_view_process").entered();
56
57    let mut init_recv = channel::IpcReceiver::<AppInitMsg>::connect(ipc_sender_name)?;
58
59    let (req_recv, rsp_sender, evt_sender) = init_recv.recv_deadline_blocking(std::time::Duration::from_secs(crate::view_timeout()))?;
60
61    Ok(ViewChannels {
62        request_receiver: RequestReceiver(Mutex::new(req_recv)),
63        response_sender: ResponseSender(Mutex::new(rsp_sender)),
64        event_sender: EventSender(Mutex::new(evt_sender)),
65    })
66}
67
68/// Channels that must be used for implementing a view-process.
69pub struct ViewChannels {
70    /// View implementers must receive requests from this channel, call [`Api::respond`] and then
71    /// return the response using the `response_sender`.
72    ///
73    /// [`Api::respond`]: crate::Api::respond
74    pub request_receiver: RequestReceiver,
75
76    /// View implementers must synchronously send one response per request received in `request_receiver`.
77    pub response_sender: ResponseSender,
78
79    /// View implements must send events using this channel. Events can be asynchronous.
80    pub event_sender: EventSender,
81}
82
83type IpcResult<T> = Result<T, ChannelError>;
84
85pub(crate) struct RequestSender(Mutex<IpcSender<Request>>);
86impl RequestSender {
87    pub fn send(&mut self, req: Request) -> IpcResult<()> {
88        let r = self.0.get_mut().send_blocking(req);
89        if let Err(e) = &r {
90            tracing::debug!("request sender error, {e}");
91        }
92        r
93    }
94}
95impl Drop for RequestSender {
96    fn drop(&mut self) {
97        tracing::trace!("dropped RequestSender");
98    }
99}
100
101/// Requests channel end-point.
102///
103/// View-process implementers must receive [`Request`], call [`Api::respond`] and then use a [`ResponseSender`]
104/// to send back the response.
105///
106/// [`Api::respond`]: crate::Api::respond
107pub struct RequestReceiver(Mutex<IpcReceiver<Request>>); // Mutex for Sync
108impl RequestReceiver {
109    /// Receive one [`Request`].
110    pub fn recv(&mut self) -> IpcResult<Request> {
111        let r = self.0.get_mut().recv_blocking();
112        if let Err(e) = &r {
113            tracing::debug!("request receiver error, {e}");
114        }
115        r
116    }
117}
118impl Drop for RequestReceiver {
119    fn drop(&mut self) {
120        tracing::trace!("dropped RequestReceiver");
121    }
122}
123
124/// Responses channel entry-point.
125///
126/// View-process implementers must send [`Response`] returned by [`Api::respond`] using this sender.
127///
128/// Requests are received using [`RequestReceiver`] a response must be send for each request, synchronously.
129///
130/// [`Api::respond`]: crate::Api::respond
131pub struct ResponseSender(Mutex<IpcSender<Response>>); // Mutex for Sync
132impl ResponseSender {
133    /// Send a response.
134    ///
135    /// # Panics
136    ///
137    /// If the `rsp` is not [`must_be_send`].
138    ///
139    /// [`must_be_send`]: Response::must_be_send
140    pub fn send(&mut self, rsp: Response) -> IpcResult<()> {
141        assert!(rsp.must_be_send());
142        let r = self.0.get_mut().send_blocking(rsp);
143        if let Err(e) = &r {
144            tracing::debug!("response sender error, {e}");
145        }
146        r
147    }
148}
149impl Drop for ResponseSender {
150    fn drop(&mut self) {
151        tracing::trace!("dropped ResponseSender");
152    }
153}
154
155pub(crate) struct ResponseReceiver(Mutex<IpcReceiver<Response>>);
156impl ResponseReceiver {
157    pub fn recv(&mut self) -> IpcResult<Response> {
158        let r = self.0.get_mut().recv_blocking();
159        if let Err(e) = &r {
160            tracing::debug!("response receiver error, {e}");
161        }
162        r
163    }
164}
165impl Drop for ResponseReceiver {
166    fn drop(&mut self) {
167        tracing::trace!("dropped ResponseReceiver");
168    }
169}
170
171/// Event channel entry-point.
172///
173/// View-process implementers must send [`Event`] messages using this sender. The events
174/// can be asynchronous, not related to the [`Api::respond`] calls.
175///
176/// [`Api::respond`]: crate::Api::respond
177pub struct EventSender(Mutex<IpcSender<Event>>);
178impl EventSender {
179    /// Send an event notification.
180    pub fn send(&mut self, ev: Event) -> IpcResult<()> {
181        let r = self.0.get_mut().send_blocking(ev);
182        if let Err(e) = &r {
183            tracing::debug!("event sender error, {e}");
184        }
185        r
186    }
187}
188pub(crate) struct EventReceiver(Mutex<IpcReceiver<Event>>);
189impl EventReceiver {
190    pub fn recv(&mut self) -> IpcResult<Event> {
191        let r = self.0.get_mut().recv_blocking();
192        if let Err(e) = &r {
193            tracing::debug!("event receiver error, {e}");
194        }
195        r
196    }
197
198    pub fn recv_timeout(&mut self, duration: Duration) -> IpcResult<Event> {
199        let r = self.0.get_mut().recv_deadline_blocking(duration);
200        if let Err(e) = &r {
201            match e {
202                ChannelError::Timeout => {}
203                e => tracing::debug!("event receiver error, {e}"),
204            }
205        }
206        r
207    }
208}