// ipc_channel/platform/unix/mod.rs
1// Copyright 2015 The Servo Project Developers. See the COPYRIGHT
2// file at the top-level directory of this distribution.
3//
4// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
5// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
6// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
7// option. This file may not be copied, modified, or distributed
8// except according to those terms.
9
10use crate::ipc::IpcMessage;
11use libc::{
12    self, cmsghdr, linger, CMSG_DATA, CMSG_LEN, CMSG_SPACE, MAP_FAILED, MAP_SHARED, MSG_EOR,
13    PROT_READ, PROT_WRITE, SOCK_SEQPACKET, SOL_SOCKET,
14};
15use libc::{c_char, c_int, c_void, getsockopt, SO_LINGER, S_IFMT, S_IFSOCK};
16use libc::{iovec, msghdr, off_t, recvmsg, sendmsg};
17use libc::{sa_family_t, setsockopt, size_t, sockaddr, sockaddr_un, socketpair, socklen_t};
18use libc::{EAGAIN, EWOULDBLOCK};
19use mio::unix::SourceFd;
20use mio::{Events, Interest, Poll, Token};
21use rustc_hash::FxHasher;
22use std::cell::Cell;
23use std::cmp;
24use std::collections::HashMap;
25use std::convert::TryInto;
26use std::ffi::{c_uint, CString};
27use std::fmt::{self, Debug, Formatter};
28use std::hash::BuildHasherDefault;
29use std::io;
30use std::mem;
31use std::ops::{Deref, RangeFrom};
32use std::os::fd::RawFd;
33use std::ptr;
34use std::slice;
35use std::sync::atomic::{AtomicUsize, Ordering};
36use std::sync::{Arc, LazyLock};
37use std::thread;
38use std::time::{Duration, UNIX_EPOCH};
39use tempfile::{Builder, TempDir};
40use thiserror::Error;
41
// Maximum number of file descriptors we ever attach to a single message
// as SCM_RIGHTS ancillary data.
const MAX_FDS_IN_CMSG: u32 = 64;

// The value of SO_SNDBUF is not the size we are actually allowed to use...
// Empirically, we have to deduct 32 bytes from that on Linux.
//
// On FreeBSD the control messages share the buffer space with the normal data
// The kernel-side representation of an SCM_RIGHTS message consists of the
// cmsghdr, followed by the pointers to the file descriptors. Thus, the amount
// we need to deduct from SO_SNDBUF depends on the number of file descriptors.
//
// A safe solution is to budget for all MAX_FDS_IN_CMSG file descriptors being
// present, each taking up 8 bytes regardless of the target.
const RESERVED_SIZE: usize = unsafe { CMSG_SPACE(MAX_FDS_IN_CMSG * 8) } as usize;

// Atomically set close-on-exec at socket creation where the platform supports it.
#[cfg(any(target_os = "linux", target_os = "illumos", target_os = "freebsd"))]
const SOCK_FLAGS: c_int = libc::SOCK_CLOEXEC;
#[cfg(not(any(target_os = "linux", target_os = "illumos", target_os = "freebsd")))]
const SOCK_FLAGS: c_int = 0;

// Set close-on-exec on fds received via SCM_RIGHTS where supported.
#[cfg(any(target_os = "linux", target_os = "illumos", target_os = "freebsd"))]
const RECVMSG_FLAGS: c_int = libc::MSG_CMSG_CLOEXEC;
#[cfg(not(any(target_os = "linux", target_os = "illumos", target_os = "freebsd")))]
const RECVMSG_FLAGS: c_int = 0;

// The C types of `iovec.iov_len` and `cmsghdr.cmsg_len` differ between
// glibc and other libcs; pick the matching Rust alias per target.
#[cfg(target_env = "gnu")]
type IovLen = usize;

#[cfg(not(target_env = "gnu"))]
type IovLen = i32;
#[cfg(target_env = "gnu")]
type MsgControlLen = size_t;
#[cfg(not(target_env = "gnu"))]
type MsgControlLen = socklen_t;
75
/// Error returned by the non-blocking select variants.
#[derive(Debug, Error)]
pub enum OsTrySelectError {
    #[error("Error in IO: {0}.")]
    IoError(#[from] UnixError),
    /// The poll completed but produced no results within the timeout.
    #[error("No messages were received and no disconnections occurred.")]
    Empty,
}
83
/// Build an `AF_UNIX` socket address from the given C-string `path`.
///
/// Returns the address and the byte length to pass to `bind`/`connect`.
///
/// # Safety
///
/// `path` must point to a valid NUL-terminated C string.
/// Paths longer than `sun_path` are silently truncated by `strncpy`
/// (the last byte is reserved so the result stays NUL-terminated).
unsafe fn new_sockaddr_un(path: *const c_char) -> (sockaddr_un, usize) {
    let mut sockaddr: sockaddr_un = mem::zeroed();
    libc::strncpy(
        sockaddr.sun_path.as_mut_ptr(),
        path,
        sockaddr.sun_path.len() - 1,
    );
    sockaddr.sun_family = libc::AF_UNIX as sa_family_t;
    (sockaddr, mem::size_of::<sockaddr_un>())
}
94
// Kernel send-buffer size, probed once by creating a throwaway socket pair.
static SYSTEM_SENDBUF_SIZE: LazyLock<usize> = LazyLock::new(|| {
    let (tx, _) = channel().expect("Failed to obtain a socket for checking maximum send size");
    tx.get_system_sendbuf_size()
        .expect("Failed to obtain maximum send size for socket")
});

// The pid of the current process which is used to create unique IDs
static PID: LazyLock<u32> = LazyLock::new(std::process::id);

// A global count used to create unique IDs
static SHM_COUNT: AtomicUsize = AtomicUsize::new(0);
106
107pub fn channel() -> Result<(OsIpcSender, OsIpcReceiver), UnixError> {
108    let mut results = [0, 0];
109    unsafe {
110        if socketpair(
111            libc::AF_UNIX,
112            SOCK_SEQPACKET | SOCK_FLAGS,
113            0,
114            &mut results[0],
115        ) >= 0
116        {
117            Ok((
118                OsIpcSender::from_fd(results[0]),
119                OsIpcReceiver::from_fd(results[1]),
120            ))
121        } else {
122            Err(UnixError::last())
123        }
124    }
125}
126
// Bookkeeping for one fd registered with an `OsIpcReceiverSet`:
// the caller-visible id returned by `add`, and the raw fd it maps to.
#[derive(Clone, Copy)]
struct PollEntry {
    pub id: u64,
    pub fd: RawFd,
}
132
/// The receiving end of an IPC channel.
#[derive(Debug)]
pub struct OsIpcReceiver {
    // The raw socket fd; set to -1 once ownership has been transferred away.
    fd: Cell<c_int>,
}
137
138impl Drop for OsIpcReceiver {
139    fn drop(&mut self) {
140        unsafe {
141            let fd = self.fd.get();
142            if fd >= 0 {
143                let result = libc::close(fd);
144                assert!(
145                    thread::panicking() || result == 0,
146                    "closed receiver (fd: {}): {}",
147                    fd,
148                    UnixError::last(),
149                );
150            }
151        }
152    }
153}
154
impl OsIpcReceiver {
    /// Wrap a raw socket fd in a receiver, taking ownership of it.
    fn from_fd(fd: c_int) -> OsIpcReceiver {
        OsIpcReceiver { fd: Cell::new(fd) }
    }

