1#![cfg_attr(not(ipc), allow(unused))]
2
3use std::{fmt, io};
4
5use serde::{Deserialize, Serialize};
6use zng_time::Deadline;
7
8use crate::channel::ChannelError;
9
/// The transmitting end of a channel whose values must implement `IpcValue`.
///
/// With the `ipc` cfg enabled this wraps an `ipc_channel` sender, otherwise it wraps the
/// in-process `super::Sender`.
pub struct IpcSender<T> {
    // Held in an `Option` so `send_blocking` can move the sender out while sending;
    // it is left as `None` after a failed send.
    #[cfg(ipc)]
    sender: Option<ipc_channel::ipc::IpcSender<T>>,
    // In-process fallback used when `ipc` is not enabled.
    #[cfg(not(ipc))]
    sender: super::Sender<T>,
}
19impl<T: IpcValue> fmt::Debug for IpcSender<T> {
20 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
21 f.debug_struct("IpcSender").finish_non_exhaustive()
22 }
23}
24impl<T: IpcValue> Clone for IpcSender<T> {
25 fn clone(&self) -> Self {
26 Self {
27 sender: self.sender.clone(),
28 }
29 }
30}
31impl<T: IpcValue> IpcSender<T> {
32 pub fn send_blocking(&mut self, msg: T) -> Result<(), ChannelError> {
36 #[cfg(ipc)]
37 {
38 let sender = match self.sender.take() {
39 Some(s) => s,
40 None => return Err(ChannelError::disconnected()),
41 };
42 let r = crate::channel::with_ipc_serialization(|| sender.send(msg).map_err(ChannelError::disconnected_by));
43 if r.is_ok() {
44 self.sender = Some(sender);
45 }
46 r
47 }
48 #[cfg(not(ipc))]
49 {
50 self.sender.send_blocking(msg)
51 }
52 }
53}
54impl<T: IpcValue> Serialize for IpcSender<T> {
55 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
56 where
57 S: serde::Serializer,
58 {
59 #[cfg(ipc)]
60 {
61 if !crate::channel::is_ipc_serialization() {
62 return Err(serde::ser::Error::custom("cannot serialize `IpcSender` outside IPC"));
63 }
64 self.sender.serialize(serializer)
65 }
66 #[cfg(not(ipc))]
67 {
68 let _ = serializer;
69 Err(serde::ser::Error::custom("cannot serialize `IpcSender` outside IPC"))
70 }
71 }
72}
73impl<'de, T: IpcValue> Deserialize<'de> for IpcSender<T> {
74 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
75 where
76 D: serde::Deserializer<'de>,
77 {
78 #[cfg(ipc)]
79 {
80 Ok(Self {
81 sender: Option::<ipc_channel::ipc::IpcSender<T>>::deserialize(deserializer)?,
82 })
83 }
84 #[cfg(not(ipc))]
85 {
86 let _ = deserializer;
87 Err(serde::de::Error::custom("cannot deserialize `IpcSender` outside IPC"))
88 }
89 }
90}
91
/// The receiving end of a channel whose values must implement `IpcValue`.
///
/// With the `ipc` cfg enabled this wraps an `ipc_channel` receiver, otherwise it wraps the
/// in-process `super::Receiver`.
pub struct IpcReceiver<T> {
    // Held in an `Option` so the receive methods can move the receiver out while waiting;
    // it is left as `None` after a receive error.
    #[cfg(ipc)]
    recv: Option<ipc_channel::ipc::IpcReceiver<T>>,
    // In-process fallback used when `ipc` is not enabled.
    #[cfg(not(ipc))]
    recv: super::Receiver<T>,
}
101impl<T: IpcValue> fmt::Debug for IpcReceiver<T> {
102 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
103 f.debug_struct("IpcReceiver").finish_non_exhaustive()
104 }
105}
106impl<T: IpcValue> IpcReceiver<T> {
107 pub async fn recv(&mut self) -> Result<T, ChannelError> {
111 #[cfg(ipc)]
112 {
113 let recv = match self.recv.take() {
114 Some(r) => r,
115 None => return Err(ChannelError::disconnected()),
116 };
117 let (recv, r) = blocking::unblock(move || {
118 let r = recv.recv();
119 (recv, r)
120 })
121 .await;
122 let r = r?;
123 self.recv = Some(recv);
124 Ok(r)
125 }
126 #[cfg(not(ipc))]
127 {
128 self.recv.recv().await
129 }
130 }
131
132 pub async fn recv_deadline(&mut self, deadline: impl Into<Deadline>) -> Result<T, ChannelError> {
136 #[cfg(ipc)]
137 {
138 match crate::with_deadline(self.recv(), deadline).await {
139 Ok(r) => r,
140 Err(_) => Err(ChannelError::Timeout),
141 }
142 }
143 #[cfg(not(ipc))]
144 {
145 self.recv.recv_deadline(deadline).await
146 }
147 }
148
149 pub fn recv_blocking(&mut self) -> Result<T, ChannelError> {
151 #[cfg(ipc)]
152 {
153 let recv = match self.recv.take() {
154 Some(r) => r,
155 None => return Err(ChannelError::disconnected()),
156 };
157 let r = recv.recv()?;
158 self.recv = Some(recv);
159 Ok(r)
160 }
161 #[cfg(not(ipc))]
162 {
163 self.recv.recv_blocking()
164 }
165 }
166
167 pub fn recv_deadline_blocking(&mut self, deadline: impl Into<Deadline>) -> Result<T, ChannelError> {
171 #[cfg(ipc)]
172 {
173 let recv = match self.recv.take() {
174 Some(r) => r,
175 None => return Err(ChannelError::disconnected()),
176 };
177 match deadline.into().time_left() {
178 Some(d) => match recv.try_recv_timeout(d) {
179 Ok(r) => {
180 self.recv = Some(recv);
181 Ok(r)
182 }
183 Err(e) => match e {
184 ipc_channel::ipc::TryRecvError::IpcError(e) => Err(ChannelError::disconnected_by(e)),
185 ipc_channel::ipc::TryRecvError::Empty => {
186 self.recv = Some(recv);
187 Err(ChannelError::Timeout)
188 }
189 },
190 },
191 None => {
192 self.recv = Some(recv);
193 Err(ChannelError::Timeout)
194 }
195 }
196 }
197 #[cfg(not(ipc))]
198 {
199 self.recv.recv_deadline_blocking(deadline)
200 }
201 }
202
203 pub fn iter(&mut self) -> impl Iterator<Item = T> {
205 #[cfg(ipc)]
206 {
207 std::iter::from_fn(|| self.recv_blocking().ok()).fuse()
208 }
209 #[cfg(not(ipc))]
210 {
211 self.recv.iter()
212 }
213 }
214
215 pub fn try_recv(&mut self) -> Result<Option<T>, ChannelError> {
217 #[cfg(ipc)]
218 {
219 let recv = match self.recv.take() {
220 Some(r) => r,
221 None => return Err(ChannelError::disconnected()),
222 };
223 match recv.try_recv() {
224 Ok(r) => {
225 self.recv = Some(recv);
226 Ok(Some(r))
227 }
228 Err(e) => match e {
229 ipc_channel::ipc::TryRecvError::IpcError(e) => Err(ChannelError::disconnected_by(e)),
230 ipc_channel::ipc::TryRecvError::Empty => Ok(None),
231 },
232 }
233 }
234 #[cfg(not(ipc))]
235 {
236 self.recv.try_recv()
237 }
238 }
239
240 pub fn try_iter(&mut self) -> impl Iterator<Item = T> {
242 #[cfg(ipc)]
243 {
244 std::iter::from_fn(|| self.try_recv().ok().flatten()).fuse()
245 }
246 #[cfg(not(ipc))]
247 {
248 self.recv.try_iter()
249 }
250 }
251}
252impl<T: IpcValue> Serialize for IpcReceiver<T> {
253 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
254 where
255 S: serde::Serializer,
256 {
257 #[cfg(ipc)]
258 {
259 if !crate::channel::is_ipc_serialization() {
260 return Err(serde::ser::Error::custom("cannot serialize `IpcReceiver` outside IPC"));
261 }
262 self.recv.serialize(serializer)
263 }
264 #[cfg(not(ipc))]
265 {
266 let _ = serializer;
267 Err(serde::ser::Error::custom("cannot serialize `IpcReceiver` outside IPC"))
268 }
269 }
270}
271impl<'de, T: IpcValue> Deserialize<'de> for IpcReceiver<T> {
272 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
273 where
274 D: serde::Deserializer<'de>,
275 {
276 #[cfg(ipc)]
277 {
278 Ok(Self {
279 recv: Option::<ipc_channel::ipc::IpcReceiver<T>>::deserialize(deserializer)?,
280 })
281 }
282 #[cfg(not(ipc))]
283 {
284 let _ = deserializer;
285 Err(serde::de::Error::custom("cannot deserialize `IpcReceiver` outside IPC"))
286 }
287 }
288}
289
290pub fn ipc_unbounded<T: IpcValue>() -> io::Result<(IpcSender<T>, IpcReceiver<T>)> {
300 #[cfg(ipc)]
301 {
302 let (s, r) = ipc_channel::ipc::channel()?;
303 Ok((IpcSender { sender: Some(s) }, IpcReceiver { recv: Some(r) }))
304 }
305 #[cfg(not(ipc))]
306 {
307 let (sender, recv) = super::unbounded();
308 Ok((IpcSender { sender }, IpcReceiver { recv }))
309 }
310}
311
/// A named receiver that is waiting for an `IpcSender` to connect using its name.
///
/// The connected `IpcReceiver` is obtained by calling one of the `connect*` methods.
pub struct NamedIpcReceiver<T: IpcValue> {
    // One-shot server that accepts the `IpcReceiver<T>` sent by the connecting side.
    #[cfg(ipc)]
    server: ipc_channel::ipc::IpcOneShotServer<IpcReceiver<T>>,
    // Name generated together with the one-shot server.
    #[cfg(ipc)]
    name: String,

