1use crate::ipc::IpcMessage;
11use libc::{
12 self, cmsghdr, linger, CMSG_DATA, CMSG_LEN, CMSG_SPACE, MAP_FAILED, MAP_SHARED, PROT_READ,
13 PROT_WRITE, SOCK_SEQPACKET, SOL_SOCKET,
14};
15use libc::{c_char, c_int, c_void, getsockopt, SO_LINGER, S_IFMT, S_IFSOCK};
16use libc::{iovec, msghdr, off_t, recvmsg, sendmsg};
17use libc::{sa_family_t, setsockopt, size_t, sockaddr, sockaddr_un, socketpair, socklen_t};
18use libc::{EAGAIN, EWOULDBLOCK};
19use mio::unix::SourceFd;
20use mio::{Events, Interest, Poll, Token};
21use rustc_hash::FxHasher;
22use std::cell::Cell;
23use std::cmp;
24use std::collections::HashMap;
25use std::convert::TryInto;
26use std::ffi::{c_uint, CString};
27use std::fmt::{self, Debug, Formatter};
28use std::hash::BuildHasherDefault;
29use std::io;
30use std::mem;
31use std::ops::{Deref, RangeFrom};
32use std::os::fd::RawFd;
33use std::ptr;
34use std::slice;
35use std::sync::atomic::{AtomicUsize, Ordering};
36use std::sync::{Arc, LazyLock};
37use std::thread;
38use std::time::{Duration, UNIX_EPOCH};
39use tempfile::{Builder, TempDir};
40use thiserror::Error;
41
/// Number of file descriptors the receive-side control buffer is sized for
/// (see `UnixCmsg::new`, which allocates room for this many `c_int`s).
const MAX_FDS_IN_CMSG: u32 = 64;

