1use std::{fmt, ops::Deref, time::Duration};
4
5use crate::{AnyResult, Event, Request, Response};
6
7#[cfg(ipc)]
8use ipc_channel::ipc::{IpcOneShotServer, IpcReceiver, IpcSender, channel};
9
10#[cfg(not(ipc))]
11use flume::unbounded as channel;
12
13use parking_lot::Mutex;
14use serde::{Deserialize, Serialize};
15use zng_txt::Txt;
16
/// Result type used by the channel operations in this file; the error is always [`ViewChannelError`].
pub(crate) type IpcResult<T> = std::result::Result<T, ViewChannelError>;
18
19#[cfg_attr(ipc, derive(serde::Serialize, serde::Deserialize))]
23pub struct IpcBytesSender {
24 #[cfg(ipc)]
25 sender: ipc_channel::ipc::IpcBytesSender,
26 #[cfg(not(ipc))]
27 sender: flume::Sender<Vec<u8>>,
28}
29impl IpcBytesSender {
30 pub fn send(&self, bytes: Vec<u8>) -> IpcResult<()> {
32 #[cfg(ipc)]
33 {
34 self.sender.send(&bytes).map_err(handle_io_error)
35 }
36
37 #[cfg(not(ipc))]
38 self.sender.send(bytes).map_err(handle_send_error)
39 }
40}
41impl fmt::Debug for IpcBytesSender {
42 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
43 write!(f, "IpcBytesSender")
44 }
45}
46
47#[cfg_attr(ipc, derive(serde::Serialize, serde::Deserialize))]
51pub struct IpcBytesReceiver {
52 #[cfg(ipc)]
53 recv: ipc_channel::ipc::IpcBytesReceiver,
54 #[cfg(not(ipc))]
55 recv: flume::Receiver<Vec<u8>>,
56}
57impl IpcBytesReceiver {
58 pub fn recv(&self) -> IpcResult<Vec<u8>> {
60 self.recv.recv().map_err(handle_recv_error)
61 }
62}
63impl fmt::Debug for IpcBytesReceiver {
64 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
65 write!(f, "IpcBytesReceiver")
66 }
67}
68
/// Creates a connected bytes sender/receiver pair backed by `ipc_channel::ipc::bytes_channel`.
///
/// # Panics
///
/// Panics if the underlying IPC channel cannot be created.
#[cfg(ipc)]
pub fn bytes_channel() -> (IpcBytesSender, IpcBytesReceiver) {
    // `expect` (not a bare `unwrap`) so the panic names the failed operation, like the other
    // channel-creation panics in this file.
    let (sender, recv) = ipc_channel::ipc::bytes_channel().expect("failed to create bytes channel");
    (IpcBytesSender { sender }, IpcBytesReceiver { recv })
}
75
76#[cfg(not(ipc))]
78pub fn bytes_channel() -> (IpcBytesSender, IpcBytesReceiver) {
79 let (sender, recv) = flume::unbounded();
80 (IpcBytesSender { sender }, IpcBytesReceiver { recv })
81}
82
83#[cfg(not(ipc))]
84mod arc_bytes {
85 pub fn serialize<S>(bytes: &std::sync::Arc<Vec<u8>>, serializer: S) -> Result<S::Ok, S::Error>
86 where
87 S: serde::Serializer,
88 {
89 serde_bytes::serialize(&bytes[..], serializer)
90 }
91 pub fn deserialize<'de, D>(deserializer: D) -> Result<std::sync::Arc<Vec<u8>>, D::Error>
92 where
93 D: serde::Deserializer<'de>,
94 {
95 Ok(std::sync::Arc::new(serde_bytes::deserialize(deserializer)?))
96 }
97}
98
99#[derive(Clone, Serialize, Deserialize)]
105pub struct IpcBytes {
106 #[cfg(ipc)]
108 bytes: Option<ipc_channel::ipc::IpcSharedMemory>,
109 #[cfg(not(ipc))]
111 #[serde(with = "arc_bytes")]
112 bytes: std::sync::Arc<Vec<u8>>,
113}
114impl PartialEq for IpcBytes {
116 #[cfg(not(ipc))]
117 fn eq(&self, other: &Self) -> bool {
118 std::sync::Arc::ptr_eq(&self.bytes, &other.bytes)
119 }
120
121 #[cfg(ipc)]
122 fn eq(&self, other: &Self) -> bool {
123 match (&self.bytes, &other.bytes) {
124 (None, None) => true,
125 (Some(a), Some(b)) => a.as_ptr() == b.as_ptr(),
126 _ => false,
127 }
128 }
129}
130impl IpcBytes {
131 pub fn from_slice(bytes: &[u8]) -> Self {
133 IpcBytes {
134 #[cfg(ipc)]
135 bytes: {
136 if bytes.is_empty() {
137 None
138 } else {
139 Some(ipc_channel::ipc::IpcSharedMemory::from_bytes(bytes))
140 }
141 },
142 #[cfg(not(ipc))]
143 bytes: std::sync::Arc::new(bytes.to_vec()),
144 }
145 }
146
147 pub fn from_vec(bytes: Vec<u8>) -> Self {
150 #[cfg(ipc)]
151 {
152 Self::from_slice(&bytes)
153 }
154
155 #[cfg(not(ipc))]
156 IpcBytes {
157 bytes: std::sync::Arc::new(bytes),
158 }
159 }
160
161 pub fn to_vec(self) -> Vec<u8> {
165 #[cfg(ipc)]
166 {
167 self.bytes.map(|s| s.to_vec()).unwrap_or_default()
168 }
169 #[cfg(not(ipc))]
170 {
171 match std::sync::Arc::try_unwrap(self.bytes) {
172 Ok(d) => d,
173 Err(a) => a.as_ref().to_vec(),
174 }
175 }
176 }
177
178 #[cfg(ipc)]
180 pub fn ipc_shared_memory(&self) -> Option<ipc_channel::ipc::IpcSharedMemory> {
181 self.bytes.clone()
182 }
183
184 #[cfg(not(ipc))]
186 pub fn arc(&self) -> std::sync::Arc<Vec<u8>> {
187 self.bytes.clone()
188 }
189}
190impl Deref for IpcBytes {
191 type Target = [u8];
192
193 fn deref(&self) -> &Self::Target {
194 #[cfg(ipc)]
195 return if let Some(bytes) = &self.bytes { bytes } else { &[] };
196
197 #[cfg(not(ipc))]
198 &self.bytes
199 }
200}
201impl fmt::Debug for IpcBytes {
202 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
203 write!(f, "IpcBytes(<{} bytes>)", self.len())
204 }
205}
206
// Without `ipc` the channel types used below are plain `flume` channels under the same names
// that the `ipc_channel` imports provide in `cfg(ipc)` builds.
#[cfg(not(ipc))]
type IpcSender<T> = flume::Sender<T>;
#[cfg(not(ipc))]
type IpcReceiver<T> = flume::Receiver<T>;
211
/// Error returned by the channel operations in this file.
#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
pub enum ViewChannelError {
    /// The other end of the channel is gone.
    Disconnected,
}
impl fmt::Display for ViewChannelError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Exhaustive match so a new variant forces this message to be revisited.
        match self {
            ViewChannelError::Disconnected => f.write_str("ipc channel disconnected"),
        }
    }
}
impl std::error::Error for ViewChannelError {}
225
/// App-process side of the connection handshake (`cfg(ipc)` build).
///
/// Owns a one-shot IPC server and the name the other process uses to connect to it.
#[cfg(ipc)]
pub(crate) struct AppInit {
    server: IpcOneShotServer<AppInitMsg>,
    name: Txt,
}
#[cfg(ipc)]
impl AppInit {
    /// Creates the one-shot server.
    ///
    /// # Panics
    ///
    /// Panics if the server cannot be created.
    pub fn new() -> Self {
        let (server, server_name) = IpcOneShotServer::new().expect("failed to create init channel");
        let name = Txt::from_str(&server_name);
        AppInit { server, name }
    }

