Skip to main content

ipc_channel/platform/unix/
mod.rs

1// Copyright 2015 The Servo Project Developers. See the COPYRIGHT
2// file at the top-level directory of this distribution.
3//
4// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
5// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
6// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
7// option. This file may not be copied, modified, or distributed
8// except according to those terms.
9
10use crate::ipc::IpcMessage;
11use libc::{
12    self, cmsghdr, linger, CMSG_DATA, CMSG_LEN, CMSG_SPACE, MAP_FAILED, MAP_SHARED, PROT_READ,
13    PROT_WRITE, SOCK_SEQPACKET, SOL_SOCKET,
14};
15use libc::{c_char, c_int, c_void, getsockopt, SO_LINGER, S_IFMT, S_IFSOCK};
16use libc::{iovec, msghdr, off_t, recvmsg, sendmsg};
17use libc::{sa_family_t, setsockopt, size_t, sockaddr, sockaddr_un, socketpair, socklen_t};
18use libc::{EAGAIN, EWOULDBLOCK};
19use mio::unix::SourceFd;
20use mio::{Events, Interest, Poll, Token};
21use rustc_hash::FxHasher;
22use std::cell::Cell;
23use std::cmp;
24use std::collections::HashMap;
25use std::convert::TryInto;
26use std::ffi::{c_uint, CString};
27use std::fmt::{self, Debug, Formatter};
28use std::hash::BuildHasherDefault;
29use std::io;
30use std::mem;
31use std::ops::{Deref, RangeFrom};
32use std::os::fd::RawFd;
33use std::ptr;
34use std::slice;
35use std::sync::atomic::{AtomicUsize, Ordering};
36use std::sync::{Arc, LazyLock};
37use std::thread;
38use std::time::{Duration, UNIX_EPOCH};
39use tempfile::{Builder, TempDir};
40use thiserror::Error;
41
/// Upper bound on the number of file descriptors in a single control message.
///
/// NOTE(review): not referenced in the code shown here; presumably used on the receive
/// side to size the control-message buffer -- confirm against `recv`.
const MAX_FDS_IN_CMSG: u32 = 64;

// The value Linux returns for SO_SNDBUF
// is not the size we are actually allowed to use...
// Empirically, we have to deduct 32 bytes from that.
const RESERVED_SIZE: usize = 32;

// Extra flags OR-ed into `SOCK_SEQPACKET` whenever this module creates a socket
// (`channel`, `OsIpcSender::connect`, `OsIpcOneShotServer::new`): close-on-exec at
// creation time on the platforms listed, nothing elsewhere.
#[cfg(any(target_os = "linux", target_os = "illumos"))]
const SOCK_FLAGS: c_int = libc::SOCK_CLOEXEC;
#[cfg(not(any(target_os = "linux", target_os = "illumos")))]
const SOCK_FLAGS: c_int = 0;

// Flags for receiving messages: `MSG_CMSG_CLOEXEC` on the platforms listed, nothing
// elsewhere. NOTE(review): presumably passed to `recvmsg` in `recv`, which is outside
// this view.
#[cfg(any(target_os = "linux", target_os = "illumos"))]
const RECVMSG_FLAGS: c_int = libc::MSG_CMSG_CLOEXEC;
#[cfg(not(any(target_os = "linux", target_os = "illumos")))]
const RECVMSG_FLAGS: c_int = 0;

// Integer type of the I/O-vector count: `usize` with glibc, `i32` otherwise.
// NOTE(review): presumably matches `msghdr::msg_iovlen` and is used by `new_msghdr`.
#[cfg(target_env = "gnu")]
type IovLen = usize;

