1#![cfg_attr(not(ipc), allow(unused))]
2
3use std::{fmt, io};
4
5use serde::{Deserialize, Serialize};
6use zng_time::Deadline;
7
8use crate::channel::ChannelError;
9
10pub struct IpcSender<T> {
14 #[cfg(ipc)]
15 sender: Option<ipc_channel::ipc::IpcSender<T>>,
16 #[cfg(not(ipc))]
17 sender: super::Sender<T>,
18}
19impl<T: IpcValue> fmt::Debug for IpcSender<T> {
20 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
21 f.debug_struct("IpcSender").finish_non_exhaustive()
22 }
23}
24impl<T: IpcValue> Clone for IpcSender<T> {
25 fn clone(&self) -> Self {
26 Self {
27 sender: self.sender.clone(),
28 }
29 }
30}
31impl<T: IpcValue> IpcSender<T> {
32 pub fn send_blocking(&mut self, msg: T) -> Result<(), ChannelError> {
36 #[cfg(ipc)]
37 {
38 let sender = match self.sender.take() {
39 Some(s) => s,
40 None => return Err(ChannelError::disconnected()),
41 };
42 let r = crate::channel::with_ipc_serialization(|| sender.send(msg).map_err(ChannelError::disconnected_by));
43 if r.is_ok() {
44 self.sender = Some(sender);
45 }
46 r
47 }
48 #[cfg(not(ipc))]
49 {
50 self.sender.send_blocking(msg)
51 }
52 }
53}
54impl<T: IpcValue> Serialize for IpcSender<T> {
55 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
56 where
57 S: serde::Serializer,
58 {
59 #[cfg(ipc)]
60 {
61 if !crate::channel::is_ipc_serialization() {
62 return Err(serde::ser::Error::custom("cannot serialize `IpcSender` outside IPC"));
63 }
64 self.sender.serialize(serializer)
65 }
66 #[cfg(not(ipc))]
67 {
68 let _ = serializer;
69 Err(serde::ser::Error::custom("cannot serialize `IpcSender` outside IPC"))
70 }
71 }
72}
73impl<'de, T: IpcValue> Deserialize<'de> for IpcSender<T> {
74 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
75 where
76 D: serde::Deserializer<'de>,
77 {
78 #[cfg(ipc)]
79 {
80 Ok(Self {
81 sender: Option::<ipc_channel::ipc::IpcSender<T>>::deserialize(deserializer)?,
82 })
83 }
84 #[cfg(not(ipc))]
85 {
86 let _ = deserializer;
87 Err(serde::de::Error::custom("cannot deserialize `IpcSender` outside IPC"))
88 }
89 }
90}
91
92pub struct IpcReceiver<T> {
96 #[cfg(ipc)]
97 recv: Option<ipc_channel::ipc::IpcReceiver<T>>,
98 #[cfg(not(ipc))]
99 recv: super::Receiver<T>,
100}
101impl<T: IpcValue> fmt::Debug for IpcReceiver<T> {
102 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
103 f.debug_struct("IpcReceiver").finish_non_exhaustive()
104 }
105}
106impl<T: IpcValue> IpcReceiver<T> {
107 pub async fn recv(&mut self) -> Result<T, ChannelError> {
111 #[cfg(ipc)]
112 {
113 let recv = match self.recv.take() {
114 Some(r) => r,
115 None => return Err(ChannelError::disconnected()),
116 };
117 let (recv, r) = blocking::unblock(move || {
118 let r = recv.recv();
119 (recv, r)
120 })
121 .await;
122 let r = r?;
123 self.recv = Some(recv);
124 Ok(r)
125 }
126 #[cfg(not(ipc))]
127 {
128 self.recv.recv().await
129 }
130 }
131
132 pub async fn recv_deadline(&mut self, deadline: impl Into<Deadline>) -> Result<T, ChannelError> {
136 #[cfg(ipc)]
137 {
138 match crate::with_deadline(self.recv(), deadline).await {
139 Ok(r) => r,
140 Err(_) => Err(ChannelError::Timeout),
141 }
142 }
143 #[cfg(not(ipc))]
144 {
145 self.recv.recv_deadline(deadline).await
146 }
147 }
148
149 pub fn recv_blocking(&mut self) -> Result<T, ChannelError> {
151 #[cfg(ipc)]
152 {
153 let recv = match self.recv.take() {
154 Some(r) => r,
155 None => return Err(ChannelError::disconnected()),
156 };
157 let r = recv.recv()?;
158 self.recv = Some(recv);
159 Ok(r)
160 }
161 #[cfg(not(ipc))]
162 {
163 self.recv.recv_blocking()
164 }
165 }
166
167 pub fn recv_deadline_blocking(&mut self, deadline: impl Into<Deadline>) -> Result<T, ChannelError> {
171 #[cfg(ipc)]
172 {
173 let recv = match self.recv.take() {
174 Some(r) => r,
175 None => return Err(ChannelError::disconnected()),
176 };
177 match deadline.into().time_left() {
178 Some(d) => match recv.try_recv_timeout(d) {
179 Ok(r) => {
180 self.recv = Some(recv);
181 Ok(r)
182 }
183 Err(e) => match e {
184 ipc_channel::TryRecvError::IpcError(e) => Err(ChannelError::disconnected_by(e)),
185 ipc_channel::TryRecvError::Empty => {
186 self.recv = Some(recv);
187 Err(ChannelError::Timeout)
188 }
189 },
190 },
191 None => {
192 self.recv = Some(recv);
193 Err(ChannelError::Timeout)
194 }
195 }
196 }
197 #[cfg(not(ipc))]
198 {
199 self.recv.recv_deadline_blocking(deadline)
200 }
201 }
202
203 pub fn iter(&mut self) -> impl Iterator<Item = T> {
205 #[cfg(ipc)]
206 {
207 std::iter::from_fn(|| self.recv_blocking().ok()).fuse()
208 }
209 #[cfg(not(ipc))]
210 {
211 self.recv.iter()
212 }
213 }
214
215 pub fn try_recv(&mut self) -> Result<Option<T>, ChannelError> {
217 #[cfg(ipc)]
218 {
219 let recv = match self.recv.take() {
220 Some(r) => r,
221 None => return Err(ChannelError::disconnected()),
222 };
223 match recv.try_recv() {
224 Ok(r) => {
225 self.recv = Some(recv);
226 Ok(Some(r))
227 }
228 Err(e) => match e {
229 ipc_channel::TryRecvError::IpcError(e) => Err(ChannelError::disconnected_by(e)),
230 ipc_channel::TryRecvError::Empty => Ok(None),
231 },
232 }
233 }
234 #[cfg(not(ipc))]
235 {
236 self.recv.try_recv()
237 }
238 }
239
240 pub fn try_iter(&mut self) -> impl Iterator<Item = T> {
242 #[cfg(ipc)]
243 {
244 std::iter::from_fn(|| self.try_recv().ok().flatten()).fuse()
245 }
246 #[cfg(not(ipc))]
247 {
248 self.recv.try_iter()
249 }
250 }
251}
252impl<T: IpcValue> Serialize for IpcReceiver<T> {
253 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
254 where
255 S: serde::Serializer,
256 {
257 #[cfg(ipc)]
258 {
259 if !crate::channel::is_ipc_serialization() {
260 return Err(serde::ser::Error::custom("cannot serialize `IpcReceiver` outside IPC"));
261 }
262 self.recv.serialize(serializer)
263 }
264 #[cfg(not(ipc))]
265 {
266 let _ = serializer;
267 Err(serde::ser::Error::custom("cannot serialize `IpcReceiver` outside IPC"))
268 }
269 }
270}
271impl<'de, T: IpcValue> Deserialize<'de> for IpcReceiver<T> {
272 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
273 where
274 D: serde::Deserializer<'de>,
275 {
276 #[cfg(ipc)]
277 {
278 Ok(Self {
279 recv: Option::<ipc_channel::ipc::IpcReceiver<T>>::deserialize(deserializer)?,
280 })
281 }
282 #[cfg(not(ipc))]
283 {
284 let _ = deserializer;
285 Err(serde::de::Error::custom("cannot deserialize `IpcReceiver` outside IPC"))
286 }
287 }
288}
289
290pub fn ipc_unbounded<T: IpcValue>() -> io::Result<(IpcSender<T>, IpcReceiver<T>)> {
300 #[cfg(ipc)]
301 {
302 let (s, r) = ipc_channel::ipc::channel()?;
303 Ok((IpcSender { sender: Some(s) }, IpcReceiver { recv: Some(r) }))
304 }
305 #[cfg(not(ipc))]
306 {
307 let (sender, recv) = super::unbounded();
308 Ok((IpcSender { sender }, IpcReceiver { recv }))
309 }
310}
311
312pub struct NamedIpcReceiver<T: IpcValue> {
319 #[cfg(ipc)]
320 server: ipc_channel::ipc::IpcOneShotServer<IpcReceiver<T>>,
321 #[cfg(ipc)]
322 name: String,
323
324 #[cfg(not(ipc))]
325 inner: named_channel_fallback::NamedReceiver<T>,
326}
327impl<T: IpcValue> fmt::Debug for NamedIpcReceiver<T> {
328 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
329 f.debug_struct("NamedIpcReceiver")
330 .field("name", &self.name())
331 .finish_non_exhaustive()
332 }
333}
334impl<T: IpcValue> NamedIpcReceiver<T> {
335 pub fn new() -> io::Result<Self> {
337 #[cfg(ipc)]
338 {
339 let (server, name) = ipc_channel::ipc::IpcOneShotServer::new()?;
340 Ok(Self { server, name })
341 }
342 #[cfg(not(ipc))]
343 {
344 Ok(Self {
345 inner: named_channel_fallback::NamedReceiver::new(),
346 })
347 }
348 }
349
350 pub fn name(&self) -> &str {
354 #[cfg(ipc)]
355 {
356 &self.name
357 }
358 #[cfg(not(ipc))]
359 {
360 self.inner.name()
361 }
362 }
363
364 pub async fn connect(self) -> Result<IpcReceiver<T>, ChannelError> {
366 blocking::unblock(move || self.connect_blocking()).await
367 }
368
369 pub async fn connect_deadline(self, deadline: impl Into<Deadline>) -> Result<IpcReceiver<T>, ChannelError> {
371 match crate::with_deadline(self.connect(), deadline).await {
372 Ok(r) => r,
373 Err(_) => Err(ChannelError::Timeout),
374 }
375 }
376
377 pub fn connect_blocking(self) -> Result<IpcReceiver<T>, ChannelError> {
379 #[cfg(ipc)]
380 {
381 let (_, recv) = self.server.accept().map_err(ChannelError::disconnected_by)?;
382 Ok(recv)
383 }
384 #[cfg(not(ipc))]
385 {
386 self.inner.connect_blocking()
387 }
388 }
389
390 pub fn connect_deadline_blocking(self, deadline: impl Into<Deadline>) -> Result<IpcReceiver<T>, ChannelError> {
392 crate::block_on(self.connect_deadline(deadline))
393 }
394}
395impl<T: IpcValue> IpcSender<T> {
396 pub fn connect(ipc_receiver_name: impl Into<String>) -> Result<Self, ChannelError> {
400 Self::connect_impl(ipc_receiver_name.into())
401 }
402 #[cfg(ipc)]
403 fn connect_impl(ipc_receiver_name: String) -> Result<Self, ChannelError> {
404 let sender = ipc_channel::ipc::IpcSender::<IpcReceiver<T>>::connect(ipc_receiver_name).map_err(ChannelError::disconnected_by)?;
405 let (s, r) = ipc_unbounded().map_err(ChannelError::disconnected_by)?;
406 crate::channel::with_ipc_serialization(|| sender.send(r)).map_err(ChannelError::disconnected_by)?;
407 Ok(s)
408 }
409 #[cfg(not(ipc))]
410 fn connect_impl(ipc_receiver_name: String) -> Result<Self, ChannelError> {
411 named_channel_fallback::sender_connect_blocking(&ipc_receiver_name)
412 }
413}
414
415pub struct NamedIpcSender<T: IpcValue> {
422 #[cfg(ipc)]
423 server: ipc_channel::ipc::IpcOneShotServer<IpcSender<T>>,
424 #[cfg(ipc)]
425 name: String,
426 #[cfg(not(ipc))]
427 inner: named_channel_fallback::NamedSender<T>,
428}
429impl<T: IpcValue> fmt::Debug for NamedIpcSender<T> {
430 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
431 f.debug_struct("NamedIpcSender").field("name", &self.name()).finish_non_exhaustive()
432 }
433}
434impl<T: IpcValue> NamedIpcSender<T> {
435 pub fn new() -> io::Result<Self> {
437 #[cfg(ipc)]
438 {
439 let (server, name) = ipc_channel::ipc::IpcOneShotServer::new()?;
440 Ok(Self { server, name })
441 }
442 #[cfg(not(ipc))]
443 {
444 Ok(Self {
445 inner: named_channel_fallback::NamedSender::new(),
446 })
447 }
448 }
449
450 pub fn name(&self) -> &str {
454 #[cfg(ipc)]
455 {
456 &self.name
457 }
458 #[cfg(not(ipc))]
459 {
460 self.inner.name()
461 }
462 }
463
464 pub async fn connect(self) -> Result<IpcSender<T>, ChannelError> {
466 blocking::unblock(move || self.connect_blocking()).await
467 }
468
469 pub async fn connect_deadline(self, deadline: impl Into<Deadline>) -> Result<IpcSender<T>, ChannelError> {
471 match crate::with_deadline(self.connect(), deadline).await {
472 Ok(r) => r,
473 Err(_) => Err(ChannelError::Timeout),
474 }
475 }
476
477 pub fn connect_blocking(self) -> Result<IpcSender<T>, ChannelError> {
479 #[cfg(ipc)]
480 {
481 let (_, sender) = self.server.accept().map_err(ChannelError::disconnected_by)?;
482 Ok(sender)
483 }
484 #[cfg(not(ipc))]
485 {
486 self.inner.connect_blocking()
487 }
488 }
489
490 pub fn connect_deadline_blocking(self, deadline: impl Into<Deadline>) -> Result<IpcSender<T>, ChannelError> {
492 crate::block_on(self.connect_deadline(deadline))
493 }
494}
495impl<T: IpcValue> IpcReceiver<T> {
496 pub fn connect(ipc_sender_name: impl Into<String>) -> Result<Self, ChannelError> {
500 Self::connect_impl(ipc_sender_name.into())
501 }
502 #[cfg(ipc)]
503 fn connect_impl(ipc_sender_name: String) -> Result<Self, ChannelError> {
504 let sender = ipc_channel::ipc::IpcSender::<IpcSender<T>>::connect(ipc_sender_name).map_err(ChannelError::disconnected_by)?;
505 let (s, r) = ipc_unbounded().map_err(ChannelError::disconnected_by)?;
506 crate::channel::with_ipc_serialization(|| sender.send(s)).map_err(ChannelError::disconnected_by)?;
507 Ok(r)
508 }
509 #[cfg(not(ipc))]
510 fn connect_impl(ipc_sender_name: String) -> Result<Self, ChannelError> {
511 named_channel_fallback::receiver_connect_blocking(&ipc_sender_name)
512 }
513}
514
/// Marker for types that can be sent over [`IpcSender`] and [`IpcReceiver`] channels.
///
/// The blanket implementation covers every type that is serializable, `Send` and `'static`.
#[cfg(ipc)]
#[diagnostic::on_unimplemented(note = "`IpcValue` is implemented for all `T: Serialize + Deserialize + Send + 'static`")]
pub trait IpcValue: serde::Serialize + for<'de> serde::de::Deserialize<'de> + Send + 'static {}
#[cfg(ipc)]
impl<T> IpcValue for T where T: serde::Serialize + for<'de> serde::de::Deserialize<'de> + Send + 'static {}

