tokio_unix_ipc/
typed_channel.rs1use std::fmt;
2use std::io;
3use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
4use std::os::unix::net::UnixStream;
5use std::path::Path;
6
7use serde_::de::DeserializeOwned;
8use serde_::Serialize;
9
10use crate::raw_channel::{raw_channel, RawReceiver, RawSender};
11use crate::serde::{deserialize, serialize};
12
13pub struct Receiver<T> {
15 raw_receiver: RawReceiver,
16 _marker: std::marker::PhantomData<T>,
17}
18
19pub struct Sender<T> {
21 raw_sender: RawSender,
22 _marker: std::marker::PhantomData<T>,
23}
24
25impl<T> fmt::Debug for Receiver<T> {
26 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
27 f.debug_struct("Receiver")
28 .field("fd", &self.as_raw_fd())
29 .finish()
30 }
31}
32
33impl<T> fmt::Debug for Sender<T> {
34 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
35 f.debug_struct("Sender")
36 .field("fd", &self.as_raw_fd())
37 .finish()
38 }
39}
40
41macro_rules! fd_impl {
42 ($field:ident, $raw_ty:ident, $ty:ty) => {
43 #[allow(dead_code)]
44 impl<T> $ty {
45 pub(crate) unsafe fn from_raw_fd(fd: RawFd) -> io::Result<Self> {
46 Ok(Self {
47 $field: $raw_ty::from_raw_fd(fd)?,
48 _marker: std::marker::PhantomData,
49 })
50 }
51
52 pub(crate) fn from_std(stream: UnixStream) -> io::Result<Self> {
53 Ok(Self {
54 $field: $raw_ty::from_std(stream)?,
55 _marker: std::marker::PhantomData,
56 })
57 }
58
59 pub(crate) fn extract_raw_fd(&self) -> RawFd {
60 self.$field.extract_raw_fd()
61 }
62 }
63
64 impl<T: Serialize + DeserializeOwned> FromRawFd for $ty {
65 unsafe fn from_raw_fd(fd: RawFd) -> Self {
66 Self {
67 $field: FromRawFd::from_raw_fd(fd),
68 _marker: std::marker::PhantomData,
69 }
70 }
71 }
72
73 impl<T> IntoRawFd for $ty {
74 fn into_raw_fd(self) -> RawFd {
75 self.$field.into_raw_fd()
76 }
77 }
78
79 impl<T: Serialize + DeserializeOwned> From<$raw_ty> for $ty {
80 fn from(value: $raw_ty) -> Self {
81 Self {
82 $field: value,
83 _marker: std::marker::PhantomData,
84 }
85 }
86 }
87
88 impl<T> AsRawFd for $ty {
89 fn as_raw_fd(&self) -> RawFd {
90 self.$field.as_raw_fd()
91 }
92 }
93 };
94}
95
96fd_impl!(raw_receiver, RawReceiver, Receiver<T>);
97fd_impl!(raw_sender, RawSender, Sender<T>);
98
99pub fn channel<S: Serialize + DeserializeOwned, R: Serialize + DeserializeOwned>(
101) -> io::Result<(Sender<S>, Receiver<R>)> {
102 let (sender, receiver) = raw_channel()?;
103 Ok((sender.into(), receiver.into()))
104}
105
106pub fn symmetric_channel<T: Serialize + DeserializeOwned>() -> io::Result<(Sender<T>, Receiver<T>)>
109{
110 let (sender, receiver) = raw_channel()?;
111 Ok((sender.into(), receiver.into()))
112}
113
114pub fn channel_from_std<S: Serialize + DeserializeOwned, R: Serialize + DeserializeOwned>(
116 sender: UnixStream,
117) -> io::Result<(Sender<S>, Receiver<R>)> {
118 let receiver = sender.try_clone()?;
119 let sender = RawSender::from_std(sender)?;
120 let receiver = RawReceiver::from_std(receiver)?;
121 Ok((sender.into(), receiver.into()))
122}
123
124impl<T: Serialize + DeserializeOwned> Receiver<T> {
125 pub async fn connect<P: AsRef<Path>>(p: P) -> io::Result<Receiver<T>> {
127 RawReceiver::connect(p).await.map(Into::into)
128 }
129
130 pub fn into_raw_receiver(self) -> RawReceiver {
132 self.raw_receiver
133 }
134
135 pub async fn recv(&self) -> io::Result<T> {
137 let (buf, fds) = self.raw_receiver.recv().await?;
138 deserialize::<(T, bool)>(&buf, fds.as_deref().unwrap_or_default()).map(|x| x.0)
139 }
140}
141
142unsafe impl<T> Send for Receiver<T> {}
143unsafe impl<T> Sync for Receiver<T> {}
144
145impl<T: Serialize + DeserializeOwned> Sender<T> {
146 pub fn into_raw_sender(self) -> RawSender {
148 self.raw_sender
149 }
150
151 pub async fn send(&self, s: T) -> io::Result<()> {
153 let (payload, fds) = serialize((s, true))?;
156 self.raw_sender.send(&payload, &fds).await?;
157 Ok(())
158 }
159}
160
161unsafe impl<T> Send for Sender<T> {}
162unsafe impl<T> Sync for Sender<T> {}