1use crate::ipc::IpcMessage;
11use libc::{
12 self, cmsghdr, linger, CMSG_DATA, CMSG_LEN, CMSG_SPACE, MAP_FAILED, MAP_SHARED, MSG_EOR,
13 PROT_READ, PROT_WRITE, SOCK_SEQPACKET, SOL_SOCKET,
14};
15use libc::{c_char, c_int, c_void, getsockopt, SO_LINGER, S_IFMT, S_IFSOCK};
16use libc::{iovec, msghdr, off_t, recvmsg, sendmsg};
17use libc::{sa_family_t, setsockopt, size_t, sockaddr, sockaddr_un, socketpair, socklen_t};
18use libc::{EAGAIN, EWOULDBLOCK};
19use mio::unix::SourceFd;
20use mio::{Events, Interest, Poll, Token};
21use rustc_hash::FxHasher;
22use std::cell::Cell;
23use std::cmp;
24use std::collections::HashMap;
25use std::convert::TryInto;
26use std::ffi::{c_uint, CString};
27use std::fmt::{self, Debug, Formatter};
28use std::hash::BuildHasherDefault;
29use std::io;
30use std::mem;
31use std::ops::{Deref, RangeFrom};
32use std::os::fd::RawFd;
33use std::ptr;
34use std::slice;
35use std::sync::atomic::{AtomicUsize, Ordering};
36use std::sync::{Arc, LazyLock};
37use std::thread;
38use std::time::{Duration, UNIX_EPOCH};
39use tempfile::{Builder, TempDir};
40use thiserror::Error;
41
/// Upper bound on the number of file descriptors carried by one control
/// message. The sender asserts against it and the receiver sizes its control
/// buffer from it.
const MAX_FDS_IN_CMSG: u32 = 64;

/// Bytes subtracted from the socket send-buffer size when working out how much
/// payload fits into a single fragment (see `OsIpcSender::fragment_size`).
/// Computed as the control-message space for `MAX_FDS_IN_CMSG * 8` bytes.
const RESERVED_SIZE: usize = unsafe { CMSG_SPACE(MAX_FDS_IN_CMSG * 8) } as usize;
55
/// Extra flags OR-ed into the socket type when creating sockets:
/// `SOCK_CLOEXEC` on Linux, illumos and FreeBSD.
#[cfg(any(target_os = "linux", target_os = "illumos", target_os = "freebsd"))]
const SOCK_FLAGS: c_int = libc::SOCK_CLOEXEC;
/// Extra flags OR-ed into the socket type when creating sockets: none on the
/// remaining targets.
#[cfg(not(any(target_os = "linux", target_os = "illumos", target_os = "freebsd")))]
const SOCK_FLAGS: c_int = 0;

/// Flags passed to `recvmsg`: `MSG_CMSG_CLOEXEC` on Linux, illumos and FreeBSD.
#[cfg(any(target_os = "linux", target_os = "illumos", target_os = "freebsd"))]
const RECVMSG_FLAGS: c_int = libc::MSG_CMSG_CLOEXEC;
/// Flags passed to `recvmsg`: none on the remaining targets.
#[cfg(not(any(target_os = "linux", target_os = "illumos", target_os = "freebsd")))]
const RECVMSG_FLAGS: c_int = 0;
65
/// Integer type the iovec count is cast to before it is stored in
/// `msghdr.msg_iovlen` (`usize` when building against the GNU libc).
#[cfg(target_env = "gnu")]
type IovLen = usize;