    /// Take the fd out of this receiver, leaving -1 behind so that
    /// `Drop` won't close it.
    fn consume_fd(&self) -> c_int {
        let fd = self.fd.get();
        self.fd.set(-1);
        fd
    }

    /// Transfer ownership of the underlying fd to a fresh receiver.
    pub fn consume(&self) -> OsIpcReceiver {
        OsIpcReceiver::from_fd(self.consume_fd())
    }

    /// Receive a message, blocking until one arrives.
    #[allow(clippy::type_complexity)]
    pub fn recv(&self) -> Result<IpcMessage, UnixError> {
        recv(self.fd.get(), BlockingMode::Blocking)
    }

    /// Receive a message without blocking.
    #[allow(clippy::type_complexity)]
    pub fn try_recv(&self) -> Result<IpcMessage, UnixError> {
        recv(self.fd.get(), BlockingMode::Nonblocking)
    }

    /// Receive a message, waiting at most `duration` for one to arrive.
    #[allow(clippy::type_complexity)]
    pub fn try_recv_timeout(&self, duration: Duration) -> Result<IpcMessage, UnixError> {
        recv(self.fd.get(), BlockingMode::Timeout(duration))
    }
}
185
// A file descriptor owned through an `Arc`; closed when the last clone drops.
#[derive(PartialEq, Debug)]
struct SharedFileDescriptor(c_int);

impl Drop for SharedFileDescriptor {
    fn drop(&mut self) {
        unsafe {
            let result = libc::close(self.0);
            // Don't double-panic if we're already unwinding.
            assert!(thread::panicking() || result == 0);
        }
    }
}
197
/// The sending end of an IPC channel; cheap to clone.
#[derive(Debug, Clone)]
pub struct OsIpcSender {
    // The socket fd, shared between clones and closed with the last one.
    fd: Arc<SharedFileDescriptor>,
}
202
203impl OsIpcSender {
204    fn from_fd(fd: c_int) -> OsIpcSender {
205        OsIpcSender {
206            fd: Arc::new(SharedFileDescriptor(fd)),
207        }
208    }
209
210    /// Maximum size of the kernel buffer used for transfers over this channel.
211    ///
212    /// Note: This is *not* the actual maximal packet size we are allowed to use...
213    /// Some of it is reserved by the kernel for bookkeeping.
214    fn get_system_sendbuf_size(&self) -> Result<usize, UnixError> {
215        unsafe {
216            let mut socket_sendbuf_size: c_int = 0;
217            let mut socket_sendbuf_size_len = mem::size_of::<c_int>() as socklen_t;
218            if getsockopt(
219                self.fd.0,
220                libc::SOL_SOCKET,
221                libc::SO_SNDBUF,
222                &mut socket_sendbuf_size as *mut _ as *mut c_void,
223                &mut socket_sendbuf_size_len as *mut socklen_t,
224            ) < 0
225            {
226                return Err(UnixError::last());
227            }
228            Ok(socket_sendbuf_size.try_into().unwrap())
229        }
230    }
231
232    /// Calculate maximum payload data size per fragment.
233    ///
234    /// It is the total size of the kernel buffer, minus the part reserved by the kernel.
235    ///
236    /// The `sendbuf_size` passed in should usually be the maximum kernel buffer size,
237    /// i.e. the value of *SYSTEM_SENDBUF_SIZE --
238    /// except after getting ENOBUFS, in which case it needs to be reduced.
239    fn fragment_size(sendbuf_size: usize) -> usize {
240        sendbuf_size - RESERVED_SIZE
241    }
242
243    /// Calculate maximum payload data size of first fragment.
244    ///
245    /// This one is smaller than regular fragments, because it carries the message (size) header.
246    fn first_fragment_size(sendbuf_size: usize) -> usize {
247        (Self::fragment_size(sendbuf_size) - 2 * mem::size_of::<usize>()) & (!8usize + 1)
248        // Ensure optimal alignment.
249    }
250
251    /// Maximum data size that can be transferred over this channel in a single packet.
252    ///
253    /// This is the size of the main data chunk only --
254    /// it's independent of any auxiliary data (FDs) transferred along with it.
255    ///
256    /// A send on this channel won't block for transfers up to this size
257    /// under normal circumstances.
258    /// (It might still block if heavy memory pressure causes ENOBUFS,
259    /// forcing us to reduce the packet size.)
260    pub fn get_max_fragment_size() -> usize {
261        Self::first_fragment_size(*SYSTEM_SENDBUF_SIZE)
262    }
263
    /// Send `data`, along with any channel endpoints and shared memory regions,
    /// over this channel.
    ///
    /// If the message fits within the kernel send buffer, it is transmitted as
    /// a single packet. Otherwise it is fragmented: a dedicated channel is
    /// created, its receiving end is sent (with all other fds) in the first
    /// fragment, and the remaining fragments are streamed over that channel.
    pub fn send(
        &self,
        data: &[u8],
        channels: Vec<OsIpcChannel>,
        shared_memory_regions: Vec<OsIpcSharedMemory>,
    ) -> Result<(), UnixError> {
        // Collect the raw fds to transfer as SCM_RIGHTS ancillary data.
        let mut fds = Vec::new();
        for channel in channels.iter() {
            fds.push(channel.fd());
        }
        for shared_memory_region in shared_memory_regions.iter() {
            fds.push(shared_memory_region.store.fd());
        }

        // `len` is the total length of the message.
        // Its value will be sent as a message header before the payload data.
        //
        // Not to be confused with the length of the data to send in this packet
        // (i.e. the length of the data buffer passed in),
        // which in a fragmented send will be smaller than the total message length.
        fn send_first_fragment(
            sender_fd: c_int,
            fds: &[c_int],
            data_buffer: &[u8],
            len: usize,
        ) -> Result<(), UnixError> {
            assert!(fds.len() <= MAX_FDS_IN_CMSG as usize);

            // NOTE(review): the zero-initialization is immediately overwritten;
            // a single initialization would do.
            let mut fragment_size: usize = 0;
            fragment_size =
                mem::size_of_val(&fragment_size) + mem::size_of_val(&len) + data_buffer.len();

            let result = unsafe {
                let cmsg_length = mem::size_of_val(fds) as c_uint;
                let (cmsg_buffer, cmsg_space) = if cmsg_length > 0 {
                    // Allocate and populate the SCM_RIGHTS control message
                    // carrying the fds.
                    let cmsg_buffer =
                        libc::malloc(CMSG_SPACE(cmsg_length) as usize) as *mut cmsghdr;
                    if cmsg_buffer.is_null() {
                        return Err(UnixError::last());
                    }
                    (*cmsg_buffer).cmsg_len = CMSG_LEN(cmsg_length) as MsgControlLen;
                    (*cmsg_buffer).cmsg_level = libc::SOL_SOCKET;
                    (*cmsg_buffer).cmsg_type = libc::SCM_RIGHTS;

                    ptr::copy_nonoverlapping(
                        fds.as_ptr(),
                        CMSG_DATA(cmsg_buffer) as *mut c_int,
                        fds.len(),
                    );
                    (cmsg_buffer, CMSG_SPACE(cmsg_length))
                } else {
                    (ptr::null_mut(), 0)
                };

                let mut iovec = [
                    // First fragment begins with a header recording the fragment size
                    // (including the header) and the total data length.
                    //
                    // The receiver uses this to determine if the fragment has been fully
                    // read, and afterwards whether it already got the entire message,
                    // or needs to receive additional fragments -- and if so, how much.
                    iovec {
                        iov_base: &fragment_size as *const _ as *mut c_void,
                        iov_len: mem::size_of_val(&fragment_size),
                    },
                    iovec {
                        iov_base: &len as *const _ as *mut c_void,
                        iov_len: mem::size_of_val(&len),
                    },
                    iovec {
                        iov_base: data_buffer.as_ptr() as *mut c_void,
                        iov_len: data_buffer.len(),
                    },
                ];

                let msghdr = new_msghdr(&mut iovec, cmsg_buffer, cmsg_space as MsgControlLen);
                let result = sendmsg(sender_fd, &msghdr, MSG_EOR);
                libc::free(cmsg_buffer as *mut c_void);
                result
            };

            if result > 0 {
                // A SOCK_SEQPACKET send is all-or-nothing; verify it.
                assert_eq!(fragment_size, result.try_into().unwrap());
                Ok(())
            } else {
                Err(UnixError::last())
            }
        }

        // Send a header-less continuation fragment over the dedicated channel.
        fn send_followup_fragment(sender_fd: c_int, data_buffer: &[u8]) -> Result<(), UnixError> {
            let result = unsafe {
                libc::send(
                    sender_fd,
                    data_buffer.as_ptr() as *const c_void,
                    data_buffer.len(),
                    0,
                )
            };

            if result > 0 {
                assert_eq!(data_buffer.len(), result.try_into().unwrap());
                Ok(())
            } else {
                Err(UnixError::last())
            }
        }

        let mut sendbuf_size = *SYSTEM_SENDBUF_SIZE;

        /// Reduce send buffer size after getting ENOBUFS,
        /// i.e. when the kernel failed to allocate a large enough buffer.
        ///
        /// (If the buffer already was significantly smaller
        /// than the memory page size though,
        /// if means something else must have gone wrong;
        /// so there is no point in further downsizing,
        /// and we error out instead.)
        fn downsize(sendbuf_size: &mut usize, sent_size: usize) -> Result<(), ()> {
            if sent_size > 2000 {
                *sendbuf_size /= 2;
                // Make certain we end up with less than what we tried before...
                if *sendbuf_size >= sent_size {
                    *sendbuf_size = sent_size / 2;
                }
                Ok(())
            } else {
                Err(())
            }
        }

        // If the message is small enough, try sending it in a single fragment.
        if data.len() <= Self::get_max_fragment_size() {
            match send_first_fragment(self.fd.0, &fds[..], data, data.len()) {
                Ok(_) => return Ok(()),
                Err(error) => {
                    // ENOBUFS means the kernel failed to allocate a buffer large enough
                    // to actually transfer the message,
                    // although the message was small enough to fit the maximum send size --
                    // so we have to proceed with a fragmented send nevertheless,
                    // using a reduced send buffer size.
                    //
                    // Any other errors we might get here are non-recoverable.
                    if !(matches!(error, UnixError::Errno(libc::ENOBUFS))
                        && downsize(&mut sendbuf_size, data.len()).is_ok())
                    {
                        return Err(error);
                    }
                },
            }
        }

        // The packet is too big. Fragmentation time!
        //
        // Create dedicated channel to send all but the first fragment.
        // This way we avoid fragments of different messages interleaving in the receiver.
        //
        // The receiver end of the channel is sent with the first fragment
        // along any other file descriptors that are to be transferred in the message.
        let (dedicated_tx, dedicated_rx) = channel()?;
        // Extract FD handle without consuming the Receiver, so the FD doesn't get closed.
        fds.push(dedicated_rx.fd.get());

        // Split up the packet into fragments.
        let mut byte_position = 0;
        while byte_position < data.len() {
            let end_byte_position;
            let result = if byte_position == 0 {
                // First fragment. No offset; but contains message header (total size).
                // The auxiliary data (FDs) is also sent along with this one.

                // This fragment always uses the full allowable buffer size.
                end_byte_position = Self::first_fragment_size(sendbuf_size);
                send_first_fragment(self.fd.0, &fds[..], &data[..end_byte_position], data.len())
            } else {
                // Followup fragment. No header; but offset by amount of data already sent.

                end_byte_position = cmp::min(
                    byte_position + Self::fragment_size(sendbuf_size),
                    data.len(),
                );
                send_followup_fragment(dedicated_tx.fd.0, &data[byte_position..end_byte_position])
            };

            if let Err(error) = result {
                if matches!(error, UnixError::Errno(libc::ENOBUFS))
                    && downsize(&mut sendbuf_size, end_byte_position - byte_position).is_ok()
                {
                    // If the kernel failed to allocate a buffer large enough for the packet,
                    // retry with a smaller size (if possible).
                    continue;
                } else {
                    return Err(error);
                }
            }

            byte_position = end_byte_position;
        }

        Ok(())
    }
464
465    pub fn connect(name: String) -> Result<OsIpcSender, UnixError> {
466        let name = CString::new(name).unwrap();
467        unsafe {
468            let fd = libc::socket(libc::AF_UNIX, SOCK_SEQPACKET | SOCK_FLAGS, 0);
469            let (sockaddr, len) = new_sockaddr_un(name.as_ptr());
470            if libc::connect(
471                fd,
472                &sockaddr as *const _ as *const sockaddr,
473                len as socklen_t,
474            ) < 0
475            {
476                return Err(UnixError::last());
477            }
478
479            Ok(OsIpcSender::from_fd(fd))
480        }
481    }
482}
483
/// Either end of a channel, as embedded in an outgoing message.
#[derive(Debug)]
pub enum OsIpcChannel {
    Sender(OsIpcSender),
    Receiver(OsIpcReceiver),
}
489
490impl OsIpcChannel {
491    fn fd(&self) -> c_int {
492        match *self {
493            OsIpcChannel::Sender(ref sender) => sender.fd.0,
494            OsIpcChannel::Receiver(ref receiver) => receiver.fd.get(),
495        }
496    }
497}
498
/// A set of receivers that can be polled for incoming messages together.
pub struct OsIpcReceiverSet {
    // Generator for the unique receiver ids handed back from `add`.
    incrementor: RangeFrom<u64>,
    poll: Poll,
    // Maps poll tokens (derived from raw fds) to their entries.
    pollfds: HashMap<Token, PollEntry, BuildHasherDefault<FxHasher>>,
    // Scratch buffer reused across poll calls.
    events: Events,
}
505
506impl Drop for OsIpcReceiverSet {
507    fn drop(&mut self) {
508        for &PollEntry { id: _, fd } in self.pollfds.values() {
509            let result = unsafe { libc::close(fd) };
510            assert!(thread::panicking() || result == 0);
511        }
512    }
513}
514
515impl OsIpcReceiverSet {
516    pub fn new() -> Result<OsIpcReceiverSet, UnixError> {
517        let fx = BuildHasherDefault::<FxHasher>::default();
518        Ok(OsIpcReceiverSet {
519            incrementor: 0..,
520            poll: Poll::new()?,
521            pollfds: HashMap::with_hasher(fx),
522            events: Events::with_capacity(10),
523        })
524    }
525
526    pub fn add(&mut self, receiver: OsIpcReceiver) -> Result<u64, UnixError> {
527        let last_index = self.incrementor.next().unwrap();
528        let fd = receiver.consume_fd();
529        let fd_token = Token(fd as usize);
530        let poll_entry = PollEntry { id: last_index, fd };
531        self.poll
532            .registry()
533            .register(&mut SourceFd(&fd), fd_token, Interest::READABLE)?;
534        self.pollfds.insert(fd_token, poll_entry);
535        Ok(last_index)
536    }
537
    /// Block until at least one registered receiver has data or is closed,
    /// then return all pending results.
    pub fn select(&mut self) -> Result<Vec<OsIpcSelectionResult>, UnixError> {
        // Poll until we receive at least one event.
        loop {
            match self.poll.poll(&mut self.events, None) {
                Ok(()) if !self.events.is_empty() => break,
                Ok(()) => {},
                Err(ref error) => {
                    // An interrupted poll (EINTR) is retried; anything else is fatal.
                    if error.kind() != io::ErrorKind::Interrupted {
                        return Err(UnixError::last());
                    }
                },
            }

            // An interrupted poll may still have delivered events;
            // check again before looping.
            if !self.events.is_empty() {
                break;
            }
        }

        match self.selection_results() {
            Ok(v) => Ok(v),
            Err(OsTrySelectError::Empty) => panic!("Blocking select cannot return Empty"),
            Err(OsTrySelectError::IoError(e)) => Err(e),
        }
    }
562
563    pub fn try_select(&mut self) -> Result<Vec<OsIpcSelectionResult>, OsTrySelectError> {
564        // Non-blocking poll to receive zero or more events.
565        self.try_select_timeout(Duration::ZERO)
566    }
567
568    pub fn try_select_timeout(
569        &mut self,
570        duration: Duration,
571    ) -> Result<Vec<OsIpcSelectionResult>, OsTrySelectError> {
572        // Poll for the specified duration until we receive zero or more events.
573        match self.poll.poll(&mut self.events, Some(duration)) {
574            Ok(()) => (),
575            Err(ref error) => {
576                if error.kind() != io::ErrorKind::Interrupted {
577                    return Err(OsTrySelectError::IoError(UnixError::last()));
578                }
579            },
580        }
581
582        let v = self.selection_results()?;
583        if v.is_empty() {
584            Err(OsTrySelectError::Empty)
585        } else {
586            Ok(v)
587        }
588    }
589
590    fn selection_results(&mut self) -> Result<Vec<OsIpcSelectionResult>, OsTrySelectError> {
591        let mut selection_results = Vec::new();
592        for event in self.events.iter() {
593            // We only register this `Poll` for readable events.
594            assert!(event.is_readable());
595
596            let event_token = event.token();
597            let poll_entry = *self
598                .pollfds
599                .get(&event_token)
600                .expect("Got event for unknown token.");
601            loop {
602                match recv(poll_entry.fd, BlockingMode::Nonblocking) {
603                    Ok(ipc_message) => {
604                        selection_results.push(OsIpcSelectionResult::DataReceived(
605                            poll_entry.id,
606                            ipc_message,
607                        ));
608                    },
609                    Err(err) if err.channel_is_closed() => {
610                        self.pollfds.remove(&event_token).unwrap();
611                        self.poll
612                            .registry()
613                            .deregister(&mut SourceFd(&poll_entry.fd))
614                            .unwrap();
615                        unsafe {
616                            libc::close(poll_entry.fd);
617                        }
618
619                        selection_results.push(OsIpcSelectionResult::ChannelClosed(poll_entry.id));
620                        break;
621                    },
622                    Err(UnixError::Errno(code)) if code == EWOULDBLOCK => {
623                        // We tried to read another message from the file descriptor and
624                        // it would have blocked, so we have exhausted all of the data
625                        // pending to read.
626                        break;
627                    },
628                    Err(e) => return Err(OsTrySelectError::IoError(e)),
629                }
630            }
631        }
632
633        Ok(selection_results)
634    }
635}
636
/// Outcome for one receiver in a `select`/`try_select` call.
pub enum OsIpcSelectionResult {
    /// A message arrived on the receiver with the given id.
    DataReceived(u64, IpcMessage),
    /// The channel with the given id was closed by its peer.
    ChannelClosed(u64),
}
641
642impl OsIpcSelectionResult {
643    pub fn unwrap(self) -> (u64, IpcMessage) {
644        match self {
645            OsIpcSelectionResult::DataReceived(id, ipc_message) => (id, ipc_message),
646            OsIpcSelectionResult::ChannelClosed(id) => {
647                panic!("OsIpcSelectionResult::unwrap(): receiver ID {id} was closed!")
648            },
649        }
650    }
651}
652
/// A channel endpoint of not-yet-determined direction,
/// as extracted from a received message.
#[derive(Debug)]
pub struct OsOpaqueIpcChannel {
    // Raw fd; reset to -1 once converted to a sender or receiver.
    fd: c_int,
}
657
658impl Drop for OsOpaqueIpcChannel {
659    fn drop(&mut self) {
660        // Make sure we don't leak!
661        //
662        // The `OsOpaqueIpcChannel` objects should always be used,
663        // i.e. converted with `to_sender()` or `to_receiver()` --
664        // so the value should already be unset before the object gets dropped.
665        debug_assert!(self.fd == -1);
666    }
667}
668
669impl OsOpaqueIpcChannel {
670    fn from_fd(fd: c_int) -> OsOpaqueIpcChannel {
671        OsOpaqueIpcChannel { fd }
672    }
673
674    pub fn to_sender(&mut self) -> OsIpcSender {
675        OsIpcSender::from_fd(mem::replace(&mut self.fd, -1))
676    }
677
678    pub fn to_receiver(&mut self) -> OsIpcReceiver {
679        OsIpcReceiver::from_fd(mem::replace(&mut self.fd, -1))
680    }
681}
682
/// A named server that accepts a single incoming connection.
pub struct OsIpcOneShotServer {
    // The listening socket fd.
    fd: c_int,