    /// Name of the one-shot server, to be passed to `connect_view_process`.
    pub fn name(&self) -> &str {
        let Self { name, .. } = self;
        name
    }

    /// Waits for the other process to connect and finishes the channel exchange.
    ///
    /// Receives the request sender and a channel-sender from the other side, creates the
    /// response and event channels, and sends their senders back.
    ///
    /// # Errors
    ///
    /// Returns an error if no connection arrives within `view_timeout()` seconds, if accepting
    /// the connection fails, or if creating or sending the response/event channels fails.
    ///
    /// # Panics
    ///
    /// Panics if the accept thread cannot be spawned, and re-raises the accept thread's panic
    /// if it panicked before reporting a result.
    pub fn connect(self) -> AnyResult<(RequestSender, ResponseReceiver, EventReceiver)> {
        use crate::view_timeout;

        // `accept` blocks, so it runs on a helper thread and the result is awaited with a timeout.
        let (accepted_tx, accepted_rx) = flume::bounded(1);
        let acceptor = std::thread::Builder::new()
            .name("connection-init".into())
            .stack_size(256 * 1024)
            .spawn(move || {
                let accepted = self.server.accept();
                let _ = accepted_tx.send(accepted);
            })
            .expect("failed to spawn thread");

        let timeout = view_timeout();
        let accepted = match accepted_rx.recv_timeout(Duration::from_secs(timeout)) {
            Ok(accepted) => accepted,
            Err(flume::RecvTimeoutError::Timeout) => {
                return Err(format!("timeout, did not connect in {timeout}s").into());
            }
            // The sender was dropped without sending, so the thread panicked; propagate that panic.
            Err(flume::RecvTimeoutError::Disconnected) => {
                std::panic::resume_unwind(acceptor.join().unwrap_err());
            }
        };
        let (_, (req_sender, chan_sender)) = accepted?;

        let (rsp_sender, rsp_recv) = channel()?;
        let (evt_sender, evt_recv) = channel()?;
        chan_sender.send((rsp_sender, evt_sender))?;

        let requests = RequestSender(Mutex::new(req_sender));
        let responses = ResponseReceiver(Mutex::new(rsp_recv));
        let events = EventReceiver(Mutex::new(evt_recv));
        Ok((requests, responses, events))
    }
}
278
/// Connects to the one-shot server named `server_name` and exchanges channels with it
/// (`cfg(ipc)` build).
///
/// Sends the request sender and a channel-sender to the server, then waits for the response
/// and event senders to come back.
///
/// # Errors
///
/// Channel creation, send and receive failures are mapped to [`ViewChannelError`] by the
/// file's error handlers (which may panic on unexpected errors).
///
/// # Panics
///
/// Panics if the connection to the init channel cannot be opened.
#[cfg(ipc)]
pub fn connect_view_process(server_name: Txt) -> IpcResult<ViewChannels> {
    let _span = tracing::trace_span!("connect_view_process").entered();

    let init_sender = IpcSender::connect(server_name.into_owned()).expect("failed to connect to init channel");

    let (req_sender, req_recv) = channel().map_err(handle_io_error)?;
    let (chan_sender, chan_recv) = channel().map_err(handle_io_error)?;

    init_sender.send((req_sender, chan_sender)).map_err(handle_send_error)?;
    let (rsp_sender, evt_sender) = chan_recv.recv().map_err(handle_recv_error)?;

    let request_receiver = RequestReceiver(Mutex::new(req_recv));
    let response_sender = ResponseSender(Mutex::new(rsp_sender));
    let event_sender = EventSender(Mutex::new(evt_sender));
    Ok(ViewChannels {
        request_receiver,
        response_sender,
        event_sender,
    })
}
301
// First handshake message sent by `connect_view_process`: the request sender, plus a sender the
// app side uses to reply with the response and event senders.
type AppInitMsg = (IpcSender<Request>, IpcSender<(IpcSender<Response>, IpcSender<Event>)>);
308
/// App side of the connection handshake when built without `ipc`.
#[cfg(not(ipc))]
pub(crate) struct AppInit {
    // Receives the init message that `connect_view_process` sends.
    init: flume::Receiver<AppInitMsg>,
    // Key under which the matching sender is registered in `name_map`.
    name: Txt,
}
319#[cfg(not(ipc))]
320mod name_map {
321 use std::{
322 collections::HashMap,
323 sync::{Mutex, OnceLock},
324 };
325
326 use zng_txt::Txt;
327
328 use super::AppInitMsg;
329
330 type Map = Mutex<HashMap<Txt, flume::Sender<AppInitMsg>>>;
331
332 pub fn get() -> &'static Map {
333 static MAP: OnceLock<Map> = OnceLock::new();
334 MAP.get_or_init(Map::default)
335 }
336}
337#[cfg(not(ipc))]
338impl AppInit {
339 pub fn new() -> Self {
340 use std::sync::atomic::{AtomicU32, Ordering};
341 use zng_txt::formatx;
342
343 static NAME_COUNT: AtomicU32 = AtomicU32::new(0);
344
345 let name = formatx!("<not-ipc-{}>", NAME_COUNT.fetch_add(1, Ordering::Relaxed));
346
347 let (init_sender, init_recv) = flume::bounded(1);
348
349 name_map::get().lock().unwrap().insert(name.clone(), init_sender);
350
351 AppInit { name, init: init_recv }
352 }
353
354 pub fn name(&self) -> &str {
355 &self.name
356 }
357
358 pub fn connect(self) -> AnyResult<(RequestSender, ResponseReceiver, EventReceiver)> {
360 let (req_sender, chan_sender) = self.init.recv_timeout(Duration::from_secs(5)).map_err(|e| match e {
361 flume::RecvTimeoutError::Timeout => "timeout, did not connect in 5s",
362 flume::RecvTimeoutError::Disconnected => panic!("disconnected"),
363 })?;
364 let (rsp_sender, rsp_recv) = flume::unbounded();
365 let (evt_sender, evt_recv) = flume::unbounded();
366 chan_sender.send((rsp_sender, evt_sender))?;
367 Ok((
368 RequestSender(Mutex::new(req_sender)),
369 ResponseReceiver(Mutex::new(rsp_recv)),
370 EventReceiver(Mutex::new(evt_recv)),
371 ))
372 }
373}
374
375#[cfg(not(ipc))]
377pub fn connect_view_process(server_name: Txt) -> IpcResult<ViewChannels> {
378 let app_init_sender = name_map::get().lock().unwrap().remove(&server_name).unwrap();
379
380 let (req_sender, req_recv) = channel();
381 let (chan_sender, chan_recv) = channel();
382
383 app_init_sender.send((req_sender, chan_sender)).map_err(handle_send_error)?;
384 let (rsp_sender, evt_sender) = chan_recv.recv().map_err(handle_recv_error)?;
385
386 Ok(ViewChannels {
387 request_receiver: RequestReceiver(Mutex::new(req_recv)),
388 response_sender: ResponseSender(Mutex::new(rsp_sender)),
389 event_sender: EventSender(Mutex::new(evt_sender)),
390 })
391}
392
/// The three channel ends produced by `connect_view_process`.
pub struct ViewChannels {
    /// Receives [`Request`] messages.
    pub request_receiver: RequestReceiver,