/// Integer type the iovec count is cast to before it is stored in
/// `msghdr.msg_iovlen` (`i32` for non-GNU environments).
#[cfg(not(target_env = "gnu"))]
type IovLen = i32;
/// Integer type used for `cmsghdr.cmsg_len` and `msghdr.msg_controllen`
/// (`size_t` when building against the GNU libc).
#[cfg(target_env = "gnu")]
type MsgControlLen = size_t;
/// Integer type used for `cmsghdr.cmsg_len` and `msghdr.msg_controllen`
/// (`socklen_t` for non-GNU environments).
#[cfg(not(target_env = "gnu"))]
type MsgControlLen = socklen_t;
75
/// Error returned by the non-blocking / timed select operations of
/// `OsIpcReceiverSet`.
#[derive(Debug, Error)]
pub enum OsTrySelectError {
    /// The underlying poll or receive operation failed.
    #[error("Error in IO: {0}.")]
    IoError(#[from] UnixError),
    /// The poll completed but produced neither data nor a closed channel.
    #[error("No messages were received and no disconnections occurred.")]
    Empty,
}
83
84unsafe fn new_sockaddr_un(path: *const c_char) -> (sockaddr_un, usize) {
85 let mut sockaddr: sockaddr_un = mem::zeroed();
86 libc::strncpy(
87 sockaddr.sun_path.as_mut_ptr(),
88 path,
89 sockaddr.sun_path.len() - 1,
90 );
91 sockaddr.sun_family = libc::AF_UNIX as sa_family_t;
92 (sockaddr, mem::size_of::<sockaddr_un>())
93}
94
/// Send-buffer size (`SO_SNDBUF`) of a freshly created channel socket, queried
/// once on first use and cached. Panics on first use if the probe channel
/// cannot be created or queried.
static SYSTEM_SENDBUF_SIZE: LazyLock<usize> = LazyLock::new(|| {
    let (tx, _) = channel().expect("Failed to obtain a socket for checking maximum send size");
    tx.get_system_sendbuf_size()
        .expect("Failed to obtain maximum send size for socket")
});

/// Process id, cached; used as one component of shared-memory object names.
static PID: LazyLock<u32> = LazyLock::new(std::process::id);

/// Counter incremented for every shared-memory backing store created by this
/// process; used as one component of shared-memory object names.
static SHM_COUNT: AtomicUsize = AtomicUsize::new(0);
106
107pub fn channel() -> Result<(OsIpcSender, OsIpcReceiver), UnixError> {
108 let mut results = [0, 0];
109 unsafe {
110 if socketpair(
111 libc::AF_UNIX,
112 SOCK_SEQPACKET | SOCK_FLAGS,
113 0,
114 &mut results[0],
115 ) >= 0
116 {
117 Ok((
118 OsIpcSender::from_fd(results[0]),
119 OsIpcReceiver::from_fd(results[1]),
120 ))
121 } else {
122 Err(UnixError::last())
123 }
124 }
125}
126
/// Bookkeeping record for one receiver registered in an `OsIpcReceiverSet`.
#[derive(Clone, Copy)]
struct PollEntry {
    /// Identifier handed back to the caller by `OsIpcReceiverSet::add`.
    pub id: u64,
    /// Descriptor of the registered receiver socket.
    pub fd: RawFd,
}
132
/// Receiving end of a channel, owning a socket descriptor.
#[derive(Debug)]
pub struct OsIpcReceiver {
    /// Owned descriptor; set to `-1` once it has been handed off elsewhere so
    /// that `Drop` does not close it.
    fd: Cell<c_int>,
}
137
138impl Drop for OsIpcReceiver {
139 fn drop(&mut self) {
140 unsafe {
141 let fd = self.fd.get();
142 if fd >= 0 {
143 let result = libc::close(fd);
144 assert!(
145 thread::panicking() || result == 0,
146 "closed receiver (fd: {}): {}",
147 fd,
148 UnixError::last(),
149 );
150 }
151 }
152 }
153}
154
155impl OsIpcReceiver {
156 fn from_fd(fd: c_int) -> OsIpcReceiver {
157 OsIpcReceiver { fd: Cell::new(fd) }
158 }
159
160 fn consume_fd(&self) -> c_int {
161 let fd = self.fd.get();
162 self.fd.set(-1);
163 fd
164 }
165
166 pub fn consume(&self) -> OsIpcReceiver {
167 OsIpcReceiver::from_fd(self.consume_fd())
168 }
169
170 #[allow(clippy::type_complexity)]
171 pub fn recv(&self) -> Result<IpcMessage, UnixError> {
172 recv(self.fd.get(), BlockingMode::Blocking)
173 }
174
175 #[allow(clippy::type_complexity)]
176 pub fn try_recv(&self) -> Result<IpcMessage, UnixError> {
177 recv(self.fd.get(), BlockingMode::Nonblocking)
178 }
179
180 #[allow(clippy::type_complexity)]
181 pub fn try_recv_timeout(&self, duration: Duration) -> Result<IpcMessage, UnixError> {
182 recv(self.fd.get(), BlockingMode::Timeout(duration))
183 }
184}
185
/// Owned file descriptor that is closed on drop; shared between sender clones
/// through an `Arc`.
#[derive(PartialEq, Debug)]
struct SharedFileDescriptor(c_int);
188
189impl Drop for SharedFileDescriptor {
190 fn drop(&mut self) {
191 unsafe {
192 let result = libc::close(self.0);
193 assert!(thread::panicking() || result == 0);
194 }
195 }
196}
197
/// Sending end of a channel. Clones share one underlying descriptor, which is
/// closed when the last clone is dropped.
#[derive(Debug, Clone)]
pub struct OsIpcSender {
    /// Reference-counted socket descriptor.
    fd: Arc<SharedFileDescriptor>,
}
202
203impl OsIpcSender {
204 fn from_fd(fd: c_int) -> OsIpcSender {
205 OsIpcSender {
206 fd: Arc::new(SharedFileDescriptor(fd)),
207 }
208 }
209
210 fn get_system_sendbuf_size(&self) -> Result<usize, UnixError> {
215 unsafe {
216 let mut socket_sendbuf_size: c_int = 0;
217 let mut socket_sendbuf_size_len = mem::size_of::<c_int>() as socklen_t;
218 if getsockopt(
219 self.fd.0,
220 libc::SOL_SOCKET,
221 libc::SO_SNDBUF,
222 &mut socket_sendbuf_size as *mut _ as *mut c_void,
223 &mut socket_sendbuf_size_len as *mut socklen_t,
224 ) < 0
225 {
226 return Err(UnixError::last());
227 }
228 Ok(socket_sendbuf_size.try_into().unwrap())
229 }
230 }
231
232 fn fragment_size(sendbuf_size: usize) -> usize {
240 sendbuf_size - RESERVED_SIZE
241 }
242
243 fn first_fragment_size(sendbuf_size: usize) -> usize {
247 (Self::fragment_size(sendbuf_size) - 2 * mem::size_of::<usize>()) & (!8usize + 1)
248 }
250
251 pub fn get_max_fragment_size() -> usize {
261 Self::first_fragment_size(*SYSTEM_SENDBUF_SIZE)
262 }
263
264 pub fn send(
265 &self,
266 data: &[u8],
267 channels: Vec<OsIpcChannel>,
268 shared_memory_regions: Vec<OsIpcSharedMemory>,
269 ) -> Result<(), UnixError> {
270 let mut fds = Vec::new();
271 for channel in channels.iter() {
272 fds.push(channel.fd());
273 }
274 for shared_memory_region in shared_memory_regions.iter() {
275 fds.push(shared_memory_region.store.fd());
276 }
277
278 fn send_first_fragment(
285 sender_fd: c_int,
286 fds: &[c_int],
287 data_buffer: &[u8],
288 len: usize,
289 ) -> Result<(), UnixError> {
290 assert!(fds.len() <= MAX_FDS_IN_CMSG as usize);
291
292 let mut fragment_size: usize = 0;
293 fragment_size =
294 mem::size_of_val(&fragment_size) + mem::size_of_val(&len) + data_buffer.len();
295
296 let result = unsafe {
297 let cmsg_length = mem::size_of_val(fds) as c_uint;
298 let (cmsg_buffer, cmsg_space) = if cmsg_length > 0 {
299 let cmsg_buffer =
300 libc::malloc(CMSG_SPACE(cmsg_length) as usize) as *mut cmsghdr;
301 if cmsg_buffer.is_null() {
302 return Err(UnixError::last());
303 }
304 (*cmsg_buffer).cmsg_len = CMSG_LEN(cmsg_length) as MsgControlLen;
305 (*cmsg_buffer).cmsg_level = libc::SOL_SOCKET;
306 (*cmsg_buffer).cmsg_type = libc::SCM_RIGHTS;
307
308 ptr::copy_nonoverlapping(
309 fds.as_ptr(),
310 CMSG_DATA(cmsg_buffer) as *mut c_int,
311 fds.len(),
312 );
313 (cmsg_buffer, CMSG_SPACE(cmsg_length))
314 } else {
315 (ptr::null_mut(), 0)
316 };
317
318 let mut iovec = [
319 iovec {
326 iov_base: &fragment_size as *const _ as *mut c_void,
327 iov_len: mem::size_of_val(&fragment_size),
328 },
329 iovec {
330 iov_base: &len as *const _ as *mut c_void,
331 iov_len: mem::size_of_val(&len),
332 },
333 iovec {
334 iov_base: data_buffer.as_ptr() as *mut c_void,
335 iov_len: data_buffer.len(),
336 },
337 ];
338
339 let msghdr = new_msghdr(&mut iovec, cmsg_buffer, cmsg_space as MsgControlLen);
340 let result = sendmsg(sender_fd, &msghdr, MSG_EOR);
341 libc::free(cmsg_buffer as *mut c_void);
342 result
343 };
344
345 if result > 0 {
346 assert_eq!(fragment_size, result.try_into().unwrap());
347 Ok(())
348 } else {
349 Err(UnixError::last())
350 }
351 }
352
353 fn send_followup_fragment(sender_fd: c_int, data_buffer: &[u8]) -> Result<(), UnixError> {
354 let result = unsafe {
355 libc::send(
356 sender_fd,
357 data_buffer.as_ptr() as *const c_void,
358 data_buffer.len(),
359 0,
360 )
361 };
362
363 if result > 0 {
364 assert_eq!(data_buffer.len(), result.try_into().unwrap());
365 Ok(())
366 } else {
367 Err(UnixError::last())
368 }
369 }
370
371 let mut sendbuf_size = *SYSTEM_SENDBUF_SIZE;
372
373 fn downsize(sendbuf_size: &mut usize, sent_size: usize) -> Result<(), ()> {
382 if sent_size > 2000 {
383 *sendbuf_size /= 2;
384 if *sendbuf_size >= sent_size {
386 *sendbuf_size = sent_size / 2;
387 }
388 Ok(())
389 } else {
390 Err(())
391 }
392 }
393
394 if data.len() <= Self::get_max_fragment_size() {
396 match send_first_fragment(self.fd.0, &fds[..], data, data.len()) {
397 Ok(_) => return Ok(()),
398 Err(error) => {
399 if !(matches!(error, UnixError::Errno(libc::ENOBUFS))
407 && downsize(&mut sendbuf_size, data.len()).is_ok())
408 {
409 return Err(error);
410 }
411 },
412 }
413 }
414
415 let (dedicated_tx, dedicated_rx) = channel()?;
423 fds.push(dedicated_rx.fd.get());
425
426 let mut byte_position = 0;
428 while byte_position < data.len() {
429 let end_byte_position;
430 let result = if byte_position == 0 {
431 end_byte_position = Self::first_fragment_size(sendbuf_size);
436 send_first_fragment(self.fd.0, &fds[..], &data[..end_byte_position], data.len())
437 } else {
438 end_byte_position = cmp::min(
441 byte_position + Self::fragment_size(sendbuf_size),
442 data.len(),
443 );
444 send_followup_fragment(dedicated_tx.fd.0, &data[byte_position..end_byte_position])
445 };
446
447 if let Err(error) = result {
448 if matches!(error, UnixError::Errno(libc::ENOBUFS))
449 && downsize(&mut sendbuf_size, end_byte_position - byte_position).is_ok()
450 {
451 continue;
454 } else {
455 return Err(error);
456 }
457 }
458
459 byte_position = end_byte_position;
460 }
461
462 Ok(())
463 }
464
465 pub fn connect(name: String) -> Result<OsIpcSender, UnixError> {
466 let name = CString::new(name).unwrap();
467 unsafe {
468 let fd = libc::socket(libc::AF_UNIX, SOCK_SEQPACKET | SOCK_FLAGS, 0);
469 let (sockaddr, len) = new_sockaddr_un(name.as_ptr());
470 if libc::connect(
471 fd,
472 &sockaddr as *const _ as *const sockaddr,
473 len as socklen_t,
474 ) < 0
475 {
476 return Err(UnixError::last());
477 }
478
479 Ok(OsIpcSender::from_fd(fd))
480 }
481 }
482}
483
/// A channel endpoint that can be transferred along with a message.
#[derive(Debug)]
pub enum OsIpcChannel {
    /// A sending endpoint.
    Sender(OsIpcSender),
    /// A receiving endpoint.
    Receiver(OsIpcReceiver),
}
489
490impl OsIpcChannel {
491 fn fd(&self) -> c_int {
492 match *self {
493 OsIpcChannel::Sender(ref sender) => sender.fd.0,
494 OsIpcChannel::Receiver(ref receiver) => receiver.fd.get(),
495 }
496 }
497}
498
/// A set of receivers that can be waited on together.
pub struct OsIpcReceiverSet {
    /// Source of the identifiers returned by `add`.
    incrementor: RangeFrom<u64>,
    /// Poll instance the receiver descriptors are registered with.
    poll: Poll,
    /// Registered receivers, keyed by the token derived from their descriptor.
    pollfds: HashMap<Token, PollEntry, BuildHasherDefault<FxHasher>>,
    /// Reusable buffer for poll results.
    events: Events,
}
505
506impl Drop for OsIpcReceiverSet {
507 fn drop(&mut self) {
508 for &PollEntry { id: _, fd } in self.pollfds.values() {
509 let result = unsafe { libc::close(fd) };
510 assert!(thread::panicking() || result == 0);
511 }
512 }
513}
514
515impl OsIpcReceiverSet {
516 pub fn new() -> Result<OsIpcReceiverSet, UnixError> {
517 let fx = BuildHasherDefault::<FxHasher>::default();
518 Ok(OsIpcReceiverSet {
519 incrementor: 0..,
520 poll: Poll::new()?,
521 pollfds: HashMap::with_hasher(fx),
522 events: Events::with_capacity(10),
523 })
524 }
525
526 pub fn add(&mut self, receiver: OsIpcReceiver) -> Result<u64, UnixError> {
527 let last_index = self.incrementor.next().unwrap();
528 let fd = receiver.consume_fd();
529 let fd_token = Token(fd as usize);
530 let poll_entry = PollEntry { id: last_index, fd };
531 self.poll
532 .registry()
533 .register(&mut SourceFd(&fd), fd_token, Interest::READABLE)?;
534 self.pollfds.insert(fd_token, poll_entry);
535 Ok(last_index)
536 }
537
538 pub fn select(&mut self) -> Result<Vec<OsIpcSelectionResult>, UnixError> {
539 loop {
541 match self.poll.poll(&mut self.events, None) {
542 Ok(()) if !self.events.is_empty() => break,
543 Ok(()) => {},
544 Err(ref error) => {
545 if error.kind() != io::ErrorKind::Interrupted {
546 return Err(UnixError::last());
547 }
548 },
549 }
550
551 if !self.events.is_empty() {
552 break;
553 }
554 }
555
556 match self.selection_results() {
557 Ok(v) => Ok(v),
558 Err(OsTrySelectError::Empty) => panic!("Blocking select cannot return Empty"),
559 Err(OsTrySelectError::IoError(e)) => Err(e),
560 }
561 }
562
563 pub fn try_select(&mut self) -> Result<Vec<OsIpcSelectionResult>, OsTrySelectError> {
564 self.try_select_timeout(Duration::ZERO)
566 }
567
568 pub fn try_select_timeout(
569 &mut self,
570 duration: Duration,
571 ) -> Result<Vec<OsIpcSelectionResult>, OsTrySelectError> {
572 match self.poll.poll(&mut self.events, Some(duration)) {
574 Ok(()) => (),
575 Err(ref error) => {
576 if error.kind() != io::ErrorKind::Interrupted {
577 return Err(OsTrySelectError::IoError(UnixError::last()));
578 }
579 },
580 }
581
582 let v = self.selection_results()?;
583 if v.is_empty() {
584 Err(OsTrySelectError::Empty)
585 } else {
586 Ok(v)
587 }
588 }
589
590 fn selection_results(&mut self) -> Result<Vec<OsIpcSelectionResult>, OsTrySelectError> {
591 let mut selection_results = Vec::new();
592 for event in self.events.iter() {
593 assert!(event.is_readable());
595
596 let event_token = event.token();
597 let poll_entry = *self
598 .pollfds
599 .get(&event_token)
600 .expect("Got event for unknown token.");
601 loop {
602 match recv(poll_entry.fd, BlockingMode::Nonblocking) {
603 Ok(ipc_message) => {
604 selection_results.push(OsIpcSelectionResult::DataReceived(
605 poll_entry.id,
606 ipc_message,
607 ));
608 },
609 Err(err) if err.channel_is_closed() => {
610 self.pollfds.remove(&event_token).unwrap();
611 self.poll
612 .registry()
613 .deregister(&mut SourceFd(&poll_entry.fd))
614 .unwrap();
615 unsafe {
616 libc::close(poll_entry.fd);
617 }
618
619 selection_results.push(OsIpcSelectionResult::ChannelClosed(poll_entry.id));
620 break;
621 },
622 Err(UnixError::Errno(code)) if code == EWOULDBLOCK => {
623 break;
627 },
628 Err(e) => return Err(OsTrySelectError::IoError(e)),
629 }
630 }
631 }
632
633 Ok(selection_results)
634 }
635}
636
/// One outcome reported by a receiver-set select operation.
pub enum OsIpcSelectionResult {
    /// A message arrived on the receiver with the given identifier.
    DataReceived(u64, IpcMessage),
    /// The receiver with the given identifier was found closed and has been
    /// removed from the set.
    ChannelClosed(u64),
}
641
642impl OsIpcSelectionResult {
643 pub fn unwrap(self) -> (u64, IpcMessage) {
644 match self {
645 OsIpcSelectionResult::DataReceived(id, ipc_message) => (id, ipc_message),
646 OsIpcSelectionResult::ChannelClosed(id) => {
647 panic!("OsIpcSelectionResult::unwrap(): receiver ID {id} was closed!")
648 },
649 }
650 }
651}
652
/// A received channel descriptor whose direction (sender or receiver) has not
/// been decided yet.
#[derive(Debug)]
pub struct OsOpaqueIpcChannel {
    /// Raw descriptor; replaced by `-1` once converted with `to_sender` or
    /// `to_receiver`.
    fd: c_int,
}
657
impl Drop for OsOpaqueIpcChannel {
    /// Does not close anything; in debug builds it checks that the descriptor
    /// was already taken by `to_sender` / `to_receiver` (which leave `-1`
    /// behind), since an unconverted channel would leak its descriptor.
    fn drop(&mut self) {
        debug_assert!(self.fd == -1);
    }
}
668
669impl OsOpaqueIpcChannel {
670 fn from_fd(fd: c_int) -> OsOpaqueIpcChannel {
671 OsOpaqueIpcChannel { fd }
672 }
673
674 pub fn to_sender(&mut self) -> OsIpcSender {
675 OsIpcSender::from_fd(mem::replace(&mut self.fd, -1))
676 }
677
678 pub fn to_receiver(&mut self) -> OsIpcReceiver {
679 OsIpcReceiver::from_fd(mem::replace(&mut self.fd, -1))
680 }
681}
682
/// A listening Unix socket bound to a path inside a private temporary
/// directory.
pub struct OsIpcOneShotServer {
    /// Descriptor of the listening socket.
    fd: c_int,