    // Object representing the temporary directory the socket was created in.
    // The directory is automatically deleted (along with the socket inside it)
    // when this field is dropped.
    _temp_dir: TempDir,
}
691
692impl Drop for OsIpcOneShotServer {
693    fn drop(&mut self) {
694        unsafe {
695            let result = libc::close(self.fd);
696            assert!(thread::panicking() || result == 0);
697        }
698    }
699}
700
701impl OsIpcOneShotServer {
702    pub fn new() -> Result<(OsIpcOneShotServer, String), UnixError> {
703        unsafe {
704            let fd = libc::socket(libc::AF_UNIX, SOCK_SEQPACKET | SOCK_FLAGS, 0);
705            let temp_dir = Builder::new().tempdir()?;
706            let socket_path = temp_dir.path().join("socket");
707            let path_string = socket_path.to_str().unwrap();
708
709            let path_c_string = CString::new(path_string).unwrap();
710            let (sockaddr, len) = new_sockaddr_un(path_c_string.as_ptr());
711            if libc::bind(
712                fd,
713                &sockaddr as *const _ as *const sockaddr,
714                len as socklen_t,
715            ) != 0
716            {
717                return Err(UnixError::last());
718            }
719
720            if libc::listen(fd, 10) != 0 {
721                return Err(UnixError::last());
722            }
723
724            Ok((
725                OsIpcOneShotServer {
726                    fd,
727                    _temp_dir: temp_dir,
728                },
729                path_string.to_string(),
730            ))
731        }
732    }
733
734    #[allow(clippy::type_complexity)]
735    pub fn accept(self) -> Result<(OsIpcReceiver, IpcMessage), UnixError> {
736        unsafe {
737            let sockaddr: *mut sockaddr = ptr::null_mut();
738            let sockaddr_len: *mut socklen_t = ptr::null_mut();
739            let client_fd = libc::accept(self.fd, sockaddr, sockaddr_len);
740            if client_fd < 0 {
741                return Err(UnixError::last());
742            }
743            make_socket_lingering(client_fd)?;
744
745            let receiver = OsIpcReceiver::from_fd(client_fd);
746            let ipc_message = receiver.recv()?;
747            Ok((receiver, ipc_message))
748        }
749    }
750}
751
752// Make sure that the kernel doesn't return errors to readers if there's still data left after we
753// close our end.
754//
755// See, for example, https://github.com/servo/ipc-channel/issues/29
756fn make_socket_lingering(sockfd: c_int) -> Result<(), UnixError> {
757    let linger = linger {
758        l_onoff: 1,
759        l_linger: 30,
760    };
761    let err = unsafe {
762        setsockopt(
763            sockfd,
764            SOL_SOCKET,
765            SO_LINGER,
766            &linger as *const _ as *const c_void,
767            mem::size_of::<linger>() as socklen_t,
768        )
769    };
770    if err < 0 {
771        let error = UnixError::last();
772        if let UnixError::Errno(libc::EINVAL) = error {
773            // If the other side of the connection is already closed, POSIX.1-2024 (and earlier
774            // versions) require that setsockopt return EINVAL [1]. This is a bit unfortunate
775            // because SO_LINGER for a closed socket is logically a no-op, which is why some OSes
776            // like Linux don't follow this part of the spec. But other OSes like illumos do return
777            // EINVAL here.
778            //
779            // SO_LINGER is widely understood and EINVAL should not occur for any other reason, so
780            // accept those errors.
781            //
782            // Another option would be to call make_socket_lingering on the initial socket created
783            // by libc::socket, but whether accept inherits a particular option is
784            // implementation-defined [2]. This means that special-casing EINVAL is the most
785            // portable thing to do.
786            //
787            // [1] https://pubs.opengroup.org/onlinepubs/9799919799/functions/setsockopt.html:
788            //     "[EINVAL] The specified option is invalid at the specified socket level or the
789            //     socket has been shut down."
790            //
791            // [2] https://pubs.opengroup.org/onlinepubs/9799919799/functions/accept.html: "It is
792            //     implementation-defined which socket options, if any, on the accepted socket will
793            //     have a default value determined by a value previously customized by setsockopt()
794            //     on socket, rather than the default value used for other new sockets."
795        } else {
796            return Err(error);
797        }
798    }
799    Ok(())
800}
801
// Owner of the file descriptor backing a shared memory region;
// closes the fd on drop.
struct BackingStore {
    fd: c_int,
}
805
806impl BackingStore {
807    pub fn new(length: usize) -> BackingStore {
808        let count = SHM_COUNT.fetch_add(1, Ordering::Relaxed);
809        let timestamp = UNIX_EPOCH.elapsed().unwrap();
810        let name = CString::new(format!(
811            "/ipc-channel-shared-memory.{}.{}.{}.{}",
812            count,
813            *PID,
814            timestamp.as_secs(),
815            timestamp.subsec_nanos()
816        ))
817        .unwrap();
818        let fd = create_shmem(name, length);
819        Self::from_fd(fd)
820    }
821
    /// Wrap an existing fd, taking ownership of it.
    pub fn from_fd(fd: c_int) -> BackingStore {
        BackingStore { fd }
    }

    /// The raw backing fd, without transferring ownership.
    pub fn fd(&self) -> c_int {
        self.fd
    }
829
    /// Map the backing file into memory, returning the address and length.
    ///
    /// If `length` is `None`, the current file size (from `fstat`) is used.
    /// A zero length returns a null pointer without calling `mmap`
    /// (which would fail for an empty mapping).
    ///
    /// # Safety
    ///
    /// The returned pointer is only valid while the mapping exists;
    /// the caller is responsible for `munmap`ing it.
    pub unsafe fn map_file(&self, length: Option<size_t>) -> (*mut u8, size_t) {
        let length = length.unwrap_or_else(|| {
            let mut st = mem::MaybeUninit::uninit();
            if libc::fstat(self.fd, st.as_mut_ptr()) != 0 {
                panic!("error stating fd {}: {}", self.fd, UnixError::last());
            }
            // fstat succeeded, so the stat buffer is initialized.
            st.assume_init().st_size as size_t
        });
        if length == 0 {
            // This will cause `mmap` to fail, so handle it explicitly.
            return (ptr::null_mut(), length);
        }
        let address = libc::mmap(
            ptr::null_mut(),
            length,
            PROT_READ | PROT_WRITE,
            MAP_SHARED,
            self.fd,
            0,
        );
        // mmap reports failure as MAP_FAILED (not null); check both anyway.
        assert!(!address.is_null());
        assert!(address != MAP_FAILED);
        (address as *mut u8, length)
    }
854}
855
856impl Drop for BackingStore {
857    fn drop(&mut self) {
858        unsafe {
859            let result = libc::close(self.fd);
860            assert!(thread::panicking() || result == 0);
861        }
862    }
863}
864
/// A shared-memory region: an active mapping (`ptr`/`length`) plus the
/// `BackingStore` whose descriptor keeps the region alive and can be
/// transferred to other processes.
pub struct OsIpcSharedMemory {
    // Base of the mapping; null for zero-length regions (nothing mapped).
    ptr: *mut u8,
    // Mapping size in bytes.
    length: usize,
    // Owns the fd backing the mapping; closed on drop.
    store: BackingStore,
}

// SAFETY: NOTE(review): the raw pointer is just the mmap base of a region
// owned by `store`; sharing it across threads is presumed sound because
// mutation is gated behind the unsafe `deref_mut` contract — confirm.
unsafe impl Send for OsIpcSharedMemory {}
unsafe impl Sync for OsIpcSharedMemory {}
873
874impl Drop for OsIpcSharedMemory {
875    fn drop(&mut self) {
876        unsafe {
877            if !self.ptr.is_null() {
878                let result = libc::munmap(self.ptr as *mut c_void, self.length);
879                assert!(thread::panicking() || result == 0);
880            }
881        }
882    }
883}
884
885impl Clone for OsIpcSharedMemory {
886    fn clone(&self) -> OsIpcSharedMemory {
887        unsafe {
888            let store = BackingStore::from_fd(libc::dup(self.store.fd()));
889            let (address, _) = store.map_file(Some(self.length));
890            OsIpcSharedMemory::from_raw_parts(address, self.length, store)
891        }
892    }
893}
894
895impl PartialEq for OsIpcSharedMemory {
896    fn eq(&self, other: &OsIpcSharedMemory) -> bool {
897        **self == **other
898    }
899}
900
901impl Debug for OsIpcSharedMemory {
902    fn fmt(&self, formatter: &mut Formatter) -> Result<(), fmt::Error> {
903        (**self).fmt(formatter)
904    }
905}
906
907impl Deref for OsIpcSharedMemory {
908    type Target = [u8];
909
910    #[inline]
911    fn deref(&self) -> &[u8] {
912        unsafe { slice::from_raw_parts(self.ptr, self.length) }
913    }
914}
915
916impl OsIpcSharedMemory {
917    /// # Safety
918    ///
919    /// This is safe if there is only one reader/writer on the data.
920    /// User can achieve this by not cloning [crate::ipc::IpcSharedMemory]
921    /// and serializing/deserializing only once.
922    #[inline]
923    pub unsafe fn deref_mut(&mut self) -> &mut [u8] {
924        unsafe { slice::from_raw_parts_mut(self.ptr, self.length) }
925    }
926
927    pub fn take(self) -> Option<Vec<u8>> {
928        Some((*self).to_vec())
929    }
930}
931
932impl OsIpcSharedMemory {
933    unsafe fn from_raw_parts(
934        ptr: *mut u8,
935        length: usize,
936        store: BackingStore,
937    ) -> OsIpcSharedMemory {
938        OsIpcSharedMemory { ptr, length, store }
939    }
940
941    unsafe fn from_fd(fd: c_int) -> OsIpcSharedMemory {
942        let store = BackingStore::from_fd(fd);
943        let (ptr, length) = store.map_file(None);
944        OsIpcSharedMemory::from_raw_parts(ptr, length, store)
945    }
946
947    pub fn from_byte(byte: u8, length: usize) -> OsIpcSharedMemory {
948        unsafe {
949            let store = BackingStore::new(length);
950            let (address, _) = store.map_file(Some(length));
951            for element in slice::from_raw_parts_mut(address, length) {
952                *element = byte;
953            }
954            OsIpcSharedMemory::from_raw_parts(address, length, store)
955        }
956    }
957
958    pub fn from_bytes(bytes: &[u8]) -> OsIpcSharedMemory {
959        unsafe {
960            let store = BackingStore::new(bytes.len());
961            let (address, _) = store.map_file(Some(bytes.len()));
962            ptr::copy_nonoverlapping(bytes.as_ptr(), address, bytes.len());
963            OsIpcSharedMemory::from_raw_parts(address, bytes.len(), store)
964        }
965    }
966}
967
/// Errors reported by the Unix socket backend.
///
/// `Display` is implemented manually below, so the variants carry no
/// `#[error]` attributes.
#[derive(Debug, Error)]
pub enum UnixError {
    /// A raw OS `errno` value.
    Errno(c_int),
    /// The peer hung up (e.g. `recvmsg` returned 0).
    ChannelClosed,
    /// A wrapped `std::io::Error`.
    IoError(io::Error),
}
974
975impl UnixError {
976    fn last() -> UnixError {
977        UnixError::Errno(io::Error::last_os_error().raw_os_error().unwrap())
978    }
979
980    #[allow(dead_code)]
981    pub fn channel_is_closed(&self) -> bool {
982        matches!(self, UnixError::ChannelClosed)
983    }
984}
985
986impl From<UnixError> for crate::IpcError {
987    fn from(error: UnixError) -> Self {
988        match error {
989            UnixError::ChannelClosed => crate::IpcError::Disconnected,
990            e => crate::IpcError::Io(io::Error::from(e)),
991        }
992    }
993}
994
995impl fmt::Display for UnixError {
996    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
997        match self {
998            UnixError::Errno(errno) => {
999                fmt::Display::fmt(&io::Error::from_raw_os_error(*errno), fmt)
1000            },
1001            UnixError::ChannelClosed => write!(fmt, "All senders for this socket closed"),
1002            UnixError::IoError(e) => write!(fmt, "{e}"),
1003        }
1004    }
1005}
1006
1007impl From<UnixError> for io::Error {
1008    fn from(unix_error: UnixError) -> io::Error {
1009        match unix_error {
1010            UnixError::Errno(errno) => io::Error::from_raw_os_error(errno),
1011            UnixError::ChannelClosed => io::Error::new(io::ErrorKind::ConnectionReset, unix_error),
1012            UnixError::IoError(e) => e,
1013        }
1014    }
1015}
1016
1017impl From<UnixError> for crate::TryRecvError {
1018    fn from(error: UnixError) -> Self {
1019        match error {
1020            UnixError::ChannelClosed => {
1021                crate::TryRecvError::IpcError(crate::IpcError::Disconnected)
1022            },
1023            UnixError::Errno(code) if code == EAGAIN || code == EWOULDBLOCK => {
1024                crate::TryRecvError::Empty
1025            },
1026            e => crate::TryRecvError::IpcError(crate::IpcError::Io(io::Error::from(e))),
1027        }
1028    }
1029}
1030
1031impl From<io::Error> for UnixError {
1032    fn from(e: io::Error) -> UnixError {
1033        if let Some(errno) = e.raw_os_error() {
1034            UnixError::Errno(errno)
1035        } else {
1036            assert!(e.kind() == io::ErrorKind::ConnectionReset);
1037            UnixError::ChannelClosed
1038        }
1039    }
1040}
1041
/// How a receive operation waits for incoming data.
#[derive(Copy, Clone)]
enum BlockingMode {
    /// Wait indefinitely until data (or EOF) arrives.
    Blocking,
    /// Fail immediately with EAGAIN/EWOULDBLOCK when nothing is queued
    /// (implemented by toggling O_NONBLOCK around the recvmsg call).
    Nonblocking,
    /// Wait at most this long (via poll(2)); a timeout surfaces as EAGAIN.
    Timeout(Duration),
}
1048
/// Receives one logical message from `fd`.
///
/// File descriptors attached via control messages are sorted into socket
/// channels and shared-memory regions. Messages spanning multiple fragments
/// are reassembled before returning: the first fragment carries the receive
/// end of a dedicated channel over which the follow-up fragments arrive.
#[allow(clippy::uninit_vec, clippy::type_complexity)]
fn recv(fd: c_int, blocking_mode: BlockingMode) -> Result<IpcMessage, UnixError> {
    let (mut channels, mut shared_memory_regions) = (Vec::new(), Vec::new());

    // First fragments begins with a header recording the total data length.
    //
    // We use this to determine whether we already got the entire message,
    // or need to receive additional fragments -- and if so, how much.
    let mut total_size = 0usize;
    let mut main_data_buffer;
    unsafe {
        // Allocate a buffer without initialising the memory.
        main_data_buffer = Vec::with_capacity(OsIpcSender::get_max_fragment_size());
        main_data_buffer.set_len(OsIpcSender::get_max_fragment_size());

        // Size of this first fragment (headers included), as recorded by the
        // sender in the fragment's first word.
        let mut fragment_size: usize = 0;

        // Running total of bytes received for the first fragment so far.
        let mut bytes_read: usize = 0;

        let mut blocking_mode = blocking_mode;

        loop {
            // Scatter layout: the two header words (fragment_size,
            // total_size), then the payload buffer.
            let mut iovec = [
                iovec {
                    iov_base: &mut fragment_size as *mut _ as *mut c_void,
                    iov_len: mem::size_of_val(&fragment_size),
                },
                iovec {
                    iov_base: &mut total_size as *mut _ as *mut c_void,
                    iov_len: mem::size_of_val(&total_size),
                },
                iovec {
                    iov_base: main_data_buffer.as_mut_ptr() as *mut c_void,
                    iov_len: main_data_buffer.len(),
                },
            ];

            let mut iovec: &mut [iovec] = &mut iovec;

            // On a retry after a short read, advance the iovecs past the
            // bytes already received so recvmsg continues where the previous
            // call stopped.
            let mut offset = bytes_read;
            while offset > 0 {
                let change = offset.min(iovec[0].iov_len);
                iovec[0].iov_base = iovec[0].iov_base.offset(change.try_into().unwrap());
                iovec[0].iov_len -= change;
                offset -= change;

                if iovec[0].iov_len == 0 {
                    iovec = &mut iovec[1..];
                }
            }

            let mut cmsg = UnixCmsg::new(iovec)?;

            // An error may only be propagated before anything was received;
            // once a partial fragment is in flight we must not bail out.
            bytes_read += cmsg.recv(fd, blocking_mode).inspect_err(|_| {
                assert_eq!(bytes_read, 0);
            })?;

            // Harvest any file descriptors attached to this datagram.
            let cmsg_fds = CMSG_DATA(cmsg.cmsg_buffer) as *const c_int;
            let cmsg_length = cmsg.msghdr.msg_controllen;
            let channel_length = if cmsg_length == 0 {
                0
            } else {
                // The control header is followed by an array of FDs. The size of the control header is
                // determined by CMSG_SPACE. (On Linux this would the same as CMSG_ALIGN, but that isn't
                // exposed by libc. CMSG_SPACE(0) is the portable version of that.)
                (cmsg.cmsg_len() - CMSG_SPACE(0) as size_t) / mem::size_of::<c_int>()
            };
            for index in 0..channel_length {
                let fd = *cmsg_fds.add(index);
                // Sockets become channels; any other fd is treated as a
                // shared-memory backing file.
                if is_socket(fd) {
                    channels.push(OsOpaqueIpcChannel::from_fd(fd));
                    continue;
                }
                shared_memory_regions.push(OsIpcSharedMemory::from_fd(fd));
            }

            // Done once the whole first fragment — as measured by its own
            // size header — has arrived.
            if bytes_read >= mem::size_of_val(&fragment_size) && bytes_read == fragment_size {
                break;
            }

            // Block on subsequent reads, to avoid returning mid-fragment
            blocking_mode = BlockingMode::Blocking;
        }

        // Trim the buffer down to the payload (drop the two header words).
        main_data_buffer
            .set_len(bytes_read - mem::size_of_val(&fragment_size) - mem::size_of_val(&total_size));
    }

    if total_size == main_data_buffer.len() {
        // Fast path: no fragments.
        return Ok(IpcMessage::new(
            main_data_buffer,
            channels,
            shared_memory_regions,
        ));
    }

    // Reassemble fragments.
    //
    // The initial fragment carries the receive end of a dedicated channel
    // through which all the remaining fragments will be coming in.
    let dedicated_rx = channels.pop().unwrap().to_receiver();

    // Extend the buffer to hold the entire message, without initialising the memory.
    let len = main_data_buffer.len();
    main_data_buffer.reserve_exact(total_size - len);

    // Receive followup fragments directly into the main buffer.
    while main_data_buffer.len() < total_size {
        let write_pos = main_data_buffer.len();
        // Each followup fragment carries at most one fragment's worth of data.
        let end_pos = cmp::min(
            write_pos + OsIpcSender::fragment_size(*SYSTEM_SENDBUF_SIZE),
            total_size,
        );
        let result = unsafe {
            assert!(end_pos <= main_data_buffer.capacity());
            main_data_buffer.set_len(end_pos);

            // Integer underflow could make the following code unsound...
            assert!(end_pos >= write_pos);

            // Note: we always use blocking mode for followup fragments,
            // to make sure that once we start receiving a multi-fragment message,
            // we don't abort in the middle of it...
            let result = libc::recv(
                dedicated_rx.fd.get(),
                main_data_buffer[write_pos..].as_mut_ptr() as *mut c_void,
                end_pos - write_pos,
                0,
            );
            // Clamp the length to what was actually received; `recv` may
            // return short, or negative on error.
            main_data_buffer.set_len(write_pos + cmp::max(result, 0) as usize);
            result
        };

        match result.cmp(&0) {
            cmp::Ordering::Greater => continue,
            // Zero bytes: the sender closed the dedicated channel early.
            cmp::Ordering::Equal => return Err(UnixError::ChannelClosed),
            cmp::Ordering::Less => return Err(UnixError::last()),
        }
    }

    Ok(IpcMessage::new(
        main_data_buffer,
        channels,
        shared_memory_regions,
    ))
}
1196
// https://github.com/servo/ipc-channel/issues/192
//
// Builds a `msghdr` by zero-initialising it and then assigning each field
// individually. NOTE(review): presumably this is because `msghdr` has
// platform-specific private/padding fields that a struct literal cannot
// name portably — see the linked issue.
fn new_msghdr(iovec: &mut [iovec], cmsg_buffer: *mut cmsghdr, cmsg_space: MsgControlLen) -> msghdr {
    let mut msghdr: msghdr = unsafe { mem::zeroed() };
    msghdr.msg_name = ptr::null_mut();
    msghdr.msg_namelen = 0;
    msghdr.msg_iov = iovec.as_mut_ptr();
    msghdr.msg_iovlen = iovec.len() as IovLen;
    msghdr.msg_control = cmsg_buffer as *mut c_void;
    msghdr.msg_controllen = cmsg_space;
    msghdr.msg_flags = 0;
    msghdr
}
1209
1210fn create_shmem(name: CString, length: usize) -> c_int {
1211    unsafe {
1212        let fd = libc::memfd_create(name.as_ptr(), libc::MFD_CLOEXEC);
1213        assert!(fd >= 0);
1214        assert_eq!(libc::ftruncate(fd, length as off_t), 0);
1215        fd
1216    }
1217}
1218
/// A `recvmsg` helper owning a malloc'ed control-message (cmsg) buffer and
/// the `msghdr` that points at it and at the caller's iovecs.
struct UnixCmsg {
    // Allocated with `libc::malloc` in `new`, released in `Drop`.
    cmsg_buffer: *mut cmsghdr,
    msghdr: msghdr,
}

