zng_task/channel/
ipc.rs

1#![cfg_attr(not(ipc), allow(unused))]
2
3use std::{fmt, io};
4
5use serde::{Deserialize, Serialize};
6use zng_time::Deadline;
7
8use crate::channel::ChannelError;
9
/// The transmitting end of an IPC channel.
///
/// Use [`ipc_unbounded`] to declare a new channel.
pub struct IpcSender<T> {
    // IPC endpoint, `send_blocking` takes it out for each send and only puts it back
    // when the send succeeds, so it is `None` after a send error.
    #[cfg(ipc)]
    sender: Option<ipc_channel::ipc::IpcSender<T>>,
    // Endpoint of the in-process channel used when not built with `cfg(ipc)`.
    #[cfg(not(ipc))]
    sender: super::Sender<T>,
}
19impl<T: IpcValue> fmt::Debug for IpcSender<T> {
20    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
21        f.debug_struct("IpcSender").finish_non_exhaustive()
22    }
23}
24impl<T: IpcValue> Clone for IpcSender<T> {
25    fn clone(&self) -> Self {
26        Self {
27            sender: self.sender.clone(),
28        }
29    }
30}
31impl<T: IpcValue> IpcSender<T> {
32    /// Send a value into the channel.
33    ///
34    /// IPC channels are unbounded, this never blocks in the current release.
35    pub fn send_blocking(&mut self, msg: T) -> Result<(), ChannelError> {
36        #[cfg(ipc)]
37        {
38            let sender = match self.sender.take() {
39                Some(s) => s,
40                None => return Err(ChannelError::disconnected()),
41            };
42            let r = crate::channel::with_ipc_serialization(|| sender.send(msg).map_err(ChannelError::disconnected_by));
43            if r.is_ok() {
44                self.sender = Some(sender);
45            }
46            r
47        }
48        #[cfg(not(ipc))]
49        {
50            self.sender.send_blocking(msg)
51        }
52    }
53}
54impl<T: IpcValue> Serialize for IpcSender<T> {
55    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
56    where
57        S: serde::Serializer,
58    {
59        #[cfg(ipc)]
60        {
61            if !crate::channel::is_ipc_serialization() {
62                return Err(serde::ser::Error::custom("cannot serialize `IpcSender` outside IPC"));
63            }
64            self.sender.serialize(serializer)
65        }
66        #[cfg(not(ipc))]
67        {
68            let _ = serializer;
69            Err(serde::ser::Error::custom("cannot serialize `IpcSender` outside IPC"))
70        }
71    }
72}
73impl<'de, T: IpcValue> Deserialize<'de> for IpcSender<T> {
74    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
75    where
76        D: serde::Deserializer<'de>,
77    {
78        #[cfg(ipc)]
79        {
80            Ok(Self {
81                sender: Option::<ipc_channel::ipc::IpcSender<T>>::deserialize(deserializer)?,
82            })
83        }
84        #[cfg(not(ipc))]
85        {
86            let _ = deserializer;
87            Err(serde::de::Error::custom("cannot deserialize `IpcSender` outside IPC"))
88        }
89    }
90}
91
/// The receiving end of an IPC channel.
///
/// Use [`ipc_unbounded`] to declare a new channel.
pub struct IpcReceiver<T> {
    // IPC endpoint, the receive methods take it out while receiving, calls made
    // while it is `None` return a disconnected error.
    #[cfg(ipc)]
    recv: Option<ipc_channel::ipc::IpcReceiver<T>>,
    // Endpoint of the in-process channel used when not built with `cfg(ipc)`.
    #[cfg(not(ipc))]
    recv: super::Receiver<T>,
}
101impl<T: IpcValue> fmt::Debug for IpcReceiver<T> {
102    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
103        f.debug_struct("IpcReceiver").finish_non_exhaustive()
104    }
105}
106impl<T: IpcValue> IpcReceiver<T> {
107    /// Wait for an incoming value from the channel associated with this receiver.
108    ///
109    /// Returns an error if all senders have been dropped.
110    pub async fn recv(&mut self) -> Result<T, ChannelError> {
111        #[cfg(ipc)]
112        {
113            let recv = match self.recv.take() {
114                Some(r) => r,
115                None => return Err(ChannelError::disconnected()),
116            };
117            let (recv, r) = blocking::unblock(move || {
118                let r = recv.recv();
119                (recv, r)
120            })
121            .await;
122            let r = r?;
123            self.recv = Some(recv);
124            Ok(r)
125        }
126        #[cfg(not(ipc))]
127        {
128            self.recv.recv().await
129        }
130    }
131
132    /// Block for an incoming value from the channel associated with this receiver.
133    ///
134    /// Returns an error if all senders have been dropped or the `deadline` is reached.
135    pub async fn recv_deadline(&mut self, deadline: impl Into<Deadline>) -> Result<T, ChannelError> {
136        #[cfg(ipc)]
137        {
138            match crate::with_deadline(self.recv(), deadline).await {
139                Ok(r) => r,
140                Err(_) => Err(ChannelError::Timeout),
141            }
142        }
143        #[cfg(not(ipc))]
144        {
145            self.recv.recv_deadline(deadline).await
146        }
147    }
148
149    /// Block for an incoming value from the channel associated with this receiver.
150    pub fn recv_blocking(&mut self) -> Result<T, ChannelError> {
151        #[cfg(ipc)]
152        {
153            let recv = match self.recv.take() {
154                Some(r) => r,
155                None => return Err(ChannelError::disconnected()),
156            };
157            let r = recv.recv()?;
158            self.recv = Some(recv);
159            Ok(r)
160        }
161        #[cfg(not(ipc))]
162        {
163            self.recv.recv_blocking()
164        }
165    }
166
167    /// Block for an incoming value from the channel associated with this receiver.
168    ///
169    /// Returns an error if all senders have been dropped or the `deadline` is reached.
170    pub fn recv_deadline_blocking(&mut self, deadline: impl Into<Deadline>) -> Result<T, ChannelError> {
171        #[cfg(ipc)]
172        {
173            let recv = match self.recv.take() {
174                Some(r) => r,
175                None => return Err(ChannelError::disconnected()),
176            };
177            match deadline.into().time_left() {
178                Some(d) => match recv.try_recv_timeout(d) {
179                    Ok(r) => {
180                        self.recv = Some(recv);
181                        Ok(r)
182                    }
183                    Err(e) => match e {
184                        ipc_channel::ipc::TryRecvError::IpcError(e) => Err(ChannelError::disconnected_by(e)),
185                        ipc_channel::ipc::TryRecvError::Empty => {
186                            self.recv = Some(recv);
187                            Err(ChannelError::Timeout)
188                        }
189                    },
190                },
191                None => {
192                    self.recv = Some(recv);
193                    Err(ChannelError::Timeout)
194                }
195            }
196        }
197        #[cfg(not(ipc))]
198        {
199            self.recv.recv_deadline_blocking(deadline)
200        }
201    }
202
203    /// Create a blocking iterator that receives until a channel error.
204    pub fn iter(&mut self) -> impl Iterator<Item = T> {
205        #[cfg(ipc)]
206        {
207            std::iter::from_fn(|| self.recv_blocking().ok()).fuse()
208        }
209        #[cfg(not(ipc))]
210        {
211            self.recv.iter()
212        }
213    }
214
215    /// Returns the next incoming message in the channel or `None`.
216    pub fn try_recv(&mut self) -> Result<Option<T>, ChannelError> {
217        #[cfg(ipc)]
218        {
219            let recv = match self.recv.take() {
220                Some(r) => r,
221                None => return Err(ChannelError::disconnected()),
222            };
223            match recv.try_recv() {
224                Ok(r) => {
225                    self.recv = Some(recv);
226                    Ok(Some(r))
227                }
228                Err(e) => match e {
229                    ipc_channel::ipc::TryRecvError::IpcError(e) => Err(ChannelError::disconnected_by(e)),
230                    ipc_channel::ipc::TryRecvError::Empty => Ok(None),
231                },
232            }
233        }
234        #[cfg(not(ipc))]
235        {
236            self.recv.try_recv()
237        }
238    }
239
240    /// Iterate over all the pending incoming messages in the channel, until the channel is empty or error.
241    pub fn try_iter(&mut self) -> impl Iterator<Item = T> {
242        #[cfg(ipc)]
243        {
244            std::iter::from_fn(|| self.try_recv().ok().flatten()).fuse()
245        }
246        #[cfg(not(ipc))]
247        {
248            self.recv.try_iter()
249        }
250    }
251}
252impl<T: IpcValue> Serialize for IpcReceiver<T> {
253    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
254    where
255        S: serde::Serializer,
256    {
257        #[cfg(ipc)]
258        {
259            if !crate::channel::is_ipc_serialization() {
260                return Err(serde::ser::Error::custom("cannot serialize `IpcReceiver` outside IPC"));
261            }
262            self.recv.serialize(serializer)
263        }
264        #[cfg(not(ipc))]
265        {
266            let _ = serializer;
267            Err(serde::ser::Error::custom("cannot serialize `IpcReceiver` outside IPC"))
268        }
269    }
270}
271impl<'de, T: IpcValue> Deserialize<'de> for IpcReceiver<T> {
272    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
273    where
274        D: serde::Deserializer<'de>,
275    {
276        #[cfg(ipc)]
277        {
278            Ok(Self {
279                recv: Option::<ipc_channel::ipc::IpcReceiver<T>>::deserialize(deserializer)?,
280            })
281        }
282        #[cfg(not(ipc))]
283        {
284            let _ = deserializer;
285            Err(serde::de::Error::custom("cannot deserialize `IpcReceiver` outside IPC"))
286        }
287    }
288}
289
290/// Create an unbounded IPC channel.
291///
292/// Note that the channel endpoints can also be send over IPC, the first channel is setup by [`Worker`]. You
293/// can also use the [`NamedIpcReceiver`] or [`NamedIpcSender`] to create the first channel with a custom process.
294///
295/// Note that the channel is only IPC if build with `"ipc"` crate feature, otherwise it will falls back to [`channel::unbounded`].
296///
297/// [`channel::unbounded`]: crate::channel::unbounded
298/// [`Worker`]: crate::process::worker::Worker
299pub fn ipc_unbounded<T: IpcValue>() -> io::Result<(IpcSender<T>, IpcReceiver<T>)> {
300    #[cfg(ipc)]
301    {
302        let (s, r) = ipc_channel::ipc::channel()?;
303        Ok((IpcSender { sender: Some(s) }, IpcReceiver { recv: Some(r) }))
304    }
305    #[cfg(not(ipc))]
306    {
307        let (sender, recv) = super::unbounded();
308        Ok((IpcSender { sender }, IpcReceiver { recv }))
309    }
310}
311
/// Init named IPC connection with another process, the receiver end is in the first process.
///
/// Note that this is less efficient than [`ipc_unbounded`], it is only recommended for creating the first channel,
/// you can send other channels using the first channel.
///
/// See also [`NamedIpcSender`].
pub struct NamedIpcReceiver<T: IpcValue> {
    // One-shot server that accepts the `IpcReceiver<T>` sent by `IpcSender::connect`.
    #[cfg(ipc)]
    server: ipc_channel::ipc::IpcOneShotServer<IpcReceiver<T>>,
    // Name generated together with the server, returned by `name`.
    #[cfg(ipc)]
    name: String,