/// Bytes subtracted from the socket send-buffer size when computing how much
/// payload fits into one fragment (see `OsIpcSender::fragment_size`).
// NOTE(review): presumably headroom the kernel needs per message — confirm.
const RESERVED_SIZE: usize = 32;
48
49#[cfg(any(target_os = "linux", target_os = "illumos"))]
50const SOCK_FLAGS: c_int = libc::SOCK_CLOEXEC;
51#[cfg(not(any(target_os = "linux", target_os = "illumos")))]
52const SOCK_FLAGS: c_int = 0;
53
54#[cfg(any(target_os = "linux", target_os = "illumos"))]
55const RECVMSG_FLAGS: c_int = libc::MSG_CMSG_CLOEXEC;
56#[cfg(not(any(target_os = "linux", target_os = "illumos")))]
57const RECVMSG_FLAGS: c_int = 0;
58
59#[cfg(target_env = "gnu")]
60type IovLen = usize;
61
62#[cfg(not(target_env = "gnu"))]
63type IovLen = i32;
64#[cfg(target_env = "gnu")]
65type MsgControlLen = size_t;
66#[cfg(not(target_env = "gnu"))]
67type MsgControlLen = socklen_t;
68
69#[derive(Debug, Error)]
70pub enum OsTrySelectError {
71 #[error("Error in IO: {0}.")]
72 IoError(#[from] UnixError),
73 #[error("No messages were received and no disconnections occurred.")]
74 Empty,
75}
76
77unsafe fn new_sockaddr_un(path: *const c_char) -> (sockaddr_un, usize) {
78 let mut sockaddr: sockaddr_un = mem::zeroed();
79 libc::strncpy(
80 sockaddr.sun_path.as_mut_ptr(),
81 path,
82 sockaddr.sun_path.len() - 1,
83 );
84 sockaddr.sun_family = libc::AF_UNIX as sa_family_t;
85 (sockaddr, mem::size_of::<sockaddr_un>())
86}
87
88static SYSTEM_SENDBUF_SIZE: LazyLock<usize> = LazyLock::new(|| {
89 let (tx, _) = channel().expect("Failed to obtain a socket for checking maximum send size");
90 tx.get_system_sendbuf_size()
91 .expect("Failed to obtain maximum send size for socket")
92});
93
/// Process id, looked up once; used to build shared-memory names.
static PID: LazyLock<u32> = LazyLock::new(|| std::process::id());

/// Number of shared-memory backing stores created so far by this process;
/// used as a uniqueness component of their names.
static SHM_COUNT: AtomicUsize = AtomicUsize::new(0);
99
100pub fn channel() -> Result<(OsIpcSender, OsIpcReceiver), UnixError> {
101 let mut results = [0, 0];
102 unsafe {
103 if socketpair(
104 libc::AF_UNIX,
105 SOCK_SEQPACKET | SOCK_FLAGS,
106 0,
107 &mut results[0],
108 ) >= 0
109 {
110 Ok((
111 OsIpcSender::from_fd(results[0]),
112 OsIpcReceiver::from_fd(results[1]),
113 ))
114 } else {
115 Err(UnixError::last())
116 }
117 }
118}
119
/// Bookkeeping for one receiver registered in an `OsIpcReceiverSet`.
#[derive(Copy, Clone)]
struct PollEntry {
    /// Identifier handed back to the caller by `OsIpcReceiverSet::add`.
    pub id: u64,
    /// Socket descriptor being polled.
    pub fd: RawFd,
}
125
/// Receiving end of a channel.
///
/// The descriptor lives in a `Cell` so it can be taken out through a shared
/// reference; `-1` marks a receiver whose descriptor has been moved away.
#[derive(Debug)]
pub struct OsIpcReceiver {
    fd: Cell<c_int>,
}
130
131impl Drop for OsIpcReceiver {
132 fn drop(&mut self) {
133 unsafe {
134 let fd = self.fd.get();
135 if fd >= 0 {
136 let result = libc::close(fd);
137 assert!(
138 thread::panicking() || result == 0,
139 "closed receiver (fd: {}): {}",
140 fd,
141 UnixError::last(),
142 );
143 }
144 }
145 }
146}
147
148impl OsIpcReceiver {
149 fn from_fd(fd: c_int) -> OsIpcReceiver {
150 OsIpcReceiver { fd: Cell::new(fd) }
151 }
152
153 fn consume_fd(&self) -> c_int {
154 let fd = self.fd.get();
155 self.fd.set(-1);
156 fd
157 }
158
159 pub fn consume(&self) -> OsIpcReceiver {
160 OsIpcReceiver::from_fd(self.consume_fd())
161 }
162
163 #[allow(clippy::type_complexity)]
164 pub fn recv(&self) -> Result<IpcMessage, UnixError> {
165 recv(self.fd.get(), BlockingMode::Blocking)
166 }
167
168 #[allow(clippy::type_complexity)]
169 pub fn try_recv(&self) -> Result<IpcMessage, UnixError> {
170 recv(self.fd.get(), BlockingMode::Nonblocking)
171 }
172
173 #[allow(clippy::type_complexity)]
174 pub fn try_recv_timeout(&self, duration: Duration) -> Result<IpcMessage, UnixError> {
175 recv(self.fd.get(), BlockingMode::Timeout(duration))
176 }
177}
178
/// Newtype around a raw descriptor that is shared between sender clones via
/// `Arc` (see `OsIpcSender`).
#[derive(Debug, PartialEq)]
struct SharedFileDescriptor(c_int);
181
182impl Drop for SharedFileDescriptor {
183 fn drop(&mut self) {
184 unsafe {
185 let result = libc::close(self.0);
186 assert!(thread::panicking() || result == 0);
187 }
188 }
189}
190
191#[derive(Debug, Clone)]
192pub struct OsIpcSender {
193 fd: Arc<SharedFileDescriptor>,
194}
195
196impl OsIpcSender {
197 fn from_fd(fd: c_int) -> OsIpcSender {
198 OsIpcSender {
199 fd: Arc::new(SharedFileDescriptor(fd)),
200 }
201 }
202
203 fn get_system_sendbuf_size(&self) -> Result<usize, UnixError> {
208 unsafe {
209 let mut socket_sendbuf_size: c_int = 0;
210 let mut socket_sendbuf_size_len = mem::size_of::<c_int>() as socklen_t;
211 if getsockopt(
212 self.fd.0,
213 libc::SOL_SOCKET,
214 libc::SO_SNDBUF,
215 &mut socket_sendbuf_size as *mut _ as *mut c_void,
216 &mut socket_sendbuf_size_len as *mut socklen_t,
217 ) < 0
218 {
219 return Err(UnixError::last());
220 }
221 Ok(socket_sendbuf_size.try_into().unwrap())
222 }
223 }
224
225 fn fragment_size(sendbuf_size: usize) -> usize {
233 sendbuf_size - RESERVED_SIZE
234 }
235
236 fn first_fragment_size(sendbuf_size: usize) -> usize {
240 (Self::fragment_size(sendbuf_size) - mem::size_of::<usize>()) & (!8usize + 1)
241 }
243
244 pub fn get_max_fragment_size() -> usize {
254 Self::first_fragment_size(*SYSTEM_SENDBUF_SIZE)
255 }
256
257 pub fn send(
258 &self,
259 data: &[u8],
260 channels: Vec<OsIpcChannel>,
261 shared_memory_regions: Vec<OsIpcSharedMemory>,
262 ) -> Result<(), UnixError> {
263 let mut fds = Vec::new();
264 for channel in channels.iter() {
265 fds.push(channel.fd());
266 }
267 for shared_memory_region in shared_memory_regions.iter() {
268 fds.push(shared_memory_region.store.fd());
269 }
270
271 fn send_first_fragment(
278 sender_fd: c_int,
279 fds: &[c_int],
280 data_buffer: &[u8],
281 len: usize,
282 ) -> Result<(), UnixError> {
283 let result = unsafe {
284 let cmsg_length = mem::size_of_val(fds) as c_uint;
285 let (cmsg_buffer, cmsg_space) = if cmsg_length > 0 {
286 let cmsg_buffer =
287 libc::malloc(CMSG_SPACE(cmsg_length) as usize) as *mut cmsghdr;
288 if cmsg_buffer.is_null() {
289 return Err(UnixError::last());
290 }
291 (*cmsg_buffer).cmsg_len = CMSG_LEN(cmsg_length) as MsgControlLen;
292 (*cmsg_buffer).cmsg_level = libc::SOL_SOCKET;
293 (*cmsg_buffer).cmsg_type = libc::SCM_RIGHTS;
294
295 ptr::copy_nonoverlapping(
296 fds.as_ptr(),
297 CMSG_DATA(cmsg_buffer) as *mut c_int,
298 fds.len(),
299 );
300 (cmsg_buffer, CMSG_SPACE(cmsg_length))
301 } else {
302 (ptr::null_mut(), 0)
303 };
304
305 let mut iovec = [
306 iovec {
312 iov_base: &len as *const _ as *mut c_void,
313 iov_len: mem::size_of_val(&len),
314 },
315 iovec {
316 iov_base: data_buffer.as_ptr() as *mut c_void,
317 iov_len: data_buffer.len(),
318 },
319 ];
320
321 let msghdr = new_msghdr(&mut iovec, cmsg_buffer, cmsg_space as MsgControlLen);
322 let result = sendmsg(sender_fd, &msghdr, 0);
323 libc::free(cmsg_buffer as *mut c_void);
324 result
325 };
326
327 if result > 0 {
328 Ok(())
329 } else {
330 Err(UnixError::last())
331 }
332 }
333
334 fn send_followup_fragment(sender_fd: c_int, data_buffer: &[u8]) -> Result<(), UnixError> {
335 let result = unsafe {
336 libc::send(
337 sender_fd,
338 data_buffer.as_ptr() as *const c_void,
339 data_buffer.len(),
340 0,
341 )
342 };
343
344 if result > 0 {
345 Ok(())
346 } else {
347 Err(UnixError::last())
348 }
349 }
350
351 let mut sendbuf_size = *SYSTEM_SENDBUF_SIZE;
352
353 fn downsize(sendbuf_size: &mut usize, sent_size: usize) -> Result<(), ()> {
362 if sent_size > 2000 {
363 *sendbuf_size /= 2;
364 if *sendbuf_size >= sent_size {
366 *sendbuf_size = sent_size / 2;
367 }
368 Ok(())
369 } else {
370 Err(())
371 }
372 }
373
374 if data.len() <= Self::get_max_fragment_size() {
376 match send_first_fragment(self.fd.0, &fds[..], data, data.len()) {
377 Ok(_) => return Ok(()),
378 Err(error) => {
379 if !(matches!(error, UnixError::Errno(libc::ENOBUFS))
387 && downsize(&mut sendbuf_size, data.len()).is_ok())
388 {
389 return Err(error);
390 }
391 },
392 }
393 }
394
395 let (dedicated_tx, dedicated_rx) = channel()?;
403 fds.push(dedicated_rx.fd.get());
405
406 let mut byte_position = 0;
408 while byte_position < data.len() {
409 let end_byte_position;
410 let result = if byte_position == 0 {
411 end_byte_position = Self::first_fragment_size(sendbuf_size);
416 send_first_fragment(self.fd.0, &fds[..], &data[..end_byte_position], data.len())
417 } else {
418 end_byte_position = cmp::min(
421 byte_position + Self::fragment_size(sendbuf_size),
422 data.len(),
423 );
424 send_followup_fragment(dedicated_tx.fd.0, &data[byte_position..end_byte_position])
425 };
426
427 if let Err(error) = result {
428 if matches!(error, UnixError::Errno(libc::ENOBUFS))
429 && downsize(&mut sendbuf_size, end_byte_position - byte_position).is_ok()
430 {
431 continue;
434 } else {
435 return Err(error);
436 }
437 }
438
439 byte_position = end_byte_position;
440 }
441
442 Ok(())
443 }
444
445 pub fn connect(name: String) -> Result<OsIpcSender, UnixError> {
446 let name = CString::new(name).unwrap();
447 unsafe {
448 let fd = libc::socket(libc::AF_UNIX, SOCK_SEQPACKET | SOCK_FLAGS, 0);
449 let (sockaddr, len) = new_sockaddr_un(name.as_ptr());
450 if libc::connect(
451 fd,
452 &sockaddr as *const _ as *const sockaddr,
453 len as socklen_t,
454 ) < 0
455 {
456 return Err(UnixError::last());
457 }
458
459 Ok(OsIpcSender::from_fd(fd))
460 }
461 }
462}
463
464#[derive(Debug)]
465pub enum OsIpcChannel {
466 Sender(OsIpcSender),
467 Receiver(OsIpcReceiver),
468}
469
470impl OsIpcChannel {
471 fn fd(&self) -> c_int {
472 match *self {
473 OsIpcChannel::Sender(ref sender) => sender.fd.0,
474 OsIpcChannel::Receiver(ref receiver) => receiver.fd.get(),
475 }
476 }
477}
478
479pub struct OsIpcReceiverSet {
480 incrementor: RangeFrom<u64>,
481 poll: Poll,
482 pollfds: HashMap<Token, PollEntry, BuildHasherDefault<FxHasher>>,
483 events: Events,
484}
485
486impl Drop for OsIpcReceiverSet {
487 fn drop(&mut self) {
488 for &PollEntry { id: _, fd } in self.pollfds.values() {
489 let result = unsafe { libc::close(fd) };
490 assert!(thread::panicking() || result == 0);
491 }
492 }
493}
494
495impl OsIpcReceiverSet {
496 pub fn new() -> Result<OsIpcReceiverSet, UnixError> {
497 let fx = BuildHasherDefault::<FxHasher>::default();
498 Ok(OsIpcReceiverSet {
499 incrementor: 0..,
500 poll: Poll::new()?,
501 pollfds: HashMap::with_hasher(fx),
502 events: Events::with_capacity(10),
503 })
504 }
505
506 pub fn add(&mut self, receiver: OsIpcReceiver) -> Result<u64, UnixError> {
507 let last_index = self.incrementor.next().unwrap();
508 let fd = receiver.consume_fd();
509 let fd_token = Token(fd as usize);
510 let poll_entry = PollEntry { id: last_index, fd };
511 self.poll
512 .registry()
513 .register(&mut SourceFd(&fd), fd_token, Interest::READABLE)?;
514 self.pollfds.insert(fd_token, poll_entry);
515 Ok(last_index)
516 }
517
518 pub fn select(&mut self) -> Result<Vec<OsIpcSelectionResult>, UnixError> {
519 loop {
521 match self.poll.poll(&mut self.events, None) {
522 Ok(()) if !self.events.is_empty() => break,
523 Ok(()) => {},
524 Err(ref error) => {
525 if error.kind() != io::ErrorKind::Interrupted {
526 return Err(UnixError::last());
527 }
528 },
529 }
530
531 if !self.events.is_empty() {
532 break;
533 }
534 }
535
536 match self.selection_results() {
537 Ok(v) => Ok(v),
538 Err(OsTrySelectError::Empty) => panic!("Blocking select cannot return Empty"),
539 Err(OsTrySelectError::IoError(e)) => Err(e),
540 }
541 }
542
543 pub fn try_select(&mut self) -> Result<Vec<OsIpcSelectionResult>, OsTrySelectError> {
544 self.try_select_timeout(Duration::ZERO)
546 }
547
548 pub fn try_select_timeout(
549 &mut self,
550 duration: Duration,
551 ) -> Result<Vec<OsIpcSelectionResult>, OsTrySelectError> {
552 match self.poll.poll(&mut self.events, Some(duration)) {
554 Ok(()) => (),
555 Err(ref error) => {
556 if error.kind() != io::ErrorKind::Interrupted {
557 return Err(OsTrySelectError::IoError(UnixError::last()));
558 }
559 },
560 }
561
562 let v = self.selection_results()?;
563 if v.is_empty() {
564 Err(OsTrySelectError::Empty)
565 } else {
566 Ok(v)
567 }
568 }
569
570 fn selection_results(&mut self) -> Result<Vec<OsIpcSelectionResult>, OsTrySelectError> {
571 let mut selection_results = Vec::new();
572 for event in self.events.iter() {
573 assert!(event.is_readable());
575
576 let event_token = event.token();
577 let poll_entry = *self
578 .pollfds
579 .get(&event_token)
580 .expect("Got event for unknown token.");
581 loop {
582 match recv(poll_entry.fd, BlockingMode::Nonblocking) {
583 Ok(ipc_message) => {
584 selection_results.push(OsIpcSelectionResult::DataReceived(
585 poll_entry.id,
586 ipc_message,
587 ));
588 },
589 Err(err) if err.channel_is_closed() => {
590 self.pollfds.remove(&event_token).unwrap();
591 self.poll
592 .registry()
593 .deregister(&mut SourceFd(&poll_entry.fd))
594 .unwrap();
595 unsafe {
596 libc::close(poll_entry.fd);
597 }
598
599 selection_results.push(OsIpcSelectionResult::ChannelClosed(poll_entry.id));
600 break;
601 },
602 Err(UnixError::Errno(code)) if code == EWOULDBLOCK => {
603 break;
607 },
608 Err(e) => return Err(OsTrySelectError::IoError(e)),
609 }
610 }
611 }
612
613 Ok(selection_results)
614 }
615}
616
617pub enum OsIpcSelectionResult {
618 DataReceived(u64, IpcMessage),
619 ChannelClosed(u64),
620}
621
622impl OsIpcSelectionResult {
623 pub fn unwrap(self) -> (u64, IpcMessage) {
624 match self {
625 OsIpcSelectionResult::DataReceived(id, ipc_message) => (id, ipc_message),
626 OsIpcSelectionResult::ChannelClosed(id) => {
627 panic!("OsIpcSelectionResult::unwrap(): receiver ID {id} was closed!")
628 },
629 }
630 }
631}
632
/// A received channel descriptor whose direction is not yet decided; it is
/// turned into a sender or receiver with `to_sender` / `to_receiver`.
#[derive(Debug)]
pub struct OsOpaqueIpcChannel {
    /// Raw descriptor, or `-1` once it has been converted.
    fd: c_int,
}
637
638impl Drop for OsOpaqueIpcChannel {
639 fn drop(&mut self) {
640 debug_assert!(self.fd == -1);
646 }
647}
648
649impl OsOpaqueIpcChannel {
650 fn from_fd(fd: c_int) -> OsOpaqueIpcChannel {
651 OsOpaqueIpcChannel { fd }
652 }
653
654 pub fn to_sender(&mut self) -> OsIpcSender {
655 OsIpcSender::from_fd(mem::replace(&mut self.fd, -1))
656 }
657
658 pub fn to_receiver(&mut self) -> OsIpcReceiver {
659 OsIpcReceiver::from_fd(mem::replace(&mut self.fd, -1))
660 }
661}
662
663pub struct OsIpcOneShotServer {
664 fd: c_int,
665
666 _temp_dir: TempDir,
670}
671
672impl Drop for OsIpcOneShotServer {
673 fn drop(&mut self) {
674 unsafe {
675 let result = libc::close(self.fd);
676 assert!(thread::panicking() || result == 0);
677 }
678 }
679}
680
681impl OsIpcOneShotServer {
682 pub fn new() -> Result<(OsIpcOneShotServer, String), UnixError> {
683 unsafe {
684 let fd = libc::socket(libc::AF_UNIX, SOCK_SEQPACKET | SOCK_FLAGS, 0);
685 let temp_dir = Builder::new().tempdir()?;
686 let socket_path = temp_dir.path().join("socket");
687 let path_string = socket_path.to_str().unwrap();
688
689 let path_c_string = CString::new(path_string).unwrap();
690 let (sockaddr, len) = new_sockaddr_un(path_c_string.as_ptr());
691 if libc::bind(
692 fd,
693 &sockaddr as *const _ as *const sockaddr,
694 len as socklen_t,
695 ) != 0
696 {
697 return Err(UnixError::last());
698 }
699
700 if libc::listen(fd, 10) != 0 {
701 return Err(UnixError::last());
702 }
703
704 Ok((
705 OsIpcOneShotServer {
706 fd,
707 _temp_dir: temp_dir,
708 },
709 path_string.to_string(),
710 ))
711 }
712 }
713
714 #[allow(clippy::type_complexity)]
715 pub fn accept(self) -> Result<(OsIpcReceiver, IpcMessage), UnixError> {
716 unsafe {
717 let sockaddr: *mut sockaddr = ptr::null_mut();
718 let sockaddr_len: *mut socklen_t = ptr::null_mut();
719 let client_fd = libc::accept(self.fd, sockaddr, sockaddr_len);
720 if client_fd < 0 {
721 return Err(UnixError::last());
722 }
723 make_socket_lingering(client_fd)?;
724
725 let receiver = OsIpcReceiver::from_fd(client_fd);
726 let ipc_message = receiver.recv()?;
727 Ok((receiver, ipc_message))
728 }
729 }
730}
731
732fn make_socket_lingering(sockfd: c_int) -> Result<(), UnixError> {
737 let linger = linger {
738 l_onoff: 1,
739 l_linger: 30,
740 };
741 let err = unsafe {
742 setsockopt(
743 sockfd,
744 SOL_SOCKET,
745 SO_LINGER,
746 &linger as *const _ as *const c_void,
747 mem::size_of::<linger>() as socklen_t,
748 )
749 };
750 if err < 0 {
751 let error = UnixError::last();
752 if let UnixError::Errno(libc::EINVAL) = error {
753 } else {
776 return Err(error);
777 }
778 }
779 Ok(())
780}
781
/// Owner of the file descriptor that backs a shared-memory region.
struct BackingStore {
    /// Descriptor of the backing file.
    fd: c_int,
}
785
786impl BackingStore {
787 pub fn new(length: usize) -> BackingStore {
788 let count = SHM_COUNT.fetch_add(1, Ordering::Relaxed);
789 let timestamp = UNIX_EPOCH.elapsed().unwrap();
790 let name = CString::new(format!(
791 "/ipc-channel-shared-memory.{}.{}.{}.{}",
792 count,
793 *PID,
794 timestamp.as_secs(),
795 timestamp.subsec_nanos()
796 ))
797 .unwrap();
798 let fd = create_shmem(name, length);
799 Self::from_fd(fd)
800 }
801
802 pub fn from_fd(fd: c_int) -> BackingStore {
803 BackingStore { fd }
804 }
805
806 pub fn fd(&self) -> c_int {
807 self.fd
808 }
809
810 pub unsafe fn map_file(&self, length: Option<size_t>) -> (*mut u8, size_t) {
811 let length = length.unwrap_or_else(|| {
812 let mut st = mem::MaybeUninit::uninit();
813 if libc::fstat(self.fd, st.as_mut_ptr()) != 0 {
814 panic!("error stating fd {}: {}", self.fd, UnixError::last());
815 }
816 st.assume_init().st_size as size_t
817 });
818 if length == 0 {
819 return (ptr::null_mut(), length);
821 }
822 let address = libc::mmap(
823 ptr::null_mut(),
824 length,
825 PROT_READ | PROT_WRITE,
826 MAP_SHARED,
827 self.fd,
828 0,
829 );
830 assert!(!address.is_null());
831 assert!(address != MAP_FAILED);
832 (address as *mut u8, length)
833 }
834}
835
836impl Drop for BackingStore {
837 fn drop(&mut self) {
838 unsafe {
839 let result = libc::close(self.fd);
840 assert!(thread::panicking() || result == 0);
841 }
842 }
843}
844
845pub struct OsIpcSharedMemory {
846 ptr: *mut u8,
847 length: usize,
848 store: BackingStore,
849}
850
851unsafe impl Send for OsIpcSharedMemory {}
852unsafe impl Sync for OsIpcSharedMemory {}
853
854impl Drop for OsIpcSharedMemory {
855 fn drop(&mut self) {
856 unsafe {
857 if !self.ptr.is_null() {
858 let result = libc::munmap(self.ptr as *mut c_void, self.length);
859 assert!(thread::panicking() || result == 0);
860 }
861 }
862 }
863}
864
865impl Clone for OsIpcSharedMemory {
866 fn clone(&self) -> OsIpcSharedMemory {
867 unsafe {
868 let store = BackingStore::from_fd(libc::dup(self.store.fd()));
869 let (address, _) = store.map_file(Some(self.length));
870 OsIpcSharedMemory::from_raw_parts(address, self.length, store)
871 }
872 }
873}
874
875impl PartialEq for OsIpcSharedMemory {
876 fn eq(&self, other: &OsIpcSharedMemory) -> bool {
877 **self == **other
878 }
879}
880
881impl Debug for OsIpcSharedMemory {
882 fn fmt(&self, formatter: &mut Formatter) -> Result<(), fmt::Error> {
883 (**self).fmt(formatter)
884 }
885}
886
887impl Deref for OsIpcSharedMemory {
888 type Target = [u8];
889
890 #[inline]
891 fn deref(&self) -> &[u8] {
892 unsafe { slice::from_raw_parts(self.ptr, self.length) }
893 }
894}
895
896impl OsIpcSharedMemory {
897 #[inline]
903 pub unsafe fn deref_mut(&mut self) -> &mut [u8] {
904 unsafe { slice::from_raw_parts_mut(self.ptr, self.length) }
905 }
906
907 pub fn take(self) -> Option<Vec<u8>> {
908 Some((*self).to_vec())
909 }
910}
911
912impl OsIpcSharedMemory {
913 unsafe fn from_raw_parts(
914 ptr: *mut u8,
915 length: usize,
916 store: BackingStore,
917 ) -> OsIpcSharedMemory {
918 OsIpcSharedMemory { ptr, length, store }
919 }
920
921 unsafe fn from_fd(fd: c_int) -> OsIpcSharedMemory {
922 let store = BackingStore::from_fd(fd);
923 let (ptr, length) = store.map_file(None);
924 OsIpcSharedMemory::from_raw_parts(ptr, length, store)
925 }
926
927 pub fn from_byte(byte: u8, length: usize) -> OsIpcSharedMemory {
928 unsafe {
929 let store = BackingStore::new(length);
930 let (address, _) = store.map_file(Some(length));
931 for element in slice::from_raw_parts_mut(address, length) {
932 *element = byte;
933 }
934 OsIpcSharedMemory::from_raw_parts(address, length, store)
935 }
936 }
937
938 pub fn from_bytes(bytes: &[u8]) -> OsIpcSharedMemory {
939 unsafe {
940 let store = BackingStore::new(bytes.len());
941 let (address, _) = store.map_file(Some(bytes.len()));
942 ptr::copy_nonoverlapping(bytes.as_ptr(), address, bytes.len());
943 OsIpcSharedMemory::from_raw_parts(address, bytes.len(), store)
944 }
945 }
946}
947
948#[derive(Debug, Error)]
949pub enum UnixError {
950 Errno(c_int),
951 ChannelClosed,
952 IoError(io::Error),
953}
954
955impl UnixError {
956 fn last() -> UnixError {
957 UnixError::Errno(io::Error::last_os_error().raw_os_error().unwrap())
958 }
959
960 #[allow(dead_code)]
961 pub fn channel_is_closed(&self) -> bool {
962 matches!(self, UnixError::ChannelClosed)
963 }
964}
965
966impl From<UnixError> for crate::IpcError {
967 fn from(error: UnixError) -> Self {
968 match error {
969 UnixError::ChannelClosed => crate::IpcError::Disconnected,
970 e => crate::IpcError::Io(io::Error::from(e)),
971 }
972 }
973}
974
975impl fmt::Display for UnixError {
976 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
977 match self {
978 UnixError::Errno(errno) => {
979 fmt::Display::fmt(&io::Error::from_raw_os_error(*errno), fmt)
980 },
981 UnixError::ChannelClosed => write!(fmt, "All senders for this socket closed"),
982 UnixError::IoError(e) => write!(fmt, "{e}"),
983 }
984 }
985}
986
987impl From<UnixError> for io::Error {
988 fn from(unix_error: UnixError) -> io::Error {
989 match unix_error {
990 UnixError::Errno(errno) => io::Error::from_raw_os_error(errno),
991 UnixError::ChannelClosed => io::Error::new(io::ErrorKind::ConnectionReset, unix_error),
992 UnixError::IoError(e) => e,
993 }
994 }
995}
996
997impl From<UnixError> for crate::TryRecvError {
998 fn from(error: UnixError) -> Self {
999 match error {
1000 UnixError::ChannelClosed => {
1001 crate::TryRecvError::IpcError(crate::IpcError::Disconnected)
1002 },
1003 UnixError::Errno(code) if code == EAGAIN || code == EWOULDBLOCK => {
1004 crate::TryRecvError::Empty
1005 },
1006 e => crate::TryRecvError::IpcError(crate::IpcError::Io(io::Error::from(e))),
1007 }
1008 }
1009}
1010
1011impl From<io::Error> for UnixError {
1012 fn from(e: io::Error) -> UnixError {
1013 if let Some(errno) = e.raw_os_error() {
1014 UnixError::Errno(errno)
1015 } else {
1016 assert!(e.kind() == io::ErrorKind::ConnectionReset);
1017 UnixError::ChannelClosed
1018 }
1019 }
1020}
1021
/// How a receive operation waits for data.
#[derive(Clone, Copy)]
enum BlockingMode {
    /// Wait until a message arrives.
    Blocking,
    /// Do not wait; report an error if nothing is available.
    Nonblocking,
    /// Wait up to the given duration.
    Timeout(Duration),
}
1028
1029#[allow(clippy::uninit_vec, clippy::type_complexity)]
1030fn recv(fd: c_int, blocking_mode: BlockingMode) -> Result<IpcMessage, UnixError> {
1031 let (mut channels, mut shared_memory_regions) = (Vec::new(), Vec::new());
1032
1033 let mut total_size = 0usize;
1038 let mut main_data_buffer;
1039 unsafe {
1040 main_data_buffer = Vec::with_capacity(OsIpcSender::get_max_fragment_size());
1042 main_data_buffer.set_len(OsIpcSender::get_max_fragment_size());
1043
1044 let mut iovec = [
1045 iovec {
1046 iov_base: &mut total_size as *mut _ as *mut c_void,
1047 iov_len: mem::size_of_val(&total_size),
1048 },
1049 iovec {
1050 iov_base: main_data_buffer.as_mut_ptr() as *mut c_void,
1051 iov_len: main_data_buffer.len(),
1052 },
1053 ];
1054 let mut cmsg = UnixCmsg::new(&mut iovec)?;
1055
1056 let bytes_read = cmsg.recv(fd, blocking_mode)?;
1057 main_data_buffer.set_len(bytes_read - mem::size_of_val(&total_size));
1058
1059 let cmsg_fds = CMSG_DATA(cmsg.cmsg_buffer) as *const c_int;
1060 let cmsg_length = cmsg.msghdr.msg_controllen;
1061 let channel_length = if cmsg_length == 0 {
1062 0
1063 } else {
1064 (cmsg.cmsg_len() - CMSG_SPACE(0) as size_t) / mem::size_of::<c_int>()
1068 };
1069 for index in 0..channel_length {
1070 let fd = *cmsg_fds.add(index);
1071 if is_socket(fd) {
1072 channels.push(OsOpaqueIpcChannel::from_fd(fd));
1073 continue;
1074 }
1075 shared_memory_regions.push(OsIpcSharedMemory::from_fd(fd));
1076 }
1077 }
1078
1079 if total_size == main_data_buffer.len() {
1080 return Ok(IpcMessage::new(
1082 main_data_buffer,
1083 channels,
1084 shared_memory_regions,
1085 ));
1086 }
1087
1088 let dedicated_rx = channels.pop().unwrap().to_receiver();
1093
1094 let len = main_data_buffer.len();
1096 main_data_buffer.reserve_exact(total_size - len);
1097
1098 while main_data_buffer.len() < total_size {
1100 let write_pos = main_data_buffer.len();
1101 let end_pos = cmp::min(
1102 write_pos + OsIpcSender::fragment_size(*SYSTEM_SENDBUF_SIZE),
1103 total_size,
1104 );
1105 let result = unsafe {
1106 assert!(end_pos <= main_data_buffer.capacity());
1107 main_data_buffer.set_len(end_pos);
1108
1109 assert!(end_pos >= write_pos);
1111
1112 let result = libc::recv(
1116 dedicated_rx.fd.get(),
1117 main_data_buffer[write_pos..].as_mut_ptr() as *mut c_void,
1118 end_pos - write_pos,
1119 0,
1120 );
1121 main_data_buffer.set_len(write_pos + cmp::max(result, 0) as usize);
1122 result
1123 };
1124
1125 match result.cmp(&0) {
1126 cmp::Ordering::Greater => continue,
1127 cmp::Ordering::Equal => return Err(UnixError::ChannelClosed),
1128 cmp::Ordering::Less => return Err(UnixError::last()),
1129 }
1130 }
1131
1132 Ok(IpcMessage::new(
1133 main_data_buffer,
1134 channels,
1135 shared_memory_regions,
1136 ))
1137}
1138
1139fn new_msghdr(iovec: &mut [iovec], cmsg_buffer: *mut cmsghdr, cmsg_space: MsgControlLen) -> msghdr {
1141 let mut msghdr: msghdr = unsafe { mem::zeroed() };
1142 msghdr.msg_name = ptr::null_mut();
1143 msghdr.msg_namelen = 0;
1144 msghdr.msg_iov = iovec.as_mut_ptr();
1145 msghdr.msg_iovlen = iovec.len() as IovLen;
1146 msghdr.msg_control = cmsg_buffer as *mut c_void;
1147 msghdr.msg_controllen = cmsg_space;
1148 msghdr.msg_flags = 0;
1149 msghdr
1150}
1151
1152fn create_shmem(name: CString, length: usize) -> c_int {
1153 unsafe {
1154 let fd = libc::memfd_create(name.as_ptr(), libc::MFD_CLOEXEC);
1155 assert!(fd >= 0);
1156 assert_eq!(libc::ftruncate(fd, length as off_t), 0);
1157 fd
1158 }
1159}
1160
1161struct UnixCmsg {
1162 cmsg_buffer: *mut cmsghdr,
1163 msghdr: msghdr,
1164}
1165
1166unsafe impl Send for UnixCmsg {}
1167
1168impl Drop for UnixCmsg {
1169 fn drop(&mut self) {
1170 unsafe {
1171 libc::free(self.cmsg_buffer as *mut c_void);
1172 }
1173 }
1174}
1175
1176impl UnixCmsg {
1177 unsafe fn new(iovec: &mut [iovec]) -> Result<UnixCmsg, UnixError> {
1178 let cmsg_length = CMSG_SPACE(MAX_FDS_IN_CMSG * (mem::size_of::<c_int>() as c_uint));
1179 let cmsg_buffer = libc::malloc(cmsg_length as usize) as *mut cmsghdr;
1180 if cmsg_buffer.is_null() {
1181 return Err(UnixError::last());
1182 }
1183 Ok(UnixCmsg {
1184 cmsg_buffer,
1185 msghdr: new_msghdr(iovec, cmsg_buffer, cmsg_length as MsgControlLen),
1186 })
1187 }
1188
1189 unsafe fn recv(&mut self, fd: c_int, blocking_mode: BlockingMode) -> Result<usize, UnixError> {
1190 match blocking_mode {
1191 BlockingMode::Nonblocking => {
1192 if libc::fcntl(fd, libc::F_SETFL, libc::O_NONBLOCK) < 0 {
1193 return Err(UnixError::last());
1194 }
1195 },
1196 BlockingMode::Timeout(duration) => {
1197 let events = libc::POLLIN | libc::POLLPRI | libc::POLLRDHUP;
1198
1199 let mut fd = [libc::pollfd {
1200 fd,
1201 events,
1202 revents: 0,
1203 }];
1204 let result = libc::poll(
1205 fd.as_mut_ptr(),
1206 fd.len() as _,
1207 duration.as_millis().try_into().unwrap_or(-1),
1208 );
1209
1210 match result.cmp(&0) {
1211 cmp::Ordering::Equal => return Err(UnixError::Errno(EAGAIN)),
1212 cmp::Ordering::Less => return Err(UnixError::last()),
1213 cmp::Ordering::Greater => {},
1214 }
1215 },
1216 BlockingMode::Blocking => {},
1217 }
1218
1219 let result = recvmsg(fd, &mut self.msghdr, RECVMSG_FLAGS);
1220
1221 let result = match result.cmp(&0) {
1222 cmp::Ordering::Equal => Err(UnixError::ChannelClosed),
1223 cmp::Ordering::Less => Err(UnixError::last()),
1224 cmp::Ordering::Greater => Ok(result as usize),
1225 };
1226
1227 if let BlockingMode::Nonblocking = blocking_mode {
1228 if libc::fcntl(fd, libc::F_SETFL, 0) < 0 {
1229 return Err(UnixError::last());
1230 }
1231 }
1232 result
1233 }
1234
1235 unsafe fn cmsg_len(&self) -> size_t {
1236 (*(self.msghdr.msg_control as *const cmsghdr)).cmsg_len as size_t
1237 }
1238}
1239
1240fn is_socket(fd: c_int) -> bool {
1241 unsafe {
1242 let mut st = mem::MaybeUninit::uninit();
1243 if libc::fstat(fd, st.as_mut_ptr()) != 0 {
1244 return false;
1245 }
1246 (st.assume_init().st_mode & S_IFMT) == S_IFSOCK
1247 }
1248}