#[cfg(not(target_env = "gnu"))]
type IovLen = i32;
// Integer type used for `cmsghdr::cmsg_len` and for the control-buffer length handed to
// `new_msghdr` (see `send_first_fragment`): `size_t` with glibc, `socklen_t` otherwise.
#[cfg(target_env = "gnu")]
type MsgControlLen = size_t;
#[cfg(not(target_env = "gnu"))]
type MsgControlLen = socklen_t;
68
/// Error returned by the non-blocking and timed selection methods of `OsIpcReceiverSet`.
#[derive(Debug, Error)]
pub enum OsTrySelectError {
    /// Polling, or receiving from a ready descriptor, failed.
    #[error("Error in IO: {0}.")]
    IoError(#[from] UnixError),
    /// The poll finished without any message or disconnection to report.
    #[error("No messages were received and no disconnections occurred.")]
    Empty,
}
76
77unsafe fn new_sockaddr_un(path: *const c_char) -> (sockaddr_un, usize) {
78    let mut sockaddr: sockaddr_un = mem::zeroed();
79    libc::strncpy(
80        sockaddr.sun_path.as_mut_ptr(),
81        path,
82        sockaddr.sun_path.len() - 1,
83    );
84    sockaddr.sun_family = libc::AF_UNIX as sa_family_t;
85    (sockaddr, mem::size_of::<sockaddr_un>())
86}
87
88static SYSTEM_SENDBUF_SIZE: LazyLock<usize> = LazyLock::new(|| {
89    let (tx, _) = channel().expect("Failed to obtain a socket for checking maximum send size");
90    tx.get_system_sendbuf_size()
91        .expect("Failed to obtain maximum send size for socket")
92});
93
94// The pid of the current process which is used to create unique IDs
95static PID: LazyLock<u32> = LazyLock::new(std::process::id);
96
97// A global count used to create unique IDs
98static SHM_COUNT: AtomicUsize = AtomicUsize::new(0);
99
100pub fn channel() -> Result<(OsIpcSender, OsIpcReceiver), UnixError> {
101    let mut results = [0, 0];
102    unsafe {
103        if socketpair(
104            libc::AF_UNIX,
105            SOCK_SEQPACKET | SOCK_FLAGS,
106            0,
107            &mut results[0],
108        ) >= 0
109        {
110            Ok((
111                OsIpcSender::from_fd(results[0]),
112                OsIpcReceiver::from_fd(results[1]),
113            ))
114        } else {
115            Err(UnixError::last())
116        }
117    }
118}
119
/// Bookkeeping for one receiver registered in an `OsIpcReceiverSet`.
#[derive(Clone, Copy)]
struct PollEntry {
    /// ID handed back to the caller by `OsIpcReceiverSet::add`.
    pub id: u64,
    /// The receiver's descriptor; owned by the set once added.
    pub fd: RawFd,
}
125
/// Receiving end of a channel, wrapping a socket descriptor.
///
/// The descriptor sits in a `Cell` so ownership can be handed off through `&self`
/// (see `consume_fd`); `-1` means this receiver no longer owns a descriptor.
#[derive(Debug)]
pub struct OsIpcReceiver {
    fd: Cell<c_int>,
}
130
131impl Drop for OsIpcReceiver {
132    fn drop(&mut self) {
133        unsafe {
134            let fd = self.fd.get();
135            if fd >= 0 {
136                let result = libc::close(fd);
137                assert!(
138                    thread::panicking() || result == 0,
139                    "closed receiver (fd: {}): {}",
140                    fd,
141                    UnixError::last(),
142                );
143            }
144        }
145    }
146}
147
148impl OsIpcReceiver {
149    fn from_fd(fd: c_int) -> OsIpcReceiver {
150        OsIpcReceiver { fd: Cell::new(fd) }
151    }
152
153    fn consume_fd(&self) -> c_int {
154        let fd = self.fd.get();
155        self.fd.set(-1);
156        fd
157    }
158
159    pub fn consume(&self) -> OsIpcReceiver {
160        OsIpcReceiver::from_fd(self.consume_fd())
161    }
162
163    #[allow(clippy::type_complexity)]
164    pub fn recv(&self) -> Result<IpcMessage, UnixError> {
165        recv(self.fd.get(), BlockingMode::Blocking)
166    }
167
168    #[allow(clippy::type_complexity)]
169    pub fn try_recv(&self) -> Result<IpcMessage, UnixError> {
170        recv(self.fd.get(), BlockingMode::Nonblocking)
171    }
172
173    #[allow(clippy::type_complexity)]
174    pub fn try_recv_timeout(&self, duration: Duration) -> Result<IpcMessage, UnixError> {
175        recv(self.fd.get(), BlockingMode::Timeout(duration))
176    }
177}
178
/// Socket descriptor shared (through an `Arc`) by all clones of an `OsIpcSender`;
/// closed when the last reference is dropped.
#[derive(PartialEq, Debug)]
struct SharedFileDescriptor(c_int);
181
182impl Drop for SharedFileDescriptor {
183    fn drop(&mut self) {
184        unsafe {
185            let result = libc::close(self.0);
186            assert!(thread::panicking() || result == 0);
187        }
188    }
189}
190
/// Sending end of a channel. Clones are cheap: they share one socket descriptor through an
/// `Arc`, which is closed when the last clone is dropped.
#[derive(Debug, Clone)]
pub struct OsIpcSender {
    fd: Arc<SharedFileDescriptor>,
}
195
196impl OsIpcSender {
197    fn from_fd(fd: c_int) -> OsIpcSender {
198        OsIpcSender {
199            fd: Arc::new(SharedFileDescriptor(fd)),
200        }
201    }
202
203    /// Maximum size of the kernel buffer used for transfers over this channel.
204    ///
205    /// Note: This is *not* the actual maximal packet size we are allowed to use...
206    /// Some of it is reserved by the kernel for bookkeeping.
207    fn get_system_sendbuf_size(&self) -> Result<usize, UnixError> {
208        unsafe {
209            let mut socket_sendbuf_size: c_int = 0;
210            let mut socket_sendbuf_size_len = mem::size_of::<c_int>() as socklen_t;
211            if getsockopt(
212                self.fd.0,
213                libc::SOL_SOCKET,
214                libc::SO_SNDBUF,
215                &mut socket_sendbuf_size as *mut _ as *mut c_void,
216                &mut socket_sendbuf_size_len as *mut socklen_t,
217            ) < 0
218            {
219                return Err(UnixError::last());
220            }
221            Ok(socket_sendbuf_size.try_into().unwrap())
222        }
223    }
224
225    /// Calculate maximum payload data size per fragment.
226    ///
227    /// It is the total size of the kernel buffer, minus the part reserved by the kernel.
228    ///
229    /// The `sendbuf_size` passed in should usually be the maximum kernel buffer size,
230    /// i.e. the value of *SYSTEM_SENDBUF_SIZE --
231    /// except after getting ENOBUFS, in which case it needs to be reduced.
232    fn fragment_size(sendbuf_size: usize) -> usize {
233        sendbuf_size - RESERVED_SIZE
234    }
235
236    /// Calculate maximum payload data size of first fragment.
237    ///
238    /// This one is smaller than regular fragments, because it carries the message (size) header.
239    fn first_fragment_size(sendbuf_size: usize) -> usize {
240        (Self::fragment_size(sendbuf_size) - mem::size_of::<usize>()) & (!8usize + 1)
241        // Ensure optimal alignment.
242    }
243
244    /// Maximum data size that can be transferred over this channel in a single packet.
245    ///
246    /// This is the size of the main data chunk only --
247    /// it's independent of any auxiliary data (FDs) transferred along with it.
248    ///
249    /// A send on this channel won't block for transfers up to this size
250    /// under normal circumstances.
251    /// (It might still block if heavy memory pressure causes ENOBUFS,
252    /// forcing us to reduce the packet size.)
253    pub fn get_max_fragment_size() -> usize {
254        Self::first_fragment_size(*SYSTEM_SENDBUF_SIZE)
255    }
256
257    pub fn send(
258        &self,
259        data: &[u8],
260        channels: Vec<OsIpcChannel>,
261        shared_memory_regions: Vec<OsIpcSharedMemory>,
262    ) -> Result<(), UnixError> {
263        let mut fds = Vec::new();
264        for channel in channels.iter() {
265            fds.push(channel.fd());
266        }
267        for shared_memory_region in shared_memory_regions.iter() {
268            fds.push(shared_memory_region.store.fd());
269        }
270
271        // `len` is the total length of the message.
272        // Its value will be sent as a message header before the payload data.
273        //
274        // Not to be confused with the length of the data to send in this packet
275        // (i.e. the length of the data buffer passed in),
276        // which in a fragmented send will be smaller than the total message length.
277        fn send_first_fragment(
278            sender_fd: c_int,
279            fds: &[c_int],
280            data_buffer: &[u8],
281            len: usize,
282        ) -> Result<(), UnixError> {
283            let result = unsafe {
284                let cmsg_length = mem::size_of_val(fds) as c_uint;
285                let (cmsg_buffer, cmsg_space) = if cmsg_length > 0 {
286                    let cmsg_buffer =
287                        libc::malloc(CMSG_SPACE(cmsg_length) as usize) as *mut cmsghdr;
288                    if cmsg_buffer.is_null() {
289                        return Err(UnixError::last());
290                    }
291                    (*cmsg_buffer).cmsg_len = CMSG_LEN(cmsg_length) as MsgControlLen;
292                    (*cmsg_buffer).cmsg_level = libc::SOL_SOCKET;
293                    (*cmsg_buffer).cmsg_type = libc::SCM_RIGHTS;
294
295                    ptr::copy_nonoverlapping(
296                        fds.as_ptr(),
297                        CMSG_DATA(cmsg_buffer) as *mut c_int,
298                        fds.len(),
299                    );
300                    (cmsg_buffer, CMSG_SPACE(cmsg_length))
301                } else {
302                    (ptr::null_mut(), 0)
303                };
304
305                let mut iovec = [
306                    // First fragment begins with a header recording the total data length.
307                    //
308                    // The receiver uses this to determine
309                    // whether it already got the entire message,
310                    // or needs to receive additional fragments -- and if so, how much.
311                    iovec {
312                        iov_base: &len as *const _ as *mut c_void,
313                        iov_len: mem::size_of_val(&len),
314                    },
315                    iovec {
316                        iov_base: data_buffer.as_ptr() as *mut c_void,
317                        iov_len: data_buffer.len(),
318                    },
319                ];
320
321                let msghdr = new_msghdr(&mut iovec, cmsg_buffer, cmsg_space as MsgControlLen);
322                let result = sendmsg(sender_fd, &msghdr, 0);
323                libc::free(cmsg_buffer as *mut c_void);
324                result
325            };
326
327            if result > 0 {
328                Ok(())
329            } else {
330                Err(UnixError::last())
331            }
332        }
333
334        fn send_followup_fragment(sender_fd: c_int, data_buffer: &[u8]) -> Result<(), UnixError> {
335            let result = unsafe {
336                libc::send(
337                    sender_fd,
338                    data_buffer.as_ptr() as *const c_void,
339                    data_buffer.len(),
340                    0,
341                )
342            };
343
344            if result > 0 {
345                Ok(())
346            } else {
347                Err(UnixError::last())
348            }
349        }
350
351        let mut sendbuf_size = *SYSTEM_SENDBUF_SIZE;
352
353        /// Reduce send buffer size after getting ENOBUFS,
354        /// i.e. when the kernel failed to allocate a large enough buffer.
355        ///
356        /// (If the buffer already was significantly smaller
357        /// than the memory page size though,
358        /// if means something else must have gone wrong;
359        /// so there is no point in further downsizing,
360        /// and we error out instead.)
361        fn downsize(sendbuf_size: &mut usize, sent_size: usize) -> Result<(), ()> {
362            if sent_size > 2000 {
363                *sendbuf_size /= 2;
364                // Make certain we end up with less than what we tried before...
365                if *sendbuf_size >= sent_size {
366                    *sendbuf_size = sent_size / 2;
367                }
368                Ok(())
369            } else {
370                Err(())
371            }
372        }
373
374        // If the message is small enough, try sending it in a single fragment.
375        if data.len() <= Self::get_max_fragment_size() {
376            match send_first_fragment(self.fd.0, &fds[..], data, data.len()) {
377                Ok(_) => return Ok(()),
378                Err(error) => {
379                    // ENOBUFS means the kernel failed to allocate a buffer large enough
380                    // to actually transfer the message,
381                    // although the message was small enough to fit the maximum send size --
382                    // so we have to proceed with a fragmented send nevertheless,
383                    // using a reduced send buffer size.
384                    //
385                    // Any other errors we might get here are non-recoverable.
386                    if !(matches!(error, UnixError::Errno(libc::ENOBUFS))
387                        && downsize(&mut sendbuf_size, data.len()).is_ok())
388                    {
389                        return Err(error);
390                    }
391                },
392            }
393        }
394
395        // The packet is too big. Fragmentation time!
396        //
397        // Create dedicated channel to send all but the first fragment.
398        // This way we avoid fragments of different messages interleaving in the receiver.
399        //
400        // The receiver end of the channel is sent with the first fragment
401        // along any other file descriptors that are to be transferred in the message.
402        let (dedicated_tx, dedicated_rx) = channel()?;
403        // Extract FD handle without consuming the Receiver, so the FD doesn't get closed.
404        fds.push(dedicated_rx.fd.get());
405
406        // Split up the packet into fragments.
407        let mut byte_position = 0;
408        while byte_position < data.len() {
409            let end_byte_position;
410            let result = if byte_position == 0 {
411                // First fragment. No offset; but contains message header (total size).
412                // The auxiliary data (FDs) is also sent along with this one.
413
414                // This fragment always uses the full allowable buffer size.
415                end_byte_position = Self::first_fragment_size(sendbuf_size);
416                send_first_fragment(self.fd.0, &fds[..], &data[..end_byte_position], data.len())
417            } else {
418                // Followup fragment. No header; but offset by amount of data already sent.
419
420                end_byte_position = cmp::min(
421                    byte_position + Self::fragment_size(sendbuf_size),
422                    data.len(),
423                );
424                send_followup_fragment(dedicated_tx.fd.0, &data[byte_position..end_byte_position])
425            };
426
427            if let Err(error) = result {
428                if matches!(error, UnixError::Errno(libc::ENOBUFS))
429                    && downsize(&mut sendbuf_size, end_byte_position - byte_position).is_ok()
430                {
431                    // If the kernel failed to allocate a buffer large enough for the packet,
432                    // retry with a smaller size (if possible).
433                    continue;
434                } else {
435                    return Err(error);
436                }
437            }
438
439            byte_position = end_byte_position;
440        }
441
442        Ok(())
443    }
444
445    pub fn connect(name: String) -> Result<OsIpcSender, UnixError> {
446        let name = CString::new(name).unwrap();
447        unsafe {
448            let fd = libc::socket(libc::AF_UNIX, SOCK_SEQPACKET | SOCK_FLAGS, 0);
449            let (sockaddr, len) = new_sockaddr_un(name.as_ptr());
450            if libc::connect(
451                fd,
452                &sockaddr as *const _ as *const sockaddr,
453                len as socklen_t,
454            ) < 0
455            {
456                return Err(UnixError::last());
457            }
458
459            Ok(OsIpcSender::from_fd(fd))
460        }
461    }
462}
463
/// Either end of a channel, e.g. as passed to `OsIpcSender::send` for transfer.
#[derive(Debug)]
pub enum OsIpcChannel {
    Sender(OsIpcSender),
    Receiver(OsIpcReceiver),
}
469
470impl OsIpcChannel {
471    fn fd(&self) -> c_int {
472        match *self {
473            OsIpcChannel::Sender(ref sender) => sender.fd.0,
474            OsIpcChannel::Receiver(ref receiver) => receiver.fd.get(),
475        }
476    }
477}
478
/// A set of receivers that can be waited on together.
pub struct OsIpcReceiverSet {
    /// Source of the IDs returned by `add`.
    incrementor: RangeFrom<u64>,
    /// Poller that every added receiver's descriptor is registered with.
    poll: Poll,
    /// Registered receivers, keyed by their poll token (derived from the descriptor value).
    pollfds: HashMap<Token, PollEntry, BuildHasherDefault<FxHasher>>,
    /// Reusable buffer that `poll` fills with readiness events.
    events: Events,
}
485
486impl Drop for OsIpcReceiverSet {
487    fn drop(&mut self) {
488        for &PollEntry { id: _, fd } in self.pollfds.values() {
489            let result = unsafe { libc::close(fd) };
490            assert!(thread::panicking() || result == 0);
491        }
492    }
493}
494
495impl OsIpcReceiverSet {
496    pub fn new() -> Result<OsIpcReceiverSet, UnixError> {
497        let fx = BuildHasherDefault::<FxHasher>::default();
498        Ok(OsIpcReceiverSet {
499            incrementor: 0..,
500            poll: Poll::new()?,
501            pollfds: HashMap::with_hasher(fx),
502            events: Events::with_capacity(10),
503        })
504    }
505
506    pub fn add(&mut self, receiver: OsIpcReceiver) -> Result<u64, UnixError> {
507        let last_index = self.incrementor.next().unwrap();
508        let fd = receiver.consume_fd();
509        let fd_token = Token(fd as usize);
510        let poll_entry = PollEntry { id: last_index, fd };
511        self.poll
512            .registry()
513            .register(&mut SourceFd(&fd), fd_token, Interest::READABLE)?;
514        self.pollfds.insert(fd_token, poll_entry);
515        Ok(last_index)
516    }
517
518    pub fn select(&mut self) -> Result<Vec<OsIpcSelectionResult>, UnixError> {
519        // Poll until we receive at least one event.
520        loop {
521            match self.poll.poll(&mut self.events, None) {
522                Ok(()) if !self.events.is_empty() => break,
523                Ok(()) => {},
524                Err(ref error) => {
525                    if error.kind() != io::ErrorKind::Interrupted {
526                        return Err(UnixError::last());
527                    }
528                },
529            }
530
531            if !self.events.is_empty() {
532                break;
533            }
534        }
535
536        match self.selection_results() {
537            Ok(v) => Ok(v),
538            Err(OsTrySelectError::Empty) => panic!("Blocking select cannot return Empty"),
539            Err(OsTrySelectError::IoError(e)) => Err(e),
540        }
541    }
542
543    pub fn try_select(&mut self) -> Result<Vec<OsIpcSelectionResult>, OsTrySelectError> {
544        // Non-blocking poll to receive zero or more events.
545        self.try_select_timeout(Duration::ZERO)
546    }
547
548    pub fn try_select_timeout(
549        &mut self,
550        duration: Duration,
551    ) -> Result<Vec<OsIpcSelectionResult>, OsTrySelectError> {
552        // Poll for the specified duration until we receive zero or more events.
553        match self.poll.poll(&mut self.events, Some(duration)) {
554            Ok(()) => (),
555            Err(ref error) => {
556                if error.kind() != io::ErrorKind::Interrupted {
557                    return Err(OsTrySelectError::IoError(UnixError::last()));
558                }
559            },
560        }
561
562        let v = self.selection_results()?;
563        if v.is_empty() {
564            Err(OsTrySelectError::Empty)
565        } else {
566            Ok(v)
567        }
568    }
569
570    fn selection_results(&mut self) -> Result<Vec<OsIpcSelectionResult>, OsTrySelectError> {
571        let mut selection_results = Vec::new();
572        for event in self.events.iter() {
573            // We only register this `Poll` for readable events.
574            assert!(event.is_readable());
575
576            let event_token = event.token();
577            let poll_entry = *self
578                .pollfds
579                .get(&event_token)
580                .expect("Got event for unknown token.");
581            loop {
582                match recv(poll_entry.fd, BlockingMode::Nonblocking) {
583                    Ok(ipc_message) => {
584                        selection_results.push(OsIpcSelectionResult::DataReceived(
585                            poll_entry.id,
586                            ipc_message,
587                        ));
588                    },
589                    Err(err) if err.channel_is_closed() => {
590                        self.pollfds.remove(&event_token).unwrap();
591                        self.poll
592                            .registry()
593                            .deregister(&mut SourceFd(&poll_entry.fd))
594                            .unwrap();
595                        unsafe {
596                            libc::close(poll_entry.fd);
597                        }
598
599                        selection_results.push(OsIpcSelectionResult::ChannelClosed(poll_entry.id));
600                        break;
601                    },
602                    Err(UnixError::Errno(code)) if code == EWOULDBLOCK => {
603                        // We tried to read another message from the file descriptor and
604                        // it would have blocked, so we have exhausted all of the data
605                        // pending to read.
606                        break;
607                    },
608                    Err(e) => return Err(OsTrySelectError::IoError(e)),
609                }
610            }
611        }
612
613        Ok(selection_results)
614    }
615}
616
/// Outcome reported for one receiver by a selection on an `OsIpcReceiverSet`.
pub enum OsIpcSelectionResult {
    /// A message arrived on the receiver with the given ID.
    DataReceived(u64, IpcMessage),
    /// The receiver with the given ID was closed and removed from the set.
    ChannelClosed(u64),
}
621
622impl OsIpcSelectionResult {
623    pub fn unwrap(self) -> (u64, IpcMessage) {
624        match self {
625            OsIpcSelectionResult::DataReceived(id, ipc_message) => (id, ipc_message),
626            OsIpcSelectionResult::ChannelClosed(id) => {
627                panic!("OsIpcSelectionResult::unwrap(): receiver ID {id} was closed!")
628            },
629        }
630    }
631}
632
/// A channel end whose direction is decided later by `to_sender()` or `to_receiver()`.
///
/// NOTE(review): presumably produced when a channel end arrives inside a message; the
/// construction site is outside this view.
#[derive(Debug)]
pub struct OsOpaqueIpcChannel {
    /// Raw descriptor, or `-1` once converted.
    fd: c_int,
}
637
impl Drop for OsOpaqueIpcChannel {
    fn drop(&mut self) {
        // Make sure we don't leak!
        //
        // The `OsOpaqueIpcChannel` objects should always be used,
        // i.e. converted with `to_sender()` or `to_receiver()` --
        // so the value should already be unset before the object gets dropped.
        //
        // This is only checked in debug builds, and this `Drop` never closes the descriptor
        // itself: an unconverted channel leaks its descriptor in release builds.
        debug_assert!(self.fd == -1);
    }
}
648
649impl OsOpaqueIpcChannel {
650    fn from_fd(fd: c_int) -> OsOpaqueIpcChannel {
651        OsOpaqueIpcChannel { fd }
652    }
653
654    pub fn to_sender(&mut self) -> OsIpcSender {
655        OsIpcSender::from_fd(mem::replace(&mut self.fd, -1))
656    }
657
658    pub fn to_receiver(&mut self) -> OsIpcReceiver {
659        OsIpcReceiver::from_fd(mem::replace(&mut self.fd, -1))
660    }
661}
662
/// Listening socket bound to a path inside a private temporary directory, used to accept a
/// single connection (`accept` consumes the server).
pub struct OsIpcOneShotServer {
    /// The listening socket descriptor; closed on drop.
    fd: c_int,