/// Marker for types that can be sent over [`IpcSender`] and [`IpcReceiver`] channels.
///
/// Built without `cfg(ipc)` values are not serialized, the blanket implementation covers every
/// type that is `Send` and `'static`.
#[cfg(not(ipc))]
#[diagnostic::on_unimplemented(note = "`IpcValue` is implemented for all `T: Send + 'static`")]
pub trait IpcValue: Send + 'static {}
#[cfg(not(ipc))]
impl<T> IpcValue for T where T: Send + 'static {}
542
#[cfg(ipc)]
impl From<ipc_channel::IpcError> for ChannelError {
    /// Maps a plain disconnect to [`ChannelError::disconnected`], any other error is kept as the
    /// disconnect cause.
    fn from(value: ipc_channel::IpcError) -> Self {
        if matches!(value, ipc_channel::IpcError::Disconnected) {
            ChannelError::disconnected()
        } else {
            ChannelError::disconnected_by(value)
        }
    }
}
#[cfg(ipc)]
impl From<ipc_channel::TryRecvError> for ChannelError {
    /// Maps an empty channel to [`ChannelError::Timeout`] and a plain disconnect to
    /// [`ChannelError::disconnected`], any other error is kept as the disconnect cause.
    fn from(value: ipc_channel::TryRecvError) -> Self {
        use ipc_channel::{IpcError, TryRecvError};

        match value {
            TryRecvError::Empty => ChannelError::Timeout,
            TryRecvError::IpcError(IpcError::Disconnected) => ChannelError::disconnected(),
            other => ChannelError::disconnected_by(other),
        }
    }
}
562
563#[cfg(not(ipc))]
564mod named_channel_fallback {
565 use std::{
566 any::Any,
567 collections::HashMap,
568 error::Error,
569 fmt, mem,
570 sync::{Arc, Weak, atomic::AtomicU64},
571 };
572
573 use parking_lot::Mutex;
574 use zng_txt::{Txt, formatx};
575
576 use crate::channel::{ChannelError, IpcReceiver, IpcSender, IpcValue, Receiver, Sender, ipc_unbounded, rendezvous};
577
578 static NAME_COUNT: AtomicU64 = AtomicU64::new(0);
579
580 type P = (Mutex<Box<dyn Any + Send>>, Sender<()>);
581 static PENDING: Mutex<Option<HashMap<Txt, Weak<P>>>> = Mutex::new(None);
582
583 pub struct NamedSender<T: IpcValue> {
584 sender: IpcSender<T>,
585 name: Txt,
586 pending_entry: Arc<P>,
587 sig_recv: Receiver<()>,
588 }
589 impl<T: IpcValue> NamedSender<T> {
590 pub fn new() -> Self {
591 let i = NAME_COUNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
592 let name = formatx!("<not-ipc-{}-{i}>", std::process::id());
593
594 let (sender, receiver) = ipc_unbounded::<T>().unwrap();
595 let (sig_sender, sig_recv) = rendezvous();
596
597 let s: Box<dyn Any + Send> = Box::new(receiver);
598 let pending_entry = Arc::new((Mutex::new(s), sig_sender));
599 PENDING
600 .lock()
601 .get_or_insert_default()
602 .insert(name.clone(), Arc::downgrade(&pending_entry));
603
604 Self {
605 sender,
606 name,
607 pending_entry,
608 sig_recv,
609 }
610 }
611
612 pub fn name(&self) -> &str {
613 &self.name
614 }
615
616 pub fn connect_blocking(self) -> Result<IpcSender<T>, ChannelError> {
617 self.sig_recv.recv_blocking()?;
618 Ok(self.sender)
619 }
620 }
621
622 pub fn receiver_connect_blocking<T: IpcValue>(name: &str) -> Result<IpcReceiver<T>, ChannelError> {
623 let mut p = PENDING.lock();
624 let p = p.get_or_insert_default();
625 p.retain(|_, v| v.strong_count() > 0);
626 match p.remove(name) {
627 Some(e) => match e.upgrade() {
628 Some(e) => {
629 let recv = mem::replace(&mut *e.0.lock(), Box::new(()));
630 e.1.send_blocking(());
631 match recv.downcast::<IpcReceiver<T>>() {
632 Ok(r) => Ok(*r),
633 Err(_) => Err(ChannelError::disconnected_by(TypeMismatchError)),
634 }
635 }
636 None => Err(ChannelError::disconnected()),
637 },
638 None => Err(ChannelError::disconnected()),
639 }
640 }
641
642 pub struct NamedReceiver<T: IpcValue> {
643 receiver: IpcReceiver<T>,
644 name: Txt,
645 pending_entry: Arc<P>,
646 sig_recv: Receiver<()>,
647 }
648 impl<T: IpcValue> NamedReceiver<T> {
649 pub fn new() -> Self {
650 let i = NAME_COUNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
651 let name = formatx!("<not-ipc-{}-{i}>", std::process::id());
652
653 let (sender, receiver) = ipc_unbounded::<T>().unwrap();
654 let (sig_sender, sig_recv) = rendezvous();
655
656 let s: Box<dyn Any + Send> = Box::new(sender);
657 let pending_entry = Arc::new((Mutex::new(s), sig_sender));
658 PENDING
659 .lock()
660 .get_or_insert_default()
661 .insert(name.clone(), Arc::downgrade(&pending_entry));
662
663 Self {
664 receiver,
665 name,
666 pending_entry,
667 sig_recv,
668 }
669 }
670
671 pub fn name(&self) -> &str {
672 &self.name
673 }
674
675 pub fn connect_blocking(self) -> Result<IpcReceiver<T>, ChannelError> {
676 self.sig_recv.recv_blocking()?;
677 Ok(self.receiver)
678 }
679 }
680
681 pub fn sender_connect_blocking<T: IpcValue>(name: &str) -> Result<IpcSender<T>, ChannelError> {
682 let mut p = PENDING.lock();
683 let p = p.get_or_insert_default();
684 p.retain(|_, v| v.strong_count() > 0);
685 match p.remove(name) {
686 Some(e) => match e.upgrade() {
687 Some(e) => {
688 let recv = mem::replace(&mut *e.0.lock(), Box::new(()));
689 e.1.send(());
690 match recv.downcast::<IpcSender<T>>() {
691 Ok(r) => Ok(*r),
692 Err(_) => Err(ChannelError::disconnected_by(TypeMismatchError)),
693 }
694 }
695 None => Err(ChannelError::disconnected()),
696 },
697 None => Err(ChannelError::disconnected()),
698 }
699 }
700
701 #[derive(Debug)]
702 struct TypeMismatchError;
703 impl fmt::Display for TypeMismatchError {
704 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
705 write!(f, "named channel type does not match")
706 }
707 }
708 impl Error for TypeMismatchError {}
709}