    /// Temporary directory that contains the socket path; held here so it is
    /// not dropped before the server is.
    _temp_dir: TempDir,
}
691
692impl Drop for OsIpcOneShotServer {
693 fn drop(&mut self) {
694 unsafe {
695 let result = libc::close(self.fd);
696 assert!(thread::panicking() || result == 0);
697 }
698 }
699}
700
701impl OsIpcOneShotServer {
702 pub fn new() -> Result<(OsIpcOneShotServer, String), UnixError> {
703 unsafe {
704 let fd = libc::socket(libc::AF_UNIX, SOCK_SEQPACKET | SOCK_FLAGS, 0);
705 let temp_dir = Builder::new().tempdir()?;
706 let socket_path = temp_dir.path().join("socket");
707 let path_string = socket_path.to_str().unwrap();
708
709 let path_c_string = CString::new(path_string).unwrap();
710 let (sockaddr, len) = new_sockaddr_un(path_c_string.as_ptr());
711 if libc::bind(
712 fd,
713 &sockaddr as *const _ as *const sockaddr,
714 len as socklen_t,
715 ) != 0
716 {
717 return Err(UnixError::last());
718 }
719
720 if libc::listen(fd, 10) != 0 {
721 return Err(UnixError::last());
722 }
723
724 Ok((
725 OsIpcOneShotServer {
726 fd,
727 _temp_dir: temp_dir,
728 },
729 path_string.to_string(),
730 ))
731 }
732 }
733
734 #[allow(clippy::type_complexity)]
735 pub fn accept(self) -> Result<(OsIpcReceiver, IpcMessage), UnixError> {
736 unsafe {
737 let sockaddr: *mut sockaddr = ptr::null_mut();
738 let sockaddr_len: *mut socklen_t = ptr::null_mut();
739 let client_fd = libc::accept(self.fd, sockaddr, sockaddr_len);
740 if client_fd < 0 {
741 return Err(UnixError::last());
742 }
743 make_socket_lingering(client_fd)?;
744
745 let receiver = OsIpcReceiver::from_fd(client_fd);
746 let ipc_message = receiver.recv()?;
747 Ok((receiver, ipc_message))
748 }
749 }
750}
751
752fn make_socket_lingering(sockfd: c_int) -> Result<(), UnixError> {
757 let linger = linger {
758 l_onoff: 1,
759 l_linger: 30,
760 };
761 let err = unsafe {
762 setsockopt(
763 sockfd,
764 SOL_SOCKET,
765 SO_LINGER,
766 &linger as *const _ as *const c_void,
767 mem::size_of::<linger>() as socklen_t,
768 )
769 };
770 if err < 0 {
771 let error = UnixError::last();
772 if let UnixError::Errno(libc::EINVAL) = error {
773 } else {
796 return Err(error);
797 }
798 }
799 Ok(())
800}
801
/// Owner of the file descriptor behind a shared-memory region; the descriptor
/// is closed on drop.
struct BackingStore {
    /// Descriptor of the shared-memory object.
    fd: c_int,
}
805
806impl BackingStore {
807 pub fn new(length: usize) -> BackingStore {
808 let count = SHM_COUNT.fetch_add(1, Ordering::Relaxed);
809 let timestamp = UNIX_EPOCH.elapsed().unwrap();
810 let name = CString::new(format!(
811 "/ipc-channel-shared-memory.{}.{}.{}.{}",
812 count,
813 *PID,
814 timestamp.as_secs(),
815 timestamp.subsec_nanos()
816 ))
817 .unwrap();
818 let fd = create_shmem(name, length);
819 Self::from_fd(fd)
820 }
821
822 pub fn from_fd(fd: c_int) -> BackingStore {
823 BackingStore { fd }
824 }
825
826 pub fn fd(&self) -> c_int {
827 self.fd
828 }
829
830 pub unsafe fn map_file(&self, length: Option<size_t>) -> (*mut u8, size_t) {
831 let length = length.unwrap_or_else(|| {
832 let mut st = mem::MaybeUninit::uninit();
833 if libc::fstat(self.fd, st.as_mut_ptr()) != 0 {
834 panic!("error stating fd {}: {}", self.fd, UnixError::last());
835 }
836 st.assume_init().st_size as size_t
837 });
838 if length == 0 {
839 return (ptr::null_mut(), length);
841 }
842 let address = libc::mmap(
843 ptr::null_mut(),
844 length,
845 PROT_READ | PROT_WRITE,
846 MAP_SHARED,
847 self.fd,
848 0,
849 );
850 assert!(!address.is_null());
851 assert!(address != MAP_FAILED);
852 (address as *mut u8, length)
853 }
854}
855
856impl Drop for BackingStore {
857 fn drop(&mut self) {
858 unsafe {
859 let result = libc::close(self.fd);
860 assert!(thread::panicking() || result == 0);
861 }
862 }
863}
864
/// A mapped shared-memory region together with the descriptor backing it.
pub struct OsIpcSharedMemory {
    /// Start of the mapping; null when the region has zero length.
    ptr: *mut u8,
    /// Length of the mapping in bytes.
    length: usize,
    /// Owner of the backing descriptor.
    store: BackingStore,
}

// The raw pointer field suppresses the automatic `Send`/`Sync` impls, so they
// are asserted manually here.
// NOTE(review): soundness relies on callers not mutating the mapping
// concurrently (see the `unsafe` `deref_mut`) — confirm.
unsafe impl Send for OsIpcSharedMemory {}
unsafe impl Sync for OsIpcSharedMemory {}
873
874impl Drop for OsIpcSharedMemory {
875 fn drop(&mut self) {
876 unsafe {
877 if !self.ptr.is_null() {
878 let result = libc::munmap(self.ptr as *mut c_void, self.length);
879 assert!(thread::panicking() || result == 0);
880 }
881 }
882 }
883}
884
885impl Clone for OsIpcSharedMemory {
886 fn clone(&self) -> OsIpcSharedMemory {
887 unsafe {
888 let store = BackingStore::from_fd(libc::dup(self.store.fd()));
889 let (address, _) = store.map_file(Some(self.length));
890 OsIpcSharedMemory::from_raw_parts(address, self.length, store)
891 }
892 }
893}
894
895impl PartialEq for OsIpcSharedMemory {
896 fn eq(&self, other: &OsIpcSharedMemory) -> bool {
897 **self == **other
898 }
899}
900
901impl Debug for OsIpcSharedMemory {
902 fn fmt(&self, formatter: &mut Formatter) -> Result<(), fmt::Error> {
903 (**self).fmt(formatter)
904 }
905}
906
907impl Deref for OsIpcSharedMemory {
908 type Target = [u8];
909
910 #[inline]
911 fn deref(&self) -> &[u8] {
912 unsafe { slice::from_raw_parts(self.ptr, self.length) }
913 }
914}
915
916impl OsIpcSharedMemory {
917 #[inline]
923 pub unsafe fn deref_mut(&mut self) -> &mut [u8] {
924 unsafe { slice::from_raw_parts_mut(self.ptr, self.length) }
925 }
926
927 pub fn take(self) -> Option<Vec<u8>> {
928 Some((*self).to_vec())
929 }
930}
931
932impl OsIpcSharedMemory {
933 unsafe fn from_raw_parts(
934 ptr: *mut u8,
935 length: usize,
936 store: BackingStore,
937 ) -> OsIpcSharedMemory {
938 OsIpcSharedMemory { ptr, length, store }
939 }
940
941 unsafe fn from_fd(fd: c_int) -> OsIpcSharedMemory {
942 let store = BackingStore::from_fd(fd);
943 let (ptr, length) = store.map_file(None);
944 OsIpcSharedMemory::from_raw_parts(ptr, length, store)
945 }
946
947 pub fn from_byte(byte: u8, length: usize) -> OsIpcSharedMemory {
948 unsafe {
949 let store = BackingStore::new(length);
950 let (address, _) = store.map_file(Some(length));
951 for element in slice::from_raw_parts_mut(address, length) {
952 *element = byte;
953 }
954 OsIpcSharedMemory::from_raw_parts(address, length, store)
955 }
956 }
957
958 pub fn from_bytes(bytes: &[u8]) -> OsIpcSharedMemory {
959 unsafe {
960 let store = BackingStore::new(bytes.len());
961 let (address, _) = store.map_file(Some(bytes.len()));
962 ptr::copy_nonoverlapping(bytes.as_ptr(), address, bytes.len());
963 OsIpcSharedMemory::from_raw_parts(address, bytes.len(), store)
964 }
965 }
966}
967
/// Errors produced by the Unix IPC backend.
#[derive(Debug, Error)]
pub enum UnixError {
    /// An OS error, identified by its raw errno value.
    Errno(c_int),
    /// The peer end of the channel has been closed.
    ChannelClosed,
    /// An I/O error carried as-is.
    IoError(io::Error),
}
974
975impl UnixError {
976 fn last() -> UnixError {
977 UnixError::Errno(io::Error::last_os_error().raw_os_error().unwrap())
978 }
979
980 #[allow(dead_code)]
981 pub fn channel_is_closed(&self) -> bool {
982 matches!(self, UnixError::ChannelClosed)
983 }
984}
985
986impl From<UnixError> for crate::IpcError {
987 fn from(error: UnixError) -> Self {
988 match error {
989 UnixError::ChannelClosed => crate::IpcError::Disconnected,
990 e => crate::IpcError::Io(io::Error::from(e)),
991 }
992 }
993}
994
995impl fmt::Display for UnixError {
996 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
997 match self {
998 UnixError::Errno(errno) => {
999 fmt::Display::fmt(&io::Error::from_raw_os_error(*errno), fmt)
1000 },
1001 UnixError::ChannelClosed => write!(fmt, "All senders for this socket closed"),
1002 UnixError::IoError(e) => write!(fmt, "{e}"),
1003 }
1004 }
1005}
1006
1007impl From<UnixError> for io::Error {
1008 fn from(unix_error: UnixError) -> io::Error {
1009 match unix_error {
1010 UnixError::Errno(errno) => io::Error::from_raw_os_error(errno),
1011 UnixError::ChannelClosed => io::Error::new(io::ErrorKind::ConnectionReset, unix_error),
1012 UnixError::IoError(e) => e,
1013 }
1014 }
1015}
1016
1017impl From<UnixError> for crate::TryRecvError {
1018 fn from(error: UnixError) -> Self {
1019 match error {
1020 UnixError::ChannelClosed => {
1021 crate::TryRecvError::IpcError(crate::IpcError::Disconnected)
1022 },
1023 UnixError::Errno(code) if code == EAGAIN || code == EWOULDBLOCK => {
1024 crate::TryRecvError::Empty
1025 },
1026 e => crate::TryRecvError::IpcError(crate::IpcError::Io(io::Error::from(e))),
1027 }
1028 }
1029}
1030
1031impl From<io::Error> for UnixError {
1032 fn from(e: io::Error) -> UnixError {
1033 if let Some(errno) = e.raw_os_error() {
1034 UnixError::Errno(errno)
1035 } else {
1036 assert!(e.kind() == io::ErrorKind::ConnectionReset);
1037 UnixError::ChannelClosed
1038 }
1039 }
1040}
1041
/// How a receive operation waits for data.
#[derive(Copy, Clone)]
enum BlockingMode {
    /// Wait until data arrives.
    Blocking,
    /// Switch the socket to non-blocking for the duration of the call.
    Nonblocking,
    /// Poll for readability for at most the given duration first.
    Timeout(Duration),
}
1048
/// Receives one complete message from `fd`.
///
/// The first fragment carries two `usize` header words (the fragment's own
/// size and the total payload size), the start of the payload and any
/// transferred descriptors. If the payload is larger than what arrived in the
/// first fragment, the last received channel is taken to be a dedicated
/// receiver from which the remainder is read.
///
/// `blocking_mode` applies only to the first read; continuation reads of a
/// partially received first fragment always block.
///
/// # Errors
///
/// Returns `UnixError::ChannelClosed` when a read returns zero bytes, or the
/// OS error of the failing call otherwise.
#[allow(clippy::uninit_vec, clippy::type_complexity)]
fn recv(fd: c_int, blocking_mode: BlockingMode) -> Result<IpcMessage, UnixError> {
    let (mut channels, mut shared_memory_regions) = (Vec::new(), Vec::new());

    // Total payload size, filled in from the header of the first fragment.
    let mut total_size = 0usize;
    let mut main_data_buffer;
    unsafe {
        // Uninitialised buffer big enough for the largest possible first
        // fragment; its length is corrected once the read has completed.
        main_data_buffer = Vec::with_capacity(OsIpcSender::get_max_fragment_size());
        main_data_buffer.set_len(OsIpcSender::get_max_fragment_size());

        // Size of the first fragment as announced by its own header.
        let mut fragment_size: usize = 0;

        // Bytes of the first fragment received so far, header included.
        let mut bytes_read: usize = 0;

        let mut blocking_mode = blocking_mode;

        loop {
            let mut iovec = [
                iovec {
                    iov_base: &mut fragment_size as *mut _ as *mut c_void,
                    iov_len: mem::size_of_val(&fragment_size),
                },
                iovec {
                    iov_base: &mut total_size as *mut _ as *mut c_void,
                    iov_len: mem::size_of_val(&total_size),
                },
                iovec {
                    iov_base: main_data_buffer.as_mut_ptr() as *mut c_void,
                    iov_len: main_data_buffer.len(),
                },
            ];

            let mut iovec: &mut [iovec] = &mut iovec;

            // Skip over the part that earlier iterations already filled in:
            // advance within the current iovec and drop it once exhausted.
            let mut offset = bytes_read;
            while offset > 0 {
                let change = offset.min(iovec[0].iov_len);
                iovec[0].iov_base = iovec[0].iov_base.offset(change.try_into().unwrap());
                iovec[0].iov_len -= change;
                offset -= change;

                if iovec[0].iov_len == 0 {
                    iovec = &mut iovec[1..];
                }
            }

            let mut cmsg = UnixCmsg::new(iovec)?;

            // An error is only expected before anything has been read.
            bytes_read += cmsg.recv(fd, blocking_mode).inspect_err(|_| {
                assert_eq!(bytes_read, 0);
            })?;

            let cmsg_fds = CMSG_DATA(cmsg.cmsg_buffer) as *const c_int;
            let cmsg_length = cmsg.msghdr.msg_controllen;
            // Number of descriptors received: control-message length minus the
            // header (`CMSG_SPACE(0)`), divided by the size of one descriptor.
            let channel_length = if cmsg_length == 0 {
                0
            } else {
                (cmsg.cmsg_len() - CMSG_SPACE(0) as size_t) / mem::size_of::<c_int>()
            };
            // Sockets become opaque channels; everything else is mapped as a
            // shared-memory region.
            for index in 0..channel_length {
                let fd = *cmsg_fds.add(index);
                if is_socket(fd) {
                    channels.push(OsOpaqueIpcChannel::from_fd(fd));
                    continue;
                }
                shared_memory_regions.push(OsIpcSharedMemory::from_fd(fd));
            }

            // Done once the header has arrived and the announced fragment size
            // has been reached.
            if bytes_read >= mem::size_of_val(&fragment_size) && bytes_read == fragment_size {
                break;
            }

            // Part of the fragment is here already; wait for the rest.
            blocking_mode = BlockingMode::Blocking;
        }

        // Trim the buffer to the payload bytes actually received.
        main_data_buffer
            .set_len(bytes_read - mem::size_of_val(&fragment_size) - mem::size_of_val(&total_size));
    }

    // The whole payload fitted in the first fragment.
    if total_size == main_data_buffer.len() {
        return Ok(IpcMessage::new(
            main_data_buffer,
            channels,
            shared_memory_regions,
        ));
    }

    // Fragmented message: the sender appended a dedicated receiver as the last
    // channel. Panics if no channel was received.
    let dedicated_rx = channels.pop().unwrap().to_receiver();

    let len = main_data_buffer.len();
    main_data_buffer.reserve_exact(total_size - len);

    while main_data_buffer.len() < total_size {
        let write_pos = main_data_buffer.len();
        let end_pos = cmp::min(
            write_pos + OsIpcSender::fragment_size(*SYSTEM_SENDBUF_SIZE),
            total_size,
        );
        let result = unsafe {
            assert!(end_pos <= main_data_buffer.capacity());
            // Temporarily extend the length so the target range can be sliced.
            main_data_buffer.set_len(end_pos);

            assert!(end_pos >= write_pos);

            let result = libc::recv(
                dedicated_rx.fd.get(),
                main_data_buffer[write_pos..].as_mut_ptr() as *mut c_void,
                end_pos - write_pos,
                0,
            );
            // Keep only what was actually received (nothing on error).
            main_data_buffer.set_len(write_pos + cmp::max(result, 0) as usize);
            result
        };

        match result.cmp(&0) {
            cmp::Ordering::Greater => continue,
            cmp::Ordering::Equal => return Err(UnixError::ChannelClosed),
            cmp::Ordering::Less => return Err(UnixError::last()),
        }
    }

    Ok(IpcMessage::new(
        main_data_buffer,
        channels,
        shared_memory_regions,
    ))
}
1196
1197fn new_msghdr(iovec: &mut [iovec], cmsg_buffer: *mut cmsghdr, cmsg_space: MsgControlLen) -> msghdr {
1199 let mut msghdr: msghdr = unsafe { mem::zeroed() };
1200 msghdr.msg_name = ptr::null_mut();
1201 msghdr.msg_namelen = 0;
1202 msghdr.msg_iov = iovec.as_mut_ptr();
1203 msghdr.msg_iovlen = iovec.len() as IovLen;
1204 msghdr.msg_control = cmsg_buffer as *mut c_void;
1205 msghdr.msg_controllen = cmsg_space;
1206 msghdr.msg_flags = 0;
1207 msghdr
1208}
1209
1210fn create_shmem(name: CString, length: usize) -> c_int {
1211 unsafe {
1212 let fd = libc::memfd_create(name.as_ptr(), libc::MFD_CLOEXEC);
1213 assert!(fd >= 0);
1214 assert_eq!(libc::ftruncate(fd, length as off_t), 0);
1215 fd
1216 }
1217}
1218
/// A receive-side message header together with the heap-allocated control
/// buffer it points at. The buffer is freed on drop.
struct UnixCmsg {
    /// `malloc`-allocated control-message buffer.
    cmsg_buffer: *mut cmsghdr,
    /// Header handed to `recvmsg`; it references `cmsg_buffer` and the iovec
    /// array given to `new`.
    msghdr: msghdr,
}

