tokio_unix_ipc/
typed_channel.rs

1use std::fmt;
2use std::io;
3use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
4use std::os::unix::net::UnixStream;
5use std::path::Path;
6
7use serde_::de::DeserializeOwned;
8use serde_::Serialize;
9
10use crate::raw_channel::{raw_channel, RawReceiver, RawSender};
11use crate::serde::{deserialize, serialize};
12
13/// A typed receiver.
14pub struct Receiver<T> {
15    raw_receiver: RawReceiver,
16    _marker: std::marker::PhantomData<T>,
17}
18
19/// A typed sender.
20pub struct Sender<T> {
21    raw_sender: RawSender,
22    _marker: std::marker::PhantomData<T>,
23}
24
25impl<T> fmt::Debug for Receiver<T> {
26    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
27        f.debug_struct("Receiver")
28            .field("fd", &self.as_raw_fd())
29            .finish()
30    }
31}
32
33impl<T> fmt::Debug for Sender<T> {
34    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
35        f.debug_struct("Sender")
36            .field("fd", &self.as_raw_fd())
37            .finish()
38    }
39}
40
41macro_rules! fd_impl {
42    ($field:ident, $raw_ty:ident, $ty:ty) => {
43        #[allow(dead_code)]
44        impl<T> $ty {
45            pub(crate) unsafe fn from_raw_fd(fd: RawFd) -> io::Result<Self> {
46                Ok(Self {
47                    $field: $raw_ty::from_raw_fd(fd)?,
48                    _marker: std::marker::PhantomData,
49                })
50            }
51
52            pub(crate) fn from_std(stream: UnixStream) -> io::Result<Self> {
53                Ok(Self {
54                    $field: $raw_ty::from_std(stream)?,
55                    _marker: std::marker::PhantomData,
56                })
57            }
58
59            pub(crate) fn extract_raw_fd(&self) -> RawFd {
60                self.$field.extract_raw_fd()
61            }
62        }
63
64        impl<T: Serialize + DeserializeOwned> FromRawFd for $ty {
65            unsafe fn from_raw_fd(fd: RawFd) -> Self {
66                Self {
67                    $field: FromRawFd::from_raw_fd(fd),
68                    _marker: std::marker::PhantomData,
69                }
70            }
71        }
72
73        impl<T> IntoRawFd for $ty {
74            fn into_raw_fd(self) -> RawFd {
75                self.$field.into_raw_fd()
76            }
77        }
78
79        impl<T: Serialize + DeserializeOwned> From<$raw_ty> for $ty {
80            fn from(value: $raw_ty) -> Self {
81                Self {
82                    $field: value,
83                    _marker: std::marker::PhantomData,
84                }
85            }
86        }
87
88        impl<T> AsRawFd for $ty {
89            fn as_raw_fd(&self) -> RawFd {
90                self.$field.as_raw_fd()
91            }
92        }
93    };
94}
95
96fd_impl!(raw_receiver, RawReceiver, Receiver<T>);
97fd_impl!(raw_sender, RawSender, Sender<T>);
98
99/// Creates a typed connected channel.
100pub fn channel<S: Serialize + DeserializeOwned, R: Serialize + DeserializeOwned>(
101) -> io::Result<(Sender<S>, Receiver<R>)> {
102    let (sender, receiver) = raw_channel()?;
103    Ok((sender.into(), receiver.into()))
104}
105
106/// Creates a typed connected channel where the same type is
107/// both sent and received.
108pub fn symmetric_channel<T: Serialize + DeserializeOwned>() -> io::Result<(Sender<T>, Receiver<T>)>
109{
110    let (sender, receiver) = raw_channel()?;
111    Ok((sender.into(), receiver.into()))
112}
113
114/// Creates a typed connected channel from an already extant socket.
115pub fn channel_from_std<S: Serialize + DeserializeOwned, R: Serialize + DeserializeOwned>(
116    sender: UnixStream,
117) -> io::Result<(Sender<S>, Receiver<R>)> {
118    let receiver = sender.try_clone()?;
119    let sender = RawSender::from_std(sender)?;
120    let receiver = RawReceiver::from_std(receiver)?;
121    Ok((sender.into(), receiver.into()))
122}
123
124impl<T: Serialize + DeserializeOwned> Receiver<T> {
125    /// Connects a receiver to a named unix socket.
126    pub async fn connect<P: AsRef<Path>>(p: P) -> io::Result<Receiver<T>> {
127        RawReceiver::connect(p).await.map(Into::into)
128    }
129
130    /// Converts the typed receiver into a raw one.
131    pub fn into_raw_receiver(self) -> RawReceiver {
132        self.raw_receiver
133    }
134
135    /// Receives a structured message from the socket.
136    pub async fn recv(&self) -> io::Result<T> {
137        let (buf, fds) = self.raw_receiver.recv().await?;
138        deserialize::<(T, bool)>(&buf, fds.as_deref().unwrap_or_default()).map(|x| x.0)
139    }
140}
141
142unsafe impl<T> Send for Receiver<T> {}
143unsafe impl<T> Sync for Receiver<T> {}
144
145impl<T: Serialize + DeserializeOwned> Sender<T> {
146    /// Converts the typed sender into a raw one.
147    pub fn into_raw_sender(self) -> RawSender {
148        self.raw_sender
149    }
150
151    /// Receives a structured message from the socket.
152    pub async fn send(&self, s: T) -> io::Result<()> {
153        // we always serialize a dummy bool at the end so that the message
154        // will not be empty because of zero sized types.
155        let (payload, fds) = serialize((s, true))?;
156        self.raw_sender.send(&payload, &fds).await?;
157        Ok(())
158    }
159}
160
161unsafe impl<T> Send for Sender<T> {}
162unsafe impl<T> Sync for Sender<T> {}