    // Object representing the temporary directory the socket was created in.
    // The directory is automatically deleted (along with the socket inside it)
    // when this field is dropped.
    _temp_dir: TempDir,
}
671
672impl Drop for OsIpcOneShotServer {
673    fn drop(&mut self) {
674        unsafe {
675            let result = libc::close(self.fd);
676            assert!(thread::panicking() || result == 0);
677        }
678    }
679}
680
681impl OsIpcOneShotServer {
682    pub fn new() -> Result<(OsIpcOneShotServer, String), UnixError> {
683        unsafe {
684            let fd = libc::socket(libc::AF_UNIX, SOCK_SEQPACKET | SOCK_FLAGS, 0);
685            let temp_dir = Builder::new().tempdir()?;
686            let socket_path = temp_dir.path().join("socket");
687            let path_string = socket_path.to_str().unwrap();
688
689            let path_c_string = CString::new(path_string).unwrap();
690            let (sockaddr, len) = new_sockaddr_un(path_c_string.as_ptr());
691            if libc::bind(
692                fd,
693                &sockaddr as *const _ as *const sockaddr,
694                len as socklen_t,
695            ) != 0
696            {
697                return Err(UnixError::last());
698            }
699
700            if libc::listen(fd, 10) != 0 {
701                return Err(UnixError::last());
702            }
703
704            Ok((
705                OsIpcOneShotServer {
706                    fd,
707                    _temp_dir: temp_dir,
708                },
709                path_string.to_string(),
710            ))
711        }
712    }
713
714    #[allow(clippy::type_complexity)]
715    pub fn accept(self) -> Result<(OsIpcReceiver, IpcMessage), UnixError> {
716        unsafe {
717            let sockaddr: *mut sockaddr = ptr::null_mut();
718            let sockaddr_len: *mut socklen_t = ptr::null_mut();
719            let client_fd = libc::accept(self.fd, sockaddr, sockaddr_len);
720            if client_fd < 0 {
721                return Err(UnixError::last());
722            }
723            make_socket_lingering(client_fd)?;
724
725            let receiver = OsIpcReceiver::from_fd(client_fd);
726            let ipc_message = receiver.recv()?;
727            Ok((receiver, ipc_message))
728        }
729    }
730}
731
732// Make sure that the kernel doesn't return errors to readers if there's still data left after we
733// close our end.
734//
735// See, for example, https://github.com/servo/ipc-channel/issues/29
736fn make_socket_lingering(sockfd: c_int) -> Result<(), UnixError> {
737    let linger = linger {
738        l_onoff: 1,
739        l_linger: 30,
740    };
741    let err = unsafe {
742        setsockopt(
743            sockfd,
744            SOL_SOCKET,
745            SO_LINGER,
746            &linger as *const _ as *const c_void,
747            mem::size_of::<linger>() as socklen_t,
748        )
749    };
750    if err < 0 {
751        let error = UnixError::last();
752        if let UnixError::Errno(libc::EINVAL) = error {
753            // If the other side of the connection is already closed, POSIX.1-2024 (and earlier
754            // versions) require that setsockopt return EINVAL [1]. This is a bit unfortunate
755            // because SO_LINGER for a closed socket is logically a no-op, which is why some OSes
756            // like Linux don't follow this part of the spec. But other OSes like illumos do return
757            // EINVAL here.
758            //
759            // SO_LINGER is widely understood and EINVAL should not occur for any other reason, so
760            // accept those errors.
761            //
762            // Another option would be to call make_socket_lingering on the initial socket created
763            // by libc::socket, but whether accept inherits a particular option is
764            // implementation-defined [2]. This means that special-casing EINVAL is the most
765            // portable thing to do.
766            //
767            // [1] https://pubs.opengroup.org/onlinepubs/9799919799/functions/setsockopt.html:
768            //     "[EINVAL] The specified option is invalid at the specified socket level or the
769            //     socket has been shut down."
770            //
771            // [2] https://pubs.opengroup.org/onlinepubs/9799919799/functions/accept.html: "It is
772            //     implementation-defined which socket options, if any, on the accepted socket will
773            //     have a default value determined by a value previously customized by setsockopt()
774            //     on socket, rather than the default value used for other new sockets."
775        } else {
776            return Err(error);
777        }
778    }
779    Ok(())
780}
781
/// Owned descriptor of the object backing an `OsIpcSharedMemory`; closed on drop.
struct BackingStore {
    fd: c_int,
}
785
786impl BackingStore {
787    pub fn new(length: usize) -> BackingStore {
788        let count = SHM_COUNT.fetch_add(1, Ordering::Relaxed);
789        let timestamp = UNIX_EPOCH.elapsed().unwrap();
790        let name = CString::new(format!(
791            "/ipc-channel-shared-memory.{}.{}.{}.{}",
792            count,
793            *PID,
794            timestamp.as_secs(),
795            timestamp.subsec_nanos()
796        ))
797        .unwrap();
798        let fd = create_shmem(name, length);
799        Self::from_fd(fd)
800    }
801
802    pub fn from_fd(fd: c_int) -> BackingStore {
803        BackingStore { fd }
804    }
805
806    pub fn fd(&self) -> c_int {
807        self.fd
808    }
809
810    pub unsafe fn map_file(&self, length: Option<size_t>) -> (*mut u8, size_t) {
811        let length = length.unwrap_or_else(|| {
812            let mut st = mem::MaybeUninit::uninit();
813            if libc::fstat(self.fd, st.as_mut_ptr()) != 0 {
814                panic!("error stating fd {}: {}", self.fd, UnixError::last());
815            }
816            st.assume_init().st_size as size_t
817        });
818        if length == 0 {
819            // This will cause `mmap` to fail, so handle it explicitly.
820            return (ptr::null_mut(), length);
821        }
822        let address = libc::mmap(
823            ptr::null_mut(),
824            length,
825            PROT_READ | PROT_WRITE,
826            MAP_SHARED,
827            self.fd,
828            0,
829        );
830        assert!(!address.is_null());
831        assert!(address != MAP_FAILED);
832        (address as *mut u8, length)
833    }
834}
835
836impl Drop for BackingStore {
837    fn drop(&mut self) {
838        unsafe {
839            let result = libc::close(self.fd);
840            assert!(thread::panicking() || result == 0);
841        }
842    }
843}
844
/// A shared-memory region mapped into this process.
pub struct OsIpcSharedMemory {
    /// Start of the mapping, or null for a zero-length region (which is never mapped).
    ptr: *mut u8,
    /// Length of the region in bytes.
    length: usize,
    /// Descriptor of the object the region was mapped from.
    store: BackingStore,
}
850
// SAFETY: `OsIpcSharedMemory` exclusively owns its mapping (`ptr`/`length`, unmapped in
// `Drop`) and its backing descriptor (`store`, closed in `BackingStore`'s `Drop`); neither is
// tied to the creating thread. Safe code only gets shared, read-only access through `Deref`.
// Mutable access needs `&mut self` plus the `unsafe` `deref_mut`, whose contract puts the
// single-reader/writer requirement on the caller.
// NOTE(review): `Clone` maps the same object again through a dup'd descriptor, so other
// handles (and other processes) may write the bytes concurrently; that cross-handle aliasing
// is not something these impls can rule out.
unsafe impl Send for OsIpcSharedMemory {}
unsafe impl Sync for OsIpcSharedMemory {}
853
854impl Drop for OsIpcSharedMemory {
855    fn drop(&mut self) {
856        unsafe {
857            if !self.ptr.is_null() {
858                let result = libc::munmap(self.ptr as *mut c_void, self.length);
859                assert!(thread::panicking() || result == 0);
860            }
861        }
862    }
863}
864
865impl Clone for OsIpcSharedMemory {
866    fn clone(&self) -> OsIpcSharedMemory {
867        unsafe {
868            let store = BackingStore::from_fd(libc::dup(self.store.fd()));
869            let (address, _) = store.map_file(Some(self.length));
870            OsIpcSharedMemory::from_raw_parts(address, self.length, store)
871        }
872    }
873}
874
875impl PartialEq for OsIpcSharedMemory {
876    fn eq(&self, other: &OsIpcSharedMemory) -> bool {
877        **self == **other
878    }
879}
880
881impl Debug for OsIpcSharedMemory {
882    fn fmt(&self, formatter: &mut Formatter) -> Result<(), fmt::Error> {
883        (**self).fmt(formatter)
884    }
885}
886
887impl Deref for OsIpcSharedMemory {
888    type Target = [u8];
889
890    #[inline]
891    fn deref(&self) -> &[u8] {
892        unsafe { slice::from_raw_parts(self.ptr, self.length) }
893    }
894}
895
896impl OsIpcSharedMemory {
897    /// # Safety
898    ///
899    /// This is safe if there is only one reader/writer on the data.
900    /// User can achieve this by not cloning [crate::ipc::IpcSharedMemory]
901    /// and serializing/deserializing only once.
902    #[inline]
903    pub unsafe fn deref_mut(&mut self) -> &mut [u8] {
904        unsafe { slice::from_raw_parts_mut(self.ptr, self.length) }
905    }
906
907    pub fn take(self) -> Option<Vec<u8>> {
908        Some((*self).to_vec())
909    }
910}
911
912impl OsIpcSharedMemory {
913    unsafe fn from_raw_parts(
914        ptr: *mut u8,
915        length: usize,
916        store: BackingStore,
917    ) -> OsIpcSharedMemory {
918        OsIpcSharedMemory { ptr, length, store }
919    }
920
921    unsafe fn from_fd(fd: c_int) -> OsIpcSharedMemory {
922        let store = BackingStore::from_fd(fd);
923        let (ptr, length) = store.map_file(None);
924        OsIpcSharedMemory::from_raw_parts(ptr, length, store)
925    }
926
927    pub fn from_byte(byte: u8, length: usize) -> OsIpcSharedMemory {
928        unsafe {
929            let store = BackingStore::new(length);
930            let (address, _) = store.map_file(Some(length));
931            for element in slice::from_raw_parts_mut(address, length) {
932                *element = byte;
933            }
934            OsIpcSharedMemory::from_raw_parts(address, length, store)
935        }
936    }
937
938    pub fn from_bytes(bytes: &[u8]) -> OsIpcSharedMemory {
939        unsafe {
940            let store = BackingStore::new(bytes.len());
941            let (address, _) = store.map_file(Some(bytes.len()));
942            ptr::copy_nonoverlapping(bytes.as_ptr(), address, bytes.len());
943            OsIpcSharedMemory::from_raw_parts(address, bytes.len(), store)
944        }
945    }
946}
947
948#[derive(Debug, Error)]
949pub enum UnixError {
950    Errno(c_int),
951    ChannelClosed,
952    IoError(io::Error),
953}
954
955impl UnixError {
956    fn last() -> UnixError {
957        UnixError::Errno(io::Error::last_os_error().raw_os_error().unwrap())
958    }
959
960    #[allow(dead_code)]
961    pub fn channel_is_closed(&self) -> bool {
962        matches!(self, UnixError::ChannelClosed)
963    }
964}
965
966impl From<UnixError> for crate::IpcError {
967    fn from(error: UnixError) -> Self {
968        match error {
969            UnixError::ChannelClosed => crate::IpcError::Disconnected,
970            e => crate::IpcError::Io(io::Error::from(e)),
971        }
972    }
973}
974
975impl fmt::Display for UnixError {
976    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
977        match self {
978            UnixError::Errno(errno) => {
979                fmt::Display::fmt(&io::Error::from_raw_os_error(*errno), fmt)
980            },
981            UnixError::ChannelClosed => write!(fmt, "All senders for this socket closed"),
982            UnixError::IoError(e) => write!(fmt, "{e}"),
983        }
984    }
985}
986
987impl From<UnixError> for io::Error {
988    fn from(unix_error: UnixError) -> io::Error {
989        match unix_error {
990            UnixError::Errno(errno) => io::Error::from_raw_os_error(errno),
991            UnixError::ChannelClosed => io::Error::new(io::ErrorKind::ConnectionReset, unix_error),
992            UnixError::IoError(e) => e,
993        }
994    }
995}
996
997impl From<UnixError> for crate::TryRecvError {
998    fn from(error: UnixError) -> Self {
999        match error {
1000            UnixError::ChannelClosed => {
1001                crate::TryRecvError::IpcError(crate::IpcError::Disconnected)
1002            },
1003            UnixError::Errno(code) if code == EAGAIN || code == EWOULDBLOCK => {
1004                crate::TryRecvError::Empty
1005            },
1006            e => crate::TryRecvError::IpcError(crate::IpcError::Io(io::Error::from(e))),
1007        }
1008    }
1009}
1010
1011impl From<io::Error> for UnixError {
1012    fn from(e: io::Error) -> UnixError {
1013        if let Some(errno) = e.raw_os_error() {
1014            UnixError::Errno(errno)
1015        } else {
1016            assert!(e.kind() == io::ErrorKind::ConnectionReset);
1017            UnixError::ChannelClosed
1018        }
1019    }
1020}
1021
/// How a receive behaves when no message is queued yet.
#[derive(Clone, Copy)]
enum BlockingMode {
    /// Wait until a message arrives.
    Blocking,
    /// Do not wait; report "would block" if nothing is available.
    Nonblocking,
    /// Wait at most the given duration.
    Timeout(Duration),
}
1028
1029#[allow(clippy::uninit_vec, clippy::type_complexity)]
1030fn recv(fd: c_int, blocking_mode: BlockingMode) -> Result<IpcMessage, UnixError> {
1031    let (mut channels, mut shared_memory_regions) = (Vec::new(), Vec::new());
1032
1033    // First fragments begins with a header recording the total data length.
1034    //
1035    // We use this to determine whether we already got the entire message,
1036    // or need to receive additional fragments -- and if so, how much.
1037    let mut total_size = 0usize;
1038    let mut main_data_buffer;
1039    unsafe {
1040        // Allocate a buffer without initialising the memory.
1041        main_data_buffer = Vec::with_capacity(OsIpcSender::get_max_fragment_size());
1042        main_data_buffer.set_len(OsIpcSender::get_max_fragment_size());
1043
1044        let mut iovec = [
1045            iovec {
1046                iov_base: &mut total_size as *mut _ as *mut c_void,
1047                iov_len: mem::size_of_val(&total_size),
1048            },
1049            iovec {
1050                iov_base: main_data_buffer.as_mut_ptr() as *mut c_void,
1051                iov_len: main_data_buffer.len(),
1052            },
1053        ];
1054        let mut cmsg = UnixCmsg::new(&mut iovec)?;
1055
1056        let bytes_read = cmsg.recv(fd, blocking_mode)?;
1057        main_data_buffer.set_len(bytes_read - mem::size_of_val(&total_size));
1058
1059        let cmsg_fds = CMSG_DATA(cmsg.cmsg_buffer) as *const c_int;
1060        let cmsg_length = cmsg.msghdr.msg_controllen;
1061        let channel_length = if cmsg_length == 0 {
1062            0
1063        } else {
1064            // The control header is followed by an array of FDs. The size of the control header is
1065            // determined by CMSG_SPACE. (On Linux this would the same as CMSG_ALIGN, but that isn't
1066            // exposed by libc. CMSG_SPACE(0) is the portable version of that.)
1067            (cmsg.cmsg_len() - CMSG_SPACE(0) as size_t) / mem::size_of::<c_int>()
1068        };
1069        for index in 0..channel_length {
1070            let fd = *cmsg_fds.add(index);
1071            if is_socket(fd) {
1072                channels.push(OsOpaqueIpcChannel::from_fd(fd));
1073                continue;
1074            }
1075            shared_memory_regions.push(OsIpcSharedMemory::from_fd(fd));
1076        }
1077    }
1078
1079    if total_size == main_data_buffer.len() {
1080        // Fast path: no fragments.
1081        return Ok(IpcMessage::new(
1082            main_data_buffer,
1083            channels,
1084            shared_memory_regions,
1085        ));
1086    }
1087
1088    // Reassemble fragments.
1089    //
1090    // The initial fragment carries the receive end of a dedicated channel
1091    // through which all the remaining fragments will be coming in.
1092    let dedicated_rx = channels.pop().unwrap().to_receiver();
1093
1094    // Extend the buffer to hold the entire message, without initialising the memory.
1095    let len = main_data_buffer.len();
1096    main_data_buffer.reserve_exact(total_size - len);
1097
1098    // Receive followup fragments directly into the main buffer.
1099    while main_data_buffer.len() < total_size {
1100        let write_pos = main_data_buffer.len();
1101        let end_pos = cmp::min(
1102            write_pos + OsIpcSender::fragment_size(*SYSTEM_SENDBUF_SIZE),
1103            total_size,
1104        );
1105        let result = unsafe {
1106            assert!(end_pos <= main_data_buffer.capacity());
1107            main_data_buffer.set_len(end_pos);
1108
1109            // Integer underflow could make the following code unsound...
1110            assert!(end_pos >= write_pos);
1111
1112            // Note: we always use blocking mode for followup fragments,
1113            // to make sure that once we start receiving a multi-fragment message,
1114            // we don't abort in the middle of it...
1115            let result = libc::recv(
1116                dedicated_rx.fd.get(),
1117                main_data_buffer[write_pos..].as_mut_ptr() as *mut c_void,
1118                end_pos - write_pos,
1119                0,
1120            );
1121            main_data_buffer.set_len(write_pos + cmp::max(result, 0) as usize);
1122            result
1123        };
1124
1125        match result.cmp(&0) {
1126            cmp::Ordering::Greater => continue,
1127            cmp::Ordering::Equal => return Err(UnixError::ChannelClosed),
1128            cmp::Ordering::Less => return Err(UnixError::last()),
1129        }
1130    }
1131
1132    Ok(IpcMessage::new(
1133        main_data_buffer,
1134        channels,
1135        shared_memory_regions,
1136    ))
1137}
1138
1139// https://github.com/servo/ipc-channel/issues/192
1140fn new_msghdr(iovec: &mut [iovec], cmsg_buffer: *mut cmsghdr, cmsg_space: MsgControlLen) -> msghdr {
1141    let mut msghdr: msghdr = unsafe { mem::zeroed() };
1142    msghdr.msg_name = ptr::null_mut();
1143    msghdr.msg_namelen = 0;
1144    msghdr.msg_iov = iovec.as_mut_ptr();
1145    msghdr.msg_iovlen = iovec.len() as IovLen;
1146    msghdr.msg_control = cmsg_buffer as *mut c_void;
1147    msghdr.msg_controllen = cmsg_space;
1148    msghdr.msg_flags = 0;
1149    msghdr
1150}
1151
1152fn create_shmem(name: CString, length: usize) -> c_int {
1153    unsafe {
1154        let fd = libc::memfd_create(name.as_ptr(), libc::MFD_CLOEXEC);
1155        assert!(fd >= 0);
1156        assert_eq!(libc::ftruncate(fd, length as off_t), 0);
1157        fd
1158    }
1159}
1160
1161struct UnixCmsg {
1162    cmsg_buffer: *mut cmsghdr,
1163    msghdr: msghdr,
1164}
1165
1166unsafe impl Send for UnixCmsg {}
1167
1168impl Drop for UnixCmsg {
1169    fn drop(&mut self) {
1170        unsafe {
1171            libc::free(self.cmsg_buffer as *mut c_void);
1172        }
1173    }
1174}
1175
1176impl UnixCmsg {
1177    unsafe fn new(iovec: &mut [iovec]) -> Result<UnixCmsg, UnixError> {
1178        let cmsg_length = CMSG_SPACE(MAX_FDS_IN_CMSG * (mem::size_of::<c_int>() as c_uint));
1179        let cmsg_buffer = libc::malloc(cmsg_length as usize) as *mut cmsghdr;
1180        if cmsg_buffer.is_null() {
1181            return Err(UnixError::last());
1182        }
1183        Ok(UnixCmsg {
1184            cmsg_buffer,
1185            msghdr: new_msghdr(iovec, cmsg_buffer, cmsg_length as MsgControlLen),
1186        })
1187    }
1188
1189    unsafe fn recv(&mut self, fd: c_int, blocking_mode: BlockingMode) -> Result<usize, UnixError> {
1190        match blocking_mode {
1191            BlockingMode::Nonblocking => {
1192                if libc::fcntl(fd, libc::F_SETFL, libc::O_NONBLOCK) < 0 {
1193                    return Err(UnixError::last());
1194                }
1195            },
1196            BlockingMode::Timeout(duration) => {
1197                let events = libc::POLLIN | libc::POLLPRI | libc::POLLRDHUP;
1198
1199                let mut fd = [libc::pollfd {
1200                    fd,
1201                    events,
1202                    revents: 0,
1203                }];
1204                let result = libc::poll(
1205                    fd.as_mut_ptr(),
1206                    fd.len() as _,
1207                    duration.as_millis().try_into().unwrap_or(-1),
1208                );
1209
1210                match result.cmp(&0) {
1211                    cmp::Ordering::Equal => return Err(UnixError::Errno(EAGAIN)),
1212                    cmp::Ordering::Less => return Err(UnixError::last()),
1213                    cmp::Ordering::Greater => {},
1214                }
1215            },
1216            BlockingMode::Blocking => {},
1217        }
1218
1219        let result = recvmsg(fd, &mut self.msghdr, RECVMSG_FLAGS);
1220
1221        let result = match result.cmp(&0) {
1222            cmp::Ordering::Equal => Err(UnixError::ChannelClosed),
1223            cmp::Ordering::Less => Err(UnixError::last()),
1224            cmp::Ordering::Greater => Ok(result as usize),
1225        };
1226
1227        if let BlockingMode::Nonblocking = blocking_mode {
1228            if libc::fcntl(fd, libc::F_SETFL, 0) < 0 {
1229                return Err(UnixError::last());
1230            }
1231        }
1232        result
1233    }
1234
1235    unsafe fn cmsg_len(&self) -> size_t {
1236        (*(self.msghdr.msg_control as *const cmsghdr)).cmsg_len as size_t
1237    }
1238}
1239
1240fn is_socket(fd: c_int) -> bool {
1241    unsafe {
1242        let mut st = mem::MaybeUninit::uninit();
1243        if libc::fstat(fd, st.as_mut_ptr()) != 0 {
1244            return false;
1245        }
1246        (st.assume_init().st_mode & S_IFMT) == S_IFSOCK
1247    }
1248}