// The raw pointers suppress the automatic `Send` impl, so it is asserted
// manually here.
unsafe impl Send for UnixCmsg {}
1225
1226impl Drop for UnixCmsg {
1227 fn drop(&mut self) {
1228 unsafe {
1229 libc::free(self.cmsg_buffer as *mut c_void);
1230 }
1231 }
1232}
1233
impl UnixCmsg {
    /// Allocates a control buffer with room for `MAX_FDS_IN_CMSG` descriptors
    /// and builds a message header over it and `iovec`.
    ///
    /// # Errors
    ///
    /// Returns the current OS error if the allocation fails.
    ///
    /// # Safety
    ///
    /// The returned header stores a raw pointer to `iovec`; the iovec array
    /// and the buffers it references must stay valid for as long as the
    /// returned value is used.
    unsafe fn new(iovec: &mut [iovec]) -> Result<UnixCmsg, UnixError> {
        let cmsg_length = CMSG_SPACE(MAX_FDS_IN_CMSG * (mem::size_of::<c_int>() as c_uint));
        let cmsg_buffer = libc::malloc(cmsg_length as usize) as *mut cmsghdr;
        if cmsg_buffer.is_null() {
            return Err(UnixError::last());
        }
        Ok(UnixCmsg {
            cmsg_buffer,
            msghdr: new_msghdr(iovec, cmsg_buffer, cmsg_length as MsgControlLen),
        })
    }