    // In-process fallback used when `ipc` is not enabled.
    #[cfg(not(ipc))]
    inner: named_channel_fallback::NamedReceiver<T>,
}
327impl<T: IpcValue> fmt::Debug for NamedIpcReceiver<T> {
328 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
329 f.debug_struct("NamedIpcReceiver")
330 .field("name", &self.name())
331 .finish_non_exhaustive()
332 }
333}
334impl<T: IpcValue> NamedIpcReceiver<T> {
335 pub fn new() -> io::Result<Self> {
337 #[cfg(ipc)]
338 {
339 let (server, name) = ipc_channel::ipc::IpcOneShotServer::new()?;
340 Ok(Self { server, name })
341 }
342 #[cfg(not(ipc))]
343 {
344 Ok(Self {
345 inner: named_channel_fallback::NamedReceiver::new(),
346 })
347 }
348 }
349
350 pub fn name(&self) -> &str {
354 #[cfg(ipc)]
355 {
356 &self.name
357 }
358 #[cfg(not(ipc))]
359 {
360 self.inner.name()
361 }
362 }
363
364 pub async fn connect(self) -> Result<IpcReceiver<T>, ChannelError> {
366 blocking::unblock(move || self.connect_blocking()).await
367 }
368
369 pub async fn connect_deadline(self, deadline: impl Into<Deadline>) -> Result<IpcReceiver<T>, ChannelError> {
371 match crate::with_deadline(self.connect(), deadline).await {
372 Ok(r) => r,
373 Err(_) => Err(ChannelError::Timeout),
374 }
375 }
376
377 pub fn connect_blocking(self) -> Result<IpcReceiver<T>, ChannelError> {
379 #[cfg(ipc)]
380 {
381 let (_, recv) = self.server.accept().map_err(ChannelError::disconnected_by)?;
382 Ok(recv)
383 }
384 #[cfg(not(ipc))]
385 {
386 self.inner.connect_blocking()
387 }
388 }
389
390 pub fn connect_deadline_blocking(self, deadline: impl Into<Deadline>) -> Result<IpcReceiver<T>, ChannelError> {
392 crate::block_on(self.connect_deadline(deadline))
393 }
394}
395impl<T: IpcValue> IpcSender<T> {
396 pub fn connect(ipc_receiver_name: impl Into<String>) -> Result<Self, ChannelError> {
400 Self::connect_impl(ipc_receiver_name.into())
401 }
402 #[cfg(ipc)]
403 fn connect_impl(ipc_receiver_name: String) -> Result<Self, ChannelError> {
404 let sender = ipc_channel::ipc::IpcSender::<IpcReceiver<T>>::connect(ipc_receiver_name).map_err(ChannelError::disconnected_by)?;
405 let (s, r) = ipc_unbounded().map_err(ChannelError::disconnected_by)?;
406 crate::channel::with_ipc_serialization(|| sender.send(r)).map_err(ChannelError::disconnected_by)?;
407 Ok(s)
408 }
409 #[cfg(not(ipc))]
410 fn connect_impl(ipc_receiver_name: String) -> Result<Self, ChannelError> {
411 named_channel_fallback::sender_connect_blocking(&ipc_receiver_name)
412 }
413}
414
/// A named sender that is waiting for an `IpcReceiver` to connect using its name.
///
/// The connected `IpcSender` is obtained by calling one of the `connect*` methods.
pub struct NamedIpcSender<T: IpcValue> {
    // One-shot server that accepts the `IpcSender<T>` sent by the connecting side.
    #[cfg(ipc)]
    server: ipc_channel::ipc::IpcOneShotServer<IpcSender<T>>,
    // Name generated together with the one-shot server.
    #[cfg(ipc)]
    name: String,
    // In-process fallback used when `ipc` is not enabled.
    #[cfg(not(ipc))]
    inner: named_channel_fallback::NamedSender<T>,
}
429impl<T: IpcValue> fmt::Debug for NamedIpcSender<T> {
430 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
431 f.debug_struct("NamedIpcSender").field("name", &self.name()).finish_non_exhaustive()
432 }
433}
434impl<T: IpcValue> NamedIpcSender<T> {
435 pub fn new() -> io::Result<Self> {
437 #[cfg(ipc)]
438 {
439 let (server, name) = ipc_channel::ipc::IpcOneShotServer::new()?;
440 Ok(Self { server, name })
441 }
442 #[cfg(not(ipc))]
443 {
444 Ok(Self {
445 inner: named_channel_fallback::NamedSender::new(),
446 })
447 }
448 }
449
450 pub fn name(&self) -> &str {
454 #[cfg(ipc)]
455 {
456 &self.name
457 }
458 #[cfg(not(ipc))]
459 {
460 self.inner.name()
461 }
462 }
463
464 pub async fn connect(self) -> Result<IpcSender<T>, ChannelError> {
466 blocking::unblock(move || self.connect_blocking()).await
467 }
468
469 pub async fn connect_deadline(self, deadline: impl Into<Deadline>) -> Result<IpcSender<T>, ChannelError> {
471 match crate::with_deadline(self.connect(), deadline).await {
472 Ok(r) => r,
473 Err(_) => Err(ChannelError::Timeout),
474 }
475 }
476
477 pub fn connect_blocking(self) -> Result<IpcSender<T>, ChannelError> {
479 #[cfg(ipc)]
480 {
481 let (_, sender) = self.server.accept().map_err(ChannelError::disconnected_by)?;
482 Ok(sender)
483 }
484 #[cfg(not(ipc))]
485 {
486 self.inner.connect_blocking()
487 }
488 }
489
490 pub fn connect_deadline_blocking(self, deadline: impl Into<Deadline>) -> Result<IpcSender<T>, ChannelError> {
492 crate::block_on(self.connect_deadline(deadline))
493 }
494}
495impl<T: IpcValue> IpcReceiver<T> {
496 pub fn connect(ipc_sender_name: impl Into<String>) -> Result<Self, ChannelError> {
500 Self::connect_impl(ipc_sender_name.into())
501 }
502 #[cfg(ipc)]
503 fn connect_impl(ipc_sender_name: String) -> Result<Self, ChannelError> {
504 let sender = ipc_channel::ipc::IpcSender::<IpcSender<T>>::connect(ipc_sender_name).map_err(ChannelError::disconnected_by)?;
505 let (s, r) = ipc_unbounded().map_err(ChannelError::disconnected_by)?;
506 crate::channel::with_ipc_serialization(|| sender.send(s)).map_err(ChannelError::disconnected_by)?;
507 Ok(r)
508 }
509 #[cfg(not(ipc))]
510 fn connect_impl(ipc_sender_name: String) -> Result<Self, ChannelError> {
511 named_channel_fallback::receiver_connect_blocking(&ipc_sender_name)
512 }
513}
514
/// Marker trait for values that can be sent through [`IpcSender`] and [`IpcReceiver`].
///
/// It is never implemented manually: the blanket implementation below covers every type that is
/// `Serialize`, `Deserialize` for any lifetime, `Send` and `'static`.
#[diagnostic::on_unimplemented(note = "`IpcValue` is implemented for all `T: Serialize + Deserialize + Send + 'static`")]
pub trait IpcValue: serde::Serialize + for<'d> serde::de::Deserialize<'d> + Send + 'static {}
// Blanket implementation for every type that satisfies the supertrait bounds.
impl<T: serde::Serialize + for<'d> serde::de::Deserialize<'d> + Send + 'static> IpcValue for T {}
530
/// Converts an `ipc_channel` error into a disconnected [`ChannelError`].
///
/// A plain `Disconnected` maps to the cause-less disconnected error, any other error is kept
/// as the cause.
#[cfg(ipc)]
impl From<ipc_channel::ipc::IpcError> for ChannelError {
    fn from(value: ipc_channel::ipc::IpcError) -> Self {
        if matches!(value, ipc_channel::ipc::IpcError::Disconnected) {
            ChannelError::disconnected()
        } else {
            ChannelError::disconnected_by(value)
        }
    }
}
/// Converts an `ipc_channel` try-receive error into a [`ChannelError`].
///
/// `Empty` maps to [`ChannelError::Timeout`], a plain disconnect maps to the cause-less
/// disconnected error and any other error is kept as the cause of a disconnected error.
#[cfg(ipc)]
impl From<ipc_channel::ipc::TryRecvError> for ChannelError {
    fn from(value: ipc_channel::ipc::TryRecvError) -> Self {
        use ipc_channel::ipc::{IpcError, TryRecvError};

        match value {
            TryRecvError::Empty => ChannelError::Timeout,
            TryRecvError::IpcError(IpcError::Disconnected) => ChannelError::disconnected(),
            other => ChannelError::disconnected_by(other),
        }
    }
}
550
551#[cfg(not(ipc))]
552mod named_channel_fallback {
553 use std::{
554 any::Any,
555 collections::HashMap,
556 error::Error,
557 fmt, mem,
558 sync::{Arc, Weak, atomic::AtomicU64},
559 };
560
561 use parking_lot::Mutex;
562 use zng_txt::{Txt, formatx};
563
564 use crate::channel::{ChannelError, IpcReceiver, IpcSender, IpcValue, Receiver, Sender, ipc_unbounded, rendezvous};
565
566 static NAME_COUNT: AtomicU64 = AtomicU64::new(0);
567
568 type P = (Mutex<Box<dyn Any + Send>>, Sender<()>);
569 static PENDING: Mutex<Option<HashMap<Txt, Weak<P>>>> = Mutex::new(None);
570
571 pub struct NamedSender<T: IpcValue> {
572 sender: IpcSender<T>,
573 name: Txt,
574 pending_entry: Arc<P>,
575 sig_recv: Receiver<()>,
576 }
577 impl<T: IpcValue> NamedSender<T> {
578 pub fn new() -> Self {
579 let i = NAME_COUNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
580 let name = formatx!("<not-ipc-{}-{i}>", std::process::id());
581
582 let (sender, receiver) = ipc_unbounded::<T>().unwrap();
583 let (sig_sender, sig_recv) = rendezvous();
584
585 let s: Box<dyn Any + Send> = Box::new(receiver);
586 let pending_entry = Arc::new((Mutex::new(s), sig_sender));
587 PENDING
588 .lock()
589 .get_or_insert_default()
590 .insert(name.clone(), Arc::downgrade(&pending_entry));
591
592 Self {
593 sender,
594 name,
595 pending_entry,
596 sig_recv,
597 }
598 }
599
600 pub fn name(&self) -> &str {
601 &self.name
602 }
603
604 pub fn connect_blocking(self) -> Result<IpcSender<T>, ChannelError> {
605 self.sig_recv.recv_blocking()?;
606 Ok(self.sender)
607 }
608 }
609
610 pub fn receiver_connect_blocking<T: IpcValue>(name: &str) -> Result<IpcReceiver<T>, ChannelError> {
611 let mut p = PENDING.lock();
612 let p = p.get_or_insert_default();
613 p.retain(|_, v| v.strong_count() > 0);
614 match p.remove(name) {
615 Some(e) => match e.upgrade() {
616 Some(e) => {
617 let recv = mem::replace(&mut *e.0.lock(), Box::new(()));
618 e.1.send_blocking(());
619 match recv.downcast::<IpcReceiver<T>>() {
620 Ok(r) => Ok(*r),
621 Err(_) => Err(ChannelError::disconnected_by(TypeMismatchError)),
622 }
623 }
624 None => Err(ChannelError::disconnected()),
625 },
626 None => Err(ChannelError::disconnected()),
627 }
628 }
629
630 pub struct NamedReceiver<T: IpcValue> {
631 receiver: IpcReceiver<T>,
632 name: Txt,
633 pending_entry: Arc<P>,
634 sig_recv: Receiver<()>,
635 }
636 impl<T: IpcValue> NamedReceiver<T> {
637 pub fn new() -> Self {
638 let i = NAME_COUNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
639 let name = formatx!("<not-ipc-{}-{i}>", std::process::id());
640
641 let (sender, receiver) = ipc_unbounded::<T>().unwrap();
642 let (sig_sender, sig_recv) = rendezvous();
643
644 let s: Box<dyn Any + Send> = Box::new(sender);
645 let pending_entry = Arc::new((Mutex::new(s), sig_sender));
646 PENDING
647 .lock()
648 .get_or_insert_default()
649 .insert(name.clone(), Arc::downgrade(&pending_entry));
650
651 Self {
652 receiver,
653 name,
654 pending_entry,
655 sig_recv,
656 }
657 }
658
659 pub fn name(&self) -> &str {
660 &self.name
661 }
662
663 pub fn connect_blocking(self) -> Result<IpcReceiver<T>, ChannelError> {
664 self.sig_recv.recv_blocking()?;
665 Ok(self.receiver)
666 }
667 }
668
669 pub fn sender_connect_blocking<T: IpcValue>(name: &str) -> Result<IpcSender<T>, ChannelError> {
670 let mut p = PENDING.lock();
671 let p = p.get_or_insert_default();
672 p.retain(|_, v| v.strong_count() > 0);
673 match p.remove(name) {
674 Some(e) => match e.upgrade() {
675 Some(e) => {
676 let recv = mem::replace(&mut *e.0.lock(), Box::new(()));
677 e.1.send(());
678 match recv.downcast::<IpcSender<T>>() {
679 Ok(r) => Ok(*r),
680 Err(_) => Err(ChannelError::disconnected_by(TypeMismatchError)),
681 }
682 }
683 None => Err(ChannelError::disconnected()),
684 },
685 None => Err(ChannelError::disconnected()),
686 }
687 }
688
689 #[derive(Debug)]
690 struct TypeMismatchError;
691 impl fmt::Display for TypeMismatchError {
692 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
693 write!(f, "named channel type does not match")
694 }
695 }
696 impl Error for TypeMismatchError {}
697}