// SAFETY: NOTE(review): the raw pointers refer to the owned heap buffer and
// caller-provided iovecs; presumed not used concurrently while a recv is in
// flight — confirm against call sites.
unsafe impl Send for UnixCmsg {}
1225
1226impl Drop for UnixCmsg {
1227    fn drop(&mut self) {
1228        unsafe {
1229            libc::free(self.cmsg_buffer as *mut c_void);
1230        }
1231    }
1232}
1233
impl UnixCmsg {
    /// Allocates a control-message buffer sized for `MAX_FDS_IN_CMSG`
    /// descriptors and builds the `msghdr` tying it to `iovec`.
    unsafe fn new(iovec: &mut [iovec]) -> Result<UnixCmsg, UnixError> {
        let cmsg_length = CMSG_SPACE(MAX_FDS_IN_CMSG * (mem::size_of::<c_int>() as c_uint));
        let cmsg_buffer = libc::malloc(cmsg_length as usize) as *mut cmsghdr;
        if cmsg_buffer.is_null() {
            // malloc failed; errno carries the reason.
            return Err(UnixError::last());
        }
        Ok(UnixCmsg {
            cmsg_buffer,
            msghdr: new_msghdr(iovec, cmsg_buffer, cmsg_length as MsgControlLen),
        })
    }