    /// Sends [`Response`] messages.
    pub response_sender: ResponseSender,

    /// Sends [`Event`] messages.
    pub event_sender: EventSender,
}
407
408pub(crate) struct RequestSender(Mutex<IpcSender<Request>>);
409impl RequestSender {
410 pub fn send(&mut self, req: Request) -> IpcResult<()> {
411 self.0.get_mut().send(req).map_err(handle_send_error)
412 }
413}
414
415pub struct RequestReceiver(Mutex<IpcReceiver<Request>>); impl RequestReceiver {
423 pub fn recv(&mut self) -> IpcResult<Request> {
425 self.0.get_mut().recv().map_err(handle_recv_error)
426 }
427}
428
429pub struct ResponseSender(Mutex<IpcSender<Response>>); impl ResponseSender {
438 pub fn send(&mut self, rsp: Response) -> IpcResult<()> {
446 assert!(rsp.must_be_send());
447 self.0.get_mut().send(rsp).map_err(handle_send_error)
448 }
449}
450pub(crate) struct ResponseReceiver(Mutex<IpcReceiver<Response>>);
451impl ResponseReceiver {
452 pub fn recv(&mut self) -> IpcResult<Response> {
453 self.0.get_mut().recv().map_err(handle_recv_error)
454 }
455}
456
457pub struct EventSender(Mutex<IpcSender<Event>>);
464impl EventSender {
465 pub fn send(&mut self, ev: Event) -> IpcResult<()> {
467 self.0.get_mut().send(ev).map_err(handle_send_error)
468 }
469}
470pub(crate) struct EventReceiver(Mutex<IpcReceiver<Event>>);
471impl EventReceiver {
472 pub fn recv(&mut self) -> IpcResult<Event> {
473 self.0.get_mut().recv().map_err(handle_recv_error)
474 }
475
476 #[cfg(ipc)]
477 pub fn recv_timeout(&mut self, duration: Duration) -> IpcResult<Option<Event>> {
478 match self.0.get_mut().try_recv_timeout(duration) {
479 Ok(ev) => Ok(Some(ev)),
480 Err(e) => match e {
481 ipc_channel::ipc::TryRecvError::IpcError(ipc_error) => Err(handle_recv_error(ipc_error)),
482 ipc_channel::ipc::TryRecvError::Empty => Ok(None),
483 },
484 }
485 }
486
487 #[cfg(not(ipc))]
488 pub fn recv_timeout(&mut self, duration: Duration) -> IpcResult<Option<Event>> {
489 match self.0.get_mut().recv_timeout(duration) {
490 Ok(ev) => Ok(Some(ev)),
491 Err(e) => match e {
492 flume::RecvTimeoutError::Timeout => Ok(None),
493 flume::RecvTimeoutError::Disconnected => Err(ViewChannelError::Disconnected),
494 },
495 }
496 }
497}
498
/// Maps an IPC receive error to [`ViewChannelError::Disconnected`].
///
/// Any error other than a plain disconnect is logged first, then also reported as
/// disconnected.
#[cfg(ipc)]
fn handle_recv_error(e: ipc_channel::ipc::IpcError) -> ViewChannelError {
    if !matches!(e, ipc_channel::ipc::IpcError::Disconnected) {
        tracing::error!("IO or bincode error: {e:?}");
    }
    ViewChannelError::Disconnected
}
509#[cfg(not(ipc))]
510fn handle_recv_error(e: flume::RecvError) -> ViewChannelError {
511 match e {
512 flume::RecvError::Disconnected => ViewChannelError::Disconnected,
513 }
514}
515
/// Maps an IPC send error to [`ViewChannelError::Disconnected`] when it looks like the other
/// end is gone.
///
/// # Panics
///
/// Panics on any non-IO error (reported as a serialization error) and on any IO error that is
/// not recognized as a disconnect.
#[cfg(ipc)]
#[expect(clippy::boxed_local)]
fn handle_send_error(e: ipc_channel::Error) -> ViewChannelError {
    // Only IO errors can mean "disconnected".
    let e = match *e {
        ipc_channel::ErrorKind::Io(e) => e,
        e => panic!("serialization error: {e:?}"),
    };

    if matches!(e.kind(), std::io::ErrorKind::BrokenPipe) {
        return ViewChannelError::Disconnected;
    }

    // -2147024664 is 0x800700E8 as an i32, i.e. the HRESULT form of Win32 error 232.
    // NOTE(review): 232 is presumably ERROR_NO_DATA ("the pipe is being closed") - confirm.
    #[cfg(windows)]
    if matches!(e.raw_os_error(), Some(-2147024664)) {
        return ViewChannelError::Disconnected;
    }

    // On macOS the disconnect is only recognizable through the error's debug text.
    #[cfg(target_os = "macos")]
    if e.kind() == std::io::ErrorKind::NotFound && format!("{e:?}") == "Custom { kind: NotFound, error: SendInvalidDest }" {
        return ViewChannelError::Disconnected;
    }

    panic!("unexpected IO error: {e:?}")
}
539
540#[cfg(not(ipc))]
541fn handle_send_error<T>(_: flume::SendError<T>) -> ViewChannelError {
542 ViewChannelError::Disconnected
543}
544
/// Maps an IO error from an IPC channel operation to [`ViewChannelError`].
///
/// A broken pipe is reported as [`ViewChannelError::Disconnected`].
///
/// # Panics
///
/// Panics on any other kind of IO error.
#[cfg(ipc)]
fn handle_io_error(e: std::io::Error) -> ViewChannelError {
    match e.kind() {
        std::io::ErrorKind::BrokenPipe => ViewChannelError::Disconnected,
        // Do not rebind `e` here: the panic should print the full error (OS code and message),
        // not only its `ErrorKind`, to match what `handle_send_error` reports.
        _ => panic!("unexpected IO error: {e:?}"),
    }
}