1use std::io::{ErrorKind, IoSlice, IoSliceMut};
3use std::marker::PhantomData;
4use std::os::fd::{AsFd, BorrowedFd, OwnedFd};
5use std::os::unix::net::UnixDatagram;
6use std::os::unix::prelude::{AsRawFd, FromRawFd, RawFd};
7use std::path::Path;
8use std::time::Duration;
9
10use anyhow::Error;
11use nix::cmsg_space;
12use nix::errno::Errno;
13use nix::sys::socket::{
14 recvmsg, sendmsg, socketpair, AddressFamily, ControlMessage, ControlMessageOwned, MsgFlags,
15 SockFlag, SockType,
16};
17use polling::{Event, Events, Poller};
18use serde::{Deserialize, Serialize};
19
20use crate::error::{ResultExt, SystemError, TypedResult};
21
22#[derive(Debug)]
23pub struct IpcSender<T> {
25 socket: UnixDatagram,
26 _p: PhantomData<T>,
27}
28
29#[derive(Debug)]
30pub struct IpcReceiver<T> {
32 socket: UnixDatagram,
33 _p: PhantomData<T>,
34}
35
36impl<T> IpcSender<T>
37where
38 T: Serialize,
39{
40 pub fn try_send(&self, value: &T) -> TypedResult<()> {
43 self.socket
44 .send(bincode::serialize(value).typ(SystemError::Panic)?.as_ref())
45 .typ(SystemError::Panic)?;
46 Ok(())
47 }
48
49 pub fn try_send_timeout(&self, _value: &T, _duration: Duration) -> TypedResult<bool> {
51 todo!()
52 }
53}
54
55impl<T> IpcReceiver<T>
56where
57 T: for<'de> Deserialize<'de> + Serialize,
58{
59 pub fn try_recv(&self) -> TypedResult<Option<T>> {
61 let mut buffer = vec![0; 65507];
62 let len = match self.socket.recv(&mut buffer) {
63 Ok(len) => len,
64 Err(e) if e.kind() != ErrorKind::TimedOut => {
65 return Err(Error::from(e)).typ(SystemError::Panic)
66 }
67 _ => return Ok(None),
68 };
69
70 bincode::deserialize(&buffer[0..len])
72 .map(|r| Some(r))
73 .typ(SystemError::Panic)
74 }
75
76 pub fn try_recv_timeout(&self, duration: Duration) -> TypedResult<Option<T>> {
79 let poller = Poller::new().typ(SystemError::Panic)?;
80 unsafe {
81 poller
82 .add(&self.socket, Event::readable(42))
83 .typ(SystemError::Panic)?;
84 }
85 let poll_res = poller.wait(&mut Events::new(), Some(duration));
86 if let Err(_) | Ok(0) = poll_res {
87 return Ok(None);
88 }
89
90 self.try_recv()
91 }
92}
93
94pub fn bind_receiver<T>(path: &Path) -> TypedResult<IpcReceiver<T>> {
95 let socket = UnixDatagram::bind(path).typ(SystemError::Panic)?;
96 socket.set_nonblocking(true).typ(SystemError::Panic)?;
97 Ok(IpcReceiver::from(socket))
98}
99
100pub fn connect_sender<T>(path: &Path) -> TypedResult<IpcSender<T>> {
101 let socket = UnixDatagram::unbound().typ(SystemError::Panic)?;
102 socket.connect(path).typ(SystemError::Panic)?;
103 socket.set_nonblocking(true).typ(SystemError::Panic)?;
104 Ok(IpcSender::from(socket))
105}
106
107impl<T> AsRawFd for IpcSender<T> {
108 fn as_raw_fd(&self) -> RawFd {
109 self.socket.as_raw_fd()
110 }
111}
112
113impl<T> AsRawFd for IpcReceiver<T> {
114 fn as_raw_fd(&self) -> RawFd {
115 self.socket.as_raw_fd()
116 }
117}
118
119impl<T> AsFd for IpcSender<T> {
120 fn as_fd(&self) -> BorrowedFd<'_> {
121 self.socket.as_fd()
122 }
123}
124
125impl<T> AsFd for IpcReceiver<T> {
126 fn as_fd(&self) -> BorrowedFd<'_> {
127 self.socket.as_fd()
128 }
129}
130
131impl<T> From<UnixDatagram> for IpcReceiver<T> {
132 fn from(value: UnixDatagram) -> Self {
133 Self {
134 socket: value,
135 _p: PhantomData,
136 }
137 }
138}
139
140impl<T> From<UnixDatagram> for IpcSender<T> {
141 fn from(value: UnixDatagram) -> Self {
142 Self {
143 socket: value,
144 _p: PhantomData,
145 }
146 }
147}
148
149impl<T> From<OwnedFd> for IpcSender<T> {
150 fn from(value: OwnedFd) -> Self {
151 Self {
152 socket: UnixDatagram::from(value),
153 _p: PhantomData,
154 }
155 }
156}
157
158impl<T> From<OwnedFd> for IpcReceiver<T> {
159 fn from(value: OwnedFd) -> Self {
160 Self {
161 socket: UnixDatagram::from(value),
162 _p: PhantomData,
163 }
164 }
165}
166
167impl<T> FromRawFd for IpcSender<T> {
168 unsafe fn from_raw_fd(fd: RawFd) -> Self {
169 Self {
170 socket: UnixDatagram::from_raw_fd(fd),
171 _p: PhantomData,
172 }
173 }
174}
175
176impl<T> FromRawFd for IpcReceiver<T> {
177 unsafe fn from_raw_fd(fd: RawFd) -> Self {
178 Self {
179 socket: UnixDatagram::from_raw_fd(fd),
180 _p: PhantomData,
181 }
182 }
183}
184
185pub fn io_pair<T>() -> TypedResult<(IoSender<T>, IoReceiver<T>)> {
188 let (tx, rx) = socketpair(
189 AddressFamily::Unix,
190 SockType::Datagram,
191 None,
192 SockFlag::empty(),
193 )
194 .typ(SystemError::Panic)?;
195 Ok((IoSender::from(tx), IoReceiver::from(rx)))
196}
197
198#[derive(Debug)]
199pub struct IoSender<T> {
201 socket: UnixDatagram,
202 _p: PhantomData<T>,
203}
204
205impl<T> IoSender<T>
206where
207 T: AsRawFd,
208{
209 pub fn try_send(&self, resource: impl AsRawFd) -> TypedResult<()> {
211 let fds = [resource.as_raw_fd()];
212 let cmsg = [ControlMessage::ScmRights(&fds)];
213 let buffer = [0u8; 1];
214 let iov = [IoSlice::new(buffer.as_slice())];
215 let io_fd = self.socket.as_raw_fd();
216 sendmsg::<()>(io_fd, &iov, &cmsg, MsgFlags::empty(), None).typ(SystemError::Panic)?;
217 Ok(())
218 }
219}
220
221impl<T> IoReceiver<T>
222where
223 T: FromRawFd,
224{
225 pub unsafe fn try_receive(&self) -> TypedResult<Option<T>> {
231 let mut cmsg = cmsg_space!(RawFd);
232 let mut iobuf = [0u8; 1];
233 let mut iov = [IoSliceMut::new(&mut iobuf)];
234 let io_fd = self.socket.as_raw_fd();
235 match recvmsg::<()>(io_fd, &mut iov, Some(&mut cmsg), MsgFlags::MSG_DONTWAIT) {
236 Ok(msg) => {
237 if let Some(ControlMessageOwned::ScmRights(fds)) =
238 msg.cmsgs().typ(SystemError::Panic)?.next()
239 {
240 if let &[raw_fd] = fds.as_slice() {
241 let sock = unsafe { T::from_raw_fd(raw_fd) };
242 return Ok(Some(sock));
243 }
244 }
245 Ok(None)
246 }
247 Err(e) if e != Errno::EAGAIN && e != Errno::EINTR => {
250 Err(Error::from(e)).typ(SystemError::Panic)
251 }
252 _ => Ok(None),
253 }
254 }
255}
256
257#[derive(Debug)]
258pub struct IoReceiver<T> {
260 socket: UnixDatagram,
261 _p: PhantomData<T>,
262}
263
264impl<T> AsRawFd for IoSender<T> {
265 fn as_raw_fd(&self) -> RawFd {
266 self.socket.as_raw_fd()
267 }
268}
269
270impl<T> AsRawFd for IoReceiver<T> {
271 fn as_raw_fd(&self) -> RawFd {
272 self.socket.as_raw_fd()
273 }
274}
275
276impl<T> From<OwnedFd> for IoSender<T> {
277 fn from(value: OwnedFd) -> Self {
278 Self {
279 socket: UnixDatagram::from(value),
280 _p: PhantomData,
281 }
282 }
283}
284
285impl<T> From<OwnedFd> for IoReceiver<T> {
286 fn from(value: OwnedFd) -> Self {
287 Self {
288 socket: UnixDatagram::from(value),
289 _p: PhantomData,
290 }
291 }
292}
293
294impl<T> FromRawFd for IoSender<T> {
295 unsafe fn from_raw_fd(fd: RawFd) -> Self {
296 Self {
297 socket: UnixDatagram::from_raw_fd(fd),
298 _p: PhantomData,
299 }
300 }
301}
302
303impl<T> FromRawFd for IoReceiver<T> {
304 unsafe fn from_raw_fd(fd: RawFd) -> Self {
305 Self {
306 socket: UnixDatagram::from_raw_fd(fd),
307 _p: PhantomData,
308 }
309 }
310}