a653rs_linux_core/
ipc.rs

1//! Implementation of IPC
2use std::io::{ErrorKind, IoSlice, IoSliceMut};
3use std::marker::PhantomData;
4use std::os::fd::{AsFd, BorrowedFd, OwnedFd};
5use std::os::unix::net::UnixDatagram;
6use std::os::unix::prelude::{AsRawFd, FromRawFd, RawFd};
7use std::path::Path;
8use std::time::Duration;
9
10use anyhow::Error;
11use nix::cmsg_space;
12use nix::errno::Errno;
13use nix::sys::socket::{
14    recvmsg, sendmsg, socketpair, AddressFamily, ControlMessage, ControlMessageOwned, MsgFlags,
15    SockFlag, SockType,
16};
17use polling::{Event, Events, Poller};
18use serde::{Deserialize, Serialize};
19
20use crate::error::{ResultExt, SystemError, TypedResult};
21
22#[derive(Debug)]
23/// Internal data type for the IPC sender
24pub struct IpcSender<T> {
25    socket: UnixDatagram,
26    _p: PhantomData<T>,
27}
28
29#[derive(Debug)]
30/// Internal data type for the IPC receiver
31pub struct IpcReceiver<T> {
32    socket: UnixDatagram,
33    _p: PhantomData<T>,
34}
35
36impl<T> IpcSender<T>
37where
38    T: Serialize,
39{
40    /// Sends value alongside the IpcSender
41    /// This fails if the resource is temporarily not available.
42    pub fn try_send(&self, value: &T) -> TypedResult<()> {
43        self.socket
44            .send(bincode::serialize(value).typ(SystemError::Panic)?.as_ref())
45            .typ(SystemError::Panic)?;
46        Ok(())
47    }
48
49    /// Try sending value alongside the IpcSender for a certain duration
50    pub fn try_send_timeout(&self, _value: &T, _duration: Duration) -> TypedResult<bool> {
51        todo!()
52    }
53}
54
55impl<T> IpcReceiver<T>
56where
57    T: for<'de> Deserialize<'de> + Serialize,
58{
59    /// Reads a single instance of T from the IpcReceiver
60    pub fn try_recv(&self) -> TypedResult<Option<T>> {
61        let mut buffer = vec![0; 65507];
62        let len = match self.socket.recv(&mut buffer) {
63            Ok(len) => len,
64            Err(e) if e.kind() != ErrorKind::TimedOut => {
65                return Err(Error::from(e)).typ(SystemError::Panic)
66            }
67            _ => return Ok(None),
68        };
69
70        // Serialize the received data into T
71        bincode::deserialize(&buffer[0..len])
72            .map(|r| Some(r))
73            .typ(SystemError::Panic)
74    }
75
76    /// Reads a single instance of T from the IpcReceiver but fail after
77    /// duration
78    pub fn try_recv_timeout(&self, duration: Duration) -> TypedResult<Option<T>> {
79        let poller = Poller::new().typ(SystemError::Panic)?;
80        unsafe {
81            poller
82                .add(&self.socket, Event::readable(42))
83                .typ(SystemError::Panic)?;
84        }
85        let poll_res = poller.wait(&mut Events::new(), Some(duration));
86        if let Err(_) | Ok(0) = poll_res {
87            return Ok(None);
88        }
89
90        self.try_recv()
91    }
92}
93
94pub fn bind_receiver<T>(path: &Path) -> TypedResult<IpcReceiver<T>> {
95    let socket = UnixDatagram::bind(path).typ(SystemError::Panic)?;
96    socket.set_nonblocking(true).typ(SystemError::Panic)?;
97    Ok(IpcReceiver::from(socket))
98}
99
100pub fn connect_sender<T>(path: &Path) -> TypedResult<IpcSender<T>> {
101    let socket = UnixDatagram::unbound().typ(SystemError::Panic)?;
102    socket.connect(path).typ(SystemError::Panic)?;
103    socket.set_nonblocking(true).typ(SystemError::Panic)?;
104    Ok(IpcSender::from(socket))
105}
106
107impl<T> AsRawFd for IpcSender<T> {
108    fn as_raw_fd(&self) -> RawFd {
109        self.socket.as_raw_fd()
110    }
111}
112
113impl<T> AsRawFd for IpcReceiver<T> {
114    fn as_raw_fd(&self) -> RawFd {
115        self.socket.as_raw_fd()
116    }
117}
118
119impl<T> AsFd for IpcSender<T> {
120    fn as_fd(&self) -> BorrowedFd<'_> {
121        self.socket.as_fd()
122    }
123}
124
125impl<T> AsFd for IpcReceiver<T> {
126    fn as_fd(&self) -> BorrowedFd<'_> {
127        self.socket.as_fd()
128    }
129}
130
131impl<T> From<UnixDatagram> for IpcReceiver<T> {
132    fn from(value: UnixDatagram) -> Self {
133        Self {
134            socket: value,
135            _p: PhantomData,
136        }
137    }
138}
139
140impl<T> From<UnixDatagram> for IpcSender<T> {
141    fn from(value: UnixDatagram) -> Self {
142        Self {
143            socket: value,
144            _p: PhantomData,
145        }
146    }
147}
148
149impl<T> From<OwnedFd> for IpcSender<T> {
150    fn from(value: OwnedFd) -> Self {
151        Self {
152            socket: UnixDatagram::from(value),
153            _p: PhantomData,
154        }
155    }
156}
157
158impl<T> From<OwnedFd> for IpcReceiver<T> {
159    fn from(value: OwnedFd) -> Self {
160        Self {
161            socket: UnixDatagram::from(value),
162            _p: PhantomData,
163        }
164    }
165}
166
167impl<T> FromRawFd for IpcSender<T> {
168    unsafe fn from_raw_fd(fd: RawFd) -> Self {
169        Self {
170            socket: UnixDatagram::from_raw_fd(fd),
171            _p: PhantomData,
172        }
173    }
174}
175
176impl<T> FromRawFd for IpcReceiver<T> {
177    unsafe fn from_raw_fd(fd: RawFd) -> Self {
178        Self {
179            socket: UnixDatagram::from_raw_fd(fd),
180            _p: PhantomData,
181        }
182    }
183}
184
185/// Creates a pair of sockets that are meant for passing file descriptors to
186/// partitions.
187pub fn io_pair<T>() -> TypedResult<(IoSender<T>, IoReceiver<T>)> {
188    let (tx, rx) = socketpair(
189        AddressFamily::Unix,
190        SockType::Datagram,
191        None,
192        SockFlag::empty(),
193    )
194    .typ(SystemError::Panic)?;
195    Ok((IoSender::from(tx), IoReceiver::from(rx)))
196}
197
198#[derive(Debug)]
199/// Internal data type for the IO resource sender
200pub struct IoSender<T> {
201    socket: UnixDatagram,
202    _p: PhantomData<T>,
203}
204
205impl<T> IoSender<T>
206where
207    T: AsRawFd,
208{
209    /// Sends a resource to the receiving socket.
210    pub fn try_send(&self, resource: impl AsRawFd) -> TypedResult<()> {
211        let fds = [resource.as_raw_fd()];
212        let cmsg = [ControlMessage::ScmRights(&fds)];
213        let buffer = [0u8; 1];
214        let iov = [IoSlice::new(buffer.as_slice())];
215        let io_fd = self.socket.as_raw_fd();
216        sendmsg::<()>(io_fd, &iov, &cmsg, MsgFlags::empty(), None).typ(SystemError::Panic)?;
217        Ok(())
218    }
219}
220
221impl<T> IoReceiver<T>
222where
223    T: FromRawFd,
224{
225    /// Returns the next available IO resource.
226    /// Returns `None`, if no further resources can be read from the socket.
227    ///
228    /// # Safety
229    /// Only safe if `T` matches the type of the file descriptor.
230    pub unsafe fn try_receive(&self) -> TypedResult<Option<T>> {
231        let mut cmsg = cmsg_space!(RawFd);
232        let mut iobuf = [0u8; 1];
233        let mut iov = [IoSliceMut::new(&mut iobuf)];
234        let io_fd = self.socket.as_raw_fd();
235        match recvmsg::<()>(io_fd, &mut iov, Some(&mut cmsg), MsgFlags::MSG_DONTWAIT) {
236            Ok(msg) => {
237                if let Some(ControlMessageOwned::ScmRights(fds)) =
238                    msg.cmsgs().typ(SystemError::Panic)?.next()
239                {
240                    if let &[raw_fd] = fds.as_slice() {
241                        let sock = unsafe { T::from_raw_fd(raw_fd) };
242                        return Ok(Some(sock));
243                    }
244                }
245                Ok(None)
246            }
247            // This should never block since the socket is only written to before the partition
248            // starts.
249            Err(e) if e != Errno::EAGAIN && e != Errno::EINTR => {
250                Err(Error::from(e)).typ(SystemError::Panic)
251            }
252            _ => Ok(None),
253        }
254    }
255}
256
257#[derive(Debug)]
258/// Internal data type for the IO resource sender
259pub struct IoReceiver<T> {
260    socket: UnixDatagram,
261    _p: PhantomData<T>,
262}
263
264impl<T> AsRawFd for IoSender<T> {
265    fn as_raw_fd(&self) -> RawFd {
266        self.socket.as_raw_fd()
267    }
268}
269
270impl<T> AsRawFd for IoReceiver<T> {
271    fn as_raw_fd(&self) -> RawFd {
272        self.socket.as_raw_fd()
273    }
274}
275
276impl<T> From<OwnedFd> for IoSender<T> {
277    fn from(value: OwnedFd) -> Self {
278        Self {
279            socket: UnixDatagram::from(value),
280            _p: PhantomData,
281        }
282    }
283}
284
285impl<T> From<OwnedFd> for IoReceiver<T> {
286    fn from(value: OwnedFd) -> Self {
287        Self {
288            socket: UnixDatagram::from(value),
289            _p: PhantomData,
290        }
291    }
292}
293
294impl<T> FromRawFd for IoSender<T> {
295    unsafe fn from_raw_fd(fd: RawFd) -> Self {
296        Self {
297            socket: UnixDatagram::from_raw_fd(fd),
298            _p: PhantomData,
299        }
300    }
301}
302
303impl<T> FromRawFd for IoReceiver<T> {
304    unsafe fn from_raw_fd(fd: RawFd) -> Self {
305        Self {
306            socket: UnixDatagram::from_raw_fd(fd),
307            _p: PhantomData,
308        }
309    }
310}