ipc_channel/platform/unix/mod.rs

// Copyright 2015 The Servo Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.

use crate::ipc::{self, IpcMessage};
use bincode;
use fnv::FnvHasher;
use libc::{
    self, cmsghdr, linger, CMSG_DATA, CMSG_LEN, CMSG_SPACE, MAP_FAILED, MAP_SHARED, PROT_READ,
    PROT_WRITE, SOCK_SEQPACKET, SOL_SOCKET,
};
use libc::{c_char, c_int, c_void, getsockopt, SO_LINGER, S_IFMT, S_IFSOCK};
use libc::{iovec, msghdr, off_t, recvmsg, sendmsg};
use libc::{sa_family_t, setsockopt, size_t, sockaddr, sockaddr_un, socketpair, socklen_t};
use libc::{EAGAIN, EWOULDBLOCK};
use mio::unix::SourceFd;
use mio::{Events, Interest, Poll, Token};
use std::cell::Cell;
use std::cmp;
use std::collections::HashMap;
use std::convert::TryInto;
use std::error::Error as StdError;
use std::ffi::{c_uint, CString};
use std::fmt::{self, Debug, Formatter};
use std::hash::BuildHasherDefault;
use std::io;
use std::marker::PhantomData;
use std::mem;
use std::ops::{Deref, RangeFrom};
use std::os::fd::RawFd;
use std::ptr;
use std::slice;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, LazyLock};
use std::thread;
use std::time::{Duration, UNIX_EPOCH};
use tempfile::{Builder, TempDir};

const MAX_FDS_IN_CMSG: u32 = 64;

// The value Linux returns for SO_SNDBUF
// is not the size we are actually allowed to use...
// Empirically, we have to deduct 32 bytes from that.
const RESERVED_SIZE: usize = 32;

#[cfg(any(target_os = "linux", target_os = "illumos"))]
const SOCK_FLAGS: c_int = libc::SOCK_CLOEXEC;
#[cfg(not(any(target_os = "linux", target_os = "illumos")))]
const SOCK_FLAGS: c_int = 0;

#[cfg(any(target_os = "linux", target_os = "illumos"))]
const RECVMSG_FLAGS: c_int = libc::MSG_CMSG_CLOEXEC;
#[cfg(not(any(target_os = "linux", target_os = "illumos")))]
const RECVMSG_FLAGS: c_int = 0;

#[cfg(target_env = "gnu")]
type IovLen = usize;
#[cfg(target_env = "gnu")]
type MsgControlLen = size_t;

#[cfg(not(target_env = "gnu"))]
type IovLen = i32;
#[cfg(not(target_env = "gnu"))]
type MsgControlLen = socklen_t;

unsafe fn new_sockaddr_un(path: *const c_char) -> (sockaddr_un, usize) {
    let mut sockaddr: sockaddr_un = mem::zeroed();
    libc::strncpy(
        sockaddr.sun_path.as_mut_ptr(),
        path,
        sockaddr.sun_path.len() - 1,
    );
    sockaddr.sun_family = libc::AF_UNIX as sa_family_t;
    (sockaddr, mem::size_of::<sockaddr_un>())
}

static SYSTEM_SENDBUF_SIZE: LazyLock<usize> = LazyLock::new(|| {
    let (tx, _) = channel().expect("Failed to obtain a socket for checking maximum send size");
    tx.get_system_sendbuf_size()
        .expect("Failed to obtain maximum send size for socket")
});

// The pid of the current process which is used to create unique IDs
static PID: LazyLock<u32> = LazyLock::new(std::process::id);

// A global count used to create unique IDs
static SHM_COUNT: AtomicUsize = AtomicUsize::new(0);

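/// Create a connected (sender, receiver) pair backed by an `AF_UNIX`/`SOCK_SEQPACKET`
/// socketpair.
///
/// A minimal usage sketch (error handling elided; both ends stay in this process):
///
/// ```ignore
/// let (tx, rx) = channel()?;
/// tx.send(b"ping", vec![], vec![])?;
/// let message = rx.recv()?;
/// ```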
pub fn channel() -> Result<(OsIpcSender, OsIpcReceiver), UnixError> {
    let mut results = [0, 0];
    unsafe {
        if socketpair(
            libc::AF_UNIX,
            SOCK_SEQPACKET | SOCK_FLAGS,
            0,
            &mut results[0],
        ) >= 0
        {
            Ok((
                OsIpcSender::from_fd(results[0]),
                OsIpcReceiver::from_fd(results[1]),
            ))
        } else {
            Err(UnixError::last())
        }
    }
}

#[derive(Clone, Copy)]
struct PollEntry {
    pub id: u64,
    pub fd: RawFd,
}

#[derive(PartialEq, Debug)]
pub struct OsIpcReceiver {
    fd: Cell<c_int>,
}

impl Drop for OsIpcReceiver {
    fn drop(&mut self) {
        unsafe {
            let fd = self.fd.get();
            if fd >= 0 {
                let result = libc::close(fd);
                assert!(
                    thread::panicking() || result == 0,
                    "closed receiver (fd: {}): {}",
                    fd,
                    UnixError::last(),
                );
            }
        }
    }
}

impl OsIpcReceiver {
    fn from_fd(fd: c_int) -> OsIpcReceiver {
        OsIpcReceiver { fd: Cell::new(fd) }
    }

    fn consume_fd(&self) -> c_int {
        let fd = self.fd.get();
        self.fd.set(-1);
        fd
    }

    pub fn consume(&self) -> OsIpcReceiver {
        OsIpcReceiver::from_fd(self.consume_fd())
    }

    #[allow(clippy::type_complexity)]
    pub fn recv(&self) -> Result<IpcMessage, UnixError> {
        recv(self.fd.get(), BlockingMode::Blocking)
    }

    #[allow(clippy::type_complexity)]
    pub fn try_recv(&self) -> Result<IpcMessage, UnixError> {
        recv(self.fd.get(), BlockingMode::Nonblocking)
    }

    #[allow(clippy::type_complexity)]
    pub fn try_recv_timeout(&self, duration: Duration) -> Result<IpcMessage, UnixError> {
        recv(self.fd.get(), BlockingMode::Timeout(duration))
    }
}

#[derive(PartialEq, Debug)]
struct SharedFileDescriptor(c_int);

impl Drop for SharedFileDescriptor {
    fn drop(&mut self) {
        unsafe {
            let result = libc::close(self.0);
            assert!(thread::panicking() || result == 0);
        }
    }
}

#[derive(PartialEq, Debug, Clone)]
pub struct OsIpcSender {
    fd: Arc<SharedFileDescriptor>,
    // Make sure this is `!Sync`, to match `mpsc::Sender`; and to discourage sharing references.
    //
    // (Rather, senders should just be cloned, as they are shared internally anyway --
    // another layer of sharing only adds unnecessary overhead...)
    nosync_marker: PhantomData<Cell<()>>,
}

impl OsIpcSender {
    fn from_fd(fd: c_int) -> OsIpcSender {
        OsIpcSender {
            fd: Arc::new(SharedFileDescriptor(fd)),
            nosync_marker: PhantomData,
        }
    }

    /// Maximum size of the kernel buffer used for transfers over this channel.
    ///
    /// Note: This is *not* the actual maximal packet size we are allowed to use...
    /// Some of it is reserved by the kernel for bookkeeping.
    fn get_system_sendbuf_size(&self) -> Result<usize, UnixError> {
        unsafe {
            let mut socket_sendbuf_size: c_int = 0;
            let mut socket_sendbuf_size_len = mem::size_of::<c_int>() as socklen_t;
            if getsockopt(
                self.fd.0,
                libc::SOL_SOCKET,
                libc::SO_SNDBUF,
                &mut socket_sendbuf_size as *mut _ as *mut c_void,
                &mut socket_sendbuf_size_len as *mut socklen_t,
            ) < 0
            {
                return Err(UnixError::last());
            }
            Ok(socket_sendbuf_size.try_into().unwrap())
        }
    }

    /// Calculate maximum payload data size per fragment.
    ///
    /// It is the total size of the kernel buffer, minus the part reserved by the kernel.
    ///
    /// The `sendbuf_size` passed in should usually be the maximum kernel buffer size,
    /// i.e. the value of *SYSTEM_SENDBUF_SIZE --
    /// except after getting ENOBUFS, in which case it needs to be reduced.
    fn fragment_size(sendbuf_size: usize) -> usize {
        sendbuf_size - RESERVED_SIZE
    }

    /// Calculate maximum payload data size of first fragment.
    ///
    /// This one is smaller than regular fragments, because it carries the message (size) header.
    fn first_fragment_size(sendbuf_size: usize) -> usize {
        (Self::fragment_size(sendbuf_size) - mem::size_of::<usize>()) & (!8usize + 1)
        // Ensure optimal alignment.
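        // (`!8usize + 1` is the two's-complement bit pattern of -8, so the
        // bitwise AND rounds the header-adjusted size down to a multiple of 8 bytes.)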
    }

    /// Maximum data size that can be transferred over this channel in a single packet.
    ///
    /// This is the size of the main data chunk only --
    /// it's independent of any auxiliary data (FDs) transferred along with it.
    ///
    /// A send on this channel won't block for transfers up to this size
    /// under normal circumstances.
    /// (It might still block if heavy memory pressure causes ENOBUFS,
    /// forcing us to reduce the packet size.)
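    ///
    /// A sketch of how a caller might use this value (hypothetical `payload`
    /// variable; larger sends still work, they just get fragmented internally):
    ///
    /// ```ignore
    /// let fits_in_one_packet = payload.len() <= OsIpcSender::get_max_fragment_size();
    /// ```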
    pub fn get_max_fragment_size() -> usize {
        Self::first_fragment_size(*SYSTEM_SENDBUF_SIZE)
    }

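    /// Send `data`, plus any channel endpoints and shared memory regions, over this channel.
    ///
    /// A minimal sketch, assuming `other_rx` is an `OsIpcReceiver` to be
    /// transferred and `shmem` an existing `OsIpcSharedMemory` region:
    ///
    /// ```ignore
    /// tx.send(b"payload", vec![OsIpcChannel::Receiver(other_rx)], vec![shmem])?;
    /// ```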
    pub fn send(
        &self,
        data: &[u8],
        channels: Vec<OsIpcChannel>,
        shared_memory_regions: Vec<OsIpcSharedMemory>,
    ) -> Result<(), UnixError> {
        let mut fds = Vec::new();
        for channel in channels.iter() {
            fds.push(channel.fd());
        }
        for shared_memory_region in shared_memory_regions.iter() {
            fds.push(shared_memory_region.store.fd());
        }

        // `len` is the total length of the message.
        // Its value will be sent as a message header before the payload data.
        //
        // Not to be confused with the length of the data to send in this packet
        // (i.e. the length of the data buffer passed in),
        // which in a fragmented send will be smaller than the total message length.
        fn send_first_fragment(
            sender_fd: c_int,
            fds: &[c_int],
            data_buffer: &[u8],
            len: usize,
        ) -> Result<(), UnixError> {
            let result = unsafe {
                let cmsg_length = mem::size_of_val(fds) as c_uint;
                let (cmsg_buffer, cmsg_space) = if cmsg_length > 0 {
                    let cmsg_buffer =
                        libc::malloc(CMSG_SPACE(cmsg_length) as usize) as *mut cmsghdr;
                    if cmsg_buffer.is_null() {
                        return Err(UnixError::last());
                    }
                    (*cmsg_buffer).cmsg_len = CMSG_LEN(cmsg_length) as MsgControlLen;
                    (*cmsg_buffer).cmsg_level = libc::SOL_SOCKET;
                    (*cmsg_buffer).cmsg_type = libc::SCM_RIGHTS;

                    ptr::copy_nonoverlapping(
                        fds.as_ptr(),
                        CMSG_DATA(cmsg_buffer) as *mut c_int,
                        fds.len(),
                    );
                    (cmsg_buffer, CMSG_SPACE(cmsg_length))
                } else {
                    (ptr::null_mut(), 0)
                };

                let mut iovec = [
                    // First fragment begins with a header recording the total data length.
                    //
                    // The receiver uses this to determine
                    // whether it already got the entire message,
                    // or needs to receive additional fragments -- and if so, how much.
                    iovec {
                        iov_base: &len as *const _ as *mut c_void,
                        iov_len: mem::size_of_val(&len),
                    },
                    iovec {
                        iov_base: data_buffer.as_ptr() as *mut c_void,
                        iov_len: data_buffer.len(),
                    },
                ];

                let msghdr = new_msghdr(&mut iovec, cmsg_buffer, cmsg_space as MsgControlLen);
                let result = sendmsg(sender_fd, &msghdr, 0);
                libc::free(cmsg_buffer as *mut c_void);
                result
            };

            if result > 0 {
                Ok(())
            } else {
                Err(UnixError::last())
            }
        }

        fn send_followup_fragment(sender_fd: c_int, data_buffer: &[u8]) -> Result<(), UnixError> {
            let result = unsafe {
                libc::send(
                    sender_fd,
                    data_buffer.as_ptr() as *const c_void,
                    data_buffer.len(),
                    0,
                )
            };

            if result > 0 {
                Ok(())
            } else {
                Err(UnixError::last())
            }
        }

        let mut sendbuf_size = *SYSTEM_SENDBUF_SIZE;

        /// Reduce send buffer size after getting ENOBUFS,
        /// i.e. when the kernel failed to allocate a large enough buffer.
        ///
        /// (If the buffer was already significantly smaller
        /// than the memory page size, though,
        /// it means something else must have gone wrong;
        /// so there is no point in further downsizing,
        /// and we error out instead.)
        fn downsize(sendbuf_size: &mut usize, sent_size: usize) -> Result<(), ()> {
            if sent_size > 2000 {
                *sendbuf_size /= 2;
                // Make certain we end up with less than what we tried before...
                if *sendbuf_size >= sent_size {
                    *sendbuf_size = sent_size / 2;
                }
                Ok(())
            } else {
                Err(())
            }
        }

        // If the message is small enough, try sending it in a single fragment.
        if data.len() <= Self::get_max_fragment_size() {
            match send_first_fragment(self.fd.0, &fds[..], data, data.len()) {
                Ok(_) => return Ok(()),
                Err(error) => {
                    // ENOBUFS means the kernel failed to allocate a buffer large enough
                    // to actually transfer the message,
                    // although the message was small enough to fit the maximum send size --
                    // so we have to proceed with a fragmented send nevertheless,
                    // using a reduced send buffer size.
                    //
                    // Any other errors we might get here are non-recoverable.
                    if !(matches!(error, UnixError::Errno(libc::ENOBUFS))
                        && downsize(&mut sendbuf_size, data.len()).is_ok())
                    {
                        return Err(error);
                    }
                },
            }
        }

        // The packet is too big. Fragmentation time!
        //
        // Create a dedicated channel to send all but the first fragment.
        // This way we avoid fragments of different messages interleaving in the receiver.
        //
        // The receiver end of the channel is sent with the first fragment,
        // along with any other file descriptors that are to be transferred in the message.
        let (dedicated_tx, dedicated_rx) = channel()?;
        // Extract FD handle without consuming the Receiver, so the FD doesn't get closed.
        fds.push(dedicated_rx.fd.get());

        // Split up the packet into fragments.
        let mut byte_position = 0;
        while byte_position < data.len() {
            let end_byte_position;
            let result = if byte_position == 0 {
                // First fragment. No offset; but contains message header (total size).
                // The auxiliary data (FDs) is also sent along with this one.

                // This fragment always uses the full allowable buffer size.
                end_byte_position = Self::first_fragment_size(sendbuf_size);
                send_first_fragment(self.fd.0, &fds[..], &data[..end_byte_position], data.len())
            } else {
                // Followup fragment. No header; but offset by amount of data already sent.

                end_byte_position = cmp::min(
                    byte_position + Self::fragment_size(sendbuf_size),
                    data.len(),
                );
                send_followup_fragment(dedicated_tx.fd.0, &data[byte_position..end_byte_position])
            };

            if let Err(error) = result {
                if matches!(error, UnixError::Errno(libc::ENOBUFS))
                    && downsize(&mut sendbuf_size, end_byte_position - byte_position).is_ok()
                {
                    // If the kernel failed to allocate a buffer large enough for the packet,
                    // retry with a smaller size (if possible).
                    continue;
                } else {
                    return Err(error);
                }
            }

            byte_position = end_byte_position;
        }

        Ok(())
    }

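    /// Connect to a named socket created by `OsIpcOneShotServer::new()`.
    ///
    /// A minimal sketch, assuming `name` was received from the server process:
    ///
    /// ```ignore
    /// let tx = OsIpcSender::connect(name)?;
    /// tx.send(b"hello", vec![], vec![])?;
    /// ```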
    pub fn connect(name: String) -> Result<OsIpcSender, UnixError> {
        let name = CString::new(name).unwrap();
        unsafe {
            let fd = libc::socket(libc::AF_UNIX, SOCK_SEQPACKET | SOCK_FLAGS, 0);
            let (sockaddr, len) = new_sockaddr_un(name.as_ptr());
            if libc::connect(
                fd,
                &sockaddr as *const _ as *const sockaddr,
                len as socklen_t,
            ) < 0
            {
                return Err(UnixError::last());
            }

            Ok(OsIpcSender::from_fd(fd))
        }
    }
}

#[derive(PartialEq, Debug)]
pub enum OsIpcChannel {
    Sender(OsIpcSender),
    Receiver(OsIpcReceiver),
}

impl OsIpcChannel {
    fn fd(&self) -> c_int {
        match *self {
            OsIpcChannel::Sender(ref sender) => sender.fd.0,
            OsIpcChannel::Receiver(ref receiver) => receiver.fd.get(),
        }
    }
}

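/// A set of receivers that can be waited on together via `mio::Poll`.
///
/// A minimal sketch of the intended flow, assuming `rx` is an existing
/// `OsIpcReceiver`:
///
/// ```ignore
/// let mut set = OsIpcReceiverSet::new()?;
/// let id = set.add(rx)?;
/// for result in set.select()? {
///     match result {
///         OsIpcSelectionResult::DataReceived(source_id, message) => { /* handle message */ },
///         OsIpcSelectionResult::ChannelClosed(source_id) => { /* forget source_id */ },
///     }
/// }
/// ```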
pub struct OsIpcReceiverSet {
    incrementor: RangeFrom<u64>,
    poll: Poll,
    pollfds: HashMap<Token, PollEntry, BuildHasherDefault<FnvHasher>>,
    events: Events,
}

impl Drop for OsIpcReceiverSet {
    fn drop(&mut self) {
        for &PollEntry { id: _, fd } in self.pollfds.values() {
            let result = unsafe { libc::close(fd) };
            assert!(thread::panicking() || result == 0);
        }
    }
}

impl OsIpcReceiverSet {
    pub fn new() -> Result<OsIpcReceiverSet, UnixError> {
        let fnv = BuildHasherDefault::<FnvHasher>::default();
        Ok(OsIpcReceiverSet {
            incrementor: 0..,
            poll: Poll::new()?,
            pollfds: HashMap::with_hasher(fnv),
            events: Events::with_capacity(10),
        })
    }

    pub fn add(&mut self, receiver: OsIpcReceiver) -> Result<u64, UnixError> {
        let last_index = self.incrementor.next().unwrap();
        let fd = receiver.consume_fd();
        let fd_token = Token(fd as usize);
        let poll_entry = PollEntry { id: last_index, fd };
        self.poll
            .registry()
            .register(&mut SourceFd(&fd), fd_token, Interest::READABLE)?;
        self.pollfds.insert(fd_token, poll_entry);
        Ok(last_index)
    }

    pub fn select(&mut self) -> Result<Vec<OsIpcSelectionResult>, UnixError> {
        // Poll until we receive at least one event.
        loop {
            match self.poll.poll(&mut self.events, None) {
                Ok(()) if !self.events.is_empty() => break,
                Ok(()) => {},
                Err(ref error) => {
                    if error.kind() != io::ErrorKind::Interrupted {
                        return Err(UnixError::last());
                    }
                },
            }

            if !self.events.is_empty() {
                break;
            }
        }

        let mut selection_results = Vec::new();
        for event in self.events.iter() {
            // We only register this `Poll` for readable events.
            assert!(event.is_readable());

            let event_token = event.token();
            let poll_entry = self
                .pollfds
                .get(&event_token)
                .expect("Got event for unknown token.")
                .clone();
            loop {
                match recv(poll_entry.fd, BlockingMode::Nonblocking) {
                    Ok(ipc_message) => {
                        selection_results.push(OsIpcSelectionResult::DataReceived(
                            poll_entry.id,
                            ipc_message,
                        ));
                    },
                    Err(err) if err.channel_is_closed() => {
                        self.pollfds.remove(&event_token).unwrap();
                        self.poll
                            .registry()
                            .deregister(&mut SourceFd(&poll_entry.fd))
                            .unwrap();
                        unsafe {
                            libc::close(poll_entry.fd);
                        }

                        selection_results.push(OsIpcSelectionResult::ChannelClosed(poll_entry.id));
                        break;
                    },
                    Err(UnixError::Errno(code)) if code == EWOULDBLOCK => {
                        // We tried to read another message from the file descriptor and
                        // it would have blocked, so we have exhausted all of the data
                        // pending to read.
                        break;
                    },
                    Err(err) => return Err(err),
                }
            }
        }

        Ok(selection_results)
    }
}

pub enum OsIpcSelectionResult {
    DataReceived(u64, IpcMessage),
    ChannelClosed(u64),
}

impl OsIpcSelectionResult {
    pub fn unwrap(self) -> (u64, IpcMessage) {
        match self {
            OsIpcSelectionResult::DataReceived(id, ipc_message) => (id, ipc_message),
            OsIpcSelectionResult::ChannelClosed(id) => {
                panic!(
                    "OsIpcSelectionResult::unwrap(): receiver ID {} was closed!",
                    id
                )
            },
        }
    }
}

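/// A channel endpoint received over IPC whose direction is not yet known to the caller.
///
/// A minimal sketch, assuming `opaque` came out of a received `IpcMessage`
/// and is known to wrap a receiver endpoint:
///
/// ```ignore
/// let receiver = opaque.to_receiver();
/// let next_message = receiver.recv()?;
/// ```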
#[derive(PartialEq, Debug)]
pub struct OsOpaqueIpcChannel {
    fd: c_int,
}

impl Drop for OsOpaqueIpcChannel {
    fn drop(&mut self) {
        // Make sure we don't leak!
        //
        // The `OsOpaqueIpcChannel` objects should always be used,
        // i.e. converted with `to_sender()` or `to_receiver()` --
        // so the value should already be unset before the object gets dropped.
        debug_assert!(self.fd == -1);
    }
}

impl OsOpaqueIpcChannel {
    fn from_fd(fd: c_int) -> OsOpaqueIpcChannel {
        OsOpaqueIpcChannel { fd }
    }

    pub fn to_sender(&mut self) -> OsIpcSender {
        OsIpcSender::from_fd(mem::replace(&mut self.fd, -1))
    }

    pub fn to_receiver(&mut self) -> OsIpcReceiver {
        OsIpcReceiver::from_fd(mem::replace(&mut self.fd, -1))
    }
}

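/// A named, one-shot server used to bootstrap a connection across processes.
///
/// A minimal sketch, assuming the connecting side learns `name` out of band
/// (e.g. via a command-line argument):
///
/// ```ignore
/// let (server, name) = OsIpcOneShotServer::new()?;
/// // In the other process:
/// //     let tx = OsIpcSender::connect(name)?;
/// //     tx.send(b"hello", vec![], vec![])?;
/// let (receiver, first_message) = server.accept()?;
/// ```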
pub struct OsIpcOneShotServer {
    fd: c_int,

    // Object representing the temporary directory the socket was created in.
    // The directory is automatically deleted (along with the socket inside it)
    // when this field is dropped.
    _temp_dir: TempDir,
}

impl Drop for OsIpcOneShotServer {
    fn drop(&mut self) {
        unsafe {
            let result = libc::close(self.fd);
            assert!(thread::panicking() || result == 0);
        }
    }
}

impl OsIpcOneShotServer {
    pub fn new() -> Result<(OsIpcOneShotServer, String), UnixError> {
        unsafe {
            let fd = libc::socket(libc::AF_UNIX, SOCK_SEQPACKET | SOCK_FLAGS, 0);
            let temp_dir = Builder::new().tempdir()?;
            let socket_path = temp_dir.path().join("socket");
            let path_string = socket_path.to_str().unwrap();

            let path_c_string = CString::new(path_string).unwrap();
            let (sockaddr, len) = new_sockaddr_un(path_c_string.as_ptr());
            if libc::bind(
                fd,
                &sockaddr as *const _ as *const sockaddr,
                len as socklen_t,
            ) != 0
            {
                return Err(UnixError::last());
            }

            if libc::listen(fd, 10) != 0 {
                return Err(UnixError::last());
            }

            Ok((
                OsIpcOneShotServer {
                    fd,
                    _temp_dir: temp_dir,
                },
                path_string.to_string(),
            ))
        }
    }

    #[allow(clippy::type_complexity)]
    pub fn accept(self) -> Result<(OsIpcReceiver, IpcMessage), UnixError> {
        unsafe {
            let sockaddr: *mut sockaddr = ptr::null_mut();
            let sockaddr_len: *mut socklen_t = ptr::null_mut();
            let client_fd = libc::accept(self.fd, sockaddr, sockaddr_len);
            if client_fd < 0 {
                return Err(UnixError::last());
            }
            make_socket_lingering(client_fd)?;

            let receiver = OsIpcReceiver::from_fd(client_fd);
            let ipc_message = receiver.recv()?;
            Ok((receiver, ipc_message))
        }
    }
}

// Make sure that the kernel doesn't return errors to readers if there's still data left after we
// close our end.
//
// See, for example, https://github.com/servo/ipc-channel/issues/29
fn make_socket_lingering(sockfd: c_int) -> Result<(), UnixError> {
    let linger = linger {
        l_onoff: 1,
        l_linger: 30,
    };
    let err = unsafe {
        setsockopt(
            sockfd,
            SOL_SOCKET,
            SO_LINGER,
            &linger as *const _ as *const c_void,
            mem::size_of::<linger>() as socklen_t,
        )
    };
    if err < 0 {
        let error = UnixError::last();
        if let UnixError::Errno(libc::EINVAL) = error {
            // If the other side of the connection is already closed, POSIX.1-2024 (and earlier
            // versions) require that setsockopt return EINVAL [1]. This is a bit unfortunate
            // because SO_LINGER for a closed socket is logically a no-op, which is why some OSes
            // like Linux don't follow this part of the spec. But other OSes like illumos do return
            // EINVAL here.
            //
            // SO_LINGER is widely understood and EINVAL should not occur for any other reason, so
            // accept those errors.
            //
            // Another option would be to call make_socket_lingering on the initial socket created
            // by libc::socket, but whether accept inherits a particular option is
            // implementation-defined [2]. This means that special-casing EINVAL is the most
            // portable thing to do.
            //
            // [1] https://pubs.opengroup.org/onlinepubs/9799919799/functions/setsockopt.html:
            //     "[EINVAL] The specified option is invalid at the specified socket level or the
            //     socket has been shut down."
            //
            // [2] https://pubs.opengroup.org/onlinepubs/9799919799/functions/accept.html: "It is
            //     implementation-defined which socket options, if any, on the accepted socket will
            //     have a default value determined by a value previously customized by setsockopt()
            //     on socket, rather than the default value used for other new sockets."
        } else {
            return Err(error);
        }
    }
    Ok(())
}

struct BackingStore {
    fd: c_int,
}

impl BackingStore {
    pub fn new(length: usize) -> BackingStore {
        let count = SHM_COUNT.fetch_add(1, Ordering::Relaxed);
        let timestamp = UNIX_EPOCH.elapsed().unwrap();
        let name = CString::new(format!(
            "/ipc-channel-shared-memory.{}.{}.{}.{}",
            count,
            *PID,
            timestamp.as_secs(),
            timestamp.subsec_nanos()
        ))
        .unwrap();
        let fd = create_shmem(name, length);
        Self::from_fd(fd)
    }

    pub fn from_fd(fd: c_int) -> BackingStore {
        BackingStore { fd }
    }

    pub fn fd(&self) -> c_int {
        self.fd
    }

    pub unsafe fn map_file(&self, length: Option<size_t>) -> (*mut u8, size_t) {
        let length = length.unwrap_or_else(|| {
            let mut st = mem::MaybeUninit::uninit();
            if libc::fstat(self.fd, st.as_mut_ptr()) != 0 {
                panic!("error stating fd {}: {}", self.fd, UnixError::last());
            }
            st.assume_init().st_size as size_t
        });
        if length == 0 {
            // This will cause `mmap` to fail, so handle it explicitly.
            return (ptr::null_mut(), length);
        }
        let address = libc::mmap(
            ptr::null_mut(),
            length,
            PROT_READ | PROT_WRITE,
            MAP_SHARED,
            self.fd,
            0,
        );
        assert!(!address.is_null());
        assert!(address != MAP_FAILED);
        (address as *mut u8, length)
    }
}

impl Drop for BackingStore {
    fn drop(&mut self) {
        unsafe {
            let result = libc::close(self.fd);
            assert!(thread::panicking() || result == 0);
        }
    }
}

pub struct OsIpcSharedMemory {
    ptr: *mut u8,
    length: usize,
    store: BackingStore,
}

unsafe impl Send for OsIpcSharedMemory {}
unsafe impl Sync for OsIpcSharedMemory {}

impl Drop for OsIpcSharedMemory {
    fn drop(&mut self) {
        unsafe {
            if !self.ptr.is_null() {
                let result = libc::munmap(self.ptr as *mut c_void, self.length);
                assert!(thread::panicking() || result == 0);
            }
        }
    }
}

impl Clone for OsIpcSharedMemory {
    fn clone(&self) -> OsIpcSharedMemory {
        unsafe {
            let store = BackingStore::from_fd(libc::dup(self.store.fd()));
            let (address, _) = store.map_file(Some(self.length));
            OsIpcSharedMemory::from_raw_parts(address, self.length, store)
        }
    }
}

impl PartialEq for OsIpcSharedMemory {
    fn eq(&self, other: &OsIpcSharedMemory) -> bool {
        **self == **other
    }
}

impl Debug for OsIpcSharedMemory {
    fn fmt(&self, formatter: &mut Formatter) -> Result<(), fmt::Error> {
        (**self).fmt(formatter)
    }
}

impl Deref for OsIpcSharedMemory {
    type Target = [u8];

    #[inline]
    fn deref(&self) -> &[u8] {
        unsafe { slice::from_raw_parts(self.ptr, self.length) }
    }
}

impl OsIpcSharedMemory {
    #[inline]
    pub unsafe fn deref_mut(&mut self) -> &mut [u8] {
        unsafe { slice::from_raw_parts_mut(self.ptr, self.length) }
    }
}

impl OsIpcSharedMemory {
    unsafe fn from_raw_parts(
        ptr: *mut u8,
        length: usize,
        store: BackingStore,
    ) -> OsIpcSharedMemory {
        OsIpcSharedMemory { ptr, length, store }
    }

    unsafe fn from_fd(fd: c_int) -> OsIpcSharedMemory {
        let store = BackingStore::from_fd(fd);
        let (ptr, length) = store.map_file(None);
        OsIpcSharedMemory::from_raw_parts(ptr, length, store)
    }

    pub fn from_byte(byte: u8, length: usize) -> OsIpcSharedMemory {
        unsafe {
            let store = BackingStore::new(length);
            let (address, _) = store.map_file(Some(length));
            for element in slice::from_raw_parts_mut(address, length) {
                *element = byte;
            }
            OsIpcSharedMemory::from_raw_parts(address, length, store)
        }
    }

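    /// Copy `bytes` into a freshly created shared memory region.
    ///
    /// A minimal sketch; the region dereferences to its contents as a byte slice:
    ///
    /// ```ignore
    /// let shmem = OsIpcSharedMemory::from_bytes(b"abc");
    /// assert_eq!(&*shmem, b"abc");
    /// ```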
    pub fn from_bytes(bytes: &[u8]) -> OsIpcSharedMemory {
        unsafe {
            let store = BackingStore::new(bytes.len());
            let (address, _) = store.map_file(Some(bytes.len()));
            ptr::copy_nonoverlapping(bytes.as_ptr(), address, bytes.len());
            OsIpcSharedMemory::from_raw_parts(address, bytes.len(), store)
        }
    }
}

#[derive(Debug)]
pub enum UnixError {
    Errno(c_int),
    ChannelClosed,
    IoError(io::Error),
}

impl UnixError {
    fn last() -> UnixError {
        UnixError::Errno(io::Error::last_os_error().raw_os_error().unwrap())
    }

    #[allow(dead_code)]
    pub fn channel_is_closed(&self) -> bool {
        matches!(self, UnixError::ChannelClosed)
    }
}

impl fmt::Display for UnixError {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        match self {
            UnixError::Errno(errno) => {
                fmt::Display::fmt(&io::Error::from_raw_os_error(*errno), fmt)
            },
            UnixError::ChannelClosed => write!(fmt, "All senders for this socket closed"),
            UnixError::IoError(e) => write!(fmt, "{e}"),
        }
    }
}

impl StdError for UnixError {}

impl From<UnixError> for bincode::Error {
    fn from(unix_error: UnixError) -> Self {
        io::Error::from(unix_error).into()
    }
}

impl From<UnixError> for io::Error {
    fn from(unix_error: UnixError) -> io::Error {
        match unix_error {
            UnixError::Errno(errno) => io::Error::from_raw_os_error(errno),
            UnixError::ChannelClosed => io::Error::new(io::ErrorKind::ConnectionReset, unix_error),
            UnixError::IoError(e) => e,
        }
    }
}

impl From<UnixError> for ipc::IpcError {
    fn from(error: UnixError) -> Self {
        match error {
            UnixError::ChannelClosed => ipc::IpcError::Disconnected,
            e => ipc::IpcError::Io(io::Error::from(e)),
        }
    }
}

impl From<UnixError> for ipc::TryRecvError {
    fn from(error: UnixError) -> Self {
        match error {
            UnixError::ChannelClosed => ipc::TryRecvError::IpcError(ipc::IpcError::Disconnected),
            UnixError::Errno(code) if code == EAGAIN || code == EWOULDBLOCK => {
                ipc::TryRecvError::Empty
            },
            e => ipc::TryRecvError::IpcError(ipc::IpcError::Io(io::Error::from(e))),
        }
    }
}

impl From<io::Error> for UnixError {
    fn from(e: io::Error) -> UnixError {
        if let Some(errno) = e.raw_os_error() {
            UnixError::Errno(errno)
        } else {
            assert!(e.kind() == io::ErrorKind::ConnectionReset);
            UnixError::ChannelClosed
        }
    }
}

#[derive(Copy, Clone)]
enum BlockingMode {
    Blocking,
    Nonblocking,
    Timeout(Duration),
}

#[allow(clippy::uninit_vec, clippy::type_complexity)]
fn recv(fd: c_int, blocking_mode: BlockingMode) -> Result<IpcMessage, UnixError> {
    let (mut channels, mut shared_memory_regions) = (Vec::new(), Vec::new());

    // The first fragment begins with a header recording the total data length.
    //
    // We use this to determine whether we already got the entire message,
    // or need to receive additional fragments -- and if so, how much.
    let mut total_size = 0usize;
    let mut main_data_buffer;
    unsafe {
        // Allocate a buffer without initialising the memory.
        main_data_buffer = Vec::with_capacity(OsIpcSender::get_max_fragment_size());
        main_data_buffer.set_len(OsIpcSender::get_max_fragment_size());

        let mut iovec = [
            iovec {
                iov_base: &mut total_size as *mut _ as *mut c_void,
                iov_len: mem::size_of_val(&total_size),
            },
            iovec {
                iov_base: main_data_buffer.as_mut_ptr() as *mut c_void,
                iov_len: main_data_buffer.len(),
            },
        ];
        let mut cmsg = UnixCmsg::new(&mut iovec)?;

        let bytes_read = cmsg.recv(fd, blocking_mode)?;
        main_data_buffer.set_len(bytes_read - mem::size_of_val(&total_size));

        let cmsg_fds = CMSG_DATA(cmsg.cmsg_buffer) as *const c_int;
        let cmsg_length = cmsg.msghdr.msg_controllen;
        let channel_length = if cmsg_length == 0 {
            0
        } else {
            // The control header is followed by an array of FDs. The size of the control header is
            // determined by CMSG_SPACE. (On Linux this would be the same as CMSG_ALIGN, but that isn't
            // exposed by libc. CMSG_SPACE(0) is the portable version of that.)
            (cmsg.cmsg_len() - CMSG_SPACE(0) as size_t) / mem::size_of::<c_int>()
        };
        for index in 0..channel_length {
            let fd = *cmsg_fds.add(index);
            if is_socket(fd) {
                channels.push(OsOpaqueIpcChannel::from_fd(fd));
                continue;
            }
            shared_memory_regions.push(OsIpcSharedMemory::from_fd(fd));
        }
    }

    if total_size == main_data_buffer.len() {
        // Fast path: no fragments.
        return Ok(IpcMessage::new(
            main_data_buffer,
            channels,
            shared_memory_regions,
        ));
    }

    // Reassemble fragments.
    //
    // The initial fragment carries the receive end of a dedicated channel
    // through which all the remaining fragments will be coming in.
    let dedicated_rx = channels.pop().unwrap().to_receiver();

    // Extend the buffer to hold the entire message, without initialising the memory.
    let len = main_data_buffer.len();
    main_data_buffer.reserve_exact(total_size - len);

    // Receive followup fragments directly into the main buffer.
    while main_data_buffer.len() < total_size {
        let write_pos = main_data_buffer.len();
        let end_pos = cmp::min(
            write_pos + OsIpcSender::fragment_size(*SYSTEM_SENDBUF_SIZE),
            total_size,
        );
        let result = unsafe {
            assert!(end_pos <= main_data_buffer.capacity());
            main_data_buffer.set_len(end_pos);

            // Integer underflow could make the following code unsound...
            assert!(end_pos >= write_pos);

            // Note: we always use blocking mode for followup fragments,
            // to make sure that once we start receiving a multi-fragment message,
            // we don't abort in the middle of it...
            let result = libc::recv(
                dedicated_rx.fd.get(),
                main_data_buffer[write_pos..].as_mut_ptr() as *mut c_void,
                end_pos - write_pos,
                0,
            );
            main_data_buffer.set_len(write_pos + cmp::max(result, 0) as usize);
            result
        };

        match result.cmp(&0) {
            cmp::Ordering::Greater => continue,
            cmp::Ordering::Equal => return Err(UnixError::ChannelClosed),
            cmp::Ordering::Less => return Err(UnixError::last()),
        }
    }

    Ok(IpcMessage::new(
        main_data_buffer,
        channels,
        shared_memory_regions,
    ))
}

// https://github.com/servo/ipc-channel/issues/192
fn new_msghdr(iovec: &mut [iovec], cmsg_buffer: *mut cmsghdr, cmsg_space: MsgControlLen) -> msghdr {
    let mut msghdr: msghdr = unsafe { mem::zeroed() };
    msghdr.msg_name = ptr::null_mut();
    msghdr.msg_namelen = 0;
    msghdr.msg_iov = iovec.as_mut_ptr();
    msghdr.msg_iovlen = iovec.len() as IovLen;
    msghdr.msg_control = cmsg_buffer as *mut c_void;
    msghdr.msg_controllen = cmsg_space;
    msghdr.msg_flags = 0;
    msghdr
}

fn create_shmem(name: CString, length: usize) -> c_int {
    unsafe {
        let fd = libc::memfd_create(name.as_ptr(), libc::MFD_CLOEXEC);
        assert!(fd >= 0);
        assert_eq!(libc::ftruncate(fd, length as off_t), 0);
        fd
    }
}

struct UnixCmsg {
    cmsg_buffer: *mut cmsghdr,
    msghdr: msghdr,
}

unsafe impl Send for UnixCmsg {}

impl Drop for UnixCmsg {
    fn drop(&mut self) {
        unsafe {
            libc::free(self.cmsg_buffer as *mut c_void);
        }
    }
}

impl UnixCmsg {
    unsafe fn new(iovec: &mut [iovec]) -> Result<UnixCmsg, UnixError> {
        let cmsg_length = CMSG_SPACE(MAX_FDS_IN_CMSG * (mem::size_of::<c_int>() as c_uint));
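        // Reserve control-message space for up to MAX_FDS_IN_CMSG file descriptors;
        // CMSG_SPACE accounts for the cmsghdr itself plus alignment padding.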
        let cmsg_buffer = libc::malloc(cmsg_length as usize) as *mut cmsghdr;
        if cmsg_buffer.is_null() {
            return Err(UnixError::last());
        }
        Ok(UnixCmsg {
            cmsg_buffer,
            msghdr: new_msghdr(iovec, cmsg_buffer, cmsg_length as MsgControlLen),
        })
    }

    unsafe fn recv(&mut self, fd: c_int, blocking_mode: BlockingMode) -> Result<usize, UnixError> {
        match blocking_mode {
            BlockingMode::Nonblocking => {
                if libc::fcntl(fd, libc::F_SETFL, libc::O_NONBLOCK) < 0 {
                    return Err(UnixError::last());
                }
            },
            BlockingMode::Timeout(duration) => {
                let events = libc::POLLIN | libc::POLLPRI | libc::POLLRDHUP;

                let mut fd = [libc::pollfd {
                    fd,
                    events,
                    revents: 0,
                }];
                let result = libc::poll(
                    fd.as_mut_ptr(),
                    fd.len() as _,
                    duration.as_millis().try_into().unwrap_or(-1),
                );

                match result.cmp(&0) {
                    cmp::Ordering::Equal => return Err(UnixError::Errno(EAGAIN)),
                    cmp::Ordering::Less => return Err(UnixError::last()),
                    cmp::Ordering::Greater => {},
                }
            },
            BlockingMode::Blocking => {},
        }

        let result = recvmsg(fd, &mut self.msghdr, RECVMSG_FLAGS);

        let result = match result.cmp(&0) {
            cmp::Ordering::Equal => Err(UnixError::ChannelClosed),
            cmp::Ordering::Less => Err(UnixError::last()),
            cmp::Ordering::Greater => Ok(result as usize),
        };

        if let BlockingMode::Nonblocking = blocking_mode {
            if libc::fcntl(fd, libc::F_SETFL, 0) < 0 {
                return Err(UnixError::last());
            }
        }
        result
    }

    unsafe fn cmsg_len(&self) -> size_t {
        (*(self.msghdr.msg_control as *const cmsghdr)).cmsg_len as size_t
    }
}

fn is_socket(fd: c_int) -> bool {
    unsafe {
        let mut st = mem::MaybeUninit::uninit();
        if libc::fstat(fd, st.as_mut_ptr()) != 0 {
            return false;
        }
        (st.assume_init().st_mode & S_IFMT) == S_IFSOCK
    }
}