    /// Performs one `recvmsg` on `fd`, honouring `blocking_mode`.
    ///
    /// Returns the number of bytes received, `ChannelClosed` on EOF, or the
    /// raw errno on failure.
    unsafe fn recv(&mut self, fd: c_int, blocking_mode: BlockingMode) -> Result<usize, UnixError> {
        match blocking_mode {
            BlockingMode::Nonblocking => {
                // Temporarily switch the fd to non-blocking; restored after
                // the recvmsg call below.
                if libc::fcntl(fd, libc::F_SETFL, libc::O_NONBLOCK) < 0 {
                    return Err(UnixError::last());
                }
            },
            BlockingMode::Timeout(duration) => {
                // Wait for readability (or peer hang-up) with poll(2) first.
                let events = libc::POLLIN | libc::POLLPRI | libc::POLLRDHUP;

                let mut fd = [libc::pollfd {
                    fd,
                    events,
                    revents: 0,
                }];
                // NOTE(review): a duration whose millis overflow the c_int
                // argument falls back to -1, i.e. waiting indefinitely.
                let result = libc::poll(
                    fd.as_mut_ptr(),
                    fd.len() as _,
                    duration.as_millis().try_into().unwrap_or(-1),
                );

                match result.cmp(&0) {
                    // poll returning 0 is a timeout: report it like a
                    // non-blocking read with no data.
                    cmp::Ordering::Equal => return Err(UnixError::Errno(EAGAIN)),
                    cmp::Ordering::Less => return Err(UnixError::last()),
                    cmp::Ordering::Greater => {},
                }
            },
            BlockingMode::Blocking => {},
        }

        let result = recvmsg(fd, &mut self.msghdr, RECVMSG_FLAGS);

        let result = match result.cmp(&0) {
            // recvmsg returning 0 means the peer(s) closed the socket.
            cmp::Ordering::Equal => Err(UnixError::ChannelClosed),
            cmp::Ordering::Less => Err(UnixError::last()),
            cmp::Ordering::Greater => Ok(result as usize),
        };

        if let BlockingMode::Nonblocking = blocking_mode {
            // Restore blocking mode. NOTE(review): F_SETFL with 0 clears
            // *all* file status flags, not just O_NONBLOCK.
            if libc::fcntl(fd, libc::F_SETFL, 0) < 0 {
                return Err(UnixError::last());
            }
        }
        result
    }

    /// Length recorded in the first control-message header. Only meaningful
    /// when `msghdr.msg_controllen` is non-zero (checked by the caller).
    unsafe fn cmsg_len(&self) -> size_t {
        (*(self.msghdr.msg_control as *const cmsghdr)).cmsg_len as size_t
    }
}
1297
1298fn is_socket(fd: c_int) -> bool {
1299    unsafe {
1300        let mut st = mem::MaybeUninit::uninit();
1301        if libc::fstat(fd, st.as_mut_ptr()) != 0 {
1302            return false;
1303        }
1304        (st.assume_init().st_mode & S_IFMT) == S_IFSOCK
1305    }
1306}