    // In-process implementation used when not built with `cfg(ipc)`.
    #[cfg(not(ipc))]
    inner: named_channel_fallback::NamedReceiver<T>,
}
327impl<T: IpcValue> fmt::Debug for NamedIpcReceiver<T> {
328    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
329        f.debug_struct("NamedIpcReceiver")
330            .field("name", &self.name())
331            .finish_non_exhaustive()
332    }
333}
334impl<T: IpcValue> NamedIpcReceiver<T> {
335    /// New initial IPC connection.
336    pub fn new() -> io::Result<Self> {
337        #[cfg(ipc)]
338        {
339            let (server, name) = ipc_channel::ipc::IpcOneShotServer::new()?;
340            Ok(Self { server, name })
341        }
342        #[cfg(not(ipc))]
343        {
344            Ok(Self {
345                inner: named_channel_fallback::NamedReceiver::new(),
346            })
347        }
348    }
349
350    /// Unique name that must be used by the other process to [`IpcSender::connect`].
351    ///
352    /// You can share the name with the other process using a command argument or environment variable.
353    pub fn name(&self) -> &str {
354        #[cfg(ipc)]
355        {
356            &self.name
357        }
358        #[cfg(not(ipc))]
359        {
360            self.inner.name()
361        }
362    }
363
364    /// Await until other process connects.
365    pub async fn connect(self) -> Result<IpcReceiver<T>, ChannelError> {
366        blocking::unblock(move || self.connect_blocking()).await
367    }
368
369    /// Await until other process connects or `deadline` elapses.
370    pub async fn connect_deadline(self, deadline: impl Into<Deadline>) -> Result<IpcReceiver<T>, ChannelError> {
371        match crate::with_deadline(self.connect(), deadline).await {
372            Ok(r) => r,
373            Err(_) => Err(ChannelError::Timeout),
374        }
375    }
376
377    /// Blocks until other process connects.
378    pub fn connect_blocking(self) -> Result<IpcReceiver<T>, ChannelError> {
379        #[cfg(ipc)]
380        {
381            let (_, recv) = self.server.accept().map_err(ChannelError::disconnected_by)?;
382            Ok(recv)
383        }
384        #[cfg(not(ipc))]
385        {
386            self.inner.connect_blocking()
387        }
388    }
389
390    /// Blocks until other process connects or `deadline` elapses.
391    pub fn connect_deadline_blocking(self, deadline: impl Into<Deadline>) -> Result<IpcReceiver<T>, ChannelError> {
392        crate::block_on(self.connect_deadline(deadline))
393    }
394}
395impl<T: IpcValue> IpcSender<T> {
396    /// Connect with a named receiver created in another process with [`NamedIpcReceiver`].
397    ///
398    /// This must only be called once for the `ipc_receiver_name`.
399    pub fn connect(ipc_receiver_name: impl Into<String>) -> Result<Self, ChannelError> {
400        Self::connect_impl(ipc_receiver_name.into())
401    }
402    #[cfg(ipc)]
403    fn connect_impl(ipc_receiver_name: String) -> Result<Self, ChannelError> {
404        let sender = ipc_channel::ipc::IpcSender::<IpcReceiver<T>>::connect(ipc_receiver_name).map_err(ChannelError::disconnected_by)?;
405        let (s, r) = ipc_unbounded().map_err(ChannelError::disconnected_by)?;
406        crate::channel::with_ipc_serialization(|| sender.send(r)).map_err(ChannelError::disconnected_by)?;
407        Ok(s)
408    }
409    #[cfg(not(ipc))]
410    fn connect_impl(ipc_receiver_name: String) -> Result<Self, ChannelError> {
411        named_channel_fallback::sender_connect_blocking(&ipc_receiver_name)
412    }
413}
414
/// Init named IPC connection with another process, the sender end is in the first process.
///
/// Note that this is less efficient than [`ipc_unbounded`], it is only recommended for creating the first channel,
/// you can send other channels using the first channel.
///
/// See also [`NamedIpcReceiver`].
pub struct NamedIpcSender<T: IpcValue> {
    // One-shot server that accepts the `IpcSender<T>` sent by `IpcReceiver::connect`.
    #[cfg(ipc)]
    server: ipc_channel::ipc::IpcOneShotServer<IpcSender<T>>,
    // Name generated together with the server, returned by `name`.
    #[cfg(ipc)]
    name: String,
    // In-process implementation used when not built with `cfg(ipc)`.
    #[cfg(not(ipc))]
    inner: named_channel_fallback::NamedSender<T>,
}
429impl<T: IpcValue> fmt::Debug for NamedIpcSender<T> {
430    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
431        f.debug_struct("NamedIpcSender").field("name", &self.name()).finish_non_exhaustive()
432    }
433}
434impl<T: IpcValue> NamedIpcSender<T> {
435    /// New initial IPC connection.
436    pub fn new() -> io::Result<Self> {
437        #[cfg(ipc)]
438        {
439            let (server, name) = ipc_channel::ipc::IpcOneShotServer::new()?;
440            Ok(Self { server, name })
441        }
442        #[cfg(not(ipc))]
443        {
444            Ok(Self {
445                inner: named_channel_fallback::NamedSender::new(),
446            })
447        }
448    }
449
450    /// Unique name that must be used by the other process to [`IpcReceiver::connect`].
451    ///
452    /// You can share the name with the other process using a command argument or environment variable.
453    pub fn name(&self) -> &str {
454        #[cfg(ipc)]
455        {
456            &self.name
457        }
458        #[cfg(not(ipc))]
459        {
460            self.inner.name()
461        }
462    }
463
464    /// Await until other process connects.
465    pub async fn connect(self) -> Result<IpcSender<T>, ChannelError> {
466        blocking::unblock(move || self.connect_blocking()).await
467    }
468
469    /// Await until other process connects or `deadline` elapses.
470    pub async fn connect_deadline(self, deadline: impl Into<Deadline>) -> Result<IpcSender<T>, ChannelError> {
471        match crate::with_deadline(self.connect(), deadline).await {
472            Ok(r) => r,
473            Err(_) => Err(ChannelError::Timeout),
474        }
475    }
476
477    /// Blocks until other process connects.
478    pub fn connect_blocking(self) -> Result<IpcSender<T>, ChannelError> {
479        #[cfg(ipc)]
480        {
481            let (_, sender) = self.server.accept().map_err(ChannelError::disconnected_by)?;
482            Ok(sender)
483        }
484        #[cfg(not(ipc))]
485        {
486            self.inner.connect_blocking()
487        }
488    }
489
490    /// Blocks until other process connects or `deadline` elapses.
491    pub fn connect_deadline_blocking(self, deadline: impl Into<Deadline>) -> Result<IpcSender<T>, ChannelError> {
492        crate::block_on(self.connect_deadline(deadline))
493    }
494}
495impl<T: IpcValue> IpcReceiver<T> {
496    /// Connect with a named sender created in another process with [`NamedIpcSender`].
497    ///
498    /// This must only be called once for the `ipc_sender_name`.
499    pub fn connect(ipc_sender_name: impl Into<String>) -> Result<Self, ChannelError> {
500        Self::connect_impl(ipc_sender_name.into())
501    }
502    #[cfg(ipc)]
503    fn connect_impl(ipc_sender_name: String) -> Result<Self, ChannelError> {
504        let sender = ipc_channel::ipc::IpcSender::<IpcSender<T>>::connect(ipc_sender_name).map_err(ChannelError::disconnected_by)?;
505        let (s, r) = ipc_unbounded().map_err(ChannelError::disconnected_by)?;
506        crate::channel::with_ipc_serialization(|| sender.send(s)).map_err(ChannelError::disconnected_by)?;
507        Ok(r)
508    }
509    #[cfg(not(ipc))]
510    fn connect_impl(ipc_sender_name: String) -> Result<Self, ChannelError> {
511        named_channel_fallback::receiver_connect_blocking(&ipc_sender_name)
512    }
513}
514
515/// Represents a type that can be an input and output of IPC channels.
516///
517/// # Trait Alias
518///
519/// This trait is used like a type alias for traits and is
520/// already implemented for all types it applies to.
521///
522/// # Implementing
523///
524/// Types need to be `serde::Serialize + serde::de::Deserialize + Send + 'static` to auto-implement this trait,
525/// if you want to send an external type in that does not implement all the traits
526/// you may need to declare a *newtype* wrapper.
527#[diagnostic::on_unimplemented(note = "`IpcValue` is implemented for all `T: Serialize + Deserialize + Send + 'static`")]
528pub trait IpcValue: serde::Serialize + for<'d> serde::de::Deserialize<'d> + Send + 'static {}
529impl<T: serde::Serialize + for<'d> serde::de::Deserialize<'d> + Send + 'static> IpcValue for T {}
530
#[cfg(ipc)]
impl From<ipc_channel::ipc::IpcError> for ChannelError {
    fn from(value: ipc_channel::ipc::IpcError) -> Self {
        // a plain disconnection has no source error, any other error is kept as the cause
        if let ipc_channel::ipc::IpcError::Disconnected = value {
            ChannelError::disconnected()
        } else {
            ChannelError::disconnected_by(value)
        }
    }
}
#[cfg(ipc)]
impl From<ipc_channel::ipc::TryRecvError> for ChannelError {
    fn from(value: ipc_channel::ipc::TryRecvError) -> Self {
        use ipc_channel::ipc::{IpcError, TryRecvError};

        match value {
            // nothing to receive is reported as a timeout
            TryRecvError::Empty => ChannelError::Timeout,
            TryRecvError::IpcError(IpcError::Disconnected) => ChannelError::disconnected(),
            // any other error is kept as the cause of the disconnection
            other => ChannelError::disconnected_by(other),
        }
    }
}
550
551#[cfg(not(ipc))]
552mod named_channel_fallback {
553    use std::{
554        any::Any,
555        collections::HashMap,
556        error::Error,
557        fmt, mem,
558        sync::{Arc, Weak, atomic::AtomicU64},
559    };
560
561    use parking_lot::Mutex;
562    use zng_txt::{Txt, formatx};
563
564    use crate::channel::{ChannelError, IpcReceiver, IpcSender, IpcValue, Receiver, Sender, ipc_unbounded, rendezvous};
565
    // Counter used to generate a different name for each named endpoint created in this process.
    static NAME_COUNT: AtomicU64 = AtomicU64::new(0);

    // Pending connection entry: the type-erased endpoint that is handed to the connecting side and
    // the rendezvous sender used to signal the named endpoint that the connection happened.
    type P = (Mutex<Box<dyn Any + Send>>, Sender<()>);
    // Named endpoints waiting for a connection, keyed by name. Entries are weak, the
    // named endpoint holds the strong reference.
    static PENDING: Mutex<Option<HashMap<Txt, Weak<P>>>> = Mutex::new(None);
570
    // In-process stand-in for the named sender, used when not built with `cfg(ipc)`.
    pub struct NamedSender<T: IpcValue> {
        // Sender end returned by `connect_blocking`.
        sender: IpcSender<T>,
        // Name registered in `PENDING`.
        name: Txt,
        // Strong reference to the entry registered in `PENDING`, the map only holds a weak reference.
        pending_entry: Arc<P>,
        // Receives the signal sent by `receiver_connect_blocking`.
        sig_recv: Receiver<()>,
    }
577    impl<T: IpcValue> NamedSender<T> {
578        pub fn new() -> Self {
579            let i = NAME_COUNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
580            let name = formatx!("<not-ipc-{}-{i}>", std::process::id());
581
582            let (sender, receiver) = ipc_unbounded::<T>().unwrap();
583            let (sig_sender, sig_recv) = rendezvous();
584
585            let s: Box<dyn Any + Send> = Box::new(receiver);
586            let pending_entry = Arc::new((Mutex::new(s), sig_sender));
587            PENDING
588                .lock()
589                .get_or_insert_default()
590                .insert(name.clone(), Arc::downgrade(&pending_entry));
591
592            Self {
593                sender,
594                name,
595                pending_entry,
596                sig_recv,
597            }
598        }
599
600        pub fn name(&self) -> &str {
601            &self.name
602        }
603
604        pub fn connect_blocking(self) -> Result<IpcSender<T>, ChannelError> {
605            self.sig_recv.recv_blocking()?;
606            Ok(self.sender)
607        }
608    }
609
610    pub fn receiver_connect_blocking<T: IpcValue>(name: &str) -> Result<IpcReceiver<T>, ChannelError> {
611        let mut p = PENDING.lock();
612        let p = p.get_or_insert_default();
613        p.retain(|_, v| v.strong_count() > 0);
614        match p.remove(name) {
615            Some(e) => match e.upgrade() {
616                Some(e) => {
617                    let recv = mem::replace(&mut *e.0.lock(), Box::new(()));
618                    e.1.send_blocking(());
619                    match recv.downcast::<IpcReceiver<T>>() {
620                        Ok(r) => Ok(*r),
621                        Err(_) => Err(ChannelError::disconnected_by(TypeMismatchError)),
622                    }
623                }
624                None => Err(ChannelError::disconnected()),
625            },
626            None => Err(ChannelError::disconnected()),
627        }
628    }
629
    // In-process stand-in for the named receiver, used when not built with `cfg(ipc)`.
    pub struct NamedReceiver<T: IpcValue> {
        // Receiver end returned by `connect_blocking`.
        receiver: IpcReceiver<T>,
        // Name registered in `PENDING`.
        name: Txt,
        // Strong reference to the entry registered in `PENDING`, the map only holds a weak reference.
        pending_entry: Arc<P>,
        // Receives the signal sent by `sender_connect_blocking`.
        sig_recv: Receiver<()>,
    }
636    impl<T: IpcValue> NamedReceiver<T> {
637        pub fn new() -> Self {
638            let i = NAME_COUNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
639            let name = formatx!("<not-ipc-{}-{i}>", std::process::id());
640
641            let (sender, receiver) = ipc_unbounded::<T>().unwrap();
642            let (sig_sender, sig_recv) = rendezvous();
643
644            let s: Box<dyn Any + Send> = Box::new(sender);
645            let pending_entry = Arc::new((Mutex::new(s), sig_sender));
646            PENDING
647                .lock()
648                .get_or_insert_default()
649                .insert(name.clone(), Arc::downgrade(&pending_entry));
650
651            Self {
652                receiver,
653                name,
654                pending_entry,
655                sig_recv,
656            }
657        }
658
659        pub fn name(&self) -> &str {
660            &self.name
661        }
662
663        pub fn connect_blocking(self) -> Result<IpcReceiver<T>, ChannelError> {
664            self.sig_recv.recv_blocking()?;
665            Ok(self.receiver)
666        }
667    }
668
669    pub fn sender_connect_blocking<T: IpcValue>(name: &str) -> Result<IpcSender<T>, ChannelError> {
670        let mut p = PENDING.lock();
671        let p = p.get_or_insert_default();
672        p.retain(|_, v| v.strong_count() > 0);
673        match p.remove(name) {
674            Some(e) => match e.upgrade() {
675                Some(e) => {
676                    let recv = mem::replace(&mut *e.0.lock(), Box::new(()));
677                    e.1.send(());
678                    match recv.downcast::<IpcSender<T>>() {
679                        Ok(r) => Ok(*r),
680                        Err(_) => Err(ChannelError::disconnected_by(TypeMismatchError)),
681                    }
682                }
683                None => Err(ChannelError::disconnected()),
684            },
685            None => Err(ChannelError::disconnected()),
686        }
687    }
688
689    #[derive(Debug)]
690    struct TypeMismatchError;
691    impl fmt::Display for TypeMismatchError {
692        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
693            write!(f, "named channel type does not match")
694        }
695    }
696    impl Error for TypeMismatchError {}
697}