    /// Performs one `recvmsg` on `fd` according to `blocking_mode` and returns
    /// the number of bytes received.
    ///
    /// # Errors
    ///
    /// Returns `ChannelClosed` when `recvmsg` returns zero, `Errno(EAGAIN)`
    /// when a timed wait expires, or the OS error of the failing call.
    ///
    /// # Safety
    ///
    /// The buffers referenced by the message header must still be valid.
    unsafe fn recv(&mut self, fd: c_int, blocking_mode: BlockingMode) -> Result<usize, UnixError> {
        match blocking_mode {
            BlockingMode::Nonblocking => {
                // Make the socket non-blocking for this call; the status flags
                // are set back to 0 below.
                if libc::fcntl(fd, libc::F_SETFL, libc::O_NONBLOCK) < 0 {
                    return Err(UnixError::last());
                }
            },
            BlockingMode::Timeout(duration) => {
                let events = libc::POLLIN | libc::POLLPRI | libc::POLLRDHUP;

                let mut fd = [libc::pollfd {
                    fd,
                    events,
                    revents: 0,
                }];
                // A duration whose millisecond count does not fit the timeout
                // type is passed as -1.
                let result = libc::poll(
                    fd.as_mut_ptr(),
                    fd.len() as _,
                    duration.as_millis().try_into().unwrap_or(-1),
                );

                match result.cmp(&0) {
                    // Timed out without the socket becoming ready.
                    cmp::Ordering::Equal => return Err(UnixError::Errno(EAGAIN)),
                    cmp::Ordering::Less => return Err(UnixError::last()),
                    cmp::Ordering::Greater => {},
                }
            },
            BlockingMode::Blocking => {},
        }

        let result = recvmsg(fd, &mut self.msghdr, RECVMSG_FLAGS);

        // Translate the return value right away, before the `fcntl` below can
        // overwrite errno.
        let result = match result.cmp(&0) {
            cmp::Ordering::Equal => Err(UnixError::ChannelClosed),
            cmp::Ordering::Less => Err(UnixError::last()),
            cmp::Ordering::Greater => Ok(result as usize),
        };

        if let BlockingMode::Nonblocking = blocking_mode {
            // Clear the status flags again (this resets all of them, not just
            // `O_NONBLOCK`).
            if libc::fcntl(fd, libc::F_SETFL, 0) < 0 {
                return Err(UnixError::last());
            }
        }
        result
    }

    /// Reads the `cmsg_len` field of the first control message in the buffer.
    ///
    /// # Safety
    ///
    /// The control buffer must hold a control-message header written by a
    /// preceding receive.
    unsafe fn cmsg_len(&self) -> size_t {
        (*(self.msghdr.msg_control as *const cmsghdr)).cmsg_len as size_t
    }
}
1297
1298fn is_socket(fd: c_int) -> bool {
1299 unsafe {
1300 let mut st = mem::MaybeUninit::uninit();
1301 if libc::fstat(fd, st.as_mut_ptr()) != 0 {
1302 return false;
1303 }
1304 (st.assume_init().st_mode & S_IFMT) == S_IFSOCK
1305 }
1306}